include/boost/corosio/local_stream_socket.hpp

87.5% Lines (14/16) 85.7% List of functions (6/7)
local_stream_socket.hpp
f(x) Functions (7)
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Michael Vandeberg
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_LOCAL_STREAM_SOCKET_HPP
11 #define BOOST_COROSIO_LOCAL_STREAM_SOCKET_HPP
12
13 #include <boost/corosio/detail/config.hpp>
14 #include <boost/corosio/detail/platform.hpp>
15 #include <boost/corosio/detail/except.hpp>
16 #include <boost/corosio/detail/native_handle.hpp>
17 #include <boost/corosio/detail/op_base.hpp>
18 #include <boost/corosio/io/io_stream.hpp>
19 #include <boost/capy/io_result.hpp>
20 #include <boost/corosio/detail/buffer_param.hpp>
21 #include <boost/corosio/local_endpoint.hpp>
22 #include <boost/corosio/local_stream.hpp>
23 #include <boost/corosio/shutdown_type.hpp>
24 #include <boost/capy/ex/executor_ref.hpp>
25 #include <boost/capy/ex/execution_context.hpp>
26 #include <boost/capy/ex/io_env.hpp>
27 #include <boost/capy/concept/executor.hpp>
28
29 #include <system_error>
30
31 #include <concepts>
32 #include <coroutine>
33 #include <cstddef>
34 #include <stop_token>
35 #include <type_traits>
36
37 namespace boost::corosio {
38
39 /** An asynchronous Unix stream socket for coroutine I/O.
40
41 This class provides asynchronous Unix domain stream socket
42 operations that return awaitable types. Each operation
43 participates in the affine awaitable protocol, ensuring
44 coroutines resume on the correct executor.
45
46 The socket must be opened before performing I/O operations.
47 Operations support cancellation through std::stop_token via
48 the affine protocol, or explicitly through cancel().
49
50 Thread Safety:
51 Distinct objects: Safe.
52 Shared objects: Unsafe. A socket must not have concurrent
53 operations of the same type. One read and one write may
54 be in flight simultaneously.
55
56 Satisfies capy::Stream.
57 */
58 class BOOST_COROSIO_DECL local_stream_socket : public io_stream
59 {
60 public:
// Make the shutdown direction enum and its enumerators nameable
// through the socket type.
61 using shutdown_type = corosio::shutdown_type;
62 using enum corosio::shutdown_type;
63
64 /** Define backend hooks for local stream socket operations.
65
66 Platform backends (epoll, kqueue, select) derive from this
67 to implement socket I/O, connection, and option management.

Every hook is pure virtual. The socket wrapper reaches the
active backend through its private get() accessor.
68 */
69 struct implementation : io_stream::implementation
70 {
/// Begin connecting to `ep` on behalf of the suspended coroutine `h`.
/// Also receives the executor `ex`, a stop token, and an out-pointer
/// `ec` (presumably where the connect result is stored).
/// NOTE(review): presumably the returned handle is the coroutine to
/// resume next - confirm against the backend implementations.
71 virtual std::coroutine_handle<> connect(
72 std::coroutine_handle<> h,
73 capy::executor_ref ex,
74 corosio::local_endpoint ep,
75 std::stop_token token,
76 std::error_code* ec) = 0;
77
/// Shut down the direction(s) selected by `what`, reporting failure
/// through the returned error code rather than by throwing.
78 virtual std::error_code shutdown(shutdown_type what) noexcept = 0;
79
/// Return the backend's native handle. The socket's is_open()
/// treats a negative value as "not open".
80 virtual native_handle_type native_handle() const noexcept = 0;
81
/// NOTE(review): presumably gives up ownership of the descriptor
/// without closing it, backing the socket's release() - confirm.
82 virtual native_handle_type release_socket() noexcept = 0;
83
/// Cancel operations on this socket.
/// NOTE(review): exact scope of the cancellation is not visible here.
84 virtual void cancel() noexcept = 0;
85
/// Set the option identified by `level`/`optname` from the `size`
/// bytes starting at `data`.
86 virtual std::error_code set_option(
87 int level,
88 int optname,
89 void const* data,
90 std::size_t size) noexcept = 0;
91
/// Read the option identified by `level`/`optname` into `data`.
/// `*size` is passed in as the buffer size; the socket's get_option()
/// resizes the option to the value left in `*size` afterwards.
92 virtual std::error_code
93 get_option(int level, int optname, void* data, std::size_t* size)
94 const noexcept = 0;
95
/// Return the endpoint for the local side of the socket.
/// NOTE(review): result for an unbound socket is not shown here.
96 virtual corosio::local_endpoint local_endpoint() const noexcept = 0;
97
/// Return the endpoint for the remote side of the socket.
/// NOTE(review): result for an unconnected socket is not shown here.
98 virtual corosio::local_endpoint remote_endpoint() const noexcept = 0;
99 };
100
101 /// Represent the awaitable returned by connect.
/// Holds a reference to the socket and a copy of the target endpoint.
102 struct connect_awaitable
103 : detail::void_op_base<connect_awaitable>
104 {
// Socket the connect is issued on; held by reference, so the socket
// must outlive the awaitable.
105 local_stream_socket& s_;
// Endpoint to connect to, stored by value.
106 corosio::local_endpoint endpoint_;
107
108 4x connect_awaitable(
109 local_stream_socket& s, corosio::local_endpoint ep) noexcept
110 4x : s_(s), endpoint_(ep) {}
111
// Forward the operation to the socket's backend connect().
// token_ and ec_ are not declared in this struct - presumably they
// are provided by detail::void_op_base (confirm).
112 4x std::coroutine_handle<> dispatch(
113 std::coroutine_handle<> h, capy::executor_ref ex) const
114 {
115 4x return s_.get().connect(h, ex, endpoint_, token_, &ec_);
116 }
117 };
118
119 public:
/// Destroy the socket.
/// NOTE(review): defined out of line; presumably closes the socket
/// if it is still open - confirm.
120 ~local_stream_socket() override;
121
/// Construct a socket from an execution context.
/// NOTE(review): presumably leaves the socket unopened, since the
/// class description requires opening before I/O - confirm.
122 explicit local_stream_socket(capy::execution_context& ctx);
123
/// Construct a socket from an executor by delegating to the
/// execution-context constructor with `ex.context()`.
/// The constraint rejects local_stream_socket itself as `Ex`, so this
/// template is never selected for socket arguments.
124 template<class Ex>
125 requires(!std::same_as<std::remove_cvref_t<Ex>, local_stream_socket>) &&
126 capy::Executor<Ex>
127 explicit local_stream_socket(Ex const& ex) : local_stream_socket(ex.context())
128 {
129 }
130
/// Move-construct by moving `other` into the io_object base.
131 22x local_stream_socket(local_stream_socket&& other) noexcept
132 22x : io_object(std::move(other))
133 {
134 22x }
135
/// Move-assign: close this socket first, then take over `other.h_`.
/// Self-assignment is a no-op.
/// NOTE(review): this operator is noexcept while close() is not
/// declared noexcept - confirm close() cannot throw here.
136 local_stream_socket& operator=(local_stream_socket&& other) noexcept
137 {
138 if (this != &other)
139 {
140 close();
141 h_ = std::move(other.h_);
142 }
143 return *this;
144 }
145
// Sockets are move-only.
146 local_stream_socket(local_stream_socket const&) = delete;
147 local_stream_socket& operator=(local_stream_socket const&) = delete;
148
149 /** Open the socket.
150
151 Creates a Unix stream socket and associates it with
152 the platform reactor.
153
154 @param proto The protocol. Defaults to local_stream{}.
155
156 @throws std::system_error on failure.
157 */
158 void open(local_stream proto = {});
159
160 /// Close the socket.
161 void close();
162
163 /// Check if the socket is open.
/// True only when h_ is set and the backend reports a non-negative
/// native handle; h_ is tested first, so get() is never called on an
/// empty handle.
164 118x bool is_open() const noexcept
165 {
166 118x return h_ && get().native_handle() >= 0;
167 }
168
169 /** Initiate an asynchronous connect operation.
170
171 If the socket is not already open, it is opened automatically.
172
173 @param ep The local endpoint (path) to connect to.
174
175 @return An awaitable that completes with io_result<>.
176
177 @throws std::system_error if the socket needs to be opened
178 and the open fails.
179 */
180 4x auto connect(corosio::local_endpoint ep)
181 {
182 4x if (!is_open())
183 open();
184 4x return connect_awaitable(*this, ep);
185 }
186
/// Cancel outstanding asynchronous operations explicitly, as an
/// alternative to stop_token based cancellation.
/// NOTE(review): defined out of line; behaviour on a closed socket
/// is not shown here.
187 void cancel();
188
/// Return the native socket handle.
/// NOTE(review): defined out of line; the value returned for a
/// closed socket is not shown here.
189 native_handle_type native_handle() const noexcept;
190
191 /** Query the number of bytes available for reading.
192
193 @return The number of bytes that can be read without blocking.
194
195 @throws std::logic_error if the socket is not open.
196 @throws std::system_error on ioctl failure.
197 */
198 std::size_t available() const;
199
200 /** Release ownership of the native socket handle.
201
202 Deregisters the socket from the reactor and cancels pending
203 operations without closing the fd. The caller takes ownership
204 of the returned descriptor.
205
206 @return The native handle, or -1 if not open.
207
208 @throws std::logic_error if the socket is not open.

NOTE(review): the @return clause ("-1 if not open") and the
@throws clause (std::logic_error if not open) disagree about
the not-open case; confirm which one the implementation
follows and drop the other.
209 */
210 native_handle_type release();
211
/// Shut down part or all of the socket.
/// @param what Which direction to shut down.
/// NOTE(review): presumably the throwing counterpart of the
/// error_code overload below (std::system_error on failure) - confirm.
212 void shutdown(shutdown_type what);
213
214 /** Shut down part or all of the socket (non-throwing).
215
216 @param what Which direction to shut down.
217 @param ec Set to the error code on failure.
218 */
219 void shutdown(shutdown_type what, std::error_code& ec) noexcept;
220
/// Set a socket option.
/// The option type supplies its own level(), name(), data() and size().
/// @param opt The option value to apply.
/// Calls detail::throw_logic_error if the socket is not open, and
/// detail::throw_system_error if the backend returns an error.
221 template<class Option>
222 void set_option(Option const& opt)
223 {
224 if (!is_open())
225 detail::throw_logic_error("set_option: socket not open");
226 std::error_code ec = get().set_option(
227 Option::level(), Option::name(), opt.data(), opt.size());
228 if (ec)
229 detail::throw_system_error(ec, "local_stream_socket::set_option");
230 }
231
/// Get a socket option.
/// Value-initializes an Option, lets the backend fill its storage, then
/// resizes it to the size the backend reported.
/// @return The option value read from the socket.
/// Calls detail::throw_logic_error if the socket is not open, and
/// detail::throw_system_error if the backend returns an error.
232 template<class Option>
233 Option get_option() const
234 {
235 if (!is_open())
236 detail::throw_logic_error("get_option: socket not open");
237 Option opt{};
// In: capacity of the option's storage. Out: size written by the backend.
238 std::size_t sz = opt.size();
239 std::error_code ec =
240 get().get_option(Option::level(), Option::name(), opt.data(), &sz);
241 if (ec)
242 detail::throw_system_error(ec, "local_stream_socket::get_option");
243 opt.resize(sz);
244 return opt;
245 }
246
247 /** Assign an existing file descriptor to this socket.
248
249 The socket must not already be open. The fd is adopted
250 and registered with the platform reactor. Used by
251 make_local_stream_pair() to wrap socketpair() fds.
252
253 @param fd The file descriptor to adopt. Must be a valid,
254 open, non-blocking Unix stream socket.
255
256 @throws std::system_error on failure.
257 */
258 void assign(int fd);
259
/// Return the local endpoint of the socket.
/// NOTE(review): defined out of line; the result for a closed or
/// unbound socket is not shown here.
260 corosio::local_endpoint local_endpoint() const noexcept;
261
/// Return the remote endpoint of the socket.
/// NOTE(review): defined out of line; the result for a closed or
/// unconnected socket is not shown here.
262 corosio::local_endpoint remote_endpoint() const noexcept;
263
264 protected:
/// Default constructor, accessible to derived classes only.
265 local_stream_socket() noexcept = default;
266
/// Adopt an existing handle by moving it into the io_object base.
267 explicit local_stream_socket(handle h) noexcept : io_object(std::move(h)) {}
268
269 private:
// NOTE(review): presumably the acceptor needs private access to set up
// accepted sockets - confirm.
270 friend class local_stream_acceptor;
271
// NOTE(review): presumably the shared implementation behind open(); the
// arguments look like socket(2) family/type/protocol values - confirm.
272 void open_for_family(int family, int type, int protocol);
273
// Access the backend implementation behind h_.
// Dereferences h_.get() without a null check, so callers must make sure
// h_ is set first (is_open() tests h_ before calling this).
274 104x inline implementation& get() const noexcept
275 {
276 104x return *static_cast<implementation*>(h_.get());
277 }
278 };
279
280 } // namespace boost::corosio
281
282 #endif // BOOST_COROSIO_LOCAL_STREAM_SOCKET_HPP
283