34 using TSocket =
typename TPoller::TSocket;
39 , SocketFactory(socketFactory)
// Send: writes one envelope into the output buffer as a fixed-size header
// (`data`) immediately followed by the payload blob, then commits the bytes.
// NOTE(review): this listing omits several original lines (the declaration of
// `data`, any condition around the SerializeFar call, and the closing braces),
// so only the visible steps are described here.
44 void Send(
TEnvelope&& envelope)
override {
// Take the payload out of the envelope.
45 auto blob = std::move(envelope.Blob);
// Replace the payload with the result of Factory.SerializeFar for this message id.
// NOTE(review): whether this call is conditional cannot be seen in this excerpt.
47 blob = Factory.SerializeFar(envelope.MessageId, std::move(blob));
// Header fields copied verbatim from the envelope.
50 .Sender = envelope.Sender,
51 .Recipient = envelope.Recipient,
52 .MessageId = envelope.MessageId,
// Reserve space for header + payload in the output buffer.
55 auto buf = Acquire(
sizeof(data) + blob.Size);
// The header is written first ...
56 std::memcpy(buf.data(), &data,
sizeof(data));
// ... and the payload bytes follow directly after it.
58 std::memcpy(buf.data() +
sizeof(data), blob.Data.get(), blob.Size);
// Mark the bytes just written as committed; Drain only sends committed bytes.
60 Commit(
sizeof(data) + blob.Size);
// Acquire: reserves `size` bytes at the current uncommitted offset of
// OutputBuffer and advances that offset.
// NOTE(review): the return statement and closing braces are not part of this
// excerpt; presumably `buf` is returned - confirm against the full file.
63 std::span<char> Acquire(
size_t size)
override {
// Grow the buffer only when the request does not fit after the uncommitted offset.
// resize() may reallocate, which invalidates spans handed out by earlier calls.
64 if (UncommittedBytes + size > OutputBuffer.size()) {
65 OutputBuffer.resize(UncommittedBytes + size);
// View over exactly `size` bytes starting at the uncommitted offset.
67 auto buf = std::span<char>(OutputBuffer.data() + UncommittedBytes, size);
// Later Acquire calls start after this reservation until Commit resets the offset.
68 UncommittedBytes += size;
// Commit: marks `size` additional bytes of OutputBuffer as committed.
// Drain sends only the committed prefix of the buffer.
72 void Commit(
size_t size)
override {
73 CommittedBytes += size;
// Snap the uncommitted offset back to the committed boundary: any space that
// was acquired but not covered by this commit is given up.
74 UncommittedBytes = CommittedBytes;
77 void StartConnect()
override {
// Drain: sends the committed bytes of the output buffer.
// NOTE(review): most of the body (the coroutine that performs the write, the
// try block opening, and the closing braces) is missing from this excerpt;
// the comments below cover only the visible lines.
84 void Drain()
override {
// Proceed only when Connected is set and Drainer is either empty or finished.
// NOTE(review): presumably this prevents two concurrent drains - confirm.
86 if (Connected && (!Drainer.raw() || Drainer.done())) {
// Keep going while committed bytes are pending.
98 while (CommittedBytes > 0) {
// Swap buffers so the pending bytes now live in SendBuffer and OutputBuffer
// can be refilled through Acquire/Commit.
99 SendBuffer.swap(OutputBuffer);
// Remember how many bytes of SendBuffer are committed before the counters are reset.
100 auto readSize = CommittedBytes;
101 UncommittedBytes = CommittedBytes = 0;
105 }
// Failures derived from std::exception are reported on stderr.
// NOTE(review): what happens after logging is not visible here.
catch (
const std::exception& ex) {
106 std::cerr <<
"Error during draining: " << ex.what() <<
"\n";
113 if (Connector.raw() && !Connector.done()) {
117 Connector = DoConnect();
122 std::cout <<
"Connecting to " << HostPort.ToString() <<
"\n";
126 TAddress addr =
co_await HostPort.Resolve(Resolver);
127 Socket = SocketFactory(addr);
128 co_await Socket.Connect(addr );
135 .Recipient = recipient,
141 std::cout <<
"Connected to " << HostPort.ToString() <<
"\n";
142 }
catch (
const std::exception& ex) {
143 std::cerr <<
"Error connecting to " << HostPort.ToString() <<
": " << ex.what() <<
"\n";
147 co_await Poller.Sleep(std::chrono::milliseconds(1000));
153 bool Connected =
false;
161 std::function<TSocket(
TAddress&)> SocketFactory;
// Staging buffer that Acquire hands spans into; grown on demand by Acquire.
163 std::vector<char> OutputBuffer;
// Offset in OutputBuffer where the next Acquire starts; reset to CommittedBytes by Commit.
164 size_t UncommittedBytes = 0;
// Number of bytes at the front of OutputBuffer that Commit has made available to Drain.
165 size_t CommittedBytes = 0;
// Buffer that Drain swaps with OutputBuffer before sending.
166 std::vector<char> SendBuffer;
TFuture< void > Write(const void *data, size_t size)
Writes exactly size bytes from data to the socket.
Definition sockutils.hpp:222