Fawkes API  Fawkes Development Version
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator Friends Groups Pages
base_thread.cpp
1 
2 /***************************************************************************
3  * base_thread.cpp - FireVision Base Thread
4  *
5  * Created: Tue May 29 16:41:50 2007
6  * Copyright 2006-2009 Tim Niemueller [www.niemueller.de]
7  *
8  ****************************************************************************/
9 
10 /* This program is free software; you can redistribute it and/or modify
11  * it under the terms of the GNU General Public License as published by
12  * the Free Software Foundation; either version 2 of the License, or
13  * (at your option) any later version.
14  *
15  * This program is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18  * GNU Library General Public License for more details.
19  *
20  * Read the full text in the LICENSE.GPL file in the doc directory.
21  */
22 
23 #include "base_thread.h"
24 #include "acquisition_thread.h"
25 #include "aqt_vision_threads.h"
26 
27 #include <core/threading/thread.h>
28 #include <core/threading/mutex.h>
29 #include <core/threading/mutex_locker.h>
30 #include <core/threading/barrier.h>
31 #include <logging/logger.h>
32 
33 #include <fvutils/system/camargp.h>
34 #include <fvutils/ipc/shm_image.h>
35 #include <fvutils/ipc/shm_lut.h>
36 #include <fvcams/factory.h>
37 #include <fvcams/cam_exceptions.h>
38 #include <fvcams/control/factory.h>
39 #include <core/exceptions/software.h>
40 
41 #include <aspect/vision.h>
42 
43 #include <algorithm>
44 #include <unistd.h>
45 
46 using namespace fawkes;
47 using namespace firevision;
48 
49 /** @class FvBaseThread "base_thread.h"
50  * FireVision base thread.
51  * This implements the functionality of the FvBasePlugin.
52  * @author Tim Niemueller
53  */
54 
55 /** Constructor. */
57  : Thread("FvBaseThread", Thread::OPMODE_WAITFORWAKEUP),
58  BlockedTimingAspect(BlockedTimingAspect::WAKEUP_HOOK_SENSOR_ACQUIRE),
59  VisionMasterAspect(this)
60 {
61  // default to 30 seconds
62  __aqt_timeout = 30;
63  __aqt_barrier = new Barrier(1);
64 }
65 
66 
67 /** Destructor. */
69 {
70  delete __aqt_barrier;
71 }
72 
73 
74 void
76 {
77  // wipe all previously existing FireVision shared memory segments
78  // that are orphaned
79  SharedMemoryImageBuffer::cleanup(/* use lister */ false);
80  SharedMemoryLookupTable::cleanup(/* use lister */ false);
81 }
82 
83 
84 void
86 {
87  __aqts.lock();
88  for (__ait = __aqts.begin(); __ait != __aqts.end(); ++__ait) {
89  thread_collector->remove(__ait->second);
90  delete __ait->second;
91  }
92  __aqts.clear();
93  __aqts.unlock();
94  __owned_controls.lock();
96  for (i = __owned_controls.begin(); i != __owned_controls.end(); ++i) {
97  delete *i;
98  }
99  __owned_controls.clear();
100  __owned_controls.unlock();
101 }
102 
103 
104 /** Thread loop. */
105 void
107 {
108  __aqts.lock();
109 
110  try {
111  for (__ait = __aqts.begin(); __ait != __aqts.end(); ++__ait) {
112  __ait->second->set_vt_prepfin_hold(true);
113  }
114  } catch (Exception &e) {
115  logger->log_warn(name(), "Cannot get prepfin hold status, skipping this loop");
116  for (__ait = __aqts.begin(); __ait != __aqts.end(); ++__ait) {
117  __ait->second->set_vt_prepfin_hold(false);
118  }
119  __aqts.unlock();
120  return;
121  }
122 
123  // Wakeup all cyclic acquisition threads and wait for them
124  for (__ait = __aqts.begin(); __ait != __aqts.end(); ++__ait) {
125  if ( __ait->second->aqtmode() == FvAcquisitionThread::AqtCyclic ) {
126  //logger->log_debug(name(), "Waking Thread %s", __ait->second->name());
127  __ait->second->wakeup(__aqt_barrier);
128  }
129  }
130 
131  __aqt_barrier->wait();
132 
133  // Check for aqt timeouts
134  for (__ait = __aqts.begin(); __ait != __aqts.end();) {
135  if ( __ait->second->vision_threads->empty() &&
136  (__ait->second->vision_threads->empty_time() > __aqt_timeout) ) {
137 
138  logger->log_info(name(), "Acquisition thread %s timed out, destroying",
139  __ait->second->name());
140 
141 
142  thread_collector->remove(__ait->second);
143  delete __ait->second;
144  __aqts.erase(__ait++);
145  } else {
146  ++__ait;
147  }
148  }
149 
150  __started_threads.lock();
151  fawkes::LockMap<Thread *, FvAcquisitionThread *>::iterator stit = __started_threads.begin();
152  while (stit != __started_threads.end()) {
153 
154  logger->log_info(name(), "Thread %s has been started, %zu",
155  stit->second->name(), __started_threads.size());
156 
157  // if the thread is registered in that aqt mark it running
158  stit->second->vision_threads->set_thread_running(stit->first);
159 
160  if ( stit->second->vision_threads->has_cyclic_thread() ) {
161  if (stit->second->aqtmode() != FvAcquisitionThread::AqtCyclic ) {
162  logger->log_info(name(), "Switching acquisition thread %s to cyclic mode",
163  stit->second->name());
164 
165  stit->second->prepare_finalize();
166  stit->second->cancel();
167  stit->second->join();
168  stit->second->set_aqtmode(FvAcquisitionThread::AqtCyclic);
169  stit->second->start();
170  stit->second->cancel_finalize();
171  }
172  } else if (stit->second->aqtmode() != FvAcquisitionThread::AqtContinuous ) {
173  logger->log_info(name(), "Switching acquisition thread %s to continuous mode",
174  stit->second->name());
175  stit->second->prepare_finalize();
176  stit->second->cancel();
177  stit->second->join();
178  stit->second->set_aqtmode(FvAcquisitionThread::AqtContinuous);
179  stit->second->start();
180  stit->second->cancel_finalize();
181  }
182 
183  // Make thread actually capture data
184  stit->second->set_enabled(true);
185 
187  ++stit;
188  __started_threads.erase( stittmp );
189  }
190  __started_threads.unlock();
191 
192  // Re-create barrier as necessary after _adding_ threads
193  unsigned int num_cyclic_threads = 0;
194  for (__ait = __aqts.begin(); __ait != __aqts.end(); ++__ait) {
195  if ( __ait->second->vision_threads->has_cyclic_thread() ) {
196  ++num_cyclic_threads;
197  }
198  }
199  cond_recreate_barrier(num_cyclic_threads);
200 
201  for (__ait = __aqts.begin(); __ait != __aqts.end(); ++__ait) {
202  __ait->second->set_vt_prepfin_hold(false);
203  }
204 
205  __aqts.unlock();
206 }
207 
208 
209 /** Get vision master.
210  * @return vision master
211  */
212 VisionMaster *
214 {
215  return this;
216 }
217 
218 
219 Camera *
220 FvBaseThread::register_for_camera(const char *camera_string, Thread *thread,
221  colorspace_t cspace)
222 {
223  Camera *c = NULL;
224  __aqts.lock();
225 
226  logger->log_info(name(), "Thread '%s' registers for camera '%s'", thread->name(), camera_string);
227 
228  VisionAspect *vision_thread = dynamic_cast<VisionAspect *>(thread);
229  if ( vision_thread == NULL ) {
230  throw TypeMismatchException("Thread is not a vision thread");
231  }
232 
233  CameraArgumentParser *cap = new CameraArgumentParser(camera_string);
234  try {
235  std::string id = cap->cam_type() + "." + cap->cam_id();
236  if ( __aqts.find(id) != __aqts.end() ) {
237  // this camera has already been loaded
238  c = __aqts[id]->camera_instance(cspace,
239  (vision_thread->vision_thread_mode() ==
240  VisionAspect::CONTINUOUS));
241 
242  __aqts[id]->vision_threads->add_waiting_thread(thread);
243 
244  } else {
245  Camera *cam = NULL;
246  try {
247  cam = CameraFactory::instance(cap);
248  cam->open();
249  cam->start();
250  } catch (Exception &e) {
251  delete cam;
252  e.append("Could not open or start camera");
253  throw;
254  }
255 
256  FvAcquisitionThread *aqt = new FvAcquisitionThread(id.c_str(), cam, logger, clock);
257 
258  c = aqt->camera_instance(cspace, (vision_thread->vision_thread_mode() ==
259  VisionAspect::CONTINUOUS));
260 
261  aqt->vision_threads->add_waiting_thread(thread);
262 
263  __aqts[id] = aqt;
264  thread_collector->add(aqt);
265 
266  // no need to recreate barrier, by default aqts operate in continuous mode
267 
268  logger->log_info(name(), "Acquisition thread '%s' started for thread '%s' and camera '%s'",
269  aqt->name(), thread->name(), id.c_str());
270 
271  }
272 
273  thread->add_notification_listener(this);
274 
275  } catch (UnknownCameraTypeException &e) {
276  delete cap;
277  e.append("FvBaseVisionMaster: could not instantiate camera");
278  __aqts.unlock();
279  throw;
280  } catch (Exception &e) {
281  delete cap;
282  e.append("FvBaseVisionMaster: could not open or start camera");
283  __aqts.unlock();
284  throw;
285  }
286 
287  delete cap;
288 
289  __aqts.unlock();
290  return c;
291 }
292 
293 
294 Camera *
295 FvBaseThread::register_for_raw_camera(const char *camera_string, Thread *thread)
296 {
297  Camera *camera = register_for_camera(camera_string, thread, CS_UNKNOWN);
298  CameraArgumentParser cap(camera_string);
299  try {
300  std::string id = cap.cam_type() + "." + cap.cam_id();
301  __aqts.lock();
302  if ( __aqts.find(id) != __aqts.end() ) {
303  __aqts[id]->raw_subscriber_thread = thread;
304  }
305  __aqts.unlock();
306  } catch (Exception &e) {
307  __aqts.unlock();
308  throw;
309  }
310  return camera;
311 }
312 
314 FvBaseThread::create_camctrl(const char *camera_string)
315 {
316  CameraControl *cc = CameraControlFactory::instance(camera_string);
317  if (cc) {
318  __owned_controls.lock();
319  __owned_controls.push_back(cc);
320  __owned_controls.sort();
321  __owned_controls.unique();
322  __owned_controls.unlock();
323  return cc;
324  } else {
325  throw Exception("Cannot create camera control of desired type");
326  }
327 }
328 
330 FvBaseThread::acquire_camctrl(const char *cam_string)
331 {
332  CameraArgumentParser cap(cam_string);
333  std::string id = cap.cam_type() + "." + cap.cam_id();
334 
335  // Has this camera been loaded?
336  MutexLocker lock(__aqts.mutex());
337  if (__aqts.find(id) != __aqts.end()) {
338  return CameraControlFactory::instance(__aqts[id]->get_camera());
339  } else {
340  return create_camctrl(cam_string);
341  }
342 }
343 
344 
346 FvBaseThread::acquire_camctrl(const char *cam_string,
347  const std::type_info &typeinf)
348 {
349  CameraArgumentParser cap(cam_string);
350  std::string id = cap.cam_type() + "." + cap.cam_id();
351 
352  // Has this camera been loaded?
353  MutexLocker lock(__aqts.mutex());
354  if (__aqts.find(id) != __aqts.end()) {
355  return CameraControlFactory::instance(typeinf, __aqts[id]->get_camera());
356  } else {
357  return create_camctrl(cam_string);
358  }
359 }
360 
361 
362 void
364 {
365  __owned_controls.lock();
367  if ((f = std::find(__owned_controls.begin(), __owned_controls.end(), cc)) != __owned_controls.end()) {
368  delete *f;
369  __owned_controls.erase(f);
370  }
371  __owned_controls.unlock();
372 }
373 
374 
375 /** Conditionally re-create barriers.
376  * Re-create barriers if the number of cyclic threads has changed.
377  * @param num_cyclic_threads new number of cyclic threads
378  */
379 void
380 FvBaseThread::cond_recreate_barrier(unsigned int num_cyclic_threads)
381 {
382  if ( (num_cyclic_threads + 1) != __aqt_barrier->count() ) {
383  delete __aqt_barrier;
384  __aqt_barrier = new Barrier( num_cyclic_threads + 1 ); // +1 for base thread
385  }
386 }
387 
388 
389 void
391 {
392  __aqts.lock();
393  unsigned int num_cyclic_threads = 0;
394 
395  for (__ait = __aqts.begin(); __ait != __aqts.end(); ++__ait) {
396 
397  // Remove thread from all aqts
398  __ait->second->vision_threads->remove_thread(thread);
399 
400  if (__ait->second->raw_subscriber_thread == thread) {
401  __ait->second->raw_subscriber_thread = NULL;
402  }
403 
404  if ( __ait->second->vision_threads->has_cyclic_thread() ) {
405  ++num_cyclic_threads;
406 
407  } else if (__ait->second->aqtmode() != FvAcquisitionThread::AqtContinuous ) {
408  logger->log_info(name(), "Switching acquisition thread %s to continuous mode "
409  "on unregister", __ait->second->name());
410 
411  __ait->second->prepare_finalize();
412  __ait->second->cancel();
413  __ait->second->join();
414  __ait->second->set_aqtmode(FvAcquisitionThread::AqtContinuous);
415  __ait->second->start();
416  __ait->second->cancel_finalize();
417  }
418  }
419  // Recreate as necessary after _removing_ threads
420  cond_recreate_barrier(num_cyclic_threads);
421 
422  __aqts.unlock();
423 }
424 
425 
426 bool
428 {
429  __aqts.lock();
430  for (__ait = __aqts.begin(); __ait != __aqts.end(); ++__ait) {
431  if (__ait->second->vision_threads->has_waiting_thread(thread)) {
432  __started_threads.lock();
433  __started_threads[thread] = __ait->second;
434  __started_threads.unlock();
435  }
436  }
437  __aqts.unlock();
438 
439  return false;
440 }
441 
442 
443 bool
445 {
446  __aqts.lock();
447  for (__ait = __aqts.begin(); __ait != __aqts.end(); ++__ait) {
448  __ait->second->vision_threads->remove_waiting_thread(thread);
449  }
450  __aqts.unlock();
451 
452  return false;
453 }