COROIO: coroio/sockutils.hpp Source File
COROIO
 
Loading...
Searching...
No Matches
sockutils.hpp
1#pragma once
2
3#include <assert.h>
4#include <span>
5#include <algorithm>
6#include "corochain.hpp"
7
8#ifdef min
9#undef min
10#endif
11
12#ifdef max
13#undef max
14#endif
15
16namespace NNet {
17
/**
 * @brief A line extracted from a circular read buffer, represented in up to two parts.
 *
 * Both members are non-owning views; whoever produced the TLine owns the bytes.
 */
struct TLine {
    std::string_view Part1; ///< First (or only) segment of the line.
    std::string_view Part2; ///< Continuation segment if the line wraps; empty otherwise.

    /// @return Combined length of both segments, in bytes.
    size_t Size() const {
        return Part1.size() + Part2.size();
    }

    /// @return true when Part1 is non-empty. Part2 is not consulted.
    operator bool() const {
        return !Part1.empty();
    }
};
42
75template<typename TSocket>
82 : Socket(socket)
83 { }
101 TFuture<void> Read(void* data, size_t size) {
102 char* p = static_cast<char*>(data);
103
104 if (!Buffer.empty()) {
105 size_t toCopy = std::min(size, Buffer.size());
106 std::copy(Buffer.begin(), Buffer.begin() + toCopy, p);
107 p += toCopy;
108 size -= toCopy;
109 Buffer.erase(Buffer.begin(), Buffer.begin() + toCopy);
110 }
111
112 while (size != 0) {
113 auto readSize = co_await Socket.ReadSome(p, size);
114 if (readSize == 0) {
115 throw std::runtime_error("Connection closed");
116 }
117 if (readSize < 0) {
118 continue; // retry
119 }
120 p += readSize;
121 size -= readSize;
122 }
123 co_return;
124 }
125
126 TFuture<ssize_t> ReadSome(void* data, size_t size) {
127 char tempBuffer[1024];
128 // read by chunks, copy to user up-to size bytes
129
130 if (!Buffer.empty()) {
131 size_t toCopy = std::min(size, Buffer.size());
132 std::copy(Buffer.begin(), Buffer.begin() + toCopy, static_cast<char*>(data));
133 Buffer.erase(Buffer.begin(), Buffer.begin() + toCopy);
134 co_return toCopy;
135 }
136
137 auto readSize = co_await Socket.ReadSome(tempBuffer, sizeof(tempBuffer));
138 if (readSize == 0) {
139 co_return 0;
140 }
141 if (readSize < 0) {
142 co_return -1; // retry
143 }
144
145 size_t toCopy = std::min(static_cast<size_t>(readSize), size);
146 std::copy(tempBuffer, tempBuffer + toCopy, static_cast<char*>(data));
147 if (static_cast<size_t>(readSize) > toCopy) {
148 Buffer.insert(Buffer.end(), tempBuffer + toCopy, tempBuffer + readSize);
149 }
150 co_return toCopy;
151 }
152
170 TFuture<std::string> ReadUntil(const std::string& delimiter)
171 {
172 std::string result;
173 char tempBuffer[1024];
174
175 while (true) {
176 auto pos = std::search(Buffer.begin(), Buffer.end(), delimiter.begin(), delimiter.end());
177 if (pos != Buffer.end()) {
178 size_t delimiterOffset = std::distance(Buffer.begin(), pos);
179 result.insert(result.end(), Buffer.begin(), Buffer.begin() + delimiterOffset + delimiter.size());
180 Buffer.erase(Buffer.begin(), Buffer.begin() + delimiterOffset + delimiter.size());
181 co_return result;
182 }
183
184 result.insert(result.end(), Buffer.begin(), Buffer.end());
185 Buffer.clear();
186
187 auto readSize = co_await Socket.ReadSome(tempBuffer, sizeof(tempBuffer));
188 if (readSize == 0) {
189 throw std::runtime_error("Connection closed");
190 }
191 if (readSize < 0) {
192 continue; // retry
193 }
194
195 Buffer.insert(Buffer.end(), tempBuffer, tempBuffer + readSize);
196 }
197
198 co_return result;
199 }
200
201private:
202 TSocket& Socket;
203 std::deque<char> Buffer;
204};
205
238template<typename TSocket>
245 : Socket(socket)
246 { }
262 TFuture<void> Write(const void* data, size_t size) {
263 const char* p = static_cast<const char*>(data);
264 while (size != 0) {
265 auto readSize = co_await Socket.WriteSome(p, size);
266 if (readSize == 0) {
267 throw std::runtime_error("Connection closed");
268 }
269 if (readSize < 0) {
270 continue; // retry
271 }
272 p += readSize;
273 size -= readSize;
274 }
275 co_return;
276 }
289 TFuture<void> Write(const TLine& line) {
290 co_await Write(line.Part1.data(), line.Part1.size());
291 co_await Write(line.Part2.data(), line.Part2.size());
292 co_return;
293 }
294
295private:
296 TSocket& Socket;
297};
298
336template<typename T, typename TSocket>
343 : Socket(socket)
344 { }
361 T res;
362 size_t size = sizeof(T);
363 char* p = reinterpret_cast<char*>(&res);
364 while (size != 0) {
365 auto readSize = co_await Socket.ReadSome(p, size);
366 if (readSize == 0) {
367 throw std::runtime_error("Connection closed");
368 }
369 if (readSize < 0) {
370 continue; // retry
371 }
372 p += readSize;
373 size -= readSize;
374 }
375 co_return res;
376 }
377
378private:
379 TSocket& Socket;
380};
381
427public:
432 TLineSplitter(int maxLen);
448 TLine Pop();
458 void Push(const char* buf, size_t size);
459
460private:
461 size_t WPos;
462 size_t RPos;
463 size_t Size;
464 size_t Cap;
465 std::string Data;
466 std::string_view View;
467};
468
519public:
524 TZeroCopyLineSplitter(int maxLen);
541 TLine Pop();
558 std::span<char> Acquire(size_t size);
567 void Commit(size_t size);
579 void Push(const char* p, size_t len);
580
581private:
582 size_t WPos;
583 size_t RPos;
584 size_t Size;
585 size_t Cap;
586 std::string Data;
587 std::string_view View;
588};
625template<typename TSocket>
636 TLineReader(TSocket& socket, int maxLineSize = 4096)
637 : Socket(socket)
638 , Splitter(maxLineSize)
639 , ChunkSize(maxLineSize / 2)
640 { }
653 auto line = Splitter.Pop();
654 while (!line) {
655 auto buf = Splitter.Acquire(ChunkSize);
656 auto size = co_await Socket.ReadSome(buf.data(), buf.size());
657 if (size < 0) {
658 continue;
659 }
660 if (size == 0) {
661 break;
662 }
663 Splitter.Commit(size);
664 line = Splitter.Pop();
665 }
666 co_return line;
667 }
668
669private:
670 TSocket& Socket;
671 TZeroCopyLineSplitter Splitter;
672 int ChunkSize;
673};
674
675} // namespace NNet {
auto WriteSome(const void *buf, size_t size)
Asynchronously writes data from the provided buffer to the socket.
Definition socket.hpp:189
auto ReadSome(void *buf, size_t size)
Asynchronously reads data from the socket into the provided buffer.
Definition socket.hpp:139
High-level asynchronous socket for network communication.
Definition socket.hpp:367
Implementation of a promise/future system for coroutines.
A utility for reading data from a socket-like object, either a fixed number of bytes or until a specified delimiter.
Definition sockutils.hpp:76
TFuture< void > Read(void *data, size_t size)
Reads exactly size bytes and stores them into data.
Definition sockutils.hpp:101
TFuture< std::string > ReadUntil(const std::string &delimiter)
Reads data until the given delimiter is encountered.
Definition sockutils.hpp:170
TByteReader(TSocket &socket)
Constructs a reader for the given socket.
Definition sockutils.hpp:81
A utility for writing data to a socket-like object.
Definition sockutils.hpp:239
TFuture< void > Write(const TLine &line)
Writes a TLine object by sequentially writing its parts.
Definition sockutils.hpp:289
TByteWriter(TSocket &socket)
Constructs a writer for the given socket.
Definition sockutils.hpp:244
TFuture< void > Write(const void *data, size_t size)
Writes exactly size bytes from data to the socket.
Definition sockutils.hpp:262
Owned coroutine handle that carries a result of type T.
Definition corochain.hpp:185
Reads a complete line from a socket using a zero-copy line splitter.
Definition sockutils.hpp:626
TLineReader(TSocket &socket, int maxLineSize=4096)
Constructs a line reader with the given socket and maximum line size.
Definition sockutils.hpp:636
TFuture< TLine > Read()
Reads and returns the next complete line from the socket.
Definition sockutils.hpp:652
Splits incoming data into lines using a circular buffer of fixed capacity.
Definition sockutils.hpp:426
void Push(const char *buf, size_t size)
Appends new data to the circular buffer.
Definition sockutils.cpp:37
TLine Pop()
Retrieves and removes the next complete line from the buffer.
Definition sockutils.cpp:16
A line extracted from a circular read buffer, represented in up to two parts.
Definition sockutils.hpp:30
std::string_view Part2
Continuation segment if the line wraps; empty otherwise.
Definition sockutils.hpp:32
std::string_view Part1
First (or only) segment of the line.
Definition sockutils.hpp:31
A utility for reading a fixed-size structure of type T from a socket-like object.
Definition sockutils.hpp:337
TStructReader(TSocket &socket)
Constructs a reader with a reference to the given socket.
Definition sockutils.hpp:342
TFuture< T > Read()
Reads a single instance of type T from the socket.
Definition sockutils.hpp:360
Splits incoming data into lines using a fixed-size circular buffer, enabling zero-copy writes via Acquire() and Commit().
Definition sockutils.hpp:518
TLine Pop()
Extracts and removes the next complete line from the buffer, if available.
Definition sockutils.cpp:58
void Push(const char *p, size_t len)
(Optional) Copies data from an external buffer into the circular buffer.
Definition sockutils.cpp:97
void Commit(size_t size)
Finalizes the amount of data written into the span returned by Acquire().
Definition sockutils.cpp:92
std::span< char > Acquire(size_t size)
Reserves space in the circular buffer for writing data directly (e.g., from a socket read) without extra copying.
Definition sockutils.cpp:79