Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ORC-1264: [C++] Add a writer option to align compression block with row group #2005

Closed
wants to merge 18 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions c++/include/orc/Reader.hh
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,15 @@ namespace orc {
};
ReaderMetrics* getDefaultReaderMetrics();

// Row group index of a single column in a stripe.
struct RowGroupIndex {
// Positions are represented as a two-dimensional array where the first
// dimension is row group index and the second dimension is the position
// list of the row group. The size of the second dimension should be equal
// among all row groups.
std::vector<std::vector<uint64_t>> positions;
};

/**
* Options for creating a Reader.
*/
Expand Down Expand Up @@ -605,6 +614,16 @@ namespace orc {
*/
virtual std::map<uint32_t, BloomFilterIndex> getBloomFilters(
uint32_t stripeIndex, const std::set<uint32_t>& included) const = 0;

/**
* Get row group index of all selected columns in the specified stripe
* @param stripeIndex index of the stripe to be read for row group index.
* @param included index of selected columns to return (if not specified,
* all columns will be returned).
* @return map of row group index keyed by its column index.
*/
virtual std::map<uint32_t, RowGroupIndex> getRowGroupIndex(
uint32_t stripeIndex, const std::set<uint32_t>& included = {}) const = 0;
};

/**
Expand Down
13 changes: 13 additions & 0 deletions c++/include/orc/Writer.hh
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,19 @@ namespace orc {
* @return if not set, return default value which is 64 KB.
*/
uint64_t getMemoryBlockSize() const;

/**
* Set whether the compression block should be aligned to row group boundary.
luffy-zh marked this conversation as resolved.
Show resolved Hide resolved
* The boolean type may not be aligned to row group boundary due to the
* requirement of the Boolean RLE encoder to pack input bits into bytes
*/
WriterOptions& setAlignBlockBoundToRowGroup(bool alignBlockBoundToRowGroup);

/**
* Get if the compression block should be aligned to row group boundary.
* @return if not set, return default value which is false.
*/
bool getAlignBlockBoundToRowGroup() const;
};

