From 27af481433f9baf0d5143b49590aa355ee0618b7 Mon Sep 17 00:00:00 2001 From: Mayur Singal <39544459+ulixius9@users.noreply.github.com> Date: Thu, 1 Aug 2024 17:28:22 +0530 Subject: [PATCH] MINOR: Fix lineage PATCH API (#17264) --- .../ingestion/ometa/mixins/lineage_mixin.py | 25 ++++++++++++------- .../service/jdbi3/LineageRepository.java | 1 + 2 files changed, 17 insertions(+), 9 deletions(-) diff --git a/ingestion/src/metadata/ingestion/ometa/mixins/lineage_mixin.py b/ingestion/src/metadata/ingestion/ometa/mixins/lineage_mixin.py index f6e180b7738..a3e1da1870c 100644 --- a/ingestion/src/metadata/ingestion/ometa/mixins/lineage_mixin.py +++ b/ingestion/src/metadata/ingestion/ometa/mixins/lineage_mixin.py @@ -52,11 +52,12 @@ class OMetaLineageMixin(Generic[T]): def _merge_column_lineage( self, original: List[Dict[str, Any]], updated: List[Dict[str, Any]] ): - temp_result = [] + flat_original_result = set() + flat_updated_result = set() try: for column in original or []: if column.get("toColumn") and column.get("fromColumns"): - temp_result.append( + flat_original_result.add( (*column.get("fromColumns", []), column.get("toColumn")) ) for column in updated or []: @@ -65,15 +66,18 @@ class OMetaLineageMixin(Generic[T]): else: data = column if data.get("toColumn") and data.get("fromColumns"): - temp_result.append( + flat_updated_result.add( (*data.get("fromColumns", []), data.get("toColumn")) ) except Exception as exc: logger.debug(f"Error while merging column lineage: {exc}") logger.debug(traceback.format_exc()) + union_result = flat_original_result.union(flat_updated_result) + if flat_original_result == union_result: + return original return [ {"fromColumns": list(col_data[:-1]), "toColumn": col_data[-1]} - for col_data in set(temp_result) + for col_data in union_result ] def _update_cache(self, request: AddLineageRequest, response: Dict[str, Any]): @@ -119,7 +123,10 @@ class OMetaLineageMixin(Generic[T]): "columnsLineage", [] ) original.edge.lineageDetails.pipeline = ( - EntityReference(**edge["edge"].get("pipeline")) + EntityReference( + id=edge["edge"]["pipeline"]["id"], + type=edge["edge"]["pipeline"]["type"], + ) if edge["edge"].get("pipeline") else None ) @@ -141,7 +148,7 @@ class OMetaLineageMixin(Generic[T]): original.edge.lineageDetails.pipeline ) patch = self.patch_lineage_edge(original=original, updated=data) - if patch is not None: + if patch: patch_op_success = True if patch_op_success is False: @@ -203,7 +210,7 @@ class OMetaLineageMixin(Generic[T]): self, original: AddLineageRequest, updated: AddLineageRequest, - ) -> Optional[str]: + ) -> Optional[bool]: """ Patches a lineage edge between two entities. @@ -229,14 +236,14 @@ class OMetaLineageMixin(Generic[T]): f"/{original.edge.toEntity.id.root}", data=str(patch), ) - return str(patch) + return True except APIError as err: logger.debug(traceback.format_exc()) logger.warning( f"Error Patching Lineage Edge {err.status_code} " f"for {original.edge.fromEntity.fullyQualifiedName}" ) - return None + return False def get_lineage_by_id( self, diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/LineageRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/LineageRepository.java index 2d8513febbc..d73d05eb8b0 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/LineageRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/LineageRepository.java @@ -204,6 +204,7 @@ public class LineageRepository { } else { pipelineMap = JsonUtils.getMap(Entity.getEntity(pipelineRef, "tags,owners", Include.ALL)); } + pipelineMap.remove("changeDescription"); relationshipDetails.put("pipelineEntityType", pipelineRef.getType()); relationshipDetails.put(PIPELINE, pipelineMap); }