21 #ifndef __TBB_flow_graph_streaming_H 22 #define __TBB_flow_graph_streaming_H 24 #ifndef __TBB_flow_graph_H 25 #error Do not #include this internal file directly; use public TBB headers instead. 28 #if __TBB_PREVIEW_STREAMING_NODE 34 template <
int N1,
int N2>
37 static const int size = N2 - N1 + 1;
44 template <
int N1,
int N2 = N1>
56 template <
int N1,
int N2>
61 template <
int N1,
int N2>
66 template <
typename... Args>
71 template <
typename T,
typename... Rest>
82 template<
typename Key>
88 template<
typename Key>
94 template<
typename Device,
typename Key>
107 template <
typename T>
112 template <
int N1,
int N2>
117 template <
int N1,
int N2>
122 template <
typename T>
127 template <
typename ...Args1>
130 template <
typename A1,
typename ...Args1>
132 static const size_t my_delta = 1;
134 template <
typename F,
typename Tuple,
typename ...Args2>
135 static void doit(F& f, Tuple& t, A1& a1, Args1&... args1, Args2&... args2) {
138 template <
typename F,
typename Tuple,
typename ...Args2>
142 template <
typename F,
typename Tuple,
int N1,
int N2,
typename ...Args2>
145 args2..., std::get<N1 + my_delta>(t));
147 template <
typename F,
typename Tuple,
int N,
typename ...Args2>
152 template <
typename F,
typename Tuple,
int N1,
int N2,
typename ...Args2>
154 doit_impl(x, f, t,
fn(), args1..., args2...);
156 template <
typename F,
typename Tuple,
int N,
typename ...Args2>
158 doit_impl(x, f, t,
fn(), args1..., args2...);
164 template <
typename F,
typename Tuple,
typename ...Args2>
165 static void doit(F& f, Tuple&, Args2&... args2) {
171 template<
typename JP,
typename StreamFactory,
typename... Ports>
174 template <
typename T>
189 template<
typename StreamFactory,
typename KernelInputTuple,
typename =
void>
195 template <
typename ...Args>
197 factory.send_kernel( device, kernel, args... );
202 template<
typename StreamFactory,
typename KernelInputTuple>
211 struct range_wrapper {
213 virtual range_wrapper *clone()
const = 0;
217 struct range_value :
public range_wrapper {
227 return new range_value(my_value);
234 struct range_mapper :
public range_wrapper {
239 return get<N + 1>(ip).data(
false);
243 return new range_mapper<N>;
248 template <
typename ...Args>
250 __TBB_ASSERT(my_range_wrapper,
"Range is not set. Call set_range() before running streaming_node.");
251 factory.send_kernel( device, kernel, my_range_wrapper->get_range(ip), args... );
261 executor.my_range_wrapper = NULL;
265 if (my_range_wrapper)
delete my_range_wrapper;
269 my_range_wrapper =
new range_value(work_size);
273 my_range_wrapper =
new range_value(
std::move(work_size));
278 my_range_wrapper =
new range_mapper<N>;
283 my_range_wrapper =
new range_mapper<N>;
305 template<
typename... Args>
308 template<
typename... Ports,
typename JP,
typename StreamFactory>
310 :
public composite_node < typename internal::streaming_node_traits<JP, StreamFactory, Ports...>::input_tuple,
311 typename internal::streaming_node_traits<JP, StreamFactory, Ports...>::output_tuple >
322 typedef composite_node<input_tuple, output_tuple>
base_type;
337 return std::tie( internal::input_port<S>( my_indexer_node )... );
342 return std::tie( internal::output_port<S>( my_kernel_node )... );
355 make_edge( internal::output_port<N>( my_device_selector_node ), internal::input_port<N>( my_join_node ) );
361 make_edge( my_indexer_node, my_device_selector_node );
362 make_edge( my_device_selector_node, my_join_node );
364 make_edge( my_join_node, my_kernel_node );
371 class device_selector_base {
373 virtual void operator()(
const indexer_node_output_type &v,
typename device_selector_node::output_ports_type &op ) = 0;
374 virtual device_selector_base *clone(
streaming_node &n )
const = 0;
378 template <
typename UserFunctor>
383 , my_user_functor( uf ), my_node(n), my_factory( f )
385 my_port_epoches.fill( 0 );
389 (this->*my_dispatch_funcs[ v.tag() ])( my_port_epoches[ v.tag() ], v, op );
391 || my_port_epoches[v.tag()] == 0,
"Epoch is changed when key matching is requested" );
395 return new device_selector( my_user_functor, n, my_factory );
398 typedef void(device_selector<UserFunctor>::*send_and_put_fn_type)(
size_t &,
const indexer_node_output_type &,
typename device_selector_node::output_ports_type &);
407 template <
typename T>
413 template <
typename T>
416 return key_from_message<key_type>( t );
421 typedef typename tuple_element<N + 1, typename device_selector_node::output_ports_type>::type::output_type elem_type;
422 elem_type e = internal::cast_to<elem_type>( v );
424 my_factory.send_data( device, e );
425 get<N + 1>( op ).try_put( e );
428 template<
typename DevicePort >
431 if ( it == my_devices.end() ) {
433 std::tie( it, std::ignore ) = my_devices.insert( std::make_pair(
key,
d ) );
436 my_node.notify_new_device(
d );
438 epoch_desc &e = it->second;
440 if ( ++e.my_request_number == NUM_INPUTS ) my_devices.erase( it );
458 class device_selector_body {
463 (*my_device_selector)(v, op);
475 virtual args_storage_base *clone()
const = 0;
480 : my_kernel( kernel ), my_factory( f )
484 : my_kernel( k.my_kernel ), my_factory( k.my_factory )
491 template <
typename... Args>
492 class args_storage :
public args_storage_base {
498 const auto& t = get<N + 1>( ip );
499 auto &port = get<N>( op );
500 return port.try_put( t );
512 : my_kernel_func( ip, node, storage, get<0>(ip).device() ) {}
516 template <
typename... FnArgs>
528 : my_ip( ip ), my_node( node ), my_storage( storage ), my_device( device )
531 template <
typename... FnArgs>
533 my_node.enqueue_kernel( my_ip, my_storage.my_factory, my_device, my_storage.my_kernel, args... );
538 template<
typename FinalizeFn>
542 : my_ip( ip ), my_finalize_func( factory, get<0>(ip).device(),
fn ) {}
546 template <
typename... FnArgs>
559 : my_factory(factory), my_device(device), my_fn(
fn) {}
561 template <
typename... FnArgs>
563 my_factory.finalize( my_device, my_fn, args... );
568 template<
typename FinalizeFn>
570 return run_finalize_func<FinalizeFn>( ip, factory,
fn );
576 : my_factory(factory), my_device(
d ) {}
578 template <
typename... FnArgs>
580 my_factory.send_data( my_device, args... );
589 : args_storage_base( kernel, f )
590 , my_args_pack( std::forward<Args>(args)... )
593 args_storage(
const args_storage &k ) : args_storage_base( k ), my_args_pack( k.my_args_pack ) {}
595 args_storage(
const args_storage_base &k, Args&&... args ) : args_storage_base( k ), my_args_pack( std::forward<Args>(args)... ) {}
606 graph& g = n.my_graph;
608 g.increment_wait_count();
614 g.decrement_wait_count();
615 }), const_args_pack );
627 return new args_storage<Args...>( *this );
641 __TBB_ASSERT( (my_node.my_args_storage != NULL),
"No arguments storage" );
643 my_node.my_args_storage->enqueue( ip, op, my_node );
650 struct wrap_to_async {
654 template <
typename T>
659 template <
typename... Args>
662 return new args_storage<Args...>(storage, std::forward<Args>(args)...);
666 my_args_storage->send(
d );
669 template <
typename ...Args>
671 this->enqueue_kernel_impl( ip, factory, device, kernel, args... );
675 template <
typename DeviceSelector>
678 , my_indexer_node( g )
679 , my_device_selector( new device_selector<DeviceSelector>(
d, *this, f ) )
680 , my_device_selector_node( g,
serial, device_selector_body( my_device_selector ) )
682 , my_kernel_node( g,
serial, kernel_body( *this ) )
684 , my_args_storage( make_args_storage( args_storage<>(kernel, f),
port_ref<0, NUM_INPUTS - 1>() ) )
686 base_type::set_external_ports( get_input_ports(), get_output_ports() );
692 , my_indexer_node( node.my_indexer_node )
693 , my_device_selector( node.my_device_selector->clone( *this ) )
694 , my_device_selector_node( node.my_graph,
serial, device_selector_body( my_device_selector ) )
695 , my_join_node( node.my_join_node )
696 , my_kernel_node( node.my_graph,
serial, kernel_body( *this ) )
697 , my_args_storage( node.my_args_storage->clone() )
699 base_type::set_external_ports( get_input_ports(), get_output_ports() );
705 , my_indexer_node( std::
move( node.my_indexer_node ) )
706 , my_device_selector( node.my_device_selector->clone(*this) )
707 , my_device_selector_node( node.my_graph,
serial, device_selector_body( my_device_selector ) )
708 , my_join_node( std::
move( node.my_join_node ) )
709 , my_kernel_node( node.my_graph,
serial, kernel_body( *this ) )
710 , my_args_storage( node.my_args_storage )
712 base_type::set_external_ports( get_input_ports(), get_output_ports() );
715 node.my_args_storage = NULL;
719 if ( my_args_storage )
delete my_args_storage;
720 if ( my_device_selector )
delete my_device_selector;
723 template <
typename... Args>
726 args_storage_base *
const new_args_storage = make_args_storage( *my_args_storage,
typename wrap_to_async<Args>::type(std::forward<Args>(args))...);
727 delete my_args_storage;
728 my_args_storage = new_args_storage;
744 #endif // __TBB_PREVIEW_STREAMING_NODE 745 #endif // __TBB_flow_graph_streaming_H
finalize_func(StreamFactory &factory, device_type device, FinalizeFn fn)
streaming_node(const streaming_node &node)
internal::streaming_node_traits< JP, StreamFactory, Ports... >::kernel_input_tuple kernel_input_tuple
bool do_try_put(const kernel_input_tuple &ip, output_ports_type &op, internal::sequence< S... >) const
StreamFactory::range_type range_type
void operator()(kernel_input_tuple ip, typename args_storage_base::output_ports_type &op)
std::unordered_map< typename std::decay< key_type >::type, epoch_desc > my_devices
void set_range(range_type &&work_size)
is_port_ref_impl< typename tbb::internal::strip< T >::type >::type type
range_wrapper * clone() const __TBB_override
void set_range(const range_type &work_size)
void enqueue_kernel_impl(kernel_input_tuple &, StreamFactory &factory, device_type device, const kernel_type &kernel, Args &... args) const
T or_return_values(T &&t)
void set_args(Args &&... args)
#define __TBB_STATIC_ASSERT(condition, msg)
key_type get_key(std::false_type, const T &, size_t &epoch)
device_selector_node my_device_selector_node
#define __TBB_ASSERT(predicate, comment)
No-op version of __TBB_ASSERT.
multifunction_node< kernel_input_tuple, output_tuple > kernel_multifunction_node
void operator()(FnArgs &... args)
streaming_device_with_key(const Device &d, Key k)
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain ITT_FORMAT p const __itt_domain __itt_string_handle unsigned long long ITT_FORMAT lu const __itt_domain __itt_id __itt_string_handle __itt_metadata_type size_t void ITT_FORMAT p const __itt_domain __itt_id __itt_string_handle const wchar_t size_t ITT_FORMAT lu const __itt_domain __itt_id __itt_relation __itt_id ITT_FORMAT p const wchar_t int ITT_FORMAT __itt_group_mark d __itt_event ITT_FORMAT __itt_group_mark d void const wchar_t const wchar_t int ITT_FORMAT __itt_group_sync __itt_group_fsync x void const wchar_t int const wchar_t int int ITT_FORMAT __itt_group_sync __itt_group_fsync x void ITT_FORMAT __itt_group_sync __itt_group_fsync p void ITT_FORMAT __itt_group_sync __itt_group_fsync p void size_t ITT_FORMAT lu no args __itt_obj_prop_t __itt_obj_state_t ITT_FORMAT d const char ITT_FORMAT s __itt_frame ITT_FORMAT p const char const char ITT_FORMAT s __itt_counter ITT_FORMAT p __itt_counter unsigned long long ITT_FORMAT lu const wchar_t ITT_FORMAT S __itt_mark_type const wchar_t ITT_FORMAT S __itt_mark_type const char ITT_FORMAT s __itt_mark_type ITT_FORMAT d __itt_caller ITT_FORMAT p __itt_caller ITT_FORMAT p no args const __itt_domain __itt_clock_domain unsigned long long __itt_id ITT_FORMAT lu const __itt_domain __itt_clock_domain unsigned long long __itt_id __itt_id void * fn
base_type::input_ports_type get_input_ports(internal::sequence< S... >)
std::decay< Key >::type my_key
kernel_multifunction_node my_kernel_node
kernel_input_tuple & my_ip
indexer_node< typename async_msg_type< Ports >::type... > indexer_node_type
join_node< kernel_input_tuple, JP > my_join_node
internal::port_ref_impl< N1, N2 > port_ref()
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d
bool_constant< true > true_type
device_selector_base * clone(streaming_node &n) const __TBB_override
internal::make_sequence< NUM_OUTPUTS >::type output_sequence
void make_edge(sender< T > &p, receiver< T > &s)
Makes an edge between a single predecessor and a single successor.
StreamFactory::device_type device_type
kernel_func(kernel_input_tuple &ip, const streaming_node &node, const args_storage &storage, device_type device)
internal::key_from_policy< JP >::type key_type
KernelInputTuple kernel_input_tuple
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain ITT_FORMAT p const __itt_domain __itt_string_handle unsigned long long ITT_FORMAT lu const __itt_domain __itt_id __itt_string_handle __itt_metadata_type type
Base class for types that should not be copied or assigned.
Detects whether two given types are the same.
K key_from_message(const T &t)
void make_edges(internal::sequence< S... >)
std::array< size_t, NUM_INPUTS > my_port_epoches
device_selector_base * my_device_selector
void operator()(const indexer_node_output_type &v, typename device_selector_node::output_ports_type &op) __TBB_override
const Device & device() const
run_kernel_func(kernel_input_tuple &ip, const streaming_node &node, const args_storage &storage)
streaming_node(streaming_node &&node)
void send(device_type d) __TBB_override
dispatch_funcs_type my_dispatch_funcs
StreamFactory::kernel_type kernel_type
kernel_body(const streaming_node &node)
#define __TBB_ASSERT_EX(predicate, comment)
"Extended" version is useful to suppress warnings if a variable is only used with an assert
static void doit_impl(std::true_type x, F &f, Tuple &t, port_ref_impl< N, N >(*fn)(), Args1 &... args1, Args2 &... args2)
send_func(StreamFactory &factory, device_type d)
void reset_node(reset_flags=rf_reset_protocol) __TBB_override
internal::streaming_device_with_key< device_type, key_type > device_with_key_type
static void doit_impl(std::true_type x, F &f, Tuple &t, port_ref_impl< N1, N2 >, Args1 &... args1, Args2 &... args2)
const kernel_type my_kernel
StreamFactory::kernel_type kernel_type
Base class for types that should not be assigned.
void set_range(port_ref_impl< N, N >(*)())
kernel_multifunction_node::output_ports_type output_ports_type
streaming_node(graph &g, const kernel_type &kernel, DeviceSelector d, StreamFactory &f)
StreamFactory::device_type device_type
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain ITT_FORMAT p const __itt_domain __itt_string_handle unsigned long long ITT_FORMAT lu const __itt_domain __itt_id __itt_string_handle __itt_metadata_type size_t void ITT_FORMAT p const __itt_domain __itt_id __itt_string_handle const wchar_t size_t ITT_FORMAT lu const __itt_domain __itt_id __itt_relation __itt_id ITT_FORMAT p const wchar_t int ITT_FORMAT __itt_group_mark S
epoch_desc(device_type d)
base_type::input_ports_type get_input_ports()
args_storage_base * my_args_storage
StreamFactory & my_factory
task * do_try_put(const T &v, void *p)
args_storage_base * make_args_storage(const args_storage_base &storage, Args &&... args) const
range_wrapper * clone() const __TBB_override
StreamFactory::template async_msg_type< T > type
kernel_executor_helper(kernel_executor_helper &&executor)
static void doit_impl(std::true_type x, F &f, Tuple &t, port_ref_impl< N1, N2 >(*fn)(), Args1 &... args1, Args2 &... args2)
tuple< typename async_msg_type< Ports >::type... > input_tuple
std::false_type is_key_matching
std::true_type is_key_matching
indexer_node_type my_indexer_node
StreamFactory & my_factory
static void doit(F &f, Tuple &, Args2 &... args2)
StreamFactory::device_type device_type
args_pack_type my_args_pack
static void doit_impl(std::false_type, F &f, Tuple &t, A1 &a1, Args1 &... args1, Args2 &... args2)
KernelInputTuple kernel_input_tuple
device_selector_base * my_device_selector
void enqueue_kernel_impl(kernel_input_tuple &ip, StreamFactory &factory, device_type device, const kernel_type &kernel, Args &... args) const
UserFunctor my_user_functor
static void doit_impl(std::true_type, F &f, Tuple &t, port_ref_impl< N, N >, Args1 &... args1, Args2 &... args2)
virtual ~args_storage_base()
std::array< send_and_put_fn_type, NUM_INPUTS > dispatch_funcs_type
indexer_node_type::output_type indexer_node_output_type
void send_and_put_impl(size_t &epoch, const indexer_node_output_type &v, typename device_selector_node::output_ports_type &op)
run_finalize_func(kernel_input_tuple &ip, StreamFactory &factory, FinalizeFn fn)
base_type::output_ports_type get_output_ports(internal::sequence< S... >)
args_storage(const args_storage &k)
internal::make_sequence< NUM_INPUTS >::type input_sequence
void operator()(FnArgs &... args)
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain ITT_FORMAT p const __itt_domain __itt_string_handle unsigned long long value
void set_range(port_ref_impl< N, N >)
void call(F &&f, Pack &&p)
Calls the given function with arguments taken from a stored_pack.
static void doit(F &f, Tuple &t, A1 &a1, Args1 &... args1, Args2 &... args2)
kernel_executor_helper(const kernel_executor_helper &executor)
StreamFactory & my_factory
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain ITT_FORMAT p const __itt_domain __itt_string_handle unsigned long long ITT_FORMAT lu const __itt_domain __itt_id __itt_string_handle * key
key_type get_key(std::true_type, const T &t, size_t &)
kernel_input_tuple & my_ip
void operator()(FnArgs &... args)
tbb::internal::stored_pack< Args... > args_pack_type
const streaming_node & my_node
static dispatch_funcs_type create_dispatch_funcs(internal::sequence< S... >)
void move(tbb_thread &t1, tbb_thread &t2)
void enqueue(kernel_input_tuple &ip, output_ports_type &op, const streaming_node &n) __TBB_override
args_storage_base * clone() const __TBB_override
composite_node< input_tuple, output_tuple > base_type
args_storage(const kernel_type &kernel, StreamFactory &f, Args &&... args)
base_type::output_ports_type get_output_ports()
device_type get_device(key_type key, DevicePort &dp)
void ignore_return_values(Args &&...)
void operator()(FnArgs &... args)
device_selector_body(device_selector_base *d)
StreamFactory::kernel_type kernel_type
tuple< streaming_device_with_key< typename StreamFactory::device_type, typename key_from_policy< JP >::type >, typename async_msg_type< Ports >::type... > kernel_input_tuple
streaming_device_with_key()
range_value(const range_type &value)
args_storage_base(const kernel_type &kernel, StreamFactory &f)
void enqueue_kernel(kernel_input_tuple &ip, StreamFactory &factory, device_type device, const kernel_type &kernel, Args &... args) const
bool_constant< false > false_type
void operator()(const indexer_node_output_type &v, typename device_selector_node::output_ports_type &op)
range_wrapper * my_range_wrapper
bool do_try_put(const kernel_input_tuple &ip, output_ports_type &op) const
~kernel_executor_helper()
static run_finalize_func< FinalizeFn > make_run_finalize_func(kernel_input_tuple &ip, StreamFactory &factory, FinalizeFn fn)
multifunction_node< indexer_node_output_type, kernel_input_tuple > device_selector_node
virtual ~device_selector_base()
device_selector(UserFunctor uf, streaming_node &n, StreamFactory &f)
args_storage_base::output_ports_type output_ports_type
range_type get_range(const kernel_input_tuple &ip) const __TBB_override
args_storage(const args_storage_base &k, Args &&... args)
internal::streaming_node_traits< JP, StreamFactory, Ports... >::output_tuple output_tuple
void notify_new_device(device_type d)
StreamFactory::template async_msg_type< typename tbb::internal::strip< T >::type > type
range_value(range_type &&value)
void operator()(FnArgs &... args)
const args_storage & my_storage
range_type get_range(const kernel_input_tuple &) const __TBB_override
StreamFactory & my_factory
internal::streaming_node_traits< JP, StreamFactory, Ports... >::indexer_node_type indexer_node_type
args_storage_base(const args_storage_base &k)
std::true_type is_key_matching
const streaming_node & my_node
internal::streaming_node_traits< JP, StreamFactory, Ports... >::input_tuple input_tuple