mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-11-09 15:32:25 +00:00
Issue2147 - Add support for deleting edge in lineage (#2164)
* Restore deleted entity on PATCH requests * Add support for deleting edge in lineage
This commit is contained in:
parent
7b24e31973
commit
4fc9db8847
10
.idea/runConfigurations.xml
generated
10
.idea/runConfigurations.xml
generated
@ -1,10 +0,0 @@
|
|||||||
<?xml version="1.0" encoding="UTF-8"?>
|
|
||||||
<project version="4">
|
|
||||||
<component name="RunConfigurationProducerService">
|
|
||||||
<option name="ignoredProducers">
|
|
||||||
<set>
|
|
||||||
<option value="com.android.tools.idea.compose.preview.runconfiguration.ComposePreviewRunConfigurationProducer" />
|
|
||||||
</set>
|
|
||||||
</option>
|
|
||||||
</component>
|
|
||||||
</project>
|
|
||||||
@ -408,7 +408,7 @@ public interface CollectionDAO {
|
|||||||
"DELETE from entity_relationship WHERE fromId = :fromId "
|
"DELETE from entity_relationship WHERE fromId = :fromId "
|
||||||
+ "AND fromEntity = :fromEntity AND toId = :toId AND toEntity = :toEntity "
|
+ "AND fromEntity = :fromEntity AND toId = :toId AND toEntity = :toEntity "
|
||||||
+ "AND relation = :relation")
|
+ "AND relation = :relation")
|
||||||
void delete(
|
int delete(
|
||||||
@Bind("fromId") String fromId,
|
@Bind("fromId") String fromId,
|
||||||
@Bind("fromEntity") String fromEntity,
|
@Bind("fromEntity") String fromEntity,
|
||||||
@Bind("toId") String toId,
|
@Bind("toId") String toId,
|
||||||
@ -419,7 +419,7 @@ public interface CollectionDAO {
|
|||||||
@SqlUpdate(
|
@SqlUpdate(
|
||||||
"DELETE from entity_relationship WHERE fromId = :fromId AND fromEntity = :fromEntity "
|
"DELETE from entity_relationship WHERE fromId = :fromId AND fromEntity = :fromEntity "
|
||||||
+ "AND relation = :relation AND toEntity = :toEntity")
|
+ "AND relation = :relation AND toEntity = :toEntity")
|
||||||
void deleteFrom(
|
int deleteFrom(
|
||||||
@Bind("fromId") String fromId,
|
@Bind("fromId") String fromId,
|
||||||
@Bind("fromEntity") String fromEntity,
|
@Bind("fromEntity") String fromEntity,
|
||||||
@Bind("relation") int relation,
|
@Bind("relation") int relation,
|
||||||
@ -429,14 +429,14 @@ public interface CollectionDAO {
|
|||||||
@SqlUpdate(
|
@SqlUpdate(
|
||||||
"DELETE from entity_relationship WHERE fromId = :fromId AND fromEntity = :fromEntity "
|
"DELETE from entity_relationship WHERE fromId = :fromId AND fromEntity = :fromEntity "
|
||||||
+ "AND relation = :relation")
|
+ "AND relation = :relation")
|
||||||
void deleteFrom(
|
int deleteFrom(
|
||||||
@Bind("fromId") String fromId, @Bind("fromEntity") String fromEntity, @Bind("relation") int relation);
|
@Bind("fromId") String fromId, @Bind("fromEntity") String fromEntity, @Bind("relation") int relation);
|
||||||
|
|
||||||
// Delete all the entity relationship toId <-- relation -- entity of type fromEntity
|
// Delete all the entity relationship toId <-- relation -- entity of type fromEntity
|
||||||
@SqlUpdate(
|
@SqlUpdate(
|
||||||
"DELETE from entity_relationship WHERE toId = :toId AND toEntity = :toEntity AND relation = :relation "
|
"DELETE from entity_relationship WHERE toId = :toId AND toEntity = :toEntity AND relation = :relation "
|
||||||
+ "AND fromEntity = :fromEntity")
|
+ "AND fromEntity = :fromEntity")
|
||||||
void deleteTo(
|
int deleteTo(
|
||||||
@Bind("toId") String toId,
|
@Bind("toId") String toId,
|
||||||
@Bind("toEntity") String toEntity,
|
@Bind("toEntity") String toEntity,
|
||||||
@Bind("relation") int relation,
|
@Bind("relation") int relation,
|
||||||
@ -445,7 +445,7 @@ public interface CollectionDAO {
|
|||||||
@SqlUpdate(
|
@SqlUpdate(
|
||||||
"DELETE from entity_relationship WHERE (toId = :id AND toEntity = :entity) OR "
|
"DELETE from entity_relationship WHERE (toId = :id AND toEntity = :entity) OR "
|
||||||
+ "(fromId = :id AND toEntity = :entity)")
|
+ "(fromId = :id AND toEntity = :entity)")
|
||||||
void deleteAll(@Bind("id") String id, @Bind("entity") String entity);
|
int deleteAll(@Bind("id") String id, @Bind("entity") String entity);
|
||||||
|
|
||||||
@SqlUpdate(
|
@SqlUpdate(
|
||||||
"UPDATE entity_relationship SET deleted = true WHERE (toId = :id AND toEntity = :entity) "
|
"UPDATE entity_relationship SET deleted = true WHERE (toId = :id AND toEntity = :entity) "
|
||||||
|
|||||||
@ -65,6 +65,25 @@ public class LineageRepository {
|
|||||||
Relationship.UPSTREAM.ordinal());
|
Relationship.UPSTREAM.ordinal());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Transaction
|
||||||
|
public boolean deleteLineage(String fromEntity, String fromId, String toEntity, String toId) throws IOException {
|
||||||
|
// Validate from entity
|
||||||
|
EntityReference from = Entity.getEntityReference(fromEntity, UUID.fromString(fromId));
|
||||||
|
|
||||||
|
// Validate to entity
|
||||||
|
EntityReference to = Entity.getEntityReference(toEntity, UUID.fromString(toId));
|
||||||
|
|
||||||
|
// Finally, delete lineage relationship
|
||||||
|
return dao.relationshipDAO()
|
||||||
|
.delete(
|
||||||
|
from.getId().toString(),
|
||||||
|
from.getType(),
|
||||||
|
to.getId().toString(),
|
||||||
|
to.getType(),
|
||||||
|
Relationship.UPSTREAM.ordinal())
|
||||||
|
> 0;
|
||||||
|
}
|
||||||
|
|
||||||
private EntityLineage getLineage(EntityReference primary, int upstreamDepth, int downstreamDepth) throws IOException {
|
private EntityLineage getLineage(EntityReference primary, int upstreamDepth, int downstreamDepth) throws IOException {
|
||||||
List<EntityReference> entities = new ArrayList<>();
|
List<EntityReference> entities = new ArrayList<>();
|
||||||
EntityLineage lineage =
|
EntityLineage lineage =
|
||||||
|
|||||||
@ -13,6 +13,9 @@
|
|||||||
|
|
||||||
package org.openmetadata.catalog.resources.lineage;
|
package org.openmetadata.catalog.resources.lineage;
|
||||||
|
|
||||||
|
import static javax.ws.rs.core.Response.Status.NOT_FOUND;
|
||||||
|
|
||||||
|
import io.dropwizard.jersey.errors.ErrorMessage;
|
||||||
import io.swagger.annotations.Api;
|
import io.swagger.annotations.Api;
|
||||||
import io.swagger.v3.oas.annotations.Operation;
|
import io.swagger.v3.oas.annotations.Operation;
|
||||||
import io.swagger.v3.oas.annotations.Parameter;
|
import io.swagger.v3.oas.annotations.Parameter;
|
||||||
@ -25,6 +28,7 @@ import javax.validation.Valid;
|
|||||||
import javax.validation.constraints.Max;
|
import javax.validation.constraints.Max;
|
||||||
import javax.validation.constraints.Min;
|
import javax.validation.constraints.Min;
|
||||||
import javax.ws.rs.Consumes;
|
import javax.ws.rs.Consumes;
|
||||||
|
import javax.ws.rs.DELETE;
|
||||||
import javax.ws.rs.DefaultValue;
|
import javax.ws.rs.DefaultValue;
|
||||||
import javax.ws.rs.GET;
|
import javax.ws.rs.GET;
|
||||||
import javax.ws.rs.PUT;
|
import javax.ws.rs.PUT;
|
||||||
@ -42,11 +46,8 @@ import org.openmetadata.catalog.api.lineage.AddLineage;
|
|||||||
import org.openmetadata.catalog.jdbi3.CollectionDAO;
|
import org.openmetadata.catalog.jdbi3.CollectionDAO;
|
||||||
import org.openmetadata.catalog.jdbi3.LineageRepository;
|
import org.openmetadata.catalog.jdbi3.LineageRepository;
|
||||||
import org.openmetadata.catalog.resources.Collection;
|
import org.openmetadata.catalog.resources.Collection;
|
||||||
import org.openmetadata.catalog.resources.teams.UserResource;
|
|
||||||
import org.openmetadata.catalog.security.Authorizer;
|
import org.openmetadata.catalog.security.Authorizer;
|
||||||
import org.openmetadata.catalog.type.EntityLineage;
|
import org.openmetadata.catalog.type.EntityLineage;
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
|
|
||||||
@Path("/v1/lineage")
|
@Path("/v1/lineage")
|
||||||
@Api(value = "Lineage resource", tags = "Lineage resource")
|
@Api(value = "Lineage resource", tags = "Lineage resource")
|
||||||
@ -54,7 +55,6 @@ import org.slf4j.LoggerFactory;
|
|||||||
@Consumes(MediaType.APPLICATION_JSON)
|
@Consumes(MediaType.APPLICATION_JSON)
|
||||||
@Collection(name = "lineage")
|
@Collection(name = "lineage")
|
||||||
public class LineageResource {
|
public class LineageResource {
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(UserResource.class);
|
|
||||||
private final LineageRepository dao;
|
private final LineageRepository dao;
|
||||||
|
|
||||||
public LineageResource(CollectionDAO dao, Authorizer authorizer) {
|
public LineageResource(CollectionDAO dao, Authorizer authorizer) {
|
||||||
@ -154,6 +154,44 @@ public class LineageResource {
|
|||||||
return Response.status(Status.OK).build();
|
return Response.status(Status.OK).build();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@DELETE
|
||||||
|
@Path("/{fromEntity}/{fromId}/{toEntity}/{toId}")
|
||||||
|
@Operation(
|
||||||
|
summary = "Delete a lineage edge",
|
||||||
|
tags = "lineage",
|
||||||
|
description = "Delete a lineage edge with from entity as upstream node and to entity as downstream node.",
|
||||||
|
responses = {
|
||||||
|
@ApiResponse(responseCode = "200"),
|
||||||
|
@ApiResponse(responseCode = "404", description = "Entity for instance {id} is not found")
|
||||||
|
})
|
||||||
|
public Response deleteLineage(
|
||||||
|
@Context UriInfo uriInfo,
|
||||||
|
@Parameter(
|
||||||
|
description = "Entity type of upstream entity of the edge",
|
||||||
|
required = true,
|
||||||
|
schema = @Schema(type = "string", example = "table, report, metrics, or dashboard"))
|
||||||
|
@PathParam("fromEntity")
|
||||||
|
String fromEntity,
|
||||||
|
@Parameter(description = "Entity id", required = true, schema = @Schema(type = "string")) @PathParam("fromId")
|
||||||
|
String fromId,
|
||||||
|
@Parameter(
|
||||||
|
description = "Entity type for downstream entity of the edge",
|
||||||
|
required = true,
|
||||||
|
schema = @Schema(type = "string", example = "table, report, metrics, or dashboard"))
|
||||||
|
@PathParam("toEntity")
|
||||||
|
String toEntity,
|
||||||
|
@Parameter(description = "Entity id", required = true, schema = @Schema(type = "string")) @PathParam("toId")
|
||||||
|
String toId)
|
||||||
|
throws IOException {
|
||||||
|
boolean deleted = dao.deleteLineage(fromEntity, fromId, toEntity, toId);
|
||||||
|
if (!deleted) {
|
||||||
|
return Response.status(NOT_FOUND)
|
||||||
|
.entity(new ErrorMessage(NOT_FOUND.getStatusCode(), "Lineage edge not " + "found"))
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
return Response.status(Status.OK).build();
|
||||||
|
}
|
||||||
|
|
||||||
private EntityLineage addHref(UriInfo uriInfo, EntityLineage lineage) {
|
private EntityLineage addHref(UriInfo uriInfo, EntityLineage lineage) {
|
||||||
Entity.withHref(uriInfo, lineage.getEntity());
|
Entity.withHref(uriInfo, lineage.getEntity());
|
||||||
Entity.withHref(uriInfo, lineage.getNodes());
|
Entity.withHref(uriInfo, lineage.getNodes());
|
||||||
|
|||||||
@ -2,7 +2,7 @@
|
|||||||
"$id": "https://open-metadata.org/schema/api/lineage/addLineage.json",
|
"$id": "https://open-metadata.org/schema/api/lineage/addLineage.json",
|
||||||
"$schema": "http://json-schema.org/draft-07/schema#",
|
"$schema": "http://json-schema.org/draft-07/schema#",
|
||||||
"title": "addLineage",
|
"title": "addLineage",
|
||||||
"description": "Add lineage details between two entities",
|
"description": "Add lineage edge between two entities",
|
||||||
"type": "object",
|
"type": "object",
|
||||||
"properties" : {
|
"properties" : {
|
||||||
"description": {
|
"description": {
|
||||||
|
|||||||
@ -14,6 +14,7 @@
|
|||||||
package org.openmetadata.catalog.resources.lineage;
|
package org.openmetadata.catalog.resources.lineage;
|
||||||
|
|
||||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||||
import static org.openmetadata.catalog.util.TestUtils.adminAuthHeaders;
|
import static org.openmetadata.catalog.util.TestUtils.adminAuthHeaders;
|
||||||
|
|
||||||
@ -62,7 +63,7 @@ public class LineageResourceTest extends CatalogApplicationTest {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
void put_addLineageForInvalidEntities() throws HttpResponseException {
|
void put_delete_lineage_200() throws HttpResponseException {
|
||||||
// Add lineage table4-->table5
|
// Add lineage table4-->table5
|
||||||
addEdge(TABLES.get(4), TABLES.get(5));
|
addEdge(TABLES.get(4), TABLES.get(5));
|
||||||
|
|
||||||
@ -118,6 +119,24 @@ public class LineageResourceTest extends CatalogApplicationTest {
|
|||||||
lineage = getLineage(Entity.TABLE, TABLES.get(4).getId(), 2, 2, adminAuthHeaders());
|
lineage = getLineage(Entity.TABLE, TABLES.get(4).getId(), 2, 2, adminAuthHeaders());
|
||||||
assertEdges(
|
assertEdges(
|
||||||
lineage, Arrays.copyOfRange(expectedUpstreamEdges, 0, 4), Arrays.copyOfRange(expectedDownstreamEdges, 0, 4));
|
lineage, Arrays.copyOfRange(expectedUpstreamEdges, 0, 4), Arrays.copyOfRange(expectedDownstreamEdges, 0, 4));
|
||||||
|
|
||||||
|
//
|
||||||
|
// Delete all the lineage edges
|
||||||
|
// table2--> -->table9
|
||||||
|
// table0-->table3-->table4-->table5->table6->table7
|
||||||
|
// table1--> -->table8
|
||||||
|
deleteEdge(TABLES.get(0), TABLES.get(3));
|
||||||
|
deleteEdge(TABLES.get(3), TABLES.get(4));
|
||||||
|
deleteEdge(TABLES.get(2), TABLES.get(4));
|
||||||
|
deleteEdge(TABLES.get(1), TABLES.get(4));
|
||||||
|
deleteEdge(TABLES.get(4), TABLES.get(9));
|
||||||
|
deleteEdge(TABLES.get(4), TABLES.get(5));
|
||||||
|
deleteEdge(TABLES.get(4), TABLES.get(8));
|
||||||
|
deleteEdge(TABLES.get(5), TABLES.get(6));
|
||||||
|
deleteEdge(TABLES.get(6), TABLES.get(7));
|
||||||
|
lineage = getLineage(Entity.TABLE, TABLES.get(4).getId(), 2, 2, adminAuthHeaders());
|
||||||
|
assertTrue(lineage.getUpstreamEdges().isEmpty());
|
||||||
|
assertTrue(lineage.getDownstreamEdges().isEmpty());
|
||||||
}
|
}
|
||||||
|
|
||||||
public Edge getEdge(Table from, Table to) {
|
public Edge getEdge(Table from, Table to) {
|
||||||
@ -137,14 +156,40 @@ public class LineageResourceTest extends CatalogApplicationTest {
|
|||||||
addLineageAndCheck(addLineage, adminAuthHeaders());
|
addLineageAndCheck(addLineage, adminAuthHeaders());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void deleteEdge(Table from, Table to) throws HttpResponseException {
|
||||||
|
EntitiesEdge edge =
|
||||||
|
new EntitiesEdge()
|
||||||
|
.withFromEntity(new TableEntityInterface(from).getEntityReference())
|
||||||
|
.withToEntity(new TableEntityInterface(to).getEntityReference());
|
||||||
|
deleteLineageAndCheck(edge, adminAuthHeaders());
|
||||||
|
}
|
||||||
|
|
||||||
public static void addLineageAndCheck(AddLineage addLineage, Map<String, String> authHeaders)
|
public static void addLineageAndCheck(AddLineage addLineage, Map<String, String> authHeaders)
|
||||||
throws HttpResponseException {
|
throws HttpResponseException {
|
||||||
addLineage(addLineage, authHeaders);
|
addLineage(addLineage, authHeaders);
|
||||||
validateLineage(addLineage, authHeaders);
|
validateLineage(addLineage, authHeaders);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static void deleteLineageAndCheck(EntitiesEdge deleteEdge, Map<String, String> authHeaders)
|
||||||
|
throws HttpResponseException {
|
||||||
|
deleteLineage(deleteEdge, authHeaders);
|
||||||
|
validateLineageDeleted(deleteEdge, authHeaders);
|
||||||
|
}
|
||||||
|
|
||||||
public static void addLineage(AddLineage addLineage, Map<String, String> authHeaders) throws HttpResponseException {
|
public static void addLineage(AddLineage addLineage, Map<String, String> authHeaders) throws HttpResponseException {
|
||||||
TestUtils.put(CatalogApplicationTest.getResource("lineage"), addLineage, Status.OK, authHeaders);
|
TestUtils.put(getResource("lineage"), addLineage, Status.OK, authHeaders);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void deleteLineage(EntitiesEdge edge, Map<String, String> authHeaders) throws HttpResponseException {
|
||||||
|
WebTarget target =
|
||||||
|
getResource(
|
||||||
|
String.format(
|
||||||
|
"lineage/%s/%s/%s/%s",
|
||||||
|
edge.getFromEntity().getType(),
|
||||||
|
edge.getFromEntity().getId(),
|
||||||
|
edge.getToEntity().getType(),
|
||||||
|
edge.getToEntity().getId()));
|
||||||
|
TestUtils.delete(target, authHeaders);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void validateLineage(AddLineage addLineage, Map<String, String> authHeaders)
|
private static void validateLineage(AddLineage addLineage, Map<String, String> authHeaders)
|
||||||
@ -162,6 +207,21 @@ public class LineageResourceTest extends CatalogApplicationTest {
|
|||||||
assertEdge(lineage, expectedEdge, false);
|
assertEdge(lineage, expectedEdge, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static void validateLineageDeleted(EntitiesEdge deletedEdge, Map<String, String> authHeaders)
|
||||||
|
throws HttpResponseException {
|
||||||
|
EntityReference from = deletedEdge.getFromEntity();
|
||||||
|
EntityReference to = deletedEdge.getToEntity();
|
||||||
|
Edge expectedEdge = getEdge(from.getId(), to.getId());
|
||||||
|
|
||||||
|
// Check fromEntity ---> toEntity downstream edge is returned
|
||||||
|
EntityLineage lineage = getLineage(from.getType(), from.getId(), 0, 1, authHeaders);
|
||||||
|
assertDeleted(lineage, expectedEdge, true);
|
||||||
|
|
||||||
|
// Check fromEntity ---> toEntity upstream edge is returned
|
||||||
|
lineage = getLineage(to.getType(), to.getId(), 1, 0, authHeaders);
|
||||||
|
assertDeleted(lineage, expectedEdge, false);
|
||||||
|
}
|
||||||
|
|
||||||
private static void validateLineage(EntityLineage lineage) {
|
private static void validateLineage(EntityLineage lineage) {
|
||||||
TestUtils.validateEntityReference(lineage.getEntity());
|
TestUtils.validateEntityReference(lineage.getEntity());
|
||||||
lineage.getNodes().forEach(TestUtils::validateEntityReference);
|
lineage.getNodes().forEach(TestUtils::validateEntityReference);
|
||||||
@ -217,6 +277,14 @@ public class LineageResourceTest extends CatalogApplicationTest {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static void assertDeleted(EntityLineage lineage, Edge expectedEdge, boolean downstream) {
|
||||||
|
if (downstream) {
|
||||||
|
assertFalse(lineage.getDownstreamEdges().contains(expectedEdge));
|
||||||
|
} else {
|
||||||
|
assertFalse(lineage.getUpstreamEdges().contains(expectedEdge));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public static void assertEdges(EntityLineage lineage, Edge[] expectedUpstreamEdges, Edge[] expectedDownstreamEdges) {
|
public static void assertEdges(EntityLineage lineage, Edge[] expectedUpstreamEdges, Edge[] expectedDownstreamEdges) {
|
||||||
assertEquals(lineage.getUpstreamEdges().size(), expectedUpstreamEdges.length);
|
assertEquals(lineage.getUpstreamEdges().size(), expectedUpstreamEdges.length);
|
||||||
for (Edge expectedUpstreamEdge : expectedUpstreamEdges) {
|
for (Edge expectedUpstreamEdge : expectedUpstreamEdges) {
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user