OpenSync  0.22
osengine_client.c
1 /*
2  * libosengine - A synchronization engine for the opensync framework
3  * Copyright (C) 2004-2005 Armin Bauer <armin.bauer@opensync.org>
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2.1 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General Public
16  * License along with this library; if not, write to the Free Software
17  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18  *
19  */
20 
21 #include "config.h"
22 #include "engine.h"
23 #include <glib.h>
24 #include <opensync/opensync_support.h>
25 #include "opensync/opensync_format_internals.h"
26 #include "opensync/opensync_member_internals.h"
27 #include "opensync/opensync_message_internals.h"
28 #include "opensync/opensync_queue_internals.h"
29 
30 #include "engine_internals.h"
31 #include <unistd.h>
32 
33 #include <sys/types.h>
34 #include <sys/wait.h>
35 #include <errno.h>
36 #include <signal.h>
37 
43 void _get_changes_reply_receiver(OSyncMessage *message, OSyncClient *sender)
44 {
45  osync_trace(TRACE_ENTRY, "%s(%p, %p)", __func__, message, sender);
46  OSyncEngine *engine = sender->engine;
47 
48  if (osync_message_is_error(message)) {
49  OSyncError *error = NULL;
50  osync_demarshal_error(message, &error);
51  osync_error_duplicate(&engine->error, &error);
52  osync_debug("ENG", 1, "Get changes command reply was a error: %s", osync_error_print(&error));
53  osync_status_update_member(engine, sender, MEMBER_GET_CHANGES_ERROR, &error);
54  osync_error_update(&engine->error, "Unable to read from one of the members");
55  osync_flag_unset(sender->fl_sent_changes);
56  //osync_flag_set(sender->fl_finished);
57  osync_flag_set(sender->fl_done);
58  /*
59  * FIXME: For now we want to stop the engine if
60  * one of the member didnt connect yet. Later it should
61  * be that if >= 2 members connect, the sync should continue
62  */
63  osync_flag_set(engine->fl_stop);
64 
65  } else {
66  osync_status_update_member(engine, sender, MEMBER_SENT_CHANGES, NULL);
67  osync_flag_set(sender->fl_sent_changes);
68  }
69 
70  osengine_client_decider(engine, sender);
71  osync_trace(TRACE_EXIT, "_get_changes_reply_receiver");
72 }
73 
79 void _connect_reply_receiver(OSyncMessage *message, OSyncClient *sender)
80 {
81  osync_trace(TRACE_ENTRY, "_connect_reply_receiver(%p, %p)", message, sender);
82 
83  osync_trace(TRACE_INTERNAL, "connect reply %i", osync_message_is_error(message));
84  OSyncEngine *engine = sender->engine;
85 
86  if (osync_message_is_error(message)) {
87  OSyncError *error = NULL;
88  osync_demarshal_error(message, &error);
89  osync_error_duplicate(&engine->error, &error);
90  osync_debug("ENG", 1, "Connect command reply was a error: %s", osync_error_print(&error));
91  osync_status_update_member(engine, sender, MEMBER_CONNECT_ERROR, &error);
92  osync_error_update(&engine->error, "Unable to connect one of the members");
93  osync_flag_unset(sender->fl_connected);
94  osync_flag_set(sender->fl_finished);
95  osync_flag_set(sender->fl_sent_changes);
96  osync_flag_set(sender->fl_done);
97  /*
98  * FIXME: For now we want to stop the engine if
99  * one of the member didnt connect yet. Later it should
100  * be that if >= 2 members connect, the sync should continue
101  */
102  osync_flag_set(engine->fl_stop);
103 
104  } else {
105  osync_member_read_sink_info(sender->member, message);
106 
107  osync_status_update_member(engine, sender, MEMBER_CONNECTED, NULL);
108  osync_flag_set(sender->fl_connected);
109  }
110 
111  osengine_client_decider(engine, sender);
112  osync_trace(TRACE_EXIT, "_connect_reply_receiver");
113 }
114 
115 void _sync_done_reply_receiver(OSyncMessage *message, OSyncClient *sender)
116 {
117  osync_trace(TRACE_ENTRY, "_sync_done_reply_receiver(%p, %p)", message, sender);
118 
119  OSyncEngine *engine = sender->engine;
120 
121  if (osync_message_is_error(message)) {
122  OSyncError *error = NULL;
123  osync_demarshal_error(message, &error);
124  osync_error_duplicate(&engine->error, &error);
125  osync_debug("ENG", 1, "Sync done command reply was a error: %s", osync_error_print(&error));
126  osync_status_update_member(engine, sender, MEMBER_SYNC_DONE_ERROR, &error);
127  osync_error_update(&engine->error, "Unable to finish the sync for one of the members");
128  }
129 
130  osync_flag_set(sender->fl_done);
131  osengine_client_decider(engine, sender);
132  osync_trace(TRACE_EXIT, "_sync_done_reply_receiver");
133 }
134 
135 void _committed_all_reply_receiver(OSyncMessage *message, OSyncClient *sender)
136 {
137  osync_trace(TRACE_ENTRY, "%s(%p, %p)", __func__, message, sender);
138 
139  OSyncEngine *engine = sender->engine;
140 
141  if (osync_message_is_error(message)) {
142  OSyncError *error = NULL;
143  osync_demarshal_error(message, &error);
144  osync_error_duplicate(&engine->error, &error);
145  osync_debug("ENG", 1, "Committed all command reply was a error: %s", osync_error_print(&error));
146  osync_status_update_member(engine, sender, MEMBER_COMMITTED_ALL_ERROR, &error);
147  osync_error_update(&engine->error, "Unable to write changes to one of the members");
148  } else
149  osync_status_update_member(engine, sender, MEMBER_COMMITTED_ALL, NULL);
150 
151  osync_flag_set(sender->fl_committed_all);
152  osengine_client_decider(engine, sender);
153  osync_trace(TRACE_EXIT, "%s", __func__);
154 }
155 
156 void _disconnect_reply_receiver(OSyncMessage *message, OSyncClient *sender)
157 {
158  osync_trace(TRACE_ENTRY, "_disconnect_reply_receiver(%p, %p)", message, sender);
159 
160  OSyncEngine *engine = sender->engine;
161 
162  if (osync_message_is_error(message)) {
163  OSyncError *error = NULL;
164  osync_demarshal_error(message, &error);
165  osync_debug("ENG", 1, "Sync done command reply was a error: %s", osync_error_print(&error));
166  osync_status_update_member(engine, sender, MEMBER_DISCONNECT_ERROR, &error);
167  } else
168  osync_status_update_member(engine, sender, MEMBER_DISCONNECTED, NULL);
169 
170  osync_flag_unset(sender->fl_connected);
171  osync_flag_set(sender->fl_finished);
172  osengine_client_decider(engine, sender);
173  osync_trace(TRACE_EXIT, "_disconnect_reply_receiver");
174 }
175 
176 void _get_change_data_reply_receiver(OSyncMessage *message, OSyncMappingEntry *entry)
177 {
178  osync_trace(TRACE_ENTRY, "_get_change_data_reply_receiver(%p, %p, %p)", message, entry);
179  OSyncEngine *engine = entry->client->engine;
180 
181  if (osync_message_is_error(message)) {
182  OSyncError *error = NULL;
183  osync_demarshal_error(message, &error);
184  osync_error_duplicate(&engine->error, &error);
185  osync_debug("MAP", 1, "Commit change command reply was a error: %s", osync_error_print(&error));
186  osync_status_update_change(engine, entry->change, CHANGE_RECV_ERROR, &error);
187  osync_error_update(&engine->error, "Unable to read one or more objects");
188 
189  //FIXME Do we need to do anything here?
190  //osync_flag_unset(entry->fl_has_data);
191  } else {
192 
193  osync_demarshal_changedata(message, entry->change);
194 
195  osync_flag_set(entry->fl_has_data);
196  osync_status_update_change(engine, entry->change, CHANGE_RECEIVED, NULL);
197  }
198 
199  osync_change_save(entry->change, TRUE, NULL);
200  osengine_mappingentry_decider(engine, entry);
201  osync_trace(TRACE_EXIT, "_get_change_data_reply_receiver");
202 }
203 
204 void _read_change_reply_receiver(OSyncClient *sender, OSyncMessage *message, OSyncEngine *engine)
205 {
206  osync_trace(TRACE_ENTRY, "_read_change_reply_receiver(%p, %p, %p)", sender, message, engine);
207 
208  /*OSyncMappingEntry *entry = osync_message_get_data(message, "entry");
209 
210  osync_flag_detach(entry->fl_read);
211 
212  osync_flag_unset(entry->mapping->fl_solved);
213  osync_flag_unset(entry->mapping->fl_chkconflict);
214  osync_flag_unset(entry->mapping->fl_multiplied);
215 
216  if (osync_change_get_changetype(entry->change) == CHANGE_DELETED)
217  osync_flag_set(entry->fl_deleted);
218 
219  osync_flag_set(entry->fl_has_info);
220  osync_flag_unset(entry->fl_synced);
221 
222  osync_change_save(entry->change, TRUE, NULL);
223 
224  osync_status_update_change(engine, entry->change, CHANGE_RECEIVED, NULL);
225 
226  osengine_mappingentry_decider(engine, entry);*/
227  osync_trace(TRACE_EXIT, "_read_change_reply_receiver");
228 }
229 
230 void _commit_change_reply_receiver(OSyncMessage *message, OSyncMappingEntry *entry)
231 {
232  osync_trace(TRACE_ENTRY, "_commit_change_reply_receiver(%p, %p)", message, entry);
233  OSyncEngine *engine = entry->client->engine;
234 
235  if (osync_message_is_error(message)) {
236  OSyncError *error = NULL;
237  osync_demarshal_error(message, &error);
238  osync_error_duplicate(&engine->error, &error);
239  osync_debug("MAP", 1, "Commit change command reply was a error: %s", osync_error_print(&error));
240  osync_status_update_change(engine, entry->change, CHANGE_WRITE_ERROR, &error);
241  OSyncError *maperror = NULL;
242  osync_error_duplicate(&maperror, &error);
243  osync_status_update_mapping(engine, entry->mapping, MAPPING_WRITE_ERROR, &maperror);
244  osync_error_update(&engine->error, "Unable to write one or more objects");
245 
246  //FIXME Do we need to do anything here?
247  osync_flag_unset(entry->fl_dirty);
248  osync_flag_set(entry->fl_synced);
249  } else {
250  /* The plugin may have generated a new UID after committing the change. The commit
251  * change reply will return the new UID of the change
252  */
253 
254  char *newuid;
255  osync_message_read_string(message, &newuid);
256  osync_change_set_uid(entry->change, newuid);
257 
258  osync_status_update_change(engine, entry->change, CHANGE_SENT, NULL);
259  osync_flag_unset(entry->fl_dirty);
260  osync_flag_set(entry->fl_synced);
261  }
262 
263  if (osync_change_get_changetype(entry->change) == CHANGE_DELETED)
264  osync_flag_set(entry->fl_deleted);
265 
266  osync_change_reset(entry->change);
267 
268  OSyncError *error = NULL;
269  osync_change_save(entry->change, TRUE, &error);
270 
271  osengine_mappingentry_decider(engine, entry);
272  osync_trace(TRACE_EXIT, "_commit_change_reply_receiver");
273 }
274 
275 OSyncClient *osync_client_new(OSyncEngine *engine, OSyncMember *member, OSyncError **error)
276 {
277  osync_trace(TRACE_ENTRY, "%s(%p, %p, %p)", __func__, engine, member, error);
278  OSyncClient *client = osync_try_malloc0(sizeof(OSyncClient), error);
279  if (!client)
280  goto error;
281 
282  client->member = member;
283  osync_member_set_data(member, client);
284  client->engine = engine;
285  engine->clients = g_list_append(engine->clients, client);
286 
287  char *name = g_strdup_printf("%s/pluginpipe", osync_member_get_configdir(member));
288  client->commands_to_osplugin = osync_queue_new(name, error);
289  g_free(name);
290 
291  name = g_strdup_printf("%s/enginepipe", osync_member_get_configdir(member));
292  client->commands_from_osplugin = osync_queue_new(name, error);
293  g_free(name);
294 
295  if (!client->commands_to_osplugin || !client->commands_from_osplugin)
296  goto error_free_client;
297 
298  client->fl_connected = osync_flag_new(engine->cmb_connected);
299  client->fl_sent_changes = osync_flag_new(engine->cmb_sent_changes);
300  client->fl_done = osync_flag_new(NULL);
301  client->fl_committed_all = osync_flag_new(engine->cmb_committed_all_sent);
302  client->fl_finished = osync_flag_new(engine->cmb_finished);
303 
304  osync_trace(TRACE_EXIT, "%s: %p", __func__, client);
305  return client;
306 
307 error_free_client:
308  g_free(client);
309 error:
310  osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error));
311  return NULL;
312 }
313 
314 void osync_client_reset(OSyncClient *client)
315 {
316  osync_trace(TRACE_ENTRY, "%s(%p)", __func__, client);
317  osync_flag_set_state(client->fl_connected, FALSE);
318  osync_flag_set_state(client->fl_sent_changes, FALSE);
319  osync_flag_set_state(client->fl_done, FALSE);
320  osync_flag_set_state(client->fl_finished, FALSE);
321  osync_flag_set_state(client->fl_committed_all, FALSE);
322  osync_trace(TRACE_EXIT, "%s", __func__);
323 }
324 
325 void osync_client_free(OSyncClient *client)
326 {
327  osync_trace(TRACE_ENTRY, "%s(%p)", __func__, client);
328  osync_queue_free(client->commands_to_osplugin);
329  osync_queue_free(client->commands_from_osplugin);
330 
331  osync_flag_free(client->fl_connected);
332  osync_flag_free(client->fl_sent_changes);
333  osync_flag_free(client->fl_done);
334  osync_flag_free(client->fl_finished);
335  osync_flag_free(client->fl_committed_all);
336 
337  g_free(client);
338  osync_trace(TRACE_EXIT, "%s", __func__);
339 }
340 
341 void *osync_client_message_sink(OSyncMember *member, const char *name, void *data, osync_bool synchronous)
342 {
343  OSyncClient *client = osync_member_get_data(member);
344  OSyncEngine *engine = client->engine;
345  if (!synchronous) {
346  /*OSyncMessage *message = itm_message_new_signal(client, "PLUGIN_MESSAGE");
347  osync_debug("CLI", 3, "Sending message %p PLUGIN_MESSAGE for message %s", message, name);
348  itm_message_set_data(message, "data", data);
349  itm_message_set_data(message, "name", g_strdup(name));
350  itm_queue_send(engine->incoming, message);*/
351  return NULL;
352  } else {
353  return engine->plgmsg_callback(engine, client, name, data, engine->plgmsg_userdata);
354  }
355 }
356 
357 OSyncPluginTimeouts osync_client_get_timeouts(OSyncClient *client)
358 {
359  return osync_plugin_get_timeouts(osync_member_get_plugin(client->member));
360 }
361 
362 void osync_client_call_plugin(OSyncClient *client, char *function, void *data, OSyncPluginReplyHandler replyhandler, void *userdata)
363 {
364  osync_trace(TRACE_ENTRY, "%s(%p, %s, %p, %p, %p)", __func__, client, function, data, replyhandler, userdata);
365 
366  /*OSyncEngine *engine = client->engine;
367  ITMessage *message = itm_message_new_methodcall(engine, "CALL_PLUGIN");
368  itm_message_set_data(message, "data", data);
369  itm_message_set_data(message, "function", g_strdup(function));
370 
371  if (replyhandler) {
372  OSyncPluginCallContext *ctx = g_malloc0(sizeof(OSyncPluginCallContext));
373  ctx->handler = replyhandler;
374  ctx->userdata = userdata;
375  itm_message_set_handler(message, engine->incoming, (ITMessageHandler)_recv_plugin_answer, ctx);
376 
377  itm_message_set_data(message, "want_reply", GINT_TO_POINTER(1));
378  } else
379  itm_message_set_data(message, "want_reply", GINT_TO_POINTER(0));
380 
381  itm_queue_send(client->incoming, message);*/
382 
383  osync_trace(TRACE_EXIT, "%s", __func__);
384 }
385 
386 osync_bool osync_client_get_changes(OSyncClient *target, OSyncEngine *sender, OSyncError **error)
387 {
388  osync_trace(TRACE_ENTRY, "%s(%p, %p, %p)", __func__, target, sender, error);
389 
390  osync_flag_changing(target->fl_sent_changes);
391 
392  OSyncMessage *message = osync_message_new(OSYNC_MESSAGE_GET_CHANGES, 0, error);
393  if (!message)
394  goto error;
395 
396  osync_message_set_handler(message, (OSyncMessageHandler)_get_changes_reply_receiver, target);
397 
398  osync_member_write_sink_info(target->member, message);
399 
400  OSyncPluginTimeouts timeouts = osync_client_get_timeouts(target);
401  if (!osync_queue_send_message_with_timeout(target->commands_to_osplugin, target->commands_from_osplugin, message, timeouts.get_changeinfo_timeout, error))
402  goto error_free_message;
403 
404  osync_message_unref(message);
405 
406  osync_trace(TRACE_EXIT, "%s", __func__);
407  return TRUE;
408 
409 error_free_message:
410  osync_message_unref(message);
411 error:
412  osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error));
413  return FALSE;
414 }
415 
416 osync_bool osync_client_get_change_data(OSyncClient *target, OSyncEngine *sender, OSyncMappingEntry *entry, OSyncError **error)
417 {
418  osync_flag_changing(entry->fl_has_data);
419 
420  OSyncMessage *message = osync_message_new(OSYNC_MESSAGE_GET_CHANGEDATA, 0, error);
421  if (!message)
422  goto error;
423 
424  osync_message_set_handler(message, (OSyncMessageHandler)_get_change_data_reply_receiver, entry);
425 
426  osync_marshal_change(message, entry->change);
427 
428  osync_debug("ENG", 3, "Sending get_changedata message %p to client %p", message, entry->client);
429 
430  OSyncPluginTimeouts timeouts = osync_client_get_timeouts(target);
431  if (!osync_queue_send_message_with_timeout(target->commands_to_osplugin, target->commands_from_osplugin, message, timeouts.get_data_timeout, error))
432  goto error_free_message;
433 
434  osync_message_unref(message);
435 
436  osync_trace(TRACE_EXIT, "%s", __func__);
437  return TRUE;
438 
439 error_free_message:
440  osync_message_unref(message);
441 error:
442  osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error));
443  return FALSE;
444 }
445 
446 /*void osync_client_read_change(OSyncEngine *sender, OSyncMappingEntry *entry)
447 {
448  //osync_flag_changing(entry->fl_has_data);
449  OSyncMessage *message = osync_message_new_methodcall(sender, "READ_CHANGE");
450  osync_message_set_handler(message, sender->incoming, (OSyncMessageHandler)_read_change_reply_receiver, sender);
451  osync_message_set_data(message, "change", entry->change);
452  osync_message_set_data(message, "entry", entry);
453  osync_debug("ENG", 3, "Sending read_change message %p to client %p", message, entry->client);
454 
455  OSyncPluginTimeouts timeouts = osync_client_get_timeouts(entry->client);
456  osync_queue_send_with_timeout(entry->client->incoming, message, timeouts.read_change_timeout, sender);
457 }*/
458 
459 osync_bool osync_client_connect(OSyncClient *target, OSyncEngine *sender, OSyncError **error)
460 {
461  osync_trace(TRACE_ENTRY, "%s(%p, %p, %p)", __func__, target, sender, error);
462 
463  osync_flag_changing(target->fl_connected);
464 
465  OSyncMessage *message = osync_message_new(OSYNC_MESSAGE_CONNECT, 0, error);
466  if (!message)
467  goto error;
468 
469  osync_member_write_sink_info(target->member, message);
470 
471  osync_message_set_handler(message, (OSyncMessageHandler)_connect_reply_receiver, target);
472 
473  OSyncPluginTimeouts timeouts = osync_client_get_timeouts(target);
474  if (!osync_queue_send_message_with_timeout(target->commands_to_osplugin, target->commands_from_osplugin, message, timeouts.connect_timeout, error))
475  goto error_free_message;
476 
477  osync_message_unref(message);
478 
479  osync_trace(TRACE_EXIT, "%s", __func__);
480  return TRUE;
481 
482 error_free_message:
483  osync_message_unref(message);
484 error:
485  osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error));
486  return FALSE;
487 }
488 
489 osync_bool osync_client_commit_change(OSyncClient *target, OSyncEngine *sender, OSyncMappingEntry *entry, OSyncError **error)
490 {
491  osync_trace(TRACE_ENTRY, "%s(%p, %p, %p)", __func__, target, sender, entry);
492  osync_trace(TRACE_INTERNAL, "Committing change with uid %s, changetype %i, data %p, size %i, objtype %s and format %s from member %lli", osync_change_get_uid(entry->change), osync_change_get_changetype(entry->change), osync_change_get_data(entry->change), osync_change_get_datasize(entry->change), osync_change_get_objtype(entry->change) ? osync_objtype_get_name(osync_change_get_objtype(entry->change)) : "None", osync_change_get_objformat(entry->change) ? osync_objformat_get_name(osync_change_get_objformat(entry->change)) : "None", osync_member_get_id(entry->client->member));
493 
494  osync_flag_changing(entry->fl_dirty);
495 
496  // convert the data to the format accepted by the member
497  if (!osync_change_convert_member_sink(osync_group_get_format_env(sender->group), entry->change, target->member, error))
498  goto error;
499 
500  if (osync_change_get_changetype(entry->change) == CHANGE_ADDED) {
501  int elevated = 0;
502  // Generate a new UID, if necessary
503  OSyncMappingView *view = osengine_mappingtable_find_view(sender->maptable, target->member);
504  while (!osengine_mappingview_uid_is_unique(view, entry, TRUE)) {
505  if (!osync_change_elevate(sender, entry->change, 1))
506  break;
507  elevated++;
508  }
509 
510  if (elevated) {
511  // Save the newly generated UID
512  if (!osync_change_save(entry->change, TRUE, error))
513  goto error;
514  }
515  }
516 
517  OSyncMessage *message = osync_message_new(OSYNC_MESSAGE_COMMIT_CHANGE, 0, error);
518  if (!message)
519  goto error;
520 
521  osync_marshal_change(message, entry->change);
522 
523  osync_message_set_handler(message, (OSyncMessageHandler)_commit_change_reply_receiver, entry);
524  OSyncPluginTimeouts timeouts = osync_client_get_timeouts(entry->client);
525 
526  if (!osync_queue_send_message_with_timeout(target->commands_to_osplugin, target->commands_from_osplugin, message, timeouts.commit_timeout, error))
527  goto error_free_message;
528 
529  osync_message_unref(message);
530 
531  g_assert(osync_flag_is_attached(entry->fl_committed) == TRUE);
532  osync_flag_detach(entry->fl_committed);
533 
534  osync_trace(TRACE_EXIT, "%s", __func__);
535  return TRUE;
536 
537 error_free_message:
538  osync_message_unref(message);
539 error:
540  osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error));
541  return FALSE;
542 }
543 
544 osync_bool osync_client_sync_done(OSyncClient *target, OSyncEngine *sender, OSyncError **error)
545 {
546  osync_trace(TRACE_ENTRY, "%s(%p, %p, %p)", __func__, target, sender, error);
547 
548  osync_flag_changing(target->fl_done);
549  OSyncMessage *message = osync_message_new(OSYNC_MESSAGE_SYNC_DONE, 0, error);
550  if (!message)
551  goto error;
552 
553  osync_message_set_handler(message, (OSyncMessageHandler)_sync_done_reply_receiver, target);
554 
555  OSyncPluginTimeouts timeouts = osync_client_get_timeouts(target);
556  if (!osync_queue_send_message_with_timeout(target->commands_to_osplugin, target->commands_from_osplugin, message, timeouts.sync_done_timeout, error))
557  goto error_free_message;
558 
559  osync_message_unref(message);
560 
561  osync_trace(TRACE_EXIT, "%s", __func__);
562  return TRUE;
563 
564 error_free_message:
565  osync_message_unref(message);
566 error:
567  osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error));
568  return FALSE;
569 }
570 
571 osync_bool osync_client_committed_all(OSyncClient *target, OSyncEngine *sender, OSyncError **error)
572 {
573  osync_trace(TRACE_ENTRY, "%s(%p, %p)", __func__, target, sender);
574 
575  osync_flag_changing(target->fl_committed_all);
576 
577  OSyncMessage *message = osync_message_new(OSYNC_MESSAGE_COMMITTED_ALL, 0, error);
578  if (!message)
579  goto error;
580 
581  osync_message_set_handler(message, (OSyncMessageHandler)_committed_all_reply_receiver, target);
582 
583  //OSyncPluginTimeouts timeouts = osync_client_get_timeouts(target);
584  /*FIXME: Add timeout to committed_all message */
585  if (!osync_queue_send_message(target->commands_to_osplugin, target->commands_from_osplugin, message, error))
586  goto error_free_message;
587 
588  osync_message_unref(message);
589 
590  osync_trace(TRACE_EXIT, "%s", __func__);
591  return TRUE;
592 
593 error_free_message:
594  osync_message_unref(message);
595 error:
596  osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error));
597  return FALSE;
598 }
599 
600 osync_bool osync_client_disconnect(OSyncClient *target, OSyncEngine *sender, OSyncError **error)
601 {
602  osync_trace(TRACE_ENTRY, "%s(%p, %p)", __func__, target, sender);
603 
604  osync_flag_changing(target->fl_connected);
605 
606  OSyncMessage *message = osync_message_new(OSYNC_MESSAGE_DISCONNECT, 0, error);
607  if (!message)
608  goto error;
609 
610  osync_message_set_handler(message, (OSyncMessageHandler)_disconnect_reply_receiver, target);
611 
612  OSyncPluginTimeouts timeouts = osync_client_get_timeouts(target);
613  if (!osync_queue_send_message_with_timeout(target->commands_to_osplugin, target->commands_from_osplugin, message, timeouts.disconnect_timeout, error))
614  goto error_free_message;
615 
616  osync_message_unref(message);
617 
618  osync_trace(TRACE_EXIT, "%s", __func__);
619  return TRUE;
620 
621 error_free_message:
622  osync_message_unref(message);
623 error:
624  osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error));
625  return FALSE;
626 }
627 
628 
629 /*
630 void osync_client_call_plugin_with_reply(OSyncClient *client, char *function, void *data, void ( *replyhandler)(OSyncEngine *, OSyncClient *, void *, OSyncError *), int timeout)
631 {
632  OSyncEngine *engine = client->engine;
633  ITMessage *message = itm_message_new_signal(engine, "CALL_PLUGIN");
634  osync_debug("CLI", 3, "Sending message %p CALL_PLUGIN for function %s", message, function);
635  itm_message_set_data(message, "data", data);
636  itm_message_set_data(message, "function", g_strdup(function));
637  itm_queue_send_with_reply(client->incoming, message);
638 }*/
639 
640 char *osync_client_pid_filename(OSyncClient *client)
641 {
642  return g_strdup_printf("%s/osplugin.pid", client->member->configdir);
643 }
644 
645 osync_bool osync_client_remove_pidfile(OSyncClient *client, OSyncError **error)
646 {
647  osync_bool ret = FALSE;
648  char *pidpath = osync_client_pid_filename(client);
649 
650  if (unlink(pidpath) < 0) {
651  osync_error_set(error, OSYNC_ERROR_GENERIC, "Couldn't remove pid file: %s", strerror(errno));
652  goto out_free_path;
653  }
654 
655  /* Success */
656  ret = TRUE;
657 
658 out_free_path:
659  g_free(pidpath);
660 //out:
661  return ret;
662 }
663 
664 osync_bool osync_client_create_pidfile(OSyncClient *client, OSyncError **error)
665 {
666  osync_bool ret = FALSE;
667  char *pidpath = osync_client_pid_filename(client);
668  char *pidstr = g_strdup_printf("%ld", (long)client->child_pid);
669 
670  if (!osync_file_write(pidpath, pidstr, strlen(pidstr), 0644, error))
671  goto out_free_pidstr;
672 
673  /* Success */
674  ret = TRUE;
675 
676 out_free_pidstr:
677  g_free(pidstr);
678 //out_free_path:
679  g_free(pidpath);
680 //out:
681  return ret;
682 }
683 
684 osync_bool osync_client_kill_old_osplugin(OSyncClient *client, OSyncError **error)
685 {
686  osync_bool ret = FALSE;
687 
688  char *pidstr;
689  int pidlen;
690  pid_t pid;
691 
692  char *pidpath = osync_client_pid_filename(client);
693 
694  /* Simply returns if there is no PID file */
695  if (!g_file_test(pidpath, G_FILE_TEST_EXISTS)) {
696  ret = TRUE;
697  goto out_free_path;
698  }
699 
700  if (!osync_file_read(pidpath, &pidstr, &pidlen, error))
701  goto out_free_path;
702 
703  pid = atol(pidstr);
704  if (!pid)
705  goto out_free_str;
706 
707  osync_trace(TRACE_INTERNAL, "Killing old osplugin process. PID: %ld", (long)pid);
708 
709  if (kill(pid, SIGTERM) < 0) {
710  osync_trace(TRACE_INTERNAL, "Error killing old osplugin: %s. Stale pid file?", strerror(errno));
711  /* Don't return failure if kill() failed, because it may be a stale pid file */
712  }
713 
714  int count = 0;
715  while (osync_queue_is_alive(client->commands_to_osplugin)) {
716  if (count++ > 10) {
717  osync_trace(TRACE_INTERNAL, "Killing old osplugin process with SIGKILL");
718  kill(pid, SIGKILL);
719  break;
720  }
721  osync_trace(TRACE_INTERNAL, "Waiting for other side to terminate");
722  /*FIXME: Magic numbers are evil */
723  usleep(500000);
724  }
725 
726  if (unlink(pidpath) < 0) {
727  osync_error_set(error, OSYNC_ERROR_GENERIC, "Couldn't erase PID file: %s", strerror(errno));
728  goto out_free_str;
729  }
730 
731  /* Success */
732  ret = TRUE;
733 
734 out_free_str:
735  g_free(pidstr);
736 out_free_path:
737  g_free(pidpath);
738 //out:
739  return ret;
740 }
741 
742 
743 osync_bool osync_client_spawn(OSyncClient *client, OSyncEngine *engine, OSyncError **error)
744 {
745  osync_trace(TRACE_ENTRY, "%s(%p, %p, %p)", __func__, client, engine, error);
746 
747  int waiting = 0;
748 
749  if (!osync_client_kill_old_osplugin(client, error))
750  goto error;
751 
752  if (!osync_queue_exists(client->commands_to_osplugin) || !osync_queue_is_alive(client->commands_to_osplugin)) {
753  pid_t cpid = fork();
754  if (cpid == 0) {
756 
757  /* Export all options to osplugin through environment variables */
759 
760  OSyncMember *member = client->member;
761  OSyncPlugin *plugin = osync_member_get_plugin(member);
762  const char *path = osync_plugin_get_path(plugin);
763  setenv("OSYNC_MODULE_LIST", path, 1);
764 
766 
767  char *memberstring = g_strdup_printf("%lli", osync_member_get_id(client->member));
768  execlp(OSPLUGIN, OSPLUGIN, osync_group_get_configdir(engine->group), memberstring, NULL);
769 
770  if (errno == ENOENT) {
771  execlp("./osplugin", "osplugin", osync_group_get_configdir(engine->group), memberstring, NULL);
772  }
773 
774  osync_trace(TRACE_INTERNAL, "unable to exec");
775  exit(1);
776  }
777 
778  client->child_pid = cpid;
779 
780  /* We are going to wait 5 seconds for plugin */
781  while (!osync_queue_exists(client->commands_to_osplugin) && waiting <= 5) {
782  osync_trace(TRACE_INTERNAL, "Waiting for other side to create fifo");
783 
784  sleep(1);
785  waiting++;
786  }
787 
788  osync_trace(TRACE_INTERNAL, "Queue was created");
789  }
790 
791  if (client->child_pid) {
792  if (!osync_client_create_pidfile(client, error))
793  goto error;
794  }
795 
796  if (!osync_queue_connect(client->commands_to_osplugin, OSYNC_QUEUE_SENDER, error))
797  goto error;
798 
799  OSyncMessage *message = osync_message_new(OSYNC_MESSAGE_INITIALIZE, 0, error);
800  if (!message)
801  goto error_disconnect;
802 
803  osync_message_write_string(message, client->commands_from_osplugin->name);
804 
805  if (!osync_queue_send_message(client->commands_to_osplugin, NULL, message, error))
806  goto error_free_message;
807 
808  osync_message_unref(message);
809 
810  osync_trace(TRACE_EXIT, "%s", __func__);
811  return TRUE;
812 
813 error_free_message:
814  osync_message_unref(message);
815 error_disconnect:
816  osync_queue_disconnect(client->commands_to_osplugin, NULL);
817 error:
818  osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error));
819  return FALSE;
820 }
821 
822 osync_bool osync_client_init(OSyncClient *client, OSyncEngine *engine, OSyncError **error)
823 {
824  osync_trace(TRACE_ENTRY, "%s(%p, %p, %p)", __func__, client, engine, error);
825 
826  OSyncMessage *reply = osync_queue_get_message(client->commands_from_osplugin);
827 
828  osync_trace(TRACE_INTERNAL, "reply received %i", reply->cmd);
829  if (reply->cmd == OSYNC_MESSAGE_ERRORREPLY) {
830  if (error)
831  osync_demarshal_error(reply, error);
832  goto error_free_reply;
833  }
834 
835  if (reply->cmd != OSYNC_MESSAGE_REPLY) {
836  osync_error_set(error, OSYNC_ERROR_GENERIC, "Invalid answer from plugin process");
837  goto error_free_reply;
838  }
839 
840  osync_message_unref(reply);
841 
842  osync_trace(TRACE_EXIT, "%s", __func__);
843  return TRUE;
844 
845 error_free_reply:
846  osync_message_unref(reply);
847  osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error));
848  return FALSE;
849 }
850 
851 osync_bool osync_client_finalize(OSyncClient *client, OSyncError **error)
852 {
853  osync_trace(TRACE_ENTRY, "%s(%p, %p)", __func__, client, error);
854 
855  OSyncMessage *message = osync_message_new(OSYNC_MESSAGE_FINALIZE, 0, error);
856  if (!message)
857  goto error;
858 
859  if (!osync_queue_send_message(client->commands_to_osplugin, NULL, message, error))
860  goto error_free_message;
861 
862  osync_message_unref(message);
863 
864  if (client->child_pid) {
865  int status;
866  if (waitpid(client->child_pid, &status, 0) == -1) {
867  osync_error_set(error, OSYNC_ERROR_GENERIC, "Error waiting for osplugin process: %s", strerror(errno));
868  goto error;
869  }
870 
871  if (!WIFEXITED(status))
872  osync_trace(TRACE_INTERNAL, "Child has exited abnormally");
873  else if (WEXITSTATUS(status) != 0)
874  osync_trace(TRACE_INTERNAL, "Child has returned non-zero exit status (%d)", WEXITSTATUS(status));
875 
876  if (!osync_client_remove_pidfile(client, error))
877  goto error;
878  }
879 
880  osync_queue_disconnect(client->commands_to_osplugin, NULL);
881 
882 
883  osync_trace(TRACE_EXIT, "%s", __func__);
884  return TRUE;
885 
886 error_free_message:
887  osync_message_unref(message);
888 error:
889  osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error));
890  return FALSE;
891 }