Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix stop_on_end = true closing underlying stream #178

Merged
merged 3 commits into from
Mar 16, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions docs/src/assets/modes.dot
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,16 @@ digraph modes {
"read" -> "panic";

"write" -> "write";
"write" -> "done";
"write" -> "close";
"write" -> "panic";

"stop" -> "close";
"done" -> "close";

"start" [ shape = point ];
"idle" [ shape = circle ];
"read" [ shape = circle ];
"write" [ shape = circle ];
"stop" [ shape = circle; style=bold; ];
"done" [ shape = circle; style=bold; ];
"close" [ shape = circle; style=bold; ];
"panic" [ shape = circle; style=bold; ];
}
112 changes: 47 additions & 65 deletions docs/src/assets/modes.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
1 change: 0 additions & 1 deletion docs/src/devnotes.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ The `mode` field may be one of the following value:
- `:read` : being ready to read data, data may be buffered
- `:write`: being ready to write data, data may be buffered
- `:stop` : transcoding is stopped after read, data may be buffered
- `:done` : transcoding is stopped after write, data may be buffered
- `:close`: closed, no buffered data
- `:panic`: an exception has been thrown in codec, data may be buffered but we
cannot do anything
Expand Down
7 changes: 3 additions & 4 deletions ext/TestExt.jl
Original file line number Diff line number Diff line change
Expand Up @@ -108,12 +108,11 @@ function TranscodingStreams.test_chunked_write(Encoder, Decoder)
buffer = IOBuffer()
stream = TranscodingStream(Decoder(), buffer, stop_on_end=true)
write(stream, vcat(data...))
flush(stream)
close(stream)
ok = true
ok &= hash(take!(buffer)) == hash(chunks[1])
ok &= buffersize(stream.state.buffer1) == length(data[2])
ok &= hash(take!(buffer)) == hash(vcat(chunks...))
ok &= buffersize(stream.state.buffer1) == 0
Test.@test ok
close(stream)
end
finalize(encoder)
end
Expand Down
5 changes: 4 additions & 1 deletion src/noop.jl
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ end

function TranscodingStream(codec::Noop, stream::IO;
bufsize::Integer=DEFAULT_BUFFER_SIZE,
stop_on_end::Bool=false,
sharedbuf::Bool=(stream isa TranscodingStream))
checkbufsize(bufsize)
checksharedbuf(sharedbuf, stream)
Expand All @@ -38,7 +39,9 @@ function TranscodingStream(codec::Noop, stream::IO;
else
buffer = Buffer(bufsize)
end
return TranscodingStream(codec, stream, State(buffer, buffer))
state = State(buffer, buffer)
state.stop_on_end = stop_on_end
return TranscodingStream(codec, stream, state)
end

"""
Expand Down
4 changes: 2 additions & 2 deletions src/state.jl
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@ See Developer's notes for details.
"""
mutable struct State
# current stream mode
mode::Symbol # {:idle, :read, :write, :stop, :done, :close, :panic}
mode::Symbol # {:idle, :read, :write, :stop, :close, :panic}

# return code of the last method call
code::Symbol # {:ok, :end, :error}

# flag to go :stop or :done on :end
# flag to go :stop on :end while reading
stop_on_end::Bool

# exception thrown while data processing
Expand Down
29 changes: 10 additions & 19 deletions src/stream.jl
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,10 @@ Arguments
The initial buffer size (the default size is 16KiB). The buffer may be
extended whenever `codec` requests so.
- `stop_on_end`:
The flag to stop transcoding on `:end` return code from `codec`. The
The flag to stop reading on `:end` return code from `codec`. The
transcoded data are readable even after stopping transcoding process. With
this flag on, `stream` is not closed when the wrapper stream is closed with
`close`. Note that some extra data may be read from `stream` into an
`close`. Note that if reading some extra data may be read from `stream` into an
internal buffer, and thus `stream` must be a `TranscodingStream` object and
`sharedbuf` must be `true` to reuse `stream`.
- `sharedbuf`:
Expand Down Expand Up @@ -184,11 +184,10 @@ end

function Base.close(stream::TranscodingStream)
mode = stream.state.mode
stopped = mode === :stop || mode === :done
if mode != :panic
changemode!(stream, :close)
end
if !stopped
if !stream.state.stop_on_end
close(stream.stream)
end
return nothing
Expand All @@ -214,8 +213,6 @@ end
continue
elseif mode == :write
return eof(stream.stream)
elseif mode == :done
return eof(stream.stream)
elseif mode == :close
return true
elseif mode == :stop
Expand Down Expand Up @@ -517,6 +514,9 @@ const TOKEN_END = EndToken()

function Base.write(stream::TranscodingStream, ::EndToken)
changemode!(stream, :write)
if stream.state.code == :end
callstartproc(stream, :write)
end
flushbufferall(stream)
flushuntilend(stream)
return 0
Expand Down Expand Up @@ -622,7 +622,7 @@ function flushbuffer(stream::TranscodingStream, all::Bool=false)
buffer1 = stream.buffer1
buffer2 = stream.buffer2
nflushed::Int = 0
while (all ? buffersize(buffer1) != 0 : makemargin!(buffer1, 0) == 0) && state.mode != :done
while (all ? buffersize(buffer1) != 0 : makemargin!(buffer1, 0) == 0)
if state.code == :end
callstartproc(stream, :write)
end
Expand Down Expand Up @@ -689,8 +689,6 @@ function callprocess(stream::TranscodingStream, inbuf::Buffer, outbuf::Buffer)
elseif state.code == :end && state.stop_on_end
if stream.state.mode == :read
changemode!(stream, :stop)
else
changemode!(stream, :done)
end
end
return Δin, Δout
Expand Down Expand Up @@ -775,11 +773,9 @@ function changemode!(stream::TranscodingStream, newmode::Symbol)
return
end
elseif mode == :write
if newmode == :close || newmode == :done
if newmode == :close
flushbufferall(stream)
flushuntilend(stream)
end
if newmode == :close
flushbufferall(stream)
flushuntilend(stream)
state.mode = newmode
finalize_codec(stream.codec, state.error)
return
Expand All @@ -789,11 +785,6 @@ function changemode!(stream::TranscodingStream, newmode::Symbol)
state.mode = newmode
return
end
elseif mode == :done
if newmode == :close
state.mode = newmode
return
end
elseif mode == :panic
throw_panic_error()
end
Expand Down
Loading
Loading