Logo Search packages:      
Sourcecode: pan version File versions  Download package

queue.c

/* -*- Mode: C; tab-width: 8; indent-tabs-mode: t; c-basic-offset: 8 -*- */
/*
 * Author: Charles Kerr <charles@rebelbase.com>
 *
 * Copyright (C) 2002  Charles Kerr <charles@rebelbase.com>
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; version 2 of the License.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 * 
 * A note on implementation
 * The queue defers all public tasks (like insert, remove, etc.)
 * to the queue thread.  This ensures that all tasks are handled
 * in sequence, rather than concurrently.  This ensures that callbacks
 * will be invoked in sync with the queue, so the queue's state won't
 * change during the callback.
 */

/*********************
**********************  Includes
*********************/

#include <config.h>

#include <errno.h>

#include <glib.h>

#include <unistd.h>

#include <pan/base/base-prefs.h>
#include <pan/base/debug.h>
#include <pan/base/log.h>
#include <pan/base/pan-glib-extensions.h>
#include <pan/base/pan-i18n.h>
#include <pan/base/server.h>

#include <pan/nntp.h>
#include <pan/queue.h>
#include <pan/socket-pool.h>
#include <pan/task-xml.h>

/*********************
**********************  Defines / Enumerated types
*********************/

typedef enum
{
      TODO_INSERT,
      TODO_REMOVE,
      TODO_MOVE,
      TODO_REQUEUE,
      TODO_STOP,
      TODO_SHUTDOWN,
      TODO_ONLINE
}
QueueTodoAction;

/*********************
**********************  Macros
*********************/

/*********************
**********************  Structures / Typedefs
*********************/

typedef struct
{
      GSList * tasks;
      int index_1;
      QueueTodoAction action;
}
QueueTodo;

/*********************
**********************  Private Function Prototypes
*********************/

static void fire_queue_size_changed (int running_qty, int total_qty);
static void fire_connection_size_changed (int increment);
static void fire_online_status_changed (gboolean online);
static void fire_message_id_status_changed (const PString ** mids, int qty);

static void queue_run_thread (gpointer data, gpointer user_data);

static void* queue_mainloop (void*);

static void queue_run_what_we_can (void);

static void queue_set_task_status (Task* task,
                                   QueueTaskStatus status);

static void queue_new_todo (QueueTodoAction     action,
                            GSList            * tasks,
                            int                 i);

static guint queue_get_length (void);

static void queue_do_todo (void);

/*********************
**********************  Constants
*********************/

/***********
************  Extern
***********/

/***********
************  Public
***********/

/***********
************  Private
***********/

/*********************
**********************  Variables
*********************/

/***********
************  Extern
***********/

/***********
************  Public
***********/

PanCallback * queue_tasks_added           = NULL;
PanCallback * queue_tasks_removed         = NULL;
PanCallback * queue_tasks_moved           = NULL;

/***********
************  Private
***********/

static GQueue * todo_queue = NULL;
static GSList* task_list = NULL;

static GThreadPool * _tpool = NULL;

static GMutex * task_lock = NULL;
static GMutex * cond_mutex = NULL;
static GMutex * todo_mutex = NULL;

static GCond * qcond = NULL;
static gboolean work_to_do = FALSE;

static GHashTable * server_to_pool = NULL;
static GHashTable * task_to_status = NULL;
static GHashTable * message_id_hash = NULL;

static guint running_tasks = 0;

static gboolean _online = TRUE;
static gboolean _remove_failed_tasks = TRUE;
static gboolean _dirty = FALSE;

/*********************
**********************  BEGINNING OF SOURCE
*********************/

/************
*************  PUBLIC ROUTINES
************/

void
queue_wakeup (void)
{
      debug_enter ("queue_wakeup");

      g_mutex_lock (cond_mutex);
      work_to_do = TRUE;
      g_cond_signal (qcond);
      g_mutex_unlock (cond_mutex);

      debug_exit ("queue_wakeup");
}

