COROIO: coroio/actors/node.hpp Source File
COROIO
 
Loading...
Searching...
No Matches
node.hpp
1#pragma once
2
3#include "actor.hpp"
4#include "envelope_reader.hpp"
5#include "messages_factory.hpp"
6
7#include <coroio/address.hpp>
8#include <coroio/dns/resolver.hpp>
9
10#include <span>
11
12namespace NNet {
13namespace NActors {
14
23public:
24 virtual ~IOutputStream() = default;
26 virtual std::span<char> Acquire(size_t size) = 0;
28 virtual void Commit(size_t size) = 0;
29};
30
38class INode : public IOutputStream {
39public:
40 virtual ~INode() = default;
42 virtual void Send(TEnvelope&& envelope) = 0;
44 virtual void StartConnect() = 0;
46 virtual void Drain() = 0;
48 virtual THostPort GetHostPort() const = 0;
49};
50
60template<typename TPoller>
61class TNode : public INode {
62public:
63 using TSocket = typename TPoller::TSocket;
75 TNode(TPoller& poller, TMessagesFactory& factory, TResolver& resolver, const std::function<TSocket(const TAddress&)>& socketFactory, const THostPort& hostPort)
76 : Poller(poller)
77 , Factory(factory)
78 , Resolver(resolver)
79 , SocketFactory(socketFactory)
80 , HostPort(hostPort)
81 { }
82
83 // TODO: remove me, envelope needed only for local actors
84 void Send(TEnvelope&& envelope) override {
85 auto blob = std::move(envelope.Blob);
86 if (blob.Size > 0) {
87 blob = Factory.SerializeFar(envelope.MessageId, std::move(blob));
88 }
89 THeader data{
90 .Sender = envelope.Sender,
91 .Recipient = envelope.Recipient,
92 .MessageId = envelope.MessageId,
93 .Size = blob.Size
94 };
95 auto buf = Acquire(sizeof(data) + blob.Size);
96 std::memcpy(buf.data(), &data, sizeof(data));
97 if (blob.Size > 0) {
98 std::memcpy(buf.data() + sizeof(data), blob.Data.get(), blob.Size);
99 }
100 Commit(sizeof(data) + blob.Size);
101 }
102
103 std::span<char> Acquire(size_t size) override {
104 if (UncommittedBytes + size > OutputBuffer.size()) {
105 OutputBuffer.resize(UncommittedBytes + size);
106 }
107 auto buf = std::span<char>(OutputBuffer.data() + UncommittedBytes, size);
108 UncommittedBytes += size;
109 return buf;
110 }
111
112 void Commit(size_t size) override {
113 CommittedBytes += size;
114 UncommittedBytes = CommittedBytes;
115 }
116
117 void StartConnect() override {
118 if (!Connected) {
119 Connect();
120 return;
121 }
122 }
123
124 void Drain() override {
125 StartConnect();
126 if (Connected && (!Drainer.raw() || Drainer.done())) {
127 Drainer = DoDrain();
128 }
129 }
130
131 THostPort GetHostPort() const override {
132 return HostPort;
133 }
134
135private:
136 TFuture<void> DoDrain() {
137 try {
138 while (CommittedBytes > 0) {
139 SendBuffer.swap(OutputBuffer);
140 auto readSize = CommittedBytes;
141 UncommittedBytes = CommittedBytes = 0;
142 co_await TByteWriter(Socket).Write(SendBuffer.data(), readSize);
143 }
144 co_return;
145 } catch (const std::exception& ex) {
146 std::cerr << "Error during draining: " << ex.what() << "\n";
147 Connect();
148 co_return;
149 }
150 }
151
152 void Connect() {
153 if (Connector.raw() && !Connector.done()) {
154 return;
155 }
156
157 Connector = DoConnect();
158 }
159
160 TFuture<void> DoConnect() {
161 Connected = false;
162 std::cout << "Connecting to " << HostPort.ToString() << "\n";
163 while (!Connected) {
164 try {
165 //auto deadline = NNet::TClock::now() + std::chrono::milliseconds(5000);
166 TAddress addr = co_await HostPort.Resolve(Resolver);
167 Socket = SocketFactory(addr);
168 co_await Socket.Connect(addr /*, deadline*/);
169 // Connection bug workaround:
170 {
171 auto sender = TActorId(0, 0, 0);
172 auto recipient = TActorId(0, 0, 0);
173 THeader data{
174 .Sender = sender,
175 .Recipient = recipient,
176 .MessageId = 0
177 };
178 co_await TByteWriter(Socket).Write(&data, sizeof(data));
179 }
180 Connected = true;
181 std::cout << "Connected to " << HostPort.ToString() << "\n";
182 } catch (const std::exception& ex) {
183 std::cerr << "Error connecting to " << HostPort.ToString() << ": " << ex.what() << "\n";
184 Connected = false;
185 }
186 if (!Connected) {
187 co_await Poller.Sleep(std::chrono::milliseconds(1000));
188 }
189 }
190 co_return;
191 }
192
193 bool Connected = false;
194 TFuture<void> Drainer;
195 TFuture<void> Connector;
196 TSocket Socket;
197
198 TPoller& Poller;
199 TMessagesFactory& Factory;
200 TResolver& Resolver;
201 std::function<TSocket(TAddress&)> SocketFactory;
202 THostPort HostPort;
203 std::vector<char> OutputBuffer;
204 size_t UncommittedBytes = 0;
205 size_t CommittedBytes = 0;
206 std::vector<char> SendBuffer;
207};
208
209} // namespace NActors
210} // namespace NNet
Actor system implementation with message passing and behavior support.
Interface for a remote node connection in the actor transport layer.
Definition node.hpp:38
virtual void Drain()=0
Calls StartConnect() then asynchronously writes all buffered bytes to the socket.
virtual void Send(TEnvelope &&envelope)=0
Serializes envelope and appends it to the outbound buffer.
virtual THostPort GetHostPort() const =0
Returns the remote host:port this node connects to.
virtual void StartConnect()=0
Initiates an async TCP connection if not already connected or connecting.
Abstract write-buffer interface used by the actor transport layer.
Definition node.hpp:22
virtual std::span< char > Acquire(size_t size)=0
Reserves size contiguous bytes in the buffer and returns a span over them.
virtual void Commit(size_t size)=0
Marks size previously acquired bytes as ready to send.
Message envelope containing routing and payload information.
Definition actor.hpp:119
Definition messages_factory.hpp:10
Concrete INode implementation for a single remote actor-system endpoint.
Definition node.hpp:61
THostPort GetHostPort() const override
Returns the remote host:port this node connects to.
Definition node.hpp:131
void StartConnect() override
Initiates an async TCP connection if not already connected or connecting.
Definition node.hpp:117
void Send(TEnvelope &&envelope) override
Serializes envelope and appends it to the outbound buffer.
Definition node.hpp:84
std::span< char > Acquire(size_t size) override
Reserves size contiguous bytes in the buffer and returns a span over them.
Definition node.hpp:103
void Commit(size_t size) override
Marks size previously acquired bytes as ready to send.
Definition node.hpp:112
TNode(TPoller &poller, TMessagesFactory &factory, TResolver &resolver, const std::function< TSocket(const TAddress &)> &socketFactory, const THostPort &hostPort)
Constructs a TNode but does not connect immediately.
Definition node.hpp:75
void Drain() override
Calls StartConnect() then asynchronously writes all buffered bytes to the socket.
Definition node.hpp:124
A class representing an IPv4 or IPv6 address (with port).
Definition address.hpp:38
A host:port pair that resolves to a TAddress.
Definition resolver.hpp:261
TFuture< TAddress > Resolve(TResolver &resolver)
Resolves the host to a TAddress using the given resolver.
Definition resolver.hpp:274
Async DNS resolver over UDP.
Definition resolver.hpp:115
Wire header prepended to every remote actor message.
Definition actorid.hpp:92
TActorId Sender
Originating actor.
Definition actorid.hpp:93
A utility for writing data to a socket-like object.
Definition sockutils.hpp:239
TFuture< void > Write(const void *data, size_t size)
Writes exactly size bytes from data to the socket.
Definition sockutils.hpp:262
Owned coroutine handle that carries a result of type T.
Definition corochain.hpp:185