COROIO: coroio/actors/node.hpp Source File
COROIO
Loading...
Searching...
No Matches
node.hpp
1#pragma once
2
3#include "actor.hpp"
4#include "messages_factory.hpp"
5
6#include <coroio/address.hpp>
7#include <coroio/resolver.hpp>
8
9namespace NNet {
10namespace NActors {
11
12class INode {
13public:
14 virtual ~INode() = default;
15 virtual void Send(TEnvelope&& envelope) = 0;
16 virtual void StartConnect() = 0;
17 virtual void Drain() = 0;
18 virtual THostPort GetHostPort() const = 0;
19};
20
21struct TSendData {
22 TActorId Sender;
23 TActorId Recipient;
24 TMessageId MessageId = 0;
25 uint32_t Size = 0;
26};
27
28template<typename TPoller, typename TResolver>
29class TNode : public INode {
30public:
31 using TSocket = typename TPoller::TSocket;
32 TNode(TPoller& poller, TMessagesFactory& factory, TResolver& resolver, const std::function<TSocket(const TAddress&)>& socketFactory, const THostPort& hostPort)
33 : Poller(poller)
34 , Factory(factory)
35 , Resolver(resolver)
36 , SocketFactory(socketFactory)
37 , HostPort(hostPort)
38 { }
39
40 void Send(TEnvelope&& envelope) override {
41 auto blob = std::move(envelope.Blob);
42 if (blob.Size > 0) {
43 blob = Factory.SerializeFar(envelope.MessageId, std::move(blob));
44 }
45 TSendData data{
46 .Sender = envelope.Sender,
47 .Recipient = envelope.Recipient,
48 .MessageId = envelope.MessageId,
49 .Size = blob.Size
50 };
51 OutputBuffer.insert(OutputBuffer.end(), (char*)&data, (char*)&data + sizeof(data));
52 if (blob.Size > 0) {
53 OutputBuffer.insert(OutputBuffer.end(), (char*)blob.Data.get(), (char*)blob.Data.get() + blob.Size);
54 }
55 }
56
57 void StartConnect() override {
58 if (!Connected) {
59 Connect();
60 return;
61 }
62 }
63
64 void Drain() override {
65 StartConnect();
66 if (Connected && (!Drainer.raw() || Drainer.done())) {
67 Drainer = DoDrain();
68 }
69 }
70
71 THostPort GetHostPort() const override {
72 return HostPort;
73 }
74
75private:
76 TFuture<void> DoDrain() {
77 try {
78 while (!OutputBuffer.empty()) {
79 SendBuffer.swap(OutputBuffer);
80 co_await TByteWriter(Socket).Write(SendBuffer.data(), SendBuffer.size());
81 SendBuffer.clear();
82 }
83 co_return;
84 } catch (const std::exception& ex) {
85 std::cerr << "Error during draining: " << ex.what() << "\n";
86 Connect();
87 co_return;
88 }
89 }
90
91 void Connect() {
92 if (Connector.raw() && !Connector.done()) {
93 return;
94 }
95
96 Connector = DoConnect();
97 }
98
99 TFuture<void> DoConnect() {
100 Connected = false;
101 std::cout << "Connecting to " << HostPort.ToString() << "\n";
102 while (!Connected) {
103 try {
104 //auto deadline = NNet::TClock::now() + std::chrono::milliseconds(5000);
105 TAddress addr = co_await HostPort.Resolve(Resolver);
106 Socket = SocketFactory(addr);
107 co_await Socket.Connect(addr /*, deadline*/);
108 // Connection bug workaround:
109 {
110 auto sender = TActorId(0, 0, 0);
111 auto recipient = TActorId(0, 0, 0);
112 TSendData data{
113 .Sender = sender,
114 .Recipient = recipient,
115 .MessageId = 0
116 };
117 co_await TByteWriter(Socket).Write(&data, sizeof(data));
118 }
119 Connected = true;
120 std::cout << "Connected to " << HostPort.ToString() << "\n";
121 } catch (const std::exception& ex) {
122 std::cerr << "Error connecting to " << HostPort.ToString() << ": " << ex.what() << "\n";
123 Connected = false;
124 }
125 if (!Connected) {
126 co_await Poller.Sleep(std::chrono::milliseconds(1000));
127 }
128 }
129 co_return;
130 }
131
132 bool Connected = false;
133 TFuture<void> Drainer;
134 TFuture<void> Connector;
135 TSocket Socket;
136
137 TPoller& Poller;
138 TMessagesFactory& Factory;
139 TResolver& Resolver;
140 std::function<TSocket(TAddress&)> SocketFactory;
141 THostPort HostPort;
142 std::vector<char> OutputBuffer;
143 std::vector<char> SendBuffer;
144};
145
146} // namespace NActors
147} // namespace NNet
Actor system implementation with message passing and behavior support.
uint32_t TMessageId
Message type identifier.
Definition actor.hpp:119
Definition node.hpp:12
Unique identifier for actors in the system.
Definition actor.hpp:129
Message envelope containing routing and payload information.
Definition actor.hpp:194
Definition messages_factory.hpp:11
A class representing an IPv4 or IPv6 address (with port).
Definition address.hpp:38
Definition resolver.hpp:193
Resolves hostnames into IP addresses using a custom poller.
Definition resolver.hpp:84
Definition node.hpp:21
A utility for writing data to a socket-like object.
Definition sockutils.hpp:199
TFuture< void > Write(const void *data, size_t size)
Writes exactly size bytes from data to the socket.
Definition sockutils.hpp:222
Future type for coroutines returning a value of type T.
Definition corochain.hpp:182