This repository contains the dependencies and code necessary to run Spark extract jobs targeting Cook County's iasWorld property system-of-record. It is a replacement for service-sqoop-iasworld, which is now deprecated.
Each Spark job pulls an iasWorld table (or part of a table) via JDBC and writes it as Hive-partitioned Parquet files to AWS S3. The Data Department then queries the Parquet files using AWS Athena, providing a 1-1 mirror of the system-of-record for analytical queries.
Jobs are submitted in "batches" (called applications by Spark). Each batch may contain multiple extract jobs. Once all jobs for a batch are complete, we also (optionally) trigger four additional processes. In order:
- Upload the extracted Parquet files to AWS S3, using the bucket and prefix specified in the `.env` file.
- Run an AWS Glue crawler to update table data types and/or partitions in the Glue data catalog (which powers Athena). This step runs only if new files are uploaded, i.e. files not previously seen on S3.
- Run a dbt testing workflow on GitHub Actions. This automatically tests the iasWorld data for issues and outputs the results to various tables and reports.
- Upload the final logs to AWS CloudWatch.
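As a rough illustration of the Glue crawler step, the sketch below triggers a crawler directly with `boto3`. It is not the repository's actual `src/utils/aws.py` implementation, and the crawler name and region are assumptions.

```python
import boto3

# Minimal sketch: kick off the Glue crawler that refreshes the data catalog.
# The crawler name and region are placeholders, not values from this repository.
glue = boto3.client("glue", region_name="us-east-1")

try:
    glue.start_crawler(Name="iasworld-parquet-crawler")
except glue.exceptions.CrawlerRunningException:
    # The crawler is already running; nothing to do
    pass
```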
> [!NOTE]
> Before attempting to submit batches to the cluster, first make sure the Spark Docker Compose stack is active by running `docker compose up -d` in this repository. Also make sure all secret and `.env` files are populated; see Files not included for more information.
`service-spark-iasworld` job batches are submitted via JSON, either as a string or as a file. All batches should have the format below. Note that the name of the job itself (e.g. `job2` below) is arbitrary.
```json
{
  "addn": {
    "table_name": "iasworld.addn",
    "min_year": 2020,
    "max_year": 2024,
    "cur": ["Y", "D"],
    "predicates_path": "default_predicates.sql"
  },
  "job2": {
    "table_name": "iasworld.asmt_all",
    "min_year": 2021,
    "max_year": 2021,
    "cur": ["Y"],
    "predicates_path": "default_predicates.sql"
  }
}
```
- `table_name` (required) - Name of the iasWorld table to extract. Must be prefixed with `iasworld.` (or `ias.` for the test environment).
- `min_year` (optional) - Minimum tax year (inclusive) to extract from the table. Set to `null` in a job definition to ignore this column when filtering and partitioning. Defaults to `1999`.
- `max_year` (optional) - Maximum tax year (inclusive) to extract from the table. To extract a single year, set `min_year` and `max_year` to the same value. Set to `null` in a job definition to ignore this column when filtering and partitioning. Defaults to the current year.
- `cur` (optional) - Values of the `cur` column to extract from the table. Must be an array. Set to `null` in a job definition to ignore this column when filtering and partitioning. Defaults to `["Y", "N", "D"]`.
- `predicates_path` (optional) - String path to a SQL file within the `config/` directory. The SQL file should define SQL BETWEEN expressions, where each expression is one chunk that Spark will extract during JDBC reads. Expressions should not overlap. Set to `null` in a job definition to disable predicates completely. Defaults to `default_predicates.sql`.
The example batch above contains two separate jobs, one per table. If you want to add additional tables/jobs to the batch, you can manually add the corresponding table objects and modify the fields as listed above.
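For illustration, a batch can also be assembled programmatically and handed to `submit.sh` via `--json-file`. This is a minimal sketch, reusing the container name, script path, and field values shown elsewhere in this README; it is not part of the repository itself.

```python
import json
import subprocess

# Build a batch using the format documented above; job names are arbitrary
batch = {
    "addn": {
        "table_name": "iasworld.addn",
        "min_year": 2020,
        "max_year": 2024,
        "cur": ["Y", "D"],
        "predicates_path": "default_predicates.sql",
    },
}

# Write the batch to a JSON file, then submit it to the Spark master container
with open("/tmp/jobs.json", "w") as f:
    json.dump(batch, f, indent=2)

subprocess.run(
    ["docker", "exec", "spark-node-master", "./submit.sh", "--json-file", "/tmp/jobs.json"],
    check=True,
)
```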
In practice, modifying JSON by hand is a bit of a pain, so we store long-lived batch and job definitions in YAML, then convert them to JSON using `yq`.

The file `config/default_jobs.yaml` contains definitions for three common job batches:
- A daily batch that pulls the most recent 2 years of each critical table.
- A weekend batch that pulls all tables and years.
- A test batch that pulls a subset of tables with representative situations.
Batches are submitted to the Spark Docker cluster via the command line. The main job submission argument is either `--json-string` or `--json-file`. For example, to submit the test jobs in `config/default_jobs.yaml` via `--json-string`, run the following command:
```bash
docker exec spark-node-master ./submit.sh \
    --json-string "$(yq -o=json .test_jobs ./config/default_jobs.yaml)"
```
Or from a file:
```bash
yq -o=json .test_jobs ./config/default_jobs.yaml > /tmp/jobs.json
docker exec spark-node-master ./submit.sh --json-file /tmp/jobs.json
```
The command line interface also has multiple optional flags:
- `--extract-target` - iasWorld target environment to extract data from. Must be one of `prod` or `test`. Defaults to `prod`.
- `--run-github-workflow/--no-run-github-workflow` - Run the `test_dbt_models` workflow on batch completion?
- `--run-glue-crawler/--no-run-glue-crawler` - Run the iasWorld Glue crawler on batch completion?
- `--upload-data/--no-upload-data` - Upload extracted data to the iasWorld S3 bucket?
- `--upload-logs/--no-upload-logs` - Upload batch logs to AWS CloudWatch?
The default values for these flags are set in the `config/default_settings.yaml` file. The boolean flags are all `True` by default.
Spark automatically attempts to mirror the data types within iasWorld using its own equivalent types. However, it occasionally chooses an incorrect or undesirable type. In such cases, this repository provides a hierarchical system of column-level schema/type overrides, with each level overriding the previous one:
- By default, all `NUMBER` Oracle types are converted to `DECIMAL(10,0)` and `TIMESTAMP` Oracle types are converted to `STRING`. This behavior is skipped for columns with an override specified via the options below.
- Global schema overrides apply to all columns of a given name across all tables. They can be specified even for columns that do not exist in every table. They are defined in `config/default_settings.yaml`.
- Table schema overrides apply only to the columns of a single table. They take precedence over all other overrides. They are defined in `config/table_definitions.yaml`.
> [!WARNING]
> `NUMBER` types are implicitly converted to `DECIMAL(10,0)` because, as of 2024, all `NUMBER` columns without a specified precision and scale are actually just integers. If this changes in the future, it's possible that we could begin to silently truncate numbers via this implicit type conversion. As such, stay on top of schema updates from the iasWorld team.
Predicates, filters, and partitions are Spark concepts used to construct individual jobs in a batch. They are mostly handled automatically, but you may need to change them in rare cases. The list below outlines the role of each concept and how to change them if needed:
- Predicates are SQL statements used to chunk a table during reads against the iasWorld database. The statements define mutually exclusive queries that run in parallel (in order to speed up query execution). Predicates are defined via a file of SQL statements in the `config/` directory, then passed to each table job via a file path.
- Filters are logical conditions included in queries to the database. Spark uses predicate pushdown to compose the predicates and filter for each query into a single SQL statement. Think of filters as a SQL WHERE clause applied across all the predicate chunks specified above. Filters are constructed automatically from any `min_year`, `max_year`, and/or `cur` values passed as part of a job definition. If these values are all null, then the entire table is returned.
- Partitions define how the output Parquet files returned from each job should be broken up. We use Hive partitioning by default, which yields partitions with the structure `$TABLE/taxyr=$YEAR/cur=$CUR_VALUE/part-0.parquet`. Like filters, partitions are determined automatically via any `min_year`, `max_year`, and/or `cur` values that are set. If these values are all null, then the table is returned as a single file, e.g. `$TABLE/part-0.parquet`.
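To see how the three concepts fit together, here is a simplified PySpark sketch of a single table extract. The connection details, predicate column, and output path are assumptions for illustration; this is not the repository's actual job code.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Predicates: mutually exclusive BETWEEN expressions, normally read from a SQL
# file in config/ (e.g. default_predicates.sql). The column and bounds here are
# placeholders.
predicates = [
    "SUBSTR(parid, 1, 2) BETWEEN '01' AND '05'",
    "SUBSTR(parid, 1, 2) BETWEEN '06' AND '10'",
]

# Each predicate becomes one parallel JDBC read (connection details are assumed)
df = spark.read.jdbc(
    url="jdbc:oracle:thin:@//iasworld-host:1521/SERVICE",
    table="iasworld.addn",
    predicates=predicates,
    properties={
        "user": "user",
        "password": "password",
        "driver": "oracle.jdbc.OracleDriver",
    },
)

# Filters: pushed down to the database alongside the predicates, built from the
# job's min_year/max_year/cur values
df = df.filter((df.taxyr >= 2020) & (df.taxyr <= 2024) & df.cur.isin(["Y", "D"]))

# Partitions: write Hive-partitioned Parquet files with the structure
# $TABLE/taxyr=$YEAR/cur=$CUR_VALUE/part-0.parquet
df.write.partitionBy("taxyr", "cur").parquet("target/initial/addn")
```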
Some necessary setup and credential files are not included in this repository for security or licensing reasons. Templated versions are included for instructional purposes. If you want to use this repository, you will need to populate the following:
- `drivers/ojdbc8.jar` - The JDBC driver for our Oracle backend, which can be found for free on Oracle's site.
- `secrets/` - Credential files needed to connect to other systems.
- `.env` - Sets a few non-critical but still private options.
Batches are currently scheduled via `cron`. To edit the schedule file, use `crontab -e` as the main server user. The example crontab file below schedules daily jobs for frequently updated tables and weekly jobs for rarely updated tables. Note that the jobs currently must be run as user 1003.
```bash
# Extract recent years from frequently used tables on weekdays at 1 AM CST
0 6 * * 1,2,3,4,5 docker exec spark-node-master ./submit.sh --json-string "$(yq -o=json .default_jobs /full/path/to/default_jobs.yaml)"

# Extract all tables on Saturday at 1 AM CST
0 6 * * 6 docker exec spark-node-master ./submit.sh --json-string "$(yq -o=json .weekend_jobs /full/path/to/default_jobs.yaml)"

# Extract all test environment tables on Sunday at 1 AM CST
0 6 * * 7 docker exec spark-node-master ./submit.sh --json-string "$(yq -o=json .weekend_jobs_test /full/path/to/default_jobs.yaml)" --no-run-github-workflow --extract-target test
```
Here's a breakdown of important files and the purpose of each one:
```
.
├── docker-compose.yaml - Defines the Spark nodes, environment, and networking
├── Dockerfile - Defines dependencies bundled in each Spark node
├── .env - Runtime configuration variables passed to containers
├── pyproject.toml - Project metadata and tool settings
├── README.md - This file!
├── run.sh - Entrypoint shell script to create Spark jobs
├── .github/ - GitHub Actions workflows for linting, builds, etc.
├── config/
│   ├── default_jobs.yaml - Define batches of Spark jobs (one per table)
│   ├── default_predicates.sql - List of mutually exclusive SQL BETWEEN expressions
│   ├── default_settings.yaml - Runtime defaults and schema overrides
│   ├── spark-defaults.conf - Spark memory and driver settings
│   └── table_definitions.yaml - Possible job values per table and schema overrides
├── drivers/
│   └── ojdbc8.jar - Not included, but necessary to connect to iasWorld
├── secrets/
│   ├── AWS_CREDENTIALS_FILE - AWS credentials config file specific to this job
│   ├── GH_PEM - GitHub PEM file used to authorize workflow dispatch
│   └── IPTS_PASSWORD - Password file loaded at runtime into containers
├── src/
│   ├── submit_jobs.py - Job submission entrypoint. Takes JSON as input
│   ├── submit.sh - Helper to launch jobs using spark-submit
│   └── utils/
│       ├── aws.py - AWS client class for triggering Glue crawlers
│       ├── github.py - GitHub client class for running Actions workflows
│       ├── helpers.py - Miscellaneous helper functions
│       └── spark.py - Spark job and session classes
└── target/
    ├── final/ - Landing directory after Parquet repartitioning
    └── initial/ - Landing directory for initial JDBC read output
```