From 4b95a694d98b9c7fea8dee50858b6c619a8727f9 Mon Sep 17 00:00:00 2001 From: Ivan Novosad Date: Mon, 25 Nov 2024 15:20:48 +0100 Subject: [PATCH 1/2] fix(unique-jobs): Fix on retry in unique jobs --- .../aggregator/credit_notes/create_job.rb | 4 +++- .../aggregator/invoices/create_job.rb | 4 +++- .../crm/create_customer_association_job.rb | 2 -- .../aggregator/invoices/crm/create_job.rb | 2 -- .../aggregator/invoices/crm/update_job.rb | 2 -- .../aggregator/payments/create_job.rb | 4 +++- .../crm/create_customer_association_job.rb | 2 -- .../subscriptions/crm/create_job.rb | 2 -- .../subscriptions/crm/update_job.rb | 2 -- .../create_pay_in_advance_charge_job.rb | 2 +- .../invoices/payments/adyen_create_job.rb | 2 +- .../invoices/payments/stripe_create_job.rb | 2 +- app/jobs/invoices/prepaid_credit_job.rb | 2 +- .../payments/adyen_create_job.rb | 2 +- .../payments/stripe_create_job.rb | 2 +- .../wallets/refresh_ongoing_balance_job.rb | 2 +- config/application.rb | 2 ++ .../strategies/until_executed_patch.rb | 20 +++++++++++++++++++ 18 files changed, 38 insertions(+), 22 deletions(-) create mode 100644 lib/active_job/uniqueness/strategies/until_executed_patch.rb diff --git a/app/jobs/integrations/aggregator/credit_notes/create_job.rb b/app/jobs/integrations/aggregator/credit_notes/create_job.rb index f575bec8c08..585087380de 100644 --- a/app/jobs/integrations/aggregator/credit_notes/create_job.rb +++ b/app/jobs/integrations/aggregator/credit_notes/create_job.rb @@ -6,7 +6,9 @@ module CreditNotes class CreateJob < ApplicationJob queue_as 'integrations' - unique :until_executed, on_conflict: :log + # https://github.com/veeqo/activejob-uniqueness/issues/75 + # retry_on does not work with until_executed strategy + unique :until_executed_patch, on_conflict: :log retry_on LagoHttpClient::HttpError, wait: :polynomially_longer, attempts: 3 retry_on RequestLimitError, wait: :polynomially_longer, attempts: 100 diff --git a/app/jobs/integrations/aggregator/invoices/create_job.rb b/app/jobs/integrations/aggregator/invoices/create_job.rb index 6d2c00604d0..e052b23f8b2 100644 --- a/app/jobs/integrations/aggregator/invoices/create_job.rb +++ b/app/jobs/integrations/aggregator/invoices/create_job.rb @@ -6,7 +6,9 @@ module Invoices class CreateJob < ApplicationJob queue_as 'integrations' - unique :until_executed, on_conflict: :log + # https://github.com/veeqo/activejob-uniqueness/issues/75 + # retry_on does not work with until_executed strategy + unique :until_executed_patch, on_conflict: :log retry_on LagoHttpClient::HttpError, wait: :polynomially_longer, attempts: 3 retry_on RequestLimitError, wait: :polynomially_longer, attempts: 100 diff --git a/app/jobs/integrations/aggregator/invoices/crm/create_customer_association_job.rb b/app/jobs/integrations/aggregator/invoices/crm/create_customer_association_job.rb index 938748eec1d..dbf980328b5 100644 --- a/app/jobs/integrations/aggregator/invoices/crm/create_customer_association_job.rb +++ b/app/jobs/integrations/aggregator/invoices/crm/create_customer_association_job.rb @@ -7,8 +7,6 @@ module Crm class CreateCustomerAssociationJob < ApplicationJob queue_as 'integrations' - unique :until_executed, on_conflict: :log - retry_on LagoHttpClient::HttpError, wait: :polynomially_longer, attempts: 10 retry_on RequestLimitError, wait: :polynomially_longer, attempts: 100 diff --git a/app/jobs/integrations/aggregator/invoices/crm/create_job.rb b/app/jobs/integrations/aggregator/invoices/crm/create_job.rb index 85770a419a4..44155e8a363 100644 --- a/app/jobs/integrations/aggregator/invoices/crm/create_job.rb +++ b/app/jobs/integrations/aggregator/invoices/crm/create_job.rb @@ -7,8 +7,6 @@ module Crm class CreateJob < ApplicationJob queue_as 'integrations' - unique :until_executed, on_conflict: :log - retry_on LagoHttpClient::HttpError, wait: :polynomially_longer, attempts: 10 retry_on Integrations::Aggregator::BasePayload::Failure, wait: :polynomially_longer, attempts: 10 retry_on RequestLimitError, wait: :polynomially_longer, attempts: 100 diff --git a/app/jobs/integrations/aggregator/invoices/crm/update_job.rb b/app/jobs/integrations/aggregator/invoices/crm/update_job.rb index 288a1494a56..45418dd6eea 100644 --- a/app/jobs/integrations/aggregator/invoices/crm/update_job.rb +++ b/app/jobs/integrations/aggregator/invoices/crm/update_job.rb @@ -7,8 +7,6 @@ module Crm class UpdateJob < ApplicationJob queue_as 'integrations' - unique :until_executed, on_conflict: :log - retry_on LagoHttpClient::HttpError, wait: :polynomially_longer, attempts: 10 retry_on Integrations::Aggregator::BasePayload::Failure, wait: :polynomially_longer, attempts: 10 retry_on RequestLimitError, wait: :polynomially_longer, attempts: 100 diff --git a/app/jobs/integrations/aggregator/payments/create_job.rb b/app/jobs/integrations/aggregator/payments/create_job.rb index 7bb27404fa1..ddd45240b0f 100644 --- a/app/jobs/integrations/aggregator/payments/create_job.rb +++ b/app/jobs/integrations/aggregator/payments/create_job.rb @@ -6,7 +6,9 @@ module Payments class CreateJob < ApplicationJob queue_as 'integrations' - unique :until_executed, on_conflict: :log + # https://github.com/veeqo/activejob-uniqueness/issues/75 + # retry_on does not work with until_executed strategy + unique :until_executed_patch, on_conflict: :log retry_on LagoHttpClient::HttpError, wait: :polynomially_longer, attempts: 5 retry_on Integrations::Aggregator::BasePayload::Failure, wait: :polynomially_longer, attempts: 10 diff --git a/app/jobs/integrations/aggregator/subscriptions/crm/create_customer_association_job.rb b/app/jobs/integrations/aggregator/subscriptions/crm/create_customer_association_job.rb index 93ec015ceff..cfc36a76771 100644 --- a/app/jobs/integrations/aggregator/subscriptions/crm/create_customer_association_job.rb +++ b/app/jobs/integrations/aggregator/subscriptions/crm/create_customer_association_job.rb @@ -7,8 +7,6 @@ module Crm class CreateCustomerAssociationJob < ApplicationJob queue_as 'integrations' - unique :until_executed, on_conflict: :log - retry_on LagoHttpClient::HttpError, wait: :polynomially_longer, attempts: 10 retry_on RequestLimitError, wait: :polynomially_longer, attempts: 100 diff --git a/app/jobs/integrations/aggregator/subscriptions/crm/create_job.rb b/app/jobs/integrations/aggregator/subscriptions/crm/create_job.rb index dec20f5e496..0a2cf9ce63e 100644 --- a/app/jobs/integrations/aggregator/subscriptions/crm/create_job.rb +++ b/app/jobs/integrations/aggregator/subscriptions/crm/create_job.rb @@ -7,8 +7,6 @@ module Crm class CreateJob < ApplicationJob queue_as 'integrations' - unique :until_executed, on_conflict: :log - retry_on LagoHttpClient::HttpError, wait: :polynomially_longer, attempts: 10 retry_on Integrations::Aggregator::BasePayload::Failure, wait: :polynomially_longer, attempts: 10 retry_on RequestLimitError, wait: :polynomially_longer, attempts: 100 diff --git a/app/jobs/integrations/aggregator/subscriptions/crm/update_job.rb b/app/jobs/integrations/aggregator/subscriptions/crm/update_job.rb index 347e3361c31..8860fd0d9e8 100644 --- a/app/jobs/integrations/aggregator/subscriptions/crm/update_job.rb +++ b/app/jobs/integrations/aggregator/subscriptions/crm/update_job.rb @@ -7,8 +7,6 @@ module Crm class UpdateJob < ApplicationJob queue_as 'integrations' - unique :until_executed, on_conflict: :log - retry_on LagoHttpClient::HttpError, wait: :polynomially_longer, attempts: 10 retry_on Integrations::Aggregator::BasePayload::Failure, wait: :polynomially_longer, attempts: 10 retry_on RequestLimitError, wait: :polynomially_longer, attempts: 100 diff --git a/app/jobs/invoices/create_pay_in_advance_charge_job.rb b/app/jobs/invoices/create_pay_in_advance_charge_job.rb index 0b8af74a670..e7ab748b621 100644 --- a/app/jobs/invoices/create_pay_in_advance_charge_job.rb +++ b/app/jobs/invoices/create_pay_in_advance_charge_job.rb @@ -6,7 +6,7 @@ class CreatePayInAdvanceChargeJob < ApplicationJob retry_on Sequenced::SequenceError - unique :until_executed, on_conflict: :log + unique :until_executed_patch, on_conflict: :log def perform(charge:, event:, timestamp:, invoice: nil) result = Invoices::CreatePayInAdvanceChargeService.call(charge:, event:, timestamp:, invoice:) diff --git a/app/jobs/invoices/payments/adyen_create_job.rb b/app/jobs/invoices/payments/adyen_create_job.rb index b962c65f5dd..2f5cab1533c 100644 --- a/app/jobs/invoices/payments/adyen_create_job.rb +++ b/app/jobs/invoices/payments/adyen_create_job.rb @@ -5,7 +5,7 @@ module Payments class AdyenCreateJob < ApplicationJob queue_as 'providers' - unique :until_executed + unique :until_executed_patch retry_on Faraday::ConnectionFailed, wait: :polynomially_longer, attempts: 6 diff --git a/app/jobs/invoices/payments/stripe_create_job.rb b/app/jobs/invoices/payments/stripe_create_job.rb index 4fd6361fd68..f5655961625 100644 --- a/app/jobs/invoices/payments/stripe_create_job.rb +++ b/app/jobs/invoices/payments/stripe_create_job.rb @@ -5,7 +5,7 @@ module Payments class StripeCreateJob < ApplicationJob queue_as 'providers' - unique :until_executed, on_conflict: :log + unique :until_executed_patch, on_conflict: :log retry_on ::Stripe::RateLimitError, wait: :polynomially_longer, attempts: 6 retry_on ::Stripe::APIConnectionError, wait: :polynomially_longer, attempts: 6 diff --git a/app/jobs/invoices/prepaid_credit_job.rb b/app/jobs/invoices/prepaid_credit_job.rb index 1abe3183983..474c874f198 100644 --- a/app/jobs/invoices/prepaid_credit_job.rb +++ b/app/jobs/invoices/prepaid_credit_job.rb @@ -5,7 +5,7 @@ class PrepaidCreditJob < ApplicationJob queue_as 'wallets' retry_on ActiveRecord::StaleObjectError, wait: :polynomially_longer, attempts: 6 - unique :until_executed, on_conflict: :log + unique :until_executed_patch, on_conflict: :log def perform(invoice) wallet_transaction = invoice.fees.find_by(fee_type: 'credit')&.invoiceable diff --git a/app/jobs/payment_requests/payments/adyen_create_job.rb b/app/jobs/payment_requests/payments/adyen_create_job.rb index 45758e9a095..5d9f05c31d9 100644 --- a/app/jobs/payment_requests/payments/adyen_create_job.rb +++ b/app/jobs/payment_requests/payments/adyen_create_job.rb @@ -5,7 +5,7 @@ module Payments class AdyenCreateJob < ApplicationJob queue_as 'providers' - unique :until_executed + unique :until_executed_patch retry_on Faraday::ConnectionFailed, wait: :polynomially_longer, attempts: 6 diff --git a/app/jobs/payment_requests/payments/stripe_create_job.rb b/app/jobs/payment_requests/payments/stripe_create_job.rb index 9bfcbc42cf3..1b779385391 100644 --- a/app/jobs/payment_requests/payments/stripe_create_job.rb +++ b/app/jobs/payment_requests/payments/stripe_create_job.rb @@ -5,7 +5,7 @@ module Payments class StripeCreateJob < ApplicationJob queue_as 'providers' - unique :until_executed, on_conflict: :log + unique :until_executed_patch, on_conflict: :log retry_on ::Stripe::RateLimitError, wait: :polynomially_longer, attempts: 6 retry_on ::Stripe::APIConnectionError, wait: :polynomially_longer, attempts: 6 diff --git a/app/jobs/wallets/refresh_ongoing_balance_job.rb b/app/jobs/wallets/refresh_ongoing_balance_job.rb index 16923d155fe..b58ed275735 100644 --- a/app/jobs/wallets/refresh_ongoing_balance_job.rb +++ b/app/jobs/wallets/refresh_ongoing_balance_job.rb @@ -4,7 +4,7 @@ module Wallets class RefreshOngoingBalanceJob < ApplicationJob queue_as 'wallets' - unique :until_executed, on_conflict: :log + unique :until_executed_patch, on_conflict: :log retry_on ActiveRecord::StaleObjectError, wait: :polynomially_longer, attempts: 6 diff --git a/config/application.rb b/config/application.rb index 351d7e87a8c..9907d8200b6 100644 --- a/config/application.rb +++ b/config/application.rb @@ -42,3 +42,5 @@ class Application < Rails::Application config.active_support.cache_format_version = 7.1 end end + +require_relative "../lib/active_job/uniqueness/strategies/until_executed_patch" diff --git a/lib/active_job/uniqueness/strategies/until_executed_patch.rb b/lib/active_job/uniqueness/strategies/until_executed_patch.rb new file mode 100644 index 00000000000..b12b9097ae9 --- /dev/null +++ b/lib/active_job/uniqueness/strategies/until_executed_patch.rb @@ -0,0 +1,20 @@ +# frozen_string_literal: true + +require "active_job/uniqueness/strategies/until_executed" + +module ActiveJob + module Uniqueness + module Strategies + class UntilExecutedPatch < UntilExecuted + def before_enqueue + return if lock(resource: lock_key, ttl: lock_ttl) + # We're retrying the job, so we don't need to lock again + return if job.executions > 0 + + handle_conflict(resource: lock_key, on_conflict: on_conflict) + abort_job + end + end + end + end +end From 8c5e9bd25d72c66d029167578c35fb4fcbcd923c Mon Sep 17 00:00:00 2001 From: Ivan Novosad Date: Mon, 25 Nov 2024 15:49:48 +0100 Subject: [PATCH 2/2] fix(unique-jobs): Fix on retry in unique jobs, prepend to until_executed strategy --- .../integrations/aggregator/credit_notes/create_job.rb | 4 +--- app/jobs/integrations/aggregator/invoices/create_job.rb | 4 +--- app/jobs/integrations/aggregator/payments/create_job.rb | 4 +--- app/jobs/invoices/create_pay_in_advance_charge_job.rb | 2 +- app/jobs/invoices/payments/adyen_create_job.rb | 2 +- app/jobs/invoices/payments/stripe_create_job.rb | 2 +- app/jobs/invoices/prepaid_credit_job.rb | 2 +- app/jobs/payment_requests/payments/adyen_create_job.rb | 2 +- app/jobs/payment_requests/payments/stripe_create_job.rb | 2 +- app/jobs/wallets/refresh_ongoing_balance_job.rb | 2 +- .../uniqueness/strategies/until_executed_patch.rb | 9 ++++++++- 11 files changed, 18 insertions(+), 17 deletions(-) diff --git a/app/jobs/integrations/aggregator/credit_notes/create_job.rb b/app/jobs/integrations/aggregator/credit_notes/create_job.rb index 585087380de..f575bec8c08 100644 --- a/app/jobs/integrations/aggregator/credit_notes/create_job.rb +++ b/app/jobs/integrations/aggregator/credit_notes/create_job.rb @@ -6,9 +6,7 @@ module CreditNotes class CreateJob < ApplicationJob queue_as 'integrations' - # https://github.com/veeqo/activejob-uniqueness/issues/75 - # retry_on does not work with until_executed strategy - unique :until_executed_patch, on_conflict: :log + unique :until_executed, on_conflict: :log retry_on LagoHttpClient::HttpError, wait: :polynomially_longer, attempts: 3 retry_on RequestLimitError, wait: :polynomially_longer, attempts: 100 diff --git a/app/jobs/integrations/aggregator/invoices/create_job.rb b/app/jobs/integrations/aggregator/invoices/create_job.rb index e052b23f8b2..6d2c00604d0 100644 --- a/app/jobs/integrations/aggregator/invoices/create_job.rb +++ b/app/jobs/integrations/aggregator/invoices/create_job.rb @@ -6,9 +6,7 @@ module Invoices class CreateJob < ApplicationJob queue_as 'integrations' - # https://github.com/veeqo/activejob-uniqueness/issues/75 - # retry_on does not work with until_executed strategy - unique :until_executed_patch, on_conflict: :log + unique :until_executed, on_conflict: :log retry_on LagoHttpClient::HttpError, wait: :polynomially_longer, attempts: 3 retry_on RequestLimitError, wait: :polynomially_longer, attempts: 100 diff --git a/app/jobs/integrations/aggregator/payments/create_job.rb b/app/jobs/integrations/aggregator/payments/create_job.rb index ddd45240b0f..7bb27404fa1 100644 --- a/app/jobs/integrations/aggregator/payments/create_job.rb +++ b/app/jobs/integrations/aggregator/payments/create_job.rb @@ -6,9 +6,7 @@ module Payments class CreateJob < ApplicationJob queue_as 'integrations' - # https://github.com/veeqo/activejob-uniqueness/issues/75 - # retry_on does not work with until_executed strategy - unique :until_executed_patch, on_conflict: :log + unique :until_executed, on_conflict: :log retry_on LagoHttpClient::HttpError, wait: :polynomially_longer, attempts: 5 retry_on Integrations::Aggregator::BasePayload::Failure, wait: :polynomially_longer, attempts: 10 diff --git a/app/jobs/invoices/create_pay_in_advance_charge_job.rb b/app/jobs/invoices/create_pay_in_advance_charge_job.rb index e7ab748b621..0b8af74a670 100644 --- a/app/jobs/invoices/create_pay_in_advance_charge_job.rb +++ b/app/jobs/invoices/create_pay_in_advance_charge_job.rb @@ -6,7 +6,7 @@ class CreatePayInAdvanceChargeJob < ApplicationJob retry_on Sequenced::SequenceError - unique :until_executed_patch, on_conflict: :log + unique :until_executed, on_conflict: :log def perform(charge:, event:, timestamp:, invoice: nil) result = Invoices::CreatePayInAdvanceChargeService.call(charge:, event:, timestamp:, invoice:) diff --git a/app/jobs/invoices/payments/adyen_create_job.rb b/app/jobs/invoices/payments/adyen_create_job.rb index 2f5cab1533c..b962c65f5dd 100644 --- a/app/jobs/invoices/payments/adyen_create_job.rb +++ b/app/jobs/invoices/payments/adyen_create_job.rb @@ -5,7 +5,7 @@ module Payments class AdyenCreateJob < ApplicationJob queue_as 'providers' - unique :until_executed_patch + unique :until_executed retry_on Faraday::ConnectionFailed, wait: :polynomially_longer, attempts: 6 diff --git a/app/jobs/invoices/payments/stripe_create_job.rb b/app/jobs/invoices/payments/stripe_create_job.rb index f5655961625..4fd6361fd68 100644 --- a/app/jobs/invoices/payments/stripe_create_job.rb +++ b/app/jobs/invoices/payments/stripe_create_job.rb @@ -5,7 +5,7 @@ module Payments class StripeCreateJob < ApplicationJob queue_as 'providers' - unique :until_executed_patch, on_conflict: :log + unique :until_executed, on_conflict: :log retry_on ::Stripe::RateLimitError, wait: :polynomially_longer, attempts: 6 retry_on ::Stripe::APIConnectionError, wait: :polynomially_longer, attempts: 6 diff --git a/app/jobs/invoices/prepaid_credit_job.rb b/app/jobs/invoices/prepaid_credit_job.rb index 474c874f198..1abe3183983 100644 --- a/app/jobs/invoices/prepaid_credit_job.rb +++ b/app/jobs/invoices/prepaid_credit_job.rb @@ -5,7 +5,7 @@ class PrepaidCreditJob < ApplicationJob queue_as 'wallets' retry_on ActiveRecord::StaleObjectError, wait: :polynomially_longer, attempts: 6 - unique :until_executed_patch, on_conflict: :log + unique :until_executed, on_conflict: :log def perform(invoice) wallet_transaction = invoice.fees.find_by(fee_type: 'credit')&.invoiceable diff --git a/app/jobs/payment_requests/payments/adyen_create_job.rb b/app/jobs/payment_requests/payments/adyen_create_job.rb index 5d9f05c31d9..45758e9a095 100644 --- a/app/jobs/payment_requests/payments/adyen_create_job.rb +++ b/app/jobs/payment_requests/payments/adyen_create_job.rb @@ -5,7 +5,7 @@ module Payments class AdyenCreateJob < ApplicationJob queue_as 'providers' - unique :until_executed_patch + unique :until_executed retry_on Faraday::ConnectionFailed, wait: :polynomially_longer, attempts: 6 diff --git a/app/jobs/payment_requests/payments/stripe_create_job.rb b/app/jobs/payment_requests/payments/stripe_create_job.rb index 1b779385391..9bfcbc42cf3 100644 --- a/app/jobs/payment_requests/payments/stripe_create_job.rb +++ b/app/jobs/payment_requests/payments/stripe_create_job.rb @@ -5,7 +5,7 @@ module Payments class StripeCreateJob < ApplicationJob queue_as 'providers' - unique :until_executed_patch, on_conflict: :log + unique :until_executed, on_conflict: :log retry_on ::Stripe::RateLimitError, wait: :polynomially_longer, attempts: 6 retry_on ::Stripe::APIConnectionError, wait: :polynomially_longer, attempts: 6 diff --git a/app/jobs/wallets/refresh_ongoing_balance_job.rb b/app/jobs/wallets/refresh_ongoing_balance_job.rb index b58ed275735..16923d155fe 100644 --- a/app/jobs/wallets/refresh_ongoing_balance_job.rb +++ b/app/jobs/wallets/refresh_ongoing_balance_job.rb @@ -4,7 +4,7 @@ module Wallets class RefreshOngoingBalanceJob < ApplicationJob queue_as 'wallets' - unique :until_executed_patch, on_conflict: :log + unique :until_executed, on_conflict: :log retry_on ActiveRecord::StaleObjectError, wait: :polynomially_longer, attempts: 6 diff --git a/lib/active_job/uniqueness/strategies/until_executed_patch.rb b/lib/active_job/uniqueness/strategies/until_executed_patch.rb index b12b9097ae9..1dbc2217917 100644 --- a/lib/active_job/uniqueness/strategies/until_executed_patch.rb +++ b/lib/active_job/uniqueness/strategies/until_executed_patch.rb @@ -2,10 +2,13 @@ require "active_job/uniqueness/strategies/until_executed" +# https://github.com/veeqo/activejob-uniqueness/issues/75 +# retry_on does not work with until_executed strategy + module ActiveJob module Uniqueness module Strategies - class UntilExecutedPatch < UntilExecuted + module UntilExecutedPatch def before_enqueue return if lock(resource: lock_key, ttl: lock_ttl) # We're retrying the job, so we don't need to lock again @@ -18,3 +21,7 @@ def before_enqueue end end end + +ActiveJob::Uniqueness::Strategies::UntilExecuted.prepend( + ActiveJob::Uniqueness::Strategies::UntilExecutedPatch +)