test(ingest/mcp_diff): Fallback to overwriting file on more complicated diffs (#11407)

Co-authored-by: Harshal Sheth <hsheth2@gmail.com>
This commit is contained in:
Andrew Sikowitz 2024-09-18 16:40:08 -07:00 committed by GitHub
parent de40d251e2
commit 3e3a05f09c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 39 additions and 21 deletions

View File

@ -19,7 +19,8 @@
}, },
"externalUrl": "http://airflow.example.com/tree?dag_id=datahub_emitter_operator_jinja_template_dag", "externalUrl": "http://airflow.example.com/tree?dag_id=datahub_emitter_operator_jinja_template_dag",
"name": "datahub_emitter_operator_jinja_template_dag", "name": "datahub_emitter_operator_jinja_template_dag",
"description": "An example dag with jinja template" "description": "An example dag with jinja template",
"env": "PROD"
} }
} }
}, },
@ -140,13 +141,14 @@
"downstream_task_ids": "[]", "downstream_task_ids": "[]",
"inlets": "[]", "inlets": "[]",
"outlets": "[]", "outlets": "[]",
"openlineage_run_facet_unknownSourceAttribute": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.18.0/integration/airflow\", \"_schemaURL\": \"https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/BaseFacet\", \"unknownItems\": [{\"name\": \"DatahubEmitterOperator\", \"properties\": {\"depends_on_past\": false, \"downstream_task_ids\": \"[]\", \"execution_timeout\": \"<<non-serializable: timedelta>>\", \"executor_config\": {}, \"ignore_first_depends_on_past\": true, \"is_setup\": false, \"is_teardown\": false, \"mapped\": false, \"operator_class\": \"datahub_airflow_plugin.operators.datahub.DatahubEmitterOperator\", \"owner\": \"airflow\", \"priority_weight\": 1, \"queue\": \"default\", \"retries\": 0, \"retry_exponential_backoff\": false, \"task_id\": \"datahub_emitter_operator_jinja_template_dag_task\", \"trigger_rule\": \"all_success\", \"upstream_task_ids\": \"[]\", \"wait_for_downstream\": false, \"wait_for_past_depends_before_skipping\": false, \"weight_rule\": \"downstream\"}, \"type\": \"operator\"}]}" "openlineage_run_facet_unknownSourceAttribute": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.18.0/integration/airflow\", \"_schemaURL\": \"https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/BaseFacet\", \"unknownItems\": [{\"name\": \"DatahubEmitterOperator\", \"properties\": {\"depends_on_past\": false, \"downstream_task_ids\": \"[]\", \"execution_timeout\": \"<<non-serializable: timedelta>>\", \"executor_config\": {}, \"ignore_first_depends_on_past\": true, \"is_setup\": false, \"is_teardown\": false, \"mapped\": false, \"multiple_outputs\": false, \"operator_class\": \"datahub_airflow_plugin.operators.datahub.DatahubEmitterOperator\", \"owner\": \"airflow\", \"priority_weight\": 1, \"queue\": \"default\", \"retries\": 0, \"retry_exponential_backoff\": false, \"task_id\": \"datahub_emitter_operator_jinja_template_dag_task\", \"trigger_rule\": \"all_success\", \"upstream_task_ids\": \"[]\", \"wait_for_downstream\": false, \"wait_for_past_depends_before_skipping\": false, \"weight_rule\": \"<<non-serializable: _DownstreamPriorityWeightStrategy>>\"}, \"type\": \"operator\"}]}"
}, },
"externalUrl": "http://airflow.example.com/taskinstance/list/?flt1_dag_id_equals=datahub_emitter_operator_jinja_template_dag&_flt_3_task_id=datahub_emitter_operator_jinja_template_dag_task", "externalUrl": "http://airflow.example.com/taskinstance/list/?flt1_dag_id_equals=datahub_emitter_operator_jinja_template_dag&_flt_3_task_id=datahub_emitter_operator_jinja_template_dag_task",
"name": "datahub_emitter_operator_jinja_template_dag_task", "name": "datahub_emitter_operator_jinja_template_dag_task",
"type": { "type": {
"string": "COMMAND" "string": "COMMAND"
} },
"env": "PROD"
} }
} }
}, },
@ -233,16 +235,16 @@
"state": "running", "state": "running",
"operator": "DatahubEmitterOperator", "operator": "DatahubEmitterOperator",
"priority_weight": "1", "priority_weight": "1",
"log_url": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=datahub_emitter_operator_jinja_template_dag_task&dag_id=datahub_emitter_operator_jinja_template_dag&map_index=-1", "log_url": "http://airflow.example.com/dags/datahub_emitter_operator_jinja_template_dag/grid?dag_run_id=manual_run_test&task_id=datahub_emitter_operator_jinja_template_dag_task&map_index=-1&tab=logs",
"orchestrator": "airflow", "orchestrator": "airflow",
"dag_id": "datahub_emitter_operator_jinja_template_dag", "dag_id": "datahub_emitter_operator_jinja_template_dag",
"task_id": "datahub_emitter_operator_jinja_template_dag_task" "task_id": "datahub_emitter_operator_jinja_template_dag_task"
}, },
"externalUrl": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=datahub_emitter_operator_jinja_template_dag_task&dag_id=datahub_emitter_operator_jinja_template_dag&map_index=-1", "externalUrl": "http://airflow.example.com/dags/datahub_emitter_operator_jinja_template_dag/grid?dag_run_id=manual_run_test&task_id=datahub_emitter_operator_jinja_template_dag_task&map_index=-1&tab=logs",
"name": "datahub_emitter_operator_jinja_template_dag_datahub_emitter_operator_jinja_template_dag_task_manual_run_test", "name": "datahub_emitter_operator_jinja_template_dag_datahub_emitter_operator_jinja_template_dag_task_manual_run_test",
"type": "BATCH_AD_HOC", "type": "BATCH_AD_HOC",
"created": { "created": {
"time": 1726570937652, "time": 1726699852909,
"actor": "urn:li:corpuser:datahub" "actor": "urn:li:corpuser:datahub"
} }
} }
@ -267,7 +269,7 @@
"aspectName": "dataProcessInstanceRunEvent", "aspectName": "dataProcessInstanceRunEvent",
"aspect": { "aspect": {
"json": { "json": {
"timestampMillis": 1726570937652, "timestampMillis": 1726699852909,
"partitionSpec": { "partitionSpec": {
"partition": "FULL_TABLE_SNAPSHOT", "partition": "FULL_TABLE_SNAPSHOT",
"type": "FULL_TABLE" "type": "FULL_TABLE"
@ -360,13 +362,14 @@
"downstream_task_ids": "[]", "downstream_task_ids": "[]",
"inlets": "[]", "inlets": "[]",
"outlets": "[]", "outlets": "[]",
"openlineage_run_facet_unknownSourceAttribute": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.18.0/integration/airflow\", \"_schemaURL\": \"https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/BaseFacet\", \"unknownItems\": [{\"name\": \"DatahubEmitterOperator\", \"properties\": {\"depends_on_past\": false, \"downstream_task_ids\": \"[]\", \"execution_timeout\": \"<<non-serializable: timedelta>>\", \"executor_config\": {}, \"ignore_first_depends_on_past\": true, \"is_setup\": false, \"is_teardown\": false, \"mapped\": false, \"operator_class\": \"datahub_airflow_plugin.operators.datahub.DatahubEmitterOperator\", \"owner\": \"airflow\", \"priority_weight\": 1, \"queue\": \"default\", \"retries\": 0, \"retry_exponential_backoff\": false, \"task_id\": \"datahub_emitter_operator_jinja_template_dag_task\", \"trigger_rule\": \"all_success\", \"upstream_task_ids\": \"[]\", \"wait_for_downstream\": false, \"wait_for_past_depends_before_skipping\": false, \"weight_rule\": \"downstream\"}, \"type\": \"operator\"}]}" "openlineage_run_facet_unknownSourceAttribute": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.18.0/integration/airflow\", \"_schemaURL\": \"https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/BaseFacet\", \"unknownItems\": [{\"name\": \"DatahubEmitterOperator\", \"properties\": {\"depends_on_past\": false, \"downstream_task_ids\": \"[]\", \"execution_timeout\": \"<<non-serializable: timedelta>>\", \"executor_config\": {}, \"ignore_first_depends_on_past\": true, \"is_setup\": false, \"is_teardown\": false, \"mapped\": false, \"multiple_outputs\": false, \"operator_class\": \"datahub_airflow_plugin.operators.datahub.DatahubEmitterOperator\", \"owner\": \"airflow\", \"priority_weight\": 1, \"queue\": \"default\", \"retries\": 0, \"retry_exponential_backoff\": false, \"task_id\": \"datahub_emitter_operator_jinja_template_dag_task\", \"trigger_rule\": \"all_success\", \"upstream_task_ids\": \"[]\", \"wait_for_downstream\": false, \"wait_for_past_depends_before_skipping\": false, \"weight_rule\": \"<<non-serializable: _DownstreamPriorityWeightStrategy>>\"}, \"type\": \"operator\"}]}"
}, },
"externalUrl": "http://airflow.example.com/taskinstance/list/?flt1_dag_id_equals=datahub_emitter_operator_jinja_template_dag&_flt_3_task_id=datahub_emitter_operator_jinja_template_dag_task", "externalUrl": "http://airflow.example.com/taskinstance/list/?flt1_dag_id_equals=datahub_emitter_operator_jinja_template_dag&_flt_3_task_id=datahub_emitter_operator_jinja_template_dag_task",
"name": "datahub_emitter_operator_jinja_template_dag_task", "name": "datahub_emitter_operator_jinja_template_dag_task",
"type": { "type": {
"string": "COMMAND" "string": "COMMAND"
} },
"env": "PROD"
} }
} }
}, },
@ -441,7 +444,7 @@
"aspectName": "dataProcessInstanceRunEvent", "aspectName": "dataProcessInstanceRunEvent",
"aspect": { "aspect": {
"json": { "json": {
"timestampMillis": 1726570937811, "timestampMillis": 1726699853392,
"partitionSpec": { "partitionSpec": {
"partition": "FULL_TABLE_SNAPSHOT", "partition": "FULL_TABLE_SNAPSHOT",
"type": "FULL_TABLE" "type": "FULL_TABLE"

View File

@ -19,7 +19,8 @@
}, },
"externalUrl": "http://airflow.example.com/tree?dag_id=datahub_emitter_operator_jinja_template_dag", "externalUrl": "http://airflow.example.com/tree?dag_id=datahub_emitter_operator_jinja_template_dag",
"name": "datahub_emitter_operator_jinja_template_dag", "name": "datahub_emitter_operator_jinja_template_dag",
"description": "An example dag with jinja template" "description": "An example dag with jinja template",
"env": "PROD"
} }
} }
}, },
@ -146,7 +147,8 @@
"name": "datahub_emitter_operator_jinja_template_dag_task", "name": "datahub_emitter_operator_jinja_template_dag_task",
"type": { "type": {
"string": "COMMAND" "string": "COMMAND"
} },
"env": "PROD"
} }
} }
}, },
@ -242,7 +244,7 @@
"name": "datahub_emitter_operator_jinja_template_dag_datahub_emitter_operator_jinja_template_dag_task_manual_run_test", "name": "datahub_emitter_operator_jinja_template_dag_datahub_emitter_operator_jinja_template_dag_task_manual_run_test",
"type": "BATCH_AD_HOC", "type": "BATCH_AD_HOC",
"created": { "created": {
"time": 1726570903946, "time": 1726621020280,
"actor": "urn:li:corpuser:datahub" "actor": "urn:li:corpuser:datahub"
} }
} }
@ -267,7 +269,7 @@
"aspectName": "dataProcessInstanceRunEvent", "aspectName": "dataProcessInstanceRunEvent",
"aspect": { "aspect": {
"json": { "json": {
"timestampMillis": 1726570903946, "timestampMillis": 1726621020280,
"partitionSpec": { "partitionSpec": {
"partition": "FULL_TABLE_SNAPSHOT", "partition": "FULL_TABLE_SNAPSHOT",
"type": "FULL_TABLE" "type": "FULL_TABLE"
@ -366,7 +368,8 @@
"name": "datahub_emitter_operator_jinja_template_dag_task", "name": "datahub_emitter_operator_jinja_template_dag_task",
"type": { "type": {
"string": "COMMAND" "string": "COMMAND"
} },
"env": "PROD"
} }
} }
}, },
@ -441,7 +444,7 @@
"aspectName": "dataProcessInstanceRunEvent", "aspectName": "dataProcessInstanceRunEvent",
"aspect": { "aspect": {
"json": { "json": {
"timestampMillis": 1726570904070, "timestampMillis": 1726621020427,
"partitionSpec": { "partitionSpec": {
"partition": "FULL_TABLE_SNAPSHOT", "partition": "FULL_TABLE_SNAPSHOT",
"type": "FULL_TABLE" "type": "FULL_TABLE"

View File

@ -84,10 +84,11 @@ def assert_metadata_files_equal(
diff = diff_metadata_json(output, golden, ignore_paths, ignore_order=ignore_order) diff = diff_metadata_json(output, golden, ignore_paths, ignore_order=ignore_order)
if diff and update_golden: if diff and update_golden:
if isinstance(diff, MCPDiff): if isinstance(diff, MCPDiff) and diff.is_delta_valid:
diff.apply_delta(golden) diff.apply_delta(golden)
write_metadata_file(pathlib.Path(golden_path), golden) write_metadata_file(pathlib.Path(golden_path), golden)
else: else:
# Fallback: just overwrite the golden file
shutil.copyfile(str(output_path), str(golden_path)) shutil.copyfile(str(output_path), str(golden_path))
return return

View File

@ -149,6 +149,7 @@ class MCPDiff:
aspect_changes: Dict[str, Dict[str, MCPAspectDiff]] # urn -> aspect -> diff aspect_changes: Dict[str, Dict[str, MCPAspectDiff]] # urn -> aspect -> diff
urns_added: Set[str] urns_added: Set[str]
urns_removed: Set[str] urns_removed: Set[str]
is_delta_valid: bool
def __bool__(self) -> bool: def __bool__(self) -> bool:
return bool(self.aspect_changes) return bool(self.aspect_changes)
@ -162,25 +163,31 @@ class MCPDiff:
) -> "MCPDiff": ) -> "MCPDiff":
ignore_paths = [cls.convert_path(path) for path in ignore_paths] ignore_paths = [cls.convert_path(path) for path in ignore_paths]
is_delta_valid = True
aspect_changes: Dict[str, Dict[str, MCPAspectDiff]] = defaultdict(dict) aspect_changes: Dict[str, Dict[str, MCPAspectDiff]] = defaultdict(dict)
for urn in golden.keys() | output.keys(): for urn in golden.keys() | output.keys():
golden_map = golden.get(urn, {}) golden_map = golden.get(urn, {})
output_map = output.get(urn, {}) output_map = output.get(urn, {})
for aspect_name in golden_map.keys() | output_map.keys(): for aspect_name in golden_map.keys() | output_map.keys():
t1 = golden_map.get(aspect_name, [])
t2 = output_map.get(aspect_name, [])
diff = DeepDiff( diff = DeepDiff(
t1=golden_map.get(aspect_name, []), t1=t1,
t2=output_map.get(aspect_name, []), t2=t2,
exclude_regex_paths=ignore_paths, exclude_regex_paths=ignore_paths,
ignore_order=True, ignore_order=True,
custom_operators=[DeltaInfoOperator()], custom_operators=[DeltaInfoOperator()],
) )
if diff: if diff:
aspect_changes[urn][aspect_name] = MCPAspectDiff.create(diff) aspect_changes[urn][aspect_name] = MCPAspectDiff.create(diff)
if len(t1) > 1 or len(t2) > 1:
is_delta_valid = False
return cls( return cls(
urns_added=output.keys() - golden.keys(), urns_added=output.keys() - golden.keys(),
urns_removed=golden.keys() - output.keys(), urns_removed=golden.keys() - output.keys(),
aspect_changes=aspect_changes, aspect_changes=aspect_changes,
is_delta_valid=is_delta_valid,
) )
@staticmethod @staticmethod
@ -193,9 +200,13 @@ class MCPDiff:
) )
def apply_delta(self, golden: List[Dict[str, Any]]) -> None: def apply_delta(self, golden: List[Dict[str, Any]]) -> None:
"""Update a golden file to match an output file based on the diff.
:param golden: Golden file represented as a list of MCPs, altered in-place.
"""
aspect_diffs = [v for d in self.aspect_changes.values() for v in d.values()] aspect_diffs = [v for d in self.aspect_changes.values() for v in d.values()]
for aspect_diff in aspect_diffs: for aspect_diff in aspect_diffs:
for (_, old, new), diffs in aspect_diff.aspects_changed.items(): for (_, old, new) in aspect_diff.aspects_changed.keys():
golden[old.delta_info.idx] = new.delta_info.original golden[old.delta_info.idx] = new.delta_info.original
indices_to_remove = set() indices_to_remove = set()
@ -268,7 +279,7 @@ class MCPDiff:
suffix = "rd" suffix = "rd"
else: else:
suffix = "th" suffix = "th"
ordinal = f"{(idx+1)}{suffix} " if idx else "" ordinal = f"{(idx + 1)}{suffix} " if idx else ""
return f"{ordinal}<{ga.aspect_name}> {msg}" return f"{ordinal}<{ga.aspect_name}> {msg}"
@staticmethod @staticmethod