RxCpp
The Reactive Extensions for Native (RxCpp) is a library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators in both C and C++.
rx-observer.hpp
Go to the documentation of this file.
1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
5 #if !defined(RXCPP_RX_OBSERVER_HPP)
6 #define RXCPP_RX_OBSERVER_HPP
7 
8 #include "rx-includes.hpp"
9 
10 namespace rxcpp {
11 
12 
13 template<class T>
15 {
16  typedef T value_type;
18 };
19 
20 namespace detail {
21 template<class T>
22 struct OnNextEmpty
23 {
24  void operator()(const T&) const {}
25 };
26 struct OnErrorEmpty
27 {
28  void operator()(rxu::error_ptr) const {
29  // error implicitly ignored, abort
30  std::terminate();
31  }
32 };
33 struct OnErrorIgnore
34 {
35  void operator()(rxu::error_ptr) const {
36  }
37 };
38 struct OnCompletedEmpty
39 {
40  void operator()() const {}
41 };
42 
43 template<class T, class State, class OnNext>
44 struct OnNextForward
45 {
46  using state_t = rxu::decay_t<State>;
47  using onnext_t = rxu::decay_t<OnNext>;
48  OnNextForward() : onnext() {}
49  explicit OnNextForward(onnext_t on) : onnext(std::move(on)) {}
50  onnext_t onnext;
51  void operator()(state_t& s, T& t) const {
52  onnext(s, t);
53  }
54  void operator()(state_t& s, T&& t) const {
55  onnext(s, t);
56  }
57 };
58 template<class T, class State>
59 struct OnNextForward<T, State, void>
60 {
61  using state_t = rxu::decay_t<State>;
62  OnNextForward() {}
63  void operator()(state_t& s, T& t) const {
64  s.on_next(t);
65  }
66  void operator()(state_t& s, T&& t) const {
67  s.on_next(t);
68  }
69 };
70 
71 template<class State, class OnError>
72 struct OnErrorForward
73 {
74  using state_t = rxu::decay_t<State>;
75  using onerror_t = rxu::decay_t<OnError>;
76  OnErrorForward() : onerror() {}
77  explicit OnErrorForward(onerror_t oe) : onerror(std::move(oe)) {}
78  onerror_t onerror;
79  void operator()(state_t& s, rxu::error_ptr ep) const {
80  onerror(s, ep);
81  }
82 };
83 template<class State>
84 struct OnErrorForward<State, void>
85 {
86  using state_t = rxu::decay_t<State>;
87  OnErrorForward() {}
88  void operator()(state_t& s, rxu::error_ptr ep) const {
89  s.on_error(ep);
90  }
91 };
92 
93 template<class State, class OnCompleted>
94 struct OnCompletedForward
95 {
96  using state_t = rxu::decay_t<State>;
97  using oncompleted_t = rxu::decay_t<OnCompleted>;
98  OnCompletedForward() : oncompleted() {}
99  explicit OnCompletedForward(oncompleted_t oc) : oncompleted(std::move(oc)) {}
100  oncompleted_t oncompleted;
101  void operator()(state_t& s) const {
102  oncompleted(s);
103  }
104 };
105 template<class State>
106 struct OnCompletedForward<State, void>
107 {
108  OnCompletedForward() {}
109  void operator()(State& s) const {
110  s.on_completed();
111  }
112 };
113 
114 template<class T, class F>
115 struct is_on_next_of
116 {
117  struct not_void {};
118  template<class CT, class CF>
119  static auto check(int) -> decltype((*(CF*)nullptr)(*(CT*)nullptr));
120  template<class CT, class CF>
121  static not_void check(...);
122 
123  typedef decltype(check<T, rxu::decay_t<F>>(0)) detail_result;
124  static const bool value = std::is_same<detail_result, void>::value;
125 };
126 
127 template<class F>
128 struct is_on_error
129 {
130  struct not_void {};
131  template<class CF>
132  static auto check(int) -> decltype((*(CF*)nullptr)(*(rxu::error_ptr*)nullptr));
133  template<class CF>
134  static not_void check(...);
135 
136  static const bool value = std::is_same<decltype(check<rxu::decay_t<F>>(0)), void>::value;
137 };
138 
139 template<class State, class F>
140 struct is_on_error_for
141 {
142  struct not_void {};
143  template<class CF>
144  static auto check(int) -> decltype((*(CF*)nullptr)(*(State*)nullptr, *(rxu::error_ptr*)nullptr));
145  template<class CF>
146  static not_void check(...);
147 
148  static const bool value = std::is_same<decltype(check<rxu::decay_t<F>>(0)), void>::value;
149 };
150 
151 template<class F>
152 struct is_on_completed
153 {
154  struct not_void {};
155  template<class CF>
156  static auto check(int) -> decltype((*(CF*)nullptr)());
157  template<class CF>
158  static not_void check(...);
159 
160  static const bool value = std::is_same<decltype(check<rxu::decay_t<F>>(0)), void>::value;
161 };
162 
163 }
164 
165 
178 template<class T, class State, class OnNext, class OnError, class OnCompleted>
179 class observer : public observer_base<T>
180 {
181 public:
184  using on_next_t = typename std::conditional<
185  !std::is_same<void, OnNext>::value,
187  detail::OnNextForward<T, State, OnNext>>::type;
188  using on_error_t = typename std::conditional<
189  !std::is_same<void, OnError>::value,
191  detail::OnErrorForward<State, OnError>>::type;
192  using on_completed_t = typename std::conditional<
193  !std::is_same<void, OnCompleted>::value,
195  detail::OnCompletedForward<State, OnCompleted>>::type;
196 
197 private:
198  mutable state_t state;
199  on_next_t onnext;
200  on_error_t onerror;
201  on_completed_t oncompleted;
202 
203 public:
204 
206  : state(std::move(s))
207  , onnext(std::move(n))
208  , onerror(std::move(e))
209  , oncompleted(std::move(c))
210  {
211  }
213  : state(std::move(s))
214  , onnext(std::move(n))
215  , onerror(on_error_t())
216  , oncompleted(std::move(c))
217  {
218  }
219  observer(const this_type& o)
220  : state(o.state)
221  , onnext(o.onnext)
222  , onerror(o.onerror)
223  , oncompleted(o.oncompleted)
224  {
225  }
227  : state(std::move(o.state))
228  , onnext(std::move(o.onnext))
229  , onerror(std::move(o.onerror))
230  , oncompleted(std::move(o.oncompleted))
231  {
232  }
234  state = std::move(o.state);
235  onnext = std::move(o.onnext);
236  onerror = std::move(o.onerror);
237  oncompleted = std::move(o.oncompleted);
238  return *this;
239  }
240 
241  void on_next(T& t) const {
242  onnext(state, t);
243  }
244  void on_next(T&& t) const {
245  onnext(state, std::move(t));
246  }
247  void on_error(rxu::error_ptr e) const {
248  onerror(state, e);
249  }
250  void on_completed() const {
251  oncompleted(state);
252  }
254  return observer<T>(*this);
255  }
256 };
257 
269 template<class T, class OnNext, class OnError, class OnCompleted>
270 class observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted> : public observer_base<T>
271 {
272 public:
273  using this_type = observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>;
274  using on_next_t = typename std::conditional<
275  !std::is_same<void, OnNext>::value,
276  rxu::decay_t<OnNext>,
277  detail::OnNextEmpty<T>>::type;
278  using on_error_t = typename std::conditional<
279  !std::is_same<void, OnError>::value,
280  rxu::decay_t<OnError>,
281  detail::OnErrorEmpty>::type;
282  using on_completed_t = typename std::conditional<
283  !std::is_same<void, OnCompleted>::value,
284  rxu::decay_t<OnCompleted>,
285  detail::OnCompletedEmpty>::type;
286 
287 private:
288  on_next_t onnext;
289  on_error_t onerror;
290  on_completed_t oncompleted;
291 
292 public:
293  static_assert(detail::is_on_next_of<T, on_next_t>::value, "Function supplied for on_next must be a function with the signature void(T);");
294  static_assert(detail::is_on_error<on_error_t>::value, "Function supplied for on_error must be a function with the signature void(rxu::error_ptr);");
295  static_assert(detail::is_on_completed<on_completed_t>::value, "Function supplied for on_completed must be a function with the signature void();");
296 
297  observer()
298  : onnext(on_next_t())
299  , onerror(on_error_t())
300  , oncompleted(on_completed_t())
301  {
302  }
303 
304  explicit observer(on_next_t n, on_error_t e = on_error_t(), on_completed_t c = on_completed_t())
305  : onnext(std::move(n))
306  , onerror(std::move(e))
307  , oncompleted(std::move(c))
308  {
309  }
310  observer(const this_type& o)
311  : onnext(o.onnext)
312  , onerror(o.onerror)
313  , oncompleted(o.oncompleted)
314  {
315  }
316  observer(this_type&& o)
317  : onnext(std::move(o.onnext))
318  , onerror(std::move(o.onerror))
319  , oncompleted(std::move(o.oncompleted))
320  {
321  }
322  this_type& operator=(this_type o) {
323  onnext = std::move(o.onnext);
324  onerror = std::move(o.onerror);
325  oncompleted = std::move(o.oncompleted);
326  return *this;
327  }
328 
329  void on_next(T& t) const {
330  onnext(t);
331  }
332  void on_next(T&& t) const {
333  onnext(std::move(t));
334  }
335  void on_error(rxu::error_ptr e) const {
336  onerror(e);
337  }
338  void on_completed() const {
339  oncompleted();
340  }
341  observer<T> as_dynamic() const {
342  return observer<T>(*this);
343  }
344 };
345 
346 namespace detail
347 {
348 
349 template<class T>
350 struct virtual_observer : public std::enable_shared_from_this<virtual_observer<T>>
351 {
352  virtual ~virtual_observer() {}
353  virtual void on_next(T&) const {};
354  virtual void on_next(T&&) const {};
355  virtual void on_error(rxu::error_ptr) const {};
356  virtual void on_completed() const {};
357 };
358 
359 template<class T, class Observer>
360 struct specific_observer : public virtual_observer<T>
361 {
362  explicit specific_observer(Observer o)
363  : destination(std::move(o))
364  {
365  }
366 
367  Observer destination;
368  virtual void on_next(T& t) const {
369  destination.on_next(t);
370  }
371  virtual void on_next(T&& t) const {
372  destination.on_next(std::move(t));
373  }
374  virtual void on_error(rxu::error_ptr e) const {
375  destination.on_error(e);
376  }
377  virtual void on_completed() const {
378  destination.on_completed();
379  }
380 };
381 
382 }
383 
392 template<class T>
393 class observer<T, void, void, void, void> : public observer_base<T>
394 {
395 public:
397 
398 private:
400  using base_type = observer_base<T>;
401  using virtual_observer = detail::virtual_observer<T>;
402 
403  std::shared_ptr<virtual_observer> destination;
404 
405  template<class Observer>
406  static auto make_destination(Observer o)
407  -> std::shared_ptr<virtual_observer> {
408  return std::make_shared<detail::specific_observer<T, Observer>>(std::move(o));
409  }
410 
411 public:
413  {
414  }
415  observer(const this_type& o)
416  : destination(o.destination)
417  {
418  }
420  : destination(std::move(o.destination))
421  {
422  }
423 
424  template<class Observer>
425  explicit observer(Observer o)
426  : destination(make_destination(std::move(o)))
427  {
428  }
429 
431  destination = std::move(o.destination);
432  return *this;
433  }
434 
435  // perfect forwarding delays the copy of the value.
436  template<class V>
437  void on_next(V&& v) const {
438  if (destination) {
439  destination->on_next(std::forward<V>(v));
440  }
441  }
442  void on_error(rxu::error_ptr e) const {
443  if (destination) {
444  destination->on_error(e);
445  }
446  }
447  void on_completed() const {
448  if (destination) {
449  destination->on_completed();
450  }
451  }
452 
454  return *this;
455  }
456 };
457 
458 template<class T, class DefaultOnError = detail::OnErrorEmpty>
462 }
463 
464 template<class T, class DefaultOnError = detail::OnErrorEmpty, class U, class State, class OnNext, class OnError, class OnCompleted>
468 }
469 template<class T, class DefaultOnError = detail::OnErrorEmpty, class Observer>
470 auto make_observer(Observer ob)
471  -> typename std::enable_if<
472  !detail::is_on_next_of<T, Observer>::value &&
473  !detail::is_on_error<Observer>::value &&
475  Observer>::type {
476  return std::move(ob);
477 }
478 template<class T, class DefaultOnError = detail::OnErrorEmpty, class Observer>
479 auto make_observer(Observer ob)
480  -> typename std::enable_if<
481  !detail::is_on_next_of<T, Observer>::value &&
482  !detail::is_on_error<Observer>::value &&
484  observer<T, Observer>>::type {
485  return observer<T, Observer>(std::move(ob));
486 }
487 template<class T, class DefaultOnError = detail::OnErrorEmpty, class OnNext>
488 auto make_observer(OnNext on)
489  -> typename std::enable_if<
490  detail::is_on_next_of<T, OnNext>::value,
493  std::move(on));
494 }
495 template<class T, class DefaultOnError = detail::OnErrorEmpty, class OnError>
496 auto make_observer(OnError oe)
497  -> typename std::enable_if<
498  !detail::is_on_next_of<T, OnError>::value &&
499  detail::is_on_error<OnError>::value,
502  detail::OnNextEmpty<T>(), std::move(oe));
503 }
504 template<class T, class DefaultOnError = detail::OnErrorEmpty, class OnNext, class OnError>
505 auto make_observer(OnNext on, OnError oe)
506  -> typename std::enable_if<
507  detail::is_on_next_of<T, OnNext>::value &&
508  detail::is_on_error<OnError>::value,
511  std::move(on), std::move(oe));
512 }
513 template<class T, class DefaultOnError = detail::OnErrorEmpty, class OnNext, class OnCompleted>
514 auto make_observer(OnNext on, OnCompleted oc)
515  -> typename std::enable_if<
516  detail::is_on_next_of<T, OnNext>::value &&
517  detail::is_on_completed<OnCompleted>::value,
520  std::move(on), DefaultOnError(), std::move(oc));
521 }
522 template<class T, class DefaultOnError = detail::OnErrorEmpty, class OnNext, class OnError, class OnCompleted>
523 auto make_observer(OnNext on, OnError oe, OnCompleted oc)
524  -> typename std::enable_if<
525  detail::is_on_next_of<T, OnNext>::value &&
526  detail::is_on_error<OnError>::value &&
527  detail::is_on_completed<OnCompleted>::value,
530  std::move(on), std::move(oe), std::move(oc));
531 }
532 
533 
534 template<class T, class State, class OnNext>
535 auto make_observer(State os, OnNext on)
536  -> typename std::enable_if<
537  !detail::is_on_next_of<T, State>::value &&
538  !detail::is_on_error<State>::value,
541  std::move(os), std::move(on));
542 }
543 template<class T, class State, class OnError>
544 auto make_observer(State os, OnError oe)
545  -> typename std::enable_if<
546  !detail::is_on_next_of<T, State>::value &&
547  !detail::is_on_error<State>::value &&
548  detail::is_on_error_for<State, OnError>::value,
551  std::move(os), detail::OnNextEmpty<T>(), std::move(oe));
552 }
553 template<class T, class State, class OnNext, class OnError>
554 auto make_observer(State os, OnNext on, OnError oe)
555  -> typename std::enable_if<
556  !detail::is_on_next_of<T, State>::value &&
557  !detail::is_on_error<State>::value &&
558  detail::is_on_error_for<State, OnError>::value,
561  std::move(os), std::move(on), std::move(oe));
562 }
563 template<class T, class State, class OnNext, class OnCompleted>
564 auto make_observer(State os, OnNext on, OnCompleted oc)
565  -> typename std::enable_if<
566  !detail::is_on_next_of<T, State>::value &&
567  !detail::is_on_error<State>::value,
570  std::move(os), std::move(on), std::move(oc));
571 }
572 template<class T, class State, class OnNext, class OnError, class OnCompleted>
573 auto make_observer(State os, OnNext on, OnError oe, OnCompleted oc)
574  -> typename std::enable_if<
575  !detail::is_on_next_of<T, State>::value &&
576  !detail::is_on_error<State>::value &&
577  detail::is_on_error_for<State, OnError>::value,
580  std::move(os), std::move(on), std::move(oe), std::move(oc));
581 }
582 
583 template<class T, class Observer>
584 auto make_observer_dynamic(Observer o)
585  -> typename std::enable_if<
586  !detail::is_on_next_of<T, Observer>::value,
587  observer<T>>::type {
588  return observer<T>(std::move(o));
589 }
590 template<class T, class OnNext>
591 auto make_observer_dynamic(OnNext&& on)
592  -> typename std::enable_if<
593  detail::is_on_next_of<T, OnNext>::value,
594  observer<T>>::type {
595  return observer<T>(
596  make_observer<T>(std::forward<OnNext>(on)));
597 }
598 template<class T, class OnNext, class OnError>
599 auto make_observer_dynamic(OnNext&& on, OnError&& oe)
600  -> typename std::enable_if<
601  detail::is_on_next_of<T, OnNext>::value &&
602  detail::is_on_error<OnError>::value,
603  observer<T>>::type {
604  return observer<T>(
605  make_observer<T>(std::forward<OnNext>(on), std::forward<OnError>(oe)));
606 }
607 template<class T, class OnNext, class OnCompleted>
608 auto make_observer_dynamic(OnNext&& on, OnCompleted&& oc)
609  -> typename std::enable_if<
610  detail::is_on_next_of<T, OnNext>::value &&
611  detail::is_on_completed<OnCompleted>::value,
612  observer<T>>::type {
613  return observer<T>(
614  make_observer<T>(std::forward<OnNext>(on), std::forward<OnCompleted>(oc)));
615 }
616 template<class T, class OnNext, class OnError, class OnCompleted>
617 auto make_observer_dynamic(OnNext&& on, OnError&& oe, OnCompleted&& oc)
618  -> typename std::enable_if<
619  detail::is_on_next_of<T, OnNext>::value &&
620  detail::is_on_error<OnError>::value &&
621  detail::is_on_completed<OnCompleted>::value,
622  observer<T>>::type {
623  return observer<T>(
624  make_observer<T>(std::forward<OnNext>(on), std::forward<OnError>(oe), std::forward<OnCompleted>(oc)));
625 }
626 
627 namespace detail {
628 
629 template<class F>
630 struct maybe_from_result
631 {
632  typedef decltype((*(F*)nullptr)()) decl_result_type;
633  typedef rxu::decay_t<decl_result_type> result_type;
634  typedef rxu::maybe<result_type> type;
635 };
636 
637 }
638 
639 template<class F, class OnError>
640 auto on_exception(const F& f, const OnError& c)
641  -> typename std::enable_if<detail::is_on_error<OnError>::value, typename detail::maybe_from_result<F>::type>::type {
642  typename detail::maybe_from_result<F>::type r;
643  RXCPP_TRY {
644  r.reset(f());
645  } RXCPP_CATCH(...) {
647  }
648  return r;
649 }
650 
651 template<class F, class Subscriber>
652 auto on_exception(const F& f, const Subscriber& s)
653  -> typename std::enable_if<is_subscriber<Subscriber>::value, typename detail::maybe_from_result<F>::type>::type {
654  typename detail::maybe_from_result<F>::type r;
655  RXCPP_TRY {
656  r.reset(f());
657  } RXCPP_CATCH(...) {
658  s.on_error(rxu::current_exception());
659  }
660  return r;
661 }
662 
663 }
664 
665 #endif
typename std::conditional< !std::is_same< void, OnCompleted >::value, rxu::decay_t< OnCompleted >, detail::OnCompletedForward< State, OnCompleted > >::type on_completed_t
Definition: rx-observer.hpp:195
#define RXCPP_TRY
Definition: rx-util.hpp:38
observer()
Definition: rx-observer.hpp:412
Definition: rx-predef.hpp:88
typename std::conditional< !std::is_same< void, OnError >::value, rxu::decay_t< OnError >, detail::OnErrorForward< State, OnError > >::type on_error_t
Definition: rx-observer.hpp:191
std::shared_ptr< util::detail::error_base > error_ptr
Definition: rx-util.hpp:874
Definition: rx-all.hpp:26
void on_error(rxu::error_ptr e) const
Definition: rx-observer.hpp:247
void on_error(rxu::error_ptr e) const
Definition: rx-observer.hpp:442
observer(state_t s, on_next_t n, on_completed_t c)
Definition: rx-observer.hpp:212
observer(Observer o)
Definition: rx-observer.hpp:425
observer(this_type &&o)
Definition: rx-observer.hpp:419
static const bool value
Definition: rx-predef.hpp:97
void on_next(T &&t) const
Definition: rx-observer.hpp:244
#define RXCPP_CATCH(...)
Definition: rx-util.hpp:39
typename std::decay< T >::type decay_t
Definition: rx-util.hpp:48
typename std::conditional< !std::is_same< void, OnNext >::value, rxu::decay_t< OnNext >, detail::OnNextForward< T, State, OnNext > >::type on_next_t
Definition: rx-observer.hpp:187
Definition: rx-predef.hpp:100
this_type & operator=(this_type o)
Definition: rx-observer.hpp:430
observer< T > as_dynamic() const
Definition: rx-observer.hpp:253
void on_next(T &t) const
Definition: rx-observer.hpp:241
void on_completed() const
Definition: rx-observer.hpp:447
observer(state_t s, on_next_t n=on_next_t(), on_error_t e=on_error_t(), on_completed_t c=on_completed_t())
Definition: rx-observer.hpp:205
this_type & operator=(this_type o)
Definition: rx-observer.hpp:233
observer(const this_type &o)
Definition: rx-observer.hpp:219
tag_dynamic_observer dynamic_observer_tag
Definition: rx-observer.hpp:396
tag_observer observer_tag
Definition: rx-observer.hpp:17
auto make_observer() -> observer< T, detail::stateless_observer_tag, detail::OnNextEmpty< T >, DefaultOnError >
Definition: rx-observer.hpp:459
void on_next(V &&v) const
Definition: rx-observer.hpp:437
error_ptr current_exception()
Definition: rx-util.hpp:943
Definition: rx-observer.hpp:14
consumes values from an observable using State that may implement on_next, on_error and on_completed ...
Definition: rx-observer.hpp:179
observer(this_type &&o)
Definition: rx-observer.hpp:226
auto on_exception(const F &f, const OnError &c) -> typename std::enable_if< detail::is_on_error< OnError >::value, typename detail::maybe_from_result< F >::type >::type
Definition: rx-observer.hpp:640
T value_type
Definition: rx-observer.hpp:16
consumes values from an observable using type-forgetting (shared allocated state with virtual methods...
Definition: rx-observer.hpp:393
auto make_observer_dynamic(Observer o) -> typename std::enable_if< !detail::is_on_next_of< T, Observer >::value, observer< T >>::type
Definition: rx-observer.hpp:584
auto as_dynamic() -> detail::dynamic_factory
Definition: rx-subscribe.hpp:117
Definition: rx-predef.hpp:90
void on_completed() const
Definition: rx-observer.hpp:250
observer(const this_type &o)
Definition: rx-observer.hpp:415
observer< T > as_dynamic() const
Definition: rx-observer.hpp:453
rxu::decay_t< State > state_t
Definition: rx-observer.hpp:183