fix(airflow): Add comma parsing of owners to DataJobs (#10903)

This commit is contained in:
Ellie O'Neil 2024-07-17 14:58:25 -07:00 committed by GitHub
parent 998d7d1227
commit 0b64de8f2b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -127,6 +127,10 @@ class AirflowGenerator:
)
return upstream_tasks
@staticmethod
def _extract_owners(dag: "DAG") -> List[str]:
return [owner.strip() for owner in dag.owner.split(",")]
@staticmethod
def generate_dataflow(
config: DatahubLineageConfig,
@ -175,7 +179,7 @@ class AirflowGenerator:
data_flow.url = f"{base_url}/tree?dag_id={dag.dag_id}"
if config.capture_ownership_info and dag.owner:
owners = [owner.strip() for owner in dag.owner.split(",")]
owners = AirflowGenerator._extract_owners(dag)
if config.capture_ownership_as_group:
data_flow.group_owners.update(owners)
else:
@ -282,10 +286,12 @@ class AirflowGenerator:
datajob.url = f"{base_url}/taskinstance/list/?flt1_dag_id_equals={datajob.flow_urn.flow_id}&_flt_3_task_id={task.task_id}"
if capture_owner and dag.owner:
if config and config.capture_ownership_as_group:
datajob.group_owners.add(dag.owner)
else:
datajob.owners.add(dag.owner)
if config and config.capture_ownership_info:
owners = AirflowGenerator._extract_owners(dag)
if config.capture_ownership_as_group:
datajob.group_owners.update(owners)
else:
datajob.owners.update(owners)
if capture_tags and dag.tags:
datajob.tags.update(dag.tags)