Skip to content

Commit

Permalink
LowCardinality and Nullable (#14)
Browse files Browse the repository at this point in the history
* Nullable

* LowCardinality

* Update src/columns/LowCardinality.jl

Co-authored-by: Joel Höner <[email protected]>

* add comment

* rename nest args

Co-authored-by: Joel Höner <[email protected]>
waralex and athre0z authored Aug 31, 2020

Verified

This commit was signed with the committer’s verified signature.
snyk-bot Snyk bot
1 parent 4b40a61 commit 8b62e5b
Showing 9 changed files with 393 additions and 4 deletions.
20 changes: 18 additions & 2 deletions src/Net.jl
Original file line number Diff line number Diff line change
@@ -222,15 +222,31 @@ function read_col(sock::ClickHouseSock, num_rows::VarUInt)::Column
name = chread(sock, String)
type_name = chread(sock, String)

data = read_col_data(sock, num_rows, parse_typestring(type_name))
# Read the column payload. An ArgumentError raised inside the guarded call is
# re-raised as an error that names the column and its type string; any other
# exception propagates unchanged.
data = try
    read_col_data(sock, num_rows, parse_typestring(type_name))
catch e
    e isa ArgumentError || rethrow()
    # The type string read above is bound to `type_name`; there is no
    # variable called `type` in this scope.
    error("Error while reading col $(name) ($(type_name)): $(e.msg)")
end
Column(name, type_name, data)
end

function chwrite(sock::ClickHouseSock, x::Column)
chwrite(sock, x.name)
chwrite(sock, x.type)

write_col_data(sock, x.data, parse_typestring(x.type))
# Serialize the column payload. ArgumentErrors are re-raised with the column
# name and type attached; every other exception is passed through as is.
try
    write_col_data(sock, x.data, parse_typestring(x.type))
catch err
    err isa ArgumentError || rethrow(err)
    error("Error while writing col $(x.name) ($(x.type)): $(err.msg)")
end
end

struct Block
7 changes: 7 additions & 0 deletions src/columns/Base.jl
Original file line number Diff line number Diff line change
@@ -17,6 +17,13 @@ macro _primitive_columns(args...)
return chwrite(sock, data)
end
end )
# Also generate a writer that accepts any AbstractVector for this column
# type: the data is converted to a Vector{$arg} before it is handed to
# chwrite.
push!(funcs, quote
function write_col_data(sock::ClickHouseSock,
data::AbstractVector,
::Val{Symbol($arg_string)})
return chwrite(sock, convert(Vector{$arg},data))
end
end )
push!(funcs, quote deserialize(::Val{Symbol($arg_string)}) = $arg end )
end
return esc(:($(funcs...),))
3 changes: 3 additions & 0 deletions src/columns/Interfaces.jl
Original file line number Diff line number Diff line change
@@ -2,6 +2,9 @@ is_ch_type(::Val{N}) where {N} = false
# Convenience entry points: a type name given as a String or a Symbol is
# lifted into a `Val` so that the `Val`-based methods decide the answer.
function is_ch_type(str::String)
    return is_ch_type(Val(Symbol(str)))
end

function is_ch_type(s::Symbol)
    return is_ch_type(Val(s))
end

# Trait: may the named type be wrapped in Nullable? The answer is `true`
# unless a more specific `Val` method says otherwise.
function can_be_nullable(::Val{N}) where {N}
    return true
end

# Symbol form: lift the name into a `Val` and ask the trait.
function can_be_nullable(s::Symbol)
    return can_be_nullable(Val(s))
end

