LCOV - code coverage report
Current view: top level - corosio/native/detail/reactor - reactor_op_complete.hpp (source / functions) Coverage Total Hit Missed
Test: coverage_remapped.info Lines: 90.6 % 117 106 11
Test Date: 2026-04-10 22:36:12 Functions: 93.8 % 32 30 2

           TLA  Line data    Source code
       1                 : //
       2                 : // Copyright (c) 2026 Steve Gerbino
       3                 : //
       4                 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5                 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6                 : //
       7                 : // Official repository: https://github.com/cppalliance/corosio
       8                 : //
       9                 : 
      10                 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_OP_COMPLETE_HPP
      11                 : #define BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_OP_COMPLETE_HPP
      12                 : 
      13                 : #include <boost/corosio/detail/dispatch_coro.hpp>
      14                 : #include <boost/corosio/native/detail/endpoint_convert.hpp>
      15                 : #include <boost/corosio/native/detail/make_err.hpp>
      16                 : #include <boost/corosio/io/io_object.hpp>
      17                 : 
      18                 : #include <coroutine>
      19                 : #include <mutex>
      20                 : #include <utility>
      21                 : 
      22                 : #include <netinet/in.h>
      23                 : #include <sys/socket.h>
      24                 : #include <unistd.h>
      25                 : 
      26                 : namespace boost::corosio::detail {
      27                 : 
      28                 : /** Complete a base read/write operation.
      29                 : 
      30                 :     Reports canceled, else the recorded errno, else EOF for a
      31                 :     zero-byte read, else success; stores the byte count, then
      32                 :     resumes the handle returned by dispatch_coro.
      33                 : 
      34                 :     @tparam Op The concrete operation type.
      35                 :     @param op The operation to complete; ec_out/bytes_out are dereferenced unchecked.
      36                 : */
      37                 : template<typename Op>
      38                 : void
      39 HIT       79153 : complete_io_op(Op& op)
      40                 : {
      41           79153 :     op.stop_cb.reset();
      42           79153 :     op.socket_impl_->desc_state_.scheduler_->reset_inline_budget();
      43                 : 
      44           79153 :     if (op.cancelled.load(std::memory_order_acquire)) // cancellation takes precedence over errno
      45             305 :         *op.ec_out = capy::error::canceled;
      46           78848 :     else if (op.errn != 0)
      47 MIS           0 :         *op.ec_out = make_err(op.errn); // NOTE(review): 0 hits in this run - errno path untested
      48 HIT       78848 :     else if (op.is_read_operation() && op.bytes_transferred == 0)
      49 MIS           0 :         *op.ec_out = capy::error::eof; // NOTE(review): 0 hits in this run - EOF path untested
      50                 :     else
      51 HIT       78848 :         *op.ec_out = {};
      52                 : 
      53           79153 :     *op.bytes_out = op.bytes_transferred; // written on every path, including error paths
      54                 : 
      55           79153 :     op.cont_op.cont.h = op.h;
      56           79153 :     capy::executor_ref saved_ex(op.ex); // local copy taken before the coroutine is resumed
      57           79153 :     auto prevent = std::move(op.impl_ptr); // presumably keeps the impl alive across resume - TODO confirm
      58           79153 :     dispatch_coro(saved_ex, op.cont_op.cont).resume();
      59           79153 : }
      60                 : 
      61                 : /** Complete a connect operation with endpoint caching.
      62                 : 
      63                 :     When the connect neither failed nor was cancelled, queries the
      64                 :     local endpoint via getsockname and caches both endpoints in the
      65                 :     socket impl. Then resumes the handle returned by dispatch_coro.
      66                 : 
      67                 :     @tparam Op The concrete connect operation type.
      68                 :     @param op The operation to complete; ec_out is dereferenced unchecked.
      69                 : */
      70                 : template<typename Op>
      71                 : void
      72            8001 : complete_connect_op(Op& op)
      73                 : {
      74            8001 :     op.stop_cb.reset();
      75            8001 :     op.socket_impl_->desc_state_.scheduler_->reset_inline_budget(); // NOTE(review): dereferenced here, but null-checked below
      76                 : 
      77            8001 :     bool success =
      78            8001 :         (op.errn == 0 && !op.cancelled.load(std::memory_order_acquire));
      79                 : 
      80            8001 :     if (success && op.socket_impl_)
      81                 :     {
      82                 :         using ep_type = decltype(op.target_endpoint);
      83            7997 :         ep_type local_ep;
      84            7997 :         sockaddr_storage local_storage{};
      85            7997 :         socklen_t local_len = sizeof(local_storage);
      86            7997 :         if (::getsockname(
      87                 :                 op.fd, reinterpret_cast<sockaddr*>(&local_storage),
      88            7997 :                 &local_len) == 0)
      89            7993 :             local_ep =
      90            7997 :                 from_sockaddr_as(local_storage, local_len, ep_type{});
      91            7997 :         op.socket_impl_->set_endpoints(local_ep, op.target_endpoint); // local_ep is left default-initialized if getsockname failed
      92                 :     }
      93                 : 
      94            8001 :     if (op.cancelled.load(std::memory_order_acquire)) // NOTE(review): second load; may disagree with `success` if the flag is set in between
      95 MIS           0 :         *op.ec_out = capy::error::canceled; // NOTE(review): 0 hits in this run - cancel path untested
      96 HIT        8001 :     else if (op.errn != 0)
      97               4 :         *op.ec_out = make_err(op.errn);
      98                 :     else
      99            7997 :         *op.ec_out = {};
     100                 : 
     101            8001 :     op.cont_op.cont.h = op.h;
     102            8001 :     capy::executor_ref saved_ex(op.ex); // local copy taken before the coroutine is resumed
     103            8001 :     auto prevent = std::move(op.impl_ptr); // presumably keeps the impl alive across resume - TODO confirm
     104            8001 :     dispatch_coro(saved_ex, op.cont_op.cont).resume();
     105            8001 : }
     106                 : 
     107                 : /** Construct and register a peer socket from an accepted fd.
     108                 : 
     109                 :     Creates a new socket impl via the acceptor's associated
     110                 :     socket service, registers it with the scheduler, and caches
     111                 :     the local and remote endpoints.
     112                 : 
     113                 :     @tparam SocketImpl The concrete socket implementation type.
     114                 :     @tparam AcceptorImpl The concrete acceptor implementation type.
     115                 :     @param acceptor_impl The acceptor that accepted the connection (dereferenced unchecked).
     116                 :     @param accepted_fd The accepted file descriptor (set to -1 on success).
     117                 :     @param peer_storage The peer address from accept().
     118                 :     @param peer_addrlen The actual peer address length from accept().
     119                 :     @param impl_out Output pointer for the new socket impl (may be null).
     120                 :     @param ec_out Output pointer for any error; written only on failure, dereferenced unchecked.
     121                 :     @return True on success; false, with ENOENT in *ec_out, if there is no stream service.
     122                 : */
     123                 : template<typename SocketImpl, typename AcceptorImpl>
     124                 : bool
     125            7983 : setup_accepted_socket(
     126                 :     AcceptorImpl* acceptor_impl,
     127                 :     int& accepted_fd,
     128                 :     sockaddr_storage const& peer_storage,
     129                 :     socklen_t peer_addrlen,
     130                 :     io_object::implementation** impl_out,
     131                 :     std::error_code* ec_out)
     132                 : {
     133            7983 :     auto* socket_svc = acceptor_impl->service().stream_service();
     134            7983 :     if (!socket_svc)
     135                 :     {
     136 MIS           0 :         *ec_out = make_err(ENOENT); // NOTE(review): 0 hits in this run - failure path untested
     137               0 :         return false;
     138                 :     }
     139                 : 
     140 HIT        7983 :     auto& impl = static_cast<SocketImpl&>(*socket_svc->construct()); // NOTE(review): construct() result dereferenced unchecked - confirm it cannot be null
     141            7983 :     impl.set_socket(accepted_fd);
     142                 : 
     143            7983 :     impl.desc_state_.fd = accepted_fd; // NOTE(review): assigned outside the lock taken just below - confirm intended
     144                 :     {
     145            7983 :         std::lock_guard lock(impl.desc_state_.mutex);
     146            7983 :         impl.desc_state_.read_op    = nullptr;
     147            7983 :         impl.desc_state_.write_op   = nullptr;
     148            7983 :         impl.desc_state_.connect_op = nullptr;
     149            7983 :     }
     150            7983 :     socket_svc->scheduler().register_descriptor(accepted_fd, &impl.desc_state_);
     151                 : 
     152                 :     using ep_type = decltype(acceptor_impl->local_endpoint());
     153            7983 :     impl.set_endpoints(
     154                 :         acceptor_impl->local_endpoint(),
     155            7983 :         from_sockaddr_as(peer_storage, peer_addrlen, ep_type{}));
     156                 : 
     157            7983 :     if (impl_out)
     158            7983 :         *impl_out = &impl;
     159            7983 :     accepted_fd = -1; // the fd was handed to impl above; -1 stops complete_accept_op from closing it
     160            7983 :     return true;
     161                 : }
     162                 : 
     163                 : /** Complete an accept operation.
     164                 : 
     165                 :     Sets up the peer socket on success; otherwise closes any accepted
     166                 :     fd and nulls *impl_out. Resumes the handle from dispatch_coro.
     167                 : 
     168                 :     @tparam SocketImpl The concrete socket implementation type.
     169                 :     @tparam Op The concrete accept operation type.
     170                 :     @param op The operation to complete; ec_out is dereferenced unchecked.
     171                 : */
     172                 : template<typename SocketImpl, typename Op>
     173                 : void
     174            7995 : complete_accept_op(Op& op)
     175                 : {
     176            7995 :     op.stop_cb.reset();
     177            7995 :     op.acceptor_impl_->desc_state_.scheduler_->reset_inline_budget(); // NOTE(review): dereferenced here, but null-checked below
     178                 : 
     179            7995 :     bool success =
     180            7995 :         (op.errn == 0 && !op.cancelled.load(std::memory_order_acquire));
     181                 : 
     182            7995 :     if (op.cancelled.load(std::memory_order_acquire)) // NOTE(review): second load; if the flag is set in between, a socket is set up while canceled is reported
     183              12 :         *op.ec_out = capy::error::canceled;
     184            7983 :     else if (op.errn != 0)
     185 MIS           0 :         *op.ec_out = make_err(op.errn); // NOTE(review): 0 hits in this run - errno path untested
     186                 :     else
     187 HIT        7983 :         *op.ec_out = {};
     188                 : 
     189            7995 :     if (success && op.accepted_fd >= 0 && op.acceptor_impl_)
     190                 :     {
     191            7983 :         if (!setup_accepted_socket<SocketImpl>(
     192            7983 :                 op.acceptor_impl_, op.accepted_fd, op.peer_storage,
     193                 :                 op.peer_addrlen, op.impl_out, op.ec_out))
     194 MIS           0 :             success = false; // setup failure overwrote *op.ec_out; fall through to the cleanup below
     195                 :     }
     196                 : 
     197 HIT        7995 :     if (!success || !op.acceptor_impl_)
     198                 :     {
     199              12 :         if (op.accepted_fd >= 0) // still >= 0 only when setup did not take the fd
     200                 :         {
     201 MIS           0 :             ::close(op.accepted_fd); // NOTE(review): 0 hits in this run - fd cleanup untested
     202               0 :             op.accepted_fd = -1;
     203                 :         }
     204 HIT          12 :         if (op.impl_out)
     205              12 :             *op.impl_out = nullptr;
     206                 :     }
     207                 : 
     208            7995 :     op.cont_op.cont.h = op.h;
     209            7995 :     capy::executor_ref saved_ex(op.ex); // local copy taken before the coroutine is resumed
     210            7995 :     auto prevent = std::move(op.impl_ptr); // presumably keeps the impl alive across resume - TODO confirm
     211            7995 :     dispatch_coro(saved_ex, op.cont_op.cont).resume();
     212            7995 : }
     213                 : 
     214                 : /** Complete a connected datagram operation (send or recv).
     215                 : 
     216                 :     Reports canceled, else the recorded errno, else success. Does NOT
     217                 :     map bytes_transferred == 0 to EOF — zero-length datagrams are
     218                 :     valid on connected datagram sockets. No source endpoint is captured.
     219                 : 
     220                 :     @tparam Op The concrete datagram operation type.
     221                 :     @param op The operation to complete; ec_out/bytes_out are dereferenced unchecked.
     222                 : */
     223                 : template<typename Op>
     224                 : void
     225              10 : complete_datagram_op(Op& op)
     226                 : {
     227              10 :     op.stop_cb.reset();
     228              10 :     op.socket_impl_->desc_state_.scheduler_->reset_inline_budget();
     229                 : 
     230              10 :     if (op.cancelled.load(std::memory_order_acquire)) // cancellation takes precedence over errno
     231               2 :         *op.ec_out = capy::error::canceled;
     232               8 :     else if (op.errn != 0)
     233 MIS           0 :         *op.ec_out = make_err(op.errn); // NOTE(review): 0 hits in this run - errno path untested
     234                 :     else
     235 HIT           8 :         *op.ec_out = {};
     236                 : 
     237              10 :     *op.bytes_out = op.bytes_transferred; // written on every path, including error paths
     238                 : 
     239              10 :     op.cont_op.cont.h = op.h;
     240              10 :     capy::executor_ref saved_ex(op.ex); // local copy taken before the coroutine is resumed
     241              10 :     auto prevent = std::move(op.impl_ptr); // presumably keeps the impl alive across resume - TODO confirm
     242              10 :     dispatch_coro(saved_ex, op.cont_op.cont).resume();
     243              10 : }
     244                 : 
     245                 : /** Complete a datagram operation with source endpoint capture.
     246                 : 
     247                 :     When source_out is non-null and the operation neither failed nor
     248                 :     was cancelled, converts the recorded source_storage into
     249                 :     *source_out. Then resumes the handle returned by dispatch_coro.
     250                 : 
     251                 :     @tparam Op The concrete datagram operation type.
     252                 :     @tparam Endpoint The endpoint type written through source_out.
     253                 :     @param op The operation to complete.
     254                 :     @param source_out Where to store the source endpoint (non-null for recv_from, null for send_to).
     255                 : */
     256                 : template<typename Op, typename Endpoint>
     257                 : void
     258              18 : complete_datagram_op(Op& op, Endpoint* source_out)
     259                 : {
     260              18 :     op.stop_cb.reset();
     261              18 :     op.socket_impl_->desc_state_.scheduler_->reset_inline_budget();
     262                 : 
     263              18 :     if (op.cancelled.load(std::memory_order_acquire)) // cancellation takes precedence over errno
     264               6 :         *op.ec_out = capy::error::canceled;
     265              12 :     else if (op.errn != 0)
     266 MIS           0 :         *op.ec_out = make_err(op.errn); // NOTE(review): 0 hits in this run - errno path untested
     267                 :     else
     268 HIT          12 :         *op.ec_out = {};
     269                 : 
     270              18 :     *op.bytes_out = op.bytes_transferred; // written on every path, including error paths
     271                 : 
     272              28 :     if (source_out && !op.cancelled.load(std::memory_order_acquire) &&
     273              10 :         op.errn == 0) // NOTE(review): re-loads cancelled; may disagree with the value used for *op.ec_out above
     274              20 :         *source_out = from_sockaddr_as(
     275              10 :             op.source_storage,
     276                 :             op.source_addrlen,
     277                 :             Endpoint{});
     278                 : 
     279              18 :     op.cont_op.cont.h = op.h;
     280              18 :     capy::executor_ref saved_ex(op.ex); // local copy taken before the coroutine is resumed
     281              18 :     auto prevent = std::move(op.impl_ptr); // presumably keeps the impl alive across resume - TODO confirm
     282              18 :     dispatch_coro(saved_ex, op.cont_op.cont).resume();
     283              18 : }
     284                 : 
     285                 : } // namespace boost::corosio::detail
     286                 : 
     287                 : #endif // BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_OP_COMPLETE_HPP
        

Generated by: LCOV version 2.3