5 #if !defined(RXCPP_RX_SUBSCRIBER_HPP) 6 #define RXCPP_RX_SUBSCRIBER_HPP 24 template<
class T,
class Observer = observer<T>>
33 observer_type destination;
47 , do_unsubscribe(
true)
51 void operator()(U u) {
54 that->destination.on_next(std::move(u));
55 do_unsubscribe =
false;
59 that->destination.on_error(std::move(ex));
64 volatile bool do_unsubscribe;
80 that->destination.on_error(std::move(ex));
85 struct completeddetacher
96 inline void operator()() {
98 that->destination.on_completed();
108 : lifetime(o.lifetime)
109 , destination(o.destination)
114 : lifetime(std::move(o.lifetime))
115 , destination(std::move(o.destination))
116 , id(std::move(o.id))
120 template<
class U,
class O>
126 typename std::enable_if<
128 std::is_same<Observer,
observer<T>>::value,
void**>::type =
nullptr)
129 : lifetime(o.lifetime)
137 : lifetime(std::move(cs))
138 , destination(std::forward<U>(o))
147 lifetime = std::move(o.lifetime);
148 destination = std::move(o.destination);
149 id = std::move(o.id);
170 return subscriber<T>(id, lifetime, destination.as_dynamic());
180 nextdetacher protect(
this);
181 protect(std::forward<V>(v));
187 errordetacher protect(
this);
188 protect(std::move(e));
194 completeddetacher protect(
this);
204 return lifetime.
add(std::move(s));
208 ->
typename std::enable_if<detail::is_unsubscribe_function<F>::value,
weak_subscription>::type {
212 return lifetime.
remove(std::move(w));
215 return lifetime.clear();
223 template<
class T,
class Observer>
235 ->
typename std::enable_if<
236 detail::is_on_next_of<T, detail::OnNextEmpty<T>>::value,
242 template<
class T,
class I>
248 template<
class T,
class Observer>
250 ->
typename std::enable_if<
256 template<
class T,
class Observer>
258 ->
typename std::enable_if<
259 !detail::is_on_next_of<T, Observer>::value &&
263 subscriber<T, observer<T, Observer>>>::type {
266 template<
class T,
class OnNext>
268 ->
typename std::enable_if<
269 detail::is_on_next_of<T, OnNext>::value,
274 template<
class T,
class OnNext,
class OnError>
276 ->
typename std::enable_if<
277 detail::is_on_next_of<T, OnNext>::value &&
278 detail::is_on_error<OnError>::value,
283 template<
class T,
class OnNext,
class OnCompleted>
285 ->
typename std::enable_if<
286 detail::is_on_next_of<T, OnNext>::value &&
287 detail::is_on_completed<OnCompleted>::value,
292 template<
class T,
class OnNext,
class OnError,
class OnCompleted>
294 ->
typename std::enable_if<
295 detail::is_on_next_of<T, OnNext>::value &&
296 detail::is_on_error<OnError>::value &&
297 detail::is_on_completed<OnCompleted>::value,
313 template<
class T,
class I>
319 template<
class T,
class I>
325 template<
class T,
class Observer>
327 ->
typename std::enable_if<
333 template<
class T,
class Observer>
334 auto make_subscriber(
const composite_subscription& cs,
const Observer& o)
335 ->
typename std::enable_if<
336 !detail::is_on_next_of<T, Observer>::value &&
340 subscriber<T, observer<T, Observer>>>::type {
343 template<
class T,
class OnNext>
345 ->
typename std::enable_if<
346 detail::is_on_next_of<T, OnNext>::value,
351 template<
class T,
class OnNext,
class OnError>
353 ->
typename std::enable_if<
354 detail::is_on_next_of<T, OnNext>::value &&
355 detail::is_on_error<OnError>::value,
360 template<
class T,
class OnNext,
class OnCompleted>
362 ->
typename std::enable_if<
363 detail::is_on_next_of<T, OnNext>::value &&
364 detail::is_on_completed<OnCompleted>::value,
369 template<
class T,
class OnNext,
class OnError,
class OnCompleted>
371 ->
typename std::enable_if<
372 detail::is_on_next_of<T, OnNext>::value &&
373 detail::is_on_error<OnError>::value &&
374 detail::is_on_completed<OnCompleted>::value,
397 template<
class T,
class I>
403 template<
class T,
class I>
409 template<
class T,
class Observer>
411 ->
typename std::enable_if<
416 template<
class T,
class Observer>
418 ->
typename std::enable_if<
423 template<
class T,
class Observer>
425 ->
typename std::enable_if<
426 !detail::is_on_next_of<T, Observer>::value &&
430 subscriber<T, observer<T, Observer>>>::type {
431 return subscriber<T, observer<T, Observer>>(std::move(
id), composite_subscription(), o);
433 template<
class T,
class Observer>
434 auto make_subscriber(trace_id
id,
const composite_subscription& cs,
const Observer& o)
435 ->
typename std::enable_if<
436 !detail::is_on_next_of<T, Observer>::value &&
440 subscriber<T, observer<T, Observer>>>::type {
441 return subscriber<T, observer<T, Observer>>(std::move(
id), cs, o);
443 template<
class T,
class OnNext>
445 ->
typename std::enable_if<
446 detail::is_on_next_of<T, OnNext>::value,
451 template<
class T,
class OnNext>
453 ->
typename std::enable_if<
454 detail::is_on_next_of<T, OnNext>::value,
459 template<
class T,
class OnNext,
class OnError>
461 ->
typename std::enable_if<
462 detail::is_on_next_of<T, OnNext>::value &&
463 detail::is_on_error<OnError>::value,
468 template<
class T,
class OnNext,
class OnError>
470 ->
typename std::enable_if<
471 detail::is_on_next_of<T, OnNext>::value &&
472 detail::is_on_error<OnError>::value,
477 template<
class T,
class OnNext,
class OnCompleted>
479 ->
typename std::enable_if<
480 detail::is_on_next_of<T, OnNext>::value &&
481 detail::is_on_completed<OnCompleted>::value,
486 template<
class T,
class OnNext,
class OnCompleted>
488 ->
typename std::enable_if<
489 detail::is_on_next_of<T, OnNext>::value &&
490 detail::is_on_completed<OnCompleted>::value,
495 template<
class T,
class OnNext,
class OnError,
class OnCompleted>
497 ->
typename std::enable_if<
498 detail::is_on_next_of<T, OnNext>::value &&
499 detail::is_on_error<OnError>::value &&
500 detail::is_on_completed<OnCompleted>::value,
505 template<
class T,
class OnNext,
class OnError,
class OnCompleted>
507 ->
typename std::enable_if<
508 detail::is_on_next_of<T, OnNext>::value &&
509 detail::is_on_error<OnError>::value &&
510 detail::is_on_completed<OnCompleted>::value,
519 template<
class T,
class OtherT,
class OtherObserver,
class I>
527 template<
class T,
class OtherT,
class OtherObserver,
class I>
535 template<
class T,
class OtherT,
class OtherObserver,
class Observer>
537 ->
typename std::enable_if<
544 template<
class T,
class OtherT,
class OtherObserver,
class Observer>
546 ->
typename std::enable_if<
554 template<
class T,
class OtherT,
class OtherObserver,
class Observer>
555 auto make_subscriber(
const subscriber<OtherT, OtherObserver>& scbr,
const Observer& o)
556 ->
typename std::enable_if<
557 !detail::is_on_next_of<T, Observer>::value &&
561 subscriber<T, observer<T, Observer>>>::type {
566 template<
class T,
class OtherT,
class OtherObserver,
class Observer>
567 auto make_subscriber(
const subscriber<OtherT, OtherObserver>& scbr, trace_id
id,
const Observer& o)
568 ->
typename std::enable_if<
569 !detail::is_on_next_of<T, Observer>::value &&
573 subscriber<T, observer<T, Observer>>>::type {
574 auto r = subscriber<T, observer<T, Observer>>(std::move(
id), scbr.get_subscription(), o);
578 template<
class T,
class OtherT,
class OtherObserver,
class OnNext>
580 ->
typename std::enable_if<
581 detail::is_on_next_of<T, OnNext>::value,
588 template<
class T,
class OtherT,
class OtherObserver,
class OnNext>
590 ->
typename std::enable_if<
591 detail::is_on_next_of<T, OnNext>::value,
598 template<
class T,
class OtherT,
class OtherObserver,
class OnNext,
class OnError>
600 ->
typename std::enable_if<
601 detail::is_on_next_of<T, OnNext>::value &&
602 detail::is_on_error<OnError>::value,
609 template<
class T,
class OtherT,
class OtherObserver,
class OnNext,
class OnError>
611 ->
typename std::enable_if<
612 detail::is_on_next_of<T, OnNext>::value &&
613 detail::is_on_error<OnError>::value,
620 template<
class T,
class OtherT,
class OtherObserver,
class OnNext,
class OnCompleted>
622 ->
typename std::enable_if<
623 detail::is_on_next_of<T, OnNext>::value &&
624 detail::is_on_completed<OnCompleted>::value,
631 template<
class T,
class OtherT,
class OtherObserver,
class OnNext,
class OnCompleted>
633 ->
typename std::enable_if<
634 detail::is_on_next_of<T, OnNext>::value &&
635 detail::is_on_completed<OnCompleted>::value,
642 template<
class T,
class OtherT,
class OtherObserver,
class OnNext,
class OnError,
class OnCompleted>
644 ->
typename std::enable_if<
645 detail::is_on_next_of<T, OnNext>::value &&
646 detail::is_on_error<OnError>::value &&
647 detail::is_on_completed<OnCompleted>::value,
654 template<
class T,
class OtherT,
class OtherObserver,
class OnNext,
class OnError,
class OnCompleted>
656 ->
typename std::enable_if<
657 detail::is_on_next_of<T, OnNext>::value &&
658 detail::is_on_error<OnError>::value &&
659 detail::is_on_completed<OnCompleted>::value,
667 template<
class T,
class OtherT,
class OtherObserver,
class I>
673 template<
class T,
class OtherT,
class OtherObserver,
class I>
679 template<
class T,
class OtherT,
class OtherObserver,
class Observer>
681 ->
typename std::enable_if<
688 template<
class T,
class OtherT,
class OtherObserver,
class Observer>
690 ->
typename std::enable_if<
697 template<
class T,
class OtherT,
class OtherObserver,
class Observer>
698 auto make_subscriber(
const subscriber<OtherT, OtherObserver>& scbr,
const composite_subscription& cs,
const Observer& o)
699 ->
typename std::enable_if<
700 !detail::is_on_next_of<T, Observer>::value &&
704 subscriber<T, observer<T, Observer>>>::type {
709 template<
class T,
class OtherT,
class OtherObserver,
class Observer>
710 auto make_subscriber(
const subscriber<OtherT, OtherObserver>& scbr, trace_id
id,
const composite_subscription& cs,
const Observer& o)
711 ->
typename std::enable_if<
712 !detail::is_on_next_of<T, Observer>::value &&
716 subscriber<T, observer<T, Observer>>>::type {
717 auto r = subscriber<T, observer<T, Observer>>(std::move(
id), cs, o);
721 template<
class T,
class OtherT,
class OtherObserver,
class OnNext>
723 ->
typename std::enable_if<
724 detail::is_on_next_of<T, OnNext>::value,
731 template<
class T,
class OtherT,
class OtherObserver,
class OnNext>
733 ->
typename std::enable_if<
734 detail::is_on_next_of<T, OnNext>::value,
741 template<
class T,
class OtherT,
class OtherObserver,
class OnNext,
class OnError>
743 ->
typename std::enable_if<
744 detail::is_on_next_of<T, OnNext>::value &&
745 detail::is_on_error<OnError>::value,
752 template<
class T,
class OtherT,
class OtherObserver,
class OnNext,
class OnError>
754 ->
typename std::enable_if<
755 detail::is_on_next_of<T, OnNext>::value &&
756 detail::is_on_error<OnError>::value,
763 template<
class T,
class OtherT,
class OtherObserver,
class OnNext,
class OnCompleted>
765 ->
typename std::enable_if<
766 detail::is_on_next_of<T, OnNext>::value &&
767 detail::is_on_completed<OnCompleted>::value,
774 template<
class T,
class OtherT,
class OtherObserver,
class OnNext,
class OnCompleted>
776 ->
typename std::enable_if<
777 detail::is_on_next_of<T, OnNext>::value &&
778 detail::is_on_completed<OnCompleted>::value,
785 template<
class T,
class OtherT,
class OtherObserver,
class OnNext,
class OnError,
class OnCompleted>
787 ->
typename std::enable_if<
788 detail::is_on_next_of<T, OnNext>::value &&
789 detail::is_on_error<OnError>::value &&
790 detail::is_on_completed<OnCompleted>::value,
797 template<
class T,
class OtherT,
class OtherObserver,
class OnNext,
class OnError,
class OnCompleted>
799 ->
typename std::enable_if<
800 detail::is_on_next_of<T, OnNext>::value &&
801 detail::is_on_error<OnError>::value &&
802 detail::is_on_completed<OnCompleted>::value,
810 template<
class T,
class Observer>
817 template<
class T,
class Observer>
825 template<
class T,
class Observer>
#define RXCPP_TRY
Definition: rx-util.hpp:38
auto add(F f) const -> typename std::enable_if< detail::is_unsubscribe_function< F >::value, weak_subscription >::type
Definition: rx-subscriber.hpp:207
const observer_type & get_observer() const
Definition: rx-subscriber.hpp:153
subscriber(trace_id id, composite_subscription cs, U &&o)
Definition: rx-subscriber.hpp:136
void on_completed() const
Definition: rx-subscriber.hpp:190
std::shared_ptr< util::detail::error_base > error_ptr
Definition: rx-util.hpp:874
Definition: rx-all.hpp:26
weak_subscription add(subscription s) const
Definition: rx-subscriber.hpp:203
void clear() const
Definition: rx-subscriber.hpp:214
controls lifetime for scheduler::schedule and observable<T, SourceOperator>::subscribe.
Definition: rx-subscription.hpp:459
auto make_subscription() -> subscription
Definition: rx-subscription.hpp:219
subscription::weak_state_type weak_subscription
Definition: rx-subscription.hpp:465
Definition: rx-predef.hpp:113
weak_subscription add(subscription s) const
Definition: rx-subscription.hpp:508
static const bool value
Definition: rx-predef.hpp:97
subscriber(const subscriber< T, O > &o, typename std::enable_if< !std::is_same< O, observer< T >>::value &&std::is_same< Observer, observer< T >>::value, void **>::type=nullptr)
Definition: rx-subscriber.hpp:124
#define RXCPP_CATCH(...)
Definition: rx-util.hpp:39
typename std::decay< T >::type decay_t
Definition: rx-util.hpp:48
const composite_subscription & get_subscription() const
Definition: rx-subscriber.hpp:159
void unsubscribe() const
Definition: rx-subscriber.hpp:217
Definition: rx-subscription.hpp:31
Definition: rx-trace.hpp:14
static const bool value
Definition: rx-predef.hpp:123
observer_type & get_observer()
Definition: rx-subscriber.hpp:156
void on_next(V &&v) const
Definition: rx-subscriber.hpp:176
subscriber< T > as_dynamic() const
Definition: rx-subscriber.hpp:169
Definition: rx-subscriber.hpp:13
auto trace_activity() -> decltype(rxcpp_trace_activity(trace_tag()))&
Definition: rx-predef.hpp:15
tag_subscriber subscriber_tag
Definition: rx-subscriber.hpp:15
void on_error(rxu::error_ptr e) const
Definition: rx-subscriber.hpp:183
composite_subscription::weak_subscription weak_subscription
Definition: rx-subscriber.hpp:105
this_type & operator=(this_type o)
Definition: rx-subscriber.hpp:146
error_ptr current_exception()
Definition: rx-util.hpp:943
bool is_subscribed() const
Definition: rx-subscription.hpp:172
Definition: rx-observer.hpp:14
auto make_subscriber(subscriber< T, Observer > o) -> subscriber< T, Observer >
Definition: rx-subscriber.hpp:224
void unsubscribe() const
Definition: rx-subscription.hpp:178
Definition: rx-subscription.hpp:29
subscriber(this_type &&o)
Definition: rx-subscriber.hpp:113
static trace_id make_next_id_subscriber()
Definition: rx-trace.hpp:16
bool is_subscribed() const
Definition: rx-subscriber.hpp:200
binds an observer that consumes values with a composite_subscription that controls lifetime...
Definition: rx-subscriber.hpp:25
Definition: rx-predef.hpp:90
Definition: rx-subscription.hpp:67
void remove(weak_subscription w) const
Definition: rx-subscription.hpp:527
trace_id get_id() const
Definition: rx-subscriber.hpp:165
Definition: rx-predef.hpp:115
static const bool value
Definition: rx-subscription.hpp:38
subscriber(const this_type &o)
Definition: rx-subscriber.hpp:107
composite_subscription & get_subscription()
Definition: rx-subscriber.hpp:162