RxCpp
The Reactive Extensions for Native (RxCpp) is a library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators in both C and C++.
rx-observe_on.hpp
Go to the documentation of this file.
1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
23 #if !defined(RXCPP_OPERATORS_RX_OBSERVE_ON_HPP)
24 #define RXCPP_OPERATORS_RX_OBSERVE_ON_HPP
25 
26 #include "../rx-includes.hpp"
27 
28 namespace rxcpp {
29 
30 namespace operators {
31 
32 namespace detail {
33 
34 template<class... AN>
35 struct observe_on_invalid_arguments {};
36 
37 template<class... AN>
38 struct observe_on_invalid : public rxo::operator_base<observe_on_invalid_arguments<AN...>> {
39  using type = observable<observe_on_invalid_arguments<AN...>, observe_on_invalid<AN...>>;
40 };
41 template<class... AN>
42 using observe_on_invalid_t = typename observe_on_invalid<AN...>::type;
43 
44 template<class T, class Coordination>
45 struct observe_on
46 {
47  typedef rxu::decay_t<T> source_value_type;
48 
49  typedef rxu::decay_t<Coordination> coordination_type;
50  typedef typename coordination_type::coordinator_type coordinator_type;
51 
52  coordination_type coordination;
53 
54  observe_on(coordination_type cn)
55  : coordination(std::move(cn))
56  {
57  }
58 
59  template<class Subscriber>
60  struct observe_on_observer
61  {
62  typedef observe_on_observer<Subscriber> this_type;
63  typedef source_value_type value_type;
64  typedef rxu::decay_t<Subscriber> dest_type;
65  typedef observer<value_type, this_type> observer_type;
66 
67  typedef rxn::notification<T> notification_type;
68  typedef typename notification_type::type base_notification_type;
69  typedef std::deque<base_notification_type> queue_type;
70 
71  struct mode
72  {
73  enum type {
74  Invalid = 0,
75  Processing,
76  Empty,
77  Disposed,
78  Errored
79  };
80  };
81  struct observe_on_state : std::enable_shared_from_this<observe_on_state>
82  {
83  mutable std::mutex lock;
84  mutable queue_type fill_queue;
85  mutable queue_type drain_queue;
86  composite_subscription lifetime;
87  mutable typename mode::type current;
88  coordinator_type coordinator;
89  dest_type destination;
90 
91  observe_on_state(dest_type d, coordinator_type coor, composite_subscription cs)
92  : lifetime(std::move(cs))
93  , current(mode::Empty)
94  , coordinator(std::move(coor))
95  , destination(std::move(d))
96  {
97  }
98 
99  void finish(std::unique_lock<std::mutex>& guard, typename mode::type end) const {
100  if (!guard.owns_lock()) {
101  std::terminate();
102  }
103  if (current == mode::Errored || current == mode::Disposed) {return;}
104  current = end;
105  queue_type fill_expired;
106  swap(fill_expired, fill_queue);
107  queue_type drain_expired;
108  swap(drain_expired, drain_queue);
109  RXCPP_UNWIND_AUTO([&](){guard.lock();});
110  guard.unlock();
111  lifetime.unsubscribe();
112  destination.unsubscribe();
113  }
114 
115  void ensure_processing(std::unique_lock<std::mutex>& guard) const {
116  if (!guard.owns_lock()) {
117  std::terminate();
118  }
119  if (current == mode::Empty) {
120  current = mode::Processing;
121 
122  if (!lifetime.is_subscribed() && fill_queue.empty() && drain_queue.empty()) {
123  finish(guard, mode::Disposed);
124  }
125 
126  auto keepAlive = this->shared_from_this();
127 
128  auto drain = [keepAlive, this](const rxsc::schedulable& self){
129  using std::swap;
130  RXCPP_TRY {
131  for (;;) {
132  if (drain_queue.empty() || !destination.is_subscribed()) {
133  std::unique_lock<std::mutex> guard(lock);
134  if (!destination.is_subscribed() ||
135  (!lifetime.is_subscribed() && fill_queue.empty() && drain_queue.empty())) {
136  finish(guard, mode::Disposed);
137  return;
138  }
139  if (drain_queue.empty()) {
140  if (fill_queue.empty()) {
141  current = mode::Empty;
142  return;
143  }
144  swap(fill_queue, drain_queue);
145  }
146  }
147  auto notification = std::move(drain_queue.front());
148  drain_queue.pop_front();
149  notification->accept(destination);
150  std::unique_lock<std::mutex> guard(lock);
151  self();
152  if (lifetime.is_subscribed()) break;
153  }
154  }
155  RXCPP_CATCH(...) {
156  destination.on_error(rxu::current_exception());
157  std::unique_lock<std::mutex> guard(lock);
158  finish(guard, mode::Errored);
159  }
160  };
161 
162  auto selectedDrain = on_exception(
163  [&](){return coordinator.act(drain);},
164  destination);
165  if (selectedDrain.empty()) {
166  finish(guard, mode::Errored);
167  return;
168  }
169 
170  auto processor = coordinator.get_worker();
171 
172  RXCPP_UNWIND_AUTO([&](){guard.lock();});
173  guard.unlock();
174 
175  processor.schedule(selectedDrain.get());
176  }
177  }
178  };
179  std::shared_ptr<observe_on_state> state;
180 
181  observe_on_observer(dest_type d, coordinator_type coor, composite_subscription cs)
182  : state(std::make_shared<observe_on_state>(std::move(d), std::move(coor), std::move(cs)))
183  {
184  }
185 
186  void on_next(source_value_type v) const {
187  std::unique_lock<std::mutex> guard(state->lock);
188  if (state->current == mode::Errored || state->current == mode::Disposed) { return; }
189  state->fill_queue.push_back(notification_type::on_next(std::move(v)));
190  state->ensure_processing(guard);
191  }
192  void on_error(rxu::error_ptr e) const {
193  std::unique_lock<std::mutex> guard(state->lock);
194  if (state->current == mode::Errored || state->current == mode::Disposed) { return; }
195  state->fill_queue.push_back(notification_type::on_error(e));
196  state->ensure_processing(guard);
197  }
198  void on_completed() const {
199  std::unique_lock<std::mutex> guard(state->lock);
200  if (state->current == mode::Errored || state->current == mode::Disposed) { return; }
201  state->fill_queue.push_back(notification_type::on_completed());
202  state->ensure_processing(guard);
203  }
204 
205  static subscriber<value_type, observer<value_type, this_type>> make(dest_type d, coordination_type cn, composite_subscription cs = composite_subscription()) {
206  auto coor = cn.create_coordinator(d.get_subscription());
207  d.add(cs);
208 
209  this_type o(d, std::move(coor), cs);
210  auto keepAlive = o.state;
211  cs.add([=](){
212  std::unique_lock<std::mutex> guard(keepAlive->lock);
213  keepAlive->ensure_processing(guard);
214  });
215 
216  return make_subscriber<value_type>(d, cs, make_observer<value_type>(std::move(o)));
217  }
218  };
219 
220  template<class Subscriber>
221  auto operator()(Subscriber dest) const
222  -> decltype(observe_on_observer<decltype(dest.as_dynamic())>::make(dest.as_dynamic(), coordination)) {
223  return observe_on_observer<decltype(dest.as_dynamic())>::make(dest.as_dynamic(), coordination);
224  }
225 };
226 
227 }
228 
231 template<class... AN>
232 auto observe_on(AN&&... an)
234  return operator_factory<observe_on_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
235 }
236 
237 }
238 
239 template<>
241 {
242  template<class Observable, class Coordination,
243  class Enabled = rxu::enable_if_all_true_type_t<
246  class SourceValue = rxu::value_type_t<Observable>,
247  class ObserveOn = rxo::detail::observe_on<SourceValue, rxu::decay_t<Coordination>>>
248  static auto member(Observable&& o, Coordination&& cn)
249  -> decltype(o.template lift<SourceValue>(ObserveOn(std::forward<Coordination>(cn)))) {
250  return o.template lift<SourceValue>(ObserveOn(std::forward<Coordination>(cn)));
251  }
252 
253  template<class... AN>
254  static operators::detail::observe_on_invalid_t<AN...> member(AN...) {
255  std::terminate();
256  return {};
257  static_assert(sizeof...(AN) == 10000, "observe_on takes (Coordination)");
258  }
259 };
260 
262 {
263  rxsc::scheduler factory;
264 
265  class input_type
266  {
267  rxsc::worker controller;
268  rxsc::scheduler factory;
269  identity_one_worker coordination;
270  public:
271  explicit input_type(rxsc::worker w)
272  : controller(w)
273  , factory(rxsc::make_same_worker(w))
274  , coordination(factory)
275  {
276  }
277  inline rxsc::worker get_worker() const {
278  return controller;
279  }
280  inline rxsc::scheduler get_scheduler() const {
281  return factory;
282  }
283  inline rxsc::scheduler::clock_type::time_point now() const {
284  return factory.now();
285  }
286  template<class Observable>
287  auto in(Observable o) const
288  -> decltype(o.observe_on(coordination)) {
289  return o.observe_on(coordination);
290  }
291  template<class Subscriber>
292  auto out(Subscriber s) const
293  -> Subscriber {
294  return s;
295  }
296  template<class F>
297  auto act(F f) const
298  -> F {
299  return f;
300  }
301  };
302 
303 public:
304 
305  explicit observe_on_one_worker(rxsc::scheduler sc) : factory(sc) {}
306 
308 
309  inline rxsc::scheduler::clock_type::time_point now() const {
310  return factory.now();
311  }
312 
314  auto w = factory.create_worker(std::move(cs));
315  return coordinator_type(input_type(std::move(w)));
316  }
317 };
318 
321 }
322 
325  return r;
326 }
327 
330  return r;
331 }
332 
333 }
334 
335 #endif
coordinator_type create_coordinator(composite_subscription cs=composite_subscription()) const
Definition: rx-observe_on.hpp:313
#define RXCPP_TRY
Definition: rx-util.hpp:38
std::shared_ptr< util::detail::error_base > error_ptr
Definition: rx-util.hpp:874
Definition: rx-all.hpp:26
Definition: rx-observe_on.hpp:261
typename std::decay< T >::type::value_type value_type_t
Definition: rx-util.hpp:47
observe_on_one_worker(rxsc::scheduler sc)
Definition: rx-observe_on.hpp:305
controls lifetime for scheduler::schedule and observable<T, SourceOperator>::subscribe.
Definition: rx-subscription.hpp:459
Definition: rx-operators.hpp:69
auto AN
Definition: rx-finally.hpp:105
clock_type::time_point now() const
return the current time for this scheduler
Definition: rx-scheduler.hpp:404
#define RXCPP_CATCH(...)
Definition: rx-util.hpp:39
worker create_worker(composite_subscription cs=composite_subscription()) const
Definition: rx-scheduler.hpp:412
static type on_completed()
Definition: rx-notification.hpp:213
Definition: rx-operators.hpp:47
observe_on_one_worker observe_on_event_loop()
Definition: rx-observe_on.hpp:323
observe_on_one_worker observe_on_new_thread()
Definition: rx-observe_on.hpp:328
allows functions to be called at specified times and possibly in other contexts.
Definition: rx-scheduler.hpp:383
scheduler make_same_worker(rxsc::worker w)
Definition: rx-sameworker.hpp:44
static type on_error(Exception &&e)
Definition: rx-notification.hpp:218
scheduler make_run_loop(const run_loop &r)
Definition: rx-runloop.hpp:204
typename std::enable_if< all_true_type< BN... >::value >::type enable_if_all_true_type_t
Definition: rx-util.hpp:126
scheduler make_event_loop()
Definition: rx-eventloop.hpp:106
static type on_next(U value)
Definition: rx-notification.hpp:209
auto observe_on(AN &&... an) -> operator_factory< observe_on_tag, AN... >
All values are queued and delivered using the scheduler from the supplied coordination.
Definition: rx-observe_on.hpp:232
scheduler make_new_thread()
Definition: rx-newthread.hpp:169
static auto member(Observable &&o, Coordination &&cn) -> decltype(o.template lift< SourceValue >(ObserveOn(std::forward< Coordination >(cn))))
Definition: rx-observe_on.hpp:248
detail::notification_base< T >::type type
Definition: rx-notification.hpp:118
Definition: rx-coordination.hpp:23
Definition: rx-runloop.hpp:118
error_ptr current_exception()
Definition: rx-util.hpp:943
Definition: rx-operators.hpp:275
observe_on_one_worker observe_on_run_loop(const rxsc::run_loop &rl)
Definition: rx-observe_on.hpp:319
#define RXCPP_UNWIND_AUTO(Function)
Definition: rx-util.hpp:1024
rxsc::scheduler::clock_type::time_point now() const
Definition: rx-observe_on.hpp:309
auto on_exception(const F &f, const OnError &c) -> typename std::enable_if< detail::is_on_error< OnError >::value, typename detail::maybe_from_result< F >::type >::type
Definition: rx-observer.hpp:640
coordinator< input_type > coordinator_type
Definition: rx-observe_on.hpp:307
Definition: rx-coordination.hpp:114
static operators::detail::observe_on_invalid_t< AN... > member(AN...)
Definition: rx-observe_on.hpp:254
Definition: rx-predef.hpp:177
Definition: rx-coordination.hpp:37
Definition: rx-scheduler.hpp:200
Definition: rx-coordination.hpp:45