1 +
//
 
2 +
// Copyright (c) 2026 Michael Vandeberg
 
3 +
//
 
4 +
// Distributed under the Boost Software License, Version 1.0. (See accompanying
 
5 +
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
 
6 +
//
 
7 +
// Official repository: https://github.com/cppalliance/corosio
 
8 +
//
 
9 +

 
10 +
#ifndef BOOST_COROSIO_LOCAL_STREAM_ACCEPTOR_HPP
 
11 +
#define BOOST_COROSIO_LOCAL_STREAM_ACCEPTOR_HPP
 
12 +

 
13 +
#include <boost/corosio/detail/config.hpp>
 
14 +
#include <boost/corosio/detail/except.hpp>
 
15 +
#include <boost/corosio/io/io_object.hpp>
 
16 +
#include <boost/capy/io_result.hpp>
 
17 +
#include <boost/corosio/local_endpoint.hpp>
 
18 +
#include <boost/corosio/local_stream.hpp>
 
19 +
#include <boost/corosio/local_stream_socket.hpp>
 
20 +
#include <boost/capy/ex/executor_ref.hpp>
 
21 +
#include <boost/capy/ex/execution_context.hpp>
 
22 +
#include <boost/capy/ex/io_env.hpp>
 
23 +
#include <boost/capy/concept/executor.hpp>
 
24 +

 
25 +
#include <system_error>
 
26 +

 
27 +
#include <cassert>
 
28 +
#include <concepts>
 
29 +
#include <coroutine>
 
30 +
#include <cstddef>
 
31 +
#include <stop_token>
 
32 +
#include <type_traits>
 
