30 #if !defined(RXCPP_OPERATORS_RX_GROUP_BY_HPP) 31 #define RXCPP_OPERATORS_RX_GROUP_BY_HPP 33 #include "../rx-includes.hpp" 42 struct group_by_invalid_arguments {};
45 struct group_by_invalid :
public rxo::operator_base<group_by_invalid_arguments<AN...>> {
46 using type = observable<group_by_invalid_arguments<
AN...>, group_by_invalid<
AN...>>;
49 using group_by_invalid_t =
typename group_by_invalid<
AN...>::type;
51 template<
class T,
class Selector>
52 struct is_group_by_selector_for {
54 typedef rxu::decay_t<Selector> selector_type;
55 typedef T source_value_type;
57 struct tag_not_valid {};
58 template<
class CV,
class CS>
59 static auto check(
int) -> decltype((*(CS*)
nullptr)(*(CV*)
nullptr));
60 template<
class CV,
class CS>
61 static tag_not_valid check(...);
63 typedef decltype(check<source_value_type, selector_type>(0)) type;
64 static const bool value = !std::is_same<type, tag_not_valid>::value;
67 template<
class T,
class Observable,
class KeySelector,
class MarbleSelector,
class BinaryPredicate,
class DurationSelector>
68 struct group_by_traits
70 typedef T source_value_type;
71 typedef rxu::decay_t<Observable> source_type;
72 typedef rxu::decay_t<KeySelector> key_selector_type;
73 typedef rxu::decay_t<MarbleSelector> marble_selector_type;
74 typedef rxu::decay_t<BinaryPredicate> predicate_type;
75 typedef rxu::decay_t<DurationSelector> duration_selector_type;
77 static_assert(is_group_by_selector_for<source_value_type, key_selector_type>::value,
"group_by KeySelector must be a function with the signature key_type(source_value_type)");
79 typedef typename is_group_by_selector_for<source_value_type, key_selector_type>::type key_type;
81 static_assert(is_group_by_selector_for<source_value_type, marble_selector_type>::value,
"group_by MarbleSelector must be a function with the signature marble_type(source_value_type)");
83 typedef typename is_group_by_selector_for<source_value_type, marble_selector_type>::type marble_type;
85 typedef rxsub::subject<marble_type> subject_type;
87 typedef std::map<key_type, typename subject_type::subscriber_type, predicate_type> key_subscriber_map_type;
89 typedef grouped_observable<key_type, marble_type> grouped_observable_type;
92 template<
class T,
class Observable,
class KeySelector,
class MarbleSelector,
class BinaryPredicate,
class DurationSelector>
95 typedef group_by_traits<T, Observable, KeySelector, MarbleSelector, BinaryPredicate, DurationSelector> traits_type;
96 typedef typename traits_type::key_selector_type key_selector_type;
97 typedef typename traits_type::marble_selector_type marble_selector_type;
98 typedef typename traits_type::marble_type marble_type;
99 typedef typename traits_type::predicate_type predicate_type;
100 typedef typename traits_type::duration_selector_type duration_selector_type;
101 typedef typename traits_type::subject_type subject_type;
102 typedef typename traits_type::key_type key_type;
104 typedef typename traits_type::key_subscriber_map_type group_map_type;
105 typedef std::vector<typename composite_subscription::weak_subscription> bindings_type;
107 struct group_by_state_type
109 group_by_state_type(composite_subscription sl, predicate_type p)
110 : source_lifetime(sl)
114 composite_subscription source_lifetime;
116 group_map_type groups;
117 std::atomic<int> observers;
120 template<
class Subscriber>
121 static void stopsource(Subscriber&& dest, std::shared_ptr<group_by_state_type>& state) {
124 if (!state->source_lifetime.is_subscribed()) {
128 if (state->observers == 0) {
129 state->source_lifetime.unsubscribe();
134 struct group_by_values
136 group_by_values(key_selector_type ks, marble_selector_type ms, predicate_type p, duration_selector_type ds)
137 : keySelector(std::move(ks))
138 , marbleSelector(std::move(ms))
139 , predicate(std::move(p))
140 , durationSelector(std::move(ds))
143 mutable key_selector_type keySelector;
144 mutable marble_selector_type marbleSelector;
145 mutable predicate_type predicate;
146 mutable duration_selector_type durationSelector;
149 group_by_values initial;
151 group_by(key_selector_type ks, marble_selector_type ms, predicate_type p, duration_selector_type ds)
152 : initial(std::move(ks), std::move(ms), std::move(p), std::move(ds))
156 struct group_by_observable :
public rxs::source_base<marble_type>
158 mutable std::shared_ptr<group_by_state_type> state;
159 subject_type subject;
162 group_by_observable(std::shared_ptr<group_by_state_type> st, subject_type s, key_type k)
163 : state(std::move(st))
164 , subject(std::move(s))
169 template<
class Subscriber>
170 void on_subscribe(Subscriber&& o)
const {
171 group_by::stopsource(o, state);
172 subject.get_observable().subscribe(std::forward<Subscriber>(o));
175 key_type on_get_key() {
180 template<
class Subscriber>
181 struct group_by_observer :
public group_by_values
183 typedef group_by_observer<Subscriber> this_type;
184 typedef typename traits_type::grouped_observable_type value_type;
185 typedef rxu::decay_t<Subscriber> dest_type;
186 typedef observer<T, this_type> observer_type;
190 mutable std::shared_ptr<group_by_state_type> state;
192 group_by_observer(composite_subscription l, dest_type d, group_by_values v)
195 , state(std::make_shared<group_by_state_type>(l, group_by_values::predicate))
197 group_by::stopsource(dest, state);
199 void on_next(T v)
const {
202 return this->keySelector(v);},
204 if (selectedKey.empty()) {
207 auto g = state->groups.find(selectedKey.get());
208 if (g == state->groups.end()) {
209 if (!dest.is_subscribed()) {
212 auto sub = subject_type();
213 g = state->groups.insert(std::make_pair(selectedKey.get(), sub.get_subscriber())).first;
214 auto obs = make_dynamic_grouped_observable<key_type, marble_type>(group_by_observable(state, sub, selectedKey.get()));
217 return this->durationSelector(obs);},
219 if (durationObs.empty()) {
224 composite_subscription duration_sub;
225 auto ssub = state->source_lifetime.add(duration_sub);
227 auto expire_state = state;
228 auto expire_dest = g->second;
229 auto expire = [=]() {
230 auto g = expire_state->groups.find(selectedKey.get());
231 if (g != expire_state->groups.end()) {
232 expire_state->groups.erase(g);
233 expire_dest.on_completed();
235 expire_state->source_lifetime.remove(ssub);
237 auto robs = durationObs.get().take(1);
238 duration_sub.add(robs.subscribe(
239 [](
const typename decltype(robs)::value_type &){},
246 return this->marbleSelector(v);},
248 if (selectedMarble.empty()) {
251 g->second.on_next(std::move(selectedMarble.get()));
254 for(
auto& g : state->groups) {
255 g.second.on_error(e);
259 void on_completed()
const {
260 for(
auto& g : state->groups) {
261 g.second.on_completed();
266 static subscriber<T, observer_type> make(dest_type d, group_by_values v) {
267 auto cs = composite_subscription();
268 return make_subscriber<T>(cs, observer_type(this_type(cs, std::move(d), std::move(v))));
272 template<
class Subscriber>
273 auto operator()(Subscriber dest)
const 274 -> decltype(group_by_observer<Subscriber>::make(std::move(dest), initial)) {
275 return group_by_observer<Subscriber>::make(std::move(dest), initial);
279 template<
class KeySelector,
class MarbleSelector,
class BinaryPredicate,
class DurationSelector>
280 class group_by_factory
282 typedef rxu::decay_t<KeySelector> key_selector_type;
283 typedef rxu::decay_t<MarbleSelector> marble_selector_type;
284 typedef rxu::decay_t<BinaryPredicate> predicate_type;
285 typedef rxu::decay_t<DurationSelector> duration_selector_type;
286 key_selector_type keySelector;
287 marble_selector_type marbleSelector;
288 predicate_type predicate;
289 duration_selector_type durationSelector;
291 group_by_factory(key_selector_type ks, marble_selector_type ms, predicate_type p, duration_selector_type ds)
292 : keySelector(std::move(ks))
293 , marbleSelector(std::move(ms))
294 , predicate(std::move(p))
295 , durationSelector(std::move(ds))
298 template<
class Observable>
299 struct group_by_factory_traits
301 typedef rxu::value_type_t<rxu::decay_t<Observable>> value_type;
302 typedef detail::group_by_traits<value_type, Observable, KeySelector, MarbleSelector, BinaryPredicate, DurationSelector> traits_type;
303 typedef detail::group_by<value_type, Observable, KeySelector, MarbleSelector, BinaryPredicate, DurationSelector> group_by_type;
305 template<
class Observable>
306 auto operator()(Observable&& source)
307 -> decltype(source.template
lift<
typename group_by_factory_traits<Observable>::traits_type::grouped_observable_type>(
typename group_by_factory_traits<Observable>::group_by_type(std::move(keySelector), std::move(marbleSelector), std::move(predicate), std::move(durationSelector)))) {
308 return source.template lift<typename group_by_factory_traits<Observable>::traits_type::grouped_observable_type>(
typename group_by_factory_traits<Observable>::group_by_type(std::move(keySelector), std::move(marbleSelector), std::move(predicate), std::move(durationSelector)));
316 template<
class...
AN>
327 template<
class Observable,
class KeySelector,
class MarbleSelector,
class BinaryPredicate,
class DurationSelector,
329 class Traits = rxo::detail::group_by_traits<SourceValue, rxu::decay_t<Observable>, KeySelector, MarbleSelector, BinaryPredicate, DurationSelector>,
331 class Value =
typename Traits::grouped_observable_type>
332 static auto member(Observable&& o, KeySelector&& ks, MarbleSelector&& ms, BinaryPredicate&& p, DurationSelector&& ds)
333 -> decltype(o.template lift<Value>(GroupBy(std::forward<KeySelector>(ks), std::forward<MarbleSelector>(ms), std::forward<BinaryPredicate>(p), std::forward<DurationSelector>(ds)))) {
334 return o.template lift<Value>(GroupBy(std::forward<KeySelector>(ks), std::forward<MarbleSelector>(ms), std::forward<BinaryPredicate>(p), std::forward<DurationSelector>(ds)));
337 template<
class Observable,
class KeySelector,
class MarbleSelector,
class BinaryPredicate,
340 class Traits = rxo::detail::group_by_traits<SourceValue, rxu::decay_t<Observable>, KeySelector, MarbleSelector, BinaryPredicate, DurationSelector>,
342 class Value =
typename Traits::grouped_observable_type>
343 static auto member(Observable&& o, KeySelector&& ks, MarbleSelector&& ms, BinaryPredicate&& p)
344 -> decltype(o.template lift<Value>(GroupBy(std::forward<KeySelector>(ks), std::forward<MarbleSelector>(ms), std::forward<BinaryPredicate>(p),
rxu::ret<
observable<
int, rxs::detail::never<int>>>()))) {
345 return o.template lift<Value>(GroupBy(std::forward<KeySelector>(ks), std::forward<MarbleSelector>(ms), std::forward<BinaryPredicate>(p),
rxu::ret<
observable<
int, rxs::detail::never<int>>>()));
348 template<
class Observable,
class KeySelector,
class MarbleSelector,
352 class Traits = rxo::detail::group_by_traits<SourceValue, rxu::decay_t<Observable>, KeySelector, MarbleSelector, BinaryPredicate, DurationSelector>,
354 class Value =
typename Traits::grouped_observable_type>
355 static auto member(Observable&& o, KeySelector&& ks, MarbleSelector&& ms)
356 -> decltype(o.template lift<Value>(GroupBy(std::forward<KeySelector>(ks), std::forward<MarbleSelector>(ms),
rxu::less(),
rxu::ret<
observable<
int, rxs::detail::never<int>>>()))) {
357 return o.template lift<Value>(GroupBy(std::forward<KeySelector>(ks), std::forward<MarbleSelector>(ms),
rxu::less(),
rxu::ret<
observable<
int, rxs::detail::never<int>>>()));
361 template<
class Observable,
class KeySelector,
362 class MarbleSelector=rxu::detail::take_at<0>,
366 class Traits = rxo::detail::group_by_traits<SourceValue, rxu::decay_t<Observable>, KeySelector, MarbleSelector, BinaryPredicate, DurationSelector>,
368 class Value =
typename Traits::grouped_observable_type>
369 static auto member(Observable&& o, KeySelector&& ks)
370 -> decltype(o.template lift<Value>(GroupBy(std::forward<KeySelector>(ks), rxu::detail::take_at<0>(),
rxu::less(),
rxu::ret<
observable<
int, rxs::detail::never<int>>>()))) {
371 return o.template lift<Value>(GroupBy(std::forward<KeySelector>(ks), rxu::detail::take_at<0>(),
rxu::less(),
rxu::ret<
observable<
int, rxs::detail::never<int>>>()));
374 template<
class Observable,
375 class KeySelector=rxu::detail::take_at<0>,
376 class MarbleSelector=rxu::detail::take_at<0>,
382 class Traits = rxo::detail::group_by_traits<SourceValue, rxu::decay_t<Observable>, KeySelector, MarbleSelector, BinaryPredicate, DurationSelector>,
384 class Value =
typename Traits::grouped_observable_type>
386 -> decltype(o.template lift<Value>(GroupBy(rxu::detail::take_at<0>(), rxu::detail::take_at<0>(),
rxu::less(),
rxu::ret<
observable<
int, rxs::detail::never<int>>>()))) {
387 return o.template lift<Value>(GroupBy(rxu::detail::take_at<0>(), rxu::detail::take_at<0>(),
rxu::less(),
rxu::ret<
observable<
int, rxs::detail::never<int>>>()));
390 template<
class...
AN>
391 static operators::detail::group_by_invalid_t<
AN...>
member(
const AN&...) {
394 static_assert(
sizeof...(
AN) == 10000,
"group_by takes (optional KeySelector, optional MarbleSelector, optional BinaryKeyPredicate, optional DurationSelector), KeySelector takes (Observable::value_type) -> KeyValue, MarbleSelector takes (Observable::value_type) -> MarbleValue, BinaryKeyPredicate takes (KeyValue, KeyValue) -> bool, DurationSelector takes (Observable::value_type) -> Observable");
Definition: rx-util.hpp:112
std::shared_ptr< util::detail::error_base > error_ptr
Definition: rx-util.hpp:874
Definition: rx-all.hpp:26
typename std::decay< T >::type::value_type value_type_t
Definition: rx-util.hpp:47
Definition: rx-operators.hpp:69
Definition: rx-operators.hpp:234
auto AN
Definition: rx-finally.hpp:105
typename std::decay< T >::type decay_t
Definition: rx-util.hpp:48
Definition: rx-operators.hpp:47
typename std::enable_if< all_true_type< BN... >::value >::type enable_if_all_true_type_t
Definition: rx-util.hpp:126
a source of values. subscribe or use one of the operator methods that return a new observable...
Definition: rx-observable.hpp:478
static auto member(Observable &&o, KeySelector &&ks, MarbleSelector &&ms, BinaryPredicate &&p) -> decltype(o.template lift< Value >(GroupBy(std::forward< KeySelector >(ks), std::forward< MarbleSelector >(ms), std::forward< BinaryPredicate >(p), rxu::ret< observable< int, rxs::detail::never< int >>>())))
Definition: rx-group_by.hpp:343
Definition: rx-util.hpp:432
static auto member(Observable &&o) -> decltype(o.template lift< Value >(GroupBy(rxu::detail::take_at< 0 >(), rxu::detail::take_at< 0 >(), rxu::less(), rxu::ret< observable< int, rxs::detail::never< int >>>())))
Definition: rx-group_by.hpp:385
auto lift(Operator &&op) -> detail::lift_factory< ResultType, Operator >
Definition: rx-lift.hpp:101
Definition: rx-util.hpp:423
auto on_exception(const F &f, const OnError &c) -> typename std::enable_if< detail::is_on_error< OnError >::value, typename detail::maybe_from_result< F >::type >::type
Definition: rx-observer.hpp:640
static auto member(Observable &&o, KeySelector &&ks) -> decltype(o.template lift< Value >(GroupBy(std::forward< KeySelector >(ks), rxu::detail::take_at< 0 >(), rxu::less(), rxu::ret< observable< int, rxs::detail::never< int >>>())))
Definition: rx-group_by.hpp:369
static operators::detail::group_by_invalid_t< AN... > member(const AN &...)
Definition: rx-group_by.hpp:391
static auto member(Observable &&o, KeySelector &&ks, MarbleSelector &&ms) -> decltype(o.template lift< Value >(GroupBy(std::forward< KeySelector >(ks), std::forward< MarbleSelector >(ms), rxu::less(), rxu::ret< observable< int, rxs::detail::never< int >>>())))
Definition: rx-group_by.hpp:355
auto group_by(AN &&... an) -> operator_factory< group_by_tag, AN... >
Return an observable that emits grouped_observables, each of which corresponds to a unique key value ...
Definition: rx-group_by.hpp:317
static auto member(Observable &&o, KeySelector &&ks, MarbleSelector &&ms, BinaryPredicate &&p, DurationSelector &&ds) -> decltype(o.template lift< Value >(GroupBy(std::forward< KeySelector >(ks), std::forward< MarbleSelector >(ms), std::forward< BinaryPredicate >(p), std::forward< DurationSelector >(ds))))
Definition: rx-group_by.hpp:332