31 using TSocket =
typename TPoller::TSocket;
36 , SocketFactory(socketFactory)
40 void Send(
TEnvelope&& envelope)
override {
41 auto blob = std::move(envelope.Blob);
43 blob = Factory.SerializeFar(envelope.MessageId, std::move(blob));
46 .Sender = envelope.Sender,
47 .Recipient = envelope.Recipient,
48 .MessageId = envelope.MessageId,
51 OutputBuffer.insert(OutputBuffer.end(), (
char*)&data, (
char*)&data +
sizeof(data));
53 OutputBuffer.insert(OutputBuffer.end(), (
char*)blob.Data.get(), (
char*)blob.Data.get() + blob.Size);
57 void StartConnect()
override {
64 void Drain()
override {
66 if (Connected && (!Drainer.raw() || Drainer.done())) {
78 while (!OutputBuffer.empty()) {
79 SendBuffer.swap(OutputBuffer);
84 }
catch (
const std::exception& ex) {
85 std::cerr <<
"Error during draining: " << ex.what() <<
"\n";
92 if (Connector.raw() && !Connector.done()) {
96 Connector = DoConnect();
101 std::cout <<
"Connecting to " << HostPort.ToString() <<
"\n";
105 TAddress addr =
co_await HostPort.Resolve(Resolver);
106 Socket = SocketFactory(addr);
107 co_await Socket.Connect(addr );
114 .Recipient = recipient,
120 std::cout <<
"Connected to " << HostPort.ToString() <<
"\n";
121 }
catch (
const std::exception& ex) {
122 std::cerr <<
"Error connecting to " << HostPort.ToString() <<
": " << ex.what() <<
"\n";
126 co_await Poller.Sleep(std::chrono::milliseconds(1000));
132 bool Connected =
false;
140 std::function<TSocket(
TAddress&)> SocketFactory;
142 std::vector<char> OutputBuffer;
143 std::vector<char> SendBuffer;
TFuture< void > Write(const void *data, size_t size)
Writes exactly size bytes from data to the socket.
Definition sockutils.hpp:222