COROIO: coroio/actors/messages.hpp Source File
COROIO
 
Loading...
Searching...
No Matches
messages.hpp
1#pragma once
2
3#include <memory>
4#include <sstream>
5#include <cstring>
6#include <type_traits>
7#include <functional>
8
9#include "actorid.hpp"
10
11namespace NNet {
12namespace NActors {
13
15 TContextDeleter() = default;
16
17 TContextDeleter(void (*func)(void*, void*), void* ctx)
18 : Release(func)
19 , Context(ctx)
20 { }
21
22 void operator()(void* ptr) {
23 Release(Context, ptr);
24 }
25
26 void (*Release)(void*, void*) = nullptr;
27 void* Context = nullptr;
28};
29
30template<typename TLambda>
32 static_assert(
33 std::is_empty_v<TLambda> || sizeof(TLambda) <= sizeof(void*),
34 "Lambda must be stateless or capture only one pointer-sized value");
35
36 // Stateless lambda to context deleter conversion
37 template<typename F>
38 static std::enable_if_t<std::is_empty_v<std::decay_t<F>>, TContextDeleter>
39 Convert(F&& lambda) {
40 auto func_ptr = +[](void* /*unused*/, void* ptr) {
41 std::decay_t<F>{}(ptr);
42 };
43 return TContextDeleter{func_ptr, nullptr};
44 }
45
46 template<typename F>
47 static std::enable_if_t<!std::is_empty_v<std::decay_t<F>>, TContextDeleter>
48 Convert(F&& lambda) {
49 void* captured_ptr = ExtractCapturedPointer(lambda);
50
51 auto func_ptr = +[](void* ctx, void* ptr) {
52 using LambdaType = std::decay_t<F>;
53 alignas(LambdaType) char buffer[sizeof(LambdaType)];
54 *reinterpret_cast<void**>(buffer) = ctx;
55 auto* lambda_ptr = reinterpret_cast<LambdaType*>(buffer);
56 (*lambda_ptr)(ptr);
57 };
58
59 return TContextDeleter{func_ptr, captured_ptr};
60 }
61
62private:
63 template<typename F>
64 static void* ExtractCapturedPointer(const F& lambda) {
65 return *reinterpret_cast<void* const*>(&lambda);
66 }
67};
68
70 TBlobDeleter() = default;
71
72 template<typename TFunc>
73 TBlobDeleter(TFunc&& func)
74 : Release(TLambdaToContextDeleter<TFunc>::Convert(std::forward<TFunc>(func)))
75 { }
76
77 void operator()(void* ptr) {
78 Release(ptr);
79 }
80
81 TContextDeleter Release;
82};
83
99struct TBlob {
100 using TRawPtr = std::unique_ptr<void, TBlobDeleter>;
101 TRawPtr Data = {};
102 uint32_t Size = 0;
103 enum class PointerType {
104 Near,
105 Far
106 } Type;
107};
108
116template<typename T>
117constexpr bool is_pod_v = std::is_trivially_copyable_v<T> && std::is_standard_layout_v<T>;
118
119template<typename T, typename = void>
120struct has_data_members : std::true_type {};
121
122template<typename T>
123struct has_data_members<T, std::enable_if_t<
124 std::is_empty_v<T> &&
125 std::is_trivial_v<T> &&
126 std::is_standard_layout_v<T>
127>> : std::false_type {};
128
129template<typename T>
130constexpr bool has_data_members_v = has_data_members<T>::value;
131
132template<typename T>
133constexpr size_t sizeof_data() {
134 if constexpr (has_data_members_v<T>) {
135 return sizeof(T);
136 } else {
137 return 0;
138 }
139}
140
141template<typename T, typename TAllocator, typename... Args>
142typename std::enable_if_t<is_pod_v<T>, TBlob>
143SerializePodNear(TAllocator& alloc, Args&&... args)
144{
145 constexpr uint32_t size = sizeof_data<T>();
146
147 TBlob::TRawPtr rawPtr = nullptr;
148
149 if constexpr (size > 0) {
150 auto* data = alloc.Acquire(size);
151 new (data) T(std::forward<Args>(args)...);
152
153 rawPtr = TBlob::TRawPtr(data, TBlobDeleter{[&alloc](void* ptr) {
154 alloc.Release(ptr);
155 }});
156 }
157
158 return TBlob{std::move(rawPtr), size, TBlob::PointerType::Near};
159}
160
161template<typename T, typename TAllocator, typename... Args>
162typename std::enable_if_t<!is_pod_v<T>, TBlob>
163SerializeNonPodNear(TAllocator& alloc, Args&&... args)
164{
165 T* obj = new T(std::forward<Args>(args)...);
166
167 auto rawPtr = TBlob::TRawPtr(obj, TBlobDeleter{[](void* ptr) {
168 delete reinterpret_cast<T*>(ptr);
169 }});
170
171 return TBlob{std::move(rawPtr), sizeof(T), TBlob::PointerType::Near};
172}
173
185template<typename T, typename TAllocator, typename... Args>
186TBlob SerializeNear(TAllocator& alloc, Args&&... args)
187{
188 if constexpr (is_pod_v<T>) {
189 return SerializePodNear<T>(alloc, std::forward<Args>(args)...);
190 } else {
191 return SerializeNonPodNear<T>(alloc, std::forward<Args>(args)...);
192 }
193}
194
211template<typename T>
212void SerializeToStream(const T& obj, std::ostringstream& oss)
213{
214 static_assert(sizeof(T) == 0, "Serialization not implemented for this type");
215}
216
227template<typename T>
228TBlob SerializeFar(TBlob blob)
229{
230 if constexpr (is_pod_v<T>) {
231 blob.Type = TBlob::PointerType::Far;
232 return blob; // For POD, far == near, just share the pointer
233 } else {
234 T* obj = reinterpret_cast<T*>(blob.Data.get());
235 std::ostringstream oss;
236 SerializeToStream(*obj, oss);
237 void* data = ::operator new(oss.str().size());
238 std::memcpy(data, oss.str().data(), oss.str().size());
239 auto rawPtr = TBlob::TRawPtr(data, TBlobDeleter{[](void* ptr) {
240 ::operator delete(ptr);
241 }});
242 return TBlob{std::move(rawPtr), static_cast<uint32_t>(oss.str().size()), TBlob::PointerType::Far};
243 }
244}
245
262template<typename T, typename TStream, typename... Args>
263void SerializeFarInplace(TStream& stream, TActorId sender, TActorId recipient, Args&&... args)
264{
265 if constexpr (is_pod_v<T>) {
266 constexpr auto size = sizeof_data<T>() + sizeof(THeader);
267 auto buf = stream.Acquire(size);
268 char* p = static_cast<char*>(buf.data());
269 new (p) THeader {sender, recipient, T::MessageId, sizeof_data<T>()};
270 p += sizeof(THeader);
271 new (p) T(std::forward<Args>(args)...);
272 stream.Commit(size);
273 } else {
274 // TODO: optimize:
275 // 1. estimate size of serialized T
276 // 2. allocate enough space in stream
277 // 3. serialize T to stream
278 // 4. write header with size
279 // 5. commit the whole buffer
280 std::ostringstream oss;
281 SerializeToStream(T(std::forward<Args>(args)...), oss);
282 auto size = oss.str().size() + sizeof(THeader);
283 auto buf = stream.Acquire(size);
284 char* p = static_cast<char*>(buf.data());
285 new (p) THeader {sender, recipient, T::MessageId, static_cast<uint32_t>(oss.str().size())};
286 p += sizeof(THeader);
287 std::memcpy(p, oss.str().data(), oss.str().size());
288 stream.Commit(size);
289 }
290}
291
302template<typename T>
303auto DeserializeNear(const TBlob& blob) -> std::conditional_t<sizeof_data<T>() == 0, T, T&> {
304 if constexpr (sizeof_data<T>() == 0) {
305 return T{};
306 } else {
307 return *reinterpret_cast<T*>(blob.Data.get());
308 }
309}
310
325template<typename T>
326void DeserializeFromStream(T& obj, std::istringstream& iss) {
327 static_assert(sizeof(T) == 0, "Deserialization not implemented for this type");
328}
329
340template<typename T>
341auto DeserializeFar(const TBlob& blob) -> std::conditional_t<is_pod_v<T> && (sizeof_data<T>()>0), T&, T> {
342 if constexpr (is_pod_v<T>) {
343 return DeserializeNear<T>(blob);
344 } else {
345 std::istringstream iss(std::string(reinterpret_cast<const char*>(blob.Data.get()), blob.Size));
346 T obj;
347 DeserializeFromStream(obj, iss);
348 return obj;
349 }
350}
351
352} // namespace NActors
353} // namespace NNet
Definition messages.hpp:69
Opaque message payload with Near/Far duality.
Definition messages.hpp:99
uint32_t Size
Payload size in bytes (0 for empty/sentinel blobs)
Definition messages.hpp:102
TRawPtr Data
Owned payload (Near: object ptr; Far: byte buffer)
Definition messages.hpp:101
PointerType
Definition messages.hpp:103
@ Near
Live object pointer — valid only within the same process.
@ Far
Serialized byte buffer — safe to copy across the network.
Definition messages.hpp:14
Definition messages.hpp:31
Definition messages.hpp:120