4#include "envelope_reader.hpp"
8#include <coroio/arena.hpp>
9#include <coroio/sockutils.hpp>
20enum class ESystemMessages : TMessageId {
36 static constexpr TMessageId MessageId =
static_cast<TMessageId
>(ESystemMessages::PoisonPill);
43 THandle Handle =
nullptr;
44 TMessageId MessageId = 0;
51 std::unique_ptr<TUnboundedVectorQueue<TEnvelope>> Mailbox;
56 uint32_t IsReady : 1 = 0;
67 void Receive(TMessageId messageId,
TBlob blob, TActorContext::TPtr ctx)
override;
70 std::shared_ptr<TAskState<T>> State;
129 return Poller->
Sleep(until);
133 template<
typename Rep,
typename Period>
134 auto Sleep(std::chrono::duration<Rep,Period> duration) {
135 return Poller->
Sleep(duration);
153 template<
typename T,
typename... Args>
155 if (recipient.
NodeId() == NodeId_) {
156 auto blob = SerializeNear<T>(GetPodAllocator(), std::forward<Args>(args)...);
157 Send(sender, recipient, T::MessageId, std::move(blob));
159 auto& maybeRemote = Nodes[recipient.
NodeId()];
160 if (!maybeRemote.Node) {
161 std::cerr <<
"Cannot send message to actor on different node: " << recipient.
ToString() <<
"\n";
164 SerializeFarInplace<T>(*maybeRemote.Node, sender, recipient, std::forward<Args>(args)...);
165 if (maybeRemote.Flags.IsReady == 0) {
166 maybeRemote.Flags.IsReady = 1;
186 template<
typename T,
typename... Args>
188 auto blob = SerializeNear<T>(GetPodAllocator(), std::forward<Args>(args)...);
189 return Schedule(when, sender, recipient, T::MessageId, std::move(blob));
198 void Cancel(TEvent event);
217 template<
typename T,
typename TQuestion>
226 bool await_ready()
const noexcept {
230 void await_suspend(THandle h) {
235 if (T::MessageId != State->MessageId) {
236 throw std::runtime_error(
"MessageId mismatch in Ask awaiter");
239 return DeserializeNear<T>(State->Blob);
243 std::shared_ptr<TAskState<T>> State;
246 auto state = std::make_shared<TAskState<T>>();
247 auto askActor = std::make_unique<TAsk<T>>(state);
248 auto actorId =
Register(std::move(askActor));
249 Send(actorId, recepient, TQuestion::MessageId, SerializeNear<TQuestion>(GetPodAllocator(), std::forward<TQuestion>(message)));
250 return TAskAwaiter{state};
274 void AddNode(
int id, std::unique_ptr<INode> node);
295 template<
typename TSocket,
typename TEnvelopeReader = TZeroCopyEnvelopeReader>
298 Handles.emplace_back(InboundServe<TSocket, TEnvelopeReader>(std::move(socket)));
299 for (
int i = 0; i < static_cast<int>(Nodes.size()); ++i) {
301 Handles.emplace_back(OutboundServe(i));
307 void GcIterationSync();
309 void DrainReadyNodes();
310 void AddPendingFuture(TLocalActorId
id,
TFuture<void>&& future);
313 Nodes[id].Node->StartConnect();
315 co_await SuspendExecution(
id);
316 auto& node = Nodes[id].Node;
320 std::cerr <<
"Node with id: " <<
id <<
" is not registered\n";
325 template<
typename TSocket,
typename TEnvelopeReader>
326 TVoidTask InboundServe(TSocket socket) {
327 std::cerr <<
"InboundServe started\n";
329 auto client =
co_await socket.Accept();
330 std::cerr <<
"Accepted\n";
331 InboundConnection<TSocket, TEnvelopeReader>(std::move(client));
336 template<
typename TSocket,
typename TEnvelopeReader>
337 TVoidTask InboundConnection(TSocket socket) {
338 static constexpr size_t ReadSize = 512 * 1024;
339 static constexpr size_t InflightBytes = 16 * 1024 * 1024;
340 static constexpr size_t MaxBytesBeforeYield = 2 * 1024 * 1024;
341 TEnvelopeReader envelopeReader(InflightBytes, 1024);
342 uint64_t message = 0;
346 if (envelopeReader.Size() < InflightBytes || envelopeReader.NeedMoreData()) {
347 auto buffer = envelopeReader.Acquire(ReadSize);
348 auto size =
co_await socket.ReadSome(buffer.data(), buffer.size());
353 throw std::runtime_error(
"Socket closed");
355 envelopeReader.Commit(size);
360 size_t bytesProcessed = 0;
361 while (
auto envelope = envelopeReader.Pop()) {
362 if (envelope->Recipient.NodeId() != NodeId_) [[unlikely]] {
363 std::cerr <<
"Received message for different node: " << envelope->Recipient.ToString() <<
"\n";
367 bytesProcessed += envelope->Blob.Size +
sizeof(THeader);
368 Send(envelope->Sender, envelope->Recipient, envelope->MessageId, std::move(envelope->Blob));
369 if (bytesProcessed >= MaxBytesBeforeYield) {
374 co_await Poller->
Yield();
376 }
catch (
const std::exception& e) {
377 std::cerr <<
"Error in InboundConnection: " << e.what() <<
"\n";
381 void ShutdownActor(TLocalActorId actorId) {
382 if (actorId < Actors.size()) {
384 Actors[actorId] = {};
385 FreeActorIds.push(actorId);
389 void* AllocateActorContext() {
390 return ContextAllocator.Allocate();
393 void DeallocateActorContext(TActorContext* ptr) {
394 ContextAllocator.Deallocate(ptr);
397 TFuture<void> SuspendExecution(
int nodeId) {
398 Nodes[nodeId].Pending =
co_await Self();
399 co_await std::suspend_always();
400 Nodes[nodeId].Pending = {};
405 struct TPodAllocator {
406 void* Acquire(
size_t size) {
407 return ::operator
new(size);
410 void Release(
void* ptr) {
411 ::operator
delete(ptr);
415 TPodAllocator& GetPodAllocator() {
421 TUnboundedVectorQueue<TLocalActorId> ReadyActors;
422 std::vector<TActorInternalState> Actors;
425 std::vector<TFuture<void>> CleanupMessages;
426 std::stack<TLocalActorId, std::vector<TLocalActorId>> FreeActorIds;
428 TArenaAllocator<TActorContext> ContextAllocator;
430 TPodAllocator PodAllocator;
432 TLocalActorId NextActorId_ = 1;
433 TCookie NextCookie_ = 1;
435 THandle YieldCoroutine_{};
436 THandle ScheduleCoroutine_{};
437 bool IsYielding_ =
true;
440 std::unique_ptr<INode> Node;
443 uint32_t IsReady : 1 = 0;
446 std::vector<TNodeState> Nodes;
451 unsigned TimerId = 0;
455 TMessageId MessageId;
458 bool operator<(
const TDelayed& other)
const {
459 return std::tie(When, TimerId, valid) > std::tie(other.When, other.TimerId, other.valid);
463 std::priority_queue<TDelayed> DelayedMessages;
465 std::vector<THandle> Handles;
467 friend class TActorContext;
472 State->MessageId = messageId;
473 State->Blob = std::move(blob);
474 State->Handle.resume();
475 ctx->Send<
TPoison>(ctx->Self());
479 co_return co_await ActorSystem->
Sleep(until);
482template<
typename Rep,
typename Period>
484 co_return co_await ActorSystem->
Sleep(duration);
487template<
typename T,
typename TQuestion>
489 co_return co_await ActorSystem->
Ask<T>(recipient, std::forward<TQuestion>(question));
492template<
typename T,
typename... Args>
494 ActorSystem->
Send<T>(
Self(), to, std::forward<Args>(args)...);
496template<
typename T,
typename... Args>
498 ActorSystem->
Send<T>(
Sender(), to, std::forward<Args>(args)...);
501template<
typename T,
typename... Args>
503 auto blob = SerializeNear<T>(ActorSystem->GetPodAllocator(), std::forward<Args>(args)...);
504 return Schedule(when, sender, recipient, T::MessageId, std::move(blob));
507inline void* TActorContext::operator
new(
size_t size,
TActorSystem* actorSystem) {
508 return actorSystem->AllocateActorContext();
511inline void TActorContext::operator
delete(
void* ptr) {
513 auto* self =
static_cast<TActorContext*
>(ptr);
514 auto* actorSystem = self->ActorSystem;
515 actorSystem->DeallocateActorContext(self);
Actor system implementation with message passing and behavior support.
Base interface for all actors in the system.
Definition actor.hpp:351
TActorId Self() const
Get this actor's ID.
Definition actor.hpp:155
void Forward(TActorId to, TMessageId messageId, TBlob blob)
Forward a message to another actor (preserves original sender)
Definition actorsystem.cpp:21
TFuture< void > Sleep(TTime until)
Suspend the current handler until a specific time.
Definition actorsystem.hpp:478
TFuture< T > Ask(TActorId recipient, TQuestion &&question)
Send a request and suspend until a reply of type T arrives.
Definition actorsystem.hpp:488
void Send(TActorId to, TMessageId messageId, TBlob blob)
Send a message to another actor.
Definition actorsystem.cpp:16
TEvent Schedule(TTime when, TActorId sender, TActorId recipient, TMessageId messageId, TBlob blob)
Schedule a message to be delivered at a specific time.
Definition actorsystem.cpp:26
TActorId Sender() const
Get the sender of the current message.
Definition actor.hpp:150
Globally unique identifier for actors across a distributed system.
Definition actorid.hpp:29
TNodeId NodeId() const
Get the node ID component.
Definition actorid.hpp:43
std::string ToString() const
Convert actor ID to a human-readable string.
Definition actorid.hpp:61
Single-threaded actor runtime.
Definition actorsystem.hpp:99
auto Ask(TActorId recepient, TQuestion &&message)
Sends a request and returns an awaitable for the reply of type T.
Definition actorsystem.hpp:218
size_t ActorsSize() const
Returns the number of currently registered (alive) actors.
Definition actorsystem.cpp:240
void Serve(TSocket socket)
Starts the actor system with inbound and outbound network serving.
Definition actorsystem.hpp:296
auto Sleep(std::chrono::duration< Rep, Period > duration)
Suspends the current coroutine for duration. Delegates to the poller.
Definition actorsystem.hpp:134
void Send(TActorId sender, TActorId recipient, Args &&... args)
Sends a typed message from sender to recipient (non-blocking).
Definition actorsystem.hpp:154
TEvent Schedule(TTime when, TActorId sender, TActorId recipient, TMessageId messageId, TBlob blob)
Low-level schedule with a pre-serialized blob. Prefer the typed Schedule<T> overload.
Definition actorsystem.cpp:250
void Serve()
Starts the actor system event loop for local actors only.
Definition actorsystem.cpp:176
void Cancel(TEvent event)
Cancels a previously scheduled message.
Definition actorsystem.cpp:265
TEvent Schedule(TTime when, TActorId sender, TActorId recipient, Args &&... args)
Schedules a typed message to be delivered at when.
Definition actorsystem.hpp:187
void YieldNotify()
Wakes up the internal scheduler to process newly ready nodes.
Definition actorsystem.cpp:244
void Send(TActorId sender, TActorId recipient, TMessageId messageId, TBlob blob)
Low-level send with a pre-serialized blob. Prefer the typed Send<T> overload.
Definition actorsystem.cpp:62
auto Sleep(TTime until)
Suspends the current coroutine until until. Delegates to the poller.
Definition actorsystem.hpp:128
TActorId Register(IActor::TPtr actor)
Registers an actor and returns its system-wide ID.
Definition actorsystem.cpp:36
TActorSystem(TPollerBase *poller, int nodeId=1)
Constructs the actor system.
Definition actorsystem.hpp:108
void AddNode(int id, std::unique_ptr< INode > node)
Registers a remote node for distributed message routing.
Definition actorsystem.cpp:228
Definition actorsystem.hpp:41
Definition actorsystem.hpp:61
void Receive(TMessageId messageId, TBlob blob, TActorContext::TPtr ctx) override
Process an incoming message.
Definition actorsystem.hpp:471
Backend-independent base for I/O pollers.
Definition poller.hpp:38
auto Sleep(TTime until)
Suspends execution until the specified time.
Definition poller.hpp:147
auto Yield()
Suspends the coroutine until the next event-loop iteration.
Definition poller.hpp:207
High-level asynchronous socket for network communication.
Definition socket.hpp:367
Memory-efficient unbounded queue implementation for actor message passing.
Definition actorsystem.hpp:55
Definition actorsystem.hpp:49
Definition actorsystem.hpp:442
Opaque message payload with Near/Far duality.
Definition messages.hpp:99
System message that terminates the receiving actor.
Definition actorsystem.hpp:35
Unbounded queue with automatic capacity growth.
Definition queue.hpp:63
void Push(T &&item)
Add element to the back of the queue.
Definition queue.hpp:77
Owned coroutine handle that carries a result of type T.
Definition corochain.hpp:185
Fire-and-forget coroutine handle.
Definition promises.hpp:26