5 #if !defined(RXCPP_RX_OBSERVABLE_HPP) 6 #define RXCPP_RX_OBSERVABLE_HPP 11 #define EXPLICIT_THIS this-> 20 template<
class Subscriber,
class T>
21 struct has_on_subscribe_for
24 template<
class CS,
class CT>
25 static auto check(
int) -> decltype((*(CT*)
nullptr).on_subscribe(*(CS*)
nullptr));
26 template<
class CS,
class CT>
27 static not_void check(...);
29 typedef decltype(check<rxu::decay_t<Subscriber>, T>(0)) detail_result;
30 static const bool value = std::is_same<detail_result, void>::value;
40 :
public std::enable_shared_from_this<state_type>
42 typedef std::function<void(subscriber<T>)> onsubscribe_type;
46 std::shared_ptr<state_type> state;
55 so.on_subscribe(std::move(o));
59 struct tag_function {};
61 void construct(F&& f, tag_function&&) {
62 state->on_subscribe = std::forward<F>(f);
75 : state(std::make_shared<state_type>())
77 construct(std::forward<SOF>(sof),
82 state->on_subscribe(std::move(o));
85 template<
class Subscriber>
86 typename std::enable_if<is_subscriber<Subscriber>::value,
void>::type
88 state->on_subscribe(o.as_dynamic());
94 return lhs.state == rhs.state;
101 template<
class T,
class Source>
107 template<
bool Selector,
class Default,
class SO>
108 struct resolve_observable;
110 template<
class Default,
class SO>
111 struct resolve_observable<true, Default, SO>
113 typedef typename SO::type type;
114 typedef typename type::value_type value_type;
115 static const bool value =
true;
116 typedef observable<value_type, type> observable_type;
117 template<
class...
AN>
118 static observable_type make(
const Default&,
AN&&... an) {
119 return observable_type(type(std::forward<AN>(an)...));
122 template<
class Default,
class SO>
123 struct resolve_observable<false, Default, SO>
125 static const bool value =
false;
126 typedef Default observable_type;
127 template<
class...
AN>
128 static observable_type make(
const observable_type& that,
const AN&...) {
133 struct resolve_observable<true, void, SO>
135 typedef typename SO::type type;
136 typedef typename type::value_type value_type;
137 static const bool value =
true;
138 typedef observable<value_type, type> observable_type;
139 template<
class...
AN>
140 static observable_type make(
AN&&... an) {
141 return observable_type(type(std::forward<AN>(an)...));
145 struct resolve_observable<false, void, SO>
147 static const bool value =
false;
148 typedef void observable_type;
149 template<
class...
AN>
150 static observable_type make(
const AN&...) {
156 template<
class Selector,
class Default,
template<
class... TN>
class SO, class...
AN>
158 :
public detail::resolve_observable<Selector::value, Default, rxu::defer_type<SO, AN...>>
168 template<
class T,
class Observable>
171 template<
class Obsvbl,
class... ArgN>
172 static auto blocking_subscribe(
const Obsvbl&
source,
bool do_rethrow, ArgN&&... an)
175 std::condition_variable wake;
176 bool disposed =
false;
179 auto dest = make_subscriber<T>(std::forward<ArgN>(an)...);
182 auto scbr = make_subscriber<T>(
184 [&](T t){dest.on_next(t);},
192 [&](){dest.on_completed();}
195 auto cs = scbr.get_subscription();
198 std::unique_lock<std::mutex> guard(lock);
203 source.subscribe(std::move(scbr));
205 std::unique_lock<std::mutex> guard(lock);
237 template<
class... ArgN>
240 return blocking_subscribe(
source,
false, std::forward<ArgN>(an)...);
262 template<
class... ArgN>
265 return blocking_subscribe(
source,
true, std::forward<ArgN>(an)...);
283 template<
class...
AN>
285 rxu::maybe<T> result;
293 static_assert(
sizeof...(
AN) == 0,
"first() was passed too many arguments.");
311 template<
class...
AN>
313 rxu::maybe<T> result;
315 [&](T v){result.reset(v);});
319 static_assert(
sizeof...(
AN) == 0,
"last() was passed too many arguments.");
336 source.count().as_blocking().subscribe_with_rethrow(
337 [&](
int v){result = v;});
359 return source.sum().as_blocking().last();
380 return source.average().as_blocking().last();
401 return source.max().as_blocking().last();
422 return source.min().as_blocking().last();
428 template<
class SourceOperator,
class Subscriber>
429 struct safe_subscriber
431 safe_subscriber(SourceOperator& so, Subscriber& o) : so(std::addressof(so)), o(std::addressof(o)) {}
435 so->on_subscribe(*o);
437 if (!o->is_subscribed()) {
445 void operator()(
const rxsc::schedulable&) {
456 class observable<void, void>;
477 template<
class T,
class SourceOperator>
481 static_assert(std::is_same<T, typename SourceOperator::value_type>::value,
"SourceOperator::value_type must be the same as T in observable<T, SourceOperator>");
491 template<
class U,
class SO>
494 template<
class U,
class SO>
497 template<
class Subscriber>
498 auto detail_subscribe(Subscriber o)
const 504 static_assert(std::is_same<typename source_operator_type::value_type, T>::value && std::is_convertible<T*, typename subscriber_type::value_type*>::value,
"the value types in the sequence must match or be convertible");
505 static_assert(detail::has_on_subscribe_for<subscriber_type, source_operator_type>::value,
"inner must have on_subscribe method that accepts this subscriber ");
509 if (!o.is_subscribed()) {
511 return o.get_subscription();
517 if (rxsc::current_thread::is_schedule_required()) {
519 sc.create_worker(o.get_subscription()).schedule(
subscriber);
522 subscriber.subscribe();
526 return o.get_subscription();
571 template<
class...
AN>
574 static_assert(
sizeof...(
AN) == 0,
"as_dynamic() was passed too many arguments.");
579 template<
class...
AN>
582 static_assert(
sizeof...(
AN) == 0,
"as_blocking() was passed too many arguments.");
592 template<
class OperatorFactory>
593 auto op(OperatorFactory&& of)
const 594 -> decltype(of(*(
const this_type*)
nullptr)) {
601 template<
class ResultType,
class Operator>
602 auto lift(Operator&& op)
const 603 -> observable<rxu::value_type_t<rxo::detail::lift_operator<ResultType, source_operator_type, Operator>>, rxo::detail::lift_operator<ResultType, source_operator_type, Operator>> {
604 return observable<rxu::value_type_t<rxo::detail::lift_operator<ResultType, source_operator_type, Operator>>, rxo::detail::lift_operator<ResultType, source_operator_type, Operator>>(
605 rxo::detail::lift_operator<ResultType, source_operator_type, Operator>(
source_operator, std::forward<Operator>(op)));
606 static_assert(detail::is_lift_function_for<T, subscriber<ResultType>, Operator>::value,
"Function passed for lift() must have the signature subscriber<...>(subscriber<T, ...>)");
614 template<
class ResultType,
class Operator>
615 auto lift_if(Operator&& op)
const 616 ->
typename std::enable_if<detail::is_lift_function_for<T, subscriber<ResultType>, Operator>::value,
617 observable<rxu::value_type_t<rxo::detail::lift_operator<ResultType, source_operator_type, Operator>>, rxo::detail::lift_operator<ResultType, source_operator_type, Operator>>>::type {
618 return observable<rxu::value_type_t<rxo::detail::lift_operator<ResultType, source_operator_type, Operator>>, rxo::detail::lift_operator<ResultType, source_operator_type, Operator>>(
619 rxo::detail::lift_operator<ResultType, source_operator_type, Operator>(
source_operator, std::forward<Operator>(op)));
626 template<
class ResultType,
class Operator>
627 auto lift_if(Operator&&) const
628 -> typename std::enable_if<!detail::is_lift_function_for<T, subscriber<ResultType>, Operator>::value,
629 decltype(rxs::
from<ResultType>())>::type {
630 return rxs::from<ResultType>();
636 template<
class... ArgN>
639 return detail_subscribe(make_subscriber<T>(std::forward<ArgN>(an)...));
644 template<
class...
AN>
655 template<
class...
AN>
666 template<
class...
AN>
677 template<
class...
AN>
688 template<
class...
AN>
699 template<
class...
AN>
710 template<
class...
AN>
721 template<
class...
AN>
732 template<
class...
AN>
743 template<
class...
AN>
754 template<
class...
AN>
765 template<
class...
AN>
776 template<
class...
AN>
787 template<
class...
AN>
788 auto finally(
AN&&... an)
const 798 template<
class...
AN>
809 template<
class...
AN>
820 template<
class...
AN>
831 template<
class...
AN>
842 template<
class...
AN>
853 template<
class...
AN>
864 template<
class...
AN>
875 template<
class...
AN>
886 template<
class...
AN>
897 template<
class...
AN>
908 template<
class...
AN>
919 template<
class...
AN>
930 template<
class...
AN>
941 template<
class...
AN>
952 template<
class...
AN>
963 template<
class...
AN>
974 template<
class...
AN>
985 template<
class...
AN>
996 template<
class...
AN>
1007 template<
class...
AN>
1018 template<
class...
AN>
1029 template<
class...
AN>
1040 template<
class...
AN>
1051 template<
class...
AN>
1062 template<
class...
AN>
1073 template<
class...
AN>
1085 template<
class...
AN>
1096 template<
class...
AN>
1107 template<
class...
AN>
1118 template<
class...
AN>
1129 template<
class...
AN>
1140 template<
class...
AN>
1151 template<
class...
AN>
1162 template<
class...
AN>
1173 template<
class...
AN>
1184 template<
class...
AN>
1195 template<
class...
AN>
1206 template<
class...
AN>
1217 template<
class...
AN>
1224 static_assert(
sizeof...(
AN) == 0,
"first() was passed too many arguments.");
1229 template<
class...
AN>
1236 static_assert(
sizeof...(
AN) == 0,
"last() was passed too many arguments.");
1241 template<
class...
AN>
1248 static_assert(
sizeof...(
AN) == 0,
"count() was passed too many arguments.");
1253 template<
class...
AN>
1260 static_assert(
sizeof...(
AN) == 0,
"sum() was passed too many arguments.");
1265 template<
class...
AN>
1272 static_assert(
sizeof...(
AN) == 0,
"average() was passed too many arguments.");
1277 template<
class...
AN>
1284 static_assert(
sizeof...(
AN) == 0,
"max() was passed too many arguments.");
1289 template<
class...
AN>
1296 static_assert(
sizeof...(
AN) == 0,
"min() was passed too many arguments.");
1301 template<
class...
AN>
1312 template<
class...
AN>
1323 template<
class...
AN>
1334 template<
class...
AN>
1345 template<
class...
AN>
1356 template<
class...
AN>
1367 template<
class...
AN>
1378 template<
class...
AN>
1389 template<
class...
AN>
1400 template<
class...
AN>
1411 template<
class...
AN>
1422 template<
class...
AN>
1433 template<
class...
AN>
1444 template<
class...
AN>
1454 template<
class T,
class SourceOperator>
1458 template<
class T,
class SourceOperator>
1460 return !(lhs == rhs);
1552 template<
class T,
class OnSubscribe>
1554 -> decltype(rxs::create<T>(std::move(os))) {
1555 return rxs::create<T>(std::move(os));
1567 template<
class T,
class Coordination>
1569 -> decltype(rxs::range<T>(
first,
last, step, std::move(cn))) {
1570 return rxs::range<T>(
first,
last, step, std::move(cn));
1574 template<
class T,
class Coordination>
1576 -> decltype(rxs::range<T>(
first,
last, std::move(cn))) {
1577 return rxs::range<T>(
first,
last, std::move(cn));
1581 template<
class T,
class Coordination>
1583 -> decltype(rxs::range<T>(
first, std::move(cn))) {
1584 return rxs::range<T>(
first, std::move(cn));
1591 -> decltype(rxs::never<T>()) {
1592 return rxs::never<T>();
1597 template<
class ObservableFactory>
1605 template<
class...
AN>
1606 static auto interval(rxsc::scheduler::clock_type::duration period,
AN**...)
1609 static_assert(
sizeof...(
AN) == 0,
"interval(period) was passed too many arguments.");
1613 template<
class Coordination>
1614 static auto interval(rxsc::scheduler::clock_type::duration period, Coordination cn)
1620 template<
class...
AN>
1621 static auto interval(rxsc::scheduler::clock_type::time_point initial, rxsc::scheduler::clock_type::duration period,
AN**...)
1624 static_assert(
sizeof...(
AN) == 0,
"interval(initial, period) was passed too many arguments.");
1628 template<
class Coordination>
1629 static auto interval(rxsc::scheduler::clock_type::time_point initial, rxsc::scheduler::clock_type::duration period, Coordination cn)
1630 -> decltype(
rxs::interval(initial, period, std::move(cn))) {
1636 template<
class...
AN>
1637 static auto timer(rxsc::scheduler::clock_type::time_point at,
AN**...)
1640 static_assert(
sizeof...(
AN) == 0,
"timer(at) was passed too many arguments.");
1644 template<
class...
AN>
1645 static auto timer(rxsc::scheduler::clock_type::duration after,
AN**...)
1648 static_assert(
sizeof...(
AN) == 0,
"timer(after) was passed too many arguments.");
1652 template<
class Coordination>
1653 static auto timer(rxsc::scheduler::clock_type::time_point when, Coordination cn)
1654 -> decltype(
rxs::timer(when, std::move(cn))) {
1659 template<
class Coordination>
1660 static auto timer(rxsc::scheduler::clock_type::duration when, Coordination cn)
1661 -> decltype(
rxs::timer(when, std::move(cn))) {
1667 template<
class Collection>
1674 template<
class Collection,
class Coordination>
1675 static auto iterate(Collection c, Coordination cn)
1676 -> decltype(
rxs::iterate(std::move(c), std::move(cn))) {
1684 -> decltype( rxs::from<T>()) {
1685 return rxs::from<T>();
1689 template<
class T,
class Coordination>
1691 ->
typename std::enable_if<is_coordination<Coordination>::value,
1692 decltype( rxs::from<T>(std::move(cn)))>::type {
1693 return rxs::from<T>(std::move(cn));
1697 template<
class Value0,
class... ValueN>
1698 static auto from(Value0 v0, ValueN... vn)
1699 ->
typename std::enable_if<!is_coordination<Value0>::value,
1700 decltype(
rxs::from(v0, vn...))>::type {
1705 template<
class Coordination,
class Value0,
class... ValueN>
1706 static auto from(Coordination cn, Value0 v0, ValueN... vn)
1707 ->
typename std::enable_if<is_coordination<Coordination>::value,
1708 decltype(
rxs::from(std::move(cn), v0, vn...))>::type {
1709 return rxs::from(std::move(cn), v0, vn...);
1721 template<
class T,
class Coordination>
1722 static auto just(T v, Coordination cn)
1723 -> decltype(
rxs::just(std::move(v), std::move(cn))) {
1724 return rxs::just(std::move(v), std::move(cn));
1729 template<
class Observable,
class Value0,
class... ValueN>
1731 -> decltype(
rxs::start_with(std::move(o), std::move(v0), std::move(vn)...)) {
1732 return rxs::start_with(std::move(o), std::move(v0), std::move(vn)...);
1739 -> decltype(from<T>()) {
1744 template<
class T,
class Coordination>
1746 -> decltype(from<T>(std::move(cn))) {
1747 return from<T>(std::move(cn));
1752 template<
class T,
class Exception>
1754 -> decltype(rxs::error<T>(std::forward<Exception>(e))) {
1755 return rxs::error<T>(std::forward<Exception>(e));
1759 template<
class T,
class Exception,
class Coordination>
1760 static auto error(Exception&& e, Coordination cn)
1761 -> decltype(rxs::error<T>(std::forward<Exception>(e), std::move(cn))) {
1762 return rxs::error<T>(std::forward<Exception>(e), std::move(cn));
1767 template<
class ResourceFactory,
class ObservableFactory>
1768 static auto scope(ResourceFactory rf, ObservableFactory of)
1769 -> decltype(
rxs::scope(std::move(rf), std::move(of))) {
1770 return rxs::scope(std::move(rf), std::move(of));
1780 template<
class T,
class SourceOperator,
class OperatorFactory>
1782 -> decltype(source.op(std::forward<OperatorFactory>(of))) {
1783 return source.op(std::forward<OperatorFactory>(of));
1790 template<
class T,
class SourceOperator,
class OperatorFactory>
1792 -> decltype(source.op(std::forward<OperatorFactory>(of))) {
1793 return source.op(std::forward<OperatorFactory>(of));
auto distinct_until_changed(AN &&... an) const
For each item from this observable, filter out consequentially repeated values and emit only changes ...
Definition: rx-observable.hpp:876
Definition: rx-operators.hpp:126
#define RXCPP_TRY
Definition: rx-util.hpp:38
auto delay(AN &&... an) const
Return an observable that emits each item emitted by the source observable after the specified delay...
Definition: rx-observable.hpp:854
auto skip_last(AN... an) const
Make new observable with skipped last count items from this observable.
Definition: rx-observable.hpp:1346
Definition: rx-operators.hpp:366
auto with_latest_from(AN... an) const
For each item from the first observable select the latest value from all the observables to emit from...
Definition: rx-observable.hpp:1074
auto time_interval(AN &&... an) const
Returns an observable that emits indications of the amount of time lapsed between consecutive emissio...
Definition: rx-observable.hpp:755
auto error(E e) -> decltype(detail::make_error< T >(typename std::conditional< std::is_same< rxu::error_ptr, rxu::decay_t< E >>::value, detail::throw_ptr_tag, detail::throw_instance_tag >::type(), std::move(e), identity_immediate()))
Returns an observable that sends no items to observer and immediately generates an error...
Definition: rx-error.hpp:117
auto concat_transform(AN &&... an) const
For each item from this observable use the CollectionSelector to produce an observable and subscribe ...
Definition: rx-observable.hpp:1063
Definition: rx-operators.hpp:248
void on_subscribe(subscriber< T > o) const
Definition: rx-observable.hpp:81
auto count(AN **...) const
For each item from this observable reduce it by incrementing a count.
Definition: rx-observable.hpp:1242
Definition: rx-operators.hpp:380
auto min(AN **...) const
For each item from this observable reduce it by taking the min value of the previous items...
Definition: rx-observable.hpp:1290
Definition: rx-operators.hpp:143
auto defer(ObservableFactory of) -> observable< rxu::value_type_t< detail::defer_traits< ObservableFactory >>, detail::defer< ObservableFactory >>
Returns an observable that calls the specified observable factory to create an observable for each ne...
Definition: rx-defer.hpp:73
auto merge_transform(AN &&... an) const
For each item from this observable use the CollectionSelector to produce an observable and subscribe ...
Definition: rx-observable.hpp:1030
auto retry(AN... an) const
Retry this observable for the given number of times.
Definition: rx-observable.hpp:1423
Definition: rx-observable.hpp:157
auto skip_until(AN... an) const
Make new observable with items skipped until on_next occurs on the trigger observable or until the sp...
Definition: rx-observable.hpp:1357
Definition: rx-operators.hpp:387
Definition: rx-operators.hpp:458
std::shared_ptr< util::detail::error_base > error_ptr
Definition: rx-util.hpp:874
auto operator|(const rxcpp::observable< T, SourceOperator > &source, OperatorFactory &&of) -> decltype(source.op(std::forward< OperatorFactory >(of)))
Definition: rx-observable.hpp:1791
static auto create(OnSubscribe os) -> decltype(rxs::create< T >(std::move(os)))
Returns an observable that executes the specified function when a subscriber subscribes to it...
Definition: rx-observable.hpp:1553
Definition: rx-all.hpp:26
auto skip(AN... an) const
Make new observable with skipped first count items from this observable.
Definition: rx-observable.hpp:1324
Definition: rx-predef.hpp:302
static auto range(T first, T last, Coordination cn) -> decltype(rxs::range< T >(first, last, std::move(cn)))
Returns an observable that sends values in the range first-last by adding step to the previous value...
Definition: rx-observable.hpp:1575
auto element_at(AN &&... an) const
Pulls an item located at a specified index location in the sequence of items and emits that item as i...
Definition: rx-observable.hpp:887
observable(const observable< T, SO > &o)
implicit conversion between observables of the same value_type
Definition: rx-observable.hpp:553
Definition: rx-operators.hpp:329
auto any(AN &&... an) const
Returns an Observable that emits true if any item emitted by the source Observable satisfies a specif...
Definition: rx-observable.hpp:667
error_ptr make_error_ptr(error_ptr e)
Definition: rx-util.hpp:883
auto reduce(AN &&... an) const
For each item from this observable use Accumulator to combine items, when completed use ResultSelecto...
Definition: rx-observable.hpp:1196
source_operator_type source_operator
Definition: rx-observable.hpp:487
static auto from(Coordination cn) -> typename std::enable_if< is_coordination< Coordination >::value, decltype(rxs::from< T >(std::move(cn)))>::type
Definition: rx-observable.hpp:1690
RXCPP_NORETURN void rethrow_current_exception()
Definition: rx-util.hpp:933
dynamic_observable(SOF &&sof, typename std::enable_if<!is_dynamic_observable< SOF >::value, void **>::type=0)
Definition: rx-observable.hpp:74
auto timeout(AN &&... an) const
Return an observable that terminates with timeout_error if a particular timespan has passed without e...
Definition: rx-observable.hpp:766
T max() const
Definition: rx-observable.hpp:400
auto take_while(AN &&... an) const
For the first items fulfilling the predicate from this observable emit them from the new observable t...
Definition: rx-observable.hpp:1401
auto interval(Duration period) -> typename std::enable_if< detail::defer_interval< Duration, identity_one_worker >::value, typename detail::defer_interval< Duration, identity_one_worker >::observable_type >::type
Returns an observable that emits a sequential integer every specified time interval, on the specified scheduler.
Definition: rx-interval.hpp:113
dynamic_observable()
Definition: rx-observable.hpp:69
Definition: rx-operators.hpp:444
controls lifetime for scheduler::schedule and observable<T, SourceOperator>::subscribe.
Definition: rx-subscription.hpp:459
Definition: rx-operators.hpp:157
auto average(AN **...) const
For each item from this observable reduce it by adding to the previous values and then dividing by th...
Definition: rx-observable.hpp:1266
blocking_observable< T, this_type > as_blocking(AN **...) const
Definition: rx-observable.hpp:580
Definition: rx-operators.hpp:282
auto operator>>(const rxcpp::observable< T, SourceOperator > &source, OperatorFactory &&of) -> decltype(source.op(std::forward< OperatorFactory >(of)))
Definition: rx-observable.hpp:1781
auto observable_member(Tag, AN &&... an) -> decltype(Overload::member(std::forward< AN >(an)...))
Definition: rx-operators.hpp:63
rxu::decay_t< Observable > observable_type
Definition: rx-observable.hpp:215
auto window(AN &&... an) const
Return an observable that emits connected, non-overlapping windows, each containing at most count ite...
Definition: rx-observable.hpp:898
auto publish_synchronized(AN &&... an) const
Turn a cold observable hot and allow connections to the source to be independent of subscriptions...
Definition: rx-observable.hpp:1152
auto subscribe(ArgN &&... an) const -> composite_subscription
Subscribe will cause the source observable to emit values to the provided subscriber.
Definition: rx-observable.hpp:637
Definition: rx-operators.hpp:289
Definition: rx-operators.hpp:296
Definition: rx-operators.hpp:352
auto just(Value0 v0) -> typename std::enable_if<!is_coordination< Value0 >::value, decltype(iterate(*(std::array< Value0, 1 > *) nullptr, identity_immediate()))>::type
Definition: rx-iterate.hpp:267
auto combine_latest(AN... an) const
For each item from all of the observables select a value to emit from the new observable that is retu...
Definition: rx-observable.hpp:1086
static auto iterate(Collection c, Coordination cn) -> decltype(rxs::iterate(std::move(c), std::move(cn)))
Returns an observable that sends each value in the collection, on the specified scheduler.
Definition: rx-observable.hpp:1675
Definition: rx-operators.hpp:421
observable(const source_operator_type &o)
Definition: rx-observable.hpp:542
Definition: rx-operators.hpp:500
Definition: rx-operators.hpp:394
Definition: rx-operators.hpp:472
Definition: rx-operators.hpp:331
Definition: rx-operators.hpp:234
auto AN
Definition: rx-finally.hpp:105
auto max() -> operator_factory< max_tag >
For each item from this observable reduce it by taking the max value of the previous items...
Definition: rx-reduce.hpp:496
auto iterate(Collection c) -> observable< rxu::value_type_t< detail::iterate_traits< Collection >>, detail::iterate< Collection, identity_one_worker >>
Returns an observable that sends each value in the collection, on the specified scheduler.
Definition: rx-iterate.hpp:160
Definition: rx-operators.hpp:255
static auto iterate(Collection c) -> decltype(rxs::iterate(std::move(c), identity_current_thread()))
Returns an observable that sends each value in the collection, on the specified scheduler.
Definition: rx-observable.hpp:1668
Definition: rx-operators.hpp:199
Definition: rx-predef.hpp:156
T value_type
Definition: rx-observable.hpp:530
auto tap(AN &&... an) const
inspect calls to on_next, on_error and on_completed.
Definition: rx-observable.hpp:744
auto subscribe(ArgN &&... an) const -> void
Definition: rx-observable.hpp:238
auto zip(AN &&... an) const
Bring by one item from all given observables and select a value to emit from the new observable that ...
Definition: rx-observable.hpp:1097
Definition: rx-operators.hpp:521
auto scan(AN... an) const
For each item from this observable use Accumulator to combine items into a value that will be emitted...
Definition: rx-observable.hpp:1302
auto subscribe_with_rethrow(ArgN &&... an) const -> void
Definition: rx-observable.hpp:263
#define RXCPP_CATCH(...)
Definition: rx-util.hpp:39
Definition: rx-operators.hpp:345
auto start_with(AN... an) const
Start with the supplied values, then concatenate this observable.
Definition: rx-observable.hpp:1434
rxu::decay_t< SourceOperator > source_operator_type
Definition: rx-observable.hpp:486
typename std::decay< T >::type decay_t
Definition: rx-util.hpp:48
static auto interval(rxsc::scheduler::clock_type::duration period, AN **...) -> decltype(rxs::interval(period))
Returns an observable that emits a sequential integer every specified time interval, on the specified scheduler.
Definition: rx-observable.hpp:1606
auto exists(AN &&... an) const
Returns an Observable that emits true if any item emitted by the source Observable satisfies a specif...
Definition: rx-observable.hpp:678
auto first(AN **...) const
For each item from this observable reduce it by sending only the first item.
Definition: rx-observable.hpp:1218
RXCPP_NORETURN void throw_exception(E &&e)
Definition: rx-util.hpp:920
auto max(AN **...) const
For each item from this observable reduce it by taking the max value of the previous items...
Definition: rx-observable.hpp:1278
a source of values whose methods block until all values have been emitted. subscribe or use one of th...
Definition: rx-observable.hpp:169
auto switch_if_empty(AN &&... an) const
If the source Observable terminates without emitting any items, emits items from a backup Observable...
Definition: rx-observable.hpp:711
auto filter(AN &&... an) const
For each item from this observable use Predicate to select which items to emit from the new observabl...
Definition: rx-observable.hpp:700
~blocking_observable()
Definition: rx-observable.hpp:217
static auto just(T v) -> decltype(rxs::just(std::move(v)))
Definition: rx-observable.hpp:1715
static auto from(Value0 v0, ValueN... vn) -> typename std::enable_if<!is_coordination< Value0 >::value, decltype(rxs::from(v0, vn...))>::type
Definition: rx-observable.hpp:1698
Definition: rx-operators.hpp:373
static auto defer(ObservableFactory of) -> decltype(rxs::defer(std::move(of)))
Returns an observable that calls the specified observable factory to create an observable for each ne...
Definition: rx-observable.hpp:1598
auto contains(AN &&... an) const
Returns an Observable that emits true if the source Observable emitted a specified item...
Definition: rx-observable.hpp:689
Definition: rx-operators.hpp:316
static auto start_with(Observable o, Value0 v0, ValueN... vn) -> decltype(rxs::start_with(std::move(o), std::move(v0), std::move(vn)...))
Definition: rx-observable.hpp:1730
Definition: rx-sources.hpp:15
Definition: rx-operators.hpp:451
Definition: rx-operators.hpp:227
linq_driver< iter_cursor< typename util::container_traits< TContainer >::iterator > > from(TContainer &c)
Definition: linq.hpp:556
Definition: rx-util.hpp:416
auto debounce(AN &&... an) const
Return an observable that emits an item if a particular timespan has passed without emitting another ...
Definition: rx-observable.hpp:843
auto multicast(AN &&... an) const
Definition: rx-observable.hpp:1130
Definition: rx-observable.hpp:36
auto group_by(AN &&... an) const
Return an observable that emits grouped_observables, each of which corresponds to a unique key value ...
Definition: rx-observable.hpp:1108
static auto timer(rxsc::scheduler::clock_type::time_point at, AN **...) -> decltype(rxs::timer(at))
Returns an observable that emits an integer at the specified time point.
Definition: rx-observable.hpp:1637
auto take_last(AN &&... an) const
Emit only the final t items emitted by the source Observable.
Definition: rx-observable.hpp:1379
observable< T > make_observable_dynamic(Source &&s)
Definition: rx-observable.hpp:102
auto sum(AN **...) const
For each item from this observable reduce it by adding to the previous items.
Definition: rx-observable.hpp:1254
static auto timer(rxsc::scheduler::clock_type::time_point when, Coordination cn) -> decltype(rxs::timer(when, std::move(cn)))
Returns an observable that emits an integer at the specified time point.
Definition: rx-observable.hpp:1653
~observable()
Definition: rx-observable.hpp:534
auto map(AN &&... an) const
For each item from this observable use Selector to produce an item to emit from the new observable th...
Definition: rx-observable.hpp:821
auto sample_with_time(AN &&... an) const
Return an Observable that emits the most recent items emitted by the source Observable within periodi...
Definition: rx-observable.hpp:1313
Definition: rx-operators.hpp:241
Definition: rx-operators.hpp:117
Definition: rx-operators.hpp:213
a source of values. subscribe or use one of the operator methods that return a new observable...
Definition: rx-observable.hpp:478
Definition: rx-operators.hpp:430
auto pairwise(AN... an) const
Take values pairwise from this observable.
Definition: rx-observable.hpp:1445
auto replay(AN &&... an) const
1) replay(optional Coordination, optional CompositeSubscription) Turn a cold observable hot...
Definition: rx-observable.hpp:1163
Definition: rx-sources.hpp:23
Definition: rx-operators.hpp:38
Definition: rx-operators.hpp:268
auto sequence_equal(AN... an) const
Determine whether two Observables emit the same sequence of items.
Definition: rx-observable.hpp:733
auto last(AN **...) const
For each item from this observable reduce it by sending only the last item.
Definition: rx-observable.hpp:1230
auto start_with(AN &&... an) -> operator_factory< start_with_tag, AN... >
Start with the supplied values, then concatenate this observable.
Definition: rx-start_with.hpp:53
auto trace_activity() -> decltype(rxcpp_trace_activity(trace_tag()))&
Definition: rx-predef.hpp:15
bool operator!=(const dynamic_grouped_observable< K, T > &lhs, const dynamic_grouped_observable< K, T > &rhs)
Definition: rx-grouped_observable.hpp:103
Definition: rx-operators.hpp:129
Definition: rx-operators.hpp:437
auto merge_delay_error(AN... an) const
For each given observable subscribe. For each item emitted from all of the given observables, deliver from the new observable that is returned. The first error to occure is hold off until all of the given non-error-emitting observables have finished their emission.
Definition: rx-observable.hpp:997
Definition: rx-operators.hpp:359
std::enable_if< is_subscriber< Subscriber >::value, void >::type on_subscribe(Subscriber o) const
Definition: rx-observable.hpp:87
static auto from() -> decltype(rxs::from< T >())
Definition: rx-observable.hpp:1683
Definition: rx-operators.hpp:479
Definition: rx-operators.hpp:136
auto repeat(AN... an) const
Repeat this observable for the given number of times or infinitely.
Definition: rx-observable.hpp:1412
auto take(AN... an) const
For the first count items from this observable emit them from the new observable that is returned...
Definition: rx-observable.hpp:1368
Definition: rx-operators.hpp:206
Definition: rx-operators.hpp:192
auto subscribe_on(AN &&... an) const
Subscription and unsubscription are queued and delivered using the scheduler from the supplied coordi...
Definition: rx-observable.hpp:1174
tag_dynamic_observable dynamic_observable_tag
Definition: rx-observable.hpp:67
Definition: rx-operators.hpp:103
auto distinct(AN &&... an) const
For each item from this observable, filter out repeated values and emit only items that have not alre...
Definition: rx-observable.hpp:865
auto skip_while(AN... an) const
Make new observable with skipped first count items from this observable.
Definition: rx-observable.hpp:1335
static auto error(Exception &&e, Coordination cn) -> decltype(rxs::error< T >(std::forward< Exception >(e), std::move(cn)))
Returns an observable that sends no items to observer and immediately generates an error...
Definition: rx-observable.hpp:1760
static auto range(T first=0, T last=std::numeric_limits< T >::max(), std::ptrdiff_t step=1) -> decltype(rxs::range< T >(first, last, step, identity_current_thread()))
Returns an observable that sends values in the range first-last by adding step to the previous value...
Definition: rx-observable.hpp:1561
static auto empty() -> decltype(from< T >())
Returns an observable that sends no items to observer and immediately completes, on the specified sch...
Definition: rx-observable.hpp:1738
Definition: rx-operators.hpp:486
Definition: rx-operators.hpp:110
Definition: rx-operators.hpp:57
auto merge(AN... an) const
For each given observable subscribe. For each item emitted from all of the given observables, deliver from the new observable that is returned.
Definition: rx-observable.hpp:986
error_ptr current_exception()
Definition: rx-util.hpp:943
bool operator==(const dynamic_grouped_observable< K, T > &lhs, const dynamic_grouped_observable< K, T > &rhs)
Definition: rx-grouped_observable.hpp:99
Definition: rx-operators.hpp:220
Definition: rx-predef.hpp:270
auto buffer(AN &&... an) const
Return an observable that emits connected, non-overlapping buffer, each containing at most count item...
Definition: rx-observable.hpp:942
Definition: rx-operators.hpp:185
auto lift(Operator &&op) -> detail::lift_factory< ResultType, Operator >
Definition: rx-lift.hpp:101
static auto range(T first, Coordination cn) -> decltype(rxs::range< T >(first, std::move(cn)))
Returns an observable that sends values in the range first-last by adding step to the previous value...
Definition: rx-observable.hpp:1582
static auto interval(rxsc::scheduler::clock_type::time_point initial, rxsc::scheduler::clock_type::duration period, AN **...) -> decltype(rxs::interval(initial, period))
Returns an observable that emits a sequential integer every specified time interval, on the specified scheduler.
Definition: rx-observable.hpp:1621
static auto error(Exception &&e) -> decltype(rxs::error< T >(std::forward< Exception >(e)))
Returns an observable that sends no items to observer and immediately generates an error...
Definition: rx-observable.hpp:1753
static auto interval(rxsc::scheduler::clock_type::time_point initial, rxsc::scheduler::clock_type::duration period, Coordination cn) -> decltype(rxs::interval(initial, period, std::move(cn)))
Returns an observable that emits a sequential integer every specified time interval, on the specified scheduler.
Definition: rx-observable.hpp:1629
observable_type source
Definition: rx-observable.hpp:216
auto buffer_with_time_or_count(AN &&... an) const
Return an observable that emits connected, non-overlapping buffers of items from the source observabl...
Definition: rx-observable.hpp:964
Definition: rx-operators.hpp:465
Definition: rx-operators.hpp:261
Definition: rx-operators.hpp:275
const scheduler & make_current_thread()
Definition: rx-currentthread.hpp:263
auto switch_on_error(AN &&... an) const
If an error occurs, take the result from the Selector and subscribe to that instead.
Definition: rx-observable.hpp:810
Definition: rx-operators.hpp:493
auto switch_on_next(AN &&... an) const
Return observable that emits the items emitted by the observable most recently emitted by the source ...
Definition: rx-observable.hpp:975
consumes values from an observable using State that may implement on_next, on_error and on_completed ...
Definition: rx-observer.hpp:179
auto flat_map(AN &&... an) const
For each item from this observable use the CollectionSelector to produce an observable and subscribe ...
Definition: rx-observable.hpp:1019
observable< T > as_dynamic(AN **...) const
Definition: rx-observable.hpp:572
auto window_with_time(AN &&... an) const
Return an observable that emits observables every period time interval and collects items from this o...
Definition: rx-observable.hpp:909
auto all(AN &&... an) const
Returns an Observable that emits true if every item emitted by the source Observable satisfies a spec...
Definition: rx-observable.hpp:645
auto amb(AN... an) const
For each item from only the first of the given observables deliver from the new observable that is re...
Definition: rx-observable.hpp:1008
Definition: rx-operators.hpp:150
auto window_with_time_or_count(AN &&... an) const
Return an observable that emits connected, non-overlapping windows of items from the source observabl...
Definition: rx-observable.hpp:920
RXCPP_NORETURN void rethrow_exception(error_ptr e)
Definition: rx-util.hpp:902
static auto timer(rxsc::scheduler::clock_type::duration when, Coordination cn) -> decltype(rxs::timer(when, std::move(cn)))
Returns an observable that emits an integer at the specified time point.
Definition: rx-observable.hpp:1660
auto take_until(AN &&... an) const
For each item from this observable until on_next occurs on the trigger observable or until the specif...
Definition: rx-observable.hpp:1390
auto transform(AN &&... an) const
For each item from this observable use Selector to produce an item to emit from the new observable th...
Definition: rx-observable.hpp:832
Definition: rx-operators.hpp:338
int count() const
Definition: rx-observable.hpp:334
auto scope(ResourceFactory rf, ObservableFactory of) -> observable< rxu::value_type_t< detail::scope_traits< ResourceFactory, ObservableFactory >>, detail::scope< ResourceFactory, ObservableFactory >>
Returns an observable that makes an observable by the specified observable factory using the resource...
Definition: rx-scope.hpp:114
void unsubscribe() const
Definition: rx-subscription.hpp:178
auto ignore_elements(AN &&... an) const
Do not emit any items from the source Observable, but allow termination notification (either onError ...
Definition: rx-observable.hpp:1119
static auto from(Coordination cn, Value0 v0, ValueN... vn) -> typename std::enable_if< is_coordination< Coordination >::value, decltype(rxs::from(std::move(cn), v0, vn...))>::type
Definition: rx-observable.hpp:1706
blocking_observable(observable_type s)
Definition: rx-observable.hpp:220
T sum() const
Definition: rx-observable.hpp:358
Definition: rx-operators.hpp:164
auto timestamp(AN &&... an) const
Returns an observable that attaches a timestamp to each item emitted by the source observable indicat...
Definition: rx-observable.hpp:777
Definition: rx-operators.hpp:323
Definition: rx-operators.hpp:507
auto is_empty(AN &&... an) const
Returns an Observable that emits true if the source Observable is empty, otherwise false...
Definition: rx-observable.hpp:656
T min() const
Definition: rx-observable.hpp:421
auto buffer_with_time(AN &&... an) const
Return an observable that emits buffers every period time interval and collects items from this obser...
Definition: rx-observable.hpp:953
auto subscribe(ArgN &&... an) -> detail::subscribe_factory< decltype(make_subscriber< T >(std::forward< ArgN >(an)...))>
Subscribe will cause the source observable to emit values to the provided subscriber.
Definition: rx-subscribe.hpp:87
Definition: rx-predef.hpp:128
static auto scope(ResourceFactory rf, ObservableFactory of) -> decltype(rxs::scope(std::move(rf), std::move(of)))
Returns an observable that makes an observable by the specified observable factory using the resource...
Definition: rx-observable.hpp:1768
Definition: rx-operators.hpp:423
auto default_if_empty(AN &&... an) const
If the source Observable terminates without emitting any items, emits a default item and completes...
Definition: rx-observable.hpp:722
binds an observer that consumes values with a composite_subscription that controls lifetime...
Definition: rx-subscriber.hpp:25
Definition: rx-operators.hpp:119
static auto interval(rxsc::scheduler::clock_type::duration period, Coordination cn) -> decltype(rxs::interval(period, std::move(cn)))
Returns an observable that emits a sequential integer every specified time interval, on the specified scheduler.
Definition: rx-observable.hpp:1614
static auto range(T first, T last, std::ptrdiff_t step, Coordination cn) -> decltype(rxs::range< T >(first, last, step, std::move(cn)))
Returns an observable that sends values in the range first-last by adding step to the previous value...
Definition: rx-observable.hpp:1568
double average() const
Definition: rx-observable.hpp:379
auto first(AN **...) -> delayed_type_t< T, AN... > const
Definition: rx-observable.hpp:284
Definition: rx-operators.hpp:178
static auto just(T v, Coordination cn) -> decltype(rxs::just(std::move(v), std::move(cn)))
Definition: rx-observable.hpp:1722
rxu::value_type_t< delayed_type< T, AN... > > delayed_type_t
Definition: rx-operators.hpp:60
auto observe_on(AN &&... an) const
All values are queued and delivered using the scheduler from the supplied coordination.
Definition: rx-observable.hpp:1185
auto accumulate(AN &&... an) const
For each item from this observable use Accumulator to combine items, when completed use ResultSelecto...
Definition: rx-observable.hpp:1207
observable(observable< T, SO > &&o)
implicit conversion between observables of the same value_type
Definition: rx-observable.hpp:558
Definition: rx-predef.hpp:115
friend bool operator==(const observable< U, SO > &, const observable< U, SO > &)
Definition: rx-sources.hpp:17
auto window_toggle(AN &&... an) const
Return an observable that emits observables every period time interval and collects items from this o...
Definition: rx-observable.hpp:931
auto on_error_resume_next(AN &&... an) const
If an error occurs, take the result from the Selector and subscribe to that instead.
Definition: rx-observable.hpp:799
observable()
Definition: rx-observable.hpp:538
auto last(AN **...) -> delayed_type_t< T, AN... > const
Definition: rx-observable.hpp:312
static auto never() -> decltype(rxs::never< T >())
Returns an observable that never sends any items or notifications to observer.
Definition: rx-observable.hpp:1590
auto concat(AN... an) const
For each item from this observable subscribe to one at a time, in the order received. For each item from all of the given observables deliver from the new observable that is returned.
Definition: rx-observable.hpp:1041
Definition: rx-predef.hpp:126
auto concat_map(AN &&... an) const
For each item from this observable use the CollectionSelector to produce an observable and subscribe ...
Definition: rx-observable.hpp:1052
Definition: rx-operators.hpp:127
identity_one_worker identity_current_thread()
Definition: rx-coordination.hpp:175
friend bool operator==(const dynamic_observable< U > &, const dynamic_observable< U > &)
auto publish(AN &&... an) const
Turn a cold observable hot and allow connections to the source to be independent of subscriptions...
Definition: rx-observable.hpp:1141
static auto empty(Coordination cn) -> decltype(from< T >(std::move(cn)))
Returns an observable that sends no items to observer and immediately completes, on the specified sch...
Definition: rx-observable.hpp:1745
Definition: rx-operators.hpp:415
Definition: rx-operators.hpp:408
auto timer(TimePointOrDuration when) -> typename std::enable_if< detail::defer_timer< TimePointOrDuration, identity_one_worker >::value, typename detail::defer_timer< TimePointOrDuration, identity_one_worker >::observable_type >::type
Returns an observable that emits an integer at the specified time point.
Definition: rx-timer.hpp:114
observable(source_operator_type &&o)
Definition: rx-observable.hpp:546
Definition: rx-operators.hpp:401
static auto timer(rxsc::scheduler::clock_type::duration after, AN **...) -> decltype(rxs::timer(after))
Returns an observable that emits an integer at the specified time point.
Definition: rx-observable.hpp:1645
Definition: rx-operators.hpp:514