Data Engineer Things

Things learned in our data engineering journey and ideas on data and engineering.



DAGs That Trigger DAGs


Image by author.

The Setup

Say we have a Core DAG in Airflow that coordinates the tasks in our ELT process. (We’ll use the PythonOperator to mimic the kinds of tasks you would typically find on most data teams.)

Such a DAG would look something like this 👇

"""Core DAG"""
import sys
import time

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.task_group import TaskGroup

from datetime import datetime


def print_message(message: str, sleep: int) -> None:
    """Simple Python function to wait and print a message"""
    # Sleeping stands in for real work so each mock task takes visible time.
    time.sleep(sleep)
    print(message)


def break_task() -> None:
    """Forces a failed task."""
    # Exit with a non-zero status to simulate a task that fails.
    sys.exit(1)


with DAG(
    dag_id="core",
    description="Core DAG that orchestrates ELT.",
    schedule_interval="@daily",
    start_date=datetime(2024, 1, 1),
    # Don't backfill the runs between start_date and today.
    catchup=False,
) as dag:
    # Tasks created inside this group are grouped under "stock_market" in the UI.
    with TaskGroup(group_id="stock_market") as stock_market:
        extract__stock_market_data = PythonOperator(
            task_id="extract__stock_market_data",
            python_callable=print_message,
            op_kwargs={
                "message": "Extracting stock market data from API and saving to S3.",
                "sleep": 10
            },
        )
        load__stock_market_data = PythonOperator(
            task_id="load__stock_market_data",
            python_callable=print_message…
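Because the mock tasks are plain Python callables, they can be sanity-checked without a scheduler or even an Airflow installation. The sketch below is illustrative and not part of the original DAG: it re-declares print_message and break_task so it runs standalone, and run_mock_task is a hypothetical helper that captures what a callable prints along with the exit code it raises through sys.exit.

```python
"""Illustrative sketch: exercise the Core DAG's mock callables without Airflow."""
import io
import sys
import time
from contextlib import redirect_stdout


def print_message(message: str, sleep: int) -> None:
    """Simple Python function to wait and print a message (copied from the DAG file)."""
    time.sleep(sleep)
    print(message)


def break_task() -> None:
    """Forces a failed task (copied from the DAG file)."""
    sys.exit(1)


def run_mock_task(task_callable, **kwargs):
    """Hypothetical helper: run a callable, returning (captured stdout, exit code).

    An exit code of 0 means the callable returned normally.
    """
    buffer = io.StringIO()
    exit_code = 0
    with redirect_stdout(buffer):
        try:
            task_callable(**kwargs)
        except SystemExit as exc:
            # sys.exit(1) raises SystemExit; its .code attribute holds the status.
            exit_code = exc.code
    return buffer.getvalue(), exit_code


if __name__ == "__main__":
    # sleep=0 skips the simulated 10-second wait so the check runs instantly.
    output, code = run_mock_task(
        print_message,
        message="Extracting stock market data from API and saving to S3.",
        sleep=0,
    )
    print(repr(output), code)

    output, code = run_mock_task(break_task)
    print(repr(output), code)  # prints: '' 1
```

Note that sys.exit raises SystemExit rather than killing the interpreter outright, which is why the helper can catch it and report the status instead of stopping.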
