22 #if !defined(RXCPP_OPERATORS_RX_TIMEOUT_HPP) 23 #define RXCPP_OPERATORS_RX_TIMEOUT_HPP 25 #include "../rx-includes.hpp" 33 std::runtime_error(msg)
42 struct timeout_invalid_arguments {};
45 struct timeout_invalid :
public rxo::operator_base<timeout_invalid_arguments<AN...>> {
46 using type = observable<timeout_invalid_arguments<
AN...>, timeout_invalid<
AN...>>;
49 using timeout_invalid_t =
typename timeout_invalid<
AN...>::type;
51 template<
class T,
class Duration,
class Coordination>
54 typedef rxu::decay_t<T> source_value_type;
55 typedef rxu::decay_t<Coordination> coordination_type;
56 typedef typename coordination_type::coordinator_type coordinator_type;
57 typedef rxu::decay_t<Duration> duration_type;
61 timeout_values(duration_type p, coordination_type c)
68 coordination_type coordination;
70 timeout_values initial;
72 timeout(duration_type period, coordination_type coordination)
73 : initial(period, coordination)
77 template<
class Subscriber>
78 struct timeout_observer
80 typedef timeout_observer<Subscriber> this_type;
81 typedef rxu::decay_t<T> value_type;
82 typedef rxu::decay_t<Subscriber> dest_type;
83 typedef observer<T, this_type> observer_type;
85 struct timeout_subscriber_values :
public timeout_values
87 timeout_subscriber_values(composite_subscription cs, dest_type d, timeout_values v, coordinator_type c)
91 , coordinator(std::move(c))
92 , worker(coordinator.get_worker())
97 composite_subscription cs;
99 coordinator_type coordinator;
101 mutable std::size_t index;
103 typedef std::shared_ptr<timeout_subscriber_values> state_type;
106 timeout_observer(composite_subscription cs, dest_type d, timeout_values v, coordinator_type c)
107 : state(std::make_shared<timeout_subscriber_values>(timeout_subscriber_values(std::move(cs), std::move(d), v, std::move(c))))
109 auto localState = state;
111 auto disposer = [=](
const rxsc::schedulable&){
112 localState->cs.unsubscribe();
113 localState->dest.unsubscribe();
114 localState->worker.unsubscribe();
117 [&](){
return localState->coordinator.act(disposer); },
119 if (selectedDisposer.empty()) {
123 localState->dest.add([=](){
124 localState->worker.schedule(selectedDisposer.get());
126 localState->cs.add([=](){
127 localState->worker.schedule(selectedDisposer.get());
130 auto work = [v, localState](
const rxsc::schedulable&) {
131 auto new_id = ++localState->index;
132 auto produce_time = localState->worker.now() + localState->period;
134 localState->worker.schedule(produce_time, produce_timeout(new_id, localState));
137 [&](){
return localState->coordinator.act(work);},
139 if (selectedWork.empty()) {
142 localState->worker.schedule(selectedWork.get());
145 static std::function<void(const rxsc::schedulable&)> produce_timeout(std::size_t
id, state_type state) {
146 auto produce = [id, state](
const rxsc::schedulable&) {
147 if(
id != state->index)
154 [&](){
return state->coordinator.act(produce); },
156 if (selectedProduce.empty()) {
157 return std::function<void(const rxsc::schedulable&)>();
160 return std::function<void(const rxsc::schedulable&)>(selectedProduce.get());
163 void on_next(T v)
const {
164 auto localState = state;
165 auto work = [v, localState](
const rxsc::schedulable&) {
166 auto new_id = ++localState->index;
167 auto produce_time = localState->worker.now() + localState->period;
169 localState->dest.on_next(v);
170 localState->worker.schedule(produce_time, produce_timeout(new_id, localState));
173 [&](){
return localState->coordinator.act(work);},
175 if (selectedWork.empty()) {
178 localState->worker.schedule(selectedWork.get());
182 auto localState = state;
183 auto work = [e, localState](
const rxsc::schedulable&) {
184 localState->dest.on_error(e);
187 [&](){
return localState->coordinator.act(work); },
189 if (selectedWork.empty()) {
192 localState->worker.schedule(selectedWork.get());
195 void on_completed()
const {
196 auto localState = state;
197 auto work = [localState](
const rxsc::schedulable&) {
198 localState->dest.on_completed();
201 [&](){
return localState->coordinator.act(work); },
203 if (selectedWork.empty()) {
206 localState->worker.schedule(selectedWork.get());
209 static subscriber<T, observer_type> make(dest_type d, timeout_values v) {
210 auto cs = composite_subscription();
211 auto coordinator = v.coordination.create_coordinator();
213 return make_subscriber<T>(cs, observer_type(this_type(cs, std::move(d), std::move(v), std::move(coordinator))));
217 template<
class Subscriber>
218 auto operator()(Subscriber dest)
const 219 -> decltype(timeout_observer<Subscriber>::make(std::move(dest), initial)) {
220 return timeout_observer<Subscriber>::make(std::move(dest), initial);
228 template<
class...
AN>
239 template<
class Observable,
class Duration,
244 class Timeout = rxo::detail::timeout<SourceValue, rxu::decay_t<Duration>,
identity_one_worker>>
245 static auto member(Observable&& o, Duration&& d)
250 template<
class Observable,
class Coordination,
class Duration,
257 static auto member(Observable&& o, Coordination&& cn, Duration&& d)
258 -> decltype(o.template lift<SourceValue>(Timeout(std::forward<Duration>(d), std::forward<Coordination>(cn)))) {
259 return o.template lift<SourceValue>(Timeout(std::forward<Duration>(d), std::forward<Coordination>(cn)));
262 template<
class Observable,
class Coordination,
class Duration,
269 static auto member(Observable&& o, Duration&& d, Coordination&& cn)
270 -> decltype(o.template lift<SourceValue>(Timeout(std::forward<Duration>(d), std::forward<Coordination>(cn)))) {
271 return o.template lift<SourceValue>(Timeout(std::forward<Duration>(d), std::forward<Coordination>(cn)));
274 template<
class...
AN>
275 static operators::detail::timeout_invalid_t<
AN...>
member(
const AN&...) {
278 static_assert(
sizeof...(
AN) == 10000,
"timeout takes (optional Coordination, required Duration) or (required Duration, optional Coordination)");
Definition: rx-util.hpp:817
timeout_error(const std::string &msg)
Definition: rx-timeout.hpp:32
std::shared_ptr< util::detail::error_base > error_ptr
Definition: rx-util.hpp:874
Definition: rx-all.hpp:26
typename std::decay< T >::type::value_type value_type_t
Definition: rx-util.hpp:47
error_ptr make_error_ptr(error_ptr e)
Definition: rx-util.hpp:883
Definition: rx-operators.hpp:69
auto AN
Definition: rx-finally.hpp:105
typename std::decay< T >::type decay_t
Definition: rx-util.hpp:48
Definition: rx-operators.hpp:47
static auto member(Observable &&o, Coordination &&cn, Duration &&d) -> decltype(o.template lift< SourceValue >(Timeout(std::forward< Duration >(d), std::forward< Coordination >(cn))))
Definition: rx-timeout.hpp:257
typename std::enable_if< all_true_type< BN... >::value >::type enable_if_all_true_type_t
Definition: rx-util.hpp:126
Definition: rx-timeout.hpp:29
auto timeout(AN &&... an) -> operator_factory< timeout_tag, AN... >
Return an observable that terminates with timeout_error if a particular timespan has passed without emitting another item from the source observable.
Definition: rx-timeout.hpp:229
static auto member(Observable &&o, Duration &&d, Coordination &&cn) -> decltype(o.template lift< SourceValue >(Timeout(std::forward< Duration >(d), std::forward< Coordination >(cn))))
Definition: rx-timeout.hpp:269
Definition: rx-operators.hpp:465
auto on_exception(const F &f, const OnError &c) -> typename std::enable_if< detail::is_on_error< OnError >::value, typename detail::maybe_from_result< F >::type >::type
Definition: rx-observer.hpp:640
static operators::detail::timeout_invalid_t< AN... > member(const AN &...)
Definition: rx-timeout.hpp:275
Definition: rx-coordination.hpp:114
identity_one_worker identity_current_thread()
Definition: rx-coordination.hpp:175
Definition: rx-predef.hpp:177
Definition: rx-coordination.hpp:37
static auto member(Observable &&o, Duration &&d) -> decltype(o.template lift< SourceValue >(Timeout(std::forward< Duration >(d), identity_current_thread())))
Definition: rx-timeout.hpp:245