Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

LowCardinality and Nullable #14

Merged
merged 5 commits into from
Aug 31, 2020
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 18 additions & 2 deletions src/Net.jl
Original file line number Diff line number Diff line change
@@ -222,15 +222,31 @@ function read_col(sock::ClickHouseSock, num_rows::VarUInt)::Column
name = chread(sock, String)
type_name = chread(sock, String)

data = read_col_data(sock, num_rows, parse_typestring(type_name))
data = try
read_col_data(sock, num_rows, parse_typestring(type_name))
catch e
if e isa ArgumentError
error("Error while reading col $(name) ($(type)): $(e.msg)")
else
rethrow(e)
end
end
Column(name, type_name, data)
end

function chwrite(sock::ClickHouseSock, x::Column)
chwrite(sock, x.name)
chwrite(sock, x.type)

write_col_data(sock, x.data, parse_typestring(x.type))
try
write_col_data(sock, x.data, parse_typestring(x.type))
catch e
if e isa ArgumentError
error("Error while writing col $(x.name) ($(x.type)): $(e.msg)")
else
rethrow(e)
end
end
end

struct Block
7 changes: 7 additions & 0 deletions src/columns/Base.jl
Original file line number Diff line number Diff line change
@@ -17,6 +17,13 @@ macro _primitive_columns(args...)
return chwrite(sock, data)
end
end )
push!(funcs, quote
function write_col_data(sock::ClickHouseSock,
data::AbstractVector,
::Val{Symbol($arg_string)})
return chwrite(sock, convert(Vector{$arg},data))
end
end )
push!(funcs, quote deserialize(::Val{Symbol($arg_string)}) = $arg end )
end
return esc(:($(funcs...),))
3 changes: 3 additions & 0 deletions src/columns/Interfaces.jl
Original file line number Diff line number Diff line change
@@ -2,6 +2,9 @@ is_ch_type(::Val{N}) where {N} = false
is_ch_type(str::String) = is_ch_type(Val(Symbol(str)))
is_ch_type(s::Symbol) = is_ch_type(Val(s))

can_be_nullable(::Val{N}) where {N} = true
can_be_nullable(s::Symbol) = can_be_nullable(Val(s))

