LCOV - code coverage report
Current view: top level - corosio/native/detail/reactor - reactor_op.hpp (source / functions) Coverage Total Hit Missed
Test: coverage_remapped.info Lines: 68.3 % 161 110 51
Test Date: 2026-04-10 22:36:12 Functions: 64.7 % 136 88 48

           TLA  Line data    Source code
       1                 : //
       2                 : // Copyright (c) 2026 Steve Gerbino
       3                 : //
       4                 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5                 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6                 : //
       7                 : // Official repository: https://github.com/cppalliance/corosio
       8                 : //
       9                 : 
      10                 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_OP_HPP
      11                 : #define BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_OP_HPP
      12                 : 
      13                 : #include <boost/corosio/native/detail/reactor/reactor_op_base.hpp>
      14                 : #include <boost/corosio/io/io_object.hpp>
      15                 : #include <boost/corosio/endpoint.hpp>
      16                 : #include <boost/corosio/detail/continuation_op.hpp>
      17                 : #include <boost/capy/ex/executor_ref.hpp>
      18                 : 
      19                 : #include <atomic>
      20                 : #include <coroutine>
      21                 : #include <cstddef>
      22                 : #include <memory>
      23                 : #include <optional>
      24                 : #include <stop_token>
      25                 : #include <system_error>
      26                 : 
      27                 : #include <errno.h>
      28                 : 
      29                 : #include <netinet/in.h>
      30                 : #include <sys/socket.h>
      31                 : #include <sys/uio.h>
      32                 : 
      33                 : namespace boost::corosio::detail {
      34                 : 
      35                 : /** Base operation for reactor-based backends.
      36                 : 
      37                 :     Holds per-operation state that depends on the concrete backend
      38                 :     socket/acceptor types: coroutine handle, executor, output
      39                 :     pointers, file descriptor, stop_callback, and type-specific
      40                 :     impl pointers.
      41                 : 
      42                 :     Fields shared across all backends (errn, bytes_transferred,
      43                 :     cancelled, impl_ptr, perform_io, complete) live in
      44                 :     reactor_op_base so the scheduler and descriptor_state can
      45                 :     access them without template instantiation.
      46                 : 
      47                 :     @tparam Socket The backend socket impl type (forward-declared).
      48                 :     @tparam Acceptor The backend acceptor impl type (forward-declared).
      49                 : */
      50                 : template<class Socket, class Acceptor>
      51                 : struct reactor_op : reactor_op_base
      52                 : {
      53                 :     /// Callable stored in the std::stop_callback; a stop request is forwarded to op->cancel().
      54                 :     struct canceller
      55                 :     {
      56                 :         reactor_op* op;
      57 HIT         199 :         void operator()() const noexcept
      58                 :         {
      59             199 :             op->cancel();
      60             199 :         }
      61                 :     };
      62                 : 
      63                 :     /// Caller's coroutine handle to resume on completion.
      64                 :     std::coroutine_handle<> h;
      65                 : 
      66                 :     /// Scheduler-ready continuation for executor dispatch/post (wraps h).
      67                 :     detail::continuation_op cont_op;
      68                 : 
      69                 :     /// Executor for dispatching the completion.
      70                 :     capy::executor_ref ex;
      71                 : 
      72                 :     /// Output pointer for the error code.
      73                 :     std::error_code* ec_out = nullptr;
      74                 : 
      75                 :     /// Output pointer for bytes transferred.
      76                 :     std::size_t* bytes_out = nullptr;
      77                 : 
      78                 :     /// File descriptor this operation targets (-1 when unset).
      79                 :     int fd = -1;
      80                 : 
      81                 :     /// Stop-token callback registration; engaged by start() only when the token can be stopped.
      82                 :     std::optional<std::stop_callback<canceller>> stop_cb;
      83                 : 
      84                 :     /// Owning socket impl (for stop_token cancellation); set by start(token, Socket*), which nulls acceptor_impl_.
      85                 :     Socket* socket_impl_ = nullptr;
      86                 : 
      87                 :     /// Owning acceptor impl (for stop_token cancellation); set by start(token, Acceptor*), which nulls socket_impl_.
      88                 :     Acceptor* acceptor_impl_ = nullptr;
      89                 : 
      90           73019 :     reactor_op() = default;
      91                 : 
      92                 :     /// Reset fd, errn, bytes_transferred, cancelled, impl_ptr and both impl pointers for reuse; h, cont_op, ex, the output pointers and stop_cb are not touched here.
      93          410378 :     void reset() noexcept
      94                 :     {
      95          410378 :         fd                = -1;
      96          410378 :         errn              = 0;
      97          410378 :         bytes_transferred = 0;
      98          410378 :         cancelled.store(false, std::memory_order_relaxed);
      99          410378 :         impl_ptr.reset();
     100          410378 :         socket_impl_   = nullptr;
     101          410378 :         acceptor_impl_ = nullptr;
     102          410378 :     }
     103                 : 
     104                 :     /// Return true if this is a read-direction operation (this default returns false; read-style ops override it).
     105           39410 :     virtual bool is_read_operation() const noexcept
     106                 :     {
     107           39410 :         return false;
     108                 :     }
     109                 : 
     110                 :     /// Cancel this operation via the owning impl (pure virtual; invoked by canceller on a stop request).
     111                 :     virtual void cancel() noexcept = 0;
     112                 : 
     113                 :     /// Destroy without invoking: release the stop-token callback first, then defer to reactor_op_base::destroy().
     114 MIS           0 :     void destroy() override
     115                 :     {
     116               0 :         stop_cb.reset();
     117               0 :         reactor_op_base::destroy();
     118               0 :     }
     119                 : 
     120                 :     /// Begin a socket operation: clear cancelled, drop any previous callback, record impl, and arm the stop-token callback only if token.stop_possible().
     121 HIT       87182 :     void start(std::stop_token const& token, Socket* impl)
     122                 :     {
     123           87182 :         cancelled.store(false, std::memory_order_release);
     124           87182 :         stop_cb.reset();
     125           87182 :         socket_impl_   = impl;
     126           87182 :         acceptor_impl_ = nullptr;
     127                 : 
     128           87182 :         if (token.stop_possible())
     129             197 :             stop_cb.emplace(token, canceller{this});
     130           87182 :     }
     131                 : 
     132                 :     /// Begin an acceptor operation: same steps as the socket overload, but records impl in acceptor_impl_ and nulls socket_impl_.
     133            7995 :     void start(std::stop_token const& token, Acceptor* impl)
     134                 :     {
     135            7995 :         cancelled.store(false, std::memory_order_release);
     136            7995 :         stop_cb.reset();
     137            7995 :         socket_impl_   = nullptr;
     138            7995 :         acceptor_impl_ = impl;
     139                 : 
     140            7995 :         if (token.stop_possible())
     141               9 :             stop_cb.emplace(token, canceller{this});
     142            7995 :     }
     143                 : };
     144                 : 
     145                 : /** Shared connect operation.
     146                 : 
     147                 :     Reads SO_ERROR with getsockopt() and reports it through complete()
     148                 :     (errno is reported instead if getsockopt() itself fails). The operator()() and cancel() are provided by the concrete backend type.
     149                 : 
     150                 :     @tparam Base The backend's base op type.
     151                 :     @tparam Endpoint The endpoint type (endpoint or local_endpoint).
     152                 : */
     153                 : template<class Base, class Endpoint = endpoint>
     154                 : struct reactor_connect_op : Base
     155                 : {
     156                 :     /// Endpoint to connect to.
     157                 :     Endpoint target_endpoint;
     158                 : 
     159                 :     /// Reset the base state and value-initialize target_endpoint for reuse.
     160            8001 :     void reset() noexcept
     161                 :     {
     162            8001 :         Base::reset();
     163            8001 :         target_endpoint = Endpoint{};
     164            8001 :     }
     165                 : 
     166            7982 :     void perform_io() noexcept override
     167                 :     {
     168            7982 :         int err       = 0;
     169            7982 :         socklen_t len = sizeof(err);
     170            7982 :         if (::getsockopt(this->fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
     171 MIS           0 :             err = errno;
     172 HIT        7982 :         this->complete(err, 0);
     173            7982 :     }
     174                 : };
     175                 : 
     176                 : /** Shared scatter-read operation.
     177                 : 
     178                 :     Uses readv() with an EINTR retry loop; completes with the byte count on success or with errno on failure.
     179                 : 
     180                 :     @tparam Base The backend's base op type.
     181                 : */
     182                 : template<class Base>
     183                 : struct reactor_read_op : Base
     184                 : {
     185                 :     /// Maximum scatter-gather buffer count.
     186                 :     static constexpr std::size_t max_buffers = 16;
     187                 : 
     188                 :     /// Scatter-gather I/O vectors.
     189                 :     iovec iovecs[max_buffers];
     190                 : 
     191                 :     /// Number of active I/O vectors.
     192                 :     int iovec_count = 0;
     193                 : 
     194                 :     /// True for zero-length reads (completed immediately).
     195                 :     bool empty_buffer_read = false;
     196                 : 
     197                 :     /// Return true for a normal read; return false when empty_buffer_read is set.
     198           39438 :     bool is_read_operation() const noexcept override
     199                 :     {
     200           39438 :         return !empty_buffer_read;
     201                 :     }
     202                 : 
     203          197286 :     void reset() noexcept
     204                 :     {
     205          197286 :         Base::reset();
     206          197286 :         iovec_count       = 0;
     207          197286 :         empty_buffer_read = false;
     208          197286 :     }
     209                 : 
     210             325 :     void perform_io() noexcept override
     211                 :     {
     212                 :         ssize_t n;
     213                 :         do
     214                 :         {
     215             325 :             n = ::readv(this->fd, iovecs, iovec_count);
     216                 :         }
     217             325 :         while (n < 0 && errno == EINTR);
     218                 : 
     219             325 :         if (n >= 0)
     220              97 :             this->complete(0, static_cast<std::size_t>(n));
     221                 :         else
     222             228 :             this->complete(errno, 0);
     223             325 :     }
     224                 : };
     225                 : 
     226                 : /** Shared gather-write operation.
     227                 : 
     228                 :     Delegates the actual syscall to WritePolicy::write(fd, iovecs, count),
     229                 :     which returns ssize_t (bytes written or -1 with errno set). The policy is called exactly once here, with no EINTR loop in this op. NOTE(review): confirm each WritePolicy handles EINTR itself.
     230                 : 
     231                 :     @tparam Base The backend's base op type.
     232                 :     @tparam WritePolicy Provides `static ssize_t write(int, iovec*, int)`.
     233                 : */
     234                 : template<class Base, class WritePolicy>
     235                 : struct reactor_write_op : Base
     236                 : {
     237                 :     /// The write syscall policy type.
     238                 :     using write_policy = WritePolicy;
     239                 : 
     240                 :     /// Maximum scatter-gather buffer count.
     241                 :     static constexpr std::size_t max_buffers = 16;
     242                 : 
     243                 :     /// Scatter-gather I/O vectors.
     244                 :     iovec iovecs[max_buffers];
     245                 : 
     246                 :     /// Number of active I/O vectors (passed to WritePolicy::write as the count).
     247                 :     int iovec_count = 0;
     248                 : 
     249          196992 :     void reset() noexcept
     250                 :     {
     251          196992 :         Base::reset();
     252          196992 :         iovec_count = 0;
     253          196992 :     }
     254                 : 
     255 MIS           0 :     void perform_io() noexcept override
     256                 :     {
     257               0 :         ssize_t n = WritePolicy::write(this->fd, iovecs, iovec_count);
     258               0 :         if (n >= 0)
     259               0 :             this->complete(0, static_cast<std::size_t>(n));
     260                 :         else
     261               0 :             this->complete(errno, 0);
     262               0 :     }
     263                 : };
     264                 : 
     265                 : /** Shared accept operation.
     266                 : 
     267                 :     Delegates the actual syscall to
     268                 :     AcceptPolicy::do_accept(fd, peer_storage, peer_addrlen),
     269                 :     which returns the accepted fd or -1 with errno set and writes
     270                 :     the real peer address length into peer_addrlen. On success the new descriptor is stored in accepted_fd; on failure errno is passed to complete().
     271                 : 
     272                 :     @tparam Base The backend's base op type.
     273                 :     @tparam AcceptPolicy Provides
     274                 :         `static int do_accept(int, sockaddr_storage&, socklen_t&)`.
     275                 : */
     276                 : template<class Base, class AcceptPolicy>
     277                 : struct reactor_accept_op : Base
     278                 : {
     279                 :     /// File descriptor of the accepted connection (-1 until perform_io succeeds).
     280                 :     int accepted_fd = -1;
     281                 : 
     282                 :     /// Pointer to the peer socket implementation.
     283                 :     io_object::implementation* peer_impl = nullptr;
     284                 : 
     285                 :     /// Output pointer for the accepted implementation.
     286                 :     io_object::implementation** impl_out = nullptr;
     287                 : 
     288                 :     /// Peer address storage filled by accept.
     289                 :     sockaddr_storage peer_storage{};
     290                 : 
     291                 :     /// Actual peer address length returned by accept.
     292                 :     socklen_t peer_addrlen = 0;
     293                 : 
     294 HIT        7995 :     void reset() noexcept
     295                 :     {
     296            7995 :         Base::reset();
     297            7995 :         accepted_fd  = -1;
     298            7995 :         peer_impl    = nullptr;
     299            7995 :         impl_out     = nullptr;
     300            7995 :         peer_storage = {};
     301            7995 :         peer_addrlen = 0;
     302            7995 :     }
     303                 : 
     304            7977 :     void perform_io() noexcept override
     305                 :     {
     306                 :         int new_fd =
     307            7977 :             AcceptPolicy::do_accept(this->fd, peer_storage, peer_addrlen);
     308            7977 :         if (new_fd >= 0)
     309                 :         {
     310            7977 :             accepted_fd = new_fd;
     311            7977 :             this->complete(0, 0);
     312                 :         }
     313                 :         else
     314                 :         {
     315 MIS           0 :             this->complete(errno, 0);
     316                 :         }
     317 HIT        7977 :     }
     318                 : };
     319                 : 
     320                 : /** Shared connected send operation for datagram sockets.
     321                 : 
     322                 :     Uses sendmsg() with msg_name=nullptr (connected mode), retrying on EINTR. MSG_NOSIGNAL is ORed into the flags when the platform defines it.
     323                 : 
     324                 :     @tparam Base The backend's base op type.
     325                 : */
     326                 : template<class Base>
     327                 : struct reactor_send_op : Base
     328                 : {
     329                 :     /// Maximum scatter-gather buffer count.
     330                 :     static constexpr std::size_t max_buffers = 16;
     331                 : 
     332                 :     /// Scatter-gather I/O vectors.
     333                 :     iovec iovecs[max_buffers];
     334                 : 
     335                 :     /// Number of active I/O vectors.
     336                 :     int iovec_count = 0;
     337                 : 
     338                 :     /// User-supplied message flags (perform_io adds MSG_NOSIGNAL where available).
     339                 :     int msg_flags = 0;
     340                 : 
     341              18 :     void reset() noexcept
     342                 :     {
     343              18 :         Base::reset();
     344              18 :         iovec_count = 0;
     345              18 :         msg_flags   = 0;
     346              18 :     }
     347                 : 
     348 MIS           0 :     void perform_io() noexcept override
     349                 :     {
     350               0 :         msghdr msg{};
     351               0 :         msg.msg_iov    = iovecs;
     352               0 :         msg.msg_iovlen = static_cast<std::size_t>(iovec_count);
     353                 : 
     354                 : #ifdef MSG_NOSIGNAL
     355               0 :         int send_flags = msg_flags | MSG_NOSIGNAL;
     356                 : #else
     357                 :         int send_flags = msg_flags;
     358                 : #endif
     359                 : 
     360                 :         ssize_t n;
     361                 :         do
     362                 :         {
     363               0 :             n = ::sendmsg(this->fd, &msg, send_flags);
     364                 :         }
     365               0 :         while (n < 0 && errno == EINTR);
     366                 : 
     367               0 :         if (n >= 0)
     368               0 :             this->complete(0, static_cast<std::size_t>(n));
     369                 :         else
     370               0 :             this->complete(errno, 0);
     371               0 :     }
     372                 : };
     373                 : 
     374                 : /** Shared connected recv operation for datagram sockets.
     375                 : 
     376                 :     Uses recvmsg() with msg_name=nullptr (connected mode), retrying on EINTR.
     377                 :     Unlike reactor_read_op, does not map n==0 to EOF
     378                 :     (zero-length datagrams are valid). NOTE(review): the EOF mapping for reactor_read_op is not visible in this header; confirm where it happens.
     379                 : 
     380                 :     @tparam Base The backend's base op type.
     381                 : */
     382                 : template<class Base>
     383                 : struct reactor_recv_op : Base
     384                 : {
     385                 :     /// Maximum scatter-gather buffer count.
     386                 :     static constexpr std::size_t max_buffers = 16;
     387                 : 
     388                 :     /// Scatter-gather I/O vectors.
     389                 :     iovec iovecs[max_buffers];
     390                 : 
     391                 :     /// Number of active I/O vectors.
     392                 :     int iovec_count = 0;
     393                 : 
     394                 :     /// User-supplied message flags (passed to recvmsg unchanged).
     395                 :     int msg_flags = 0;
     396                 : 
     397                 :     /// Always returns true (this is a read-direction operation).
     398               0 :     bool is_read_operation() const noexcept override
     399                 :     {
     400               0 :         return true;
     401                 :     }
     402                 : 
     403 HIT          18 :     void reset() noexcept
     404                 :     {
     405              18 :         Base::reset();
     406              18 :         iovec_count = 0;
     407              18 :         msg_flags   = 0;
     408              18 :     }
     409                 : 
     410 MIS           0 :     void perform_io() noexcept override
     411                 :     {
     412               0 :         msghdr msg{};
     413               0 :         msg.msg_iov    = iovecs;
     414               0 :         msg.msg_iovlen = static_cast<std::size_t>(iovec_count);
     415                 : 
     416                 :         ssize_t n;
     417                 :         do
     418                 :         {
     419               0 :             n = ::recvmsg(this->fd, &msg, msg_flags);
     420                 :         }
     421               0 :         while (n < 0 && errno == EINTR);
     422                 : 
     423               0 :         if (n >= 0)
     424               0 :             this->complete(0, static_cast<std::size_t>(n));
     425                 :         else
     426               0 :             this->complete(errno, 0);
     427               0 :     }
     428                 : };
     429                 : 
     430                 : /** Shared send_to operation for datagram sockets.
     431                 : 
     432                 :     Uses sendmsg() with the destination endpoint in msg_name (dest_storage / dest_len), retrying on EINTR. MSG_NOSIGNAL is ORed into the flags when the platform defines it.
     433                 : 
     434                 :     @tparam Base The backend's base op type.
     435                 : */
     436                 : template<class Base>
     437                 : struct reactor_send_to_op : Base
     438                 : {
     439                 :     /// Maximum scatter-gather buffer count.
     440                 :     static constexpr std::size_t max_buffers = 16;
     441                 : 
     442                 :     /// Scatter-gather I/O vectors.
     443                 :     iovec iovecs[max_buffers];
     444                 : 
     445                 :     /// Number of active I/O vectors.
     446                 :     int iovec_count = 0;
     447                 : 
     448                 :     /// Destination address storage (passed to sendmsg as msg_name).
     449                 :     sockaddr_storage dest_storage{};
     450                 : 
     451                 :     /// Destination address length (passed to sendmsg as msg_namelen).
     452                 :     socklen_t dest_len = 0;
     453                 : 
     454                 :     /// User-supplied message flags (perform_io adds MSG_NOSIGNAL where available).
     455                 :     int msg_flags = 0;
     456                 : 
     457 HIT          28 :     void reset() noexcept
     458                 :     {
     459              28 :         Base::reset();
     460              28 :         iovec_count  = 0;
     461              28 :         dest_storage = {};
     462              28 :         dest_len     = 0;
     463              28 :         msg_flags    = 0;
     464              28 :     }
     465                 : 
     466 MIS           0 :     void perform_io() noexcept override
     467                 :     {
     468               0 :         msghdr msg{};
     469               0 :         msg.msg_name    = &dest_storage;
     470               0 :         msg.msg_namelen = dest_len;
     471               0 :         msg.msg_iov     = iovecs;
     472               0 :         msg.msg_iovlen  = static_cast<std::size_t>(iovec_count);
     473                 : 
     474                 : #ifdef MSG_NOSIGNAL
     475               0 :         int send_flags = msg_flags | MSG_NOSIGNAL;
     476                 : #else
     477                 :         int send_flags = msg_flags;
     478                 : #endif
     479                 : 
     480                 :         ssize_t n;
     481                 :         do
     482                 :         {
     483               0 :             n = ::sendmsg(this->fd, &msg, send_flags);
     484                 :         }
     485               0 :         while (n < 0 && errno == EINTR);
     486                 : 
     487               0 :         if (n >= 0)
     488               0 :             this->complete(0, static_cast<std::size_t>(n));
     489                 :         else
     490               0 :             this->complete(errno, 0);
     491               0 :     }
     492                 : };
     493                 : 
     494                 : /** Shared recv_from operation for datagram sockets.
     495                 : 
     496                 :     Uses recvmsg() with msg_name to capture the source endpoint, retrying on EINTR; on success the returned msg_namelen is stored in source_addrlen.
     497                 : 
     498                 :     @tparam Base The backend's base op type.
     499                 :     @tparam Endpoint The endpoint type (endpoint or local_endpoint).
     500                 : */
     501                 : template<class Base, class Endpoint = endpoint>
     502                 : struct reactor_recv_from_op : Base
     503                 : {
     504                 :     /// Maximum scatter-gather buffer count.
     505                 :     static constexpr std::size_t max_buffers = 16;
     506                 : 
     507                 :     /// Scatter-gather I/O vectors.
     508                 :     iovec iovecs[max_buffers];
     509                 : 
     510                 :     /// Number of active I/O vectors.
     511                 :     int iovec_count = 0;
     512                 : 
     513                 :     /// Source address storage filled by recvmsg.
     514                 :     sockaddr_storage source_storage{};
     515                 : 
     516                 :     /// Actual source address length returned by recvmsg (updated only when recvmsg succeeds).
     517                 :     socklen_t source_addrlen = 0;
     518                 : 
     519                 :     /// Output pointer for the source endpoint (set by do_recv_from, which is not defined in this header; perform_io does not write through it).
     520                 :     Endpoint* source_out = nullptr;
     521                 : 
     522                 :     /// User-supplied message flags (passed to recvmsg unchanged).
     523                 :     int msg_flags = 0;
     524                 : 
     525                 :     /// Always returns true (this is a read-direction operation).
     526               0 :     bool is_read_operation() const noexcept override
     527                 :     {
     528               0 :         return true;
     529                 :     }
     530                 : 
     531 HIT          40 :     void reset() noexcept
     532                 :     {
     533              40 :         Base::reset();
     534              40 :         iovec_count    = 0;
     535              40 :         source_storage = {};
     536              40 :         source_addrlen = 0;
     537              40 :         source_out     = nullptr;
     538              40 :         msg_flags      = 0;
     539              40 :     }
     540                 : 
     541               2 :     void perform_io() noexcept override
     542                 :     {
     543               2 :         msghdr msg{};
     544               2 :         msg.msg_name    = &source_storage;
     545               2 :         msg.msg_namelen = sizeof(source_storage);
     546               2 :         msg.msg_iov     = iovecs;
     547               2 :         msg.msg_iovlen  = static_cast<std::size_t>(iovec_count);
     548                 : 
     549                 :         ssize_t n;
     550                 :         do
     551                 :         {
     552               2 :             n = ::recvmsg(this->fd, &msg, msg_flags);
     553                 :         }
     554               2 :         while (n < 0 && errno == EINTR);
     555                 : 
     556               2 :         if (n >= 0)
     557                 :         {
     558               2 :             source_addrlen = msg.msg_namelen;
     559               2 :             this->complete(0, static_cast<std::size_t>(n));
     560                 :         }
     561                 :         else
     562 MIS           0 :             this->complete(errno, 0);
     563 HIT           2 :     }
     564                 : };
     565                 : 
     566                 : } // namespace boost::corosio::detail
     567                 : 
     568                 : #endif // BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_OP_HPP
        

Generated by: LCOV version 2.3