00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026 #include "asterisk.h"
00027
00028 ASTERISK_FILE_VERSION(__FILE__, "$Revision: 281574 $")
00029
00030 #ifdef DEBUG_SCHEDULER
00031 #define DEBUG(a) do { \
00032 if (option_debug) \
00033 DEBUG_M(a) \
00034 } while (0)
00035 #else
00036 #define DEBUG(a)
00037 #endif
00038
00039 #include <sys/time.h>
00040
00041 #include "asterisk/sched.h"
00042 #include "asterisk/channel.h"
00043 #include "asterisk/lock.h"
00044 #include "asterisk/utils.h"
00045 #include "asterisk/linkedlists.h"
00046 #include "asterisk/dlinkedlists.h"
00047 #include "asterisk/hashtab.h"
00048 #include "asterisk/heap.h"
00049
00050 struct sched {
00051 AST_LIST_ENTRY(sched) list;
00052 int id;
00053 struct timeval when;
00054 int resched;
00055 int variable;
00056 const void *data;
00057 ast_sched_cb callback;
00058 ssize_t __heap_index;
00059 };
00060
00061 struct sched_context {
00062 ast_mutex_t lock;
00063 unsigned int eventcnt;
00064 unsigned int schedcnt;
00065 unsigned int highwater;
00066 struct ast_hashtab *schedq_ht;
00067 struct ast_heap *sched_heap;
00068
00069 #ifdef SCHED_MAX_CACHE
00070 AST_LIST_HEAD_NOLOCK(, sched) schedc;
00071 unsigned int schedccnt;
00072 #endif
00073 };
00074
00075 struct ast_sched_thread {
00076 pthread_t thread;
00077 ast_mutex_t lock;
00078 ast_cond_t cond;
00079 struct sched_context *context;
00080 unsigned int stop:1;
00081 };
00082
00083 static void *sched_run(void *data)
00084 {
00085 struct ast_sched_thread *st = data;
00086
00087 while (!st->stop) {
00088 int ms;
00089 struct timespec ts = {
00090 .tv_sec = 0,
00091 };
00092
00093 ast_mutex_lock(&st->lock);
00094
00095 if (st->stop) {
00096 ast_mutex_unlock(&st->lock);
00097 return NULL;
00098 }
00099
00100 ms = ast_sched_wait(st->context);
00101
00102 if (ms == -1) {
00103 ast_cond_wait(&st->cond, &st->lock);
00104 } else {
00105 struct timeval tv;
00106 tv = ast_tvadd(ast_tvnow(), ast_samp2tv(ms, 1000));
00107 ts.tv_sec = tv.tv_sec;
00108 ts.tv_nsec = tv.tv_usec * 1000;
00109 ast_cond_timedwait(&st->cond, &st->lock, &ts);
00110 }
00111
00112 ast_mutex_unlock(&st->lock);
00113
00114 if (st->stop) {
00115 return NULL;
00116 }
00117
00118 ast_sched_runq(st->context);
00119 }
00120
00121 return NULL;
00122 }
00123
00124 void ast_sched_thread_poke(struct ast_sched_thread *st)
00125 {
00126 ast_mutex_lock(&st->lock);
00127 ast_cond_signal(&st->cond);
00128 ast_mutex_unlock(&st->lock);
00129 }
00130
00131 struct sched_context *ast_sched_thread_get_context(struct ast_sched_thread *st)
00132 {
00133 return st->context;
00134 }
00135
00136 struct ast_sched_thread *ast_sched_thread_destroy(struct ast_sched_thread *st)
00137 {
00138 if (st->thread != AST_PTHREADT_NULL) {
00139 ast_mutex_lock(&st->lock);
00140 st->stop = 1;
00141 ast_cond_signal(&st->cond);
00142 ast_mutex_unlock(&st->lock);
00143 pthread_join(st->thread, NULL);
00144 st->thread = AST_PTHREADT_NULL;
00145 }
00146
00147 ast_mutex_destroy(&st->lock);
00148 ast_cond_destroy(&st->cond);
00149
00150 if (st->context) {
00151 sched_context_destroy(st->context);
00152 st->context = NULL;
00153 }
00154
00155 ast_free(st);
00156
00157 return NULL;
00158 }
00159
00160 struct ast_sched_thread *ast_sched_thread_create(void)
00161 {
00162 struct ast_sched_thread *st;
00163
00164 if (!(st = ast_calloc(1, sizeof(*st)))) {
00165 return NULL;
00166 }
00167
00168 ast_mutex_init(&st->lock);
00169 ast_cond_init(&st->cond, NULL);
00170
00171 st->thread = AST_PTHREADT_NULL;
00172
00173 if (!(st->context = sched_context_create())) {
00174 ast_log(LOG_ERROR, "Failed to create scheduler\n");
00175 ast_sched_thread_destroy(st);
00176 return NULL;
00177 }
00178
00179 if (ast_pthread_create_background(&st->thread, NULL, sched_run, st)) {
00180 ast_log(LOG_ERROR, "Failed to create scheduler thread\n");
00181 ast_sched_thread_destroy(st);
00182 return NULL;
00183 }
00184
00185 return st;
00186 }
00187
00188 int ast_sched_thread_add_variable(struct ast_sched_thread *st, int when, ast_sched_cb cb,
00189 const void *data, int variable)
00190 {
00191 int res;
00192
00193 ast_mutex_lock(&st->lock);
00194 res = ast_sched_add_variable(st->context, when, cb, data, variable);
00195 if (res != -1) {
00196 ast_cond_signal(&st->cond);
00197 }
00198 ast_mutex_unlock(&st->lock);
00199
00200 return res;
00201 }
00202
00203 int ast_sched_thread_add(struct ast_sched_thread *st, int when, ast_sched_cb cb,
00204 const void *data)
00205 {
00206 int res;
00207
00208 ast_mutex_lock(&st->lock);
00209 res = ast_sched_add(st->context, when, cb, data);
00210 if (res != -1) {
00211 ast_cond_signal(&st->cond);
00212 }
00213 ast_mutex_unlock(&st->lock);
00214
00215 return res;
00216 }
00217
00218
00219
00220 static int sched_cmp(const void *a, const void *b)
00221 {
00222 const struct sched *as = a;
00223 const struct sched *bs = b;
00224 return as->id != bs->id;
00225 }
00226
00227 static unsigned int sched_hash(const void *obj)
00228 {
00229 const struct sched *s = obj;
00230 unsigned int h = s->id;
00231 return h;
00232 }
00233
00234 static int sched_time_cmp(void *a, void *b)
00235 {
00236 return ast_tvcmp(((struct sched *) b)->when, ((struct sched *) a)->when);
00237 }
00238
00239 struct sched_context *sched_context_create(void)
00240 {
00241 struct sched_context *tmp;
00242
00243 if (!(tmp = ast_calloc(1, sizeof(*tmp))))
00244 return NULL;
00245
00246 ast_mutex_init(&tmp->lock);
00247 tmp->eventcnt = 1;
00248
00249 tmp->schedq_ht = ast_hashtab_create(23, sched_cmp, ast_hashtab_resize_java, ast_hashtab_newsize_java, sched_hash, 1);
00250
00251 if (!(tmp->sched_heap = ast_heap_create(8, sched_time_cmp,
00252 offsetof(struct sched, __heap_index)))) {
00253 sched_context_destroy(tmp);
00254 return NULL;
00255 }
00256
00257 return tmp;
00258 }
00259
00260 void sched_context_destroy(struct sched_context *con)
00261 {
00262 struct sched *s;
00263
00264 ast_mutex_lock(&con->lock);
00265
00266 #ifdef SCHED_MAX_CACHE
00267
00268 while ((s = AST_LIST_REMOVE_HEAD(&con->schedc, list)))
00269 ast_free(s);
00270 #endif
00271
00272 if (con->sched_heap) {
00273 while ((s = ast_heap_pop(con->sched_heap))) {
00274 ast_free(s);
00275 }
00276 ast_heap_destroy(con->sched_heap);
00277 con->sched_heap = NULL;
00278 }
00279
00280 ast_hashtab_destroy(con->schedq_ht, NULL);
00281 con->schedq_ht = NULL;
00282
00283
00284 ast_mutex_unlock(&con->lock);
00285 ast_mutex_destroy(&con->lock);
00286 ast_free(con);
00287 }
00288
00289 static struct sched *sched_alloc(struct sched_context *con)
00290 {
00291 struct sched *tmp;
00292
00293
00294
00295
00296
00297 #ifdef SCHED_MAX_CACHE
00298 if ((tmp = AST_LIST_REMOVE_HEAD(&con->schedc, list)))
00299 con->schedccnt--;
00300 else
00301 #endif
00302 tmp = ast_calloc(1, sizeof(*tmp));
00303
00304 return tmp;
00305 }
00306
00307 static void sched_release(struct sched_context *con, struct sched *tmp)
00308 {
00309
00310
00311
00312
00313
00314 #ifdef SCHED_MAX_CACHE
00315 if (con->schedccnt < SCHED_MAX_CACHE) {
00316 AST_LIST_INSERT_HEAD(&con->schedc, tmp, list);
00317 con->schedccnt++;
00318 } else
00319 #endif
00320 ast_free(tmp);
00321 }
00322
00323
00324
00325
00326
00327 int ast_sched_wait(struct sched_context *con)
00328 {
00329 int ms;
00330 struct sched *s;
00331
00332 DEBUG(ast_debug(1, "ast_sched_wait()\n"));
00333
00334 ast_mutex_lock(&con->lock);
00335 if ((s = ast_heap_peek(con->sched_heap, 1))) {
00336 ms = ast_tvdiff_ms(s->when, ast_tvnow());
00337 if (ms < 0) {
00338 ms = 0;
00339 }
00340 } else {
00341 ms = -1;
00342 }
00343 ast_mutex_unlock(&con->lock);
00344
00345 return ms;
00346 }
00347
00348
00349
00350
00351
00352
00353
00354 static void schedule(struct sched_context *con, struct sched *s)
00355 {
00356 ast_heap_push(con->sched_heap, s);
00357
00358 if (!ast_hashtab_insert_safe(con->schedq_ht, s)) {
00359 ast_log(LOG_WARNING,"Schedule Queue entry %d is already in table!\n", s->id);
00360 }
00361
00362 con->schedcnt++;
00363
00364 if (con->schedcnt > con->highwater) {
00365 con->highwater = con->schedcnt;
00366 }
00367 }
00368
00369
00370
00371
00372
00373 static int sched_settime(struct timeval *t, int when)
00374 {
00375 struct timeval now = ast_tvnow();
00376
00377
00378 if (ast_tvzero(*t))
00379 *t = now;
00380 *t = ast_tvadd(*t, ast_samp2tv(when, 1000));
00381 if (ast_tvcmp(*t, now) < 0) {
00382 *t = now;
00383 }
00384 return 0;
00385 }
00386
00387 int ast_sched_replace_variable(int old_id, struct sched_context *con, int when, ast_sched_cb callback, const void *data, int variable)
00388 {
00389
00390 if (old_id > 0) {
00391 AST_SCHED_DEL(con, old_id);
00392 }
00393 return ast_sched_add_variable(con, when, callback, data, variable);
00394 }
00395
00396
00397
00398
00399 int ast_sched_add_variable(struct sched_context *con, int when, ast_sched_cb callback, const void *data, int variable)
00400 {
00401 struct sched *tmp;
00402 int res = -1;
00403
00404 DEBUG(ast_debug(1, "ast_sched_add()\n"));
00405
00406 ast_mutex_lock(&con->lock);
00407 if ((tmp = sched_alloc(con))) {
00408 tmp->id = con->eventcnt++;
00409 tmp->callback = callback;
00410 tmp->data = data;
00411 tmp->resched = when;
00412 tmp->variable = variable;
00413 tmp->when = ast_tv(0, 0);
00414 if (sched_settime(&tmp->when, when)) {
00415 sched_release(con, tmp);
00416 } else {
00417 schedule(con, tmp);
00418 res = tmp->id;
00419 }
00420 }
00421 #ifdef DUMP_SCHEDULER
00422
00423 if (option_debug)
00424 ast_sched_dump(con);
00425 #endif
00426 ast_mutex_unlock(&con->lock);
00427
00428 return res;
00429 }
00430
00431 int ast_sched_replace(int old_id, struct sched_context *con, int when, ast_sched_cb callback, const void *data)
00432 {
00433 if (old_id > -1) {
00434 AST_SCHED_DEL(con, old_id);
00435 }
00436 return ast_sched_add(con, when, callback, data);
00437 }
00438
00439 int ast_sched_add(struct sched_context *con, int when, ast_sched_cb callback, const void *data)
00440 {
00441 return ast_sched_add_variable(con, when, callback, data, 0);
00442 }
00443
00444 const void *ast_sched_find_data(struct sched_context *con, int id)
00445 {
00446 struct sched tmp,*res;
00447 tmp.id = id;
00448 res = ast_hashtab_lookup(con->schedq_ht, &tmp);
00449 if (res)
00450 return res->data;
00451 return NULL;
00452 }
00453
00454
00455
00456
00457
00458
00459
00460 #ifndef AST_DEVMODE
00461 int ast_sched_del(struct sched_context *con, int id)
00462 #else
00463 int _ast_sched_del(struct sched_context *con, int id, const char *file, int line, const char *function)
00464 #endif
00465 {
00466 struct sched *s, tmp = {
00467 .id = id,
00468 };
00469
00470 DEBUG(ast_debug(1, "ast_sched_del(%d)\n", id));
00471
00472 ast_mutex_lock(&con->lock);
00473 s = ast_hashtab_lookup(con->schedq_ht, &tmp);
00474 if (s) {
00475 if (!ast_heap_remove(con->sched_heap, s)) {
00476 ast_log(LOG_WARNING,"sched entry %d not in the sched heap?\n", s->id);
00477 }
00478
00479 if (!ast_hashtab_remove_this_object(con->schedq_ht, s)) {
00480 ast_log(LOG_WARNING,"Found sched entry %d, then couldn't remove it?\n", s->id);
00481 }
00482
00483 con->schedcnt--;
00484
00485 sched_release(con, s);
00486 }
00487
00488 #ifdef DUMP_SCHEDULER
00489
00490 if (option_debug)
00491 ast_sched_dump(con);
00492 #endif
00493 ast_mutex_unlock(&con->lock);
00494
00495 if (!s) {
00496 ast_debug(1, "Attempted to delete nonexistent schedule entry %d!\n", id);
00497 #ifndef AST_DEVMODE
00498 ast_assert(s != NULL);
00499 #else
00500 _ast_assert(0, "s != NULL", file, line, function);
00501 #endif
00502 return -1;
00503 }
00504
00505 return 0;
00506 }
00507
00508 void ast_sched_report(struct sched_context *con, struct ast_str **buf, struct ast_cb_names *cbnames)
00509 {
00510 int i, x;
00511 struct sched *cur;
00512 int countlist[cbnames->numassocs + 1];
00513 size_t heap_size;
00514
00515 memset(countlist, 0, sizeof(countlist));
00516 ast_str_set(buf, 0, " Highwater = %d\n schedcnt = %d\n", con->highwater, con->schedcnt);
00517
00518 ast_mutex_lock(&con->lock);
00519
00520 heap_size = ast_heap_size(con->sched_heap);
00521 for (x = 1; x <= heap_size; x++) {
00522 cur = ast_heap_peek(con->sched_heap, x);
00523
00524 for (i = 0; i < cbnames->numassocs; i++) {
00525 if (cur->callback == cbnames->cblist[i]) {
00526 break;
00527 }
00528 }
00529 if (i < cbnames->numassocs) {
00530 countlist[i]++;
00531 } else {
00532 countlist[cbnames->numassocs]++;
00533 }
00534 }
00535
00536 ast_mutex_unlock(&con->lock);
00537
00538 for (i = 0; i < cbnames->numassocs; i++) {
00539 ast_str_append(buf, 0, " %s : %d\n", cbnames->list[i], countlist[i]);
00540 }
00541
00542 ast_str_append(buf, 0, " <unknown> : %d\n", countlist[cbnames->numassocs]);
00543 }
00544
00545
00546 void ast_sched_dump(struct sched_context *con)
00547 {
00548 struct sched *q;
00549 struct timeval when = ast_tvnow();
00550 int x;
00551 size_t heap_size;
00552 #ifdef SCHED_MAX_CACHE
00553 ast_debug(1, "Asterisk Schedule Dump (%d in Q, %d Total, %d Cache, %d high-water)\n", con->schedcnt, con->eventcnt - 1, con->schedccnt, con->highwater);
00554 #else
00555 ast_debug(1, "Asterisk Schedule Dump (%d in Q, %d Total, %d high-water)\n", con->schedcnt, con->eventcnt - 1, con->highwater);
00556 #endif
00557
00558 ast_debug(1, "=============================================================\n");
00559 ast_debug(1, "|ID Callback Data Time (sec:ms) |\n");
00560 ast_debug(1, "+-----+-----------------+-----------------+-----------------+\n");
00561 ast_mutex_lock(&con->lock);
00562 heap_size = ast_heap_size(con->sched_heap);
00563 for (x = 1; x <= heap_size; x++) {
00564 struct timeval delta;
00565 q = ast_heap_peek(con->sched_heap, x);
00566 delta = ast_tvsub(q->when, when);
00567 ast_debug(1, "|%.4d | %-15p | %-15p | %.6ld : %.6ld |\n",
00568 q->id,
00569 q->callback,
00570 q->data,
00571 (long)delta.tv_sec,
00572 (long int)delta.tv_usec);
00573 }
00574 ast_mutex_unlock(&con->lock);
00575 ast_debug(1, "=============================================================\n");
00576 }
00577
00578
00579
00580
00581 int ast_sched_runq(struct sched_context *con)
00582 {
00583 struct sched *current;
00584 struct timeval when;
00585 int numevents;
00586 int res;
00587
00588 DEBUG(ast_debug(1, "ast_sched_runq()\n"));
00589
00590 ast_mutex_lock(&con->lock);
00591
00592 when = ast_tvadd(ast_tvnow(), ast_tv(0, 1000));
00593 for (numevents = 0; (current = ast_heap_peek(con->sched_heap, 1)); numevents++) {
00594
00595
00596
00597
00598
00599 if (ast_tvcmp(current->when, when) != -1) {
00600 break;
00601 }
00602
00603 current = ast_heap_pop(con->sched_heap);
00604
00605 if (!ast_hashtab_remove_this_object(con->schedq_ht, current)) {
00606 ast_log(LOG_ERROR,"Sched entry %d was in the schedq list but not in the hashtab???\n", current->id);
00607 }
00608
00609 con->schedcnt--;
00610
00611
00612
00613
00614
00615
00616
00617
00618
00619
00620 ast_mutex_unlock(&con->lock);
00621 res = current->callback(current->data);
00622 ast_mutex_lock(&con->lock);
00623
00624 if (res) {
00625
00626
00627
00628
00629 if (sched_settime(¤t->when, current->variable? res : current->resched)) {
00630 sched_release(con, current);
00631 } else {
00632 schedule(con, current);
00633 }
00634 } else {
00635
00636 sched_release(con, current);
00637 }
00638 }
00639
00640 ast_mutex_unlock(&con->lock);
00641
00642 return numevents;
00643 }
00644
00645 long ast_sched_when(struct sched_context *con,int id)
00646 {
00647 struct sched *s, tmp;
00648 long secs = -1;
00649 DEBUG(ast_debug(1, "ast_sched_when()\n"));
00650
00651 ast_mutex_lock(&con->lock);
00652
00653
00654 tmp.id = id;
00655 s = ast_hashtab_lookup(con->schedq_ht, &tmp);
00656
00657 if (s) {
00658 struct timeval now = ast_tvnow();
00659 secs = s->when.tv_sec - now.tv_sec;
00660 }
00661 ast_mutex_unlock(&con->lock);
00662
00663 return secs;
00664 }