From 01ec17f6f3617c516ad0a646f19daf89bf09edb9 Mon Sep 17 00:00:00 2001 From: 07Himank <112613760+07Himank@users.noreply.github.com> Date: Mon, 27 Feb 2023 11:59:22 +0530 Subject: [PATCH] Lineage task (#9499) * solved lineage data issue related to pipeline * fixed some bugs * formatting done --------- Co-authored-by: Himank Mehta Co-authored-by: Mohit Yadav <105265192+mohityadav766@users.noreply.github.com> Co-authored-by: Onkar Ravgan --- .../service/jdbi3/CollectionDAO.java | 30 +++++++++++++++++++ .../service/jdbi3/LineageRepository.java | 23 ++++++++------ 2 files changed, 44 insertions(+), 9 deletions(-) diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java index 665fe34c391..30d5eaef155 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java @@ -642,6 +642,21 @@ public interface CollectionDAO { @Bind("relation") int relation, @Bind("toEntity") String toEntity); + @ConnectionAwareSqlQuery( + value = + "SELECT toId, toEntity, json FROM entity_relationship " + + "WHERE JSON_UNQUOTE(JSON_EXTRACT(json, '$.pipeline.id')) =:fromId AND relation = :relation " + + "ORDER BY toId", + connectionType = MYSQL) + @ConnectionAwareSqlQuery( + value = + "SELECT toId, toEntity, json FROM entity_relationship " + + "WHERE json->'pipeline'->>'id' =:fromId AND relation = :relation " + + "ORDER BY toId", + connectionType = POSTGRES) + @RegisterRowMapper(ToRelationshipMapper.class) + List findToPipeline(@Bind("fromId") String fromId, @Bind("relation") int relation); + // // Find from operations // @@ -664,6 +679,21 @@ public interface CollectionDAO { List findFrom( @Bind("toId") String toId, @Bind("toEntity") String toEntity, @Bind("relation") int relation); + @ConnectionAwareSqlQuery( + value = + "SELECT fromId, fromEntity, json FROM entity_relationship " + + "WHERE JSON_UNQUOTE(JSON_EXTRACT(json, '$.pipeline.id')) = :toId AND relation = :relation " + + "ORDER BY fromId", + connectionType = MYSQL) + @ConnectionAwareSqlQuery( + value = + "SELECT fromId, fromEntity, json FROM entity_relationship " + + "WHERE json->'pipeline'->>'id' = :toId AND relation = :relation " + + "ORDER BY fromId", + connectionType = POSTGRES) + @RegisterRowMapper(FromRelationshipMapper.class) + List findFromPipleine(@Bind("toId") String toId, @Bind("relation") int relation); + @SqlQuery("SELECT fromId, fromEntity, json FROM entity_relationship " + "WHERE toId = :toId ORDER BY fromId") @RegisterRowMapper(FromRelationshipMapper.class) List findFrom(@Bind("toId") String toId); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/LineageRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/LineageRepository.java index 61e637e604a..7d2ab1a0037 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/LineageRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/LineageRepository.java @@ -152,10 +152,13 @@ public class LineageRepository { if (upstreamDepth == 0) { return; } - // from this id ---> find other ids - List records = - dao.relationshipDAO().findFrom(id.toString(), entityType, Relationship.UPSTREAM.ordinal()); - + List records = new ArrayList<>(); + // pipeline information is not maintained + if (entityType == Entity.PIPELINE) { + records = dao.relationshipDAO().findFromPipleine(id.toString(), Relationship.UPSTREAM.ordinal()); + } else { + records = dao.relationshipDAO().findFrom(id.toString(), entityType, Relationship.UPSTREAM.ordinal()); + } final List upstreamEntityReferences = new ArrayList<>(); for (EntityRelationshipRecord entityRelationshipRecord : records) { EntityReference ref = @@ -167,8 +170,8 @@ public class LineageRepository { .getUpstreamEdges() .add(new Edge().withFromEntity(ref.getId()).withToEntity(id).withLineageDetails(lineageDetails)); } - lineage.getNodes().addAll(upstreamEntityReferences); + // from this id ---> find other ids upstreamDepth--; // Recursively add upstream nodes and edges @@ -182,10 +185,12 @@ public class LineageRepository { if (downstreamDepth == 0) { return; } - // from other ids ---> to this id - List records = - dao.relationshipDAO().findTo(id.toString(), entityType, Relationship.UPSTREAM.ordinal()); - + List records = new ArrayList<>(); + if (entityType == Entity.PIPELINE) { + records = dao.relationshipDAO().findToPipeline(id.toString(), Relationship.UPSTREAM.ordinal()); + } else { + records = dao.relationshipDAO().findTo(id.toString(), entityType, Relationship.UPSTREAM.ordinal()); + } final List downstreamEntityReferences = new ArrayList<>(); for (EntityRelationshipRecord entityRelationshipRecord : records) { EntityReference ref =