Sample lineage (#735)

* Fix #727: Add sample lineage data and ingestion support
Sriharsha Chintalapani 2021-10-11 20:12:40 -07:00 committed by GitHub
parent f2a40a8c4b
commit c28665bca7
93 changed files with 809 additions and 122 deletions

View File

@ -29,6 +29,8 @@ import org.openmetadata.catalog.jdbi3.UserRepository.UserDAO;
import org.openmetadata.catalog.jdbi3.TaskRepository.TaskDAO;
import org.openmetadata.catalog.jdbi3.TopicRepository.TopicDAO;
import org.openmetadata.catalog.jdbi3.ModelRepository.ModelDAO;
import org.openmetadata.catalog.jdbi3.PipelineRepository.PipelineDAO;
import org.openmetadata.catalog.resources.feeds.FeedUtil;
import org.openmetadata.catalog.resources.feeds.MessageParser;
import org.openmetadata.catalog.resources.feeds.MessageParser.EntityLink;
@ -87,6 +89,9 @@ public abstract class FeedRepository {
@CreateSqlObject
abstract TaskDAO taskDAO();
@CreateSqlObject
abstract PipelineDAO pipelineDAO();
@CreateSqlObject
abstract ModelDAO modelDAO();
@ -99,7 +104,7 @@ public abstract class FeedRepository {
// Validate that the "about" entity link points to a valid entity
EntityLink about = EntityLink.parse(thread.getAbout());
EntityReference aboutRef = EntityUtil.validateEntityLink(about, userDAO(), teamDAO(), tableDAO(),
databaseDAO(), metricsDAO(), dashboardDAO(), reportDAO(), topicDAO(), taskDAO(), modelDAO());
databaseDAO(), metricsDAO(), dashboardDAO(), reportDAO(), topicDAO(), taskDAO(), modelDAO(), pipelineDAO());
// Get owner for the addressed to Entity
EntityReference owner = EntityUtil.populateOwner(aboutRef.getId(), relationshipDAO(), userDAO(), teamDAO());
@ -178,7 +183,7 @@ public abstract class FeedRepository {
throw new IllegalArgumentException("Only entity links of type <E#/{entityType}/{entityName}> are allowed");
}
EntityReference reference = EntityUtil.validateEntityLink(entityLink, userDAO(), teamDAO(), tableDAO(),
databaseDAO(), metricsDAO(), dashboardDAO(), reportDAO(), topicDAO(), taskDAO(), modelDAO());
databaseDAO(), metricsDAO(), dashboardDAO(), reportDAO(), topicDAO(), taskDAO(), modelDAO(), pipelineDAO());
List<String> threadIds = new ArrayList<>();
List<List<String>> result = fieldRelationshipDAO().listToByPrefix(entityLink.getFullyQualifiedFieldValue(),
entityLink.getFullyQualifiedFieldType(), "thread",
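
The hunk above threads pipelineDAO() through validateEntityLink so feed threads can reference pipeline entities. Entity links must match the <E#/{entityType}/{entityName}> format checked a few lines up; below is a minimal Python sketch of that check — parse_entity_link is a hypothetical helper for illustration, the real parsing lives in MessageParser.EntityLink on the Java side:

import re

ENTITY_LINK = re.compile(r"^<E#/(?P<entityType>[^/<>]+)/(?P<entityName>[^<>]+)>$")

def parse_entity_link(link: str):
    # Hypothetical re-implementation of the format check, for illustration only.
    match = ENTITY_LINK.match(link)
    if match is None:
        raise ValueError("Only entity links of type <E#/{entityType}/{entityName}> are allowed")
    return match.group("entityType"), match.group("entityName")

parse_entity_link("<E#/pipeline/sample_airflow.dim_address_etl>")
# -> ('pipeline', 'sample_airflow.dim_address_etl')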

View File

@ -25,6 +25,7 @@ import org.openmetadata.catalog.jdbi3.ModelRepository.ModelDAO;
import org.openmetadata.catalog.jdbi3.ReportRepository.ReportDAO;
import org.openmetadata.catalog.jdbi3.TableRepository.TableDAO;
import org.openmetadata.catalog.jdbi3.TaskRepository.TaskDAO;
import org.openmetadata.catalog.jdbi3.PipelineRepository.PipelineDAO;
import org.openmetadata.catalog.jdbi3.TopicRepository.TopicDAO;
import org.openmetadata.catalog.type.Edge;
import org.openmetadata.catalog.type.EntityLineage;
@ -70,6 +71,9 @@ public abstract class LineageRepository {
@CreateSqlObject
abstract TaskDAO taskDAO();
@CreateSqlObject
abstract PipelineDAO pipelineDAO();
@CreateSqlObject
abstract ModelDAO modelDAO();
@ -79,7 +83,7 @@ public abstract class LineageRepository {
@Transaction
public EntityLineage get(String entityType, String id, int upstreamDepth, int downstreamDepth) throws IOException {
EntityReference ref = getEntityReference(entityType, UUID.fromString(id), tableDAO(), databaseDAO(),
metricsDAO(), dashboardDAO(), reportDAO(), topicDAO(), chartDAO(), taskDAO(), modelDAO());
metricsDAO(), dashboardDAO(), reportDAO(), topicDAO(), chartDAO(), taskDAO(), modelDAO(), pipelineDAO());
return getLineage(ref, upstreamDepth, downstreamDepth);
}
@ -87,21 +91,21 @@ public abstract class LineageRepository {
public EntityLineage getByName(String entityType, String fqn, int upstreamDepth, int downstreamDepth)
throws IOException {
EntityReference ref = EntityUtil.getEntityReferenceByName(entityType, fqn, tableDAO(), databaseDAO(),
metricsDAO(), reportDAO(), topicDAO(), chartDAO(), dashboardDAO(), taskDAO(), modelDAO());
metricsDAO(), reportDAO(), topicDAO(), chartDAO(), dashboardDAO(), taskDAO(), modelDAO(), pipelineDAO());
return getLineage(ref, upstreamDepth, downstreamDepth);
}
@Transaction
public void addLineage(AddLineage addLineage) throws IOException {
// Validate from entity
EntityReference from = addLineage.getEdge().getFrom();
EntityReference from = addLineage.getEdge().getFromEntity();
from = EntityUtil.getEntityReference(from.getType(), from.getId(), tableDAO(), databaseDAO(),
metricsDAO(), dashboardDAO(), reportDAO(), topicDAO(), chartDAO(), taskDAO(), modelDAO());
metricsDAO(), dashboardDAO(), reportDAO(), topicDAO(), chartDAO(), taskDAO(), modelDAO(), pipelineDAO());
// Validate to entity
EntityReference to = addLineage.getEdge().getTo();
EntityReference to = addLineage.getEdge().getToEntity();
to = EntityUtil.getEntityReference(to.getType(), to.getId(), tableDAO(), databaseDAO(),
metricsDAO(), dashboardDAO(), reportDAO(), topicDAO(), chartDAO(), taskDAO(), modelDAO());
metricsDAO(), dashboardDAO(), reportDAO(), topicDAO(), chartDAO(), taskDAO(), modelDAO(), pipelineDAO());
// Finally, add lineage relationship
relationshipDAO().insert(from.getId().toString(), to.getId().toString(), from.getType(), to.getType(),
@ -122,7 +126,7 @@ public abstract class LineageRepository {
for (int i = 0; i < lineage.getNodes().size(); i++) {
EntityReference ref = lineage.getNodes().get(i);
ref = getEntityReference(ref.getType(), ref.getId(), tableDAO(), databaseDAO(), metricsDAO(), dashboardDAO(),
reportDAO(), topicDAO(), chartDAO(), taskDAO(), modelDAO());
reportDAO(), topicDAO(), chartDAO(), taskDAO(), modelDAO(), pipelineDAO());
lineage.getNodes().set(i, ref);
}
return lineage;
@ -138,7 +142,7 @@ public abstract class LineageRepository {
upstreamDepth--;
for (EntityReference upstreamEntity : upstreamEntities) {
lineage.getUpstreamEdges().add(new Edge().withFrom(upstreamEntity.getId()).withTo(id));
lineage.getUpstreamEdges().add(new Edge().withFromEntity(upstreamEntity.getId()).withToEntity(id));
addUpstreamLineage(upstreamEntity.getId(), lineage, upstreamDepth); // Recursively add upstream nodes and edges
}
}
@ -153,7 +157,7 @@ public abstract class LineageRepository {
downstreamDepth--;
for (EntityReference entity : downStreamEntities) {
lineage.getDownstreamEdges().add(new Edge().withTo(entity.getId()).withFrom(id));
lineage.getDownstreamEdges().add(new Edge().withToEntity(entity.getId()).withFromEntity(id));
addDownstreamLineage(entity.getId(), lineage, downstreamDepth);
}
}
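
addUpstreamLineage and addDownstreamLineage above walk the relationship graph recursively, emitting one Edge per hop until the requested depth runs out. A depth-limited sketch of the upstream walk in Python — find_upstream stands in for the relationship DAO lookup and is an assumption of this sketch:

def add_upstream_lineage(node_id, lineage, depth, find_upstream):
    # Mirrors LineageRepository.addUpstreamLineage: stop at depth 0, otherwise
    # record an edge per upstream entity and recurse one level shallower.
    if depth <= 0:
        return
    for upstream_id in find_upstream(node_id):
        lineage["upstreamEdges"].append({"fromEntity": upstream_id, "toEntity": node_id})
        add_upstream_lineage(upstream_id, lineage, depth - 1, find_upstream)

The downstream walk is symmetric, with fromEntity and toEntity swapped.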

View File

@ -30,6 +30,7 @@ import org.openmetadata.catalog.jdbi3.TopicRepository.TopicDAO;
import org.openmetadata.catalog.jdbi3.ChartRepository.ChartDAO;
import org.openmetadata.catalog.jdbi3.TaskRepository.TaskDAO;
import org.openmetadata.catalog.jdbi3.ModelRepository.ModelDAO;
import org.openmetadata.catalog.jdbi3.PipelineRepository.PipelineDAO;
import org.openmetadata.catalog.resources.teams.TeamResource;
import org.openmetadata.catalog.resources.teams.TeamResource.TeamList;
import org.openmetadata.catalog.type.EntityReference;
@ -104,6 +105,9 @@ public abstract class TeamRepository {
@CreateSqlObject
abstract TaskDAO taskDAO();
@CreateSqlObject
abstract PipelineDAO pipelineDAO();
@CreateSqlObject
abstract ModelDAO modelDAO();
@ -262,7 +266,7 @@ public abstract class TeamRepository {
private List<EntityReference> getOwns(String teamId) throws IOException {
// Compile entities owned by the team
return EntityUtil.getEntityReference(relationshipDAO().findTo(teamId, OWNS.ordinal()), tableDAO(), databaseDAO(),
metricsDAO(), dashboardDAO(), reportDAO(), topicDAO(), chartDAO(), taskDAO(), modelDAO());
metricsDAO(), dashboardDAO(), reportDAO(), topicDAO(), chartDAO(), taskDAO(), modelDAO(), pipelineDAO());
}
private void addUserRelationship(Team team, User user) {

View File

@ -24,6 +24,7 @@ import org.openmetadata.catalog.jdbi3.TopicRepository.TopicDAO;
import org.openmetadata.catalog.jdbi3.ChartRepository.ChartDAO;
import org.openmetadata.catalog.jdbi3.TaskRepository.TaskDAO;
import org.openmetadata.catalog.jdbi3.ModelRepository.ModelDAO;
import org.openmetadata.catalog.jdbi3.PipelineRepository.PipelineDAO;
import org.openmetadata.catalog.Entity;
import org.openmetadata.catalog.jdbi3.MetricsRepository.MetricsDAO;
import org.openmetadata.catalog.type.EntityReference;
@ -81,6 +82,9 @@ public abstract class UsageRepository {
@CreateSqlObject
abstract TaskDAO taskDAO();
@CreateSqlObject
abstract PipelineDAO pipelineDAO();
@CreateSqlObject
abstract ModelDAO modelDAO();
@ -92,7 +96,7 @@ public abstract class UsageRepository {
@Transaction
public EntityUsage get(String entityType, String id, String date, int days) throws IOException {
EntityReference ref = getEntityReference(entityType, UUID.fromString(id), tableDAO(), databaseDAO(),
metricsDAO(), dashboardDAO(), reportDAO(), topicDAO(), chartDAO(), taskDAO(), modelDAO());
metricsDAO(), dashboardDAO(), reportDAO(), topicDAO(), chartDAO(), taskDAO(), modelDAO(), pipelineDAO());
List<UsageDetails> usageDetails = usageDAO().getUsageById(id, date, days - 1);
return new EntityUsage().withUsage(usageDetails).withEntity(ref);
}
@ -100,7 +104,7 @@ public abstract class UsageRepository {
@Transaction
public EntityUsage getByName(String entityType, String fqn, String date, int days) throws IOException {
EntityReference ref = EntityUtil.getEntityReferenceByName(entityType, fqn, tableDAO(), databaseDAO(),
metricsDAO(), reportDAO(), topicDAO(), chartDAO(), dashboardDAO(), taskDAO(), modelDAO());
metricsDAO(), reportDAO(), topicDAO(), chartDAO(), dashboardDAO(), taskDAO(), modelDAO(), pipelineDAO());
List<UsageDetails> usageDetails = usageDAO().getUsageById(ref.getId().toString(), date, days - 1);
return new EntityUsage().withUsage(usageDetails).withEntity(ref);
}
@ -109,14 +113,15 @@ public abstract class UsageRepository {
public void create(String entityType, String id, DailyCount usage) throws IOException {
// Validate data entity for which usage is being collected
getEntityReference(entityType, UUID.fromString(id), tableDAO(), databaseDAO(), metricsDAO(),
dashboardDAO(), reportDAO(), topicDAO(), chartDAO(), taskDAO(), modelDAO());
dashboardDAO(), reportDAO(), topicDAO(), chartDAO(), taskDAO(), modelDAO(), pipelineDAO());
addUsage(entityType, id, usage);
}
@Transaction
public void createByName(String entityType, String fullyQualifiedName, DailyCount usage) throws IOException {
EntityReference ref = EntityUtil.getEntityReferenceByName(entityType, fullyQualifiedName, tableDAO(),
databaseDAO(), metricsDAO(), reportDAO(), topicDAO(), chartDAO(), dashboardDAO(), taskDAO(), modelDAO());
databaseDAO(), metricsDAO(), reportDAO(), topicDAO(), chartDAO(), dashboardDAO(),
taskDAO(), modelDAO(), pipelineDAO());
addUsage(entityType, ref.getId().toString(), usage);
LOG.info("Usage successfully posted by name");
}

View File

@ -29,6 +29,7 @@ import org.openmetadata.catalog.jdbi3.TopicRepository.TopicDAO;
import org.openmetadata.catalog.jdbi3.ChartRepository.ChartDAO;
import org.openmetadata.catalog.jdbi3.TaskRepository.TaskDAO;
import org.openmetadata.catalog.jdbi3.ModelRepository.ModelDAO;
import org.openmetadata.catalog.jdbi3.PipelineRepository.PipelineDAO;
import org.openmetadata.catalog.resources.teams.UserResource;
import org.openmetadata.catalog.resources.teams.UserResource.UserList;
import org.openmetadata.catalog.type.EntityReference;
@ -49,7 +50,6 @@ import javax.json.JsonPatch;
import javax.ws.rs.core.Response;
import java.io.IOException;
import java.security.GeneralSecurityException;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@ -108,6 +108,9 @@ public abstract class UserRepository {
@CreateSqlObject
abstract TaskDAO taskDAO();
@CreateSqlObject
abstract PipelineDAO pipelineDAO();
@CreateSqlObject
abstract ModelDAO modelDAO();
@ -264,13 +267,13 @@ public abstract class UserRepository {
}
// Populate details in entity reference
return EntityUtil.getEntityReference(ownedEntities, tableDAO(), databaseDAO(), metricsDAO(), dashboardDAO(),
reportDAO(), topicDAO(), chartDAO(), taskDAO(), modelDAO());
reportDAO(), topicDAO(), chartDAO(), taskDAO(), modelDAO(), pipelineDAO());
}
private List<EntityReference> getFollows(User user) throws IOException {
return EntityUtil.getEntityReference(relationshipDAO().findTo(user.getId().toString(), FOLLOWS.ordinal()),
tableDAO(), databaseDAO(), metricsDAO(), dashboardDAO(), reportDAO(), topicDAO(), chartDAO(), taskDAO(),
modelDAO());
modelDAO(), pipelineDAO());
}
private void patchTeams(User original, User updated) throws IOException {

View File

@ -34,7 +34,10 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.validation.Valid;
import javax.validation.constraints.Max;
import javax.validation.constraints.Min;
import javax.ws.rs.Consumes;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
@ -116,11 +119,11 @@ public class LineageResource {
schema = @Schema(type = "string"))
@PathParam("fqn") String fqn,
@Parameter(description = "Upstream depth of lineage (default=1, min=0, max=3)")
@DefaultValue("1") @Min(0) @Max(3)
@QueryParam("upstreamDepth") int upstreamDepth,
@Parameter(description = "Upstream depth of lineage (default=1, min=0, max=3)")
@DefaultValue("1") @Min(0) @Max(3)
@QueryParam("downstreamDepth") int downStreamDepth) throws IOException {
upstreamDepth = Math.min(Math.max(upstreamDepth, 0), 3);
downStreamDepth = Math.min(Math.max(downStreamDepth, 1), 3);
return addHref(uriInfo, dao.getByName(entity, fqn, upstreamDepth, downStreamDepth));
}

View File

@ -180,9 +180,17 @@ public class SearchResource {
HighlightBuilder.Field highlightDescription =
new HighlightBuilder.Field("description");
highlightDescription.highlighterType("unified");
HighlightBuilder.Field highlightColumns =
new HighlightBuilder.Field("column_names");
highlightColumns.highlighterType("unified");
HighlightBuilder.Field highlightColumnDescriptions =
new HighlightBuilder.Field("column_descriptions");
highlightColumnDescriptions.highlighterType("unified");
HighlightBuilder hb = new HighlightBuilder();
hb.field(highlightDescription);
hb.field(highlightTableName);
hb.field(highlightColumns);
hb.field(highlightColumnDescriptions);
hb.preTags("<span class=\"text-highlighter\">");
hb.postTags("</span>");
searchSourceBuilder.query(QueryBuilders.queryStringQuery(query)
@ -202,15 +210,15 @@ public class SearchResource {
private SearchSourceBuilder buildTopicSearchBuilder(String query, int from, int size) {
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
HighlightBuilder.Field highlightTableName =
HighlightBuilder.Field highlightTopicName =
new HighlightBuilder.Field("topic_name");
highlightTableName.highlighterType("unified");
highlightTopicName.highlighterType("unified");
HighlightBuilder.Field highlightDescription =
new HighlightBuilder.Field("description");
highlightDescription.highlighterType("unified");
HighlightBuilder hb = new HighlightBuilder();
hb.field(highlightDescription);
hb.field(highlightTableName);
hb.field(highlightTopicName);
hb.preTags("<span class=\"text-highlighter\">");
hb.postTags("</span>");
searchSourceBuilder.query(QueryBuilders.queryStringQuery(query)
@ -228,15 +236,24 @@ public class SearchResource {
private SearchSourceBuilder buildDashboardSearchBuilder(String query, int from, int size) {
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
HighlightBuilder.Field highlightTableName =
HighlightBuilder.Field highlightDashboardName =
new HighlightBuilder.Field("dashboard_name");
highlightTableName.highlighterType("unified");
highlightDashboardName.highlighterType("unified");
HighlightBuilder.Field highlightDescription =
new HighlightBuilder.Field("description");
highlightDescription.highlighterType("unified");
HighlightBuilder.Field highlightCharts =
new HighlightBuilder.Field("chart_names");
highlightCharts.highlighterType("unified");
HighlightBuilder.Field highlightChartDescriptions =
new HighlightBuilder.Field("chart_descriptions");
highlightChartDescriptions.highlighterType("unified");
HighlightBuilder hb = new HighlightBuilder();
hb.field(highlightDescription);
hb.field(highlightTableName);
hb.field(highlightDashboardName);
hb.field(highlightCharts);
hb.field(highlightChartDescriptions);
hb.preTags("<span class=\"text-highlighter\">");
hb.postTags("</span>");
searchSourceBuilder.query(QueryBuilders.queryStringQuery(query)
@ -256,15 +273,23 @@ public class SearchResource {
private SearchSourceBuilder buildPipelineSearchBuilder(String query, int from, int size) {
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
HighlightBuilder.Field highlightTableName =
HighlightBuilder.Field highlightPipelineName =
new HighlightBuilder.Field("pipeline_name");
highlightTableName.highlighterType("unified");
highlightPipelineName.highlighterType("unified");
HighlightBuilder.Field highlightDescription =
new HighlightBuilder.Field("description");
highlightDescription.highlighterType("unified");
HighlightBuilder.Field highlightTasks =
new HighlightBuilder.Field("task_names");
highlightTasks.highlighterType("unified");
HighlightBuilder.Field highlightTaskDescriptions =
new HighlightBuilder.Field("task_descriptions");
highlightTaskDescriptions.highlighterType("unified");
HighlightBuilder hb = new HighlightBuilder();
hb.field(highlightDescription);
hb.field(highlightTableName);
hb.field(highlightPipelineName);
hb.field(highlightTasks);
hb.field(highlightTaskDescriptions);
hb.preTags("<span class=\"text-highlighter\">");
hb.postTags("</span>");
searchSourceBuilder.query(QueryBuilders.queryStringQuery(query)
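
Each search builder now highlights the entity name plus its nested fields (column_names/column_descriptions for tables, chart_names/chart_descriptions for dashboards, task_names/task_descriptions for pipelines), all with the unified highlighter and shared pre/post tags. Sketched below as the raw Elasticsearch request body that buildPipelineSearchBuilder roughly produces — assembled by hand here, with an example query string:

pipeline_search_body = {
    "query": {"query_string": {"query": "dim_address"}},
    "highlight": {
        "pre_tags": ['<span class="text-highlighter">'],
        "post_tags": ["</span>"],
        "fields": {
            "description": {"type": "unified"},
            "pipeline_name": {"type": "unified"},
            "task_names": {"type": "unified"},
            "task_descriptions": {"type": "unified"},
        },
    },
}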

View File

@ -41,6 +41,7 @@ import org.openmetadata.catalog.jdbi3.DatabaseRepository.DatabaseDAO;
import org.openmetadata.catalog.jdbi3.EntityRelationshipDAO;
import org.openmetadata.catalog.jdbi3.MetricsRepository.MetricsDAO;
import org.openmetadata.catalog.jdbi3.ModelRepository.ModelDAO;
import org.openmetadata.catalog.jdbi3.PipelineRepository.PipelineDAO;
import org.openmetadata.catalog.jdbi3.Relationship;
import org.openmetadata.catalog.jdbi3.ReportRepository.ReportDAO;
import org.openmetadata.catalog.jdbi3.TableRepository.TableDAO;
@ -56,6 +57,7 @@ import org.openmetadata.catalog.resources.databases.DatabaseResource;
import org.openmetadata.catalog.resources.databases.TableResource;
import org.openmetadata.catalog.resources.feeds.MessageParser.EntityLink;
import org.openmetadata.catalog.resources.models.ModelResource;
import org.openmetadata.catalog.resources.pipelines.PipelineResource;
import org.openmetadata.catalog.resources.services.dashboard.DashboardServiceResource;
import org.openmetadata.catalog.resources.services.database.DatabaseServiceResource;
import org.openmetadata.catalog.resources.services.messaging.MessagingServiceResource;
@ -151,6 +153,8 @@ public final class EntityUtil {
ModelResource.addHref(uriInfo, ref);
} else if (entity.equalsIgnoreCase(Entity.TASK)) {
TaskResource.addHref(uriInfo, ref);
} else if (entity.equalsIgnoreCase(Entity.PIPELINE)) {
PipelineResource.addHref(uriInfo, ref);
} else if (entity.equalsIgnoreCase(Entity.DATABASE_SERVICE)) {
DatabaseServiceResource.addHref(uriInfo, ref);
} else if (entity.equalsIgnoreCase(Entity.MESSAGING_SERVICE)) {
@ -243,10 +247,12 @@ public final class EntityUtil {
DatabaseDAO databaseDAO, MetricsDAO metricsDAO,
DashboardDAO dashboardDAO, ReportDAO reportDAO,
TopicDAO topicDAO, ChartDAO chartDAO,
TaskDAO taskDAO, ModelDAO modelDAO) throws IOException {
TaskDAO taskDAO, ModelDAO modelDAO,
PipelineDAO pipelineDAO) throws IOException {
for (EntityReference ref : list) {
getEntityReference(
ref, tableDAO, databaseDAO, metricsDAO, dashboardDAO, reportDAO, topicDAO, chartDAO, taskDAO, modelDAO
ref, tableDAO, databaseDAO, metricsDAO, dashboardDAO, reportDAO,
topicDAO, chartDAO, taskDAO, modelDAO, pipelineDAO
);
}
return list;
@ -255,7 +261,7 @@ public final class EntityUtil {
public static EntityReference getEntityReference(EntityReference ref, TableDAO tableDAO, DatabaseDAO databaseDAO,
MetricsDAO metricsDAO, DashboardDAO dashboardDAO,
ReportDAO reportDAO, TopicDAO topicDAO, ChartDAO chartDAO,
TaskDAO taskDAO, ModelDAO modelDAO)
TaskDAO taskDAO, ModelDAO modelDAO, PipelineDAO pipelineDAO)
throws IOException {
// Note href to entity reference is not added here
String entity = ref.getType();
@ -284,7 +290,10 @@ public final class EntityUtil {
} else if (entity.equalsIgnoreCase(Entity.TASK)) {
Task instance = EntityUtil.validate(id, taskDAO.findById(id), Task.class);
return ref.withDescription(instance.getDescription()).withName(instance.getFullyQualifiedName());
} else if (entity.equalsIgnoreCase(Entity.MODEL)) {
} else if (entity.equalsIgnoreCase(Entity.PIPELINE)) {
Pipeline instance = EntityUtil.validate(id, pipelineDAO.findById(id), Pipeline.class);
return ref.withDescription(instance.getDescription()).withName(instance.getFullyQualifiedName());
} else if (entity.equalsIgnoreCase(Entity.MODEL)) {
Model instance = EntityUtil.validate(id, modelDAO.findById(id), Model.class);
return ref.withDescription(instance.getDescription()).withName(instance.getFullyQualifiedName());
}
@ -294,17 +303,18 @@ public final class EntityUtil {
public static EntityReference getEntityReference(String entity, UUID id, TableDAO tableDAO, DatabaseDAO databaseDAO,
MetricsDAO metricsDAO, DashboardDAO dashboardDAO,
ReportDAO reportDAO, TopicDAO topicDAO, ChartDAO chartDAO,
TaskDAO taskDAO, ModelDAO modelDAO)
TaskDAO taskDAO, ModelDAO modelDAO, PipelineDAO pipelineDAO)
throws IOException {
EntityReference ref = new EntityReference().withId(id).withType(entity);
return getEntityReference(ref, tableDAO, databaseDAO, metricsDAO, dashboardDAO,
reportDAO, topicDAO, chartDAO, taskDAO, modelDAO);
reportDAO, topicDAO, chartDAO, taskDAO, modelDAO, pipelineDAO);
}
public static EntityReference getEntityReferenceByName(String entity, String fqn, TableDAO tableDAO,
DatabaseDAO databaseDAO, MetricsDAO metricsDAO,
ReportDAO reportDAO, TopicDAO topicDAO, ChartDAO chartDAO,
DashboardDAO dashboardDAO, TaskDAO taskDAO, ModelDAO modelDAO)
DashboardDAO dashboardDAO, TaskDAO taskDAO, ModelDAO modelDAO,
PipelineDAO pipelineDAO)
throws IOException {
if (entity.equalsIgnoreCase(Entity.TABLE)) {
Table instance = EntityUtil.validate(fqn, tableDAO.findByFQN(fqn), Table.class);
@ -330,6 +340,9 @@ public final class EntityUtil {
} else if (entity.equalsIgnoreCase(Entity.TASK)) {
Task instance = EntityUtil.validate(fqn, taskDAO.findByFQN(fqn), Task.class);
return getEntityReference(instance);
} else if (entity.equalsIgnoreCase(Entity.PIPELINE)) {
Pipeline instance = EntityUtil.validate(fqn, pipelineDAO.findByFQN(fqn), Pipeline.class);
return getEntityReference(instance);
} else if (entity.equalsIgnoreCase(Entity.MODEL)) {
Model instance = EntityUtil.validate(fqn, modelDAO.findByFQN(fqn), Model.class);
return getEntityReference(instance);
@ -414,7 +427,7 @@ public final class EntityUtil {
public static EntityReference validateEntityLink(EntityLink entityLink, UserDAO userDAO, TeamDAO teamDAO,
TableDAO tableDAO, DatabaseDAO databaseDAO, MetricsDAO metricsDAO,
DashboardDAO dashboardDAO, ReportDAO reportDAO, TopicDAO topicDAO,
TaskDAO taskDAO, ModelDAO modelDAO)
TaskDAO taskDAO, ModelDAO modelDAO, PipelineDAO pipelineDAO)
throws IOException {
String entityType = entityLink.getEntityType();
String fqn = entityLink.getEntityId();
@ -436,6 +449,8 @@ public final class EntityUtil {
return getEntityReference(EntityUtil.validate(fqn, topicDAO.findByFQN(fqn), Topic.class));
} else if (entityType.equalsIgnoreCase(Entity.TASK)) {
return getEntityReference(EntityUtil.validate(fqn, taskDAO.findByFQN(fqn), Task.class));
} else if (entityType.equalsIgnoreCase(Entity.PIPELINE)) {
return getEntityReference(EntityUtil.validate(fqn, pipelineDAO.findByFQN(fqn), Pipeline.class));
} else if (entityType.equalsIgnoreCase(Entity.MODEL)) {
return getEntityReference(EntityUtil.validate(fqn, modelDAO.findByFQN(fqn), Model.class));
} else {

View File

@ -11,11 +11,11 @@
"type": "object",
"javaType": "org.openmetadata.catalog.type.Edge",
"properties": {
"from": {
"fromEntity": {
"description" : "From entity that is upstream of lineage edge.",
"$ref" : "basic.json#/definitions/uuid"
},
"to": {
"toEntity": {
"description" : "To entity that is downstream of lineage edge.",
"$ref" : "basic.json#/definitions/uuid"
},
@ -29,11 +29,11 @@
"type": "object",
"javaType": "org.openmetadata.catalog.type.EntitiesEdge",
"properties": {
"from": {
"fromEntity": {
"description" : "From entity that is upstream of lineage edge.",
"$ref" : "entityReference.json"
},
"to": {
"toEntity": {
"description" : "To entity that is downstream of lineage edge.",
"$ref" : "entityReference.json"
},
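
The rename from from/to to fromEntity/toEntity applies to both Edge (UUID endpoints) and EntitiesEdge (full entity references), and keeps the Python keyword from out of the generated models. Under the new schema an Edge payload looks like this (placeholder UUIDs):

edge = {
    "fromEntity": "11111111-1111-1111-1111-111111111111",  # upstream entity id
    "toEntity": "22222222-2222-2222-2222-222222222222",    # downstream entity id
}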

View File

@ -121,11 +121,12 @@ public class LineageResourceTest extends CatalogApplicationTest {
}
public static Edge getEdge(UUID from, UUID to) {
return new Edge().withFrom(from).withTo(to);
return new Edge().withFromEntity(from).withToEntity(to);
}
public void addEdge(Table from, Table to) throws HttpResponseException {
EntitiesEdge edge = new EntitiesEdge().withFrom(getEntityReference(from)).withTo(getEntityReference(to));
EntitiesEdge edge = new EntitiesEdge().withFromEntity(getEntityReference(from))
.withToEntity(getEntityReference(to));
AddLineage addLineage = new AddLineage().withEdge(edge);
addLineageAndCheck(addLineage, adminAuthHeaders());
}
@ -143,8 +144,8 @@ public class LineageResourceTest extends CatalogApplicationTest {
private static void validateLineage(AddLineage addLineage, Map<String, String> authHeaders)
throws HttpResponseException {
EntityReference from = addLineage.getEdge().getFrom();
EntityReference to = addLineage.getEdge().getTo();
EntityReference from = addLineage.getEdge().getFromEntity();
EntityReference to = addLineage.getEdge().getToEntity();
Edge expectedEdge = getEdge(from.getId(), to.getId());
// Check fromEntity ---> toEntity downstream edge is returned
@ -162,8 +163,14 @@ public class LineageResourceTest extends CatalogApplicationTest {
// Total number of from and to points in an edge must be equal to the number of nodes
List<UUID> ids = new ArrayList<>();
lineage.getUpstreamEdges().forEach(edge -> {ids.add(edge.getFrom()); ids.add(edge.getTo());});
lineage.getDownstreamEdges().forEach(edge -> {ids.add(edge.getFrom()); ids.add(edge.getTo());});
lineage.getUpstreamEdges().forEach(edge -> {
ids.add(edge.getFromEntity());
ids.add(edge.getToEntity());
});
lineage.getDownstreamEdges().forEach(edge -> {
ids.add(edge.getFromEntity());
ids.add(edge.getToEntity());
});
if (lineage.getNodes().size() != 0) {
assertEquals((int) ids.stream().distinct().count(), lineage.getNodes().size() + 1);
}

View File

@ -1,5 +1,325 @@
{
"tables": [
{
"id": "02e7ede6-9cd5-4190-9c36-3fae6b2cbac8",
"name": "raw_customer",
"tableType": "Regular",
"description": "This is a raw customers table as represented in our online DB. This contains personal, shipping and billing addresses and details of the customer store and customer profile. This table is used to build our dimensional and fact tables",
"columns": [
{
"name": "comments",
"dataType": "STRING",
"dataLength": 1,
"dataTypeDisplay": "string",
"constraint": "NULL",
"ordinalPosition": 1
},
{
"name": "creditcard",
"dataType": "STRING",
"dataLength": 1,
"dataTypeDisplay": "string",
"constraint": "NULL",
"ordinalPosition": 2
},
{
"name": "customer",
"dataType": "STRUCT",
"dataLength": 1,
"dataTypeDisplay": "struct<username:character varying(32),name:character varying(32),sex:char(1),address:character varying(128),mail:character varying(64),birthdate:character varying(16)>",
"constraint": "NULL",
"ordinalPosition": 3,
"children": [
{
"name": "username",
"dataType": "VARCHAR",
"dataLength": 32,
"dataTypeDisplay": "character varying(32)",
"constraint": "NULL"
},
{
"name": "name",
"dataType": "VARCHAR",
"dataLength": 32,
"dataTypeDisplay": "character varying(32)",
"constraint": "NULL"
},
{
"name": "sex",
"dataType": "CHAR",
"dataLength": 1,
"dataTypeDisplay": "char(1)",
"constraint": "NULL"
},
{
"name": "address",
"dataType": "VARCHAR",
"dataLength": 128,
"dataTypeDisplay": "character varying(128)",
"constraint": "NULL"
},
{
"name": "mail",
"dataType": "VARCHAR",
"dataLength": 64,
"dataTypeDisplay": "character varying(64)",
"constraint": "NULL"
},
{
"name": "birthdate",
"dataType": "VARCHAR",
"dataLength": 16,
"dataTypeDisplay": "character varying(16)",
"constraint": "NULL"
}
]
},
{
"name": "membership",
"dataType": "STRING",
"dataLength": 1,
"dataTypeDisplay": "string",
"constraint": "NULL",
"ordinalPosition": 4
},
{
"name": "orders",
"dataType": "ARRAY",
"arrayDataType": "STRUCT",
"dataLength": 1,
"dataTypeDisplay": "array<struct<product_id:character varying(24),price:int,onsale:boolean,tax:int,weight:int,others:int,vendor:character varying(64)>>",
"constraint": "NULL",
"ordinalPosition": 5
},
{
"name": "platform",
"dataType": "STRING",
"dataLength": 1,
"dataTypeDisplay": "string",
"constraint": "NULL",
"ordinalPosition": 6
},
{
"name": "preference",
"dataType": "MAP",
"dataLength": 1,
"dataTypeDisplay": "map&lt;character varying(32),boolean&gt;",
"constraint": "NULL",
"ordinalPosition": 7
},
{
"name": "shipping_address",
"dataType": "ARRAY",
"arrayDataType": "STRUCT",
"dataLength": 1,
"dataTypeDisplay": "array<struct<name:character varying(32),street_address:character varying(128),city:character varying(32),postcode:character varying(8)>>",
"constraint": "NULL",
"ordinalPosition": 8
},
{
"name": "shipping_date",
"dataType": "STRING",
"dataLength": 1,
"dataTypeDisplay": "string",
"constraint": "NULL",
"ordinalPosition": 9
},
{
"name": "transaction_date",
"dataType": "STRING",
"dataLength": 1,
"dataTypeDisplay": "string",
"constraint": "NULL",
"ordinalPosition": 10
}
],
"database": {
"id": "89229e26-7f74-4443-a568-85512eaeaa07",
"type": "database"
},
"tags": null,
"joins": null
},
{
"id": "02e7ede6-9cd5-4190-9c36-3fae6b2cbac8",
"name": "raw_order",
"tableType": "Regular",
"description": "This is a raw orders table as represented in our online DB. This table contains all the orders by the customers and can be used to buid our dim and fact tables",
"fullyQualifiedName": "bigquery.shopify.raw_order",
"columns": [
{
"name": "comments",
"dataType": "STRING",
"dataLength": 1,
"dataTypeDisplay": "string",
"constraint": "NULL",
"ordinalPosition": 1
},
{
"name": "creditcard",
"dataType": "STRING",
"dataLength": 1,
"dataTypeDisplay": "string",
"constraint": "NULL",
"ordinalPosition": 2
},
{
"name": "membership",
"dataType": "STRING",
"dataLength": 1,
"dataTypeDisplay": "string",
"constraint": "NULL",
"ordinalPosition": 4
},
{
"name": "orders",
"dataType": "ARRAY",
"arrayDataType": "STRUCT",
"dataLength": 1,
"dataTypeDisplay": "array<struct<product_id:character varying(24),price:int,onsale:boolean,tax:int,weight:int,others:int,vendor:character varying(64)>>",
"constraint": "NULL",
"ordinalPosition": 5
},
{
"name": "platform",
"dataType": "STRING",
"dataLength": 1,
"dataTypeDisplay": "string",
"constraint": "NULL",
"ordinalPosition": 6
},
{
"name": "preference",
"dataType": "MAP",
"dataLength": 1,
"dataTypeDisplay": "map&lt;character varying(32),boolean&gt;",
"constraint": "NULL",
"ordinalPosition": 7
},
{
"name": "shipping_address",
"dataType": "ARRAY",
"arrayDataType": "STRUCT",
"dataLength": 1,
"dataTypeDisplay": "array<struct<name:character varying(32),street_address:character varying(128),city:character varying(32),postcode:character varying(8)>>",
"constraint": "NULL",
"ordinalPosition": 8
},
{
"name": "shipping_date",
"dataType": "STRING",
"dataLength": 1,
"dataTypeDisplay": "string",
"constraint": "NULL",
"ordinalPosition": 9
},
{
"name": "transaction_date",
"dataType": "STRING",
"dataLength": 1,
"dataTypeDisplay": "string",
"constraint": "NULL",
"ordinalPosition": 10
},
{
"name": "total_order_count",
"dataType": "NUMERIC",
"description": "The total number of orders that the customer has made from this store across their lifetime.",
"tags": null,
"ordinalPosition": 11
},
{
"name": "total_order_value",
"dataType": "NUMERIC",
"description": "The total amount of money that the customer has spent on orders from the store across their lifetime. The value is formatted in the store's currency.",
"tags": null,
"ordinalPosition": 12
},
{
"name": "first_order_date",
"dataType": "TIMESTAMP",
"description": "The date (ISO 8601) and time (UTC) when the customer placed their first order. The format is YYYY-MM-DD HH:mm:ss (for example, 2016-02-05 17:04:01).",
"tags": null,
"ordinalPosition": 13
},
{
"name": "last_order_date",
"dataType": "TIMESTAMP",
"description": "The date (ISO 8601) and time (UTC) when the customer placed their most recent order. The format is YYYY-MM-DD HH:mm:ss (for example, 2016-02-05 17:04:01).",
"tags": null,
"ordinalPosition": 14
}
],
"database": {
"id": "89229e26-7f74-4443-a568-85512eaeaa07",
"type": "database"
},
"tags": null,
"joins": null
},
{
"id": "02e7ede6-9cd5-4190-9c36-3fae6b2cbac8",
"name": "raw_product_catalog",
"tableType": "Regular",
"description": "This is a raw product catalog table contains the product listing, price, seller etc.. represented in our online DB. ",
"fullyQualifiedName": "bigquery.shopify.raw_product_catalog",
"columns": [
{
"name": "comments",
"dataType": "STRING",
"dataLength": 1,
"dataTypeDisplay": "string",
"constraint": "NULL",
"ordinalPosition": 1
},
{
"name": "products",
"dataType": "ARRAY",
"arrayDataType": "STRUCT",
"dataLength": 1,
"dataTypeDisplay": "array<struct<product_id:character varying(24),price:int,onsale:boolean,tax:int,weight:int,others:int,vendor:character varying(64), stock:int>>",
"constraint": "NULL",
"ordinalPosition": 2
},
{
"name": "platform",
"dataType": "STRING",
"dataLength": 1,
"dataTypeDisplay": "string",
"constraint": "NULL",
"ordinalPosition": 3
},
{
"name": "store_address",
"dataType": "ARRAY",
"arrayDataType": "STRUCT",
"dataLength": 1,
"dataTypeDisplay": "array<struct<name:character varying(32),street_address:character varying(128),city:character varying(32),postcode:character varying(8)>>",
"constraint": "NULL",
"ordinalPosition": 4
},
{
"name": "first_order_date",
"dataType": "TIMESTAMP",
"description": "The date (ISO 8601) and time (UTC) when the customer placed their first order. The format is YYYY-MM-DD HH:mm:ss (for example, 2016-02-05 17:04:01).",
"tags": null,
"ordinalPosition": 5
},
{
"name": "last_order_date",
"dataType": "TIMESTAMP",
"description": "The date (ISO 8601) and time (UTC) when the customer placed their most recent order. The format is YYYY-MM-DD HH:mm:ss (for example, 2016-02-05 17:04:01).",
"tags": null,
"ordinalPosition": 6
}
],
"database": {
"id": "89229e26-7f74-4443-a568-85512eaeaa07",
"type": "database"
},
"tags": null,
"joins": null
},
{
"id": "02e7ede6-9cd5-4190-9c36-3fae6b2cbac8",
"name": "dim_address",

View File

@ -0,0 +1,25 @@
[{
"from": { "fqn":"bigquery.shopify.raw_customer", "type": "table"},
"to": { "fqn":"sample_airflow.dim_address_etl", "type": "pipeline"}
},
{
"from": {"fqn":"sample_airflow.dim_address_etl", "type": "pipeline"},
"to": {"fqn":"bigquery.shopify.dim_address", "type": "table"}
},
{
"from": {"fqn":"bigquery.shopify.raw_order", "type": "table"},
"to": {"fqn":"sample_airflow.dim_product_etl", "type": "pipeline"}
},
{
"from": {"fqn":"bigquery.shopify.raw_customer", "type": "table"},
"to": {"fqn":"sample_airflow.dim_product_etl", "type": "pipeline"}
},
{
"from": {"fqn":"sample_airflow.dim_product_etl", "type": "pipeline"},
"to": {"fqn":"bigquery.shopify.dim_product", "type": "table"}
},
{
"from": {"fqn": "sample_airflow.dim_product_etl", "type": "pipeline"},
"to": {"fqn":"bigquery.shopify.dim_product_variant", "type": "table"}
}
]
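
Each sample edge identifies its endpoints by fully qualified name and entity type, so the ingestion pass can resolve both sides to ids before registering lineage. A hypothetical sketch of that pass — resolve_by_fqn and add_lineage are invented names for illustration, not the real sample-data source API:

import json

def ingest_sample_lineage(client, path="lineage.json"):
    with open(path) as f:
        edges = json.load(f)
    for edge in edges:
        # Resolve each fqn/type pair to an entity id, then register the edge.
        src_id = client.resolve_by_fqn(edge["from"]["fqn"], edge["from"]["type"])
        dst_id = client.resolve_by_fqn(edge["to"]["fqn"], edge["to"]["type"])
        client.add_lineage({
            "edge": {
                "fromEntity": {"id": src_id, "type": edge["from"]["type"]},
                "toEntity": {"id": dst_id, "type": edge["to"]["type"]},
            }
        })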

View File

@ -6,6 +6,34 @@
"pipelineUrl": "http://localhost:8080/tree?dag_id=presto_etl",
"tasks": ["presto_task", "assert_table_exists"]
},
{
"name": "dim_address_etl",
"displayName": "dim_address etl",
"description": "dim_address ETL pipeline",
"pipelineUrl": "http://localhost:8080/tree?dag_id=dim_address_etl",
"tasks": ["dim_address_task", "assert_table_exists"]
},
{
"name": "dim_user_etl",
"displayName": "dim_user etl",
"description": "dim_user ETL pipeline",
"pipelineUrl": "http://localhost:8080/tree?dag_id=dim_user_etl",
"tasks": ["dim_user_task", "assert_table_exists"]
},
{
"name": "dim_location_etl",
"displayName": "dim_location etl",
"description": "diim_location ETL pipeline",
"pipelineUrl": "http://localhost:8080/tree?dag_id=dim_address_etl",
"tasks": ["dim_location_task", "assert_table_exists"]
},
{
"name": "dim_product_etl",
"displayName": "dim_product etl",
"description": "diim_product ETL pipeline",
"pipelineUrl": "http://localhost:8080/tree?dag_id=dim_address_etl",
"tasks": ["dim_product_task", "assert_table_exists"]
},
{
"name": "trino_etl",
"displayName": "Trino ETL",

View File

@ -31,6 +31,30 @@
"downstreamTasks": ["assert_table_exists"],
"taskType": "PrestoOperator"
},
{
"name": "dim_address_task",
"displayName": "dim_address Task",
"description": "Airflow operator to perform ETL and generate dim_address table",
"taskUrl": "http://localhost:8080/taskinstance/list/?flt1_dag_id_equals=dim_address_task",
"downstreamTasks": ["assert_table_exists"],
"taskType": "PrestoOperator"
},
{
"name": "dim_user_task",
"displayName": "dim_user Task",
"description": "Airflow operator to perform ETL and generate dim_user table",
"taskUrl": "http://localhost:8080/taskinstance/list/?flt1_dag_id_equals=dim_user_task",
"downstreamTasks": ["assert_table_exists"],
"taskType": "PrestoOperator"
},
{
"name": "dim_location_task",
"displayName": "dim_location Task",
"description": "Airflow operator to perform ETL and generate dim_location table",
"taskUrl": "http://localhost:8080/taskinstance/list/?flt1_dag_id_equals=dim_location_task",
"downstreamTasks": ["assert_table_exists"],
"taskType": "PrestoOperator"
},
{
"name": "trino_task",
"displayName": "Trino Task",
@ -38,6 +62,15 @@
"taskUrl": "http://localhost:8080/taskinstance/list/?flt1_dag_id_equals=assert_table_exists",
"downstreamTasks": ["assert_table_exists"],
"taskType": "TrinoOperator"
},
{
"name": "dim_product_task",
"displayName": "dim_product Task",
"description": "Airflow operator to perform ETL and generate dim_product table",
"taskUrl": "http://localhost:8080/taskinstance/list/?flt1_dag_id_equals=dim_product_task",
"downstreamTasks": ["assert_table_exists"],
"taskType": "PrestoOperator"
}
]
}

View File

@ -101,8 +101,8 @@ plugins: Dict[str, Set[str]] = {
"trino": {"sqlalchemy-trino"},
"postgres": {"pymysql>=1.0.2", "psycopg2-binary", "GeoAlchemy2"},
"redash": {"redash-toolbelt==0.1.4"},
"redshift": {"openmetadata-sqlalchemy-redshift==0.2.0", "psycopg2-binary", "GeoAlchemy2"},
"redshift-usage": {"sqlalchemy-redshift", "psycopg2-binary", "GeoAlchemy2"},
"redshift": {"openmetadata-sqlalchemy-redshift==0.2.1", "psycopg2-binary", "GeoAlchemy2"},
"redshift-usage": {"openmetadata-sqlalchemy-redshift==0.2.1", "psycopg2-binary", "GeoAlchemy2"},
"scheduler": scheduler_requirements,
"data-profiler": {"openmetadata-data-profiler"},
"snowflake": {"snowflake-sqlalchemy<=1.2.4"},

View File

@ -1,3 +1,3 @@
# generated by datamodel-codegen:
# filename: json
# timestamp: 2021-10-01T19:50:55+00:00
# timestamp: 2021-10-12T00:34:28+00:00

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: data/tags/personalDataTags.json
# timestamp: 2021-10-01T19:50:55+00:00
# timestamp: 2021-10-12T00:34:28+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: data/tags/piiTags.json
# timestamp: 2021-10-01T19:50:55+00:00
# timestamp: 2021-10-12T00:34:28+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: data/tags/tierTags.json
# timestamp: 2021-10-01T19:50:55+00:00
# timestamp: 2021-10-12T00:34:28+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: data/tags/userTags.json
# timestamp: 2021-10-01T19:50:55+00:00
# timestamp: 2021-10-12T00:34:28+00:00
from __future__ import annotations

View File

@ -1,3 +1,3 @@
# generated by datamodel-codegen:
# filename: json
# timestamp: 2021-10-01T19:50:55+00:00
# timestamp: 2021-10-12T00:34:28+00:00

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/api/catalogVersion.json
# timestamp: 2021-10-01T19:50:55+00:00
# timestamp: 2021-10-12T00:34:28+00:00
from __future__ import annotations

View File

@ -1,3 +1,3 @@
# generated by datamodel-codegen:
# filename: json
# timestamp: 2021-10-01T19:50:55+00:00
# timestamp: 2021-10-12T00:34:28+00:00

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/api/data/createChart.json
# timestamp: 2021-10-01T19:50:55+00:00
# timestamp: 2021-10-12T00:34:28+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/api/data/createDashboard.json
# timestamp: 2021-10-01T19:50:55+00:00
# timestamp: 2021-10-12T00:34:28+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/api/data/createDatabase.json
# timestamp: 2021-10-01T19:50:55+00:00
# timestamp: 2021-10-12T00:34:28+00:00
from __future__ import annotations

View File

@ -0,0 +1,35 @@
# generated by datamodel-codegen:
# filename: schema/api/data/createModel.json
# timestamp: 2021-10-12T00:34:28+00:00
from __future__ import annotations
from typing import List, Optional
from pydantic import BaseModel, Field, constr
from ...type import entityReference, tagLabel
class CreateModelEntityRequest(BaseModel):
name: constr(min_length=1, max_length=64) = Field(
..., description='Name that identifies this model.'
)
displayName: Optional[str] = Field(
None,
description='Display Name that identifies this model. It could be title or label from the source services',
)
description: Optional[str] = Field(
None,
description='Description of the model instance. How it was trained and for what it is used.',
)
algorithm: str = Field(..., description='Algorithm used to train the model')
dashboard: Optional[entityReference.EntityReference] = Field(
None, description='Performance Dashboard URL to track metric evolution'
)
tags: Optional[List[tagLabel.TagLabel]] = Field(
None, description='Tags for this model'
)
owner: Optional[entityReference.EntityReference] = Field(
None, description='Owner of this model'
)
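
A minimal request built with the generated class — only name and algorithm are required, and the values below are examples:

from metadata.generated.schema.api.data.createModel import CreateModelEntityRequest

request = CreateModelEntityRequest(
    name="customer_churn",          # example value
    algorithm="gradient boosting",  # example value
    description="Predicts churn risk for each customer.",
)
print(request.json(exclude_none=True))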

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/api/data/createPipeline.json
# timestamp: 2021-10-01T19:50:55+00:00
# timestamp: 2021-10-12T00:34:28+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/api/data/createTable.json
# timestamp: 2021-10-01T19:50:55+00:00
# timestamp: 2021-10-12T00:34:28+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/api/data/createTask.json
# timestamp: 2021-10-01T19:50:55+00:00
# timestamp: 2021-10-12T00:34:28+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/api/data/createTopic.json
# timestamp: 2021-10-01T19:50:55+00:00
# timestamp: 2021-10-12T00:34:28+00:00
from __future__ import annotations

View File

@ -1,3 +1,3 @@
# generated by datamodel-codegen:
# filename: json
# timestamp: 2021-10-01T19:50:55+00:00
# timestamp: 2021-10-12T00:34:28+00:00

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/api/feed/createThread.json
# timestamp: 2021-10-01T19:50:55+00:00
# timestamp: 2021-10-12T00:34:28+00:00
from __future__ import annotations

View File

@ -0,0 +1,3 @@
# generated by datamodel-codegen:
# filename: json
# timestamp: 2021-10-12T00:34:28+00:00

View File

@ -0,0 +1,18 @@
# generated by datamodel-codegen:
# filename: schema/api/lineage/addLineage.json
# timestamp: 2021-10-12T00:34:28+00:00
from __future__ import annotations
from typing import Optional
from pydantic import BaseModel, Field
from ...type import entityLineage
class AddLineage(BaseModel):
description: Optional[str] = Field(
None, description='User provided description of the lineage details.'
)
edge: entityLineage.EntitiesEdge = Field(..., description='Lineage edge details.')

View File

@ -1,3 +1,3 @@
# generated by datamodel-codegen:
# filename: json
# timestamp: 2021-10-01T19:50:55+00:00
# timestamp: 2021-10-12T00:34:28+00:00

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/api/services/createDashboardService.json
# timestamp: 2021-10-01T19:50:55+00:00
# timestamp: 2021-10-12T00:34:28+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/api/services/createDatabaseService.json
# timestamp: 2021-10-01T19:50:55+00:00
# timestamp: 2021-10-12T00:34:28+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/api/services/createMessagingService.json
# timestamp: 2021-10-01T19:50:55+00:00
# timestamp: 2021-10-12T00:34:28+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/api/services/createPipelineService.json
# timestamp: 2021-10-01T19:50:55+00:00
# timestamp: 2021-10-12T00:34:28+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/api/services/updateDashboardService.json
# timestamp: 2021-10-01T19:50:55+00:00
# timestamp: 2021-10-12T00:34:28+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/api/services/updateDatabaseService.json
# timestamp: 2021-10-01T19:50:55+00:00
# timestamp: 2021-10-12T00:34:28+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/api/services/updateMessagingService.json
# timestamp: 2021-10-01T19:50:55+00:00
# timestamp: 2021-10-12T00:34:28+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/api/services/updatePipelineService.json
# timestamp: 2021-10-01T19:50:55+00:00
# timestamp: 2021-10-12T00:34:28+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/api/setOwner.json
# timestamp: 2021-10-01T19:50:55+00:00
# timestamp: 2021-10-12T00:34:28+00:00
from __future__ import annotations

View File

@ -1,3 +1,3 @@
# generated by datamodel-codegen:
# filename: json
# timestamp: 2021-10-01T19:50:55+00:00
# timestamp: 2021-10-12T00:34:28+00:00

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/api/tags/createTag.json
# timestamp: 2021-10-01T19:50:55+00:00
# timestamp: 2021-10-12T00:34:28+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/api/tags/createTagCategory.json
# timestamp: 2021-10-01T19:50:55+00:00
# timestamp: 2021-10-12T00:34:28+00:00
from __future__ import annotations

View File

@ -1,3 +1,3 @@
# generated by datamodel-codegen:
# filename: json
# timestamp: 2021-10-01T19:50:55+00:00
# timestamp: 2021-10-12T00:34:28+00:00

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/api/teams/createTeam.json
# timestamp: 2021-10-01T19:50:55+00:00
# timestamp: 2021-10-12T00:34:28+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/api/teams/createUser.json
# timestamp: 2021-10-01T19:50:55+00:00
# timestamp: 2021-10-12T00:34:28+00:00
from __future__ import annotations

View File

@ -1,3 +1,3 @@
# generated by datamodel-codegen:
# filename: json
# timestamp: 2021-10-01T19:50:55+00:00
# timestamp: 2021-10-12T00:34:28+00:00

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/entity/bots.json
# timestamp: 2021-10-01T19:50:55+00:00
# timestamp: 2021-10-12T00:34:28+00:00
from __future__ import annotations

View File

@ -1,3 +1,3 @@
# generated by datamodel-codegen:
# filename: json
# timestamp: 2021-10-01T19:50:55+00:00
# timestamp: 2021-10-12T00:34:28+00:00

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/entity/data/chart.json
# timestamp: 2021-10-01T19:50:55+00:00
# timestamp: 2021-10-12T00:34:28+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/entity/data/dashboard.json
# timestamp: 2021-10-01T19:50:55+00:00
# timestamp: 2021-10-12T00:34:28+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/entity/data/database.json
# timestamp: 2021-10-01T19:50:55+00:00
# timestamp: 2021-10-12T00:34:28+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/entity/data/metrics.json
# timestamp: 2021-10-01T19:50:55+00:00
# timestamp: 2021-10-12T00:34:28+00:00
from __future__ import annotations

View File

@ -0,0 +1,46 @@
# generated by datamodel-codegen:
# filename: schema/entity/data/model.json
# timestamp: 2021-10-12T00:34:28+00:00
from __future__ import annotations
from typing import List, Optional
from pydantic import BaseModel, Field, constr
from ...type import basic, entityReference, tagLabel, usageDetails
class Model(BaseModel):
id: basic.Uuid = Field(..., description='Unique identifier of a model instance.')
name: constr(min_length=1, max_length=64) = Field(
..., description='Name that identifies this model.'
)
fullyQualifiedName: Optional[constr(min_length=1, max_length=64)] = Field(
None, description='A unique name that identifies a model.'
)
displayName: Optional[str] = Field(
None, description='Display Name that identifies this model.'
)
description: Optional[str] = Field(
None, description='Description of the model, what it is, and how to use it.'
)
algorithm: str = Field(..., description='Algorithm used to train the model')
dashboard: Optional[entityReference.EntityReference] = Field(
None, description='Performance Dashboard URL to track metric evolution'
)
href: Optional[basic.Href] = Field(
None, description='Link to the resource corresponding to this entity.'
)
owner: Optional[entityReference.EntityReference] = Field(
None, description='Owner of this model.'
)
followers: Optional[entityReference.EntityReferenceList] = Field(
None, description='Followers of this model.'
)
tags: Optional[List[tagLabel.TagLabel]] = Field(
None, description='Tags for this model.'
)
usageSummary: Optional[usageDetails.TypeUsedToReturnUsageDetailsOfAnEntity] = Field(
None, description='Latest usage information for this model.'
)

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/entity/data/pipeline.json
# timestamp: 2021-10-01T19:50:55+00:00
# timestamp: 2021-10-12T00:34:28+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/entity/data/report.json
# timestamp: 2021-10-01T19:50:55+00:00
# timestamp: 2021-10-12T00:34:28+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/entity/data/table.json
# timestamp: 2021-10-01T19:50:55+00:00
# timestamp: 2021-10-12T00:34:28+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/entity/data/task.json
# timestamp: 2021-10-01T19:50:55+00:00
# timestamp: 2021-10-12T00:34:28+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/entity/data/topic.json
# timestamp: 2021-10-01T19:50:55+00:00
# timestamp: 2021-10-12T00:34:28+00:00
from __future__ import annotations

View File

@ -1,3 +1,3 @@
# generated by datamodel-codegen:
# filename: json
# timestamp: 2021-10-01T19:50:55+00:00
# timestamp: 2021-10-12T00:34:28+00:00

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/entity/feed/thread.json
# timestamp: 2021-10-01T19:50:55+00:00
# timestamp: 2021-10-12T00:34:28+00:00
from __future__ import annotations

View File

@ -1,3 +1,3 @@
# generated by datamodel-codegen:
# filename: json
# timestamp: 2021-10-01T19:50:55+00:00
# timestamp: 2021-10-12T00:34:28+00:00

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/entity/services/dashboardService.json
# timestamp: 2021-10-01T19:50:55+00:00
# timestamp: 2021-10-12T00:34:28+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/entity/services/databaseService.json
# timestamp: 2021-10-01T19:50:55+00:00
# timestamp: 2021-10-12T00:34:28+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/entity/services/messagingService.json
# timestamp: 2021-10-01T19:50:55+00:00
# timestamp: 2021-10-12T00:34:28+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/entity/services/pipelineService.json
# timestamp: 2021-10-01T19:50:55+00:00
# timestamp: 2021-10-12T00:34:28+00:00
from __future__ import annotations

View File

@ -1,3 +1,3 @@
# generated by datamodel-codegen:
# filename: json
# timestamp: 2021-10-01T19:50:55+00:00
# timestamp: 2021-10-12T00:34:28+00:00

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/entity/tags/tagCategory.json
# timestamp: 2021-10-01T19:50:55+00:00
# timestamp: 2021-10-12T00:34:28+00:00
from __future__ import annotations

View File

@ -1,3 +1,3 @@
# generated by datamodel-codegen:
# filename: json
# timestamp: 2021-10-01T19:50:55+00:00
# timestamp: 2021-10-12T00:34:28+00:00

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/entity/teams/team.json
# timestamp: 2021-10-01T19:50:55+00:00
# timestamp: 2021-10-12T00:34:28+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/entity/teams/user.json
# timestamp: 2021-10-01T19:50:55+00:00
# timestamp: 2021-10-12T00:34:28+00:00
from __future__ import annotations

View File

@ -1,3 +1,3 @@
# generated by datamodel-codegen:
# filename: json
# timestamp: 2021-10-01T19:50:55+00:00
# timestamp: 2021-10-12T00:34:28+00:00

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/type/auditLog.json
# timestamp: 2021-10-01T19:50:55+00:00
# timestamp: 2021-10-12T00:34:28+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/type/basic.json
# timestamp: 2021-10-01T19:50:55+00:00
# timestamp: 2021-10-12T00:34:28+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/type/collectionDescriptor.json
# timestamp: 2021-10-01T19:50:55+00:00
# timestamp: 2021-10-12T00:34:28+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/type/dailyCount.json
# timestamp: 2021-10-01T19:50:55+00:00
# timestamp: 2021-10-12T00:34:28+00:00
from __future__ import annotations

View File

@ -0,0 +1,43 @@
# generated by datamodel-codegen:
# filename: schema/type/entityLineage.json
# timestamp: 2021-10-12T00:34:28+00:00
from __future__ import annotations
from typing import List, Optional
from pydantic import BaseModel, Extra, Field
from . import basic, entityReference
class Edge(BaseModel):
fromEntity: Optional[basic.Uuid] = Field(
None, description='From entity that is upstream of lineage edge.'
)
toEntity: Optional[basic.Uuid] = Field(
None, description='To entity that is downstream of lineage edge.'
)
description: Optional[str] = None
class EntityLineage(BaseModel):
class Config:
extra = Extra.forbid
entity: entityReference.EntityReference = Field(
..., description='Primary entity for which this lineage graph is created'
)
nodes: Optional[List[entityReference.EntityReference]] = None
upstreamEdges: Optional[List[Edge]] = None
downstreamEdges: Optional[List[Edge]] = None
class EntitiesEdge(BaseModel):
fromEntity: Optional[entityReference.EntityReference] = Field(
None, description='From entity that is upstream of lineage edge.'
)
toEntity: Optional[entityReference.EntityReference] = Field(
None, description='To entity that is downstream of lineage edge.'
)
description: Optional[str] = None
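
The generated types line up with the renamed JSON schema above, so an AddLineage request can be assembled directly from them (placeholder ids; EntityReference comes from the generated entityReference module this file already imports):

from uuid import uuid4

from metadata.generated.schema.api.lineage.addLineage import AddLineage
from metadata.generated.schema.type.entityLineage import EntitiesEdge
from metadata.generated.schema.type.entityReference import EntityReference

edge = EntitiesEdge(
    fromEntity=EntityReference(id=uuid4(), type="table"),
    toEntity=EntityReference(id=uuid4(), type="pipeline"),
)
print(AddLineage(edge=edge).json(exclude_none=True))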

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/type/entityReference.json
# timestamp: 2021-10-01T19:50:55+00:00
# timestamp: 2021-10-12T00:34:28+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/type/entityUsage.json
# timestamp: 2021-10-01T19:50:55+00:00
# timestamp: 2021-10-12T00:34:28+00:00
from __future__ import annotations

View File

@ -1,10 +1,10 @@
# generated by datamodel-codegen:
# filename: schema/type/jdbcConnection.json
# timestamp: 2021-10-01T19:50:55+00:00
# timestamp: 2021-10-12T00:34:28+00:00
from __future__ import annotations
from pydantic import AnyUrl, BaseModel, Field
from pydantic import BaseModel, Field
class DriverClass(BaseModel):
@ -12,7 +12,10 @@ class DriverClass(BaseModel):
class ConnectionUrl(BaseModel):
__root__: AnyUrl = Field(..., description='Type used for JDBC connection URL.')
__root__: str = Field(
...,
description='Type used for JDBC connection URL of format `url_scheme://<username>:<password>@<host>:<port>/<db_name>`.',
)
class JdbcInfo(BaseModel):

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/type/paging.json
# timestamp: 2021-10-01T19:50:55+00:00
# timestamp: 2021-10-12T00:34:28+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/type/profile.json
# timestamp: 2021-10-01T19:50:55+00:00
# timestamp: 2021-10-12T00:34:28+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/type/schedule.json
# timestamp: 2021-10-01T19:50:55+00:00
# timestamp: 2021-10-12T00:34:28+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/type/tagLabel.json
# timestamp: 2021-10-01T19:50:55+00:00
# timestamp: 2021-10-12T00:34:28+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/type/usageDetails.json
# timestamp: 2021-10-01T19:50:55+00:00
# timestamp: 2021-10-12T00:34:28+00:00
from __future__ import annotations

View File

@ -26,6 +26,7 @@ from metadata.generated.schema.api.data.createPipeline import CreatePipelineEnti
from metadata.generated.schema.api.data.createTable import CreateTableEntityRequest
from metadata.generated.schema.api.data.createTask import CreateTaskEntityRequest
from metadata.generated.schema.api.data.createTopic import CreateTopic
from metadata.generated.schema.api.lineage.addLineage import AddLineage
from metadata.generated.schema.api.services.createDashboardService import CreateDashboardServiceEntityRequest
from metadata.generated.schema.api.services.createDatabaseService import CreateDatabaseServiceEntityRequest
from metadata.generated.schema.api.services.createMessagingService import CreateMessagingServiceEntityRequest
@ -410,6 +411,12 @@ class OpenMetadataAPIClient(object):
        resp = self.client.put('/dashboards', data=create_dashboard_request.json())
        return Dashboard(**resp)

    def get_dashboard_by_name(self, dashboard_name: str, fields: List[str] = ['charts', 'service']) -> Dashboard:
        """Get Dashboard By Name"""
        params = {'fields': ",".join(fields)}
        resp = self.client.get('/dashboards/name/{}'.format(dashboard_name), data=params)
        return Dashboard(**resp)

    def list_dashboards(self, fields: str = None, after: str = None, limit: int = 1000) -> DashboardEntities:
        """List all dashboards"""
@ -496,5 +503,15 @@ class OpenMetadataAPIClient(object):
            after = resp['paging']['after'] if 'after' in resp['paging'] else None
        return PipelineEntities(pipelines=pipelines, total=total, after=after)

    def get_pipeline_by_name(self, pipeline_name: str, fields: List[str] = ['tasks', 'service']) -> Pipeline:
        """Get Pipeline By Name"""
        params = {'fields': ",".join(fields)}
        resp = self.client.get('/pipelines/name/{}'.format(pipeline_name), data=params)
        return Pipeline(**resp)

    def create_or_update_lineage(self, lineage: AddLineage):
        """Create or update a lineage edge between two entities"""
        resp = self.client.put('/lineage', data=lineage.json())
        return resp

    def close(self):
        self.client.close()
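A hedged usage sketch of the new lineage call (not from the commit; the client module path, server endpoint, and FQNs are assumptions for illustration — the edge field of AddLineage matches the sample source later in this diff):

# Sketch only: wire a pipeline -> table edge through the new /lineage endpoint.
from metadata.generated.schema.api.lineage.addLineage import AddLineage
from metadata.generated.schema.type.entityLineage import EntitiesEdge
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig, OpenMetadataAPIClient

client = OpenMetadataAPIClient(MetadataServerConfig(api_endpoint='http://localhost:8585/api'))
pipeline = client.get_pipeline_by_name('sample_airflow.presto_etl')              # illustrative FQN
table = client.get_table_by_name('sample_data.ecommerce_db.shopify.fact_sale')  # illustrative FQN
client.create_or_update_lineage(AddLineage(
    edge=EntitiesEdge(fromEntity=EntityReference(id=pipeline.id, type='pipeline'),
                      toEntity=EntityReference(id=table.id, type='table'))))
client.close()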

View File

@ -26,6 +26,7 @@ from metadata.generated.schema.api.data.createPipeline import CreatePipelineEnti
from metadata.generated.schema.api.data.createTable import CreateTableEntityRequest
from metadata.generated.schema.api.data.createTask import CreateTaskEntityRequest
from metadata.generated.schema.api.data.createTopic import CreateTopic
from metadata.generated.schema.api.lineage.addLineage import AddLineage
from metadata.generated.schema.entity.data.chart import ChartType
from metadata.generated.schema.entity.data.pipeline import Pipeline
from metadata.generated.schema.entity.data.task import Task
@ -90,6 +91,8 @@ class MetadataRestSink(Sink):
            self.write_tasks(record)
        elif isinstance(record, Pipeline):
            self.write_pipelines(record)
        elif isinstance(record, AddLineage):
            self.write_lineage(record)
        else:
            logger.info("Ignoring record of unknown type {}".format(type(record)))
@ -229,6 +232,17 @@ class MetadataRestSink(Sink):
            logger.error(err)
            self.status.failure(pipeline.name)

    def write_lineage(self, add_lineage: AddLineage):
        try:
            logger.debug(add_lineage)
            created_lineage = self.client.create_or_update_lineage(add_lineage)
            logger.info('Successfully added Lineage {}'.format(created_lineage))
            self.status.records_written('{}'.format(created_lineage))
        except (APIError, ValidationError) as err:
            logger.error("Failed to ingest lineage {}".format(add_lineage))
            logger.error(err)
            self.status.failure(add_lineage)

    def get_status(self):
        return self.status
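For context, a record reaches write_lineage through the dispatch branch shown above; a hedged sketch, assuming the Sink API's entry point is write_record and that `sink`, `from_ref`, and `to_ref` are already set up elsewhere:

# Illustrative only: an AddLineage record flowing through the sink dispatch.
# `sink` is an already-configured MetadataRestSink; from_ref/to_ref are
# EntityReference placeholders resolved elsewhere.
record = AddLineage(edge=EntitiesEdge(fromEntity=from_ref, toEntity=to_ref))
sink.write_record(record)  # hits the AddLineage branch and calls write_lineage()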

View File

@ -27,6 +27,7 @@ from pydantic import ValidationError
from metadata.config.common import ConfigModel
from metadata.generated.schema.api.data.createTopic import CreateTopic
from metadata.generated.schema.api.lineage.addLineage import AddLineage
from metadata.generated.schema.api.services.createDashboardService import CreateDashboardServiceEntityRequest
from metadata.generated.schema.api.services.createMessagingService import CreateMessagingServiceEntityRequest
from metadata.generated.schema.api.services.createPipelineService import CreatePipelineServiceEntityRequest
@ -37,6 +38,7 @@ from metadata.generated.schema.entity.data.task import Task
from metadata.generated.schema.entity.services.dashboardService import DashboardService
from metadata.generated.schema.entity.services.messagingService import MessagingService
from metadata.generated.schema.entity.services.pipelineService import PipelineService
from metadata.generated.schema.type.entityLineage import EntitiesEdge
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.common import Record
from metadata.ingestion.api.source import SourceStatus, Source
@ -99,6 +101,20 @@ def get_pipeline_service_or_create(service_json, metadata_config) -> PipelineSer
return created_service
def get_lineage_entity_ref(edge, metadata_config) -> EntityReference:
    client = OpenMetadataAPIClient(metadata_config)
    fqn = edge['fqn']
    if edge['type'] == 'table':
        table = client.get_table_by_name(fqn)
        return EntityReference(id=table.id, type='table')
    elif edge['type'] == 'pipeline':
        pipeline = client.get_pipeline_by_name(fqn)
        return EntityReference(id=pipeline.id, type='pipeline')
    elif edge['type'] == 'dashboard':
        dashboard = client.get_dashboard_by_name(fqn)
        return EntityReference(id=dashboard.id, type='dashboard')
    return None  # unknown edge type; callers should guard against this
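The edge endpoints this helper receives are plain dicts; their shape, reconstructed from the key accesses above (the FQN value is a made-up example):

# Shape inferred from the lookups above; the FQN is illustrative.
edge_end = {'fqn': 'sample_data.ecommerce_db.shopify.fact_sale', 'type': 'table'}
ref = get_lineage_entity_ref(edge_end, metadata_config)  # -> EntityReference(type='table')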
def get_table_key(row: Dict[str, Any]) -> Union[TableKey, None]:
"""
Table key consists of schema and table name
@ -301,6 +317,7 @@ class SampleDataSource(Source):
self.tasks = json.load(open(self.config.sample_data_folder + "/pipelines/tasks.json", 'r'))
self.pipelines = json.load(open(self.config.sample_data_folder + "/pipelines/pipelines.json", 'r'))
self.pipeline_service = get_pipeline_service_or_create(self.pipeline_service_json, metadata_config)
self.lineage = json.load(open(self.config.sample_data_folder + "/lineage/lineage.json", 'r'))
@classmethod
def create(cls, config_dict, metadata_config_dict, ctx):
@ -318,6 +335,7 @@ class SampleDataSource(Source):
yield from self.ingest_dashboards()
yield from self.ingest_tasks()
yield from self.ingest_pipelines()
yield from self.ingest_lineage()
def ingest_tables(self) -> Iterable[OMetaDatabaseAndTable]:
db = Database(id=uuid.uuid4(),
@ -396,6 +414,19 @@ class SampleDataSource(Source):
                                 service=EntityReference(id=self.pipeline_service.id, type='pipelineService'))
            yield pipeline_ev

    def ingest_lineage(self) -> Iterable[AddLineage]:
        for edge in self.lineage:
            from_entity_ref = get_lineage_entity_ref(edge['from'], self.metadata_config)
            to_entity_ref = get_lineage_entity_ref(edge['to'], self.metadata_config)
            yield AddLineage(
                edge=EntitiesEdge(fromEntity=from_entity_ref, toEntity=to_entity_ref)
            )

    def close(self):
        pass
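The lineage/lineage.json file consumed above presumably holds a list of from/to pairs; a reconstruction consistent with the key accesses in ingest_lineage and get_lineage_entity_ref, written as a Python literal (actual FQNs in the sample data may differ):

# Assumed shape of sample_data/lineage/lineage.json, inferred from the reader above.
sample_lineage = [
    {
        'from': {'fqn': 'sample_airflow.presto_etl', 'type': 'pipeline'},
        'to': {'fqn': 'sample_data.ecommerce_db.shopify.fact_sale', 'type': 'table'},
    },
]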