Fix #15062 & #14810: Fix Column level lineage overwrites pipeline Lineage & manual col lineage (#15897)

This commit is contained in:
Mayur Singal 2024-04-23 09:37:43 +05:30 committed by GitHub
parent 449a5f2de3
commit 85b6983eee
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 435 additions and 19 deletions

View File

@ -33,6 +33,7 @@ logger = cli_logger()
class LineageWorkflow(BaseModel):
filePath: Optional[str]
query: Optional[str]
checkPatch: Optional[bool] = True
serviceName: str
workflowConfig: WorkflowConfig
parseTimeout: Optional[int] = 5 * 60 # default parsing timeout to be 5 mins
@ -67,7 +68,10 @@ def run_lineage(config_path: Path) -> None:
)
if service:
metadata.add_lineage_by_query(
database_service=service, timeout=workflow.parseTimeout, sql=sql
database_service=service,
timeout=workflow.parseTimeout,
sql=sql,
check_patch=workflow.checkPatch,
)
else:
logger.error(f"Service not found with name {workflow.serviceName}")

View File

@ -37,10 +37,9 @@ from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.utils import fqn
from metadata.utils.fqn import build_es_fqn_search_string
from metadata.utils.logger import utils_logger
from metadata.utils.lru_cache import LRUCache
from metadata.utils.lru_cache import LRU_CACHE_SIZE, LRUCache
logger = utils_logger()
LRU_CACHE_SIZE = 4096
DEFAULT_SCHEMA_NAME = "<default>"

View File

