Skip to content

Commit

Permalink
Fix scope rollup with where clause
Browse files Browse the repository at this point in the history
  • Loading branch information
jonatas committed Jan 15, 2025
1 parent 84a7d5c commit b9f01e7
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 15 deletions.
13 changes: 12 additions & 1 deletion lib/timescaledb/continuous_aggregates_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ module ContinuousAggregatesHelper
/state_agg\((\w+)\)\s+as\s+(\w+)/ => 'rollup(\2) as \2',
/percentile_agg\((\w+),\s*(\w+)\)\s+as\s+(\w+)/ => 'rollup(\3) as \3',
/heartbeat_agg\((\w+)\)\s+as\s+(\w+)/ => 'rollup(\2) as \2',
/stats_agg\(([^)]+)\)\s+(as\s+(\w+))/ => 'rollup(\3) \2',
/stats_agg\((.*)\)\s+(as\s+(\w+))/ => 'rollup(\3) \2'
}

scope :rollup, ->(interval) do
Expand Down Expand Up @@ -147,12 +149,21 @@ class << self
if previous_timeframe
prev_klass = base_model.const_get("#{aggregate_name}_per_#{previous_timeframe}".classify)
select_clause = base_model.apply_rollup_rules("#{config[:select]}")
# Note there's no where clause here, because we're using the previous timeframe's data
self.base_query = "SELECT #{tb} as #{time_column}, #{select_clause} FROM \"#{prev_klass.table_name}\" GROUP BY #{[tb, *config[:group_by]].join(', ')}"
else
scope = base_model.public_send(config[:scope_name])
config[:select] = scope.select_values.select{|e|!e.downcase.start_with?("time_bucket")}.join(', ')
config[:group_by] = scope.group_values
self.base_query = "SELECT #{tb} as #{time_column}, #{config[:select]} FROM \"#{scope.table_name}\" GROUP BY #{[tb, *config[:group_by]].join(', ')}"
config[:where] = if scope.where_values_hash.present?
scope.where_values_hash.to_sql
elsif scope.where_clause.ast.present? && scope.where_clause.ast.to_sql.present?
scope.where_clause.ast.to_sql
end
self.base_query = "SELECT #{tb} as #{time_column}, #{config[:select]}"
self.base_query += " FROM \"#{base_model.table_name}\""
self.base_query += " WHERE #{config[:where]}" if config[:where]
self.base_query += " GROUP BY #{[tb, *config[:group_by]].join(', ')}"
end

def self.refresh!(start_time = nil, end_time = nil)
Expand Down
5 changes: 3 additions & 2 deletions spec/spec_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@
require "dotenv"
require "database_cleaner/active_record"
require "active_support/testing/time_helpers"
require_relative "support/active_record/models"
require_relative "support/active_record/schema"

Dotenv.load! if File.exist?(".env")

Expand All @@ -18,6 +16,9 @@
ActiveRecord::Base.establish_connection(ENV['PG_URI_TEST'])
Timescaledb.establish_connection(ENV['PG_URI_TEST'])

require_relative "support/active_record/models"
require_relative "support/active_record/schema"

def destroy_all_chunks_for!(klass)
sql = <<-SQL
SELECT drop_chunks('#{klass.table_name}', '#{1.week.from_now}'::date)
Expand Down
16 changes: 7 additions & 9 deletions spec/support/active_record/models.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,19 @@


class Event < ActiveRecord::Base
self.primary_key = "identifier"

acts_as_hypertable
end

class HypertableWithNoOptions < ActiveRecord::Base
self.primary_key = "identifier"

acts_as_hypertable
end

class HypertableWithOptions < ActiveRecord::Base
self.primary_key = "identifier"

acts_as_hypertable time_column: :timestamp
end

class HypertableWithCustomTimeColumn < ActiveRecord::Base
self.table_name = "hypertable_with_custom_time_column"
self.primary_key = "identifier"

acts_as_hypertable time_column: :timestamp
end
Expand All @@ -35,16 +28,20 @@ class HypertableWithContinuousAggregates < ActiveRecord::Base
extend Timescaledb::ActsAsHypertable
include Timescaledb::ContinuousAggregatesHelper

acts_as_hypertable time_column: 'ts', segment_by: :identifier
acts_as_hypertable time_column: 'ts',
segment_by: :identifier,
value_column: "cast(payload->>'price' as float)"

