00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00044 #include <string.h>
00045
00046 #include "sbthread.h"
00047 #include "ckd_alloc.h"
00048 #include "err.h"
00049
00050
00051
00052
00053 #ifdef _WIN32
00054 #define _WIN32_WINNT 0x0400
00055 #include <windows.h>
00056
00057 struct sbthread_s {
00058 cmd_ln_t *config;
00059 sbmsgq_t *msgq;
00060 sbthread_main func;
00061 void *arg;
00062 HANDLE th;
00063 DWORD tid;
00064 };
00065
00066 struct sbmsgq_s {
00067
00068 char *data;
00069 size_t depth;
00070 size_t out;
00071 size_t nbytes;
00072
00073
00074 char *msg;
00075 size_t msglen;
00076 CRITICAL_SECTION mtx;
00077 HANDLE evt;
00078 };
00079
00080 struct sbevent_s {
00081 HANDLE evt;
00082 };
00083
00084 struct sbmtx_s {
00085 CRITICAL_SECTION mtx;
00086 };
00087
00088 DWORD WINAPI
00089 sbthread_internal_main(LPVOID arg)
00090 {
00091 sbthread_t *th = (sbthread_t *)arg;
00092 int rv;
00093
00094 rv = (*th->func)(th);
00095 return (DWORD)rv;
00096 }
00097
00098 sbthread_t *
00099 sbthread_start(cmd_ln_t *config, sbthread_main func, void *arg)
00100 {
00101 sbthread_t *th;
00102
00103 th = ckd_calloc(1, sizeof(*th));
00104 th->config = config;
00105 th->func = func;
00106 th->arg = arg;
00107 th->msgq = sbmsgq_init(256);
00108 th->th = CreateThread(NULL, 0, sbthread_internal_main, th, 0, &th->tid);
00109 if (th->th == NULL) {
00110 sbthread_free(th);
00111 return NULL;
00112 }
00113 return th;
00114 }
00115
00116 int
00117 sbthread_wait(sbthread_t *th)
00118 {
00119 DWORD rv, exit;
00120
00121
00122 if (th->th == NULL)
00123 return -1;
00124
00125 rv = WaitForSingleObject(th->th, INFINITE);
00126 if (rv == WAIT_FAILED) {
00127 E_ERROR("Failed to join thread: WAIT_FAILED\n");
00128 return -1;
00129 }
00130 GetExitCodeThread(th->th, &exit);
00131 CloseHandle(th->th);
00132 th->th = NULL;
00133 return (int)exit;
00134 }
00135
00136 static DWORD
00137 cond_timed_wait(HANDLE cond, int sec, int nsec)
00138 {
00139 DWORD rv;
00140 if (sec == -1) {
00141 rv = WaitForSingleObject(cond, INFINITE);
00142 }
00143 else {
00144 DWORD ms;
00145
00146 ms = sec * 1000 + nsec / (1000*1000);
00147 rv = WaitForSingleObject(cond, ms);
00148 }
00149 return rv;
00150 }
00151
00152 sbevent_t *
00153 sbevent_init(void)
00154 {
00155 sbevent_t *evt;
00156
00157 evt = ckd_calloc(1, sizeof(*evt));
00158 evt->evt = CreateEvent(NULL, FALSE, FALSE, NULL);
00159 if (evt->evt == NULL) {
00160 ckd_free(evt);
00161 return NULL;
00162 }
00163 return evt;
00164 }
00165
00166 void
00167 sbevent_free(sbevent_t *evt)
00168 {
00169 CloseHandle(evt->evt);
00170 ckd_free(evt);
00171 }
00172
00173 int
00174 sbevent_signal(sbevent_t *evt)
00175 {
00176 return SetEvent(evt->evt) ? 0 : -1;
00177 }
00178
00179 int
00180 sbevent_wait(sbevent_t *evt, int sec, int nsec)
00181 {
00182 DWORD rv;
00183
00184 rv = cond_timed_wait(evt->evt, sec, nsec);
00185 return rv;
00186 }
00187
00188 sbmtx_t *
00189 sbmtx_init(void)
00190 {
00191 sbmtx_t *mtx;
00192
00193 mtx = ckd_calloc(1, sizeof(*mtx));
00194 InitializeCriticalSection(&mtx->mtx);
00195 return mtx;
00196 }
00197
00198 int
00199 sbmtx_trylock(sbmtx_t *mtx)
00200 {
00201 return TryEnterCriticalSection(&mtx->mtx) ? 0 : -1;
00202 }
00203
00204 int
00205 sbmtx_lock(sbmtx_t *mtx)
00206 {
00207 EnterCriticalSection(&mtx->mtx);
00208 return 0;
00209 }
00210
00211 int
00212 sbmtx_unlock(sbmtx_t *mtx)
00213 {
00214 LeaveCriticalSection(&mtx->mtx);
00215 return 0;
00216 }
00217
00218 void
00219 sbmtx_free(sbmtx_t *mtx)
00220 {
00221 DeleteCriticalSection(&mtx->mtx);
00222 ckd_free(mtx);
00223 }
00224
00225 sbmsgq_t *
00226 sbmsgq_init(size_t depth)
00227 {
00228 sbmsgq_t *msgq;
00229
00230 msgq = ckd_calloc(1, sizeof(*msgq));
00231 msgq->depth = depth;
00232 msgq->evt = CreateEvent(NULL, FALSE, FALSE, NULL);
00233 if (msgq->evt == NULL) {
00234 ckd_free(msgq);
00235 return NULL;
00236 }
00237 InitializeCriticalSection(&msgq->mtx);
00238 msgq->data = ckd_calloc(depth, 1);
00239 msgq->msg = ckd_calloc(depth, 1);
00240 return msgq;
00241 }
00242
00243 void
00244 sbmsgq_free(sbmsgq_t *msgq)
00245 {
00246 CloseHandle(msgq->evt);
00247 ckd_free(msgq->data);
00248 ckd_free(msgq->msg);
00249 ckd_free(msgq);
00250 }
00251
00252 int
00253 sbmsgq_send(sbmsgq_t *q, size_t len, void const *data)
00254 {
00255 char const *cdata = (char const *)data;
00256 size_t in;
00257
00258
00259 if (len + sizeof(len) > q->depth)
00260 return -1;
00261
00262 if (q->nbytes + len + sizeof(len) > q->depth)
00263 WaitForSingleObject(q->evt, INFINITE);
00264
00265
00266
00267 EnterCriticalSection(&q->mtx);
00268 in = (q->out + q->nbytes) % q->depth;
00269
00270 if (in + sizeof(len) > q->depth) {
00271
00272 size_t len1 = q->depth - in;
00273 memcpy(q->data + in, &len, len1);
00274 memcpy(q->data, ((char *)&len) + len1, sizeof(len) - len1);
00275 q->nbytes += sizeof(len);
00276 in = sizeof(len) - len1;
00277 }
00278 else {
00279 memcpy(q->data + in, &len, sizeof(len));
00280 q->nbytes += sizeof(len);
00281 in += sizeof(len);
00282 }
00283
00284
00285 if (in + len > q->depth) {
00286
00287 size_t len1 = q->depth - in;
00288 memcpy(q->data + in, cdata, len1);
00289 q->nbytes += len1;
00290 cdata += len1;
00291 len -= len1;
00292 in = 0;
00293 }
00294 memcpy(q->data + in, cdata, len);
00295 q->nbytes += len;
00296
00297
00298 SetEvent(q->evt);
00299
00300 LeaveCriticalSection(&q->mtx);
00301
00302 return 0;
00303 }
00304
00305 void *
00306 sbmsgq_wait(sbmsgq_t *q, size_t *out_len, int sec, int nsec)
00307 {
00308 char *outptr;
00309 size_t len;
00310
00311
00312 if (q->nbytes == 0) {
00313 if (cond_timed_wait(q->evt, sec, nsec) == WAIT_FAILED)
00314
00315 return NULL;
00316 }
00317
00318 EnterCriticalSection(&q->mtx);
00319
00320 if (q->out + sizeof(q->msglen) > q->depth) {
00321
00322 size_t len1 = q->depth - q->out;
00323 memcpy(&q->msglen, q->data + q->out, len1);
00324 memcpy(((char *)&q->msglen) + len1, q->data,
00325 sizeof(q->msglen) - len1);
00326 q->out = sizeof(q->msglen) - len1;
00327 }
00328 else {
00329 memcpy(&q->msglen, q->data + q->out, sizeof(q->msglen));
00330 q->out += sizeof(q->msglen);
00331 }
00332 q->nbytes -= sizeof(q->msglen);
00333
00334 outptr = q->msg;
00335 len = q->msglen;
00336 if (q->out + q->msglen > q->depth) {
00337
00338 size_t len1 = q->depth - q->out;
00339 memcpy(outptr, q->data + q->out, len1);
00340 outptr += len1;
00341 len -= len1;
00342 q->nbytes -= len1;
00343 q->out = 0;
00344 }
00345 memcpy(outptr, q->data + q->out, len);
00346 q->nbytes -= len;
00347 q->out += len;
00348
00349
00350 SetEvent(q->evt);
00351
00352 LeaveCriticalSection(&q->mtx);
00353 if (out_len)
00354 *out_len = q->msglen;
00355 return q->msg;
00356 }
00357
00358 #else
00359 #include <pthread.h>
00360 #include <sys/time.h>
00361
00362 struct sbthread_s {
00363 cmd_ln_t *config;
00364 sbmsgq_t *msgq;
00365 sbthread_main func;
00366 void *arg;
00367 pthread_t th;
00368 };
00369
/**
 * POSIX message queue: a byte ring buffer holding length-prefixed messages.
 */
