From fafa4726f7a3f6c83dbe15bbf29b6e5d9bcc2fcb Mon Sep 17 00:00:00 2001
From: carlosthe19916 <2582866+carlosthe19916@users.noreply.github.com>
Date: Tue, 9 Jan 2024 14:37:56 +0100
Subject: [PATCH] Save progress with manual code

---
 Cargo.lock                               |   1 +
 openubl/index/src/client/mod.rs          |   2 -
 openubl/index/src/client/search.rs       | 189 ----------------
 openubl/index/src/client/write.rs        |  73 ------
 openubl/index/src/config.rs              |  48 ++--
 openubl/index/src/lib.rs                 |  64 +-----
 openubl/index/src/search_options.rs      |  49 ----
 openubl/index/src/system/directory.rs    | 140 ------------
 openubl/index/src/system/mod.rs          |  48 +++-
 openubl/index/src/system/state.rs        |  23 --
 openubl/index/src/system/ubl_document.rs |  69 ++++++
 openubl/index/src/system/writer.rs       | 273 -----------------------
 openubl/server/Cargo.toml                |   1 +
 openubl/server/src/lib.rs                |  30 ++-
 openubl/storage/src/lib.rs               |  87 +-------
 15 files changed, 155 insertions(+), 942 deletions(-)
 delete mode 100644 openubl/index/src/client/mod.rs
 delete mode 100644 openubl/index/src/client/search.rs
 delete mode 100644 openubl/index/src/client/write.rs
 delete mode 100644 openubl/index/src/search_options.rs
 delete mode 100644 openubl/index/src/system/directory.rs
 delete mode 100644 openubl/index/src/system/state.rs
 create mode 100644 openubl/index/src/system/ubl_document.rs
 delete mode 100644 openubl/index/src/system/writer.rs

diff --git a/Cargo.lock b/Cargo.lock
index 816bf3b0..bea1a3a9 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3230,6 +3230,7 @@ dependencies = [
  "openubl-api",
  "openubl-common",
  "openubl-entity",
+ "openubl-index",
  "openubl-oidc",
  "openubl-storage",
  "sea-orm",
diff --git a/openubl/index/src/client/mod.rs b/openubl/index/src/client/mod.rs
deleted file mode 100644
index 80860882..00000000
--- a/openubl/index/src/client/mod.rs
+++ /dev/null
@@ -1,2 +0,0 @@
-pub mod search;
-pub mod write;
diff --git a/openubl/index/src/client/search.rs b/openubl/index/src/client/search.rs
deleted file mode 100644
index f42cfede..00000000
--- a/openubl/index/src/client/search.rs
+++ /dev/null
@@ -1,189 +0,0 @@
-use log::{debug, warn};
-use tantivy::collector::TopDocs;
-use tantivy::query::Query;
-use tantivy::schema::{Field, Type};
-use tantivy::{DateTime, DocAddress, Order, Searcher};
-
-use crate::client::write::WriteIndex;
-use crate::search_options::SearchOptions;
-use crate::{Error, IndexStore};
-
-/// A search query.
-#[derive(Debug)]
-pub struct SearchQuery {
-    /// The tantivy query to execute.
-    pub query: Box<dyn Query>,
-    /// A custom sort order to apply to the results.
-    pub sort_by: Option<(Field, Order)>,
-}
-
-/// Defines the interface for an index that can be searched.
-pub trait SearchableIndex: WriteIndex {
-    /// Type of the matched document returned from a search.
-    type MatchedDocument: core::fmt::Debug;
-
-    /// Prepare a query for searching and return a query object.
-    fn prepare_query(&self, q: &str) -> Result<SearchQuery, Error>;
-
-    /// Search the index for a query and return a list of matched documents.
-    fn search(
-        &self,
-        searcher: &Searcher,
-        query: &dyn Query,
-        offset: usize,
-        limit: usize,
-    ) -> Result<(Vec<(f32, DocAddress)>, usize), Error>;
-
-    /// Invoked for every matched document to process the document and return a result.
-    fn process_hit(
-        &self,
-        doc: DocAddress,
-        score: f32,
-        searcher: &Searcher,
-        query: &dyn Query,
-        options: &SearchOptions,
-    ) -> Result<Self::MatchedDocument, Error>;
-}
-
-impl<INDEX: SearchableIndex> IndexStore<INDEX> {
-    /// Search the index for a given query and return matching documents.
-    pub fn search(
-        &self,
-        q: &str,
-        offset: usize,
-        limit: usize,
-        options: SearchOptions,
-    ) -> Result<(Vec<INDEX::MatchedDocument>, usize), Error> {
-        if limit == 0 {
-            return Err(Error::InvalidLimitParameter(limit));
-        }
-
-        let inner = self.inner.read().unwrap();
-        let reader = inner.reader()?;
-        let searcher = reader.searcher();
-
-        let query = self.index.prepare_query(q)?;
-
-        log::trace!("Processed query: {:?}", query);
-
-        let (top_docs, count) = if let Some(sort_by) = query.sort_by {
-            let field = sort_by.0;
-            let order = sort_by.1;
-            let order_by_str = self.index.schema().get_field_name(field).to_string();
-            let vtype = self
-                .index
-                .schema()
-                .get_field_entry(field)
-                .field_type()
-                .value_type();
-            let mut hits = Vec::new();
-            let total = match vtype {
-                Type::U64 => {
-                    let result = searcher.search(
-                        &query.query,
-                        &(
-                            TopDocs::with_limit(limit)
-                                .and_offset(offset)
-                                .order_by_fast_field::<u64>(&order_by_str, order.clone()),
-                            tantivy::collector::Count,
-                        ),
-                    )?;
-                    for r in result.0 {
-                        hits.push((1.0, r.1));
-                    }
-                    result.1
-                }
-                Type::I64 => {
-                    let result = searcher.search(
-                        &query.query,
-                        &(
-                            TopDocs::with_limit(limit)
-                                .and_offset(offset)
-                                .order_by_fast_field::<i64>(&order_by_str, order.clone()),
-                            tantivy::collector::Count,
-                        ),
-                    )?;
-                    for r in result.0 {
-                        hits.push((1.0, r.1));
-                    }
-                    result.1
-                }
-                Type::F64 => {
-                    let result = searcher.search(
-                        &query.query,
-                        &(
-                            TopDocs::with_limit(limit)
-                                .and_offset(offset)
-                                .order_by_fast_field::<f64>(&order_by_str, order.clone()),
-                            tantivy::collector::Count,
-                        ),
-                    )?;
-                    for r in result.0 {
-                        hits.push((1.0, r.1));
-                    }
-                    result.1
-                }
-                Type::Bool => {
-                    let result = searcher.search(
-                        &query.query,
-                        &(
-                            TopDocs::with_limit(limit)
-                                .and_offset(offset)
-                                .order_by_fast_field::<bool>(&order_by_str, order.clone()),
-                            tantivy::collector::Count,
-                        ),
-                    )?;
-                    for r in result.0 {
-                        hits.push((1.0, r.1));
-                    }
-                    result.1
-                }
-                Type::Date => {
-                    let result = searcher.search(
-                        &query.query,
-                        &(
-                            TopDocs::with_limit(limit)
-                                .and_offset(offset)
-                                .order_by_fast_field::<DateTime>(&order_by_str, order.clone()),
-                            tantivy::collector::Count,
-                        ),
-                    )?;
-                    for r in result.0 {
-                        hits.push((1.0, r.1));
-                    }
-                    result.1
-                }
-                _ => return Err(Error::NotSortable(order_by_str)),
-            };
-            (hits, total)
-        } else {
-            self.index.search(&searcher, &query.query, offset, limit)?
-        };
-
-        log::info!("#matches={count} for query '{q}'");
-
-        if options.summaries {
-            let mut hits = Vec::new();
-            for hit in top_docs {
-                match self
-                    .index
-                    .process_hit(hit.1, hit.0, &searcher, &query.query, &options)
-                {
-                    Ok(value) => {
-                        debug!("HIT: {:?}", value);
-                        hits.push(value);
-                    }
-                    Err(e) => {
-                        warn!("Error processing hit {:?}: {:?}", hit, e);
-                    }
-                }
-            }
-
-            debug!("Filtered to {}", hits.len());
-
-            Ok((hits, count))
-        } else {
-            Ok((Vec::new(), count))
-        }
-    }
-}
diff --git a/openubl/index/src/client/write.rs b/openubl/index/src/client/write.rs
deleted file mode 100644
index f8c59a84..00000000
--- a/openubl/index/src/client/write.rs
+++ /dev/null
@@ -1,73 +0,0 @@
-use tantivy::schema::Schema;
-use tantivy::tokenizer::TokenizerManager;
-use tantivy::{Document, IndexSettings, Term};
-
-use crate::Error;
-
-/// Defines the interface for an index that can be written to.
-pub trait WriteIndex {
-    /// Input document type expected by the index.
-    type Document;
-
-    /// Name of the index. Must be unique across trait implementations.
-    fn name(&self) -> &str;
-
-    /// Tokenizers used by the index.
-    fn tokenizers(&self) -> Result<TokenizerManager, Error> {
-        Ok(TokenizerManager::default())
-    }
-
-    /// Parse a document from a byte slice.
-    fn parse_doc(&self, data: &[u8]) -> Result<Self::Document, Error>;
-
-    /// Index settings required for this index.
-    fn settings(&self) -> IndexSettings;
-
-    /// Schema required for this index.
-    fn schema(&self) -> Schema;
-
-    /// Process an input document and return a tantivy document to be added to the index.
-    fn index_doc(
-        &self,
-        id: &str,
-        document: &Self::Document,
-    ) -> Result<Vec<(String, Document)>, Error>;
-
-    /// Convert a document id to a term for referencing that document.
-    fn doc_id_to_term(&self, id: &str) -> Term;
-}
-
-impl<DOC> WriteIndex for Box<dyn WriteIndex<Document = DOC>> {
-    type Document = DOC;
-    fn name(&self) -> &str {
-        self.as_ref().name()
-    }
-
-    fn tokenizers(&self) -> Result<TokenizerManager, Error> {
-        self.as_ref().tokenizers()
-    }
-
-    fn parse_doc(&self, data: &[u8]) -> Result<Self::Document, Error> {
-        self.as_ref().parse_doc(data)
-    }
-
-    fn settings(&self) -> IndexSettings {
-        self.as_ref().settings()
-    }
-
-    fn schema(&self) -> Schema {
-        self.as_ref().schema()
-    }
-
-    fn index_doc(
-        &self,
-        id: &str,
-        document: &Self::Document,
-    ) -> Result<Vec<(String, Document)>, Error> {
-        self.as_ref().index_doc(id, document)
-    }
-
-    fn doc_id_to_term(&self, id: &str) -> Term {
-        self.as_ref().doc_id_to_term(id)
-    }
-}
diff --git a/openubl/index/src/config.rs b/openubl/index/src/config.rs
index 8f77e3a4..9b8dc88e 100644
--- a/openubl/index/src/config.rs
+++ b/openubl/index/src/config.rs
@@ -1,49 +1,31 @@
-use std::fmt::Display;
+#[derive(clap::Subcommand, Debug)]
+pub enum SearchEngine {
+    Local(LocalEngine),
+}
 
 #[derive(clap::Args, Debug)]
-pub struct SearchEngine {
-    /// Synchronization interval for index persistence.
+pub struct LocalEngine {
     #[arg(
-        id = "search-engine-mode",
-        long,
-        env = "SEARCH_ENGINE_MODE",
-        default_value_t = IndexMode::File
+        id = "search-engine-local-dir",
+        long,
+        env = "SEARCH_ENGINE_LOCAL_DIR",
+        default_value = "index"
     )]
-    pub mode: IndexMode,
+    pub index_dir: String,
 
     #[arg(
-        id = "search-engine-sync-interval",
+        id = "search-engine-local-sync-interval",
         long,
-        env = "SEARCH_ENGINE_SYNC_INTERVAL",
+        env = "SEARCH_ENGINE_LOCAL_SYNC_INTERVAL",
         default_value = "30S"
     )]
     pub sync_interval: humantime::Duration,
+
     #[arg(
-        id = "search-engine-writer-memory",
+        id = "search-engine-local-writer-memory",
         long,
-        env = "SEARCH_ENGINE_WRITER_MEMORY",
+        env = "SEARCH_ENGINE_LOCAL_WRITER_MEMORY",
         default_value_t = bytesize::ByteSize::mb(256)
     )]
     pub index_writer_memory_bytes: bytesize::ByteSize,
 }
-
-#[derive(Clone, Debug, clap::ValueEnum)]
-pub enum IndexMode {
-    File,
-    S3,
-}
-
-impl Display for IndexMode {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        match self {
-            Self::File => write!(f, "file"),
-            Self::S3 => write!(f, "s3"),
-        }
-    }
-}
-
-impl Default for IndexMode {
-    fn default() -> Self {
-        Self::File
-    }
-}
diff --git a/openubl/index/src/lib.rs b/openubl/index/src/lib.rs
index 84d1ae60..cb2cee13 100644
--- a/openubl/index/src/lib.rs
+++ b/openubl/index/src/lib.rs
@@ -1,64 +1,2 @@
-use std::sync::RwLock;
-
-use sha2::Digest;
-use tantivy::query::Query;
-use tantivy::{Directory, Index};
-
-use client::write::WriteIndex;
-use openubl_storage::StorageSystemErr;
-
-use crate::system::directory::IndexDirectory;
-
-mod client;
 pub mod config;
-mod search_options;
-mod system;
-
-/// A search index. This is a wrapper around the tantivy index that handles loading and storing of the index to object storage (via the local filesystem).
-///
-/// The index can be live-loaded and stored to/from object storage while serving queries.
-pub struct IndexStore<INDEX: WriteIndex> {
-    inner: RwLock<Index>,
-    index_dir: Option<RwLock<IndexDirectory>>,
-    index: INDEX,
-    index_writer_memory_bytes: usize,
-}
-
-/// Errors returned by the index.
-#[derive(Debug, thiserror::Error)]
-pub enum Error {
-    #[error("error opening index {0}")]
-    Open(String),
-    #[error("error taking snapshot of index")]
-    Snapshot,
-    #[error("value for field {0} not found")]
-    FieldNotFound(String),
-    #[error("field {0} cannot be sorted")]
-    NotSortable(String),
-    #[error("operation cannot be done because index is not persisted")]
-    NotPersisted,
-    #[error("error parsing document {0}")]
-    DocParser(String),
-    #[error("error parsing query {0}")]
-    QueryParser(String),
-    #[error("error from storage {0}")]
-    Storage(StorageSystemErr),
-    #[error("invalid limit parameter {0}")]
-    InvalidLimitParameter(usize),
-    #[error("error from search {0}")]
-    Search(tantivy::TantivyError),
-    #[error("I/O error {0}")]
-    Io(std::io::Error),
-}
-
-impl From<tantivy::TantivyError> for Error {
-    fn from(e: tantivy::TantivyError) -> Self {
-        Self::Search(e)
-    }
-}
-
-impl From<StorageSystemErr> for Error {
-    fn from(e: StorageSystemErr) -> Self {
-        Self::Storage(e)
-    }
-}
+pub mod system;
diff --git a/openubl/index/src/search_options.rs b/openubl/index/src/search_options.rs
deleted file mode 100644
index 300c8741..00000000
--- a/openubl/index/src/search_options.rs
+++ /dev/null
@@ -1,49 +0,0 @@
-#[derive(Clone, Debug, PartialEq, Eq, serde::Deserialize, serde::Serialize)]
-pub struct SearchOptions {
-    /// Return index "explain" output
-    #[serde(default)]
-    pub explain: bool,
-    /// Return additional search metadata
-    #[serde(default)]
-    pub metadata: bool,
-    #[serde(default = "default_summaries")]
-    pub summaries: bool,
-}
-
-const fn default_summaries() -> bool {
-    true
-}
-
-impl Default for SearchOptions {
-    fn default() -> Self {
-        Self {
-            explain: false,
-            metadata: false,
-            summaries: true,
-        }
-    }
-}
-
-pub trait Apply<T> {
-    fn apply(self, value: &T) -> Self;
-}
-
-impl Apply<SearchOptions> for reqwest::RequestBuilder {
-    fn apply(mut self, options: &SearchOptions) -> Self {
-        if options.explain {
-            self = self.query(&[("explain", "true")]);
-        }
-
-        if options.metadata {
-            self = self.query(&[("metadata", "true")]);
-        }
-
-        if !options.summaries {
-            self = self.query(&[("summaries", "false")]);
-        } else {
-            self = self.query(&[("summaries", "true")]);
-        }
-
-        self
-    }
-}
diff --git a/openubl/index/src/system/directory.rs b/openubl/index/src/system/directory.rs
deleted file mode 100644
index e9f0550e..00000000
--- a/openubl/index/src/system/directory.rs
+++ /dev/null
@@ -1,140 +0,0 @@
-use std::path::{Path, PathBuf};
-
-use sha2::{Digest, Sha256};
-use tantivy::directory::MmapDirectory;
-use tantivy::schema::Schema;
-use tantivy::tokenizer::TokenizerManager;
-use tantivy::{Index, IndexSettings};
-
-use crate::system::state::IndexState;
-use crate::Error;
-
-/// Represents state of the index on disk and managing index swaps.
-#[derive(Debug)]
-pub struct IndexDirectory {
-    path: PathBuf,
-    state: IndexState,
-    digest: Vec<u8>,
-}
-
-impl IndexDirectory {
-    /// Attempt to build a new index from the serialized zstd data
-    pub fn sync(
-        &mut self,
-        schema: Schema,
-        settings: IndexSettings,
-        tokenizers: TokenizerManager,
-        data: &[u8],
-    ) -> Result<Option<Index>, Error> {
-        let digest = Sha256::digest(data).to_vec();
-        if self.digest != digest {
-            let next = self.state.next();
-            let path = next.directory(&self.path);
-            let index = self.unpack(schema, settings, tokenizers, data, &path)?;
-            self.state = next;
-            self.digest = digest;
-            Ok(Some(index))
-        } else {
-            Ok(None)
-        }
-    }
-
-    pub fn new(path: &PathBuf) -> Result<Self, Error> {
-        if path.exists() {
-            std::fs::remove_dir_all(path).map_err(|e| Error::Open(e.to_string()))?;
-        }
-        std::fs::create_dir_all(path).map_err(|e| Error::Open(e.to_string()))?;
-        let state = IndexState::A;
-        Ok(Self {
-            digest: Vec::new(),
-            path: path.clone(),
-            state,
-        })
-    }
-
-    pub fn reset(
-        &mut self,
-        settings: IndexSettings,
-        schema: Schema,
-        tokenizers: TokenizerManager,
-    ) -> Result<Index, Error> {
-        let next = self.state.next();
-        let path = next.directory(&self.path);
-        if path.exists() {
-            std::fs::remove_dir_all(&path).map_err(|e| Error::Open(e.to_string()))?;
-        }
-        std::fs::create_dir_all(&path).map_err(|e| Error::Open(e.to_string()))?;
-        let index = self.build_new(settings, schema, tokenizers, &path)?;
-        self.state = next;
-        Ok(index)
-    }
-
-    fn build_new(
-        &self,
-        settings: IndexSettings,
-        schema: Schema,
-        tokenizers: TokenizerManager,
-        path: &Path,
-    ) -> Result<Index, Error> {
-        std::fs::create_dir_all(path).map_err(|e| Error::Open(e.to_string()))?;
-        let dir = MmapDirectory::open(path).map_err(|e| Error::Open(e.to_string()))?;
-        let builder = Index::builder()
-            .schema(schema)
-            .settings(settings)
-            .tokenizers(tokenizers);
-        let index = builder
-            .open_or_create(dir)
-            .map_err(|e| Error::Open(e.to_string()))?;
-        Ok(index)
-    }
-
-    pub fn build(
-        &self,
-        settings: IndexSettings,
-        schema: Schema,
-        tokenizers: TokenizerManager,
-    ) -> Result<Index, Error> {
-        let path = self.state.directory(&self.path);
-        self.build_new(settings, schema, tokenizers, &path)
-    }
-
-    fn unpack(
-        &mut self,
-        schema: Schema,
-        settings: IndexSettings,
-        tokenizers: TokenizerManager,
-        data: &[u8],
-        path: &Path,
-    ) -> Result<Index, Error> {
-        if path.exists() {
-            std::fs::remove_dir_all(path).map_err(|e| Error::Open(e.to_string()))?;
-        }
-        std::fs::create_dir_all(path).map_err(|e| Error::Open(e.to_string()))?;
-
-        let dec = zstd::stream::Decoder::new(data).map_err(Error::Io)?;
-        let mut archive = tar::Archive::new(dec);
-        archive.unpack(path).map_err(Error::Io)?;
-        log::trace!("Unpacked into {:?}", path);
-
-        let dir = MmapDirectory::open(path).map_err(|e| Error::Open(e.to_string()))?;
-        let builder = Index::builder()
-            .schema(schema)
-            .settings(settings)
-            .tokenizers(tokenizers);
-        let inner = builder
-            .open_or_create(dir)
-            .map_err(|e| Error::Open(e.to_string()))?;
-        Ok(inner)
-    }
-
-    pub fn pack(&mut self) -> Result<Vec<u8>, Error> {
-        let path = self.state.directory(&self.path);
-        let mut out = Vec::new();
-        let enc = zstd::stream::Encoder::new(&mut out, 3).map_err(Error::Io)?;
-        let mut archive = tar::Builder::new(enc.auto_finish());
-        log::trace!("Packing from {:?}", path);
-        archive.append_dir_all("", path).map_err(Error::Io)?;
-        drop(archive);
-        Ok(out)
-    }
-}
diff --git a/openubl/index/src/system/mod.rs b/openubl/index/src/system/mod.rs
index 3cf0c166..402c7855 100644
--- a/openubl/index/src/system/mod.rs
+++ b/openubl/index/src/system/mod.rs
@@ -1,3 +1,45 @@
-pub mod directory;
-pub mod state;
-pub mod writer;
+use tantivy::{Index, IndexWriter, TantivyError};
+use tantivy::directory::MmapDirectory;
+
+use crate::config::SearchEngine;
+use crate::system::ubl_document::UblDocumentIndex;
+
+mod ubl_document;
+
+pub enum SearchEngineSystem {
+    Local(IndexWriter),
+}
+
+#[derive(Debug, thiserror::Error)]
+pub enum LocalIndexError {
+    #[error(transparent)]
+    Tantivy(TantivyError),
+}
+
+#[derive(Debug, thiserror::Error)]
+pub enum IndexError {
+    #[error(transparent)]
+    Local(#[from] LocalIndexError),
+}
+
+impl From<TantivyError> for IndexError {
+    fn from(e: TantivyError) -> Self {
+        Self::Local(LocalIndexError::Tantivy(e))
+    }
+}
+
+impl SearchEngineSystem {
+    pub async fn new(config: &SearchEngine) -> anyhow::Result<Self> {
+        match config {
+            SearchEngine::Local(config) => {
+                let ubl_document_index = UblDocumentIndex::new();
+
+                let directory = MmapDirectory::open(&config.index_dir)?;
+                let index = Index::open_or_create(directory, ubl_document_index.schema.clone())?;
+
+                let index_writer =
+                    index.writer(config.index_writer_memory_bytes.as_u64() as usize)?;
+                Ok(Self::Local(index_writer))
+            }
+        }
+    }
+}
diff --git a/openubl/index/src/system/state.rs b/openubl/index/src/system/state.rs
deleted file mode 100644
index 0365df7e..00000000
--- a/openubl/index/src/system/state.rs
+++ /dev/null
@@ -1,23 +0,0 @@
-use std::path::{Path, PathBuf};
-
-#[derive(Clone, Copy, Debug, PartialEq)]
-pub enum IndexState {
-    A,
-    B,
-}
-
-impl IndexState {
-    pub fn directory(&self, root: &Path) -> PathBuf {
-        match self {
-            Self::A => root.join("a"),
-            Self::B => root.join("b"),
-        }
-    }
-
-    pub fn next(&self) -> Self {
-        match self {
-            Self::A => Self::B,
-            Self::B => Self::A,
-        }
-    }
-}
diff --git a/openubl/index/src/system/ubl_document.rs b/openubl/index/src/system/ubl_document.rs
new file mode 100644
index 00000000..996816be
--- /dev/null
+++ b/openubl/index/src/system/ubl_document.rs
@@ -0,0 +1,69 @@
+use tantivy::Document;
+use tantivy::schema::{Field, Schema, STORED, TEXT};
+
+use crate::system::{IndexError, SearchEngineSystem};
+
+pub(crate) struct UblDocumentIndex {
+    pub(crate) schema: Schema,
+    pub(crate) fields: UblDocumentIndexFields,
+}
+
+pub(crate) struct UblDocumentIndexFields {
+    id: Field,
+    project_id: Field,
+    document_type: Field,
+    document_id: Field,
+    supplier_id: Field,
+}
+
+impl UblDocumentIndex {
+    pub(crate) fn new() -> Self {
+        let mut schema = Schema::builder();
+        let fields = UblDocumentIndexFields {
+            id: schema.add_text_field("id", TEXT | STORED),
+            project_id: schema.add_text_field("project_id", TEXT | STORED),
+            document_type: schema.add_text_field("document_type", TEXT | STORED),
+            document_id: schema.add_text_field("document_id", TEXT | STORED),
+            supplier_id: schema.add_text_field("supplier_id", TEXT | STORED),
+        };
+
+        Self {
+            schema: schema.build(),
+            fields,
+        }
+    }
+}
+
+pub struct UblDocumentIndexModel {
+    id: i32,
+    project_id: i32,
+    document_type: String,
+    document_id: String,
+    supplier_id: String,
+}
+
+impl SearchEngineSystem {
+    pub async fn index_ubl_document(
+        &mut self,
+        model: &UblDocumentIndexModel,
+    ) -> Result<(), IndexError> {
+        match self {
+            SearchEngineSystem::Local(ref mut index_writer) => {
+                let ubl_document_index = UblDocumentIndex::new();
+                let fields = ubl_document_index.fields;
+
+                let mut document = Document::default();
+                // `id` and `project_id` are declared as text fields above, so
+                // the numeric ids are indexed as their string representation.
+                document.add_text(fields.id, model.id.to_string());
+                document.add_text(fields.project_id, model.project_id.to_string());
+                document.add_text(fields.document_type, &model.document_type);
+                document.add_text(fields.document_id, &model.document_id);
+                document.add_text(fields.supplier_id, &model.supplier_id);
+
+                index_writer.add_document(document)?;
+                index_writer.commit()?;
+
+                Ok(())
+            }
+        }
+    }
+}
diff --git a/openubl/index/src/system/writer.rs b/openubl/index/src/system/writer.rs
deleted file mode 100644
index 08236f5b..00000000
--- a/openubl/index/src/system/writer.rs
+++ /dev/null
@@ -1,273 +0,0 @@
-use std::sync::RwLock;
-
-use openubl_storage::StorageSystem;
-use tantivy::directory::INDEX_WRITER_LOCK;
-use tantivy::{Directory, Index};
-
-use crate::client::write::WriteIndex;
-use crate::config::{IndexMode, SearchEngine};
-use crate::system::directory::IndexDirectory;
-use crate::{Error, IndexStore};
-
-/// A writer for an index that allows batching document writes before committing a batch.
-///
-/// Batching document writes can improve performance by reducing the number of commits to the index.
-pub struct IndexWriter {
-    writer: tantivy::IndexWriter,
-}
-
-impl IndexWriter {
-    /// Add a document to the batch.
-    pub fn add_document<DOC>(
-        &mut self,
-        index: &dyn WriteIndex<Document = DOC>,
-        id: &str,
-        data: &[u8],
-    ) -> Result<(), Error> {
-        self.add_document_with_id(index, data, id, |_| id.to_string())
-    }
-
-    /// Add a document with a given identifier to the batch.
-    pub fn add_document_with_id<DOC, F>(
-        &mut self,
-        index: &dyn WriteIndex<Document = DOC>,
-        data: &[u8],
-        name: &str,
-        id: F,
-    ) -> Result<(), Error>
-    where
-        F: FnOnce(&DOC) -> String,
-    {
-        match index.parse_doc(data) {
-            Ok(doc) => {
-                let id = &id(&doc);
-                let docs = index.index_doc(id, &doc)?;
-                for (i, doc) in docs {
-                    self.delete_document(index, &i);
-                    self.writer.add_document(doc)?;
-                }
-            }
-            Err(e) => {
-                log::warn!("Error parsing document '{name}': {e:?}");
-            }
-        }
-
-        Ok(())
-    }
-
-    /// Commit the batch and consume the writer. May merge index segments.
-    pub fn commit(mut self) -> Result<(), Error> {
-        self.writer.commit()?;
-        self.writer.wait_merging_threads()?;
-        Ok(())
-    }
-
-    /// Add a delete operation to the batch.
-    pub fn delete_document<DOC>(&self, index: &dyn WriteIndex<Document = DOC>, key: &str) {
-        let term = index.doc_id_to_term(key);
-        self.writer.delete_term(term);
-    }
-}
-
-impl<INDEX: WriteIndex> IndexStore<INDEX> {
-    pub fn new_in_memory(index: INDEX) -> Result<Self, Error> {
-        let schema = index.schema();
-        let settings = index.settings();
-        let tokenizers = index.tokenizers()?;
-        let builder = Index::builder()
-            .schema(schema)
-            .settings(settings)
-            .tokenizers(tokenizers);
-        let inner = builder.create_in_ram()?;
-        let name = index.name().to_string();
-        Ok(Self {
-            inner: RwLock::new(inner),
-            index,
-            index_writer_memory_bytes: 32 * 1024 * 1024,
-            index_dir: None,
-        })
-    }
-
-    pub fn new(config: &SearchEngine, index: INDEX) -> Result<Self, Error> {
-        todo!()
-        // match config.mode {
-        //     IndexMode::File => {
-        //         let path = config
-        //             .index_dir
-        //             .clone()
-        //             .unwrap_or_else(|| {
-        //                 use rand::RngCore;
-        //                 let r = rand::thread_rng().next_u32();
-        //                 std::env::temp_dir().join(format!("index.{}", r))
-        //             })
-        //             .join(index.name());
-        //
-        //         let schema = index.schema();
-        //         let settings = index.settings();
-        //         let tokenizers = index.tokenizers()?;
-        //
-        //         let index_dir = IndexDirectory::new(&path)?;
-        //         let inner = index_dir.build(settings, schema, tokenizers)?;
-        //         let name = index.name().to_string();
-        //         Ok(Self {
-        //             inner: RwLock::new(inner),
-        //             index_writer_memory_bytes: config.index_writer_memory_bytes.as_u64() as usize,
-        //             index_dir: Some(RwLock::new(index_dir)),
-        //             index,
-        //         })
-        //     }
-        //     IndexMode::S3 => {
-        //         todo!("To be implemented");
-        //         // let bucket = config.bucket.clone().try_into()?;
-        //         // let schema = index.schema();
-        //         // let settings = index.settings();
-        //         // let tokenizers = index.tokenizers()?;
-        //         // let builder = Index::builder()
-        //         //     .schema(schema)
-        //         //     .settings(settings)
-        //         //     .tokenizers(tokenizers);
-        //         // let dir = S3Directory::new(bucket);
-        //         // let inner = builder.open_or_create(dir)?;
-        //         // let name = index.name().to_string();
-        //         // Ok(Self {
-        //         //     inner: RwLock::new(inner),
-        //         //     index_writer_memory_bytes: config.index_writer_memory_bytes.as_u64() as usize,
-        //         //     index_dir: None,
-        //         //     index,
-        //         // })
-        //     }
-        // }
-    }
-
-    pub fn index(&self) -> &INDEX {
-        &self.index
-    }
-
-    pub fn index_as_mut(&mut self) -> &mut INDEX {
-        &mut self.index
-    }
-
-    /// Sync the index from a snapshot.
-    ///
-    /// NOTE: Only applicable for file indices.
-    pub async fn sync(&self, storage: &StorageSystem) -> Result<(), Error> {
-        if let Some(index_dir) = &self.index_dir {
-            let data = storage.get_index(self.index.name()).await?;
-            let mut index_dir = index_dir.write().unwrap();
-            match index_dir.sync(
-                self.index.schema(),
-                self.index.settings(),
-                self.index.tokenizers()?,
-                &data,
-            ) {
-                Ok(Some(index)) => {
-                    *self.inner.write().unwrap() = index;
-                    log::debug!("Index reloaded");
-                }
-                Ok(None) => {
-                    // No index change
-                }
-                Err(e) => {
-                    log::warn!("Error syncing index: {:?}, keeping old", e);
-                    return Err(e);
-                }
-            }
-            log::debug!("Index reloaded");
-        }
-        Ok(())
-    }
-
-    // Reset the index to an empty state.
-    pub fn reset(&mut self) -> Result<(), Error> {
-        if let Some(index_dir) = &self.index_dir {
-            let mut index_dir = index_dir.write().unwrap();
-            let index = index_dir.reset(
-                self.index.settings(),
-                self.index.schema(),
-                self.index.tokenizers()?,
-            )?;
-            let mut inner = self.inner.write().unwrap();
-            *inner = index;
-        }
-        Ok(())
-    }
-
-    pub fn commit(&self, writer: IndexWriter) -> Result<(), Error> {
-        writer.commit()?;
-        Ok(())
-    }
-
-    /// Take a snapshot of the index and push to object storage.
-    ///
-    /// NOTE: Only applicable for file indices.
-    ///
-    ///
-    // Disable the lint due to a [bug in clippy](https://github.com/rust-lang/rust-clippy/issues/6446).
-    #[allow(clippy::await_holding_lock)]
-    pub async fn snapshot(
-        &mut self,
-        writer: IndexWriter,
-        storage: &StorageSystem,
-        force: bool,
-    ) -> Result<(), Error> {
-        if let Some(index_dir) = &self.index_dir {
-            writer.commit()?;
-
-            let mut dir = index_dir.write().unwrap();
-            let mut inner = self.inner.write().unwrap();
-            inner.directory_mut().sync_directory().map_err(Error::Io)?;
-            let lock = inner.directory_mut().acquire_lock(&INDEX_WRITER_LOCK);
-
-            let managed_files = inner.directory().list_managed_files();
-
-            let mut total_size: i64 = 0;
-            for file in managed_files.iter() {
-                log::trace!("Managed file: {:?}", file);
-                let sz = std::fs::metadata(file).map(|m| m.len()).unwrap_or(0);
-                total_size += sz as i64;
-            }
-
-            let gc_result = inner.directory_mut().garbage_collect(|| managed_files)?;
-            log::trace!(
-                "Gc result. Deleted: {:?}, failed: {:?}",
-                gc_result.deleted_files,
-                gc_result.failed_to_delete_files
-            );
-            let changed = !gc_result.deleted_files.is_empty();
-            inner.directory_mut().sync_directory().map_err(Error::Io)?;
-            if force || changed {
-                log::info!("Index has changed, publishing new snapshot");
-                let out = dir.pack()?;
-                drop(lock);
-                drop(inner);
-                drop(dir);
-                match storage.put_index(self.index.name(), &out).await {
-                    Ok(_) => {
-                        log::trace!("Snapshot published successfully");
-                        Ok(())
-                    }
-                    Err(e) => {
-                        log::warn!("Error updating index: {:?}", e);
-                        Err(e.into())
-                    }
-                }
-            } else {
-                log::trace!("No changes to index");
-                Ok(())
-            }
-        } else {
-            log::trace!("Committing index");
-            writer.commit()?;
-            Ok(())
-        }
-    }
-
-    pub fn writer(&mut self) -> Result<IndexWriter, Error> {
-        let writer = self
-            .inner
-            .write()
-            .unwrap()
-            .writer(self.index_writer_memory_bytes)?;
-        Ok(IndexWriter { writer })
-    }
-}
diff --git a/openubl/server/Cargo.toml b/openubl/server/Cargo.toml
index 07098e32..10fddf61 100644
--- a/openubl/server/Cargo.toml
+++ b/openubl/server/Cargo.toml
@@ -11,6 +11,7 @@
 openubl-common = { path = "../common" }
 openubl-entity = { path = "../entity" }
 openubl-oidc = { path = "../oidc" }
 openubl-storage = { path = "../storage" }
+openubl-index = { path = "../index" }
 xsender = { path = "../../xsender" }
 xbuilder = { path = "../../xbuilder" }
diff --git a/openubl/server/src/lib.rs b/openubl/server/src/lib.rs
index 980ed60f..cdd5b8c1 100644
--- a/openubl/server/src/lib.rs
+++ b/openubl/server/src/lib.rs
@@ -10,6 +10,7 @@ use actix_web::{web, App, HttpServer};
 use openubl_api::system::InnerSystem;
 use openubl_common::config::Database;
+use openubl_index::system::SearchEngineSystem;
 use openubl_storage::StorageSystem;
 
 use crate::server::{files, health, project};
@@ -33,12 +34,26 @@ pub struct Run {
     #[command(subcommand)]
     pub storage: openubl_storage::config::Storage,
+
+    // #[command(flatten)]
+    // pub search_engine: openubl_index::config::SearchEngine,
 }
 
 impl Run {
     pub async fn run(self) -> anyhow::Result<ExitCode> {
         env_logger::init();
 
+        // Oidc
+        let oidc = Oidc::new(OidcConfig::Issuer(self.oidc.auth_server_url.clone().into()))
+            .await
+            .unwrap();
+        let oidc_validator = OidcBiscuitValidator {
+            options: ValidationOptions {
+                issuer: Validation::Validate(self.oidc.auth_server_url.clone()),
+                ..ValidationOptions::default()
+            },
+        };
+
+        // Database
         let system = match self.bootstrap {
             true => {
@@ -57,18 +72,10 @@ impl Run {
         // Storage
         let storage = StorageSystem::new(&self.storage).await?;
 
-        let app_state = Arc::new(AppState { system, storage });
+        // Search Engine
+        // let search_engine = SearchEngineSystem::new(&self.search_engine).await?;
 
-        // Oidc
-        let oidc = Oidc::new(OidcConfig::Issuer(self.oidc.auth_server_url.clone().into()))
-            .await
-            .unwrap();
-        let oidc_validator = OidcBiscuitValidator {
-            options: ValidationOptions {
-                issuer: Validation::Validate(self.oidc.auth_server_url.clone()),
-                ..ValidationOptions::default()
-            },
-        };
+        let app_state = Arc::new(AppState { system, storage });
 
         HttpServer::new(move || {
             App::new()
@@ -90,6 +97,7 @@
 pub struct AppState {
     pub system: InnerSystem,
     pub storage: StorageSystem,
+    // pub search_engine: SearchEngineSystem,
 }
 
 pub fn configure(config: &mut web::ServiceConfig) {
diff --git a/openubl/storage/src/lib.rs b/openubl/storage/src/lib.rs
index b21fb52f..166ab945 100644
--- a/openubl/storage/src/lib.rs
+++ b/openubl/storage/src/lib.rs
@@ -4,7 +4,6 @@ use std::io::{Read, Write};
 use std::path::Path;
 use std::str::FromStr;
 
-use anyhow::anyhow;
 use aws_config::meta::region::RegionProviderChain;
 use aws_config::retry::RetryConfig;
 use aws_config::BehaviorVersion;
@@ -14,7 +13,7 @@ use aws_sdk_s3::operation::get_object::GetObjectError;
 use aws_sdk_s3::operation::put_object::PutObjectError;
 use aws_sdk_s3::primitives::{ByteStream, ByteStreamError};
 use aws_smithy_runtime_api::client::orchestrator::HttpResponse;
-use minio::s3::args::{GetObjectArgs, UploadObjectArgs};
+use minio::s3::args::UploadObjectArgs;
 use minio::s3::client::Client as MinioClient;
 use minio::s3::creds::StaticProvider;
 use minio::s3::http::BaseUrl;
@@ -27,8 +26,6 @@ use crate::config::Storage;
 
 pub mod config;
 
-const INDEX_PATH: &str = "/index";
-
 pub struct LocalDir {
     path: String,
 }
@@ -38,7 +35,7 @@ pub struct Bucket {
 }
 
 pub enum StorageSystem {
-    FileSystem(LocalDir),
+    Local(LocalDir),
     Minio(Bucket, MinioClient),
     S3(Bucket, S3Client),
 }
@@ -126,7 +123,7 @@ impl From for StorageSystemErr {
 impl StorageSystem {
     pub async fn new(config: &Storage) -> anyhow::Result<Self> {
         match config {
-            Storage::Local(config) => Ok(Self::FileSystem(LocalDir {
+            Storage::Local(config) => Ok(Self::Local(LocalDir {
                 path: config.local_dir.clone(),
             })),
             Storage::Minio(config) => {
@@ -172,82 +169,6 @@ impl StorageSystem {
         }
     }
 
-    pub async fn put_index(&self, name: &str, index: &[u8]) -> Result<(), StorageSystemErr> {
-        match self {
-            StorageSystem::FileSystem(directories) => {
-                let file_path = Path::new(&directories.path).join(name);
-                let mut file = File::create(file_path)?;
-                file.write_all(index)?;
-
-                Ok(())
-            }
-            StorageSystem::Minio(buckets, client) => {
-                let temp_filename = Uuid::new_v4().to_string();
-
-                let temp_dir = tempfile::tempdir()?;
-                let temp_file_path = temp_dir
-                    .into_path()
-                    .join(&temp_filename)
-                    .to_str()
-                    .map(|e| e.to_string())
-                    .ok_or(StorageSystemErr::Any(anyhow!(
-                        "Could not determine with filename of created index"
-                    )))?;
-                let mut temp_file = File::create(&temp_file_path)?;
-                temp_file.write_all(index)?;
-
-                let object_name = format!("{}/{}", INDEX_PATH, name);
-                let object = &UploadObjectArgs::new(&buckets.name, &object_name, &temp_file_path)?;
-                client.upload_object(object).await?;
-
-                Ok(())
-            }
-            StorageSystem::S3(buckets, client) => {
-                let object_name = format!("{}/{}", INDEX_PATH, name);
-                let body = ByteStream::from(index.to_vec());
-                client
-                    .put_object()
-                    .bucket(&buckets.name)
-                    .key(object_name)
-                    .body(body)
-                    .send()
-                    .await?;
-                Ok(())
-            }
-        }
-    }
-
-    pub async fn get_index(&self, name: &str) -> Result<Vec<u8>, StorageSystemErr> {
-        match self {
-            StorageSystem::FileSystem(directories) => {
-                let file_path = Path::new(&directories.path).join(name);
-                Ok(fs::read(file_path)?)
-            }
-            StorageSystem::Minio(buckets, client) => {
-                let object_name = format!("{}/{}", INDEX_PATH, name);
-                let object = &GetObjectArgs::new(&buckets.name, &object_name)?;
-                let response = client.get_object(object).await?;
-                Ok(response.bytes().await?.to_vec())
-            }
-            StorageSystem::S3(buckets, client) => {
-                let object_name = format!("{}/{}", INDEX_PATH, name);
-                let mut response = client
-                    .get_object()
-                    .bucket(&buckets.name)
-                    .key(object_name)
-                    .send()
-                    .await?;
-
-                let mut result: Vec<u8> = vec![];
-                while let Some(bytes) = response.body.try_next().await? {
-                    result.append(&mut bytes.to_vec());
-                }
-
-                Ok(result)
-            }
-        }
-    }
-
     pub async fn upload_ubl_xml(
         &self,
         project_id: i32,
@@ -264,7 +185,7 @@ impl StorageSystem {
         let zip_name = format!("{}_{short_sha256}.zip", document_id.to_uppercase());
         match self {
-            StorageSystem::FileSystem(directories) => {
+            StorageSystem::Local(directories) => {
                 let object_name = Path::new(&directories.path)
                     .join(project_id.to_string())
                     .join(ruc)
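
A note on wiring, not part of the patch itself: SearchEngineSystem::index_ubl_document
takes &mut self, so sharing the engine through AppState across actix-web workers (as the
commented-out field suggests) needs interior mutability. A minimal sketch, assuming a
tokio::sync::Mutex; the AppState shape and the handler are hypothetical, and the indexing
call stays commented because the patch does not expose a constructor for
UblDocumentIndexModel:

use std::sync::Arc;

use openubl_index::system::SearchEngineSystem;

pub struct AppState {
    // Async-aware lock: index_ubl_document is async and mutates the writer.
    pub search_engine: tokio::sync::Mutex<SearchEngineSystem>,
}

async fn index_document(state: Arc<AppState>) -> anyhow::Result<()> {
    // All indexing requests serialize on this lock, which also covers the
    // per-document commit that index_ubl_document performs.
    let mut _engine = state.search_engine.lock().await;
    // let model = ...; // would be built from the persisted UBL document entity
    // _engine.index_ubl_document(&model).await?;
    Ok(())
}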
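The patch only implements the write path; the read side that client/search.rs used to
provide is gone. A sketch of what querying the new schema could look like with plain
tantivy: the field names come from ubl_document.rs, while the function itself and its
error mapping are assumptions, not part of the patch:

use tantivy::collector::TopDocs;
use tantivy::query::QueryParser;
use tantivy::{Index, TantivyError};

fn search_documents(index: &Index, q: &str) -> Result<Vec<String>, TantivyError> {
    let reader = index.reader()?;
    let searcher = reader.searcher();

    // Look up two of the fields declared in UblDocumentIndex::new().
    let schema = index.schema();
    let document_id = schema.get_field("document_id").expect("declared in schema");
    let supplier_id = schema.get_field("supplier_id").expect("declared in schema");

    let parser = QueryParser::for_index(index, vec![document_id, supplier_id]);
    let query = parser
        .parse_query(q)
        .map_err(|e| TantivyError::InvalidArgument(e.to_string()))?;

    // Top 10 hits, returned as the stored JSON representation of each document.
    let top_docs = searcher.search(&query, &TopDocs::with_limit(10))?;
    let mut out = Vec::new();
    for (_score, address) in top_docs {
        let doc = searcher.doc(address)?;
        out.push(schema.to_json(&doc));
    }
    Ok(out)
}

Pagination, sorting, and per-project filtering, which the deleted search.rs offered,
would layer on top of this via tantivy's collectors.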