10#include <sys/socket.h>
12#include <netinet/in.h>
41class TSocketBase<void> {
98template<
typename TSockOps>
139 struct TAwaitableRead:
public TAwaitable<TAwaitableRead> {
141 this->ret = TSockOps::read(this->fd, this->b, this->s);
144 void await_suspend(std::coroutine_handle<> h) {
145 this->poller->AddRead(this->fd, h);
148 return TAwaitableRead{Poller_,Fd_,buf,size};
161 struct TAwaitableRead:
public TAwaitable<TAwaitableRead> {
163 return (this->ready =
false);
167 this->ret = TSockOps::read(this->fd, this->b, this->s);
170 void await_suspend(std::coroutine_handle<> h) {
171 this->poller->AddRead(this->fd, h);
174 return TAwaitableRead{Poller_,Fd_,buf,size};
187 struct TAwaitableWrite:
public TAwaitable<TAwaitableWrite> {
189 this->ret = TSockOps::write(this->fd, this->b, this->s);
192 void await_suspend(std::coroutine_handle<> h) {
193 this->poller->AddWrite(this->fd, h);
196 return TAwaitableWrite{Poller_,Fd_,
const_cast<void*
>(buf),size};
209 struct TAwaitableWrite:
public TAwaitable<TAwaitableWrite> {
211 return (this->ready =
false);
215 this->ret = TSockOps::write(this->fd, this->b, this->s);
218 void await_suspend(std::coroutine_handle<> h) {
219 this->poller->AddWrite(this->fd, h);
222 return TAwaitableWrite{Poller_,Fd_,
const_cast<void*
>(buf),size};
233 struct TAwaitableClose:
public TAwaitable<TAwaitableClose> {
238 void await_suspend(std::coroutine_handle<> h) {
239 this->poller->AddRemoteHup(this->fd, h);
242 return TAwaitableClose{Poller_,Fd_};
253 TSockOps::close(Fd_);
254 Poller_->RemoveEvent(Fd_);
264 return (ready = (ret >= 0));
277 if (ret < 0 && WSAGetLastError() != WSAEWOULDBLOCK ) {
278 throw std::system_error(WSAGetLastError(), std::generic_category());
281 if (ret < 0 && !(errno==EINTR||errno==EAGAIN||errno==EINPROGRESS)) {
282 throw std::system_error(errno, std::generic_category());
289 void* b =
nullptr;
size_t s = 0;
297 static auto read(
int fd,
void* buf,
size_t count) {
298 return ::read(fd, buf, count);
301 static auto write(
int fd,
const void* buf,
size_t count) {
302 return ::write(fd, buf, count);
305 static auto close(
int fd) {
337 static auto read(
int fd,
void* buf,
size_t count) {
338 return ::recv(fd,
static_cast<char*
>(buf), count, 0);
341 static auto write(
int fd,
const void* buf,
size_t count) {
342 return ::send(fd,
static_cast<const char*
>(buf), count, 0);
345 static auto close(
int fd) {
387 TSocket(TSocket&& other);
388 TSocket& operator=(TSocket&& other);
403 if (RemoteAddr_.has_value()) {
404 throw std::runtime_error(
"Already connected");
409 int ret = connect(fd, addr.first, addr.second);
411 if (ret < 0 && WSAGetLastError() != WSAEWOULDBLOCK) {
412 throw std::system_error(WSAGetLastError(), std::generic_category(),
"connect");
415 if (ret < 0 && !(errno == EINTR||errno==EAGAIN||errno==EINPROGRESS)) {
416 throw std::system_error(errno, std::generic_category(),
"connect");
422 void await_suspend(std::coroutine_handle<> h) {
423 poller->AddWrite(fd, h);
424 if (deadline != TTime::max()) {
425 timerId = poller->AddTimer(deadline, h);
429 void await_resume() {
430 if (deadline != TTime::max() && poller->RemoveTimer(timerId, deadline)) {
431 throw std::system_error(std::make_error_code(std::errc::timed_out));
437 std::pair<const sockaddr*, int> addr;
439 unsigned timerId = 0;
441 return TAwaitable{Poller_, Fd_, RemoteAddr_->RawAddr(), deadline};
// Readiness probe used by co_await: unconditionally returns false, so the
// awaiting coroutine is always suspended and await_suspend (which registers
// the handle with poller->AddRead) runs before the accept is attempted.
455 bool await_ready()
const {
return false; }
456 void await_suspend(std::coroutine_handle<> h) {
457 poller->AddRead(fd, h);
459 TSocket await_resume() {
460 char clientaddr[
sizeof(sockaddr_in6)];
461 socklen_t len =
sizeof(sockaddr_in6);
463 int clientfd = accept(fd,
reinterpret_cast<sockaddr*
>(&clientaddr[0]), &len);
465 throw std::system_error(errno, std::generic_category(),
"accept");
468 return TSocket{
TAddress{
reinterpret_cast<sockaddr*
>(&clientaddr[0]), len}, clientfd, *poller};
498 std::optional<TAddress> LocalAddr_;
499 std::optional<TAddress> RemoteAddr_;
533 : TSocket(poller, domain, type)
536 Poller_->Register(Fd_);
546 : TSocket(addr, fd, poller)
549 Poller_->Register(Fd_);
556 Poller_->Register(Fd_);
// Readiness probe used by co_await: unconditionally returns false, so the
// awaiting coroutine is always suspended and await_suspend hands the accept
// request to the poller (poller->Accept).
571 bool await_ready()
const {
return false; }
572 void await_suspend(std::coroutine_handle<> h) {
573 poller->Accept(fd,
reinterpret_cast<sockaddr*
>(&addr[0]), &len, h);
577 int clientfd = poller->Result();
579 throw std::system_error(-clientfd, std::generic_category(),
"accept");
588 char addr[2*(
sizeof(sockaddr_in6)+16)] = {0};
589 socklen_t len =
sizeof(addr);
608 if (RemoteAddr_.has_value()) {
609 throw std::runtime_error(
"Already connected");
// Readiness probe used by co_await: unconditionally returns false, so the
// awaiting coroutine is always suspended and await_suspend hands the connect
// request to the poller (poller->Connect), arming a timer when a deadline is set.
613 bool await_ready()
const {
return false; }
615 void await_suspend(std::coroutine_handle<> h) {
616 poller->Connect(fd, addr.first, addr.second, h);
617 if (deadline != TTime::max()) {
618 timerId = poller->AddTimer(deadline, h);
622 void await_resume() {
623 if (deadline != TTime::max() && poller->RemoveTimer(timerId, deadline)) {
625 throw std::system_error(std::make_error_code(std::errc::timed_out));
627 int ret = poller->Result();
629 throw std::system_error(-ret, std::generic_category(),
"connect");
635 std::pair<const sockaddr*, int> addr;
637 unsigned timerId = 0;
// Readiness probe used by co_await: unconditionally returns false, so the
// awaiting coroutine is always suspended and await_suspend hands the receive
// request to the poller (poller->Recv).
654 bool await_ready()
const {
return false; }
655 void await_suspend(std::coroutine_handle<> h) {
656 poller->Recv(fd, buf, size, h);
659 ssize_t await_resume() {
660 int ret = poller->Result();
662 throw std::system_error(-ret, std::generic_category());
// Readiness probe used by co_await: unconditionally returns false, so the
// awaiting coroutine is always suspended and await_suspend hands the send
// request to the poller (poller->Send).
689 bool await_ready()
const {
return false; }
690 void await_suspend(std::coroutine_handle<> h) {
691 poller->Send(fd, buf, size, h);
694 ssize_t await_resume() {
695 int ret = poller->Result();
697 throw std::system_error(-ret, std::generic_category());
// Readiness probe used by co_await: unconditionally returns false, so the
// awaiting coroutine is always suspended and await_suspend hands the read
// request to the poller (poller->Read).
771 bool await_ready()
const {
return false; }
772 void await_suspend(std::coroutine_handle<> h) {
773 poller->Read(fd, buf, size, h);
776 ssize_t await_resume() {
777 int ret = poller->Result();
779 throw std::system_error(-ret, std::generic_category());
791 return TAwaitable{Poller_, Fd_, buf, size};
// Readiness probe used by co_await: unconditionally returns false, so the
// awaiting coroutine is always suspended and await_suspend hands the write
// request to the poller (poller->Write).
806 bool await_ready()
const {
return false; }
807 void await_suspend(std::coroutine_handle<> h) {
808 poller->Write(fd, buf, size, h);
811 ssize_t await_resume() {
812 int ret = poller->Result();
814 throw std::system_error(-ret, std::generic_category());
826 return TAwaitable{Poller_, Fd_, buf, size};
A class representing an IPv4 or IPv6 address (with port).
Definition address.hpp:30
Asynchronous file handle that owns its file descriptor.
Definition socket.hpp:317
TFileHandle(int fd, TPollerBase &poller)
Constructs a TFileHandle from an existing file descriptor.
Definition socket.hpp:325
Definition socket.hpp:295
Base class for pollers managing asynchronous I/O events and timers.
Definition poller.hpp:44
auto WriteSomeYield(const void *buf, size_t size)
The WriteSomeYield and ReadSomeYield variants behave similarly to WriteSome/ReadSome.
Definition socket.hpp:830
auto ReadSomeYield(void *buf, size_t size)
The WriteSomeYield and ReadSomeYield variants behave similarly to WriteSome/ReadSome.
Definition socket.hpp:835
auto WriteSome(const void *buf, size_t size)
Asynchronously writes data from the provided buffer to the file.
Definition socket.hpp:804
TPollerDrivenFileHandle(int fd, T &poller)
Constructs a TPollerDrivenFileHandle from an existing file descriptor.
Definition socket.hpp:754
auto ReadSome(void *buf, size_t size)
Asynchronously reads data from the file into the provided buffer.
Definition socket.hpp:769
Socket type driven by the poller's implementation.
Definition socket.hpp:519
auto ReadSomeYield(void *buf, size_t size)
The WriteSomeYield and ReadSomeYield variants behave similarly to WriteSome/ReadSome.
Definition socket.hpp:718
auto ReadSome(void *buf, size_t size)
Asynchronously reads data from the socket.
Definition socket.hpp:652
auto WriteSome(const void *buf, size_t size)
Asynchronously writes data to the socket.
Definition socket.hpp:687
auto Accept()
Asynchronously accepts an incoming connection.
Definition socket.hpp:569
TPollerDrivenSocket(T &poller, int domain, int type=SOCK_STREAM)
Constructs a TPollerDrivenSocket from a poller, domain, and socket type.
Definition socket.hpp:532
auto Connect(const TAddress &addr, TTime deadline=TTime::max())
Asynchronously connects to the specified address with an optional deadline.
Definition socket.hpp:607
auto WriteSomeYield(const void *buf, size_t size)
The WriteSomeYield and ReadSomeYield variants behave similarly to WriteSome/ReadSome.
Definition socket.hpp:713
TPollerDrivenSocket(const TAddress &addr, int fd, T &poller)
Constructs a TPollerDrivenSocket from an existing file descriptor.
Definition socket.hpp:545
Definition socket.hpp:335
TPollerBase * Poller()
Returns the poller associated with this socket.
Definition socket.hpp:47
int Setup(int s)
Performs additional setup on the socket descriptor.
int Create(int domain, int type)
Creates a new socket descriptor.
Template base class implementing asynchronous socket I/O operations.
Definition socket.hpp:99
TSocketBase(TPollerBase &poller, int domain, int type)
Constructs a TSocketBase with a new socket descriptor.
Definition socket.hpp:108
auto Monitor()
Monitors the socket for remote hang-up (closure).
Definition socket.hpp:232
auto WriteSomeYield(const void *buf, size_t size)
Forces a write operation on the next event loop iteration.
Definition socket.hpp:208
auto WriteSome(const void *buf, size_t size)
Asynchronously writes data from the provided buffer to the socket.
Definition socket.hpp:186
auto ReadSome(void *buf, size_t size)
Asynchronously reads data from the socket into the provided buffer.
Definition socket.hpp:138
void Close()
Closes the socket.
Definition socket.hpp:250
TSocketBase(int fd, TPollerBase &poller)
Constructs a TSocketBase from an existing socket descriptor.
Definition socket.hpp:116
auto ReadSomeYield(void *buf, size_t size)
Forces a read operation on the next event loop iteration.
Definition socket.hpp:160
High-level asynchronous socket for network communication.
Definition socket.hpp:364
TSocket(TPollerBase &poller, int domain, int type=SOCK_STREAM)
Constructs a TSocket using the given poller, address family, and socket type.
const std::optional< TAddress > & RemoteAddr() const
Returns the remote address of the connected peer.
auto Connect(const TAddress &addr, TTime deadline=TTime::max())
Asynchronously connects to the specified address.
Definition socket.hpp:402
int Fd() const
Returns the underlying socket descriptor.
TSocket(const TAddress &addr, int fd, TPollerBase &poller)
Constructs a TSocket for an already-connected socket.
void Bind(const TAddress &addr)
Binds the socket to the specified local address.
auto Accept()
Asynchronously accepts an incoming connection.
Definition socket.hpp:453
const std::optional< TAddress > & LocalAddr() const
Returns the local address to which the socket is bound.
void Listen(int backlog=128)
Puts the socket in a listening state with an optional backlog (default is 128).
Definition socket.hpp:261