
Airflow: How To Use Trigger Parameters In Functions

We are using Airflow's KubernetesPodOperator for our data pipelines. What we would like to add is the option to pass in parameters via the UI. We currently use it in a way that we …

Solution 1:

You could use params, a dictionary that can be defined at the DAG level and remains accessible in every task. It works for every operator derived from BaseOperator and can also be set from the UI.

The following examples show how to use it with different operators. params can be defined in the default_args dict or passed as an argument to the DAG object.

default_args = {
    "owner": "airflow",
    'params': {
        "param1": "first_param",
        "param2": "second_param"
    }
}

dag = DAG(
    dag_id="example_dag_params",
    default_args=default_args,
    start_date=days_ago(1),
    schedule_interval="@once",
    tags=['example_dags'],
    catchup=False
)
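As mentioned above, the same keys can instead be passed directly to the DAG object. A minimal sketch of that alternative (same DAG as above, params moved out of default_args):

```python
dag = DAG(
    dag_id="example_dag_params",
    start_date=days_ago(1),
    schedule_interval="@once",
    tags=['example_dags'],
    catchup=False,
    # params passed directly to the DAG instead of via default_args
    params={
        "param1": "first_param",
        "param2": "second_param"
    }
)
```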

When triggering this DAG from the UI you could add an extra param:

[Image: setting params while triggering the DAG from the UI]

Params can be accessed in templated fields, as in the BashOperator case:

with dag:

    bash_task = BashOperator(
        task_id='bash_task',
        bash_command='echo bash_task: {{ params.param1 }}')

bash_task logs output:

{bash.py:158} INFO - Running command: echo bash_task: first_param
{bash.py:169} INFO - Output:
{bash.py:173} INFO - bash_task: first_param
{bash.py:177} INFO - Command exited with return code 0
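Under the hood, templated fields like bash_command are rendered with Jinja, with the merged params dict available in the template context. A minimal stand-alone sketch of that rendering, using the jinja2 library directly rather than Airflow itself:

```python
from jinja2 import Template

# Stand-in for Airflow's template rendering: the templated string is
# rendered with a context that contains the merged params dict.
params = {"param1": "first_param", "param2": "second_param"}

bash_command = "echo bash_task: {{ params.param1 }}"
rendered = Template(bash_command).render(params=params)
print(rendered)  # echo bash_task: first_param
```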

Params are also accessible within the execution context, e.g. in a python_callable:

def _print_params(**kwargs):
    print(f"Task_id: {kwargs['ti'].task_id}")
    for k, v in kwargs['params'].items():
        print(f"{k}:{v}")

    python_task = PythonOperator(
        task_id='python_task',
        python_callable=_print_params,
    )

Output:

{logging_mixin.py:104} INFO - Task_id: python_task
{logging_mixin.py:104} INFO - param1:first_param
{logging_mixin.py:104} INFO - param2:second_param
{logging_mixin.py:104} INFO - param3:param_from_the_UI

You can also add params in the task definition:

    python_task_2 = PythonOperator(
        task_id='python_task_2',
        python_callable=_print_params,
        params={'param4': 'param defined at task level'}
    )

Output:

{logging_mixin.py:104} INFO - Task_id: python_task_2
{logging_mixin.py:104} INFO - param1:first_param
{logging_mixin.py:104} INFO - param2:second_param
{logging_mixin.py:104} INFO - param4:param defined at task level
{logging_mixin.py:104} INFO - param3:param_from_the_UI
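The output above suggests the precedence when the dictionaries share keys: DAG-level params come first, task-level params are layered on top, and values supplied at trigger time win. A plain-Python sketch of that merge (the precedence order is an assumption inferred from the logs, not taken from Airflow's source):

```python
# Hypothetical merge mirroring what python_task_2 sees; later sources
# override earlier ones for duplicate keys.
dag_level = {"param1": "first_param", "param2": "second_param"}
task_level = {"param4": "param defined at task level"}
ui_conf = {"param3": "param_from_the_UI"}

merged = {**dag_level, **task_level, **ui_conf}
for k, v in merged.items():
    print(f"{k}:{v}")
```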

Following the example you could define a custom Operator that inherits from BaseOperator:

class CustomDummyOperator(BaseOperator):

    @apply_defaults
    def __init__(self, custom_arg: str = 'default', *args, **kwargs) -> None:
        self.custom_arg = custom_arg
        super(CustomDummyOperator, self).__init__(*args, **kwargs)

    def execute(self, context):
        print(f"Task_id: {self.task_id}")
        print(f"custom_arg: {self.custom_arg}")
        for k, v in context['params'].items():
            print(f"{k}:{v}")

An example task would be:

    custom_op_task = CustomDummyOperator(
        task_id='custom_operator_task'
    )

Output:

{logging_mixin.py:104} INFO - Task_id: custom_operator_task
{logging_mixin.py:104} INFO - custom_arg: default
{logging_mixin.py:104} INFO - param1:first_param
{logging_mixin.py:104} INFO - param2:second_param
{logging_mixin.py:104} INFO - param3:param_from_the_UI
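Tying this back to the original question: KubernetesPodOperator templates several of its fields (cmds, arguments and env_vars among them), so UI-supplied params can be forwarded into the pod the same way. A minimal sketch, assuming the cncf.kubernetes provider is installed; the image, namespace and command are placeholders, and in newer provider versions the import path has moved to airflow.providers.cncf.kubernetes.operators.pod:

```python
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
    KubernetesPodOperator,
)

# 'arguments' is a templated field, so the params are rendered into the
# pod's command at runtime, just like bash_command above.
k8s_task = KubernetesPodOperator(
    task_id='k8s_task',
    name='k8s_task',
    namespace='default',   # placeholder namespace
    image='busybox',       # placeholder image
    cmds=['sh', '-c'],
    arguments=['echo k8s_task: {{ params.param1 }}'],
)
```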

Imports:

from airflow import DAG
from airflow.models.baseoperator import chain
from airflow.models import BaseOperator
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago
from airflow.utils.decorators import apply_defaults

I hope that works for you!
