Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ORC-1264: [C++] Add a writer option to align compression block with row group #2005

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
6 changes: 6 additions & 0 deletions c++/include/orc/Reader.hh
Original file line number Diff line number Diff line change
Expand Up @@ -657,6 +657,12 @@ namespace orc {
* @param rowNumber the next row the reader should return
*/
virtual void seekToRow(uint64_t rowNumber) = 0;

/**
* Get the current stripe position entries for the specified column.
* @return the position entries for the specified column.
luffy-zh marked this conversation as resolved.
Show resolved Hide resolved
*/
virtual std::vector<std::vector<int>> getCurrentStripePositionEntries(uint64_t columnId) = 0;
luffy-zh marked this conversation as resolved.
Show resolved Hide resolved
luffy-zh marked this conversation as resolved.
Show resolved Hide resolved
};
} // namespace orc

Expand Down
13 changes: 13 additions & 0 deletions c++/include/orc/Writer.hh
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,19 @@ namespace orc {
* @return if not set, return default value which is 64 KB.
*/
uint64_t getMemoryBlockSize() const;

/**
* Set whether the compression block should be aligned to row group boundary.
luffy-zh marked this conversation as resolved.
Show resolved Hide resolved
* The boolean type may not be aligned to row group boundary due to the
* requirement of the Boolean RLE encoder to pack input bits into bytes
*/
WriterOptions& setAlignBlockBoundToRowGroup(bool alignBlockBoundToRowGroup);

/**
* Get if the compression block should be aligned to row group boundary.
* @return if not set, return default value which is false.
*/
bool getAlignBlockBoundToRowGroup() const;
};

