
How to trigger a DAG in Amazon Managed Workflows for Apache Airflow (MWAA)


Background

Directed Acyclic Graphs (DAGs) in Amazon Managed Workflows for Apache Airflow (MWAA) can be triggered in several ways, depending on how much automation and integration you need. While you can manually trigger DAGs using the Airflow UI, we often prefer to do it programmatically.

Recently, I helped a customer set this up. The official documentation doesn't provide a step-by-step guide with screenshots, so I want to document it here for anyone interested in setting it up.

Lab

First, let’s create an MWAA environment. The simplest way is to download the CloudFormation template from this AWS tutorial and deploy it.
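If you prefer the SDK to the console, deploying the template can be scripted with boto3. This is just a sketch: the stack name, the template file name, and the required capability are assumptions based on the tutorial template creating IAM roles.

```python
import os

def stack_request(stack_name: str, template_body: str) -> dict:
    # Build the create_stack arguments; CAPABILITY_IAM must be acknowledged
    # because the tutorial template creates IAM roles.
    return {
        "StackName": stack_name,
        "TemplateBody": template_body,
        "Capabilities": ["CAPABILITY_IAM"],
    }

# Guarded so the sketch is safe to run without AWS credentials; set the
# DEPLOY_MWAA_STACK flag (a placeholder) to actually deploy.
if os.environ.get("DEPLOY_MWAA_STACK"):
    import boto3
    with open("mwaa-public-network.yaml") as f:  # file name from the tutorial is an assumption
        boto3.client("cloudformation").create_stack(
            **stack_request("mwaa-tutorial", f.read())
        )
```

Note that environment creation takes a while (typically 20-30 minutes), so start it before writing the DAGs.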

Create two simple DAGs that print out the dag_run.conf values.

The first DAG uses the traditional method of defining tasks and employs a Jinja template to access the value:

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator

default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
    "email": ["airflow@example.com"],
    "email_on_failure": False,
    "email_on_retry": False,
}

with DAG(
    "10_dag_run_conf_dag",
    default_args=default_args,
    description="Dag Run Conf Dag",
    schedule_interval="0 12 * * *",
    start_date=datetime(2023, 1, 24),
    catchup=False,
    tags=["custom"]
) as dag:

    start = BashOperator(
        task_id="start",
        bash_command="echo start",
    )
    print_dag_run_conf = BashOperator(
        task_id="print_dag_run_conf",
        bash_command="echo value: {{ dag_run.conf['conf1'] }}",
    )
    end = BashOperator(
        task_id="end",
        bash_command="echo stop",
    )

    start >> print_dag_run_conf >> end

The second DAG uses Airflow TaskFlow API:

import time
from datetime import datetime

from airflow import DAG
from airflow.decorators import task


with DAG(
    dag_id="11_taskflow_dag_run_conf_dag",
    schedule_interval=None,
    start_date=datetime(2023, 1, 24),
    catchup=False,
    tags=["custom"],
) as dag:
    @task(task_id="sleep_for_1")
    def my_sleeping_function():
        time.sleep(1)

    sleeping_task = my_sleeping_function()

    @task(task_id="print_dag_conf")
    def print_dag_conf(ds=None, **kwargs):
        print(kwargs["dag_run"].conf)
        return "Whatever you return gets printed in the logs"

    print_conf_task = print_dag_conf(my_keyword="Airflow")

    sleeping_task >> print_conf_task

Now let's upload the DAGs to S3 so MWAA can load them:
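The upload can also be scripted; MWAA loads DAG files from the dags/ prefix of the environment's S3 bucket. In this sketch, the bucket name comes from a placeholder environment variable so the snippet is safe to run without credentials:

```python
import os

def dag_s3_key(local_path: str, prefix: str = "dags") -> str:
    # MWAA picks up DAG files from the configured dags/ prefix in the bucket.
    return f"{prefix}/{os.path.basename(local_path)}"

# Set MWAA_DAGS_BUCKET (a placeholder variable) to your environment's bucket
# to actually upload; otherwise the live call is skipped.
if os.environ.get("MWAA_DAGS_BUCKET"):
    import boto3
    s3 = boto3.client("s3")
    for path in ["10_dag_run_conf_dag.py", "11_taskflow_dag_run_conf_dag.py"]:
        s3.upload_file(path, os.environ["MWAA_DAGS_BUCKET"], dag_s3_key(path))
```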

Check that the DAGs are loaded in the Airflow UI:

Tip: When navigating to the MWAA Airflow UI, you might see an "Internal Server Error." This is a known, unresolved issue.

To fix this, you need to delete the cookie in your browser. For example, in Chrome Developer Tools:

To trigger the DAG programmatically, you need an IAM user with the AmazonMWAAAirflowCliAccess AWS managed policy, or an equivalent inline policy that allows airflow:CreateCliToken:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "airflow:CreateCliToken"
            ],
            "Resource": "*"
        }
    ]
}
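As a sketch, the managed policy can be attached to the user with boto3. The user name here is a placeholder, and the live call is guarded so the snippet runs safely without credentials:

```python
import os

def managed_policy_arn(name: str) -> str:
    # AWS managed policies all live under the shared "aws" account namespace.
    return f"arn:aws:iam::aws:policy/{name}"

# Set ATTACH_MWAA_POLICY (a placeholder flag) to actually attach the policy;
# "mwaa-cli-user" is a placeholder user name.
if os.environ.get("ATTACH_MWAA_POLICY"):
    import boto3
    boto3.client("iam").attach_user_policy(
        UserName="mwaa-cli-user",
        PolicyArn=managed_policy_arn("AmazonMWAAAirflowCliAccess"),
    )
```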

Create an access key:

Use the keys in the AWS CLI to test it:
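A quick sanity check from Python, equivalent to running aws sts get-caller-identity: it assumes the new keys are exported as AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY, and the call is skipped when they are not set:

```python
import os

def have_cli_keys() -> bool:
    # Both halves of the access key pair are needed to sign AWS requests.
    return bool(os.environ.get("AWS_ACCESS_KEY_ID") and os.environ.get("AWS_SECRET_ACCESS_KEY"))

# Guarded so the sketch is safe to run without credentials; set RUN_MWAA_CHECK=1
# (a placeholder flag) to actually make the call.
if have_cli_keys() and os.environ.get("RUN_MWAA_CHECK"):
    import boto3
    # Equivalent to `aws sts get-caller-identity`: shows who the keys authenticate as.
    print(boto3.client("sts").get_caller_identity()["Arn"])
```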

Write a Python script to trigger the DAG:

import base64
import json

import boto3
import requests

mwaa_env_name = 'Derrick-MWAA-MwaaEnvironment'
dag_name = '10_dag_run_conf_dag'
conf = json.dumps({"conf1": "cool value1"})

client = boto3.client('mwaa')

mwaa_cli_token = client.create_cli_token(
    Name=mwaa_env_name
)

mwaa_auth_token = 'Bearer ' + mwaa_cli_token['CliToken']
mwaa_webserver_hostname = 'https://{0}/aws_mwaa/cli'.format(mwaa_cli_token['WebServerHostname'])
raw_data = "dags trigger {0} --conf '{1}'".format(dag_name, conf)
# On Airflow 1.10.x environments use:
# raw_data = "trigger_dag {0} -c '{1}'".format(dag_name, conf)
mwaa_response = requests.post(
    mwaa_webserver_hostname,
    headers={
        'Authorization': mwaa_auth_token,
        'Content-Type': 'text/plain',
    },
    data=raw_data,
)

mwaa_std_err_message = base64.b64decode(mwaa_response.json()['stderr']).decode('utf8')
mwaa_std_out_message = base64.b64decode(mwaa_response.json()['stdout']).decode('utf8')

print(mwaa_response.status_code)
print(mwaa_std_err_message)
print(mwaa_std_out_message)

Note that on Airflow 1.10.x you need to use the trigger_dag command; from Airflow 2.0 onward you use the dags trigger command.
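The difference can be captured in a small helper; the airflow2 flag here is purely illustrative:

```python
def trigger_command(dag_id: str, conf_json: str, airflow2: bool = True) -> str:
    # Airflow 2.x restructured the CLI into subcommands: `dags trigger --conf`.
    # Airflow 1.10.x used the flat `trigger_dag -c` form instead.
    if airflow2:
        return f"dags trigger {dag_id} --conf '{conf_json}'"
    return f"trigger_dag {dag_id} -c '{conf_json}'"
```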

Run the Python script:

Yay! We can see the value of dag_run.conf in the Airflow logs! This means triggering the DAG from our Python script works.

To double-check this, we can update the Python script to trigger the second DAG. The logs look promising:

That’s it. Hope this works for you.