COROIO: coroio/actors/actorsystem.hpp Source File
COROIO
 
Loading...
Searching...
No Matches
actorsystem.hpp
1#pragma once
2
#include "actor.hpp"
#include "envelope_reader.hpp"
#include "node.hpp"
#include "queue.hpp"

#include <coroio/arena.hpp>
#include <coroio/sockutils.hpp>

#include <chrono>
#include <coroutine>
#include <cstddef>
#include <cstdint>
#include <exception>
#include <iostream>
#include <memory>
#include <queue>
#include <stack>
#include <stdexcept>
#include <tuple>
#include <type_traits>
#include <utility>
#include <vector>
12
13#ifdef Yield
14#undef Yield
15#endif
16
17namespace NNet {
18namespace NActors {
19
20enum class ESystemMessages : TMessageId {
21 PoisonPill = 1
22};
23
35struct TPoison {
36 static constexpr TMessageId MessageId = static_cast<TMessageId>(ESystemMessages::PoisonPill);
37};
38
39template<typename T>
41{
42public:
43 THandle Handle = nullptr;
44 TMessageId MessageId = 0;
45 TBlob Blob;
46};
47
49{
50 TCookie Cookie = 0;
51 std::unique_ptr<TUnboundedVectorQueue<TEnvelope>> Mailbox;
52 TFuture<void> Pending;
53 IActor::TPtr Actor;
54
55 struct TFlags {
56 uint32_t IsReady : 1 = 0; // Is the actor exists in ReadyActors queue
57 } Flags = {};
58};
59
60template<typename T>
61class TAsk : public IActor {
62public:
63 TAsk(const std::shared_ptr<TAskState<T>>& state)
64 : State(state)
65 { }
66
67 void Receive(TMessageId messageId, TBlob blob, TActorContext::TPtr ctx) override;
68
69private:
70 std::shared_ptr<TAskState<T>> State;
71};
72
99{
100public:
108 TActorSystem(TPollerBase* poller, int nodeId = 1)
109 : Poller(poller)
110 , NodeId_(nodeId)
111 { }
112
114
/// Registers an actor and returns its system-wide ID.
125 TActorId Register(IActor::TPtr actor);
126
128 auto Sleep(TTime until) {
129 return Poller->Sleep(until);
130 }
131
133 template<typename Rep, typename Period>
134 auto Sleep(std::chrono::duration<Rep,Period> duration) {
135 return Poller->Sleep(duration);
136 }
137
/// Low-level send with a pre-serialized blob. Prefer the typed Send<T> overload.
139 void Send(TActorId sender, TActorId recipient, TMessageId messageId, TBlob blob);
140
153 template<typename T, typename... Args>
154 void Send(TActorId sender, TActorId recipient, Args&&... args) {
155 if (recipient.NodeId() == NodeId_) {
156 auto blob = SerializeNear<T>(GetPodAllocator(), std::forward<Args>(args)...);
157 Send(sender, recipient, T::MessageId, std::move(blob));
158 } else {
159 auto& maybeRemote = Nodes[recipient.NodeId()];
160 if (!maybeRemote.Node) {
161 std::cerr << "Cannot send message to actor on different node: " << recipient.ToString() << "\n";
162 return;
163 }
164 SerializeFarInplace<T>(*maybeRemote.Node, sender, recipient, std::forward<Args>(args)...);
165 if (maybeRemote.Flags.IsReady == 0) {
166 maybeRemote.Flags.IsReady = 1;
167 ReadyNodes.Push(recipient.NodeId());
168 }
169
170 YieldNotify();
171 }
172 }
173
/// Low-level schedule with a pre-serialized blob. Prefer the typed Schedule<T> overload.
175 TEvent Schedule(TTime when, TActorId sender, TActorId recipient, TMessageId messageId, TBlob blob);
176
186 template<typename T, typename... Args>
187 TEvent Schedule(TTime when, TActorId sender, TActorId recipient, Args&&... args) {
188 auto blob = SerializeNear<T>(GetPodAllocator(), std::forward<Args>(args)...);
189 return Schedule(when, sender, recipient, T::MessageId, std::move(blob));
190 }
191
/// Cancels a previously scheduled message.
198 void Cancel(TEvent event);
199
217 template<typename T, typename TQuestion>
218 auto Ask(TActorId recepient, TQuestion&& message) {
219 class TAskAwaiter
220 {
221 public:
222 TAskAwaiter(const std::shared_ptr<TAskState<T>>& state)
223 : State(state)
224 { }
225
226 bool await_ready() const noexcept {
227 return false;
228 }
229
230 void await_suspend(THandle h) {
231 State->Handle = h;
232 }
233
234 T await_resume() {
235 if (T::MessageId != State->MessageId) {
236 throw std::runtime_error("MessageId mismatch in Ask awaiter");
237 }
238
239 return DeserializeNear<T>(State->Blob);
240 }
241
242 private:
243 std::shared_ptr<TAskState<T>> State;
244 };
245
246 auto state = std::make_shared<TAskState<T>>();
247 auto askActor = std::make_unique<TAsk<T>>(state);
248 auto actorId = Register(std::move(askActor));
249 Send(actorId, recepient, TQuestion::MessageId, SerializeNear<TQuestion>(GetPodAllocator(), std::forward<TQuestion>(message)));
250 return TAskAwaiter{state};
251 }
252
/// Wakes up the internal scheduler to process newly ready nodes.
259 void YieldNotify();
260
/// Returns the number of currently registered (alive) actors.
262 size_t ActorsSize() const;
263
/// Registers a remote node for distributed message routing.
274 void AddNode(int id, std::unique_ptr<INode> node);
275
/// Starts the actor system event loop for local actors only.
283 void Serve();
284
295 template<typename TSocket, typename TEnvelopeReader = TZeroCopyEnvelopeReader>
296 void Serve(TSocket socket) {
297 Serve();
298 Handles.emplace_back(InboundServe<TSocket, TEnvelopeReader>(std::move(socket)));
299 for (int i = 0; i < static_cast<int>(Nodes.size()); ++i) {
300 if (Nodes[i].Node) {
301 Handles.emplace_back(OutboundServe(i));
302 }
303 }
304 }
305
306private:
// Internal helpers declared here without a body; their definitions are not
// part of this header.
// NOTE(review): names suggest GC of finished work, running ready actors,
// flushing ReadyNodes and tracking an actor's pending future — confirm
// against the .cpp before relying on this.
307 void GcIterationSync();
308 void ExecuteSync();
309 void DrainReadyNodes();
310 void AddPendingFuture(TLocalActorId id, TFuture<void>&& future);
311
312 TVoidTask OutboundServe(int id) {
313 Nodes[id].Node->StartConnect();
314 while (true) {
315 co_await SuspendExecution(id);
316 auto& node = Nodes[id].Node;
317 if (node) {
318 node->Drain();
319 } else {
320 std::cerr << "Node with id: " << id << " is not registered\n";
321 }
322 }
323 }
324
325 template<typename TSocket, typename TEnvelopeReader>
326 TVoidTask InboundServe(TSocket socket) {
327 std::cerr << "InboundServe started\n";
328 while (true) {
329 auto client = co_await socket.Accept();
330 std::cerr << "Accepted\n";
331 InboundConnection<TSocket, TEnvelopeReader>(std::move(client));
332 }
333 co_return;
334 }
335
336 template<typename TSocket, typename TEnvelopeReader>
337 TVoidTask InboundConnection(TSocket socket) {
338 static constexpr size_t ReadSize = 512 * 1024;
339 static constexpr size_t InflightBytes = 16 * 1024 * 1024;
340 static constexpr size_t MaxBytesBeforeYield = 2 * 1024 * 1024;
341 TEnvelopeReader envelopeReader(InflightBytes, /*lowWatermark = */ 1024);
342 uint64_t message = 0;
343
344 try {
345 while (true) {
346 if (envelopeReader.Size() < InflightBytes || envelopeReader.NeedMoreData()) {
347 auto buffer = envelopeReader.Acquire(ReadSize);
348 auto size = co_await socket.ReadSome(buffer.data(), buffer.size());
349 if (size < 0) {
350 continue;
351 }
352 if (size == 0) {
353 throw std::runtime_error("Socket closed");
354 }
355 envelopeReader.Commit(size);
356 }
357
358 //envelopeReader.PrintDebugInfo();
359
360 size_t bytesProcessed = 0;
361 while (auto envelope = envelopeReader.Pop()) {
362 if (envelope->Recipient.NodeId() != NodeId_) [[unlikely]] {
363 std::cerr << "Received message for different node: " << envelope->Recipient.ToString() << "\n";
364 continue;
365 }
366
367 bytesProcessed += envelope->Blob.Size + sizeof(THeader);
368 Send(envelope->Sender, envelope->Recipient, envelope->MessageId, std::move(envelope->Blob));
369 if (bytesProcessed >= MaxBytesBeforeYield) {
370 break;
371 }
372 }
373
374 co_await Poller->Yield();
375 }
376 } catch (const std::exception& e) {
377 std::cerr << "Error in InboundConnection: " << e.what() << "\n";
378 }
379 }
380
381 void ShutdownActor(TLocalActorId actorId) {
382 if (actorId < Actors.size()) {
383 AliveActors--;
384 Actors[actorId] = {};
385 FreeActorIds.push(actorId);
386 }
387 }
388
389 void* AllocateActorContext() {
390 return ContextAllocator.Allocate();
391 }
392
393 void DeallocateActorContext(TActorContext* ptr) {
394 ContextAllocator.Deallocate(ptr);
395 }
396
397 TFuture<void> SuspendExecution(int nodeId) {
398 Nodes[nodeId].Pending = co_await Self();
399 co_await std::suspend_always();
400 Nodes[nodeId].Pending = {};
401 co_return;
402 }
403
404 // TODO: rewrite
/// Minimal allocator handed to the serialization helpers.
/// Forwards directly to the global `operator new` / `operator delete`.
struct TPodAllocator {
    /// Allocates `size` bytes of raw storage.
    void* Acquire(size_t size)
    {
        void* const storage = ::operator new(size);
        return storage;
    }

    /// Frees storage previously obtained from Acquire().
    void Release(void* ptr)
    {
        ::operator delete(ptr);
    }
};
414
415 TPodAllocator& GetPodAllocator() {
416 return PodAllocator;
417 }
418
// Poller passed to the constructor; Sleep() and the inbound loop's yield go through it.
419 TPollerBase* Poller;
420
// Queue of local actor ids; an actor's IsReady flag records that it is in this queue.
421 TUnboundedVectorQueue<TLocalActorId> ReadyActors;
// Per-actor state, indexed by local actor id (see ShutdownActor).
422 std::vector<TActorInternalState> Actors;
// Decremented by ShutdownActor. NOTE(review): presumably what ActorsSize() reports — confirm.
423 int AliveActors = 0;
424
425 std::vector<TFuture<void>> CleanupMessages;
// Local ids released by ShutdownActor. NOTE(review): presumably reused by Register — confirm.
426 std::stack<TLocalActorId, std::vector<TLocalActorId>> FreeActorIds;
427
// Backs AllocateActorContext() / DeallocateActorContext().
428 TArenaAllocator<TActorContext> ContextAllocator;
429
// Returned by GetPodAllocator().
430 TPodAllocator PodAllocator;
431
432 TLocalActorId NextActorId_ = 1;
433 TCookie NextCookie_ = 1;
// Id of this node, set by the constructor; recipients with another node id are treated as remote.
434 TNodeId NodeId_ = 1;
435 THandle YieldCoroutine_{};
436 THandle ScheduleCoroutine_{};
437 bool IsYielding_ = true;
438
439 struct TNodeState {
440 std::unique_ptr<INode> Node;
441 THandle Pending;
442 struct TFlags {
443 uint32_t IsReady : 1 = 0;
444 } Flags = {};
445 };
// Remote node table, indexed by node id.
446 std::vector<TNodeState> Nodes;
448
449 struct TDelayed {
450 TTime When;
451 unsigned TimerId = 0;
452 bool valid = true;
453 TActorId Sender;
454 TActorId Recipient;
455 TMessageId MessageId;
456 TBlob Blob;
457
458 bool operator<(const TDelayed& other) const {
459 return std::tie(When, TimerId, valid) > std::tie(other.When, other.TimerId, other.valid);
460 }
461 };
462
// Pending scheduled messages; TDelayed::operator< is inverted, so top() is the earliest entry.
463 std::priority_queue<TDelayed> DelayedMessages;
464
// Tasks started by Serve(TSocket): the inbound accept loop and one outbound loop per node.
465 std::vector<THandle> Handles;
466
// TActorContext calls the private allocation helpers and GetPodAllocator().
467 friend class TActorContext;
468};
469
470template<typename T>
471void TAsk<T>::Receive(TMessageId messageId, TBlob blob, TActorContext::TPtr ctx) {
472 State->MessageId = messageId;
473 State->Blob = std::move(blob);
474 State->Handle.resume();
475 ctx->Send<TPoison>(ctx->Self());
476}
477
479 co_return co_await ActorSystem->Sleep(until);
480}
481
482template<typename Rep, typename Period>
483inline TFuture<void> TActorContext::Sleep(std::chrono::duration<Rep,Period> duration) {
484 co_return co_await ActorSystem->Sleep(duration);
485}
486
487template<typename T, typename TQuestion>
488inline TFuture<T> TActorContext::Ask(TActorId recipient, TQuestion&& question) {
489 co_return co_await ActorSystem->Ask<T>(recipient, std::forward<TQuestion>(question));
490}
491
492template<typename T, typename... Args>
493inline void TActorContext::Send(TActorId to, Args&&... args) {
494 ActorSystem->Send<T>(Self(), to, std::forward<Args>(args)...);
495}
496template<typename T, typename... Args>
497inline void TActorContext::Forward(TActorId to, Args&&... args) {
498 ActorSystem->Send<T>(Sender(), to, std::forward<Args>(args)...);
499}
500
501template<typename T, typename... Args>
502inline TEvent TActorContext::Schedule(TTime when, TActorId sender, TActorId recipient, Args&&... args) {
503 auto blob = SerializeNear<T>(ActorSystem->GetPodAllocator(), std::forward<Args>(args)...);
504 return Schedule(when, sender, recipient, T::MessageId, std::move(blob));
505}
506
507inline void* TActorContext::operator new(size_t size, TActorSystem* actorSystem) {
508 return actorSystem->AllocateActorContext();
509}
510
511inline void TActorContext::operator delete(void* ptr) {
512 if (ptr) {
513 auto* self = static_cast<TActorContext*>(ptr);
514 auto* actorSystem = self->ActorSystem;
515 actorSystem->DeallocateActorContext(self);
516 }
517}
518
519} // namespace NActors
520} // namespace NNet
521
Actor system implementation with message passing and behavior support.
Base interface for all actors in the system.
Definition actor.hpp:351
TActorId Self() const
Get this actor's ID.
Definition actor.hpp:155
void Forward(TActorId to, TMessageId messageId, TBlob blob)
Forward a message to another actor (preserves original sender)
Definition actorsystem.cpp:21
TFuture< void > Sleep(TTime until)
Suspend the current handler until a specific time.
Definition actorsystem.hpp:478
TFuture< T > Ask(TActorId recipient, TQuestion &&question)
Send a request and suspend until a reply of type T arrives.
Definition actorsystem.hpp:488
void Send(TActorId to, TMessageId messageId, TBlob blob)
Send a message to another actor.
Definition actorsystem.cpp:16
TEvent Schedule(TTime when, TActorId sender, TActorId recipient, TMessageId messageId, TBlob blob)
Schedule a message to be delivered at a specific time.
Definition actorsystem.cpp:26
TActorId Sender() const
Get the sender of the current message.
Definition actor.hpp:150
Globally unique identifier for actors across a distributed system.
Definition actorid.hpp:29
TNodeId NodeId() const
Get the node ID component.
Definition actorid.hpp:43
std::string ToString() const
Convert actor ID to a human-readable string.
Definition actorid.hpp:61
Single-threaded actor runtime.
Definition actorsystem.hpp:99
auto Ask(TActorId recepient, TQuestion &&message)
Sends a request and returns an awaitable for the reply of type T.
Definition actorsystem.hpp:218
size_t ActorsSize() const
Returns the number of currently registered (alive) actors.
Definition actorsystem.cpp:240
void Serve(TSocket socket)
Starts the actor system with inbound and outbound network serving.
Definition actorsystem.hpp:296
auto Sleep(std::chrono::duration< Rep, Period > duration)
Suspends the current coroutine for the given `duration`. Delegates to the poller.
Definition actorsystem.hpp:134
void Send(TActorId sender, TActorId recipient, Args &&... args)
Sends a typed message from sender to recipient (non-blocking).
Definition actorsystem.hpp:154
TEvent Schedule(TTime when, TActorId sender, TActorId recipient, TMessageId messageId, TBlob blob)
Low-level schedule with a pre-serialized blob. Prefer the typed Schedule<T> overload.
Definition actorsystem.cpp:250
void Serve()
Starts the actor system event loop for local actors only.
Definition actorsystem.cpp:176
void Cancel(TEvent event)
Cancels a previously scheduled message.
Definition actorsystem.cpp:265
TEvent Schedule(TTime when, TActorId sender, TActorId recipient, Args &&... args)
Schedules a typed message to be delivered at the time point `when`.
Definition actorsystem.hpp:187
void YieldNotify()
Wakes up the internal scheduler to process newly ready nodes.
Definition actorsystem.cpp:244
void Send(TActorId sender, TActorId recipient, TMessageId messageId, TBlob blob)
Low-level send with a pre-serialized blob. Prefer the typed Send<T> overload.
Definition actorsystem.cpp:62
auto Sleep(TTime until)
Suspends the current coroutine until the time point `until`. Delegates to the poller.
Definition actorsystem.hpp:128
TActorId Register(IActor::TPtr actor)
Registers an actor and returns its system-wide ID.
Definition actorsystem.cpp:36
TActorSystem(TPollerBase *poller, int nodeId=1)
Constructs the actor system.
Definition actorsystem.hpp:108
void AddNode(int id, std::unique_ptr< INode > node)
Registers a remote node for distributed message routing.
Definition actorsystem.cpp:228
Definition actorsystem.hpp:41
Definition actorsystem.hpp:61
void Receive(TMessageId messageId, TBlob blob, TActorContext::TPtr ctx) override
Process an incoming message.
Definition actorsystem.hpp:471
Backend-independent base for I/O pollers.
Definition poller.hpp:38
auto Sleep(TTime until)
Suspends execution until the specified time.
Definition poller.hpp:147
auto Yield()
Suspends the coroutine until the next event-loop iteration.
Definition poller.hpp:207
High-level asynchronous socket for network communication.
Definition socket.hpp:367
Memory-efficient unbounded queue implementation for actor message passing.
Definition actorsystem.hpp:55
Definition actorsystem.hpp:49
Opaque message payload with Near/Far duality.
Definition messages.hpp:99
System message that terminates the receiving actor.
Definition actorsystem.hpp:35
Unbounded queue with automatic capacity growth.
Definition queue.hpp:63
void Push(T &&item)
Add element to the back of the queue.
Definition queue.hpp:77
Owned coroutine handle that carries a result of type T.
Definition corochain.hpp:185
Fire-and-forget coroutine handle.
Definition promises.hpp:26