Fix #718 - Add PUT and GET API support for lineage (#719)

Co-authored-by: sureshms <suresh@getcollate.io>
This commit is contained in:
Suresh Srinivas 2021-10-08 18:36:35 -07:00 committed by GitHub
parent d311fbf849
commit 09db4ef023
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 647 additions and 27 deletions

View File

@ -30,6 +30,9 @@ public interface EntityRelationshipDAO {
int insert(@Bind("fromId") String fromId, @Bind("toId") String toId, @Bind("fromEntity") String fromEntity,
@Bind("toEntity") String toEntity, @Bind("relation") int relation);
//
// Find to operations
//
@SqlQuery("SELECT toId, toEntity FROM entity_relationship WHERE fromId = :fromId AND relation = :relation")
@RegisterMapper(ToEntityReferenceMapper.class)
List<EntityReference> findTo(@Bind("fromId") String fromId, @Bind("relation") int relation);
@ -43,6 +46,9 @@ public interface EntityRelationshipDAO {
"fromId = :fromId AND relation = :relation AND toEntity = :toEntity ORDER BY fromId")
int findToCount(@Bind("fromId") String fromId, @Bind("relation") int relation, @Bind("toEntity") String toEntity);
//
// Find from operations
//
@SqlQuery("SELECT fromId FROM entity_relationship WHERE " +
"toId = :toId AND relation = :relation AND fromEntity = :fromEntity ORDER BY fromId")
List<String> findFrom(@Bind("toId") String toId, @Bind("relation") int relation,
@ -59,6 +65,9 @@ public interface EntityRelationshipDAO {
List<EntityReference> findFromEntity(@Bind("toId") String toId, @Bind("relation") int relation,
@Bind("fromEntity") String fromEntity);
//
// Delete Operations
//
@SqlUpdate("DELETE from entity_relationship WHERE fromId = :fromId AND toId = :toId AND relation = :relation")
void delete(@Bind("fromId") String fromId, @Bind("toId") String toId, @Bind("relation") int relation);

View File

@ -0,0 +1,160 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.openmetadata.catalog.jdbi3;
import org.openmetadata.catalog.api.lineage.AddLineage;
import org.openmetadata.catalog.jdbi3.ChartRepository.ChartDAO;
import org.openmetadata.catalog.jdbi3.DashboardRepository.DashboardDAO;
import org.openmetadata.catalog.jdbi3.DatabaseRepository.DatabaseDAO;
import org.openmetadata.catalog.jdbi3.MetricsRepository.MetricsDAO;
import org.openmetadata.catalog.jdbi3.ModelRepository.ModelDAO;
import org.openmetadata.catalog.jdbi3.ReportRepository.ReportDAO;
import org.openmetadata.catalog.jdbi3.TableRepository.TableDAO;
import org.openmetadata.catalog.jdbi3.TaskRepository.TaskDAO;
import org.openmetadata.catalog.jdbi3.TopicRepository.TopicDAO;
import org.openmetadata.catalog.type.Edge;
import org.openmetadata.catalog.type.EntityLineage;
import org.openmetadata.catalog.type.EntityReference;
import org.openmetadata.catalog.util.EntityUtil;
import org.skife.jdbi.v2.sqlobject.CreateSqlObject;
import org.skife.jdbi.v2.sqlobject.Transaction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
import static org.openmetadata.catalog.util.EntityUtil.getEntityReference;
public abstract class LineageRepository {
private static final Logger LOG = LoggerFactory.getLogger(LineageRepository.class);
@CreateSqlObject
abstract TableDAO tableDAO();
@CreateSqlObject
abstract DatabaseDAO databaseDAO();
@CreateSqlObject
abstract MetricsDAO metricsDAO();
@CreateSqlObject
abstract DashboardDAO dashboardDAO();
@CreateSqlObject
abstract ReportDAO reportDAO();
@CreateSqlObject
abstract TopicDAO topicDAO();
@CreateSqlObject
abstract ChartDAO chartDAO();
@CreateSqlObject
abstract TaskDAO taskDAO();
@CreateSqlObject
abstract ModelDAO modelDAO();
@CreateSqlObject
abstract EntityRelationshipDAO relationshipDAO();
@Transaction
public EntityLineage get(String entityType, String id, int upstreamDepth, int downstreamDepth) throws IOException {
EntityReference ref = getEntityReference(entityType, UUID.fromString(id), tableDAO(), databaseDAO(),
metricsDAO(), dashboardDAO(), reportDAO(), topicDAO(), chartDAO(), taskDAO(), modelDAO());
return getLineage(ref, upstreamDepth, downstreamDepth);
}
@Transaction
public EntityLineage getByName(String entityType, String fqn, int upstreamDepth, int downstreamDepth)
throws IOException {
EntityReference ref = EntityUtil.getEntityReferenceByName(entityType, fqn, tableDAO(), databaseDAO(),
metricsDAO(), reportDAO(), topicDAO(), chartDAO(), dashboardDAO(), taskDAO(), modelDAO());
return getLineage(ref, upstreamDepth, downstreamDepth);
}
@Transaction
public void addLineage(AddLineage addLineage) throws IOException {
// Validate from entity
EntityReference from = addLineage.getEdge().getFrom();
from = EntityUtil.getEntityReference(from.getType(), from.getId(), tableDAO(), databaseDAO(),
metricsDAO(), dashboardDAO(), reportDAO(), topicDAO(), chartDAO(), taskDAO(), modelDAO());
// Validate to entity
EntityReference to = addLineage.getEdge().getTo();
to = EntityUtil.getEntityReference(to.getType(), to.getId(), tableDAO(), databaseDAO(),
metricsDAO(), dashboardDAO(), reportDAO(), topicDAO(), chartDAO(), taskDAO(), modelDAO());
// Finally, add lineage relationship
relationshipDAO().insert(from.getId().toString(), to.getId().toString(), from.getType(), to.getType(),
Relationship.UPSTREAM.ordinal());
}
private EntityLineage getLineage(EntityReference primary, int upstreamDepth, int downstreamDepth) throws IOException {
List<EntityReference> entities = new ArrayList<>();
EntityLineage lineage = new EntityLineage().withEntity(primary).withNodes(entities)
.withUpstreamEdges(new ArrayList<>()).withDownstreamEdges(new ArrayList<>());
addUpstreamLineage(primary.getId(), lineage, upstreamDepth);
addDownstreamLineage(primary.getId(), lineage, downstreamDepth);
// Remove duplicate nodes
lineage.withNodes(lineage.getNodes().stream().distinct().collect(Collectors.toList()));
// Add entityReference details
for (int i = 0; i < lineage.getNodes().size(); i++) {
EntityReference ref = lineage.getNodes().get(i);
ref = getEntityReference(ref.getType(), ref.getId(), tableDAO(), databaseDAO(), metricsDAO(), dashboardDAO(),
reportDAO(), topicDAO(), chartDAO(), taskDAO(), modelDAO());
lineage.getNodes().set(i, ref);
}
return lineage;
}
private void addUpstreamLineage(UUID id, EntityLineage lineage, int upstreamDepth) {
if (upstreamDepth == 0) {
return;
}
// from this id ---> find other ids
List<EntityReference> upstreamEntities = relationshipDAO().findFrom(id.toString(), Relationship.UPSTREAM.ordinal());
lineage.getNodes().addAll(upstreamEntities);
upstreamDepth--;
for (EntityReference upstreamEntity : upstreamEntities) {
lineage.getUpstreamEdges().add(new Edge().withFrom(upstreamEntity.getId()).withTo(id));
addUpstreamLineage(upstreamEntity.getId(), lineage, upstreamDepth); // Recursively add upstream nodes and edges
}
}
private void addDownstreamLineage(UUID id, EntityLineage lineage, int downstreamDepth) {
if (downstreamDepth == 0) {
return;
}
// from other ids ---> to this id
List<EntityReference> downStreamEntities = relationshipDAO().findTo(id.toString(), Relationship.UPSTREAM.ordinal());
lineage.getNodes().addAll(downStreamEntities);
downstreamDepth--;
for (EntityReference entity : downStreamEntities) {
lineage.getDownstreamEdges().add(new Edge().withTo(entity.getId()).withFrom(id));
addDownstreamLineage(entity.getId(), lineage, downstreamDepth);
}
}
}

View File

@ -35,7 +35,6 @@ public enum Relationship {
CONTAINS("contains"),
// User/Bot --- created ---> Thread
// Pipeline --- created ---> Table
CREATED("createdBy"),
// User/Bot --- repliedTo ---> Thread
@ -70,7 +69,13 @@ public enum Relationship {
FOLLOWS("follows"),
// {Table.Column...} --- joinedWith ---> {Table.Column}
JOINED_WITH("joinedWith");
JOINED_WITH("joinedWith"),
// Lineage relationship
// {Table1} --- upstream ---> {Table2} (Table1 is used for creating Table2}
// {Pipeline} --- upstream ---> {Table2} (Pipeline creates Table2)
// {Table} --- upstream ---> {Dashboard} (Table was used to create Dashboard)
UPSTREAM("upstream");
/*** Add new enums to the end of the list **/
private final String value;

View File

@ -0,0 +1,146 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.openmetadata.catalog.resources.lineage;
import com.google.inject.Inject;
import io.swagger.annotations.Api;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.media.Content;
import io.swagger.v3.oas.annotations.media.Schema;
import io.swagger.v3.oas.annotations.responses.ApiResponse;
import org.openmetadata.catalog.api.lineage.AddLineage;
import org.openmetadata.catalog.jdbi3.LineageRepository;
import org.openmetadata.catalog.resources.Collection;
import org.openmetadata.catalog.resources.teams.UserResource;
import org.openmetadata.catalog.security.CatalogAuthorizer;
import org.openmetadata.catalog.type.EntityLineage;
import org.openmetadata.catalog.util.EntityUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.validation.Valid;
import javax.ws.rs.Consumes;
import javax.ws.rs.GET;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.UriInfo;
import java.io.IOException;
import java.util.Objects;
@Path("/v1/lineage")
@Api(value = "Lineage resource", tags = "Lineage resource")
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
@Collection(name = "lineage", repositoryClass = "org.openmetadata.catalog.jdbi3.LineageRepository")
public class LineageResource {
private static final Logger LOG = LoggerFactory.getLogger(UserResource.class);
private final LineageRepository dao;
@Inject
public LineageResource(LineageRepository dao, CatalogAuthorizer authorizer) {
Objects.requireNonNull(dao, "LineageRepository must not be null");
this.dao = dao;
}
@GET
@Valid
@Path("/{entity}/{id}")
@Operation(summary = "Get lineage", tags = "lineage",
description = "Get lineage details for an entity identified by `id`.",
responses = {
@ApiResponse(responseCode = "200", description = "Entity lineage",
content = @Content(mediaType = "application/json",
schema = @Schema(implementation = EntityLineage.class))),
@ApiResponse(responseCode = "404", description = "Entity for instance {id} is not found")
})
public EntityLineage get(
@Context UriInfo uriInfo,
@Parameter(description = "Entity type for which lineage is requested",
required = true,
schema = @Schema(type = "string", example = "table, report, metrics, or dashboard"))
@PathParam("entity") String entity,
@Parameter(description = "Entity id",
required = true,
schema = @Schema(type = "string"))
@PathParam("id") String id,
@Parameter(description = "Upstream depth of lineage (default=1, min=0, max=3)")
@QueryParam("upstreamDepth") int upstreamDepth,
@Parameter(description = "Upstream depth of lineage (default=1, min=0, max=3)")
@QueryParam("downstreamDepth") int downStreamDepth) throws IOException {
upstreamDepth = Math.min(Math.max(upstreamDepth, 0), 3);
downStreamDepth = Math.min(Math.max(downStreamDepth, 0), 3);
return addHref(uriInfo, dao.get(entity, id, upstreamDepth, downStreamDepth));
}
@GET
@Valid
@Path("/{entity}/name/{fqn}")
@Operation(summary = "Get lineage by name", tags = "lineage",
description = "Get lineage details for an entity identified by fully qualified name.",
responses = {
@ApiResponse(responseCode = "200", description = "Entity lineage",
content = @Content(mediaType = "application/json",
schema = @Schema(implementation = EntityLineage.class))),
@ApiResponse(responseCode = "404", description = "Entity for instance {id} is not found")
})
public EntityLineage getByName(
@Context UriInfo uriInfo,
@Parameter(description = "Entity type for which lineage is requested",
required = true,
schema = @Schema(type = "string", example = "table, report, metrics, or dashboard"))
@PathParam("entity") String entity,
@Parameter(description = "Fully qualified name of the entity that uniquely identifies an entity",
required = true,
schema = @Schema(type = "string"))
@PathParam("fqn") String fqn,
@Parameter(description = "Upstream depth of lineage (default=1, min=0, max=3)")
@QueryParam("upstreamDepth") int upstreamDepth,
@Parameter(description = "Upstream depth of lineage (default=1, min=0, max=3)")
@QueryParam("downstreamDepth") int downStreamDepth) throws IOException {
upstreamDepth = Math.min(Math.max(upstreamDepth, 0), 3);
downStreamDepth = Math.min(Math.max(downStreamDepth, 1), 3);
return addHref(uriInfo, dao.getByName(entity, fqn, upstreamDepth, downStreamDepth));
}
@PUT
@Operation(summary = "Add a lineage edge", tags = "lineage",
description = "Add a lineage edge with from entity as upstream node and to entity as downstream node.",
responses = {
@ApiResponse(responseCode = "200"),
@ApiResponse(responseCode = "404", description = "Entity for instance {id} is not found")
})
public Response addLineage(
@Context UriInfo uriInfo,
@Valid AddLineage addLineage) throws IOException {
dao.addLineage(addLineage);
return Response.status(Status.OK).build();
}
private EntityLineage addHref(UriInfo uriInfo, EntityLineage lineage) {
EntityUtil.addHref(uriInfo, lineage.getEntity());
lineage.getNodes().forEach(node -> EntityUtil.addHref(uriInfo, node));
return lineage;
}
}

View File

@ -93,8 +93,7 @@ public class UsageResource {
"(default = currentDate)")
@QueryParam("date") String date) throws IOException {
// TODO add href
int actualDays = Math.min(30, days);
actualDays = Math.max(1, actualDays);
int actualDays = Math.min(Math.max(days, 1), 30);
String actualDate = date == null ? RestUtil.DATE_FORMAT.format(new Date()) : date;
return addHref(uriInfo, dao.get(entity, id, actualDate, actualDays));
}
@ -128,8 +127,7 @@ public class UsageResource {
@QueryParam("date") String date
) throws IOException {
// TODO add href
int actualDays = Math.min(30, days);
actualDays = Math.max(1, actualDays);
int actualDays = Math.min(Math.max(days, 1), 30);
String actualDate = date == null ? RestUtil.DATE_FORMAT.format(new Date()) : date;
return addHref(uriInfo, dao.getByName(entity, fqn, actualDate, actualDays));
}

View File

@ -308,40 +308,31 @@ public final class EntityUtil {
throws IOException {
if (entity.equalsIgnoreCase(Entity.TABLE)) {
Table instance = EntityUtil.validate(fqn, tableDAO.findByFQN(fqn), Table.class);
return new EntityReference().withId(instance.getId()).withName(instance.getName()).withType(Entity.TABLE)
.withDescription(instance.getDescription());
return getEntityReference(instance);
} else if (entity.equalsIgnoreCase(Entity.DATABASE)) {
Database instance = EntityUtil.validate(fqn, databaseDAO.findByFQN(fqn), Database.class);
return new EntityReference().withId(instance.getId()).withName(instance.getName()).withType(Entity.DATABASE)
.withDescription(instance.getDescription());
return getEntityReference(instance);
} else if (entity.equalsIgnoreCase(Entity.METRICS)) {
Metrics instance = EntityUtil.validate(fqn, metricsDAO.findByFQN(fqn), Metrics.class);
return new EntityReference().withId(instance.getId()).withName(instance.getName()).withType(Entity.METRICS)
.withDescription(instance.getDescription());
return getEntityReference(instance);
} else if (entity.equalsIgnoreCase(Entity.REPORT)) {
Report instance = EntityUtil.validate(fqn, reportDAO.findByFQN(fqn), Report.class);
return new EntityReference().withId(instance.getId()).withName(instance.getName()).withType(Entity.REPORT)
.withDescription(instance.getDescription());
return getEntityReference(instance);
} else if (entity.equalsIgnoreCase(Entity.TOPIC)) {
Topic instance = EntityUtil.validate(fqn, topicDAO.findByFQN(fqn), Topic.class);
return new EntityReference().withId(instance.getId()).withName(instance.getName()).withType(Entity.TOPIC)
.withDescription(instance.getDescription());
return getEntityReference(instance);
} else if (entity.equalsIgnoreCase(Entity.CHART)) {
Chart instance = EntityUtil.validate(fqn, chartDAO.findByFQN(fqn), Chart.class);
return new EntityReference().withId(instance.getId()).withName(instance.getName()).withType(Entity.CHART)
.withDescription(instance.getDescription());
return getEntityReference(instance);
} else if (entity.equalsIgnoreCase(Entity.DASHBOARD)) {
Dashboard instance = EntityUtil.validate(fqn, dashboardDAO.findByFQN(fqn), Dashboard.class);
return new EntityReference().withId(instance.getId()).withName(instance.getName()).withType(Entity.DASHBOARD)
.withDescription(instance.getDescription());
return getEntityReference(instance);
} else if (entity.equalsIgnoreCase(Entity.TASK)) {
Task instance = EntityUtil.validate(fqn, taskDAO.findByFQN(fqn), Task.class);
return new EntityReference().withId(instance.getId()).withName(instance.getName()).withType(Entity.TASK)
.withDescription(instance.getDescription());
return getEntityReference(instance);
} else if (entity.equalsIgnoreCase(Entity.MODEL)) {
Model instance = EntityUtil.validate(fqn, modelDAO.findByFQN(fqn), Model.class);
return new EntityReference().withId(instance.getId()).withName(instance.getName()).withType(Entity.MODEL)
.withDescription(instance.getDescription());
return getEntityReference(instance);
}
throw EntityNotFoundException.byMessage(CatalogExceptionMessage.entityNotFound(entity, fqn));
}

View File

@ -0,0 +1,18 @@
{
"$id": "https://open-metadata.org/schema/api/lineage/addLineage.json",
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "addLineage",
"description": "Add lineage details between two entities",
"type": "object",
"properties" : {
"description": {
"description": "User provided description of the lineage details.",
"type": "string"
},
"edge" : {
"description": "Lineage edge details.",
"$ref" : "../../type/entityLineage.json#/definitions/entitiesEdge"
}
},
"required": ["edge"]
}

View File

@ -0,0 +1,80 @@
{
"$id": "https://open-metadata.org/schema/type/entityLineage.json",
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Entity Lineage",
"description": "This schema defines the type used for lineage of an entity.",
"type": "object",
"javaType": "org.openmetadata.catalog.type.EntityLineage",
"definitions" : {
"edge" : {
"description": "Edge in the lineage graph from one entity to another by entity IDs.",
"type": "object",
"javaType": "org.openmetadata.catalog.type.Edge",
"properties": {
"from": {
"description" : "From entity that is upstream of lineage edge.",
"$ref" : "basic.json#/definitions/uuid"
},
"to": {
"description" : "To entity that is downstream of lineage edge.",
"$ref" : "basic.json#/definitions/uuid"
},
"description" : {
"type": "string"
}
}
},
"entitiesEdge" : {
"description": "Edge in the lineage graph from one entity to another using entity references.",
"type": "object",
"javaType": "org.openmetadata.catalog.type.EntitiesEdge",
"properties": {
"from": {
"description" : "From entity that is upstream of lineage edge.",
"$ref" : "entityReference.json"
},
"to": {
"description" : "To entity that is downstream of lineage edge.",
"$ref" : "entityReference.json"
},
"description" : {
"type": "string"
}
}
}
},
"properties": {
"entity" : {
"description": "Primary entity for which this lineage graph is created",
"$ref": "entityReference.json"
},
"nodes": {
"descriptions" : "All the entities that are the nodes in the lineage graph excluding the primary entity.",
"type" : "array",
"items" : {
"$ref": "entityReference.json"
},
"default" : null
},
"upstreamEdges": {
"descriptions" : "All the edges in the lineage graph that are upstream from the primiary entity.",
"type": "array",
"items": {
"$ref": "#/definitions/edge"
},
"default" : null
},
"downstreamEdges": {
"descriptions" : "All the edges in the lineage graph that are downstream from the primiary entity.",
"type": "array",
"items": {
"$ref": "#/definitions/edge"
},
"default" : null
}
},
"required": [
"entity"
],
"additionalProperties": false
}

View File

@ -35,8 +35,8 @@ public class EnumBackwardCompatibilityTest {
*/
@Test
public void testRelationshipEnumBackwardCompatible() {
assertEquals(13, Relationship.values().length);
assertEquals(12, Relationship.JOINED_WITH.ordinal());
assertEquals(14, Relationship.values().length);
assertEquals(13, Relationship.UPSTREAM.ordinal());
}
/**

View File

@ -0,0 +1,212 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.openmetadata.catalog.resources.lineage;
import org.apache.http.client.HttpResponseException;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.MethodOrderer;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.TestMethodOrder;
import org.openmetadata.catalog.CatalogApplicationTest;
import org.openmetadata.catalog.Entity;
import org.openmetadata.catalog.api.data.CreateTable;
import org.openmetadata.catalog.api.lineage.AddLineage;
import org.openmetadata.catalog.entity.data.Table;
import org.openmetadata.catalog.resources.databases.TableResourceTest;
import org.openmetadata.catalog.type.Edge;
import org.openmetadata.catalog.type.EntitiesEdge;
import org.openmetadata.catalog.type.EntityLineage;
import org.openmetadata.catalog.type.EntityReference;
import org.openmetadata.catalog.util.TestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.Response.Status;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.openmetadata.catalog.util.EntityUtil.getEntityReference;
import static org.openmetadata.catalog.util.TestUtils.adminAuthHeaders;
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
public class LineageResourceTest extends CatalogApplicationTest {
private static final Logger LOG = LoggerFactory.getLogger(LineageResourceTest.class);
public static final List<Table> TABLES = new ArrayList<>();
public static final int TABLE_COUNT = 10;
@BeforeAll
public static void setup(TestInfo test) throws HttpResponseException {
TableResourceTest.setup(test); // Initialize TableResourceTest for using helper methods
// Create TABLE_COUNT number of tables
for (int i = 0; i < TABLE_COUNT; i++) {
CreateTable createTable = TableResourceTest.create(test, i);
TABLES.add(TableResourceTest.createTable(createTable, adminAuthHeaders()));
}
}
@Test
public void put_addLineageForInvalidEntities() throws HttpResponseException {
// Add lineage table4-->table5
addEdge(TABLES.get(4), TABLES.get(5));
// Add lineage table5-->table6
addEdge(TABLES.get(5), TABLES.get(6));
addEdge(TABLES.get(5), TABLES.get(6)); // PUT operation again with the same edge
//
// Add edges to this lineage graph
// table2--> -->table9
// table0-->table3-->table4-->table5->table6->table7
// table1--> -->table8
addEdge(TABLES.get(0), TABLES.get(3));
addEdge(TABLES.get(2), TABLES.get(4));
addEdge(TABLES.get(3), TABLES.get(4));
addEdge(TABLES.get(1), TABLES.get(4));
addEdge(TABLES.get(4), TABLES.get(9));
addEdge(TABLES.get(4), TABLES.get(5));
addEdge(TABLES.get(4), TABLES.get(8));
addEdge(TABLES.get(5), TABLES.get(6));
addEdge(TABLES.get(6), TABLES.get(7));
// Test table4 lineage
Edge[] expectedUpstreamEdges = {getEdge(TABLES.get(2), TABLES.get(4)), getEdge(TABLES.get(3), TABLES.get(4)),
getEdge(TABLES.get(1), TABLES.get(4)), getEdge(TABLES.get(0), TABLES.get(3))};
Edge[] expectedDownstreamEdges = {getEdge(TABLES.get(4), TABLES.get(9)), getEdge(TABLES.get(4),
TABLES.get(5)), getEdge(TABLES.get(4), TABLES.get(8)), getEdge(TABLES.get(5), TABLES.get(6)),
getEdge(TABLES.get(6), TABLES.get(7))};
// GET lineage by id
EntityLineage lineage = getLineage(Entity.TABLE, TABLES.get(4).getId(), 3, 3, adminAuthHeaders());
assertEdges(lineage, expectedUpstreamEdges, expectedDownstreamEdges);
// GET lineage by fqn
lineage = getLineageByName(Entity.TABLE, TABLES.get(4).getFullyQualifiedName(), 3, 3, adminAuthHeaders());
assertEdges(lineage, expectedUpstreamEdges, expectedDownstreamEdges);
// Test table4 partial lineage with various upstream and downstream depths
lineage = getLineage(Entity.TABLE, TABLES.get(4).getId(), 0, 0, adminAuthHeaders());
assertEdges(lineage, Arrays.copyOfRange(expectedUpstreamEdges, 0, 0),
Arrays.copyOfRange(expectedDownstreamEdges, 0, 0));
lineage = getLineage(Entity.TABLE, TABLES.get(4).getId(), 1, 1, adminAuthHeaders());
assertEdges(lineage, Arrays.copyOfRange(expectedUpstreamEdges, 0, 3),
Arrays.copyOfRange(expectedDownstreamEdges, 0, 3));
lineage = getLineage(Entity.TABLE, TABLES.get(4).getId(), 2, 2, adminAuthHeaders());
assertEdges(lineage, Arrays.copyOfRange(expectedUpstreamEdges, 0, 4),
Arrays.copyOfRange(expectedDownstreamEdges, 0, 4));
}
public Edge getEdge(Table from, Table to) {
return getEdge(from.getId(), to.getId());
}
public static Edge getEdge(UUID from, UUID to) {
return new Edge().withFrom(from).withTo(to);
}
public void addEdge(Table from, Table to) throws HttpResponseException {
EntitiesEdge edge = new EntitiesEdge().withFrom(getEntityReference(from)).withTo(getEntityReference(to));
AddLineage addLineage = new AddLineage().withEdge(edge);
addLineageAndCheck(addLineage, adminAuthHeaders());
}
public static void addLineageAndCheck(AddLineage addLineage, Map<String, String> authHeaders)
throws HttpResponseException {
addLineage(addLineage, authHeaders);
validateLineage(addLineage, authHeaders);
}
public static void addLineage(AddLineage addLineage, Map<String, String> authHeaders)
throws HttpResponseException {
TestUtils.put(CatalogApplicationTest.getResource("lineage"), addLineage, Status.OK, authHeaders);
}
private static void validateLineage(AddLineage addLineage, Map<String, String> authHeaders)
throws HttpResponseException {
EntityReference from = addLineage.getEdge().getFrom();
EntityReference to = addLineage.getEdge().getTo();
Edge expectedEdge = getEdge(from.getId(), to.getId());
// Check fromEntity ---> toEntity downstream edge is returned
EntityLineage lineage = getLineage(from.getType(), from.getId(), 0, 1, authHeaders);
assertEdge(lineage, expectedEdge, true);
// Check fromEntity ---> toEntity upstream edge is returned
lineage = getLineage(to.getType(), to.getId(), 1, 0, authHeaders);
assertEdge(lineage, expectedEdge, false);
}
private static void validateLineage(EntityLineage lineage) {
TestUtils.validateEntityReference(lineage.getEntity());
lineage.getNodes().forEach(TestUtils::validateEntityReference);
// Total number of from and to points in an edge must be equal to the number of nodes
List<UUID> ids = new ArrayList<>();
lineage.getUpstreamEdges().forEach(edge -> {ids.add(edge.getFrom()); ids.add(edge.getTo());});
lineage.getDownstreamEdges().forEach(edge -> {ids.add(edge.getFrom()); ids.add(edge.getTo());});
if (lineage.getNodes().size() != 0) {
assertEquals((int) ids.stream().distinct().count(), lineage.getNodes().size() + 1);
}
}
public static EntityLineage getLineage(String entity, UUID id, Integer upstreamDepth,
Integer downStreamDepth, Map<String, String> authHeaders)
throws HttpResponseException {
WebTarget target = getResource("lineage/" + entity + "/" + id);
target = upstreamDepth != null ? target.queryParam("upstreamDepth", upstreamDepth) : target;
target = downStreamDepth != null ? target.queryParam("downstreamDepth", downStreamDepth) : target;
EntityLineage lineage = TestUtils.get(target, EntityLineage.class, authHeaders);
validateLineage((lineage));
return lineage;
}
public static EntityLineage getLineageByName(String entity, String fqn, Integer upstreamDepth,
Integer downStreamDepth, Map<String, String> authHeaders)
throws HttpResponseException {
WebTarget target = getResource("lineage/" + entity + "/name/" + fqn);
target = upstreamDepth != null ? target.queryParam("upstreamDepth", upstreamDepth) : target;
target = downStreamDepth != null ? target.queryParam("downstreamDepth", downStreamDepth) : target;
EntityLineage lineage = TestUtils.get(target, EntityLineage.class, authHeaders);
validateLineage((lineage));
return lineage;
}
public static void assertEdge(EntityLineage lineage, Edge expectedEdge, boolean downstream) {
if (downstream) {
assertTrue(lineage.getDownstreamEdges().contains(expectedEdge));
} else {
assertTrue(lineage.getUpstreamEdges().contains(expectedEdge));
}
}
public static void assertEdges(EntityLineage lineage, Edge[] expectedUpstreamEdges, Edge[] expectedDownstreamEdges) {
assertEquals(lineage.getUpstreamEdges().size(), expectedUpstreamEdges.length);
for (Edge expectedUpstreamEdge : expectedUpstreamEdges) {
assertTrue(lineage.getUpstreamEdges().contains(expectedUpstreamEdge));
}
assertEquals(lineage.getDownstreamEdges().size(), expectedDownstreamEdges.length);
for (Edge expectedDownstreamEdge : expectedDownstreamEdges) {
assertTrue(lineage.getDownstreamEdges().contains(expectedDownstreamEdge));
}
}
}

View File

@ -195,7 +195,8 @@ public final class TestUtils {
// Ensure data entities use fully qualified name
if (List.of("table", "database", "metrics", "dashboard", "pipeline", "report", "topic", "chart")
.contains(ref.getType())) {
assertTrue(ref.getName().contains(".")); // FullyQualifiedName has "." as separator
// FullyQualifiedName has "." as separator
assertTrue(ref.getName().contains("."), "entity name is not fully qualified - " + ref.getName());
}
}