function read_col_data(sock::ClickHouseSock,
num_rows::VarUInt, ::Val{N}, args...) where {N}
throw(
99 changes: 99 additions & 0 deletions src/columns/LowCardinality.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
using UUIDs
is_ch_type(::Val{:LowCardinality}) = true
can_be_nullable(::Val{:LowCardinality}) = false

# Need to read additional keys.
# Additional keys are stored before indexes as value N and N keys
# after them.
const lc_has_additional_keys_bit = 1 << 9
# Need to update dictionary.
# It means that previous granule has different dictionary.
const lc_need_update_dictionary = 1 << 10

const lc_serialization_type = lc_has_additional_keys_bit | lc_need_update_dictionary

const lc_index_int_types = [:UInt8, :UInt16, :UInt32, :UInt64]


function make_result(index::Vector{T}, keys, is_nullable) where {T}

result = is_nullable ?
CategoricalVector{Union{T, Missing}}(undef, 0, levels = index) :
CategoricalVector{T}(undef, 0, levels = index)
result.refs = keys
return result
end

function make_result(index::CategoricalVector{T}, keys, is_nullable) where {T}

result = is_nullable ?
CategoricalVector{Union{T, Missing}}(undef, 0, levels = get.(index)) :
CategoricalVector{T}(undef, 0, levels = get.(index))
result.refs = keys
return result
end


function read_col_data(sock::ClickHouseSock, num_rows::VarUInt,
::Val{:LowCardinality}, nest::TypeAst)
athre0z marked this conversation as resolved.
Show resolved Hide resolved

UInt64(num_rows) == 0 && return read_col_data(sock, num_rows, nest)

is_nest_nullable = (nest.name == :Nullable)
notnullable_nest = is_nest_nullable ? nest.args[1] : nest

chread(sock, UInt64) #KeysSerializationVersion
waralex marked this conversation as resolved.
Show resolved Hide resolved

serialization_type = chread(sock, UInt64)
int_type = serialization_type & 0xf

index_size = chread(sock, UInt64)
index = read_col_data(sock, VarUInt(index_size), notnullable_nest)
is_nest_nullable && (index = index[2:end])

keys_size = chread(sock, UInt64)
keys = read_col_data(sock, VarUInt(keys_size), Val(lc_index_int_types[int_type + 1]))

(nest.name != :Nullable) && (keys .= keys .+ 1)


return make_result(index, keys, nest.name == :Nullable)
end


function write_col_data(sock::ClickHouseSock,
data::AbstractCategoricalVector{T},
::Val{:LowCardinality}, nest::TypeAst) where {T}

is_nest_nullable = (nest.name == :Nullable)
notnullable_nest = is_nest_nullable ? nest.args[1] : nest

# KeysSerializationVersion. See ClickHouse docs.
chwrite(sock, Int64(1))
isempty(data) && return

int_type = floor(Int, log2(length(levels(data))) / 2)

serialization_type = lc_serialization_type | int_type
chwrite(sock, serialization_type)

index = is_nest_nullable ?
vcat(missing_replacement(T), levels(data)) :
levels(data)

chwrite(sock, length(index))
write_col_data(sock, index, notnullable_nest)

chwrite(sock, length(data))

#In c++ indexes started from 0, in case of nullable nest 0 means null and
# it's ok, but if nest not nullable we must sub 1 from index
keys = is_nest_nullable ? data.refs : data.refs .- 1
write_col_data(sock, keys, Val(lc_index_int_types[int_type + 1]))
end

function write_col_data(sock::ClickHouseSock,
data::AbstractVector{T},
v::Val{:LowCardinality}, nest::TypeAst) where {T}
write_col_data(sock, CategoricalVector{T}(data), v, nest)
end
78 changes: 78 additions & 0 deletions src/columns/Nullable.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
using UUIDs
is_ch_type(::Val{:Nullable}) = true
can_be_nullable(::Val{:Nullable}) = false

convert_to_missings(data::Vector{T}) where {T} =
convert(Vector{Union{T, Missing}}, data)

convert_to_missings(data::CategoricalVector{T}) where {T} =
convert(CategoricalVector{Union{T, Missing}}, data)

function read_col_data(sock::ClickHouseSock, num_rows::VarUInt,
::Val{:Nullable}, nest::TypeAst)

missing_map = chread(sock, Vector{UInt8}, num_rows)
unmissing = read_col_data(sock, num_rows, nest)
result = convert_to_missings(unmissing)
for i in 1:length(missing_map)
(missing_map[i] == 0x1) && (result[i] = missing)
end
return result
end

missing_replacement(::Type{T}) where {T <: Number} = zero(T)
missing_replacement(::Type{UUID}) = UUID(0)
missing_replacement(::Type{Date}) = Date(1970)
missing_replacement(::Type{DateTime}) = unix2datetime(0)
missing_replacement(::Type{String}) = ""
missing_replacement(::Type{Union{T, Missing}}) where {T} =
missing_replacement(T)


uint8_ismissing(v)::UInt8 = ismissing(v) ? 1 : 0

function write_col_data(sock::ClickHouseSock,
data::AbstractVector{Union{Missing, T}},
::Val{:Nullable}, nest::TypeAst) where {T}
!can_be_nullable(nest.name) &&
error("$(nest.name) cannot be inside Nullable")
missing_map = uint8_ismissing.(data)
chwrite(sock, missing_map)
unmissing = if !any(x -> x > 0, missing_map)
convert(Vector{T}, data)
else
replacement = missing_replacement(T)
[ismissing(v) ? replacement : v for v in data]
end

write_col_data(sock, unmissing, nest)
end

function write_col_data(sock::ClickHouseSock,
data::AbstractVector{T},
::Val{:Nullable}, nest::TypeAst) where {T}
!can_be_nullable(nest.name) &&
error("$(nest.name) cannot be inside Nullable")

missing_map = fill(Int8(0), 1:length(data))
chwrite(sock, missing_map)
write_col_data(sock, data, nest)
end

function write_col_data(sock::ClickHouseSock,
data::AbstractCategoricalVector{Union{Missing, T}},
::Val{:Nullable}, nest::TypeAst) where {T}
!can_be_nullable(nest.name) &&
error("$(nest.name) cannot be inside Nullable")
missing_map = uint8_ismissing.(data)
chwrite(sock, missing_map)
unmissing = if !any(x -> x > 0, missing_map)
convert(CategoricalVector{T}, data)
else
tmp = deepcopy(data)
replace!(tmp.refs, 0=>1)
athre0z marked this conversation as resolved.
Show resolved Hide resolved
convert(CategoricalVector{T}, tmp)
end

write_col_data(sock, unmissing, nest)
end
1 change: 1 addition & 0 deletions src/columns/Tuple.jl
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
is_ch_type(::Val{:Tuple}) = true
can_be_nullable(::Val{Tuple}) = false

function read_col_data(sock::ClickHouseSock, num_rows::VarUInt,
::Val{:Tuple}, args::TypeAst...)
2 changes: 2 additions & 0 deletions src/columns/columns.jl
Original file line number Diff line number Diff line change
@@ -8,3 +8,5 @@ include("Enum.jl")
include("FixedString.jl")
include("Tuple.jl")
include("UUID.jl")
include("Nullable.jl")
include("LowCardinality.jl")
143 changes: 143 additions & 0 deletions test/columns_io.jl
Original file line number Diff line number Diff line change
@@ -38,6 +38,7 @@ using UUIDs
@test r.args[2].name == :Tuple
@test r.args[2].args[1].name == :Int32
@test r.args[2].args[2].name == :Float32

end

@testset "Int columns" begin
@@ -50,6 +51,14 @@ end
res = read_col(sock, VarUInt(nrows))
@test res == column

sock = ClickHouseSock(PipeBuffer())
nrows = 100
data = rand(Int32, nrows)
column = Column("test", "Int64", data)
chwrite(sock, column)
res = read_col(sock, VarUInt(nrows))
@test res == column

end

@testset "String columns" begin
@@ -182,4 +191,138 @@ end
res = read_col(sock, VarUInt(nrows))
@test res == column

end

@testset "Nullable columns" begin

sock = ClickHouseSock(PipeBuffer())
nrows = 100
data = rand(Int64, nrows)
data = convert(Vector{Union{Int64, Missing}}, data)
data[rand(1:nrows, 20)] .= missing
column = Column("test", "Nullable(Int64)", data)
chwrite(sock, column)
res = read_col(sock, VarUInt(nrows))
@test all(zip(data, res.data)) do t
(a, b) = t
return (ismissing(a) && ismissing(b)) ||
(a == b)
end

sock = ClickHouseSock(PipeBuffer())
nrows = 100
data = rand(Float64, nrows)
data = convert(Vector{Union{Float64, Missing}}, data)
data[rand(1:nrows, 20)] .= missing
column = Column("test", "Nullable(Float64)", data)
chwrite(sock, column)
res = read_col(sock, VarUInt(nrows))
@test all(zip(data, res.data)) do t
(a, b) = t
return (ismissing(a) && ismissing(b)) ||
(a b)
end

sock = ClickHouseSock(PipeBuffer())
nrows = 100
data = string.(rand(Int64, nrows))
data = convert(Vector{Union{String, Missing}}, data)
data[rand(1:nrows, 20)] .= missing
column = Column("test", "Nullable(String)", data)
chwrite(sock, column)
res = read_col(sock, VarUInt(nrows))
@test all(zip(data, res.data)) do t
(a, b) = t
return (ismissing(a) && ismissing(b)) ||
(a == b)
end


sock = ClickHouseSock(PipeBuffer())
nrows = 100
data = CategoricalVector(rand(["a","b","c"], nrows))
data = convert(CategoricalVector{Union{String, Missing}}, data)
data[rand(1:nrows, 20)] .= missing

column = Column("test", "Nullable(Enum8('a'=1,'b'=3,'c'=10))", data)
chwrite(sock, column)
res = read_col(sock, VarUInt(nrows))

@test all(zip(data, res.data)) do t
(a, b) = t
return (ismissing(a) && ismissing(b)) ||
(a == b)
end

sock = ClickHouseSock(PipeBuffer())
nrows = 100
data = Date.(rand(2010:2020, nrows), rand(1:12, nrows), rand(1:20, nrows))
data = convert(Vector{Union{Date, Missing}}, data)
data[rand(1:nrows, 20)] .= missing
column = Column("test", "Nullable(Date)", data)
chwrite(sock, column)
res = read_col(sock, VarUInt(nrows))
@test all(zip(data, res.data)) do t
(a, b) = t
return (ismissing(a) && ismissing(b)) ||
(a == b)
end

sock = ClickHouseSock(PipeBuffer())
nrows = 100
data = DateTime.(rand(2010:2020, nrows), rand(1:12, nrows), rand(1:20, nrows),
rand(0:23, nrows), rand(0:59, nrows), rand(0:59, nrows))
data = convert(Vector{Union{DateTime, Missing}}, data)
data[rand(1:nrows, 20)] .= missing
column = Column("test", "Nullable(DateTime)", data)
chwrite(sock, column)
res = read_col(sock, VarUInt(nrows))
@test all(zip(data, res.data)) do t
(a, b) = t
return (ismissing(a) && ismissing(b)) ||
(a == b)
end
end

@testset "LowCardinality columns" begin

sock = ClickHouseSock(PipeBuffer())
nrows = 10
data = rand(1:10, nrows)
column = Column("test", "LowCardinality(Int64)", data)
chwrite(sock, column)
res = read_col(sock, VarUInt(nrows))

@test res.data == data

sock = ClickHouseSock(PipeBuffer())
data = rand(1:10, nrows)
data = convert(Vector{Union{Int64, Missing}}, data)
data[rand(1:nrows, 5)] .= missing
column = Column("test", "LowCardinality(Nullable(Int64))", data)
chwrite(sock, column)
res = read_col(sock, VarUInt(nrows))

@test all(zip(data, res.data)) do t
(a, b) = t
return (ismissing(a) && ismissing(b)) ||
(a == b)
end

sock = ClickHouseSock(PipeBuffer())
data = rand(["a", "b", "c"], nrows)
column = Column("test", "LowCardinality(String)", data)
chwrite(sock, column)
res = read_col(sock, VarUInt(nrows))

@test res.data == data

sock = ClickHouseSock(PipeBuffer())
data = CategoricalVector(rand(["a","b","c"], nrows))

column = Column("test", "LowCardinality(Enum8('a'=1,'b'=3,'c'=10))", data)
chwrite(sock, column)
res = read_col(sock, VarUInt(nrows))
@test res.data == data

end
41 changes: 39 additions & 2 deletions test/runtests.jl
Original file line number Diff line number Diff line change
@@ -7,6 +7,10 @@ using UUIDs

include("columns_io.jl")

function miss_or_equal(a, b)
return (ismissing(a) && ismissing(b)) ||
(a==b)
end
@test begin
sock = IOBuffer([0xC2, 0x0A]) |> ClickHouseSock
ClickHouse.chread(sock, ClickHouse.VarUInt) == ClickHouse.VarUInt(0x542)
@@ -201,7 +205,12 @@ end
foo_fixed FixedString(5),
ddd Date,
enu Enum8('a' = 1, 'c' = 3, 'foobar' = 44, 'd' = 9),
uuid UUID
uuid UUID,
nn Nullable(Int64),
ns Nullable(String),
ne Nullable(Enum16('a' = 1, 'b' = 2)),
las LowCardinality(String),
lan LowCardinality(Nullable(String))
)
ENGINE = Memory
""")
@@ -219,7 +228,18 @@ end
:foo_fixed => String["aaaaa", "bbb", "cc"],
:ddd => Date[td, td, td],
:enu => ["a", "c", "foobar"],
:uuid => [uuid4(), uuid4(), uuid4()]
:uuid => [
UUID("c187abfa-31c1-4131-a33e-556f23f7aa67"),
UUID("f9a7e2b9-dc22-4ca6-b4fe-83ba551ea3bb"),
UUID("dc986a81-9f1d-4d96-b618-6e8d034285c1")
],
:nn => [10, missing, 20],
:ns => [missing, "sst", "aaa"],
:ne => CategoricalVector(["a", "b", missing]),
:las => ["a", "b", "a"],
:lan => [missing, "b", "a"],


)

# Single block inserts.
@@ -238,6 +258,23 @@ end
@test proj[:foo_fixed] == String["aaaaa", "bbb ", "cc ", "aaaaa"]
@test proj[:ddd] == Date[td, td, td, td]
@test proj[:uuid] == vcat(data[:uuid], data[:uuid][1:1])
@test all(
miss_or_equal.(proj[:nn], [10, missing, 20 , 10])
)

@test all(
miss_or_equal.(proj[:ns], [missing, "sst", "aaa" , missing])
)

@test all(
miss_or_equal.(proj[:ne], ["a", "b", missing, "a"])
)
@test proj[:las] == ["a", "b", "a", "a"]

@test all(
miss_or_equal.(proj[:lan], [missing, "b", "a", missing])
)


# SELECT Tuple -> Dict