Skip to content

Commit

Permalink
improve how validity bitmaps are handled
Browse files Browse the repository at this point in the history
  • Loading branch information
cocoa-xu committed Apr 11, 2024
1 parent c538b55 commit 7da2cb9
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 48 deletions.
156 changes: 109 additions & 47 deletions c_src/adbc_nif.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,20 +57,20 @@ template <typename T, typename M> static ERL_NIF_TERM values_from_buffer(ErlNifE
return enif_make_list_from_array(env, values.data(), (unsigned)values.size());
}

template <typename M> static ERL_NIF_TERM strings_from_buffer(
template <typename M, typename OffsetT> static ERL_NIF_TERM strings_from_buffer(
ErlNifEnv *env,
int64_t length,
const uint8_t * validity_bitmap,
const int32_t * offsets_buffer,
const OffsetT * offsets_buffer,
const uint8_t* value_buffer,
const M& value_to_nif) {
int64_t start_index = validity_bitmap == nullptr ? 1 : 2;

int32_t offset = offsets_buffer[0];
OffsetT offset = offsets_buffer[0];
std::vector<ERL_NIF_TERM> values(length);
if (validity_bitmap == nullptr) {
for (int64_t i = 0; i < length; i++) {
int32_t end_index = offsets_buffer[i + 1];
OffsetT end_index = offsets_buffer[i + 1];
size_t nbytes = end_index - offset;
if (nbytes == 0) {
values[i] = erlang::nif::atom(env, "nil");
Expand All @@ -82,7 +82,7 @@ template <typename M> static ERL_NIF_TERM strings_from_buffer(
} else {
for (int64_t i = 0; i < length; i++) {
uint8_t vbyte = validity_bitmap[i / 8];
int32_t end_index = offsets_buffer[i + 1];
OffsetT end_index = offsets_buffer[i + 1];
size_t nbytes = end_index - offset;
if (nbytes > 0 && vbyte & (1 << (i & 0b11111111))) {
values[i] = value_to_nif(env, value_buffer, offset, nbytes);
Expand Down Expand Up @@ -113,9 +113,18 @@ static int get_arrow_array_children_as_list(ErlNifEnv *env, struct ArrowSchema *
return 1;
}

constexpr int64_t bitmap_buffer_index = 0;
const uint8_t * bitmap_buffer = (const uint8_t *)values->buffers[bitmap_buffer_index];
children.resize(values->n_children);
if (values->n_children > 0) {
for (int64_t child_i = 0; child_i < values->n_children; child_i++) {
if (bitmap_buffer && values->null_count > 0) {
uint8_t vbyte = bitmap_buffer[child_i / 8];
if (!(vbyte & (1 << (child_i & 0b11111111)))) {
children[child_i] = erlang::nif::atom(env, "nil");
continue;
}
}
struct ArrowSchema * child_schema = schema->children[child_i];
struct ArrowArray * child_values = values->children[child_i];
std::vector<ERL_NIF_TERM> childrens;
Expand Down Expand Up @@ -250,6 +259,8 @@ static ERL_NIF_TERM get_arrow_array_list_children(ErlNifEnv *env, struct ArrowSc
return erlang::nif::error(env, "invalid ArrowArray (list), values->n_children != 1");
}

constexpr int64_t bitmap_buffer_index = 0;
const uint8_t * bitmap_buffer = (const uint8_t *)values->buffers[bitmap_buffer_index];
struct ArrowSchema * items_schema = schema->children[0];
struct ArrowArray * items_values = values->children[0];
if (strncmp("item", items_schema->name, 4) != 0) {
Expand All @@ -261,6 +272,13 @@ static ERL_NIF_TERM get_arrow_array_list_children(ErlNifEnv *env, struct ArrowSc
children.resize(items_values->n_children);
bool failed = false;
for (int64_t child_i = 0; child_i < items_values->n_children; child_i++) {
if (bitmap_buffer && values->null_count > 0) {
uint8_t vbyte = bitmap_buffer[child_i / 8];
if (!(vbyte & (1 << (child_i & 0b11111111)))) {
children[child_i] = erlang::nif::atom(env, "nil");
continue;
}
}
struct ArrowSchema * item_schema = items_schema->children[child_i];
struct ArrowArray * item_values = items_values->children[child_i];

Expand Down Expand Up @@ -307,141 +325,169 @@ int arrow_array_to_nif_term(ErlNifEnv *env, struct ArrowSchema * schema, struct
ERL_NIF_TERM current_term{}, children_term{};
std::vector<ERL_NIF_TERM> children;

bool has_validity_bitmap = values->null_count != 0 && values->null_count != -1;
const uint8_t * bitmap_buffer = nullptr;
if (has_validity_bitmap && values->n_buffers >= 2) {
bitmap_buffer = (const uint8_t *)values->buffers[0];
}
const int32_t * offsets_buffer = nullptr;
if (values->n_buffers >= 3) {
offsets_buffer = (const int32_t *)values->buffers[1];
}
constexpr int64_t bitmap_buffer_index = 0;
int64_t data_buffer_index = 1;
int64_t offset_buffer_index = 2;

bool is_struct = false;
size_t format_len = strlen(format);
bool format_processed = true;
if (format_len == 1) {
if (format[0] == 'l') {
// NANOARROW_TYPE_INT64
using value_type = int64_t;
current_term = values_from_buffer(
env,
values->length,
bitmap_buffer,
(const value_type *)values->buffers[1],
(const uint8_t *)values->buffers[bitmap_buffer_index],
(const value_type *)values->buffers[data_buffer_index],
enif_make_int64
);
} else if (format[0] == 'c') {
// NANOARROW_TYPE_INT8
using value_type = int8_t;
current_term = values_from_buffer(
env,
values->length,
bitmap_buffer,
(const value_type *)values->buffers[1],
(const uint8_t *)values->buffers[bitmap_buffer_index],
(const value_type *)values->buffers[data_buffer_index],
enif_make_int64
);
} else if (format[0] == 's') {
// NANOARROW_TYPE_INT16
using value_type = int16_t;
current_term = values_from_buffer(
env,
values->length,
bitmap_buffer,
(const value_type *)values->buffers[1],
(const uint8_t *)values->buffers[bitmap_buffer_index],
(const value_type *)values->buffers[data_buffer_index],
enif_make_int64
);
} else if (format[0] == 'i') {
// NANOARROW_TYPE_INT32
using value_type = int32_t;
current_term = values_from_buffer(
env,
values->length,
bitmap_buffer,
(const value_type *)values->buffers[1],
(const uint8_t *)values->buffers[bitmap_buffer_index],
(const value_type *)values->buffers[data_buffer_index],
enif_make_int64
);
} else if (format[0] == 'L') {
// NANOARROW_TYPE_UINT64
using value_type = uint64_t;
current_term = values_from_buffer(
env,
values->length,
bitmap_buffer,
(const value_type *)values->buffers[1],
(const uint8_t *)values->buffers[bitmap_buffer_index],
(const value_type *)values->buffers[data_buffer_index],
enif_make_uint64
);
} else if (format[0] == 'C') {
// NANOARROW_TYPE_UINT8
using value_type = uint8_t;
current_term = values_from_buffer(
env,
values->length,
bitmap_buffer,
(const value_type *)values->buffers[1],
(const uint8_t *)values->buffers[bitmap_buffer_index],
(const value_type *)values->buffers[data_buffer_index],
enif_make_uint64
);
} else if (format[0] == 'S') {
// NANOARROW_TYPE_UINT16
using value_type = uint16_t;
current_term = values_from_buffer(
env,
values->length,
bitmap_buffer,
(const value_type *)values->buffers[1],
(const uint8_t *)values->buffers[bitmap_buffer_index],
(const value_type *)values->buffers[data_buffer_index],
enif_make_uint64
);
} else if (format[0] == 'I') {
// NANOARROW_TYPE_UINT32
using value_type = uint32_t;
current_term = values_from_buffer(
env,
values->length,
bitmap_buffer,
(const value_type *)values->buffers[1],
(const uint8_t *)values->buffers[bitmap_buffer_index],
(const value_type *)values->buffers[data_buffer_index],
enif_make_uint64
);
} else if (format[0] == 'f') {
// NANOARROW_TYPE_FLOAT
using value_type = float;
current_term = values_from_buffer(
env,
values->length,
bitmap_buffer,
(const value_type *)values->buffers[1],
(const uint8_t *)values->buffers[bitmap_buffer_index],
(const value_type *)values->buffers[data_buffer_index],
enif_make_double
);
} else if (format[0] == 'g') {
// NANOARROW_TYPE_DOUBLE
using value_type = double;
current_term = values_from_buffer(
env,
values->length,
bitmap_buffer,
(const value_type *)values->buffers[1],
(const uint8_t *)values->buffers[bitmap_buffer_index],
(const value_type *)values->buffers[data_buffer_index],
enif_make_double
);
} else if (format[0] == 'b') {
// NANOARROW_TYPE_BOOL
using value_type = bool;
current_term = values_from_buffer(
env,
values->length,
bitmap_buffer,
(const value_type *)values->buffers[1],
(const uint8_t *)values->buffers[bitmap_buffer_index],
(const value_type *)values->buffers[data_buffer_index],
[](ErlNifEnv *env, bool val) -> ERL_NIF_TERM {
if (val) {
return erlang::nif::atom(env, "true");
}
return erlang::nif::atom(env, "false");
}
);
} else if (format[0] == 'u' || format[0] == 'U' || format[0] == 'z' || format[0] == 'Z') {
} else if (format[0] == 'u' || format[0] == 'z') {
// NANOARROW_TYPE_BINARY (format 'z')
// NANOARROW_TYPE_STRING (format 'u')
offset_buffer_index = 1;
data_buffer_index = 2;
int64_t length = values->length;
current_term = strings_from_buffer(
env,
values->length,
bitmap_buffer,
offsets_buffer,
(const uint8_t *)values->buffers[2],
(const uint8_t *)values->buffers[bitmap_buffer_index],
(const int32_t *)values->buffers[offset_buffer_index],
(const uint8_t *)values->buffers[data_buffer_index],
[](ErlNifEnv *env, const uint8_t * string_buffers, int32_t offset, size_t nbytes) -> ERL_NIF_TERM {
return erlang::nif::make_binary(env, (const char *)(string_buffers + offset), nbytes);
}
);
} else if (format[0] == 'U' || format[0] == 'Z') {
// NANOARROW_TYPE_LARGE_STRING
// NANOARROW_TYPE_LARGE_BINARY
offset_buffer_index = 1;
data_buffer_index = 2;
int64_t length = values->length;
current_term = strings_from_buffer(
env,
values->length,
(const uint8_t *)values->buffers[bitmap_buffer_index],
(const int64_t *)values->buffers[offset_buffer_index],
(const uint8_t *)values->buffers[data_buffer_index],
[](ErlNifEnv *env, const uint8_t * string_buffers, int64_t offset, size_t nbytes) -> ERL_NIF_TERM {
return erlang::nif::make_binary(env, (const char *)(string_buffers + offset), nbytes);
}
);
} else {
format_processed = false;
}
} else if (format_len == 2) {
if (strncmp("+s", format, 2) == 0) {
// NANOARROW_TYPE_STRUCT
// only handle and return children if this is a struct
is_struct = true;

Expand All @@ -457,27 +503,38 @@ int arrow_array_to_nif_term(ErlNifEnv *env, struct ArrowSchema * schema, struct
children_term = erlang::nif::atom(env, "end_of_series");
}
} else if (strncmp("+m", format, 2) == 0) {
// NANOARROW_TYPE_MAP
children_term = get_arrow_array_map_children(env, schema, values, level);
} else if (strncmp("+l", format, 2) == 0 || strncmp("+L", format, 2) == 0) {
} else if (strncmp("+l", format, 2) == 0) {
// NANOARROW_TYPE_LIST
children_term = get_arrow_array_list_children(env, schema, values, level);
} else if (strncmp("+L", format, 2) == 0) {
// NANOARROW_TYPE_LARGE_LIST
children_term = get_arrow_array_list_children(env, schema, values, level);
} else {
format_processed = false;
}
} else if (format_len >= 3) {
if (strncmp("+w:", format, 3) == 0) {
// NANOARROW_TYPE_FIXED_SIZE_LIST
children_term = get_arrow_array_list_children(env, schema, values, level);
} else if (format_len > 3 && strncmp("w:", format, 2) == 0) {
// NANOARROW_TYPE_FIXED_SIZE_BINARY
if (get_arrow_array_children_as_list(env, schema, values, level, children, error) == 1) {
return 1;
}
children_term = enif_make_list_from_array(env, children.data(), (unsigned)schema->n_children);
} else if (format_len > 4 && (strncmp("+ud:", format, 4) == 0 || strncmp("+us:", format, 4) == 0)) {
// NANOARROW_TYPE_DENSE_UNION
// NANOARROW_TYPE_SPARSE_UNION
children_term = get_arrow_array_union_children(env, schema, values, level);
// date
} else if (strncmp("td", format, 2) == 0) {
char unit = format[2];

if (unit == 'D' || unit == 'm') {
// NANOARROW_TYPE_DATE32
// NANOARROW_TYPE_DATE64
ERL_NIF_TERM date_module = erlang::nif::atom(env, "Elixir.Date");
ERL_NIF_TERM calendar_iso = erlang::nif::atom(env, "Elixir.Calendar.ISO");
ERL_NIF_TERM keys[] = {
Expand Down Expand Up @@ -513,17 +570,17 @@ int arrow_array_to_nif_term(ErlNifEnv *env, struct ArrowSchema * schema, struct
current_term = values_from_buffer(
env,
values->length,
bitmap_buffer,
(const value_type *)values->buffers[1],
(const uint8_t *)values->buffers[bitmap_buffer_index],
(const value_type *)values->buffers[data_buffer_index],
convert
);
} else {
using value_type = uint64_t;
current_term = values_from_buffer(
env,
values->length,
bitmap_buffer,
(const value_type *)values->buffers[1],
(const uint8_t *)values->buffers[bitmap_buffer_index],
(const value_type *)values->buffers[data_buffer_index],
convert
);
}
Expand All @@ -536,18 +593,22 @@ int arrow_array_to_nif_term(ErlNifEnv *env, struct ArrowSchema * schema, struct
uint8_t us_precision;
switch (format[2]) {
case 's': // seconds
// NANOARROW_TYPE_TIME32
unit = 1000000000;
us_precision = 0;
break;
case 'm': // milliseconds
// NANOARROW_TYPE_TIME32
unit = 1000000;
us_precision = 3;
break;
case 'u': // microseconds
// NANOARROW_TYPE_TIME64
unit = 1000;
us_precision = 6;
break;
case 'n': // nanoseconds
// NANOARROW_TYPE_TIME64
unit = 1;
us_precision = 6;
break;
Expand All @@ -573,8 +634,8 @@ int arrow_array_to_nif_term(ErlNifEnv *env, struct ArrowSchema * schema, struct
current_term = values_from_buffer(
env,
values->length,
bitmap_buffer,
(const value_type *)values->buffers[1],
(const uint8_t *)values->buffers[bitmap_buffer_index],
(const value_type *)values->buffers[data_buffer_index],
[unit, us_precision, time_module, calendar_iso, &keys](ErlNifEnv *env, uint64_t val) -> ERL_NIF_TERM {
// Elixir only supports microsecond precision
uint64_t us = val * unit / 1000;
Expand All @@ -598,6 +659,7 @@ int arrow_array_to_nif_term(ErlNifEnv *env, struct ArrowSchema * schema, struct
}
// timestamp
} else if (strncmp("ts", format, 2) == 0) {
// NANOARROW_TYPE_TIMESTAMP
uint64_t unit;
uint8_t us_precision;
switch (format[2]) {
Expand Down Expand Up @@ -648,8 +710,8 @@ int arrow_array_to_nif_term(ErlNifEnv *env, struct ArrowSchema * schema, struct
current_term = values_from_buffer(
env,
values->length,
bitmap_buffer,
(const value_type *)values->buffers[1],
(const uint8_t *)values->buffers[bitmap_buffer_index],
(const value_type *)values->buffers[data_buffer_index],
[unit, us_precision, naive_dt_module, calendar_iso, &keys](ErlNifEnv *env, uint64_t val) -> ERL_NIF_TERM {
// Elixir only supports microsecond precision
uint64_t us = val * unit / 1000;
Expand Down
Loading

0 comments on commit 7da2cb9

Please sign in to comment.