void
queue_init (gboolean online, gboolean remove_failed_tasks)
{
      debug_enter ("queue_init");

      server_to_pool = g_hash_table_new (g_direct_hash, g_direct_equal);
      task_to_status = g_hash_table_new (g_direct_hash, g_direct_equal);
      message_id_hash = g_hash_table_new_full (pstring_hash, pstring_equal, (GDestroyNotify)pstring_free, NULL);

      todo_queue = g_queue_new ();

      queue_tasks_added = pan_callback_new ();
      queue_tasks_removed = pan_callback_new ();
      queue_tasks_moved = pan_callback_new ();

      task_lock = g_mutex_new ();
      cond_mutex = g_mutex_new ();
      todo_mutex = g_mutex_new ();

      qcond = g_cond_new ();

      _online = online;

      _remove_failed_tasks = remove_failed_tasks;
      _tpool = g_thread_pool_new (queue_run_thread, NULL, 5, TRUE, NULL);

      /* fire up the queue thread */
      g_thread_create (queue_mainloop, NULL, FALSE, NULL);

      debug_exit ("queue_init");
}

/***
**** SOCKET POOL
***/

static void
pool_count_ghfunc (gpointer key, gpointer value, gpointer user_data)
{
      int * pi = (int *) user_data;
      const SocketPool * pool = (const SocketPool*) value;
      (*pi) += socket_pool_get_connection_qty (pool);
}

static void
pool_socket_avail_cb (gpointer call_object, gpointer call_arg, gpointer user_data)
{
      /* update the connection count */
      int qty = 0;
      g_hash_table_foreach (server_to_pool, pool_count_ghfunc, &qty);
      fire_connection_size_changed (qty);

      /* wakeup the queue */
      queue_wakeup ();
}

static SocketPool*
get_socket_pool_from_server (Server * server)
{
      SocketPool * pool = (SocketPool*) g_hash_table_lookup (server_to_pool, server);
      if (pool == NULL) {
            pool = socket_pool_new (server);
            socket_pool_set_online (pool, _online);
            pan_callback_add (socket_pool_get_available_callback(pool), pool_socket_avail_cb, NULL);
            g_hash_table_insert (server_to_pool, server, pool);
      }
      return pool;
}

/***
****
***/

int
queue_get_message_id_status (const PString * message_id)
{
      return GPOINTER_TO_INT (g_hash_table_lookup (message_id_hash, message_id));
}
static void
queue_add_message_id_state (const PString    ** mids,
                            int                 mid_qty,
                            int                 state)
{
      int i;

      /* sanity clause */
      g_return_if_fail (state);
      g_return_if_fail (mids!=NULL);
      g_return_if_fail (mid_qty>0);
      g_return_if_fail (pstring_is_set(mids[0]));

      /* add the state */
      for (i=0; i<mid_qty; ++i)
      {
            const int new_state = state | queue_get_message_id_status(mids[i]);
            g_hash_table_insert (message_id_hash, pstring_dup(mids[i]), GINT_TO_POINTER(new_state));
      }

      fire_message_id_status_changed (mids, mid_qty);
}

static void
queue_remove_message_id_state (const PString   ** mids,
                               int                mid_qty,
                               int                state)
{
      int i;

      /* sanity clause */
      g_return_if_fail (state);
      g_return_if_fail (mids!=NULL);
      g_return_if_fail (mid_qty>0);
      g_return_if_fail (pstring_is_set(mids[0]));

      /* remove the state */
      for (i=0; i<mid_qty; ++i)
      {
            const PString * id = mids[i];
            const int new_state = queue_get_message_id_status(id) & ~state;
            if (!new_state)
                  g_hash_table_remove (message_id_hash, id);
            else
                  g_hash_table_insert (message_id_hash, pstring_dup(id), GINT_TO_POINTER(new_state));
      }

      fire_message_id_status_changed (mids, mid_qty);
}

static int
task_get_type_state (const Task * task)
{
      int state = 0;

      g_return_val_if_fail (task!=NULL, 0);

      switch (task->type)
      {
            case TASK_TYPE_BODY:
            case TASK_TYPE_BODIES:
                  state |= QUEUE_MESSAGE_ID_DOWNLOAD;
                  break;
            case TASK_TYPE_SAVE:
                  state |= QUEUE_MESSAGE_ID_SAVE;
                  break;
            case TASK_TYPE_HEADERS:
                  break;
            default:
                  state = 0;
                  break;
      }

      return state;
}

