From 2d97d14e9da31b037a9447eee7ce591d30fd84c1 Mon Sep 17 00:00:00 2001 From: Ryan Krage Date: Thu, 6 Jun 2024 20:14:19 +0000 Subject: [PATCH 1/3] Safely acquire lock on multiple tables --- README.md | 18 +- lib/pg_ha_migrations/relation.rb | 32 +++- lib/pg_ha_migrations/safe_statements.rb | 68 +++++--- spec/safe_statements_spec.rb | 219 +++++++++++++++++++++--- 4 files changed, 279 insertions(+), 58 deletions(-) diff --git a/README.md b/README.md index 2101720..d8933a5 100644 --- a/README.md +++ b/README.md @@ -457,7 +457,7 @@ safe_partman_reapply_privileges :table #### safely\_acquire\_lock\_for\_table -Acquires a lock on a table using the following algorithm: +Acquires a lock (in `ACCESS EXCLUSIVE` mode by default) on a table using the following algorithm: 1. Verify that no long-running queries are using the table. - If long-running queries are currently using the table, sleep `PgHaMigrations::LOCK_TIMEOUT_SECONDS` and check again. @@ -465,15 +465,13 @@ Acquires a lock on a table using the following algorithm: - If the lock is not acquired, sleep `PgHaMigrations::LOCK_FAILURE_RETRY_DELAY_MULTLIPLIER * PgHaMigrations::LOCK_TIMEOUT_SECONDS`, and start again at step 1. 3. If the lock is acquired, proceed to run the given block. -Safely acquire an access exclusive lock for a table. - ```ruby safely_acquire_lock_for_table(:table) do ... end ``` -Safely acquire a lock for a table in a different mode. +Safely acquire a lock on a table in `SHARE` mode. ```ruby safely_acquire_lock_for_table(:table, mode: :share) do @@ -481,10 +479,18 @@ safely_acquire_lock_for_table(:table, mode: :share) do end ``` +Safely acquire a lock on multiple tables in `EXCLUSIVE` mode. + +```ruby +safely_acquire_lock_for_table(:table_a, :table_b, mode: :exclusive) do + ... +end +``` + Note: -We enforce that only one table (or a table and its partitions) can be locked at a time. -Attempting to acquire a nested lock on a different table will result in an error. +We enforce that only one set of tables can be locked at a time. +Attempting to acquire a nested lock on a different set of tables will result in an error. #### adjust\_lock\_timeout diff --git a/lib/pg_ha_migrations/relation.rb b/lib/pg_ha_migrations/relation.rb index 4f68772..afac536 100644 --- a/lib/pg_ha_migrations/relation.rb +++ b/lib/pg_ha_migrations/relation.rb @@ -14,7 +14,7 @@ def initialize(name, schema, mode=nil) end def conflicts_with?(other) - self == other && ( + self.eql?(other) && ( mode.nil? || other.mode.nil? || mode.conflicts_with?(other.mode) ) end @@ -30,8 +30,12 @@ def present? name.present? && schema.present? end - def ==(other) - other.is_a?(Relation) && name == other.name && schema == other.schema + def eql?(other) + other.is_a?(Relation) && hash == other.hash + end + + def hash + [name, schema].hash end end @@ -152,4 +156,26 @@ def valid? SQL end end + + class TableCollection < Set + def self.from_table_names(tables, mode=nil) + new(tables) { |table| Table.from_table_name(table, mode) } + end + + def mode + first&.mode + end + + def to_sql + map(&:fully_qualified_name).join(", ") + end + + def with_partitions + tables = flat_map do |table| + table.partitions(include_sub_partitions: true, include_self: true) + end + + self.class.new(tables) + end + end end diff --git a/lib/pg_ha_migrations/safe_statements.rb b/lib/pg_ha_migrations/safe_statements.rb index 6d3e2a1..aac8792 100644 --- a/lib/pg_ha_migrations/safe_statements.rb +++ b/lib/pg_ha_migrations/safe_statements.rb @@ -3,6 +3,14 @@ def safe_added_columns_without_default_value @safe_added_columns_without_default_value ||= [] end + # This variable is used to track nested lock acquisition. + # Each element is a PgHaMigrations::TableCollection object. + # The order of the array represents the current call stack, + # where the most recent method call is the last element. + def safely_acquire_lock_for_table_history + @safely_acquire_lock_for_table_history ||= [] + end + def safe_create_table(table, options={}, &block) if options[:force] raise PgHaMigrations::UnsafeMigrationError.new(":force is NOT SAFE! Explicitly call unsafe_drop_table first if you want to recreate an existing table") @@ -528,40 +536,48 @@ def exec_migration(conn, direction) super(conn, direction) end - def safely_acquire_lock_for_table(table, mode: :access_exclusive, &block) - nested_target_table = Thread.current[__method__] + def safely_acquire_lock_for_table(*tables, mode: :access_exclusive, &block) + # So this variable is always available in the ensure block + successfully_acquired_lock = false _check_postgres_adapter! - target_table = PgHaMigrations::Table.from_table_name(table, mode) + target_tables = PgHaMigrations::TableCollection.from_table_names(tables, mode) - if nested_target_table - if nested_target_table != target_table - raise PgHaMigrations::InvalidMigrationError, "Nested lock detected! Cannot acquire lock on #{target_table.fully_qualified_name} while #{nested_target_table.fully_qualified_name} is locked." - elsif nested_target_table.mode < target_table.mode - raise PgHaMigrations::InvalidMigrationError, "Lock escalation detected! Cannot change lock level from :#{nested_target_table.mode} to :#{target_table.mode} for #{target_table.fully_qualified_name}." - end - else - Thread.current[__method__] = target_table - end + # Grab the latest locked tables from the call stack. + # This will be nil if we are not in a nested context. + nested_target_tables = safely_acquire_lock_for_table_history.last - # Locking a partitioned table will also lock child tables (including sub-partitions), - # so we need to check for blocking queries on those tables as well - target_tables = target_table.partitions(include_sub_partitions: true, include_self: true) + if nested_target_tables + if !target_tables.subset?(nested_target_tables) + raise PgHaMigrations::InvalidMigrationError, + "Nested lock detected! Cannot acquire lock on #{target_tables.to_sql} " \ + "while #{nested_target_tables.to_sql} is locked." + end - successfully_acquired_lock = false + if nested_target_tables.mode < target_tables.mode + raise PgHaMigrations::InvalidMigrationError, + "Lock escalation detected! Cannot change lock level from :#{nested_target_tables.mode} " \ + "to :#{target_tables.mode} for #{target_tables.to_sql}." + end + end until successfully_acquired_lock - while ( + loop do blocking_transactions = PgHaMigrations::BlockingDatabaseTransactions.find_blocking_transactions("#{PgHaMigrations::LOCK_TIMEOUT_SECONDS} seconds") - blocking_transactions.any? do |query| + + # Locking a partitioned table will also lock child tables (including sub-partitions), + # so we need to check for blocking queries on those tables as well + target_tables_with_partitions = target_tables.with_partitions + + break unless blocking_transactions.any? do |query| query.tables_with_locks.any? do |locked_table| - target_tables.any? do |target_table| + target_tables_with_partitions.any? do |target_table| target_table.conflicts_with?(locked_table) end end end - ) + say "Waiting on blocking transactions:" blocking_transactions.each do |blocking_transaction| say blocking_transaction.description @@ -570,16 +586,15 @@ def safely_acquire_lock_for_table(table, mode: :access_exclusive, &block) end connection.transaction do - adjust_timeout_method = connection.postgresql_version >= 9_03_00 ? :adjust_lock_timeout : :adjust_statement_timeout begin - method(adjust_timeout_method).call(PgHaMigrations::LOCK_TIMEOUT_SECONDS) do - connection.execute("LOCK #{target_table.fully_qualified_name} IN #{target_table.mode.to_sql} MODE;") + adjust_statement_timeout(PgHaMigrations::LOCK_TIMEOUT_SECONDS) do + connection.execute("LOCK #{target_tables.to_sql} IN #{target_tables.mode.to_sql} MODE;") end successfully_acquired_lock = true rescue ActiveRecord::StatementInvalid => e - if e.message =~ /PG::LockNotAvailable.+ lock timeout/ || e.message =~ /PG::QueryCanceled.+ statement timeout/ + if e.message =~ /PG::QueryCanceled.+ statement timeout/ sleep_seconds = PgHaMigrations::LOCK_FAILURE_RETRY_DELAY_MULTLIPLIER * PgHaMigrations::LOCK_TIMEOUT_SECONDS - say "Timed out trying to acquire #{target_table.mode.to_sql} lock on the #{target_table.fully_qualified_name} table." + say "Timed out trying to acquire #{target_tables.mode.to_sql} lock on #{target_tables.to_sql}." say "Sleeping for #{sleep_seconds}s to allow potentially queued up queries to finish before continuing." sleep(sleep_seconds) @@ -590,12 +605,13 @@ def safely_acquire_lock_for_table(table, mode: :access_exclusive, &block) end if successfully_acquired_lock + safely_acquire_lock_for_table_history.push(target_tables) block.call end end end ensure - Thread.current[__method__] = nil unless nested_target_table + safely_acquire_lock_for_table_history.pop if successfully_acquired_lock end # since rails versions < 7.1 has bug which does not handle symbol for diff --git a/spec/safe_statements_spec.rb b/spec/safe_statements_spec.rb index 5d8cb96..12e4398 100644 --- a/spec/safe_statements_spec.rb +++ b/spec/safe_statements_spec.rb @@ -4064,14 +4064,15 @@ def up let(:alternate_connection_pool) do ActiveRecord::ConnectionAdapters::ConnectionPool.new(pool_config) end - let(:alternate_connection) do - alternate_connection_pool.connection - end + + let(:alternate_connection) { alternate_connection_pool.connection } + let(:alternate_connection_2) { alternate_connection_pool.connection } let(:migration) { Class.new(migration_klass).new } before(:each) do ActiveRecord::Base.connection.execute(<<~SQL) CREATE TABLE #{table_name}(pk SERIAL, i INTEGER); + CREATE TABLE #{table_name}_2(pk SERIAL, i INTEGER); CREATE SCHEMA partman; CREATE EXTENSION pg_partman SCHEMA partman; SQL @@ -4100,6 +4101,28 @@ def up end end + it "acquires exclusive locks by default when multiple tables provided" do + migration.safely_acquire_lock_for_table(table_name, "bogus_table_2") do + expect(locks_for_table(table_name, connection: alternate_connection)).to contain_exactly( + having_attributes( + table: "bogus_table", + lock_type: "AccessExclusiveLock", + granted: true, + pid: kind_of(Integer), + ) + ) + + expect(locks_for_table("bogus_table_2", connection: alternate_connection)).to contain_exactly( + having_attributes( + table: "bogus_table_2", + lock_type: "AccessExclusiveLock", + granted: true, + pid: kind_of(Integer), + ) + ) + end + end + it "acquires a lock in a different mode when provided" do migration.safely_acquire_lock_for_table(table_name, mode: :share) do expect(locks_for_table(table_name, connection: alternate_connection)).to contain_exactly( @@ -4113,6 +4136,28 @@ def up end end + it "acquires locks in a different mode when multiple tables and mode provided" do + migration.safely_acquire_lock_for_table(table_name, "bogus_table_2", mode: :share_row_exclusive) do + expect(locks_for_table(table_name, connection: alternate_connection)).to contain_exactly( + having_attributes( + table: "bogus_table", + lock_type: "ShareRowExclusiveLock", + granted: true, + pid: kind_of(Integer), + ) + ) + + expect(locks_for_table("bogus_table_2", connection: alternate_connection)).to contain_exactly( + having_attributes( + table: "bogus_table_2", + lock_type: "ShareRowExclusiveLock", + granted: true, + pid: kind_of(Integer), + ) + ) + end + end + it "raises error when invalid lock mode provided" do expect do migration.safely_acquire_lock_for_table(table_name, mode: :garbage) {} @@ -4154,6 +4199,39 @@ def up end end + it "times out the lock query after LOCK_TIMEOUT_SECONDS when multiple tables provided" do + stub_const("PgHaMigrations::LOCK_TIMEOUT_SECONDS", 1) + stub_const("PgHaMigrations::LOCK_FAILURE_RETRY_DELAY_MULTLIPLIER", 0) + allow(PgHaMigrations::BlockingDatabaseTransactions).to receive(:find_blocking_transactions).and_return([]) + allow(ActiveRecord::Base.connection).to receive(:execute).and_call_original + + expect(ActiveRecord::Base.connection).to receive(:execute) + .with("LOCK \"public\".\"bogus_table\", \"public\".\"bogus_table_2\" IN ACCESS EXCLUSIVE MODE;") + .at_least(2) + .times + + begin + query_thread = Thread.new do + alternate_connection.execute("BEGIN; LOCK bogus_table_2;") + sleep 3 + alternate_connection.execute("ROLLBACK") + end + + sleep 0.5 + + migration.suppress_messages do + migration.safely_acquire_lock_for_table(table_name, "bogus_table_2") do + aggregate_failures do + expect(locks_for_table(table_name, connection: alternate_connection_2)).not_to be_empty + expect(locks_for_table("bogus_table_2", connection: alternate_connection_2)).not_to be_empty + end + end + end + ensure + query_thread.join + end + end + it "does not wait to acquire a lock if the table has an existing but non-conflicting lock" do stub_const("PgHaMigrations::LOCK_TIMEOUT_SECONDS", 1) @@ -4574,6 +4652,121 @@ def up expect(locks_for_table(table_name, connection: alternate_connection)).to be_empty end + it "allows re-entrancy when multiple tables provided" do + migration.safely_acquire_lock_for_table(table_name, "bogus_table_2") do + # The ordering of the args is intentional here to ensure + # the array sorting and equality logic works as intended + migration.safely_acquire_lock_for_table("bogus_table_2", table_name) do + expect(locks_for_table(table_name, connection: alternate_connection)).to contain_exactly( + having_attributes( + table: "bogus_table", + lock_type: "AccessExclusiveLock", + granted: true, + pid: kind_of(Integer), + ), + ) + + expect(locks_for_table("bogus_table_2", connection: alternate_connection)).to contain_exactly( + having_attributes( + table: "bogus_table_2", + lock_type: "AccessExclusiveLock", + granted: true, + pid: kind_of(Integer), + ), + ) + end + + expect(locks_for_table(table_name, connection: alternate_connection)).to contain_exactly( + having_attributes( + table: "bogus_table", + lock_type: "AccessExclusiveLock", + granted: true, + pid: kind_of(Integer), + ), + ) + + expect(locks_for_table("bogus_table_2", connection: alternate_connection)).to contain_exactly( + having_attributes( + table: "bogus_table_2", + lock_type: "AccessExclusiveLock", + granted: true, + pid: kind_of(Integer), + ), + ) + end + + expect(locks_for_table(table_name, connection: alternate_connection)).to be_empty + expect(locks_for_table("bogus_table_2", connection: alternate_connection)).to be_empty + end + + it "allows re-entrancy when multiple tables provided and nested lock targets a subset of tables" do + migration.safely_acquire_lock_for_table(table_name, "bogus_table_2") do + migration.safely_acquire_lock_for_table(table_name) do + expect(locks_for_table(table_name, connection: alternate_connection)).to contain_exactly( + having_attributes( + table: "bogus_table", + lock_type: "AccessExclusiveLock", + granted: true, + pid: kind_of(Integer), + ), + ) + + expect(locks_for_table("bogus_table_2", connection: alternate_connection)).to contain_exactly( + having_attributes( + table: "bogus_table_2", + lock_type: "AccessExclusiveLock", + granted: true, + pid: kind_of(Integer), + ), + ) + end + + expect(locks_for_table(table_name, connection: alternate_connection)).to contain_exactly( + having_attributes( + table: "bogus_table", + lock_type: "AccessExclusiveLock", + granted: true, + pid: kind_of(Integer), + ), + ) + + expect(locks_for_table("bogus_table_2", connection: alternate_connection)).to contain_exactly( + having_attributes( + table: "bogus_table_2", + lock_type: "AccessExclusiveLock", + granted: true, + pid: kind_of(Integer), + ), + ) + end + + expect(locks_for_table(table_name, connection: alternate_connection)).to be_empty + expect(locks_for_table("bogus_table_2", connection: alternate_connection)).to be_empty + end + + it "does not allow re-entrancy when multiple tables provided and nested lock targets a superset of tables" do + expect do + migration.safely_acquire_lock_for_table(table_name) do + expect(locks_for_table(table_name, connection: alternate_connection)).to contain_exactly( + having_attributes( + table: "bogus_table", + lock_type: "AccessExclusiveLock", + granted: true, + pid: kind_of(Integer), + ), + ) + + migration.safely_acquire_lock_for_table(table_name, "bogus_table_2") {} + end + end.to raise_error( + PgHaMigrations::InvalidMigrationError, + "Nested lock detected! Cannot acquire lock on \"public\".\"bogus_table\", \"public\".\"bogus_table_2\" while \"public\".\"bogus_table\" is locked." + ) + + expect(locks_for_table(table_name, connection: alternate_connection)).to be_empty + expect(locks_for_table("bogus_table_2", connection: alternate_connection)).to be_empty + end + it "allows re-entrancy when inner lock is a lower level" do migration.safely_acquire_lock_for_table(table_name) do migration.safely_acquire_lock_for_table(table_name, mode: :exclusive) do @@ -4676,26 +4869,6 @@ def up expect(locks_for_table(table_name, connection: alternate_connection)).to be_empty end - - it "uses statement_timeout instead of lock_timeout when on Postgres 9.1" do - allow(ActiveRecord::Base.connection).to receive(:postgresql_version).and_wrap_original do |m, *args| - if caller.detect { |line| line =~ /lib\/pg_ha_migrations\/blocking_database_transactions\.rb/ } - # The long-running transactions check needs to know the actual - # Postgres version to use the proper columns, so we don't want - # to mock any calls from it. - m.call(*args) - else - 9_01_12 - end - end - - expect do - migration.safely_acquire_lock_for_table(table_name) do - expect(locks_for_table(table_name, connection: alternate_connection)).not_to be_empty - end - expect(locks_for_table(table_name, connection: alternate_connection)).to be_empty - end.not_to make_database_queries(matching: /lock_timeout/i) - end end end From 9441f7c10c89628550ee7b25ee6e49a180d49c05 Mon Sep 17 00:00:00 2001 From: Ryan Krage Date: Thu, 6 Jun 2024 20:15:32 +0000 Subject: [PATCH 2/3] Fix potential deadlock in nested locking --- lib/pg_ha_migrations/safe_statements.rb | 6 ++++ spec/safe_statements_spec.rb | 39 +++++++++++++++++++++++++ 2 files changed, 45 insertions(+) diff --git a/lib/pg_ha_migrations/safe_statements.rb b/lib/pg_ha_migrations/safe_statements.rb index aac8792..e9f18bb 100644 --- a/lib/pg_ha_migrations/safe_statements.rb +++ b/lib/pg_ha_migrations/safe_statements.rb @@ -564,6 +564,12 @@ def safely_acquire_lock_for_table(*tables, mode: :access_exclusive, &block) until successfully_acquired_lock loop do + # If in a nested context and all of the above checks have passed, + # we have already acquired the lock so this check is unnecessary. + # In fact, it could actually cause a deadlock if a blocking query + # was executed shortly after the initial lock acquisition. + break if nested_target_tables + blocking_transactions = PgHaMigrations::BlockingDatabaseTransactions.find_blocking_transactions("#{PgHaMigrations::LOCK_TIMEOUT_SECONDS} seconds") # Locking a partitioned table will also lock child tables (including sub-partitions), diff --git a/spec/safe_statements_spec.rb b/spec/safe_statements_spec.rb index 12e4398..d93a694 100644 --- a/spec/safe_statements_spec.rb +++ b/spec/safe_statements_spec.rb @@ -4817,6 +4817,45 @@ def up expect(locks_for_table(table_name, connection: alternate_connection)).to be_empty end + it "skips blocking query check for nested lock acquisition" do + stub_const("PgHaMigrations::LOCK_TIMEOUT_SECONDS", 1) + + query_thread = nil + + expect(PgHaMigrations::BlockingDatabaseTransactions).to receive(:find_blocking_transactions) + .once + .and_call_original + + begin + migration.safely_acquire_lock_for_table(table_name) do + query_thread = Thread.new { alternate_connection.execute("SELECT * FROM bogus_table") } + + sleep 2 + + migration.safely_acquire_lock_for_table(table_name) do + expect(locks_for_table(table_name, connection: alternate_connection_2)).to contain_exactly( + having_attributes( + table: "bogus_table", + lock_type: "AccessExclusiveLock", + granted: true, + pid: kind_of(Integer), + ), + having_attributes( + table: "bogus_table", + lock_type: "AccessShareLock", + granted: false, + pid: kind_of(Integer), + ), + ) + end + end + ensure + query_thread.join if query_thread + end + + expect(locks_for_table(table_name, connection: alternate_connection)).to be_empty + end + it "does not allow re-entrancy when lock escalation detected" do expect do migration.safely_acquire_lock_for_table(table_name, mode: :share) do From 0660b33be963a1b7e31cac8848b2c162a1fe725f Mon Sep 17 00:00:00 2001 From: Ryan Krage Date: Thu, 6 Jun 2024 20:17:19 +0000 Subject: [PATCH 3/3] Test to ensure lock is released after failed query --- spec/safe_statements_spec.rb | 28 +++++++++++++++++++++++++++- 1 file changed, 27 insertions(+), 1 deletion(-) diff --git a/spec/safe_statements_spec.rb b/spec/safe_statements_spec.rb index d93a694..ecfecc8 100644 --- a/spec/safe_statements_spec.rb +++ b/spec/safe_statements_spec.rb @@ -4167,7 +4167,7 @@ def up ) end - it "releases the lock (even after an exception)" do + it "releases the lock even after an exception" do begin migration.safely_acquire_lock_for_table(table_name) do raise "bogus error" @@ -4178,6 +4178,32 @@ def up expect(locks_for_table(table_name, connection: alternate_connection)).to be_empty end + it "releases the lock even after a swallowed postgres exception" do + migration.safely_acquire_lock_for_table(table_name) do + expect(locks_for_table(table_name, connection: alternate_connection)).to contain_exactly( + having_attributes( + table: "bogus_table", + lock_type: "AccessExclusiveLock", + granted: true, + pid: kind_of(Integer), + ), + ) + + begin + migration.connection.execute("SELECT * FROM garbage") + rescue + end + + expect(locks_for_table(table_name, connection: alternate_connection)).to be_empty + + expect do + migration.connection.execute("SELECT * FROM bogus_table") + end.to raise_error(ActiveRecord::StatementInvalid, /PG::InFailedSqlTransaction/) + end + + expect(locks_for_table(table_name, connection: alternate_connection)).to be_empty + end + it "waits to acquire a lock if the table is already blocked" do block_call_count = 0 expect(PgHaMigrations::BlockingDatabaseTransactions).to receive(:find_blocking_transactions).exactly(3).times do |*args|