LCOV - code coverage report
Current view: top level - corosio/native/detail/reactor - reactor_op.hpp (source / functions) Coverage Total Hit Missed
Test: coverage_remapped.info Lines: 69.4 % 160 111 49
Test Date: 2026-04-09 23:21:11 Functions: 67.6 % 136 92 44

           TLA  Line data    Source code
       1                 : //
       2                 : // Copyright (c) 2026 Steve Gerbino
       3                 : //
       4                 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5                 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6                 : //
       7                 : // Official repository: https://github.com/cppalliance/corosio
       8                 : //
       9                 : 
      10                 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_OP_HPP
      11                 : #define BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_OP_HPP
      12                 : 
      13                 : #include <boost/corosio/native/detail/reactor/reactor_op_base.hpp>
      14                 : #include <boost/corosio/io/io_object.hpp>
      15                 : #include <boost/corosio/endpoint.hpp>
      16                 : #include <boost/corosio/detail/continuation_op.hpp>
      17                 : #include <boost/capy/ex/executor_ref.hpp>
      18                 : 
      19                 : #include <atomic>
      20                 : #include <coroutine>
      21                 : #include <cstddef>
      22                 : #include <memory>
      23                 : #include <optional>
      24                 : #include <stop_token>
      25                 : #include <system_error>
      26                 : 
      27                 : #include <errno.h>
      28                 : 
      29                 : #include <netinet/in.h>
      30                 : #include <sys/socket.h>
      31                 : #include <sys/uio.h>
      32                 : 
      33                 : namespace boost::corosio::detail {
      34                 : 
      35                 : /** Base operation for reactor-based backends.
      36                 : 
      37                 :     Holds per-operation state that depends on the concrete backend
      38                 :     socket/acceptor types: coroutine handle, executor, output
      39                 :     pointers, file descriptor, stop_callback, and type-specific
      40                 :     impl pointers.
      41                 : 
      42                 :     Fields shared across all backends (errn, bytes_transferred,
      43                 :     cancelled, impl_ptr, perform_io, complete) live in
      44                 :     reactor_op_base so the scheduler and descriptor_state can
      45                 :     access them without template instantiation.
      46                 : 
      47                 :     @tparam Socket The backend socket impl type (forward-declared).
      48                 :     @tparam Acceptor The backend acceptor impl type (forward-declared).
      49                 : */
      50                 : template<class Socket, class Acceptor>
      51                 : struct reactor_op : reactor_op_base
      52                 : {
      53                 :     /// Stop-token callback that invokes cancel() on the target op.
      54                 :     struct canceller
      55                 :     {
      56                 :         reactor_op* op;
      57 HIT         200 :         void operator()() const noexcept
      58                 :         {
      59             200 :             op->cancel();
      60             200 :         }
      61                 :     };
      62                 : 
      63                 :     /// Caller's coroutine handle to resume on completion.
      64                 :     std::coroutine_handle<> h;
      65                 : 
      66                 :     /// Scheduler-ready continuation for executor dispatch/post (wraps h).
      67                 :     detail::continuation_op cont_op;
      68                 : 
      69                 :     /// Executor for dispatching the completion.
      70                 :     capy::executor_ref ex;
      71                 : 
      72                 :     /// Output pointer for the error code.
      73                 :     std::error_code* ec_out = nullptr;
      74                 : 
      75                 :     /// Output pointer for bytes transferred.
      76                 :     std::size_t* bytes_out = nullptr;
      77                 : 
      78                 :     /// File descriptor this operation targets.
      79                 :     int fd = -1;
      80                 : 
      81                 :     /// Stop-token callback registration.
      82                 :     std::optional<std::stop_callback<canceller>> stop_cb;
      83                 : 
      84                 :     /// Owning socket impl (for stop_token cancellation).
      85                 :     Socket* socket_impl_ = nullptr;
      86                 : 
      87                 :     /// Owning acceptor impl (for stop_token cancellation).
      88                 :     Acceptor* acceptor_impl_ = nullptr;
      89                 : 
      90           77794 :     reactor_op() = default;
      91                 : 
      92                 :     /// Reset operation state for reuse.
      93          423273 :     void reset() noexcept
      94                 :     {
      95          423273 :         fd                = -1;
      96          423273 :         errn              = 0;
      97          423273 :         bytes_transferred = 0;
      98          423273 :         cancelled.store(false, std::memory_order_relaxed);
      99          423273 :         impl_ptr.reset();
     100          423273 :         socket_impl_   = nullptr;
     101          423273 :         acceptor_impl_ = nullptr;
     102          423273 :     }
     103                 : 
     104                 :     /// Return true if this is a read-direction operation.
     105           40590 :     virtual bool is_read_operation() const noexcept
     106                 :     {
     107           40590 :         return false;
     108                 :     }
     109                 : 
     110                 :     /// Cancel this operation via the owning impl.
     111                 :     virtual void cancel() noexcept = 0;
     112                 : 
     113                 :     /// Destroy without invoking.
     114 MIS           0 :     void destroy() override
     115                 :     {
     116               0 :         stop_cb.reset();
     117               0 :         reactor_op_base::destroy();
     118               0 :     }
     119                 : 
     120                 :     /// Arm the stop-token callback for a socket operation.
     121 HIT       90073 :     void start(std::stop_token const& token, Socket* impl)
     122                 :     {
     123           90073 :         cancelled.store(false, std::memory_order_release);
     124           90073 :         stop_cb.reset();
     125           90073 :         socket_impl_   = impl;
     126           90073 :         acceptor_impl_ = nullptr;
     127                 : 
     128           90073 :         if (token.stop_possible())
     129             198 :             stop_cb.emplace(token, canceller{this});
     130           90073 :     }
     131                 : 
     132                 :     /// Arm the stop-token callback for an acceptor operation.
     133            8530 :     void start(std::stop_token const& token, Acceptor* impl)
     134                 :     {
     135            8530 :         cancelled.store(false, std::memory_order_release);
     136            8530 :         stop_cb.reset();
     137            8530 :         socket_impl_   = nullptr;
     138            8530 :         acceptor_impl_ = impl;
     139                 : 
     140            8530 :         if (token.stop_possible())
     141               9 :             stop_cb.emplace(token, canceller{this});
     142            8530 :     }
     143                 : };
     144                 : 
     145                 : /** Shared connect operation.
     146                 : 
     147                 :     Checks SO_ERROR for connect completion status. The operator()()
     148                 :     and cancel() are provided by the concrete backend type.
     149                 : 
     150                 :     @tparam Base The backend's base op type.
     151                 :     @tparam Endpoint The endpoint type (endpoint or local_endpoint).
     152                 : */
     153                 : template<class Base, class Endpoint = endpoint>
     154                 : struct reactor_connect_op : Base
     155                 : {
     156                 :     /// Endpoint to connect to.
     157                 :     Endpoint target_endpoint;
     158                 : 
     159                 :     /// Reset operation state for reuse.
     160            8532 :     void reset() noexcept
     161                 :     {
     162            8532 :         Base::reset();
     163            8532 :         target_endpoint = Endpoint{};
     164            8532 :     }
     165                 : 
     166            8517 :     void perform_io() noexcept override
     167                 :     {
     168            8517 :         int err       = 0;
     169            8517 :         socklen_t len = sizeof(err);
     170            8517 :         if (::getsockopt(this->fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
     171 MIS           0 :             err = errno;
     172 HIT        8517 :         this->complete(err, 0);
     173            8517 :     }
     174                 : };
     175                 : 
     176                 : /** Shared scatter-read operation.
     177                 : 
     178                 :     Uses readv() with an EINTR retry loop.
     179                 : 
     180                 :     @tparam Base The backend's base op type.
     181                 : */
     182                 : template<class Base>
     183                 : struct reactor_read_op : Base
     184                 : {
     185                 :     /// Maximum scatter-gather buffer count.
     186                 :     static constexpr std::size_t max_buffers = 16;
     187                 : 
     188                 :     /// Scatter-gather I/O vectors.
     189                 :     iovec iovecs[max_buffers];
     190                 : 
     191                 :     /// Number of active I/O vectors.
     192                 :     int iovec_count = 0;
     193                 : 
     194                 :     /// True for zero-length reads (completed immediately).
     195                 :     bool empty_buffer_read = false;
     196                 : 
     197                 :     /// Return true (this is a read-direction operation).
     198           40620 :     bool is_read_operation() const noexcept override
     199                 :     {
     200           40620 :         return !empty_buffer_read;
     201                 :     }
     202                 : 
     203          203205 :     void reset() noexcept
     204                 :     {
     205          203205 :         Base::reset();
     206          203205 :         iovec_count       = 0;
     207          203205 :         empty_buffer_read = false;
     208          203205 :     }
     209                 : 
     210             325 :     void perform_io() noexcept override
     211                 :     {
     212                 :         ssize_t n;
     213                 :         do
     214                 :         {
     215             325 :             n = ::readv(this->fd, iovecs, iovec_count);
     216                 :         }
     217             325 :         while (n < 0 && errno == EINTR);
     218                 : 
     219             325 :         if (n >= 0)
     220              96 :             this->complete(0, static_cast<std::size_t>(n));
     221                 :         else
     222             229 :             this->complete(errno, 0);
     223             325 :     }
     224                 : };
     225                 : 
     226                 : /** Shared gather-write operation.
     227                 : 
     228                 :     Delegates the actual syscall to WritePolicy::write(fd, iovecs, count),
     229                 :     which returns ssize_t (bytes written or -1 with errno set).
     230                 : 
     231                 :     @tparam Base The backend's base op type.
     232                 :     @tparam WritePolicy Provides `static ssize_t write(int, iovec*, int)`.
     233                 : */
     234                 : template<class Base, class WritePolicy>
     235                 : struct reactor_write_op : Base
     236                 : {
     237                 :     /// The write syscall policy type.
     238                 :     using write_policy = WritePolicy;
     239                 : 
     240                 :     /// Maximum scatter-gather buffer count.
     241                 :     static constexpr std::size_t max_buffers = 16;
     242                 : 
     243                 :     /// Scatter-gather I/O vectors.
     244                 :     iovec iovecs[max_buffers];
     245                 : 
     246                 :     /// Number of active I/O vectors.
     247                 :     int iovec_count = 0;
     248                 : 
     249          202910 :     void reset() noexcept
     250                 :     {
     251          202910 :         Base::reset();
     252          202910 :         iovec_count = 0;
     253          202910 :     }
     254                 : 
     255 MIS           0 :     void perform_io() noexcept override
     256                 :     {
     257               0 :         ssize_t n = WritePolicy::write(this->fd, iovecs, iovec_count);
     258               0 :         if (n >= 0)
     259               0 :             this->complete(0, static_cast<std::size_t>(n));
     260                 :         else
     261               0 :             this->complete(errno, 0);
     262               0 :     }
     263                 : };
     264                 : 
     265                 : /** Shared accept operation.
     266                 : 
     267                 :     Delegates the actual syscall to AcceptPolicy::do_accept(fd, peer_storage),
     268                 :     which returns the accepted fd or -1 with errno set.
     269                 : 
     270                 :     @tparam Base The backend's base op type.
     271                 :     @tparam AcceptPolicy Provides `static int do_accept(int, sockaddr_storage&)`.
     272                 : */
     273                 : template<class Base, class AcceptPolicy>
     274                 : struct reactor_accept_op : Base
     275                 : {
     276                 :     /// File descriptor of the accepted connection.
     277                 :     int accepted_fd = -1;
     278                 : 
     279                 :     /// Pointer to the peer socket implementation.
     280                 :     io_object::implementation* peer_impl = nullptr;
     281                 : 
     282                 :     /// Output pointer for the accepted implementation.
     283                 :     io_object::implementation** impl_out = nullptr;
     284                 : 
     285                 :     /// Peer address storage filled by accept.
     286                 :     sockaddr_storage peer_storage{};
     287                 : 
     288 HIT        8530 :     void reset() noexcept
     289                 :     {
     290            8530 :         Base::reset();
     291            8530 :         accepted_fd  = -1;
     292            8530 :         peer_impl    = nullptr;
     293            8530 :         impl_out     = nullptr;
     294            8530 :         peer_storage = {};
     295            8530 :     }
     296                 : 
     297            8512 :     void perform_io() noexcept override
     298                 :     {
     299            8512 :         int new_fd = AcceptPolicy::do_accept(this->fd, peer_storage);
     300            8512 :         if (new_fd >= 0)
     301                 :         {
     302            8512 :             accepted_fd = new_fd;
     303            8512 :             this->complete(0, 0);
     304                 :         }
     305                 :         else
     306                 :         {
     307 MIS           0 :             this->complete(errno, 0);
     308                 :         }
     309 HIT        8512 :     }
     310                 : };
     311                 : 
     312                 : /** Shared connected send operation for datagram sockets.
     313                 : 
     314                 :     Uses sendmsg() with msg_name=nullptr (connected mode).
     315                 : 
     316                 :     @tparam Base The backend's base op type.
     317                 : */
     318                 : template<class Base>
     319                 : struct reactor_send_op : Base
     320                 : {
     321                 :     /// Maximum scatter-gather buffer count.
     322                 :     static constexpr std::size_t max_buffers = 16;
     323                 : 
     324                 :     /// Scatter-gather I/O vectors.
     325                 :     iovec iovecs[max_buffers];
     326                 : 
     327                 :     /// Number of active I/O vectors.
     328                 :     int iovec_count = 0;
     329                 : 
     330                 :     /// User-supplied message flags.
     331                 :     int msg_flags = 0;
     332                 : 
     333              14 :     void reset() noexcept
     334                 :     {
     335              14 :         Base::reset();
     336              14 :         iovec_count = 0;
     337              14 :         msg_flags   = 0;
     338              14 :     }
     339                 : 
     340 MIS           0 :     void perform_io() noexcept override
     341                 :     {
     342               0 :         msghdr msg{};
     343               0 :         msg.msg_iov    = iovecs;
     344               0 :         msg.msg_iovlen = static_cast<std::size_t>(iovec_count);
     345                 : 
     346                 : #ifdef MSG_NOSIGNAL
     347               0 :         int send_flags = msg_flags | MSG_NOSIGNAL;
     348                 : #else
     349                 :         int send_flags = msg_flags;
     350                 : #endif
     351                 : 
     352                 :         ssize_t n;
     353                 :         do
     354                 :         {
     355               0 :             n = ::sendmsg(this->fd, &msg, send_flags);
     356                 :         }
     357               0 :         while (n < 0 && errno == EINTR);
     358                 : 
     359               0 :         if (n >= 0)
     360               0 :             this->complete(0, static_cast<std::size_t>(n));
     361                 :         else
     362               0 :             this->complete(errno, 0);
     363               0 :     }
     364                 : };
     365                 : 
     366                 : /** Shared connected recv operation for datagram sockets.
     367                 : 
     368                 :     Uses recvmsg() with msg_name=nullptr (connected mode).
     369                 :     Unlike reactor_read_op, does not map n==0 to EOF
     370                 :     (zero-length datagrams are valid).
     371                 : 
     372                 :     @tparam Base The backend's base op type.
     373                 : */
     374                 : template<class Base>
     375                 : struct reactor_recv_op : Base
     376                 : {
     377                 :     /// Maximum scatter-gather buffer count.
     378                 :     static constexpr std::size_t max_buffers = 16;
     379                 : 
     380                 :     /// Scatter-gather I/O vectors.
     381                 :     iovec iovecs[max_buffers];
     382                 : 
     383                 :     /// Number of active I/O vectors.
     384                 :     int iovec_count = 0;
     385                 : 
     386                 :     /// User-supplied message flags.
     387                 :     int msg_flags = 0;
     388                 : 
     389                 :     /// Return true (this is a read-direction operation).
     390 HIT           4 :     bool is_read_operation() const noexcept override
     391                 :     {
     392               4 :         return true;
     393                 :     }
     394                 : 
     395              14 :     void reset() noexcept
     396                 :     {
     397              14 :         Base::reset();
     398              14 :         iovec_count = 0;
     399              14 :         msg_flags   = 0;
     400              14 :     }
     401                 : 
     402 MIS           0 :     void perform_io() noexcept override
     403                 :     {
     404               0 :         msghdr msg{};
     405               0 :         msg.msg_iov    = iovecs;
     406               0 :         msg.msg_iovlen = static_cast<std::size_t>(iovec_count);
     407                 : 
     408                 :         ssize_t n;
     409                 :         do
     410                 :         {
     411               0 :             n = ::recvmsg(this->fd, &msg, msg_flags);
     412                 :         }
     413               0 :         while (n < 0 && errno == EINTR);
     414                 : 
     415               0 :         if (n >= 0)
     416               0 :             this->complete(0, static_cast<std::size_t>(n));
     417                 :         else
     418               0 :             this->complete(errno, 0);
     419               0 :     }
     420                 : };
     421                 : 
     422                 : /** Shared send_to operation for datagram sockets.
     423                 : 
     424                 :     Uses sendmsg() with the destination endpoint in msg_name.
     425                 : 
     426                 :     @tparam Base The backend's base op type.
     427                 : */
     428                 : template<class Base>
     429                 : struct reactor_send_to_op : Base
     430                 : {
     431                 :     /// Maximum scatter-gather buffer count.
     432                 :     static constexpr std::size_t max_buffers = 16;
     433                 : 
     434                 :     /// Scatter-gather I/O vectors.
     435                 :     iovec iovecs[max_buffers];
     436                 : 
     437                 :     /// Number of active I/O vectors.
     438                 :     int iovec_count = 0;
     439                 : 
     440                 :     /// Destination address storage.
     441                 :     sockaddr_storage dest_storage{};
     442                 : 
     443                 :     /// Destination address length.
     444                 :     socklen_t dest_len = 0;
     445                 : 
     446                 :     /// User-supplied message flags.
     447                 :     int msg_flags = 0;
     448                 : 
     449 HIT          28 :     void reset() noexcept
     450                 :     {
     451              28 :         Base::reset();
     452              28 :         iovec_count  = 0;
     453              28 :         dest_storage = {};
     454              28 :         dest_len     = 0;
     455              28 :         msg_flags    = 0;
     456              28 :     }
     457                 : 
     458 MIS           0 :     void perform_io() noexcept override
     459                 :     {
     460               0 :         msghdr msg{};
     461               0 :         msg.msg_name    = &dest_storage;
     462               0 :         msg.msg_namelen = dest_len;
     463               0 :         msg.msg_iov     = iovecs;
     464               0 :         msg.msg_iovlen  = static_cast<std::size_t>(iovec_count);
     465                 : 
     466                 : #ifdef MSG_NOSIGNAL
     467               0 :         int send_flags = msg_flags | MSG_NOSIGNAL;
     468                 : #else
     469                 :         int send_flags = msg_flags;
     470                 : #endif
     471                 : 
     472                 :         ssize_t n;
     473                 :         do
     474                 :         {
     475               0 :             n = ::sendmsg(this->fd, &msg, send_flags);
     476                 :         }
     477               0 :         while (n < 0 && errno == EINTR);
     478                 : 
     479               0 :         if (n >= 0)
     480               0 :             this->complete(0, static_cast<std::size_t>(n));
     481                 :         else
     482               0 :             this->complete(errno, 0);
     483               0 :     }
     484                 : };
     485                 : 
     486                 : /** Shared recv_from operation for datagram sockets.
     487                 : 
     488                 :     Uses recvmsg() with msg_name to capture the source endpoint.
     489                 : 
     490                 :     @tparam Base The backend's base op type.
     491                 :     @tparam Endpoint The endpoint type (endpoint or local_endpoint).
     492                 : */
     493                 : template<class Base, class Endpoint = endpoint>
     494                 : struct reactor_recv_from_op : Base
     495                 : {
     496                 :     /// Maximum scatter-gather buffer count.
     497                 :     static constexpr std::size_t max_buffers = 16;
     498                 : 
     499                 :     /// Scatter-gather I/O vectors.
     500                 :     iovec iovecs[max_buffers];
     501                 : 
     502                 :     /// Number of active I/O vectors.
     503                 :     int iovec_count = 0;
     504                 : 
     505                 :     /// Source address storage filled by recvmsg.
     506                 :     sockaddr_storage source_storage{};
     507                 : 
     508                 :     /// Actual source address length returned by recvmsg.
     509                 :     socklen_t source_addrlen = 0;
     510                 : 
     511                 :     /// Output pointer for the source endpoint (set by do_recv_from).
     512                 :     Endpoint* source_out = nullptr;
     513                 : 
     514                 :     /// User-supplied message flags.
     515                 :     int msg_flags = 0;
     516                 : 
     517                 :     /// Return true (this is a read-direction operation).
     518               0 :     bool is_read_operation() const noexcept override
     519                 :     {
     520               0 :         return true;
     521                 :     }
     522                 : 
     523 HIT          40 :     void reset() noexcept
     524                 :     {
     525              40 :         Base::reset();
     526              40 :         iovec_count    = 0;
     527              40 :         source_storage = {};
     528              40 :         source_addrlen = 0;
     529              40 :         source_out     = nullptr;
     530              40 :         msg_flags      = 0;
     531              40 :     }
     532                 : 
     533               2 :     void perform_io() noexcept override
     534                 :     {
     535               2 :         msghdr msg{};
     536               2 :         msg.msg_name    = &source_storage;
     537               2 :         msg.msg_namelen = sizeof(source_storage);
     538               2 :         msg.msg_iov     = iovecs;
     539               2 :         msg.msg_iovlen  = static_cast<std::size_t>(iovec_count);
     540                 : 
     541                 :         ssize_t n;
     542                 :         do
     543                 :         {
     544               2 :             n = ::recvmsg(this->fd, &msg, msg_flags);
     545                 :         }
     546               2 :         while (n < 0 && errno == EINTR);
     547                 : 
     548               2 :         if (n >= 0)
     549                 :         {
     550               2 :             source_addrlen = msg.msg_namelen;
     551               2 :             this->complete(0, static_cast<std::size_t>(n));
     552                 :         }
     553                 :         else
     554 MIS           0 :             this->complete(errno, 0);
     555 HIT           2 :     }
     556                 : };
     557                 : 
     558                 : } // namespace boost::corosio::detail
     559                 : 
     560                 : #endif // BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_OP_HPP
        

Generated by: LCOV version 2.3