static void
queue_add_message_ids (Task * task)
{
      const int state = task_get_type_state (task);

      if (state)
      {
            int i;
            PString ** mids = g_newa (PString*, task->identifiers->len);

            for (i=0; i<task->identifiers->len; ++i) {
                  MessageIdentifier * mid = MESSAGE_IDENTIFIER (g_ptr_array_index (task->identifiers, i));
                  mids[i] = &mid->message_id;
            }

            queue_add_message_id_state ((const PString**)mids, task->identifiers->len, state);
      }
}

static void
queue_remove_message_ids (Task * task)
{
      const int state = task_get_type_state (task);

      if (state)
      {
            int i;
            PString ** mids = g_newa (PString*, task->identifiers->len);

            for (i=0; i<task->identifiers->len; ++i) {
                  MessageIdentifier * mid = MESSAGE_IDENTIFIER (g_ptr_array_index (task->identifiers, i));
                  mids[i] = &mid->message_id;
            }

            queue_remove_message_id_state ((const PString**)mids, task->identifiers->len, state);
      }
}

/***
****
***/

gboolean
queue_is_online (void)
{
      return _online;
}
void
queue_set_online (gboolean online)
{
      queue_new_todo (TODO_ONLINE, NULL, online);
}

static void
real_queue_set_online_ghfunc (gpointer key, gpointer value, gpointer user_data)
{
      socket_pool_set_online ((SocketPool*)value, _online);
}

static void
real_queue_set_online (gint p, gboolean * do_wakeup)
{
      if (_online == p)
      {
            if (_online)
                  *do_wakeup = TRUE;
      }
      else
      {
            GSList * l;

            _online = p;

            /* set/clear the abort flag on all the tasks */
            g_mutex_lock (task_lock);
            for (l=task_list; l!=NULL; l=l->next)
                  TASK(l->data)->hint_abort = !_online;
            g_mutex_unlock (task_lock);

            /* set the socket pools offline */
            g_hash_table_foreach (server_to_pool, real_queue_set_online_ghfunc, NULL);

            if (_online)
                  *do_wakeup = TRUE;

            /* let listeners know */
            fire_online_status_changed (_online);
      }
}


void
queue_shutdown (void)
{
      queue_new_todo (TODO_SHUTDOWN, NULL, 0);
}
static void
real_queue_shutdown (void)
{
      /* clear the status hashtable */
      g_hash_table_destroy (task_to_status);
      task_to_status = NULL;

      /* optionally save the tasks */

      /* unref the tasks */

      /* close the sockets */
      /* FIXME: need to clear the socket pools */
}


void
queue_add (Task * task)
{
      g_return_if_fail (task!=NULL);
      queue_insert_tasks (g_slist_append (NULL, task), task->high_priority ? 0 : -1);
}

GPtrArray*
queue_get_tasks (void)
{
      GSList * l;
      GPtrArray * a = g_ptr_array_new ();

        g_mutex_lock (task_lock);
      for (l=task_list; l!=NULL; l=l->next) {
            pan_object_ref (PAN_OBJECT(l->data));
            g_ptr_array_add (a, l->data);
      }
        g_mutex_unlock (task_lock);

      return a;
}

gboolean
queue_is_empty (void)
{
      gboolean retval;

      g_mutex_lock (task_lock);
      retval = task_list == NULL;
      g_mutex_unlock (task_lock);

      return retval;
}

/**
***
**/

char*
queue_get_tasks_filename (void)
{
      return g_build_filename (get_data_dir(), "tasks.xml", NULL);
}

static void
queue_save_tasks (void)
{
      GPtrArray * tasks;
      char * filename;

      /* save the tasks */
            filename = queue_get_tasks_filename ();
      tasks = queue_get_tasks ();
      task_xml_write (filename, (const Task**)tasks->pdata, tasks->len);

      /* cleanup */
      pan_g_ptr_array_foreach (tasks, (GFunc)pan_object_unref, NULL);
      g_ptr_array_free (tasks, TRUE);
      g_free (filename);
}


/**
***
**/

guint
queue_get_running_task_count (void)
{
      gint running_count = 0;
      GSList *l;

      g_mutex_lock (task_lock);
      for (l=task_list; l!=NULL; l=l->next)
      {
            QueueTaskStatus status = queue_get_task_status (l->data);

            if (status == QUEUE_TASK_STATUS_RUNNING ||
                status == QUEUE_TASK_STATUS_QUEUED)
                  ++running_count;
      }
      g_mutex_unlock (task_lock);

      return running_count;
}

