Intel(R) Threading Building Blocks Doxygen Documentation  version 4.2.3
_flow_graph_node_impl.h
Go to the documentation of this file.
1 /*
2  Copyright (c) 2005-2019 Intel Corporation
3 
4  Licensed under the Apache License, Version 2.0 (the "License");
5  you may not use this file except in compliance with the License.
6  You may obtain a copy of the License at
7 
8  http://www.apache.org/licenses/LICENSE-2.0
9 
10  Unless required by applicable law or agreed to in writing, software
11  distributed under the License is distributed on an "AS IS" BASIS,
12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  See the License for the specific language governing permissions and
14  limitations under the License.
15 
16 
17 
18 
19 */
20 
21 #ifndef __TBB__flow_graph_node_impl_H
22 #define __TBB__flow_graph_node_impl_H
23 
24 #ifndef __TBB_flow_graph_H
25 #error Do not #include this internal file directly; use public TBB headers instead.
26 #endif
27 
29 
31 namespace internal {
32 
36 
37  template< typename T, typename A >
38  class function_input_queue : public item_buffer<T,A> {
39  public:
40  bool empty() const {
41  return this->buffer_empty();
42  }
43 
44  const T& front() const {
45  return this->item_buffer<T, A>::front();
46  }
47 
48  bool pop( T& t ) {
49  return this->pop_front( t );
50  }
51 
52  void pop() {
53  this->destroy_front();
54  }
55 
56  bool push( T& t ) {
57  return this->push_back( t );
58  }
59  };
60 
62  // The only up-ref is apply_body_impl, which should implement the function
63  // call and any handling of the result.
64  template< typename Input, typename Policy, typename A, typename ImplType >
65  class function_input_base : public receiver<Input>, tbb::internal::no_assign {
67 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
68  , add_blt_pred, del_blt_pred,
69  blt_pred_cnt, blt_pred_cpy // create vector copies of preds and succs
70 #endif
71  };
73 
74  public:
75 
77  typedef Input input_type;
78  typedef typename receiver<input_type>::predecessor_type predecessor_type;
81  typedef typename A::template rebind< input_queue_type >::other queue_allocator_type;
83  "queueing and rejecting policies can't be specified simultaneously");
84 
85 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
86  typedef typename predecessor_cache_type::built_predecessors_type built_predecessors_type;
87  typedef typename receiver<input_type>::predecessor_list_type predecessor_list_type;
88 #endif
89 
94  , __TBB_FLOW_GRAPH_PRIORITY_ARG1(my_concurrency(0), my_priority(priority))
95  , my_queue(!internal::has_policy<rejecting, Policy>::value ? new input_queue_type() : NULL)
96  , forwarder_busy(false)
97  {
99  my_aggregator.initialize_handler(handler_type(this));
100  }
101 
104  : receiver<Input>(), tbb::internal::no_assign()
106  , __TBB_FLOW_GRAPH_PRIORITY_ARG1(my_concurrency(0), my_priority(src.my_priority))
107  , my_queue(src.my_queue ? new input_queue_type() : NULL), forwarder_busy(false)
108  {
110  my_aggregator.initialize_handler(handler_type(this));
111  }
112 
114  // The queue is allocated by the constructor for {multi}function_node.
115  // TODO: pass the graph_buffer_policy to the base so it can allocate the queue instead.
116  // This would be an interface-breaking change.
118  if ( my_queue ) delete my_queue;
119  }
120 
123  }
124 
127  operation_type op_data(reg_pred);
128  op_data.r = &src;
129  my_aggregator.execute(&op_data);
130  return true;
131  }
132 
135  operation_type op_data(rem_pred);
136  op_data.r = &src;
137  my_aggregator.execute(&op_data);
138  return true;
139  }
140 
141 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
142  void internal_add_built_predecessor( predecessor_type &src) __TBB_override {
144  operation_type op_data(add_blt_pred);
145  op_data.r = &src;
146  my_aggregator.execute(&op_data);
147  }
148 
150  void internal_delete_built_predecessor( predecessor_type &src) __TBB_override {
151  operation_type op_data(del_blt_pred);
152  op_data.r = &src;
153  my_aggregator.execute(&op_data);
154  }
155 
156  size_t predecessor_count() __TBB_override {
157  operation_type op_data(blt_pred_cnt);
158  my_aggregator.execute(&op_data);
159  return op_data.cnt_val;
160  }
161 
162  void copy_predecessors(predecessor_list_type &v) __TBB_override {
163  operation_type op_data(blt_pred_cpy);
164  op_data.predv = &v;
165  my_aggregator.execute(&op_data);
166  }
167 
168  built_predecessors_type &built_predecessors() __TBB_override {
169  return my_predecessors.built_predecessors();
170  }
171 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
172 
173  protected:
174 
176  my_concurrency = 0;
177  if(my_queue) {
178  my_queue->reset();
179  }
180  reset_receiver(f);
181  forwarder_busy = false;
182  }
183 
184  graph& my_graph_ref;
185  const size_t my_max_concurrency;
190 
193  else
195  __TBB_ASSERT(!(f & rf_clear_edges) || my_predecessors.empty(), "function_input_base reset failed");
196  }
197 
199  return my_graph_ref;
200  }
201 
203  operation_type op_data(i, app_body_bypass); // tries to pop an item or get_item
204  my_aggregator.execute(&op_data);
205  return op_data.bypass_t;
206  }
207 
208  private:
209 
212 
213  class operation_type : public aggregated_operation< operation_type > {
214  public:
215  char type;
216  union {
219 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
220  size_t cnt_val;
221  predecessor_list_type *predv;
222 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
223  };
226  type(char(t)), elem(const_cast<input_type*>(&e)) {}
227  operation_type(op_type t) : type(char(t)), r(NULL) {}
228  };
229 
231  typedef internal::aggregating_functor<class_type, operation_type> handler_type;
232  friend class internal::aggregating_functor<class_type, operation_type>;
234 
236  task* new_task = NULL;
237  if(my_queue) {
238  if(!my_queue->empty()) {
239  ++my_concurrency;
240  new_task = create_body_task(my_queue->front());
241 
242  my_queue->pop();
243  }
244  }
245  else {
246  input_type i;
247  if(my_predecessors.get_item(i)) {
248  ++my_concurrency;
249  new_task = create_body_task(i);
250  }
251  }
252  return new_task;
253  }
254  void handle_operations(operation_type *op_list) {
255  operation_type *tmp;
256  while (op_list) {
257  tmp = op_list;
258  op_list = op_list->next;
259  switch (tmp->type) {
260  case reg_pred:
261  my_predecessors.add(*(tmp->r));
262  __TBB_store_with_release(tmp->status, SUCCEEDED);
263  if (!forwarder_busy) {
264  forwarder_busy = true;
266  }
267  break;
268  case rem_pred:
269  my_predecessors.remove(*(tmp->r));
270  __TBB_store_with_release(tmp->status, SUCCEEDED);
271  break;
272  case app_body_bypass: {
273  tmp->bypass_t = NULL;
274  __TBB_ASSERT(my_max_concurrency != 0, NULL);
275  --my_concurrency;
277  tmp->bypass_t = perform_queued_requests();
278 
279  __TBB_store_with_release(tmp->status, SUCCEEDED);
280  }
281  break;
282  case tryput_bypass: internal_try_put_task(tmp); break;
283  case try_fwd: internal_forward(tmp); break;
284  case occupy_concurrency:
286  ++my_concurrency;
287  __TBB_store_with_release(tmp->status, SUCCEEDED);
288  } else {
289  __TBB_store_with_release(tmp->status, FAILED);
290  }
291  break;
292 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
293  case add_blt_pred: {
294  my_predecessors.internal_add_built_predecessor(*(tmp->r));
295  __TBB_store_with_release(tmp->status, SUCCEEDED);
296  }
297  break;
298  case del_blt_pred:
299  my_predecessors.internal_delete_built_predecessor(*(tmp->r));
300  __TBB_store_with_release(tmp->status, SUCCEEDED);
301  break;
302  case blt_pred_cnt:
303  tmp->cnt_val = my_predecessors.predecessor_count();
304  __TBB_store_with_release(tmp->status, SUCCEEDED);
305  break;
306  case blt_pred_cpy:
307  my_predecessors.copy_predecessors( *(tmp->predv) );
308  __TBB_store_with_release(tmp->status, SUCCEEDED);
309  break;
310 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
311  }
312  }
313  }
314 
316  void internal_try_put_task(operation_type *op) {
317  __TBB_ASSERT(my_max_concurrency != 0, NULL);
319  ++my_concurrency;
320  task * new_task = create_body_task(*(op->elem));
321  op->bypass_t = new_task;
322  __TBB_store_with_release(op->status, SUCCEEDED);
323  } else if ( my_queue && my_queue->push(*(op->elem)) ) {
324  op->bypass_t = SUCCESSFULLY_ENQUEUED;
325  __TBB_store_with_release(op->status, SUCCEEDED);
326  } else {
327  op->bypass_t = NULL;
328  __TBB_store_with_release(op->status, FAILED);
329  }
330  }
331 
333  void internal_forward(operation_type *op) {
334  op->bypass_t = NULL;
336  op->bypass_t = perform_queued_requests();
337  if(op->bypass_t)
338  __TBB_store_with_release(op->status, SUCCEEDED);
339  else {
340  forwarder_busy = false;
341  __TBB_store_with_release(op->status, FAILED);
342  }
343  }
344 
346  operation_type op_data(t, tryput_bypass);
347  my_aggregator.execute(&op_data);
348  if( op_data.status == internal::SUCCEEDED ) {
349  return op_data.bypass_t;
350  }
351  return NULL;
352  }
353 
355  if( my_max_concurrency == 0 ) {
356  return apply_body_bypass(t);
357  } else {
358  operation_type check_op(t, occupy_concurrency);
359  my_aggregator.execute(&check_op);
360  if( check_op.status == internal::SUCCEEDED ) {
361  return apply_body_bypass(t);
362  }
363  return internal_try_put_bypass(t);
364  }
365  }
366 
368  if( my_max_concurrency == 0 ) {
369  return create_body_task(t);
370  } else {
371  return internal_try_put_bypass(t);
372  }
373  }
374 
376  // then decides if more work is available
378  return static_cast<ImplType *>(this)->apply_body_impl_bypass(i);
379  }
380 
382  inline task * create_body_task( const input_type &input ) {
384  new( task::allocate_additional_child_of(*(my_graph_ref.root_task())) )
386  *this, __TBB_FLOW_GRAPH_PRIORITY_ARG1(input, my_priority))
387  : NULL;
388  }
389 
392  operation_type op_data(try_fwd);
393  task* rval = NULL;
394  do {
395  op_data.status = WAIT;
396  my_aggregator.execute(&op_data);
397  if(op_data.status == SUCCEEDED) {
398  task* ttask = op_data.bypass_t;
399  __TBB_ASSERT( ttask && ttask != SUCCESSFULLY_ENQUEUED, NULL );
400  rval = combine_tasks(my_graph_ref, rval, ttask);
401  }
402  } while (op_data.status == SUCCEEDED);
403  return rval;
404  }
405 
408  new( task::allocate_additional_child_of(*(my_graph_ref.root_task())) )
410  : NULL;
411  }
412 
414  inline void spawn_forward_task() {
415  task* tp = create_forward_task();
416  if(tp) {
418  }
419  }
420  }; // function_input_base
421 
423  // a type Output to its successors.
424  template< typename Input, typename Output, typename Policy, typename A>
425  class function_input : public function_input_base<Input, Policy, A, function_input<Input,Output,Policy,A> > {
426  public:
427  typedef Input input_type;
428  typedef Output output_type;
433 
434  // constructor
435  template<typename Body>
437  graph &g, size_t max_concurrency,
438  __TBB_FLOW_GRAPH_PRIORITY_ARG1(Body& body, node_priority_t priority)
440  , my_body( new internal::function_body_leaf< input_type, output_type, Body>(body) )
441  , my_init_body( new internal::function_body_leaf< input_type, output_type, Body>(body) ) {
442  }
443 
446  base_type(src),
447  my_body( src.my_init_body->clone() ),
448  my_init_body(src.my_init_body->clone() ) {
449  }
450 
452  delete my_body;
453  delete my_init_body;
454  }
455 
456  template< typename Body >
458  function_body_type &body_ref = *this->my_body;
459  return dynamic_cast< internal::function_body_leaf<input_type, output_type, Body> & >(body_ref).get_body();
460  }
461 
463  // There is an extra copied needed to capture the
464  // body execution without the try_put
466  output_type v = (*my_body)(i);
468  return v;
469  }
470 
471  //TODO: consider moving into the base class
474 #if TBB_DEPRECATED_MESSAGE_FLOW_ORDER
475  task* successor_task = successors().try_put_task(v);
476 #endif
477  task* postponed_task = NULL;
478  if( base_type::my_max_concurrency != 0 ) {
479  postponed_task = base_type::try_get_postponed_task(i);
480  __TBB_ASSERT( !postponed_task || postponed_task != SUCCESSFULLY_ENQUEUED, NULL );
481  }
482 #if TBB_DEPRECATED_MESSAGE_FLOW_ORDER
483  graph& g = base_type::my_graph_ref;
484  return combine_tasks(g, successor_task, postponed_task);
485 #else
486  if( postponed_task ) {
487  // make the task available for other workers since we do not know successors'
488  // execution policy
490  }
491  task* successor_task = successors().try_put_task(v);
492 #if _MSC_VER && !__INTEL_COMPILER
493 #pragma warning (push)
494 #pragma warning (disable: 4127) /* suppress conditional expression is constant */
495 #endif
497 #if _MSC_VER && !__INTEL_COMPILER
498 #pragma warning (pop)
499 #endif
500  if(!successor_task) {
501  // Return confirmative status since current
502  // node's body has been executed anyway
503  successor_task = SUCCESSFULLY_ENQUEUED;
504  }
505  }
506  return successor_task;
507 #endif /* TBB_DEPRECATED_MESSAGE_FLOW_ORDER */
508  }
509 
510  protected:
511 
514  if(f & rf_reset_bodies) {
516  delete my_body;
517  my_body = tmp;
518  }
519  }
520 
524 
525  }; // function_input
526 
527 
528  // helper templates to clear the successor edges of the output ports of an multifunction_node
529  template<int N> struct clear_element {
530  template<typename P> static void clear_this(P &p) {
531  (void)tbb::flow::get<N-1>(p).successors().clear();
533  }
534  template<typename P> static bool this_empty(P &p) {
535  if(tbb::flow::get<N-1>(p).successors().empty())
537  return false;
538  }
539  };
540 
541  template<> struct clear_element<1> {
542  template<typename P> static void clear_this(P &p) {
543  (void)tbb::flow::get<0>(p).successors().clear();
544  }
545  template<typename P> static bool this_empty(P &p) {
546  return tbb::flow::get<0>(p).successors().empty();
547  }
548  };
549 
550 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
551  // helper templates to extract the output ports of an multifunction_node from graph
552  template<int N> struct extract_element {
553  template<typename P> static void extract_this(P &p) {
554  (void)tbb::flow::get<N-1>(p).successors().built_successors().sender_extract(tbb::flow::get<N-1>(p));
555  extract_element<N-1>::extract_this(p);
556  }
557  };
558 
559  template<> struct extract_element<1> {
560  template<typename P> static void extract_this(P &p) {
561  (void)tbb::flow::get<0>(p).successors().built_successors().sender_extract(tbb::flow::get<0>(p));
562  }
563  };
564 #endif
565 
567  // and has a tuple of output ports specified.
568  template< typename Input, typename OutputPortSet, typename Policy, typename A>
569  class multifunction_input : public function_input_base<Input, Policy, A, multifunction_input<Input,OutputPortSet,Policy,A> > {
570  public:
572  typedef Input input_type;
573  typedef OutputPortSet output_ports_type;
578 
579  // constructor
580  template<typename Body>
582  __TBB_FLOW_GRAPH_PRIORITY_ARG1(Body& body, node_priority_t priority)
584  , my_body( new internal::multifunction_body_leaf<input_type, output_ports_type, Body>(body) )
585  , my_init_body( new internal::multifunction_body_leaf<input_type, output_ports_type, Body>(body) ) {
586  }
587 
590  base_type(src),
591  my_body( src.my_init_body->clone() ),
592  my_init_body(src.my_init_body->clone() ) {
593  }
594 
596  delete my_body;
597  delete my_init_body;
598  }
599 
600  template< typename Body >
602  multifunction_body_type &body_ref = *this->my_body;
603  return *static_cast<Body*>(dynamic_cast< internal::multifunction_body_leaf<input_type, output_ports_type, Body> & >(body_ref).get_body_ptr());
604  }
605 
606  // for multifunction nodes we do not have a single successor as such. So we just tell
607  // the task we were successful.
608  //TODO: consider moving common parts with implementation in function_input into separate function
611  (*my_body)(i, my_output_ports);
613  task* ttask = NULL;
616  }
617  return ttask ? ttask : SUCCESSFULLY_ENQUEUED;
618  }
619 
621 
622  protected:
623 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
624  void extract() {
625  extract_element<N>::extract_this(my_output_ports);
626  }
627 #endif
628 
629  void reset(reset_flags f) {
632  if(f & rf_reset_bodies) {
634  delete my_body;
635  my_body = tmp;
636  }
637  __TBB_ASSERT(!(f & rf_clear_edges) || clear_element<N>::this_empty(my_output_ports), "multifunction_node reset failed");
638  }
639 
643 
644  }; // multifunction_input
645 
646  // template to refer to an output port of a multifunction_node
647  template<size_t N, typename MOP>
649  return tbb::flow::get<N>(op.output_ports());
650  }
651 
652  inline void check_task_and_spawn(graph& g, task* t) {
653  if (t && t != SUCCESSFULLY_ENQUEUED) {
655  }
656  }
657 
658  // helper structs for split_node
659  template<int N>
660  struct emit_element {
661  template<typename T, typename P>
662  static task* emit_this(graph& g, const T &t, P &p) {
663  // TODO: consider to collect all the tasks in task_list and spawn them all at once
664  task* last_task = tbb::flow::get<N-1>(p).try_put_task(tbb::flow::get<N-1>(t));
665  check_task_and_spawn(g, last_task);
666  return emit_element<N-1>::emit_this(g,t,p);
667  }
668  };
669 
670  template<>
671  struct emit_element<1> {
672  template<typename T, typename P>
673  static task* emit_this(graph& g, const T &t, P &p) {
674  task* last_task = tbb::flow::get<0>(p).try_put_task(tbb::flow::get<0>(t));
675  check_task_and_spawn(g, last_task);
676  return SUCCESSFULLY_ENQUEUED;
677  }
678  };
679 
681  template< typename Output, typename Policy>
682  class continue_input : public continue_receiver {
683  public:
684 
686  typedef continue_msg input_type;
687 
689  typedef Output output_type;
692 
693  template< typename Body >
695  : continue_receiver(__TBB_FLOW_GRAPH_PRIORITY_ARG1(/*number_of_predecessors=*/0, priority))
696  , my_graph_ref(g)
697  , my_body( new internal::function_body_leaf< input_type, output_type, Body>(body) )
698  , my_init_body( new internal::function_body_leaf< input_type, output_type, Body>(body) )
699  { }
700 
701  template< typename Body >
702  continue_input( graph &g, int number_of_predecessors,
703  __TBB_FLOW_GRAPH_PRIORITY_ARG1(Body& body, node_priority_t priority)
704  ) : continue_receiver( __TBB_FLOW_GRAPH_PRIORITY_ARG1(number_of_predecessors, priority) )
705  , my_graph_ref(g)
706  , my_body( new internal::function_body_leaf< input_type, output_type, Body>(body) )
707  , my_init_body( new internal::function_body_leaf< input_type, output_type, Body>(body) )
708  { }
709 
710  continue_input( const continue_input& src ) : continue_receiver(src),
712  my_body( src.my_init_body->clone() ),
713  my_init_body( src.my_init_body->clone() ) {}
714 
716  delete my_body;
717  delete my_init_body;
718  }
719 
720  template< typename Body >
722  function_body_type &body_ref = *my_body;
723  return dynamic_cast< internal::function_body_leaf<input_type, output_type, Body> & >(body_ref).get_body();
724  }
725 
727  continue_receiver::reset_receiver(f);
728  if(f & rf_reset_bodies) {
730  delete my_body;
731  my_body = tmp;
732  }
733  }
734 
735  protected:
736 
737  graph& my_graph_ref;
740 
742 
743  friend class apply_body_task_bypass< class_type, continue_msg >;
744 
747  // There is an extra copied needed to capture the
748  // body execution without the try_put
750  output_type v = (*my_body)( continue_msg() );
752  return successors().try_put_task( v );
753  }
754 
757  return NULL;
758  }
759 #if _MSC_VER && !__INTEL_COMPILER
760 #pragma warning (push)
761 #pragma warning (disable: 4127) /* suppress conditional expression is constant */
762 #endif
764 #if _MSC_VER && !__INTEL_COMPILER
765 #pragma warning (pop)
766 #endif
767  return apply_body_bypass( continue_msg() );
768  }
769  else {
770  return new ( task::allocate_additional_child_of( *(my_graph_ref.root_task()) ) )
772  *this, __TBB_FLOW_GRAPH_PRIORITY_ARG1(continue_msg(), my_priority) );
773  }
774  }
775 
777  return my_graph_ref;
778  }
779  }; // continue_input
780 
782  template< typename Output >
783  class function_output : public sender<Output> {
784  public:
785 
786  template<int N> friend struct clear_element;
787  typedef Output output_type;
788  typedef typename sender<output_type>::successor_type successor_type;
790 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
791  typedef typename sender<output_type>::built_successors_type built_successors_type;
792  typedef typename sender<output_type>::successor_list_type successor_list_type;
793 #endif
794 
796  function_output(const function_output & /*other*/) : sender<output_type>() {
797  my_successors.set_owner(this);
798  }
799 
803  return true;
804  }
805 
809  return true;
810  }
811 
812 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
813  built_successors_type &built_successors() __TBB_override { return successors().built_successors(); }
814 
815 
816  void internal_add_built_successor( successor_type &r) __TBB_override {
817  successors().internal_add_built_successor( r );
818  }
819 
820  void internal_delete_built_successor( successor_type &r) __TBB_override {
821  successors().internal_delete_built_successor( r );
822  }
823 
824  size_t successor_count() __TBB_override {
825  return successors().successor_count();
826  }
827 
828  void copy_successors( successor_list_type &v) __TBB_override {
829  successors().copy_successors(v);
830  }
831 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
832 
833  // for multifunction_node. The function_body that implements
834  // the node will have an input and an output tuple of ports. To put
835  // an item to a successor, the body should
836  //
837  // get<I>(output_ports).try_put(output_value);
838  //
839  // if task pointer is returned will always spawn and return true, else
840  // return value will be bool returned from successors.try_put.
841  task *try_put_task(const output_type &i) { // not a virtual method in this class
842  return my_successors.try_put_task(i);
843  }
844 
846  protected:
848 
849  }; // function_output
850 
851  template< typename Output >
852  class multifunction_output : public function_output<Output> {
853  public:
854  typedef Output output_type;
857 
860 
861  bool try_put(const output_type &i) {
862  task *res = try_put_task(i);
863  if(!res) return false;
864  if(res != SUCCESSFULLY_ENQUEUED) {
865  FLOW_SPAWN(*res); // TODO: Spawn task inside arena
866  }
867  return true;
868  }
869 
870  protected:
871 
873  return my_successors.try_put_task(i);
874  }
875 
876  template <int N> friend struct emit_element;
877 
878  }; // multifunction_output
879 
880 //composite_node
881 #if __TBB_FLOW_GRAPH_CPP11_FEATURES
882  template<typename CompositeType>
883  void add_nodes_impl(CompositeType*, bool) {}
884 
885  template< typename CompositeType, typename NodeType1, typename... NodeTypes >
886  void add_nodes_impl(CompositeType *c_node, bool visible, const NodeType1& n1, const NodeTypes&... n) {
887  void *addr = const_cast<NodeType1 *>(&n1);
888 
889  fgt_alias_port(c_node, addr, visible);
890  add_nodes_impl(c_node, visible, n...);
891  }
892 #endif
893 
894 } // internal
895 
896 #endif // __TBB__flow_graph_node_impl_H
virtual broadcast_cache< output_type > & successors()=0
multifunction_body_type * my_init_body
__TBB_STATIC_ASSERT(!((internal::has_policy< queueing, Policy >::value) &&(internal::has_policy< rejecting, Policy >::value)), "queueing and rejecting policies can't be specified simultaneously")
broadcast_cache< output_type > broadcast_cache_type
void reset_receiver(reset_flags f) __TBB_override
function_input_queue< input_type, A > input_queue_type
static tbb::task *const SUCCESSFULLY_ENQUEUED
#define __TBB_override
Definition: tbb_stddef.h:244
A task that calls a node's forward_task function.
internal::aggregating_functor< class_type, operation_type > handler_type
function_input_queue< input_type, A > input_queue_type
Input input_type
The input type of this receiver.
Definition: flow_graph.h:440
output_type apply_body_impl(const input_type &i)
graph & graph_reference() __TBB_override
task * try_put_task(const output_type &i)
continue_input< output_type, Policy > class_type
void set_owner(successor_type *owner)
#define __TBB_ASSERT(predicate, comment)
No-op version of __TBB_ASSERT.
Definition: tbb_stddef.h:169
A functor that takes an Input and generates an Output.
static task * emit_this(graph &g, const T &t, P &p)
void reset_function_input_base(reset_flags f)
A::template rebind< input_queue_type >::other queue_allocator_type
void spawn_forward_task()
Spawns a task that calls forward()
function_input< Input, Output, Policy, A > my_class
static void fgt_begin_body(void *)
function_body that takes an Input and a set of output ports
broadcast_cache_type my_successors
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain ITT_FORMAT p const __itt_domain __itt_string_handle unsigned long long ITT_FORMAT lu const __itt_domain __itt_id __itt_string_handle __itt_metadata_type type
task * apply_body_impl_bypass(const input_type &i)
void reset_function_input(reset_flags f)
void * addr
Base class for user-defined tasks.
Definition: task.h:592
void handle_operations(operation_type *op_list)
function_body_type * my_body
void register_successor(successor_type &r)
virtual multifunction_body * clone()=0
task * execute() __TBB_override
bool try_put(const output_type &i)
Implements methods for a function node that takes a type Input as input.
task * try_get_postponed_task(const input_type &i)
unsigned int node_priority_t
void const char const char int ITT_FORMAT __itt_group_sync p
function_input_base(graph &g, __TBB_FLOW_GRAPH_PRIORITY_ARG1(size_t max_concurrency, node_priority_t priority))
Constructor for function_input_base.
Implements methods for a function node that takes a type Input as input and sends.
function_output< output_type > base_type
receiver< input_type >::predecessor_type predecessor_type
#define FLOW_SPAWN(a)
Definition: flow_graph.h:53
static void fgt_alias_port(void *, void *, bool)
Base class for types that should not be assigned.
Definition: tbb_stddef.h:324
static tbb::task * combine_tasks(graph &g, tbb::task *left, tbb::task *right)
Definition: flow_graph.h:171
predecessor_cache< input_type, null_mutex > predecessor_cache_type
multifunction_input< Input, OutputPortSet, Policy, A > my_class
bool remove_successor(successor_type &r) __TBB_override
Removes a successor from this node.
continue_input(graph &g, __TBB_FLOW_GRAPH_PRIORITY_ARG1(Body &body, node_priority_t priority))
void reset_receiver(reset_flags f) __TBB_override
tbb::flow::tuple_element< N, typename MOP::output_ports_type >::type & output_port(MOP &op)
sender< output_type >::successor_type successor_type
A task that calls a node's apply_body_bypass function, passing in an input of type Input.
void check_task_and_spawn(graph &g, task *t)
bool register_successor(successor_type &r) __TBB_override
Adds a new successor to this node.
task * internal_try_put_bypass(const input_type &t)
int max_concurrency()
Returns the maximal number of threads that can work inside the arena.
Definition: task_arena.h:412
bool register_predecessor(predecessor_type &src) __TBB_override
Adds src to the list of cached predecessors.
continue_msg input_type
The input type of this receiver.
static task * emit_this(graph &g, const T &t, P &p)
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task * task
continue_input(const continue_input &src)
bool remove_predecessor(predecessor_type &src) __TBB_override
Removes src from the list of cached predecessors.
function_input_queue< input_type, A > input_queue_type
The graph class.
void set_owner(owner_type *owner)
Implements methods for both executable and function nodes that puts Output to its successors.
function_input(const function_input &src)
Copy constructor.
#define __TBB_FLOW_GRAPH_PRIORITY_EXPR(expr)
virtual function_body * clone()=0
#define __TBB_FLOW_GRAPH_PRIORITY_ARG1(arg1, priority)
function_body_type * my_body
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain ITT_FORMAT p const __itt_domain __itt_string_handle unsigned long long value
function_output(const function_output &)
multifunction_output(const multifunction_output &)
task * apply_body_bypass(const input_type &i)
Applies the body to the provided input.
const item_type & front() const
function_input_base< Input, Policy, A, my_class > base_type
function_input_base< Input, Policy, A, my_class > base_type
task * try_put_task_impl(const input_type &t, tbb::internal::true_type)
task * create_body_task(const input_type &input)
allocates a task to apply a body
graph & graph_reference() __TBB_override
function_body_type * my_init_body
broadcast_cache_type & successors()
void add_nodes_impl(CompositeType *, bool)
task * apply_body_impl_bypass(const input_type &i)
function_body_type * my_init_body
static void fgt_end_body(void *)
void remove_successor(successor_type &r)
multifunction_body_type * my_body
function_body< input_type, output_type > function_body_type
virtual broadcast_cache< output_type > & successors()=0
continue_input(graph &g, int number_of_predecessors,)
function_input_base< Input, Policy, A, ImplType > class_type
void spawn_in_graph_arena(graph &g, tbb::task &arena_task)
Spawns a task inside graph arena.
function_input_base(const function_input_base &src)
Copy constructor.
multifunction_body< input_type, output_ports_type > multifunction_body_type
function_input(graph &g, size_t max_concurrency,)
task * try_put_task(const output_type &i)
void internal_try_put_task(operation_type *op)
Put to the node, but return the task instead of enqueueing it.
Input and scheduling for a function node that takes a type Input as input.
the leaf for function_body
Implements methods for an executable node that takes continue_msg as input.
task * try_put_task_impl(const input_type &t, tbb::internal::false_type)
multifunction_input(graph &g, size_t max_concurrency,)
aggregator< handler_type, operation_type > my_aggregator
task * apply_body_bypass(input_type)
Applies the body to the provided input.
task * forward_task()
This is executed by an enqueued task, the "forwarder".
void __TBB_store_with_release(volatile T &location, V value)
Definition: tbb_machine.h:717
task * try_put_task(const input_type &t) __TBB_override
Put item to successor; return task to run the successor if possible.
leaf for multifunction. OutputSet can be a std::tuple or a vector.
Input input_type
The input type of this receiver.
virtual ~function_input_base()
Destructor.
multifunction_input(const multifunction_input &src)
Copy constructor.
predecessor_cache< input_type, null_mutex > my_predecessors
Output output_type
The output type of this receiver.
void internal_forward(operation_type *op)
Creates tasks for postponed messages if available and if concurrency allows.
task * try_put_task(const T &t) __TBB_override
function_body< input_type, output_type > function_body_type

Copyright © 2005-2019 Intel Corporation. All Rights Reserved.

Intel, Pentium, Intel Xeon, Itanium, Intel XScale and VTune are registered trademarks or trademarks of Intel Corporation or its subsidiaries in the United States and other countries.

* Other names and brands may be claimed as the property of others.