LCOV - code coverage report
Current view: top level - corosio/native/detail/reactor - reactor_stream_socket.hpp (source / functions) Coverage Total Hit Missed
Test: coverage_remapped.info Lines: 85.4 % 212 181 31
Test Date: 2026-04-10 22:36:12 Functions: 75.6 % 90 68 22

           TLA  Line data    Source code
       1                 : //
       2                 : // Copyright (c) 2026 Steve Gerbino
       3                 : //
       4                 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5                 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6                 : //
       7                 : // Official repository: https://github.com/cppalliance/corosio
       8                 : //
       9                 : 
      10                 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_STREAM_SOCKET_HPP
      11                 : #define BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_STREAM_SOCKET_HPP
      12                 : 
      13                 : #include <boost/corosio/tcp_socket.hpp>
      14                 : #include <boost/corosio/native/detail/reactor/reactor_basic_socket.hpp>
      15                 : #include <boost/corosio/detail/dispatch_coro.hpp>
      16                 : #include <boost/capy/buffers.hpp>
      17                 : 
      18                 : #include <coroutine>
      19                 : 
      20                 : #include <errno.h>
      21                 : #include <sys/socket.h>
      22                 : #include <sys/uio.h>
      23                 : 
      24                 : namespace boost::corosio::detail {
      25                 : 
      26                 : /** CRTP base for reactor-backed stream socket implementations.
      27                 : 
      28                 :     Inherits shared data members and cancel/close/register logic
      29                 :     from reactor_basic_socket. Adds the stream-specific remote
      30                 :     endpoint, shutdown, and I/O dispatch (connect, read, write).
      31                 : 
      32                 :     @tparam Derived   The concrete socket type (CRTP).
      33                 :     @tparam Service   The backend's socket service type.
      34                 :     @tparam ConnOp    The backend's connect op type.
      35                 :     @tparam ReadOp    The backend's read op type.
      36                 :     @tparam WriteOp   The backend's write op type.
      37                 :     @tparam DescState The backend's descriptor_state type.
      38                 :     @tparam ImplBase  The public vtable base
      39                 :                       (tcp_socket::implementation or
      40                 :                        local_stream_socket::implementation).
      41                 :     @tparam Endpoint  The endpoint type (endpoint or local_endpoint).
      42                 : */
      43                 : template<
      44                 :     class Derived,
      45                 :     class Service,
      46                 :     class ConnOp,
      47                 :     class ReadOp,
      48                 :     class WriteOp,
      49                 :     class DescState,
      50                 :     class ImplBase = tcp_socket::implementation,
      51                 :     class Endpoint = endpoint>
      52                 : class reactor_stream_socket
      53                 :     : public reactor_basic_socket<
      54                 :           Derived,
      55                 :           ImplBase,
      56                 :           Service,
      57                 :           DescState,
      58                 :           Endpoint>
      59                 : {
      60                 :     using base_type = reactor_basic_socket<
      61                 :         Derived,
      62                 :         ImplBase,
      63                 :         Service,
      64                 :         DescState,
      65                 :         Endpoint>;
      66                 :     friend base_type;
      67                 :     friend Derived;
      68                 : 
      69 HIT       24072 :     explicit reactor_stream_socket(Service& svc) noexcept : base_type(svc) {}
      70                 : 
      71                 : protected:
      72                 :     Endpoint remote_endpoint_;
      73                 : 
      74                 : public:
      75                 :     /// Pending connect operation slot.
      76                 :     ConnOp conn_;
      77                 : 
      78                 :     /// Pending read operation slot.
      79                 :     ReadOp rd_;
      80                 : 
      81                 :     /// Pending write operation slot.
      82                 :     WriteOp wr_;
      83                 : 
      84           24072 :     ~reactor_stream_socket() override = default;
      85                 : 
      86                 :     /// Return the cached remote endpoint.
      87              44 :     Endpoint remote_endpoint() const noexcept override
      88                 :     {
      89              44 :         return remote_endpoint_;
      90                 :     }
      91                 : 
      92                 :     // --- Virtual method overrides (satisfy ImplBase pure virtuals) ---
      93                 : 
      94            7987 :     std::coroutine_handle<> connect(
      95                 :         std::coroutine_handle<> h,
      96                 :         capy::executor_ref ex,
      97                 :         Endpoint ep,
      98                 :         std::stop_token token,
      99                 :         std::error_code* ec) override
     100                 :     {
     101            7987 :         return do_connect(h, ex, ep, token, ec);
     102                 :     }
     103                 : 
     104          197286 :     std::coroutine_handle<> read_some(
     105                 :         std::coroutine_handle<> h,
     106                 :         capy::executor_ref ex,
     107                 :         buffer_param param,
     108                 :         std::stop_token token,
     109                 :         std::error_code* ec,
     110                 :         std::size_t* bytes_out) override
     111                 :     {
     112          197286 :         return do_read_some(h, ex, param, token, ec, bytes_out);
     113                 :     }
     114                 : 
     115          196992 :     std::coroutine_handle<> write_some(
     116                 :         std::coroutine_handle<> h,
     117                 :         capy::executor_ref ex,
     118                 :         buffer_param param,
     119                 :         std::stop_token token,
     120                 :         std::error_code* ec,
     121                 :         std::size_t* bytes_out) override
     122                 :     {
     123          196992 :         return do_write_some(h, ex, param, token, ec, bytes_out);
     124                 :     }
     125                 : 
     126                 :     std::error_code
     127               6 :     shutdown(corosio::shutdown_type what) noexcept override
     128                 :     {
     129               6 :         return do_shutdown(static_cast<int>(what));
     130                 :     }
     131                 : 
     132             184 :     void cancel() noexcept override
     133                 :     {
     134             184 :         this->do_cancel();
     135             184 :     }
     136                 : 
     137                 :     // --- End virtual overrides ---
     138                 : 
     139                 :     /// Close the socket (non-virtual, called by the service).
     140                 :     void close_socket() noexcept
     141                 :     {
     142                 :         this->do_close_socket();
     143                 :     }
     144                 : 
     145                 :     /** Shut down part or all of the full-duplex connection.
     146                 : 
     147                 :         @param what 0 = receive, 1 = send, 2 = both.
     148                 :     */
     149               6 :     std::error_code do_shutdown(int what) noexcept
     150                 :     {
     151                 :         int how;
     152               6 :         switch (what)
     153                 :         {
     154               2 :         case 0: // shutdown_receive
     155               2 :             how = SHUT_RD;
     156               2 :             break;
     157               2 :         case 1: // shutdown_send
     158               2 :             how = SHUT_WR;
     159               2 :             break;
     160               2 :         case 2: // shutdown_both
     161               2 :             how = SHUT_RDWR;
     162               2 :             break;
     163 MIS           0 :         default:
     164               0 :             return make_err(EINVAL);
     165                 :         }
     166 HIT           6 :         if (::shutdown(this->fd_, how) != 0)
     167 MIS           0 :             return make_err(errno);
     168 HIT           6 :         return {};
     169                 :     }
     170                 : 
     171                 :     /// Cache local and remote endpoints.
     172           15982 :     void set_endpoints(Endpoint local, Endpoint remote) noexcept
     173                 :     {
     174           15982 :         this->local_endpoint_ = std::move(local);
     175           15982 :         remote_endpoint_      = std::move(remote);
     176           15982 :     }
     177                 : 
     178                 :     /** Shared connect dispatch.
     179                 : 
     180                 :         Tries the connect syscall speculatively. On synchronous
     181                 :         completion, returns via inline budget or posts through queue.
     182                 :         On EINPROGRESS, registers with the reactor.
     183                 :     */
     184                 :     std::coroutine_handle<> do_connect(
     185                 :         std::coroutine_handle<>,
     186                 :         capy::executor_ref,
     187                 :         Endpoint const&,
     188                 :         std::stop_token const&,
     189                 :         std::error_code*);
     190                 : 
     191                 :     /** Shared scatter-read dispatch.
     192                 : 
     193                 :         Tries readv() speculatively. On success or hard error,
     194                 :         returns via inline budget or posts through queue.
     195                 :         On EAGAIN, registers with the reactor.
     196                 :     */
     197                 :     std::coroutine_handle<> do_read_some(
     198                 :         std::coroutine_handle<>,
     199                 :         capy::executor_ref,
     200                 :         buffer_param,
     201                 :         std::stop_token const&,
     202                 :         std::error_code*,
     203                 :         std::size_t*);
     204                 : 
     205                 :     /** Shared gather-write dispatch.
     206                 : 
     207                 :         Tries the write via WriteOp::write_policy speculatively.
     208                 :         On success or hard error, returns via inline budget or
     209                 :         posts through queue. On EAGAIN, registers with the reactor.
     210                 :     */
     211                 :     std::coroutine_handle<> do_write_some(
     212                 :         std::coroutine_handle<>,
     213                 :         capy::executor_ref,
     214                 :         buffer_param,
     215                 :         std::stop_token const&,
     216                 :         std::error_code*,
     217                 :         std::size_t*);
     218                 : 
     219                 :     /** Close the socket and cancel pending operations.
     220                 : 
     221                 :         Extends the base do_close_socket() to also reset
     222                 :         the remote endpoint.
     223                 :     */
     224           72219 :     void do_close_socket() noexcept
     225                 :     {
     226           72219 :         base_type::do_close_socket();
     227           72219 :         remote_endpoint_ = Endpoint{};
     228           72219 :     }
     229                 : 
     230                 :     /** Release the socket without closing the fd.
     231                 : 
     232                 :         Extends the base do_release_socket() to also reset
     233                 :         the remote endpoint.
     234                 :     */
     235               2 :     native_handle_type do_release_socket() noexcept
     236                 :     {
     237               2 :         auto fd = base_type::do_release_socket();
     238               2 :         remote_endpoint_ = Endpoint{};
     239               2 :         return fd;
     240                 :     }
     241                 : 
     242                 : private:
     243                 :     // CRTP callbacks for reactor_basic_socket cancel/close
     244                 : 
     245                 :     template<class Op>
     246             191 :     reactor_op_base** op_to_desc_slot(Op& op) noexcept
     247                 :     {
     248             191 :         if (&op == static_cast<void*>(&conn_))
     249 MIS           0 :             return &this->desc_state_.connect_op;
     250 HIT         191 :         if (&op == static_cast<void*>(&rd_))
     251             191 :             return &this->desc_state_.read_op;
     252 MIS           0 :         if (&op == static_cast<void*>(&wr_))
     253               0 :             return &this->desc_state_.write_op;
     254               0 :         return nullptr;
     255                 :     }
     256                 : 
     257                 :     template<class Op>
     258               0 :     bool* op_to_cancel_flag(Op& op) noexcept
     259                 :     {
     260               0 :         if (&op == static_cast<void*>(&conn_))
     261               0 :             return &this->desc_state_.connect_cancel_pending;
     262               0 :         if (&op == static_cast<void*>(&rd_))
     263               0 :             return &this->desc_state_.read_cancel_pending;
     264               0 :         if (&op == static_cast<void*>(&wr_))
     265               0 :             return &this->desc_state_.write_cancel_pending;
     266               0 :         return nullptr;
     267                 :     }
     268                 : 
     269                 :     template<class Fn>
     270 HIT       72405 :     void for_each_op(Fn fn) noexcept
     271                 :     {
     272           72405 :         fn(conn_);
     273           72405 :         fn(rd_);
     274           72405 :         fn(wr_);
     275           72405 :     }
     276                 : 
     277                 :     template<class Fn>
     278           72405 :     void for_each_desc_entry(Fn fn) noexcept
     279                 :     {
     280           72405 :         fn(conn_, this->desc_state_.connect_op);
     281           72405 :         fn(rd_, this->desc_state_.read_op);
     282           72405 :         fn(wr_, this->desc_state_.write_op);
     283           72405 :     }
     284                 : };
     285                 : 
     286                 : template<
     287                 :     class Derived,
     288                 :     class Service,
     289                 :     class ConnOp,
     290                 :     class ReadOp,
     291                 :     class WriteOp,
     292                 :     class DescState,
     293                 :     class ImplBase,
     294                 :     class Endpoint>
     295                 : std::coroutine_handle<>
     296            7987 : reactor_stream_socket<Derived, Service, ConnOp, ReadOp, WriteOp, DescState, ImplBase, Endpoint>::
     297                 :     do_connect(
     298                 :         std::coroutine_handle<> h,
     299                 :         capy::executor_ref ex,
     300                 :         Endpoint const& ep,
     301                 :         std::stop_token const& token,
     302                 :         std::error_code* ec)
     303                 : {
     304            7987 :     auto& op = conn_;
     305                 : 
     306            7987 :     sockaddr_storage storage{};
     307            7987 :     socklen_t addrlen = to_sockaddr(ep, socket_family(this->fd_), storage);
     308                 :     int result =
     309            7987 :         ::connect(this->fd_, reinterpret_cast<sockaddr*>(&storage), addrlen);
     310                 : 
     311            7987 :     if (result == 0)
     312                 :     {
     313               4 :         sockaddr_storage local_storage{};
     314               4 :         socklen_t local_len = sizeof(local_storage);
     315               4 :         if (::getsockname(
     316                 :                 this->fd_, reinterpret_cast<sockaddr*>(&local_storage),
     317               4 :                 &local_len) == 0)
     318 MIS           0 :             this->local_endpoint_ =
     319 HIT           4 :                 from_sockaddr_as(local_storage, local_len, Endpoint{});
     320               4 :         remote_endpoint_ = ep;
     321                 :     }
     322                 : 
     323            7987 :     if (result == 0 || errno != EINPROGRESS)
     324                 :     {
     325               4 :         int err = (result < 0) ? errno : 0;
     326               4 :         if (this->svc_.scheduler().try_consume_inline_budget())
     327                 :         {
     328 MIS           0 :             *ec = err ? make_err(err) : std::error_code{};
     329               0 :             op.cont_op.cont.h = h;
     330               0 :             return dispatch_coro(ex, op.cont_op.cont);
     331                 :         }
     332 HIT           4 :         op.reset();
     333               4 :         op.h               = h;
     334               4 :         op.ex              = ex;
     335               4 :         op.ec_out          = ec;
     336               4 :         op.fd              = this->fd_;
     337               4 :         op.target_endpoint = ep;
     338               4 :         op.start(token, static_cast<Derived*>(this));
     339               4 :         op.impl_ptr = this->shared_from_this();
     340               4 :         op.complete(err, 0);
     341               4 :         this->svc_.post(&op);
     342               4 :         return std::noop_coroutine();
     343                 :     }
     344                 : 
     345                 :     // EINPROGRESS — register with reactor
     346            7983 :     op.reset();
     347            7983 :     op.h               = h;
     348            7983 :     op.ex              = ex;
     349            7983 :     op.ec_out          = ec;
     350            7983 :     op.fd              = this->fd_;
     351            7983 :     op.target_endpoint = ep;
     352            7983 :     op.start(token, static_cast<Derived*>(this));
     353            7983 :     op.impl_ptr = this->shared_from_this();
     354                 : 
     355            7983 :     this->register_op(
     356            7983 :         op, this->desc_state_.connect_op, this->desc_state_.write_ready,
     357            7983 :         this->desc_state_.connect_cancel_pending, true);
     358            7983 :     return std::noop_coroutine();
     359                 : }
     360                 : 
     361                 : template<
     362                 :     class Derived,
     363                 :     class Service,
     364                 :     class ConnOp,
     365                 :     class ReadOp,
     366                 :     class WriteOp,
     367                 :     class DescState,
     368                 :     class ImplBase,
     369                 :     class Endpoint>
     370                 : std::coroutine_handle<>
     371          197286 : reactor_stream_socket<Derived, Service, ConnOp, ReadOp, WriteOp, DescState, ImplBase, Endpoint>::
     372                 :     do_read_some(
     373                 :         std::coroutine_handle<> h,
     374                 :         capy::executor_ref ex,
     375                 :         buffer_param param,
     376                 :         std::stop_token const& token,
     377                 :         std::error_code* ec,
     378                 :         std::size_t* bytes_out)
     379                 : {
     380          197286 :     auto& op = rd_;
     381          197286 :     op.reset();
     382                 : 
     383          197286 :     capy::mutable_buffer bufs[ReadOp::max_buffers];
     384          197286 :     op.iovec_count = static_cast<int>(param.copy_to(bufs, ReadOp::max_buffers));
     385                 : 
     386          197286 :     if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
     387                 :     {
     388               2 :         op.empty_buffer_read = true;
     389               2 :         op.h                 = h;
     390               2 :         op.ex                = ex;
     391               2 :         op.ec_out            = ec;
     392               2 :         op.bytes_out         = bytes_out;
     393               2 :         op.start(token, static_cast<Derived*>(this));
     394               2 :         op.impl_ptr = this->shared_from_this();
     395               2 :         op.complete(0, 0);
     396               2 :         this->svc_.post(&op);
     397               2 :         return std::noop_coroutine();
     398                 :     }
     399                 : 
     400          394568 :     for (int i = 0; i < op.iovec_count; ++i)
     401                 :     {
     402          197284 :         op.iovecs[i].iov_base = bufs[i].data();
     403          197284 :         op.iovecs[i].iov_len  = bufs[i].size();
     404                 :     }
     405                 : 
     406                 :     // Speculative read
     407                 :     ssize_t n;
     408                 :     do
     409                 :     {
     410          197284 :         n = ::readv(this->fd_, op.iovecs, op.iovec_count);
     411                 :     }
     412          197284 :     while (n < 0 && errno == EINTR);
     413                 : 
     414          197284 :     if (n >= 0 || (errno != EAGAIN && errno != EWOULDBLOCK))
     415                 :     {
     416          196898 :         int err    = (n < 0) ? errno : 0;
     417          196898 :         auto bytes = (n > 0) ? static_cast<std::size_t>(n) : std::size_t(0);
     418                 : 
     419          196898 :         if (this->svc_.scheduler().try_consume_inline_budget())
     420                 :         {
     421          157551 :             if (err)
     422 MIS           0 :                 *ec = make_err(err);
     423 HIT      157551 :             else if (n == 0)
     424              10 :                 *ec = capy::error::eof;
     425                 :             else
     426          157541 :                 *ec = {};
     427          157551 :             *bytes_out = bytes;
     428          157551 :             op.cont_op.cont.h = h;
     429          157551 :             return dispatch_coro(ex, op.cont_op.cont);
     430                 :         }
     431           39347 :         op.h         = h;
     432           39347 :         op.ex        = ex;
     433           39347 :         op.ec_out    = ec;
     434           39347 :         op.bytes_out = bytes_out;
     435           39347 :         op.start(token, static_cast<Derived*>(this));
     436           39347 :         op.impl_ptr = this->shared_from_this();
     437           39347 :         op.complete(err, bytes);
     438           39347 :         this->svc_.post(&op);
     439           39347 :         return std::noop_coroutine();
     440                 :     }
     441                 : 
     442                 :     // EAGAIN — register with reactor
     443             386 :     op.h         = h;
     444             386 :     op.ex        = ex;
     445             386 :     op.ec_out    = ec;
     446             386 :     op.bytes_out = bytes_out;
     447             386 :     op.fd        = this->fd_;
     448             386 :     op.start(token, static_cast<Derived*>(this));
     449             386 :     op.impl_ptr = this->shared_from_this();
     450                 : 
     451             386 :     this->register_op(
     452             386 :         op, this->desc_state_.read_op, this->desc_state_.read_ready,
     453             386 :         this->desc_state_.read_cancel_pending);
     454             386 :     return std::noop_coroutine();
     455                 : }
     456                 : 
     457                 : template<
     458                 :     class Derived,
     459                 :     class Service,
     460                 :     class ConnOp,
     461                 :     class ReadOp,
     462                 :     class WriteOp,
     463                 :     class DescState,
     464                 :     class ImplBase,
     465                 :     class Endpoint>
     466                 : std::coroutine_handle<>
     467          196992 : reactor_stream_socket<Derived, Service, ConnOp, ReadOp, WriteOp, DescState, ImplBase, Endpoint>::
     468                 :     do_write_some(
     469                 :         std::coroutine_handle<> h,
     470                 :         capy::executor_ref ex,
     471                 :         buffer_param param,
     472                 :         std::stop_token const& token,
     473                 :         std::error_code* ec,
     474                 :         std::size_t* bytes_out)
     475                 : {
     476          196992 :     auto& op = wr_;
     477          196992 :     op.reset();
     478                 : 
     479          196992 :     capy::mutable_buffer bufs[WriteOp::max_buffers];
     480          196992 :     op.iovec_count =
     481          196992 :         static_cast<int>(param.copy_to(bufs, WriteOp::max_buffers));
     482                 : 
     483          196992 :     if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
     484                 :     {
     485               2 :         op.h         = h;
     486               2 :         op.ex        = ex;
     487               2 :         op.ec_out    = ec;
     488               2 :         op.bytes_out = bytes_out;
     489               2 :         op.start(token, static_cast<Derived*>(this));
     490               2 :         op.impl_ptr = this->shared_from_this();
     491               2 :         op.complete(0, 0);
     492               2 :         this->svc_.post(&op);
     493               2 :         return std::noop_coroutine();
     494                 :     }
     495                 : 
     496          393980 :     for (int i = 0; i < op.iovec_count; ++i)
     497                 :     {
     498          196990 :         op.iovecs[i].iov_base = bufs[i].data();
     499          196990 :         op.iovecs[i].iov_len  = bufs[i].size();
     500                 :     }
     501                 : 
     502                 :     // Speculative write via backend-specific write policy
     503                 :     ssize_t n =
     504          196990 :         WriteOp::write_policy::write(this->fd_, op.iovecs, op.iovec_count);
     505                 : 
     506          196990 :     if (n >= 0 || (errno != EAGAIN && errno != EWOULDBLOCK))
     507                 :     {
     508          196990 :         int err    = (n < 0) ? errno : 0;
     509          196990 :         auto bytes = (n > 0) ? static_cast<std::size_t>(n) : std::size_t(0);
     510                 : 
     511          196990 :         if (this->svc_.scheduler().try_consume_inline_budget())
     512                 :         {
     513          157606 :             *ec        = err ? make_err(err) : std::error_code{};
     514          157606 :             *bytes_out = bytes;
     515          157606 :             op.cont_op.cont.h = h;
     516          157606 :             return dispatch_coro(ex, op.cont_op.cont);
     517                 :         }
     518           39384 :         op.h         = h;
     519           39384 :         op.ex        = ex;
     520           39384 :         op.ec_out    = ec;
     521           39384 :         op.bytes_out = bytes_out;
     522           39384 :         op.start(token, static_cast<Derived*>(this));
     523           39384 :         op.impl_ptr = this->shared_from_this();
     524           39384 :         op.complete(err, bytes);
     525           39384 :         this->svc_.post(&op);
     526           39384 :         return std::noop_coroutine();
     527                 :     }
     528                 : 
     529                 :     // EAGAIN — register with reactor
     530 MIS           0 :     op.h         = h;
     531               0 :     op.ex        = ex;
     532               0 :     op.ec_out    = ec;
     533               0 :     op.bytes_out = bytes_out;
     534               0 :     op.fd        = this->fd_;
     535               0 :     op.start(token, static_cast<Derived*>(this));
     536               0 :     op.impl_ptr = this->shared_from_this();
     537                 : 
     538               0 :     this->register_op(
     539               0 :         op, this->desc_state_.write_op, this->desc_state_.write_ready,
     540               0 :         this->desc_state_.write_cancel_pending, true);
     541               0 :     return std::noop_coroutine();
     542                 : }
     543                 : 
     544                 : } // namespace boost::corosio::detail
     545                 : 
     546                 : #endif // BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_STREAM_SOCKET_HPP
        

Generated by: LCOV version 2.3