RxCpp
The Reactive Extensions for Native (RxCpp) is a library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators in both C and C++.
rx-eventloop.hpp
Go to the documentation of this file.
1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
5 #if !defined(RXCPP_RX_SCHEDULER_EVENT_LOOP_HPP)
6 #define RXCPP_RX_SCHEDULER_EVENT_LOOP_HPP
7 
8 #include "../rx-includes.hpp"
9 
10 namespace rxcpp {
11 
12 namespace schedulers {
13 
15 {
16 private:
17  typedef event_loop this_type;
18  event_loop(const this_type&);
19 
20  struct loop_worker : public worker_interface
21  {
22  private:
23  typedef loop_worker this_type;
24  loop_worker(const this_type&);
25 
26  typedef detail::schedulable_queue<
27  typename clock_type::time_point> queue_item_time;
28 
29  typedef queue_item_time::item_type item_type;
30 
31  composite_subscription lifetime;
32  worker controller;
33  std::shared_ptr<const scheduler_interface> alive;
34 
35  public:
36  virtual ~loop_worker()
37  {
38  }
39  loop_worker(composite_subscription cs, worker w, std::shared_ptr<const scheduler_interface> alive)
40  : lifetime(cs)
41  , controller(w)
42  , alive(alive)
43  {
44  auto token = controller.add(cs);
45  cs.add([token, w](){
46  w.remove(token);
47  });
48  }
49 
50  virtual clock_type::time_point now() const {
51  return clock_type::now();
52  }
53 
54  virtual void schedule(const schedulable& scbl) const {
55  controller.schedule(lifetime, scbl.get_action());
56  }
57 
58  virtual void schedule(clock_type::time_point when, const schedulable& scbl) const {
59  controller.schedule(when, lifetime, scbl.get_action());
60  }
61  };
62 
63  mutable thread_factory factory;
64  scheduler newthread;
65  mutable std::atomic<std::size_t> count;
66  composite_subscription loops_lifetime;
67  std::vector<worker> loops;
68 
69 public:
71  : factory([](std::function<void()> start){
72  return std::thread(std::move(start));
73  })
74  , newthread(make_new_thread())
75  , count(0)
76  {
77  auto remaining = std::max(std::thread::hardware_concurrency(), unsigned(4));
78  while (remaining--) {
79  loops.push_back(newthread.create_worker(loops_lifetime));
80  }
81  }
83  : factory(tf)
84  , newthread(make_new_thread(tf))
85  , count(0)
86  {
87  auto remaining = std::max(std::thread::hardware_concurrency(), unsigned(4));
88  while (remaining--) {
89  loops.push_back(newthread.create_worker(loops_lifetime));
90  }
91  }
92  virtual ~event_loop()
93  {
94  loops_lifetime.unsubscribe();
95  }
96 
97  virtual clock_type::time_point now() const {
98  return clock_type::now();
99  }
100 
102  return worker(cs, std::make_shared<loop_worker>(cs, loops[++count % loops.size()], this->shared_from_this()));
103  }
104 };
105 
107  static scheduler instance = make_scheduler<event_loop>();
108  return instance;
109 }
111  return make_scheduler<event_loop>(tf);
112 }
113 
114 }
115 
116 }
117 
118 #endif
Definition: rx-scheduler.hpp:163
void remove(weak_subscription w) const
Definition: rx-scheduler.hpp:240
Definition: rx-all.hpp:26
virtual clock_type::time_point now() const
Definition: rx-eventloop.hpp:97
virtual ~event_loop()
Definition: rx-eventloop.hpp:92
controls lifetime for scheduler::schedule and observable<T, SourceOperator>::subscribe.
Definition: rx-subscription.hpp:459
weak_subscription add(subscription s) const
Definition: rx-scheduler.hpp:237
std::function< std::thread(std::function< void()>)> thread_factory
Definition: rx-newthread.hpp:14
auto max() -> operator_factory< max_tag >
For each item from this observable reduce it by taking the max value of the previous items...
Definition: rx-reduce.hpp:496
weak_subscription add(subscription s) const
Definition: rx-subscription.hpp:508
worker create_worker(composite_subscription cs=composite_subscription()) const
Definition: rx-scheduler.hpp:412
Definition: rx-eventloop.hpp:14
const action & get_action() const
Definition: rx-scheduler.hpp:557
allows functions to be called at specified times and possibly in other contexts.
Definition: rx-scheduler.hpp:383
scheduler make_event_loop()
Definition: rx-eventloop.hpp:106
scheduler make_new_thread()
Definition: rx-newthread.hpp:169
virtual worker create_worker(composite_subscription cs) const
Definition: rx-eventloop.hpp:101
void schedule(const schedulable &scbl) const
insert the supplied schedulable to be run as soon as possible
Definition: rx-scheduler.hpp:258
Definition: rx-scheduler.hpp:353
void unsubscribe() const
Definition: rx-subscription.hpp:178
event_loop()
Definition: rx-eventloop.hpp:70
event_loop(thread_factory tf)
Definition: rx-eventloop.hpp:82
Definition: rx-scheduler.hpp:426
Definition: rx-scheduler.hpp:200