diff --git a/src/io/tablet_io.cc b/src/io/tablet_io.cc index 75c2a73d8..24a12ce1b 100644 --- a/src/io/tablet_io.cc +++ b/src/io/tablet_io.cc @@ -562,6 +562,21 @@ bool TabletIO::Read(const leveldb::Slice& key, std::string* value, return true; } +void TabletIO::SeekIterator(const std::string& row, const std::string& col, + const std::string& qual, int64_t ts, + leveldb::Iterator* scan_it) { + std::string seek_key; + m_key_operator->EncodeTeraKey(row, col, qual, ts, leveldb::TKT_FORSEEK, + &seek_key); + + VLOG(10) << "ll-scan: " << "seek to " << DebugString(row) << ":" + << DebugString(col) << ":" << DebugString(qual) + << std::hex << ts; + + scan_it->Seek(seek_key); + VLOG(10) << "ll-scan: seek done"; +} + StatusCode TabletIO::InitedScanInterator(const std::string& start_tera_key, const ScanOptions& scan_options, leveldb::Iterator** scan_it) { @@ -586,6 +601,7 @@ StatusCode TabletIO::InitedScanInterator(const std::string& start_tera_key, VLOG(10) << "ll-scan: " << "startkey=[" << DebugString(start_key.ToString()) << ":" << DebugString(start_col.ToString()) << ":" << DebugString(start_qual.ToString()); std::string start_seek_key; + m_key_operator->EncodeTeraKey(start_key.ToString(), "", "", kLatestTs, leveldb::TKT_FORSEEK, &start_seek_key); (*scan_it)->Seek(start_seek_key); @@ -638,8 +654,9 @@ inline bool TabletIO::LowLevelScan(const std::string& start_tera_key, *read_bytes = 0; int64_t now_time = GetTimeStampInMs(); int64_t time_out = now_time + scan_options.timeout; + bool has_filter = scan_options.filter_list.filter_size() > 0; KeyValuePair next_start_kv_pair; - VLOG(9) << "ll-scan timeout set to be " << scan_options.timeout; + VLOG(9) << "ll-scan timeout set to be " << scan_options.timeout << ", has filter " << has_filter; for (; it->Valid();) { bool has_merged = false; @@ -658,11 +675,6 @@ inline bool TabletIO::LowLevelScan(const std::string& start_tera_key, it->Next(); continue; } - if (now_time > time_out) { - VLOG(9) << "ll-scan timeout. 
Mark next start key: " << DebugString(tera_key.ToString()); - MakeKvPair(key, col, qual, ts, "", &next_start_kv_pair); - break; - } VLOG(10) << "ll-scan: " << "tablet=[" << m_tablet_path << "] key=[" << DebugString(key.ToString()) @@ -675,6 +687,62 @@ inline bool TabletIO::LowLevelScan(const std::string& start_tera_key, break; } + if (now_time > time_out) { + VLOG(9) << "ll-scan timeout. Mark next start key: " << DebugString(tera_key.ToString()); + MakeKvPair(key, col, qual, ts, "", &next_start_kv_pair); + break; + } + + // qualifier range seek + if (scan_options.qu_range.size()) { + VLOG(10) << "filter by qualifier, size " << scan_options.qu_range.size(); + QualifierRange::const_iterator qu_it; + qu_it = scan_options.qu_range.lower_bound(col.ToString()); + if (qu_it == scan_options.qu_range.end()) { + // seek to next key + std::string next_key; + m_key_operator->FindSuccessor(key.ToString(), &next_key); + SeekIterator(next_key, "", "", kLatestTs, it); + VLOG(10) << "seek to next_key " << DebugString(next_key); + continue; + } else { + leveldb::Slice scan_cf = qu_it->first; + if (scan_cf.compare(col) > 0) { + // seek to next cf + SeekIterator(key.ToString(), scan_cf.ToString(), "", kLatestTs, it); + VLOG(10) << "seek to next cf " << DebugString(scan_cf.ToString()); + continue; + } + // check whether qual falls inside this cf's qualifier range + leveldb::Slice scan_qu_start, scan_qu_end; + const std::pair& scan_qu_range = qu_it->second; + scan_qu_start = scan_qu_range.first; + scan_qu_end = scan_qu_range.second; + // qualifier range does not match + if (scan_qu_start.compare(qual) > 0) { + // seek to start qual + SeekIterator(key.ToString(), scan_cf.ToString(), scan_qu_start.ToString(), kLatestTs, it); + continue; + } else if (scan_qu_end.compare(qual) < 0) { + // seek to next cf + ++qu_it; + if (qu_it == scan_options.qu_range.end()) { + // seek to next key + std::string next_key; + m_key_operator->FindSuccessor(key.ToString(), &next_key); + SeekIterator(next_key, "", "", kLatestTs, 
it); + continue; + } else { + scan_cf = qu_it->first; + // seek to next cf + SeekIterator(key.ToString(), scan_cf.ToString(), "", kLatestTs, it); + continue; + } + } + // qualifier range match + } + } + const std::set& cf_set = scan_options.iter_cf_set; if (cf_set.size() > 0 && cf_set.find(col.ToString()) == cf_set.end() && @@ -709,8 +777,10 @@ inline bool TabletIO::LowLevelScan(const std::string& start_tera_key, if (key.compare(last_key) != 0) { *read_row_count += 1; - ProcessRowBuffer(row_buf, scan_options, value_list, &buffer_size); - row_buf.clear(); + if (has_filter) { + ProcessRowBuffer(row_buf, scan_options, value_list, &buffer_size); + row_buf.clear(); + } } // max version filter @@ -744,7 +814,14 @@ inline bool TabletIO::LowLevelScan(const std::string& start_tera_key, KeyValuePair kv; MakeKvPair(key, col, qual, ts, value, &kv); - row_buf.push_back(kv); + if (!has_filter) { + if (!FilterCell(scan_options, col.ToString(), qual.ToString(), ts)) { + value_list->add_key_values()->CopyFrom(kv); + buffer_size += key.size() + col.size() + qual.size() + sizeof(ts) + value.size(); + } + } else { + row_buf.push_back(kv); + } // check scan buffer if (buffer_size >= scan_options.max_size) { @@ -757,8 +834,9 @@ inline bool TabletIO::LowLevelScan(const std::string& start_tera_key, } // process the last row of tablet - ProcessRowBuffer(row_buf, scan_options, value_list, &buffer_size); - + if (has_filter) { + ProcessRowBuffer(row_buf, scan_options, value_list, &buffer_size); + } leveldb::Status it_status; if (!it->Valid()) { it_status = it->status(); @@ -1425,6 +1503,15 @@ void TabletIO::SetupScanRowOptions(const ScanTabletRequest* request, if (request->timeout()) { scan_options->timeout = request->timeout(); } + // setup qualifier range + if (request->qu_range_size()) { + for (uint32_t i = 0; i < (uint32_t)request->qu_range_size(); i++) { + const ScanQualifierRange& range = request->qu_range(i); + scan_options->qu_range.insert( + std::pair >(range.cf(), + 
std::pair(range.qu_start(), range.qu_end()))); + } + } scan_options->snapshot_id = request->snapshot_id(); } @@ -1677,31 +1764,38 @@ void TabletIO::ProcessRowBuffer(std::list& row_buf, const std::string& value = it->value(); int64_t ts = it->timestamp(); - // skip unnecessary columns and qualifiers - if (scan_options.column_family_list.size() > 0) { - ColumnFamilyMap::const_iterator it = - scan_options.column_family_list.find(col); - if (it != scan_options.column_family_list.end()) { - const std::set& qual_list = it->second; - if (qual_list.size() > 0 && qual_list.end() == qual_list.find(qual)) { - continue; - } - } else { - continue; - } - } - // time range filter - if (ts < scan_options.ts_start || ts > scan_options.ts_end) { + if (FilterCell(scan_options, col, qual, ts)) { continue; } - value_list->add_key_values()->CopyFrom(*it); - *buffer_size += key.size() + col.size() + qual.size() + sizeof(ts) + value.size(); } } +bool TabletIO::FilterCell(const ScanOptions& scan_options, + const std::string& col, + const std::string& qual, int64_t ts) { + // skip unnecessary columns and qualifiers + if (scan_options.column_family_list.size() > 0) { + ColumnFamilyMap::const_iterator it = + scan_options.column_family_list.find(col); + if (it != scan_options.column_family_list.end()) { + const std::set& qual_list = it->second; + if (qual_list.size() > 0 && qual_list.end() == qual_list.find(qual)) { + return true; + } + } else { + return true; + } + } + // time range filter + if (ts < scan_options.ts_start || ts > scan_options.ts_end) { + return true; + } + return false; +} + uint64_t TabletIO::GetSnapshot(uint64_t id, uint64_t snapshot_sequence, StatusCode* status) { { diff --git a/src/io/tablet_io.h b/src/io/tablet_io.h index 1934b88c8..21a2cbf99 100644 --- a/src/io/tablet_io.h +++ b/src/io/tablet_io.h @@ -44,11 +44,13 @@ class TabletIO { kUnLoading2 = kTabletUnLoading2 }; typedef std::map< std::string, std::set > ColumnFamilyMap; + typedef std::map< std::string, std::pair > 
QualifierRange; struct ScanOptions { uint32_t max_versions; uint32_t max_size; int64_t ts_start; int64_t ts_end; + QualifierRange qu_range;// {cf, (qu_start, qu_end)} uint64_t snapshot_id; FilterList filter_list; ColumnFamilyMap column_family_list; @@ -188,6 +190,9 @@ class TabletIO { const ScanOptions& scan_options, RowResult* value_list, uint32_t* buffer_size); + bool FilterCell(const ScanOptions& scan_options, + const std::string& col, + const std::string& qual, int64_t ts); StatusCode InitedScanInterator(const std::string& start_tera_key, const ScanOptions& scan_options, @@ -220,6 +225,11 @@ class TabletIO { void MakeKvPair(leveldb::Slice key, leveldb::Slice col, leveldb::Slice qual, int64_t ts, leveldb::Slice value, KeyValuePair* kv); + + void SeekIterator(const std::string& row, const std::string& col, + const std::string& qual, int64_t ts, + leveldb::Iterator* scan_it); + private: mutable Mutex m_mutex; TabletWriter* m_async_writer; diff --git a/src/leveldb/include/leveldb/raw_key_operator.h b/src/leveldb/include/leveldb/raw_key_operator.h index b80390cf4..a575a24b2 100644 --- a/src/leveldb/include/leveldb/raw_key_operator.h +++ b/src/leveldb/include/leveldb/raw_key_operator.h @@ -42,6 +42,9 @@ class RawKeyOperator { TeraKeyType* type) const = 0; virtual int Compare(const Slice& key1, const Slice& key2) const = 0; + + virtual void FindSuccessor(const std::string& row_key, + std::string* successor_key) const = 0; }; const RawKeyOperator* ReadableRawKeyOperator(); diff --git a/src/leveldb/util/raw_key_operator.cc b/src/leveldb/util/raw_key_operator.cc index 16bbd2899..ada615c17 100644 --- a/src/leveldb/util/raw_key_operator.cc +++ b/src/leveldb/util/raw_key_operator.cc @@ -102,6 +102,12 @@ class ReadableRawKeyOperatorImpl : public RawKeyOperator { virtual int Compare(const Slice& key1, const Slice& key2) const { return key1.compare(key2); } + + virtual void FindSuccessor(const std::string& row_key, + std::string* successor_key) const { + *successor_key = row_key; + 
successor_key->push_back('\x1'); + } }; /** @@ -228,6 +234,12 @@ class BinaryRawKeyOperatorImpl : public RawKeyOperator { Slice ts_type2(data2 + size2 - 12, 8); return ts_type1.compare(ts_type2); } + + virtual void FindSuccessor(const std::string& row_key, + std::string* successor_key) const { + *successor_key = row_key; + successor_key->push_back('\x0'); + } }; // support KV-pair with TTL, Key's format : @@ -275,6 +287,12 @@ class KvRawKeyOperatorImpl : public RawKeyOperator { } return r; } + + virtual void FindSuccessor(const std::string& row_key, + std::string* successor_key) const { + *successor_key = row_key; + successor_key->push_back('\x0'); + } }; static pthread_once_t once = PTHREAD_ONCE_INIT; diff --git a/src/proto/tabletnode_rpc.proto b/src/proto/tabletnode_rpc.proto index 53b0df546..8042177ac 100644 --- a/src/proto/tabletnode_rpc.proto +++ b/src/proto/tabletnode_rpc.proto @@ -211,6 +211,11 @@ message ResultCell { optional bytes value = 5; }; +message ScanQualifierRange { + optional bytes cf = 1; + optional bytes qu_start = 2; + optional bytes qu_end = 3; +}; message ScanTabletRequest { optional uint64 sequence_id = 1; @@ -232,6 +237,7 @@ message ScanTabletRequest { optional bool part_of_session = 17; optional int64 timestamp = 18 [default = 0]; optional int64 timeout = 19; + repeated ScanQualifierRange qu_range = 20; } message ScanTabletResponse { diff --git a/src/sdk/scan.cc b/src/sdk/scan.cc index b9cfa9d88..23f736535 100644 --- a/src/sdk/scan.cc +++ b/src/sdk/scan.cc @@ -35,6 +35,12 @@ void ScanDescriptor::SetPackInterval(int64_t interval) { _impl->SetPackInterval(interval); } +void ScanDescriptor::AddQualifierRange(const std::string& cf, + const std::string& qu_start, + const std::string& qu_end) { + _impl->AddQualifierRange(cf, qu_start, qu_end); +} + void ScanDescriptor::SetTimeRange(int64_t ts_end, int64_t ts_start) { _impl->SetTimeRange(ts_end, ts_start); } diff --git a/src/sdk/scan_impl.cc b/src/sdk/scan_impl.cc index 0ba89d089..b51782c23 
100644 --- a/src/sdk/scan_impl.cc +++ b/src/sdk/scan_impl.cc @@ -536,6 +536,7 @@ ScanDescImpl::ScanDescImpl(const ScanDescImpl& impl) for (int32_t i = 0; i < impl.GetSizeofColumnFamilyList(); ++i) { _cf_list.push_back(new tera::ColumnFamily(*(impl.GetColumnFamily(i)))); } + _qu_range = impl._qu_range; } ScanDescImpl::~ScanDescImpl() { @@ -589,6 +590,28 @@ void ScanDescImpl::SetPackInterval(int64_t interval) { _pack_interval = interval; } +void ScanDescImpl::AddQualifierRange(const std::string& cf, + const std::string& qu_start, + const std::string& qu_end) { + if (cf.size()) { + VLOG(12) << "add qual, " << cf << ":" << qu_start << ":" << qu_end; + _qu_range.insert(std::pair >(cf, std::pair(qu_start, qu_end))); + } +} + +void ScanDescImpl::SetQualifierRange(ScanTabletRequest* request) { + std::map >::iterator it = _qu_range.begin(); + for (; it != _qu_range.end(); ++it) { + ScanQualifierRange* qu_range = request->add_qu_range(); + qu_range->set_cf(it->first); + const std::pair& range = it->second; + qu_range->set_qu_start(range.first); + qu_range->set_qu_end(range.second); + VLOG(12) << "set qual " << qu_range->DebugString(); + } +} + void ScanDescImpl::SetTimeRange(int64_t ts_end, int64_t ts_start) { if (_timer_range == NULL) { _timer_range = new tera::TimeRange; diff --git a/src/sdk/scan_impl.h b/src/sdk/scan_impl.h index d0cbca2f4..a902909ce 100644 --- a/src/sdk/scan_impl.h +++ b/src/sdk/scan_impl.h @@ -183,6 +183,12 @@ class ScanDescImpl { void SetPackInterval(int64_t timeout); + void AddQualifierRange(const std::string& cf, + const std::string& qu_start, + const std::string& qu_end); + + void SetQualifierRange(ScanTabletRequest* request); + void SetTimeRange(int64_t ts_end, int64_t ts_start); bool SetFilterString(const std::string& filter_string); @@ -236,6 +242,8 @@ class ScanDescImpl { bool IsKvOnlyTable(); + typedef std::map< std::string, std::pair > QualifierRange; + private: bool ParseSubFilterString(const std::string& filter_str, Filter* filter); @@ 
-249,6 +257,7 @@ class ScanDescImpl { int64_t _start_timestamp; std::vector _cf_list; tera::TimeRange* _timer_range; + QualifierRange _qu_range; int64_t _buf_size; bool _is_async; int32_t _max_version; diff --git a/src/sdk/table_impl.cc b/src/sdk/table_impl.cc index ef4183375..565159989 100644 --- a/src/sdk/table_impl.cc +++ b/src/sdk/table_impl.cc @@ -358,6 +358,8 @@ void TableImpl::CommitScan(ScanTask* scan_task, tera::ColumnFamily* column_family = request->add_cf_list(); column_family->CopyFrom(*(impl->GetColumnFamily(i))); } + // set qualifier range + impl->SetQualifierRange(request); request->set_timestamp(common::timer::get_micros()); Closure* done = diff --git a/src/sdk/tera.h b/src/sdk/tera.h index dbb7cdbbf..d2d3dac98 100644 --- a/src/sdk/tera.h +++ b/src/sdk/tera.h @@ -259,6 +259,10 @@ class ScanDescriptor { void SetMaxVersions(int32_t versions); /// 设置scan的超时时间 void SetPackInterval(int64_t timeout); + /// add an inclusive qualifier range [qu_start, qu_end] to scan within column family cf + void AddQualifierRange(const std::string& cf, + const std::string& qu_start, + const std::string& qu_end); /// 设置返回版本的时间范围 void SetTimeRange(int64_t ts_end, int64_t ts_start); /// 设置过滤表达式(仅支持AND)