fix(ingestion/airflow-plugin): warning log for non-materialized iolets (#10421)

This commit is contained in:
dushayntAW 2024-05-08 10:41:16 +02:00 committed by GitHub
parent de5b503380
commit 4d20c22a49
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 10 additions and 2 deletions

View File

@ -428,8 +428,8 @@ class AirflowGenerator:
dpi = DataProcessInstance.from_datajob(
datajob=datajob,
id=f"{dag.dag_id}_{ti.task_id}_{dag_run.run_id}",
clone_inlets=True,
clone_outlets=True,
clone_inlets=config is None or config.materialize_iolets,
clone_outlets=config is None or config.materialize_iolets,
)
job_property_bag: Dict[str, str] = {}
job_property_bag["run_id"] = str(dag_run.run_id)

View File

@ -433,6 +433,14 @@ class DataHubListener:
self.emitter.emit(operation_mcp)
logger.debug(f"Emitted Dataset Operation: {outlet}")
else:
if self.graph:
for outlet in datajob.outlets:
if not self.graph.exists(str(outlet)):
logger.warning(f"Dataset {str(outlet)} not materialized")
for inlet in datajob.inlets:
if not self.graph.exists(str(inlet)):
logger.warning(f"Dataset {str(inlet)} not materialized")
def on_task_instance_finish(
self, task_instance: "TaskInstance", status: InstanceRunResult