Skip to content

Commit

Permalink
fix stop_on_end and remove done mode
Browse files Browse the repository at this point in the history
  • Loading branch information
nhz2 committed Mar 8, 2024
1 parent f2916c9 commit a64b01b
Show file tree
Hide file tree
Showing 9 changed files with 100 additions and 119 deletions.
3 changes: 0 additions & 3 deletions docs/src/assets/modes.dot
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,16 @@ digraph modes {
"read" -> "panic";

"write" -> "write";
"write" -> "done";
"write" -> "close";
"write" -> "panic";

"stop" -> "close";
"done" -> "close";

"start" [ shape = point ];
"idle" [ shape = circle ];
"read" [ shape = circle ];
"write" [ shape = circle ];
"stop" [ shape = circle; style=bold; ];
"done" [ shape = circle; style=bold; ];
"close" [ shape = circle; style=bold; ];
"panic" [ shape = circle; style=bold; ];
}
112 changes: 47 additions & 65 deletions docs/src/assets/modes.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
1 change: 0 additions & 1 deletion docs/src/devnotes.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ The `mode` field may be one of the following value:
- `:read` : being ready to read data, data may be buffered
- `:write`: being ready to write data, data may be buffered
- `:stop` : transcoding is stopped after read, data may be buffered
- `:done` : transcoding is stopped after write, data may be buffered
- `:close`: closed, no buffered data
- `:panic`: an exception has been thrown in codec, data may be buffered but we
cannot do anything
Expand Down
21 changes: 0 additions & 21 deletions ext/TestExt.jl
Original file line number Diff line number Diff line change
Expand Up @@ -97,25 +97,4 @@ function TranscodingStreams.test_chunked_read(Encoder, Decoder)
finalize(encoder)
end

function TranscodingStreams.test_chunked_write(Encoder, Decoder)
seed!(TEST_RANDOM_SEED)
alpha = b"空即是色"
encoder = Encoder()
initialize(encoder)
for _ in 1:500
chunks = [rand(alpha, rand(0:100)) for _ in 1:2]
data = map(x->transcode(encoder, x), chunks)
buffer = IOBuffer()
stream = TranscodingStream(Decoder(), buffer, stop_on_end=true)
write(stream, vcat(data...))
flush(stream)
ok = true
ok &= hash(take!(buffer)) == hash(chunks[1])
ok &= buffersize(stream.state.buffer1) == length(data[2])
Test.@test ok
close(stream)
end
finalize(encoder)
end

end # module
5 changes: 4 additions & 1 deletion src/noop.jl
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ end

function TranscodingStream(codec::Noop, stream::IO;
bufsize::Integer=DEFAULT_BUFFER_SIZE,
stop_on_end::Bool=false,
sharedbuf::Bool=(stream isa TranscodingStream))
checkbufsize(bufsize)
checksharedbuf(sharedbuf, stream)
Expand All @@ -38,7 +39,9 @@ function TranscodingStream(codec::Noop, stream::IO;
else
buffer = Buffer(bufsize)
end
return TranscodingStream(codec, stream, State(buffer, buffer))
state = State(buffer, buffer)
state.stop_on_end = stop_on_end
return TranscodingStream(codec, stream, state)
end

"""
Expand Down
4 changes: 2 additions & 2 deletions src/state.jl
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@ See Developer's notes for details.
"""
mutable struct State
# current stream mode
mode::Symbol # {:idle, :read, :write, :stop, :done, :close, :panic}
mode::Symbol # {:idle, :read, :write, :stop, :close, :panic}

# return code of the last method call
code::Symbol # {:ok, :end, :error}

# flag to go :stop or :done on :end
# flag to go :stop on :end while reading
stop_on_end::Bool

# exception thrown while data processing
Expand Down
29 changes: 10 additions & 19 deletions src/stream.jl
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,10 @@ Arguments
The initial buffer size (the default size is 16KiB). The buffer may be
extended whenever `codec` requests so.
- `stop_on_end`:
The flag to stop transcoding on `:end` return code from `codec`. The
The flag to stop reading on `:end` return code from `codec`. The
transcoded data are readable even after stopping transcoding process. With
this flag on, `stream` is not closed when the wrapper stream is closed with
`close`. Note that some extra data may be read from `stream` into an
`close`. Note that if reading some extra data may be read from `stream` into an
internal buffer, and thus `stream` must be a `TranscodingStream` object and
`sharedbuf` must be `true` to reuse `stream`.
- `sharedbuf`:
Expand Down Expand Up @@ -184,11 +184,10 @@ end

function Base.close(stream::TranscodingStream)
mode = stream.state.mode
stopped = mode === :stop || mode === :done
if mode != :panic
changemode!(stream, :close)
end
if !stopped
if !stream.state.stop_on_end
close(stream.stream)
end
return nothing
Expand All @@ -214,8 +213,6 @@ end
continue
elseif mode == :write
return eof(stream.stream)
elseif mode == :done
return eof(stream.stream)
elseif mode == :close
return true
elseif mode == :stop
Expand Down Expand Up @@ -517,6 +514,9 @@ const TOKEN_END = EndToken()

function Base.write(stream::TranscodingStream, ::EndToken)
changemode!(stream, :write)
if stream.state.code == :end
callstartproc(stream, :write)
end
flushbufferall(stream)
flushuntilend(stream)
return 0
Expand Down Expand Up @@ -622,7 +622,7 @@ function flushbuffer(stream::TranscodingStream, all::Bool=false)
buffer1 = stream.buffer1
buffer2 = stream.buffer2
nflushed::Int = 0
while (all ? buffersize(buffer1) != 0 : makemargin!(buffer1, 0) == 0) && state.mode != :done
while (all ? buffersize(buffer1) != 0 : makemargin!(buffer1, 0) == 0)
if state.code == :end
callstartproc(stream, :write)
end
Expand Down Expand Up @@ -689,8 +689,6 @@ function callprocess(stream::TranscodingStream, inbuf::Buffer, outbuf::Buffer)
elseif state.code == :end && state.stop_on_end
if stream.state.mode == :read
changemode!(stream, :stop)
else
changemode!(stream, :done)
end
end
return Δin, Δout
Expand Down Expand Up @@ -775,11 +773,9 @@ function changemode!(stream::TranscodingStream, newmode::Symbol)
return
end
elseif mode == :write
if newmode == :close || newmode == :done
if newmode == :close
flushbufferall(stream)
flushuntilend(stream)
end
if newmode == :close
flushbufferall(stream)
flushuntilend(stream)
state.mode = newmode
finalize_codec(stream.codec, state.error)
return
Expand All @@ -789,11 +785,6 @@ function changemode!(stream::TranscodingStream, newmode::Symbol)
state.mode = newmode
return
end
elseif mode == :done
if newmode == :close
state.mode = newmode
return
end
elseif mode == :panic
throw_panic_error()
end
Expand Down
Loading

0 comments on commit a64b01b

Please sign in to comment.