/**
***
**/

void
queue_stop_tasks (GSList * tasks)
{
      queue_new_todo (TODO_STOP, tasks, -1);
}
static void
real_queue_stop_tasks (GSList * tasks)
{
      GSList * l;
      for (l=tasks; l!=NULL; l=l->next)
      {
            Task * task = TASK(l->data);
            const QueueTaskStatus status = queue_get_task_status (task);

            if (status == QUEUE_TASK_STATUS_RUNNING)
            {
                  /* ask a running task to stop */
                  task->hint_abort = TRUE;
                  queue_set_task_status (task, QUEUE_TASK_STATUS_STOPPING);
            }
            else
            {
                  /* if the task was stopped because we went offline, set it to queued instead of stopped. */
                  queue_set_task_status (task, task->hint_abort && !queue_is_online()
                              ? QUEUE_TASK_STATUS_QUEUED
                              : QUEUE_TASK_STATUS_STOPPED);

                  /* clear the abort flag */
                  task->hint_abort = FALSE;
            }
      }
}

/**
***
**/

void
queue_insert_tasks (GSList * tasks, int index)
{
      if (tasks != NULL)
            queue_new_todo (TODO_INSERT, tasks, index);
}

static void
real_queue_insert_tasks (GSList * new_tasks, int index, gboolean * do_wakeup)
{
      GSList * tmp;
      GSList * insertme;
      guint task_len;
      debug_enter ("real_queue_insert_tasks");
      debug1 (DEBUG_QUEUE, "real_queue_insert_tasks: inserting at position %d", index);

      /* sanity clause */
      g_return_if_fail (new_tasks!=NULL);
      g_return_if_fail (index==-1 || (guint)index<=queue_get_length());

      /* make our own GSList nodes for inserting */
      insertme = g_slist_copy (new_tasks);

      /* inside a task_lock... */
      g_mutex_lock (task_lock);
      {
            /**
            ***  Add to the list
            **/
            if (task_list == NULL) /* no previous list */
                  task_list = insertme;
            else if (index == 0)
                  task_list = g_slist_concat (insertme, task_list);
            else if (index == -1)
                  task_list = g_slist_concat (task_list, insertme);
            else {
                  GSList * nth = g_slist_nth (task_list, index);
                  if (nth == NULL) { /* index out of range, append to end */
                        index = g_slist_length (task_list);
                        nth = g_slist_last (task_list);
                  }
                  g_slist_last (insertme)->next = nth->next;
                  nth->next = insertme;
            }

            /**
            ***  Mark the new tasks as queued
            **/
            for (tmp=new_tasks; tmp!=NULL; tmp=tmp->next) {
                  Task * task = TASK(tmp->data);
                  queue_add_message_ids (task);
                  g_hash_table_insert (task_to_status, task, GINT_TO_POINTER(QUEUE_TASK_STATUS_QUEUED));
            }

            /* we need this for fire_queue_size_changed */
            task_len = g_slist_length (task_list);
      }
      _dirty = TRUE;
      g_mutex_unlock (task_lock);

      /* cleanup */
      fire_queue_size_changed (running_tasks, task_len);
      pan_callback_call (queue_tasks_added, new_tasks, GINT_TO_POINTER(index));
      *do_wakeup = TRUE;
      debug_exit ("real_queue_insert_tasks");
}

/**
***
**/

void
queue_remove_last_task (void)
{
        GPtrArray * tasks;
                                                                                                                     
        /* remove the last task, if one exists */
        tasks = queue_get_tasks ();
        if (tasks->len) {
                Task * task = TASK(g_ptr_array_index (tasks, tasks->len-1));
                queue_remove_tasks (g_slist_append(NULL,task));
        }
                                                                                                                     
        /* cleanup */
        pan_g_ptr_array_foreach (tasks, (GFunc)pan_object_unref, NULL);
        g_ptr_array_free (tasks, TRUE);
}

