LCOV - code coverage report
Current view: top level - corosio/native/detail/select - select_scheduler.hpp (source / functions) Coverage Total Hit Missed
Test: coverage_remapped.info Lines: 87.9 % 165 145 20
Test Date: 2026-04-09 23:21:11 Functions: 100.0 % 11 11

           TLA  Line data    Source code
       1                 : //
       2                 : // Copyright (c) 2026 Steve Gerbino
       3                 : //
       4                 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5                 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6                 : //
       7                 : // Official repository: https://github.com/cppalliance/corosio
       8                 : //
       9                 : 
      10                 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SCHEDULER_HPP
      11                 : #define BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SCHEDULER_HPP
      12                 : 
      13                 : #include <boost/corosio/detail/platform.hpp>
      14                 : 
      15                 : #if BOOST_COROSIO_HAS_SELECT
      16                 : 
      17                 : #include <boost/corosio/detail/config.hpp>
      18                 : #include <boost/capy/ex/execution_context.hpp>
      19                 : 
      20                 : #include <boost/corosio/native/detail/reactor/reactor_scheduler.hpp>
      21                 : 
      22                 : #include <boost/corosio/native/detail/select/select_op.hpp>
      23                 : #include <boost/corosio/detail/timer_service.hpp>
      24                 : #include <boost/corosio/native/detail/make_err.hpp>
      25                 : #include <boost/corosio/native/detail/posix/posix_resolver_service.hpp>
      26                 : #include <boost/corosio/native/detail/posix/posix_signal_service.hpp>
      27                 : #include <boost/corosio/native/detail/posix/posix_stream_file_service.hpp>
      28                 : #include <boost/corosio/native/detail/posix/posix_random_access_file_service.hpp>
      29                 : 
      30                 : #include <boost/corosio/detail/except.hpp>
      31                 : 
      32                 : #include <sys/select.h>
      33                 : #include <unistd.h>
      34                 : #include <errno.h>
      35                 : #include <fcntl.h>
      36                 : 
      37                 : #include <atomic>
      38                 : #include <chrono>
      39                 : #include <cstdint>
      40                 : #include <limits>
      41                 : #include <mutex>
      42                 : #include <unordered_map>
      43                 : 
      44                 : namespace boost::corosio::detail {
      45                 : 
      46                 : struct select_op;
      47                 : struct select_descriptor_state;
      48                 : 
      49                 : /** POSIX scheduler using select() for I/O multiplexing.
      50                 : 
      51                 :     This scheduler implements the scheduler interface using the POSIX select()
      52                 :     call for I/O event notification. It inherits the shared reactor threading
      53                 :     model from reactor_scheduler: signal state machine, inline completion
      54                 :     budget, work counting, and the do_one event loop.
      55                 : 
      56                 :     The design mirrors epoll_scheduler for behavioral consistency:
      57                 :     - Same single-reactor thread coordination model
      58                 :     - Same deferred I/O pattern (reactor marks ready; workers do I/O)
      59                 :     - Same timer integration pattern
      60                 : 
      61                 :     Known Limitations:
      62                 :     - FD_SETSIZE (~1024) limits maximum concurrent connections
      63                 :     - O(n) scanning: rebuilds fd_sets each iteration
      64                 :     - Level-triggered only (no edge-triggered mode)
      65                 : 
      66                 :     @par Thread Safety
      67                 :     All public member functions are thread-safe.
      68                 : */
      69                 : class BOOST_COROSIO_DECL select_scheduler final : public reactor_scheduler
      70                 : {
      71                 : public:
      72                 :     /// Select needs write-direction notification to rebuild fd_sets.
      73                 :     static constexpr bool needs_write_notification = true;
      74                 : 
      75                 :     /** Construct the scheduler.
      76                 : 
      77                 :         Creates a self-pipe for reactor interruption.
      78                 : 
      79                 :         @param ctx Reference to the owning execution_context.
      80                 :         @param concurrency_hint Hint for expected thread count (unused).
      81                 :     */
      82                 :     select_scheduler(capy::execution_context& ctx, int concurrency_hint = -1);
      83                 : 
      84                 :     /// Destroy the scheduler.
      85                 :     ~select_scheduler() override;
      86                 : 
      87                 :     select_scheduler(select_scheduler const&)            = delete;
      88                 :     select_scheduler& operator=(select_scheduler const&) = delete;
      89                 : 
      90                 :     /// Shut down the scheduler, draining pending operations.
      91                 :     void shutdown() override;
      92                 : 
      93                 :     /** Return the maximum file descriptor value supported.
      94                 : 
      95                 :         Returns FD_SETSIZE - 1, the maximum fd value that can be
      96                 :         monitored by select(). Operations with fd >= FD_SETSIZE
      97                 :         will fail with EINVAL.
      98                 : 
      99                 :         @return The maximum supported file descriptor value.
     100                 :     */
     101                 :     static constexpr int max_fd() noexcept
     102                 :     {
     103                 :         return FD_SETSIZE - 1;
     104                 :     }
     105                 : 
     106                 :     /** Register a descriptor for persistent monitoring.
     107                 : 
     108                 :         The fd is added to the registered_descs_ map and will be
     109                 :         included in subsequent select() calls. The reactor is
     110                 :         interrupted so a blocked select() rebuilds its fd_sets.
     111                 : 
     112                 :         @param fd The file descriptor to register.
     113                 :         @param desc Pointer to descriptor state for this fd.
     114                 :     */
     115                 :     void register_descriptor(int fd, select_descriptor_state* desc) const;
     116                 : 
     117                 :     /** Deregister a persistently registered descriptor.
     118                 : 
     119                 :         @param fd The file descriptor to deregister.
     120                 :     */
     121                 :     void deregister_descriptor(int fd) const;
     122                 : 
     123                 :     /** Interrupt the reactor so it rebuilds its fd_sets.
     124                 : 
     125                 :         Called when a write or connect op is registered after
     126                 :         the reactor's snapshot was taken. Without this, select()
     127                 :         may block not watching for writability on the fd.
     128                 :     */
     129                 :     void notify_reactor() const;
     130                 : 
     131                 : private:
     132                 :     void
     133                 :     run_task(lock_type& lock, context_type* ctx,
     134                 :         long timeout_us) override;
     135                 :     void interrupt_reactor() const override;
     136                 :     long calculate_timeout(long requested_timeout_us) const;
     137                 : 
     138                 :     // Self-pipe for interrupting select()
     139                 :     int pipe_fds_[2]; // [0]=read, [1]=write
     140                 : 
     141                 :     // Per-fd tracking for fd_set building
     142                 :     mutable std::unordered_map<int, select_descriptor_state*> registered_descs_;
     143                 :     mutable int max_fd_ = -1;
     144                 : };
     145                 : 
     146 HIT         230 : inline select_scheduler::select_scheduler(capy::execution_context& ctx, int)
     147             230 :     : pipe_fds_{-1, -1}
     148             230 :     , max_fd_(-1)
     149                 : {
     150             230 :     if (::pipe(pipe_fds_) < 0)
     151 MIS           0 :         detail::throw_system_error(make_err(errno), "pipe");
     152                 : 
     153 HIT         690 :     for (int i = 0; i < 2; ++i)
     154                 :     {
     155             460 :         int flags = ::fcntl(pipe_fds_[i], F_GETFL, 0);
     156             460 :         if (flags == -1)
     157                 :         {
     158 MIS           0 :             int errn = errno;
     159               0 :             ::close(pipe_fds_[0]);
     160               0 :             ::close(pipe_fds_[1]);
     161               0 :             detail::throw_system_error(make_err(errn), "fcntl F_GETFL");
     162                 :         }
     163 HIT         460 :         if (::fcntl(pipe_fds_[i], F_SETFL, flags | O_NONBLOCK) == -1)
     164                 :         {
     165 MIS           0 :             int errn = errno;
     166               0 :             ::close(pipe_fds_[0]);
     167               0 :             ::close(pipe_fds_[1]);
     168               0 :             detail::throw_system_error(make_err(errn), "fcntl F_SETFL");
     169                 :         }
     170 HIT         460 :         if (::fcntl(pipe_fds_[i], F_SETFD, FD_CLOEXEC) == -1)
     171                 :         {
     172 MIS           0 :             int errn = errno;
     173               0 :             ::close(pipe_fds_[0]);
     174               0 :             ::close(pipe_fds_[1]);
     175               0 :             detail::throw_system_error(make_err(errn), "fcntl F_SETFD");
     176                 :         }
     177                 :     }
     178                 : 
     179 HIT         230 :     timer_svc_ = &get_timer_service(ctx, *this);
     180             230 :     timer_svc_->set_on_earliest_changed(
     181            4071 :         timer_service::callback(this, [](void* p) {
     182            3841 :             static_cast<select_scheduler*>(p)->interrupt_reactor();
     183            3841 :         }));
     184                 : 
     185             230 :     get_resolver_service(ctx, *this);
     186             230 :     get_signal_service(ctx, *this);
     187             230 :     get_stream_file_service(ctx, *this);
     188             230 :     get_random_access_file_service(ctx, *this);
     189                 : 
     190             230 :     completed_ops_.push(&task_op_);
     191             230 : }
     192                 : 
     193             460 : inline select_scheduler::~select_scheduler()
     194                 : {
     195             230 :     if (pipe_fds_[0] >= 0)
     196             230 :         ::close(pipe_fds_[0]);
     197             230 :     if (pipe_fds_[1] >= 0)
     198             230 :         ::close(pipe_fds_[1]);
     199             460 : }
     200                 : 
     201                 : inline void
     202             230 : select_scheduler::shutdown()
     203                 : {
     204             230 :     shutdown_drain();
     205                 : 
     206             230 :     if (pipe_fds_[1] >= 0)
     207             230 :         interrupt_reactor();
     208             230 : }
     209                 : 
     210                 : inline void
     211            7383 : select_scheduler::register_descriptor(
     212                 :     int fd, select_descriptor_state* desc) const
     213                 : {
     214            7383 :     if (fd < 0 || fd >= FD_SETSIZE)
     215 MIS           0 :         detail::throw_system_error(make_err(EINVAL), "select: fd out of range");
     216                 : 
     217 HIT        7383 :     desc->registered_events = reactor_event_read | reactor_event_write;
     218            7383 :     desc->fd                = fd;
     219            7383 :     desc->scheduler_        = this;
     220            7383 :     desc->mutex.set_enabled(!single_threaded_);
     221            7383 :     desc->ready_events_.store(0, std::memory_order_relaxed);
     222                 : 
     223                 :     {
     224            7383 :         conditionally_enabled_mutex::scoped_lock lock(desc->mutex);
     225            7383 :         desc->impl_ref_.reset();
     226            7383 :         desc->read_ready  = false;
     227            7383 :         desc->write_ready = false;
     228            7383 :     }
     229                 : 
     230                 :     {
     231            7383 :         mutex_type::scoped_lock lock(mutex_);
     232            7383 :         registered_descs_[fd] = desc;
     233            7383 :         if (fd > max_fd_)
     234            7379 :             max_fd_ = fd;
     235            7383 :     }
     236                 : 
     237            7383 :     interrupt_reactor();
     238            7383 : }
     239                 : 
     240                 : inline void
     241            7383 : select_scheduler::deregister_descriptor(int fd) const
     242                 : {
     243            7383 :     mutex_type::scoped_lock lock(mutex_);
     244                 : 
     245            7383 :     auto it = registered_descs_.find(fd);
     246            7383 :     if (it == registered_descs_.end())
     247 MIS           0 :         return;
     248                 : 
     249 HIT        7383 :     registered_descs_.erase(it);
     250                 : 
     251            7383 :     if (fd == max_fd_)
     252                 :     {
     253            7321 :         max_fd_ = pipe_fds_[0];
     254           14532 :         for (auto& [registered_fd, state] : registered_descs_)
     255                 :         {
     256            7211 :             if (registered_fd > max_fd_)
     257            7201 :                 max_fd_ = registered_fd;
     258                 :         }
     259                 :     }
     260            7383 : }
     261                 : 
     262                 : inline void
     263            3617 : select_scheduler::notify_reactor() const
     264                 : {
     265            3617 :     interrupt_reactor();
     266            3617 : }
     267                 : 
     268                 : inline void
     269           15226 : select_scheduler::interrupt_reactor() const
     270                 : {
     271           15226 :     char byte               = 1;
     272           15226 :     [[maybe_unused]] auto r = ::write(pipe_fds_[1], &byte, 1);
     273           15226 : }
     274                 : 
     275                 : inline long
     276          121870 : select_scheduler::calculate_timeout(long requested_timeout_us) const
     277                 : {
     278          121870 :     if (requested_timeout_us == 0)
     279 MIS           0 :         return 0;
     280                 : 
     281 HIT      121870 :     auto nearest = timer_svc_->nearest_expiry();
     282          121870 :     if (nearest == timer_service::time_point::max())
     283              46 :         return requested_timeout_us;
     284                 : 
     285          121824 :     auto now = std::chrono::steady_clock::now();
     286          121824 :     if (nearest <= now)
     287             374 :         return 0;
     288                 : 
     289                 :     auto timer_timeout_us =
     290          121450 :         std::chrono::duration_cast<std::chrono::microseconds>(nearest - now)
     291          121450 :             .count();
     292                 : 
     293          121450 :     constexpr auto long_max =
     294                 :         static_cast<long long>((std::numeric_limits<long>::max)());
     295                 :     auto capped_timer_us =
     296          121450 :         (std::min)((std::max)(static_cast<long long>(timer_timeout_us),
     297          121450 :                               static_cast<long long>(0)),
     298          121450 :                    long_max);
     299                 : 
     300          121450 :     if (requested_timeout_us < 0)
     301          121444 :         return static_cast<long>(capped_timer_us);
     302                 : 
     303                 :     return static_cast<long>(
     304               6 :         (std::min)(static_cast<long long>(requested_timeout_us),
     305               6 :                    capped_timer_us));
     306                 : }
     307                 : 
     308                 : inline void
     309          146301 : select_scheduler::run_task(
     310                 :     lock_type& lock, context_type* ctx, long timeout_us)
     311                 : {
     312                 :     long effective_timeout_us =
     313          146301 :         task_interrupted_ ? 0 : calculate_timeout(timeout_us);
     314                 : 
     315                 :     // Snapshot registered descriptors while holding lock.
     316                 :     // Record which fds need write monitoring to avoid a hot loop:
     317                 :     // select is level-triggered so writable sockets (nearly always
     318                 :     // writable) would cause select() to return immediately every
     319                 :     // iteration if unconditionally added to write_fds.
     320                 :     struct fd_entry
     321                 :     {
     322                 :         int fd;
     323                 :         select_descriptor_state* desc;
     324                 :         bool needs_write;
     325                 :     };
     326                 :     fd_entry snapshot[FD_SETSIZE];
     327          146301 :     int snapshot_count = 0;
     328                 : 
     329          434243 :     for (auto& [fd, desc] : registered_descs_)
     330                 :     {
     331          287942 :         if (snapshot_count < FD_SETSIZE)
     332                 :         {
     333          287942 :             conditionally_enabled_mutex::scoped_lock desc_lock(desc->mutex);
     334          287942 :             snapshot[snapshot_count].fd   = fd;
     335          287942 :             snapshot[snapshot_count].desc = desc;
     336          287942 :             snapshot[snapshot_count].needs_write =
     337          287942 :                 (desc->write_op || desc->connect_op);
     338          287942 :             ++snapshot_count;
     339          287942 :         }
     340                 :     }
     341                 : 
     342          146301 :     if (lock.owns_lock())
     343          121870 :         lock.unlock();
     344                 : 
     345          146301 :     task_cleanup on_exit{this, &lock, ctx};
     346                 : 
     347                 :     fd_set read_fds, write_fds, except_fds;
     348         2487117 :     FD_ZERO(&read_fds);
     349         2487117 :     FD_ZERO(&write_fds);
     350         2487117 :     FD_ZERO(&except_fds);
     351                 : 
     352          146301 :     FD_SET(pipe_fds_[0], &read_fds);
     353          146301 :     int nfds = pipe_fds_[0];
     354                 : 
     355          434243 :     for (int i = 0; i < snapshot_count; ++i)
     356                 :     {
     357          287942 :         int fd = snapshot[i].fd;
     358          287942 :         FD_SET(fd, &read_fds);
     359          287942 :         if (snapshot[i].needs_write)
     360            3617 :             FD_SET(fd, &write_fds);
     361          287942 :         FD_SET(fd, &except_fds);
     362          287942 :         if (fd > nfds)
     363          146035 :             nfds = fd;
     364                 :     }
     365                 : 
     366                 :     struct timeval tv;
     367          146301 :     struct timeval* tv_ptr = nullptr;
     368          146301 :     if (effective_timeout_us >= 0)
     369                 :     {
     370          146255 :         tv.tv_sec  = effective_timeout_us / 1000000;
     371          146255 :         tv.tv_usec = effective_timeout_us % 1000000;
     372          146255 :         tv_ptr     = &tv;
     373                 :     }
     374                 : 
     375          146301 :     int ready = ::select(nfds + 1, &read_fds, &write_fds, &except_fds, tv_ptr);
     376                 : 
     377                 :     // EINTR: signal interrupted select(), just retry.
     378                 :     // EBADF: an fd was closed between snapshot and select(); retry
     379                 :     // with a fresh snapshot from registered_descs_.
     380          146301 :     if (ready < 0)
     381                 :     {
     382 MIS           0 :         if (errno == EINTR || errno == EBADF)
     383               0 :             return;
     384               0 :         detail::throw_system_error(make_err(errno), "select");
     385                 :     }
     386                 : 
     387                 :     // Process timers outside the lock
     388 HIT      146301 :     timer_svc_->process_expired();
     389                 : 
     390          146301 :     op_queue local_ops;
     391                 : 
     392          146301 :     if (ready > 0)
     393                 :     {
     394          127360 :         if (FD_ISSET(pipe_fds_[0], &read_fds))
     395                 :         {
     396                 :             char buf[256];
     397           15094 :             while (::read(pipe_fds_[0], buf, sizeof(buf)) > 0)
     398                 :             {
     399                 :             }
     400                 :         }
     401                 : 
     402          366470 :         for (int i = 0; i < snapshot_count; ++i)
     403                 :         {
     404          239110 :             int fd                        = snapshot[i].fd;
     405          239110 :             select_descriptor_state* desc = snapshot[i].desc;
     406                 : 
     407          239110 :             std::uint32_t flags = 0;
     408          239110 :             if (FD_ISSET(fd, &read_fds))
     409          123609 :                 flags |= reactor_event_read;
     410          239110 :             if (FD_ISSET(fd, &write_fds))
     411            3617 :                 flags |= reactor_event_write;
     412          239110 :             if (FD_ISSET(fd, &except_fds))
     413 MIS           0 :                 flags |= reactor_event_error;
     414                 : 
     415 HIT      239110 :             if (flags == 0)
     416          111886 :                 continue;
     417                 : 
     418          127224 :             desc->add_ready_events(flags);
     419                 : 
     420          127224 :             bool expected = false;
     421          127224 :             if (desc->is_enqueued_.compare_exchange_strong(
     422                 :                     expected, true, std::memory_order_release,
     423                 :                     std::memory_order_relaxed))
     424                 :             {
     425          127224 :                 local_ops.push(desc);
     426                 :             }
     427                 :         }
     428                 :     }
     429                 : 
     430          146301 :     lock.lock();
     431                 : 
     432          146301 :     if (!local_ops.empty())
     433          123610 :         completed_ops_.splice(local_ops);
     434          146301 : }
     435                 : 
     436                 : } // namespace boost::corosio::detail
     437                 : 
     438                 : #endif // BOOST_COROSIO_HAS_SELECT
     439                 : 
     440                 : #endif // BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SCHEDULER_HPP
        

Generated by: LCOV version 2.3