scheduler.hpp
Go to the documentation of this file.
1 
5 /* Copyright (c) 2005-2009,2011 Taneli Kalvas, Jan Sarén. All rights reserved.
6  *
7  * You can redistribute this software and/or modify it under the terms
8  * of the GNU General Public License as published by the Free Software
9  * Foundation; either version 2 of the License, or (at your option)
10  * any later version.
11  *
12  * This library is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15  * General Public License for more details.
16  *
17  * You should have received a copy of the GNU General Public License
18  * along with this library (file "COPYING" included in the package);
19  * if not, write to the Free Software Foundation, Inc., 51 Franklin
20  * Street, Fifth Floor, Boston, MA 02110-1301 USA
21  *
22  * If you have questions about your rights to use or distribute this
23  * software, please contact Berkeley Lab's Technology Transfer
24  * Department at TTD@lbl.gov. Other questions, comments and bug
25  * reports should be sent directly to the author via email at
26  * taneli.kalvas@jyu.fi.
27  *
28  * NOTICE. This software was developed under partial funding from the
29  * U.S. Department of Energy. As such, the U.S. Government has been
30  * granted for itself and others acting on its behalf a paid-up,
31  * nonexclusive, irrevocable, worldwide license in the Software to
32  * reproduce, prepare derivative works, and perform publicly and
33  * display publicly. Beginning five (5) years after the date
34  * permission to assert copyright is obtained from the U.S. Department
35  * of Energy, and subject to any subsequent five (5) year renewals,
36  * the U.S. Government is granted for itself and others acting on its
37  * behalf a paid-up, nonexclusive, irrevocable, worldwide license in
38  * the Software to reproduce, prepare derivative works, distribute
39  * copies to the public, perform publicly and display publicly, and to
40  * permit others to do so.
41  */
42 
43 #ifndef SCHEDULER_HPP
44 #define SCHEDULER_HPP 1
45 
46 
47 #include <pthread.h>
48 #include <stdint.h>
49 #include <iostream>
50 #include <vector>
51 #include <deque>
52 #include <time.h>
53 #include "comptime.hpp"
54 
55 
56 //#define SCHEDULER_DEBUG 1
57 
58 
59 //pthread_mutex_t cout_mutex = PTHREAD_MUTEX_INITIALIZER;
60 
61 
86 template <class Solv, class Prob, class Err>
87 class Scheduler {
88 
89  class Consumer {
90 
91  /*
92  enum consumer_status_e {
93  CONSUMER_CREATED = 0,
94  CONSUMER_RUNNING,
95  CONSUMER_FINISHED
96  };
97  */
98 
99  //pthread_mutex_t _mutex; //!< \brief Mutex for active check
100  pthread_t _thread;
101  Solv *_solver;
102  Scheduler *_scheduler;
103  //struct timeval _t0;
104  //std::vector<struct timeval> _t;
105 
106  void *consumer_main( void ) {
107  //struct timeval t;
108 
109 #ifdef SCHEDULER_DEBUG
110  std::cout << "Consumer main entrance\n";
111 #endif
112  //pthread_mutex_lock( &_mutex );
113  //_status = CONSUMER_RUNNING;
114  //pthread_mutex_unlock( &_mutex );
115 
116  Prob *p;
117  uint32_t pi;
118  while( (p = _scheduler->get_next_problem( pi )) ) {
119  try {
120  //gettimeofday( &t, NULL );
121  //_t.push_back( t );
122  (*_solver)( p, pi );
123  //gettimeofday( &t, NULL );
124  //_t.push_back( t );
125  } catch( Err e ) {
126  //std::cout << "on_error\n";
127  // Handle error and stop solving
128  _scheduler->on_error( e, pi );
129  break;
130  };
131  _scheduler->inc_solved_problem();
132  }
133 
134 #ifdef SCHEDULER_DEBUG
135  std::cout << "Exiting consumer\n";
136 #endif
137  //pthread_mutex_lock( &_mutex );
138  //_status = CONSUMER_FINISHED;
139  //pthread_mutex_unlock( &_mutex );
140  return( NULL );
141  }
142 
143  public:
144 
145  static void *consumer_entry( void *data ) {
146  Consumer *consumer = (Consumer *)data;
147  return( consumer->consumer_main() );
148  }
149 
150  Consumer( Solv *solver, Scheduler *scheduler ) : _solver(solver), _scheduler(scheduler) {
151 
152  //pthread_mutex_init( &_mutex, NULL );
153 #ifdef SCHEDULER_DEBUG
154  std::cout << "Consumer constructor\n";
155 #endif
156  //gettimeofday( &_t0, NULL );
157  }
158 
159  ~Consumer() {
160  //pthread_mutex_lock( &cout_mutex );
161 #ifdef SCHEDULER_DEBUG
162  std::cout << "Consumer destructor\n";
163 #endif
164  //for( size_t a = 0; a < _t.size(); a++ ) {
165  //std::cout << (_t[a].tv_sec-_t0.tv_sec) +
166  //(_t[a].tv_usec-_t0.tv_usec)/1e6 << "\n";
167  //a++;
168  //std::cout << (_t[a].tv_sec-_t0.tv_sec) +
169  //(_t[a].tv_usec-_t0.tv_usec)/1e6 << "\n\n\n";
170  //}
171  //pthread_mutex_unlock( &cout_mutex );
172  }
173 
174  void run( void ) {
175  pthread_create( &_thread, NULL, consumer_entry, (void *)this );
176  }
177 
178  void join( void ) {
179 
180 #ifdef SCHEDULER_DEBUG
181  std::cout << "Consumer join\n";
182 #endif
183  //pthread_mutex_lock( &_mutex );
184  //if( _status == CONSUMER_FINISHED ) {
185  //pthread_mutex_unlock( &_mutex );
186  //return;
187  //} else if( _status == CONSUMER_CREATED ) {
188  //
189  //}
190  //pthread_mutex_unlock( &_mutex );
191  pthread_join( _thread, NULL );
192  }
193 
194  };
195 
196 
197  pthread_mutex_t _mutex;
198  pthread_cond_t _scheduler_cond;
199  pthread_cond_t _producer_cond;
200  pthread_cond_t _consumer_cond;
201 
202  //size_t _problems_in_c; //!< \brief Total problems in count
203  //size_t _problems_err_c; //!< \brief Total error problems out count
204  //std::deque<Prob*> _problems_out; //!< \brief Problems already solved
205 
206  uint32_t _read_c;
207  uint32_t _solved_c;
208  std::vector<Prob *> &_problems;
209 
210  pthread_t _scheduler_thread;
211  std::vector<Consumer *> _consumers;
212 
213  bool _join;
214  bool _running;
215  bool _error;
216  bool _done;
217  bool _finish;
218  std::vector<Err> _err;
219  std::vector<int32_t> _eprob;
220 
221 
228  void on_error( Err &e, uint32_t pi ) {
229  pthread_mutex_lock( &_mutex );
230  _err.push_back( e );
231  _eprob.push_back( pi );
232  _error = true;
233  pthread_cond_broadcast( &_scheduler_cond );
234  pthread_mutex_unlock( &_mutex );
235  }
236 
237 
240  void inc_solved_problem( void ) {
241  pthread_mutex_lock( &_mutex );
242  _solved_c++;
243  pthread_mutex_unlock( &_mutex );
244  }
245 
252  Prob *get_next_problem( uint32_t &pi ) {
253 #ifdef SCHEDULER_DEBUG
254  std::cout << "get_next_problem()\n";
255 #endif
256  pthread_mutex_lock( &_mutex );
257 
258  if( _done || _error ) {
259  pthread_mutex_unlock( &_mutex );
260 #ifdef SCHEDULER_DEBUG
261  std::cout << "get_next_problem(): Returning NULL\n";
262 #endif
263  pi = -1;
264  return( NULL );
265  }
266 
267  if( _problems.size() == _read_c ) {
268 #ifdef SCHEDULER_DEBUG
269  std::cout << "get_next_problem(): No problem to return... waiting\n";
270 #endif
271  // Signal producer that problems are spent
272  pthread_cond_signal( &_scheduler_cond );
273  while( _problems.size() == _read_c ) {
274  // Wait for new problems
275  pthread_cond_wait( &_consumer_cond, &_mutex );
276  if( _done || _error ) {
277  pthread_mutex_unlock( &_mutex );
278 #ifdef SCHEDULER_DEBUG
279  std::cout << "get_next_problem(): Returning NULL\n";
280 #endif
281  pi = -1;
282  return( NULL );
283  }
284  }
285  }
286 
287  // Return next problem
288  pi = _read_c++;
289  Prob *ret = _problems[pi];
290 
291 #ifdef SCHEDULER_DEBUG
292  std::cout << "get_next_problem(): Returning problem " << pi << "\n";
293 #endif
294 
295  pthread_mutex_unlock( &_mutex );
296  return( ret );
297  }
298 
299 
302  void *scheduler_main( void ) {
303 
304 #ifdef SCHEDULER_DEBUG
305  std::cout << "Running scheduler_main()\n";
306 #endif
307 
308  // Start consumer threads
309  for( size_t a = 0; a < _consumers.size(); a++ )
310  _consumers[a]->run();
311 
312  pthread_mutex_lock( &_mutex );
313 
314  while( 1 ) {
315  // Wait until all consumers are done with all problems or error occurs
316  while( !(_problems.size() == _solved_c || _done || _error) ) {
317  //std::cout << "scheduler_main(): scheduler_cond wait 1\n";
318  pthread_cond_wait( &_scheduler_cond, &_mutex );
319  }
320 
321  if( (_finish && _problems.size() == _solved_c) || _done || _error )
322  break;
323 
324  // Problems temporarily done
325  pthread_cond_wait( &_scheduler_cond, &_mutex );
326  //std::cout << "scheduler_main(): prob_in = " << _problems_in_c
327  //<< " prob_out = " << _problems_out_c << "\n";
328  //std::cout << "scheduler_main(): scheduler_cond wait 2\n";
329 
330  // Signal consumers to wake up
331  pthread_cond_broadcast( &_consumer_cond );
332  }
333 
334  // Broadcast: done
335  _done = true;
336  _running = false;
337  pthread_cond_broadcast( &_consumer_cond );
338  pthread_mutex_unlock( &_mutex );
339 
340  // Join all consumers
341  //std::cout << "scheduler_main(): Scheduler waiting in join\n";
342  for( size_t a = 0; a < _consumers.size(); a++ )
343  _consumers[a]->join();
344 
345  pthread_cond_broadcast( &_producer_cond );
346  //std::cout << "scheduler_main(): Exiting scheduler\n";
347  return( NULL );
348  }
349 
350 
353  static void *scheduler_entry( void *data ) {
354  Scheduler *scheduler = (Scheduler *)data;
355  return( scheduler->scheduler_main() );
356  }
357 
358 
359 public:
360 
361 
366  Scheduler( std::vector<Prob *> &prob )
367  : _read_c(0), _solved_c(0), _problems(prob), _join(false), _running(false),
368  _error(false), _done(false), _finish(false) {
369 
370  // Initialize pthread objects
371  pthread_mutex_init( &_mutex, NULL );
372  pthread_cond_init( &_scheduler_cond, NULL );
373  pthread_cond_init( &_consumer_cond, NULL );
374  pthread_cond_init( &_producer_cond, NULL );
375  }
376 
377 
381 
382  // Force exit
383  _done = true;
384  finish();
385 
386  pthread_mutex_destroy( &_mutex );
387  pthread_cond_destroy( &_scheduler_cond );
388  pthread_cond_destroy( &_consumer_cond );
389  pthread_cond_destroy( &_producer_cond );
390  }
391 
392 
395  bool is_error( void ) {
396  // No mutex needed for one bit read
397  return( _error );
398  }
399 
400 
403  bool is_running( void ) {
404  // No mutex needed for one bit read
405  return( _running );
406  }
407 
408 
411  uint32_t get_solved_count( void ) {
412  pthread_mutex_lock( &_mutex );
413  uint32_t ret = _solved_c;
414  pthread_mutex_unlock( &_mutex );
415  return( ret );
416  }
417 
418 
421  uint32_t get_problem_count( void ) {
422  pthread_mutex_lock( &_mutex );
423  uint32_t ret = _problems.size();
424  pthread_mutex_unlock( &_mutex );
425  return( ret );
426  }
427 
428 
437  template <class Cont1, class Cont2>
438  size_t get_errors( Cont1 &e, Cont2 &pi ) {
439  pthread_mutex_lock( &_mutex );
440  size_t r = _err.size();
441  for( size_t a = 0; a < _err.size(); a++ ) {
442  e.push_back( _err[a] );
443  pi.push_back( _eprob[a] );
444  }
445  _err.clear();
446  _eprob.clear();
447  pthread_mutex_unlock( &_mutex );
448  return( r );
449  }
450 
451 
458  void run( std::vector<Solv *> solv ) {
459 
460  // Do nothing if already running
461  if( _running )
462  return;
463 
464  // Create consumer threads
465  for( size_t a = 0; a < solv.size(); a++ )
466  _consumers.push_back( new Consumer( solv[a], this ) );
467 
468  _read_c = 0;
469  _solved_c = 0;
470  _join = true;
471  _running = true;
472  _error = false;
473  _done = false;
474  _finish = false;
475  _err.clear();
476  _eprob.clear();
477 
478  // Create scheduler thread
479  pthread_create( &_scheduler_thread, NULL, scheduler_entry, (void *)this );
480  }
481 
482 
485  void lock_mutex( void ) {
486 
487  pthread_mutex_lock( &_mutex );
488  }
489 
490 
493  void unlock_mutex( void ) {
494 
495  pthread_cond_broadcast( &_scheduler_cond );
496  pthread_mutex_unlock( &_mutex );
497  }
498 
499 
507  bool force_exit( void ) {
508 
509  _done = true;
510  return( finish() );
511  }
512 
519  bool wait_finish( void ) {
520 
521  pthread_mutex_lock( &_mutex );
522  if( _running ) {
523  _finish = true;
524  pthread_cond_broadcast( &_scheduler_cond );
525 
526  struct timespec ts;
527  ibs_clock_gettime( CLOCK_REALTIME, &ts );
528  ts.tv_sec += 1;
529  int rc = pthread_cond_timedwait( &_producer_cond, &_mutex, &ts );
530  if( rc == ETIMEDOUT ) {
531  pthread_mutex_unlock( &_mutex );
532  return( false );
533  }
534  }
535  pthread_mutex_unlock( &_mutex );
536  return( true );
537  }
538 
547  bool finish( void ) {
548 
549  pthread_mutex_lock( &_mutex );
550  if( _running ) {
551  _finish = true;
552  //std::cout << "finish(): scheduler_cond broadcast\n";
553  pthread_cond_broadcast( &_scheduler_cond );
554 
555  //std::cout << "finish(): producer_cond wait\n";
556  pthread_cond_wait( &_producer_cond, &_mutex );
557  }
558  pthread_mutex_unlock( &_mutex );
559 
560  if( _join ) {
561  // Delete consumer threads
562  for( size_t a = 0; a < _consumers.size(); a++ )
563  delete _consumers[a];
564  _consumers.clear();
565 
566  pthread_join( _scheduler_thread, NULL );
567  _join = false;
568  }
569 
570  if( _error )
571  return( false );
572 
573  return( true );
574  }
575 
576  friend class Consumer;
577 };
578 
579 
580 
581 #endif
582 
Scheduler class for implementing consumer-producer threading.
Definition: scheduler.hpp:87
void run(std::vector< Solv * > solv)
Run threads with N solvers.
Definition: scheduler.hpp:458
uint32_t get_solved_count(void)
Return number of solved problems.
Definition: scheduler.hpp:411
friend class Consumer
Definition: scheduler.hpp:576
uint32_t get_problem_count(void)
Return number of problems.
Definition: scheduler.hpp:421
~Scheduler()
Destructor.
Definition: scheduler.hpp:380
void lock_mutex(void)
Lock mutex for adding problems.
Definition: scheduler.hpp:485
bool is_error(void)
Return true on errors.
Definition: scheduler.hpp:395
bool wait_finish(void)
Call for solvers to finish when problems are solved.
Definition: scheduler.hpp:519
bool is_running(void)
Return true if scheduler is running.
Definition: scheduler.hpp:403
bool force_exit(void)
Force exit from scheduler.
Definition: scheduler.hpp:507
bool finish(void)
Wait for all problems to be solved.
Definition: scheduler.hpp:547
Scheduler(std::vector< Prob * > &prob)
Constructor.
Definition: scheduler.hpp:366
size_t get_errors(Cont1 &e, Cont2 &pi)
Fetch errors and indices of corresponding problems.
Definition: scheduler.hpp:438
void unlock_mutex(void)
Unlock mutex.
Definition: scheduler.hpp:493