Fixes build failure with apache-arrow 14 (#1609)
Fixes #1608

Signed-off-by: Tao He <[email protected]>
sighingnow authored Nov 2, 2023
1 parent 5cc5a2c commit bef84cd
Showing 6 changed files with 65 additions and 23 deletions.
14 changes: 6 additions & 8 deletions .github/workflows/build-test-graph.yml
@@ -105,14 +105,12 @@ jobs:
wget https://apache.jfrog.io/artifactory/arrow/$(lsb_release --id --short | tr 'A-Z' 'a-z')/apache-arrow-apt-source-latest-$(lsb_release --codename --short).deb
sudo apt install -y -V ./apache-arrow-apt-source-latest-$(lsb_release --codename --short).deb
sudo apt update
-sudo apt install -y libarrow-dev=12.0.1-1 \
-libarrow-dataset-dev=12.0.1-1 \
-libparquet-dev=12.0.1-1 \
-libarrow-acero-dev=12.0.1-1 \
-libarrow-flight-dev=12.0.1-1 \
-libgandiva-dev=12.0.1-1 \
-libparquet-dev=12.0.1-1 \
-libarrow-cuda-dev=12.0.1-1
+sudo apt install -y libarrow-dev=14.0.0-1 \
+libarrow-dataset-dev=14.0.0-1 \
+libarrow-acero-dev=14.0.0-1 \
+libarrow-flight-dev=14.0.0-1 \
+libgandiva-dev=14.0.0-1 \
+libparquet-dev=14.0.0-1
# install clang-format
sudo curl -L https://github.com/muttleyxd/clang-tools-static-binaries/releases/download/master-1d7ec53d/clang-format-11_linux-amd64 --output /usr/bin/clang-format
13 changes: 6 additions & 7 deletions .github/workflows/build-test.yml
@@ -124,13 +124,12 @@ jobs:
wget https://apache.jfrog.io/artifactory/arrow/$(lsb_release --id --short | tr 'A-Z' 'a-z')/apache-arrow-apt-source-latest-$(lsb_release --codename --short).deb
sudo apt install -y -V ./apache-arrow-apt-source-latest-$(lsb_release --codename --short).deb
sudo apt update
-sudo apt install -y libarrow-dev=11.0.0-1 \
-libarrow-dataset-dev=11.0.0-1 \
-libarrow-flight-dev=11.0.0-1 \
-libgandiva-dev=11.0.0-1 \
-libparquet-dev=11.0.0-1 \
-libplasma-dev=11.0.0-1 \
-libarrow-cuda-dev=11.0.0-1
+sudo apt install -y libarrow-dev=14.0.0-1 \
+libarrow-dataset-dev=14.0.0-1 \
+libarrow-acero-dev=14.0.0-1 \
+libarrow-flight-dev=14.0.0-1 \
+libgandiva-dev=14.0.0-1 \
+libparquet-dev=14.0.0-1
# install deps for java
sudo apt install -y default-jdk-headless maven
8 changes: 8 additions & 0 deletions modules/basic/ds/arrow_utils.cc
@@ -395,7 +395,11 @@ Status DeserializeRecordBatches(
std::shared_ptr<arrow::RecordBatchReader> batch_reader;
RETURN_ON_ARROW_ERROR_AND_ASSIGN(
batch_reader, arrow::ipc::RecordBatchStreamReader::Open(&reader));
+#if defined(ARROW_VERSION) && ARROW_VERSION < 9000000
RETURN_ON_ARROW_ERROR(batch_reader->ReadAll(batches));
+#else
+RETURN_ON_ARROW_ERROR_AND_ASSIGN(*batches, batch_reader->ToRecordBatches());
+#endif
return Status::OK();
}

@@ -567,7 +571,11 @@ Status DeserializeTable(const std::shared_ptr<arrow::Buffer> buffer,
std::shared_ptr<arrow::RecordBatchReader> batch_reader;
RETURN_ON_ARROW_ERROR_AND_ASSIGN(
batch_reader, arrow::ipc::RecordBatchStreamReader::Open(&reader));
+#if defined(ARROW_VERSION) && ARROW_VERSION < 9000000
RETURN_ON_ARROW_ERROR(batch_reader->ReadAll(table));
+#else
+RETURN_ON_ARROW_ERROR_AND_ASSIGN(*table, batch_reader->ToTable());
+#endif
return Status::OK();
}

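For readers unfamiliar with the reader-side API change above, here is a minimal standalone sketch (illustrative only, not part of this commit; ReadStream and its parameter are made-up names): on Arrow < 9 the deprecated RecordBatchReader::ReadAll() is kept, while newer versions use the Result-returning ToTable() instead.

#include <arrow/api.h>
#include <arrow/io/memory.h>
#include <arrow/ipc/reader.h>

#include <memory>

// Read every record batch from an IPC stream buffer and return them as a table.
arrow::Result<std::shared_ptr<arrow::Table>> ReadStream(
    const std::shared_ptr<arrow::Buffer>& buffer) {
  arrow::io::BufferReader reader(buffer);
  ARROW_ASSIGN_OR_RAISE(auto batch_reader,
                        arrow::ipc::RecordBatchStreamReader::Open(&reader));
#if defined(ARROW_VERSION) && ARROW_VERSION < 9000000
  std::shared_ptr<arrow::Table> table;
  ARROW_RETURN_NOT_OK(batch_reader->ReadAll(&table));
  return table;
#else
  // ReadAll() is deprecated since Arrow 9; ToTable() returns arrow::Result instead.
  return batch_reader->ToTable();
#endif
}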
6 changes: 5 additions & 1 deletion modules/fuse/adaptors/arrow_ipc/serializer_registry.cc
@@ -98,7 +98,11 @@ static void from_arrow_view(Client* client, std::string const& path,
std::shared_ptr<arrow::Table> table;
std::vector<std::shared_ptr<arrow::RecordBatch>> batches;

-VINEYARD_CHECK_OK(reader->ReadAll(&batches));
+#if defined(ARROW_VERSION) && ARROW_VERSION < 9000000
+CHECK_ARROW_ERROR(reader->ReadAll(&batches));
+#else
+CHECK_ARROW_ERROR_AND_ASSIGN(batches, reader->ToRecordBatches());
+#endif

VINEYARD_CHECK_OK(RecordBatchesToTable(batches, &table));

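The same guard, sketched for the batch-collecting variant used in this file (illustrative only; BatchesToTable is a made-up helper, not the project's RecordBatchesToTable): batches come from the deprecated ReadAll() on old Arrow, from ToRecordBatches() on Arrow 9+, and are then assembled into a table.

#include <arrow/api.h>
#include <arrow/ipc/reader.h>

#include <memory>
#include <vector>

// Collect all batches from an open IPC stream reader and stitch them into a table.
arrow::Result<std::shared_ptr<arrow::Table>> BatchesToTable(
    const std::shared_ptr<arrow::ipc::RecordBatchStreamReader>& reader) {
  std::vector<std::shared_ptr<arrow::RecordBatch>> batches;
#if defined(ARROW_VERSION) && ARROW_VERSION < 9000000
  ARROW_RETURN_NOT_OK(reader->ReadAll(&batches));
#else
  ARROW_ASSIGN_OR_RAISE(batches, reader->ToRecordBatches());
#endif
  return arrow::Table::FromRecordBatches(reader->schema(), batches);
}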
18 changes: 11 additions & 7 deletions modules/graph/fragment/property_graph_utils_impl.h
@@ -155,24 +155,28 @@ boost::leaf::result<void> generate_local_id_list(
static_cast<size_t>(0), chunks.size(),
[pool, fid, &parser, &ovg2l_maps, &chunks,
&lid_list](size_t chunk_index) -> boost::leaf::result<void> {
-ArrowBuilderType<VID_T> builder(pool);
+arrow::BufferBuilder builder(pool);
auto chunk = std::dynamic_pointer_cast<ArrowArrayType<VID_T>>(
chunks[chunk_index]);
chunks[chunk_index].reset(); // release the used chunks
-ARROW_OK_OR_RAISE(builder.Resize(chunk->length()));
+ARROW_OK_OR_RAISE(builder.Resize(chunk->length() * sizeof(VID_T)));
+builder.UnsafeAdvance(chunk->length() * sizeof(VID_T));

const VID_T* vec = chunk->raw_values();
+VID_T* builder_data = reinterpret_cast<VID_T*>(builder.mutable_data());
for (int64_t i = 0; i < chunk->length(); ++i) {
VID_T gid = vec[i];
if (parser.GetFid(gid) == fid) {
-builder[i] = parser.GenerateId(0, parser.GetLabelId(gid),
-parser.GetOffset(gid));
+builder_data[i] = parser.GenerateId(0, parser.GetLabelId(gid),
+parser.GetOffset(gid));
} else {
-builder[i] = ovg2l_maps[parser.GetLabelId(gid)].at(gid);
+builder_data[i] = ovg2l_maps[parser.GetLabelId(gid)].at(gid);
}
}
-ARROW_OK_OR_RAISE(builder.Advance(chunk->length()));
-ARROW_OK_OR_RAISE(builder.Finish(&lid_list[chunk_index]));
+std::shared_ptr<arrow::Buffer> buffer;
+ARROW_OK_OR_RAISE(builder.Finish(&buffer));
+lid_list[chunk_index] =
+std::make_shared<ArrowArrayType<VID_T>>(chunk->length(), buffer);
return {};
},
concurrency);
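The hunk above replaces a typed Arrow builder with a raw arrow::BufferBuilder plus a direct array construction. A minimal sketch of that pattern follows (illustrative only; BuildIds, the uint64 value type, and the input vector are assumptions, not the project's types):

#include <arrow/api.h>
#include <arrow/buffer_builder.h>

#include <cstdint>
#include <memory>
#include <vector>

// Fill a fixed-width Arrow array by writing straight into a BufferBuilder,
// then wrap the finished buffer in a primitive array without copying again.
arrow::Result<std::shared_ptr<arrow::UInt64Array>> BuildIds(
    const std::vector<uint64_t>& values, arrow::MemoryPool* pool) {
  arrow::BufferBuilder builder(pool);
  // Reserve the final byte size and advance so the written region is owned by the buffer.
  ARROW_RETURN_NOT_OK(builder.Resize(values.size() * sizeof(uint64_t)));
  builder.UnsafeAdvance(values.size() * sizeof(uint64_t));
  auto* out = reinterpret_cast<uint64_t*>(builder.mutable_data());
  for (size_t i = 0; i < values.size(); ++i) {
    out[i] = values[i];  // any per-element mapping happens here
  }
  std::shared_ptr<arrow::Buffer> buffer;
  ARROW_RETURN_NOT_OK(builder.Finish(&buffer));
  // A primitive array is (length, values buffer); no null bitmap is attached.
  return std::make_shared<arrow::UInt64Array>(
      static_cast<int64_t>(values.size()), buffer);
}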
29 changes: 29 additions & 0 deletions modules/graph/utils/table_shuffler.cc
@@ -344,7 +344,11 @@ Status TableAppender::Apply(
}
if (builder->GetField(0)->length() == builder->initial_capacity()) {
std::shared_ptr<arrow::RecordBatch> tmp_batch;
+#if defined(ARROW_VERSION) && ARROW_VERSION < 9000000
RETURN_ON_ARROW_ERROR(builder->Flush(&tmp_batch));
+#else
+RETURN_ON_ARROW_ERROR_AND_ASSIGN(tmp_batch, builder->Flush());
+#endif
batches_out.emplace_back(std::move(tmp_batch));
}
return Status::OK();
@@ -356,7 +360,11 @@ Status TableAppender::Flush(
// If there's no batch, we need an empty batch to make an empty table
if (builder->GetField(0)->length() != 0 || batches_out.size() == 0) {
std::shared_ptr<arrow::RecordBatch> batch;
+#if defined(ARROW_VERSION) && ARROW_VERSION < 9000000
RETURN_ON_ARROW_ERROR(builder->Flush(&batch));
+#else
+RETURN_ON_ARROW_ERROR_AND_ASSIGN(batch, builder->Flush());
+#endif
batches_out.emplace_back(std::move(batch));
}
return Status::OK();
@@ -633,13 +641,23 @@ void DeserializeSelectedRows(grape::OutArchive& arc,
int64_t row_num;
arc >> row_num;
std::unique_ptr<arrow::RecordBatchBuilder> builder;
+#if defined(ARROW_VERSION) && ARROW_VERSION < 9000000
ARROW_CHECK_OK(arrow::RecordBatchBuilder::Make(
schema, arrow::default_memory_pool(), row_num, &builder));
+#else
+ARROW_CHECK_OK_AND_ASSIGN(builder,
+arrow::RecordBatchBuilder::Make(
+schema, arrow::default_memory_pool(), row_num));
+#endif
int col_num = builder->num_fields();
for (int col_id = 0; col_id != col_num; ++col_id) {
DeserializeSelectedItems(arc, row_num, builder->GetField(col_id));
}
+#if defined(ARROW_VERSION) && ARROW_VERSION < 9000000
ARROW_CHECK_OK(builder->Flush(&batch_out));
+#else
+ARROW_CHECK_OK_AND_ASSIGN(batch_out, builder->Flush());
+#endif
}

void SelectItems(std::shared_ptr<arrow::Array> array,
@@ -687,15 +705,26 @@ void SelectRows(std::shared_ptr<arrow::RecordBatch> record_batch_in,
return;
}
std::unique_ptr<arrow::RecordBatchBuilder> builder;
+#if defined(ARROW_VERSION) && ARROW_VERSION < 9000000
ARROW_CHECK_OK(arrow::RecordBatchBuilder::Make(record_batch_in->schema(),
arrow::default_memory_pool(),
row_num, &builder));
+#else
+ARROW_CHECK_OK_AND_ASSIGN(
+builder,
+arrow::RecordBatchBuilder::Make(record_batch_in->schema(),
+arrow::default_memory_pool(), row_num));
+#endif
int col_num = builder->num_fields();
for (int col_id = 0; col_id != col_num; ++col_id) {
SelectItems(record_batch_in->column(col_id), offset,
builder->GetField(col_id));
}
+#if defined(ARROW_VERSION) && ARROW_VERSION < 9000000
ARROW_CHECK_OK(builder->Flush(&record_batch_out));
+#else
+ARROW_CHECK_OK_AND_ASSIGN(record_batch_out, builder->Flush());
+#endif
}

boost::leaf::result<void> ShuffleTableByOffsetLists(
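For reference, a minimal sketch of the RecordBatchBuilder API difference the guards above account for (illustrative only; MakeEmptyBatch and its parameters are made-up names): before Arrow 9 both Make() and Flush() fill out-parameters and return a Status, from Arrow 9 on they return arrow::Result values.

#include <arrow/api.h>

#include <memory>

// Build (and immediately flush) a record batch for a given schema,
// using whichever RecordBatchBuilder API the installed Arrow provides.
arrow::Result<std::shared_ptr<arrow::RecordBatch>> MakeEmptyBatch(
    const std::shared_ptr<arrow::Schema>& schema, int64_t capacity) {
#if defined(ARROW_VERSION) && ARROW_VERSION < 9000000
  std::unique_ptr<arrow::RecordBatchBuilder> builder;
  ARROW_RETURN_NOT_OK(arrow::RecordBatchBuilder::Make(
      schema, arrow::default_memory_pool(), capacity, &builder));
  std::shared_ptr<arrow::RecordBatch> batch;
  ARROW_RETURN_NOT_OK(builder->Flush(&batch));
  return batch;
#else
  ARROW_ASSIGN_OR_RAISE(
      auto builder, arrow::RecordBatchBuilder::Make(
                        schema, arrow::default_memory_pool(), capacity));
  return builder->Flush();
#endif
}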
