Skip to content

Commit

Permalink
fix comment and add case
Browse files Browse the repository at this point in the history
Co-authored-by: kaka11chen <[email protected]>
  • Loading branch information
hubgeter and kaka11chen committed Oct 21, 2024
1 parent 09dc709 commit e7e9235
Show file tree
Hide file tree
Showing 12 changed files with 2,742 additions and 91 deletions.
9 changes: 5 additions & 4 deletions be/src/io/fs/buffered_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -870,12 +870,12 @@ Result<io::FileReaderSPtr> DelegateReader::create_file_reader(
});
}

Status LinearProbeRangeFinder::get_range_for(int64_t desiredOffset,
Status LinearProbeRangeFinder::get_range_for(int64_t desired_offset,
io::PrefetchRange& result_range) {
while (index < _ranges.size()) {
io::PrefetchRange& range = _ranges[index];
if (range.end_offset > desiredOffset) {
if (range.start_offset > desiredOffset) [[unlikely]] {
if (range.end_offset > desired_offset) {
if (range.start_offset > desired_offset) [[unlikely]] {
return Status::InvalidArgument("Invalid desiredOffset");
}
result_range = range;
Expand Down Expand Up @@ -944,7 +944,7 @@ Status RangeCacheFileReader::read_at_impl(size_t offset, Slice result, size_t* b
}

int64_t buffer_offset = offset - _current_start_offset;
memcpy(result.data, _cache.get() + buffer_offset, request_size); //todo inline.
memcpy(result.data, _cache.get() + buffer_offset, request_size);
*bytes_read = request_size;

return Status::OK();
Expand All @@ -953,6 +953,7 @@ Status RangeCacheFileReader::read_at_impl(size_t offset, Slice result, size_t* b
offset);
// RETURN_IF_ERROR(_inner_reader->read_at(offset, result , bytes_read, io_ctx));
// return Status::OK();
// Returning an error here is intentional: falling back to the inner reader would cover up the error.
}
}

Expand Down
36 changes: 31 additions & 5 deletions be/src/io/fs/buffered_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,12 @@ struct PrefetchRange {

PrefetchRange() : start_offset(0), end_offset(0) {}

// Two ranges compare equal only when both their start and end offsets match.
bool operator==(const PrefetchRange& other) const {
    if (start_offset != other.start_offset) {
        return false;
    }
    return end_offset == other.end_offset;
}

// Inequality is the exact negation of operator==.
bool operator!=(const PrefetchRange& other) const {
    return !operator==(other);
}

/**
 * Returns the smallest range that covers both this range and `other`,
 * regardless of which of the two comes first.
 *
 * The result starts at the smaller of the two start offsets and ends at the
 * larger of the two end offsets. The previous implementation only looked at
 * `start_offset` and `other.end_offset`, so it produced a wrong (possibly empty)
 * range whenever `other` was located before this range.
 */
PrefetchRange span(const PrefetchRange& other) const {
    return {std::min(start_offset, other.start_offset), std::max(end_offset, other.end_offset)};
}
Expand All @@ -62,9 +68,9 @@ struct PrefetchRange {
}

// Ranges need to be sorted.
static std::vector<PrefetchRange> mergeAdjacentSeqRanges(
static std::vector<PrefetchRange> merge_adjacent_seq_ranges(
const std::vector<PrefetchRange>& seq_ranges, int64_t max_merge_distance_bytes,
int64_t max_read_size_bytes) {
int64_t once_max_read_bytes) {
if (seq_ranges.empty()) {
return {};
}
Expand All @@ -74,7 +80,7 @@ struct PrefetchRange {
for (size_t i = 1; i < seq_ranges.size(); ++i) {
PrefetchRange current = seq_ranges[i];
PrefetchRange merged = last.seq_span(current);
if (merged.end_offset <= max_read_size_bytes + merged.start_offset &&
if (merged.end_offset <= once_max_read_bytes + merged.start_offset &&
last.end_offset + max_merge_distance_bytes >= current.start_offset) {
last = merged;
} else {
Expand All @@ -90,15 +96,15 @@ struct PrefetchRange {
class RangeFinder {
public:
virtual ~RangeFinder() = default;
virtual Status get_range_for(int64_t desiredOffset, io::PrefetchRange& result_range) = 0;
virtual Status get_range_for(int64_t desired_offset, io::PrefetchRange& result_range) = 0;
virtual size_t get_max_range_size() const = 0;
};

class LinearProbeRangeFinder : public RangeFinder {
public:
LinearProbeRangeFinder(std::vector<io::PrefetchRange>&& ranges) : _ranges(std::move(ranges)) {}

Status get_range_for(int64_t desiredOffset, io::PrefetchRange& result_range) override;
Status get_range_for(int64_t desired_offset, io::PrefetchRange& result_range) override;

size_t get_max_range_size() const override {
size_t max_range_size = 0;
Expand All @@ -115,6 +121,13 @@ class LinearProbeRangeFinder : public RangeFinder {
size_t index {0};
};

/**
* This reader reads one range at a time. You can customize RangeFinder to fit your scenario.
* Motivation: ORC files may contain many tiny stripes. To reduce the number of requests sent to hdfs,
* the accesses to the ORC file are merged first. This introduces some read amplification,
* but in this scenario reading more data from hdfs in one request is faster than reading hdfs many times.
* Because ORC files are actually read in order from front to back, LinearProbeRangeFinder is provided.
*/
class RangeCacheFileReader : public io::FileReader {
struct RangeCacheReaderStatistics {
int64_t request_io = 0;
Expand Down Expand Up @@ -168,6 +181,19 @@ class RangeCacheFileReader : public io::FileReader {
RuntimeProfile::Counter* _cache_refresh_count = nullptr;
RuntimeProfile::Counter* _read_to_cache_bytes = nullptr;
RangeCacheReaderStatistics _cache_statistics;
/**
* `RangeCacheFileReader`:
* 1. `CacheRefreshCount`: how many IOs are merged
* 2. `ReadToCacheBytes`: how much data is actually read after merging
* 3. `ReadToCacheTime`: how long it takes to read data after merging
* 4. `RequestBytes`: how many bytes does the apache-orc library actually need to read the orc file
* 5. `RequestIO`: how many times the apache-orc library calls this read interface
* 6. `RequestTime`: how long it takes the apache-orc library to call this read interface
*
* Note that `RangeCacheFileReader` is a wrapper around the reader that actually reads the data, such as
* the hdfs reader. Strictly speaking, `CacheRefreshCount` is therefore not equal to the number of IOs issued to hdfs,
* because each time the hdfs reader is requested, it may not be able to read all the data at once.
*/
};

/**
Expand Down
112 changes: 57 additions & 55 deletions be/src/vec/exec/format/orc/vorc_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -857,74 +857,76 @@ Status OrcReader::set_fill_columns(
if (_colname_to_value_range == nullptr || !_init_search_argument(_colname_to_value_range)) {
_lazy_read_ctx.can_lazy_read = false;
}
try {
_row_reader_options.range(_range_start_offset, _range_size);
_row_reader_options.setTimezoneName(_ctz == "CST" ? "Asia/Shanghai" : _ctz);
_row_reader_options.include(_read_cols);
_row_reader_options.setEnableLazyDecoding(true);

_row_reader_options.range(_range_start_offset, _range_size);
_row_reader_options.setTimezoneName(_ctz == "CST" ? "Asia/Shanghai" : _ctz);
_row_reader_options.include(_read_cols);
_row_reader_options.setEnableLazyDecoding(true);
uint64_t number_of_stripes = _reader->getNumberOfStripes();
auto all_stripes_needed = _reader->getNeedReadStripes(_row_reader_options);

uint64_t number_of_stripes = _reader->getNumberOfStripes();
auto allStripesNeeded = _reader->getNeedReadStripes(_row_reader_options);
int64_t range_end_offset = _range_start_offset + _range_size;

int64_t range_end_offset = _range_start_offset + _range_size;
// If "orc_tiny_stripe_threshold_bytes" is set to 0, the tiny-stripe merged-IO optimization is disabled.
int64_t orc_tiny_stripe_threshold_bytes = 8L * 1024L * 1024L;
int64_t orc_once_max_read_bytes = 8L * 1024L * 1024L;
int64_t orc_max_merge_distance_bytes = 1L * 1024L * 1024L;

int64_t orc_tiny_stripe_threshold = 8L * 1024L * 1024L;
int64_t orc_once_max_read_size = 8L * 1024L * 1024L;
int64_t orc_max_merge_distance = 1L * 1024L * 1024L;
if (_state != nullptr) {
orc_tiny_stripe_threshold_bytes =
_state->query_options().orc_tiny_stripe_threshold_bytes;
orc_once_max_read_bytes = _state->query_options().orc_once_max_read_bytes;
orc_max_merge_distance_bytes = _state->query_options().orc_max_merge_distance_bytes;
}

if (_state != nullptr) {
orc_tiny_stripe_threshold = _state->query_options().orc_tiny_stripe_threshold;
orc_once_max_read_size = _state->query_options().orc_once_max_read_size;
orc_max_merge_distance = _state->query_options().orc_max_merge_distance;
}
bool all_tiny_stripes = true;
std::vector<io::PrefetchRange> tiny_stripe_ranges;

bool all_tiny_stripes = true;
std::vector<io::PrefetchRange> tiny_stripe_ranges;
for (uint64_t i = 0; i < number_of_stripes; i++) {
std::unique_ptr<orc::StripeInformation> strip_info = _reader->getStripe(i);
uint64_t strip_start_offset = strip_info->getOffset();
uint64_t strip_end_offset = strip_start_offset + strip_info->getLength();

for (uint64_t i = 0; i < number_of_stripes; i++) {
std::unique_ptr<orc::StripeInformation> strip_info = _reader->getStripe(i);
uint64_t strip_start_offset = strip_info->getOffset();
uint64_t strip_end_offset = strip_start_offset + strip_info->getLength();
if (strip_start_offset >= range_end_offset || strip_end_offset < _range_start_offset ||
!all_stripes_needed[i]) {
continue;
}
if (strip_info->getLength() > orc_tiny_stripe_threshold_bytes) {
all_tiny_stripes = false;
break;
}

if (strip_start_offset >= range_end_offset || strip_end_offset < _range_start_offset ||
!allStripesNeeded[i]) {
continue;
tiny_stripe_ranges.emplace_back(strip_start_offset, strip_end_offset);
}
if (strip_info->getLength() > orc_tiny_stripe_threshold) {
all_tiny_stripes = false;
break;
}

tiny_stripe_ranges.emplace_back(strip_start_offset, strip_end_offset);
}
if (all_tiny_stripes && number_of_stripes > 0) {
std::vector<io::PrefetchRange> prefetch_merge_ranges =
io::PrefetchRange::mergeAdjacentSeqRanges(
tiny_stripe_ranges, orc_max_merge_distance, orc_once_max_read_size);
auto range_finder =
std::make_shared<io::LinearProbeRangeFinder>(std::move(prefetch_merge_ranges));

auto* orcInputStreamPtr = static_cast<ORCFileInputStream*>(_reader->getStream());
orcInputStreamPtr->set_all_tiny_stripes();
auto& orc_file_reader = orcInputStreamPtr->get_file_reader();
orc_file_reader->collect_profile_before_close();
auto orc_inner_reader = orcInputStreamPtr->get_inner_reader();
orc_file_reader = std::make_shared<io::RangeCacheFileReader>(_profile, orc_inner_reader,
range_finder);
_lazy_read_ctx.can_lazy_read = false;
}
if (all_tiny_stripes && number_of_stripes > 0) {
std::vector<io::PrefetchRange> prefetch_merge_ranges =
io::PrefetchRange::merge_adjacent_seq_ranges(tiny_stripe_ranges,
orc_max_merge_distance_bytes,
orc_once_max_read_bytes);
auto range_finder =
std::make_shared<io::LinearProbeRangeFinder>(std::move(prefetch_merge_ranges));

if (!_lazy_read_ctx.can_lazy_read) {
for (auto& kv : _lazy_read_ctx.predicate_partition_columns) {
_lazy_read_ctx.partition_columns.emplace(kv.first, kv.second);
auto* orc_input_stream_ptr = static_cast<ORCFileInputStream*>(_reader->getStream());
orc_input_stream_ptr->set_all_tiny_stripes();
auto& orc_file_reader = orc_input_stream_ptr->get_file_reader();
orc_file_reader->collect_profile_before_close();
auto orc_inner_reader = orc_input_stream_ptr->get_inner_reader();
orc_file_reader = std::make_shared<io::RangeCacheFileReader>(_profile, orc_inner_reader,
range_finder);
_lazy_read_ctx.can_lazy_read = false;
}
for (auto& kv : _lazy_read_ctx.predicate_missing_columns) {
_lazy_read_ctx.missing_columns.emplace(kv.first, kv.second);

if (!_lazy_read_ctx.can_lazy_read) {
for (auto& kv : _lazy_read_ctx.predicate_partition_columns) {
_lazy_read_ctx.partition_columns.emplace(kv.first, kv.second);
}
for (auto& kv : _lazy_read_ctx.predicate_missing_columns) {
_lazy_read_ctx.missing_columns.emplace(kv.first, kv.second);
}
}
}

_fill_all_columns = true;
try {
_fill_all_columns = true;
// create orc row reader
if (_lazy_read_ctx.can_lazy_read) {
_row_reader_options.filter(_lazy_read_ctx.predicate_orc_columns);
Expand Down
108 changes: 108 additions & 0 deletions be/test/io/fs/buffered_reader_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,34 @@ class MockOffsetFileReader : public io::FileReader {
io::Path _path = "/tmp/mock";
};

/**
 * Test-only FileReader that forwards every call to a delegate reader and
 * remembers the byte range of the most recent read request, so tests can
 * assert which range was actually requested from the underlying reader.
 */
class TestingRangeCacheFileReader : public io::FileReader {
public:
    // Takes shared ownership of `delegate`; all operations are forwarded to it.
    explicit TestingRangeCacheFileReader(std::shared_ptr<io::FileReader> delegate)
            : _delegate(std::move(delegate)) {}

    ~TestingRangeCacheFileReader() override = default;

    Status close() override { return _delegate->close(); }

    const io::Path& path() const override { return _delegate->path(); }

    size_t size() const override { return _delegate->size(); }

    bool closed() const override { return _delegate->closed(); }

    // Range [offset, offset + result.size) of the most recent read.
    // Before any read has happened this is the default-constructed (0, 0) range,
    // so calling it early is well-defined instead of dereferencing a null pointer.
    const io::PrefetchRange& last_read_range() const { return _last_read_range; }

protected:
    // Records the requested range, then forwards the read to the delegate unchanged.
    Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
                        const io::IOContext* io_ctx) override {
        _last_read_range = io::PrefetchRange(offset, offset + result.size);
        return _delegate->read_at_impl(offset, result, bytes_read, io_ctx);
    }

private:
    std::shared_ptr<io::FileReader> _delegate;
    // Stored by value: no heap allocation per read and never null.
    io::PrefetchRange _last_read_range;
};

TEST_F(BufferedReaderTest, normal_use) {
// buffered_reader_test_file 950 bytes
io::FileReaderSPtr local_reader;
Expand Down Expand Up @@ -398,4 +426,84 @@ TEST_F(BufferedReaderTest, test_merged_io) {
}
}

// Checks which range RangeCacheFileReader requests from the underlying reader
// when single-byte reads are issued against a set of merged stripe ranges.
TEST_F(BufferedReaderTest, test_range_cache_file_reader) {
    constexpr int64_t kMiB = 1024L * 1024L;
    constexpr int64_t orc_max_merge_distance = 1L * kMiB;
    constexpr int64_t orc_once_max_read_size = 8L * kMiB;

    io::FileReaderSPtr offset_reader = std::make_shared<MockOffsetFileReader>(128 * 1024 * 1024);
    auto testing_reader = std::make_shared<TestingRangeCacheFileReader>(offset_reader);

    // One read request and the range expected to be requested from the underlying reader.
    struct ReadStep {
        size_t offset;
        io::PrefetchRange expected_fetch;
    };

    // Merges `stripe_ranges`, builds a fresh RangeCacheFileReader on top of `testing_reader`,
    // issues a one-byte read for every step in order and checks the range last requested
    // from the underlying reader, then closes the reader.
    auto run_case = [&](const std::vector<io::PrefetchRange>& stripe_ranges,
                        const std::vector<ReadStep>& steps) {
        auto range_finder = std::make_shared<io::LinearProbeRangeFinder>(
                io::PrefetchRange::merge_adjacent_seq_ranges(stripe_ranges, orc_max_merge_distance,
                                                             orc_once_max_read_size));
        io::RangeCacheFileReader range_cache_file_reader(nullptr, testing_reader, range_finder);

        char data[1] = {0};
        Slice result(data, 1);
        size_t bytes_read = 0;
        for (const ReadStep& step : steps) {
            // Identify the failing step, since all cases share these assertions.
            SCOPED_TRACE(testing::Message() << "read_at offset " << step.offset);
            EXPECT_TRUE(range_cache_file_reader.read_at(step.offset, result, &bytes_read, nullptr)
                                .ok());
            EXPECT_EQ(step.expected_fetch, testing_reader->last_read_range());
        }
        EXPECT_TRUE(range_cache_file_reader.close().ok());
    };

    const std::vector<io::PrefetchRange> adjacent_stripes = {
            io::PrefetchRange(3, 33),
            io::PrefetchRange(33, 63),
            io::PrefetchRange(63, 8L * kMiB + 63),
    };

    // Reading at the first byte of the first merged range.
    run_case(adjacent_stripes, {
                                       {3, io::PrefetchRange(3, 63)},
                                       {63, io::PrefetchRange(63, 8L * kMiB + 63)},
                               });

    // Reading at the last byte of the first merged range.
    run_case(adjacent_stripes, {
                                       {62, io::PrefetchRange(3, 63)},
                                       {63, io::PrefetchRange(63, 8L * kMiB + 63)},
                               });

    // An empty leading range followed by ranges separated by a one-byte gap.
    run_case(
            {
                    io::PrefetchRange(3, 3),
                    io::PrefetchRange(4, 5L * kMiB + 4),
                    io::PrefetchRange(5L * kMiB + 4, 3L * kMiB + 5L * kMiB + 4),
            },
            {
                    {3, io::PrefetchRange(3, 1 + 5L * kMiB + 3)},
                    {static_cast<size_t>(4 + 5L * kMiB),
                     io::PrefetchRange(4 + 5L * kMiB, 3L * kMiB + 4 + 5L * kMiB)},
            });
}

} // end namespace doris
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Loading

0 comments on commit e7e9235

Please sign in to comment.