Lineage resource migrated to jdbi3

This commit is contained in:
sureshms 2021-10-21 18:32:08 -07:00
parent a919367a2b
commit b84220a6a8
7 changed files with 100 additions and 86 deletions

View File

@ -1,18 +0,0 @@
package org.openmetadata.catalog.jdbi3;
import org.skife.jdbi.v2.sqlobject.Bind;
import org.skife.jdbi.v2.sqlobject.SqlQuery;
import org.skife.jdbi.v2.sqlobject.SqlUpdate;
import java.util.List;
public interface BotsDAO {
@SqlUpdate("INSERT INTO bot_entity(json) VALUES (:json)")
void insert(@Bind("json") String json);
@SqlQuery("SELECT json FROM bot_entity WHERE name = :name")
String findByName(@Bind("name") String name);
@SqlQuery("SELECT json FROM bot_entity WHERE (name = :name OR :name is NULL)")
List<String> list(@Bind("name") String name);
}

View File

@ -20,5 +20,5 @@ import org.jdbi.v3.sqlobject.CreateSqlObject;
public interface BotsRepository3 {
@CreateSqlObject
abstract BotsDAO botsDAO();
BotsDAO3 botsDAO();
}

View File

@ -37,7 +37,7 @@ public class BotsRepositoryHelper {
}
public Bots findByName(String name) throws IOException {
return EntityUtil.validate(name, repo3.botsDAO().findByName(name), Bots.class);
return repo3.botsDAO().findEntityByName(name);
}
public List<Bots> list(String name) throws IOException {

View File

@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.openmetadata.catalog.jdbi3;
import org.jdbi.v3.sqlobject.CreateSqlObject;
public interface LineageRepository3 {
@CreateSqlObject
TableDAO3 tableDAO();
@CreateSqlObject
DatabaseDAO3 databaseDAO();
@CreateSqlObject
MetricsDAO3 metricsDAO();
@CreateSqlObject
DashboardDAO3 dashboardDAO();
@CreateSqlObject
ReportDAO3 reportDAO();
@CreateSqlObject
TopicDAO3 topicDAO();
@CreateSqlObject
ChartDAO3 chartDAO();
@CreateSqlObject
TaskDAO3 taskDAO();
@CreateSqlObject
PipelineDAO3 pipelineDAO();
@CreateSqlObject
ModelDAO3 modelDAO();
@CreateSqlObject
EntityRelationshipDAO3 relationshipDAO();
}

View File

@ -16,13 +16,12 @@
package org.openmetadata.catalog.jdbi3;
import org.jdbi.v3.sqlobject.transaction.Transaction;
import org.openmetadata.catalog.api.lineage.AddLineage;
import org.openmetadata.catalog.type.Edge;
import org.openmetadata.catalog.type.EntityLineage;
import org.openmetadata.catalog.type.EntityReference;
import org.openmetadata.catalog.util.EntityUtil;
import org.skife.jdbi.v2.sqlobject.CreateSqlObject;
import org.skife.jdbi.v2.sqlobject.Transaction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -34,54 +33,28 @@ import java.util.stream.Collectors;
import static org.openmetadata.catalog.util.EntityUtil.getEntityReference;
public abstract class LineageRepository {
private static final Logger LOG = LoggerFactory.getLogger(LineageRepository.class);
public class LineageRepositoryHelper {
private static final Logger LOG = LoggerFactory.getLogger(LineageRepositoryHelper.class);
@CreateSqlObject
abstract TableDAO tableDAO();
public LineageRepositoryHelper(LineageRepository3 repo3) { this.repo3 = repo3; }
@CreateSqlObject
abstract DatabaseDAO databaseDAO();
@CreateSqlObject
abstract MetricsDAO metricsDAO();
@CreateSqlObject
abstract DashboardDAO dashboardDAO();
@CreateSqlObject
abstract ReportDAO reportDAO();
@CreateSqlObject
abstract TopicDAO topicDAO();
@CreateSqlObject
abstract ChartDAO chartDAO();
@CreateSqlObject
abstract TaskDAO taskDAO();
@CreateSqlObject
abstract PipelineDAO pipelineDAO();
@CreateSqlObject
abstract ModelDAO modelDAO();
@CreateSqlObject
abstract EntityRelationshipDAO relationshipDAO();
private final LineageRepository3 repo3;
@Transaction
public EntityLineage get(String entityType, String id, int upstreamDepth, int downstreamDepth) throws IOException {
EntityReference ref = getEntityReference(entityType, UUID.fromString(id), tableDAO(), databaseDAO(),
metricsDAO(), dashboardDAO(), reportDAO(), topicDAO(), chartDAO(), taskDAO(), modelDAO(), pipelineDAO());
EntityReference ref = getEntityReference(entityType, UUID.fromString(id), repo3.tableDAO(), repo3.databaseDAO(),
repo3.metricsDAO(), repo3.dashboardDAO(), repo3.reportDAO(), repo3.topicDAO(), repo3.chartDAO(),
repo3.taskDAO(), repo3.modelDAO(), repo3.pipelineDAO());
return getLineage(ref, upstreamDepth, downstreamDepth);
}
@Transaction
public EntityLineage getByName(String entityType, String fqn, int upstreamDepth, int downstreamDepth)
throws IOException {
EntityReference ref = EntityUtil.getEntityReferenceByName(entityType, fqn, tableDAO(), databaseDAO(),
metricsDAO(), reportDAO(), topicDAO(), chartDAO(), dashboardDAO(), taskDAO(), modelDAO(), pipelineDAO());
// TODO clean this up
EntityReference ref = EntityUtil.getEntityReferenceByName(entityType, fqn, repo3.tableDAO(), repo3.databaseDAO(),
repo3.metricsDAO(), repo3.reportDAO(), repo3.topicDAO(), repo3.chartDAO(), repo3.dashboardDAO(), repo3.taskDAO(),
repo3.modelDAO(), repo3.pipelineDAO());
return getLineage(ref, upstreamDepth, downstreamDepth);
}
@ -89,16 +62,18 @@ public abstract class LineageRepository {
public void addLineage(AddLineage addLineage) throws IOException {
// Validate from entity
EntityReference from = addLineage.getEdge().getFromEntity();
from = EntityUtil.getEntityReference(from.getType(), from.getId(), tableDAO(), databaseDAO(),
metricsDAO(), dashboardDAO(), reportDAO(), topicDAO(), chartDAO(), taskDAO(), modelDAO(), pipelineDAO());
from = EntityUtil.getEntityReference(from.getType(), from.getId(), repo3.tableDAO(), repo3.databaseDAO(),
repo3.metricsDAO(), repo3.dashboardDAO(), repo3.reportDAO(), repo3.topicDAO(), repo3.chartDAO(),
repo3.taskDAO(), repo3.modelDAO(), repo3.pipelineDAO());
// Validate to entity
EntityReference to = addLineage.getEdge().getToEntity();
to = EntityUtil.getEntityReference(to.getType(), to.getId(), tableDAO(), databaseDAO(),
metricsDAO(), dashboardDAO(), reportDAO(), topicDAO(), chartDAO(), taskDAO(), modelDAO(), pipelineDAO());
to = EntityUtil.getEntityReference(to.getType(), to.getId(), repo3.tableDAO(), repo3.databaseDAO(),
repo3.metricsDAO(), repo3.dashboardDAO(), repo3.reportDAO(), repo3.topicDAO(), repo3.chartDAO(),
repo3.taskDAO(), repo3.modelDAO(), repo3.pipelineDAO());
// Finally, add lineage relationship
relationshipDAO().insert(from.getId().toString(), to.getId().toString(), from.getType(), to.getType(),
repo3.relationshipDAO().insert(from.getId().toString(), to.getId().toString(), from.getType(), to.getType(),
Relationship.UPSTREAM.ordinal());
}
@ -115,8 +90,9 @@ public abstract class LineageRepository {
// Add entityReference details
for (int i = 0; i < lineage.getNodes().size(); i++) {
EntityReference ref = lineage.getNodes().get(i);
ref = getEntityReference(ref.getType(), ref.getId(), tableDAO(), databaseDAO(), metricsDAO(), dashboardDAO(),
reportDAO(), topicDAO(), chartDAO(), taskDAO(), modelDAO(), pipelineDAO());
ref = getEntityReference(ref.getType(), ref.getId(), repo3.tableDAO(), repo3.databaseDAO(), repo3.metricsDAO(),
repo3.dashboardDAO(), repo3.reportDAO(), repo3.topicDAO(), repo3.chartDAO(), repo3.taskDAO(),
repo3.modelDAO(), repo3.pipelineDAO());
lineage.getNodes().set(i, ref);
}
return lineage;
@ -127,7 +103,8 @@ public abstract class LineageRepository {
return;
}
// from this id ---> find other ids
List<EntityReference> upstreamEntities = relationshipDAO().findFrom(id.toString(), Relationship.UPSTREAM.ordinal());
List<EntityReference> upstreamEntities = repo3.relationshipDAO().findFrom(id.toString(),
Relationship.UPSTREAM.ordinal());
lineage.getNodes().addAll(upstreamEntities);
upstreamDepth--;
@ -142,7 +119,8 @@ public abstract class LineageRepository {
return;
}
// from other ids ---> to this id
List<EntityReference> downStreamEntities = relationshipDAO().findTo(id.toString(), Relationship.UPSTREAM.ordinal());
List<EntityReference> downStreamEntities = repo3.relationshipDAO().findTo(id.toString(),
Relationship.UPSTREAM.ordinal());
lineage.getNodes().addAll(downStreamEntities);
downstreamDepth--;

View File

@ -24,7 +24,7 @@ import io.swagger.v3.oas.annotations.media.Content;
import io.swagger.v3.oas.annotations.media.Schema;
import io.swagger.v3.oas.annotations.responses.ApiResponse;
import org.openmetadata.catalog.api.lineage.AddLineage;
import org.openmetadata.catalog.jdbi3.LineageRepository;
import org.openmetadata.catalog.jdbi3.LineageRepositoryHelper;
import org.openmetadata.catalog.resources.Collection;
import org.openmetadata.catalog.resources.teams.UserResource;
import org.openmetadata.catalog.security.CatalogAuthorizer;
@ -56,14 +56,14 @@ import java.util.Objects;
@Api(value = "Lineage resource", tags = "Lineage resource")
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
@Collection(name = "lineage", repositoryClass = "org.openmetadata.catalog.jdbi3.LineageRepository")
@Collection(name = "lineage", repositoryClass = "org.openmetadata.catalog.jdbi3.LineageRepositoryHelper")
public class LineageResource {
private static final Logger LOG = LoggerFactory.getLogger(UserResource.class);
private final LineageRepository dao;
private final LineageRepositoryHelper dao;
@Inject
public LineageResource(LineageRepository dao, CatalogAuthorizer authorizer) {
Objects.requireNonNull(dao, "LineageRepository must not be null");
public LineageResource(LineageRepositoryHelper dao, CatalogAuthorizer authorizer) {
Objects.requireNonNull(dao, "LineageRepositoryHelper must not be null");
this.dao = dao;
}

View File

@ -521,40 +521,40 @@ public final class EntityUtil {
}
public static EntityReference getEntityReferenceByName(String entity, String fqn, TableDAO3 tableDAO,
DatabaseDAO databaseDAO, MetricsDAO metricsDAO,
ReportDAO reportDAO, TopicDAO topicDAO, ChartDAO chartDAO,
DashboardDAO dashboardDAO, TaskDAO taskDAO, ModelDAO modelDAO,
PipelineDAO pipelineDAO)
DatabaseDAO3 databaseDAO, MetricsDAO3 metricsDAO,
ReportDAO3 reportDAO, TopicDAO3 topicDAO, ChartDAO3 chartDAO,
DashboardDAO3 dashboardDAO, TaskDAO3 taskDAO, ModelDAO3 modelDAO,
PipelineDAO3 pipelineDAO)
throws IOException {
if (entity.equalsIgnoreCase(Entity.TABLE)) {
Table instance = tableDAO.findEntityByName(fqn);
return getEntityReference(instance);
} else if (entity.equalsIgnoreCase(Entity.DATABASE)) {
Database instance = EntityUtil.validate(fqn, databaseDAO.findByFQN(fqn), Database.class);
Database instance = databaseDAO.findEntityByName(fqn);
return getEntityReference(instance);
} else if (entity.equalsIgnoreCase(Entity.METRICS)) {
Metrics instance = EntityUtil.validate(fqn, metricsDAO.findByFQN(fqn), Metrics.class);
Metrics instance = metricsDAO.findEntityByName(fqn);
return getEntityReference(instance);
} else if (entity.equalsIgnoreCase(Entity.REPORT)) {
Report instance = EntityUtil.validate(fqn, reportDAO.findByFQN(fqn), Report.class);
Report instance = reportDAO.findEntityByName(fqn);
return getEntityReference(instance);
} else if (entity.equalsIgnoreCase(Entity.TOPIC)) {
Topic instance = EntityUtil.validate(fqn, topicDAO.findByFQN(fqn), Topic.class);
Topic instance = topicDAO.findEntityByName(fqn);
return getEntityReference(instance);
} else if (entity.equalsIgnoreCase(Entity.CHART)) {
Chart instance = EntityUtil.validate(fqn, chartDAO.findByFQN(fqn), Chart.class);
Chart instance = chartDAO.findEntityByName(fqn);
return getEntityReference(instance);
} else if (entity.equalsIgnoreCase(Entity.DASHBOARD)) {
Dashboard instance = EntityUtil.validate(fqn, dashboardDAO.findByFQN(fqn), Dashboard.class);
Dashboard instance = dashboardDAO.findEntityByName(fqn);
return getEntityReference(instance);
} else if (entity.equalsIgnoreCase(Entity.TASK)) {
Task instance = EntityUtil.validate(fqn, taskDAO.findByFQN(fqn), Task.class);
Task instance = taskDAO.findEntityByName(fqn);
return getEntityReference(instance);
} else if (entity.equalsIgnoreCase(Entity.PIPELINE)) {
Pipeline instance = EntityUtil.validate(fqn, pipelineDAO.findByFQN(fqn), Pipeline.class);
Pipeline instance = pipelineDAO.findEntityByName(fqn);
return getEntityReference(instance);
} else if (entity.equalsIgnoreCase(Entity.MODEL)) {
Model instance = EntityUtil.validate(fqn, modelDAO.findByFQN(fqn), Model.class);
Model instance = modelDAO.findEntityByName(fqn);
return getEntityReference(instance);
}
throw EntityNotFoundException.byMessage(CatalogExceptionMessage.entityNotFound(entity, fqn));