TLA Line data Source code
1 : //
2 : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/capy
8 : //
9 :
10 : #ifndef BOOST_CAPY_IO_ANY_WRITE_SINK_HPP
11 : #define BOOST_CAPY_IO_ANY_WRITE_SINK_HPP
12 :
13 : #include <boost/capy/detail/config.hpp>
14 : #include <boost/capy/detail/await_suspend_helper.hpp>
15 : #include <boost/capy/buffers.hpp>
16 : #include <boost/capy/detail/buffer_array.hpp>
17 : #include <boost/capy/buffers/buffer_param.hpp>
18 : #include <boost/capy/concept/io_awaitable.hpp>
19 : #include <boost/capy/concept/write_sink.hpp>
20 : #include <coroutine>
21 : #include <boost/capy/ex/io_env.hpp>
22 : #include <boost/capy/io_result.hpp>
23 : #include <boost/capy/io_task.hpp>
24 :
25 : #include <concepts>
26 : #include <coroutine>
27 : #include <cstddef>
28 : #include <exception>
29 : #include <new>
30 : #include <span>
31 : #include <stop_token>
32 : #include <system_error>
33 : #include <utility>
34 :
35 : namespace boost {
36 : namespace capy {
37 :
38 : /** Type-erased wrapper for any WriteSink.
39 :
40 : This class provides type erasure for any type satisfying the
41 : @ref WriteSink concept, enabling runtime polymorphism for
42 : sink write operations. It uses cached awaitable storage to achieve
43 : zero steady-state allocation after construction.
44 :
45 : The wrapper supports two construction modes:
46 : - **Owning**: Pass by value to transfer ownership. The wrapper
47 : allocates storage and owns the sink.
48 : - **Reference**: Pass a pointer to wrap without ownership. The
49 : pointed-to sink must outlive this wrapper.
50 :
51 : @par Awaitable Preallocation
52 : The constructor preallocates storage for the type-erased awaitable.
53 : This reserves all virtual address space at server startup
54 : so memory usage can be measured up front, rather than
55 : allocating piecemeal as traffic arrives.
56 :
57 : @par Immediate Completion
58 : Operations complete immediately without suspending when the
59 : buffer sequence is empty, or when the underlying sink's
60 : awaitable reports readiness via `await_ready`.
61 :
62 : @par Thread Safety
63 : Not thread-safe. Concurrent operations on the same wrapper
64 : are undefined behavior.
65 :
66 : @par Example
67 : @code
68 : // Owning - takes ownership of the sink
69 : any_write_sink ws(some_sink{args...});
70 :
71 : // Reference - wraps without ownership
72 : some_sink sink;
73 : any_write_sink ws(&sink);
74 :
75 : const_buffer buf(data, size);
76 : auto [ec, n] = co_await ws.write(std::span(&buf, 1));
77 : auto [ec2] = co_await ws.write_eof();
78 : @endcode
79 :
80 : @see any_write_stream, WriteSink
81 : */
82 : class any_write_sink
83 : {
84 : struct vtable;
85 : struct write_awaitable_ops;
86 : struct eof_awaitable_ops;
87 :
88 : template<WriteSink S>
89 : struct vtable_for_impl;
90 :
91 : void* sink_ = nullptr;
92 : vtable const* vt_ = nullptr;
93 : void* cached_awaitable_ = nullptr;
94 : void* storage_ = nullptr;
95 : write_awaitable_ops const* active_write_ops_ = nullptr;
96 : eof_awaitable_ops const* active_eof_ops_ = nullptr;
97 :
98 : public:
99 : /** Destructor.
100 :
101 : Destroys the owned sink (if any) and releases the cached
102 : awaitable storage.
103 : */
104 : ~any_write_sink();
105 :
106 : /** Construct a default instance.
107 :
108 : Constructs an empty wrapper. Operations on a default-constructed
109 : wrapper result in undefined behavior.
110 : */
111 : any_write_sink() = default;
112 :
113 : /** Non-copyable.
114 :
115 : The awaitable cache is per-instance and cannot be shared.
116 : */
117 : any_write_sink(any_write_sink const&) = delete;
118 : any_write_sink& operator=(any_write_sink const&) = delete;
119 :
120 : /** Construct by moving.
121 :
122 : Transfers ownership of the wrapped sink (if owned) and
123 : cached awaitable storage from `other`. After the move, `other` is
124 : in a default-constructed state.
125 :
126 : @param other The wrapper to move from.
127 : */
128 HIT 1 : any_write_sink(any_write_sink&& other) noexcept
129 1 : : sink_(std::exchange(other.sink_, nullptr))
130 1 : , vt_(std::exchange(other.vt_, nullptr))
131 1 : , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
132 1 : , storage_(std::exchange(other.storage_, nullptr))
133 1 : , active_write_ops_(std::exchange(other.active_write_ops_, nullptr))
134 1 : , active_eof_ops_(std::exchange(other.active_eof_ops_, nullptr))
135 : {
136 1 : }
137 :
138 : /** Assign by moving.
139 :
140 : Destroys any owned sink and releases existing resources,
141 : then transfers ownership from `other`.
142 :
143 : @param other The wrapper to move from.
144 : @return Reference to this wrapper.
145 : */
146 : any_write_sink&
147 : operator=(any_write_sink&& other) noexcept;
148 :
149 : /** Construct by taking ownership of a WriteSink.
150 :
151 : Allocates storage and moves the sink into this wrapper.
152 : The wrapper owns the sink and will destroy it.
153 :
154 : @param s The sink to take ownership of.
155 : */
156 : template<WriteSink S>
157 : requires (!std::same_as<std::decay_t<S>, any_write_sink>)
158 : any_write_sink(S s);
159 :
160 : /** Construct by wrapping a WriteSink without ownership.
161 :
162 : Wraps the given sink by pointer. The sink must remain
163 : valid for the lifetime of this wrapper.
164 :
165 : @param s Pointer to the sink to wrap.
166 : */
167 : template<WriteSink S>
168 : any_write_sink(S* s);
169 :
170 : /** Check if the wrapper contains a valid sink.
171 :
172 : @return `true` if wrapping a sink, `false` if default-constructed
173 : or moved-from.
174 : */
175 : bool
176 18 : has_value() const noexcept
177 : {
178 18 : return sink_ != nullptr;
179 : }
180 :
181 : /** Check if the wrapper contains a valid sink.
182 :
183 : @return `true` if wrapping a sink, `false` if default-constructed
184 : or moved-from.
185 : */
186 : explicit
187 2 : operator bool() const noexcept
188 : {
189 2 : return has_value();
190 : }
191 :
192 : /** Initiate a partial write operation.
193 :
194 : Attempt to write up to `buffer_size( buffers )` bytes from
195 : the provided buffer sequence. May consume less than the
196 : full sequence.
197 :
198 : @param buffers The buffer sequence containing data to write.
199 :
200 : @return An awaitable that await-returns `(error_code,std::size_t)`.
201 :
202 : @par Immediate Completion
203 : The operation completes immediately without suspending
204 : the calling coroutine when:
205 : @li The buffer sequence is empty, returning `{error_code{}, 0}`.
206 : @li The underlying sink's awaitable reports immediate
207 : readiness via `await_ready`.
208 :
209 : @note This is a partial operation and may not process the
210 : entire buffer sequence. Use @ref write for guaranteed
211 : complete transfer.
212 :
213 : @par Preconditions
214 : The wrapper must contain a valid sink (`has_value() == true`).
215 : */
216 : template<ConstBufferSequence CB>
217 : auto
218 : write_some(CB buffers);
219 :
220 : /** Initiate a complete write operation.
221 :
222 : Writes data from the provided buffer sequence. The operation
223 : completes when all bytes have been consumed, or an error
224 : occurs. Forwards to the underlying sink's `write` operation,
225 : windowed through @ref buffer_param when the sequence exceeds
226 : the per-call buffer limit.
227 :
228 : @param buffers The buffer sequence containing data to write.
229 :
230 : @return An awaitable that await-returns `(error_code,std::size_t)`.
231 :
232 : @par Immediate Completion
233 : The operation completes immediately without suspending
234 : the calling coroutine when:
235 : @li The buffer sequence is empty, returning `{error_code{}, 0}`.
236 : @li Every underlying `write` call completes
237 : immediately (the wrapped sink reports readiness
238 : via `await_ready` on each iteration).
239 :
240 : @par Preconditions
241 : The wrapper must contain a valid sink (`has_value() == true`).
242 : */
243 : template<ConstBufferSequence CB>
244 : io_task<std::size_t>
245 : write(CB buffers);
246 :
247 : /** Atomically write data and signal end-of-stream.
248 :
249 : Writes all data from the buffer sequence and then signals
250 : end-of-stream. The implementation decides how to partition
251 : the data across calls to the underlying sink's @ref write
252 : and `write_eof`. When the caller's buffer sequence is
253 : non-empty, the final call to the underlying sink is always
254 : `write_eof` with a non-empty buffer sequence. When the
255 : caller's buffer sequence is empty, only `write_eof()` with
256 : no data is called.
257 :
258 : @param buffers The buffer sequence containing data to write.
259 :
260 : @return An awaitable that await-returns `(error_code,std::size_t)`.
261 :
262 : @par Immediate Completion
263 : The operation completes immediately without suspending
264 : the calling coroutine when:
265 : @li The buffer sequence is empty. Only the @ref write_eof()
266 : call is performed.
267 : @li All underlying operations complete immediately (the
268 : wrapped sink reports readiness via `await_ready`).
269 :
270 : @par Preconditions
271 : The wrapper must contain a valid sink (`has_value() == true`).
272 : */
273 : template<ConstBufferSequence CB>
274 : io_task<std::size_t>
275 : write_eof(CB buffers);
276 :
277 : /** Signal end of data.
278 :
279 : Indicates that no more data will be written to the sink.
280 : The operation completes when the sink is finalized, or
281 : an error occurs.
282 :
283 : @return An awaitable that await-returns `(error_code)`.
284 :
285 : @par Immediate Completion
286 : The operation completes immediately without suspending
287 : the calling coroutine when the underlying sink's awaitable
288 : reports immediate readiness via `await_ready`.
289 :
290 : @par Preconditions
291 : The wrapper must contain a valid sink (`has_value() == true`).
292 : */
293 : auto
294 : write_eof();
295 :
296 : protected:
297 : /** Rebind to a new sink after move.
298 :
299 : Updates the internal pointer to reference a new sink object.
300 : Used by owning wrappers after move assignment when the owned
301 : object has moved to a new location.
302 :
303 : @param new_sink The new sink to bind to. Must be the same
304 : type as the original sink.
305 :
306 : @note Terminates if called with a sink of different type
307 : than the original.
308 : */
309 : template<WriteSink S>
310 : void
311 : rebind(S& new_sink) noexcept
312 : {
313 : if(vt_ != &vtable_for_impl<S>::value)
314 : std::terminate();
315 : sink_ = &new_sink;
316 : }
317 :
318 : private:
319 : auto
320 : write_some_(std::span<const_buffer const> buffers);
321 :
322 : auto
323 : write_(std::span<const_buffer const> buffers);
324 :
325 : auto
326 : write_eof_buffers_(std::span<const_buffer const> buffers);
327 : };
328 :
329 : struct any_write_sink::write_awaitable_ops
330 : {
331 : bool (*await_ready)(void*);
332 : std::coroutine_handle<> (*await_suspend)(void*, std::coroutine_handle<>, io_env const*);
333 : io_result<std::size_t> (*await_resume)(void*);
334 : void (*destroy)(void*) noexcept;
335 : };
336 :
337 : struct any_write_sink::eof_awaitable_ops
338 : {
339 : bool (*await_ready)(void*);
340 : std::coroutine_handle<> (*await_suspend)(void*, std::coroutine_handle<>, io_env const*);
341 : io_result<> (*await_resume)(void*);
342 : void (*destroy)(void*) noexcept;
343 : };
344 :
345 : struct any_write_sink::vtable
346 : {
347 : write_awaitable_ops const* (*construct_write_some_awaitable)(
348 : void* sink,
349 : void* storage,
350 : std::span<const_buffer const> buffers);
351 : write_awaitable_ops const* (*construct_write_awaitable)(
352 : void* sink,
353 : void* storage,
354 : std::span<const_buffer const> buffers);
355 : write_awaitable_ops const* (*construct_write_eof_buffers_awaitable)(
356 : void* sink,
357 : void* storage,
358 : std::span<const_buffer const> buffers);
359 : eof_awaitable_ops const* (*construct_eof_awaitable)(
360 : void* sink,
361 : void* storage);
362 : std::size_t awaitable_size;
363 : std::size_t awaitable_align;
364 : void (*destroy)(void*) noexcept;
365 : };
366 :
367 : template<WriteSink S>
368 : struct any_write_sink::vtable_for_impl
369 : {
370 : using WriteSomeAwaitable = decltype(std::declval<S&>().write_some(
371 : std::span<const_buffer const>{}));
372 : using WriteAwaitable = decltype(std::declval<S&>().write(
373 : std::span<const_buffer const>{}));
374 : using WriteEofBuffersAwaitable = decltype(std::declval<S&>().write_eof(
375 : std::span<const_buffer const>{}));
376 : using EofAwaitable = decltype(std::declval<S&>().write_eof());
377 :
378 : static void
379 8 : do_destroy_impl(void* sink) noexcept
380 : {
381 8 : static_cast<S*>(sink)->~S();
382 8 : }
383 :
384 : static write_awaitable_ops const*
385 41 : construct_write_some_awaitable_impl(
386 : void* sink,
387 : void* storage,
388 : std::span<const_buffer const> buffers)
389 : {
390 41 : auto& s = *static_cast<S*>(sink);
391 41 : ::new(storage) WriteSomeAwaitable(s.write_some(buffers));
392 :
393 : static constexpr write_awaitable_ops ops = {
394 41 : +[](void* p) {
395 41 : return static_cast<WriteSomeAwaitable*>(p)->await_ready();
396 : },
397 3 : +[](void* p, std::coroutine_handle<> h, io_env const* env) {
398 3 : return detail::call_await_suspend(
399 3 : static_cast<WriteSomeAwaitable*>(p), h, env);
400 : },
401 39 : +[](void* p) {
402 39 : return static_cast<WriteSomeAwaitable*>(p)->await_resume();
403 : },
404 43 : +[](void* p) noexcept {
405 2 : static_cast<WriteSomeAwaitable*>(p)->~WriteSomeAwaitable();
406 : }
407 : };
408 41 : return &ops;
409 : }
410 :
411 : static write_awaitable_ops const*
412 79 : construct_write_awaitable_impl(
413 : void* sink,
414 : void* storage,
415 : std::span<const_buffer const> buffers)
416 : {
417 79 : auto& s = *static_cast<S*>(sink);
418 79 : ::new(storage) WriteAwaitable(s.write(buffers));
419 :
420 : static constexpr write_awaitable_ops ops = {
421 79 : +[](void* p) {
422 79 : return static_cast<WriteAwaitable*>(p)->await_ready();
423 : },
424 1 : +[](void* p, std::coroutine_handle<> h, io_env const* env) {
425 1 : return detail::call_await_suspend(
426 1 : static_cast<WriteAwaitable*>(p), h, env);
427 : },
428 79 : +[](void* p) {
429 79 : return static_cast<WriteAwaitable*>(p)->await_resume();
430 : },
431 79 : +[](void* p) noexcept {
432 MIS 0 : static_cast<WriteAwaitable*>(p)->~WriteAwaitable();
433 : }
434 : };
435 HIT 79 : return &ops;
436 : }
437 :
438 : static write_awaitable_ops const*
439 17 : construct_write_eof_buffers_awaitable_impl(
440 : void* sink,
441 : void* storage,
442 : std::span<const_buffer const> buffers)
443 : {
444 17 : auto& s = *static_cast<S*>(sink);
445 17 : ::new(storage) WriteEofBuffersAwaitable(s.write_eof(buffers));
446 :
447 : static constexpr write_awaitable_ops ops = {
448 17 : +[](void* p) {
449 17 : return static_cast<WriteEofBuffersAwaitable*>(p)->await_ready();
450 : },
451 1 : +[](void* p, std::coroutine_handle<> h, io_env const* env) {
452 1 : return detail::call_await_suspend(
453 1 : static_cast<WriteEofBuffersAwaitable*>(p), h, env);
454 : },
455 17 : +[](void* p) {
456 17 : return static_cast<WriteEofBuffersAwaitable*>(p)->await_resume();
457 : },
458 17 : +[](void* p) noexcept {
459 MIS 0 : static_cast<WriteEofBuffersAwaitable*>(p)->~WriteEofBuffersAwaitable();
460 : }
461 : };
462 HIT 17 : return &ops;
463 : }
464 :
465 : static eof_awaitable_ops const*
466 19 : construct_eof_awaitable_impl(
467 : void* sink,
468 : void* storage)
469 : {
470 19 : auto& s = *static_cast<S*>(sink);
471 19 : ::new(storage) EofAwaitable(s.write_eof());
472 :
473 : static constexpr eof_awaitable_ops ops = {
474 19 : +[](void* p) {
475 19 : return static_cast<EofAwaitable*>(p)->await_ready();
476 : },
477 3 : +[](void* p, std::coroutine_handle<> h, io_env const* env) {
478 3 : return detail::call_await_suspend(
479 3 : static_cast<EofAwaitable*>(p), h, env);
480 : },
481 17 : +[](void* p) {
482 17 : return static_cast<EofAwaitable*>(p)->await_resume();
483 : },
484 21 : +[](void* p) noexcept {
485 2 : static_cast<EofAwaitable*>(p)->~EofAwaitable();
486 : }
487 : };
488 19 : return &ops;
489 : }
490 :
491 : static constexpr std::size_t max4(
492 : std::size_t a, std::size_t b,
493 : std::size_t c, std::size_t d) noexcept
494 : {
495 : std::size_t ab = a > b ? a : b;
496 : std::size_t cd = c > d ? c : d;
497 : return ab > cd ? ab : cd;
498 : }
499 :
500 : static constexpr std::size_t max_awaitable_size =
501 : max4(sizeof(WriteSomeAwaitable),
502 : sizeof(WriteAwaitable),
503 : sizeof(WriteEofBuffersAwaitable),
504 : sizeof(EofAwaitable));
505 :
506 : static constexpr std::size_t max_awaitable_align =
507 : max4(alignof(WriteSomeAwaitable),
508 : alignof(WriteAwaitable),
509 : alignof(WriteEofBuffersAwaitable),
510 : alignof(EofAwaitable));
511 :
512 : static constexpr vtable value = {
513 : &construct_write_some_awaitable_impl,
514 : &construct_write_awaitable_impl,
515 : &construct_write_eof_buffers_awaitable_impl,
516 : &construct_eof_awaitable_impl,
517 : max_awaitable_size,
518 : max_awaitable_align,
519 : &do_destroy_impl
520 : };
521 : };
522 :
523 : inline
524 134 : any_write_sink::~any_write_sink()
525 : {
526 134 : if(storage_)
527 : {
528 7 : vt_->destroy(sink_);
529 7 : ::operator delete(storage_);
530 : }
531 134 : if(cached_awaitable_)
532 : {
533 126 : if(active_write_ops_)
534 1 : active_write_ops_->destroy(cached_awaitable_);
535 125 : else if(active_eof_ops_)
536 1 : active_eof_ops_->destroy(cached_awaitable_);
537 126 : ::operator delete(cached_awaitable_);
538 : }
539 134 : }
540 :
541 : inline any_write_sink&
542 4 : any_write_sink::operator=(any_write_sink&& other) noexcept
543 : {
544 4 : if(this != &other)
545 : {
546 4 : if(storage_)
547 : {
548 1 : vt_->destroy(sink_);
549 1 : ::operator delete(storage_);
550 : }
551 4 : if(cached_awaitable_)
552 : {
553 3 : if(active_write_ops_)
554 1 : active_write_ops_->destroy(cached_awaitable_);
555 2 : else if(active_eof_ops_)
556 1 : active_eof_ops_->destroy(cached_awaitable_);
557 3 : ::operator delete(cached_awaitable_);
558 : }
559 4 : sink_ = std::exchange(other.sink_, nullptr);
560 4 : vt_ = std::exchange(other.vt_, nullptr);
561 4 : cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
562 4 : storage_ = std::exchange(other.storage_, nullptr);
563 4 : active_write_ops_ = std::exchange(other.active_write_ops_, nullptr);
564 4 : active_eof_ops_ = std::exchange(other.active_eof_ops_, nullptr);
565 : }
566 4 : return *this;
567 : }
568 :
569 : template<WriteSink S>
570 : requires (!std::same_as<std::decay_t<S>, any_write_sink>)
571 9 : any_write_sink::any_write_sink(S s)
572 9 : : vt_(&vtable_for_impl<S>::value)
573 : {
574 : struct guard {
575 : any_write_sink* self;
576 : bool committed = false;
577 9 : ~guard() {
578 9 : if(!committed && self->storage_) {
579 : // sink_ is null if the sink move-ctor threw before
580 : // the placement-new assigned it.
581 1 : if(self->sink_)
582 MIS 0 : self->vt_->destroy(self->sink_);
583 HIT 1 : ::operator delete(self->storage_);
584 1 : self->storage_ = nullptr;
585 1 : self->sink_ = nullptr;
586 : }
587 9 : }
588 9 : } g{this};
589 :
590 9 : storage_ = ::operator new(sizeof(S));
591 9 : sink_ = ::new(storage_) S(std::move(s));
592 :
593 : // Preallocate the awaitable storage (sized for max of write/eof)
594 8 : cached_awaitable_ = ::operator new(vt_->awaitable_size);
595 :
596 8 : g.committed = true;
597 9 : }
598 :
599 : template<WriteSink S>
600 121 : any_write_sink::any_write_sink(S* s)
601 121 : : sink_(s)
602 121 : , vt_(&vtable_for_impl<S>::value)
603 : {
604 : // Preallocate the awaitable storage (sized for max of write/eof)
605 121 : cached_awaitable_ = ::operator new(vt_->awaitable_size);
606 121 : }
607 :
608 : inline auto
609 : any_write_sink::write_some_(
610 : std::span<const_buffer const> buffers)
611 : {
612 : struct awaitable
613 : {
614 : any_write_sink* self_;
615 : std::span<const_buffer const> buffers_;
616 :
617 : bool
618 : await_ready() const noexcept
619 : {
620 : return false;
621 : }
622 :
623 : std::coroutine_handle<>
624 : await_suspend(std::coroutine_handle<> h, io_env const* env)
625 : {
626 : self_->active_write_ops_ = self_->vt_->construct_write_some_awaitable(
627 : self_->sink_,
628 : self_->cached_awaitable_,
629 : buffers_);
630 :
631 : if(self_->active_write_ops_->await_ready(self_->cached_awaitable_))
632 : return h;
633 :
634 : return self_->active_write_ops_->await_suspend(
635 : self_->cached_awaitable_, h, env);
636 : }
637 :
638 : io_result<std::size_t>
639 : await_resume()
640 : {
641 : struct guard {
642 : any_write_sink* self;
643 : ~guard() {
644 : self->active_write_ops_->destroy(self->cached_awaitable_);
645 : self->active_write_ops_ = nullptr;
646 : }
647 : } g{self_};
648 : return self_->active_write_ops_->await_resume(
649 : self_->cached_awaitable_);
650 : }
651 : };
652 : return awaitable{this, buffers};
653 : }
654 :
655 : inline auto
656 79 : any_write_sink::write_(
657 : std::span<const_buffer const> buffers)
658 : {
659 : struct awaitable
660 : {
661 : any_write_sink* self_;
662 : std::span<const_buffer const> buffers_;
663 :
664 : bool
665 79 : await_ready() const noexcept
666 : {
667 79 : return false;
668 : }
669 :
670 : std::coroutine_handle<>
671 79 : await_suspend(std::coroutine_handle<> h, io_env const* env)
672 : {
673 158 : self_->active_write_ops_ = self_->vt_->construct_write_awaitable(
674 79 : self_->sink_,
675 79 : self_->cached_awaitable_,
676 : buffers_);
677 :
678 79 : if(self_->active_write_ops_->await_ready(self_->cached_awaitable_))
679 78 : return h;
680 :
681 1 : return self_->active_write_ops_->await_suspend(
682 1 : self_->cached_awaitable_, h, env);
683 : }
684 :
685 : io_result<std::size_t>
686 79 : await_resume()
687 : {
688 : struct guard {
689 : any_write_sink* self;
690 79 : ~guard() {
691 79 : self->active_write_ops_->destroy(self->cached_awaitable_);
692 79 : self->active_write_ops_ = nullptr;
693 79 : }
694 79 : } g{self_};
695 79 : return self_->active_write_ops_->await_resume(
696 137 : self_->cached_awaitable_);
697 79 : }
698 : };
699 79 : return awaitable{this, buffers};
700 : }
701 :
702 : inline auto
703 19 : any_write_sink::write_eof()
704 : {
705 : struct awaitable
706 : {
707 : any_write_sink* self_;
708 :
709 : bool
710 19 : await_ready() const noexcept
711 : {
712 19 : return false;
713 : }
714 :
715 : std::coroutine_handle<>
716 19 : await_suspend(std::coroutine_handle<> h, io_env const* env)
717 : {
718 : // Construct the underlying awaitable into cached storage
719 38 : self_->active_eof_ops_ = self_->vt_->construct_eof_awaitable(
720 19 : self_->sink_,
721 19 : self_->cached_awaitable_);
722 :
723 : // Check if underlying is immediately ready
724 19 : if(self_->active_eof_ops_->await_ready(self_->cached_awaitable_))
725 16 : return h;
726 :
727 : // Forward to underlying awaitable
728 3 : return self_->active_eof_ops_->await_suspend(
729 3 : self_->cached_awaitable_, h, env);
730 : }
731 :
732 : io_result<>
733 17 : await_resume()
734 : {
735 : struct guard {
736 : any_write_sink* self;
737 17 : ~guard() {
738 17 : self->active_eof_ops_->destroy(self->cached_awaitable_);
739 17 : self->active_eof_ops_ = nullptr;
740 17 : }
741 17 : } g{self_};
742 17 : return self_->active_eof_ops_->await_resume(
743 29 : self_->cached_awaitable_);
744 17 : }
745 : };
746 19 : return awaitable{this};
747 : }
748 :
749 : inline auto
750 17 : any_write_sink::write_eof_buffers_(
751 : std::span<const_buffer const> buffers)
752 : {
753 : struct awaitable
754 : {
755 : any_write_sink* self_;
756 : std::span<const_buffer const> buffers_;
757 :
758 : bool
759 17 : await_ready() const noexcept
760 : {
761 17 : return false;
762 : }
763 :
764 : std::coroutine_handle<>
765 17 : await_suspend(std::coroutine_handle<> h, io_env const* env)
766 : {
767 34 : self_->active_write_ops_ =
768 34 : self_->vt_->construct_write_eof_buffers_awaitable(
769 17 : self_->sink_,
770 17 : self_->cached_awaitable_,
771 : buffers_);
772 :
773 17 : if(self_->active_write_ops_->await_ready(self_->cached_awaitable_))
774 16 : return h;
775 :
776 1 : return self_->active_write_ops_->await_suspend(
777 1 : self_->cached_awaitable_, h, env);
778 : }
779 :
780 : io_result<std::size_t>
781 17 : await_resume()
782 : {
783 : struct guard {
784 : any_write_sink* self;
785 17 : ~guard() {
786 17 : self->active_write_ops_->destroy(self->cached_awaitable_);
787 17 : self->active_write_ops_ = nullptr;
788 17 : }
789 17 : } g{self_};
790 17 : return self_->active_write_ops_->await_resume(
791 29 : self_->cached_awaitable_);
792 17 : }
793 : };
794 17 : return awaitable{this, buffers};
795 : }
796 :
797 : template<ConstBufferSequence CB>
798 : auto
799 43 : any_write_sink::write_some(CB buffers)
800 : {
801 : struct awaitable
802 : {
803 : any_write_sink* self_;
804 : detail::const_buffer_array<detail::max_iovec_> ba_;
805 :
806 43 : awaitable(
807 : any_write_sink* self,
808 : CB const& buffers)
809 43 : : self_(self)
810 43 : , ba_(buffers)
811 : {
812 43 : }
813 :
814 : bool
815 43 : await_ready() const noexcept
816 : {
817 43 : return ba_.to_span().empty();
818 : }
819 :
820 : std::coroutine_handle<>
821 41 : await_suspend(std::coroutine_handle<> h, io_env const* env)
822 : {
823 41 : self_->active_write_ops_ = self_->vt_->construct_write_some_awaitable(
824 41 : self_->sink_,
825 41 : self_->cached_awaitable_,
826 41 : ba_.to_span());
827 :
828 41 : if(self_->active_write_ops_->await_ready(self_->cached_awaitable_))
829 38 : return h;
830 :
831 3 : return self_->active_write_ops_->await_suspend(
832 3 : self_->cached_awaitable_, h, env);
833 : }
834 :
835 : io_result<std::size_t>
836 41 : await_resume()
837 : {
838 41 : if(ba_.to_span().empty())
839 2 : return {{}, 0};
840 :
841 : struct guard {
842 : any_write_sink* self;
843 39 : ~guard() {
844 39 : self->active_write_ops_->destroy(self->cached_awaitable_);
845 39 : self->active_write_ops_ = nullptr;
846 39 : }
847 39 : } g{self_};
848 39 : return self_->active_write_ops_->await_resume(
849 39 : self_->cached_awaitable_);
850 39 : }
851 : };
852 43 : return awaitable{this, buffers};
853 : }
854 :
855 : template<ConstBufferSequence CB>
856 : io_task<std::size_t>
857 69 : any_write_sink::write(CB buffers)
858 : {
859 : buffer_param<CB> bp(buffers);
860 : std::size_t total = 0;
861 :
862 : for(;;)
863 : {
864 : auto bufs = bp.data();
865 : if(bufs.empty())
866 : break;
867 :
868 : auto [ec, n] = co_await write_(bufs);
869 : total += n;
870 : if(ec)
871 : co_return {ec, total};
872 : bp.consume(n);
873 : }
874 :
875 : co_return {{}, total};
876 138 : }
877 :
878 : template<ConstBufferSequence CB>
879 : io_task<std::size_t>
880 27 : any_write_sink::write_eof(CB buffers)
881 : {
882 : const_buffer_param<CB> bp(buffers);
883 : std::size_t total = 0;
884 :
885 : for(;;)
886 : {
887 : auto bufs = bp.data();
888 : if(bufs.empty())
889 : {
890 : auto [ec] = co_await write_eof();
891 : co_return {ec, total};
892 : }
893 :
894 : if(! bp.more())
895 : {
896 : // Last window — send atomically with EOF
897 : auto [ec, n] = co_await write_eof_buffers_(bufs);
898 : total += n;
899 : co_return {ec, total};
900 : }
901 :
902 : auto [ec, n] = co_await write_(bufs);
903 : total += n;
904 : if(ec)
905 : co_return {ec, total};
906 : bp.consume(n);
907 : }
908 54 : }
909 :
910 : } // namespace capy
911 : } // namespace boost
912 :
913 : #endif
|