COROIO: coroio/pipe/pipe.hpp Source File
COROIO
 
Loading...
Searching...
No Matches
pipe.hpp
1#pragma once
2
4
5namespace NNet {
6
7#ifndef _WIN32
8
9class TPipe {
10public:
11 template<typename TPoller>
12 TPipe(TPoller& poller, const std::string& exe, const std::vector<std::string>& args, bool stderrToStdout = false)
13 : PipeLow(exe, args, stderrToStdout)
14 {
15 PipeLow.Fork();
16
17 ReadHandle = std::make_unique<TPipeFileHandle<TPoller>>(PipeLow.ReadFd, poller);
18 WriteHandle = std::make_unique<TPipeFileHandle<TPoller>>(PipeLow.WriteFd, poller);
19 if (!stderrToStdout) {
20 ErrHandle = std::make_unique<TPipeFileHandle<TPoller>>(PipeLow.ErrFd, poller);
21 }
22 }
23
24 int Pid() const { return PipeLow.ChildPid; }
25
26 void CloseRead() {
27 ReadHandle.reset();
28 }
29
30 void CloseWrite() {
31 WriteHandle.reset();
32 }
33
34 void CloseErr() {
35 ErrHandle.reset();
36 }
37
38 int Wait();
39
40 TFuture<ssize_t> ReadSome(void* buffer, size_t size);
41 TFuture<ssize_t> ReadSomeErr(void* buffer, size_t size);
42 TFuture<ssize_t> WriteSome(const void* buffer, size_t size);
43
44private:
45 struct TPipeLow {
46 TPipeLow(const std::string& exe, const std::vector<std::string>& args, bool mergeErr);
47 ~TPipeLow();
48
49 void Fork();
50
51 std::string Exe;
52 std::vector<std::string> Args;
53
54 // Descriptors are owned by TPipeFileHandle, do not close them!
55 int ReadFd = -1;
56 int WriteFd = -1;
57 int ErrFd = -1;
58 int ChildPid = -1;
59 bool StderrToStdout = false;
60 };
61
62 struct TTypelessFileHandle {
63 virtual ~TTypelessFileHandle() = default;
64 virtual TFuture<ssize_t> ReadSome(void* buffer, size_t size) = 0;
65 virtual TFuture<ssize_t> WriteSome(const void* buffer, size_t size) = 0;
66 };
67
68 template<typename TPoller>
69 struct TPipeFileHandle : public TTypelessFileHandle {
70 TPipeFileHandle(int fd, TPoller& poller)
71 : Handle(fd, poller)
72 { }
73
74 TFuture<ssize_t> ReadSome(void* buffer, size_t size) override {
75 co_return co_await Handle.ReadSome(buffer, size);
76 }
77 TFuture<ssize_t> WriteSome(const void* buffer, size_t size) override {
78 co_return co_await Handle.WriteSome(buffer, size);
79 }
80
81 typename TPoller::TFileHandle Handle;
82 };
83
84 TPipeLow PipeLow;
85 std::unique_ptr<TTypelessFileHandle> ReadHandle;
86 std::unique_ptr<TTypelessFileHandle> WriteHandle;
87 std::unique_ptr<TTypelessFileHandle> ErrHandle;
88};
89
90#endif // _WIN32
91
92} // namespace NNet {
Definition pipe.hpp:9
Implementation of a promise/future system for coroutines.
Future type for coroutines returning a value of type T.
Definition corochain.hpp:182