Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for different lock modes #88

Merged
merged 17 commits into from
Nov 20, 2023
10 changes: 9 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -424,14 +424,22 @@ safe_partman_reapply_privileges :table

#### safely\_acquire\_lock\_for\_table

Safely acquire a lock for a table.
Safely acquire an access exclusive lock for a table.

```ruby
safely_acquire_lock_for_table(:table) do
...
end
```

Safely acquire a lock for a table in a different mode.

```ruby
safely_acquire_lock_for_table(:table, mode: :share) do
...
end
```

Note:

We enforce that only one table (or a table and its partitions) can be locked at a time.
Expand Down
1 change: 1 addition & 0 deletions lib/pg_ha_migrations.rb
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ def self.configure
require "pg_ha_migrations/blocking_database_transactions"
require "pg_ha_migrations/blocking_database_transactions_reporter"
require "pg_ha_migrations/partman_config"
require "pg_ha_migrations/lock_mode"
require "pg_ha_migrations/unsafe_statements"
require "pg_ha_migrations/safe_statements"
require "pg_ha_migrations/dependent_objects_checks"
Expand Down
4 changes: 2 additions & 2 deletions lib/pg_ha_migrations/blocking_database_transactions.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ class BlockingDatabaseTransactions
def initialize(*args)
super

self.tables_with_locks = tables_with_locks.map { |name, schema| Table.new(name, schema) }.select(&:present?)
self.tables_with_locks = tables_with_locks.map { |args| Table.new(*args) }.select(&:present?)
end

