Skip to content

Commit

Permalink
agent: cleanup useless loop in publications
Browse files Browse the repository at this point in the history
  • Loading branch information
psFried committed Jan 27, 2025
1 parent ce6356e commit a6ea1b7
Showing 1 changed file with 50 additions and 53 deletions.
103 changes: 50 additions & 53 deletions crates/agent/src/publications/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,68 +20,65 @@ impl Handler for Publisher {
pg_pool: &sqlx::PgPool,
allow_background: bool,
) -> anyhow::Result<HandleResult> {
loop {
let mut txn = pg_pool.begin().await?;
let mut txn = pg_pool.begin().await?;

let row: Row =
match agent_sql::publications::dequeue(&mut txn, allow_background).await? {
None => return Ok(HandleResult::NoJobs),
Some(row) => row,
};
let row: Row = match agent_sql::publications::dequeue(&mut txn, allow_background).await? {
None => return Ok(HandleResult::NoJobs),
Some(row) => row,
};

let id = row.pub_id;
let background = row.background;
let dry_run = row.dry_run;
let draft_id = row.draft_id;
let id = row.pub_id;
let background = row.background;
let dry_run = row.dry_run;
let draft_id = row.draft_id;

// Remove draft errors from a previous publication attempt.
agent_sql::drafts::delete_errors(row.draft_id, &mut txn)
.await
.context("clearing old errors")?;

let time_queued = chrono::Utc::now().signed_duration_since(row.updated_at);

let (status, draft_errors, final_pub_id) = match self.process(row).await {
Ok(result) => {
if dry_run {
specs::add_built_specs_to_draft_specs(draft_id, &result.built, &self.db)
.await
.context("adding built specs to draft")?;
}
let errors = result.draft_errors();
let final_id = if result.status.is_success() {
Some(result.pub_id)
} else {
None
};
(result.status, errors, final_id)
}
Err(error) => {
tracing::warn!(?error, pub_id = %id, "build finished with error");
let errors = vec![draft_error::Error {
catalog_name: String::new(),
scope: None,
detail: format!("{error:#}"),
}];
(JobStatus::PublishFailed, errors, None)
// Remove draft errors from a previous publication attempt.
agent_sql::drafts::delete_errors(row.draft_id, &mut txn)
.await
.context("clearing old errors")?;

let time_queued = chrono::Utc::now().signed_duration_since(row.updated_at);

let (status, draft_errors, final_pub_id) = match self.process(row).await {
Ok(result) => {
if dry_run {
specs::add_built_specs_to_draft_specs(draft_id, &result.built, &self.db)
.await
.context("adding built specs to draft")?;
}
};
let errors = result.draft_errors();
let final_id = if result.status.is_success() {
Some(result.pub_id)
} else {
None
};
(result.status, errors, final_id)
}
Err(error) => {
tracing::warn!(?error, pub_id = %id, "build finished with error");
let errors = vec![draft_error::Error {
catalog_name: String::new(),
scope: None,
detail: format!("{error:#}"),
}];
(JobStatus::PublishFailed, errors, None)
}
};

draft::insert_errors(draft_id, draft_errors, &mut txn).await?;
draft::insert_errors(draft_id, draft_errors, &mut txn).await?;

info!(%id, %time_queued, %background, ?status, "publication finished");
agent_sql::publications::resolve(id, &status, final_pub_id, &mut txn).await?;
info!(%id, %time_queued, %background, ?status, "publication finished");
agent_sql::publications::resolve(id, &status, final_pub_id, &mut txn).await?;

txn.commit().await?;
txn.commit().await?;

// As a separate transaction, delete the draft. Note that the user technically could
// have inserted or updated draft specs after we started the publication, and those
// would still be removed by this.
if status.is_success() && !dry_run {
agent_sql::publications::delete_draft(draft_id, pg_pool).await?;
}
return Ok(HandleResult::HadJob);
// As a separate transaction, delete the draft. Note that the user technically could
// have inserted or updated draft specs after we started the publication, and those
// would still be removed by this.
if status.is_success() && !dry_run {
agent_sql::publications::delete_draft(draft_id, pg_pool).await?;
}
Ok(HandleResult::HadJob)
}

fn table_name(&self) -> &'static str {
Expand Down

0 comments on commit a6ea1b7

Please sign in to comment.