function read_col_data(sock::ClickHouseSock,
num_rows::VarUInt, ::Val{N}, args...) where {N}
throw(
100 changes: 100 additions & 0 deletions src/columns/LowCardinality.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
using UUIDs
# LowCardinality is a known column type ...
function is_ch_type(::Val{:LowCardinality})
    return true
end

# ... and it is not allowed inside Nullable.
function can_be_nullable(::Val{:LowCardinality})
    return false
end

# Flag bit of the serialization type: additional keys are present and must
# be read. They are stored ahead of the indexes as a count N, with the N
# keys following that count.
const lc_has_additional_keys_bit = 1 << 9
# Flag bit of the serialization type: the dictionary needs to be updated,
# meaning the previous granule used a different dictionary.
const lc_need_update_dictionary = 1 << 10

# Flags combined into the serialization type on write; write_col_data ORs
# the index key type into this value.
const lc_serialization_type = lc_has_additional_keys_bit | lc_need_update_dictionary

# Column type names usable for the keys. Looked up as
# `lc_index_int_types[int_type + 1]`, where `int_type` is the 0-based key
# type stored in the low bits of the serialization type.
const lc_index_int_types = [:UInt8, :UInt16, :UInt32, :UInt64]


# Assemble the CategoricalVector returned for a LowCardinality column from a
# plain vector of dictionary values (`index`, used as the levels) and the
# per-row references (`keys`). `is_nullable` selects an element type that
# also admits `missing`.
function make_result(index::Vector{T}, keys, is_nullable) where {T}
    if is_nullable
        column = CategoricalVector{Union{T, Missing}}(undef, 0, levels = index)
    else
        column = CategoricalVector{T}(undef, 0, levels = index)
    end
    column.refs = keys
    return column
end

# Same as the Vector method, for a dictionary that arrives as a
# CategoricalVector: its elements are unwrapped with `get` and used as the
# levels of the result.
function make_result(index::CategoricalVector{T}, keys, is_nullable) where {T}
    if is_nullable
        column = CategoricalVector{Union{T, Missing}}(undef, 0, levels = get.(index))
    else
        column = CategoricalVector{T}(undef, 0, levels = get.(index))
    end
    column.refs = keys
    return column
end


# Read a LowCardinality(nested) column: a dictionary of values followed by
# one integer key per row, returned through `make_result`.
#
# Arguments:
#   sock     - socket to read from
#   num_rows - number of rows announced for the column
#   nested   - type AST of the wrapped type, possibly Nullable(...)
#
# Raises an error for an unsupported serialization version or an index key
# type outside `lc_index_int_types`.
function read_col_data(sock::ClickHouseSock, num_rows::VarUInt,
                       ::Val{:LowCardinality}, nested::TypeAst)

    # For zero rows, skip the LowCardinality framing and defer to the
    # reader of the nested type.
    UInt64(num_rows) == 0 && return read_col_data(sock, num_rows, nested)

    is_nested_nullable = (nested.name == :Nullable)
    notnullable_nested = is_nested_nullable ? nested.args[1] : nested

    ver = chread(sock, UInt64) # KeysSerializationVersion
    ver == 1 || error("unsupported LC serialization version: $(ver)")

    serialization_type = chread(sock, UInt64)
    int_type = serialization_type & 0xf
    # Fail with a readable message instead of a BoundsError further down.
    int_type < length(lc_index_int_types) ||
        error("unsupported LC index type: $(Int(int_type))")

    index_size = chread(sock, UInt64)
    index = read_col_data(sock, VarUInt(index_size), notnullable_nested)
    # With a nullable nested type key 0 denotes null, so the first
    # dictionary entry is only a placeholder and is dropped.
    is_nested_nullable && (index = index[2:end])

    keys_size = chread(sock, UInt64)
    keys = read_col_data(sock, VarUInt(keys_size),
                         Val(lc_index_int_types[int_type + 1]))

    # Keys on the wire are 0-based; without a null slot they are shifted to
    # 1-based references. The shift is NOT done in place: storing
    # `0xff + 1` back into a UInt8 key vector throws InexactError, which a
    # 256-entry dictionary would trigger. The promoted vector is converted
    # to the reference type when `make_result` assigns it.
    is_nested_nullable || (keys = keys .+ 1)

    return make_result(index, keys, is_nested_nullable)
end


# Write a categorical vector as a LowCardinality(nested) column: version,
# serialization type, dictionary (the levels), then one key per row.
#
# Arguments:
#   sock   - socket to write to
#   data   - categorical vector; its levels become the dictionary
#   nested - type AST of the wrapped type, possibly Nullable(...)
#
# Returns nothing for empty data (only the version is written).
function write_col_data(sock::ClickHouseSock,
                        data::AbstractCategoricalVector{T},
                        ::Val{:LowCardinality}, nested::TypeAst) where {T}

    is_nested_nullable = (nested.name == :Nullable)
    notnullable_nested = is_nested_nullable ? nested.args[1] : nested

    # KeysSerializationVersion. See ClickHouse docs.
    # NOTE(review): the version is written even for empty data, while the
    # reader consumes nothing for zero rows - confirm against the protocol.
    chwrite(sock, Int64(1))
    isempty(data) && return

    # Pick the narrowest key type that can hold the largest key written
    # below. References run 1..nlevels; they are written unchanged for a
    # nullable nested type (0 = null) and shifted down by one otherwise.
    # The previous `floor(Int, log2(nlevels) / 2)` chose UInt16 at 4 levels
    # and UInt64 at 64, ran past `lc_index_int_types` at 256 levels, and
    # failed on `log2(0)` when there were no levels.
    nlevels = length(levels(data))
    max_key = is_nested_nullable ? nlevels : nlevels - 1
    int_type = max_key <= typemax(UInt8) ? 0 :
               max_key <= typemax(UInt16) ? 1 :
               max_key <= typemax(UInt32) ? 2 : 3

    serialization_type = lc_serialization_type | int_type
    chwrite(sock, serialization_type)

    # For a nullable nested type the dictionary starts with a placeholder
    # entry that stands for null.
    index = is_nested_nullable ?
        vcat(missing_replacement(T), levels(data)) :
        levels(data)

    chwrite(sock, length(index))
    write_col_data(sock, index, notnullable_nested)

    chwrite(sock, length(data))

    # On the C++ side indexes start from 0. With a nullable nested type 0
    # means null, which matches the refs as they are; otherwise 1 must be
    # subtracted from every ref.
    keys = is_nested_nullable ? data.refs : data.refs .- 1
    write_col_data(sock, keys, Val(lc_index_int_types[int_type + 1]))
end

# Plain vectors are first turned into a CategoricalVector with the same
# element type and then written by dispatching on the converted value.
function write_col_data(sock::ClickHouseSock,
                        data::AbstractVector{T},
                        v::Val{:LowCardinality}, nested::TypeAst) where {T}
    categorical = CategoricalVector{T}(data)
    return write_col_data(sock, categorical, v, nested)
end
80 changes: 80 additions & 0 deletions src/columns/Nullable.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
using UUIDs
# Nullable is a known column type ...
function is_ch_type(::Val{:Nullable})
    return true
end

# ... and it cannot itself be wrapped in another Nullable.
function can_be_nullable(::Val{:Nullable})
    return false
end

# Widen the element type of `data` so that it can also hold `missing`.
function convert_to_missings(data::Vector{T}) where {T}
    return convert(Vector{Union{T, Missing}}, data)
end

# Categorical variant: the result stays a CategoricalVector.
function convert_to_missings(data::CategoricalVector{T}) where {T}
    return convert(CategoricalVector{Union{T, Missing}}, data)
end

# Read a Nullable(nested) column: first one UInt8 flag per row, then the
# nested column itself. Rows whose flag equals 0x1 are set to `missing` in
# the returned vector.
function read_col_data(sock::ClickHouseSock, num_rows::VarUInt,
                       ::Val{:Nullable}, nested::TypeAst)

    null_flags = chread(sock, Vector{UInt8}, num_rows)
    column = convert_to_missings(read_col_data(sock, num_rows, nested))
    for (row, flag) in enumerate(null_flags)
        if flag == 0x1
            column[row] = missing
        end
    end
    return column
end

# Placeholder value written in place of `missing` for each supported element
# type.
function missing_replacement(::Type{T}) where {T <: Number}
    return zero(T)
end

function missing_replacement(::Type{UUID})
    return UUID(0)
end

function missing_replacement(::Type{Date})
    return Date(1970)
end

function missing_replacement(::Type{DateTime})
    return unix2datetime(0)
end

function missing_replacement(::Type{String})
    return ""
end

# For `Union{T, Missing}` the placeholder of the non-missing part is used.
function missing_replacement(::Type{Union{T, Missing}}) where {T}
    return missing_replacement(T)
end


# Map a value to its null flag: 1 for `missing`, 0 for anything else, as a
# UInt8.
function uint8_ismissing(v)::UInt8
    return ismissing(v) ? 0x01 : 0x00
end

# Write a vector that may contain `missing` as a Nullable(nested) column:
# one UInt8 null flag per row, followed by the values with every `missing`
# swapped for `missing_replacement(T)`.
# Raises an error when the nested type is not allowed inside Nullable.
function write_col_data(sock::ClickHouseSock,
                        data::AbstractVector{Union{Missing, T}},
                        ::Val{:Nullable}, nested::TypeAst) where {T}
    can_be_nullable(nested.name) ||
        error("$(nested.name) cannot be inside Nullable")

    null_flags = uint8_ismissing.(data)
    chwrite(sock, null_flags)

    if any(flag -> flag > 0, null_flags)
        filler = missing_replacement(T)
        payload = [ismissing(v) ? filler : v for v in data]
    else
        # Nothing is missing, so a plain conversion is enough.
        payload = convert(Vector{T}, data)
    end

    return write_col_data(sock, payload, nested)
end

# Write a vector whose element type does not admit `missing` as a
# Nullable(nested) column: an all-zero null map (one UInt8 per row) followed
# by the data itself.
# Raises an error when the nested type is not allowed inside Nullable.
function write_col_data(sock::ClickHouseSock,
                        data::AbstractVector{T},
                        ::Val{:Nullable}, nested::TypeAst) where {T}
    !can_be_nullable(nested.name) &&
        error("$(nested.name) cannot be inside Nullable")

    # One zero flag per row. `fill(x, 1:n)` is not a valid way to ask for a
    # length-n vector (the dims argument must be integers), and the map is
    # UInt8 like the one produced by `uint8_ismissing` in the other methods.
    missing_map = zeros(UInt8, length(data))
    chwrite(sock, missing_map)
    write_col_data(sock, data, nested)
end

# Write a categorical vector that may contain `missing` as a
# Nullable(nested) column: one UInt8 null flag per row, followed by the
# data converted to a CategoricalVector without `missing`.
# Raises an error when the nested type is not allowed inside Nullable.
function write_col_data(sock::ClickHouseSock,
                        data::AbstractCategoricalVector{Union{Missing, T}},
                        ::Val{:Nullable}, nested::TypeAst) where {T}
    can_be_nullable(nested.name) ||
        error("$(nested.name) cannot be inside Nullable")

    null_flags = uint8_ismissing.(data)
    chwrite(sock, null_flags)

    if any(flag -> flag > 0, null_flags)
        patched = deepcopy(data)
        # Replace missing (it is always 0 in the refs of a
        # CategoricalVector) with something valid.
        replace!(patched.refs, 0=>1)
        payload = convert(CategoricalVector{T}, patched)
    else
        payload = convert(CategoricalVector{T}, data)
    end

    return write_col_data(sock, payload, nested)
end
1 change: 1 addition & 0 deletions src/columns/Tuple.jl
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
is_ch_type(::Val{:Tuple}) = true
# Tuple columns are not allowed inside Nullable. The trait is looked up by
# type-name symbol, so the method must dispatch on `Val{:Tuple}`; a method on
# `Val{Tuple}` (the Julia type) is never selected by `can_be_nullable(:Tuple)`.
can_be_nullable(::Val{:Tuple}) = false

function read_col_data(sock::ClickHouseSock, num_rows::VarUInt,
::Val{:Tuple}, args::TypeAst...)
2 changes: 2 additions & 0 deletions src/columns/columns.jl
Original file line number Diff line number Diff line change
@@ -8,3 +8,5 @@ include("Enum.jl")
include("FixedString.jl")
include("Tuple.jl")
include("UUID.jl")
include("Nullable.jl")
include("LowCardinality.jl")
Loading

0 comments on commit 8b62e5b

Please sign in to comment.