hyperledger/iroha
Iroha - A simple, decentralized ledger http://iroha.tech
combine_latest_until_first_completed.hpp
Go to the documentation of this file.
1 
7 #ifndef IROHA_COMBINE_LATEST_UNTIL_FIRST_COMPLETED_HPP
8 #define IROHA_COMBINE_LATEST_UNTIL_FIRST_COMPLETED_HPP
9 
10 #include <rxcpp/operators/rx-combine_latest.hpp>
11 
12 namespace iroha {
13 
24  template <class Coordination, class Selector, class... ObservableN>
26  : public rxcpp::operators::operator_base<rxcpp::util::value_type_t<
27  rxcpp::operators::detail::combine_latest_traits<Coordination,
28  Selector,
29  ObservableN...>>> {
30  typedef combine_latest_until_first_completed<Coordination,
31  Selector,
32  ObservableN...>
34 
35  typedef rxcpp::operators::detail::
36  combine_latest_traits<Coordination, Selector, ObservableN...>
38 
39  typedef typename traits::tuple_source_type tuple_source_type;
40  typedef typename traits::tuple_source_value_type tuple_source_value_type;
41 
42  typedef typename traits::selector_type selector_type;
43 
44  typedef typename traits::coordination_type coordination_type;
45  typedef typename coordination_type::coordinator_type coordinator_type;
46 
47  struct values {
48  values(tuple_source_type o, selector_type s, coordination_type sf)
49  : source(std::move(o)),
50  selector(std::move(s)),
51  coordination(std::move(sf)) {}
52  tuple_source_type source;
53  selector_type selector;
54  coordination_type coordination;
55  };
57 
59  selector_type s,
60  tuple_source_type ts)
61  : initial(std::move(ts), std::move(s), std::move(sf)) {}
62 
63  template <int Index, class State>
64  void subscribe_one(std::shared_ptr<State> state) const {
65  typedef typename std::tuple_element<Index,
66  tuple_source_type>::type::value_type
67  source_value_type;
68 
69  rxcpp::composite_subscription innercs;
70 
71  // when the out observer is unsubscribed all the
72  // inner subscriptions are unsubscribed as well
73  state->out.add(innercs);
74 
75  auto source = on_exception(
76  [&]() {
77  return state->coordinator.in(std::get<Index>(state->source));
78  },
79  state->out);
80  if (source.empty()) {
81  return;
82  }
83 
84  // this subscribe does not share the observer subscription
85  // so that when it is unsubscribed the observer can be called
86  // until the inner subscriptions have finished
87  auto sink = rxcpp::make_subscriber<source_value_type>(
88  state->out,
89  innercs,
90  // on_next
91  [state](source_value_type st) {
92  auto &value = std::get<Index>(state->latest);
93 
94  if (value.empty()) {
95  ++state->valuesSet;
96  }
97 
98  value.reset(st);
99 
100  if (state->valuesSet == sizeof...(ObservableN)) {
101  auto values = rxcpp::util::surely(state->latest);
102  auto selectedResult = rxcpp::util::apply(values, state->selector);
103  state->out.on_next(selectedResult);
104  }
105  },
106  // on_error
107  [state](std::exception_ptr e) { state->out.on_error(e); },
108  // on_completed
109  [state]() { state->out.on_completed(); });
110  auto selectedSink = on_exception(
111  [&]() { return state->coordinator.out(sink); }, state->out);
112  if (selectedSink.empty()) {
113  return;
114  }
115  source->subscribe(std::move(selectedSink.get()));
116  }
117 
118  template <class State, int... IndexN>
119  void subscribe_all(std::shared_ptr<State> state,
120  rxcpp::util::values<int, IndexN...>) const {
121  bool subscribed[] = {(subscribe_one<IndexN>(state), true)...};
122  subscribed[0] = (*subscribed); // silence warning
123  }
124 
125  template <class Subscriber>
126  void on_subscribe(Subscriber scbr) const {
127  static_assert(rxcpp::is_subscriber<Subscriber>::value,
128  "subscribe must be passed a subscriber");
129 
130  typedef Subscriber output_type;
131 
132  struct combine_latest_until_first_completed_state_type
133  : public std::enable_shared_from_this<
134  combine_latest_until_first_completed_state_type>,
135  public values {
136  combine_latest_until_first_completed_state_type(values i,
137  coordinator_type coor,
138  output_type oarg)
139  : values(std::move(i)),
140  valuesSet(0),
141  coordinator(std::move(coor)),
142  out(std::move(oarg)) {}
143 
144  mutable int valuesSet;
145  mutable tuple_source_value_type latest;
146  coordinator_type coordinator;
147  output_type out;
148  };
149 
150  auto coordinator =
151  initial.coordination.create_coordinator(scbr.get_subscription());
152 
153  // take a copy of the values for each subscription
154  auto state =
155  std::make_shared<combine_latest_until_first_completed_state_type>(
156  initial, std::move(coordinator), std::move(scbr));
157 
159  state,
160  typename rxcpp::util::values_from<int,
161  sizeof...(ObservableN)>::type());
162  }
163  };
164 
165  template <
166  class Coordination,
167  class Selector,
168  class Observable,
169  class... ObservableN,
170  class Enabled = rxcpp::util::enable_if_all_true_type_t<
171  rxcpp::is_coordination<Coordination>,
172  rxcpp::operators::detail::
173  is_combine_latest_selector<Selector, Observable, ObservableN...>,
174  rxcpp::all_observables<Observable, ObservableN...>>,
175  class ResolvedSelector = rxcpp::util::decay_t<Selector>,
176  class combine_latest = combine_latest_until_first_completed<
177  Coordination,
178  ResolvedSelector,
179  rxcpp::util::decay_t<Observable>,
180  rxcpp::util::decay_t<ObservableN>...>,
181  class Value = rxcpp::util::value_type_t<combine_latest>,
182  class Result = rxcpp::observable<Value, combine_latest>>
183  static Result makeCombineLatestUntilFirstCompleted(Observable &&o,
184  Coordination &&cn,
185  Selector &&s,
186  ObservableN &&... on) {
187  return Result(
188  combine_latest(std::forward<Coordination>(cn),
189  std::forward<Selector>(s),
190  std::make_tuple(std::forward<Observable>(o),
191  std::forward<ObservableN>(on)...)));
192  }
193 
194 } // namespace iroha
195 
196 #endif // IROHA_COMBINE_LATEST_UNTIL_FIRST_COMPLETED_HPP
void subscribe_all(std::shared_ptr< State > state, rxcpp::util::values< int, IndexN... >) const
Definition: combine_latest_until_first_completed.hpp:119
decltype(auto) constexpr apply(Tuple &&t, F &&f)
apply F to Tuple
Definition: soci_utils.hpp:72
values(tuple_source_type o, selector_type s, coordination_type sf)
Definition: combine_latest_until_first_completed.hpp:48
coordination_type coordination
Definition: combine_latest_until_first_completed.hpp:54
traits::selector_type selector_type
Definition: combine_latest_until_first_completed.hpp:42
void on_subscribe(Subscriber scbr) const
Definition: combine_latest_until_first_completed.hpp:126
Definition: peer.hpp:48
static Result makeCombineLatestUntilFirstCompleted(Observable &&o, Coordination &&cn, Selector &&s, ObservableN &&...on)
Definition: combine_latest_until_first_completed.hpp:183
rxcpp::operators::detail::combine_latest_traits< Coordination, Selector, ObservableN... > traits
Definition: combine_latest_until_first_completed.hpp:37
Definition: block_query.hpp:14
void subscribe_one(std::shared_ptr< State > state) const
Definition: combine_latest_until_first_completed.hpp:64
Definition: combine_latest_until_first_completed.hpp:25
combine_latest_until_first_completed< Coordination, Selector, ObservableN... > this_type
Definition: combine_latest_until_first_completed.hpp:33
combine_latest_until_first_completed(coordination_type sf, selector_type s, tuple_source_type ts)
Definition: combine_latest_until_first_completed.hpp:58
traits::tuple_source_value_type tuple_source_value_type
Definition: combine_latest_until_first_completed.hpp:40
tuple_source_type source
Definition: combine_latest_until_first_completed.hpp:52
coordination_type::coordinator_type coordinator_type
Definition: combine_latest_until_first_completed.hpp:45
values initial
Definition: combine_latest_until_first_completed.hpp:56
traits::tuple_source_type tuple_source_type
Definition: combine_latest_until_first_completed.hpp:39
Definition: combine_latest_until_first_completed.hpp:47
selector_type selector
Definition: combine_latest_until_first_completed.hpp:53
traits::coordination_type coordination_type
Definition: combine_latest_until_first_completed.hpp:44