Skip to content

Commit

Permalink
feat: avoid allocating ADBC inputs and outputs twice (#97)
Browse files Browse the repository at this point in the history
* arrow_metadata_to_nif_term

* `res->private_data` should only alloc once

* check if stream ends in `adbc_arrow_array_stream_next`

* fix `get_arrow_array_map_children`

* fix: use strcmp/strncmp properly

* added a helper function

* return reference(s) in `Adbc.Column.data`

* implemented `Adbc.{Result,Column}.materialize/1`

* updated existing test cases

* added support for using data with a single reference inside

* added rwlock

* updated new test case

* a shallow copy + setting `release` to nullptr seems to be fine

* Update lib/adbc_connection.ex

Co-authored-by: José Valim <[email protected]>

* flatten top-level columns in result

* updated test cases

* minor changes to helper functions

* fix materialize/1

* fix segfault

* removed unused var

* updated make_env

* removed make_env

* Update lib/adbc_connection.ex

Co-authored-by: José Valim <[email protected]>

---------

Co-authored-by: José Valim <[email protected]>
  • Loading branch information
cocoa-xu and josevalim authored Jun 23, 2024
1 parent e11308f commit 10f74c8
Show file tree
Hide file tree
Showing 16 changed files with 1,810 additions and 570 deletions.
132 changes: 56 additions & 76 deletions c_src/adbc_arrow_array.hpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#ifndef ADBC_ARROW_ARRAY_HPP
#define ADBC_ARROW_ARRAY_HPP
#pragma once

#include <stdio.h>
Expand All @@ -9,9 +10,10 @@
#include <adbc.h>
#include <erl_nif.h>
#include "adbc_half_float.hpp"
#include "adbc_arrow_metadata.hpp"

static int arrow_array_to_nif_term(ErlNifEnv *env, struct ArrowSchema * schema, struct ArrowArray * values, uint64_t level, std::vector<ERL_NIF_TERM> &out_terms, ERL_NIF_TERM &value_type, ERL_NIF_TERM &metadata, ERL_NIF_TERM &error, bool *end_of_series = nullptr, bool skip_dictionary_check = false);
static int arrow_array_to_nif_term(ErlNifEnv *env, struct ArrowSchema * schema, struct ArrowArray * values, int64_t offset, int64_t count, int64_t level, std::vector<ERL_NIF_TERM> &out_terms, ERL_NIF_TERM &value_type, ERL_NIF_TERM &metadata, ERL_NIF_TERM &error, bool *end_of_series = nullptr, bool skip_dictionary_check = false);
static int arrow_array_to_nif_term(ErlNifEnv *env, struct ArrowSchema * schema, struct ArrowArray * values, uint64_t level, std::vector<ERL_NIF_TERM> &out_terms, ERL_NIF_TERM &value_type, ERL_NIF_TERM &metadata, ERL_NIF_TERM &error, bool skip_dictionary_check = false);
static int arrow_array_to_nif_term(ErlNifEnv *env, struct ArrowSchema * schema, struct ArrowArray * values, int64_t offset, int64_t count, int64_t level, std::vector<ERL_NIF_TERM> &out_terms, ERL_NIF_TERM &value_type, ERL_NIF_TERM &metadata, ERL_NIF_TERM &error, bool skip_dictionary_check = false);
static int get_arrow_array_children_as_list(ErlNifEnv *env, struct ArrowSchema * schema, struct ArrowArray * values, uint64_t level, std::vector<ERL_NIF_TERM> &children, ERL_NIF_TERM &error);
static int get_arrow_array_children_as_list(ErlNifEnv *env, struct ArrowSchema * schema, struct ArrowArray * values, int64_t offset, int64_t count, uint64_t level, std::vector<ERL_NIF_TERM> &children, ERL_NIF_TERM &error);
static int get_arrow_struct(ErlNifEnv *env, struct ArrowSchema * schema, struct ArrowArray * values, uint64_t level, std::vector<ERL_NIF_TERM> &children, ERL_NIF_TERM &error);
Expand Down Expand Up @@ -227,11 +229,11 @@ int get_arrow_array_children_as_list(ErlNifEnv *env, struct ArrowSchema * schema

int get_arrow_struct(ErlNifEnv *env, struct ArrowSchema * schema, struct ArrowArray * values, int64_t offset, int64_t count, uint64_t level, std::vector<ERL_NIF_TERM> &children, ERL_NIF_TERM &error) {
if (schema->n_children > 0 && schema->children == nullptr) {
error = erlang::nif::error(env, "invalid ArrowSchema, schema->children == nullptr, however, schema->n_children > 0");
error = erlang::nif::error(env, "invalid ArrowSchema, schema->children == nullptr while schema->n_children > 0");
return 1;
}
if (values->n_children > 0 && values->children == nullptr) {
error = erlang::nif::error(env, "invalid ArrowArray, values->children == nullptr, however, values->n_children > 0");
error = erlang::nif::error(env, "invalid ArrowArray, values->children == nullptr while values->n_children > 0");
return 1;
}
if (values->n_children != schema->n_children) {
Expand All @@ -257,7 +259,7 @@ int get_arrow_struct(ErlNifEnv *env, struct ArrowSchema * schema, struct ArrowAr
if (enif_is_identical(childrens[1], kAtomNil)) {
children[child_i] = kAtomNil;
} else {
children[child_i] = make_adbc_column(env, schema, values, childrens[0], child_type, nullable, child_metadata, childrens[1]);
children[child_i] = make_adbc_column(env, child_schema, values, childrens[0], child_type, nullable, child_metadata, childrens[1]);
}
}
}
Expand All @@ -275,10 +277,10 @@ int get_arrow_dictionary(ErlNifEnv *env,
std::vector<ERL_NIF_TERM> keys, values;
ERL_NIF_TERM index_type, index_metadata;
ERL_NIF_TERM value_type, value_metadata;
if (arrow_array_to_nif_term(env, index_schema, index_array, offset, count, level + 1, keys, index_type, index_metadata, error, nullptr, true) == 1) {
if (arrow_array_to_nif_term(env, index_schema, index_array, offset, count, level + 1, keys, index_type, index_metadata, error, true) == 1) {
return 1;
}
if (arrow_array_to_nif_term(env, value_schema, value_array, offset, count, level + 1, values, value_type, value_metadata, error, nullptr, false) == 1) {
if (arrow_array_to_nif_term(env, value_schema, value_array, offset, count, level + 1, values, value_type, value_metadata, error, false) == 1) {
return 1;
}

Expand Down Expand Up @@ -327,46 +329,50 @@ ERL_NIF_TERM get_arrow_array_map_children(ErlNifEnv *env, struct ArrowSchema * s

struct ArrowSchema * entries_schema = schema->children[0];
struct ArrowArray * entries_values = values->children[0];
if (strncmp("entries", entries_schema->name, 7) != 0) {
if (strcmp("entries", entries_schema->name) != 0) {
return erlang::nif::error(env, "invalid ArrowSchema (map), its single child is not named entries");
}
if (count == -1) count = entries_values->n_children;
if (entries_schema->n_children != 2) {
return erlang::nif::error(env, "invalid ArrowSchema (map), its entries n_children != 2");
}

struct ArrowSchema * key_schema, * value_schema;
struct ArrowArray * key_values, * value_values;
if (strcmp("key", entries_schema->children[0]->name) == 0 && strcmp("value", entries_schema->children[1]->name) == 0) {
key_schema = entries_schema->children[0];
key_values = entries_values->children[0];
value_schema = entries_schema->children[1];
value_values = entries_values->children[1];
} else if (strcmp("key", entries_schema->children[1]->name) == 0 && strcmp("value", entries_schema->children[0]->name) == 0) {
key_schema = entries_schema->children[1];
key_values = entries_values->children[1];
value_schema = entries_schema->children[0];
value_values = entries_values->children[0];
} else {
return erlang::nif::error(env, "invalid map entries, key or value or both are missing");
}

std::vector<ERL_NIF_TERM> nif_keys, nif_values;
bool failed = false;
for (int64_t child_i = offset; child_i < offset + count; child_i++) {
struct ArrowSchema * entry_schema = entries_schema->children[child_i];
struct ArrowArray * entry_values = entries_values->children[child_i];
if (strncmp("key", entry_schema->name, 3) == 0) {
if (get_arrow_array_children_as_list(env, entry_schema, entry_values, level + 1, nif_keys, error) == 1) {
failed = true;
break;
}
} else if (strncmp("value", entry_schema->name, 5) == 0 && entry_schema->n_children == 1) {
struct ArrowSchema * item_schema = entry_schema->children[0];
struct ArrowArray * item_values = entry_values->children[0];
if (get_arrow_array_children_as_list(env, item_schema, item_values, level + 1, nif_values, error) == 1) {
failed = true;
break;
}
} else {
failed = true;
}
ERL_NIF_TERM key_type, key_metadata;
ERL_NIF_TERM value_type, value_metadata;
if (arrow_array_to_nif_term(env, key_schema, key_values, offset, count, level + 1, nif_keys, key_type, key_metadata, error) == 1) {
return erlang::nif::error(env, "failed to get map keys");
}

if (!failed) {
if (nif_keys.size() != nif_values.size()) {
return erlang::nif::error(env, "number of keys and values doesn't match");
}

if (!enif_make_map_from_arrays(env, nif_keys.data(), nif_values.data(), (unsigned)nif_keys.size(), &map_out)) {
return erlang::nif::error(env, "map contains duplicated keys");
} else {
return map_out;
}
} else {
return erlang::nif::error(env, "invalid map");
if (arrow_array_to_nif_term(env, value_schema, value_values, offset, count, level + 1, nif_values, value_type, value_metadata, error) == 1) {
return erlang::nif::error(env, "failed to get map values");
}

ERL_NIF_TERM map_keys[] = {
kAtomKey,
kAtomValue
};
ERL_NIF_TERM map_values[] = {
make_adbc_column(env, key_schema, key_values, nif_keys[0], key_type, key_schema->flags & ARROW_FLAG_NULLABLE, key_metadata, nif_keys[1]),
make_adbc_column(env, value_schema, value_values, nif_values[0], value_type, value_schema->flags & ARROW_FLAG_NULLABLE, value_metadata, nif_values[1])
};

enif_make_map_from_arrays(env, map_keys, map_values, (unsigned)(sizeof(map_keys)/sizeof(map_keys[0])), &map_out);
return map_out;
}

ERL_NIF_TERM get_arrow_array_map_children(ErlNifEnv *env, struct ArrowSchema * schema, struct ArrowArray * values, uint64_t level) {
Expand Down Expand Up @@ -511,10 +517,10 @@ ERL_NIF_TERM get_arrow_run_end_encoded(ErlNifEnv *env, struct ArrowSchema * sche
if (schema->children == nullptr || values->children == nullptr) {
return erlang::nif::error(env, "invalid ArrowArray (run_end_encoded), schema->children == nullptr || values->children == nullptr");
}
if (strncmp("run_ends", schema->children[0]->name, 8) != 0) {
if (strcmp("run_ends", schema->children[0]->name) != 0) {
return erlang::nif::error(env, "invalid ArrowSchema (run_end_encoded), its first child is not named run_ends");
}
if (strncmp("values", schema->children[1]->name, 6) != 0) {
if (strcmp("values", schema->children[1]->name) != 0) {
return erlang::nif::error(env, "invalid ArrowSchema (run_end_encoded), its second child is not named values");
}

Expand Down Expand Up @@ -574,7 +580,7 @@ ERL_NIF_TERM get_arrow_array_list_children(ErlNifEnv *env, struct ArrowSchema *
const uint8_t * bitmap_buffer = (const uint8_t *)values->buffers[bitmap_buffer_index];
struct ArrowSchema * items_schema = schema->children[0];
struct ArrowArray * items_values = values->children[0];
if (strncmp("item", items_schema->name, 4) != 0) {
if (strcmp("item", items_schema->name) != 0) {
return erlang::nif::error(env, "invalid ArrowSchema (list), its single child is not named item");
}

Expand Down Expand Up @@ -698,7 +704,7 @@ ERL_NIF_TERM get_arrow_array_list_view(ErlNifEnv *env, struct ArrowSchema * sche

struct ArrowSchema * items_schema = schema->children[0];
struct ArrowArray * items_values = values->children[0];
if (strncmp("item", items_schema->name, 4) != 0) {
if (strcmp("item", items_schema->name) != 0) {
return erlang::nif::error(env, "invalid ArrowSchema (list), its single child is not named item");
}
if (count == -1) count = values->length;
Expand Down Expand Up @@ -753,7 +759,7 @@ ERL_NIF_TERM get_arrow_array_list_view(ErlNifEnv *env, struct ArrowSchema * sche
return get_arrow_array_list_view(env, schema, values, 0, -1, level, list_type);
}

int arrow_array_to_nif_term(ErlNifEnv *env, struct ArrowSchema * schema, struct ArrowArray * values, int64_t offset, int64_t count, int64_t level, std::vector<ERL_NIF_TERM> &out_terms, ERL_NIF_TERM &term_type, ERL_NIF_TERM &arrow_metadata, ERL_NIF_TERM &error, bool *end_of_series, bool skip_dictionary_check) {
int arrow_array_to_nif_term(ErlNifEnv *env, struct ArrowSchema * schema, struct ArrowArray * values, int64_t offset, int64_t count, int64_t level, std::vector<ERL_NIF_TERM> &out_terms, ERL_NIF_TERM &term_type, ERL_NIF_TERM &arrow_metadata, ERL_NIF_TERM &error, bool skip_dictionary_check) {
if (schema == nullptr) {
error = erlang::nif::error(env, "invalid ArrowSchema (nullptr) when invoking next");
return 1;
Expand All @@ -769,39 +775,14 @@ int arrow_array_to_nif_term(ErlNifEnv *env, struct ArrowSchema * schema, struct
ERL_NIF_TERM current_term{}, children_term{};
size_t format_len = strlen(format);

if (level == 0 && format_len == 2 && strncmp("+s", format, 2) == 0 && values->length == 0) {
if (end_of_series) {
*end_of_series = true;
}
out_terms.clear();
out_terms.emplace_back(kAtomEndOfSeries);
return 0;
}

term_type = kAtomNil;
arrow_metadata = kAtomNil;
std::vector<ERL_NIF_TERM> children;

constexpr int64_t bitmap_buffer_index = 0;
int64_t data_buffer_index = 1;
int64_t offset_buffer_index = 2;

std::vector<ERL_NIF_TERM> metadata_keys, metadata_values;
if (schema->metadata) {
struct ArrowMetadataReader metadata_reader{};
struct ArrowStringView key;
struct ArrowStringView value;
if (ArrowMetadataReaderInit(&metadata_reader, schema->metadata) == NANOARROW_OK) {
while (ArrowMetadataReaderRead(&metadata_reader, &key, &value) == NANOARROW_OK) {
// printf("key: %.*s, value: %.*s\n", (int)key.size_bytes, key.data, (int)value.size_bytes, value.data);
metadata_keys.push_back(erlang::nif::make_binary(env, key.data, (size_t)key.size_bytes));
metadata_values.push_back(erlang::nif::make_binary(env, value.data, (size_t)value.size_bytes));
}
if (metadata_keys.size() > 0) {
enif_make_map_from_arrays(env, metadata_keys.data(), metadata_values.data(), (unsigned)metadata_keys.size(), &arrow_metadata);
}
}
}
NANOARROW_RETURN_NOT_OK(arrow_metadata_to_nif_term(env, schema->metadata, &arrow_metadata));

if (!skip_dictionary_check) {
if (schema->dictionary != nullptr && values->dictionary != nullptr) {
Expand Down Expand Up @@ -1703,7 +1684,7 @@ int arrow_array_to_nif_term(ErlNifEnv *env, struct ArrowSchema * schema, struct
}

if (!format_processed) {
snprintf(err_msg_buf, 255, "not yet implemented for format: `%s`", schema->format);
snprintf(err_msg_buf, sizeof(err_msg_buf)/sizeof(err_msg_buf[0]), "not yet implemented for format: `%s`", schema->format);
error = erlang::nif::error(env, erlang::nif::make_binary(env, err_msg_buf));
return 1;
// printf("not implemented for format: `%s`\r\n", schema->format);
Expand All @@ -1716,7 +1697,6 @@ int arrow_array_to_nif_term(ErlNifEnv *env, struct ArrowSchema * schema, struct
}

out_terms.clear();

if (is_struct) {
if (level > 0) {
out_terms.emplace_back(erlang::nif::make_binary(env, name));
Expand All @@ -1735,8 +1715,8 @@ int arrow_array_to_nif_term(ErlNifEnv *env, struct ArrowSchema * schema, struct
return 0;
}

int arrow_array_to_nif_term(ErlNifEnv *env, struct ArrowSchema * schema, struct ArrowArray * values, uint64_t level, std::vector<ERL_NIF_TERM> &out_terms, ERL_NIF_TERM &out_type, ERL_NIF_TERM &metadata, ERL_NIF_TERM &error, bool *end_of_series, bool skip_dictionary_check) {
return arrow_array_to_nif_term(env, schema, values, 0, -1, level, out_terms, out_type, metadata, error, end_of_series, skip_dictionary_check);
int arrow_array_to_nif_term(ErlNifEnv *env, struct ArrowSchema * schema, struct ArrowArray * values, uint64_t level, std::vector<ERL_NIF_TERM> &out_terms, ERL_NIF_TERM &out_type, ERL_NIF_TERM &metadata, ERL_NIF_TERM &error, bool skip_dictionary_check) {
return arrow_array_to_nif_term(env, schema, values, 0, -1, level, out_terms, out_type, metadata, error, skip_dictionary_check);
}

#endif // ADBC_ARROW_ARRAY_HPP
50 changes: 50 additions & 0 deletions c_src/adbc_arrow_array_stream_record.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
#ifndef ADBC_ARROW_ARRAY_STREAM_RECORD_HPP
#define ADBC_ARROW_ARRAY_STREAM_RECORD_HPP
#pragma once

#include <adbc.h>

struct ArrowArrayStreamRecord {
struct ArrowSchema *schema = nullptr;
struct ArrowArray *values = nullptr;

/// Allocate memory for schema and values
/// @return 0 if success, 1 if failed
int allocate_schema_and_values() {
this->schema = (struct ArrowSchema *)enif_alloc(sizeof(struct ArrowSchema));
if (this->schema == nullptr) {
return 1;
}
memset(this->schema, 0, sizeof(struct ArrowSchema));

this->values = (struct ArrowArray *)enif_alloc(sizeof(struct ArrowArray));
if (this->values == nullptr) {
enif_free(this->schema);
this->schema = nullptr;
return 1;
}
memset(this->values, 0, sizeof(struct ArrowArray));

return 0;
}

void release_schema_and_values() {
if (this->schema) {
if (this->schema->release) {
this->schema->release(this->schema);
}
enif_free(this->schema);
this->schema = nullptr;
}

if (this->values) {
if (this->values->release) {
this->values->release(this->values);
}
enif_free(this->values);
this->values = nullptr;
}
}
};

#endif // ADBC_ARROW_ARRAY_STREAM_RECORD_HPP
32 changes: 32 additions & 0 deletions c_src/adbc_arrow_metadata.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
#ifndef ADBC_ARROW_METADATA_HPP
#define ADBC_ARROW_METADATA_HPP
#pragma once

#include <stdio.h>
#include <vector>
#include <adbc.h>
#include <erl_nif.h>
#include "adbc_consts.h"
#include "nif_utils.hpp"

static int arrow_metadata_to_nif_term(ErlNifEnv *env, const char * metadata, ERL_NIF_TERM * out_metadata) {
std::vector<ERL_NIF_TERM> metadata_keys, metadata_values;
*out_metadata = kAtomNil;
if (metadata == nullptr) return NANOARROW_OK;

struct ArrowMetadataReader metadata_reader{};
struct ArrowStringView key;
struct ArrowStringView value;
NANOARROW_RETURN_NOT_OK(ArrowMetadataReaderInit(&metadata_reader, metadata));
while (ArrowMetadataReaderRead(&metadata_reader, &key, &value) == NANOARROW_OK) {
// printf("key: %.*s, value: %.*s\n", (int)key.size_bytes, key.data, (int)value.size_bytes, value.data);
metadata_keys.push_back(erlang::nif::make_binary(env, key.data, (size_t)key.size_bytes));
metadata_values.push_back(erlang::nif::make_binary(env, value.data, (size_t)value.size_bytes));
}
if (metadata_keys.size() > 0) {
enif_make_map_from_arrays(env, metadata_keys.data(), metadata_values.data(), (unsigned)metadata_keys.size(), out_metadata);
}
return NANOARROW_OK;
}

#endif // ADBC_ARROW_METADATA_HPP
Loading

0 comments on commit 10f74c8

Please sign in to comment.