diff --git a/.gitignore b/.gitignore
new file mode 100644
index 00000000..dad33a45
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,4 @@
+
+target/
+dbt_modules/
+logs/
diff --git a/README.md b/README.md
new file mode 100644
index 00000000..5a50927e
--- /dev/null
+++ b/README.md
@@ -0,0 +1,58 @@
+# Tails.com's dbt Artifacts Package
+
+This package builds `fct_dbt__model_executions` and `fct_dbt__run_results` tables from dbt artifacts loaded into a table. It is compatible with Snowflake only. The models are based on the v1 schema introduced in dbt 0.19.0: https://docs.getdbt.com/reference/artifacts/dbt-artifacts/#notes
+
+## Generating the source table
+
+This package requires that the source data already exists in a table in Snowflake. How you achieve that will depend on your implementation.
+
+The author recommends generating the source table using the following query to copy from an external stage (in a snowpipe):
+
+```
+copy into ${snowflake_table.dbt_artifacts.database}.${snowflake_table.dbt_artifacts.schema}.${snowflake_table.dbt_artifacts.name}
+from (
+    select
+        $1 as data,
+        $1:metadata:generated_at::timestamp_tz as generated_at,
+        metadata$filename as path,
+        regexp_substr(metadata$filename, '([a-z_]+[.]json)$') as artifact_type
+    from @${snowflake_stage.dbt_artifacts.database}.${snowflake_stage.dbt_artifacts.schema}.${snowflake_stage.dbt_artifacts.name}
+)
+file_format = (type = 'JSON')
+```
+
+Where the external stage's prefix is a destination for all dbt artifacts.
+
+## Usage
+
+Add the package to your `packages.yml` following the instructions at https://docs.getdbt.com/docs/building-a-dbt-project/package-management/
+
+Configure the required variables in your `dbt_project.yml`:
+
+```
+vars:
+  dbt_artifacts:
+    dbt_artifacts_database: your_db
+    dbt_artifacts_schema: your_schema
+    dbt_artifacts_table: your_table
+
+models:
+  ...
+  dbt_artifacts:
+    +schema: your_destination_schema
+    +materialized: table
+    staging:
+      +schema: your_destination_schema
+      +materialized: view # The staging tables cannot be ephemeral
+```
+
+Run `dbt deps` and then run the package specifically to test with `dbt run -m dbt_artifacts`.
+
+The two fct_ tables are both [incremental](https://docs.getdbt.com/docs/building-a-dbt-project/building-models/configuring-incremental-models/).
+
+## Resources:
+- Learn more about dbt [in the docs](https://docs.getdbt.com/docs/introduction)
+- Check out [Discourse](https://discourse.getdbt.com/) for commonly asked questions and answers
+- Join the [chat](http://slack.getdbt.com/) on Slack for live discussions and support
+- Find [dbt events](https://events.getdbt.com) near you
+- Check out [the blog](https://blog.getdbt.com/) for the latest news on dbt's development and best practices
diff --git a/dbt_project.yml b/dbt_project.yml
new file mode 100644
index 00000000..aba166f8
--- /dev/null
+++ b/dbt_project.yml
@@ -0,0 +1,3 @@
+name: 'dbt_artifacts'
+version: '0.1.0'
+config-version: 2
diff --git a/models/fct_dbt__model_executions.sql b/models/fct_dbt__model_executions.sql
new file mode 100644
index 00000000..e7a86cd3
--- /dev/null
+++ b/models/fct_dbt__model_executions.sql
@@ -0,0 +1,67 @@
+-- One row per node result per `dbt run` invocation, enriched with the node's
+-- manifest metadata (materialization, schema, name). Built incrementally:
+-- only artifacts newer than the latest one already loaded are processed.
+{{ config( materialized='incremental', unique_key='model_execution_id' ) }}
+
+with models as (
+
+    select *
+    from {{ ref('stg_dbt__models') }}
+
+),
+
+model_executions as (
+
+    select *
+    from {{ ref('stg_dbt__model_executions') }}
+
+),
+
+model_executions_incremental as (
+
+    select *
+    from model_executions
+
+    {% if is_incremental() %}
+    -- this filter will only be applied on an incremental run
+    -- coalesce guards against an empty target table, where max() is null and
+    -- the comparison would otherwise filter out every row
+    where artifact_generated_at > (select coalesce(max(artifact_generated_at), '1900-01-01') from {{ this }})
+    {% endif %}
+
+),
+
+model_executions_with_materialization as (
+
+    select
+        model_executions_incremental.*,
+        models.model_materialization,
+        models.model_schema,
+        models.name
+    from model_executions_incremental
+    left join models on (
+        model_executions_incremental.command_invocation_id = models.command_invocation_id
+        and model_executions_incremental.node_id = models.node_id
+    )
+
+),
+
+fields as (
+
+    select
+        model_execution_id,
+        command_invocation_id,
+        artifact_generated_at,
+        was_full_refresh,
+        node_id,
+        status,
+        execution_time,
+        rows_affected,
+        model_materialization,
+        model_schema,
+        name
+    from model_executions_with_materialization
+
+)
+
+select * from fields
\ No newline at end of file
diff --git a/models/fct_dbt__run_results.sql b/models/fct_dbt__run_results.sql
new file mode 100644
index 00000000..19847155
--- /dev/null
+++ b/models/fct_dbt__run_results.sql
@@ -0,0 +1,49 @@
+-- One row per `dbt run` invocation, taken from run_results.json artifacts.
+-- Every environment-variable key found in those artifacts becomes an env_<key> column.
+-- NOTE(review): the env_* column list is computed at compile time, so a new key
+-- appearing after the first build presumably needs a --full-refresh; confirm.
+{{ config( materialized='incremental', unique_key='command_invocation_id' ) }}
+
+{% set env_keys = dbt_utils.get_column_values(table=ref('stg_dbt__run_results_env_keys'), column='key') %}
+
+with run_results as (
+
+    select *
+    from {{ ref('stg_dbt__run_results') }}
+
+),
+
+incremental_run_results as (
+
+    select *
+    from run_results
+
+    {% if is_incremental() %}
+    -- this filter will only be applied on an incremental run
+    -- coalesce guards against an empty target table, where max() is null and
+    -- the comparison would otherwise filter out every row
+    where artifact_generated_at > (select coalesce(max(artifact_generated_at), '1900-01-01') from {{ this }})
+    {% endif %}
+
+),
+
+fields as (
+
+    select
+        artifact_generated_at,
+        command_invocation_id,
+        dbt_version,
+        elapsed_time,
+        execution_command,
+        selected_models,
+        target,
+        was_full_refresh
+
+        {% for key in env_keys %}
+        ,env:{{ key }} as env_{{ key }}
+        {% endfor %}
+    from incremental_run_results
+
+)
+
+select * from fields
\ No newline at end of file
diff --git a/models/schemas.yml b/models/schemas.yml
new file mode 100644
index 00000000..3f39dc08
--- /dev/null
+++ b/models/schemas.yml
@@ -0,0 +1,57 @@
+version: 2
+
+models:
+
+  - name: fct_dbt__model_executions
+    description: All historic dbt model executions.
+    columns:
+      - name: model_execution_id
+        description: Primary key.
+        tests:
+          - unique
+          - not_null
+      - name: name
+        description: The name of the model.
+      - name: model_schema
+        description: The schema containing the model.
+      - name: was_full_refresh
+        description: Was this model executed with a --full-refresh flag?
+      - name: model_materialization
+        description: The configured materialization of the model.
+      - name: execution_time
+        description: How long did the model take to run?
+      - name: status
+        description: success/fail status of the model's execution.
+      - name: command_invocation_id
+        description: Foreign key to fct_dbt__run_results. The id of the command which resulted in the source artifact's generation.
+      - name: artifact_generated_at
+        description: Timestamp of when the source artifact was generated.
+      - name: node_id
+        description: Unique id for the node, in the form of model.[package_name].[model_name]
+      - name: rows_affected
+        description: The number of rows affected by the model's execution. Always 1 for non-incremental executions.
+
+  - name: fct_dbt__run_results
+    description: Metadata for dbt run commands.
+    columns:
+      - name: command_invocation_id
+        description: The id of the command which resulted in the source artifact's generation.
+        tests:
+          - unique
+          - not_null
+      - name: artifact_generated_at
+        description: Timestamp of when the source artifact was generated.
+      - name: dbt_version
+        description: The version of dbt used to generate the source artifact.
+      - name: elapsed_time
+        description: The total run time of the command.
+      - name: execution_command
+        description: The actual command used.
+      - name: selected_models
+        description: A list of model selectors used in the command.
+      - name: target
+        description: The configured target for the command.
+      - name: was_full_refresh
+        description: Was the run executed with a --full-refresh flag?
+      - name: env_*
+        description: Columns for the environment variables set when the command was executed.
diff --git a/models/staging/sources.yml b/models/staging/sources.yml
new file mode 100644
index 00000000..5be2b41b
--- /dev/null
+++ b/models/staging/sources.yml
@@ -0,0 +1,20 @@
+version: 2
+
+sources:
+  - name: dbt_artifacts
+    database: "{{ var('dbt_artifacts_database') }}"
+    schema: "{{ var('dbt_artifacts_schema') }}"
+    tables:
+      - name: artifacts
+        identifier: "{{ var('dbt_artifacts_table') }}"
+        description: |
+          The source table containing loaded dbt artifacts. All of the artifacts must be loaded into this table. See the README for more info.
+        columns:
+          - name: data
+            description: A variant type object containing all the artifact's data.
+          - name: generated_at
+            description: Timestamp for when the artifact was generated.
+          - name: path
+            description: The path of the artifact in the external stage.
+          - name: artifact_type
+            description: The type of the artifact, e.g. manifest.json
\ No newline at end of file
diff --git a/models/staging/stg_dbt__artifacts.sql b/models/staging/stg_dbt__artifacts.sql
new file mode 100644
index 00000000..29251e6a
--- /dev/null
+++ b/models/staging/stg_dbt__artifacts.sql
@@ -0,0 +1,21 @@
+-- Pass-through over the raw artifacts source table, selecting the four
+-- columns that the downstream staging models read.
+with base as (
+
+    select *
+    from {{ source('dbt_artifacts', 'artifacts') }}
+
+),
+
+fields as (
+
+    select
+        data,
+        generated_at,
+        path,
+        artifact_type
+    from base
+
+)
+
+select * from fields
\ No newline at end of file
diff --git a/models/staging/stg_dbt__model_executions.sql b/models/staging/stg_dbt__model_executions.sql
new file mode 100644
index 00000000..5f7e444e
--- /dev/null
+++ b/models/staging/stg_dbt__model_executions.sql
@@ -0,0 +1,57 @@
+-- One row per node result inside each run_results.json artifact produced by
+-- `dbt run`. The results array is flattened and each row is keyed by a
+-- surrogate of the invocation id and the node id.
+with base as (
+
+    select *
+    from {{ ref('stg_dbt__artifacts') }}
+
+),
+
+run_results as (
+
+    select *
+    from base
+    where artifact_type = 'run_results.json'
+
+),
+
+dbt_run as (
+
+    select *
+    from run_results
+    where data:args:which = 'run'
+
+),
+
+fields as (
+
+    select
+        data:metadata:invocation_id::string as command_invocation_id,
+        generated_at as artifact_generated_at,
+        coalesce(data:args:full_refresh, 'false')::boolean as was_full_refresh,
+        result.value:unique_id::string as node_id,
+        result.value:status::string as status,
+        result.value:execution_time::float as execution_time,
+        result.value:adapter_response:rows_affected::int as rows_affected
+    from dbt_run,
+        lateral flatten(input => data:results) as result
+
+),
+
+surrogate_key as (
+
+    select
+        {{ dbt_utils.surrogate_key(['command_invocation_id', 'node_id']) }} as model_execution_id,
+        command_invocation_id,
+        artifact_generated_at,
+        was_full_refresh,
+        node_id,
+        status,
+        execution_time,
+        rows_affected
+    from fields
+
+)
+
+select * from surrogate_key
\ No newline at end of file
diff --git a/models/staging/stg_dbt__models.sql b/models/staging/stg_dbt__models.sql
new file mode 100644
index 00000000..e9f06be5
--- /dev/null
+++ b/models/staging/stg_dbt__models.sql
@@ -0,0 +1,55 @@
+-- One row per model node inside each manifest.json artifact, with the
+-- model's name, schema, package, path, checksum and materialization.
+with base as (
+
+    select *
+    from {{ ref('stg_dbt__artifacts') }}
+
+),
+
+manifests as (
+
+    select *
+    from base
+    where artifact_type = 'manifest.json'
+
+),
+
+flatten as (
+
+    select
+        data:metadata:invocation_id::string as command_invocation_id,
+        generated_at as artifact_generated_at,
+        node.key as node_id,
+        node.value:name::string as name,
+        node.value:schema::string as model_schema,
+        node.value:package_name::string as package_name,
+        node.value:path::string as model_path,
+        node.value:checksum.checksum::string as checksum,
+        node.value:config.materialized::string as model_materialization
+    from manifests,
+        lateral flatten(input => data:nodes) as node
+    where node.value:resource_type = 'model'
+
+),
+
+surrogate_key as (
+
+    select
+        -- node_id is the key of the manifest's nodes object, so it is unique within
+        -- an artifact; the checksum need not be (e.g. models with identical contents)
+        {{ dbt_utils.surrogate_key(['command_invocation_id', 'node_id']) }} as manifest_model_id,
+        command_invocation_id,
+        artifact_generated_at,
+        node_id,
+        name,
+        model_schema,
+        package_name,
+        model_path,
+        checksum,
+        model_materialization
+    from flatten
+
+)
+
+select * from surrogate_key
diff --git a/models/staging/stg_dbt__run_results.sql b/models/staging/stg_dbt__run_results.sql
new file mode 100644
index 00000000..7ee52f11
--- /dev/null
+++ b/models/staging/stg_dbt__run_results.sql
@@ -0,0 +1,42 @@
+-- One row per run_results.json artifact produced by `dbt run`, exposing
+-- invocation metadata, command arguments and the raw env object.
+with base as (
+
+    select *
+    from {{ ref('stg_dbt__artifacts') }}
+
+),
+
+run_results as (
+
+    select *
+    from base
+    where artifact_type = 'run_results.json'
+
+),
+
+dbt_run as (
+
+    select *
+    from run_results
+    where data:args:which = 'run'
+
+),
+
+fields as (
+
+    select
+        generated_at as artifact_generated_at,
+        data:metadata:invocation_id::string as command_invocation_id,
+        data:metadata:dbt_version::string as dbt_version,
+        data:metadata:env as env,
+        data:elapsed_time::float as elapsed_time,
+        data:args:which::string as execution_command,
+        coalesce(data:args:full_refresh, 'false')::boolean as was_full_refresh,
+        data:args:models as selected_models,
+        data:args:target::string as target
+    from dbt_run
+
+)
+
+select * from fields
\ No newline at end of file
diff --git a/models/staging/stg_dbt__run_results_env_keys.sql b/models/staging/stg_dbt__run_results_env_keys.sql
new file mode 100644
index 00000000..cd1a6f7b
--- /dev/null
+++ b/models/staging/stg_dbt__run_results_env_keys.sql
@@ -0,0 +1,35 @@
+-- Distinct set of environment-variable keys found in the metadata of
+-- run_results.json artifacts produced by `dbt run`.
+with base as (
+
+    select *
+    from {{ ref('stg_dbt__artifacts') }}
+
+),
+
+run_results as (
+
+    select *
+    from base
+    where artifact_type = 'run_results.json'
+
+),
+
+dbt_run as (
+
+    select *
+    from run_results
+    where data:args:which = 'run'
+
+),
+
+env_keys as (
+
+    select distinct
+        env.key
+    from dbt_run,
+        lateral flatten(input => data:metadata:env) as env
+
+)
+
+select * from env_keys
\ No newline at end of file
diff --git a/packages.yml b/packages.yml
new file mode 100644
index 00000000..908557a0
--- /dev/null
+++ b/packages.yml
@@ -0,0 +1,3 @@
+packages:
+  - package: fishtown-analytics/dbt_utils
+    version: [">=0.6.0", "<0.7.0"]
\ No newline at end of file