Skip to content

Commit

Permalink
fixed Adbc.Column.fixed_size_binary/3 (#82)
Browse files Browse the repository at this point in the history
  • Loading branch information
cocoa-xu authored May 17, 2024
1 parent 23db3dd commit 6cd6d37
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 22 deletions.
29 changes: 22 additions & 7 deletions c_src/adbc_arrow_array.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,12 +108,10 @@ static ERL_NIF_TERM fixed_size_binary_from_buffer(
values[i - element_offset] = value_to_nif(env, &value_buffer[element_bytes * i]);
}
} else {
int64_t index = 0;
for (int64_t i = element_offset; i < element_offset + element_count; i++) {
uint8_t vbyte = validity_bitmap[i / 8];
if (vbyte & (1 << (i % 8))) {
values[i - element_offset] = value_to_nif(env, &value_buffer[element_bytes * index]);
index++;
values[i - element_offset] = value_to_nif(env, &value_buffer[element_bytes * i]);
} else {
values[i - element_offset] = kAtomNil;
}
Expand Down Expand Up @@ -803,13 +801,30 @@ int arrow_array_to_nif_term(ErlNifEnv *env, struct ArrowSchema * schema, struct
// NANOARROW_TYPE_FIXED_SIZE_LIST
term_type = kAdbcColumnTypeFixedSizeList;
children_term = get_arrow_array_list_children(env, schema, values, offset, count, level);
} else if (format_len > 3 && strncmp("w:", format, 2) == 0) {
} else if (format_len >= 3 && strncmp("w:", format, 2) == 0) {
// NANOARROW_TYPE_FIXED_SIZE_BINARY
if (get_arrow_array_children_as_list(env, schema, values, offset, count, level, children, error) == 1) {
if (count == -1) count = values->length;
if (values->n_buffers != 2) {
snprintf(err_msg_buf, 255, "invalid n_buffers value for ArrowArray (format=%s), values->n_buffers != 2", schema->format);
error = erlang::nif::error(env, erlang::nif::make_binary(env, err_msg_buf));
return 1;
}
term_type = kAdbcColumnTypeFixedSizeBinary;
children_term = enif_make_list_from_array(env, children.data(), (unsigned)count);
size_t nbytes = 0;
for (size_t i = 2; i < format_len; i++) {
nbytes = nbytes * 10 + (format[i] - '0');
}
term_type = kAdbcColumnTypeFixedSizeBinary(nbytes);
current_term = fixed_size_binary_from_buffer(
env,
offset,
count,
nbytes,
(const uint8_t *)values->buffers[bitmap_buffer_index],
(const uint8_t *)values->buffers[data_buffer_index],
[&](ErlNifEnv *env, const uint8_t * val) -> ERL_NIF_TERM {
return erlang::nif::make_binary(env, (const char *)val, nbytes);
}
);
} else if (format_len > 4 && (strncmp("+ud:", format, 4) == 0)) {
// NANOARROW_TYPE_DENSE_UNION
term_type = kAdbcColumnTypeDenseUnion;
Expand Down
18 changes: 12 additions & 6 deletions c_src/adbc_column.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ int get_list_fixed_size_binary(ErlNifEnv *env, ERL_NIF_TERM list, bool nullable,
val.data.data = bytes.data;
val.size_bytes = static_cast<int64_t>(bytes.size);
callback(val, false);
} if (nullable && enif_is_identical(head, kAtomNil)) {
} else if (nullable && enif_is_identical(head, kAtomNil)) {
callback(val, true);
} else {
return 1;
Expand All @@ -278,8 +278,8 @@ int get_list_fixed_size_binary(ErlNifEnv *env, ERL_NIF_TERM list, bool nullable,
return 0;
}

int do_get_list_fixed_size_binary(ErlNifEnv *env, ERL_NIF_TERM list, bool nullable, ArrowType nanoarrow_type, struct ArrowArray* array_out, struct ArrowSchema* schema_out, struct ArrowError* error_out) {
NANOARROW_RETURN_NOT_OK(ArrowSchemaSetType(schema_out, nanoarrow_type));
int do_get_list_fixed_size_binary(ErlNifEnv *env, ERL_NIF_TERM list, bool nullable, ArrowType nanoarrow_type, int32_t fixed_size, struct ArrowArray* array_out, struct ArrowSchema* schema_out, struct ArrowError* error_out) {
NANOARROW_RETURN_NOT_OK(ArrowSchemaSetTypeFixedSize(schema_out, nanoarrow_type, fixed_size));
NANOARROW_RETURN_NOT_OK(ArrowArrayInitFromSchema(array_out, schema_out, error_out));
NANOARROW_RETURN_NOT_OK(ArrowArrayStartAppending(array_out));
if (nullable) {
Expand Down Expand Up @@ -742,8 +742,6 @@ int adbc_column_to_adbc_field(ErlNifEnv *env, ERL_NIF_TERM adbc_buffer, struct A
ret = do_get_list_string(env, data_term, nullable, NANOARROW_TYPE_BINARY, array_out, schema_out, error_out);
} else if (enif_is_identical(type_term, kAdbcColumnTypeLargeBinary)) {
ret = do_get_list_string(env, data_term, nullable, NANOARROW_TYPE_LARGE_BINARY, array_out, schema_out, error_out);
} else if (enif_is_identical(type_term, kAdbcColumnTypeFixedSizeBinary)) {
ret = do_get_list_fixed_size_binary(env, data_term, nullable, NANOARROW_TYPE_FIXED_SIZE_BINARY, array_out, schema_out, error_out);
} else if (enif_is_identical(type_term, kAdbcColumnTypeDate32)) {
ret = do_get_list_date(env, data_term, nullable, NANOARROW_TYPE_DATE32, array_out, schema_out, error_out);
} else if (enif_is_identical(type_term, kAdbcColumnTypeDate64)) {
Expand Down Expand Up @@ -779,7 +777,15 @@ int adbc_column_to_adbc_field(ErlNifEnv *env, ERL_NIF_TERM adbc_buffer, struct A
const ERL_NIF_TERM *tuple = nullptr;
int arity;
if (enif_get_tuple(env, type_term, &arity, &tuple)) {
if (arity == 3) {
if (arity == 2) {
// NANOARROW_TYPE_FIXED_SIZE_BINARY
if (enif_is_identical(tuple[0], kAtomFixedSizeBinary)) {
int32_t fixed_size;
if (erlang::nif::get(env, tuple[1], &fixed_size)) {
ret = do_get_list_fixed_size_binary(env, data_term, nullable, NANOARROW_TYPE_FIXED_SIZE_BINARY, fixed_size, array_out, schema_out, error_out);
}
}
} else if (arity == 3) {
// NANOARROW_TYPE_TIMESTAMP
if (enif_is_identical(tuple[0], kAtomTimestamp)) {
std::string timezone;
Expand Down
5 changes: 3 additions & 2 deletions c_src/adbc_consts.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@ static ERL_NIF_TERM kAtomSeconds;
static ERL_NIF_TERM kAtomMilliseconds;
static ERL_NIF_TERM kAtomMicroseconds;
static ERL_NIF_TERM kAtomNanoseconds;
static ERL_NIF_TERM kAtomTimestamp;
static ERL_NIF_TERM kAtomDecimal;
static ERL_NIF_TERM kAtomTimestamp;
static ERL_NIF_TERM kAtomFixedSizeBinary;

static ERL_NIF_TERM kAtomCalendarKey;
static ERL_NIF_TERM kAtomCalendarISO;
Expand Down Expand Up @@ -61,7 +62,6 @@ static ERL_NIF_TERM kAdbcColumnTypeString;
static ERL_NIF_TERM kAdbcColumnTypeLargeString;
static ERL_NIF_TERM kAdbcColumnTypeBinary;
static ERL_NIF_TERM kAdbcColumnTypeLargeBinary;
static ERL_NIF_TERM kAdbcColumnTypeFixedSizeBinary;
static ERL_NIF_TERM kAdbcColumnTypeDenseUnion;
static ERL_NIF_TERM kAdbcColumnTypeSparseUnion;
static ERL_NIF_TERM kAdbcColumnTypeDate32;
Expand All @@ -78,6 +78,7 @@ static ERL_NIF_TERM kAdbcColumnTypeBool;
#define kAdbcColumnTypeDurationMicroseconds enif_make_tuple2(env, kAtomDuration, kAtomMicroseconds)
#define kAdbcColumnTypeDurationNanoseconds enif_make_tuple2(env, kAtomDuration, kAtomNanoseconds)
#define kAdbcColumnTypeDecimal(bitwidth, precision, scale) enif_make_tuple4(env, kAtomDecimal, enif_make_int(env, bitwidth), enif_make_int(env, precision), enif_make_int(env, scale))
// Column-type term for fixed-size binary columns: expands to a 2-tuple of
// kAtomFixedSizeBinary and `nbytes` (converted with enif_make_int64).
// The expansion refers to `env`, which is not a macro parameter, so an `env`
// identifier must be in scope wherever this macro is used.
#define kAdbcColumnTypeFixedSizeBinary(nbytes) enif_make_tuple2(env, kAtomFixedSizeBinary, enif_make_int64(env, nbytes))

// error codes
constexpr int kErrorBufferIsNotAMap = 1;
Expand Down
2 changes: 1 addition & 1 deletion c_src/adbc_nif.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -797,6 +797,7 @@ static int on_load(ErlNifEnv *env, void **, ERL_NIF_TERM) {
kAtomNanoseconds = erlang::nif::atom(env, "nanoseconds");
kAtomTimestamp = erlang::nif::atom(env, "timestamp");
kAtomDecimal = erlang::nif::atom(env, "decimal");
kAtomFixedSizeBinary = erlang::nif::atom(env, "fixed_size_binary");

kAtomCalendarKey = erlang::nif::atom(env, "calendar");
kAtomCalendarISO = erlang::nif::atom(env, "Elixir.Calendar.ISO");
Expand Down Expand Up @@ -839,7 +840,6 @@ static int on_load(ErlNifEnv *env, void **, ERL_NIF_TERM) {
kAdbcColumnTypeLargeString = erlang::nif::atom(env, "large_string");
kAdbcColumnTypeBinary = erlang::nif::atom(env, "binary");
kAdbcColumnTypeLargeBinary = erlang::nif::atom(env, "large_binary");
kAdbcColumnTypeFixedSizeBinary = erlang::nif::atom(env, "fixed_size_binary");
kAdbcColumnTypeDenseUnion = erlang::nif::atom(env, "dense_union");
kAdbcColumnTypeSparseUnion = erlang::nif::atom(env, "sparse_union");
kAdbcColumnTypeDate32 = erlang::nif::atom(env, "date32");
Expand Down
13 changes: 7 additions & 6 deletions lib/adbc_column.ex
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ defmodule Adbc.Column do
| :large_string
| :binary
| :large_binary
| :fixed_size_binary
| {:fixed_size_binary, non_neg_integer()}
| :date32
| :date64
| time32_t
Expand Down Expand Up @@ -674,6 +674,7 @@ defmodule Adbc.Column do
## Arguments
* `data`: A list of binary values
* `nbytes`: The fixed size of the binary values in bytes
* `opts`: A keyword list of options
## Options
Expand All @@ -684,19 +685,19 @@ defmodule Adbc.Column do
## Examples
iex> Adbc.Buffer.fixed_size_binary([<<0>>, <<1>>, <<2>>])
iex> Adbc.Column.fixed_size_binary([<<0>>, <<1>>, <<2>>], 1)
%Adbc.Column{
name: nil,
type: :fixed_size_binary,
type: {:fixed_size_binary, 1},
nullable: false,
metadata: %{},
data: [<<0>>, <<1>>, <<2>>]
}
"""
@spec fixed_size_binary([binary()], Keyword.t()) :: %Adbc.Column{}
def fixed_size_binary(data, opts \\ []) when is_list(data) and is_list(opts) do
column(:fixed_size_binary, data, opts)
@spec fixed_size_binary([binary() | nil], non_neg_integer(), Keyword.t()) :: %Adbc.Column{}
def fixed_size_binary(data, nbytes, opts \\ []) when is_list(data) and is_list(opts) do
column({:fixed_size_binary, nbytes}, data, opts)
end

@doc """
Expand Down

0 comments on commit 6cd6d37

Please sign in to comment.