From 5c0529d1cb02b19340c321d1530ec229471f1690 Mon Sep 17 00:00:00 2001 From: Guillaume Hivert Date: Fri, 3 May 2024 21:37:16 +0200 Subject: [PATCH] feat: retrying package fetching Signed-off-by: Guillaume Hivert --- apps/backend/src/backend.gleam | 21 ++++++---- apps/backend/src/retrier.gleam | 54 +++++++++++++++++++++++++ apps/backend/src/tasks/hex.gleam | 67 ++++++++++++++++++++++++-------- 3 files changed, 118 insertions(+), 24 deletions(-) create mode 100644 apps/backend/src/retrier.gleam diff --git a/apps/backend/src/backend.gleam b/apps/backend/src/backend.gleam index b6ed446..f66b265 100644 --- a/apps/backend/src/backend.gleam +++ b/apps/backend/src/backend.gleam @@ -6,13 +6,13 @@ import gleam/otp/supervisor import mist import periodic import setup -import wisp import tasks/hex +import wisp pub fn main() { + dot_env.load() setup.radiate() wisp.configure_logger() - dot_env.load() let secret_key_base = config.get_secret_key_base() let cnf = config.read_config() @@ -24,7 +24,11 @@ pub fn main() { |> mist.port(3000) |> mist.start_http() - let _ = start_hex_sync(cnf) + let assert Ok(_) = + supervisor.start(fn(children) { + let assert Ok(_) = start_hex_sync(cnf, children) + children + }) process.sleep_forever() } @@ -37,11 +41,14 @@ fn supervise(start: fn() -> _) { }) } -fn sync_hex(cnf: Config) { - hex.sync_new_gleam_releases(cnf) +fn sync_hex(cnf: Config, children: supervisor.Children(Nil)) { + hex.sync_new_gleam_releases(cnf, children) } -fn start_hex_sync(cnf: Config) { +fn start_hex_sync(cnf: Config, children: supervisor.Children(Nil)) { use <- supervise() - periodic.periodically(do: fn() { sync_hex(cnf) }, waiting: 60 * 1000) + periodic.periodically( + do: fn() { sync_hex(cnf, children) }, + waiting: 60 * 1000, + ) } diff --git a/apps/backend/src/retrier.gleam b/apps/backend/src/retrier.gleam new file mode 100644 index 0000000..d7ae52c --- /dev/null +++ b/apps/backend/src/retrier.gleam @@ -0,0 +1,54 @@ +import backend/index/error.{type Error} +import
gleam/erlang/process.{type Subject} +import gleam/function +import gleam/io +import gleam/otp/actor + +pub opaque type Message { + Rerun +} + +type State(a) { + State(self: Subject(Message), work: fn() -> Result(a, Error), interval: Int) +} + +fn enqueue_next_rerun(state: State(a)) { + process.send_after(state.self, state.interval, Rerun) +} + +/// Start an actor that calls `work` after a fixed delay of 60_000 (passed to `process.send_after`, presumably milliseconds); while `work` returns an error, the error is printed and another attempt is scheduled after the same delay. +/// The actor stops normally as soon as `work` returns `Ok`, so `work` is retried until its first success rather than run forever. +pub fn retry( + do work: fn() -> Result(a, Error), +) -> Result(Subject(Message), actor.StartError) { + fn() { init(60_000, work) } + |> actor.Spec(loop: loop, init_timeout: 100) + |> actor.start_spec() +} + +fn init( + interval: Int, + work: fn() -> Result(a, Error), +) -> actor.InitResult(State(a), Message) { + let subject = process.new_subject() + let state = State(subject, work, interval) + process.new_selector() + |> process.selecting(subject, function.identity) + |> actor.Ready(state, _) + |> function.tap(fn(_) { enqueue_next_rerun(state) }) +} + +fn loop(message: Message, state: State(a)) -> actor.Next(Message, State(a)) { + case message { + Rerun -> { + case state.work() { + Ok(_) -> actor.Stop(process.Normal) + Error(e) -> { + io.debug(e) + enqueue_next_rerun(state) + actor.continue(state) + } + } + } + } +} diff --git a/apps/backend/src/tasks/hex.gleam b/apps/backend/src/tasks/hex.gleam index 7930786..bd8c29f 100644 --- a/apps/backend/src/tasks/hex.gleam +++ b/apps/backend/src/tasks/hex.gleam @@ -1,4 +1,6 @@ import api/hex as api +import api/hex_repo +import api/signatures import backend/config.{type Config} import backend/data/hex_read.{type HexRead} import backend/index/connect as postgres @@ -9,9 +11,11 @@ import birl/duration import gleam/hexpm.{type Package} import gleam/list import gleam/order +import gleam/otp/supervisor import gleam/pgo import gleam/result import gleam/string +import retrier import wisp type State { @@ -25,20 +29,24 @@ type State { ) } -pub fn
sync_new_gleam_releases(cnf: Config) -> Result(HexRead, Error) { +pub fn sync_new_gleam_releases( + cnf: Config, + children: supervisor.Children(Nil), +) -> Result(HexRead, Error) { let ctx = postgres.connect(cnf) wisp.log_info("Syncing new releases from Hex") use limit <- result.try(index.get_last_hex_date(ctx.connection)) - use latest <- result.try( - sync_packages(State( + use latest <- result.try(sync_packages( + State( page: 1, limit: limit, newest: limit, hex_api_key: cnf.hex_api_key, last_logged: birl.now(), db: ctx.connection, - )), - ) + ), + children, + )) let latest = index.upsert_most_recent_hex_timestamp(ctx.connection, latest) wisp.log_info("\nUp to date!") latest @@ -58,28 +66,42 @@ fn first_timestamp(packages: List(hexpm.Package), state: State) -> Time { |> result.unwrap(state.newest) } -fn sync_packages(state: State) -> Result(Time, Error) { +fn sync_packages( + state: State, + children: supervisor.Children(Nil), +) -> Result(Time, Error) { let page = state.page let api_key = state.hex_api_key use all_packages <- result.try(api.get_api_packages_page(page, api_key)) let state = State(..state, newest: first_timestamp(all_packages, state)) let new_packages = take_fresh_packages(all_packages, state.limit) - use state <- result.try(list.try_fold(new_packages, state, sync_package)) + use state <- result.try(list.try_fold( + new_packages, + state, + sync_package(children), + )) case list.length(all_packages) == list.length(new_packages) { _ if all_packages == [] -> Ok(state.newest) False -> Ok(state.newest) - True -> sync_packages(State(..state, page: state.page + 1)) + True -> sync_packages(State(..state, page: state.page + 1), children) } } -fn sync_package(state: State, package: hexpm.Package) -> Result(State, Error) { - let secret = state.hex_api_key - use releases <- result.try(lookup_gleam_releases(package, secret: secret)) - case releases { - [] -> Ok(log_if_needed(state, package.updated_at)) - _ -> { - use _ <-
result.map(insert_package_and_releases(package, releases, state)) - State(..state, last_logged: birl.now()) +fn sync_package(children: supervisor.Children(Nil)) { + fn(state: State, package: hexpm.Package) -> Result(State, Error) { + let secret = state.hex_api_key + use releases <- result.try(lookup_gleam_releases(package, secret: secret)) + case releases { + [] -> Ok(log_if_needed(state, package.updated_at)) + _ -> { + use _ <- result.map(insert_package_and_releases( + package, + releases, + state, + children, + )) + State(..state, last_logged: birl.now()) + } } } } @@ -88,6 +110,7 @@ fn insert_package_and_releases( package: hexpm.Package, releases: List(hexpm.Release), state: State, + children: supervisor.Children(Nil), ) { let secret = state.hex_api_key let versions = @@ -100,7 +123,17 @@ fn insert_package_and_releases( use owners <- result.try(api.get_package_owners(package.name, secret: secret)) use _ <- result.try(index.sync_package_owners(state.db, id, owners)) wisp.log_info("Saving releases for " <> package.name) - list.try_each(releases, fn(r) { index.upsert_release(state.db, id, r) }) + list.try_each(releases, fn(r) { + use _ <- result.map(index.upsert_release(state.db, id, r)) + supervisor.add(children, { + use _ <- supervisor.worker() + retrier.retry(fn() { + let infos = hex_repo.get_package_infos(package.name, r.version) + use #(package, gleam_toml) <- result.try(infos) + signatures.extract_signatures(state.db, package, gleam_toml) + }) + }) + }) } fn lookup_gleam_releases(