25 #include "opensync_internals.h"
48 gboolean _incoming_prepare(GSource *source, gint *timeout_)
55 gboolean _incoming_check(GSource *source)
58 if (g_async_queue_length(queue->incoming) > 0)
67 gboolean _incoming_dispatch(GSource *source, GSourceFunc callback, gpointer user_data)
73 while ((message = g_async_queue_try_pop(queue->incoming))) {
75 if (message->
cmd == OSYNC_MESSAGE_REPLY || message->
cmd == OSYNC_MESSAGE_ERRORREPLY) {
79 g_mutex_lock(queue->pendingLock);
84 for (p = queue->pendingReplies; p; p = p->next) {
87 if (pending->id1 == message->
id1 && pending->id2 == message->id2) {
90 queue->pendingReplies = g_list_remove(queue->pendingReplies, pending);
95 g_mutex_unlock(queue->pendingLock);
109 osync_message_unref(message);
116 static void _osync_queue_stop_incoming(
OSyncQueue *queue)
118 if (queue->incoming_source) {
119 g_source_destroy(queue->incoming_source);
120 queue->incoming_source = NULL;
123 if (queue->incomingContext) {
124 g_main_context_unref(queue->incomingContext);
125 queue->incomingContext = NULL;
135 gboolean _queue_prepare(GSource *source, gint *timeout_)
142 gboolean _queue_check(GSource *source)
145 if (g_async_queue_length(queue->outgoing) > 0)
152 ssize_t nwritten = 0;
155 if ((nwritten = write(queue->
fd, vptr, n)) <= 0) {
159 osync_error_set(error, OSYNC_ERROR_IO_ERROR,
"Unable to write IPC data: %i: %s", errno, strerror(errno));
170 osync_bool _osync_queue_write_long_long_int(
OSyncQueue *queue,
const long long int message,
OSyncError **error)
172 if (_osync_queue_write_data(queue, &message,
sizeof(
long long int), error) < 0)
180 if (_osync_queue_write_data(queue, &message,
sizeof(
int), error) < 0)
189 gboolean _queue_dispatch(GSource *source, GSourceFunc callback, gpointer user_data)
196 while ((message = g_async_queue_try_pop(queue->outgoing))) {
198 if (!queue->connected) {
199 osync_error_set(&error, OSYNC_ERROR_GENERIC,
"Trying to send to a queue thats not connected");
204 if (!_osync_queue_write_int(queue, message->
buffer->len + osync_marshal_get_size_message(message), &error))
207 if (!_osync_queue_write_int(queue, message->
cmd, &error))
210 if (!_osync_queue_write_long_long_int(queue, message->
id1, &error))
213 if (!_osync_queue_write_int(queue, message->id2, &error))
216 if (message->
buffer->len) {
219 int written = _osync_queue_write_data(queue, message->
buffer->data + sent, message->
buffer->len - sent, &error);
224 }
while (sent < message->buffer->len);
227 osync_message_unref(message);
234 osync_message_unref(message);
239 osync_marshal_error(message, error);
240 g_async_queue_push(queue->incoming, message);
249 gboolean _source_prepare(GSource *source, gint *timeout_)
263 if ((nread = read(queue->
fd, vptr, nleft)) < 0) {
267 osync_error_set(error, OSYNC_ERROR_IO_ERROR,
"Unable to read IPC data: %i: %s", errno, strerror(errno));
270 }
else if (nread == 0)
282 int read = _osync_queue_read_data(queue, message,
sizeof(
int), error);
287 if (read !=
sizeof(
int)) {
288 osync_error_set(error, OSYNC_ERROR_IO_ERROR,
"Unable to read int. EOF");
296 osync_bool _osync_queue_read_long_long_int(
OSyncQueue *queue,
long long int *message,
OSyncError **error)
298 int read = _osync_queue_read_data(queue, message,
sizeof(
long long int), error);
303 if (read !=
sizeof(
long long int)) {
304 osync_error_set(error, OSYNC_ERROR_IO_ERROR,
"Unable to read int. EOF");
312 gboolean _source_check(GSource *source)
318 if (queue->connected == FALSE) {
322 if (queue->pendingReplies) {
323 g_mutex_lock(queue->pendingLock);
326 for (p = queue->pendingReplies; p; p = p->next) {
331 osync_marshal_error(message, error);
333 message->
id1 = pending->id1;
334 message->id2 = pending->id2;
336 g_async_queue_push(queue->incoming, message);
341 g_mutex_unlock(queue->pendingLock);
347 switch (osync_queue_poll(queue)) {
348 case OSYNC_QUEUE_EVENT_NONE:
350 case OSYNC_QUEUE_EVENT_READ:
352 case OSYNC_QUEUE_EVENT_HUP:
353 case OSYNC_QUEUE_EVENT_ERROR:
354 queue->connected = FALSE;
362 g_async_queue_push(queue->incoming, message);
364 if (queue->incomingContext)
365 g_main_context_wakeup(queue->incomingContext);
374 osync_marshal_error(message, error);
375 g_async_queue_push(queue->incoming, message);
384 gboolean _source_dispatch(GSource *source, GSourceFunc callback, gpointer user_data)
393 long long int id1 = 0;
396 if (!_osync_queue_read_int(queue, &size, &error))
399 if (!_osync_queue_read_int(queue, &cmd, &error))
402 if (!_osync_queue_read_long_long_int(queue, &id1, &error))
405 if (!_osync_queue_read_int(queue, &id2, &error))
418 int inc = _osync_queue_read_data(queue, message->
buffer->data + read, size - read, &error);
421 goto error_free_message;
424 osync_error_set(&error, OSYNC_ERROR_IO_ERROR,
"Encountered EOF while data was missing");
425 goto error_free_message;
429 }
while (read < size);
432 g_async_queue_push(queue->incoming, message);
434 if (queue->incomingContext)
435 g_main_context_wakeup(queue->incomingContext);
436 }
while (_source_check(queue->read_source));
441 osync_message_unref(message);
446 osync_marshal_error(message, error);
447 g_async_queue_push(queue->incoming, message);
470 queue->
name = g_strdup(name);
473 if (!g_thread_supported ())
474 g_thread_init (NULL);
476 queue->pendingLock = g_mutex_new();
478 queue->
context = g_main_context_new();
480 queue->outgoing = g_async_queue_new();
481 queue->incoming = g_async_queue_new();
518 goto error_free_read_queue;
522 if (pipe(filedes) < 0) {
524 goto error_free_write_queue;
527 (*read_queue)->fd = filedes[0];
528 (*write_queue)->fd = filedes[1];
533 error_free_write_queue:
534 osync_queue_free(*write_queue);
535 error_free_read_queue:
536 osync_queue_free(*read_queue);
548 g_mutex_free(queue->pendingLock);
550 g_main_context_unref(queue->
context);
552 _osync_queue_stop_incoming(queue);
554 while ((message = g_async_queue_try_pop(queue->incoming))) {
555 osync_message_unref(message);
557 g_async_queue_unref(queue->incoming);
559 while ((message = g_async_queue_try_pop(queue->outgoing))) {
560 osync_message_unref(message);
562 g_async_queue_unref(queue->outgoing);
564 while (queue->pendingReplies) {
565 pending = queue->pendingReplies->data;
567 queue->pendingReplies = g_list_remove(queue->pendingReplies, pending);
578 osync_bool osync_queue_exists(
OSyncQueue *queue)
580 return g_file_test(queue->
name, G_FILE_TEST_EXISTS) ? TRUE : FALSE;
585 if (mkfifo(queue->
name, 0600) != 0) {
597 if (unlink(queue->
name) != 0) {
607 static osync_bool __osync_queue_connect(
OSyncQueue *queue, OSyncQueueType type, osync_bool nonblocking,
OSyncError **error)
610 osync_assert(queue->connected == FALSE);
615 if (queue->
fd == -1) {
617 int fd = open(queue->
name, (type == OSYNC_QUEUE_SENDER ? O_WRONLY : O_RDONLY) | (nonblocking ? O_NONBLOCK : 0));
624 int oldflags = fcntl(queue->
fd, F_GETFD);
625 if (oldflags == -1) {
626 osync_error_set(error, OSYNC_ERROR_GENERIC,
"Unable to get fifo flags");
629 if (fcntl(queue->
fd, F_SETFD, oldflags|FD_CLOEXEC) == -1) {
630 osync_error_set(error, OSYNC_ERROR_GENERIC,
"Unable to set fifo flags");
635 queue->connected = TRUE;
636 signal(SIGPIPE, SIG_IGN);
639 queue->thread = osync_thread_new(queue->
context, error);
644 queue->write_functions = g_malloc0(
sizeof(GSourceFuncs));
645 queue->write_functions->prepare = _queue_prepare;
646 queue->write_functions->check = _queue_check;
647 queue->write_functions->dispatch = _queue_dispatch;
648 queue->write_functions->finalize = NULL;
650 queue->write_source = g_source_new(queue->write_functions,
sizeof(GSource) +
sizeof(
OSyncQueue *));
651 queueptr = (
OSyncQueue **)(queue->write_source + 1);
653 g_source_set_callback(queue->write_source, NULL, queue, NULL);
654 g_source_attach(queue->write_source, queue->
context);
655 g_main_context_ref(queue->
context);
657 queue->read_functions = g_malloc0(
sizeof(GSourceFuncs));
658 queue->read_functions->prepare = _source_prepare;
659 queue->read_functions->check = _source_check;
660 queue->read_functions->dispatch = _source_dispatch;
661 queue->read_functions->finalize = NULL;
663 queue->read_source = g_source_new(queue->read_functions,
sizeof(GSource) +
sizeof(
OSyncQueue *));
664 queueptr = (
OSyncQueue **)(queue->read_source + 1);
666 g_source_set_callback(queue->read_source, NULL, queue, NULL);
667 g_source_attach(queue->read_source, queue->
context);
668 g_main_context_ref(queue->
context);
670 osync_thread_start(queue->thread);
683 return __osync_queue_connect(queue, type, FALSE, error);
688 return __osync_queue_connect(queue, type, TRUE, error);
697 osync_thread_stop(queue->thread);
698 osync_thread_free(queue->thread);
699 queue->thread = NULL;
704 if (queue->write_functions)
705 g_free(queue->write_functions);
709 _osync_queue_stop_incoming(queue);
714 while ((message = g_async_queue_try_pop(queue->incoming))) {
715 osync_message_unref(message);
718 if (close(queue->
fd) != 0) {
725 queue->connected = FALSE;
731 osync_bool osync_queue_is_connected(
OSyncQueue *queue)
734 return queue->connected;
779 g_source_set_callback(queue->incoming_source, NULL, queue, NULL);
780 g_source_attach(queue->incoming_source, context);
781 queue->incomingContext = context;
783 g_main_context_ref(context);
786 g_main_context_ref(context);
793 _incoming_dispatch(NULL, NULL, queue);
797 OSyncQueueEvent osync_queue_poll(
OSyncQueue *queue)
808 int ret = poll(&pfd, 1, queue->type == OSYNC_QUEUE_SENDER ? 0 : 100);
810 if (ret < 0 && errno == EINTR)
811 return OSYNC_QUEUE_EVENT_NONE;
814 return OSYNC_QUEUE_EVENT_NONE;
816 if (pfd.revents & POLLERR)
817 return OSYNC_QUEUE_EVENT_ERROR;
818 else if (pfd.revents & POLLHUP)
819 return OSYNC_QUEUE_EVENT_HUP;
820 else if (pfd.revents & POLLIN)
821 return OSYNC_QUEUE_EVENT_READ;
823 return OSYNC_QUEUE_EVENT_ERROR;
829 return g_async_queue_pop(queue->incoming);
832 void gen_id(
long long int *part1,
int *part2)
837 gettimeofday(&tv, &tz);
839 long long int now = tv.tv_sec * 1000000 + tv.tv_usec;
841 int rnd = (int)random();
842 rnd = rnd << 16 | getpid();
853 osync_assert(replyqueue);
858 gen_id(&(message->
id1), &(message->id2));
859 pending->id1 = message->
id1;
860 pending->id2 = message->id2;
865 g_mutex_lock(replyqueue->pendingLock);
866 replyqueue->pendingReplies = g_list_append(replyqueue->pendingReplies, pending);
867 g_mutex_unlock(replyqueue->pendingLock);
870 osync_message_ref(message);
871 g_async_queue_push(queue->outgoing, message);
873 g_main_context_wakeup(queue->
context);
889 osync_bool ret = osync_queue_send_message(queue, replyqueue, message, error);
895 osync_bool osync_queue_is_alive(
OSyncQueue *queue)
898 if (!osync_queue_try_connect(queue, OSYNC_QUEUE_SENDER, NULL)) {
907 if (!osync_queue_send_message(queue, NULL, message, NULL)) {
911 osync_queue_disconnect(queue, NULL);