24 #include <netcomm/fawkes/client.h>
25 #include <netcomm/fawkes/client_handler.h>
26 #include <netcomm/fawkes/message_queue.h>
27 #include <netcomm/fawkes/transceiver.h>
28 #include <netcomm/socket/stream.h>
29 #include <netcomm/utils/exceptions.h>
31 #include <core/threading/thread.h>
32 #include <core/threading/mutex.h>
33 #include <core/threading/mutex_locker.h>
34 #include <core/threading/wait_condition.h>
35 #include <core/exceptions/system.h>
54 :
Exception(
"A handler for this component has already been registered")
78 __outbound_mutex =
new Mutex();
81 __outbound_active = 0;
82 __outbound_msgq = __outbound_msgqs[0];
88 for (
unsigned int i = 0; i < 2; ++i) {
89 while ( ! __outbound_msgqs[i]->empty() ) {
92 __outbound_msgqs[i]->pop();
95 delete __outbound_msgqs[0];
96 delete __outbound_msgqs[1];
97 delete __outbound_mutex;
102 __parent->set_send_slave_alive();
109 while ( __outbound_havemore ) {
110 __outbound_mutex->
lock();
111 __outbound_havemore =
false;
113 __outbound_active = 1 - __outbound_active;
114 __outbound_msgq = __outbound_msgqs[__outbound_active];
115 __outbound_mutex->
unlock();
117 if ( ! q->empty() ) {
121 __parent->connection_died();
152 __outbound_mutex->
lock();
153 __outbound_msgq->push(message);
154 __outbound_havemore =
true;
155 __outbound_mutex->
unlock();
165 Mutex *__outbound_mutex;
166 unsigned int __outbound_active;
167 bool __outbound_havemore;
190 :
Thread(
"FawkesNetworkClientRecvThread")
195 __recv_mutex = recv_mutex;
201 while ( ! __inbound_msgq->empty() ) {
204 __inbound_msgq->pop();
206 delete __inbound_msgq;
212 std::list<unsigned int> wakeup_list;
219 __inbound_msgq->
lock();
220 while ( ! __inbound_msgq->empty() ) {
222 wakeup_list.push_back(m->
cid());
223 __parent->dispatch_message(m);
225 __inbound_msgq->pop();
232 wakeup_list.unique();
233 for (std::list<unsigned int>::iterator i = wakeup_list.begin(); i != wakeup_list.end(); ++i) {
234 __parent->wake_handlers(*i);
243 __parent->set_recv_slave_alive();
261 __parent->connection_died();
268 __parent->connection_died();
300 __hostname = strdup(hostname);
301 __ip = ip ? strdup(ip) : NULL;
308 connection_died_recently =
false;
309 __send_slave_alive =
false;
310 __recv_slave_alive =
false;
312 slave_status_mutex =
new Mutex();
317 __recv_mutex =
new Mutex();
319 __connest_mutex =
new Mutex();
322 __connest_interrupted =
false;
340 connection_died_recently =
false;
341 __send_slave_alive =
false;
342 __recv_slave_alive =
false;
344 slave_status_mutex =
new Mutex();
349 __recv_mutex =
new Mutex();
351 __connest_mutex =
new Mutex();
354 __connest_interrupted =
false;
365 unsigned short int port,
const char *ip)
367 __hostname = strdup(hostname);
368 __ip = ip ? strdup(ip) : NULL;
375 connection_died_recently =
false;
376 __send_slave_alive =
false;
377 __recv_slave_alive =
false;
379 slave_status_mutex =
new Mutex();
384 __recv_mutex =
new Mutex();
386 __connest_mutex =
new Mutex();
389 __connest_interrupted =
false;
399 if (__hostname) free(__hostname);
400 if (__ip) free(__ip);
401 delete slave_status_mutex;
403 delete __connest_waitcond;
404 delete __connest_mutex;
405 delete __recv_waitcond;
417 if ( __hostname == NULL && __ip == NULL) {
426 connection_died_recently =
false;
430 s->
connect(__ip ? __ip : __hostname, __port);
432 __send_slave->
start();
434 __recv_slave->
start();
436 connection_died_recently =
true;
437 if ( __send_slave ) {
439 __send_slave->
join();
443 if ( __recv_slave ) {
445 __recv_slave->
join();
449 __send_slave_alive =
false;
450 __recv_slave_alive =
false;
456 __connest_mutex->
lock();
457 while ( ! __connest && ! __connest_interrupted ) {
458 __connest_waitcond->
wait();
460 bool interrupted = __connest_interrupted;
461 __connest_interrupted =
false;
462 __connest_mutex->
unlock();
467 notify_of_connection_established();
493 if (__hostname) free(__hostname);
494 if (__ip) free(__ip);
495 __hostname = strdup(hostname);
496 __ip = ip ? strdup(ip) : NULL;
505 if ( s == NULL )
return;
507 if ( __send_slave_alive ) {
508 if ( ! connection_died_recently ) {
514 __send_slave->
join();
518 if ( __recv_slave_alive ) {
520 __recv_slave->
join();
524 __send_slave_alive =
false;
525 __recv_slave_alive =
false;
529 if (! connection_died_recently) {
542 __connest_mutex->
lock();
543 __connest_interrupted =
true;
545 __connest_mutex->
unlock();
563 if (__send_slave) __send_slave->
enqueue(message);
580 unsigned int timeout_sec)
582 if (__send_slave && __recv_slave) {
583 __recv_mutex->
lock();
584 if ( __recv_received.find(message->
cid()) != __recv_received.end()) {
586 unsigned int cid = message->
cid();
587 throw Exception(
"There is already a thread waiting for messages of "
588 "component id %u", cid);
590 __send_slave->
enqueue(message);
591 unsigned int cid = message->
cid();
592 __recv_received[cid] =
false;
593 while (!__recv_received[cid] && ! connection_died_recently) {
595 __recv_received.erase(cid);
597 throw TimeoutException(
"Timeout reached while waiting for incoming message "
598 "(outgoing was %u:%u)", message->
cid(), message->
msgid());
601 __recv_received.erase(cid);
604 unsigned int cid = message->
cid();
605 unsigned int msgid = message->
msgid();
606 throw Exception(
"Cannot enqueue given message %u:%u, sender or "
607 "receiver missing", cid, msgid);
621 unsigned int component_id)
624 if ( handlers.find(component_id) != handlers.end() ) {
628 handlers[component_id] = handler;
642 if ( handlers.find(component_id) != handlers.end() ) {
643 handlers[component_id]->deregistered(_id);
644 handlers.erase(component_id);
647 __recv_mutex->
lock();
648 if (__recv_received.find(component_id) != __recv_received.end()) {
649 __recv_received[component_id] =
true;
659 unsigned int cid = m->
cid();
661 if (handlers.find(cid) != handlers.end()) {
662 handlers[cid]->inbound_received(m, _id);
669 FawkesNetworkClient::wake_handlers(
unsigned int cid)
671 __recv_mutex->
lock();
672 if (__recv_received.find(cid) != __recv_received.end()) {
673 __recv_received[cid] =
true;
680 FawkesNetworkClient::notify_of_connection_dead()
682 __connest_mutex->
lock();
684 __connest_mutex->
unlock();
687 for ( HandlerMap::iterator i = handlers.begin(); i != handlers.end(); ++i ) {
688 i->second->connection_died(_id);
692 __recv_mutex->
lock();
698 FawkesNetworkClient::notify_of_connection_established()
701 for ( HandlerMap::iterator i = handlers.begin(); i != handlers.end(); ++i ) {
702 i->second->connection_established(_id);
709 FawkesNetworkClient::connection_died()
711 connection_died_recently =
true;
712 notify_of_connection_dead();
717 FawkesNetworkClient::set_send_slave_alive()
719 slave_status_mutex->
lock();
720 __send_slave_alive =
true;
721 if ( __send_slave_alive && __recv_slave_alive ) {
722 __connest_mutex->
lock();
725 __connest_mutex->
unlock();
727 slave_status_mutex->
unlock();
732 FawkesNetworkClient::set_recv_slave_alive()
734 slave_status_mutex->
lock();
735 __recv_slave_alive =
true;
736 if ( __send_slave_alive && __recv_slave_alive ) {
737 __connest_mutex->
lock();
740 __connest_mutex->
unlock();
742 slave_status_mutex->
unlock();
756 __recv_mutex->
lock();
757 if ( __recv_received.find(component_id) != __recv_received.end()) {
759 throw Exception(
"There is already a thread waiting for messages of "
760 "component id %u", component_id);
762 __recv_received[component_id] =
false;
763 while (! __recv_received[component_id] && ! connection_died_recently) {
765 __recv_received.erase(component_id);
767 throw TimeoutException(
"Timeout reached while waiting for incoming message "
768 "(component %u)", component_id);
771 __recv_received.erase(component_id);
784 __recv_mutex->
lock();
785 if ( __recv_received.find(component_id) != __recv_received.end()) {
786 __recv_received[component_id] =
true;
799 return (! connection_died_recently && (s != NULL));
820 throw Exception(
"Trying to get the ID of a client that has no ID");