def description
Expand Down Expand Up @@ -48,7 +48,7 @@ def self.find_blocking_transactions(minimum_transaction_age = "0 seconds")
psa.#{query_column} as current_query,
psa.state,
clock_timestamp() - psa.xact_start AS transaction_age,
array_agg(distinct array[c.relname, ns.nspname]) AS tables_with_locks
array_agg(distinct array[c.relname, ns.nspname, l.mode]) AS tables_with_locks
FROM pg_stat_activity psa -- Cluster wide
LEFT JOIN pg_locks l ON (psa.#{pid_column} = l.pid) -- Cluster wide
LEFT JOIN pg_class c ON ( -- Database wide
Expand Down
100 changes: 100 additions & 0 deletions lib/pg_ha_migrations/lock_mode.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
module PgHaMigrations
class LockMode
include Comparable

MODE_CONFLICTS = ActiveSupport::OrderedHash.new

MODE_CONFLICTS[:access_share] = %i[
access_exclusive
]

MODE_CONFLICTS[:row_share] = %i[
exclusive
access_exclusive
]

MODE_CONFLICTS[:row_exclusive] = %i[
share
share_row_exclusive
exclusive
access_exclusive
]

MODE_CONFLICTS[:share_update_exclusive] = %i[
share_update_exclusive
share
share_row_exclusive
exclusive
access_exclusive
]

MODE_CONFLICTS[:share] = %i[
row_exclusive
share_update_exclusive
share_row_exclusive
exclusive
access_exclusive
]

MODE_CONFLICTS[:share_row_exclusive] = %i[
row_exclusive
share_update_exclusive
share
share_row_exclusive
exclusive
access_exclusive
]

MODE_CONFLICTS[:exclusive] = %i[
row_share
row_exclusive
share_update_exclusive
share
share_row_exclusive
exclusive
access_exclusive
]

MODE_CONFLICTS[:access_exclusive] = %i[
access_share
row_share
row_exclusive
share_update_exclusive
share
share_row_exclusive
exclusive
access_exclusive
]
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


attr_reader :mode

delegate :to_s, to: :mode

def initialize(mode)
@mode = mode
.to_s
.underscore
.delete_suffix("_lock")
.to_sym

if !MODE_CONFLICTS.keys.include?(@mode)
raise ArgumentError, "Unrecognized lock mode #{@mode.inspect}. Valid modes: #{MODE_CONFLICTS.keys}"
end
end

def to_sql
mode
.to_s
.upcase
.gsub("_", " ")
end

def <=>(other)
MODE_CONFLICTS.keys.index(mode) <=> MODE_CONFLICTS.keys.index(other.mode)
end

def conflicts_with?(other)
MODE_CONFLICTS[mode].include?(other.mode)
end
end
end
28 changes: 23 additions & 5 deletions lib/pg_ha_migrations/relation.rb
Original file line number Diff line number Diff line change
@@ -1,12 +1,24 @@
module PgHaMigrations
Relation = Struct.new(:name, :schema) do
Relation = Struct.new(:name, :schema, :mode) do
def self.connection
ActiveRecord::Base.connection
end

delegate :inspect, to: :name
delegate :connection, to: :class

def initialize(name, schema, mode=nil)
super(name, schema)

self.mode = LockMode.new(mode) if mode.present?
end

def conflicts_with?(other)
self == other && (
mode.nil? || other.mode.nil? || mode.conflicts_with?(other.mode)
)
end

def fully_qualified_name
@fully_qualified_name ||= [
PG::Connection.quote_ident(schema),
Expand All @@ -17,10 +29,14 @@ def fully_qualified_name
def present?
name.present? && schema.present?
end

def ==(other)
other.is_a?(Relation) && name == other.name && schema == other.schema
end
end

class Table < Relation
def self.from_table_name(table)
def self.from_table_name(table, mode=nil)
pg_name = ActiveRecord::ConnectionAdapters::PostgreSQL::Utils.extract_schema_qualified_name(table.to_s)

schema_conditional = if pg_name.schema
Expand All @@ -39,7 +55,7 @@ def self.from_table_name(table)

raise UndefinedTableError, "Table #{pg_name.quoted} does not exist#{" in search path" unless pg_name.schema}" unless schema.present?

new(pg_name.identifier, schema)
new(pg_name.identifier, schema, mode)
end

def natively_partitioned?
Expand All @@ -53,9 +69,9 @@ def natively_partitioned?
SQL
end

def partitions(include_sub_partitions: false)
def partitions(include_sub_partitions: false, include_self: false)
tables = connection.structs_from_sql(self.class, <<~SQL)
SELECT child.relname AS name, child_ns.nspname AS schema
SELECT child.relname AS name, child_ns.nspname AS schema, NULLIF('#{mode}', '') AS mode
FROM pg_inherits
JOIN pg_class parent ON pg_inherits.inhparent = parent.oid
JOIN pg_class child ON pg_inherits.inhrelid = child.oid
Expand All @@ -73,6 +89,8 @@ def partitions(include_sub_partitions: false)
tables.concat(sub_partitions)
end

tables.prepend(self) if include_self

tables
end
end
Expand Down
33 changes: 18 additions & 15 deletions lib/pg_ha_migrations/safe_statements.rb
Original file line number Diff line number Diff line change
Expand Up @@ -519,34 +519,37 @@ def exec_migration(conn, direction)
super(conn, direction)
end

def safely_acquire_lock_for_table(table, &block)
nested_lock = Thread.current[__method__]
def safely_acquire_lock_for_table(table, mode: :access_exclusive, &block)
rkrage marked this conversation as resolved.
Show resolved Hide resolved
nested_target_table = Thread.current[__method__]

_check_postgres_adapter!

table = PgHaMigrations::Table.from_table_name(table)
target_table = PgHaMigrations::Table.from_table_name(table, mode)

# Disallow nested locks unless targeting the same table
if nested_lock && nested_lock != table
raise PgHaMigrations::InvalidMigrationError, "Nested lock detected! Cannot acquire lock on #{table.fully_qualified_name} while #{nested_lock.fully_qualified_name} is locked."
if nested_target_table
if nested_target_table != target_table
raise PgHaMigrations::InvalidMigrationError, "Nested lock detected! Cannot acquire lock on #{target_table.fully_qualified_name} while #{nested_target_table.fully_qualified_name} is locked."
elsif nested_target_table.mode < target_table.mode
raise PgHaMigrations::InvalidMigrationError, "Lock escalation detected! Cannot change lock level from :#{nested_target_table.mode} to :#{target_table.mode} for #{target_table.fully_qualified_name}."
end
else
Thread.current[__method__] = table
Thread.current[__method__] = target_table
end

target_tables = [table]

# Locking a partitioned table will also lock child tables (including sub-partitions),
# so we need to check for blocking queries on those tables as well
target_tables.concat(table.partitions(include_sub_partitions: true))
target_tables = target_table.partitions(include_sub_partitions: true, include_self: true)

successfully_acquired_lock = false

until successfully_acquired_lock
while (
blocking_transactions = PgHaMigrations::BlockingDatabaseTransactions.find_blocking_transactions("#{PgHaMigrations::LOCK_TIMEOUT_SECONDS} seconds")
blocking_transactions.any? do |query|
query.tables_with_locks.any? do |table|
target_tables.include?(table)
query.tables_with_locks.any? do |locked_table|
target_tables.any? do |target_table|
target_table.conflicts_with?(locked_table)
end
end
end
)
Expand All @@ -561,13 +564,13 @@ def safely_acquire_lock_for_table(table, &block)
adjust_timeout_method = connection.postgresql_version >= 9_03_00 ? :adjust_lock_timeout : :adjust_statement_timeout
begin
method(adjust_timeout_method).call(PgHaMigrations::LOCK_TIMEOUT_SECONDS) do
connection.execute("LOCK #{table.fully_qualified_name};")
connection.execute("LOCK #{target_table.fully_qualified_name} IN #{target_table.mode.to_sql} MODE;")
end
successfully_acquired_lock = true
rescue ActiveRecord::StatementInvalid => e
if e.message =~ /PG::LockNotAvailable.+ lock timeout/ || e.message =~ /PG::QueryCanceled.+ statement timeout/
sleep_seconds = PgHaMigrations::LOCK_FAILURE_RETRY_DELAY_MULTLIPLIER * PgHaMigrations::LOCK_TIMEOUT_SECONDS
say "Timed out trying to acquire an exclusive lock on the #{table.fully_qualified_name} table."
say "Timed out trying to acquire #{target_table.mode.to_sql} lock on the #{target_table.fully_qualified_name} table."
say "Sleeping for #{sleep_seconds}s to allow potentially queued up queries to finish before continuing."
sleep(sleep_seconds)

Expand All @@ -583,7 +586,7 @@ def safely_acquire_lock_for_table(table, &block)
end
end
ensure
Thread.current[__method__] = nil unless nested_lock
Thread.current[__method__] = nil unless nested_target_table
end

def adjust_lock_timeout(timeout_seconds = PgHaMigrations::LOCK_TIMEOUT_SECONDS, &block)
Expand Down
Loading