COROIO: coroio/poller.hpp Source File
COROIO
 
Loading...
Searching...
No Matches
poller.hpp
1#pragma once
2
3#include <chrono>
4#include <iostream>
5#include <vector>
6#include <map>
7#include <queue>
8#include <assert.h>
9
10#include "base.hpp"
11
12#ifdef Yield
13#undef Yield
14#endif
15
16#ifdef min
17#undef min
18#endif
19
20#ifdef max
21#undef max
22#endif
23
24namespace NNet {
25
53public:
55 TPollerBase() = default;
56
58 TPollerBase(const TPollerBase& ) = delete;
59 TPollerBase& operator=(const TPollerBase& ) = delete;
60
70 unsigned AddTimer(TTime deadline, THandle h) {
71 Timers_.emplace(TTimer{deadline, TimerId_, h});
72 return TimerId_++;
73 }
74
85 bool RemoveTimer(unsigned timerId, TTime deadline) {
86 bool fired = timerId == LastFiredTimer_;
87 if (!fired) {
88 Timers_.emplace(TTimer{deadline, timerId, {}}); // insert empty timer before existing
89 }
90 return fired;
91 }
92
99 void AddRead(int fd, THandle h) {
100 MaxFd_ = std::max(MaxFd_, fd);
101 Changes_.emplace_back(TEvent{fd, TEvent::READ, h});
102 }
109 void AddWrite(int fd, THandle h) {
110 MaxFd_ = std::max(MaxFd_, fd);
111 Changes_.emplace_back(TEvent{fd, TEvent::WRITE, h});
112 }
119 void AddRemoteHup(int fd, THandle h) {
120 MaxFd_ = std::max(MaxFd_, fd);
121 Changes_.emplace_back(TEvent{fd, TEvent::RHUP, h});
122 }
130 void RemoveEvent(int fd) {
131 // TODO: resume waiting coroutines here
132 MaxFd_ = std::max(MaxFd_, fd);
133 Changes_.emplace_back(TEvent{fd, TEvent::READ|TEvent::WRITE|TEvent::RHUP, {}});
134 }
143 void RemoveEvent(THandle /*h*/) {
144 // TODO: Add new vector for this type of removing
145 // Will be called in destuctor of unfinished futures
146 }
155 auto Sleep(TTime until) {
156 struct TAwaitableSleep {
157 TAwaitableSleep(TPollerBase* poller, TTime n)
158 : poller(poller)
159 , n(n)
160 { }
161 ~TAwaitableSleep() {
162 if (poller) {
163 poller->RemoveTimer(timerId, n);
164 }
165 }
166
167 TAwaitableSleep(TAwaitableSleep&& other)
168 : poller(other.poller)
169 , n(other.n)
170 {
171 other.poller = nullptr;
172 }
173
174 TAwaitableSleep(const TAwaitableSleep&) = delete;
175 TAwaitableSleep& operator=(const TAwaitableSleep&) = delete;
176
177 bool await_ready() {
178 return false;
179 }
180
181 void await_suspend(std::coroutine_handle<> h) {
182 timerId = poller->AddTimer(n, h);
183 }
184
185 void await_resume() { poller = nullptr; }
186
187 TPollerBase* poller;
188 TTime n;
189 unsigned timerId = 0;
190 };
191
192 return TAwaitableSleep{this,until};
193 }
202 template<typename Rep, typename Period>
203 auto Sleep(std::chrono::duration<Rep,Period> duration) {
204 return Sleep(TClock::now() + duration);
205 }
213 auto Yield() {
214 return Sleep(TTime{});
215 }
216
225 void Wakeup(TEvent&& change) {
243 auto index = Changes_.size();
244 change.Handle.resume();
245 if (change.Fd >= 0) {
246 bool matched = false;
247 for (; index < Changes_.size() && !matched; index++) {
248 matched = Changes_[index].Match(change);
249 }
250 if (!matched) {
251 change.Handle = {};
252 Changes_.emplace_back(std::move(change));
253 }
254 }
255 }
262 for (auto&& ev : ReadyEvents_) {
263 Wakeup(std::move(ev));
264 }
265 }
271 void SetMaxDuration(std::chrono::milliseconds maxDuration) {
272 MaxDuration_ = maxDuration;
274 }
276 auto TimersSize() const {
277 return Timers_.size();
278 }
279
280protected:
286 timespec GetTimeout() const {
287 return Timers_.empty()
289 : Timers_.top().Deadline == TTime{}
290 ? timespec {0, 0}
291 : GetTimespec(TClock::now(), Timers_.top().Deadline, MaxDuration_);
292 }
299 static constexpr timespec GetMaxDuration(std::chrono::milliseconds duration) {
300 auto p1 = std::chrono::duration_cast<std::chrono::seconds>(duration);
301 auto p2 = std::chrono::duration_cast<std::chrono::nanoseconds>(duration - p1);
302 timespec ts;
303 ts.tv_sec = p1.count();
304 ts.tv_nsec = p2.count();
305 return ts;
306 }
308 void Reset() {
309 ReadyEvents_.clear();
310 Changes_.clear();
311 MaxFd_ = 0;
312 }
320 auto now = TClock::now();
321 bool first = true;
322 unsigned prevId = 0;
323
324 while (!Timers_.empty()&&Timers_.top().Deadline <= now) {
325 TTimer timer = Timers_.top(); Timers_.pop();
326
327 if ((first || prevId != timer.Id) && timer.Handle) { // skip removed timers
328 LastFiredTimer_ = timer.Id;
329 timer.Handle.resume();
330 }
331
332 first = false;
333 prevId = timer.Id;
334 }
335
337 }
338
339 int MaxFd_ = 0;
340 std::vector<TEvent> Changes_;
341 std::vector<TEvent> ReadyEvents_;
342 unsigned TimerId_ = 0;
343 std::priority_queue<TTimer> Timers_;
345 unsigned LastFiredTimer_ = (unsigned)(-1);
346 std::chrono::milliseconds MaxDuration_ = std::chrono::milliseconds(100);
348};
349
350} // namespace NNet
Base class for pollers managing asynchronous I/O events and timers.
Definition poller.hpp:52
timespec GetTimeout() const
Computes the poll timeout based on scheduled timers.
Definition poller.hpp:286
void AddRemoteHup(int fd, THandle h)
Registers a remote hang-up (RHUP) event.
Definition poller.hpp:119
TPollerBase()=default
Default constructor.
std::chrono::milliseconds MaxDuration_
Maximum poll duration.
Definition poller.hpp:346
TPollerBase(const TPollerBase &)=delete
Copying is disabled.
void Wakeup(TEvent &&change)
Wakes up a coroutine waiting on an event.
Definition poller.hpp:225
void Reset()
Clears the lists of ready events and pending changes.
Definition poller.hpp:308
void SetMaxDuration(std::chrono::milliseconds maxDuration)
Sets the maximum polling duration.
Definition poller.hpp:271
void ProcessTimers()
Processes scheduled timers.
Definition poller.hpp:319
unsigned LastFiredTimer_
ID of the last fired timer.
Definition poller.hpp:345
unsigned AddTimer(TTime deadline, THandle h)
Schedules a timer.
Definition poller.hpp:70
std::vector< TEvent > ReadyEvents_
Events ready to wake up their coroutines.
Definition poller.hpp:341
void AddWrite(int fd, THandle h)
Registers a write event on a file descriptor.
Definition poller.hpp:109
int MaxFd_
Highest file descriptor in use.
Definition poller.hpp:339
auto Sleep(TTime until)
Suspends execution until the specified time.
Definition poller.hpp:155
std::priority_queue< TTimer > Timers_
Priority queue for scheduled timers.
Definition poller.hpp:343
timespec MaxDurationTs_
Max duration represented as timespec.
Definition poller.hpp:347
bool RemoveTimer(unsigned timerId, TTime deadline)
Removes or cancels a timer.
Definition poller.hpp:85
auto TimersSize() const
Returns the number of scheduled timers.
Definition poller.hpp:276
void RemoveEvent(int fd)
Removes registered events for a specific file descriptor.
Definition poller.hpp:130
void AddRead(int fd, THandle h)
Registers a read event on a file descriptor.
Definition poller.hpp:99
void RemoveEvent(THandle)
Removes events associated with a given coroutine handle.
Definition poller.hpp:143
TTime LastTimersProcessTime_
Last time timers were processed.
Definition poller.hpp:344
std::vector< TEvent > Changes_
Pending changes (registered events).
Definition poller.hpp:340
static constexpr timespec GetMaxDuration(std::chrono::milliseconds duration)
Computes a timespec from a duration.
Definition poller.hpp:299
unsigned TimerId_
Counter for generating unique timer IDs.
Definition poller.hpp:342
auto Yield()
Yields execution to the next event loop iteration.
Definition poller.hpp:213
void WakeupReadyHandles()
Wakes up all coroutines waiting on ready events.
Definition poller.hpp:261
auto Sleep(std::chrono::duration< Rep, Period > duration)
Overload of Sleep() accepting a duration.
Definition poller.hpp:203
Definition base.hpp:33
Definition base.hpp:18