Skip to content
This repository has been archived by the owner on Oct 25, 2024. It is now read-only.

Commit

Permalink
Run some methods on IO thread. (#40)
Browse files Browse the repository at this point in the history
All calls to stream_ and quic_stream_ are expected to run on IO thread.
  • Loading branch information
jianjunz authored Sep 30, 2021
1 parent 995f58b commit 5867270
Showing 1 changed file with 69 additions and 6 deletions.
75 changes: 69 additions & 6 deletions web_transport/sdk/impl/web_transport_stream_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -120,22 +120,85 @@ size_t WebTransportStreamImpl::Read(uint8_t* data, size_t length) {
}

size_t WebTransportStreamImpl::ReadableBytes() const {
return stream_->ReadableBytes();
if (io_runner_->BelongsToCurrentThread()) {
return stream_->ReadableBytes();
}
size_t result;
base::WaitableEvent done(base::WaitableEvent::ResetPolicy::AUTOMATIC,
base::WaitableEvent::InitialState::NOT_SIGNALED);
io_runner_->PostTask(
FROM_HERE,
base::BindOnce(
[](WebTransportStreamImpl const* stream, size_t& result,
base::WaitableEvent* event) {
result = stream->stream_->ReadableBytes();
event->Signal();
},
base::Unretained(this), std::ref(result), base::Unretained(&done)));
done.Wait();
return result;
}

void WebTransportStreamImpl::Close() {
// TODO: Post to IO runner.
if (!stream_->SendFin()) {
LOG(ERROR) << "Failed to send FIN.";
if (io_runner_->BelongsToCurrentThread()) {
if (!stream_->SendFin()) {
LOG(ERROR) << "Failed to send FIN.";
}
return;
}
base::WaitableEvent done(base::WaitableEvent::ResetPolicy::AUTOMATIC,
base::WaitableEvent::InitialState::NOT_SIGNALED);
io_runner_->PostTask(
FROM_HERE,
base::BindOnce(
[](WebTransportStreamImpl* stream, base::WaitableEvent* event) {
if (!stream->stream_->SendFin()) {
LOG(ERROR) << "Failed to send FIN.";
}
event->Signal();
},
base::Unretained(this), base::Unretained(&done)));
done.Wait();
}

uint64_t WebTransportStreamImpl::BufferedDataBytes() const {
return quic_stream_->BufferedDataBytes();
if (io_runner_->BelongsToCurrentThread()) {
return quic_stream_->BufferedDataBytes();
}
uint64_t result;
base::WaitableEvent done(base::WaitableEvent::ResetPolicy::AUTOMATIC,
base::WaitableEvent::InitialState::NOT_SIGNALED);
io_runner_->PostTask(
FROM_HERE,
base::BindOnce(
[](WebTransportStreamImpl const* stream, uint64_t& result,
base::WaitableEvent* event) {
result = stream->quic_stream_->BufferedDataBytes();
event->Signal();
},
base::Unretained(this), std::ref(result), base::Unretained(&done)));
done.Wait();
return result;
}

bool WebTransportStreamImpl::CanWrite() const {
return stream_->CanWrite();
if (io_runner_->BelongsToCurrentThread()) {
return stream_->CanWrite();
}
bool result;
base::WaitableEvent done(base::WaitableEvent::ResetPolicy::AUTOMATIC,
base::WaitableEvent::InitialState::NOT_SIGNALED);
io_runner_->PostTask(
FROM_HERE,
base::BindOnce(
[](WebTransportStreamImpl const* stream, bool& result,
base::WaitableEvent* event) {
result = stream->stream_->CanWrite();
event->Signal();
},
base::Unretained(this), std::ref(result), base::Unretained(&done)));
done.Wait();
return result;
}

void WebTransportStreamImpl::OnCanRead() {
Expand Down

0 comments on commit 5867270

Please sign in to comment.