COROIO: coroio/sockutils.hpp Source File
COROIO
 
Loading...
Searching...
No Matches
sockutils.hpp
1#pragma once
2
3#include <assert.h>
4#include <span>
5#include <algorithm>
6#include "corochain.hpp"
7
8#ifdef min
9#undef min
10#endif
11
12#ifdef max
13#undef max
14#endif
15
16namespace NNet {
17
18struct TLine {
19 std::string_view Part1;
20 std::string_view Part2;
21
22 size_t Size() const {
23 return Part1.size() + Part2.size();
24 }
25
26 operator bool() const {
27 return !Part1.empty();
28 }
29};
30
63template<typename TSocket>
70 : Socket(socket)
71 { }
89 TFuture<void> Read(void* data, size_t size) {
90 char* p = static_cast<char*>(data);
91
92 if (!Buffer.empty()) {
93 size_t toCopy = std::min(size, Buffer.size());
94 std::memcpy(p, Buffer.data(), toCopy);
95 p += toCopy;
96 size -= toCopy;
97 Buffer.erase(0, toCopy);
98 }
99
100 while (size != 0) {
101 auto readSize = co_await Socket.ReadSome(p, size);
102 if (readSize == 0) {
103 throw std::runtime_error("Connection closed");
104 }
105 if (readSize < 0) {
106 continue; // retry
107 }
108 p += readSize;
109 size -= readSize;
110 }
111 co_return;
112 }
130 TFuture<std::string> ReadUntil(const std::string& delimiter)
131 {
132 std::string result;
133 char tempBuffer[1024];
134
135 while (true) {
136 auto pos = std::search(Buffer.begin(), Buffer.end(), delimiter.begin(), delimiter.end());
137 if (pos != Buffer.end()) {
138 size_t delimiterOffset = std::distance(Buffer.begin(), pos);
139 result.append(Buffer.substr(0, delimiterOffset + delimiter.size()));
140 Buffer.erase(0, delimiterOffset + delimiter.size());
141 co_return result;
142 }
143
144 result.append(Buffer);
145 Buffer.clear();
146
147 auto readSize = co_await Socket.ReadSome(tempBuffer, sizeof(tempBuffer));
148 if (readSize == 0) {
149 throw std::runtime_error("Connection closed");
150 }
151 if (readSize < 0) {
152 continue; // retry
153 }
154
155 Buffer.append(tempBuffer, readSize);
156 }
157
158 co_return result;
159 }
160
161private:
162 TSocket& Socket;
163 std::string Buffer;
164};
165
198template<typename TSocket>
205 : Socket(socket)
206 { }
222 TFuture<void> Write(const void* data, size_t size) {
223 const char* p = static_cast<const char*>(data);
224 while (size != 0) {
225 auto readSize = co_await Socket.WriteSome(p, size);
226 if (readSize == 0) {
227 throw std::runtime_error("Connection closed");
228 }
229 if (readSize < 0) {
230 continue; // retry
231 }
232 p += readSize;
233 size -= readSize;
234 }
235 co_return;
236 }
249 TFuture<void> Write(const TLine& line) {
250 co_await Write(line.Part1.data(), line.Part1.size());
251 co_await Write(line.Part2.data(), line.Part2.size());
252 co_return;
253 }
254
255private:
256 TSocket& Socket;
257};
258
296template<typename T, typename TSocket>
303 : Socket(socket)
304 { }
321 T res;
322 size_t size = sizeof(T);
323 char* p = reinterpret_cast<char*>(&res);
324 while (size != 0) {
325 auto readSize = co_await Socket.ReadSome(p, size);
326 if (readSize == 0) {
327 throw std::runtime_error("Connection closed");
328 }
329 if (readSize < 0) {
330 continue; // retry
331 }
332 p += readSize;
333 size -= readSize;
334 }
335 co_return res;
336 }
337
338private:
339 TSocket& Socket;
340};
341
387public:
392 TLineSplitter(int maxLen);
408 TLine Pop();
418 void Push(const char* buf, size_t size);
419
420private:
421 size_t WPos;
422 size_t RPos;
423 size_t Size;
424 size_t Cap;
425 std::string Data;
426 std::string_view View;
427};
428
479public:
484 TZeroCopyLineSplitter(int maxLen);
501 TLine Pop();
518 std::span<char> Acquire(size_t size);
527 void Commit(size_t size);
539 void Push(const char* p, size_t len);
540
541private:
542 size_t WPos;
543 size_t RPos;
544 size_t Size;
545 size_t Cap;
546 std::string Data;
547 std::string_view View;
548};
585template<typename TSocket>
596 TLineReader(TSocket& socket, int maxLineSize = 4096)
597 : Socket(socket)
598 , Splitter(maxLineSize)
599 , ChunkSize(maxLineSize / 2)
600 { }
613 auto line = Splitter.Pop();
614 while (!line) {
615 auto buf = Splitter.Acquire(ChunkSize);
616 auto size = co_await Socket.ReadSome(buf.data(), buf.size());
617 if (size < 0) {
618 continue;
619 }
620 if (size == 0) {
621 break;
622 }
623 Splitter.Commit(size);
624 line = Splitter.Pop();
625 }
626 co_return line;
627 }
628
629private:
630 TSocket& Socket;
631 TZeroCopyLineSplitter Splitter;
632 int ChunkSize;
633};
634
635} // namespace NNet {
auto WriteSome(const void *buf, size_t size)
Asynchronously writes data from the provided buffer to the socket.
Definition socket.hpp:186
auto ReadSome(void *buf, size_t size)
Asynchronously reads data from the socket into the provided buffer.
Definition socket.hpp:138
High-level asynchronous socket for network communication.
Definition socket.hpp:364
Implementation of a promise/future system for coroutines.
A utility for reading data from a socket-like object, either a fixed number of bytes or until a speci...
Definition sockutils.hpp:64
TFuture< void > Read(void *data, size_t size)
Reads exactly size bytes and stores them into data.
Definition sockutils.hpp:89
TFuture< std::string > ReadUntil(const std::string &delimiter)
Reads data until the given delimiter is encountered.
Definition sockutils.hpp:130
TByteReader(TSocket &socket)
Constructs a reader for the given socket.
Definition sockutils.hpp:69
A utility for writing data to a socket-like object.
Definition sockutils.hpp:199
TFuture< void > Write(const TLine &line)
Writes a TLine object by sequentially writing its parts.
Definition sockutils.hpp:249
TByteWriter(TSocket &socket)
Constructs a writer for the given socket.
Definition sockutils.hpp:204
TFuture< void > Write(const void *data, size_t size)
Writes exactly size bytes from data to the socket.
Definition sockutils.hpp:222
Future type for coroutines returning a value of type T.
Definition corochain.hpp:182
Reads a complete line from a socket using a zero-copy line splitter.
Definition sockutils.hpp:586
TLineReader(TSocket &socket, int maxLineSize=4096)
Constructs a line reader with the given socket and maximum line size.
Definition sockutils.hpp:596
TFuture< TLine > Read()
Reads and returns the next complete line from the socket.
Definition sockutils.hpp:612
Splits incoming data into lines using a circular buffer of fixed capacity.
Definition sockutils.hpp:386
void Push(const char *buf, size_t size)
Appends new data to the circular buffer.
Definition sockutils.cpp:37
TLine Pop()
Retrieves and removes the next complete line from the buffer.
Definition sockutils.cpp:16
Definition sockutils.hpp:18
A utility for reading a fixed-size structure of type T from a socket-like object.
Definition sockutils.hpp:297
TStructReader(TSocket &socket)
Constructs a reader with a reference to the given socket.
Definition sockutils.hpp:302
TFuture< T > Read()
Reads a single instance of type T from the socket.
Definition sockutils.hpp:320
Splits incoming data into lines using a fixed-size circular buffer, enabling zero-copy writes via Acq...
Definition sockutils.hpp:478
TLine Pop()
Extracts and removes the next complete line from the buffer, if available.
Definition sockutils.cpp:58
void Push(const char *p, size_t len)
(Optional) Copies data from an external buffer into the circular buffer.
Definition sockutils.cpp:97
void Commit(size_t size)
Finalizes the amount of data written into the span returned by Acquire().
Definition sockutils.cpp:92
std::span< char > Acquire(size_t size)
Reserves space in the circular buffer for writing data directly (e.g., from a socket read) without ex...
Definition sockutils.cpp:79