How to trigger the DAG in Amazon Managed Workflows for Apache Airflow (MWAA)
Background
Directed Acyclic Graphs (DAGs) in Amazon Managed Workflows for Apache Airflow (MWAA) can be triggered in several ways, depending on how much automation and integration you need. While you can manually trigger DAGs using the Airflow UI, we often prefer to do it programmatically.
Recently, I helped a customer set this up. The official documentation doesn't provide a step-by-step guide with screenshots, so I want to document it here for anyone interested in setting it up.
Lab
First, let’s create an MWAA environment. The simplest way is to download the CloudFormation template from this AWS tutorial and deploy it.

Create two simple DAGs that print out the dag.conf values.
The first DAG uses the traditional method of defining tasks and employs a Jinja template to access the value:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.models.param import Param
from airflow.operators.bash_operator import BashOperator
default_args = {
"owner": "airflow",
"depends_on_past": False,
"retries": 1,
"retry_delay": timedelta(minutes=5),
"email": ["airflow@example.com"],
"email_on_failure": False,
"email_on_retry": False,
}
with DAG(
"10_dag_run_conf_dag",
default_args=default_args,
description="Dag Run Conf Dag",
schedule_interval="0 12 * * *",
start_date=datetime(2023, 1, 24),
catchup=False,
tags=["custom"]
) as dag:
start = BashOperator(
task_id="start",
bash_command="echo start",
)
print_dag_run_conf = BashOperator(
task_id="print_dag_run_conf",
bash_command="echo value: {{ dag_run.conf['conf1'] }}",
dag=dag,
)
end = BashOperator(
task_id="end",
bash_command="echo stop",
)
start >> print_dag_run_conf >> end
The second DAG uses Airflow TaskFlow API:
import time
from datetime import datetime
from pprint import pprint
from airflow import DAG
from airflow.decorators import task
from airflow.operators.python import PythonOperator
with DAG(
dag_id="11_taskflow_dag_run_conf_dag",
schedule_interval=None,
start_date=datetime(2023, 1, 24),
catchup=False,
tags=["custom"],
) as dag:
@task(task_id="sleep_for_1")
def my_sleeping_function():
time.sleep(1)
sleeping_task = my_sleeping_function()
@task(task_id="print_dag_conf")
def print_dag_conf(ds=None, **kwargs):
print(kwargs["dag_run"].conf)
return "Whatever you return gets printed in the logs"
print_dag_conf = print_dag_conf(my_keyword="Airflow")
sleeping_task >> print_dag_conf
Now let's upload the DAGs to S3 so MWAA can load them:

Check that the DAGs are loaded in the Airflow UI:

TIPS: When navigating to the MWAA Airflow UI, you might see an "Internal Server Error." This is an unresolved issue.

To fix this, you need to delete the cookie in your browser. For example, in Chrome Developer Tools:

To trigger the DAG, you need an IAM user with the AmazonMWAAAirflowCliAccess AWS managed policy.
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"airflow:CreateCliToken"
],
"Resource": "*"
}
]
}
Create an access key:

Use the keys in the AWS CLI to test it:

Write a Python script to trigger the DAG:
import boto3
import json
import requests
import base64
mwaa_env_name = 'Derrick-MWAA-MwaaEnvironment'
dag_name = '10_dag_run_conf_dag'
key = "conf1"
value = "cool value1"
conf = "{\"" + key + "\":\"" + value + "\"}"
client = boto3.client('mwaa')
mwaa_cli_token = client.create_cli_token(
Name=mwaa_env_name
)
mwaa_auth_token = 'Bearer ' + mwaa_cli_token['CliToken']
mwaa_webserver_hostname = 'https://{0}/aws_mwaa/cli'.format(mwaa_cli_token['WebServerHostname'])
raw_data = "dags trigger {0} --conf '{1}'".format(dag_name, conf)
#raw_data = "trigger_dag {0} -c '{1}'".format(dag_name, conf)
mwaa_response = requests.post(
mwaa_webserver_hostname,
headers={
'Authorization': mwaa_auth_token,
'Content-Type': 'text/plain'
},
data=raw_data
)
mwaa_std_err_message = base64.b64decode(mwaa_response.json()['stderr']).decode('utf8')
mwaa_std_out_message = base64.b64decode(mwaa_response.json()['stdout']).decode('utf8')
print(mwaa_response.status_code)
print(mwaa_std_err_message)
print(mwaa_std_out_message)
Note that before Airflow version xxx, you need to use the trigger_dag command. After that version, you use the dags trigger command.
Run the Python script:

Yay! We can see the value of dag.conf in the Airflow logs! This means that triggering the DAG from our Python script works.

To double-check this, we can update the Python script to trigger the second DAG. The logs look promising:

That’s it. Hope this works for you.