24 #include <blackboard/internal/notifier.h>
25 #include <blackboard/blackboard.h>
26 #include <blackboard/interface_listener.h>
27 #include <blackboard/interface_observer.h>
29 #include <core/threading/mutex.h>
30 #include <core/threading/mutex_locker.h>
31 #include <core/utils/lock_hashset.h>
32 #include <core/utils/lock_hashmap.h>
33 #include <logging/liblogger.h>
34 #include <interface/interface.h>
// Fragment of the notifier's set-up code; the enclosing function header is
// not part of this excerpt (presumably the BlackBoardNotifier constructor).
// Every listener event category (writer, reader, data, messages) starts with
// an event counter of zero and a newly allocated Mutex.
56 __bbil_writer_events = 0;
57 __bbil_writer_mutex =
new Mutex();
59 __bbil_reader_events = 0;
60 __bbil_reader_mutex =
new Mutex();
62 __bbil_data_events = 0;
63 __bbil_data_mutex =
new Mutex();
65 __bbil_messages_events = 0;
66 __bbil_messages_mutex =
new Mutex();
// Separate mutex for the interface observer (bbio) bookkeeping.
69 __bbio_mutex =
new Mutex();
// Fragment of the tear-down code (presumably the destructor): frees the four
// per-category listener mutexes.
// NOTE(review): no delete of __bbio_mutex is visible in this excerpt --
// confirm it is released on a line not shown here.
76 delete __bbil_writer_mutex;
77 delete __bbil_reader_mutex;
78 delete __bbil_data_mutex;
79 delete __bbil_messages_mutex;
// Fragment that processes a listener's pending change queue; the enclosing
// function header is not part of this excerpt.
// The queue is accessed between bbil_acquire_queue() and
// bbil_release_queue(); each entry's op and interface are handed to
// proc_listener_maybe_queue() together with the mutex, event counter, map and
// queue of one event category. The conditions that select which category
// applies to an entry are on lines not shown here.
107 listener->bbil_acquire_queue();
109 BlackBoardInterfaceListener::InterfaceQueue::const_iterator i = queue.begin();
111 for (i = queue.begin(); i != queue.end(); ++i) {
// Data change category.
115 proc_listener_maybe_queue(i->op, i->interface, listener,
116 __bbil_data_mutex, __bbil_data_events,
117 __bbil_data, __bbil_data_queue,
"data");
// Message category (the trailing argument line is not part of this excerpt).
122 proc_listener_maybe_queue(i->op, i->interface, listener,
123 __bbil_messages_mutex, __bbil_messages_events,
124 __bbil_messages, __bbil_messages_queue,
// Reader category.
130 proc_listener_maybe_queue(i->op, i->interface, listener,
131 __bbil_reader_mutex, __bbil_reader_events,
132 __bbil_reader, __bbil_reader_queue,
"reader");
// Writer category.
137 proc_listener_maybe_queue(i->op, i->interface, listener,
138 __bbil_writer_mutex, __bbil_writer_events,
139 __bbil_writer, __bbil_writer_queue,
"writer");
146 listener->bbil_release_queue(flag);
// Applies one listener change request for a single event category.
// Receives the operation flag, the category's mutex, its event counter (by
// reference), its listener map and its deferred-change queue. The visible
// calls show three outcomes: the request is deferred via queue_listener(), or
// applied to the map right away via add_listener() / remove_listener().
// The conditions choosing between them are on lines not shown here
// (presumably deferral happens while events are in progress -- TODO confirm).
150 BlackBoardNotifier::proc_listener_maybe_queue(
bool op,
153 Mutex *mutex,
unsigned int &events,
154 BBilMap &map, BBilQueue &queue,
160 "listener %s for %s events (queued)",
// Deferred path: remember the request in the category's queue.
163 queue_listener(op, interface, listener, queue);
// Immediate paths: modify the category's map directly.
166 add_listener(interface, listener, map);
168 remove_listener(interface, listener, map);
// Fragment that walks all interface maps of a listener; the enclosing
// function header is not part of this excerpt.
// Between bbil_acquire_maps() and bbil_release_maps(), every interface found
// in the listener's maps is passed to proc_listener_maybe_queue() with
// op == false for the matching event category (presumably op == false means
// "remove" -- confirm against proc_listener_maybe_queue).
183 listener->bbil_acquire_maps();
185 BlackBoardInterfaceListener::InterfaceMap::const_iterator i;
// Data change category.
186 for (i = maps.
data.begin(); i != maps.
data.end(); ++i) {
187 proc_listener_maybe_queue(
false, i->second, listener,
188 __bbil_data_mutex, __bbil_data_events,
189 __bbil_data, __bbil_data_queue,
"data");
// Message category (its loop header and trailing argument are not part of
// this excerpt).
193 proc_listener_maybe_queue(
false, i->second, listener,
194 __bbil_messages_mutex, __bbil_messages_events,
195 __bbil_messages, __bbil_messages_queue,
// Reader category.
199 for (i = maps.
reader.begin(); i != maps.
reader.end(); ++i) {
200 proc_listener_maybe_queue(
false, i->second, listener,
201 __bbil_reader_mutex, __bbil_reader_events,
202 __bbil_reader, __bbil_reader_queue,
"reader");
// Writer category.
205 for (i = maps.
writer.begin(); i != maps.
writer.end(); ++i) {
206 proc_listener_maybe_queue(
false, i->second, listener,
207 __bbil_writer_mutex, __bbil_writer_events,
208 __bbil_writer, __bbil_writer_queue,
"writer");
211 listener->bbil_release_maps();
// Registers a listener for an interface in the given listener map.
// The map is keyed by the interface's uid; the (uid, listener) pair is only
// inserted if an identical pair is not already present among the entries
// stored for that uid, so a listener is never registered twice for the same
// interface.
220 BlackBoardNotifier::add_listener(
Interface *interface,
// All entries currently stored for this uid.
224 std::pair<BBilMap::iterator, BBilMap::iterator> ret =
225 ilmap.equal_range(interface->
uid());
// Look for exactly this (uid, listener) pair within that range.
227 BBilMap::value_type v = std::make_pair(interface->
uid(), listener);
228 BBilMap::iterator f = std::find(ret.first, ret.second, v);
// Not found: add the new registration.
230 if (f == ret.second) {
231 ilmap.insert(std::make_pair(interface->
uid(), listener));
// Looks up a listener's registration for an interface in the given map.
// Scans the entries stored under the interface's uid for one whose listener
// pointer equals the given listener. What is done with a match is on lines
// not shown here (presumably the entry is erased -- TODO confirm).
236 BlackBoardNotifier::remove_listener(Interface *interface,
237 BlackBoardInterfaceListener *listener,
240 std::pair<BBilMap::iterator, BBilMap::iterator> ret =
241 ilmap.equal_range(interface->uid());
242 for (BBilMap::iterator j = ret.first; j != ret.second; ++j) {
243 if (j->second == listener) {
// Searches a deferred-change queue for an entry that matches the given
// operation flag, interface uid and listener all at once. The result produced
// on a match is on lines not shown here (presumably true -- TODO confirm).
// NOTE(review): q->uid == uid compares against a const char *; if the queue
// entry's uid member is also a raw char pointer this compares addresses, not
// contents -- confirm the member is a std::string.
252 BlackBoardNotifier::is_in_queue(
bool op, BBilQueue &queue,
const char *uid,
253 BlackBoardInterfaceListener *bbil)
255 BBilQueue::iterator q;
256 for (q = queue.begin(); q != queue.end(); ++q) {
257 if ((q->op == op) && (q->uid == uid) && (q->listener == bbil)) {
// Builds a deferred-change record for a listener: the operation flag, the
// interface's uid, the interface and the listener are bundled into one
// BBilQueueEntry. Storing the record is on lines not shown here (presumably
// it is appended to the queue argument -- TODO confirm).
265 BlackBoardNotifier::queue_listener(
bool op, Interface *interface,
266 BlackBoardInterfaceListener *listener,
269 BBilQueueEntry qe = { op, interface->uid(), interface, listener };
// Fragment of the observer registration path; the enclosing function header
// is not part of this excerpt.
// Under __bbio_mutex: while observer events are in progress
// (__bbio_events > 0) the observer is not registered directly but appended to
// __bbio_queue tagged with 1. The alternative branch and the unlock are on
// lines not shown here.
281 __bbio_mutex->
lock();
282 if (__bbio_events > 0) {
283 __bbio_queue.push_back(std::make_pair(1, observer));
// Fragment of the observer insertion helper; its header is not part of this
// excerpt. For every entry of *its, the pair (observer, entry value) is
// appended to the list stored in bbiomap under the entry's key; operator[]
// creates that list if the key is new.
299 for (i = its->begin(); i != its->end(); ++i) {
300 bbiomap[i->first].push_back(make_pair(observer, i->second));
// Removes every registration of an observer from an observer map.
// Walks all lists stored in iomap and erases each list element whose first
// member is the given observer. Lists that end up empty are detected
// afterwards; what happens to them is on lines not shown here (presumably the
// map entry is erased, which would explain the tmp iterator -- TODO confirm).
311 BlackBoardNotifier::remove_observer(BBioMap &iomap, BlackBoardInterfaceObserver *observer)
313 BBioMapIterator i, tmp;
316 while (i != iomap.end()) {
317 BBioListIterator j = i->second.begin();
318 while (j != i->second.end()) {
319 if ( j->first == observer ) {
// erase() returns the iterator following the removed element.
320 j = i->second.erase(j);
325 if ( i->second.empty() ) {
// Fragment of the observer unregistration path; the enclosing function header
// is not part of this excerpt.
// While observer events are in progress (__bbio_events > 0) the removal is
// deferred: queue entries belonging to this observer are erased and a
// (0, observer) entry is appended to __bbio_queue. The two remove_observer()
// calls below take the observer out of both maps (presumably the branch taken
// when no events are in progress -- TODO confirm).
// NOTE(review): find_if with not_equal_to yields the first entry that differs
// from e; if that entry belongs to another observer nothing is erased in the
// visible lines and the same entry would be found again -- confirm the lines
// not shown here guarantee termination.
// NOTE(review): bind2nd is deprecated since C++11 and removed in C++17.
344 if ( __bbio_events > 0) {
345 BBioQueueEntry e = std::make_pair((
unsigned int)0, observer);
346 BBioQueue::iterator re;
347 while ( (re = find_if(__bbio_queue.begin(), __bbio_queue.end(),
348 bind2nd(std::not_equal_to<BBioQueueEntry>(), e)))
349 != __bbio_queue.end()) {
351 if (re->second == observer) {
352 __bbio_queue.erase(re);
// Queue the deferred removal request.
355 __bbio_queue.push_back(std::make_pair(0, observer));
// Direct removal from both observer maps.
358 remove_observer(__bbio_created, observer);
359 remove_observer(__bbio_destroyed, observer);
// Fragment of the "interface created" observer notification; the enclosing
// function header is not part of this excerpt.
// First critical section: its content is on a line not shown here
// (presumably the event counter __bbio_events is incremented -- TODO confirm).
370 __bbio_mutex->lock();
372 __bbio_mutex->unlock();
374 BBioMapIterator lhmi;
375 BBioListIterator i, l;
// The keys of __bbio_created are used as fnmatch() patterns for the type.
376 for (lhmi = __bbio_created.begin(); lhmi != __bbio_created.end(); ++lhmi) {
// Type pattern does not match: skip this map entry.
377 if (fnmatch(lhmi->first.c_str(), type, 0) != 0)
continue;
379 BBioList &list = lhmi->second;
380 for (i = list.begin(); i != list.end(); ++i) {
// Each list element carries a list of patterns that is matched against the
// id; what happens on a match is on lines not shown here.
382 for (std::list<std::string>::iterator pi = i->second.begin(); pi != i->second.end(); ++pi) {
383 if (fnmatch(pi->c_str(), id, 0) == 0) {
// Second critical section: apply deferred observer changes.
391 __bbio_mutex->lock();
393 process_bbio_queue();
394 __bbio_mutex->unlock();
// Fragment of the "interface destroyed" observer notification; the enclosing
// function header is not part of this excerpt.
// First critical section: its content is on a line not shown here
// (presumably the event counter __bbio_events is incremented -- TODO confirm).
405 __bbio_mutex->lock();
407 __bbio_mutex->unlock();
409 BBioMapIterator lhmi;
410 BBioListIterator i, l;
// The keys of __bbio_destroyed are used as fnmatch() patterns for the type.
411 for (lhmi = __bbio_destroyed.begin(); lhmi != __bbio_destroyed.end(); ++lhmi) {
// Type pattern does not match: skip this map entry.
412 if (fnmatch(lhmi->first.c_str(), type, 0) != 0)
continue;
414 BBioList &list = (*lhmi).second;
415 for (i = list.begin(); i != list.end(); ++i) {
// Each list element carries a list of patterns that is matched against the
// id; what happens on a match is on lines not shown here.
417 for (std::list<std::string>::iterator pi = i->second.begin(); pi != i->second.end(); ++pi) {
418 if (fnmatch(pi->c_str(), id, 0) == 0) {
// Second critical section: apply deferred observer changes.
426 __bbio_mutex->lock();
428 process_bbio_queue();
429 __bbio_mutex->unlock();
// Applies the deferred observer registrations/removals in __bbio_queue.
// Nothing is done for an empty queue. The branch for __bbio_events > 0 is on
// lines not shown here (presumably processing is postponed while events are
// still in progress -- TODO confirm). Otherwise the queue is drained front to
// back: each entry's observer is either added to or removed from both the
// created and the destroyed map; the selector between the two is on lines not
// shown here (presumably the entry's first member).
// Both visible call sites invoke this while holding __bbio_mutex.
434 BlackBoardNotifier::process_bbio_queue()
436 if ( ! __bbio_queue.empty() ) {
437 if (__bbio_events > 0 ) {
440 while (! __bbio_queue.empty()) {
441 BBioQueueEntry &e = __bbio_queue.front();
// Registration: use the observer's own lists of observed interfaces.
443 add_observer(e.second, e.second->bbio_get_observed_create(), __bbio_created);
444 add_observer(e.second, e.second->bbio_get_observed_destroy(), __bbio_destroyed);
// Removal.
446 remove_observer(__bbio_created, e.second);
447 remove_observer(__bbio_destroyed, e.second);
449 __bbio_queue.pop_front();
464 unsigned int event_instance_serial)
throw()
// Fragment of the writer-open notification (the line carrying the function
// name is not part of this excerpt).
// Count this notification as in progress; the counter is only modified while
// holding the writer mutex.
466 __bbil_writer_mutex->lock();
467 __bbil_writer_events += 1;
468 __bbil_writer_mutex->unlock();
// Visit every listener registered in the writer map under this interface's uid.
470 const char *uid = interface->uid();
471 std::pair<BBilMap::iterator, BBilMap::iterator> ret =
472 __bbil_writer.equal_range(uid);
473 for (BBilMap::iterator j = ret.first; j != ret.second; ++j) {
// Skip listeners that already have an op == false request pending in the
// writer queue for this uid.
475 if (! is_in_queue(
false, __bbil_writer_queue, uid, bbil)) {
// bbil_iface is obtained on a line not shown here; a null value leads to the
// warning text below.
477 if (bbil_iface != NULL ) {
481 "BBIL[%s] registered for writer events "
482 "(open) for '%s' but has no such interface",
// Notification finished: decrement the counter and apply deferred changes to
// the writer map, all under the writer mutex.
488 __bbil_writer_mutex->lock();
489 __bbil_writer_events -= 1;
490 process_writer_queue();
491 __bbil_writer_mutex->unlock();
502 unsigned int event_instance_serial)
throw()
// Fragment of the writer-close notification (the line carrying the function
// name is not part of this excerpt).
// Count this notification as in progress; the counter is only modified while
// holding the writer mutex.
504 __bbil_writer_mutex->lock();
505 __bbil_writer_events += 1;
506 __bbil_writer_mutex->unlock();
// Visit every listener registered in the writer map under this interface's uid.
508 const char *uid = interface->uid();
509 std::pair<BBilMap::iterator, BBilMap::iterator> ret =
510 __bbil_writer.equal_range(uid);
511 for (BBilMap::iterator j = ret.first; j != ret.second; ++j) {
// Skip listeners that already have an op == false request pending for this
// uid. Deferred changes to the writer map are kept in __bbil_writer_queue, so
// that is the queue to consult; this call previously inspected
// __bbil_data_queue, which holds requests for the data-change map instead.
513 if (! is_in_queue(
false, __bbil_writer_queue, uid, bbil)) {
// bbil_iface is obtained on a line not shown here; a null value leads to the
// warning text below.
515 if (bbil_iface != NULL ) {
519 "BBIL[%s] registered for writer events "
520 "(close) for '%s' but has no such interface",
// Notification finished: decrement the counter and apply deferred changes to
// the writer map, all under the writer mutex.
526 __bbil_writer_mutex->lock();
527 __bbil_writer_events -= 1;
528 process_writer_queue();
529 __bbil_writer_mutex->unlock();
// Applies the deferred listener changes stored in __bbil_writer_queue to the
// writer listener map. Nothing is done for an empty queue. The branch for
// __bbil_writer_events > 0 is on lines not shown here (presumably processing
// is postponed while notifications are still running -- TODO confirm).
// Otherwise the queue is drained front to back, each entry being applied with
// add_listener() or remove_listener(); the selector is on lines not shown
// here (presumably the entry's op flag).
// Both visible call sites invoke this while holding __bbil_writer_mutex.
533 BlackBoardNotifier::process_writer_queue()
535 if ( ! __bbil_writer_queue.empty() ) {
536 if (__bbil_writer_events > 0 ) {
539 while (! __bbil_writer_queue.empty()) {
540 BBilQueueEntry &e = __bbil_writer_queue.front();
542 add_listener(e.interface, e.listener, __bbil_writer);
544 remove_listener(e.interface, e.listener, __bbil_writer);
546 __bbil_writer_queue.pop_front();
560 unsigned int event_instance_serial)
throw()
// Fragment of the reader-open notification (the line carrying the function
// name is not part of this excerpt).
// Count this notification as in progress; the counter is only modified while
// holding the reader mutex.
562 __bbil_reader_mutex->lock();
563 __bbil_reader_events += 1;
564 __bbil_reader_mutex->unlock();
// Visit every listener registered in the reader map under this interface's uid.
566 const char *uid = interface->uid();
567 std::pair<BBilMap::iterator, BBilMap::iterator> ret =
568 __bbil_reader.equal_range(uid);
569 for (BBilMap::iterator j = ret.first; j != ret.second; ++j) {
// Skip listeners that already have an op == false request pending in the
// reader queue for this uid.
571 if (! is_in_queue(
false, __bbil_reader_queue, uid, bbil)) {
// bbil_iface is obtained on a line not shown here; a null value leads to the
// warning text below.
573 if (bbil_iface != NULL ) {
577 "BBIL[%s] registered for reader events "
578 "(open) for '%s' but has no such interface",
// Notification finished: decrement the counter and apply deferred changes to
// the reader map, all under the reader mutex.
584 __bbil_reader_mutex->lock();
585 __bbil_reader_events -= 1;
586 process_reader_queue();
587 __bbil_reader_mutex->unlock();
598 unsigned int event_instance_serial)
throw()
// Fragment of the reader-close notification (the line carrying the function
// name is not part of this excerpt).
// Count this notification as in progress; the counter is only modified while
// holding the reader mutex.
600 __bbil_reader_mutex->lock();
601 __bbil_reader_events += 1;
602 __bbil_reader_mutex->unlock();
// Visit every listener registered in the reader map under this interface's uid.
604 const char *uid = interface->uid();
605 std::pair<BBilMap::iterator, BBilMap::iterator> ret =
606 __bbil_reader.equal_range(uid);
607 for (BBilMap::iterator j = ret.first; j != ret.second; ++j) {
// Skip listeners that already have an op == false request pending for this
// uid. Deferred changes to the reader map are kept in __bbil_reader_queue, so
// that is the queue to consult; this call previously inspected
// __bbil_data_queue, which holds requests for the data-change map instead.
609 if (! is_in_queue(
false, __bbil_reader_queue, uid, bbil)) {
// bbil_iface is obtained on a line not shown here; a null value leads to the
// warning text below.
611 if (bbil_iface != NULL ) {
615 "BBIL[%s] registered for reader events "
616 "(close) for '%s' but has no such interface",
// Notification finished: decrement the counter and apply deferred changes to
// the reader map, all under the reader mutex.
622 __bbil_reader_mutex->lock();
623 __bbil_reader_events -= 1;
624 process_reader_queue();
625 __bbil_reader_mutex->unlock();
// Applies the deferred listener changes stored in __bbil_reader_queue to the
// reader listener map. Nothing is done for an empty queue. The branch for
// __bbil_reader_events > 0 is on lines not shown here (presumably processing
// is postponed while notifications are still running -- TODO confirm).
// Otherwise the queue is drained front to back, each entry being applied with
// add_listener() or remove_listener(); the selector is on lines not shown
// here (presumably the entry's op flag).
// Both visible call sites invoke this while holding __bbil_reader_mutex.
630 BlackBoardNotifier::process_reader_queue()
632 if ( ! __bbil_reader_queue.empty() ) {
633 if (__bbil_reader_events > 0 ) {
636 while (! __bbil_reader_queue.empty()) {
637 BBilQueueEntry &e = __bbil_reader_queue.front();
639 add_listener(e.interface, e.listener, __bbil_reader);
641 remove_listener(e.interface, e.listener, __bbil_reader);
643 __bbil_reader_queue.pop_front();
// Fragment of the data-change notification; the enclosing function header is
// not part of this excerpt.
// Count this notification as in progress; the counter is only modified while
// holding the data mutex.
662 __bbil_data_mutex->
lock();
663 __bbil_data_events += 1;
664 __bbil_data_mutex->
unlock();
// Visit every listener registered in the data map under this interface's uid.
666 const char *uid = interface->
uid();
667 std::pair<BBilMap::iterator, BBilMap::iterator> ret =
668 __bbil_data.equal_range(uid);
669 for (BBilMap::iterator j = ret.first; j != ret.second; ++j) {
// Skip listeners that already have an op == false request pending in the
// data queue for this uid.
671 if (! is_in_queue(
false, __bbil_data_queue, uid, bbil)) {
// bbil_iface is obtained on a line not shown here; a null value leads to the
// warning text below.
673 if (bbil_iface != NULL ) {
677 "BBIL[%s] registered for data change events "
678 "for '%s' but has no such interface",
// Notification finished: decrement the counter under the data mutex. Only
// when no other data notification is still running (counter back at zero) is
// the deferred-change queue drained into the data map, in place rather than
// through a separate helper.
684 __bbil_data_mutex->
lock();
685 __bbil_data_events -= 1;
686 if ( ! __bbil_data_queue.empty() ) {
687 if (__bbil_data_events == 0 ) {
688 while (! __bbil_data_queue.empty()) {
689 BBilQueueEntry &e = __bbil_data_queue.front();
// The selector between add and remove is on lines not shown here.
691 add_listener(e.interface, e.listener, __bbil_data);
693 remove_listener(e.interface, e.listener, __bbil_data);
695 __bbil_data_queue.pop_front();
699 __bbil_data_mutex->
unlock();
// Fragment of the message notification; the enclosing function header is not
// part of this excerpt.
// Count this notification as in progress; the counter is only modified while
// holding the messages mutex.
716 __bbil_messages_mutex->
lock();
717 __bbil_messages_events += 1;
718 __bbil_messages_mutex->
unlock();
// Visit every listener registered in the messages map under this interface's
// uid.
722 const char *uid = interface->
uid();
723 std::pair<BBilMap::iterator, BBilMap::iterator> ret =
724 __bbil_messages.equal_range(uid);
725 for (BBilMap::iterator j = ret.first; j != ret.second; ++j) {
// Skip listeners that already have an op == false request pending in the
// messages queue for this uid.
727 if (! is_in_queue(
false, __bbil_messages_queue, uid, bbil)) {
// bbil_iface is obtained on a line not shown here; a null value leads to the
// warning text below.
729 if (bbil_iface != NULL ) {
737 "BBIL[%s] registered for message events "
738 "for '%s' but has no such interface",
// Notification finished: decrement the counter under the messages mutex. Only
// when no other message notification is still running (counter back at zero)
// is the deferred-change queue drained into the messages map, in place rather
// than through a separate helper.
744 __bbil_messages_mutex->
lock();
745 __bbil_messages_events -= 1;
746 if ( ! __bbil_messages_queue.empty() ) {
747 if (__bbil_messages_events == 0 ) {
748 while (! __bbil_messages_queue.empty()) {
749 BBilQueueEntry &e = __bbil_messages_queue.front();
// The selector between add and remove is on lines not shown here.
751 add_listener(e.interface, e.listener, __bbil_messages);
753 remove_listener(e.interface, e.listener, __bbil_messages);
755 __bbil_messages_queue.pop_front();
759 __bbil_messages_mutex->
unlock();