4#include "envelope_reader.hpp"
5#include "messages_factory.hpp"
7#include <coroio/address.hpp>
8#include <coroio/dns/resolver.hpp>
// Reserves `size` contiguous bytes in the buffer and returns a span over them.
// Pure virtual: concrete buffers supply the storage.
26 virtual std::span<char>
Acquire(
size_t size) = 0;
// Marks `size` previously acquired bytes as ready to send.
// Pure virtual: implemented by the concrete buffer.
28 virtual void Commit(
size_t size) = 0;
// Virtual destructor with the defaulted (compiler-generated) body.
40 virtual ~INode() =
default;
60template<
typename TPoller>
63 using TSocket =
typename TPoller::TSocket;
79 , SocketFactory(socketFactory)
85 auto blob = std::move(envelope.Blob);
87 blob = Factory.SerializeFar(envelope.MessageId, std::move(blob));
91 .Recipient = envelope.Recipient,
92 .MessageId = envelope.MessageId,
95 auto buf =
Acquire(
sizeof(data) + blob.Size);
96 std::memcpy(buf.data(), &data,
sizeof(data));
98 std::memcpy(buf.data() +
sizeof(data), blob.Data.get(), blob.Size);
100 Commit(
sizeof(data) + blob.Size);
103 std::span<char>
Acquire(
size_t size)
override {
104 if (UncommittedBytes + size > OutputBuffer.size()) {
105 OutputBuffer.resize(UncommittedBytes + size);
107 auto buf = std::span<char>(OutputBuffer.data() + UncommittedBytes, size);
108 UncommittedBytes += size;
113 CommittedBytes += size;
114 UncommittedBytes = CommittedBytes;
126 if (Connected && (!Drainer.raw() || Drainer.done())) {
138 while (CommittedBytes > 0) {
139 SendBuffer.swap(OutputBuffer);
140 auto readSize = CommittedBytes;
141 UncommittedBytes = CommittedBytes = 0;
145 }
catch (
const std::exception& ex) {
146 std::cerr <<
"Error during draining: " << ex.what() <<
"\n";
153 if (Connector.raw() && !Connector.done()) {
157 Connector = DoConnect();
160 TFuture<void> DoConnect() {
162 std::cout <<
"Connecting to " << HostPort.ToString() <<
"\n";
166 TAddress addr =
co_await HostPort.
Resolve(Resolver);
167 Socket = SocketFactory(addr);
168 co_await Socket.Connect(addr );
171 auto sender = TActorId(0, 0, 0);
172 auto recipient = TActorId(0, 0, 0);
175 .Recipient = recipient,
178 co_await TByteWriter(Socket).Write(&data,
sizeof(data));
181 std::cout <<
"Connected to " << HostPort.ToString() <<
"\n";
182 }
catch (
const std::exception& ex) {
183 std::cerr <<
"Error connecting to " << HostPort.ToString() <<
": " << ex.what() <<
"\n";
187 co_await Poller.Sleep(std::chrono::milliseconds(1000));
193 bool Connected =
false;
194 TFuture<void> Drainer;
195 TFuture<void> Connector;
199 TMessagesFactory& Factory;
201 std::function<TSocket(TAddress&)> SocketFactory;
203 std::vector<char> OutputBuffer;
204 size_t UncommittedBytes = 0;
205 size_t CommittedBytes = 0;
206 std::vector<char> SendBuffer;
Actor system implementation with message passing and behavior support.
Interface for a remote node connection in the actor transport layer.
Definition node.hpp:38
virtual void Drain()=0
Calls StartConnect() then asynchronously writes all buffered bytes to the socket.
virtual void Send(TEnvelope &&envelope)=0
Serializes envelope and appends it to the outbound buffer.
virtual THostPort GetHostPort() const =0
Returns the remote host:port this node connects to.
virtual void StartConnect()=0
Initiates an async TCP connection if not already connected or connecting.
Abstract write-buffer interface used by the actor transport layer.
Definition node.hpp:22
virtual std::span< char > Acquire(size_t size)=0
Reserves `size` contiguous bytes in the buffer and returns a span over them.
virtual void Commit(size_t size)=0
Marks `size` previously acquired bytes as ready to send.
Message envelope containing routing and payload information.
Definition actor.hpp:119
Definition messages_factory.hpp:10
Concrete INode implementation for a single remote actor-system endpoint.
Definition node.hpp:61
THostPort GetHostPort() const override
Returns the remote host:port this node connects to.
Definition node.hpp:131
void StartConnect() override
Initiates an async TCP connection if not already connected or connecting.
Definition node.hpp:117
void Send(TEnvelope &&envelope) override
Serializes envelope and appends it to the outbound buffer.
Definition node.hpp:84
std::span< char > Acquire(size_t size) override
Reserves `size` contiguous bytes in the buffer and returns a span over them.
Definition node.hpp:103
void Commit(size_t size) override
Marks `size` previously acquired bytes as ready to send.
Definition node.hpp:112
TNode(TPoller &poller, TMessagesFactory &factory, TResolver &resolver, const std::function< TSocket(const TAddress &)> &socketFactory, const THostPort &hostPort)
Constructs a TNode but does not connect immediately.
Definition node.hpp:75
void Drain() override
Calls StartConnect() then asynchronously writes all buffered bytes to the socket.
Definition node.hpp:124
A class representing an IPv4 or IPv6 address (with port).
Definition address.hpp:38
A host:port pair that resolves to a TAddress.
Definition resolver.hpp:261
TFuture< TAddress > Resolve(TResolver &resolver)
Resolves the host to a TAddress using the given resolver.
Definition resolver.hpp:274
Async DNS resolver over UDP.
Definition resolver.hpp:115
A utility for writing data to a socket-like object.
Definition sockutils.hpp:239
TFuture< void > Write(const void *data, size_t size)
Writes exactly `size` bytes from `data` to the socket.
Definition sockutils.hpp:262
Owned coroutine handle that carries a result of type T.
Definition corochain.hpp:185