From 85b6983eee18fca84d2752b379fb3d6ddd511650 Mon Sep 17 00:00:00 2001 From: Mayur Singal <39544459+ulixius9@users.noreply.github.com> Date: Tue, 23 Apr 2024 09:37:43 +0530 Subject: [PATCH] Fix #15062 & #14810: Fix Column level lineage overwrites pipeline Lineage & manual col lineage (#15897) --- ingestion/src/metadata/cli/lineage.py | 6 +- .../metadata/ingestion/lineage/sql_lineage.py | 3 +- .../ingestion/models/patch_request.py | 6 +- .../ingestion/ometa/mixins/lineage_mixin.py | 168 +++++++++++++++++- .../metadata/ingestion/sink/metadata_rest.py | 2 +- ingestion/src/metadata/utils/lru_cache.py | 2 + .../ometa/test_ometa_lineage_api.py | 121 ++++++++++++- .../service/jdbi3/CollectionDAO.java | 9 + .../service/jdbi3/LineageRepository.java | 56 ++++++ .../resources/lineage/LineageResource.java | 81 +++++++++ 10 files changed, 435 insertions(+), 19 deletions(-) diff --git a/ingestion/src/metadata/cli/lineage.py b/ingestion/src/metadata/cli/lineage.py index c1dcbd06e29..b035ff60162 100644 --- a/ingestion/src/metadata/cli/lineage.py +++ b/ingestion/src/metadata/cli/lineage.py @@ -33,6 +33,7 @@ logger = cli_logger() class LineageWorkflow(BaseModel): filePath: Optional[str] query: Optional[str] + checkPatch: Optional[bool] = True serviceName: str workflowConfig: WorkflowConfig parseTimeout: Optional[int] = 5 * 60 # default parsing timeout to be 5 mins @@ -67,7 +68,10 @@ def run_lineage(config_path: Path) -> None: ) if service: metadata.add_lineage_by_query( - database_service=service, timeout=workflow.parseTimeout, sql=sql + database_service=service, + timeout=workflow.parseTimeout, + sql=sql, + check_patch=workflow.checkPatch, ) else: logger.error(f"Service not found with name {workflow.serviceName}") diff --git a/ingestion/src/metadata/ingestion/lineage/sql_lineage.py b/ingestion/src/metadata/ingestion/lineage/sql_lineage.py index b1e3b989a5f..29602cbf737 100644 --- a/ingestion/src/metadata/ingestion/lineage/sql_lineage.py +++ b/ingestion/src/metadata/ingestion/lineage/sql_lineage.py @@ -37,10 +37,9 @@ from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.utils import fqn from metadata.utils.fqn import build_es_fqn_search_string from metadata.utils.logger import utils_logger -from metadata.utils.lru_cache import LRUCache +from metadata.utils.lru_cache import LRU_CACHE_SIZE, LRUCache logger = utils_logger() -LRU_CACHE_SIZE = 4096 DEFAULT_SCHEMA_NAME = "" diff --git a/ingestion/src/metadata/ingestion/models/patch_request.py b/ingestion/src/metadata/ingestion/models/patch_request.py index 3538ed72c4f..08b42eb721e 100644 --- a/ingestion/src/metadata/ingestion/models/patch_request.py +++ b/ingestion/src/metadata/ingestion/models/patch_request.py @@ -311,6 +311,7 @@ def build_patch( allowed_fields: Optional[Dict] = None, restrict_update_fields: Optional[List] = None, array_entity_fields: Optional[List] = None, + remove_change_description: bool = True, ) -> Optional[jsonpatch.JsonPatch]: """ Given an Entity type and Source entity and Destination entity, @@ -327,8 +328,9 @@ def build_patch( """ # remove change descriptions from entities - source = _remove_change_description(source) - destination = _remove_change_description(destination) + if remove_change_description: + source = _remove_change_description(source) + destination = _remove_change_description(destination) if array_entity_fields: _sort_array_entity_fields( diff --git a/ingestion/src/metadata/ingestion/ometa/mixins/lineage_mixin.py b/ingestion/src/metadata/ingestion/ometa/mixins/lineage_mixin.py index cd8d55d795f..aac169f1515 100644 --- a/ingestion/src/metadata/ingestion/ometa/mixins/lineage_mixin.py +++ b/ingestion/src/metadata/ingestion/ometa/mixins/lineage_mixin.py @@ -14,24 +14,31 @@ Mixin class containing Lineage specific methods To be used by OpenMetadata class """ import traceback -from typing import Any, Dict, Generic, Optional, Type, TypeVar, Union +from copy import deepcopy +from typing import Any, Dict, Generic, List, Optional, Type, TypeVar, Union from pydantic import BaseModel from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest from metadata.generated.schema.entity.services.databaseService import DatabaseService from metadata.generated.schema.type.entityLineage import EntitiesEdge +from metadata.generated.schema.type.entityReference import EntityReference from metadata.ingestion.lineage.models import ConnectionTypeDialectMapper from metadata.ingestion.lineage.parser import LINEAGE_PARSING_TIMEOUT +from metadata.ingestion.models.patch_request import build_patch from metadata.ingestion.ometa.client import REST, APIError from metadata.ingestion.ometa.utils import get_entity_type from metadata.utils.logger import ometa_logger +from metadata.utils.lru_cache import LRU_CACHE_SIZE, LRUCache logger = ometa_logger() T = TypeVar("T", bound=BaseModel) +search_cache = LRUCache(LRU_CACHE_SIZE) + + class OMetaLineageMixin(Generic[T]): """ OpenMetadata API methods related to Lineage. @@ -41,13 +48,91 @@ class OMetaLineageMixin(Generic[T]): client: REST - def add_lineage(self, data: AddLineageRequest) -> Dict[str, Any]: + def _merge_column_lineage( + self, original: List[Dict[str, Any]], updated: List[Dict[str, Any]] + ): + temp_result = [] + for column in original or []: + temp_result.append((*column.get("fromColumns", []), column.get("toColumn"))) + for column in updated or []: + data = column.dict() + temp_result.append((*data.get("fromColumns", []), data.get("toColumn"))) + return [ + {"fromColumns": list(col_data[:-1]), "toColumn": col_data[-1]} + for col_data in set(temp_result) + ] + + def _update_cache(self, request: AddLineageRequest, response: Dict[str, Any]): + try: + for res in response.get("downstreamEdges", []): + if str(request.edge.toEntity.id.__root__) == res.get("toEntity"): + search_cache.put( + ( + request.edge.fromEntity.id.__root__, + request.edge.toEntity.id.__root__, + ), + {"edge": res.get("lineageDetails")}, + ) + return + except Exception as e: + logger.debug(f"Error while updating cache: {e}") + + # discard the cache if failed to update + search_cache.put( + ( + request.edge.fromEntity.id.__root__, + request.edge.toEntity.id.__root__, + ), + None, + ) + + def add_lineage( + self, data: AddLineageRequest, check_patch: bool = False + ) -> Dict[str, Any]: """ Add lineage relationship between two entities and returns the entity information of the origin node """ try: - self.client.put(self.get_suffix(AddLineageRequest), data=data.json()) + patch_op_success = False + if check_patch and data.edge.lineageDetails: + from_id = data.edge.fromEntity.id.__root__ + to_id = data.edge.toEntity.id.__root__ + edge = self.get_lineage_edge(from_id, to_id) + if edge: + original: AddLineageRequest = deepcopy(data) + original.edge.lineageDetails.columnsLineage = edge["edge"].get( + "columnsLineage", [] + ) + original.edge.lineageDetails.pipeline = ( + EntityReference(**edge["edge"].get("pipeline")) + if edge["edge"].get("pipeline") + else None + ) + # merge the original and new column level lineage + data.edge.lineageDetails.columnsLineage = ( + self._merge_column_lineage( + original.edge.lineageDetails.columnsLineage, + data.edge.lineageDetails.columnsLineage, + ) + ) + + # Keep the pipeline information from the original + # lineage if available + if ( + original.edge.lineageDetails.pipeline + and not data.edge.lineageDetails.pipeline + ): + data.edge.lineageDetails.pipeline = ( + original.edge.lineageDetails.pipeline + ) + patch = self.patch_lineage_edge(original=original, updated=data) + if patch is not None: + patch_op_success = True + + if patch_op_success is False: + self.client.put(self.get_suffix(AddLineageRequest), data=data.json()) + except APIError as err: logger.debug(traceback.format_exc()) logger.error( @@ -62,8 +147,80 @@ class OMetaLineageMixin(Generic[T]): data.edge.fromEntity.type, str(data.edge.fromEntity.id.__root__) ) + self._update_cache(data, from_entity_lineage) return from_entity_lineage + def get_lineage_edge( + self, + from_id: str, + to_id: str, + ) -> Optional[Dict[str, Any]]: + """ + Get the lineage edge between two entities. + + Args: + from_id (str): The ID of the source entity. + to_id (str): The ID of the target entity. + + Returns: + Optional[Dict[str, Any]]: The lineage edge if found, None otherwise. + """ + try: + if (from_id, to_id) in search_cache: + return search_cache.get((from_id, to_id)) + res = self.client.get( + f"{self.get_suffix(AddLineageRequest)}/getLineageEdge/" + f"{from_id}/{to_id}" + ) + search_cache.put((from_id, to_id), res) + return res + except APIError as err: + logger.debug(traceback.format_exc()) + logger.warning( + f"Error {err.status_code} trying to GET linage edge between " + f"{from_id} and {to_id}: {err}" + ) + return None + + def patch_lineage_edge( + self, + original: AddLineageRequest, + updated: AddLineageRequest, + ) -> Optional[str]: + """ + Patches a lineage edge between two entities. + + Args: + original (AddLineageRequest): The original lineage request. + updated (AddLineageRequest): The updated lineage request. + + Returns: + bool: True if the patch operation is successful, False otherwise. + """ + try: + allowed_fields = {"columnsLineage": True, "pipeline": True} + patch = build_patch( + source=original.edge.lineageDetails, + destination=updated.edge.lineageDetails, + allowed_fields=allowed_fields, + remove_change_description=False, + ) + if patch: + self.client.patch( + f"{self.get_suffix(AddLineageRequest)}/{original.edge.fromEntity.type}/" + f"{original.edge.fromEntity.id.__root__}/{original.edge.toEntity.type}" + f"/{original.edge.toEntity.id.__root__}", + data=str(patch), + ) + return str(patch) + except APIError as err: + logger.debug(traceback.format_exc()) + logger.warning( + f"Error Patching Lineage Edge {err.status_code} " + f"for {original.edge.fromEntity.fullyQualifiedName}" + ) + return None + def get_lineage_by_id( self, entity: Union[Type[T], str], @@ -155,6 +312,7 @@ class OMetaLineageMixin(Generic[T]): database_name: str = None, schema_name: str = None, timeout: int = LINEAGE_PARSING_TIMEOUT, + check_patch: bool = False, ) -> None: """ Method parses the query and generated the lineage @@ -178,7 +336,9 @@ class OMetaLineageMixin(Generic[T]): ) for lineage_request in add_lineage_request or []: if lineage_request.right: - resp = self.add_lineage(lineage_request.right) + resp = self.add_lineage( + lineage_request.right, check_patch=check_patch + ) entity_name = resp.get("entity", {}).get("name") for node in resp.get("nodes", []): logger.info( diff --git a/ingestion/src/metadata/ingestion/sink/metadata_rest.py b/ingestion/src/metadata/ingestion/sink/metadata_rest.py index 0cfc09e476e..2c4dfe61a5a 100644 --- a/ingestion/src/metadata/ingestion/sink/metadata_rest.py +++ b/ingestion/src/metadata/ingestion/sink/metadata_rest.py @@ -246,7 +246,7 @@ class MetadataRestSink(Sink): # pylint: disable=too-many-public-methods @_run_dispatch.register def write_lineage(self, add_lineage: AddLineageRequest) -> Either[Dict[str, Any]]: - created_lineage = self.metadata.add_lineage(add_lineage) + created_lineage = self.metadata.add_lineage(add_lineage, check_patch=True) return Either(right=created_lineage["entity"]["fullyQualifiedName"]) def _create_role(self, create_role: CreateRoleRequest) -> Optional[Role]: diff --git a/ingestion/src/metadata/utils/lru_cache.py b/ingestion/src/metadata/utils/lru_cache.py index cbc83e26903..1276807ae95 100644 --- a/ingestion/src/metadata/utils/lru_cache.py +++ b/ingestion/src/metadata/utils/lru_cache.py @@ -15,6 +15,8 @@ LRU cache from collections import OrderedDict +LRU_CACHE_SIZE = 4096 + class LRUCache: """Least Recently Used cache""" diff --git a/ingestion/tests/integration/ometa/test_ometa_lineage_api.py b/ingestion/tests/integration/ometa/test_ometa_lineage_api.py index 2f132b552f7..418c7fd1e8d 100644 --- a/ingestion/tests/integration/ometa/test_ometa_lineage_api.py +++ b/ingestion/tests/integration/ometa/test_ometa_lineage_api.py @@ -56,7 +56,11 @@ from metadata.generated.schema.entity.services.pipelineService import ( from metadata.generated.schema.security.client.openMetadataJWTClientConfig import ( OpenMetadataJWTClientConfig, ) -from metadata.generated.schema.type.entityLineage import EntitiesEdge, LineageDetails +from metadata.generated.schema.type.entityLineage import ( + ColumnLineage, + EntitiesEdge, + LineageDetails, +) from metadata.generated.schema.type.entityReference import EntityReference from metadata.ingestion.ometa.ometa_api import OpenMetadata @@ -131,13 +135,27 @@ class OMetaLineageTest(TestCase): create_schema_entity = cls.metadata.create_or_update(data=create_schema) - cls.table = CreateTableRequest( - name="test", + cls.table1 = CreateTableRequest( + name="table1", databaseSchema=create_schema_entity.fullyQualifiedName, - columns=[Column(name="id", dataType=DataType.BIGINT)], + columns=[ + Column(name="id", dataType=DataType.BIGINT), + Column(name="name", dataType=DataType.STRING), + ], ) - cls.table_entity = cls.metadata.create_or_update(data=cls.table) + cls.table1_entity = cls.metadata.create_or_update(data=cls.table1) + + cls.table2 = CreateTableRequest( + name="table2", + databaseSchema=create_schema_entity.fullyQualifiedName, + columns=[ + Column(name="id", dataType=DataType.BIGINT), + Column(name="name", dataType=DataType.STRING), + ], + ) + + cls.table2_entity = cls.metadata.create_or_update(data=cls.table2) cls.pipeline = CreatePipelineRequest( name="test", @@ -148,8 +166,8 @@ class OMetaLineageTest(TestCase): cls.create = AddLineageRequest( edge=EntitiesEdge( - fromEntity=EntityReference(id=cls.table_entity.id, type="table"), - toEntity=EntityReference(id=cls.pipeline_entity.id, type="pipeline"), + fromEntity=EntityReference(id=cls.table1_entity.id, type="table"), + toEntity=EntityReference(id=cls.table2_entity.id, type="table"), lineageDetails=LineageDetails(description="test lineage"), ), ) @@ -190,8 +208,8 @@ class OMetaLineageTest(TestCase): We can create a Lineage and get the origin node lineage info back """ - from_id = str(self.table_entity.id.__root__) - to_id = str(self.pipeline_entity.id.__root__) + from_id = str(self.table1_entity.id.__root__) + to_id = str(self.table2_entity.id.__root__) res = self.metadata.add_lineage(data=self.create) @@ -203,3 +221,88 @@ class OMetaLineageTest(TestCase): iter([node["id"] for node in res["nodes"] if node["id"] == to_id]), None ) assert node_id + + # Add Pipeline to the lineage edge + linage_request_1 = AddLineageRequest( + edge=EntitiesEdge( + fromEntity=EntityReference(id=self.table1_entity.id, type="table"), + toEntity=EntityReference(id=self.table2_entity.id, type="table"), + lineageDetails=LineageDetails( + description="test lineage", + pipeline=EntityReference( + id=self.pipeline_entity.id, type="pipeline" + ), + ), + ), + ) + + res = self.metadata.add_lineage(data=linage_request_1, check_patch=True) + + res["entity"]["id"] = str(res["entity"]["id"]) + self.assertEqual(len(res["downstreamEdges"]), 1) + self.assertEqual( + res["downstreamEdges"][0]["lineageDetails"]["pipeline"]["id"], + str(self.pipeline_entity.id.__root__), + ) + + # Add a column to the lineage edge + linage_request_2 = AddLineageRequest( + edge=EntitiesEdge( + fromEntity=EntityReference(id=self.table1_entity.id, type="table"), + toEntity=EntityReference(id=self.table2_entity.id, type="table"), + lineageDetails=LineageDetails( + description="test lineage", + columnsLineage=[ + ColumnLineage( + fromColumns=[ + f"{self.table1_entity.fullyQualifiedName.__root__}.id" + ], + toColumn=f"{self.table2_entity.fullyQualifiedName.__root__}.id", + ) + ], + ), + ), + ) + + res = self.metadata.add_lineage(data=linage_request_2, check_patch=True) + + res["entity"]["id"] = str(res["entity"]["id"]) + self.assertEqual(len(res["downstreamEdges"]), 1) + self.assertEqual( + res["downstreamEdges"][0]["lineageDetails"]["pipeline"]["id"], + str(self.pipeline_entity.id.__root__), + ) + self.assertEqual( + len(res["downstreamEdges"][0]["lineageDetails"]["columnsLineage"]), 1 + ) + + # Add a new column to the lineage edge + linage_request_2 = AddLineageRequest( + edge=EntitiesEdge( + fromEntity=EntityReference(id=self.table1_entity.id, type="table"), + toEntity=EntityReference(id=self.table2_entity.id, type="table"), + lineageDetails=LineageDetails( + description="test lineage", + columnsLineage=[ + ColumnLineage( + fromColumns=[ + f"{self.table1_entity.fullyQualifiedName.__root__}.name" + ], + toColumn=f"{self.table2_entity.fullyQualifiedName.__root__}.name", + ) + ], + ), + ), + ) + + res = self.metadata.add_lineage(data=linage_request_2, check_patch=True) + + res["entity"]["id"] = str(res["entity"]["id"]) + self.assertEqual(len(res["downstreamEdges"]), 1) + self.assertEqual( + res["downstreamEdges"][0]["lineageDetails"]["pipeline"]["id"], + str(self.pipeline_entity.id.__root__), + ) + self.assertEqual( + len(res["downstreamEdges"][0]["lineageDetails"]["columnsLineage"]), 2 + ) diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java index c27f3126039..05cb421b84f 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java @@ -865,6 +865,15 @@ public interface CollectionDAO { int findIfAnyRelationExist( @Bind("fromEntity") String fromEntity, @Bind("toEntity") String toEntity); + @SqlQuery( + "SELECT json FROM entity_relationship WHERE fromId = :fromId " + + " AND toId = :toId " + + " AND relation = :relation ") + String getRelation( + @BindUUID("fromId") UUID fromId, + @BindUUID("toId") UUID toId, + @Bind("relation") int relation); + // // Delete Operations // diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/LineageRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/LineageRepository.java index 09defe6a272..e66868c5fbd 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/LineageRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/LineageRepository.java @@ -13,6 +13,7 @@ package org.openmetadata.service.jdbi3; +import static javax.ws.rs.core.Response.Status.OK; import static org.openmetadata.service.Entity.CONTAINER; import static org.openmetadata.service.Entity.DASHBOARD; import static org.openmetadata.service.Entity.DASHBOARD_DATA_MODEL; @@ -27,6 +28,8 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; +import javax.json.JsonPatch; +import javax.ws.rs.core.Response; import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.Pair; import org.jdbi.v3.sqlobject.transaction.Transaction; @@ -42,15 +45,18 @@ import org.openmetadata.schema.type.ColumnLineage; import org.openmetadata.schema.type.Edge; import org.openmetadata.schema.type.EntityLineage; import org.openmetadata.schema.type.EntityReference; +import org.openmetadata.schema.type.EventType; import org.openmetadata.schema.type.Include; import org.openmetadata.schema.type.LineageDetails; import org.openmetadata.schema.type.Relationship; import org.openmetadata.service.Entity; import org.openmetadata.service.exception.CatalogExceptionMessage; +import org.openmetadata.service.exception.EntityNotFoundException; import org.openmetadata.service.jdbi3.CollectionDAO.EntityRelationshipRecord; import org.openmetadata.service.search.SearchClient; import org.openmetadata.service.search.models.IndexMapping; import org.openmetadata.service.util.JsonUtils; +import org.openmetadata.service.util.RestUtil; @Repository public class LineageRepository { @@ -356,6 +362,56 @@ public class LineageRepository { } } + public Response getLineageEdge(UUID fromId, UUID toId) { + String json = dao.relationshipDAO().getRelation(fromId, toId, Relationship.UPSTREAM.ordinal()); + if (json != null) { + Map responseMap = new HashMap<>(); + LineageDetails lineageDetails = JsonUtils.readValue(json, LineageDetails.class); + responseMap.put("edge", lineageDetails); + return Response.status(OK).entity(responseMap).build(); + } else { + throw new EntityNotFoundException( + "Lineage edge not found between " + fromId + " and " + " " + toId); + } + } + + public Response patchLineageEdge( + String fromEntity, UUID fromId, String toEntity, UUID toId, JsonPatch patch) { + EntityReference from = Entity.getEntityReferenceById(fromEntity, fromId, Include.NON_DELETED); + EntityReference to = Entity.getEntityReferenceById(toEntity, toId, Include.NON_DELETED); + String json = dao.relationshipDAO().getRelation(fromId, toId, Relationship.UPSTREAM.ordinal()); + + if (json != null) { + + LineageDetails original = JsonUtils.readValue(json, LineageDetails.class); + LineageDetails updated = JsonUtils.applyPatch(original, patch, LineageDetails.class); + if (updated.getPipeline() != null) { + // Validate pipeline entity + EntityReference pipeline = updated.getPipeline(); + pipeline = + Entity.getEntityReferenceById( + pipeline.getType(), pipeline.getId(), Include.NON_DELETED); + updated.withPipeline(pipeline); + } + String detailsJson = JsonUtils.pojoToJson(updated); + dao.relationshipDAO() + .insert(fromId, toId, fromEntity, toEntity, Relationship.UPSTREAM.ordinal(), detailsJson); + addLineageToSearch(from, to, updated); + return new RestUtil.PatchResponse<>(Response.Status.OK, updated, EventType.ENTITY_UPDATED) + .toResponse(); + } else { + throw new EntityNotFoundException( + "Lineage edge not found between " + + fromEntity + + " " + + fromId + + " and " + + toEntity + + " " + + toId); + } + } + private void getDownstreamLineage( UUID id, String entityType, EntityLineage lineage, int downstreamDepth) { if (downstreamDepth == 0) { diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/resources/lineage/LineageResource.java b/openmetadata-service/src/main/java/org/openmetadata/service/resources/lineage/LineageResource.java index 5d81fc92435..880b10c294b 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/resources/lineage/LineageResource.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/resources/lineage/LineageResource.java @@ -16,15 +16,20 @@ package org.openmetadata.service.resources.lineage; import static javax.ws.rs.core.Response.Status.NOT_FOUND; import es.org.elasticsearch.action.search.SearchResponse; +import io.dropwizard.jersey.PATCH; import io.dropwizard.jersey.errors.ErrorMessage; import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.Parameter; import io.swagger.v3.oas.annotations.media.Content; +import io.swagger.v3.oas.annotations.media.ExampleObject; import io.swagger.v3.oas.annotations.media.Schema; +import io.swagger.v3.oas.annotations.parameters.RequestBody; import io.swagger.v3.oas.annotations.responses.ApiResponse; import io.swagger.v3.oas.annotations.tags.Tag; import java.io.IOException; import java.util.List; +import java.util.UUID; +import javax.json.JsonPatch; import javax.validation.Valid; import javax.validation.constraints.Max; import javax.validation.constraints.Min; @@ -225,6 +230,82 @@ public class LineageResource { return Response.status(Status.OK).build(); } + @GET + @Path("/getLineageEdge/{fromId}/{toId}") + @Operation( + operationId = "getLineageEdge", + summary = "Get a lineage edge", + description = + "Get a lineage edge with from entity as upstream node and to entity as downstream node.", + responses = { + @ApiResponse(responseCode = "200"), + @ApiResponse( + responseCode = "404", + description = "Entity for instance {fromId} is not found") + }) + public Response getLineageEdge( + @Context UriInfo uriInfo, + @Context SecurityContext securityContext, + @Parameter(description = "Entity FQN", required = true, schema = @Schema(type = "string")) + @PathParam("fromId") + UUID fromId, + @Parameter(description = "Entity FQN", required = true, schema = @Schema(type = "string")) + @PathParam("toId") + UUID toId) { + return dao.getLineageEdge(fromId, toId); + } + + @PATCH + @Path("/{fromEntity}/{fromId}/{toEntity}/{toId}") + @Operation( + operationId = "patchLineageEdge", + summary = "Patch a lineage edge", + description = + "Patch a lineage edge with from entity as upstream node and to entity as downstream node.", + responses = { + @ApiResponse(responseCode = "200"), + @ApiResponse( + responseCode = "404", + description = "Entity for instance {fromId} is not found") + }) + @Consumes(MediaType.APPLICATION_JSON_PATCH_JSON) + public Response patchLineageEdge( + @Context UriInfo uriInfo, + @Context SecurityContext securityContext, + @Parameter( + description = "Entity type of upstream entity of the edge", + required = true, + schema = @Schema(type = "string", example = "table, report, metrics, or dashboard")) + @PathParam("fromEntity") + String fromEntity, + @Parameter(description = "Entity id", required = true, schema = @Schema(type = "string")) + @PathParam("fromId") + UUID fromId, + @Parameter( + description = "Entity type for downstream entity of the edge", + required = true, + schema = @Schema(type = "string", example = "table, report, metrics, or dashboard")) + @PathParam("toEntity") + String toEntity, + @Parameter(description = "Entity id", required = true, schema = @Schema(type = "string")) + @PathParam("toId") + UUID toId, + @RequestBody( + description = "JsonPatch with array of operations", + content = + @Content( + mediaType = MediaType.APPLICATION_JSON_PATCH_JSON, + examples = { + @ExampleObject("[{op:remove, path:/a},{op:add, path: /b, value: val}]") + })) + JsonPatch patch) { + authorizer.authorize( + securityContext, + new OperationContext(LINEAGE_FIELD, MetadataOperation.EDIT_LINEAGE), + new LineageResourceContext()); + return dao.patchLineageEdge(fromEntity, fromId, toEntity, toId, patch); + } + @DELETE @Path("/{fromEntity}/{fromId}/{toEntity}/{toId}") @Operation(