Note — Eventarc for Cloud Run for Anthos is currently a feature in private preview. Only whitelisted projects can currently take advantage of it. Please stay tuned for the public preview!
In this sample, we'll build a BigQuery processing pipeline that queries a public dataset on a schedule, creates charts from the data, and then notifies users about the new charts via SendGrid, with Eventarc connecting the services.
- Two Cloud Scheduler jobs are set up to call the QueryRunner service once a day for two countries via the Pub/Sub topic queryscheduled.
- QueryRunner receives the scheduler event for each country, queries Covid-19 cases for that country using BigQuery's public Covid-19 dataset, and saves the result in a separate BigQuery table. Once done, QueryRunner sends a Pub/Sub message to the querycompleted topic.
- ChartCreator receives the event from the querycompleted topic, creates a chart from the BigQuery data using Matplotlib, and saves it to a Cloud Storage bucket.
- Notifier receives the Cloud Storage event from the bucket via an AuditLog trigger and sends an email notification to users using SendGrid.
Before deploying services and triggers, go through some setup steps.
Make sure that the project ID is set:
gcloud config set project [YOUR-PROJECT-ID]
Enable all necessary services:
gcloud services enable
gcloud services enable
gcloud services enable
gcloud services enable
You will use an Audit Logs trigger for Cloud Storage. Make sure the Admin Read, Data Read, and Data Write log types are enabled for Cloud Storage.
Set the cluster, cluster location, and platform for Cloud Run, and the location for Eventarc:
gcloud config set run/cluster $CLUSTER_NAME
gcloud config set run/cluster_location $CLUSTER_LOCATION
gcloud config set run/platform gke
gcloud config set eventarc/location $CLUSTER_LOCATION
Create a GKE cluster with the Cloud Run for Anthos add-on and Workload Identity (WI) enabled. WI is the recommended way to access Google Cloud services from applications running within GKE due to its improved security properties and manageability. It is also needed to properly set up the Event Forwarder of Eventarc. See Using Workload Identity for more details.
gcloud beta container clusters create $CLUSTER_NAME \
--addons=HttpLoadBalancing,HorizontalPodAutoscaling,CloudRun \
--machine-type=n1-standard-4 \
--enable-autoscaling --min-nodes=3 --max-nodes=10 \
--no-issue-client-certificate --num-nodes=3 \
--enable-stackdriver-kubernetes \
--scopes=cloud-platform,logging-write,monitoring-write,pubsub \
--zone us-central1 \
--release-channel=rapid \
--workload-pool=$PROJECT_ID.svc.id.goog
Make sure the apps we'll deploy can authenticate to Google Cloud using Workload Identity. To do this, configure a Kubernetes service account (e.g. the default Kubernetes service account) to act as a Google service account (e.g. the default compute service account).
Allow the default Kubernetes service account to impersonate the default Google compute service account by creating an IAM policy binding between the two:
PROJECT_NUMBER="$(gcloud projects describe $(gcloud config get-value project) --format='value(projectNumber)')"
gcloud iam service-accounts add-iam-policy-binding \
--role roles/iam.workloadIdentityUser \
--member "serviceAccount:$PROJECT_ID.svc.id.goog[default/default]" \
$PROJECT_NUMBER-compute@developer.gserviceaccount.com
Add the iam.gke.io/gcp-service-account annotation to the Kubernetes service account, using the email address of the Google service account:
kubectl annotate serviceaccount \
--namespace default \
default \
iam.gke.io/gcp-service-account=$PROJECT_NUMBER-compute@developer.gserviceaccount.com
Enable GKE destinations in Eventarc by creating a service account and binding the required roles with this command:
gcloud eventarc gke-destinations init
Create another service account with the roles/pubsub.subscriber and roles/monitoring.metricWriter roles. This is the minimum needed for Pub/Sub triggers. For AuditLog triggers, you also need roles/eventarc.eventReceiver:
gcloud iam service-accounts create $TRIGGER_GSA
gcloud projects add-iam-policy-binding $PROJECT_ID \
--member "serviceAccount:$TRIGGER_GSA@$PROJECT_ID.iam.gserviceaccount.com" \
--role "roles/pubsub.subscriber"
gcloud projects add-iam-policy-binding $PROJECT_ID \
--member "serviceAccount:$TRIGGER_GSA@$PROJECT_ID.iam.gserviceaccount.com" \
--role "roles/monitoring.metricWriter"
gcloud projects add-iam-policy-binding $PROJECT_ID \
--member "serviceAccount:$TRIGGER_GSA@$PROJECT_ID.iam.gserviceaccount.com" \
--role "roles/eventarc.eventReceiver"
You will use this service account in the Pub/Sub and Audit Log triggers later.
Create a unique storage bucket to save the charts. Make sure the bucket and the charts in it are public, and that the bucket is in the same region as the Eventarc trigger location:
gsutil mb -l $CLUSTER_LOCATION gs://$BUCKET
gsutil uniformbucketlevelaccess set on gs://$BUCKET
gsutil iam ch allUsers:objectViewer gs://$BUCKET
This service receives the Cloud Storage events and uses SendGrid to email users that a new chart has been created. You need to set up a SendGrid account and create an API key. You can follow this doc for more details on how to set up SendGrid.
The code of the service is in the notifier folder.
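To make the Notifier's job concrete, here is a minimal Python sketch of turning an Audit Log event into the content of a notification. It relies on the Cloud Storage audit log resourceName format (projects/_/buckets/BUCKET/objects/OBJECT) and the public object URL format; the function names, the comma-separated TO_EMAILS convention, and the sample bucket and object are assumptions for illustration, and the actual SendGrid call is left out.

```python
import json


def chart_url_from_audit_log(event_data: dict) -> str:
    """Build the public URL of the object named in a Cloud Storage audit log entry."""
    # resourceName looks like: projects/_/buckets/<bucket>/objects/<object>
    resource_name = event_data["protoPayload"]["resourceName"]
    # Limit the split so object names containing "/" stay intact.
    _, _, _, bucket, _, name = resource_name.split("/", 5)
    return f"https://storage.googleapis.com/{bucket}/{name}"


def build_notification(event_data: dict, to_emails: str) -> dict:
    """Assemble the email fields; sending them through SendGrid is not shown."""
    url = chart_url_from_audit_log(event_data)
    return {
        # Assumption: TO_EMAILS may hold several comma-separated addresses.
        "to": [addr.strip() for addr in to_emails.split(",") if addr.strip()],
        "subject": "A new chart is available",
        "body": f"A new chart is available for you to view: {url}",
    }


# Hypothetical event payload for an uploaded chart.
sample_event = {
    "protoPayload": {
        "methodName": "storage.objects.create",
        "resourceName": "projects/_/buckets/my-charts/objects/chart-cyprus.png",
    }
}

notification = build_notification(sample_event, "first@example.com, second@example.com")
print(json.dumps(notification, indent=2))
```

Because the bucket is public, the URL in the email body can be opened directly by the recipients.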
Inside the notifier/python folder, build and push the container image:
docker build -t$PROJECT_ID/$SERVICE_NAME:v1 .
Deploy the service, setting TO_EMAILS to the email address where you want to send the notification and SENDGRID_API_KEY to your SendGrid API key.
[email protected]
gcloud run deploy $SERVICE_NAME \
The trigger of the service filters on Audit Logs for Cloud Storage events with a methodName of storage.objects.create.
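As a rough mental model (a sketch under stated assumptions, not Eventarc's actual implementation), a trigger forwards an event only when every filter attribute matches the event exactly. The Python below illustrates this with the Audit Log attributes type, serviceName, and methodName; the matches helper and the sample events are hypothetical.

```python
def matches(filters: dict, attributes: dict) -> bool:
    """Return True when every filter key is present with exactly the same value."""
    return all(attributes.get(key) == value for key, value in filters.items())


# Filters comparable to the ones used for a Cloud Storage Audit Log trigger.
trigger_filters = {
    "type": "google.cloud.audit.log.v1.written",
    "serviceName": "storage.googleapis.com",
    "methodName": "storage.objects.create",
}

# Hypothetical event attributes: one object upload and one object read.
upload_event = dict(trigger_filters)
read_event = {**trigger_filters, "methodName": "storage.objects.get"}

print(matches(trigger_filters, upload_event))
print(matches(trigger_filters, read_event))
```

Only the upload event satisfies all three filters, so only chart uploads would reach the Notifier.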
Create the trigger:
gcloud eventarc triggers create $TRIGGER_NAME \
--destination-gke-cluster=$CLUSTER_NAME \
--destination-gke-location=$CLUSTER_LOCATION \
--destination-gke-namespace=default \
--destination-gke-service=$SERVICE_NAME \
--destination-gke-path=/ \
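--event-filters="type=google.cloud.audit.log.v1.written" \
--event-filters="serviceName=storage.googleapis.com" \
--event-filters="methodName=storage.objects.create" \
--service-account=$TRIGGER_GSA@$PROJECT_ID.iam.gserviceaccount.com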
This service receives the custom event from Query Runner, queries the BigQuery table for the requested country, and creates a chart from the data using the Matplotlib library. Finally, the chart is uploaded to a public bucket in Cloud Storage.
The code of the service is in the chart-creator folder.
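The data preparation step before plotting can be sketched in plain Python. This is only an illustration: the row field names (date, num_reports) and the chart object naming scheme are assumptions, and the plotting and upload calls themselves are omitted.

```python
from datetime import date


def to_series(rows):
    """Turn query rows into date-ordered x/y lists ready to hand to a plotting call."""
    ordered = sorted(rows, key=lambda row: row["date"])
    xs = [row["date"] for row in ordered]
    ys = [row["num_reports"] for row in ordered]
    return xs, ys


def chart_object_name(country: str) -> str:
    """Hypothetical naming scheme for the chart object stored in the bucket."""
    return f"chart-{country.lower().replace(' ', '-')}.png"


# Hypothetical rows, deliberately out of order, standing in for a query result.
rows = [
    {"date": date(2021, 1, 3), "num_reports": 30},
    {"date": date(2021, 1, 1), "num_reports": 10},
    {"date": date(2021, 1, 2), "num_reports": 20},
]

xs, ys = to_series(rows)
object_name = chart_object_name("United Kingdom")
# A plotting library would draw ys against xs here and save the figure
# under object_name before uploading it to the bucket.
print(object_name, xs[0].isoformat(), ys)
```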
Inside the chart-creator/python folder, build and push the container image:
docker build -t$PROJECT_ID/$SERVICE_NAME:v1 .
Deploy the service, setting BUCKET to the bucket you created earlier:
gcloud run deploy $SERVICE_NAME \
--update-env-vars BUCKET=$BUCKET
Create a Pub/Sub trigger:
gcloud eventarc triggers create $TRIGGER_NAME \
--destination-gke-cluster=$CLUSTER_NAME \
--destination-gke-location=$CLUSTER_LOCATION \
--destination-gke-namespace=default \
--destination-gke-service=$SERVICE_NAME \
--destination-gke-path=/ \
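--event-filters="type=google.cloud.pubsub.topic.v1.messagePublished" \
--service-account=$TRIGGER_GSA@$PROJECT_ID.iam.gserviceaccount.com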
Set the Pub/Sub topic in an env variable that we'll need later:
TOPIC_QUERY_COMPLETED=$(basename $(gcloud eventarc triggers describe $TRIGGER_NAME --format='value(transport.pubsub.topic)'))
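The trigger reports its topic as a full resource path of the form projects/PROJECT/topics/TOPIC, and basename keeps only the last segment. A minimal Python equivalent, using a made-up project and topic name as placeholders:

```python
def topic_id(topic_path: str) -> str:
    """Return the last segment of a Pub/Sub topic resource path."""
    return topic_path.rsplit("/", 1)[-1]


# Hypothetical value; topics created by Eventarc have generated names.
full_path = "projects/my-project/topics/example-topic"
short_id = topic_id(full_path)
print(short_id)
```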
This service receives Cloud Scheduler events for each country. It uses the BigQuery API to query the public Covid-19 dataset for those countries. Once done, it saves the results to a new BigQuery table and passes a custom event onwards.
The code of the service is in the query-runner folder.
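To illustrate the event handling, here is a minimal Python sketch (the service in this sample is written in C#) of how a service behind a Pub/Sub trigger might read the country from the scheduler message. It assumes the request body carries the Pub/Sub message envelope, where message.data is base64-encoded; the helper names and the results table naming scheme are hypothetical.

```python
import base64
import json


def country_from_event(body: bytes) -> str:
    """Return the country carried in a Pub/Sub message envelope."""
    envelope = json.loads(body)
    # Pub/Sub delivers the payload base64-encoded under message.data.
    encoded = envelope["message"]["data"]
    return base64.b64decode(encoded).decode("utf-8").strip()


def results_table_for(country: str) -> str:
    """Hypothetical naming scheme for the per-country results table."""
    return country.lower().replace(" ", "_")


# Simulate the body produced by a scheduler job whose message is "United Kingdom".
sample_body = json.dumps(
    {"message": {"data": base64.b64encode(b"United Kingdom").decode("ascii")}}
).encode("utf-8")

country = country_from_event(sample_body)
table = results_table_for(country)
print(country, "->", table)
```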
Inside the top level processing-pipelines folder, build and push the container image:
docker build -t$PROJECT_ID/$SERVICE_NAME:v1 -f bigquery/$SERVICE_NAME/csharp/Dockerfile .
Deploy the service, setting PROJECT_ID to your actual project ID (needed for the BigQuery client) and TOPIC_ID:
gcloud run deploy $SERVICE_NAME \
Create a Pub/Sub trigger:
gcloud eventarc triggers create $TRIGGER_NAME \
--destination-gke-cluster=$CLUSTER_NAME \
--destination-gke-location=$CLUSTER_LOCATION \
--destination-gke-namespace=default \
--destination-gke-service=$SERVICE_NAME \
--destination-gke-path=/ \
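--event-filters="type=google.cloud.pubsub.topic.v1.messagePublished" \
--service-account=$TRIGGER_GSA@$PROJECT_ID.iam.gserviceaccount.com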
Set the Pub/Sub topic in an env variable that we'll need later:
TOPIC_QUERY_SCHEDULED=$(gcloud eventarc triggers describe $TRIGGER_NAME --format='value(transport.pubsub.topic)')
The service will be triggered by Cloud Scheduler. More specifically, we will create two scheduler jobs, one per country (United Kingdom and Cyprus). Cloud Scheduler will publish to the queryscheduled topic once a day for each country, which in turn will call the service.
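The two jobs in this walkthrough use the five-field cron schedules 0 16 * * * and 0 17 * * *, which mean 16:00 and 17:00 every day (in UTC, as the job listing later shows). The Python sketch below computes the next run for such a daily schedule; it is a simplified illustration that handles only the "minute hour * * *" form, and next_daily_run is a hypothetical helper, not part of Cloud Scheduler.

```python
from datetime import datetime, timedelta, timezone


def next_daily_run(schedule: str, now: datetime) -> datetime:
    """Return the next run time for a daily 'M H * * *' cron schedule."""
    minute, hour, day_of_month, month, day_of_week = schedule.split()
    if (day_of_month, month, day_of_week) != ("*", "*", "*"):
        raise ValueError("this sketch only handles daily 'M H * * *' schedules")
    candidate = now.replace(hour=int(hour), minute=int(minute), second=0, microsecond=0)
    # If today's slot has already passed, the next run is tomorrow.
    if candidate <= now:
        candidate += timedelta(days=1)
    return candidate


now = datetime(2021, 1, 1, 16, 30, tzinfo=timezone.utc)
uk_next = next_daily_run("0 16 * * *", now)
cy_next = next_daily_run("0 17 * * *", now)
print(uk_next.isoformat(), cy_next.isoformat())
```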
Cloud Scheduler currently requires an App Engine application. Pick an App Engine location and create the app:
gcloud app create --region=$APP_ENGINE_LOCATION
Create the scheduler job for UK:
gcloud scheduler jobs create pubsub cre-scheduler-uk \
--schedule="0 16 * * *" \
--topic=$TOPIC_QUERY_SCHEDULED \
--message-body="United Kingdom"
Create the scheduler job for Cyprus:
gcloud scheduler jobs create pubsub cre-scheduler-cy \
--schedule="0 17 * * *" \
--topic=$TOPIC_QUERY_SCHEDULED \
--message-body="Cyprus"
Before testing the pipeline, make sure all the triggers are ready:
gcloud eventarc triggers list
You can wait for Cloud Scheduler to trigger the services or you can manually trigger the jobs.
Find the job IDs:
gcloud scheduler jobs list
cre-scheduler-cy europe-west1 0 17 * * * (Etc/UTC) Pub/Sub ENABLED
cre-scheduler-uk europe-west1 0 16 * * * (Etc/UTC) Pub/Sub ENABLED
Trigger the jobs manually:
gcloud scheduler jobs run cre-scheduler-cy
gcloud scheduler jobs run cre-scheduler-uk
After a minute or so, you should see 2 charts in the bucket:
gsutil ls gs://$BUCKET
You should also get 2 emails with links to the charts!