Run dbt with Cloud Composer and Cloud Run

Recently, a client wanted to use dbt core in Cloud Composer Airflow but they encountered Python dependencies issues. To solve this issue, I built a solution to run dbt inside of Cloud Run with Cloud Composer as orchestrator.

The workflow looks like:

Cloud Composer -> BashOperator to trigger a Cloud Run service -> Cloud Run container runs dbt cli

Below is how it works:

Cloud Run service:
When the Cloud Run service is triggered (via HTTP call), it will run a bash script (script.sh)

Dockerfile

FROM golang:1.18-buster as builder

WORKDIR /app

COPY go.* ./
RUN go mod download

COPY invoke.go ./

RUN go build -mod=readonly -v -o server

# Use the official dbt-bigquery image for running
# https://github.com/dbt-labs/dbt-bigquery/pkgs/container/dbt-bigquery

FROM ghcr.io/dbt-labs/dbt-bigquery:1.4.1

WORKDIR /

COPY --from=builder /app/server /app/server
COPY script.sh ./

ENTRYPOINT ["/app/server"]

Invoke.go (Source code from here)

package main

import (
        "log"
        "net/http"
        "os"
        "os/exec"
)

func main() {
        http.HandleFunc("/", scriptHandler)

        // Determine port for HTTP service.
        port := os.Getenv("PORT")
        if port == "" {
                port = "8080"
                log.Printf("Defaulting to port %s", port)
        }

        // Start HTTP server.
        log.Printf("Listening on port %s", port)
        if err := http.ListenAndServe(":"+port, nil); err != nil {
                log.Fatal(err)
        }
}

func scriptHandler(w http.ResponseWriter, r *http.Request) {
        cmd := exec.CommandContext(r.Context(), "/bin/bash", "script.sh")
        cmd.Stderr = os.Stderr
        out, err := cmd.Output()
        if err != nil {
                w.WriteHeader(500)
        }
        w.Write(out)
}

script.sh (This is just an example of running dbt)

dbt --version

I followed this document to deploy the Cloud Run shell service.
gcloud run deploy run-dbt --no-allow-unauthenticated --region=us-central1 --source=.

Terminal output:

Airflow DAG:

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator

default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
    "email": ["airflow@example.com"],
    "email_on_failure": False,
    "email_on_retry": False,
}

dag = DAG(
    "trigger_cloud_run_dag",
    description="trigger_cloud_run_dag",
    schedule_interval="0 3 * * *",
    start_date=datetime(2023, 4, 19),
    catchup=False,
    tags=["custom"],
)

trigger_cloud_run = BashOperator(
    task_id="trigger_cloud_run",
    bash_command='curl -H "Authorization: Bearer $(gcloud auth print-identity-token)" https://run-dbt-7ensisdq5q-uc.a.run.app',
    do_xcom_push=True,
    dag=dag
)

trigger_cloud_run

To make it secure, the Cloud Run service requires IAM authentication. Therefore I granted my Cloud Composer worker service account roles/run.invoker role.

Airflow logs:

Cloud Run logs show the dbt cli output:

I've uploaded the code to a GitHub repo. Feel free to let me know if you need any help running it.