LCOV - code coverage report
Current view: top level - corosio/native/detail/reactor - reactor_basic_socket.hpp (source / functions) Coverage Total Hit Missed
Test: coverage_remapped.info Lines: 90.2 % 174 157 17
Test Date: 2026-04-09 23:21:11 Functions: 63.3 % 294 186 108

           TLA  Line data    Source code
       1                 : //
       2                 : // Copyright (c) 2026 Steve Gerbino
       3                 : //
       4                 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5                 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6                 : //
       7                 : // Official repository: https://github.com/cppalliance/corosio
       8                 : //
       9                 : 
      10                 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_BASIC_SOCKET_HPP
      11                 : #define BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_BASIC_SOCKET_HPP
      12                 : 
      13                 : #include <boost/corosio/detail/intrusive.hpp>
      14                 : #include <boost/corosio/detail/native_handle.hpp>
      15                 : #include <boost/corosio/endpoint.hpp>
      16                 : #include <boost/corosio/native/detail/reactor/reactor_op_base.hpp>
      17                 : #include <boost/corosio/native/detail/make_err.hpp>
      18                 : #include <boost/corosio/native/detail/endpoint_convert.hpp>
      19                 : 
      20                 : #include <memory>
      21                 : #include <mutex>
      22                 : #include <utility>
      23                 : 
      24                 : #include <errno.h>
      25                 : #include <netinet/in.h>
      26                 : #include <sys/socket.h>
      27                 : #include <unistd.h>
      28                 : 
      29                 : namespace boost::corosio::detail {
      30                 : 
      31                 : /** CRTP base for reactor-backed socket implementations.
      32                 : 
      33                 :     Extracts the shared data members, virtual overrides, and
      34                 :     cancel/close/register logic that is identical across TCP
      35                 :     (reactor_stream_socket) and UDP (reactor_datagram_socket).
      36                 : 
      37                 :     Derived classes provide CRTP callbacks that enumerate their
      38                 :     specific op slots so cancel/close can iterate them generically.
      39                 : 
      40                 :     @tparam Derived   The concrete socket type (CRTP).
      41                 :     @tparam ImplBase  The public vtable base (tcp_socket::implementation
      42                 :                       or udp_socket::implementation).
      43                 :     @tparam Service   The backend's service type.
      44                 :     @tparam DescState The backend's descriptor_state type.
      45                 :     @tparam Endpoint  The endpoint type (endpoint or local_endpoint).
      46                 : */
      47                 : template<
      48                 :     class Derived,
      49                 :     class ImplBase,
      50                 :     class Service,
      51                 :     class DescState,
      52                 :     class Endpoint = endpoint>
      53                 : class reactor_basic_socket
      54                 :     : public ImplBase
      55                 :     , public std::enable_shared_from_this<Derived>
      56                 :     , public intrusive_list<Derived>::node
      57                 : {
      58                 :     friend Derived;
      59                 : 
      60                 :     template<class, class, class, class, class, class, class, class>
      61                 :     friend class reactor_stream_socket;
      62                 : 
      63                 :     template<class, class, class, class, class, class, class, class, class, class>
      64                 :     friend class reactor_datagram_socket;
      65                 : 
      66 HIT       25797 :     explicit reactor_basic_socket(Service& svc) noexcept : svc_(svc) {}
      67                 : 
      68                 : protected:
      69                 :     Service& svc_;
      70                 :     int fd_ = -1;
      71                 :     Endpoint local_endpoint_;
      72                 : 
      73                 : public:
      74                 :     /// Per-descriptor state for persistent reactor registration.
      75                 :     DescState desc_state_;
      76                 : 
      77           25797 :     ~reactor_basic_socket() override = default;
      78                 : 
      79                 :     /// Return the underlying file descriptor.
      80           78062 :     native_handle_type native_handle() const noexcept override
      81                 :     {
      82           78062 :         return fd_;
      83                 :     }
      84                 : 
      85                 :     /// Return the cached local endpoint.
      86              80 :     Endpoint local_endpoint() const noexcept override
      87                 :     {
      88              80 :         return local_endpoint_;
      89                 :     }
      90                 : 
      91                 :     /// Return true if the socket has an open file descriptor.
      92                 :     bool is_open() const noexcept
      93                 :     {
      94                 :         return fd_ >= 0;
      95                 :     }
      96                 : 
      97                 :     /// Set a socket option.
      98              20 :     std::error_code set_option(
      99                 :         int level,
     100                 :         int optname,
     101                 :         void const* data,
     102                 :         std::size_t size) noexcept override
     103                 :     {
     104              20 :         if (::setsockopt(
     105              20 :                 fd_, level, optname, data, static_cast<socklen_t>(size)) != 0)
     106 MIS           0 :             return make_err(errno);
     107 HIT          20 :         return {};
     108                 :     }
     109                 : 
     110                 :     /// Get a socket option.
     111                 :     std::error_code
     112              78 :     get_option(int level, int optname, void* data, std::size_t* size)
     113                 :         const noexcept override
     114                 :     {
     115              78 :         socklen_t len = static_cast<socklen_t>(*size);
     116              78 :         if (::getsockopt(fd_, level, optname, data, &len) != 0)
     117 MIS           0 :             return make_err(errno);
     118 HIT          78 :         *size = static_cast<std::size_t>(len);
     119              78 :         return {};
     120                 :     }
     121                 : 
     122                 :     /// Assign the file descriptor.
     123            8518 :     void set_socket(int fd) noexcept
     124                 :     {
     125            8518 :         fd_ = fd;
     126            8518 :     }
     127                 : 
     128                 :     /// Assign the fd, initialize descriptor state, and register with the reactor.
     129            8694 :     void init_and_register(int fd) noexcept
     130                 :     {
     131            8694 :         fd_ = fd;
     132            8694 :         desc_state_.fd = fd;
     133                 :         {
     134            8694 :             std::lock_guard lock(desc_state_.mutex);
     135            8694 :             desc_state_.read_op    = nullptr;
     136            8694 :             desc_state_.write_op   = nullptr;
     137            8694 :             desc_state_.connect_op = nullptr;
     138            8694 :         }
     139            8694 :         svc_.scheduler().register_descriptor(fd, &desc_state_);
     140            8694 :     }
     141                 : 
     142                 :     /// Cache the local endpoint.
     143                 :     void set_local_endpoint(Endpoint ep) noexcept
     144                 :     {
     145                 :         local_endpoint_ = ep;
     146                 :     }
     147                 : 
     148                 :     /** Bind the socket to a local endpoint.
     149                 : 
     150                 :         Calls ::bind() and caches the resulting local endpoint
     151                 :         via getsockname().
     152                 : 
     153                 :         @param ep The endpoint to bind to.
     154                 :         @return Error code on failure, empty on success.
     155                 :     */
     156              76 :     std::error_code do_bind(Endpoint const& ep) noexcept
     157                 :     {
     158              76 :         sockaddr_storage storage{};
     159              76 :         socklen_t addrlen = to_sockaddr(ep, socket_family(fd_), storage);
     160              76 :         if (::bind(fd_, reinterpret_cast<sockaddr*>(&storage), addrlen) != 0)
     161              10 :             return make_err(errno);
     162                 : 
     163              66 :         sockaddr_storage local_storage{};
     164              66 :         socklen_t local_len = sizeof(local_storage);
     165              66 :         if (::getsockname(
     166              66 :                 fd_, reinterpret_cast<sockaddr*>(&local_storage), &local_len) ==
     167                 :             0)
     168              52 :             local_endpoint_ =
     169              66 :                 from_sockaddr_as(local_storage, local_len, Endpoint{});
     170                 : 
     171              66 :         return {};
     172                 :     }
     173                 : 
     174                 :     /** Register an op with the reactor.
     175                 : 
     176                 :         Handles cached edge events and deferred cancellation.
     177                 :         Called on the EAGAIN/EINPROGRESS path when speculative
     178                 :         I/O failed.
     179                 :     */
     180                 :     template<class Op>
     181                 :     void register_op(
     182                 :         Op& op,
     183                 :         reactor_op_base*& desc_slot,
     184                 :         bool& ready_flag,
     185                 :         bool& cancel_flag,
     186                 :         bool is_write_direction = false) noexcept;
     187                 : 
     188                 :     /** Cancel a single pending operation.
     189                 : 
     190                 :         Claims the operation from its descriptor_state slot under
     191                 :         the mutex and posts it to the scheduler as cancelled.
     192                 :         Derived must implement:
     193                 :           op_to_desc_slot(Op&) -> reactor_op_base**
     194                 :           op_to_cancel_flag(Op&) -> bool*
     195                 :     */
     196                 :     template<class Op>
     197                 :     void cancel_single_op(Op& op) noexcept;
     198                 : 
     199                 :     /** Cancel all pending operations.
     200                 : 
     201                 :         Invoked by the derived class's cancel() override.
     202                 :         Derived must implement:
     203                 :           for_each_op(auto fn)
     204                 :           for_each_desc_entry(auto fn)
     205                 :     */
     206                 :     void do_cancel() noexcept;
     207                 : 
     208                 :     /** Close the socket and cancel pending operations.
     209                 : 
     210                 :         Invoked by the derived class's close_socket(). The
     211                 :         derived class may add backend-specific cleanup after
     212                 :         calling this method.
     213                 :         Derived must implement:
     214                 :           for_each_op(auto fn)
     215                 :           for_each_desc_entry(auto fn)
     216                 :     */
     217                 :     void do_close_socket() noexcept;
     218                 : 
     219                 :     /** Release the socket without closing the fd.
     220                 : 
     221                 :         Like do_close_socket() but does not call ::close().
     222                 :         Returns the fd so the caller can take ownership.
     223                 :     */
     224                 :     native_handle_type do_release_socket() noexcept;
     225                 : };
     226                 : 
     227                 : template<class Derived, class ImplBase, class Service, class DescState, class Endpoint>
     228                 : template<class Op>
     229                 : void
     230            8915 : reactor_basic_socket<Derived, ImplBase, Service, DescState, Endpoint>::register_op(
     231                 :     Op& op,
     232                 :     reactor_op_base*& desc_slot,
     233                 :     bool& ready_flag,
     234                 :     bool& cancel_flag,
     235                 :     bool is_write_direction) noexcept
     236                 : {
     237            8915 :     svc_.work_started();
     238                 : 
     239            8915 :     std::lock_guard lock(desc_state_.mutex);
     240            8915 :     bool io_done = false;
     241            8915 :     if (ready_flag)
     242                 :     {
     243             185 :         ready_flag = false;
     244             185 :         op.perform_io();
     245             185 :         io_done = (op.errn != EAGAIN && op.errn != EWOULDBLOCK);
     246             185 :         if (!io_done)
     247             185 :             op.errn = 0;
     248                 :     }
     249                 : 
     250            8915 :     if (cancel_flag)
     251                 :     {
     252 MIS           0 :         cancel_flag = false;
     253               0 :         op.cancelled.store(true, std::memory_order_relaxed);
     254                 :     }
     255                 : 
     256 HIT        8915 :     if (io_done || op.cancelled.load(std::memory_order_acquire))
     257                 :     {
     258 MIS           0 :         svc_.post(&op);
     259               0 :         svc_.work_finished();
     260                 :     }
     261                 :     else
     262                 :     {
     263 HIT        8915 :         desc_slot = &op;
     264                 : 
     265                 :         // Select must rebuild its fd_sets when a write-direction op
     266                 :         // is parked, so select() watches for writability. Compiled
     267                 :         // away to nothing for epoll and kqueue.
     268                 :         if constexpr (Service::needs_write_notification)
     269                 :         {
     270            3809 :             if (is_write_direction)
     271            3617 :                 svc_.scheduler().notify_reactor();
     272                 :         }
     273                 :     }
     274            8915 : }
     275                 : 
     276                 : template<class Derived, class ImplBase, class Service, class DescState, class Endpoint>
     277                 : template<class Op>
     278                 : void
     279             194 : reactor_basic_socket<Derived, ImplBase, Service, DescState, Endpoint>::cancel_single_op(
     280                 :     Op& op) noexcept
     281                 : {
     282             194 :     auto self = this->weak_from_this().lock();
     283             194 :     if (!self)
     284 MIS           0 :         return;
     285                 : 
     286 HIT         194 :     op.request_cancel();
     287                 : 
     288             194 :     auto* d                       = static_cast<Derived*>(this);
     289             194 :     reactor_op_base** desc_op_ptr = d->op_to_desc_slot(op);
     290                 : 
     291             194 :     if (desc_op_ptr)
     292                 :     {
     293             194 :         reactor_op_base* claimed = nullptr;
     294                 :         {
     295             194 :             std::lock_guard lock(desc_state_.mutex);
     296             194 :             if (*desc_op_ptr == &op)
     297             194 :                 claimed = std::exchange(*desc_op_ptr, nullptr);
     298                 :             else
     299                 :             {
     300 MIS           0 :                 bool* cflag = d->op_to_cancel_flag(op);
     301               0 :                 if (cflag)
     302               0 :                     *cflag = true;
     303                 :             }
     304 HIT         194 :         }
     305             194 :         if (claimed)
     306                 :         {
     307             194 :             op.impl_ptr = self;
     308             194 :             svc_.post(&op);
     309             194 :             svc_.work_finished();
     310                 :         }
     311                 :     }
     312             194 : }
     313                 : 
     314                 : template<class Derived, class ImplBase, class Service, class DescState, class Endpoint>
     315                 : void
     316             188 : reactor_basic_socket<Derived, ImplBase, Service, DescState, Endpoint>::
     317                 :     do_cancel() noexcept
     318                 : {
     319             188 :     auto self = this->weak_from_this().lock();
     320             188 :     if (!self)
     321 MIS           0 :         return;
     322                 : 
     323 HIT         188 :     auto* d = static_cast<Derived*>(this);
     324                 : 
     325             760 :     d->for_each_op([](auto& op) { op.request_cancel(); });
     326                 : 
     327                 :     // Claim ops under a single lock acquisition
     328                 :     struct claimed_entry
     329                 :     {
     330                 :         reactor_op_base* op   = nullptr;
     331                 :         reactor_op_base* base = nullptr;
     332                 :     };
     333                 :     // Max 3 ops (conn, rd, wr)
     334             188 :     claimed_entry claimed[3];
     335             188 :     int count = 0;
     336                 : 
     337                 :     {
     338             188 :         std::lock_guard lock(desc_state_.mutex);
     339            1332 :         d->for_each_desc_entry([&](auto& op, reactor_op_base*& desc_slot) {
     340             572 :             if (desc_slot == &op)
     341                 :             {
     342             101 :                 claimed[count].op   = std::exchange(desc_slot, nullptr);
     343             101 :                 claimed[count].base = &op;
     344             101 :                 ++count;
     345                 :             }
     346                 :         });
     347             188 :     }
     348                 : 
     349             289 :     for (int i = 0; i < count; ++i)
     350                 :     {
     351             101 :         claimed[i].base->impl_ptr = self;
     352             101 :         svc_.post(claimed[i].base);
     353             101 :         svc_.work_finished();
     354                 :     }
     355             188 : }
     356                 : 
     357                 : template<class Derived, class ImplBase, class Service, class DescState, class Endpoint>
     358                 : void
     359           77498 : reactor_basic_socket<Derived, ImplBase, Service, DescState, Endpoint>::
     360                 :     do_close_socket() noexcept
     361                 : {
     362           77498 :     auto self = this->weak_from_this().lock();
     363           77498 :     if (self)
     364                 :     {
     365           77498 :         auto* d = static_cast<Derived*>(this);
     366                 : 
     367          310920 :         d->for_each_op([](auto& op) { op.request_cancel(); });
     368                 : 
     369                 :         struct claimed_entry
     370                 :         {
     371                 :             reactor_op_base* base = nullptr;
     372                 :         };
     373           77498 :         claimed_entry claimed[3];
     374           77498 :         int count = 0;
     375                 : 
     376                 :         {
     377           77498 :             std::lock_guard lock(desc_state_.mutex);
     378           77498 :             d->for_each_desc_entry(
     379          466844 :                 [&](auto& /*op*/, reactor_op_base*& desc_slot) {
     380          233422 :                     auto* c = std::exchange(desc_slot, nullptr);
     381          233422 :                     if (c)
     382                 :                     {
     383               4 :                         claimed[count].base = c;
     384               4 :                         ++count;
     385                 :                     }
     386                 :                 });
     387           77498 :             desc_state_.read_ready             = false;
     388           77498 :             desc_state_.write_ready            = false;
     389           77498 :             desc_state_.read_cancel_pending    = false;
     390           77498 :             desc_state_.write_cancel_pending   = false;
     391           77498 :             desc_state_.connect_cancel_pending = false;
     392                 : 
     393           77498 :             if (desc_state_.is_enqueued_.load(std::memory_order_acquire))
     394             271 :                 desc_state_.impl_ref_ = self;
     395           77498 :         }
     396                 : 
     397           77502 :         for (int i = 0; i < count; ++i)
     398                 :         {
     399               4 :             claimed[i].base->impl_ptr = self;
     400               4 :             svc_.post(claimed[i].base);
     401               4 :             svc_.work_finished();
     402                 :         }
     403                 :     }
     404                 : 
     405           77498 :     if (fd_ >= 0)
     406                 :     {
     407           17210 :         if (desc_state_.registered_events != 0)
     408           17210 :             svc_.scheduler().deregister_descriptor(fd_);
     409           17210 :         ::close(fd_);
     410           17210 :         fd_ = -1;
     411                 :     }
     412                 : 
     413           77498 :     desc_state_.fd                = -1;
     414           77498 :     desc_state_.registered_events = 0;
     415                 : 
     416           77498 :     local_endpoint_ = Endpoint{};
     417           77498 : }
     418                 : 
     419                 : template<class Derived, class ImplBase, class Service, class DescState, class Endpoint>
     420                 : native_handle_type
     421               2 : reactor_basic_socket<Derived, ImplBase, Service, DescState, Endpoint>::
     422                 :     do_release_socket() noexcept
     423                 : {
     424                 :     // Cancel pending ops (same as do_close_socket)
     425               2 :     auto self = this->weak_from_this().lock();
     426               2 :     if (self)
     427                 :     {
     428               2 :         auto* d = static_cast<Derived*>(this);
     429                 : 
     430               8 :         d->for_each_op([](auto& op) { op.request_cancel(); });
     431                 : 
     432                 :         struct claimed_entry
     433                 :         {
     434                 :             reactor_op_base* base = nullptr;
     435                 :         };
     436               2 :         claimed_entry claimed[3];
     437               2 :         int count = 0;
     438                 : 
     439                 :         {
     440               2 :             std::lock_guard lock(desc_state_.mutex);
     441               2 :             d->for_each_desc_entry(
     442              12 :                 [&](auto& /*op*/, reactor_op_base*& desc_slot) {
     443               6 :                     auto* c = std::exchange(desc_slot, nullptr);
     444               6 :                     if (c)
     445                 :                     {
     446 MIS           0 :                         claimed[count].base = c;
     447               0 :                         ++count;
     448                 :                     }
     449                 :                 });
     450 HIT           2 :             desc_state_.read_ready             = false;
     451               2 :             desc_state_.write_ready            = false;
     452               2 :             desc_state_.read_cancel_pending    = false;
     453               2 :             desc_state_.write_cancel_pending   = false;
     454               2 :             desc_state_.connect_cancel_pending = false;
     455                 : 
     456               2 :             if (desc_state_.is_enqueued_.load(std::memory_order_acquire))
     457 MIS           0 :                 desc_state_.impl_ref_ = self;
     458 HIT           2 :         }
     459                 : 
     460               2 :         for (int i = 0; i < count; ++i)
     461                 :         {
     462 MIS           0 :             claimed[i].base->impl_ptr = self;
     463               0 :             svc_.post(claimed[i].base);
     464               0 :             svc_.work_finished();
     465                 :         }
     466                 :     }
     467                 : 
     468 HIT           2 :     native_handle_type released = fd_;
     469                 : 
     470               2 :     if (fd_ >= 0)
     471                 :     {
     472               2 :         if (desc_state_.registered_events != 0)
     473               2 :             svc_.scheduler().deregister_descriptor(fd_);
     474                 :         // Do NOT close -- caller takes ownership
     475               2 :         fd_ = -1;
     476                 :     }
     477                 : 
     478               2 :     desc_state_.fd                = -1;
     479               2 :     desc_state_.registered_events = 0;
     480                 : 
     481               2 :     local_endpoint_ = Endpoint{};
     482                 : 
     483               4 :     return released;
     484               2 : }
     485                 : 
     486                 : } // namespace boost::corosio::detail
     487                 : 
     488                 : #endif // BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_BASIC_SOCKET_HPP
        

Generated by: LCOV version 2.3