// Declaration only; the definition is not part of this listing.
// Ask() moves a freshly created std::unique_ptr<TAsk<T>> into this call and
// uses the returned TActorId as the sender of its question.
72 TActorId Register(IActor::TPtr actor);
// Time-point overload: forwards `until` to Poller->Sleep and returns that
// call's result unchanged (the return type is deduced).
74 auto Sleep(TTime until) {
75 return Poller->
Sleep(until);
// Duration overload: accepts any std::chrono::duration<Rep, Period> and
// forwards it unchanged to Poller->Sleep; the return type is deduced from
// that call.
78 template<
typename Rep,
typename Period>
79 auto Sleep(std::chrono::duration<Rep,Period> duration) {
80 return Poller->
Sleep(duration);
// Typed send: builds a message of type T from `args` and routes it either
// locally or to another node, depending on the recipient's node id.
// NOTE(review): the signature line (85) is absent from this listing; the body
// uses `sender`, `recipient` and `args`, so those are presumably its
// parameters — confirm against the real header.
84 template<
typename T,
typename... Args>
// Recipient lives on this node: serialize with the pod allocator and pass
// the blob to the untyped Send overload together with T::MessageId.
86 if (recipient.
NodeId() == NodeId_) {
87 auto blob = SerializeNear<T>(GetPodAllocator(), std::forward<Args>(args)...);
88 Send(sender, recipient, T::MessageId, std::move(blob));
// Recipient lives elsewhere: fetch the per-node state by node id.
// NOTE(review): no bounds check on Nodes is visible in this listing — confirm
// that an unknown node id cannot index past the end.
90 auto& maybeRemote = Nodes[recipient.
NodeId()];
// No INode is attached for that id: report the problem on stderr.
91 if (!maybeRemote.Node) {
92 std::cerr <<
"Cannot send message to actor on different node: " << recipient.
ToString() <<
"\n";
// SerializeFarInplace receives the node itself plus sender and recipient.
95 SerializeFarInplace<T>(*maybeRemote.Node, sender, recipient, std::forward<Args>(args)...);
// Raise the IsReady flag only if it is not already set.
96 if (maybeRemote.Flags.IsReady == 0) {
97 maybeRemote.Flags.IsReady = 1;
// Typed Schedule: serializes a T from `args` via SerializeNear using the pod
// allocator, then delegates to the untyped Schedule overload with
// T::MessageId. Returns the TEvent produced by that overload.
106 template<
typename T,
typename... Args>
107 TEvent Schedule(TTime when,
TActorId sender,
TActorId recipient, Args&&... args) {
108 auto blob = SerializeNear<T>(GetPodAllocator(), std::forward<Args>(args)...);
109 return Schedule(when, sender, recipient, T::MessageId, std::move(blob));
// Declaration only. Takes a TEvent, the type returned by Schedule;
// presumably cancels that scheduled delivery — confirm against the definition.
111 void Cancel(TEvent event);
// Request/response helper. T is the type the reply is deserialized as;
// TQuestion is the type of the outgoing message.
// The visible body creates a shared TAskState<T>, registers a TAsk<T> actor
// holding that state, sends the serialized question with that actor as the
// sender, and returns a TAskAwaiter bound to the same state.
// NOTE(review): several lines of the nested awaiter are absent from this
// listing, including the header of the function containing lines 131-135.
113 template<
typename T,
typename TQuestion>
114 auto Ask(
TActorId recepient, TQuestion&& message) {
122 bool await_ready()
const noexcept {
126 void await_suspend(THandle h) {
// The stored reply must carry T's message id; otherwise a
// std::runtime_error is thrown instead of deserializing.
131 if (T::MessageId != State->MessageId) {
132 throw std::runtime_error(
"MessageId mismatch in Ask awaiter");
// Ids match: decode the stored blob as T.
135 return DeserializeNear<T>(State->Blob);
// State shared between the awaiter and the TAsk<T> actor created below.
139 std::shared_ptr<TAskState<T>> State;
142 auto state = std::make_shared<TAskState<T>>();
143 auto askActor = std::make_unique<TAsk<T>>(state);
144 auto actorId = Register(std::move(askActor));
// The registered TAsk<T> actor is the sender of the question.
145 Send(actorId, recepient, TQuestion::MessageId, SerializeNear<TQuestion>(GetPodAllocator(), std::forward<TQuestion>(message)));
146 return TAskAwaiter{state};
// Declaration only: const member returning a size_t. The definition is not
// part of this listing.
151 size_t ActorsSize()
const;
// Declaration only. Takes an integer node id and a std::unique_ptr<INode> by
// value, so the caller hands the node object over to this call.
// NOTE(review): Nodes is indexed by node id elsewhere in this class;
// presumably this stores the node at Nodes[id] — confirm in the definition.
153 void AddNode(
int id, std::unique_ptr<INode> node);
// Starts the network loops and records what they return in Handles.
// TEnvelopeReader defaults to TZeroCopyEnvelopeReader.
// NOTE(review): the signature lines (159-160) and line 163 are absent from
// this listing; the body uses a `socket` variable, presumably a parameter.
158 template<
typename TSocket,
typename TEnvelopeReader = TZeroCopyEnvelopeReader>
// One inbound server for the given socket.
161 Handles.emplace_back(InboundServe<TSocket, TEnvelopeReader>(std::move(socket)));
// Walk every index of Nodes; the result of OutboundServe(i) is stored too.
162 for (
int i = 0; i < static_cast<int>(Nodes.size()); ++i) {
164 Handles.emplace_back(OutboundServe(i));
// Declarations only; the definitions are not part of this listing.
170 void GcIterationSync();
172 void DrainReadyNodes();
// Takes a local actor id and a TFuture<void> by rvalue reference.
173 void AddPendingFuture(TLocalActorId
id,
TFuture<void>&& future);
176 Nodes[id].Node->StartConnect();
178 co_await SuspendExecution(
id);
179 auto& node = Nodes[id].Node;
183 std::cerr <<
"Node with id: " <<
id <<
" is not registered\n";
// Inbound accept coroutine: logs start-up, awaits socket.Accept(), logs the
// accept, and hands the accepted client to InboundConnection.
// NOTE(review): the signature line (189) and line 191 are absent from this
// listing, so any loop around the accept is not visible here.
188 template<
typename TSocket,
typename TEnvelopeReader>
190 std::cerr <<
"InboundServe started\n";
192 auto client =
co_await socket.
Accept();
193 std::cerr <<
"Accepted\n";
// The accepted client is moved into the per-connection handler.
194 InboundConnection<TSocket, TEnvelopeReader>(std::move(client));
// Per-connection receive coroutine: reads bytes from `socket` into
// `envelopeReader`, pops envelopes from it and delivers each one with Send.
// NOTE(review): the signature line (200) and a number of body lines are
// absent from this listing; comments below cover only what is visible.
199 template<
typename TSocket,
typename TEnvelopeReader>
// Buffer size requested from the reader for each socket read (512 KiB).
201 static constexpr size_t ReadSize = 512 * 1024;
// A read is issued while the reader holds less than this (16 MiB).
202 static constexpr size_t InflightBytes = 16 * 1024 * 1024;
// Byte threshold (2 MiB) compared against bytesProcessed after each delivery.
203 static constexpr size_t MaxBytesBeforeYield = 2 * 1024 * 1024;
205 uint64_t message = 0;
// Read when the buffered amount is below InflightBytes or the reader
// reports that it needs more data.
209 if (envelopeReader.Size() < InflightBytes || envelopeReader.NeedMoreData()) {
210 auto buffer = envelopeReader.Acquire(ReadSize);
211 auto size =
co_await socket.
ReadSome(buffer.data(), buffer.size());
// NOTE(review): the condition guarding this throw (lines 212-215) is not shown.
216 throw std::runtime_error(
"Socket closed");
// Pass the ReadSome result back to the reader (presumably the byte count
// written into the acquired buffer — confirm).
218 envelopeReader.Commit(size);
223 size_t bytesProcessed = 0;
// Drain envelopes for as long as Pop() yields one.
224 while (
auto envelope = envelopeReader.Pop()) {
// Envelope addressed to a different node: log it (branch marked unlikely).
225 if (envelope->Recipient.NodeId() != NodeId_) [[unlikely]] {
226 std::cerr <<
"Received message for different node: " << envelope->Recipient.ToString() <<
"\n";
// Account for payload size plus one THeader per envelope.
230 bytesProcessed += envelope->Blob.Size +
sizeof(
THeader);
231 Send(envelope->Sender, envelope->Recipient, envelope->MessageId, std::move(envelope->Blob));
232 if (bytesProcessed >= MaxBytesBeforeYield) {
237 co_await Poller->
Yield();
// A std::exception caught here is reported on stderr.
239 }
catch (
const std::exception& e) {
240 std::cerr <<
"Error in InboundConnection: " << e.what() <<
"\n";
// Releases the slot of `actorId`: when the id indexes into Actors, the slot
// is reset by assigning `{}` and the id is pushed onto FreeActorIds.
// No action for other ids is visible in this listing.
// NOTE(review): line 246 is absent from this listing.
244 void ShutdownActor(TLocalActorId actorId) {
245 if (actorId < Actors.size()) {
247 Actors[actorId] = {};
248 FreeActorIds.push(actorId);
// Returns the pointer produced by ContextAllocator.Allocate().
252 void* AllocateActorContext() {
253 return ContextAllocator.Allocate();
257 ContextAllocator.Deallocate(ptr);
261 Nodes[nodeId].Pending =
co_await Self();
262 co_await std::suspend_always();
263 Nodes[nodeId].Pending = {};
// Allocator type handed to SerializeNear via GetPodAllocator().
// Both operations map directly onto the global allocation functions.
268 struct TPodAllocator {
// Returns `size` bytes of raw storage obtained from ::operator new.
269 void* Acquire(
size_t size) {
270 return ::operator
new(size);
// Gives `ptr` back through ::operator delete.
273 void Release(
void* ptr) {
274 ::operator
delete(ptr);
// Returns a TPodAllocator by reference; its result is what Send, Schedule and
// Ask pass to SerializeNear. The body is not part of this listing.
278 TPodAllocator& GetPodAllocator() {
// Actor slots, indexed by TLocalActorId (see ShutdownActor).
285 std::vector<TActorInternalState> Actors;
288 std::vector<TFuture<void>> CleanupMessages;
// Vector-backed stack of ids; ShutdownActor pushes released ids here.
289 std::stack<TLocalActorId, std::vector<TLocalActorId>> FreeActorIds;
293 TPodAllocator PodAllocator;
// Both counters start at 1.
295 TLocalActorId NextActorId_ = 1;
296 TCookie NextCookie_ = 1;
// Value-initialized handles.
298 THandle YieldCoroutine_{};
299 THandle ScheduleCoroutine_{};
// Initially true.
300 bool IsYielding_ =
true;
303 std::unique_ptr<INode> Node;
306 uint32_t IsReady : 1 = 0;
309 std::vector<TNodeState> Nodes;
314 unsigned TimerId = 0;
318 TMessageId MessageId;
// Reversed ordering: the comparison is written with `>`, so an entry whose
// (When, TimerId, valid) tuple is smaller compares as "greater". With the
// default comparator of std::priority_queue<TDelayed>, top() is therefore the
// entry with the smallest tuple, i.e. the earliest When first.
321 bool operator<(
const TDelayed& other)
const {
322 return std::tie(When, TimerId, valid) > std::tie(other.When, other.TimerId, other.valid);
// Queue of TDelayed entries, ordered by TDelayed::operator<.
326 std::priority_queue<TDelayed> DelayedMessages;
// Receives the results of InboundServe(...) and OutboundServe(i).
328 std::vector<THandle> Handles;
// Grants TActorContext access to this class's non-public members.
330 friend class TActorContext;
Context object providing actor communication and scheduling capabilities.
Definition actor.hpp:139
void Forward(TActorId to, TMessageId messageId, TBlob blob)
Forward a message to another actor (preserves original sender)
Definition actorsystem.cpp:21
TFuture< void > Sleep(TTime until)
Sleep until a specific time.
Definition actorsystem.hpp:341
TFuture< T > Ask(TActorId recipient, TQuestion &&question)
Send a message and wait for a response.
Definition actorsystem.hpp:351
void Send(TActorId to, TMessageId messageId, TBlob blob)
Send a message to another actor.
Definition actorsystem.cpp:16
TEvent Schedule(TTime when, TActorId sender, TActorId recipient, TMessageId messageId, TBlob blob)
Schedule a message to be delivered at a specific time.
Definition actorsystem.cpp:26
TActorId Sender() const
Get the sender of the current message.
Definition actor.hpp:144