Thu Apr 28 2011 17:16:22

Asterisk developer's documentation


taskprocessor.c File Reference

Maintain a container of uniquely-named taskprocessor threads that can be shared across modules. More...

#include "asterisk.h"
#include <signal.h>
#include <sys/time.h>
#include "asterisk/_private.h"
#include "asterisk/module.h"
#include "asterisk/time.h"
#include "asterisk/astobj2.h"
#include "asterisk/cli.h"
#include "asterisk/taskprocessor.h"
Include dependency graph for taskprocessor.c:

Go to the source code of this file.

Data Structures

struct  ast_taskprocessor
 An ast_taskprocessor structure is a singleton by name. More...
struct  ast_taskprocessor::tps_queue
 Taskprocessor queue. More...
struct  tps_task
 tps_task structure is queued to a taskprocessor More...
struct  tps_taskprocessor_stats
 tps_taskprocessor_stats maintains statistics for a taskprocessor. More...

Defines

#define TPS_MAX_BUCKETS   7

Functions

struct ast_taskprocessor * ast_taskprocessor_get (char *name, enum ast_tps_options create)
 Get a reference to a taskprocessor with the specified name and create the taskprocessor if necessary.
const char * ast_taskprocessor_name (struct ast_taskprocessor *tps)
 Return the name of the taskprocessor singleton.
int ast_taskprocessor_push (struct ast_taskprocessor *tps, int(*task_exe)(void *datap), void *datap)
 Push a task into the specified taskprocessor queue and signal the taskprocessor thread.
void * ast_taskprocessor_unreference (struct ast_taskprocessor *tps)
 Unreference the specified taskprocessor, decrementing its reference count.
