25 #include <sys/types.h>
29 #include <opensync/opensync_support.h>
30 #include "opensync/opensync_message_internals.h"
31 #include "opensync/opensync_queue_internals.h"
32 #include "opensync/opensync_format_internals.h"
34 #include "engine_internals.h"
35 #include <opensync/opensync_user_internals.h>
/*
 * Forward declaration of the mapping-table lookup used by the change-handling
 * code in this file. Callers here pass the engine's mapping table and a UID
 * (objtype is passed as NULL in at least one call) and test the returned
 * pointer for NULL.
 * NOTE(review): the exact matching rules for objtype/memberid are defined in
 * the mapping-table implementation, which is not visible here.
 */
37 OSyncMappingEntry *osengine_mappingtable_find_entry(OSyncMappingTable *table,
const char *uid,
const char *objtype,
long long int memberid);
/*
 * Excerpt from the routine that processes a change reported by a member. The
 * function signature and several intermediate lines are outside this excerpt.
 */
70 osync_trace(
TRACE_INTERNAL,
"Handling new change with uid %s, changetype %i, objtype %s and format %s from member %lli", uid, change_type,
/* Records whether the change's object format is named "file". */
83 osync_bool is_file_objformat = FALSE;
/*
 * NOTE(review): objformat->name is read here while the condition below still
 * tests !objformat; confirm a NULL guard precedes this expression.
 */
86 ((!strcmp(objformat->name,
"file"))?(TRUE):(FALSE));
87 if ( (!objtype) || (!objformat) ||
89 (!strcmp(objformat->name,
"plain"))) {
/* Prefer objtype_test when it is non-NULL, otherwise keep objtype. */
91 objtype = (objtype_test)?(objtype_test):(objtype);
/* The mapping-table lookup below is only consulted for non-"file" formats. */
104 ( (!is_file_objformat) &&
105 (!osengine_mappingtable_find_entry(
106 engine->maptable, uid,
123 ( !osengine_mappingtable_find_entry(
124 engine->maptable, uid,
/* Look the entry up by UID with objtype passed as NULL. */
128 OSyncMappingEntry *entry =
129 osengine_mappingtable_find_entry(
130 engine->maptable, uid, NULL,
140 "Could not find one entry with UID=%s to delete.", uid);
148 osync_trace(
TRACE_INTERNAL,
"Handling new change with uid %s, changetype %i, data %p, size %i, objtype %s and format %s from member %lli", uid, change_type,
osync_change_get_data(change),
osync_change_get_datasize(change), objtype ?
osync_objtype_get_name(objtype) :
"None",
osync_change_get_objformat(change) ?
osync_objformat_get_name(
osync_change_get_objformat(change)) :
"None",
osync_member_get_id(client->member));
152 "ObjType not set for uid %s.", uid);
/*
 * Store the change in the mapping table and continue with the change object
 * held by the returned entry.
 * NOTE(review): entry is dereferenced without a visible NULL check; confirm
 * osengine_mappingtable_store_change cannot return NULL.
 */
157 OSyncMappingEntry *entry = osengine_mappingtable_store_change(engine->maptable, change);
158 change = entry->change;
163 osync_flag_unset(entry->fl_has_data);
/*
 * An entry without a mapping has its fl_mapped flag attached to the engine's
 * cmb_entries_mapped flag and is marked as not mapped.
 */
172 if (!entry->mapping) {
173 osync_flag_attach(entry->fl_mapped, engine->cmb_entries_mapped);
174 osync_flag_unset(entry->fl_mapped);
/*
 * Presumably the branch for an entry that already has a mapping (it
 * dereferences entry->mapping): mark it mapped and clear the mapping's
 * solved / conflict-check / multiplied flags.
 */
178 osync_flag_set(entry->fl_mapped);
179 osync_flag_unset(entry->mapping->fl_solved);
180 osync_flag_unset(entry->mapping->fl_chkconflict);
181 osync_flag_unset(entry->mapping->fl_multiplied);
186 osync_flag_set(entry->fl_has_data);
190 osync_flag_unset(entry->fl_has_data);
195 osync_flag_set(entry->fl_deleted);
/* Mark the info as received, clear the synced state, then run the decider. */
197 osync_flag_set(entry->fl_has_info);
198 osync_flag_unset(entry->fl_synced);
200 osengine_mappingentry_decider(engine, entry);
/*
 * Walks engine->clients and inspects each OSyncClient in turn.
 * NOTE(review): the match condition and return statements are outside this
 * excerpt; presumably the client whose member id equals memberId is returned.
 */
211 OSyncClient *osengine_get_client(OSyncEngine *engine,
long long int memberId)
214 for (c = engine->clients; c; c = c->next) {
215 OSyncClient *client = c->data;
/*
 * Sends an "ENGINE_CHANGED" message to the engine's own command queue
 * (engine->commands_to_self). engine->is_initialized is tested first; the
 * action taken for an uninitialized engine is outside this excerpt.
 */
223 void send_engine_changed(OSyncEngine *engine)
225 if (!engine->is_initialized)
231 osync_debug(
"ENG", 4,
"Sending message %p:\"ENGINE_CHANGED\"", message);
232 osync_queue_send_message(engine->commands_to_self, NULL, message, NULL);
/*
 * Sends a message carrying mapping->id to the engine's own command queue.
 * The id is written as a long long int; the MAPPING_CHANGED case of the
 * message handler in this file reads it back the same way.
 */
235 void send_mapping_changed(OSyncEngine *engine, OSyncMapping *mapping)
238 osync_message_write_long_long_int(message, mapping->id);
241 osync_queue_send_message(engine->commands_to_self, NULL, message, NULL);
/*
 * Sends a message identifying a mapping entry to the engine's own command
 * queue. The entry is identified by its address, encoded as a long long int.
 */
245 void send_mappingentry_changed(OSyncEngine *engine, OSyncMappingEntry *entry)
/*
 * NOTE(review): the pointer is converted through `long`. On platforms where
 * long is narrower than a pointer (e.g. 64-bit Windows) this truncates the
 * address; intptr_t is the type intended for pointer/integer round-trips.
 */
250 long long ptr = (
long long)(
long)entry;
251 osync_message_write_long_long_int(message, ptr);
254 osync_queue_send_message(engine->commands_to_self, NULL, message, NULL);
/*
 * Dispatches one message received by the engine according to its command
 * type. Only part of each case is visible in this excerpt.
 */
265 static void engine_message_handler(
OSyncMessage *message, OSyncEngine *engine)
274 case OSYNC_MESSAGE_SYNCHRONIZE:
276 osengine_client_all_deciders(engine);
278 case OSYNC_MESSAGE_NEW_CHANGE:
/* The message carries the sending member's id; resolve it to a client. */
281 long long int member_id = 0;
282 osync_message_read_long_long_int(message, &member_id);
283 OSyncClient *sender = osengine_get_client(engine, member_id);
287 case OSYNC_MESSAGE_ENGINE_CHANGED:
/* Run all client and mapping deciders, then notify for every unmapped entry. */
288 osengine_client_all_deciders(engine);
289 osengine_mapping_all_deciders(engine);
291 for (u = engine->maptable->unmapped; u; u = u->next) {
292 OSyncMappingEntry *unmapped = u->data;
293 send_mappingentry_changed(engine, unmapped);
296 case OSYNC_MESSAGE_MAPPING_CHANGED:
299 osync_message_read_long_long_int(message, &
id);
301 OSyncMapping *mapping = osengine_mappingtable_mapping_from_id(engine->maptable,
id);
/* Check that the mapping is present in the table's mappings list. */
303 if (!g_list_find(engine->maptable->mappings, mapping)) {
308 osengine_mapping_decider(engine, mapping);
311 case OSYNC_MESSAGE_MAPPINGENTRY_CHANGED:
/*
 * The entry address is decoded from the integer written by
 * send_mappingentry_changed.
 * NOTE(review): the cast through `long` truncates where long is narrower than
 * a pointer; intptr_t would be round-trip-safe.
 */
314 osync_message_read_long_long_int(message, &ptr);
315 OSyncMappingEntry *entry = (OSyncMappingEntry*)(
long)ptr;
/* The decoded pointer is looked up in both the entries and unmapped lists. */
317 if (!g_list_find(engine->maptable->entries, entry) && !g_list_find(engine->maptable->unmapped, entry)) {
322 osengine_mappingentry_decider(engine, entry);
325 case OSYNC_MESSAGE_SYNC_ALERT:
/* A sync alert only starts the engine when allow_sync_alert is set. */
326 if (engine->allow_sync_alert)
327 osync_flag_set(engine->fl_running);
/*
 * Registered in this file as the positive trigger of engine->cmb_sent_changes.
 * Signals the info_received condition under its mutex, injects changes into
 * the mapping table and then sends an engine-changed message.
 */
366 static void trigger_clients_sent_changes(OSyncEngine *engine)
371 g_mutex_lock(engine->info_received_mutex);
372 g_cond_signal(engine->info_received);
373 g_mutex_unlock(engine->info_received_mutex);
376 osengine_mappingtable_inject_changes(engine->maptable);
378 send_engine_changed(engine);
/*
 * Registered in this file as the positive trigger of engine->cmb_read_all.
 * Sends an engine-changed message.
 */
382 static void trigger_clients_read_all(OSyncEngine *engine)
386 send_engine_changed(engine);
/*
 * Registered in this file as the positive trigger of engine->cmb_chkconflict.
 * The body is outside this excerpt.
 */
390 static void trigger_status_end_conflicts(OSyncEngine *engine)
/*
 * Registered in this file as the positive trigger of engine->cmb_connected.
 * Runs all client deciders.
 */
398 static void trigger_clients_connected(OSyncEngine *engine)
402 osengine_client_all_deciders(engine);
/*
 * Registered in this file as the positive trigger of
 * engine->cmb_committed_all_sent. The body is outside this excerpt.
 */
407 static void trigger_clients_comitted_all(OSyncEngine *engine)
/*
 * Idle-source callback; the init code in this file attaches it to the engine's
 * main context with the engine as its data argument. It loads the mapping
 * table and signals engine->started, which the init code waits on.
 */
448 static gboolean startupfunc(gpointer data)
450 OSyncEngine *engine = data;
/* A failed load is reported as ENG_ERROR and the stop flag is set. */
454 if (!osengine_mappingtable_load(engine->maptable, &error)) {
456 osync_status_update_engine(engine,
ENG_ERROR, &error);
458 osync_flag_set(engine->fl_stop);
461 g_mutex_lock(engine->started_mutex);
462 g_cond_signal(engine->started);
463 g_mutex_unlock(engine->started_mutex);
/*
 * Excerpt of an engine reset routine whose signature is outside this excerpt
 * (presumably osengine_reset, which is registered as the cmb_finished
 * trigger). It resets every client, forces the engine flags back to fixed
 * states, resets the mapping table and signals engine->syncing.
 */
494 for (c = engine->clients; c; c = c->next) {
495 OSyncClient *client = c->data;
496 osync_client_reset(client);
/* Flag states are assigned directly via osync_flag_set_state. */
499 osync_flag_set_state(engine->fl_running, FALSE);
500 osync_flag_set_state(engine->fl_stop, FALSE);
501 osync_flag_set_state(engine->cmb_sent_changes, FALSE);
502 osync_flag_set_state(engine->cmb_entries_mapped, TRUE);
503 osync_flag_set_state(engine->cmb_synced, TRUE);
504 osync_flag_set_state(engine->cmb_chkconflict, TRUE);
505 osync_flag_set_state(engine->cmb_finished, FALSE);
506 osync_flag_set_state(engine->cmb_connected, FALSE);
507 osync_flag_set_state(engine->cmb_read_all, TRUE);
508 osync_flag_set_state(engine->cmb_committed_all, TRUE);
509 osync_flag_set_state(engine->cmb_committed_all_sent, FALSE);
513 engine->committed_all_sent = FALSE;
515 osengine_mappingtable_reset(engine->maptable);
521 osync_status_update_engine(engine,
ENG_ERROR, &newerror);
/* Wake a thread blocked on the syncing condition. */
530 g_mutex_lock(engine->syncing_mutex);
531 g_cond_signal(engine->syncing);
532 g_mutex_unlock(engine->syncing_mutex);
/*
 * Recursive helper that creates `dir` with the given mode, calling itself
 * before the final mkdir() when the path contains a '/' that is not its first
 * character. Takes a mutable buffer.
 * NOTE(review): presumably the buffer is temporarily cut at the last slash so
 * the recursive call sees the parent path; those lines are outside this
 * excerpt.
 * NOTE(review): identifiers beginning with a double underscore are reserved
 * for the implementation in C.
 */
542 static int __mkdir_with_parents(
char *dir,
int mode)
/* Test first whether the path already names a directory. */
544 if (g_file_test(dir, G_FILE_TEST_IS_DIR))
547 char *slash = strrchr(dir,
'/');
/* A slash at position 0 (the root) is not treated as a parent to create. */
548 if (slash && slash != dir) {
557 if (__mkdir_with_parents(dir, mode) < 0)
562 if (mkdir(dir, mode) < 0)
/*
 * Wrapper around __mkdir_with_parents for a const path: it duplicates `dir`
 * with strdup() because the helper takes a non-const buffer.
 * NOTE(review): confirm the strdup() result is checked for NULL and freed in
 * the lines outside this excerpt.
 */
568 static int mkdir_with_parents(
const char *dir,
int mode)
571 char *mydir = strdup(dir);
575 r = __mkdir_with_parents(mydir, mode);
/*
 * Excerpt of the engine construction routine (signature outside this
 * excerpt). Allocates a zeroed OSyncEngine, creates its main context and loop,
 * prepares the engines directory, creates the synchronization primitives and
 * flags, creates the clients and finally the mapping table.
 */
594 OSyncEngine *engine = g_malloc0(
sizeof(OSyncEngine));
/*
 * NOTE(review): g_thread_supported/g_thread_init, g_mutex_new and g_cond_new
 * are deprecated in current GLib releases; confirm the minimum GLib version
 * this file targets.
 */
597 if (!g_thread_supported ())
598 g_thread_init (NULL);
600 engine->context = g_main_context_new();
601 engine->syncloop = g_main_loop_new(engine->context, FALSE);
602 engine->group = group;
609 char *path = g_strdup_printf(
"%s/enginepipe", enginesdir);
611 if (mkdir_with_parents(enginesdir, 0755) < 0) {
612 osync_error_set(error, OSYNC_ERROR_GENERIC,
"Couldn't create engines directory: %s", strerror(errno));
613 goto error_free_paths;
616 engine->syncing_mutex = g_mutex_new();
617 engine->info_received_mutex = g_mutex_new();
618 engine->syncing = g_cond_new();
619 engine->info_received = g_cond_new();
620 engine->started_mutex = g_mutex_new();
621 engine->started = g_cond_new();
/* Plain flags; fl_running and fl_stop re-run the client deciders when set. */
624 engine->fl_running = osync_flag_new(NULL);
625 osync_flag_set_pos_trigger(engine->fl_running, (OSyncFlagTriggerFunc)osengine_client_all_deciders, engine, NULL);
627 engine->fl_sync = osync_flag_new(NULL);
628 engine->fl_stop = osync_flag_new(NULL);
629 osync_flag_set_pos_trigger(engine->fl_stop, (OSyncFlagTriggerFunc)osengine_client_all_deciders, engine, NULL);
/* Combined flags, each with the function invoked as its positive trigger. */
632 engine->cmb_sent_changes = osync_comb_flag_new(FALSE, FALSE);
633 osync_flag_set_pos_trigger(engine->cmb_sent_changes, (OSyncFlagTriggerFunc)trigger_clients_sent_changes, engine, NULL);
635 engine->cmb_read_all = osync_comb_flag_new(FALSE, TRUE);
636 osync_flag_set_pos_trigger(engine->cmb_read_all, (OSyncFlagTriggerFunc)trigger_clients_read_all, engine, NULL);
638 engine->cmb_entries_mapped = osync_comb_flag_new(FALSE, FALSE);
639 osync_flag_set_pos_trigger(engine->cmb_entries_mapped, (OSyncFlagTriggerFunc)send_engine_changed, engine, NULL);
642 engine->cmb_synced = osync_comb_flag_new(FALSE, TRUE);
643 osync_flag_set_pos_trigger(engine->cmb_synced, (OSyncFlagTriggerFunc)send_engine_changed, engine, NULL);
646 engine->cmb_finished = osync_comb_flag_new(FALSE, TRUE);
647 osync_flag_set_pos_trigger(engine->cmb_finished, (OSyncFlagTriggerFunc)
osengine_reset, engine, NULL);
649 engine->cmb_connected = osync_comb_flag_new(FALSE, FALSE);
650 osync_flag_set_pos_trigger(engine->cmb_connected, (OSyncFlagTriggerFunc)trigger_clients_connected, engine, NULL);
652 engine->cmb_chkconflict = osync_comb_flag_new(FALSE, TRUE);
653 osync_flag_set_pos_trigger(engine->cmb_chkconflict, (OSyncFlagTriggerFunc)trigger_status_end_conflicts, engine, NULL);
/* cmb_multiplied has no trigger registered in this excerpt. */
655 engine->cmb_multiplied = osync_comb_flag_new(FALSE, TRUE);
657 engine->cmb_committed_all = osync_comb_flag_new(FALSE, TRUE);
658 osync_flag_set_pos_trigger(engine->cmb_committed_all, (OSyncFlagTriggerFunc)send_engine_changed, engine, NULL);
661 engine->cmb_committed_all_sent = osync_comb_flag_new(FALSE, TRUE);
662 osync_flag_set_pos_trigger(engine->cmb_committed_all_sent, (OSyncFlagTriggerFunc)trigger_clients_comitted_all, engine, NULL);
664 osync_flag_set(engine->fl_sync);
/* Client creation failure aborts construction via the shared error label. */
669 if (!osync_client_new(engine, member, error))
670 goto error_free_paths;
673 engine->maptable = osengine_mappingtable_new(engine);
/*
 * Excerpt of the engine teardown routine (signature outside this excerpt).
 * Frees every client, the mapping table, all flags, the client list, the main
 * loop and context, and the mutexes and condition variables.
 */
698 for (c = engine->clients; c; c = c->next) {
699 OSyncClient *client = c->data;
700 osync_client_free(client);
703 osengine_mappingtable_free(engine->maptable);
704 engine->maptable = NULL;
706 osync_flag_free(engine->fl_running);
707 osync_flag_free(engine->fl_sync);
708 osync_flag_free(engine->fl_stop);
709 osync_flag_free(engine->cmb_sent_changes);
710 osync_flag_free(engine->cmb_entries_mapped);
711 osync_flag_free(engine->cmb_synced);
712 osync_flag_free(engine->cmb_chkconflict);
713 osync_flag_free(engine->cmb_finished);
714 osync_flag_free(engine->cmb_connected);
715 osync_flag_free(engine->cmb_read_all);
716 osync_flag_free(engine->cmb_multiplied);
717 osync_flag_free(engine->cmb_committed_all);
718 osync_flag_free(engine->cmb_committed_all_sent);
/* The list nodes are freed here; the clients themselves were freed above. */
720 g_list_free(engine->clients);
721 g_main_loop_unref(engine->syncloop);
723 g_main_context_unref(engine->context);
725 g_mutex_free(engine->syncing_mutex);
726 g_mutex_free(engine->info_received_mutex);
727 g_cond_free(engine->syncing);
728 g_cond_free(engine->info_received);
729 g_mutex_free(engine->started_mutex);
730 g_cond_free(engine->started);
/*
 * Bodies of six callback-registration routines (signatures outside this
 * excerpt). Each stores a callback and its user_data pointer on the engine:
 * conflict, changestat, mapstat, engstat, mebstat and plgmsg.
 */
747 engine->conflict_callback =
function;
748 engine->conflict_userdata = user_data;
762 engine->changestat_callback =
function;
763 engine->changestat_userdata = user_data;
777 engine->mapstat_callback =
function;
778 engine->mapstat_userdata = user_data;
792 engine->engstat_callback =
function;
793 engine->engstat_userdata = user_data;
807 engine->mebstat_callback =
function;
808 engine->mebstat_userdata = user_data;
822 engine->plgmsg_callback =
function;
823 engine->plgmsg_userdata = user_data;
/*
 * Excerpt of the engine initialization routine (signature outside this
 * excerpt). Rejects a second initialization, handles the lock state, spawns
 * and connects the clients, sets up the engine's self-addressed queue pair,
 * initializes the clients and starts the main-loop thread.
 */
841 if (engine->is_initialized) {
842 osync_error_set(error, OSYNC_ERROR_MISCONFIGURATION,
"This engine was already initialized");
852 case OSYNC_LOCK_STALE:
853 osync_debug(
"ENG", 1,
"Detected stale lock file. Slow-syncing");
861 osync_flag_set(engine->cmb_entries_mapped);
862 osync_flag_set(engine->cmb_synced);
863 engine->allow_sync_alert = TRUE;
876 engine->is_initialized = TRUE;
/* Per client: create its incoming queue, spawn it, then connect as receiver. */
880 for (c = engine->clients; c; c = c->next) {
881 OSyncClient *client = c->data;
882 osync_queue_create(client->commands_from_osplugin, NULL);
884 if (!osync_client_spawn(client, engine, error)) {
891 if (!(engine->man_dispatch))
894 if (!osync_queue_connect(client->commands_from_osplugin, OSYNC_QUEUE_RECEIVER, 0 )) {
/* Pipe pair the engine uses to send messages to itself. */
902 if (!osync_queue_new_pipes(&engine->commands_from_self, &engine->commands_to_self, error)) {
908 if (!osync_queue_connect(engine->commands_from_self, OSYNC_QUEUE_RECEIVER, 0 )) {
914 if (!osync_queue_connect(engine->commands_to_self, OSYNC_QUEUE_SENDER, 0 )) {
921 if (!(engine->man_dispatch))
925 for (c = engine->clients; c; c = c->next) {
926 OSyncClient *client = c->data;
927 if (!osync_client_init(client, engine, error)) {
/*
 * started_mutex is taken before the thread is created, so the signal sent by
 * startupfunc cannot arrive before this thread is waiting.
 * NOTE(review): g_cond_wait is called without a predicate loop, so a spurious
 * wakeup would let this code continue before startup has finished. No check
 * of the g_thread_create result is visible here.
 */
939 g_mutex_lock(engine->started_mutex);
940 GSource *idle = g_idle_source_new();
941 g_source_set_priority(idle, G_PRIORITY_HIGH);
942 g_source_set_callback(idle, startupfunc, engine, NULL);
943 g_source_attach(idle, engine->context);
944 engine->thread = g_thread_create ((GThreadFunc)g_main_loop_run, engine->syncloop, TRUE, NULL);
945 g_cond_wait(engine->started, engine->started_mutex);
946 g_mutex_unlock(engine->started_mutex);
/*
 * Excerpt of the engine finalization routine (signature outside this
 * excerpt). Stops and joins the main-loop thread, disconnects and finalizes
 * every client, tears down the self-addressed queues, closes the mapping
 * table and clears is_initialized.
 */
965 if (!engine->is_initialized) {
971 osync_debug(
"ENG", 3,
"finalizing engine %p", engine);
/* Quit the loop first so that the join below can return. */
973 if (engine->thread) {
974 g_main_loop_quit(engine->syncloop);
975 g_thread_join(engine->thread);
/* Errors from disconnecting and finalizing clients are not collected (NULL). */
979 for (c = engine->clients; c; c = c->next) {
980 OSyncClient *client = c->data;
981 osync_queue_disconnect(client->commands_from_osplugin, NULL);
982 osync_client_finalize(client, NULL);
985 osync_queue_disconnect(engine->commands_from_self, NULL);
986 osync_queue_disconnect(engine->commands_to_self, NULL);
/* Queue pointers are cleared after being freed. */
988 osync_queue_free(engine->commands_from_self);
989 engine->commands_from_self = NULL;
990 osync_queue_free(engine->commands_to_self);
991 engine->commands_to_self = NULL;
993 osengine_mappingtable_close(engine->maptable);
1002 if (!osync_flag_is_set(engine->cmb_connected) && !engine->slowsync)
1009 engine->is_initialized = FALSE;
/*
 * Excerpt of the routine that starts a synchronization (signature outside
 * this excerpt; the error text names it osengine_synchronize). Requires an
 * initialized engine, records the slow-sync choice, sets fl_running and sends
 * a message to the engine's own queue.
 */
1029 if (!engine->is_initialized) {
1030 osync_error_set(error, OSYNC_ERROR_GENERIC,
"osengine_synchronize: Not initialized");
1038 engine->slowsync = TRUE;
1040 engine->slowsync = FALSE;
1044 engine->alldeciders = 0;
1046 osync_flag_set(engine->fl_running);
1052 if (!osync_queue_send_message(engine->commands_to_self, NULL, message, error))
1053 goto error_free_message;
/*
 * NOTE(review): the two unref calls presumably belong to the success path and
 * to the error_free_message label respectively; the label itself is outside
 * this excerpt.
 */
1055 osync_message_unref(message);
1061 osync_message_unref(message);
/*
 * Bodies of several small engine-control routines (signatures outside this
 * excerpt): clear fl_sync, enable manual dispatch, clear fl_running, set
 * fl_stop, and allow or deny sync alerts.
 */
1076 osync_flag_unset(engine->fl_sync);
/*
 * NOTE(review): engine->syncloop is assigned in the construction code of this
 * file, so this condition looks true for every constructed engine; confirm
 * whether is_initialized was the intended check. man_dispatch is set either
 * way.
 */
1088 if (engine->syncloop) {
1089 g_warning(
"Unable to flag manual since engine is already initialized\n");
1091 engine->man_dispatch = TRUE;
1102 osync_flag_unset(engine->fl_running);
1114 osync_flag_set(engine->fl_stop);
1125 engine->allow_sync_alert = TRUE;
1136 engine->allow_sync_alert = FALSE;
/*
 * Bodies of the blocking wait routines (signatures outside this excerpt).
 * Two of them block on engine->syncing and then inspect engine->error; one
 * blocks on engine->info_received. The final statement returns the mapping
 * for an id from the mapping table.
 * NOTE(review): each g_cond_wait is called without a predicate loop. A signal
 * sent before the waiter takes the mutex is lost, and a spurious wakeup
 * returns early; confirm callers tolerate both.
 */
1153 g_mutex_lock(engine->syncing_mutex);
1156 g_mutex_unlock(engine->syncing_mutex);
1160 g_cond_wait(engine->syncing, engine->syncing_mutex);
1161 g_mutex_unlock(engine->syncing_mutex);
1163 if (engine->error) {
1188 g_mutex_lock(engine->syncing_mutex);
1189 g_cond_wait(engine->syncing, engine->syncing_mutex);
1190 g_mutex_unlock(engine->syncing_mutex);
1192 if (engine->error) {
1207 g_mutex_lock(engine->info_received_mutex);
1208 g_cond_wait(engine->info_received, engine->info_received_mutex);
1209 g_mutex_unlock(engine->info_received_mutex);
1230 return osengine_mappingtable_mapping_from_id(engine->maptable,
id);