Skip to content

Commit

Permalink
code review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
rkrage committed Sep 25, 2023
1 parent d7a996c commit b9cd300
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 57 deletions.
2 changes: 1 addition & 1 deletion lib/pg_ha_migrations/blocking_database_transactions.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ class BlockingDatabaseTransactions
def initialize(*args)
super

self.tables_with_locks = tables_with_locks.map { |args| TableWithLock.new(*args) }.select(&:present?)
self.tables_with_locks = tables_with_locks.map { |args| Table.new(*args) }.select(&:present?)
end

def description
Expand Down
3 changes: 2 additions & 1 deletion lib/pg_ha_migrations/lock_mode.rb
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@ def initialize(mode)
end

def to_sql
to_s
mode
.to_s
.upcase
.gsub("_", " ")
end
Expand Down
72 changes: 33 additions & 39 deletions lib/pg_ha_migrations/relation.rb
Original file line number Diff line number Diff line change
@@ -1,12 +1,32 @@
module PgHaMigrations
Relation = Struct.new(:name, :schema) do
Relation = Struct.new(:name, :schema, :mode) do
def self.connection
ActiveRecord::Base.connection
end

delegate :inspect, to: :name
delegate :connection, to: :class

def initialize(name, schema, mode=nil)
super(name, schema)

self.mode = mode
end

def mode=(mode)
if mode.present?
self[:mode] = LockMode.new(mode)
else
self[:mode] = mode
end
end

def conflicts_with?(other)
self == other && (
mode.nil? || other.mode.nil? || mode.conflicts_with?(other.mode)
)
end

