Continuous Aggregates DSL (#73)
* Add aggregations with macro

* Remove experimental references

* Remove download references from specs
jonatas authored Sep 18, 2024
1 parent fd83fd6 commit 2faa168
Showing 13 changed files with 810 additions and 199 deletions.
190 changes: 188 additions & 2 deletions docs/migrations.md
@@ -14,15 +14,15 @@ hypertable_options = {
}

create_table(:events, id: false, hypertable: hypertable_options) do |t|
  t.datetime :created_at, null: false
  t.string :identifier, null: false
  t.jsonb :payload
  t.timestamps
end
```

## The `create_continuous_aggregate` helper

This goes in the model file. This example shows a ticks table grouping ticks as OHLCV histograms for every
minute.

First make sure you have the model with the `acts_as_hypertable` method to be
@@ -96,5 +96,191 @@ function that can be reusing candlesticks from smaller timeframes.
end
```

# Create a continuous aggregate using the macro

To set up complex hierarchies of continuous aggregates, you can use the `continuous_aggregates` macro.

This setup allows for creating multiple continuous aggregates with customizable refresh policies, making it ideal for complex aggregation and retention policies.

```ruby
class Download < ActiveRecord::Base
  extend Timescaledb::ActsAsHypertable
  include Timescaledb::ContinuousAggregatesHelper

  acts_as_hypertable time_column: 'ts'

  scope :total_downloads, -> { select("count(*) as total") }
  scope :downloads_by_gem, -> { select("gem_name, count(*) as total").group(:gem_name) }
  scope :downloads_by_version, -> { select("gem_name, gem_version, count(*) as total").group(:gem_name, :gem_version) }

  continuous_aggregates(
    timeframes: [:minute, :hour, :day, :month],
    scopes: [:total_downloads, :downloads_by_gem, :downloads_by_version],
    refresh_policy: {
      minute: { start_offset: "10 minutes", end_offset: "1 minute", schedule_interval: "1 minute" },
      hour: { start_offset: "4 hour", end_offset: "1 hour", schedule_interval: "1 hour" },
      day: { start_offset: "3 day", end_offset: "1 day", schedule_interval: "1 day" },
      month: { start_offset: "3 month", end_offset: "1 day", schedule_interval: "1 day" }
    })
end
```

Then edit the migration file to add the continuous aggregates:

```ruby
class CreateCaggs < ActiveRecord::Migration[7.0]
  def up
    Download.create_continuous_aggregates
  end

  def down
    Download.drop_continuous_aggregates
  end
end
```

Here is the output of the migration:

```sql
CREATE MATERIALIZED VIEW IF NOT EXISTS total_downloads_per_minute
WITH (timescaledb.continuous) AS
SELECT time_bucket('1 minute', ts) as ts, count(*) as total FROM "downloads" GROUP BY 1
WITH NO DATA;

SELECT add_continuous_aggregate_policy('total_downloads_per_minute',
start_offset => INTERVAL '10 minutes',
end_offset => INTERVAL '1 minute',
schedule_interval => INTERVAL '1 minute');

CREATE MATERIALIZED VIEW IF NOT EXISTS total_downloads_per_hour
WITH (timescaledb.continuous) AS
SELECT time_bucket('1 hour', ts) as ts, sum(total) as total FROM "total_downloads_per_minute" GROUP BY 1
WITH NO DATA;

SELECT add_continuous_aggregate_policy('total_downloads_per_hour',
start_offset => INTERVAL '4 hour',
end_offset => INTERVAL '1 hour',
schedule_interval => INTERVAL '1 hour');

CREATE MATERIALIZED VIEW IF NOT EXISTS total_downloads_per_day
WITH (timescaledb.continuous) AS
SELECT time_bucket('1 day', ts) as ts, sum(total) as total FROM "total_downloads_per_hour" GROUP BY 1
WITH NO DATA;

SELECT add_continuous_aggregate_policy('total_downloads_per_day',
start_offset => INTERVAL '3 day',
end_offset => INTERVAL '1 day',
schedule_interval => INTERVAL '1 day');

CREATE MATERIALIZED VIEW IF NOT EXISTS total_downloads_per_month
WITH (timescaledb.continuous) AS
SELECT time_bucket('1 month', ts) as ts, sum(total) as total FROM "total_downloads_per_day" GROUP BY 1
WITH NO DATA;

SELECT add_continuous_aggregate_policy('total_downloads_per_month',
start_offset => INTERVAL '3 month',
end_offset => INTERVAL '1 day',
schedule_interval => INTERVAL '1 day');

CREATE MATERIALIZED VIEW IF NOT EXISTS downloads_by_gem_per_minute
WITH (timescaledb.continuous) AS
SELECT time_bucket('1 minute', ts) as ts, gem_name, count(*) as total FROM "downloads" GROUP BY 1, "downloads"."gem_name"
WITH NO DATA;

SELECT add_continuous_aggregate_policy('downloads_by_gem_per_minute',
start_offset => INTERVAL '10 minutes',
end_offset => INTERVAL '1 minute',
schedule_interval => INTERVAL '1 minute');

CREATE MATERIALIZED VIEW IF NOT EXISTS downloads_by_gem_per_hour
WITH (timescaledb.continuous) AS
SELECT time_bucket('1 hour', ts) as ts, gem_name, sum(total) as total FROM "downloads_by_gem_per_minute" GROUP BY 1, "downloads_by_gem_per_minute"."gem_name"
WITH NO DATA;

SELECT add_continuous_aggregate_policy('downloads_by_gem_per_hour',
start_offset => INTERVAL '4 hour',
end_offset => INTERVAL '1 hour',
schedule_interval => INTERVAL '1 hour');

CREATE MATERIALIZED VIEW IF NOT EXISTS downloads_by_gem_per_day
WITH (timescaledb.continuous) AS
SELECT time_bucket('1 day', ts) as ts, gem_name, sum(total) as total FROM "downloads_by_gem_per_hour" GROUP BY 1, "downloads_by_gem_per_hour"."gem_name"
WITH NO DATA;

SELECT add_continuous_aggregate_policy('downloads_by_gem_per_day',
start_offset => INTERVAL '3 day',
end_offset => INTERVAL '1 day',
schedule_interval => INTERVAL '1 day');

CREATE MATERIALIZED VIEW IF NOT EXISTS downloads_by_gem_per_month
WITH (timescaledb.continuous) AS
SELECT time_bucket('1 month', ts) as ts, gem_name, sum(total) as total FROM "downloads_by_gem_per_day" GROUP BY 1, "downloads_by_gem_per_day"."gem_name"
WITH NO DATA;

SELECT add_continuous_aggregate_policy('downloads_by_gem_per_month',
start_offset => INTERVAL '3 month',
end_offset => INTERVAL '1 day',
schedule_interval => INTERVAL '1 day');

CREATE MATERIALIZED VIEW IF NOT EXISTS downloads_by_version_per_minute
WITH (timescaledb.continuous) AS
SELECT time_bucket('1 minute', ts) as ts, gem_name, gem_version, count(*) as total FROM "downloads" GROUP BY 1, "downloads"."gem_name", "downloads"."gem_version"
WITH NO DATA;

SELECT add_continuous_aggregate_policy('downloads_by_version_per_minute',
start_offset => INTERVAL '10 minutes',
end_offset => INTERVAL '1 minute',
schedule_interval => INTERVAL '1 minute');

CREATE MATERIALIZED VIEW IF NOT EXISTS downloads_by_version_per_hour
WITH (timescaledb.continuous) AS
SELECT time_bucket('1 hour', ts) as ts, gem_name, gem_version, sum(total) as total FROM "downloads_by_version_per_minute" GROUP BY 1, "downloads_by_version_per_minute"."gem_name", "downloads_by_version_per_minute"."gem_version"
WITH NO DATA;

SELECT add_continuous_aggregate_policy('downloads_by_version_per_hour',
start_offset => INTERVAL '4 hour',
end_offset => INTERVAL '1 hour',
schedule_interval => INTERVAL '1 hour');

CREATE MATERIALIZED VIEW IF NOT EXISTS downloads_by_version_per_day
WITH (timescaledb.continuous) AS
SELECT time_bucket('1 day', ts) as ts, gem_name, gem_version, sum(total) as total FROM "downloads_by_version_per_hour" GROUP BY 1, "downloads_by_version_per_hour"."gem_name", "downloads_by_version_per_hour"."gem_version"
WITH NO DATA;

SELECT add_continuous_aggregate_policy('downloads_by_version_per_day',
start_offset => INTERVAL '3 day',
end_offset => INTERVAL '1 day',
schedule_interval => INTERVAL '1 day');

CREATE MATERIALIZED VIEW IF NOT EXISTS downloads_by_version_per_month
WITH (timescaledb.continuous) AS
SELECT time_bucket('1 month', ts) as ts, gem_name, gem_version, sum(total) as total FROM "downloads_by_version_per_day" GROUP BY 1, "downloads_by_version_per_day"."gem_name", "downloads_by_version_per_day"."gem_version"
WITH NO DATA;

SELECT add_continuous_aggregate_policy('downloads_by_version_per_month',
start_offset => INTERVAL '3 month',
end_offset => INTERVAL '1 day',
schedule_interval => INTERVAL '1 day');
```
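
The pattern in this output is regular enough to reproduce in a few lines of plain Ruby. The sketch below is only an illustration, not the gem's implementation: `view_sql` is a hypothetical helper, and the rule that `count(*)` becomes `sum(total)` above the first level is inferred from the SQL shown above.

```ruby
# Illustrative only: rebuilds the SELECT used at each level of the
# total_downloads hierarchy. `view_sql` is a hypothetical helper.
TIMEFRAMES = %i[minute hour day month].freeze

def view_sql(scope_name, timeframe, source_table: "downloads")
  index = TIMEFRAMES.index(timeframe)
  if index.zero?
    # The first level reads the hypertable and counts raw rows.
    from = source_table
    aggregate = "count(*) as total"
  else
    # Higher levels read the previous view and add up its partial counts.
    from = "#{scope_name}_per_#{TIMEFRAMES[index - 1]}"
    aggregate = "sum(total) as total"
  end
  %(SELECT time_bucket('1 #{timeframe}', ts) as ts, #{aggregate} FROM "#{from}" GROUP BY 1)
end

puts view_sql(:total_downloads, :minute)
puts view_sql(:total_downloads, :hour)
```

Only the first level touches the `downloads` hypertable. Every later level reads the view one step below it, which is why the views have to be created from the smallest timeframe up.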

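When `drop_continuous_aggregates` is called, it drops the views of each scope from the largest timeframe down to the smallest, the reverse of the order in which they were created, so that every view is dropped before the view it reads from.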

```sql
DROP MATERIALIZED VIEW IF EXISTS total_downloads_per_month CASCADE
DROP MATERIALIZED VIEW IF EXISTS total_downloads_per_day CASCADE
DROP MATERIALIZED VIEW IF EXISTS total_downloads_per_hour CASCADE
DROP MATERIALIZED VIEW IF EXISTS total_downloads_per_minute CASCADE
DROP MATERIALIZED VIEW IF EXISTS downloads_by_gem_per_month CASCADE
DROP MATERIALIZED VIEW IF EXISTS downloads_by_gem_per_day CASCADE
DROP MATERIALIZED VIEW IF EXISTS downloads_by_gem_per_hour CASCADE
DROP MATERIALIZED VIEW IF EXISTS downloads_by_gem_per_minute CASCADE
DROP MATERIALIZED VIEW IF EXISTS downloads_by_version_per_month CASCADE
DROP MATERIALIZED VIEW IF EXISTS downloads_by_version_per_day CASCADE
DROP MATERIALIZED VIEW IF EXISTS downloads_by_version_per_hour CASCADE
DROP MATERIALIZED VIEW IF EXISTS downloads_by_version_per_minute CASCADE
```


The naming of the scopes is important because each scope name becomes part of the continuous aggregate's name, in the form `<scope_name>_per_<timeframe>`.
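
To make the naming and ordering concrete, here is a small plain-Ruby sketch that derives the twelve view names from the scope and timeframe lists used in this example. It is an illustration under the assumption that names follow the `<scope>_per_<timeframe>` pattern seen in the SQL output; `view_name` is a hypothetical helper, not a gem method.

```ruby
# Illustrative only: derives view names from scopes and timeframes, then
# lists them in creation order and in drop order.
SCOPES = %i[total_downloads downloads_by_gem downloads_by_version].freeze
TIMEFRAMES = %i[minute hour day month].freeze

# (:total_downloads, :minute) => "total_downloads_per_minute"
def view_name(scope, timeframe)
  "#{scope}_per_#{timeframe}"
end

# Smallest timeframe first, because each view reads from the previous one.
CREATE_ORDER = SCOPES.flat_map { |s| TIMEFRAMES.map { |t| view_name(s, t) } }.freeze

# Largest timeframe first within each scope, so dependents are dropped first.
DROP_ORDER = SCOPES.flat_map { |s| TIMEFRAMES.reverse.map { |t| view_name(s, t) } }.freeze

puts CREATE_ORDER.first(4).inspect
puts DROP_ORDER.first(4).inspect
```

Renaming a scope therefore renames every view built from it, so it is worth settling on scope names before the first migration runs.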


[1]: https://ideia.me/timescale-continuous-aggregates-with-ruby
164 changes: 155 additions & 9 deletions docs/models.md
@@ -2,29 +2,171 @@

ActiveRecord is the default ORM in the Ruby community. This gem provides macros that inject TimescaleDB behavior into your models, as other libraries in the Rails ecosystem do.

You need to extend the `Timescaledb::ActsAsHypertable` module in your model. Ideally, you should also include the `Timescaledb::ContinuousAggregatesHelper` module and keep the definition of the continuous aggregates separate from the model.

```ruby
class Hypertable < ActiveRecord::Base
  # Mark the class as abstract so it has no table of its own.
  self.abstract_class = true

  extend Timescaledb::ActsAsHypertable
  include Timescaledb::ContinuousAggregatesHelper
end
```

## The `acts_as_hypertable` macro

You can declare a Rails model as a hypertable by invoking the `acts_as_hypertable` macro. This macro extends your existing model with TimescaleDB-related functionality. You can pass the following options:

- `time_column`: The name of the column that will be used as the time column.
- `chunk_time_interval`: The interval at which chunks will be created.

```ruby
class Event < Hypertable
  acts_as_hypertable time_column: :created_at, chunk_time_interval: '1 day'
end
```

By default, ActsAsHypertable assumes a record's _time_column_ is called `created_at`.

## The `continuous_aggregates` macro

You can declare continuous aggregates for a Rails model by invoking the `continuous_aggregates` macro. This macro extends your existing model with TimescaleDB-related functionality for efficient data aggregation and querying.

```ruby
# Example from RubyGems server
class Download < ActiveRecord::Base
  extend Timescaledb::ActsAsHypertable
  include Timescaledb::ContinuousAggregatesHelper

  acts_as_hypertable time_column: 'ts'

  scope :total_downloads, -> { select("count(*) as total") }
  scope :downloads_by_gem, -> { select("gem_name, count(*) as total").group(:gem_name) }
  scope :downloads_by_version, -> { select("gem_name, gem_version, count(*) as total").group(:gem_name, :gem_version) }

  continuous_aggregates(
    timeframes: [:minute, :hour, :day, :month],
    scopes: [:total_downloads, :downloads_by_gem, :downloads_by_version],
    refresh_policy: {
      minute: { start_offset: "10 minutes", end_offset: "1 minute", schedule_interval: "1 minute" },
      hour: { start_offset: "4 hour", end_offset: "1 hour", schedule_interval: "1 hour" },
      day: { start_offset: "3 day", end_offset: "1 day", schedule_interval: "1 day" },
      month: { start_offset: "3 month", end_offset: "1 day", schedule_interval: "1 day" }
    })
end
```
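
Each entry in `refresh_policy` corresponds to one `add_continuous_aggregate_policy` call per view, as the SQL output in `docs/migrations.md` shows. The sketch below is a plain-Ruby illustration of that mapping, not the gem's code: `policy_sql` is a hypothetical helper, and the exact whitespace of the generated SQL is an assumption.

```ruby
# Illustrative only: turns one refresh_policy entry into the matching
# add_continuous_aggregate_policy statement. `policy_sql` is hypothetical.
REFRESH_POLICY = {
  minute: { start_offset: "10 minutes", end_offset: "1 minute", schedule_interval: "1 minute" },
  hour: { start_offset: "4 hour", end_offset: "1 hour", schedule_interval: "1 hour" }
}.freeze

def policy_sql(view_name, policy)
  # Each key becomes a named argument with an INTERVAL literal.
  args = policy.map { |key, interval| "#{key} => INTERVAL '#{interval}'" }
  "SELECT add_continuous_aggregate_policy('#{view_name}',\n  #{args.join(",\n  ")});"
end

puts policy_sql("total_downloads_per_minute", REFRESH_POLICY[:minute])
```

The policy is keyed by timeframe only, so all scopes share the same refresh settings for a given timeframe.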

#### The `create_continuous_aggregates` and `drop_continuous_aggregates` methods for migrations

The macro defines a continuous aggregate for each timeframe and scope you specify.
After defining the continuous aggregates, you can use the `create_continuous_aggregates` method to create them in the database and the `drop_continuous_aggregates` method to remove them.

```ruby
class SetupMyAmazingCaggsMigration < ActiveRecord::Migration[7.0]
  def up
    Download.create_continuous_aggregates
  end

  def down
    Download.drop_continuous_aggregates
  end
end
```

It automatically rolls up the materialized views across all timeframes and scopes, building each larger timeframe from the next smaller one.


## How rollup works

The most important part of using multiple timeframes and scopes is understanding how the rollup works.

A rollup rewrites a query so that its rows are grouped into time buckets, producing one row per bucket.

For example, if you have a scope called `total_downloads` and a timeframe of `day`, the rollup rewrites the query to group by day.

```sql
-- Original query
SELECT count(*) FROM downloads;

-- Rolled up query
SELECT time_bucket('1 day', ts) AS day, count(*) FROM downloads GROUP BY day;
```
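
To see what the bucketing does to individual rows, the following plain-Ruby sketch mimics the rolled-up query in memory. It is a simplified illustration: the `time_bucket` method here is a hypothetical stand-in that only handles fixed-width intervals by flooring epoch seconds, and the sample timestamps are made up.

```ruby
require "time"

# Illustrative only: floors a timestamp to the start of its fixed-width
# bucket. Variable-width buckets such as months are not handled here.
def time_bucket(width_in_seconds, time)
  Time.at(time.to_i - (time.to_i % width_in_seconds)).utc
end

# Hypothetical sample rows; only the timestamp matters for count(*).
DOWNLOADS = %w[
  2024-04-26T00:10:05Z
  2024-04-26T00:10:40Z
  2024-04-26T00:11:15Z
  2024-04-27T08:00:00Z
].map { |ts| Time.iso8601(ts) }.freeze

# Group rows by their 1-day bucket and count them, like the rolled-up query.
PER_DAY = DOWNLOADS.group_by { |ts| time_bucket(86_400, ts) }
                   .transform_values(&:count)

PER_DAY.each { |day, total| puts "#{day.iso8601} #{total}" }
```

The original query collapses every row into a single count, while the rolled-up form keeps one count per day.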

The `rollup` method applies this rewrite to an existing scope. Without it, the scope returns a single total:

```ruby
Download.total_downloads.map(&:attributes) # => [{"total"=>6175}]
# SELECT count(*) as total FROM "downloads"
```

Rollup to 1 minute:

```ruby
Download.total_downloads.rollup("'1 min'").map(&:attributes)
# SELECT time_bucket('1 min', ts) as ts, count(*) as total FROM "downloads" GROUP BY 1
=> [{"ts"=>2024-04-26 00:10:00 UTC, "total"=>110},
{"ts"=>2024-04-26 00:11:00 UTC, "total"=>1322},
{"ts"=>2024-04-26 00:12:00 UTC, "total"=>1461},
{"ts"=>2024-04-26 00:13:00 UTC, "total"=>1150},
{"ts"=>2024-04-26 00:14:00 UTC, "total"=>1127},
{"ts"=>2024-04-26 00:15:00 UTC, "total"=>1005}]
```

### Aggregates classes

The `continuous_aggregates` macro also creates a class for each aggregate, nested under the model.

```ruby
Download::TotalDownloadsPerMinute.all.map(&:attributes)
# SELECT "total_downloads_per_minute".* FROM "total_downloads_per_minute"
=> [{"ts"=>2024-04-26 00:10:00 UTC, "total"=>110},
{"ts"=>2024-04-26 00:11:00 UTC, "total"=>1322},
{"ts"=>2024-04-26 00:12:00 UTC, "total"=>1461},
{"ts"=>2024-04-26 00:13:00 UTC, "total"=>1150},
{"ts"=>2024-04-26 00:14:00 UTC, "total"=>1127},
{"ts"=>2024-04-26 00:15:00 UTC, "total"=>1005}]
```

The class can also roll up to other timeframes:

```ruby
Download::TotalDownloadsPerMinute.select("sum(total) as total").rollup("'2 min'").map(&:attributes)
# SELECT time_bucket('2 min', ts) as ts, sum(total) as total FROM "total_downloads_per_minute" GROUP BY 1
=> [{"ts"=>2024-04-26 00:12:00 UTC, "total"=>2611}, {"ts"=>2024-04-26 00:14:00 UTC, "total"=>2132}, {"ts"=>2024-04-26 00:10:00 UTC, "total"=>1432}]
```
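
The 2-minute totals can be checked by hand from the per-minute rows. The plain-Ruby sketch below repeats that arithmetic; the per-minute values are copied from the output above, and `two_minute_bucket` is a hypothetical helper that floors a timestamp to an even minute.

```ruby
require "time"

# Per-minute totals copied from the Download::TotalDownloadsPerMinute output.
PER_MINUTE = {
  "2024-04-26 00:10:00 UTC" => 110,
  "2024-04-26 00:11:00 UTC" => 1322,
  "2024-04-26 00:12:00 UTC" => 1461,
  "2024-04-26 00:13:00 UTC" => 1150,
  "2024-04-26 00:14:00 UTC" => 1127,
  "2024-04-26 00:15:00 UTC" => 1005
}.freeze

# Start of the 2-minute bucket that contains the given timestamp string.
def two_minute_bucket(timestamp)
  seconds = Time.parse(timestamp).to_i
  Time.at(seconds - (seconds % 120)).utc
end

# Add up the partial counts per bucket, as sum(total) does in SQL.
PER_TWO_MINUTES = PER_MINUTE.each_with_object(Hash.new(0)) do |(ts, total), sums|
  sums[two_minute_bucket(ts)] += total
end

PER_TWO_MINUTES.each { |bucket, total| puts "#{bucket} #{total}" }
```

Summing works here because counts are additive: adding the per-minute counts gives the same result as counting the raw rows again. An aggregate such as an average could not be rolled up this way without also carrying the underlying counts.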

You can also get the base query from which the continuous aggregate is created:

```ruby
Download::TotalDownloadsPerMinute.base_query.to_sql
=> "SELECT time_bucket('1 minute', ts) as ts, count(*) as total FROM \"downloads\" GROUP BY 1"
```

In a hierarchy of continuous aggregates, you can get the parent query:

```ruby
Download::TotalDownloadsPerMonth.parent_query.to_sql
=> "SELECT time_bucket('1 month', ts) as ts, sum(total) as total FROM \"total_downloads_per_day\" GROUP BY 1"
```

The `config` method returns the options you passed to the `continuous_aggregates` macro, together with the scope's name, select clause, and group-by columns:

```ruby
Download::DownloadsByGemPerMonth.config
=> {:scope_name=>:downloads_by_gem,
:select=>"gem_name, count(*) as total",
:group_by=>[:gem_name],
:refresh_policy=>
{:minute=>{:start_offset=>"10 minutes", :end_offset=>"1 minute", :schedule_interval=>"1 minute"},
:hour=>{:start_offset=>"4 hour", :end_offset=>"1 hour", :schedule_interval=>"1 hour"},
:day=>{:start_offset=>"3 day", :end_offset=>"1 day", :schedule_interval=>"1 day"},
:month=>{:start_offset=>"3 month", :end_offset=>"1 day", :schedule_interval=>"1 day"}}}
```

## Metadata from the hypertable

The `acts_as_hypertable` macro defines several methods that help you inspect TimescaleDB metadata, such as chunks and hypertable details.

### Chunks

To get all the chunks from a model's hypertable, you can use `.chunks`.
@@ -95,3 +237,7 @@ The [Scenic](https://github.com/scenic-views/scenic) gem is easy to
manage database view definitions for a Rails application. Unfortunately, TimescaleDB's continuous aggregates are more complex than regular PostgreSQL views, and the schema dumper included with Scenic can't dump a complete definition.

This gem automatically configures Scenic to use a `Timescaledb::Scenic::Adapter`, which will correctly handle schema dumping.

## Managing Continuous Aggregates

You can manage your continuous aggregates with these methods: