Skip to content

Commit

Permalink
Prepare index (project-openubl#314)
Browse files Browse the repository at this point in the history
  • Loading branch information
carlosthe19916 authored Jan 8, 2024
1 parent 7370690 commit d8a867f
Show file tree
Hide file tree
Showing 25 changed files with 2,105 additions and 114 deletions.
717 changes: 664 additions & 53 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,5 @@ members = [
"openubl/cli",
"openubl/oidc",
"openubl/storage",
# "openubl/index",
]
2 changes: 1 addition & 1 deletion openubl/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,5 @@ docker-compose -f openubl/deploy/compose/compose.yaml up
```

```shell
RUST_LOG=info cargo watch -x 'run -p openubl-cli -- server --db-user user --db-password password --oidc-auth-server-url http://localhost:9001/realms/openubl --storage-type minio --storage-minio-host http://localhost:9002 --storage-minio-bucket openubl --storage-minio-access-key admin --storage-minio-secret-key password'
RUST_LOG=info cargo watch -x 'run -p openubl-cli -- server --db-user user --db-password password --oidc-auth-server-url http://localhost:9001/realms/openubl minio --storage-minio-host http://localhost:9002 --storage-minio-access-key admin --storage-minio-secret-key password'
```
6 changes: 3 additions & 3 deletions openubl/deploy/compose/compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@ services:
environment:
MINIO_ROOT_USER: "admin"
MINIO_ROOT_PASSWORD: "password"
MINIO_NOTIFY_NATS_ENABLE_OPENUBL: "on"
MINIO_NOTIFY_NATS_ADDRESS_OPENUBL: "nats:4222"
MINIO_NOTIFY_NATS_SUBJECT_OPENUBL: "openubl"
MINIO_NOTIFY_NATS_ENABLE_UBL: "on"
MINIO_NOTIFY_NATS_ADDRESS_UBL: "nats:4222"
MINIO_NOTIFY_NATS_SUBJECT_UBL: "ubl"
healthcheck:
test: timeout 5s bash -c ':> /dev/tcp/127.0.0.1/9000' || exit 1
interval: 10s
Expand Down
7 changes: 4 additions & 3 deletions openubl/deploy/compose/scripts/minio/setup.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@
# Connect to minio
/usr/bin/mc config host add myminio http://minio:9000 admin password;

# Create bucket
/usr/bin/mc mb myminio/openubl || true;
# Create buckets
/usr/bin/mc mb myminio/ubl || true;
/usr/bin/mc mb myminio/index || true;

# Config events
/usr/bin/mc event add myminio/openubl arn:minio:sqs::OPENUBL:nats --event "put,delete";
/usr/bin/mc event add myminio/ubl arn:minio:sqs::UBL:nats --event "put,delete";

# Restart service
/usr/bin/mc admin service restart myminio;
24 changes: 24 additions & 0 deletions openubl/index/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
[package]
name = "openubl-index"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
openubl-storage = { path = "../storage" }

clap = {version = "4.4.11", features = ["derive", "env"]}
anyhow = "1.0.78"
thiserror = "1.0.53"
tantivy = "0.21.1"
humantime = "2.1.0"
bytesize = "1.3.0"
log = "0.4.20"
zstd = "0.13.0"
sha2 = "0.10.8"
tar = "0.4.40"
rand = "0.8.5"
time = "0.3.31"
reqwest = "0.11.23"
serde = "1.0.195"
2 changes: 2 additions & 0 deletions openubl/index/src/client/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
pub mod search;
pub mod write;
189 changes: 189 additions & 0 deletions openubl/index/src/client/search.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
use log::{debug, warn};
use tantivy::collector::TopDocs;
use tantivy::query::Query;
use tantivy::schema::{Field, Type};
use tantivy::{DateTime, DocAddress, Order, Searcher};

use crate::client::write::WriteIndex;
use crate::search_options::SearchOptions;
use crate::{Error, IndexStore};

/// A search query.
#[derive(Debug)]
pub struct SearchQuery {
/// The tantivy query to execute.
pub query: Box<dyn Query>,
/// A custom sort order to apply to the results.
pub sort_by: Option<(Field, Order)>,
}

/// Defines the interface for an index that can be searched.
pub trait SearchableIndex: WriteIndex {
/// Type of the matched document returned from a search.
type MatchedDocument: core::fmt::Debug;

/// Prepare a query for searching and return a query object.
fn prepare_query(&self, q: &str) -> Result<SearchQuery, Error>;

/// Search the index for a query and return a list of matched documents.
fn search(
&self,
searcher: &Searcher,
query: &dyn Query,
offset: usize,
limit: usize,
) -> Result<(Vec<(f32, DocAddress)>, usize), Error>;

/// Invoked for every matched document to process the document and return a result.
fn process_hit(
&self,
doc: DocAddress,
score: f32,
searcher: &Searcher,
query: &dyn Query,
options: &SearchOptions,
) -> Result<Self::MatchedDocument, Error>;
}

impl<INDEX: SearchableIndex> IndexStore<INDEX> {
/// Search the index for a given query and return matching documents.
pub fn search(
&self,
q: &str,
offset: usize,
limit: usize,
options: SearchOptions,
) -> Result<(Vec<INDEX::MatchedDocument>, usize), Error> {
if limit == 0 {
return Err(Error::InvalidLimitParameter(limit));
}

let inner = self.inner.read().unwrap();
let reader = inner.reader()?;
let searcher = reader.searcher();

let query = self.index.prepare_query(q)?;

log::trace!("Processed query: {:?}", query);

let (top_docs, count) = if let Some(sort_by) = query.sort_by {
let field = sort_by.0;
let order = sort_by.1;
let order_by_str = self.index.schema().get_field_name(field).to_string();
let vtype = self
.index
.schema()
.get_field_entry(field)
.field_type()
.value_type();
let mut hits = Vec::new();
let total = match vtype {
Type::U64 => {
let result = searcher.search(
&query.query,
&(
TopDocs::with_limit(limit)
.and_offset(offset)
.order_by_fast_field::<u64>(&order_by_str, order.clone()),
tantivy::collector::Count,
),
)?;
for r in result.0 {
hits.push((1.0, r.1));
}
result.1
}
Type::I64 => {
let result = searcher.search(
&query.query,
&(
TopDocs::with_limit(limit)
.and_offset(offset)
.order_by_fast_field::<i64>(&order_by_str, order.clone()),
tantivy::collector::Count,
),
)?;
for r in result.0 {
hits.push((1.0, r.1));
}
result.1
}
Type::F64 => {
let result = searcher.search(
&query.query,
&(
TopDocs::with_limit(limit)
.and_offset(offset)
.order_by_fast_field::<f64>(&order_by_str, order.clone()),
tantivy::collector::Count,
),
)?;
for r in result.0 {
hits.push((1.0, r.1));
}
result.1
}
Type::Bool => {
let result = searcher.search(
&query.query,
&(
TopDocs::with_limit(limit)
.and_offset(offset)
.order_by_fast_field::<bool>(&order_by_str, order.clone()),
tantivy::collector::Count,
),
)?;
for r in result.0 {
hits.push((1.0, r.1));
}
result.1
}
Type::Date => {
let result = searcher.search(
&query.query,
&(
TopDocs::with_limit(limit)
.and_offset(offset)
.order_by_fast_field::<DateTime>(&order_by_str, order.clone()),
tantivy::collector::Count,
),
)?;
for r in result.0 {
hits.push((1.0, r.1));
}
result.1
}
_ => return Err(Error::NotSortable(order_by_str)),
};
(hits, total)
} else {
self.index.search(&searcher, &query.query, offset, limit)?
};

log::info!("#matches={count} for query '{q}'");

if options.summaries {
let mut hits = Vec::new();
for hit in top_docs {
match self
.index
.process_hit(hit.1, hit.0, &searcher, &query.query, &options)
{
Ok(value) => {
debug!("HIT: {:?}", value);
hits.push(value);
}
Err(e) => {
warn!("Error processing hit {:?}: {:?}", hit, e);
}
}
}

debug!("Filtered to {}", hits.len());

Ok((hits, count))
} else {
Ok((Vec::new(), count))
}
}
}
73 changes: 73 additions & 0 deletions openubl/index/src/client/write.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
use tantivy::schema::Schema;
use tantivy::tokenizer::TokenizerManager;
use tantivy::{Document, IndexSettings, Term};

use crate::Error;

/// Defines the interface for an index that can be written to.
pub trait WriteIndex {
/// Input document type expected by the index.
type Document;

/// Name of the index. Must be unique across trait implementations.
fn name(&self) -> &str;

/// Tokenizers used by the index.
fn tokenizers(&self) -> Result<TokenizerManager, Error> {
Ok(TokenizerManager::default())
}

/// Parse a document from a byte slice.
fn parse_doc(&self, data: &[u8]) -> Result<Self::Document, Error>;

/// Index settings required for this index.
fn settings(&self) -> IndexSettings;

/// Schema required for this index.
fn schema(&self) -> Schema;

/// Process an input document and return a tantivy document to be added to the index.
fn index_doc(
&self,
id: &str,
document: &Self::Document,
) -> Result<Vec<(String, tantivy::Document)>, Error>;

/// Convert a document id to a term for referencing that document.
fn doc_id_to_term(&self, id: &str) -> Term;
}

impl<DOC> WriteIndex for Box<dyn WriteIndex<Document = DOC>> {
type Document = DOC;
fn name(&self) -> &str {
self.as_ref().name()
}

fn tokenizers(&self) -> Result<TokenizerManager, Error> {
self.as_ref().tokenizers()
}

fn parse_doc(&self, data: &[u8]) -> Result<Self::Document, Error> {
self.as_ref().parse_doc(data)
}

fn settings(&self) -> IndexSettings {
self.as_ref().settings()
}

fn schema(&self) -> Schema {
self.as_ref().schema()
}

fn index_doc(
&self,
id: &str,
document: &Self::Document,
) -> Result<Vec<(String, Document)>, Error> {
self.as_ref().index_doc(id, document)
}

fn doc_id_to_term(&self, id: &str) -> Term {
self.as_ref().doc_id_to_term(id)
}
}
64 changes: 64 additions & 0 deletions openubl/index/src/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
use std::fmt::Display;

#[derive(clap::Args, Debug)]
pub struct SearchEngine {
/// Synchronization interval for index persistence.
#[arg(
id = "search-engine-mode",
long,
env = "SEARCH_ENGINE_MODE",
default_value_t = IndexMode::File
)]
pub mode: IndexMode,

#[arg(
id = "search-engine-sync-interval",
long,
env = "SEARCH_ENGINE_SYNC_INTERVAL",
default_value = "30S"
)]
pub sync_interval: humantime::Duration,
#[arg(
id = "search-engine-writer-memory",
long,
env = "SEARCH_ENGINE_WRITER_MEMORY",
default_value = "default_value_t = ByteSize::mb(256)"
)]
pub index_writer_memory_bytes: bytesize::ByteSize,

#[arg(
id = "search-engine-dir",
long,
env = "SEARCH_ENGINE_DIR",
default_value = "indexes"
)]
pub index_dir: Option<std::path::PathBuf>,
#[arg(
id = "search-engine-dir",
long,
env = "SEARCH_ENGINE_BUCKET",
default_value = "openubl-indexes"
)]
pub bucket: String,
}

#[derive(Clone, Debug, clap::ValueEnum)]
pub enum IndexMode {
File,
S3,
}

impl Display for IndexMode {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::File => write!(f, "file"),
Self::S3 => write!(f, "s3"),
}
}
}

impl Default for IndexMode {
fn default() -> Self {
Self::File
}
}
Loading

0 comments on commit d8a867f

Please sign in to comment.