Airflow XCom Exclusive
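Data is retrieved with xcom_pull, which allows a task to request specific values from one or more previous tasks. Explicit Storage: Nothing is shared implicitly; a task must "push" data to the Airflow metadata database, by calling xcom_push or by returning a value, before another task can read it.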
XCom (short for "cross-communication"): A built-in mechanism for tasks to "push" (store) and "pull" (retrieve) small pieces of data.
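@staticmethod
def deserialize_value(value):
    """Convert stored string back to original value"""
    # Assumes `value` is the stored JSON string. In Airflow 2.x this hook
    # receives the XCom row, so a real backend would read its `.value`.
    return json.loads(value)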
from airflow.decorators import task

@task
def multi_pull(**context):
    ti = context['ti']
    # Pull values stored under custom keys by the multi_push task
    count = ti.xcom_pull(key='count', task_ids='multi_push')
    status = ti.xcom_pull(key='status', task_ids='multi_push')
    # No key given: pulls the default key (return_value)
    main = ti.xcom_pull(task_ids='multi_push')
When we talk about Airflow XCom being "exclusive," we're referring to how XCom values are scoped: by default, xcom_pull only looks at the current DAG and the current run. A task in one DAG will not see XCom values from another DAG unless it asks for them explicitly, for example by passing dag_id to xcom_pull.
Implement with Redis:
Do not attempt to pass a 500MB pandas DataFrame or a massive JSON payload through XCom: the value is serialized into the Airflow metadata database, which is meant to hold only small pieces of data.
XComs allow tasks to "push" and "pull" metadata or small results. They are stored in the Airflow metadata database and are keyed by:

dag_id: The specific workflow.
task_id: The originating task.
run_id: The specific execution instance.
key: A custom identifier (defaults to return_value).

🔒 Implementing "Exclusive" Scoping
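To make the four-part key concrete, here is a toy in-memory model of how scoping by dag_id, task_id, run_id, and key behaves. It is only a sketch for illustration: ToyXComStore and XComKey are hypothetical names, and this is not how Airflow itself stores or queries XComs.

```python
from typing import Any, Dict, NamedTuple, Optional

RETURN_KEY = "return_value"  # the default key named in the list above


class XComKey(NamedTuple):
    """The four fields that identify one XCom entry."""
    dag_id: str
    task_id: str
    run_id: str
    key: str


class ToyXComStore:
    """In-memory stand-in for the XCom table (hypothetical, illustration only)."""

    def __init__(self) -> None:
        self._rows: Dict[XComKey, Any] = {}

    def push(self, dag_id: str, task_id: str, run_id: str,
             value: Any, key: str = RETURN_KEY) -> None:
        self._rows[XComKey(dag_id, task_id, run_id, key)] = value

    def pull(self, dag_id: str, task_id: str, run_id: str,
             key: str = RETURN_KEY) -> Optional[Any]:
        # A lookup only matches when all four parts of the key match.
        return self._rows.get(XComKey(dag_id, task_id, run_id, key))


store = ToyXComStore()
store.push("etl", "extract", "run_1", 42)                  # default key
store.push("etl", "extract", "run_1", "ok", key="status")  # custom key

same_run = store.pull("etl", "extract", "run_1")
named = store.pull("etl", "extract", "run_1", key="status")
other_run = store.pull("etl", "extract", "run_2")        # different run: no match
other_dag = store.pull("reporting", "extract", "run_1")  # different DAG: no match
```

Because every part of the key must match, a value pushed in one run or one DAG is invisible to lookups made with a different run_id or dag_id, which is the "exclusive" behavior this section describes.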
The absolute most critical rule of Airflow development is: XCom is for small pieces of metadata, not for the data itself.

The Technical Reality
from airflow.operators.bash import BashOperator

cleanup_task = BashOperator(
    task_id='cleanup_temp_directory',
    bash_command='rm -rf /tmp/data/*',
    # Prevents pushing the command's output (empty strings, terminal logs) to the DB
    do_xcom_push=False,
)

Pattern 3: Automatic XCom Purging
By following these practices and using XCom judiciously, you can build more efficient, scalable, and reliable workflows. So, go ahead and experiment with exclusive XCom scoping; your workflows will thank you!
Example (Python using redis-py):
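The text does not spell out what is implemented with Redis, so the sketch below assumes one common pattern: the large payload goes to Redis and only its key travels through XCom. The helper names (store_payload, load_payload), the key prefix, and the one-hour expiry are illustrative assumptions. The functions take a client object rather than creating one, so the same code works with a redis-py client or a test double; the only redis-py calls relied on are set (with the ex expiry argument) and get.

```python
import json
import uuid


def store_payload(client, payload, ttl_seconds=3600):
    """Write a JSON-serializable payload to Redis; return the key to pass via XCom."""
    key = f"xcom-payload:{uuid.uuid4()}"  # hypothetical key naming scheme
    # SET with an expiry so abandoned payloads eventually clean themselves up.
    client.set(key, json.dumps(payload), ex=ttl_seconds)
    return key


def load_payload(client, key):
    """Read the payload back using the key received through XCom."""
    raw = client.get(key)  # redis-py returns bytes by default, or None if absent
    if raw is None:
        raise KeyError(f"payload {key!r} is missing or has expired")
    return json.loads(raw)
```

With redis-py, client would be an instance such as redis.Redis(host="localhost", port=6379), where the host and port are placeholders. The upstream task returns the result of store_payload, so only a short string lands in the metadata database, and the downstream task calls load_payload with that string.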
This approach is particularly useful with the TaskFlow API, where a task's return value is automatically pushed to XCom. By explicitly passing these references as arguments to downstream tasks, you create an intuitive, functional declaration of your pipeline.
@staticmethod
def serialize_value(value):
    """Convert value to string for storage"""
    # Custom serialization logic here
    return json.dumps(value)
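The serialize_value and deserialize_value hooks from this article can be exercised outside Airflow to check that they round-trip. The sketch below puts them on a plain class (JsonXComCodec, a hypothetical name) instead of an Airflow XCom backend subclass, so it runs with only the standard library. It also shows a limitation of JSON-based serialization: values such as datetime objects are rejected.

```python
import json
from datetime import datetime


class JsonXComCodec:
    """Stand-alone holder for the two hooks (not an Airflow backend class)."""

    @staticmethod
    def serialize_value(value):
        """Convert value to string for storage"""
        return json.dumps(value)

    @staticmethod
    def deserialize_value(value):
        """Convert stored string back to original value"""
        return json.loads(value)


original = {"count": 3, "status": "ok"}
stored = JsonXComCodec.serialize_value(original)    # a JSON string
restored = JsonXComCodec.deserialize_value(stored)  # equal to the original dict

# json.dumps raises TypeError for types it cannot represent, such as datetime.
try:
    JsonXComCodec.serialize_value({"when": datetime(2024, 1, 1)})
    rejected = False
except TypeError:
    rejected = True
```

If tasks need to exchange timestamps or other non-JSON types, the custom serialization logic would have to convert them first, for example to ISO 8601 strings.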