Aligned airflow source with lineage (#10596)

* aligned airflow source with lineage

* fix python test

* changes as per comment
This commit is contained in:
NiharDoshi99 2023-03-15 19:29:24 +05:30 committed by GitHub
parent 0e57c56d83
commit 205e5c8c4f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 9 additions and 6 deletions

View File

@ -87,7 +87,8 @@ def parse_xlets(xlet: List[dict]) -> Optional[Dict[str, List[str]]]:
"""
if isinstance(xlet, list) and len(xlet) and isinstance(xlet[0], dict):
xlet_dict = xlet[0]
if isinstance(xlet_dict, dict) and xlet_dict.get("__var"):
xlet_dict = xlet_dict["__var"]
return {
key: value for key, value in xlet_dict.items() if isinstance(value, list)
}

View File

@ -44,6 +44,7 @@ from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.source import InvalidSourceException
from metadata.ingestion.connections.session import create_and_bind_session
from metadata.ingestion.models.pipeline_status import OMetaPipelineStatus
from metadata.ingestion.source.pipeline.airflow.lineage_parser import get_xlets_from_dag
from metadata.ingestion.source.pipeline.pipeline_service import PipelineServiceSource
from metadata.utils.helpers import datetime_to_ts
from metadata.utils.logger import ingestion_logger
@ -408,11 +409,12 @@ class AirflowSource(PipelineServiceSource):
)
)
for task in dag.tasks:
for from_fqn in self.get_inlets(task) or []:
xlets = get_xlets_from_dag(dag=dag)
for xlet in xlets:
for from_fqn in xlet.inlets or []:
from_entity = self.metadata.get_by_name(entity=Table, fqn=from_fqn)
if from_entity:
for to_fqn in self.get_outlets(task) or []:
for to_fqn in xlet.outlets or []:
to_entity = self.metadata.get_by_name(entity=Table, fqn=to_fqn)
if to_entity:
lineage = AddLineageRequest(

View File

@ -93,13 +93,13 @@ class TestAirflowLineageParser(TestCase):
BashOperator(
task_id="print_date",
bash_command="date",
inlets={"tables": ["A"]},
inlets={"__var": {"tables": ["A"]}},
)
BashOperator(
task_id="sleep",
bash_command="sleep 1",
outlets={"tables": ["B"]},
outlets={"__var": {"tables": ["B"]}},
)
self.assertEqual(