Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BQ Integration for historic model for AMB #21

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -106,3 +106,9 @@ ENV/
# mypy
.mypy_cache/
node_modules

# GCloud Credentials
gcloud-credentials.json

# IDEs
.vscode
60 changes: 52 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,24 @@ Execute the function in Carto:

```sql
SELECT traffico_create_tables(
'mycity', clean_tables:=FALSE
);
'mycity',
clean_tables:=FALSE,
create_mviews:=TRUE,
create_historic_agg_tables:=FALSE
);
```

In some cases you will need to specify the CARTO username, if the query above
fails then execute this:

```sql
SELECT traffico_create_tables(
'mycity',
'mycartousername',
clean_tables:=FALSE,
create_mviews:=TRUE,
create_historic_agg_tables:=FALSE
);
```

This function creates 3 tables and 3 materialized views:
Expand All @@ -32,6 +48,29 @@ This function creates 3 tables and 3 materialized views:
- {mycity}_waze_data_jams_mv
- {mycity}_waze_data_irrgs_mv

If you wish to create historic tables, set `create_historic_agg_tables` to `TRUE`.
The historic tables are:

- `{mycity}_waze_data_jams_agg_hour`: contains aggregated information of jams and irregularities
grouped by road segment.
- `{mycity}_waze_data_jams_agg_times`: contains information about start and end times of jams
and irregularities.

**Note: The tables that compose the historic model are adapted to the use cases of the AMB
project. For other cities this model will require additional changes.**

### 2.b Google Big Query model (for historic purposes)

Data processing of historic models is done with Big Query. The `db/big_query` directory
contains the necessary DDL scripts for replicating the Postgres model in Big Query and
store historic data.

The `db/big_query/jobs` directory contains the necessary scheduled queries/jobs
used to populate the aggregated historic data.

**Note: tables related to historic data are adapted to the uses cases of the AMB project.
For other cities aggregated models and jobs related to historic data will require additional changes.**

### 3. AWS Lambda deploy

