diff --git a/web_transport/sdk/impl/web_transport_stream_impl.cc b/web_transport/sdk/impl/web_transport_stream_impl.cc index 8117a45..83c7208 100644 --- a/web_transport/sdk/impl/web_transport_stream_impl.cc +++ b/web_transport/sdk/impl/web_transport_stream_impl.cc @@ -120,22 +120,85 @@ size_t WebTransportStreamImpl::Read(uint8_t* data, size_t length) { } size_t WebTransportStreamImpl::ReadableBytes() const { - return stream_->ReadableBytes(); + if (io_runner_->BelongsToCurrentThread()) { + return stream_->ReadableBytes(); + } + size_t result; + base::WaitableEvent done(base::WaitableEvent::ResetPolicy::AUTOMATIC, + base::WaitableEvent::InitialState::NOT_SIGNALED); + io_runner_->PostTask( + FROM_HERE, + base::BindOnce( + [](WebTransportStreamImpl const* stream, size_t& result, + base::WaitableEvent* event) { + result = stream->stream_->ReadableBytes(); + event->Signal(); + }, + base::Unretained(this), std::ref(result), base::Unretained(&done))); + done.Wait(); + return result; } void WebTransportStreamImpl::Close() { - // TODO: Post to IO runner. - if (!stream_->SendFin()) { - LOG(ERROR) << "Failed to send FIN."; + if (io_runner_->BelongsToCurrentThread()) { + if (!stream_->SendFin()) { + LOG(ERROR) << "Failed to send FIN."; + } + return; } + base::WaitableEvent done(base::WaitableEvent::ResetPolicy::AUTOMATIC, + base::WaitableEvent::InitialState::NOT_SIGNALED); + io_runner_->PostTask( + FROM_HERE, + base::BindOnce( + [](WebTransportStreamImpl* stream, base::WaitableEvent* event) { + if (!stream->stream_->SendFin()) { + LOG(ERROR) << "Failed to send FIN."; + } + event->Signal(); + }, + base::Unretained(this), base::Unretained(&done))); + done.Wait(); } uint64_t WebTransportStreamImpl::BufferedDataBytes() const { - return quic_stream_->BufferedDataBytes(); + if (io_runner_->BelongsToCurrentThread()) { + return quic_stream_->BufferedDataBytes(); + } + uint64_t result; + base::WaitableEvent done(base::WaitableEvent::ResetPolicy::AUTOMATIC, + base::WaitableEvent::InitialState::NOT_SIGNALED); + io_runner_->PostTask( + FROM_HERE, + base::BindOnce( + [](WebTransportStreamImpl const* stream, uint64_t& result, + base::WaitableEvent* event) { + result = stream->quic_stream_->BufferedDataBytes(); + event->Signal(); + }, + base::Unretained(this), std::ref(result), base::Unretained(&done))); + done.Wait(); + return result; } bool WebTransportStreamImpl::CanWrite() const { - return stream_->CanWrite(); + if (io_runner_->BelongsToCurrentThread()) { + return stream_->CanWrite(); + } + bool result; + base::WaitableEvent done(base::WaitableEvent::ResetPolicy::AUTOMATIC, + base::WaitableEvent::InitialState::NOT_SIGNALED); + io_runner_->PostTask( + FROM_HERE, + base::BindOnce( + [](WebTransportStreamImpl const* stream, bool& result, + base::WaitableEvent* event) { + result = stream->stream_->CanWrite(); + event->Signal(); + }, + base::Unretained(this), std::ref(result), base::Unretained(&done))); + done.Wait(); + return result; } void WebTransportStreamImpl::OnCanRead() {