def fully_qualified_name
@fully_qualified_name ||= [
PG::Connection.quote_ident(schema),
Expand All @@ -17,6 +37,10 @@ def fully_qualified_name
def present?
name.present? && schema.present?
end

def ==(other)
other.is_a?(Relation) && name == other.name && schema == other.schema
end
end

class Table < Relation
Expand All @@ -42,6 +66,10 @@ def self.from_table_name(table)
new(pg_name.identifier, schema)
end

def self.from_table_name_with_lock(table, mode)
from_table_name(table).tap { |table| table.mode = mode }
end

def natively_partitioned?
!!connection.select_value(<<~SQL)
SELECT true
Expand All @@ -53,9 +81,9 @@ def natively_partitioned?
SQL
end

def partitions(include_sub_partitions: false)
def partitions(include_sub_partitions: false, include_self: false)
tables = connection.structs_from_sql(self.class, <<~SQL)
SELECT child.relname AS name, child_ns.nspname AS schema
SELECT child.relname AS name, child_ns.nspname AS schema, NULLIF('#{mode}', '') AS mode
FROM pg_inherits
JOIN pg_class parent ON pg_inherits.inhparent = parent.oid
JOIN pg_class child ON pg_inherits.inhrelid = child.oid
Expand All @@ -73,43 +101,9 @@ def partitions(include_sub_partitions: false)
tables.concat(sub_partitions)
end

tables
end
end

class TableWithLock < Table
def self.from_table_name(table, mode)
super(table).tap do |table_with_lock|
table_with_lock.mode = mode
end
end

attr_reader :mode

def initialize(name, schema, mode=nil)
super(name, schema)

self.mode = mode
end
tables.prepend(self) if include_self

def mode=(mode)
if mode.present?
@mode = LockMode.new(mode)
else
@mode = mode
end
end

def conflicts_with?(other)
mode.nil? || other.mode.nil? || mode.conflicts_with?(other.mode)
end

def partitions(include_sub_partitions: false)
super.each { |table_with_lock| table_with_lock.mode = mode }
end

def ==(other)
other.is_a?(Table) && fully_qualified_name == other.fully_qualified_name
tables
end
end

Expand Down
32 changes: 16 additions & 16 deletions lib/pg_ha_migrations/safe_statements.rb
Original file line number Diff line number Diff line change
Expand Up @@ -520,36 +520,36 @@ def exec_migration(conn, direction)
end

def safely_acquire_lock_for_table(table, mode: :access_exclusive, &block)
nested_table_with_lock = Thread.current[__method__]
nested_targeted_table = Thread.current[__method__]

_check_postgres_adapter!

table_with_lock = PgHaMigrations::TableWithLock.from_table_name(table, mode)
targeted_table = PgHaMigrations::Table.from_table_name_with_lock(table, mode)

if nested_table_with_lock
if nested_table_with_lock != table_with_lock
raise PgHaMigrations::InvalidMigrationError, "Nested lock detected! Cannot acquire lock on #{table_with_lock.fully_qualified_name} while #{nested_table_with_lock.fully_qualified_name} is locked."
elsif nested_table_with_lock.mode < table_with_lock.mode
raise PgHaMigrations::InvalidMigrationError, "Lock escalation detected! Cannot change lock level from #{nested_table_with_lock.mode.inspect} to #{table_with_lock.mode.inspect} for #{table_with_lock.fully_qualified_name}."
if nested_targeted_table
if nested_targeted_table != targeted_table
raise PgHaMigrations::InvalidMigrationError, "Nested lock detected! Cannot acquire lock on #{targeted_table.fully_qualified_name} while #{nested_targeted_table.fully_qualified_name} is locked."
elsif nested_targeted_table.mode < targeted_table.mode
raise PgHaMigrations::InvalidMigrationError, "Lock escalation detected! Cannot change lock level from #{nested_targeted_table.mode.inspect} to #{targeted_table.mode.inspect} for #{targeted_table.fully_qualified_name}."
end
else
Thread.current[__method__] = table_with_lock
Thread.current[__method__] = targeted_table
end

# Locking a partitioned table will also lock child tables (including sub-partitions),
# so we need to check for blocking queries on those tables as well
target_tables = table_with_lock
.partitions(include_sub_partitions: true)
.prepend(table_with_lock)
targeted_tables = targeted_table.partitions(include_sub_partitions: true, include_self: true)

successfully_acquired_lock = false

until successfully_acquired_lock
while (
blocking_transactions = PgHaMigrations::BlockingDatabaseTransactions.find_blocking_transactions("#{PgHaMigrations::LOCK_TIMEOUT_SECONDS} seconds")
blocking_transactions.any? do |query|
query.tables_with_locks.any? do |table|
target_tables.include?(table) && table_with_lock.conflicts_with?(table)
query.tables_with_locks.any? do |locked_table|
targeted_tables.any? do |target_table|
target_table.conflicts_with?(locked_table)
end
end
end
)
Expand All @@ -564,13 +564,13 @@ def safely_acquire_lock_for_table(table, mode: :access_exclusive, &block)
adjust_timeout_method = connection.postgresql_version >= 9_03_00 ? :adjust_lock_timeout : :adjust_statement_timeout
begin
method(adjust_timeout_method).call(PgHaMigrations::LOCK_TIMEOUT_SECONDS) do
connection.execute("LOCK #{table_with_lock.fully_qualified_name} IN #{table_with_lock.mode.to_sql} MODE;")
connection.execute("LOCK #{targeted_table.fully_qualified_name} IN #{targeted_table.mode.to_sql} MODE;")
end
successfully_acquired_lock = true
rescue ActiveRecord::StatementInvalid => e
if e.message =~ /PG::LockNotAvailable.+ lock timeout/ || e.message =~ /PG::QueryCanceled.+ statement timeout/
sleep_seconds = PgHaMigrations::LOCK_FAILURE_RETRY_DELAY_MULTLIPLIER * PgHaMigrations::LOCK_TIMEOUT_SECONDS
say "Timed out trying to acquire #{table_with_lock.mode.to_sql} lock on the #{table_with_lock.fully_qualified_name} table."
say "Timed out trying to acquire #{targeted_table.mode.to_sql} lock on the #{targeted_table.fully_qualified_name} table."
say "Sleeping for #{sleep_seconds}s to allow potentially queued up queries to finish before continuing."
sleep(sleep_seconds)

Expand All @@ -586,7 +586,7 @@ def safely_acquire_lock_for_table(table, mode: :access_exclusive, &block)
end
end
ensure
Thread.current[__method__] = nil unless nested_table_with_lock
Thread.current[__method__] = nil unless nested_targeted_table
end

def adjust_lock_timeout(timeout_seconds = PgHaMigrations::LOCK_TIMEOUT_SECONDS, &block)
Expand Down

0 comments on commit b9cd300

Please sign in to comment.