include/boost/corosio/local_stream_socket.hpp

87.5% Lines (14/16) 85.7% List of functions (6/7)
local_stream_socket.hpp
f(x) Functions (7)
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Michael Vandeberg
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_LOCAL_STREAM_SOCKET_HPP
11 #define BOOST_COROSIO_LOCAL_STREAM_SOCKET_HPP
12
#include <boost/corosio/detail/config.hpp>
#include <boost/corosio/detail/platform.hpp>
#include <boost/corosio/detail/except.hpp>
#include <boost/corosio/detail/native_handle.hpp>
#include <boost/corosio/detail/op_base.hpp>
#include <boost/corosio/io/io_stream.hpp>
#include <boost/capy/io_result.hpp>
#include <boost/corosio/detail/buffer_param.hpp>
#include <boost/corosio/local_endpoint.hpp>
#include <boost/corosio/local_stream.hpp>
#include <boost/corosio/shutdown_type.hpp>
#include <boost/capy/ex/executor_ref.hpp>
#include <boost/capy/ex/execution_context.hpp>
#include <boost/capy/ex/io_env.hpp>
#include <boost/capy/concept/executor.hpp>

#include <concepts>
#include <coroutine>
#include <cstddef>
#include <stop_token>
#include <system_error>
#include <type_traits>
#include <utility>
36
37 namespace boost::corosio {
38
39 /** An asynchronous Unix stream socket for coroutine I/O.
40
41 This class provides asynchronous Unix domain stream socket
42 operations that return awaitable types. Each operation
43 participates in the affine awaitable protocol, ensuring
44 coroutines resume on the correct executor.
45
46 The socket must be opened before performing I/O operations.
47 Operations support cancellation through std::stop_token via
48 the affine protocol, or explicitly through cancel().
49
50 Satisfies capy::Stream.
51
52 @par Thread Safety
53 Distinct objects: Safe.@n
54 Shared objects: Unsafe. A socket must not have concurrent
55 operations of the same type. One read and one write may be
56 in flight simultaneously.
57 */
58 class BOOST_COROSIO_DECL local_stream_socket : public io_stream
59 {
60 public:
61 using shutdown_type = corosio::shutdown_type;
62 using enum corosio::shutdown_type;
63
64 /** Define backend hooks for local stream socket operations.
65
66 Platform backends (epoll, kqueue, select) derive from this
67 to implement socket I/O, connection, and option management.
68 */
69 struct implementation : io_stream::implementation
70 {
71 virtual std::coroutine_handle<> connect(
72 std::coroutine_handle<> h,
73 capy::executor_ref ex,
74 corosio::local_endpoint ep,
75 std::stop_token token,
76 std::error_code* ec) = 0;
77
78 virtual std::error_code shutdown(shutdown_type what) noexcept = 0;
79
80 virtual native_handle_type native_handle() const noexcept = 0;
81
82 virtual native_handle_type release_socket() noexcept = 0;
83
84 virtual void cancel() noexcept = 0;
85
86 virtual std::error_code set_option(
87 int level,
88 int optname,
89 void const* data,
90 std::size_t size) noexcept = 0;
91
92 virtual std::error_code
93 get_option(int level, int optname, void* data, std::size_t* size)
94 const noexcept = 0;
95
96 virtual corosio::local_endpoint local_endpoint() const noexcept = 0;
97
98 virtual corosio::local_endpoint remote_endpoint() const noexcept = 0;
99 };
100
101 /// Represent the awaitable returned by connect.
102 struct connect_awaitable
103 : detail::void_op_base<connect_awaitable>
104 {
105 local_stream_socket& s_;
106 corosio::local_endpoint endpoint_;
107
108 4x connect_awaitable(
109 local_stream_socket& s, corosio::local_endpoint ep) noexcept
110 4x : s_(s), endpoint_(ep) {}
111
112 4x std::coroutine_handle<> dispatch(
113 std::coroutine_handle<> h, capy::executor_ref ex) const
114 {
115 4x return s_.get().connect(h, ex, endpoint_, token_, &ec_);
116 }
117 };
118
119 public:
120 ~local_stream_socket() override;
121
122 explicit local_stream_socket(capy::execution_context& ctx);
123
124 template<class Ex>
125 requires(!std::same_as<std::remove_cvref_t<Ex>, local_stream_socket>) &&
126 capy::Executor<Ex>
127 explicit local_stream_socket(Ex const& ex) : local_stream_socket(ex.context())
128 {
129 }
130
131 22x local_stream_socket(local_stream_socket&& other) noexcept
132 22x : io_object(std::move(other))
133 {
134 22x }
135
136 local_stream_socket& operator=(local_stream_socket&& other) noexcept
137 {
138 if (this != &other)
139 {
140 close();
141 io_object::operator=(std::move(other));
142 }
143 return *this;
144 }
145
146 local_stream_socket(local_stream_socket const&) = delete;
147 local_stream_socket& operator=(local_stream_socket const&) = delete;
148
149 /** Open the socket.
150
151 Creates a Unix stream socket and associates it with
152 the platform reactor.
153
154 @param proto The protocol. Defaults to local_stream{}.
155
156 @throws std::system_error on failure.
157 */
158 void open(local_stream proto = {});
159
160 /// Close the socket.
161 void close();
162
163 /// Check if the socket is open.
164 118x bool is_open() const noexcept
165 {
166 118x return h_ && get().native_handle() >= 0;
167 }
168
169 /** Initiate an asynchronous connect operation.
170
171 If the socket is not already open, it is opened automatically.
172
173 @param ep The peer endpoint (path) to connect to.
174
175 @par Cancellation
176 Supports cancellation via the awaitable's stop_token or by
177 calling @ref cancel. On cancellation, yields
178 `errc::operation_canceled`.
179
180 @return An awaitable that completes with io_result<>.
181
182 @throws std::system_error if the socket needs to be opened
183 and the open fails.
184 */
185 4x auto connect(corosio::local_endpoint ep)
186 {
187 4x if (!is_open())
188 open();
189 4x return connect_awaitable(*this, ep);
190 }
191
192 void cancel();
193
194 native_handle_type native_handle() const noexcept;
195
196 /** Query the number of bytes available for reading.
197
198 @return The number of bytes that can be read without blocking.
199
200 @throws std::logic_error if the socket is not open.
201 @throws std::system_error on ioctl failure.
202 */
203 std::size_t available() const;
204
205 /** Release ownership of the native socket handle.
206
207 Deregisters the socket from the reactor and cancels pending
208 operations without closing the fd. The caller takes ownership
209 of the returned descriptor.
210
211 @return The native handle.
212
213 @throws std::logic_error if the socket is not open.
214 */
215 native_handle_type release();
216
217 /** Shut down part or all of the socket (best-effort).
218
219 Unix stream sockets are full-duplex: each direction (send and
220 receive) operates independently. This function allows you to
221 close one or both directions without destroying the socket.
222
223 @li @ref shutdown_send signals end-of-stream to the peer: their
224 subsequent reads will complete with `capy::cond::eof` after
225 they drain any data already in flight. You can still
226 receive data from the peer until they also close their
227 send direction. This is the cleanest way to end a session
228 — preferable to @ref close() because it gives the peer an
229 explicit EOF rather than tearing the socket down abruptly.
230
231 @li @ref shutdown_receive disables reading on the socket. The
232 peer is not informed and may continue sending; data
233 already buffered or arriving later is discarded.
234
235 @li @ref shutdown_both combines both effects.
236
237 When the peer shuts down their send direction, subsequent read
238 operations on this socket complete with `capy::cond::eof`. Use
239 the portable condition test rather than comparing error codes
240 directly:
241
242 @code
243 auto [ec, n] = co_await sock.read_some(buffer);
244 if (ec == capy::cond::eof)
245 {
246 // Peer closed their send direction
247 }
248 @endcode
249
250 Calls `::shutdown` on the underlying descriptor when open.
251 Errors from the syscall (such as `ENOTCONN` on a peer that
252 already closed) are swallowed because they are typically
253 unhelpful at this layer; if the socket is not open, the call
254 is a no-op. To observe errors, use the
255 @ref shutdown(shutdown_type,std::error_code&) overload.
256
257 @param what Which direction to shut down.
258 */
259 void shutdown(shutdown_type what);
260
261 /** Shut down part or all of the socket (non-throwing).
262
263 Same semantics as @ref shutdown(shutdown_type) but reports
264 syscall errors via @p ec instead of swallowing them.
265
266 @param what Which direction to shut down.
267 @param ec Set to the error code on failure.
268 */
269 void shutdown(shutdown_type what, std::error_code& ec) noexcept;
270
271 template<class Option>
272 void set_option(Option const& opt)
273 {
274 if (!is_open())
275 detail::throw_logic_error("set_option: socket not open");
276 std::error_code ec = get().set_option(
277 Option::level(), Option::name(), opt.data(), opt.size());
278 if (ec)
279 detail::throw_system_error(ec, "local_stream_socket::set_option");
280 }
281
282 template<class Option>
283 Option get_option() const
284 {
285 if (!is_open())
286 detail::throw_logic_error("get_option: socket not open");
287 Option opt{};
288 std::size_t sz = opt.size();
289 std::error_code ec =
290 get().get_option(Option::level(), Option::name(), opt.data(), &sz);
291 if (ec)
292 detail::throw_system_error(ec, "local_stream_socket::get_option");
293 opt.resize(sz);
294 return opt;
295 }
296
297 /** Assign an existing file descriptor to this socket.
298
299 The fd is adopted and registered with the platform reactor.
300 Used by @ref make_local_stream_pair to wrap `socketpair`
301 fds.
302
303 @param fd The file descriptor to adopt. Must be a valid,
304 open Unix domain stream socket.
305
306 @par Preconditions
307 The socket must not already be open.
308
309 @throws std::logic_error if the precondition is violated
310 (the socket is already open).
311 @throws std::system_error on any other failure (e.g. the
312 fd is not a Unix domain stream socket, or backend
313 configuration fails).
314 */
315 void assign(int fd);
316
317 corosio::local_endpoint local_endpoint() const noexcept;
318
319 corosio::local_endpoint remote_endpoint() const noexcept;
320
321 protected:
322 local_stream_socket() noexcept = default;
323
324 explicit local_stream_socket(handle h) noexcept : io_object(std::move(h)) {}
325
326 private:
327 friend class local_stream_acceptor;
328
329 void open_for_family(int family, int type, int protocol);
330
331 104x inline implementation& get() const noexcept
332 {
333 104x return *static_cast<implementation*>(h_.get());
334 }
335 };
336
337 } // namespace boost::corosio
338
339 #endif // BOOST_COROSIO_LOCAL_STREAM_SOCKET_HPP
340