Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ORC-1264: [C++] Add a writer option to align compression block with row group #2005

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
11 changes: 11 additions & 0 deletions c++/include/orc/Reader.hh
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,11 @@ namespace orc {
};
ReaderMetrics* getDefaultReaderMetrics();

struct RowGroupPositions {
uint64_t columnId;
std::vector<int32_t> positions;
};

/**
* Options for creating a Reader.
*/
Expand Down Expand Up @@ -657,6 +662,12 @@ namespace orc {
* @param rowNumber the next row the reader should return
*/
virtual void seekToRow(uint64_t rowNumber) = 0;

/**
* Get the row group positions of the specified column in the current stripe.
* @return the position entries for the specified columns.
*/
virtual std::vector<RowGroupPositions> getPositionEntries(int columnId) = 0;
};
} // namespace orc

Expand Down
13 changes: 13 additions & 0 deletions c++/include/orc/Writer.hh
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,19 @@ namespace orc {
* @return if not set, return default value which is 64 KB.
*/
uint64_t getMemoryBlockSize() const;

/**
* Set whether the compression block should be aligned to row group boundary.
luffy-zh marked this conversation as resolved.
Show resolved Hide resolved
* The boolean type may not be aligned to row group boundary due to the
* requirement of the Boolean RLE encoder to pack input bits into bytes
*/
WriterOptions& setAlignBlockBoundToRowGroup(bool alignBlockBoundToRowGroup);

/**
* Get if the compression block should be aligned to row group boundary.
* @return if not set, return default value which is false.
*/
bool getAlignBlockBoundToRowGroup() const;
};

