diff --git a/be/src/io/fs/buffered_reader.h b/be/src/io/fs/buffered_reader.h
index 67e07665fbfd9f..c9c5af5be60452 100644
--- a/be/src/io/fs/buffered_reader.h
+++ b/be/src/io/fs/buffered_reader.h
@@ -67,13 +67,24 @@ struct PrefetchRange {
         return {start_offset, other.end_offset};
     }
 
-    //Ranges needs to be sorted.
+    bool contains(const PrefetchRange& range) const {
+        return start_offset <= range.start_offset && range.end_offset <= end_offset;
+    }
+
     static std::vector<PrefetchRange> merge_adjacent_seq_ranges(
-            const std::vector<PrefetchRange>& seq_ranges, int64_t max_merge_distance_bytes,
+            const std::vector<PrefetchRange>& input_ranges, int64_t max_merge_distance_bytes,
             int64_t once_max_read_bytes) {
-        if (seq_ranges.empty()) {
+        if (input_ranges.empty()) {
             return {};
         }
+
+        // Sort ranges by start offset
+        std::vector<PrefetchRange> seq_ranges = input_ranges;
+        std::sort(seq_ranges.begin(), seq_ranges.end(),
+                  [](const PrefetchRange& a, const PrefetchRange& b) {
+                      return a.start_offset < b.start_offset;
+                  });
+
         // Merge overlapping ranges
         std::vector<PrefetchRange> result;
         PrefetchRange last = seq_ranges.front();
diff --git a/be/src/vec/exec/format/parquet/parquet_column_chunk_file_reader.cpp b/be/src/vec/exec/format/parquet/parquet_column_chunk_file_reader.cpp
new file mode 100644
index 00000000000000..2ef18f65aff40d
--- /dev/null
+++ b/be/src/vec/exec/format/parquet/parquet_column_chunk_file_reader.cpp
@@ -0,0 +1,278 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/exec/format/parquet/parquet_column_chunk_file_reader.h"
+
+#include <cstring>
+#include <stdexcept>
+
+namespace doris {
+namespace vectorized {
+
+const Slice ParquetColumnChunkFileReader::EMPTY_SLICE;
+
+ParquetColumnChunkFileReader::ParquetColumnChunkFileReader(
+        std::vector<std::shared_ptr<ChunkReader>> chunks, ChunkReader::Statistics& statistics)
+        : chunks_(std::move(chunks)),
+          statistics_(statistics),
+          current_chunk_index_(-1),
+          last_read_offset_(0) {
+    if (chunks_.empty()) {
+        throw std::invalid_argument("At least one chunk is expected but got none");
+    }
+    current_slice_ = std::make_shared<Slice>();
+}
+
+ParquetColumnChunkFileReader::~ParquetColumnChunkFileReader() {
+    static_cast<void>(close());
+}
+
+ParquetColumnChunkFileReader::ParquetColumnChunkFileReader(
+        ParquetColumnChunkFileReader&& other) noexcept
+        : chunks_(std::move(other.chunks_)),
+          statistics_(other.statistics_),
+          current_chunk_index_(other.current_chunk_index_),
+          current_slice_(std::move(other.current_slice_)),
+          current_position_(other.current_position_),
+          last_read_offset_(other.last_read_offset_) {
+    other.current_slice_ = nullptr;
+    other.current_position_ = 0;
+    other.current_chunk_index_ = -1;
+    other.last_read_offset_ = 0;
+}
+
+ParquetColumnChunkFileReader& ParquetColumnChunkFileReader::operator=(
+        ParquetColumnChunkFileReader&& other) noexcept {
+    if (this != &other) {
+        static_cast<void>(close());
+        chunks_ = std::move(other.chunks_);
+        current_chunk_index_ = other.current_chunk_index_;
+        current_slice_ = std::move(other.current_slice_);
+        current_position_ = other.current_position_;
+        last_read_offset_ = other.last_read_offset_;
+
+        other.current_slice_ = nullptr;
+        other.current_position_ = 0;
+        other.current_chunk_index_ = -1;
+        other.last_read_offset_ = 0;
+    }
+    return *this;
+}
+
+Status ParquetColumnChunkFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_read,
+                                                  const io::IOContext* io_ctx) {
+    if (result.size == 0) {
+        *bytes_read = 0;
+        return Status::OK();
+    }
+
+    RETURN_IF_ERROR(ensureOpen());
+
+    // Reads must be sequential: offset must not go backwards.
+    if (offset < last_read_offset_) {
+        return Status::InternalError(fmt::format(
+                "Invalid offset: {}. Must be greater than or equal to last read offset: {}", offset,
+                last_read_offset_));
+    }
+
+    // Binary search for the chunk that contains offset.
+    size_t left = current_chunk_index_ >= 0 ? current_chunk_index_
+                                            : 0; // Start from the current chunk since offsets only move forward.
+    size_t right = chunks_.size();
+    size_t target_chunk = right; // Defaults to right, i.e. "no suitable chunk found".
+
+    while (left < right) {
+        size_t mid = left + (right - left) / 2;
+        auto chunk = chunks_[mid];
+        if (!chunk) {
+            return Status::InternalError("Invalid chunk type");
+        }
+
+        size_t chunk_start = chunk->getDiskOffset();
+        size_t chunk_size = chunk->size();
+        size_t chunk_end = chunk_start + chunk_size;
+
+        if (offset >= chunk_start && offset < chunk_end) {
+            target_chunk = mid;
+            break;
+        } else if (offset < chunk_start) {
+            right = mid;
+            target_chunk = right; // Track the first chunk that starts after offset.
+        } else {
+            left = mid + 1;
+        }
+    }
+
+    // No chunk covers the requested offset.
+    if (target_chunk >= chunks_.size()) {
+        *bytes_read = 0;
+        return Status::OK();
+    }
+
+    // Release any chunks that are being skipped over.
+    bool need_create_new_chunk_reader =
+            (current_chunk_index_ != static_cast<int64_t>(target_chunk));
+    while (current_chunk_index_ < static_cast<int64_t>(target_chunk)) {
+        // Release the chunk we are moving past.
+        if (current_chunk_index_ >= 0) {
+            chunks_[current_chunk_index_]->free();
+            chunks_[current_chunk_index_].reset();
+        }
+        current_chunk_index_++;
+    }
+
+    // current_chunk_index_ must be a valid index at this point.
+    if (current_chunk_index_ < 0 || current_chunk_index_ >= static_cast<int64_t>(chunks_.size())) {
+        return Status::InternalError(fmt::format("Invalid chunk index: {}", current_chunk_index_));
+    }
+
+    // Only reload chunk data when we actually moved to a new chunk.
+    if (need_create_new_chunk_reader) {
+        if (current_slice_) {
+            current_slice_.reset();
+        }
+        RETURN_IF_ERROR(chunks_[current_chunk_index_]->read(io_ctx, &current_slice_));
+    }
+
+    // Validate that offset lies within the current chunk.
+    size_t chunk_start = chunks_[current_chunk_index_]->getDiskOffset();
+    size_t chunk_end = chunk_start + current_slice_->size;
+
+    if (offset < chunk_start || offset >= chunk_end) {
+        return Status::InternalError(
+                fmt::format("Invalid offset: {}. Current chunk file range: [{}, {})", offset,
+                            chunk_start, chunk_end));
+    }
+
+    // Position within the cached chunk data.
+    current_position_ = offset - chunk_start;
+
+    // Copy the requested bytes (clipped to the end of the chunk).
+    size_t bytes_to_read = std::min(result.size, current_slice_->size - current_position_);
+    memcpy(result.data, current_slice_->data + current_position_, bytes_to_read);
+    current_position_ += bytes_to_read;
+    *bytes_read = bytes_to_read;
+    last_read_offset_ = offset; // Remember the last read offset to enforce sequential access.
+
+    return Status::OK();
+}
+
+Status ParquetColumnChunkFileReader::close() {
+    bool expected = false;
+    if (_closed.compare_exchange_strong(expected, true, std::memory_order_acq_rel)) {
+        // Release all remaining chunks.
+        if (current_chunk_index_ >= 0) {
+            for (int64_t i = current_chunk_index_; i < static_cast<int64_t>(chunks_.size()); ++i) {
+                if (chunks_[i]) {
+                    chunks_[i]->free();
+                    chunks_[i].reset();
+                }
+            }
+        }
+
+        if (current_slice_) {
+            current_slice_.reset();
+        }
+    }
+    return Status::OK();
+}
+
+Status ParquetColumnChunkFileReader::addChunkReader(std::shared_ptr<ChunkReader> reader) {
+    if (!reader) {
+        return Status::InvalidArgument("ChunkReader cannot be null");
+    }
+    chunks_.push_back(std::move(reader));
+    return Status::OK();
+}
+
+Status ParquetColumnChunkFileReader::getSlice(const io::IOContext* io_ctx, size_t length,
+                                              Slice* result) {
+    //    if (length == 0) {
+    //        *result = EMPTY_SLICE;
+    //        return Status::OK();
+    //    }
+    //
+    //    RETURN_IF_ERROR(ensureOpen());
+    //
+    //    while (!current_slice_ || current_slice_->size == current_position_) {
+    //        if (current_chunk_index_ + 1 >= static_cast<int64_t>(chunks_.size())) {
+    //            return Status::InvalidArgument(
+    //                fmt::format("Requested {} bytes but 0 was available", length));
+    //        }
+    //        RETURN_IF_ERROR(readNextChunk(io_ctx));
+    //    }
+    //
+    //    if (current_slice_->size - current_position_ >= length) {
+    //        // We can satisfy the request from the current slice
+    //        result->data = current_slice_->data + current_position_;
+    //        result->size = length;
+    //        current_position_ += length;
+    //        return Status::OK();
+    //    }
+    //
+    //    // Need to combine data from multiple chunks
+    //    auto buffer = std::make_unique<uint8_t[]>(length);
+    //    size_t total_read = 0;
+    //    size_t remaining = length;
+    //
+    //    while (remaining > 0) {
+    //        size_t bytes_read;
+    //        Slice temp_result(buffer.get() + total_read, remaining);
+    //        RETURN_IF_ERROR(read_at_impl(current_position_, temp_result, &bytes_read, nullptr));
+    //
+    //        if (bytes_read == 0) {
+    //            return Status::InvalidArgument(
+    //                fmt::format("Failed to read {} bytes", length));
+    //        }
+    //
+    //        total_read += bytes_read;
+    //        remaining -= bytes_read;
+    //    }
+    //
+    //    result->data = (char *)buffer.release();
+    //    result->size = length;
+    return Status::OK();
+}
+
+Status ParquetColumnChunkFileReader::ensureOpen() {
+    if (!current_slice_) {
+        return Status::InternalError("Stream closed");
+    }
+    return Status::OK();
+}
+
+//Status ParquetColumnChunkFileReader::readNextChunk(const io::IOContext* io_ctx) {
+//    if (current_chunk_index_ + 1 >= static_cast<int64_t>(chunks_.size())) {
+//        return Status::InternalError("No more chunks to read");
+//    }
+//
+//    // Release the current chunk.
+//    if (current_chunk_reader_) {
+//        current_chunk_reader_->free();
+//        current_chunk_reader_.reset();
+//    }
+//
+//    // Read the next chunk.
+//    current_chunk_index_++;
+//    current_chunk_reader_ = std::move(chunks_[current_chunk_index_]);
+//    RETURN_IF_ERROR(current_chunk_reader_->read(io_ctx, &current_slice_));
+//
+//    return Status::OK();
+//}
+
+} // namespace vectorized
+} // namespace doris
diff --git a/be/src/vec/exec/format/parquet/parquet_column_chunk_file_reader.h b/be/src/vec/exec/format/parquet/parquet_column_chunk_file_reader.h
new file mode 100644
index 00000000000000..cf8db66c57dd20
--- /dev/null
+++ b/be/src/vec/exec/format/parquet/parquet_column_chunk_file_reader.h
@@ -0,0 +1,115 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <iterator>
+#include <memory>
+#include <vector>
+
+#include "common/status.h"
+#include "io/fs/file_reader.h"
+#include "io/fs/path.h"
+#include "util/slice.h"
+#include "vec/exec/format/parquet/reference_counted_reader.h"
+
+namespace doris {
+namespace vectorized {
+
+/**
+ * A FileReader implementation for reading parquet column chunks.
+ * It reads column chunks in limited (small) byte chunks (8MB by default).
+ * A column chunk consists of multiple pages.
+ * This abstraction is used because the page size is unknown until the page header is read,
+ * and a page header and its page data can be split across two or more byte chunks.
+ */
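+//
+// Rough flow (sketch): the reader is constructed with the ChunkReaders covering one
+// column chunk; read_at() binary-searches for the chunk containing the requested offset,
+// lazily materializes it through ChunkReader::read(), and copies bytes out of the cached
+// slice. Offsets must be non-decreasing, and chunks that are skipped over are released.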
+class ParquetColumnChunkFileReader : public io::FileReader {
+public:
+    ParquetColumnChunkFileReader(std::vector<std::shared_ptr<ChunkReader>> chunks,
+                                 ChunkReader::Statistics& statistics);
+    ~ParquetColumnChunkFileReader() override;
+
+    // Prevent copying
+    ParquetColumnChunkFileReader(const ParquetColumnChunkFileReader&) = delete;
+    ParquetColumnChunkFileReader& operator=(const ParquetColumnChunkFileReader&) = delete;
+
+    // Allow moving
+    ParquetColumnChunkFileReader(ParquetColumnChunkFileReader&&) noexcept;
+    ParquetColumnChunkFileReader& operator=(ParquetColumnChunkFileReader&&) noexcept;
+
+    // FileReader interface implementation
+    Status close() override;
+    //    const Path& path() const override { return path_; }
+    size_t size() const override { return chunks_.back()->file_size(); }
+    //    bool closed() const override { return current_slice_ == nullptr; }
+
+    // Additional functionality
+    Status addChunkReader(std::shared_ptr<ChunkReader> reader);
+    Status getSlice(const io::IOContext* io_ctx, size_t length, Slice* result);
+    size_t available() const {
+        return current_slice_ ? current_slice_->size - current_position_ : 0;
+    }
+
+    //    Status close() override {
+    //        if (!_closed) {
+    //            _closed = true;
+    //        }
+    //        return Status::OK();
+    //    }
+
+    const io::Path& path() const override { return chunks_.back()->path(); }
+
+    bool closed() const override { return _closed.load(std::memory_order_acquire); }
+
+protected:
+    Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
+                        const io::IOContext* io_ctx) override;
+
+    void _collect_profile_before_close() override {
+        //        if (_profile != nullptr) {
+        //            COUNTER_UPDATE(_copy_time, _statistics.copy_time);
+        //            COUNTER_UPDATE(_read_time, _statistics.read_time);
+        //            COUNTER_UPDATE(_request_io, _statistics.request_io);
+        //            COUNTER_UPDATE(_merged_io, _statistics.merged_io);
+        //            COUNTER_UPDATE(_request_bytes, _statistics.request_bytes);
+        //            COUNTER_UPDATE(_merged_bytes, _statistics.merged_bytes);
+        //            COUNTER_UPDATE(_apply_bytes, _statistics.apply_bytes);
+        //            if (_reader != nullptr) {
+        //                _reader->collect_profile_before_close();
+        //            }
+        //        }
+    }
+
+private:
+    Status ensureOpen();
+    //    Status readNextChunk(const io::IOContext* io_ctx);
+
+private:
+    std::vector<std::shared_ptr<ChunkReader>> chunks_;
+    ChunkReader::Statistics& statistics_;
+    int64_t current_chunk_index_ = -1; // Index of the chunk currently being read; -1 means none read yet.
+    //    std::shared_ptr<ChunkReader> current_chunk_reader_;
+    std::shared_ptr<Slice> current_slice_;
+    size_t current_position_ = 0;
+    int64_t last_read_offset_ = -1; // File offset of the last read, used to enforce sequential access.
+    std::atomic<bool> _closed = false;
+
+    static const Slice EMPTY_SLICE;
+};
+
+} // namespace vectorized
+} // namespace doris
diff --git a/be/src/vec/exec/format/parquet/parquet_common.h b/be/src/vec/exec/format/parquet/parquet_common.h
index da374d5fe793f8..f4e664c238ca85 100644
--- a/be/src/vec/exec/format/parquet/parquet_common.h
+++ b/be/src/vec/exec/format/parquet/parquet_common.h
@@ -285,4 +285,49 @@ class CorruptStatistics {
     static const SemanticVersion CDH_5_PARQUET_251_FIXED_END;
 };
 
+// Identifies a column chunk within a parquet file by its column index and row group index.
+// Used as the key of the chunk-reader maps built by ParquetReader.
+class ChunkKey {
+public:
+    ChunkKey(int column, int rowGroup) : _column(column), _rowGroup(rowGroup) {}
+
+    // Hash function combining both members.
+    size_t hash() const {
+        size_t h1 = std::hash<int> {}(_column);
+        size_t h2 = std::hash<int> {}(_rowGroup);
+        return h1 ^ (h2 << 1);
+    }
+
+    // Equality operator.
+    bool operator==(const ChunkKey& other) const {
+        return _column == other._column && _rowGroup == other._rowGroup;
+    }
+
+    // Less-than operator, used for ordering in std::map/std::multimap.
+    bool operator<(const ChunkKey& other) const {
+        if (_column != other._column) {
+            return _column < other._column;
+        }
+        return _rowGroup < other._rowGroup;
+    }
+
+    // String representation, for logging and error messages.
+    std::string toString() const {
+        std::ostringstream oss;
+        oss << "[rowGroup=" << _rowGroup << ", column=" << _column << "]";
+        return oss.str();
+    }
+
+private:
+    int _column;
+    int _rowGroup;
+};
+
 } // namespace doris::vectorized
+
+// std::hash specialization for ChunkKey.
+namespace std {
+template <>
+struct hash<doris::vectorized::ChunkKey> {
+    size_t operator()(const doris::vectorized::ChunkKey& key) const { return key.hash(); }
+};
+
+} // namespace std
diff --git a/be/src/vec/exec/format/parquet/reference_counted_reader.cpp b/be/src/vec/exec/format/parquet/reference_counted_reader.cpp
new file mode 100644
index 00000000000000..8d426d1eded5ba
--- /dev/null
+++ b/be/src/vec/exec/format/parquet/reference_counted_reader.cpp
@@ -0,0 +1,149 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/exec/format/parquet/reference_counted_reader.h"
+
+#include <sstream>
+#include <stdexcept>
+
+namespace doris {
+namespace vectorized {
+
+ReferenceCountedReader::ReferenceCountedReader(const io::PrefetchRange& range,
+                                               io::FileReaderSPtr file_reader,
+                                               ChunkReader::Statistics& statistics)
+        : _range(range),
+          _file_reader(std::move(file_reader)),
+          _statistics(statistics),
+          _data(nullptr),
+          _reference_count(1) {
+    if (_range.end_offset - _range.start_offset > MAX_ARRAY_SIZE) {
+        throw std::invalid_argument("Cannot read range bigger than " +
+                                    std::to_string(MAX_ARRAY_SIZE) + " but got " +
+                                    std::to_string(_range.end_offset - _range.start_offset));
+    }
+}
+
+void ReferenceCountedReader::addReference() {
+    if (_reference_count <= 0) {
+        throw std::runtime_error("Chunk reader is already closed");
+    }
+    _reference_count++;
+}
+
+int64_t ReferenceCountedReader::getDiskOffset() {
+    return _range.start_offset;
+}
+
+Status ReferenceCountedReader::read(const io::IOContext* io_ctx, std::shared_ptr<Slice>* result) {
+    if (_reference_count <= 0) {
+        throw std::runtime_error("Chunk reader is already closed");
+    }
+
+    auto range_size = _range.end_offset - _range.start_offset;
+    if (_data == nullptr) { // Need to read the range into the in-memory cache.
+
+        //        _cache_statistics.cache_refresh_count++;
+        //        _cache_statistics.read_to_cache_bytes += range_size;
+        //        SCOPED_RAW_TIMER(&_cache_statistics.read_to_cache_time);
+
+        //        Slice slice = {_data.get(), range_size};
+        _data = std::make_unique<char[]>(range_size);
+        *result = std::make_shared<Slice>(_data.get(), range_size);
+        size_t bytes_read;
+        RETURN_IF_ERROR(
+                _file_reader->read_at(_range.start_offset, *(*result), &bytes_read, io_ctx));
+        _statistics.merged_io++;
+        _statistics.merged_bytes += bytes_read;
+
+        if (bytes_read != range_size) [[unlikely]] {
+            return Status::InternalError(
+                    "ReferenceCountedReader read {} bytes from file reader, expected {}",
+                    bytes_read, range_size);
+        }
+    }
+    *result = std::make_shared<Slice>(_data.get(), range_size);
+    return Status::OK();
+}
+
+//Status ReferenceCountedReader::read(size_t offset, Slice result, size_t* bytes_read,
+//                                    const io::IOContext* io_ctx) {
+//    if (_reference_count <= 0) {
+//        throw std::runtime_error("Chunk reader is already closed");
+//    }
+//
+//    auto request_size = result.size;
+//
+//    if (_data == nullptr) { // need read new range to cache.
+//        auto range_size = _range.end_offset - _range.start_offset;
+//
+////        _cache_statistics.cache_refresh_count++;
+////        _cache_statistics.read_to_cache_bytes += range_size;
+////        SCOPED_RAW_TIMER(&_cache_statistics.read_to_cache_time);
+//
+//        Slice cache_slice = {_data.get(), range_size};
+//        RETURN_IF_ERROR(
+//                _file_reader->read_at(_range.start_offset, cache_slice, bytes_read, io_ctx));
+//
+//        if (*bytes_read != range_size) [[unlikely]] {
+//            return Status::InternalError(
+//                    "RangeCacheFileReader use inner reader read bytes {} not eq expect size {}",
+//                    *bytes_read, range_size);
+//        }
+//    }
+//
+//    int64_t buffer_offset = offset - _range.start_offset;
+//    memcpy(result.data, _data.get() + buffer_offset, request_size);
+//    *bytes_read = request_size;
+//
+//    return Status::OK();
+//}
+
+void ReferenceCountedReader::free() {
+    if (_reference_count <= 0) {
+        throw std::runtime_error("Reference count is already 0");
+    }
+
+    _reference_count--;
+    if (_reference_count == 0) {
+        _data.reset();
+    }
+}
+
+size_t ReferenceCountedReader::size() {
+    return _range.end_offset - _range.start_offset;
+}
+
+size_t ReferenceCountedReader::file_size() {
+    return _file_reader->size();
+}
+
+std::string ReferenceCountedReader::toString() const {
+    std::ostringstream oss;
+    oss << "ReferenceCountedReader{range=" << /* range_.toString() */
+            ", referenceCount=" << _reference_count << "}";
+    return oss.str();
+}
+
+// This helper would need to be completed against the actual I/O implementation.
+//Status ReferenceCountedReader::readFully(size_t offset, Slice result, size_t* bytes_read, const doris::io::IOContext* io_ctx) {
+//    return _file_reader->read_at(offset, result, bytes_read, io_ctx)
+//}
+
+} // namespace vectorized
+} // namespace doris
diff --git a/be/src/vec/exec/format/parquet/reference_counted_reader.h b/be/src/vec/exec/format/parquet/reference_counted_reader.h
new file mode 100644
index 00000000000000..e868a9617567fc
--- /dev/null
+++ b/be/src/vec/exec/format/parquet/reference_counted_reader.h
@@ -0,0 +1,92 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+#include <memory>
+#include <string>
+
+#include "io/fs/buffered_reader.h"
+#include "io/fs/file_reader.h"
+
+namespace doris {
+namespace vectorized {
+
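+// Abstraction over a single readable byte range of a parquet file. read() returns a
+// Slice backed by memory owned by the implementation; free() releases that memory
+// (or drops one reference to it when the buffer is shared).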
+class ChunkReader {
+public:
+    struct Statistics {
+        int64_t merged_io {0};
+        int64_t merged_bytes {0};
+
+        void merge(Statistics& statistics) {
+            merged_io += statistics.merged_io;
+            merged_bytes += statistics.merged_bytes;
+        }
+    };
+    virtual int64_t getDiskOffset() = 0;
+    virtual Status read(const io::IOContext* io_ctx, std::shared_ptr<Slice>* result) = 0;
+    //    Status read(size_t offset, Slice result, size_t* bytes_read,
+    //                const io::IOContext* io_ctx);
+    virtual void free() = 0;
+    virtual size_t size() = 0;
+    virtual size_t file_size() = 0;
+    virtual const io::Path& path() const = 0;
+    virtual ~ChunkReader() = default;
+};
+
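+// Caches one (possibly merged) disk range in memory and shares it across several
+// consumers through manual reference counting: the constructor starts the count at 1,
+// addReference() increments it, free() decrements it, and the buffer is released once
+// the count drops to zero.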
+class ReferenceCountedReader : public ChunkReader {
+public:
+    // Corresponds to the MAX_ARRAY_SIZE constant in the Java implementation.
+    static const int32_t MAX_ARRAY_SIZE = INT32_MAX - 8;
+
+    ReferenceCountedReader(const io::PrefetchRange& range, io::FileReaderSPtr file_reader,
+                           ChunkReader::Statistics& statistics);
+
+    ~ReferenceCountedReader() override = default;
+
+    void addReference();
+
+    // ChunkReader interface implementation
+    int64_t getDiskOffset() override;
+    Status read(const io::IOContext* io_ctx, std::shared_ptr<Slice>* result) override;
+    //    Status read(size_t offset, Slice result, size_t* bytes_read,
+    //                                        const io::IOContext* io_ctx);
+    void free() override;
+
+    size_t size() override;
+
+    size_t file_size() override;
+
+    const io::Path& path() const override { return _file_reader->path(); }
+
+    std::string toString() const;
+
+    int32_t reference_count() const { return _reference_count; }
+
+private:
+    //    Status readFully(size_t offset, Slice result, size_t* bytes_read, const doris::io::IOContext* io_ctx);
+
+    io::PrefetchRange _range;
+    io::FileReaderSPtr _file_reader;
+    ChunkReader::Statistics& _statistics;
+    std::unique_ptr<char[]> _data;
+    int32_t _reference_count;
+};
+
+} // namespace vectorized
+} // namespace doris
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
index 11fec1d5a79042..e36a320e3f570d 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
@@ -103,16 +103,16 @@ static void fill_array_offset(FieldSchema* field, ColumnArray::Offsets64& offset
     }
 }
 
-Status ParquetColumnReader::create(io::FileReaderSPtr file, FieldSchema* field,
-                                   const tparquet::RowGroup& row_group,
-                                   const std::vector<RowRange>& row_ranges, cctz::time_zone* ctz,
-                                   io::IOContext* io_ctx,
-                                   std::unique_ptr<ParquetColumnReader>& reader,
-                                   size_t max_buf_size, const tparquet::OffsetIndex* offset_index) {
+Status ParquetColumnReader::create(
+        const std::map<ChunkKey, std::shared_ptr<ParquetColumnChunkFileReader>>& chunk_readers,
+        FieldSchema* field, const tparquet::RowGroup& row_group, int32_t row_group_id,
+        const std::vector<RowRange>& row_ranges, cctz::time_zone* ctz, io::IOContext* io_ctx,
+        std::unique_ptr<ParquetColumnReader>& reader, size_t max_buf_size,
+        const tparquet::OffsetIndex* offset_index) {
     if (field->type.type == TYPE_ARRAY) {
         std::unique_ptr<ParquetColumnReader> element_reader;
-        RETURN_IF_ERROR(create(file, &field->children[0], row_group, row_ranges, ctz, io_ctx,
-                               element_reader, max_buf_size));
+        RETURN_IF_ERROR(create(chunk_readers, &field->children[0], row_group, row_group_id,
+                               row_ranges, ctz, io_ctx, element_reader, max_buf_size));
         element_reader->set_nested_column();
         auto array_reader = ArrayColumnReader::create_unique(row_ranges, ctz, io_ctx);
         RETURN_IF_ERROR(array_reader->init(std::move(element_reader), field));
@@ -120,10 +120,10 @@ Status ParquetColumnReader::create(io::FileReaderSPtr file, FieldSchema* field,
     } else if (field->type.type == TYPE_MAP) {
         std::unique_ptr<ParquetColumnReader> key_reader;
         std::unique_ptr<ParquetColumnReader> value_reader;
-        RETURN_IF_ERROR(create(file, &field->children[0].children[0], row_group, row_ranges, ctz,
-                               io_ctx, key_reader, max_buf_size));
-        RETURN_IF_ERROR(create(file, &field->children[0].children[1], row_group, row_ranges, ctz,
-                               io_ctx, value_reader, max_buf_size));
+        RETURN_IF_ERROR(create(chunk_readers, &field->children[0].children[0], row_group,
+                               row_group_id, row_ranges, ctz, io_ctx, key_reader, max_buf_size));
+        RETURN_IF_ERROR(create(chunk_readers, &field->children[0].children[1], row_group,
+                               row_group_id, row_ranges, ctz, io_ctx, value_reader, max_buf_size));
         key_reader->set_nested_column();
         value_reader->set_nested_column();
         auto map_reader = MapColumnReader::create_unique(row_ranges, ctz, io_ctx);
@@ -134,8 +134,8 @@ Status ParquetColumnReader::create(io::FileReaderSPtr file, FieldSchema* field,
         child_readers.reserve(field->children.size());
         for (int i = 0; i < field->children.size(); ++i) {
             std::unique_ptr<ParquetColumnReader> child_reader;
-            RETURN_IF_ERROR(create(file, &field->children[i], row_group, row_ranges, ctz, io_ctx,
-                                   child_reader, max_buf_size));
+            RETURN_IF_ERROR(create(chunk_readers, &field->children[i], row_group, row_group_id,
+                                   row_ranges, ctz, io_ctx, child_reader, max_buf_size));
             child_reader->set_nested_column();
             child_readers[field->children[i].name] = std::move(child_reader);
         }
@@ -146,7 +146,9 @@ Status ParquetColumnReader::create(io::FileReaderSPtr file, FieldSchema* field,
         const tparquet::ColumnChunk& chunk = row_group.columns[field->physical_column_index];
         auto scalar_reader =
                 ScalarColumnReader::create_unique(row_ranges, chunk, offset_index, ctz, io_ctx);
-        RETURN_IF_ERROR(scalar_reader->init(file, field, max_buf_size));
+        RETURN_IF_ERROR(
+                scalar_reader->init(chunk_readers.at({field->physical_column_index, row_group_id}),
+                                    field, max_buf_size));
         reader.reset(scalar_reader.release());
     }
     return Status::OK();
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.h b/be/src/vec/exec/format/parquet/vparquet_column_reader.h
index 4c6e5b1eac9f60..21b123c137bf18 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.h
@@ -33,6 +33,7 @@
 #include "parquet_column_convert.h"
 #include "vec/columns/columns_number.h"
 #include "vec/data_types/data_type.h"
+#include "vec/exec/format/parquet/parquet_column_chunk_file_reader.h"
 #include "vec/exec/format/parquet/parquet_common.h"
 #include "vparquet_column_chunk_reader.h"
 
@@ -133,11 +134,12 @@ class ParquetColumnReader {
         __builtin_unreachable();
     }
 
-    static Status create(io::FileReaderSPtr file, FieldSchema* field,
-                         const tparquet::RowGroup& row_group,
-                         const std::vector<RowRange>& row_ranges, cctz::time_zone* ctz,
-                         io::IOContext* io_ctx, std::unique_ptr<ParquetColumnReader>& reader,
-                         size_t max_buf_size, const tparquet::OffsetIndex* offset_index = nullptr);
+    static Status create(
+            const std::map<ChunkKey, std::shared_ptr<ParquetColumnChunkFileReader>>& chunk_readers,
+            FieldSchema* field, const tparquet::RowGroup& row_group, int32_t row_group_id,
+            const std::vector<RowRange>& row_ranges, cctz::time_zone* ctz, io::IOContext* io_ctx,
+            std::unique_ptr<ParquetColumnReader>& reader, size_t max_buf_size,
+            const tparquet::OffsetIndex* offset_index = nullptr);
     void set_nested_column() { _nested_column = true; }
     virtual const std::vector<level_t>& get_rep_level() const = 0;
     virtual const std::vector<level_t>& get_def_level() const = 0;
diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
index b9259be936bb31..71fedf82e401b6 100644
--- a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
@@ -53,6 +53,7 @@
 #include "vec/data_types/data_type_nullable.h"
 #include "vec/data_types/data_type_number.h"
 #include "vec/data_types/data_type_string.h"
+#include "vec/exec/format/parquet/parquet_column_chunk_file_reader.h"
 #include "vec/exprs/vdirect_in_predicate.h"
 #include "vec/exprs/vectorized_fn_call.h"
 #include "vec/exprs/vexpr.h"
@@ -76,13 +77,13 @@ namespace doris::vectorized {
 
 const std::vector<int64_t> RowGroupReader::NO_DELETE = {};
 
-RowGroupReader::RowGroupReader(io::FileReaderSPtr file_reader,
-                               const std::vector<std::string>& read_columns,
-                               const int32_t row_group_id, const tparquet::RowGroup& row_group,
-                               cctz::time_zone* ctz, io::IOContext* io_ctx,
-                               const PositionDeleteContext& position_delete_ctx,
-                               const LazyReadContext& lazy_read_ctx, RuntimeState* state)
-        : _file_reader(file_reader),
+RowGroupReader::RowGroupReader(
+        const std::map<ChunkKey, std::shared_ptr<ParquetColumnChunkFileReader>>& chunk_readers,
+        const std::vector<std::string>& read_columns, const int32_t row_group_id,
+        const tparquet::RowGroup& row_group, cctz::time_zone* ctz, io::IOContext* io_ctx,
+        const PositionDeleteContext& position_delete_ctx, const LazyReadContext& lazy_read_ctx,
+        RuntimeState* state)
+        : _chunk_readers(chunk_readers),
           _read_columns(read_columns),
           _row_group_id(row_group_id),
           _row_group_meta(row_group),
@@ -131,9 +132,9 @@ Status RowGroupReader::init(
         const tparquet::OffsetIndex* offset_index =
                 col_offsets.find(physical_index) != col_offsets.end() ? &col_offsets[physical_index]
                                                                       : nullptr;
-        RETURN_IF_ERROR(ParquetColumnReader::create(_file_reader, field, _row_group_meta,
-                                                    _read_ranges, _ctz, _io_ctx, reader,
-                                                    max_buf_size, offset_index));
+        RETURN_IF_ERROR(ParquetColumnReader::create(_chunk_readers, field, _row_group_meta,
+                                                    _row_group_id, _read_ranges, _ctz, _io_ctx,
+                                                    reader, max_buf_size, offset_index));
         if (reader == nullptr) {
             VLOG_DEBUG << "Init row group(" << _row_group_id << ") reader failed";
             return Status::Corruption("Init row group reader failed");
diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.h b/be/src/vec/exec/format/parquet/vparquet_group_reader.h
index a889c1774ea126..f3df93f1826bae 100644
--- a/be/src/vec/exec/format/parquet/vparquet_group_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.h
@@ -29,7 +29,9 @@
 
 #include "io/fs/file_reader_writer_fwd.h"
 #include "vec/columns/column.h"
+#include "vec/exec/format/parquet/parquet_column_chunk_file_reader.h"
 #include "vec/exec/format/parquet/parquet_common.h"
+#include "vec/exec/format/parquet/reference_counted_reader.h"
 #include "vec/exprs/vexpr_fwd.h"
 #include "vparquet_column_reader.h"
 
@@ -142,11 +144,12 @@ class RowGroupReader : public ProfileCollector {
         PositionDeleteContext(const PositionDeleteContext& filter) = default;
     };
 
-    RowGroupReader(io::FileReaderSPtr file_reader, const std::vector<std::string>& read_columns,
-                   const int32_t row_group_id, const tparquet::RowGroup& row_group,
-                   cctz::time_zone* ctz, io::IOContext* io_ctx,
-                   const PositionDeleteContext& position_delete_ctx,
-                   const LazyReadContext& lazy_read_ctx, RuntimeState* state);
+    RowGroupReader(
+            const std::map<ChunkKey, std::shared_ptr<ParquetColumnChunkFileReader>>& chunk_readers,
+            const std::vector<std::string>& read_columns, const int32_t row_group_id,
+            const tparquet::RowGroup& row_group, cctz::time_zone* ctz, io::IOContext* io_ctx,
+            const PositionDeleteContext& position_delete_ctx, const LazyReadContext& lazy_read_ctx,
+            RuntimeState* state);
 
     ~RowGroupReader();
     Status init(const FieldDescriptor& schema, std::vector<RowRange>& row_ranges,
@@ -165,9 +168,9 @@ class RowGroupReader : public ProfileCollector {
 
 protected:
     void _collect_profile_before_close() override {
-        if (_file_reader != nullptr) {
-            _file_reader->collect_profile_before_close();
-        }
+        // if (_file_reader != nullptr) {
+        //    _file_reader->collect_profile_before_close();
+        // }
     }
 
 private:
@@ -198,7 +201,8 @@ class RowGroupReader : public ProfileCollector {
     Status _rewrite_dict_conjuncts(std::vector<int32_t>& dict_codes, int slot_id, bool is_nullable);
     void _convert_dict_cols_to_string_cols(Block* block);
 
-    io::FileReaderSPtr _file_reader;
+    //io::FileReaderSPtr _file_reader;
+    const std::map<ChunkKey, std::shared_ptr<ParquetColumnChunkFileReader>>& _chunk_readers;
     std::unordered_map<std::string, std::unique_ptr<ParquetColumnReader>> _column_readers;
     const std::vector<std::string>& _read_columns;
     const int32_t _row_group_id;
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
index 47209dbb332cb8..c98a2e115f1705 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
@@ -183,6 +183,13 @@ void ParquetReader::_init_profile() {
                 _profile, "ParsePageHeaderNum", TUnit::UNIT, parquet_profile, 1);
         _parquet_profile.predicate_filter_time =
                 ADD_CHILD_TIMER_WITH_LEVEL(_profile, "PredicateFilterTime", parquet_profile, 1);
+
+        const char* merge_small_io_profile = "MergedSmallIO";
+        ADD_TIMER_WITH_LEVEL(_profile, merge_small_io_profile, 1);
+        _parquet_profile.merged_io = ADD_CHILD_COUNTER_WITH_LEVEL(_profile, "MergedIO", TUnit::UNIT,
+                                                                  merge_small_io_profile, 1);
+        _parquet_profile.merged_bytes = ADD_CHILD_COUNTER_WITH_LEVEL(
+                _profile, "MergedBytes", TUnit::BYTES, merge_small_io_profile, 1);
     }
 }
 
@@ -641,7 +648,7 @@ Status ParquetReader::_next_row_group_reader() {
 
     RowGroupReader::PositionDeleteContext position_delete_ctx =
             _get_position_delete_ctx(row_group, row_group_index);
-    io::FileReaderSPtr group_file_reader;
+    /*io::FileReaderSPtr group_file_reader;
     if (typeid_cast<io::InMemoryFileReader*>(_file_reader.get())) {
         // InMemoryFileReader has the ability to merge small IO
         group_file_reader = _file_reader;
@@ -655,10 +662,10 @@ Status ParquetReader::_next_row_group_reader() {
                                     ? std::make_shared<io::MergeRangeFileReader>(
                                               _profile, _file_reader, io_ranges)
                                     : _file_reader;
-    }
+    }*/
     _current_group_reader.reset(new RowGroupReader(
-            group_file_reader, _read_columns, row_group_index.row_group_id, row_group, _ctz,
-            _io_ctx, position_delete_ctx, _lazy_read_ctx, _state));
+            _parquet_column_chunk_file_readers, _read_columns, row_group_index.row_group_id,
+            row_group, _ctz, _io_ctx, position_delete_ctx, _lazy_read_ctx, _state));
     _row_group_eof = false;
     return _current_group_reader->init(_file_metadata->schema(), candidate_row_ranges, _col_offsets,
                                        _tuple_descriptor, _row_descriptor, _colname_to_slot_id,
@@ -721,6 +728,14 @@ Status ParquetReader::_init_row_groups(const bool& is_filter_groups) {
     if (_read_row_groups.empty()) {
         return Status::EndOfFile("No row group to read");
     }
+
+    size_t avg_io_size = 0;
+    const std::multimap<ChunkKey, io::PrefetchRange> io_ranges =
+            _generate_random_access_ranges2(&avg_io_size);
+    auto readers = _plan_read(io_ranges);
+
+    // Keep the readers for all row groups that will be read.
+    _parquet_column_chunk_file_readers = std::move(readers);
     return Status::OK();
 }
 
@@ -744,7 +759,11 @@ std::vector<io::PrefetchRange> ParquetReader::_generate_random_access_ranges(
                     const tparquet::ColumnChunk& chunk =
                             row_group.columns[field->physical_column_index];
                     auto& chunk_meta = chunk.meta_data;
-                    int64_t chunk_start = chunk_meta.__isset.dictionary_page_offset
+                    /*int64_t chunk_start = chunk_meta.__isset.dictionary_page_offset
+                                                  ? chunk_meta.dictionary_page_offset
+                                                  : chunk_meta.data_page_offset;*/
+                    int64_t chunk_start = chunk_meta.__isset.dictionary_page_offset &&
+                                                          chunk_meta.dictionary_page_offset > 0
                                                   ? chunk_meta.dictionary_page_offset
                                                   : chunk_meta.data_page_offset;
                     int64_t chunk_end = chunk_start + chunk_meta.total_compressed_size;
@@ -765,6 +784,245 @@ std::vector<io::PrefetchRange> ParquetReader::_generate_random_access_ranges(
     return result;
 }
 
+// Recursively collect all leaf fields of a schema field.
+void ParquetReader::_get_leaf_fields(const FieldSchema* field,
+                                     std::vector<const FieldSchema*>* leaf_fields) {
+    if (!field) {
+        return;
+    }
+
+    // For complex types (STRUCT, MAP, ARRAY), recurse into the children.
+    if (field->type.type == TYPE_STRUCT) {
+        for (const auto& child : field->children) {
+            _get_leaf_fields(&child, leaf_fields);
+        }
+    } else if (field->type.type == TYPE_MAP) {
+        // MAP has two child fields: key and value.
+        _get_leaf_fields(&field->children[0].children[0], leaf_fields); // key
+        _get_leaf_fields(&field->children[0].children[1], leaf_fields); // value
+    } else if (field->type.type == TYPE_ARRAY) {
+        // ARRAY has a single child field: element.
+        _get_leaf_fields(&field->children[0], leaf_fields);
+    } else {
+        // Primitive type: this is a leaf, add it to the result.
+        leaf_fields->push_back(field);
+    }
+}
+
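+// Collect the disk range of every column chunk that will be read, keyed by
+// (column index, row group). Also reports the average compressed size per row group.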
+std::multimap<ChunkKey, io::PrefetchRange> ParquetReader::_generate_random_access_ranges2(
+        size_t* avg_io_size) {
+    std::multimap<ChunkKey, io::PrefetchRange> result;
+    int64_t last_chunk_end = -1;
+    int64_t total_io_size = 0;
+    for (const auto& row_group_index : _read_row_groups) {
+        const tparquet::RowGroup& row_group = _t_metadata->row_groups[row_group_index.row_group_id];
+
+        // For each column that needs to be read.
+        for (auto& read_col : _read_columns) {
+            const FieldSchema* field = _file_metadata->schema().get_column(read_col);
+            //            if (field->physical_column_index < 0) {
+            //                continue;
+            //            }
+            // Recursively collect all leaf fields of this column.
+            std::vector<const FieldSchema*> leaf_fields;
+            _get_leaf_fields(field, &leaf_fields);
+
+            for (const FieldSchema* leaf_field : leaf_fields) {
+                int parquet_col_id = leaf_field->physical_column_index;
+                const tparquet::ColumnChunk& chunk = row_group.columns[parquet_col_id];
+                auto& chunk_meta = chunk.meta_data;
+                int64_t chunk_start = chunk_meta.__isset.dictionary_page_offset &&
+                                                      chunk_meta.dictionary_page_offset > 0
+                                              ? chunk_meta.dictionary_page_offset
+                                              : chunk_meta.data_page_offset;
+                int64_t chunk_end = chunk_start + chunk_meta.total_compressed_size;
+                DCHECK_GE(chunk_start, last_chunk_end);
+                ChunkKey key(parquet_col_id, row_group_index.row_group_id);
+                io::PrefetchRange range(chunk_start, chunk_end);
+                result.emplace(key, range);
+                total_io_size += chunk_meta.total_compressed_size;
+                last_chunk_end = chunk_end;
+            }
+        }
+    }
+
+    if (!_read_row_groups.empty()) {
+        *avg_io_size = total_io_size / _read_row_groups.size();
+    } else {
+        *avg_io_size = 0;
+    }
+
+    return result;
+}
+
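+// Group the planned ChunkReaders by ChunkKey and wrap each group in a
+// ParquetColumnChunkFileReader, so every (column, row group) pair gets its own
+// sequential file reader over the merged buffers.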
+std::map<ChunkKey, std::shared_ptr<ParquetColumnChunkFileReader>> ParquetReader::_plan_read(
+        const std::multimap<ChunkKey, io::PrefetchRange>& ranges) {
+    if (ranges.empty()) {
+        return std::map<ChunkKey, std::shared_ptr<ParquetColumnChunkFileReader>>();
+    }
+
+    // Plan the individual chunk reads first.
+    auto chunks_result = _plan_chunks_read(ranges);
+
+    // Result map: one ParquetColumnChunkFileReader per ChunkKey.
+    std::map<ChunkKey, std::shared_ptr<ParquetColumnChunkFileReader>> result;
+
+    // Process the chunk readers grouped by key.
+    auto current = chunks_result.begin();
+    while (current != chunks_result.end()) {
+        // Find the end of the entries sharing the current key.
+        auto range_end = chunks_result.upper_bound(current->first);
+        std::vector<std::shared_ptr<ChunkReader>> readers;
+
+        // Collect all readers that share this key.
+        for (auto it = current; it != range_end; ++it) {
+            readers.push_back(it->second);
+        }
+
+        // Create a ParquetColumnChunkFileReader for this key.
+        auto chunked_stream = std::make_shared<ParquetColumnChunkFileReader>(std::move(readers),
+                                                                             _chunk_readers_stats);
+        result.emplace(current->first, chunked_stream);
+
+        current = range_end;
+    }
+
+    return result;
+}
+
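+// Turn the requested disk ranges into ChunkReaders: ranges no larger than 8MB are
+// merged with their neighbours and served from shared buffers, while larger ranges are
+// split into 8MB pieces and read individually. The readers of each key are returned
+// sorted by disk offset.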
+std::multimap<ChunkKey, std::shared_ptr<ChunkReader>> ParquetReader::_plan_chunks_read(
+        const std::multimap<ChunkKey, io::PrefetchRange>& ranges) {
+    if (ranges.empty()) {
+        return std::multimap<ChunkKey, std::shared_ptr<ChunkReader>>();
+    }
+
+    // Split disk ranges into "small" and "large" ones.
+    std::multimap<ChunkKey, io::PrefetchRange> small_ranges;
+    std::multimap<ChunkKey, io::PrefetchRange> large_ranges;
+
+    int64_t max_buffer_size = 8 * 1024 * 1024;
+
+    for (const auto& range : ranges) {
+        if (range.second.end_offset - range.second.start_offset <= max_buffer_size) {
+            small_ranges.emplace(range.first, range.second);
+        } else {
+            auto split_ranges = _split_large_range(range.second);
+            for (const auto& split_range : split_ranges) {
+                large_ranges.emplace(range.first, split_range);
+            }
+        }
+    }
+
+    // Build chunk readers for the ranges.
+    std::multimap<ChunkKey, std::shared_ptr<ChunkReader>> slices;
+
+    // Small ranges: served from merged, shared buffers.
+    auto small_readers = _read_small_ranges(small_ranges);
+    slices.insert(small_readers.begin(), small_readers.end());
+
+    // Large ranges: read individually.
+    auto large_readers = _read_large_ranges(large_ranges);
+    slices.insert(large_readers.begin(), large_readers.end());
+
+    // Sort the ChunkReaders of each key by disk offset.
+    auto current = slices.begin();
+    while (current != slices.end()) {
+        auto range_end = slices.upper_bound(current->first);
+        std::vector<std::shared_ptr<ChunkReader>> readers;
+
+        // Collect all readers that share this key.
+        for (auto it = current; it != range_end; ++it) {
+            readers.push_back(it->second);
+        }
+
+        // Sort by disk offset.
+        std::sort(readers.begin(), readers.end(),
+                  [](const std::shared_ptr<ChunkReader>& a, const std::shared_ptr<ChunkReader>& b) {
+                      return a->getDiskOffset() < b->getDiskOffset();
+                  });
+
+        // Write the sorted readers back into the multimap.
+        auto reader_it = readers.begin();
+        for (auto it = current; it != range_end; ++it, ++reader_it) {
+            it->second = *reader_it;
+        }
+
+        current = range_end;
+    }
+
+    return slices;
+}
+
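+// Split a range larger than the 8MB buffer limit into consecutive pieces of at most 8MB.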
+std::vector<io::PrefetchRange> ParquetReader::_split_large_range(const io::PrefetchRange& range) {
+    std::vector<io::PrefetchRange> result;
+
+    int64_t current_offset = range.start_offset;
+    int64_t remaining_size = range.end_offset - range.start_offset;
+    int64_t max_buffer_size = 8 * 1024 * 1024;
+
+    while (remaining_size > 0) {
+        int64_t chunk_size = std::min(remaining_size, max_buffer_size);
+        result.emplace_back(current_offset, current_offset + chunk_size);
+        current_offset += chunk_size;
+        remaining_size -= chunk_size;
+    }
+
+    return result;
+}
+
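+// Merge adjacent small ranges (up to 1MB apart, at most 8MB per merged buffer) and create
+// one shared ReferenceCountedReader per merged range; every original range contained in a
+// merged range gets a ParquetChunkReader view into that shared buffer.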
+std::multimap<ChunkKey, std::shared_ptr<ChunkReader>> ParquetReader::_read_small_ranges(
+        const std::multimap<ChunkKey, io::PrefetchRange>& ranges) {
+    if (ranges.empty()) {
+        return std::multimap<ChunkKey, std::shared_ptr<ChunkReader>>();
+    }
+
+    // Result multimap of chunk readers.
+    std::multimap<ChunkKey, std::shared_ptr<ChunkReader>> slices;
+
+    // Collect all ranges.
+    std::vector<io::PrefetchRange> all_ranges;
+    all_ranges.reserve(ranges.size());
+    std::transform(ranges.begin(), ranges.end(), std::back_inserter(all_ranges),
+                   [](const auto& pair) { return pair.second; });
+
+    // Merge adjacent ranges.
+    int64_t max_merge_distance = 1 * 1024 * 1024; // 1MB
+    int64_t max_buffer_size = 8 * 1024 * 1024;    // 8MB
+    auto merged_ranges = io::PrefetchRange::merge_adjacent_seq_ranges(
+            all_ranges, max_merge_distance, max_buffer_size);
+
+    // Create one shared reader per merged range.
+    for (const auto& merged_range : merged_ranges) {
+        auto merged_range_reader = std::make_shared<ReferenceCountedReader>(
+                merged_range, _file_reader, _chunk_readers_stats);
+
+        // Hand out a reference for every original disk range contained in this merged range.
+        for (const auto& [key, range] : ranges) {
+            if (range.start_offset >= merged_range.start_offset &&
+                range.end_offset <= merged_range.end_offset) {
+                merged_range_reader->addReference();
+                slices.emplace(key, std::make_shared<ParquetChunkReader>(merged_range_reader, range,
+                                                                         merged_range));
+            }
+        }
+        merged_range_reader->free();
+    }
+
+    return slices;
+}
+
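+// Large range pieces are read individually, each with its own ReferenceCountedReader.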
+std::multimap<ChunkKey, std::shared_ptr<ChunkReader>> ParquetReader::_read_large_ranges(
+        const std::multimap<ChunkKey, io::PrefetchRange>& ranges) {
+    std::multimap<ChunkKey, std::shared_ptr<ChunkReader>> result;
+
+    for (const auto& range : ranges) {
+        auto reader = std::make_shared<ReferenceCountedReader>(range.second, _file_reader,
+                                                               _chunk_readers_stats);
+        result.emplace(range.first, reader);
+    }
+    return result;
+}
+
 bool ParquetReader::_is_misaligned_range_group(const tparquet::RowGroup& row_group) {
     int64_t start_offset = _get_column_start_offset(row_group.columns[0].meta_data);
 
@@ -1007,7 +1265,7 @@ Status ParquetReader::_process_bloom_filter(bool* filter_group) {
 }
 
 int64_t ParquetReader::_get_column_start_offset(const tparquet::ColumnMetaData& column) {
-    if (column.__isset.dictionary_page_offset) {
+    if (column.__isset.dictionary_page_offset && column.dictionary_page_offset > 0) {
         DCHECK_LT(column.dictionary_page_offset, column.data_page_offset);
         return column.dictionary_page_offset;
     }
@@ -1055,6 +1313,9 @@ void ParquetReader::_collect_profile() {
     COUNTER_UPDATE(_parquet_profile.decode_dict_time, _column_statistics.decode_dict_time);
     COUNTER_UPDATE(_parquet_profile.decode_level_time, _column_statistics.decode_level_time);
     COUNTER_UPDATE(_parquet_profile.decode_null_map_time, _column_statistics.decode_null_map_time);
+
+    COUNTER_UPDATE(_parquet_profile.merged_io, _chunk_readers_stats.merged_io);
+    COUNTER_UPDATE(_parquet_profile.merged_bytes, _chunk_readers_stats.merged_bytes);
 }
 
 void ParquetReader::_collect_profile_before_close() {
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.h b/be/src/vec/exec/format/parquet/vparquet_reader.h
index b2aa156a79d088..c035f90fad6052 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.h
@@ -22,6 +22,7 @@
 #include <stdint.h>
 
 #include <list>
+#include <map>
 #include <memory>
 #include <string>
 #include <tuple>
@@ -38,7 +39,9 @@
 #include "util/obj_lru_cache.h"
 #include "util/runtime_profile.h"
 #include "vec/exec/format/generic_reader.h"
+#include "vec/exec/format/parquet/parquet_column_chunk_file_reader.h"
 #include "vec/exec/format/parquet/parquet_common.h"
+#include "vec/exec/format/parquet/reference_counted_reader.h"
 #include "vparquet_column_reader.h"
 #include "vparquet_group_reader.h"
 
@@ -190,6 +193,9 @@ class ParquetReader : public GenericReader {
         RuntimeProfile::Counter* skip_page_header_num = nullptr;
         RuntimeProfile::Counter* parse_page_header_num = nullptr;
         RuntimeProfile::Counter* predicate_filter_time = nullptr;
+
+        RuntimeProfile::Counter* merged_io = nullptr;
+        RuntimeProfile::Counter* merged_bytes = nullptr;
     };
 
     Status _open_file();
@@ -207,6 +213,67 @@ class ParquetReader : public GenericReader {
     Status _process_page_index(const tparquet::RowGroup& row_group,
                                std::vector<RowRange>& candidate_row_ranges);
 
+    // A ChunkReader exposing a sub-range of a merged ReferenceCountedReader buffer.
+    // read() returns a Slice pointing into the shared buffer without copying;
+    // free() drops this reader's reference to the shared buffer.
+    class ParquetChunkReader : public ChunkReader {
+    public:
+        ParquetChunkReader(std::shared_ptr<ReferenceCountedReader> loader,
+                           const io::PrefetchRange& range, const io::PrefetchRange& merged_range)
+                : loader_(std::move(loader)), range_(range), merged_range_(merged_range) {}
+
+        ~ParquetChunkReader() override { free(); }
+
+        int64_t getDiskOffset() override { return range_.start_offset; }
+
+        Status read(const io::IOContext* io_ctx, std::shared_ptr<Slice>* result) override {
+            DCHECK(result != nullptr);
+            std::shared_ptr<Slice> slice;
+            RETURN_IF_ERROR(loader_->read(io_ctx, &slice));
+            if (!slice) {
+                return Status::InternalError("Failed to read data from loader");
+            }
+
+            // Offset of this chunk within the merged range.
+            int64_t offset = range_.start_offset - merged_range_.start_offset;
+            int64_t length = range_.end_offset - range_.start_offset;
+
+            // Create a slice pointing at the sub-range of the shared buffer.
+            *result = std::make_shared<Slice>(slice->data + offset, length);
+            return Status::OK();
+        }
+
+        void free() override {
+            if (loader_) {
+                loader_->free();
+                loader_.reset();
+            }
+        }
+
+        size_t size() override { return loader_->size(); }
+
+        size_t file_size() override { return loader_->file_size(); }
+        const io::Path& path() const override { return loader_->path(); }
+
+    private:
+        std::shared_ptr<ReferenceCountedReader> loader_;
+        io::PrefetchRange range_;
+        io::PrefetchRange merged_range_;
+    };
+
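+    // Read planning helpers: group the per-column ranges by ChunkKey, coalesce
+    // small adjacent ranges into larger merged reads, split oversized ranges,
+    // and wrap each original range in a ChunkReader backed by the merged buffer.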
+    std::map<ChunkKey, std::shared_ptr<ParquetColumnChunkFileReader>> _plan_read(
+            const std::multimap<ChunkKey, io::PrefetchRange>& ranges);
+
+    std::multimap<ChunkKey, std::shared_ptr<ChunkReader>> _plan_chunks_read(
+            const std::multimap<ChunkKey, io::PrefetchRange>& ranges);
+
+    std::multimap<ChunkKey, std::shared_ptr<ChunkReader>> _read_small_ranges(
+            const std::multimap<ChunkKey, io::PrefetchRange>& ranges);
+
+    std::multimap<ChunkKey, std::shared_ptr<ChunkReader>> _read_large_ranges(
+            const std::multimap<ChunkKey, io::PrefetchRange>& ranges);
+
+    std::vector<io::PrefetchRange> _split_large_range(const io::PrefetchRange& range);
+
     // Row Group Filter
     bool _is_misaligned_range_group(const tparquet::RowGroup& row_group);
     Status _process_column_stat_filter(const std::vector<tparquet::ColumnChunk>& column_meta,
@@ -220,6 +287,10 @@ class ParquetReader : public GenericReader {
     std::string _meta_cache_key(const std::string& path) { return "meta_" + path; }
     std::vector<io::PrefetchRange> _generate_random_access_ranges(
             const RowGroupReader::RowGroupIndex& group, size_t* avg_io_size);
+    // Recursively collect all leaf fields under the given schema node.
+    void _get_leaf_fields(const FieldSchema* field, std::vector<const FieldSchema*>* leaf_fields);
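+    // Like _generate_random_access_ranges, but keys each prefetch range by its
+    // column chunk (ChunkKey) instead of returning a flat list.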
+    std::multimap<ChunkKey, io::PrefetchRange> _generate_random_access_ranges2(size_t* avg_io_size);
+
     void _collect_profile();
 
     static SortOrder _determine_sort_order(const tparquet::SchemaElement& parquet_schema);
@@ -288,5 +359,8 @@ class ParquetReader : public GenericReader {
     const std::unordered_map<int, VExprContextSPtrs>* _slot_id_to_filter_conjuncts = nullptr;
     bool _hive_use_column_names = false;
     std::unordered_map<tparquet::Type::type, bool> _ignored_stats;
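+    // Merged column-chunk readers produced during read planning, plus the I/O
+    // statistics they accumulate (reported via the merged_io / merged_bytes counters).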
+    std::map<ChunkKey, std::shared_ptr<ParquetColumnChunkFileReader>>
+            _parquet_column_chunk_file_readers;
+    ChunkReader::Statistics _chunk_readers_stats;
 };
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/table/iceberg_reader.cpp b/be/src/vec/exec/format/table/iceberg_reader.cpp
index 837269b0bb355d..50d60fe912b083 100644
--- a/be/src/vec/exec/format/table/iceberg_reader.cpp
+++ b/be/src/vec/exec/format/table/iceberg_reader.cpp
@@ -583,7 +583,8 @@ Status IcebergParquetReader ::_read_position_delete_file(const TFileRangeDesc* d
     for (int j = 0; j < meta_data->row_groups.size(); ++j) {
         auto& column_chunk = meta_data->row_groups[j].columns[ICEBERG_FILE_PATH_INDEX];
         if (!(column_chunk.__isset.meta_data &&
-              column_chunk.meta_data.__isset.dictionary_page_offset)) {
+              column_chunk.meta_data.__isset.dictionary_page_offset &&
+              column_chunk.meta_data.dictionary_page_offset > 0)) {
             dictionary_coded = false;
             break;
         }
diff --git a/be/test/vec/exec/format/parquet/parquet_column_chunk_file_reader_test.cpp b/be/test/vec/exec/format/parquet/parquet_column_chunk_file_reader_test.cpp
new file mode 100644
index 00000000000000..8872f5a940e764
--- /dev/null
+++ b/be/test/vec/exec/format/parquet/parquet_column_chunk_file_reader_test.cpp
@@ -0,0 +1,376 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/exec/format/parquet/parquet_column_chunk_file_reader.h"
+
+#include <gtest/gtest.h>
+
+#include <memory>
+#include <string>
+
+#include "io/fs/file_reader.h"
+#include "io/fs/local_file_system.h"
+#include "util/slice.h"
+#include "vec/exec/format/parquet/reference_counted_reader.h"
+#include "vec/exec/format/parquet/vparquet_reader.h"
+
+namespace doris::vectorized {
+
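+// Minimal in-memory FileReader: serves reads out of a std::string so the chunk
+// readers can be exercised without touching the filesystem.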
+class MockFileReader : public io::FileReader {
+public:
+    MockFileReader() = default;
+    ~MockFileReader() override = default;
+
+    Status close() override {
+        _closed = true;
+        return Status::OK();
+    }
+
+    const io::Path& path() const override { return _path; }
+
+    size_t size() const override { return _data.size(); }
+
+    bool closed() const override { return _closed; }
+
+    void set_data(const std::string& data) { _data = data; }
+
+protected:
+    Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
+                        const io::IOContext* io_ctx) override {
+        if (offset >= _data.size()) {
+            *bytes_read = 0;
+            return Status::OK();
+        }
+        *bytes_read = std::min(result.size, _data.size() - offset);
+        memcpy(result.data, _data.data() + offset, *bytes_read);
+        return Status::OK();
+    }
+
+private:
+    std::string _data;
+    bool _closed = false;
+    io::Path _path = "/tmp/mock";
+};
+
+class ParquetColumnChunkFileReaderTest : public ::testing::Test {
+protected:
+    void SetUp() override {
+        _mock_file_reader = std::make_shared<MockFileReader>();
+        _statistics = std::make_unique<ChunkReader::Statistics>();
+    }
+
+    // Build a simple mock column chunk payload.
+    std::string create_mock_column_chunk(int32_t value, bool compressed = false) {
+        // Simplified: the chunk is just a single int32 value;
+        // a real Parquet column chunk layout is considerably more complex.
+        std::string data;
+        data.resize(sizeof(int32_t));
+        memcpy(data.data(), &value, sizeof(int32_t));
+
+        if (compressed) {
+            // TODO: add compression here if compressed chunks need to be covered.
+        }
+
+        return data;
+    }
+
+    std::shared_ptr<MockFileReader> _mock_file_reader;
+    std::unique_ptr<ChunkReader::Statistics> _statistics;
+};
+
+// Basic read path.
+TEST_F(ParquetColumnChunkFileReaderTest, BasicRead) {
+    // Prepare test data.
+    int32_t test_value = 42;
+    std::string chunk_data = create_mock_column_chunk(test_value);
+    _mock_file_reader->set_data(chunk_data);
+
+    // Create the ReferenceCountedReader.
+    io::PrefetchRange range {0, chunk_data.size()};
+    auto ref_reader =
+            std::make_shared<ReferenceCountedReader>(range, _mock_file_reader, *_statistics);
+
+    // Create the column chunk file reader.
+    std::vector<std::shared_ptr<ChunkReader>> chunks;
+    chunks.push_back(ref_reader);
+    ParquetColumnChunkFileReader chunk_reader(std::move(chunks), *_statistics);
+
+    // Read the data back.
+    char buffer[sizeof(int32_t)];
+    Slice result(buffer, sizeof(int32_t));
+    size_t bytes_read;
+    auto status = chunk_reader.read_at(0, result, &bytes_read, nullptr);
+    EXPECT_TRUE(status.ok());
+    EXPECT_EQ(bytes_read, sizeof(int32_t));
+
+    // Verify the value that was read.
+    int32_t read_value;
+    memcpy(&read_value, buffer, sizeof(int32_t));
+    EXPECT_EQ(read_value, test_value);
+
+    // Check the I/O statistics.
+    EXPECT_EQ(_statistics->merged_io, 1);
+    EXPECT_EQ(_statistics->merged_bytes, sizeof(int32_t));
+}
+
+// Reading from empty data.
+TEST_F(ParquetColumnChunkFileReaderTest, EmptyRead) {
+    _mock_file_reader->set_data("");
+
+    io::PrefetchRange range {0, 0};
+    auto ref_reader =
+            std::make_shared<ReferenceCountedReader>(range, _mock_file_reader, *_statistics);
+    std::vector<std::shared_ptr<ChunkReader>> chunks;
+    chunks.push_back(ref_reader);
+    ParquetColumnChunkFileReader chunk_reader(std::move(chunks), *_statistics);
+
+    char buffer[1];
+    Slice result(buffer, 1);
+    size_t bytes_read;
+    auto status = chunk_reader.read_at(0, result, &bytes_read, nullptr);
+    EXPECT_TRUE(status.ok());
+    EXPECT_EQ(bytes_read, 0);
+}
+
+// Reading a sub-range.
+TEST_F(ParquetColumnChunkFileReaderTest, RangeRead) {
+    // Build a chunk containing several values.
+    std::string chunk_data;
+    for (int32_t i = 0; i < 5; i++) {
+        chunk_data += create_mock_column_chunk(i);
+    }
+    _mock_file_reader->set_data(chunk_data);
+
+    // Read only a slice from the middle.
+    size_t start_offset = sizeof(int32_t);
+    size_t length = sizeof(int32_t) * 2;
+    io::PrefetchRange full_range {0, chunk_data.size()};
+    io::PrefetchRange read_range {start_offset, start_offset + length};
+
+    auto ref_reader =
+            std::make_shared<ReferenceCountedReader>(full_range, _mock_file_reader, *_statistics);
+    std::vector<std::shared_ptr<ChunkReader>> chunks;
+    chunks.push_back(ref_reader);
+    ParquetColumnChunkFileReader chunk_reader(std::move(chunks), *_statistics);
+
+    // Use unique_ptr to manage the dynamically allocated buffer.
+    auto buffer = std::make_unique<char[]>(length);
+    Slice result(buffer.get(), length);
+    size_t bytes_read;
+    auto status = chunk_reader.read_at(start_offset, result, &bytes_read, nullptr);
+    EXPECT_TRUE(status.ok());
+    EXPECT_EQ(bytes_read, length);
+
+    // Verify the values that were read.
+    std::vector<int32_t> read_values(2);
+    memcpy(read_values.data(), buffer.get(), length);
+    EXPECT_EQ(read_values[0], 1);
+    EXPECT_EQ(read_values[1], 2);
+}
+
+// Multiple chunk readers.
+TEST_F(ParquetColumnChunkFileReaderTest, MultipleChunks) {
+    // Create several chunks, each holding a different value.
+    std::vector<std::shared_ptr<ChunkReader>> chunks;
+    std::vector<int32_t> test_values = {10, 20, 30, 40, 50};
+    size_t chunk_size = sizeof(int32_t);
+
+    // Build a file reader that holds all of the data.
+    std::string all_data;
+    for (int32_t value : test_values) {
+        all_data += create_mock_column_chunk(value);
+    }
+    auto mock_reader = std::make_shared<MockFileReader>();
+    mock_reader->set_data(all_data);
+
+    // Create one ReferenceCountedReader per chunk, all sharing the same file reader.
+    for (size_t i = 0; i < test_values.size(); ++i) {
+        io::PrefetchRange range {i * chunk_size, (i + 1) * chunk_size};
+        auto ref_reader =
+                std::make_shared<ReferenceCountedReader>(range, mock_reader, *_statistics);
+        chunks.push_back(ref_reader);
+    }
+
+    ParquetColumnChunkFileReader chunk_reader(std::move(chunks), *_statistics);
+
+    // Read and verify each chunk.
+    for (size_t i = 0; i < test_values.size(); ++i) {
+        char buffer[sizeof(int32_t)];
+        Slice result(buffer, sizeof(int32_t));
+        size_t bytes_read;
+        auto status = chunk_reader.read_at(i * chunk_size, result, &bytes_read, nullptr);
+        EXPECT_TRUE(status.ok());
+        EXPECT_EQ(bytes_read, chunk_size);
+
+        int32_t read_value;
+        memcpy(&read_value, buffer, sizeof(int32_t));
+        EXPECT_EQ(read_value, test_values[i]);
+    }
+}
+
+// Consecutive sequential reads.
+TEST_F(ParquetColumnChunkFileReaderTest, SequentialReads) {
+    // Create two chunks.
+    std::vector<std::shared_ptr<ChunkReader>> chunks;
+    std::vector<int32_t> test_values = {100, 200};
+    size_t chunk_size = sizeof(int32_t);
+    size_t total_size = 0;
+
+    // Build a file reader that holds all of the data.
+    std::string all_data;
+    for (int32_t value : test_values) {
+        all_data += create_mock_column_chunk(value);
+    }
+    auto mock_reader = std::make_shared<MockFileReader>();
+    mock_reader->set_data(all_data);
+
+    // Create one ReferenceCountedReader per chunk.
+    for (size_t i = 0; i < test_values.size(); ++i) {
+        io::PrefetchRange range {i * chunk_size, (i + 1) * chunk_size};
+        auto ref_reader =
+                std::make_shared<ReferenceCountedReader>(range, mock_reader, *_statistics);
+        chunks.push_back(ref_reader);
+        total_size += chunk_size;
+    }
+
+    ParquetColumnChunkFileReader chunk_reader(std::move(chunks), *_statistics);
+
+    // Read small pieces consecutively.
+    for (size_t offset = 0; offset < total_size; offset += 2) {
+        char buffer[2];
+        Slice result(buffer, 2);
+        size_t bytes_read;
+        auto status = chunk_reader.read_at(offset, result, &bytes_read, nullptr);
+        EXPECT_TRUE(status.ok());
+        EXPECT_GT(bytes_read, 0);
+    }
+}
+
+// Forward reads that skip over chunks.
+TEST_F(ParquetColumnChunkFileReaderTest, SkipReads) {
+    // Create several chunks.
+    std::vector<std::shared_ptr<ChunkReader>> chunks;
+    std::vector<int32_t> test_values = {1, 2, 3, 4, 5};
+    size_t chunk_size = sizeof(int32_t);
+
+    // Build a file reader that holds all of the data.
+    std::string all_data;
+    for (int32_t value : test_values) {
+        all_data += create_mock_column_chunk(value);
+    }
+    auto mock_reader = std::make_shared<MockFileReader>();
+    mock_reader->set_data(all_data);
+
+    // Create one ReferenceCountedReader per chunk.
+    for (size_t i = 0; i < test_values.size(); ++i) {
+        io::PrefetchRange range {i * chunk_size, (i + 1) * chunk_size};
+        auto ref_reader =
+                std::make_shared<ReferenceCountedReader>(range, mock_reader, *_statistics);
+        chunks.push_back(ref_reader);
+    }
+
+    ParquetColumnChunkFileReader chunk_reader(std::move(chunks), *_statistics);
+
+    // Skip forward: read the 1st, 3rd and 5th values.
+    std::vector<size_t> read_offsets = {0, 2 * chunk_size, 4 * chunk_size};
+    std::vector<int32_t> expected_values = {1, 3, 5};
+
+    for (size_t i = 0; i < read_offsets.size(); ++i) {
+        char buffer[sizeof(int32_t)];
+        Slice result(buffer, sizeof(int32_t));
+        size_t bytes_read;
+        auto status = chunk_reader.read_at(read_offsets[i], result, &bytes_read, nullptr);
+        EXPECT_TRUE(status.ok());
+        EXPECT_EQ(bytes_read, chunk_size);
+
+        int32_t read_value;
+        memcpy(&read_value, buffer, sizeof(int32_t));
+        EXPECT_EQ(read_value, expected_values[i]);
+    }
+}
+
+// Reading backwards (expected to fail).
+TEST_F(ParquetColumnChunkFileReaderTest, ReverseReads) {
+    // Create two chunks.
+    std::vector<std::shared_ptr<ChunkReader>> chunks;
+    std::vector<int32_t> test_values = {1, 2};
+    size_t chunk_size = sizeof(int32_t);
+
+    // Build a file reader that holds all of the data.
+    std::string all_data;
+    for (int32_t value : test_values) {
+        all_data += create_mock_column_chunk(value);
+    }
+    auto mock_reader = std::make_shared<MockFileReader>();
+    mock_reader->set_data(all_data);
+
+    // Create one ReferenceCountedReader per chunk.
+    for (size_t i = 0; i < test_values.size(); ++i) {
+        io::PrefetchRange range {i * chunk_size, (i + 1) * chunk_size};
+        auto ref_reader =
+                std::make_shared<ReferenceCountedReader>(range, mock_reader, *_statistics);
+        chunks.push_back(ref_reader);
+    }
+
+    ParquetColumnChunkFileReader chunk_reader(std::move(chunks), *_statistics);
+
+    // Read the later offset first.
+    char buffer[sizeof(int32_t)];
+    Slice result(buffer, sizeof(int32_t));
+    size_t bytes_read;
+    auto status = chunk_reader.read_at(chunk_size, result, &bytes_read, nullptr);
+    EXPECT_TRUE(status.ok());
+
+    // Then read the earlier offset, which should fail.
+    status = chunk_reader.read_at(0, result, &bytes_read, nullptr);
+    EXPECT_FALSE(status.ok());
+}
+
+// Reference counting.
+TEST_F(ParquetColumnChunkFileReaderTest, ReferenceCount) {
+    std::string chunk_data = create_mock_column_chunk(42);
+    _mock_file_reader->set_data(chunk_data);
+
+    io::PrefetchRange range {0, chunk_data.size()};
+    auto ref_reader =
+            std::make_shared<ReferenceCountedReader>(range, _mock_file_reader, *_statistics);
+
+    {
+        // Scope that bounds the chunk reader's lifetime.
+        std::vector<std::shared_ptr<ChunkReader>> chunks;
+        chunks.push_back(ref_reader);
+        ParquetColumnChunkFileReader chunk_reader(std::move(chunks), *_statistics);
+        EXPECT_EQ(ref_reader->reference_count(), 1);
+
+        // Read the data.
+        char buffer[sizeof(int32_t)];
+        Slice result(buffer, sizeof(int32_t));
+        size_t bytes_read;
+        auto status = chunk_reader.read_at(0, result, &bytes_read, nullptr);
+        EXPECT_TRUE(status.ok());
+    }
+    // After chunk_reader goes out of scope, the reference count should drop.
+    EXPECT_EQ(ref_reader->reference_count(), 0);
+}
+
+} // namespace doris::vectorized
diff --git a/be/test/vec/exec/format/parquet/reference_counted_reader_test.cpp b/be/test/vec/exec/format/parquet/reference_counted_reader_test.cpp
new file mode 100644
index 00000000000000..da50418e13ecf7
--- /dev/null
+++ b/be/test/vec/exec/format/parquet/reference_counted_reader_test.cpp
@@ -0,0 +1,214 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/exec/format/parquet/reference_counted_reader.h"
+
+#include <gtest/gtest.h>
+
+#include <memory>
+#include <string>
+
+#include "io/fs/file_reader.h"
+#include "io/fs/local_file_system.h"
+#include "util/slice.h"
+
+namespace doris::vectorized {
+
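+// Minimal in-memory FileReader: serves reads out of a std::string so the
+// ReferenceCountedReader can be exercised without touching the filesystem.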
+class MockFileReader : public io::FileReader {
+public:
+    MockFileReader() = default;
+    ~MockFileReader() override = default;
+
+    Status close() override {
+        _closed = true;
+        return Status::OK();
+    }
+
+    const io::Path& path() const override { return _path; }
+
+    size_t size() const override { return _data.size(); }
+
+    bool closed() const override { return _closed; }
+
+    void set_data(const std::string& data) { _data = data; }
+
+protected:
+    Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
+                        const io::IOContext* io_ctx) override {
+        if (offset >= _data.size()) {
+            *bytes_read = 0;
+            return Status::OK();
+        }
+        *bytes_read = std::min(result.size, _data.size() - offset);
+        memcpy(result.data, _data.data() + offset, *bytes_read);
+        return Status::OK();
+    }
+
+private:
+    std::string _data;
+    bool _closed = false;
+    io::Path _path = "/tmp/mock";
+};
+
+class ReferenceCountedReaderTest : public ::testing::Test {
+protected:
+    void SetUp() override {
+        _mock_file_reader = std::make_shared<MockFileReader>();
+        _statistics = std::make_unique<ChunkReader::Statistics>();
+    }
+
+    std::shared_ptr<MockFileReader> _mock_file_reader;
+    std::unique_ptr<ChunkReader::Statistics> _statistics;
+};
+
+// Basic reference counting.
+TEST_F(ReferenceCountedReaderTest, ReferenceCountBasic) {
+    io::PrefetchRange range {0, 10};
+    auto reader = std::make_shared<ReferenceCountedReader>(range, _mock_file_reader, *_statistics);
+
+    // The initial reference count should be 1.
+    EXPECT_EQ(reader->reference_count(), 1);
+
+    // Increase the reference count.
+    reader->addReference();
+    EXPECT_EQ(reader->reference_count(), 2);
+
+    // Decrease the reference count.
+    reader->free();
+    EXPECT_EQ(reader->reference_count(), 1);
+
+    // Final release.
+    reader->free();
+    // Once the count reaches 0, the reader's resources should have been released.
+}
+
+// Basic read path.
+TEST_F(ReferenceCountedReaderTest, ReadData) {
+    std::string test_data = "Hello, World!";
+    _mock_file_reader->set_data(test_data);
+
+    io::PrefetchRange range {0, test_data.size()};
+    auto reader = std::make_shared<ReferenceCountedReader>(range, _mock_file_reader, *_statistics);
+
+    std::shared_ptr<Slice> result;
+    auto status = reader->read(nullptr, &result);
+    EXPECT_TRUE(status.ok());
+    EXPECT_EQ(result->size, test_data.size());
+    EXPECT_EQ(std::string(result->data, result->size), test_data);
+
+    // Check the I/O statistics.
+    EXPECT_EQ(_statistics->merged_io, 1);
+    EXPECT_EQ(_statistics->merged_bytes, test_data.size());
+}
+
+// Repeated reads of the same range should trigger only a single I/O.
+TEST_F(ReferenceCountedReaderTest, CacheRead) {
+    std::string test_data = "Hello, World!";
+    _mock_file_reader->set_data(test_data);
+
+    io::PrefetchRange range {0, test_data.size()};
+    auto reader = std::make_shared<ReferenceCountedReader>(range, _mock_file_reader, *_statistics);
+
+    // First read.
+    std::shared_ptr<Slice> result1;
+    auto status = reader->read(nullptr, &result1);
+    EXPECT_TRUE(status.ok());
+
+    // Second read.
+    std::shared_ptr<Slice> result2;
+    status = reader->read(nullptr, &result2);
+    EXPECT_TRUE(status.ok());
+
+    // Both reads should return the same data.
+    EXPECT_EQ(std::string(result1->data, result1->size), std::string(result2->data, result2->size));
+
+    // But only one I/O operation should have been issued.
+    EXPECT_EQ(_statistics->merged_io, 1);
+    EXPECT_EQ(_statistics->merged_bytes, test_data.size());
+}
+
+// Reading a partial range.
+TEST_F(ReferenceCountedReaderTest, PartialRead) {
+    std::string test_data = "Hello, World!";
+    _mock_file_reader->set_data(test_data);
+
+    // Read only part of the data.
+    io::PrefetchRange range {0, 5}; // only "Hello"
+    auto reader = std::make_shared<ReferenceCountedReader>(range, _mock_file_reader, *_statistics);
+
+    std::shared_ptr<Slice> result;
+    auto status = reader->read(nullptr, &result);
+    EXPECT_TRUE(status.ok());
+    EXPECT_EQ(result->size, 5);
+    EXPECT_EQ(std::string(result->data, result->size), "Hello");
+
+    EXPECT_EQ(_statistics->merged_io, 1);
+    EXPECT_EQ(_statistics->merged_bytes, 5);
+}
+
+// Reference counting with shared data.
+TEST_F(ReferenceCountedReaderTest, SharedData) {
+    std::string test_data = "Hello, World!";
+    _mock_file_reader->set_data(test_data);
+
+    io::PrefetchRange range {0, test_data.size()};
+    auto reader1 = std::make_shared<ReferenceCountedReader>(range, _mock_file_reader, *_statistics);
+
+    // Increase the reference count.
+    reader1->addReference();
+    auto reader2 = reader1;
+
+    // Both readers should share the same data.
+    std::shared_ptr<Slice> result1, result2;
+    auto status1 = reader1->read(nullptr, &result1);
+    auto status2 = reader2->read(nullptr, &result2);
+
+    EXPECT_TRUE(status1.ok());
+    EXPECT_TRUE(status2.ok());
+    EXPECT_EQ(result1->data, result2->data); // should point at the same memory
+
+    // Only one I/O operation should have been issued.
+    EXPECT_EQ(_statistics->merged_io, 1);
+
+    // Release the references.
+    reader1->free();
+    reader2->free();
+}
+
+// Large data read (disabled).
+//TEST_F(ReferenceCountedReaderTest, LargeDataRead) {
+//    // Build data larger than MAX_ARRAY_SIZE.
+//    std::string large_data(ReferenceCountedReader::MAX_ARRAY_SIZE + 1, 'A');
+//    _mock_file_reader->set_data(large_data);
+//
+//    io::PrefetchRange range{0, large_data.size()};
+//    auto reader = std::make_shared<ReferenceCountedReader>(range, _mock_file_reader, *_statistics);
+//
+//    std::shared_ptr<Slice> result;
+//    auto status = reader->read(nullptr, &result);
+//
+//    // Should return an error because the data is too large.
+//    EXPECT_FALSE(status.ok());
+//    EXPECT_TRUE(status.to_string().find("Data size exceeds maximum allowed size") != std::string::npos);
+//}
+
+} // namespace doris::vectorized
diff --git a/be/test/vec/exec/parquet/parquet_thrift_test.cpp b/be/test/vec/exec/parquet/parquet_thrift_test.cpp
index fe2221bf8d3725..ff8b8de39563d0 100644
--- a/be/test/vec/exec/parquet/parquet_thrift_test.cpp
+++ b/be/test/vec/exec/parquet/parquet_thrift_test.cpp
@@ -186,9 +186,10 @@ static Status get_column_values(io::FileReaderSPtr file_reader, tparquet::Column
                                 FieldSchema* field_schema, ColumnPtr& doris_column,
                                 DataTypePtr& data_type, level_t* definitions) {
     tparquet::ColumnMetaData chunk_meta = column_chunk->meta_data;
-    size_t start_offset = chunk_meta.__isset.dictionary_page_offset
-                                  ? chunk_meta.dictionary_page_offset
-                                  : chunk_meta.data_page_offset;
+    size_t start_offset =
+            chunk_meta.__isset.dictionary_page_offset && chunk_meta.dictionary_page_offset > 0
+                    ? chunk_meta.dictionary_page_offset
+                    : chunk_meta.data_page_offset;
     size_t chunk_size = chunk_meta.total_compressed_size;
 
     cctz::time_zone ctz;