From b9f01e75f0c655a279fb3e13c215b2fa3dd329b0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B4natas=20Davi=20Paganini?= Date: Mon, 13 Jan 2025 11:21:27 -0300 Subject: [PATCH] Fix scope rollup with where clause --- .../continuous_aggregates_helper.rb | 13 ++++++++- spec/spec_helper.rb | 5 ++-- spec/support/active_record/models.rb | 16 +++++------ .../continuous_aggregates_helper_spec.rb | 28 +++++++++++++++++-- 4 files changed, 47 insertions(+), 15 deletions(-) diff --git a/lib/timescaledb/continuous_aggregates_helper.rb b/lib/timescaledb/continuous_aggregates_helper.rb index 33bda19..f25631b 100644 --- a/lib/timescaledb/continuous_aggregates_helper.rb +++ b/lib/timescaledb/continuous_aggregates_helper.rb @@ -18,6 +18,8 @@ module ContinuousAggregatesHelper /state_agg\((\w+)\)\s+as\s+(\w+)/ => 'rollup(\2) as \2', /percentile_agg\((\w+),\s*(\w+)\)\s+as\s+(\w+)/ => 'rollup(\3) as \3', /heartbeat_agg\((\w+)\)\s+as\s+(\w+)/ => 'rollup(\2) as \2', + /stats_agg\(([^)]+)\)\s+(as\s+(\w+))/ => 'rollup(\3) \2', + /stats_agg\((.*)\)\s+(as\s+(\w+))/ => 'rollup(\3) \2' } scope :rollup, ->(interval) do @@ -147,12 +149,21 @@ class << self if previous_timeframe prev_klass = base_model.const_get("#{aggregate_name}_per_#{previous_timeframe}".classify) select_clause = base_model.apply_rollup_rules("#{config[:select]}") + # Note there's no where clause here, because we're using the previous timeframe's data self.base_query = "SELECT #{tb} as #{time_column}, #{select_clause} FROM \"#{prev_klass.table_name}\" GROUP BY #{[tb, *config[:group_by]].join(', ')}" else scope = base_model.public_send(config[:scope_name]) config[:select] = scope.select_values.select{|e|!e.downcase.start_with?("time_bucket")}.join(', ') config[:group_by] = scope.group_values - self.base_query = "SELECT #{tb} as #{time_column}, #{config[:select]} FROM \"#{scope.table_name}\" GROUP BY #{[tb, *config[:group_by]].join(', ')}" + config[:where] = if scope.where_values_hash.present? + scope.where_values_hash.to_sql + elsif scope.where_clause.ast.present? && scope.where_clause.ast.to_sql.present? + scope.where_clause.ast.to_sql + end + self.base_query = "SELECT #{tb} as #{time_column}, #{config[:select]}" + self.base_query += " FROM \"#{base_model.table_name}\"" + self.base_query += " WHERE #{config[:where]}" if config[:where] + self.base_query += " GROUP BY #{[tb, *config[:group_by]].join(', ')}" end def self.refresh!(start_time = nil, end_time = nil) diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index 92f2193..994e6a8 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -6,8 +6,6 @@ require "dotenv" require "database_cleaner/active_record" require "active_support/testing/time_helpers" -require_relative "support/active_record/models" -require_relative "support/active_record/schema" Dotenv.load! if File.exist?(".env") @@ -18,6 +16,9 @@ ActiveRecord::Base.establish_connection(ENV['PG_URI_TEST']) Timescaledb.establish_connection(ENV['PG_URI_TEST']) +require_relative "support/active_record/models" +require_relative "support/active_record/schema" + def destroy_all_chunks_for!(klass) sql = <<-SQL SELECT drop_chunks('#{klass.table_name}', '#{1.week.from_now}'::date) diff --git a/spec/support/active_record/models.rb b/spec/support/active_record/models.rb index 2f04be2..0ceec23 100644 --- a/spec/support/active_record/models.rb +++ b/spec/support/active_record/models.rb @@ -2,26 +2,19 @@ class Event < ActiveRecord::Base - self.primary_key = "identifier" - acts_as_hypertable end class HypertableWithNoOptions < ActiveRecord::Base - self.primary_key = "identifier" - acts_as_hypertable end class HypertableWithOptions < ActiveRecord::Base - self.primary_key = "identifier" - acts_as_hypertable time_column: :timestamp end class HypertableWithCustomTimeColumn < ActiveRecord::Base self.table_name = "hypertable_with_custom_time_column" - self.primary_key = "identifier" acts_as_hypertable time_column: :timestamp end @@ -35,16 +28,20 @@ class HypertableWithContinuousAggregates < ActiveRecord::Base extend Timescaledb::ActsAsHypertable include Timescaledb::ContinuousAggregatesHelper - acts_as_hypertable time_column: 'ts', segment_by: :identifier + acts_as_hypertable time_column: 'ts', + segment_by: :identifier, + value_column: "cast(payload->>'price' as float)" scope :total, -> { select("count(*) as total") } scope :by_identifier, -> { select("identifier, count(*) as total").group(:identifier) } scope :by_version, -> { select("identifier, version, count(*) as total").group(:identifier, :version) } + scope :purchase, -> { where("identifier = 'purchase'") } + scope :purchase_stats, -> { select("stats_agg(#{value_column}) as stats_agg").purchase } continuous_aggregates( time_column: 'ts', timeframes: [:minute, :hour, :day, :month], - scopes: [:total, :by_identifier, :by_version], + scopes: [:total, :by_identifier, :by_version, :purchase_stats], refresh_policy: { minute: { start_offset: "10 minutes", end_offset: "1 minute", schedule_interval: "1 minute" }, hour: { start_offset: "4 hour", end_offset: "1 hour", schedule_interval: "1 hour" }, @@ -54,6 +51,7 @@ class HypertableWithContinuousAggregates < ActiveRecord::Base ) descendants.each do |cagg| cagg.hypertable_options = hypertable_options.merge(value_column: :total) + cagg.scope :stats, -> { select("average(stats_agg), stddev(stats_agg)") } end end diff --git a/spec/timescaledb/continuous_aggregates_helper_spec.rb b/spec/timescaledb/continuous_aggregates_helper_spec.rb index c82b310..304cf77 100644 --- a/spec/timescaledb/continuous_aggregates_helper_spec.rb +++ b/spec/timescaledb/continuous_aggregates_helper_spec.rb @@ -66,6 +66,7 @@ scope_name: :total, select: "count(*) as total", group_by: [], + where: nil, refresh_policy: { minute: { start_offset: "10 minutes", end_offset: "1 minute", schedule_interval: "1 minute" }, hour: { start_offset: "4 hour", end_offset: "1 hour", schedule_interval: "1 hour" }, @@ -76,6 +77,11 @@ expect(test_class::TotalPerMinute.config).to eq(expected_config) end + it "sets the where clause for each aggregate" do + expect(test_class::PurchaseStatsPerMinute.config[:where]).to eq("(identifier = 'purchase')") + end + + it 'defines rollup scope for aggregates' do test_class.create_continuous_aggregates aggregate_classes = [test_class::TotalPerMinute, test_class::TotalPerHour, test_class::TotalPerDay, test_class::TotalPerMonth] @@ -94,6 +100,11 @@ expect(test_class::ByIdentifierPerMonth.base_query).to eq("SELECT time_bucket('1 month', ts) as ts, identifier, sum(total) as total FROM \"by_identifier_per_day\" GROUP BY time_bucket('1 month', ts), identifier") expect(test_class::ByIdentifierPerDay.base_query).to eq("SELECT time_bucket('1 day', ts) as ts, identifier, sum(total) as total FROM \"by_identifier_per_hour\" GROUP BY time_bucket('1 day', ts), identifier") expect(test_class::ByIdentifierPerHour.base_query).to eq("SELECT time_bucket('1 hour', ts) as ts, identifier, sum(total) as total FROM \"by_identifier_per_minute\" GROUP BY time_bucket('1 hour', ts), identifier") + + expect(test_class::PurchaseStatsPerMinute.base_query).to eq("SELECT time_bucket('1 minute', ts) as ts, stats_agg(cast(payload->>'price' as float)) as stats_agg FROM \"hypertable_with_continuous_aggregates\" WHERE (identifier = 'purchase') GROUP BY time_bucket('1 minute', ts)") + expect(test_class::PurchaseStatsPerHour.base_query).to eq("SELECT time_bucket('1 hour', ts) as ts, rollup(stats_agg) as stats_agg FROM \"purchase_stats_per_minute\" GROUP BY time_bucket('1 hour', ts)") + expect(test_class::PurchaseStatsPerDay.base_query).to eq("SELECT time_bucket('1 day', ts) as ts, rollup(stats_agg) as stats_agg FROM \"purchase_stats_per_hour\" GROUP BY time_bucket('1 day', ts)") + expect(test_class::PurchaseStatsPerMonth.base_query).to eq("SELECT time_bucket('1 month', ts) as ts, rollup(stats_agg) as stats_agg FROM \"purchase_stats_per_day\" GROUP BY time_bucket('1 month', ts)") end end @@ -113,8 +124,10 @@ expect(ActiveRecord::Base.connection).to have_received(:execute).with(/CREATE MATERIALIZED VIEW IF NOT EXISTS by_version_per_hour/i) expect(ActiveRecord::Base.connection).to have_received(:execute).with(/CREATE MATERIALIZED VIEW IF NOT EXISTS by_version_per_minute/i) expect(ActiveRecord::Base.connection).to have_received(:execute).with(/CREATE MATERIALIZED VIEW IF NOT EXISTS by_identifier_per_month/i) - - expect(test_class::TotalPerMinute.select("sum(total) as total").rollup("'1 hour'").to_sql).to eq("SELECT time_bucket('1 hour', ts) as ts, identifier, sum(total) as total FROM \"total_per_minute\" GROUP BY time_bucket('1 hour', ts), \"identifier\"") + expect(ActiveRecord::Base.connection).to have_received(:execute).with(/CREATE MATERIALIZED VIEW IF NOT EXISTS purchase_stats_per_minute/i) + expect(ActiveRecord::Base.connection).to have_received(:execute).with(/CREATE MATERIALIZED VIEW IF NOT EXISTS purchase_stats_per_hour/i) + expect(ActiveRecord::Base.connection).to have_received(:execute).with(/CREATE MATERIALIZED VIEW IF NOT EXISTS purchase_stats_per_day/i) + expect(ActiveRecord::Base.connection).to have_received(:execute).with(/CREATE MATERIALIZED VIEW IF NOT EXISTS purchase_stats_per_month/i) end it 'sets up refresh policies for each aggregate' do @@ -128,6 +141,10 @@ expect(ActiveRecord::Base.connection).to have_received(:execute).with(/add_continuous_aggregate_policy.*by_identifier_per_month/i) expect(ActiveRecord::Base.connection).to have_received(:execute).with(/add_continuous_aggregate_policy.*by_version_per_day/i) expect(ActiveRecord::Base.connection).to have_received(:execute).with(/add_continuous_aggregate_policy.*by_version_per_month/i) + expect(ActiveRecord::Base.connection).to have_received(:execute).with(/add_continuous_aggregate_policy.*purchase_stats_per_minute/i) + expect(ActiveRecord::Base.connection).to have_received(:execute).with(/add_continuous_aggregate_policy.*purchase_stats_per_hour/i) + expect(ActiveRecord::Base.connection).to have_received(:execute).with(/add_continuous_aggregate_policy.*purchase_stats_per_day/i) + expect(ActiveRecord::Base.connection).to have_received(:execute).with(/add_continuous_aggregate_policy.*purchase_stats_per_month/i) end end @@ -141,7 +158,7 @@ } policies.each do |timeframe, expected_policy| - %w[Total ByVersion ByIdentifier].each do |klass| + %w[Total ByVersion ByIdentifier PurchaseStats].each do |klass| actual_policy = test_class.const_get("#{klass}Per#{timeframe.to_s.capitalize}").refresh_policy expect(actual_policy).to eq(expected_policy) end @@ -169,6 +186,11 @@ expect(ActiveRecord::Base.connection).to have_received(:execute).with(/DROP MATERIALIZED VIEW IF EXISTS by_identifier_per_day CASCADE/i) expect(ActiveRecord::Base.connection).to have_received(:execute).with(/DROP MATERIALIZED VIEW IF EXISTS by_identifier_per_hour CASCADE/i) expect(ActiveRecord::Base.connection).to have_received(:execute).with(/DROP MATERIALIZED VIEW IF EXISTS by_identifier_per_minute CASCADE/i) + + expect(ActiveRecord::Base.connection).to have_received(:execute).with(/DROP MATERIALIZED VIEW IF EXISTS purchase_stats_per_month CASCADE/i) + expect(ActiveRecord::Base.connection).to have_received(:execute).with(/DROP MATERIALIZED VIEW IF EXISTS purchase_stats_per_day CASCADE/i) + expect(ActiveRecord::Base.connection).to have_received(:execute).with(/DROP MATERIALIZED VIEW IF EXISTS purchase_stats_per_hour CASCADE/i) + expect(ActiveRecord::Base.connection).to have_received(:execute).with(/DROP MATERIALIZED VIEW IF EXISTS purchase_stats_per_minute CASCADE/i) end end end \ No newline at end of file