Skip to content

Commit

Permalink
making the DBM methods return an iterator instead of a collection
Browse files Browse the repository at this point in the history
Collecting is memory intensive. Colleting in vecs might be better than
collecting in hashsets and hashmaps but still incurs a lot of memory
usage for big DB loads.

This commit tries to avoid collecting data and stream it using an
iterator instead.
  • Loading branch information
mariocynicys committed Feb 23, 2023
1 parent 92ad4e5 commit f12cb90
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 18 deletions.
38 changes: 38 additions & 0 deletions teos/src/db_iterator.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
use rusqlite::{MappedRows, Params, Result, Row, Statement};
use std::iter::Map;

/// A struct that owns a [Statement] and has an `iter` method to iterate over the
/// results of that DB query statement.
pub struct QueryIterator<'db, P, T> {
stmt: Statement<'db>,
params_and_mapper: Option<(P, Box<dyn Fn(&Row) -> T>)>,
}

impl<'db, P, T> QueryIterator<'db, P, T>
where
P: Params,
{
/// Construct a new [QueryIterator].
pub fn new(stmt: Statement<'db>, params: P, f: impl Fn(&Row) -> T + 'static) -> Self {
Self {
stmt,
params_and_mapper: Some((params, Box::new(f))),
}
}

/// Returns an iterator over the results of the query.
///
/// This method should be called only once per [QueryIterator] and then consumed.
/// After calling this method, subsequent calls will return [None].
pub fn iter(
&mut self,
) -> Option<Map<MappedRows<'_, impl FnMut(&Row) -> Result<T>>, impl FnMut(Result<T>) -> T>>
{
self.params_and_mapper.take().map(move |(params, mapper)| {
self.stmt
.query_map(params, move |row| Ok((mapper)(row)))
.unwrap()
.map(|row| row.unwrap())
})
}
}
35 changes: 18 additions & 17 deletions teos/src/dbm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::path::PathBuf;
use std::str::FromStr;

use rusqlite::limits::Limit;
use rusqlite::{params, params_from_iter, Connection, Error as SqliteError};
use rusqlite::{params, params_from_iter, Connection, Error as SqliteError, Row, ParamsFromIter};

use bitcoin::consensus;
use bitcoin::hashes::Hash;
Expand All @@ -19,6 +19,7 @@ use teos_common::constants::ENCRYPTED_BLOB_MAX_SIZE;
use teos_common::dbm::{DatabaseConnection, DatabaseManager, Error};
use teos_common::UserId;

use crate::db_iterator::*;
use crate::extended_appointment::{AppointmentSummary, ExtendedAppointment, UUID};
use crate::gatekeeper::UserInfo;
use crate::responder::{ConfirmationStatus, TrackerSummary, TransactionTracker};
Expand Down Expand Up @@ -306,31 +307,31 @@ impl DBM {
}

/// Loads all [AppointmentSummary]s from that database.
pub(crate) fn load_appointment_summaries(&self) -> HashMap<UUID, AppointmentSummary> {
let mut summaries = HashMap::new();

let mut stmt = self
.connection
.prepare(
"SELECT a.UUID, a.locator, a.user_id
FROM appointments as a LEFT JOIN trackers as t ON a.UUID=t.UUID WHERE t.UUID IS NULL",
)
.unwrap();
let mut rows = stmt.query([]).unwrap();
pub(crate) fn load_appointment_summaries(
&self,
) -> QueryIterator<ParamsFromIter<[u8; 0]>, (UUID, AppointmentSummary)> {
let stmt = self
.connection
.prepare(
"SELECT a.UUID, a.locator, a.user_id
FROM appointments as a LEFT JOIN trackers as t ON a.UUID=t.UUID WHERE t.UUID IS NULL",
)
.unwrap();

while let Ok(Some(row)) = rows.next() {
let func = |row: &Row| {
let raw_uuid: Vec<u8> = row.get(0).unwrap();
let raw_locator: Vec<u8> = row.get(1).unwrap();
let raw_userid: Vec<u8> = row.get(2).unwrap();
summaries.insert(
(
UUID::from_slice(&raw_uuid).unwrap(),
AppointmentSummary::new(
Locator::from_slice(&raw_locator).unwrap(),
UserId::from_slice(&raw_userid).unwrap(),
),
);
}
summaries
)
};

QueryIterator::new(stmt, params_from_iter([]), func)
}

/// Loads appointments from the database. If a locator is given, this method loads only the appointments
Expand Down
1 change: 1 addition & 0 deletions teos/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ pub mod carrier;
pub mod chain_monitor;
pub mod cli_config;
pub mod config;
pub mod db_iterator;
pub mod dbm;
#[doc(hidden)]
mod errors;
Expand Down
8 changes: 7 additions & 1 deletion teos/src/watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,13 @@ impl Watcher {
) -> Self {
let mut appointments = HashMap::new();
let mut locator_uuid_map: HashMap<Locator, HashSet<UUID>> = HashMap::new();
for (uuid, summary) in dbm.lock().unwrap().load_appointment_summaries() {
for (uuid, summary) in dbm
.lock()
.unwrap()
.load_appointment_summaries()
.iter()
.unwrap()
{
if let Some(map) = locator_uuid_map.get_mut(&summary.locator) {
map.insert(uuid);
} else {
Expand Down

0 comments on commit f12cb90

Please sign in to comment.