diff --git a/.github/workflows/Invalidations.yml b/.github/workflows/Invalidations.yml deleted file mode 100644 index decf2561..00000000 --- a/.github/workflows/Invalidations.yml +++ /dev/null @@ -1,38 +0,0 @@ -name: Invalidations - -on: - pull_request: - branches: [main, master, dev] - -concurrency: - # Skip intermediate builds: always. - # Cancel intermediate builds: always. - group: ${{ github.workflow }}-${{ github.ref }} - cancel-in-progress: true - -jobs: - evaluate: - runs-on: ubuntu-latest - steps: - - uses: julia-actions/setup-julia@v2 - with: - version: '1' - - uses: actions/checkout@v4 - - uses: julia-actions/julia-buildpkg@v1 - - uses: julia-actions/julia-invalidations@v1 - id: invs_pr - - - uses: actions/checkout@v4 - with: - ref: ${{ github.event.repository.default_branch }} - - uses: julia-actions/julia-buildpkg@v1 - - uses: julia-actions/julia-invalidations@v1 - id: invs_default - - - name: Report invalidation counts - run: | - echo "Invalidations on default branch: ${{ steps.invs_default.outputs.total }} (${{ steps.invs_default.outputs.deps }} via deps)" >> $GITHUB_STEP_SUMMARY - echo "This branch: ${{ steps.invs_pr.outputs.total }} (${{ steps.invs_pr.outputs.deps }} via deps)" >> $GITHUB_STEP_SUMMARY - - name: Check if the PR does increase number of invalidations - if: steps.invs_pr.outputs.total > steps.invs_default.outputs.total - run: exit 1 diff --git a/CHANGELOG.md b/CHANGELOG.md index 9784fe6e..0931c895 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,6 @@ +## 0.5.5 + - Experimental support for writing to and reading from `IO` objects e.g. `jldopen(io, "r")` + ## 0.5.4 - Important correctness fix when storing very many equally sized objects that may get GC'ed while storing is in progress! (#603) diff --git a/Project.toml b/Project.toml index caaac3c2..e2669ac2 100644 --- a/Project.toml +++ b/Project.toml @@ -1,6 +1,6 @@ name = "JLD2" uuid = "033835bb-8acc-5ee8-8aae-3f567f8a3819" -version = "0.5.4" +version = "0.5.5" [deps] FileIO = "5789e2e9-d7fb-5bc7-8068-2c6fae9b9549" diff --git a/src/JLD2.jl b/src/JLD2.jl index 079d00c2..cc868753 100644 --- a/src/JLD2.jl +++ b/src/JLD2.jl @@ -7,38 +7,16 @@ using FileIO: load, save export load, save using Requires: @require using PrecompileTools: @setup_workload, @compile_workload +export jldopen, @load, @save, save_object, load_object, jldsave -export jldopen, @load, @save, save_object, load_object, printtoc -export jldsave - - -# Due to custom overrides we do not use Base functions directly -# but define our own to avoid type piracy -""" - jlwrite(io, x) - -Wrapper around `Base.write(io, x)`. Defined separately to avoid type piracy. -""" -jlwrite(io, x) = Base.write(io, x) -""" - jlread(io, x) +include("types.jl") -Wrapper around `Base.read(io, x)`. Defined separately to avoid type piracy. 
-""" -jlread(io, x) = Base.read(io, x) -jlread(io::IO, ::Type{T}, n::Integer) where {T} = T[jlread(io, T) for _=1:n] -# Use internal convert function (for pointer conversion) to avoid invalidations -pconvert(T, x) = Base.convert(T, x) -jlsizeof(x) = Base.sizeof(x) -jlunsafe_store!(p, x) = Base.unsafe_store!(p, x) -jlunsafe_load(p) = Base.unsafe_load(p) -include("mmapio.jl") -include("bufferedio.jl") include("macros_utils.jl") -include("types.jl") +include("io/mmapio.jl") +include("io/bufferedio.jl") include("julia_compat.jl") include("file_header.jl") include("Lookup3.jl") @@ -181,9 +159,6 @@ end FallbackType(::Type{MmapIO}) = IOStream FallbackType(::Type{IOStream}) = nothing -# The delimiter is excluded by default -read_bytestring(io::Union{IOStream, IOBuffer}) = String(readuntil(io, 0x00)) - const OPEN_FILES = Dict{String,WeakRef}() const OPEN_FILES_LOCK = ReentrantLock() function jldopen(fname::AbstractString, wr::Bool, create::Bool, truncate::Bool, iotype::T=DEFAULT_IOTYPE; @@ -196,7 +171,6 @@ function jldopen(fname::AbstractString, wr::Bool, create::Bool, truncate::Bool, ) where T<:Union{Type{IOStream},Type{MmapIO}} mmaparrays && @warn "mmaparrays keyword is currently ignored" maxlog=1 verify_compressor(compress) - exists = ispath(fname) # Can only open multiple in parallel if mode is "r" if parallel_read && (wr, create, truncate) != (false, false, false) @@ -206,37 +180,34 @@ function jldopen(fname::AbstractString, wr::Bool, create::Bool, truncate::Bool, lock(OPEN_FILES_LOCK) f = try + exists = ispath(fname) if exists rname = realpath(fname) # catch existing file system entities that are not regular files !isfile(rname) && throw(ArgumentError("not a regular file: $fname")) + f = get(OPEN_FILES, rname, (;value=nothing)).value # If in serial, return existing handle. In parallel always generate a new handle - if haskey(OPEN_FILES, rname) - ref = OPEN_FILES[rname] - f = ref.value - if !isnothing(f) - if parallel_read - f.writable && throw(ArgumentError("Tried to open file in a parallel context but it is open in write-mode elsewhere in a serial context.")) - else - if truncate - throw(ArgumentError("attempted to truncate a file that was already open")) - elseif !isa(f, JLDFile{iotype}) - throw(ArgumentError("attempted to open file with $iotype backend, but already open with a different backend")) - elseif f.writable != wr - current = wr ? "read/write" : "read-only" - previous = f.writable ? "read/write" : "read-only" - throw(ArgumentError("attempted to open file $(current), but file was already open $(previous)")) - elseif f.compress != compress - throw(ArgumentError("attempted to open file with compress=$(compress), but file was already open with compress=$(f.compress)")) - elseif f.mmaparrays != mmaparrays - throw(ArgumentError("attempted to open file with mmaparrays=$(mmaparrays), but file was already open with mmaparrays=$(f.mmaparrays)")) - end - - f = f::JLDFile{iotype} - f.n_times_opened += 1 - return f + if !isnothing(f) + if parallel_read + f.writable && throw(ArgumentError("Tried to open file in a parallel context but it is open in write-mode elsewhere in a serial context.")) + else + if truncate + throw(ArgumentError("attempted to truncate a file that was already open")) + elseif !isa(f, JLDFile{iotype}) + throw(ArgumentError("attempted to open file with $iotype backend, but already open with a different backend")) + elseif f.writable != wr + current = wr ? "read/write" : "read-only" + previous = f.writable ? 
"read/write" : "read-only" + throw(ArgumentError("attempted to open file $(current), but file was already open $(previous)")) + elseif f.compress != compress + throw(ArgumentError("attempted to open file with compress=$(compress), but file was already open with compress=$(f.compress)")) + elseif f.mmaparrays != mmaparrays + throw(ArgumentError("attempted to open file with mmaparrays=$(mmaparrays), but file was already open with mmaparrays=$(f.mmaparrays)")) end + f = f::JLDFile{iotype} + f.n_times_opened += 1 + return f end end end @@ -246,78 +217,57 @@ function jldopen(fname::AbstractString, wr::Bool, create::Bool, truncate::Bool, rname = realpath(fname) f = JLDFile(io, rname, wr, created, plain, compress, mmaparrays) - if !parallel_read - OPEN_FILES[rname] = WeakRef(f) - end + !parallel_read && (OPEN_FILES[rname] = WeakRef(f)) f - catch e - rethrow(e) finally unlock(OPEN_FILES_LOCK) end - if f.written - f.base_address = 512 - if f isa JLDFile{MmapIO} - f.root_group = Group{JLDFile{MmapIO}}(f) - f.types_group = Group{JLDFile{MmapIO}}(f) - elseif f isa JLDFile{IOStream} - f.root_group = Group{JLDFile{IOStream}}(f) - f.types_group = Group{JLDFile{IOStream}}(f) - end - else - try - load_file_metadata!(f) - catch e - close(f) - throw(e) - end + try + initialize_fileobject!(f) + catch e + close(f) + throw(e) end merge!(f.typemap, typemap) return f end -function load_file_metadata!(f) +function initialize_fileobject!(f::JLDFile) + if f.written + f.base_address = 512 + f.root_group = Group{typeof(f)}(f) + f.types_group = Group{typeof(f)}(f) + return + end superblock = find_superblock(f) f.end_of_data = superblock.end_of_file_address f.base_address = superblock.base_address f.root_group_offset = superblock.root_group_object_header_address if superblock.version >= 2 verify_file_header(f) - else - @warn "This file was not written with JLD2. Some things may not work." - if f.writable - close(f) - throw(UnsupportedVersionException("This file can not be edited by JLD2. Please open in read-only mode.")) - end + elseif f.writable + close(f) + throw(UnsupportedVersionException("This file can not be edited by JLD2. Please open in read-only mode.")) end - #try - f.root_group = load_group(f, f.root_group_offset) - - if haskey(f.root_group.written_links, "_types") - types_group_offset = f.root_group.written_links["_types"]::RelOffset - f.types_group = f.loaded_groups[types_group_offset] = load_group(f, types_group_offset) - i = 0 - for (offset::RelOffset) in values(f.types_group.written_links) - f.datatype_locations[offset] = CommittedDatatype(offset, i += 1) - end - resize!(f.datatypes, length(f.datatype_locations)) - else - f.types_group = Group{typeof(f)}(f) - end - # catch e - # show(e) - # f.types_group = Group{typeof(f)}(f) + f.root_group = load_group(f, f.root_group_offset) - # end - nothing + types_offset = get(f.root_group.written_links, "_types", UNDEFINED_ADDRESS) + if types_offset != UNDEFINED_ADDRESS + f.types_group = f.loaded_groups[types_offset] = load_group(f, types_offset) + for (i, offset::RelOffset) in enumerate(values(f.types_group.written_links)) + f.datatype_locations[offset] = CommittedDatatype(offset, i) + end + resize!(f.datatypes, length(f.datatype_locations)) + else + f.types_group = Group{typeof(f)}(f) + end end """ - jldopen(fname::AbstractString, mode::AbstractString; - iotype=MmapIO, compress=false, typemap=Dict()) + jldopen(file, mode::AbstractString; iotype=MmapIO, compress=false, typemap=Dict()) -Opens a JLD2 file at path `fname`. +Opens a JLD2 file at path `file`. 
Alternatively, `file` may be a suitable IO object. Options for `mode`: - `"r"`: Open for reading only, failing if no file exists @@ -326,13 +276,43 @@ Options for `mode`: - `"a"`/`"a+"`: Open for reading and writing, creating a new file if none exists, but preserving the existing file if one is present """ -function jldopen(fname::AbstractString, mode::AbstractString="r"; iotype=DEFAULT_IOTYPE, kwargs...) +function jldopen(fname::Union{AbstractString, IO}, mode::AbstractString="r"; iotype=DEFAULT_IOTYPE, kwargs...) (wr, create, truncate) = mode == "r" ? (false, false, false) : mode == "r+" ? (true, false, false) : mode == "a" || mode == "a+" ? (true, true, false) : mode == "w" || mode == "w+" ? (true, true, true) : throw(ArgumentError("invalid open mode: $mode")) - jldopen(fname, wr, create, truncate, iotype; kwargs...) + if fname isa AbstractString + jldopen(fname, wr, create, truncate, iotype; kwargs...) + else + jldopen(fname, wr, create, truncate; kwargs...) + end +end + + +function jldopen(io::IO, writable::Bool, create::Bool, truncate::Bool; + plain::Bool=false, + compress=false, + typemap::Dict{String}=Dict{String,Any}(), + ) + verify_compressor(compress) + # Figure out what kind of IO object this is. + # For now assume it exposes `readable` and `seekable` fields (like an `IOBuffer`). + !io.readable && throw("IO object is not readable") + if io.seekable && writable && iswritable(io) + # Here we could have a more lightweight wrapper + # that just ensures the API is defined + created = truncate + io = RWBuffer(io) + f = JLDFile(io, "RWBuffer", writable, created, plain, compress, false) + elseif (false == writable == create == truncate) + # We're trying to read, so let's hope `io` implements `read` and `bytesavailable` + io = ReadOnlyBuffer(io) + f = JLDFile(io, "ReadOnlyBuffer", false, false, plain, compress, false) + end + initialize_fileobject!(f) + merge!(f.typemap, typemap) + return f end """ @@ -365,11 +345,8 @@ function prewrite(f::JLDFile) f.written = true end -Base.read(f::JLDFile, name::AbstractString) = f.root_group[name] -#Base.write(f::JLDFile, name::AbstractString, obj, wsession::JLDWriteSession=JLDWriteSession()) = -# write(f.root_group, name, obj, wsession) - -Base.getindex(f::JLDFile, name::AbstractString) = f.root_group[name] +Base.read(f::JLDFile, name::AbstractString) = Base.inferencebarrier(f.root_group[name]) +Base.getindex(f::JLDFile, name::AbstractString) = Base.inferencebarrier(f.root_group[name]) Base.setindex!(f::JLDFile, obj, name::AbstractString) = (f.root_group[name] = obj; f) Base.haskey(f::JLDFile, name::AbstractString) = haskey(f.root_group, name) Base.isempty(f::JLDFile) = isempty(f.root_group) @@ -451,6 +428,10 @@ function jld_finalizer(f::JLDFile{IOStream}) close(f) end +function jld_finalizer(f::JLDFile) + f.n_times_opened == 0 && return + close(f) +end # Display functions # simple one-line display (without trailing line break) @@ -504,7 +485,8 @@ include("data/custom_serialization.jl") include("data/writing_datatypes.jl") include("data/reconstructing_datatypes.jl") -include("dataio.jl") +include("io/dataio.jl") +include("io/io_wrappers.jl") include("loadsave.jl") include("backwards_compatibility.jl") include("inlineunion.jl") diff --git a/src/bufferedio.jl b/src/bufferedio.jl deleted file mode 100644 index ac67e754..00000000 --- a/src/bufferedio.jl +++ /dev/null @@ -1,158 +0,0 @@ -# -# BufferedIO -# - -const DEFAULT_BUFFER_SIZE = 1024 - -struct BufferedWriter <: IO - f::IOStream - buffer::Vector{UInt8} - file_position::Int64 - position::Base.RefValue{Int} -end - -function BufferedWriter(io::IOStream, buffer_size::Int) 
- pos = position(io) - skip(io, buffer_size) - BufferedWriter(io, Vector{UInt8}(undef, buffer_size), pos, Ref{Int}(0)) -end -Base.show(io::IO, ::BufferedWriter) = print(io, "BufferedWriter") - -function finish!(io::BufferedWriter) - f = io.f - buffer = io.buffer - io.position[] == length(buffer) || - error("buffer not written to end; position is $(io.position[]) but length is $(length(buffer))") - seek(f, io.file_position) - jlwrite(f, buffer) - io.position[] = 0 - nothing -end - -@inline function _write(io::BufferedWriter, x) - position = io.position[] - buffer = io.buffer - n = jlsizeof(x) - n + position <= length(buffer) || throw(EOFError()) - io.position[] = position + n - jlunsafe_store!(Ptr{typeof(x)}(pointer(buffer, position+1)), x) - # Base.show_backtrace(STDOUT, backtrace()) - # gc() - return n -end -@inline jlwrite(io::BufferedWriter, x::UInt8) = _write(io, x) -@inline jlwrite(io::BufferedWriter, x::Int8) = _write(io, x) -@inline jlwrite(io::BufferedWriter, x::Plain) = _write(io, x) -@inline Base.write(io::BufferedWriter, x::UInt8) = _write(io, x) -@inline Base.write(io::BufferedWriter, x::Int8) = _write(io, x) -@inline Base.write(io::BufferedWriter, x::Plain) = _write(io, x) - -function Base.unsafe_write(io::BufferedWriter, x::Ptr{UInt8}, n::UInt64) - buffer = io.buffer - position = io.position[] - n + position <= length(buffer) || throw(EOFError()) - unsafe_copyto!(pointer(buffer, position+1), x, n) - io.position[] = position + n - return n -end - -Base.position(io::BufferedWriter) = io.file_position + io.position[] - -struct BufferedReader <: IO - f::IOStream - buffer::Vector{UInt8} - file_position::Int64 - position::Base.RefValue{Int} -end - -BufferedReader(io::IOStream) = - BufferedReader(io, Vector{UInt8}(), position(io), Ref{Int}(0)) -Base.show(io::IO, ::BufferedReader) = print(io, "BufferedReader") - -function readmore!(io::BufferedReader, n::Integer) - f = io.f - amount = max(bytesavailable(f), n) - buffer = io.buffer - oldlen = length(buffer) - resize!(buffer, oldlen + amount) - unsafe_read(f, pointer(buffer, oldlen+1), amount) -end - -@inline function _read(io::BufferedReader, T::DataType) - position = io.position[] - buffer = io.buffer - if length(buffer) - position < jlsizeof(T) - readmore!(io, jlsizeof(T)) - end - io.position[] = position + jlsizeof(T) - jlunsafe_load(Ptr{T}(pointer(buffer, position+1))) -end -@inline jlread(io::BufferedReader, T::Type{UInt8}) = _read(io, T) -@inline jlread(io::BufferedReader, T::Type{Int8}) = _read(io, T) -@inline jlread(io::BufferedReader, T::PlainType) = _read(io, T) - -function jlread(io::BufferedReader, ::Type{T}, n::Int) where T - position = io.position[] - buffer = io.buffer - m = jlsizeof(T) * n - if length(buffer) - position < m - readmore!(io, m) - end - io.position[] = position + m - arr = Vector{T}(undef, n) - unsafe_copyto!(pointer(arr), Ptr{T}(pointer(buffer, position+1)), n) - arr -end -jlread(io::BufferedReader, ::Type{T}, n::Integer) where {T} = - jlread(io, T, Int(n)) - -Base.position(io::BufferedReader) = io.file_position + io.position[] - -function adjust_position!(io::BufferedReader, position::Integer) - if position < 0 - throw(ArgumentError("cannot seek before start of buffer")) - elseif position > length(io.buffer) - readmore!(io, position - length(io.buffer)) - end - io.position[] = position -end - -Base.seek(io::BufferedReader, offset::Integer) = - adjust_position!(io, offset - io.file_position) - -Base.skip(io::BufferedReader, offset::Integer) = - adjust_position!(io, io.position[] + offset) - 
-finish!(io::BufferedReader) = - seek(io.f, io.file_position + io.position[]) - -function truncate_and_close(io::IOStream, endpos::Integer) - truncate(io, endpos) - close(io) -end - - -# We sometimes need to compute checksums. We do this by first calling begin_checksum when -# starting to handle whatever needs checksumming, and calling end_checksum afterwards. Note -# that we never compute nested checksums, but we may compute multiple checksums -# simultaneously. - -function begin_checksum_read(io::IOStream) - BufferedReader(io) -end -function begin_checksum_write(io::IOStream, sz::Integer) - BufferedWriter(io, sz) -end -function end_checksum(io::Union{BufferedReader,BufferedWriter}) - ret = Lookup3.hash(io.buffer, 1, io.position[]) - finish!(io) - ret -end - -function update_checksum(io, chunk_start, chunk_end) - seek(io, chunk_start) - cio = begin_checksum_read(io) - seek(cio, chunk_end) - seek(io, chunk_end) - jlwrite(io, end_checksum(cio)) -end \ No newline at end of file diff --git a/src/compression.jl b/src/compression.jl index 211e0514..05f5dbe1 100644 --- a/src/compression.jl +++ b/src/compression.jl @@ -72,7 +72,7 @@ end write(f.root_group, name, obj, wsession; compress=compress) # groups.jl 112 -function Base.write(g::Group, name::AbstractString, obj, wsession::JLDWriteSession=JLDWriteSession(); compress=nothing) +@nospecializeinfer function Base.write(g::Group, name::AbstractString, @nospecialize(obj), wsession::JLDWriteSession=JLDWriteSession(); compress=nothing) f = g.f prewrite(f) (g, name) = pathize(g, name, true) @@ -200,7 +200,9 @@ function write_chunked_storage_message( io::IO, data_address) end -function decompress!(inptr::Ptr, data_length, element_size, n, decompressor::TranscodingStreams.Codec) +function decompress!(io::MemoryBackedIO, data_length, element_size, n, decompressor::TranscodingStreams.Codec) + ensureroom(io, data_length) + inptr = io.curptr TranscodingStreams.initialize(decompressor) data = transcode(decompressor, unsafe_wrap(Array, Ptr{UInt8}(inptr), data_length))::Array{UInt8, 1} TranscodingStreams.finalize(decompressor) @@ -223,41 +225,12 @@ function decompress!(data::Vector{UInt8}, data_length, element_size, num_element end return data_new end -function decompress!(io::IOStream, data_length, element_size, n, decompressor) - read!(TranscodingStreams.TranscodingStream(decompressor, io), Vector{UInt8}(undef, element_size*n)) -end - -function read_compressed_array!(v::Array{T}, f::JLDFile{MmapIO}, - rr::ReadRepresentation{T,RR}, - data_length::Integer, - filters - ) where {T,RR} - invoke_again, decompressors = get_decompressor(filters) - if invoke_again - return Base.invokelatest(read_compressed_array!, v, f, rr, data_length, filters)::typeof(v) - end - io = f.io - inptr = io.curptr - element_size = odr_sizeof(RR) - n = length(v) - data = decompress!(inptr, data_length, element_size, n, decompressors[end]) - if length(decompressors) > 1 - for decompressor in decompressors[end-1:-1:1] - data = decompress!(data, length(data), element_size, n, decompressor) - end - end - @simd for i = 1:length(v) - dataptr = Ptr{Cvoid}(pointer(data, odr_sizeof(RR)*(i-1)+1)) - if !jlconvert_canbeuninitialized(rr) || jlconvert_isinitialized(rr, dataptr) - @inbounds v[i] = jlconvert(rr, f, dataptr, NULL_REFERENCE) - end - end - io.curptr = inptr + data_length - v +function decompress!(io::IO, data_length, element_size, n, decompressor) + read!(TranscodingStreams.TranscodingStream(decompressor, io), Vector{UInt8}(undef, element_size*n)) end -function 
read_compressed_array!(v::Array{T}, f::JLDFile{IOStream}, +function read_compressed_array!(v::Array{T}, f::JLDFile, rr::ReadRepresentation{T,RR}, data_length::Integer, filters, @@ -277,12 +250,13 @@ function read_compressed_array!(v::Array{T}, f::JLDFile{IOStream}, data = decompress!(data, length(data), element_size, n, decompressor) end end - @simd for i = 1:n - dataptr = Ptr{Cvoid}(pointer(data, odr_sizeof(RR)*(i-1)+1)) + cp0 = Ptr{Cvoid}(pointer(data)) + @simd for i = eachindex(v) + dataptr = cp0 + element_size*(i-1) if !jlconvert_canbeuninitialized(rr) || jlconvert_isinitialized(rr, dataptr) - @inbounds v[i] = jlconvert(rr, f, dataptr, NULL_REFERENCE) + v[i] = jlconvert(rr, f, dataptr, NULL_REFERENCE) end end seek(io, data_offset + data_length) v -end +end \ No newline at end of file diff --git a/src/data/custom_serialization.jl b/src/data/custom_serialization.jl index 3100dd76..380784ba 100644 --- a/src/data/custom_serialization.jl +++ b/src/data/custom_serialization.jl @@ -24,7 +24,7 @@ CustomSerialization(::Type{WrittenAs}, ::Type{WrittenAs}, odr) where {WrittenAs} CustomSerialization(::Type{WrittenAs}, ::Type{ReadAs}, odr) where {WrittenAs,ReadAs} = CustomSerialization{WrittenAs,odr} -odr_sizeof(::Type{CustomSerialization{T,ODR}}) where {T,ODR} = odr_sizeof(ODR) +odr_sizeof(::Type{CustomSerialization{T,ODR}}) where {T,ODR} = odr_sizeof(ODR)::Int # Usually we want to convert the object and then write it. h5convert!(out::Pointers, ::Type{CustomSerialization{T,ODR}}, f::JLDFile, diff --git a/src/data/reconstructing_datatypes.jl b/src/data/reconstructing_datatypes.jl index 01e6fee4..ebb0fd87 100644 --- a/src/data/reconstructing_datatypes.jl +++ b/src/data/reconstructing_datatypes.jl @@ -16,7 +16,10 @@ function read_field_datatypes(f::JLDFile, dt::CompoundDatatype, attrs::Vector{Re end isnothing(namevec) && (namevec = string.(dt.names)) isnothing(offsets) && (offsets = fill(NULL_REFERENCE, length(namevec))) - OrderedDict{String, RelOffset}(namevec .=> offsets) + namevec::Vector{String} + offsets::Vector{RelOffset} + v = [n=>v for (n,v) in zip(namevec,offsets)] + OrderedDict{String, RelOffset}(v) end """ @@ -159,7 +162,7 @@ If `hard_failure` is true, then throw a `TypeMappingException` instead of attemp reconstruction. This helps in cases where we can't know if reconstructed parametric types will have a matching memory layout without first inspecting the memory layout. 
""" -function constructrr(f::JLDFile, T::DataType, dt::CompoundDatatype, +@nospecializeinfer function constructrr(f::JLDFile, @nospecialize(T::DataType), dt::CompoundDatatype, attrs::Vector{ReadAttribute}, hard_failure::Bool=false) field_datatypes = read_field_datatypes(f, dt, attrs) @@ -249,7 +252,7 @@ function constructrr(f::JLDFile, T::DataType, dt::CompoundDatatype, tequal &= odr_offsets == offsets tequal && return (ReadRepresentation{T,wodr}(), true) end - return (ReadRepresentation{T,OnDiskRepresentation{offsets, Tuple{types...}, Tuple{odrs...}, offsets[end]+odr_sizeof(odrs[end])}()}(), false) + return (ReadRepresentation{T,OnDiskRepresentation{offsets, Tuple{types...}, Tuple{odrs...}, Int(offsets[end]+odr_sizeof(odrs[end]))}()}(), false) end function constructrr(f::JLDFile, u::Upgrade, dt::CompoundDatatype, @@ -603,7 +606,7 @@ function reconstruct_odr(f::JLDFile, dt::CompoundDatatype, push!(offsets, offset) offset += odr_sizeof(dtrr) end - OnDiskRepresentation{(offsets...,), Tuple{types...}, Tuple{h5types...},dt.size}() + OnDiskRepresentation{(offsets...,), Tuple{types...}, Tuple{h5types...},Int(dt.size)}() end # Reconstruct type that is a "lost cause": either we were not able to resolve @@ -616,7 +619,7 @@ function reconstruct_compound(f::JLDFile, T::String, dt::H5Datatype, fnames = tuple((Symbol(k) for k in keys(field_datatypes))...,) if !any(jlconvert_canbeuninitialized(ReadRepresentation{types[i], odrs[i]}()) for i = 1:length(types)) rt = ReconstructedStatic{Symbol(T), fnames, Tuple{types...}} - odr = OnDiskRepresentation{(0,), Tuple{NamedTuple{fnames,Tuple{types...}}}, Tuple{rodr}, dt.size}() + odr = OnDiskRepresentation{(0,), Tuple{NamedTuple{fnames,Tuple{types...}}}, Tuple{rodr}, Int(dt.size)}() return (ReadRepresentation{rt, odr}(), false) end T = ReconstructedMutable{Symbol(T), fnames, Tuple{types...}} diff --git a/src/data/specialcased_types.jl b/src/data/specialcased_types.jl index 768a1cec..740fb39c 100644 --- a/src/data/specialcased_types.jl +++ b/src/data/specialcased_types.jl @@ -137,8 +137,8 @@ function jlconvert(rr::ReadRepresentation{String, FixedLengthAsciiString{SpacePa unsafe_copyto!(pointer(data), pconvert(Ptr{UInt8}, ptr), N) rstrip(String(data)) end -odr_sizeof(x::AsciiString) = x.length -odr_sizeof(x::Type{FixedLengthAsciiString{TERM, N}}) where {TERM, N} = UInt32(N)#::Int +odr_sizeof(x::AsciiString) = Int(x.length) +odr_sizeof(x::Type{FixedLengthAsciiString{TERM, N}}) where {TERM, N} = Int(N)::Int diff --git a/src/data/type_defs.jl b/src/data/type_defs.jl index b57c6e30..4537977c 100644 --- a/src/data/type_defs.jl +++ b/src/data/type_defs.jl @@ -4,36 +4,9 @@ const Pointers = Union{Ptr{Cvoid}, IndirectPointer} struct OnDiskRepresentation{Offsets,JLTypes,H5Types, Size} end odr_sizeof(::Nothing) = 0 -@static if VERSION ≥ v"1.9.0-DEV" - # Modelled after Base.datatype_alignment - function datatype_size(dt::DataType) - Base.@_foldable_meta - dt.layout == C_NULL && throw(UndefRefError()) - size = unsafe_load(pconvert(Ptr{Base.DataTypeLayout}, dt.layout)).size - return Int(size) - end - @Base.pure odr_sizeof(x::DataType) = datatype_size(x) -else - @Base.pure odr_sizeof(x::DataType) = Int(x.size) -end +odr_sizeof(x::DataType) = Core.sizeof(x) struct UnknownType{T, P} end - -# Horrible Invalidations -# function Base.show(io::IO, x::Type{UnknownType{T, P}}) where {T, P} -# print(io, "UnknownType:\"", T,"\"") -# if !isempty(P.parameters) -# print(io, "{") -# for p in P.parameters -# print(io, p) -# if p !== P.parameters[end] -# print(io, ", ") -# end -# end -# 
print(io, "}") -# end -# end - struct Vlen{T} size::UInt32 id::GlobalHeapID diff --git a/src/data/writing_datatypes.jl b/src/data/writing_datatypes.jl index 3619a289..1f60b49d 100644 --- a/src/data/writing_datatypes.jl +++ b/src/data/writing_datatypes.jl @@ -21,7 +21,7 @@ end # Carries the type and on-disk representation of data to be read from # the disk -odr_sizeof(::ReadRepresentation{T,S}) where {T,S} = odr_sizeof(S) +odr_sizeof(::ReadRepresentation{T,S}) where {T,S} = odr_sizeof(S)::Int # Determines whether a specific field type should be saved in the file function hasfielddata(@nospecialize(T), encounteredtypes=DataType[])::Bool @@ -45,7 +45,7 @@ end # Gets the size of an on-disk representation function odr_sizeof(::OnDiskRepresentation{Offsets,JLTypes,H5Types,Size}) where {Offsets,JLTypes,H5Types,Size} - Size + Size::Int end # Determines whether a type will have the same layout on disk as in memory @@ -138,7 +138,7 @@ _odr(writtenas::DataType, readas::DataType, odr) = end check_writtenas_type(::DataType) = nothing check_writtenas_type(::Any) = throw(ArgumentError("writeas(leaftype) must return a leaf type")) -h5type(f::JLDFile, @nospecialize(x)) = h5type(f, writeas(typeof(x)), x) +@nospecializeinfer h5type(f::JLDFile, @nospecialize(x)) = h5type(f, writeas(typeof(x)), x) # Make a compound datatype from a set of names and types @nospecializeinfer function commit_compound(f::JLDFile, names::AbstractVector{Symbol}, @@ -656,7 +656,7 @@ end offset += odr_sizeof(fodr) end - OnDiskRepresentation{(offsets...,), Tuple{T.types...}, Tuple{odrs...}, offset}() + OnDiskRepresentation{(offsets...,), Tuple{T.types...}, Tuple{odrs...}, Int(offset)}() end abstract type DataMode end @@ -669,7 +669,7 @@ datamode(::DataType) = ReferenceFree() datamode(::FixedLengthString) = ReferenceFree() datamode(::AsciiString) = ReferenceFree() datamode(::Nothing) = ReferenceFree() -function datamode(odr::OnDiskRepresentation{Offsets,JLTypes,H5Types,Size} where {Offsets,JLTypes,Size}) where H5Types +function datamode(::OnDiskRepresentation{Offsets,JLTypes,H5Types,Size} where {Offsets,JLTypes,Size}) where H5Types for ty in H5Types.parameters datamode(ty) == HasReferences() && return HasReferences() end diff --git a/src/dataio.jl b/src/dataio.jl deleted file mode 100644 index 66b0693d..00000000 --- a/src/dataio.jl +++ /dev/null @@ -1,274 +0,0 @@ -""" - read_scalar(f::JLDFile, rr, header_offset::RelOffset) - -Read raw data representing a scalar with read representation `rr` from the current position -of JLDFile `f`. `header_offset` is the [`RelOffset`](@ref) of the object header, used to resolve -cycles. -""" -function read_scalar end - -""" - read_array!(v::Array, f::JLDFile, rr) - -Fill the array `v` with the contents of JLDFile `f` at the current position, assuming a -[`ReadRepresentation`](@ref) `rr`. -""" -function read_array! end - - -""" - read_compressed_array!(v::Array, f::JLDFile, rr, data_length::Int, Val(filter_id)) - -Fill the array `v` with the compressed contents of JLDFile `f` at the current position, -assuming a [`ReadRepresentation`](@ref) `rr` and that the compressed data has length `data_length`. -""" -function read_compressed_array! 
end - -# -# MmapIO -# - -# Cutoff for using ordinary IO instead of copying into mmapped region -const MMAP_CUTOFF = 1048576 - -function read_scalar(f::JLDFile{MmapIO}, @nospecialize(rr), header_offset::RelOffset)::Any - io = f.io - inptr = io.curptr - obj = jlconvert(rr, f, inptr, header_offset) - io.curptr = inptr + odr_sizeof(rr) - obj -end - -function read_array!(v::Array{T}, f::JLDFile{MmapIO}, - rr::ReadRepresentation{T,T}) where T - io = f.io - inptr = io.curptr - n = length(v) - nb = odr_sizeof(T)*n - if nb > MMAP_CUTOFF && (!Sys.iswindows() || !f.written) - # It turns out that regular IO is faster here (at least on OS X), but on Windows, - # we shouldn't use ordinary IO to read, since coherency with the memory map is not - # guaranteed - mmapio = f.io - regulario = mmapio.f - seek(regulario, inptr - io.startptr) - unsafe_read(regulario, pointer(v), nb) - else - unsafe_copyto!(pointer(v), pconvert(Ptr{T}, inptr), n) - end - io.curptr = inptr + odr_sizeof(T) * n - v -end - -function read_array!(v::Array{T}, f::JLDFile{MmapIO}, - rr::ReadRepresentation{T,RR}) where {T,RR} - io = f.io - inptr = io.curptr - n = length(v) - @simd for i = 1:n - if !jlconvert_canbeuninitialized(rr) || jlconvert_isinitialized(rr, inptr) - @inbounds v[i] = jlconvert(rr, f, inptr, NULL_REFERENCE) - end - inptr += odr_sizeof(RR) - end - io.curptr = inptr + odr_sizeof(RR) * n - v -end - - -function write_data(io::MmapIO, f::JLDFile, data, odr::S, ::ReferenceFree, - wsession::JLDWriteSession) where S - io = f.io - ensureroom(io, odr_sizeof(odr)) - cp = io.curptr - h5convert!(cp, odr, f, data, wsession) - io.curptr == cp || throw(InternalError()) - io.curptr = cp + odr_sizeof(odr) - nothing -end - -function write_data(io::MmapIO, f::JLDFile, data, odr::S, ::HasReferences, - wsession::JLDWriteSession) where S - io = f.io - ensureroom(io, odr_sizeof(odr)) - p = position(io) - cp = IndirectPointer(io, p) - h5convert!(cp, odr, f, data, wsession) - seek(io, p + odr_sizeof(odr)) - nothing -end - -@static if Sys.isunix() - function raw_write(io::MmapIO, ptr::Ptr{UInt8}, nb::Int) - if nb > MMAP_CUTOFF - pos = position(io) - - # Ensure that the current page has been flushed to disk - msync(io, pos, min(io.endptr - io.curptr, nb)) - - # Write to the underlying IOStream - regulario = io.f - seek(regulario, pos) - unsafe_write(regulario, ptr, nb) - - # Invalidate cache of any pages that were just written to - msync(io, pos, min(io.n - pos, nb), true) - - # Make sure the mapping is encompasses the written data - ensureroom(io, nb + 1) - - # Seek to the place we just wrote - seek(io, pos + nb) - else - unsafe_write(io, ptr, nb) - end - nothing - end -else - # Don't use ordinary IO to write files on Windows, since coherency with memory map is - # not guaranteed - function raw_write(io::MmapIO, ptr::Ptr{UInt8}, nb::Int) - unsafe_write(io, ptr, nb) - nothing - end -end - -write_data(io::MmapIO, f::JLDFile, data::Array{T}, odr::Type{T}, ::ReferenceFree, - wsession::JLDWriteSession) where {T} = - raw_write(io, Ptr{UInt8}(pointer(data)), odr_sizeof(odr) * length(data)) - -function write_data(io::MmapIO, f::JLDFile, data::Array{T}, odr::S, ::ReferenceFree, - wsession::JLDWriteSession) where {T,S} - io = f.io - ensureroom(io, odr_sizeof(odr) * length(data)) - cp = cporig = io.curptr - @simd for i = 1:length(data) - @inbounds h5convert!(cp, odr, f, data[i], wsession) - cp += odr_sizeof(odr) - end - io.curptr == cporig || throw(InternalError()) - io.curptr = cp - nothing -end - -function write_data(io::MmapIO, f::JLDFile, 
data::Array{T}, odr::S, ::HasReferences, - wsession::JLDWriteSession) where {T,S} - io = f.io - ensureroom(io, odr_sizeof(odr) * length(data)) - p = position(io) - cp = IndirectPointer(io, p) - - for i = 1:length(data) - if isassigned(data, i) - @inbounds h5convert!(cp, odr, f, data[i], wsession) - else - @inbounds h5convert_uninitialized!(cp, odr) - end - cp += odr_sizeof(odr) - end - - seek(io, cp.offset) - nothing -end - -# -# IOStream/BufferedWriter -# - -function read_scalar(f::JLDFile{IOStream}, rr, header_offset::RelOffset)::Any - r = Vector{UInt8}(undef, odr_sizeof(rr)) - @GC.preserve r begin - unsafe_read(f.io, pointer(r), odr_sizeof(rr)) - jlconvert(rr, f, pointer(r), header_offset) - end -end - - -function read_array!(v::Array{T}, f::JLDFile{IOStream}, - rr::ReadRepresentation{T,T}) where T - unsafe_read(f.io, pointer(v), odr_sizeof(T)*length(v)) - v -end - -function read_array!(v::Array{T}, f::JLDFile{IOStream}, - rr::ReadRepresentation{T,RR}) where {T,RR} - n = length(v) - nb = odr_sizeof(RR)*n - io = f.io - data = read!(io, Vector{UInt8}(undef, nb)) - @GC.preserve data begin - @simd for i = 1:n - dataptr = Ptr{Cvoid}(pointer(data, odr_sizeof(RR)*(i-1)+1)) - if !jlconvert_canbeuninitialized(rr) || jlconvert_isinitialized(rr, dataptr) - @inbounds v[i] = jlconvert(rr, f, dataptr, NULL_REFERENCE) - end - end - end - v -end - -function write_data(io::BufferedWriter, f::JLDFile, data, odr::S, ::DataMode, - wsession::JLDWriteSession) where S - position = io.position[] - h5convert!(Ptr{Cvoid}(pointer(io.buffer, position+1)), odr, f, data, wsession) - io.position[] = position + odr_sizeof(odr) - nothing -end - -function write_data(io::BufferedWriter, f::JLDFile, data::Array{T}, odr::Type{T}, ::ReferenceFree, - wsession::JLDWriteSession) where T - unsafe_write(io, Ptr{UInt8}(pointer(data)), odr_sizeof(odr) * length(data)) - nothing -end - -function write_data(io::IOStream, f::JLDFile, data::Array{T}, odr::Type{T}, ::ReferenceFree, - wsession::JLDWriteSession) where T - unsafe_write(io, Ptr{UInt8}(pointer(data)), odr_sizeof(odr) * length(data)) - nothing -end - -function write_data(io::IOStream, f::JLDFile, data, odr, _, wsession::JLDWriteSession) - buf = Vector{UInt8}(undef, odr_sizeof(odr)) - cp = Ptr{Cvoid}(pointer(buf)) - h5convert!(cp, odr, f, data, wsession) - unsafe_write(io, Ptr{UInt8}(pointer(buf)), odr_sizeof(odr)) - nothing -end - -function write_data(io::BufferedWriter, f::JLDFile, data::Array{T}, odr::S, - ::DataMode, wsession::JLDWriteSession) where {T,S} - position = io.position[] - cp = Ptr{Cvoid}(pointer(io.buffer, position+1)) - @simd for i = 1:length(data) - if isassigned(data, i) - @inbounds h5convert!(cp, odr, f, data[i], wsession) - else - @inbounds h5convert_uninitialized!(cp, odr) - end - cp += odr_sizeof(odr) - end - io.position[] = position + odr_sizeof(odr) * length(data) - nothing -end - -function write_data(io::IOStream, f::JLDFile, data::Array{T}, odr::S, wm::DataMode, - wsession::JLDWriteSession) where {T,S} - nb = odr_sizeof(odr) * length(data) - buf = Vector{UInt8}(undef, nb) - pos = position(io) - cp = Ptr{Cvoid}(pointer(buf)) - @simd for i = 1:length(data) - if isassigned(data, i) - @inbounds h5convert!(cp, odr, f, data[i], wsession) - else - @inbounds h5convert_uninitialized!(cp, odr) - end - cp += odr_sizeof(odr) - end - # We might seek around in the file as a consequence of writing stuff, so seek back. We - # don't need to worry about this for a BufferedWriter, since it will seek back before - # writing. 
- !isa(wm, ReferenceFree) && seek(io, pos) - jlwrite(io, buf) - nothing -end diff --git a/src/datalayouts.jl b/src/datalayouts.jl index 5c8b2323..879fc0a9 100644 --- a/src/datalayouts.jl +++ b/src/datalayouts.jl @@ -25,10 +25,10 @@ function DataLayout(f::JLD2.JLDFile, msg::HmWrap{HmDataLayout}) data_offset::Int64 = rf != UNDEFINED_ADDRESS ? fileoffset(f, rf) : typemax(Int64) if version == 4 || version == 3 if storage_type == LcCompact - data_length = Int64(msg.data_size::UInt16) + data_length = Int64(msg.data_size) return DataLayout(version, storage_type, data_length, data_offset) elseif storage_type == LcContiguous - data_length = Int64(msg.data_size::Int64) + data_length = Int64(msg.data_size) return DataLayout(version, storage_type, data_length, data_offset) elseif version == 4 && storage_type == LcChunked chunk_dimensions = Int[msg.dimensions...] diff --git a/src/datasets.jl b/src/datasets.jl index 4d981df7..c8186e5a 100644 --- a/src/datasets.jl +++ b/src/datasets.jl @@ -45,9 +45,7 @@ function load_dataset(f::JLDFile{IO}, offset::RelOffset) where IO throw(InvalidDataException("No datatype message found")) end iscompressed(filter_pipeline) && !ischunked(layout) && throw(InvalidDataException("Compressed data must be chunked")) - - # TODO verify that data length matches - read_data(f, dataspace, dt, layout, filter_pipeline, offset, attrs) + Base.inferencebarrier(read_data(f, dataspace, dt, layout, filter_pipeline, offset, attrs)) end @@ -335,7 +333,6 @@ end f.end_of_data = header_offset + fullsz track!(wsession, data, h5offset(f, header_offset)) - cio = begin_checksum_write(io, fullsz - 4) jlwrite(cio, ObjectStart(size_flag(psz))) write_size(cio, psz) diff --git a/src/datatypes.jl b/src/datatypes.jl index 9bb03392..ef163df5 100644 --- a/src/datatypes.jl +++ b/src/datatypes.jl @@ -279,7 +279,7 @@ function jlwrite(io::IO, dt::CommittedDatatype) jlwrite(io, dt.header_offset) end -function commit(f::JLDFile, +@nospecializeinfer function commit(f::JLDFile, @nospecialize(dt::H5Datatype), attrs::Tuple{Vararg{WrittenAttribute}}=()) psz = jlsizeof(Val(HmDatatype), 64; dt) @@ -355,7 +355,7 @@ end struct ArrayPlaceHolder{T, D} end -odr_sizeof(::Type{ArrayPlaceHolder{T,D}}) where {T,D} = odr_sizeof(T)*prod(D) +odr_sizeof(::Type{ArrayPlaceHolder{T,D}}) where {T,D} = Int(odr_sizeof(T)*prod(D)) function jltype(f::JLDFile, dt::ArrayDatatype) rr = jltype(f, dt.base_type) diff --git a/src/fractal_heaps.jl b/src/fractal_heaps.jl index 79c8340d..6dad3405 100644 --- a/src/fractal_heaps.jl +++ b/src/fractal_heaps.jl @@ -371,7 +371,7 @@ function read_btree(f, offset_hh, offset_bh) m.link_name, m.target end end - links + links::Vector{Tuple{String, RelOffset}} end ########################################################################################### @@ -383,8 +383,8 @@ function read_oldstyle_group(f, v1btree_address, name_index_heap) links = read_v1btree(f, v1btree_address) map(links) do link link_name = read_in_local_heap(f, local_heap, link.link_name_offset) - (link_name, link.obj_header_address) - end + (link_name, link.obj_header_address::RelOffset) + end::Vector{Tuple{String, RelOffset}} end const LOCAL_HEAP_SIGNATURE = htol(0x50414548) # UInt8['H', 'E', 'A', 'P'] diff --git a/src/global_heaps.jl b/src/global_heaps.jl index 6252edab..7f365ad7 100644 --- a/src/global_heaps.jl +++ b/src/global_heaps.jl @@ -16,11 +16,11 @@ isatend(f::JLDFile, gh::GlobalHeap) = heap_object_length(data::AbstractArray) = length(data) heap_object_length(::Any) = 1 -function write_heap_object(f::JLDFile, odr, data, 
wsession::JLDWriteSession) - psz = odr_sizeof(odr)*heap_object_length(data) +function write_heap_object(f::JLDFile, odr::ODR, data, wsession::JLDWriteSession) where ODR + # The type parameter ODR is needed to convince the compiler to specialize on ODR. + psz = odr_sizeof(odr) * heap_object_length(data) objsz = 8 + jlsizeof(Length) + psz objsz += 8 - mod1(objsz, 8) - io = f.io # This is basically a memory allocation problem. Right now we do it diff --git a/src/groups.jl b/src/groups.jl index 11af3075..b1e122ba 100644 --- a/src/groups.jl +++ b/src/groups.jl @@ -96,7 +96,7 @@ function Base.getindex(g::Group, name::AbstractString) throw(KeyError(name)) end - load_dataset(f, roffset) + Base.inferencebarrier(load_dataset(f, roffset)) end function Base.setindex!(g::Group, obj, name::AbstractString) @@ -231,14 +231,14 @@ function load_group(f::JLDFile, offset::RelOffset) end if fractal_heap_address != UNDEFINED_ADDRESS - records = read_btree(f, fractal_heap_address, name_index_btree) + records = read_btree(f, fractal_heap_address, name_index_btree)::Vector{Tuple{String, RelOffset}} for r in records links[r[1]] = r[2] end end if v1btree_address != UNDEFINED_ADDRESS - records = read_oldstyle_group(f, v1btree_address, name_index_heap) + records = read_oldstyle_group(f, v1btree_address, name_index_heap)::Vector{Tuple{String, RelOffset}} for r in records links[r[1]] = r[2] end diff --git a/src/headermessages.jl b/src/headermessages.jl index f2270a9f..9574d936 100644 --- a/src/headermessages.jl +++ b/src/headermessages.jl @@ -90,7 +90,7 @@ end @pseudostruct HmDataLayout begin version::UInt8 = 4 - if version in (1,2) + if version == 1 || version == 2 dimensionality::UInt8 layout_class::LayoutClass @skip(5) @@ -98,7 +98,7 @@ end dimensions::NTuple{Int(dimensionality), Int32} (layout_class == LcChunked) && element_size::UInt32 if (layout_class == LcCompact) - data_size::UInt32 + data_size::@Int(4) data_address::@Offset data::@Blob(data_size) end @@ -106,19 +106,19 @@ end if version == 3 || version == 4 layout_class::LayoutClass if layout_class == LcCompact - data_size::UInt16 + data_size::@Int(2) data_address::@Offset data::@Blob(data_size) = UInt8[] # don't write anything if nothing is passed end if layout_class == LcContiguous data_address::RelOffset = UNDEFINED_ADDRESS - data_size::Int64 = 0# Lengths + data_size::@Int(8) = 0# Lengths end if version == 3 && layout_class == LcChunked dimensionality::UInt8 data_address::RelOffset dimensions::NTuple{Int(dimensionality), Int32} - data_size::UInt32#element_size::UInt32 + data_size::@Int(4)#UInt32#element_size::UInt32 end if version == 4 && layout_class == LcChunked flags::UInt8 @@ -127,7 +127,7 @@ end dimensions::NTuple{Int(dimensionality), uintofsize(dim_size)} chunk_indexing_type::UInt8 if chunk_indexing_type == 1 # Single Chunk - data_size::Int64 # Lengths + data_size::@Int(8)#Int64 # Lengths filters::UInt32 end if chunk_indexing_type == 3 diff --git a/src/io/bufferedio.jl b/src/io/bufferedio.jl new file mode 100644 index 00000000..3f16b6bb --- /dev/null +++ b/src/io/bufferedio.jl @@ -0,0 +1,121 @@ +# +# BufferedIO +# + +mutable struct BufferedWriter{io} <: MemoryBackedIO + f::io + buffer::Vector{UInt8} + file_position::Int64 + curptr::Ptr{Nothing} + extensible::Bool +end + +function BufferedWriter(io, buffer_size::Integer = 0; extensible::Bool=false) + pos = position(io) + skip(io, buffer_size) + buf = Vector{UInt8}(undef, buffer_size) + BufferedWriter(io, buf, Int64(pos), Ptr{Nothing}(pointer(buf)), extensible) +end +Base.show(io::IO, ::BufferedWriter) = 
print(io, "BufferedWriter") + +function ensureroom(io::BufferedWriter, n::Integer) + if bufferpos(io) + n > length(io.buffer) + if io.extensible + pos = bufferpos(io) + resize!(io.buffer, length(io.buffer) + n) + io.curptr = pointer(io.buffer, pos+1) + else + throw(InternalError("BufferedWriter: not enough room")) + end + end +end + +Base.position(io::BufferedWriter) = io.file_position + bufferpos(io) + +function Base.seek(io::BufferedWriter, offset::Integer) + buffer_offset = offset - io.file_position + buffer_offset < 0 && throw(ArgumentError("cannot seek before start of buffer")) + ensureroom(io, buffer_offset - bufferpos(io)) + io.curptr = pointer(io.buffer) + buffer_offset +end + +function finish!(io::BufferedWriter) + bufferpos(io) == length(io.buffer) || + throw(InternalError("BufferedWriter: buffer not written to end; position is $(bufferpos(io)) but length is $(length(io.buffer))")) + seek(io.f, io.file_position) + jlwrite(io.f, io.buffer) + nothing +end + +mutable struct BufferedReader{io} <: MemoryBackedIO + f::io + buffer::Vector{UInt8} + file_position::Int64 + curptr::Ptr{Nothing} +end + +function BufferedReader(io) + buf = Vector{UInt8}() + BufferedReader(io, buf, Int64(position(io)), Ptr{Nothing}(pointer(buf))) +end + +Base.show(io::IO, ::BufferedReader) = print(io, "BufferedReader") + +function readmore!(io::BufferedReader, n::Integer) + f = io.f + amount = max(bytesavailable(f), n) # TODO: check if this reads way too much + amount < n && throw(EOFError()) + buffer = io.buffer + pos = bufferpos(io) + resize!(buffer, length(buffer) + amount) + io.curptr = pointer(buffer, pos+1) + unsafe_read(f, io.curptr, amount) +end + +ensureroom(io::BufferedReader, n::Integer) = + (bufferpos(io) + n >= length(io.buffer)) && readmore!(io, n) + + +Base.position(io::BufferedReader) = io.file_position + bufferpos(io) + +""" + bufferpos(io::Union{BufferedReader, BufferedWriter}) + +Get the current position in the buffer. 
+""" +bufferpos(io::Union{BufferedReader, BufferedWriter}) = Int(io.curptr - pointer(io.buffer)) + +function Base.seek(io::BufferedReader, offset::Integer) + pos = offset - io.file_position + pos < 0 && throw(ArgumentError("cannot seek before start of buffer")) + ensureroom(io, offset - position(io)) + io.curptr = pointer(io.buffer) + pos + nothing +end +finish!(io::BufferedReader) = seek(io.f, io.file_position + bufferpos(io)) + +function truncate_and_close(io::IOStream, endpos::Integer) + truncate(io, endpos) + close(io) +end + +Base.close(::BufferedReader) = nothing + +begin_checksum_read(io::IO) = BufferedReader(io) + +function begin_checksum_write(io::IO, sz::Integer) + BufferedWriter(io, sz) +end +function end_checksum(io::Union{BufferedReader,BufferedWriter}) + ret = Lookup3.hash(io.buffer, 1, bufferpos(io)) + finish!(io) + ret +end + +function update_checksum(io, chunk_start, chunk_end) + seek(io, chunk_start) + cio = begin_checksum_read(io) + seek(cio, chunk_end) + seek(io, chunk_end) + jlwrite(io, end_checksum(cio)) +end \ No newline at end of file diff --git a/src/io/dataio.jl b/src/io/dataio.jl new file mode 100644 index 00000000..58b6556b --- /dev/null +++ b/src/io/dataio.jl @@ -0,0 +1,281 @@ +const Plain = Union{Int8, Int16,Int32,Int64,Int128,UInt8, UInt16,UInt32,UInt64,UInt128,Float16,Float32, + Float64} +const PlainType = Union{Type{Int8}, Type{Int16},Type{Int32}, Type{Int64}, Type{Int128}, + Type{UInt8}, Type{UInt16}, Type{UInt32}, Type{UInt64}, Type{UInt128}, Type{Float16}, + Type{Float32}, Type{Float64}} +# JLD2 requires two levels of read / write customization +# These need to use separate function names to avoid ambiguities +# Non-trivial structs get a custom jlread/jlwrite method for generic ::IO +# Different IO types should implement `_read` and `_write` for plain types. 
+jlwrite(io, x) = Base.write(io, x) +jlwrite(io, x::Plain) = _write(io, x) +_write(io, x) = Base.write(io, x) + +jlread(io, T) = Base.read(io, T) +jlread(io, x::PlainType) = _read(io, x) +_read(io, T) = Base.read(io, T) + +jlread(io::IO, ::Type{T}, n::Integer) where {T} = T[jlread(io, T) for _=1:n] + +Base.read(io::MemoryBackedIO, T::Type{UInt8}) = _read(io, T) + +function _read(io::MemoryBackedIO, T::DataType) + n = jlsizeof(T) + ensureroom(io, n) + v = jlunsafe_load(Ptr{T}(io.curptr)) + io.curptr += n + v +end + +function Base.read(io::MemoryBackedIO, ::Type{T}, n::Int) where T + m = jlsizeof(T) * n + ensureroom(io, m) + arr = Vector{T}(undef, n) + unsafe_copyto!(pointer(arr), Ptr{T}(io.curptr), n) + io.curptr += m + arr +end + +Base.read(io::MemoryBackedIO, ::Type{T}, n::Integer) where {T} = read(io, T, Int(n)) +jlread(io::MemoryBackedIO, ::Type{T}, n::Integer) where {T} = read(io, T, Int(n)) + + +function _write(io::MemoryBackedIO, x) + n = jlsizeof(x) + ensureroom(io, n) + jlunsafe_store!(Ptr{typeof(x)}(io.curptr), x) + io.curptr += n + return n +end + +function Base.unsafe_write(io::MemoryBackedIO, ptr::Ptr{UInt8}, n::UInt) + ensureroom(io, n) + unsafe_copyto!(Ptr{UInt8}(io.curptr), ptr, n) + io.curptr += n + n +end + +function Base.seek(io::MemoryBackedIO, offset::Integer) + offset < 0 && throw(ArgumentError("cannot seek before start of file")) + ensureroom(io, offset-position(io)) + io.curptr = io.startptr + offset + nothing +end + +function Base.skip(io::MemoryBackedIO, offset::Integer) + ensureroom(io, offset) + io.curptr += offset + nothing +end + +""" + read_scalar(f::JLDFile, rr, header_offset::RelOffset) + +Read raw data representing a scalar with read representation `rr` from the current position +of JLDFile `f`. `header_offset` is the [`RelOffset`](@ref) of the object header, used to resolve +cycles. +""" +function read_scalar end + +""" + read_array!(v::Array, f::JLDFile, rr) + +Fill the array `v` with the contents of JLDFile `f` at the current position, assuming a +[`ReadRepresentation`](@ref) `rr`. +""" +function read_array! end + + +""" + read_compressed_array!(v::Array, f::JLDFile, rr, data_length::Int, Val(filter_id)) + +Fill the array `v` with the compressed contents of JLDFile `f` at the current position, +assuming a [`ReadRepresentation`](@ref) `rr` and that the compressed data has length `data_length`. +""" +function read_compressed_array! 
end + +# +# MmapIO +# + +# Cutoff for using ordinary IO instead of copying into mmapped region +const MMAP_CUTOFF = 1048576 + +@nospecializeinfer function read_scalar(f::JLDFile{<:MemoryBackedIO}, @nospecialize(rr), header_offset::RelOffset) + io = f.io + inptr = io.curptr + obj = jlconvert(rr, f, inptr, header_offset) + io.curptr = inptr + odr_sizeof(rr)::Int + obj +end + +function read_array!(v::Array{T}, f::JLDFile{<:MemoryBackedIO}, ::ReadRepresentation{T,T}) where T + inptr = f.io.curptr + n = length(v) + unsafe_copyto!(pointer(v), pconvert(Ptr{T}, inptr), n) + f.io.curptr = inptr + odr_sizeof(T) * n + v +end + +function read_array!(v::Array{T}, f::JLDFile{<:MemoryBackedIO}, rr::ReadRepresentation{T,RR}) where {T,RR} + cp0 = f.io.curptr + @simd for i in eachindex(v) + cp = cp0 + (i-1)*odr_sizeof(RR) + if !jlconvert_canbeuninitialized(rr) || jlconvert_isinitialized(rr, cp) + v[i] = jlconvert(rr, f, cp, NULL_REFERENCE) + end + end + f.io.curptr = cp0 + odr_sizeof(RR) * length(v) + v +end + +function write_data(io::MemoryBackedIO, f::JLDFile, data, odr::S, ::ReferenceFree, + wsession::JLDWriteSession) where S + ensureroom(io, odr_sizeof(odr)) + cp = io.curptr + h5convert!(cp, odr, f, data, wsession) + io.curptr == cp || throw(InternalError()) + io.curptr = cp + odr_sizeof(odr) + nothing +end + +function write_data(io::MemoryBackedIO, f::JLDFile, data, odr::S, ::HasReferences, + wsession::JLDWriteSession) where S + ensureroom(io, odr_sizeof(odr)) + cp = IndirectPointer(io) + h5convert!(cp, odr, f, data, wsession) + io.curptr = pconvert(Ptr{Nothing}, cp) + odr_sizeof(odr) + nothing +end + +function write_data(io::MemoryBackedIO, f::JLDFile, data::Array{T}, odr::S, ::ReferenceFree, + wsession::JLDWriteSession) where {T,S} + ensureroom(io, odr_sizeof(odr) * length(data)) + cp0 = io.curptr + @simd for i = 1:length(data) + cp = cp0 + (i-1)*odr_sizeof(odr) + @inbounds h5convert!(cp, odr, f, data[i], wsession) + end + io.curptr == cp0 || throw(InternalError()) + io.curptr = cp0 + length(data)*odr_sizeof(odr) + nothing +end + +function write_data(io::MemoryBackedIO, f::JLDFile, data::Array{T}, odr::S, ::HasReferences, + wsession::JLDWriteSession) where {T,S} + ensureroom(io, odr_sizeof(odr) * length(data)) + cp = IndirectPointer(io) + + for i = eachindex(data) + if isassigned(data, i) + h5convert!(cp, odr, f, data[i], wsession) + else + h5convert_uninitialized!(cp, odr) + end + cp += odr_sizeof(odr) + end + io.curptr = pconvert(Ptr{Nothing}, cp) + nothing +end + +write_data(io::MemoryBackedIO, ::JLDFile, data::Array{T}, odr::Type{T}, ::ReferenceFree, ::JLDWriteSession) where {T} = + unsafe_write(io, Ptr{UInt8}(pointer(data)), odr_sizeof(odr) * length(data)) + + +# +# Fallback for non-memory-backed IO +# + +function read_scalar(f::JLDFile, rr, header_offset::RelOffset) + r = Vector{UInt8}(undef, odr_sizeof(rr)) + @GC.preserve r begin + unsafe_read(f.io, pointer(r), odr_sizeof(rr)) + jlconvert(rr, f, pointer(r), header_offset) + end +end + + +function read_array!(v::Array{T}, f::JLDFile, rr::ReadRepresentation{T,T}) where {T} + unsafe_read(f.io, pointer(v), odr_sizeof(T)*length(v)) + v +end + +function read_array!(v::Array{T}, f::JLDFile, rr::ReadRepresentation{T,RR}) where {T,RR} + n = length(v) + nb = odr_sizeof(RR)*n + data = read!(f.io, Vector{UInt8}(undef, nb)) + @GC.preserve data begin + p0 = Ptr{Cvoid}(pointer(data)) + @simd for i = eachindex(v) + dataptr = p0 + odr_sizeof(RR)*(i-1) + if !jlconvert_canbeuninitialized(rr) || jlconvert_isinitialized(rr, dataptr) + v[i] = jlconvert(rr, f, 
dataptr, NULL_REFERENCE) + end + end + end + v +end + + +function write_data(io::IO, f::JLDFile, data::Array{T}, odr::Type{T}, ::ReferenceFree, + wsession::JLDWriteSession) where T + unsafe_write(io, Ptr{UInt8}(pointer(data)), odr_sizeof(odr) * length(data)) + nothing +end + +function write_data(io::IO, f::JLDFile, data, odr, _, wsession::JLDWriteSession) + buf = Vector{UInt8}(undef, odr_sizeof(odr)) + GC.@preserve buf begin + cp = Ptr{Cvoid}(pointer(buf)) + h5convert!(cp, odr, f, data, wsession) + unsafe_write(io, Ptr{UInt8}(pointer(buf)), odr_sizeof(odr)) + end + nothing +end + +function write_data(io::IO, f::JLDFile, data::Array{T}, odr::S, wm::DataMode, + wsession::JLDWriteSession) where {T,S} + nb = odr_sizeof(odr) * length(data) + buf = Vector{UInt8}(undef, nb) + pos = position(io) + cp0 = Ptr{Cvoid}(pointer(buf)) + for i = eachindex(data) + cp = cp0 + (i-1)*odr_sizeof(odr) + if isassigned(data, i) + h5convert!(cp, odr, f, data[i], wsession) + else + h5convert_uninitialized!(cp, odr) + end + end + # We might seek around in the file as a consequence of writing stuff, so seek back. We + # don't need to worry about this for a BufferedWriter, since it will seek back before + # writing. + !isa(wm, ReferenceFree) && seek(io, pos) + jlwrite(io, buf) + nothing +end + + +# The delimiter is excluded by default +read_bytestring(io::Union{IOStream, IOBuffer}) = String(readuntil(io, 0x00)) + +# Late addition for MmapIO that can't be defined in mmapio.jl due to include ordering +function read_array!(v::Array{T}, f::JLDFile{MmapIO}, ::ReadRepresentation{T,T}) where T + io = f.io + inptr = io.curptr + n = length(v) + nb = odr_sizeof(T)*n + if nb > MMAP_CUTOFF && (!Sys.iswindows() || !f.written) + # It turns out that regular IO is faster here (at least on OS X), but on Windows, + # we shouldn't use ordinary IO to read, since coherency with the memory map is not + # guaranteed + mmapio = f.io + regulario = mmapio.f + seek(regulario, inptr - io.startptr) + unsafe_read(regulario, pointer(v), nb) + else + unsafe_copyto!(pointer(v), pconvert(Ptr{T}, inptr), n) + end + io.curptr = inptr + nb + v +end \ No newline at end of file diff --git a/src/io/io_wrappers.jl b/src/io/io_wrappers.jl new file mode 100644 index 00000000..50dbc8b1 --- /dev/null +++ b/src/io/io_wrappers.jl @@ -0,0 +1,164 @@ +## Requirements for an IO object +#= + +Implement for Reading: +Base.position(io)::Int +Base.bufferpos(io)::Int (if applicable) +Base.close(io) +Base.seek(io, pos::Int) +read_bytestring(io)::String # read a null-terminated byte string +Base.skip(io, n::Int) +_read(io, ::Type{T}) for [U]Int8 and `PlainType` +_write(io, x::T) for [U]Int8 and `PlainType` +begin_checksum_read(io) # see existing implementations +end_checksum(io) +=# + +## Create an IO object which wraps a non-seekable read-only buffer +const MINBUFFERSIZE = 2^9 # should maybe be 2^16 + +mutable struct ReadOnlyBuffer{B <: IO} <: MemoryBackedIO + _buf::B + offset::UInt64 # position of file start in wrapped stream + data::Vector{UInt8} + startptr::Ptr{Cvoid} + curptr::Ptr{Cvoid} + size::UInt64 + checksum_pos::Vector{Int64} + nchecksum::Int64 + function ReadOnlyBuffer(_buf::IO) + offset = position(_buf) + nb = min(MINBUFFERSIZE, bytesavailable(_buf)) + data = read(_buf, nb) + curptr = startptr = pointer(data) + new{typeof(_buf)}(_buf, offset, data, + startptr, curptr, length(data), Int[], 0) + end +end + +Base.position(io::ReadOnlyBuffer) = Int(io.curptr-io.startptr) +bufferpos(io::ReadOnlyBuffer) = Int(io.curptr-io.startptr) +Base.close(::ReadOnlyBuffer) = 
nothing + +function Base.resize!(io::ReadOnlyBuffer, newend::Integer) + newend < io.size && return + nb = min(bytesavailable(io._buf), max(newend-io.size, MINBUFFERSIZE)) + bts = read(io._buf, nb) + append!(io.data, bts) + io.size += length(bts) + newend ≤ io.size || throw(EOFError()) + # update pointers + oldstart = io.startptr + io.startptr = pointer(io.data) + io.curptr = io.curptr - oldstart + io.startptr + nothing +end +Base.resize!(io::ReadOnlyBuffer, p::Ptr) = resize!(io, p - io.startptr) + +ensureroom(io::ReadOnlyBuffer, n::Integer) = resize!(io, bufferpos(io) + n) + +# Read a null-terminated string +function read_bytestring(io::ReadOnlyBuffer) + nb = 0 + while true + idx = position(io)+1+nb + idx > io.size && resize!(io, idx) + io.data[idx] == 0x00 && break + nb += 1 + end + pos = position(io) + v = io.data[pos+1 : pos+nb] + skip(io, nb+1) + return String(v) +end + +function begin_checksum_read(io::ReadOnlyBuffer) + idx = io.nchecksum += 1 + if idx > length(io.checksum_pos) + push!(io.checksum_pos, position(io)) + else + io.checksum_pos[idx] = position(io) + end + io +end + +function end_checksum(io::ReadOnlyBuffer) + v = io.checksum_pos[io.nchecksum] + io.nchecksum -= 1 + Lookup3.hash(Ptr{UInt8}(io.startptr + v), position(io) - v) +end + +########################################################################################### +## RWBuffer +########################################################################################### + +mutable struct RWBuffer{B <: IO} <: IO + _buf::B + offset::UInt64 # position of file start in wrapped stream + pos::UInt64 + size::UInt64 + RWBuffer(_buf::IO) = new{typeof(_buf)}(_buf, position(_buf), 0, _buf.size) +end + +Base.position(io::RWBuffer) = Int(io.pos) +Base.close(::RWBuffer) = nothing +function truncate_and_close(io::RWBuffer, endpos::Integer) + #truncate(io, endpos) + close(io) +end + +function Base.seek(io::RWBuffer, n::Integer) + n > io.size && resize!(io, n) + seek(io._buf, n + io.offset) + @assert position(io._buf) == n+io.offset + io.pos = n +end + +function Base.resize!(io::RWBuffer, newend::Integer) + newend < io.size && return + buf = io._buf + pos = position(buf) + seek(buf, io.size+io.offset) + write(buf, zeros(UInt8, newend-io.size)) + seek(buf, pos) + io.size = newend +end +ensureroom(io::RWBuffer, n::Integer) = resize!(io, position(io) + n) + +# Read a null-terminated string +function read_bytestring(io::RWBuffer) + v = readuntil(io._buf, 0x00) + io.pos = position(io._buf) - io.offset + return String(v) +end + +Base.skip(io::RWBuffer, offset::Integer) = seek(io, position(io)+offset) + +function _read(io::RWBuffer, T::DataType) + n = jlsizeof(T) + ensureroom(io, n) + io.pos += jlsizeof(T) + read(io._buf, T) +end +# needed explicitly for avoiding ambiguities +Base.read(io::RWBuffer, T::Type{UInt8}) = _read(io, T) +Base.read(io::RWBuffer, T::PlainType) = _read(io, T) +Base.write(io::RWBuffer, x::UInt8) = _write(io, x) +jlwrite(io::RWBuffer, x::String) = _write(io, x) + +function jlwrite(io::RWBuffer, x::Array{T}) where T + for y in x + jlwrite(io, y) + end + return length(x) * jlsizeof(T) +end + +function _write(io::RWBuffer, x) + posprev = position(io) + jlwrite(io._buf, x) + io.pos = position(io._buf) - io.offset + io.size = max(io.size, io.pos) + return io.pos-posprev +end + +Base.bytesavailable(io::RWBuffer) = io.size-io.pos \ No newline at end of file diff --git a/src/mmapio.jl b/src/io/mmapio.jl similarity index 63% rename from src/mmapio.jl rename to src/io/mmapio.jl index f003c7c9..45c80ce6 100644 --- a/src/mmapio.jl 
+++ b/src/io/mmapio.jl @@ -10,13 +10,7 @@ else const FILE_GROW_SIZE = 2^18 end -const Plain = Union{Int16,Int32,Int64,Int128,UInt16,UInt32,UInt64,UInt128,Float16,Float32, - Float64} -const PlainType = Union{Type{Int16},Type{Int32},Type{Int64},Type{Int128},Type{UInt16}, - Type{UInt32},Type{UInt64},Type{UInt128},Type{Float16}, - Type{Float32},Type{Float64}} - -mutable struct MmapIO <: IO +mutable struct MmapIO <: MemoryBackedIO f::IOStream write::Bool n::Int @@ -54,6 +48,8 @@ if Sys.isunix() offset_page::Int64 = div(offset, Mmap.PAGESIZE) * Mmap.PAGESIZE # add (offset - offset_page) to `len` to get total length of memory-mapped region mmaplen::Int64 = (offset - offset_page) + len + # Edge case: calling msync with 0 mmaplen fails on mac + mmaplen == 0 && return nothing systemerror("msync", ccall(:msync, Cint, (Ptr{Cvoid}, Csize_t, Cint), io.startptr + offset_page, mmaplen, @@ -124,19 +120,16 @@ end Base.show(io::IO, ::MmapIO) = print(io, "MmapIO") -if Sys.islinux() - # This is used to be "pwrite" but that is actually not defined behaviour - # and fails on NFS systems. "ftruncate" is recommended instead - # TODO: Benchmark on Windows - grow(io::IOStream, sz::Integer) = +function grow(io::IOStream, sz::Integer) + @static if Sys.islinux() systemerror("ftruncate", - ccall(:ftruncate, Cint, (Cint, Int64), - fd(io), sz) != 0) - -else - grow(io::IOStream, sz::Integer) = truncate(io, sz) + ccall(:ftruncate, Cint, (Cint, Int64), fd(io), sz) != 0) + else + truncate(io, sz) + end end + function Base.resize!(io::MmapIO, newend::Ptr{Cvoid}) io.write || throw(EOFError()) @@ -158,12 +151,9 @@ function Base.resize!(io::MmapIO, newend::Ptr{Cvoid}) io end -@inline function ensureroom(io::MmapIO, n::Int) +function ensureroom(io::MmapIO, n::Integer) ep = io.curptr + n - if ep > io.endptr - resize!(io, ep) - end - nothing + ep > io.endptr && resize!(io, ep) end function truncate_and_close(io::MmapIO, endpos::Integer) @@ -179,105 +169,46 @@ function Base.close(io::MmapIO) close(io.f) end -@inline function _write(io::MmapIO, x) - cp = io.curptr - ep = cp + jlsizeof(x) - if ep > io.endptr - resize!(io, ep) - cp = io.curptr - ep = cp + jlsizeof(x) - end - jlunsafe_store!(Ptr{typeof(x)}(cp), x) - io.curptr = ep - return jlsizeof(x) -end -@inline Base.write(io::MmapIO, x::UInt8) = _write(io, x) -@inline Base.write(io::MmapIO, x::Int8) = _write(io, x) -@inline Base.write(io::MmapIO, x::Plain) = _write(io, x) - -function Base.unsafe_write(io::MmapIO, x::Ptr{UInt8}, n::UInt) - cp = io.curptr - ep = cp + n - if ep > io.endptr - resize!(io, ep) - cp = io.curptr - ep = cp + n - end - unsafe_copyto!(Ptr{UInt8}(cp), x, n) - io.curptr = ep - return n -end +@static if Sys.isunix() + function Base.unsafe_write(io::MmapIO, ptr::Ptr{UInt8}, nb::UInt) + if nb > MMAP_CUTOFF + pos = position(io) -@inline function _read(io::MmapIO, T::DataType) - cp = io.curptr - ep = cp + jlsizeof(T) - ep > io.endptr && throw(EOFError()) - v = jlunsafe_load(Ptr{T}(cp)) - io.curptr = ep - v -end -@inline Base.read(io::MmapIO, T::Type{UInt8}) = _read(io, T) -@inline Base.read(io::MmapIO, T::Type{Int8}) = _read(io, T) -@inline Base.read(io::MmapIO, T::PlainType) = _read(io, T) - -function Base.read(io::MmapIO, ::Type{T}, n::Int) where T - cp = io.curptr - ep = cp + jlsizeof(T)*n - ep > io.endptr && throw(EOFError()) - arr = Vector{T}(undef, n) - unsafe_copyto!(pointer(arr), Ptr{T}(cp), n) - io.curptr = ep - arr + # Ensure that the current page has been flushed to disk + msync(io, pos, min(io.endptr - io.curptr, nb)) + + # Write to the underlying IOStream 
+ regulario = io.f + seek(regulario, pos) + unsafe_write(regulario, ptr, nb) + + # Invalidate cache of any pages that were just written to + msync(io, pos, min(io.n - pos, nb), true) + + # Make sure the mapping encompasses the written data + ensureroom(io, nb + 1) + + # Seek to the place we just wrote + seek(io, pos + nb) + else + ensureroom(io, nb) + unsafe_copyto!(Ptr{UInt8}(io.curptr), ptr, nb) + io.curptr += nb + end + nb + end end -Base.read(io::MmapIO, ::Type{T}, n::Integer) where {T} = read(io, T, Int(n)) -jlread(io::MmapIO, ::Type{T}, n::Integer) where {T} = read(io, T, Int(n)) # Read a null-terminated string function read_bytestring(io::MmapIO) # TODO do not try to read outside the buffer - cp = io.curptr - str = unsafe_string(pconvert(Ptr{UInt8}, cp)) - io.curptr = cp + jlsizeof(str) + 1 + str = unsafe_string(pconvert(Ptr{UInt8}, io.curptr)) + io.curptr += jlsizeof(str) + 1 str end -@inline function Base.seek(io::MmapIO, offset::Integer) - if io.startptr + offset > io.endptr - resize!(io, io.startptr + offset) - end - io.curptr = io.startptr + offset - nothing -end - -@inline function Base.skip(io::MmapIO, offset::Integer) - if io.curptr + offset > io.endptr - resize!(io, io.curptr + offset) - end - io.curptr += offset - nothing -end - Base.position(io::MmapIO) = Int64(io.curptr - io.startptr) - -""" - IndirectPointer - -When writing data, we may need to enlarge the memory mapping, which would invalidate any -memory addresses arising from the old `mmap` pointer. `IndirectPointer` holds a pointer to -the `startptr` field of an `MmapIO`, and the offset relative to that pointer. It defers -computing a memory address until converted to a `Ptr{T}`, so the memory mapping can be -enlarged and addresses will remain valid. -""" -struct IndirectPointer - ptr::Ptr{Ptr{Cvoid}} - offset::Int -end - -function IndirectPointer(io::MmapIO, offset::Integer=position(io)) - IndirectPointer(pointer_from_objref(io) + fieldoffset(MmapIO, 4), offset) -end -Base.:+(x::IndirectPointer, y::Integer) = IndirectPointer(x.ptr, x.offset+y) -pconvert(::Type{Ptr{T}}, x::IndirectPointer) where {T} = Ptr{T}(jlunsafe_load(x.ptr) + x.offset) +bufferpos(io::MmapIO) = Int64(io.curptr - io.startptr) # We sometimes need to compute checksums. We do this by first calling begin_checksum when # starting to handle whatever needs checksumming, and calling end_checksum afterwards. Note @@ -293,12 +224,14 @@ function begin_checksum_read(io::MmapIO) end io end + function begin_checksum_write(io::MmapIO, sz::Integer) ensureroom(io, sz) begin_checksum_read(io) end + function end_checksum(io::MmapIO) @inbounds v = io.checksum_pos[io.nchecksum] io.nchecksum -= 1 Lookup3.hash(Ptr{UInt8}(io.startptr + v), position(io) - v) -end +end \ No newline at end of file diff --git a/src/loadsave.jl b/src/loadsave.jl index af2ba341..133d8497 100644 --- a/src/loadsave.jl +++ b/src/loadsave.jl @@ -1,4 +1,4 @@ -function jldopen(@nospecialize(f::Function), args...; kws...) +@nospecializeinfer function jldopen(@nospecialize(f::Function), args...; kws...) jld = jldopen(args...; kws...) 
try return f(jld) diff --git a/src/macros_utils.jl b/src/macros_utils.jl index 80ee8c2d..f19c1359 100644 --- a/src/macros_utils.jl +++ b/src/macros_utils.jl @@ -1,40 +1,3 @@ - -# Redefine unsafe_load, unsafe_store!, read, and write so that they pack the type -function define_packed(ty::DataType) - @assert isbitstype(ty) - packed_offsets = cumsum([jlsizeof(x) for x in ty.types]) - sz = pop!(packed_offsets) - pushfirst!(packed_offsets, 0) - - if sz != jlsizeof(ty) - @eval begin - function jlunsafe_store!(p::Ptr{$ty}, x::$ty) - $([:(jlunsafe_store!(pconvert(Ptr{$(ty.types[i])}, p+$(packed_offsets[i])), getfield(x, $i))) - for i = 1:length(packed_offsets)]...) - end - function jlunsafe_load(p::Ptr{$ty}) - $(Expr(:new, ty, [:(jlunsafe_load(pconvert(Ptr{$(ty.types[i])}, p+$(packed_offsets[i])))) - for i = 1:length(packed_offsets)]...)) - end - jlsizeof(::Union{$ty,Type{$ty}}) = $(Int(sz))::Int - end - end - - @eval begin - @inline jlwrite(io::Union{MmapIO,BufferedWriter}, x::$ty) = _write(io, x) - @inline jlread(io::Union{MmapIO,BufferedReader}, x::Type{$ty}) = _read(io, x) - function jlread(io::IO, ::Type{$ty}) - $(Expr(:new, ty, [:(jlread(io, $(ty.types[i]))) for i = 1:length(packed_offsets)]...)) - end - function jlwrite(io::IO, x::$ty) - $([:(jlwrite(io, getfield(x, $i))) for i = 1:length(packed_offsets)]...) - nothing - end - end - nothing -end - - ########################################################################################### # The following macro is used for declarative definition of Header Messages ########################################################################################### @@ -71,12 +34,14 @@ macro pseudostruct(name, blck) return keyvalue end - function $(esc(:(Base.getproperty)))(tw::HmWrap{$name}, s::Symbol) - s in (:size, :hflags, :m) && return getfield(tw, s) - m = getfield(tw, :m) + function $(esc(:(Base.getproperty)))(tw::HmWrap{$name, iot}, s::Symbol) where iot + s == :size && return getfield(tw, s) + s ==:hflags && return getfield(tw, s) + s==:m && return getfield(tw, s) + m = getfield(tw, :m)::Message{iot} hflags = getfield(tw, :hflags) hsize = getfield(tw, :size) - io = getfield(m, :io) + io = getfield(m, :io)::iot $(get_prop_exprs) throw(ArgumentError("property $s not found")) end @@ -134,7 +99,7 @@ function linefun(ex) write_statement = :(jlwrite(io, $(esc(s)))) elseif @capture(T, @Int(len_)) len = esc(len) - read_io = :(read_nb_uint($io, $(len))) + read_io = :(read_nb_uint($io, $(len))::UInt64) increment = :($len) write_statement = :(write_nb_int(io, $(esc(s)), $len)) elseif @capture(T, @Offset) @@ -211,10 +176,10 @@ function generate_getprop(exprs, fields=[], precond=true) read_io = :(String(jlread($io, UInt8, $(len)))) increment = :($len) elseif @capture(T, @Blob(len_)) - read_io = :(jlread($io, UInt8, $(len))) + read_io = :(jlread($io, UInt8, $(len))::Vector{UInt8}) increment = :($len) elseif @capture(T, @Int(len_)) - read_io = :(read_nb_uint($io, $len)) + read_io = :(read_nb_uint($io, $len)::UInt64) increment = :($len) elseif @capture(T, @Offset) read_io = :(RelOffset(getfield(m,:offset) + position($io) - getfield(m,:address))) @@ -226,7 +191,7 @@ function generate_getprop(exprs, fields=[], precond=true) read_io = :(jlread($io, $(type))) increment = !isnothing(rsize) ? 
rsize : :(sizeof(typeof(s))) else - read_io = :(jlread($io, $T)) + read_io = :(jlread($io, $T)::$(T)) increment = :(sizeof($(T))) end diff --git a/src/misc.jl b/src/misc.jl index 0d4b06c8..db4a3142 100644 --- a/src/misc.jl +++ b/src/misc.jl @@ -208,7 +208,7 @@ function write_nb_int(io::IO, sz::Integer, nb::Integer) end function read_nb_uint(io::IO, nb) - val = zero(UInt) + val = zero(UInt64) for i = 1:nb val += jlread(io, UInt8) * 2^(8*(i-1)) end diff --git a/src/types.jl b/src/types.jl index 04905020..01bf4f02 100644 --- a/src/types.jl +++ b/src/types.jl @@ -1,3 +1,61 @@ +""" + MemoryBackedIO <: IO + +Abstract type for IO objects that are backed by memory in such a way that +one can use pointer based `unsafe_load` and `unsafe_store!` operations +after ensuring that there is enough memory allocated. + +It needs to provide: + - `getproperty(io, :curptr)` to get the current pointer + - `ensureroom(io, nb)` to ensure that there are at least nb bytes available + - `position(io)` to get the current (zero-based) position + - `seek(io, pos)` to set the current position (zero-based) +""" +abstract type MemoryBackedIO <: IO end + + + +# Define custom `jlread`, `jlwrite`, `jlunsafe_load`, `jlunsafe_store!` functions for a struct +# this needs to be called immediately after the struct definition (since it itself calls `jlsizeof`) +function define_packed(ty::DataType) + @assert isbitstype(ty) + packed_offsets = cumsum([jlsizeof(x) for x in ty.types]) + sz = pop!(packed_offsets) + pushfirst!(packed_offsets, 0) + + if sz != jlsizeof(ty) + @eval begin + function jlunsafe_store!(p::Ptr{$ty}, x::$ty) + $([:(jlunsafe_store!(pconvert(Ptr{$(ty.types[i])}, p+$(packed_offsets[i])), getfield(x, $i))) + for i = 1:length(packed_offsets)]...) + end + function jlunsafe_load(p::Ptr{$ty}) + $(Expr(:new, ty, [:(jlunsafe_load(pconvert(Ptr{$(ty.types[i])}, p+$(packed_offsets[i])))) + for i = 1:length(packed_offsets)]...)) + end + jlsizeof(::Union{$ty,Type{$ty}}) = $(Int(sz))::Int + end + end + + @eval begin + @inline jlwrite(io::MemoryBackedIO, x::$ty) = _write(io, x) + @inline jlread(io::MemoryBackedIO, x::Type{$ty}) = _read(io, x) + function jlread(io::IO, ::Type{$ty}) + $(Expr(:new, ty, [:(jlread(io, $(ty.types[i]))) for i = 1:length(packed_offsets)]...)) + end + function jlwrite(io::IO, x::$ty) + $([:(jlwrite(io, getfield(x, $i))) for i = 1:length(packed_offsets)]...) + nothing + end + end + nothing +end + +jlsizeof(x) = Base.sizeof(x) +jlunsafe_store!(p, x) = Base.unsafe_store!(p, x) +jlunsafe_load(p) = Base.unsafe_load(p) + + ## Signatures const OBJECT_HEADER_SIGNATURE = htol(0x5244484f) # "OHDR" const OH_ATTRIBUTE_CREATION_ORDER_TRACKED = 2^2 @@ -14,6 +72,7 @@ const OBJECT_HEADER_CONTINUATION_SIGNATURE = htol(0x4b48434f) # "OCHK" LcVirtual = 0x03 end LayoutClass(lc::LayoutClass) = lc +jlwrite(io, lc::LayoutClass) = jlwrite(io, UInt8(lc)) @enum(CharacterSet::UInt8, CSET_ASCII, @@ -258,3 +317,24 @@ Message(type::HeaderMessageType, f, offset::RelOffset) = Message(type::HeaderMessageType, io::IO) = Message(type, position(io), UNDEFINED_ADDRESS, io) Message(type::HeaderMessageType, data::Vector{UInt8}) = Message(type, 0, UNDEFINED_ADDRESS, IOBuffer(data)) + +""" + IndirectPointer + +When writing data, we may need to enlarge the memory mapping, which would invalidate any +memory addresses arising from the old `mmap` pointer. `IndirectPointer` holds an offset relative to the +MemoryBackedIO. 
It defers computing a memory address until converted to a `Ptr{T}`, +so the memory mapping can be enlarged and addresses will remain valid. +""" +struct IndirectPointer{P<:MemoryBackedIO} + io::P + offset::Int +end + +IndirectPointer(io::MemoryBackedIO) = IndirectPointer(io, Int(bufferpos(io))) + +Base.:+(x::IndirectPointer, y::Integer) = IndirectPointer(x.io, x.offset+y) +pconvert(::Type{Ptr{T}}, x::IndirectPointer) where {T} = Ptr{T}(x.io.curptr - bufferpos(x.io) + x.offset) + +# Use internal convert function (for pointer conversion) to avoid invalidations +pconvert(T, x) = Base.convert(T, x) \ No newline at end of file diff --git a/test/internal.jl b/test/internal.jl index ff348993..b4a99002 100644 --- a/test/internal.jl +++ b/test/internal.jl @@ -2,7 +2,7 @@ using JLD2, Test function writeloop(f, sz) for i = 1:sz - write(f, UInt8(6)) + JLD2.jlwrite(f, UInt8(6)) end end @@ -21,7 +21,7 @@ rd = read!(f, Array{UInt8}(undef, sz)) close(f) f = JLD2.MmapIO(name, true, true, true) -write(f, rd) +JLD2.jlwrite(f, rd) JLD2.truncate_and_close(f, position(f)) @test filesize(name) == sz diff --git a/test/runtests.jl b/test/runtests.jl index a6534548..c5424344 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -32,4 +32,5 @@ include("compression.jl") include("test_files.jl") include("unpack_test.jl") include("dataset_api.jl") -include("mmap_test.jl") \ No newline at end of file +include("mmap_test.jl") +include("wrapped_io.jl") diff --git a/test/rw.jl b/test/rw.jl index 2f8ef2ff..cdf9cf28 100644 --- a/test/rw.jl +++ b/test/rw.jl @@ -1,48 +1,36 @@ using JLD2, Test, LinearAlgebra, Random -macro read(fid, sym) - if !isa(sym, Symbol) - error("Second input to @read must be a symbol (i.e., a variable)") - end - esc(:($sym = read($fid, $(string(sym))))) -end -macro write(fid, sym) - if !isa(sym, Symbol) - error("Second input to @write must be a symbol (i.e., a variable)") - end - esc(:(write($fid, $(string(sym)), $sym))) -end - +data = Dict{String, Any}() # Seed random so that we get the same values every time Random.seed!(1337) # Define variables of different types -x = 3.7 -A = reshape(1:15, 3, 5) -A3 = reshape(1:30, 3, 5, 2) -A4 = reshape(1:120, 2, 3, 4, 5) -Aarray = Vector{Float64}[[1.2,1.3],[2.2,2.3,2.4]] -basic_types = Any[UInt8(42), UInt16(42), UInt32(42), UInt64(42), UInt128(42), +data["x"] = 3.7 +data["A"] = A = reshape(1:15, 3, 5) +data["A3"] = reshape(1:30, 3, 5, 2) +data["A4"] = reshape(1:120, 2, 3, 4, 5) +data["Aarray"] = Vector{Float64}[[1.2,1.3],[2.2,2.3,2.4]] +data["basic_types"] = Any[UInt8(42), UInt16(42), UInt32(42), UInt64(42), UInt128(42), Int8(42), Int16(42), Int32(42), Int64(42), Int128(42), Float16(42), Float32(42), Float64(42)] -str = "Hello" -str_unicode = "pandapanda🐼panda" -str_embedded_null = "there is a null\0 in the middle of this string" -strings = String["It", "was", "a", "dark", "and", "stormy", "night"] -empty_string = "" -empty_string_array = String[] -empty_array_of_strings = String[""] -tf = true -TF = A .> 10 -B = [-1.5 sqrt(2) NaN 6; +data["str"] = "Hello" +data["str_unicode"] = "pandapanda🐼panda" +data["str_embedded_null"] = "there is a null\0 in the middle of this string" +data["strings"] = String["It", "was", "a", "dark", "and", "stormy", "night"] +data["empty_string"] = "" +data["empty_string_array"] = String[] +data["empty_array_of_strings"] = String[""] +data["tf"] = true +data["TF"] = A .> 10 +data["B"] = B = [-1.5 sqrt(2) NaN 6; 0.0 Inf eps() -Inf] -AB = Any[A, B] -t = (3, "cat") -c = Complex{Float32}(3,7) -cint = 1+im # issue 108 -C = 
reshape(reinterpret(Complex{Float64}, B), (4,)) -emptyA = zeros(0,2) -emptyB = zeros(2,0) +data["AB"] = Any[A, B] +data["t"] = (3, "cat") +data["c"] = Complex{Float32}(3,7) +data["cint"] = 1+im # issue 108 +data["C"] = reshape(reinterpret(Complex{Float64}, B), (4,)) +data["emptyA"] = zeros(0,2) +data["emptyB"] = zeros(2,0) try global MyStruct mutable struct MyStruct @@ -53,84 +41,83 @@ try end catch end -ms = MyStruct(2, [3.2, -1.7]) -msempty = MyStruct(5, Float64[]) -sym = :TestSymbol -syms = [:a, :b] -d = Dict([(syms[1],"aardvark"), (syms[2], "banana")]) -oidd = IdDict([(syms[1],"aardvark"), (syms[2], "banana")]) -# imdd = Base.ImmutableDict(syms[1]=>"aardvark", syms[2]=>"banana") +data["ms"] = MyStruct(2, [3.2, -1.7]) +data["msempty"] = MyStruct(5, Float64[]) +data["sym"] = :TestSymbol +data["syms"] = syms = [:a, :b] +data["d"] = Dict([(syms[1],"aardvark"), (syms[2], "banana")]) +data["oidd"] = IdDict([(syms[1],"aardvark"), (syms[2], "banana")]) # Incremental construction due to lacking constructor method in v1.0 -imdd = Base.ImmutableDict(Base.ImmutableDict(syms[1]=>"aardvark"), syms[2]=>"banana") -ex = quote +data["imdd"] = Base.ImmutableDict(Base.ImmutableDict(syms[1]=>"aardvark"), syms[2]=>"banana") +data["ex"] = quote function incrementby1(x::Int) x+1 end end -T = UInt8 -Tarr = DataType[UInt8, UInt16, UInt32, UInt64, UInt128] -char = 'x' -unicode_char = '\U10ffff' -α = 22 -β = Any[[1, 2], [3, 4]] # issue #93 -vv = Vector{Int}[[1,2,3]] # issue #123 -typevar = Array{Int}[[1]] -typevar_lb = (Vector{U} where U<:Integer)[[1]] -typevar_ub = (Vector{U} where U>:Int)[[1]] -typevar_lb_ub = (Vector{U} where Int<:U<:Real)[[1]] -arr_undef = Vector{Any}(undef, 1) -arr_undefs = Matrix{Any}(undef, 2, 2) -ms_undef = MyStruct(0) +data["T"] = UInt8 +data["Tarr"] = DataType[UInt8, UInt16, UInt32, UInt64, UInt128] +data["char"] = 'x' +data["unicode_char"] = '\U10ffff' +data["α"] = 22 +data["β"] = Any[[1, 2], [3, 4]] # issue #93 +data["vv"] = Vector{Int}[[1,2,3]] # issue #123 +data["typevar"] = Array{Int}[[1]] +data["typevar_lb"] = (Vector{U} where U<:Integer)[[1]] +data["typevar_ub"] = (Vector{U} where U>:Int)[[1]] +data["typevar_lb_ub"] = (Vector{U} where Int<:U<:Real)[[1]] +data["arr_undef"] = Vector{Any}(undef, 1) +data["arr_undefs"] = Matrix{Any}(undef, 2, 2) +data["ms_undef"] = MyStruct(0) # Unexported type: -version_info = Base.GIT_VERSION_INFO +data["version_info"] = Base.GIT_VERSION_INFO # Immutable type: -rng = 1:5 +data["rng"] = 1:5 # Custom BitsType (#99) primitive type MyBT 64 end -bt = reinterpret(MyBT, Int64(55)) -btarray = fill(bt, 5, 7) +data["bt"] = bt = reinterpret(MyBT, Int64(55)) +data["btarray"] = fill(bt, 5, 7) # Symbol arrays (#100) -sa_asc = [:a, :b] -sa_utf8 = [:α, :β] +data["sa_asc"] = [:a, :b] +data["sa_utf8"] = [:α, :β] # SubArray (to test tuple type params) -subarray = view([1:5;], 1:5) +data["subarray"] = view([1:5;], 1:5) # Array of empty tuples (to test tuple type params) -arr_empty_tuple = Tuple{}[] +data["arr_empty_tuple"] = Tuple{}[] struct EmptyImmutable end -emptyimmutable = EmptyImmutable() -arr_emptyimmutable = [emptyimmutable] -empty_arr_emptyimmutable = EmptyImmutable[] +data["emptyimmutable"] = emptyimmutable = EmptyImmutable() +data["arr_emptyimmutable"] = [emptyimmutable] +data["empty_arr_emptyimmutable"] = EmptyImmutable[] mutable struct EmptyType end -emptytype = EmptyType() -arr_emptytype = [emptytype] -empty_arr_emptytype = EmptyImmutable[] -uninitialized_arr_emptytype = Vector{EmptyType}(undef, 1) +data["emptytype"] = EmptyType() +data["arr_emptytype"] 
= [EmptyType()] +data["empty_arr_emptytype"] = EmptyImmutable[] +data["uninitialized_arr_emptytype"] = Vector{EmptyType}(undef, 1) struct EmptyII x::EmptyImmutable end -emptyii = EmptyII(EmptyImmutable()) +data["emptyii"] = EmptyII(EmptyImmutable()) struct EmptyIT x::EmptyType end -emptyit = EmptyIT(EmptyType()) +data["emptyit"] = EmptyIT(EmptyType()) mutable struct EmptyTI x::EmptyImmutable end -emptyti = EmptyTI(EmptyImmutable()) +data["emptyti"] = EmptyTI(EmptyImmutable()) mutable struct EmptyTT x::EmptyType end -emptytt = EmptyTT(EmptyType()) +data["emptytt"] = EmptyTT(EmptyType()) struct EmptyIIOtherField x::EmptyImmutable y::Float64 end -emptyiiotherfield = EmptyIIOtherField(EmptyImmutable(), 5.0) +data["emptyiiotherfield"] = EmptyIIOtherField(EmptyImmutable(), 5.0) struct EmptyIIType x::Type{Float64} y::EmptyImmutable end -emptyiitype = EmptyIIType(Float64, EmptyImmutable()) +data["emptyiitype"] = EmptyIIType(Float64, EmptyImmutable()) # Unicode type field names (#118) mutable struct MyUnicodeStruct☺{τ} @@ -138,48 +125,48 @@ mutable struct MyUnicodeStruct☺{τ} ∂ₓα::τ MyUnicodeStruct☺{τ}(α::τ, ∂ₓα::τ) where τ = new(α, ∂ₓα) end -unicodestruct☺ = MyUnicodeStruct☺{Float64}(1.0, -1.0) +data["unicodestruct☺"] = MyUnicodeStruct☺{Float64}(1.0, -1.0) # Arrays of matrices (#131) -array_of_matrices = Matrix{Int}[[1 2; 3 4], [5 6; 7 8]] +data["array_of_matrices"] = Matrix{Int}[[1 2; 3 4], [5 6; 7 8]] # Tuple of arrays and bitstype -tup = (1, 2, [1, 2], [1 2; 3 4], bt) +data["tup"] = (1, 2, [1, 2], [1 2; 3 4], bt) # Empty tuple -empty_tup = () +data["empty_tup"] = () # Non-pointer-free struct struct MyImmutable{T} x::Int y::Vector{T} z::Bool end -nonpointerfree_immutable_1 = MyImmutable(1, [1., 2., 3.], false) -nonpointerfree_immutable_2 = MyImmutable(2, Any[3., 4., 5.], true) +data["nonpointerfree_immutable_1"] = MyImmutable(1, [1., 2., 3.], false) +data["nonpointerfree_immutable_2"] = MyImmutable(2, Any[3., 4., 5.], true) struct MyImmutable2 x::Vector{Int} MyImmutable2() = new() end -nonpointerfree_immutable_3 = MyImmutable2() +data["nonpointerfree_immutable_3"] = MyImmutable2() struct Vague name::String end -vague = Vague("foo") +data["vague"] = Vague("foo") # Immutable with a union of BitsTypes struct BitsUnion x::Union{Int64,Float64} end -bitsunion = BitsUnion(5.0) +data["bitsunion"] = BitsUnion(5.0) # Immutable with a union of Types struct TypeUnionField x::Type{T} where T<:Union{Int64,Float64} end -typeunionfield = TypeUnionField(Int64) +data["typeunionfield"] = TypeUnionField(Int64) # Generic union type field struct GenericUnionField x::Union{Vector{Int},Int} end -genericunionfield = GenericUnionField(1) +data["genericunionfield"] = GenericUnionField(1) # Array references -arr_contained = [1, 2, 3] -arr_ref = typeof(arr_contained)[] +data["arr_contained"] = arr_contained = [1, 2, 3] +data["arr_ref"] = arr_ref = typeof(arr_contained)[] push!(arr_ref, arr_contained, arr_contained) # Object references mutable struct ObjRefType @@ -188,59 +175,59 @@ mutable struct ObjRefType ObjRefType() = new() ObjRefType(x, y) = new(x, y) end -ref1 = ObjRefType() -obj_ref = ObjRefType(ObjRefType(ref1, ref1), ObjRefType(ref1, ref1)) +ref1 =ObjRefType() +data["obj_ref"] = ObjRefType(ObjRefType(ref1, ref1), ObjRefType(ref1, ref1)) # Immutable that requires padding between elements in array struct PaddingTest x::Int64 y::Int8 end -padding_test = PaddingTest[PaddingTest(i, i) for i = 1:8] +data["padding_test"] = PaddingTest[PaddingTest(i, i) for i = 1:8] # Empty arrays of various types and sizes -empty_arr_1 = 
Int[] -empty_arr_2 = Matrix{Int}(undef, 56, 0) -empty_arr_3 = Any[] -empty_arr_4 = Matrix{Any}(undef, 0, 97) +data["empty_arr_1"] = Int[] +data["empty_arr_2"] = Matrix{Int}(undef, 56, 0) +data["empty_arr_3"] = Any[] +data["empty_arr_4"] = Matrix{Any}(undef, 0, 97) # Moderately big dataset (which will be mmapped) -bigdata = [1:1000000;] +data["bigdata"] = [1:1000000;] # BigFloats and BigInts -bigint = big(3) -bigfloat = big(3.2) -bigints = big(3).^(1:100) -bigfloats = big(3.2).^(1:100) +data["bigint"] = big(3) +data["bigfloat"] = big(3.2) +data["bigints"] = big(3).^(1:100) +data["bigfloats"] = big(3.2).^(1:100) struct BigFloatIntObject bigfloat::BigFloat bigint::BigInt end -bigfloatintobj = BigFloatIntObject(big(pi), big(typemax(UInt128))+1) +data["bigfloatintobj"] = BigFloatIntObject(big(pi), big(typemax(UInt128))+1) # None -none = Union{} -nonearr = Vector{Union{}}(undef, 5) +data["none"] = Union{} +data["nonearr"] = Vector{Union{}}(undef, 5) # nothing/Nothing -scalar_nothing = nothing -vector_nothing = Union{Int,Nothing}[1,nothing] +data["scalar_nothing"] = nothing +data["vector_nothing"] = Union{Int,Nothing}[1,nothing] # some data big enough to ensure that compression is used: -Abig = kron(Matrix{Float64}(I, 10, 10), rand(20, 20)) -Bbig = Any[i for i=1:3000] -Sbig = "A test string "^1000 +data["Abig"] = kron(Matrix{Float64}(I, 10, 10), rand(20, 20)) +data["Bbig"] = Any[i for i=1:3000] +data["Sbig"] = "A test string "^1000 # Bitstype type parameters mutable struct BitsParams{x}; end -bitsparamfloat = BitsParams{1.0}() -bitsparambool = BitsParams{true}() -bitsparamsymbol = BitsParams{:x}() -bitsparamint = BitsParams{1}() -bitsparamuint = BitsParams{0x01}() -bitsparamint16 = BitsParams{Int16(1)}() +data["bitsparamfloat"] = BitsParams{1.0}() +data["bitsparambool"] = BitsParams{true}() +data["bitsparamsymbol"] = BitsParams{:x}() +data["bitsparamint"] = BitsParams{1}() +data["bitsparamuint"] = BitsParams{0x01}() +data["bitsparamint16"] = BitsParams{Int16(1)}() # Tuple of tuples -tuple_of_tuples = (1, 2, (3, 4, [5, 6]), [7, 8]) +data["tuple_of_tuples"] = (1, 2, (3, 4, [5, 6]), [7, 8]) # Zero-dimensional arrays -zerod = Array{Int}(undef) +data["zerod"] = zerod = Array{Int}(undef) zerod[] = 1 -zerod_any = Array{Any}(undef) +data["zerod_any"] = zerod_any = Array{Any}(undef) zerod_any[] = 1.0+1.0im # Cyclic object @@ -250,11 +237,11 @@ mutable struct CyclicObject CyclicObject() = new() CyclicObject(x) = new(x) end -cyclicobject = CyclicObject() -cyclicobject.x = cyclicobject +data["cyclicobject"] = CyclicObject() +data["cyclicobject"].x = data["cyclicobject"] # SimpleVector -simplevec = Core.svec(1, 2, Int64, "foo") +data["simplevec"] = Core.svec(1, 2, Int64, "foo") iseq(x::Core.SimpleVector, y::Core.SimpleVector) = collect(x) == collect(y) # JLD issue #243 @@ -263,10 +250,10 @@ mutable struct NALikeType; end Base.:!=(::NALikeType, ::NALikeType) = NALikeType() Base.:!=(::NALikeType, ::Nothing) = NALikeType() Base.:!=(::Nothing, ::NALikeType) = NALikeType() -natyperef = Any[NALikeType(), NALikeType()] +data["natyperef"] = Any[NALikeType(), NALikeType()] # JLD2 issue #31 (lots of strings) -lotsastrings = fill("a", 100000) +data["lotsastrings"] = fill("a", 100000) iseq(x,y) = isequal(x,y) function iseq(x::Array{EmptyType}, y::Array{EmptyType}) @@ -297,28 +284,7 @@ iseq(x::Array{Union{}}, y::Array{Union{}}) = size(x) == size(y) iseq(x::BigFloatIntObject, y::BigFloatIntObject) = (x.bigfloat == y.bigfloat && x.bigint == y.bigint) iseq(x::T, y::T) where {T<:Union{EmptyType,EmptyImmutable,NALikeType}} = 
true iseq(x::BitsParams{T}, y::BitsParams{S}) where {T,S} = (T == S) -macro check(fid, sym) - ex = quote - let tmp - try - tmp = read($fid, $(string(sym))) - catch e - @show e - Base.show_backtrace(stdout, catch_backtrace()) - error("error reading ", $(string(sym))) - end - written_type = typeof($sym) - if typeof(tmp) != written_type - error("For ", $(string(sym)), ", read type $(typeof(tmp)) does not agree with written type $(written_type)") - end - if !iseq(tmp, $sym) - written = $sym - error("For ", $(string(sym)), ", read value $tmp does not agree with written value $written") - end - end - end - esc(ex) -end + # Test for equality of expressions, skipping line numbers checkexpr(a, b) = @assert a == b @@ -343,258 +309,91 @@ function checkexpr(a::Expr, b::Expr) end fn = joinpath(mktempdir(), "test.jld") -for ioty in [JLD2.MmapIO, IOStream], compress in [false, true] - @info("[$fn]: Using $(ioty), $(compress ? "compressed" : "uncompressed")") - @info(" Write time:") - fid = jldopen(fn, true, true, true, ioty, compress=compress) - @time begin - @write fid x - @write fid A - @write fid A3 - @write fid A4 - @write fid Aarray - @write fid basic_types - @write fid str - @write fid str_unicode - @write fid str_embedded_null - @write fid strings - @write fid empty_string - @write fid empty_string_array - @write fid empty_array_of_strings - @write fid tf - @write fid TF - @write fid AB - @write fid t - @write fid c - @write fid cint - @write fid C - @write fid emptyA - @write fid emptyB - @write fid ms - @write fid msempty - @write fid sym - @write fid syms - @write fid d - @write fid oidd - @write fid imdd - @write fid ex - @write fid T - @write fid Tarr - @write fid char - @write fid unicode_char - @write fid α - @write fid β - @write fid vv - @write fid version_info - @write fid rng - @write fid typevar - @write fid typevar_lb - @write fid typevar_ub - @write fid typevar_lb_ub - @write fid arr_undef - @write fid arr_undefs - @write fid ms_undef - @write fid bt - @write fid btarray - @write fid sa_asc - @write fid sa_utf8 - @write fid subarray - @write fid arr_empty_tuple - @write fid emptyimmutable - @write fid arr_emptyimmutable - @write fid empty_arr_emptyimmutable - @write fid emptytype - @write fid arr_emptytype - @write fid empty_arr_emptytype - @write fid uninitialized_arr_emptytype - @write fid emptyii - @write fid emptyit - @write fid emptyti - @write fid emptytt - @write fid emptyiiotherfield - @write fid emptyiitype - @write fid unicodestruct☺ - @write fid array_of_matrices - @write fid tup - @write fid empty_tup - @write fid nonpointerfree_immutable_1 - @write fid nonpointerfree_immutable_2 - @write fid nonpointerfree_immutable_3 - @write fid vague - @write fid bitsunion - @write fid typeunionfield - @write fid genericunionfield - @write fid arr_ref - @write fid obj_ref - @write fid padding_test - @write fid empty_arr_1 - @write fid empty_arr_2 - @write fid empty_arr_3 - @write fid empty_arr_4 - @write fid bigdata - @write fid bigfloat - @write fid bigint - @write fid bigfloats - @write fid bigints - @write fid bigfloatintobj - @write fid none - @write fid nonearr - @write fid scalar_nothing - @write fid vector_nothing - @write fid Abig - @write fid Bbig - @write fid Sbig - @write fid bitsparamint16 - @write fid bitsparamfloat - @write fid bitsparambool - @write fid bitsparamsymbol - @write fid bitsparamint - @write fid bitsparamuint - @write fid tuple_of_tuples - @write fid zerod - @write fid zerod_any - @write fid cyclicobject - @write fid simplevec - @write fid natyperef - @write 
fid lotsastrings - end - close(fid) +openfuns = [ + (writef=() -> jldopen(fn, "w"; iotype=JLD2.MmapIO, compress=false), + readf=() -> jldopen(fn, "r"; iotype=JLD2.MmapIO), + inf="Mmap, uncompressed"), + (writef=() -> jldopen(fn, "w"; iotype=IOStream, compress=false), + readf=() -> jldopen(fn, "r"; iotype=IOStream), + info="IOStream, uncompressed"), + (writef=() -> jldopen(fn, "w"; iotype=JLD2.MmapIO, compress=true), + readf=() -> jldopen(fn, "r"; iotype=JLD2.MmapIO), + inf="Mmap, compressed"), + (writef=() -> jldopen(fn, "w"; iotype=IOStream, compress=true), + readf=() -> jldopen(fn, "r"; iotype=IOStream), + inf="IOStream, compressed"), + (writef=() -> (global io_buffer=IOBuffer(); jldopen(io_buffer, "w"; compress=false)), + readf=() -> (seekstart(io_buffer); jldopen(io_buffer)), + inf="IOBuffer, uncompressed"), + (writef=() -> (global io_buffer=IOBuffer(); jldopen(io_buffer, "w"; compress=true)), + readf=() -> (seekstart(io_buffer); jldopen(io_buffer)), + inf="IOBuffer, compressed"), +] - @info(" Read time:") - fidr = jldopen(fn, false, false, false, ioty) - @time begin - @check fidr x - @check fidr A - @check fidr A3 - @check fidr A4 - @check fidr Aarray - @check fidr basic_types - @check fidr str - @check fidr str_unicode - @check fidr str_embedded_null - @check fidr strings - @check fidr empty_string - @check fidr empty_string_array - @check fidr empty_array_of_strings - @check fidr tf - @check fidr TF - @check fidr AB - @check fidr t - @check fidr c - @check fidr cint - @check fidr C - @check fidr emptyA - @check fidr emptyB - @check fidr ms - @check fidr msempty - @check fidr sym - @check fidr syms - @check fidr d - @check fidr oidd - @check fidr imdd - exr = read(fidr, "ex") # line numbers are stripped, don't expect equality - checkexpr(ex, exr) - @check fidr T - @check fidr Tarr - @check fidr char - @check fidr unicode_char - @check fidr α - @check fidr β - @check fidr vv - @check fidr version_info - @check fidr rng - @check fidr typevar - @check fidr typevar_lb - @check fidr typevar_ub - @check fidr typevar_lb_ub - - # Special cases for reading undefs - global arr_undef = read(fidr, "arr_undef") - if !isa(arr_undef, Array{Any, 1}) || length(arr_undef) != 1 || isassigned(arr_undef, 1) - error("For arr_undef, read value does not agree with written value") - end - global arr_undefs = read(fidr, "arr_undefs") - if !isa(arr_undefs, Array{Any, 2}) || length(arr_undefs) != 4 || any(map(i->isassigned(arr_undefs, i), 1:4)) - error("For arr_undefs, read value does not agree with written value") - end - global ms_undef = read(fidr, "ms_undef") - if !isa(ms_undef, MyStruct) || ms_undef.len != 0 || isdefined(ms_undef, :data) - error("For ms_undef, read value does not agree with written value") +function write_all(openfun, data) + f = openfun() + try + for key in keys(data) + write(f, key, data[key]) + end + finally + close(f) end + nothing +end - @check fidr bt - @check fidr btarray - @check fidr sa_asc - @check fidr sa_utf8 - @check fidr subarray - @check fidr arr_empty_tuple - @check fidr emptyimmutable - @check fidr arr_emptyimmutable - @check fidr empty_arr_emptyimmutable - @check fidr emptytype - @check fidr arr_emptytype - @check fidr empty_arr_emptytype - @check fidr uninitialized_arr_emptytype - @check fidr emptyii - @check fidr emptyit - @check fidr emptyti - @check fidr emptytt - @check fidr emptyiiotherfield - @check fidr emptyiitype - @check fidr unicodestruct☺ - @check fidr array_of_matrices - @check fidr tup - @check fidr empty_tup - @check fidr nonpointerfree_immutable_1 - 
       @check fidr nonpointerfree_immutable_2
-        @check fidr nonpointerfree_immutable_3
-        vaguer = read(fidr, "vague")
-        @test typeof(vaguer) == typeof(vague) && vaguer.name == vague.name
-        @check fidr bitsunion
-        @check fidr typeunionfield
-        @check fidr genericunionfield
-
-        arr = read(fidr, "arr_ref")
-        @test arr == arr_ref
-        @test arr[1] === arr[2]
-
-        obj = read(fidr, "obj_ref")
-        @test obj.x.x === obj.x.y == obj.y.x === obj.y.y
-        @test obj.x !== obj.y
-
-        @check fidr padding_test
-        @check fidr empty_arr_1
-        @check fidr empty_arr_2
-        @check fidr empty_arr_3
-        @check fidr empty_arr_4
-        @check fidr bigdata
-        @check fidr bigfloat
-        @check fidr bigint
-        @check fidr bigfloats
-        @check fidr bigints
-        @check fidr bigfloatintobj
-        @check fidr none
-        @check fidr nonearr
-        @check fidr scalar_nothing
-        @check fidr vector_nothing
-        @check fidr Abig
-        @check fidr Bbig
-        @check fidr Sbig
-        @check fidr bitsparamfloat
-        @check fidr bitsparambool
-        @check fidr bitsparamsymbol
-        @check fidr bitsparamint
-        @check fidr bitsparamuint
-        @check fidr tuple_of_tuples
-        @check fidr zerod
-        @check fidr zerod_any
+function check_all(openfun, data)
+    f = openfun()
+    try
+        for key in keys(data)
+            value = data[key]
+            tmp = read(f, key)
-        obj = read(fidr, "cyclicobject")
-        @test obj.x === obj
-
-        @check fidr simplevec
-        @check fidr natyperef
-        @check fidr lotsastrings
+            # Special cases for reading undefs
+            if key=="arr_undef"
+                (!isa(tmp, Array{Any, 1}) || length(tmp) != 1 || isassigned(tmp, 1)) &&
+                    error("For arr_undef, read value does not agree with written value")
+            elseif key=="arr_undefs"
+                (!isa(tmp, Array{Any, 2}) || length(tmp) != 4 || any(map(i->isassigned(tmp, i), 1:4))) &&
+                    error("For arr_undefs, read value does not agree with written value")
+            elseif key=="ms_undef"
+                (!isa(tmp, MyStruct) || tmp.len != 0 || isdefined(tmp, :data)) &&
+                    error("For ms_undef, read value does not agree with written value")
+            elseif key=="ex"
+                checkexpr(value, tmp)
+            elseif key=="vague"
+                @test typeof(tmp) == typeof(value) && tmp.name == value.name
+            elseif key=="arr_ref"
+                @test tmp == arr_ref
+                @test tmp[1] === tmp[2]
+            elseif key=="obj_ref"
+                @test tmp.x.x === tmp.x.y == tmp.y.x === tmp.y.y
+                @test tmp.x !== tmp.y
+            elseif key=="cyclicobject"
+                @test tmp.x === tmp
+            else
+                written_type = typeof(value)
+                if typeof(tmp) != written_type
+                    error("For $key, read type $(typeof(tmp)) does not agree with written type $(written_type)")
+                end
+                try
+                    @test iseq(tmp, value)
+                catch
+                    println("For $key, read value $tmp does not agree with written value $value")
+                end
+            end
+        end
+    finally
+        close(f)
     end
+    nothing
 end
+
+
+for (writef, readf, inf) in openfuns
+    @info(inf)
+    @info("  Write time:")
+    @time write_all(writef, data)
+    @info("  Read time:")
+    @time check_all(readf, data)
+end
\ No newline at end of file
diff --git a/test/wrapped_io.jl b/test/wrapped_io.jl
new file mode 100644
index 00000000..aa65c420
--- /dev/null
+++ b/test/wrapped_io.jl
@@ -0,0 +1,39 @@
+using Test, JLD2
+
+@testset "Write to IOBuffer" begin
+    iobuf = IOBuffer()
+    f = jldopen(iobuf, "w")
+    f["a"] = 42
+    close(f)
+
+    seekstart(iobuf)
+    f = jldopen(iobuf, "r")
+    @test f["a"] == 42
+
+    seekstart(iobuf)
+    cd(mktempdir()) do
+        write("test.jld2", take!(iobuf))
+        @test load("test.jld2", "a") == 42
+    end
+end
+
+@testset "Append to IOBuffer file" begin
+    iobuf = IOBuffer()
+    jldopen(iobuf, "w") do f
+        f["a"] = 42
+    end
+
+    seekstart(iobuf)
+    f = jldopen(iobuf, "r+")
+    @test f["a"] == 42
+    f["b/c"] = "a string"
+    close(f)
+
+    seekstart(iobuf)
+    cd(mktempdir()) do
+        write("test.jld2", take!(iobuf))
+        @test load("test.jld2", "a") == 42
+        @test load("test.jld2", "b/c") == "a string"
+
+    end
+end