diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/EntityRelationshipDAO.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/EntityRelationshipDAO.java index 8b086ddd9f7..551f7ba8615 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/EntityRelationshipDAO.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/EntityRelationshipDAO.java @@ -30,6 +30,9 @@ public interface EntityRelationshipDAO { int insert(@Bind("fromId") String fromId, @Bind("toId") String toId, @Bind("fromEntity") String fromEntity, @Bind("toEntity") String toEntity, @Bind("relation") int relation); + // + // Find to operations + // @SqlQuery("SELECT toId, toEntity FROM entity_relationship WHERE fromId = :fromId AND relation = :relation") @RegisterMapper(ToEntityReferenceMapper.class) List findTo(@Bind("fromId") String fromId, @Bind("relation") int relation); @@ -43,6 +46,9 @@ public interface EntityRelationshipDAO { "fromId = :fromId AND relation = :relation AND toEntity = :toEntity ORDER BY fromId") int findToCount(@Bind("fromId") String fromId, @Bind("relation") int relation, @Bind("toEntity") String toEntity); + // + // Find from operations + // @SqlQuery("SELECT fromId FROM entity_relationship WHERE " + "toId = :toId AND relation = :relation AND fromEntity = :fromEntity ORDER BY fromId") List findFrom(@Bind("toId") String toId, @Bind("relation") int relation, @@ -59,6 +65,9 @@ public interface EntityRelationshipDAO { List findFromEntity(@Bind("toId") String toId, @Bind("relation") int relation, @Bind("fromEntity") String fromEntity); + // + // Delete Operations + // @SqlUpdate("DELETE from entity_relationship WHERE fromId = :fromId AND toId = :toId AND relation = :relation") void delete(@Bind("fromId") String fromId, @Bind("toId") String toId, @Bind("relation") int relation); diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/LineageRepository.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/LineageRepository.java new file mode 100644 index 00000000000..4c70dacafff --- /dev/null +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/LineageRepository.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.openmetadata.catalog.jdbi3; + +import org.openmetadata.catalog.api.lineage.AddLineage; +import org.openmetadata.catalog.jdbi3.ChartRepository.ChartDAO; +import org.openmetadata.catalog.jdbi3.DashboardRepository.DashboardDAO; +import org.openmetadata.catalog.jdbi3.DatabaseRepository.DatabaseDAO; +import org.openmetadata.catalog.jdbi3.MetricsRepository.MetricsDAO; +import org.openmetadata.catalog.jdbi3.ModelRepository.ModelDAO; +import org.openmetadata.catalog.jdbi3.ReportRepository.ReportDAO; +import org.openmetadata.catalog.jdbi3.TableRepository.TableDAO; +import org.openmetadata.catalog.jdbi3.TaskRepository.TaskDAO; +import org.openmetadata.catalog.jdbi3.TopicRepository.TopicDAO; +import org.openmetadata.catalog.type.Edge; +import org.openmetadata.catalog.type.EntityLineage; +import org.openmetadata.catalog.type.EntityReference; +import org.openmetadata.catalog.util.EntityUtil; +import org.skife.jdbi.v2.sqlobject.CreateSqlObject; +import org.skife.jdbi.v2.sqlobject.Transaction; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.stream.Collectors; + +import static org.openmetadata.catalog.util.EntityUtil.getEntityReference; + +public abstract class LineageRepository { + private static final Logger LOG = LoggerFactory.getLogger(LineageRepository.class); + + @CreateSqlObject + abstract TableDAO tableDAO(); + + @CreateSqlObject + abstract DatabaseDAO databaseDAO(); + + @CreateSqlObject + abstract MetricsDAO metricsDAO(); + + @CreateSqlObject + abstract DashboardDAO dashboardDAO(); + + @CreateSqlObject + abstract ReportDAO reportDAO(); + + @CreateSqlObject + abstract TopicDAO topicDAO(); + + @CreateSqlObject + abstract ChartDAO chartDAO(); + + @CreateSqlObject + abstract TaskDAO taskDAO(); + + @CreateSqlObject + abstract ModelDAO modelDAO(); + + @CreateSqlObject + abstract EntityRelationshipDAO relationshipDAO(); + + @Transaction + public EntityLineage get(String entityType, String id, int upstreamDepth, int downstreamDepth) throws IOException { + EntityReference ref = getEntityReference(entityType, UUID.fromString(id), tableDAO(), databaseDAO(), + metricsDAO(), dashboardDAO(), reportDAO(), topicDAO(), chartDAO(), taskDAO(), modelDAO()); + return getLineage(ref, upstreamDepth, downstreamDepth); + } + + @Transaction + public EntityLineage getByName(String entityType, String fqn, int upstreamDepth, int downstreamDepth) + throws IOException { + EntityReference ref = EntityUtil.getEntityReferenceByName(entityType, fqn, tableDAO(), databaseDAO(), + metricsDAO(), reportDAO(), topicDAO(), chartDAO(), dashboardDAO(), taskDAO(), modelDAO()); + return getLineage(ref, upstreamDepth, downstreamDepth); + } + + @Transaction + public void addLineage(AddLineage addLineage) throws IOException { + // Validate from entity + EntityReference from = addLineage.getEdge().getFrom(); + from = EntityUtil.getEntityReference(from.getType(), from.getId(), tableDAO(), databaseDAO(), + metricsDAO(), dashboardDAO(), reportDAO(), topicDAO(), chartDAO(), taskDAO(), modelDAO()); + + // Validate to entity + EntityReference to = addLineage.getEdge().getTo(); + to = EntityUtil.getEntityReference(to.getType(), to.getId(), tableDAO(), databaseDAO(), + metricsDAO(), dashboardDAO(), reportDAO(), topicDAO(), chartDAO(), taskDAO(), modelDAO()); + + // Finally, add lineage relationship + relationshipDAO().insert(from.getId().toString(), to.getId().toString(), from.getType(), to.getType(), + Relationship.UPSTREAM.ordinal()); + } + + private EntityLineage getLineage(EntityReference primary, int upstreamDepth, int downstreamDepth) throws IOException { + List entities = new ArrayList<>(); + EntityLineage lineage = new EntityLineage().withEntity(primary).withNodes(entities) + .withUpstreamEdges(new ArrayList<>()).withDownstreamEdges(new ArrayList<>()); + addUpstreamLineage(primary.getId(), lineage, upstreamDepth); + addDownstreamLineage(primary.getId(), lineage, downstreamDepth); + + // Remove duplicate nodes + lineage.withNodes(lineage.getNodes().stream().distinct().collect(Collectors.toList())); + + // Add entityReference details + for (int i = 0; i < lineage.getNodes().size(); i++) { + EntityReference ref = lineage.getNodes().get(i); + ref = getEntityReference(ref.getType(), ref.getId(), tableDAO(), databaseDAO(), metricsDAO(), dashboardDAO(), + reportDAO(), topicDAO(), chartDAO(), taskDAO(), modelDAO()); + lineage.getNodes().set(i, ref); + } + return lineage; + } + + private void addUpstreamLineage(UUID id, EntityLineage lineage, int upstreamDepth) { + if (upstreamDepth == 0) { + return; + } + // from this id ---> find other ids + List upstreamEntities = relationshipDAO().findFrom(id.toString(), Relationship.UPSTREAM.ordinal()); + lineage.getNodes().addAll(upstreamEntities); + + upstreamDepth--; + for (EntityReference upstreamEntity : upstreamEntities) { + lineage.getUpstreamEdges().add(new Edge().withFrom(upstreamEntity.getId()).withTo(id)); + addUpstreamLineage(upstreamEntity.getId(), lineage, upstreamDepth); // Recursively add upstream nodes and edges + } + } + + private void addDownstreamLineage(UUID id, EntityLineage lineage, int downstreamDepth) { + if (downstreamDepth == 0) { + return; + } + // from other ids ---> to this id + List downStreamEntities = relationshipDAO().findTo(id.toString(), Relationship.UPSTREAM.ordinal()); + lineage.getNodes().addAll(downStreamEntities); + + downstreamDepth--; + for (EntityReference entity : downStreamEntities) { + lineage.getDownstreamEdges().add(new Edge().withTo(entity.getId()).withFrom(id)); + addDownstreamLineage(entity.getId(), lineage, downstreamDepth); + } + } +} diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/Relationship.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/Relationship.java index fc7b1430d18..ad2a4ac6379 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/Relationship.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/Relationship.java @@ -35,7 +35,6 @@ public enum Relationship { CONTAINS("contains"), // User/Bot --- created ---> Thread - // Pipeline --- created ---> Table CREATED("createdBy"), // User/Bot --- repliedTo ---> Thread @@ -70,7 +69,13 @@ public enum Relationship { FOLLOWS("follows"), // {Table.Column...} --- joinedWith ---> {Table.Column} - JOINED_WITH("joinedWith"); + JOINED_WITH("joinedWith"), + + // Lineage relationship + // {Table1} --- upstream ---> {Table2} (Table1 is used for creating Table2} + // {Pipeline} --- upstream ---> {Table2} (Pipeline creates Table2) + // {Table} --- upstream ---> {Dashboard} (Table was used to create Dashboard) + UPSTREAM("upstream"); /*** Add new enums to the end of the list **/ private final String value; diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/lineage/LineageResource.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/lineage/LineageResource.java new file mode 100644 index 00000000000..278b709befa --- /dev/null +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/lineage/LineageResource.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.openmetadata.catalog.resources.lineage; + +import com.google.inject.Inject; +import io.swagger.annotations.Api; +import io.swagger.v3.oas.annotations.Operation; +import io.swagger.v3.oas.annotations.Parameter; +import io.swagger.v3.oas.annotations.media.Content; +import io.swagger.v3.oas.annotations.media.Schema; +import io.swagger.v3.oas.annotations.responses.ApiResponse; +import org.openmetadata.catalog.api.lineage.AddLineage; +import org.openmetadata.catalog.jdbi3.LineageRepository; +import org.openmetadata.catalog.resources.Collection; +import org.openmetadata.catalog.resources.teams.UserResource; +import org.openmetadata.catalog.security.CatalogAuthorizer; +import org.openmetadata.catalog.type.EntityLineage; +import org.openmetadata.catalog.util.EntityUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.validation.Valid; +import javax.ws.rs.Consumes; +import javax.ws.rs.GET; +import javax.ws.rs.PUT; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.QueryParam; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import javax.ws.rs.core.Response.Status; +import javax.ws.rs.core.UriInfo; +import java.io.IOException; +import java.util.Objects; + +@Path("/v1/lineage") +@Api(value = "Lineage resource", tags = "Lineage resource") +@Produces(MediaType.APPLICATION_JSON) +@Consumes(MediaType.APPLICATION_JSON) +@Collection(name = "lineage", repositoryClass = "org.openmetadata.catalog.jdbi3.LineageRepository") +public class LineageResource { + private static final Logger LOG = LoggerFactory.getLogger(UserResource.class); + private final LineageRepository dao; + + @Inject + public LineageResource(LineageRepository dao, CatalogAuthorizer authorizer) { + Objects.requireNonNull(dao, "LineageRepository must not be null"); + this.dao = dao; + } + + @GET + @Valid + @Path("/{entity}/{id}") + @Operation(summary = "Get lineage", tags = "lineage", + description = "Get lineage details for an entity identified by `id`.", + responses = { + @ApiResponse(responseCode = "200", description = "Entity lineage", + content = @Content(mediaType = "application/json", + schema = @Schema(implementation = EntityLineage.class))), + @ApiResponse(responseCode = "404", description = "Entity for instance {id} is not found") + }) + public EntityLineage get( + @Context UriInfo uriInfo, + @Parameter(description = "Entity type for which lineage is requested", + required = true, + schema = @Schema(type = "string", example = "table, report, metrics, or dashboard")) + @PathParam("entity") String entity, + @Parameter(description = "Entity id", + required = true, + schema = @Schema(type = "string")) + @PathParam("id") String id, + @Parameter(description = "Upstream depth of lineage (default=1, min=0, max=3)") + @QueryParam("upstreamDepth") int upstreamDepth, + @Parameter(description = "Upstream depth of lineage (default=1, min=0, max=3)") + @QueryParam("downstreamDepth") int downStreamDepth) throws IOException { + upstreamDepth = Math.min(Math.max(upstreamDepth, 0), 3); + downStreamDepth = Math.min(Math.max(downStreamDepth, 0), 3); + return addHref(uriInfo, dao.get(entity, id, upstreamDepth, downStreamDepth)); + } + + @GET + @Valid + @Path("/{entity}/name/{fqn}") + @Operation(summary = "Get lineage by name", tags = "lineage", + description = "Get lineage details for an entity identified by fully qualified name.", + responses = { + @ApiResponse(responseCode = "200", description = "Entity lineage", + content = @Content(mediaType = "application/json", + schema = @Schema(implementation = EntityLineage.class))), + @ApiResponse(responseCode = "404", description = "Entity for instance {id} is not found") + }) + public EntityLineage getByName( + @Context UriInfo uriInfo, + @Parameter(description = "Entity type for which lineage is requested", + required = true, + schema = @Schema(type = "string", example = "table, report, metrics, or dashboard")) + @PathParam("entity") String entity, + @Parameter(description = "Fully qualified name of the entity that uniquely identifies an entity", + required = true, + schema = @Schema(type = "string")) + @PathParam("fqn") String fqn, + @Parameter(description = "Upstream depth of lineage (default=1, min=0, max=3)") + @QueryParam("upstreamDepth") int upstreamDepth, + @Parameter(description = "Upstream depth of lineage (default=1, min=0, max=3)") + @QueryParam("downstreamDepth") int downStreamDepth) throws IOException { + upstreamDepth = Math.min(Math.max(upstreamDepth, 0), 3); + downStreamDepth = Math.min(Math.max(downStreamDepth, 1), 3); + return addHref(uriInfo, dao.getByName(entity, fqn, upstreamDepth, downStreamDepth)); + } + + @PUT + @Operation(summary = "Add a lineage edge", tags = "lineage", + description = "Add a lineage edge with from entity as upstream node and to entity as downstream node.", + responses = { + @ApiResponse(responseCode = "200"), + @ApiResponse(responseCode = "404", description = "Entity for instance {id} is not found") + }) + public Response addLineage( + @Context UriInfo uriInfo, + @Valid AddLineage addLineage) throws IOException { + dao.addLineage(addLineage); + return Response.status(Status.OK).build(); + } + + private EntityLineage addHref(UriInfo uriInfo, EntityLineage lineage) { + EntityUtil.addHref(uriInfo, lineage.getEntity()); + lineage.getNodes().forEach(node -> EntityUtil.addHref(uriInfo, node)); + return lineage; + } +} \ No newline at end of file diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/usage/UsageResource.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/usage/UsageResource.java index 258b7e0d9fd..8b616daedd6 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/usage/UsageResource.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/usage/UsageResource.java @@ -93,8 +93,7 @@ public class UsageResource { "(default = currentDate)") @QueryParam("date") String date) throws IOException { // TODO add href - int actualDays = Math.min(30, days); - actualDays = Math.max(1, actualDays); + int actualDays = Math.min(Math.max(days, 1), 30); String actualDate = date == null ? RestUtil.DATE_FORMAT.format(new Date()) : date; return addHref(uriInfo, dao.get(entity, id, actualDate, actualDays)); } @@ -128,8 +127,7 @@ public class UsageResource { @QueryParam("date") String date ) throws IOException { // TODO add href - int actualDays = Math.min(30, days); - actualDays = Math.max(1, actualDays); + int actualDays = Math.min(Math.max(days, 1), 30); String actualDate = date == null ? RestUtil.DATE_FORMAT.format(new Date()) : date; return addHref(uriInfo, dao.getByName(entity, fqn, actualDate, actualDays)); } diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/util/EntityUtil.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/util/EntityUtil.java index 662c3bfb8bd..0ae3d9f08a4 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/util/EntityUtil.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/util/EntityUtil.java @@ -308,40 +308,31 @@ public final class EntityUtil { throws IOException { if (entity.equalsIgnoreCase(Entity.TABLE)) { Table instance = EntityUtil.validate(fqn, tableDAO.findByFQN(fqn), Table.class); - return new EntityReference().withId(instance.getId()).withName(instance.getName()).withType(Entity.TABLE) - .withDescription(instance.getDescription()); + return getEntityReference(instance); } else if (entity.equalsIgnoreCase(Entity.DATABASE)) { Database instance = EntityUtil.validate(fqn, databaseDAO.findByFQN(fqn), Database.class); - return new EntityReference().withId(instance.getId()).withName(instance.getName()).withType(Entity.DATABASE) - .withDescription(instance.getDescription()); + return getEntityReference(instance); } else if (entity.equalsIgnoreCase(Entity.METRICS)) { Metrics instance = EntityUtil.validate(fqn, metricsDAO.findByFQN(fqn), Metrics.class); - return new EntityReference().withId(instance.getId()).withName(instance.getName()).withType(Entity.METRICS) - .withDescription(instance.getDescription()); + return getEntityReference(instance); } else if (entity.equalsIgnoreCase(Entity.REPORT)) { Report instance = EntityUtil.validate(fqn, reportDAO.findByFQN(fqn), Report.class); - return new EntityReference().withId(instance.getId()).withName(instance.getName()).withType(Entity.REPORT) - .withDescription(instance.getDescription()); + return getEntityReference(instance); } else if (entity.equalsIgnoreCase(Entity.TOPIC)) { Topic instance = EntityUtil.validate(fqn, topicDAO.findByFQN(fqn), Topic.class); - return new EntityReference().withId(instance.getId()).withName(instance.getName()).withType(Entity.TOPIC) - .withDescription(instance.getDescription()); + return getEntityReference(instance); } else if (entity.equalsIgnoreCase(Entity.CHART)) { Chart instance = EntityUtil.validate(fqn, chartDAO.findByFQN(fqn), Chart.class); - return new EntityReference().withId(instance.getId()).withName(instance.getName()).withType(Entity.CHART) - .withDescription(instance.getDescription()); + return getEntityReference(instance); } else if (entity.equalsIgnoreCase(Entity.DASHBOARD)) { Dashboard instance = EntityUtil.validate(fqn, dashboardDAO.findByFQN(fqn), Dashboard.class); - return new EntityReference().withId(instance.getId()).withName(instance.getName()).withType(Entity.DASHBOARD) - .withDescription(instance.getDescription()); + return getEntityReference(instance); } else if (entity.equalsIgnoreCase(Entity.TASK)) { Task instance = EntityUtil.validate(fqn, taskDAO.findByFQN(fqn), Task.class); - return new EntityReference().withId(instance.getId()).withName(instance.getName()).withType(Entity.TASK) - .withDescription(instance.getDescription()); + return getEntityReference(instance); } else if (entity.equalsIgnoreCase(Entity.MODEL)) { Model instance = EntityUtil.validate(fqn, modelDAO.findByFQN(fqn), Model.class); - return new EntityReference().withId(instance.getId()).withName(instance.getName()).withType(Entity.MODEL) - .withDescription(instance.getDescription()); + return getEntityReference(instance); } throw EntityNotFoundException.byMessage(CatalogExceptionMessage.entityNotFound(entity, fqn)); } diff --git a/catalog-rest-service/src/main/resources/json/schema/api/lineage/addLineage.json b/catalog-rest-service/src/main/resources/json/schema/api/lineage/addLineage.json new file mode 100644 index 00000000000..ea598f32941 --- /dev/null +++ b/catalog-rest-service/src/main/resources/json/schema/api/lineage/addLineage.json @@ -0,0 +1,18 @@ +{ + "$id": "https://open-metadata.org/schema/api/lineage/addLineage.json", + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "addLineage", + "description": "Add lineage details between two entities", + "type": "object", + "properties" : { + "description": { + "description": "User provided description of the lineage details.", + "type": "string" + }, + "edge" : { + "description": "Lineage edge details.", + "$ref" : "../../type/entityLineage.json#/definitions/entitiesEdge" + } + }, + "required": ["edge"] +} \ No newline at end of file diff --git a/catalog-rest-service/src/main/resources/json/schema/type/entityLineage.json b/catalog-rest-service/src/main/resources/json/schema/type/entityLineage.json new file mode 100644 index 00000000000..80b730a9dc9 --- /dev/null +++ b/catalog-rest-service/src/main/resources/json/schema/type/entityLineage.json @@ -0,0 +1,80 @@ +{ + "$id": "https://open-metadata.org/schema/type/entityLineage.json", + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "Entity Lineage", + "description": "This schema defines the type used for lineage of an entity.", + "type": "object", + "javaType": "org.openmetadata.catalog.type.EntityLineage", + "definitions" : { + "edge" : { + "description": "Edge in the lineage graph from one entity to another by entity IDs.", + "type": "object", + "javaType": "org.openmetadata.catalog.type.Edge", + "properties": { + "from": { + "description" : "From entity that is upstream of lineage edge.", + "$ref" : "basic.json#/definitions/uuid" + }, + "to": { + "description" : "To entity that is downstream of lineage edge.", + "$ref" : "basic.json#/definitions/uuid" + }, + "description" : { + "type": "string" + } + } + }, + "entitiesEdge" : { + "description": "Edge in the lineage graph from one entity to another using entity references.", + "type": "object", + "javaType": "org.openmetadata.catalog.type.EntitiesEdge", + "properties": { + "from": { + "description" : "From entity that is upstream of lineage edge.", + "$ref" : "entityReference.json" + }, + "to": { + "description" : "To entity that is downstream of lineage edge.", + "$ref" : "entityReference.json" + }, + "description" : { + "type": "string" + } + } + } + }, + "properties": { + "entity" : { + "description": "Primary entity for which this lineage graph is created", + "$ref": "entityReference.json" + }, + "nodes": { + "descriptions" : "All the entities that are the nodes in the lineage graph excluding the primary entity.", + "type" : "array", + "items" : { + "$ref": "entityReference.json" + }, + "default" : null + }, + "upstreamEdges": { + "descriptions" : "All the edges in the lineage graph that are upstream from the primiary entity.", + "type": "array", + "items": { + "$ref": "#/definitions/edge" + }, + "default" : null + }, + "downstreamEdges": { + "descriptions" : "All the edges in the lineage graph that are downstream from the primiary entity.", + "type": "array", + "items": { + "$ref": "#/definitions/edge" + }, + "default" : null + } + }, + "required": [ + "entity" + ], + "additionalProperties": false +} \ No newline at end of file diff --git a/catalog-rest-service/src/test/java/org/openmetadata/catalog/EnumBackwardCompatibilityTest.java b/catalog-rest-service/src/test/java/org/openmetadata/catalog/EnumBackwardCompatibilityTest.java index 650ddba6258..629a7f74679 100644 --- a/catalog-rest-service/src/test/java/org/openmetadata/catalog/EnumBackwardCompatibilityTest.java +++ b/catalog-rest-service/src/test/java/org/openmetadata/catalog/EnumBackwardCompatibilityTest.java @@ -35,8 +35,8 @@ public class EnumBackwardCompatibilityTest { */ @Test public void testRelationshipEnumBackwardCompatible() { - assertEquals(13, Relationship.values().length); - assertEquals(12, Relationship.JOINED_WITH.ordinal()); + assertEquals(14, Relationship.values().length); + assertEquals(13, Relationship.UPSTREAM.ordinal()); } /** diff --git a/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/lineage/LineageResourceTest.java b/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/lineage/LineageResourceTest.java new file mode 100644 index 00000000000..5139a24020b --- /dev/null +++ b/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/lineage/LineageResourceTest.java @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.openmetadata.catalog.resources.lineage; + +import org.apache.http.client.HttpResponseException; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.MethodOrderer; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInfo; +import org.junit.jupiter.api.TestMethodOrder; +import org.openmetadata.catalog.CatalogApplicationTest; +import org.openmetadata.catalog.Entity; +import org.openmetadata.catalog.api.data.CreateTable; +import org.openmetadata.catalog.api.lineage.AddLineage; +import org.openmetadata.catalog.entity.data.Table; +import org.openmetadata.catalog.resources.databases.TableResourceTest; +import org.openmetadata.catalog.type.Edge; +import org.openmetadata.catalog.type.EntitiesEdge; +import org.openmetadata.catalog.type.EntityLineage; +import org.openmetadata.catalog.type.EntityReference; +import org.openmetadata.catalog.util.TestUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.ws.rs.client.WebTarget; +import javax.ws.rs.core.Response.Status; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.openmetadata.catalog.util.EntityUtil.getEntityReference; +import static org.openmetadata.catalog.util.TestUtils.adminAuthHeaders; + +@TestMethodOrder(MethodOrderer.OrderAnnotation.class) +public class LineageResourceTest extends CatalogApplicationTest { + private static final Logger LOG = LoggerFactory.getLogger(LineageResourceTest.class); + public static final List TABLES = new ArrayList<>(); + public static final int TABLE_COUNT = 10; + + @BeforeAll + public static void setup(TestInfo test) throws HttpResponseException { + TableResourceTest.setup(test); // Initialize TableResourceTest for using helper methods + // Create TABLE_COUNT number of tables + for (int i = 0; i < TABLE_COUNT; i++) { + CreateTable createTable = TableResourceTest.create(test, i); + TABLES.add(TableResourceTest.createTable(createTable, adminAuthHeaders())); + } + } + + @Test + public void put_addLineageForInvalidEntities() throws HttpResponseException { + // Add lineage table4-->table5 + addEdge(TABLES.get(4), TABLES.get(5)); + + // Add lineage table5-->table6 + addEdge(TABLES.get(5), TABLES.get(6)); + addEdge(TABLES.get(5), TABLES.get(6)); // PUT operation again with the same edge + + // + // Add edges to this lineage graph + // table2--> -->table9 + // table0-->table3-->table4-->table5->table6->table7 + // table1--> -->table8 + addEdge(TABLES.get(0), TABLES.get(3)); + addEdge(TABLES.get(2), TABLES.get(4)); + addEdge(TABLES.get(3), TABLES.get(4)); + addEdge(TABLES.get(1), TABLES.get(4)); + addEdge(TABLES.get(4), TABLES.get(9)); + addEdge(TABLES.get(4), TABLES.get(5)); + addEdge(TABLES.get(4), TABLES.get(8)); + addEdge(TABLES.get(5), TABLES.get(6)); + addEdge(TABLES.get(6), TABLES.get(7)); + + // Test table4 lineage + Edge[] expectedUpstreamEdges = {getEdge(TABLES.get(2), TABLES.get(4)), getEdge(TABLES.get(3), TABLES.get(4)), + getEdge(TABLES.get(1), TABLES.get(4)), getEdge(TABLES.get(0), TABLES.get(3))}; + Edge[] expectedDownstreamEdges = {getEdge(TABLES.get(4), TABLES.get(9)), getEdge(TABLES.get(4), + TABLES.get(5)), getEdge(TABLES.get(4), TABLES.get(8)), getEdge(TABLES.get(5), TABLES.get(6)), + getEdge(TABLES.get(6), TABLES.get(7))}; + + // GET lineage by id + EntityLineage lineage = getLineage(Entity.TABLE, TABLES.get(4).getId(), 3, 3, adminAuthHeaders()); + assertEdges(lineage, expectedUpstreamEdges, expectedDownstreamEdges); + + // GET lineage by fqn + lineage = getLineageByName(Entity.TABLE, TABLES.get(4).getFullyQualifiedName(), 3, 3, adminAuthHeaders()); + assertEdges(lineage, expectedUpstreamEdges, expectedDownstreamEdges); + + // Test table4 partial lineage with various upstream and downstream depths + lineage = getLineage(Entity.TABLE, TABLES.get(4).getId(), 0, 0, adminAuthHeaders()); + assertEdges(lineage, Arrays.copyOfRange(expectedUpstreamEdges, 0, 0), + Arrays.copyOfRange(expectedDownstreamEdges, 0, 0)); + lineage = getLineage(Entity.TABLE, TABLES.get(4).getId(), 1, 1, adminAuthHeaders()); + assertEdges(lineage, Arrays.copyOfRange(expectedUpstreamEdges, 0, 3), + Arrays.copyOfRange(expectedDownstreamEdges, 0, 3)); + lineage = getLineage(Entity.TABLE, TABLES.get(4).getId(), 2, 2, adminAuthHeaders()); + assertEdges(lineage, Arrays.copyOfRange(expectedUpstreamEdges, 0, 4), + Arrays.copyOfRange(expectedDownstreamEdges, 0, 4)); + } + + public Edge getEdge(Table from, Table to) { + return getEdge(from.getId(), to.getId()); + } + + public static Edge getEdge(UUID from, UUID to) { + return new Edge().withFrom(from).withTo(to); + } + + public void addEdge(Table from, Table to) throws HttpResponseException { + EntitiesEdge edge = new EntitiesEdge().withFrom(getEntityReference(from)).withTo(getEntityReference(to)); + AddLineage addLineage = new AddLineage().withEdge(edge); + addLineageAndCheck(addLineage, adminAuthHeaders()); + } + + public static void addLineageAndCheck(AddLineage addLineage, Map authHeaders) + throws HttpResponseException { + addLineage(addLineage, authHeaders); + validateLineage(addLineage, authHeaders); + } + + public static void addLineage(AddLineage addLineage, Map authHeaders) + throws HttpResponseException { + TestUtils.put(CatalogApplicationTest.getResource("lineage"), addLineage, Status.OK, authHeaders); + } + + private static void validateLineage(AddLineage addLineage, Map authHeaders) + throws HttpResponseException { + EntityReference from = addLineage.getEdge().getFrom(); + EntityReference to = addLineage.getEdge().getTo(); + Edge expectedEdge = getEdge(from.getId(), to.getId()); + + // Check fromEntity ---> toEntity downstream edge is returned + EntityLineage lineage = getLineage(from.getType(), from.getId(), 0, 1, authHeaders); + assertEdge(lineage, expectedEdge, true); + + // Check fromEntity ---> toEntity upstream edge is returned + lineage = getLineage(to.getType(), to.getId(), 1, 0, authHeaders); + assertEdge(lineage, expectedEdge, false); + } + + private static void validateLineage(EntityLineage lineage) { + TestUtils.validateEntityReference(lineage.getEntity()); + lineage.getNodes().forEach(TestUtils::validateEntityReference); + + // Total number of from and to points in an edge must be equal to the number of nodes + List ids = new ArrayList<>(); + lineage.getUpstreamEdges().forEach(edge -> {ids.add(edge.getFrom()); ids.add(edge.getTo());}); + lineage.getDownstreamEdges().forEach(edge -> {ids.add(edge.getFrom()); ids.add(edge.getTo());}); + if (lineage.getNodes().size() != 0) { + assertEquals((int) ids.stream().distinct().count(), lineage.getNodes().size() + 1); + } + } + + public static EntityLineage getLineage(String entity, UUID id, Integer upstreamDepth, + Integer downStreamDepth, Map authHeaders) + throws HttpResponseException { + WebTarget target = getResource("lineage/" + entity + "/" + id); + target = upstreamDepth != null ? target.queryParam("upstreamDepth", upstreamDepth) : target; + target = downStreamDepth != null ? target.queryParam("downstreamDepth", downStreamDepth) : target; + EntityLineage lineage = TestUtils.get(target, EntityLineage.class, authHeaders); + validateLineage((lineage)); + return lineage; + } + + public static EntityLineage getLineageByName(String entity, String fqn, Integer upstreamDepth, + Integer downStreamDepth, Map authHeaders) + throws HttpResponseException { + WebTarget target = getResource("lineage/" + entity + "/name/" + fqn); + target = upstreamDepth != null ? target.queryParam("upstreamDepth", upstreamDepth) : target; + target = downStreamDepth != null ? target.queryParam("downstreamDepth", downStreamDepth) : target; + EntityLineage lineage = TestUtils.get(target, EntityLineage.class, authHeaders); + validateLineage((lineage)); + return lineage; + } + + public static void assertEdge(EntityLineage lineage, Edge expectedEdge, boolean downstream) { + if (downstream) { + assertTrue(lineage.getDownstreamEdges().contains(expectedEdge)); + } else { + assertTrue(lineage.getUpstreamEdges().contains(expectedEdge)); + } + } + + public static void assertEdges(EntityLineage lineage, Edge[] expectedUpstreamEdges, Edge[] expectedDownstreamEdges) { + assertEquals(lineage.getUpstreamEdges().size(), expectedUpstreamEdges.length); + for (Edge expectedUpstreamEdge : expectedUpstreamEdges) { + assertTrue(lineage.getUpstreamEdges().contains(expectedUpstreamEdge)); + } + assertEquals(lineage.getDownstreamEdges().size(), expectedDownstreamEdges.length); + for (Edge expectedDownstreamEdge : expectedDownstreamEdges) { + assertTrue(lineage.getDownstreamEdges().contains(expectedDownstreamEdge)); + } + } +} diff --git a/catalog-rest-service/src/test/java/org/openmetadata/catalog/util/TestUtils.java b/catalog-rest-service/src/test/java/org/openmetadata/catalog/util/TestUtils.java index 7d3bd6e1a3b..2c3eb8149b0 100644 --- a/catalog-rest-service/src/test/java/org/openmetadata/catalog/util/TestUtils.java +++ b/catalog-rest-service/src/test/java/org/openmetadata/catalog/util/TestUtils.java @@ -195,7 +195,8 @@ public final class TestUtils { // Ensure data entities use fully qualified name if (List.of("table", "database", "metrics", "dashboard", "pipeline", "report", "topic", "chart") .contains(ref.getType())) { - assertTrue(ref.getName().contains(".")); // FullyQualifiedName has "." as separator + // FullyQualifiedName has "." as separator + assertTrue(ref.getName().contains("."), "entity name is not fully qualified - " + ref.getName()); } }