Skip to content

Commit

Permalink
feat: retrying package fetching
Browse files Browse the repository at this point in the history
Signed-off-by: Guillaume Hivert <[email protected]>
  • Loading branch information
ghivert committed May 3, 2024
1 parent f50c2bf commit 5c0529d
Show file tree
Hide file tree
Showing 3 changed files with 118 additions and 24 deletions.
21 changes: 14 additions & 7 deletions apps/backend/src/backend.gleam
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@ import gleam/otp/supervisor
import mist
import periodic
import setup
import wisp
import tasks/hex
import wisp

pub fn main() {
dot_env.load()
setup.radiate()
wisp.configure_logger()
dot_env.load()

let secret_key_base = config.get_secret_key_base()
let cnf = config.read_config()
Expand All @@ -24,7 +24,11 @@ pub fn main() {
|> mist.port(3000)
|> mist.start_http()

let _ = start_hex_sync(cnf)
let assert Ok(_) =
supervisor.start(fn(children) {
let assert Ok(_) = start_hex_sync(cnf, children)
children
})

process.sleep_forever()
}
Expand All @@ -37,11 +41,14 @@ fn supervise(start: fn() -> _) {
})
}

fn sync_hex(cnf: Config) {
hex.sync_new_gleam_releases(cnf)
fn sync_hex(cnf: Config, children: supervisor.Children(Nil)) {
hex.sync_new_gleam_releases(cnf, children)
}

fn start_hex_sync(cnf: Config) {
fn start_hex_sync(cnf: Config, children: supervisor.Children(Nil)) {
use <- supervise()
periodic.periodically(do: fn() { sync_hex(cnf) }, waiting: 60 * 1000)
periodic.periodically(
do: fn() { sync_hex(cnf, children) },
waiting: 60 * 1000,
)
}
54 changes: 54 additions & 0 deletions apps/backend/src/retrier.gleam
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import backend/index/error.{type Error}
import gleam/erlang/process.{type Subject}
import gleam/function
import gleam/io
import gleam/otp/actor

pub opaque type Message {
Rerun
}

type State(a) {
State(self: Subject(Message), work: fn() -> Result(a, Error), interval: Int)
}

fn enqueue_next_rerun(state: State(a)) {
process.send_after(state.self, state.interval, Rerun)
}

/// Repeatedly call a function, leaving `interval` milliseconds between each call.
/// When the `work` function returns an error it is printed.
pub fn retry(
do work: fn() -> Result(a, Error),
) -> Result(Subject(Message), actor.StartError) {
fn() { init(60_000, work) }
|> actor.Spec(loop: loop, init_timeout: 100)
|> actor.start_spec()
}

fn init(
interval: Int,
work: fn() -> Result(a, Error),
) -> actor.InitResult(State(a), Message) {
let subject = process.new_subject()
let state = State(subject, work, interval)
process.new_selector()
|> process.selecting(subject, function.identity)
|> actor.Ready(state, _)
|> function.tap(fn(_) { enqueue_next_rerun(state) })
}

fn loop(message: Message, state: State(a)) -> actor.Next(Message, State(a)) {
case message {
Rerun -> {
case state.work() {
Ok(_) -> actor.Stop(process.Normal)
Error(e) -> {
io.debug(e)
enqueue_next_rerun(state)
actor.continue(state)
}
}
}
}
}
67 changes: 50 additions & 17 deletions apps/backend/src/tasks/hex.gleam
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import api/hex as api
import api/hex_repo
import api/signatures
import backend/config.{type Config}
import backend/data/hex_read.{type HexRead}
import backend/index/connect as postgres
Expand All @@ -9,9 +11,11 @@ import birl/duration
import gleam/hexpm.{type Package}
import gleam/list
import gleam/order
import gleam/otp/supervisor
import gleam/pgo
import gleam/result
import gleam/string
import retrier
import wisp

type State {
Expand All @@ -25,20 +29,24 @@ type State {
)
}

pub fn sync_new_gleam_releases(cnf: Config) -> Result(HexRead, Error) {
pub fn sync_new_gleam_releases(
cnf: Config,
children: supervisor.Children(Nil),
) -> Result(HexRead, Error) {
let ctx = postgres.connect(cnf)
wisp.log_info("Syncing new releases from Hex")
use limit <- result.try(index.get_last_hex_date(ctx.connection))
use latest <- result.try(
sync_packages(State(
use latest <- result.try(sync_packages(
State(
page: 1,
limit: limit,
newest: limit,
hex_api_key: cnf.hex_api_key,
last_logged: birl.now(),
db: ctx.connection,
)),
)
),
children,
))
let latest = index.upsert_most_recent_hex_timestamp(ctx.connection, latest)
wisp.log_info("\nUp to date!")
latest
Expand All @@ -58,28 +66,42 @@ fn first_timestamp(packages: List(hexpm.Package), state: State) -> Time {
|> result.unwrap(state.newest)
}

fn sync_packages(state: State) -> Result(Time, Error) {
fn sync_packages(
state: State,
children: supervisor.Children(Nil),
) -> Result(Time, Error) {
let page = state.page
let api_key = state.hex_api_key
use all_packages <- result.try(api.get_api_packages_page(page, api_key))
let state = State(..state, newest: first_timestamp(all_packages, state))
let new_packages = take_fresh_packages(all_packages, state.limit)
use state <- result.try(list.try_fold(new_packages, state, sync_package))
use state <- result.try(list.try_fold(
new_packages,
state,
sync_package(children),
))
case list.length(all_packages) == list.length(new_packages) {
_ if all_packages == [] -> Ok(state.newest)
False -> Ok(state.newest)
True -> sync_packages(State(..state, page: state.page + 1))
True -> sync_packages(State(..state, page: state.page + 1), children)
}
}

fn sync_package(state: State, package: hexpm.Package) -> Result(State, Error) {
let secret = state.hex_api_key
use releases <- result.try(lookup_gleam_releases(package, secret: secret))
case releases {
[] -> Ok(log_if_needed(state, package.updated_at))
_ -> {
use _ <- result.map(insert_package_and_releases(package, releases, state))
State(..state, last_logged: birl.now())
fn sync_package(children: supervisor.Children(Nil)) {
fn(state: State, package: hexpm.Package) -> Result(State, Error) {
let secret = state.hex_api_key
use releases <- result.try(lookup_gleam_releases(package, secret: secret))
case releases {
[] -> Ok(log_if_needed(state, package.updated_at))
_ -> {
use _ <- result.map(insert_package_and_releases(
package,
releases,
state,
children,
))
State(..state, last_logged: birl.now())
}
}
}
}
Expand All @@ -88,6 +110,7 @@ fn insert_package_and_releases(
package: hexpm.Package,
releases: List(hexpm.Release),
state: State,
children: supervisor.Children(Nil),
) {
let secret = state.hex_api_key
let versions =
Expand All @@ -100,7 +123,17 @@ fn insert_package_and_releases(
use owners <- result.try(api.get_package_owners(package.name, secret: secret))
use _ <- result.try(index.sync_package_owners(state.db, id, owners))
wisp.log_info("Saving releases for " <> package.name)
list.try_each(releases, fn(r) { index.upsert_release(state.db, id, r) })
list.try_each(releases, fn(r) {
use _ <- result.map(index.upsert_release(state.db, id, r))
supervisor.add(children, {
use _ <- supervisor.worker()
retrier.retry(fn() {
let infos = hex_repo.get_package_infos(package.name, r.version)
use #(package, gleam_toml) <- result.try(infos)
signatures.extract_signatures(state.db, package, gleam_toml)
})
})
})
}

fn lookup_gleam_releases(
Expand Down

0 comments on commit 5c0529d

Please sign in to comment.