5 #if !defined(RXCPP_RX_SUBSCRIPTION_HPP) 6 #define RXCPP_RX_SUBSCRIPTION_HPP 15 struct is_unsubscribe_function
19 static auto check(
int) -> decltype((*(CF*)
nullptr)());
21 static not_void check(...);
23 static const bool value = std::is_same<decltype(check<rxu::decay_t<F>>(0)),
void>::value;
34 static typename C::subscription_tag* check(
int);
36 static void check(...);
41 template<
class Unsubscribe>
45 unsubscribe_call_type unsubscribe_call;
51 : unsubscribe_call(o.unsubscribe_call)
55 : unsubscribe_call(std::move(o.unsubscribe_call))
59 : unsubscribe_call(std::move(s))
69 class base_subscription_state :
public std::enable_shared_from_this<base_subscription_state>
71 base_subscription_state();
74 explicit base_subscription_state(
bool initial)
75 : issubscribed(initial)
78 virtual ~base_subscription_state() {}
81 std::atomic<bool> issubscribed;
88 struct subscription_state :
public base_subscription_state
91 subscription_state(inner_t i)
92 : base_subscription_state(
true)
97 if (issubscribed.exchange(
false)) {
107 std::shared_ptr<base_subscription_state>
state;
121 explicit subscription(std::shared_ptr<base_subscription_state> s)
122 :
state(std::move(s))
131 :
state(std::make_shared<base_subscription_state>(false))
139 :
state(std::make_shared<subscription_state<U>>(std::move(u)))
176 return state->issubscribed;
182 auto keepAlive =
state;
183 state->unsubscribe();
199 auto strong_subscription = w.lock();
200 if (!strong_subscription) {
201 return rxu::detail::maybe<subscription>{};
203 return rxu::detail::maybe<subscription>{
subscription{std::move(strong_subscription)}};
215 return !(lhs == rhs);
225 ->
typename std::enable_if<!is_subscription<I>::value && !detail::is_unsubscribe_function<I>::value,
229 template<
class Unsubscribe>
231 ->
typename std::enable_if<detail::is_unsubscribe_function<Unsubscribe>::value,
236 class composite_subscription;
240 struct tag_composite_subscription_empty {};
242 class composite_subscription_inner
246 struct composite_subscription_state :
public std::enable_shared_from_this<composite_subscription_state>
249 std::set<subscription> subscriptions;
256 std::atomic<bool> issubscribed;
258 ~composite_subscription_state()
260 std::unique_lock<decltype(lock)> guard(lock);
261 subscriptions.clear();
264 composite_subscription_state()
268 composite_subscription_state(tag_composite_subscription_empty)
269 : issubscribed(false)
284 inline weak_subscription add(subscription s) {
287 }
else if (s.is_subscribed()) {
288 std::unique_lock<decltype(lock)> guard(lock);
295 subscriptions.insert(s);
305 inline void remove(weak_subscription w) {
309 if (maybe_subscription.empty()) {
314 std::unique_lock<decltype(lock)> guard(lock);
318 subscription& s = maybe_subscription.get();
319 subscriptions.erase(std::move(s));
334 inline void clear() {
336 std::unique_lock<decltype(lock)> guard(lock);
343 std::set<subscription> v(std::move(subscriptions));
346 std::for_each(v.begin(), v.end(),
347 [](
const subscription& s) {
366 inline void unsubscribe() {
367 if (issubscribed.exchange(
false)) {
368 std::unique_lock<decltype(lock)> guard(lock);
373 std::set<subscription> v(std::move(subscriptions));
376 std::for_each(v.begin(), v.end(),
377 [](
const subscription& s) {
384 typedef std::shared_ptr<composite_subscription_state> shared_state_type;
387 mutable shared_state_type state;
390 composite_subscription_inner()
391 : state(std::make_shared<composite_subscription_state>())
394 composite_subscription_inner(tag_composite_subscription_empty et)
395 : state(std::make_shared<composite_subscription_state>(et))
399 composite_subscription_inner(
const composite_subscription_inner& o)
406 composite_subscription_inner(composite_subscription_inner&& o)
407 : state(std::move(o.state))
414 composite_subscription_inner& operator=(composite_subscription_inner o)
416 state = std::move(o.state);
423 inline weak_subscription add(subscription s)
const {
427 return state->add(std::move(s));
429 inline void remove(weak_subscription w)
const {
433 state->remove(std::move(w));
435 inline void clear()
const {
441 inline void unsubscribe() {
445 state->unsubscribe();
449 inline composite_subscription shared_empty();
460 :
protected detail::composite_subscription_inner
463 typedef detail::composite_subscription_inner inner_type;
487 : inner_type(std::move(o))
494 inner_type::operator=(std::move(o));
500 return detail::shared_empty();
506 using inner_type::clear;
509 if (s == static_cast<const subscription&>(*
this)) {
516 auto w = inner_type::add(std::move(s));
523 ->
typename std::enable_if<detail::is_unsubscribe_function<F>::value, weak_subscription>::type {
527 inline void remove(weak_subscription w)
const {
530 inner_type::remove(w);
536 return static_cast<const subscription&
>(lhs) < static_cast<const subscription&>(rhs);
539 return static_cast<const subscription&
>(lhs) == static_cast<const subscription&>(rhs);
542 return !(lhs == rhs);
547 inline composite_subscription shared_empty() {
548 static composite_subscription shared_empty = composite_subscription(tag_composite_subscription_empty());
562 ,
value(std::make_shared<rxu::detail::maybe<T>>())
568 ,
value(std::make_shared<rxu::detail::maybe<T>>(rxu::detail::maybe<T>(std::move(t))))
570 auto localValue =
value;
579 return value.get()->get();
593 ->
typename std::enable_if<detail::is_unsubscribe_function<F>::value,
weak_subscription>::type {
608 std::shared_ptr<rxu::detail::maybe<T>>
value;
composite_subscription lifetime
Definition: rx-subscription.hpp:607
static subscription lock(weak_state_type w)
Definition: rx-subscription.hpp:192
subscription(subscription &&o)
Definition: rx-subscription.hpp:161
Definition: rx-all.hpp:26
auto add(F f) const -> typename std::enable_if< detail::is_unsubscribe_function< F >::value, weak_subscription >::type
Definition: rx-subscription.hpp:522
static_subscription(const static_subscription &o)
Definition: rx-subscription.hpp:50
bool is_subscribed() const
Definition: rx-subscription.hpp:172
Definition: rx-subscription.hpp:555
controls lifetime for scheduler::schedule and observable<T, SourceOperator>::subscribe.
Definition: rx-subscription.hpp:459
bool operator<(const subscription &lhs, const subscription &rhs)
Definition: rx-subscription.hpp:208
Definition: rx-subscription.hpp:28
static rxu::maybe< subscription > maybe_lock(weak_state_type w)
Definition: rx-subscription.hpp:198
auto make_subscription() -> subscription
Definition: rx-subscription.hpp:219
subscription::weak_state_type weak_subscription
Definition: rx-subscription.hpp:465
void unsubscribe() const
Definition: rx-subscription.hpp:602
friend bool operator<(const subscription &, const subscription &)
Definition: rx-subscription.hpp:208
friend bool operator==(const subscription &, const subscription &)
Definition: rx-subscription.hpp:211
weak_subscription add(subscription s) const
Definition: rx-subscription.hpp:508
tag_subscription subscription_tag
Definition: rx-subscription.hpp:29
typename std::decay< T >::type decay_t
Definition: rx-util.hpp:48
subscription()
Definition: rx-subscription.hpp:130
subscription & operator=(subscription o)
Definition: rx-subscription.hpp:168
resource()
Definition: rx-subscription.hpp:560
std::shared_ptr< base_subscription_state > state
Definition: rx-subscription.hpp:107
Definition: rx-subscription.hpp:31
composite_subscription & get_subscription()
Definition: rx-subscription.hpp:581
composite_subscription(composite_subscription &&o)
Definition: rx-subscription.hpp:486
subscription(const subscription &o)
Definition: rx-subscription.hpp:154
weak_subscription add(subscription s) const
Definition: rx-subscription.hpp:588
static composite_subscription empty()
Definition: rx-subscription.hpp:499
subscription(U u, typename std::enable_if<!is_subscription< U >::value, void **>::type=nullptr)
Definition: rx-subscription.hpp:138
subscription(U u, typename std::enable_if<!std::is_same< subscription, U >::value &&is_subscription< U >::value, void **>::type=nullptr)
Definition: rx-subscription.hpp:146
auto trace_activity() -> decltype(rxcpp_trace_activity(trace_tag()))&
Definition: rx-predef.hpp:15
bool operator!=(const dynamic_grouped_observable< K, T > &lhs, const dynamic_grouped_observable< K, T > &rhs)
Definition: rx-grouped_observable.hpp:103
Definition: rx-subscription.hpp:42
composite_subscription & operator=(composite_subscription o)
Definition: rx-subscription.hpp:492
static_subscription(static_subscription &&o)
Definition: rx-subscription.hpp:54
bool operator==(const dynamic_grouped_observable< K, T > &lhs, const dynamic_grouped_observable< K, T > &rhs)
Definition: rx-grouped_observable.hpp:99
static_subscription(unsubscribe_call_type s)
Definition: rx-subscription.hpp:58
std::weak_ptr< base_subscription_state > weak_state_type
Definition: rx-subscription.hpp:84
bool is_subscribed() const
Definition: rx-subscription.hpp:172
composite_subscription(const composite_subscription &o)
Definition: rx-subscription.hpp:481
composite_subscription::weak_subscription weak_subscription
Definition: rx-subscription.hpp:558
void unsubscribe() const
Definition: rx-subscription.hpp:178
Definition: rx-subscription.hpp:29
bool is_subscribed() const
Definition: rx-subscription.hpp:585
void unsubscribe() const
Definition: rx-subscription.hpp:62
auto add(F f) const -> typename std::enable_if< detail::is_unsubscribe_function< F >::value, weak_subscription >::type
Definition: rx-subscription.hpp:592
weak_state_type get_weak()
Definition: rx-subscription.hpp:186
void unsubscribe() const
Definition: rx-subscription.hpp:178
composite_subscription(detail::tag_composite_subscription_empty et)
Definition: rx-subscription.hpp:467
composite_subscription()
Definition: rx-subscription.hpp:475
Definition: rx-subscription.hpp:67
void remove(weak_subscription w) const
Definition: rx-subscription.hpp:527
std::shared_ptr< rxu::detail::maybe< T > > value
Definition: rx-subscription.hpp:608
void clear() const
Definition: rx-subscription.hpp:599
resource(T t, composite_subscription cs=composite_subscription())
Definition: rx-subscription.hpp:566
static const bool value
Definition: rx-subscription.hpp:38