71 TActorId Register(IActor::TPtr actor);
73 auto Sleep(TTime until) {
74 return Poller->Sleep(until);
77 template<
typename Rep,
typename Period>
78 auto Sleep(std::chrono::duration<Rep,Period> duration) {
79 return Poller->Sleep(duration);
85 auto blob = SerializeNear(std::forward<T>(message), GetPodAllocator());
86 Send(sender, recepient, T::MessageId, std::move(blob));
91 TEvent Schedule(TTime when,
TActorId sender,
TActorId recipient, T&& message) {
92 auto blob = SerializeNear(std::forward<T>(message), GetPodAllocator());
93 return Schedule(when, sender, recipient, T::MessageId, std::move(blob));
95 void Cancel(TEvent event);
97 template<
typename T,
typename TQuestion>
98 auto Ask(
TActorId recepient, TQuestion&& message) {
106 bool await_ready()
const noexcept {
110 void await_suspend(THandle h) {
115 if (T::MessageId != State->MessageId) {
116 throw std::runtime_error(
"MessageId mismatch in Ask awaiter");
119 return DeserializeNear<T>(State->Blob);
123 std::shared_ptr<TAskState<T>> State;
126 auto state = std::make_shared<TAskState<T>>();
127 auto askActor = std::make_unique<TAsk<T>>(state);
128 auto actorId = Register(std::move(askActor));
129 Send(actorId, recepient, TQuestion::MessageId, SerializeNear(std::forward<TQuestion>(message), GetPodAllocator()));
130 return TAskAwaiter{state};
135 size_t ActorsSize()
const;
137 void AddNode(
int id, std::unique_ptr<INode> node);
142 template<
typename TSocket>
145 Handles.emplace_back(InboundServe(std::move(socket)));
146 for (
int i = 0; i < static_cast<int>(Nodes.size()); ++i) {
148 Handles.emplace_back(OutboundServe(i));
154 void GcIterationSync();
156 void DrainReadyNodes();
160 Nodes[id].Node->StartConnect();
162 co_await SuspendExecution(
id);
163 auto& node = Nodes[id].Node;
167 std::cerr <<
"Node with id: " <<
id <<
" is not registered\n";
172 template<
typename TSocket>
174 std::cerr <<
"InboundServe started\n";
176 auto client =
co_await socket.
Accept();
177 std::cerr <<
"Accepted\n";
178 InboundConnection(std::move(client));
183 template<
typename TSocket>
186 static constexpr auto BatchSize = 16;
187 uint64_t message = 0;
191 auto data =
co_await reader.
Read();
192 if (data.Recipient.NodeId() != NodeId_) {
193 std::cerr <<
"Received message for different node: " << data.Recipient.ToString() <<
"\n";
198 blob.Size = data.Size;
199 blob.Type = TBlob::PointerType::Far;
200 blob.Data = TBlob::TRawPtr(::operator
new(blob.Size), [](
void* ptr) {
201 ::operator delete(ptr);
205 Send(data.Sender, data.Recipient, data.MessageId, std::move(blob));
206 if (++message % BatchSize == 0) {
207 co_await Poller->Yield();
210 }
catch (
const std::exception& e) {
211 std::cerr <<
"Error in InboundConnection: " << e.what() <<
"\n";
216 if (actorId < Actors.size()) {
218 Actors[actorId] = {};
219 FreeActorIds.push(actorId);
223 void* AllocateActorContext() {
224 return ContextAllocator.Allocate();
227 void DeallocateActorContext(TActorContext* ptr) {
228 ContextAllocator.Deallocate(ptr);
232 Nodes[nodeId].Pending =
co_await Self();
233 co_await std::suspend_always();
234 Nodes[nodeId].Pending = {};
239 struct TPodAllocator {
240 void* Acquire(
size_t size) {
241 return ::operator
new(size);
244 void Release(
void* ptr) {
245 ::operator
delete(ptr);
249 TPodAllocator& GetPodAllocator() {
256 std::vector<TActorInternalState> Actors;
259 std::vector<TFuture<void>> CleanupMessages;
260 std::stack<TLocalActorId, std::vector<TLocalActorId>> FreeActorIds;
264 TPodAllocator PodAllocator;
269 THandle YieldCoroutine_{};
270 THandle ScheduleCoroutine_{};
271 bool IsYielding_ =
true;
274 std::unique_ptr<INode> Node;
277 uint32_t IsReady : 1 = 0;
280 std::vector<TNodeState> Nodes;
285 unsigned TimerId = 0;
292 bool operator<(
const TDelayed& other)
const {
293 return std::tie(When, TimerId, valid) > std::tie(other.When, other.TimerId, other.valid);
297 std::priority_queue<TDelayed> DelayedMessages;
299 std::vector<THandle> Handles;
301 friend class TActorContext;
void Forward(TActorId to, TMessageId messageId, TBlob blob)
Forward a message to another actor (preserves original sender)
TFuture< void > Sleep(TTime until)
Sleep until a specific time.
Definition actorsystem.hpp:312
TFuture< T > Ask(TActorId recipient, TQuestion &&question)
Send a message and wait for a response.
Definition actorsystem.hpp:322
void Send(TActorId to, TMessageId messageId, TBlob blob)
Send a message to another actor.
TEvent Schedule(TTime when, TActorId sender, TActorId recipient, TMessageId messageId, TBlob blob)
Schedule a message to be delivered at a specific time.