1  
//
1  
//
2  
// Copyright (c) 2026 Steve Gerbino
2  
// Copyright (c) 2026 Steve Gerbino
3  
//
3  
//
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6  
//
6  
//
7  
// Official repository: https://github.com/cppalliance/corosio
7  
// Official repository: https://github.com/cppalliance/corosio
8  
//
8  
//
9  

9  

10  
#ifndef BOOST_COROSIO_UDP_SOCKET_HPP
10  
#ifndef BOOST_COROSIO_UDP_SOCKET_HPP
11  
#define BOOST_COROSIO_UDP_SOCKET_HPP
11  
#define BOOST_COROSIO_UDP_SOCKET_HPP
12  

12  

13  
#include <boost/corosio/detail/config.hpp>
13  
#include <boost/corosio/detail/config.hpp>
14  
#include <boost/corosio/detail/platform.hpp>
14  
#include <boost/corosio/detail/platform.hpp>
15  
#include <boost/corosio/detail/except.hpp>
15  
#include <boost/corosio/detail/except.hpp>
16  
#include <boost/corosio/detail/native_handle.hpp>
16  
#include <boost/corosio/detail/native_handle.hpp>
 
17 +
#include <boost/corosio/detail/op_base.hpp>
17  
#include <boost/corosio/io/io_object.hpp>
18  
#include <boost/corosio/io/io_object.hpp>
18  
#include <boost/capy/io_result.hpp>
19  
#include <boost/capy/io_result.hpp>
19  
#include <boost/corosio/detail/buffer_param.hpp>
20  
#include <boost/corosio/detail/buffer_param.hpp>
20  
#include <boost/corosio/endpoint.hpp>
21  
#include <boost/corosio/endpoint.hpp>
 
22 +
#include <boost/corosio/message_flags.hpp>
21  
#include <boost/corosio/udp.hpp>
23  
#include <boost/corosio/udp.hpp>
22  
#include <boost/capy/ex/executor_ref.hpp>
24  
#include <boost/capy/ex/executor_ref.hpp>
23  
#include <boost/capy/ex/execution_context.hpp>
25  
#include <boost/capy/ex/execution_context.hpp>
24  
#include <boost/capy/ex/io_env.hpp>
26  
#include <boost/capy/ex/io_env.hpp>
25  
#include <boost/capy/concept/executor.hpp>
27  
#include <boost/capy/concept/executor.hpp>
26  

28  

27  
#include <system_error>
29  
#include <system_error>
28  

30  

29  
#include <concepts>
31  
#include <concepts>
30  
#include <coroutine>
32  
#include <coroutine>
31  
#include <cstddef>
33  
#include <cstddef>
32  
#include <stop_token>
34  
#include <stop_token>
33  
#include <type_traits>
35  
#include <type_traits>
34  

36  