void
queue_remove_tasks (GSList * tasks)
{
      g_slist_foreach (tasks, (GFunc)pan_object_ref, NULL); /* balanced at end of real_queue_remove_tasks */
      queue_new_todo (TODO_REMOVE, tasks, -1);
}
static void
real_queue_remove_tasks (GSList * tasks, gboolean * do_wakeup)
{
      GSList * l;
      GSList * removed = NULL;
      debug_enter ("real_queue_remove_tasks");

      for (l=tasks; l!=NULL; l=l->next)
      {
            Task * task = TASK(l->data);

            if (task->thread_qty > 0)
            {
                  queue_set_task_status (task, QUEUE_TASK_STATUS_REMOVING);
                  task->hint_abort = TRUE;
            }
            else /* not running -- this task can be removed */
            {
                  g_mutex_lock (task_lock);
                  if (g_slist_find (task_list, task) != NULL)
                  {
                        task_list = g_slist_remove (task_list, task);
                        removed = g_slist_prepend (removed, task);
                        _dirty = TRUE;
                  }
                  g_mutex_unlock (task_lock);
            }
      }

      if (removed != NULL)
      {
            removed = g_slist_reverse (removed);

            fire_queue_size_changed (running_tasks, g_slist_length(task_list));
            *do_wakeup = TRUE;
            pan_callback_call (queue_tasks_removed, removed, NULL);

            for (l=removed; l!=NULL; l=l->next)
            {
                  Task * task = TASK(l->data);

                  pan_callback_call (task->task_ran_callback, task, GINT_TO_POINTER(task->state.health));
                  g_hash_table_remove (task_to_status, task);
                  queue_remove_message_ids (task);
                  pan_object_unref (PAN_OBJECT(task));
            }

            g_slist_free (removed);
      }

      g_slist_foreach (tasks, (GFunc)pan_object_unref, NULL); /* balanced at begin of queue_remove_tasks */
      debug_exit ("real_queue_remove_tasks");
}

/**
***
**/

void
queue_move_tasks (GSList * tasks, int index)
{
      g_return_if_fail (tasks!=NULL);

      queue_new_todo (TODO_MOVE, tasks, index);
}

static void
real_queue_move_tasks (GSList * tasks, int moveto_index)
{
      GSList * l;
      GSList * ref;

      /* sanity clause */
      g_return_if_fail (tasks!=NULL);
      g_return_if_fail (task_list!=NULL);

      g_mutex_lock (task_lock);

      /* find the point of reference */
      if (moveto_index == 0)
            ref = NULL;
      else {
            ref = g_slist_nth (task_list, moveto_index-1);
            while (ref != NULL) {
                  if (g_slist_find (tasks, ref->data) == NULL)
                        break;
                  ref = ref->next;
            }
            if (ref == NULL)
                  ref = g_slist_last (task_list);
      }

      /* remove the tasks */
      for (l=tasks; l!=NULL; l=l->next)
            task_list = g_slist_remove (task_list, l->data);

      /* add them back in */
      if (ref == NULL) {
            l = g_slist_copy (tasks);
            g_slist_last(l)->next = task_list;
            task_list = l;
            moveto_index = 0;
      } else {
            l = g_slist_copy (tasks);
            if (task_list != NULL) {
                  g_slist_last(l)->next = ref->next;
                  ref->next = l;
            } else
                  task_list = l;    
            moveto_index = g_slist_index (task_list, tasks->data);
      }

      /* let everyone know */
      _dirty = TRUE;
      g_mutex_unlock (task_lock);
      pan_callback_call (queue_tasks_moved, tasks, GINT_TO_POINTER(moveto_index));
      debug_exit ("real_queue_move_tasks");
}

/**
***
**/

void
queue_requeue_failed_tasks (GSList * tasks)
{
      GSList * l;

      /* sanity clause */
      g_return_if_fail (tasks != NULL);
      for (l=tasks; l!=NULL; l=l->next) {
            Task * task = (Task*) l->data;
            g_return_if_fail (queue_get_task_status(task) == QUEUE_TASK_STATUS_STOPPED);
      }

      queue_new_todo (TODO_REQUEUE, tasks, -1);
}
static void
real_requeue_failed_tasks (GSList * tasks, gboolean * do_wakeup)
{
      if (tasks != NULL)
      {
            GSList * l;

            for (l=tasks; l!=NULL; l=l->next) {
                  Task * task = TASK(l->data);
                  task->hint_abort = FALSE;
                  task_state_set_health (&task->state, TASK_OK);
                  queue_set_task_status (task, QUEUE_TASK_STATUS_QUEUED);
            }

            *do_wakeup = TRUE;
      }
}