class Writer {
Expand Down
111 changes: 111 additions & 0 deletions c++/src/ColumnWriter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,10 @@ namespace orc {
// PASS
}

void ColumnWriter::finishStreams() {
notNullEncoder->finishEncode();
}

class StructColumnWriter : public ColumnWriter {
public:
StructColumnWriter(const Type& type, const StreamsFactory& factory,
Expand Down Expand Up @@ -283,6 +287,8 @@ namespace orc {

virtual void reset() override;

virtual void finishStreams() override;

private:
std::vector<std::unique_ptr<ColumnWriter>> children_;
};
Expand Down Expand Up @@ -416,6 +422,13 @@ namespace orc {
}
}

void StructColumnWriter::finishStreams() {
ColumnWriter::finishStreams();
for (uint32_t i = 0; i < children_.size(); ++i) {
children_[i]->finishStreams();
}
}

template <typename BatchType>
class IntegerColumnWriter : public ColumnWriter {
public:
Expand All @@ -433,6 +446,8 @@ namespace orc {

virtual void recordPosition() const override;

virtual void finishStreams() override;

protected:
std::unique_ptr<RleEncoder> rleEncoder;

Expand Down Expand Up @@ -528,6 +543,12 @@ namespace orc {
rleEncoder->recordPosition(rowIndexPosition.get());
}

template <typename BatchType>
void IntegerColumnWriter<BatchType>::finishStreams() {
ColumnWriter::finishStreams();
rleEncoder->finishEncode();
}

template <typename BatchType>
class ByteColumnWriter : public ColumnWriter {
public:
Expand All @@ -544,6 +565,8 @@ namespace orc {

virtual void recordPosition() const override;

virtual void finishStreams() override;

private:
std::unique_ptr<ByteRleEncoder> byteRleEncoder_;
};
Expand Down Expand Up @@ -637,6 +660,12 @@ namespace orc {
byteRleEncoder_->recordPosition(rowIndexPosition.get());
}

template <typename BatchType>
void ByteColumnWriter<BatchType>::finishStreams() {
ColumnWriter::finishStreams();
byteRleEncoder_->finishEncode();
}

template <typename BatchType>
class BooleanColumnWriter : public ColumnWriter {
public:
Expand All @@ -654,6 +683,8 @@ namespace orc {

virtual void recordPosition() const override;

virtual void finishStreams() override;

private:
std::unique_ptr<ByteRleEncoder> rleEncoder_;
};
Expand Down Expand Up @@ -750,6 +781,12 @@ namespace orc {
rleEncoder_->recordPosition(rowIndexPosition.get());
}

template <typename BatchType>
void BooleanColumnWriter<BatchType>::finishStreams() {
ColumnWriter::finishStreams();
rleEncoder_->finishEncode();
luffy-zh marked this conversation as resolved.
Show resolved Hide resolved
}

template <typename ValueType, typename BatchType>
class FloatingColumnWriter : public ColumnWriter {
public:
Expand All @@ -767,6 +804,8 @@ namespace orc {

virtual void recordPosition() const override;

virtual void finishStreams() override;

private:
bool isFloat_;
std::unique_ptr<AppendOnlyBufferedStream> dataStream_;
Expand Down Expand Up @@ -878,6 +917,12 @@ namespace orc {
dataStream_->recordPosition(rowIndexPosition.get());
}

template <typename ValueType, typename BatchType>
void FloatingColumnWriter<ValueType, BatchType>::finishStreams() {
ColumnWriter::finishStreams();
dataStream_->finishStream();
}

/**
* Implementation of increasing sorted string dictionary
*/
Expand Down Expand Up @@ -1029,6 +1074,8 @@ namespace orc {

virtual void reset() override;

virtual void finishStreams() override;

private:
/**
* dictionary related functions
Expand Down Expand Up @@ -1222,6 +1269,14 @@ namespace orc {
}
}

void StringColumnWriter::finishStreams() {
ColumnWriter::finishStreams();
if (!useDictionary) {
directDataStream->finishStream();
directLengthEncoder->finishEncode();
}
}

bool StringColumnWriter::checkDictionaryKeyRatio() {
if (!doneDictionaryCheck) {
useDictionary = dictionary.size() <=
Expand Down Expand Up @@ -1571,6 +1626,8 @@ namespace orc {

virtual void recordPosition() const override;

virtual void finishStreams() override;

protected:
std::unique_ptr<RleEncoder> secRleEncoder, nanoRleEncoder;

Expand Down Expand Up @@ -1711,6 +1768,12 @@ namespace orc {
nanoRleEncoder->recordPosition(rowIndexPosition.get());
}

void TimestampColumnWriter::finishStreams() {
ColumnWriter::finishStreams();
secRleEncoder->finishEncode();
nanoRleEncoder->finishEncode();
}

class DateColumnWriter : public IntegerColumnWriter<LongVectorBatch> {
public:
DateColumnWriter(const Type& type, const StreamsFactory& factory, const WriterOptions& options);
Expand Down Expand Up @@ -1780,6 +1843,8 @@ namespace orc {

virtual void recordPosition() const override;

virtual void finishStreams() override;

protected:
RleVersion rleVersion;
uint64_t precision;
Expand Down Expand Up @@ -1898,6 +1963,12 @@ namespace orc {
scaleEncoder->recordPosition(rowIndexPosition.get());
}

void Decimal64ColumnWriter::finishStreams() {
ColumnWriter::finishStreams();
valueStream->finishStream();
scaleEncoder->finishEncode();
}

class Decimal64ColumnWriterV2 : public ColumnWriter {
public:
Decimal64ColumnWriterV2(const Type& type, const StreamsFactory& factory,
Expand All @@ -1914,6 +1985,8 @@ namespace orc {

virtual void recordPosition() const override;

virtual void finishStreams() override;

protected:
uint64_t precision;
uint64_t scale;
Expand Down Expand Up @@ -2004,6 +2077,11 @@ namespace orc {
valueEncoder->recordPosition(rowIndexPosition.get());
}

void Decimal64ColumnWriterV2::finishStreams() {
ColumnWriter::finishStreams();
valueEncoder->finishEncode();
}

class Decimal128ColumnWriter : public Decimal64ColumnWriter {
public:
Decimal128ColumnWriter(const Type& type, const StreamsFactory& factory,
Expand Down Expand Up @@ -2119,6 +2197,8 @@ namespace orc {

virtual void reset() override;

virtual void finishStreams() override;

private:
std::unique_ptr<RleEncoder> lengthEncoder_;
RleVersion rleVersion_;
Expand Down Expand Up @@ -2295,6 +2375,14 @@ namespace orc {
}
}

void ListColumnWriter::finishStreams() {
ColumnWriter::finishStreams();
lengthEncoder_->finishEncode();
if (child_) {
child_->finishStreams();
}
}

class MapColumnWriter : public ColumnWriter {
public:
MapColumnWriter(const Type& type, const StreamsFactory& factory, const WriterOptions& options);
Expand Down Expand Up @@ -2327,6 +2415,8 @@ namespace orc {

virtual void reset() override;

virtual void finishStreams() override;

private:
std::unique_ptr<ColumnWriter> keyWriter_;
std::unique_ptr<ColumnWriter> elemWriter_;
Expand Down Expand Up @@ -2545,6 +2635,17 @@ namespace orc {
}
}

void MapColumnWriter::finishStreams() {
ColumnWriter::finishStreams();
lengthEncoder_->finishEncode();
if (keyWriter_) {
keyWriter_->finishStreams();
}
if (elemWriter_) {
elemWriter_->finishStreams();
}
}

class UnionColumnWriter : public ColumnWriter {
public:
UnionColumnWriter(const Type& type, const StreamsFactory& factory,
Expand Down Expand Up @@ -2577,6 +2678,8 @@ namespace orc {

virtual void reset() override;

virtual void finishStreams() override;

private:
std::unique_ptr<ByteRleEncoder> rleEncoder_;
std::vector<std::unique_ptr<ColumnWriter>> children_;
Expand Down Expand Up @@ -2748,6 +2851,14 @@ namespace orc {
}
}

void UnionColumnWriter::finishStreams() {
ColumnWriter::finishStreams();
rleEncoder_->finishEncode();
for (uint32_t i = 0; i < children_.size(); ++i) {
children_[i]->finishStreams();
}
}

std::unique_ptr<ColumnWriter> buildWriter(const Type& type, const StreamsFactory& factory,
const WriterOptions& options) {
switch (static_cast<int64_t>(type.getKind())) {
Expand Down
9 changes: 9 additions & 0 deletions c++/src/ColumnWriter.hh
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,15 @@ namespace orc {
*/
virtual void writeDictionary();

/**
* Finalize the encoding and compressing process. This function should be
* called after all data required for encoding has been added. It ensures
* that any remaining data is processed and the final state of the streams
* is set. Note: the boolean type may break this spec due to some trailing bits will be written
* to the next compression block.
luffy-zh marked this conversation as resolved.
Show resolved Hide resolved
*/
virtual void finishStreams();

protected:
/**
* Utility function to translate ColumnStatistics into protobuf form and
Expand Down
15 changes: 15 additions & 0 deletions c++/src/Reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1241,6 +1241,21 @@ namespace orc {
useTightNumericVector_);
}

std::vector<std::vector<int>> RowReaderImpl::getCurrentStripePositionEntries(uint64_t columnId) {
loadStripeIndex();
std::vector<std::vector<int>> result;
auto rowIndex = rowIndexes_[columnId];
for (auto rowIndexEntry : rowIndex.entry()) {
std::vector<int> posVector;
for (auto position : rowIndexEntry.positions()) {
posVector.push_back(position);
}
result.push_back(posVector);
}

return result;
}

void ensureOrcFooter(InputStream* stream, DataBuffer<char>* buffer, uint64_t postscriptLength) {
const std::string MAGIC("ORC");
const uint64_t magicLength = MAGIC.length();
Expand Down
9 changes: 5 additions & 4 deletions c++/src/Reader.hh
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,8 @@ namespace orc {
const SchemaEvolution* getSchemaEvolution() const {
return &schemaEvolution_;
}

std::vector<std::vector<int>> getCurrentStripePositionEntries(uint64_t columnId) override;
};

class ReaderImpl : public Reader {
Expand All @@ -265,10 +267,9 @@ namespace orc {
// internal methods
void readMetadata() const;
void checkOrcVersion();
void getRowIndexStatistics(
const proto::StripeInformation& stripeInfo, uint64_t stripeIndex,
const proto::StripeFooter& currentStripeFooter,
std::vector<std::vector<proto::ColumnStatistics> >* indexStats) const;
void getRowIndexStatistics(const proto::StripeInformation& stripeInfo, uint64_t stripeIndex,
const proto::StripeFooter& currentStripeFooter,
std::vector<std::vector<proto::ColumnStatistics>>* indexStats) const;

// metadata
mutable bool isMetadataLoaded_;
Expand Down
Loading
Loading