@ -311,6 +311,7 @@ def build_patch(
allowed_fields: Optional[Dict] = None,
restrict_update_fields: Optional[List] = None,
array_entity_fields: Optional[List] = None,
remove_change_description: bool = True,
) -> Optional[jsonpatch.JsonPatch]:
"""
Given an Entity type and Source entity and Destination entity,
@ -327,8 +328,9 @@ def build_patch(
"""
# remove change descriptions from entities
source = _remove_change_description(source)
destination = _remove_change_description(destination)
if remove_change_description:
source = _remove_change_description(source)
destination = _remove_change_description(destination)
if array_entity_fields:
_sort_array_entity_fields(

View File

@ -14,24 +14,31 @@ Mixin class containing Lineage specific methods
To be used by OpenMetadata class
"""
import traceback
from typing import Any, Dict, Generic, Optional, Type, TypeVar, Union
from copy import deepcopy
from typing import Any, Dict, Generic, List, Optional, Type, TypeVar, Union
from pydantic import BaseModel
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.entity.services.databaseService import DatabaseService
from metadata.generated.schema.type.entityLineage import EntitiesEdge
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.lineage.models import ConnectionTypeDialectMapper
from metadata.ingestion.lineage.parser import LINEAGE_PARSING_TIMEOUT
from metadata.ingestion.models.patch_request import build_patch
from metadata.ingestion.ometa.client import REST, APIError
from metadata.ingestion.ometa.utils import get_entity_type
from metadata.utils.logger import ometa_logger
from metadata.utils.lru_cache import LRU_CACHE_SIZE, LRUCache
logger = ometa_logger()
T = TypeVar("T", bound=BaseModel)
search_cache = LRUCache(LRU_CACHE_SIZE)
class OMetaLineageMixin(Generic[T]):
"""
OpenMetadata API methods related to Lineage.
@ -41,13 +48,91 @@ class OMetaLineageMixin(Generic[T]):
client: REST
def add_lineage(self, data: AddLineageRequest) -> Dict[str, Any]:
def _merge_column_lineage(
self, original: List[Dict[str, Any]], updated: List[Dict[str, Any]]
):
temp_result = []
for column in original or []:
temp_result.append((*column.get("fromColumns", []), column.get("toColumn")))
for column in updated or []:
data = column.dict()
temp_result.append((*data.get("fromColumns", []), data.get("toColumn")))
return [
{"fromColumns": list(col_data[:-1]), "toColumn": col_data[-1]}
for col_data in set(temp_result)
]
def _update_cache(self, request: AddLineageRequest, response: Dict[str, Any]):
"""
Refresh the module-level ``search_cache`` entry for the edge described by
``request`` using a lineage ``response``.

Args:
request: lineage request; the ids of ``edge.fromEntity`` and ``edge.toEntity`` form the cache key
response: lineage payload whose ``downstreamEdges`` list is scanned for the target entity
"""
try:
# look for the downstream edge whose "toEntity" equals the request's target id
for res in response.get("downstreamEdges", []):
if str(request.edge.toEntity.id.__root__) == res.get("toEntity"):
# stored under an "edge" key, the shape add_lineage reads back via edge["edge"]
search_cache.put(
(
request.edge.fromEntity.id.__root__,
request.edge.toEntity.id.__root__,
),
{"edge": res.get("lineageDetails")},
)
return
except Exception as e:
# refreshing the cache is best effort: a failure is only logged at debug level
logger.debug(f"Error while updating cache: {e}")
# discard the cache if failed to update
# NOTE(review): the entry is overwritten with None rather than removed; indentation is
# not visible in this view, so confirm whether this also runs when no edge matched
search_cache.put(
(
request.edge.fromEntity.id.__root__,
request.edge.toEntity.id.__root__,
),
None,
)
def add_lineage(
self, data: AddLineageRequest, check_patch: bool = False
) -> Dict[str, Any]:
"""
Add lineage relationship between two entities and returns
the entity information of the origin node
"""
try:
self.client.put(self.get_suffix(AddLineageRequest), data=data.json())
patch_op_success = False
if check_patch and data.edge.lineageDetails:
from_id = data.edge.fromEntity.id.__root__
to_id = data.edge.toEntity.id.__root__
edge = self.get_lineage_edge(from_id, to_id)
if edge:
original: AddLineageRequest = deepcopy(data)
original.edge.lineageDetails.columnsLineage = edge["edge"].get(
"columnsLineage", []
)
original.edge.lineageDetails.pipeline = (
EntityReference(**edge["edge"].get("pipeline"))
if edge["edge"].get("pipeline")
else None
)
# merge the original and new column level lineage
data.edge.lineageDetails.columnsLineage = (
self._merge_column_lineage(
original.edge.lineageDetails.columnsLineage,
data.edge.lineageDetails.columnsLineage,
)
)
# Keep the pipeline information from the original
# lineage if available
if (
original.edge.lineageDetails.pipeline
and not data.edge.lineageDetails.pipeline
):
data.edge.lineageDetails.pipeline = (
original.edge.lineageDetails.pipeline
)
patch = self.patch_lineage_edge(original=original, updated=data)
if patch is not None:
patch_op_success = True
if patch_op_success is False:
self.client.put(self.get_suffix(AddLineageRequest), data=data.json())
except APIError as err:
logger.debug(traceback.format_exc())
logger.error(
@ -62,8 +147,80 @@ class OMetaLineageMixin(Generic[T]):
data.edge.fromEntity.type, str(data.edge.fromEntity.id.__root__)
)
self._update_cache(data, from_entity_lineage)
return from_entity_lineage
def get_lineage_edge(
    self,
    from_id: str,
    to_id: str,
) -> Optional[Dict[str, Any]]:
    """
    Get the lineage edge between two entities.

    Responses are kept in the module-level ``search_cache`` keyed by
    ``(from_id, to_id)``. A cached ``None`` is the "discarded" marker
    written by ``_update_cache``; it is treated as a cache miss so the
    edge is fetched from the server again instead of being reported as
    missing.

    Args:
        from_id (str): The ID of the source entity.
        to_id (str): The ID of the target entity.

    Returns:
        Optional[Dict[str, Any]]: The lineage edge if found, None when the
        request fails with an APIError.
    """
    cache_key = (from_id, to_id)
    try:
        # membership is checked before get() so a key that was never cached is not looked up
        if cache_key in search_cache:
            cached = search_cache.get(cache_key)
            if cached is not None:
                return cached
        res = self.client.get(
            f"{self.get_suffix(AddLineageRequest)}/getLineageEdge/"
            f"{from_id}/{to_id}"
        )
        search_cache.put(cache_key, res)
        return res
    except APIError as err:
        logger.debug(traceback.format_exc())
        logger.warning(
            f"Error {err.status_code} trying to GET linage edge between "
            f"{from_id} and {to_id}: {err}"
        )
    return None
def patch_lineage_edge(
self,
original: AddLineageRequest,
updated: AddLineageRequest,
) -> Optional[str]:
"""
Patches a lineage edge between two entities.
A JSON patch is built from the ``lineageDetails`` of both requests and,
when there is one, sent with a PATCH call addressed by the entity types
and ids of the ``original`` edge.
Args:
original (AddLineageRequest): The original lineage request.
updated (AddLineageRequest): The updated lineage request.
Returns:
Optional[str]: The patch rendered as a string once it has been sent;
None when an APIError is caught.
NOTE(review): presumably None is also returned when build_patch yields
nothing to send; indentation is not visible in this view, confirm.
"""
try:
# only these two lineageDetails fields are handed to build_patch as allowed_fields
allowed_fields = {"columnsLineage": True, "pipeline": True}
patch = build_patch(
source=original.edge.lineageDetails,
destination=updated.edge.lineageDetails,
allowed_fields=allowed_fields,
# skip the change-description stripping step that build_patch applies by default
remove_change_description=False,
)
if patch:
self.client.patch(
f"{self.get_suffix(AddLineageRequest)}/{original.edge.fromEntity.type}/"
f"{original.edge.fromEntity.id.__root__}/{original.edge.toEntity.type}"
f"/{original.edge.toEntity.id.__root__}",
data=str(patch),
)
return str(patch)
except APIError as err:
# a failed PATCH is logged and reported to the caller as None
logger.debug(traceback.format_exc())
logger.warning(
f"Error Patching Lineage Edge {err.status_code} "
f"for {original.edge.fromEntity.fullyQualifiedName}"
)
return None
def get_lineage_by_id(
self,
entity: Union[Type[T], str],
@ -155,6 +312,7 @@ class OMetaLineageMixin(Generic[T]):
database_name: str = None,
schema_name: str = None,
timeout: int = LINEAGE_PARSING_TIMEOUT,
check_patch: bool = False,
) -> None:
"""
Method parses the query and generated the lineage
@ -178,7 +336,9 @@ class OMetaLineageMixin(Generic[T]):
)
for lineage_request in add_lineage_request or []:
if lineage_request.right:
resp = self.add_lineage(lineage_request.right)
resp = self.add_lineage(
lineage_request.right, check_patch=check_patch
)
entity_name = resp.get("entity", {}).get("name")
for node in resp.get("nodes", []):
logger.info(

View File

@ -246,7 +246,7 @@ class MetadataRestSink(Sink): # pylint: disable=too-many-public-methods
@_run_dispatch.register
def write_lineage(self, add_lineage: AddLineageRequest) -> Either[Dict[str, Any]]:
created_lineage = self.metadata.add_lineage(add_lineage)
created_lineage = self.metadata.add_lineage(add_lineage, check_patch=True)
return Either(right=created_lineage["entity"]["fullyQualifiedName"])
def _create_role(self, create_role: CreateRoleRequest) -> Optional[Role]:

View File

@ -15,6 +15,8 @@ LRU cache
from collections import OrderedDict
LRU_CACHE_SIZE = 4096
class LRUCache:
"""Least Recently Used cache"""

View File

@ -56,7 +56,11 @@ from metadata.generated.schema.entity.services.pipelineService import (
from metadata.generated.schema.security.client.openMetadataJWTClientConfig import (
OpenMetadataJWTClientConfig,
)
from metadata.generated.schema.type.entityLineage import EntitiesEdge, LineageDetails
from metadata.generated.schema.type.entityLineage import (
ColumnLineage,
EntitiesEdge,
LineageDetails,
)
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.ometa.ometa_api import OpenMetadata
@ -131,13 +135,27 @@ class OMetaLineageTest(TestCase):
create_schema_entity = cls.metadata.create_or_update(data=create_schema)
cls.table = CreateTableRequest(
name="test",
cls.table1 = CreateTableRequest(
name="table1",
databaseSchema=create_schema_entity.fullyQualifiedName,
columns=[Column(name="id", dataType=DataType.BIGINT)],
columns=[
Column(name="id", dataType=DataType.BIGINT),
Column(name="name", dataType=DataType.STRING),
],
)
cls.table_entity = cls.metadata.create_or_update(data=cls.table)
cls.table1_entity = cls.metadata.create_or_update(data=cls.table1)
cls.table2 = CreateTableRequest(
name="table2",
databaseSchema=create_schema_entity.fullyQualifiedName,
columns=[
Column(name="id", dataType=DataType.BIGINT),
Column(name="name", dataType=DataType.STRING),
],
)
cls.table2_entity = cls.metadata.create_or_update(data=cls.table2)
cls.pipeline = CreatePipelineRequest(
name="test",
@ -148,8 +166,8 @@ class OMetaLineageTest(TestCase):
cls.create = AddLineageRequest(
edge=EntitiesEdge(
fromEntity=EntityReference(id=cls.table_entity.id, type="table"),
toEntity=EntityReference(id=cls.pipeline_entity.id, type="pipeline"),
fromEntity=EntityReference(id=cls.table1_entity.id, type="table"),
toEntity=EntityReference(id=cls.table2_entity.id, type="table"),
lineageDetails=LineageDetails(description="test lineage"),
),
)
@ -190,8 +208,8 @@ class OMetaLineageTest(TestCase):
We can create a Lineage and get the origin node lineage info back
"""
from_id = str(self.table_entity.id.__root__)
to_id = str(self.pipeline_entity.id.__root__)
from_id = str(self.table1_entity.id.__root__)
to_id = str(self.table2_entity.id.__root__)
res = self.metadata.add_lineage(data=self.create)
@ -203,3 +221,88 @@ class OMetaLineageTest(TestCase):
iter([node["id"] for node in res["nodes"] if node["id"] == to_id]), None
)
assert node_id
# Add Pipeline to the lineage edge
linage_request_1 = AddLineageRequest(
edge=EntitiesEdge(
fromEntity=EntityReference(id=self.table1_entity.id, type="table"),
toEntity=EntityReference(id=self.table2_entity.id, type="table"),
lineageDetails=LineageDetails(
description="test lineage",
pipeline=EntityReference(
id=self.pipeline_entity.id, type="pipeline"
),
),
),
)
res = self.metadata.add_lineage(data=linage_request_1, check_patch=True)
res["entity"]["id"] = str(res["entity"]["id"])
self.assertEqual(len(res["downstreamEdges"]), 1)
self.assertEqual(
res["downstreamEdges"][0]["lineageDetails"]["pipeline"]["id"],
str(self.pipeline_entity.id.__root__),
)
# Add a column to the lineage edge
linage_request_2 = AddLineageRequest(
edge=EntitiesEdge(
fromEntity=EntityReference(id=self.table1_entity.id, type="table"),
toEntity=EntityReference(id=self.table2_entity.id, type="table"),
lineageDetails=LineageDetails(
description="test lineage",
columnsLineage=[
ColumnLineage(
fromColumns=[
f"{self.table1_entity.fullyQualifiedName.__root__}.id"
],
toColumn=f"{self.table2_entity.fullyQualifiedName.__root__}.id",
)
],
),
),
)
res = self.metadata.add_lineage(data=linage_request_2, check_patch=True)
res["entity"]["id"] = str(res["entity"]["id"])
self.assertEqual(len(res["downstreamEdges"]), 1)
self.assertEqual(
res["downstreamEdges"][0]["lineageDetails"]["pipeline"]["id"],
str(self.pipeline_entity.id.__root__),
)
self.assertEqual(
len(res["downstreamEdges"][0]["lineageDetails"]["columnsLineage"]), 1
)
# Add a new column to the lineage edge
linage_request_2 = AddLineageRequest(
edge=EntitiesEdge(
fromEntity=EntityReference(id=self.table1_entity.id, type="table"),
toEntity=EntityReference(id=self.table2_entity.id, type="table"),
lineageDetails=LineageDetails(
description="test lineage",
columnsLineage=[
ColumnLineage(
fromColumns=[
f"{self.table1_entity.fullyQualifiedName.__root__}.name"
],
toColumn=f"{self.table2_entity.fullyQualifiedName.__root__}.name",
)
],
),
),
)
res = self.metadata.add_lineage(data=linage_request_2, check_patch=True)
res["entity"]["id"] = str(res["entity"]["id"])
self.assertEqual(len(res["downstreamEdges"]), 1)
self.assertEqual(
res["downstreamEdges"][0]["lineageDetails"]["pipeline"]["id"],
str(self.pipeline_entity.id.__root__),
)
self.assertEqual(
len(res["downstreamEdges"][0]["lineageDetails"]["columnsLineage"]), 2
)

