diff --git a/src/Net.jl b/src/Net.jl index 18c327f..6a97a9c 100644 --- a/src/Net.jl +++ b/src/Net.jl @@ -222,7 +222,15 @@ function read_col(sock::ClickHouseSock, num_rows::VarUInt)::Column name = chread(sock, String) type_name = chread(sock, String) - data = read_col_data(sock, num_rows, parse_typestring(type_name)) + data = try + read_col_data(sock, num_rows, parse_typestring(type_name)) + catch e + if e isa ArgumentError + error("Error while reading col $(name) ($(type)): $(e.msg)") + else + rethrow(e) + end + end Column(name, type_name, data) end @@ -230,7 +238,15 @@ function chwrite(sock::ClickHouseSock, x::Column) chwrite(sock, x.name) chwrite(sock, x.type) - write_col_data(sock, x.data, parse_typestring(x.type)) + try + write_col_data(sock, x.data, parse_typestring(x.type)) + catch e + if e isa ArgumentError + error("Error while writing col $(x.name) ($(x.type)): $(e.msg)") + else + rethrow(e) + end + end end struct Block diff --git a/src/columns/Base.jl b/src/columns/Base.jl index eb0f24d..cbaa2a2 100644 --- a/src/columns/Base.jl +++ b/src/columns/Base.jl @@ -17,6 +17,13 @@ macro _primitive_columns(args...) return chwrite(sock, data) end end ) + push!(funcs, quote + function write_col_data(sock::ClickHouseSock, + data::AbstractVector, + ::Val{Symbol($arg_string)}) + return chwrite(sock, convert(Vector{$arg},data)) + end + end ) push!(funcs, quote deserialize(::Val{Symbol($arg_string)}) = $arg end ) end return esc(:($(funcs...),)) diff --git a/src/columns/Interfaces.jl b/src/columns/Interfaces.jl index 121c73c..3e8f4df 100644 --- a/src/columns/Interfaces.jl +++ b/src/columns/Interfaces.jl @@ -2,6 +2,9 @@ is_ch_type(::Val{N}) where {N} = false is_ch_type(str::String) = is_ch_type(Val(Symbol(str))) is_ch_type(s::Symbol) = is_ch_type(Val(s)) +can_be_nullable(::Val{N}) where {N} = true +can_be_nullable(s::Symbol) = can_be_nullable(Val(s)) + function read_col_data(sock::ClickHouseSock, num_rows::VarUInt, ::Val{N}, args...) where {N} throw( diff --git a/src/columns/LowCardinality.jl b/src/columns/LowCardinality.jl new file mode 100644 index 0000000..51eb5c9 --- /dev/null +++ b/src/columns/LowCardinality.jl @@ -0,0 +1,100 @@ +using UUIDs +is_ch_type(::Val{:LowCardinality}) = true +can_be_nullable(::Val{:LowCardinality}) = false + +# Need to read additional keys. +# Additional keys are stored before indexes as value N and N keys +# after them. +const lc_has_additional_keys_bit = 1 << 9 +# Need to update dictionary. +# It means that previous granule has different dictionary. +const lc_need_update_dictionary = 1 << 10 + +const lc_serialization_type = lc_has_additional_keys_bit | lc_need_update_dictionary + +const lc_index_int_types = [:UInt8, :UInt16, :UInt32, :UInt64] + + +function make_result(index::Vector{T}, keys, is_nullable) where {T} + + result = is_nullable ? + CategoricalVector{Union{T, Missing}}(undef, 0, levels = index) : + CategoricalVector{T}(undef, 0, levels = index) + result.refs = keys + return result +end + +function make_result(index::CategoricalVector{T}, keys, is_nullable) where {T} + + result = is_nullable ? + CategoricalVector{Union{T, Missing}}(undef, 0, levels = get.(index)) : + CategoricalVector{T}(undef, 0, levels = get.(index)) + result.refs = keys + return result +end + + +function read_col_data(sock::ClickHouseSock, num_rows::VarUInt, + ::Val{:LowCardinality}, nested::TypeAst) + + UInt64(num_rows) == 0 && return read_col_data(sock, num_rows, nested) + + is_nested_nullable = (nested.name == :Nullable) + notnullable_nested = is_nested_nullable ? nested.args[1] : nested + + ver = chread(sock, UInt64) # KeysSerializationVersion + ver == 1 || error("unsupported LC serialization version: $(ver)") + + serialization_type = chread(sock, UInt64) + int_type = serialization_type & 0xf + + index_size = chread(sock, UInt64) + index = read_col_data(sock, VarUInt(index_size), notnullable_nested) + is_nested_nullable && (index = index[2:end]) + + keys_size = chread(sock, UInt64) + keys = read_col_data(sock, VarUInt(keys_size), Val(lc_index_int_types[int_type + 1])) + + (nested.name != :Nullable) && (keys .= keys .+ 1) + + + return make_result(index, keys, nested.name == :Nullable) +end + + +function write_col_data(sock::ClickHouseSock, + data::AbstractCategoricalVector{T}, + ::Val{:LowCardinality}, nested::TypeAst) where {T} + + is_nested_nullable = (nested.name == :Nullable) + notnullable_nested = is_nested_nullable ? nested.args[1] : nested + + # KeysSerializationVersion. See ClickHouse docs. + chwrite(sock, Int64(1)) + isempty(data) && return + + int_type = floor(Int, log2(length(levels(data))) / 2) + + serialization_type = lc_serialization_type | int_type + chwrite(sock, serialization_type) + + index = is_nested_nullable ? + vcat(missing_replacement(T), levels(data)) : + levels(data) + + chwrite(sock, length(index)) + write_col_data(sock, index, notnullable_nested) + + chwrite(sock, length(data)) + + #In c++ indexes started from 0, in case of nullable nested 0 means null and + # it's ok, but if nested not nullable we must sub 1 from index + keys = is_nested_nullable ? data.refs : data.refs .- 1 + write_col_data(sock, keys, Val(lc_index_int_types[int_type + 1])) +end + +function write_col_data(sock::ClickHouseSock, + data::AbstractVector{T}, + v::Val{:LowCardinality}, nested::TypeAst) where {T} + write_col_data(sock, CategoricalVector{T}(data), v, nested) +end diff --git a/src/columns/Nullable.jl b/src/columns/Nullable.jl new file mode 100644 index 0000000..82fbd20 --- /dev/null +++ b/src/columns/Nullable.jl @@ -0,0 +1,80 @@ +using UUIDs +is_ch_type(::Val{:Nullable}) = true +can_be_nullable(::Val{:Nullable}) = false + +convert_to_missings(data::Vector{T}) where {T} = + convert(Vector{Union{T, Missing}}, data) + +convert_to_missings(data::CategoricalVector{T}) where {T} = + convert(CategoricalVector{Union{T, Missing}}, data) + +function read_col_data(sock::ClickHouseSock, num_rows::VarUInt, + ::Val{:Nullable}, nested::TypeAst) + + missing_map = chread(sock, Vector{UInt8}, num_rows) + unmissing = read_col_data(sock, num_rows, nested) + result = convert_to_missings(unmissing) + for i in 1:length(missing_map) + (missing_map[i] == 0x1) && (result[i] = missing) + end + return result +end + +missing_replacement(::Type{T}) where {T <: Number} = zero(T) +missing_replacement(::Type{UUID}) = UUID(0) +missing_replacement(::Type{Date}) = Date(1970) +missing_replacement(::Type{DateTime}) = unix2datetime(0) +missing_replacement(::Type{String}) = "" +missing_replacement(::Type{Union{T, Missing}}) where {T} = + missing_replacement(T) + + +uint8_ismissing(v)::UInt8 = ismissing(v) ? 1 : 0 + +function write_col_data(sock::ClickHouseSock, + data::AbstractVector{Union{Missing, T}}, + ::Val{:Nullable}, nested::TypeAst) where {T} + !can_be_nullable(nested.name) && + error("$(nested.name) cannot be inside Nullable") + missing_map = uint8_ismissing.(data) + chwrite(sock, missing_map) + unmissing = if !any(x -> x > 0, missing_map) + convert(Vector{T}, data) + else + replacement = missing_replacement(T) + [ismissing(v) ? replacement : v for v in data] + end + + write_col_data(sock, unmissing, nested) +end + +function write_col_data(sock::ClickHouseSock, + data::AbstractVector{T}, + ::Val{:Nullable}, nested::TypeAst) where {T} + !can_be_nullable(nested.name) && + error("$(nested.name) cannot be inside Nullable") + + missing_map = fill(Int8(0), 1:length(data)) + chwrite(sock, missing_map) + write_col_data(sock, data, nested) +end + +function write_col_data(sock::ClickHouseSock, + data::AbstractCategoricalVector{Union{Missing, T}}, + ::Val{:Nullable}, nested::TypeAst) where {T} + !can_be_nullable(nested.name) && + error("$(nested.name) cannot be inside Nullable") + missing_map = uint8_ismissing.(data) + chwrite(sock, missing_map) + unmissing = if !any(x -> x > 0, missing_map) + convert(CategoricalVector{T}, data) + else + tmp = deepcopy(data) + #replace missing (it's always 0 in refs of CategorialVector) + #with something valid + replace!(tmp.refs, 0=>1) + convert(CategoricalVector{T}, tmp) + end + + write_col_data(sock, unmissing, nested) +end \ No newline at end of file diff --git a/src/columns/Tuple.jl b/src/columns/Tuple.jl index f673197..da28a44 100644 --- a/src/columns/Tuple.jl +++ b/src/columns/Tuple.jl @@ -1,4 +1,5 @@ is_ch_type(::Val{:Tuple}) = true +can_be_nullable(::Val{Tuple}) = false function read_col_data(sock::ClickHouseSock, num_rows::VarUInt, ::Val{:Tuple}, args::TypeAst...) diff --git a/src/columns/columns.jl b/src/columns/columns.jl index d0dabf8..4018e16 100644 --- a/src/columns/columns.jl +++ b/src/columns/columns.jl @@ -8,3 +8,5 @@ include("Enum.jl") include("FixedString.jl") include("Tuple.jl") include("UUID.jl") +include("Nullable.jl") +include("LowCardinality.jl") diff --git a/test/columns_io.jl b/test/columns_io.jl index f0c2d85..1b65b80 100644 --- a/test/columns_io.jl +++ b/test/columns_io.jl @@ -38,6 +38,7 @@ using UUIDs @test r.args[2].name == :Tuple @test r.args[2].args[1].name == :Int32 @test r.args[2].args[2].name == :Float32 + end @testset "Int columns" begin @@ -50,6 +51,14 @@ end res = read_col(sock, VarUInt(nrows)) @test res == column + sock = ClickHouseSock(PipeBuffer()) + nrows = 100 + data = rand(Int32, nrows) + column = Column("test", "Int64", data) + chwrite(sock, column) + res = read_col(sock, VarUInt(nrows)) + @test res == column + end @testset "String columns" begin @@ -182,4 +191,138 @@ end res = read_col(sock, VarUInt(nrows)) @test res == column +end + +@testset "Nullable columns" begin + + sock = ClickHouseSock(PipeBuffer()) + nrows = 100 + data = rand(Int64, nrows) + data = convert(Vector{Union{Int64, Missing}}, data) + data[rand(1:nrows, 20)] .= missing + column = Column("test", "Nullable(Int64)", data) + chwrite(sock, column) + res = read_col(sock, VarUInt(nrows)) + @test all(zip(data, res.data)) do t + (a, b) = t + return (ismissing(a) && ismissing(b)) || + (a == b) + end + + sock = ClickHouseSock(PipeBuffer()) + nrows = 100 + data = rand(Float64, nrows) + data = convert(Vector{Union{Float64, Missing}}, data) + data[rand(1:nrows, 20)] .= missing + column = Column("test", "Nullable(Float64)", data) + chwrite(sock, column) + res = read_col(sock, VarUInt(nrows)) + @test all(zip(data, res.data)) do t + (a, b) = t + return (ismissing(a) && ismissing(b)) || + (a ≈ b) + end + + sock = ClickHouseSock(PipeBuffer()) + nrows = 100 + data = string.(rand(Int64, nrows)) + data = convert(Vector{Union{String, Missing}}, data) + data[rand(1:nrows, 20)] .= missing + column = Column("test", "Nullable(String)", data) + chwrite(sock, column) + res = read_col(sock, VarUInt(nrows)) + @test all(zip(data, res.data)) do t + (a, b) = t + return (ismissing(a) && ismissing(b)) || + (a == b) + end + + + sock = ClickHouseSock(PipeBuffer()) + nrows = 100 + data = CategoricalVector(rand(["a","b","c"], nrows)) + data = convert(CategoricalVector{Union{String, Missing}}, data) + data[rand(1:nrows, 20)] .= missing + + column = Column("test", "Nullable(Enum8('a'=1,'b'=3,'c'=10))", data) + chwrite(sock, column) + res = read_col(sock, VarUInt(nrows)) + + @test all(zip(data, res.data)) do t + (a, b) = t + return (ismissing(a) && ismissing(b)) || + (a == b) + end + + sock = ClickHouseSock(PipeBuffer()) + nrows = 100 + data = Date.(rand(2010:2020, nrows), rand(1:12, nrows), rand(1:20, nrows)) + data = convert(Vector{Union{Date, Missing}}, data) + data[rand(1:nrows, 20)] .= missing + column = Column("test", "Nullable(Date)", data) + chwrite(sock, column) + res = read_col(sock, VarUInt(nrows)) + @test all(zip(data, res.data)) do t + (a, b) = t + return (ismissing(a) && ismissing(b)) || + (a == b) + end + + sock = ClickHouseSock(PipeBuffer()) + nrows = 100 + data = DateTime.(rand(2010:2020, nrows), rand(1:12, nrows), rand(1:20, nrows), + rand(0:23, nrows), rand(0:59, nrows), rand(0:59, nrows)) + data = convert(Vector{Union{DateTime, Missing}}, data) + data[rand(1:nrows, 20)] .= missing + column = Column("test", "Nullable(DateTime)", data) + chwrite(sock, column) + res = read_col(sock, VarUInt(nrows)) + @test all(zip(data, res.data)) do t + (a, b) = t + return (ismissing(a) && ismissing(b)) || + (a == b) + end +end + +@testset "LowCardinality columns" begin + + sock = ClickHouseSock(PipeBuffer()) + nrows = 10 + data = rand(1:10, nrows) + column = Column("test", "LowCardinality(Int64)", data) + chwrite(sock, column) + res = read_col(sock, VarUInt(nrows)) + + @test res.data == data + + sock = ClickHouseSock(PipeBuffer()) + data = rand(1:10, nrows) + data = convert(Vector{Union{Int64, Missing}}, data) + data[rand(1:nrows, 5)] .= missing + column = Column("test", "LowCardinality(Nullable(Int64))", data) + chwrite(sock, column) + res = read_col(sock, VarUInt(nrows)) + + @test all(zip(data, res.data)) do t + (a, b) = t + return (ismissing(a) && ismissing(b)) || + (a == b) + end + + sock = ClickHouseSock(PipeBuffer()) + data = rand(["a", "b", "c"], nrows) + column = Column("test", "LowCardinality(String)", data) + chwrite(sock, column) + res = read_col(sock, VarUInt(nrows)) + + @test res.data == data + + sock = ClickHouseSock(PipeBuffer()) + data = CategoricalVector(rand(["a","b","c"], nrows)) + + column = Column("test", "LowCardinality(Enum8('a'=1,'b'=3,'c'=10))", data) + chwrite(sock, column) + res = read_col(sock, VarUInt(nrows)) + @test res.data == data + end \ No newline at end of file diff --git a/test/runtests.jl b/test/runtests.jl index 4e619f5..f0ec3bd 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -7,6 +7,10 @@ using UUIDs include("columns_io.jl") +function miss_or_equal(a, b) + return (ismissing(a) && ismissing(b)) || + (a==b) +end @test begin sock = IOBuffer([0xC2, 0x0A]) |> ClickHouseSock ClickHouse.chread(sock, ClickHouse.VarUInt) == ClickHouse.VarUInt(0x542) @@ -201,7 +205,12 @@ end foo_fixed FixedString(5), ddd Date, enu Enum8('a' = 1, 'c' = 3, 'foobar' = 44, 'd' = 9), - uuid UUID + uuid UUID, + nn Nullable(Int64), + ns Nullable(String), + ne Nullable(Enum16('a' = 1, 'b' = 2)), + las LowCardinality(String), + lan LowCardinality(Nullable(String)) ) ENGINE = Memory """) @@ -219,7 +228,18 @@ end :foo_fixed => String["aaaaa", "bbb", "cc"], :ddd => Date[td, td, td], :enu => ["a", "c", "foobar"], - :uuid => [uuid4(), uuid4(), uuid4()] + :uuid => [ + UUID("c187abfa-31c1-4131-a33e-556f23f7aa67"), + UUID("f9a7e2b9-dc22-4ca6-b4fe-83ba551ea3bb"), + UUID("dc986a81-9f1d-4d96-b618-6e8d034285c1") + ], + :nn => [10, missing, 20], + :ns => [missing, "sst", "aaa"], + :ne => CategoricalVector(["a", "b", missing]), + :las => ["a", "b", "a"], + :lan => [missing, "b", "a"], + + ) # Single block inserts. @@ -238,6 +258,23 @@ end @test proj[:foo_fixed] == String["aaaaa", "bbb ", "cc ", "aaaaa"] @test proj[:ddd] == Date[td, td, td, td] @test proj[:uuid] == vcat(data[:uuid], data[:uuid][1:1]) + @test all( + miss_or_equal.(proj[:nn], [10, missing, 20 , 10]) + ) + + @test all( + miss_or_equal.(proj[:ns], [missing, "sst", "aaa" , missing]) + ) + + @test all( + miss_or_equal.(proj[:ne], ["a", "b", missing, "a"]) + ) + @test proj[:las] == ["a", "b", "a", "a"] + + @test all( + miss_or_equal.(proj[:lan], [missing, "b", "a", missing]) + ) + # SELECT Tuple -> Dict