COROIO: coroio/poller.hpp Source File
COROIO
 
Loading...
Searching...
No Matches
poller.hpp
1#pragma once
2
3#include <chrono>
4#include <iostream>
5#include <vector>
6#include <map>
7#include <queue>
8#include <assert.h>
9
10#include "base.hpp"
11
12#ifdef Yield
13#undef Yield
14#endif
15
16#ifdef min
17#undef min
18#endif
19
20#ifdef max
21#undef max
22#endif
23
24namespace NNet {
25
39public:
41 TPollerBase() = default;
42
44 TPollerBase(const TPollerBase& ) = delete;
45 TPollerBase& operator=(const TPollerBase& ) = delete;
46
56 unsigned AddTimer(TTime deadline, THandle h) {
57 Timers_.emplace(TTimer{deadline, TimerId_, h});
58 return TimerId_++;
59 }
60
71 bool RemoveTimer(unsigned timerId, TTime deadline) {
72 bool fired = timerId == LastFiredTimer_;
73 if (!fired) {
74 Timers_.emplace(TTimer{deadline, timerId, {}}); // insert empty timer before existing
75 }
76 return fired;
77 }
78
88 void AddRead(int fd, THandle h) {
89 MaxFd_ = std::max(MaxFd_, fd);
90 Changes_.emplace_back(TEvent{fd, TEvent::READ, h});
91 }
101 void AddWrite(int fd, THandle h) {
102 MaxFd_ = std::max(MaxFd_, fd);
103 Changes_.emplace_back(TEvent{fd, TEvent::WRITE, h});
104 }
111 void AddRemoteHup(int fd, THandle h) {
112 MaxFd_ = std::max(MaxFd_, fd);
113 Changes_.emplace_back(TEvent{fd, TEvent::RHUP, h});
114 }
122 void RemoveEvent(int fd) {
123 // TODO: resume waiting coroutines here
124 MaxFd_ = std::max(MaxFd_, fd);
125 Changes_.emplace_back(TEvent{fd, TEvent::READ|TEvent::WRITE|TEvent::RHUP, {}});
126 }
135 void RemoveEvent(THandle /*h*/) {
136 // TODO: Add new vector for this type of removing
137 // Will be called in destuctor of unfinished futures
138 }
147 auto Sleep(TTime until) {
148 struct TAwaitableSleep {
149 TAwaitableSleep(TPollerBase* poller, TTime n)
150 : poller(poller)
151 , n(n)
152 { }
153 ~TAwaitableSleep() {
154 if (poller) {
155 poller->RemoveTimer(timerId, n);
156 }
157 }
158
159 TAwaitableSleep(TAwaitableSleep&& other)
160 : poller(other.poller)
161 , n(other.n)
162 {
163 other.poller = nullptr;
164 }
165
166 TAwaitableSleep(const TAwaitableSleep&) = delete;
167 TAwaitableSleep& operator=(const TAwaitableSleep&) = delete;
168
169 bool await_ready() {
170 return false;
171 }
172
173 void await_suspend(std::coroutine_handle<> h) {
174 timerId = poller->AddTimer(n, h);
175 }
176
177 void await_resume() { poller = nullptr; }
178
179 TPollerBase* poller;
180 TTime n;
181 unsigned timerId = 0;
182 };
183
184 return TAwaitableSleep{this,until};
185 }
194 template<typename Rep, typename Period>
195 auto Sleep(std::chrono::duration<Rep,Period> duration) {
196 return Sleep(TClock::now() + duration);
197 }
207 auto Yield() {
208 return Sleep(TTime{});
209 }
210
219 void Wakeup(TEvent&& change) {
237 auto index = Changes_.size();
238 change.Handle.resume();
239 if (change.Fd >= 0) {
240 bool matched = false;
241 for (; index < Changes_.size() && !matched; index++) {
242 matched = Changes_[index].Match(change);
243 }
244 if (!matched) {
245 change.Handle = {};
246 Changes_.emplace_back(std::move(change));
247 }
248 }
249 }
256 for (auto&& ev : ReadyEvents_) {
257 Wakeup(std::move(ev));
258 }
259 }
265 void SetMaxDuration(std::chrono::milliseconds maxDuration) {
266 MaxDuration_ = maxDuration;
268 }
270 auto TimersSize() const {
271 return Timers_.size();
272 }
273
274protected:
280 timespec GetTimeout() const {
281 return Timers_.empty()
283 : Timers_.top().Deadline == TTime{}
284 ? timespec {0, 0}
285 : GetTimespec(TClock::now(), Timers_.top().Deadline, MaxDuration_);
286 }
/// Converts a millisecond duration into a `timespec`.
///
/// @param duration  Duration to convert. Negative values are clamped to zero,
///                  since they would otherwise yield a negative `tv_nsec`.
/// @return `tv_sec` holds the whole seconds and `tv_nsec` the remaining
///         nanoseconds of the (clamped) duration.
static constexpr timespec GetMaxDuration(std::chrono::milliseconds duration) {
    const auto clamped = duration < std::chrono::milliseconds::zero()
        ? std::chrono::milliseconds::zero()
        : duration;
    const auto wholeSeconds = std::chrono::duration_cast<std::chrono::seconds>(clamped);
    const auto remainder = std::chrono::duration_cast<std::chrono::nanoseconds>(clamped - wholeSeconds);

    // Value-initialize: an uninitialized local is not allowed in a constexpr
    // function before C++20, and this also zeroes any extra platform fields.
    timespec ts{};
    ts.tv_sec = static_cast<decltype(ts.tv_sec)>(wholeSeconds.count());
    ts.tv_nsec = static_cast<decltype(ts.tv_nsec)>(remainder.count());
    return ts;
}
302 void Reset() {
303 ReadyEvents_.clear();
304 Changes_.clear();
305 MaxFd_ = 0;
306 }
314 auto now = TClock::now();
315 bool first = true;
316 unsigned prevId = 0;
317
318 while (!Timers_.empty()&&Timers_.top().Deadline <= now) {
319 TTimer timer = Timers_.top(); Timers_.pop();
320
321 if ((first || prevId != timer.Id) && timer.Handle) { // skip removed timers
322 LastFiredTimer_ = timer.Id;
323 timer.Handle.resume();
324 }
325
326 first = false;
327 prevId = timer.Id;
328 }
329
331 }
332
333 int MaxFd_ = 0;
334 std::vector<TEvent> Changes_;
335 std::vector<TEvent> ReadyEvents_;
336 unsigned TimerId_ = 0;
337 std::priority_queue<TTimer> Timers_;
339 unsigned LastFiredTimer_ = (unsigned)(-1);
340 std::chrono::milliseconds MaxDuration_ = std::chrono::milliseconds(100);
342};
343
344} // namespace NNet
Backend-independent base for I/O pollers.
Definition poller.hpp:38
timespec GetTimeout() const
Computes the poll timeout based on scheduled timers.
Definition poller.hpp:280
void AddRemoteHup(int fd, THandle h)
Registers a remote hang-up (RHUP) event.
Definition poller.hpp:111
TPollerBase()=default
Default constructor.
std::chrono::milliseconds MaxDuration_
Maximum poll duration.
Definition poller.hpp:340
TPollerBase(const TPollerBase &)=delete
Copying is disabled.
void Wakeup(TEvent &&change)
Wakes up a coroutine waiting on an event.
Definition poller.hpp:219
void Reset()
Clears the lists of ready events and pending changes.
Definition poller.hpp:302
void SetMaxDuration(std::chrono::milliseconds maxDuration)
Sets the maximum polling duration.
Definition poller.hpp:265
void ProcessTimers()
Processes scheduled timers.
Definition poller.hpp:313
unsigned LastFiredTimer_
ID of the last fired timer.
Definition poller.hpp:339
unsigned AddTimer(TTime deadline, THandle h)
Schedules a timer.
Definition poller.hpp:56
std::vector< TEvent > ReadyEvents_
Events ready to wake up their coroutines.
Definition poller.hpp:335
void AddWrite(int fd, THandle h)
Registers a write interest on a file descriptor.
Definition poller.hpp:101
int MaxFd_
Highest file descriptor in use.
Definition poller.hpp:333
auto Sleep(TTime until)
Suspends execution until the specified time.
Definition poller.hpp:147
std::priority_queue< TTimer > Timers_
Priority queue for scheduled timers.
Definition poller.hpp:337
timespec MaxDurationTs_
Max duration represented as timespec.
Definition poller.hpp:341
bool RemoveTimer(unsigned timerId, TTime deadline)
Removes or cancels a timer.
Definition poller.hpp:71
auto TimersSize() const
Returns the number of scheduled timers.
Definition poller.hpp:270
void RemoveEvent(int fd)
Removes registered events for a specific file descriptor.
Definition poller.hpp:122
void AddRead(int fd, THandle h)
Registers a read interest on a file descriptor.
Definition poller.hpp:88
void RemoveEvent(THandle)
No-op placeholder for future cleanup by handle.
Definition poller.hpp:135
TTime LastTimersProcessTime_
Last time timers were processed.
Definition poller.hpp:338
std::vector< TEvent > Changes_
Pending changes (registered events).
Definition poller.hpp:334
static constexpr timespec GetMaxDuration(std::chrono::milliseconds duration)
Computes a timespec from a duration.
Definition poller.hpp:293
unsigned TimerId_
Counter for generating unique timer IDs.
Definition poller.hpp:336
auto Yield()
Suspends the coroutine until the next event-loop iteration.
Definition poller.hpp:207
void WakeupReadyHandles()
Wakes up all coroutines waiting on ready events.
Definition poller.hpp:255
auto Sleep(std::chrono::duration< Rep, Period > duration)
Overload of Sleep() accepting a duration.
Definition poller.hpp:195
Definition base.hpp:43
Definition base.hpp:18