COROIO: coroio/actors/envelope_reader.hpp Source File
COROIO
 
Loading...
Searching...
No Matches
envelope_reader.hpp
1#pragma once
2
3#include "actor.hpp"
4#include "queue.hpp"
5#include "intrusive_list.hpp"
6
7#include <deque>
8#include <span>
9#include <list>
10
11namespace NNet {
12namespace NActors {
13
15public:
16 TEnvelopeReader(size_t unused1 = 0, size_t unused2 = 0) { }
17 void Push(const char* buf, size_t size);
18
19 std::optional<TEnvelope> Pop();
20 bool NeedMoreData() const {
21 return !Messages.empty();
22 }
23 size_t Size() const {
24 return Buffer.size();
25 }
26
27private:
28 void Process();
29
30 bool HasHeader = false;
31 THeader Header;
32 std::queue<TEnvelope> Messages;
33 std::deque<char> Buffer;
34};
35
37public:
38 // capacity must be a power of two
39 TZeroCopyEnvelopeReader(size_t capacity = 1024, size_t unused = 0);
40 std::span<char> Acquire(size_t size);
41 void Commit(size_t size);
42 std::optional<TEnvelope> Pop();
43 size_t Size() const;
44 bool NeedMoreData() const {
45 return !HasHeader || Size() < Header.Size;
46 }
47
48 // for testing purposes
49 void Push(const char* p, size_t len);
50
51private:
52 void EnsureCapacity(size_t size);
53 template<size_t size>
54 void CopyOut(char* buf);
55 void CopyOut(char* buf, size_t size);
56 void TryReadHeader();
57
58 std::vector<char> Data;
59 size_t Head = 0;
60 size_t Tail = 0;
61 size_t LastIndex = 0;
62
63 bool HasHeader = false;
64 THeader Header;
65};
66
68public:
69 TZeroCopyEnvelopeReaderV2(size_t chunkSize = 1024 * 1024, size_t lowWatermark = 64 * 1024);
70
71 std::span<char> Acquire(size_t size);
72 void Commit(size_t size);
73 std::optional<TEnvelope> Pop();
74 size_t Size() const {
75 return CurrentSize;
76 }
77 bool NeedMoreData() const {
78 return !HasHeader || Size() < Header.Size;
79 }
80
81 // for testing purposes
82 void Push(const char* p, size_t len);
83
84 int UsedChunksCount() const {
85 return UsedChunks.Size();
86 }
87
88private:
89 struct TChunk;
90
91 void Rotate();
92 template<size_t size>
93 bool CopyOut(char* buf);
94 void CopyOut(char* buf, size_t size);
95 TBlob ExtractBlob(TChunk& chunk, size_t size);
96
97 struct TChunk : TIntrusiveListNode<TChunk> {
98 TChunk(size_t size, TZeroCopyEnvelopeReaderV2* parent);
99
100 std::span<char> TryAcquire(size_t size, size_t lowWatermark);
101 std::span<char> Acquire(size_t size);
102 void Commit(size_t size);
103 template<size_t size>
104 bool CopyOut(char* buf);
105 bool CopyOut(char* buf, size_t size);
106 size_t Size() const;
107 void Clear();
108
109 std::vector<char> Data;
110 size_t Head = 0;
111 size_t Tail = 0;
112 int UseCount = 0;
113
114 TZeroCopyEnvelopeReaderV2* Parent = nullptr;
115 };
116
117 const size_t ChunkSize;
118 const size_t LowWatermark;
119 size_t CurrentSize = 0;
120 THeader Header;
121 bool HasHeader = false;
122 std::unique_ptr<TChunk> CurrentChunk;
123 TIntrusiveList<TChunk> SealedChunks;
124 std::vector<std::unique_ptr<TChunk>> FreeChunks;
125 TIntrusiveList<TChunk> UsedChunks;
126};
127
128} // namespace NActors
129} // namespace NNet
Actor system implementation with message passing and behavior support.
Definition envelope_reader.hpp:14
Definition intrusive_list.hpp:16
Definition envelope_reader.hpp:67
Definition envelope_reader.hpp:36
Memory-efficient unbounded queue implementation for actor message passing.
Definition messages.hpp:84
Header for messages sent between actors. Used in remote communication and serialization.
Definition actorid.hpp:88
Definition intrusive_list.hpp:9