int ast_tps_init (void)
static char * cli_tps_ping (struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
static char * cli_tps_report (struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
static int tps_cmp_cb (void *obj, void *arg, int flags)
 The astobj2 compare callback for taskprocessors.
static int tps_hash_cb (const void *obj, const int flags)
 The astobj2 hash callback for taskprocessors.
static int tps_ping_handler (void *datap)
 CLI taskprocessor ping <blah> handler function.
static void * tps_processing_function (void *data)
 The task processing function executed by a taskprocessor.
static struct tps_task * tps_task_alloc (int(*task_exe)(void *datap), void *datap)
static void * tps_task_free (struct tps_task *task)
static int tps_taskprocessor_depth (struct ast_taskprocessor *tps)
 Return the size of the taskprocessor queue.
static void tps_taskprocessor_destroy (void *tps)
 Destroy the taskprocessor when its refcount reaches zero.
static struct tps_task * tps_taskprocessor_pop (struct ast_taskprocessor *tps)
 Remove the front task off the taskprocessor queue.
static char * tps_taskprocessor_tab_complete (struct ast_taskprocessor *p, struct ast_cli_args *a)

Variables

static ast_cond_t cli_ping_cond
 CLI taskprocessor ping <blah> operation requires a ping condition.
static ast_mutex_t cli_ping_cond_lock = AST_MUTEX_INIT_VALUE
 CLI taskprocessor ping <blah> operation requires a ping condition lock.
static struct ast_cli_entry taskprocessor_clis []
static struct ao2_container * tps_singletons
 tps_singletons is the astobj2 container for taskprocessor singletons

Detailed Description

Maintain a container of uniquely-named taskprocessor threads that can be shared across modules.

Author:
Dwayne Hubbard <dhubbard@digium.com>

Definition in file taskprocessor.c.


Define Documentation

#define TPS_MAX_BUCKETS   7

Definition at line 83 of file taskprocessor.c.

Referenced by ast_tps_init().


Function Documentation

struct ast_taskprocessor* ast_taskprocessor_get ( char *  name,
enum ast_tps_options  create 
) [read]

Get a reference to a taskprocessor with the specified name and create the taskprocessor if necessary.

The default behavior of instantiating a taskprocessor if one does not already exist can be disabled by specifying the TPS_REF_IF_EXISTS ast_tps_options as the second argument to ast_taskprocessor_get().

Parameters:
name: The name of the taskprocessor
create: Use 0 by default or specify TPS_REF_IF_EXISTS to return NULL if the taskprocessor does not already exist
Returns:
A pointer to a reference counted taskprocessor under normal conditions, or NULL if the TPS_REF_IF_EXISTS reference type is specified and the taskprocessor does not exist
Since:
1.6.1

Definition at line 407 of file taskprocessor.c.

References ao2_alloc, ao2_find, ao2_link, ao2_lock(), ao2_ref, ao2_unlock(), ast_calloc, ast_cond_init(), ast_log(), ast_mutex_init(), ast_pthread_create, AST_PTHREADT_NULL, ast_strdup, ast_strlen_zero(), LOG_ERROR, LOG_WARNING, name, ast_taskprocessor::name, OBJ_POINTER, ast_taskprocessor::poll_cond, ast_taskprocessor::poll_thread, ast_taskprocessor::poll_thread_run, ast_taskprocessor::stats, ast_taskprocessor::taskprocessor_lock, tps_processing_function(), TPS_REF_IF_EXISTS, tps_singletons, and tps_taskprocessor_destroy().

Referenced by ast_event_init(), cli_tps_ping(), load_module(), and load_pbx().

{
   /*
    * Find the taskprocessor registered under 'name' in tps_singletons and
    * return it.  When the lookup misses and TPS_REF_IF_EXISTS is not set in
    * 'create', a new taskprocessor is allocated, its processing thread is
    * started, and it is linked into the container.
    *
    * Returns NULL for an empty name, for a miss with TPS_REF_IF_EXISTS set,
    * or when any construction step fails.  The container lock is taken
    * before the lookup and released just before every return that follows.
    */
   struct ast_taskprocessor *p;
   struct ast_taskprocessor lookup_key = {
      .name = name,
   };

   if (ast_strlen_zero(name)) {
      ast_log(LOG_ERROR, "requesting a nameless taskprocessor!!!\n");
      return NULL;
   }

   ao2_lock(tps_singletons);

   p = ao2_find(tps_singletons, &lookup_key, OBJ_POINTER);
   if (p != NULL) {
      /* already instantiated: hand back what the lookup produced */
      ao2_unlock(tps_singletons);
      return p;
   }

   if (create & TPS_REF_IF_EXISTS) {
      /* the caller only wants a taskprocessor that already exists */
      ao2_unlock(tps_singletons);
      return NULL;
   }

   /* nothing is registered under this name yet, so build one */
   p = ao2_alloc(sizeof(*p), tps_taskprocessor_destroy);
   if (p == NULL) {
      ao2_unlock(tps_singletons);
      ast_log(LOG_WARNING, "failed to create taskprocessor '%s'\n", name);
      return NULL;
   }

   ast_cond_init(&p->poll_cond, NULL);
   ast_mutex_init(&p->taskprocessor_lock);

   p->stats = ast_calloc(1, sizeof(*p->stats));
   if (p->stats == NULL) {
      ao2_unlock(tps_singletons);
      ast_log(LOG_WARNING, "failed to create taskprocessor stats for '%s'\n", name);
      ao2_ref(p, -1);
      return NULL;
   }

   p->name = ast_strdup(name);
   if (p->name == NULL) {
      ao2_unlock(tps_singletons);
      ao2_ref(p, -1);
      return NULL;
   }

   /* the run flag must be set before the thread starts polling it */
   p->poll_thread_run = 1;
   p->poll_thread = AST_PTHREADT_NULL;
   if (ast_pthread_create(&p->poll_thread, NULL, tps_processing_function, p) < 0) {
      ao2_unlock(tps_singletons);
      ast_log(LOG_ERROR, "Taskprocessor '%s' failed to create the processing thread.\n", p->name);
      ao2_ref(p, -1);
      return NULL;
   }

   /* NOTE(review): the thread is already running when the link is attempted;
    * presumably tps_taskprocessor_destroy() stops it if the link fails -- confirm */
   if (!ao2_link(tps_singletons, p)) {
      ao2_unlock(tps_singletons);
      ast_log(LOG_ERROR, "Failed to add taskprocessor '%s' to container\n", p->name);
      ao2_ref(p, -1);
      return NULL;
   }

   ao2_unlock(tps_singletons);
   return p;
}
const char* ast_taskprocessor_name ( struct ast_taskprocessor * tps)

Return the name of the taskprocessor singleton.

Since:
1.6.1

Definition at line 395 of file taskprocessor.c.

References ast_log(), LOG_ERROR, and ast_taskprocessor::name.

{
   /* Return the name stored in 'tps'; log an error and return NULL when no
    * taskprocessor was supplied. */
   if (tps != NULL) {
      return tps->name;
   }

   ast_log(LOG_ERROR, "no taskprocessor specified!\n");
   return NULL;
}
int ast_taskprocessor_push ( struct ast_taskprocessor * tps,
int(*)(void *datap)  task_exe,
void *  datap 
)

Push a task into the specified taskprocessor queue and signal the taskprocessor thread.

Parameters:
tps: The taskprocessor structure
task_exe: The task handling function to push into the taskprocessor queue
datap: The data to be used by the task handling function
Returns:
zero on success, -1 on failure
Since:
1.6.1

Definition at line 482 of file taskprocessor.c.

References ast_cond_signal(), AST_LIST_INSERT_TAIL, ast_log(), ast_mutex_lock(), ast_mutex_unlock(), LOG_ERROR, ast_taskprocessor::name, ast_taskprocessor::poll_cond, ast_taskprocessor::taskprocessor_lock, ast_taskprocessor::tps_queue, ast_taskprocessor::tps_queue_size, and tps_task_alloc().

Referenced by ast_event_queue(), cli_tps_ping(), device_state_cb(), mwi_sub_event_cb(), and mwi_unsub_event_cb().

{
   /*
    * Wrap 'task_exe' and 'datap' in a freshly allocated task, append it to
    * the tail of the taskprocessor queue, and signal the poll condition.
    * Returns 0 on success, or -1 when an argument is missing or the task
    * cannot be allocated.
    */
   struct tps_task *task;
   const char *missing = NULL;

   /* a missing taskprocessor is reported in preference to a missing callback */
   if (!tps) {
      missing = "taskprocessor";
   } else if (!task_exe) {
      missing = "task callback";
   }
   if (missing != NULL) {
      ast_log(LOG_ERROR, "%s is missing!!\n", missing);
      return -1;
   }

   task = tps_task_alloc(task_exe, datap);
   if (task == NULL) {
      ast_log(LOG_ERROR, "failed to allocate task!  Can't push to '%s'\n", tps->name);
      return -1;
   }

   /* queue, size counter and signal are all updated under the taskprocessor lock */
   ast_mutex_lock(&tps->taskprocessor_lock);
   AST_LIST_INSERT_TAIL(&tps->tps_queue, task, list);
   tps->tps_queue_size++;
   ast_cond_signal(&tps->poll_cond);
   ast_mutex_unlock(&tps->taskprocessor_lock);

   return 0;
}
void* ast_taskprocessor_unreference ( struct ast_taskprocessor * tps)

Unreference the specified taskprocessor, decrementing its reference count.

Taskprocessors use astobj2 and will unlink from the taskprocessor singleton container and destroy themselves when the taskprocessor reference count reaches zero.

Parameters:
tps: taskprocessor to unreference
Returns:
NULL
Since:
1.6.1

Definition at line 468 of file taskprocessor.c.

References ao2_link, ao2_lock(), ao2_ref, ao2_unlink, ao2_unlock(), and tps_singletons.

Referenced by unload_module().

{
   /*
    * Drop one reference on 'tps'.  Under the container lock the object is
    * unlinked from tps_singletons, one reference is released, and the object
    * is linked back in when ao2_ref() reports a value greater than 1.
    * A NULL 'tps' is ignored.  Always returns NULL.
    */
   int reported;

   if (!tps) {
      return NULL;
   }

   ao2_lock(tps_singletons);
   ao2_unlink(tps_singletons, tps);
   reported = ao2_ref(tps, -1);
   if (reported > 1) {
      /* presumably other holders remain, so keep the singleton findable --
       * confirm against the ao2_ref() return-value contract */
      ao2_link(tps_singletons, tps);
   }
   ao2_unlock(tps_singletons);

   return NULL;
}
static char * cli_tps_ping ( struct ast_cli_entry * e,
int  cmd,
struct ast_cli_args * a 
) [static]

Definition at line 189 of file taskprocessor.c.

References ao2_ref, ast_cli_args::argc, ast_cli_args::argv, ast_cli(), ast_cond_timedwait(), ast_mutex_lock(), ast_mutex_unlock(), ast_samp2tv(), ast_taskprocessor_get(), ast_taskprocessor_push(), ast_tvadd(), ast_tvnow(), ast_tvsub(), CLI_FAILURE, CLI_GENERATE, CLI_INIT, cli_ping_cond, cli_ping_cond_lock, CLI_SHOWUSAGE, CLI_SUCCESS, ast_cli_entry::command, ast_cli_args::fd, name, tps_ping_handler(), TPS_REF_IF_EXISTS, tps_taskprocessor_tab_complete(), and ast_cli_entry::usage.

{
   /*
    * CLI handler for "core ping taskprocessor <taskprocessor>".
    *
    * CLI_INIT fills in the command and usage strings; CLI_GENERATE delegates
    * to the tab-completion helper.  Otherwise the named taskprocessor is
    * looked up (never created), a ping task is pushed to it, and the time
    * between starting the ping and returning from the condition wait is
    * printed.
    *
    * Returns CLI_SHOWUSAGE on a wrong argument count, CLI_FAILURE when the
    * ping task cannot be queued, and CLI_SUCCESS otherwise (including when
    * the taskprocessor does not exist).
    */
   struct timeval begin, end, delta;
   char *name;
   struct timeval when;
   struct timespec ts;
   struct ast_taskprocessor *tps = NULL;

   switch (cmd) {
   case CLI_INIT:
      e->command = "core ping taskprocessor";
      e->usage =
         "Usage: core ping taskprocessor <taskprocessor>\n"
         "  Displays the time required for a task to be processed\n";
      return NULL;
   case CLI_GENERATE:
      return tps_taskprocessor_tab_complete(tps, a);
   }

   if (a->argc != 4)
      return CLI_SHOWUSAGE;

   name = a->argv[3];
   if (!(tps = ast_taskprocessor_get(name, TPS_REF_IF_EXISTS))) {
      ast_cli(a->fd, "\nping failed: %s not found\n\n", name);
      return CLI_SUCCESS;
   }
   ast_cli(a->fd, "\npinging %s ...", name);

   /* absolute deadline for the wait: start time plus ast_samp2tv(1000, 1000)
    * (presumably one second -- confirm against ast_samp2tv) */
   when = ast_tvadd((begin = ast_tvnow()), ast_samp2tv(1000, 1000));
   ts.tv_sec = when.tv_sec;
   ts.tv_nsec = when.tv_usec * 1000;

   /* the lock is taken before pushing so the handler cannot signal the
    * condition before this thread is waiting on it */
   ast_mutex_lock(&cli_ping_cond_lock);
   if (ast_taskprocessor_push(tps, tps_ping_handler, 0) < 0) {
      /* release the ping lock on this early exit; leaving it held would
       * block every later ping on ast_mutex_lock() above */
      ast_mutex_unlock(&cli_ping_cond_lock);
      ast_cli(a->fd, "\nping failed: could not push task to %s\n\n", name);
      ao2_ref(tps, -1);
      return CLI_FAILURE;
   }
   /* the wait result is not inspected, so the elapsed time is reported
    * whether the handler signalled or the deadline passed */
   ast_cond_timedwait(&cli_ping_cond, &cli_ping_cond_lock, &ts);
   ast_mutex_unlock(&cli_ping_cond_lock);

   end = ast_tvnow();
   delta = ast_tvsub(end, begin);
   ast_cli(a->fd, "\n\t%24s ping time: %.1ld.%.6ld sec\n\n", name, (long)delta.tv_sec, (long int)delta.tv_usec);
   ao2_ref(tps, -1);
   return CLI_SUCCESS;
}
static char * cli_tps_report ( struct ast_cli_entry * e,
int  cmd,
struct ast_cli_args * a 
) [static]

Definition at line 235 of file taskprocessor.c.

References tps_taskprocessor_stats::_tasks_processed_count, ao2_container_count(), ao2_iterator_init(), ao2_iterator_next, ao2_ref, ast_cli_args::argc, ast_cli_entry::args, ast_cli(), ast_copy_string(), CLI_GENERATE, CLI_INIT, CLI_SHOWUSAGE, CLI_SUCCESS, ast_cli_entry::command, ast_cli_args::fd, tps_taskprocessor_stats::max_qsize, ast_taskprocessor::name, name, ast_taskprocessor::stats, ast_taskprocessor::tps_queue_size, tps_singletons, and ast_cli_entry::usage.

{
   /*
    * CLI handler for "core show taskprocessors".
    *
    * CLI_INIT fills in the command and usage strings; CLI_GENERATE offers no
    * completions.  Otherwise every taskprocessor in tps_singletons is listed
    * with its processed-task count, current queue size and maximum recorded
    * queue depth, followed by the container's element count.
    *
    * Returns CLI_SHOWUSAGE when the argument count differs from e->args,
    * CLI_SUCCESS otherwise.
    */
   char name[256];
   int tcount;
   unsigned long qsize;
   unsigned long maxqsize;
   unsigned long processed;
   struct ast_taskprocessor *p;
   struct ao2_iterator i;

   switch (cmd) {
   case CLI_INIT:
      e->command = "core show taskprocessors";
      e->usage =
         "Usage: core show taskprocessors\n"
         "  Shows a list of instantiated task processors and their statistics\n";
      return NULL;
   case CLI_GENERATE:
      return NULL;
   }

   if (a->argc != e->args)
      return CLI_SHOWUSAGE;

   ast_cli(a->fd, "\n\t+----- Processor -----+--- Processed ---+- In Queue -+- Max Depth -+");
   i = ao2_iterator_init(tps_singletons, 0);
   while ((p = ao2_iterator_next(&i))) {
      /* copy the values out first, then release the reference obtained from the iterator */
      ast_copy_string(name, p->name, sizeof(name));
      qsize = p->tps_queue_size;
      maxqsize = p->stats->max_qsize;
      processed = p->stats->_tasks_processed_count;
      /* the three counters are unsigned long, so they need the unsigned %lu conversion */
      ast_cli(a->fd, "\n%24s   %17lu %12lu %12lu", name, processed, qsize, maxqsize);
      ao2_ref(p, -1);
   }
   /* NOTE(review): the iterator is never explicitly destroyed; if this astobj2
    * version provides ao2_iterator_destroy(), it presumably belongs here -- confirm */
   tcount = ao2_container_count(tps_singletons);
   ast_cli(a->fd, "\n\t+---------------------+-----------------+------------+-------------+\n\t%d taskprocessors\n\n", tcount);
   return CLI_SUCCESS;
}
static int tps_cmp_cb ( void *  obj,
void *  arg,
int  flags 
) [static]

The astobj2 compare callback for taskprocessors.

Definition at line 338 of file taskprocessor.c.

References CMP_MATCH, CMP_STOP, and ast_taskprocessor::name.

Referenced by ast_tps_init().

{
   /* Two taskprocessors match when their names are equal, ignoring case. */
   struct ast_taskprocessor *left = obj;
   struct ast_taskprocessor *right = arg;

   if (strcasecmp(left->name, right->name) == 0) {
      return CMP_MATCH | CMP_STOP;
   }
   return 0;
}
static int tps_hash_cb ( const void *  obj,
const int  flags 
) [static]

The astobj2 hash callback for taskprocessors.

Definition at line 330 of file taskprocessor.c.

References ast_str_case_hash(), and ast_taskprocessor::name.

Referenced by ast_tps_init().

{
   /* Hash on the taskprocessor name via ast_str_case_hash(); 'flags' is unused. */
   const struct ast_taskprocessor *entry = obj;
   const char *key = entry->name;

   return ast_str_case_hash(key);
}
static int tps_ping_handler ( void *  datap) [static]

CLI taskprocessor ping <blah> handler function.

Definition at line 180 of file taskprocessor.c.

References ast_cond_signal(), ast_mutex_lock(), ast_mutex_unlock(), cli_ping_cond, and cli_ping_cond_lock.

Referenced by cli_tps_ping().

static void * tps_processing_function ( void *  data) [static]

The task processing function executed by a taskprocessor.

Definition at line 275 of file taskprocessor.c.

References tps_taskprocessor_stats::_tasks_processed_count, ast_cond_wait(), ast_log(), ast_mutex_lock(), ast_mutex_unlock(), tps_task::datap, tps_task::execute, LOG_ERROR, LOG_WARNING, tps_taskprocessor_stats::max_qsize, ast_taskprocessor::poll_cond, ast_taskprocessor::poll_thread_run, ast_taskprocessor::stats, ast_taskprocessor::taskprocessor_lock, tps_task_free(), tps_taskprocessor_depth(), and tps_taskprocessor_pop().

Referenced by ast_taskprocessor_get().

{
   /*
    * Thread body for one taskprocessor ('data' is the ast_taskprocessor).
    * While poll_thread_run is set it waits for queued tasks, pops and runs
    * them one at a time, and updates the statistics.  Once the flag is
    * cleared it frees whatever is still queued and returns NULL.
    */
   struct ast_taskprocessor *i = data;
   struct tps_task *t;
   int size;

   if (!i) {
      ast_log(LOG_ERROR, "cannot start thread_function loop without a ast_taskprocessor structure.\n");
      return NULL;
   }

   while (i->poll_thread_run) {
      ast_mutex_lock(&i->taskprocessor_lock);
      /* re-check the run flag now that the lock is held */
      if (!i->poll_thread_run) {
         ast_mutex_unlock(&i->taskprocessor_lock);
         break;
      }
      /* 'size' is sampled here and not refreshed after the wait below, so it
       * stays 0 for a task that arrived while this thread was waiting */
      if (!(size = tps_taskprocessor_depth(i))) {
         ast_cond_wait(&i->poll_cond, &i->taskprocessor_lock);
         if (!i->poll_thread_run) {
            ast_mutex_unlock(&i->taskprocessor_lock);
            break;
         }
      }
      /* the lock is dropped before popping; tps_taskprocessor_pop() takes it itself */
      ast_mutex_unlock(&i->taskprocessor_lock);
      /* stuff is in the queue */
      /* NOTE(review): the wait above is not re-checked in a loop, so a wakeup
       * with an empty queue would land in this error branch -- confirm intent */
      if (!(t = tps_taskprocessor_pop(i))) {
         ast_log(LOG_ERROR, "Wtf?? %d tasks in the queue, but we're popping blanks!\n", size);
         continue;
      }
      if (!t->execute) {
         ast_log(LOG_WARNING, "Task is missing a function to execute!\n");
         tps_task_free(t);
         continue;
      }
      /* the task callback runs without the taskprocessor lock held */
      t->execute(t->datap);
 
      /* statistics are updated under the lock; max_qsize uses the depth sampled before the pop */
      ast_mutex_lock(&i->taskprocessor_lock);
      if (i->stats) {
         i->stats->_tasks_processed_count++;
         if (size > i->stats->max_qsize) {
            i->stats->max_qsize = size;
         }
      }
      ast_mutex_unlock(&i->taskprocessor_lock);
 
      tps_task_free(t);
   }
   /* shutting down: discard any tasks still queued without executing them */
   while ((t = tps_taskprocessor_pop(i))) {
      tps_task_free(t);
   }
   return NULL;
}
static struct tps_task* tps_task_alloc ( int(*)(void *datap)  task_exe,
void *  datap 
) [static, read]

Definition at line 136 of file taskprocessor.c.

References ast_calloc, tps_task::datap, and tps_task::execute.

Referenced by ast_taskprocessor_push().

{
   /* Allocate a zeroed task and record the callback and its argument.
    * Returns NULL when the allocation fails. */
   struct tps_task *task = ast_calloc(1, sizeof(*task));

   if (!task) {
      return NULL;
   }

   task->execute = task_exe;
   task->datap = datap;
   return task;
}
static void* tps_task_free ( struct tps_task * task) [static]

Definition at line 147 of file taskprocessor.c.

References ast_free.

Referenced by tps_processing_function().

{
   /* Release 'task' when one was supplied; always returns NULL. */
   if (!task) {
      return NULL;
   }

   ast_free(task);
   return NULL;
}
static int tps_taskprocessor_depth ( struct ast_taskprocessor * tps) [static]

Return the size of the taskprocessor queue.

Definition at line 389 of file taskprocessor.c.

References ast_taskprocessor::tps_queue_size.

Referenced by tps_processing_function().

{
   /* Report the recorded queue size, or -1 when no taskprocessor was given.
    * No lock is taken here. */
   if (!tps) {
      return -1;
   }
   return tps->tps_queue_size;
}
static void tps_taskprocessor_destroy ( void *  tps) [static]
static struct tps_task * tps_taskprocessor_pop ( struct ast_taskprocessor * tps) [static, read]

Remove the front task off the taskprocessor queue.

Definition at line 373 of file taskprocessor.c.

References AST_LIST_REMOVE_HEAD, ast_log(), ast_mutex_lock(), ast_mutex_unlock(), LOG_ERROR, ast_taskprocessor::taskprocessor_lock, ast_taskprocessor::tps_queue, and ast_taskprocessor::tps_queue_size.

Referenced by tps_processing_function().

{
   /*
    * Detach and return the task at the head of the queue, decrementing the
    * queue size when a task was removed.  Returns NULL when the queue is
    * empty or when no taskprocessor was supplied (the latter is logged).
    */
   struct tps_task *head;

   if (tps == NULL) {
      ast_log(LOG_ERROR, "missing taskprocessor\n");
      return NULL;
   }

   /* list removal and counter update happen together under the lock */
   ast_mutex_lock(&tps->taskprocessor_lock);
   head = AST_LIST_REMOVE_HEAD(&tps->tps_queue, list);
   if (head != NULL) {
      tps->tps_queue_size--;
   }
   ast_mutex_unlock(&tps->taskprocessor_lock);

   return head;
}
static char* tps_taskprocessor_tab_complete ( struct ast_taskprocessor * p,
struct ast_cli_args * a 
) [static]

Definition at line 156 of file taskprocessor.c.

References ao2_iterator_init(), ao2_iterator_next, ao2_ref, ast_strdup, ast_cli_args::n, ast_taskprocessor::name, name, ast_cli_args::pos, tps_singletons, and ast_cli_args::word.

Referenced by cli_tps_ping().

{
   /*
    * Tab completion for the taskprocessor name argument.  Only word position
    * 3 is completed.  Walks tps_singletons and returns a duplicated copy of
    * the name of the first entry whose case-insensitive prefix match count
    * exceeds a->n, or NULL when there is none.  The incoming 'p' value is
    * not read; the parameter is reused as the iteration cursor.
    */
   int prefix_len;
   int matches = 0;
   char *result = NULL;
   struct ao2_iterator iter;

   if (a->pos != 3)
      return NULL;

   prefix_len = strlen(a->word);
   iter = ao2_iterator_init(tps_singletons, 0);
   while ((p = ao2_iterator_next(&iter))) {
      /* the match counter only advances for entries that share the typed prefix */
      int selected = !strncasecmp(a->word, p->name, prefix_len) && ++matches > a->n;

      if (selected) {
         result = ast_strdup(p->name);
      }
      /* release the reference obtained from the iterator, match or not */
      ao2_ref(p, -1);
      if (selected) {
         break;
      }
   }
   return result;
}

Variable Documentation

ast_cond_t cli_ping_cond [static]

CLI taskprocessor ping <blah> operation requires a ping condition.

Definition at line 88 of file taskprocessor.c.

Referenced by ast_tps_init(), cli_tps_ping(), and tps_ping_handler().

ast_mutex_t cli_ping_cond_lock = AST_MUTEX_INIT_VALUE [static]

CLI taskprocessor ping <blah> operation requires a ping condition lock.

Definition at line 91 of file taskprocessor.c.

Referenced by cli_tps_ping(), and tps_ping_handler().

struct ast_cli_entry taskprocessor_clis[] [static]
Initial value:
 {
   /* handler whose CLI_INIT registers "core ping taskprocessor" */
   AST_CLI_DEFINE(cli_tps_ping, "Ping a named task processor"),
   /* handler whose CLI_INIT registers "core show taskprocessors" */
   AST_CLI_DEFINE(cli_tps_report, "List instantiated task processors and statistics"),
}

Definition at line 116 of file taskprocessor.c.

Referenced by ast_tps_init().

struct ao2_container* tps_singletons [static]

tps_singletons is the astobj2 container for taskprocessor singletons

Definition at line 85 of file taskprocessor.c.

Referenced by ast_taskprocessor_get(), ast_taskprocessor_unreference(), ast_tps_init(), cli_tps_report(), and tps_taskprocessor_tab_complete().