struct sbmsgq_s {
    /* Ring buffer of queued bytes. */
    char *data;            /* storage of `depth` bytes */
    size_t depth;          /* capacity of `data` (and of `msg`) in bytes */
    size_t out;            /* offset in `data` of the next byte to dequeue */
    size_t nbytes;         /* bytes currently queued, length headers included */

    /* Most recently dequeued message. */
    char *msg;             /* buffer handed back by sbmsgq_wait() */
    size_t msglen;         /* length of the message stored in `msg` */
    pthread_mutex_t mtx;   /* guards every field above */
    pthread_cond_t cond;   /* signalled after every send and every receive */
};
00383
/** POSIX event object: a flag protected by a mutex/condition pair. */
struct sbevent_s {
    pthread_mutex_t mtx;  /* guards `signalled` */
    pthread_cond_t cond;  /* signalled by sbevent_signal() */
    int signalled;        /* set by sbevent_signal(), cleared by a successful sbevent_wait() */
};
00389
/** POSIX mutex: a thin wrapper around pthread_mutex_t. */
struct sbmtx_s {
    pthread_mutex_t mtx;  /* initialised in sbmtx_init(), destroyed in sbmtx_free() */
};
00393
00394 static void *
00395 sbthread_internal_main(void *arg)
00396 {
00397 sbthread_t *th = (sbthread_t *)arg;
00398 int rv;
00399
00400 rv = (*th->func)(th);
00401 return (void *)(long)rv;
00402 }
00403
00404 sbthread_t *
00405 sbthread_start(cmd_ln_t *config, sbthread_main func, void *arg)
00406 {
00407 sbthread_t *th;
00408 int rv;
00409
00410 th = ckd_calloc(1, sizeof(*th));
00411 th->config = config;
00412 th->func = func;
00413 th->arg = arg;
00414 th->msgq = sbmsgq_init(1024);
00415 if ((rv = pthread_create(&th->th, NULL, &sbthread_internal_main, th)) != 0) {
00416 E_ERROR("Failed to create thread: %d\n", rv);
00417 sbthread_free(th);
00418 return NULL;
00419 }
00420 return th;
00421 }
00422
00423 int
00424 sbthread_wait(sbthread_t *th)
00425 {
00426 void *exit;
00427 int rv;
00428
00429
00430 if (th->th == (pthread_t)-1)
00431 return -1;
00432
00433 rv = pthread_join(th->th, &exit);
00434 if (rv != 0) {
00435 E_ERROR("Failed to join thread: %d\n", rv);
00436 return -1;
00437 }
00438 th->th = (pthread_t)-1;
00439 return (int)(long)exit;
00440 }
00441
00442 sbmsgq_t *
00443 sbmsgq_init(size_t depth)
00444 {
00445 sbmsgq_t *msgq;
00446
00447 msgq = ckd_calloc(1, sizeof(*msgq));
00448 msgq->depth = depth;
00449 if (pthread_cond_init(&msgq->cond, NULL) != 0) {
00450 ckd_free(msgq);
00451 return NULL;
00452 }
00453 if (pthread_mutex_init(&msgq->mtx, NULL) != 0) {
00454 pthread_cond_destroy(&msgq->cond);
00455 ckd_free(msgq);
00456 return NULL;
00457 }
00458 msgq->data = ckd_calloc(depth, 1);
00459 msgq->msg = ckd_calloc(depth, 1);
00460 return msgq;
00461 }
00462
00463 void
00464 sbmsgq_free(sbmsgq_t *msgq)
00465 {
00466 pthread_mutex_destroy(&msgq->mtx);
00467 pthread_cond_destroy(&msgq->cond);
00468 ckd_free(msgq->data);
00469 ckd_free(msgq->msg);
00470 ckd_free(msgq);
00471 }
00472
00473 int
00474 sbmsgq_send(sbmsgq_t *q, size_t len, void const *data)
00475 {
00476 size_t in;
00477
00478
00479 if (len + sizeof(len) > q->depth)
00480 return -1;
00481
00482
00483 pthread_mutex_lock(&q->mtx);
00484 if (q->nbytes + len + sizeof(len) > q->depth) {
00485
00486 if (pthread_cond_wait(&q->cond, &q->mtx) != 0) {
00487
00488 pthread_mutex_unlock(&q->mtx);
00489 return -1;
00490 }
00491
00492 }
00493 in = (q->out + q->nbytes) % q->depth;
00494
00495
00496 if (in + sizeof(len) > q->depth) {
00497
00498 size_t len1 = q->depth - in;
00499 memcpy(q->data + in, &len, len1);
00500 memcpy(q->data, ((char *)&len) + len1, sizeof(len) - len1);
00501 q->nbytes += sizeof(len);
00502 in = sizeof(len) - len1;
00503 }
00504 else {
00505 memcpy(q->data + in, &len, sizeof(len));
00506 q->nbytes += sizeof(len);
00507 in += sizeof(len);
00508 }
00509
00510
00511 if (in + len > q->depth) {
00512
00513 size_t len1 = q->depth - in;
00514 memcpy(q->data + in, data, len1);
00515 q->nbytes += len1;
00516 data += len1;
00517 len -= len1;
00518 in = 0;
00519 }
00520 memcpy(q->data + in, data, len);
00521 q->nbytes += len;
00522
00523
00524 pthread_cond_signal(&q->cond);
00525
00526 pthread_mutex_unlock(&q->mtx);
00527 return 0;
00528 }
00529
/**
 * Wait on a condition variable with an optional relative timeout.
 *
 * @param cond condition variable to wait on.
 * @param mtx  mutex associated with `cond`; must be locked by the caller.
 * @param sec  seconds to wait, or -1 to wait forever.
 * @param nsec additional nanoseconds to wait.
 * @return the result of pthread_cond_wait() / pthread_cond_timedwait():
 *         0 when woken, otherwise an error number such as ETIMEDOUT.
 */
