Skip to content
This repository has been archived by the owner on Oct 18, 2023. It is now read-only.

Commit

Permalink
Merge #315
Browse files Browse the repository at this point in the history
315: introduce stats route r=MarinPostma a=MarinPostma

This PR introduces the `GET /v1/stats` route that return the total amount of rows read/written by an instance:

```
{
    "rows_read_count": 0,
    "rows_written_count": 0
}
```

The number of rows read/written persists accross restarts, thanks to a sync loop that flushes the information to disk about every 5s.


Co-authored-by: ad hoc <[email protected]>
Co-authored-by: Piotr Sarna <[email protected]>
  • Loading branch information
3 people authored Mar 30, 2023
2 parents 46e4d88 + 7b55732 commit 90c96ba
Show file tree
Hide file tree
Showing 9 changed files with 575 additions and 373 deletions.
782 changes: 425 additions & 357 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion sqld-libsql-bindings/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ edition = "2021"
anyhow = "1.0.66"
mvfs = { git = "https://github.com/psarna/mvsqlite", branch = "mwal", optional = true }
mwal = { git = "https://github.com/psarna/mvsqlite", branch = "mwal", optional = true }
rusqlite = { version = "0.29.0", git = "https://github.com/psarna/rusqlite", rev = "63b7aabfccbc21738", default-features = false, features = [
rusqlite = { version = "0.29.0", git = "https://github.com/psarna/rusqlite", rev = "a6332e530f30dc2d47110", default-features = false, features = [
"buildtime_bindgen",
"bundled-libsql-wasm-experimental",
"column_decltype"
Expand Down
2 changes: 1 addition & 1 deletion sqld/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ pin-project-lite = "0.2.9"
postgres-protocol = "0.6.4"
prost = "0.11.3"
regex = "1.7.0"
rusqlite = { version = "0.29.0", git = "https://github.com/psarna/rusqlite", rev = "63b7aabfccbc21738", default-features = false, features = [
rusqlite = { version = "0.29.0", git = "https://github.com/psarna/rusqlite", rev = "a6332e530f30dc2d47110", default-features = false, features = [
"buildtime_bindgen",
"bundled-libsql-wasm-experimental",
"column_decltype"
Expand Down
29 changes: 23 additions & 6 deletions sqld/src/database/libsql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@ use std::str::FromStr;
use std::time::{Duration, Instant};

use crossbeam::channel::RecvTimeoutError;
use rusqlite::OpenFlags;
use rusqlite::{OpenFlags, StatementStatus};
use tokio::sync::oneshot;
use tracing::warn;

use crate::error::Error;
use crate::libsql::wal_hook::WalHook;
use crate::query::{Column, Query, QueryResponse, QueryResult, ResultSet, Row};
use crate::query_analysis::{State, Statement};
use crate::stats::Stats;
use crate::Result;

use super::{Cond, Database, Program, Step, TXN_TIMEOUT_SECS};
Expand Down Expand Up @@ -143,11 +144,13 @@ impl LibSqlDb {
path: impl AsRef<Path> + Send + 'static,
wal_hook: impl WalHook + Send + Clone + 'static,
with_bottomless: bool,
stats: Stats,
) -> crate::Result<Self> {
let (sender, receiver) = crossbeam::channel::unbounded::<Message>();

tokio::task::spawn_blocking(move || {
let mut connection = Connection::new(path.as_ref(), wal_hook, with_bottomless).unwrap();
let mut connection =
Connection::new(path.as_ref(), wal_hook, with_bottomless, stats).unwrap();
loop {
let Message { pgm, resp } = match connection.state.deadline() {
Some(deadline) => match receiver.recv_deadline(deadline) {
Expand Down Expand Up @@ -189,18 +192,21 @@ struct Connection {
state: ConnectionState,
conn: rusqlite::Connection,
timed_out: bool,
stats: Stats,
}

impl Connection {
fn new(
path: &Path,
wal_hook: impl WalHook + Send + Clone + 'static,
with_bottomless: bool,
stats: Stats,
) -> anyhow::Result<Self> {
Ok(Self {
conn: open_db(path, wal_hook, with_bottomless)?,
state: ConnectionState::initial(),
timed_out: false,
stats,
})
}

Expand Down Expand Up @@ -245,8 +251,8 @@ impl Connection {

fn execute_query_inner(&self, query: &Query) -> QueryResult {
let mut rows = vec![];
let mut prepared = self.conn.prepare(&query.stmt.stmt)?;
let columns = prepared
let mut stmt = self.conn.prepare(&query.stmt.stmt)?;
let columns = stmt
.columns()
.iter()
.map(|col| Column {
Expand All @@ -262,10 +268,10 @@ impl Connection {

query
.params
.bind(&mut prepared)
.bind(&mut stmt)
.map_err(Error::LibSqlInvalidQueryParams)?;

let mut qresult = prepared.raw_query();
let mut qresult = stmt.raw_query();
while let Some(row) = qresult.next()? {
let mut values = vec![];
for (i, _) in columns.iter().enumerate() {
Expand All @@ -288,6 +294,10 @@ impl Connection {
false => None,
};

drop(qresult);

self.update_stats(&stmt);

Ok(QueryResponse::ResultSet(ResultSet {
columns,
rows,
Expand All @@ -302,6 +312,13 @@ impl Connection {
.execute("rollback transaction;", ())
.expect("failed to rollback");
}

fn update_stats(&self, stmt: &rusqlite::Statement) {
self.stats
.inc_rows_read(stmt.get_status(StatementStatus::RowsRead) as usize);
self.stats
.inc_rows_written(stmt.get_status(StatementStatus::RowsWritten) as usize);
}
}

fn eval_cond(cond: &Cond, results: &[Option<QueryResult>]) -> Result<bool> {
Expand Down
14 changes: 11 additions & 3 deletions sqld/src/database/write_proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use crate::rpc::proxy::rpc::proxy_client::ProxyClient;
use crate::rpc::proxy::rpc::query_result::RowResult;
use crate::rpc::proxy::rpc::DisconnectMessage;
use crate::rpc::replication_log::rpc::replication_log_client::ReplicationLogClient;
use crate::stats::Stats;
use crate::Result;

use super::Program;
Expand All @@ -25,6 +26,7 @@ use super::{libsql::LibSqlDb, service::DbFactory, Database};
pub struct WriteProxyDbFactory {
write_proxy: ProxyClient<Channel>,
db_path: PathBuf,
stats: Stats,
/// abort handle: abort db update loop on drop
_abort_handle: crossbeam::channel::Sender<()>,
}
Expand All @@ -37,6 +39,7 @@ impl WriteProxyDbFactory {
key_path: Option<PathBuf>,
ca_cert_path: Option<PathBuf>,
db_path: PathBuf,
stats: Stats,
) -> anyhow::Result<(Self, JoinHandle<anyhow::Result<()>>)> {
let mut endpoint = Channel::from_shared(addr.to_string())?;
if tls {
Expand Down Expand Up @@ -84,6 +87,7 @@ impl WriteProxyDbFactory {
let this = Self {
write_proxy,
db_path,
stats,
_abort_handle,
};
Ok((this, handle))
Expand All @@ -93,7 +97,11 @@ impl WriteProxyDbFactory {
#[async_trait::async_trait]
impl DbFactory for WriteProxyDbFactory {
async fn create(&self) -> Result<Arc<dyn Database>> {
let db = WriteProxyDatabase::new(self.write_proxy.clone(), self.db_path.clone())?;
let db = WriteProxyDatabase::new(
self.write_proxy.clone(),
self.db_path.clone(),
self.stats.clone(),
)?;
Ok(Arc::new(db))
}
}
Expand All @@ -106,8 +114,8 @@ pub struct WriteProxyDatabase {
}

impl WriteProxyDatabase {
fn new(write_proxy: ProxyClient<Channel>, path: PathBuf) -> Result<Self> {
let read_db = LibSqlDb::new(path, (), false)?;
fn new(write_proxy: ProxyClient<Channel>, path: PathBuf, stats: Stats) -> Result<Self> {
let read_db = LibSqlDb::new(path, (), false, stats)?;
Ok(Self {
read_db,
write_proxy,
Expand Down
8 changes: 8 additions & 0 deletions sqld/src/http/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
mod hrana_over_http;
mod stats;
mod types;

use std::future::poll_fn;
Expand Down Expand Up @@ -29,6 +30,7 @@ use crate::hrana;
use crate::http::types::HttpQuery;
use crate::query::{self, Query, QueryResult, ResultSet};
use crate::query_analysis::{predict_final_state, State, Statement};
use crate::stats::Stats;
use crate::utils::services::idle_shutdown::IdleShutdownLayer;

use self::types::QueryObject;
Expand Down Expand Up @@ -236,6 +238,7 @@ async fn handle_request(
upgrade_tx: mpsc::Sender<hrana::Upgrade>,
db_factory: Arc<dyn DbFactory>,
enable_console: bool,
stats: Stats,
) -> anyhow::Result<Response<Body>> {
if hyper_tungstenite::is_upgrade_request(&req) {
return Ok(handle_upgrade(&upgrade_tx, req).await);
Expand All @@ -257,6 +260,7 @@ async fn handle_request(
(&Method::GET, "/v1") => hrana_over_http::handle_index(req).await,
(&Method::POST, "/v1/execute") => hrana_over_http::handle_execute(db_factory, req).await,
(&Method::POST, "/v1/batch") => hrana_over_http::handle_batch(db_factory, req).await,
(&Method::GET, "/v1/stats") => Ok(stats::handle_stats(&stats)),
_ => Ok(Response::builder().status(404).body(Body::empty()).unwrap()),
}
}
Expand All @@ -266,6 +270,8 @@ fn handle_version() -> Response<Body> {
Response::new(Body::from(version.as_bytes()))
}

// TODO: refactor
#[allow(clippy::too_many_arguments)]
pub async fn run_http<F>(
addr: SocketAddr,
auth: Arc<Auth>,
Expand All @@ -274,6 +280,7 @@ pub async fn run_http<F>(
upgrade_tx: mpsc::Sender<hrana::Upgrade>,
enable_console: bool,
idle_shutdown_layer: Option<IdleShutdownLayer>,
stats: Stats,
) -> anyhow::Result<()>
where
F: MakeService<(), Vec<Query>> + Send + 'static,
Expand Down Expand Up @@ -317,6 +324,7 @@ where
upgrade_tx.clone(),
db_factory.clone(),
enable_console,
stats.clone(),
)
});

Expand Down
23 changes: 23 additions & 0 deletions sqld/src/http/stats.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
use hyper::{Body, Response};
use serde::Serialize;

use crate::stats::Stats;

#[derive(Serialize)]
struct StatsResponse {
rows_read_count: usize,
rows_written_count: usize,
}

pub fn handle_stats(stats: &Stats) -> Response<Body> {
let resp = StatsResponse {
rows_read_count: stats.rows_read(),
rows_written_count: stats.rows_written(),
};

let payload = serde_json::to_vec(&resp).unwrap();
Response::builder()
.header("Content-Type", "application/json")
.body(Body::from(payload))
.unwrap()
}
21 changes: 16 additions & 5 deletions sqld/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use crate::auth::Auth;
use crate::error::Error;
use crate::postgres::service::PgConnectionFactory;
use crate::server::Server;
use crate::stats::Stats;

pub use sqld_libsql_bindings as libsql;

Expand All @@ -39,6 +40,7 @@ mod query_analysis;
mod replication;
pub mod rpc;
mod server;
mod stats;
mod utils;

#[derive(clap::ValueEnum, Clone, Debug, PartialEq)]
Expand Down Expand Up @@ -97,6 +99,7 @@ async fn run_service(
config: &Config,
join_set: &mut JoinSet<anyhow::Result<()>>,
idle_shutdown_layer: Option<IdleShutdownLayer>,
stats: Stats,
) -> anyhow::Result<()> {
let auth = get_auth(config)?;

Expand All @@ -122,6 +125,7 @@ async fn run_service(
upgrade_tx,
config.enable_http_console,
idle_shutdown_layer.clone(),
stats,
));
}

Expand Down Expand Up @@ -183,6 +187,7 @@ async fn start_replica(
join_set: &mut JoinSet<anyhow::Result<()>>,
addr: &str,
idle_shutdown_layer: Option<IdleShutdownLayer>,
stats: Stats,
) -> anyhow::Result<()> {
let (factory, handle) = WriteProxyDbFactory::new(
addr,
Expand All @@ -191,6 +196,7 @@ async fn start_replica(
config.writer_rpc_key.clone(),
config.writer_rpc_ca_cert.clone(),
config.db_path.clone(),
stats.clone(),
)
.await
.context("failed to start WriteProxy DB")?;
Expand All @@ -201,7 +207,7 @@ async fn start_replica(
join_set.spawn(async move { handle.await.expect("WriteProxy DB task failed") });

let service = DbFactoryService::new(Arc::new(factory));
run_service(service, config, join_set, idle_shutdown_layer).await?;
run_service(service, config, join_set, idle_shutdown_layer, stats).await?;

Ok(())
}
Expand All @@ -214,6 +220,7 @@ async fn start_primary(
config: &Config,
join_set: &mut JoinSet<anyhow::Result<()>>,
idle_shutdown_layer: Option<IdleShutdownLayer>,
stats: Stats,
) -> anyhow::Result<()> {
let is_fresh_db = check_fresh_db(&config.db_path);
let logger = Arc::new(ReplicationLogger::open(
Expand All @@ -237,10 +244,12 @@ async fn start_primary(
dump_loader.load_dump(path.into()).await?;
}

let stats_clone = stats.clone();
let db_factory = Arc::new(move || {
let db_path = path_clone.clone();
let hook = hook.clone();
async move { LibSqlDb::new(db_path, hook, enable_bottomless) }
let stats_clone = stats_clone.clone();
async move { LibSqlDb::new(db_path, hook, enable_bottomless, stats_clone) }
});
let service = DbFactoryService::new(db_factory.clone());
if let Some(ref addr) = config.rpc_server_addr {
Expand All @@ -256,7 +265,7 @@ async fn start_primary(
));
}

run_service(service, config, join_set, idle_shutdown_layer).await?;
run_service(service, config, join_set, idle_shutdown_layer, stats).await?;

Ok(())
}
Expand Down Expand Up @@ -309,11 +318,13 @@ pub async fn run_server(config: Config) -> anyhow::Result<()> {
.idle_shutdown_timeout
.map(|d| IdleShutdownLayer::new(d, shutdown_notify.clone()));

let stats = Stats::new(&config.db_path)?;

match config.writer_rpc_addr {
Some(ref addr) => {
start_replica(&config, &mut join_set, addr, idle_shutdown_layer).await?
start_replica(&config, &mut join_set, addr, idle_shutdown_layer, stats).await?
}
None => start_primary(&config, &mut join_set, idle_shutdown_layer).await?,
None => start_primary(&config, &mut join_set, idle_shutdown_layer, stats).await?,
}

let reset = HARD_RESET.clone();
Expand Down
Loading

0 comments on commit 90c96ba

Please sign in to comment.