From 4d20c22a49bc4bc8adebf0cecfd61a98a2667608 Mon Sep 17 00:00:00 2001 From: dushayntAW <158567391+dushayntAW@users.noreply.github.com> Date: Wed, 8 May 2024 10:41:16 +0200 Subject: [PATCH] fix(ingestion/airflow-plugin): warning log for non-materialized iolets (#10421) --- .../datahub_airflow_plugin/client/airflow_generator.py | 4 ++-- .../src/datahub_airflow_plugin/datahub_listener.py | 8 ++++++++ 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/client/airflow_generator.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/client/airflow_generator.py index 070fc0526e..cdba268eed 100644 --- a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/client/airflow_generator.py +++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/client/airflow_generator.py @@ -428,8 +428,8 @@ class AirflowGenerator: dpi = DataProcessInstance.from_datajob( datajob=datajob, id=f"{dag.dag_id}_{ti.task_id}_{dag_run.run_id}", - clone_inlets=True, - clone_outlets=True, + clone_inlets=config is None or config.materialize_iolets, + clone_outlets=config is None or config.materialize_iolets, ) job_property_bag: Dict[str, str] = {} job_property_bag["run_id"] = str(dag_run.run_id) diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py index 737ea99e87..cf55d5347a 100644 --- a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py +++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py @@ -433,6 +433,14 @@ class DataHubListener: self.emitter.emit(operation_mcp) logger.debug(f"Emitted Dataset Operation: {outlet}") + else: + if self.graph: + for outlet in datajob.outlets: + if not self.graph.exists(str(outlet)): + logger.warning(f"Dataset {str(outlet)} not materialized") + for inlet in datajob.inlets: + if not self.graph.exists(str(inlet)): + logger.warning(f"Dataset {str(inlet)} not materialized") def on_task_instance_finish( self, task_instance: "TaskInstance", status: InstanceRunResult