MINOR: Fix lineage PATCH API (#17264)

This commit is contained in:
Mayur Singal 2024-08-01 17:28:22 +05:30 committed by GitHub
parent da6b2b7310
commit 27af481433
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 17 additions and 9 deletions

View File

@ -52,11 +52,12 @@ class OMetaLineageMixin(Generic[T]):
def _merge_column_lineage(
self, original: List[Dict[str, Any]], updated: List[Dict[str, Any]]
):
temp_result = []
flat_original_result = set()
flat_updated_result = set()
try:
for column in original or []:
if column.get("toColumn") and column.get("fromColumns"):
temp_result.append(
flat_original_result.add(
(*column.get("fromColumns", []), column.get("toColumn"))
)
for column in updated or []:
@ -65,15 +66,18 @@ class OMetaLineageMixin(Generic[T]):
else:
data = column
if data.get("toColumn") and data.get("fromColumns"):
temp_result.append(
flat_updated_result.add(
(*data.get("fromColumns", []), data.get("toColumn"))
)
except Exception as exc:
logger.debug(f"Error while merging column lineage: {exc}")
logger.debug(traceback.format_exc())
union_result = flat_original_result.union(flat_updated_result)
if flat_original_result == union_result:
return original
return [
{"fromColumns": list(col_data[:-1]), "toColumn": col_data[-1]}
for col_data in set(temp_result)
for col_data in union_result
]
def _update_cache(self, request: AddLineageRequest, response: Dict[str, Any]):
@ -119,7 +123,10 @@ class OMetaLineageMixin(Generic[T]):
"columnsLineage", []
)
original.edge.lineageDetails.pipeline = (
EntityReference(**edge["edge"].get("pipeline"))
EntityReference(
id=edge["edge"]["pipeline"]["id"],
type=edge["edge"]["pipeline"]["type"],
)
if edge["edge"].get("pipeline")
else None
)
@ -141,7 +148,7 @@ class OMetaLineageMixin(Generic[T]):
original.edge.lineageDetails.pipeline
)
patch = self.patch_lineage_edge(original=original, updated=data)
if patch is not None:
if patch:
patch_op_success = True
if patch_op_success is False:
@ -203,7 +210,7 @@ class OMetaLineageMixin(Generic[T]):
self,
original: AddLineageRequest,
updated: AddLineageRequest,
) -> Optional[str]:
) -> Optional[bool]:
"""
Patches a lineage edge between two entities.
@ -229,14 +236,14 @@ class OMetaLineageMixin(Generic[T]):
f"/{original.edge.toEntity.id.root}",
data=str(patch),
)
return str(patch)
return True
except APIError as err:
logger.debug(traceback.format_exc())
logger.warning(
f"Error Patching Lineage Edge {err.status_code} "
f"for {original.edge.fromEntity.fullyQualifiedName}"
)
return None
return False
def get_lineage_by_id(
self,

View File

@ -204,6 +204,7 @@ public class LineageRepository {
} else {
pipelineMap = JsonUtils.getMap(Entity.getEntity(pipelineRef, "tags,owners", Include.ALL));
}
pipelineMap.remove("changeDescription");
relationshipDetails.put("pipelineEntityType", pipelineRef.getType());
relationshipDetails.put(PIPELINE, pipelineMap);
}