RxCpp
The Reactive Extensions for Native (RxCpp) is a library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators in both C and C++.
rx-observable.hpp
Go to the documentation of this file.
1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
5 #if !defined(RXCPP_RX_OBSERVABLE_HPP)
6 #define RXCPP_RX_OBSERVABLE_HPP
7 
8 #include "rx-includes.hpp"
9 
10 #ifdef __GNUG__
11 #define EXPLICIT_THIS this->
12 #else
13 #define EXPLICIT_THIS
14 #endif
15 
16 namespace rxcpp {
17 
18 namespace detail {
19 
20 template<class Subscriber, class T>
21 struct has_on_subscribe_for
22 {
23  struct not_void {};
24  template<class CS, class CT>
25  static auto check(int) -> decltype((*(CT*)nullptr).on_subscribe(*(CS*)nullptr));
26  template<class CS, class CT>
27  static not_void check(...);
28 
29  typedef decltype(check<rxu::decay_t<Subscriber>, T>(0)) detail_result;
30  static const bool value = std::is_same<detail_result, void>::value;
31 };
32 
33 }
34 
35 template<class T>
37  : public rxs::source_base<T>
38 {
39  struct state_type
40  : public std::enable_shared_from_this<state_type>
41  {
42  typedef std::function<void(subscriber<T>)> onsubscribe_type;
43 
44  onsubscribe_type on_subscribe;
45  };
46  std::shared_ptr<state_type> state;
47 
48  template<class U>
49  friend bool operator==(const dynamic_observable<U>&, const dynamic_observable<U>&);
50 
51  template<class SO>
52  void construct(SO&& source, rxs::tag_source&&) {
53  rxu::decay_t<SO> so = std::forward<SO>(source);
54  state->on_subscribe = [so](subscriber<T> o) mutable {
55  so.on_subscribe(std::move(o));
56  };
57  }
58 
59  struct tag_function {};
60  template<class F>
61  void construct(F&& f, tag_function&&) {
62  state->on_subscribe = std::forward<F>(f);
63  }
64 
65 public:
66 
68 
70  {
71  }
72 
73  template<class SOF>
74  explicit dynamic_observable(SOF&& sof, typename std::enable_if<!is_dynamic_observable<SOF>::value, void**>::type = 0)
75  : state(std::make_shared<state_type>())
76  {
77  construct(std::forward<SOF>(sof),
78  typename std::conditional<rxs::is_source<SOF>::value || rxo::is_operator<SOF>::value, rxs::tag_source, tag_function>::type());
79  }
80 
81  void on_subscribe(subscriber<T> o) const {
82  state->on_subscribe(std::move(o));
83  }
84 
85  template<class Subscriber>
86  typename std::enable_if<is_subscriber<Subscriber>::value, void>::type
87  on_subscribe(Subscriber o) const {
88  state->on_subscribe(o.as_dynamic());
89  }
90 };
91 
92 template<class T>
93 inline bool operator==(const dynamic_observable<T>& lhs, const dynamic_observable<T>& rhs) {
94  return lhs.state == rhs.state;
95 }
96 template<class T>
97 inline bool operator!=(const dynamic_observable<T>& lhs, const dynamic_observable<T>& rhs) {
98  return !(lhs == rhs);
99 }
100 
101 template<class T, class Source>
103  return observable<T>(dynamic_observable<T>(std::forward<Source>(s)));
104 }
105 
106 namespace detail {
107 template<bool Selector, class Default, class SO>
108 struct resolve_observable;
109 
110 template<class Default, class SO>
111 struct resolve_observable<true, Default, SO>
112 {
113  typedef typename SO::type type;
114  typedef typename type::value_type value_type;
115  static const bool value = true;
116  typedef observable<value_type, type> observable_type;
117  template<class... AN>
118  static observable_type make(const Default&, AN&&... an) {
119  return observable_type(type(std::forward<AN>(an)...));
120  }
121 };
122 template<class Default, class SO>
123 struct resolve_observable<false, Default, SO>
124 {
125  static const bool value = false;
126  typedef Default observable_type;
127  template<class... AN>
128  static observable_type make(const observable_type& that, const AN&...) {
129  return that;
130  }
131 };
132 template<class SO>
133 struct resolve_observable<true, void, SO>
134 {
135  typedef typename SO::type type;
136  typedef typename type::value_type value_type;
137  static const bool value = true;
138  typedef observable<value_type, type> observable_type;
139  template<class... AN>
140  static observable_type make(AN&&... an) {
141  return observable_type(type(std::forward<AN>(an)...));
142  }
143 };
144 template<class SO>
145 struct resolve_observable<false, void, SO>
146 {
147  static const bool value = false;
148  typedef void observable_type;
149  template<class... AN>
150  static observable_type make(const AN&...) {
151  }
152 };
153 
154 }
155 
156 template<class Selector, class Default, template<class... TN> class SO, class... AN>
158  : public detail::resolve_observable<Selector::value, Default, rxu::defer_type<SO, AN...>>
159 {
160 };
161 
168 template<class T, class Observable>
170 {
171  template<class Obsvbl, class... ArgN>
172  static auto blocking_subscribe(const Obsvbl& source, bool do_rethrow, ArgN&&... an)
173  -> void {
174  std::mutex lock;
175  std::condition_variable wake;
176  bool disposed = false;
178 
179  auto dest = make_subscriber<T>(std::forward<ArgN>(an)...);
180 
181  // keep any error to rethrow at the end.
182  auto scbr = make_subscriber<T>(
183  dest,
184  [&](T t){dest.on_next(t);},
185  [&](rxu::error_ptr e){
186  if (do_rethrow) {
187  error = e;
188  } else {
189  dest.on_error(e);
190  }
191  },
192  [&](){dest.on_completed();}
193  );
194 
195  auto cs = scbr.get_subscription();
196  cs.add(
197  [&](){
198  std::unique_lock<std::mutex> guard(lock);
199  wake.notify_one();
200  disposed = true;
201  });
202 
203  source.subscribe(std::move(scbr));
204 
205  std::unique_lock<std::mutex> guard(lock);
206  wake.wait(guard,
207  [&](){
208  return disposed;
209  });
210 
212  }
213 
214 public:
218  {
219  }
221 
237  template<class... ArgN>
238  auto subscribe(ArgN&&... an) const
239  -> void {
240  return blocking_subscribe(source, false, std::forward<ArgN>(an)...);
241  }
242 
262  template<class... ArgN>
263  auto subscribe_with_rethrow(ArgN&&... an) const
264  -> void {
265  return blocking_subscribe(source, true, std::forward<ArgN>(an)...);
266  }
267 
283  template<class... AN>
284  auto first(AN**...) -> delayed_type_t<T, AN...> const {
285  rxu::maybe<T> result;
288  cs,
289  [&](T v){result.reset(v); cs.unsubscribe();});
290  if (result.empty())
291  rxu::throw_exception(rxcpp::empty_error("first() requires a stream with at least one value"));
292  return result.get();
293  static_assert(sizeof...(AN) == 0, "first() was passed too many arguments.");
294  }
295 
311  template<class... AN>
312  auto last(AN**...) -> delayed_type_t<T, AN...> const {
313  rxu::maybe<T> result;
315  [&](T v){result.reset(v);});
316  if (result.empty())
317  rxu::throw_exception(rxcpp::empty_error("last() requires a stream with at least one value"));
318  return result.get();
319  static_assert(sizeof...(AN) == 0, "last() was passed too many arguments.");
320  }
321 
334  int count() const {
335  int result = 0;
336  source.count().as_blocking().subscribe_with_rethrow(
337  [&](int v){result = v;});
338  return result;
339  }
340 
358  T sum() const {
359  return source.sum().as_blocking().last();
360  }
361 
379  double average() const {
380  return source.average().as_blocking().last();
381  }
382 
400  T max() const {
401  return source.max().as_blocking().last();
402  }
403 
421  T min() const {
422  return source.min().as_blocking().last();
423  }
424 };
425 
426 namespace detail {
427 
428 template<class SourceOperator, class Subscriber>
429 struct safe_subscriber
430 {
431  safe_subscriber(SourceOperator& so, Subscriber& o) : so(std::addressof(so)), o(std::addressof(o)) {}
432 
433  void subscribe() {
434  RXCPP_TRY {
435  so->on_subscribe(*o);
436  } RXCPP_CATCH(...) {
437  if (!o->is_subscribed()) {
439  }
441  o->unsubscribe();
442  }
443  }
444 
445  void operator()(const rxsc::schedulable&) {
446  subscribe();
447  }
448 
449  SourceOperator* so;
450  Subscriber* o;
451 };
452 
453 }
454 
455 template<>
456 class observable<void, void>;
457 
477 template<class T, class SourceOperator>
479  : public observable_base<T>
480 {
481  static_assert(std::is_same<T, typename SourceOperator::value_type>::value, "SourceOperator::value_type must be the same as T in observable<T, SourceOperator>");
482 
484 
485 public:
488 
489 private:
490 
491  template<class U, class SO>
492  friend class observable;
493 
494  template<class U, class SO>
495  friend bool operator==(const observable<U, SO>&, const observable<U, SO>&);
496 
497  template<class Subscriber>
498  auto detail_subscribe(Subscriber o) const
500 
501  typedef rxu::decay_t<Subscriber> subscriber_type;
502 
503  static_assert(is_subscriber<subscriber_type>::value, "subscribe must be passed a subscriber");
504  static_assert(std::is_same<typename source_operator_type::value_type, T>::value && std::is_convertible<T*, typename subscriber_type::value_type*>::value, "the value types in the sequence must match or be convertible");
505  static_assert(detail::has_on_subscribe_for<subscriber_type, source_operator_type>::value, "inner must have on_subscribe method that accepts this subscriber ");
506 
507  trace_activity().subscribe_enter(*this, o);
508 
509  if (!o.is_subscribed()) {
510  trace_activity().subscribe_return(*this);
511  return o.get_subscription();
512  }
513 
514  detail::safe_subscriber<source_operator_type, subscriber_type> subscriber(source_operator, o);
515 
516  // make sure to let current_thread take ownership of the thread as early as possible.
517  if (rxsc::current_thread::is_schedule_required()) {
518  const auto& sc = rxsc::make_current_thread();
519  sc.create_worker(o.get_subscription()).schedule(subscriber);
520  } else {
521  // current_thread already owns this thread.
522  subscriber.subscribe();
523  }
524 
525  trace_activity().subscribe_return(*this);
526  return o.get_subscription();
527  }
528 
529 public:
530  typedef T value_type;
531 
532  static_assert(rxo::is_operator<source_operator_type>::value || rxs::is_source<source_operator_type>::value, "observable must wrap an operator or source");
533 
535  {
536  }
537 
539  {
540  }
541 
542  explicit observable(const source_operator_type& o)
543  : source_operator(o)
544  {
545  }
547  : source_operator(std::move(o))
548  {
549  }
550 
552  template<class SO>
555  {}
557  template<class SO>
559  : source_operator(std::move(o.source_operator))
560  {}
561 
562 #if 0
563  template<class I>
564  void on_subscribe(observer<T, I> o) const {
565  source_operator.on_subscribe(o);
566  }
567 #endif
568 
571  template<class... AN>
573  return *this;
574  static_assert(sizeof...(AN) == 0, "as_dynamic() was passed too many arguments.");
575  }
576 
579  template<class... AN>
581  return blocking_observable<T, this_type>(*this);
582  static_assert(sizeof...(AN) == 0, "as_blocking() was passed too many arguments.");
583  }
584 
586 
592  template<class OperatorFactory>
593  auto op(OperatorFactory&& of) const
594  -> decltype(of(*(const this_type*)nullptr)) {
595  return of(*this);
596  static_assert(is_operator_factory_for<this_type, OperatorFactory>::value, "Function passed for op() must have the signature Result(SourceObservable)");
597  }
598 
601  template<class ResultType, class Operator>
602  auto lift(Operator&& op) const
603  -> observable<rxu::value_type_t<rxo::detail::lift_operator<ResultType, source_operator_type, Operator>>, rxo::detail::lift_operator<ResultType, source_operator_type, Operator>> {
604  return observable<rxu::value_type_t<rxo::detail::lift_operator<ResultType, source_operator_type, Operator>>, rxo::detail::lift_operator<ResultType, source_operator_type, Operator>>(
605  rxo::detail::lift_operator<ResultType, source_operator_type, Operator>(source_operator, std::forward<Operator>(op)));
606  static_assert(detail::is_lift_function_for<T, subscriber<ResultType>, Operator>::value, "Function passed for lift() must have the signature subscriber<...>(subscriber<T, ...>)");
607  }
608 
614  template<class ResultType, class Operator>
615  auto lift_if(Operator&& op) const
616  -> typename std::enable_if<detail::is_lift_function_for<T, subscriber<ResultType>, Operator>::value,
617  observable<rxu::value_type_t<rxo::detail::lift_operator<ResultType, source_operator_type, Operator>>, rxo::detail::lift_operator<ResultType, source_operator_type, Operator>>>::type {
618  return observable<rxu::value_type_t<rxo::detail::lift_operator<ResultType, source_operator_type, Operator>>, rxo::detail::lift_operator<ResultType, source_operator_type, Operator>>(
619  rxo::detail::lift_operator<ResultType, source_operator_type, Operator>(source_operator, std::forward<Operator>(op)));
620  }
626  template<class ResultType, class Operator>
627  auto lift_if(Operator&&) const
628  -> typename std::enable_if<!detail::is_lift_function_for<T, subscriber<ResultType>, Operator>::value,
629  decltype(rxs::from<ResultType>())>::type {
630  return rxs::from<ResultType>();
631  }
633 
636  template<class... ArgN>
637  auto subscribe(ArgN&&... an) const
639  return detail_subscribe(make_subscriber<T>(std::forward<ArgN>(an)...));
640  }
641 
644  template<class... AN>
645  auto all(AN&&... an) const
647  -> decltype(observable_member(all_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
649  {
650  return observable_member(all_tag{}, *this, std::forward<AN>(an)...);
651  }
652 
655  template<class... AN>
656  auto is_empty(AN&&... an) const
658  -> decltype(observable_member(is_empty_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
660  {
661  return observable_member(is_empty_tag{}, *this, std::forward<AN>(an)...);
662  }
663 
666  template<class... AN>
667  auto any(AN&&... an) const
669  -> decltype(observable_member(any_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
671  {
672  return observable_member(any_tag{}, *this, std::forward<AN>(an)...);
673  }
674 
677  template<class... AN>
678  auto exists(AN&&... an) const
680  -> decltype(observable_member(exists_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
682  {
683  return observable_member(exists_tag{}, *this, std::forward<AN>(an)...);
684  }
685 
688  template<class... AN>
689  auto contains(AN&&... an) const
691  -> decltype(observable_member(contains_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
693  {
694  return observable_member(contains_tag{}, *this, std::forward<AN>(an)...);
695  }
696 
699  template<class... AN>
700  auto filter(AN&&... an) const
702  -> decltype(observable_member(filter_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
704  {
705  return observable_member(filter_tag{}, *this, std::forward<AN>(an)...);
706  }
707 
710  template<class... AN>
711  auto switch_if_empty(AN&&... an) const
713  -> decltype(observable_member(switch_if_empty_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
715  {
716  return observable_member(switch_if_empty_tag{}, *this, std::forward<AN>(an)...);
717  }
718 
721  template<class... AN>
722  auto default_if_empty(AN&&... an) const
724  -> decltype(observable_member(default_if_empty_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
726  {
727  return observable_member(default_if_empty_tag{}, *this, std::forward<AN>(an)...);
728  }
729 
732  template<class... AN>
733  auto sequence_equal(AN... an) const
735  -> decltype(observable_member(sequence_equal_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
737  {
738  return observable_member(sequence_equal_tag{}, *this, std::forward<AN>(an)...);
739  }
740 
743  template<class... AN>
744  auto tap(AN&&... an) const
746  -> decltype(observable_member(tap_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
748  {
749  return observable_member(tap_tag{}, *this, std::forward<AN>(an)...);
750  }
751 
754  template<class... AN>
755  auto time_interval(AN&&... an) const
757  -> decltype(observable_member(time_interval_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
759  {
760  return observable_member(time_interval_tag{}, *this, std::forward<AN>(an)...);
761  }
762 
765  template<class... AN>
766  auto timeout(AN&&... an) const
768  -> decltype(observable_member(timeout_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
770  {
771  return observable_member(timeout_tag{}, *this, std::forward<AN>(an)...);
772  }
773 
776  template<class... AN>
777  auto timestamp(AN&&... an) const
779  -> decltype(observable_member(timestamp_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
781  {
782  return observable_member(timestamp_tag{}, *this, std::forward<AN>(an)...);
783  }
784 
787  template<class... AN>
788  auto finally(AN&&... an) const
790  -> decltype(observable_member(finally_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
792  {
793  return observable_member(finally_tag{}, *this, std::forward<AN>(an)...);
794  }
795 
798  template<class... AN>
799  auto on_error_resume_next(AN&&... an) const
801  -> decltype(observable_member(on_error_resume_next_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
803  {
804  return observable_member(on_error_resume_next_tag{}, *this, std::forward<AN>(an)...);
805  }
806 
809  template<class... AN>
810  auto switch_on_error(AN&&... an) const
812  -> decltype(observable_member(on_error_resume_next_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
814  {
815  return observable_member(on_error_resume_next_tag{}, *this, std::forward<AN>(an)...);
816  }
817 
820  template<class... AN>
821  auto map(AN&&... an) const
823  -> decltype(observable_member(map_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
825  {
826  return observable_member(map_tag{}, *this, std::forward<AN>(an)...);
827  }
828 
831  template<class... AN>
832  auto transform(AN&&... an) const
834  -> decltype(observable_member(map_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
836  {
837  return observable_member(map_tag{}, *this, std::forward<AN>(an)...);
838  }
839 
842  template<class... AN>
843  auto debounce(AN&&... an) const
845  -> decltype(observable_member(debounce_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
847  {
848  return observable_member(debounce_tag{}, *this, std::forward<AN>(an)...);
849  }
850 
853  template<class... AN>
854  auto delay(AN&&... an) const
856  -> decltype(observable_member(delay_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
858  {
859  return observable_member(delay_tag{}, *this, std::forward<AN>(an)...);
860  }
861 
864  template<class... AN>
865  auto distinct(AN&&... an) const
867  -> decltype(observable_member(distinct_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
869  {
870  return observable_member(distinct_tag{}, *this, std::forward<AN>(an)...);
871  }
872 
875  template<class... AN>
876  auto distinct_until_changed(AN&&... an) const
878  -> decltype(observable_member(distinct_until_changed_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
880  {
881  return observable_member(distinct_until_changed_tag{}, *this, std::forward<AN>(an)...);
882  }
883 
886  template<class... AN>
887  auto element_at(AN&&... an) const
889  -> decltype(observable_member(element_at_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
891  {
892  return observable_member(element_at_tag{}, *this, std::forward<AN>(an)...);
893  }
894 
897  template<class... AN>
898  auto window(AN&&... an) const
900  -> decltype(observable_member(window_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
902  {
903  return observable_member(window_tag{}, *this, std::forward<AN>(an)...);
904  }
905 
908  template<class... AN>
909  auto window_with_time(AN&&... an) const
911  -> decltype(observable_member(window_with_time_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
913  {
914  return observable_member(window_with_time_tag{}, *this, std::forward<AN>(an)...);
915  }
916 
919  template<class... AN>
920  auto window_with_time_or_count(AN&&... an) const
922  -> decltype(observable_member(window_with_time_or_count_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
924  {
925  return observable_member(window_with_time_or_count_tag{}, *this, std::forward<AN>(an)...);
926  }
927 
930  template<class... AN>
931  auto window_toggle(AN&&... an) const
933  -> decltype(observable_member(window_toggle_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
935  {
936  return observable_member(window_toggle_tag{}, *this, std::forward<AN>(an)...);
937  }
938 
941  template<class... AN>
942  auto buffer(AN&&... an) const
944  -> decltype(observable_member(buffer_count_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
946  {
947  return observable_member(buffer_count_tag{}, *this, std::forward<AN>(an)...);
948  }
949 
952  template<class... AN>
953  auto buffer_with_time(AN&&... an) const
955  -> decltype(observable_member(buffer_with_time_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
957  {
958  return observable_member(buffer_with_time_tag{}, *this, std::forward<AN>(an)...);
959  }
960 
963  template<class... AN>
964  auto buffer_with_time_or_count(AN&&... an) const
966  -> decltype(observable_member(buffer_with_time_or_count_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
968  {
969  return observable_member(buffer_with_time_or_count_tag{}, *this, std::forward<AN>(an)...);
970  }
971 
974  template<class... AN>
975  auto switch_on_next(AN&&... an) const
977  -> decltype(observable_member(switch_on_next_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
979  {
980  return observable_member(switch_on_next_tag{}, *this, std::forward<AN>(an)...);
981  }
982 
985  template<class... AN>
986  auto merge(AN... an) const
988  -> decltype(observable_member(merge_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
990  {
991  return observable_member(merge_tag{}, *this, std::forward<AN>(an)...);
992  }
993 
996  template<class... AN>
997  auto merge_delay_error(AN... an) const
999  -> decltype(observable_member(merge_delay_error_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1001  {
1002  return observable_member(merge_delay_error_tag{}, *this, std::forward<AN>(an)...);
1003  }
1004 
1007  template<class... AN>
1008  auto amb(AN... an) const
1010  -> decltype(observable_member(amb_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1012  {
1013  return observable_member(amb_tag{}, *this, std::forward<AN>(an)...);
1014  }
1015 
1018  template<class... AN>
1019  auto flat_map(AN&&... an) const
1021  -> decltype(observable_member(flat_map_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1023  {
1024  return observable_member(flat_map_tag{}, *this, std::forward<AN>(an)...);
1025  }
1026 
1029  template<class... AN>
1030  auto merge_transform(AN&&... an) const
1032  -> decltype(observable_member(flat_map_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1034  {
1035  return observable_member(flat_map_tag{}, *this, std::forward<AN>(an)...);
1036  }
1037 
1040  template<class... AN>
1041  auto concat(AN... an) const
1043  -> decltype(observable_member(concat_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1045  {
1046  return observable_member(concat_tag{}, *this, std::forward<AN>(an)...);
1047  }
1048 
1051  template<class... AN>
1052  auto concat_map(AN&&... an) const
1054  -> decltype(observable_member(concat_map_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1056  {
1057  return observable_member(concat_map_tag{}, *this, std::forward<AN>(an)...);
1058  }
1059 
1062  template<class... AN>
1063  auto concat_transform(AN&&... an) const
1065  -> decltype(observable_member(concat_map_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1067  {
1068  return observable_member(concat_map_tag{}, *this, std::forward<AN>(an)...);
1069  }
1070 
1073  template<class... AN>
1074  auto with_latest_from(AN... an) const
1076  -> decltype(observable_member(with_latest_from_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1078  {
1079  return observable_member(with_latest_from_tag{}, *this, std::forward<AN>(an)...);
1080  }
1081 
1082 
1085  template<class... AN>
1086  auto combine_latest(AN... an) const
1088  -> decltype(observable_member(combine_latest_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1090  {
1091  return observable_member(combine_latest_tag{}, *this, std::forward<AN>(an)...);
1092  }
1093 
1096  template<class... AN>
1097  auto zip(AN&&... an) const
1099  -> decltype(observable_member(zip_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1101  {
1102  return observable_member(zip_tag{}, *this, std::forward<AN>(an)...);
1103  }
1104 
1107  template<class... AN>
1108  inline auto group_by(AN&&... an) const
1110  -> decltype(observable_member(group_by_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1112  {
1113  return observable_member(group_by_tag{}, *this, std::forward<AN>(an)...);
1114  }
1115 
1118  template<class... AN>
1119  auto ignore_elements(AN&&... an) const
1121  -> decltype(observable_member(ignore_elements_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1123  {
1124  return observable_member(ignore_elements_tag{}, *this, std::forward<AN>(an)...);
1125  }
1126 
1129  template<class... AN>
1130  auto multicast(AN&&... an) const
1132  -> decltype(observable_member(multicast_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1134  {
1135  return observable_member(multicast_tag{}, *this, std::forward<AN>(an)...);
1136  }
1137 
1140  template<class... AN>
1141  auto publish(AN&&... an) const
1143  -> decltype(observable_member(publish_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1145  {
1146  return observable_member(publish_tag{}, *this, std::forward<AN>(an)...);
1147  }
1148 
1151  template<class... AN>
1152  auto publish_synchronized(AN&&... an) const
1154  -> decltype(observable_member(publish_synchronized_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1156  {
1157  return observable_member(publish_synchronized_tag{}, *this, std::forward<AN>(an)...);
1158  }
1159 
1162  template<class... AN>
1163  auto replay(AN&&... an) const
1165  -> decltype(observable_member(replay_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1167  {
1168  return observable_member(replay_tag{}, *this, std::forward<AN>(an)...);
1169  }
1170 
1173  template<class... AN>
1174  auto subscribe_on(AN&&... an) const
1176  -> decltype(observable_member(subscribe_on_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1178  {
1179  return observable_member(subscribe_on_tag{}, *this, std::forward<AN>(an)...);
1180  }
1181 
1184  template<class... AN>
1185  auto observe_on(AN&&... an) const
1187  -> decltype(observable_member(observe_on_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1189  {
1190  return observable_member(observe_on_tag{}, *this, std::forward<AN>(an)...);
1191  }
1192 
1195  template<class... AN>
1196  auto reduce(AN&&... an) const
1198  -> decltype(observable_member(reduce_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1200  {
1201  return observable_member(reduce_tag{}, *this, std::forward<AN>(an)...);
1202  }
1203 
1206  template<class... AN>
1207  auto accumulate(AN&&... an) const
1209  -> decltype(observable_member(reduce_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1211  {
1212  return observable_member(reduce_tag{}, *this, std::forward<AN>(an)...);
1213  }
1214 
1217  template<class... AN>
1218  auto first(AN**...) const
1222  {
1224  static_assert(sizeof...(AN) == 0, "first() was passed too many arguments.");
1225  }
1226 
1229  template<class... AN>
1230  auto last(AN**...) const
1234  {
1236  static_assert(sizeof...(AN) == 0, "last() was passed too many arguments.");
1237  }
1238 
1241  template<class... AN>
1242  auto count(AN**...) const
1246  {
1248  static_assert(sizeof...(AN) == 0, "count() was passed too many arguments.");
1249  }
1250 
1253  template<class... AN>
1254  auto sum(AN**...) const
1258  {
1260  static_assert(sizeof...(AN) == 0, "sum() was passed too many arguments.");
1261  }
1262 
1265  template<class... AN>
1266  auto average(AN**...) const
1270  {
1272  static_assert(sizeof...(AN) == 0, "average() was passed too many arguments.");
1273  }
1274 
1277  template<class... AN>
1278  auto max(AN**...) const
1282  {
1284  static_assert(sizeof...(AN) == 0, "max() was passed too many arguments.");
1285  }
1286 
1289  template<class... AN>
1290  auto min(AN**...) const
1294  {
1296  static_assert(sizeof...(AN) == 0, "min() was passed too many arguments.");
1297  }
1298 
1301  template<class... AN>
1302  auto scan(AN... an) const
1304  -> decltype(observable_member(scan_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1306  {
1307  return observable_member(scan_tag{}, *this, std::forward<AN>(an)...);
1308  }
1309 
1312  template<class... AN>
1313  auto sample_with_time(AN&&... an) const
1315  -> decltype(observable_member(sample_with_time_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1317  {
1318  return observable_member(sample_with_time_tag{}, *this, std::forward<AN>(an)...);
1319  }
1320 
1323  template<class... AN>
1324  auto skip(AN... an) const
1326  -> decltype(observable_member(skip_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1328  {
1329  return observable_member(skip_tag{}, *this, std::forward<AN>(an)...);
1330  }
1331 
1334  template<class... AN>
1335  auto skip_while(AN... an) const
1337  -> decltype(observable_member(skip_while_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1339  {
1340  return observable_member(skip_while_tag{}, *this, std::forward<AN>(an)...);
1341  }
1342 
1345  template<class... AN>
1346  auto skip_last(AN... an) const
1348  -> decltype(observable_member(skip_last_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1350  {
1351  return observable_member(skip_last_tag{}, *this, std::forward<AN>(an)...);
1352  }
1353 
1356  template<class... AN>
1357  auto skip_until(AN... an) const
1359  -> decltype(observable_member(skip_until_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1361  {
1362  return observable_member(skip_until_tag{}, *this, std::forward<AN>(an)...);
1363  }
1364 
1367  template<class... AN>
1368  auto take(AN... an) const
1370  -> decltype(observable_member(take_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1372  {
1373  return observable_member(take_tag{}, *this, std::forward<AN>(an)...);
1374  }
1375 
1378  template<class... AN>
1379  auto take_last(AN&&... an) const
1381  -> decltype(observable_member(take_last_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1383  {
1384  return observable_member(take_last_tag{}, *this, std::forward<AN>(an)...);
1385  }
1386 
1389  template<class... AN>
1390  auto take_until(AN&&... an) const
1392  -> decltype(observable_member(take_until_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1394  {
1395  return observable_member(take_until_tag{}, *this, std::forward<AN>(an)...);
1396  }
1397 
1400  template<class... AN>
1401  auto take_while(AN&&... an) const
1403  -> decltype(observable_member(take_while_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1405  {
1406  return observable_member(take_while_tag{}, *this, std::forward<AN>(an)...);
1407  }
1408 
1411  template<class... AN>
1412  auto repeat(AN... an) const
1414  -> decltype(observable_member(repeat_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1416  {
1417  return observable_member(repeat_tag{}, *this, std::forward<AN>(an)...);
1418  }
1419 
1422  template<class... AN>
1423  auto retry(AN... an) const
1425  -> decltype(observable_member(retry_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1427  {
1428  return observable_member(retry_tag{}, *(this_type*)this, std::forward<AN>(an)...);
1429  }
1430 
1433  template<class... AN>
1434  auto start_with(AN... an) const
1436  -> decltype(observable_member(start_with_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1438  {
1439  return observable_member(start_with_tag{}, *this, std::forward<AN>(an)...);
1440  }
1441 
1444  template<class... AN>
1445  auto pairwise(AN... an) const
1447  -> decltype(observable_member(pairwise_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1449  {
1450  return observable_member(pairwise_tag{}, *this, std::forward<AN>(an)...);
1451  }
1452 };
1453 
1454 template<class T, class SourceOperator>
1456  return lhs.source_operator == rhs.source_operator;
1457 }
1458 template<class T, class SourceOperator>
1460  return !(lhs == rhs);
1461 }
1462 
1545 template<>
1546 class observable<void, void>
1547 {
1548  ~observable();
1549 public:
1552  template<class T, class OnSubscribe>
1553  static auto create(OnSubscribe os)
1554  -> decltype(rxs::create<T>(std::move(os))) {
1555  return rxs::create<T>(std::move(os));
1556  }
1557 
1560  template<class T>
1561  static auto range(T first = 0, T last = std::numeric_limits<T>::max(), std::ptrdiff_t step = 1)
1562  -> decltype(rxs::range<T>(first, last, step, identity_current_thread())) {
1563  return rxs::range<T>(first, last, step, identity_current_thread());
1564  }
1567  template<class T, class Coordination>
1568  static auto range(T first, T last, std::ptrdiff_t step, Coordination cn)
1569  -> decltype(rxs::range<T>(first, last, step, std::move(cn))) {
1570  return rxs::range<T>(first, last, step, std::move(cn));
1571  }
1574  template<class T, class Coordination>
1575  static auto range(T first, T last, Coordination cn)
1576  -> decltype(rxs::range<T>(first, last, std::move(cn))) {
1577  return rxs::range<T>(first, last, std::move(cn));
1578  }
1581  template<class T, class Coordination>
1582  static auto range(T first, Coordination cn)
1583  -> decltype(rxs::range<T>(first, std::move(cn))) {
1584  return rxs::range<T>(first, std::move(cn));
1585  }
1586 
1589  template<class T>
1590  static auto never()
1591  -> decltype(rxs::never<T>()) {
1592  return rxs::never<T>();
1593  }
1594 
1597  template<class ObservableFactory>
1598  static auto defer(ObservableFactory of)
1599  -> decltype(rxs::defer(std::move(of))) {
1600  return rxs::defer(std::move(of));
1601  }
1602 
1605  template<class... AN>
1606  static auto interval(rxsc::scheduler::clock_type::duration period, AN**...)
1607  -> decltype(rxs::interval(period)) {
1608  return rxs::interval(period);
1609  static_assert(sizeof...(AN) == 0, "interval(period) was passed too many arguments.");
1610  }
1613  template<class Coordination>
1614  static auto interval(rxsc::scheduler::clock_type::duration period, Coordination cn)
1615  -> decltype(rxs::interval(period, std::move(cn))) {
1616  return rxs::interval(period, std::move(cn));
1617  }
1620  template<class... AN>
1621  static auto interval(rxsc::scheduler::clock_type::time_point initial, rxsc::scheduler::clock_type::duration period, AN**...)
1622  -> decltype(rxs::interval(initial, period)) {
1623  return rxs::interval(initial, period);
1624  static_assert(sizeof...(AN) == 0, "interval(initial, period) was passed too many arguments.");
1625  }
1628  template<class Coordination>
1629  static auto interval(rxsc::scheduler::clock_type::time_point initial, rxsc::scheduler::clock_type::duration period, Coordination cn)
1630  -> decltype(rxs::interval(initial, period, std::move(cn))) {
1631  return rxs::interval(initial, period, std::move(cn));
1632  }
1633 
1636  template<class... AN>
1637  static auto timer(rxsc::scheduler::clock_type::time_point at, AN**...)
1638  -> decltype(rxs::timer(at)) {
1639  return rxs::timer(at);
1640  static_assert(sizeof...(AN) == 0, "timer(at) was passed too many arguments.");
1641  }
1644  template<class... AN>
1645  static auto timer(rxsc::scheduler::clock_type::duration after, AN**...)
1646  -> decltype(rxs::timer(after)) {
1647  return rxs::timer(after);
1648  static_assert(sizeof...(AN) == 0, "timer(after) was passed too many arguments.");
1649  }
1652  template<class Coordination>
1653  static auto timer(rxsc::scheduler::clock_type::time_point when, Coordination cn)
1654  -> decltype(rxs::timer(when, std::move(cn))) {
1655  return rxs::timer(when, std::move(cn));
1656  }
1659  template<class Coordination>
1660  static auto timer(rxsc::scheduler::clock_type::duration when, Coordination cn)
1661  -> decltype(rxs::timer(when, std::move(cn))) {
1662  return rxs::timer(when, std::move(cn));
1663  }
1664 
1667  template<class Collection>
1668  static auto iterate(Collection c)
1669  -> decltype(rxs::iterate(std::move(c), identity_current_thread())) {
1670  return rxs::iterate(std::move(c), identity_current_thread());
1671  }
1674  template<class Collection, class Coordination>
1675  static auto iterate(Collection c, Coordination cn)
1676  -> decltype(rxs::iterate(std::move(c), std::move(cn))) {
1677  return rxs::iterate(std::move(c), std::move(cn));
1678  }
1679 
1682  template<class T>
1683  static auto from()
1684  -> decltype( rxs::from<T>()) {
1685  return rxs::from<T>();
1686  }
1689  template<class T, class Coordination>
1690  static auto from(Coordination cn)
1691  -> typename std::enable_if<is_coordination<Coordination>::value,
1692  decltype( rxs::from<T>(std::move(cn)))>::type {
1693  return rxs::from<T>(std::move(cn));
1694  }
1697  template<class Value0, class... ValueN>
1698  static auto from(Value0 v0, ValueN... vn)
1699  -> typename std::enable_if<!is_coordination<Value0>::value,
1700  decltype( rxs::from(v0, vn...))>::type {
1701  return rxs::from(v0, vn...);
1702  }
1705  template<class Coordination, class Value0, class... ValueN>
1706  static auto from(Coordination cn, Value0 v0, ValueN... vn)
1707  -> typename std::enable_if<is_coordination<Coordination>::value,
1708  decltype( rxs::from(std::move(cn), v0, vn...))>::type {
1709  return rxs::from(std::move(cn), v0, vn...);
1710  }
1711 
1714  template<class T>
1715  static auto just(T v)
1716  -> decltype(rxs::just(std::move(v))) {
1717  return rxs::just(std::move(v));
1718  }
1721  template<class T, class Coordination>
1722  static auto just(T v, Coordination cn)
1723  -> decltype(rxs::just(std::move(v), std::move(cn))) {
1724  return rxs::just(std::move(v), std::move(cn));
1725  }
1726 
1729  template<class Observable, class Value0, class... ValueN>
1730  static auto start_with(Observable o, Value0 v0, ValueN... vn)
1731  -> decltype(rxs::start_with(std::move(o), std::move(v0), std::move(vn)...)) {
1732  return rxs::start_with(std::move(o), std::move(v0), std::move(vn)...);
1733  }
1734 
1737  template<class T>
1738  static auto empty()
1739  -> decltype(from<T>()) {
1740  return from<T>();
1741  }
1744  template<class T, class Coordination>
1745  static auto empty(Coordination cn)
1746  -> decltype(from<T>(std::move(cn))) {
1747  return from<T>(std::move(cn));
1748  }
1749 
1752  template<class T, class Exception>
1753  static auto error(Exception&& e)
1754  -> decltype(rxs::error<T>(std::forward<Exception>(e))) {
1755  return rxs::error<T>(std::forward<Exception>(e));
1756  }
1759  template<class T, class Exception, class Coordination>
1760  static auto error(Exception&& e, Coordination cn)
1761  -> decltype(rxs::error<T>(std::forward<Exception>(e), std::move(cn))) {
1762  return rxs::error<T>(std::forward<Exception>(e), std::move(cn));
1763  }
1764 
1767  template<class ResourceFactory, class ObservableFactory>
1768  static auto scope(ResourceFactory rf, ObservableFactory of)
1769  -> decltype(rxs::scope(std::move(rf), std::move(of))) {
1770  return rxs::scope(std::move(rf), std::move(of));
1771  }
1772 };
1773 
1774 }
1775 
1776 //
1777 // support range() >> filter() >> subscribe() syntax
1778 // '>>' is spelled 'stream'
1779 //
1780 template<class T, class SourceOperator, class OperatorFactory>
1781 auto operator >> (const rxcpp::observable<T, SourceOperator>& source, OperatorFactory&& of)
1782  -> decltype(source.op(std::forward<OperatorFactory>(of))) {
1783  return source.op(std::forward<OperatorFactory>(of));
1784 }
1785 
1786 //
1787 // support range() | filter() | subscribe() syntax
1788 // '|' is spelled 'pipe'
1789 //
1790 template<class T, class SourceOperator, class OperatorFactory>
1791 auto operator | (const rxcpp::observable<T, SourceOperator>& source, OperatorFactory&& of)
1792  -> decltype(source.op(std::forward<OperatorFactory>(of))) {
1793  return source.op(std::forward<OperatorFactory>(of));
1794 }
1795 
1796 #endif
auto distinct_until_changed(AN &&... an) const
For each item from this observable, filter out consequentially repeated values and emit only changes ...
Definition: rx-observable.hpp:876
Definition: rx-operators.hpp:126
#define RXCPP_TRY
Definition: rx-util.hpp:38
auto delay(AN &&... an) const
Return an observable that emits each item emitted by the source observable after the specified delay...
Definition: rx-observable.hpp:854
auto skip_last(AN... an) const
Make new observable with skipped last count items from this observable.
Definition: rx-observable.hpp:1346
Definition: rx-operators.hpp:366
auto with_latest_from(AN... an) const
For each item from the first observable select the latest value from all the observables to emit from...
Definition: rx-observable.hpp:1074
auto time_interval(AN &&... an) const
Returns an observable that emits indications of the amount of time lapsed between consecutive emissio...
Definition: rx-observable.hpp:755
auto error(E e) -> decltype(detail::make_error< T >(typename std::conditional< std::is_same< rxu::error_ptr, rxu::decay_t< E >>::value, detail::throw_ptr_tag, detail::throw_instance_tag >::type(), std::move(e), identity_immediate()))
Returns an observable that sends no items to observer and immediately generates an error...
Definition: rx-error.hpp:117
auto concat_transform(AN &&... an) const
For each item from this observable use the CollectionSelector to produce an observable and subscribe ...
Definition: rx-observable.hpp:1063
Definition: rx-operators.hpp:248
void on_subscribe(subscriber< T > o) const
Definition: rx-observable.hpp:81
auto count(AN **...) const
For each item from this observable reduce it by incrementing a count.
Definition: rx-observable.hpp:1242
Definition: rx-operators.hpp:380
auto min(AN **...) const
For each item from this observable reduce it by taking the min value of the previous items...
Definition: rx-observable.hpp:1290
Definition: rx-operators.hpp:143
auto defer(ObservableFactory of) -> observable< rxu::value_type_t< detail::defer_traits< ObservableFactory >>, detail::defer< ObservableFactory >>
Returns an observable that calls the specified observable factory to create an observable for each ne...
Definition: rx-defer.hpp:73
auto merge_transform(AN &&... an) const
For each item from this observable use the CollectionSelector to produce an observable and subscribe ...
Definition: rx-observable.hpp:1030
auto retry(AN... an) const
Retry this observable for the given number of times.
Definition: rx-observable.hpp:1423
Definition: rx-observable.hpp:157
auto skip_until(AN... an) const
Make new observable with items skipped until on_next occurs on the trigger observable or until the sp...
Definition: rx-observable.hpp:1357
Definition: rx-operators.hpp:387
Definition: rx-operators.hpp:458
std::shared_ptr< util::detail::error_base > error_ptr
Definition: rx-util.hpp:874
auto operator|(const rxcpp::observable< T, SourceOperator > &source, OperatorFactory &&of) -> decltype(source.op(std::forward< OperatorFactory >(of)))
Definition: rx-observable.hpp:1791
static auto create(OnSubscribe os) -> decltype(rxs::create< T >(std::move(os)))
Returns an observable that executes the specified function when a subscriber subscribes to it...
Definition: rx-observable.hpp:1553
Definition: rx-all.hpp:26
auto skip(AN... an) const
Make new observable with skipped first count items from this observable.
Definition: rx-observable.hpp:1324
Definition: rx-predef.hpp:302
static auto range(T first, T last, Coordination cn) -> decltype(rxs::range< T >(first, last, std::move(cn)))
Returns an observable that sends values in the range first-last by adding step to the previous value...
Definition: rx-observable.hpp:1575
auto element_at(AN &&... an) const
Pulls an item located at a specified index location in the sequence of items and emits that item as i...
Definition: rx-observable.hpp:887
observable(const observable< T, SO > &o)
implicit conversion between observables of the same value_type
Definition: rx-observable.hpp:553
Definition: rx-operators.hpp:329
auto any(AN &&... an) const
Returns an Observable that emits true if any item emitted by the source Observable satisfies a specif...
Definition: rx-observable.hpp:667
error_ptr make_error_ptr(error_ptr e)
Definition: rx-util.hpp:883
auto reduce(AN &&... an) const
For each item from this observable use Accumulator to combine items, when completed use ResultSelecto...
Definition: rx-observable.hpp:1196
source_operator_type source_operator
Definition: rx-observable.hpp:487
static auto from(Coordination cn) -> typename std::enable_if< is_coordination< Coordination >::value, decltype(rxs::from< T >(std::move(cn)))>::type
Definition: rx-observable.hpp:1690
RXCPP_NORETURN void rethrow_current_exception()
Definition: rx-util.hpp:933
dynamic_observable(SOF &&sof, typename std::enable_if<!is_dynamic_observable< SOF >::value, void **>::type=0)
Definition: rx-observable.hpp:74
auto timeout(AN &&... an) const
Return an observable that terminates with timeout_error if a particular timespan has passed without e...
Definition: rx-observable.hpp:766
T max() const
Definition: rx-observable.hpp:400
auto take_while(AN &&... an) const
For the first items fulfilling the predicate from this observable emit them from the new observable t...
Definition: rx-observable.hpp:1401
auto interval(Duration period) -> typename std::enable_if< detail::defer_interval< Duration, identity_one_worker >::value, typename detail::defer_interval< Duration, identity_one_worker >::observable_type >::type
Returns an observable that emits a sequential integer every specified time interval, on the specified scheduler.
Definition: rx-interval.hpp:113
dynamic_observable()
Definition: rx-observable.hpp:69
Definition: rx-operators.hpp:444
controls lifetime for scheduler::schedule and observable<T, SourceOperator>::subscribe.
Definition: rx-subscription.hpp:459
Definition: rx-operators.hpp:157
auto average(AN **...) const
For each item from this observable reduce it by adding to the previous values and then dividing by th...
Definition: rx-observable.hpp:1266
blocking_observable< T, this_type > as_blocking(AN **...) const
Definition: rx-observable.hpp:580
Definition: rx-operators.hpp:282
auto operator>>(const rxcpp::observable< T, SourceOperator > &source, OperatorFactory &&of) -> decltype(source.op(std::forward< OperatorFactory >(of)))
Definition: rx-observable.hpp:1781
auto observable_member(Tag, AN &&... an) -> decltype(Overload::member(std::forward< AN >(an)...))
Definition: rx-operators.hpp:63
rxu::decay_t< Observable > observable_type
Definition: rx-observable.hpp:215
auto window(AN &&... an) const
Return an observable that emits connected, non-overlapping windows, each containing at most count ite...
Definition: rx-observable.hpp:898
auto publish_synchronized(AN &&... an) const
Turn a cold observable hot and allow connections to the source to be independent of subscriptions...
Definition: rx-observable.hpp:1152
auto subscribe(ArgN &&... an) const -> composite_subscription
Subscribe will cause the source observable to emit values to the provided subscriber.
Definition: rx-observable.hpp:637
Definition: rx-operators.hpp:289
Definition: rx-operators.hpp:296
Definition: rx-operators.hpp:352
auto just(Value0 v0) -> typename std::enable_if<!is_coordination< Value0 >::value, decltype(iterate(*(std::array< Value0, 1 > *) nullptr, identity_immediate()))>::type
Definition: rx-iterate.hpp:267
auto combine_latest(AN... an) const
For each item from all of the observables select a value to emit from the new observable that is retu...
Definition: rx-observable.hpp:1086
static auto iterate(Collection c, Coordination cn) -> decltype(rxs::iterate(std::move(c), std::move(cn)))
Returns an observable that sends each value in the collection, on the specified scheduler.
Definition: rx-observable.hpp:1675
Definition: rx-operators.hpp:421
observable(const source_operator_type &o)
Definition: rx-observable.hpp:542
Definition: rx-operators.hpp:500
Definition: rx-operators.hpp:394
Definition: rx-operators.hpp:472
Definition: rx-operators.hpp:331
Definition: rx-operators.hpp:234
auto AN
Definition: rx-finally.hpp:105
auto max() -> operator_factory< max_tag >
For each item from this observable reduce it by taking the max value of the previous items...
Definition: rx-reduce.hpp:496
auto iterate(Collection c) -> observable< rxu::value_type_t< detail::iterate_traits< Collection >>, detail::iterate< Collection, identity_one_worker >>
Returns an observable that sends each value in the collection, on the specified scheduler.
Definition: rx-iterate.hpp:160
Definition: rx-operators.hpp:255
static auto iterate(Collection c) -> decltype(rxs::iterate(std::move(c), identity_current_thread()))
Returns an observable that sends each value in the collection, on the specified scheduler.
Definition: rx-observable.hpp:1668
Definition: rx-operators.hpp:199
Definition: rx-predef.hpp:156
T value_type
Definition: rx-observable.hpp:530
auto tap(AN &&... an) const
inspect calls to on_next, on_error and on_completed.
Definition: rx-observable.hpp:744
auto subscribe(ArgN &&... an) const -> void
Definition: rx-observable.hpp:238
auto zip(AN &&... an) const
Bring by one item from all given observables and select a value to emit from the new observable that ...
Definition: rx-observable.hpp:1097
Definition: rx-operators.hpp:521
auto scan(AN... an) const
For each item from this observable use Accumulator to combine items into a value that will be emitted...
Definition: rx-observable.hpp:1302
auto subscribe_with_rethrow(ArgN &&... an) const -> void
Definition: rx-observable.hpp:263
#define RXCPP_CATCH(...)
Definition: rx-util.hpp:39
Definition: rx-operators.hpp:345
auto start_with(AN... an) const
Start with the supplied values, then concatenate this observable.
Definition: rx-observable.hpp:1434
rxu::decay_t< SourceOperator > source_operator_type
Definition: rx-observable.hpp:486
typename std::decay< T >::type decay_t
Definition: rx-util.hpp:48
static auto interval(rxsc::scheduler::clock_type::duration period, AN **...) -> decltype(rxs::interval(period))
Returns an observable that emits a sequential integer every specified time interval, on the specified scheduler.
Definition: rx-observable.hpp:1606
auto exists(AN &&... an) const
Returns an Observable that emits true if any item emitted by the source Observable satisfies a specif...
Definition: rx-observable.hpp:678
auto first(AN **...) const
For each item from this observable reduce it by sending only the first item.
Definition: rx-observable.hpp:1218
RXCPP_NORETURN void throw_exception(E &&e)
Definition: rx-util.hpp:920
auto max(AN **...) const
For each item from this observable reduce it by taking the max value of the previous items...
Definition: rx-observable.hpp:1278
a source of values whose methods block until all values have been emitted. subscribe or use one of th...
Definition: rx-observable.hpp:169
auto switch_if_empty(AN &&... an) const
If the source Observable terminates without emitting any items, emits items from a backup Observable...
Definition: rx-observable.hpp:711
auto filter(AN &&... an) const
For each item from this observable use Predicate to select which items to emit from the new observabl...
Definition: rx-observable.hpp:700
~blocking_observable()
Definition: rx-observable.hpp:217
static auto just(T v) -> decltype(rxs::just(std::move(v)))
Definition: rx-observable.hpp:1715
static auto from(Value0 v0, ValueN... vn) -> typename std::enable_if<!is_coordination< Value0 >::value, decltype(rxs::from(v0, vn...))>::type
Definition: rx-observable.hpp:1698
Definition: rx-operators.hpp:373
static auto defer(ObservableFactory of) -> decltype(rxs::defer(std::move(of)))
Returns an observable that calls the specified observable factory to create an observable for each ne...
Definition: rx-observable.hpp:1598
auto contains(AN &&... an) const
Returns an Observable that emits true if the source Observable emitted a specified item...
Definition: rx-observable.hpp:689
Definition: rx-operators.hpp:316
static auto start_with(Observable o, Value0 v0, ValueN... vn) -> decltype(rxs::start_with(std::move(o), std::move(v0), std::move(vn)...))
Definition: rx-observable.hpp:1730
Definition: rx-sources.hpp:15
Definition: rx-operators.hpp:451
Definition: rx-operators.hpp:227
linq_driver< iter_cursor< typename util::container_traits< TContainer >::iterator > > from(TContainer &c)
Definition: linq.hpp:556
Definition: rx-util.hpp:416
auto debounce(AN &&... an) const
Return an observable that emits an item if a particular timespan has passed without emitting another ...
Definition: rx-observable.hpp:843
auto multicast(AN &&... an) const
Definition: rx-observable.hpp:1130
Definition: rx-observable.hpp:36
auto group_by(AN &&... an) const
Return an observable that emits grouped_observables, each of which corresponds to a unique key value ...
Definition: rx-observable.hpp:1108
static auto timer(rxsc::scheduler::clock_type::time_point at, AN **...) -> decltype(rxs::timer(at))
Returns an observable that emits an integer at the specified time point.
Definition: rx-observable.hpp:1637
auto take_last(AN &&... an) const
Emit only the final t items emitted by the source Observable.
Definition: rx-observable.hpp:1379
observable< T > make_observable_dynamic(Source &&s)
Definition: rx-observable.hpp:102
auto sum(AN **...) const
For each item from this observable reduce it by adding to the previous items.
Definition: rx-observable.hpp:1254
static auto timer(rxsc::scheduler::clock_type::time_point when, Coordination cn) -> decltype(rxs::timer(when, std::move(cn)))
Returns an observable that emits an integer at the specified time point.
Definition: rx-observable.hpp:1653
~observable()
Definition: rx-observable.hpp:534
auto map(AN &&... an) const
For each item from this observable use Selector to produce an item to emit from the new observable th...
Definition: rx-observable.hpp:821
auto sample_with_time(AN &&... an) const
Return an Observable that emits the most recent items emitted by the source Observable within periodi...
Definition: rx-observable.hpp:1313
Definition: rx-operators.hpp:241
Definition: rx-operators.hpp:117
Definition: rx-operators.hpp:213
a source of values. subscribe or use one of the operator methods that return a new observable...
Definition: rx-observable.hpp:478
Definition: rx-operators.hpp:430
auto pairwise(AN... an) const
Take values pairwise from this observable.
Definition: rx-observable.hpp:1445
auto replay(AN &&... an) const
1) replay(optional Coordination, optional CompositeSubscription) Turn a cold observable hot...
Definition: rx-observable.hpp:1163
Definition: rx-sources.hpp:23
Definition: rx-operators.hpp:38
Definition: rx-operators.hpp:268
auto sequence_equal(AN... an) const
Determine whether two Observables emit the same sequence of items.
Definition: rx-observable.hpp:733
auto last(AN **...) const
For each item from this observable reduce it by sending only the last item.
Definition: rx-observable.hpp:1230
auto start_with(AN &&... an) -> operator_factory< start_with_tag, AN... >
Start with the supplied values, then concatenate this observable.
Definition: rx-start_with.hpp:53
auto trace_activity() -> decltype(rxcpp_trace_activity(trace_tag()))&
Definition: rx-predef.hpp:15
bool operator!=(const dynamic_grouped_observable< K, T > &lhs, const dynamic_grouped_observable< K, T > &rhs)
Definition: rx-grouped_observable.hpp:103
Definition: rx-operators.hpp:129
Definition: rx-operators.hpp:437
auto merge_delay_error(AN... an) const
For each given observable subscribe. For each item emitted from all of the given observables, deliver from the new observable that is returned. The first error to occure is hold off until all of the given non-error-emitting observables have finished their emission.
Definition: rx-observable.hpp:997
Definition: rx-operators.hpp:359
std::enable_if< is_subscriber< Subscriber >::value, void >::type on_subscribe(Subscriber o) const
Definition: rx-observable.hpp:87
static auto from() -> decltype(rxs::from< T >())
Definition: rx-observable.hpp:1683
Definition: rx-operators.hpp:479
Definition: rx-operators.hpp:136
auto repeat(AN... an) const
Repeat this observable for the given number of times or infinitely.
Definition: rx-observable.hpp:1412
auto take(AN... an) const
For the first count items from this observable emit them from the new observable that is returned...
Definition: rx-observable.hpp:1368
Definition: rx-operators.hpp:206
Definition: rx-operators.hpp:192
auto subscribe_on(AN &&... an) const
Subscription and unsubscription are queued and delivered using the scheduler from the supplied coordi...
Definition: rx-observable.hpp:1174
tag_dynamic_observable dynamic_observable_tag
Definition: rx-observable.hpp:67
Definition: rx-operators.hpp:103
auto distinct(AN &&... an) const
For each item from this observable, filter out repeated values and emit only items that have not alre...
Definition: rx-observable.hpp:865
auto skip_while(AN... an) const
Make new observable with skipped first count items from this observable.
Definition: rx-observable.hpp:1335
static auto error(Exception &&e, Coordination cn) -> decltype(rxs::error< T >(std::forward< Exception >(e), std::move(cn)))
Returns an observable that sends no items to observer and immediately generates an error...
Definition: rx-observable.hpp:1760
static auto range(T first=0, T last=std::numeric_limits< T >::max(), std::ptrdiff_t step=1) -> decltype(rxs::range< T >(first, last, step, identity_current_thread()))
Returns an observable that sends values in the range first-last by adding step to the previous value...
Definition: rx-observable.hpp:1561
static auto empty() -> decltype(from< T >())
Returns an observable that sends no items to observer and immediately completes, on the specified sch...
Definition: rx-observable.hpp:1738
Definition: rx-operators.hpp:486
Definition: rx-operators.hpp:110
Definition: rx-operators.hpp:57
auto merge(AN... an) const
For each given observable subscribe. For each item emitted from all of the given observables, deliver from the new observable that is returned.
Definition: rx-observable.hpp:986
error_ptr current_exception()
Definition: rx-util.hpp:943
bool operator==(const dynamic_grouped_observable< K, T > &lhs, const dynamic_grouped_observable< K, T > &rhs)
Definition: rx-grouped_observable.hpp:99
Definition: rx-operators.hpp:220
Definition: rx-predef.hpp:270
auto buffer(AN &&... an) const
Return an observable that emits connected, non-overlapping buffer, each containing at most count item...
Definition: rx-observable.hpp:942
Definition: rx-operators.hpp:185
auto lift(Operator &&op) -> detail::lift_factory< ResultType, Operator >
Definition: rx-lift.hpp:101
static auto range(T first, Coordination cn) -> decltype(rxs::range< T >(first, std::move(cn)))
Returns an observable that sends values in the range first-last by adding step to the previous value...
Definition: rx-observable.hpp:1582
static auto interval(rxsc::scheduler::clock_type::time_point initial, rxsc::scheduler::clock_type::duration period, AN **...) -> decltype(rxs::interval(initial, period))
Returns an observable that emits a sequential integer every specified time interval, on the specified scheduler.
Definition: rx-observable.hpp:1621
static auto error(Exception &&e) -> decltype(rxs::error< T >(std::forward< Exception >(e)))
Returns an observable that sends no items to observer and immediately generates an error...
Definition: rx-observable.hpp:1753
static auto interval(rxsc::scheduler::clock_type::time_point initial, rxsc::scheduler::clock_type::duration period, Coordination cn) -> decltype(rxs::interval(initial, period, std::move(cn)))
Returns an observable that emits a sequential integer every specified time interval, on the specified scheduler.
Definition: rx-observable.hpp:1629
observable_type source
Definition: rx-observable.hpp:216
auto buffer_with_time_or_count(AN &&... an) const
Return an observable that emits connected, non-overlapping buffers of items from the source observabl...
Definition: rx-observable.hpp:964
Definition: rx-operators.hpp:465
Definition: rx-operators.hpp:261
Definition: rx-operators.hpp:275
const scheduler & make_current_thread()
Definition: rx-currentthread.hpp:263
auto switch_on_error(AN &&... an) const
If an error occurs, take the result from the Selector and subscribe to that instead.
Definition: rx-observable.hpp:810
Definition: rx-operators.hpp:493
auto switch_on_next(AN &&... an) const
Return observable that emits the items emitted by the observable most recently emitted by the source ...
Definition: rx-observable.hpp:975
consumes values from an observable using State that may implement on_next, on_error and on_completed ...
Definition: rx-observer.hpp:179
auto flat_map(AN &&... an) const
For each item from this observable use the CollectionSelector to produce an observable and subscribe ...
Definition: rx-observable.hpp:1019
observable< T > as_dynamic(AN **...) const
Definition: rx-observable.hpp:572
auto window_with_time(AN &&... an) const
Return an observable that emits observables every period time interval and collects items from this o...
Definition: rx-observable.hpp:909
auto all(AN &&... an) const
Returns an Observable that emits true if every item emitted by the source Observable satisfies a spec...
Definition: rx-observable.hpp:645
auto amb(AN... an) const
For each item from only the first of the given observables deliver from the new observable that is re...
Definition: rx-observable.hpp:1008
Definition: rx-operators.hpp:150
auto window_with_time_or_count(AN &&... an) const
Return an observable that emits connected, non-overlapping windows of items from the source observabl...
Definition: rx-observable.hpp:920
RXCPP_NORETURN void rethrow_exception(error_ptr e)
Definition: rx-util.hpp:902
static auto timer(rxsc::scheduler::clock_type::duration when, Coordination cn) -> decltype(rxs::timer(when, std::move(cn)))
Returns an observable that emits an integer at the specified time point.
Definition: rx-observable.hpp:1660
auto take_until(AN &&... an) const
For each item from this observable until on_next occurs on the trigger observable or until the specif...
Definition: rx-observable.hpp:1390
auto transform(AN &&... an) const
For each item from this observable use Selector to produce an item to emit from the new observable th...
Definition: rx-observable.hpp:832
Definition: rx-operators.hpp:338
int count() const
Definition: rx-observable.hpp:334
auto scope(ResourceFactory rf, ObservableFactory of) -> observable< rxu::value_type_t< detail::scope_traits< ResourceFactory, ObservableFactory >>, detail::scope< ResourceFactory, ObservableFactory >>
Returns an observable that makes an observable by the specified observable factory using the resource...
Definition: rx-scope.hpp:114
void unsubscribe() const
Definition: rx-subscription.hpp:178
auto ignore_elements(AN &&... an) const
Do not emit any items from the source Observable, but allow termination notification (either onError ...
Definition: rx-observable.hpp:1119
static auto from(Coordination cn, Value0 v0, ValueN... vn) -> typename std::enable_if< is_coordination< Coordination >::value, decltype(rxs::from(std::move(cn), v0, vn...))>::type
Definition: rx-observable.hpp:1706
blocking_observable(observable_type s)
Definition: rx-observable.hpp:220
T sum() const
Definition: rx-observable.hpp:358
Definition: rx-operators.hpp:164
auto timestamp(AN &&... an) const
Returns an observable that attaches a timestamp to each item emitted by the source observable indicat...
Definition: rx-observable.hpp:777
Definition: rx-operators.hpp:323
Definition: rx-operators.hpp:507
auto is_empty(AN &&... an) const
Returns an Observable that emits true if the source Observable is empty, otherwise false...
Definition: rx-observable.hpp:656
T min() const
Definition: rx-observable.hpp:421
auto buffer_with_time(AN &&... an) const
Return an observable that emits buffers every period time interval and collects items from this obser...
Definition: rx-observable.hpp:953
auto subscribe(ArgN &&... an) -> detail::subscribe_factory< decltype(make_subscriber< T >(std::forward< ArgN >(an)...))>
Subscribe will cause the source observable to emit values to the provided subscriber.
Definition: rx-subscribe.hpp:87
Definition: rx-predef.hpp:128
static auto scope(ResourceFactory rf, ObservableFactory of) -> decltype(rxs::scope(std::move(rf), std::move(of)))
Returns an observable that makes an observable by the specified observable factory using the resource...
Definition: rx-observable.hpp:1768
Definition: rx-operators.hpp:423
auto default_if_empty(AN &&... an) const
If the source Observable terminates without emitting any items, emits a default item and completes...
Definition: rx-observable.hpp:722
binds an observer that consumes values with a composite_subscription that controls lifetime...
Definition: rx-subscriber.hpp:25
Definition: rx-operators.hpp:119
static auto interval(rxsc::scheduler::clock_type::duration period, Coordination cn) -> decltype(rxs::interval(period, std::move(cn)))
Returns an observable that emits a sequential integer every specified time interval, on the specified scheduler.
Definition: rx-observable.hpp:1614
static auto range(T first, T last, std::ptrdiff_t step, Coordination cn) -> decltype(rxs::range< T >(first, last, step, std::move(cn)))
Returns an observable that sends values in the range first-last by adding step to the previous value...
Definition: rx-observable.hpp:1568
double average() const
Definition: rx-observable.hpp:379
auto first(AN **...) -> delayed_type_t< T, AN... > const
Definition: rx-observable.hpp:284
Definition: rx-operators.hpp:178
static auto just(T v, Coordination cn) -> decltype(rxs::just(std::move(v), std::move(cn)))
Definition: rx-observable.hpp:1722
rxu::value_type_t< delayed_type< T, AN... > > delayed_type_t
Definition: rx-operators.hpp:60
auto observe_on(AN &&... an) const
All values are queued and delivered using the scheduler from the supplied coordination.
Definition: rx-observable.hpp:1185
auto accumulate(AN &&... an) const
For each item from this observable use Accumulator to combine items, when completed use ResultSelecto...
Definition: rx-observable.hpp:1207
observable(observable< T, SO > &&o)
implicit conversion between observables of the same value_type
Definition: rx-observable.hpp:558
Definition: rx-predef.hpp:115
friend bool operator==(const observable< U, SO > &, const observable< U, SO > &)
Definition: rx-sources.hpp:17
auto window_toggle(AN &&... an) const
Return an observable that emits observables every period time interval and collects items from this o...
Definition: rx-observable.hpp:931
auto on_error_resume_next(AN &&... an) const
If an error occurs, take the result from the Selector and subscribe to that instead.
Definition: rx-observable.hpp:799
observable()
Definition: rx-observable.hpp:538
auto last(AN **...) -> delayed_type_t< T, AN... > const
Definition: rx-observable.hpp:312
static auto never() -> decltype(rxs::never< T >())
Returns an observable that never sends any items or notifications to observer.
Definition: rx-observable.hpp:1590
auto concat(AN... an) const
For each item from this observable subscribe to one at a time, in the order received. For each item from all of the given observables deliver from the new observable that is returned.
Definition: rx-observable.hpp:1041
Definition: rx-predef.hpp:126
auto concat_map(AN &&... an) const
For each item from this observable use the CollectionSelector to produce an observable and subscribe ...
Definition: rx-observable.hpp:1052
Definition: rx-operators.hpp:127
identity_one_worker identity_current_thread()
Definition: rx-coordination.hpp:175
friend bool operator==(const dynamic_observable< U > &, const dynamic_observable< U > &)
auto publish(AN &&... an) const
Turn a cold observable hot and allow connections to the source to be independent of subscriptions...
Definition: rx-observable.hpp:1141
static auto empty(Coordination cn) -> decltype(from< T >(std::move(cn)))
Returns an observable that sends no items to observer and immediately completes, on the specified sch...
Definition: rx-observable.hpp:1745
Definition: rx-operators.hpp:415
Definition: rx-operators.hpp:408
auto timer(TimePointOrDuration when) -> typename std::enable_if< detail::defer_timer< TimePointOrDuration, identity_one_worker >::value, typename detail::defer_timer< TimePointOrDuration, identity_one_worker >::observable_type >::type
Returns an observable that emits an integer at the specified time point.
Definition: rx-timer.hpp:114
observable(source_operator_type &&o)
Definition: rx-observable.hpp:546
Definition: rx-operators.hpp:401
static auto timer(rxsc::scheduler::clock_type::duration after, AN **...) -> decltype(rxs::timer(after))
Returns an observable that emits an integer at the specified time point.
Definition: rx-observable.hpp:1645
Definition: rx-operators.hpp:514