35  
namespace boost::corosio {
37  
namespace boost::corosio {
36  

38  

37  
/** An asynchronous UDP socket for coroutine I/O.
39  
/** An asynchronous UDP socket for coroutine I/O.
38  

40  

39  
    This class provides asynchronous UDP datagram operations that
41  
    This class provides asynchronous UDP datagram operations that
40  
    return awaitable types. Each operation participates in the affine
42  
    return awaitable types. Each operation participates in the affine
41  
    awaitable protocol, ensuring coroutines resume on the correct
43  
    awaitable protocol, ensuring coroutines resume on the correct
42  
    executor.
44  
    executor.
43  

45  

44  
    Supports two modes of operation:
46  
    Supports two modes of operation:
45  

47  

46  
    **Connectionless mode**: each `send_to` specifies a destination
48  
    **Connectionless mode**: each `send_to` specifies a destination
47  
    endpoint, and each `recv_from` captures the source endpoint.
49  
    endpoint, and each `recv_from` captures the source endpoint.
48  
    The socket must be opened (and optionally bound) before I/O.
50  
    The socket must be opened (and optionally bound) before I/O.
49  

51  

50  
    **Connected mode**: call `connect()` to set a default peer,
52  
    **Connected mode**: call `connect()` to set a default peer,
51  
    then use `send()`/`recv()` without endpoint arguments.
53  
    then use `send()`/`recv()` without endpoint arguments.
52  
    The kernel filters incoming datagrams to those from the
54  
    The kernel filters incoming datagrams to those from the
53  
    connected peer.
55  
    connected peer.
54  

56  

55  
    @par Thread Safety
57  
    @par Thread Safety
56  
    Distinct objects: Safe.@n
58  
    Distinct objects: Safe.@n
57  
    Shared objects: Unsafe. A socket must not have concurrent
59  
    Shared objects: Unsafe. A socket must not have concurrent
58  
    operations of the same type (e.g., two simultaneous recv_from).
60  
    operations of the same type (e.g., two simultaneous recv_from).
59  
    One send_to and one recv_from may be in flight simultaneously.
61  
    One send_to and one recv_from may be in flight simultaneously.
60  

62  

61  
    @par Example
63  
    @par Example
62  
    @code
64  
    @code
63  
    // Connectionless mode
65  
    // Connectionless mode
64  
    io_context ioc;
66  
    io_context ioc;
65  
    udp_socket sock( ioc );
67  
    udp_socket sock( ioc );
66  
    sock.open( udp::v4() );
68  
    sock.open( udp::v4() );
67  
    sock.bind( endpoint( ipv4_address::any(), 9000 ) );
69  
    sock.bind( endpoint( ipv4_address::any(), 9000 ) );
68  

70  

69  
    char buf[1024];
71  
    char buf[1024];
70  
    endpoint sender;
72  
    endpoint sender;
71  
    auto [ec, n] = co_await sock.recv_from(
73  
    auto [ec, n] = co_await sock.recv_from(
72  
        capy::mutable_buffer( buf, sizeof( buf ) ), sender );
74  
        capy::mutable_buffer( buf, sizeof( buf ) ), sender );
73  
    if ( !ec )
75  
    if ( !ec )
74  
        co_await sock.send_to(
76  
        co_await sock.send_to(
75  
            capy::const_buffer( buf, n ), sender );
77  
            capy::const_buffer( buf, n ), sender );
76  

78  

77  
    // Connected mode
79  
    // Connected mode
78  
    udp_socket csock( ioc );
80  
    udp_socket csock( ioc );
79  
    auto [cec] = co_await csock.connect(
81  
    auto [cec] = co_await csock.connect(
80  
        endpoint( ipv4_address::loopback(), 9000 ) );
82  
        endpoint( ipv4_address::loopback(), 9000 ) );
81  
    if ( !cec )
83  
    if ( !cec )
82  
        co_await csock.send(
84  
        co_await csock.send(
83  
            capy::const_buffer( buf, n ) );
85  
            capy::const_buffer( buf, n ) );
84  
    @endcode
86  
    @endcode
85  
*/
87  
*/
86  
class BOOST_COROSIO_DECL udp_socket : public io_object
88  
class BOOST_COROSIO_DECL udp_socket : public io_object
87  
{
89  
{
88  
public:
90  
public:
89  
    /** Define backend hooks for UDP socket operations.
91  
    /** Define backend hooks for UDP socket operations.
90  

92  

91  
        Platform backends (epoll, kqueue, select) derive from
93  
        Platform backends (epoll, kqueue, select) derive from
92  
        this to implement datagram I/O and option management.
94  
        this to implement datagram I/O and option management.
93  
    */
95  
    */
94  
    struct implementation : io_object::implementation
96  
    struct implementation : io_object::implementation
95  
    {
97  
    {
96  
        /** Initiate an asynchronous send_to operation.
98  
        /** Initiate an asynchronous send_to operation.
97  

99  

98  
            @param h Coroutine handle to resume on completion.
100  
            @param h Coroutine handle to resume on completion.
99  
            @param ex Executor for dispatching the completion.
101  
            @param ex Executor for dispatching the completion.
100  
            @param buf The buffer data to send.
102  
            @param buf The buffer data to send.
101  
            @param dest The destination endpoint.
103  
            @param dest The destination endpoint.
102  
            @param token Stop token for cancellation.
104  
            @param token Stop token for cancellation.
103  
            @param ec Output error code.
105  
            @param ec Output error code.
104  
            @param bytes_out Output bytes transferred.
106  
            @param bytes_out Output bytes transferred.
105  

107  

106  
            @return Coroutine handle to resume immediately.
108  
            @return Coroutine handle to resume immediately.
107  
        */
109  
        */
108  
        virtual std::coroutine_handle<> send_to(
110  
        virtual std::coroutine_handle<> send_to(
109  
            std::coroutine_handle<> h,
111  
            std::coroutine_handle<> h,
110  
            capy::executor_ref ex,
112  
            capy::executor_ref ex,
111  
            buffer_param buf,
113  
            buffer_param buf,
112  
            endpoint dest,
114  
            endpoint dest,
 
115 +
            int flags,
113  
            std::stop_token token,
116  
            std::stop_token token,
114  
            std::error_code* ec,
117  
            std::error_code* ec,
115  
            std::size_t* bytes_out) = 0;
118  
            std::size_t* bytes_out) = 0;
116  

119  

117  
        /** Initiate an asynchronous recv_from operation.
120  
        /** Initiate an asynchronous recv_from operation.
118  

121  

119  
            @param h Coroutine handle to resume on completion.
122  
            @param h Coroutine handle to resume on completion.
120  
            @param ex Executor for dispatching the completion.
123  
            @param ex Executor for dispatching the completion.
121  
            @param buf The buffer to receive into.
124  
            @param buf The buffer to receive into.
122  
            @param source Output endpoint for the sender's address.
125  
            @param source Output endpoint for the sender's address.
123  
            @param token Stop token for cancellation.
126  
            @param token Stop token for cancellation.
124  
            @param ec Output error code.
127  
            @param ec Output error code.
125  
            @param bytes_out Output bytes transferred.
128  
            @param bytes_out Output bytes transferred.
126  

129  

127  
            @return Coroutine handle to resume immediately.
130  
            @return Coroutine handle to resume immediately.
128  
        */
131  
        */
129  
        virtual std::coroutine_handle<> recv_from(
132  
        virtual std::coroutine_handle<> recv_from(
130  
            std::coroutine_handle<> h,
133  
            std::coroutine_handle<> h,
131  
            capy::executor_ref ex,
134  
            capy::executor_ref ex,
132  
            buffer_param buf,
135  
            buffer_param buf,
133  
            endpoint* source,
136  
            endpoint* source,
 
137 +
            int flags,
134  
            std::stop_token token,
138  
            std::stop_token token,
135  
            std::error_code* ec,
139  
            std::error_code* ec,
136  
            std::size_t* bytes_out) = 0;
140  
            std::size_t* bytes_out) = 0;
137  

141  

138  
        /// Return the platform socket descriptor.
142  
        /// Return the platform socket descriptor.
139  
        virtual native_handle_type native_handle() const noexcept = 0;
143  
        virtual native_handle_type native_handle() const noexcept = 0;
140  

144  

141  
        /** Request cancellation of pending asynchronous operations.
145  
        /** Request cancellation of pending asynchronous operations.
142  

146  

143  
            All outstanding operations complete with operation_canceled
147  
            All outstanding operations complete with operation_canceled
144  
            error. Check `ec == cond::canceled` for portable comparison.
148  
            error. Check `ec == cond::canceled` for portable comparison.
145  
        */
149  
        */
146  
        virtual void cancel() noexcept = 0;
150  
        virtual void cancel() noexcept = 0;
147  

151  

148  
        /** Set a socket option.
152  
        /** Set a socket option.
149  

153  

150  
            @param level The protocol level (e.g. `SOL_SOCKET`).
154  
            @param level The protocol level (e.g. `SOL_SOCKET`).
151  
            @param optname The option name.
155  
            @param optname The option name.
152  
            @param data Pointer to the option value.
156  
            @param data Pointer to the option value.
153  
            @param size Size of the option value in bytes.
157  
            @param size Size of the option value in bytes.
154  
            @return Error code on failure, empty on success.
158  
            @return Error code on failure, empty on success.
155  
        */
159  
        */
156  
        virtual std::error_code set_option(
160  
        virtual std::error_code set_option(
157  
            int level,
161  
            int level,
158  
            int optname,
162  
            int optname,
159  
            void const* data,
163  
            void const* data,
160  
            std::size_t size) noexcept = 0;
164  
            std::size_t size) noexcept = 0;
161  

165  

162  
        /** Get a socket option.
166  
        /** Get a socket option.
163  

167  

164  
            @param level The protocol level (e.g. `SOL_SOCKET`).
168  
            @param level The protocol level (e.g. `SOL_SOCKET`).
165  
            @param optname The option name.
169  
            @param optname The option name.
166  
            @param data Pointer to receive the option value.
170  
            @param data Pointer to receive the option value.
167  
            @param size On entry, the size of the buffer. On exit,
171  
            @param size On entry, the size of the buffer. On exit,
168  
                the size of the option value.
172  
                the size of the option value.
169  
            @return Error code on failure, empty on success.
173  
            @return Error code on failure, empty on success.
170  
        */
174  
        */
171  
        virtual std::error_code
175  
        virtual std::error_code
172  
        get_option(int level, int optname, void* data, std::size_t* size)
176  
        get_option(int level, int optname, void* data, std::size_t* size)
173  
            const noexcept = 0;
177  
            const noexcept = 0;
174  

178  

175  
        /// Return the cached local endpoint.
179  
        /// Return the cached local endpoint.
176  
        virtual endpoint local_endpoint() const noexcept = 0;
180  
        virtual endpoint local_endpoint() const noexcept = 0;
177  

181  

178  
        /// Return the cached remote endpoint (connected mode).
182  
        /// Return the cached remote endpoint (connected mode).
179  
        virtual endpoint remote_endpoint() const noexcept = 0;
183  
        virtual endpoint remote_endpoint() const noexcept = 0;
180  

184  

181  
        /** Initiate an asynchronous connect to set the default peer.
185  
        /** Initiate an asynchronous connect to set the default peer.
182  

186  

183  
            @param h Coroutine handle to resume on completion.
187  
            @param h Coroutine handle to resume on completion.
184  
            @param ex Executor for dispatching the completion.
188  
            @param ex Executor for dispatching the completion.
185  
            @param ep The remote endpoint to connect to.
189  
            @param ep The remote endpoint to connect to.
186  
            @param token Stop token for cancellation.
190  
            @param token Stop token for cancellation.
187  
            @param ec Output error code.
191  
            @param ec Output error code.
188  

192  

189  
            @return Coroutine handle to resume immediately.
193  
            @return Coroutine handle to resume immediately.
190  
        */
194  
        */
191  
        virtual std::coroutine_handle<> connect(
195  
        virtual std::coroutine_handle<> connect(
192  
            std::coroutine_handle<> h,
196  
            std::coroutine_handle<> h,
193  
            capy::executor_ref ex,
197  
            capy::executor_ref ex,
194  
            endpoint ep,
198  
            endpoint ep,
195  
            std::stop_token token,
199  
            std::stop_token token,
196  
            std::error_code* ec) = 0;
200  
            std::error_code* ec) = 0;
197  

201  

198  
        /** Initiate an asynchronous connected send operation.
202  
        /** Initiate an asynchronous connected send operation.
199  

203  

200  
            @param h Coroutine handle to resume on completion.
204  
            @param h Coroutine handle to resume on completion.
201  
            @param ex Executor for dispatching the completion.
205  
            @param ex Executor for dispatching the completion.
202  
            @param buf The buffer data to send.
206  
            @param buf The buffer data to send.
203  
            @param token Stop token for cancellation.
207  
            @param token Stop token for cancellation.
204  
            @param ec Output error code.
208  
            @param ec Output error code.
205  
            @param bytes_out Output bytes transferred.
209  
            @param bytes_out Output bytes transferred.
206  

210  

207  
            @return Coroutine handle to resume immediately.
211  
            @return Coroutine handle to resume immediately.
208  
        */
212  
        */
209  
        virtual std::coroutine_handle<> send(
213  
        virtual std::coroutine_handle<> send(
210  
            std::coroutine_handle<> h,
214  
            std::coroutine_handle<> h,
211  
            capy::executor_ref ex,
215  
            capy::executor_ref ex,
212  
            buffer_param buf,
216  
            buffer_param buf,
 
217 +
            int flags,
213  
            std::stop_token token,
218  
            std::stop_token token,
214  
            std::error_code* ec,
219  
            std::error_code* ec,
215  
            std::size_t* bytes_out) = 0;
220  
            std::size_t* bytes_out) = 0;
216  

221  

217  
        /** Initiate an asynchronous connected recv operation.
222  
        /** Initiate an asynchronous connected recv operation.
218  

223  

219  
            @param h Coroutine handle to resume on completion.
224  
            @param h Coroutine handle to resume on completion.
220  
            @param ex Executor for dispatching the completion.
225  
            @param ex Executor for dispatching the completion.
221  
            @param buf The buffer to receive into.
226  
            @param buf The buffer to receive into.
222  
            @param token Stop token for cancellation.
227  
            @param token Stop token for cancellation.
223  
            @param ec Output error code.
228  
            @param ec Output error code.
224  
            @param bytes_out Output bytes transferred.
229  
            @param bytes_out Output bytes transferred.
225  

230  

226  
            @return Coroutine handle to resume immediately.
231  
            @return Coroutine handle to resume immediately.
227  
        */
232  
        */
228  
        virtual std::coroutine_handle<> recv(
233  
        virtual std::coroutine_handle<> recv(
229  
            std::coroutine_handle<> h,
234  
            std::coroutine_handle<> h,
230  
            capy::executor_ref ex,
235  
            capy::executor_ref ex,
231  
            buffer_param buf,
236  
            buffer_param buf,
 
237 +
            int flags,
232  
            std::stop_token token,
238  
            std::stop_token token,
233  
            std::error_code* ec,
239  
            std::error_code* ec,
234  
            std::size_t* bytes_out) = 0;
240  
            std::size_t* bytes_out) = 0;
235  
    };
241  
    };
236  

242  

237  
    /** Represent the awaitable returned by @ref send_to.
243  
    /** Represent the awaitable returned by @ref send_to.
238  

244  

239  
        Captures the destination endpoint and buffer, then dispatches
245  
        Captures the destination endpoint and buffer, then dispatches
240  
        to the backend implementation on suspension.
246  
        to the backend implementation on suspension.
241  
    */
247  
    */
242  
    struct send_to_awaitable
248  
    struct send_to_awaitable
 
249 +
        : detail::bytes_op_base<send_to_awaitable>
243  
    {
250  
    {
244  
        udp_socket& s_;
251  
        udp_socket& s_;
245  
        buffer_param buf_;
252  
        buffer_param buf_;
246  
        endpoint dest_;
253  
        endpoint dest_;
247 -
        std::stop_token token_;
254 +
        int flags_;
248 -
        mutable std::error_code ec_;
 
249 -
        mutable std::size_t bytes_ = 0;
 
250  

255  

251  
        send_to_awaitable(
256  
        send_to_awaitable(
252 -
            udp_socket& s, buffer_param buf, endpoint dest) noexcept
257 +
            udp_socket& s, buffer_param buf,
253 -
            : s_(s)
258 +
            endpoint dest, int flags = 0) noexcept
254 -
            , buf_(buf)
259 +
            : s_(s), buf_(buf), dest_(dest), flags_(flags) {}
255 -
            , dest_(dest)
 
256 -
        {
 
257 -
        }
 
258 -

 
259 -
        bool await_ready() const noexcept
 
260 -
        {
 
261 -
            return token_.stop_requested();
 
262 -
        }
 
263 -

 
264 -
        capy::io_result<std::size_t> await_resume() const noexcept
 
265 -
        {
 
266 -
            if (token_.stop_requested())
 
267 -
                return {make_error_code(std::errc::operation_canceled), 0};
 
268 -
            return {ec_, bytes_};
 
269 -
        }
 
270  

260  

271 -
        auto await_suspend(std::coroutine_handle<> h, capy::io_env const* env)
261 +
        std::coroutine_handle<> dispatch(
272 -
            -> std::coroutine_handle<>
262 +
            std::coroutine_handle<> h, capy::executor_ref ex) const
273 -
            token_ = env->stop_token;
 
274  
        {
263  
        {
275  
            return s_.get().send_to(
264  
            return s_.get().send_to(
276 -
                h, env->executor, buf_, dest_, token_, &ec_, &bytes_);
265 +
                h, ex, buf_, dest_, flags_, token_, &ec_, &bytes_);
277  
        }
266  
        }
278  
    };
267  
    };
279 -
    /** Represent the awaitable returned by @ref recv_from.
 
280 -

 
281 -
        Captures the receive buffer and source endpoint reference,
 
282 -
        then dispatches to the backend implementation on suspension.
 
283 -
    */
 
284  

268  

285  
    struct recv_from_awaitable
269  
    struct recv_from_awaitable
 
270 +
        : detail::bytes_op_base<recv_from_awaitable>
286  
    {
271  
    {
287  
        udp_socket& s_;
272  
        udp_socket& s_;
288  
        buffer_param buf_;
273  
        buffer_param buf_;
289  
        endpoint& source_;
274  
        endpoint& source_;
290 -
        std::stop_token token_;
275 +
        int flags_;
291 -
        mutable std::error_code ec_;
 
292 -
        mutable std::size_t bytes_ = 0;
 
293  

276  

294  
        recv_from_awaitable(
277  
        recv_from_awaitable(
295 -
            udp_socket& s, buffer_param buf, endpoint& source) noexcept
278 +
            udp_socket& s, buffer_param buf,
296 -
            : s_(s)
279 +
            endpoint& source, int flags = 0) noexcept
297 -
            , buf_(buf)
280 +
            : s_(s), buf_(buf), source_(source), flags_(flags) {}
298 -
            , source_(source)
 
299 -
        {
 
300 -
        }
 
301 -

 
302 -
        bool await_ready() const noexcept
 
303 -
        {
 
304 -
            return token_.stop_requested();
 
305 -
        }
 
306 -

 
307 -
        capy::io_result<std::size_t> await_resume() const noexcept
 
308 -
        {
 
309 -
            if (token_.stop_requested())
 
310 -
                return {make_error_code(std::errc::operation_canceled), 0};
 
311 -
            return {ec_, bytes_};
 
312 -
        }
 
313  

281  

314 -
        auto await_suspend(std::coroutine_handle<> h, capy::io_env const* env)
282 +
        std::coroutine_handle<> dispatch(
315 -
            -> std::coroutine_handle<>
283 +
            std::coroutine_handle<> h, capy::executor_ref ex) const
316 -
            token_ = env->stop_token;
 
317  
        {
284  
        {
318  
            return s_.get().recv_from(
285  
            return s_.get().recv_from(
319 -
                h, env->executor, buf_, &source_, token_, &ec_, &bytes_);
286 +
                h, ex, buf_, &source_, flags_, token_, &ec_, &bytes_);
320  
        }
287  
        }
321  
    };
288  
    };
322 -
    /** Represent the awaitable returned by @ref connect.
 
323 -

 
324 -
        Captures the target endpoint, then dispatches to the backend
 
325 -
        implementation on suspension.
 
326 -
    */
 
327  

289  

328  
    struct connect_awaitable
290  
    struct connect_awaitable
 
291 +
        : detail::void_op_base<connect_awaitable>
329  
    {
292  
    {
330  
        udp_socket& s_;
293  
        udp_socket& s_;
331 -
        std::stop_token token_;
 
332 -
        mutable std::error_code ec_;
 
333  
        endpoint endpoint_;
294  
        endpoint endpoint_;
334  

295  

335  
        connect_awaitable(udp_socket& s, endpoint ep) noexcept
296  
        connect_awaitable(udp_socket& s, endpoint ep) noexcept
336 -
            : s_(s)
297 +
            : s_(s), endpoint_(ep) {}
337 -
            , endpoint_(ep)
 
338 -
        {
 
339 -
        }
 
340 -

 
341 -
        bool await_ready() const noexcept
 
342 -
        {
 
343 -
            return token_.stop_requested();
 
344 -
        }
 
345 -

 
346 -
        capy::io_result<> await_resume() const noexcept
 
347 -
        {
 
348 -
            if (token_.stop_requested())
 
349 -
                return {make_error_code(std::errc::operation_canceled)};
 
350 -
            return {ec_};
 
351 -
        }
 
352  

298  

353 -
        auto await_suspend(std::coroutine_handle<> h, capy::io_env const* env)
299 +
        std::coroutine_handle<> dispatch(
354 -
            -> std::coroutine_handle<>
300 +
            std::coroutine_handle<> h, capy::executor_ref ex) const
355  
        {
301  
        {
356 -
            token_ = env->stop_token;
302 +
            return s_.get().connect(h, ex, endpoint_, token_, &ec_);
357 -
            return s_.get().connect(h, env->executor, endpoint_, token_, &ec_);
 
358  
        }
303  
        }
359  
    };
304  
    };
360 -
    /** Represent the awaitable returned by @ref send.
 
361 -

 
362 -
        Captures the buffer, then dispatches to the backend
 
363 -
        implementation on suspension. No endpoint argument
 
364 -
        (uses the connected peer).
 
365 -
    */
 
366  

305  

367  
    struct send_awaitable
306  
    struct send_awaitable
 
307 +
        : detail::bytes_op_base<send_awaitable>
368  
    {
308  
    {
369  
        udp_socket& s_;
309  
        udp_socket& s_;
370  
        buffer_param buf_;
310  
        buffer_param buf_;
371 -
        std::stop_token token_;
311 +
        int flags_;
372 -
        mutable std::error_code ec_;
 
373 -
        mutable std::size_t bytes_ = 0;
 
374 -

 
375 -
        send_awaitable(udp_socket& s, buffer_param buf) noexcept
 
376 -
            : s_(s)
 
377 -
            , buf_(buf)
 
378 -
        {
 
379 -
        }
 
380 -

 
381 -
        bool await_ready() const noexcept
 
382 -
        {
 
383 -
            return token_.stop_requested();
 
384 -
        }
 
385  

312  

386 -
        capy::io_result<std::size_t> await_resume() const noexcept
313 +
        send_awaitable(
387 -
        {
314 +
            udp_socket& s, buffer_param buf,
388 -
            if (token_.stop_requested())
315 +
            int flags = 0) noexcept
389 -
                return {make_error_code(std::errc::operation_canceled), 0};
316 +
            : s_(s), buf_(buf), flags_(flags) {}
390 -
            return {ec_, bytes_};
 
391 -
        }
 
392  

317  

393 -
        auto await_suspend(std::coroutine_handle<> h, capy::io_env const* env)
318 +
        std::coroutine_handle<> dispatch(
394 -
            -> std::coroutine_handle<>
319 +
            std::coroutine_handle<> h, capy::executor_ref ex) const
395  
        {
320  
        {
396 -
            token_ = env->stop_token;
321 +
            return s_.get().send(
397 -
            return s_.get().send(h, env->executor, buf_, token_, &ec_, &bytes_);
322 +
                h, ex, buf_, flags_, token_, &ec_, &bytes_);
398  
        }
323  
        }
399  
    };
324  
    };
400 -
    /** Represent the awaitable returned by @ref recv.
 
401 -

 
402 -
        Captures the receive buffer, then dispatches to the backend
 
403 -
        implementation on suspension. No source endpoint (connected
 
404 -
        mode filters at the kernel level).
 
405 -
    */
 
406  

325  

407  
    struct recv_awaitable
326  
    struct recv_awaitable
 
327 +
        : detail::bytes_op_base<recv_awaitable>
408  
    {
328  
    {
409  
        udp_socket& s_;
329  
        udp_socket& s_;
410  
        buffer_param buf_;
330  
        buffer_param buf_;
411 -
        std::stop_token token_;
331 +
        int flags_;
412 -
        mutable std::error_code ec_;
 
413 -
        mutable std::size_t bytes_ = 0;
 
414 -

 
415 -
        recv_awaitable(udp_socket& s, buffer_param buf) noexcept
 
416 -
            : s_(s)
 
417 -
            , buf_(buf)
 
418 -
        {
 
419 -
        }
 
420 -

 
421 -
        bool await_ready() const noexcept
 
422 -
        {
 
423 -
            return token_.stop_requested();
 
424 -
        }
 
425  

332  

426 -
        capy::io_result<std::size_t> await_resume() const noexcept
333 +
        recv_awaitable(
427 -
        {
334 +
            udp_socket& s, buffer_param buf,
428 -
            if (token_.stop_requested())
335 +
            int flags = 0) noexcept
429 -
                return {make_error_code(std::errc::operation_canceled), 0};
336 +
            : s_(s), buf_(buf), flags_(flags) {}
430 -
            return {ec_, bytes_};
 
431 -
        }
 
432  

337  

433 -
        auto await_suspend(std::coroutine_handle<> h, capy::io_env const* env)
338 +
        std::coroutine_handle<> dispatch(
434 -
            -> std::coroutine_handle<>
339 +
            std::coroutine_handle<> h, capy::executor_ref ex) const
435  
        {
340  
        {
436 -
            token_ = env->stop_token;
341 +
            return s_.get().recv(
437 -
            return s_.get().recv(h, env->executor, buf_, token_, &ec_, &bytes_);
342 +
                h, ex, buf_, flags_, token_, &ec_, &bytes_);
438  
        }
343  
        }
439  
    };
344  
    };
440  

345  

441  
public:
346  
public:
442  
    /** Destructor.
347  
    /** Destructor.
443  

348  

444  
        Closes the socket if open, cancelling any pending operations.
349  
        Closes the socket if open, cancelling any pending operations.
445  
    */
350  
    */
446  
    ~udp_socket() override;
351  
    ~udp_socket() override;
447  

352  

448  
    /** Construct a socket from an execution context.
353  
    /** Construct a socket from an execution context.
449  

354  

450  
        @param ctx The execution context that will own this socket.
355  
        @param ctx The execution context that will own this socket.
451  
    */
356  
    */
452  
    explicit udp_socket(capy::execution_context& ctx);
357  
    explicit udp_socket(capy::execution_context& ctx);
453  

358  

454  
    /** Construct a socket from an executor.
359  
    /** Construct a socket from an executor.
455  

360  

456  
        The socket is associated with the executor's context.
361  
        The socket is associated with the executor's context.
457  

362  

458  
        @param ex The executor whose context will own the socket.
363  
        @param ex The executor whose context will own the socket.
459  
    */
364  
    */
460  
    template<class Ex>
365  
    template<class Ex>
461  
        requires(!std::same_as<std::remove_cvref_t<Ex>, udp_socket>) &&
366  
        requires(!std::same_as<std::remove_cvref_t<Ex>, udp_socket>) &&
462  
        capy::Executor<Ex>
367  
        capy::Executor<Ex>
463  
    explicit udp_socket(Ex const& ex) : udp_socket(ex.context())
368  
    explicit udp_socket(Ex const& ex) : udp_socket(ex.context())
464  
    {
369  
    {
465  
    }
370  
    }
466  

371  

467  
    /** Move constructor.
372  
    /** Move constructor.
468  

373  

469  
        Transfers ownership of the socket resources.
374  
        Transfers ownership of the socket resources.
470  

375  

471  
        @param other The socket to move from.
376  
        @param other The socket to move from.
472  
    */
377  
    */
473  
    udp_socket(udp_socket&& other) noexcept : io_object(std::move(other)) {}
378  
    udp_socket(udp_socket&& other) noexcept : io_object(std::move(other)) {}
474  

379  

475  
    /** Move assignment operator.
380  
    /** Move assignment operator.
476  

381  

477  
        Closes any existing socket and transfers ownership.
382  
        Closes any existing socket and transfers ownership.
478  

383  

479  
        @param other The socket to move from.
384  
        @param other The socket to move from.
480  
        @return Reference to this socket.
385  
        @return Reference to this socket.
481  
    */
386  
    */
482  
    udp_socket& operator=(udp_socket&& other) noexcept
387  
    udp_socket& operator=(udp_socket&& other) noexcept
483  
    {
388  
    {
484  
        if (this != &other)
389  
        if (this != &other)
485  
        {
390  
        {
486  
            close();
391  
            close();
487  
            h_ = std::move(other.h_);
392  
            h_ = std::move(other.h_);
488  
        }
393  
        }
489  
        return *this;
394  
        return *this;
490  
    }
395  
    }
491  

396  

492  
    udp_socket(udp_socket const&)            = delete;
397  
    udp_socket(udp_socket const&)            = delete;
493  
    udp_socket& operator=(udp_socket const&) = delete;
398  
    udp_socket& operator=(udp_socket const&) = delete;
494  

399  

495  
    /** Open the socket.
400  
    /** Open the socket.
496  

401  

497  
        Creates a UDP socket and associates it with the platform
402  
        Creates a UDP socket and associates it with the platform
498  
        reactor.
403  
        reactor.
499  

404  

500  
        @param proto The protocol (IPv4 or IPv6). Defaults to
405  
        @param proto The protocol (IPv4 or IPv6). Defaults to
501  
            `udp::v4()`.
406  
            `udp::v4()`.
502  

407  

503  
        @throws std::system_error on failure.
408  
        @throws std::system_error on failure.
504  
    */
409  
    */
505  
    void open(udp proto = udp::v4());
410  
    void open(udp proto = udp::v4());
506  

411  

507  
    /** Close the socket.
412  
    /** Close the socket.
508  

413  

509  
        Releases socket resources. Any pending operations complete
414  
        Releases socket resources. Any pending operations complete
510  
        with `errc::operation_canceled`.
415  
        with `errc::operation_canceled`.
511  
    */
416  
    */
512  
    void close();
417  
    void close();
513  

418  

514  
    /** Check if the socket is open.
419  
    /** Check if the socket is open.
515  

420  

516  
        @return `true` if the socket is open and ready for operations.
421  
        @return `true` if the socket is open and ready for operations.
517  
    */
422  
    */
518  
    bool is_open() const noexcept
423  
    bool is_open() const noexcept
519  
    {
424  
    {
520  
#if BOOST_COROSIO_HAS_IOCP && !defined(BOOST_COROSIO_MRDOCS)
425  
#if BOOST_COROSIO_HAS_IOCP && !defined(BOOST_COROSIO_MRDOCS)
521  
        return h_ && get().native_handle() != ~native_handle_type(0);
426  
        return h_ && get().native_handle() != ~native_handle_type(0);
522  
#else
427  
#else
523  
        return h_ && get().native_handle() >= 0;
428  
        return h_ && get().native_handle() >= 0;
524  
#endif
429  
#endif
525  
    }
430  
    }
526  

431  

527  
    /** Bind the socket to a local endpoint.
432  
    /** Bind the socket to a local endpoint.
528  

433  

529  
        Associates the socket with a local address and port.
434  
        Associates the socket with a local address and port.
530  
        Required before calling `recv_from`.
435  
        Required before calling `recv_from`.
531  

436  

532  
        @param ep The local endpoint to bind to.
437  
        @param ep The local endpoint to bind to.
533  

438  

534  
        @return Error code on failure, empty on success.
439  
        @return Error code on failure, empty on success.
535  

440  

536  
        @throws std::logic_error if the socket is not open.
441  
        @throws std::logic_error if the socket is not open.
537  
    */
442  
    */
538  
    [[nodiscard]] std::error_code bind(endpoint ep);
443  
    [[nodiscard]] std::error_code bind(endpoint ep);
539  

444  

540  
    /** Cancel any pending asynchronous operations.
445  
    /** Cancel any pending asynchronous operations.
541  

446  

542  
        All outstanding operations complete with
447  
        All outstanding operations complete with
543  
        `errc::operation_canceled`. Check `ec == cond::canceled`
448  
        `errc::operation_canceled`. Check `ec == cond::canceled`
544  
        for portable comparison.
449  
        for portable comparison.
545  
    */
450  
    */
546  
    void cancel();
451  
    void cancel();
547  

452  

548  
    /** Get the native socket handle.
453  
    /** Get the native socket handle.
549  

454  

550  
        @return The native socket handle, or -1 if not open.
455  
        @return The native socket handle, or -1 if not open.
551  
    */
456  
    */
552  
    native_handle_type native_handle() const noexcept;
457  
    native_handle_type native_handle() const noexcept;
553  

458  

554  
    /** Set a socket option.
459  
    /** Set a socket option.
555  

460  

556  
        @param opt The option to set.
461  
        @param opt The option to set.
557  

462  

558  
        @throws std::logic_error if the socket is not open.
463  
        @throws std::logic_error if the socket is not open.
559  
        @throws std::system_error on failure.
464  
        @throws std::system_error on failure.
560  
    */
465  
    */
561  
    template<class Option>
466  
    template<class Option>
562  
    void set_option(Option const& opt)
467  
    void set_option(Option const& opt)
563  
    {
468  
    {
564  
        if (!is_open())
469  
        if (!is_open())
565  
            detail::throw_logic_error("set_option: socket not open");
470  
            detail::throw_logic_error("set_option: socket not open");
566  
        std::error_code ec = get().set_option(
471  
        std::error_code ec = get().set_option(
567  
            Option::level(), Option::name(), opt.data(), opt.size());
472  
            Option::level(), Option::name(), opt.data(), opt.size());
568  
        if (ec)
473  
        if (ec)
569  
            detail::throw_system_error(ec, "udp_socket::set_option");
474  
            detail::throw_system_error(ec, "udp_socket::set_option");
570  
    }
475  
    }
571  

476  

572  
    /** Get a socket option.
477  
    /** Get a socket option.
573  

478  

574  
        @return The current option value.
479  
        @return The current option value.
575  

480  

576  
        @throws std::logic_error if the socket is not open.
481  
        @throws std::logic_error if the socket is not open.
577  
        @throws std::system_error on failure.
482  
        @throws std::system_error on failure.
578  
    */
483  
    */
579  
    template<class Option>
484  
    template<class Option>
580  
    Option get_option() const
485  
    Option get_option() const
581  
    {
486  
    {
582  
        if (!is_open())
487  
        if (!is_open())
583  
            detail::throw_logic_error("get_option: socket not open");
488  
            detail::throw_logic_error("get_option: socket not open");
584  
        Option opt{};
489  
        Option opt{};
585  
        std::size_t sz = opt.size();
490  
        std::size_t sz = opt.size();
586  
        std::error_code ec =
491  
        std::error_code ec =
587  
            get().get_option(Option::level(), Option::name(), opt.data(), &sz);
492  
            get().get_option(Option::level(), Option::name(), opt.data(), &sz);
588  
        if (ec)
493  
        if (ec)
589  
            detail::throw_system_error(ec, "udp_socket::get_option");
494  
            detail::throw_system_error(ec, "udp_socket::get_option");
590  
        opt.resize(sz);
495  
        opt.resize(sz);
591  
        return opt;
496  
        return opt;
592  
    }
497  
    }
593  

498  

594  
    /** Get the local endpoint of the socket.
499  
    /** Get the local endpoint of the socket.
595  

500  

596  
        @return The local endpoint, or a default endpoint if not bound.
501  
        @return The local endpoint, or a default endpoint if not bound.
597  
    */
502  
    */
598  
    endpoint local_endpoint() const noexcept;
503  
    endpoint local_endpoint() const noexcept;
599  

504  

600  
    /** Send a datagram to the specified destination.
505  
    /** Send a datagram to the specified destination.
601  

506  

602  
        @param buf The buffer containing data to send.
507  
        @param buf The buffer containing data to send.
603  
        @param dest The destination endpoint.
508  
        @param dest The destination endpoint.
 
509 +
        @param flags Message flags (e.g. message_flags::dont_route).
604  

510  

605  
        @return An awaitable that completes with
511  
        @return An awaitable that completes with
606  
            `io_result<std::size_t>`.
512  
            `io_result<std::size_t>`.
607  

513  

608  
        @throws std::logic_error if the socket is not open.
514  
        @throws std::logic_error if the socket is not open.
609  
    */
515  
    */
610  
    template<capy::ConstBufferSequence Buffers>
516  
    template<capy::ConstBufferSequence Buffers>
611 -
    auto send_to(Buffers const& buf, endpoint dest)
517 +
    auto send_to(
 
518 +
        Buffers const& buf,
 
519 +
        endpoint dest,
 
520 +
        corosio::message_flags flags)
612  
    {
521  
    {
613  
        if (!is_open())
522  
        if (!is_open())
614  
            detail::throw_logic_error("send_to: socket not open");
523  
            detail::throw_logic_error("send_to: socket not open");
615 -
        return send_to_awaitable(*this, buf, dest);
524 +
        return send_to_awaitable(
 
525 +
            *this, buf, dest, static_cast<int>(flags));
 
526 +
    }
 
527 +

 
528 +
    /// @overload
 
529 +
    template<capy::ConstBufferSequence Buffers>
 
530 +
    auto send_to(Buffers const& buf, endpoint dest)
 
531 +
    {
 
532 +
        return send_to(buf, dest, corosio::message_flags::none);
616  
    }
533  
    }
617  

534  

618  
    /** Receive a datagram and capture the sender's endpoint.
535  
    /** Receive a datagram and capture the sender's endpoint.
619  

536  

620  
        @param buf The buffer to receive data into.
537  
        @param buf The buffer to receive data into.
621  
        @param source Reference to an endpoint that will be set to
538  
        @param source Reference to an endpoint that will be set to
622  
            the sender's address on successful completion.
539  
            the sender's address on successful completion.
 
540 +
        @param flags Message flags (e.g. message_flags::peek).
623  

541  

624  
        @return An awaitable that completes with
542  
        @return An awaitable that completes with
625  
            `io_result<std::size_t>`.
543  
            `io_result<std::size_t>`.
626  

544  

627  
        @throws std::logic_error if the socket is not open.
545  
        @throws std::logic_error if the socket is not open.
628  
    */
546  
    */
629  
    template<capy::MutableBufferSequence Buffers>
547  
    template<capy::MutableBufferSequence Buffers>
630 -
    auto recv_from(Buffers const& buf, endpoint& source)
548 +
    auto recv_from(
 
549 +
        Buffers const& buf,
 
550 +
        endpoint& source,
 
551 +
        corosio::message_flags flags)
631  
    {
552  
    {
632  
        if (!is_open())
553  
        if (!is_open())
633  
            detail::throw_logic_error("recv_from: socket not open");
554  
            detail::throw_logic_error("recv_from: socket not open");
634 -
        return recv_from_awaitable(*this, buf, source);
555 +
        return recv_from_awaitable(
 
556 +
            *this, buf, source, static_cast<int>(flags));
 
557 +
    }
 
558 +

 
559 +
    /// @overload
 
560 +
    template<capy::MutableBufferSequence Buffers>
 
561 +
    auto recv_from(Buffers const& buf, endpoint& source)
 
562 +
    {
 
563 +
        return recv_from(buf, source, corosio::message_flags::none);
635  
    }
564  
    }
636  

565  

637  
    /** Initiate an asynchronous connect to set the default peer.
566  
    /** Initiate an asynchronous connect to set the default peer.
638  

567  

639  
        If the socket is not already open, it is opened automatically
568  
        If the socket is not already open, it is opened automatically
640  
        using the address family of @p ep.
569  
        using the address family of @p ep.
641  

570  

642  
        @param ep The remote endpoint to connect to.
571  
        @param ep The remote endpoint to connect to.
643  

572  

644  
        @return An awaitable that completes with `io_result<>`.
573  
        @return An awaitable that completes with `io_result<>`.
645  

574  

646  
        @throws std::system_error if the socket needs to be opened
575  
        @throws std::system_error if the socket needs to be opened
647  
            and the open fails.
576  
            and the open fails.
648  
    */
577  
    */
649  
    auto connect(endpoint ep)
578  
    auto connect(endpoint ep)
650  
    {
579  
    {
651  
        if (!is_open())
580  
        if (!is_open())
652  
            open(ep.is_v6() ? udp::v6() : udp::v4());
581  
            open(ep.is_v6() ? udp::v6() : udp::v4());
653  
        return connect_awaitable(*this, ep);
582  
        return connect_awaitable(*this, ep);
654  
    }
583  
    }
655  

584  

656  
    /** Send a datagram to the connected peer.
585  
    /** Send a datagram to the connected peer.
657  

586  

658  
        @param buf The buffer containing data to send.
587  
        @param buf The buffer containing data to send.
 
588 +
        @param flags Message flags.
659  

589  

660  
        @return An awaitable that completes with
590  
        @return An awaitable that completes with
661  
            `io_result<std::size_t>`.
591  
            `io_result<std::size_t>`.
662  

592  

663  
        @throws std::logic_error if the socket is not open.
593  
        @throws std::logic_error if the socket is not open.
664  
    */
594  
    */
665  
    template<capy::ConstBufferSequence Buffers>
595  
    template<capy::ConstBufferSequence Buffers>
666 -
    auto send(Buffers const& buf)
596 +
    auto send(Buffers const& buf, corosio::message_flags flags)
667  
    {
597  
    {
668  
        if (!is_open())
598  
        if (!is_open())
669  
            detail::throw_logic_error("send: socket not open");
599  
            detail::throw_logic_error("send: socket not open");
670 -
        return send_awaitable(*this, buf);
600 +
        return send_awaitable(
 
601 +
            *this, buf, static_cast<int>(flags));
 
602 +
    }
 
603 +

 
604 +
    /// @overload
 
605 +
    template<capy::ConstBufferSequence Buffers>
 
606 +
    auto send(Buffers const& buf)
 
607 +
    {
 
608 +
        return send(buf, corosio::message_flags::none);
671  
    }
609  
    }
672  

610  

673  
    /** Receive a datagram from the connected peer.
611  
    /** Receive a datagram from the connected peer.
674  

612  

675  
        @param buf The buffer to receive data into.
613  
        @param buf The buffer to receive data into.
 
614 +
        @param flags Message flags (e.g. message_flags::peek).
676  

615  

677  
        @return An awaitable that completes with
616  
        @return An awaitable that completes with
678  
            `io_result<std::size_t>`.
617  
            `io_result<std::size_t>`.
679  

618  

680  
        @throws std::logic_error if the socket is not open.
619  
        @throws std::logic_error if the socket is not open.
681  
    */
620  
    */
682  
    template<capy::MutableBufferSequence Buffers>
621  
    template<capy::MutableBufferSequence Buffers>
683 -
    auto recv(Buffers const& buf)
622 +
    auto recv(Buffers const& buf, corosio::message_flags flags)
684  
    {
623  
    {
685  
        if (!is_open())
624  
        if (!is_open())
686  
            detail::throw_logic_error("recv: socket not open");
625  
            detail::throw_logic_error("recv: socket not open");
687 -
        return recv_awaitable(*this, buf);
626 +
        return recv_awaitable(
 
627 +
            *this, buf, static_cast<int>(flags));
 
628 +
    }
 
629 +

 
630 +
    /// @overload
 
631 +
    template<capy::MutableBufferSequence Buffers>
 
632 +
    auto recv(Buffers const& buf)
 
633 +
    {
 
634 +
        return recv(buf, corosio::message_flags::none);
688  
    }
635  
    }
689  

636  

690  
    /** Get the remote endpoint of the socket.
637  
    /** Get the remote endpoint of the socket.
691  

638  

692  
        Returns the address and port of the connected peer.
639  
        Returns the address and port of the connected peer.
693  

640  

694  
        @return The remote endpoint, or a default endpoint if
641  
        @return The remote endpoint, or a default endpoint if
695  
            not connected.
642  
            not connected.
696  
    */
643  
    */
697  
    endpoint remote_endpoint() const noexcept;
644  
    endpoint remote_endpoint() const noexcept;
698  

645  

699  
protected:
646  
protected:
700  
    /// Construct from a pre-built handle (for native_udp_socket).
647  
    /// Construct from a pre-built handle (for native_udp_socket).
701  
    explicit udp_socket(io_object::handle h) noexcept : io_object(std::move(h))
648  
    explicit udp_socket(io_object::handle h) noexcept : io_object(std::move(h))
702  
    {
649  
    {
703  
    }
650  
    }
704  

651  

705  
private:
652  
private:
706  
    /// Open the socket for the given protocol triple.
653  
    /// Open the socket for the given protocol triple.
707  
    void open_for_family(int family, int type, int protocol);
654  
    void open_for_family(int family, int type, int protocol);
708  

655  

709  
    inline implementation& get() const noexcept
656  
    inline implementation& get() const noexcept
710  
    {
657  
    {
711  
        return *static_cast<implementation*>(h_.get());
658  
        return *static_cast<implementation*>(h_.get());
712  
    }
659  
    }
713  
};
660  
};
714  

661  

715  
} // namespace boost::corosio
662  
} // namespace boost::corosio
716  

663  

717  
#endif // BOOST_COROSIO_UDP_SOCKET_HPP
664  
#endif // BOOST_COROSIO_UDP_SOCKET_HPP