/**
***
**/

gboolean
queue_get_remove_failed_tasks (void)
{
      return _remove_failed_tasks;
}

void
queue_set_remove_failed_tasks (gboolean b)
{
      _remove_failed_tasks = b;
}

/**
***
**/

QueueTaskStatus
queue_get_task_status (const Task* task)
{
      QueueTaskStatus status = QUEUE_TASK_STATUS_NOT_QUEUED;

      if (task_to_status != NULL) {
            gpointer p = g_hash_table_lookup (task_to_status, task);
            if (p != NULL)
                  status = GPOINTER_TO_INT (p);
      }

      return status;
}

static void
queue_set_task_status (Task* task, QueueTaskStatus status)
{
      g_return_if_fail (task!=NULL);
      g_return_if_fail (g_slist_index(task_list, task) != -1);

      if (task_to_status != NULL)
            g_hash_table_insert (task_to_status, task, GINT_TO_POINTER(status));
}

/************
*************  PRIVATE ROUTINES
************/

static void
queue_new_todo (QueueTodoAction action, GSList * tasks, int i)
{
      QueueTodo *todo = g_new (QueueTodo, 1);
      todo->tasks = tasks;
      todo->index_1 = i;
      todo->action = action;

      g_mutex_lock (todo_mutex);
      g_queue_push_tail (todo_queue, todo);
      debug3 (DEBUG_QUEUE, "todo queue now has %u tasks (new task type: %d, int 1: %d)",
            todo_queue->length, action, i);
      g_mutex_unlock (todo_mutex);

      queue_wakeup ();
}

static guint
queue_get_length (void)
{
      guint size;
      g_mutex_lock (task_lock);
      size = g_slist_length (task_list);
      g_mutex_unlock (task_lock);
      return size;
}

typedef struct
{
      Server * server;
      TaskFunc func;
      Task * task;
      PanSocket * sock;
}
QueueRunStruct;

static void
increment_running_task_qty (int inc)
{
      running_tasks += inc;
      fire_queue_size_changed (running_tasks, g_slist_length(task_list));
}

static void
queue_run_thread (gpointer gp_data, gpointer user_data)
{
      QueueRunStruct * data = (QueueRunStruct*) gp_data;
      Task * task = data->task;
      PanSocket * sock = data->sock;

      /* get ready to run */
      if (++task->thread_qty == 1)
      {
            /* let the GUI know what's going on */
            status_item_set_active (STATUS_ITEM(task), TRUE);
            increment_running_task_qty (1);
      }
      if (sock != NULL)
            task->sockets = g_slist_prepend (task->sockets, sock);

      /* run the task's workproc */
      (data->func)(task, sock);

      /* workproc done */
      if (sock != NULL)
            task->sockets = g_slist_remove (task->sockets, sock);
      if (!--task->thread_qty)
      {
            const QueueTaskStatus qts = queue_get_task_status (task);

            /* let the GUI know what's going on */
            status_item_set_active (STATUS_ITEM(task), FALSE);
            increment_running_task_qty (-1);

            /* if the task is slated for removal or stopping,
             * then the queue is waiting for the task's threads to finish.
             * Since this is the last thread, remind the queue to do its work. */
            switch (qts) {
                  case QUEUE_TASK_STATUS_REMOVING:
                        queue_remove_tasks (g_slist_append (NULL, task));
                        break;
                  case QUEUE_TASK_STATUS_STOPPING:
                        queue_stop_tasks (g_slist_append (NULL, task));
                        break;
                  default:
                        queue_set_task_status (task, QUEUE_TASK_STATUS_QUEUED);
                        break;
            }
      }

      /* clean up the socket */
      if (sock != NULL)
      {
            /* if the task aborted, the read buffer may still have contents,
             * so throw away the socket */
            if ((task->state.health != TASK_OK) || task->hint_abort)
                  pan_socket_set_error_flag (sock, TRUE);

            socket_pool_checkin (get_socket_pool_from_server (data->server), sock);
      }
      else /* since we didn't checkin a socket to wake the queue, let's do it explicitly */
      {
            queue_wakeup ();
      }

      /* cleanup */
      g_free (data);
}

