include/boost/corosio/native/detail/reactor/reactor_backend.hpp

50.7% Lines (34/67) -% List of functions (0/1)
reactor_backend.hpp
f(x) Functions (1)
Function Calls Lines Blocks
<unknown function 36> :36
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Michael Vandeberg
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_BACKEND_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_BACKEND_HPP
12
13 /* Parameterized reactor backend.
14
15 Assembles all socket, service, acceptor, and op types for a given
16 backend Traits type. Includes the accept() implementation (which
17 needs all types to be complete) and the reactor_types<Traits>
18 bundle used by backend.hpp.
19 */
20
21 #include <boost/corosio/native/detail/reactor/reactor_service_finals.hpp>
22 #include <boost/corosio/native/detail/reactor/reactor_op_complete.hpp>
23 #include <boost/corosio/native/detail/endpoint_convert.hpp>
24 #include <boost/corosio/detail/dispatch_coro.hpp>
25
26 #include <mutex>
27
28 namespace boost::corosio::detail {
29
30 // ============================================================
31 // Acceptor accept() implementation
32 // ============================================================
33
34 template<class Traits, class AccImplBase, class Endpoint>
35 std::coroutine_handle<>
36 5053x reactor_acceptor_final<Traits, AccImplBase, Endpoint>::accept(
37 std::coroutine_handle<> h,
38 capy::executor_ref ex,
39 std::stop_token token,
40 std::error_code* ec,
41 io_object::implementation** impl_out)
42 {
43 using socket_final = stream_socket_t<Traits, Endpoint>;
44
45 5053x auto& op = this->acc_;
46 5053x op.reset();
47 5053x op.h = h;
48 5053x op.ex = ex;
49 5053x op.ec_out = ec;
50 5053x op.impl_out = impl_out;
51 5053x op.fd = this->fd_;
52 5053x op.start(token, this);
53
54 5053x sockaddr_storage peer_storage{};
55 5053x socklen_t peer_addrlen = 0;
56
57 5053x int accepted = Traits::accept_policy::do_accept(
58 this->fd_, peer_storage, peer_addrlen);
59
60 5053x if (accepted >= 0)
61 {
62 {
63 6x std::lock_guard lock(this->desc_state_.mutex);
64 6x this->desc_state_.read_ready = false;
65 6x }
66
67 6x if (this->svc_.scheduler().try_consume_inline_budget())
68 {
69 auto* socket_svc = this->svc_.stream_service();
70 if (socket_svc)
71 {
72 auto& impl =
73 static_cast<socket_final&>(*socket_svc->construct());
74 impl.set_socket(accepted);
75
76 impl.desc_state_.fd = accepted;
77 {
78 std::lock_guard lock(impl.desc_state_.mutex);
79 impl.desc_state_.read_op = nullptr;
80 impl.desc_state_.write_op = nullptr;
81 impl.desc_state_.connect_op = nullptr;
82 }
83 socket_svc->scheduler().register_descriptor(
84 accepted, &impl.desc_state_);
85
86 impl.set_endpoints(
87 this->local_endpoint_,
88 from_sockaddr_as(
89 peer_storage, peer_addrlen, Endpoint{}));
90
91 *ec = {};
92 if (impl_out)
93 *impl_out = &impl;
94 }
95 else
96 {
97 ::close(accepted);
98 *ec = make_err(ENOENT);
99 if (impl_out)
100 *impl_out = nullptr;
101 }
102 op.cont_op.cont.h = h;
103 return dispatch_coro(ex, op.cont_op.cont);
104 }
105
106 6x op.accepted_fd = accepted;
107 6x op.peer_storage = peer_storage;
108 6x op.peer_addrlen = peer_addrlen;
109 6x op.complete(0, 0);
110 6x op.impl_ptr = this->shared_from_this();
111 6x this->svc_.post(&op);
112 6x return std::noop_coroutine();
113 }
114
115 5047x if (errno == EAGAIN || errno == EWOULDBLOCK)
116 {
117 5047x op.impl_ptr = this->shared_from_this();
118 5047x this->svc_.work_started();
119
120 5047x std::lock_guard lock(this->desc_state_.mutex);
121 5047x bool io_done = false;
122 5047x if (this->desc_state_.read_ready)
123 {
124 this->desc_state_.read_ready = false;
125 op.perform_io();
126 io_done = (op.errn != EAGAIN && op.errn != EWOULDBLOCK);
127 if (!io_done)
128 op.errn = 0;
129 }
130
131 5047x if (io_done || op.cancelled.load(std::memory_order_acquire))
132 {
133 this->svc_.post(&op);
134 this->svc_.work_finished();
135 }
136 else
137 {
138 5047x this->desc_state_.read_op = &op;
139 }
140 5047x return std::noop_coroutine();
141 5047x }
142
143 op.complete(errno, 0);
144 op.impl_ptr = this->shared_from_this();
145 this->svc_.post(&op);
146 return std::noop_coroutine();
147 }
148
149 // ============================================================
150 // Type bundle for backend.hpp
151 // ============================================================
152
153 template<class Traits>
154 struct reactor_types
155 {
156 using tcp_socket_type = stream_socket_t<Traits, endpoint>;
157 using tcp_service_type = reactor_tcp_service_final<
158 Traits, tcp_socket_type>;
159
160 using udp_socket_type = dgram_socket_t<Traits, endpoint>;
161 using udp_service_type = reactor_udp_service_final<
162 Traits, udp_socket_type>;
163
164 using tcp_acceptor_type = stream_acceptor_t<Traits, endpoint>;
165 using tcp_acceptor_service_type = reactor_acceptor_service_final<
166 Traits, tcp_acceptor_service, tcp_acceptor_type,
167 tcp_service_type, endpoint>;
168
169 using local_stream_socket_type = stream_socket_t<Traits, local_endpoint>;
170 using local_stream_service_type = reactor_local_stream_service_final<
171 Traits, local_stream_socket_type>;
172
173 using local_datagram_socket_type = dgram_socket_t<Traits, local_endpoint>;
174 using local_datagram_service_type = reactor_local_dgram_service_final<
175 Traits, local_datagram_socket_type>;
176
177 using local_stream_acceptor_type = stream_acceptor_t<Traits, local_endpoint>;
178 using local_stream_acceptor_service_type = reactor_acceptor_service_final<
179 Traits, local_stream_acceptor_service, local_stream_acceptor_type,
180 local_stream_service_type, local_endpoint>;
181 };
182
183 } // namespace boost::corosio::detail
184
185 #endif // BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_BACKEND_HPP
186