How to trigger Cloud Run actions on BigQuery events | Google Cloud Blog


Developers & Practitioners

Many BigQuery users ask for database triggers—a way to run some procedural code in response to events on a particular BigQuery table, model, or dataset. Maybe you want to run an ELT job whenever a new table partition is created, or maybe you want to retrain your ML model whenever new rows are inserted into the table.

In the general category of “Cloud gets easier”, this article will show how to quite simply and cleanly tie together BigQuery and Cloud Run. Because if you love BigQuery and you love Cloud Run, how can you not love when they get together?!

Cloud Run will be triggered when BigQuery writes to its audit log. Every data access in BigQuery is logged (there is no way to turn it off), and so all that we need to do is to find out the exact log message that we are looking for.

Follow along with me.

Find the BigQuery event

I’m going to take a wild guess here and assume that you don’t want to muck up your actual datasets, so create a temporary dataset named cloud_run_tmp in your project in BigQuery.

In that project, let’s create a table into which we will insert some rows to try things out. Grab some rows from a BigQuery public dataset to create this table:

 CREATE OR REPLACE TABLE cloud_run_tmp.cloud_run_trigger AS
SELECT  state, gender, year, name, number
FROM `bigquery-public-data.usa_names.usa_1910_current` LIMIT 10000

Then, run the insert query that we want to build a database trigger for:

 INSERT INTO cloud_run_tmp.cloud_run_trigger
VALUES('OK', 'F', 2021, 'Joe', 3)

Note that there will be several audit logs for a given BigQuery action. In this case, for example, when we submit a query, a log will be generated immediately. But only after the query is parsed does BigQuery know which table(s) we want to interact with, so the initial log will not have the table name. Keep in mind that you don’t want any old audit log... make sure to look for a unique set of attributes that clearly identifies your action.
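To make this concrete, here is a hypothetical, heavily simplified pair of log entries for a single INSERT query, sketched as Python dicts (the values are made up). Only the later entry, written after parsing, carries the table name, so it is the one to match on:

```python
# Hypothetical, simplified log entries for one INSERT query (values are made up).
# The first entry is written at job submission, before the query is parsed.
entries = [
    {"protoPayload": {
        "methodName": "google.cloud.bigquery.v2.JobService.InsertJob",
    }},
    {"protoPayload": {
        "methodName": "google.cloud.bigquery.v2.JobService.InsertJob",
        "resourceName": "projects/p/datasets/cloud_run_tmp/tables/cloud_run_trigger",
    }},
]

# Only the entry that actually names the table is useful for triggering.
with_table = [e for e in entries if "resourceName" in e["protoPayload"]]
print(len(with_table))  # 1
```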

In the case of inserting rows, this is the combination:

  • The method is google.cloud.bigquery.v2.JobService.InsertJob
  • The name of the table being inserted to is the protoPayload.resourceName
  • The dataset id is available as resource.labels.dataset_id
  • The number of inserted rows is protoPayload.metadata.tableDataChange.insertedRowsCount
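
Pulled together, the check can be sketched as a small predicate over the log payload. The sample event below is hypothetical (made-up project and values), but the field paths are the ones listed above:

```python
# Hypothetical sample of the audit-log fields we match on (values are made up).
sample_event = {
    "resource": {"labels": {"dataset_id": "cloud_run_tmp",
                            "project_id": "my-project"}},
    "protoPayload": {
        "methodName": "google.cloud.bigquery.v2.JobService.InsertJob",
        "resourceName": "projects/my-project/datasets/cloud_run_tmp/tables/cloud_run_trigger",
        "metadata": {"tableDataChange": {"insertedRowsCount": "1"}},
    },
}

def is_insert_into(event, dataset, table):
    """True if this audit-log entry records rows inserted into dataset.table."""
    try:
        labels = event["resource"]["labels"]
        payload = event["protoPayload"]
        rows = int(payload["metadata"]["tableDataChange"]["insertedRowsCount"])
        return (labels["dataset_id"] == dataset
                and payload["resourceName"].endswith("tables/" + table)
                and rows > 0)
    except (KeyError, TypeError, ValueError):
        # Entries missing these fields (e.g. the pre-parse log) don't match.
        return False

print(is_insert_into(sample_event, "cloud_run_tmp", "cloud_run_trigger"))  # True
```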

Write the Cloud Run Action

Now that we know the payload that we are looking for, we can write the Cloud Run action. Let’s do it in Python as a Flask App (full code is on GitHub).

First, we make sure that this is the event we want to process:

 from flask import Flask, request
from google.cloud import bigquery

app = Flask(__name__)

@app.route('/', methods=['POST'])
def index():
    # Gets the payload data from the audit log
    content = request.json
    try:
        ds = content['resource']['labels']['dataset_id']
        proj = content['resource']['labels']['project_id']
        tbl = content['protoPayload']['resourceName']
        rows = int(content['protoPayload']['metadata']
                   ['tableDataChange']['insertedRowsCount'])
        if ds == 'cloud_run_tmp' and \
           tbl.endswith('tables/cloud_run_trigger') and rows > 0:
            query = create_agg()
            return "table created", 200
    except (KeyError, TypeError, ValueError):
        # if these fields are not in the JSON, ignore the event
        pass
    return "ok", 200

Once we have identified that this is the event we want, then we carry out the action that we want to do. Here, let’s do an aggregation and write out a new table:

 def create_agg():
    client = bigquery.Client()
    query = """
CREATE OR REPLACE TABLE cloud_run_tmp.created_by_trigger AS
SELECT  name, SUM(number) AS n
FROM cloud_run_tmp.cloud_run_trigger
GROUP BY name
ORDER BY n DESC
LIMIT 10
    """
    client.query(query)  # starts the query job; call .result() on it to block until done
    return query

The Dockerfile for the container is simply a basic Python container into which we install Flask and the BigQuery client library:

 FROM python:3.9-slim
RUN pip install Flask==1.1.2 gunicorn==20.0.4 google-cloud-bigquery
ENV APP_HOME /app
WORKDIR $APP_HOME
COPY *.py ./
CMD exec gunicorn --bind :$PORT main:app

Build the container and deploy it using a couple of gcloud commands:

 SERVICE=bq-cloud-run
PROJECT=$(gcloud config get-value project)
CONTAINER="gcr.io/${PROJECT}/${SERVICE}"
gcloud builds submit --tag ${CONTAINER}
gcloud run deploy ${SERVICE} --image $CONTAINER --platform managed

In order for the trigger to work, the service account for Cloud Run will need a couple of permissions. The commands below assume the service runs as the default compute service account; substitute your own service account if you deployed with a different one:

 PROJECT_NO=$(gcloud projects describe ${PROJECT} --format='value(projectNumber)')
SVC_ACCOUNT="${PROJECT_NO}-compute@developer.gserviceaccount.com"

gcloud projects add-iam-policy-binding $PROJECT \
  --member="serviceAccount:service-${PROJECT_NO}@gcp-sa-pubsub.iam.gserviceaccount.com" \
  --role='roles/iam.serviceAccountTokenCreator'

gcloud projects add-iam-policy-binding $PROJECT \
  --member="serviceAccount:${SVC_ACCOUNT}" \
  --role='roles/eventarc.admin'

Finally, create the event trigger (set REGION to the region of your Cloud Run service):

 gcloud eventarc triggers create ${SERVICE}-trigger \
 --location ${REGION} --service-account ${SVC_ACCOUNT} \
 --destination-run-service ${SERVICE} \
 --event-filters type=google.cloud.audit.log.v1.written \
 --event-filters methodName=google.cloud.bigquery.v2.JobService.InsertJob \
 --event-filters serviceName=bigquery.googleapis.com

The important thing to note is that we are triggering on any Insert log created by BigQuery. That’s why, in the action, we had to filter these events based on the payload.

What events are supported? An easy way to check is to look at the Web Console for Cloud Run, which lists the supported event types when you create a trigger.

Now, try out the BigQuery -> Cloud Run trigger and action. Go to the BigQuery console and insert a row or two:

 INSERT INTO cloud_run_tmp.cloud_run_trigger
VALUES('OK', 'F', 2021, 'Joe', 3)

Watch as a new table called created_by_trigger gets created! You have successfully triggered a Cloud Run action on a database event in BigQuery.

Enjoy!


Thanks to Prashant Gulati.