From 9d255962278b2f406ff2d884fb367675fc7b00c9 Mon Sep 17 00:00:00 2001 From: Suresh Srinivas Date: Tue, 24 May 2022 16:00:12 -0700 Subject: [PATCH] Fixes #5122 Add backend and API support for column-level lineage (#5123) * Fixes #5122 Add backend and API support for column-level lineage * amend * Fix test failures --- .../v002__create_db_connection_info.sql | 4 + .../v002__create_db_connection_info.sql | 2 + .../catalog/jdbi3/CollectionDAO.java | 93 ++++++++-------- .../catalog/jdbi3/EntityRepository.java | 67 ++++++------ .../catalog/jdbi3/LineageRepository.java | 103 +++++++++++++----- .../catalog/jdbi3/TableRepository.java | 38 +++---- .../catalog/jdbi3/TeamRepository.java | 2 +- .../catalog/jdbi3/UserRepository.java | 7 +- .../resources/lineage/LineageResource.java | 1 - .../catalog/resources/types/TypeResource.java | 2 +- .../openmetadata/catalog/util/EntityUtil.java | 15 +++ .../resources/json/schema/type/basic.json | 4 + .../json/schema/type/entityLineage.json | 52 +++++++++ .../catalog/resources/EntityResourceTest.java | 12 +- .../resources/feeds/FeedResourceTest.java | 7 +- .../lineage/LineageResourceTest.java | 92 +++++++++++++--- .../resources/teams/UserResourceTest.java | 4 +- ingestion-core/src/metadata/_version.py | 2 +- .../resources/json/schema/entity/type.json | 8 -- 19 files changed, 350 insertions(+), 165 deletions(-) diff --git a/bootstrap/sql/com.mysql.cj.jdbc.Driver/v002__create_db_connection_info.sql b/bootstrap/sql/com.mysql.cj.jdbc.Driver/v002__create_db_connection_info.sql index 9302363e32d..a7eeaff226b 100644 --- a/bootstrap/sql/com.mysql.cj.jdbc.Driver/v002__create_db_connection_info.sql +++ b/bootstrap/sql/com.mysql.cj.jdbc.Driver/v002__create_db_connection_info.sql @@ -11,3 +11,7 @@ CREATE TABLE IF NOT EXISTS type_entity ( ALTER TABLE webhook_entity DROP COLUMN deleted; + +ALTER TABLE entity_relationship +DROP INDEX edge_index; + diff --git a/bootstrap/sql/org.postgresql.Driver/v002__create_db_connection_info.sql b/bootstrap/sql/org.postgresql.Driver/v002__create_db_connection_info.sql index 98c97f6cd08..0a50d4be94e 100644 --- a/bootstrap/sql/org.postgresql.Driver/v002__create_db_connection_info.sql +++ b/bootstrap/sql/org.postgresql.Driver/v002__create_db_connection_info.sql @@ -11,3 +11,5 @@ CREATE TABLE IF NOT EXISTS type_entity ( ALTER TABLE webhook_entity DROP COLUMN deleted; + +DROP INDEX entity_relationship_edge_index; diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/CollectionDAO.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/CollectionDAO.java index a9cd1823659..27dae24d121 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/CollectionDAO.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/CollectionDAO.java @@ -22,6 +22,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.UUID; +import lombok.Builder; import lombok.Getter; import org.jdbi.v3.core.mapper.RowMapper; import org.jdbi.v3.core.statement.StatementContext; @@ -63,7 +64,6 @@ import org.openmetadata.catalog.jdbi3.CollectionDAO.TagUsageDAO.TagLabelMapper; import org.openmetadata.catalog.jdbi3.CollectionDAO.UsageDAO.UsageDetailsMapper; import org.openmetadata.catalog.jdbi3.locator.ConnectionAwareSqlQuery; import org.openmetadata.catalog.jdbi3.locator.ConnectionAwareSqlUpdate; -import org.openmetadata.catalog.type.EntityReference; import org.openmetadata.catalog.type.Relationship; import org.openmetadata.catalog.type.TagCategory; import org.openmetadata.catalog.type.TagLabel; @@ -284,11 +284,14 @@ public interface CollectionDAO { interface EntityExtensionDAO { @ConnectionAwareSqlUpdate( value = - "REPLACE INTO entity_extension(id, extension, jsonSchema, json) VALUES (:id, :extension, :jsonSchema, :json)", + "REPLACE INTO entity_extension(id, extension, jsonSchema, json) " + + "VALUES (:id, :extension, :jsonSchema, :json)", connectionType = MYSQL) @ConnectionAwareSqlUpdate( value = - "INSERT INTO entity_extension(id, extension, jsonSchema, json) VALUES (:id, :extension, :jsonSchema, (:json :: jsonb)) ON CONFLICT (id, extension) DO UPDATE SET jsonSchema = EXCLUDED.jsonSchema, json = EXCLUDED.json", + "INSERT INTO entity_extension(id, extension, jsonSchema, json) " + + "VALUES (:id, :extension, :jsonSchema, (:json :: jsonb)) " + + "ON CONFLICT (id, extension) DO UPDATE SET jsonSchema = EXCLUDED.jsonSchema, json = EXCLUDED.json", connectionType = POSTGRES) void insert( @Bind("id") String id, @@ -336,18 +339,12 @@ public interface CollectionDAO { } } - class FromEntityReferenceMapper implements RowMapper { - @Override - public EntityReference map(ResultSet rs, org.jdbi.v3.core.statement.StatementContext ctx) throws SQLException { - return new EntityReference().withId(UUID.fromString(rs.getString("fromId"))).withType(rs.getString("fromEntity")); - } - } - - class ToEntityReferenceMapper implements RowMapper { - @Override - public EntityReference map(ResultSet rs, org.jdbi.v3.core.statement.StatementContext ctx) throws SQLException { - return new EntityReference().withId(UUID.fromString(rs.getString("toId"))).withType(rs.getString("toEntity")); - } + @Getter + @Builder + class EntityRelationshipRecord { + private UUID id; + private String type; + private String json; } interface EntityRelationshipDAO { @@ -355,40 +352,23 @@ public interface CollectionDAO { insert(fromId, toId, fromEntity, toEntity, relation, null); } - default int insert(UUID fromId, UUID toId, String fromEntity, String toEntity, int relation, String json) { - return insert(fromId.toString(), toId.toString(), fromEntity, toEntity, relation, json); + default void insert(UUID fromId, UUID toId, String fromEntity, String toEntity, int relation, String json) { + insert(fromId.toString(), toId.toString(), fromEntity, toEntity, relation, json); } @ConnectionAwareSqlUpdate( value = - "INSERT IGNORE INTO entity_relationship(fromId, toId, fromEntity, toEntity, relation) " - + "VALUES (:fromId, :toId, :fromEntity, :toEntity, :relation)", - connectionType = MYSQL) - @ConnectionAwareSqlUpdate( - value = - "INSERT INTO entity_relationship(fromId, toId, fromEntity, toEntity, relation) " - + "VALUES (:fromId, :toId, :fromEntity, :toEntity, :relation) " - + "ON CONFLICT (fromId, toId, relation) DO NOTHING", - connectionType = POSTGRES) - int insert( - @Bind("fromId") String fromId, - @Bind("toId") String toId, - @Bind("fromEntity") String fromEntity, - @Bind("toEntity") String toEntity, - @Bind("relation") int relation); - - @ConnectionAwareSqlUpdate( - value = - "INSERT IGNORE INTO entity_relationship(fromId, toId, fromEntity, toEntity, relation, json) " - + "VALUES (:fromId, :toId, :fromEntity, :toEntity, :relation, :json)", + "INSERT INTO entity_relationship(fromId, toId, fromEntity, toEntity, relation, json) " + + "VALUES (:fromId, :toId, :fromEntity, :toEntity, :relation, :json) " + + "ON DUPLICATE KEY UPDATE json = :json", connectionType = MYSQL) @ConnectionAwareSqlUpdate( value = "INSERT INTO entity_relationship(fromId, toId, fromEntity, toEntity, relation, json) VALUES " + "(:fromId, :toId, :fromEntity, :toEntity, :relation, (:json :: jsonb)) " - + "ON CONFLICT (fromId, toId, relation) DO NOTHING", + + "ON CONFLICT (fromId, toId, relation) DO UPDATE SET json = EXCLUDED.json", connectionType = POSTGRES) - int insert( + void insert( @Bind("fromId") String fromId, @Bind("toId") String toId, @Bind("fromEntity") String fromEntity, @@ -400,13 +380,14 @@ public interface CollectionDAO { // Find to operations // @SqlQuery( - "SELECT toId, toEntity FROM entity_relationship " + "SELECT toId, toEntity, json FROM entity_relationship " + "WHERE fromId = :fromId AND fromEntity = :fromEntity AND relation = :relation " + "ORDER BY toId") - @RegisterRowMapper(ToEntityReferenceMapper.class) - List findTo( + @RegisterRowMapper(ToRelationshipMapper.class) + List findTo( @Bind("fromId") String fromId, @Bind("fromEntity") String fromEntity, @Bind("relation") int relation); + // TODO delete this @SqlQuery( "SELECT toId FROM entity_relationship " + "WHERE fromId = :fromId AND fromEntity = :fromEntity AND relation = :relation AND toEntity = :toEntity " @@ -442,11 +423,11 @@ public interface CollectionDAO { @Bind("fromEntity") String fromEntity); @SqlQuery( - "SELECT fromId, fromEntity FROM entity_relationship " + "SELECT fromId, fromEntity, json FROM entity_relationship " + "WHERE toId = :toId AND toEntity = :toEntity AND relation = :relation " + "ORDER BY fromId") - @RegisterRowMapper(FromEntityReferenceMapper.class) - List findFrom( + @RegisterRowMapper(FromRelationshipMapper.class) + List findFrom( @Bind("toId") String toId, @Bind("toEntity") String toEntity, @Bind("relation") int relation); // @@ -487,6 +468,28 @@ public interface CollectionDAO { "DELETE from entity_relationship WHERE (toId = :id AND toEntity = :entity) OR " + "(fromId = :id AND fromEntity = :entity)") void deleteAll(@Bind("id") String id, @Bind("entity") String entity); + + class FromRelationshipMapper implements RowMapper { + @Override + public EntityRelationshipRecord map(ResultSet rs, StatementContext ctx) throws SQLException { + return EntityRelationshipRecord.builder() + .id(UUID.fromString(rs.getString("fromId"))) + .type(rs.getString("fromEntity")) + .json(rs.getString("json")) + .build(); + } + } + + class ToRelationshipMapper implements RowMapper { + @Override + public EntityRelationshipRecord map(ResultSet rs, StatementContext ctx) throws SQLException { + return EntityRelationshipRecord.builder() + .id(UUID.fromString(rs.getString("toId"))) + .type(rs.getString("toEntity")) + .json(rs.getString("json")) + .build(); + } + } } interface FeedDAO { diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/EntityRepository.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/EntityRepository.java index c1153597a69..e85b4c5d915 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/EntityRepository.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/EntityRepository.java @@ -67,6 +67,7 @@ import org.openmetadata.catalog.entity.teams.User; import org.openmetadata.catalog.exception.CatalogExceptionMessage; import org.openmetadata.catalog.exception.EntityNotFoundException; import org.openmetadata.catalog.exception.UnhandledServerException; +import org.openmetadata.catalog.jdbi3.CollectionDAO.EntityRelationshipRecord; import org.openmetadata.catalog.jdbi3.CollectionDAO.EntityVersionPair; import org.openmetadata.catalog.jdbi3.CollectionDAO.ExtensionRecord; import org.openmetadata.catalog.jdbi3.TableRepository.TableUpdater; @@ -484,7 +485,7 @@ public abstract class EntityRepository { } // Add relationship - int added = addRelationship(userId, entityId, Entity.USER, entityType, Relationship.FOLLOWS); + addRelationship(userId, entityId, Entity.USER, entityType, Relationship.FOLLOWS); ChangeDescription change = new ChangeDescription().withPreviousVersion(entity.getVersion()); change @@ -504,7 +505,7 @@ public abstract class EntityRepository { .withCurrentVersion(entity.getVersion()) .withPreviousVersion(change.getPreviousVersion()); - return new PutResponse<>(added > 0 ? Status.CREATED : Status.OK, changeEvent, RestUtil.ENTITY_FIELDS_CHANGED); + return new PutResponse<>(Status.OK, changeEvent, RestUtil.ENTITY_FIELDS_CHANGED); } public final DeleteResponse delete(String updatedBy, String id, boolean recursive, boolean hardDelete) @@ -552,10 +553,10 @@ public abstract class EntityRepository { private void deleteChildren(String id, boolean recursive, boolean hardDelete, String updatedBy) throws IOException { // If an entity being deleted contains other **non-deleted** children entities, it can't be deleted - List contains = + List records = daoCollection.relationshipDAO().findTo(id, entityType, Relationship.CONTAINS.ordinal()); - if (contains.isEmpty()) { + if (records.isEmpty()) { return; } // Entity being deleted contains children entities @@ -563,13 +564,9 @@ public abstract class EntityRepository { throw new IllegalArgumentException(CatalogExceptionMessage.entityIsNotEmpty(entityType)); } // Delete all the contained entities - for (EntityReference entityReference : contains) { - LOG.info( - "Recursively {} deleting {} {}", - hardDelete ? "hard" : "soft", - entityReference.getType(), - entityReference.getId()); - Entity.deleteEntity(updatedBy, entityReference.getType(), entityReference.getId(), true, hardDelete); + for (EntityRelationshipRecord record : records) { + LOG.info("Recursively {} deleting {} {}", hardDelete ? "hard" : "soft", record.getType(), record.getId()); + Entity.deleteEntity(updatedBy, record.getType(), record.getId(), true, hardDelete); } } @@ -771,10 +768,10 @@ public abstract class EntityRepository { if (!supportsFollower || entity == null) { return null; } - List followers = findFrom(entity.getId(), entityType, Relationship.FOLLOWS); - for (EntityReference follower : followers) { - User user = daoCollection.userDAO().findEntityById(follower.getId(), ALL); - follower.withName(user.getName()).withDeleted(user.getDeleted()).withFullyQualifiedName(user.getName()); + List followers = new ArrayList<>(); + List records = findFrom(entity.getId(), entityType, Relationship.FOLLOWS); + for (EntityRelationshipRecord record : records) { + followers.add(daoCollection.userDAO().findEntityReferenceById(record.getId(), ALL)); } return followers; } @@ -792,14 +789,14 @@ public abstract class EntityRepository { public void restoreEntity(String updatedBy, String entityType, UUID id) throws IOException { // If an entity being restored contains other **deleted** children entities, restore them - List contains = + List records = daoCollection.relationshipDAO().findTo(id.toString(), entityType, Relationship.CONTAINS.ordinal()); - if (!contains.isEmpty()) { + if (!records.isEmpty()) { // Restore all the contained entities - for (EntityReference entityReference : contains) { - LOG.info("Recursively restoring {} {}", entityReference.getType(), entityReference.getId()); - Entity.restoreEntity(updatedBy, entityReference.getType(), entityReference.getId()); + for (EntityRelationshipRecord record : records) { + LOG.info("Recursively restoring {} {}", record.getType(), record.getId()); + Entity.restoreEntity(updatedBy, record.getType(), record.getId()); } } @@ -810,16 +807,16 @@ public abstract class EntityRepository { dao.update(entity.getId(), JsonUtils.pojoToJson(entity)); } - public int addRelationship(UUID fromId, UUID toId, String fromEntity, String toEntity, Relationship relationship) { - return addRelationship(fromId, toId, fromEntity, toEntity, relationship, false); + public void addRelationship(UUID fromId, UUID toId, String fromEntity, String toEntity, Relationship relationship) { + addRelationship(fromId, toId, fromEntity, toEntity, relationship, false); } - public int addRelationship( + public void addRelationship( UUID fromId, UUID toId, String fromEntity, String toEntity, Relationship relationship, boolean bidirectional) { - return addRelationship(fromId, toId, fromEntity, toEntity, relationship, null, bidirectional); + addRelationship(fromId, toId, fromEntity, toEntity, relationship, null, bidirectional); } - public int addRelationship( + public void addRelationship( UUID fromId, UUID toId, String fromEntity, @@ -835,7 +832,7 @@ public abstract class EntityRepository { from = toId; to = fromId; } - return daoCollection.relationshipDAO().insert(from, to, fromEntity, toEntity, relationship.ordinal(), json); + daoCollection.relationshipDAO().insert(from, to, fromEntity, toEntity, relationship.ordinal(), json); } public List findBoth(UUID entity1, String entityType1, Relationship relationship, String entity2) { @@ -852,15 +849,15 @@ public abstract class EntityRepository { .findFrom(toId.toString(), toEntityType, relationship.ordinal(), fromEntityType); } - public List findFrom(UUID toId, String toEntityType, Relationship relationship) { + public List findFrom(UUID toId, String toEntityType, Relationship relationship) { return daoCollection.relationshipDAO().findFrom(toId.toString(), toEntityType, relationship.ordinal()); } public EntityReference getContainer(UUID toId, String toEntityType) throws IOException { - List refs = findFrom(toId, toEntityType, Relationship.CONTAINS); + List records = findFrom(toId, toEntityType, Relationship.CONTAINS); // An entity can have only one container - ensureSingleRelationship(toEntityType, toId, refs, "container", true); - return Entity.getEntityReferenceById(refs.get(0).getType(), refs.get(0).getId(), ALL); + ensureSingleRelationship(toEntityType, toId, records, "container", true); + return Entity.getEntityReferenceById(records.get(0).getType(), records.get(0).getId(), ALL); } public void ensureSingleRelationship( @@ -877,7 +874,7 @@ public abstract class EntityRepository { } } - public List findTo(UUID fromId, String fromEntityType, Relationship relationship, String toEntityType) { + public final List findTo(UUID fromId, String fromEntityType, Relationship relationship, String toEntityType) { return daoCollection .relationshipDAO() .findTo(fromId.toString(), fromEntityType, relationship.ordinal(), toEntityType); @@ -916,9 +913,11 @@ public abstract class EntityRepository { if (!supportsOwner) { return null; } - List refs = findFrom(entity.getId(), entityType, Relationship.OWNS); - ensureSingleRelationship(entityType, entity.getId(), refs, "owners", false); - return refs.isEmpty() ? null : getOwner(refs.get(0)); + List records = findFrom(entity.getId(), entityType, Relationship.OWNS); + ensureSingleRelationship(entityType, entity.getId(), records, "owners", false); + return records.isEmpty() + ? null + : getOwner(new EntityReference().withId(records.get(0).getId()).withType(records.get(0).getType())); } public EntityReference getOwner(EntityReference ref) throws IOException { diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/LineageRepository.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/LineageRepository.java index 84acbaea132..c7de6f83419 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/LineageRepository.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/LineageRepository.java @@ -13,6 +13,8 @@ package org.openmetadata.catalog.jdbi3; +import static org.openmetadata.common.utils.CommonUtil.listOrEmpty; + import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -21,11 +23,17 @@ import java.util.stream.Collectors; import org.jdbi.v3.sqlobject.transaction.Transaction; import org.openmetadata.catalog.Entity; import org.openmetadata.catalog.api.lineage.AddLineage; +import org.openmetadata.catalog.entity.data.Table; +import org.openmetadata.catalog.jdbi3.CollectionDAO.EntityRelationshipRecord; +import org.openmetadata.catalog.type.ColumnLineage; import org.openmetadata.catalog.type.Edge; import org.openmetadata.catalog.type.EntityLineage; import org.openmetadata.catalog.type.EntityReference; import org.openmetadata.catalog.type.Include; +import org.openmetadata.catalog.type.LineageDetails; import org.openmetadata.catalog.type.Relationship; +import org.openmetadata.catalog.util.FullyQualifiedName; +import org.openmetadata.catalog.util.JsonUtils; public class LineageRepository { private final CollectionDAO dao; @@ -57,9 +65,40 @@ public class LineageRepository { EntityReference to = addLineage.getEdge().getToEntity(); to = Entity.getEntityReferenceById(to.getType(), to.getId(), Include.NON_DELETED); + // Validate lineage details + String detailsJson = validateLineageDetails(from, to, addLineage.getEdge().getLineageDetails()); + // Finally, add lineage relationship dao.relationshipDAO() - .insert(from.getId(), to.getId(), from.getType(), to.getType(), Relationship.UPSTREAM.ordinal()); + .insert(from.getId(), to.getId(), from.getType(), to.getType(), Relationship.UPSTREAM.ordinal(), detailsJson); + } + + private String validateLineageDetails(EntityReference from, EntityReference to, LineageDetails details) + throws IOException { + if (details == null || listOrEmpty(details.getColumnsLineage()).isEmpty()) { + return null; + } + + List columnsLineage = details.getColumnsLineage(); + if (!from.getType().equals(Entity.TABLE) || !to.getType().equals(Entity.TABLE)) { + throw new IllegalArgumentException("Column level lineage is only allowed between two tables."); + } + + Table fromTable = dao.tableDAO().findEntityById(from.getId()); + Table toTable = dao.tableDAO().findEntityById(to.getId()); + for (ColumnLineage columnLineage : columnsLineage) { + for (String fromColumn : columnLineage.getFromColumns()) { + // From column belongs to the fromNode + if (fromColumn.startsWith(fromTable.getFullyQualifiedName())) { + TableRepository.validateColumnFQN(fromTable, fromColumn); + } else { + Table otherTable = dao.tableDAO().findEntityByName(FullyQualifiedName.getTableFQN(fromColumn)); + TableRepository.validateColumnFQN(otherTable, fromColumn); + } + } + TableRepository.validateColumnFQN(toTable, columnLineage.getToColumn()); + } + return JsonUtils.pojoToJson(details); } @Transaction @@ -89,54 +128,66 @@ public class LineageRepository { .withNodes(entities) .withUpstreamEdges(new ArrayList<>()) .withDownstreamEdges(new ArrayList<>()); - addUpstreamLineage(primary.getId(), primary.getType(), lineage, upstreamDepth); - addDownstreamLineage(primary.getId(), primary.getType(), lineage, downstreamDepth); + getUpstreamLineage(primary.getId(), primary.getType(), lineage, upstreamDepth); + getDownstreamLineage(primary.getId(), primary.getType(), lineage, downstreamDepth); // Remove duplicate nodes lineage.withNodes(lineage.getNodes().stream().distinct().collect(Collectors.toList())); - - // Add entityReference details - for (int i = 0; i < lineage.getNodes().size(); i++) { - EntityReference ref = lineage.getNodes().get(i); - ref = Entity.getEntityReferenceById(ref.getType(), ref.getId(), Include.NON_DELETED); - lineage.getNodes().set(i, ref); - } return lineage; } - private void addUpstreamLineage(UUID id, String entityType, EntityLineage lineage, int upstreamDepth) { + private void getUpstreamLineage(UUID id, String entityType, EntityLineage lineage, int upstreamDepth) + throws IOException { if (upstreamDepth == 0) { return; } // from this id ---> find other ids - List upstreamEntities = + List records = dao.relationshipDAO().findFrom(id.toString(), entityType, Relationship.UPSTREAM.ordinal()); - lineage.getNodes().addAll(upstreamEntities); + + final List upstreamEntityReferences = new ArrayList<>(); + for (EntityRelationshipRecord record : records) { + EntityReference ref = Entity.getEntityReferenceById(record.getType(), record.getId(), Include.ALL); + LineageDetails lineageDetails = JsonUtils.readValue(record.getJson(), LineageDetails.class); + upstreamEntityReferences.add(ref); + lineage + .getUpstreamEdges() + .add(new Edge().withFromEntity(ref.getId()).withToEntity(id).withLineageDetails(lineageDetails)); + } + + lineage.getNodes().addAll(upstreamEntityReferences); upstreamDepth--; - for (EntityReference upstreamEntity : upstreamEntities) { - lineage.getUpstreamEdges().add(new Edge().withFromEntity(upstreamEntity.getId()).withToEntity(id)); - addUpstreamLineage( - upstreamEntity.getId(), - upstreamEntity.getType(), - lineage, - upstreamDepth); // Recursively add upstream nodes and edges + // Recursively add upstream nodes and edges + for (EntityReference entity : upstreamEntityReferences) { + getUpstreamLineage(entity.getId(), entity.getType(), lineage, upstreamDepth); } } - private void addDownstreamLineage(UUID id, String entityType, EntityLineage lineage, int downstreamDepth) { + private void getDownstreamLineage(UUID id, String entityType, EntityLineage lineage, int downstreamDepth) + throws IOException { if (downstreamDepth == 0) { return; } // from other ids ---> to this id - List downStreamEntities = + List records = dao.relationshipDAO().findTo(id.toString(), entityType, Relationship.UPSTREAM.ordinal()); - lineage.getNodes().addAll(downStreamEntities); + + final List downstreamEntityReferences = new ArrayList<>(); + for (EntityRelationshipRecord record : records) { + EntityReference ref = Entity.getEntityReferenceById(record.getType(), record.getId(), Include.ALL); + LineageDetails lineageDetails = JsonUtils.readValue(record.getJson(), LineageDetails.class); + downstreamEntityReferences.add(ref); + lineage + .getDownstreamEdges() + .add(new Edge().withToEntity(ref.getId()).withFromEntity(id).withLineageDetails(lineageDetails)); + } + lineage.getNodes().addAll(downstreamEntityReferences); downstreamDepth--; - for (EntityReference entity : downStreamEntities) { - lineage.getDownstreamEdges().add(new Edge().withToEntity(entity.getId()).withFromEntity(id)); - addDownstreamLineage(entity.getId(), entity.getType(), lineage, downstreamDepth); + // Recursively add upstream nodes and edges + for (EntityReference entity : downstreamEntityReferences) { + getDownstreamLineage(entity.getId(), entity.getType(), lineage, downstreamDepth); } } } diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/TableRepository.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/TableRepository.java index 0c4d436c6a1..90b85fe21ed 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/TableRepository.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/TableRepository.java @@ -86,13 +86,13 @@ public class TableRepository extends EntityRepository { // Table fields that can be updated in a PUT request static final String TABLE_UPDATE_FIELDS = "owner,tags,tableConstraints,tablePartition,dataModel,profileSample"; - public TableRepository(CollectionDAO dao) { + public TableRepository(CollectionDAO daoCollection) { super( TableResource.COLLECTION_PATH, TABLE, Table.class, - dao.tableDAO(), - dao, + daoCollection.tableDAO(), + daoCollection, TABLE_PATCH_FIELDS, TABLE_UPDATE_FIELDS); } @@ -147,7 +147,7 @@ public class TableRepository extends EntityRepository
{ @Transaction public Table addJoins(UUID tableId, TableJoins joins) throws IOException { // Validate the request content - Table table = daoCollection.tableDAO().findEntityById(tableId); + Table table = dao.findEntityById(tableId); if (!CommonUtil.dateInRange(RestUtil.DATE_FORMAT, joins.getStartDate(), 0, 30)) { throw new IllegalArgumentException("Date range can only include past 30 days starting today"); } @@ -169,7 +169,7 @@ public class TableRepository extends EntityRepository
{ @Transaction public Table addSampleData(UUID tableId, TableData tableData) throws IOException { // Validate the request content - Table table = daoCollection.tableDAO().findEntityById(tableId); + Table table = dao.findEntityById(tableId); // Validate all the columns for (String columnName : tableData.getColumns()) { @@ -194,7 +194,7 @@ public class TableRepository extends EntityRepository
{ @Transaction public Table addTableProfileData(UUID tableId, TableProfile tableProfile) throws IOException { // Validate the request content - Table table = daoCollection.tableDAO().findEntityById(tableId); + Table table = dao.findEntityById(tableId); List storedTableProfiles = getTableProfile(table); Map storedMapTableProfiles = new HashMap<>(); @@ -219,7 +219,7 @@ public class TableRepository extends EntityRepository
{ @Transaction public Table addLocation(UUID tableId, UUID locationId) throws IOException { - Table table = daoCollection.tableDAO().findEntityById(tableId); + Table table = dao.findEntityById(tableId); EntityReference location = daoCollection.locationDAO().findEntityReferenceById(locationId); // A table has only one location. deleteFrom(tableId, TABLE, Relationship.HAS, LOCATION); @@ -237,7 +237,7 @@ public class TableRepository extends EntityRepository
{ } catch (NoSuchAlgorithmException e) { throw new RuntimeException(e); } - Table table = daoCollection.tableDAO().findEntityById(tableId); + Table table = dao.findEntityById(tableId); List storedQueries = getQueries(table); Map storedMapQueries = new HashMap<>(); if (storedQueries != null) { @@ -257,7 +257,7 @@ public class TableRepository extends EntityRepository
{ @Transaction public Table addTableTest(UUID tableId, TableTest tableTest) throws IOException { // Validate the request content - Table table = daoCollection.tableDAO().findEntityById(tableId); + Table table = dao.findEntityById(tableId); // if ID is not passed we treat it as a new test case being added List storedTableTests = getTableTests(table); // we will override any test case name passed by user/client with tableName + testType @@ -294,7 +294,7 @@ public class TableRepository extends EntityRepository
{ @Transaction public Table deleteTableTest(UUID tableId, String tableTestType) throws IOException { // Validate the request content - Table table = daoCollection.tableDAO().findEntityById(tableId); + Table table = dao.findEntityById(tableId); // if ID is not passed we treat it as a new test case being added List storedTableTests = getTableTests(table); // we will override any test case name passed by user/client with tableName + testType @@ -321,7 +321,7 @@ public class TableRepository extends EntityRepository
{ @Transaction public Table addColumnTest(UUID tableId, ColumnTest columnTest) throws IOException { // Validate the request content - Table table = daoCollection.tableDAO().findEntityById(tableId); + Table table = dao.findEntityById(tableId); String columnName = columnTest.getColumnName(); validateColumn(table, columnName); // we will override any test case name passed by user/client with columnName + testType @@ -367,7 +367,7 @@ public class TableRepository extends EntityRepository
{ @Transaction public Table deleteColumnTest(UUID tableId, String columnName, String columnTestType) throws IOException { // Validate the request content - Table table = daoCollection.tableDAO().findEntityById(tableId); + Table table = dao.findEntityById(tableId); validateColumn(table, columnName); // we will override any test case name passed by user/client with columnName + testType // our assumption is there is only one instance of a test type as of now. @@ -403,7 +403,7 @@ public class TableRepository extends EntityRepository
{ @Transaction public Table addCustomMetric(UUID tableId, CustomMetric customMetric) throws IOException { // Validate the request content - Table table = daoCollection.tableDAO().findEntityById(tableId); + Table table = dao.findEntityById(tableId); String columnName = customMetric.getColumnName(); validateColumn(table, columnName); @@ -441,7 +441,7 @@ public class TableRepository extends EntityRepository
{ @Transaction public Table deleteCustomMetric(UUID tableId, String columnName, String metricName) throws IOException { // Validate the request content - Table table = daoCollection.tableDAO().findEntityById(tableId); + Table table = dao.findEntityById(tableId); validateColumn(table, columnName); // Override any custom metric definition with the same name @@ -475,7 +475,7 @@ public class TableRepository extends EntityRepository
{ @Transaction public Table addDataModel(UUID tableId, DataModel dataModel) throws IOException { - Table table = daoCollection.tableDAO().findEntityById(tableId); + Table table = dao.findEntityById(tableId); table.withDataModel(dataModel); // Carry forward the table description from the model to table entity, if empty @@ -496,7 +496,7 @@ public class TableRepository extends EntityRepository
{ stored.setDescription(modelColumn.getDescription()); } } - daoCollection.tableDAO().update(table.getId(), JsonUtils.pojoToJson(table)); + dao.update(table.getId(), JsonUtils.pojoToJson(table)); setFields(table, Fields.EMPTY_FIELDS); return table; } @@ -647,7 +647,7 @@ public class TableRepository extends EntityRepository
{ } // Validate if a given column exists in the table - private void validateColumn(Table table, String columnName) { + public static void validateColumn(Table table, String columnName) { boolean validColumn = table.getColumns().stream().anyMatch(col -> col.getName().equals(columnName)); if (!validColumn) { throw new IllegalArgumentException("Invalid column name " + columnName); @@ -655,7 +655,7 @@ public class TableRepository extends EntityRepository
{ } // Validate if a given column exists in the table - private void validateColumnFQN(Table table, String columnFQN) { + public static void validateColumnFQN(Table table, String columnFQN) { boolean validColumn = false; for (Column column : table.getColumns()) { if (column.getFullyQualifiedName().equals(columnFQN)) { @@ -672,7 +672,7 @@ public class TableRepository extends EntityRepository
{ for (JoinedWith joinedWith : joinedWithList) { // Validate table String tableFQN = FullyQualifiedName.getTableFQN(joinedWith.getFullyQualifiedName()); - Table joinedWithTable = daoCollection.tableDAO().findEntityByName(tableFQN); + Table joinedWithTable = dao.findEntityByName(tableFQN); // Validate column validateColumnFQN(joinedWithTable, joinedWith.getFullyQualifiedName()); diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/TeamRepository.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/TeamRepository.java index c11442042a9..9b9de7e35b7 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/TeamRepository.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/TeamRepository.java @@ -115,7 +115,7 @@ public class TeamRepository extends EntityRepository { private List getOwns(Team team) throws IOException { // Compile entities owned by the team - return EntityUtil.populateEntityReferences( + return EntityUtil.getEntityReferences( daoCollection.relationshipDAO().findTo(team.getId().toString(), TEAM, Relationship.OWNS.ordinal())); } diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/UserRepository.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/UserRepository.java index 5703f24025d..5bcf01a051c 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/UserRepository.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/UserRepository.java @@ -28,6 +28,7 @@ import org.openmetadata.catalog.entity.teams.AuthenticationMechanism; import org.openmetadata.catalog.entity.teams.Team; import org.openmetadata.catalog.entity.teams.User; import org.openmetadata.catalog.exception.CatalogExceptionMessage; +import org.openmetadata.catalog.jdbi3.CollectionDAO.EntityRelationshipRecord; import org.openmetadata.catalog.resources.teams.UserResource; import org.openmetadata.catalog.teams.authn.JWTAuthMechanism; import org.openmetadata.catalog.type.EntityReference; @@ -145,7 +146,7 @@ public class UserRepository extends EntityRepository { private List getOwns(User user) throws IOException { // Compile entities owned by the user - List ownedEntities = + List ownedEntities = daoCollection.relationshipDAO().findTo(user.getId().toString(), Entity.USER, Relationship.OWNS.ordinal()); // Compile entities owned by the team the user belongs to @@ -155,11 +156,11 @@ public class UserRepository extends EntityRepository { daoCollection.relationshipDAO().findTo(team.getId().toString(), Entity.TEAM, Relationship.OWNS.ordinal())); } // Populate details in entity reference - return EntityUtil.populateEntityReferences(ownedEntities); + return EntityUtil.getEntityReferences(ownedEntities); } private List getFollows(User user) throws IOException { - return EntityUtil.populateEntityReferences( + return EntityUtil.getEntityReferences( daoCollection.relationshipDAO().findTo(user.getId().toString(), Entity.USER, Relationship.FOLLOWS.ordinal())); } diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/lineage/LineageResource.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/lineage/LineageResource.java index a0ad7f024fd..9b4c9cd9eaf 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/lineage/LineageResource.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/lineage/LineageResource.java @@ -166,7 +166,6 @@ public class LineageResource { @Context UriInfo uriInfo, @Context SecurityContext securityContext, @Valid AddLineage addLineage) throws IOException { SecurityUtil.checkAdminRoleOrPermissions(authorizer, securityContext, null, MetadataOperation.UpdateLineage); - dao.addLineage(addLineage); return Response.status(Status.OK).build(); } diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/types/TypeResource.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/types/TypeResource.java index 9b7602af3ee..199e33975a5 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/types/TypeResource.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/types/TypeResource.java @@ -99,7 +99,7 @@ public class TypeResource extends EntityResource { types.forEach( type -> { type.withId(UUID.randomUUID()).withUpdatedBy("admin").withUpdatedAt(now); - LOG.info("Loading type {} with schema {}", type.getName(), type.getSchema()); + LOG.info("Loading type {}", type.getName()); try { this.dao.createOrUpdate(null, type); } catch (IOException e) { diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/util/EntityUtil.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/util/EntityUtil.java index 11194c9cb12..29fbb8e659a 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/util/EntityUtil.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/util/EntityUtil.java @@ -19,6 +19,7 @@ import static org.openmetadata.common.utils.CommonUtil.nullOrEmpty; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.Comparator; import java.util.Date; import java.util.List; @@ -42,6 +43,7 @@ import org.openmetadata.catalog.entity.data.Table; import org.openmetadata.catalog.entity.type.CustomField; import org.openmetadata.catalog.exception.CatalogExceptionMessage; import org.openmetadata.catalog.exception.EntityNotFoundException; +import org.openmetadata.catalog.jdbi3.CollectionDAO.EntityRelationshipRecord; import org.openmetadata.catalog.jdbi3.CollectionDAO.EntityVersionPair; import org.openmetadata.catalog.jdbi3.CollectionDAO.UsageDAO; import org.openmetadata.catalog.resources.feeds.MessageParser.EntityLink; @@ -170,6 +172,7 @@ public final class EntityUtil { return entity; } + // TODO delete public static List populateEntityReferences(List list) throws IOException { if (list != null) { for (EntityReference ref : list) { @@ -181,6 +184,18 @@ public final class EntityUtil { return list; } + public static List getEntityReferences(List list) throws IOException { + if (list == null) { + return Collections.emptyList(); + } + List refs = new ArrayList<>(); + for (EntityRelationshipRecord ref : list) { + refs.add(Entity.getEntityReferenceById(ref.getType(), ref.getId(), ALL)); + } + refs.sort(compareEntityReference); + return refs; + } + public static List populateEntityReferences(@NonNull List ids, @NonNull String entityType) throws IOException { List refs = new ArrayList<>(ids.size()); diff --git a/catalog-rest-service/src/main/resources/json/schema/type/basic.json b/catalog-rest-service/src/main/resources/json/schema/type/basic.json index 85c05ee0a92..12695c34015 100644 --- a/catalog-rest-service/src/main/resources/json/schema/type/basic.json +++ b/catalog-rest-service/src/main/resources/json/schema/type/basic.json @@ -101,6 +101,10 @@ "description": "SQL query statement. Example - 'select * from orders'.", "type": "string" }, + "sqlFunction": { + "description": "SQL function. Example - 'AVG()`, `COUNT()`, etc..", + "type": "string" + }, "markdown": { "$comment" : "@om-field-type", "description": "Text in Markdown format", diff --git a/catalog-rest-service/src/main/resources/json/schema/type/entityLineage.json b/catalog-rest-service/src/main/resources/json/schema/type/entityLineage.json index 89e41a1dece..11d0bb7554e 100644 --- a/catalog-rest-service/src/main/resources/json/schema/type/entityLineage.json +++ b/catalog-rest-service/src/main/resources/json/schema/type/entityLineage.json @@ -6,6 +6,48 @@ "type": "object", "javaType": "org.openmetadata.catalog.type.EntityLineage", "definitions": { + "columnLineage": { + "type" : "object", + "properties": { + "fromColumns" : { + "description": "One or more source columns identified by fully qualified column name used by transformation function to create destination column.", + "type" : "array", + "items" : { + "$ref" : "../type/basic.json#/definitions/fullyQualifiedEntityName" + } + }, + "toColumn" : { + "description": "Destination column identified by fully qualified column name created by the transformation of source columns.", + "$ref" : "../type/basic.json#/definitions/fullyQualifiedEntityName" + }, + "function" : { + "description": "Transformation function applied to source columns to create destination column. That is `function(fromColumns) -> toColumn`.", + "$ref" : "../type/basic.json#/definitions/sqlFunction" + } + } + }, + "lineageDetails" : { + "description" : "Lineage details including sqlQuery + pipeline + columnLineage.", + "type" : "object", + "properties": { + "sqlQuery" : { + "description": "SQL used for transformation.", + "$ref" : "../type/basic.json#/definitions/sqlQuery" + }, + "columnsLineage" : { + "description" : "Lineage information of how upstream columns were combined to get downstream column.", + "type" : "array", + "items" : { + "$ref" : "#/definitions/columnLineage" + } + }, + "pipeline" : { + "description": "Pipeline where the sqlQuery is periodically run.", + "$ref" : "../type/entityReference.json" + } + }, + "required": ["sqlQuery", "columnsLineage"] + }, "edge": { "description": "Edge in the lineage graph from one entity to another by entity IDs.", "type": "object", @@ -21,8 +63,13 @@ }, "description": { "$ref": "basic.json#/definitions/markdown" + }, + "lineageDetails": { + "description": "Optional lineageDetails provided only for table to table lineage edge.", + "$ref": "#/definitions/lineageDetails" } }, + "required": ["fromEntity", "toEntity"], "additionalProperties": false }, "entitiesEdge": { @@ -40,8 +87,13 @@ }, "description": { "$ref": "basic.json#/definitions/markdown" + }, + "lineageDetails": { + "description": "Optional lineageDetails provided only for table to table lineage edge.", + "$ref": "#/definitions/lineageDetails" } }, + "required": ["fromEntity", "toEntity"], "additionalProperties": false } }, diff --git a/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/EntityResourceTest.java b/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/EntityResourceTest.java index 7ea7a6e654d..105e8acf514 100644 --- a/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/EntityResourceTest.java +++ b/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/EntityResourceTest.java @@ -585,7 +585,7 @@ public abstract class EntityResourceTest addAndCheckFollower(entityId, NON_EXISTENT_ENTITY, CREATED, 1, ADMIN_AUTH_HEADERS), + () -> addAndCheckFollower(entityId, NON_EXISTENT_ENTITY, OK, 1, ADMIN_AUTH_HEADERS), NOT_FOUND, entityNotFound(Entity.USER, NON_EXISTENT_ENTITY)); diff --git a/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/feeds/FeedResourceTest.java b/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/feeds/FeedResourceTest.java index 4056da7a7eb..e84ef274c16 100644 --- a/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/feeds/FeedResourceTest.java +++ b/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/feeds/FeedResourceTest.java @@ -14,9 +14,9 @@ package org.openmetadata.catalog.resources.feeds; import static javax.ws.rs.core.Response.Status.BAD_REQUEST; -import static javax.ws.rs.core.Response.Status.CREATED; import static javax.ws.rs.core.Response.Status.FORBIDDEN; import static javax.ws.rs.core.Response.Status.NOT_FOUND; +import static javax.ws.rs.core.Response.Status.OK; import static org.awaitility.Awaitility.with; import static org.awaitility.Durations.ONE_SECOND; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -726,7 +726,7 @@ public class FeedResourceTest extends CatalogApplicationTest { public static void followTable(UUID tableId, UUID userId, Map authHeaders) throws HttpResponseException { WebTarget target = getResource("tables/" + tableId + "/followers"); - TestUtils.put(target, userId.toString(), CREATED, authHeaders); + TestUtils.put(target, userId.toString(), OK, authHeaders); } public static ThreadList listThreadsWithFilter(String userId, String filterType, Map authHeaders) @@ -758,16 +758,13 @@ public class FeedResourceTest extends CatalogApplicationTest { protected final Thread patchThreadAndCheck(Thread updated, String originalJson, Map authHeaders) throws IOException { - // Validate information returned in patch response has the updates Thread returned = patchThread(updated.getId(), originalJson, updated, authHeaders); - compareEntities(updated, returned, authHeaders); // GET the entity and Validate information returned Thread getEntity = getThread(updated.getId(), authHeaders); compareEntities(updated, getEntity, authHeaders); - return returned; } diff --git a/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/lineage/LineageResourceTest.java b/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/lineage/LineageResourceTest.java index 38f0f07637e..4b90aa26775 100644 --- a/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/lineage/LineageResourceTest.java +++ b/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/lineage/LineageResourceTest.java @@ -13,6 +13,7 @@ package org.openmetadata.catalog.resources.lineage; +import static javax.ws.rs.core.Response.Status.BAD_REQUEST; import static javax.ws.rs.core.Response.Status.FORBIDDEN; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -36,6 +37,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.http.client.HttpResponseException; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.MethodOrderer; +import org.junit.jupiter.api.Order; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInfo; import org.junit.jupiter.api.TestMethodOrder; @@ -50,10 +52,12 @@ import org.openmetadata.catalog.resources.databases.TableResourceTest; import org.openmetadata.catalog.resources.teams.RoleResource; import org.openmetadata.catalog.resources.teams.RoleResourceTest; import org.openmetadata.catalog.resources.teams.UserResourceTest; +import org.openmetadata.catalog.type.ColumnLineage; import org.openmetadata.catalog.type.Edge; import org.openmetadata.catalog.type.EntitiesEdge; import org.openmetadata.catalog.type.EntityLineage; import org.openmetadata.catalog.type.EntityReference; +import org.openmetadata.catalog.type.LineageDetails; import org.openmetadata.catalog.util.TestUtils; @Slf4j @@ -75,6 +79,7 @@ public class LineageResourceTest extends CatalogApplicationTest { } } + @Order(1) @Test void put_delete_lineage_withAuthorizer(TestInfo test) throws HttpResponseException { // Random user cannot update lineage. @@ -107,7 +112,9 @@ public class LineageResourceTest extends CatalogApplicationTest { if (shouldThrowException) { assertResponse( - () -> addEdge(TABLES.get(1), TABLES.get(2), authHeaders), FORBIDDEN, noPermission(userName, "UpdateLineage")); + () -> addEdge(TABLES.get(1), TABLES.get(2), null, authHeaders), + FORBIDDEN, + noPermission(userName, "UpdateLineage")); assertResponse( () -> deleteEdge(TABLES.get(1), TABLES.get(2), authHeaders), FORBIDDEN, @@ -115,10 +122,11 @@ public class LineageResourceTest extends CatalogApplicationTest { return; } - addEdge(TABLES.get(1), TABLES.get(2), authHeaders(userName + "@open-metadata.org")); + addEdge(TABLES.get(1), TABLES.get(2), null, authHeaders(userName + "@open-metadata.org")); deleteEdge(TABLES.get(1), TABLES.get(2), authHeaders(userName + "@open-metadata.org")); } + @Order(2) @Test void put_delete_lineage_200() throws HttpResponseException { // Add lineage table4-->table5 @@ -227,21 +235,79 @@ public class LineageResourceTest extends CatalogApplicationTest { Entity.TABLE, TABLES.get(4).getId(), TABLES.get(4).getFullyQualifiedName(), 2, 2, new Edge[0], new Edge[0]); } - public Edge getEdge(Table from, Table to) { - return getEdge(from.getId(), to.getId()); + @Order(3) + @Test + void put_lineageWithDetails() throws HttpResponseException { + // Add column lineage table1.c1 -> table2.c1 + LineageDetails details = new LineageDetails(); + String t1c1FQN = TABLES.get(0).getColumns().get(0).getFullyQualifiedName(); + String t1c2FQN = TABLES.get(0).getColumns().get(1).getFullyQualifiedName(); + String t1c3FQN = TABLES.get(0).getColumns().get(2).getFullyQualifiedName(); + String t2c1FQN = TABLES.get(1).getColumns().get(0).getFullyQualifiedName(); + String t2c2FQN = TABLES.get(1).getColumns().get(1).getFullyQualifiedName(); + String t2c3FQN = TABLES.get(1).getColumns().get(2).getFullyQualifiedName(); + String t3c1FQN = TABLES.get(2).getColumns().get(0).getFullyQualifiedName(); + String t3c2FQN = TABLES.get(2).getColumns().get(1).getFullyQualifiedName(); + String t3c3FQN = TABLES.get(2).getColumns().get(2).getFullyQualifiedName(); + + details.getColumnsLineage().add(new ColumnLineage().withFromColumns(List.of(t1c1FQN)).withToColumn(t2c1FQN)); + addEdge(TABLES.get(0), TABLES.get(1), details, ADMIN_AUTH_HEADERS); + + // Add invalid column lineage (from column or to column are invalid) + details + .getColumnsLineage() + .add(new ColumnLineage().withFromColumns(List.of("invalidColumn")).withToColumn(t2c1FQN)); + assertResponse( + () -> addEdge(TABLES.get(0), TABLES.get(1), details, ADMIN_AUTH_HEADERS), + BAD_REQUEST, + "Invalid fully qualified column name invalidColumn"); + details + .getColumnsLineage() + .add(new ColumnLineage().withFromColumns(List.of(t1c1FQN)).withToColumn("invalidColumn")); + assertResponse( + () -> addEdge(TABLES.get(0), TABLES.get(1), details, ADMIN_AUTH_HEADERS), + BAD_REQUEST, + "Invalid fully qualified column name invalidColumn"); + + // Add column level lineage with multiple fromColumns (t1c1 + t3c1) to t2c1 + details.getColumnsLineage().clear(); + details + .getColumnsLineage() + .add(new ColumnLineage().withFromColumns(List.of(t1c1FQN, t3c1FQN)).withToColumn(t2c1FQN)); + addEdge(TABLES.get(0), TABLES.get(1), details, ADMIN_AUTH_HEADERS); + + // Finally, add detailed column level lineage + details.getColumnsLineage().clear(); + List lineage = details.getColumnsLineage(); + lineage.add(new ColumnLineage().withFromColumns(List.of(t1c1FQN, t3c1FQN)).withToColumn(t2c1FQN)); + lineage.add(new ColumnLineage().withFromColumns(List.of(t1c2FQN, t3c2FQN)).withToColumn(t2c2FQN)); + lineage.add(new ColumnLineage().withFromColumns(List.of(t1c3FQN, t3c3FQN)).withToColumn(t2c3FQN)); + + addEdge(TABLES.get(0), TABLES.get(1), details, ADMIN_AUTH_HEADERS); } - public static Edge getEdge(UUID from, UUID to) { - return new Edge().withFromEntity(from).withToEntity(to); + public Edge getEdge(Table from, Table to) { + return getEdge(from.getId(), to.getId(), null); + } + + public static Edge getEdge(UUID from, UUID to, LineageDetails details) { + return new Edge().withFromEntity(from).withToEntity(to).withLineageDetails(details); } public void addEdge(Table from, Table to) throws HttpResponseException { - addEdge(from, to, ADMIN_AUTH_HEADERS); + addEdge(from, to, null, ADMIN_AUTH_HEADERS); } - private void addEdge(Table from, Table to, Map authHeaders) throws HttpResponseException { + private void addEdge(Table from, Table to, LineageDetails details, Map authHeaders) + throws HttpResponseException { + if (details != null) { + details.setSqlQuery("select *;"); + } EntitiesEdge edge = - new EntitiesEdge().withFromEntity(from.getEntityReference()).withToEntity(to.getEntityReference()); + new EntitiesEdge() + .withFromEntity(from.getEntityReference()) + .withToEntity(to.getEntityReference()) + .withLineageDetails(details); AddLineage addLineage = new AddLineage().withEdge(edge); addLineageAndCheck(addLineage, authHeaders); } @@ -288,13 +354,13 @@ public class LineageResourceTest extends CatalogApplicationTest { throws HttpResponseException { EntityReference from = addLineage.getEdge().getFromEntity(); EntityReference to = addLineage.getEdge().getToEntity(); - Edge expectedEdge = getEdge(from.getId(), to.getId()); + Edge expectedEdge = getEdge(from.getId(), to.getId(), addLineage.getEdge().getLineageDetails()); - // Check fromEntity ---> toEntity downstream edge is returned + // Check fromEntity ---> toEntity downstream edge of 'from' is returned EntityLineage lineage = getLineage(from.getType(), from.getId(), 0, 1, authHeaders); assertEdge(lineage, expectedEdge, true); - // Check fromEntity ---> toEntity upstream edge is returned + // Check fromEntity ---> toEntity upstream edge 'to' is returned lineage = getLineage(to.getType(), to.getId(), 1, 0, authHeaders); assertEdge(lineage, expectedEdge, false); } @@ -303,7 +369,7 @@ public class LineageResourceTest extends CatalogApplicationTest { throws HttpResponseException { EntityReference from = deletedEdge.getFromEntity(); EntityReference to = deletedEdge.getToEntity(); - Edge expectedEdge = getEdge(from.getId(), to.getId()); + Edge expectedEdge = getEdge(from.getId(), to.getId(), deletedEdge.getLineageDetails()); // Check fromEntity ---> toEntity downstream edge is returned EntityLineage lineage = getLineage(from.getType(), from.getId(), 0, 1, authHeaders); diff --git a/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/teams/UserResourceTest.java b/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/teams/UserResourceTest.java index 97a7cb34dd2..abb0f7f368a 100644 --- a/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/teams/UserResourceTest.java +++ b/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/teams/UserResourceTest.java @@ -650,7 +650,7 @@ public class UserResourceTest extends EntityResourceTest { // Add user as follower to a table TableResourceTest tableResourceTest = new TableResourceTest(); Table table = tableResourceTest.createEntity(test, 1); - tableResourceTest.addAndCheckFollower(table.getId(), user.getId(), CREATED, 1, ADMIN_AUTH_HEADERS); + tableResourceTest.addAndCheckFollower(table.getId(), user.getId(), OK, 1, ADMIN_AUTH_HEADERS); // Delete user deleteAndCheckEntity(user, ADMIN_AUTH_HEADERS); @@ -662,7 +662,7 @@ public class UserResourceTest extends EntityResourceTest { // User can no longer follow other entities assertResponse( - () -> tableResourceTest.addAndCheckFollower(table.getId(), user.getId(), CREATED, 1, ADMIN_AUTH_HEADERS), + () -> tableResourceTest.addAndCheckFollower(table.getId(), user.getId(), OK, 1, ADMIN_AUTH_HEADERS), NOT_FOUND, entityNotFound("user", user.getId())); } diff --git a/ingestion-core/src/metadata/_version.py b/ingestion-core/src/metadata/_version.py index 7e90f529f15..6485da3eb8c 100644 --- a/ingestion-core/src/metadata/_version.py +++ b/ingestion-core/src/metadata/_version.py @@ -7,5 +7,5 @@ Provides metadata version information. from incremental import Version -__version__ = Version("metadata", 0, 11, 0, dev=8) +__version__ = Version("metadata", 0, 11, 0, dev=9) __all__ = ["__version__"] diff --git a/openmetadata-core/src/main/resources/json/schema/entity/type.json b/openmetadata-core/src/main/resources/json/schema/entity/type.json index 38886324502..d5487eefa32 100644 --- a/openmetadata-core/src/main/resources/json/schema/entity/type.json +++ b/openmetadata-core/src/main/resources/json/schema/entity/type.json @@ -88,14 +88,6 @@ "category" : { "$ref" : "#/definitions/category" }, - "nameSpace": { - "description": "Namespace or group to which this type belongs to. For example, some of the field types commonly used can come from `basic` namespace. Some of the entities such as `table`, `database`, etc. come from `data` namespace.", - "type": "string", - "default" : "custom" - }, - "category" : { - "$ref" : "#/definitions/category" - }, "schema": { "description": "JSON schema encoded as string that defines the type. This will be used to validate the type values.", "$ref": "../type/basic.json#/definitions/jsonSchema"