Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Internal changes. #11

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions cpp/array_record_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,9 @@ struct ArrayRecordReaderBase::ArrayRecordReaderState {
std::queue<IndexedPair<std::future<std::vector<ChunkDecoder>>>>
future_decoders;

// Writer options for debugging purposes.
std::optional<std::string> writer_options = std::nullopt;

uint64_t ChunkEndOffset(uint64_t chunk_idx) const {
if (chunk_idx == footer.size() - 1) {
return footer_offset;
Expand Down Expand Up @@ -263,6 +266,10 @@ void ArrayRecordReaderBase::Initialize() {
return;
}
state_->num_records = footer_metadata.array_record_metadata().num_records();
if (footer_metadata.array_record_metadata().has_writer_options()) {
state_->writer_options =
footer_metadata.array_record_metadata().writer_options();
}
}
{
AR_ENDO_SCOPE("Reading footer body");
Expand Down Expand Up @@ -757,4 +764,8 @@ bool ArrayRecordReaderBase::ReadAheadFromBuffer(uint64_t buffer_idx) {
return true;
}

std::optional<std::string> ArrayRecordReaderBase::WriterOptionsString() const {
return state_->writer_options;
}

} // namespace array_record
3 changes: 3 additions & 0 deletions cpp/array_record_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,9 @@ class ArrayRecordReaderBase : public riegeli::Object {
// `false` (when `!ok()`) - failure
bool ReadRecord(absl::string_view* record);

// Returns the writer options if presented.
std::optional<std::string> WriterOptionsString() const;

protected:
explicit ArrayRecordReaderBase(Options options, ARThreadPool* pool);
~ArrayRecordReaderBase() override;
Expand Down
9 changes: 9 additions & 0 deletions cpp/array_record_reader_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,15 @@ TEST_P(ArrayRecordReaderTest, MoveTest) {
// Once a reader is moved, it is closed.
ASSERT_FALSE(reader_before_move.is_open()); // NOLINT

auto recorded_writer_options = ArrayRecordWriterBase::Options::FromString(
reader.WriterOptionsString().value())
.value();
EXPECT_EQ(writer_options.compression_type(),
recorded_writer_options.compression_type());
EXPECT_EQ(writer_options.compression_level(),
recorded_writer_options.compression_level());
EXPECT_EQ(writer_options.transpose(), recorded_writer_options.transpose());

std::vector<uint64_t> indices = {1, 2, 4};
ASSERT_TRUE(reader
.ParallelReadRecordsWithIndices(
Expand Down
64 changes: 60 additions & 4 deletions cpp/array_record_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ limitations under the License.
#include "absl/base/thread_annotations.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/match.h"
#include "absl/strings/string_view.h"
#include "absl/synchronization/mutex.h"
#include "absl/types/span.h"
Expand Down Expand Up @@ -63,6 +64,8 @@ constexpr uint32_t kZstdDefaultWindowLog = 20;
// Generated from `echo 'ArrayRecord' | md5sum | cut -b 1-16`
constexpr uint64_t kMagic = 0x71930e704fdae05eULL;

constexpr char kArrayRecordDefaultCompression[] = "zstd:3";

using riegeli::Chunk;
using riegeli::ChunkType;
using riegeli::CompressorOptions;
Expand Down Expand Up @@ -109,11 +112,17 @@ absl::StatusOr<Chunk> ChunkFromSpan(CompressorOptions compression_options,

} // namespace

ArrayRecordWriterBase::Options::Options() {
DCHECK_OK(
this->compressor_options_.FromString(kArrayRecordDefaultCompression));
}

// static
absl::StatusOr<ArrayRecordWriterBase::Options>
ArrayRecordWriterBase::Options::FromString(absl::string_view text) {
ArrayRecordWriterBase::Options options;
OptionsParser options_parser;
options_parser.AddOption("default", ValueParser::FailIfAnySeen());
// Group
options_parser.AddOption(
"group_size", ValueParser::Int(1, INT32_MAX, &options.group_size_));
Expand Down Expand Up @@ -151,6 +160,15 @@ ArrayRecordWriterBase::Options::FromString(absl::string_view text) {
if (!options_parser.FromString(text)) {
return options_parser.status();
}
// From our benchmarks we figured zstd:3 gives the best trade-off for both the
// compression and decomopression speed.
if (text == "default" ||
(!absl::StrContains(compressor_text, "uncompressed") &&
!absl::StrContains(compressor_text, "brotli") &&
!absl::StrContains(compressor_text, "snappy") &&
!absl::StrContains(compressor_text, "zstd"))) {
absl::StrAppend(&compressor_text, ",", kArrayRecordDefaultCompression);
}
// max_parallelism is set after options_parser.FromString()
if (max_parallelism > 0) {
options.set_max_parallelism(max_parallelism);
Expand All @@ -167,13 +185,48 @@ ArrayRecordWriterBase::Options::FromString(absl::string_view text) {
return options;
}

std::string ArrayRecordWriterBase::Options::ToString() const {
std::string option;
absl::StrAppend(&option, "group_size:", this->group_size_,
",transpose:", this->transpose_ ? "true" : "false",
",pad_to_block_boundary:",
this->pad_to_block_boundary_ ? "true" : "false");
if (this->transpose_) {
absl::StrAppend(&option,
",transpose_bucket_size:", this->transpose_bucket_size_);
}
switch (this->compressor_options().compression_type()) {
case riegeli::CompressionType::kNone:
absl::StrAppend(&option, ",uncompressed");
break;
case riegeli::CompressionType::kBrotli:
absl::StrAppend(
&option, ",brotli:", this->compressor_options().compression_level());
break;
case riegeli::CompressionType::kZstd:
absl::StrAppend(&option,
",zstd:", this->compressor_options().compression_level());
break;
case riegeli::CompressionType::kSnappy:
absl::StrAppend(&option, ",snappy");
break;
}
if (this->compressor_options().window_log().has_value()) {
absl::StrAppend(&option, ",window_log:",
this->compressor_options().window_log().value());
}
if (max_parallelism_.has_value()) {
absl::StrAppend(&option, ",max_parallelism:", max_parallelism_.value());
}
return option;
}

// Thread compatible callback guarded by SequencedChunkWriter's mutex.
class ArrayRecordWriterBase::SubmitChunkCallback
: public SequencedChunkWriterBase::SubmitChunkCallback {
public:
explicit SubmitChunkCallback(const ArrayRecordWriterBase::Options options)
: compression_options_(options.compressor_options()),
max_parallelism_(options.max_parallelism().value()) {
: options_(options), max_parallelism_(options.max_parallelism().value()) {
constexpr uint64_t kDefaultDecodedDataSize = (1 << 20);
last_decoded_data_size_.store(kDefaultDecodedDataSize);
}
Expand All @@ -200,7 +253,7 @@ class ArrayRecordWriterBase::SubmitChunkCallback
void WriteFooterAndPostscript(SequencedChunkWriterBase* writer);

private:
const CompressorOptions compression_options_;
const Options options_;

absl::Mutex mu_;
const int32_t max_parallelism_;
Expand Down Expand Up @@ -456,8 +509,11 @@ ArrayRecordWriterBase::SubmitChunkCallback::CreateFooterChunk() {
footer_metadata.mutable_array_record_metadata()->set_num_chunks(
array_footer_.size());
footer_metadata.mutable_array_record_metadata()->set_num_records(num_records);
footer_metadata.mutable_array_record_metadata()->set_writer_options(
options_.ToString());
// Perhaps we can compress the footer
return ChunkFromSpan(compression_options_, absl::MakeConstSpan(array_footer_),
return ChunkFromSpan(options_.compressor_options(),
absl::MakeConstSpan(array_footer_),
std::optional(footer_metadata));
}

Expand Down
5 changes: 4 additions & 1 deletion cpp/array_record_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ class ArrayRecordWriterBase : public riegeli::Object {
public:
class Options {
public:
Options() {}
Options();

// Parses options from text:
// ```
Expand Down Expand Up @@ -268,6 +268,9 @@ class ArrayRecordWriterBase : public riegeli::Object {
return metadata_;
}

// Serialize the options to a string.
std::string ToString() const;

private:
int32_t group_size_ = kDefaultGroupSize;
riegeli::CompressorOptions compressor_options_;
Expand Down
119 changes: 116 additions & 3 deletions cpp/array_record_writer_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -193,15 +193,58 @@ INSTANTIATE_TEST_SUITE_P(
testing::Bool(), testing::Bool(), testing::Bool()));

TEST(ArrayRecordWriterOptionsTest, ParsingTest) {
{
auto option = ArrayRecordWriterBase::Options();
EXPECT_EQ(option.group_size(),
ArrayRecordWriterBase::Options::kDefaultGroupSize);
EXPECT_FALSE(option.transpose());
EXPECT_EQ(option.max_parallelism(), std::nullopt);
EXPECT_EQ(option.compressor_options().compression_type(),
riegeli::CompressionType::kZstd);
EXPECT_EQ(option.compressor_options().compression_level(), 3);
EXPECT_FALSE(option.pad_to_block_boundary());
}
{
auto option = ArrayRecordWriterBase::Options::FromString("").value();
EXPECT_EQ(option.group_size(),
ArrayRecordWriterBase::Options::kDefaultGroupSize);
EXPECT_FALSE(option.transpose());
EXPECT_EQ(option.max_parallelism(), std::nullopt);
EXPECT_EQ(option.compressor_options().compression_type(),
riegeli::CompressionType::kBrotli);
riegeli::CompressionType::kZstd);
EXPECT_EQ(option.compressor_options().compression_level(), 3);
EXPECT_EQ(option.compressor_options().window_log().value(), 20);
EXPECT_FALSE(option.pad_to_block_boundary());

EXPECT_EQ(option.ToString(),
"group_size:65536,"
"transpose:false,"
"pad_to_block_boundary:false,"
"zstd:3,"
"window_log:20");
EXPECT_TRUE(
ArrayRecordWriterBase::Options::FromString(option.ToString()).ok());
}
{
auto option = ArrayRecordWriterBase::Options::FromString("default").value();
EXPECT_EQ(option.group_size(),
ArrayRecordWriterBase::Options::kDefaultGroupSize);
EXPECT_FALSE(option.transpose());
EXPECT_EQ(option.max_parallelism(), std::nullopt);
EXPECT_EQ(option.compressor_options().compression_type(),
riegeli::CompressionType::kZstd);
EXPECT_EQ(option.compressor_options().compression_level(), 3);
EXPECT_EQ(option.compressor_options().window_log().value(), 20);
EXPECT_FALSE(option.pad_to_block_boundary());

EXPECT_EQ(option.ToString(),
"group_size:65536,"
"transpose:false,"
"pad_to_block_boundary:false,"
"zstd:3,"
"window_log:20");
EXPECT_TRUE(
ArrayRecordWriterBase::Options::FromString(option.ToString()).ok());
}
{
auto option = ArrayRecordWriterBase::Options::FromString(
Expand All @@ -210,10 +253,42 @@ TEST(ArrayRecordWriterOptionsTest, ParsingTest) {
EXPECT_EQ(option.group_size(), 32);
EXPECT_TRUE(option.transpose());
EXPECT_EQ(option.max_parallelism(), std::nullopt);
EXPECT_EQ(option.compressor_options().compression_type(),
riegeli::CompressionType::kZstd);
EXPECT_EQ(option.compressor_options().window_log(), 20);
EXPECT_FALSE(option.pad_to_block_boundary());

EXPECT_EQ(option.ToString(),
"group_size:32,"
"transpose:true,"
"pad_to_block_boundary:false,"
"transpose_bucket_size:256,"
"zstd:3,"
"window_log:20");
EXPECT_TRUE(
ArrayRecordWriterBase::Options::FromString(option.ToString()).ok());
}
{
auto option = ArrayRecordWriterBase::Options::FromString(
"brotli:6,group_size:32,transpose,window_log:25")
.value();
EXPECT_EQ(option.group_size(), 32);
EXPECT_TRUE(option.transpose());
EXPECT_EQ(option.max_parallelism(), std::nullopt);
EXPECT_EQ(option.compressor_options().compression_type(),
riegeli::CompressionType::kBrotli);
EXPECT_EQ(option.compressor_options().brotli_window_log(), 20);
EXPECT_EQ(option.compressor_options().window_log(), 25);
EXPECT_FALSE(option.pad_to_block_boundary());

EXPECT_EQ(option.ToString(),
"group_size:32,"
"transpose:true,"
"pad_to_block_boundary:false,"
"transpose_bucket_size:256,"
"brotli:6,"
"window_log:25");
EXPECT_TRUE(
ArrayRecordWriterBase::Options::FromString(option.ToString()).ok());
}
{
auto option = ArrayRecordWriterBase::Options::FromString(
Expand All @@ -224,9 +299,19 @@ TEST(ArrayRecordWriterOptionsTest, ParsingTest) {
EXPECT_EQ(option.max_parallelism(), std::nullopt);
EXPECT_EQ(option.compressor_options().compression_type(),
riegeli::CompressionType::kZstd);
EXPECT_EQ(option.compressor_options().zstd_window_log(), 20);
EXPECT_EQ(option.compressor_options().window_log(), 20);
EXPECT_EQ(option.compressor_options().compression_level(), 5);
EXPECT_FALSE(option.pad_to_block_boundary());

EXPECT_EQ(option.ToString(),
"group_size:32,"
"transpose:true,"
"pad_to_block_boundary:false,"
"transpose_bucket_size:256,"
"zstd:5,"
"window_log:20");
EXPECT_TRUE(
ArrayRecordWriterBase::Options::FromString(option.ToString()).ok());
}
{
auto option = ArrayRecordWriterBase::Options::FromString(
Expand All @@ -239,6 +324,34 @@ TEST(ArrayRecordWriterOptionsTest, ParsingTest) {
EXPECT_EQ(option.compressor_options().compression_type(),
riegeli::CompressionType::kNone);
EXPECT_TRUE(option.pad_to_block_boundary());

EXPECT_EQ(option.ToString(),
"group_size:65536,"
"transpose:false,"
"pad_to_block_boundary:true,"
"uncompressed");
EXPECT_TRUE(
ArrayRecordWriterBase::Options::FromString(option.ToString()).ok());
}
{
auto option = ArrayRecordWriterBase::Options::FromString(
"snappy,pad_to_block_boundary:true")
.value();
EXPECT_EQ(option.group_size(),
ArrayRecordWriterBase::Options::kDefaultGroupSize);
EXPECT_FALSE(option.transpose());
EXPECT_EQ(option.max_parallelism(), std::nullopt);
EXPECT_EQ(option.compressor_options().compression_type(),
riegeli::CompressionType::kSnappy);
EXPECT_TRUE(option.pad_to_block_boundary());

EXPECT_EQ(option.ToString(),
"group_size:65536,"
"transpose:false,"
"pad_to_block_boundary:true,"
"snappy");
EXPECT_TRUE(
ArrayRecordWriterBase::Options::FromString(option.ToString()).ok());
}
}

Expand Down
2 changes: 2 additions & 0 deletions cpp/layout.proto
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ message RiegeliFooterMetadata {
optional uint32 version = 1;
optional uint64 num_chunks = 2;
optional uint64 num_records = 3;
// Writer options for debugging purposes.
optional string writer_options = 4;
}

oneof metadata {
Expand Down