View File

@ -865,6 +865,15 @@ public interface CollectionDAO {
int findIfAnyRelationExist(
@Bind("fromEntity") String fromEntity, @Bind("toEntity") String toEntity);
/**
 * Read the {@code json} column of the {@code entity_relationship} row identified by the given
 * source id, target id and relation value.
 *
 * <p>Callers in LineageRepository treat a {@code null} result as "edge not found".
 *
 * @param fromId id stored in the {@code fromId} column
 * @param toId id stored in the {@code toId} column
 * @param relation numeric relation value (callers pass {@code Relationship.UPSTREAM.ordinal()})
 * @return the JSON stored on the matching relationship row
 */
@SqlQuery(
"SELECT json FROM entity_relationship WHERE fromId = :fromId "
+ " AND toId = :toId "
+ " AND relation = :relation ")
String getRelation(
@BindUUID("fromId") UUID fromId,
@BindUUID("toId") UUID toId,
@Bind("relation") int relation);
//
// Delete Operations
//

View File

@ -13,6 +13,7 @@
package org.openmetadata.service.jdbi3;
import static javax.ws.rs.core.Response.Status.OK;
import static org.openmetadata.service.Entity.CONTAINER;
import static org.openmetadata.service.Entity.DASHBOARD;
import static org.openmetadata.service.Entity.DASHBOARD_DATA_MODEL;
@ -27,6 +28,8 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import javax.json.JsonPatch;
import javax.ws.rs.core.Response;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.jdbi.v3.sqlobject.transaction.Transaction;
@ -42,15 +45,18 @@ import org.openmetadata.schema.type.ColumnLineage;
import org.openmetadata.schema.type.Edge;
import org.openmetadata.schema.type.EntityLineage;
import org.openmetadata.schema.type.EntityReference;
import org.openmetadata.schema.type.EventType;
import org.openmetadata.schema.type.Include;
import org.openmetadata.schema.type.LineageDetails;
import org.openmetadata.schema.type.Relationship;
import org.openmetadata.service.Entity;
import org.openmetadata.service.exception.CatalogExceptionMessage;
import org.openmetadata.service.exception.EntityNotFoundException;
import org.openmetadata.service.jdbi3.CollectionDAO.EntityRelationshipRecord;
import org.openmetadata.service.search.SearchClient;
import org.openmetadata.service.search.models.IndexMapping;
import org.openmetadata.service.util.JsonUtils;
import org.openmetadata.service.util.RestUtil;
@Repository
public class LineageRepository {
@ -356,6 +362,56 @@ public class LineageRepository {
}
}
/**
 * Return the lineage details stored on the UPSTREAM relationship between two entities.
 *
 * <p>The details are wrapped in a map under the {@code "edge"} key, which is the key the python
 * client reads from the response.
 *
 * @param fromId id of the upstream entity
 * @param toId id of the downstream entity
 * @return a 200 response whose entity is {@code {"edge": LineageDetails}}
 * @throws EntityNotFoundException when no relationship JSON is stored for the pair
 */
public Response getLineageEdge(UUID fromId, UUID toId) {
  String json = dao.relationshipDAO().getRelation(fromId, toId, Relationship.UPSTREAM.ordinal());
  if (json == null) {
    // the message used to be assembled with a stray " " literal, giving a double space
    throw new EntityNotFoundException(
        "Lineage edge not found between " + fromId + " and " + toId);
  }
  LineageDetails lineageDetails = JsonUtils.readValue(json, LineageDetails.class);
  Map<String, Object> responseMap = new HashMap<>();
  responseMap.put("edge", lineageDetails);
  return Response.status(OK).entity(responseMap).build();
}
/**
 * Apply a JSON patch to the lineage details stored on the UPSTREAM relationship between two
 * entities, persist the result and push it to search.
 *
 * <p>Both entity references are resolved first (non-deleted only). When the patched details
 * carry a pipeline, that reference is resolved again before it is stored.
 *
 * @param fromEntity entity type of the upstream entity
 * @param fromId id of the upstream entity
 * @param toEntity entity type of the downstream entity
 * @param toId id of the downstream entity
 * @param patch JSON patch to apply to the stored lineage details
 * @return a patch response carrying the updated lineage details
 * @throws EntityNotFoundException when no relationship JSON is stored for the pair
 */
public Response patchLineageEdge(
    String fromEntity, UUID fromId, String toEntity, UUID toId, JsonPatch patch) {
  EntityReference from = Entity.getEntityReferenceById(fromEntity, fromId, Include.NON_DELETED);
  EntityReference to = Entity.getEntityReferenceById(toEntity, toId, Include.NON_DELETED);
  int upstream = Relationship.UPSTREAM.ordinal();
  String storedJson = dao.relationshipDAO().getRelation(fromId, toId, upstream);
  if (storedJson == null) {
    throw new EntityNotFoundException(
        "Lineage edge not found between "
            + fromEntity
            + " "
            + fromId
            + " and "
            + toEntity
            + " "
            + toId);
  }

  LineageDetails current = JsonUtils.readValue(storedJson, LineageDetails.class);
  LineageDetails patched = JsonUtils.applyPatch(current, patch, LineageDetails.class);

  // Validate pipeline entity
  EntityReference pipelineRef = patched.getPipeline();
  if (pipelineRef != null) {
    EntityReference resolved =
        Entity.getEntityReferenceById(
            pipelineRef.getType(), pipelineRef.getId(), Include.NON_DELETED);
    patched.withPipeline(resolved);
  }

  String detailsJson = JsonUtils.pojoToJson(patched);
  dao.relationshipDAO().insert(fromId, toId, fromEntity, toEntity, upstream, detailsJson);
  addLineageToSearch(from, to, patched);
  return new RestUtil.PatchResponse<>(Response.Status.OK, patched, EventType.ENTITY_UPDATED)
      .toResponse();
}
private void getDownstreamLineage(
UUID id, String entityType, EntityLineage lineage, int downstreamDepth) {
if (downstreamDepth == 0) {

View File

@ -16,15 +16,20 @@ package org.openmetadata.service.resources.lineage;
import static javax.ws.rs.core.Response.Status.NOT_FOUND;
import es.org.elasticsearch.action.search.SearchResponse;
import io.dropwizard.jersey.PATCH;
import io.dropwizard.jersey.errors.ErrorMessage;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.media.Content;
import io.swagger.v3.oas.annotations.media.ExampleObject;
import io.swagger.v3.oas.annotations.media.Schema;
import io.swagger.v3.oas.annotations.parameters.RequestBody;
import io.swagger.v3.oas.annotations.responses.ApiResponse;
import io.swagger.v3.oas.annotations.tags.Tag;
import java.io.IOException;
import java.util.List;
import java.util.UUID;
import javax.json.JsonPatch;
import javax.validation.Valid;
import javax.validation.constraints.Max;
import javax.validation.constraints.Min;
@ -225,6 +230,82 @@ public class LineageResource {
return Response.status(Status.OK).build();
}
// Returns the lineage details stored on the edge fromId -> toId.
// NOTE(review): unlike patchLineageEdge, this endpoint makes no authorizer call; confirm
// that unauthenticated-by-policy read access is intended.
@GET
@Path("/getLineageEdge/{fromId}/{toId}")
@Operation(
    operationId = "getLineageEdge",
    summary = "Get a lineage edge",
    description =
        "Get a lineage edge with from entity as upstream node and to entity as downstream node.",
    responses = {
      @ApiResponse(responseCode = "200"),
      @ApiResponse(
          responseCode = "404",
          description = "Lineage edge between {fromId} and {toId} is not found")
    })
public Response getLineageEdge(
    @Context UriInfo uriInfo,
    @Context SecurityContext securityContext,
    @Parameter(
            description = "Id of the upstream entity of the edge",
            required = true,
            schema = @Schema(type = "string"))
        @PathParam("fromId")
        UUID fromId,
    @Parameter(
            description = "Id of the downstream entity of the edge",
            required = true,
            schema = @Schema(type = "string"))
        @PathParam("toId")
        UUID toId) {
  return dao.getLineageEdge(fromId, toId);
}
// Applies a JSON patch to the lineage edge fromEntity/fromId -> toEntity/toId.
// The caller must be authorized for EDIT_LINEAGE before the repository is invoked.
@PATCH
@Path("/{fromEntity}/{fromId}/{toEntity}/{toId}")
@Operation(
    operationId = "patchLineageEdge",
    summary = "Patch a lineage edge",
    description =
        "Patch a lineage edge with from entity as upstream node and to entity as downstream node.",
    responses = {
      @ApiResponse(responseCode = "200"),
      @ApiResponse(
          responseCode = "404",
          description = "Entity for instance {fromId} is not found")
    })
@Consumes(MediaType.APPLICATION_JSON_PATCH_JSON)
public Response patchLineageEdge(
    @Context UriInfo uriInfo,
    @Context SecurityContext securityContext,
    @Parameter(
            description = "Entity type of upstream entity of the edge",
            required = true,
            schema = @Schema(type = "string", example = "table, report, metrics, or dashboard"))
        @PathParam("fromEntity")
        String fromEntity,
    @Parameter(description = "Entity id", required = true, schema = @Schema(type = "string"))
        @PathParam("fromId")
        UUID fromId,
    @Parameter(
            description = "Entity type for downstream entity of the edge",
            required = true,
            schema = @Schema(type = "string", example = "table, report, metrics, or dashboard"))
        @PathParam("toEntity")
        String toEntity,
    @Parameter(description = "Entity id", required = true, schema = @Schema(type = "string"))
        @PathParam("toId")
        UUID toId,
    @RequestBody(
            description = "JsonPatch with array of operations",
            content =
                @Content(
                    mediaType = MediaType.APPLICATION_JSON_PATCH_JSON,
                    examples = {
                      @ExampleObject("[{op:remove, path:/a},{op:add, path: /b, value: val}]")
                    }))
        JsonPatch patch) {
  OperationContext editLineage =
      new OperationContext(LINEAGE_FIELD, MetadataOperation.EDIT_LINEAGE);
  LineageResourceContext lineageContext = new LineageResourceContext();
  authorizer.authorize(securityContext, editLineage, lineageContext);
  return dao.patchLineageEdge(fromEntity, fromId, toEntity, toId, patch);
}
@DELETE
@Path("/{fromEntity}/{fromId}/{toEntity}/{toId}")
@Operation(