Recently, a client wanted to use dbt Core in Cloud Composer (Airflow) but ran into Python dependency conflicts. To work around this, I built a setup that runs dbt inside Cloud Run, with Cloud Composer as the orchestrator.
The workflow looks like this:
Cloud Composer -> BashOperator triggers a Cloud Run service -> Cloud Run container runs the dbt CLI
Here is how it works.
Cloud Run service:
When the Cloud Run service receives an HTTP request, it runs a bash script (script.sh) and returns the script's standard output in the response.
Dockerfile
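# Stage 1: build the Go HTTP server that wraps script.sh
FROM golang:1.18-buster as builder
WORKDIR /app
COPY go.* ./
RUN go mod download
COPY invoke.go ./
RUN go build -mod=readonly -v -o server
# Stage 2: use the official dbt-bigquery image for running
# https://github.com/dbt-labs/dbt-bigquery/pkgs/container/dbt-bigquery
FROM ghcr.io/dbt-labs/dbt-bigquery:1.4.1
# The server runs "script.sh" relative to its working directory, so keep both in /
WORKDIR /
COPY --from=builder /app/server /app/server
COPY script.sh ./
# Replace the image's default dbt entrypoint with the HTTP server
ENTRYPOINT ["/app/server"]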
invoke.go (source code from here)
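package main

import (
	"log"
	"net/http"
	"os"
	"os/exec"
)

func main() {
	http.HandleFunc("/", scriptHandler)

	// Determine port for HTTP service.
	port := os.Getenv("PORT")
	if port == "" {
		port = "8080"
		log.Printf("Defaulting to port %s", port)
	}

	// Start HTTP server.
	log.Printf("Listening on port %s", port)
	if err := http.ListenAndServe(":"+port, nil); err != nil {
		log.Fatal(err)
	}
}

// scriptHandler runs script.sh for every request and writes its stdout to the response.
// Because the command is bound to the request context, the script is killed if the client disconnects.
func scriptHandler(w http.ResponseWriter, r *http.Request) {
	cmd := exec.CommandContext(r.Context(), "/bin/bash", "script.sh")
	// Send the script's stderr to the container logs (visible in Cloud Logging).
	cmd.Stderr = os.Stderr
	out, err := cmd.Output()
	if err != nil {
		// A non-zero exit status (or a failure to start bash) is reported as HTTP 500.
		log.Printf("script.sh failed: %v", err)
		w.WriteHeader(http.StatusInternalServerError)
	}
	w.Write(out)
}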
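Because the handler reports a failed script.sh run only through the HTTP status code (500), whoever calls the service has to treat any non-2xx response as a failure. As a sketch of that contract from the client side, here is a hypothetical Python helper (trigger_dbt_service is my own name, not part of the original code) that uses only the standard library and assumes you already have an ID token for the service.

```python
import urllib.error
import urllib.request


def trigger_dbt_service(url: str, id_token: str, timeout: float = 600.0) -> str:
    """Call the dbt Cloud Run service and return script.sh's stdout.

    Raises RuntimeError when the service answers with an HTTP error status,
    which is how invoke.go signals that script.sh exited non-zero (HTTP 500).
    """
    request = urllib.request.Request(
        url,
        headers={"Authorization": f"Bearer {id_token}"},
    )
    try:
        # Socket timeout in seconds. The server sends nothing until script.sh
        # finishes, so pick a value above your slowest dbt run.
        with urllib.request.urlopen(request, timeout=timeout) as response:
            return response.read().decode("utf-8")
    except urllib.error.HTTPError as err:
        # urlopen raises HTTPError for 4xx/5xx; the body still holds dbt's stdout.
        body = err.read().decode("utf-8", errors="replace")
        raise RuntimeError(f"dbt service returned HTTP {err.code}:\n{body}") from err
```

Raising from a PythonOperator callable fails the Airflow task, which is the same effect curl --fail has inside a BashOperator.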
script.sh (just an example that runs the dbt CLI):
dbt --version
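The handler only sees script.sh's final exit status. With a single dbt --version line that is fine, but once the script runs several commands (for example, hypothetically, a dbt run followed by a logging echo), bash by default exits with the status of the last command, so an earlier failure is masked and the service still answers 200. Putting set -euo pipefail at the top of the script avoids that. The Python snippet below demonstrates the difference by running two tiny bash scripts, with false standing in for a failing dbt command; script_exit_code is a hypothetical helper, not part of the original project.

```python
import subprocess


def script_exit_code(script_body: str) -> int:
    """Run a bash script body (similar to how invoke.go runs script.sh) and return its exit status."""
    result = subprocess.run(["bash", "-c", script_body], capture_output=True, text=True)
    return result.returncode


# `false` stands in for a failing dbt command such as `dbt run`.
WITHOUT_ERREXIT = "false\necho 'dbt finished'\n"
WITH_ERREXIT = "set -euo pipefail\nfalse\necho 'dbt finished'\n"

if __name__ == "__main__":
    print(script_exit_code(WITHOUT_ERREXIT))  # 0: failure masked, the handler would return 200
    print(script_exit_code(WITH_ERREXIT))  # 1: non-zero, the handler would return HTTP 500
```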
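I followed this document to deploy the Cloud Run shell service. With --source=., gcloud builds the container image from the Dockerfile in the current directory using Cloud Build:
gcloud run deploy run-dbt --no-allow-unauthenticated --region=us-central1 --source=.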
Terminal output:
Airflow DAG:
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator  # Airflow 2 import path

default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
    "email": ["airflow@example.com"],
    "email_on_failure": False,
    "email_on_retry": False,
}

dag = DAG(
    "trigger_cloud_run_dag",
    description="trigger_cloud_run_dag",
    default_args=default_args,  # without this, the retry settings above are never applied
    schedule_interval="0 3 * * *",  # every day at 03:00
    start_date=datetime(2023, 4, 19),
    catchup=False,
    tags=["custom"],
)

# --fail makes curl exit non-zero on HTTP errors (such as the 500 returned when
# script.sh fails), so the Airflow task fails instead of silently succeeding.
trigger_cloud_run = BashOperator(
    task_id="trigger_cloud_run",
    bash_command='curl --fail -H "Authorization: Bearer $(gcloud auth print-identity-token)" https://run-dbt-7ensisdq5q-uc.a.run.app',
    do_xcom_push=True,
    dag=dag,
)

trigger_cloud_run
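To keep it secure, the Cloud Run service requires IAM authentication (it was deployed with --no-allow-unauthenticated), so I granted my Cloud Composer worker service account the Cloud Run Invoker role (roles/run.invoker). The gcloud auth print-identity-token call in the DAG supplies the ID token that curl sends in the Authorization header.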
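gcloud auth print-identity-token requires the gcloud CLI on the worker. If you would rather fetch the token in Python (for example inside a PythonOperator), one option, assuming the worker can reach the Compute Engine/GKE metadata server, is the metadata server's identity endpoint with the Cloud Run service URL as the audience. This is a sketch, not what the original DAG does: the helper names are hypothetical, and only the endpoint path and the Metadata-Flavor: Google header come from Google's metadata server documentation. The token is issued for whichever service account the worker runs as.

```python
import urllib.parse
import urllib.request

# Identity-token endpoint of the GCE/GKE metadata server.
METADATA_IDENTITY_URL = (
    "http://metadata.google.internal/computeMetadata/v1/"
    "instance/service-accounts/default/identity"
)


def build_identity_token_request(audience: str) -> urllib.request.Request:
    """Build the metadata-server request for an ID token scoped to `audience`."""
    query = urllib.parse.urlencode({"audience": audience})
    return urllib.request.Request(
        f"{METADATA_IDENTITY_URL}?{query}",
        # The metadata server rejects requests that lack this header.
        headers={"Metadata-Flavor": "Google"},
    )


def fetch_identity_token(audience: str, timeout: float = 10.0) -> str:
    """Return an ID token for the default service account (works only on GCP)."""
    request = build_identity_token_request(audience)
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return response.read().decode("utf-8")


if __name__ == "__main__":
    # For Cloud Run, the audience is the service URL.
    token = fetch_identity_token("https://run-dbt-7ensisdq5q-uc.a.run.app")
    print(token[:20] + "...")
```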
Airflow logs:
The Cloud Run logs show the dbt CLI output:
I've uploaded the code to a GitHub repo. Feel free to let me know if you need any help running it.