static int
cond_timed_wait(pthread_cond_t *cond, pthread_mutex_t *mtx, int sec, int nsec)
{
    const long nsec_per_sec = 1000L * 1000L * 1000L;
    struct timeval now;
    struct timespec end;

    if (sec == -1)
        return pthread_cond_wait(cond, mtx);

    /* pthread_cond_timedwait() takes an absolute deadline. */
    gettimeofday(&now, NULL);

    /* Move whole seconds out of nsec first so the nanosecond sum below
     * stays under two seconds and cannot overflow a 32-bit long. */
    end.tv_sec = now.tv_sec + sec + nsec / nsec_per_sec;
    end.tv_nsec = now.tv_usec * 1000 + nsec % nsec_per_sec;

    /* Normalise tv_nsec into [0, 1e9), carrying into the deadline's
     * seconds (not into the `sec` argument, which is no longer used). */
    if (end.tv_nsec >= nsec_per_sec) {
        end.tv_sec += 1;
        end.tv_nsec -= nsec_per_sec;
    }
    else if (end.tv_nsec < 0) {
        end.tv_sec -= 1;
        end.tv_nsec += nsec_per_sec;
    }
    return pthread_cond_timedwait(cond, mtx, &end);
}
00552
00553 void *
00554 sbmsgq_wait(sbmsgq_t *q, size_t *out_len, int sec, int nsec)
00555 {
00556 char *outptr;
00557 size_t len;
00558
00559
00560 pthread_mutex_lock(&q->mtx);
00561 if (q->nbytes == 0) {
00562
00563 if (cond_timed_wait(&q->cond, &q->mtx, sec, nsec) != 0) {
00564
00565 pthread_mutex_unlock(&q->mtx);
00566 return NULL;
00567 }
00568
00569 }
00570
00571 if (q->out + sizeof(q->msglen) > q->depth) {
00572
00573 size_t len1 = q->depth - q->out;
00574 memcpy(&q->msglen, q->data + q->out, len1);
00575 memcpy(((char *)&q->msglen) + len1, q->data,
00576 sizeof(q->msglen) - len1);
00577 q->out = sizeof(q->msglen) - len1;
00578 }
00579 else {
00580 memcpy(&q->msglen, q->data + q->out, sizeof(q->msglen));
00581 q->out += sizeof(q->msglen);
00582 }
00583 q->nbytes -= sizeof(q->msglen);
00584
00585 outptr = q->msg;
00586 len = q->msglen;
00587 if (q->out + q->msglen > q->depth) {
00588
00589 size_t len1 = q->depth - q->out;
00590 memcpy(outptr, q->data + q->out, len1);
00591 outptr += len1;
00592 len -= len1;
00593 q->nbytes -= len1;
00594 q->out = 0;
00595 }
00596 memcpy(outptr, q->data + q->out, len);
00597 q->nbytes -= len;
00598 q->out += len;
00599
00600
00601 pthread_cond_signal(&q->cond);
00602
00603 pthread_mutex_unlock(&q->mtx);
00604 if (out_len)
00605 *out_len = q->msglen;
00606 return q->msg;
00607 }
00608
00609 sbevent_t *
00610 sbevent_init(void)
00611 {
00612 sbevent_t *evt;
00613 int rv;
00614
00615 evt = ckd_calloc(1, sizeof(*evt));
00616 if ((rv = pthread_mutex_init(&evt->mtx, NULL)) != 0) {
00617 E_ERROR("Failed to initialize mutex: %d\n", rv);
00618 ckd_free(evt);
00619 return NULL;
00620 }
00621 if ((rv = pthread_cond_init(&evt->cond, NULL)) != 0) {
00622 E_ERROR_SYSTEM("Failed to initialize mutex: %d\n", rv);
00623 pthread_mutex_destroy(&evt->mtx);
00624 ckd_free(evt);
00625 return NULL;
00626 }
00627 return evt;
00628 }
00629
00630 void
00631 sbevent_free(sbevent_t *evt)
00632 {
00633 pthread_mutex_destroy(&evt->mtx);
00634 pthread_cond_destroy(&evt->cond);
00635 ckd_free(evt);
00636 }
00637
00638 int
00639 sbevent_signal(sbevent_t *evt)
00640 {
00641 int rv;
00642
00643 pthread_mutex_lock(&evt->mtx);
00644 evt->signalled = TRUE;
00645 rv = pthread_cond_signal(&evt->cond);
00646 pthread_mutex_unlock(&evt->mtx);
00647 return rv;
00648 }
00649
00650 int
00651 sbevent_wait(sbevent_t *evt, int sec, int nsec)
00652 {
00653 int rv = 0;
00654
00655
00656 pthread_mutex_lock(&evt->mtx);
00657
00658 if (!evt->signalled)
00659 rv = cond_timed_wait(&evt->cond, &evt->mtx, sec, nsec);
00660
00661 if (rv == 0)
00662 evt->signalled = FALSE;
00663
00664 pthread_mutex_unlock(&evt->mtx);
00665
00666 return rv;
00667 }
00668
00669 sbmtx_t *
00670 sbmtx_init(void)
00671 {
00672 sbmtx_t *mtx;
00673
00674 mtx = ckd_calloc(1, sizeof(*mtx));
00675 if (pthread_mutex_init(&mtx->mtx, NULL) != 0) {
00676 ckd_free(mtx);
00677 return NULL;
00678 }
00679 return mtx;
00680 }
00681
00682 int
00683 sbmtx_trylock(sbmtx_t *mtx)
00684 {
00685 return pthread_mutex_trylock(&mtx->mtx);
00686 }
00687
00688 int
00689 sbmtx_lock(sbmtx_t *mtx)
00690 {
00691 return pthread_mutex_lock(&mtx->mtx);
00692 }
00693
00694 int
00695 sbmtx_unlock(sbmtx_t *mtx)
00696 {
00697 return pthread_mutex_unlock(&mtx->mtx);
00698 }
00699
00700 void
00701 sbmtx_free(sbmtx_t *mtx)
00702 {
00703 pthread_mutex_destroy(&mtx->mtx);
00704 ckd_free(mtx);
00705 }
00706 #endif
00707
00708 cmd_ln_t *
00709 sbthread_config(sbthread_t *th)
00710 {
00711 return th->config;
00712 }
00713
00714 void *
00715 sbthread_arg(sbthread_t *th)
00716 {
00717 return th->arg;
00718 }
00719
00720 sbmsgq_t *
00721 sbthread_msgq(sbthread_t *th)
00722 {
00723 return th->msgq;
00724 }
00725
00726 int
00727 sbthread_send(sbthread_t *th, size_t len, void const *data)
00728 {
00729 return sbmsgq_send(th->msgq, len, data);
00730 }
00731
00732 void
00733 sbthread_free(sbthread_t *th)
00734 {
00735 sbthread_wait(th);
00736 sbmsgq_free(th->msgq);
00737 ckd_free(th);
00738 }