TLA Line data Source code
1 : //
2 : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/capy
8 : //
9 :
10 : #ifndef BOOST_CAPY_IO_ANY_BUFFER_SOURCE_HPP
11 : #define BOOST_CAPY_IO_ANY_BUFFER_SOURCE_HPP
12 :
13 : #include <boost/capy/detail/config.hpp>
14 : #include <boost/capy/detail/await_suspend_helper.hpp>
15 : #include <boost/capy/buffers.hpp>
16 : #include <boost/capy/buffers/buffer_copy.hpp>
17 : #include <boost/capy/buffers/buffer_param.hpp>
18 : #include <boost/capy/concept/buffer_source.hpp>
19 : #include <boost/capy/concept/io_awaitable.hpp>
20 : #include <boost/capy/concept/read_source.hpp>
21 : #include <boost/capy/error.hpp>
22 : #include <boost/capy/ex/io_env.hpp>
23 : #include <boost/capy/io_result.hpp>
24 : #include <boost/capy/io_task.hpp>
25 :
26 : #include <concepts>
27 : #include <coroutine>
28 : #include <cstddef>
29 : #include <exception>
30 : #include <new>
31 : #include <span>
32 : #include <stop_token>
33 : #include <system_error>
34 : #include <utility>
35 :
36 : namespace boost {
37 : namespace capy {
38 :
39 : /** Type-erased wrapper for any BufferSource.
40 :
41 : This class provides type erasure for any type satisfying the
42 : @ref BufferSource concept, enabling runtime polymorphism for
43 : buffer pull operations. It uses cached awaitable storage to achieve
44 : zero steady-state allocation after construction.
45 :
46 : The wrapper also satisfies @ref ReadSource. When the wrapped type
47 : satisfies only @ref BufferSource, the read operations are
48 : synthesized using @ref pull and @ref consume with an extra
49 : buffer copy. When the wrapped type satisfies both @ref BufferSource
50 : and @ref ReadSource, the native read operations are forwarded
51 : directly across the virtual boundary, avoiding the copy.
52 :
53 : The wrapper supports two construction modes:
54 : - **Owning**: Pass by value to transfer ownership. The wrapper
55 : allocates storage and owns the source.
56 : - **Reference**: Pass a pointer to wrap without ownership. The
57 : pointed-to source must outlive this wrapper.
58 :
59 : Within each mode, the vtable is populated at compile time based
60 : on whether the wrapped type also satisfies @ref ReadSource:
61 : - **BufferSource only**: @ref read_some and @ref read are
62 : synthesized from @ref pull and @ref consume, incurring one
63 : buffer copy per operation.
64 : - **BufferSource + ReadSource**: All read operations are
65 : forwarded natively through the type-erased boundary with
66 : no extra copy.
67 :
68 : @par Awaitable Preallocation
69 : The constructor preallocates storage for the type-erased awaitable.
70 : This reserves all virtual address space at server startup
71 : so memory usage can be measured up front, rather than
72 : allocating piecemeal as traffic arrives.
73 :
74 : @par Thread Safety
75 : Not thread-safe. Concurrent operations on the same wrapper
76 : are undefined behavior.
77 :
78 : @par Example
79 : @code
80 : // Owning - takes ownership of the source
81 : any_buffer_source abs(some_buffer_source{args...});
82 :
83 : // Reference - wraps without ownership
84 : some_buffer_source src;
85 : any_buffer_source abs(&src);
86 :
87 : const_buffer arr[16];
88 : auto [ec, bufs] = co_await abs.pull(arr);
89 :
90 : // ReadSource interface also available
91 : char buf[64];
92 : auto [ec2, n] = co_await abs.read_some(mutable_buffer(buf, 64));
93 : @endcode
94 :
95 : @see any_buffer_sink, BufferSource, ReadSource
96 : */
97 : class any_buffer_source
98 : {
99 : struct vtable;
100 : struct awaitable_ops;
101 : struct read_awaitable_ops;
102 :
103 : template<BufferSource S>
104 : struct vtable_for_impl;
105 :
106 : // hot-path members first for cache locality
107 : void* source_ = nullptr;
108 : vtable const* vt_ = nullptr;
109 : void* cached_awaitable_ = nullptr;
110 : awaitable_ops const* active_ops_ = nullptr;
111 : read_awaitable_ops const* active_read_ops_ = nullptr;
112 : void* storage_ = nullptr;
113 :
114 : public:
115 : /** Destructor.
116 :
117 : Destroys the owned source (if any) and releases the cached
118 : awaitable storage.
119 : */
120 : ~any_buffer_source();
121 :
122 : /** Construct a default instance.
123 :
124 : Constructs an empty wrapper. Operations on a default-constructed
125 : wrapper result in undefined behavior.
126 : */
127 : any_buffer_source() = default;
128 :
129 : /** Non-copyable.
130 :
131 : The awaitable cache is per-instance and cannot be shared.
132 : */
133 : any_buffer_source(any_buffer_source const&) = delete;
134 : any_buffer_source& operator=(any_buffer_source const&) = delete;
135 :
136 : /** Construct by moving.
137 :
138 : Transfers ownership of the wrapped source (if owned) and
139 : cached awaitable storage from `other`. After the move, `other` is
140 : in a default-constructed state.
141 :
142 : @param other The wrapper to move from.
143 : */
144 HIT 2 : any_buffer_source(any_buffer_source&& other) noexcept
145 2 : : source_(std::exchange(other.source_, nullptr))
146 2 : , vt_(std::exchange(other.vt_, nullptr))
147 2 : , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
148 2 : , active_ops_(std::exchange(other.active_ops_, nullptr))
149 2 : , active_read_ops_(std::exchange(other.active_read_ops_, nullptr))
150 2 : , storage_(std::exchange(other.storage_, nullptr))
151 : {
152 2 : }
153 :
154 : /** Assign by moving.
155 :
156 : Destroys any owned source and releases existing resources,
157 : then transfers ownership from `other`.
158 :
159 : @param other The wrapper to move from.
160 : @return Reference to this wrapper.
161 : */
162 : any_buffer_source&
163 : operator=(any_buffer_source&& other) noexcept;
164 :
165 : /** Construct by taking ownership of a BufferSource.
166 :
167 : Allocates storage and moves the source into this wrapper.
168 : The wrapper owns the source and will destroy it. If `S` also
169 : satisfies @ref ReadSource, native read operations are
170 : forwarded through the virtual boundary.
171 :
172 : @param s The source to take ownership of.
173 : */
174 : template<BufferSource S>
175 : requires (!std::same_as<std::decay_t<S>, any_buffer_source>)
176 : any_buffer_source(S s);
177 :
178 : /** Construct by wrapping a BufferSource without ownership.
179 :
180 : Wraps the given source by pointer. The source must remain
181 : valid for the lifetime of this wrapper. If `S` also
182 : satisfies @ref ReadSource, native read operations are
183 : forwarded through the virtual boundary.
184 :
185 : @param s Pointer to the source to wrap.
186 : */
187 : template<BufferSource S>
188 : any_buffer_source(S* s);
189 :
190 : /** Check if the wrapper contains a valid source.
191 :
192 : @return `true` if wrapping a source, `false` if default-constructed
193 : or moved-from.
194 : */
195 : bool
196 19 : has_value() const noexcept
197 : {
198 19 : return source_ != nullptr;
199 : }
200 :
201 : /** Check if the wrapper contains a valid source.
202 :
203 : @return `true` if wrapping a source, `false` if default-constructed
204 : or moved-from.
205 : */
206 : explicit
207 2 : operator bool() const noexcept
208 : {
209 2 : return has_value();
210 : }
211 :
212 : /** Consume bytes from the source.
213 :
214 : Advances the internal read position of the underlying source
215 : by the specified number of bytes. The next call to @ref pull
216 : returns data starting after the consumed bytes.
217 :
218 : @param n The number of bytes to consume. Must not exceed the
219 : total size of buffers returned by the previous @ref pull.
220 :
221 : @par Preconditions
222 : The wrapper must contain a valid source (`has_value() == true`).
223 : */
224 : void
225 : consume(std::size_t n) noexcept;
226 :
227 : /** Pull buffer data from the source.
228 :
229 : Fills the provided span with buffer descriptors from the
230 : underlying source. The operation completes when data is
231 : available, the source is exhausted, or an error occurs.
232 :
233 : @param dest Span of const_buffer to fill.
234 :
235 : @return An awaitable that await-returns `(error_code,std::span<const_buffer>)`.
236 : On success with data, a non-empty span of filled buffers.
237 : On EOF, `ec == cond::eof` and span is empty.
238 :
239 : @par Preconditions
240 : The wrapper must contain a valid source (`has_value() == true`).
241 : The caller must not call this function again after a prior
242 : call returned an error.
243 : */
244 : auto
245 : pull(std::span<const_buffer> dest);
246 :
247 : /** Read some data into a mutable buffer sequence.
248 :
249 : Attempt to read up to `buffer_size( buffers )` bytes into
250 : the caller's buffers. May fill less than the full sequence.
251 :
252 : When the wrapped type provides native @ref ReadSource support,
253 : the operation forwards directly. Otherwise it is synthesized
254 : from @ref pull, @ref buffer_copy, and @ref consume.
255 :
256 : @param buffers The buffer sequence to fill.
257 :
258 : @return An awaitable that await-returns `(error_code,std::size_t)`.
259 :
260 : @par Preconditions
261 : The wrapper must contain a valid source (`has_value() == true`).
262 : The caller must not call this function again after a prior
263 : call returned an error (including EOF).
264 :
265 : @see pull, consume
266 : */
267 : template<MutableBufferSequence MB>
268 : io_task<std::size_t>
269 : read_some(MB buffers);
270 :
271 : /** Read data into a mutable buffer sequence.
272 :
273 : Fills the provided buffer sequence completely. When the
274 : wrapped type provides native @ref ReadSource support, each
275 : window is forwarded directly. Otherwise the data is
276 : synthesized from @ref pull, @ref buffer_copy, and @ref consume.
277 :
278 : @param buffers The buffer sequence to fill.
279 :
280 : @return An awaitable that await-returns `(error_code,std::size_t)`.
281 : On success, `n == buffer_size(buffers)`.
282 : On EOF, `ec == error::eof` and `n` is bytes transferred.
283 :
284 : @par Preconditions
285 : The wrapper must contain a valid source (`has_value() == true`).
286 : The caller must not call this function again after a prior
287 : call returned an error (including EOF).
288 :
289 : @see pull, consume
290 : */
291 : template<MutableBufferSequence MB>
292 : io_task<std::size_t>
293 : read(MB buffers);
294 :
295 : protected:
296 : /** Rebind to a new source after move.
297 :
298 : Updates the internal pointer to reference a new source object.
299 : Used by owning wrappers after move assignment when the owned
300 : object has moved to a new location.
301 :
302 : @param new_source The new source to bind to. Must be the same
303 : type as the original source.
304 :
305 : @note Terminates if called with a source of different type
306 : than the original.
307 : */
308 : template<BufferSource S>
309 : void
310 : rebind(S& new_source) noexcept
311 : {
312 : if(vt_ != &vtable_for_impl<S>::value)
313 : std::terminate();
314 : source_ = &new_source;
315 : }
316 :
317 : private:
318 : /** Forward a partial read through the vtable.
319 :
320 : Constructs the underlying `read_some` awaitable in
321 : cached storage and returns a type-erased awaitable.
322 : */
323 : auto
324 : read_some_(std::span<mutable_buffer const> buffers);
325 :
326 : /** Forward a complete read through the vtable.
327 :
328 : Constructs the underlying `read` awaitable in
329 : cached storage and returns a type-erased awaitable.
330 : */
331 : auto
332 : read_(std::span<mutable_buffer const> buffers);
333 : };
334 :
335 : /** Type-erased ops for awaitables that await-return `io_result<std::span<const_buffer>>`. */
336 : struct any_buffer_source::awaitable_ops
337 : {
338 : bool (*await_ready)(void*);
339 : std::coroutine_handle<> (*await_suspend)(void*, std::coroutine_handle<>, io_env const*);
340 : io_result<std::span<const_buffer>> (*await_resume)(void*);
341 : void (*destroy)(void*) noexcept;
342 : };
343 :
344 : /** Type-erased ops for awaitables that await-return `io_result<std::size_t>`. */
345 : struct any_buffer_source::read_awaitable_ops
346 : {
347 : bool (*await_ready)(void*);
348 : std::coroutine_handle<> (*await_suspend)(void*, std::coroutine_handle<>, io_env const*);
349 : io_result<std::size_t> (*await_resume)(void*);
350 : void (*destroy)(void*) noexcept;
351 : };
352 :
353 : struct any_buffer_source::vtable
354 : {
355 : // BufferSource ops (always populated)
356 : void (*destroy)(void*) noexcept;
357 : void (*do_consume)(void* source, std::size_t n) noexcept;
358 : std::size_t awaitable_size;
359 : std::size_t awaitable_align;
360 : awaitable_ops const* (*construct_awaitable)(
361 : void* source,
362 : void* storage,
363 : std::span<const_buffer> dest);
364 :
365 : // ReadSource forwarding (null when wrapped type is BufferSource-only)
366 : read_awaitable_ops const* (*construct_read_some_awaitable)(
367 : void* source,
368 : void* storage,
369 : std::span<mutable_buffer const> buffers);
370 : read_awaitable_ops const* (*construct_read_awaitable)(
371 : void* source,
372 : void* storage,
373 : std::span<mutable_buffer const> buffers);
374 : };
375 :
376 : template<BufferSource S>
377 : struct any_buffer_source::vtable_for_impl
378 : {
379 : using PullAwaitable = decltype(std::declval<S&>().pull(
380 : std::declval<std::span<const_buffer>>()));
381 :
382 : static void
383 9 : do_destroy_impl(void* source) noexcept
384 : {
385 9 : static_cast<S*>(source)->~S();
386 9 : }
387 :
388 : static void
389 45 : do_consume_impl(void* source, std::size_t n) noexcept
390 : {
391 45 : static_cast<S*>(source)->consume(n);
392 45 : }
393 :
394 : static awaitable_ops const*
395 111 : construct_awaitable_impl(
396 : void* source,
397 : void* storage,
398 : std::span<const_buffer> dest)
399 : {
400 111 : auto& s = *static_cast<S*>(source);
401 111 : ::new(storage) PullAwaitable(s.pull(dest));
402 :
403 : static constexpr awaitable_ops ops = {
404 111 : +[](void* p) {
405 111 : return static_cast<PullAwaitable*>(p)->await_ready();
406 : },
407 1 : +[](void* p, std::coroutine_handle<> h, io_env const* env) {
408 1 : return detail::call_await_suspend(
409 1 : static_cast<PullAwaitable*>(p), h, env);
410 : },
411 111 : +[](void* p) {
412 111 : return static_cast<PullAwaitable*>(p)->await_resume();
413 : },
414 111 : +[](void* p) noexcept {
415 111 : static_cast<PullAwaitable*>(p)->~PullAwaitable();
416 : }
417 : };
418 111 : return &ops;
419 : }
420 :
421 : static read_awaitable_ops const*
422 49 : construct_read_some_awaitable_impl(
423 : void* source,
424 : void* storage,
425 : std::span<mutable_buffer const> buffers)
426 : requires ReadSource<S>
427 : {
428 : using Aw = decltype(std::declval<S&>().read_some(
429 : std::span<mutable_buffer const>{}));
430 49 : auto& s = *static_cast<S*>(source);
431 49 : ::new(storage) Aw(s.read_some(buffers));
432 :
433 : static constexpr read_awaitable_ops ops = {
434 49 : +[](void* p) {
435 49 : return static_cast<Aw*>(p)->await_ready();
436 : },
437 1 : +[](void* p, std::coroutine_handle<> h, io_env const* env) {
438 1 : return detail::call_await_suspend(
439 1 : static_cast<Aw*>(p), h, env);
440 : },
441 49 : +[](void* p) {
442 49 : return static_cast<Aw*>(p)->await_resume();
443 : },
444 49 : +[](void* p) noexcept {
445 49 : static_cast<Aw*>(p)->~Aw();
446 : }
447 : };
448 49 : return &ops;
449 : }
450 :
451 : static read_awaitable_ops const*
452 19 : construct_read_awaitable_impl(
453 : void* source,
454 : void* storage,
455 : std::span<mutable_buffer const> buffers)
456 : requires ReadSource<S>
457 : {
458 : using Aw = decltype(std::declval<S&>().read(
459 : std::span<mutable_buffer const>{}));
460 19 : auto& s = *static_cast<S*>(source);
461 19 : ::new(storage) Aw(s.read(buffers));
462 :
463 : static constexpr read_awaitable_ops ops = {
464 19 : +[](void* p) {
465 19 : return static_cast<Aw*>(p)->await_ready();
466 : },
467 1 : +[](void* p, std::coroutine_handle<> h, io_env const* env) {
468 1 : return detail::call_await_suspend(
469 1 : static_cast<Aw*>(p), h, env);
470 : },
471 19 : +[](void* p) {
472 19 : return static_cast<Aw*>(p)->await_resume();
473 : },
474 19 : +[](void* p) noexcept {
475 19 : static_cast<Aw*>(p)->~Aw();
476 : }
477 : };
478 19 : return &ops;
479 : }
480 :
481 : static consteval std::size_t
482 : compute_max_size() noexcept
483 : {
484 : std::size_t s = sizeof(PullAwaitable);
485 : if constexpr (ReadSource<S>)
486 : {
487 : using RS = decltype(std::declval<S&>().read_some(
488 : std::span<mutable_buffer const>{}));
489 : using R = decltype(std::declval<S&>().read(
490 : std::span<mutable_buffer const>{}));
491 :
492 : if(sizeof(RS) > s) s = sizeof(RS);
493 : if(sizeof(R) > s) s = sizeof(R);
494 : }
495 : return s;
496 : }
497 :
498 : static consteval std::size_t
499 : compute_max_align() noexcept
500 : {
501 : std::size_t a = alignof(PullAwaitable);
502 : if constexpr (ReadSource<S>)
503 : {
504 : using RS = decltype(std::declval<S&>().read_some(
505 : std::span<mutable_buffer const>{}));
506 : using R = decltype(std::declval<S&>().read(
507 : std::span<mutable_buffer const>{}));
508 :
509 : if(alignof(RS) > a) a = alignof(RS);
510 : if(alignof(R) > a) a = alignof(R);
511 : }
512 : return a;
513 : }
514 :
515 : static consteval vtable
516 : make_vtable() noexcept
517 : {
518 : vtable v{};
519 : v.destroy = &do_destroy_impl;
520 : v.do_consume = &do_consume_impl;
521 : v.awaitable_size = compute_max_size();
522 : v.awaitable_align = compute_max_align();
523 : v.construct_awaitable = &construct_awaitable_impl;
524 : v.construct_read_some_awaitable = nullptr;
525 : v.construct_read_awaitable = nullptr;
526 :
527 : if constexpr (ReadSource<S>)
528 : {
529 : v.construct_read_some_awaitable =
530 : &construct_read_some_awaitable_impl;
531 : v.construct_read_awaitable =
532 : &construct_read_awaitable_impl;
533 : }
534 : return v;
535 : }
536 :
537 : static constexpr vtable value = make_vtable();
538 : };
539 :
540 : inline
541 127 : any_buffer_source::~any_buffer_source()
542 : {
543 127 : if(storage_)
544 : {
545 8 : vt_->destroy(source_);
546 8 : ::operator delete(storage_);
547 : }
548 127 : if(cached_awaitable_)
549 121 : ::operator delete(cached_awaitable_);
550 127 : }
551 :
552 : inline any_buffer_source&
553 3 : any_buffer_source::operator=(any_buffer_source&& other) noexcept
554 : {
555 3 : if(this != &other)
556 : {
557 3 : if(storage_)
558 : {
559 1 : vt_->destroy(source_);
560 1 : ::operator delete(storage_);
561 : }
562 3 : if(cached_awaitable_)
563 1 : ::operator delete(cached_awaitable_);
564 3 : source_ = std::exchange(other.source_, nullptr);
565 3 : vt_ = std::exchange(other.vt_, nullptr);
566 3 : cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
567 3 : storage_ = std::exchange(other.storage_, nullptr);
568 3 : active_ops_ = std::exchange(other.active_ops_, nullptr);
569 3 : active_read_ops_ = std::exchange(other.active_read_ops_, nullptr);
570 : }
571 3 : return *this;
572 : }
573 :
574 : template<BufferSource S>
575 : requires (!std::same_as<std::decay_t<S>, any_buffer_source>)
576 10 : any_buffer_source::any_buffer_source(S s)
577 10 : : vt_(&vtable_for_impl<S>::value)
578 : {
579 : struct guard {
580 : any_buffer_source* self;
581 : bool committed = false;
582 10 : ~guard() {
583 10 : if(!committed && self->storage_) {
584 : // source_ is null if the source move-ctor threw before
585 : // the placement-new assigned it.
586 1 : if(self->source_)
587 MIS 0 : self->vt_->destroy(self->source_);
588 HIT 1 : ::operator delete(self->storage_);
589 1 : self->storage_ = nullptr;
590 1 : self->source_ = nullptr;
591 : }
592 10 : }
593 10 : } g{this};
594 :
595 10 : storage_ = ::operator new(sizeof(S));
596 10 : source_ = ::new(storage_) S(std::move(s));
597 :
598 9 : cached_awaitable_ = ::operator new(vt_->awaitable_size);
599 :
600 9 : g.committed = true;
601 10 : }
602 :
603 : template<BufferSource S>
604 113 : any_buffer_source::any_buffer_source(S* s)
605 113 : : source_(s)
606 113 : , vt_(&vtable_for_impl<S>::value)
607 : {
608 113 : cached_awaitable_ = ::operator new(vt_->awaitable_size);
609 113 : }
610 :
611 : inline void
612 45 : any_buffer_source::consume(std::size_t n) noexcept
613 : {
614 45 : vt_->do_consume(source_, n);
615 45 : }
616 :
617 : inline auto
618 111 : any_buffer_source::pull(std::span<const_buffer> dest)
619 : {
620 : struct awaitable
621 : {
622 : any_buffer_source* self_;
623 : std::span<const_buffer> dest_;
624 :
625 : bool
626 111 : await_ready()
627 : {
628 222 : self_->active_ops_ = self_->vt_->construct_awaitable(
629 111 : self_->source_,
630 111 : self_->cached_awaitable_,
631 : dest_);
632 111 : return self_->active_ops_->await_ready(self_->cached_awaitable_);
633 : }
634 :
635 : std::coroutine_handle<>
636 1 : await_suspend(std::coroutine_handle<> h, io_env const* env)
637 : {
638 1 : return self_->active_ops_->await_suspend(
639 1 : self_->cached_awaitable_, h, env);
640 : }
641 :
642 : io_result<std::span<const_buffer>>
643 111 : await_resume()
644 : {
645 : struct guard {
646 : any_buffer_source* self;
647 111 : ~guard() {
648 111 : self->active_ops_->destroy(self->cached_awaitable_);
649 111 : self->active_ops_ = nullptr;
650 111 : }
651 111 : } g{self_};
652 111 : return self_->active_ops_->await_resume(
653 197 : self_->cached_awaitable_);
654 111 : }
655 : };
656 111 : return awaitable{this, dest};
657 : }
658 :
659 : inline auto
660 49 : any_buffer_source::read_some_(
661 : std::span<mutable_buffer const> buffers)
662 : {
663 : struct awaitable
664 : {
665 : any_buffer_source* self_;
666 : std::span<mutable_buffer const> buffers_;
667 :
668 : bool
669 49 : await_ready() const noexcept
670 : {
671 49 : return false;
672 : }
673 :
674 : std::coroutine_handle<>
675 49 : await_suspend(std::coroutine_handle<> h, io_env const* env)
676 : {
677 98 : self_->active_read_ops_ =
678 98 : self_->vt_->construct_read_some_awaitable(
679 49 : self_->source_,
680 49 : self_->cached_awaitable_,
681 : buffers_);
682 :
683 49 : if(self_->active_read_ops_->await_ready(
684 49 : self_->cached_awaitable_))
685 48 : return h;
686 :
687 1 : return self_->active_read_ops_->await_suspend(
688 1 : self_->cached_awaitable_, h, env);
689 : }
690 :
691 : io_result<std::size_t>
692 49 : await_resume()
693 : {
694 : struct guard {
695 : any_buffer_source* self;
696 49 : ~guard() {
697 49 : self->active_read_ops_->destroy(
698 49 : self->cached_awaitable_);
699 49 : self->active_read_ops_ = nullptr;
700 49 : }
701 49 : } g{self_};
702 49 : return self_->active_read_ops_->await_resume(
703 90 : self_->cached_awaitable_);
704 49 : }
705 : };
706 49 : return awaitable{this, buffers};
707 : }
708 :
709 : inline auto
710 19 : any_buffer_source::read_(
711 : std::span<mutable_buffer const> buffers)
712 : {
713 : struct awaitable
714 : {
715 : any_buffer_source* self_;
716 : std::span<mutable_buffer const> buffers_;
717 :
718 : bool
719 19 : await_ready() const noexcept
720 : {
721 19 : return false;
722 : }
723 :
724 : std::coroutine_handle<>
725 19 : await_suspend(std::coroutine_handle<> h, io_env const* env)
726 : {
727 38 : self_->active_read_ops_ =
728 38 : self_->vt_->construct_read_awaitable(
729 19 : self_->source_,
730 19 : self_->cached_awaitable_,
731 : buffers_);
732 :
733 19 : if(self_->active_read_ops_->await_ready(
734 19 : self_->cached_awaitable_))
735 18 : return h;
736 :
737 1 : return self_->active_read_ops_->await_suspend(
738 1 : self_->cached_awaitable_, h, env);
739 : }
740 :
741 : io_result<std::size_t>
742 19 : await_resume()
743 : {
744 : struct guard {
745 : any_buffer_source* self;
746 19 : ~guard() {
747 19 : self->active_read_ops_->destroy(
748 19 : self->cached_awaitable_);
749 19 : self->active_read_ops_ = nullptr;
750 19 : }
751 19 : } g{self_};
752 19 : return self_->active_read_ops_->await_resume(
753 32 : self_->cached_awaitable_);
754 19 : }
755 : };
756 19 : return awaitable{this, buffers};
757 : }
758 :
759 : template<MutableBufferSequence MB>
760 : io_task<std::size_t>
761 59 : any_buffer_source::read_some(MB buffers)
762 : {
763 : buffer_param<MB> bp(buffers);
764 : auto dest = bp.data();
765 : if(dest.empty())
766 : co_return {{}, 0};
767 :
768 : // Native ReadSource path
769 : if(vt_->construct_read_some_awaitable)
770 : co_return co_await read_some_(dest);
771 :
772 : // Synthesized path: pull + buffer_copy + consume
773 : const_buffer arr[detail::max_iovec_];
774 : auto [ec, bufs] = co_await pull(arr);
775 : if(ec)
776 : co_return {ec, 0};
777 :
778 : auto n = buffer_copy(dest, bufs);
779 : consume(n);
780 : co_return {{}, n};
781 118 : }
782 :
783 : template<MutableBufferSequence MB>
784 : io_task<std::size_t>
785 25 : any_buffer_source::read(MB buffers)
786 : {
787 : buffer_param<MB> bp(buffers);
788 : std::size_t total = 0;
789 :
790 : // Native ReadSource path
791 : if(vt_->construct_read_awaitable)
792 : {
793 : for(;;)
794 : {
795 : auto dest = bp.data();
796 : if(dest.empty())
797 : break;
798 :
799 : auto [ec, n] = co_await read_(dest);
800 : total += n;
801 : if(ec)
802 : co_return {ec, total};
803 : bp.consume(n);
804 : }
805 : co_return {{}, total};
806 : }
807 :
808 : // Synthesized path: pull + buffer_copy + consume
809 : for(;;)
810 : {
811 : auto dest = bp.data();
812 : if(dest.empty())
813 : break;
814 :
815 : const_buffer arr[detail::max_iovec_];
816 : auto [ec, bufs] = co_await pull(arr);
817 :
818 : if(ec)
819 : co_return {ec, total};
820 :
821 : auto n = buffer_copy(dest, bufs);
822 : consume(n);
823 : total += n;
824 : bp.consume(n);
825 : }
826 :
827 : co_return {{}, total};
828 50 : }
829 :
// Compile-time checks that the wrapper itself satisfies both
// concepts it is documented to model.
static_assert(BufferSource<any_buffer_source>);
static_assert(ReadSource<any_buffer_source>);
832 :
833 : } // namespace capy
834 : } // namespace boost
835 :
836 : #endif
|