COROIO: coroio/actors/actorsystem.hpp Source File
COROIO
 
Loading...
Searching...
No Matches
actorsystem.hpp
1#pragma once
2
#include "actor.hpp"
#include "envelope_reader.hpp"
#include "node.hpp"
#include "queue.hpp"

#include <coroio/arena.hpp>
#include <coroio/sockutils.hpp>

#include <stack>
#include <type_traits>
12
13#ifdef Yield
14#undef Yield
15#endif
16
17namespace NNet {
18namespace NActors {
19
20enum class ESystemMessages : TMessageId {
21 PoisonPill = 1
22};
23
24struct TPoison {
25 static constexpr TMessageId MessageId = static_cast<TMessageId>(ESystemMessages::PoisonPill);
26};
27
28template<typename T>
30{
31public:
32 THandle Handle = nullptr;
33 TMessageId MessageId = 0;
34 TBlob Blob;
35};
36
38{
39 TCookie Cookie = 0;
40 std::unique_ptr<TUnboundedVectorQueue<TEnvelope>> Mailbox;
41 TFuture<void> Pending;
42 IActor::TPtr Actor;
43
44 struct TFlags {
45 uint32_t IsReady : 1 = 0; // Is the actor exists in ReadyActors queue
46 } Flags = {};
47};
48
49template<typename T>
50class TAsk : public IActor {
51public:
52 TAsk(const std::shared_ptr<TAskState<T>>& state)
53 : State(state)
54 { }
55
56 void Receive(TMessageId messageId, TBlob blob, TActorContext::TPtr ctx) override;
57
58private:
59 std::shared_ptr<TAskState<T>> State;
60};
61
63{
64public:
65 TActorSystem(TPollerBase* poller, int nodeId = 1)
66 : Poller(poller)
67 , NodeId_(nodeId)
68 { }
69
71
// Defined out of line. Takes ownership of `actor` and returns the id under which
// it can be addressed (Ask() uses the returned id as the sender of its question).
72 TActorId Register(IActor::TPtr actor);
73
74 auto Sleep(TTime until) {
75 return Poller->Sleep(until);
76 }
77
78 template<typename Rep, typename Period>
79 auto Sleep(std::chrono::duration<Rep,Period> duration) {
80 return Poller->Sleep(duration);
81 }
82
83 void Send(TActorId sender, TActorId recipient, TMessageId messageId, TBlob blob);
84 template<typename T, typename... Args>
85 void Send(TActorId sender, TActorId recipient, Args&&... args) {
86 if (recipient.NodeId() == NodeId_) {
87 auto blob = SerializeNear<T>(GetPodAllocator(), std::forward<Args>(args)...);
88 Send(sender, recipient, T::MessageId, std::move(blob));
89 } else {
90 auto& maybeRemote = Nodes[recipient.NodeId()];
91 if (!maybeRemote.Node) {
92 std::cerr << "Cannot send message to actor on different node: " << recipient.ToString() << "\n";
93 return;
94 }
95 SerializeFarInplace<T>(*maybeRemote.Node, sender, recipient, std::forward<Args>(args)...);
96 if (maybeRemote.Flags.IsReady == 0) {
97 maybeRemote.Flags.IsReady = 1;
98 ReadyNodes.Push(recipient.NodeId());
99 }
100
101 YieldNotify();
102 }
103 }
104
105 TEvent Schedule(TTime when, TActorId sender, TActorId recipient, TMessageId messageId, TBlob blob);
106 template<typename T, typename... Args>
107 TEvent Schedule(TTime when, TActorId sender, TActorId recipient, Args&&... args) {
108 auto blob = SerializeNear<T>(GetPodAllocator(), std::forward<Args>(args)...);
109 return Schedule(when, sender, recipient, T::MessageId, std::move(blob));
110 }
// Defined out of line. NOTE(review): presumably cancels a delivery previously
// returned by Schedule() - confirm against actorsystem.cpp.
111 void Cancel(TEvent event);
112
113 template<typename T, typename TQuestion>
114 auto Ask(TActorId recepient, TQuestion&& message) {
115 class TAskAwaiter
116 {
117 public:
118 TAskAwaiter(const std::shared_ptr<TAskState<T>>& state)
119 : State(state)
120 { }
121
122 bool await_ready() const noexcept {
123 return false;
124 }
125
126 void await_suspend(THandle h) {
127 State->Handle = h;
128 }
129
130 T await_resume() {
131 if (T::MessageId != State->MessageId) {
132 throw std::runtime_error("MessageId mismatch in Ask awaiter");
133 }
134
135 return DeserializeNear<T>(State->Blob);
136 }
137
138 private:
139 std::shared_ptr<TAskState<T>> State;
140 };
141
142 auto state = std::make_shared<TAskState<T>>();
143 auto askActor = std::make_unique<TAsk<T>>(state);
144 auto actorId = Register(std::move(askActor));
145 Send(actorId, recepient, TQuestion::MessageId, SerializeNear<TQuestion>(GetPodAllocator(), std::forward<TQuestion>(message)));
146 return TAskAwaiter{state};
147 }
148
// Defined out of line. Called by the templated Send() after a message has been
// queued for a remote node.
149 void YieldNotify();
150
// Defined out of line. NOTE(review): presumably the number of live actors
// (cf. AliveActors) - confirm.
151 size_t ActorsSize() const;
152
// Defined out of line. NOTE(review): presumably stores `node` as the connection
// object for node `id` in `Nodes` - confirm.
153 void AddNode(int id, std::unique_ptr<INode> node);
154
155 // Use Serve() for local actors and Serve(TSocket) for local and remote actors
156 void Serve();
157
158 template<typename TSocket, typename TEnvelopeReader = TZeroCopyEnvelopeReader>
159 void Serve(TSocket socket) {
160 Serve();
161 Handles.emplace_back(InboundServe<TSocket, TEnvelopeReader>(std::move(socket)));
162 for (int i = 0; i < static_cast<int>(Nodes.size()); ++i) {
163 if (Nodes[i].Node) {
164 Handles.emplace_back(OutboundServe(i));
165 }
166 }
167 }
168
169private:
// Private helpers, all defined out of line (not visible in this header).
// NOTE(review): the descriptions below are inferred from names only - confirm.
// GcIterationSync: presumably one synchronous clean-up pass.
170 void GcIterationSync();
// ExecuteSync: presumably runs the actors that are ready.
171 void ExecuteSync();
// DrainReadyNodes: presumably processes the node ids queued in ReadyNodes by Send().
172 void DrainReadyNodes();
// AddPendingFuture: presumably records `future` as the unfinished handler of actor `id`.
173 void AddPendingFuture(TLocalActorId id, TFuture<void>&& future);
174
175 TVoidTask OutboundServe(int id) {
176 Nodes[id].Node->StartConnect();
177 while (true) {
178 co_await SuspendExecution(id);
179 auto& node = Nodes[id].Node;
180 if (node) {
181 node->Drain();
182 } else {
183 std::cerr << "Node with id: " << id << " is not registered\n";
184 }
185 }
186 }
187
188 template<typename TSocket, typename TEnvelopeReader>
189 TVoidTask InboundServe(TSocket socket) {
190 std::cerr << "InboundServe started\n";
191 while (true) {
192 auto client = co_await socket.Accept();
193 std::cerr << "Accepted\n";
194 InboundConnection<TSocket, TEnvelopeReader>(std::move(client));
195 }
196 co_return;
197 }
198
199 template<typename TSocket, typename TEnvelopeReader>
200 TVoidTask InboundConnection(TSocket socket) {
201 static constexpr size_t ReadSize = 512 * 1024;
202 static constexpr size_t InflightBytes = 16 * 1024 * 1024;
203 static constexpr size_t MaxBytesBeforeYield = 2 * 1024 * 1024;
204 TEnvelopeReader envelopeReader(InflightBytes, /*lowWatermark = */ 1024);
205 uint64_t message = 0;
206
207 try {
208 while (true) {
209 if (envelopeReader.Size() < InflightBytes || envelopeReader.NeedMoreData()) {
210 auto buffer = envelopeReader.Acquire(ReadSize);
211 auto size = co_await socket.ReadSome(buffer.data(), buffer.size());
212 if (size < 0) {
213 continue;
214 }
215 if (size == 0) {
216 throw std::runtime_error("Socket closed");
217 }
218 envelopeReader.Commit(size);
219 }
220
221 //envelopeReader.PrintDebugInfo();
222
223 size_t bytesProcessed = 0;
224 while (auto envelope = envelopeReader.Pop()) {
225 if (envelope->Recipient.NodeId() != NodeId_) [[unlikely]] {
226 std::cerr << "Received message for different node: " << envelope->Recipient.ToString() << "\n";
227 continue;
228 }
229
230 bytesProcessed += envelope->Blob.Size + sizeof(THeader);
231 Send(envelope->Sender, envelope->Recipient, envelope->MessageId, std::move(envelope->Blob));
232 if (bytesProcessed >= MaxBytesBeforeYield) {
233 break;
234 }
235 }
236
237 co_await Poller->Yield();
238 }
239 } catch (const std::exception& e) {
240 std::cerr << "Error in InboundConnection: " << e.what() << "\n";
241 }
242 }
243
244 void ShutdownActor(TLocalActorId actorId) {
245 if (actorId < Actors.size()) {
246 AliveActors--;
247 Actors[actorId] = {};
248 FreeActorIds.push(actorId);
249 }
250 }
251
252 void* AllocateActorContext() {
253 return ContextAllocator.Allocate();
254 }
255
256 void DeallocateActorContext(TActorContext* ptr) {
257 ContextAllocator.Deallocate(ptr);
258 }
259
260 TFuture<void> SuspendExecution(int nodeId) {
261 Nodes[nodeId].Pending = co_await Self();
262 co_await std::suspend_always();
263 Nodes[nodeId].Pending = {};
264 co_return;
265 }
266
// TODO: rewrite
/// Allocator object passed to SerializeNear; every request goes straight to
/// the global operator new / operator delete.
struct TPodAllocator {
    /// Returns `size` bytes of raw storage from the global heap.
    void* Acquire(size_t size) {
        void* const memory = ::operator new(size);
        return memory;
    }

    /// Frees storage previously returned by Acquire().
    void Release(void* ptr) {
        ::operator delete(ptr);
    }
};
277
278 TPodAllocator& GetPodAllocator() {
279 return PodAllocator;
280 }
281
// Poller supplied to the constructor; used for Sleep() and for yielding.
282 TPollerBase* Poller;
283
// One record per local actor slot, indexed by TLocalActorId (see ShutdownActor).
285 std::vector<TActorInternalState> Actors;
// Decremented by ShutdownActor; NOTE(review): presumably incremented by Register - confirm.
286 int AliveActors = 0;
287
// NOTE(review): purpose not visible in this header - confirm against actorsystem.cpp.
288 std::vector<TFuture<void>> CleanupMessages;
// Ids released by ShutdownActor. NOTE(review): presumably reused by Register - confirm.
289 std::stack<TLocalActorId, std::vector<TLocalActorId>> FreeActorIds;
290
// Backing storage for TActorContext objects (see Allocate/DeallocateActorContext).
291 TArenaAllocator<TActorContext> ContextAllocator;
292
// Allocator handed to SerializeNear for local messages.
293 TPodAllocator PodAllocator;
294
295 TLocalActorId NextActorId_ = 1;
296 TCookie NextCookie_ = 1;
// Id of this node; set by the constructor and compared with recipient node ids.
297 TNodeId NodeId_ = 1;
// NOTE(review): presumably the coroutines woken by YieldNotify()/Schedule() - confirm.
298 THandle YieldCoroutine_{};
299 THandle ScheduleCoroutine_{};
300 bool IsYielding_ = true;
301
302 struct TNodeState {
303 std::unique_ptr<INode> Node;
304 THandle Pending;
305 struct TFlags {
306 uint32_t IsReady : 1 = 0;
307 } Flags = {};
308 };
309 std::vector<TNodeState> Nodes;
311
312 struct TDelayed {
313 TTime When;
314 unsigned TimerId = 0;
315 bool valid = true;
316 TActorId Sender;
317 TActorId Recipient;
318 TMessageId MessageId;
319 TBlob Blob;
320
321 bool operator<(const TDelayed& other) const {
322 return std::tie(When, TimerId, valid) > std::tie(other.When, other.TimerId, other.valid);
323 }
324 };
325
// Scheduled messages; TDelayed::operator< is inverted so the entry with the
// earliest When is on top.
326 std::priority_queue<TDelayed> DelayedMessages;
327
// Coroutine handles created by Serve(TSocket) for the inbound and outbound loops.
328 std::vector<THandle> Handles;
329
// TActorContext uses the private AllocateActorContext, DeallocateActorContext
// and GetPodAllocator members.
330 friend class TActorContext;
331};
332
333template<typename T>
334void TAsk<T>::Receive(TMessageId messageId, TBlob blob, TActorContext::TPtr ctx) {
335 State->MessageId = messageId;
336 State->Blob = std::move(blob);
337 State->Handle.resume();
338 ctx->Send<TPoison>(ctx->Self());
339}
340
342 co_return co_await ActorSystem->Sleep(until);
343}
344
345template<typename Rep, typename Period>
346inline TFuture<void> TActorContext::Sleep(std::chrono::duration<Rep,Period> duration) {
347 co_return co_await ActorSystem->Sleep(duration);
348}
349
350template<typename T, typename TQuestion>
351inline TFuture<T> TActorContext::Ask(TActorId recipient, TQuestion&& question) {
352 co_return co_await ActorSystem->Ask<T>(recipient, std::forward<TQuestion>(question));
353}
354
355template<typename T, typename... Args>
356inline void TActorContext::Send(TActorId to, Args&&... args) {
357 ActorSystem->Send<T>(Self(), to, std::forward<Args>(args)...);
358}
359template<typename T, typename... Args>
360inline void TActorContext::Forward(TActorId to, Args&&... args) {
361 ActorSystem->Send<T>(Sender(), to, std::forward<Args>(args)...);
362}
363
364template<typename T, typename... Args>
365inline TEvent TActorContext::Schedule(TTime when, TActorId sender, TActorId recipient, Args&&... args) {
366 auto blob = SerializeNear<T>(ActorSystem->GetPodAllocator(), std::forward<Args>(args)...);
367 return Schedule(when, sender, recipient, T::MessageId, std::move(blob));
368}
369
370inline void* TActorContext::operator new(size_t size, TActorSystem* actorSystem) {
371 return actorSystem->AllocateActorContext();
372}
373
374inline void TActorContext::operator delete(void* ptr) {
375 if (ptr) {
376 auto* self = static_cast<TActorContext*>(ptr);
377 auto* actorSystem = self->ActorSystem;
378 actorSystem->DeallocateActorContext(self);
379 }
380}
381
382} // namespace NActors
383} // namespace NNet
384
Actor system implementation with message passing and behavior support.
Base interface for all actors in the system.
Definition actor.hpp:311
Context object providing actor communication and scheduling capabilities.
Definition actor.hpp:139
TActorId Self() const
Get this actor's ID.
Definition actor.hpp:149
void Forward(TActorId to, TMessageId messageId, TBlob blob)
Forward a message to another actor (preserves original sender)
Definition actorsystem.cpp:21
TFuture< void > Sleep(TTime until)
Sleep until a specific time.
Definition actorsystem.hpp:341
TFuture< T > Ask(TActorId recipient, TQuestion &&question)
Send a message and wait for a response.
Definition actorsystem.hpp:351
void Send(TActorId to, TMessageId messageId, TBlob blob)
Send a message to another actor.
Definition actorsystem.cpp:16
TEvent Schedule(TTime when, TActorId sender, TActorId recipient, TMessageId messageId, TBlob blob)
Schedule a message to be delivered at a specific time.
Definition actorsystem.cpp:26
TActorId Sender() const
Get the sender of the current message.
Definition actor.hpp:144
Unique identifier for actors in the system.
Definition actorid.hpp:26
TNodeId NodeId() const
Get the node ID component.
Definition actorid.hpp:40
std::string ToString() const
Convert actor ID to string representation.
Definition actorid.hpp:59
Definition actorsystem.hpp:63
Definition actorsystem.hpp:30
Definition actorsystem.hpp:50
void Receive(TMessageId messageId, TBlob blob, TActorContext::TPtr ctx) override
Process an incoming message.
Definition actorsystem.hpp:334
Definition envelope_reader.hpp:14
Arena allocator to preallocate memory for IOCP events.
Definition arena.hpp:27
Base class for pollers managing asynchronous I/O events and timers.
Definition poller.hpp:52
auto Sleep(TTime until)
Suspends execution until the specified time.
Definition poller.hpp:155
auto Yield()
Yields execution to the next event loop iteration.
Definition poller.hpp:213
auto ReadSome(void *buf, size_t size)
Asynchronously reads data from the socket into the provided buffer.
Definition socket.hpp:138
High-level asynchronous socket for network communication.
Definition socket.hpp:364
auto Accept()
Asynchronously accepts an incoming connection.
Definition socket.hpp:453
A minimal example of a coroutine "awaitable" object.
Memory-efficient unbounded queue implementation for actor message passing.
Definition actorsystem.hpp:44
Definition actorsystem.hpp:38
Definition messages.hpp:84
Header for messages sent between actors. Used in remote communication and serialization.
Definition actorid.hpp:88
Definition actorsystem.hpp:24
Unbounded queue with automatic capacity growth.
Definition queue.hpp:63
void Push(T &&item)
Add element to the back of the queue.
Definition queue.hpp:77
Future type for coroutines returning a value of type T.
Definition corochain.hpp:182
Definition promises.hpp:9