OpenSync  0.22
opensync/opensync_queue.c
00001 /*
00002  * libosengine - A synchronization engine for the opensync framework
00003  * Copyright (C) 2004-2005  Armin Bauer <armin.bauer@opensync.org>
00004  * 
00005  * This library is free software; you can redistribute it and/or
00006  * modify it under the terms of the GNU Lesser General Public
00007  * License as published by the Free Software Foundation; either
00008  * version 2.1 of the License, or (at your option) any later version.
00009  * 
00010  * This library is distributed in the hope that it will be useful,
00011  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00012  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00013  * Lesser General Public License for more details.
00014  * 
00015  * You should have received a copy of the GNU Lesser General Public
00016  * License along with this library; if not, write to the Free Software
00017  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307  USA
00018  * 
00019  */
00020 
00021 #include <fcntl.h>
00022 #include <sys/poll.h>
00023 
00024 #include "opensync.h"
00025 #include "opensync_internals.h"
00026 
00027 #include <sys/time.h>
00028 #include <signal.h>
00029 
00030 typedef struct OSyncPendingMessage {
00031         long long int id1;
00032         int id2;
00034         OSyncMessageHandler callback;
00036         gpointer user_data;
00037 } OSyncPendingMessage;
00038 
00046 
00047 static
00048 gboolean _incoming_prepare(GSource *source, gint *timeout_)
00049 {
00050         *timeout_ = 1;
00051         return FALSE;
00052 }
00053 
00054 static
00055 gboolean _incoming_check(GSource *source)
00056 {
00057         OSyncQueue *queue = *((OSyncQueue **)(source + 1));
00058         if (g_async_queue_length(queue->incoming) > 0)
00059                 return TRUE;
00060         
00061         return FALSE;
00062 }
00063 
00064 /* This function is called from the master thread. The function dispatched incoming data from
00065  * the remote end */
00066 static
00067 gboolean _incoming_dispatch(GSource *source, GSourceFunc callback, gpointer user_data)
00068 {
00069         osync_trace(TRACE_ENTRY, "%s(%p)", __func__, user_data);
00070         OSyncQueue *queue = user_data;
00071 
00072         OSyncMessage *message = NULL;
00073         while ((message = g_async_queue_try_pop(queue->incoming))) {
00074                 /* We check of the message is a reply to something */
00075                 if (message->cmd == OSYNC_MESSAGE_REPLY || message->cmd == OSYNC_MESSAGE_ERRORREPLY) {
00076                         
00077                         /* Search for the pending reply. We have to lock the
00078                          * list since another thread might be duing the updates */
00079                         g_mutex_lock(queue->pendingLock);
00080 
00081                         OSyncPendingMessage *found = NULL;
00082                         
00083                         GList *p = NULL;
00084                         for (p = queue->pendingReplies; p; p = p->next) {
00085                                 OSyncPendingMessage *pending = p->data;
00086                                 
00087                                 if (pending->id1 == message->id1 && pending->id2 == message->id2) {
00088 
00089                                         /* Get the pending message from the queue */
00090                                         queue->pendingReplies = g_list_remove(queue->pendingReplies, pending);
00091                                         found = pending;
00092                                         break;
00093                                 }
00094                         }
00095                         g_mutex_unlock(queue->pendingLock);
00096 
00097                         if (found) {
00098                                 /* Call the callback of the pending message and free the message */
00099                                 osync_assert(found->callback);
00100                                 found->callback(message, found->user_data);
00101                                 
00102                                 g_free(found);
00103                         } else
00104                                 osync_trace(TRACE_INTERNAL, "%s: No pending message for %lld:%d", __func__, message->id1, message->id2);
00105 
00106                 } else 
00107                         queue->message_handler(message, queue->user_data);
00108                 
00109                 osync_message_unref(message);
00110         }
00111         
00112         osync_trace(TRACE_EXIT, "%s: Done dispatching", __func__);
00113         return TRUE;
00114 }
00115 
00116 static void _osync_queue_stop_incoming(OSyncQueue *queue)
00117 {
00118         if (queue->incoming_source) {
00119                 g_source_destroy(queue->incoming_source);
00120                 queue->incoming_source = NULL;
00121         }
00122         
00123         if (queue->incomingContext) {
00124                 g_main_context_unref(queue->incomingContext);
00125                 queue->incomingContext = NULL;
00126         }
00127         
00128         if (queue->incoming_functions) {
00129                 g_free(queue->incoming_functions);
00130                 queue->incoming_functions = NULL;
00131         }
00132 }
00133 
00134 static
00135 gboolean _queue_prepare(GSource *source, gint *timeout_)
00136 {
00137         *timeout_ = 1;
00138         return FALSE;
00139 }
00140 
00141 static
00142 gboolean _queue_check(GSource *source)
00143 {
00144         OSyncQueue *queue = *((OSyncQueue **)(source + 1));
00145         if (g_async_queue_length(queue->outgoing) > 0)
00146                 return TRUE;
00147         return FALSE;
00148 }
00149 
00150 int _osync_queue_write_data(OSyncQueue *queue, const void *vptr, size_t n, OSyncError **error)
00151 {
00152         ssize_t nwritten = 0;
00153 
00154         while (n > 0) {
00155                 if ((nwritten = write(queue->fd, vptr, n)) <= 0) {
00156                         if (errno == EINTR)
00157                                 nwritten = 0;  /* and call write() again */
00158                         else {
00159                                 osync_error_set(error, OSYNC_ERROR_IO_ERROR, "Unable to write IPC data: %i: %s", errno, strerror(errno));
00160                                 return (-1);  /* error */
00161                         }
00162                 }
00163                 
00164                 n -= nwritten;
00165                 vptr += nwritten;
00166         }
00167         return (nwritten);
00168 }
00169 
00170 osync_bool _osync_queue_write_long_long_int(OSyncQueue *queue, const long long int message, OSyncError **error)
00171 {
00172         if (_osync_queue_write_data(queue, &message, sizeof(long long int), error) < 0)
00173                 return FALSE;
00174 
00175         return TRUE;
00176 }
00177 
00178 osync_bool _osync_queue_write_int(OSyncQueue *queue, const int message, OSyncError **error)
00179 {
00180         if (_osync_queue_write_data(queue, &message, sizeof(int), error) < 0)
00181                 return FALSE;
00182 
00183         return TRUE;
00184 }
00185 
00186 /* This function sends the data to the remote side. If there is an error, it sends an error
00187  * message to the incoming queue */
00188 static
00189 gboolean _queue_dispatch(GSource *source, GSourceFunc callback, gpointer user_data)
00190 {
00191         OSyncQueue *queue = user_data;
00192         OSyncError *error = NULL;
00193         
00194         OSyncMessage *message = NULL;
00195         
00196         while ((message = g_async_queue_try_pop(queue->outgoing))) {
00197                 /* Check if the queue is connected */
00198                 if (!queue->connected) {
00199                         osync_error_set(&error, OSYNC_ERROR_GENERIC, "Trying to send to a queue thats not connected");
00200                         goto error;
00201                 }
00202                 
00203                 /*FIXME: review usage of osync_marshal_get_size_message() */
00204                 if (!_osync_queue_write_int(queue, message->buffer->len + osync_marshal_get_size_message(message), &error))
00205                         goto error;
00206                 
00207                 if (!_osync_queue_write_int(queue, message->cmd, &error))
00208                         goto error;
00209 
00210                 if (!_osync_queue_write_long_long_int(queue, message->id1, &error))
00211                         goto error;
00212                         
00213                 if (!_osync_queue_write_int(queue, message->id2, &error))
00214                         goto error;
00215                 
00216                 if (message->buffer->len) {
00217                         int sent = 0;
00218                         do {
00219                                 int written = _osync_queue_write_data(queue, message->buffer->data + sent, message->buffer->len - sent, &error);
00220                                 if (written < 0)
00221                                         goto error;
00222                                 
00223                                 sent += written;
00224                         } while (sent < message->buffer->len);
00225                 }
00226                 
00227                 osync_message_unref(message);
00228         }
00229         
00230         return TRUE;
00231         
00232 error:
00233         if (message)
00234                 osync_message_unref(message);
00235         
00236         if (error) {
00237                 message = osync_message_new(OSYNC_MESSAGE_QUEUE_ERROR, 0, &error);
00238                 if (message) {
00239                         osync_marshal_error(message, error);
00240                         g_async_queue_push(queue->incoming, message);
00241                 }
00242                 
00243                 osync_error_free(&error);
00244         }
00245         return FALSE;
00246 }
00247 
00248 static
00249 gboolean _source_prepare(GSource *source, gint *timeout_)
00250 {
00251         *timeout_ = 1;
00252         return FALSE;
00253 }
00254 
00255 static
00256 int _osync_queue_read_data(OSyncQueue *queue, void *vptr, size_t n, OSyncError **error)
00257 {
00258         size_t nleft;
00259         ssize_t nread = 0;
00260 
00261         nleft = n;
00262         while (n > 0) {
00263                 if ((nread = read(queue->fd, vptr, nleft)) < 0) {
00264                         if (errno == EINTR)
00265                                 nread = 0;  /* and call read() again */
00266                         else {
00267                                 osync_error_set(error, OSYNC_ERROR_IO_ERROR, "Unable to read IPC data: %i: %s", errno, strerror(errno));
00268                                 return (-1);
00269                         }
00270                 } else if (nread == 0)
00271                         break;  /* EOF */
00272                 
00273                 nleft -= nread;
00274                 vptr += nread;
00275         }
00276         return (n - nleft);  /* return >= 0 */
00277 }
00278 
00279 static
00280 osync_bool _osync_queue_read_int(OSyncQueue *queue, int *message, OSyncError **error)
00281 {
00282         int read = _osync_queue_read_data(queue, message, sizeof(int), error);
00283         
00284         if (read < 0)
00285                 return FALSE;
00286 
00287         if (read != sizeof(int)) {
00288                 osync_error_set(error, OSYNC_ERROR_IO_ERROR, "Unable to read int. EOF");
00289                 return FALSE;
00290         }
00291         
00292         return TRUE;
00293 }
00294 
00295 static
00296 osync_bool _osync_queue_read_long_long_int(OSyncQueue *queue, long long int *message, OSyncError **error)
00297 {
00298         int read = _osync_queue_read_data(queue, message, sizeof(long long int), error);
00299 
00300         if (read < 0)
00301                 return FALSE;
00302         
00303         if (read != sizeof(long long int)) {
00304                 osync_error_set(error, OSYNC_ERROR_IO_ERROR, "Unable to read int. EOF");
00305                 return FALSE;
00306         }
00307 
00308         return TRUE;
00309 }
00310 
00311 static
00312 gboolean _source_check(GSource *source)
00313 {
00314         OSyncQueue *queue = *((OSyncQueue **)(source + 1));
00315         OSyncMessage *message = NULL;
00316         OSyncError *error = NULL;
00317         
00318         if (queue->connected == FALSE) {
00319                 /* Ok. so we arent connected. lets check if there are pending replies. We cannot
00320                  * receive any data on the pipe, therefore, any pending replies will never
00321                  * be answered. So we return error messages for all of them. */
00322                 if (queue->pendingReplies) {
00323                         g_mutex_lock(queue->pendingLock);
00324                         osync_error_set(&error, OSYNC_ERROR_IO_ERROR, "Broken Pipe");
00325                         GList *p = NULL;
00326                         for (p = queue->pendingReplies; p; p = p->next) {
00327                                 OSyncPendingMessage *pending = p->data;
00328                                 
00329                                 message = osync_message_new(OSYNC_MESSAGE_ERRORREPLY, 0, NULL);
00330                                 if (message) {
00331                                         osync_marshal_error(message, error);
00332         
00333                                         message->id1 = pending->id1;
00334                                         message->id2 = pending->id2;
00335                                         
00336                                         g_async_queue_push(queue->incoming, message);
00337                                 }
00338                         }
00339                         
00340                         osync_error_free(&error);
00341                         g_mutex_unlock(queue->pendingLock);
00342                 }
00343                 
00344                 return FALSE;
00345         }
00346         
00347         switch (osync_queue_poll(queue)) {
00348                 case OSYNC_QUEUE_EVENT_NONE:
00349                         return FALSE;
00350                 case OSYNC_QUEUE_EVENT_READ:
00351                         return TRUE;
00352                 case OSYNC_QUEUE_EVENT_HUP:
00353                 case OSYNC_QUEUE_EVENT_ERROR:
00354                         queue->connected = FALSE;
00355                         
00356                         /* Now we can send the hup message, and wake up the consumer thread so
00357                          * it can pickup the messages in the incoming queue */
00358                         message = osync_message_new(OSYNC_MESSAGE_QUEUE_HUP, 0, &error);
00359                         if (!message)
00360                                 goto error;
00361                         
00362                         g_async_queue_push(queue->incoming, message);
00363                         
00364                         if (queue->incomingContext)
00365                                 g_main_context_wakeup(queue->incomingContext);
00366                         return FALSE;
00367         }
00368         
00369         return FALSE;
00370 
00371 error:
00372         message = osync_message_new(OSYNC_MESSAGE_QUEUE_ERROR, 0, &error);
00373         if (message) {
00374                 osync_marshal_error(message, error);
00375                 g_async_queue_push(queue->incoming, message);
00376         }
00377         osync_error_free(&error);
00378         return FALSE;
00379 }
00380 
00381 /* This function reads from the file descriptor and inserts incoming data into the
00382  * incoming queue */
00383 static
00384 gboolean _source_dispatch(GSource *source, GSourceFunc callback, gpointer user_data)
00385 {
00386         OSyncQueue *queue = user_data;
00387         OSyncMessage *message = NULL;
00388         OSyncError *error = NULL;
00389         
00390         do {
00391                 int size = 0;
00392                 int cmd = 0;
00393                 long long int id1 = 0;
00394                 int id2 = 0;
00395                 
00396                 if (!_osync_queue_read_int(queue, &size, &error))
00397                         goto error;
00398                 
00399                 if (!_osync_queue_read_int(queue, &cmd, &error))
00400                         goto error;
00401                 
00402                 if (!_osync_queue_read_long_long_int(queue, &id1, &error))
00403                         goto error;
00404                         
00405                 if (!_osync_queue_read_int(queue, &id2, &error))
00406                         goto error;
00407                 
00408                 message = osync_message_new(cmd, size, &error);
00409                 if (!message)
00410                         goto error;
00411         
00412                 message->id1 = id1;
00413                 message->id2 = id2;
00414                 
00415                 if (size) {
00416                         int read = 0;
00417                         do {
00418                                 int inc = _osync_queue_read_data(queue, message->buffer->data + read, size - read, &error);
00419                                 
00420                                 if (inc < 0)
00421                                         goto error_free_message;
00422                                 
00423                                 if (inc == 0) {
00424                                         osync_error_set(&error, OSYNC_ERROR_IO_ERROR, "Encountered EOF while data was missing");
00425                                         goto error_free_message;
00426                                 }
00427                                 
00428                                 read += inc;
00429                         } while (read < size);
00430                 }
00431                 
00432                 g_async_queue_push(queue->incoming, message);
00433                 
00434                 if (queue->incomingContext)
00435                         g_main_context_wakeup(queue->incomingContext);
00436         } while (_source_check(queue->read_source));
00437         
00438         return TRUE;
00439 
00440 error_free_message:
00441         osync_message_unref(message);
00442 error:
00443         if (error) {
00444                 message = osync_message_new(OSYNC_MESSAGE_QUEUE_ERROR, 0, &error);
00445                 if (message) {
00446                         osync_marshal_error(message, error);
00447                         g_async_queue_push(queue->incoming, message);
00448                 }
00449                 
00450                 osync_error_free(&error);
00451         }
00452         
00453         return FALSE;
00454 }
00455 
00461 OSyncQueue *osync_queue_new(const char *name, OSyncError **error)
00462 {
00463         osync_trace(TRACE_ENTRY, "%s(%s, %p)", __func__, name, error);
00464         
00465         OSyncQueue *queue = osync_try_malloc0(sizeof(OSyncQueue), error);
00466         if (!queue)
00467                 goto error;
00468         
00469         if (name)
00470                 queue->name = g_strdup(name);
00471         queue->fd = -1;
00472         
00473         if (!g_thread_supported ())
00474                 g_thread_init (NULL);
00475         
00476         queue->pendingLock = g_mutex_new();
00477         
00478         queue->context = g_main_context_new();
00479         
00480         queue->outgoing = g_async_queue_new();
00481         queue->incoming = g_async_queue_new();
00482 
00483         osync_trace(TRACE_EXIT, "%s: %p", __func__, queue);
00484         return queue;
00485 
00486 error:
00487         osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error));
00488         return NULL;
00489 }
00490 
00491 /* Creates anonymous pipes which dont have to be created and are automatically connected.
00492  * 
00493  * Lets assume parent wants to send, child wants to receive
00494  * 
00495  * osync_queue_new_pipes()
00496  * fork()
00497  * 
00498  * Parent:
00499  * connect(write_queue)
00500  * disconnect(read_queue)
00501  * 
00502  * Child:
00503  * connect(read_queue)
00504  * close(write_queue)
00505  * 
00506  * 
00507  *  */
00508 osync_bool osync_queue_new_pipes(OSyncQueue **read_queue, OSyncQueue **write_queue, OSyncError **error)
00509 {
00510         osync_trace(TRACE_ENTRY, "%s(%p, %p, %p)", __func__, read_queue, write_queue, error);
00511         
00512         *read_queue = osync_queue_new(NULL, error);
00513         if (!*read_queue)
00514                 goto error;
00515         
00516         *write_queue = osync_queue_new(NULL, error);
00517         if (!*write_queue)
00518                 goto error_free_read_queue;
00519         
00520         int filedes[2];
00521         
00522         if (pipe(filedes) < 0) {
00523                 osync_error_set(error, OSYNC_ERROR_GENERIC, "Unable to create pipes");
00524                 goto error_free_write_queue;
00525         }
00526         
00527         (*read_queue)->fd = filedes[0];
00528         (*write_queue)->fd = filedes[1];
00529         
00530         osync_trace(TRACE_EXIT, "%s", __func__);
00531         return TRUE;
00532 
00533 error_free_write_queue:
00534         osync_queue_free(*write_queue);
00535 error_free_read_queue:
00536         osync_queue_free(*read_queue);
00537 error:
00538         osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error));
00539         return FALSE;
00540 }
00541 
00542 void osync_queue_free(OSyncQueue *queue)
00543 {
00544         osync_trace(TRACE_ENTRY, "%s(%p)", __func__, queue);
00545         OSyncMessage *message = NULL;
00546         OSyncPendingMessage *pending = NULL;
00547         
00548         g_mutex_free(queue->pendingLock);
00549         
00550         g_main_context_unref(queue->context);
00551 
00552         _osync_queue_stop_incoming(queue);
00553 
00554         while ((message = g_async_queue_try_pop(queue->incoming))) {
00555                 osync_message_unref(message);
00556         }
00557         g_async_queue_unref(queue->incoming);
00558         
00559         while ((message = g_async_queue_try_pop(queue->outgoing))) {
00560                 osync_message_unref(message);
00561         }
00562         g_async_queue_unref(queue->outgoing);
00563 
00564         while (queue->pendingReplies) {
00565                 pending = queue->pendingReplies->data;
00566                 g_free(pending);
00567                 queue->pendingReplies = g_list_remove(queue->pendingReplies, pending);
00568         }
00569 
00570         if (queue->name)
00571                 g_free(queue->name);
00572                 
00573         g_free(queue);
00574         
00575         osync_trace(TRACE_EXIT, "%s", __func__);
00576 }
00577 
00578 osync_bool osync_queue_exists(OSyncQueue *queue)
00579 {
00580         return g_file_test(queue->name, G_FILE_TEST_EXISTS) ? TRUE : FALSE;
00581 }
00582 
00583 osync_bool osync_queue_create(OSyncQueue *queue, OSyncError **error)
00584 {       
00585         if (mkfifo(queue->name, 0600) != 0) {
00586                 osync_error_set(error, OSYNC_ERROR_GENERIC, "Unable to create fifo");
00587                 return FALSE;
00588         }
00589         
00590         return TRUE;
00591 }
00592 
00593 osync_bool osync_queue_remove(OSyncQueue *queue, OSyncError **error)
00594 {
00595         osync_trace(TRACE_ENTRY, "%s(%p, %p)", __func__, queue, error);
00596         
00597         if (unlink(queue->name) != 0) {
00598                 osync_error_set(error, OSYNC_ERROR_GENERIC, "Unable to remove queue");
00599                 osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error));
00600                 return FALSE;
00601         }
00602         
00603         osync_trace(TRACE_EXIT, "%s", __func__);
00604         return TRUE;
00605 }
00606 
00607 static osync_bool __osync_queue_connect(OSyncQueue *queue, OSyncQueueType type, osync_bool nonblocking, OSyncError **error)
00608 {
00609         osync_assert(queue);
00610         osync_assert(queue->connected == FALSE);
00611         OSyncQueue **queueptr = NULL;
00612         
00613         queue->type = type;
00614         
00615         if (queue->fd == -1) {
00616                 /* First, open the queue with the flags provided by the user */
00617                 int fd = open(queue->name, (type == OSYNC_QUEUE_SENDER ? O_WRONLY : O_RDONLY) | (nonblocking ? O_NONBLOCK : 0));
00618                 if (fd == -1) {
00619                         osync_error_set(error, OSYNC_ERROR_GENERIC, "Unable to open fifo");
00620                         goto error;
00621                 }
00622                 queue->fd = fd;
00623         
00624                 int oldflags = fcntl(queue->fd, F_GETFD);
00625                 if (oldflags == -1) {
00626                         osync_error_set(error, OSYNC_ERROR_GENERIC, "Unable to get fifo flags");
00627                         goto error_close;
00628                 }
00629                 if (fcntl(queue->fd, F_SETFD, oldflags|FD_CLOEXEC) == -1) {
00630                         osync_error_set(error, OSYNC_ERROR_GENERIC, "Unable to set fifo flags");
00631                         goto error_close;
00632                 }
00633         }
00634 
00635         queue->connected = TRUE;
00636         signal(SIGPIPE, SIG_IGN);
00637         
00638         /* now we start a thread which handles reading/writing of the queue */
00639         queue->thread = osync_thread_new(queue->context, error);
00640 
00641         if (!queue->thread)
00642                 goto error;
00643         
00644         queue->write_functions = g_malloc0(sizeof(GSourceFuncs));
00645         queue->write_functions->prepare = _queue_prepare;
00646         queue->write_functions->check = _queue_check;
00647         queue->write_functions->dispatch = _queue_dispatch;
00648         queue->write_functions->finalize = NULL;
00649 
00650         queue->write_source = g_source_new(queue->write_functions, sizeof(GSource) + sizeof(OSyncQueue *));
00651         queueptr = (OSyncQueue **)(queue->write_source + 1);
00652         *queueptr = queue;
00653         g_source_set_callback(queue->write_source, NULL, queue, NULL);
00654         g_source_attach(queue->write_source, queue->context);
00655         g_main_context_ref(queue->context);
00656 
00657         queue->read_functions = g_malloc0(sizeof(GSourceFuncs));
00658         queue->read_functions->prepare = _source_prepare;
00659         queue->read_functions->check = _source_check;
00660         queue->read_functions->dispatch = _source_dispatch;
00661         queue->read_functions->finalize = NULL;
00662 
00663         queue->read_source = g_source_new(queue->read_functions, sizeof(GSource) + sizeof(OSyncQueue *));
00664         queueptr = (OSyncQueue **)(queue->read_source + 1);
00665         *queueptr = queue;
00666         g_source_set_callback(queue->read_source, NULL, queue, NULL);
00667         g_source_attach(queue->read_source, queue->context);
00668         g_main_context_ref(queue->context);
00669         
00670         osync_thread_start(queue->thread);
00671         
00672         return TRUE;
00673 
00674 error_close:
00675         close(queue->fd);
00676 error:
00677         return FALSE;
00678 }
00679 
00680 
00681 osync_bool osync_queue_connect(OSyncQueue *queue, OSyncQueueType type, OSyncError **error)
00682 {
00683     return __osync_queue_connect(queue, type, FALSE, error);
00684 }
00685 
00686 osync_bool osync_queue_try_connect(OSyncQueue *queue, OSyncQueueType type, OSyncError **error)
00687 {
00688     return __osync_queue_connect(queue, type, TRUE, error);
00689 }
00690 
00691 osync_bool osync_queue_disconnect(OSyncQueue *queue, OSyncError **error)
00692 {
00693         osync_trace(TRACE_ENTRY, "%s(%p, %p)", __func__, queue, error);
00694         osync_assert(queue);
00695         
00696         if (queue->thread) {
00697                 osync_thread_stop(queue->thread);
00698                 osync_thread_free(queue->thread);
00699                 queue->thread = NULL;
00700         }
00701         
00702         //g_source_unref(queue->write_source);
00703         
00704         if (queue->write_functions)
00705                 g_free(queue->write_functions);
00706                 
00707         //g_source_unref(queue->read_source);
00708         
00709         _osync_queue_stop_incoming(queue);
00710         
00711         /* We have to empty the incoming queue if we disconnect the queue. Otherwise, the
00712          * consumer threads might try to pick up messages even after we are done. */
00713         OSyncMessage *message = NULL;
00714         while ((message = g_async_queue_try_pop(queue->incoming))) {
00715                 osync_message_unref(message);
00716         }
00717         
00718         if (close(queue->fd) != 0) {
00719                 osync_error_set(error, OSYNC_ERROR_GENERIC, "Unable to close queue");
00720                 osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error));
00721                 return FALSE;
00722         }
00723         
00724         queue->fd = -1;
00725         queue->connected = FALSE;
00726         
00727         osync_trace(TRACE_EXIT, "%s", __func__);
00728         return TRUE;
00729 }
00730 
00731 osync_bool osync_queue_is_connected(OSyncQueue *queue)
00732 {
00733         osync_assert(queue);
00734         return queue->connected;
00735 }
00736 
00746 void osync_queue_set_message_handler(OSyncQueue *queue, OSyncMessageHandler handler, gpointer user_data)
00747 {
00748         osync_trace(TRACE_ENTRY, "%s(%p, %p, %p)", __func__, queue, handler, user_data);
00749         
00750         queue->message_handler = handler;
00751         queue->user_data = user_data;
00752         
00753         osync_trace(TRACE_EXIT, "%s", __func__);
00754 }
00755 
00766 void osync_queue_setup_with_gmainloop(OSyncQueue *queue, GMainContext *context)
00767 {
00768         osync_trace(TRACE_ENTRY, "%s(%p, %p)", __func__, queue, context);
00769         
00770         queue->incoming_functions = g_malloc0(sizeof(GSourceFuncs));
00771         queue->incoming_functions->prepare = _incoming_prepare;
00772         queue->incoming_functions->check = _incoming_check;
00773         queue->incoming_functions->dispatch = _incoming_dispatch;
00774         queue->incoming_functions->finalize = NULL;
00775 
00776         queue->incoming_source = g_source_new(queue->incoming_functions, sizeof(GSource) + sizeof(OSyncQueue *));
00777         OSyncQueue **queueptr = (OSyncQueue **)(queue->incoming_source + 1);
00778         *queueptr = queue;
00779         g_source_set_callback(queue->incoming_source, NULL, queue, NULL);
00780         g_source_attach(queue->incoming_source, context);
00781         queue->incomingContext = context;
00782         // For the source
00783         g_main_context_ref(context);
00784         
00785         //To unref it later
00786         g_main_context_ref(context);
00787         
00788         osync_trace(TRACE_EXIT, "%s", __func__);
00789 }
00790 
00791 osync_bool osync_queue_dispatch(OSyncQueue *queue, OSyncError **error)
00792 {
00793         _incoming_dispatch(NULL, NULL, queue);
00794         return TRUE;
00795 }
00796 
00797 OSyncQueueEvent osync_queue_poll(OSyncQueue *queue)
00798 {
00799         struct pollfd pfd;
00800         pfd.fd = queue->fd;
00801         pfd.events = POLLIN;
00802         
00803         /* Here we poll on the queue. If we read on the queue, we either receive a 
00804          * POLLIN or POLLHUP. Since we cannot write to the queue, we can block pretty long here.
00805          * 
00806          * If we are sending, we can only receive a POLLERR which means that the remote side has
00807          * disconnected. Since we mainly dispatch the write IO, we dont want to block here. */
00808         int ret = poll(&pfd, 1, queue->type == OSYNC_QUEUE_SENDER ? 0 : 100);
00809         
00810         if (ret < 0 && errno == EINTR)
00811                 return OSYNC_QUEUE_EVENT_NONE;
00812 
00813         if (ret == 0)
00814                 return OSYNC_QUEUE_EVENT_NONE;  
00815         
00816         if (pfd.revents & POLLERR)
00817                 return OSYNC_QUEUE_EVENT_ERROR;
00818         else if (pfd.revents & POLLHUP)
00819                 return OSYNC_QUEUE_EVENT_HUP;
00820         else if (pfd.revents & POLLIN)
00821                 return OSYNC_QUEUE_EVENT_READ;
00822                 
00823         return OSYNC_QUEUE_EVENT_ERROR;
00824 }
00825 
00827 OSyncMessage *osync_queue_get_message(OSyncQueue *queue)
00828 {
00829         return g_async_queue_pop(queue->incoming);
00830 }
00831 
00832 void gen_id(long long int *part1, int *part2)
00833 {
00834         struct timeval tv;
00835     struct timezone tz;
00836 
00837     gettimeofday(&tv, &tz);
00838 
00839     long long int now = tv.tv_sec * 1000000 + tv.tv_usec;
00840     
00841     int rnd = (int)random();
00842     rnd = rnd << 16 | getpid();
00843     
00844     *part1 = now;
00845     *part2 = rnd;
00846 }
00847 
00848 osync_bool osync_queue_send_message(OSyncQueue *queue, OSyncQueue *replyqueue, OSyncMessage *message, OSyncError **error)
00849 {
00850         osync_trace(TRACE_ENTRY, "%s(%p, %p, %p, %p)", __func__, queue, replyqueue, message, error);
00851         
00852         if (message->callback) {
00853                 osync_assert(replyqueue);
00854                 OSyncPendingMessage *pending = osync_try_malloc0(sizeof(OSyncPendingMessage), error);
00855                 if (!pending)
00856                         goto error;
00857                 
00858                 gen_id(&(message->id1), &(message->id2));
00859                 pending->id1 = message->id1;
00860                 pending->id2 = message->id2;
00861                 
00862                 pending->callback = message->callback;
00863                 pending->user_data = message->user_data;
00864                 
00865                 g_mutex_lock(replyqueue->pendingLock);
00866                 replyqueue->pendingReplies = g_list_append(replyqueue->pendingReplies, pending);
00867                 g_mutex_unlock(replyqueue->pendingLock);
00868         }
00869         
00870         osync_message_ref(message);
00871         g_async_queue_push(queue->outgoing, message);
00872 
00873         g_main_context_wakeup(queue->context);
00874 
00875         osync_trace(TRACE_EXIT, "%s", __func__);
00876         return TRUE;
00877 
00878 error:
00879         osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error));
00880         return FALSE;
00881 }
00882 
00883 osync_bool osync_queue_send_message_with_timeout(OSyncQueue *queue, OSyncQueue *replyqueue, OSyncMessage *message, int timeout, OSyncError **error)
00884 {
00885         osync_trace(TRACE_ENTRY, "%s(%p, %p, %p)", __func__, queue, message, error);
00886         
00887         /*TODO: add timeout handling */
00888         
00889         osync_bool ret = osync_queue_send_message(queue, replyqueue, message, error);
00890         
00891         osync_trace(ret ? TRACE_EXIT : TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error));
00892         return ret;
00893 }
00894 
00895 osync_bool osync_queue_is_alive(OSyncQueue *queue)
00896 {
00897         
00898         if (!osync_queue_try_connect(queue, OSYNC_QUEUE_SENDER, NULL)) {
00899                 return FALSE;
00900         }
00901         
00902         OSyncMessage *message = osync_message_new(OSYNC_MESSAGE_NOOP, 0, NULL);
00903         if (!message) {
00904                 return FALSE;
00905         }
00906         
00907         if (!osync_queue_send_message(queue, NULL, message, NULL)) {
00908                 return FALSE;
00909         }
00910         
00911         osync_queue_disconnect(queue, NULL);
00912         
00913         return TRUE;
00914 }