TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_ACCEPTOR_HPP
11 : #define BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_ACCEPTOR_HPP
12 :
13 : #include <boost/corosio/tcp_acceptor.hpp>
14 : #include <boost/corosio/detail/intrusive.hpp>
15 : #include <boost/corosio/native/detail/reactor/reactor_op_base.hpp>
16 : #include <boost/corosio/native/detail/make_err.hpp>
17 : #include <boost/corosio/native/detail/endpoint_convert.hpp>
18 :
19 : #include <memory>
20 : #include <mutex>
21 : #include <utility>
22 :
23 : #include <errno.h>
24 : #include <netinet/in.h>
25 : #include <sys/socket.h>
26 : #include <unistd.h>
27 :
28 : namespace boost::corosio::detail {
29 :
30 : /** CRTP base for reactor-backed acceptor implementations.
31 :
32 : Provides shared data members, trivial virtual overrides, and
33 : non-virtual helper methods for cancellation and close. Concrete
34 : backends inherit and add `cancel()`, `close_socket()`, and
35 : `accept()` overrides that delegate to the `do_*` helpers.
36 :
37 : @tparam Derived The concrete acceptor type (CRTP).
38 : @tparam Service The backend's acceptor service type.
39 : @tparam Op The backend's base op type.
40 : @tparam AcceptOp The backend's accept op type.
41 : @tparam DescState The backend's descriptor_state type.
42 : @tparam ImplBase The public vtable base
43 : (tcp_acceptor::implementation or
44 : local_stream_acceptor::implementation).
45 : @tparam Endpoint The endpoint type (endpoint or local_endpoint).
46 : */
47 : template<
48 : class Derived,
49 : class Service,
50 : class Op,
51 : class AcceptOp,
52 : class DescState,
53 : class ImplBase = tcp_acceptor::implementation,
54 : class Endpoint = endpoint>
55 : class reactor_acceptor
56 : : public ImplBase
57 : , public std::enable_shared_from_this<Derived>
58 : , public intrusive_list<Derived>::node
59 : {
60 : friend Derived;
61 :
62 HIT 163 : explicit reactor_acceptor(Service& svc) noexcept : svc_(svc) {}
63 :
64 : protected:
65 : Service& svc_;
66 : int fd_ = -1;
67 : Endpoint local_endpoint_;
68 :
69 : public:
70 : /// Pending accept operation slot.
71 : AcceptOp acc_;
72 :
73 : /// Per-descriptor state for persistent reactor registration.
74 : DescState desc_state_;
75 :
76 163 : ~reactor_acceptor() override = default;
77 :
78 : /// Return the underlying file descriptor.
79 : int native_handle() const noexcept
80 : {
81 : return fd_;
82 : }
83 :
84 : /// Return the cached local endpoint.
85 8650 : Endpoint local_endpoint() const noexcept override
86 : {
87 8650 : return local_endpoint_;
88 : }
89 :
90 : /// Return true if the acceptor has an open file descriptor.
91 9600 : bool is_open() const noexcept override
92 : {
93 9600 : return fd_ >= 0;
94 : }
95 :
96 : /// Set a socket option.
97 143 : std::error_code set_option(
98 : int level,
99 : int optname,
100 : void const* data,
101 : std::size_t size) noexcept override
102 : {
103 143 : if (::setsockopt(
104 143 : fd_, level, optname, data, static_cast<socklen_t>(size)) != 0)
105 MIS 0 : return make_err(errno);
106 HIT 143 : return {};
107 : }
108 :
109 : /// Get a socket option.
110 : std::error_code
111 2 : get_option(int level, int optname, void* data, std::size_t* size)
112 : const noexcept override
113 : {
114 2 : socklen_t len = static_cast<socklen_t>(*size);
115 2 : if (::getsockopt(fd_, level, optname, data, &len) != 0)
116 MIS 0 : return make_err(errno);
117 HIT 2 : *size = static_cast<std::size_t>(len);
118 2 : return {};
119 : }
120 :
121 : /// Cache the local endpoint.
122 148 : void set_local_endpoint(Endpoint ep) noexcept
123 : {
124 148 : local_endpoint_ = std::move(ep);
125 148 : }
126 :
127 : /// Assign the fd and initialize descriptor state (acceptor: read_op only, no registration).
128 158 : void init_acceptor_fd(int fd) noexcept
129 : {
130 158 : fd_ = fd;
131 158 : desc_state_.fd = fd;
132 : {
133 158 : std::lock_guard lock(desc_state_.mutex);
134 158 : desc_state_.read_op = nullptr;
135 158 : }
136 158 : }
137 :
138 : /// Return a reference to the owning service.
139 8518 : Service& service() noexcept
140 : {
141 8518 : return svc_;
142 : }
143 :
144 : // --- Virtual method overrides ---
145 :
146 4 : void cancel() noexcept override { do_cancel(); }
147 :
148 642 : void close_socket() noexcept { do_close_socket(); }
149 :
150 : // --- End virtual overrides ---
151 :
152 : /** Cancel a single pending operation.
153 :
154 : Claims the operation from the read_op descriptor slot
155 : under the mutex and posts it to the scheduler as cancelled.
156 :
157 : @param op The operation to cancel.
158 : */
159 : void cancel_single_op(Op& op) noexcept;
160 :
161 : /** Cancel the pending accept operation. */
162 : void do_cancel() noexcept;
163 :
164 : /** Close the acceptor and cancel pending operations.
165 :
166 : Invoked by the derived class's close_socket(). The
167 : derived class may add backend-specific cleanup after
168 : calling this method.
169 : */
170 : void do_close_socket() noexcept;
171 :
172 : /** Release the acceptor without closing the fd. */
173 : native_handle_type do_release_socket() noexcept;
174 :
175 : /** Bind the acceptor socket to an endpoint.
176 :
177 : Caches the resolved local endpoint (including ephemeral
178 : port) after a successful bind.
179 :
180 : @param ep The endpoint to bind to.
181 : @return The error code from bind(), or success.
182 : */
183 : std::error_code do_bind(Endpoint const& ep);
184 :
185 : /** Start listening on the acceptor socket.
186 :
187 : Registers the file descriptor with the reactor after
188 : a successful listen() call.
189 :
190 : @param backlog The listen backlog.
191 : @return The error code from listen(), or success.
192 : */
193 : std::error_code do_listen(int backlog);
194 : };
195 :
196 : template<
197 : class Derived,
198 : class Service,
199 : class Op,
200 : class AcceptOp,
201 : class DescState,
202 : class ImplBase,
203 : class Endpoint>
204 : void
205 10 : reactor_acceptor<Derived, Service, Op, AcceptOp, DescState, ImplBase, Endpoint>::
206 : cancel_single_op(Op& op) noexcept
207 : {
208 10 : auto self = this->weak_from_this().lock();
209 10 : if (!self)
210 MIS 0 : return;
211 :
212 HIT 10 : op.request_cancel();
213 :
214 10 : reactor_op_base* claimed = nullptr;
215 : {
216 10 : std::lock_guard lock(desc_state_.mutex);
217 10 : if (desc_state_.read_op == &op)
218 8 : claimed = std::exchange(desc_state_.read_op, nullptr);
219 10 : }
220 10 : if (claimed)
221 : {
222 8 : op.impl_ptr = self;
223 8 : svc_.post(&op);
224 8 : svc_.work_finished();
225 : }
226 10 : }
227 :
228 : template<
229 : class Derived,
230 : class Service,
231 : class Op,
232 : class AcceptOp,
233 : class DescState,
234 : class ImplBase,
235 : class Endpoint>
236 : void
237 4 : reactor_acceptor<Derived, Service, Op, AcceptOp, DescState, ImplBase, Endpoint>::
238 : do_cancel() noexcept
239 : {
240 4 : cancel_single_op(acc_);
241 4 : }
242 :
243 : template<
244 : class Derived,
245 : class Service,
246 : class Op,
247 : class AcceptOp,
248 : class DescState,
249 : class ImplBase,
250 : class Endpoint>
251 : void
252 642 : reactor_acceptor<Derived, Service, Op, AcceptOp, DescState, ImplBase, Endpoint>::
253 : do_close_socket() noexcept
254 : {
255 642 : auto self = this->weak_from_this().lock();
256 642 : if (self)
257 : {
258 642 : acc_.request_cancel();
259 :
260 642 : reactor_op_base* claimed = nullptr;
261 : {
262 642 : std::lock_guard lock(desc_state_.mutex);
263 642 : claimed = std::exchange(desc_state_.read_op, nullptr);
264 642 : desc_state_.read_ready = false;
265 642 : desc_state_.write_ready = false;
266 :
267 642 : if (desc_state_.is_enqueued_.load(std::memory_order_acquire))
268 MIS 0 : desc_state_.impl_ref_ = self;
269 HIT 642 : }
270 :
271 642 : if (claimed)
272 : {
273 4 : acc_.impl_ptr = self;
274 4 : svc_.post(&acc_);
275 4 : svc_.work_finished();
276 : }
277 : }
278 :
279 642 : if (fd_ >= 0)
280 : {
281 158 : if (desc_state_.registered_events != 0)
282 140 : svc_.scheduler().deregister_descriptor(fd_);
283 158 : ::close(fd_);
284 158 : fd_ = -1;
285 : }
286 :
287 642 : desc_state_.fd = -1;
288 642 : desc_state_.registered_events = 0;
289 :
290 642 : local_endpoint_ = Endpoint{};
291 642 : }
292 :
293 : template<
294 : class Derived,
295 : class Service,
296 : class Op,
297 : class AcceptOp,
298 : class DescState,
299 : class ImplBase,
300 : class Endpoint>
301 : native_handle_type
302 MIS 0 : reactor_acceptor<Derived, Service, Op, AcceptOp, DescState, ImplBase, Endpoint>::
303 : do_release_socket() noexcept
304 : {
305 0 : auto self = this->weak_from_this().lock();
306 0 : if (self)
307 : {
308 0 : acc_.request_cancel();
309 :
310 0 : reactor_op_base* claimed = nullptr;
311 : {
312 0 : std::lock_guard lock(desc_state_.mutex);
313 0 : claimed = std::exchange(desc_state_.read_op, nullptr);
314 0 : desc_state_.read_ready = false;
315 0 : desc_state_.write_ready = false;
316 :
317 0 : if (desc_state_.is_enqueued_.load(std::memory_order_acquire))
318 0 : desc_state_.impl_ref_ = self;
319 0 : }
320 :
321 0 : if (claimed)
322 : {
323 0 : acc_.impl_ptr = self;
324 0 : svc_.post(&acc_);
325 0 : svc_.work_finished();
326 : }
327 : }
328 :
329 0 : native_handle_type released = fd_;
330 :
331 0 : if (fd_ >= 0)
332 : {
333 0 : if (desc_state_.registered_events != 0)
334 0 : svc_.scheduler().deregister_descriptor(fd_);
335 0 : fd_ = -1;
336 : }
337 :
338 0 : desc_state_.fd = -1;
339 0 : desc_state_.registered_events = 0;
340 :
341 0 : local_endpoint_ = Endpoint{};
342 :
343 0 : return released;
344 0 : }
345 :
346 : template<
347 : class Derived,
348 : class Service,
349 : class Op,
350 : class AcceptOp,
351 : class DescState,
352 : class ImplBase,
353 : class Endpoint>
354 : std::error_code
355 HIT 156 : reactor_acceptor<Derived, Service, Op, AcceptOp, DescState, ImplBase, Endpoint>::
356 : do_bind(Endpoint const& ep)
357 : {
358 156 : sockaddr_storage storage{};
359 156 : socklen_t addrlen = to_sockaddr(ep, storage);
360 156 : if (::bind(fd_, reinterpret_cast<sockaddr*>(&storage), addrlen) < 0)
361 8 : return make_err(errno);
362 :
363 : // Cache local endpoint (resolves ephemeral port / path)
364 148 : sockaddr_storage local{};
365 148 : socklen_t local_len = sizeof(local);
366 148 : if (::getsockname(fd_, reinterpret_cast<sockaddr*>(&local), &local_len) ==
367 : 0)
368 148 : set_local_endpoint(from_sockaddr_as(local, local_len, Endpoint{}));
369 :
370 148 : return {};
371 : }
372 :
373 : template<
374 : class Derived,
375 : class Service,
376 : class Op,
377 : class AcceptOp,
378 : class DescState,
379 : class ImplBase,
380 : class Endpoint>
381 : std::error_code
382 140 : reactor_acceptor<Derived, Service, Op, AcceptOp, DescState, ImplBase, Endpoint>::
383 : do_listen(int backlog)
384 : {
385 140 : if (::listen(fd_, backlog) < 0)
386 MIS 0 : return make_err(errno);
387 :
388 HIT 140 : svc_.scheduler().register_descriptor(fd_, &desc_state_);
389 140 : return {};
390 : }
391 :
392 : } // namespace boost::corosio::detail
393 :
394 : #endif // BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_ACCEPTOR_HPP
|