scope :total, -> { select("count(*) as total") }
scope :by_identifier, -> { select("identifier, count(*) as total").group(:identifier) }
scope :by_version, -> { select("identifier, version, count(*) as total").group(:identifier, :version) }
scope :purchase, -> { where("identifier = 'purchase'") }
scope :purchase_stats, -> { select("stats_agg(#{value_column}) as stats_agg").purchase }

continuous_aggregates(
time_column: 'ts',
timeframes: [:minute, :hour, :day, :month],
scopes: [:total, :by_identifier, :by_version],
scopes: [:total, :by_identifier, :by_version, :purchase_stats],
refresh_policy: {
minute: { start_offset: "10 minutes", end_offset: "1 minute", schedule_interval: "1 minute" },
hour: { start_offset: "4 hour", end_offset: "1 hour", schedule_interval: "1 hour" },
Expand All @@ -54,6 +51,7 @@ class HypertableWithContinuousAggregates < ActiveRecord::Base
)
descendants.each do |cagg|
cagg.hypertable_options = hypertable_options.merge(value_column: :total)
cagg.scope :stats, -> { select("average(stats_agg), stddev(stats_agg)") }
end
end

Expand Down
28 changes: 25 additions & 3 deletions spec/timescaledb/continuous_aggregates_helper_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
scope_name: :total,
select: "count(*) as total",
group_by: [],
where: nil,
refresh_policy: {
minute: { start_offset: "10 minutes", end_offset: "1 minute", schedule_interval: "1 minute" },
hour: { start_offset: "4 hour", end_offset: "1 hour", schedule_interval: "1 hour" },
Expand All @@ -76,6 +77,11 @@
expect(test_class::TotalPerMinute.config).to eq(expected_config)
end

it "sets the where clause for each aggregate" do
expect(test_class::PurchaseStatsPerMinute.config[:where]).to eq("(identifier = 'purchase')")
end


it 'defines rollup scope for aggregates' do
test_class.create_continuous_aggregates
aggregate_classes = [test_class::TotalPerMinute, test_class::TotalPerHour, test_class::TotalPerDay, test_class::TotalPerMonth]
Expand All @@ -94,6 +100,11 @@
expect(test_class::ByIdentifierPerMonth.base_query).to eq("SELECT time_bucket('1 month', ts) as ts, identifier, sum(total) as total FROM \"by_identifier_per_day\" GROUP BY time_bucket('1 month', ts), identifier")
expect(test_class::ByIdentifierPerDay.base_query).to eq("SELECT time_bucket('1 day', ts) as ts, identifier, sum(total) as total FROM \"by_identifier_per_hour\" GROUP BY time_bucket('1 day', ts), identifier")
expect(test_class::ByIdentifierPerHour.base_query).to eq("SELECT time_bucket('1 hour', ts) as ts, identifier, sum(total) as total FROM \"by_identifier_per_minute\" GROUP BY time_bucket('1 hour', ts), identifier")

expect(test_class::PurchaseStatsPerMinute.base_query).to eq("SELECT time_bucket('1 minute', ts) as ts, stats_agg(cast(payload->>'price' as float)) as stats_agg FROM \"hypertable_with_continuous_aggregates\" WHERE (identifier = 'purchase') GROUP BY time_bucket('1 minute', ts)")
expect(test_class::PurchaseStatsPerHour.base_query).to eq("SELECT time_bucket('1 hour', ts) as ts, rollup(stats_agg) as stats_agg FROM \"purchase_stats_per_minute\" GROUP BY time_bucket('1 hour', ts)")
expect(test_class::PurchaseStatsPerDay.base_query).to eq("SELECT time_bucket('1 day', ts) as ts, rollup(stats_agg) as stats_agg FROM \"purchase_stats_per_hour\" GROUP BY time_bucket('1 day', ts)")
expect(test_class::PurchaseStatsPerMonth.base_query).to eq("SELECT time_bucket('1 month', ts) as ts, rollup(stats_agg) as stats_agg FROM \"purchase_stats_per_day\" GROUP BY time_bucket('1 month', ts)")
end
end

Expand All @@ -113,8 +124,10 @@
expect(ActiveRecord::Base.connection).to have_received(:execute).with(/CREATE MATERIALIZED VIEW IF NOT EXISTS by_version_per_hour/i)
expect(ActiveRecord::Base.connection).to have_received(:execute).with(/CREATE MATERIALIZED VIEW IF NOT EXISTS by_version_per_minute/i)
expect(ActiveRecord::Base.connection).to have_received(:execute).with(/CREATE MATERIALIZED VIEW IF NOT EXISTS by_identifier_per_month/i)

expect(test_class::TotalPerMinute.select("sum(total) as total").rollup("'1 hour'").to_sql).to eq("SELECT time_bucket('1 hour', ts) as ts, identifier, sum(total) as total FROM \"total_per_minute\" GROUP BY time_bucket('1 hour', ts), \"identifier\"")
expect(ActiveRecord::Base.connection).to have_received(:execute).with(/CREATE MATERIALIZED VIEW IF NOT EXISTS purchase_stats_per_minute/i)
expect(ActiveRecord::Base.connection).to have_received(:execute).with(/CREATE MATERIALIZED VIEW IF NOT EXISTS purchase_stats_per_hour/i)
expect(ActiveRecord::Base.connection).to have_received(:execute).with(/CREATE MATERIALIZED VIEW IF NOT EXISTS purchase_stats_per_day/i)
expect(ActiveRecord::Base.connection).to have_received(:execute).with(/CREATE MATERIALIZED VIEW IF NOT EXISTS purchase_stats_per_month/i)
end

it 'sets up refresh policies for each aggregate' do
Expand All @@ -128,6 +141,10 @@
expect(ActiveRecord::Base.connection).to have_received(:execute).with(/add_continuous_aggregate_policy.*by_identifier_per_month/i)
expect(ActiveRecord::Base.connection).to have_received(:execute).with(/add_continuous_aggregate_policy.*by_version_per_day/i)
expect(ActiveRecord::Base.connection).to have_received(:execute).with(/add_continuous_aggregate_policy.*by_version_per_month/i)
expect(ActiveRecord::Base.connection).to have_received(:execute).with(/add_continuous_aggregate_policy.*purchase_stats_per_minute/i)
expect(ActiveRecord::Base.connection).to have_received(:execute).with(/add_continuous_aggregate_policy.*purchase_stats_per_hour/i)
expect(ActiveRecord::Base.connection).to have_received(:execute).with(/add_continuous_aggregate_policy.*purchase_stats_per_day/i)
expect(ActiveRecord::Base.connection).to have_received(:execute).with(/add_continuous_aggregate_policy.*purchase_stats_per_month/i)
end
end

Expand All @@ -141,7 +158,7 @@
}

policies.each do |timeframe, expected_policy|
%w[Total ByVersion ByIdentifier].each do |klass|
%w[Total ByVersion ByIdentifier PurchaseStats].each do |klass|
actual_policy = test_class.const_get("#{klass}Per#{timeframe.to_s.capitalize}").refresh_policy
expect(actual_policy).to eq(expected_policy)
end
Expand Down Expand Up @@ -169,6 +186,11 @@
expect(ActiveRecord::Base.connection).to have_received(:execute).with(/DROP MATERIALIZED VIEW IF EXISTS by_identifier_per_day CASCADE/i)
expect(ActiveRecord::Base.connection).to have_received(:execute).with(/DROP MATERIALIZED VIEW IF EXISTS by_identifier_per_hour CASCADE/i)
expect(ActiveRecord::Base.connection).to have_received(:execute).with(/DROP MATERIALIZED VIEW IF EXISTS by_identifier_per_minute CASCADE/i)

expect(ActiveRecord::Base.connection).to have_received(:execute).with(/DROP MATERIALIZED VIEW IF EXISTS purchase_stats_per_month CASCADE/i)
expect(ActiveRecord::Base.connection).to have_received(:execute).with(/DROP MATERIALIZED VIEW IF EXISTS purchase_stats_per_day CASCADE/i)
expect(ActiveRecord::Base.connection).to have_received(:execute).with(/DROP MATERIALIZED VIEW IF EXISTS purchase_stats_per_hour CASCADE/i)
expect(ActiveRecord::Base.connection).to have_received(:execute).with(/DROP MATERIALIZED VIEW IF EXISTS purchase_stats_per_minute CASCADE/i)
end
end
end

0 comments on commit b9f01e7

Please sign in to comment.