COROIO: coroio/actors/actorsystem.hpp Source File
COROIO
Loading...
Searching...
No Matches
actorsystem.hpp
1#pragma once
2
3#include "actor.hpp"
4#include "node.hpp"
5#include "queue.hpp"
6
7#include <coroio/arena.hpp>
8#include <coroio/sockutils.hpp>
9
10#include <stack>
11
12#ifdef Yield
13#undef Yield
14#endif
15
16namespace NNet {
17namespace NActors {
18
19enum class ESystemMessages : TMessageId {
20 PoisonPill = 1
21};
22
23struct TPoison {
24 static constexpr TMessageId MessageId = static_cast<TMessageId>(ESystemMessages::PoisonPill);
25};
26
27template<typename T>
29{
30public:
31 THandle Handle = nullptr;
32 TMessageId MessageId = 0;
33 TBlob Blob;
34};
35
37{
38 TCookie Cookie = 0;
39 std::unique_ptr<TUnboundedVectorQueue<TEnvelope>> Mailbox;
40 TFuture<void> Pending;
41 IActor::TPtr Actor;
42
43 struct TFlags {
44 uint32_t IsReady : 1 = 0; // Is the actor exists in ReadyActors queue
45 } Flags = {};
46};
47
48template<typename T>
49class TAsk : public IActor {
50public:
51 TAsk(const std::shared_ptr<TAskState<T>>& state)
52 : State(state)
53 { }
54
55 void Receive(TMessageId messageId, TBlob blob, TActorContext::TPtr ctx) override;
56
57private:
58 std::shared_ptr<TAskState<T>> State;
59};
60
61class TActorSystem
62{
63public:
64 TActorSystem(TPollerBase* poller, int nodeId = 1)
65 : Poller(poller)
66 , NodeId_(nodeId)
67 { }
68
69 ~TActorSystem();
70
71 TActorId Register(IActor::TPtr actor);
72
73 auto Sleep(TTime until) {
74 return Poller->Sleep(until);
75 }
76
77 template<typename Rep, typename Period>
78 auto Sleep(std::chrono::duration<Rep,Period> duration) {
79 return Poller->Sleep(duration);
80 }
81
82 void Send(TActorId sender, TActorId recepient, TMessageId messageId, TBlob blob);
83 template<typename T>
84 void Send(TActorId sender, TActorId recepient, T&& message) {
85 auto blob = SerializeNear(std::forward<T>(message), GetPodAllocator());
86 Send(sender, recepient, T::MessageId, std::move(blob));
87 }
88
89 TEvent Schedule(TTime when, TActorId sender, TActorId recipient, TMessageId messageId, TBlob blob);
90 template<typename T>
91 TEvent Schedule(TTime when, TActorId sender, TActorId recipient, T&& message) {
92 auto blob = SerializeNear(std::forward<T>(message), GetPodAllocator());
93 return Schedule(when, sender, recipient, T::MessageId, std::move(blob));
94 }
95 void Cancel(TEvent event);
96
97 template<typename T, typename TQuestion>
98 auto Ask(TActorId recepient, TQuestion&& message) {
99 class TAskAwaiter
100 {
101 public:
102 TAskAwaiter(const std::shared_ptr<TAskState<T>>& state)
103 : State(state)
104 { }
105
106 bool await_ready() const noexcept {
107 return false;
108 }
109
110 void await_suspend(THandle h) {
111 State->Handle = h;
112 }
113
114 T await_resume() {
115 if (T::MessageId != State->MessageId) {
116 throw std::runtime_error("MessageId mismatch in Ask awaiter");
117 }
118
119 return DeserializeNear<T>(State->Blob);
120 }
121
122 private:
123 std::shared_ptr<TAskState<T>> State;
124 };
125
126 auto state = std::make_shared<TAskState<T>>();
127 auto askActor = std::make_unique<TAsk<T>>(state);
128 auto actorId = Register(std::move(askActor));
129 Send(actorId, recepient, TQuestion::MessageId, SerializeNear(std::forward<TQuestion>(message), GetPodAllocator()));
130 return TAskAwaiter{state};
131 }
132
133 void YieldNotify();
134
135 size_t ActorsSize() const;
136
137 void AddNode(int id, std::unique_ptr<INode> node);
138
139 // Use Serve() for local actors and Serve(TSocket) for local and remote actors
140 void Serve();
141
142 template<typename TSocket>
143 void Serve(TSocket socket) {
144 Serve();
145 Handles.emplace_back(InboundServe(std::move(socket)));
146 for (int i = 0; i < static_cast<int>(Nodes.size()); ++i) {
147 if (Nodes[i].Node) {
148 Handles.emplace_back(OutboundServe(i));
149 }
150 }
151 }
152
153private:
154 void GcIterationSync();
155 void ExecuteSync();
156 void DrainReadyNodes();
157 void AddPendingFuture(TLocalActorId id, TFuture<void>&& future);
158
159 TVoidTask OutboundServe(int id) {
160 Nodes[id].Node->StartConnect();
161 while (true) {
162 co_await SuspendExecution(id);
163 auto& node = Nodes[id].Node;
164 if (node) {
165 node->Drain();
166 } else {
167 std::cerr << "Node with id: " << id << " is not registered\n";
168 }
169 }
170 }
171
172 template<typename TSocket>
173 TVoidTask InboundServe(TSocket socket) {
174 std::cerr << "InboundServe started\n";
175 while (true) {
176 auto client = co_await socket.Accept();
177 std::cerr << "Accepted\n";
178 InboundConnection(std::move(client));
179 }
180 co_return;
181 }
182
183 template<typename TSocket>
184 TVoidTask InboundConnection(TSocket socket) {
186 static constexpr auto BatchSize = 16;
187 uint64_t message = 0;
188
189 try {
190 while (true) {
191 auto data = co_await reader.Read();
192 if (data.Recipient.NodeId() != NodeId_) {
193 std::cerr << "Received message for different node: " << data.Recipient.ToString() << "\n";
194 continue;
195 }
196 TBlob blob{};
197 if (data.Size > 0) {
198 blob.Size = data.Size;
199 blob.Type = TBlob::PointerType::Far;
200 blob.Data = TBlob::TRawPtr(::operator new(blob.Size), [](void* ptr) {
201 ::operator delete(ptr);
202 });
203 co_await TByteReader(socket).Read(blob.Data.get(), blob.Size);
204 }
205 Send(data.Sender, data.Recipient, data.MessageId, std::move(blob));
206 if (++message % BatchSize == 0) {
207 co_await Poller->Yield();
208 }
209 }
210 } catch (const std::exception& e) {
211 std::cerr << "Error in InboundConnection: " << e.what() << "\n";
212 }
213 }
214
215 void ShutdownActor(TLocalActorId actorId) {
216 if (actorId < Actors.size()) {
217 AliveActors--;
218 Actors[actorId] = {};
219 FreeActorIds.push(actorId);
220 }
221 }
222
223 void* AllocateActorContext() {
224 return ContextAllocator.Allocate();
225 }
226
227 void DeallocateActorContext(TActorContext* ptr) {
228 ContextAllocator.Deallocate(ptr);
229 }
230
231 TFuture<void> SuspendExecution(int nodeId) {
232 Nodes[nodeId].Pending = co_await Self();
233 co_await std::suspend_always();
234 Nodes[nodeId].Pending = {};
235 co_return;
236 }
237
238 // TODO: rewrite
239 struct TPodAllocator {
240 void* Acquire(size_t size) {
241 return ::operator new(size);
242 }
243
244 void Release(void* ptr) {
245 ::operator delete(ptr);
246 }
247 };
248
249 TPodAllocator& GetPodAllocator() {
250 return PodAllocator;
251 }
252
253 TPollerBase* Poller;
254
256 std::vector<TActorInternalState> Actors;
257 int AliveActors = 0;
258
259 std::vector<TFuture<void>> CleanupMessages;
260 std::stack<TLocalActorId, std::vector<TLocalActorId>> FreeActorIds;
261
262 TArenaAllocator<TActorContext> ContextAllocator;
263
264 TPodAllocator PodAllocator;
265
266 TLocalActorId NextActorId_ = 1;
267 TCookie NextCookie_ = 1;
268 TNodeId NodeId_ = 1;
269 THandle YieldCoroutine_{};
270 THandle ScheduleCoroutine_{};
271 bool IsYielding_ = true;
272
273 struct TNodeState {
274 std::unique_ptr<INode> Node;
275 THandle Pending;
276 struct TFlags {
277 uint32_t IsReady : 1 = 0;
278 } Flags = {};
279 };
280 std::vector<TNodeState> Nodes;
282
283 struct TDelayed {
284 TTime When;
285 unsigned TimerId = 0;
286 bool valid = true;
287 TActorId Sender;
288 TActorId Recipient;
289 TMessageId MessageId;
290 TBlob Blob;
291
292 bool operator<(const TDelayed& other) const {
293 return std::tie(When, TimerId, valid) > std::tie(other.When, other.TimerId, other.valid);
294 }
295 };
296
297 std::priority_queue<TDelayed> DelayedMessages;
298
299 std::vector<THandle> Handles;
300
301 friend class TActorContext;
302};
303
304template<typename T>
305void TAsk<T>::Receive(TMessageId messageId, TBlob blob, TActorContext::TPtr ctx) {
306 State->MessageId = messageId;
307 State->Blob = std::move(blob);
308 State->Handle.resume();
309 ctx->Send(ctx->Self(), TPoison{});
310}
311
313 co_return co_await ActorSystem->Sleep(until);
314}
315
316template<typename Rep, typename Period>
317inline TFuture<void> TActorContext::Sleep(std::chrono::duration<Rep,Period> duration) {
318 co_return co_await ActorSystem->Sleep(duration);
319}
320
321template<typename T, typename TQuestion>
322inline TFuture<T> TActorContext::Ask(TActorId recipient, TQuestion&& question) {
323 co_return co_await ActorSystem->Ask<T>(recipient, std::forward<TQuestion>(question));
324}
325
326template<typename T>
327inline void TActorContext::Send(TActorId to, T&& message) {
328 auto blob = SerializeNear(std::forward<T>(message), ActorSystem->GetPodAllocator());
329 Send(to, T::MessageId, std::move(blob));
330}
331template<typename T>
332inline void TActorContext::Forward(TActorId to, T&& message) {
333 auto blob = SerializeNear(std::forward<T>(message), ActorSystem->GetPodAllocator());
334 Forward(to, T::MessageId, std::move(blob));
335}
336
337template<typename T>
338inline TEvent TActorContext::Schedule(TTime when, TActorId sender, TActorId recipient, T&& message) {
339 auto blob = SerializeNear(std::forward<T>(message), ActorSystem->GetPodAllocator());
340 return Schedule(when, sender, recipient, T::MessageId, std::move(blob));
341}
342
343inline void* TActorContext::operator new(size_t size, TActorSystem* actorSystem) {
344 return actorSystem->AllocateActorContext();
345}
346
347inline void TActorContext::operator delete(void* ptr) {
348 if (ptr) {
349 auto* self = static_cast<TActorContext*>(ptr);
350 auto* actorSystem = self->ActorSystem;
351 actorSystem->DeallocateActorContext(self);
352 }
353}
354
355} // namespace NActors
356} // namespace NNet
357
Actor system implementation with message passing and behavior support.
uint16_t TNodeId
Node identifier in a distributed system.
Definition actor.hpp:113
uint32_t TMessageId
Message type identifier.
Definition actor.hpp:119
uint16_t TCookie
Cookie for actor versioning and disambiguation.
Definition actor.hpp:116
uint32_t TLocalActorId
Local actor identifier within a node.
Definition actor.hpp:110
void Forward(TActorId to, TMessageId messageId, TBlob blob)
Forward a message to another actor (preserves original sender)
TFuture< void > Sleep(TTime until)
Sleep until a specific time.
Definition actorsystem.hpp:312
TFuture< T > Ask(TActorId recipient, TQuestion &&question)
Send a message and wait for a response.
Definition actorsystem.hpp:322
void Send(TActorId to, TMessageId messageId, TBlob blob)
Send a message to another actor.
TEvent Schedule(TTime when, TActorId sender, TActorId recipient, TMessageId messageId, TBlob blob)
Schedule a message to be delivered at a specific time.
Unique identifier for actors in the system.
Definition actor.hpp:129
Definition actorsystem.hpp:62
void Receive(TMessageId messageId, TBlob blob, TActorContext::TPtr ctx) override
Process an incoming message.
Definition actorsystem.hpp:305
Definition actorsystem.hpp:29
Arena allocator to preallocate memory for IOCP events.
Definition arena.hpp:27
Base class for pollers managing asynchronous I/O events and timers.
Definition poller.hpp:52
High-level asynchronous socket for network communication.
Definition socket.hpp:364
auto Accept()
Asynchronously accepts an incoming connection.
Definition socket.hpp:453
A minimal example of a coroutine "awaitable" object.
Memory-efficient unbounded queue implementation for actor message passing.
Definition actorsystem.hpp:43
Definition actorsystem.hpp:37
Definition messages.hpp:11
Definition actorsystem.hpp:23
Unbounded queue with automatic capacity growth.
Definition queue.hpp:63
A utility for reading data from a socket-like object, either a fixed number of bytes or until a speci...
Definition sockutils.hpp:64
TFuture< void > Read(void *data, size_t size)
Reads exactly size bytes and stores them into data.
Definition sockutils.hpp:89
Future type for coroutines returning a value of type T.
Definition corochain.hpp:182
A utility for reading a fixed-size structure of type T from a socket-like object.
Definition sockutils.hpp:297
TFuture< T > Read()
Reads a single instance of type T from the socket.
Definition sockutils.hpp:320
Definition promises.hpp:9