Thu Apr 28 2011 17:15:25

Asterisk developer's documentation


res_timing_pthread.c

Go to the documentation of this file.
00001 /*
00002  * Asterisk -- An open source telephony toolkit.
00003  *
00004  * Copyright (C) 2008, Digium, Inc.
00005  *
00006  * Russell Bryant <russell@digium.com>
00007  *
00008  * See http://www.asterisk.org for more information about
00009  * the Asterisk project. Please do not directly contact
00010  * any of the maintainers of this project for assistance;
00011  * the project provides a web site, mailing lists and IRC
00012  * channels for your use.
00013  *
00014  * This program is free software, distributed under the terms of
00015  * the GNU General Public License Version 2. See the LICENSE file
00016  * at the top of the source tree.
00017  */
00018 
00019 /*!
00020  * \file
00021  * \author Russell Bryant <russell@digium.com>
00022  *
00023  * \brief pthread timing interface
00024  */
00025 
00026 #include "asterisk.h"
00027 
00028 ASTERISK_FILE_VERSION(__FILE__, "$Revision: 278479 $");
00029 
00030 #include <math.h>
00031 #include <sys/select.h>
00032 
00033 #include "asterisk/module.h"
00034 #include "asterisk/timing.h"
00035 #include "asterisk/utils.h"
00036 #include "asterisk/astobj2.h"
00037 #include "asterisk/time.h"
00038 #include "asterisk/lock.h"
00039 #include "asterisk/poll-compat.h"
00040 
00041 static void *timing_funcs_handle;
00042 
00043 static int pthread_timer_open(void);
00044 static void pthread_timer_close(int handle);
00045 static int pthread_timer_set_rate(int handle, unsigned int rate);
00046 static void pthread_timer_ack(int handle, unsigned int quantity);
00047 static int pthread_timer_enable_continuous(int handle);
00048 static int pthread_timer_disable_continuous(int handle);
00049 static enum ast_timer_event pthread_timer_get_event(int handle);
00050 static unsigned int pthread_timer_get_max_rate(int handle);
00051 
00052 static struct ast_timing_interface pthread_timing = {
00053    .name = "pthread",
00054    .priority = 0, /* use this as a last resort */
00055    .timer_open = pthread_timer_open,
00056    .timer_close = pthread_timer_close,
00057    .timer_set_rate = pthread_timer_set_rate,
00058    .timer_ack = pthread_timer_ack,
00059    .timer_enable_continuous = pthread_timer_enable_continuous,
00060    .timer_disable_continuous = pthread_timer_disable_continuous,
00061    .timer_get_event = pthread_timer_get_event,
00062    .timer_get_max_rate = pthread_timer_get_max_rate,
00063 };
00064 
00065 /* 1 tick / 10 ms */
00066 #define MAX_RATE 100
00067 
00068 static struct ao2_container *pthread_timers;
00069 #define PTHREAD_TIMER_BUCKETS 563
00070 
00071 enum {
00072    PIPE_READ =  0,
00073    PIPE_WRITE = 1
00074 };
00075 
00076 enum pthread_timer_state {
00077    TIMER_STATE_IDLE,
00078    TIMER_STATE_TICKING,
00079 };
00080 
00081 struct pthread_timer {
00082    int pipe[2];
00083    enum pthread_timer_state state;
00084    unsigned int rate;
00085    /*! Interval in ms for current rate */
00086    unsigned int interval;
00087    unsigned int tick_count;
00088    unsigned int pending_ticks;
00089    struct timeval start;
00090    unsigned int continuous:1;
00091 };
00092 
00093 static void pthread_timer_destructor(void *obj);
00094 static struct pthread_timer *find_timer(int handle, int unlinkobj);
00095 static void write_byte(struct pthread_timer *timer);
00096 static void read_pipe(struct pthread_timer *timer, unsigned int num);
00097 
00098 /*!
00099  * \brief Data for the timing thread
00100  */
00101 static struct {
00102    pthread_t thread;
00103    ast_mutex_t lock;
00104    ast_cond_t cond;
00105    unsigned int stop:1;
00106 } timing_thread;
00107 
00108 static int pthread_timer_open(void)
00109 {
00110    struct pthread_timer *timer;
00111    int fd;
00112 
00113    if (!(timer = ao2_alloc(sizeof(*timer), pthread_timer_destructor))) {
00114       errno = ENOMEM;
00115       return -1;
00116    }
00117 
00118    timer->pipe[PIPE_READ] = timer->pipe[PIPE_WRITE] = -1;
00119    timer->state = TIMER_STATE_IDLE;
00120 
00121    if (pipe(timer->pipe)) {
00122       ao2_ref(timer, -1);
00123       return -1;
00124    }
00125 
00126    ao2_lock(pthread_timers);
00127    if (!ao2_container_count(pthread_timers)) {
00128       ast_mutex_lock(&timing_thread.lock);
00129       ast_cond_signal(&timing_thread.cond);
00130       ast_mutex_unlock(&timing_thread.lock);
00131    }
00132    ao2_link(pthread_timers, timer);
00133    ao2_unlock(pthread_timers);
00134 
00135    fd = timer->pipe[PIPE_READ];
00136 
00137    ao2_ref(timer, -1);
00138 
00139    return fd;
00140 }
00141 
00142 static void pthread_timer_close(int handle)
00143 {
00144    struct pthread_timer *timer;
00145 
00146    if (!(timer = find_timer(handle, 1))) {
00147       return;
00148    }
00149 
00150    ao2_ref(timer, -1);
00151 }
00152 
00153 static int pthread_timer_set_rate(int handle, unsigned int rate)
00154 {
00155    struct pthread_timer *timer;
00156 
00157    if (!(timer = find_timer(handle, 0))) {
00158       errno = EINVAL;
00159       return -1;
00160    }
00161 
00162    if (rate > MAX_RATE) {
00163       ast_log(LOG_ERROR, "res_timing_pthread only supports timers at a "
00164             "max rate of %d / sec\n", MAX_RATE);
00165       errno = EINVAL;
00166       return -1;
00167    }
00168 
00169    ao2_lock(timer);
00170 
00171    if ((timer->rate = rate)) {
00172       timer->interval = roundf(1000.0 / ((float) rate));
00173       timer->start = ast_tvnow();
00174       timer->state = TIMER_STATE_TICKING;
00175    } else {
00176       timer->interval = 0;
00177       timer->start = ast_tv(0, 0);
00178       timer->state = TIMER_STATE_IDLE;
00179    }
00180    timer->tick_count = 0;
00181 
00182    ao2_unlock(timer);
00183 
00184    ao2_ref(timer, -1);
00185 
00186    return 0;
00187 }
00188 
00189 static void pthread_timer_ack(int handle, unsigned int quantity)
00190 {
00191    struct pthread_timer *timer;
00192 
00193    ast_assert(quantity > 0);
00194 
00195    if (!(timer = find_timer(handle, 0))) {
00196       return;
00197    }
00198 
00199    ao2_lock(timer);
00200    read_pipe(timer, quantity);
00201    ao2_unlock(timer);
00202 
00203    ao2_ref(timer, -1);
00204 }
00205 
00206 static int pthread_timer_enable_continuous(int handle)
00207 {
00208    struct pthread_timer *timer;
00209 
00210    if (!(timer = find_timer(handle, 0))) {
00211       errno = EINVAL;
00212       return -1;
00213    }
00214 
00215    ao2_lock(timer);
00216    if (!timer->continuous) {
00217       timer->continuous = 1;
00218       write_byte(timer);
00219    }
00220    ao2_unlock(timer);
00221 
00222    ao2_ref(timer, -1);
00223 
00224    return 0;
00225 }
00226 
00227 static int pthread_timer_disable_continuous(int handle)
00228 {
00229    struct pthread_timer *timer;
00230 
00231    if (!(timer = find_timer(handle, 0))) {
00232       errno = EINVAL;
00233       return -1;
00234    }
00235 
00236    ao2_lock(timer);
00237    if (timer->continuous) {
00238       timer->continuous = 0;
00239       read_pipe(timer, 1);
00240    }
00241    ao2_unlock(timer);
00242 
00243    ao2_ref(timer, -1);
00244 
00245    return 0;
00246 }
00247 
00248 static enum ast_timer_event pthread_timer_get_event(int handle)
00249 {
00250    struct pthread_timer *timer;
00251    enum ast_timer_event res = AST_TIMING_EVENT_EXPIRED;
00252 
00253    if (!(timer = find_timer(handle, 0))) {
00254       return res;
00255    }
00256 
00257    ao2_lock(timer);
00258    if (timer->continuous && timer->pending_ticks == 1) {
00259       res = AST_TIMING_EVENT_CONTINUOUS;
00260    }
00261    ao2_unlock(timer);
00262 
00263    ao2_ref(timer, -1);
00264 
00265    return res;
00266 }
00267 
00268 static unsigned int pthread_timer_get_max_rate(int handle)
00269 {
00270    return MAX_RATE;
00271 }
00272 
00273 static struct pthread_timer *find_timer(int handle, int unlinkobj)
00274 {
00275    struct pthread_timer *timer;
00276    struct pthread_timer tmp_timer;
00277    int flags = OBJ_POINTER;
00278 
00279    tmp_timer.pipe[PIPE_READ] = handle;
00280 
00281    if (unlinkobj) {
00282       flags |= OBJ_UNLINK;
00283    }
00284 
00285    if (!(timer = ao2_find(pthread_timers, &tmp_timer, flags))) {
00286       ast_assert(timer != NULL);
00287       return NULL;
00288    }
00289 
00290    return timer;
00291 }
00292 
00293 static void pthread_timer_destructor(void *obj)
00294 {
00295    struct pthread_timer *timer = obj;
00296 
00297    if (timer->pipe[PIPE_READ] > -1) {
00298       close(timer->pipe[PIPE_READ]);
00299       timer->pipe[PIPE_READ] = -1;
00300    }
00301 
00302    if (timer->pipe[PIPE_WRITE] > -1) {
00303       close(timer->pipe[PIPE_WRITE]);
00304       timer->pipe[PIPE_WRITE] = -1;
00305    }
00306 }
00307 
00308 /*!
00309  * \note only PIPE_READ is guaranteed valid
00310  */
00311 static int pthread_timer_hash(const void *obj, const int flags)
00312 {
00313    const struct pthread_timer *timer = obj;
00314 
00315    return timer->pipe[PIPE_READ];
00316 }
00317 
00318 /*!
00319  * \note only PIPE_READ is guaranteed valid
00320  */
00321 static int pthread_timer_cmp(void *obj, void *arg, int flags)
00322 {
00323    struct pthread_timer *timer1 = obj, *timer2 = arg;
00324 
00325    return (timer1->pipe[PIPE_READ] == timer2->pipe[PIPE_READ]) ? CMP_MATCH | CMP_STOP : 0;
00326 }
00327 
00328 /*!
00329  * \retval 0 no timer tick needed
00330  * \retval non-zero write to the timing pipe needed
00331  */
00332 static int check_timer(struct pthread_timer *timer)
00333 {
00334    struct timeval now;
00335 
00336    if (timer->state == TIMER_STATE_IDLE) {
00337       return 0;
00338    }
00339 
00340    now = ast_tvnow();
00341 
00342    if (timer->tick_count < (ast_tvdiff_ms(now, timer->start) / timer->interval)) {
00343       timer->tick_count++;
00344       if (!timer->tick_count) {
00345          /* Handle overflow. */
00346          timer->start = now;
00347       }
00348       return 1;
00349    }
00350 
00351    return 0;
00352 }
00353 
00354 /*!
00355  * \internal
00356  * \pre timer is locked
00357  */
00358 static void read_pipe(struct pthread_timer *timer, unsigned int quantity)
00359 {
00360    int rd_fd = timer->pipe[PIPE_READ];
00361    int pending_ticks = timer->pending_ticks;
00362 
00363    ast_assert(quantity);
00364 
00365    if (timer->continuous && pending_ticks) {
00366       pending_ticks--;
00367    }
00368 
00369    if (quantity > pending_ticks) {
00370       quantity = pending_ticks;
00371    }
00372 
00373    if (!quantity) {
00374       return;
00375    }
00376 
00377    do {
00378       unsigned char buf[1024];
00379       ssize_t res;
00380       struct pollfd pfd = {
00381          .fd = rd_fd,
00382          .events = POLLIN,
00383       };
00384 
00385       if (ast_poll(&pfd, 1, 0) != 1) {
00386          ast_debug(1, "Reading not available on timing pipe, "
00387                "quantity: %u\n", quantity);
00388          break;
00389       }
00390 
00391       res = read(rd_fd, buf,
00392          (quantity < sizeof(buf)) ? quantity : sizeof(buf));
00393 
00394       if (res == -1) {
00395          if (errno == EAGAIN) {
00396             continue;
00397          }
00398          ast_log(LOG_ERROR, "read failed on timing pipe: %s\n",
00399                strerror(errno));
00400          break;
00401       }
00402 
00403       quantity -= res;
00404       timer->pending_ticks -= res;
00405    } while (quantity);
00406 }
00407 
00408 /*!
00409  * \internal
00410  * \pre timer is locked
00411  */
00412 static void write_byte(struct pthread_timer *timer)
00413 {
00414    ssize_t res;
00415    unsigned char x = 42;
00416 
00417    do {
00418       res = write(timer->pipe[PIPE_WRITE], &x, 1);
00419    } while (res == -1 && errno == EAGAIN);
00420 
00421    if (res == -1) {
00422       ast_log(LOG_ERROR, "Error writing to timing pipe: %s\n",
00423             strerror(errno));
00424    } else {
00425       timer->pending_ticks++;
00426    }
00427 }
00428 
00429 static int run_timer(void *obj, void *arg, int flags)
00430 {
00431    struct pthread_timer *timer = obj;
00432 
00433    if (timer->state == TIMER_STATE_IDLE) {
00434       return 0;
00435    }
00436 
00437    ao2_lock(timer);
00438    if (check_timer(timer)) {
00439       write_byte(timer);
00440    }
00441    ao2_unlock(timer);
00442 
00443    return 0;
00444 }
00445 
00446 static void *do_timing(void *arg)
00447 {
00448    struct timeval next_wakeup = ast_tvnow();
00449 
00450    while (!timing_thread.stop) {
00451       struct timespec ts = { 0, };
00452 
00453       ao2_callback(pthread_timers, OBJ_NODATA, run_timer, NULL);
00454 
00455       next_wakeup = ast_tvadd(next_wakeup, ast_tv(0, 5000));
00456 
00457       ts.tv_sec = next_wakeup.tv_sec;
00458       ts.tv_nsec = next_wakeup.tv_usec * 1000;
00459 
00460       ast_mutex_lock(&timing_thread.lock);
00461       if (!timing_thread.stop) {
00462          if (ao2_container_count(pthread_timers)) {
00463             ast_cond_timedwait(&timing_thread.cond, &timing_thread.lock, &ts);
00464          } else {
00465             ast_cond_wait(&timing_thread.cond, &timing_thread.lock);
00466          }
00467       }
00468       ast_mutex_unlock(&timing_thread.lock);
00469    }
00470 
00471    return NULL;
00472 }
00473 
00474 static int init_timing_thread(void)
00475 {
00476    ast_mutex_init(&timing_thread.lock);
00477    ast_cond_init(&timing_thread.cond, NULL);
00478 
00479    if (ast_pthread_create_background(&timing_thread.thread, NULL, do_timing, NULL)) {
00480       ast_log(LOG_ERROR, "Unable to start timing thread.\n");
00481       return -1;
00482    }
00483 
00484    return 0;
00485 }
00486 
00487 static int load_module(void)
00488 {
00489    if (!(pthread_timers = ao2_container_alloc(PTHREAD_TIMER_BUCKETS,
00490       pthread_timer_hash, pthread_timer_cmp))) {
00491       return AST_MODULE_LOAD_DECLINE;
00492    }
00493 
00494    if (init_timing_thread()) {
00495       ao2_ref(pthread_timers, -1);
00496       pthread_timers = NULL;
00497       return AST_MODULE_LOAD_DECLINE;
00498    }
00499 
00500    return (timing_funcs_handle = ast_register_timing_interface(&pthread_timing)) ?
00501       AST_MODULE_LOAD_SUCCESS : AST_MODULE_LOAD_DECLINE;
00502 }
00503 
00504 static int unload_module(void)
00505 {
00506    int res;
00507 
00508    ast_mutex_lock(&timing_thread.lock);
00509    timing_thread.stop = 1;
00510    ast_cond_signal(&timing_thread.cond);
00511    ast_mutex_unlock(&timing_thread.lock);
00512    pthread_join(timing_thread.thread, NULL);
00513 
00514    if (!(res = ast_unregister_timing_interface(timing_funcs_handle))) {
00515       ao2_ref(pthread_timers, -1);
00516       pthread_timers = NULL;
00517    }
00518 
00519    return res;
00520 }
00521 AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_LOAD_ORDER, "pthread Timing Interface",
00522       .load = load_module,
00523       .unload = unload_module,
00524       .load_pri = 10,
00525       );