-
Notifications
You must be signed in to change notification settings - Fork 24
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Safely acquire lock on multiple tables #100
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -14,7 +14,7 @@ def initialize(name, schema, mode=nil) | |
end | ||
|
||
def conflicts_with?(other) | ||
self == other && ( | ||
self.eql?(other) && ( | ||
mode.nil? || other.mode.nil? || mode.conflicts_with?(other.mode) | ||
) | ||
end | ||
|
@@ -30,8 +30,12 @@ def present? | |
name.present? && schema.present? | ||
end | ||
|
||
def ==(other) | ||
other.is_a?(Relation) && name == other.name && schema == other.schema | ||
def eql?(other) | ||
other.is_a?(Relation) && hash == other.hash | ||
end | ||
|
||
def hash | ||
[name, schema].hash | ||
end | ||
end | ||
|
||
|
@@ -152,4 +156,26 @@ def valid? | |
SQL | ||
end | ||
end | ||
|
||
class TableCollection < Set | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Inheriting from Ruby collection classes is usually considered pretty dangerous now (e.g., lots of discussions about some old ActiveSupport classes) because it's easy to end up with an inconsistent API or internally inconsistent data. For example, I could construct an instance of this class and then add another table to the set and break the assumptions of the I think it would probably preferable to just maintain and internal ivar of the set instance and present a very limited external API. |
||
def self.from_table_names(tables, mode=nil) | ||
new(tables) { |table| Table.from_table_name(table, mode) } | ||
end | ||
|
||
def mode | ||
first&.mode | ||
end | ||
|
||
def to_sql | ||
map(&:fully_qualified_name).join(", ") | ||
end | ||
|
||
def with_partitions | ||
tables = flat_map do |table| | ||
table.partitions(include_sub_partitions: true, include_self: true) | ||
end | ||
|
||
self.class.new(tables) | ||
end | ||
end | ||
end |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,6 +3,14 @@ def safe_added_columns_without_default_value | |
@safe_added_columns_without_default_value ||= [] | ||
end | ||
|
||
# This variable is used to track nested lock acquisition. | ||
# Each element is a PgHaMigrations::TableCollection object. | ||
# The order of the array represents the current call stack, | ||
# where the most recent method call is the last element. | ||
def safely_acquire_lock_for_table_history | ||
@safely_acquire_lock_for_table_history ||= [] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we explicitly want to allow nested lock acquisition? If I'm reading the source directly, we didn't explicitly support nested lock acquisition before per se, though we allowed it (as long as it wasn't for the same table). The nested checks were introduced as a result of #39 to ensure that we didn't upgrade locks, but that was focused on preventing one kind of bug rather than explicitly allowing nested locks. AFAICT it was only incidentally allowed before that. I can't remember if we need that support for e.g. partitions. But without good justification I'm feeling a bit squeamish about supporting it: it's very easy to get into a bad situation here for multiple reasons:
I'm wondering if we should focus instead on safer APIs (like the multiple table locking one here) that target specific use cases directly. Note: I'm not sure that the concern about deadlocks (2) is actually prevented by this approach either, because I assume Postgres acquires locks on the relations in order of their presentation here...so maybe that's unavoidable. |
||
end | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This seems way cleaner than using a thread variable. Not sure why I did that in the first place... There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Probably the argument is that you don't have to worry about how the method is invoked. But maybe that means we should actually query Postgres for what locks we already currently hold instead of trying to track them ourselves? |
||
|
||
def safe_create_table(table, options={}, &block) | ||
if options[:force] | ||
raise PgHaMigrations::UnsafeMigrationError.new(":force is NOT SAFE! Explicitly call unsafe_drop_table first if you want to recreate an existing table") | ||
|
@@ -528,40 +536,54 @@ def exec_migration(conn, direction) | |
super(conn, direction) | ||
end | ||
|
||
def safely_acquire_lock_for_table(table, mode: :access_exclusive, &block) | ||
nested_target_table = Thread.current[__method__] | ||
def safely_acquire_lock_for_table(*tables, mode: :access_exclusive, &block) | ||
# So this variable is always available in the ensure block | ||
successfully_acquired_lock = false | ||
|
||
_check_postgres_adapter! | ||
|
||
target_table = PgHaMigrations::Table.from_table_name(table, mode) | ||
target_tables = PgHaMigrations::TableCollection.from_table_names(tables, mode) | ||
|
||
if nested_target_table | ||
if nested_target_table != target_table | ||
raise PgHaMigrations::InvalidMigrationError, "Nested lock detected! Cannot acquire lock on #{target_table.fully_qualified_name} while #{nested_target_table.fully_qualified_name} is locked." | ||
elsif nested_target_table.mode < target_table.mode | ||
raise PgHaMigrations::InvalidMigrationError, "Lock escalation detected! Cannot change lock level from :#{nested_target_table.mode} to :#{target_table.mode} for #{target_table.fully_qualified_name}." | ||
end | ||
else | ||
Thread.current[__method__] = target_table | ||
end | ||
# Grab the latest locked tables from the call stack. | ||
# This will be nil if we are not in a nested context. | ||
nested_target_tables = safely_acquire_lock_for_table_history.last | ||
|
||
# Locking a partitioned table will also lock child tables (including sub-partitions), | ||
# so we need to check for blocking queries on those tables as well | ||
target_tables = target_table.partitions(include_sub_partitions: true, include_self: true) | ||
if nested_target_tables | ||
if !target_tables.subset?(nested_target_tables) | ||
raise PgHaMigrations::InvalidMigrationError, | ||
"Nested lock detected! Cannot acquire lock on #{target_tables.to_sql} " \ | ||
"while #{nested_target_tables.to_sql} is locked." | ||
end | ||
|
||
successfully_acquired_lock = false | ||
if nested_target_tables.mode < target_tables.mode | ||
raise PgHaMigrations::InvalidMigrationError, | ||
"Lock escalation detected! Cannot change lock level from :#{nested_target_tables.mode} " \ | ||
"to :#{target_tables.mode} for #{target_tables.to_sql}." | ||
end | ||
end | ||
|
||
until successfully_acquired_lock | ||
while ( | ||
loop do | ||
# If in a nested context and all of the above checks have passed, | ||
# we have already acquired the lock so this check is unnecessary. | ||
# In fact, it could actually cause a deadlock if a blocking query | ||
# was executed shortly after the initial lock acquisition. | ||
break if nested_target_tables | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is the bug I was referring to in the PR description There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Does it make sense to run the loop at all in this case? I.e., why break out of the loop when we could make the whole loop conditional? |
||
|
||
blocking_transactions = PgHaMigrations::BlockingDatabaseTransactions.find_blocking_transactions("#{PgHaMigrations::LOCK_TIMEOUT_SECONDS} seconds") | ||
blocking_transactions.any? do |query| | ||
|
||
# Locking a partitioned table will also lock child tables (including sub-partitions), | ||
# so we need to check for blocking queries on those tables as well | ||
target_tables_with_partitions = target_tables.with_partitions | ||
|
||
break unless blocking_transactions.any? do |query| | ||
query.tables_with_locks.any? do |locked_table| | ||
target_tables.any? do |target_table| | ||
target_tables_with_partitions.any? do |target_table| | ||
target_table.conflicts_with?(locked_table) | ||
end | ||
end | ||
end | ||
) | ||
|
||
say "Waiting on blocking transactions:" | ||
blocking_transactions.each do |blocking_transaction| | ||
say blocking_transaction.description | ||
|
@@ -570,16 +592,15 @@ def safely_acquire_lock_for_table(table, mode: :access_exclusive, &block) | |
end | ||
|
||
connection.transaction do | ||
adjust_timeout_method = connection.postgresql_version >= 9_03_00 ? :adjust_lock_timeout : :adjust_statement_timeout | ||
begin | ||
method(adjust_timeout_method).call(PgHaMigrations::LOCK_TIMEOUT_SECONDS) do | ||
connection.execute("LOCK #{target_table.fully_qualified_name} IN #{target_table.mode.to_sql} MODE;") | ||
adjust_statement_timeout(PgHaMigrations::LOCK_TIMEOUT_SECONDS) do | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Lock timeout applies to each individual relation in the query, while statement timeout applies to the entire query. So, if we were to use lock timeout here, the upper limit for the query timeout would be There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should add a comment to the code explaining why we don't use the (seemingly) obvious |
||
connection.execute("LOCK #{target_tables.to_sql} IN #{target_tables.mode.to_sql} MODE;") | ||
end | ||
successfully_acquired_lock = true | ||
rescue ActiveRecord::StatementInvalid => e | ||
if e.message =~ /PG::LockNotAvailable.+ lock timeout/ || e.message =~ /PG::QueryCanceled.+ statement timeout/ | ||
if e.message =~ /PG::QueryCanceled.+ statement timeout/ | ||
sleep_seconds = PgHaMigrations::LOCK_FAILURE_RETRY_DELAY_MULTLIPLIER * PgHaMigrations::LOCK_TIMEOUT_SECONDS | ||
say "Timed out trying to acquire #{target_table.mode.to_sql} lock on the #{target_table.fully_qualified_name} table." | ||
say "Timed out trying to acquire #{target_tables.mode.to_sql} lock on #{target_tables.to_sql}." | ||
say "Sleeping for #{sleep_seconds}s to allow potentially queued up queries to finish before continuing." | ||
sleep(sleep_seconds) | ||
|
||
|
@@ -590,12 +611,13 @@ def safely_acquire_lock_for_table(table, mode: :access_exclusive, &block) | |
end | ||
|
||
if successfully_acquired_lock | ||
safely_acquire_lock_for_table_history.push(target_tables) | ||
block.call | ||
end | ||
end | ||
end | ||
ensure | ||
Thread.current[__method__] = nil unless nested_target_table | ||
safely_acquire_lock_for_table_history.pop if successfully_acquired_lock | ||
end | ||
|
||
# since rails versions < 7.1 has bug which does not handle symbol for | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I know this isn't new per se, but I'm wondering again why we don't include
mode
in equality/hashing (I understand why we include it in a special way inconflicts_with?
).