Clean Airflow lineage test (#10745)

This commit is contained in:
Pere Miquel Brull 2023-03-27 11:46:34 +02:00 committed by GitHub
parent c50390da80
commit cc3d2b448b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 4 additions and 2 deletions

View File

@ -85,8 +85,10 @@ def parse_xlets(xlet: List[dict]) -> Optional[Dict[str, List[str]]]:
:param xlet: airflow v2 xlet dict
:return: dictionary of xlet list or None
"""
# This branch is for lineage parser op
if isinstance(xlet, list) and len(xlet) and isinstance(xlet[0], dict):
xlet_dict = xlet[0]
# This is how the Serialized DAG is giving us the info from _inlets & _outlets
if isinstance(xlet_dict, dict) and xlet_dict.get("__var"):
xlet_dict = xlet_dict["__var"]
return {

View File

@ -93,13 +93,13 @@ class TestAirflowLineageParser(TestCase):
BashOperator(
task_id="print_date",
bash_command="date",
inlets={"__var": {"tables": ["A"]}},
inlets={"tables": ["A"]},
)
BashOperator(
task_id="sleep",
bash_command="sleep 1",
outlets={"__var": {"tables": ["B"]}},
outlets={"tables": ["B"]},
)
self.assertEqual(