Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(inverted_index.search): add index applier #2

Closed
wants to merge 16 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -197,3 +197,6 @@ strip = true
lto = "thin"
debug = false
incremental = false

[patch.'https://github.com/GreptimeTeam/greptime-proto.git']
greptime-proto = { git = "https://github.com/zhongzc/greptime-proto.git", branch = "zhongzc/index-add-row-count" }
1 change: 1 addition & 0 deletions src/common/meta/src/rpc/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ impl TryFrom<SubmitDdlTaskRequest> for PbSubmitDdlTaskRequest {
schema_name: task.schema,
table_name: task.table,
table_id: Some(api::v1::TableId { id: task.table_id }),
drop_if_exists: false,
}),
}),
DdlTask::AlterTable(task) => Task::AlterTableTask(PbAlterTableTask {
Expand Down
13 changes: 12 additions & 1 deletion src/index/src/inverted_index/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ pub enum Error {
payload_size: u64,
},

#[snafu(display("Unexpected zero segment row count"))]
UnexpectedZeroSegmentRowCount { location: Location },

#[snafu(display("Failed to decode fst"))]
DecodeFst {
#[snafu(source)]
Expand Down Expand Up @@ -109,6 +112,12 @@ pub enum Error {
location: Location,
predicates: Vec<Predicate>,
},

#[snafu(display("Tag not found in index, tag_name: {tag_name}"))]
TagNotFoundInIndex {
tag_name: String,
location: Location,
},
}

impl ErrorExt for Error {
Expand All @@ -118,6 +127,7 @@ impl ErrorExt for Error {
Seek { .. }
| Read { .. }
| UnexpectedFooterPayloadSize { .. }
| UnexpectedZeroSegmentRowCount { .. }
| UnexpectedOffsetSize { .. }
| UnexpectedBlobSize { .. }
| DecodeProto { .. }
Expand All @@ -128,7 +138,8 @@ impl ErrorExt for Error {
| ParseDFA { .. }
| KeysApplierWithoutInList { .. }
| IntersectionApplierWithInList { .. }
| EmptyPredicates { .. } => StatusCode::InvalidArguments,
| EmptyPredicates { .. }
| TagNotFoundInIndex { .. } => StatusCode::InvalidArguments,
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/index/src/inverted_index/format/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use crate::inverted_index::FstMap;
/// InvertedIndexReader defines an asynchronous reader of inverted index data
#[mockall::automock]
#[async_trait]
pub trait InvertedIndexReader {
pub trait InvertedIndexReader: Send {
/// Retrieve metadata of all inverted indices stored within the blob.
async fn metadata(&mut self) -> Result<InvertedIndexMetas>;

Expand Down
6 changes: 5 additions & 1 deletion src/index/src/inverted_index/format/reader/blob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,11 @@ mod tests {
};

// metas
let mut metas = InvertedIndexMetas::default();
let mut metas = InvertedIndexMetas {
total_row_count: 10,
segment_row_count: 1,
..Default::default()
};
metas.metas.insert(meta.name.clone(), meta);
metas.metas.insert(meta1.name.clone(), meta1);
let mut meta_buf = Vec::new();
Expand Down
16 changes: 10 additions & 6 deletions src/index/src/inverted_index/format/reader/footer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use snafu::{ensure, ResultExt};

use crate::inverted_index::error::{
DecodeProtoSnafu, ReadSnafu, Result, SeekSnafu, UnexpectedFooterPayloadSizeSnafu,
UnexpectedOffsetSizeSnafu,
UnexpectedOffsetSizeSnafu, UnexpectedZeroSegmentRowCountSnafu,
};
use crate::inverted_index::format::FOOTER_PAYLOAD_SIZE_SIZE;

Expand Down Expand Up @@ -85,6 +85,11 @@ impl<R: AsyncRead + AsyncSeek + Unpin> InvertedIndeFooterReader<R> {

/// Check if the read metadata is consistent with expected sizes and offsets.
fn validate_metas(&self, metas: &InvertedIndexMetas, payload_size: u64) -> Result<()> {
ensure!(
metas.segment_row_count > 0,
UnexpectedZeroSegmentRowCountSnafu
);

for meta in metas.metas.values() {
let InvertedIndexMeta {
base_offset,
Expand Down Expand Up @@ -116,7 +121,10 @@ mod tests {
use super::*;

fn create_test_payload(meta: InvertedIndexMeta) -> Vec<u8> {
let mut metas = InvertedIndexMetas::default();
let mut metas = InvertedIndexMetas {
segment_row_count: 1,
..Default::default()
};
metas.metas.insert("test".to_string(), meta);

let mut payload_buf = vec![];
Expand All @@ -131,7 +139,6 @@ mod tests {
async fn test_read_payload() {
let meta = InvertedIndexMeta {
name: "test".to_string(),
segment_row_count: 4096,
..Default::default()
};

Expand All @@ -145,14 +152,12 @@ mod tests {
assert_eq!(metas.metas.len(), 1);
let index_meta = &metas.metas.get("test").unwrap();
assert_eq!(index_meta.name, "test");
assert_eq!(index_meta.segment_row_count, 4096);
}

#[tokio::test]
async fn test_invalid_footer_payload_size() {
let meta = InvertedIndexMeta {
name: "test".to_string(),
segment_row_count: 4096,
..Default::default()
};

Expand All @@ -171,7 +176,6 @@ mod tests {
name: "test".to_string(),
base_offset: 0,
inverted_index_size: 1, // Set size to 1 to make it exceed the blob size
segment_row_count: 4096,
..Default::default()
};

Expand Down
1 change: 1 addition & 0 deletions src/index/src/inverted_index/search.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,5 @@

pub mod fst_apply;
pub mod fst_values_mapper;
pub mod index_apply;
pub mod predicate;
1 change: 1 addition & 0 deletions src/index/src/inverted_index/search/fst_apply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use crate::inverted_index::FstMap;

/// A trait for objects that can process a finite state transducer (FstMap) and return
/// associated values.
#[mockall::automock]
pub trait FstApplier: Send + Sync {
/// Retrieves values from an FstMap.
///
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ impl KeysFstApplier {
fn get_list(p: &Predicate) -> &HashSet<Bytes> {
match p {
Predicate::InList(i) => &i.list,
_ => unreachable!(), // `in_lists` is filtered by `split_at_in_lists
_ => unreachable!(), // `in_lists` is filtered by `split_at_in_lists`
}
}

Expand Down
32 changes: 32 additions & 0 deletions src/index/src/inverted_index/search/index_apply.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

mod predicates_apply;

use async_trait::async_trait;
pub use predicates_apply::PredicatesIndexApplier;

use crate::inverted_index::error::Result;
use crate::inverted_index::format::reader::InvertedIndexReader;

/// A trait for processing and transforming indices obtained from an inverted index.
///
/// Applier instances are reusable and work with various `InvertedIndexReader` instances,
/// avoiding repeated compilation of fixed predicates such as regex patterns.
///
/// `apply` takes `&self`, so applying does not require mutable access to the applier;
/// only the reader is borrowed mutably for the duration of the call.
#[async_trait]
pub trait IndexApplier {
/// Applies the predefined predicates to the data read by the given index reader, returning
/// a list of relevant indices (e.g., post IDs, group IDs, row IDs).
///
/// The reader is passed as a trait object (`&mut dyn InvertedIndexReader`), so any reader
/// implementation can be supplied.
///
/// # Errors
///
/// Returns the inverted-index module's `Result` error type; the concrete failure
/// conditions are determined by the implementor.
async fn apply(&self, reader: &mut dyn InvertedIndexReader) -> Result<Vec<usize>>;
}
Loading
Loading