A CKAN extension for integrating the AirFlow-based AirCan Data Factory into CKAN. Specifically, this extension provides:
- New APIs in CKAN for triggering and monitoring workflows (DAGs) in AirFlow
- Hooking key events in CKAN into the AirFlow instance. Specifically, resource creation and update trigger DAGs in AirFlow to load resource data into the DataStore. See https://tech.datopian.com/load/#ckan-v3
There are two potential cases:
- Docker for Deployment: Install this extension in the usual way, see https://tech.datopian.com/ckan/install-extension.html
- Local Development Install manually via cloning the desired commit into the docker-ckan/src directory:
[email protected]:datopian/ckanext-aircan.git
- Enable the extension in CKAN by adding the plugin
aircan_connector
toCKAN__PLUGINS
list in your.env
. Make sure to disabledatapusher
andxloader
if you have them there as AirCan replaces them. - Add details of the AirFlow instance -- details below for Local case and Cloud case
- In your
.env
file addCKAN__AIRFLOW__URL
according to Apache AirFlow REST API Reference. If you are running CKAN in a Docker container, make sure to specify the Airflow URL withhost.docker.internal
instead oflocalhost
:CKAN__AIRFLOW__URL=http://host.docker.internal:8080/api/experimental/dags/ckan_api_load_multiple_steps/dag_runs
. Also notice Airflow requires, by default, the endpointapi/experimental/dags/DAG_ID
for API access. - Add Airflow admin username and password for authorization:
CKAN__AIRFLOW__USERNAME=airflow_amin_username
andCKAN__AIRFLOW__PASSWORD=airflow_admin_password
- Also in your
.env
file, specify a temporary directory for files:CKAN__AIRFLOW__STORAGE_PATH=/tmp/
andCKAN__AIRFLOW__CLOUD=local
.
Assuming you already have a Google Cloud Composer properly set up, it is possible to trigger a DAG on GoogleCloud Platform following these steps:
-
Download your credentials file (a JSON file) from Google Cloud Platform. Convert it to a single-line JSON.
-
Set up the following environment variables on your
.env
file:CKAN__AIRFLOW__CLOUD=GCP # this line activates the integration with GCP CKAN__AIRFLOW__CLOUD__PROJECT_ID=YOUR_PROJECT_ID_ON_COMPOSER CKAN__AIRFLOW__CLOUD__LOCATION=us-east1_OR_OTHER CKAN__AIRFLOW__CLOUD__COMPOSER_ENVIRONMENT=NAME_OF_COMPOSER_ENVIRONMENT CKAN__AIRFLOW__CLOUD__WEB_UI_ID=ID_FROM_AIRFLOW_UI_ON_COMPOSER CKAN__AIRFLOW__CLOUD__GOOGLE_APPLICATION_CREDENTIALS={ YOUR SINGLE LINE CREDENTIALS JSON FILE }
Make a request to http://YOUR-CKAN:5000/api/3/action/aircan_submit?dag_name=DAG_NAME
, specifying your CKAN_API_KEY
on the header and send the following information on the body of the request, replacing the values accordingly:
{
"package_id": "YOUR_PACKAGE_ID",
"url": "http://url.for.your.resource.com",
"description": "This is the best resource ever!" ,
"schema": {
"fields": [
{
"name": "FID",
"type": "int",
"format": "default"
},
{
"name": "another-field",
"type": "float",
"format": "default"
}
]
}
}
Replace dag_name
with the DAG you want to invoke, for example, http://YOUR-CKAN:5000/api/3/action/aircan_submit?dag_name=ckan_api_load_gcp
. This will trigger the DAG ckan_api_load_gcp
.
NB: the DAG ckan_api_load_gcp
is designed for Google Cloud Composer AirFlow instance and will load a resource into the DataStore.
The endpoint http://YOUR-CKAN:5000/api/3/action/resource_create
produces the same effect of http://YOUR-CKAN:5000/api/3/action/aircan_submit?dag_name=DAG_NAME
. Make sure you set up an extra variable on your .env
file specifying the DAG you want to trigger:
# .env
# all other variables
CKAN__AIRFLOW__CLOUD__DAG_NAME=DAG_YOU_WANT_TO_TRIGGER
- Add
ckanext.aircan.load_with_postgres_copy=True
env to load with postgres copy loader. By default it loads with datastore API. - Add
ckanext.aircan.datastore_chunk_insert_rows_size=300
env variable to configure number of records to send a request to datastore. Default 250 rows. - addd
append_or_update_datastore = true
if new data schema matches with old schema append or update data, otherwise create new table - add
ckanext.aircan.enable_datastore_upload_configuration=true
to enable the upload configuration UI option. - add
ckanext.aircan.notification_to = author, maintainer, editor, [email protected]
failure email notification sent to. - add
ckanext.aircan.notification_from = [email protected]
failure notification from email. - add
ckanext.aircan.notification_subject
configure notification subject.
The aircan_status_update
API can be use to store or update the run status for given resource. It accepts the POST request with authorized user.
{
"resource_id": "a4a520aa-c790-4b53-93aa-de61e1a2813c",
"state": "progress",
"message":"Pusing dataset records.",
"dag_run_id":"394a1f0f-d8b3-47f2-9a51-08732349b785",
"error": {
"message" : "Failed to push data records."
}
}
Use aircan_status
API to get aircan run status for given resource providing resource id.
eg.
http://YOUR-CKAN:5000/api/3/action/aircan_status
{
"resource_id": "a4a520aa-c790-4b53-93aa-de61e1a2813c"
}
Test the aircan-connector with cypress.
npm install
Opens up the cypress app and you can choose the specs to run.
npm test