COROIO: coroio/actors/messages.hpp Source File
COROIO
 
Loading...
Searching...
No Matches
messages.hpp
1#pragma once
2
3#include <memory>
4#include <sstream>
5#include <cstring>
6#include <type_traits>
7#include <functional>
8
9#include "actorid.hpp"
10
11namespace NNet {
12namespace NActors {
13
/// Type-erased deleter made of a plain function pointer plus an opaque
/// context value that is handed back to the function on every call.
///
/// A default-constructed instance has no release function and acts as a no-op.
struct TContextDeleter {
    TContextDeleter() = default;

    /// @param func Function invoked as `func(ctx, ptr)` to dispose of `ptr`.
    /// @param ctx  Opaque value passed to `func` as its first argument.
    TContextDeleter(void (*func)(void*, void*), void* ctx)
        : Release(func)
        , Context(ctx)
    { }

    /// Disposes of `ptr` through the stored function.
    /// Does nothing when no release function is set, so a default-constructed
    /// deleter never calls through a null function pointer.
    void operator()(void* ptr) {
        if (Release != nullptr) {
            Release(Context, ptr);
        }
    }

    void (*Release)(void*, void*) = nullptr; ///< Release callback; null means "nothing to do".
    void* Context = nullptr;                 ///< First argument forwarded to Release.
};
29
30template<typename TLambda>
32 static_assert(
33 std::is_empty_v<TLambda> || sizeof(TLambda) <= sizeof(void*),
34 "Lambda must be stateless or capture only one pointer-sized value");
35
36 // Stateless lambda to context deleter conversion
37 template<typename F>
38 static std::enable_if_t<std::is_empty_v<std::decay_t<F>>, TContextDeleter>
39 Convert(F&& lambda) {
40 auto func_ptr = +[](void* /*unused*/, void* ptr) {
41 std::decay_t<F>{}(ptr);
42 };
43 return TContextDeleter{func_ptr, nullptr};
44 }
45
46 template<typename F>
47 static std::enable_if_t<!std::is_empty_v<std::decay_t<F>>, TContextDeleter>
48 Convert(F&& lambda) {
49 void* captured_ptr = ExtractCapturedPointer(lambda);
50
51 auto func_ptr = +[](void* ctx, void* ptr) {
52 using LambdaType = std::decay_t<F>;
53 alignas(LambdaType) char buffer[sizeof(LambdaType)];
54 *reinterpret_cast<void**>(buffer) = ctx;
55 auto* lambda_ptr = reinterpret_cast<LambdaType*>(buffer);
56 (*lambda_ptr)(ptr);
57 };
58
59 return TContextDeleter{func_ptr, captured_ptr};
60 }
61
62private:
63 template<typename F>
64 static void* ExtractCapturedPointer(const F& lambda) {
65 return *reinterpret_cast<void* const*>(&lambda);
66 }
67};
68
70 TBlobDeleter() = default;
71
72 template<typename TFunc>
73 TBlobDeleter(TFunc&& func)
74 : Release(TLambdaToContextDeleter<TFunc>::Convert(std::forward<TFunc>(func)))
75 { }
76
77 void operator()(void* ptr) {
78 Release(ptr);
79 }
80
81 TContextDeleter Release;
82};
83
84struct TBlob {
85 using TRawPtr = std::unique_ptr<void, TBlobDeleter>;
86 //using TRawPtr = std::shared_ptr<void>;
87 TRawPtr Data = {};
88 uint32_t Size = 0;
89 enum class PointerType {
90 Near, // Pointer to the object (for actor communication)
91 Far // Serialized representation (for network transmission)
92 } Type;
93};
94
/// True when `T` is both trivially copyable and standard-layout.
/// The serialization helpers in this header use it to choose between the
/// "POD" and "non-POD" code paths.
template<typename T>
constexpr bool is_pod_v = std::conjunction_v<
    std::is_trivially_copyable<T>,
    std::is_standard_layout<T>>;
97
/// Trait telling whether `T` is treated as carrying data.
/// Primary template: every type is assumed to have data members ...
template<typename T, typename = void>
struct has_data_members : std::bool_constant<true> {};

/// ... except types that are empty, trivial and standard-layout at the same
/// time, for which the trait is false.
template<typename T>
struct has_data_members<T, std::enable_if_t<
    std::conjunction_v<
        std::is_empty<T>,
        std::is_trivial<T>,
        std::is_standard_layout<T>>
>> : std::bool_constant<false> {};

