From 6fffef8aa2fd23529a339279205b700f9771c18f Mon Sep 17 00:00:00 2001 From: Ryan Krage Date: Fri, 17 Nov 2023 17:42:07 +0000 Subject: [PATCH 1/4] Method to safely add index on empty table --- README.md | 8 ++ lib/pg_ha_migrations/safe_statements.rb | 24 ++++++ spec/safe_statements_spec.rb | 104 ++++++++++++++++++++++++ 3 files changed, 136 insertions(+) diff --git a/README.md b/README.md index 7a6a5d0..c8ecf3c 100644 --- a/README.md +++ b/README.md @@ -171,6 +171,14 @@ Unsafely make a column not nullable. unsafe_make_column_not_nullable :table, :column ``` +#### safe\_add\_index\_on\_empty\_table + +Safely add an index on a table with zero rows. This will raise an error if the table contains data. + +```ruby +safe_add_index_on_empty_table :table, :column +``` + #### safe\_add\_concurrent\_index Add an index concurrently. diff --git a/lib/pg_ha_migrations/safe_statements.rb b/lib/pg_ha_migrations/safe_statements.rb index 422bb90..24de20f 100644 --- a/lib/pg_ha_migrations/safe_statements.rb +++ b/lib/pg_ha_migrations/safe_statements.rb @@ -154,6 +154,22 @@ def unsafe_make_column_not_nullable(table, column, options={}) # options arg is end end + def safe_add_index_on_empty_table(table, columns, options={}) + if options[:algorithm] == :concurrently + raise ArgumentError, "Cannot call safe_add_index_on_empty_table with :algorithm => :concurrently" + end + + # Avoids taking out an unnecessary SHARE lock if the table does have data + _ensure_empty_table!(table) + + safely_acquire_lock_for_table(table, mode: :share) do + # Ensure data wasn't written in the split second after the first check + _ensure_empty_table!(table) + + unsafe_add_index(table, columns, **options) + end + end + def safe_add_concurrent_index(table, columns, options={}) unsafe_add_index(table, columns, **options.merge(:algorithm => :concurrently)) end @@ -509,6 +525,14 @@ def _type_is_enum(type) ActiveRecord::Base.connection.select_values("SELECT typname FROM pg_type JOIN pg_enum ON pg_type.oid = pg_enum.enumtypid").include?(type.to_s) end + def _ensure_empty_table!(table) + table = PgHaMigrations::Table.from_table_name(table) + + if connection.select_value("SELECT EXISTS (SELECT 1 FROM #{table.fully_qualified_name} LIMIT 1)") + raise PgHaMigrations::InvalidMigrationError, "Table #{table.inspect} has rows. Please use safe_add_concurrent_index instead." + end + end + def migrate(direction) if respond_to?(:change) raise PgHaMigrations::UnsupportedMigrationError, "Tracking changes for automated rollback is not supported; use explicit #up instead." diff --git a/spec/safe_statements_spec.rb b/spec/safe_statements_spec.rb index 3b99938..415cfb5 100644 --- a/spec/safe_statements_spec.rb +++ b/spec/safe_statements_spec.rb @@ -1256,6 +1256,110 @@ def up end end + describe "safe_add_index_on_empty_table" do + it "creates index when table is empty" do + setup_migration = Class.new(migration_klass) do + def up + unsafe_create_table :foos + unsafe_add_column :foos, :bar, :text + end + end + + setup_migration.suppress_messages { setup_migration.migrate(:up) } + + test_migration = Class.new(migration_klass) do + def up + safe_add_index_on_empty_table :foos, [:bar] + end + end + + expect do + test_migration.suppress_messages { test_migration.migrate(:up) } + end.to make_database_queries(matching: /LOCK "public"\."foos" IN SHARE MODE/, count: 1) + .and(make_database_queries(matching: /CREATE INDEX "index_foos_on_bar"/, count: 1)) + + indexes = ActiveRecord::Base.connection.indexes("foos") + expect(indexes.size).to eq(1) + expect(indexes.first).to have_attributes( + table: "foos", + name: "index_foos_on_bar", + columns: ["bar"], + ) + end + + it "raises error when :algorithm => :concurrently provided" do + test_migration = Class.new(migration_klass) do + def up + safe_add_index_on_empty_table :foos, [:bar], algorithm: :concurrently + end + end + + expect do + test_migration.suppress_messages { test_migration.migrate(:up) } + end.to raise_error(ArgumentError, "Cannot call safe_add_index_on_empty_table with :algorithm => :concurrently") + end + + it "raises error when table contains rows" do + setup_migration = Class.new(migration_klass) do + def up + unsafe_create_table :foos + unsafe_add_column :foos, :bar, :text + + unsafe_execute("INSERT INTO foos DEFAULT VALUES") + end + end + + setup_migration.suppress_messages { setup_migration.migrate(:up) } + + test_migration = Class.new(migration_klass) do + def up + safe_add_index_on_empty_table :foos, [:bar] + end + end + + allow(ActiveRecord::Base.connection).to receive(:select_value).and_call_original + expect(ActiveRecord::Base.connection).to receive(:select_value).with(/SELECT EXISTS/).once.and_call_original + + expect do + test_migration.suppress_messages { test_migration.migrate(:up) } + end.to raise_error(PgHaMigrations::InvalidMigrationError, "Table \"foos\" has rows. Please use safe_add_concurrent_index instead.") + + indexes = ActiveRecord::Base.connection.indexes("foos") + expect(indexes).to be_empty + end + + it "raises error when table receives writes immediately after the first check" do + setup_migration = Class.new(migration_klass) do + def up + unsafe_create_table :foos + unsafe_add_column :foos, :bar, :text + end + end + + setup_migration.suppress_messages { setup_migration.migrate(:up) } + + test_migration = Class.new(migration_klass) do + def up + safe_add_index_on_empty_table :foos, [:bar] + end + end + + allow(ActiveRecord::Base.connection).to receive(:select_value).and_call_original + expect(ActiveRecord::Base.connection).to receive(:select_value).with(/SELECT EXISTS/).twice.and_wrap_original do |m, *args| + m.call(*args).tap do + ActiveRecord::Base.connection.execute("INSERT INTO foos DEFAULT VALUES") + end + end + + expect do + test_migration.suppress_messages { test_migration.migrate(:up) } + end.to raise_error(PgHaMigrations::InvalidMigrationError, "Table \"foos\" has rows. Please use safe_add_concurrent_index instead.") + + indexes = ActiveRecord::Base.connection.indexes("foos") + expect(indexes).to be_empty + end + end + describe "safe_add_concurrent_index" do it "creates an index using the concurrent algorithm" do setup_migration = Class.new(migration_klass) do From 058c4d94367fe3679deff0fc8ceacaaa0f79fccd Mon Sep 17 00:00:00 2001 From: Ryan Krage Date: Thu, 11 Jan 2024 20:49:08 +0000 Subject: [PATCH 2/4] add check for table size on disk --- README.md | 16 +++ lib/pg_ha_migrations.rb | 11 ++ lib/pg_ha_migrations/relation.rb | 13 ++ lib/pg_ha_migrations/safe_statements.rb | 40 +++--- spec/safe_statements_spec.rb | 163 +++++++++++++++++++++++- 5 files changed, 218 insertions(+), 25 deletions(-) diff --git a/README.md b/README.md index c8ecf3c..7d24c71 100644 --- a/README.md +++ b/README.md @@ -484,6 +484,22 @@ Set maintenance work mem. safe_set_maintenance_work_mem_gb 1 ``` +#### ensure\_small\_table! + +Ensure a table on disk is below the default threshold (10 megabytes). +This will raise an error if the table is too large. + +```ruby +ensure_small_table! :table +``` + +Ensure a table on disk is below a custom threshold and is empty. +This will raise an error if the table is too large and/or contains data. + +```ruby +ensure_small_table! :table, empty: true, threshold: 100.megabytes +``` + ### Configuration The gem can be configured in an initializer. diff --git a/lib/pg_ha_migrations.rb b/lib/pg_ha_migrations.rb index 2a2debc..3a9b989 100644 --- a/lib/pg_ha_migrations.rb +++ b/lib/pg_ha_migrations.rb @@ -31,6 +31,17 @@ def self.configure LOCK_TIMEOUT_SECONDS = 5 LOCK_FAILURE_RETRY_DELAY_MULTLIPLIER = 5 + SMALL_TABLE_THRESHOLD_BYTES = 10.megabytes + + PARTITION_TYPES = %i[range list hash] + + PARTMAN_UPDATE_CONFIG_OPTIONS = %i[ + infinite_time_partitions + inherit_privileges + premake + retention + retention_keep_table + ] # Safe versus unsafe in this context specifically means the following: # - Safe operations will not block for long periods of time. diff --git a/lib/pg_ha_migrations/relation.rb b/lib/pg_ha_migrations/relation.rb index c190046..4f68772 100644 --- a/lib/pg_ha_migrations/relation.rb +++ b/lib/pg_ha_migrations/relation.rb @@ -96,6 +96,19 @@ def partitions(include_sub_partitions: false, include_self: false) tables end + + def has_rows? + connection.select_value("SELECT EXISTS (SELECT 1 FROM #{fully_qualified_name} LIMIT 1)") + end + + def total_bytes + connection.select_value(<<~SQL) + SELECT pg_total_relation_size(pg_class.oid) + FROM pg_class, pg_namespace + WHERE pg_class.relname = #{connection.quote(name)} + AND pg_namespace.nspname = #{connection.quote(schema)} + SQL + end end class Index < Relation diff --git a/lib/pg_ha_migrations/safe_statements.rb b/lib/pg_ha_migrations/safe_statements.rb index 24de20f..0a6ef04 100644 --- a/lib/pg_ha_migrations/safe_statements.rb +++ b/lib/pg_ha_migrations/safe_statements.rb @@ -1,14 +1,4 @@ module PgHaMigrations::SafeStatements - PARTITION_TYPES = %i[range list hash] - - PARTMAN_UPDATE_CONFIG_OPTIONS = %i[ - infinite_time_partitions - inherit_privileges - premake - retention - retention_keep_table - ] - def safe_added_columns_without_default_value @safe_added_columns_without_default_value ||= [] end @@ -160,11 +150,11 @@ def safe_add_index_on_empty_table(table, columns, options={}) end # Avoids taking out an unnecessary SHARE lock if the table does have data - _ensure_empty_table!(table) + ensure_small_table!(table, empty: true) safely_acquire_lock_for_table(table, mode: :share) do # Ensure data wasn't written in the split second after the first check - _ensure_empty_table!(table) + ensure_small_table!(table, empty: true) unsafe_add_index(table, columns, **options) end @@ -323,8 +313,8 @@ def unsafe_remove_constraint(table, name:) def safe_create_partitioned_table(table, partition_key:, type:, infer_primary_key: nil, **options, &block) raise ArgumentError, "Expected to be present" unless partition_key.present? - unless PARTITION_TYPES.include?(type) - raise ArgumentError, "Expected to be symbol in #{PARTITION_TYPES} but received #{type.inspect}" + unless PgHaMigrations::PARTITION_TYPES.include?(type) + raise ArgumentError, "Expected to be symbol in #{PgHaMigrations::PARTITION_TYPES} but received #{type.inspect}" end if ActiveRecord::Base.connection.postgresql_version < 10_00_00 @@ -463,7 +453,7 @@ def safe_partman_update_config(table, **options) end def unsafe_partman_update_config(table, **options) - invalid_options = options.keys - PARTMAN_UPDATE_CONFIG_OPTIONS + invalid_options = options.keys - PgHaMigrations::PARTMAN_UPDATE_CONFIG_OPTIONS raise ArgumentError, "Unrecognized argument(s): #{invalid_options}" unless invalid_options.empty? @@ -525,14 +515,6 @@ def _type_is_enum(type) ActiveRecord::Base.connection.select_values("SELECT typname FROM pg_type JOIN pg_enum ON pg_type.oid = pg_enum.enumtypid").include?(type.to_s) end - def _ensure_empty_table!(table) - table = PgHaMigrations::Table.from_table_name(table) - - if connection.select_value("SELECT EXISTS (SELECT 1 FROM #{table.fully_qualified_name} LIMIT 1)") - raise PgHaMigrations::InvalidMigrationError, "Table #{table.inspect} has rows. Please use safe_add_concurrent_index instead." - end - end - def migrate(direction) if respond_to?(:change) raise PgHaMigrations::UnsupportedMigrationError, "Tracking changes for automated rollback is not supported; use explicit #up instead." @@ -655,4 +637,16 @@ def adjust_statement_timeout(timeout_seconds, &block) end end end + + def ensure_small_table!(table, empty: false, threshold: PgHaMigrations::SMALL_TABLE_THRESHOLD_BYTES) + table = PgHaMigrations::Table.from_table_name(table) + + if empty && table.has_rows? + raise PgHaMigrations::InvalidMigrationError, "Table #{table.inspect} has rows" + end + + if table.total_bytes > threshold + raise PgHaMigrations::InvalidMigrationError, "Table #{table.inspect} is larger than #{threshold} bytes" + end + end end diff --git a/spec/safe_statements_spec.rb b/spec/safe_statements_spec.rb index 415cfb5..1b0b6fc 100644 --- a/spec/safe_statements_spec.rb +++ b/spec/safe_statements_spec.rb @@ -1319,10 +1319,40 @@ def up allow(ActiveRecord::Base.connection).to receive(:select_value).and_call_original expect(ActiveRecord::Base.connection).to receive(:select_value).with(/SELECT EXISTS/).once.and_call_original + expect(ActiveRecord::Base.connection).to_not receive(:select_value).with(/pg_total_relation_size/) expect do test_migration.suppress_messages { test_migration.migrate(:up) } - end.to raise_error(PgHaMigrations::InvalidMigrationError, "Table \"foos\" has rows. Please use safe_add_concurrent_index instead.") + end.to raise_error(PgHaMigrations::InvalidMigrationError, "Table \"foos\" has rows") + + indexes = ActiveRecord::Base.connection.indexes("foos") + expect(indexes).to be_empty + end + + it "raises error when table is larger than small table threshold" do + setup_migration = Class.new(migration_klass) do + def up + unsafe_create_table :foos + end + end + + setup_migration.suppress_messages { setup_migration.migrate(:up) } + + test_migration = Class.new(migration_klass) do + def up + safe_add_index_on_empty_table :foos, [:bar] + end + end + + stub_const("PgHaMigrations::SMALL_TABLE_THRESHOLD_BYTES", 1.kilobyte) + + allow(ActiveRecord::Base.connection).to receive(:select_value).and_call_original + expect(ActiveRecord::Base.connection).to receive(:select_value).with(/SELECT EXISTS/).once.and_call_original + expect(ActiveRecord::Base.connection).to receive(:select_value).with(/pg_total_relation_size/).once.and_call_original + + expect do + test_migration.suppress_messages { test_migration.migrate(:up) } + end.to raise_error(PgHaMigrations::InvalidMigrationError, "Table \"foos\" is larger than 1024 bytes") indexes = ActiveRecord::Base.connection.indexes("foos") expect(indexes).to be_empty @@ -1345,6 +1375,7 @@ def up end allow(ActiveRecord::Base.connection).to receive(:select_value).and_call_original + expect(ActiveRecord::Base.connection).to receive(:select_value).with(/pg_total_relation_size/).once.and_call_original expect(ActiveRecord::Base.connection).to receive(:select_value).with(/SELECT EXISTS/).twice.and_wrap_original do |m, *args| m.call(*args).tap do ActiveRecord::Base.connection.execute("INSERT INTO foos DEFAULT VALUES") @@ -1353,7 +1384,7 @@ def up expect do test_migration.suppress_messages { test_migration.migrate(:up) } - end.to raise_error(PgHaMigrations::InvalidMigrationError, "Table \"foos\" has rows. Please use safe_add_concurrent_index instead.") + end.to raise_error(PgHaMigrations::InvalidMigrationError, "Table \"foos\" has rows") indexes = ActiveRecord::Base.connection.indexes("foos") expect(indexes).to be_empty @@ -4415,6 +4446,134 @@ def up end end + describe "ensure_small_table!" do + it "does not raise error when empty: false and table is below threshold and has rows" do + setup_migration = Class.new(migration_klass) do + def up + safe_create_table :foos + + unsafe_execute "INSERT INTO foos DEFAULT VALUES" + end + end + + setup_migration.suppress_messages { setup_migration.migrate(:up) } + + test_migration = Class.new(migration_klass) do + def up + ensure_small_table! :foos + end + end + + allow(ActiveRecord::Base.connection).to receive(:select_value).and_call_original + expect(ActiveRecord::Base.connection).to_not receive(:select_value).with(/SELECT EXISTS/) + expect(ActiveRecord::Base.connection).to receive(:select_value).with(/pg_total_relation_size/).once.and_call_original + + expect do + test_migration.suppress_messages { test_migration.migrate(:up) } + end.to_not raise_error + end + + it "does not raise error when empty: true and table is below threshold and is empty" do + setup_migration = Class.new(migration_klass) do + def up + safe_create_table :foos + end + end + + setup_migration.suppress_messages { setup_migration.migrate(:up) } + + test_migration = Class.new(migration_klass) do + def up + ensure_small_table! :foos, empty: true + end + end + + allow(ActiveRecord::Base.connection).to receive(:select_value).and_call_original + expect(ActiveRecord::Base.connection).to receive(:select_value).with(/SELECT EXISTS/).once.and_call_original + expect(ActiveRecord::Base.connection).to receive(:select_value).with(/pg_total_relation_size/).once.and_call_original + + expect do + test_migration.suppress_messages { test_migration.migrate(:up) } + end.to_not raise_error + end + + it "raises error when empty: true and table has rows" do + setup_migration = Class.new(migration_klass) do + def up + safe_create_table :foos + + unsafe_execute "INSERT INTO foos DEFAULT VALUES" + end + end + + setup_migration.suppress_messages { setup_migration.migrate(:up) } + + test_migration = Class.new(migration_klass) do + def up + ensure_small_table! :foos, empty: true + end + end + + allow(ActiveRecord::Base.connection).to receive(:select_value).and_call_original + expect(ActiveRecord::Base.connection).to receive(:select_value).with(/SELECT EXISTS/).once.and_call_original + expect(ActiveRecord::Base.connection).to_not receive(:select_value).with(/pg_total_relation_size/) + + expect do + test_migration.suppress_messages { test_migration.migrate(:up) } + end.to raise_error(PgHaMigrations::InvalidMigrationError, "Table \"foos\" has rows") + end + + it "raises error when empty: true and table is above threshold and is empty" do + setup_migration = Class.new(migration_klass) do + def up + safe_create_table :foos + end + end + + setup_migration.suppress_messages { setup_migration.migrate(:up) } + + test_migration = Class.new(migration_klass) do + def up + ensure_small_table! :foos, empty: true, threshold: 1.kilobyte + end + end + + allow(ActiveRecord::Base.connection).to receive(:select_value).and_call_original + expect(ActiveRecord::Base.connection).to receive(:select_value).with(/SELECT EXISTS/).once.and_call_original + expect(ActiveRecord::Base.connection).to receive(:select_value).with(/pg_total_relation_size/).once.and_call_original + + expect do + test_migration.suppress_messages { test_migration.migrate(:up) } + end.to raise_error(PgHaMigrations::InvalidMigrationError, "Table \"foos\" is larger than 1024 bytes") + end + + it "raises error when empty: false and table is above threshold and has rows" do + setup_migration = Class.new(migration_klass) do + def up + safe_create_table :foos + + unsafe_execute "INSERT INTO foos DEFAULT VALUES" + end + end + + setup_migration.suppress_messages { setup_migration.migrate(:up) } + + test_migration = Class.new(migration_klass) do + def up + ensure_small_table! :foos, threshold: 1.kilobyte + end + end + + allow(ActiveRecord::Base.connection).to receive(:select_value).and_call_original + expect(ActiveRecord::Base.connection).to_not receive(:select_value).with(/SELECT EXISTS/) + expect(ActiveRecord::Base.connection).to receive(:select_value).with(/pg_total_relation_size/).once.and_call_original + + expect do + test_migration.suppress_messages { test_migration.migrate(:up) } + end.to raise_error(PgHaMigrations::InvalidMigrationError, "Table \"foos\" is larger than 1024 bytes") + end + end + describe "unsafe transformations" do it "renames create_table to unsafe_create_table" do migration = Class.new(migration_klass) do From 88fbe0e5d25fa994108402f72bb6e8442e64f39b Mon Sep 17 00:00:00 2001 From: Ryan Krage Date: Thu, 11 Jan 2024 20:54:09 +0000 Subject: [PATCH 3/4] use actual integer --- lib/pg_ha_migrations.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/pg_ha_migrations.rb b/lib/pg_ha_migrations.rb index 3a9b989..b0fdb7f 100644 --- a/lib/pg_ha_migrations.rb +++ b/lib/pg_ha_migrations.rb @@ -31,7 +31,7 @@ def self.configure LOCK_TIMEOUT_SECONDS = 5 LOCK_FAILURE_RETRY_DELAY_MULTLIPLIER = 5 - SMALL_TABLE_THRESHOLD_BYTES = 10.megabytes + SMALL_TABLE_THRESHOLD_BYTES = 1_048_576 # 10 megabytes PARTITION_TYPES = %i[range list hash] From 34016a5903029533c059012176b11feee5a2eacf Mon Sep 17 00:00:00 2001 From: Ryan Krage Date: Thu, 11 Jan 2024 21:20:04 +0000 Subject: [PATCH 4/4] explicitly require activesupport extension --- lib/pg_ha_migrations.rb | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/lib/pg_ha_migrations.rb b/lib/pg_ha_migrations.rb index b0fdb7f..3e5b48c 100644 --- a/lib/pg_ha_migrations.rb +++ b/lib/pg_ha_migrations.rb @@ -3,6 +3,7 @@ require "active_record" require "active_record/migration" require "active_record/connection_adapters/postgresql/utils" +require "active_support/core_ext/numeric/bytes" require "relation_to_struct" require "ruby2_keywords" @@ -31,7 +32,7 @@ def self.configure LOCK_TIMEOUT_SECONDS = 5 LOCK_FAILURE_RETRY_DELAY_MULTLIPLIER = 5 - SMALL_TABLE_THRESHOLD_BYTES = 1_048_576 # 10 megabytes + SMALL_TABLE_THRESHOLD_BYTES = 10.megabytes PARTITION_TYPES = %i[range list hash]