diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java index 665fe34c391..30d5eaef155 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java @@ -642,6 +642,21 @@ public interface CollectionDAO { @Bind("relation") int relation, @Bind("toEntity") String toEntity); + @ConnectionAwareSqlQuery( + value = + "SELECT toId, toEntity, json FROM entity_relationship " + + "WHERE JSON_UNQUOTE(JSON_EXTRACT(json, '$.pipeline.id')) =:fromId AND relation = :relation " + + "ORDER BY toId", + connectionType = MYSQL) + @ConnectionAwareSqlQuery( + value = + "SELECT toId, toEntity, json FROM entity_relationship " + + "WHERE json->'pipeline'->>'id' =:fromId AND relation = :relation " + + "ORDER BY toId", + connectionType = POSTGRES) + @RegisterRowMapper(ToRelationshipMapper.class) + List findToPipeline(@Bind("fromId") String fromId, @Bind("relation") int relation); + // // Find from operations // @@ -664,6 +679,21 @@ public interface CollectionDAO { List findFrom( @Bind("toId") String toId, @Bind("toEntity") String toEntity, @Bind("relation") int relation); + @ConnectionAwareSqlQuery( + value = + "SELECT fromId, fromEntity, json FROM entity_relationship " + + "WHERE JSON_UNQUOTE(JSON_EXTRACT(json, '$.pipeline.id')) = :toId AND relation = :relation " + + "ORDER BY fromId", + connectionType = MYSQL) + @ConnectionAwareSqlQuery( + value = + "SELECT fromId, fromEntity, json FROM entity_relationship " + + "WHERE json->'pipeline'->>'id' = :toId AND relation = :relation " + + "ORDER BY fromId", + connectionType = POSTGRES) + @RegisterRowMapper(FromRelationshipMapper.class) + List findFromPipleine(@Bind("toId") String toId, @Bind("relation") int relation); + @SqlQuery("SELECT fromId, fromEntity, json FROM entity_relationship " + "WHERE toId = :toId ORDER BY fromId") @RegisterRowMapper(FromRelationshipMapper.class) List findFrom(@Bind("toId") String toId); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/LineageRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/LineageRepository.java index 61e637e604a..7d2ab1a0037 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/LineageRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/LineageRepository.java @@ -152,10 +152,13 @@ public class LineageRepository { if (upstreamDepth == 0) { return; } - // from this id ---> find other ids - List records = - dao.relationshipDAO().findFrom(id.toString(), entityType, Relationship.UPSTREAM.ordinal()); - + List records = new ArrayList<>(); + // pipeline information is not maintained + if (entityType == Entity.PIPELINE) { + records = dao.relationshipDAO().findFromPipleine(id.toString(), Relationship.UPSTREAM.ordinal()); + } else { + records = dao.relationshipDAO().findFrom(id.toString(), entityType, Relationship.UPSTREAM.ordinal()); + } final List upstreamEntityReferences = new ArrayList<>(); for (EntityRelationshipRecord entityRelationshipRecord : records) { EntityReference ref = @@ -167,8 +170,8 @@ public class LineageRepository { .getUpstreamEdges() .add(new Edge().withFromEntity(ref.getId()).withToEntity(id).withLineageDetails(lineageDetails)); } - lineage.getNodes().addAll(upstreamEntityReferences); + // from this id ---> find other ids upstreamDepth--; // Recursively add upstream nodes and edges @@ -182,10 +185,12 @@ public class LineageRepository { if (downstreamDepth == 0) { return; } - // from other ids ---> to this id - List records = - dao.relationshipDAO().findTo(id.toString(), entityType, Relationship.UPSTREAM.ordinal()); - + List records = new ArrayList<>(); + if (entityType == Entity.PIPELINE) { + records = dao.relationshipDAO().findToPipeline(id.toString(), Relationship.UPSTREAM.ordinal()); + } else { + records = dao.relationshipDAO().findTo(id.toString(), entityType, Relationship.UPSTREAM.ordinal()); + } final List downstreamEntityReferences = new ArrayList<>(); for (EntityRelationshipRecord entityRelationshipRecord : records) { EntityReference ref =