diff --git a/ingestion/src/metadata/ingestion/source/pipeline/airflow/lineage_parser.py b/ingestion/src/metadata/ingestion/source/pipeline/airflow/lineage_parser.py index 4ad7bd3a54a..4147cb25357 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/airflow/lineage_parser.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/airflow/lineage_parser.py @@ -87,7 +87,8 @@ def parse_xlets(xlet: List[dict]) -> Optional[Dict[str, List[str]]]: """ if isinstance(xlet, list) and len(xlet) and isinstance(xlet[0], dict): xlet_dict = xlet[0] - + if isinstance(xlet_dict, dict) and xlet_dict.get("__var"): + xlet_dict = xlet_dict["__var"] return { key: value for key, value in xlet_dict.items() if isinstance(value, list) } diff --git a/ingestion/src/metadata/ingestion/source/pipeline/airflow/metadata.py b/ingestion/src/metadata/ingestion/source/pipeline/airflow/metadata.py index 411a0aec9c7..f44bf49a8e0 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/airflow/metadata.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/airflow/metadata.py @@ -44,6 +44,7 @@ from metadata.generated.schema.type.entityReference import EntityReference from metadata.ingestion.api.source import InvalidSourceException from metadata.ingestion.connections.session import create_and_bind_session from metadata.ingestion.models.pipeline_status import OMetaPipelineStatus +from metadata.ingestion.source.pipeline.airflow.lineage_parser import get_xlets_from_dag from metadata.ingestion.source.pipeline.pipeline_service import PipelineServiceSource from metadata.utils.helpers import datetime_to_ts from metadata.utils.logger import ingestion_logger @@ -408,11 +409,12 @@ class AirflowSource(PipelineServiceSource): ) ) - for task in dag.tasks: - for from_fqn in self.get_inlets(task) or []: + xlets = get_xlets_from_dag(dag=dag) + for xlet in xlets: + for from_fqn in xlet.inlets or []: from_entity = self.metadata.get_by_name(entity=Table, fqn=from_fqn) if from_entity: - for to_fqn in self.get_outlets(task) or []: + for to_fqn in xlet.outlets or []: to_entity = self.metadata.get_by_name(entity=Table, fqn=to_fqn) if to_entity: lineage = AddLineageRequest( diff --git a/ingestion/tests/unit/airflow/test_lineage_parser.py b/ingestion/tests/unit/airflow/test_lineage_parser.py index cdd7484f394..1399f788425 100644 --- a/ingestion/tests/unit/airflow/test_lineage_parser.py +++ b/ingestion/tests/unit/airflow/test_lineage_parser.py @@ -93,13 +93,13 @@ class TestAirflowLineageParser(TestCase): BashOperator( task_id="print_date", bash_command="date", - inlets={"tables": ["A"]}, + inlets={"__var": {"tables": ["A"]}}, ) BashOperator( task_id="sleep", bash_command="sleep 1", - outlets={"tables": ["B"]}, + outlets={"__var": {"tables": ["B"]}}, ) self.assertEqual(