LCOV - code coverage report
Current view: top level - corosio/native/detail/reactor - reactor_op_complete.hpp (source / functions) Coverage Total Hit Missed
Test: coverage_remapped.info Lines: 90.3 % 103 93 10
Test Date: 2026-04-09 23:21:11 Functions: 92.9 % 28 26 2

           TLA  Line data    Source code
       1                 : //
       2                 : // Copyright (c) 2026 Steve Gerbino
       3                 : //
       4                 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5                 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6                 : //
       7                 : // Official repository: https://github.com/cppalliance/corosio
       8                 : //
       9                 : 
      10                 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_OP_COMPLETE_HPP
      11                 : #define BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_OP_COMPLETE_HPP
      12                 : 
      13                 : #include <boost/corosio/detail/dispatch_coro.hpp>
      14                 : #include <boost/corosio/native/detail/endpoint_convert.hpp>
      15                 : #include <boost/corosio/native/detail/make_err.hpp>
      16                 : #include <boost/corosio/io/io_object.hpp>
      17                 : 
      18                 : #include <coroutine>
      19                 : #include <mutex>
      20                 : #include <utility>
      21                 : 
      22                 : #include <netinet/in.h>
      23                 : #include <sys/socket.h>
      24                 : #include <unistd.h>
      25                 : 
      26                 : namespace boost::corosio::detail {
      27                 : 
      28                 : /** Complete a base read/write operation.
      29                 : 
      30                 :     Translates the recorded errno and cancellation state into
      31                 :     an error_code, stores the byte count, then resumes the
      32                 :     caller via symmetric transfer.
      33                 : 
      34                 :     @tparam Op The concrete operation type.
      35                 :     @param op The operation to complete.
      36                 : */
      37                 : template<typename Op>
      38                 : void
      39 HIT       81523 : complete_io_op(Op& op)
      40                 : {
      41           81523 :     op.stop_cb.reset();
      42           81523 :     op.socket_impl_->desc_state_.scheduler_->reset_inline_budget();
      43                 : 
      44           81523 :     if (op.cancelled.load(std::memory_order_acquire))
      45             309 :         *op.ec_out = capy::error::canceled;
      46           81214 :     else if (op.errn != 0)
      47 MIS           0 :         *op.ec_out = make_err(op.errn);
      48 HIT       81214 :     else if (op.is_read_operation() && op.bytes_transferred == 0)
      49 MIS           0 :         *op.ec_out = capy::error::eof;
      50                 :     else
      51 HIT       81214 :         *op.ec_out = {};
      52                 : 
      53           81523 :     *op.bytes_out = op.bytes_transferred;
      54                 : 
      55           81523 :     op.cont_op.cont.h = op.h;
      56           81523 :     capy::executor_ref saved_ex(op.ex);
      57           81523 :     auto prevent = std::move(op.impl_ptr);
      58           81523 :     dispatch_coro(saved_ex, op.cont_op.cont).resume();
      59           81523 : }
      60                 : 
      61                 : /** Complete a connect operation with endpoint caching.
      62                 : 
      63                 :     On success, queries the local endpoint via getsockname and
      64                 :     caches both endpoints in the socket impl. Then resumes the
      65                 :     caller via symmetric transfer.
      66                 : 
      67                 :     @tparam Op The concrete connect operation type.
      68                 :     @param op The operation to complete.
      69                 : */
      70                 : template<typename Op>
      71                 : void
      72            8532 : complete_connect_op(Op& op)
      73                 : {
      74            8532 :     op.stop_cb.reset();
      75            8532 :     op.socket_impl_->desc_state_.scheduler_->reset_inline_budget();
      76                 : 
      77            8532 :     bool success =
      78            8532 :         (op.errn == 0 && !op.cancelled.load(std::memory_order_acquire));
      79                 : 
      80            8532 :     if (success && op.socket_impl_)
      81                 :     {
      82                 :         using ep_type = decltype(op.target_endpoint);
      83            8528 :         ep_type local_ep;
      84            8528 :         sockaddr_storage local_storage{};
      85            8528 :         socklen_t local_len = sizeof(local_storage);
      86            8528 :         if (::getsockname(
      87                 :                 op.fd, reinterpret_cast<sockaddr*>(&local_storage),
      88            8528 :                 &local_len) == 0)
      89            8524 :             local_ep =
      90            8528 :                 from_sockaddr_as(local_storage, local_len, ep_type{});
      91            8528 :         op.socket_impl_->set_endpoints(local_ep, op.target_endpoint);
      92                 :     }
      93                 : 
      94            8532 :     if (op.cancelled.load(std::memory_order_acquire))
      95 MIS           0 :         *op.ec_out = capy::error::canceled;
      96 HIT        8532 :     else if (op.errn != 0)
      97               4 :         *op.ec_out = make_err(op.errn);
      98                 :     else
      99            8528 :         *op.ec_out = {};
     100                 : 
     101            8532 :     op.cont_op.cont.h = op.h;
     102            8532 :     capy::executor_ref saved_ex(op.ex);
     103            8532 :     auto prevent = std::move(op.impl_ptr);
     104            8532 :     dispatch_coro(saved_ex, op.cont_op.cont).resume();
     105            8532 : }
     106                 : 
     107                 : /** Construct and register a peer socket from an accepted fd.
     108                 : 
     109                 :     Creates a new socket impl via the acceptor's associated
     110                 :     socket service, registers it with the scheduler, and caches
     111                 :     the local and remote endpoints.
     112                 : 
     113                 :     @tparam SocketImpl The concrete socket implementation type.
     114                 :     @tparam AcceptorImpl The concrete acceptor implementation type.
     115                 :     @param acceptor_impl The acceptor that accepted the connection.
     116                 :     @param accepted_fd The accepted file descriptor (set to -1 on success).
     117                 :     @param peer_storage The peer address from accept().
     118                 :     @param impl_out Output pointer for the new socket impl.
     119                 :     @param ec_out Output pointer for any error.
     120                 :     @return True on success, false on failure.
     121                 : */
     122                 : template<typename SocketImpl, typename AcceptorImpl>
     123                 : bool
     124            8518 : setup_accepted_socket(
     125                 :     AcceptorImpl* acceptor_impl,
     126                 :     int& accepted_fd,
     127                 :     sockaddr_storage const& peer_storage,
     128                 :     io_object::implementation** impl_out,
     129                 :     std::error_code* ec_out)
     130                 : {
     131            8518 :     auto* socket_svc = acceptor_impl->service().stream_service();
     132            8518 :     if (!socket_svc)
     133                 :     {
     134 MIS           0 :         *ec_out = make_err(ENOENT);
     135               0 :         return false;
     136                 :     }
     137                 : 
     138 HIT        8518 :     auto& impl = static_cast<SocketImpl&>(*socket_svc->construct());
     139            8518 :     impl.set_socket(accepted_fd);
     140                 : 
     141            8518 :     impl.desc_state_.fd = accepted_fd;
     142                 :     {
     143            8518 :         std::lock_guard lock(impl.desc_state_.mutex);
     144            8518 :         impl.desc_state_.read_op    = nullptr;
     145            8518 :         impl.desc_state_.write_op   = nullptr;
     146            8518 :         impl.desc_state_.connect_op = nullptr;
     147            8518 :     }
     148            8518 :     socket_svc->scheduler().register_descriptor(accepted_fd, &impl.desc_state_);
     149                 : 
     150                 :     using ep_type = decltype(acceptor_impl->local_endpoint());
     151            8518 :     impl.set_endpoints(
     152                 :         acceptor_impl->local_endpoint(),
     153            8518 :         from_sockaddr_as(
     154                 :             peer_storage,
     155                 :             static_cast<socklen_t>(sizeof(peer_storage)),
     156                 :             ep_type{}));
     157                 : 
     158            8518 :     if (impl_out)
     159            8518 :         *impl_out = &impl;
     160            8518 :     accepted_fd = -1;
     161            8518 :     return true;
     162                 : }
     163                 : 
     164                 : /** Complete an accept operation.
     165                 : 
     166                 :     Sets up the peer socket on success, or closes the accepted
     167                 :     fd on failure. Then resumes the caller via symmetric transfer.
     168                 : 
     169                 :     @tparam SocketImpl The concrete socket implementation type.
     170                 :     @tparam Op The concrete accept operation type.
     171                 :     @param op The operation to complete.
     172                 : */
     173                 : template<typename SocketImpl, typename Op>
     174                 : void
     175            8530 : complete_accept_op(Op& op)
     176                 : {
     177            8530 :     op.stop_cb.reset();
     178            8530 :     op.acceptor_impl_->desc_state_.scheduler_->reset_inline_budget();
     179                 : 
     180            8530 :     bool success =
     181            8530 :         (op.errn == 0 && !op.cancelled.load(std::memory_order_acquire));
     182                 : 
     183            8530 :     if (op.cancelled.load(std::memory_order_acquire))
     184              12 :         *op.ec_out = capy::error::canceled;
     185            8518 :     else if (op.errn != 0)
     186 MIS           0 :         *op.ec_out = make_err(op.errn);
     187                 :     else
     188 HIT        8518 :         *op.ec_out = {};
     189                 : 
     190            8530 :     if (success && op.accepted_fd >= 0 && op.acceptor_impl_)
     191                 :     {
     192            8518 :         if (!setup_accepted_socket<SocketImpl>(
     193            8518 :                 op.acceptor_impl_, op.accepted_fd, op.peer_storage, op.impl_out,
     194                 :                 op.ec_out))
     195 MIS           0 :             success = false;
     196                 :     }
     197                 : 
     198 HIT        8530 :     if (!success || !op.acceptor_impl_)
     199                 :     {
     200              12 :         if (op.accepted_fd >= 0)
     201                 :         {
     202 MIS           0 :             ::close(op.accepted_fd);
     203               0 :             op.accepted_fd = -1;
     204                 :         }
     205 HIT          12 :         if (op.impl_out)
     206              12 :             *op.impl_out = nullptr;
     207                 :     }
     208                 : 
     209            8530 :     op.cont_op.cont.h = op.h;
     210            8530 :     capy::executor_ref saved_ex(op.ex);
     211            8530 :     auto prevent = std::move(op.impl_ptr);
     212            8530 :     dispatch_coro(saved_ex, op.cont_op.cont).resume();
     213            8530 : }
     214                 : 
     215                 : /** Complete a datagram operation (send_to or recv_from).
     216                 : 
     217                 :     For recv_from operations, writes the source endpoint from the
     218                 :     recorded sockaddr_storage into the caller's endpoint pointer.
     219                 :     Then resumes the caller via symmetric transfer.
     220                 : 
     221                 :     @tparam Op The concrete datagram operation type.
     222                 :     @param op The operation to complete.
     223                 :     @param source_out Optional pointer to store source endpoint
     224                 :         (non-null for recv_from, null for send_to).
     225                 : */
     226                 : template<typename Op, typename Endpoint>
     227                 : void
     228              18 : complete_datagram_op(Op& op, Endpoint* source_out)
     229                 : {
     230              18 :     op.stop_cb.reset();
     231              18 :     op.socket_impl_->desc_state_.scheduler_->reset_inline_budget();
     232                 : 
     233              18 :     if (op.cancelled.load(std::memory_order_acquire))
     234               6 :         *op.ec_out = capy::error::canceled;
     235              12 :     else if (op.errn != 0)
     236 MIS           0 :         *op.ec_out = make_err(op.errn);
     237                 :     else
     238 HIT          12 :         *op.ec_out = {};
     239                 : 
     240              18 :     *op.bytes_out = op.bytes_transferred;
     241                 : 
     242              28 :     if (source_out && !op.cancelled.load(std::memory_order_acquire) &&
     243              10 :         op.errn == 0)
     244              20 :         *source_out = from_sockaddr_as(
     245              10 :             op.source_storage,
     246                 :             op.source_addrlen,
     247                 :             Endpoint{});
     248                 : 
     249              18 :     op.cont_op.cont.h = op.h;
     250              18 :     capy::executor_ref saved_ex(op.ex);
     251              18 :     auto prevent = std::move(op.impl_ptr);
     252              18 :     dispatch_coro(saved_ex, op.cont_op.cont).resume();
     253              18 : }
     254                 : 
     255                 : } // namespace boost::corosio::detail
     256                 : 
     257                 : #endif // BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_OP_COMPLETE_HPP
        

Generated by: LCOV version 2.3