Fawkes API  Fawkes Development Version
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator Friends Groups Pages
remote.cpp
1 
2 /***************************************************************************
3  * remote.cpp - Remote BlackBoard access via Fawkes network protocol
4  *
5  * Created: Mon Mar 03 10:53:00 2008
6  * Copyright 2006-2008 Tim Niemueller [www.niemueller.de]
7  *
8  ****************************************************************************/
9 
10 /* This program is free software; you can redistribute it and/or modify
11  * it under the terms of the GNU General Public License as published by
12  * the Free Software Foundation; either version 2 of the License, or
13  * (at your option) any later version. A runtime exception applies to
14  * this software (see LICENSE.GPL_WRE file mentioned below for details).
15  *
16  * This program is distributed in the hope that it will be useful,
17  * but WITHOUT ANY WARRANTY; without even the implied warranty of
18  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19  * GNU Library General Public License for more details.
20  *
21  * Read the full text in the LICENSE.GPL_WRE file in the doc directory.
22  */
23 
24 #include <blackboard/remote.h>
25 #include <blackboard/exceptions.h>
26 #include <blackboard/net/messages.h>
27 #include <blackboard/net/ilist_content.h>
28 #include <blackboard/net/interface_proxy.h>
29 #include <blackboard/internal/notifier.h>
30 #include <blackboard/internal/instance_factory.h>
31 
32 #include <interface/interface_info.h>
33 
34 #include <core/threading/thread.h>
35 #include <core/threading/mutex.h>
36 #include <core/threading/mutex_locker.h>
37 #include <core/threading/wait_condition.h>
38 #include <netcomm/fawkes/client.h>
39 
40 #include <string>
41 #include <cstring>
42 #include <fnmatch.h>
43 #include <arpa/inet.h>
44 
45 namespace fawkes {
46 
47 /** @class RemoteBlackBoard <blackboard/remote.h>
48  * Remote BlackBoard.
49  * This class implements the access to a remote BlackBoard using the Fawkes
50  * network protocol.
51  *
52  * @author Tim Niemueller
53  */
54 
55 /** Constructor.
56  * @param client Fawkes network client to use.
57  */
59 {
60  __fnc = client;
61  __fnc_owner = false;
62 
63  if ( ! __fnc->connected() ) {
64  throw Exception("Cannot instantiate RemoteBlackBoard on unconnected client");
65  }
66 
67  __fnc->register_handler(this, FAWKES_CID_BLACKBOARD);
68 
69  __mutex = new Mutex();
70  __instance_factory = new BlackBoardInstanceFactory();
71 
72  __wait_mutex = new Mutex();
73  __wait_cond = new WaitCondition(__wait_mutex);
74 
75  __inbound_thread = NULL;
76  __m = NULL;
77 }
78 
79 
80 /** Constructor.
81  * This will internally create a Fawkes network client that is used to communicate
82  * with the remote BlackBoard.
83  * @param hostname hostname to connect to
84  * @param port port to connect to
85  */
86 RemoteBlackBoard::RemoteBlackBoard(const char *hostname, unsigned short int port)
87 {
88  __fnc = new FawkesNetworkClient(hostname, port);
89  try {
90  __fnc->connect();
91  } catch (Exception &e) {
92  delete __fnc;
93  throw;
94  }
95 
96  __fnc_owner = true;
97 
98  if ( ! __fnc->connected() ) {
99  throw Exception("Cannot instantiate RemoteBlackBoard on unconnected client");
100  }
101 
102  __fnc->register_handler(this, FAWKES_CID_BLACKBOARD);
103 
104  __mutex = new Mutex();
105  __instance_factory = new BlackBoardInstanceFactory();
106 
107  __wait_mutex = new Mutex();
108  __wait_cond = new WaitCondition(__wait_mutex);
109 
110  __inbound_thread = NULL;
111  __m = NULL;
112 }
113 
114 
115 /** Destructor. */
117 {
118  __fnc->deregister_handler(FAWKES_CID_BLACKBOARD);
119  delete __mutex;
120  delete __instance_factory;
121 
122  for ( __pit = __proxies.begin(); __pit != __proxies.end(); ++__pit) {
123  delete __pit->second;
124  }
125 
126  if (__fnc_owner) {
127  __fnc->disconnect();
128  delete __fnc;
129  }
130 
131  delete __wait_cond;
132  delete __wait_mutex;
133 }
134 
135 
136 bool
138 {
139  return __fnc->connected();
140 }
141 
142 
143 void
144 RemoteBlackBoard::reopen_interfaces()
145 {
146  __proxies.lock();
147  __ipit = __invalid_proxies.begin();
148  while ( __ipit != __invalid_proxies.end() ) {
149  try {
150  Interface *iface = (*__ipit)->interface();
151  open_interface(iface->type(), iface->id(), iface->is_writer(), iface);
152  iface->set_validity(true);
153  __ipit = __invalid_proxies.erase(__ipit);
154  } catch (Exception &e) {
155  // we failed to re-establish validity for the given interface, bad luck
156  ++__ipit;
157  }
158  }
159  __proxies.unlock();
160 }
161 
162 bool
164 {
165  bool rv = true;
166  try {
167  if ( ! __fnc->connected() ) {
168  __fnc->connect();
169 
170  reopen_interfaces();
171  }
172  } catch (...) {
173  rv = false;
174  }
175  return rv;
176 }
177 
178 
179 void
180 RemoteBlackBoard::open_interface(const char *type, const char *identifier,
181  bool writer, Interface *iface)
182 {
183  if ( ! __fnc->connected() ) {
184  throw Exception("Cannot instantiate remote interface, connection is dead");
185  }
186 
187  __mutex->lock();
188  if (__inbound_thread != NULL &&
190  strcmp(Thread::current_thread()->name(), __inbound_thread) == 0)
191  {
192  throw Exception("Cannot call open_interface() from inbound handler");
193  }
194  __mutex->unlock();
195 
196  bb_iopen_msg_t *om = (bb_iopen_msg_t *)calloc(1, sizeof(bb_iopen_msg_t));
197  strncpy(om->type, type, __INTERFACE_TYPE_SIZE);
198  strncpy(om->id, identifier, __INTERFACE_ID_SIZE);
199  memcpy(om->hash, iface->hash(), __INTERFACE_HASH_SIZE);
200 
201  FawkesNetworkMessage *omsg = new FawkesNetworkMessage(FAWKES_CID_BLACKBOARD,
202  writer ? MSG_BB_OPEN_FOR_WRITING : MSG_BB_OPEN_FOR_READING,
203  om, sizeof(bb_iopen_msg_t));
204 
205  __wait_mutex->lock();
206  __fnc->enqueue(omsg);
207  while (is_alive() &&
208  (! __m ||
209  ((__m->msgid() != MSG_BB_OPEN_SUCCESS) &&
210  (__m->msgid() != MSG_BB_OPEN_FAILURE))))
211  {
212  if ( __m ) {
213  __m->unref();
214  __m = NULL;
215  }
216  __wait_cond->wait();
217  }
218  __wait_mutex->unlock();
219 
220  if (!is_alive()) {
221  throw Exception("Connection died while trying to open %s::%s",
222  type, identifier);
223  }
224 
225  if ( __m->msgid() == MSG_BB_OPEN_SUCCESS ) {
226  // We got the interface, create internal storage and prepare instance for return
227  BlackBoardInterfaceProxy *proxy = new BlackBoardInterfaceProxy(__fnc, __m, __notifier,
228  iface, writer);
229  __proxies[proxy->serial()] = proxy;
230  } else if ( __m->msgid() == MSG_BB_OPEN_FAILURE ) {
231  bb_iopenfail_msg_t *fm = __m->msg<bb_iopenfail_msg_t>();
232  unsigned int error = ntohl(fm->errno);
233  __m->unref();
234  __m = NULL;
235  if ( error == BB_ERR_WRITER_EXISTS ) {
236  throw BlackBoardWriterActiveException(identifier, type);
237  } else if ( error == BB_ERR_HASH_MISMATCH ) {
238  throw Exception("Hash mismatch for interface %s:%s", type, identifier);
239  } else if ( error == BB_ERR_UNKNOWN_TYPE ) {
240  throw Exception("Type %s unknown (%s::%s)", type, type, identifier);
241  } else if ( error == BB_ERR_WRITER_EXISTS ) {
242  throw BlackBoardWriterActiveException(identifier, type);
243  } else {
244  throw Exception("Could not open interface");
245  }
246  }
247 
248  __m->unref();
249  __m = NULL;
250 }
251 
252 Interface *
253 RemoteBlackBoard::open_interface(const char *type, const char *identifier, bool writer)
254 {
255  if ( ! __fnc->connected() ) {
256  throw Exception("Cannot instantiate remote interface, connection is dead");
257  }
258 
259  Interface *iface = __instance_factory->new_interface_instance(type, identifier);
260  try {
261  open_interface(type, identifier, writer, iface);
262  } catch (Exception &e) {
263  __instance_factory->delete_interface_instance(iface);
264  throw;
265  }
266 
267  return iface;
268 }
269 
270 
271 Interface *
272 RemoteBlackBoard::open_for_reading(const char *type, const char *identifier)
273 {
274  return open_interface(type, identifier, /* writer? */ false);
275 }
276 
277 
278 Interface *
279 RemoteBlackBoard::open_for_writing(const char *type, const char *identifier)
280 {
281  return open_interface(type, identifier, /* writer? */ true);
282 }
283 
284 
285 std::list<Interface *>
287  const char *id_pattern)
288 {
289  std::list<Interface *> rv;
290 
291  InterfaceInfoList *infl = list_all();
292  for (InterfaceInfoList::iterator i = infl->begin(); i != infl->end(); ++i) {
293  // ensure 0-termination
294  char type[__INTERFACE_TYPE_SIZE + 1];
295  char id[__INTERFACE_ID_SIZE + 1];
296  type[__INTERFACE_TYPE_SIZE] = 0;
297  id[__INTERFACE_TYPE_SIZE] = 0;
298  strncpy(type, i->type(), __INTERFACE_TYPE_SIZE);
299  strncpy(id, i->id(), __INTERFACE_ID_SIZE);
300 
301  if ((fnmatch(type_pattern, type, 0) == FNM_NOMATCH) ||
302  (fnmatch(id_pattern, id, 0) == FNM_NOMATCH) ) {
303  // type or ID prefix does not match, go on
304  continue;
305  }
306 
307  try {
308  Interface *iface = open_for_reading((*i).type(), (*i).id());
309  rv.push_back(iface);
310  } catch (Exception &e) {
311  for (std::list<Interface *>::iterator j = rv.begin(); j != rv.end(); ++j) {
312  close(*j);
313  }
314  throw;
315  }
316  }
317 
318  return rv;
319 }
320 
321 
322 /** Close interface.
323  * @param interface interface to close
324  */
325 void
327 {
328  if ( interface == NULL ) return;
329 
330  unsigned int serial = interface->serial();
331 
332  if ( __proxies.find(serial) != __proxies.end() ) {
333  delete __proxies[serial];
334  __proxies.erase(serial);
335  }
336 
337  if ( __fnc->connected() ) {
338  // We cannot "officially" close it, if we are disconnected it cannot be used anyway
339  bb_iserial_msg_t *sm = (bb_iserial_msg_t *)calloc(1, sizeof(bb_iserial_msg_t));
340  sm->serial = htonl(interface->serial());
341 
342  FawkesNetworkMessage *omsg = new FawkesNetworkMessage(FAWKES_CID_BLACKBOARD,
343  MSG_BB_CLOSE,
344  sm, sizeof(bb_iserial_msg_t));
345  __fnc->enqueue(omsg);
346  }
347 
348  __instance_factory->delete_interface_instance(interface);
349 }
350 
351 
354 {
355  __mutex->lock();
356  if (__inbound_thread != NULL &&
357  strcmp(Thread::current_thread()->name(), __inbound_thread) == 0)
358  {
359  throw Exception("Cannot call list_all() from inbound handler");
360  }
361  __mutex->unlock();
362 
363  InterfaceInfoList *infl = new InterfaceInfoList();
364 
365  FawkesNetworkMessage *omsg = new FawkesNetworkMessage(FAWKES_CID_BLACKBOARD,
366  MSG_BB_LIST_ALL);
367  __wait_mutex->lock();
368  __fnc->enqueue(omsg);
369  while (! __m ||
370  (__m->msgid() != MSG_BB_INTERFACE_LIST)) {
371  if ( __m ) {
372  __m->unref();
373  __m = NULL;
374  }
375  __wait_cond->wait();
376  }
377  __wait_mutex->unlock();
378 
380  while ( bbilc->has_next() ) {
381  size_t iisize;
382  bb_iinfo_msg_t *ii = bbilc->next(&iisize);
383  infl->append(ii->type, ii->id, ii->hash, ii->serial,
384  ii->has_writer, ii->num_readers);
385  }
386 
387  __m->unref();
388  __m = NULL;
389 
390  return infl;
391 }
392 
393 
395 RemoteBlackBoard::list(const char *type_pattern, const char *id_pattern)
396 {
397  __mutex->lock();
398  if (__inbound_thread != NULL &&
399  strcmp(Thread::current_thread()->name(), __inbound_thread) == 0)
400  {
401  throw Exception("Cannot call list() from inbound handler");
402  }
403  __mutex->unlock();
404 
405  InterfaceInfoList *infl = new InterfaceInfoList();
406 
407  bb_ilistreq_msg_t *om =
408  (bb_ilistreq_msg_t *)calloc(1, sizeof(bb_ilistreq_msg_t));
409  strncpy(om->type_pattern, type_pattern, __INTERFACE_TYPE_SIZE);
410  strncpy(om->id_pattern, id_pattern, __INTERFACE_ID_SIZE);
411 
412  FawkesNetworkMessage *omsg = new FawkesNetworkMessage(FAWKES_CID_BLACKBOARD,
413  MSG_BB_LIST,
414  om,
415  sizeof(bb_ilistreq_msg_t));
416 
417  __wait_mutex->lock();
418  __fnc->enqueue(omsg);
419  while (! __m ||
420  (__m->msgid() != MSG_BB_INTERFACE_LIST)) {
421  if ( __m ) {
422  __m->unref();
423  __m = NULL;
424  }
425  __wait_cond->wait();
426  }
427  __wait_mutex->unlock();
428 
431  while ( bbilc->has_next() ) {
432  size_t iisize;
433  bb_iinfo_msg_t *ii = bbilc->next(&iisize);
434  infl->append(ii->type, ii->id, ii->hash, ii->serial,
435  ii->has_writer, ii->num_readers);
436  }
437 
438  __m->unref();
439  __m = NULL;
440 
441  return infl;
442 }
443 
444 
445 /** We are no longer registered in Fawkes network client.
446  * Ignored.
447  * @param id the id of the calling client
448  */
449 void
450 RemoteBlackBoard::deregistered(unsigned int id) throw()
451 {
452 }
453 
454 
455 void
457  unsigned int id) throw()
458 {
459  __mutex->lock();
460  __inbound_thread = Thread::current_thread()->name();
461  __mutex->unlock();
462 
463  if ( m->cid() == FAWKES_CID_BLACKBOARD ) {
464  unsigned int msgid = m->msgid();
465  try {
466  if ( msgid == MSG_BB_DATA_CHANGED ) {
467  unsigned int serial = ntohl(((unsigned int *)m->payload())[0]);
468  if ( __proxies.find(serial) != __proxies.end() ) {
469  __proxies[serial]->process_data_changed(m);
470  }
471  } else if (msgid == MSG_BB_INTERFACE_MESSAGE) {
472  unsigned int serial = ntohl(((unsigned int *)m->payload())[0]);
473  if ( __proxies.find(serial) != __proxies.end() ) {
474  __proxies[serial]->process_interface_message(m);
475  }
476  } else if (msgid == MSG_BB_READER_ADDED) {
478  if ( __proxies.find(ntohl(esm->serial)) != __proxies.end() ) {
479  __proxies[ntohl(esm->serial)]->reader_added(ntohl(esm->event_serial));
480  }
481  } else if (msgid == MSG_BB_READER_REMOVED) {
483  if ( __proxies.find(ntohl(esm->serial)) != __proxies.end() ) {
484  __proxies[ntohl(esm->serial)]->reader_removed(ntohl(esm->event_serial));
485  }
486  } else if (msgid == MSG_BB_WRITER_ADDED) {
488  if ( __proxies.find(ntohl(esm->serial)) != __proxies.end() ) {
489  __proxies[ntohl(esm->serial)]->writer_added(ntohl(esm->event_serial));
490  }
491  } else if (msgid == MSG_BB_WRITER_REMOVED) {
493  if ( __proxies.find(ntohl(esm->serial)) != __proxies.end() ) {
494  __proxies[ntohl(esm->serial)]->writer_removed(ntohl(esm->event_serial));
495  }
496  } else if (msgid == MSG_BB_INTERFACE_CREATED) {
497  bb_ievent_msg_t *em = m->msg<bb_ievent_msg_t>();
498  __notifier->notify_of_interface_created(em->type, em->id);
499  } else if (msgid == MSG_BB_INTERFACE_DESTROYED) {
500  bb_ievent_msg_t *em = m->msg<bb_ievent_msg_t>();
501  __notifier->notify_of_interface_destroyed(em->type, em->id);
502  } else {
503  __wait_mutex->lock();
504  __m = m;
505  __m->ref();
506  __wait_cond->wake_all();
507  __wait_mutex->unlock();
508  }
509  } catch (Exception &e) {
510  // Bam, you're dead. Ok, not now, we just ignore that this shit happened...
511  }
512  }
513 
514  __mutex->lock();
515  __inbound_thread = NULL;
516  __mutex->unlock();
517 }
518 
519 
520 void
521 RemoteBlackBoard::connection_died(unsigned int id) throw()
522 {
523  // mark all assigned interfaces as invalid
524  __proxies.lock();
525  for (__pit = __proxies.begin(); __pit != __proxies.end(); ++__pit) {
526  __pit->second->interface()->set_validity(false);
527  __invalid_proxies.push_back(__pit->second);
528  }
529  __proxies.clear();
530  __proxies.unlock();
531  __wait_cond->wake_all();
532 }
533 
534 
535 void
537 {
538 }
539 
540 } // end namespace fawkes