COROIO: coroio/pipe/pipe.hpp Source File
COROIO
 
Loading...
Searching...
No Matches
pipe.hpp
1#pragma once
2
4
5namespace NNet {
6
7#ifndef _WIN32
8
30class TPipe {
31public:
41 template<typename TPoller>
42 TPipe(TPoller& poller, const std::string& exe, const std::vector<std::string>& args, bool stderrToStdout = false)
43 : PipeLow(exe, args, stderrToStdout)
44 {
45 PipeLow.Fork();
46
47 ReadHandle = std::make_unique<TPipeFileHandle<TPoller>>(PipeLow.ReadFd, poller);
48 WriteHandle = std::make_unique<TPipeFileHandle<TPoller>>(PipeLow.WriteFd, poller);
49 if (!stderrToStdout) {
50 ErrHandle = std::make_unique<TPipeFileHandle<TPoller>>(PipeLow.ErrFd, poller);
51 }
52 }
53
55 int Pid() const { return PipeLow.ChildPid; }
56
58 void CloseRead() {
59 ReadHandle.reset();
60 }
61
63 void CloseWrite() {
64 WriteHandle.reset();
65 }
66
68 void CloseErr() {
69 ErrHandle.reset();
70 }
71
80 int Wait();
81
88 TFuture<ssize_t> ReadSome(void* buffer, size_t size);
89
96 TFuture<ssize_t> ReadSomeErr(void* buffer, size_t size);
97
104 TFuture<ssize_t> WriteSome(const void* buffer, size_t size);
105
106private:
107 struct TPipeLow {
108 TPipeLow(const std::string& exe, const std::vector<std::string>& args, bool mergeErr);
109 ~TPipeLow();
110
111 void Fork();
112
113 std::string Exe;
114 std::vector<std::string> Args;
115
116 // Descriptors are owned by TPipeFileHandle, do not close them!
117 int ReadFd = -1;
118 int WriteFd = -1;
119 int ErrFd = -1;
120 int ChildPid = -1;
121 bool StderrToStdout = false;
122 };
123
124 struct TTypelessFileHandle {
125 virtual ~TTypelessFileHandle() = default;
126 virtual TFuture<ssize_t> ReadSome(void* buffer, size_t size) = 0;
127 virtual TFuture<ssize_t> WriteSome(const void* buffer, size_t size) = 0;
128 };
129
130 template<typename TPoller>
131 struct TPipeFileHandle : public TTypelessFileHandle {
132 TPipeFileHandle(int fd, TPoller& poller)
133 : Handle(fd, poller)
134 { }
135
136 TFuture<ssize_t> ReadSome(void* buffer, size_t size) override {
137 co_return co_await Handle.ReadSome(buffer, size);
138 }
139 TFuture<ssize_t> WriteSome(const void* buffer, size_t size) override {
140 co_return co_await Handle.WriteSome(buffer, size);
141 }
142
143 typename TPoller::TFileHandle Handle;
144 };
145
146 TPipeLow PipeLow;
147 std::unique_ptr<TTypelessFileHandle> ReadHandle;
148 std::unique_ptr<TTypelessFileHandle> WriteHandle;
149 std::unique_ptr<TTypelessFileHandle> ErrHandle;
150};
151
152#endif // _WIN32
153
154} // namespace NNet {
Spawns a child process and exposes its stdin/stdout/stderr as async handles.
Definition pipe.hpp:30
int Wait()
Waits for the child process to exit and returns its exit status.
Definition pipe.cpp:187
void CloseRead()
Closes the read end of the child's stdout pipe.
Definition pipe.hpp:58
TPipe(TPoller &poller, const std::string &exe, const std::vector< std::string > &args, bool stderrToStdout=false)
Spawns exe with args and wires up async I/O handles.
Definition pipe.hpp:42
void CloseWrite()
Closes the write end of the child's stdin pipe, sending EOF to the child.
Definition pipe.hpp:63
TFuture< ssize_t > ReadSome(void *buffer, size_t size)
Reads up to size bytes from the child's stdout.
Definition pipe.cpp:171
TFuture< ssize_t > WriteSome(const void *buffer, size_t size)
Writes up to size bytes to the child's stdin.
Definition pipe.cpp:183
TFuture< ssize_t > ReadSomeErr(void *buffer, size_t size)
Reads up to size bytes from the child's stderr.
Definition pipe.cpp:175
void CloseErr()
Closes the read end of the child's stderr pipe.
Definition pipe.hpp:68
int Pid() const
Returns the child's process ID.
Definition pipe.hpp:55
Implementation of a promise/future system for coroutines.
Owned coroutine handle that carries a result of type T.
Definition corochain.hpp:185