/// Shorthand for `has_data_members<T>::value`.
template<typename T>
constexpr bool has_data_members_v = has_data_members<T>::value;
110
111template<typename T>
112constexpr size_t sizeof_data() {
113 if constexpr (has_data_members_v<T>) {
114 return sizeof(T);
115 } else {
116 return 0;
117 }
118}
119
120template<typename T, typename TAllocator, typename... Args>
121typename std::enable_if_t<is_pod_v<T>, TBlob>
122SerializePodNear(TAllocator& alloc, Args&&... args)
123{
124 constexpr uint32_t size = sizeof_data<T>();
125
126 TBlob::TRawPtr rawPtr = nullptr;
127
128 if constexpr (size > 0) {
129 auto* data = alloc.Acquire(size);
130 new (data) T(std::forward<Args>(args)...);
131
132 rawPtr = TBlob::TRawPtr(data, TBlobDeleter{[&alloc](void* ptr) {
133 alloc.Release(ptr);
134 }});
135 }
136
137 return TBlob{std::move(rawPtr), size, TBlob::PointerType::Near};
138}
139
140template<typename T, typename TAllocator, typename... Args>
141typename std::enable_if_t<!is_pod_v<T>, TBlob>
142SerializeNonPodNear(TAllocator& alloc, Args&&... args)
143{
144 T* obj = new T(std::forward<Args>(args)...);
145
146 auto rawPtr = TBlob::TRawPtr(obj, TBlobDeleter{[](void* ptr) {
147 delete reinterpret_cast<T*>(ptr);
148 }});
149
150 return TBlob{std::move(rawPtr), sizeof(T), TBlob::PointerType::Near};
151}
152
153template<typename T, typename TAllocator, typename... Args>
154TBlob SerializeNear(TAllocator& alloc, Args&&... args)
155{
156 if constexpr (is_pod_v<T>) {
157 return SerializePodNear<T>(alloc, std::forward<Args>(args)...);
158 } else {
159 return SerializeNonPodNear<T>(alloc, std::forward<Args>(args)...);
160 }
161}
162
/// Fallback used when no serializer exists for `T`.
/// Instantiating this template fails to compile with the message below; the
/// condition depends on `T`, so the assertion fires only on instantiation.
template<typename T>
void SerializeToStream([[maybe_unused]] const T& obj, [[maybe_unused]] std::ostringstream& oss)
{
    constexpr bool kSerializerProvided = (sizeof(T) == 0);
    static_assert(kSerializerProvided, "Serialization not implemented for this type");
}
168
169template<typename T>
170TBlob SerializeFar(TBlob blob)
171{
172 if constexpr (is_pod_v<T>) {
173 blob.Type = TBlob::PointerType::Far;
174 return blob; // For POD, far == near, just share the pointer
175 } else {
176 T* obj = reinterpret_cast<T*>(blob.Data.get());
177 std::ostringstream oss;
178 SerializeToStream(*obj, oss);
179 void* data = ::operator new(oss.str().size());
180 std::memcpy(data, oss.str().data(), oss.str().size());
181 auto rawPtr = TBlob::TRawPtr(data, TBlobDeleter{[](void* ptr) {
182 ::operator delete(ptr);
183 }});
184 return TBlob{std::move(rawPtr), static_cast<uint32_t>(oss.str().size()), TBlob::PointerType::Far};
185 }
186}
187
188template<typename T, typename TStream, typename... Args>
189void SerializeFarInplace(TStream& stream, TActorId sender, TActorId recipient, Args&&... args)
190{
191 if constexpr (is_pod_v<T>) {
192 constexpr auto size = sizeof_data<T>() + sizeof(THeader);
193 auto buf = stream.Acquire(size);
194 char* p = static_cast<char*>(buf.data());
195 new (p) THeader {sender, recipient, T::MessageId, sizeof_data<T>()};
196 p += sizeof(THeader);
197 new (p) T(std::forward<Args>(args)...);
198 stream.Commit(size);
199 } else {
200 // TODO: optimize:
201 // 1. estimate size of serialized T
202 // 2. allocate enough space in stream
203 // 3. serialize T to stream
204 // 4. write header with size
205 // 5. commit the whole buffer
206 std::ostringstream oss;
207 SerializeToStream(T(std::forward<Args>(args)...), oss);
208 auto size = oss.str().size() + sizeof(THeader);
209 auto buf = stream.Acquire(size);
210 char* p = static_cast<char*>(buf.data());
211 new (p) THeader {sender, recipient, T::MessageId, static_cast<uint32_t>(oss.str().size())};
212 p += sizeof(THeader);
213 std::memcpy(p, oss.str().data(), oss.str().size());
214 stream.Commit(size);
215 }
216}
217
218template<typename T>
219auto DeserializeNear(const TBlob& blob) -> std::conditional_t<sizeof_data<T>() == 0, T, T&> {
220 if constexpr (sizeof_data<T>() == 0) {
221 return T{};
222 } else {
223 return *reinterpret_cast<T*>(blob.Data.get());
224 }
225}
226
/// Fallback used when no deserializer exists for `T`.
/// Instantiating this template fails to compile with the message below; the
/// condition depends on `T`, so the assertion fires only on instantiation.
template<typename T>
void DeserializeFromStream([[maybe_unused]] T& obj, [[maybe_unused]] std::istringstream& iss) {
    constexpr bool kDeserializerProvided = (sizeof(T) == 0);
    static_assert(kDeserializerProvided, "Deserialization not implemented for this type");
}
231
232template<typename T>
233auto DeserializeFar(const TBlob& blob) -> std::conditional_t<is_pod_v<T> && (sizeof_data<T>()>0), T&, T> {
234 if constexpr (is_pod_v<T>) {
235 return DeserializeNear<T>(blob);
236 } else {
237 std::istringstream iss(std::string(reinterpret_cast<const char*>(blob.Data.get()), blob.Size));
238 T obj;
239 DeserializeFromStream(obj, iss);
240 return obj;
241 }
242}
243
244} // namespace NActors
245} // namespace NNet
Definition messages.hpp:69
Definition messages.hpp:84
Definition messages.hpp:14
Definition messages.hpp:31
Definition messages.hpp:99