COROIO: coroio/actors/actor.hpp Source File
COROIO
 
Loading...
Searching...
No Matches
actor.hpp
Go to the documentation of this file.
1#pragma once
2
3#include <memory>
5#include "messages.hpp"
6#include "actorid.hpp"
7
105namespace NNet {
106namespace NActors {
107
// Forward declaration: this header only stores and passes TActorSystem
// pointers, so the full definition is not required here.
108class TActorSystem;
109
124
// Handle for a scheduled message: returned by TActorContext::Schedule() and
// accepted by TActorContext::Cancel().
// NOTE(review): presumably the pair is (sequence/id, delivery time) — the
// meaning of the two components is not visible in this file; confirm against
// the Schedule() implementation.
125using TEvent = std::pair<unsigned, TTime>;
126
// Context object providing actor communication and scheduling capabilities.
// NOTE(review): the class-head line for TActorContext is not visible in this
// listing; the name is taken from the TPtr alias and the constructor below.
139{
140public:
    // Owning pointer type; IActor::Receive() takes the context in this form.
141 using TPtr = std::unique_ptr<TActorContext>;
142
    // Get the sender of the current message.
144 TActorId Sender() const {
145 return Sender_;
146 }
147
    // Get this actor's ID.
149 TActorId Self() const {
150 return Self_;
151 }
152
    // Send a message to another actor.
    // `to` is the recipient, `messageId` the message type identifier and
    // `blob` the serialized message data.
159 void Send(TActorId to, TMessageId messageId, TBlob blob);
160
    // Forward a message to another actor (preserves original sender).
167 void Forward(TActorId to, TMessageId messageId, TBlob blob);
168
    // Typed overload of Send(): T names the message type.
    // NOTE(review): presumably `args` are forwarded to construct a T that is
    // then serialized — the definition is not in this listing; confirm.
175 template<typename T, typename... Args>
176 void Send(TActorId to, Args&&... args);
177
    // Typed overload of Forward(): T names the message type.
    // NOTE(review): same assumption about `args` as for the typed Send().
184 template<typename T, typename... Args>
185 void Forward(TActorId to, Args&&... args);
186
    // Schedule a message to be delivered at a specific time.
    // Returns a TEvent handle that can later be passed to Cancel().
196 TEvent Schedule(TTime when, TActorId sender, TActorId recipient, TMessageId messageId, TBlob blob);
197
    // Typed overload of Schedule(): T names the message type.
    // NOTE(review): presumably `args` construct the T payload — confirm.
207 template<typename T, typename... Args>
208 TEvent Schedule(TTime when, TActorId sender, TActorId recipient, Args&&... args);
209
    // Cancel a previously scheduled message.
214 void Cancel(TEvent event);
215
    // Sleep until a specific time; the returned future is the awaitable result.
221 TFuture<void> Sleep(TTime until);
222
    // Duration-based overload of Sleep().
    // NOTE(review): presumably sleeps for `duration` measured from now —
    // the definition is not in this listing; confirm.
230 template<typename Rep, typename Period>
231 TFuture<void> Sleep(std::chrono::duration<Rep,Period> duration);
232
    // Send a message and wait for a response of type T.
241 template<typename T, typename TQuestion>
242 TFuture<T> Ask(TActorId recipient, TQuestion&& question);
243
    // Helper class for managing asynchronous operations in actors.
    // TBehavior::HandleMessage() obtains one via StartAsync() and calls
    // Commit() with a future that has not completed yet.
251 struct TAsync {
    // NOTE(review): the initializer list names ActorSystem_ first, but members
    // are initialized in declaration order (ActorId_, then ActorSystem_).
    // Harmless here because neither initializer depends on the other, but
    // compilers report it under -Wreorder.
252 TAsync(TActorSystem* actorSystem, TLocalActorId actorId)
253 : ActorSystem_(actorSystem)
254 , ActorId_(actorId)
255 { }
256
    // Hands an unfinished future over; defined outside this header.
257 void Commit(TFuture<void>&& future);
258
259 private:
260 TLocalActorId ActorId_;
261 TActorSystem* ActorSystem_;
262 };
263
    // NOTE(review): the declaration of StartAsync() ("Start an asynchronous
    // operation context", returning TAsync) belongs in this gap but is not
    // visible in this listing.
269
    // Class-specific allocation functions; operator new takes the actor system
    // as an extra placement argument.
    // NOTE(review): presumably this lets the actor system supply the storage
    // for contexts — confirm in the definitions.
270 static void* operator new(size_t size, TActorSystem* actorSystem);
271 static void operator delete(void* ptr);
272
273private:
    // Private constructor: only the friends declared below (TActorSystem and
    // TMockActorContext) can create a context.
274 TActorContext(TActorId sender, TActorId self, TActorSystem* actorSystem)
275 : Sender_(sender)
276 , Self_(self)
277 , ActorSystem(actorSystem)
278 { }
279
    // Sender of the message being processed and the ID of the receiving actor.
280 TActorId Sender_;
281 TActorId Self_;
    // Stored as a raw pointer; nothing in this header deletes it. The default
    // initializer is always overwritten by the constructor above.
282 TActorSystem* ActorSystem = nullptr;
283
284 friend class TActorSystem;
285 friend class TMockActorContext;
286};
287
// Mock actor context for testing purposes.
// NOTE(review): the class-head line is not visible in this listing; the
// mem-initializer below shows that TActorContext is a base class.
295public:
    // Builds a context directly from its parts. This reaches TActorContext's
    // private constructor through the `friend class TMockActorContext`
    // declaration in TActorContext.
296 TMockActorContext(TActorId sender, TActorId self, TActorSystem* actorSystem)
297 : TActorContext(sender, self, actorSystem)
298 { }
299};
300
311class IActor {
312public:
313 using TPtr = std::unique_ptr<IActor>;
314 friend class TActorSystem;
315
316 IActor() = default;
317 virtual ~IActor() = default;
318
329 virtual void Receive(TMessageId messageId, TBlob blob, TActorContext::TPtr ctx) = 0;
330};
331
340class ICoroActor : public IActor {
341public:
348 void Receive(TMessageId messageId, TBlob blob, TActorContext::TPtr ctx) override;
349
360 virtual TFuture<void> CoReceive(TMessageId messageId, TBlob blob, TActorContext::TPtr ctx) = 0;
361};
362
// Base interface for actor behaviors.
// NOTE(review): the class-head line for IBehavior is not visible in this
// listing; the name is taken from the destructor below.
371public:
    // Virtual destructor so a behavior can be destroyed through IBehavior*.
372 virtual ~IBehavior() = default;
373
    // Process an incoming message according to this behavior.
    // Parameters mirror IActor::Receive(): message type identifier, serialized
    // message data, and the context (ownership is transferred to the callee).
380 virtual void Receive(TMessageId messageId, TBlob blob, TActorContext::TPtr ctx) = 0;
381};
382
411template<typename TBaseBehavior, typename... TMessages>
412class TBehavior : public IBehavior
413{
414public:
424 void Receive(TMessageId messageId, TBlob blob, TActorContext::TPtr ctx) override {
425 bool handled = (TryHandleMessage<TMessages>(
426 messageId,
427 blob,
428 ctx
429 ) || ...);
430
431 if (!handled) {
432 static_cast<TBaseBehavior*>(this)->HandleUnknownMessage(messageId, std::move(blob), std::move(ctx));
433 }
434 }
435
436private:
437 template<typename TMessage>
438 bool TryHandleMessage(TMessageId messageId, TBlob& blob, TActorContext::TPtr& ctx) {
439 if (TMessage::MessageId == messageId) {
440 if (blob.Type == TBlob::PointerType::Near) {
441 auto&& mes = DeserializeNear<TMessage>(blob);
442 HandleMessage<TMessage>(
443 std::move(mes),
444 std::move(blob),
445 std::move(ctx)
446 );
447 } else {
448 auto&& mes = DeserializeFar<TMessage>(blob);
449 HandleMessage<TMessage>(
450 std::move(mes),
451 std::move(blob),
452 std::move(ctx)
453 );
454 }
455 return true;
456 }
457 return false;
458 }
459
460 template<typename TMessage>
461 void HandleMessage(TMessage&& message, TBlob blob, TActorContext::TPtr ctx)
462 {
463 using ReturnType = decltype(static_cast<TBaseBehavior*>(this)->Receive(
464 std::declval<TMessage>(),
465 std::declval<TBlob>(),
466 std::declval<TActorContext::TPtr>()
467 ));
468
469 if constexpr (std::is_same_v<ReturnType, void>) {
470 static_cast<TBaseBehavior*>(this)->Receive(
471 std::move(message),
472 std::move(blob),
473 std::move(ctx)
474 );
475 } else {
476 auto async = ctx->StartAsync();
477 auto future = static_cast<TBaseBehavior*>(this)->Receive(
478 std::move(message),
479 std::move(blob),
480 std::move(ctx)
481 );
482 if (!future.done()) {
483 async.Commit(std::move(future));
484 }
485 }
486 }
487};
488
522class IBehaviorActor : public IActor {
523public:
531 void Become(IBehavior* behavior) {
532 CurrentBehavior_ = behavior;
533 }
534
541 void Receive(TMessageId messageId, TBlob blob, TActorContext::TPtr ctx) override {
542 CurrentBehavior_->Receive(messageId, std::move(blob), std::move(ctx));
543 }
544
545private:
546 IBehavior* CurrentBehavior_;
547};
548
549} // namespace NActors
550} // namespace NNet
Base interface for all actors in the system.
Definition actor.hpp:311
virtual void Receive(TMessageId messageId, TBlob blob, TActorContext::TPtr ctx)=0
Process an incoming message.
Actor that delegates message handling to a pluggable behavior.
Definition actor.hpp:522
void Receive(TMessageId messageId, TBlob blob, TActorContext::TPtr ctx) override
Delegate message handling to the current behavior.
Definition actor.hpp:541
void Become(IBehavior *behavior)
Switch to a new behavior.
Definition actor.hpp:531
Base interface for actor behaviors.
Definition actor.hpp:370
virtual void Receive(TMessageId messageId, TBlob blob, TActorContext::TPtr ctx)=0
Process an incoming message according to this behavior.
Coroutine-based actor interface for asynchronous message processing.
Definition actor.hpp:340
void Receive(TMessageId messageId, TBlob blob, TActorContext::TPtr ctx) override
Synchronous receive method (calls CoReceive internally)
Definition actor.cpp:7
virtual TFuture< void > CoReceive(TMessageId messageId, TBlob blob, TActorContext::TPtr ctx)=0
Asynchronous message processing method.
Context object providing actor communication and scheduling capabilities.
Definition actor.hpp:139
TActorId Self() const
Get this actor's ID.
Definition actor.hpp:149
void Forward(TActorId to, TMessageId messageId, TBlob blob)
Forward a message to another actor (preserves original sender)
Definition actorsystem.cpp:21
TFuture< void > Sleep(TTime until)
Sleep until a specific time.
Definition actorsystem.hpp:341
TFuture< T > Ask(TActorId recipient, TQuestion &&question)
Send a message and wait for a response.
Definition actorsystem.hpp:351
void Cancel(TEvent event)
Cancel a previously scheduled message.
Definition actorsystem.cpp:31
void Send(TActorId to, TMessageId messageId, TBlob blob)
Send a message to another actor.
Definition actorsystem.cpp:16
TAsync StartAsync()
Start an asynchronous operation context.
Definition actor.cpp:16
TEvent Schedule(TTime when, TActorId sender, TActorId recipient, TMessageId messageId, TBlob blob)
Schedule a message to be delivered at a specific time.
Definition actorsystem.cpp:26
TActorId Sender() const
Get the sender of the current message.
Definition actor.hpp:144
Unique identifier for actors in the system.
Definition actorid.hpp:26
Definition actorsystem.hpp:63
Template for type-safe behavior implementations.
Definition actor.hpp:413
void Receive(TMessageId messageId, TBlob blob, TActorContext::TPtr ctx) override
Process incoming message with automatic type dispatch.
Definition actor.hpp:424
Message envelope containing routing and payload information.
Definition actor.hpp:117
TActorId Recipient
Actor that should receive the message.
Definition actor.hpp:120
TBlob Blob
Serialized message data.
Definition actor.hpp:122
TActorId Sender
Actor that sent the message.
Definition actor.hpp:119
TMessageId MessageId
Type identifier of the message.
Definition actor.hpp:121
Mock actor context for testing purposes.
Definition actor.hpp:294
Implementation of a promise/future system for coroutines.
Helper class for managing asynchronous operations in actors.
Definition actor.hpp:251
Definition messages.hpp:84
Future type for coroutines returning a value of type T.
Definition corochain.hpp:182