Fixes #5122 Add backend and API support for column-level lineage (#5123)

* Fixes #5122 Add backend and API support for column-level lineage

* amend

* Fix test failures
This commit is contained in:
Suresh Srinivas 2022-05-24 16:00:12 -07:00 committed by GitHub
parent 62e8959944
commit 9d25596227
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 350 additions and 165 deletions

View File

@ -11,3 +11,7 @@ CREATE TABLE IF NOT EXISTS type_entity (
ALTER TABLE webhook_entity ALTER TABLE webhook_entity
DROP COLUMN deleted; DROP COLUMN deleted;
ALTER TABLE entity_relationship
DROP INDEX edge_index;

View File

@ -11,3 +11,5 @@ CREATE TABLE IF NOT EXISTS type_entity (
ALTER TABLE webhook_entity ALTER TABLE webhook_entity
DROP COLUMN deleted; DROP COLUMN deleted;
DROP INDEX entity_relationship_edge_index;

View File

@ -22,6 +22,7 @@ import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.UUID; import java.util.UUID;
import lombok.Builder;
import lombok.Getter; import lombok.Getter;
import org.jdbi.v3.core.mapper.RowMapper; import org.jdbi.v3.core.mapper.RowMapper;
import org.jdbi.v3.core.statement.StatementContext; import org.jdbi.v3.core.statement.StatementContext;
@ -63,7 +64,6 @@ import org.openmetadata.catalog.jdbi3.CollectionDAO.TagUsageDAO.TagLabelMapper;
import org.openmetadata.catalog.jdbi3.CollectionDAO.UsageDAO.UsageDetailsMapper; import org.openmetadata.catalog.jdbi3.CollectionDAO.UsageDAO.UsageDetailsMapper;
import org.openmetadata.catalog.jdbi3.locator.ConnectionAwareSqlQuery; import org.openmetadata.catalog.jdbi3.locator.ConnectionAwareSqlQuery;
import org.openmetadata.catalog.jdbi3.locator.ConnectionAwareSqlUpdate; import org.openmetadata.catalog.jdbi3.locator.ConnectionAwareSqlUpdate;
import org.openmetadata.catalog.type.EntityReference;
import org.openmetadata.catalog.type.Relationship; import org.openmetadata.catalog.type.Relationship;
import org.openmetadata.catalog.type.TagCategory; import org.openmetadata.catalog.type.TagCategory;
import org.openmetadata.catalog.type.TagLabel; import org.openmetadata.catalog.type.TagLabel;
@ -284,11 +284,14 @@ public interface CollectionDAO {
interface EntityExtensionDAO { interface EntityExtensionDAO {
@ConnectionAwareSqlUpdate( @ConnectionAwareSqlUpdate(
value = value =
"REPLACE INTO entity_extension(id, extension, jsonSchema, json) VALUES (:id, :extension, :jsonSchema, :json)", "REPLACE INTO entity_extension(id, extension, jsonSchema, json) "
+ "VALUES (:id, :extension, :jsonSchema, :json)",
connectionType = MYSQL) connectionType = MYSQL)
@ConnectionAwareSqlUpdate( @ConnectionAwareSqlUpdate(
value = value =
"INSERT INTO entity_extension(id, extension, jsonSchema, json) VALUES (:id, :extension, :jsonSchema, (:json :: jsonb)) ON CONFLICT (id, extension) DO UPDATE SET jsonSchema = EXCLUDED.jsonSchema, json = EXCLUDED.json", "INSERT INTO entity_extension(id, extension, jsonSchema, json) "
+ "VALUES (:id, :extension, :jsonSchema, (:json :: jsonb)) "
+ "ON CONFLICT (id, extension) DO UPDATE SET jsonSchema = EXCLUDED.jsonSchema, json = EXCLUDED.json",
connectionType = POSTGRES) connectionType = POSTGRES)
void insert( void insert(
@Bind("id") String id, @Bind("id") String id,
@ -336,18 +339,12 @@ public interface CollectionDAO {
} }
} }
class FromEntityReferenceMapper implements RowMapper<EntityReference> { @Getter
@Override @Builder
public EntityReference map(ResultSet rs, org.jdbi.v3.core.statement.StatementContext ctx) throws SQLException { class EntityRelationshipRecord {
return new EntityReference().withId(UUID.fromString(rs.getString("fromId"))).withType(rs.getString("fromEntity")); private UUID id;
} private String type;
} private String json;
class ToEntityReferenceMapper implements RowMapper<EntityReference> {
@Override
public EntityReference map(ResultSet rs, org.jdbi.v3.core.statement.StatementContext ctx) throws SQLException {
return new EntityReference().withId(UUID.fromString(rs.getString("toId"))).withType(rs.getString("toEntity"));
}
} }
interface EntityRelationshipDAO { interface EntityRelationshipDAO {
@ -355,40 +352,23 @@ public interface CollectionDAO {
insert(fromId, toId, fromEntity, toEntity, relation, null); insert(fromId, toId, fromEntity, toEntity, relation, null);
} }
default int insert(UUID fromId, UUID toId, String fromEntity, String toEntity, int relation, String json) { default void insert(UUID fromId, UUID toId, String fromEntity, String toEntity, int relation, String json) {
return insert(fromId.toString(), toId.toString(), fromEntity, toEntity, relation, json); insert(fromId.toString(), toId.toString(), fromEntity, toEntity, relation, json);
} }
@ConnectionAwareSqlUpdate( @ConnectionAwareSqlUpdate(
value = value =
"INSERT IGNORE INTO entity_relationship(fromId, toId, fromEntity, toEntity, relation) " "INSERT INTO entity_relationship(fromId, toId, fromEntity, toEntity, relation, json) "
+ "VALUES (:fromId, :toId, :fromEntity, :toEntity, :relation)", + "VALUES (:fromId, :toId, :fromEntity, :toEntity, :relation, :json) "
connectionType = MYSQL) + "ON DUPLICATE KEY UPDATE json = :json",
@ConnectionAwareSqlUpdate(
value =
"INSERT INTO entity_relationship(fromId, toId, fromEntity, toEntity, relation) "
+ "VALUES (:fromId, :toId, :fromEntity, :toEntity, :relation) "
+ "ON CONFLICT (fromId, toId, relation) DO NOTHING",
connectionType = POSTGRES)
int insert(
@Bind("fromId") String fromId,
@Bind("toId") String toId,
@Bind("fromEntity") String fromEntity,
@Bind("toEntity") String toEntity,
@Bind("relation") int relation);
@ConnectionAwareSqlUpdate(
value =
"INSERT IGNORE INTO entity_relationship(fromId, toId, fromEntity, toEntity, relation, json) "
+ "VALUES (:fromId, :toId, :fromEntity, :toEntity, :relation, :json)",
connectionType = MYSQL) connectionType = MYSQL)
@ConnectionAwareSqlUpdate( @ConnectionAwareSqlUpdate(
value = value =
"INSERT INTO entity_relationship(fromId, toId, fromEntity, toEntity, relation, json) VALUES " "INSERT INTO entity_relationship(fromId, toId, fromEntity, toEntity, relation, json) VALUES "
+ "(:fromId, :toId, :fromEntity, :toEntity, :relation, (:json :: jsonb)) " + "(:fromId, :toId, :fromEntity, :toEntity, :relation, (:json :: jsonb)) "
+ "ON CONFLICT (fromId, toId, relation) DO NOTHING", + "ON CONFLICT (fromId, toId, relation) DO UPDATE SET json = EXCLUDED.json",
connectionType = POSTGRES) connectionType = POSTGRES)
int insert( void insert(
@Bind("fromId") String fromId, @Bind("fromId") String fromId,
@Bind("toId") String toId, @Bind("toId") String toId,
@Bind("fromEntity") String fromEntity, @Bind("fromEntity") String fromEntity,
@ -400,13 +380,14 @@ public interface CollectionDAO {
// Find to operations // Find to operations
// //
@SqlQuery( @SqlQuery(
"SELECT toId, toEntity FROM entity_relationship " "SELECT toId, toEntity, json FROM entity_relationship "
+ "WHERE fromId = :fromId AND fromEntity = :fromEntity AND relation = :relation " + "WHERE fromId = :fromId AND fromEntity = :fromEntity AND relation = :relation "
+ "ORDER BY toId") + "ORDER BY toId")
@RegisterRowMapper(ToEntityReferenceMapper.class) @RegisterRowMapper(ToRelationshipMapper.class)
List<EntityReference> findTo( List<EntityRelationshipRecord> findTo(
@Bind("fromId") String fromId, @Bind("fromEntity") String fromEntity, @Bind("relation") int relation); @Bind("fromId") String fromId, @Bind("fromEntity") String fromEntity, @Bind("relation") int relation);
// TODO delete this
@SqlQuery( @SqlQuery(
"SELECT toId FROM entity_relationship " "SELECT toId FROM entity_relationship "
+ "WHERE fromId = :fromId AND fromEntity = :fromEntity AND relation = :relation AND toEntity = :toEntity " + "WHERE fromId = :fromId AND fromEntity = :fromEntity AND relation = :relation AND toEntity = :toEntity "
@ -442,11 +423,11 @@ public interface CollectionDAO {
@Bind("fromEntity") String fromEntity); @Bind("fromEntity") String fromEntity);
@SqlQuery( @SqlQuery(
"SELECT fromId, fromEntity FROM entity_relationship " "SELECT fromId, fromEntity, json FROM entity_relationship "
+ "WHERE toId = :toId AND toEntity = :toEntity AND relation = :relation " + "WHERE toId = :toId AND toEntity = :toEntity AND relation = :relation "
+ "ORDER BY fromId") + "ORDER BY fromId")
@RegisterRowMapper(FromEntityReferenceMapper.class) @RegisterRowMapper(FromRelationshipMapper.class)
List<EntityReference> findFrom( List<EntityRelationshipRecord> findFrom(
@Bind("toId") String toId, @Bind("toEntity") String toEntity, @Bind("relation") int relation); @Bind("toId") String toId, @Bind("toEntity") String toEntity, @Bind("relation") int relation);
// //
@ -487,6 +468,28 @@ public interface CollectionDAO {
"DELETE from entity_relationship WHERE (toId = :id AND toEntity = :entity) OR " "DELETE from entity_relationship WHERE (toId = :id AND toEntity = :entity) OR "
+ "(fromId = :id AND fromEntity = :entity)") + "(fromId = :id AND fromEntity = :entity)")
void deleteAll(@Bind("id") String id, @Bind("entity") String entity); void deleteAll(@Bind("id") String id, @Bind("entity") String entity);
class FromRelationshipMapper implements RowMapper<EntityRelationshipRecord> {
@Override
public EntityRelationshipRecord map(ResultSet rs, StatementContext ctx) throws SQLException {
return EntityRelationshipRecord.builder()
.id(UUID.fromString(rs.getString("fromId")))
.type(rs.getString("fromEntity"))
.json(rs.getString("json"))
.build();
}
}
class ToRelationshipMapper implements RowMapper<EntityRelationshipRecord> {
@Override
public EntityRelationshipRecord map(ResultSet rs, StatementContext ctx) throws SQLException {
return EntityRelationshipRecord.builder()
.id(UUID.fromString(rs.getString("toId")))
.type(rs.getString("toEntity"))
.json(rs.getString("json"))
.build();
}
}
} }
interface FeedDAO { interface FeedDAO {

View File

@ -67,6 +67,7 @@ import org.openmetadata.catalog.entity.teams.User;
import org.openmetadata.catalog.exception.CatalogExceptionMessage; import org.openmetadata.catalog.exception.CatalogExceptionMessage;
import org.openmetadata.catalog.exception.EntityNotFoundException; import org.openmetadata.catalog.exception.EntityNotFoundException;
import org.openmetadata.catalog.exception.UnhandledServerException; import org.openmetadata.catalog.exception.UnhandledServerException;
import org.openmetadata.catalog.jdbi3.CollectionDAO.EntityRelationshipRecord;
import org.openmetadata.catalog.jdbi3.CollectionDAO.EntityVersionPair; import org.openmetadata.catalog.jdbi3.CollectionDAO.EntityVersionPair;
import org.openmetadata.catalog.jdbi3.CollectionDAO.ExtensionRecord; import org.openmetadata.catalog.jdbi3.CollectionDAO.ExtensionRecord;
import org.openmetadata.catalog.jdbi3.TableRepository.TableUpdater; import org.openmetadata.catalog.jdbi3.TableRepository.TableUpdater;
@ -484,7 +485,7 @@ public abstract class EntityRepository<T extends EntityInterface> {
} }
// Add relationship // Add relationship
int added = addRelationship(userId, entityId, Entity.USER, entityType, Relationship.FOLLOWS); addRelationship(userId, entityId, Entity.USER, entityType, Relationship.FOLLOWS);
ChangeDescription change = new ChangeDescription().withPreviousVersion(entity.getVersion()); ChangeDescription change = new ChangeDescription().withPreviousVersion(entity.getVersion());
change change
@ -504,7 +505,7 @@ public abstract class EntityRepository<T extends EntityInterface> {
.withCurrentVersion(entity.getVersion()) .withCurrentVersion(entity.getVersion())
.withPreviousVersion(change.getPreviousVersion()); .withPreviousVersion(change.getPreviousVersion());
return new PutResponse<>(added > 0 ? Status.CREATED : Status.OK, changeEvent, RestUtil.ENTITY_FIELDS_CHANGED); return new PutResponse<>(Status.OK, changeEvent, RestUtil.ENTITY_FIELDS_CHANGED);
} }
public final DeleteResponse<T> delete(String updatedBy, String id, boolean recursive, boolean hardDelete) public final DeleteResponse<T> delete(String updatedBy, String id, boolean recursive, boolean hardDelete)
@ -552,10 +553,10 @@ public abstract class EntityRepository<T extends EntityInterface> {
private void deleteChildren(String id, boolean recursive, boolean hardDelete, String updatedBy) throws IOException { private void deleteChildren(String id, boolean recursive, boolean hardDelete, String updatedBy) throws IOException {
// If an entity being deleted contains other **non-deleted** children entities, it can't be deleted // If an entity being deleted contains other **non-deleted** children entities, it can't be deleted
List<EntityReference> contains = List<EntityRelationshipRecord> records =
daoCollection.relationshipDAO().findTo(id, entityType, Relationship.CONTAINS.ordinal()); daoCollection.relationshipDAO().findTo(id, entityType, Relationship.CONTAINS.ordinal());
if (contains.isEmpty()) { if (records.isEmpty()) {
return; return;
} }
// Entity being deleted contains children entities // Entity being deleted contains children entities
@ -563,13 +564,9 @@ public abstract class EntityRepository<T extends EntityInterface> {
throw new IllegalArgumentException(CatalogExceptionMessage.entityIsNotEmpty(entityType)); throw new IllegalArgumentException(CatalogExceptionMessage.entityIsNotEmpty(entityType));
} }
// Delete all the contained entities // Delete all the contained entities
for (EntityReference entityReference : contains) { for (EntityRelationshipRecord record : records) {
LOG.info( LOG.info("Recursively {} deleting {} {}", hardDelete ? "hard" : "soft", record.getType(), record.getId());
"Recursively {} deleting {} {}", Entity.deleteEntity(updatedBy, record.getType(), record.getId(), true, hardDelete);
hardDelete ? "hard" : "soft",
entityReference.getType(),
entityReference.getId());
Entity.deleteEntity(updatedBy, entityReference.getType(), entityReference.getId(), true, hardDelete);
} }
} }
@ -771,10 +768,10 @@ public abstract class EntityRepository<T extends EntityInterface> {
if (!supportsFollower || entity == null) { if (!supportsFollower || entity == null) {
return null; return null;
} }
List<EntityReference> followers = findFrom(entity.getId(), entityType, Relationship.FOLLOWS); List<EntityReference> followers = new ArrayList<>();
for (EntityReference follower : followers) { List<EntityRelationshipRecord> records = findFrom(entity.getId(), entityType, Relationship.FOLLOWS);
User user = daoCollection.userDAO().findEntityById(follower.getId(), ALL); for (EntityRelationshipRecord record : records) {
follower.withName(user.getName()).withDeleted(user.getDeleted()).withFullyQualifiedName(user.getName()); followers.add(daoCollection.userDAO().findEntityReferenceById(record.getId(), ALL));
} }
return followers; return followers;
} }
@ -792,14 +789,14 @@ public abstract class EntityRepository<T extends EntityInterface> {
public void restoreEntity(String updatedBy, String entityType, UUID id) throws IOException { public void restoreEntity(String updatedBy, String entityType, UUID id) throws IOException {
// If an entity being restored contains other **deleted** children entities, restore them // If an entity being restored contains other **deleted** children entities, restore them
List<EntityReference> contains = List<EntityRelationshipRecord> records =
daoCollection.relationshipDAO().findTo(id.toString(), entityType, Relationship.CONTAINS.ordinal()); daoCollection.relationshipDAO().findTo(id.toString(), entityType, Relationship.CONTAINS.ordinal());
if (!contains.isEmpty()) { if (!records.isEmpty()) {
// Restore all the contained entities // Restore all the contained entities
for (EntityReference entityReference : contains) { for (EntityRelationshipRecord record : records) {
LOG.info("Recursively restoring {} {}", entityReference.getType(), entityReference.getId()); LOG.info("Recursively restoring {} {}", record.getType(), record.getId());
Entity.restoreEntity(updatedBy, entityReference.getType(), entityReference.getId()); Entity.restoreEntity(updatedBy, record.getType(), record.getId());
} }
} }
@ -810,16 +807,16 @@ public abstract class EntityRepository<T extends EntityInterface> {
dao.update(entity.getId(), JsonUtils.pojoToJson(entity)); dao.update(entity.getId(), JsonUtils.pojoToJson(entity));
} }
public int addRelationship(UUID fromId, UUID toId, String fromEntity, String toEntity, Relationship relationship) { public void addRelationship(UUID fromId, UUID toId, String fromEntity, String toEntity, Relationship relationship) {
return addRelationship(fromId, toId, fromEntity, toEntity, relationship, false); addRelationship(fromId, toId, fromEntity, toEntity, relationship, false);
} }
public int addRelationship( public void addRelationship(
UUID fromId, UUID toId, String fromEntity, String toEntity, Relationship relationship, boolean bidirectional) { UUID fromId, UUID toId, String fromEntity, String toEntity, Relationship relationship, boolean bidirectional) {
return addRelationship(fromId, toId, fromEntity, toEntity, relationship, null, bidirectional); addRelationship(fromId, toId, fromEntity, toEntity, relationship, null, bidirectional);
} }
public int addRelationship( public void addRelationship(
UUID fromId, UUID fromId,
UUID toId, UUID toId,
String fromEntity, String fromEntity,
@ -835,7 +832,7 @@ public abstract class EntityRepository<T extends EntityInterface> {
from = toId; from = toId;
to = fromId; to = fromId;
} }
return daoCollection.relationshipDAO().insert(from, to, fromEntity, toEntity, relationship.ordinal(), json); daoCollection.relationshipDAO().insert(from, to, fromEntity, toEntity, relationship.ordinal(), json);
} }
public List<String> findBoth(UUID entity1, String entityType1, Relationship relationship, String entity2) { public List<String> findBoth(UUID entity1, String entityType1, Relationship relationship, String entity2) {
@ -852,15 +849,15 @@ public abstract class EntityRepository<T extends EntityInterface> {
.findFrom(toId.toString(), toEntityType, relationship.ordinal(), fromEntityType); .findFrom(toId.toString(), toEntityType, relationship.ordinal(), fromEntityType);
} }
public List<EntityReference> findFrom(UUID toId, String toEntityType, Relationship relationship) { public List<EntityRelationshipRecord> findFrom(UUID toId, String toEntityType, Relationship relationship) {
return daoCollection.relationshipDAO().findFrom(toId.toString(), toEntityType, relationship.ordinal()); return daoCollection.relationshipDAO().findFrom(toId.toString(), toEntityType, relationship.ordinal());
} }
public EntityReference getContainer(UUID toId, String toEntityType) throws IOException { public EntityReference getContainer(UUID toId, String toEntityType) throws IOException {
List<EntityReference> refs = findFrom(toId, toEntityType, Relationship.CONTAINS); List<EntityRelationshipRecord> records = findFrom(toId, toEntityType, Relationship.CONTAINS);
// An entity can have only one container // An entity can have only one container
ensureSingleRelationship(toEntityType, toId, refs, "container", true); ensureSingleRelationship(toEntityType, toId, records, "container", true);
return Entity.getEntityReferenceById(refs.get(0).getType(), refs.get(0).getId(), ALL); return Entity.getEntityReferenceById(records.get(0).getType(), records.get(0).getId(), ALL);
} }
public void ensureSingleRelationship( public void ensureSingleRelationship(
@ -877,7 +874,7 @@ public abstract class EntityRepository<T extends EntityInterface> {
} }
} }
public List<String> findTo(UUID fromId, String fromEntityType, Relationship relationship, String toEntityType) { public final List<String> findTo(UUID fromId, String fromEntityType, Relationship relationship, String toEntityType) {
return daoCollection return daoCollection
.relationshipDAO() .relationshipDAO()
.findTo(fromId.toString(), fromEntityType, relationship.ordinal(), toEntityType); .findTo(fromId.toString(), fromEntityType, relationship.ordinal(), toEntityType);
@ -916,9 +913,11 @@ public abstract class EntityRepository<T extends EntityInterface> {
if (!supportsOwner) { if (!supportsOwner) {
return null; return null;
} }
List<EntityReference> refs = findFrom(entity.getId(), entityType, Relationship.OWNS); List<EntityRelationshipRecord> records = findFrom(entity.getId(), entityType, Relationship.OWNS);
ensureSingleRelationship(entityType, entity.getId(), refs, "owners", false); ensureSingleRelationship(entityType, entity.getId(), records, "owners", false);
return refs.isEmpty() ? null : getOwner(refs.get(0)); return records.isEmpty()
? null
: getOwner(new EntityReference().withId(records.get(0).getId()).withType(records.get(0).getType()));
} }
public EntityReference getOwner(EntityReference ref) throws IOException { public EntityReference getOwner(EntityReference ref) throws IOException {

View File

@ -13,6 +13,8 @@
package org.openmetadata.catalog.jdbi3; package org.openmetadata.catalog.jdbi3;
import static org.openmetadata.common.utils.CommonUtil.listOrEmpty;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
@ -21,11 +23,17 @@ import java.util.stream.Collectors;
import org.jdbi.v3.sqlobject.transaction.Transaction; import org.jdbi.v3.sqlobject.transaction.Transaction;
import org.openmetadata.catalog.Entity; import org.openmetadata.catalog.Entity;
import org.openmetadata.catalog.api.lineage.AddLineage; import org.openmetadata.catalog.api.lineage.AddLineage;
import org.openmetadata.catalog.entity.data.Table;
import org.openmetadata.catalog.jdbi3.CollectionDAO.EntityRelationshipRecord;
import org.openmetadata.catalog.type.ColumnLineage;
import org.openmetadata.catalog.type.Edge; import org.openmetadata.catalog.type.Edge;
import org.openmetadata.catalog.type.EntityLineage; import org.openmetadata.catalog.type.EntityLineage;
import org.openmetadata.catalog.type.EntityReference; import org.openmetadata.catalog.type.EntityReference;
import org.openmetadata.catalog.type.Include; import org.openmetadata.catalog.type.Include;
import org.openmetadata.catalog.type.LineageDetails;
import org.openmetadata.catalog.type.Relationship; import org.openmetadata.catalog.type.Relationship;
import org.openmetadata.catalog.util.FullyQualifiedName;
import org.openmetadata.catalog.util.JsonUtils;
public class LineageRepository { public class LineageRepository {
private final CollectionDAO dao; private final CollectionDAO dao;
@ -57,9 +65,40 @@ public class LineageRepository {
EntityReference to = addLineage.getEdge().getToEntity(); EntityReference to = addLineage.getEdge().getToEntity();
to = Entity.getEntityReferenceById(to.getType(), to.getId(), Include.NON_DELETED); to = Entity.getEntityReferenceById(to.getType(), to.getId(), Include.NON_DELETED);
// Validate lineage details
String detailsJson = validateLineageDetails(from, to, addLineage.getEdge().getLineageDetails());
// Finally, add lineage relationship // Finally, add lineage relationship
dao.relationshipDAO() dao.relationshipDAO()
.insert(from.getId(), to.getId(), from.getType(), to.getType(), Relationship.UPSTREAM.ordinal()); .insert(from.getId(), to.getId(), from.getType(), to.getType(), Relationship.UPSTREAM.ordinal(), detailsJson);
}
private String validateLineageDetails(EntityReference from, EntityReference to, LineageDetails details)
throws IOException {
if (details == null || listOrEmpty(details.getColumnsLineage()).isEmpty()) {
return null;
}
List<ColumnLineage> columnsLineage = details.getColumnsLineage();
if (!from.getType().equals(Entity.TABLE) || !to.getType().equals(Entity.TABLE)) {
throw new IllegalArgumentException("Column level lineage is only allowed between two tables.");
}
Table fromTable = dao.tableDAO().findEntityById(from.getId());
Table toTable = dao.tableDAO().findEntityById(to.getId());
for (ColumnLineage columnLineage : columnsLineage) {
for (String fromColumn : columnLineage.getFromColumns()) {
// From column belongs to the fromNode
if (fromColumn.startsWith(fromTable.getFullyQualifiedName())) {
TableRepository.validateColumnFQN(fromTable, fromColumn);
} else {
Table otherTable = dao.tableDAO().findEntityByName(FullyQualifiedName.getTableFQN(fromColumn));
TableRepository.validateColumnFQN(otherTable, fromColumn);
}
}
TableRepository.validateColumnFQN(toTable, columnLineage.getToColumn());
}
return JsonUtils.pojoToJson(details);
} }
@Transaction @Transaction
@ -89,54 +128,66 @@ public class LineageRepository {
.withNodes(entities) .withNodes(entities)
.withUpstreamEdges(new ArrayList<>()) .withUpstreamEdges(new ArrayList<>())
.withDownstreamEdges(new ArrayList<>()); .withDownstreamEdges(new ArrayList<>());
addUpstreamLineage(primary.getId(), primary.getType(), lineage, upstreamDepth); getUpstreamLineage(primary.getId(), primary.getType(), lineage, upstreamDepth);
addDownstreamLineage(primary.getId(), primary.getType(), lineage, downstreamDepth); getDownstreamLineage(primary.getId(), primary.getType(), lineage, downstreamDepth);
// Remove duplicate nodes // Remove duplicate nodes
lineage.withNodes(lineage.getNodes().stream().distinct().collect(Collectors.toList())); lineage.withNodes(lineage.getNodes().stream().distinct().collect(Collectors.toList()));
// Add entityReference details
for (int i = 0; i < lineage.getNodes().size(); i++) {
EntityReference ref = lineage.getNodes().get(i);
ref = Entity.getEntityReferenceById(ref.getType(), ref.getId(), Include.NON_DELETED);
lineage.getNodes().set(i, ref);
}
return lineage; return lineage;
} }
private void addUpstreamLineage(UUID id, String entityType, EntityLineage lineage, int upstreamDepth) { private void getUpstreamLineage(UUID id, String entityType, EntityLineage lineage, int upstreamDepth)
throws IOException {
if (upstreamDepth == 0) { if (upstreamDepth == 0) {
return; return;
} }
// from this id ---> find other ids // from this id ---> find other ids
List<EntityReference> upstreamEntities = List<EntityRelationshipRecord> records =
dao.relationshipDAO().findFrom(id.toString(), entityType, Relationship.UPSTREAM.ordinal()); dao.relationshipDAO().findFrom(id.toString(), entityType, Relationship.UPSTREAM.ordinal());
lineage.getNodes().addAll(upstreamEntities);
final List<EntityReference> upstreamEntityReferences = new ArrayList<>();
for (EntityRelationshipRecord record : records) {
EntityReference ref = Entity.getEntityReferenceById(record.getType(), record.getId(), Include.ALL);
LineageDetails lineageDetails = JsonUtils.readValue(record.getJson(), LineageDetails.class);
upstreamEntityReferences.add(ref);
lineage
.getUpstreamEdges()
.add(new Edge().withFromEntity(ref.getId()).withToEntity(id).withLineageDetails(lineageDetails));
}
lineage.getNodes().addAll(upstreamEntityReferences);
upstreamDepth--; upstreamDepth--;
for (EntityReference upstreamEntity : upstreamEntities) { // Recursively add upstream nodes and edges
lineage.getUpstreamEdges().add(new Edge().withFromEntity(upstreamEntity.getId()).withToEntity(id)); for (EntityReference entity : upstreamEntityReferences) {
addUpstreamLineage( getUpstreamLineage(entity.getId(), entity.getType(), lineage, upstreamDepth);
upstreamEntity.getId(),
upstreamEntity.getType(),
lineage,
upstreamDepth); // Recursively add upstream nodes and edges
} }
} }
private void addDownstreamLineage(UUID id, String entityType, EntityLineage lineage, int downstreamDepth) { private void getDownstreamLineage(UUID id, String entityType, EntityLineage lineage, int downstreamDepth)
throws IOException {
if (downstreamDepth == 0) { if (downstreamDepth == 0) {
return; return;
} }
// from other ids ---> to this id // from other ids ---> to this id
List<EntityReference> downStreamEntities = List<EntityRelationshipRecord> records =
dao.relationshipDAO().findTo(id.toString(), entityType, Relationship.UPSTREAM.ordinal()); dao.relationshipDAO().findTo(id.toString(), entityType, Relationship.UPSTREAM.ordinal());
lineage.getNodes().addAll(downStreamEntities);
final List<EntityReference> downstreamEntityReferences = new ArrayList<>();
for (EntityRelationshipRecord record : records) {
EntityReference ref = Entity.getEntityReferenceById(record.getType(), record.getId(), Include.ALL);
LineageDetails lineageDetails = JsonUtils.readValue(record.getJson(), LineageDetails.class);
downstreamEntityReferences.add(ref);
lineage
.getDownstreamEdges()
.add(new Edge().withToEntity(ref.getId()).withFromEntity(id).withLineageDetails(lineageDetails));
}
lineage.getNodes().addAll(downstreamEntityReferences);
downstreamDepth--; downstreamDepth--;
for (EntityReference entity : downStreamEntities) { // Recursively add upstream nodes and edges
lineage.getDownstreamEdges().add(new Edge().withToEntity(entity.getId()).withFromEntity(id)); for (EntityReference entity : downstreamEntityReferences) {
addDownstreamLineage(entity.getId(), entity.getType(), lineage, downstreamDepth); getDownstreamLineage(entity.getId(), entity.getType(), lineage, downstreamDepth);
} }
} }
} }

View File

@ -86,13 +86,13 @@ public class TableRepository extends EntityRepository<Table> {
// Table fields that can be updated in a PUT request // Table fields that can be updated in a PUT request
static final String TABLE_UPDATE_FIELDS = "owner,tags,tableConstraints,tablePartition,dataModel,profileSample"; static final String TABLE_UPDATE_FIELDS = "owner,tags,tableConstraints,tablePartition,dataModel,profileSample";
public TableRepository(CollectionDAO dao) { public TableRepository(CollectionDAO daoCollection) {
super( super(
TableResource.COLLECTION_PATH, TableResource.COLLECTION_PATH,
TABLE, TABLE,
Table.class, Table.class,
dao.tableDAO(), daoCollection.tableDAO(),
dao, daoCollection,
TABLE_PATCH_FIELDS, TABLE_PATCH_FIELDS,
TABLE_UPDATE_FIELDS); TABLE_UPDATE_FIELDS);
} }
@ -147,7 +147,7 @@ public class TableRepository extends EntityRepository<Table> {
@Transaction @Transaction
public Table addJoins(UUID tableId, TableJoins joins) throws IOException { public Table addJoins(UUID tableId, TableJoins joins) throws IOException {
// Validate the request content // Validate the request content
Table table = daoCollection.tableDAO().findEntityById(tableId); Table table = dao.findEntityById(tableId);
if (!CommonUtil.dateInRange(RestUtil.DATE_FORMAT, joins.getStartDate(), 0, 30)) { if (!CommonUtil.dateInRange(RestUtil.DATE_FORMAT, joins.getStartDate(), 0, 30)) {
throw new IllegalArgumentException("Date range can only include past 30 days starting today"); throw new IllegalArgumentException("Date range can only include past 30 days starting today");
} }
@ -169,7 +169,7 @@ public class TableRepository extends EntityRepository<Table> {
@Transaction @Transaction
public Table addSampleData(UUID tableId, TableData tableData) throws IOException { public Table addSampleData(UUID tableId, TableData tableData) throws IOException {
// Validate the request content // Validate the request content
Table table = daoCollection.tableDAO().findEntityById(tableId); Table table = dao.findEntityById(tableId);
// Validate all the columns // Validate all the columns
for (String columnName : tableData.getColumns()) { for (String columnName : tableData.getColumns()) {
@ -194,7 +194,7 @@ public class TableRepository extends EntityRepository<Table> {
@Transaction @Transaction
public Table addTableProfileData(UUID tableId, TableProfile tableProfile) throws IOException { public Table addTableProfileData(UUID tableId, TableProfile tableProfile) throws IOException {
// Validate the request content // Validate the request content
Table table = daoCollection.tableDAO().findEntityById(tableId); Table table = dao.findEntityById(tableId);
List<TableProfile> storedTableProfiles = getTableProfile(table); List<TableProfile> storedTableProfiles = getTableProfile(table);
Map<String, TableProfile> storedMapTableProfiles = new HashMap<>(); Map<String, TableProfile> storedMapTableProfiles = new HashMap<>();
@ -219,7 +219,7 @@ public class TableRepository extends EntityRepository<Table> {
@Transaction @Transaction
public Table addLocation(UUID tableId, UUID locationId) throws IOException { public Table addLocation(UUID tableId, UUID locationId) throws IOException {
Table table = daoCollection.tableDAO().findEntityById(tableId); Table table = dao.findEntityById(tableId);
EntityReference location = daoCollection.locationDAO().findEntityReferenceById(locationId); EntityReference location = daoCollection.locationDAO().findEntityReferenceById(locationId);
// A table has only one location. // A table has only one location.
deleteFrom(tableId, TABLE, Relationship.HAS, LOCATION); deleteFrom(tableId, TABLE, Relationship.HAS, LOCATION);
@ -237,7 +237,7 @@ public class TableRepository extends EntityRepository<Table> {
} catch (NoSuchAlgorithmException e) { } catch (NoSuchAlgorithmException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
Table table = daoCollection.tableDAO().findEntityById(tableId); Table table = dao.findEntityById(tableId);
List<SQLQuery> storedQueries = getQueries(table); List<SQLQuery> storedQueries = getQueries(table);
Map<String, SQLQuery> storedMapQueries = new HashMap<>(); Map<String, SQLQuery> storedMapQueries = new HashMap<>();
if (storedQueries != null) { if (storedQueries != null) {
@ -257,7 +257,7 @@ public class TableRepository extends EntityRepository<Table> {
@Transaction @Transaction
public Table addTableTest(UUID tableId, TableTest tableTest) throws IOException { public Table addTableTest(UUID tableId, TableTest tableTest) throws IOException {
// Validate the request content // Validate the request content
Table table = daoCollection.tableDAO().findEntityById(tableId); Table table = dao.findEntityById(tableId);
// if ID is not passed we treat it as a new test case being added // if ID is not passed we treat it as a new test case being added
List<TableTest> storedTableTests = getTableTests(table); List<TableTest> storedTableTests = getTableTests(table);
// we will override any test case name passed by user/client with tableName + testType // we will override any test case name passed by user/client with tableName + testType
@ -294,7 +294,7 @@ public class TableRepository extends EntityRepository<Table> {
@Transaction @Transaction
public Table deleteTableTest(UUID tableId, String tableTestType) throws IOException { public Table deleteTableTest(UUID tableId, String tableTestType) throws IOException {
// Validate the request content // Validate the request content
Table table = daoCollection.tableDAO().findEntityById(tableId); Table table = dao.findEntityById(tableId);
// if ID is not passed we treat it as a new test case being added // if ID is not passed we treat it as a new test case being added
List<TableTest> storedTableTests = getTableTests(table); List<TableTest> storedTableTests = getTableTests(table);
// we will override any test case name passed by user/client with tableName + testType // we will override any test case name passed by user/client with tableName + testType
@ -321,7 +321,7 @@ public class TableRepository extends EntityRepository<Table> {
@Transaction @Transaction
public Table addColumnTest(UUID tableId, ColumnTest columnTest) throws IOException { public Table addColumnTest(UUID tableId, ColumnTest columnTest) throws IOException {
// Validate the request content // Validate the request content
Table table = daoCollection.tableDAO().findEntityById(tableId); Table table = dao.findEntityById(tableId);
String columnName = columnTest.getColumnName(); String columnName = columnTest.getColumnName();
validateColumn(table, columnName); validateColumn(table, columnName);
// we will override any test case name passed by user/client with columnName + testType // we will override any test case name passed by user/client with columnName + testType
@ -367,7 +367,7 @@ public class TableRepository extends EntityRepository<Table> {
@Transaction @Transaction
public Table deleteColumnTest(UUID tableId, String columnName, String columnTestType) throws IOException { public Table deleteColumnTest(UUID tableId, String columnName, String columnTestType) throws IOException {
// Validate the request content // Validate the request content
Table table = daoCollection.tableDAO().findEntityById(tableId); Table table = dao.findEntityById(tableId);
validateColumn(table, columnName); validateColumn(table, columnName);
// we will override any test case name passed by user/client with columnName + testType // we will override any test case name passed by user/client with columnName + testType
// our assumption is there is only one instance of a test type as of now. // our assumption is there is only one instance of a test type as of now.
@ -403,7 +403,7 @@ public class TableRepository extends EntityRepository<Table> {
@Transaction @Transaction
public Table addCustomMetric(UUID tableId, CustomMetric customMetric) throws IOException { public Table addCustomMetric(UUID tableId, CustomMetric customMetric) throws IOException {
// Validate the request content // Validate the request content
Table table = daoCollection.tableDAO().findEntityById(tableId); Table table = dao.findEntityById(tableId);
String columnName = customMetric.getColumnName(); String columnName = customMetric.getColumnName();
validateColumn(table, columnName); validateColumn(table, columnName);
@ -441,7 +441,7 @@ public class TableRepository extends EntityRepository<Table> {
@Transaction @Transaction
public Table deleteCustomMetric(UUID tableId, String columnName, String metricName) throws IOException { public Table deleteCustomMetric(UUID tableId, String columnName, String metricName) throws IOException {
// Validate the request content // Validate the request content
Table table = daoCollection.tableDAO().findEntityById(tableId); Table table = dao.findEntityById(tableId);
validateColumn(table, columnName); validateColumn(table, columnName);
// Override any custom metric definition with the same name // Override any custom metric definition with the same name
@ -475,7 +475,7 @@ public class TableRepository extends EntityRepository<Table> {
@Transaction @Transaction
public Table addDataModel(UUID tableId, DataModel dataModel) throws IOException { public Table addDataModel(UUID tableId, DataModel dataModel) throws IOException {
Table table = daoCollection.tableDAO().findEntityById(tableId); Table table = dao.findEntityById(tableId);
table.withDataModel(dataModel); table.withDataModel(dataModel);
// Carry forward the table description from the model to table entity, if empty // Carry forward the table description from the model to table entity, if empty
@ -496,7 +496,7 @@ public class TableRepository extends EntityRepository<Table> {
stored.setDescription(modelColumn.getDescription()); stored.setDescription(modelColumn.getDescription());
} }
} }
daoCollection.tableDAO().update(table.getId(), JsonUtils.pojoToJson(table)); dao.update(table.getId(), JsonUtils.pojoToJson(table));
setFields(table, Fields.EMPTY_FIELDS); setFields(table, Fields.EMPTY_FIELDS);
return table; return table;
} }
@ -647,7 +647,7 @@ public class TableRepository extends EntityRepository<Table> {
} }
// Validate if a given column exists in the table // Validate if a given column exists in the table
private void validateColumn(Table table, String columnName) { public static void validateColumn(Table table, String columnName) {
boolean validColumn = table.getColumns().stream().anyMatch(col -> col.getName().equals(columnName)); boolean validColumn = table.getColumns().stream().anyMatch(col -> col.getName().equals(columnName));
if (!validColumn) { if (!validColumn) {
throw new IllegalArgumentException("Invalid column name " + columnName); throw new IllegalArgumentException("Invalid column name " + columnName);
@ -655,7 +655,7 @@ public class TableRepository extends EntityRepository<Table> {
} }
// Validate if a given column exists in the table // Validate if a given column exists in the table
private void validateColumnFQN(Table table, String columnFQN) { public static void validateColumnFQN(Table table, String columnFQN) {
boolean validColumn = false; boolean validColumn = false;
for (Column column : table.getColumns()) { for (Column column : table.getColumns()) {
if (column.getFullyQualifiedName().equals(columnFQN)) { if (column.getFullyQualifiedName().equals(columnFQN)) {
@ -672,7 +672,7 @@ public class TableRepository extends EntityRepository<Table> {
for (JoinedWith joinedWith : joinedWithList) { for (JoinedWith joinedWith : joinedWithList) {
// Validate table // Validate table
String tableFQN = FullyQualifiedName.getTableFQN(joinedWith.getFullyQualifiedName()); String tableFQN = FullyQualifiedName.getTableFQN(joinedWith.getFullyQualifiedName());
Table joinedWithTable = daoCollection.tableDAO().findEntityByName(tableFQN); Table joinedWithTable = dao.findEntityByName(tableFQN);
// Validate column // Validate column
validateColumnFQN(joinedWithTable, joinedWith.getFullyQualifiedName()); validateColumnFQN(joinedWithTable, joinedWith.getFullyQualifiedName());

View File

@ -115,7 +115,7 @@ public class TeamRepository extends EntityRepository<Team> {
private List<EntityReference> getOwns(Team team) throws IOException { private List<EntityReference> getOwns(Team team) throws IOException {
// Compile entities owned by the team // Compile entities owned by the team
return EntityUtil.populateEntityReferences( return EntityUtil.getEntityReferences(
daoCollection.relationshipDAO().findTo(team.getId().toString(), TEAM, Relationship.OWNS.ordinal())); daoCollection.relationshipDAO().findTo(team.getId().toString(), TEAM, Relationship.OWNS.ordinal()));
} }

View File

@ -28,6 +28,7 @@ import org.openmetadata.catalog.entity.teams.AuthenticationMechanism;
import org.openmetadata.catalog.entity.teams.Team; import org.openmetadata.catalog.entity.teams.Team;
import org.openmetadata.catalog.entity.teams.User; import org.openmetadata.catalog.entity.teams.User;
import org.openmetadata.catalog.exception.CatalogExceptionMessage; import org.openmetadata.catalog.exception.CatalogExceptionMessage;
import org.openmetadata.catalog.jdbi3.CollectionDAO.EntityRelationshipRecord;
import org.openmetadata.catalog.resources.teams.UserResource; import org.openmetadata.catalog.resources.teams.UserResource;
import org.openmetadata.catalog.teams.authn.JWTAuthMechanism; import org.openmetadata.catalog.teams.authn.JWTAuthMechanism;
import org.openmetadata.catalog.type.EntityReference; import org.openmetadata.catalog.type.EntityReference;
@ -145,7 +146,7 @@ public class UserRepository extends EntityRepository<User> {
private List<EntityReference> getOwns(User user) throws IOException { private List<EntityReference> getOwns(User user) throws IOException {
// Compile entities owned by the user // Compile entities owned by the user
List<EntityReference> ownedEntities = List<EntityRelationshipRecord> ownedEntities =
daoCollection.relationshipDAO().findTo(user.getId().toString(), Entity.USER, Relationship.OWNS.ordinal()); daoCollection.relationshipDAO().findTo(user.getId().toString(), Entity.USER, Relationship.OWNS.ordinal());
// Compile entities owned by the team the user belongs to // Compile entities owned by the team the user belongs to
@ -155,11 +156,11 @@ public class UserRepository extends EntityRepository<User> {
daoCollection.relationshipDAO().findTo(team.getId().toString(), Entity.TEAM, Relationship.OWNS.ordinal())); daoCollection.relationshipDAO().findTo(team.getId().toString(), Entity.TEAM, Relationship.OWNS.ordinal()));
} }
// Populate details in entity reference // Populate details in entity reference
return EntityUtil.populateEntityReferences(ownedEntities); return EntityUtil.getEntityReferences(ownedEntities);
} }
private List<EntityReference> getFollows(User user) throws IOException { private List<EntityReference> getFollows(User user) throws IOException {
return EntityUtil.populateEntityReferences( return EntityUtil.getEntityReferences(
daoCollection.relationshipDAO().findTo(user.getId().toString(), Entity.USER, Relationship.FOLLOWS.ordinal())); daoCollection.relationshipDAO().findTo(user.getId().toString(), Entity.USER, Relationship.FOLLOWS.ordinal()));
} }

View File

@ -166,7 +166,6 @@ public class LineageResource {
@Context UriInfo uriInfo, @Context SecurityContext securityContext, @Valid AddLineage addLineage) @Context UriInfo uriInfo, @Context SecurityContext securityContext, @Valid AddLineage addLineage)
throws IOException { throws IOException {
SecurityUtil.checkAdminRoleOrPermissions(authorizer, securityContext, null, MetadataOperation.UpdateLineage); SecurityUtil.checkAdminRoleOrPermissions(authorizer, securityContext, null, MetadataOperation.UpdateLineage);
dao.addLineage(addLineage); dao.addLineage(addLineage);
return Response.status(Status.OK).build(); return Response.status(Status.OK).build();
} }

View File

@ -99,7 +99,7 @@ public class TypeResource extends EntityResource<Type, TypeRepository> {
types.forEach( types.forEach(
type -> { type -> {
type.withId(UUID.randomUUID()).withUpdatedBy("admin").withUpdatedAt(now); type.withId(UUID.randomUUID()).withUpdatedBy("admin").withUpdatedAt(now);
LOG.info("Loading type {} with schema {}", type.getName(), type.getSchema()); LOG.info("Loading type {}", type.getName());
try { try {
this.dao.createOrUpdate(null, type); this.dao.createOrUpdate(null, type);
} catch (IOException e) { } catch (IOException e) {

View File

@ -19,6 +19,7 @@ import static org.openmetadata.common.utils.CommonUtil.nullOrEmpty;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator; import java.util.Comparator;
import java.util.Date; import java.util.Date;
import java.util.List; import java.util.List;
@ -42,6 +43,7 @@ import org.openmetadata.catalog.entity.data.Table;
import org.openmetadata.catalog.entity.type.CustomField; import org.openmetadata.catalog.entity.type.CustomField;
import org.openmetadata.catalog.exception.CatalogExceptionMessage; import org.openmetadata.catalog.exception.CatalogExceptionMessage;
import org.openmetadata.catalog.exception.EntityNotFoundException; import org.openmetadata.catalog.exception.EntityNotFoundException;
import org.openmetadata.catalog.jdbi3.CollectionDAO.EntityRelationshipRecord;
import org.openmetadata.catalog.jdbi3.CollectionDAO.EntityVersionPair; import org.openmetadata.catalog.jdbi3.CollectionDAO.EntityVersionPair;
import org.openmetadata.catalog.jdbi3.CollectionDAO.UsageDAO; import org.openmetadata.catalog.jdbi3.CollectionDAO.UsageDAO;
import org.openmetadata.catalog.resources.feeds.MessageParser.EntityLink; import org.openmetadata.catalog.resources.feeds.MessageParser.EntityLink;
@ -170,6 +172,7 @@ public final class EntityUtil {
return entity; return entity;
} }
// TODO delete
public static List<EntityReference> populateEntityReferences(List<EntityReference> list) throws IOException { public static List<EntityReference> populateEntityReferences(List<EntityReference> list) throws IOException {
if (list != null) { if (list != null) {
for (EntityReference ref : list) { for (EntityReference ref : list) {
@ -181,6 +184,18 @@ public final class EntityUtil {
return list; return list;
} }
public static List<EntityReference> getEntityReferences(List<EntityRelationshipRecord> list) throws IOException {
if (list == null) {
return Collections.emptyList();
}
List<EntityReference> refs = new ArrayList<>();
for (EntityRelationshipRecord ref : list) {
refs.add(Entity.getEntityReferenceById(ref.getType(), ref.getId(), ALL));
}
refs.sort(compareEntityReference);
return refs;
}
public static List<EntityReference> populateEntityReferences(@NonNull List<String> ids, @NonNull String entityType) public static List<EntityReference> populateEntityReferences(@NonNull List<String> ids, @NonNull String entityType)
throws IOException { throws IOException {
List<EntityReference> refs = new ArrayList<>(ids.size()); List<EntityReference> refs = new ArrayList<>(ids.size());

View File

@ -101,6 +101,10 @@
"description": "SQL query statement. Example - 'select * from orders'.", "description": "SQL query statement. Example - 'select * from orders'.",
"type": "string" "type": "string"
}, },
"sqlFunction": {
"description": "SQL function. Example - 'AVG()`, `COUNT()`, etc..",
"type": "string"
},
"markdown": { "markdown": {
"$comment" : "@om-field-type", "$comment" : "@om-field-type",
"description": "Text in Markdown format", "description": "Text in Markdown format",

View File

@ -6,6 +6,48 @@
"type": "object", "type": "object",
"javaType": "org.openmetadata.catalog.type.EntityLineage", "javaType": "org.openmetadata.catalog.type.EntityLineage",
"definitions": { "definitions": {
"columnLineage": {
"type" : "object",
"properties": {
"fromColumns" : {
"description": "One or more source columns identified by fully qualified column name used by transformation function to create destination column.",
"type" : "array",
"items" : {
"$ref" : "../type/basic.json#/definitions/fullyQualifiedEntityName"
}
},
"toColumn" : {
"description": "Destination column identified by fully qualified column name created by the transformation of source columns.",
"$ref" : "../type/basic.json#/definitions/fullyQualifiedEntityName"
},
"function" : {
"description": "Transformation function applied to source columns to create destination column. That is `function(fromColumns) -> toColumn`.",
"$ref" : "../type/basic.json#/definitions/sqlFunction"
}
}
},
"lineageDetails" : {
"description" : "Lineage details including sqlQuery + pipeline + columnLineage.",
"type" : "object",
"properties": {
"sqlQuery" : {
"description": "SQL used for transformation.",
"$ref" : "../type/basic.json#/definitions/sqlQuery"
},
"columnsLineage" : {
"description" : "Lineage information of how upstream columns were combined to get downstream column.",
"type" : "array",
"items" : {
"$ref" : "#/definitions/columnLineage"
}
},
"pipeline" : {
"description": "Pipeline where the sqlQuery is periodically run.",
"$ref" : "../type/entityReference.json"
}
},
"required": ["sqlQuery", "columnsLineage"]
},
"edge": { "edge": {
"description": "Edge in the lineage graph from one entity to another by entity IDs.", "description": "Edge in the lineage graph from one entity to another by entity IDs.",
"type": "object", "type": "object",
@ -21,8 +63,13 @@
}, },
"description": { "description": {
"$ref": "basic.json#/definitions/markdown" "$ref": "basic.json#/definitions/markdown"
},
"lineageDetails": {
"description": "Optional lineageDetails provided only for table to table lineage edge.",
"$ref": "#/definitions/lineageDetails"
} }
}, },
"required": ["fromEntity", "toEntity"],
"additionalProperties": false "additionalProperties": false
}, },
"entitiesEdge": { "entitiesEdge": {
@ -40,8 +87,13 @@
}, },
"description": { "description": {
"$ref": "basic.json#/definitions/markdown" "$ref": "basic.json#/definitions/markdown"
},
"lineageDetails": {
"description": "Optional lineageDetails provided only for table to table lineage edge.",
"$ref": "#/definitions/lineageDetails"
} }
}, },
"required": ["fromEntity", "toEntity"],
"additionalProperties": false "additionalProperties": false
} }
}, },

View File

@ -585,7 +585,7 @@ public abstract class EntityResourceTest<T extends EntityInterface, K extends Cr
if (supportsFollowers) { if (supportsFollowers) {
UserResourceTest userResourceTest = new UserResourceTest(); UserResourceTest userResourceTest = new UserResourceTest();
User user1 = userResourceTest.createEntity(userResourceTest.createRequest(test, 1), TEST_AUTH_HEADERS); User user1 = userResourceTest.createEntity(userResourceTest.createRequest(test, 1), TEST_AUTH_HEADERS);
addFollower(entity.getId(), user1.getId(), CREATED, TEST_AUTH_HEADERS); addFollower(entity.getId(), user1.getId(), OK, TEST_AUTH_HEADERS);
} }
entity = validateGetWithDifferentFields(entity, false); entity = validateGetWithDifferentFields(entity, false);
validateGetCommonFields(entity); validateGetCommonFields(entity);
@ -912,15 +912,15 @@ public abstract class EntityResourceTest<T extends EntityInterface, K extends Cr
// Add follower to the entity // Add follower to the entity
UserResourceTest userResourceTest = new UserResourceTest(); UserResourceTest userResourceTest = new UserResourceTest();
User user1 = userResourceTest.createEntity(userResourceTest.createRequest(test, 1), TEST_AUTH_HEADERS); User user1 = userResourceTest.createEntity(userResourceTest.createRequest(test, 1), TEST_AUTH_HEADERS);
addAndCheckFollower(entityId, user1.getId(), CREATED, 1, TEST_AUTH_HEADERS); addAndCheckFollower(entityId, user1.getId(), OK, 1, TEST_AUTH_HEADERS);
// Add the same user as follower and make sure no errors are thrown and return response is OK // Add the same user as follower and make sure no errors are thrown
// (and not CREATED) // (and not CREATED)
addAndCheckFollower(entityId, user1.getId(), OK, 1, TEST_AUTH_HEADERS); addAndCheckFollower(entityId, user1.getId(), OK, 1, TEST_AUTH_HEADERS);
// Add a new follower to the entity // Add a new follower to the entity
User user2 = userResourceTest.createEntity(userResourceTest.createRequest(test, 2), TEST_AUTH_HEADERS); User user2 = userResourceTest.createEntity(userResourceTest.createRequest(test, 2), TEST_AUTH_HEADERS);
addAndCheckFollower(entityId, user2.getId(), CREATED, 2, TEST_AUTH_HEADERS); addAndCheckFollower(entityId, user2.getId(), OK, 2, TEST_AUTH_HEADERS);
// Delete followers and make sure they are deleted // Delete followers and make sure they are deleted
deleteAndCheckFollower(entityId, user1.getId(), 1, TEST_AUTH_HEADERS); deleteAndCheckFollower(entityId, user1.getId(), 1, TEST_AUTH_HEADERS);
@ -939,7 +939,7 @@ public abstract class EntityResourceTest<T extends EntityInterface, K extends Cr
// Add follower to the entity // Add follower to the entity
UserResourceTest userResourceTest = new UserResourceTest(); UserResourceTest userResourceTest = new UserResourceTest();
User user1 = userResourceTest.createEntity(userResourceTest.createRequest(test, 1), TEST_AUTH_HEADERS); User user1 = userResourceTest.createEntity(userResourceTest.createRequest(test, 1), TEST_AUTH_HEADERS);
addAndCheckFollower(entityId, user1.getId(), CREATED, 1, TEST_AUTH_HEADERS); addAndCheckFollower(entityId, user1.getId(), OK, 1, TEST_AUTH_HEADERS);
deleteEntity(entityId, ADMIN_AUTH_HEADERS); deleteEntity(entityId, ADMIN_AUTH_HEADERS);
@ -960,7 +960,7 @@ public abstract class EntityResourceTest<T extends EntityInterface, K extends Cr
// Add non-existent user as follower to the entity // Add non-existent user as follower to the entity
assertResponse( assertResponse(
() -> addAndCheckFollower(entityId, NON_EXISTENT_ENTITY, CREATED, 1, ADMIN_AUTH_HEADERS), () -> addAndCheckFollower(entityId, NON_EXISTENT_ENTITY, OK, 1, ADMIN_AUTH_HEADERS),
NOT_FOUND, NOT_FOUND,
entityNotFound(Entity.USER, NON_EXISTENT_ENTITY)); entityNotFound(Entity.USER, NON_EXISTENT_ENTITY));

View File

@ -14,9 +14,9 @@
package org.openmetadata.catalog.resources.feeds; package org.openmetadata.catalog.resources.feeds;
import static javax.ws.rs.core.Response.Status.BAD_REQUEST; import static javax.ws.rs.core.Response.Status.BAD_REQUEST;
import static javax.ws.rs.core.Response.Status.CREATED;
import static javax.ws.rs.core.Response.Status.FORBIDDEN; import static javax.ws.rs.core.Response.Status.FORBIDDEN;
import static javax.ws.rs.core.Response.Status.NOT_FOUND; import static javax.ws.rs.core.Response.Status.NOT_FOUND;
import static javax.ws.rs.core.Response.Status.OK;
import static org.awaitility.Awaitility.with; import static org.awaitility.Awaitility.with;
import static org.awaitility.Durations.ONE_SECOND; import static org.awaitility.Durations.ONE_SECOND;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
@ -726,7 +726,7 @@ public class FeedResourceTest extends CatalogApplicationTest {
public static void followTable(UUID tableId, UUID userId, Map<String, String> authHeaders) public static void followTable(UUID tableId, UUID userId, Map<String, String> authHeaders)
throws HttpResponseException { throws HttpResponseException {
WebTarget target = getResource("tables/" + tableId + "/followers"); WebTarget target = getResource("tables/" + tableId + "/followers");
TestUtils.put(target, userId.toString(), CREATED, authHeaders); TestUtils.put(target, userId.toString(), OK, authHeaders);
} }
public static ThreadList listThreadsWithFilter(String userId, String filterType, Map<String, String> authHeaders) public static ThreadList listThreadsWithFilter(String userId, String filterType, Map<String, String> authHeaders)
@ -758,16 +758,13 @@ public class FeedResourceTest extends CatalogApplicationTest {
protected final Thread patchThreadAndCheck(Thread updated, String originalJson, Map<String, String> authHeaders) protected final Thread patchThreadAndCheck(Thread updated, String originalJson, Map<String, String> authHeaders)
throws IOException { throws IOException {
// Validate information returned in patch response has the updates // Validate information returned in patch response has the updates
Thread returned = patchThread(updated.getId(), originalJson, updated, authHeaders); Thread returned = patchThread(updated.getId(), originalJson, updated, authHeaders);
compareEntities(updated, returned, authHeaders); compareEntities(updated, returned, authHeaders);
// GET the entity and Validate information returned // GET the entity and Validate information returned
Thread getEntity = getThread(updated.getId(), authHeaders); Thread getEntity = getThread(updated.getId(), authHeaders);
compareEntities(updated, getEntity, authHeaders); compareEntities(updated, getEntity, authHeaders);
return returned; return returned;
} }

View File

@ -13,6 +13,7 @@
package org.openmetadata.catalog.resources.lineage; package org.openmetadata.catalog.resources.lineage;
import static javax.ws.rs.core.Response.Status.BAD_REQUEST;
import static javax.ws.rs.core.Response.Status.FORBIDDEN; import static javax.ws.rs.core.Response.Status.FORBIDDEN;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertFalse;
@ -36,6 +37,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.http.client.HttpResponseException; import org.apache.http.client.HttpResponseException;
import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.MethodOrderer; import org.junit.jupiter.api.MethodOrderer;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo; import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.TestMethodOrder; import org.junit.jupiter.api.TestMethodOrder;
@ -50,10 +52,12 @@ import org.openmetadata.catalog.resources.databases.TableResourceTest;
import org.openmetadata.catalog.resources.teams.RoleResource; import org.openmetadata.catalog.resources.teams.RoleResource;
import org.openmetadata.catalog.resources.teams.RoleResourceTest; import org.openmetadata.catalog.resources.teams.RoleResourceTest;
import org.openmetadata.catalog.resources.teams.UserResourceTest; import org.openmetadata.catalog.resources.teams.UserResourceTest;
import org.openmetadata.catalog.type.ColumnLineage;
import org.openmetadata.catalog.type.Edge; import org.openmetadata.catalog.type.Edge;
import org.openmetadata.catalog.type.EntitiesEdge; import org.openmetadata.catalog.type.EntitiesEdge;
import org.openmetadata.catalog.type.EntityLineage; import org.openmetadata.catalog.type.EntityLineage;
import org.openmetadata.catalog.type.EntityReference; import org.openmetadata.catalog.type.EntityReference;
import org.openmetadata.catalog.type.LineageDetails;
import org.openmetadata.catalog.util.TestUtils; import org.openmetadata.catalog.util.TestUtils;
@Slf4j @Slf4j
@ -75,6 +79,7 @@ public class LineageResourceTest extends CatalogApplicationTest {
} }
} }
@Order(1)
@Test @Test
void put_delete_lineage_withAuthorizer(TestInfo test) throws HttpResponseException { void put_delete_lineage_withAuthorizer(TestInfo test) throws HttpResponseException {
// Random user cannot update lineage. // Random user cannot update lineage.
@ -107,7 +112,9 @@ public class LineageResourceTest extends CatalogApplicationTest {
if (shouldThrowException) { if (shouldThrowException) {
assertResponse( assertResponse(
() -> addEdge(TABLES.get(1), TABLES.get(2), authHeaders), FORBIDDEN, noPermission(userName, "UpdateLineage")); () -> addEdge(TABLES.get(1), TABLES.get(2), null, authHeaders),
FORBIDDEN,
noPermission(userName, "UpdateLineage"));
assertResponse( assertResponse(
() -> deleteEdge(TABLES.get(1), TABLES.get(2), authHeaders), () -> deleteEdge(TABLES.get(1), TABLES.get(2), authHeaders),
FORBIDDEN, FORBIDDEN,
@ -115,10 +122,11 @@ public class LineageResourceTest extends CatalogApplicationTest {
return; return;
} }
addEdge(TABLES.get(1), TABLES.get(2), authHeaders(userName + "@open-metadata.org")); addEdge(TABLES.get(1), TABLES.get(2), null, authHeaders(userName + "@open-metadata.org"));
deleteEdge(TABLES.get(1), TABLES.get(2), authHeaders(userName + "@open-metadata.org")); deleteEdge(TABLES.get(1), TABLES.get(2), authHeaders(userName + "@open-metadata.org"));
} }
@Order(2)
@Test @Test
void put_delete_lineage_200() throws HttpResponseException { void put_delete_lineage_200() throws HttpResponseException {
// Add lineage table4-->table5 // Add lineage table4-->table5
@ -227,21 +235,79 @@ public class LineageResourceTest extends CatalogApplicationTest {
Entity.TABLE, TABLES.get(4).getId(), TABLES.get(4).getFullyQualifiedName(), 2, 2, new Edge[0], new Edge[0]); Entity.TABLE, TABLES.get(4).getId(), TABLES.get(4).getFullyQualifiedName(), 2, 2, new Edge[0], new Edge[0]);
} }
public Edge getEdge(Table from, Table to) { @Order(3)
return getEdge(from.getId(), to.getId()); @Test
void put_lineageWithDetails() throws HttpResponseException {
// Add column lineage table1.c1 -> table2.c1
LineageDetails details = new LineageDetails();
String t1c1FQN = TABLES.get(0).getColumns().get(0).getFullyQualifiedName();
String t1c2FQN = TABLES.get(0).getColumns().get(1).getFullyQualifiedName();
String t1c3FQN = TABLES.get(0).getColumns().get(2).getFullyQualifiedName();
String t2c1FQN = TABLES.get(1).getColumns().get(0).getFullyQualifiedName();
String t2c2FQN = TABLES.get(1).getColumns().get(1).getFullyQualifiedName();
String t2c3FQN = TABLES.get(1).getColumns().get(2).getFullyQualifiedName();
String t3c1FQN = TABLES.get(2).getColumns().get(0).getFullyQualifiedName();
String t3c2FQN = TABLES.get(2).getColumns().get(1).getFullyQualifiedName();
String t3c3FQN = TABLES.get(2).getColumns().get(2).getFullyQualifiedName();
details.getColumnsLineage().add(new ColumnLineage().withFromColumns(List.of(t1c1FQN)).withToColumn(t2c1FQN));
addEdge(TABLES.get(0), TABLES.get(1), details, ADMIN_AUTH_HEADERS);
// Add invalid column lineage (from column or to column are invalid)
details
.getColumnsLineage()
.add(new ColumnLineage().withFromColumns(List.of("invalidColumn")).withToColumn(t2c1FQN));
assertResponse(
() -> addEdge(TABLES.get(0), TABLES.get(1), details, ADMIN_AUTH_HEADERS),
BAD_REQUEST,
"Invalid fully qualified column name invalidColumn");
details
.getColumnsLineage()
.add(new ColumnLineage().withFromColumns(List.of(t1c1FQN)).withToColumn("invalidColumn"));
assertResponse(
() -> addEdge(TABLES.get(0), TABLES.get(1), details, ADMIN_AUTH_HEADERS),
BAD_REQUEST,
"Invalid fully qualified column name invalidColumn");
// Add column level lineage with multiple fromColumns (t1c1 + t3c1) to t2c1
details.getColumnsLineage().clear();
details
.getColumnsLineage()
.add(new ColumnLineage().withFromColumns(List.of(t1c1FQN, t3c1FQN)).withToColumn(t2c1FQN));
addEdge(TABLES.get(0), TABLES.get(1), details, ADMIN_AUTH_HEADERS);
// Finally, add detailed column level lineage
details.getColumnsLineage().clear();
List<ColumnLineage> lineage = details.getColumnsLineage();
lineage.add(new ColumnLineage().withFromColumns(List.of(t1c1FQN, t3c1FQN)).withToColumn(t2c1FQN));
lineage.add(new ColumnLineage().withFromColumns(List.of(t1c2FQN, t3c2FQN)).withToColumn(t2c2FQN));
lineage.add(new ColumnLineage().withFromColumns(List.of(t1c3FQN, t3c3FQN)).withToColumn(t2c3FQN));
addEdge(TABLES.get(0), TABLES.get(1), details, ADMIN_AUTH_HEADERS);
} }
public static Edge getEdge(UUID from, UUID to) { public Edge getEdge(Table from, Table to) {
return new Edge().withFromEntity(from).withToEntity(to); return getEdge(from.getId(), to.getId(), null);
}
public static Edge getEdge(UUID from, UUID to, LineageDetails details) {
return new Edge().withFromEntity(from).withToEntity(to).withLineageDetails(details);
} }
public void addEdge(Table from, Table to) throws HttpResponseException { public void addEdge(Table from, Table to) throws HttpResponseException {
addEdge(from, to, ADMIN_AUTH_HEADERS); addEdge(from, to, null, ADMIN_AUTH_HEADERS);
} }
private void addEdge(Table from, Table to, Map<String, String> authHeaders) throws HttpResponseException { private void addEdge(Table from, Table to, LineageDetails details, Map<String, String> authHeaders)
throws HttpResponseException {
if (details != null) {
details.setSqlQuery("select *;");
}
EntitiesEdge edge = EntitiesEdge edge =
new EntitiesEdge().withFromEntity(from.getEntityReference()).withToEntity(to.getEntityReference()); new EntitiesEdge()
.withFromEntity(from.getEntityReference())
.withToEntity(to.getEntityReference())
.withLineageDetails(details);
AddLineage addLineage = new AddLineage().withEdge(edge); AddLineage addLineage = new AddLineage().withEdge(edge);
addLineageAndCheck(addLineage, authHeaders); addLineageAndCheck(addLineage, authHeaders);
} }
@ -288,13 +354,13 @@ public class LineageResourceTest extends CatalogApplicationTest {
throws HttpResponseException { throws HttpResponseException {
EntityReference from = addLineage.getEdge().getFromEntity(); EntityReference from = addLineage.getEdge().getFromEntity();
EntityReference to = addLineage.getEdge().getToEntity(); EntityReference to = addLineage.getEdge().getToEntity();
Edge expectedEdge = getEdge(from.getId(), to.getId()); Edge expectedEdge = getEdge(from.getId(), to.getId(), addLineage.getEdge().getLineageDetails());
// Check fromEntity ---> toEntity downstream edge is returned // Check fromEntity ---> toEntity downstream edge of 'from' is returned
EntityLineage lineage = getLineage(from.getType(), from.getId(), 0, 1, authHeaders); EntityLineage lineage = getLineage(from.getType(), from.getId(), 0, 1, authHeaders);
assertEdge(lineage, expectedEdge, true); assertEdge(lineage, expectedEdge, true);
// Check fromEntity ---> toEntity upstream edge is returned // Check fromEntity ---> toEntity upstream edge 'to' is returned
lineage = getLineage(to.getType(), to.getId(), 1, 0, authHeaders); lineage = getLineage(to.getType(), to.getId(), 1, 0, authHeaders);
assertEdge(lineage, expectedEdge, false); assertEdge(lineage, expectedEdge, false);
} }
@ -303,7 +369,7 @@ public class LineageResourceTest extends CatalogApplicationTest {
throws HttpResponseException { throws HttpResponseException {
EntityReference from = deletedEdge.getFromEntity(); EntityReference from = deletedEdge.getFromEntity();
EntityReference to = deletedEdge.getToEntity(); EntityReference to = deletedEdge.getToEntity();
Edge expectedEdge = getEdge(from.getId(), to.getId()); Edge expectedEdge = getEdge(from.getId(), to.getId(), deletedEdge.getLineageDetails());
// Check fromEntity ---> toEntity downstream edge is returned // Check fromEntity ---> toEntity downstream edge is returned
EntityLineage lineage = getLineage(from.getType(), from.getId(), 0, 1, authHeaders); EntityLineage lineage = getLineage(from.getType(), from.getId(), 0, 1, authHeaders);

View File

@ -650,7 +650,7 @@ public class UserResourceTest extends EntityResourceTest<User, CreateUser> {
// Add user as follower to a table // Add user as follower to a table
TableResourceTest tableResourceTest = new TableResourceTest(); TableResourceTest tableResourceTest = new TableResourceTest();
Table table = tableResourceTest.createEntity(test, 1); Table table = tableResourceTest.createEntity(test, 1);
tableResourceTest.addAndCheckFollower(table.getId(), user.getId(), CREATED, 1, ADMIN_AUTH_HEADERS); tableResourceTest.addAndCheckFollower(table.getId(), user.getId(), OK, 1, ADMIN_AUTH_HEADERS);
// Delete user // Delete user
deleteAndCheckEntity(user, ADMIN_AUTH_HEADERS); deleteAndCheckEntity(user, ADMIN_AUTH_HEADERS);
@ -662,7 +662,7 @@ public class UserResourceTest extends EntityResourceTest<User, CreateUser> {
// User can no longer follow other entities // User can no longer follow other entities
assertResponse( assertResponse(
() -> tableResourceTest.addAndCheckFollower(table.getId(), user.getId(), CREATED, 1, ADMIN_AUTH_HEADERS), () -> tableResourceTest.addAndCheckFollower(table.getId(), user.getId(), OK, 1, ADMIN_AUTH_HEADERS),
NOT_FOUND, NOT_FOUND,
entityNotFound("user", user.getId())); entityNotFound("user", user.getId()));
} }

View File

@ -7,5 +7,5 @@ Provides metadata version information.
from incremental import Version from incremental import Version
__version__ = Version("metadata", 0, 11, 0, dev=8) __version__ = Version("metadata", 0, 11, 0, dev=9)
__all__ = ["__version__"] __all__ = ["__version__"]

View File

@ -88,14 +88,6 @@
"category" : { "category" : {
"$ref" : "#/definitions/category" "$ref" : "#/definitions/category"
}, },
"nameSpace": {
"description": "Namespace or group to which this type belongs to. For example, some of the field types commonly used can come from `basic` namespace. Some of the entities such as `table`, `database`, etc. come from `data` namespace.",
"type": "string",
"default" : "custom"
},
"category" : {
"$ref" : "#/definitions/category"
},
"schema": { "schema": {
"description": "JSON schema encoded as string that defines the type. This will be used to validate the type values.", "description": "JSON schema encoded as string that defines the type. This will be used to validate the type values.",
"$ref": "../type/basic.json#/definitions/jsonSchema" "$ref": "../type/basic.json#/definitions/jsonSchema"