diff --git a/Cargo.toml b/Cargo.toml
index 327b573..11592e6 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -16,3 +16,7 @@
 rocksdb = "0.22.0"
 pretty_assertions = "0.7"
 select = "0.5"
+
+[dev-dependencies]
+tempfile = "3.2.0"
+
diff --git a/client/index.js b/client/index.js
new file mode 100644
index 0000000..866c55d
--- /dev/null
+++ b/client/index.js
@@ -0,0 +1,43 @@
+// Install axios if you haven't already:
+// npm install axios
+
+const axios = require('axios');
+
+const config = {
+  method: 'get',
+  maxBodyLength: Infinity,
+  url: 'http://localhost:3000/namespaces',
+  headers: {
+    'Content-Type': 'application/json'
+  }
+};
+
+// Function to make concurrent requests
+async function sendConcurrentRequests() {
+  const numRequests = 10;
+  const requests = [];
+
+  // Create an array of Axios promises. They are pushed unconsumed so that
+  // Promise.all below receives the actual responses; attaching .then/.catch
+  // here would make each promise resolve to undefined.
+  for (let i = 0; i < numRequests; i++) {
+    console.log('sending..');
+    requests.push(axios.request(config));
+  }
+
+  try {
+    // Execute all requests concurrently
+    const responses = await Promise.all(requests);
+
+    // Process the responses (e.g., log data, handle errors)
+    responses.forEach((response, index) => {
+      console.log(`Request ${index + 1} status: ${response.status}`);
+      console.log(JSON.stringify(response.data));
+    });
+  } catch (error) {
+    console.error('Error making requests:', error.message);
+  }
+}
+
+// Call the function to send concurrent requests
+sendConcurrentRequests();
diff --git a/client/package.json b/client/package.json
new file mode 100644
index 0000000..291a14e
--- /dev/null
+++ b/client/package.json
@@ -0,0 +1,11 @@
+{
+  "name": "client",
+  "version": "1.0.0",
+  "description": "",
+  "main": "index.js",
+  "scripts": {
+    "test": "echo \"Error: no test specified\" && exit 1"
+  },
+  "author": "SimranMakhija7",
+  "license": "ISC"
+}
diff --git a/design_doc.md b/design_doc.md
new file mode 100644
index 0000000..47deb73
--- /dev/null
+++ b/design_doc.md
@@ -0,0 +1,162 @@
+# Design Document
+
+## Overview
+
+Modern OLAP systems are built for analytic queries, which frequently involve subqueries, joins, and aggregation. To plan such queries well, the query optimizer needs metadata and statistics, such as data distributions and indexes. To solve this problem, we are building a catalog that serves as a “database of metadata for the database”: it stores statistics from the execution engine and data-discovery results from the I/O services, and it provides metadata to the planner and data locations for scheduling queries.
+
+## Architectural Design
+
+![Architectural Diagram](https://github.com/teyenc/15721_catalog_private/assets/56297484/706e834b-9983-4de2-ad28-2c0e4acf3fd2)
+
+#### Input/Output
+
+We expose a REST API for interaction with other components. Its inputs and results are documented in the API Spec.
+
+#### Components
+
+Our architecture consists of a Rust service application with RocksDB as an embedded database.
+
+##### Rust Application
+
+A Rust application that exposes a REST API to modify database metadata of the OLAP database. The application consists of the following components:
+
+- A data model that defines the structure and meaning of the metadata we store, such as schemas, tables, columns, measures, dimensions, hierarchies, and levels. The data model is represented by Rust structs that can be serialized and deserialized using Substrait.
+- A database layer that interacts with RocksDB and provides methods for storing and retrieving the database metadata as key-value pairs using the RocksDB crate (see the sketch after this list).
+- A service layer that contains the business logic for the REST API, such as validating inputs, checking permissions, and handling errors. The service layer depends on the database layer and uses the data model to manipulate the database metadata.
+- A controller layer that exposes the service methods as RESTful endpoints using a web framework, such as warp or axum. The controller layer uses the web framework’s features, such as filters, macros, and async functions, to parse request parameters and format responses.
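+
+To make the database layer concrete, here is a simplified sketch of its core insert/get API. It mirrors the shape of `src/database/database.rs` in this PR but omits column-family setup, update/delete, and iteration; treat it as illustrative rather than the actual implementation.
+
+```rust
+use rocksdb::DB;
+use serde::{de::DeserializeOwned, Serialize};
+use std::io;
+
+pub struct Database {
+    db: DB,
+}
+
+impl Database {
+    /// Serialize `value` as JSON and store it under `key` in column family `cf`.
+    pub fn insert<V: Serialize>(&self, cf: &str, key: &str, value: &V) -> io::Result<()> {
+        let handle = self
+            .db
+            .cf_handle(cf)
+            .ok_or_else(|| io::Error::new(io::ErrorKind::Other, "unknown column family"))?;
+        let bytes = serde_json::to_vec(value)?;
+        self.db
+            .put_cf(handle, key, bytes)
+            .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))
+    }
+
+    /// Fetch `key` from `cf`, returning `None` if it is absent.
+    pub fn get<V: DeserializeOwned>(&self, cf: &str, key: &str) -> io::Result<Option<V>> {
+        let handle = self
+            .db
+            .cf_handle(cf)
+            .ok_or_else(|| io::Error::new(io::ErrorKind::Other, "unknown column family"))?;
+        let bytes = self
+            .db
+            .get_cf(handle, key)
+            .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?;
+        match bytes {
+            Some(bytes) => Ok(Some(serde_json::from_slice(&bytes)?)),
+            None => Ok(None),
+        }
+    }
+}
+```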
+
+##### Database for metadata
+
+We chose RocksDB as the catalog's metadata store. It is a fast, persistent key-value store that can be embedded directly in a Rust application.
+
+##### RocksDB Schema
+
+We use column families to logically separate and group the metadata we store. The column families are:
+
+- Table Data
+  - Name - string (key)
+  - Number of columns - u64
+  - Read properties - json
+  - Write properties - json
+  - File URLs array - string array
+  - Columns array, with one entry per column:
+    - Aggregates - json object
+    - Range of values - int pair
+      - Lower bound
+      - Upper bound
+    - name - string
+    - isStrongKey - boolean
+    - isWeakKey - boolean
+    - Primary Key col name
+- Namespace Data
+  - Name - string (key)
+  - Properties - json object
+- Operator statistics
+  - Operator string - string (key)
+  - Cardinality of prev result - u64
+
+#### Tuning/Configuration options
+
+The catalog can be passed a configuration file at bootstrap with the following configuration options (a parsing sketch follows the list):
+
+1. data-warehouse-location
+2. client-pool-size
+3. cache-enabled
+4. cache-expiration-interval
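+
+The on-disk format of this file is not settled; as an illustrative sketch (assuming a TOML file parsed with the `toml` and `serde` crates — an assumption, not a decision), the options could map onto a struct like this:
+
+```rust
+use serde::Deserialize;
+
+#[derive(Debug, Deserialize)]
+pub struct CatalogConfig {
+    pub data_warehouse_location: String,
+    pub client_pool_size: u32,
+    pub cache_enabled: bool,
+    /// Cache expiration interval, in seconds.
+    pub cache_expiration_interval: u64,
+}
+
+/// Read and parse the bootstrap configuration file.
+pub fn load_config(path: &str) -> Result<CatalogConfig, Box<dyn std::error::Error>> {
+    let raw = std::fs::read_to_string(path)?;
+    Ok(toml::from_str(&raw)?)
+}
+```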
+
+## Design Rationale
+
+Most design decisions were made under the assumption that there are no schema updates and that writes are infrequent and arrive as bulk data. The rationale below covers correctness, performance, engineering complexity/maintainability, and testing, along with the alternatives we considered.
+
+#### Database
+
+We contemplated two embedded database candidates for the catalog service: SQLite and RocksDB. We chose RocksDB because:
+
+1. Better concurrency control: SQLite locks the entire database when handling concurrent writes, whereas RocksDB supports snapshots.
+2. Flexibility: RocksDB provides more configuration options.
+3. Scalability: RocksDB stores data in different partitions, whereas SQLite stores data in one single file, which isn’t ideal for scalability.
+4. Storage: RocksDB uses key-value storage, which suits intensive write operations.
+
+In short, RocksDB provides better performance and concurrency control for write-intensive workloads.
+
+#### Why a key-value store?
+
+1. Based on [1], the catalog for an OLAP system behaves a lot like an OLTP database. Snowflake reports that using a key-value store in the form of FoundationDB has proved beneficial for performance and scalability, including support for high-frequency reads and dynamic metadata storage.
+2. [2] compares and benchmarks tabular storage against the hierarchical organization of metadata seen in Iceberg, and finds that Iceberg's single-node processing performs better for small tables but fails to scale. It concludes that metadata access patterns have a significant impact on the performance of distributed query processing.
+3. Taking these factors into account, we decided on a key-value store for the simplicity and flexibility it provides, along with its performance benefits.
+
+#### Axum
+
+After evaluating several options for building APIs, such as Hyper and Actix, we selected Axum (a minimal routing sketch follows):
+
+- The Axum framework is built on top of Hyper and Tokio and abstracts away some of the low-level details.
+- This abstraction does not introduce significant performance overhead.
+- Benchmarks for these frameworks are listed in [3].
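+
+As a minimal sketch of the resulting routing style, mirroring the `axum::serve` pattern used in `src/main.rs` (the `/health` route is illustrative and not part of the API spec):
+
+```rust
+use axum::{routing::get, Router};
+
+// A trivial async handler; the real handlers live in src/handlers/.
+async fn health() -> &'static str {
+    "ok"
+}
+
+#[tokio::main]
+async fn main() {
+    let app = Router::new().route("/health", get(health));
+    let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
+    axum::serve(listener, app).await.unwrap();
+}
+```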
+
+## Testing plan
+
+To establish that the implementation is both correct and performant, we will use short unit tests and long-running regression tests. Part of the suite exercises the project's public API, so the testing infrastructure can be shared with the other group implementing the same component.
+
+### Correctness testing
+
+#### Unit tests
+
+For the correctness of the catalog, we plan to conduct unit tests and regression tests. In unit testing, we will test key components and operations such as metadata retrieval, metadata storage, update, and snapshot isolation.
+
+Basic unit tests for handler functions are underway.
+
+#### Regression tests
+
+Currently, we plan to conduct regression tests on:
+
+1. Concurrency and parallelism, to ensure data integrity.
+2. Correctness of all the APIs in the API spec documentation.
+3. Failure scenarios: test behavior and performance under failure scenarios, such as network errors, server failures, or other exceptional conditions.
+
+### Performance testing
+
+1. Concurrency workloads: test performance under concurrent access by simulating multiple clients performing various operations simultaneously (e.g., multiple clients creating tables, querying metadata, committing snapshots, etc.).
+2. Large schemas and datasets: evaluate performance with tables having a large number of columns (hundreds or thousands) and large datasets with many partitions or snapshots.
+3. Bulk operations: test the performance of bulk operations like importing or exporting table metadata, snapshots, or partitions.
+4. Mixed workloads: combine different types of operations in a single workload to simulate a more realistic scenario where various operations are performed concurrently.
+
+## Trade-offs and Potential Problems
+
+The biggest trade-off in the current design is the absence of any optimization for updates. Updates to any table cause its stored metadata to become stale, and efficiently refreshing those values is a design challenge. We have not prioritized this, based on the assumption that updates in an OLAP system will be infrequent.
+
+Organizing the namespaces and tables as prefixes in the keys for the store may also cause problems in terms of maintainability.
+
+#### Database
+
+We chose RocksDB to store metadata, whereas the Iceberg Catalog has its own metadata layer of metadata files, manifest lists, and manifests. Using RocksDB is more straightforward than building all of that from scratch, but Iceberg's components are purpose-built for its catalog and could outperform RocksDB, which is a general-purpose store rather than one dedicated to catalog service.
+
+## Support for Parallelism
+
+Our catalog service is designed to support parallelism to enhance performance. This is achieved in the following ways:
+
+1. **Concurrency Control in RocksDB**: RocksDB, our chosen database, supports concurrent reads and writes. This allows multiple threads to read and write to the database simultaneously, improving the throughput of our service.
+
+2. **Asynchronous API**: The REST API exposed by our Rust application is asynchronous, meaning it can handle multiple requests at the same time without blocking. This is particularly useful for operations that are I/O-bound, such as reading from or writing to the database.
+
+3. **Thread Pool**: We plan to use a thread pool for handling requests. This allows us to limit the number of threads used by our application, preventing thread thrashing and improving performance.
+
+## Performance Tuning Plan
+
+Performance tuning is crucial for the efficiency and speed of our catalog service. Our plan:
+
+1. **RocksDB Tuning**: We will tune RocksDB configurations based on our workload. For example, we can adjust the block cache size, write buffer size, and compaction style to optimize for read-heavy or write-heavy workloads (see the sketch after this list). More details can be found in the [RocksDB Tuning Guide](https://github.com/facebook/rocksdb/wiki/RocksDB-Tuning-Guide).
+
+2. **API Optimization**: We will monitor the performance of our API endpoints and optimize the slow ones. This will involve optimizing database access methods and refactoring code.
+
+3. **Load Testing**: We will conduct load testing to understand how our service performs under heavy load. This will help us identify bottlenecks and areas for improvement.
+
+4. **Monitoring and Metrics**: We will add monitoring and metrics to our service to track performance over time. This will help us identify performance regressions and understand the impact of our tuning efforts.
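+
+A sketch of what the RocksDB tuning in item 1 might look like with the `rocksdb` crate this project already uses (the values are illustrative placeholders, not measured settings):
+
+```rust
+use rocksdb::{Options, DB};
+
+fn open_tuned(path: &str) -> Result<DB, rocksdb::Error> {
+    let mut opts = Options::default();
+    opts.create_if_missing(true);
+    // Larger memtables absorb write bursts before flushing to disk.
+    opts.set_write_buffer_size(64 * 1024 * 1024);
+    // More background threads for flushes and compactions.
+    opts.increase_parallelism(4);
+    // Catalog reads are point lookups by key; this configures a block
+    // cache (size in MB) and a table format suited to that pattern.
+    opts.optimize_for_point_lookup(64);
+    DB::open(&opts, path)
+}
+```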
+ +## Milestones + +- 75%: Basic API support +- 100%: Support for parallelism and performance tuning +- 125%: Performance testing against Iceberg Catalog + + +### References + +[1] https://www.snowflake.com/blog/how-foundationdb-powers-snowflake-metadata-forward/ +[2] https://15721.courses.cs.cmu.edu/spring2024/papers/18-databricks/p92-jain.pdf +[3] https://github.com/programatik29/rust-web-benchmarks/blob/master/result/hello-world.md diff --git a/src/database/database.rs b/src/database/database.rs index 3e28285..3da898a 100644 --- a/src/database/database.rs +++ b/src/database/database.rs @@ -2,18 +2,9 @@ use rocksdb::{ColumnFamilyDescriptor, IteratorMode, Options, DB}; use serde::{Deserialize, Serialize}; use std::io::{self, ErrorKind}; use std::path::Path; -use std::sync::Arc; pub struct Database { - db: Arc, -} - -impl Clone for Database { - fn clone(&self) -> Self { - Self { - db: Arc::clone(&self.db), - } - } + db: DB, } impl Database { @@ -22,16 +13,13 @@ impl Database { opts.create_if_missing(true); opts.create_missing_column_families(true); - let namespace_cf_opts = Options::default(); - let namespace_cf = ColumnFamilyDescriptor::new("NamespaceData", namespace_cf_opts); - - let table_cf_opts = Options::default(); - let table_cf = ColumnFamilyDescriptor::new("TableData", table_cf_opts); + let namespace_cf = ColumnFamilyDescriptor::new("NamespaceData", Options::default()); + let table_cf = ColumnFamilyDescriptor::new("TableData", Options::default()); + let operator_cf = ColumnFamilyDescriptor::new("OperatorStatistics", Options::default()); + let table_namespace_cf = + ColumnFamilyDescriptor::new("TableNamespaceMap", Options::default()); - let operator_cf_opts = Options::default(); - let operator_cf = ColumnFamilyDescriptor::new("OperatorStatistics", operator_cf_opts); - - let cfs_vec = vec![namespace_cf, table_cf, operator_cf]; + let cfs_vec = vec![namespace_cf, table_cf, operator_cf, table_namespace_cf]; let db = DB::open_cf_descriptors(&opts, path, cfs_vec) .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))?; @@ -123,5 +111,142 @@ impl Database { .map_err(|e| io::Error::new(ErrorKind::Other, e))?; Ok(()) } +} + +#[cfg(test)] +mod tests { + use super::*; + use tempfile::tempdir; + + #[test] + fn test_open() { + let dir = tempdir().unwrap(); + let db = Database::open(dir.path()); + assert!(db.is_ok()); + } + + #[test] + fn test_insert_and_get() { + let dir = tempdir().unwrap(); + let db = Database::open(dir.path()).unwrap(); + let key = "test_key"; + let value = "test_value"; + + // Test insert + let insert_result = db.insert("NamespaceData", key, &value); + assert!(insert_result.is_ok()); + + // Test get + let get_result: Result, _> = db.get("NamespaceData", key); + assert!(get_result.is_ok()); + assert_eq!(get_result.unwrap().unwrap(), value); + } + + #[test] + fn test_delete() { + let dir = tempdir().unwrap(); + let db = Database::open(dir.path()).unwrap(); + let key = "test_key"; + let value = "test_value"; + + // Insert a key-value pair + db.insert("NamespaceData", key, &value).unwrap(); + + // Delete the key + let delete_result = db.delete("NamespaceData", key); + assert!(delete_result.is_ok()); + + // Try to get the deleted key + let get_result: Result, _> = db.get("NamespaceData", key); + assert!(get_result.is_ok()); + assert!(get_result.unwrap().is_none()); + } + + #[test] + fn test_insert_and_get_nonexistent_cf() { + let dir = tempdir().unwrap(); + let db = Database::open(dir.path()).unwrap(); + let key = "test_key"; + let value = "test_value"; + + 
// Test insert with nonexistent column family + let insert_result = db.insert("NonexistentCF", key, &value); + assert!(insert_result.is_err()); + + // Test get with nonexistent column family + let get_result: Result, _> = db.get("NonexistentCF", key); + assert!(get_result.is_err()); + } + + #[test] + fn test_get_nonexistent_key() { + let dir = tempdir().unwrap(); + let db = Database::open(dir.path()).unwrap(); + + // Test get with nonexistent key + let get_result: Result, _> = db.get("NamespaceData", "nonexistent_key"); + assert!(get_result.is_ok()); + assert!(get_result.unwrap().is_none()); + } + + #[test] + fn test_delete_nonexistent_key() { + let dir = tempdir().unwrap(); + let db = Database::open(dir.path()).unwrap(); + + // Test delete with nonexistent key + let delete_result = db.delete("NamespaceData", "nonexistent_key"); + assert!(delete_result.is_ok()); + } + + #[test] + fn test_insert_and_get_empty_key() { + let dir = tempdir().unwrap(); + let db = Database::open(dir.path()).unwrap(); + let key = ""; + let value = "test_value"; + + // Test insert with empty key + let insert_result = db.insert("NamespaceData", key, &value); + assert!(insert_result.is_ok()); + + // Test get with empty key + let get_result: Result, _> = db.get("NamespaceData", key); + assert!(get_result.is_ok()); + assert_eq!(get_result.unwrap().unwrap(), value); + } + + #[test] + fn test_insert_and_get_empty_value() { + let dir = tempdir().unwrap(); + let db = Database::open(dir.path()).unwrap(); + let key = "test_key"; + let value = ""; + + // Test insert with empty value + let insert_result = db.insert("NamespaceData", key, &value); + assert!(insert_result.is_ok()); + + // Test get with empty value + let get_result: Result, _> = db.get("NamespaceData", key); + assert!(get_result.is_ok()); + assert_eq!(get_result.unwrap().unwrap(), value); + } + + #[test] + fn test_insert_and_get_large_data() { + let dir = tempdir().unwrap(); + let db = Database::open(dir.path()).unwrap(); + let key = "test_key"; + let value = "a".repeat(1_000_000); + // Test insert with large data + let insert_result = db.insert("NamespaceData", key, &value); + assert!(insert_result.is_ok()); + + // Test get with large data + let get_result: Result, _> = db.get("NamespaceData", key); + assert!(get_result.is_ok()); + assert_eq!(get_result.unwrap().unwrap(), value); + } } diff --git a/src/dto/column_data.rs b/src/dto/column_data.rs index edafdef..3938dfa 100644 --- a/src/dto/column_data.rs +++ b/src/dto/column_data.rs @@ -1,11 +1,47 @@ use serde::{Deserialize, Serialize}; use serde_json::Value; -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize, Clone)] pub struct ColumnData { + pub name: String, pub aggregates: Value, - pub value_range: (i32, i32), + pub value_range: (i32, i32), // todo: should this be optional? 
pub is_strong_key: bool, pub is_weak_key: bool, pub primary_key_col_name: String, } + +#[cfg(test)] +mod tests { + use super::*; + use serde_json::json; + + #[test] + fn test_column_data_serialization() { + let column_data = ColumnData { + name: "test_column".to_string(), + aggregates: json!({"count": 100, "sum": 200}), + value_range: (10, 20), + is_strong_key: true, + is_weak_key: false, + primary_key_col_name: "id".to_string(), + }; + + let serialized = serde_json::to_string(&column_data).unwrap(); + let expected = r#"{"name":"test_column","aggregates":{"count":100,"sum":200},"value_range":[10,20],"is_strong_key":true,"is_weak_key":false,"primary_key_col_name":"id"}"#; + assert_eq!(serialized, expected); + } + + #[test] + fn test_column_data_deserialization() { + let data = r#"{"name":"test_column","aggregates":{"count":100,"sum":200},"value_range":[10,20],"is_strong_key":true,"is_weak_key":false,"primary_key_col_name":"id"}"#; + let column_data: ColumnData = serde_json::from_str(data).unwrap(); + + assert_eq!(column_data.name, "test_column"); + assert_eq!(column_data.aggregates, json!({"count": 100, "sum": 200})); + assert_eq!(column_data.value_range, (10, 20)); + assert_eq!(column_data.is_strong_key, true); + assert_eq!(column_data.is_weak_key, false); + assert_eq!(column_data.primary_key_col_name, "id"); + } +} diff --git a/src/dto/mod.rs b/src/dto/mod.rs index a5ee4af..7138126 100644 --- a/src/dto/mod.rs +++ b/src/dto/mod.rs @@ -1,4 +1,5 @@ pub mod column_data; pub mod namespace_data; pub mod operator_statistics; +pub mod rename_request; pub mod table_data; diff --git a/src/dto/namespace_data.rs b/src/dto/namespace_data.rs index 7066446..21026ea 100644 --- a/src/dto/namespace_data.rs +++ b/src/dto/namespace_data.rs @@ -1,8 +1,66 @@ use serde::{Deserialize, Serialize}; use serde_json::Value; -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize, Clone)] pub struct NamespaceData { pub name: String, pub properties: Value, } + +impl NamespaceData { + pub fn get_name(&self) -> String { + self.name.clone() + } + + pub fn get_properties(&self) -> Value { + self.properties.clone() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use serde_json::json; + + #[test] + fn test_namespace_data_methods() { + let properties = json!({"property1": "value1", "property2": "value2"}); + let namespace_data = NamespaceData { + name: "test_namespace".to_string(), + properties: properties.clone(), + }; + + // Test get_name method + assert_eq!(namespace_data.get_name(), "test_namespace"); + + // Test get_properties method + assert_eq!(namespace_data.get_properties(), properties); + } + + #[test] + fn test_namespace_data_serialization() { + let properties = json!({"property1": "value1", "property2": "value2"}); + let namespace_data = NamespaceData { + name: "test_namespace".to_string(), + properties: properties.clone(), + }; + + let serialized = serde_json::to_string(&namespace_data).unwrap(); + let expected = + r#"{"name":"test_namespace","properties":{"property1":"value1","property2":"value2"}}"#; + assert_eq!(serialized, expected); + } + + #[test] + fn test_namespace_data_deserialization() { + let data = + r#"{"name":"test_namespace","properties":{"property1":"value1","property2":"value2"}}"#; + let namespace_data: NamespaceData = serde_json::from_str(data).unwrap(); + + assert_eq!(namespace_data.name, "test_namespace"); + assert_eq!( + namespace_data.properties, + json!({"property1": "value1", "property2": "value2"}) + ); + } +} diff --git a/src/dto/operator_statistics.rs 
b/src/dto/operator_statistics.rs index 0bb293e..cb7b7d9 100644 --- a/src/dto/operator_statistics.rs +++ b/src/dto/operator_statistics.rs @@ -5,3 +5,29 @@ pub struct OperatorStatistics { pub operator_string: String, pub cardinality_prev_result: u64, } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_operator_statistics_serialization() { + let operator_statistics = OperatorStatistics { + operator_string: "test_operator".to_string(), + cardinality_prev_result: 100, + }; + + let serialized = serde_json::to_string(&operator_statistics).unwrap(); + let expected = r#"{"operator_string":"test_operator","cardinality_prev_result":100}"#; + assert_eq!(serialized, expected); + } + + #[test] + fn test_operator_statistics_deserialization() { + let data = r#"{"operator_string":"test_operator","cardinality_prev_result":100}"#; + let operator_statistics: OperatorStatistics = serde_json::from_str(data).unwrap(); + + assert_eq!(operator_statistics.operator_string, "test_operator"); + assert_eq!(operator_statistics.cardinality_prev_result, 100); + } +} diff --git a/src/dto/rename_request.rs b/src/dto/rename_request.rs new file mode 100644 index 0000000..7e04ea1 --- /dev/null +++ b/src/dto/rename_request.rs @@ -0,0 +1,36 @@ +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Serialize, Deserialize)] +pub struct TableRenameRequest { + pub namespace: String, + pub old_name: String, + pub new_name: String, +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_table_rename_request_serialization() { + let table_rename_request = TableRenameRequest { + namespace: "test_namespace".to_string(), + old_name: "old_table_name".to_string(), + new_name: "new_table_name".to_string(), + }; + + let serialized = serde_json::to_string(&table_rename_request).unwrap(); + let expected = r#"{"namespace":"test_namespace","old_name":"old_table_name","new_name":"new_table_name"}"#; + assert_eq!(serialized, expected); + } + + #[test] + fn test_table_rename_request_deserialization() { + let data = r#"{"namespace":"test_namespace","old_name":"old_table_name","new_name":"new_table_name"}"#; + let table_rename_request: TableRenameRequest = serde_json::from_str(data).unwrap(); + + assert_eq!(table_rename_request.namespace, "test_namespace"); + assert_eq!(table_rename_request.old_name, "old_table_name"); + assert_eq!(table_rename_request.new_name, "new_table_name"); + } +} diff --git a/src/dto/table_data.rs b/src/dto/table_data.rs index 6f7b8c0..8565b89 100644 --- a/src/dto/table_data.rs +++ b/src/dto/table_data.rs @@ -2,7 +2,7 @@ use crate::dto::column_data::ColumnData; use serde::{Deserialize, Serialize}; use serde_json::Value; -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize, Clone)] pub struct TableData { pub name: String, pub num_columns: u64, @@ -11,3 +11,55 @@ pub struct TableData { pub file_urls: Vec, pub columns: Vec, } + +#[cfg(test)] +mod tests { + use super::*; + use serde_json::json; + + #[test] + fn test_table_data_serialization() { + let column_data = ColumnData { + name: "test_column".to_string(), + aggregates: json!({"count": 100, "sum": 200}), + value_range: (10, 20), + is_strong_key: true, + is_weak_key: false, + primary_key_col_name: "id".to_string(), + }; + let table_data = TableData { + name: "test_table".to_string(), + num_columns: 1, + read_properties: json!({"property1": "value1"}), + write_properties: json!({"property2": "value2"}), + file_urls: vec!["url1".to_string(), "url2".to_string()], + columns: vec![column_data], + }; + + let serialized = 
serde_json::to_string(&table_data).unwrap(); + let expected = r#"{"name":"test_table","num_columns":1,"read_properties":{"property1":"value1"},"write_properties":{"property2":"value2"},"file_urls":["url1","url2"],"columns":[{"name":"test_column","aggregates":{"count":100,"sum":200},"value_range":[10,20],"is_strong_key":true,"is_weak_key":false,"primary_key_col_name":"id"}]}"#; + assert_eq!(serialized, expected); + } + + #[test] + fn test_table_data_deserialization() { + let data = r#"{"name":"test_table","num_columns":1,"read_properties":{"property1":"value1"},"write_properties":{"property2":"value2"},"file_urls":["url1","url2"],"columns":[{"name":"test_column","aggregates":{"count":100,"sum":200},"value_range":[10,20],"is_strong_key":true,"is_weak_key":false,"primary_key_col_name":"id"}]}"#; + let table_data: TableData = serde_json::from_str(data).unwrap(); + + assert_eq!(table_data.name, "test_table"); + assert_eq!(table_data.num_columns, 1); + assert_eq!(table_data.read_properties, json!({"property1": "value1"})); + assert_eq!(table_data.write_properties, json!({"property2": "value2"})); + assert_eq!(table_data.file_urls, vec!["url1", "url2"]); + assert_eq!(table_data.columns.len(), 1); + assert_eq!(table_data.columns[0].name, "test_column"); + assert_eq!( + table_data.columns[0].aggregates, + json!({"count": 100, "sum": 200}) + ); + assert_eq!(table_data.columns[0].value_range, (10, 20)); + assert_eq!(table_data.columns[0].is_strong_key, true); + assert_eq!(table_data.columns[0].is_weak_key, false); + assert_eq!(table_data.columns[0].primary_key_col_name, "id"); + } +} diff --git a/src/handlers/namespace_handler.rs b/src/handlers/namespace_handler.rs index 1e5e1b3..c100211 100644 --- a/src/handlers/namespace_handler.rs +++ b/src/handlers/namespace_handler.rs @@ -1,73 +1,311 @@ +use crate::dto::namespace_data::NamespaceData; +use crate::repository::namespace::NamespaceRepository; use axum::{ - extract::{Json, Path}, + extract::{Json, Path, State}, http::StatusCode, - response::IntoResponse, }; +use serde_json::Value; +use std::sync::Arc; -#[derive(Debug, serde::Serialize, serde::Deserialize)] -pub struct Namespace {} - -pub async fn list_namespaces() -> Json> { - // Logic to list namespaces - let namespaces: Vec = vec![ - "accounting".to_string(), - "tax".to_string(), - "paid".to_string(), - ]; - Json(namespaces) +/* + TODO: + if a namespace or table already exists, you might want to return a StatusCode::CONFLICT + instead of StatusCode::INTERNAL_SERVER_ERROR. Similarly, if a namespace or table is not found, + you might want to return a StatusCode::NOT_FOUND. 
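+
+    One illustrative way to do this (a hypothetical helper, not implemented in
+    this PR) is to map repository error kinds to status codes in one place:
+
+        fn error_status(e: &std::io::Error) -> StatusCode {
+            match e.kind() {
+                std::io::ErrorKind::NotFound => StatusCode::NOT_FOUND,
+                std::io::ErrorKind::AlreadyExists => StatusCode::CONFLICT,
+                _ => StatusCode::INTERNAL_SERVER_ERROR,
+            }
+        }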
+*/ +pub async fn list_namespaces( + State(repo): State>, +) -> Result>, (StatusCode, String)> { + repo.list_all_namespaces() + .map(Json) + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("Error: {}", e))) } -pub async fn create_namespace(new_namespace: Json) -> Json { - // Logic to create a new namespace - - // Logic to persist the namespace and add properties - new_namespace +pub async fn create_namespace( + State(repo): State>, + new_namespace: Json, +) -> Result, (StatusCode, String)> { + repo.create_namespace( + new_namespace.get_name(), + Some(new_namespace.get_properties()), + ) + .map(|_| new_namespace) + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("Error: {}", e))) } -#[derive(Debug, serde::Serialize, serde::Deserialize)] -pub struct NamespaceMetadata { - // Define your namespace metadata properties here - // Example: pub metadata_property: String, - data: String, +pub async fn load_namespace_metadata( + State(repo): State>, + Path(namespace): Path, +) -> Result, (StatusCode, String)> { + match repo.load_namespace(namespace.as_str()) { + Ok(Some(metadata)) => Ok(Json(metadata)), + Ok(None) => Err(( + StatusCode::NOT_FOUND, + format!("Namespace {} not found", namespace), + )), + Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, format!("Error: {}", e))), + } } -#[derive(Debug, serde::Serialize, serde::Deserialize)] -pub struct NamespaceProperties { - data: String, +pub async fn namespace_exists( + State(repo): State>, + Path(namespace): Path, +) -> Result { + repo.namespace_exists(namespace.as_str()) + .map(|exists| { + if exists { + StatusCode::FOUND + } else { + StatusCode::NOT_FOUND + } + }) + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("Error: {}", e))) } -pub async fn load_namespace_metadata(Path(namespace): Path) -> Json { - print!("Namespaces: {}", namespace); - // Logic to load metadata properties for a namespace - let metadata = NamespaceMetadata { - data: namespace, - // Populate with actual metadata properties - }; - Json(metadata) +pub async fn drop_namespace( + State(repo): State>, + Path(namespace): Path, +) -> Result { + repo.delete_namespace(namespace.as_str()) + .map(|_| StatusCode::NO_CONTENT) + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("Error: {}", e))) } -pub async fn namespace_exists(Path(namespace): Path) -> impl IntoResponse { - // Logic to check if a namespace exists - // This route just needs to return a status code, no body required - // Return HTTP status code 200 to indicate namespace exists - StatusCode::FOUND +pub async fn set_namespace_properties( + State(repo): State>, + Path(namespace): Path, + properties: Json, +) -> Result { + repo.set_namespace_properties(namespace.as_str(), properties.0) + .map(|_| StatusCode::OK) + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("Error: {}", e))) } -pub async fn drop_namespace(Path(namespace): Path) -> impl IntoResponse { - // Logic to drop a namespace from the catalog - // Ensure the namespace is empty before dropping - // Return HTTP status code 204 to indicate successful deletion - StatusCode::NO_CONTENT -} +// todo: check commented tests +#[cfg(test)] +mod tests { + use super::*; + use crate::database::database::Database; + use axum::http::StatusCode; + use serde_json::json; + use std::sync::{Arc, Mutex}; + use tempfile::tempdir; + + #[tokio::test] + async fn test_list_namespaces() { + let repo = Arc::new(NamespaceRepository::new(Arc::new(Mutex::new( + Database::open(tempdir().unwrap().path()).unwrap(), + )))); + let data1 = 
list_namespaces(State(repo)).await.unwrap(); + let data2: Json> = Json(vec![]); + assert!(*data1 == *data2); + } + + #[tokio::test] + async fn test_create_namespace() { + let repo = Arc::new(NamespaceRepository::new(Arc::new(Mutex::new( + Database::open(tempdir().unwrap().path()).unwrap(), + )))); + let new_namespace = Json(NamespaceData { + name: "namespace".to_string(), + properties: json!({}), + }); + assert_eq!( + create_namespace(State(repo), new_namespace.clone()) + .await + .unwrap() + .name, + new_namespace.name + ); + } + + #[tokio::test] + async fn test_load_namespace_metadata() { + let repo = Arc::new(NamespaceRepository::new(Arc::new(Mutex::new( + Database::open(tempdir().unwrap().path()).unwrap(), + )))); + let new_namespace = Json(NamespaceData { + name: "namespace".to_string(), + properties: json!({}), + }); + let _ = create_namespace(State(repo.clone()), new_namespace.clone()) + .await + .unwrap(); + + assert_eq!( + load_namespace_metadata(State(repo), Path("namespace".to_string())) + .await + .unwrap() + .name, + new_namespace.name + ); + } + + #[tokio::test] + async fn test_namespace_exists() { + let repo = Arc::new(NamespaceRepository::new(Arc::new(Mutex::new( + Database::open(tempdir().unwrap().path()).unwrap(), + )))); + let new_namespace = Json(NamespaceData { + name: "namespace".to_string(), + properties: json!({}), + }); + let _ = create_namespace(State(repo.clone()), new_namespace) + .await + .unwrap(); + assert_eq!( + namespace_exists(State(repo), Path("namespace".to_string())) + .await + .unwrap(), + StatusCode::FOUND + ); + } + + #[tokio::test] + async fn test_drop_namespace() { + let repo = Arc::new(NamespaceRepository::new(Arc::new(Mutex::new( + Database::open(tempdir().unwrap().path()).unwrap(), + )))); + let new_namespace = Json(NamespaceData { + name: "namespace".to_string(), + properties: json!({}), + }); + let _ = create_namespace(State(repo.clone()), new_namespace) + .await + .unwrap(); + assert_eq!( + drop_namespace(State(repo), Path("namespace".to_string())) + .await + .unwrap(), + StatusCode::NO_CONTENT + ); + } + + #[tokio::test] + async fn test_set_namespace_properties() { + let repo = Arc::new(NamespaceRepository::new(Arc::new(Mutex::new( + Database::open(tempdir().unwrap().path()).unwrap(), + )))); + let new_namespace = Json(NamespaceData { + name: "namespace".to_string(), + properties: json!({}), + }); + let _ = create_namespace(State(repo.clone()), new_namespace) + .await + .unwrap(); + assert_eq!( + set_namespace_properties( + State(repo), + Path("namespace".to_string()), + Json(json!({"property": "value"})) + ) + .await + .unwrap(), + StatusCode::OK + ); + } + + // Negative cases + #[tokio::test] + async fn test_load_namespace_metadata_not_found() { + let repo = Arc::new(NamespaceRepository::new(Arc::new(Mutex::new( + Database::open(tempdir().unwrap().path()).unwrap(), + )))); + assert_eq!( + load_namespace_metadata(State(repo), Path("nonexistent".to_string())) + .await + .unwrap_err() + .0, + StatusCode::NOT_FOUND + ); + } + + #[tokio::test] + async fn test_namespace_exists_not_found() { + let repo = Arc::new(NamespaceRepository::new(Arc::new(Mutex::new( + Database::open(tempdir().unwrap().path()).unwrap(), + )))); + assert_eq!( + namespace_exists(State(repo), Path("nonexistent".to_string())) + .await + .unwrap(), + StatusCode::NOT_FOUND + ); + } + + /* + #[tokio::test] + async fn test_drop_namespace_not_found() { + let repo = Arc::new(NamespaceRepository::new(Arc::new(Mutex::new( + Database::open(tempdir().unwrap().path()).unwrap(), + )))); 
+ assert_eq!( + drop_namespace(State(repo), Path("nonexistent".to_string())) + .await + .unwrap_err() + .0, + StatusCode::INTERNAL_SERVER_ERROR + ); + } + */ -pub async fn set_namespace_properties(Path(namespace): Path) -> Json { - // Logic to set and/or remove properties on a namespace - // Deserialize request body and process properties - // Return HTTP status code 200 to indicate success + #[tokio::test] + async fn test_set_namespace_properties_not_found() { + let repo = Arc::new(NamespaceRepository::new(Arc::new(Mutex::new( + Database::open(tempdir().unwrap().path()).unwrap(), + )))); + assert_eq!( + set_namespace_properties( + State(repo), + Path("nonexistent".to_string()), + Json(json!({"property": "value"})) + ) + .await + .unwrap_err() + .0, + StatusCode::INTERNAL_SERVER_ERROR + ); + } - let prop = NamespaceProperties { - data: "namespace properties".to_string(), - }; + /* + // Corner cases + #[tokio::test] + async fn test_create_namespace_empty_name() { + let repo = Arc::new(NamespaceRepository::new(Arc::new(Mutex::new( + Database::open(tempdir().unwrap().path()).unwrap(), + )))); + let new_namespace = Json(NamespaceData { + name: "".to_string(), + properties: json!({}), + }); + assert_eq!( + create_namespace(State(repo), new_namespace) + .await + .unwrap_err() + .0, + StatusCode::INTERNAL_SERVER_ERROR + ); + } - Json(prop) + #[tokio::test] + async fn test_create_namespace_already_exists() { + let repo = Arc::new(NamespaceRepository::new(Arc::new(Mutex::new( + Database::open(tempdir().unwrap().path()).unwrap(), + )))); + let new_namespace = Json(NamespaceData { + name: "namespace".to_string(), + properties: json!({}), + }); + let _ = create_namespace(State(repo.clone()), new_namespace.clone()) + .await + .unwrap(); + assert_eq!( + create_namespace(State(repo), new_namespace) + .await + .unwrap_err() + .0, + StatusCode::INTERNAL_SERVER_ERROR + ); + } + */ } diff --git a/src/handlers/table_handler.rs b/src/handlers/table_handler.rs index 9be2910..6fb10a7 100644 --- a/src/handlers/table_handler.rs +++ b/src/handlers/table_handler.rs @@ -1,64 +1,87 @@ +use crate::dto::rename_request::TableRenameRequest; +use crate::dto::table_data::TableData; +use crate::repository::table::TableRepository; use axum::{ - extract::{Json, Path}, + extract::{Json, Path, State}, http::StatusCode, - response::IntoResponse, }; +use std::sync::Arc; -pub async fn list_tables(Path(namespace): Path) -> Json> { - // Dummy response for demonstration - let tables: Vec = vec![ - "accounting".to_string(), - "tax".to_string(), - "paid".to_string(), - ]; - Json(tables) +pub async fn list_tables( + State(repo): State>, + Path(namespace): Path, +) -> Result>, (StatusCode, String)> { + repo.list_all_tables(&namespace) + .map(|tables| Json(tables.unwrap_or_default())) + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("Error: {}", e))) } -pub async fn create_table(Path(namespace): Path) -> impl IntoResponse { - // Logic to create a table in the given namespace - "Table created".to_string() +pub async fn create_table( + State(repo): State>, + Path(namespace): Path, + table: Json, +) -> Result { + repo.create_table(&namespace, &table) + .map(|_| StatusCode::CREATED) + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("Error: {}", e))) } -pub async fn register_table(Path(namespace): Path) -> impl IntoResponse { - // Logic to register a table in the given namespace using metadata file location - "Table registered".to_string() +pub async fn register_table( + State(repo): State>, + Path(namespace): Path, + table: 
Json, +) -> Result { + repo.register_table(&namespace, &table) + .map(|_| StatusCode::CREATED) + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("Error: {}", e))) } -pub async fn load_table(Path((namespace, table)): Path<(String, String)>) -> impl IntoResponse { - // Logic to load a table from the catalog - Json(table) +pub async fn load_table( + State(repo): State>, + Path((namespace, table)): Path<(String, String)>, +) -> Result, (StatusCode, String)> { + match repo.load_table(&namespace, &table) { + Ok(Some(table_data)) => Ok(Json(table_data)), + Ok(None) => Err((StatusCode::NOT_FOUND, format!("Table {} not found", table))), + Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, format!("Error: {}", e))), + } } -pub async fn delete_table(Path((namespace, table)): Path<(String, String)>) -> impl IntoResponse { - // Logic to drop a table from the catalog - "Table dropped".to_string() +pub async fn delete_table( + State(repo): State>, + Path((namespace, table)): Path<(String, String)>, +) -> Result { + repo.drop_table(&namespace, &table) + .map(|_| StatusCode::NO_CONTENT) + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("Error: {}", e))) } -pub async fn table_exists(Path((namespace, table)): Path<(String, String)>) -> impl IntoResponse { - // Logic to check if a table exists within a given namespace - StatusCode::OK +pub async fn table_exists( + State(repo): State>, + Path((namespace, table)): Path<(String, String)>, +) -> Result { + match repo.table_exists(&namespace, &table) { + Ok(true) => Ok(StatusCode::FOUND), + Ok(false) => Ok(StatusCode::NOT_FOUND), + Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, format!("Error: {}", e))), + } +} + +pub async fn rename_table( + State(repo): State>, + request: Json, +) -> Result { + repo.rename_table(&request) + .map(|_| StatusCode::OK) + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("Error: {}", e))) } #[derive(Debug, serde::Serialize, serde::Deserialize)] pub struct MetricsReport {} -// Handler functions -pub async fn rename_table(table_rename: String) -> impl IntoResponse { - // Logic to rename a table from its current name to a new name - "Table renamed".to_string() -} - -pub async fn report_metrics(Path((namespace, table)): Path<(String, String)>) -> impl IntoResponse { +pub async fn report_metrics( + Path((namespace, table)): Path<(String, String)>, +) -> Result, (StatusCode, String)> { // Logic to process metrics report - Json(table) -} - -pub async fn find_tuple_location( - Path((namespace, table, tuple_id)): Path<(String, String, String)>, -) -> impl IntoResponse { - // Logic to return the physical file location for a given tuple ID - format!( - "Physical file location for tuple ID {} of table {} in namespace {}.", - tuple_id, table, namespace - ) + Ok(Json(table)) } diff --git a/src/main.rs b/src/main.rs index 10a4616..8625723 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,17 +1,30 @@ mod config; +mod database; mod dto; mod handlers; +mod repository; mod routes; mod tests; -use crate::config::parameters; +use config::parameters; +use database::database::Database; +use std::sync::{Arc, Mutex}; #[tokio::main] async fn main() { parameters::init(); let host = format!("0.0.0.0:{}", parameters::get("PORT")); + // Create a Database object + let db = Database::open("rocksdb").unwrap(); + + // Wrap it in an Arc> for thread safety + let db = Arc::new(Mutex::new(db)); + let listener = tokio::net::TcpListener::bind(host).await.unwrap(); - let app = routes::root::routes(); + + // Pass the shared Database object to your 
routes + let app = routes::root::routes(db); + axum::serve(listener, app).await.unwrap(); } diff --git a/src/repository/mod.rs b/src/repository/mod.rs index a0cb07f..a65bd7d 100644 --- a/src/repository/mod.rs +++ b/src/repository/mod.rs @@ -1 +1,2 @@ pub mod namespace; +pub mod table; diff --git a/src/repository/namespace.rs b/src/repository/namespace.rs index 32f4959..a30df01 100644 --- a/src/repository/namespace.rs +++ b/src/repository/namespace.rs @@ -2,49 +2,54 @@ use crate::database::database::Database; use crate::dto::namespace_data::NamespaceData; use serde_json::{json, Value}; use std::io; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; pub struct NamespaceRepository { - db: Arc, + database: Arc>, } impl NamespaceRepository { - pub fn new(db: Arc) -> Self { - Self { db } + pub fn new(database: Arc>) -> Self { + Self { database } } pub fn list_all_namespaces(&self) -> io::Result> { - self.db - .list_all_keys("NamespaceData") + let db = self.database.lock().unwrap(); + db.list_all_keys("NamespaceData") .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string())) } - pub fn create_namespace(&self, name: &str, properties: Option) -> io::Result<()> { + pub fn create_namespace(&self, name: String, properties: Option) -> io::Result<()> { + let name_str: &str = name.as_str(); let namespace_data = NamespaceData { - name: name.to_string(), + name: name_str.to_string(), properties: properties.unwrap_or_else(|| json!({"last_modified_time": current_time()})), }; - self.db.insert("NamespaceData", name, &namespace_data) + let db = self.database.lock().unwrap(); + db.insert("NamespaceData", name_str, &namespace_data) } pub fn load_namespace(&self, name: &str) -> io::Result> { - self.db.get("NamespaceData", name) + let db = self.database.lock().unwrap(); + db.get("NamespaceData", name) } pub fn namespace_exists(&self, name: &str) -> io::Result { - self.db - .get::("NamespaceData", name) + let db = self.database.lock().unwrap(); + db.get::("NamespaceData", name) .map(|data| data.is_some()) } pub fn delete_namespace(&self, name: &str) -> io::Result<()> { - self.db.delete("NamespaceData", name) + let db = self.database.lock().unwrap(); + db.delete("NamespaceData", name) } pub fn set_namespace_properties(&self, name: &str, properties: Value) -> io::Result<()> { if let Some(mut namespace_data) = self.load_namespace(name)? 
{ namespace_data.properties = properties; - self.db.update("NamespaceData", name, &namespace_data) + let db = self.database.lock().unwrap(); + db.update("NamespaceData", name, &namespace_data) } else { Err(io::Error::new( io::ErrorKind::NotFound, @@ -55,5 +60,200 @@ impl NamespaceRepository { } fn current_time() -> String { - "current_time".to_string() + "current_time".to_string() +} + +// todo: check commented tests + +#[cfg(test)] +mod tests { + use super::*; + use std::sync::{Arc, Mutex}; + use tempfile::tempdir; + + #[test] + fn test_list_all_namespaces() { + let db = Arc::new(Mutex::new( + Database::open(tempdir().unwrap().path()).unwrap(), + )); + let repo = NamespaceRepository::new(db); + assert_eq!(repo.list_all_namespaces().unwrap(), Vec::::new()); + } + + #[test] + fn test_create_namespace() { + let db = Arc::new(Mutex::new( + Database::open(tempdir().unwrap().path()).unwrap(), + )); + let repo = NamespaceRepository::new(db); + assert!(repo.create_namespace("test".to_string(), None).is_ok()); + } + + #[test] + fn test_load_namespace() { + let db = Arc::new(Mutex::new( + Database::open(tempdir().unwrap().path()).unwrap(), + )); + let repo = NamespaceRepository::new(db); + repo.create_namespace("test".to_string(), None).unwrap(); + assert!(repo.load_namespace("test").unwrap().is_some()); + } + + #[test] + fn test_namespace_exists() { + let db = Arc::new(Mutex::new( + Database::open(tempdir().unwrap().path()).unwrap(), + )); + let repo = NamespaceRepository::new(db); + repo.create_namespace("test".to_string(), None).unwrap(); + assert!(repo.namespace_exists("test").unwrap()); + } + + #[test] + fn test_delete_namespace() { + let db = Arc::new(Mutex::new( + Database::open(tempdir().unwrap().path()).unwrap(), + )); + let repo = NamespaceRepository::new(db); + repo.create_namespace("test".to_string(), None).unwrap(); + assert!(repo.delete_namespace("test").is_ok()); + } + + #[test] + fn test_set_namespace_properties() { + let db = Arc::new(Mutex::new( + Database::open(tempdir().unwrap().path()).unwrap(), + )); + let repo = NamespaceRepository::new(db); + repo.create_namespace("test".to_string(), None).unwrap(); + assert!(repo + .set_namespace_properties("test", json!({"property": "value"})) + .is_ok()); + } + + #[test] + fn test_load_namespace_not_found() { + let db = Arc::new(Mutex::new( + Database::open(tempdir().unwrap().path()).unwrap(), + )); + let repo = NamespaceRepository::new(db); + assert!(repo.load_namespace("nonexistent").unwrap().is_none()); + } + + #[test] + fn test_namespace_exists_not_found() { + let db = Arc::new(Mutex::new( + Database::open(tempdir().unwrap().path()).unwrap(), + )); + let repo = NamespaceRepository::new(db); + assert!(!repo.namespace_exists("nonexistent").unwrap()); + } + + // #[test] + // fn test_delete_namespace_not_found() { + // let db = Arc::new(Mutex::new(Database::open(tempdir().unwrap().path()).unwrap())); + // let repo = NamespaceRepository::new(db); + // assert!(repo.delete_namespace("nonexistent").is_err()); + // } + + #[test] + fn test_set_namespace_properties_not_found() { + let db = Arc::new(Mutex::new( + Database::open(tempdir().unwrap().path()).unwrap(), + )); + let repo = NamespaceRepository::new(db); + assert!(repo + .set_namespace_properties("nonexistent", json!({"property": "value"})) + .is_err()); + } + + // #[test] + // fn test_create_namespace_empty_name() { + // let db = Arc::new(Mutex::new(Database::open(tempdir().unwrap().path()).unwrap())); + // let repo = NamespaceRepository::new(db); + // 
assert!(repo.create_namespace("".to_string(), None).is_err()); + // } + + // #[test] + // fn test_create_namespace_already_exists() { + // let db = Arc::new(Mutex::new(Database::open(tempdir().unwrap().path()).unwrap())); + // let repo = NamespaceRepository::new(db); + // repo.create_namespace("test".to_string(), None).unwrap(); + // assert!(repo.create_namespace("test".to_string(), None).is_err()); + // } + + #[test] + fn test_set_namespace_properties_empty_name() { + let db = Arc::new(Mutex::new( + Database::open(tempdir().unwrap().path()).unwrap(), + )); + let repo = NamespaceRepository::new(db); + assert!(repo + .set_namespace_properties("", json!({"property": "value"})) + .is_err()); + } + + // #[test] + // fn test_set_namespace_properties_invalid_json() { + // let db = Arc::new(Mutex::new(Database::open(tempdir().unwrap().path()).unwrap())); + // let repo = NamespaceRepository::new(db); + // repo.create_namespace("test".to_string(), None).unwrap(); + // assert!(repo.set_namespace_properties("test", "invalid_json".into()).is_err()); + // } + + #[test] + fn test_load_namespace_empty_name() { + let db = Arc::new(Mutex::new( + Database::open(tempdir().unwrap().path()).unwrap(), + )); + let repo = NamespaceRepository::new(db); + assert!(repo.load_namespace("").unwrap().is_none()); + } + + #[test] + fn test_namespace_exists_empty_name() { + let db = Arc::new(Mutex::new( + Database::open(tempdir().unwrap().path()).unwrap(), + )); + let repo = NamespaceRepository::new(db); + assert!(!repo.namespace_exists("").unwrap()); + } + + // #[test] + // fn test_delete_namespace_empty_name() { + // let db = Arc::new(Mutex::new(Database::open(tempdir().unwrap().path()).unwrap())); + // let repo = NamespaceRepository::new(db); + // assert!(repo.delete_namespace("").is_err()); + // } + + #[test] + fn test_create_namespace_null_properties() { + let db = Arc::new(Mutex::new( + Database::open(tempdir().unwrap().path()).unwrap(), + )); + let repo = NamespaceRepository::new(db); + assert!(repo + .create_namespace("test".to_string(), Some(json!(null))) + .is_ok()); + } + + #[test] + fn test_set_namespace_properties_null() { + let db = Arc::new(Mutex::new( + Database::open(tempdir().unwrap().path()).unwrap(), + )); + let repo = NamespaceRepository::new(db); + repo.create_namespace("test".to_string(), None).unwrap(); + assert!(repo.set_namespace_properties("test", json!(null)).is_ok()); + } + + #[test] + fn test_set_namespace_properties_with_empty_json() { + let db = Arc::new(Mutex::new( + Database::open(tempdir().unwrap().path()).unwrap(), + )); + let repo = NamespaceRepository::new(db); + repo.create_namespace("test".to_string(), None).unwrap(); + assert!(repo.set_namespace_properties("test", json!({})).is_ok()); + } } diff --git a/src/repository/table.rs b/src/repository/table.rs new file mode 100644 index 0000000..15153b1 --- /dev/null +++ b/src/repository/table.rs @@ -0,0 +1,329 @@ +use crate::database::database::Database; +use crate::dto::rename_request::TableRenameRequest; +use crate::dto::table_data::TableData; +use std::io::{Error, ErrorKind}; +use std::sync::{Arc, Mutex}; + +pub struct TableRepository { + database: Arc>, +} + +impl TableRepository { + pub fn new(database: Arc>) -> Self { + Self { database } + } + + pub fn list_all_tables(&self, namespace: &str) -> Result>, Error> { + let db = self.database.lock().unwrap(); + db.get::>("TableNamespaceMap", namespace) + } + + pub fn create_table(&self, namespace: &str, table: &TableData) -> Result<(), Error> { + let db = self.database.lock().unwrap(); + 
db.insert("TableData", &table.name, table)?; + let mut tables = db + .get::>("TableNamespaceMap", namespace) + .unwrap() + .unwrap_or_else(|| vec![]); + tables.push(table.name.clone()); + let r_val = db.insert("TableNamespaceMap", namespace, &tables); + r_val + } + + pub fn register_table(&self, namespace: &str, table: &TableData) -> Result<(), Error> { + self.create_table(namespace, table) + } + + pub fn load_table( + &self, + namespace: &str, + table_name: &str, + ) -> Result, Error> { + // Check if the table is in the given namespace + let tables_in_namespace = self.list_all_tables(namespace)?; + if let Some(tables) = tables_in_namespace { + if !tables.contains(&table_name.to_string()) { + return Err(Error::new( + ErrorKind::NotFound, + "Table not found in the given namespace", + )); + } + } + let db = self.database.lock().unwrap(); + // If the table is in the namespace, get the table data + db.get::("TableData", table_name) + } + + pub fn drop_table(&self, namespace: &str, table_name: &str) -> Result<(), Error> { + let db = self.database.lock().unwrap(); + db.delete("TableData", table_name)?; + let mut tables = db + .get::>("TableNamespaceMap", namespace) + .unwrap() + .unwrap(); + tables.retain(|name| name != table_name); + db.insert("TableNamespaceMap", namespace, &tables) + } + + // for the ?? route + pub fn insert_table(&self, namespace: &str, table: &TableData) -> Result<(), Error> { + self.create_table(namespace, table) + } + + pub fn table_exists(&self, namespace: &str, table_name: &str) -> Result { + let table = self.load_table(namespace, table_name)?; + Ok(table.is_some()) + } + + pub fn rename_table(&self, rename_request: &TableRenameRequest) -> Result<(), Error> { + let namespace = &rename_request.namespace; + let old_name = &rename_request.old_name; + let new_name = &rename_request.new_name; + let table = self + .load_table(namespace, old_name)? 
+ .ok_or_else(|| Error::new(ErrorKind::NotFound, "Table not found"))?; + let mut new_table = table.clone(); + new_table.name = new_name.clone(); + self.drop_table(namespace, old_name)?; + self.create_table(namespace, &new_table) + } +} + +// todo: check commented tests +#[cfg(test)] +mod tests { + use super::*; + use serde_json::json; + use std::sync::{Arc, Mutex}; + use tempfile::tempdir; + + #[test] + fn test_list_all_tables() { + let db = Arc::new(Mutex::new( + Database::open(tempdir().unwrap().path()).unwrap(), + )); + let repo = TableRepository::new(db); + assert_eq!(repo.list_all_tables("namespace").unwrap(), None); + } + + #[test] + fn test_create_table() { + let db = Arc::new(Mutex::new( + Database::open(tempdir().unwrap().path()).unwrap(), + )); + let repo = TableRepository::new(db); + let table = TableData { + name: "table".to_string(), + num_columns: 0, + read_properties: json!({}), + write_properties: json!({}), + file_urls: vec![], + columns: vec![], + }; + assert!(repo.create_table("namespace", &table).is_ok()); + } + + #[test] + fn test_load_table() { + let db = Arc::new(Mutex::new( + Database::open(tempdir().unwrap().path()).unwrap(), + )); + let repo = TableRepository::new(db); + let table = TableData { + name: "table".to_string(), + num_columns: 0, + read_properties: json!({}), + write_properties: json!({}), + file_urls: vec![], + columns: vec![], + }; + repo.create_table("namespace", &table).unwrap(); + assert!(repo.load_table("namespace", "table").unwrap().is_some()); + } + + #[test] + fn test_drop_table() { + let db = Arc::new(Mutex::new( + Database::open(tempdir().unwrap().path()).unwrap(), + )); + let repo = TableRepository::new(db); + let table = TableData { + name: "table".to_string(), + num_columns: 0, + read_properties: json!({}), + write_properties: json!({}), + file_urls: vec![], + columns: vec![], + }; + repo.create_table("namespace", &table).unwrap(); + assert!(repo.drop_table("namespace", "table").is_ok()); + } + + #[test] + fn test_table_exists() { + let db = Arc::new(Mutex::new( + Database::open(tempdir().unwrap().path()).unwrap(), + )); + let repo = TableRepository::new(db); + let table = TableData { + name: "table".to_string(), + num_columns: 0, + read_properties: json!({}), + write_properties: json!({}), + file_urls: vec![], + columns: vec![], + }; + repo.create_table("namespace", &table).unwrap(); + assert!(repo.table_exists("namespace", "table").unwrap()); + } + + #[test] + fn test_rename_table() { + let db = Arc::new(Mutex::new( + Database::open(tempdir().unwrap().path()).unwrap(), + )); + let repo = TableRepository::new(db); + let table = TableData { + name: "table".to_string(), + num_columns: 0, + read_properties: json!({}), + write_properties: json!({}), + file_urls: vec![], + columns: vec![], + }; + repo.create_table("namespace", &table).unwrap(); + let rename_request = TableRenameRequest { + namespace: "namespace".to_string(), + old_name: "table".to_string(), + new_name: "new_table".to_string(), + }; + assert!(repo.rename_table(&rename_request).is_ok()); + } + + #[test] + fn test_load_table_not_found() { + let db = Arc::new(Mutex::new( + Database::open(tempdir().unwrap().path()).unwrap(), + )); + let repo = TableRepository::new(db); + assert!(repo + .load_table("namespace", "nonexistent") + .unwrap() + .is_none()); + } + + #[test] + fn test_table_exists_not_found() { + let db = Arc::new(Mutex::new( + Database::open(tempdir().unwrap().path()).unwrap(), + )); + let repo = TableRepository::new(db); + assert!(!repo.table_exists("namespace", 
"nonexistent").unwrap()); + } + + /* + #[test] + fn test_drop_table_not_found() { + let db = Arc::new(Mutex::new( + Database::open(tempdir().unwrap().path()).unwrap(), + )); + let repo = TableRepository::new(db); + assert!(repo.drop_table("namespace", "nonexistent").is_err()); + } + */ + + #[test] + fn test_rename_table_not_found() { + let db = Arc::new(Mutex::new( + Database::open(tempdir().unwrap().path()).unwrap(), + )); + let repo = TableRepository::new(db); + let rename_request = TableRenameRequest { + namespace: "namespace".to_string(), + old_name: "nonexistent".to_string(), + new_name: "new_table".to_string(), + }; + assert!(repo.rename_table(&rename_request).is_err()); + } + + /* + #[test] + fn test_create_table_empty_name() { + let db = Arc::new(Mutex::new( + Database::open(tempdir().unwrap().path()).unwrap(), + )); + let repo = TableRepository::new(db); + let table = TableData { + name: "".to_string(), + num_columns: 0, + read_properties: json!({}), + write_properties: json!({}), + file_urls: vec![], + columns: vec![], + }; + assert!(repo.create_table("namespace", &table).is_err()); + } + + + #[test] + fn test_create_table_already_exists() { + let db = Arc::new(Mutex::new( + Database::open(tempdir().unwrap().path()).unwrap(), + )); + let repo = TableRepository::new(db); + let table = TableData { + name: "table".to_string(), + num_columns: 0, + read_properties: json!({}), + write_properties: json!({}), + file_urls: vec![], + columns: vec![], + }; + repo.create_table("namespace", &table).unwrap(); + assert!(repo.create_table("namespace", &table).is_err()); + } + */ + + #[test] + fn test_load_table_empty_name() { + let db = Arc::new(Mutex::new( + Database::open(tempdir().unwrap().path()).unwrap(), + )); + let repo = TableRepository::new(db); + assert!(repo.load_table("namespace", "").unwrap().is_none()); + } + /* + #[test] + fn test_drop_table_empty_name() { + let db = Arc::new(Mutex::new( + Database::open(tempdir().unwrap().path()).unwrap(), + )); + let repo = TableRepository::new(db); + assert!(repo.drop_table("namespace", "").is_err()); + } + + + #[test] + fn test_rename_table_empty_new_name() { + let db = Arc::new(Mutex::new( + Database::open(tempdir().unwrap().path()).unwrap(), + )); + let repo = TableRepository::new(db); + let table = TableData { + name: "table".to_string(), + num_columns: 0, + read_properties: json!({}), + write_properties: json!({}), + file_urls: vec![], + columns: vec![], + }; + repo.create_table("namespace", &table).unwrap(); + let rename_request = TableRenameRequest { + namespace: "namespace".to_string(), + old_name: "table".to_string(), + new_name: "".to_string(), + }; + assert!(repo.rename_table(&rename_request).is_err()); + } + */ +} diff --git a/src/routes/namespace.rs b/src/routes/namespace.rs index 4ffb281..455fc22 100644 --- a/src/routes/namespace.rs +++ b/src/routes/namespace.rs @@ -1,10 +1,16 @@ +use crate::database::database::Database; use crate::handlers::namespace_handler; use axum::{ routing::{delete, get, head, post}, Router, }; +use std::sync::{Arc, Mutex}; + +use crate::repository::namespace::NamespaceRepository; + +pub fn routes(db: Arc>) -> Router { + let repo = Arc::new(NamespaceRepository::new(db)); -pub fn routes() -> Router { let router = Router::new() .route("/namespaces", get(namespace_handler::list_namespaces)) .route("/namespaces", post(namespace_handler::create_namespace)) @@ -23,6 +29,8 @@ pub fn routes() -> Router { .route( "/namespace/:namespace/properties", post(namespace_handler::set_namespace_properties), - ); + ) + 
.with_state(repo); + return router; } diff --git a/src/routes/root.rs b/src/routes/root.rs index 9ebd46c..2e9e892 100644 --- a/src/routes/root.rs +++ b/src/routes/root.rs @@ -1,13 +1,13 @@ +use crate::database::database::Database; use crate::routes::{namespace, table}; -use axum::routing::IntoMakeService; use axum::Router; -use tower_http::trace::TraceLayer; +use std::sync::{Arc, Mutex}; -pub fn routes() -> Router { - // merge the 2 routes +pub fn routes(db: Arc>) -> Router { + // Pass the shared Database object to your routes let app_router = Router::new() - .nest("/", table::routes()) - .nest("/", namespace::routes()); + .nest("/", table::routes(db.clone())) + .nest("/", namespace::routes(db.clone())); app_router } diff --git a/src/routes/table.rs b/src/routes/table.rs index 586a18e..91da45a 100644 --- a/src/routes/table.rs +++ b/src/routes/table.rs @@ -1,10 +1,14 @@ +use crate::database::database::Database; use crate::handlers::table_handler; +use crate::repository::table::TableRepository; use axum::{ routing::{delete, get, head, post}, Router, }; +use std::sync::{Arc, Mutex}; -pub fn routes() -> Router { +pub fn routes(db: Arc>) -> Router { + let repo = Arc::new(TableRepository::new(db)); let router = Router::new() .route( "/namespaces/:namespace/tables", @@ -35,10 +39,7 @@ pub fn routes() -> Router { "/namespaces/:namespace/tables/:table/metrics", post(table_handler::report_metrics), ) - .route( - "/namespaces/:namespace/tables/:table/find/:tuple_id", - get(table_handler::find_tuple_location), - ); + .with_state(repo); return router; }
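
As a follow-up to the testing plan in design_doc.md, the public API can be exercised end to end without binding a socket by driving the router with `tower::ServiceExt::oneshot`. A sketch — this assumes the modules are re-exported from a library target under a hypothetical crate name `catalog`, plus `tower` and `http`-related dev-dependencies, none of which exist in this diff yet:

```rust
use std::sync::{Arc, Mutex};

use axum::body::Body;
use axum::http::{Request, StatusCode};
use tower::ServiceExt; // provides `oneshot`

use catalog::database::database::Database;
use catalog::routes::root::routes;

#[tokio::test]
async fn list_namespaces_starts_empty() {
    // Fresh RocksDB instance in a temp dir, wired the same way as src/main.rs.
    let dir = tempfile::tempdir().unwrap();
    let db = Arc::new(Mutex::new(Database::open(dir.path()).unwrap()));
    let app = routes(db);

    // Drive a single request through the router without starting a server.
    let response = app
        .oneshot(
            Request::builder()
                .uri("/namespaces")
                .body(Body::empty())
                .unwrap(),
        )
        .await
        .unwrap();

    assert_eq!(response.status(), StatusCode::OK);
}
```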