TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_UDP_SOCKET_HPP
11 : #define BOOST_COROSIO_UDP_SOCKET_HPP
12 :
13 : #include <boost/corosio/detail/config.hpp>
14 : #include <boost/corosio/detail/platform.hpp>
15 : #include <boost/corosio/detail/except.hpp>
16 : #include <boost/corosio/detail/native_handle.hpp>
17 : #include <boost/corosio/detail/op_base.hpp>
18 : #include <boost/corosio/io/io_object.hpp>
19 : #include <boost/capy/io_result.hpp>
20 : #include <boost/corosio/detail/buffer_param.hpp>
21 : #include <boost/corosio/endpoint.hpp>
22 : #include <boost/corosio/message_flags.hpp>
23 : #include <boost/corosio/udp.hpp>
24 : #include <boost/capy/ex/executor_ref.hpp>
25 : #include <boost/capy/ex/execution_context.hpp>
26 : #include <boost/capy/ex/io_env.hpp>
27 : #include <boost/capy/concept/executor.hpp>
28 :
29 : #include <system_error>
30 :
31 : #include <concepts>
32 : #include <coroutine>
33 : #include <cstddef>
34 : #include <stop_token>
35 : #include <type_traits>
36 :
37 : namespace boost::corosio {
38 :
39 : /** An asynchronous UDP socket for coroutine I/O.
40 :
41 : This class provides asynchronous UDP datagram operations that
42 : return awaitable types. Each operation participates in the affine
43 : awaitable protocol, ensuring coroutines resume on the correct
44 : executor.
45 :
46 : Supports two modes of operation:
47 :
48 : **Connectionless mode**: each `send_to` specifies a destination
49 : endpoint, and each `recv_from` captures the source endpoint.
50 : The socket must be opened (and optionally bound) before I/O.
51 :
52 : **Connected mode**: call `connect()` to set a default peer,
53 : then use `send()`/`recv()` without endpoint arguments.
54 : The kernel filters incoming datagrams to those from the
55 : connected peer.
56 :
57 : @par Thread Safety
58 : Distinct objects: Safe.@n
59 : Shared objects: Unsafe. A socket must not have concurrent
60 : operations of the same type (e.g., two simultaneous recv_from).
61 : One send_to and one recv_from may be in flight simultaneously.
62 :
63 : @par Example
64 : @code
65 : // Connectionless mode
66 : io_context ioc;
67 : udp_socket sock( ioc );
68 : sock.open( udp::v4() );
69 : sock.bind( endpoint( ipv4_address::any(), 9000 ) );
70 :
71 : char buf[1024];
72 : endpoint sender;
73 : auto [ec, n] = co_await sock.recv_from(
74 : capy::mutable_buffer( buf, sizeof( buf ) ), sender );
75 : if ( !ec )
76 : co_await sock.send_to(
77 : capy::const_buffer( buf, n ), sender );
78 :
79 : // Connected mode
80 : udp_socket csock( ioc );
81 : auto [cec] = co_await csock.connect(
82 : endpoint( ipv4_address::loopback(), 9000 ) );
83 : if ( !cec )
84 : co_await csock.send(
85 : capy::const_buffer( buf, n ) );
86 : @endcode
87 : */
88 : class BOOST_COROSIO_DECL udp_socket : public io_object
89 : {
90 : public:
91 : /** Define backend hooks for UDP socket operations.
92 :
93 : Platform backends (epoll, kqueue, select) derive from
94 : this to implement datagram I/O and option management.
95 : */
96 : struct implementation : io_object::implementation
97 : {
98 : /** Initiate an asynchronous send_to operation.
99 :
100 : @param h Coroutine handle to resume on completion.
101 : @param ex Executor for dispatching the completion.
102 : @param buf The buffer data to send.
103 : @param dest The destination endpoint.
104 : @param token Stop token for cancellation.
105 : @param ec Output error code.
106 : @param bytes_out Output bytes transferred.
107 :
108 : @return Coroutine handle to resume immediately.
109 : */
110 : virtual std::coroutine_handle<> send_to(
111 : std::coroutine_handle<> h,
112 : capy::executor_ref ex,
113 : buffer_param buf,
114 : endpoint dest,
115 : int flags,
116 : std::stop_token token,
117 : std::error_code* ec,
118 : std::size_t* bytes_out) = 0;
119 :
120 : /** Initiate an asynchronous recv_from operation.
121 :
122 : @param h Coroutine handle to resume on completion.
123 : @param ex Executor for dispatching the completion.
124 : @param buf The buffer to receive into.
125 : @param source Output endpoint for the sender's address.
126 : @param token Stop token for cancellation.
127 : @param ec Output error code.
128 : @param bytes_out Output bytes transferred.
129 :
130 : @return Coroutine handle to resume immediately.
131 : */
132 : virtual std::coroutine_handle<> recv_from(
133 : std::coroutine_handle<> h,
134 : capy::executor_ref ex,
135 : buffer_param buf,
136 : endpoint* source,
137 : int flags,
138 : std::stop_token token,
139 : std::error_code* ec,
140 : std::size_t* bytes_out) = 0;
141 :
142 : /// Return the platform socket descriptor.
143 : virtual native_handle_type native_handle() const noexcept = 0;
144 :
145 : /** Request cancellation of pending asynchronous operations.
146 :
147 : All outstanding operations complete with operation_canceled
148 : error. Check `ec == cond::canceled` for portable comparison.
149 : */
150 : virtual void cancel() noexcept = 0;
151 :
152 : /** Set a socket option.
153 :
154 : @param level The protocol level (e.g. `SOL_SOCKET`).
155 : @param optname The option name.
156 : @param data Pointer to the option value.
157 : @param size Size of the option value in bytes.
158 : @return Error code on failure, empty on success.
159 : */
160 : virtual std::error_code set_option(
161 : int level,
162 : int optname,
163 : void const* data,
164 : std::size_t size) noexcept = 0;
165 :
166 : /** Get a socket option.
167 :
168 : @param level The protocol level (e.g. `SOL_SOCKET`).
169 : @param optname The option name.
170 : @param data Pointer to receive the option value.
171 : @param size On entry, the size of the buffer. On exit,
172 : the size of the option value.
173 : @return Error code on failure, empty on success.
174 : */
175 : virtual std::error_code
176 : get_option(int level, int optname, void* data, std::size_t* size)
177 : const noexcept = 0;
178 :
179 : /// Return the cached local endpoint.
180 : virtual endpoint local_endpoint() const noexcept = 0;
181 :
182 : /// Return the cached remote endpoint (connected mode).
183 : virtual endpoint remote_endpoint() const noexcept = 0;
184 :
185 : /** Initiate an asynchronous connect to set the default peer.
186 :
187 : @param h Coroutine handle to resume on completion.
188 : @param ex Executor for dispatching the completion.
189 : @param ep The remote endpoint to connect to.
190 : @param token Stop token for cancellation.
191 : @param ec Output error code.
192 :
193 : @return Coroutine handle to resume immediately.
194 : */
195 : virtual std::coroutine_handle<> connect(
196 : std::coroutine_handle<> h,
197 : capy::executor_ref ex,
198 : endpoint ep,
199 : std::stop_token token,
200 : std::error_code* ec) = 0;
201 :
202 : /** Initiate an asynchronous connected send operation.
203 :
204 : @param h Coroutine handle to resume on completion.
205 : @param ex Executor for dispatching the completion.
206 : @param buf The buffer data to send.
207 : @param token Stop token for cancellation.
208 : @param ec Output error code.
209 : @param bytes_out Output bytes transferred.
210 :
211 : @return Coroutine handle to resume immediately.
212 : */
213 : virtual std::coroutine_handle<> send(
214 : std::coroutine_handle<> h,
215 : capy::executor_ref ex,
216 : buffer_param buf,
217 : int flags,
218 : std::stop_token token,
219 : std::error_code* ec,
220 : std::size_t* bytes_out) = 0;
221 :
222 : /** Initiate an asynchronous connected recv operation.
223 :
224 : @param h Coroutine handle to resume on completion.
225 : @param ex Executor for dispatching the completion.
226 : @param buf The buffer to receive into.
227 : @param token Stop token for cancellation.
228 : @param ec Output error code.
229 : @param bytes_out Output bytes transferred.
230 :
231 : @return Coroutine handle to resume immediately.
232 : */
233 : virtual std::coroutine_handle<> recv(
234 : std::coroutine_handle<> h,
235 : capy::executor_ref ex,
236 : buffer_param buf,
237 : int flags,
238 : std::stop_token token,
239 : std::error_code* ec,
240 : std::size_t* bytes_out) = 0;
241 : };
242 :
243 : /** Represent the awaitable returned by @ref send_to.
244 :
245 : Captures the destination endpoint and buffer, then dispatches
246 : to the backend implementation on suspension.
247 : */
248 : struct send_to_awaitable
249 : : detail::bytes_op_base<send_to_awaitable>
250 : {
251 : udp_socket& s_;
252 : buffer_param buf_;
253 : endpoint dest_;
254 : int flags_;
255 :
256 HIT 22 : send_to_awaitable(
257 : udp_socket& s, buffer_param buf,
258 : endpoint dest, int flags = 0) noexcept
259 22 : : s_(s), buf_(buf), dest_(dest), flags_(flags) {}
260 :
261 22 : std::coroutine_handle<> dispatch(
262 : std::coroutine_handle<> h, capy::executor_ref ex) const
263 : {
264 44 : return s_.get().send_to(
265 44 : h, ex, buf_, dest_, flags_, token_, &ec_, &bytes_);
266 : }
267 : };
268 :
269 : struct recv_from_awaitable
270 : : detail::bytes_op_base<recv_from_awaitable>
271 : {
272 : udp_socket& s_;
273 : buffer_param buf_;
274 : endpoint& source_;
275 : int flags_;
276 :
277 32 : recv_from_awaitable(
278 : udp_socket& s, buffer_param buf,
279 : endpoint& source, int flags = 0) noexcept
280 32 : : s_(s), buf_(buf), source_(source), flags_(flags) {}
281 :
282 32 : std::coroutine_handle<> dispatch(
283 : std::coroutine_handle<> h, capy::executor_ref ex) const
284 : {
285 64 : return s_.get().recv_from(
286 64 : h, ex, buf_, &source_, flags_, token_, &ec_, &bytes_);
287 : }
288 : };
289 :
290 : struct connect_awaitable
291 : : detail::void_op_base<connect_awaitable>
292 : {
293 : udp_socket& s_;
294 : endpoint endpoint_;
295 :
296 12 : connect_awaitable(udp_socket& s, endpoint ep) noexcept
297 12 : : s_(s), endpoint_(ep) {}
298 :
299 12 : std::coroutine_handle<> dispatch(
300 : std::coroutine_handle<> h, capy::executor_ref ex) const
301 : {
302 12 : return s_.get().connect(h, ex, endpoint_, token_, &ec_);
303 : }
304 : };
305 :
306 : struct send_awaitable
307 : : detail::bytes_op_base<send_awaitable>
308 : {
309 : udp_socket& s_;
310 : buffer_param buf_;
311 : int flags_;
312 :
313 6 : send_awaitable(
314 : udp_socket& s, buffer_param buf,
315 : int flags = 0) noexcept
316 6 : : s_(s), buf_(buf), flags_(flags) {}
317 :
318 6 : std::coroutine_handle<> dispatch(
319 : std::coroutine_handle<> h, capy::executor_ref ex) const
320 : {
321 12 : return s_.get().send(
322 12 : h, ex, buf_, flags_, token_, &ec_, &bytes_);
323 : }
324 : };
325 :
326 : struct recv_awaitable
327 : : detail::bytes_op_base<recv_awaitable>
328 : {
329 : udp_socket& s_;
330 : buffer_param buf_;
331 : int flags_;
332 :
333 4 : recv_awaitable(
334 : udp_socket& s, buffer_param buf,
335 : int flags = 0) noexcept
336 4 : : s_(s), buf_(buf), flags_(flags) {}
337 :
338 4 : std::coroutine_handle<> dispatch(
339 : std::coroutine_handle<> h, capy::executor_ref ex) const
340 : {
341 8 : return s_.get().recv(
342 8 : h, ex, buf_, flags_, token_, &ec_, &bytes_);
343 : }
344 : };
345 :
346 : public:
347 : /** Destructor.
348 :
349 : Closes the socket if open, cancelling any pending operations.
350 : */
351 : ~udp_socket() override;
352 :
353 : /** Construct a socket from an execution context.
354 :
355 : @param ctx The execution context that will own this socket.
356 : */
357 : explicit udp_socket(capy::execution_context& ctx);
358 :
359 : /** Construct a socket from an executor.
360 :
361 : The socket is associated with the executor's context.
362 :
363 : @param ex The executor whose context will own the socket.
364 : */
365 : template<class Ex>
366 : requires(!std::same_as<std::remove_cvref_t<Ex>, udp_socket>) &&
367 : capy::Executor<Ex>
368 : explicit udp_socket(Ex const& ex) : udp_socket(ex.context())
369 : {
370 : }
371 :
372 : /** Move constructor.
373 :
374 : Transfers ownership of the socket resources.
375 :
376 : @param other The socket to move from.
377 : */
378 2 : udp_socket(udp_socket&& other) noexcept : io_object(std::move(other)) {}
379 :
380 : /** Move assignment operator.
381 :
382 : Closes any existing socket and transfers ownership.
383 :
384 : @param other The socket to move from.
385 : @return Reference to this socket.
386 : */
387 2 : udp_socket& operator=(udp_socket&& other) noexcept
388 : {
389 2 : if (this != &other)
390 : {
391 2 : close();
392 2 : h_ = std::move(other.h_);
393 : }
394 2 : return *this;
395 : }
396 :
397 : udp_socket(udp_socket const&) = delete;
398 : udp_socket& operator=(udp_socket const&) = delete;
399 :
400 : /** Open the socket.
401 :
402 : Creates a UDP socket and associates it with the platform
403 : reactor.
404 :
405 : @param proto The protocol (IPv4 or IPv6). Defaults to
406 : `udp::v4()`.
407 :
408 : @throws std::system_error on failure.
409 : */
410 : void open(udp proto = udp::v4());
411 :
412 : /** Close the socket.
413 :
414 : Releases socket resources. Any pending operations complete
415 : with `errc::operation_canceled`.
416 : */
417 : void close();
418 :
419 : /** Check if the socket is open.
420 :
421 : @return `true` if the socket is open and ready for operations.
422 : */
423 444 : bool is_open() const noexcept
424 : {
425 : #if BOOST_COROSIO_HAS_IOCP && !defined(BOOST_COROSIO_MRDOCS)
426 : return h_ && get().native_handle() != ~native_handle_type(0);
427 : #else
428 444 : return h_ && get().native_handle() >= 0;
429 : #endif
430 : }
431 :
432 : /** Bind the socket to a local endpoint.
433 :
434 : Associates the socket with a local address and port.
435 : Required before calling `recv_from`.
436 :
437 : @param ep The local endpoint to bind to.
438 :
439 : @return Error code on failure, empty on success.
440 :
441 : @throws std::logic_error if the socket is not open.
442 : */
443 : [[nodiscard]] std::error_code bind(endpoint ep);
444 :
445 : /** Cancel any pending asynchronous operations.
446 :
447 : All outstanding operations complete with
448 : `errc::operation_canceled`. Check `ec == cond::canceled`
449 : for portable comparison.
450 : */
451 : void cancel();
452 :
453 : /** Get the native socket handle.
454 :
455 : @return The native socket handle, or -1 if not open.
456 : */
457 : native_handle_type native_handle() const noexcept;
458 :
459 : /** Set a socket option.
460 :
461 : @param opt The option to set.
462 :
463 : @throws std::logic_error if the socket is not open.
464 : @throws std::system_error on failure.
465 : */
466 : template<class Option>
467 20 : void set_option(Option const& opt)
468 : {
469 20 : if (!is_open())
470 MIS 0 : detail::throw_logic_error("set_option: socket not open");
471 HIT 20 : std::error_code ec = get().set_option(
472 : Option::level(), Option::name(), opt.data(), opt.size());
473 20 : if (ec)
474 MIS 0 : detail::throw_system_error(ec, "udp_socket::set_option");
475 HIT 20 : }
476 :
477 : /** Get a socket option.
478 :
479 : @return The current option value.
480 :
481 : @throws std::logic_error if the socket is not open.
482 : @throws std::system_error on failure.
483 : */
484 : template<class Option>
485 16 : Option get_option() const
486 : {
487 16 : if (!is_open())
488 MIS 0 : detail::throw_logic_error("get_option: socket not open");
489 HIT 16 : Option opt{};
490 16 : std::size_t sz = opt.size();
491 : std::error_code ec =
492 16 : get().get_option(Option::level(), Option::name(), opt.data(), &sz);
493 16 : if (ec)
494 MIS 0 : detail::throw_system_error(ec, "udp_socket::get_option");
495 HIT 16 : opt.resize(sz);
496 16 : return opt;
497 : }
498 :
499 : /** Get the local endpoint of the socket.
500 :
501 : @return The local endpoint, or a default endpoint if not bound.
502 : */
503 : endpoint local_endpoint() const noexcept;
504 :
505 : /** Send a datagram to the specified destination.
506 :
507 : @param buf The buffer containing data to send.
508 : @param dest The destination endpoint.
509 : @param flags Message flags (e.g. message_flags::dont_route).
510 :
511 : @return An awaitable that completes with
512 : `io_result<std::size_t>`.
513 :
514 : @throws std::logic_error if the socket is not open.
515 : */
516 : template<capy::ConstBufferSequence Buffers>
517 22 : auto send_to(
518 : Buffers const& buf,
519 : endpoint dest,
520 : corosio::message_flags flags)
521 : {
522 22 : if (!is_open())
523 MIS 0 : detail::throw_logic_error("send_to: socket not open");
524 : return send_to_awaitable(
525 HIT 22 : *this, buf, dest, static_cast<int>(flags));
526 : }
527 :
528 : /// @overload
529 : template<capy::ConstBufferSequence Buffers>
530 22 : auto send_to(Buffers const& buf, endpoint dest)
531 : {
532 22 : return send_to(buf, dest, corosio::message_flags::none);
533 : }
534 :
535 : /** Receive a datagram and capture the sender's endpoint.
536 :
537 : @param buf The buffer to receive data into.
538 : @param source Reference to an endpoint that will be set to
539 : the sender's address on successful completion.
540 : @param flags Message flags (e.g. message_flags::peek).
541 :
542 : @return An awaitable that completes with
543 : `io_result<std::size_t>`.
544 :
545 : @throws std::logic_error if the socket is not open.
546 : */
547 : template<capy::MutableBufferSequence Buffers>
548 32 : auto recv_from(
549 : Buffers const& buf,
550 : endpoint& source,
551 : corosio::message_flags flags)
552 : {
553 32 : if (!is_open())
554 MIS 0 : detail::throw_logic_error("recv_from: socket not open");
555 : return recv_from_awaitable(
556 HIT 32 : *this, buf, source, static_cast<int>(flags));
557 : }
558 :
559 : /// @overload
560 : template<capy::MutableBufferSequence Buffers>
561 32 : auto recv_from(Buffers const& buf, endpoint& source)
562 : {
563 32 : return recv_from(buf, source, corosio::message_flags::none);
564 : }
565 :
566 : /** Initiate an asynchronous connect to set the default peer.
567 :
568 : If the socket is not already open, it is opened automatically
569 : using the address family of @p ep.
570 :
571 : @param ep The remote endpoint to connect to.
572 :
573 : @return An awaitable that completes with `io_result<>`.
574 :
575 : @throws std::system_error if the socket needs to be opened
576 : and the open fails.
577 : */
578 12 : auto connect(endpoint ep)
579 : {
580 12 : if (!is_open())
581 8 : open(ep.is_v6() ? udp::v6() : udp::v4());
582 12 : return connect_awaitable(*this, ep);
583 : }
584 :
585 : /** Send a datagram to the connected peer.
586 :
587 : @param buf The buffer containing data to send.
588 : @param flags Message flags.
589 :
590 : @return An awaitable that completes with
591 : `io_result<std::size_t>`.
592 :
593 : @throws std::logic_error if the socket is not open.
594 : */
595 : template<capy::ConstBufferSequence Buffers>
596 6 : auto send(Buffers const& buf, corosio::message_flags flags)
597 : {
598 6 : if (!is_open())
599 MIS 0 : detail::throw_logic_error("send: socket not open");
600 : return send_awaitable(
601 HIT 6 : *this, buf, static_cast<int>(flags));
602 : }
603 :
604 : /// @overload
605 : template<capy::ConstBufferSequence Buffers>
606 6 : auto send(Buffers const& buf)
607 : {
608 6 : return send(buf, corosio::message_flags::none);
609 : }
610 :
611 : /** Receive a datagram from the connected peer.
612 :
613 : @param buf The buffer to receive data into.
614 : @param flags Message flags (e.g. message_flags::peek).
615 :
616 : @return An awaitable that completes with
617 : `io_result<std::size_t>`.
618 :
619 : @throws std::logic_error if the socket is not open.
620 : */
621 : template<capy::MutableBufferSequence Buffers>
622 4 : auto recv(Buffers const& buf, corosio::message_flags flags)
623 : {
624 4 : if (!is_open())
625 MIS 0 : detail::throw_logic_error("recv: socket not open");
626 : return recv_awaitable(
627 HIT 4 : *this, buf, static_cast<int>(flags));
628 : }
629 :
630 : /// @overload
631 : template<capy::MutableBufferSequence Buffers>
632 4 : auto recv(Buffers const& buf)
633 : {
634 4 : return recv(buf, corosio::message_flags::none);
635 : }
636 :
637 : /** Get the remote endpoint of the socket.
638 :
639 : Returns the address and port of the connected peer.
640 :
641 : @return The remote endpoint, or a default endpoint if
642 : not connected.
643 : */
644 : endpoint remote_endpoint() const noexcept;
645 :
646 : protected:
647 : /// Construct from a pre-built handle (for native_udp_socket).
648 : explicit udp_socket(io_object::handle h) noexcept : io_object(std::move(h))
649 : {
650 : }
651 :
652 : private:
653 : /// Open the socket for the given protocol triple.
654 : void open_for_family(int family, int type, int protocol);
655 :
656 588 : inline implementation& get() const noexcept
657 : {
658 588 : return *static_cast<implementation*>(h_.get());
659 : }
660 : };
661 :
662 : } // namespace boost::corosio
663 :
664 : #endif // BOOST_COROSIO_UDP_SOCKET_HPP
|