Skip to content

Commit

Permalink
Make vector scopes works with continuous_aggregates macro (#74)
Browse files Browse the repository at this point in the history
* Seamless integration between continuous_aggregates and acts_as_time_vector
* Allow lttb to receive an alternative value_exp
* Remove obsolete example
  • Loading branch information
jonatas authored Oct 25, 2024
1 parent 2faa168 commit 4d192c1
Show file tree
Hide file tree
Showing 10 changed files with 271 additions and 448 deletions.
4 changes: 3 additions & 1 deletion docs/migrations.md
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ function that can be reusing candlesticks from smaller timeframes.

# Create a continuous aggregate using the macro

To setup complex hierarchies of continuous aggregates, you can use the `continuous_aggregates` macro.
To setup complex [hierarchies][hierarchical] of continuous aggregates, you can use the `continuous_aggregates` macro.

This setup allows for creating multiple continuous aggregates with customizable refresh policies, making it ideal for complex aggregation and retention policies.

Expand Down Expand Up @@ -278,9 +278,11 @@ DROP MATERIALIZED VIEW IF EXISTS downloads_by_version_per_month CASCADE
DROP MATERIALIZED VIEW IF EXISTS downloads_by_version_per_day CASCADE
DROP MATERIALIZED VIEW IF EXISTS downloads_by_version_per_hour CASCADE
DROP MATERIALIZED VIEW IF EXISTS downloads_by_version_per_minute CASCADE
```


The convention of naming the scopes is important as they mix with the name of the continuous aggregate.


[1]: https://ideia.me/timescale-continuous-aggregates-with-ruby
[hierarchical]: https://docs.timescale.com/use-timescale/latest/continuous-aggregates/hierarchical-continuous-aggregates/
543 changes: 160 additions & 383 deletions docs/toolkit_candlestick.md

Large diffs are not rendered by default.

71 changes: 41 additions & 30 deletions examples/toolkit-demo/candlestick.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,9 @@ class Tick < ActiveRecord::Base
acts_as_hypertable time_column: "time"
acts_as_time_vector segment_by: "symbol", value_column: "price"

scope :ohlcv, -> do
select("symbol,
first(price, time) as open,
max(price) as high,
min(price) as low,
last(price, time) as close,
sum(volume) as volume").group("symbol")
end

scope :plotly_candlestick, -> (from: nil) do
data = all.to_a
data = ohlcv.to_a
{
type: 'candlestick',
xaxis: 'x',
Expand All @@ -50,25 +43,49 @@ class Tick < ActiveRecord::Base
volume: data.map(&:volume)
}
end


continuous_aggregates(
timeframes: [:minute, :hour, :day, :month],
scopes: [:ohlcv],
refresh_policy: {
minute: { start_offset: "10 minutes", end_offset: "1 minute", schedule_interval: "1 minute" },
hour: { start_offset: "4 hour", end_offset: "1 hour", schedule_interval: "1 hour" },
day: { start_offset: "3 day", end_offset: "1 day", schedule_interval: "1 day" },
month: { start_offset: "3 month", end_offset: "1 day", schedule_interval: "1 day" }
})

descendants.each{|e|e.time_vector_options = time_vector_options.merge(value_column: :close)}
scopes: [:_candlestick]
)

descendants.each do |cagg|
cagg.class_eval do
self.time_vector_options = time_vector_options.merge(value_column: :close)
[:open, :high, :low, :close].each do |attr|
attribute attr, :decimal, precision: 10, scale: 2
end
[:volume, :vwap].each do |attr|
attribute attr, :integer
end
[:open_time, :high_time, :low_time, :close_time].each do |attr|
attribute attr, :time
end
scope :ohlcv, -> do
unscoped
.from("(#{to_sql}) AS candlestick")
.select(time_column, *segment_by_column,
"open(candlestick),
high(candlestick),
low(candlestick),
close(candlestick),
open_time(candlestick),
high_time(candlestick),
low_time(candlestick),
close_time(candlestick),
volume(candlestick),
vwap(candlestick)")
end
end
end
end


db do
if true
Tick.drop_continuous_aggregates
drop_table :ticks, if_exists: true, force: :cascade
#Tick.drop_continuous_aggregates
#drop_table :ticks, if_exists: true, force: :cascade

hypertable_options = {
time_column: "time",
Expand All @@ -78,7 +95,7 @@ class Tick < ActiveRecord::Base
compression_interval: "1 week"
}
create_table :ticks, id: false, hypertable: hypertable_options, if_not_exists: true do |t|
t.timestamp :time, null: false
t.timestamptz :time, null: false
t.string :symbol, null: false
t.decimal :price
t.integer :volume
Expand Down Expand Up @@ -116,30 +133,24 @@ class App < Sinatra::Base
get '/daily_close_price' do
json({
title: "Daily",
data: Tick::OhlcvPerDay.previous_week.plotly_candlestick
data: Tick::CandlestickPerDay.previous_week.plotly_candlestick
})
end
get '/candlestick_1m' do
json({
title: "Candlestick 1 minute last hour",
data: Tick::OhlcvPerMinute.last_hour.plotly_candlestick
data: Tick::CandlestickPerMinute.last_hour.plotly_candlestick
})
end

get '/candlestick_1h' do
json({
title: "Candlestick yesterday hourly",
data:Tick::OhlcvPerHour.yesterday.plotly_candlestick
data:Tick::CandlestickPerHour.yesterday.plotly_candlestick
})

end

get '/candlestick_1d' do
json({
title: "Candlestick daily this month",
data: Tick::OhlcvPerDay.previous_week.plotly_candlestick
})
end

get '/' do
<<~HTML
Expand Down
7 changes: 5 additions & 2 deletions examples/toolkit-demo/lttb-zoom/lttb_zoomable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,21 @@ def download_weather_dataset size: :small

def setup size: :small
file = "weather_#{size}.tar.gz"
download_weather_dataset(size: size)# unless File.exists? file
download_weather_dataset(size: size) unless File.exists? file
puts "extracting #{file}"
system "tar -xvzf #{file} "
puts "creating data structures"
system "psql #{PG_URI} < weather.sql"
system %|psql #{PG_URI} -c "\\COPY locations FROM weather_#{size}_locations.csv CSV"|
system %|psql #{PG_URI} -c "\\COPY conditions FROM weather_#{size}_conditions.csv CSV"|
system %|psql #{PG_URI} -c "\\COPY locations FROM weather_#{size}_locations.csv CSV"|
end

ActiveRecord::Base.establish_connection(PG_URI)

class Condition < ActiveRecord::Base
extend Timescaledb::ActsAsHypertable
extend Timescaledb::ActsAsTimeVector

acts_as_hypertable time_column: "time"
acts_as_time_vector value_column: "temperature", segment_by: "device_id"
end
Expand Down
1 change: 0 additions & 1 deletion lib/timescaledb/acts_as_time_vector.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,3 @@ def acts_as_time_vector?
end
end
end
ActiveRecord::Base.extend Timescaledb::ActsAsTimeVector
38 changes: 27 additions & 11 deletions lib/timescaledb/continuous_aggregates_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ module ContinuousAggregatesHelper
/high\((\w+),\s*(\w+)\)\s+as\s+(\w+)/ => 'max(\1) as \1',
/low\((\w+),\s*(\w+)\)\s+as\s+(\w+)/ => 'min(\1) as \1',
/last\((\w+),\s*(\w+)\)\s+as\s+(\w+)/ => 'last(\3, \2) as \3',
/candlestick_agg\((\w+)\)\s+as\s+(\w+)/ => 'rollup(\2) as \2',
/candlestick_agg\((\w+),\s*(\w+),\s*(\w+)\)\s+as\s+(\w+)/ => 'rollup(\4) as \4',
/stats_agg\((\w+),\s*(\w+)\)\s+as\s+(\w+)/ => 'rollup(\3) as \3',
/stats_agg\((\w+)\)\s+as\s+(\w+)/ => 'rollup(\2) as \2',
/state_agg\((\w+)\)\s+as\s+(\w+)/ => 'rollup(\2) as \2',
Expand All @@ -21,16 +21,27 @@ module ContinuousAggregatesHelper
}

scope :rollup, ->(interval) do
select_values = self.select_values.join(', ')
if select_values.include?('time_bucket(')
select_values = (self.select_values - ["time"]).select{|e|!e.downcase.start_with?("time_bucket")}
if self.select_values.any?{|e|e.downcase.start_with?('time_bucket(')} || self.select_values.include?('time')
select_values = apply_rollup_rules(select_values)
select_values.gsub!(/time_bucket\((.+), (.+)\)/, "time_bucket(#{interval}, \2)")
select_values.gsub!(/\btime\b/, "time_bucket(#{interval}, time) as time")
end
group_values = self.group_values.dup

if self.segment_by_column
if !group_values.include?(self.segment_by_column)
group_values << self.segment_by_column
end
if !select_values.include?(self.segment_by_column.to_s)
select_values.insert(0, self.segment_by_column.to_s)
end
end
group_values = self.group_values
where_values = self.where_values_hash
self.unscoped.select("time_bucket(#{interval}, #{time_column}) as #{time_column}, #{select_values}")
tb = "time_bucket(#{interval}, #{time_column})"
self.unscoped.select("#{tb} as #{time_column}, #{select_values.join(', ')}")
.where(where_values)
.group(1, *group_values)
.group(tb, *group_values)
end
end

Expand Down Expand Up @@ -94,9 +105,13 @@ def create_continuous_aggregates(with_data: false)
end

def apply_rollup_rules(select_values)
rollup_rules.reduce(select_values) do |result, (pattern, replacement)|
result.gsub(pattern, replacement)
result = select_values.dup
rollup_rules.each do |pattern, replacement|
result.gsub!(pattern, replacement)
end
# Remove any remaining time_bucket
result.gsub!(/time_bucket\(.+?\)( as \w+)?/, '')
result
end

def drop_continuous_aggregates
Expand Down Expand Up @@ -128,15 +143,16 @@ class << self

interval = "'1 #{timeframe.to_s}'"
self.base_model = base_model
tb = "time_bucket(#{interval}, #{time_column})"
if previous_timeframe
prev_klass = base_model.const_get("#{aggregate_name}_per_#{previous_timeframe}".classify)
select_clause = base_model.apply_rollup_rules("#{config[:select]}")
self.base_query = "SELECT time_bucket(#{interval}, #{time_column}) as #{time_column}, #{select_clause} FROM \"#{prev_klass.table_name}\" GROUP BY #{[1, *config[:group_by]].join(', ')}"
self.base_query = "SELECT #{tb} as #{time_column}, #{select_clause} FROM \"#{prev_klass.table_name}\" GROUP BY #{[tb, *config[:group_by]].join(', ')}"
else
scope = base_model.public_send(config[:scope_name])
config[:select] = scope.select_values.join(', ')
config[:select] = scope.select_values.select{|e|!e.downcase.start_with?("time_bucket")}.join(', ')
config[:group_by] = scope.group_values
self.base_query = "SELECT time_bucket(#{interval}, #{time_column}) as #{time_column}, #{config[:select]} FROM \"#{scope.table_name}\" GROUP BY #{[1, *config[:group_by]].join(', ')}"
self.base_query = "SELECT #{tb} as #{time_column}, #{config[:select]} FROM \"#{scope.table_name}\" GROUP BY #{[tb, *config[:group_by]].join(', ')}"
end

def self.refresh!(start_time = nil, end_time = nil)
Expand Down
14 changes: 9 additions & 5 deletions lib/timescaledb/toolkit/time_vector.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ def segment_by_column
protected

def define_default_scopes
scope :volatility, -> (segment_by: segment_by_column) do
scope :volatility, -> (segment_by: segment_by_column, value: value_column) do
select([*segment_by,
"timevector(#{time_column}, #{value_column}) -> sort() -> delta() -> abs() -> sum() as volatility"
"timevector(#{time_column}, #{value}) -> sort() -> delta() -> abs() -> sum() as volatility"
].join(", ")).group(segment_by)
end

Expand All @@ -36,11 +36,15 @@ def define_default_scopes
.group(segment_by)
end

scope :lttb, -> (threshold:, segment_by: segment_by_column, time: time_column, value: value_column) do
scope :lttb, -> (threshold:, segment_by: segment_by_column, time: time_column, value: value_column, value_exp: value_column) do
if value =~ /(.*)\bas\b(.*)/
value_exp = $1
value = $2
end
lttb_query = <<~SQL
WITH x AS ( #{select(*segment_by, time_column, value_column).to_sql})
WITH x AS ( #{select(*segment_by, time_column, value_exp || value).to_sql})
SELECT #{"x.#{segment_by}," if segment_by}
(lttb( x.#{time_column}, x.#{value_column}, #{threshold}) -> unnest()).*
(lttb( x.#{time_column}, x.#{value}, #{threshold}) -> unnest()).*
FROM x
#{"GROUP BY #{segment_by}" if segment_by}
SQL
Expand Down
5 changes: 5 additions & 0 deletions spec/support/active_record/models.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,10 @@ class HypertableSkipAllScopes < ActiveRecord::Base
class HypertableWithContinuousAggregates < ActiveRecord::Base
extend Timescaledb::ActsAsHypertable
include Timescaledb::ContinuousAggregatesHelper
extend Timescaledb::ActsAsTimeVector

acts_as_hypertable time_column: 'ts'
acts_as_time_vector segment_by: :identifier

scope :total, -> { select("count(*) as total") }
scope :by_identifier, -> { select("identifier, count(*) as total").group(:identifier) }
Expand All @@ -51,6 +53,9 @@ class HypertableWithContinuousAggregates < ActiveRecord::Base
month: { start_offset: "3 month", end_offset: "1 hour", schedule_interval: "1 hour" }
}
)
descendants.each do |cagg|
cagg.time_vector_options = time_vector_options.merge(value_column: :total)
end
end

class NonHypertable < ActiveRecord::Base
Expand Down
30 changes: 15 additions & 15 deletions spec/timescaledb/continuous_aggregates_helper_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -80,20 +80,20 @@
test_class.create_continuous_aggregates
aggregate_classes = [test_class::TotalPerMinute, test_class::TotalPerHour, test_class::TotalPerDay, test_class::TotalPerMonth]

expect(test_class::TotalPerMinute.base_query).to eq("SELECT time_bucket('1 minute', ts) as ts, count(*) as total FROM \"hypertable_with_continuous_aggregates\" GROUP BY 1")
expect(test_class::TotalPerMonth.base_query).to eq("SELECT time_bucket('1 month', ts) as ts, sum(total) as total FROM \"total_per_day\" GROUP BY 1")
expect(test_class::TotalPerDay.base_query).to eq("SELECT time_bucket('1 day', ts) as ts, sum(total) as total FROM \"total_per_hour\" GROUP BY 1")
expect(test_class::TotalPerHour.base_query).to eq("SELECT time_bucket('1 hour', ts) as ts, sum(total) as total FROM \"total_per_minute\" GROUP BY 1")

expect(test_class::ByVersionPerMinute.base_query).to eq("SELECT time_bucket('1 minute', ts) as ts, identifier, version, count(*) as total FROM \"hypertable_with_continuous_aggregates\" GROUP BY 1, identifier, version")
expect(test_class::ByVersionPerMonth.base_query).to eq("SELECT time_bucket('1 month', ts) as ts, identifier, version, sum(total) as total FROM \"by_version_per_day\" GROUP BY 1, identifier, version")
expect(test_class::ByVersionPerDay.base_query).to eq("SELECT time_bucket('1 day', ts) as ts, identifier, version, sum(total) as total FROM \"by_version_per_hour\" GROUP BY 1, identifier, version")
expect(test_class::ByVersionPerHour.base_query).to eq("SELECT time_bucket('1 hour', ts) as ts, identifier, version, sum(total) as total FROM \"by_version_per_minute\" GROUP BY 1, identifier, version")

expect(test_class::ByIdentifierPerMinute.base_query).to eq("SELECT time_bucket('1 minute', ts) as ts, identifier, count(*) as total FROM \"hypertable_with_continuous_aggregates\" GROUP BY 1, identifier")
expect(test_class::ByIdentifierPerMonth.base_query).to eq("SELECT time_bucket('1 month', ts) as ts, identifier, sum(total) as total FROM \"by_identifier_per_day\" GROUP BY 1, identifier")
expect(test_class::ByIdentifierPerDay.base_query).to eq("SELECT time_bucket('1 day', ts) as ts, identifier, sum(total) as total FROM \"by_identifier_per_hour\" GROUP BY 1, identifier")
expect(test_class::ByIdentifierPerHour.base_query).to eq("SELECT time_bucket('1 hour', ts) as ts, identifier, sum(total) as total FROM \"by_identifier_per_minute\" GROUP BY 1, identifier")
expect(test_class::TotalPerMinute.base_query).to eq("SELECT time_bucket('1 minute', ts) as ts, count(*) as total FROM \"hypertable_with_continuous_aggregates\" GROUP BY time_bucket('1 minute', ts)")
expect(test_class::TotalPerMonth.base_query).to eq("SELECT time_bucket('1 month', ts) as ts, sum(total) as total FROM \"total_per_day\" GROUP BY time_bucket('1 month', ts)")
expect(test_class::TotalPerDay.base_query).to eq("SELECT time_bucket('1 day', ts) as ts, sum(total) as total FROM \"total_per_hour\" GROUP BY time_bucket('1 day', ts)")
expect(test_class::TotalPerHour.base_query).to eq("SELECT time_bucket('1 hour', ts) as ts, sum(total) as total FROM \"total_per_minute\" GROUP BY time_bucket('1 hour', ts)")

expect(test_class::ByVersionPerMinute.base_query).to eq("SELECT time_bucket('1 minute', ts) as ts, identifier, version, count(*) as total FROM \"hypertable_with_continuous_aggregates\" GROUP BY time_bucket('1 minute', ts), identifier, version")
expect(test_class::ByVersionPerMonth.base_query).to eq("SELECT time_bucket('1 month', ts) as ts, identifier, version, sum(total) as total FROM \"by_version_per_day\" GROUP BY time_bucket('1 month', ts), identifier, version")
expect(test_class::ByVersionPerDay.base_query).to eq("SELECT time_bucket('1 day', ts) as ts, identifier, version, sum(total) as total FROM \"by_version_per_hour\" GROUP BY time_bucket('1 day', ts), identifier, version")
expect(test_class::ByVersionPerHour.base_query).to eq("SELECT time_bucket('1 hour', ts) as ts, identifier, version, sum(total) as total FROM \"by_version_per_minute\" GROUP BY time_bucket('1 hour', ts), identifier, version")

expect(test_class::ByIdentifierPerMinute.base_query).to eq("SELECT time_bucket('1 minute', ts) as ts, identifier, count(*) as total FROM \"hypertable_with_continuous_aggregates\" GROUP BY time_bucket('1 minute', ts), identifier")
expect(test_class::ByIdentifierPerMonth.base_query).to eq("SELECT time_bucket('1 month', ts) as ts, identifier, sum(total) as total FROM \"by_identifier_per_day\" GROUP BY time_bucket('1 month', ts), identifier")
expect(test_class::ByIdentifierPerDay.base_query).to eq("SELECT time_bucket('1 day', ts) as ts, identifier, sum(total) as total FROM \"by_identifier_per_hour\" GROUP BY time_bucket('1 day', ts), identifier")
expect(test_class::ByIdentifierPerHour.base_query).to eq("SELECT time_bucket('1 hour', ts) as ts, identifier, sum(total) as total FROM \"by_identifier_per_minute\" GROUP BY time_bucket('1 hour', ts), identifier")
end
end

Expand All @@ -114,7 +114,7 @@
expect(ActiveRecord::Base.connection).to have_received(:execute).with(/CREATE MATERIALIZED VIEW IF NOT EXISTS by_version_per_minute/i)
expect(ActiveRecord::Base.connection).to have_received(:execute).with(/CREATE MATERIALIZED VIEW IF NOT EXISTS by_identifier_per_month/i)

expect(test_class::TotalPerMinute.select("sum(total) as total").rollup("'1 hour'").to_sql).to eq("SELECT time_bucket('1 hour', ts) as ts, sum(total) as total FROM \"total_per_minute\" GROUP BY 1")
expect(test_class::TotalPerMinute.select("sum(total) as total").rollup("'1 hour'").to_sql).to eq("SELECT time_bucket('1 hour', ts) as ts, identifier, sum(total) as total FROM \"total_per_minute\" GROUP BY time_bucket('1 hour', ts), \"identifier\"")
end

it 'sets up refresh policies for each aggregate' do
Expand Down
6 changes: 6 additions & 0 deletions spec/timescaledb/toolkit_helper_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@

let(:model) do
Measurement = Class.new(ActiveRecord::Base) do
extend Timescaledb::ActsAsHypertable
extend Timescaledb::ActsAsTimeVector
self.table_name = 'measurements'
self.primary_key = nil

Expand Down Expand Up @@ -207,6 +209,8 @@

let(:model) do
Measurement = Class.new(ActiveRecord::Base) do
extend Timescaledb::ActsAsHypertable
extend Timescaledb::ActsAsTimeVector
self.table_name = 'measurements'
self.primary_key = nil

Expand Down Expand Up @@ -297,6 +301,8 @@

let(:model) do
Tick = Class.new(ActiveRecord::Base) do
extend Timescaledb::ActsAsTimeVector
extend Timescaledb::ActsAsHypertable
self.table_name = 'ticks'
self.primary_key = nil

Expand Down

0 comments on commit 4d192c1

Please sign in to comment.