refresh static data for new bus stops and use shapes.txt for train geometry
jonerrr committed Jan 27, 2025
1 parent 2be471a commit db81236
Showing 10 changed files with 1,183 additions and 545 deletions.
19 changes: 15 additions & 4 deletions backend/Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions backend/Cargo.toml
@@ -22,6 +22,7 @@ geo = { version = "0.29.3", features = ["use-serde"] }
headers = "0.4.0"
http = "1.2.0"
indicatif = "0.17.9"
itertools = "0.14.0"
# itertools = "0.13.0"
polyline = "0.11.0"
prost = "0.13.4"
2 changes: 1 addition & 1 deletion backend/src/api/static_data.rs
@@ -40,7 +40,7 @@ pub fn cache_headers(hash: String) -> HeaderMap {
Parameters
),
responses(
(status = 200, description = "Subway and bus routes. WARNING: SIR geometry is missing.", body = [Route]),
(status = 200, description = "Subway and bus routes. WARNING: W train geometry is missing.", body = [Route]),
(status = 304, description = "If no parameters are provided and the etag matches the request")
)
)]
8 changes: 7 additions & 1 deletion backend/src/main.rs
@@ -120,7 +120,13 @@ async fn main() {
let notify = Arc::new(Notify::new());
let notify2 = notify.clone();

static_data::import(pg_pool.clone(), notify, redis_pool.clone()).await;
static_data::import(
pg_pool.clone(),
Some(notify),
redis_pool.clone(),
var("FORCE_UPDATE").is_ok(),
)
.await;
// Wait for static data to be loaded
notify2.notified().await;
}
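
The import entry point now takes an Option<Arc<Notify>> plus an explicit force flag instead of reading FORCE_UPDATE itself. A minimal runnable sketch of the startup handshake this hunk sets up — a toy, not the app's actual control flow (the real import spawns a background loop and notifies from inside it):

use std::env::var;
use std::sync::Arc;
use tokio::sync::Notify;

// Toy stand-in for static_data::import: load data, then signal whoever
// is waiting. Callers that don't need the signal pass None.
async fn import(force_update: bool, notify: Option<Arc<Notify>>) {
    let _ = force_update; // stub: fetch and write static data here
    if let Some(n) = notify {
        n.notify_one();
    }
}

#[tokio::main]
async fn main() {
    let notify = Arc::new(Notify::new());
    let notify2 = notify.clone();
    import(var("FORCE_UPDATE").is_ok(), Some(notify)).await;
    // Returns immediately here because notify_one stored a permit;
    // in the real app this blocks until the first import finishes.
    notify2.notified().await;
}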
28 changes: 26 additions & 2 deletions backend/src/realtime/mod.rs
@@ -79,6 +79,7 @@ pub async fn import(
let t_pool = pool.clone();
let b_pool = pool.clone();
let s_pool = pool.clone();
let redis_pool = redis_pool.clone();

tokio::spawn(async move {
loop {
@@ -100,9 +101,32 @@

tokio::spawn(async move {
loop {
let _ = bus::import(&b_pool).await.inspect_err(|e| {
if let Err(e) = bus::import(&b_pool).await {
tracing::error!("bus::import: {:#?}", e);
});
if let ImportError::Sqlx(err) = e {
// This error probably means there's a new bus stop, so we will update static data
if err.to_string().contains("stop_time_stop_id_fkey") {
tracing::warn!("updating static data for new bus stop");

crate::static_data::import(
b_pool.clone(),
None,
redis_pool.clone(),
true,
)
.await;
}
}
}
// let _ = bus::import(&b_pool).await.inspect_err(async move |e| {
// tracing::error!("bus::import: {:#?}", e);
// if let ImportError::Sqlx(err) = e {
// // This error probably means there's a new bus stop, so we will update static data
// if err.to_string().contains("stop_time_stop_id_fkey") {
// crate::static_data::cache_all(&b_pool, &redis_pool).await;
// }
// }
// });

sleep(Duration::from_secs(35)).await;
}
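
The recovery path above identifies the missing-stop case by substring-matching the formatted error. A hedged alternative sketch: recent sqlx versions expose the violated constraint name via DatabaseError::constraint(), assuming ImportError::Sqlx wraps a sqlx::Error as the match suggests (hypothetical helper, not in the diff):

// Sketch (not the committed code): detect the foreign-key violation by
// constraint name instead of substring-matching the error's Display output.
fn is_unknown_stop(err: &sqlx::Error) -> bool {
    match err {
        sqlx::Error::Database(db_err) => {
            db_err.constraint() == Some("stop_time_stop_id_fkey")
        }
        _ => false,
    }
}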
94 changes: 52 additions & 42 deletions backend/src/static_data/mod.rs
@@ -17,9 +17,12 @@ pub mod stop;

pub async fn import(
pool: PgPool,
notify: Arc<Notify>,
notify: Option<Arc<Notify>>,
redis_pool: bb8::Pool<RedisConnectionManager>,
force_update: bool,
) {
// let pool = pool.clone();
// let redis_pool = redis_pool.clone();
tokio::spawn(async move {
loop {
let last_updated = sqlx::query!("SELECT update_at FROM last_update")
@@ -28,13 +31,15 @@
.unwrap();

// If the user wants to FORCE_UPDATE, then don't check for last updated
if var("FORCE_UPDATE").is_err() {
if !force_update {
// Data should be refreshed every 3 days
if let Some(last_updated) = last_updated {
tracing::info!("Last updated at: {}", last_updated.update_at);

// if there is a last-updated row, there's already data and the rest of the app can start
notify.notify_one();
if let Some(notify) = &notify {
notify.notify_one();
}

let duration_since_last_update =
Utc::now().signed_duration_since(last_updated.update_at);
@@ -50,7 +55,9 @@
}
} else {
// Remove the FORCE_UPDATE env variable so it doesn't keep updating
remove_var("FORCE_UPDATE");
if var("FORCE_UPDATE").is_ok() {
remove_var("FORCE_UPDATE");
}
}
tracing::info!("Updating static data");

@@ -63,42 +70,43 @@
.await
.unwrap();

// because the zip crate doesn't support tokio, we need to read it all here (I think I can remove this; the issue I was having was just because I forgot to clone the archive)
let (gtfs_routes, gtfs_transfers) = tokio::task::spawn_blocking(move || {
let reader = Cursor::new(gtfs);
let mut archive = zip::ZipArchive::new(reader).unwrap();
let mut archive2 = archive.clone();
// let mut archive3 = archive.clone();

let routes_file = archive.by_name("routes.txt").unwrap();
let transfers_file = archive2.by_name("transfers.txt").unwrap();
// let shape_file = archive3.by_name("shapes.txt").unwrap();

let mut routes_rdr = csv::Reader::from_reader(routes_file);
let mut transfers_rdr = csv::Reader::from_reader(transfers_file);
// let mut shape_rdr = csv::Reader::from_reader(shape_file);

let routes = routes_rdr
.deserialize()
.collect::<Result<Vec<route::GtfsRoute>, csv::Error>>()
.unwrap();

let transfers = transfers_rdr
.deserialize()
.collect::<Result<Vec<stop::Transfer<String>>, csv::Error>>()
.unwrap();

// let geom = shape_rdr
// .deserialize()
// .collect::<Result<Vec<route::GtfsRouteGeom>, csv::Error>>()
// .unwrap();

(routes, transfers)
})
.await
.unwrap();

let mut routes = route::Route::parse_train(gtfs_routes).await;
// TODO: figure out a cleaner way to extract multiple files from zip (without cloning)
let (gtfs_routes, gtfs_transfers, gtfs_shapes) =
tokio::task::spawn_blocking(move || {
let reader = Cursor::new(gtfs);
let mut archive = zip::ZipArchive::new(reader).unwrap();
let mut archive2 = archive.clone();
let mut archive3 = archive.clone();

let routes_file = archive.by_name("routes.txt").unwrap();
let transfers_file = archive2.by_name("transfers.txt").unwrap();
let shape_file = archive3.by_name("shapes.txt").unwrap();

let mut routes_rdr = csv::Reader::from_reader(routes_file);
let mut transfers_rdr = csv::Reader::from_reader(transfers_file);
let mut shape_rdr = csv::Reader::from_reader(shape_file);

let routes = routes_rdr
.deserialize()
.collect::<Result<Vec<route::GtfsRoute>, csv::Error>>()
.unwrap();

let transfers = transfers_rdr
.deserialize()
.collect::<Result<Vec<stop::Transfer<String>>, csv::Error>>()
.unwrap();

let shapes = shape_rdr
.deserialize()
.collect::<Result<Vec<route::GtfsShape>, csv::Error>>()
.unwrap();

(routes, transfers, shapes)
})
.await
.unwrap();

let mut routes = route::Route::parse_train(gtfs_routes, gtfs_shapes).await;
let (mut stops, mut route_stops) = stop::Stop::parse_train(
routes.iter().map(|r| r.id.clone()).collect(),
gtfs_transfers,
Expand Down Expand Up @@ -129,7 +137,9 @@ pub async fn import(
cache_all(&pool, &redis_pool).await.unwrap();

tracing::info!("Data updated");
notify.notify_one();
if let Some(notify) = &notify {
notify.notify_one();
}
}
});
}
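
Neither the GtfsShape row type nor the grouping that turns shape points into route geometry appears in this hunk. A plausible sketch, assuming field names from the GTFS shapes.txt spec and the newly added itertools dependency (chunk_by, the successor to group_by, is likely why it appears in Cargo.toml above) — hypothetical, since the diff shows neither:

use geo::LineString;
use itertools::Itertools;
use serde::Deserialize;

// Hypothetical row type; columns follow the GTFS shapes.txt spec.
#[derive(Debug, Deserialize)]
pub struct GtfsShape {
    pub shape_id: String,
    pub shape_pt_lat: f64,
    pub shape_pt_lon: f64,
    pub shape_pt_sequence: u32,
}

// Sketch: collapse shape points into one LineString per shape_id,
// assuming rows arrive ordered by shape_id (chunk_by groups runs of
// consecutive equal keys, so unsorted input would split shapes).
pub fn shapes_to_lines(shapes: &[GtfsShape]) -> Vec<LineString> {
    let groups = shapes.iter().chunk_by(|s| s.shape_id.clone());
    let mut lines = Vec::new();
    for (_shape_id, points) in &groups {
        let coords: Vec<(f64, f64)> =
            points.map(|p| (p.shape_pt_lon, p.shape_pt_lat)).collect();
        lines.push(LineString::from(coords));
    }
    lines
}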
@@ -174,7 +184,7 @@ pub async fn cache_all(

// TODO: remove extra filter once we have all routes with geom
let (bus_route_features, train_route_features) =
routes.iter().filter(|r| &r.id != "SI" && serde_json::from_value::<geo::MultiLineString>(r.geom.clone().unwrap()).is_ok()).fold(
routes.iter().filter(|r| serde_json::from_value::<geo::MultiLineString>(r.geom.clone().unwrap()).is_ok()).fold(
(Vec::new(), Vec::new()),
|(mut bus_acc, mut train_acc), r| {
let geom: geo::MultiLineString =
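
The remainder of this hunk is not shown. One note on the visible filter: r.geom.clone().unwrap() panics if a route has no geometry at all (assuming geom is an Option, as the unwrap suggests). A panic-free equivalent as a hypothetical helper, not in the diff:

use geo::MultiLineString;

// Hypothetical helper: true only when a route's optional geometry
// deserializes into a MultiLineString; a route with no geom is skipped
// instead of panicking via unwrap().
fn has_valid_geom(geom: Option<&serde_json::Value>) -> bool {
    geom.is_some_and(|g| serde_json::from_value::<MultiLineString>(g.clone()).is_ok())
}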