COROIO: coroio/poller.hpp Source File
COROIO
 
Loading...
Searching...
No Matches
poller.hpp
1#pragma once
2
3#include <chrono>
4#include <iostream>
5#include <vector>
6#include <map>
7#include <queue>
8#include <assert.h>
9
10#include "base.hpp"
11
12#ifdef Yield
13#undef Yield
14#endif
15
16namespace NNet {
17
45public:
47 TPollerBase() = default;
48
50 TPollerBase(const TPollerBase& ) = delete;
51 TPollerBase& operator=(const TPollerBase& ) = delete;
52
62 unsigned AddTimer(TTime deadline, THandle h) {
63 Timers_.emplace(TTimer{deadline, TimerId_, h});
64 return TimerId_++;
65 }
66
77 bool RemoveTimer(unsigned timerId, TTime deadline) {
78 bool fired = timerId == LastFiredTimer_;
79 if (!fired) {
80 Timers_.emplace(TTimer{deadline, timerId, {}}); // insert empty timer before existing
81 }
82 return fired;
83 }
84
91 void AddRead(int fd, THandle h) {
92 MaxFd_ = std::max(MaxFd_, fd);
93 Changes_.emplace_back(TEvent{fd, TEvent::READ, h});
94 }
95
101 void AddWrite(int fd, THandle h) {
102 MaxFd_ = std::max(MaxFd_, fd);
103 Changes_.emplace_back(TEvent{fd, TEvent::WRITE, h});
104 }
105
111 void AddRemoteHup(int fd, THandle h) {
112 MaxFd_ = std::max(MaxFd_, fd);
113 Changes_.emplace_back(TEvent{fd, TEvent::RHUP, h});
114 }
115
122 void RemoveEvent(int fd) {
123 // TODO: resume waiting coroutines here
124 MaxFd_ = std::max(MaxFd_, fd);
125 Changes_.emplace_back(TEvent{fd, TEvent::READ|TEvent::WRITE|TEvent::RHUP, {}});
126 }
127
135 void RemoveEvent(THandle /*h*/) {
136 // TODO: Add new vector for this type of removing
137 // Will be called in destuctor of unfinished futures
138 }
139
147 auto Sleep(TTime until) {
148 struct TAwaitableSleep {
149 TAwaitableSleep(TPollerBase* poller, TTime n)
150 : poller(poller)
151 , n(n)
152 { }
153 ~TAwaitableSleep() {
154 if (poller) {
155 poller->RemoveTimer(timerId, n);
156 }
157 }
158
159 TAwaitableSleep(TAwaitableSleep&& other)
160 : poller(other.poller)
161 , n(other.n)
162 {
163 other.poller = nullptr;
164 }
165
166 TAwaitableSleep(const TAwaitableSleep&) = delete;
167 TAwaitableSleep& operator=(const TAwaitableSleep&) = delete;
168
169 bool await_ready() {
170 return false;
171 }
172
173 void await_suspend(std::coroutine_handle<> h) {
174 timerId = poller->AddTimer(n, h);
175 }
176
177 void await_resume() { poller = nullptr; }
178
179 TPollerBase* poller;
180 TTime n;
181 unsigned timerId = 0;
182 };
183
184 return TAwaitableSleep{this,until};
185 }
186
194 template<typename Rep, typename Period>
195 auto Sleep(std::chrono::duration<Rep,Period> duration) {
196 return Sleep(TClock::now() + duration);
197 }
198
205 auto Yield() {
206 return Sleep(TTime{});
207 }
208
217 void Wakeup(TEvent&& change) {
235 auto index = Changes_.size();
236 change.Handle.resume();
237 if (change.Fd >= 0) {
238 bool matched = false;
239 for (; index < Changes_.size() && !matched; index++) {
240 matched = Changes_[index].Match(change);
241 }
242 if (!matched) {
243 change.Handle = {};
244 Changes_.emplace_back(std::move(change));
245 }
246 }
247 }
248
254 for (auto&& ev : ReadyEvents_) {
255 Wakeup(std::move(ev));
256 }
257 }
258
263 void SetMaxDuration(std::chrono::milliseconds maxDuration) {
264 MaxDuration_ = maxDuration;
266 }
267
268 auto TimersSize() const {
269 return Timers_.size();
270 }
271
272protected:
278 timespec GetTimeout() const {
279 return Timers_.empty()
281 : Timers_.top().Deadline == TTime{}
282 ? timespec {0, 0}
283 : GetTimespec(TClock::now(), Timers_.top().Deadline, MaxDuration_);
284 }
285
/**
 * Computes a timespec from a millisecond duration.
 *
 * @param duration Duration to convert.
 * @return timespec whose tv_sec holds the whole seconds of `duration`
 *         (duration_cast truncates toward zero) and whose tv_nsec holds the
 *         remaining part expressed in nanoseconds.
 */