Create serverless YAML config file:
Expand All @@ -46,28 +85,33 @@ Change service name with your city prefix in new YAML file:
service: carto-waze-lambda-mycity
```

AWS Lambda deploy function:
In order to enable the storage of historic data in Big Query you will need to set
the appropiate environment variables in the `serverless.yml` file and provide a
Google Cloud Application Credentials (`gcloud-credentials.json`) in the root directory.
During deployment, the credentials file will be included in the lambda package with the code.

#### AWS Lambda deploy function:

```
$ serverless deploy -v --stage prod
```

AWS Lambda invoke function:
#### AWS Lambda invoke function:

```
$ serverless invoke -f georss -l --stage prod
$ serverless invoke -f {georss, daily-aggs} -l --stage prod
```

AWS Lambda update function (without AWS CloudFormation because is slow):
#### AWS Lambda update function (without AWS CloudFormation because is slow):

```
$ serverless deploy function -f georss --stage prod -v
$ serverless deploy function -f {georss, daily-aggs} --stage prod -v
```

## Development

Run function without AWS Lambda (only for development purpose):

```
$ python3 run_handler_dev.py
$ python3 run_handler_dev.py {georss, daily-aggs}
```
9 changes: 9 additions & 0 deletions config.example.env
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,15 @@ set -o allexport
# Carto
CARTO_API_KEY=xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
CARTO_USER=xxxxxxxxxxxxx
## Set the maximum time of life, in hours, of historic data in CARTO.
## Set to 0 if you do not wish to remove historic data.
CARTO_MAX_HOURS_DATA_RETENTION=0

# Big Query
## Set this to "TRUE" if you wish to store data in Google Big Query
BIG_QUERY_ENABLE_HISTORIC=FALSE
BIG_QUERY_HISTORIC_PROJECT=xxxxx
BIG_QUERY_HISTORIC_DATASET=xxxxx

# Waze
WAZE_API_URL=xxxxxxxxxxxxxx
Expand Down
25 changes: 25 additions & 0 deletions db/big_query/alerts.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/* Substitute DATASET_NAME & PREFIX with the appropiate values */

CREATE TABLE IF NOT EXISTS
DATASET_NAME.PREFIX_waze_data_alerts
(
the_geom GEOGRAPHY,
country STRING,
city STRING,
reportdescription STRING,
confidence INT64,
reportrating INT64,
reliability INT64,
date TIMESTAMP,
street STRING,
roadtype INT64,
magvar INT64,
nthumbsup INT64,
type STRING,
subtype STRING,
uuid STRING,
jam_uuid STRING,
georss_date TIMESTAMP
)
PARTITION BY TIMESTAMP_TRUNC(georss_date, DAY)
OPTIONS(require_partition_filter=true)
15 changes: 15 additions & 0 deletions db/big_query/alerts_agg_hour.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
/* Substitute DATASET_NAME & PREFIX with the appropiate values */

CREATE TABLE IF NOT EXISTS
DATASET_NAME.PREFIX_waze_data_alerts_agg_hour
(
uuid STRING,
georss_date TIMESTAMP,
start_date TIMESTAMP,
the_geom GEOGRAPHY,
street STRING,
type STRING,
subtype STRING
)
PARTITION BY TIMESTAMP_TRUNC(georss_date, DAY)
OPTIONS(require_partition_filter=true)
11 changes: 11 additions & 0 deletions db/big_query/durations_agg.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
/* Substitute DATASET_NAME & PREFIX with the appropiate values */

CREATE TABLE IF NOT EXISTS
DATASET_NAME.PREFIX_waze_data_durations_agg
(
uuid STRING,
georss_date TIMESTAMP,
duration_seconds INT64
)
PARTITION BY TIMESTAMP_TRUNC(georss_date, DAY)
OPTIONS(require_partition_filter=true)
33 changes: 33 additions & 0 deletions db/big_query/irregularities.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/* Substitute DATASET_NAME & PREFIX with the appropiate values */

CREATE TABLE IF NOT EXISTS
DATASET_NAME.PREFIX_waze_data_irrgs
(
the_geom GEOGRAPHY,
country STRING,
city STRING,
speed FLOAT64,
regularspeed FLOAT64,
length FLOAT64,
jamlevel INT64,
severity INT64,
highway BOOL,
trend INT64,
seconds INT64,
delayseconds INT64,
detectiondate TIMESTAMP,
updatedate TIMESTAMP,
startnode STRING,
endnode STRING,
street STRING,
ncomments INT64,
nimages INT64,
nthumbsup INT64,
id INT64,
type STRING,
alertscount INT64,
alerts_uuid ARRAY<STRING>,
georss_date TIMESTAMP
)
PARTITION BY TIMESTAMP_TRUNC(georss_date, DAY)
OPTIONS(require_partition_filter=true)
25 changes: 25 additions & 0 deletions db/big_query/jams.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/* Substitute DATASET_NAME & PREFIX with the appropiate values */

CREATE TABLE IF NOT EXISTS
DATASET_NAME.PREFIX_waze_data_jams
(
the_geom GEOGRAPHY,
country STRING,
city STRING,
speed FLOAT64,
length FLOAT64,
level INT64,
delay INT64,
date TIMESTAMP,
startnode STRING,
endnode STRING,
street STRING,
roadtype INT64,
type STRING,
turntype STRING,
uuid STRING,
blockingalert_uuid STRING,
georss_date TIMESTAMP
)
PARTITION BY TIMESTAMP_TRUNC(georss_date, DAY)
OPTIONS(require_partition_filter=true)
17 changes: 17 additions & 0 deletions db/big_query/jams_agg_hour.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/* Substitute DATASET_NAME & PREFIX with the appropiate values */

CREATE TABLE IF NOT EXISTS
DATASET_NAME.PREFIX_waze_data_jams_agg_hour
(
georss_date TIMESTAMP,
ntram INT64,
avg_level INT64,
avg_speed FLOAT64,
avg_length FLOAT64,
duration_seconds INT64,
alert_types ARRAY<STRING>,
alert_subtypes ARRAY<STRING>,
road_type INT64
)
PARTITION BY TIMESTAMP_TRUNC(georss_date, DAY)
OPTIONS(require_partition_filter=true)
14 changes: 14 additions & 0 deletions db/big_query/jams_agg_levels_times.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
/* Substitute DATASET_NAME & PREFIX with the appropiate values */

CREATE TABLE IF NOT EXISTS
DATASET_NAME.PREFIX_waze_data_jams_agg_levels_times
(
id STRING,
ntram INT64,
level INT64,
start_ts TIMESTAMP,
end_ts TIMESTAMP,
avg_speed FLOAT64
)
PARTITION BY TIMESTAMP_TRUNC(start_ts, DAY)
OPTIONS(require_partition_filter=true)
11 changes: 11 additions & 0 deletions db/big_query/jams_agg_times.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
/* Substitute DATASET_NAME & PREFIX with the appropiate values */

CREATE TABLE IF NOT EXISTS
DATASET_NAME.PREFIX_waze_data_jams_agg_times
(
ntram INT64,
start_ts TIMESTAMP,
end_ts TIMESTAMP
)
PARTITION BY TIMESTAMP_TRUNC(start_ts, DAY)
OPTIONS(require_partition_filter=true)
Loading