hyperledger/iroha
Iroha - A simple, decentralized ledger http://iroha.tech
timeout.hpp
Go to the documentation of this file.
1 
7 #ifndef IROHA_TIMEOUT_HPP
8 #define IROHA_TIMEOUT_HPP
9 
10 #include <rxcpp/operators/rx-timeout.hpp>
11 
12 namespace iroha {
13 
// Operator functor that wraps a downstream subscriber so that an
// rxcpp::timeout_error is pushed downstream when no newer item has been
// processed by the deadline computed for the latest item.
//
// Template parameters:
//   T            - value type of the source sequence
//   Selector     - callable invoked with each item; its result is added to
//                  worker.now() to obtain the deadline for that item
//   Coordination - rxcpp coordination used to create the coordinator/worker
//
// NOTE(review): every line of this listing carries a leading listing number,
// and the numbering jumps over lines 41, 47-48, 53, 56, 76 and 79, so a few
// declarations used below are not visible here.
27  template <class T, class Selector, class Coordination>
28  struct timeout {
// Decayed aliases of the template arguments.
29  typedef rxcpp::util::decay_t<T> source_value_type;
30  typedef rxcpp::util::decay_t<Coordination> coordination_type;
31  typedef typename coordination_type::coordinator_type coordinator_type;
32  typedef rxcpp::util::decay_t<Selector> select_type;
33 
// Configuration captured once per operator: the period selector and the
// coordination.
34  struct timeout_values {
35  timeout_values(select_type s, coordination_type c)
36  : selector(std::move(s)), coordination(c) {}
37 
38  select_type selector;
39  coordination_type coordination;
40  };
// NOTE(review): listing line 41 is absent here; `initial`, which is used by
// the constructor and by operator() below, is presumably declared on it.
42 
// Stores the selector and coordination in `initial`.
43  timeout(select_type s, coordination_type coordination)
44  : initial(std::move(s), coordination) {}
45 
// Observer placed between the source and the downstream subscriber.
// NOTE(review): listing lines 47-48 (the struct header and the `this_type`
// alias used below) are absent from this view.
46  template <class Subscriber>
49  typedef rxcpp::util::decay_t<T> value_type;
50  typedef rxcpp::util::decay_t<Subscriber> dest_type;
51  typedef rxcpp::observer<T, this_type> observer_type;
52 
// Per-subscription state shared between the observer and scheduled work.
// NOTE(review): listing line 53 (the struct header) and line 56 (one
// constructor parameter) are absent; the `timeout_values(v)` initializer
// suggests this type derives from timeout_values and takes `v` - confirm.
54  timeout_subscriber_values(rxcpp::composite_subscription cs,
55  dest_type d,
57  coordinator_type c)
58  : timeout_values(v),
59  cs(std::move(cs)),
60  dest(std::move(d)),
61  coordinator(std::move(c)),
62  worker(coordinator.get_worker()),
63  index(0) {}
64 
// cs: subscription handed to make_subscriber in make(); dest: downstream.
65  rxcpp::composite_subscription cs;
66  dest_type dest;
67  coordinator_type coordinator;
68  rxcpp::schedulers::worker worker;
// Id of the most recently processed item; incremented in on_next and
// compared in produce_timeout to discard outdated timers. Mutable because
// the observer callbacks are const.
69  mutable std::size_t index;
70  };
71  typedef std::shared_ptr<timeout_subscriber_values> state_type;
72  state_type state;
73 
// Builds the shared state and links the lifetimes of cs and dest: when
// either one is unsubscribed, a disposer is scheduled on the worker that
// unsubscribes cs, dest and the worker.
// NOTE(review): listing lines 76 and 79 (one parameter and part of the
// make_shared initializer) are absent from this view.
74  timeout_observer(rxcpp::composite_subscription cs,
75  dest_type d,
77  coordinator_type c)
78  : state(std::make_shared<timeout_subscriber_values>(
80  std::move(cs), std::move(d), v, std::move(c)))) {
81  auto localState = state;
82 
83  auto disposer = [=](const rxcpp::schedulers::schedulable &) {
84  localState->cs.unsubscribe();
85  localState->dest.unsubscribe();
86  localState->worker.unsubscribe();
87  };
// on_exception is not defined in this file; its result is used as an
// optional-like wrapper (empty()/get()). Presumably an exception thrown by
// the callable is reported to dest and an empty result is returned - confirm.
88  auto selectedDisposer = on_exception(
89  [&]() { return localState->coordinator.act(disposer); },
90  localState->dest);
91  if (selectedDisposer.empty()) {
92  return;
93  }
94 
95  localState->dest.add(
96  [=]() { localState->worker.schedule(selectedDisposer.get()); });
97  localState->cs.add(
98  [=]() { localState->worker.schedule(selectedDisposer.get()); });
99  }
100 
// Creates the action that fires the timeout belonging to item `id`.
// The action does nothing when a newer item has changed state->index in the
// meantime; otherwise it sends rxcpp::timeout_error to dest via on_error.
// Returns an empty std::function when wrapping the action with
// coordinator.act yields an empty result.
101  static std::function<void(const rxcpp::schedulers::schedulable &)>
102  produce_timeout(std::size_t id, state_type state) {
103  auto produce = [id, state](const rxcpp::schedulers::schedulable &) {
// A newer item arrived after this timer was armed: the timer is outdated.
104  if (id != state->index)
105  return;
106 
107  state->dest.on_error(std::make_exception_ptr(
108  rxcpp::timeout_error("timeout has occurred")));
109  };
110 
111  auto selectedProduce = on_exception(
112  [&]() { return state->coordinator.act(produce); }, state->dest);
113  if (selectedProduce.empty()) {
114  return std::function<void(const rxcpp::schedulers::schedulable &)>();
115  }
116 
117  return std::function<void(const rxcpp::schedulers::schedulable &)>(
118  selectedProduce.get());
119  }
120 
// Handles one source item: asks the selector for the period, then schedules
// work that bumps the item id, forwards the item to dest and arms a timer
// at worker.now() + period. Nothing in the visible code arms a timer before
// the first item arrives.
121  template <class Value>
122  void on_next(Value &&v) const {
123  auto localState = state;
124 
// NOTE(review): v is forwarded to the selector here and copied into `work`
// below; if Value is an rvalue and the selector moves from its argument,
// the copy would see a moved-from value - confirm selectors never consume it.
125  auto selected = on_exception(
126  [&]() { return localState->selector(std::forward<Value>(v)); },
127  localState->dest);
128  if (selected.empty()) {
129  return;
130  }
131 
132  auto work = [v, localState, period = std::move(selected.get())](
133  const rxcpp::schedulers::schedulable &) {
// The new id makes every previously armed timer compare unequal.
134  auto new_id = ++localState->index;
135  auto produce_time = localState->worker.now() + period;
136 
137  localState->dest.on_next(v);
// NOTE(review): produce_timeout may return an empty std::function (see
// above); it is passed to schedule unchecked - confirm this is safe.
138  localState->worker.schedule(produce_time,
139  produce_timeout(new_id, localState));
140  };
141  auto selectedWork =
142  on_exception([&]() { return localState->coordinator.act(work); },
143  localState->dest);
144  if (selectedWork.empty()) {
145  return;
146  }
147  localState->worker.schedule(selectedWork.get());
148  }
149 
// Forwards a source error to dest through the worker.
// index is not changed here, so an already armed timer still matches its id.
150  void on_error(std::exception_ptr e) const {
151  auto localState = state;
152  auto work = [e, localState](const rxcpp::schedulers::schedulable &) {
153  localState->dest.on_error(e);
154  };
155  auto selectedWork =
156  on_exception([&]() { return localState->coordinator.act(work); },
157  localState->dest);
158  if (selectedWork.empty()) {
159  return;
160  }
161  localState->worker.schedule(selectedWork.get());
162  }
163 
// Forwards completion to dest through the worker.
// index is not changed here either; presumably a timer firing after
// completion is ignored by the terminated dest - TODO confirm.
164  void on_completed() const {
165  auto localState = state;
166  auto work = [localState](const rxcpp::schedulers::schedulable &) {
167  localState->dest.on_completed();
168  };
169  auto selectedWork =
170  on_exception([&]() { return localState->coordinator.act(work); },
171  localState->dest);
172  if (selectedWork.empty()) {
173  return;
174  }
175  localState->worker.schedule(selectedWork.get());
176  }
177 
// Factory: creates a fresh composite_subscription and a coordinator from
// v.coordination, then returns a subscriber over T whose observer is a
// timeout_observer forwarding to d.
178  static rxcpp::subscriber<T, observer_type> make(dest_type d,
179  timeout_values v) {
180  auto cs = rxcpp::composite_subscription();
181  auto coordinator = v.coordination.create_coordinator();
182 
183  return rxcpp::make_subscriber<T>(
184  cs,
185  observer_type(this_type(
186  cs, std::move(d), std::move(v), std::move(coordinator))));
187  }
188  };
189 
// Wraps the downstream subscriber `dest` in a timeout_observer configured
// with a copy of `initial` (make takes timeout_values by value).
190  template <class Subscriber>
191  auto operator()(Subscriber dest) const
192  -> decltype(timeout_observer<Subscriber>::make(std::move(dest),
193  initial)) {
194  return timeout_observer<Subscriber>::make(std::move(dest), initial);
195  }
196  };
197 
// Helper that builds a timeout operator object from a selector and a
// coordination.
//
// Template parameters:
//   T                - value type of the source sequence (must be given
//                      explicitly; it does not appear in the function arguments)
//   Selector         - callable type, deduced from `s`
//   Coordination     - coordination type, deduced from `cn`
//   ResolvedSelector - decayed Selector
//   Duration         - result type of calling ResolvedSelector with a decayed T
//   Enabled          - constraint: Coordination must satisfy
//                      rxcpp::is_coordination and Duration must satisfy
//                      rxcpp::util::is_duration
//   Timeout          - type that is constructed and returned
//
// NOTE(review): listing line 209, which holds the default argument for
// Timeout, is absent from this view; presumably it names the timeout struct
// defined above - confirm against the original header.
//
// Returns Timeout constructed from the perfectly forwarded `s` and `cn`.
198  template <
199  typename T,
200  typename Selector,
201  typename Coordination,
202  class ResolvedSelector = rxcpp::util::decay_t<Selector>,
203  class Duration = decltype(
204  std::declval<ResolvedSelector>()((std::declval<std::decay_t<T>>()))),
205  class Enabled = rxcpp::util::enable_if_all_true_type_t<
206  rxcpp::is_coordination<Coordination>,
207  rxcpp::util::is_duration<Duration>>,
208  class Timeout =
210  static auto makeTimeout(Selector &&s, Coordination &&cn) {
211  return Timeout(std::forward<Selector>(s), std::forward<Coordination>(cn));
212  }
213 
214 } // namespace iroha
215 
216 #endif // IROHA_TIMEOUT_HPP
void on_error(std::exception_ptr e) const
Definition: timeout.hpp:150
coordination_type::coordinator_type coordinator_type
Definition: timeout.hpp:31
coordinator_type coordinator
Definition: timeout.hpp:67
void on_next(Value &&v) const
Definition: timeout.hpp:122
static rxcpp::subscriber< T, observer_type > make(dest_type d, timeout_values v)
Definition: timeout.hpp:178
static std::function< void(const rxcpp::schedulers::schedulable &)> produce_timeout(std::size_t id, state_type state)
Definition: timeout.hpp:102
rxcpp::util::decay_t< Coordination > coordination_type
Definition: timeout.hpp:30
coordination_type coordination
Definition: timeout.hpp:39
timeout_values initial
Definition: timeout.hpp:41
timeout_values(select_type s, coordination_type c)
Definition: timeout.hpp:35
Definition: peer.hpp:48
select_type selector
Definition: timeout.hpp:38
rxcpp::util::decay_t< T > source_value_type
Definition: timeout.hpp:29
timeout_subscriber_values(rxcpp::composite_subscription cs, dest_type d, timeout_values v, coordinator_type c)
Definition: timeout.hpp:54
timeout(select_type s, coordination_type coordination)
Definition: timeout.hpp:43
timeout_observer(rxcpp::composite_subscription cs, dest_type d, timeout_values v, coordinator_type c)
Definition: timeout.hpp:74
rxcpp::observer< T, this_type > observer_type
Definition: timeout.hpp:51
std::shared_ptr< timeout_subscriber_values > state_type
Definition: timeout.hpp:71
auto operator()(Subscriber dest) const -> decltype(timeout_observer< Subscriber >::make(std::move(dest), initial))
Definition: timeout.hpp:191
Definition: block_query.hpp:16
state_type state
Definition: timeout.hpp:72
Definition: timeout.hpp:28
rxcpp::util::decay_t< T > value_type
Definition: timeout.hpp:49
static auto makeTimeout(Selector &&s, Coordination &&cn)
Definition: timeout.hpp:210
Definition: timeout.hpp:34
rxcpp::util::decay_t< Selector > select_type
Definition: timeout.hpp:32
rxcpp::schedulers::worker worker
Definition: timeout.hpp:68
rxcpp::util::decay_t< Subscriber > dest_type
Definition: timeout.hpp:50
Definition: timeout.hpp:47
timeout_observer< Subscriber > this_type
Definition: timeout.hpp:48
static const std::regex e("\\bdbname=([^ ]*)")
void on_completed() const
Definition: timeout.hpp:164
rxcpp::composite_subscription cs
Definition: timeout.hpp:65