static constexpr timespec GetMaxDuration(std::chrono::milliseconds duration) {
    const auto wholeSeconds = std::chrono::duration_cast<std::chrono::seconds>(duration);
    const auto remainder = std::chrono::duration_cast<std::chrono::nanoseconds>(duration - wholeSeconds);
    // Value-initialize: an uninitialized local is not allowed in a constexpr
    // function before C++20, and this also zeroes any additional fields the
    // platform's timespec may have.
    timespec ts{};
    ts.tv_sec = static_cast<decltype(ts.tv_sec)>(wholeSeconds.count());
    ts.tv_nsec = static_cast<decltype(ts.tv_nsec)>(remainder.count());
    return ts;
}
299
300 void Reset() {
301 ReadyEvents_.clear();
302 Changes_.clear();
303 MaxFd_ = 0;
304 }
305
312 auto now = TClock::now();
313 bool first = true;
314 unsigned prevId = 0;
315
316 while (!Timers_.empty()&&Timers_.top().Deadline <= now) {
317 TTimer timer = Timers_.top(); Timers_.pop();
318
319 if ((first || prevId != timer.Id) && timer.Handle) { // skip removed timers
320 LastFiredTimer_ = timer.Id;
321 timer.Handle.resume();
322 }
323
324 first = false;
325 prevId = timer.Id;
326 }
327
329 }
330
331 int MaxFd_ = -1;
332 std::vector<TEvent> Changes_;
333 std::vector<TEvent> ReadyEvents_;
334 unsigned TimerId_ = 0;
335 std::priority_queue<TTimer> Timers_;
337 unsigned LastFiredTimer_ = (unsigned)(-1);
338 std::chrono::milliseconds MaxDuration_ = std::chrono::milliseconds(100);
340};
341
342} // namespace NNet
timespec GetTimeout() const
Computes the poll timeout based on scheduled timers.
Definition poller.hpp:278
void AddRemoteHup(int fd, THandle h)
Registers a remote hang-up (RHUP) event.
Definition poller.hpp:111
TPollerBase()=default
Default constructor.
std::chrono::milliseconds MaxDuration_
Maximum poll duration.
Definition poller.hpp:338
TPollerBase(const TPollerBase &)=delete
Copying is disabled.
void Wakeup(TEvent &&change)
Wakes up a coroutine waiting on an event.
Definition poller.hpp:217
void Reset()
Clears the lists of ready events and pending changes.
Definition poller.hpp:300
void SetMaxDuration(std::chrono::milliseconds maxDuration)
Sets the maximum polling duration.
Definition poller.hpp:263
void ProcessTimers()
Processes scheduled timers.
Definition poller.hpp:311
unsigned LastFiredTimer_
ID of the last fired timer.
Definition poller.hpp:337
unsigned AddTimer(TTime deadline, THandle h)
Schedules a timer.
Definition poller.hpp:62
std::vector< TEvent > ReadyEvents_
Events ready to wake up their coroutines.
Definition poller.hpp:333
void AddWrite(int fd, THandle h)
Registers a write event on a file descriptor.
Definition poller.hpp:101
int MaxFd_
Highest file descriptor in use.
Definition poller.hpp:331
auto Sleep(TTime until)
Suspends execution until the specified time.
Definition poller.hpp:147
std::priority_queue< TTimer > Timers_
Priority queue for scheduled timers.
Definition poller.hpp:335
timespec MaxDurationTs_
Max duration represented as timespec.
Definition poller.hpp:339
bool RemoveTimer(unsigned timerId, TTime deadline)
Removes or cancels a timer.
Definition poller.hpp:77
auto TimersSize() const
Returns the number of scheduled timers.
Definition poller.hpp:268
void RemoveEvent(int fd)
Removes registered events for a specific file descriptor.
Definition poller.hpp:122
void AddRead(int fd, THandle h)
Registers a read event on a file descriptor.
Definition poller.hpp:91
void RemoveEvent(THandle)
Removes events associated with a given coroutine handle.
Definition poller.hpp:135
TTime LastTimersProcessTime_
Last time timers were processed.
Definition poller.hpp:336
std::vector< TEvent > Changes_
Pending changes (registered events).
Definition poller.hpp:332
static constexpr timespec GetMaxDuration(std::chrono::milliseconds duration)
Computes a timespec from a duration.
Definition poller.hpp:291
unsigned TimerId_
Counter for generating unique timer IDs.
Definition poller.hpp:334
auto Yield()
Yields execution to the next event loop iteration.
Definition poller.hpp:205
void WakeupReadyHandles()
Wakes up all coroutines waiting on ready events.
Definition poller.hpp:253
auto Sleep(std::chrono::duration< Rep, Period > duration)
Overload of Sleep() accepting a duration.
Definition poller.hpp:195
Definition base.hpp:33
Definition base.hpp:18