From 9122ddc2fba72c8443a07decff0c45fcba12fda1 Mon Sep 17 00:00:00 2001 From: Felix Chern Date: Mon, 10 Oct 2022 06:18:28 -0700 Subject: [PATCH] Internal changes. PiperOrigin-RevId: 480067689 --- cpp/array_record_reader.cc | 11 +++ cpp/array_record_reader.h | 3 + cpp/array_record_reader_test.cc | 9 +++ cpp/array_record_writer.cc | 64 +++++++++++++++-- cpp/array_record_writer.h | 5 +- cpp/array_record_writer_test.cc | 119 +++++++++++++++++++++++++++++++- cpp/layout.proto | 2 + 7 files changed, 205 insertions(+), 8 deletions(-) diff --git a/cpp/array_record_reader.cc b/cpp/array_record_reader.cc index bea5694..10ee333 100644 --- a/cpp/array_record_reader.cc +++ b/cpp/array_record_reader.cc @@ -125,6 +125,9 @@ struct ArrayRecordReaderBase::ArrayRecordReaderState { std::queue>>> future_decoders; + // Writer options for debugging purposes. + std::optional writer_options = std::nullopt; + uint64_t ChunkEndOffset(uint64_t chunk_idx) const { if (chunk_idx == footer.size() - 1) { return footer_offset; @@ -263,6 +266,10 @@ void ArrayRecordReaderBase::Initialize() { return; } state_->num_records = footer_metadata.array_record_metadata().num_records(); + if (footer_metadata.array_record_metadata().has_writer_options()) { + state_->writer_options = + footer_metadata.array_record_metadata().writer_options(); + } } { AR_ENDO_SCOPE("Reading footer body"); @@ -757,4 +764,8 @@ bool ArrayRecordReaderBase::ReadAheadFromBuffer(uint64_t buffer_idx) { return true; } +std::optional ArrayRecordReaderBase::WriterOptionsString() const { + return state_->writer_options; +} + } // namespace array_record diff --git a/cpp/array_record_reader.h b/cpp/array_record_reader.h index d439484..317a9fe 100644 --- a/cpp/array_record_reader.h +++ b/cpp/array_record_reader.h @@ -269,6 +269,9 @@ class ArrayRecordReaderBase : public riegeli::Object { // `false` (when `!ok()`) - failure bool ReadRecord(absl::string_view* record); + // Returns the writer options if present. 
+ std::optional WriterOptionsString() const; + protected: explicit ArrayRecordReaderBase(Options options, ARThreadPool* pool); ~ArrayRecordReaderBase() override; diff --git a/cpp/array_record_reader_test.cc b/cpp/array_record_reader_test.cc index 9f76506..d953827 100644 --- a/cpp/array_record_reader_test.cc +++ b/cpp/array_record_reader_test.cc @@ -110,6 +110,15 @@ TEST_P(ArrayRecordReaderTest, MoveTest) { // Once a reader is moved, it is closed. ASSERT_FALSE(reader_before_move.is_open()); // NOLINT + auto recorded_writer_options = ArrayRecordWriterBase::Options::FromString( + reader.WriterOptionsString().value()) + .value(); + EXPECT_EQ(writer_options.compression_type(), + recorded_writer_options.compression_type()); + EXPECT_EQ(writer_options.compression_level(), + recorded_writer_options.compression_level()); + EXPECT_EQ(writer_options.transpose(), recorded_writer_options.transpose()); + std::vector indices = {1, 2, 4}; ASSERT_TRUE(reader .ParallelReadRecordsWithIndices( diff --git a/cpp/array_record_writer.cc b/cpp/array_record_writer.cc index 6b77edc..3f90d9b 100644 --- a/cpp/array_record_writer.cc +++ b/cpp/array_record_writer.cc @@ -31,6 +31,7 @@ limitations under the License. 
#include "absl/base/thread_annotations.h" #include "absl/status/status.h" #include "absl/status/statusor.h" +#include "absl/strings/match.h" #include "absl/strings/string_view.h" #include "absl/synchronization/mutex.h" #include "absl/types/span.h" @@ -63,6 +64,8 @@ constexpr uint32_t kZstdDefaultWindowLog = 20; // Generated from `echo 'ArrayRecord' | md5sum | cut -b 1-16` constexpr uint64_t kMagic = 0x71930e704fdae05eULL; +constexpr char kArrayRecordDefaultCompression[] = "zstd:3"; + using riegeli::Chunk; using riegeli::ChunkType; using riegeli::CompressorOptions; @@ -109,11 +112,17 @@ absl::StatusOr ChunkFromSpan(CompressorOptions compression_options, } // namespace +ArrayRecordWriterBase::Options::Options() { + DCHECK_OK( + this->compressor_options_.FromString(kArrayRecordDefaultCompression)); +} + // static absl::StatusOr ArrayRecordWriterBase::Options::FromString(absl::string_view text) { ArrayRecordWriterBase::Options options; OptionsParser options_parser; + options_parser.AddOption("default", ValueParser::FailIfAnySeen()); // Group options_parser.AddOption( "group_size", ValueParser::Int(1, INT32_MAX, &options.group_size_)); @@ -151,6 +160,15 @@ ArrayRecordWriterBase::Options::FromString(absl::string_view text) { if (!options_parser.FromString(text)) { return options_parser.status(); } + // From our benchmarks we figured zstd:3 gives the best trade-off for both the + // compression and decompression speed. 
+ if (text == "default" || + (!absl::StrContains(compressor_text, "uncompressed") && + !absl::StrContains(compressor_text, "brotli") && + !absl::StrContains(compressor_text, "snappy") && + !absl::StrContains(compressor_text, "zstd"))) { + absl::StrAppend(&compressor_text, ",", kArrayRecordDefaultCompression); + } // max_parallelism is set after options_parser.FromString() if (max_parallelism > 0) { options.set_max_parallelism(max_parallelism); @@ -167,13 +185,48 @@ ArrayRecordWriterBase::Options::FromString(absl::string_view text) { return options; } +std::string ArrayRecordWriterBase::Options::ToString() const { + std::string option; + absl::StrAppend(&option, "group_size:", this->group_size_, + ",transpose:", this->transpose_ ? "true" : "false", + ",pad_to_block_boundary:", + this->pad_to_block_boundary_ ? "true" : "false"); + if (this->transpose_) { + absl::StrAppend(&option, + ",transpose_bucket_size:", this->transpose_bucket_size_); + } + switch (this->compressor_options().compression_type()) { + case riegeli::CompressionType::kNone: + absl::StrAppend(&option, ",uncompressed"); + break; + case riegeli::CompressionType::kBrotli: + absl::StrAppend( + &option, ",brotli:", this->compressor_options().compression_level()); + break; + case riegeli::CompressionType::kZstd: + absl::StrAppend(&option, + ",zstd:", this->compressor_options().compression_level()); + break; + case riegeli::CompressionType::kSnappy: + absl::StrAppend(&option, ",snappy"); + break; + } + if (this->compressor_options().window_log().has_value()) { + absl::StrAppend(&option, ",window_log:", + this->compressor_options().window_log().value()); + } + if (max_parallelism_.has_value()) { + absl::StrAppend(&option, ",max_parallelism:", max_parallelism_.value()); + } + return option; +} + // Thread compatible callback guarded by SequencedChunkWriter's mutex. 
class ArrayRecordWriterBase::SubmitChunkCallback : public SequencedChunkWriterBase::SubmitChunkCallback { public: explicit SubmitChunkCallback(const ArrayRecordWriterBase::Options options) - : compression_options_(options.compressor_options()), - max_parallelism_(options.max_parallelism().value()) { + : options_(options), max_parallelism_(options.max_parallelism().value()) { constexpr uint64_t kDefaultDecodedDataSize = (1 << 20); last_decoded_data_size_.store(kDefaultDecodedDataSize); } @@ -200,7 +253,7 @@ class ArrayRecordWriterBase::SubmitChunkCallback void WriteFooterAndPostscript(SequencedChunkWriterBase* writer); private: - const CompressorOptions compression_options_; + const Options options_; absl::Mutex mu_; const int32_t max_parallelism_; @@ -456,8 +509,11 @@ ArrayRecordWriterBase::SubmitChunkCallback::CreateFooterChunk() { footer_metadata.mutable_array_record_metadata()->set_num_chunks( array_footer_.size()); footer_metadata.mutable_array_record_metadata()->set_num_records(num_records); + footer_metadata.mutable_array_record_metadata()->set_writer_options( + options_.ToString()); // Perhaps we can compress the footer - return ChunkFromSpan(compression_options_, absl::MakeConstSpan(array_footer_), + return ChunkFromSpan(options_.compressor_options(), + absl::MakeConstSpan(array_footer_), std::optional(footer_metadata)); } diff --git a/cpp/array_record_writer.h b/cpp/array_record_writer.h index d40bca5..c4e5d84 100644 --- a/cpp/array_record_writer.h +++ b/cpp/array_record_writer.h @@ -83,7 +83,7 @@ class ArrayRecordWriterBase : public riegeli::Object { public: class Options { public: - Options() {} + Options(); // Parses options from text: // ``` @@ -268,6 +268,9 @@ class ArrayRecordWriterBase : public riegeli::Object { return metadata_; } + // Serialize the options to a string. 
+ std::string ToString() const; + private: int32_t group_size_ = kDefaultGroupSize; riegeli::CompressorOptions compressor_options_; diff --git a/cpp/array_record_writer_test.cc b/cpp/array_record_writer_test.cc index 8582873..182d478 100644 --- a/cpp/array_record_writer_test.cc +++ b/cpp/array_record_writer_test.cc @@ -193,6 +193,17 @@ INSTANTIATE_TEST_SUITE_P( testing::Bool(), testing::Bool(), testing::Bool())); TEST(ArrayRecordWriterOptionsTest, ParsingTest) { + { + auto option = ArrayRecordWriterBase::Options(); + EXPECT_EQ(option.group_size(), + ArrayRecordWriterBase::Options::kDefaultGroupSize); + EXPECT_FALSE(option.transpose()); + EXPECT_EQ(option.max_parallelism(), std::nullopt); + EXPECT_EQ(option.compressor_options().compression_type(), + riegeli::CompressionType::kZstd); + EXPECT_EQ(option.compressor_options().compression_level(), 3); + EXPECT_FALSE(option.pad_to_block_boundary()); + } { auto option = ArrayRecordWriterBase::Options::FromString("").value(); EXPECT_EQ(option.group_size(), @@ -200,8 +211,40 @@ TEST(ArrayRecordWriterOptionsTest, ParsingTest) { EXPECT_FALSE(option.transpose()); EXPECT_EQ(option.max_parallelism(), std::nullopt); EXPECT_EQ(option.compressor_options().compression_type(), - riegeli::CompressionType::kBrotli); + riegeli::CompressionType::kZstd); + EXPECT_EQ(option.compressor_options().compression_level(), 3); + EXPECT_EQ(option.compressor_options().window_log().value(), 20); + EXPECT_FALSE(option.pad_to_block_boundary()); + + EXPECT_EQ(option.ToString(), + "group_size:65536," + "transpose:false," + "pad_to_block_boundary:false," + "zstd:3," + "window_log:20"); + EXPECT_TRUE( + ArrayRecordWriterBase::Options::FromString(option.ToString()).ok()); + } + { + auto option = ArrayRecordWriterBase::Options::FromString("default").value(); + EXPECT_EQ(option.group_size(), + ArrayRecordWriterBase::Options::kDefaultGroupSize); + EXPECT_FALSE(option.transpose()); + EXPECT_EQ(option.max_parallelism(), std::nullopt); + 
EXPECT_EQ(option.compressor_options().compression_type(), + riegeli::CompressionType::kZstd); + EXPECT_EQ(option.compressor_options().compression_level(), 3); + EXPECT_EQ(option.compressor_options().window_log().value(), 20); EXPECT_FALSE(option.pad_to_block_boundary()); + + EXPECT_EQ(option.ToString(), + "group_size:65536," + "transpose:false," + "pad_to_block_boundary:false," + "zstd:3," + "window_log:20"); + EXPECT_TRUE( + ArrayRecordWriterBase::Options::FromString(option.ToString()).ok()); } { auto option = ArrayRecordWriterBase::Options::FromString( @@ -210,10 +253,42 @@ TEST(ArrayRecordWriterOptionsTest, ParsingTest) { EXPECT_EQ(option.group_size(), 32); EXPECT_TRUE(option.transpose()); EXPECT_EQ(option.max_parallelism(), std::nullopt); + EXPECT_EQ(option.compressor_options().compression_type(), + riegeli::CompressionType::kZstd); + EXPECT_EQ(option.compressor_options().window_log(), 20); + EXPECT_FALSE(option.pad_to_block_boundary()); + + EXPECT_EQ(option.ToString(), + "group_size:32," + "transpose:true," + "pad_to_block_boundary:false," + "transpose_bucket_size:256," + "zstd:3," + "window_log:20"); + EXPECT_TRUE( + ArrayRecordWriterBase::Options::FromString(option.ToString()).ok()); + } + { + auto option = ArrayRecordWriterBase::Options::FromString( + "brotli:6,group_size:32,transpose,window_log:25") + .value(); + EXPECT_EQ(option.group_size(), 32); + EXPECT_TRUE(option.transpose()); + EXPECT_EQ(option.max_parallelism(), std::nullopt); EXPECT_EQ(option.compressor_options().compression_type(), riegeli::CompressionType::kBrotli); - EXPECT_EQ(option.compressor_options().brotli_window_log(), 20); + EXPECT_EQ(option.compressor_options().window_log(), 25); EXPECT_FALSE(option.pad_to_block_boundary()); + + EXPECT_EQ(option.ToString(), + "group_size:32," + "transpose:true," + "pad_to_block_boundary:false," + "transpose_bucket_size:256," + "brotli:6," + "window_log:25"); + EXPECT_TRUE( + ArrayRecordWriterBase::Options::FromString(option.ToString()).ok()); } { auto 
option = ArrayRecordWriterBase::Options::FromString( @@ -224,9 +299,19 @@ TEST(ArrayRecordWriterOptionsTest, ParsingTest) { EXPECT_EQ(option.max_parallelism(), std::nullopt); EXPECT_EQ(option.compressor_options().compression_type(), riegeli::CompressionType::kZstd); - EXPECT_EQ(option.compressor_options().zstd_window_log(), 20); + EXPECT_EQ(option.compressor_options().window_log(), 20); EXPECT_EQ(option.compressor_options().compression_level(), 5); EXPECT_FALSE(option.pad_to_block_boundary()); + + EXPECT_EQ(option.ToString(), + "group_size:32," + "transpose:true," + "pad_to_block_boundary:false," + "transpose_bucket_size:256," + "zstd:5," + "window_log:20"); + EXPECT_TRUE( + ArrayRecordWriterBase::Options::FromString(option.ToString()).ok()); } { auto option = ArrayRecordWriterBase::Options::FromString( @@ -239,6 +324,34 @@ TEST(ArrayRecordWriterOptionsTest, ParsingTest) { EXPECT_EQ(option.compressor_options().compression_type(), riegeli::CompressionType::kNone); EXPECT_TRUE(option.pad_to_block_boundary()); + + EXPECT_EQ(option.ToString(), + "group_size:65536," + "transpose:false," + "pad_to_block_boundary:true," + "uncompressed"); + EXPECT_TRUE( + ArrayRecordWriterBase::Options::FromString(option.ToString()).ok()); + } + { + auto option = ArrayRecordWriterBase::Options::FromString( + "snappy,pad_to_block_boundary:true") + .value(); + EXPECT_EQ(option.group_size(), + ArrayRecordWriterBase::Options::kDefaultGroupSize); + EXPECT_FALSE(option.transpose()); + EXPECT_EQ(option.max_parallelism(), std::nullopt); + EXPECT_EQ(option.compressor_options().compression_type(), + riegeli::CompressionType::kSnappy); + EXPECT_TRUE(option.pad_to_block_boundary()); + + EXPECT_EQ(option.ToString(), + "group_size:65536," + "transpose:false," + "pad_to_block_boundary:true," + "snappy"); + EXPECT_TRUE( + ArrayRecordWriterBase::Options::FromString(option.ToString()).ok()); } } diff --git a/cpp/layout.proto b/cpp/layout.proto index c088e98..d90a9a5 100644 --- a/cpp/layout.proto +++ 
b/cpp/layout.proto @@ -82,6 +82,8 @@ message RiegeliFooterMetadata { optional uint32 version = 1; optional uint64 num_chunks = 2; optional uint64 num_records = 3; + // Writer options for debugging purposes. + optional string writer_options = 4; } oneof metadata {