COROIO: coroio/sockutils.hpp Source File
COROIO
 
Loading...
Searching...
No Matches
sockutils.hpp
1#pragma once
2
3#include <assert.h>
4#include <span>
5#include "corochain.hpp"
6
7namespace NNet {
8
/**
 * @brief A line represented as up to two non-owning string fragments.
 *
 * The logical content of the line is Part1 followed by Part2. Both members
 * are views, so the storage they point into must outlive the TLine.
 */
struct TLine {
    std::string_view Part1; ///< First fragment of the line.
    std::string_view Part2; ///< Second fragment of the line (may be empty).

    /// @return Combined length of both fragments, in bytes.
    size_t Size() const {
        const size_t head = Part1.size();
        const size_t tail = Part2.size();
        return head + tail;
    }

    /// @return true when Part1 is non-empty; Part2 is not consulted.
    operator bool() const {
        return Part1.size() != 0;
    }
};
21
54template<typename TSocket>
61 : Socket(socket)
62 { }
63
80 TFuture<void> Read(void* data, size_t size) {
81 char* p = static_cast<char*>(data);
82
83 if (!Buffer.empty()) {
84 size_t toCopy = std::min(size, Buffer.size());
85 std::memcpy(p, Buffer.data(), toCopy);
86 p += toCopy;
87 size -= toCopy;
88 Buffer.erase(0, toCopy);
89 }
90
91 while (size != 0) {
92 auto readSize = co_await Socket.ReadSome(p, size);
93 if (readSize == 0) {
94 throw std::runtime_error("Connection closed");
95 }
96 if (readSize < 0) {
97 continue; // retry
98 }
99 p += readSize;
100 size -= readSize;
101 }
102 co_return;
103 }
104
121 TFuture<std::string> ReadUntil(const std::string& delimiter)
122 {
123 std::string result;
124 char tempBuffer[1024];
125
126 while (true) {
127 auto pos = std::search(Buffer.begin(), Buffer.end(), delimiter.begin(), delimiter.end());
128 if (pos != Buffer.end()) {
129 size_t delimiterOffset = std::distance(Buffer.begin(), pos);
130 result.append(Buffer.substr(0, delimiterOffset + delimiter.size()));
131 Buffer.erase(0, delimiterOffset + delimiter.size());
132 co_return result;
133 }
134
135 result.append(Buffer);
136 Buffer.clear();
137
138 auto readSize = co_await Socket.ReadSome(tempBuffer, sizeof(tempBuffer));
139 if (readSize == 0) {
140 throw std::runtime_error("Connection closed");
141 }
142 if (readSize < 0) {
143 continue; // retry
144 }
145
146 Buffer.append(tempBuffer, readSize);
147 }
148
149 co_return result;
150 }
151
152private:
153 TSocket& Socket;
154 std::string Buffer;
155};
156
189template<typename TSocket>
196 : Socket(socket)
197 { }
198
213 TFuture<void> Write(const void* data, size_t size) {
214 const char* p = static_cast<const char*>(data);
215 while (size != 0) {
216 auto readSize = co_await Socket.WriteSome(p, size);
217 if (readSize == 0) {
218 throw std::runtime_error("Connection closed");
219 }
220 if (readSize < 0) {
221 continue; // retry
222 }
223 p += readSize;
224 size -= readSize;
225 }
226 co_return;
227 }
228
240 TFuture<void> Write(const TLine& line) {
241 co_await Write(line.Part1.data(), line.Part1.size());
242 co_await Write(line.Part2.data(), line.Part2.size());
243 co_return;
244 }
245
246private:
247 TSocket& Socket;
248};
249
287template<typename T, typename TSocket>
294 : Socket(socket)
295 { }
296
312 T res;
313 size_t size = sizeof(T);
314 char* p = reinterpret_cast<char*>(&res);
315 while (size != 0) {
316 auto readSize = co_await Socket.ReadSome(p, size);
317 if (readSize == 0) {
318 throw std::runtime_error("Connection closed");
319 }
320 if (readSize < 0) {
321 continue; // retry
322 }
323 p += readSize;
324 size -= readSize;
325 }
326 co_return res;
327 }
328
329private:
330 TSocket& Socket;
331};
332
378public:
383 TLineSplitter(int maxLen);
409 void Push(const char* buf, size_t size);
410
411private:
412 size_t WPos;
413 size_t RPos;
414 size_t Size;
415 size_t Cap;
416 std::string Data;
417 std::string_view View;
418};
419
470public:
509 std::span<char> Acquire(size_t size);
518 void Commit(size_t size);
530 void Push(const char* p, size_t len);
531
532private:
533 size_t WPos;
534 size_t RPos;
535 size_t Size;
536 size_t Cap;
537 std::string Data;
538 std::string_view View;
539};
540
576template<typename TSocket>
587 TLineReader(TSocket& socket, int maxLineSize = 4096)
588 : Socket(socket)
589 , Splitter(maxLineSize)
590 , ChunkSize(maxLineSize / 2)
591 { }
592
604 auto line = Splitter.Pop();
605 while (!line) {
606 auto buf = Splitter.Acquire(ChunkSize);
607 auto size = co_await Socket.ReadSome(buf.data(), buf.size());
608 if (size < 0) {
609 continue;
610 }
611 if (size == 0) {
612 break;
613 }
614 Splitter.Commit(size);
615 line = Splitter.Pop();
616 }
617 co_return line;
618 }
619
620private:
621 TSocket& Socket;
622 TZeroCopyLineSplitter Splitter;
623 int ChunkSize;
624};
625
626} // namespace NNet {
High-level asynchronous socket for network communication.
Definition socket.hpp:364
Implementation of a promise/future system for coroutines.
TFuture< void > Read(void *data, size_t size)
Reads exactly size bytes and stores them into data.
Definition sockutils.hpp:80
TFuture< std::string > ReadUntil(const std::string &delimiter)
Reads data until the given delimiter is encountered.
Definition sockutils.hpp:121
TByteReader(TSocket &socket)
Constructs a reader for the given socket.
Definition sockutils.hpp:60
TFuture< void > Write(const TLine &line)
Writes a TLine object by sequentially writing its parts.
Definition sockutils.hpp:240
TByteWriter(TSocket &socket)
Constructs a writer for the given socket.
Definition sockutils.hpp:195
TFuture< void > Write(const void *data, size_t size)
Writes exactly size bytes from data to the socket.
Definition sockutils.hpp:213
Future type for coroutines returning a value of type T.
Definition corochain.hpp:177
Definition sockutils.hpp:9
TLineReader(TSocket &socket, int maxLineSize=4096)
Constructs a line reader with the given socket and maximum line size.
Definition sockutils.hpp:587
TFuture< TLine > Read()
Reads and returns the next complete line from the socket.
Definition sockutils.hpp:603
void Push(const char *buf, size_t size)
Appends new data to the circular buffer.
TLineSplitter(int maxLen)
Constructs a line splitter with a fixed ring buffer capacity.
TLine Pop()
Retrieves and removes the next complete line from the buffer.
TStructReader(TSocket &socket)
Constructs a reader with a reference to the given socket.
Definition sockutils.hpp:293
TFuture< T > Read()
Reads a single instance of type T from the socket.
Definition sockutils.hpp:311
Splits incoming data into lines using a fixed-size circular buffer, enabling zero-copy writes via Acquire() and Commit().
Definition sockutils.hpp:469
TLine Pop()
Extracts and removes the next complete line from the buffer, if available.
void Push(const char *p, size_t len)
(Optional) Copies data from an external buffer into the circular buffer.
void Commit(size_t size)
Finalizes the amount of data written into the span returned by Acquire().
std::span< char > Acquire(size_t size)
Reserves space in the circular buffer for writing data directly (e.g., from a socket read) without extra copying.
TZeroCopyLineSplitter(int maxLen)
Constructs a zero-copy line splitter with a fixed ring buffer capacity.