COROIO: coroio/actors/actor.hpp Source File
COROIO
Loading...
Searching...
No Matches
actor.hpp
Go to the documentation of this file.
1#pragma once
2
3#include <memory>
5#include "messages.hpp"
6
103
104namespace NNet {
105namespace NActors {
106
107class TActorSystem;
108
/// Local actor identifier within a node.
using TLocalActorId = uint32_t;

/// Node identifier in a distributed system.
using TNodeId = uint16_t;

/// Cookie for actor versioning and disambiguation.
using TCookie = uint16_t;

/// Message type identifier.
using TMessageId = uint32_t;
120
129class TActorId {
130public:
132 TActorId() = default;
133
138 operator bool() const {
139 return !((NodeId_ == 0) & (ActorId_ == 0) & (Cookie_ == 0));
140 }
141
143 TNodeId NodeId() const {
144 return NodeId_;
145 }
146
150 return ActorId_;
151 }
152
154 TCookie Cookie() const {
155 return Cookie_;
156 }
157
162 std::string ToString() const {
163 return "ActorId:"
164 + std::to_string(NodeId_) + ":"
165 + std::to_string(ActorId_) + ":"
166 + std::to_string(Cookie_);
167 }
168
175 TActorId(TNodeId nodeId, TLocalActorId actorId, TCookie cookie)
176 : NodeId_(nodeId)
177 , ActorId_(actorId)
178 , Cookie_(cookie)
179 { }
180
181private:
182 TLocalActorId ActorId_ = 0;
183 TNodeId NodeId_ = 0;
184 TCookie Cookie_ = 0;
185};
186
201
202using TEvent = std::pair<unsigned, TTime>;
203
215class TActorContext
216{
217public:
218 using TPtr = std::unique_ptr<TActorContext>;
219
221 TActorId Sender() const {
222 return Sender_;
223 }
224
226 TActorId Self() const {
227 return Self_;
228 }
229
236 void Send(TActorId to, TMessageId messageId, TBlob blob);
237
244 void Forward(TActorId to, TMessageId messageId, TBlob blob);
245
252 template<typename T>
253 void Send(TActorId to, T&& message);
254
261 template<typename T>
262 void Forward(TActorId to, T&& message);
263
273 TEvent Schedule(TTime when, TActorId sender, TActorId recipient, TMessageId messageId, TBlob blob);
274
284 template<typename T>
285 TEvent Schedule(TTime when, TActorId sender, TActorId recipient, T&& message);
286
291 void Cancel(TEvent event);
292
298 TFuture<void> Sleep(TTime until);
299
307 template<typename Rep, typename Period>
308 TFuture<void> Sleep(std::chrono::duration<Rep,Period> duration);
309
318 template<typename T, typename TQuestion>
319 TFuture<T> Ask(TActorId recipient, TQuestion&& question);
320
328 struct TAsync {
329 TAsync(TActorSystem* actorSystem, TLocalActorId actorId)
330 : ActorSystem_(actorSystem)
331 , ActorId_(actorId)
332 { }
333
334 void Commit(TFuture<void>&& future);
335
336 private:
337 TLocalActorId ActorId_;
338 TActorSystem* ActorSystem_;
339 };
340
346
347 static void* operator new(size_t size, TActorSystem* actorSystem);
348 static void operator delete(void* ptr);
349
350private:
351 TActorContext(TActorId sender, TActorId self, TActorSystem* actorSystem)
352 : Sender_(sender)
353 , Self_(self)
354 , ActorSystem(actorSystem)
355 { }
356
357 TActorId Sender_;
358 TActorId Self_;
359 TActorSystem* ActorSystem = nullptr;
360
361 friend class TActorSystem;
362 friend class TMockActorContext;
363};
364
371class TMockActorContext : public TActorContext {
372public:
373 TMockActorContext(TActorId sender, TActorId self, TActorSystem* actorSystem)
374 : TActorContext(sender, self, actorSystem)
375 { }
376};
377
388class IActor {
389public:
390 using TPtr = std::unique_ptr<IActor>;
391 friend class TActorSystem;
392
393 IActor() = default;
394 virtual ~IActor() = default;
395
406 virtual void Receive(TMessageId messageId, TBlob blob, TActorContext::TPtr ctx) = 0;
407};
408
417class ICoroActor : public IActor {
418public:
425 void Receive(TMessageId messageId, TBlob blob, TActorContext::TPtr ctx) override;
426
437 virtual TFuture<void> CoReceive(TMessageId messageId, TBlob blob, TActorContext::TPtr ctx) = 0;
438};
439
448public:
449 virtual ~IBehavior() = default;
450
457 virtual void Receive(TMessageId messageId, TBlob blob, TActorContext::TPtr ctx) = 0;
458};
459
488template<typename TBaseBehavior, typename... TMessages>
489class TBehavior : public IBehavior
490{
491public:
501 void Receive(TMessageId messageId, TBlob blob, TActorContext::TPtr ctx) override {
502 bool handled = (TryHandleMessage<TMessages>(
503 messageId,
504 blob,
505 ctx
506 ) || ...);
507
508 if (!handled) {
509 static_cast<TBaseBehavior*>(this)->HandleUnknownMessage(messageId, std::move(blob), std::move(ctx));
510 }
511 }
512
513private:
514 template<typename TMessage>
515 bool TryHandleMessage(TMessageId messageId, TBlob& blob, TActorContext::TPtr& ctx) {
516 if (TMessage::MessageId == messageId) {
517 if (blob.Type == TBlob::PointerType::Near) {
518 auto&& mes = DeserializeNear<TMessage>(blob);
519 HandleMessage<TMessage>(
520 std::move(mes),
521 std::move(blob),
522 std::move(ctx)
523 );
524 } else {
525 auto&& mes = DeserializeFar<TMessage>(blob);
526 HandleMessage<TMessage>(
527 std::move(mes),
528 std::move(blob),
529 std::move(ctx)
530 );
531 }
532 return true;
533 }
534 return false;
535 }
536
537 template<typename TMessage>
538 void HandleMessage(TMessage&& message, TBlob blob, TActorContext::TPtr ctx)
539 {
540 using ReturnType = decltype(static_cast<TBaseBehavior*>(this)->Receive(
541 std::declval<TMessage>(),
542 std::declval<TBlob>(),
543 std::declval<TActorContext::TPtr>()
544 ));
545
546 if constexpr (std::is_same_v<ReturnType, void>) {
547 static_cast<TBaseBehavior*>(this)->Receive(
548 std::move(message),
549 std::move(blob),
550 std::move(ctx)
551 );
552 } else {
553 auto async = ctx->StartAsync();
554 auto future = static_cast<TBaseBehavior*>(this)->Receive(
555 std::move(message),
556 std::move(blob),
557 std::move(ctx)
558 );
559 if (!future.done()) {
560 async.Commit(std::move(future));
561 }
562 }
563 }
564};
565
599class IBehaviorActor : public IActor {
600public:
608 void Become(IBehavior* behavior) {
609 CurrentBehavior_ = behavior;
610 }
611
618 void Receive(TMessageId messageId, TBlob blob, TActorContext::TPtr ctx) override {
619 CurrentBehavior_->Receive(messageId, std::move(blob), std::move(ctx));
620 }
621
622private:
623 IBehavior* CurrentBehavior_;
624};
625
626} // namespace NActors
627} // namespace NNet
uint16_t TNodeId
Node identifier in a distributed system.
Definition actor.hpp:113
uint32_t TMessageId
Message type identifier.
Definition actor.hpp:119
uint16_t TCookie
Cookie for actor versioning and disambiguation.
Definition actor.hpp:116
uint32_t TLocalActorId
Local actor identifier within a node.
Definition actor.hpp:110
virtual void Receive(TMessageId messageId, TBlob blob, TActorContext::TPtr ctx)=0
Process an incoming message.
Actor that delegates message handling to a pluggable behavior.
Definition actor.hpp:599
void Receive(TMessageId messageId, TBlob blob, TActorContext::TPtr ctx) override
Delegate message handling to the current behavior.
Definition actor.hpp:618
void Become(IBehavior *behavior)
Switch to a new behavior.
Definition actor.hpp:608
Base interface for actor behaviors.
Definition actor.hpp:447
virtual void Receive(TMessageId messageId, TBlob blob, TActorContext::TPtr ctx)=0
Process an incoming message according to this behavior.
Coroutine-based actor interface for asynchronous message processing.
Definition actor.hpp:417
void Receive(TMessageId messageId, TBlob blob, TActorContext::TPtr ctx) override
Synchronous receive method (calls CoReceive internally)
virtual TFuture< void > CoReceive(TMessageId messageId, TBlob blob, TActorContext::TPtr ctx)=0
Asynchronous message processing method.
TActorId Self() const
Get this actor's ID.
Definition actor.hpp:226
void Forward(TActorId to, TMessageId messageId, TBlob blob)
Forward a message to another actor (preserves original sender)
TFuture< void > Sleep(TTime until)
Sleep until a specific time.
Definition actorsystem.hpp:312
TFuture< T > Ask(TActorId recipient, TQuestion &&question)
Send a message and wait for a response.
Definition actorsystem.hpp:322
void Cancel(TEvent event)
Cancel a previously scheduled message.
void Send(TActorId to, TMessageId messageId, TBlob blob)
Send a message to another actor.
TAsync StartAsync()
Start an asynchronous operation context.
TEvent Schedule(TTime when, TActorId sender, TActorId recipient, TMessageId messageId, TBlob blob)
Schedule a message to be delivered at a specific time.
TActorId Sender() const
Get the sender of the current message.
Definition actor.hpp:221
Unique identifier for actors in the system.
Definition actor.hpp:129
TNodeId NodeId() const
Get the node ID component.
Definition actor.hpp:143
TCookie Cookie() const
Get the cookie component.
Definition actor.hpp:154
TActorId()=default
Default constructor creates an invalid actor ID.
TActorId(TNodeId nodeId, TLocalActorId actorId, TCookie cookie)
Construct actor ID with specific components.
Definition actor.hpp:175
std::string ToString() const
Convert actor ID to string representation.
Definition actor.hpp:162
TLocalActorId ActorId() const
Get the local actor ID component.
Definition actor.hpp:149
Definition actorsystem.hpp:62
Template for type-safe behavior implementations.
Definition actor.hpp:490
void Receive(TMessageId messageId, TBlob blob, TActorContext::TPtr ctx) override
Process incoming message with automatic type dispatch.
Definition actor.hpp:501
Message envelope containing routing and payload information.
Definition actor.hpp:194
TActorId Recipient
Actor that should receive the message.
Definition actor.hpp:197
TBlob Blob
Serialized message data.
Definition actor.hpp:199
TActorId Sender
Actor that sent the message.
Definition actor.hpp:196
TMessageId MessageId
Type identifier of the message.
Definition actor.hpp:198
Mock actor context for testing purposes.
Definition actor.hpp:371
Implementation of a promise/future system for coroutines.
Helper class for managing asynchronous operations in actors.
Definition actor.hpp:328
Definition messages.hpp:11
Future type for coroutines returning a value of type T.
Definition corochain.hpp:182