diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/client/airflow_generator.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/client/airflow_generator.py
index 070fc0526e..cdba268eed 100644
--- a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/client/airflow_generator.py
+++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/client/airflow_generator.py
@@ -428,8 +428,8 @@ class AirflowGenerator:
         dpi = DataProcessInstance.from_datajob(
             datajob=datajob,
             id=f"{dag.dag_id}_{ti.task_id}_{dag_run.run_id}",
-            clone_inlets=True,
-            clone_outlets=True,
+            clone_inlets=config is None or config.materialize_iolets,
+            clone_outlets=config is None or config.materialize_iolets,
         )
         job_property_bag: Dict[str, str] = {}
         job_property_bag["run_id"] = str(dag_run.run_id)
diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py
index 737ea99e87..cf55d5347a 100644
--- a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py
+++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py
@@ -433,6 +433,14 @@ class DataHubListener:
                 self.emitter.emit(operation_mcp)
                 logger.debug(f"Emitted Dataset Operation: {outlet}")
+        else:
+            if self.graph:
+                for outlet in datajob.outlets:
+                    if not self.graph.exists(str(outlet)):
+                        logger.warning(f"Dataset {str(outlet)} not materialized")
+                for inlet in datajob.inlets:
+                    if not self.graph.exists(str(inlet)):
+                        logger.warning(f"Dataset {str(inlet)} not materialized")

     def on_task_instance_finish(
         self, task_instance: "TaskInstance", status: InstanceRunResult
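
The patch gates inlet/outlet cloning on a `materialize_iolets` config flag and, when the flag is off, only warns if the referenced datasets do not already exist. Below is a minimal illustrative sketch (not part of the patch) of the boolean plumbing; `DatahubLineageConfig` here is a simplified stand-in for the plugin's config object, and `should_clone_iolets` is a hypothetical helper used only to show the expression from the diff.

```python
# Illustrative sketch only -- the real plugin config has more fields and a
# different constructor; the default of True is an assumption.
from typing import Optional


class DatahubLineageConfig:
    """Simplified stand-in for the Airflow plugin's lineage config."""

    def __init__(self, materialize_iolets: bool = True) -> None:
        # When True, inlets/outlets are cloned (materialized) as dataset entities.
        # When False, the listener only warns if the datasets do not already exist.
        self.materialize_iolets = materialize_iolets


def should_clone_iolets(config: Optional[DatahubLineageConfig]) -> bool:
    # Mirrors the expression used in AirflowGenerator above: clone when no
    # config is provided (backwards compatible) or when the flag is enabled.
    return config is None or config.materialize_iolets


assert should_clone_iolets(None) is True
assert should_clone_iolets(DatahubLineageConfig(materialize_iolets=False)) is False
```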