Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Streaming get with prefetch and Julia IO Stream API #30

Merged
merged 14 commits into from
Feb 28, 2024

Conversation

andrebsguedes
Copy link
Member

No description provided.


Opaque IO stream of object data.

It is necessary to `finish!` the stream if it is not run to completion.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is not necessary to call finish! if it is run to completion? I don't see readbytes! calling finish or destroying the stream, is that correct? Also, is it more idiomatic to say that close should be called?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch! On a refactor _unsafe_read lost the destroy_read_stream calls. All other methods rely on it to properly destroy the stream if EOF is reached. Will change to the docs to Base.close.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And to clarify, the docs statement is correct, we should only need to close if we don't run it to completion

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This makes me remember we discussed adding a finalizer to ensure proper reclamation, let's see if we will have the time for them

@andrebsguedes andrebsguedes merged commit 5375436 into main Feb 28, 2024
7 of 8 checks passed
Copy link
Member

@Drvi Drvi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for the late review! Left a couple of comments / suggestions which will hopefully be useful. Thanks for working on this!

bytes_read + bytes_to_read > length(dest) && resize!(dest, bytes_read + bytes_to_read)
bytes_read += GC.@preserve dest _unsafe_read(io, pointer(dest, bytes_read+1), bytes_to_read)
end
resize!(dest, bytes_read)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should guard against resize!(dest, bytes_read) shrinking the dest array, as that is the expected behavior of readbytes!.

  readbytes!(stream::IO, b::AbstractVector{UInt8}, nb=length(b))

  Read at most nb bytes from stream into b, returning the number of bytes read. The size of b will be increased if needed (i.e. if nb is greater than length(b) and enough
  bytes could be read), but it will never be decreased.

Comment on lines +793 to +797
while !eof(io)
bytes_to_read = 128 * 1024
bytes_read + bytes_to_read > length(dest) && resize!(dest, bytes_read + bytes_to_read)
bytes_read += GC.@preserve dest _unsafe_read(io, pointer(dest, bytes_read+1), bytes_to_read)
end
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we utilize the io.object_size instead of sniffing each 128KiB?

resize!(dest, bytes_read)
return bytes_read
else
bytes_to_read = n == typemax(Int) ? 64 * 1024 : Int(n)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this branch n != typemax(Int), also I think this special handling of typemax(Int) should be documented

end

response_ref = Ref(ReadResponseFFI())
cond = Base.AsyncCondition()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we store the cond in the stream object so we don't have to allocate it on every _unsafe_read, eof, etc? Similarly for WriteStream.

Comment on lines +822 to +826
buf = zeros(UInt8, 1)
n = _unsafe_read(io, pointer(buf), 1)
n < 1 && throw(EOFError())
@inbounds b = buf[1]
return b
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can avoid the unfortunate allocation of the array

    eof(io) && throw(EOFError())
    ref = Ref{UInt8}()
    n = GC.@preserve ref _unsafe_read(io, Base.unsafe_convert(Ptr{UInt8}, ref), 1)
    n < 1 && throw(EOFError())
    return ref[]

bytes_read = readbytes!(from, buf, 64 * 1024)
bytes_written = 0
while bytes_written < bytes_read
bytes_written += write(to, buf[bytes_written+1:bytes_read])
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

buf[bytes_written+1:bytes_read]

this creates a copy of the slice, consider using a @view or unsafe_write and advancing the pointer manually.

# Throws
- `GetException`: If the request fails for any reason.
"""
function get_object_stream(path::String, conf::AbstractConfig; size_hint::Int=0, decompress::String="")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we call this ReadStream instead of get_object_stream?

# Throws
- `PutException`: If the request fails for any reason.
"""
function put_object_stream(path::String, conf::AbstractConfig; compress::String="")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we call this WriteStream instead of put_object_stream?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants