Lineage task (#9499)

* solved lineage data issue related to pipeline

* fixed some bugs

* formatting done

---------

Co-authored-by: Himank Mehta <himankmehta@Himanks-MacBook-Air.local>
Co-authored-by: Mohit Yadav <105265192+mohityadav766@users.noreply.github.com>
Co-authored-by: Onkar Ravgan <onkar.10r@gmail.com>
This commit is contained in:
07Himank 2023-02-27 11:59:22 +05:30 committed by GitHub
parent 3e7f890b3f
commit 01ec17f6f3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 44 additions and 9 deletions

View File

@ -642,6 +642,21 @@ public interface CollectionDAO {
@Bind("relation") int relation, @Bind("relation") int relation,
@Bind("toEntity") String toEntity); @Bind("toEntity") String toEntity);
@ConnectionAwareSqlQuery(
value =
"SELECT toId, toEntity, json FROM entity_relationship "
+ "WHERE JSON_UNQUOTE(JSON_EXTRACT(json, '$.pipeline.id')) =:fromId AND relation = :relation "
+ "ORDER BY toId",
connectionType = MYSQL)
@ConnectionAwareSqlQuery(
value =
"SELECT toId, toEntity, json FROM entity_relationship "
+ "WHERE json->'pipeline'->>'id' =:fromId AND relation = :relation "
+ "ORDER BY toId",
connectionType = POSTGRES)
@RegisterRowMapper(ToRelationshipMapper.class)
List<EntityRelationshipRecord> findToPipeline(@Bind("fromId") String fromId, @Bind("relation") int relation);
// //
// Find from operations // Find from operations
// //
@ -664,6 +679,21 @@ public interface CollectionDAO {
List<EntityRelationshipRecord> findFrom( List<EntityRelationshipRecord> findFrom(
@Bind("toId") String toId, @Bind("toEntity") String toEntity, @Bind("relation") int relation); @Bind("toId") String toId, @Bind("toEntity") String toEntity, @Bind("relation") int relation);
@ConnectionAwareSqlQuery(
value =
"SELECT fromId, fromEntity, json FROM entity_relationship "
+ "WHERE JSON_UNQUOTE(JSON_EXTRACT(json, '$.pipeline.id')) = :toId AND relation = :relation "
+ "ORDER BY fromId",
connectionType = MYSQL)
@ConnectionAwareSqlQuery(
value =
"SELECT fromId, fromEntity, json FROM entity_relationship "
+ "WHERE json->'pipeline'->>'id' = :toId AND relation = :relation "
+ "ORDER BY fromId",
connectionType = POSTGRES)
@RegisterRowMapper(FromRelationshipMapper.class)
List<EntityRelationshipRecord> findFromPipleine(@Bind("toId") String toId, @Bind("relation") int relation);
@SqlQuery("SELECT fromId, fromEntity, json FROM entity_relationship " + "WHERE toId = :toId ORDER BY fromId") @SqlQuery("SELECT fromId, fromEntity, json FROM entity_relationship " + "WHERE toId = :toId ORDER BY fromId")
@RegisterRowMapper(FromRelationshipMapper.class) @RegisterRowMapper(FromRelationshipMapper.class)
List<EntityRelationshipRecord> findFrom(@Bind("toId") String toId); List<EntityRelationshipRecord> findFrom(@Bind("toId") String toId);

View File

@ -152,10 +152,13 @@ public class LineageRepository {
if (upstreamDepth == 0) { if (upstreamDepth == 0) {
return; return;
} }
// from this id ---> find other ids List<EntityRelationshipRecord> records = new ArrayList<>();
List<EntityRelationshipRecord> records = // pipeline information is not maintained
dao.relationshipDAO().findFrom(id.toString(), entityType, Relationship.UPSTREAM.ordinal()); if (entityType == Entity.PIPELINE) {
records = dao.relationshipDAO().findFromPipleine(id.toString(), Relationship.UPSTREAM.ordinal());
} else {
records = dao.relationshipDAO().findFrom(id.toString(), entityType, Relationship.UPSTREAM.ordinal());
}
final List<EntityReference> upstreamEntityReferences = new ArrayList<>(); final List<EntityReference> upstreamEntityReferences = new ArrayList<>();
for (EntityRelationshipRecord entityRelationshipRecord : records) { for (EntityRelationshipRecord entityRelationshipRecord : records) {
EntityReference ref = EntityReference ref =
@ -167,8 +170,8 @@ public class LineageRepository {
.getUpstreamEdges() .getUpstreamEdges()
.add(new Edge().withFromEntity(ref.getId()).withToEntity(id).withLineageDetails(lineageDetails)); .add(new Edge().withFromEntity(ref.getId()).withToEntity(id).withLineageDetails(lineageDetails));
} }
lineage.getNodes().addAll(upstreamEntityReferences); lineage.getNodes().addAll(upstreamEntityReferences);
// from this id ---> find other ids
upstreamDepth--; upstreamDepth--;
// Recursively add upstream nodes and edges // Recursively add upstream nodes and edges
@ -182,10 +185,12 @@ public class LineageRepository {
if (downstreamDepth == 0) { if (downstreamDepth == 0) {
return; return;
} }
// from other ids ---> to this id List<EntityRelationshipRecord> records = new ArrayList<>();
List<EntityRelationshipRecord> records = if (entityType == Entity.PIPELINE) {
dao.relationshipDAO().findTo(id.toString(), entityType, Relationship.UPSTREAM.ordinal()); records = dao.relationshipDAO().findToPipeline(id.toString(), Relationship.UPSTREAM.ordinal());
} else {
records = dao.relationshipDAO().findTo(id.toString(), entityType, Relationship.UPSTREAM.ordinal());
}
final List<EntityReference> downstreamEntityReferences = new ArrayList<>(); final List<EntityReference> downstreamEntityReferences = new ArrayList<>();
for (EntityRelationshipRecord entityRelationshipRecord : records) { for (EntityRelationshipRecord entityRelationshipRecord : records) {
EntityReference ref = EntityReference ref =