COROIO: coroio/actors/actor.hpp Source File
COROIO
 
Loading...
Searching...
No Matches
actor.hpp
Go to the documentation of this file.
1#pragma once
2
3#include <memory>
5#include "messages.hpp"
6#include "actorid.hpp"
7
107namespace NNet {
108namespace NActors {
109
110class TActorSystem;
111
126
127using TEvent = std::pair<unsigned, TTime>;
128
145{
146public:
147 using TPtr = std::unique_ptr<TActorContext>;
148
150 TActorId Sender() const {
151 return Sender_;
152 }
153
155 TActorId Self() const {
156 return Self_;
157 }
158
165 void Send(TActorId to, TMessageId messageId, TBlob blob);
166
173 void Forward(TActorId to, TMessageId messageId, TBlob blob);
174
189 template<typename T, typename... Args>
190 void Send(TActorId to, Args&&... args);
191
203 template<typename T, typename... Args>
204 void Forward(TActorId to, Args&&... args);
205
215 TEvent Schedule(TTime when, TActorId sender, TActorId recipient, TMessageId messageId, TBlob blob);
216
226 template<typename T, typename... Args>
227 TEvent Schedule(TTime when, TActorId sender, TActorId recipient, Args&&... args);
228
233 void Cancel(TEvent event);
234
245 TFuture<void> Sleep(TTime until);
246
261 template<typename Rep, typename Period>
262 TFuture<void> Sleep(std::chrono::duration<Rep,Period> duration);
263
281 template<typename T, typename TQuestion>
282 TFuture<T> Ask(TActorId recipient, TQuestion&& question);
283
291 struct TAsync {
292 TAsync(TActorSystem* actorSystem, TLocalActorId actorId)
293 : ActorSystem_(actorSystem)
294 , ActorId_(actorId)
295 { }
296
297 void Commit(TFuture<void>&& future);
298
299 private:
300 TLocalActorId ActorId_;
301 TActorSystem* ActorSystem_;
302 };
303
309
310 static void* operator new(size_t size, TActorSystem* actorSystem);
311 static void operator delete(void* ptr);
312
313private:
314 TActorContext(TActorId sender, TActorId self, TActorSystem* actorSystem)
315 : Sender_(sender)
316 , Self_(self)
317 , ActorSystem(actorSystem)
318 { }
319
320 TActorId Sender_;
321 TActorId Self_;
322 TActorSystem* ActorSystem = nullptr;
323
324 friend class TActorSystem;
325 friend class TMockActorContext;
326};
327
335public:
336 TMockActorContext(TActorId sender, TActorId self, TActorSystem* actorSystem)
337 : TActorContext(sender, self, actorSystem)
338 { }
339};
340
351class IActor {
352public:
353 using TPtr = std::unique_ptr<IActor>;
354 friend class TActorSystem;
355
356 IActor() = default;
357 virtual ~IActor() = default;
358
369 virtual void Receive(TMessageId messageId, TBlob blob, TActorContext::TPtr ctx) = 0;
370};
371
393class ICoroActor : public IActor {
394public:
401 void Receive(TMessageId messageId, TBlob blob, TActorContext::TPtr ctx) override;
402
416 virtual TFuture<void> CoReceive(TMessageId messageId, TBlob blob, TActorContext::TPtr ctx) = 0;
417};
418
427public:
428 virtual ~IBehavior() = default;
429
436 virtual void Receive(TMessageId messageId, TBlob blob, TActorContext::TPtr ctx) = 0;
437};
438
470template<typename TBaseBehavior, typename... TMessages>
471class TBehavior : public IBehavior
472{
473public:
483 void Receive(TMessageId messageId, TBlob blob, TActorContext::TPtr ctx) override {
484 bool handled = (TryHandleMessage<TMessages>(
485 messageId,
486 blob,
487 ctx
488 ) || ...);
489
490 if (!handled) {
491 static_cast<TBaseBehavior*>(this)->HandleUnknownMessage(messageId, std::move(blob), std::move(ctx));
492 }
493 }
494
495private:
496 template<typename TMessage>
497 bool TryHandleMessage(TMessageId messageId, TBlob& blob, TActorContext::TPtr& ctx) {
498 if (TMessage::MessageId == messageId) {
499 if (blob.Type == TBlob::PointerType::Near) {
500 auto&& mes = DeserializeNear<TMessage>(blob);
501 HandleMessage<TMessage>(
502 std::move(mes),
503 std::move(blob),
504 std::move(ctx)
505 );
506 } else {
507 auto&& mes = DeserializeFar<TMessage>(blob);
508 HandleMessage<TMessage>(
509 std::move(mes),
510 std::move(blob),
511 std::move(ctx)
512 );
513 }
514 return true;
515 }
516 return false;
517 }
518
519 template<typename TMessage>
520 void HandleMessage(TMessage&& message, TBlob blob, TActorContext::TPtr ctx)
521 {
522 using ReturnType = decltype(static_cast<TBaseBehavior*>(this)->Receive(
523 std::declval<TMessage>(),
524 std::declval<TBlob>(),
525 std::declval<TActorContext::TPtr>()
526 ));
527
528 if constexpr (std::is_same_v<ReturnType, void>) {
529 static_cast<TBaseBehavior*>(this)->Receive(
530 std::move(message),
531 std::move(blob),
532 std::move(ctx)
533 );
534 } else {
535 auto async = ctx->StartAsync();
536 auto future = static_cast<TBaseBehavior*>(this)->Receive(
537 std::move(message),
538 std::move(blob),
539 std::move(ctx)
540 );
541 if (!future.done()) {
542 async.Commit(std::move(future));
543 }
544 }
545 }
546};
547
586class IBehaviorActor : public IActor {
587public:
597 void Become(IBehavior* behavior) {
598 CurrentBehavior_ = behavior;
599 }
600
607 void Receive(TMessageId messageId, TBlob blob, TActorContext::TPtr ctx) override {
608 CurrentBehavior_->Receive(messageId, std::move(blob), std::move(ctx));
609 }
610
611private:
612 IBehavior* CurrentBehavior_;
613};
614
615} // namespace NActors
616} // namespace NNet
Base interface for all actors in the system.
Definition actor.hpp:351
virtual void Receive(TMessageId messageId, TBlob blob, TActorContext::TPtr ctx)=0
Process an incoming message.
Actor that delegates message handling to a pluggable behavior.
Definition actor.hpp:586
void Receive(TMessageId messageId, TBlob blob, TActorContext::TPtr ctx) override
Delegate message handling to the current behavior.
Definition actor.hpp:607
void Become(IBehavior *behavior)
Switch to a new behavior.
Definition actor.hpp:597
Base interface for actor behaviors.
Definition actor.hpp:426
virtual void Receive(TMessageId messageId, TBlob blob, TActorContext::TPtr ctx)=0
Process an incoming message according to this behavior.
Coroutine-based actor interface for asynchronous message processing.
Definition actor.hpp:393
void Receive(TMessageId messageId, TBlob blob, TActorContext::TPtr ctx) override
IActor bridge — invokes CoReceive and parks pending futures.
Definition actor.cpp:7
virtual TFuture< void > CoReceive(TMessageId messageId, TBlob blob, TActorContext::TPtr ctx)=0
Asynchronous message handler (override in subclass)
Context object providing actor communication and scheduling capabilities.
Definition actor.hpp:145
TActorId Self() const
Get this actor's ID.
Definition actor.hpp:155
void Forward(TActorId to, TMessageId messageId, TBlob blob)
Forward a message to another actor (preserves original sender)
Definition actorsystem.cpp:21
TFuture< void > Sleep(TTime until)
Suspend the current handler until a specific time.
Definition actorsystem.hpp:478
TFuture< T > Ask(TActorId recipient, TQuestion &&question)
Send a request and suspend until a reply of type T arrives.
Definition actorsystem.hpp:488
void Cancel(TEvent event)
Cancel a previously scheduled message.
Definition actorsystem.cpp:31
void Send(TActorId to, TMessageId messageId, TBlob blob)
Send a message to another actor.
Definition actorsystem.cpp:16
TAsync StartAsync()
Start an asynchronous operation context.
Definition actor.cpp:16
TEvent Schedule(TTime when, TActorId sender, TActorId recipient, TMessageId messageId, TBlob blob)
Schedule a message to be delivered at a specific time.
Definition actorsystem.cpp:26
TActorId Sender() const
Get the sender of the current message.
Definition actor.hpp:150
Globally unique identifier for actors across a distributed system.
Definition actorid.hpp:29
Single-threaded actor runtime.
Definition actorsystem.hpp:99
Template for type-safe behavior implementations.
Definition actor.hpp:472
void Receive(TMessageId messageId, TBlob blob, TActorContext::TPtr ctx) override
Process incoming message with automatic type dispatch.
Definition actor.hpp:483
Message envelope containing routing and payload information.
Definition actor.hpp:119
TActorId Recipient
Actor that should receive the message.
Definition actor.hpp:122
TBlob Blob
Serialized message data.
Definition actor.hpp:124
TActorId Sender
Actor that sent the message.
Definition actor.hpp:121
TMessageId MessageId
Type identifier of the message.
Definition actor.hpp:123
Mock actor context for testing purposes.
Definition actor.hpp:334
Implementation of a promise/future system for coroutines.
Helper class for managing asynchronous operations in actors.
Definition actor.hpp:291
Opaque message payload with Near/Far duality.
Definition messages.hpp:99
@ Near
Live object pointer — valid only within the same process.
Owned coroutine handle that carries a result of type T.
Definition corochain.hpp:185