diff --git a/ingestion/src/metadata/ingestion/source/pipeline/airflow/lineage_parser.py b/ingestion/src/metadata/ingestion/source/pipeline/airflow/lineage_parser.py index 4147cb25357..000fbb0ee1e 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/airflow/lineage_parser.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/airflow/lineage_parser.py @@ -85,8 +85,10 @@ def parse_xlets(xlet: List[dict]) -> Optional[Dict[str, List[str]]]: :param xlet: airflow v2 xlet dict :return: dictionary of xlet list or None """ + # This branch is for lineage parser op if isinstance(xlet, list) and len(xlet) and isinstance(xlet[0], dict): xlet_dict = xlet[0] + # This is how the Serialized DAG is giving us the info from _inlets & _outlets if isinstance(xlet_dict, dict) and xlet_dict.get("__var"): xlet_dict = xlet_dict["__var"] return { diff --git a/ingestion/tests/unit/airflow/test_lineage_parser.py b/ingestion/tests/unit/airflow/test_lineage_parser.py index 1399f788425..cdd7484f394 100644 --- a/ingestion/tests/unit/airflow/test_lineage_parser.py +++ b/ingestion/tests/unit/airflow/test_lineage_parser.py @@ -93,13 +93,13 @@ class TestAirflowLineageParser(TestCase): BashOperator( task_id="print_date", bash_command="date", - inlets={"__var": {"tables": ["A"]}}, + inlets={"tables": ["A"]}, ) BashOperator( task_id="sleep", bash_command="sleep 1", - outlets={"__var": {"tables": ["B"]}}, + outlets={"tables": ["B"]}, ) self.assertEqual(