TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_BASIC_SOCKET_HPP
11 : #define BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_BASIC_SOCKET_HPP
12 :
13 : #include <boost/corosio/detail/intrusive.hpp>
14 : #include <boost/corosio/detail/native_handle.hpp>
15 : #include <boost/corosio/endpoint.hpp>
16 : #include <boost/corosio/native/detail/reactor/reactor_op_base.hpp>
17 : #include <boost/corosio/native/detail/make_err.hpp>
18 : #include <boost/corosio/native/detail/endpoint_convert.hpp>
19 :
20 : #include <memory>
21 : #include <mutex>
22 : #include <utility>
23 :
24 : #include <errno.h>
25 : #include <netinet/in.h>
26 : #include <sys/socket.h>
27 : #include <unistd.h>
28 :
29 : namespace boost::corosio::detail {
30 :
31 : /** CRTP base for reactor-backed socket implementations.
32 :
33 : Extracts the shared data members, virtual overrides, and
34 : cancel/close/register logic that is identical across TCP
35 : (reactor_stream_socket) and UDP (reactor_datagram_socket).
36 :
37 : Derived classes provide CRTP callbacks that enumerate their
38 : specific op slots so cancel/close can iterate them generically.
39 :
40 : @tparam Derived The concrete socket type (CRTP).
41 : @tparam ImplBase The public vtable base (tcp_socket::implementation
42 : or udp_socket::implementation).
43 : @tparam Service The backend's service type.
44 : @tparam DescState The backend's descriptor_state type.
45 : @tparam Endpoint The endpoint type (endpoint or local_endpoint).
46 : */
47 : template<
48 : class Derived,
49 : class ImplBase,
50 : class Service,
51 : class DescState,
52 : class Endpoint = endpoint>
53 : class reactor_basic_socket
54 : : public ImplBase
55 : , public std::enable_shared_from_this<Derived>
56 : , public intrusive_list<Derived>::node
57 : {
58 : friend Derived;
59 :
60 : template<class, class, class, class, class, class, class, class>
61 : friend class reactor_stream_socket;
62 :
63 : template<class, class, class, class, class, class, class, class, class, class>
64 : friend class reactor_datagram_socket;
65 :
66 HIT 25797 : explicit reactor_basic_socket(Service& svc) noexcept : svc_(svc) {}
67 :
68 : protected:
69 : Service& svc_;
70 : int fd_ = -1;
71 : Endpoint local_endpoint_;
72 :
73 : public:
74 : /// Per-descriptor state for persistent reactor registration.
75 : DescState desc_state_;
76 :
77 25797 : ~reactor_basic_socket() override = default;
78 :
79 : /// Return the underlying file descriptor.
80 78062 : native_handle_type native_handle() const noexcept override
81 : {
82 78062 : return fd_;
83 : }
84 :
85 : /// Return the cached local endpoint.
86 80 : Endpoint local_endpoint() const noexcept override
87 : {
88 80 : return local_endpoint_;
89 : }
90 :
91 : /// Return true if the socket has an open file descriptor.
92 : bool is_open() const noexcept
93 : {
94 : return fd_ >= 0;
95 : }
96 :
97 : /// Set a socket option.
98 20 : std::error_code set_option(
99 : int level,
100 : int optname,
101 : void const* data,
102 : std::size_t size) noexcept override
103 : {
104 20 : if (::setsockopt(
105 20 : fd_, level, optname, data, static_cast<socklen_t>(size)) != 0)
106 MIS 0 : return make_err(errno);
107 HIT 20 : return {};
108 : }
109 :
110 : /// Get a socket option.
111 : std::error_code
112 78 : get_option(int level, int optname, void* data, std::size_t* size)
113 : const noexcept override
114 : {
115 78 : socklen_t len = static_cast<socklen_t>(*size);
116 78 : if (::getsockopt(fd_, level, optname, data, &len) != 0)
117 MIS 0 : return make_err(errno);
118 HIT 78 : *size = static_cast<std::size_t>(len);
119 78 : return {};
120 : }
121 :
122 : /// Assign the file descriptor.
123 8518 : void set_socket(int fd) noexcept
124 : {
125 8518 : fd_ = fd;
126 8518 : }
127 :
128 : /// Assign the fd, initialize descriptor state, and register with the reactor.
129 8694 : void init_and_register(int fd) noexcept
130 : {
131 8694 : fd_ = fd;
132 8694 : desc_state_.fd = fd;
133 : {
134 8694 : std::lock_guard lock(desc_state_.mutex);
135 8694 : desc_state_.read_op = nullptr;
136 8694 : desc_state_.write_op = nullptr;
137 8694 : desc_state_.connect_op = nullptr;
138 8694 : }
139 8694 : svc_.scheduler().register_descriptor(fd, &desc_state_);
140 8694 : }
141 :
142 : /// Cache the local endpoint.
143 : void set_local_endpoint(Endpoint ep) noexcept
144 : {
145 : local_endpoint_ = ep;
146 : }
147 :
148 : /** Bind the socket to a local endpoint.
149 :
150 : Calls ::bind() and caches the resulting local endpoint
151 : via getsockname().
152 :
153 : @param ep The endpoint to bind to.
154 : @return Error code on failure, empty on success.
155 : */
156 76 : std::error_code do_bind(Endpoint const& ep) noexcept
157 : {
158 76 : sockaddr_storage storage{};
159 76 : socklen_t addrlen = to_sockaddr(ep, socket_family(fd_), storage);
160 76 : if (::bind(fd_, reinterpret_cast<sockaddr*>(&storage), addrlen) != 0)
161 10 : return make_err(errno);
162 :
163 66 : sockaddr_storage local_storage{};
164 66 : socklen_t local_len = sizeof(local_storage);
165 66 : if (::getsockname(
166 66 : fd_, reinterpret_cast<sockaddr*>(&local_storage), &local_len) ==
167 : 0)
168 52 : local_endpoint_ =
169 66 : from_sockaddr_as(local_storage, local_len, Endpoint{});
170 :
171 66 : return {};
172 : }
173 :
174 : /** Register an op with the reactor.
175 :
176 : Handles cached edge events and deferred cancellation.
177 : Called on the EAGAIN/EINPROGRESS path when speculative
178 : I/O failed.
179 : */
180 : template<class Op>
181 : void register_op(
182 : Op& op,
183 : reactor_op_base*& desc_slot,
184 : bool& ready_flag,
185 : bool& cancel_flag,
186 : bool is_write_direction = false) noexcept;
187 :
188 : /** Cancel a single pending operation.
189 :
190 : Claims the operation from its descriptor_state slot under
191 : the mutex and posts it to the scheduler as cancelled.
192 : Derived must implement:
193 : op_to_desc_slot(Op&) -> reactor_op_base**
194 : op_to_cancel_flag(Op&) -> bool*
195 : */
196 : template<class Op>
197 : void cancel_single_op(Op& op) noexcept;
198 :
199 : /** Cancel all pending operations.
200 :
201 : Invoked by the derived class's cancel() override.
202 : Derived must implement:
203 : for_each_op(auto fn)
204 : for_each_desc_entry(auto fn)
205 : */
206 : void do_cancel() noexcept;
207 :
208 : /** Close the socket and cancel pending operations.
209 :
210 : Invoked by the derived class's close_socket(). The
211 : derived class may add backend-specific cleanup after
212 : calling this method.
213 : Derived must implement:
214 : for_each_op(auto fn)
215 : for_each_desc_entry(auto fn)
216 : */
217 : void do_close_socket() noexcept;
218 :
219 : /** Release the socket without closing the fd.
220 :
221 : Like do_close_socket() but does not call ::close().
222 : Returns the fd so the caller can take ownership.
223 : */
224 : native_handle_type do_release_socket() noexcept;
225 : };
226 :
227 : template<class Derived, class ImplBase, class Service, class DescState, class Endpoint>
228 : template<class Op>
229 : void
230 8915 : reactor_basic_socket<Derived, ImplBase, Service, DescState, Endpoint>::register_op(
231 : Op& op,
232 : reactor_op_base*& desc_slot,
233 : bool& ready_flag,
234 : bool& cancel_flag,
235 : bool is_write_direction) noexcept
236 : {
237 8915 : svc_.work_started();
238 :
239 8915 : std::lock_guard lock(desc_state_.mutex);
240 8915 : bool io_done = false;
241 8915 : if (ready_flag)
242 : {
243 185 : ready_flag = false;
244 185 : op.perform_io();
245 185 : io_done = (op.errn != EAGAIN && op.errn != EWOULDBLOCK);
246 185 : if (!io_done)
247 185 : op.errn = 0;
248 : }
249 :
250 8915 : if (cancel_flag)
251 : {
252 MIS 0 : cancel_flag = false;
253 0 : op.cancelled.store(true, std::memory_order_relaxed);
254 : }
255 :
256 HIT 8915 : if (io_done || op.cancelled.load(std::memory_order_acquire))
257 : {
258 MIS 0 : svc_.post(&op);
259 0 : svc_.work_finished();
260 : }
261 : else
262 : {
263 HIT 8915 : desc_slot = &op;
264 :
265 : // Select must rebuild its fd_sets when a write-direction op
266 : // is parked, so select() watches for writability. Compiled
267 : // away to nothing for epoll and kqueue.
268 : if constexpr (Service::needs_write_notification)
269 : {
270 3809 : if (is_write_direction)
271 3617 : svc_.scheduler().notify_reactor();
272 : }
273 : }
274 8915 : }
275 :
276 : template<class Derived, class ImplBase, class Service, class DescState, class Endpoint>
277 : template<class Op>
278 : void
279 194 : reactor_basic_socket<Derived, ImplBase, Service, DescState, Endpoint>::cancel_single_op(
280 : Op& op) noexcept
281 : {
282 194 : auto self = this->weak_from_this().lock();
283 194 : if (!self)
284 MIS 0 : return;
285 :
286 HIT 194 : op.request_cancel();
287 :
288 194 : auto* d = static_cast<Derived*>(this);
289 194 : reactor_op_base** desc_op_ptr = d->op_to_desc_slot(op);
290 :
291 194 : if (desc_op_ptr)
292 : {
293 194 : reactor_op_base* claimed = nullptr;
294 : {
295 194 : std::lock_guard lock(desc_state_.mutex);
296 194 : if (*desc_op_ptr == &op)
297 194 : claimed = std::exchange(*desc_op_ptr, nullptr);
298 : else
299 : {
300 MIS 0 : bool* cflag = d->op_to_cancel_flag(op);
301 0 : if (cflag)
302 0 : *cflag = true;
303 : }
304 HIT 194 : }
305 194 : if (claimed)
306 : {
307 194 : op.impl_ptr = self;
308 194 : svc_.post(&op);
309 194 : svc_.work_finished();
310 : }
311 : }
312 194 : }
313 :
314 : template<class Derived, class ImplBase, class Service, class DescState, class Endpoint>
315 : void
316 188 : reactor_basic_socket<Derived, ImplBase, Service, DescState, Endpoint>::
317 : do_cancel() noexcept
318 : {
319 188 : auto self = this->weak_from_this().lock();
320 188 : if (!self)
321 MIS 0 : return;
322 :
323 HIT 188 : auto* d = static_cast<Derived*>(this);
324 :
325 760 : d->for_each_op([](auto& op) { op.request_cancel(); });
326 :
327 : // Claim ops under a single lock acquisition
328 : struct claimed_entry
329 : {
330 : reactor_op_base* op = nullptr;
331 : reactor_op_base* base = nullptr;
332 : };
333 : // Max 3 ops (conn, rd, wr)
334 188 : claimed_entry claimed[3];
335 188 : int count = 0;
336 :
337 : {
338 188 : std::lock_guard lock(desc_state_.mutex);
339 1332 : d->for_each_desc_entry([&](auto& op, reactor_op_base*& desc_slot) {
340 572 : if (desc_slot == &op)
341 : {
342 101 : claimed[count].op = std::exchange(desc_slot, nullptr);
343 101 : claimed[count].base = &op;
344 101 : ++count;
345 : }
346 : });
347 188 : }
348 :
349 289 : for (int i = 0; i < count; ++i)
350 : {
351 101 : claimed[i].base->impl_ptr = self;
352 101 : svc_.post(claimed[i].base);
353 101 : svc_.work_finished();
354 : }
355 188 : }
356 :
357 : template<class Derived, class ImplBase, class Service, class DescState, class Endpoint>
358 : void
359 77498 : reactor_basic_socket<Derived, ImplBase, Service, DescState, Endpoint>::
360 : do_close_socket() noexcept
361 : {
362 77498 : auto self = this->weak_from_this().lock();
363 77498 : if (self)
364 : {
365 77498 : auto* d = static_cast<Derived*>(this);
366 :
367 310920 : d->for_each_op([](auto& op) { op.request_cancel(); });
368 :
369 : struct claimed_entry
370 : {
371 : reactor_op_base* base = nullptr;
372 : };
373 77498 : claimed_entry claimed[3];
374 77498 : int count = 0;
375 :
376 : {
377 77498 : std::lock_guard lock(desc_state_.mutex);
378 77498 : d->for_each_desc_entry(
379 466844 : [&](auto& /*op*/, reactor_op_base*& desc_slot) {
380 233422 : auto* c = std::exchange(desc_slot, nullptr);
381 233422 : if (c)
382 : {
383 4 : claimed[count].base = c;
384 4 : ++count;
385 : }
386 : });
387 77498 : desc_state_.read_ready = false;
388 77498 : desc_state_.write_ready = false;
389 77498 : desc_state_.read_cancel_pending = false;
390 77498 : desc_state_.write_cancel_pending = false;
391 77498 : desc_state_.connect_cancel_pending = false;
392 :
393 77498 : if (desc_state_.is_enqueued_.load(std::memory_order_acquire))
394 271 : desc_state_.impl_ref_ = self;
395 77498 : }
396 :
397 77502 : for (int i = 0; i < count; ++i)
398 : {
399 4 : claimed[i].base->impl_ptr = self;
400 4 : svc_.post(claimed[i].base);
401 4 : svc_.work_finished();
402 : }
403 : }
404 :
405 77498 : if (fd_ >= 0)
406 : {
407 17210 : if (desc_state_.registered_events != 0)
408 17210 : svc_.scheduler().deregister_descriptor(fd_);
409 17210 : ::close(fd_);
410 17210 : fd_ = -1;
411 : }
412 :
413 77498 : desc_state_.fd = -1;
414 77498 : desc_state_.registered_events = 0;
415 :
416 77498 : local_endpoint_ = Endpoint{};
417 77498 : }
418 :
419 : template<class Derived, class ImplBase, class Service, class DescState, class Endpoint>
420 : native_handle_type
421 2 : reactor_basic_socket<Derived, ImplBase, Service, DescState, Endpoint>::
422 : do_release_socket() noexcept
423 : {
424 : // Cancel pending ops (same as do_close_socket)
425 2 : auto self = this->weak_from_this().lock();
426 2 : if (self)
427 : {
428 2 : auto* d = static_cast<Derived*>(this);
429 :
430 8 : d->for_each_op([](auto& op) { op.request_cancel(); });
431 :
432 : struct claimed_entry
433 : {
434 : reactor_op_base* base = nullptr;
435 : };
436 2 : claimed_entry claimed[3];
437 2 : int count = 0;
438 :
439 : {
440 2 : std::lock_guard lock(desc_state_.mutex);
441 2 : d->for_each_desc_entry(
442 12 : [&](auto& /*op*/, reactor_op_base*& desc_slot) {
443 6 : auto* c = std::exchange(desc_slot, nullptr);
444 6 : if (c)
445 : {
446 MIS 0 : claimed[count].base = c;
447 0 : ++count;
448 : }
449 : });
450 HIT 2 : desc_state_.read_ready = false;
451 2 : desc_state_.write_ready = false;
452 2 : desc_state_.read_cancel_pending = false;
453 2 : desc_state_.write_cancel_pending = false;
454 2 : desc_state_.connect_cancel_pending = false;
455 :
456 2 : if (desc_state_.is_enqueued_.load(std::memory_order_acquire))
457 MIS 0 : desc_state_.impl_ref_ = self;
458 HIT 2 : }
459 :
460 2 : for (int i = 0; i < count; ++i)
461 : {
462 MIS 0 : claimed[i].base->impl_ptr = self;
463 0 : svc_.post(claimed[i].base);
464 0 : svc_.work_finished();
465 : }
466 : }
467 :
468 HIT 2 : native_handle_type released = fd_;
469 :
470 2 : if (fd_ >= 0)
471 : {
472 2 : if (desc_state_.registered_events != 0)
473 2 : svc_.scheduler().deregister_descriptor(fd_);
474 : // Do NOT close -- caller takes ownership
475 2 : fd_ = -1;
476 : }
477 :
478 2 : desc_state_.fd = -1;
479 2 : desc_state_.registered_events = 0;
480 :
481 2 : local_endpoint_ = Endpoint{};
482 :
483 4 : return released;
484 2 : }
485 :
486 : } // namespace boost::corosio::detail
487 :
488 : #endif // BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_BASIC_SOCKET_HPP
|