llo: reap orphaned transmissions as well as stale ones

Descriptive preamble (patch tools skip text that precedes the first
"diff --git" header). What the patch below does:

  * runLoop: on every tick it now calls reapAndLog for "stale" and then for
    "orphaned"; the overtime pass on exit still reaps only "stale".
  * reap(ctx, batchSize, reapType) takes over the batch loop that reapStale
    used to own and dispatches on reapType; an unknown type returns an error.
  * reapStale now issues a single DELETE batch and computes the cutoff in SQL
    as NOW() minus maxAge (passed in microseconds) instead of binding
    time.Now().Add(-t.maxAge).
  * reapOrphaned deletes queue rows whose don_id is not present as
    relay_config->>'lloDonID' in ocr2_oracle_specs.
  * Tests: Test_TransmissionReaper is renamed Test_StaleTransmissionReaper and
    Test_OrphanedTransmissionReaper is added.

diff --git a/core/services/llo/cleanup.go b/core/services/llo/cleanup.go
index dab00965809..1bd7207cf2c 100644
--- a/core/services/llo/cleanup.go
+++ b/core/services/llo/cleanup.go
@@ -86,7 +86,7 @@ func (t *transmissionReaper) runLoop(ctx context.Context) {
 			// make a final effort to clear the database that goes into
 			// overtime
 			overtimeCtx, cancel := context.WithTimeout(context.Background(), OvertimeDeleteTimeout)
-			if n, err := t.reapStale(overtimeCtx, TransmissionReaperBatchSize); err != nil {
+			if n, err := t.reap(overtimeCtx, TransmissionReaperBatchSize, "stale"); err != nil {
 				t.lggr.Errorw("Failed to reap stale transmissions on exit", "err", err)
 			} else if n > 0 {
 				t.lggr.Infow("Reaped stale transmissions on exit", "nDeleted", n)
@@ -94,46 +94,45 @@ func (t *transmissionReaper) runLoop(ctx context.Context) {
 			cancel()
 			return
 		case <-ticker.C:
-			// TODO: Could also automatically reap orphaned transmissions
-			// that don't have a job with a matching DON ID (from job
-			// deletion)
-			//
-			// https://smartcontract-it.atlassian.net/browse/MERC-6807
 			// TODO: Should also reap other LLO garbage that can be left
 			// behind e.g. channel definitions etc
-			n, err := t.reapStale(ctx, TransmissionReaperBatchSize)
-			if err != nil {
-				t.lggr.Errorw("Failed to reap", "err", err)
-				continue
-			}
-			if n > 0 {
-				t.lggr.Infow("Reaped stale transmissions", "nDeleted", n)
-			}
+			t.reapAndLog(ctx, TransmissionReaperBatchSize, "stale")
+			t.reapAndLog(ctx, TransmissionReaperBatchSize, "orphaned")
 		}
 	}
 }
 
-func (t *transmissionReaper) reapStale(ctx context.Context, batchSize int) (rowsDeleted int64, err error) {
+func (t *transmissionReaper) reapAndLog(ctx context.Context, batchSize int, reapType string) {
+	n, err := t.reap(ctx, batchSize, reapType)
+	if err != nil {
+		t.lggr.Errorw("Failed to reap", "type", reapType, "err", err)
+		return
+	}
+	if n > 0 {
+		t.lggr.Infow("Reaped transmissions", "type", reapType, "nDeleted", n)
+	}
+}
+
+func (t *transmissionReaper) reap(ctx context.Context, batchSize int, reapType string) (rowsDeleted int64, err error) {
 	for {
 		var res sql.Result
-		res, err = t.ds.ExecContext(ctx, `
-DELETE FROM llo_mercury_transmit_queue AS q
-USING (
-  SELECT transmission_hash
-  FROM llo_mercury_transmit_queue
-  WHERE inserted_at < $1
-  ORDER BY inserted_at ASC
-  LIMIT $2
-) AS to_delete
-WHERE q.transmission_hash = to_delete.transmission_hash;
-		`, time.Now().Add(-t.maxAge), batchSize)
+		switch reapType {
+		case "stale":
+			res, err = t.reapStale(ctx, batchSize)
+		case "orphaned":
+			res, err = t.reapOrphaned(ctx, batchSize)
+		default:
+			return 0, fmt.Errorf("transmissionReaper: unknown reap type: %s", reapType)
+		}
+
 		if err != nil {
-			return rowsDeleted, fmt.Errorf("transmissionReaper: failed to delete stale transmissions: %w", err)
+			return rowsDeleted, fmt.Errorf("transmissionReaper: failed to delete %s transmissions: %w", reapType, err)
 		}
+
 		var rowsAffected int64
 		rowsAffected, err = res.RowsAffected()
 		if err != nil {
-			return rowsDeleted, fmt.Errorf("transmissionReaper: failed to get rows affected: %w", err)
+			return rowsDeleted, fmt.Errorf("transmissionReaper: failed to get %s rows affected: %w", reapType, err)
 		}
 		if rowsAffected == 0 {
 			break
@@ -142,3 +141,38 @@ WHERE q.transmission_hash = to_delete.transmission_hash;
 	}
 	return rowsDeleted, nil
 }
+
+func (t *transmissionReaper) reapStale(ctx context.Context, batchSize int) (sql.Result, error) {
+	return t.ds.ExecContext(ctx, `
+DELETE FROM llo_mercury_transmit_queue AS q
+USING (
+  SELECT transmission_hash
+  FROM llo_mercury_transmit_queue
+  WHERE inserted_at < NOW() - ($1 * INTERVAL '1 MICROSECOND')
+  ORDER BY inserted_at ASC
+  LIMIT $2
+) AS to_delete
+WHERE q.transmission_hash = to_delete.transmission_hash;
+`, t.maxAge.Microseconds(), batchSize)
+}
+
+func (t *transmissionReaper) reapOrphaned(ctx context.Context, batchSize int) (sql.Result, error) {
+	return t.ds.ExecContext(ctx, `
+WITH activeDonIds AS (
+  SELECT DISTINCT cast(relay_config->>'lloDonID' as bigint) as don_id
+  FROM ocr2_oracle_specs
+  WHERE
+    relay_config->>'lloDonID' IS NOT NULL
+    AND relay_config->>'lloDonID' <> ''
+)
+DELETE FROM llo_mercury_transmit_queue as q
+USING (
+  SELECT transmission_hash
+  FROM llo_mercury_transmit_queue
+  WHERE don_id NOT IN (SELECT don_id FROM activeDonIds)
+  ORDER BY inserted_at ASC
+  LIMIT $1
+) AS to_delete
+WHERE q.transmission_hash = to_delete.transmission_hash;
+`, batchSize)
+}
diff --git a/core/services/llo/cleanup_test.go b/core/services/llo/cleanup_test.go
index f53a6e083f2..f8f19cf2d20 100644
--- a/core/services/llo/cleanup_test.go
+++ b/core/services/llo/cleanup_test.go
@@ -111,7 +111,7 @@ func Test_Cleanup(t *testing.T) {
 	})
 }
 
-func Test_TransmissionReaper(t *testing.T) {
+func Test_StaleTransmissionReaper(t *testing.T) {
 	ds := pgtest.NewSqlxDB(t)
 	lggr := logger.TestLogger(t)
 	tr := &transmissionReaper{ds: ds, lggr: lggr, maxAge: 24 * time.Hour}
@@ -133,13 +133,45 @@ WHERE transmission_hash IN (
 `)
 
 	// test batching
-	d, err := tr.reapStale(ctx, n/3)
+	d, err := tr.reap(ctx, n/3, "stale")
 	require.NoError(t, err)
 	assert.Equal(t, int64(5), d)
 
 	pgtest.MustExec(t, ds, "UPDATE llo_mercury_transmit_queue SET inserted_at = NOW() - INTERVAL '48 hours'")
 
-	d, err = tr.reapStale(ctx, n/3)
+	d, err = tr.reap(ctx, n/3, "stale")
 	require.NoError(t, err)
 	assert.Equal(t, int64(n-5), d)
 }
+
+func Test_OrphanedTransmissionReaper(t *testing.T) {
+	ds := pgtest.NewSqlxDB(t)
+	lggr := logger.TestLogger(t)
+	tr := &transmissionReaper{ds: ds, lggr: lggr, maxAge: 24 * time.Hour}
+	ctx := testutils.Context(t)
+
+	const n = 13
+
+	pgtest.MustExec(t, ds, `
+	INSERT INTO ocr2_oracle_specs (contract_id, p2pv2_bootstrappers, contract_config_confirmations, created_at,
+	updated_at, relay, relay_config, plugin_config, plugin_type, onchain_signing_strategy, allow_no_bootstrappers
+	) VALUES ('0x','{}', 0, NOW(), NOW(), 'evm', '{"chainID": 421614, "lloDonID": 2}', '{"donID": 2}', 'llo', '{}', FALSE);`)
+
+	// add transmissions from a DON not present in ocr2 specs
+	transmissions := makeSampleTransmissions(n)
+	torm := mercurytransmitter.NewORM(ds, 1)
+	err := torm.Insert(testutils.Context(t), transmissions)
+	require.NoError(t, err)
+
+	d, err := tr.reap(ctx, n, "orphaned")
+	require.NoError(t, err)
+	assert.Equal(t, int64(n), d)
+
+	torm2 := mercurytransmitter.NewORM(ds, 2)
+	err = torm2.Insert(testutils.Context(t), transmissions)
+	require.NoError(t, err)
+
+	d, err = tr.reap(ctx, n, "orphaned")
+	require.NoError(t, err)
+	assert.Equal(t, int64(0), d)
+}