class Writer {
Expand Down
111 changes: 111 additions & 0 deletions c++/src/ColumnWriter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,10 @@ namespace orc {
// PASS
}

void ColumnWriter::finishStreams() {
notNullEncoder->finishEncode();
}

class StructColumnWriter : public ColumnWriter {
public:
StructColumnWriter(const Type& type, const StreamsFactory& factory,
Expand Down Expand Up @@ -283,6 +287,8 @@ namespace orc {

virtual void reset() override;

virtual void finishStreams() override;

private:
std::vector<std::unique_ptr<ColumnWriter>> children_;
};
Expand Down Expand Up @@ -416,6 +422,13 @@ namespace orc {
}
}

void StructColumnWriter::finishStreams() {
ColumnWriter::finishStreams();
for (uint32_t i = 0; i < children_.size(); ++i) {
children_[i]->finishStreams();
}
}

template <typename BatchType>
class IntegerColumnWriter : public ColumnWriter {
public:
Expand All @@ -433,6 +446,8 @@ namespace orc {

virtual void recordPosition() const override;

virtual void finishStreams() override;

protected:
std::unique_ptr<RleEncoder> rleEncoder;

Expand Down Expand Up @@ -528,6 +543,12 @@ namespace orc {
rleEncoder->recordPosition(rowIndexPosition.get());
}

template <typename BatchType>
void IntegerColumnWriter<BatchType>::finishStreams() {
ColumnWriter::finishStreams();
rleEncoder->finishEncode();
}

template <typename BatchType>
class ByteColumnWriter : public ColumnWriter {
public:
Expand All @@ -544,6 +565,8 @@ namespace orc {

virtual void recordPosition() const override;

virtual void finishStreams() override;

private:
std::unique_ptr<ByteRleEncoder> byteRleEncoder_;
};
Expand Down Expand Up @@ -637,6 +660,12 @@ namespace orc {
byteRleEncoder_->recordPosition(rowIndexPosition.get());
}

template <typename BatchType>
void ByteColumnWriter<BatchType>::finishStreams() {
ColumnWriter::finishStreams();
byteRleEncoder_->finishEncode();
}

template <typename BatchType>
class BooleanColumnWriter : public ColumnWriter {
public:
Expand All @@ -654,6 +683,8 @@ namespace orc {

virtual void recordPosition() const override;

virtual void finishStreams() override;

private:
std::unique_ptr<ByteRleEncoder> rleEncoder_;
};
Expand Down Expand Up @@ -750,6 +781,12 @@ namespace orc {
rleEncoder_->recordPosition(rowIndexPosition.get());
}

template <typename BatchType>
void BooleanColumnWriter<BatchType>::finishStreams() {
ColumnWriter::finishStreams();
rleEncoder_->finishEncode();
luffy-zh marked this conversation as resolved.
Show resolved Hide resolved
}

template <typename ValueType, typename BatchType>
class FloatingColumnWriter : public ColumnWriter {
public:
Expand All @@ -767,6 +804,8 @@ namespace orc {

virtual void recordPosition() const override;

virtual void finishStreams() override;

private:
bool isFloat_;
std::unique_ptr<AppendOnlyBufferedStream> dataStream_;
Expand Down Expand Up @@ -878,6 +917,12 @@ namespace orc {
dataStream_->recordPosition(rowIndexPosition.get());
}

template <typename ValueType, typename BatchType>
void FloatingColumnWriter<ValueType, BatchType>::finishStreams() {
ColumnWriter::finishStreams();
dataStream_->finishStream();
}

/**
* Implementation of increasing sorted string dictionary
*/
Expand Down Expand Up @@ -1029,6 +1074,8 @@ namespace orc {

virtual void reset() override;

virtual void finishStreams() override;

private:
/**
* dictionary related functions
Expand Down Expand Up @@ -1222,6 +1269,14 @@ namespace orc {
}
}

void StringColumnWriter::finishStreams() {
ColumnWriter::finishStreams();
if (!useDictionary) {
directDataStream->finishStream();
directLengthEncoder->finishEncode();
}
}

bool StringColumnWriter::checkDictionaryKeyRatio() {
if (!doneDictionaryCheck) {
useDictionary = dictionary.size() <=
Expand Down Expand Up @@ -1571,6 +1626,8 @@ namespace orc {

virtual void recordPosition() const override;

virtual void finishStreams() override;

protected:
std::unique_ptr<RleEncoder> secRleEncoder, nanoRleEncoder;

Expand Down Expand Up @@ -1711,6 +1768,12 @@ namespace orc {
nanoRleEncoder->recordPosition(rowIndexPosition.get());
}

void TimestampColumnWriter::finishStreams() {
ColumnWriter::finishStreams();
secRleEncoder->finishEncode();
nanoRleEncoder->finishEncode();
}

class DateColumnWriter : public IntegerColumnWriter<LongVectorBatch> {
public:
DateColumnWriter(const Type& type, const StreamsFactory& factory, const WriterOptions& options);
Expand Down Expand Up @@ -1780,6 +1843,8 @@ namespace orc {

virtual void recordPosition() const override;

virtual void finishStreams() override;

protected:
RleVersion rleVersion;
uint64_t precision;
Expand Down Expand Up @@ -1898,6 +1963,12 @@ namespace orc {
scaleEncoder->recordPosition(rowIndexPosition.get());
}

void Decimal64ColumnWriter::finishStreams() {
ColumnWriter::finishStreams();
valueStream->finishStream();
scaleEncoder->finishEncode();
}

class Decimal64ColumnWriterV2 : public ColumnWriter {
public:
Decimal64ColumnWriterV2(const Type& type, const StreamsFactory& factory,
Expand All @@ -1914,6 +1985,8 @@ namespace orc {

virtual void recordPosition() const override;

virtual void finishStreams() override;

protected:
uint64_t precision;
uint64_t scale;
Expand Down Expand Up @@ -2004,6 +2077,11 @@ namespace orc {
valueEncoder->recordPosition(rowIndexPosition.get());
}

void Decimal64ColumnWriterV2::finishStreams() {
ColumnWriter::finishStreams();
valueEncoder->finishEncode();
}

class Decimal128ColumnWriter : public Decimal64ColumnWriter {
public:
Decimal128ColumnWriter(const Type& type, const StreamsFactory& factory,
Expand Down Expand Up @@ -2119,6 +2197,8 @@ namespace orc {

virtual void reset() override;

virtual void finishStreams() override;

private:
std::unique_ptr<RleEncoder> lengthEncoder_;
RleVersion rleVersion_;
Expand Down Expand Up @@ -2295,6 +2375,14 @@ namespace orc {
}
}

void ListColumnWriter::finishStreams() {
ColumnWriter::finishStreams();
lengthEncoder_->finishEncode();
if (child_) {
child_->finishStreams();
}
}

class MapColumnWriter : public ColumnWriter {
public:
MapColumnWriter(const Type& type, const StreamsFactory& factory, const WriterOptions& options);
Expand Down Expand Up @@ -2327,6 +2415,8 @@ namespace orc {

virtual void reset() override;

virtual void finishStreams() override;

private:
std::unique_ptr<ColumnWriter> keyWriter_;
std::unique_ptr<ColumnWriter> elemWriter_;
Expand Down Expand Up @@ -2545,6 +2635,17 @@ namespace orc {
}
}

void MapColumnWriter::finishStreams() {
ColumnWriter::finishStreams();
lengthEncoder_->finishEncode();
if (keyWriter_) {
keyWriter_->finishStreams();
}
if (elemWriter_) {
elemWriter_->finishStreams();
}
}

class UnionColumnWriter : public ColumnWriter {
public:
UnionColumnWriter(const Type& type, const StreamsFactory& factory,
Expand Down Expand Up @@ -2577,6 +2678,8 @@ namespace orc {

virtual void reset() override;

virtual void finishStreams() override;

private:
std::unique_ptr<ByteRleEncoder> rleEncoder_;
std::vector<std::unique_ptr<ColumnWriter>> children_;
Expand Down Expand Up @@ -2748,6 +2851,14 @@ namespace orc {
}
}

void UnionColumnWriter::finishStreams() {
ColumnWriter::finishStreams();
rleEncoder_->finishEncode();
for (uint32_t i = 0; i < children_.size(); ++i) {
children_[i]->finishStreams();
}
}

std::unique_ptr<ColumnWriter> buildWriter(const Type& type, const StreamsFactory& factory,
const WriterOptions& options) {
switch (static_cast<int64_t>(type.getKind())) {
Expand Down
12 changes: 12 additions & 0 deletions c++/src/ColumnWriter.hh
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,18 @@ namespace orc {
*/
virtual void writeDictionary();

/**
* Finalize the encoding and compressing process. This function should be
* called after all data required for encoding has been added. It ensures
* that any remaining data is processed and the final state of the streams
* is set.
* Note: boolean type cannot cut off the current byte if it is not filled
* with 8 bits, otherwise Boolean RLE may incorrectly read the unfilled
* trailing bits. In this case, the last byte will be the head of the next
* compression block.
*/
virtual void finishStreams();

protected:
/**
* Utility function to translate ColumnStatistics into protobuf form and
Expand Down
17 changes: 17 additions & 0 deletions c++/src/Reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1241,6 +1241,23 @@ namespace orc {
useTightNumericVector_);
}

std::vector<RowGroupPositions> RowReaderImpl::getPositionEntries(int columnId) {
loadStripeIndex();
std::vector<RowGroupPositions> result;
auto rowIndex = rowIndexes_[columnId];
RowGroupPositions rgPositions;
rgPositions.columnId = columnId;
for (auto rowIndexEntry : rowIndex.entry()) {
auto posVector = rgPositions.positions;
for (auto position : rowIndexEntry.positions()) {
posVector.push_back(position);
}
result.push_back(rgPositions);
}

return result;
}

void ensureOrcFooter(InputStream* stream, DataBuffer<char>* buffer, uint64_t postscriptLength) {
const std::string MAGIC("ORC");
const uint64_t magicLength = MAGIC.length();
Expand Down
9 changes: 5 additions & 4 deletions c++/src/Reader.hh
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,8 @@ namespace orc {
const SchemaEvolution* getSchemaEvolution() const {
return &schemaEvolution_;
}

std::vector<RowGroupPositions> getPositionEntries(int columnId) override;
};

class ReaderImpl : public Reader {
Expand All @@ -265,10 +267,9 @@ namespace orc {
// internal methods
void readMetadata() const;
void checkOrcVersion();
void getRowIndexStatistics(
const proto::StripeInformation& stripeInfo, uint64_t stripeIndex,
const proto::StripeFooter& currentStripeFooter,
std::vector<std::vector<proto::ColumnStatistics> >* indexStats) const;
void getRowIndexStatistics(const proto::StripeInformation& stripeInfo, uint64_t stripeIndex,
const proto::StripeFooter& currentStripeFooter,
std::vector<std::vector<proto::ColumnStatistics>>* indexStats) const;

// metadata
mutable bool isMetadataLoaded_;
Expand Down
Loading
Loading