5 #if !defined(RXCPP_RX_REPLAYSUBJECT_HPP) 6 #define RXCPP_RX_REPLAYSUBJECT_HPP 8 #include "../rx-includes.hpp" 16 template<
class Coordination>
// Type aliases shared by the replay machinery, parameterized on Coordination.
// NOTE(review): the declaration that encloses these typedefs (numbered lines
// 17-18) is missing from this listing; it is presumably replay_traits, which is
// how later code refers to these names - TODO confirm.
// Size limit type: a std::size_t wrapped in rxu::maybe.
19 typedef rxu::maybe<std::size_t> count_type;
// Age limit type: a duration of the scheduler clock wrapped in rxu::maybe.
20 typedef rxu::maybe<rxsc::scheduler::clock_type::duration> period_type;
// Time point of the same scheduler clock; used to timestamp buffered values.
21 typedef rxsc::scheduler::clock_type::time_point time_point_type;
// Coordination passed through rxu::decay_t (presumably strips references and
// cv-qualifiers like std::decay_t - TODO confirm).
22 typedef rxu::decay_t<Coordination> coordination_type;
// Coordinator type exposed by the coordination.
23 typedef typename coordination_type::coordinator_type coordinator_type;
// Observer side of the replay subject. It derives from
// detail::multicast_observer<T> and, further down, forwards on_next to that base.
// NOTE(review): this listing omits braces and access specifiers, so member
// visibility cannot be determined from here.
26 template<
class T,
class Coordination>
27 class replay_observer :
public detail::multicast_observer<T>
// Shorthand for this class and for its base class.
29 typedef replay_observer<T, Coordination> this_type;
30 typedef detail::multicast_observer<T> base_type;
// Re-export the aliases from replay_traits<Coordination> so the rest of the
// class can use the short names.
32 typedef replay_traits<Coordination> traits;
33 typedef typename traits::count_type count_type;
34 typedef typename traits::period_type period_type;
35 typedef typename traits::time_point_type time_point_type;
36 typedef typename traits::coordination_type coordination_type;
37 typedef typename traits::coordinator_type coordinator_type;
// State object for replay_observer, held through a std::shared_ptr (see the
// `state` member of replay_observer). It stores the buffered values, their
// timestamps, the configured limits and the coordination objects.
// NOTE(review): this listing omits several numbered lines (braces, one
// signature and a few statements); the comments below describe only what is
// visible and hedge the rest.
39 class replay_observer_state :
public std::enable_shared_from_this<replay_observer_state>
// Every data member is `mutable` because the member functions that modify them
// are declared const.
// Mutex taken by the value-adding routine and by get().
41 mutable std::mutex lock;
// Buffered values, oldest at the front (new values are push_back'ed).
42 mutable std::list<T> values;
// Timestamps pushed alongside values when a period is configured.
43 mutable std::list<time_point_type> time_points;
// Size limit compared against values.size() when adding a value.
44 mutable count_type
count;
// Age limit compared against (now - oldest timestamp) when adding a value.
45 mutable period_type period;
// Subscription unsubscribed by the destructor.
46 mutable composite_subscription replayLifetime;
// Source of the current time (coordination.now()) for timestamping.
48 mutable coordination_type coordination;
// Coordinator handed out by replay_observer::get_coordinator().
49 mutable coordinator_type coordinator;
// Removes the oldest entry. The visible part pops the oldest timestamp, and
// only when a period is configured.
// NOTE(review): numbered line 53 is missing; presumably it pops the front of
// `values` as well - TODO confirm.
52 void remove_oldest()
const {
54 if (!period.empty()) {
55 time_points.pop_front();
// Unsubscribes replayLifetime when the state is destroyed.
60 ~replay_observer_state(){
61 replayLifetime.unsubscribe();
// Stores the limits, lifetime, coordination and coordinator.
// NOTE(review): the initializers on numbered lines 64-65 are missing;
// presumably they initialize `count` and `period` - TODO confirm.
63 explicit replay_observer_state(count_type _count, period_type _period, coordination_type _coordination, coordinator_type _coordinator, composite_subscription _replayLifetime)
66 , replayLifetime(_replayLifetime)
67 , coordination(std::move(_coordination))
68 , coordinator(std::move(_coordinator))
// Body of the routine that appends a value `v` to the buffer. Its signature
// (numbered line 72) is missing from this listing.
// The whole update runs under `lock`.
73 std::unique_lock<std::mutex> guard(lock);
// Size check: the buffer has reached the configured count.
// NOTE(review): the guarding line (75) and the action taken (77) are missing;
// presumably count is checked for emptiness first and the oldest entry is
// removed - TODO confirm.
76 if (values.size() ==
count.get())
// Age check: only active when a period is configured.
80 if (!period.empty()) {
81 auto now = coordination.now();
// Loops while the oldest timestamp is older than the period.
// NOTE(review): the loop body (line 83) is missing; presumably remove_oldest().
82 while (!time_points.empty() && (now - time_points.front() > period.get()))
// Record the arrival time of the new value.
84 time_points.push_back(now);
// Append the new value itself (moved into the list).
87 values.push_back(std::move(v));
// Returns a std::list<T> by value while holding `lock`.
// NOTE(review): the return statement (line 91) is missing; presumably it
// returns a copy of `values` - TODO confirm.
89 std::list<T>
get()
const {
90 std::unique_lock<std::mutex> guard(lock);
// Shared state holding the buffer, limits and coordinator.
95 std::shared_ptr<replay_observer_state> state;
// Builds the observer and its shared state.
//   count, period      - limits forwarded to the state
//   coordination       - used to create the coordinator, then moved into the state
//   replayLifetime     - lifetime handed to the coordinator and to the state
//   subscriberLifetime - passed to the multicast_observer base
98 replay_observer(count_type
count, period_type period, coordination_type coordination, composite_subscription replayLifetime, composite_subscription subscriberLifetime)
99 : base_type(subscriberLifetime)
// Nest the subscriber lifetime inside the replay lifetime (presumably so that
// unsubscribing the replay lifetime also ends the subscriber - TODO confirm).
101 replayLifetime.add(subscriberLifetime);
// The coordinator must be created before `coordination` is moved from below.
102 auto coordinator = coordination.create_coordinator(replayLifetime);
// All arguments are moved into the newly allocated state.
103 state = std::make_shared<replay_observer_state>(std::move(
count), std::move(period), std::move(coordination), std::move(coordinator), std::move(replayLifetime));
// Returns a subscriber<T> built from this observer's id and subscription, with
// *this wrapped in observer<T, detail::replay_observer<T, Coordination>>.
// The result of make_subscriber is converted with as_dynamic() before returning.
106 subscriber<T> get_subscriber()
const {
107 return make_subscriber<T>(this->get_id(), this->get_subscription(), observer<T, detail::replay_observer<T, Coordination>>(*this)).
as_dynamic();
// Returns a std::list<T> by value.
// NOTE(review): the body (numbered line 111) is missing from this listing;
// presumably it forwards to state->get() - TODO confirm.
110 std::list<T> get_values()
const {
// Returns a non-const reference to the coordinator stored in the shared state.
// This is allowed from a const member because the access goes through the
// `state` pointer and the `coordinator` member is declared mutable.
114 coordinator_type& get_coordinator()
const {
115 return state->coordinator;
// Receives a value `v` and forwards it (moved) to the multicast_observer base.
// NOTE(review): the declaration of V (numbered line 118, presumably a template
// header) and line 120 are missing from this listing. Line 120 presumably
// records `v` in the shared state before it is moved below - TODO confirm.
119 void on_next(V v)
const {
121 base_type::on_next(std::move(v));
// Fragments of the replay subject class template; it owns a
// detail::replay_observer<T, Coordination> and delegates to it.
// NOTE(review): the class header (numbered lines 128-129), the constructors and
// every member function signature are missing from this listing. The function
// names below are matched to the return statements using the "Definition:"
// line numbers listed at the end of this file.
127 template<
class T,
class Coordination>
// Aliases re-exported from detail::replay_traits<Coordination>.
130 typedef detail::replay_traits<Coordination> traits;
131 typedef typename traits::count_type count_type;
132 typedef typename traits::period_type period_type;
133 typedef typename traits::time_point_type time_point_type;
// The wrapped observer that all visible calls delegate to.
135 detail::replay_observer<T, Coordination> s;
// Body of has_observers() (defined at numbered line 158).
159 return s.has_observers();
// Body of get_values() (defined at numbered line 162).
163 return s.get_values();
// Body of get_subscriber() (defined at numbered line 166).
167 return s.get_subscriber();
// Part of get_observable() (defined at numbered line 170): registers the
// subscriber `o` on `keepAlive`. What `keepAlive` is and the surrounding lambda
// are not visible here - TODO confirm against the full source.
176 keepAlive.add(keepAlive.get_subscriber(), std::move(o));
auto count() -> operator_factory< reduce_tag, int, rxu::count, rxu::detail::take_at< 0 >>
For each item from this observable, reduce it by incrementing a count.
Definition: rx-reduce.hpp:412
Definition: rx-all.hpp:26
Controls lifetime for scheduler::schedule and observable<T, SourceOperator>::subscribe.
Definition: rx-subscription.hpp:459
replay(rxsc::scheduler::clock_type::duration period, Coordination cn, composite_subscription cs=composite_subscription())
Definition: rx-replaysubject.hpp:148
replay(std::size_t count, Coordination cn, composite_subscription cs=composite_subscription())
Definition: rx-replaysubject.hpp:143
observable< T > get_observable() const
Definition: rx-replaysubject.hpp:170
A source of values. Subscribe, or use one of the operator methods that return a new observable...
Definition: rx-observable.hpp:478
replay(Coordination cn, composite_subscription cs=composite_subscription())
Definition: rx-replaysubject.hpp:138
subscriber< T > get_subscriber() const
Definition: rx-replaysubject.hpp:166
Definition: rx-replaysubject.hpp:128
std::list< T > get_values() const
Definition: rx-replaysubject.hpp:162
auto as_dynamic() -> detail::dynamic_factory
Definition: rx-subscribe.hpp:117
Binds an observer that consumes values with a composite_subscription that controls lifetime...
Definition: rx-subscriber.hpp:25
replay(std::size_t count, rxsc::scheduler::clock_type::duration period, Coordination cn, composite_subscription cs=composite_subscription())
Definition: rx-replaysubject.hpp:153
bool has_observers() const
Definition: rx-replaysubject.hpp:158