33 +

 
34 +
namespace boost::corosio {
 
35 +

 
36 +
/* An asynchronous Unix stream acceptor for coroutine I/O.
 
37 +

 
38 +
   This class provides asynchronous Unix domain stream accept
 
39 +
   operations that return awaitable types. The acceptor binds
 
40 +
   to a local endpoint (filesystem path) and listens for
 
41 +
   incoming connections.
 
42 +

 
43 +
   The library does NOT automatically unlink the socket path
 
44 +
   on close. Callers are responsible for removing the socket
 
45 +
   file before bind or after close.
 
46 +

 
47 +
   Thread Safety:
 
48 +
     Distinct objects: Safe.
 
49 +
     Shared objects: Unsafe. An acceptor must not have
 
50 +
     concurrent accept operations.
 
51 +
*/
 
52 +
/** Options for local_stream_acceptor::bind(). */
 
53 +
enum class bind_option
 
54 +
{
 
55 +
    none,
 
56 +
    /// Unlink the socket path before binding (ignored for abstract paths).
 
57 +
    unlink_existing
 
58 +
};
 
59 +

 
60 +
class BOOST_COROSIO_DECL local_stream_acceptor : public io_object
 
61 +
{
 
62 +
    struct move_accept_awaitable
 
63 +
    {
 
64 +
        local_stream_acceptor& acc_;
 
65 +
        std::stop_token token_;
 
66 +
        mutable std::error_code ec_;
 
67 +
        mutable io_object::implementation* peer_impl_ = nullptr;
 
68 +

 
69 +
        explicit move_accept_awaitable(
 
70 +
            local_stream_acceptor& acc) noexcept
 
71 +
            : acc_(acc)
 
72 +
        {
 
73 +
        }
 
74 +

 
75 +
        bool await_ready() const noexcept
 
76 +
        {
 
77 +
            return token_.stop_requested();
 
78 +
        }
 
79 +

 
80 +
        capy::io_result<local_stream_socket> await_resume() const noexcept
 
81 +
        {
 
82 +
            if (token_.stop_requested())
 
83 +
                return {make_error_code(std::errc::operation_canceled),
 
84 +
                        local_stream_socket()};
 
85 +

 
86 +
            if (ec_ || !peer_impl_)
 
87 +
                return {ec_, local_stream_socket()};
 
88 +

 
89 +
            local_stream_socket peer(acc_.ctx_);
 
90 +
            reset_peer_impl(peer, peer_impl_);
 
91 +
            return {ec_, std::move(peer)};
 
92 +
        }
 
93 +

 
94 +
        auto await_suspend(std::coroutine_handle<> h, capy::io_env const* env)
 
95 +
            -> std::coroutine_handle<>
 
96 +
        {
 
97 +
            token_ = env->stop_token;
 
98 +
            return acc_.get().accept(
 
99 +
                h, env->executor, token_, &ec_, &peer_impl_);
 
100 +
        }
 
101 +
    };
 
102 +

 
103 +
    struct accept_awaitable
 
104 +
    {
 
105 +
        local_stream_acceptor& acc_;
 
106 +
        local_stream_socket& peer_;
 
107 +
        std::stop_token token_;
 
108 +
        mutable std::error_code ec_;
 
109 +
        mutable io_object::implementation* peer_impl_ = nullptr;
 
110 +

 
111 +
        accept_awaitable(
 
112 +
            local_stream_acceptor& acc, local_stream_socket& peer) noexcept
 
113 +
            : acc_(acc)
 
114 +
            , peer_(peer)
 
115 +
        {
 
116 +
        }
 
117 +

 
118 +
        bool await_ready() const noexcept
 
119 +
        {
 
120 +
            return token_.stop_requested();
 
121 +
        }
 
122 +

 
123 +
        capy::io_result<> await_resume() const noexcept
 
124 +
        {
 
125 +
            if (token_.stop_requested())
 
126 +
                return {make_error_code(std::errc::operation_canceled)};
 
127 +

 
128 +
            if (!ec_ && peer_impl_)
 
129 +
                peer_.h_.reset(peer_impl_);
 
130 +
            return {ec_};
 
131 +
        }
 
132 +

 
133 +
        auto await_suspend(std::coroutine_handle<> h, capy::io_env const* env)
 
134 +
            -> std::coroutine_handle<>
 
135 +
        {
 
136 +
            token_ = env->stop_token;
 
137 +
            return acc_.get().accept(
 
138 +
                h, env->executor, token_, &ec_, &peer_impl_);
 
139 +
        }
 
140 +
    };
 
141 +

 
142 +
public:
 
143 +
    ~local_stream_acceptor() override;
 
144 +

 
145 +
    explicit local_stream_acceptor(capy::execution_context& ctx);
 
146 +

 
147 +
    template<class Ex>
 
148 +
        requires(!std::same_as<std::remove_cvref_t<Ex>, local_stream_acceptor>) &&
 
149 +
        capy::Executor<Ex>
 
150 +
    explicit local_stream_acceptor(Ex const& ex) : local_stream_acceptor(ex.context())
 
151 +
    {
 
152 +
    }
 
153 +

 
154 +
    local_stream_acceptor(local_stream_acceptor&& other) noexcept
 
155 +
        : local_stream_acceptor(other.ctx_, std::move(other))
 
156 +
    {
 
157 +
    }
 
158 +

 
159 +
    local_stream_acceptor& operator=(local_stream_acceptor&& other) noexcept
 
160 +
    {
 
161 +
        assert(&ctx_ == &other.ctx_ &&
 
162 +
            "move-assign requires the same execution_context");
 
163 +
        if (this != &other)
 
164 +
        {
 
165 +
            close();
 
166 +
            io_object::operator=(std::move(other));
 
167 +
        }
 
168 +
        return *this;
 
169 +
    }
 
170 +

 
171 +
    local_stream_acceptor(local_stream_acceptor const&)            = delete;
 
172 +
    local_stream_acceptor& operator=(local_stream_acceptor const&) = delete;
 
173 +

 
174 +
    /** Create the acceptor socket.
 
175 +

 
176 +
        @param proto The protocol. Defaults to local_stream{}.
 
177 +

 
178 +
        @throws std::system_error on failure.
 
179 +
    */
 
180 +
    void open(local_stream proto = {});
 
181 +

 
182 +
    /** Bind to a local endpoint.
 
183 +

 
184 +
        @param ep The local endpoint (path) to bind to.
 
185 +
        @param opt Bind options. Pass bind_option::unlink_existing
 
186 +
            to unlink the socket path before binding (ignored for
 
187 +
            abstract sockets and empty endpoints).
 
188 +

 
189 +
        @return An error code on failure, empty on success.
 
190 +

 
191 +
        @throws std::logic_error if the acceptor is not open.
 
192 +
    */
 
193 +
    [[nodiscard]] std::error_code
 
194 +
    bind(corosio::local_endpoint ep,
 
195 +
         bind_option opt = bind_option::none);
 
196 +

 
197 +
    /** Start listening for incoming connections.
 
198 +

 
199 +
        @param backlog The maximum pending connection queue length.
 
200 +

 
201 +
        @return An error code on failure, empty on success.
 
202 +

 
203 +
        @throws std::logic_error if the acceptor is not open.
 
204 +
    */
 
205 +
    [[nodiscard]] std::error_code listen(int backlog = 128);
 
206 +

 
207 +
    /// Close the acceptor.
 
208 +
    void close();
 
209 +

 
210 +
    /// Check if the acceptor is listening.
 
211 +
    bool is_open() const noexcept
 
212 +
    {
 
213 +
        return h_ && get().is_open();
 
214 +
    }
 
215 +

 
216 +
    /** Initiate an asynchronous accept operation.
 
217 +

 
218 +
        @param peer The socket to receive the accepted connection.
 
219 +

 
220 +
        @return An awaitable that completes with io_result<>.
 
221 +

 
222 +
        @throws std::logic_error if the acceptor is not listening.
 
223 +
    */
 
224 +
    auto accept(local_stream_socket& peer)
 
225 +
    {
 
226 +
        if (!is_open())
 
227 +
            detail::throw_logic_error("accept: acceptor not listening");
 
228 +
        return accept_awaitable(*this, peer);
 
229 +
    }
 
230 +

 
231 +
    /** Initiate an asynchronous accept, returning the socket.
 
232 +

 
233 +
        @return An awaitable that completes with
 
234 +
            io_result<local_stream_socket>.
 
235 +

 
236 +
        @throws std::logic_error if the acceptor is not listening.
 
237 +
    */
 
238 +
    auto accept()
 
239 +
    {
 
240 +
        if (!is_open())
 
241 +
            detail::throw_logic_error("accept: acceptor not listening");
 
242 +
        return move_accept_awaitable(*this);
 
243 +
    }
 
244 +

 
245 +
    void cancel();
 
246 +

 
247 +
    /** Release ownership of the native socket handle.
 
248 +

 
249 +
        Deregisters the acceptor from the reactor and cancels
 
250 +
        pending operations without closing the fd.
 
251 +

 
252 +
        @return The native handle, or -1 if not open.
 
253 +

 
254 +
        @throws std::logic_error if the acceptor is not open.
 
255 +
    */
 
256 +
    native_handle_type release();
 
257 +

 
258 +
    corosio::local_endpoint local_endpoint() const noexcept;
 
259 +

 
260 +
    template<class Option>
 
261 +
    void set_option(Option const& opt)
 
262 +
    {
 
263 +
        if (!is_open())
 
264 +
            detail::throw_logic_error("set_option: acceptor not open");
 
265 +
        std::error_code ec = get().set_option(
 
266 +
            Option::level(), Option::name(), opt.data(), opt.size());
 
267 +
        if (ec)
 
268 +
            detail::throw_system_error(ec, "local_stream_acceptor::set_option");
 
269 +
    }
 
270 +

 
271 +
    template<class Option>
 
272 +
    Option get_option() const
 
273 +
    {
 
274 +
        if (!is_open())
 
275 +
            detail::throw_logic_error("get_option: acceptor not open");
 
276 +
        Option opt{};
 
277 +
        std::size_t sz = opt.size();
 
278 +
        std::error_code ec =
 
279 +
            get().get_option(Option::level(), Option::name(), opt.data(), &sz);
 
280 +
        if (ec)
 
281 +
            detail::throw_system_error(ec, "local_stream_acceptor::get_option");
 
282 +
        opt.resize(sz);
 
283 +
        return opt;
 
284 +
    }
 
285 +

 
286 +
    /** Define backend hooks for local stream acceptor operations. */
 
287 +
    struct implementation : io_object::implementation
 
288 +
    {
 
289 +
        virtual std::coroutine_handle<> accept(
 
290 +
            std::coroutine_handle<>,
 
291 +
            capy::executor_ref,
 
292 +
            std::stop_token,
 
293 +
            std::error_code*,
 
294 +
            io_object::implementation**) = 0;
 
295 +

 
296 +
        virtual corosio::local_endpoint local_endpoint() const noexcept = 0;
 
297 +

 
298 +
        virtual bool is_open() const noexcept = 0;
 
299 +

 
300 +
        virtual native_handle_type release_socket() noexcept = 0;
 
301 +

 
302 +
        virtual void cancel() noexcept = 0;
 
303 +

 
304 +
        virtual std::error_code set_option(
 
305 +
            int level,
 
306 +
            int optname,
 
307 +
            void const* data,
 
308 +
            std::size_t size) noexcept = 0;
 
309 +

 
310 +
        virtual std::error_code
 
311 +
        get_option(int level, int optname, void* data, std::size_t* size)
 
312 +
            const noexcept = 0;
 
313 +
    };
 
314 +

 
315 +
protected:
 
316 +
    local_stream_acceptor(handle h, capy::execution_context& ctx) noexcept
 
317 +
        : io_object(std::move(h))
 
318 +
        , ctx_(ctx)
 
319 +
    {
 
320 +
    }
 
321 +

 
322 +
    local_stream_acceptor(
 
323 +
        capy::execution_context& ctx, local_stream_acceptor&& other) noexcept
 
324 +
        : io_object(std::move(other))
 
325 +
        , ctx_(ctx)
 
326 +
    {
 
327 +
    }
 
328 +

 
329 +
    static void reset_peer_impl(
 
330 +
        local_stream_socket& peer, io_object::implementation* impl) noexcept
 
331 +
    {
 
332 +
        if (impl)
 
333 +
            peer.h_.reset(impl);
 
334 +
    }
 
335 +

 
336 +
private:
 
337 +
    capy::execution_context& ctx_;
 
338 +

 
339 +
    inline implementation& get() const noexcept
 
340 +
    {
 
341 +
        return *static_cast<implementation*>(h_.get());
 
342 +
    }
 
343 +
};
 
344 +

 
345 +
} // namespace boost::corosio
 
346 +

 
347 +
#endif // BOOST_COROSIO_LOCAL_STREAM_ACCEPTOR_HPP