hyperledger/iroha
Iroha - A simple, decentralized ledger http://iroha.tech
delay.hpp
Go to the documentation of this file.
1 
7 #ifndef IROHA_DELAY_HPP
8 #define IROHA_DELAY_HPP
9 
10 #include <rxcpp/operators/rx-delay.hpp>
11 
12 namespace iroha {
13 
26  template <class T, class Selector, class Coordination>
27  struct delay {
28  typedef rxcpp::util::decay_t<T> source_value_type;
29  typedef rxcpp::util::decay_t<Coordination> coordination_type;
30  typedef typename coordination_type::coordinator_type coordinator_type;
31  typedef rxcpp::util::decay_t<Selector> select_type;
32 
33  struct delay_values {
34  delay_values(select_type s, coordination_type c)
35  : selector(std::move(s)), coordination(c) {}
36 
37  select_type selector;
38  coordination_type coordination;
39  };
41 
42  delay(select_type s, coordination_type coordination)
43  : initial(std::move(s), coordination) {}
44 
45  template <class Subscriber>
46  struct delay_observer {
48  typedef rxcpp::util::decay_t<T> value_type;
49  typedef rxcpp::util::decay_t<Subscriber> dest_type;
50  typedef rxcpp::observer<T, this_type> observer_type;
51 
53  delay_subscriber_values(rxcpp::composite_subscription cs,
54  dest_type d,
55  delay_values v,
56  coordinator_type c)
57  : delay_values(v),
58  cs(std::move(cs)),
59  dest(std::move(d)),
60  coordinator(std::move(c)),
61  worker(coordinator.get_worker()),
62  expected(worker.now()) {}
63 
64  rxcpp::composite_subscription cs;
65  dest_type dest;
66  coordinator_type coordinator;
67  rxcpp::schedulers::worker worker;
68  rxcpp::schedulers::scheduler::clock_type::time_point expected;
69  };
70  std::shared_ptr<delay_subscriber_values> state;
71 
72  delay_observer(rxcpp::composite_subscription cs,
73  dest_type d,
74  delay_values v,
75  coordinator_type c)
76  : state(std::make_shared<delay_subscriber_values>(
78  std::move(cs), std::move(d), v, std::move(c)))) {
79  auto localState = state;
80 
81  auto disposer = [=](const rxcpp::schedulers::schedulable &) {
82  localState->cs.unsubscribe();
83  localState->dest.unsubscribe();
84  localState->worker.unsubscribe();
85  };
86  auto selectedDisposer = on_exception(
87  [&]() { return localState->coordinator.act(disposer); },
88  localState->dest);
89  if (selectedDisposer.empty()) {
90  return;
91  }
92 
93  localState->dest.add(
94  [=]() { localState->worker.schedule(selectedDisposer.get()); });
95  localState->cs.add(
96  [=]() { localState->worker.schedule(selectedDisposer.get()); });
97  }
98 
99  template <class Value>
100  void on_next(Value &&v) const {
101  auto localState = state;
102 
103  auto selected = on_exception(
104  [&]() { return localState->selector(std::forward<Value>(v)); },
105  localState->dest);
106  if (selected.empty()) {
107  return;
108  }
109 
110  auto work = [v, localState](const rxcpp::schedulers::schedulable &) {
111  localState->dest.on_next(v);
112  };
113  auto selectedWork =
114  on_exception([&]() { return localState->coordinator.act(work); },
115  localState->dest);
116  if (selectedWork.empty()) {
117  return;
118  }
119  localState->worker.schedule(localState->worker.now() + selected.get(),
120  selectedWork.get());
121  }
122 
123  void on_error(std::exception_ptr e) const {
124  auto localState = state;
125  auto work = [e, localState](const rxcpp::schedulers::schedulable &) {
126  localState->dest.on_error(e);
127  };
128  auto selectedWork =
129  on_exception([&]() { return localState->coordinator.act(work); },
130  localState->dest);
131  if (selectedWork.empty()) {
132  return;
133  }
134  localState->worker.schedule(selectedWork.get());
135  }
136 
137  void on_completed() const {
138  auto localState = state;
139  auto work = [localState](const rxcpp::schedulers::schedulable &) {
140  localState->dest.on_completed();
141  };
142  auto selectedWork =
143  on_exception([&]() { return localState->coordinator.act(work); },
144  localState->dest);
145  if (selectedWork.empty()) {
146  return;
147  }
148  localState->worker.schedule(selectedWork.get());
149  }
150 
151  static rxcpp::subscriber<T, observer_type> make(dest_type d,
152  delay_values v) {
153  auto cs = rxcpp::composite_subscription();
154  auto coordinator = v.coordination.create_coordinator();
155 
156  return rxcpp::make_subscriber<T>(
157  cs,
158  observer_type(this_type(
159  cs, std::move(d), std::move(v), std::move(coordinator))));
160  }
161  };
162 
163  template <class Subscriber>
164  auto operator()(Subscriber dest) const
165  -> decltype(delay_observer<Subscriber>::make(std::move(dest),
166  initial)) {
167  return delay_observer<Subscriber>::make(std::move(dest), initial);
168  }
169  };
170 
171  template <typename T,
172  typename Selector,
173  typename Coordination,
174  class ResolvedSelector = rxcpp::util::decay_t<Selector>,
175  class Duration = decltype(std::declval<ResolvedSelector>()(
176  (std::declval<std::decay_t<T>>()))),
177  class Enabled = rxcpp::util::enable_if_all_true_type_t<
178  rxcpp::is_coordination<Coordination>,
179  rxcpp::util::is_duration<Duration>>,
180  class Delay =
182  static auto makeDelay(Selector &&s, Coordination &&cn) {
183  return Delay(std::forward<Selector>(s), std::forward<Coordination>(cn));
184  }
185 
186 } // namespace iroha
187 
188 #endif // IROHA_DELAY_HPP
coordination_type::coordinator_type coordinator_type
Definition: delay.hpp:30
rxcpp::util::decay_t< T > value_type
Definition: delay.hpp:48
rxcpp::composite_subscription cs
Definition: delay.hpp:64
coordination_type coordination
Definition: delay.hpp:38
delay_subscriber_values(rxcpp::composite_subscription cs, dest_type d, delay_values v, coordinator_type c)
Definition: delay.hpp:53
delay_observer(rxcpp::composite_subscription cs, dest_type d, delay_values v, coordinator_type c)
Definition: delay.hpp:72
void on_completed() const
Definition: delay.hpp:137
rxcpp::util::decay_t< Subscriber > dest_type
Definition: delay.hpp:49
rxcpp::util::decay_t< T > source_value_type
Definition: delay.hpp:28
delay(select_type s, coordination_type coordination)
Definition: delay.hpp:42
auto now()
Definition: time.hpp:23
rxcpp::schedulers::scheduler::clock_type::time_point expected
Definition: delay.hpp:68
Definition: peer.hpp:48
Definition: delay.hpp:33
static auto makeDelay(Selector &&s, Coordination &&cn)
Definition: delay.hpp:182
Definition: delay.hpp:46
std::shared_ptr< delay_subscriber_values > state
Definition: delay.hpp:70
delay_values initial
Definition: delay.hpp:40
rxcpp::schedulers::worker worker
Definition: delay.hpp:67
coordinator_type coordinator
Definition: delay.hpp:66
rxcpp::observer< T, this_type > observer_type
Definition: delay.hpp:50
Definition: block_query.hpp:16
static rxcpp::subscriber< T, observer_type > make(dest_type d, delay_values v)
Definition: delay.hpp:151
void on_next(Value &&v) const
Definition: delay.hpp:100
void on_error(std::exception_ptr e) const
Definition: delay.hpp:123
auto operator()(Subscriber dest) const -> decltype(delay_observer< Subscriber >::make(std::move(dest), initial))
Definition: delay.hpp:164
delay_observer< Subscriber > this_type
Definition: delay.hpp:47
select_type selector
Definition: delay.hpp:37
rxcpp::util::decay_t< Coordination > coordination_type
Definition: delay.hpp:29
rxcpp::util::decay_t< Selector > select_type
Definition: delay.hpp:31
delay_values(select_type s, coordination_type c)
Definition: delay.hpp:34
Definition: delay.hpp:27
static const std::regex e("\\bdbname=([^ ]*)")