static void
queue_run (Task * task, TaskFunc func, PanSocket * sock, Server * server)
{
      QueueRunStruct * data;

      g_return_if_fail (task!=NULL);

      /* set the task to `working' here in the queue's thread so that
       * queue_run_what_we_can() doesn't kick off two workprocs at exactly
       * the same time, causing threading issues.  All workprocs can safely
       * assume that the state is TASK_WORKING when they begin, and it's up
       * to them to tell the queue if another concurrent workproc would be good.
       * (See task-save.c's task_save_run_download()). */
      task_state_set_work_working (&task->state);

      /* This is mostly for the benefit of the GUI */
      queue_set_task_status (task, QUEUE_TASK_STATUS_RUNNING);

      data = g_new0 (QueueRunStruct, 1);
      data->server = server;
      data->func = func;
      data->task = task;
      data->sock = sock;

      g_thread_pool_push (_tpool, data, NULL);
}

/**
 * Run any tasks in the task_list which can be run
 * (ie, can get a socket, if necessary) right now.
 */
static void
queue_run_what_we_can (void)
{
      int n = 0;
      GSList * l = NULL;
      debug_enter ("queue_run_what_we_can");

      /* walk through the queue and see if any tasks are waiting to be run. */
      l = task_list;
      while (l != NULL)
      {
            Task * task = TASK(l->data);
            const TaskState state = task->state;
            const QueueTaskStatus qts = queue_get_task_status (task); 

            debug5 (DEBUG_QUEUE,
                  "queue_run_what_we_can: #%d task %p (%s) task-state %d queue-task-state %d",
                  n++, task, status_item_describe(STATUS_ITEM(task)), (int)state.health, (int)qts);

            switch (state.health)
            {
                  case TASK_FAIL:
                  {
                        if (qts != QUEUE_TASK_STATUS_STOPPED)
                        {
                              if (_remove_failed_tasks)
                                    queue_remove_tasks (g_slist_append (NULL, task));
                              else
                                    queue_set_task_status (task, QUEUE_TASK_STATUS_STOPPED);
                        }
                        break;
                  }
                  case TASK_FAIL_NETWORK:
                  {
                        /* for now, just retry.
                           maybe at some point in the future we can have the task sleep a bit first.
                           note the intentional fall-through. */
                        task_state_set_health (&task->state, TASK_OK);
                  }
                  case TASK_OK:
                  {
                        const gboolean runnable = qts==QUEUE_TASK_STATUS_QUEUED || qts==QUEUE_TASK_STATUS_RUNNING;

                        switch (state.work)
                        {
                              case TASK_NEED_SOCKET:
                                    if (runnable) {
                                          PanSocket * sock;
                                          SocketPool * pool = get_socket_pool_from_server (state.server);
                                          socket_pool_request_connection (pool);
                                          sock = socket_pool_checkout (pool);
                                          if (sock != NULL)
                                                queue_run (task, state.func, sock, state.server);
                                    }
                                    break;

                              case TASK_NEED_WORK:
                                    if (runnable)
                                          queue_run (task, state.func, NULL, state.server);
                                    break;

                              case TASK_WORKING:
                                    /* don't do anything -- the task is busy */
                                    break;

                              case TASK_COMPLETED:
                                    queue_remove_tasks (g_slist_append (NULL, task));
                                    break;

                              default:
                                    g_warning ("unhandled state");
                                    break;
                        }
                  }
            }

            /* do we need to do some actions & rebuild the task list? */
            if (g_queue_is_empty (todo_queue))
                  l = l->next;
            else {
                  queue_do_todo ();
                  l = task_list;
                  n = 0;
            }
      }

      debug_exit ("queue_run_what_we_can done");
}

/**
 * Process the tasks that are waiting in the todo list
 */
