TLA Line data Source code
1 : //
2 : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/capy
8 : //
9 :
10 : #ifndef BOOST_CAPY_IO_ANY_READ_SOURCE_HPP
11 : #define BOOST_CAPY_IO_ANY_READ_SOURCE_HPP
12 :
13 : #include <boost/capy/detail/config.hpp>
14 : #include <boost/capy/detail/await_suspend_helper.hpp>
15 : #include <boost/capy/buffers.hpp>
16 : #include <boost/capy/detail/buffer_array.hpp>
17 : #include <boost/capy/buffers/buffer_param.hpp>
18 : #include <boost/capy/concept/io_awaitable.hpp>
19 : #include <boost/capy/concept/read_source.hpp>
20 : #include <boost/capy/ex/io_env.hpp>
21 : #include <boost/capy/io_result.hpp>
22 : #include <boost/capy/io_task.hpp>
23 :
24 : #include <concepts>
25 : #include <coroutine>
26 : #include <cstddef>
27 : #include <exception>
28 : #include <new>
29 : #include <span>
30 : #include <stop_token>
31 : #include <system_error>
32 : #include <utility>
33 :
34 : namespace boost {
35 : namespace capy {
36 :
37 : /** Type-erased wrapper for any ReadSource.
38 :
39 : This class provides type erasure for any type satisfying the
40 : @ref ReadSource concept, enabling runtime polymorphism for
41 : source read operations. It uses cached awaitable storage to achieve
42 : zero steady-state allocation after construction.
43 :
44 : The wrapper supports two construction modes:
45 : - **Owning**: Pass by value to transfer ownership. The wrapper
46 : allocates storage and owns the source.
47 : - **Reference**: Pass a pointer to wrap without ownership. The
48 : pointed-to source must outlive this wrapper.
49 :
50 : @par Awaitable Preallocation
51 : The constructor preallocates storage for the type-erased awaitable.
52 : This reserves all virtual address space at server startup
53 : so memory usage can be measured up front, rather than
54 : allocating piecemeal as traffic arrives.
55 :
56 : @par Immediate Completion
57 : Operations complete immediately without suspending when the
58 : buffer sequence is empty, or when the underlying source's
59 : awaitable reports readiness via `await_ready`.
60 :
61 : @par Thread Safety
62 : Not thread-safe. Concurrent operations on the same wrapper
63 : are undefined behavior.
64 :
65 : @par Example
66 : @code
67 : // Owning - takes ownership of the source
68 : any_read_source rs(some_source{args...});
69 :
70 : // Reference - wraps without ownership
71 : some_source source;
72 : any_read_source rs(&source);
73 :
74 : mutable_buffer buf(data, size);
75 : auto [ec, n] = co_await rs.read(std::span(&buf, 1));
76 : @endcode
77 :
78 : @see any_read_stream, ReadSource
79 : */
80 : class any_read_source
81 : {
82 : struct vtable;
83 : struct awaitable_ops;
84 :
85 : template<ReadSource S>
86 : struct vtable_for_impl;
87 :
88 : void* source_ = nullptr;
89 : vtable const* vt_ = nullptr;
90 : void* cached_awaitable_ = nullptr;
91 : void* storage_ = nullptr;
92 : awaitable_ops const* active_ops_ = nullptr;
93 :
94 : public:
95 : /** Destructor.
96 :
97 : Destroys the owned source (if any) and releases the cached
98 : awaitable storage.
99 : */
100 : ~any_read_source();
101 :
102 : /** Construct a default instance.
103 :
104 : Constructs an empty wrapper. Operations on a default-constructed
105 : wrapper result in undefined behavior.
106 : */
107 : any_read_source() = default;
108 :
109 : /** Non-copyable.
110 :
111 : The awaitable cache is per-instance and cannot be shared.
112 : */
113 : any_read_source(any_read_source const&) = delete;
114 : any_read_source& operator=(any_read_source const&) = delete;
115 :
116 : /** Construct by moving.
117 :
118 : Transfers ownership of the wrapped source (if owned) and
119 : cached awaitable storage from `other`. After the move, `other` is
120 : in a default-constructed state.
121 :
122 : @param other The wrapper to move from.
123 : */
124 HIT 1 : any_read_source(any_read_source&& other) noexcept
125 1 : : source_(std::exchange(other.source_, nullptr))
126 1 : , vt_(std::exchange(other.vt_, nullptr))
127 1 : , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
128 1 : , storage_(std::exchange(other.storage_, nullptr))
129 1 : , active_ops_(std::exchange(other.active_ops_, nullptr))
130 : {
131 1 : }
132 :
133 : /** Assign by moving.
134 :
135 : Destroys any owned source and releases existing resources,
136 : then transfers ownership from `other`.
137 :
138 : @param other The wrapper to move from.
139 : @return Reference to this wrapper.
140 : */
141 : any_read_source&
142 : operator=(any_read_source&& other) noexcept;
143 :
144 : /** Construct by taking ownership of a ReadSource.
145 :
146 : Allocates storage and moves the source into this wrapper.
147 : The wrapper owns the source and will destroy it.
148 :
149 : @param s The source to take ownership of.
150 : */
151 : template<ReadSource S>
152 : requires (!std::same_as<std::decay_t<S>, any_read_source>)
153 : any_read_source(S s);
154 :
155 : /** Construct by wrapping a ReadSource without ownership.
156 :
157 : Wraps the given source by pointer. The source must remain
158 : valid for the lifetime of this wrapper.
159 :
160 : @param s Pointer to the source to wrap.
161 : */
162 : template<ReadSource S>
163 : any_read_source(S* s);
164 :
165 : /** Check if the wrapper contains a valid source.
166 :
167 : @return `true` if wrapping a source, `false` if default-constructed
168 : or moved-from.
169 : */
170 : bool
171 30 : has_value() const noexcept
172 : {
173 30 : return source_ != nullptr;
174 : }
175 :
176 : /** Check if the wrapper contains a valid source.
177 :
178 : @return `true` if wrapping a source, `false` if default-constructed
179 : or moved-from.
180 : */
181 : explicit
182 8 : operator bool() const noexcept
183 : {
184 8 : return has_value();
185 : }
186 :
187 : /** Initiate a partial read operation.
188 :
189 : Attempt to read up to `buffer_size( buffers )` bytes into
190 : the provided buffer sequence. May fill less than the
191 : full sequence.
192 :
193 : @param buffers The buffer sequence to read into.
194 :
195 : @return An awaitable that await-returns `(error_code,std::size_t)`.
196 :
197 : @par Immediate Completion
198 : The operation completes immediately without suspending
199 : the calling coroutine when:
200 : @li The buffer sequence is empty, returning `{error_code{}, 0}`.
201 : @li The underlying source's awaitable reports immediate
202 : readiness via `await_ready`.
203 :
204 : @note This is a partial operation and may not process the
205 : entire buffer sequence. Use @ref read for guaranteed
206 : complete transfer.
207 :
208 : @par Preconditions
209 : The wrapper must contain a valid source (`has_value() == true`).
210 : The caller must not call this function again after a prior
211 : call returned an error (including EOF).
212 : */
213 : template<MutableBufferSequence MB>
214 : auto
215 : read_some(MB buffers);
216 :
217 : /** Initiate a complete read operation.
218 :
219 : Reads data into the provided buffer sequence by forwarding
220 : to the underlying source's `read` operation. Large buffer
221 : sequences are processed in windows, with each window
222 : forwarded as a separate `read` call to the underlying source.
223 : The operation completes when the entire buffer sequence is
224 : filled, end-of-file is reached, or an error occurs.
225 :
226 : @param buffers The buffer sequence to read into.
227 :
228 : @return An awaitable that await-returns `(error_code,std::size_t)`.
229 :
230 : @par Immediate Completion
231 : The operation completes immediately without suspending
232 : the calling coroutine when:
233 : @li The buffer sequence is empty, returning `{error_code{}, 0}`.
234 : @li The underlying source's `read` awaitable reports
235 : immediate readiness via `await_ready`.
236 :
237 : @par Postconditions
238 : Exactly one of the following is true on return:
239 : @li **Success**: `!ec` and `n == buffer_size(buffers)`.
240 : The entire buffer was filled.
241 : @li **End-of-stream or Error**: `ec` and `n` indicates
242 : the number of bytes transferred before the failure.
243 :
244 : @par Preconditions
245 : The wrapper must contain a valid source (`has_value() == true`).
246 : The caller must not call this function again after a prior
247 : call returned an error (including EOF).
248 : */
249 : template<MutableBufferSequence MB>
250 : io_task<std::size_t>
251 : read(MB buffers);
252 :
253 : protected:
254 : /** Rebind to a new source after move.
255 :
256 : Updates the internal pointer to reference a new source object.
257 : Used by owning wrappers after move assignment when the owned
258 : object has moved to a new location.
259 :
260 : @param new_source The new source to bind to. Must be the same
261 : type as the original source.
262 :
263 : @note Terminates if called with a source of different type
264 : than the original.
265 : */
266 : template<ReadSource S>
267 : void
268 : rebind(S& new_source) noexcept
269 : {
270 : if(vt_ != &vtable_for_impl<S>::value)
271 : std::terminate();
272 : source_ = &new_source;
273 : }
274 :
275 : private:
276 : auto
277 : read_(std::span<mutable_buffer const> buffers);
278 : };
279 :
280 : // ordered by call sequence for cache line coherence
281 : struct any_read_source::awaitable_ops
282 : {
283 : bool (*await_ready)(void*);
284 : std::coroutine_handle<> (*await_suspend)(void*, std::coroutine_handle<>, io_env const*);
285 : io_result<std::size_t> (*await_resume)(void*);
286 : void (*destroy)(void*) noexcept;
287 : };
288 :
289 : // ordered by call frequency for cache line coherence
290 : struct any_read_source::vtable
291 : {
292 : awaitable_ops const* (*construct_read_some_awaitable)(
293 : void* source,
294 : void* storage,
295 : std::span<mutable_buffer const> buffers);
296 : awaitable_ops const* (*construct_read_awaitable)(
297 : void* source,
298 : void* storage,
299 : std::span<mutable_buffer const> buffers);
300 : std::size_t awaitable_size;
301 : std::size_t awaitable_align;
302 : void (*destroy)(void*) noexcept;
303 : };
304 :
305 : template<ReadSource S>
306 : struct any_read_source::vtable_for_impl
307 : {
308 : using ReadSomeAwaitable = decltype(std::declval<S&>().read_some(
309 : std::span<mutable_buffer const>{}));
310 : using ReadAwaitable = decltype(std::declval<S&>().read(
311 : std::span<mutable_buffer const>{}));
312 :
313 : static void
314 8 : do_destroy_impl(void* source) noexcept
315 : {
316 8 : static_cast<S*>(source)->~S();
317 8 : }
318 :
319 : static awaitable_ops const*
320 52 : construct_read_some_awaitable_impl(
321 : void* source,
322 : void* storage,
323 : std::span<mutable_buffer const> buffers)
324 : {
325 52 : auto& s = *static_cast<S*>(source);
326 52 : ::new(storage) ReadSomeAwaitable(s.read_some(buffers));
327 :
328 : static constexpr awaitable_ops ops = {
329 52 : +[](void* p) {
330 52 : return static_cast<ReadSomeAwaitable*>(p)->await_ready();
331 : },
332 2 : +[](void* p, std::coroutine_handle<> h, io_env const* env) {
333 2 : return detail::call_await_suspend(
334 2 : static_cast<ReadSomeAwaitable*>(p), h, env);
335 : },
336 50 : +[](void* p) {
337 50 : return static_cast<ReadSomeAwaitable*>(p)->await_resume();
338 : },
339 54 : +[](void* p) noexcept {
340 2 : static_cast<ReadSomeAwaitable*>(p)->~ReadSomeAwaitable();
341 : }
342 : };
343 52 : return &ops;
344 : }
345 :
346 : static awaitable_ops const*
347 117 : construct_read_awaitable_impl(
348 : void* source,
349 : void* storage,
350 : std::span<mutable_buffer const> buffers)
351 : {
352 117 : auto& s = *static_cast<S*>(source);
353 117 : ::new(storage) ReadAwaitable(s.read(buffers));
354 :
355 : static constexpr awaitable_ops ops = {
356 117 : +[](void* p) {
357 117 : return static_cast<ReadAwaitable*>(p)->await_ready();
358 : },
359 1 : +[](void* p, std::coroutine_handle<> h, io_env const* env) {
360 1 : return detail::call_await_suspend(
361 1 : static_cast<ReadAwaitable*>(p), h, env);
362 : },
363 117 : +[](void* p) {
364 117 : return static_cast<ReadAwaitable*>(p)->await_resume();
365 : },
366 117 : +[](void* p) noexcept {
367 MIS 0 : static_cast<ReadAwaitable*>(p)->~ReadAwaitable();
368 : }
369 : };
370 HIT 117 : return &ops;
371 : }
372 :
373 : static constexpr std::size_t max_awaitable_size =
374 : sizeof(ReadSomeAwaitable) > sizeof(ReadAwaitable)
375 : ? sizeof(ReadSomeAwaitable)
376 : : sizeof(ReadAwaitable);
377 : static constexpr std::size_t max_awaitable_align =
378 : alignof(ReadSomeAwaitable) > alignof(ReadAwaitable)
379 : ? alignof(ReadSomeAwaitable)
380 : : alignof(ReadAwaitable);
381 :
382 : static constexpr vtable value = {
383 : &construct_read_some_awaitable_impl,
384 : &construct_read_awaitable_impl,
385 : max_awaitable_size,
386 : max_awaitable_align,
387 : &do_destroy_impl
388 : };
389 : };
390 :
391 : inline
392 148 : any_read_source::~any_read_source()
393 : {
394 148 : if(storage_)
395 : {
396 7 : vt_->destroy(source_);
397 7 : ::operator delete(storage_);
398 : }
399 148 : if(cached_awaitable_)
400 : {
401 141 : if(active_ops_)
402 1 : active_ops_->destroy(cached_awaitable_);
403 141 : ::operator delete(cached_awaitable_);
404 : }
405 148 : }
406 :
407 : inline any_read_source&
408 5 : any_read_source::operator=(any_read_source&& other) noexcept
409 : {
410 5 : if(this != &other)
411 : {
412 4 : if(storage_)
413 : {
414 1 : vt_->destroy(source_);
415 1 : ::operator delete(storage_);
416 : }
417 4 : if(cached_awaitable_)
418 : {
419 3 : if(active_ops_)
420 1 : active_ops_->destroy(cached_awaitable_);
421 3 : ::operator delete(cached_awaitable_);
422 : }
423 4 : source_ = std::exchange(other.source_, nullptr);
424 4 : vt_ = std::exchange(other.vt_, nullptr);
425 4 : cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
426 4 : storage_ = std::exchange(other.storage_, nullptr);
427 4 : active_ops_ = std::exchange(other.active_ops_, nullptr);
428 : }
429 5 : return *this;
430 : }
431 :
432 : template<ReadSource S>
433 : requires (!std::same_as<std::decay_t<S>, any_read_source>)
434 9 : any_read_source::any_read_source(S s)
435 9 : : vt_(&vtable_for_impl<S>::value)
436 : {
437 : struct guard {
438 : any_read_source* self;
439 : bool committed = false;
440 9 : ~guard() {
441 9 : if(!committed && self->storage_) {
442 : // source_ is null if the source move-ctor threw before
443 : // the placement-new assigned it.
444 1 : if(self->source_)
445 MIS 0 : self->vt_->destroy(self->source_);
446 HIT 1 : ::operator delete(self->storage_);
447 1 : self->storage_ = nullptr;
448 1 : self->source_ = nullptr;
449 : }
450 9 : }
451 9 : } g{this};
452 :
453 9 : storage_ = ::operator new(sizeof(S));
454 9 : source_ = ::new(storage_) S(std::move(s));
455 :
456 : // Preallocate the awaitable storage
457 8 : cached_awaitable_ = ::operator new(vt_->awaitable_size);
458 :
459 8 : g.committed = true;
460 9 : }
461 :
462 : template<ReadSource S>
463 136 : any_read_source::any_read_source(S* s)
464 136 : : source_(s)
465 136 : , vt_(&vtable_for_impl<S>::value)
466 : {
467 : // Preallocate the awaitable storage
468 136 : cached_awaitable_ = ::operator new(vt_->awaitable_size);
469 136 : }
470 :
471 : template<MutableBufferSequence MB>
472 : auto
473 54 : any_read_source::read_some(MB buffers)
474 : {
475 : struct awaitable
476 : {
477 : any_read_source* self_;
478 : detail::mutable_buffer_array<detail::max_iovec_> ba_;
479 :
480 54 : awaitable(any_read_source* self, MB const& buffers)
481 54 : : self_(self)
482 54 : , ba_(buffers)
483 : {
484 54 : }
485 :
486 : bool
487 54 : await_ready() const noexcept
488 : {
489 54 : return ba_.to_span().empty();
490 : }
491 :
492 : std::coroutine_handle<>
493 52 : await_suspend(std::coroutine_handle<> h, io_env const* env)
494 : {
495 52 : self_->active_ops_ = self_->vt_->construct_read_some_awaitable(
496 52 : self_->source_,
497 52 : self_->cached_awaitable_,
498 52 : ba_.to_span());
499 :
500 52 : if(self_->active_ops_->await_ready(self_->cached_awaitable_))
501 50 : return h;
502 :
503 2 : return self_->active_ops_->await_suspend(
504 2 : self_->cached_awaitable_, h, env);
505 : }
506 :
507 : io_result<std::size_t>
508 52 : await_resume()
509 : {
510 52 : if(ba_.to_span().empty())
511 2 : return {{}, 0};
512 :
513 : struct guard {
514 : any_read_source* self;
515 50 : ~guard() {
516 50 : self->active_ops_->destroy(self->cached_awaitable_);
517 50 : self->active_ops_ = nullptr;
518 50 : }
519 50 : } g{self_};
520 50 : return self_->active_ops_->await_resume(
521 50 : self_->cached_awaitable_);
522 50 : }
523 : };
524 54 : return awaitable(this, buffers);
525 : }
526 :
527 : inline auto
528 117 : any_read_source::read_(std::span<mutable_buffer const> buffers)
529 : {
530 : struct awaitable
531 : {
532 : any_read_source* self_;
533 : std::span<mutable_buffer const> buffers_;
534 :
535 : bool
536 117 : await_ready() const noexcept
537 : {
538 117 : return false;
539 : }
540 :
541 : std::coroutine_handle<>
542 117 : await_suspend(std::coroutine_handle<> h, io_env const* env)
543 : {
544 234 : self_->active_ops_ = self_->vt_->construct_read_awaitable(
545 117 : self_->source_,
546 117 : self_->cached_awaitable_,
547 : buffers_);
548 :
549 117 : if(self_->active_ops_->await_ready(self_->cached_awaitable_))
550 116 : return h;
551 :
552 1 : return self_->active_ops_->await_suspend(
553 1 : self_->cached_awaitable_, h, env);
554 : }
555 :
556 : io_result<std::size_t>
557 117 : await_resume()
558 : {
559 : struct guard {
560 : any_read_source* self;
561 117 : ~guard() {
562 117 : self->active_ops_->destroy(self->cached_awaitable_);
563 117 : self->active_ops_ = nullptr;
564 117 : }
565 117 : } g{self_};
566 117 : return self_->active_ops_->await_resume(
567 202 : self_->cached_awaitable_);
568 117 : }
569 : };
570 117 : return awaitable{this, buffers};
571 : }
572 :
573 : template<MutableBufferSequence MB>
574 : io_task<std::size_t>
575 111 : any_read_source::read(MB buffers)
576 : {
577 : buffer_param bp(buffers);
578 : std::size_t total = 0;
579 :
580 : for(;;)
581 : {
582 : auto bufs = bp.data();
583 : if(bufs.empty())
584 : break;
585 :
586 : auto [ec, n] = co_await read_(bufs);
587 : total += n;
588 : if(ec)
589 : co_return {ec, total};
590 : bp.consume(n);
591 : }
592 :
593 : co_return {{}, total};
594 222 : }
595 :
596 : } // namespace capy
597 : } // namespace boost
598 :
599 : #endif
|