10#include <sys/socket.h>
12#include <netinet/in.h>
/// Forward declaration of the TSocketBase class template so the name can be
/// used before the full definition, which appears later in this header.
25template<
typename T>
class TSocketBase;
/// Declaration only; the definition is not part of this header excerpt.
/// NOTE(review): presumably creates a socket descriptor for the given
/// `domain` and `type` and returns it as an int — confirm against the
/// out-of-line definition.
61 int Create(
int domain,
int type);
98template<
typename TSockOps>
140 struct TAwaitableRead:
public TAwaitable<TAwaitableRead> {
142 this->ret = TSockOps::read(this->fd, this->b, this->s);
145 void await_suspend(std::coroutine_handle<> h) {
146 this->poller->AddRead(this->fd, h);
149 return TAwaitableRead{Poller_,Fd_,buf,size};
163 struct TAwaitableRead:
public TAwaitable<TAwaitableRead> {
165 return (this->ready =
false);
169 this->ret = TSockOps::read(this->fd, this->b, this->s);
172 void await_suspend(std::coroutine_handle<> h) {
173 this->poller->AddRead(this->fd, h);
176 return TAwaitableRead{Poller_,Fd_,buf,size};
190 struct TAwaitableWrite:
public TAwaitable<TAwaitableWrite> {
192 this->ret = TSockOps::write(this->fd, this->b, this->s);
195 void await_suspend(std::coroutine_handle<> h) {
196 this->poller->AddWrite(this->fd, h);
199 return TAwaitableWrite{Poller_,Fd_,
const_cast<void*
>(buf),size};
212 struct TAwaitableWrite:
public TAwaitable<TAwaitableWrite> {
214 return (this->ready =
false);
218 this->ret = TSockOps::write(this->fd, this->b, this->s);
221 void await_suspend(std::coroutine_handle<> h) {
222 this->poller->AddWrite(this->fd, h);
225 return TAwaitableWrite{Poller_,Fd_,
const_cast<void*
>(buf),size};
236 struct TAwaitableClose:
public TAwaitable<TAwaitableClose> {
241 void await_suspend(std::coroutine_handle<> h) {
242 this->poller->AddRemoteHup(this->fd, h);
245 return TAwaitableClose{Poller_,Fd_};
256 TSockOps::close(Fd_);
257 Poller_->RemoveEvent(Fd_);
267 return (ready = (ret >= 0));
280 if (ret < 0 && WSAGetLastError() != WSAEWOULDBLOCK ) {
281 throw std::system_error(WSAGetLastError(), std::generic_category());
284 if (ret < 0 && !(errno==EINTR||errno==EAGAIN||errno==EINPROGRESS)) {
285 throw std::system_error(errno, std::generic_category());
// Buffer pointer and byte count that the read/write awaitables pass as the
// buffer and length arguments to TSockOps::read / TSockOps::write.
// Both default to "no buffer" (nullptr, 0).
292 void* b =
nullptr;
size_t s = 0;
300 static auto read(
int fd,
void* buf,
size_t count) {
301 return ::read(fd, buf, count);
304 static auto write(
int fd,
const void* buf,
size_t count) {
305 return ::write(fd, buf, count);
308 static auto close(
int fd) {
340 static auto read(
int fd,
void* buf,
size_t count) {
341 return ::recv(fd,
static_cast<char*
>(buf), count, 0);
344 static auto write(
int fd,
const void* buf,
size_t count) {
345 return ::send(fd,
static_cast<const char*
>(buf), count, 0);
348 static auto close(
int fd) {
406 if (RemoteAddr_.has_value()) {
407 throw std::runtime_error(
"Already connected");
412 int ret = connect(fd, addr.first, addr.second);
414 if (ret < 0 && WSAGetLastError() != WSAEWOULDBLOCK) {
415 throw std::system_error(WSAGetLastError(), std::generic_category(),
"connect");
418 if (ret < 0 && !(errno == EINTR||errno==EAGAIN||errno==EINPROGRESS)) {
419 throw std::system_error(errno, std::generic_category(),
"connect");
425 void await_suspend(std::coroutine_handle<> h) {
427 if (deadline != TTime::max()) {
428 timerId = poller->
AddTimer(deadline, h);
432 void await_resume() {
433 if (deadline != TTime::max() && poller->
RemoveTimer(timerId, deadline)) {
434 throw std::system_error(std::make_error_code(std::errc::timed_out));
// Raw destination address and its length; passed as the second and third
// arguments to connect().
440 std::pair<const sockaddr*, int> addr;
// Id returned by poller->AddTimer() when a deadline is set; handed back to
// poller->RemoveTimer() in await_resume(). Stays 0 when no timer is armed.
442 unsigned timerId = 0;
444 return TAwaitable{Poller_, Fd_, RemoteAddr_->RawAddr(), deadline};
// Always reports "not ready", so co_await on this awaitable always suspends
// the coroutine and calls await_suspend().
458 bool await_ready()
const {
return false; }
459 void await_suspend(std::coroutine_handle<> h) {
463 char clientaddr[
sizeof(sockaddr_in6)];
464 socklen_t len =
static_cast<socklen_t
>(
sizeof(sockaddr_in6));
466 int clientfd = accept(fd,
reinterpret_cast<sockaddr*
>(&clientaddr[0]), &len);
468 throw std::system_error(errno, std::generic_category(),
"accept");
471 return TSocket{
TAddress{
reinterpret_cast<sockaddr*
>(&clientaddr[0]), len}, clientfd, *poller};
/// Puts the socket into the listening state.
/// @param backlog Backlog value for the listen call; defaults to 128.
484 void Listen(
int backlog = 128);
/// Returns the remote address of the connected peer.
/// @return Const reference to an optional TAddress.
/// NOTE(review): presumably empty until a peer address has been set — confirm.
490 const std::optional<TAddress>&
RemoteAddr()
const;
/// Returns the local address to which the socket is bound.
/// @return Const reference to an optional TAddress.
/// NOTE(review): presumably empty until the socket has been bound — confirm.
496 const std::optional<TAddress>&
LocalAddr()
const;
// NOTE(review): presumably the bound local address returned by LocalAddr() — confirm.
501 std::optional<TAddress> LocalAddr_;
// Peer address. Connect() throws "Already connected" when this already holds
// a value, and uses RemoteAddr_->RawAddr() as the connect target.
502 std::optional<TAddress> RemoteAddr_;
536 :
TSocket(poller, domain, type)
539 Poller_->Register(Fd_);
552 Poller_->Register(Fd_);
559 Poller_->Register(Fd_);
// Always reports "not ready", so the coroutine always suspends and
// await_suspend() hands the accept request to poller->Accept().
574 bool await_ready()
const {
return false; }
575 void await_suspend(std::coroutine_handle<> h) {
576 poller->Accept(fd,
reinterpret_cast<sockaddr*
>(&addr[0]), &len, h);
580 int clientfd = poller->Result();
582 throw std::system_error(-clientfd, std::generic_category(),
"accept");
// Zero-initialised storage for the accepted peer's address; a pointer to it is
// handed to poller->Accept() in await_suspend().
// NOTE(review): the 2*(sizeof(sockaddr_in6)+16) size looks chosen for an
// AcceptEx-style API that needs room for two addresses plus 16 bytes of
// padding each — confirm against the poller implementation.
591 char addr[2*(
sizeof(sockaddr_in6)+16)] = {0};
// In/out length passed alongside `addr`; starts as the full buffer size.
592 socklen_t len =
static_cast<socklen_t
>(
sizeof(addr));
611 if (RemoteAddr_.has_value()) {
612 throw std::runtime_error(
"Already connected");
// Always reports "not ready", so the coroutine always suspends and
// await_suspend() submits the connect request via poller->Connect().
616 bool await_ready()
const {
return false; }
618 void await_suspend(std::coroutine_handle<> h) {
619 poller->Connect(fd, addr.first, addr.second, h);
620 if (deadline != TTime::max()) {
621 timerId = poller->
AddTimer(deadline, h);
625 void await_resume() {
626 if (deadline != TTime::max() && poller->
RemoveTimer(timerId, deadline)) {
628 throw std::system_error(std::make_error_code(std::errc::timed_out));
630 int ret = poller->Result();
632 throw std::system_error(-ret, std::generic_category(),
"connect");
// Raw destination address and its length; forwarded to poller->Connect()
// as addr.first / addr.second.
638 std::pair<const sockaddr*, int> addr;
// Id returned by poller->AddTimer() when a deadline is set; handed back to
// poller->RemoveTimer() in await_resume(). Stays 0 when no timer is armed.
640 unsigned timerId = 0;
// Always reports "not ready", so the coroutine always suspends and
// await_suspend() submits the receive request via poller->Recv().
657 bool await_ready()
const {
return false; }
658 void await_suspend(std::coroutine_handle<> h) {
659 poller->Recv(fd, buf, size, h);
662 auto await_resume() {
663 auto ret = poller->Result();
667 if (err == WSAEWOULDBLOCK || err == WSAEINTR || err == WSAEINPROGRESS) {
672 if (err == EINTR || err == EAGAIN || err == EINPROGRESS) {
676 throw std::system_error(-ret, std::generic_category());
// Always reports "not ready", so the coroutine always suspends and
// await_suspend() submits the send request via poller->Send().
703 bool await_ready()
const {
return false; }
704 void await_suspend(std::coroutine_handle<> h) {
705 poller->Send(fd, buf, size, h);
708 auto await_resume() {
709 auto ret = poller->Result();
713 if (err == WSAEWOULDBLOCK || err == WSAEINTR || err == WSAEINPROGRESS) {
718 if (err == EINTR || err == EAGAIN || err == EINPROGRESS) {
722 throw std::system_error(-ret, std::generic_category());
// Always reports "not ready", so the coroutine always suspends and
// await_suspend() submits the read request via poller->Read().
796 bool await_ready()
const {
return false; }
797 void await_suspend(std::coroutine_handle<> h) {
798 poller->Read(fd, buf, size, h);
801 auto await_resume() {
802 auto ret = poller->Result();
806 if (err == WSAEWOULDBLOCK || err == WSAEINTR || err == WSAEINPROGRESS) {
811 if (err == EINTR || err == EAGAIN || err == EINPROGRESS) {
815 throw std::system_error(-ret, std::generic_category());
827 return TAwaitable{Poller_, Fd_, buf, size};
// Always reports "not ready", so the coroutine always suspends and
// await_suspend() submits the write request via poller->Write().
842 bool await_ready()
const {
return false; }
843 void await_suspend(std::coroutine_handle<> h) {
844 poller->Write(fd, buf, size, h);
847 auto await_resume() {
848 auto ret = poller->Result();
852 if (err == WSAEWOULDBLOCK || err == WSAEINTR || err == WSAEINPROGRESS) {
857 if (err == EINTR || err == EAGAIN || err == EINPROGRESS) {
861 throw std::system_error(-ret, std::generic_category());
873 return TAwaitable{Poller_, Fd_, buf, size};
A class representing an IPv4 or IPv6 address (with port).
Definition address.hpp:38
Asynchronous file handle that owns its file descriptor.
Definition socket.hpp:320
TFileHandle(int fd, TPollerBase &poller)
Constructs a TFileHandle from an existing file descriptor.
Definition socket.hpp:328
Definition socket.hpp:298
Base class for pollers managing asynchronous I/O events and timers.
Definition poller.hpp:52
unsigned AddTimer(TTime deadline, THandle h)
Schedules a timer.
Definition poller.hpp:70
void AddWrite(int fd, THandle h)
Registers a write event on a file descriptor.
Definition poller.hpp:109
bool RemoveTimer(unsigned timerId, TTime deadline)
Removes or cancels a timer.
Definition poller.hpp:85
void AddRead(int fd, THandle h)
Registers a read event on a file descriptor.
Definition poller.hpp:99
Asynchronous file handle driven by the poller's implementation.
Definition socket.hpp:767
auto WriteSomeYield(const void *buf, size_t size)
The WriteSomeYield and ReadSomeYield variants behave similarly to WriteSome/ReadSome.
Definition socket.hpp:877
auto ReadSomeYield(void *buf, size_t size)
The WriteSomeYield and ReadSomeYield variants behave similarly to WriteSome/ReadSome.
Definition socket.hpp:882
auto WriteSome(const void *buf, size_t size)
Asynchronously writes data from the provided buffer to the file.
Definition socket.hpp:840
TPollerDrivenFileHandle(int fd, T &poller)
Constructs a TPollerDrivenFileHandle from an existing file descriptor.
Definition socket.hpp:779
auto ReadSome(void *buf, size_t size)
Asynchronously reads data from the file into the provided buffer.
Definition socket.hpp:794
Socket type driven by the poller's implementation.
Definition socket.hpp:522
auto ReadSomeYield(void *buf, size_t size)
The WriteSomeYield and ReadSomeYield variants behave similarly to WriteSome/ReadSome.
Definition socket.hpp:743
auto ReadSome(void *buf, size_t size)
Asynchronously reads data from the socket.
Definition socket.hpp:655
auto WriteSome(const void *buf, size_t size)
Asynchronously writes data to the socket.
Definition socket.hpp:701
auto Accept()
Asynchronously accepts an incoming connection.
Definition socket.hpp:572
TPollerDrivenSocket(T &poller, int domain, int type=SOCK_STREAM)
Constructs a TPollerDrivenSocket from a poller, domain, and socket type.
Definition socket.hpp:535
auto Connect(const TAddress &addr, TTime deadline=TTime::max())
Asynchronously connects to the specified address with an optional deadline.
Definition socket.hpp:610
auto WriteSomeYield(const void *buf, size_t size)
The WriteSomeYield and ReadSomeYield variants behave similarly to WriteSome/ReadSome.
Definition socket.hpp:738
TPollerDrivenSocket(const TAddress &addr, int fd, T &poller)
Constructs a TPollerDrivenSocket from an existing file descriptor.
Definition socket.hpp:548
Definition socket.hpp:338
TPollerBase * Poller()
Returns the poller associated with this socket.
Definition socket.hpp:47
Template base class implementing asynchronous socket I/O operations.
Definition socket.hpp:99
TSocketBase(TPollerBase &poller, int domain, int type)
Constructs a TSocketBase with a new socket descriptor.
Definition socket.hpp:108
auto Monitor()
Monitors the socket for remote hang-up (closure).
Definition socket.hpp:235
auto WriteSomeYield(const void *buf, size_t size)
Forces a write operation on the next event loop iteration.
Definition socket.hpp:211
auto WriteSome(const void *buf, size_t size)
Asynchronously writes data from the provided buffer to the socket.
Definition socket.hpp:189
auto ReadSome(void *buf, size_t size)
Asynchronously reads data from the socket into the provided buffer.
Definition socket.hpp:139
void Close()
Closes the socket.
Definition socket.hpp:253
TSocketBase(int fd, TPollerBase &poller)
Constructs a TSocketBase from an existing socket descriptor.
Definition socket.hpp:116
auto ReadSomeYield(void *buf, size_t size)
Forces a read operation on the next event loop iteration.
Definition socket.hpp:162
High-level asynchronous socket for network communication.
Definition socket.hpp:367
const std::optional< TAddress > & RemoteAddr() const
Returns the remote address of the connected peer.
Definition socket.cpp:109
auto Connect(const TAddress &addr, TTime deadline=TTime::max())
Asynchronously connects to the specified address.
Definition socket.hpp:405
int Fd() const
Returns the underlying socket descriptor.
Definition socket.cpp:113
void Bind(const TAddress &addr)
Binds the socket to the specified local address.
Definition socket.cpp:83
auto Accept()
Asynchronously accepts an incoming connection.
Definition socket.hpp:456
const std::optional< TAddress > & LocalAddr() const
Returns the local address to which the socket is bound.
Definition socket.cpp:105
void Listen(int backlog=128)
Puts the socket in a listening state with an optional backlog (default is 128).
Definition socket.cpp:99
Definition socket.hpp:264