diff --git a/c++/include/orc/Reader.hh b/c++/include/orc/Reader.hh index 4b254593ee..4ddce64ad4 100644 --- a/c++/include/orc/Reader.hh +++ b/c++/include/orc/Reader.hh @@ -62,6 +62,15 @@ namespace orc { }; ReaderMetrics* getDefaultReaderMetrics(); + // Row group index of a single column in a stripe. + struct RowGroupIndex { + // Positions are represented as a two-dimensional array where the first + // dimension is row group index and the second dimension is the position + // list of the row group. The size of the second dimension should be equal + // among all row groups. + std::vector> positions; + }; + /** * Options for creating a Reader. */ @@ -605,6 +614,16 @@ namespace orc { */ virtual std::map getBloomFilters( uint32_t stripeIndex, const std::set& included) const = 0; + + /** + * Get row group index of all selected columns in the specified stripe + * @param stripeIndex index of the stripe to be read for row group index. + * @param included index of selected columns to return (if not specified, + * all columns will be returned). + * @return map of row group index keyed by its column index. + */ + virtual std::map getRowGroupIndex( + uint32_t stripeIndex, const std::set& included = {}) const = 0; }; /** diff --git a/c++/include/orc/Writer.hh b/c++/include/orc/Writer.hh index b560627c45..78f06739bc 100644 --- a/c++/include/orc/Writer.hh +++ b/c++/include/orc/Writer.hh @@ -290,6 +290,19 @@ namespace orc { * @return if not set, return default value which is 64 KB. */ uint64_t getMemoryBlockSize() const; + + /** + * Set whether the compression block should be aligned to row group boundary. + * The boolean type may not be aligned to row group boundary due to the + * requirement of the Boolean RLE encoder to pack input bits into bytes + */ + WriterOptions& setAlignBlockBoundToRowGroup(bool alignBlockBoundToRowGroup); + + /** + * Get if the compression block should be aligned to row group boundary. + * @return if not set, return default value which is false. + */ + bool getAlignBlockBoundToRowGroup() const; }; class Writer { diff --git a/c++/src/ColumnWriter.cc b/c++/src/ColumnWriter.cc index 7adca1440c..d31b1c65d4 100644 --- a/c++/src/ColumnWriter.cc +++ b/c++/src/ColumnWriter.cc @@ -254,6 +254,10 @@ namespace orc { // PASS } + void ColumnWriter::finishStreams() { + notNullEncoder->finishEncode(); + } + class StructColumnWriter : public ColumnWriter { public: StructColumnWriter(const Type& type, const StreamsFactory& factory, @@ -283,6 +287,8 @@ namespace orc { virtual void reset() override; + virtual void finishStreams() override; + private: std::vector> children_; }; @@ -416,6 +422,13 @@ namespace orc { } } + void StructColumnWriter::finishStreams() { + ColumnWriter::finishStreams(); + for (uint32_t i = 0; i < children_.size(); ++i) { + children_[i]->finishStreams(); + } + } + template class IntegerColumnWriter : public ColumnWriter { public: @@ -433,6 +446,8 @@ namespace orc { virtual void recordPosition() const override; + virtual void finishStreams() override; + protected: std::unique_ptr rleEncoder; @@ -528,6 +543,12 @@ namespace orc { rleEncoder->recordPosition(rowIndexPosition.get()); } + template + void IntegerColumnWriter::finishStreams() { + ColumnWriter::finishStreams(); + rleEncoder->finishEncode(); + } + template class ByteColumnWriter : public ColumnWriter { public: @@ -544,6 +565,8 @@ namespace orc { virtual void recordPosition() const override; + virtual void finishStreams() override; + private: std::unique_ptr byteRleEncoder_; }; @@ -637,6 +660,12 @@ namespace orc { byteRleEncoder_->recordPosition(rowIndexPosition.get()); } + template + void ByteColumnWriter::finishStreams() { + ColumnWriter::finishStreams(); + byteRleEncoder_->finishEncode(); + } + template class BooleanColumnWriter : public ColumnWriter { public: @@ -654,6 +683,8 @@ namespace orc { virtual void recordPosition() const override; + virtual void finishStreams() override; + private: std::unique_ptr rleEncoder_; }; @@ -750,6 +781,12 @@ namespace orc { rleEncoder_->recordPosition(rowIndexPosition.get()); } + template + void BooleanColumnWriter::finishStreams() { + ColumnWriter::finishStreams(); + rleEncoder_->finishEncode(); + } + template class FloatingColumnWriter : public ColumnWriter { public: @@ -767,6 +804,8 @@ namespace orc { virtual void recordPosition() const override; + virtual void finishStreams() override; + private: bool isFloat_; std::unique_ptr dataStream_; @@ -878,6 +917,12 @@ namespace orc { dataStream_->recordPosition(rowIndexPosition.get()); } + template + void FloatingColumnWriter::finishStreams() { + ColumnWriter::finishStreams(); + dataStream_->finishStream(); + } + /** * Implementation of increasing sorted string dictionary */ @@ -1041,6 +1086,8 @@ namespace orc { virtual void reset() override; + virtual void finishStreams() override; + private: /** * dictionary related functions @@ -1234,6 +1281,14 @@ namespace orc { } } + void StringColumnWriter::finishStreams() { + ColumnWriter::finishStreams(); + if (!useDictionary) { + directDataStream->finishStream(); + directLengthEncoder->finishEncode(); + } + } + bool StringColumnWriter::checkDictionaryKeyRatio() { if (!doneDictionaryCheck) { useDictionary = dictionary.size() <= @@ -1583,6 +1638,8 @@ namespace orc { virtual void recordPosition() const override; + virtual void finishStreams() override; + protected: std::unique_ptr secRleEncoder, nanoRleEncoder; @@ -1723,6 +1780,12 @@ namespace orc { nanoRleEncoder->recordPosition(rowIndexPosition.get()); } + void TimestampColumnWriter::finishStreams() { + ColumnWriter::finishStreams(); + secRleEncoder->finishEncode(); + nanoRleEncoder->finishEncode(); + } + class DateColumnWriter : public IntegerColumnWriter { public: DateColumnWriter(const Type& type, const StreamsFactory& factory, const WriterOptions& options); @@ -1792,6 +1855,8 @@ namespace orc { virtual void recordPosition() const override; + virtual void finishStreams() override; + protected: RleVersion rleVersion; uint64_t precision; @@ -1910,6 +1975,12 @@ namespace orc { scaleEncoder->recordPosition(rowIndexPosition.get()); } + void Decimal64ColumnWriter::finishStreams() { + ColumnWriter::finishStreams(); + valueStream->finishStream(); + scaleEncoder->finishEncode(); + } + class Decimal64ColumnWriterV2 : public ColumnWriter { public: Decimal64ColumnWriterV2(const Type& type, const StreamsFactory& factory, @@ -1926,6 +1997,8 @@ namespace orc { virtual void recordPosition() const override; + virtual void finishStreams() override; + protected: uint64_t precision; uint64_t scale; @@ -2016,6 +2089,11 @@ namespace orc { valueEncoder->recordPosition(rowIndexPosition.get()); } + void Decimal64ColumnWriterV2::finishStreams() { + ColumnWriter::finishStreams(); + valueEncoder->finishEncode(); + } + class Decimal128ColumnWriter : public Decimal64ColumnWriter { public: Decimal128ColumnWriter(const Type& type, const StreamsFactory& factory, @@ -2131,6 +2209,8 @@ namespace orc { virtual void reset() override; + virtual void finishStreams() override; + private: std::unique_ptr lengthEncoder_; RleVersion rleVersion_; @@ -2307,6 +2387,14 @@ namespace orc { } } + void ListColumnWriter::finishStreams() { + ColumnWriter::finishStreams(); + lengthEncoder_->finishEncode(); + if (child_) { + child_->finishStreams(); + } + } + class MapColumnWriter : public ColumnWriter { public: MapColumnWriter(const Type& type, const StreamsFactory& factory, const WriterOptions& options); @@ -2339,6 +2427,8 @@ namespace orc { virtual void reset() override; + virtual void finishStreams() override; + private: std::unique_ptr keyWriter_; std::unique_ptr elemWriter_; @@ -2557,6 +2647,17 @@ namespace orc { } } + void MapColumnWriter::finishStreams() { + ColumnWriter::finishStreams(); + lengthEncoder_->finishEncode(); + if (keyWriter_) { + keyWriter_->finishStreams(); + } + if (elemWriter_) { + elemWriter_->finishStreams(); + } + } + class UnionColumnWriter : public ColumnWriter { public: UnionColumnWriter(const Type& type, const StreamsFactory& factory, @@ -2589,6 +2690,8 @@ namespace orc { virtual void reset() override; + virtual void finishStreams() override; + private: std::unique_ptr rleEncoder_; std::vector> children_; @@ -2760,6 +2863,14 @@ namespace orc { } } + void UnionColumnWriter::finishStreams() { + ColumnWriter::finishStreams(); + rleEncoder_->finishEncode(); + for (uint32_t i = 0; i < children_.size(); ++i) { + children_[i]->finishStreams(); + } + } + std::unique_ptr buildWriter(const Type& type, const StreamsFactory& factory, const WriterOptions& options) { switch (static_cast(type.getKind())) { diff --git a/c++/src/ColumnWriter.hh b/c++/src/ColumnWriter.hh index 8afd1eb72c..1c5e15d707 100644 --- a/c++/src/ColumnWriter.hh +++ b/c++/src/ColumnWriter.hh @@ -179,6 +179,18 @@ namespace orc { */ virtual void writeDictionary(); + /** + * Finalize the encoding and compressing process. This function should be + * called after all data required for encoding has been added. It ensures + * that any remaining data is processed and the final state of the streams + * is set. + * Note: boolean type cannot cut off the current byte if it is not filled + * with 8 bits, otherwise Boolean RLE may incorrectly read the unfilled + * trailing bits. In this case, the last byte will be the head of the next + * compression block. + */ + virtual void finishStreams(); + protected: /** * Utility function to translate ColumnStatistics into protobuf form and diff --git a/c++/src/Compression.cc b/c++/src/Compression.cc index b5ca5a4c97..f373a75bff 100644 --- a/c++/src/Compression.cc +++ b/c++/src/Compression.cc @@ -176,6 +176,7 @@ namespace orc { } virtual void finishStream() override { compressInternal(); + BufferedOutputStream::finishStream(); } protected: @@ -982,13 +983,7 @@ namespace orc { } uint64_t BlockCompressionStream::flush() { - void* data; - int size; - if (!Next(&data, &size)) { - throw CompressionError("Failed to flush compression buffer."); - } - BufferedOutputStream::BackUp(outputSize - outputPosition); - bufferSize = outputSize = outputPosition = 0; + finishStream(); return BufferedOutputStream::flush(); } @@ -1031,7 +1026,13 @@ namespace orc { } void BlockCompressionStream::finishStream() { - doBlockCompression(); + void* data; + int size; + if (!Next(&data, &size)) { + throw CompressionError("Failed to flush compression buffer."); + } + BufferedOutputStream::BackUp(outputSize - outputPosition); + bufferSize = outputSize = outputPosition = 0; } /** diff --git a/c++/src/Reader.cc b/c++/src/Reader.cc index 2966c2c2ee..034ea04ee6 100644 --- a/c++/src/Reader.cc +++ b/c++/src/Reader.cc @@ -1426,17 +1426,10 @@ namespace orc { uint32_t stripeIndex, const std::set& included) const { std::map ret; - // find stripe info - if (stripeIndex >= static_cast(footer_->stripes_size())) { - throw std::logic_error("Illegal stripe index: " + - to_string(static_cast(stripeIndex))); - } - const proto::StripeInformation currentStripeInfo = - footer_->stripes(static_cast(stripeIndex)); - const proto::StripeFooter currentStripeFooter = getStripeFooter(currentStripeInfo, *contents_); + uint64_t offset; + auto currentStripeFooter = loadCurrentStripeFooter(stripeIndex, offset); // iterate stripe footer to get stream of bloom_filter - uint64_t offset = static_cast(currentStripeInfo.offset()); for (int i = 0; i < currentStripeFooter.streams_size(); i++) { const proto::Stream& stream = currentStripeFooter.streams(i); uint32_t column = static_cast(stream.column()); @@ -1474,6 +1467,62 @@ namespace orc { return ret; } + proto::StripeFooter ReaderImpl::loadCurrentStripeFooter(uint32_t stripeIndex, + uint64_t& offset) const { + // find stripe info + if (stripeIndex >= static_cast(footer_->stripes_size())) { + throw std::logic_error("Illegal stripe index: " + + to_string(static_cast(stripeIndex))); + } + const proto::StripeInformation currentStripeInfo = + footer_->stripes(static_cast(stripeIndex)); + offset = static_cast(currentStripeInfo.offset()); + return getStripeFooter(currentStripeInfo, *contents_); + } + + std::map ReaderImpl::getRowGroupIndex( + uint32_t stripeIndex, const std::set& included) const { + std::map ret; + uint64_t offset; + auto currentStripeFooter = loadCurrentStripeFooter(stripeIndex, offset); + + // iterate stripe footer to get stream of row_index + for (int i = 0; i < currentStripeFooter.streams_size(); i++) { + const proto::Stream& stream = currentStripeFooter.streams(i); + uint32_t column = static_cast(stream.column()); + uint64_t length = static_cast(stream.length()); + RowGroupIndex& rowGroupIndex = ret[column]; + + if (stream.kind() == proto::Stream_Kind_ROW_INDEX && + (included.empty() || included.find(column) != included.end())) { + std::unique_ptr pbStream = + createDecompressor(contents_->compression, + std::make_unique( + contents_->stream.get(), offset, length, *contents_->pool), + contents_->blockSize, *(contents_->pool), contents_->readerMetrics); + + proto::RowIndex pbRowIndex; + if (!pbRowIndex.ParseFromZeroCopyStream(pbStream.get())) { + std::stringstream errMsgBuffer; + errMsgBuffer << "Failed to parse RowIndex at column " << column << " in stripe " + << stripeIndex; + throw ParseError(errMsgBuffer.str()); + } + + // add rowGroupIndex to result for one column + for (auto& rowIndexEntry : pbRowIndex.entry()) { + std::vector posVector; + for (auto& position : rowIndexEntry.positions()) { + posVector.push_back(position); + } + rowGroupIndex.positions.push_back(posVector); + } + } + offset += length; + } + return ret; + } + RowReader::~RowReader() { // PASS } diff --git a/c++/src/Reader.hh b/c++/src/Reader.hh index 630d812c38..89606c331c 100644 --- a/c++/src/Reader.hh +++ b/c++/src/Reader.hh @@ -265,10 +265,10 @@ namespace orc { // internal methods void readMetadata() const; void checkOrcVersion(); - void getRowIndexStatistics( - const proto::StripeInformation& stripeInfo, uint64_t stripeIndex, - const proto::StripeFooter& currentStripeFooter, - std::vector >* indexStats) const; + void getRowIndexStatistics(const proto::StripeInformation& stripeInfo, uint64_t stripeIndex, + const proto::StripeFooter& currentStripeFooter, + std::vector>* indexStats) const; + proto::StripeFooter loadCurrentStripeFooter(uint32_t stripeIndex, uint64_t& offset) const; // metadata mutable bool isMetadataLoaded_; @@ -374,6 +374,9 @@ namespace orc { std::map getBloomFilters( uint32_t stripeIndex, const std::set& included) const override; + + std::map getRowGroupIndex( + uint32_t stripeIndex, const std::set& included) const override; }; } // namespace orc diff --git a/c++/src/Writer.cc b/c++/src/Writer.cc index 531b566558..775e6d2452 100644 --- a/c++/src/Writer.cc +++ b/c++/src/Writer.cc @@ -47,6 +47,7 @@ namespace orc { bool useTightNumericVector; uint64_t outputBufferCapacity; uint64_t memoryBlockSize; + bool alignBlockBoundToRowGroup; WriterOptionsPrivate() : fileVersion(FileVersion::v_0_12()) { // default to Hive_0_12 stripeSize = 64 * 1024 * 1024; // 64M @@ -69,6 +70,7 @@ namespace orc { useTightNumericVector = false; outputBufferCapacity = 1024 * 1024; memoryBlockSize = 64 * 1024; // 64K + alignBlockBoundToRowGroup = false; } }; @@ -298,6 +300,15 @@ namespace orc { return privateBits_->memoryBlockSize; } + WriterOptions& WriterOptions::setAlignBlockBoundToRowGroup(bool alignBlockBoundToRowGroup) { + privateBits_->alignBlockBoundToRowGroup = alignBlockBoundToRowGroup; + return *this; + } + + bool WriterOptions::getAlignBlockBoundToRowGroup() const { + return privateBits_->alignBlockBoundToRowGroup; + } + Writer::~Writer() { // PASS } @@ -401,6 +412,9 @@ namespace orc { stripeRows_ += chunkSize; if (indexRows_ >= rowIndexStride) { + if (options_.getAlignBlockBoundToRowGroup()) { + columnWriter_->finishStreams(); + } columnWriter_->createRowIndexEntry(); indexRows_ = 0; } diff --git a/c++/src/io/OutputStream.cc b/c++/src/io/OutputStream.cc index aa4dbe6ed2..fbf1ca61dd 100644 --- a/c++/src/io/OutputStream.cc +++ b/c++/src/io/OutputStream.cc @@ -128,9 +128,7 @@ namespace orc { } uint64_t AppendOnlyBufferedStream::flush() { - outStream_->BackUp(bufferLength_ - bufferOffset_); - bufferOffset_ = bufferLength_ = 0; - buffer_ = nullptr; + finishStream(); return outStream_->flush(); } @@ -150,4 +148,11 @@ namespace orc { } } + void AppendOnlyBufferedStream::finishStream() { + outStream_->BackUp(bufferLength_ - bufferOffset_); + outStream_->finishStream(); + bufferOffset_ = bufferLength_ = 0; + buffer_ = nullptr; + } + } // namespace orc diff --git a/c++/src/io/OutputStream.hh b/c++/src/io/OutputStream.hh index 4908f34f23..6319de96d6 100644 --- a/c++/src/io/OutputStream.hh +++ b/c++/src/io/OutputStream.hh @@ -100,6 +100,7 @@ namespace orc { void write(const char* data, size_t size); uint64_t getSize() const; uint64_t flush(); + void finishStream(); void recordPosition(PositionRecorder* recorder) const; }; diff --git a/c++/test/TestWriter.cc b/c++/test/TestWriter.cc index 8bc4032a56..975462e30c 100644 --- a/c++/test/TestWriter.cc +++ b/c++/test/TestWriter.cc @@ -57,6 +57,8 @@ namespace orc { options.setTimezoneName(timezone); options.setUseTightNumericVector(useTightNumericVector); options.setMemoryBlockSize(memoryBlockSize); + // enable align block bound to row group when stride is not 0 + options.setAlignBlockBoundToRowGroup(true); return createWriter(type, stream, options); } @@ -84,7 +86,56 @@ namespace orc { return reader->createRowReader(rowReaderOpts); } - class WriterTest : public TestWithParam { + void verifyCompressionBlockAlignment(std::unique_ptr& reader, uint64_t columnCount) { + auto stripeCount = reader->getNumberOfStripes(); + for (uint64_t stripeIndex = 0; stripeIndex < stripeCount; ++stripeIndex) { + for (uint64_t i = 0; i < columnCount; ++i) { + auto rowGroupIndexMap = reader->getRowGroupIndex(stripeIndex); + EXPECT_TRUE(rowGroupIndexMap.size() > 0); + auto rowGroupIndex = rowGroupIndexMap[columnCount]; + auto subType = reader->getType().getSubtype(i); + EXPECT_TRUE(rowGroupIndex.positions.size() > 0); + for (auto rowGroupPositions : rowGroupIndex.positions) { + for (uint64_t posIndex = 0; posIndex < rowGroupPositions.size(); ++posIndex) { + // After we call finishStream(), unusedBufferSize is set to 0, + // so only the first position is valid in each recordPosition call. + switch (subType->getKind()) { + case DECIMAL: + case STRING: + case BINARY: + case CHAR: + case VARCHAR: { + if (posIndex != 0 && posIndex != 2) { + EXPECT_EQ(rowGroupPositions[posIndex], 0); + } + break; + } + case TIMESTAMP_INSTANT: + case TIMESTAMP: { + if (posIndex != 0 && posIndex != 3) { + EXPECT_EQ(rowGroupPositions[posIndex], 0); + } + break; + } + default: { + if (posIndex != 0) { + EXPECT_EQ(rowGroupPositions[posIndex], 0); + } + break; + } + } + } + } + } + } + } + + struct TestParams { + FileVersion fileVersion; + bool enableAlignBlockBoundToRowGroup; + }; + + class WriterTest : public TestWithParam { // You can implement all the usual fixture class members here. // To access the test parameter, call GetParam() from class // TestWithParam. @@ -92,13 +143,15 @@ namespace orc { protected: FileVersion fileVersion; + bool enableAlignBlockBoundToRowGroup; public: - WriterTest() : fileVersion(FileVersion::v_0_11()) {} + WriterTest() : fileVersion(FileVersion::v_0_11()), enableAlignBlockBoundToRowGroup(false) {} }; void WriterTest::SetUp() { - fileVersion = GetParam(); + fileVersion = GetParam().fileVersion; + enableAlignBlockBoundToRowGroup = GetParam().enableAlignBlockBoundToRowGroup; } TEST_P(WriterTest, writeEmptyFile) { @@ -252,7 +305,7 @@ namespace orc { std::unique_ptr writer = createWriter(stripeSize, memoryBlockSize, compressionBlockSize, CompressionKind_ZLIB, *type, - pool, &memStream, fileVersion); + pool, &memStream, fileVersion, enableAlignBlockBoundToRowGroup ? 1024 : 0); std::unique_ptr batch = writer->createRowBatch(65535); StructVectorBatch* structBatch = dynamic_cast(batch.get()); StringVectorBatch* strBatch = dynamic_cast(structBatch->fields[0]); @@ -294,6 +347,9 @@ namespace orc { EXPECT_EQ(i, static_cast(atoi(str.c_str()))); EXPECT_EQ(i, static_cast(atoi(bin.c_str()))); } + if (enableAlignBlockBoundToRowGroup) { + verifyCompressionBlockAlignment(reader, type->getSubtypeCount()); + } EXPECT_FALSE(rowReader->next(*batch)); } @@ -315,7 +371,7 @@ namespace orc { std::unique_ptr writer = createWriter(stripeSize, memoryBlockSize, compressionBlockSize, CompressionKind_ZLIB, *type, - pool, &memStream, fileVersion); + pool, &memStream, fileVersion, enableAlignBlockBoundToRowGroup ? 1024 : 0); std::unique_ptr batch = writer->createRowBatch(rowCount); StructVectorBatch* structBatch = dynamic_cast(batch.get()); DoubleVectorBatch* doubleBatch = dynamic_cast(structBatch->fields[0]); @@ -351,6 +407,10 @@ namespace orc { 0.000001f); } EXPECT_FALSE(rowReader->next(*batch)); + + if (enableAlignBlockBoundToRowGroup) { + verifyCompressionBlockAlignment(reader, type->getSubtypeCount()); + } } TEST_P(WriterTest, writeShortIntLong) { @@ -366,7 +426,7 @@ namespace orc { std::unique_ptr writer = createWriter(stripeSize, memoryBlockSize, compressionBlockSize, CompressionKind_ZLIB, *type, - pool, &memStream, fileVersion); + pool, &memStream, fileVersion, enableAlignBlockBoundToRowGroup ? 1024 : 0); std::unique_ptr batch = writer->createRowBatch(rowCount); StructVectorBatch* structBatch = dynamic_cast(batch.get()); LongVectorBatch* smallIntBatch = dynamic_cast(structBatch->fields[0]); @@ -403,6 +463,9 @@ namespace orc { EXPECT_EQ(static_cast(i), intBatch->data[i]); EXPECT_EQ(static_cast(i), bigIntBatch->data[i]); } + if (enableAlignBlockBoundToRowGroup) { + verifyCompressionBlockAlignment(reader, type->getSubtypeCount()); + } } TEST_P(WriterTest, writeTinyint) { @@ -415,9 +478,9 @@ namespace orc { uint64_t rowCount = 65535; uint64_t memoryBlockSize = 64; - std::unique_ptr writer = - createWriter(stripeSize, memoryBlockSize, compressionBlockSize, CompressionKind_ZSTD, *type, - pool, &memStream, fileVersion, 1024, "GMT", true); + std::unique_ptr writer = createWriter( + stripeSize, memoryBlockSize, compressionBlockSize, CompressionKind_ZSTD, *type, pool, + &memStream, fileVersion, enableAlignBlockBoundToRowGroup ? 1024 : 0, "GMT", true); std::unique_ptr batch = writer->createRowBatch(rowCount); StructVectorBatch* structBatch = dynamic_cast(batch.get()); ByteVectorBatch* byteBatch = dynamic_cast(structBatch->fields[0]); @@ -442,6 +505,9 @@ namespace orc { batch = rowReader->createRowBatch(rowCount); rowReader->seekToRow(20); EXPECT_EQ(true, rowReader->next(*batch)); + if (enableAlignBlockBoundToRowGroup) { + verifyCompressionBlockAlignment(reader, type->getSubtypeCount()); + } structBatch = dynamic_cast(batch.get()); auto outByteBatch = dynamic_cast(structBatch->fields[0]); @@ -474,7 +540,7 @@ namespace orc { std::unique_ptr writer = createWriter(stripeSize, memoryBlockSize, compressionBlockSize, CompressionKind_ZLIB, *type, - pool, &memStream, fileVersion); + pool, &memStream, fileVersion, enableAlignBlockBoundToRowGroup ? 1024 : 0); std::unique_ptr batch = writer->createRowBatch(rowCount); StructVectorBatch* structBatch = dynamic_cast(batch.get()); LongVectorBatch* byteBatch = dynamic_cast(structBatch->fields[0]); @@ -501,6 +567,9 @@ namespace orc { for (uint64_t i = 0; i < rowCount; ++i) { EXPECT_EQ((i % 3) == 0 ? 1 : 0, byteBatch->data[i]); } + if (enableAlignBlockBoundToRowGroup) { + verifyCompressionBlockAlignment(reader, type->getSubtypeCount()); + } } TEST_P(WriterTest, writeDate) { @@ -515,7 +584,7 @@ namespace orc { std::unique_ptr writer = createWriter(stripeSize, memoryBlockSize, compressionBlockSize, CompressionKind_ZLIB, *type, - pool, &memStream, fileVersion); + pool, &memStream, fileVersion, enableAlignBlockBoundToRowGroup ? 1024 : 0); std::unique_ptr batch = writer->createRowBatch(rowCount); StructVectorBatch* structBatch = dynamic_cast(batch.get()); @@ -543,6 +612,9 @@ namespace orc { for (uint64_t i = 0; i < rowCount; ++i) { EXPECT_EQ(static_cast(i), longBatch->data[i]); } + if (enableAlignBlockBoundToRowGroup) { + verifyCompressionBlockAlignment(reader, type->getSubtypeCount()); + } } TEST_P(WriterTest, writeTimestamp) { @@ -557,7 +629,7 @@ namespace orc { std::unique_ptr writer = createWriter(stripeSize, memoryBlockSize, compressionBlockSize, CompressionKind_ZLIB, *type, - pool, &memStream, fileVersion); + pool, &memStream, fileVersion, enableAlignBlockBoundToRowGroup ? 1024 : 0); std::unique_ptr batch = writer->createRowBatch(rowCount); StructVectorBatch* structBatch = dynamic_cast(batch.get()); TimestampVectorBatch* tsBatch = dynamic_cast(structBatch->fields[0]); @@ -589,14 +661,18 @@ namespace orc { EXPECT_EQ(times[i], tsBatch->data[i]); EXPECT_EQ(i * 1000, tsBatch->nanoseconds[i]); } + if (enableAlignBlockBoundToRowGroup) { + verifyCompressionBlockAlignment(reader, type->getSubtypeCount()); + } } TEST_P(WriterTest, writeNegativeTimestamp) { MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE); MemoryPool* pool = getDefaultPool(); std::unique_ptr type(Type::buildTypeFromString("struct")); - auto writer = createWriter(16 * 1024 * 1024, 64 * 1024, 256 * 1024, CompressionKind_ZLIB, *type, - pool, &memStream, fileVersion); + auto writer = + createWriter(16 * 1024 * 1024, 64 * 1024, 256 * 1024, CompressionKind_ZLIB, *type, pool, + &memStream, fileVersion, enableAlignBlockBoundToRowGroup ? 1024 : 0); uint64_t batchCount = 5; auto batch = writer->createRowBatch(batchCount * 2); auto structBatch = dynamic_cast(batch.get()); @@ -646,6 +722,10 @@ namespace orc { } EXPECT_EQ(1000000, tsBatch->nanoseconds[i]); } + + if (enableAlignBlockBoundToRowGroup) { + verifyCompressionBlockAlignment(reader, type->getSubtypeCount()); + } } // TODO: Disable the test below for Windows for following reasons: @@ -766,7 +846,7 @@ namespace orc { std::unique_ptr writer = createWriter(stripeSize, memoryBlockSize, compressionBlockSize, CompressionKind_ZLIB, *type, - pool, &memStream, fileVersion); + pool, &memStream, fileVersion, enableAlignBlockBoundToRowGroup ? 1024 : 0); std::unique_ptr batch = writer->createRowBatch(rowCount); StructVectorBatch* structBatch = dynamic_cast(batch.get()); TimestampVectorBatch* tsBatch = dynamic_cast(structBatch->fields[0]); @@ -798,6 +878,9 @@ namespace orc { EXPECT_EQ(times[i], tsBatch->data[i]); EXPECT_EQ(i * 1000, tsBatch->nanoseconds[i]); } + if (enableAlignBlockBoundToRowGroup) { + verifyCompressionBlockAlignment(reader, type->getSubtypeCount()); + } } TEST_P(WriterTest, writeCharAndVarcharColumn) { @@ -815,7 +898,7 @@ namespace orc { std::unique_ptr writer = createWriter(stripeSize, memoryBlockSize, compressionBlockSize, CompressionKind_ZLIB, *type, - pool, &memStream, fileVersion); + pool, &memStream, fileVersion, enableAlignBlockBoundToRowGroup ? 1024 : 0); std::unique_ptr batch = writer->createRowBatch(rowCount); StructVectorBatch* structBatch = dynamic_cast(batch.get()); @@ -877,6 +960,9 @@ namespace orc { } EXPECT_FALSE(rowReader->next(*batch)); + if (enableAlignBlockBoundToRowGroup) { + verifyCompressionBlockAlignment(reader, type->getSubtypeCount()); + } } TEST_P(WriterTest, writeDecimal64Column) { @@ -892,7 +978,7 @@ namespace orc { std::unique_ptr writer = createWriter(stripeSize, memoryBlockSize, compressionBlockSize, CompressionKind_ZLIB, *type, - pool, &memStream, fileVersion); + pool, &memStream, fileVersion, enableAlignBlockBoundToRowGroup ? 1024 : 0); std::unique_ptr batch = writer->createRowBatch(rowCount); StructVectorBatch* structBatch = dynamic_cast(batch.get()); Decimal64VectorBatch* decBatch = dynamic_cast(structBatch->fields[0]); @@ -954,6 +1040,9 @@ namespace orc { EXPECT_EQ(dec, decBatch->values[i]); EXPECT_EQ(-dec, decBatch->values[i + maxPrecision]); } + if (enableAlignBlockBoundToRowGroup) { + verifyCompressionBlockAlignment(reader, type->getSubtypeCount()); + } } TEST_P(WriterTest, writeDecimal128Column) { @@ -969,7 +1058,7 @@ namespace orc { std::unique_ptr writer = createWriter(stripeSize, memoryBlockSize, compressionBlockSize, CompressionKind_ZLIB, *type, - pool, &memStream, fileVersion); + pool, &memStream, fileVersion, enableAlignBlockBoundToRowGroup ? 1024 : 0); std::unique_ptr batch = writer->createRowBatch(rowCount); StructVectorBatch* structBatch = dynamic_cast(batch.get()); Decimal128VectorBatch* decBatch = dynamic_cast(structBatch->fields[0]); @@ -1041,6 +1130,9 @@ namespace orc { EXPECT_EQ(expected, decBatch->values[i].toString()); EXPECT_EQ("-" + expected, decBatch->values[i + maxPrecision].toString()); } + if (enableAlignBlockBoundToRowGroup) { + verifyCompressionBlockAlignment(reader, type->getSubtypeCount()); + } } TEST_P(WriterTest, writeListColumn) { @@ -1058,7 +1150,7 @@ namespace orc { std::unique_ptr writer = createWriter(stripeSize, memoryBlockSize, compressionBlockSize, CompressionKind_ZLIB, *type, - pool, &memStream, fileVersion); + pool, &memStream, fileVersion, enableAlignBlockBoundToRowGroup ? 1024 : 0); std::unique_ptr batch = writer->createRowBatch(rowCount * maxListLength); StructVectorBatch* structBatch = dynamic_cast(batch.get()); @@ -1104,6 +1196,9 @@ namespace orc { EXPECT_EQ(static_cast(i), data[offsets[i] + j]); } } + if (enableAlignBlockBoundToRowGroup) { + verifyCompressionBlockAlignment(reader, type->getSubtypeCount()); + } } TEST_P(WriterTest, writeMapColumn) { @@ -1118,7 +1213,7 @@ namespace orc { std::unique_ptr writer = createWriter(stripeSize, memoryBlockSize, compressionBlockSize, CompressionKind_ZLIB, *type, - pool, &memStream, fileVersion); + pool, &memStream, fileVersion, enableAlignBlockBoundToRowGroup ? 1024 : 0); std::unique_ptr batch = writer->createRowBatch(rowCount * maxListLength); StructVectorBatch* structBatch = dynamic_cast(batch.get()); MapVectorBatch* mapBatch = dynamic_cast(structBatch->fields[0]); @@ -1185,6 +1280,9 @@ namespace orc { EXPECT_EQ(static_cast(i), elemData[offsets[i] + j]); } } + if (enableAlignBlockBoundToRowGroup) { + verifyCompressionBlockAlignment(reader, type->getSubtypeCount()); + } } TEST_P(WriterTest, writeUnionColumn) { @@ -1200,7 +1298,7 @@ namespace orc { std::unique_ptr writer = createWriter(stripeSize, memoryBlockSize, compressionBlockSize, CompressionKind_ZLIB, *type, - pool, &memStream, fileVersion); + pool, &memStream, fileVersion, enableAlignBlockBoundToRowGroup ? 1024 : 0); std::unique_ptr batch = writer->createRowBatch(rowCount); StructVectorBatch* structBatch = dynamic_cast(batch.get()); UnionVectorBatch* unionBatch = dynamic_cast(structBatch->fields[0]); @@ -1282,6 +1380,9 @@ namespace orc { break; } } + if (enableAlignBlockBoundToRowGroup) { + verifyCompressionBlockAlignment(reader, type->getSubtypeCount()); + } } TEST_P(WriterTest, writeUTF8CharAndVarcharColumn) { @@ -1295,7 +1396,7 @@ namespace orc { uint64_t memoryBlockSize = 64; std::unique_ptr writer = createWriter(stripeSize, memoryBlockSize, compressionBlockSize, CompressionKind_ZLIB, *type, - pool, &memStream, fileVersion); + pool, &memStream, fileVersion, enableAlignBlockBoundToRowGroup ? 1024 : 0); std::unique_ptr batch = writer->createRowBatch(rowCount); StructVectorBatch* structBatch = dynamic_cast(batch.get()); StringVectorBatch* charBatch = dynamic_cast(structBatch->fields[0]); @@ -1353,6 +1454,9 @@ namespace orc { EXPECT_TRUE(memcmp(varcharBatch->data[2], expectedTwoChars, 4) == 0); EXPECT_FALSE(rowReader->next(*batch)); + if (enableAlignBlockBoundToRowGroup) { + verifyCompressionBlockAlignment(reader, type->getSubtypeCount()); + } } TEST_P(WriterTest, testWriteListColumnWithNull) { @@ -2296,7 +2400,12 @@ namespace orc { EXPECT_FALSE(rowReader->next(*batch)); } - INSTANTIATE_TEST_SUITE_P(OrcTest, WriterTest, - Values(FileVersion::v_0_11(), FileVersion::v_0_12(), - FileVersion::UNSTABLE_PRE_2_0())); + std::vector testParams = {{FileVersion::v_0_11(), true}, + {FileVersion::v_0_11(), false}, + {FileVersion::v_0_12(), false}, + {FileVersion::v_0_12(), true}, + {FileVersion::UNSTABLE_PRE_2_0(), false}, + {FileVersion::UNSTABLE_PRE_2_0(), true}}; + + INSTANTIATE_TEST_SUITE_P(OrcTest, WriterTest, ::testing::ValuesIn(testParams)); } // namespace orc