Skip to content

Commit

Permalink
only block on conflicting lock modes
Browse files Browse the repository at this point in the history
  • Loading branch information
rkrage committed Sep 19, 2023
1 parent e639e54 commit cd6caf9
Show file tree
Hide file tree
Showing 6 changed files with 329 additions and 121 deletions.
4 changes: 2 additions & 2 deletions lib/pg_ha_migrations/blocking_database_transactions.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ class BlockingDatabaseTransactions
def initialize(*args)
super

self.tables_with_locks = tables_with_locks.map { |name, schema| Table.new(name, schema) }.select(&:present?)
self.tables_with_locks = tables_with_locks.map { |args| TableWithLock.new(*args) }.select(&:present?)
end

def description
Expand Down Expand Up @@ -48,7 +48,7 @@ def self.find_blocking_transactions(minimum_transaction_age = "0 seconds")
psa.#{query_column} as current_query,
psa.state,
clock_timestamp() - psa.xact_start AS transaction_age,
array_agg(distinct array[c.relname, ns.nspname]) AS tables_with_locks
array_agg(distinct array[c.relname, ns.nspname, l.mode]) AS tables_with_locks
FROM pg_stat_activity psa -- Cluster wide
LEFT JOIN pg_locks l ON (psa.#{pid_column} = l.pid) -- Cluster wide
LEFT JOIN pg_class c ON ( -- Database wide
Expand Down
31 changes: 30 additions & 1 deletion lib/pg_ha_migrations/lock_mode.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,11 @@ class LockMode
delegate :inspect, :to_s, to: :mode

def initialize(mode)
@mode = mode.to_sym
@mode = mode
.to_s
.underscore
.delete_suffix("_lock")
.to_sym

raise ArgumentError, "Unrecognized lock mode #{@mode.inspect}. Valid modes: #{MODES}" unless MODES.include?(@mode)
end
Expand All @@ -33,5 +37,30 @@ def to_sql
def <=>(other)
MODES.index(mode) <=> MODES.index(other.mode)
end

def conflicts_with?(other)
conflicting_modes.include?(other.mode)
end

def conflicting_modes
case mode
when :access_share
%i[access_exclusive]
when :row_share
%i[exclusive access_exclusive]
when :row_exclusive
%i[share share_row_exclusive exclusive access_exclusive]
when :share_update_exclusive
%i[share_update_exclusive share share_row_exclusive exclusive access_exclusive]
when :share
%i[row_exclusive share_update_exclusive share_row_exclusive exclusive access_exclusive]
when :share_row_exclusive
%i[row_exclusive share_update_exclusive share share_row_exclusive exclusive access_exclusive]
when :exclusive
%i[row_share row_exclusive share_update_exclusive share share_row_exclusive exclusive access_exclusive]
when :access_exclusive
MODES
end
end
end
end
40 changes: 40 additions & 0 deletions lib/pg_ha_migrations/relation.rb
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,46 @@ def partitions(include_sub_partitions: false)
end
end

class TableWithLock < Table
include Comparable

def self.from_table_name(table, mode)
super(table).tap do |table_with_lock|
table_with_lock.mode = mode
end
end

attr_reader :mode

def initialize(name, schema, mode=nil)
super(name, schema)

self.mode = mode
end

def mode=(mode)
return unless mode.present?

if mode.is_a?(LockMode)
@mode = mode
else
@mode = LockMode.new(mode)
end
end

def conflicts_with?(other)
mode.nil? || other.mode.nil? || mode.conflicts_with?(other.mode)
end

def partitions(include_sub_partitions: false)
super.each { |table_with_lock| table_with_lock.mode = mode }
end

def <=>(other)
fully_qualified_name <=> other.fully_qualified_name
end
end

class Index < Relation
# TODO: implement shortening to ensure < 63 bytes
def self.from_table_and_columns(table, columns)
Expand Down
32 changes: 14 additions & 18 deletions lib/pg_ha_migrations/safe_statements.rb
Original file line number Diff line number Diff line change
Expand Up @@ -520,31 +520,27 @@ def exec_migration(conn, direction)
end

def safely_acquire_lock_for_table(table, mode: :access_exclusive, &block)
nested_lock = Thread.current[__method__]
nested_table_with_lock = Thread.current[__method__]

_check_postgres_adapter!

table = PgHaMigrations::Table.from_table_name(table)
lock_mode = PgHaMigrations::LockMode.new(mode)
table_with_lock = PgHaMigrations::TableWithLock.from_table_name(table, mode)

if nested_lock
if nested_lock[:table] != table
raise PgHaMigrations::InvalidMigrationError, "Nested lock detected! Cannot acquire lock on #{table.fully_qualified_name} while #{nested_lock[:table].fully_qualified_name} is locked."
elsif nested_lock[:mode] < lock_mode
raise PgHaMigrations::InvalidMigrationError, "Lock escalation detected! Cannot change lock level from #{nested_lock[:mode].inspect} to #{lock_mode.inspect} for #{table.fully_qualified_name}."
if nested_table_with_lock
if nested_table_with_lock != table_with_lock
raise PgHaMigrations::InvalidMigrationError, "Nested lock detected! Cannot acquire lock on #{table_with_lock.fully_qualified_name} while #{nested_table_with_lock.fully_qualified_name} is locked."
elsif nested_table_with_lock.mode < table_with_lock.mode
raise PgHaMigrations::InvalidMigrationError, "Lock escalation detected! Cannot change lock level from #{nested_table_with_lock.mode.inspect} to #{table_with_lock.mode.inspect} for #{table_with_lock.fully_qualified_name}."
end
else
Thread.current[__method__] = {
table: table,
mode: lock_mode,
}
Thread.current[__method__] = table_with_lock
end

target_tables = [table]
target_tables = [table_with_lock]

# Locking a partitioned table will also lock child tables (including sub-partitions),
# so we need to check for blocking queries on those tables as well
target_tables.concat(table.partitions(include_sub_partitions: true))
target_tables.concat(table_with_lock.partitions(include_sub_partitions: true))

successfully_acquired_lock = false

Expand All @@ -553,7 +549,7 @@ def safely_acquire_lock_for_table(table, mode: :access_exclusive, &block)
blocking_transactions = PgHaMigrations::BlockingDatabaseTransactions.find_blocking_transactions("#{PgHaMigrations::LOCK_TIMEOUT_SECONDS} seconds")
blocking_transactions.any? do |query|
query.tables_with_locks.any? do |table|
target_tables.include?(table)
target_tables.include?(table) && table_with_lock.conflicts_with?(table)
end
end
)
Expand All @@ -568,13 +564,13 @@ def safely_acquire_lock_for_table(table, mode: :access_exclusive, &block)
adjust_timeout_method = connection.postgresql_version >= 9_03_00 ? :adjust_lock_timeout : :adjust_statement_timeout
begin
method(adjust_timeout_method).call(PgHaMigrations::LOCK_TIMEOUT_SECONDS) do
connection.execute("LOCK #{table.fully_qualified_name} IN #{lock_mode.to_sql} MODE;")
connection.execute("LOCK #{table_with_lock.fully_qualified_name} IN #{table_with_lock.mode.to_sql} MODE;")
end
successfully_acquired_lock = true
rescue ActiveRecord::StatementInvalid => e
if e.message =~ /PG::LockNotAvailable.+ lock timeout/ || e.message =~ /PG::QueryCanceled.+ statement timeout/
sleep_seconds = PgHaMigrations::LOCK_FAILURE_RETRY_DELAY_MULTLIPLIER * PgHaMigrations::LOCK_TIMEOUT_SECONDS
say "Timed out trying to acquire #{lock_mode.to_sql} lock on the #{table.fully_qualified_name} table."
say "Timed out trying to acquire #{table_with_lock.mode.to_sql} lock on the #{table_with_lock.fully_qualified_name} table."
say "Sleeping for #{sleep_seconds}s to allow potentially queued up queries to finish before continuing."
sleep(sleep_seconds)

Expand All @@ -590,7 +586,7 @@ def safely_acquire_lock_for_table(table, mode: :access_exclusive, &block)
end
end
ensure
Thread.current[__method__] = nil unless nested_lock
Thread.current[__method__] = nil unless nested_table_with_lock
end

def adjust_lock_timeout(timeout_seconds = PgHaMigrations::LOCK_TIMEOUT_SECONDS, &block)
Expand Down
Loading

0 comments on commit cd6caf9

Please sign in to comment.