class Writer {
Expand Down
111 changes: 111 additions & 0 deletions c++/src/ColumnWriter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,10 @@ namespace orc {
// PASS
}

void ColumnWriter::finishStreams() {
notNullEncoder->finishEncode();
}

class StructColumnWriter : public ColumnWriter {
public:
StructColumnWriter(const Type& type, const StreamsFactory& factory,
Expand Down Expand Up @@ -283,6 +287,8 @@ namespace orc {

virtual void reset() override;

virtual void finishStreams() override;

private:
std::vector<std::unique_ptr<ColumnWriter>> children_;
};
Expand Down Expand Up @@ -416,6 +422,13 @@ namespace orc {
}
}

void StructColumnWriter::finishStreams() {
ColumnWriter::finishStreams();
for (uint32_t i = 0; i < children_.size(); ++i) {
children_[i]->finishStreams();
}
}

template <typename BatchType>
class IntegerColumnWriter : public ColumnWriter {
public:
Expand All @@ -433,6 +446,8 @@ namespace orc {

virtual void recordPosition() const override;

virtual void finishStreams() override;

protected:
std::unique_ptr<RleEncoder> rleEncoder;

Expand Down Expand Up @@ -528,6 +543,12 @@ namespace orc {
rleEncoder->recordPosition(rowIndexPosition.get());
}

template <typename BatchType>
void IntegerColumnWriter<BatchType>::finishStreams() {
ColumnWriter::finishStreams();
rleEncoder->finishEncode();
}

template <typename BatchType>
class ByteColumnWriter : public ColumnWriter {
public:
Expand All @@ -544,6 +565,8 @@ namespace orc {

virtual void recordPosition() const override;

virtual void finishStreams() override;

private:
std::unique_ptr<ByteRleEncoder> byteRleEncoder_;
};
Expand Down Expand Up @@ -637,6 +660,12 @@ namespace orc {
byteRleEncoder_->recordPosition(rowIndexPosition.get());
}

template <typename BatchType>
void ByteColumnWriter<BatchType>::finishStreams() {
ColumnWriter::finishStreams();
byteRleEncoder_->finishEncode();
}

template <typename BatchType>
class BooleanColumnWriter : public ColumnWriter {
public:
Expand All @@ -654,6 +683,8 @@ namespace orc {

virtual void recordPosition() const override;

virtual void finishStreams() override;

private:
std::unique_ptr<ByteRleEncoder> rleEncoder_;
};
Expand Down Expand Up @@ -750,6 +781,12 @@ namespace orc {
rleEncoder_->recordPosition(rowIndexPosition.get());
}

template <typename BatchType>
void BooleanColumnWriter<BatchType>::finishStreams() {
ColumnWriter::finishStreams();
rleEncoder_->finishEncode();
luffy-zh marked this conversation as resolved.
Show resolved Hide resolved
}

template <typename ValueType, typename BatchType>
class FloatingColumnWriter : public ColumnWriter {
public:
Expand All @@ -767,6 +804,8 @@ namespace orc {

virtual void recordPosition() const override;

virtual void finishStreams() override;

private:
bool isFloat_;
std::unique_ptr<AppendOnlyBufferedStream> dataStream_;
Expand Down Expand Up @@ -878,6 +917,12 @@ namespace orc {
dataStream_->recordPosition(rowIndexPosition.get());
}

template <typename ValueType, typename BatchType>
void FloatingColumnWriter<ValueType, BatchType>::finishStreams() {
ColumnWriter::finishStreams();
dataStream_->finishStream();
}

/**
* Implementation of increasing sorted string dictionary
*/
Expand Down Expand Up @@ -1041,6 +1086,8 @@ namespace orc {

virtual void reset() override;

virtual void finishStreams() override;

private:
/**
* dictionary related functions
Expand Down Expand Up @@ -1234,6 +1281,14 @@ namespace orc {
}
}

void StringColumnWriter::finishStreams() {
ColumnWriter::finishStreams();
if (!useDictionary) {
directDataStream->finishStream();
directLengthEncoder->finishEncode();
}
}

bool StringColumnWriter::checkDictionaryKeyRatio() {
if (!doneDictionaryCheck) {
useDictionary = dictionary.size() <=
Expand Down Expand Up @@ -1583,6 +1638,8 @@ namespace orc {

virtual void recordPosition() const override;

virtual void finishStreams() override;

protected:
std::unique_ptr<RleEncoder> secRleEncoder, nanoRleEncoder;

Expand Down Expand Up @@ -1723,6 +1780,12 @@ namespace orc {
nanoRleEncoder->recordPosition(rowIndexPosition.get());
}

void TimestampColumnWriter::finishStreams() {
ColumnWriter::finishStreams();
secRleEncoder->finishEncode();
nanoRleEncoder->finishEncode();
}

class DateColumnWriter : public IntegerColumnWriter<LongVectorBatch> {
public:
DateColumnWriter(const Type& type, const StreamsFactory& factory, const WriterOptions& options);
Expand Down Expand Up @@ -1792,6 +1855,8 @@ namespace orc {

virtual void recordPosition() const override;

virtual void finishStreams() override;

protected:
RleVersion rleVersion;
uint64_t precision;
Expand Down Expand Up @@ -1910,6 +1975,12 @@ namespace orc {
scaleEncoder->recordPosition(rowIndexPosition.get());
}

void Decimal64ColumnWriter::finishStreams() {
ColumnWriter::finishStreams();
valueStream->finishStream();
scaleEncoder->finishEncode();
}

class Decimal64ColumnWriterV2 : public ColumnWriter {
public:
Decimal64ColumnWriterV2(const Type& type, const StreamsFactory& factory,
Expand All @@ -1926,6 +1997,8 @@ namespace orc {

virtual void recordPosition() const override;

virtual void finishStreams() override;

protected:
uint64_t precision;
uint64_t scale;
Expand Down Expand Up @@ -2016,6 +2089,11 @@ namespace orc {
valueEncoder->recordPosition(rowIndexPosition.get());
}

void Decimal64ColumnWriterV2::finishStreams() {
ColumnWriter::finishStreams();
valueEncoder->finishEncode();
}

class Decimal128ColumnWriter : public Decimal64ColumnWriter {
public:
Decimal128ColumnWriter(const Type& type, const StreamsFactory& factory,
Expand Down Expand Up @@ -2131,6 +2209,8 @@ namespace orc {

virtual void reset() override;

virtual void finishStreams() override;

private:
std::unique_ptr<RleEncoder> lengthEncoder_;
RleVersion rleVersion_;
Expand Down Expand Up @@ -2307,6 +2387,14 @@ namespace orc {
}
}

void ListColumnWriter::finishStreams() {
ColumnWriter::finishStreams();
lengthEncoder_->finishEncode();
if (child_) {
child_->finishStreams();
}
}

class MapColumnWriter : public ColumnWriter {
public:
MapColumnWriter(const Type& type, const StreamsFactory& factory, const WriterOptions& options);
Expand Down Expand Up @@ -2339,6 +2427,8 @@ namespace orc {

virtual void reset() override;

virtual void finishStreams() override;

private:
std::unique_ptr<ColumnWriter> keyWriter_;
std::unique_ptr<ColumnWriter> elemWriter_;
Expand Down Expand Up @@ -2557,6 +2647,17 @@ namespace orc {
}
}

void MapColumnWriter::finishStreams() {
ColumnWriter::finishStreams();
lengthEncoder_->finishEncode();
if (keyWriter_) {
keyWriter_->finishStreams();
}
if (elemWriter_) {
elemWriter_->finishStreams();
}
}

class UnionColumnWriter : public ColumnWriter {
public:
UnionColumnWriter(const Type& type, const StreamsFactory& factory,
Expand Down Expand Up @@ -2589,6 +2690,8 @@ namespace orc {

virtual void reset() override;

virtual void finishStreams() override;

private:
std::unique_ptr<ByteRleEncoder> rleEncoder_;
std::vector<std::unique_ptr<ColumnWriter>> children_;
Expand Down Expand Up @@ -2760,6 +2863,14 @@ namespace orc {
}
}

void UnionColumnWriter::finishStreams() {
ColumnWriter::finishStreams();
rleEncoder_->finishEncode();
for (uint32_t i = 0; i < children_.size(); ++i) {
children_[i]->finishStreams();
}
}

std::unique_ptr<ColumnWriter> buildWriter(const Type& type, const StreamsFactory& factory,
const WriterOptions& options) {
switch (static_cast<int64_t>(type.getKind())) {
Expand Down
12 changes: 12 additions & 0 deletions c++/src/ColumnWriter.hh
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,18 @@ namespace orc {
*/
virtual void writeDictionary();

/**
* Finalize the encoding and compressing process. This function should be
* called after all data required for encoding has been added. It ensures
* that any remaining data is processed and the final state of the streams
* is set.
* Note: boolean type cannot cut off the current byte if it is not filled
* with 8 bits, otherwise Boolean RLE may incorrectly read the unfilled
* trailing bits. In this case, the last byte will be the head of the next
* compression block.
*/
virtual void finishStreams();

protected:
/**
* Utility function to translate ColumnStatistics into protobuf form and
Expand Down
17 changes: 9 additions & 8 deletions c++/src/Compression.cc
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ namespace orc {
}
virtual void finishStream() override {
compressInternal();
BufferedOutputStream::finishStream();
}

protected:
Expand Down Expand Up @@ -982,13 +983,7 @@ namespace orc {
}

uint64_t BlockCompressionStream::flush() {
void* data;
int size;
if (!Next(&data, &size)) {
throw CompressionError("Failed to flush compression buffer.");
}
BufferedOutputStream::BackUp(outputSize - outputPosition);
bufferSize = outputSize = outputPosition = 0;
finishStream();
return BufferedOutputStream::flush();
}

Expand Down Expand Up @@ -1031,7 +1026,13 @@ namespace orc {
}

void BlockCompressionStream::finishStream() {
doBlockCompression();
void* data;
int size;
if (!Next(&data, &size)) {
throw CompressionError("Failed to flush compression buffer.");
}
BufferedOutputStream::BackUp(outputSize - outputPosition);
bufferSize = outputSize = outputPosition = 0;
}

/**
Expand Down
Loading
Loading