Skip to content

Commit

Permalink
use explicit lock modes
Browse files Browse the repository at this point in the history
  • Loading branch information
rkrage committed Nov 17, 2023
1 parent ccfe1d2 commit 53ab242
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 16 deletions.
17 changes: 8 additions & 9 deletions lib/pg_ha_migrations/safe_statements.rb
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ def safe_add_column(table, column, type, options = {})
end

def unsafe_add_column(table, column, type, options = {})
safely_acquire_lock_for_table(table) do
safely_acquire_lock_for_table(table, mode: :access_exclusive) do
super(table, column, type, **options)
end
end
Expand Down Expand Up @@ -137,19 +137,19 @@ def safe_change_column_default(table_name, column_name, default_value)
ERROR
end

safely_acquire_lock_for_table(table_name) do
safely_acquire_lock_for_table(table_name, mode: :access_exclusive) do
unsafe_change_column_default(table_name, column_name, default_value)
end
end

def safe_make_column_nullable(table, column)
safely_acquire_lock_for_table(table) do
safely_acquire_lock_for_table(table, mode: :access_exclusive) do
unsafe_execute "ALTER TABLE #{table} ALTER COLUMN #{column} DROP NOT NULL"
end
end

def unsafe_make_column_not_nullable(table, column, options={}) # options arg is only present for backwards compatiblity
safely_acquire_lock_for_table(table) do
safely_acquire_lock_for_table(table, mode: :access_exclusive) do
unsafe_execute "ALTER TABLE #{table} ALTER COLUMN #{column} SET NOT NULL"
end
end
Expand Down Expand Up @@ -204,8 +204,7 @@ def safe_add_concurrent_partitioned_index(
PgHaMigrations::Index.from_table_and_columns(child_table, columns)
end

# TODO: take out ShareLock after issue #39 is implemented
safely_acquire_lock_for_table(parent_table.fully_qualified_name) do
safely_acquire_lock_for_table(parent_table.fully_qualified_name, mode: :share) do
# CREATE INDEX ON ONLY parent_table
unsafe_add_index(
parent_table.fully_qualified_name,
Expand Down Expand Up @@ -235,7 +234,7 @@ def safe_add_concurrent_partitioned_index(

# Avoid taking out an unnecessary lock if there are no child tables to attach
if child_indexes.present?
safely_acquire_lock_for_table(parent_table.fully_qualified_name) do
safely_acquire_lock_for_table(parent_table.fully_qualified_name, mode: :access_exclusive) do
child_indexes.each do |child_index|
say_with_time "Attaching index #{child_index.inspect} to #{parent_index.inspect}" do
connection.execute(<<~SQL)
Expand Down Expand Up @@ -279,7 +278,7 @@ def safe_rename_constraint(table, from:, to:)
quoted_constraint_to_name = connection.quote_table_name(to)
sql = "ALTER TABLE #{quoted_table_name} RENAME CONSTRAINT #{quoted_constraint_from_name} TO #{quoted_constraint_to_name}"

safely_acquire_lock_for_table(table) do
safely_acquire_lock_for_table(table, mode: :access_exclusive) do
say_with_time "rename_constraint(#{table.inspect}, from: #{from.inspect}, to: #{to.inspect})" do
connection.execute(sql)
end
Expand All @@ -293,7 +292,7 @@ def unsafe_remove_constraint(table, name:)
quoted_constraint_name = connection.quote_table_name(name)
sql = "ALTER TABLE #{quoted_table_name} DROP CONSTRAINT #{quoted_constraint_name}"

safely_acquire_lock_for_table(table) do
safely_acquire_lock_for_table(table, mode: :access_exclusive) do
say_with_time "remove_constraint(#{table.inspect}, name: #{name.inspect})" do
connection.execute(sql)
end
Expand Down
36 changes: 29 additions & 7 deletions spec/safe_statements_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -618,7 +618,7 @@ def up
it "calls safely_acquire_lock_for_table" do
migration = Class.new(migration_klass).new

expect(migration).to receive(:safely_acquire_lock_for_table).with(:foos)
expect(migration).to receive(:safely_acquire_lock_for_table).with(:foos, mode: :access_exclusive)
migration.unsafe_add_column(:foos, :bar, :text)
end
end
Expand Down Expand Up @@ -796,7 +796,7 @@ def up

migration = Class.new(migration_klass).new

expect(migration).to receive(:safely_acquire_lock_for_table).with(:foos)
expect(migration).to receive(:safely_acquire_lock_for_table).with(:foos, mode: :access_exclusive)
migration.safe_change_column_default(:foos, :bar, 5)
end

Expand Down Expand Up @@ -1081,7 +1081,7 @@ def up
it "calls safely_acquire_lock_for_table" do
migration = Class.new(migration_klass).new

expect(migration).to receive(:safely_acquire_lock_for_table).with(:foos)
expect(migration).to receive(:safely_acquire_lock_for_table).with(:foos, mode: :access_exclusive)
migration.safe_make_column_nullable(:foos, :bar)
end
end
Expand Down Expand Up @@ -1251,7 +1251,7 @@ def up
it "calls safely_acquire_lock_for_table" do
migration = Class.new(migration_klass).new

expect(migration).to receive(:safely_acquire_lock_for_table).with(:foos)
expect(migration).to receive(:safely_acquire_lock_for_table).with(:foos, mode: :access_exclusive)
migration.unsafe_make_column_not_nullable(:foos, :bar, :estimated_rows => 0)
end
end
Expand Down Expand Up @@ -1302,8 +1302,10 @@ def up
allow(ActiveRecord::Base.connection).to receive(:execute).and_call_original

aggregate_failures do
expect(ActiveRecord::Base.connection).to receive(:execute).with(/CREATE INDEX "index_foos3_on_updated_at" ON ONLY/).once
expect(ActiveRecord::Base.connection).to receive(:execute).with(/LOCK "public"."foos3" IN SHARE MODE/).once.ordered
expect(ActiveRecord::Base.connection).to receive(:execute).with(/CREATE INDEX "index_foos3_on_updated_at" ON ONLY/).once.ordered
expect(ActiveRecord::Base.connection).to_not receive(:execute).with(/CREATE INDEX CONCURRENTLY/)
expect(ActiveRecord::Base.connection).to_not receive(:execute).with(/LOCK "public"."foos3" IN ACCESS EXCLUSIVE MODE/)
expect(ActiveRecord::Base.connection).to_not receive(:execute).with(/ALTER INDEX .+\nATTACH PARTITION/)
end

Expand Down Expand Up @@ -1332,8 +1334,10 @@ def up
allow(ActiveRecord::Base.connection).to receive(:execute).and_call_original

aggregate_failures do
expect(ActiveRecord::Base.connection).to receive(:execute).with(/LOCK "public"."foos3" IN SHARE MODE/).once.ordered
expect(ActiveRecord::Base.connection).to receive(:execute).with(/CREATE INDEX "foos3_idx" ON ONLY/).once.ordered
expect(ActiveRecord::Base.connection).to receive(:execute).with(/CREATE INDEX CONCURRENTLY/).exactly(10).times.ordered
expect(ActiveRecord::Base.connection).to receive(:execute).with(/LOCK "public"."foos3" IN ACCESS EXCLUSIVE MODE/).once.ordered
expect(ActiveRecord::Base.connection).to receive(:execute).with(/ALTER INDEX .+\nATTACH PARTITION/).exactly(10).times.ordered
end

Expand Down Expand Up @@ -1371,8 +1375,10 @@ def up
allow(ActiveRecord::Base.connection).to receive(:execute).and_call_original

aggregate_failures do
expect(ActiveRecord::Base.connection).to receive(:execute).with(/LOCK "public"."foos3" IN SHARE MODE/).once.ordered
expect(ActiveRecord::Base.connection).to receive(:execute).with(/CREATE INDEX "index_foos3_on_updated_at" ON ONLY/).once.ordered
expect(ActiveRecord::Base.connection).to receive(:execute).with(/CREATE INDEX CONCURRENTLY/).exactly(10).times.ordered
expect(ActiveRecord::Base.connection).to receive(:execute).with(/LOCK "public"."foos3" IN ACCESS EXCLUSIVE MODE/).once.ordered
expect(ActiveRecord::Base.connection).to receive(:execute).with(/ALTER INDEX .+\nATTACH PARTITION/).exactly(10).times.ordered
end

Expand Down Expand Up @@ -1406,8 +1412,10 @@ def up
allow(ActiveRecord::Base.connection).to receive(:execute).and_call_original

aggregate_failures do
expect(ActiveRecord::Base.connection).to receive(:execute).with(/LOCK "public"."foos3" IN SHARE MODE/).once.ordered
expect(ActiveRecord::Base.connection).to receive(:execute).with(/CREATE UNIQUE INDEX "index_foos3_on_created_at_and_updated_at" ON ONLY/).once.ordered
expect(ActiveRecord::Base.connection).to receive(:execute).with(/CREATE UNIQUE INDEX CONCURRENTLY/).exactly(10).times.ordered
expect(ActiveRecord::Base.connection).to receive(:execute).with(/LOCK "public"."foos3" IN ACCESS EXCLUSIVE MODE/).once.ordered
expect(ActiveRecord::Base.connection).to receive(:execute).with(/ALTER INDEX .+\nATTACH PARTITION/).exactly(10).times.ordered
end

Expand Down Expand Up @@ -1442,8 +1450,10 @@ def up
allow(ActiveRecord::Base.connection).to receive(:execute).and_call_original

aggregate_failures do
expect(ActiveRecord::Base.connection).to receive(:execute).with(/LOCK "public"."foos3" IN SHARE MODE/).once.ordered
expect(ActiveRecord::Base.connection).to receive(:execute).with(/CREATE INDEX "index_foos3_on_updated_at" ON ONLY/).once.ordered
expect(ActiveRecord::Base.connection).to receive(:execute).with(/CREATE INDEX CONCURRENTLY/).exactly(10).times.ordered
expect(ActiveRecord::Base.connection).to receive(:execute).with(/LOCK "public"."foos3" IN ACCESS EXCLUSIVE MODE/).once.ordered
expect(ActiveRecord::Base.connection).to receive(:execute).with(/ALTER INDEX .+\nATTACH PARTITION/).exactly(10).times.ordered
end

Expand Down Expand Up @@ -1478,8 +1488,10 @@ def up
allow(ActiveRecord::Base.connection).to receive(:execute).and_call_original

aggregate_failures do
expect(ActiveRecord::Base.connection).to receive(:execute).with(/LOCK "public"."foos3" IN SHARE MODE/).once.ordered
expect(ActiveRecord::Base.connection).to receive(:execute).with(/CREATE INDEX "index_foos3_on_lower_text_column" ON ONLY/).once.ordered
expect(ActiveRecord::Base.connection).to receive(:execute).with(/CREATE INDEX CONCURRENTLY/).exactly(10).times.ordered
expect(ActiveRecord::Base.connection).to receive(:execute).with(/LOCK "public"."foos3" IN ACCESS EXCLUSIVE MODE/).once.ordered
expect(ActiveRecord::Base.connection).to receive(:execute).with(/ALTER INDEX .+\nATTACH PARTITION/).exactly(10).times.ordered
end

Expand Down Expand Up @@ -1521,8 +1533,10 @@ def up
allow(ActiveRecord::Base.connection).to receive(:execute).and_call_original

aggregate_failures do
expect(ActiveRecord::Base.connection).to_not receive(:execute).with(/LOCK "public"."foos3" IN SHARE MODE/)
expect(ActiveRecord::Base.connection).to_not receive(:execute).with(/CREATE INDEX IF NOT EXISTS "index_foos3_on_updated_at" ON ONLY/)
expect(ActiveRecord::Base.connection).to_not receive(:execute).with(/CREATE INDEX CONCURRENTLY IF NOT EXISTS/)
expect(ActiveRecord::Base.connection).to_not receive(:execute).with(/LOCK "public"."foos3" IN ACCESS EXCLUSIVE MODE/)
expect(ActiveRecord::Base.connection).to_not receive(:execute).with(/ALTER INDEX .+\nATTACH PARTITION/)
end

Expand Down Expand Up @@ -1555,8 +1569,10 @@ def up
allow(ActiveRecord::Base.connection).to receive(:execute).and_call_original

aggregate_failures do
expect(ActiveRecord::Base.connection).to receive(:execute).with(/LOCK "public"."foos3" IN SHARE MODE/).once.ordered
expect(ActiveRecord::Base.connection).to receive(:execute).with(/CREATE INDEX IF NOT EXISTS "index_foos3_on_updated_at" ON ONLY/).once.ordered
expect(ActiveRecord::Base.connection).to receive(:execute).with(/CREATE INDEX CONCURRENTLY IF NOT EXISTS/).exactly(10).times.ordered
expect(ActiveRecord::Base.connection).to receive(:execute).with(/LOCK "public"."foos3" IN ACCESS EXCLUSIVE MODE/).once.ordered
expect(ActiveRecord::Base.connection).to receive(:execute).with(/ALTER INDEX .+\nATTACH PARTITION/).exactly(10).times.ordered
end

Expand Down Expand Up @@ -1588,8 +1604,10 @@ def up
allow(ActiveRecord::Base.connection).to receive(:execute).and_call_original

aggregate_failures do
expect(ActiveRecord::Base.connection).to receive(:execute).with(/LOCK "public"."foos3'" IN SHARE MODE/).once.ordered
expect(ActiveRecord::Base.connection).to receive(:execute).with(/CREATE INDEX "foos3'_bar""_idx" ON ONLY/).once.ordered
expect(ActiveRecord::Base.connection).to receive(:execute).with(/CREATE INDEX CONCURRENTLY "index_foos3'_child_on_updated_at" ON/).once.ordered
expect(ActiveRecord::Base.connection).to receive(:execute).with(/LOCK "public"."foos3'" IN ACCESS EXCLUSIVE MODE/).once.ordered
expect(ActiveRecord::Base.connection).to receive(:execute).with(/ALTER INDEX .+\nATTACH PARTITION/).once.ordered
end

Expand Down Expand Up @@ -1634,8 +1652,10 @@ def up
allow(ActiveRecord::Base.connection).to receive(:execute).and_call_original

aggregate_failures do
expect(ActiveRecord::Base.connection).to receive(:execute).with(/LOCK "partman"."foos3" IN SHARE MODE/).once.ordered
expect(ActiveRecord::Base.connection).to receive(:execute).with(/CREATE INDEX "index_foos3_on_updated_at" ON ONLY/).once.ordered
expect(ActiveRecord::Base.connection).to receive(:execute).with(/CREATE INDEX CONCURRENTLY/).exactly(10).times.ordered
expect(ActiveRecord::Base.connection).to receive(:execute).with(/LOCK "partman"."foos3" IN ACCESS EXCLUSIVE MODE/).once.ordered
expect(ActiveRecord::Base.connection).to receive(:execute).with(/ALTER INDEX .+\nATTACH PARTITION/).exactly(10).times.ordered
end

Expand Down Expand Up @@ -1680,8 +1700,10 @@ def up
allow(ActiveRecord::Base.connection).to receive(:execute).and_call_original

aggregate_failures do
expect(ActiveRecord::Base.connection).to receive(:execute).with(/LOCK "public"."foos3" IN SHARE MODE/).once.ordered
expect(ActiveRecord::Base.connection).to receive(:execute).with(/CREATE INDEX "index_foos3_on_updated_at" ON ONLY/).once.ordered
expect(ActiveRecord::Base.connection).to receive(:execute).with(/CREATE INDEX CONCURRENTLY/).once.ordered
expect(ActiveRecord::Base.connection).to receive(:execute).with(/LOCK "public"."foos3" IN ACCESS EXCLUSIVE MODE/).once.ordered
expect(ActiveRecord::Base.connection).to receive(:execute).with(/ALTER INDEX .+\nATTACH PARTITION/).once.ordered
end

Expand Down Expand Up @@ -2194,7 +2216,7 @@ def up
it "calls safely_acquire_lock_for_table" do
migration = Class.new(migration_klass).new

expect(migration).to receive(:safely_acquire_lock_for_table).with(:foos)
expect(migration).to receive(:safely_acquire_lock_for_table).with(:foos, mode: :access_exclusive)
migration.safe_rename_constraint(:foos, :from => :constraint_foo_bar_is_not_null, :to => :other_constraint)
end
end
Expand All @@ -2214,7 +2236,7 @@ def up
it "calls safely_acquire_lock_for_table" do
migration = Class.new(migration_klass).new

expect(migration).to receive(:safely_acquire_lock_for_table).with(:foos)
expect(migration).to receive(:safely_acquire_lock_for_table).with(:foos, mode: :access_exclusive)
migration.unsafe_remove_constraint(:foos, :name => :constraint_foo_bar_is_not_null)
end

Expand Down

0 comments on commit 53ab242

Please sign in to comment.