RxCpp
The Reactive Extensions for Native (RxCpp) is a library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators in both C and C++.
rx-subscription.hpp
Go to the documentation of this file.
1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
5 #if !defined(RXCPP_RX_SUBSCRIPTION_HPP)
6 #define RXCPP_RX_SUBSCRIPTION_HPP
7 
8 #include "rx-includes.hpp"
9 
10 namespace rxcpp {
11 
12 namespace detail {
13 
14 template<class F>
15 struct is_unsubscribe_function
16 {
17  struct not_void {};
18  template<class CF>
19  static auto check(int) -> decltype((*(CF*)nullptr)());
20  template<class CF>
21  static not_void check(...);
22 
23  static const bool value = std::is_same<decltype(check<rxu::decay_t<F>>(0)), void>::value;
24 };
25 
26 }
27 
28 struct tag_subscription {};
30 template<class T>
32 {
33  template<class C>
34  static typename C::subscription_tag* check(int);
35  template<class C>
36  static void check(...);
37 public:
38  static const bool value = std::is_convertible<decltype(check<rxu::decay_t<T>>(0)), tag_subscription*>::value;
39 };
40 
41 template<class Unsubscribe>
43 {
44  typedef rxu::decay_t<Unsubscribe> unsubscribe_call_type;
45  unsubscribe_call_type unsubscribe_call;
47  {
48  }
49 public:
51  : unsubscribe_call(o.unsubscribe_call)
52  {
53  }
55  : unsubscribe_call(std::move(o.unsubscribe_call))
56  {
57  }
58  static_subscription(unsubscribe_call_type s)
59  : unsubscribe_call(std::move(s))
60  {
61  }
62  void unsubscribe() const {
63  unsubscribe_call();
64  }
65 };
66 
68 {
69  class base_subscription_state : public std::enable_shared_from_this<base_subscription_state>
70  {
71  base_subscription_state();
72  public:
73 
74  explicit base_subscription_state(bool initial)
75  : issubscribed(initial)
76  {
77  }
78  virtual ~base_subscription_state() {}
79  virtual void unsubscribe() {
80  }
81  std::atomic<bool> issubscribed;
82  };
83 public:
84  typedef std::weak_ptr<base_subscription_state> weak_state_type;
85 
86 private:
87  template<class I>
88  struct subscription_state : public base_subscription_state
89  {
90  typedef rxu::decay_t<I> inner_t;
91  subscription_state(inner_t i)
92  : base_subscription_state(true)
93  , inner(std::move(i))
94  {
95  }
96  virtual void unsubscribe() {
97  if (issubscribed.exchange(false)) {
98  trace_activity().unsubscribe_enter(*this);
99  inner.unsubscribe();
100  trace_activity().unsubscribe_return(*this);
101  }
102  }
103  inner_t inner;
104  };
105 
106 protected:
107  std::shared_ptr<base_subscription_state> state;
108 
109  friend bool operator<(const subscription&, const subscription&);
110  friend bool operator==(const subscription&, const subscription&);
111 
112 private:
114  : state(w.lock())
115  {
116  if (!state) {
117  std::terminate();
118  }
119  }
120 
121  explicit subscription(std::shared_ptr<base_subscription_state> s)
122  : state(std::move(s))
123  {
124  if (!state) {
125  std::terminate();
126  }
127  }
128 public:
129 
131  : state(std::make_shared<base_subscription_state>(false))
132  {
133  if (!state) {
134  std::terminate();
135  }
136  }
137  template<class U>
138  explicit subscription(U u, typename std::enable_if<!is_subscription<U>::value, void**>::type = nullptr)
139  : state(std::make_shared<subscription_state<U>>(std::move(u)))
140  {
141  if (!state) {
142  std::terminate();
143  }
144  }
145  template<class U>
146  explicit subscription(U u, typename std::enable_if<!std::is_same<subscription, U>::value && is_subscription<U>::value, void**>::type = nullptr)
147  // intentionally slice
148  : state(std::move((*static_cast<subscription*>(&u)).state))
149  {
150  if (!state) {
151  std::terminate();
152  }
153  }
155  : state(o.state)
156  {
157  if (!state) {
158  std::terminate();
159  }
160  }
162  : state(std::move(o.state))
163  {
164  if (!state) {
165  std::terminate();
166  }
167  }
169  state = std::move(o.state);
170  return *this;
171  }
172  bool is_subscribed() const {
173  if (!state) {
174  std::terminate();
175  }
176  return state->issubscribed;
177  }
178  void unsubscribe() const {
179  if (!state) {
180  std::terminate();
181  }
182  auto keepAlive = state;
183  state->unsubscribe();
184  }
185 
187  return state;
188  }
189 
190  // Atomically promote weak subscription to strong.
191  // Calls std::terminate if w has already expired.
193  return subscription(w);
194  }
195 
196  // Atomically try to promote weak subscription to strong.
197  // Returns an empty maybe<> if w has already expired.
198  static rxu::maybe<subscription> maybe_lock(weak_state_type w) {
199  auto strong_subscription = w.lock();
200  if (!strong_subscription) {
201  return rxu::detail::maybe<subscription>{};
202  } else {
203  return rxu::detail::maybe<subscription>{subscription{std::move(strong_subscription)}};
204  }
205  }
206 };
207 
208 inline bool operator<(const subscription& lhs, const subscription& rhs) {
209  return lhs.state < rhs.state;
210 }
211 inline bool operator==(const subscription& lhs, const subscription& rhs) {
212  return lhs.state == rhs.state;
213 }
214 inline bool operator!=(const subscription& lhs, const subscription& rhs) {
215  return !(lhs == rhs);
216 }
217 
218 
219 inline auto make_subscription()
220  -> subscription {
221  return subscription();
222 }
223 template<class I>
224 auto make_subscription(I&& i)
225  -> typename std::enable_if<!is_subscription<I>::value && !detail::is_unsubscribe_function<I>::value,
226  subscription>::type {
227  return subscription(std::forward<I>(i));
228 }
229 template<class Unsubscribe>
230 auto make_subscription(Unsubscribe&& u)
231  -> typename std::enable_if<detail::is_unsubscribe_function<Unsubscribe>::value,
232  subscription>::type {
233  return subscription(static_subscription<Unsubscribe>(std::forward<Unsubscribe>(u)));
234 }
235 
236 class composite_subscription;
237 
238 namespace detail {
239 
240 struct tag_composite_subscription_empty {};
241 
242 class composite_subscription_inner
243 {
244 private:
245  typedef subscription::weak_state_type weak_subscription;
246  struct composite_subscription_state : public std::enable_shared_from_this<composite_subscription_state>
247  {
248  // invariant: cannot access this data without the lock held.
249  std::set<subscription> subscriptions;
250  // double checked locking:
251  // issubscribed must be loaded again after each lock acquisition.
252  // invariant:
253  // never call subscription::unsubscribe with lock held.
254  std::mutex lock;
255  // invariant: transitions from 'true' to 'false' exactly once, at any time.
256  std::atomic<bool> issubscribed;
257 
258  ~composite_subscription_state()
259  {
260  std::unique_lock<decltype(lock)> guard(lock);
261  subscriptions.clear();
262  }
263 
264  composite_subscription_state()
265  : issubscribed(true)
266  {
267  }
268  composite_subscription_state(tag_composite_subscription_empty)
269  : issubscribed(false)
270  {
271  }
272 
273  // Atomically add 's' to the set of subscriptions.
274  //
275  // If unsubscribe() has already occurred, this immediately
276  // calls s.unsubscribe().
277  //
278  // cs.unsubscribe() [must] happens-before s.unsubscribe()
279  //
280  // Due to the un-atomic nature of calling 's.unsubscribe()',
281  // it is possible to observe the unintuitive
282  // add(s)=>s.unsubscribe() prior
283  // to any of the unsubscribe()=>sN.unsubscribe().
284  inline weak_subscription add(subscription s) {
285  if (!issubscribed) { // load.acq [seq_cst]
286  s.unsubscribe();
287  } else if (s.is_subscribed()) {
288  std::unique_lock<decltype(lock)> guard(lock);
289  if (!issubscribed) { // load.acq [seq_cst]
290  // unsubscribe was called concurrently.
291  guard.unlock();
292  // invariant: do not call unsubscribe with lock held.
293  s.unsubscribe();
294  } else {
295  subscriptions.insert(s);
296  }
297  }
298  return s.get_weak();
299  }
300 
301  // Atomically remove 'w' from the set of subscriptions.
302  //
303  // This does nothing if 'w' was already previously removed,
304  // or refers to an expired value.
305  inline void remove(weak_subscription w) {
306  if (issubscribed) { // load.acq [seq_cst]
307  rxu::maybe<subscription> maybe_subscription = subscription::maybe_lock(w);
308 
309  if (maybe_subscription.empty()) {
310  // Do nothing if the subscription has already expired.
311  return;
312  }
313 
314  std::unique_lock<decltype(lock)> guard(lock);
315  // invariant: subscriptions must be accessed under the lock.
316 
317  if (issubscribed) { // load.acq [seq_cst]
318  subscription& s = maybe_subscription.get();
319  subscriptions.erase(std::move(s));
320  } // else unsubscribe() was called concurrently; this becomes a no-op.
321  }
322  }
323 
324  // Atomically clear all subscriptions that were observably added
325  // (and not subsequently observably removed).
326  //
327  // Un-atomically call unsubscribe on those subscriptions.
328  //
329  // forall subscriptions in {add(s1),add(s2),...}
330  // - {remove(s3), remove(s4), ...}:
331  // cs.unsubscribe() || cs.clear() happens before s.unsubscribe()
332  //
333  // cs.unsubscribe() observed-before cs.clear ==> do nothing.
334  inline void clear() {
335  if (issubscribed) { // load.acq [seq_cst]
336  std::unique_lock<decltype(lock)> guard(lock);
337 
338  if (!issubscribed) { // load.acq [seq_cst]
339  // unsubscribe was called concurrently.
340  return;
341  }
342 
343  std::set<subscription> v(std::move(subscriptions));
344  // invariant: do not call unsubscribe with lock held.
345  guard.unlock();
346  std::for_each(v.begin(), v.end(),
347  [](const subscription& s) {
348  s.unsubscribe(); });
349  }
350  }
351 
352  // Atomically clear all subscriptions that were observably added
353  // (and not subsequently observably removed).
354  //
355  // Un-atomically call unsubscribe on those subscriptions.
356  //
357  // Switches to an 'unsubscribed' state, all subsequent
358  // adds are immediately unsubscribed.
359  //
360  // cs.unsubscribe() [must] happens-before
361  // cs.add(s) ==> s.unsubscribe()
362  //
363  // forall subscriptions in {add(s1),add(s2),...}
364  // - {remove(s3), remove(s4), ...}:
365  // cs.unsubscribe() || cs.clear() happens before s.unsubscribe()
366  inline void unsubscribe() {
367  if (issubscribed.exchange(false)) { // cas.acq_rel [seq_cst]
368  std::unique_lock<decltype(lock)> guard(lock);
369 
370  // is_subscribed can only transition to 'false' once,
371  // does not need an extra atomic access here.
372 
373  std::set<subscription> v(std::move(subscriptions));
374  // invariant: do not call unsubscribe with lock held.
375  guard.unlock();
376  std::for_each(v.begin(), v.end(),
377  [](const subscription& s) {
378  s.unsubscribe(); });
379  }
380  }
381  };
382 
383 public:
384  typedef std::shared_ptr<composite_subscription_state> shared_state_type;
385 
386 protected:
387  mutable shared_state_type state;
388 
389 public:
390  composite_subscription_inner()
391  : state(std::make_shared<composite_subscription_state>())
392  {
393  }
394  composite_subscription_inner(tag_composite_subscription_empty et)
395  : state(std::make_shared<composite_subscription_state>(et))
396  {
397  }
398 
399  composite_subscription_inner(const composite_subscription_inner& o)
400  : state(o.state)
401  {
402  if (!state) {
403  std::terminate();
404  }
405  }
406  composite_subscription_inner(composite_subscription_inner&& o)
407  : state(std::move(o.state))
408  {
409  if (!state) {
410  std::terminate();
411  }
412  }
413 
414  composite_subscription_inner& operator=(composite_subscription_inner o)
415  {
416  state = std::move(o.state);
417  if (!state) {
418  std::terminate();
419  }
420  return *this;
421  }
422 
423  inline weak_subscription add(subscription s) const {
424  if (!state) {
425  std::terminate();
426  }
427  return state->add(std::move(s));
428  }
429  inline void remove(weak_subscription w) const {
430  if (!state) {
431  std::terminate();
432  }
433  state->remove(std::move(w));
434  }
435  inline void clear() const {
436  if (!state) {
437  std::terminate();
438  }
439  state->clear();
440  }
441  inline void unsubscribe() {
442  if (!state) {
443  std::terminate();
444  }
445  state->unsubscribe();
446  }
447 };
448 
449 inline composite_subscription shared_empty();
450 
451 }
452 
460  : protected detail::composite_subscription_inner
461  , public subscription
462 {
463  typedef detail::composite_subscription_inner inner_type;
464 public:
466 
467  composite_subscription(detail::tag_composite_subscription_empty et)
468  : inner_type(et)
469  , subscription() // use empty base
470  {
471  }
472 
473 public:
474 
476  : inner_type()
477  , subscription(*static_cast<const inner_type*>(this))
478  {
479  }
480 
482  : inner_type(o)
483  , subscription(static_cast<const subscription&>(o))
484  {
485  }
487  : inner_type(std::move(o))
488  , subscription(std::move(static_cast<subscription&>(o)))
489  {
490  }
491 
493  {
494  inner_type::operator=(std::move(o));
495  subscription::operator=(std::move(*static_cast<subscription*>(&o)));
496  return *this;
497  }
498 
499  static inline composite_subscription empty() {
500  return detail::shared_empty();
501  }
502 
505 
506  using inner_type::clear;
507 
508  inline weak_subscription add(subscription s) const {
509  if (s == static_cast<const subscription&>(*this)) {
510  // do not nest the same subscription
511  std::terminate();
512  //return s.get_weak();
513  }
514  auto that = this->subscription::state.get();
515  trace_activity().subscription_add_enter(*that, s);
516  auto w = inner_type::add(std::move(s));
517  trace_activity().subscription_add_return(*that);
518  return w;
519  }
520 
521  template<class F>
522  auto add(F f) const
523  -> typename std::enable_if<detail::is_unsubscribe_function<F>::value, weak_subscription>::type {
524  return add(make_subscription(std::move(f)));
525  }
526 
527  inline void remove(weak_subscription w) const {
528  auto that = this->subscription::state.get();
529  trace_activity().subscription_remove_enter(*that, w);
530  inner_type::remove(w);
531  trace_activity().subscription_remove_return(*that);
532  }
533 };
534 
535 inline bool operator<(const composite_subscription& lhs, const composite_subscription& rhs) {
536  return static_cast<const subscription&>(lhs) < static_cast<const subscription&>(rhs);
537 }
538 inline bool operator==(const composite_subscription& lhs, const composite_subscription& rhs) {
539  return static_cast<const subscription&>(lhs) == static_cast<const subscription&>(rhs);
540 }
541 inline bool operator!=(const composite_subscription& lhs, const composite_subscription& rhs) {
542  return !(lhs == rhs);
543 }
544 
545 namespace detail {
546 
547 inline composite_subscription shared_empty() {
548  static composite_subscription shared_empty = composite_subscription(tag_composite_subscription_empty());
549  return shared_empty;
550 }
551 
552 }
553 
554 template<class T>
556 {
557 public:
559 
562  , value(std::make_shared<rxu::detail::maybe<T>>())
563  {
564  }
565 
567  : lifetime(std::move(cs))
568  , value(std::make_shared<rxu::detail::maybe<T>>(rxu::detail::maybe<T>(std::move(t))))
569  {
570  auto localValue = value;
571  lifetime.add(
572  [localValue](){
573  localValue->reset();
574  }
575  );
576  }
577 
578  T& get() {
579  return value.get()->get();
580  }
582  return lifetime;
583  }
584 
585  bool is_subscribed() const {
586  return lifetime.is_subscribed();
587  }
589  return lifetime.add(std::move(s));
590  }
591  template<class F>
592  auto add(F f) const
593  -> typename std::enable_if<detail::is_unsubscribe_function<F>::value, weak_subscription>::type {
594  return lifetime.add(make_subscription(std::move(f)));
595  }
596  void remove(weak_subscription w) const {
597  return lifetime.remove(std::move(w));
598  }
599  void clear() const {
600  return lifetime.clear();
601  }
602  void unsubscribe() const {
603  return lifetime.unsubscribe();
604  }
605 
606 protected:
608  std::shared_ptr<rxu::detail::maybe<T>> value;
609 };
610 
611 }
612 
613 #endif
composite_subscription lifetime
Definition: rx-subscription.hpp:607
static subscription lock(weak_state_type w)
Definition: rx-subscription.hpp:192
subscription(subscription &&o)
Definition: rx-subscription.hpp:161
Definition: rx-all.hpp:26
auto add(F f) const -> typename std::enable_if< detail::is_unsubscribe_function< F >::value, weak_subscription >::type
Definition: rx-subscription.hpp:522
static_subscription(const static_subscription &o)
Definition: rx-subscription.hpp:50
bool is_subscribed() const
Definition: rx-subscription.hpp:172
Definition: rx-subscription.hpp:555
controls lifetime for scheduler::schedule and observable<T, SourceOperator>::subscribe.
Definition: rx-subscription.hpp:459
bool operator<(const subscription &lhs, const subscription &rhs)
Definition: rx-subscription.hpp:208
Definition: rx-subscription.hpp:28
static rxu::maybe< subscription > maybe_lock(weak_state_type w)
Definition: rx-subscription.hpp:198
auto make_subscription() -> subscription
Definition: rx-subscription.hpp:219
subscription::weak_state_type weak_subscription
Definition: rx-subscription.hpp:465
void unsubscribe() const
Definition: rx-subscription.hpp:602
friend bool operator<(const subscription &, const subscription &)
Definition: rx-subscription.hpp:208
friend bool operator==(const subscription &, const subscription &)
Definition: rx-subscription.hpp:211
weak_subscription add(subscription s) const
Definition: rx-subscription.hpp:508
tag_subscription subscription_tag
Definition: rx-subscription.hpp:29
typename std::decay< T >::type decay_t
Definition: rx-util.hpp:48
subscription()
Definition: rx-subscription.hpp:130
subscription & operator=(subscription o)
Definition: rx-subscription.hpp:168
resource()
Definition: rx-subscription.hpp:560
std::shared_ptr< base_subscription_state > state
Definition: rx-subscription.hpp:107
Definition: rx-subscription.hpp:31
composite_subscription & get_subscription()
Definition: rx-subscription.hpp:581
composite_subscription(composite_subscription &&o)
Definition: rx-subscription.hpp:486
subscription(const subscription &o)
Definition: rx-subscription.hpp:154
weak_subscription add(subscription s) const
Definition: rx-subscription.hpp:588
static composite_subscription empty()
Definition: rx-subscription.hpp:499
subscription(U u, typename std::enable_if<!is_subscription< U >::value, void **>::type=nullptr)
Definition: rx-subscription.hpp:138
subscription(U u, typename std::enable_if<!std::is_same< subscription, U >::value &&is_subscription< U >::value, void **>::type=nullptr)
Definition: rx-subscription.hpp:146
auto trace_activity() -> decltype(rxcpp_trace_activity(trace_tag()))&
Definition: rx-predef.hpp:15
bool operator!=(const dynamic_grouped_observable< K, T > &lhs, const dynamic_grouped_observable< K, T > &rhs)
Definition: rx-grouped_observable.hpp:103
Definition: rx-subscription.hpp:42
composite_subscription & operator=(composite_subscription o)
Definition: rx-subscription.hpp:492
static_subscription(static_subscription &&o)
Definition: rx-subscription.hpp:54
bool operator==(const dynamic_grouped_observable< K, T > &lhs, const dynamic_grouped_observable< K, T > &rhs)
Definition: rx-grouped_observable.hpp:99
static_subscription(unsubscribe_call_type s)
Definition: rx-subscription.hpp:58
std::weak_ptr< base_subscription_state > weak_state_type
Definition: rx-subscription.hpp:84
bool is_subscribed() const
Definition: rx-subscription.hpp:172
composite_subscription(const composite_subscription &o)
Definition: rx-subscription.hpp:481
composite_subscription::weak_subscription weak_subscription
Definition: rx-subscription.hpp:558
void unsubscribe() const
Definition: rx-subscription.hpp:178
Definition: rx-subscription.hpp:29
bool is_subscribed() const
Definition: rx-subscription.hpp:585
void unsubscribe() const
Definition: rx-subscription.hpp:62
auto add(F f) const -> typename std::enable_if< detail::is_unsubscribe_function< F >::value, weak_subscription >::type
Definition: rx-subscription.hpp:592
weak_state_type get_weak()
Definition: rx-subscription.hpp:186
void unsubscribe() const
Definition: rx-subscription.hpp:178
composite_subscription(detail::tag_composite_subscription_empty et)
Definition: rx-subscription.hpp:467
composite_subscription()
Definition: rx-subscription.hpp:475
Definition: rx-subscription.hpp:67
void remove(weak_subscription w) const
Definition: rx-subscription.hpp:527
std::shared_ptr< rxu::detail::maybe< T > > value
Definition: rx-subscription.hpp:608
void clear() const
Definition: rx-subscription.hpp:599
resource(T t, composite_subscription cs=composite_subscription())
Definition: rx-subscription.hpp:566
static const bool value
Definition: rx-subscription.hpp:38