LLM Workflows then Agents: Getting Started with Apache Airflow
This repository contains an SDK for working with LLMs from Apache Airflow, based on Pydantic AI. It allows users to call LLMs and orchestrate agent calls directly within their Airflow pipelines using decorator-based tasks. The SDK leverages the familiar Airflow @task syntax with extensions like @task.llm, @task.llm_branch, and @task.agent.
To get started, check out the examples repository here, which offers a full local Airflow instance with the AI SDK installed and 5 example pipelines. To run it locally:
git clone https://github.com/astronomer/ai-sdk-examples.git
cd ai-sdk-examples
astro dev start
If you don't have the Astro CLI installed, run brew install astro (or see other options here).
If you already have Airflow running, you can also install the package with any optional dependencies you need:
pip install airflow-ai-sdk[openai,duckduckgo]
Note that installing the package with no optional dependencies will install the slim version of the package, which does not include any LLM models or tools. The available optional packages are listed here. While this SDK offers optional dependencies for convenience's sake, you can also install them from Pydantic AI directly.
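For example, if you prefer to manage model and tool dependencies yourself, one option is to install the slim SDK and pull providers in through Pydantic AI's own extras. A sketch of that pattern (the extra names here are assumptions; check Pydantic AI's docs for the current list):

pip install airflow-ai-sdk
pip install "pydantic-ai-slim[openai,duckduckgo]"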
We follow Airflow's TaskFlow pattern with three decorators:
@task.llm: Define a task that calls an LLM. Under the hood, this creates a Pydantic AI Agent with no tools.
@task.agent: Define a task that calls an agent. You can pass in a Pydantic AI Agent directly.
@task.llm_branch: Define a task that branches the control flow of a DAG based on the output of an LLM. Enforces that the LLM output is one of the downstream task_ids.
The function supplied to each decorator is a translation function that converts the Airflow task's input into the LLM's input. If you don't want to do any translation, you
can just return the input unchanged.
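For instance, here is a minimal sketch of a no-op translation function, following the pattern of the examples below (the model name and prompt are placeholders, not part of the SDK):

from airflow.decorators import task

@task.llm(
    model="gpt-4o-mini",                # placeholder; use any model your installed extras support
    result_type=str,
    system_prompt="Answer concisely.",  # placeholder prompt
)
def ask(question: str | None = None) -> str:
    # no translation: pass the task's input straight through to the LLM
    return question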
AI workflows are becoming increasingly common as organizations look for pragmatic ways to get value out of LLMs. As with
any workflow, it's important to have a flexible and scalable way to orchestrate them.
Airflow is a popular choice for orchestrating data pipelines. It's a powerful tool for managing the dependencies
between tasks and for scheduling and monitoring them, and has been trusted by data teams everywhere for 10+ years. It comes "batteries included" with a rich set of capabilities:
Flexible scheduling: run tasks on a fixed schedule, on-demand, or based on external events
Dynamic task mapping: easily process multiple inputs in parallel with full error handling and observability
Branching and conditional logic: change the control flow of a DAG based on the output of certain tasks
Error handling: built-in support for retries, exponential backoff, and timeouts (see the sketch after this list)
Resource management: limit the concurrency of tasks with Airflow Pools
Monitoring: detailed logs and monitoring capabilities
Scalability: designed for production workflows
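As a rough sketch of how a few of these knobs attach to an individual task (the pool name and values here are illustrative, and the pool itself must be created in your Airflow instance first):

from datetime import timedelta
from airflow.decorators import task

@task(
    retries=3,                               # retry transient LLM/API failures
    retry_exponential_backoff=True,          # back off between attempts
    execution_timeout=timedelta(minutes=5),  # bound each attempt
    pool="llm_api",                          # cap concurrency with an Airflow Pool
)
def call_model(prompt: str) -> str:
    ...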
This SDK is designed to make it easy to integrate LLM workflows into your Airflow pipelines. It allows you to do anything from simple LLM calls to complex agentic workflows.
See the full set of examples in the examples/dags directory.
LLM calls from a DAG (summarize Airflow's commits)
This example shows how to use the @task.llm decorator as part of an Airflow DAG. In the @task.llm decorator, we can
specify a model and system prompt. The decorator allows you to transform the Airflow task's input into the LLM's input.
import os

import pendulum
from airflow.decorators import dag, task
from github import Github

@task
def get_recent_commits(data_interval_start: pendulum.DateTime, data_interval_end: pendulum.DateTime) -> list[str]:
    """
    This task gets the recent commits from the apache/airflow repository for the data interval,
    using the GitHub API.
    """
    print(f"Getting commits for {data_interval_start} to {data_interval_end}")
    gh = Github(os.getenv("GITHUB_TOKEN"))
    repo = gh.get_repo("apache/airflow")
    commits = repo.get_commits(since=data_interval_start, until=data_interval_end)

    return [f"{commit.commit.sha}: {commit.commit.message}" for commit in commits]

@task.llm(
    model="gpt-4o-mini",
    result_type=str,
    system_prompt="""
    Your job is to summarize the commits to the Airflow project given a week's worth of commits.
    Pay particular attention to large changes and new features as opposed to bug fixes and minor
    changes. You don't need to include every commit, just the most important ones. Add a one-line
    overall summary of the changes at the top, followed by bullet points of the most important changes.

    Example output:

    This week, we made architectural changes to the core scheduler to make it more maintainable and easier to understand.

    - Made the scheduler 20% faster (commit 1234567)
    - Added a new task type: `example_task` (commit 1234568)
    - Added a new operator: `example_operator` (commit 1234569)
    - Added a new sensor: `example_sensor` (commit 1234570)
    """,
)
def summarize_commits(commits: list[str] | None = None) -> str:
    """
    This task summarizes the commits. You can add logic here to transform the input before it gets
    passed to the LLM.
    """
    # no translation needed: just join the commits into a single string
    return "\n".join(commits)

@task
def send_summaries(summaries: str):
    ...

@dag(
    schedule="@weekly",
    start_date=pendulum.datetime(2025, 3, 1, tz="UTC"),
    catchup=False,
)
def github_changelog():
    commits = get_recent_commits()
    summaries = summarize_commits(commits=commits)
    send_summaries(summaries)

github_changelog()
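If you're running the examples locally, one quick way to exercise this DAG from inside your Airflow environment (assuming GITHUB_TOKEN and your model provider's API key are set) is Airflow's built-in test command; the date is just an arbitrary logical date:

airflow dags test github_changelog 2025-03-07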
LLM calls with structured outputs using @task.llm (user feedback -> sentiment and feature requests)
This example demonstrates how to use the @task.llm decorator to call an LLM and return a structured output. In this
case, we're using a Pydantic model to validate the output of the LLM. We recommend using the airflow_ai_sdk.BaseModel
class to define your Pydantic models in case we add more functionality in the future.
import pendulum
from typing import Any, Literal

from airflow.decorators import dag, task
from airflow.exceptions import AirflowSkipException

import airflow_ai_sdk as ai_sdk
from include.pii import mask_pii

@task
def get_product_feedback() -> list[str]:
    """
    This task returns a mocked list of product feedback. In a real workflow, this task would get
    the product feedback from a database or API.
    """
    ...

class ProductFeedbackSummary(ai_sdk.BaseModel):
    summary: str
    sentiment: Literal["positive", "negative", "neutral"]
    feature_requests: list[str]

@task.llm(
    model="gpt-4o-mini",
    result_type=ProductFeedbackSummary,
    system_prompt="""
    You are a helpful assistant that summarizes product feedback.
    """,
)
def summarize_product_feedback(feedback: str | None = None) -> ProductFeedbackSummary:
    """
    This task summarizes the product feedback. You can add logic here to transform the input
    before summarizing it.
    """
    # if the feedback doesn't mention Airflow, skip it
    if "Airflow" not in feedback:
        raise AirflowSkipException("Feedback does not mention Airflow")

    # mask PII in the feedback
    feedback = mask_pii(feedback)

    return feedback

@task
def upload_summaries(summaries: list[dict[str, Any]]):
    ...

@dag(
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
)
def product_feedback_summarization():
    feedback = get_product_feedback()
    summaries = summarize_product_feedback.expand(feedback=feedback)
    upload_summaries(summaries)

product_feedback_summarization()
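For reference, judging from the upload_summaries signature (list[dict[str, Any]]), each mapped summary arrives serialized as a dict. An illustrative, made-up entry:

{
    "summary": "Users like the new UI but report slow DAG parsing.",
    "sentiment": "positive",
    "feature_requests": ["faster DAG parsing"],
}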
Agent calls with @task.agent (deep research agent)
This example shows how to build an AI agent that can autonomously invoke external tools (e.g., a knowledge base search) when answering a user question.
import pendulum
import requests
from airflow.decorators import dag, task
from airflow.models.dagrun import DagRun
from airflow.models.param import Param
from bs4 import BeautifulSoup
from pydantic_ai import Agent
from pydantic_ai.common_tools.duckduckgo import duckduckgo_search_tool

# custom tool to get the content of a page
def get_page_content(url: str) -> str:
    """
    Get the content of a page.
    """
    response = requests.get(url)
    soup = BeautifulSoup(response.text, "html.parser")

    distillation_agent = Agent(
        "gpt-4o-mini",
        system_prompt="""
        You are responsible for distilling information from a text. The summary will be used by a
        research agent to generate a research report. Keep the summary concise and to the point,
        focusing on only key information.
        """,
    )

    # .data holds the agent's text output
    return distillation_agent.run_sync(soup.get_text()).data

deep_research_agent = Agent(
    "o3-mini",
    system_prompt="""
    You are a deep research agent who is very skilled at distilling information from the web.
    You are given a query and your job is to generate a research report.

    You can search the web by using the `duckduckgo_search_tool`. You can also use the
    `get_page_content` tool to get the contents of a page.

    Keep going until you have enough information to generate a research report. Assume you know
    nothing about the query or contents, so you need to search the web for relevant information.
    Do not generate new information, only distill information from the web.
    """,
    tools=[duckduckgo_search_tool(), get_page_content],
)

@task.agent(agent=deep_research_agent)
def deep_research_task(dag_run: DagRun) -> str:
    """
    This task performs deep research on the given query.
    """
    query = dag_run.conf.get("query")
    if not query:
        raise ValueError("Query is required")

    print(f"Performing deep research on {query}")

    return query

@task
def upload_results(results: str):
    ...

@dag(
    schedule=None,
    start_date=pendulum.datetime(2025, 3, 1, tz="UTC"),
    catchup=False,
    params={
        "query": Param(
            type="string",
            default="How has the field of data engineering evolved in the last 5 years?",
        ),
    },
)
def deep_research():
    results = deep_research_task()
    upload_results(results)

deep_research()
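Because the query is read from dag_run.conf, you can supply it at trigger time, for example from the CLI (using the DAG's default query here):

airflow dags trigger deep_research --conf '{"query": "How has the field of data engineering evolved in the last 5 years?"}'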
Changing DAG control flow with @task.llm_branch (support ticket routing)
This example demonstrates how to use the @task.llm_branch decorator to change the control flow of a DAG based on the output of an LLM. In this case, we're routing support tickets based on the severity of the ticket.
import pendulum
from airflow.decorators import dag, task
from airflow.models.dagrun import DagRun

@task.llm_branch(
    model="gpt-4o-mini",
    system_prompt="""
    You are a support agent that routes support tickets based on the priority of the ticket.

    Here are the priority definitions:
    - P0: Critical issues that impact the user's ability to use the product, specifically for a production deployment.
    - P1: Issues that impact the user's ability to use the product, but not as severely (or not for their production deployment).
    - P2: Issues that are low priority and can wait until the next business day
    - P3: Issues that are not important or time sensitive

    Here are some examples of tickets and their priorities:
    - "Our production deployment just went down because it ran out of memory. Please help.": P0
    - "Our staging / dev / QA deployment just went down because it ran out of memory. Please help.": P1
    - "I'm having trouble logging in to my account.": P1
    - "The UI is not loading.": P1
    - "I need help setting up my account.": P2
    - "I have a question about the product.": P3
    """,
    allow_multiple_branches=True,
)
def route_ticket(dag_run: DagRun) -> str:
    return dag_run.conf.get("ticket")

@task
def handle_p0_ticket(ticket: str):
    print(f"Handling P0 ticket: {ticket}")

@task
def handle_p1_ticket(ticket: str):
    print(f"Handling P1 ticket: {ticket}")

@task
def handle_p2_ticket(ticket: str):
    print(f"Handling P2 ticket: {ticket}")

@task
def handle_p3_ticket(ticket: str):
    print(f"Handling P3 ticket: {ticket}")

@dag(
    start_date=pendulum.datetime(2025, 1, 1, tz="UTC"),
    schedule=None,
    catchup=False,
    params={
        "ticket": "Hi, our production deployment just went down because it ran out of memory. Please help."
    },
)
def support_ticket_routing():
    ticket = route_ticket()
    handle_p0_ticket(ticket)
    handle_p1_ticket(ticket)
    handle_p2_ticket(ticket)
    handle_p3_ticket(ticket)

support_ticket_routing()
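As with the research example, the ticket is read from dag_run.conf, so you can exercise different branches at trigger time (ticket text borrowed from the prompt's examples, which classify it as P2, so it should route to handle_p2_ticket):

airflow dags trigger support_ticket_routing --conf '{"ticket": "I need help setting up my account."}'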