COROIO: coroio/actors/node.hpp Source File
COROIO
 
Loading...
Searching...
No Matches
node.hpp
1#pragma once
2
3#include "actor.hpp"
4#include "envelope_reader.hpp"
5#include "messages_factory.hpp"
6
7#include <coroio/address.hpp>
8#include <coroio/dns/resolver.hpp>
9
10#include <span>
11
12namespace NNet {
13namespace NActors {
14
16public:
17 virtual ~IOutputStream() = default;
18 virtual std::span<char> Acquire(size_t size) = 0;
19 virtual void Commit(size_t size) = 0;
20};
21
22class INode : public IOutputStream {
23public:
24 virtual ~INode() = default;
25 virtual void Send(TEnvelope&& envelope) = 0;
26 virtual void StartConnect() = 0;
27 virtual void Drain() = 0;
28 virtual THostPort GetHostPort() const = 0;
29};
30
31template<typename TPoller>
32class TNode : public INode {
33public:
34 using TSocket = typename TPoller::TSocket;
35 TNode(TPoller& poller, TMessagesFactory& factory, TResolver& resolver, const std::function<TSocket(const TAddress&)>& socketFactory, const THostPort& hostPort)
36 : Poller(poller)
37 , Factory(factory)
38 , Resolver(resolver)
39 , SocketFactory(socketFactory)
40 , HostPort(hostPort)
41 { }
42
43 // TODO: remove me, envelope needed only for local actors
44 void Send(TEnvelope&& envelope) override {
45 auto blob = std::move(envelope.Blob);
46 if (blob.Size > 0) {
47 blob = Factory.SerializeFar(envelope.MessageId, std::move(blob));
48 }
49 THeader data{
50 .Sender = envelope.Sender,
51 .Recipient = envelope.Recipient,
52 .MessageId = envelope.MessageId,
53 .Size = blob.Size
54 };
55 auto buf = Acquire(sizeof(data) + blob.Size);
56 std::memcpy(buf.data(), &data, sizeof(data));
57 if (blob.Size > 0) {
58 std::memcpy(buf.data() + sizeof(data), blob.Data.get(), blob.Size);
59 }
60 Commit(sizeof(data) + blob.Size);
61 }
62
63 std::span<char> Acquire(size_t size) override {
64 if (UncommittedBytes + size > OutputBuffer.size()) {
65 OutputBuffer.resize(UncommittedBytes + size);
66 }
67 auto buf = std::span<char>(OutputBuffer.data() + UncommittedBytes, size);
68 UncommittedBytes += size;
69 return buf;
70 }
71
72 void Commit(size_t size) override {
73 CommittedBytes += size;
74 UncommittedBytes = CommittedBytes;
75 }
76
77 void StartConnect() override {
78 if (!Connected) {
79 Connect();
80 return;
81 }
82 }
83
84 void Drain() override {
85 StartConnect();
86 if (Connected && (!Drainer.raw() || Drainer.done())) {
87 Drainer = DoDrain();
88 }
89 }
90
91 THostPort GetHostPort() const override {
92 return HostPort;
93 }
94
95private:
96 TFuture<void> DoDrain() {
97 try {
98 while (CommittedBytes > 0) {
99 SendBuffer.swap(OutputBuffer);
100 auto readSize = CommittedBytes;
101 UncommittedBytes = CommittedBytes = 0;
102 co_await TByteWriter(Socket).Write(SendBuffer.data(), readSize);
103 }
104 co_return;
105 } catch (const std::exception& ex) {
106 std::cerr << "Error during draining: " << ex.what() << "\n";
107 Connect();
108 co_return;
109 }
110 }
111
112 void Connect() {
113 if (Connector.raw() && !Connector.done()) {
114 return;
115 }
116
117 Connector = DoConnect();
118 }
119
120 TFuture<void> DoConnect() {
121 Connected = false;
122 std::cout << "Connecting to " << HostPort.ToString() << "\n";
123 while (!Connected) {
124 try {
125 //auto deadline = NNet::TClock::now() + std::chrono::milliseconds(5000);
126 TAddress addr = co_await HostPort.Resolve(Resolver);
127 Socket = SocketFactory(addr);
128 co_await Socket.Connect(addr /*, deadline*/);
129 // Connection bug workaround:
130 {
131 auto sender = TActorId(0, 0, 0);
132 auto recipient = TActorId(0, 0, 0);
133 THeader data{
134 .Sender = sender,
135 .Recipient = recipient,
136 .MessageId = 0
137 };
138 co_await TByteWriter(Socket).Write(&data, sizeof(data));
139 }
140 Connected = true;
141 std::cout << "Connected to " << HostPort.ToString() << "\n";
142 } catch (const std::exception& ex) {
143 std::cerr << "Error connecting to " << HostPort.ToString() << ": " << ex.what() << "\n";
144 Connected = false;
145 }
146 if (!Connected) {
147 co_await Poller.Sleep(std::chrono::milliseconds(1000));
148 }
149 }
150 co_return;
151 }
152
153 bool Connected = false;
154 TFuture<void> Drainer;
155 TFuture<void> Connector;
156 TSocket Socket;
157
158 TPoller& Poller;
159 TMessagesFactory& Factory;
160 TResolver& Resolver;
161 std::function<TSocket(TAddress&)> SocketFactory;
162 THostPort HostPort;
163 std::vector<char> OutputBuffer;
164 size_t UncommittedBytes = 0;
165 size_t CommittedBytes = 0;
166 std::vector<char> SendBuffer;
167};
168
169} // namespace NActors
170} // namespace NNet
Actor system implementation with message passing and behavior support.
Definition node.hpp:22
Definition node.hpp:15
Unique identifier for actors in the system.
Definition actorid.hpp:26
Message envelope containing routing and payload information.
Definition actor.hpp:117
Definition messages_factory.hpp:10
Definition node.hpp:32
A class representing an IPv4 or IPv6 address (with port).
Definition address.hpp:38
Definition resolver.hpp:253
Resolves hostnames into IP addresses using a custom poller.
Definition resolver.hpp:118
Header for messages sent between actors. Used in remote communication and serialization.
Definition actorid.hpp:88
A utility for writing data to a socket-like object.
Definition sockutils.hpp:199
TFuture< void > Write(const void *data, size_t size)
Writes exactly size bytes from data to the socket.
Definition sockutils.hpp:222
Future type for coroutines returning a value of type T.
Definition corochain.hpp:182