Skip to content

Commit

Permalink
test: adding unit test for kill signal on database
Browse files Browse the repository at this point in the history
  • Loading branch information
sagojez committed Dec 7, 2024
1 parent 511a4b7 commit 894de32
Show file tree
Hide file tree
Showing 9 changed files with 66 additions and 60 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions integrationos-database/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,5 @@ tracing-subscriber.workspace = true
tracing.workspace = true

[dev-dependencies]
mockito.workspace = true
testcontainers-modules = { workspace = true, features = ["postgres"] }
21 changes: 6 additions & 15 deletions integrationos-database/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,23 +26,14 @@ fn main() -> Result<()> {
.block_on(async move {
match config.database_connection_type {
DatabaseConnectionType::PostgreSql => {
match PostgresDatabaseConnection::init(&config).await {
Ok(server) => {
if let Err(e) = server.run().await {
PostgresDatabaseConnection::kill(&config, e.to_string()).await?;
return Err(anyhow::anyhow!("Could not run server: {e}"));
}
let server = PostgresDatabaseConnection::init(&config).await?;

Ok(())
}
Err(e) => {
tracing::error!("Could not initialize storage: {e}");

PostgresDatabaseConnection::kill(&config, e.to_string()).await?;

Err(anyhow::anyhow!("Could not initialize storage: {e}"))
}
if let Err(e) = server.run().await {
PostgresDatabaseConnection::kill(&config, e.to_string()).await?;
return Err(e);
}

Ok(())
}
}
})
Expand Down
31 changes: 21 additions & 10 deletions integrationos-database/src/service/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,24 @@ pub trait Initializer {
#[async_trait]
impl Initializer for PostgresDatabaseConnection {
async fn init(config: &DatabaseConnectionConfig) -> Result<Server, anyhow::Error> {
let postgres: PostgresDatabaseConnection = PostgresDatabaseConnection::new(config).await?;
let storage: Arc<dyn Storage> = Arc::new(postgres);

Ok(Server {
state: Arc::new(AppState {
config: config.clone(),
storage,
}),
})
let postgres = PostgresDatabaseConnection::new(config).await;

match postgres {
Ok(postgres) => {
let storage: Arc<dyn Storage> = Arc::new(postgres);

Ok(Server {
state: Arc::new(AppState {
config: config.clone(),
storage,
}),
})
}
Err(e) => {
PostgresDatabaseConnection::kill(config, e.to_string()).await?;
Err(e)
}
}
}

async fn kill(
Expand All @@ -50,10 +59,12 @@ impl Initializer for PostgresDatabaseConnection {
client
.post(format!("{}/v1/emit", emit_url))
.header("content-type", "application/json")
.body(value.to_string())
.json(&value)
.send()
.await?;

tracing::info!("Event for dispose of connection {connection_id} emitted");

Ok(())
}
}
19 changes: 12 additions & 7 deletions integrationos-database/tests/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ static TRACING: OnceLock<()> = OnceLock::new();
pub struct TestServer {
pub port: u16,
pub client: reqwest::Client,
// pub mock_server: ServerGuard,
}

#[derive(Debug, Clone, Eq, PartialEq)]
Expand All @@ -34,7 +35,7 @@ pub struct ApiResponse<T: DeserializeOwned = Value> {
}

impl TestServer {
pub async fn new() -> Result<Self, IntegrationOSError> {
pub async fn new(r#override: HashMap<String, String>) -> Result<Self, IntegrationOSError> {
TRACING.get_or_init(|| {
let filter = EnvFilter::builder()
.with_default_directive(LevelFilter::INFO.into())
Expand All @@ -53,7 +54,7 @@ impl TestServer {
.expect("Failed to get local address")
.port();

let config = DatabaseConnectionConfig::init_from_hashmap(&HashMap::from([
let config_map: HashMap<String, String> = HashMap::from([
(
"INTERNAL_SERVER_ADDRESS".to_string(),
format!("0.0.0.0:{server_port}"),
Expand All @@ -71,12 +72,15 @@ impl TestServer {
("POSTGRES_HOST".to_string(), "localhost".to_string()),
("POSTGRES_PORT".to_string(), port.to_string()),
("POSTGRES_NAME".to_string(), "postgres".to_string()),
]))
.expect("Failed to initialize storage config");
])
.into_iter()
.chain(r#override.into_iter())
.collect::<HashMap<String, String>>();

let server = PostgresDatabaseConnection::init(&config)
.await
.expect("Failed to initialize storage");
let config = DatabaseConnectionConfig::init_from_hashmap(&config_map)
.expect("Failed to initialize storage config");

let server = PostgresDatabaseConnection::init(&config).await?;

tokio::task::spawn(async move { server.run().await });

Expand All @@ -87,6 +91,7 @@ impl TestServer {
Ok(Self {
port: server_port,
client,
// mock_server,
})
}

Expand Down
11 changes: 6 additions & 5 deletions integrationos-database/tests/http/connection.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
use crate::context::TestServer;
use http::{Method, StatusCode};
use integrationos_domain::IntegrationOSError;
use integrationos_domain::{IntegrationOSError, Unit};
use serde_json::Value;
use std::collections::HashMap;

#[tokio::test]
async fn test_execute_probe() -> Result<(), IntegrationOSError> {
let server = TestServer::new().await?;
async fn test_execute_probe() -> Result<Unit, IntegrationOSError> {
let server = TestServer::new(HashMap::new()).await?;
let result = server
.send_request::<Value, Value>("database/probe", Method::GET, None)
.await?;
Expand All @@ -18,8 +19,8 @@ async fn test_execute_probe() -> Result<(), IntegrationOSError> {
}

#[tokio::test]
async fn test_execute_raw() -> Result<(), IntegrationOSError> {
let server = TestServer::new().await?;
async fn test_execute_raw() -> Result<Unit, IntegrationOSError> {
let server = TestServer::new(HashMap::new()).await?;

let create_query =
"CREATE TABLE IF NOT EXISTS users (id BIGSERIAL PRIMARY KEY, name TEXT NOT NULL);";
Expand Down
1 change: 1 addition & 0 deletions integrationos-database/tests/http/mod.rs
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
pub mod connection;
pub mod signal;
37 changes: 16 additions & 21 deletions integrationos-emit/tests/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ pub struct ApiResponse<T: DeserializeOwned = Value> {
}

impl TestServer {
pub async fn new(stream: bool) -> Result<Self, IntegrationOSError> {
pub async fn new() -> Result<Self, IntegrationOSError> {
TRACING.get_or_init(|| {
let filter = EnvFilter::builder()
.with_default_directive(LevelFilter::INFO.into())
Expand Down Expand Up @@ -69,7 +69,9 @@ impl TestServer {
.expect("Failed to get local address")
.port();

let mut config = vec![
let mock_server = MockServer::new_async().await;
let mock_uri = mock_server.url();
let config = vec![
(
"INTERNAL_SERVER_ADDRESS".to_string(),
format!("0.0.0.0:{server_port}"),
Expand All @@ -90,32 +92,25 @@ impl TestServer {
),
("PARTITION_COUNT".to_string(), "1".to_string()),
("ENVIRONMENT".to_string(), "test".to_string()),
];

let mock_server = MockServer::new_async().await;

if stream {
let uri = mock_server.url();

config.push(("EVENT_STREAM_PROVIDER".to_string(), "fluvio".to_string()));
config.push(("EVENT_STREAM_PORT".to_string(), "9103".to_string()));
config.push((
("EVENT_STREAM_PROVIDER".to_string(), "fluvio".to_string()),
("EVENT_STREAM_PORT".to_string(), "9103".to_string()),
(
"EVENT_STREAM_PRODUCER_TOPIC".to_string(),
"events".to_string(),
));
config.push((
),
(
"EVENT_STREAM_CONSUMER_TOPIC".to_string(),
"events".to_string(),
));
config.push((
),
(
"EVENT_STREAM_CONSUMER_GROUP".to_string(),
"event-all-partitions-consumer".to_string(),
));
config.push((
),
(
"EVENT_CALLBACK_URL".to_string(),
format!("{uri}/v1/event-callbacks"),
));
}
format!("{mock_uri}/v1/event-callbacks"),
),
];

let config = EmitterConfig::init_from_hashmap(&HashMap::from_iter(config))
.expect("Failed to initialize storage config");
Expand Down
4 changes: 2 additions & 2 deletions integrationos-emit/tests/http/emitter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ const PARALLEL_REQUESTS: usize = 10;

#[tokio::test]
async fn test_concurrent_requests() -> Result<Unit, IntegrationOSError> {
let server = TestServer::new(true).await?;
let server = TestServer::new().await?;
let payload = json!({
"type": "DatabaseConnectionLost",
"connectionId": "conn::GAL2svWJp9k::MtmXaau5Qf6R5n3Y-L9ejQ"
Expand Down Expand Up @@ -76,7 +76,7 @@ async fn test_concurrent_requests() -> Result<Unit, IntegrationOSError> {

#[tokio::test]
async fn test_event_processed() -> Result<Unit, IntegrationOSError> {
let mut server = TestServer::new(true).await?;
let mut server = TestServer::new().await?;

let id = Id::now(IdPrefix::Connection).to_string();
let payload = json!({
Expand Down

0 comments on commit 894de32

Please sign in to comment.