static void
queue_do_todo (void)
{
      gboolean do_wakeup = FALSE;
      gboolean do_shutdown = FALSE;
      GQueue * queue;
      debug_enter ("queue_do_todo");

      g_mutex_lock (todo_mutex);
      queue = todo_queue;
      todo_queue = g_queue_new ();
      g_mutex_unlock (todo_mutex);

      while (!g_queue_is_empty(queue))
      {
            QueueTodo* a = (QueueTodo*) g_queue_pop_head (queue);

            switch (a->action)
            {
                  case TODO_INSERT:
                        real_queue_insert_tasks (a->tasks, a->index_1, &do_wakeup);
                        break;
                  case TODO_REMOVE:
                        real_queue_remove_tasks (a->tasks, &do_wakeup);
                        break;
                  case TODO_MOVE:
                        real_queue_move_tasks (a->tasks, a->index_1);
                        break;
                  case TODO_REQUEUE:
                        real_requeue_failed_tasks (a->tasks, &do_wakeup);
                        break;
                  case TODO_STOP:
                        real_queue_stop_tasks (a->tasks);
                        break;
                  case TODO_SHUTDOWN:
                        do_shutdown = TRUE;
                        break;
                  case TODO_ONLINE:
                        real_queue_set_online (a->index_1, &do_wakeup);
                        break;
                  default:
                        pan_warn_if_reached();
                        break;
            }

            g_slist_free (a->tasks);
            g_free (a);
      }

      if (do_wakeup)
            queue_wakeup ();
      if (do_shutdown)
            real_queue_shutdown ();

      /* throw the todo list away */
      g_queue_free (queue);
      debug_exit ("queue_do_todo");
}

static void
sockets_upkeep_ghfunc (gpointer key, gpointer value, gpointer user_data)
{
      socket_pool_refresh ((SocketPool*)value);
}

static void*
queue_mainloop (void* unused)
{
      const int TIMEOUT_SECS = 60;

      for (;;)
      {
            gboolean was_timeout;
            GTimeVal sleep_until;

            g_mutex_lock (cond_mutex);

            /* sleep for TIMEOUT_SECS unless someone wakes us up */
            g_get_current_time (&sleep_until);
            g_time_val_add (&sleep_until, TIMEOUT_SECS*G_USEC_PER_SEC);
            was_timeout = FALSE;
            while (!work_to_do && !was_timeout)
                  was_timeout = !g_cond_timed_wait (qcond, cond_mutex, &sleep_until);
            work_to_do = FALSE;
                g_mutex_unlock (cond_mutex);

            /* do work */
            if (was_timeout)
                  g_hash_table_foreach (server_to_pool, sockets_upkeep_ghfunc, NULL);
            else {
                  queue_do_todo ();
                  queue_run_what_we_can ();
                  if (_dirty) {
                        _dirty = FALSE;
                        queue_save_tasks ();
                  }
            }
      }
}


PanCallback*
queue_get_online_status_changed_callback (void)
{
      static PanCallback * cb = NULL;
      if (cb==NULL) cb = pan_callback_new ();
      return cb;
}

static void
fire_online_status_changed (gboolean online)
{
      pan_callback_call (queue_get_online_status_changed_callback(),
                         GINT_TO_POINTER(online), NULL);
}

PanCallback*
queue_get_message_id_status_changed (void)
{
      static PanCallback * cb = NULL;
      if (cb==NULL) cb = pan_callback_new ();
      return cb;
}

static void
fire_message_id_status_changed (const PString ** message_ids, int message_id_qty)
{
      pan_callback_call (queue_get_message_id_status_changed(),
                         message_ids,
                         GINT_TO_POINTER(message_id_qty));
}

PanCallback*
queue_get_connection_size_changed_callback (void)
{
      static PanCallback * cb = NULL;
      if (cb==NULL) cb = pan_callback_new ();
      return cb;
}

static void
fire_connection_size_changed (int increment)
{
      pan_callback_call (queue_get_connection_size_changed_callback(),
                         GINT_TO_POINTER(increment),
                         NULL);
}

PanCallback*
queue_get_size_changed_callback (void)
{
      static PanCallback * cb = NULL;
      if (cb==NULL) cb = pan_callback_new ();
      return cb;
}

static void
fire_queue_size_changed (int running_qty, int total_qty)
{
      pan_callback_call (queue_get_size_changed_callback(),
                         GINT_TO_POINTER(running_qty),
                         GINT_TO_POINTER(total_qty));
}

Generated by  Doxygen 1.6.0   Back to index