mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-12-07 13:05:09 +00:00
Add BindFQN jdbi annotation to change fqn to fqnHash (#12658)
This commit is contained in:
parent
b954fcb5ca
commit
d968304853
@ -44,7 +44,6 @@ import org.openmetadata.service.jdbi3.CollectionDAO;
|
||||
import org.openmetadata.service.resources.events.EventResource.EventList;
|
||||
import org.openmetadata.service.search.IndexUtil;
|
||||
import org.openmetadata.service.search.SearchClient;
|
||||
import org.openmetadata.service.util.EntityUtil;
|
||||
import org.openmetadata.service.util.JsonUtils;
|
||||
|
||||
@Slf4j
|
||||
@ -186,8 +185,7 @@ public class ElasticSearchEventPublisher extends AbstractEventPublisher {
|
||||
try {
|
||||
long updateTime = Date.from(LocalDateTime.now().atZone(ZoneId.systemDefault()).toInstant()).getTime();
|
||||
String recordString =
|
||||
dao.entityExtensionTimeSeriesDao()
|
||||
.getExtension(EntityUtil.hash(ELASTIC_SEARCH_ENTITY_FQN_STREAM), ELASTIC_SEARCH_EXTENSION);
|
||||
dao.entityExtensionTimeSeriesDao().getExtension(ELASTIC_SEARCH_ENTITY_FQN_STREAM, ELASTIC_SEARCH_EXTENSION);
|
||||
EventPublisherJob lastRecord = JsonUtils.readValue(recordString, EventPublisherJob.class);
|
||||
long originalLastUpdate = lastRecord.getTimestamp();
|
||||
lastRecord.setStatus(status);
|
||||
@ -202,7 +200,7 @@ public class ElasticSearchEventPublisher extends AbstractEventPublisher {
|
||||
|
||||
dao.entityExtensionTimeSeriesDao()
|
||||
.update(
|
||||
EntityUtil.hash(ELASTIC_SEARCH_ENTITY_FQN_STREAM),
|
||||
ELASTIC_SEARCH_ENTITY_FQN_STREAM,
|
||||
ELASTIC_SEARCH_EXTENSION,
|
||||
JsonUtils.pojoToJson(lastRecord),
|
||||
originalLastUpdate);
|
||||
|
||||
@ -30,7 +30,6 @@ import org.openmetadata.service.Entity;
|
||||
import org.openmetadata.service.exception.CatalogExceptionMessage;
|
||||
import org.openmetadata.service.resources.tags.ClassificationResource;
|
||||
import org.openmetadata.service.util.EntityUtil.Fields;
|
||||
import org.openmetadata.service.util.FullyQualifiedName;
|
||||
|
||||
@Slf4j
|
||||
public class ClassificationRepository extends EntityRepository<Classification> {
|
||||
@ -77,9 +76,7 @@ public class ClassificationRepository extends EntityRepository<Classification> {
|
||||
}
|
||||
|
||||
private Integer getUsageCount(Classification classification) {
|
||||
return daoCollection
|
||||
.tagUsageDAO()
|
||||
.getTagCount(TagSource.CLASSIFICATION.ordinal(), FullyQualifiedName.buildHash(classification.getName()));
|
||||
return daoCollection.tagUsageDAO().getTagCount(TagSource.CLASSIFICATION.ordinal(), classification.getName());
|
||||
}
|
||||
|
||||
@Transaction
|
||||
|
||||
@ -123,6 +123,7 @@ import org.openmetadata.service.resources.tags.TagLabelCache;
|
||||
import org.openmetadata.service.util.EntityUtil;
|
||||
import org.openmetadata.service.util.FullyQualifiedName;
|
||||
import org.openmetadata.service.util.JsonUtils;
|
||||
import org.openmetadata.service.util.jdbi.BindFQN;
|
||||
|
||||
public interface CollectionDAO {
|
||||
@CreateSqlObject
|
||||
@ -998,7 +999,7 @@ public interface CollectionDAO {
|
||||
filterRelation = MENTIONED_IN.ordinal();
|
||||
}
|
||||
return listThreadsByEntityLink(
|
||||
FullyQualifiedName.buildHash(entityLink.getFullyQualifiedFieldValue()),
|
||||
entityLink.getFullyQualifiedFieldValue(),
|
||||
entityLink.getFullyQualifiedFieldType(),
|
||||
limit,
|
||||
relation,
|
||||
@ -1019,11 +1020,11 @@ public interface CollectionDAO {
|
||||
+ "ORDER BY createdAt DESC "
|
||||
+ "LIMIT :limit")
|
||||
List<String> listThreadsByEntityLink(
|
||||
@Bind("fqnPrefixHash") String fqnPrefixHash,
|
||||
@BindFQN("fqnPrefixHash") String fqnPrefixHash,
|
||||
@Bind("toType") String toType,
|
||||
@Bind("limit") int limit,
|
||||
@Bind("relation") int relation,
|
||||
@Bind("userName") String userName,
|
||||
@BindFQN("userName") String userName,
|
||||
@BindList("teamNames") List<String> teamNames,
|
||||
@Bind("filterRelation") int filterRelation,
|
||||
@Define("condition") String condition);
|
||||
@ -1035,7 +1036,7 @@ public interface CollectionDAO {
|
||||
filterRelation = MENTIONED_IN.ordinal();
|
||||
}
|
||||
return listCountThreadsByEntityLink(
|
||||
FullyQualifiedName.buildHash(entityLink.getFullyQualifiedFieldValue()),
|
||||
entityLink.getFullyQualifiedFieldValue(),
|
||||
entityLink.getFullyQualifiedFieldType(),
|
||||
relation,
|
||||
userName,
|
||||
@ -1053,7 +1054,7 @@ public interface CollectionDAO {
|
||||
+ " ((fromType='user' AND fromFQNHash= :userName) OR"
|
||||
+ " (fromType='team' AND fromFQNHash IN (<teamNames>))) AND toType='THREAD' AND relation= :filterRelation) )")
|
||||
int listCountThreadsByEntityLink(
|
||||
@Bind("fqnPrefixHash") String fqnPrefixHash,
|
||||
@BindFQN("fqnPrefixHash") String fqnPrefixHash,
|
||||
@Bind("toType") String toType,
|
||||
@Bind("relation") int relation,
|
||||
@Bind("userName") String userName,
|
||||
@ -1076,7 +1077,7 @@ public interface CollectionDAO {
|
||||
+ "GROUP BY entityLink")
|
||||
@RegisterRowMapper(CountFieldMapper.class)
|
||||
List<List<String>> listCountByEntityLink(
|
||||
@Bind("fqnPrefixHash") String fqnPrefixHash,
|
||||
@BindFQN("fqnPrefixHash") String fqnPrefixHash,
|
||||
@Bind("fromType") String fromType,
|
||||
@Bind("toType") String toType,
|
||||
@Bind("relation") int relation,
|
||||
@ -1175,8 +1176,8 @@ public interface CollectionDAO {
|
||||
+ "ON CONFLICT (fromFQNHash, toFQNHash, relation) DO NOTHING",
|
||||
connectionType = POSTGRES)
|
||||
void insert(
|
||||
@Bind("fromFQNHash") String fromFQNHash,
|
||||
@Bind("toFQNHash") String toFQNHash,
|
||||
@BindFQN("fromFQNHash") String fromFQNHash,
|
||||
@BindFQN("toFQNHash") String toFQNHash,
|
||||
@Bind("fromFQN") String fromFQN,
|
||||
@Bind("toFQN") String toFQN,
|
||||
@Bind("fromType") String fromType,
|
||||
@ -1197,33 +1198,8 @@ public interface CollectionDAO {
|
||||
+ "ON CONFLICT (fromFQNHash, toFQNHash, relation) DO UPDATE SET json = EXCLUDED.json",
|
||||
connectionType = POSTGRES)
|
||||
void upsert(
|
||||
@Bind("fromFQNHash") String fromFQNHash,
|
||||
@Bind("toFQNHash") String toFQNHash,
|
||||
@Bind("fromFQN") String fromFQN,
|
||||
@Bind("toFQN") String toFQN,
|
||||
@Bind("fromType") String fromType,
|
||||
@Bind("toType") String toType,
|
||||
@Bind("relation") int relation,
|
||||
@Bind("jsonSchema") String jsonSchema,
|
||||
@Bind("json") String json);
|
||||
|
||||
@ConnectionAwareSqlUpdate(
|
||||
value =
|
||||
"INSERT INTO field_relationship(fromFQNHash, toFQNHash, fromFQN, toFQN, fromType, toType, relation, jsonSchema, json) "
|
||||
+ "VALUES (:fromFQNHash, :toFQNHash, :fromFQN, :toFQN, :fromType, :toType, :relation, :jsonSchema, :json) "
|
||||
+ "ON DUPLICATE KEY UPDATE fromFQNHash = :fromFQNHash,"
|
||||
+ "toFQNHash = :toFQNHash",
|
||||
connectionType = MYSQL)
|
||||
@ConnectionAwareSqlUpdate(
|
||||
value =
|
||||
"INSERT INTO field_relationship(fromFQNHash, toFQNHash, fromFQN, toFQN, fromType, toType, relation, jsonSchema, json) "
|
||||
+ "VALUES (:fromFQNHash, :toFQNHash, :fromFQN, :toFQN, :fromType, :toType, :relation, :jsonSchema, (:json :: jsonb)) "
|
||||
+ "ON CONFLICT (fromFQN, toFQN, relation) DO UPDATE SET fromFQNHash = EXCLUDED.fromFQNHash,"
|
||||
+ "toFQNHash = EXCLUDED.toFQNHash",
|
||||
connectionType = POSTGRES)
|
||||
void upsertFQNHash(
|
||||
@Bind("fromFQNHash") String fromFQNHash,
|
||||
@Bind("toFQNHash") String toFQNHash,
|
||||
@BindFQN("fromFQNHash") String fromFQNHash,
|
||||
@BindFQN("toFQNHash") String toFQNHash,
|
||||
@Bind("fromFQN") String fromFQN,
|
||||
@Bind("toFQN") String toFQN,
|
||||
@Bind("fromType") String fromType,
|
||||
@ -1237,8 +1213,8 @@ public interface CollectionDAO {
|
||||
+ "fromFQNHash = :fromFQNHash AND toFQNHash = :toFQNHash AND fromType = :fromType "
|
||||
+ "AND toType = :toType AND relation = :relation")
|
||||
String find(
|
||||
@Bind("fromFQNHash") String fromFQNHash,
|
||||
@Bind("toFQNHash") String toFQNHash,
|
||||
@BindFQN("fromFQNHash") String fromFQNHash,
|
||||
@BindFQN("toFQNHash") String toFQNHash,
|
||||
@Bind("fromType") String fromType,
|
||||
@Bind("toType") String toType,
|
||||
@Bind("relation") int relation);
|
||||
@ -1249,7 +1225,7 @@ public interface CollectionDAO {
|
||||
+ "AND relation = :relation")
|
||||
@RegisterRowMapper(ToFieldMapper.class)
|
||||
List<Triple<String, String, String>> listToByPrefix(
|
||||
@Bind("fqnPrefixHash") String fqnPrefixHash,
|
||||
@BindFQN("fqnPrefixHash") String fqnPrefixHash,
|
||||
@Bind("fromType") String fromType,
|
||||
@Bind("toType") String toType,
|
||||
@Bind("relation") int relation);
|
||||
@ -1277,7 +1253,7 @@ public interface CollectionDAO {
|
||||
+ "toFQNHash = :fqnHash AND toType = :type AND fromType = :otherType AND relation = :relation")
|
||||
@RegisterRowMapper(ToFieldMapper.class)
|
||||
List<Triple<String, String, String>> listBidirectional(
|
||||
@Bind("fqnHash") String fqnHash,
|
||||
@BindFQN("fqnHash") String fqnHash,
|
||||
@Bind("type") String type,
|
||||
@Bind("otherType") String otherType,
|
||||
@Bind("relation") int relation);
|
||||
@ -1290,13 +1266,13 @@ public interface CollectionDAO {
|
||||
+ "toFQNHash LIKE CONCAT(:fqnPrefixHash, '%') AND toType = :type AND fromType = :otherType AND relation = :relation")
|
||||
@RegisterRowMapper(ToFieldMapper.class)
|
||||
List<Triple<String, String, String>> listBidirectionalByPrefix(
|
||||
@Bind("fqnPrefixHash") String fqnPrefixHash,
|
||||
@BindFQN("fqnPrefixHash") String fqnPrefixHash,
|
||||
@Bind("type") String type,
|
||||
@Bind("otherType") String otherType,
|
||||
@Bind("relation") int relation);
|
||||
|
||||
default void deleteAllByPrefix(String fqnPrefixHash) {
|
||||
String prefix = String.format("%s%s%%", fqnPrefixHash, Entity.SEPARATOR);
|
||||
default void deleteAllByPrefix(String fqn) {
|
||||
String prefix = String.format("%s%s%%", FullyQualifiedName.buildHash(fqn), Entity.SEPARATOR);
|
||||
String condition = "WHERE (toFQNHash LIKE :prefix OR fromFQNHash LIKE :prefix)";
|
||||
Map<String, String> bindMap = new HashMap<>();
|
||||
bindMap.put("prefix", prefix);
|
||||
@ -1310,8 +1286,8 @@ public interface CollectionDAO {
|
||||
"DELETE from field_relationship WHERE fromFQNHash = :fromFQNHash AND toFQNHash = :toFQNHash AND fromType = :fromType "
|
||||
+ "AND toType = :toType AND relation = :relation")
|
||||
void delete(
|
||||
@Bind("fromFQNHash") String fromFQNHash,
|
||||
@Bind("toFQNHash") String toFQNHash,
|
||||
@BindFQN("fromFQNHash") String fromFQNHash,
|
||||
@BindFQN("toFQNHash") String toFQNHash,
|
||||
@Bind("fromType") String fromType,
|
||||
@Bind("toType") String toType,
|
||||
@Bind("relation") int relation);
|
||||
@ -2058,8 +2034,8 @@ public interface CollectionDAO {
|
||||
void applyTag(
|
||||
@Bind("source") int source,
|
||||
@Bind("tagFQN") String tagFQN,
|
||||
@Bind("tagFQNHash") String tagFQNHash,
|
||||
@Bind("targetFQNHash") String targetFQNHash,
|
||||
@BindFQN("tagFQNHash") String tagFQNHash,
|
||||
@BindFQN("targetFQNHash") String targetFQNHash,
|
||||
@Bind("labelType") int labelType,
|
||||
@Bind("state") int state);
|
||||
|
||||
@ -2067,14 +2043,14 @@ public interface CollectionDAO {
|
||||
List<String> getTargetFQNs(@Bind("source") int source, @Bind("tagFQNHash") String tagFQNHash);
|
||||
|
||||
default List<TagLabel> getTags(String targetFQN) {
|
||||
List<TagLabel> tags = getTagsInternal(FullyQualifiedName.buildHash(targetFQN));
|
||||
List<TagLabel> tags = getTagsInternal(targetFQN);
|
||||
tags.forEach(tagLabel -> tagLabel.setDescription(TagLabelCache.getDescription(tagLabel)));
|
||||
return tags;
|
||||
}
|
||||
|
||||
@SqlQuery(
|
||||
"SELECT source, tagFQN, labelType, state FROM tag_usage WHERE targetFQNHash = :targetFQNHash ORDER BY tagFQN")
|
||||
List<TagLabel> getTagsInternal(@Bind("targetFQNHash") String targetFQNHash);
|
||||
List<TagLabel> getTagsInternal(@BindFQN("targetFQNHash") String targetFQNHash);
|
||||
|
||||
@SqlQuery("SELECT * FROM tag_usage")
|
||||
@Deprecated(since = "Release 1.1")
|
||||
@ -2085,23 +2061,23 @@ public interface CollectionDAO {
|
||||
"SELECT COUNT(*) FROM tag_usage "
|
||||
+ "WHERE (tagFQNHash LIKE CONCAT(:tagFqnHash, '.%') OR tagFQNHash = :tagFqnHash) "
|
||||
+ "AND source = :source")
|
||||
int getTagCount(@Bind("source") int source, @Bind("tagFqnHash") String tagFqnHash);
|
||||
int getTagCount(@Bind("source") int source, @BindFQN("tagFqnHash") String tagFqnHash);
|
||||
|
||||
@SqlUpdate("DELETE FROM tag_usage where targetFQNHash = :targetFQNHash")
|
||||
void deleteTagsByTarget(@Bind("targetFQNHash") String targetFQNHash);
|
||||
void deleteTagsByTarget(@BindFQN("targetFQNHash") String targetFQNHash);
|
||||
|
||||
@SqlUpdate("DELETE FROM tag_usage where tagFQNHash = :tagFQNHash AND source = :source")
|
||||
void deleteTagLabels(@Bind("source") int source, @Bind("tagFQNHash") String tagFQNHash);
|
||||
void deleteTagLabels(@Bind("source") int source, @BindFQN("tagFQNHash") String tagFQNHash);
|
||||
|
||||
@SqlUpdate("DELETE FROM tag_usage where tagFQNHash = :tagFQNHash")
|
||||
void deleteTagLabelsByFqn(@Bind("tagFQNHash") String tagFQNHash);
|
||||
void deleteTagLabelsByFqn(@BindFQN("tagFQNHash") String tagFQNHash);
|
||||
|
||||
@SqlUpdate("DELETE FROM tag_usage where tagFQNHash LIKE CONCAT(:tagFQNHash, '.%') AND source = :source")
|
||||
void deleteTagLabelsByPrefix(@Bind("source") int source, @Bind("tagFQNHash") String tagFQNHash);
|
||||
|
||||
@SqlUpdate(
|
||||
"DELETE FROM tag_usage where targetFQNHash = :targetFQNHash OR targetFQNHash LIKE CONCAT(:targetFQNHash, '.%')")
|
||||
void deleteTagLabelsByTargetPrefix(@Bind("targetFQNHash") String targetFQNHash);
|
||||
void deleteTagLabelsByTargetPrefix(@BindFQN("targetFQNHash") String targetFQNHash);
|
||||
|
||||
@Deprecated(since = "Release 1.1")
|
||||
@ConnectionAwareSqlUpdate(
|
||||
@ -2141,11 +2117,7 @@ public interface CollectionDAO {
|
||||
}
|
||||
|
||||
default void rename(int source, String oldFQN, String newFQN) {
|
||||
renameInternal(
|
||||
source,
|
||||
FullyQualifiedName.buildHash(oldFQN),
|
||||
newFQN,
|
||||
FullyQualifiedName.buildHash(newFQN)); // First rename tagFQN from oldFQN to newFQN
|
||||
renameInternal(source, oldFQN, newFQN, newFQN); // First rename tagFQN from oldFQN to newFQN
|
||||
updateTagPrefix(source, oldFQN, newFQN); // Rename all the tagFQN prefixes starting with the oldFQN to newFQN
|
||||
}
|
||||
|
||||
@ -2154,9 +2126,9 @@ public interface CollectionDAO {
|
||||
"Update tag_usage set tagFQN = :newFQN, tagFQNHash = :newFQNHash WHERE source = :source AND tagFQNHash = :oldFQNHash")
|
||||
void renameInternal(
|
||||
@Bind("source") int source,
|
||||
@Bind("oldFQNHash") String oldFQNHash,
|
||||
@BindFQN("oldFQNHash") String oldFQNHash,
|
||||
@Bind("newFQN") String newFQN,
|
||||
@Bind("newFQNHash") String newFQNHash);
|
||||
@BindFQN("newFQNHash") String newFQNHash);
|
||||
|
||||
@SqlUpdate("<update>")
|
||||
void updateTagPrefixInternal(@Define("update") String update);
|
||||
@ -2529,7 +2501,7 @@ public interface CollectionDAO {
|
||||
|
||||
@Override
|
||||
default int listCount(ListFilter filter) {
|
||||
String team = FullyQualifiedName.buildHash(EntityInterfaceUtil.quoteName(filter.getQueryParam("team")));
|
||||
String team = EntityInterfaceUtil.quoteName(filter.getQueryParam("team"));
|
||||
String isBotStr = filter.getQueryParam("isBot");
|
||||
String isAdminStr = filter.getQueryParam("isAdmin");
|
||||
String mySqlCondition = filter.getCondition("ue");
|
||||
@ -2574,7 +2546,7 @@ public interface CollectionDAO {
|
||||
|
||||
@Override
|
||||
default List<String> listBefore(ListFilter filter, int limit, String before) {
|
||||
String team = FullyQualifiedName.buildHash(EntityInterfaceUtil.quoteName(filter.getQueryParam("team")));
|
||||
String team = EntityInterfaceUtil.quoteName(filter.getQueryParam("team"));
|
||||
String isBotStr = filter.getQueryParam("isBot");
|
||||
String isAdminStr = filter.getQueryParam("isAdmin");
|
||||
String mySqlCondition = filter.getCondition("ue");
|
||||
@ -2626,7 +2598,7 @@ public interface CollectionDAO {
|
||||
|
||||
@Override
|
||||
default List<String> listAfter(ListFilter filter, int limit, String after) {
|
||||
String team = FullyQualifiedName.buildHash(EntityInterfaceUtil.quoteName(filter.getQueryParam("team")));
|
||||
String team = EntityInterfaceUtil.quoteName(filter.getQueryParam("team"));
|
||||
String isBotStr = filter.getQueryParam("isBot");
|
||||
String isAdminStr = filter.getQueryParam("isAdmin");
|
||||
String mySqlCondition = filter.getCondition("ue");
|
||||
@ -2703,7 +2675,7 @@ public interface CollectionDAO {
|
||||
@Define("nameColumn") String nameColumn,
|
||||
@Define("mysqlCond") String mysqlCond,
|
||||
@Define("postgresCond") String postgresCond,
|
||||
@Bind("team") String team,
|
||||
@BindFQN("team") String team,
|
||||
@Bind("relation") int relation);
|
||||
|
||||
@ConnectionAwareSqlQuery(
|
||||
@ -2741,7 +2713,7 @@ public interface CollectionDAO {
|
||||
@Define("nameColumn") String nameColumn,
|
||||
@Define("mysqlCond") String mysqlCond,
|
||||
@Define("postgresCond") String postgresCond,
|
||||
@Bind("team") String team,
|
||||
@BindFQN("team") String team,
|
||||
@Bind("limit") int limit,
|
||||
@Bind("before") String before,
|
||||
@Bind("relation") int relation);
|
||||
@ -2777,7 +2749,7 @@ public interface CollectionDAO {
|
||||
@Define("nameColumn") String nameColumn,
|
||||
@Define("mysqlCond") String mysqlCond,
|
||||
@Define("postgresCond") String postgresCond,
|
||||
@Bind("team") String team,
|
||||
@BindFQN("team") String team,
|
||||
@Bind("limit") int limit,
|
||||
@Bind("after") String after,
|
||||
@Bind("relation") int relation);
|
||||
@ -3122,7 +3094,7 @@ public interface CollectionDAO {
|
||||
+ "VALUES (:entityFQNHash, :extension, :jsonSchema, (:json :: jsonb))",
|
||||
connectionType = POSTGRES)
|
||||
void insert(
|
||||
@Bind("entityFQNHash") String entityFQNHash,
|
||||
@BindFQN("entityFQNHash") String entityFQNHash,
|
||||
@Bind("extension") String extension,
|
||||
@Bind("jsonSchema") String jsonSchema,
|
||||
@Bind("json") String json);
|
||||
@ -3136,7 +3108,7 @@ public interface CollectionDAO {
|
||||
"UPDATE entity_extension_time_series set json = (:json :: jsonb) where entityFQNHash=:entityFQNHash and extension=:extension and timestamp=:timestamp",
|
||||
connectionType = POSTGRES)
|
||||
void update(
|
||||
@Bind("entityFQNHash") String entityFQNHash,
|
||||
@BindFQN("entityFQNHash") String entityFQNHash,
|
||||
@Bind("extension") String extension,
|
||||
@Bind("json") String json,
|
||||
@Bind("timestamp") Long timestamp);
|
||||
@ -3150,7 +3122,7 @@ public interface CollectionDAO {
|
||||
"UPDATE entity_extension_time_series set json = (:json :: jsonb) where entityFQNHash=:entityFQNHash and extension=:extension and timestamp=:timestamp and json #>>'{operation}' = :operation",
|
||||
connectionType = POSTGRES)
|
||||
void updateExtensionByOperation(
|
||||
@Bind("entityFQNHash") String entityFQNHash,
|
||||
@BindFQN("entityFQNHash") String entityFQNHash,
|
||||
@Bind("extension") String extension,
|
||||
@Bind("json") String json,
|
||||
@Bind("timestamp") Long timestamp,
|
||||
@ -3158,31 +3130,15 @@ public interface CollectionDAO {
|
||||
|
||||
@SqlQuery(
|
||||
"SELECT json FROM entity_extension_time_series WHERE entityFQNHash = :entityFQNHash AND extension = :extension")
|
||||
String getExtension(@Bind("entityFQNHash") String entityId, @Bind("extension") String extension);
|
||||
String getExtension(@BindFQN("entityFQNHash") String entityId, @Bind("extension") String extension);
|
||||
|
||||
@SqlQuery("SELECT count(*) FROM entity_extension_time_series WHERE entityFQNHash = :entityFQNHash")
|
||||
int listCount(@Bind("entityFQNHash") String entityFQNHash);
|
||||
int listCount(@BindFQN("entityFQNHash") String entityFQNHash);
|
||||
|
||||
@SqlQuery("SELECT COUNT(DISTINCT entityFQN) FROM entity_extension_time_series")
|
||||
@Deprecated
|
||||
int listDistinctCount();
|
||||
|
||||
@ConnectionAwareSqlQuery(
|
||||
value =
|
||||
"WITH data AS (SELECT ROW_NUMBER() OVER(ORDER BY timestamp ASC) AS row_num, json "
|
||||
+ "FROM entity_extension_time_series WHERE entityFQNHash = :entityFQNHash) "
|
||||
+ "SELECT row_num, json FROM data WHERE row_num < :before LIMIT :limit",
|
||||
connectionType = MYSQL)
|
||||
@ConnectionAwareSqlQuery(
|
||||
value =
|
||||
"WITH data AS (SELECT ROW_NUMBER() OVER(ORDER BY timestamp ASC) AS row_num, json "
|
||||
+ "FROM entity_extension_time_series WHERE entityFQNHash = :entityFQNHash) "
|
||||
+ "SELECT row_num, json FROM data WHERE row_num < (:before :: integer) LIMIT :limit",
|
||||
connectionType = POSTGRES)
|
||||
@RegisterRowMapper(ReportDataMapper.class)
|
||||
List<ReportDataRow> getBeforeExtension(
|
||||
@Bind("entityFQNHash") String entityFQN, @Bind("limit") int limit, @Bind("before") String before);
|
||||
|
||||
@ConnectionAwareSqlQuery(
|
||||
value =
|
||||
"WITH data AS (SELECT ROW_NUMBER() OVER(ORDER BY timestamp ASC) AS row_num, json "
|
||||
@ -3197,12 +3153,12 @@ public interface CollectionDAO {
|
||||
connectionType = POSTGRES)
|
||||
@RegisterRowMapper(ReportDataMapper.class)
|
||||
List<ReportDataRow> getAfterExtension(
|
||||
@Bind("entityFQNHash") String entityFQNHash, @Bind("limit") int limit, @Bind("after") String after);
|
||||
@BindFQN("entityFQNHash") String entityFQNHash, @Bind("limit") int limit, @Bind("after") String after);
|
||||
|
||||
@SqlQuery(
|
||||
"SELECT json FROM entity_extension_time_series WHERE entityFQNHash = :entityFQNHash AND extension = :extension AND timestamp = :timestamp")
|
||||
String getExtensionAtTimestamp(
|
||||
@Bind("entityFQNHash") String entityFQNHash,
|
||||
@BindFQN("entityFQNHash") String entityFQNHash,
|
||||
@Bind("extension") String extension,
|
||||
@Bind("timestamp") long timestamp);
|
||||
|
||||
@ -3215,7 +3171,7 @@ public interface CollectionDAO {
|
||||
"SELECT json FROM entity_extension_time_series WHERE entityFQNHash = :entityFQNHash AND extension = :extension AND timestamp = :timestamp AND json #>>'{operation}' = :operation",
|
||||
connectionType = POSTGRES)
|
||||
String getExtensionAtTimestampWithOperation(
|
||||
@Bind("entityFQNHash") String entityFQNHash,
|
||||
@BindFQN("entityFQNHash") String entityFQNHash,
|
||||
@Bind("extension") String extension,
|
||||
@Bind("timestamp") long timestamp,
|
||||
@Bind("operation") String operation);
|
||||
@ -3223,7 +3179,7 @@ public interface CollectionDAO {
|
||||
@SqlQuery(
|
||||
"SELECT json FROM entity_extension_time_series WHERE entityFQNHash = :entityFQNHash AND extension = :extension "
|
||||
+ "ORDER BY timestamp DESC LIMIT 1")
|
||||
String getLatestExtension(@Bind("entityFQNHash") String entityFQNHash, @Bind("extension") String extension);
|
||||
String getLatestExtension(@BindFQN("entityFQNHash") String entityFQNHash, @Bind("extension") String extension);
|
||||
|
||||
@SqlQuery(
|
||||
"SELECT ranked.json FROM (SELECT json, ROW_NUMBER() OVER(PARTITION BY entityFQNHash ORDER BY timestamp DESC) AS row_num "
|
||||
@ -3251,7 +3207,7 @@ public interface CollectionDAO {
|
||||
|
||||
@SqlUpdate(
|
||||
"DELETE FROM entity_extension_time_series WHERE entityFQNHash = :entityFQNHash AND extension = :extension")
|
||||
void delete(@Bind("entityFQNHash") String entityFQNHash, @Bind("extension") String extension);
|
||||
void delete(@BindFQN("entityFQNHash") String entityFQNHash, @Bind("extension") String extension);
|
||||
|
||||
// This just saves the limit number of records, and remove all other with given extension
|
||||
@SqlUpdate(
|
||||
@ -3261,27 +3217,22 @@ public interface CollectionDAO {
|
||||
@SqlUpdate(
|
||||
"DELETE FROM entity_extension_time_series WHERE entityFQNHash = :entityFQNHash AND extension = :extension AND timestamp = :timestamp")
|
||||
void deleteAtTimestamp(
|
||||
@Bind("entityFQNHash") String entityFQNHash,
|
||||
@BindFQN("entityFQNHash") String entityFQNHash,
|
||||
@Bind("extension") String extension,
|
||||
@Bind("timestamp") Long timestamp);
|
||||
|
||||
@SqlUpdate(
|
||||
"DELETE FROM entity_extension_time_series WHERE entityFQNHash = :entityFQNHash AND extension = :extension AND timestamp < :timestamp")
|
||||
void deleteBeforeTimestamp(
|
||||
@Bind("entityFQNHash") String entityFQNHash,
|
||||
@BindFQN("entityFQNHash") String entityFQNHash,
|
||||
@Bind("extension") String extension,
|
||||
@Bind("timestamp") Long timestamp);
|
||||
|
||||
@SqlQuery(
|
||||
"SELECT json FROM entity_extension_time_series WHERE entityFQNHash = :entityFQNHash AND jsonSchema = :jsonSchema "
|
||||
+ "ORDER BY timestamp DESC LIMIT 1")
|
||||
String getLatestExtensionByFQN(@Bind("entityFQNHash") String entityFQNHash, @Bind("jsonSchema") String jsonSchema);
|
||||
|
||||
@SqlQuery(
|
||||
"SELECT json FROM entity_extension_time_series where entityFQNHash = :entityFQNHash and extension = :extension "
|
||||
+ " AND timestamp >= :startTs and timestamp <= :endTs ORDER BY timestamp DESC")
|
||||
List<String> listBetweenTimestamps(
|
||||
@Bind("entityFQNHash") String entityFQNHash,
|
||||
@BindFQN("entityFQNHash") String entityFQNHash,
|
||||
@Bind("extension") String extension,
|
||||
@Bind("startTs") Long startTs,
|
||||
@Bind("endTs") long endTs);
|
||||
@ -3290,33 +3241,28 @@ public interface CollectionDAO {
|
||||
"SELECT json FROM entity_extension_time_series where entityFQNHash = :entityFQNHash and extension = :extension "
|
||||
+ " AND timestamp >= :startTs and timestamp <= :endTs ORDER BY timestamp <orderBy>")
|
||||
List<String> listBetweenTimestampsByOrder(
|
||||
@Bind("entityFQNHash") String entityFQNHash,
|
||||
@BindFQN("entityFQNHash") String entityFQNHash,
|
||||
@Bind("extension") String extension,
|
||||
@Bind("startTs") Long startTs,
|
||||
@Bind("endTs") long endTs,
|
||||
@Define("orderBy") OrderBy orderBy);
|
||||
|
||||
default void updateExtensionByKey(String key, String value, String entityFQN, String extension, String json) {
|
||||
String entityFQNHash = FullyQualifiedName.buildHash(entityFQN);
|
||||
String mysqlCond = String.format("AND JSON_UNQUOTE(JSON_EXTRACT(json, '$.%s')) = :value", key);
|
||||
String psqlCond = String.format("AND json->>'%s' = :value", key);
|
||||
|
||||
updateExtensionByKeyInternal(value, entityFQNHash, extension, json, mysqlCond, psqlCond);
|
||||
updateExtensionByKeyInternal(value, entityFQN, extension, json, mysqlCond, psqlCond);
|
||||
}
|
||||
|
||||
default String getExtensionByKey(String key, String value, String entityFQNHash, String extension) {
|
||||
|
||||
default String getExtensionByKey(String key, String value, String entityFQN, String extension) {
|
||||
String mysqlCond = String.format("AND JSON_UNQUOTE(JSON_EXTRACT(json, '$.%s')) = :value", key);
|
||||
String psqlCond = String.format("AND json->>'%s' = :value", key);
|
||||
|
||||
return getExtensionByKeyInternal(value, entityFQNHash, extension, mysqlCond, psqlCond);
|
||||
return getExtensionByKeyInternal(value, entityFQN, extension, mysqlCond, psqlCond);
|
||||
}
|
||||
|
||||
default String getLatestExtensionByKey(String key, String value, String entityFQN, String extension) {
|
||||
String entityFQNHash = FullyQualifiedName.buildHash(entityFQN);
|
||||
String mysqlCond = String.format("AND JSON_UNQUOTE(JSON_EXTRACT(json, '$.%s')) = :value", key);
|
||||
String psqlCond = String.format("AND json->>'%s' = :value", key);
|
||||
return getLatestExtensionByKeyInternal(value, entityFQNHash, extension, mysqlCond, psqlCond);
|
||||
return getLatestExtensionByKeyInternal(value, entityFQN, extension, mysqlCond, psqlCond);
|
||||
}
|
||||
|
||||
/*
|
||||
@ -3338,7 +3284,7 @@ public interface CollectionDAO {
|
||||
connectionType = POSTGRES)
|
||||
void updateExtensionByKeyInternal(
|
||||
@Bind("value") String value,
|
||||
@Bind("entityFQNHash") String entityFQNHash,
|
||||
@BindFQN("entityFQNHash") String entityFQNHash,
|
||||
@Bind("extension") String extension,
|
||||
@Bind("json") String json,
|
||||
@Define("mysqlCond") String mysqlCond,
|
||||
@ -3363,7 +3309,7 @@ public interface CollectionDAO {
|
||||
connectionType = POSTGRES)
|
||||
String getExtensionByKeyInternal(
|
||||
@Bind("value") String value,
|
||||
@Bind("entityFQNHash") String entityFQNHash,
|
||||
@BindFQN("entityFQNHash") String entityFQNHash,
|
||||
@Bind("extension") String extension,
|
||||
@Define("mysqlCond") String mysqlCond,
|
||||
@Define("psqlCond") String psqlCond);
|
||||
@ -3386,7 +3332,7 @@ public interface CollectionDAO {
|
||||
connectionType = POSTGRES)
|
||||
String getLatestExtensionByKeyInternal(
|
||||
@Bind("value") String value,
|
||||
@Bind("entityFQNHash") String entityFQNHash,
|
||||
@BindFQN("entityFQNHash") String entityFQNHash,
|
||||
@Bind("extension") String extension,
|
||||
@Define("mysqlCond") String mysqlCond,
|
||||
@Define("psqlCond") String psqlCond);
|
||||
|
||||
@ -99,11 +99,6 @@ public class ContainerRepository extends EntityRepository<Container> {
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getFullyQualifiedNameHash(Container container) {
|
||||
return FullyQualifiedName.buildHash(container.getFullyQualifiedName());
|
||||
}
|
||||
|
||||
private void setColumnFQN(String parentFQN, List<Column> columns) {
|
||||
columns.forEach(
|
||||
c -> {
|
||||
|
||||
@ -92,11 +92,6 @@ public class DashboardDataModelRepository extends EntityRepository<DashboardData
|
||||
super.update(task, entityLink, newValue, user);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getFullyQualifiedNameHash(DashboardDataModel dashboardDataModel) {
|
||||
return FullyQualifiedName.buildHash(dashboardDataModel.getFullyQualifiedName());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void prepare(DashboardDataModel dashboardDataModel) throws IOException {
|
||||
DashboardService dashboardService = Entity.getEntity(dashboardDataModel.getService(), "", Include.ALL);
|
||||
|
||||
@ -91,11 +91,6 @@ public class DataProductRepository extends EntityRepository<DataProduct> {
|
||||
entity.setFullyQualifiedName(FullyQualifiedName.add(domain.getFullyQualifiedName(), entity.getName()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getFullyQualifiedNameHash(DataProduct entity) {
|
||||
return FullyQualifiedName.buildHash(entity.getFullyQualifiedName());
|
||||
}
|
||||
|
||||
public class DataProductUpdater extends EntityUpdater {
|
||||
public DataProductUpdater(DataProduct original, DataProduct updated, Operation operation) {
|
||||
super(original, updated, operation);
|
||||
|
||||
@ -40,11 +40,6 @@ public class DatabaseRepository extends EntityRepository<Database> {
|
||||
database.setFullyQualifiedName(FullyQualifiedName.build(database.getService().getName(), database.getName()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getFullyQualifiedNameHash(Database entity) {
|
||||
return FullyQualifiedName.buildHash(entity.getFullyQualifiedName());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void prepare(Database database) throws IOException {
|
||||
populateService(database);
|
||||
|
||||
@ -50,11 +50,6 @@ public class DatabaseSchemaRepository extends EntityRepository<DatabaseSchema> {
|
||||
FullyQualifiedName.add(schema.getDatabase().getFullyQualifiedName(), schema.getName()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getFullyQualifiedNameHash(DatabaseSchema schema) {
|
||||
return FullyQualifiedName.buildHash(schema.getFullyQualifiedName());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void prepare(DatabaseSchema schema) throws IOException {
|
||||
populateDatabase(schema);
|
||||
|
||||
@ -109,11 +109,6 @@ public class DomainRepository extends EntityRepository<Domain> {
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getFullyQualifiedNameHash(Domain entity) {
|
||||
return FullyQualifiedName.buildHash(entity.getFullyQualifiedName());
|
||||
}
|
||||
|
||||
public class DomainUpdater extends EntityUpdater {
|
||||
public DomainUpdater(Domain original, Domain updated, Operation operation) {
|
||||
super(original, updated, operation);
|
||||
|
||||
@ -38,6 +38,7 @@ import org.openmetadata.service.jdbi3.locator.ConnectionAwareSqlQuery;
|
||||
import org.openmetadata.service.jdbi3.locator.ConnectionAwareSqlUpdate;
|
||||
import org.openmetadata.service.util.FullyQualifiedName;
|
||||
import org.openmetadata.service.util.JsonUtils;
|
||||
import org.openmetadata.service.util.jdbi.BindFQN;
|
||||
|
||||
public interface EntityDAO<T extends EntityInterface> {
|
||||
org.slf4j.Logger LOG = org.slf4j.LoggerFactory.getLogger(EntityDAO.class);
|
||||
@ -69,7 +70,7 @@ public interface EntityDAO<T extends EntityInterface> {
|
||||
int insert(
|
||||
@Define("table") String table,
|
||||
@Define("nameHashColumn") String nameHashColumn,
|
||||
@Bind("nameHashColumnValue") String nameHashColumnValue,
|
||||
@BindFQN("nameHashColumnValue") String nameHashColumnValue,
|
||||
@Bind("json") String json);
|
||||
|
||||
@ConnectionAwareSqlUpdate(
|
||||
@ -81,7 +82,7 @@ public interface EntityDAO<T extends EntityInterface> {
|
||||
void update(
|
||||
@Define("table") String table,
|
||||
@Define("nameHashColumn") String nameHashColumn,
|
||||
@Bind("nameHashColumnValue") String nameHashColumnValue,
|
||||
@BindFQN("nameHashColumnValue") String nameHashColumnValue,
|
||||
@Bind("id") String id,
|
||||
@Bind("json") String json);
|
||||
|
||||
@ -130,7 +131,7 @@ public interface EntityDAO<T extends EntityInterface> {
|
||||
String findByName(
|
||||
@Define("table") String table,
|
||||
@Define("nameColumn") String nameColumn,
|
||||
@Bind("name") String name,
|
||||
@BindFQN("name") String name,
|
||||
@Define("cond") String cond);
|
||||
|
||||
@SqlQuery("SELECT count(*) FROM <table> <cond>")
|
||||
@ -244,25 +245,27 @@ public interface EntityDAO<T extends EntityInterface> {
|
||||
|
||||
@SqlQuery("SELECT EXISTS (SELECT * FROM <table> WHERE <nameColumnHash> = :fqnHash)")
|
||||
boolean existsByName(
|
||||
@Define("table") String table, @Define("nameColumnHash") String nameColumnHash, @Bind("fqnHash") String fqnHash);
|
||||
@Define("table") String table,
|
||||
@Define("nameColumnHash") String nameColumnHash,
|
||||
@BindFQN("fqnHash") String fqnHash);
|
||||
|
||||
@SqlUpdate("DELETE FROM <table> WHERE id = :id")
|
||||
int delete(@Define("table") String table, @Bind("id") String id);
|
||||
|
||||
/** Default methods that interfaces with implementation. Don't override */
|
||||
default void insert(EntityInterface entity, String fqnHash) throws JsonProcessingException {
|
||||
insert(getTableName(), getNameHashColumn(), fqnHash, JsonUtils.pojoToJson(entity));
|
||||
default void insert(EntityInterface entity, String fqn) throws JsonProcessingException {
|
||||
insert(getTableName(), getNameHashColumn(), fqn, JsonUtils.pojoToJson(entity));
|
||||
}
|
||||
|
||||
default void update(UUID id, String fqnHash, String json) {
|
||||
update(getTableName(), getNameHashColumn(), fqnHash, id.toString(), json);
|
||||
default void update(UUID id, String fqn, String json) {
|
||||
update(getTableName(), getNameHashColumn(), fqn, id.toString(), json);
|
||||
}
|
||||
|
||||
default void update(EntityInterface entity) throws JsonProcessingException {
|
||||
update(
|
||||
getTableName(),
|
||||
getNameHashColumn(),
|
||||
FullyQualifiedName.buildHash(entity.getFullyQualifiedName()),
|
||||
entity.getFullyQualifiedName(),
|
||||
entity.getId().toString(),
|
||||
JsonUtils.pojoToJson(entity));
|
||||
}
|
||||
@ -292,8 +295,7 @@ public interface EntityDAO<T extends EntityInterface> {
|
||||
|
||||
@SneakyThrows
|
||||
default T findEntityByName(String fqn, Include include) {
|
||||
return jsonToEntity(
|
||||
findByName(getTableName(), getNameHashColumn(), FullyQualifiedName.buildHash(fqn), getCondition(include)), fqn);
|
||||
return jsonToEntity(findByName(getTableName(), getNameHashColumn(), fqn, getCondition(include)), fqn);
|
||||
}
|
||||
|
||||
default T jsonToEntity(String json, String identity) throws IOException {
|
||||
@ -327,7 +329,7 @@ public interface EntityDAO<T extends EntityInterface> {
|
||||
}
|
||||
|
||||
default String findJsonByFqn(String fqn, Include include) {
|
||||
return findByName(getTableName(), getNameHashColumn(), FullyQualifiedName.buildHash(fqn), getCondition(include));
|
||||
return findByName(getTableName(), getNameHashColumn(), fqn, getCondition(include));
|
||||
}
|
||||
|
||||
default int listCount(ListFilter filter) {
|
||||
@ -372,7 +374,7 @@ public interface EntityDAO<T extends EntityInterface> {
|
||||
}
|
||||
|
||||
default void existsByName(String fqn) {
|
||||
if (!existsByName(getTableName(), getNameHashColumn(), FullyQualifiedName.buildHash(fqn))) {
|
||||
if (!existsByName(getTableName(), getNameHashColumn(), fqn)) {
|
||||
String entityType = Entity.getEntityTypeFromClass(getEntityClass());
|
||||
throw EntityNotFoundException.byMessage(CatalogExceptionMessage.entityNotFound(entityType, fqn));
|
||||
}
|
||||
|
||||
@ -298,11 +298,6 @@ public abstract class EntityRepository<T extends EntityInterface> {
|
||||
entity.setFullyQualifiedName(EntityInterfaceUtil.quoteName(entity.getName()));
|
||||
}
|
||||
|
||||
/** Set fullyQualifiedNameHash of an entity */
|
||||
public String getFullyQualifiedNameHash(T entity) {
|
||||
return FullyQualifiedName.buildHash(entity.getFullyQualifiedName());
|
||||
}
|
||||
|
||||
/** Update an entity based suggested description and tags in the task */
|
||||
public void update(TaskDetails task, EntityLink entityLink, String newValue, String user) throws IOException {
|
||||
TaskType taskType = task.getType();
|
||||
@ -839,22 +834,16 @@ public abstract class EntityRepository<T extends EntityInterface> {
|
||||
daoCollection.relationshipDAO().deleteAll(id, entityType);
|
||||
|
||||
// Delete all the field relationships to other entities
|
||||
daoCollection
|
||||
.fieldRelationshipDAO()
|
||||
.deleteAllByPrefix(FullyQualifiedName.buildHash(entityInterface.getFullyQualifiedName()));
|
||||
daoCollection.fieldRelationshipDAO().deleteAllByPrefix(entityInterface.getFullyQualifiedName());
|
||||
|
||||
// Delete all the extensions of entity
|
||||
daoCollection.entityExtensionDAO().deleteAll(id);
|
||||
|
||||
// Delete all the tag labels
|
||||
daoCollection
|
||||
.tagUsageDAO()
|
||||
.deleteTagLabelsByTargetPrefix(FullyQualifiedName.buildHash(entityInterface.getFullyQualifiedName()));
|
||||
daoCollection.tagUsageDAO().deleteTagLabelsByTargetPrefix(entityInterface.getFullyQualifiedName());
|
||||
|
||||
// when the glossary and tag is deleted .. delete its usage
|
||||
daoCollection
|
||||
.tagUsageDAO()
|
||||
.deleteTagLabelsByFqn(FullyQualifiedName.buildHash(entityInterface.getFullyQualifiedName()));
|
||||
daoCollection.tagUsageDAO().deleteTagLabelsByFqn(entityInterface.getFullyQualifiedName());
|
||||
// Delete all the usage data
|
||||
daoCollection.usageDAO().delete(id);
|
||||
|
||||
@ -924,10 +913,10 @@ public abstract class EntityRepository<T extends EntityInterface> {
|
||||
entity.setDataProducts(null);
|
||||
|
||||
if (update) {
|
||||
dao.update(entity.getId(), getFullyQualifiedNameHash(entity), JsonUtils.pojoToJson(entity));
|
||||
dao.update(entity.getId(), entity.getFullyQualifiedName(), JsonUtils.pojoToJson(entity));
|
||||
LOG.info("Updated {}:{}:{}", entityType, entity.getId(), entity.getFullyQualifiedName());
|
||||
} else {
|
||||
dao.insert(entity, getFullyQualifiedNameHash(entity));
|
||||
dao.insert(entity, entity.getFullyQualifiedName());
|
||||
LOG.info("Created {}:{}:{}", entityType, entity.getId(), entity.getFullyQualifiedName());
|
||||
}
|
||||
|
||||
@ -939,54 +928,43 @@ public abstract class EntityRepository<T extends EntityInterface> {
|
||||
}
|
||||
|
||||
protected void storeTimeSeries(
|
||||
String fullyQualifiedName,
|
||||
String extension,
|
||||
String jsonSchema,
|
||||
String entityJson,
|
||||
Long timestamp,
|
||||
boolean update) {
|
||||
String fqnHash = FullyQualifiedName.buildHash(fullyQualifiedName);
|
||||
String fqn, String extension, String jsonSchema, String entityJson, Long timestamp, boolean update) {
|
||||
if (update) {
|
||||
daoCollection.entityExtensionTimeSeriesDao().update(fqnHash, extension, entityJson, timestamp);
|
||||
daoCollection.entityExtensionTimeSeriesDao().update(fqn, extension, entityJson, timestamp);
|
||||
} else {
|
||||
daoCollection.entityExtensionTimeSeriesDao().insert(fqnHash, extension, jsonSchema, entityJson);
|
||||
daoCollection.entityExtensionTimeSeriesDao().insert(fqn, extension, jsonSchema, entityJson);
|
||||
}
|
||||
}
|
||||
|
||||
protected void storeTimeSeriesWithOperation(
|
||||
String fullyQualifiedName,
|
||||
String fqn,
|
||||
String extension,
|
||||
String jsonSchema,
|
||||
String entityJson,
|
||||
Long timestamp,
|
||||
String operation,
|
||||
boolean update) {
|
||||
String fqnHash = FullyQualifiedName.buildHash(fullyQualifiedName);
|
||||
if (update) {
|
||||
daoCollection
|
||||
.entityExtensionTimeSeriesDao()
|
||||
.updateExtensionByOperation(fqnHash, extension, entityJson, timestamp, operation);
|
||||
.updateExtensionByOperation(fqn, extension, entityJson, timestamp, operation);
|
||||
} else {
|
||||
daoCollection.entityExtensionTimeSeriesDao().insert(fqnHash, extension, jsonSchema, entityJson);
|
||||
daoCollection.entityExtensionTimeSeriesDao().insert(fqn, extension, jsonSchema, entityJson);
|
||||
}
|
||||
}
|
||||
|
||||
public String getExtensionAtTimestamp(String fullyQualifiedName, String extension, Long timestamp) {
|
||||
String fqnHash = FullyQualifiedName.buildHash(fullyQualifiedName);
|
||||
return daoCollection.entityExtensionTimeSeriesDao().getExtensionAtTimestamp(fqnHash, extension, timestamp);
|
||||
public String getExtensionAtTimestamp(String fqn, String extension, Long timestamp) {
|
||||
return daoCollection.entityExtensionTimeSeriesDao().getExtensionAtTimestamp(fqn, extension, timestamp);
|
||||
}
|
||||
|
||||
public String getExtensionAtTimestampWithOperation(
|
||||
String fullyQualifiedName, String extension, Long timestamp, String operation) {
|
||||
String fqnHash = FullyQualifiedName.buildHash(fullyQualifiedName);
|
||||
public String getExtensionAtTimestampWithOperation(String fqn, String extension, Long timestamp, String operation) {
|
||||
return daoCollection
|
||||
.entityExtensionTimeSeriesDao()
|
||||
.getExtensionAtTimestampWithOperation(fqnHash, extension, timestamp, operation);
|
||||
.getExtensionAtTimestampWithOperation(fqn, extension, timestamp, operation);
|
||||
}
|
||||
|
||||
public String getLatestExtensionFromTimeseries(String fullyQualifiedName, String extension) {
|
||||
String fqnHash = FullyQualifiedName.buildHash(fullyQualifiedName);
|
||||
return daoCollection.entityExtensionTimeSeriesDao().getLatestExtension(fqnHash, extension);
|
||||
public String getLatestExtensionFromTimeseries(String fqn, String extension) {
|
||||
return daoCollection.entityExtensionTimeSeriesDao().getLatestExtension(fqn, extension);
|
||||
}
|
||||
|
||||
public List<String> getResultsFromAndToTimestamps(
|
||||
@ -996,25 +974,22 @@ public abstract class EntityRepository<T extends EntityInterface> {
|
||||
}
|
||||
|
||||
public List<String> getResultsFromAndToTimestamps(
|
||||
String fullyQualifiedName,
|
||||
String fqn,
|
||||
String extension,
|
||||
Long startTs,
|
||||
Long endTs,
|
||||
CollectionDAO.EntityExtensionTimeSeriesDAO.OrderBy orderBy) {
|
||||
String fqnHash = FullyQualifiedName.buildHash(fullyQualifiedName);
|
||||
return daoCollection
|
||||
.entityExtensionTimeSeriesDao()
|
||||
.listBetweenTimestampsByOrder(fqnHash, extension, startTs, endTs, orderBy);
|
||||
.listBetweenTimestampsByOrder(fqn, extension, startTs, endTs, orderBy);
|
||||
}
|
||||
|
||||
public void deleteExtensionAtTimestamp(String fullyQualifiedName, String extension, Long timestamp) {
|
||||
String fqnHash = FullyQualifiedName.buildHash(fullyQualifiedName);
|
||||
daoCollection.entityExtensionTimeSeriesDao().deleteAtTimestamp(fqnHash, extension, timestamp);
|
||||
public void deleteExtensionAtTimestamp(String fqn, String extension, Long timestamp) {
|
||||
daoCollection.entityExtensionTimeSeriesDao().deleteAtTimestamp(fqn, extension, timestamp);
|
||||
}
|
||||
|
||||
public void deleteExtensionBeforeTimestamp(String fullyQualifiedName, String extension, Long timestamp) {
|
||||
String fqnHash = FullyQualifiedName.buildHash(fullyQualifiedName);
|
||||
daoCollection.entityExtensionTimeSeriesDao().deleteBeforeTimestamp(fqnHash, extension, timestamp);
|
||||
public void deleteExtensionBeforeTimestamp(String fqn, String extension, Long timestamp) {
|
||||
daoCollection.entityExtensionTimeSeriesDao().deleteBeforeTimestamp(fqn, extension, timestamp);
|
||||
}
|
||||
|
||||
private void validateExtension(T entity) {
|
||||
@ -1141,8 +1116,8 @@ public abstract class EntityRepository<T extends EntityInterface> {
|
||||
.applyTag(
|
||||
tagLabel.getSource().ordinal(),
|
||||
tagLabel.getTagFQN(),
|
||||
FullyQualifiedName.buildHash(tagLabel.getTagFQN()),
|
||||
FullyQualifiedName.buildHash(targetFQN),
|
||||
tagLabel.getTagFQN(),
|
||||
targetFQN,
|
||||
tagLabel.getLabelType().ordinal(),
|
||||
tagLabel.getState().ordinal());
|
||||
}
|
||||
@ -1686,7 +1661,7 @@ public abstract class EntityRepository<T extends EntityInterface> {
|
||||
}
|
||||
|
||||
// Remove current entity tags in the database. It will be added back later from the merged tag list.
|
||||
daoCollection.tagUsageDAO().deleteTagsByTarget(FullyQualifiedName.buildHash(fqn));
|
||||
daoCollection.tagUsageDAO().deleteTagsByTarget(fqn);
|
||||
|
||||
if (operation.isPut()) {
|
||||
// PUT operation merges tags in the request with what already exists
|
||||
@ -1694,9 +1669,6 @@ public abstract class EntityRepository<T extends EntityInterface> {
|
||||
checkMutuallyExclusive(updatedTags);
|
||||
}
|
||||
|
||||
// Remove current entity tags in the database. It will be added back later from the merged tag list.
|
||||
daoCollection.tagUsageDAO().deleteTagsByTarget(fqn);
|
||||
|
||||
List<TagLabel> addedTags = new ArrayList<>();
|
||||
List<TagLabel> deletedTags = new ArrayList<>();
|
||||
recordListChange(fieldName, origTags, updatedTags, addedTags, deletedTags, tagLabelMatch);
|
||||
@ -2076,10 +2048,7 @@ public abstract class EntityRepository<T extends EntityInterface> {
|
||||
|
||||
// Delete tags related to deleted columns
|
||||
deletedColumns.forEach(
|
||||
deleted ->
|
||||
daoCollection
|
||||
.tagUsageDAO()
|
||||
.deleteTagsByTarget(FullyQualifiedName.buildHash(deleted.getFullyQualifiedName())));
|
||||
deleted -> daoCollection.tagUsageDAO().deleteTagsByTarget(deleted.getFullyQualifiedName()));
|
||||
|
||||
// Add tags related to newly added columns
|
||||
for (Column added : addedColumns) {
|
||||
|
||||
@ -164,8 +164,8 @@ public class FeedRepository {
|
||||
// Add field relationship for data asset - Thread -- isAbout ---> entity/entityField
|
||||
dao.fieldRelationshipDAO()
|
||||
.insert(
|
||||
FullyQualifiedName.buildHash(thread.getId().toString()), // from FQN
|
||||
FullyQualifiedName.buildHash(about.getFullyQualifiedFieldValue()), // to FQN,
|
||||
thread.getId().toString(), // from FQN
|
||||
about.getFullyQualifiedFieldValue(), // to FQN,
|
||||
thread.getId().toString(),
|
||||
about.getFullyQualifiedFieldValue(),
|
||||
Entity.THREAD, // From type
|
||||
@ -310,8 +310,8 @@ public class FeedRepository {
|
||||
mention ->
|
||||
dao.fieldRelationshipDAO()
|
||||
.insert(
|
||||
FullyQualifiedName.buildHash(mention.getFullyQualifiedFieldValue()),
|
||||
FullyQualifiedName.buildHash(thread.getId().toString()),
|
||||
mention.getFullyQualifiedFieldValue(),
|
||||
thread.getId().toString(),
|
||||
mention.getFullyQualifiedFieldValue(),
|
||||
thread.getId().toString(),
|
||||
mention.getFullyQualifiedFieldType(),
|
||||
@ -375,7 +375,7 @@ public class FeedRepository {
|
||||
dao.relationshipDAO().deleteAll(id, Entity.THREAD);
|
||||
|
||||
// Delete all the field relationships to other entities
|
||||
dao.fieldRelationshipDAO().deleteAllByPrefix(FullyQualifiedName.buildHash(id));
|
||||
dao.fieldRelationshipDAO().deleteAllByPrefix(id);
|
||||
|
||||
// Finally, delete the entity
|
||||
dao.feedDAO().delete(id);
|
||||
@ -416,7 +416,7 @@ public class FeedRepository {
|
||||
result =
|
||||
dao.feedDAO()
|
||||
.listCountByEntityLink(
|
||||
FullyQualifiedName.buildHash(entityLink.getFullyQualifiedFieldValue()),
|
||||
entityLink.getFullyQualifiedFieldValue(),
|
||||
Entity.THREAD,
|
||||
entityLink.getFullyQualifiedFieldType(),
|
||||
IS_ABOUT.ordinal(),
|
||||
@ -470,15 +470,14 @@ public class FeedRepository {
|
||||
// Only data assets are added as about
|
||||
User user = userId != null ? SubjectCache.getUserById(userId) : null;
|
||||
List<String> teamNameHash = getTeamNames(user);
|
||||
String userNameHash = getUserNameHash(user);
|
||||
String userName = user == null ? null : user.getFullyQualifiedName();
|
||||
List<String> jsons =
|
||||
dao.feedDAO()
|
||||
.listThreadsByEntityLink(
|
||||
filter, entityLink, limit + 1, IS_ABOUT.ordinal(), userNameHash, teamNameHash);
|
||||
.listThreadsByEntityLink(filter, entityLink, limit + 1, IS_ABOUT.ordinal(), userName, teamNameHash);
|
||||
threads = JsonUtils.readObjects(jsons, Thread.class);
|
||||
total =
|
||||
dao.feedDAO()
|
||||
.listCountThreadsByEntityLink(filter, entityLink, IS_ABOUT.ordinal(), userNameHash, teamNameHash);
|
||||
.listCountThreadsByEntityLink(filter, entityLink, IS_ABOUT.ordinal(), userName, teamNameHash);
|
||||
}
|
||||
} else {
|
||||
// userId filter present
|
||||
@ -535,8 +534,8 @@ public class FeedRepository {
|
||||
// field relationship table constraint (primary key)
|
||||
dao.fieldRelationshipDAO()
|
||||
.insert(
|
||||
FullyQualifiedName.buildHash(EntityInterfaceUtil.quoteName(user)),
|
||||
FullyQualifiedName.buildHash(thread.getId().toString()),
|
||||
EntityInterfaceUtil.quoteName(user),
|
||||
thread.getId().toString(),
|
||||
user,
|
||||
thread.getId().toString(),
|
||||
Entity.USER,
|
||||
@ -915,10 +914,7 @@ public class FeedRepository {
|
||||
}
|
||||
|
||||
private String getUserNameHash(User user) {
|
||||
if (user != null) {
|
||||
return FullyQualifiedName.buildHash(user.getFullyQualifiedName());
|
||||
}
|
||||
return null;
|
||||
return user != null ? FullyQualifiedName.buildHash(user.getFullyQualifiedName()) : null;
|
||||
}
|
||||
|
||||
public static class FilteredThreads {
|
||||
|
||||
@ -101,9 +101,7 @@ public class GlossaryRepository extends EntityRepository<Glossary> {
|
||||
}
|
||||
|
||||
private Integer getUsageCount(Glossary glossary) {
|
||||
return daoCollection
|
||||
.tagUsageDAO()
|
||||
.getTagCount(TagSource.GLOSSARY.ordinal(), FullyQualifiedName.buildHash(glossary.getName()));
|
||||
return daoCollection.tagUsageDAO().getTagCount(TagSource.GLOSSARY.ordinal(), glossary.getName());
|
||||
}
|
||||
|
||||
private Integer getTermCount(Glossary glossary) {
|
||||
|
||||
@ -124,9 +124,7 @@ public class GlossaryTermRepository extends EntityRepository<GlossaryTerm> {
|
||||
}
|
||||
|
||||
private Integer getUsageCount(GlossaryTerm term) {
|
||||
return daoCollection
|
||||
.tagUsageDAO()
|
||||
.getTagCount(TagSource.GLOSSARY.ordinal(), FullyQualifiedName.buildHash(term.getFullyQualifiedName()));
|
||||
return daoCollection.tagUsageDAO().getTagCount(TagSource.GLOSSARY.ordinal(), term.getFullyQualifiedName());
|
||||
}
|
||||
|
||||
private EntityReference getParent(GlossaryTerm entity) throws IOException {
|
||||
@ -214,11 +212,6 @@ public class GlossaryTermRepository extends EntityRepository<GlossaryTerm> {
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getFullyQualifiedNameHash(GlossaryTerm entity) {
|
||||
return FullyQualifiedName.buildHash(entity.getFullyQualifiedName());
|
||||
}
|
||||
|
||||
protected EntityReference getGlossary(GlossaryTerm term) throws IOException {
|
||||
return getFromEntityRef(term.getId(), Relationship.CONTAINS, GLOSSARY, true);
|
||||
}
|
||||
@ -235,9 +228,7 @@ public class GlossaryTermRepository extends EntityRepository<GlossaryTerm> {
|
||||
@Override
|
||||
protected void postDelete(GlossaryTerm entity) {
|
||||
// Cleanup all the tag labels using this glossary term
|
||||
daoCollection
|
||||
.tagUsageDAO()
|
||||
.deleteTagLabels(TagSource.GLOSSARY.ordinal(), FullyQualifiedName.buildHash(entity.getFullyQualifiedName()));
|
||||
daoCollection.tagUsageDAO().deleteTagLabels(TagSource.GLOSSARY.ordinal(), entity.getFullyQualifiedName());
|
||||
}
|
||||
|
||||
private void addGlossaryRelationship(GlossaryTerm term) {
|
||||
|
||||
@ -72,11 +72,6 @@ public class IngestionPipelineRepository extends EntityRepository<IngestionPipel
|
||||
FullyQualifiedName.add(ingestionPipeline.getService().getFullyQualifiedName(), ingestionPipeline.getName()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getFullyQualifiedNameHash(IngestionPipeline ingestionPipeline) {
|
||||
return FullyQualifiedName.buildHash(ingestionPipeline.getFullyQualifiedName());
|
||||
}
|
||||
|
||||
@Override
|
||||
public IngestionPipeline setFields(IngestionPipeline ingestionPipeline, Fields fields) throws IOException {
|
||||
return ingestionPipeline.withService(getContainer(ingestionPipeline.getId()));
|
||||
@ -95,7 +90,7 @@ public class IngestionPipelineRepository extends EntityRepository<IngestionPipel
|
||||
|
||||
daoCollection
|
||||
.entityExtensionTimeSeriesDao()
|
||||
.delete(FullyQualifiedName.buildHash(ingestionPipeline.getFullyQualifiedName()), PIPELINE_STATUS_EXTENSION);
|
||||
.delete(ingestionPipeline.getFullyQualifiedName(), PIPELINE_STATUS_EXTENSION);
|
||||
setFieldsInternal(ingestionPipeline, Fields.EMPTY_FIELDS);
|
||||
return ingestionPipeline;
|
||||
}
|
||||
@ -195,7 +190,7 @@ public class IngestionPipelineRepository extends EntityRepository<IngestionPipel
|
||||
daoCollection
|
||||
.entityExtensionTimeSeriesDao()
|
||||
.insert(
|
||||
FullyQualifiedName.buildHash(ingestionPipeline.getFullyQualifiedName()),
|
||||
ingestionPipeline.getFullyQualifiedName(),
|
||||
PIPELINE_STATUS_EXTENSION,
|
||||
PIPELINE_STATUS_JSON_SCHEMA,
|
||||
JsonUtils.pojoToJson(pipelineStatus));
|
||||
@ -236,7 +231,7 @@ public class IngestionPipelineRepository extends EntityRepository<IngestionPipel
|
||||
.getExtensionByKey(
|
||||
RUN_ID_EXTENSION_KEY,
|
||||
pipelineStatusRunId.toString(),
|
||||
FullyQualifiedName.buildHash(ingestionPipeline.getFullyQualifiedName()),
|
||||
ingestionPipeline.getFullyQualifiedName(),
|
||||
PIPELINE_STATUS_EXTENSION),
|
||||
PipelineStatus.class);
|
||||
}
|
||||
|
||||
@ -36,11 +36,6 @@ public class MetricsRepository extends EntityRepository<Metrics> {
|
||||
metrics.setFullyQualifiedName(FullyQualifiedName.add(metrics.getService().getName(), metrics.getName()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getFullyQualifiedNameHash(Metrics metrics) {
|
||||
return FullyQualifiedName.buildHash(metrics.getFullyQualifiedName());
|
||||
}
|
||||
|
||||
@Override
|
||||
public Metrics setFields(Metrics metrics, Fields fields) throws IOException {
|
||||
metrics.setService(getContainer(metrics.getId())); // service is a default field
|
||||
|
||||
@ -76,11 +76,6 @@ public class MlModelRepository extends EntityRepository<MlModel> {
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getFullyQualifiedNameHash(MlModel mlModel) {
|
||||
return FullyQualifiedName.buildHash(mlModel.getFullyQualifiedName());
|
||||
}
|
||||
|
||||
@Override
|
||||
public MlModel setFields(MlModel mlModel, Fields fields) throws IOException {
|
||||
mlModel.setService(getContainer(mlModel.getId()));
|
||||
|
||||
@ -74,11 +74,6 @@ public class PipelineRepository extends EntityRepository<Pipeline> {
|
||||
setTaskFQN(pipeline.getFullyQualifiedName(), pipeline.getTasks());
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getFullyQualifiedNameHash(Pipeline pipeline) {
|
||||
return FullyQualifiedName.buildHash(pipeline.getFullyQualifiedName());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void update(TaskDetails task, MessageParser.EntityLink entityLink, String newValue, String user)
|
||||
throws IOException {
|
||||
|
||||
@ -7,7 +7,6 @@ import javax.ws.rs.core.Response;
|
||||
import org.jdbi.v3.sqlobject.transaction.Transaction;
|
||||
import org.openmetadata.schema.analytics.ReportData;
|
||||
import org.openmetadata.schema.analytics.ReportData.ReportDataType;
|
||||
import org.openmetadata.service.util.EntityUtil;
|
||||
import org.openmetadata.service.util.JsonUtils;
|
||||
import org.openmetadata.service.util.ResultList;
|
||||
|
||||
@ -26,7 +25,7 @@ public class ReportDataRepository {
|
||||
daoCollection
|
||||
.entityExtensionTimeSeriesDao()
|
||||
.insert(
|
||||
EntityUtil.hash(reportData.getReportDataType().value()),
|
||||
reportData.getReportDataType().value(),
|
||||
REPORT_DATA_EXTENSION,
|
||||
"reportData",
|
||||
JsonUtils.pojoToJson(reportData));
|
||||
@ -41,7 +40,7 @@ public class ReportDataRepository {
|
||||
JsonUtils.readObjects(
|
||||
daoCollection
|
||||
.entityExtensionTimeSeriesDao()
|
||||
.listBetweenTimestamps(EntityUtil.hash(reportDataType.value()), REPORT_DATA_EXTENSION, startTs, endTs),
|
||||
.listBetweenTimestamps(reportDataType.value(), REPORT_DATA_EXTENSION, startTs, endTs),
|
||||
ReportData.class);
|
||||
|
||||
return new ResultList<>(reportData, String.valueOf(startTs), String.valueOf(endTs), reportData.size());
|
||||
|
||||
@ -24,7 +24,6 @@ import org.openmetadata.schema.entity.services.connections.TestConnectionResult;
|
||||
import org.openmetadata.service.secrets.SecretsManager;
|
||||
import org.openmetadata.service.secrets.SecretsManagerFactory;
|
||||
import org.openmetadata.service.util.EntityUtil;
|
||||
import org.openmetadata.service.util.FullyQualifiedName;
|
||||
import org.openmetadata.service.util.JsonUtils;
|
||||
|
||||
public abstract class ServiceEntityRepository<
|
||||
@ -90,7 +89,7 @@ public abstract class ServiceEntityRepository<
|
||||
public T addTestConnectionResult(UUID serviceId, TestConnectionResult testConnectionResult) throws IOException {
|
||||
T service = dao.findEntityById(serviceId);
|
||||
service.setTestConnectionResult(testConnectionResult);
|
||||
dao.update(serviceId, FullyQualifiedName.buildHash(service.getFullyQualifiedName()), JsonUtils.pojoToJson(service));
|
||||
dao.update(serviceId, service.getFullyQualifiedName(), JsonUtils.pojoToJson(service));
|
||||
return service;
|
||||
}
|
||||
|
||||
|
||||
@ -184,11 +184,6 @@ public class TableRepository extends EntityRepository<Table> {
|
||||
ColumnUtil.setColumnFQN(table.getFullyQualifiedName(), table.getColumns());
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getFullyQualifiedNameHash(Table entity) {
|
||||
return FullyQualifiedName.buildHash(entity.getFullyQualifiedName());
|
||||
}
|
||||
|
||||
@Transaction
|
||||
public Table addJoins(UUID tableId, TableJoins joins) throws IOException {
|
||||
// Validate the request content
|
||||
@ -599,7 +594,7 @@ public class TableRepository extends EntityRepository<Table> {
|
||||
stored.setTags(modelColumn.getTags());
|
||||
}
|
||||
applyTags(table.getColumns());
|
||||
dao.update(table.getId(), FullyQualifiedName.buildHash(table.getFullyQualifiedName()), JsonUtils.pojoToJson(table));
|
||||
dao.update(table.getId(), table.getFullyQualifiedName(), JsonUtils.pojoToJson(table));
|
||||
setFieldsInternal(table, new Fields(Set.of(FIELD_OWNER), FIELD_OWNER));
|
||||
setFieldsInternal(table, new Fields(Set.of(FIELD_TAGS), FIELD_TAGS));
|
||||
return table;
|
||||
@ -722,7 +717,7 @@ public class TableRepository extends EntityRepository<Table> {
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (childrenName != "" && column != null) {
|
||||
if (!"".equals(childrenName) && column != null) {
|
||||
column = getChildrenColumn(column.getChildren(), childrenName);
|
||||
}
|
||||
if (column == null) {
|
||||
@ -753,9 +748,9 @@ public class TableRepository extends EntityRepository<Table> {
|
||||
}
|
||||
}
|
||||
if (childrenColumn == null) {
|
||||
for (int i = 0; i < column.size(); i++) {
|
||||
if (column.get(i).getChildren() != null) {
|
||||
childrenColumn = getChildrenColumn(column.get(i).getChildren(), childrenName);
|
||||
for (Column value : column) {
|
||||
if (value.getChildren() != null) {
|
||||
childrenColumn = getChildrenColumn(value.getChildren(), childrenName);
|
||||
if (childrenColumn != null) {
|
||||
break;
|
||||
}
|
||||
@ -833,8 +828,8 @@ public class TableRepository extends EntityRepository<Table> {
|
||||
daoCollection
|
||||
.fieldRelationshipDAO()
|
||||
.find(
|
||||
FullyQualifiedName.buildHash(fromEntityFQN),
|
||||
FullyQualifiedName.buildHash(toEntityFQN),
|
||||
fromEntityFQN,
|
||||
toEntityFQN,
|
||||
entityRelationType,
|
||||
entityRelationType,
|
||||
Relationship.JOINED_WITH.ordinal()))
|
||||
@ -848,8 +843,8 @@ public class TableRepository extends EntityRepository<Table> {
|
||||
daoCollection
|
||||
.fieldRelationshipDAO()
|
||||
.upsert(
|
||||
FullyQualifiedName.buildHash(fromEntityFQN),
|
||||
FullyQualifiedName.buildHash(toEntityFQN),
|
||||
fromEntityFQN,
|
||||
toEntityFQN,
|
||||
fromEntityFQN,
|
||||
toEntityFQN,
|
||||
entityRelationType,
|
||||
@ -905,7 +900,7 @@ public class TableRepository extends EntityRepository<Table> {
|
||||
List<Pair<String, List<DailyCount>>> entityRelations =
|
||||
daoCollection.fieldRelationshipDAO()
|
||||
.listBidirectional(
|
||||
FullyQualifiedName.buildHash(table.getFullyQualifiedName()),
|
||||
table.getFullyQualifiedName(),
|
||||
FIELD_RELATION_TABLE_TYPE,
|
||||
FIELD_RELATION_TABLE_TYPE,
|
||||
Relationship.JOINED_WITH.ordinal())
|
||||
@ -927,7 +922,7 @@ public class TableRepository extends EntityRepository<Table> {
|
||||
List<Triple<String, String, List<DailyCount>>> entityRelations =
|
||||
daoCollection.fieldRelationshipDAO()
|
||||
.listBidirectionalByPrefix(
|
||||
FullyQualifiedName.buildHash(table.getFullyQualifiedName()),
|
||||
table.getFullyQualifiedName(),
|
||||
FIELD_RELATION_COLUMN_TYPE,
|
||||
FIELD_RELATION_COLUMN_TYPE,
|
||||
Relationship.JOINED_WITH.ordinal())
|
||||
|
||||
@ -101,11 +101,6 @@ public class TagRepository extends EntityRepository<Tag> {
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getFullyQualifiedNameHash(Tag tag) {
|
||||
return FullyQualifiedName.buildHash(tag.getFullyQualifiedName());
|
||||
}
|
||||
|
||||
@Override
|
||||
public EntityRepository<Tag>.EntityUpdater getUpdater(Tag original, Tag updated, Operation operation) {
|
||||
return new TagUpdater(original, updated, operation);
|
||||
@ -125,9 +120,7 @@ public class TagRepository extends EntityRepository<Tag> {
|
||||
}
|
||||
|
||||
private Integer getUsageCount(Tag tag) {
|
||||
return daoCollection
|
||||
.tagUsageDAO()
|
||||
.getTagCount(TagSource.CLASSIFICATION.ordinal(), FullyQualifiedName.buildHash(tag.getFullyQualifiedName()));
|
||||
return daoCollection.tagUsageDAO().getTagCount(TagSource.CLASSIFICATION.ordinal(), tag.getFullyQualifiedName());
|
||||
}
|
||||
|
||||
private List<EntityReference> getChildren(Tag entity) throws IOException {
|
||||
|
||||
@ -72,7 +72,7 @@ public class TestCaseRepository extends EntityRepository<TestCase> {
|
||||
JsonUtils.readValue(
|
||||
daoCollection
|
||||
.entityExtensionTimeSeriesDao()
|
||||
.getExtensionAtTimestamp(FullyQualifiedName.buildHash(fqn), TESTCASE_RESULT_EXTENSION, timestamp),
|
||||
.getExtensionAtTimestamp(fqn, TESTCASE_RESULT_EXTENSION, timestamp),
|
||||
TestCaseResult.class);
|
||||
|
||||
TestCaseResult updated = JsonUtils.applyPatch(original, patch, TestCaseResult.class);
|
||||
@ -82,8 +82,7 @@ public class TestCaseRepository extends EntityRepository<TestCase> {
|
||||
updated.getTestCaseFailureStatus().setUpdatedAt(System.currentTimeMillis());
|
||||
daoCollection
|
||||
.entityExtensionTimeSeriesDao()
|
||||
.update(
|
||||
FullyQualifiedName.buildHash(fqn), TESTCASE_RESULT_EXTENSION, JsonUtils.pojoToJson(updated), timestamp);
|
||||
.update(fqn, TESTCASE_RESULT_EXTENSION, JsonUtils.pojoToJson(updated), timestamp);
|
||||
change = ENTITY_UPDATED;
|
||||
}
|
||||
return new RestUtil.PatchResponse<>(Response.Status.OK, updated, change);
|
||||
@ -98,11 +97,6 @@ public class TestCaseRepository extends EntityRepository<TestCase> {
|
||||
test.setEntityFQN(entityLink.getFullyQualifiedFieldValue());
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getFullyQualifiedNameHash(TestCase test) {
|
||||
return FullyQualifiedName.buildHash(test.getFullyQualifiedName());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void prepare(TestCase test) throws IOException {
|
||||
EntityLink entityLink = EntityLink.parse(test.getEntityLink());
|
||||
|
||||
@ -66,11 +66,6 @@ public class TopicRepository extends EntityRepository<Topic> {
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getFullyQualifiedNameHash(Topic topic) {
|
||||
return FullyQualifiedName.buildHash(topic.getFullyQualifiedName());
|
||||
}
|
||||
|
||||
public TopicRepository(CollectionDAO dao) {
|
||||
super(TopicResource.COLLECTION_PATH, Entity.TOPIC, Topic.class, dao.topicDAO(), dao, "", "");
|
||||
}
|
||||
|
||||
@ -40,7 +40,6 @@ import org.openmetadata.service.TypeRegistry;
|
||||
import org.openmetadata.service.resources.types.TypeResource;
|
||||
import org.openmetadata.service.util.EntityUtil;
|
||||
import org.openmetadata.service.util.EntityUtil.Fields;
|
||||
import org.openmetadata.service.util.FullyQualifiedName;
|
||||
import org.openmetadata.service.util.JsonUtils;
|
||||
import org.openmetadata.service.util.RestUtil.PutResponse;
|
||||
|
||||
@ -130,10 +129,7 @@ public class TypeRepository extends EntityRepository<Type> {
|
||||
daoCollection
|
||||
.fieldRelationshipDAO()
|
||||
.listToByPrefix(
|
||||
FullyQualifiedName.buildHash(getCustomPropertyFQNPrefix(type.getName())),
|
||||
Entity.TYPE,
|
||||
Entity.TYPE,
|
||||
Relationship.HAS.ordinal());
|
||||
getCustomPropertyFQNPrefix(type.getName()), Entity.TYPE, Entity.TYPE, Relationship.HAS.ordinal());
|
||||
for (Triple<String, String, String> result : results) {
|
||||
CustomProperty property = JsonUtils.readValue(result.getRight(), CustomProperty.class);
|
||||
property.setPropertyType(dao.findEntityReferenceByName(result.getMiddle()));
|
||||
@ -192,8 +188,8 @@ public class TypeRepository extends EntityRepository<Type> {
|
||||
daoCollection
|
||||
.fieldRelationshipDAO()
|
||||
.insert(
|
||||
FullyQualifiedName.buildHash(customPropertyFQN),
|
||||
FullyQualifiedName.buildHash(property.getPropertyType().getName()),
|
||||
customPropertyFQN,
|
||||
property.getPropertyType().getName(),
|
||||
customPropertyFQN,
|
||||
property.getPropertyType().getName(),
|
||||
Entity.TYPE,
|
||||
@ -212,8 +208,8 @@ public class TypeRepository extends EntityRepository<Type> {
|
||||
daoCollection
|
||||
.fieldRelationshipDAO()
|
||||
.delete(
|
||||
FullyQualifiedName.buildHash(customPropertyFQN),
|
||||
FullyQualifiedName.buildHash(property.getPropertyType().getName()),
|
||||
customPropertyFQN,
|
||||
property.getPropertyType().getName(),
|
||||
Entity.TYPE,
|
||||
Entity.TYPE,
|
||||
Relationship.HAS.ordinal());
|
||||
@ -232,8 +228,8 @@ public class TypeRepository extends EntityRepository<Type> {
|
||||
daoCollection
|
||||
.fieldRelationshipDAO()
|
||||
.upsert(
|
||||
FullyQualifiedName.buildHash(customPropertyFQN),
|
||||
FullyQualifiedName.buildHash(updatedProperty.getPropertyType().getName()),
|
||||
customPropertyFQN,
|
||||
updatedProperty.getPropertyType().getName(),
|
||||
customPropertyFQN,
|
||||
updatedProperty.getPropertyType().getName(),
|
||||
Entity.TYPE,
|
||||
|
||||
@ -56,7 +56,6 @@ import org.openmetadata.service.resources.Collection;
|
||||
import org.openmetadata.service.search.IndexUtil;
|
||||
import org.openmetadata.service.search.SearchClient;
|
||||
import org.openmetadata.service.security.Authorizer;
|
||||
import org.openmetadata.service.util.FullyQualifiedName;
|
||||
import org.openmetadata.service.util.JsonUtils;
|
||||
import org.openmetadata.service.util.ReIndexingHandler;
|
||||
|
||||
@ -326,8 +325,7 @@ public class SearchResource {
|
||||
String jobRecord;
|
||||
jobRecord =
|
||||
dao.entityExtensionTimeSeriesDao()
|
||||
.getLatestExtension(
|
||||
FullyQualifiedName.buildHash(ELASTIC_SEARCH_ENTITY_FQN_STREAM), ELASTIC_SEARCH_EXTENSION);
|
||||
.getLatestExtension(ELASTIC_SEARCH_ENTITY_FQN_STREAM, ELASTIC_SEARCH_EXTENSION);
|
||||
if (jobRecord != null) {
|
||||
return Response.status(Response.Status.OK)
|
||||
.entity(JsonUtils.readValue(jobRecord, EventPublisherJob.class))
|
||||
|
||||
@ -55,7 +55,7 @@ public class ReIndexingHandler {
|
||||
private static CollectionDAO dao;
|
||||
private static SearchClient searchClient;
|
||||
private static ExecutorService threadScheduler;
|
||||
private final Map<UUID, SearchIndexWorkflow> REINDEXING_JOB_MAP = new LinkedHashMap<>();
|
||||
private static final Map<UUID, SearchIndexWorkflow> REINDEXING_JOB_MAP = new LinkedHashMap<>();
|
||||
private static BlockingQueue<Runnable> taskQueue;
|
||||
|
||||
private ReIndexingHandler() {}
|
||||
@ -113,7 +113,7 @@ public class ReIndexingHandler {
|
||||
// Create Entry in the DB
|
||||
dao.entityExtensionTimeSeriesDao()
|
||||
.insert(
|
||||
EntityUtil.hash(jobData.getId().toString()),
|
||||
jobData.getId().toString(),
|
||||
REINDEXING_JOB_EXTENSION,
|
||||
"eventPublisherJob",
|
||||
JsonUtils.pojoToJson(jobData));
|
||||
@ -173,8 +173,7 @@ public class ReIndexingHandler {
|
||||
SearchIndexWorkflow job = REINDEXING_JOB_MAP.get(jobId);
|
||||
if (job == null) {
|
||||
String recordString =
|
||||
dao.entityExtensionTimeSeriesDao()
|
||||
.getLatestExtension(EntityUtil.hash(jobId.toString()), REINDEXING_JOB_EXTENSION);
|
||||
dao.entityExtensionTimeSeriesDao().getLatestExtension(jobId.toString(), REINDEXING_JOB_EXTENSION);
|
||||
return JsonUtils.readValue(recordString, EventPublisherJob.class);
|
||||
}
|
||||
return REINDEXING_JOB_MAP.get(jobId).getJobData();
|
||||
|
||||
@ -0,0 +1,34 @@
|
||||
package org.openmetadata.service.util.jdbi;
|
||||
|
||||
import java.lang.annotation.Annotation;
|
||||
import java.lang.annotation.ElementType;
|
||||
import java.lang.annotation.Retention;
|
||||
import java.lang.annotation.RetentionPolicy;
|
||||
import java.lang.annotation.Target;
|
||||
import java.lang.reflect.Method;
|
||||
import java.lang.reflect.Parameter;
|
||||
import java.lang.reflect.Type;
|
||||
import org.jdbi.v3.sqlobject.customizer.SqlStatementCustomizerFactory;
|
||||
import org.jdbi.v3.sqlobject.customizer.SqlStatementCustomizingAnnotation;
|
||||
import org.jdbi.v3.sqlobject.customizer.SqlStatementParameterCustomizer;
|
||||
import org.openmetadata.service.util.FullyQualifiedName;
|
||||
|
||||
/** Convert fqn string to fqnHash */
|
||||
@Retention(RetentionPolicy.RUNTIME)
|
||||
@Target({ElementType.PARAMETER})
|
||||
@SqlStatementCustomizingAnnotation(BindFQN.Factory.class)
|
||||
public @interface BindFQN {
|
||||
String value();
|
||||
|
||||
class Factory implements SqlStatementCustomizerFactory {
|
||||
@Override
|
||||
public SqlStatementParameterCustomizer createForParameter(
|
||||
Annotation annotation, Class<?> sqlObjectType, Method method, Parameter param, int index, Type type) {
|
||||
BindFQN bind = (BindFQN) annotation;
|
||||
return (stmt, arg) -> {
|
||||
String fqn = (String) arg;
|
||||
stmt.bind(bind.value(), FullyQualifiedName.buildHash(fqn));
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -25,7 +25,6 @@ import org.openmetadata.schema.analytics.ReportData;
|
||||
import org.openmetadata.schema.system.StepStats;
|
||||
import org.openmetadata.service.exception.SourceException;
|
||||
import org.openmetadata.service.jdbi3.CollectionDAO;
|
||||
import org.openmetadata.service.util.EntityUtil;
|
||||
import org.openmetadata.service.util.RestUtil;
|
||||
import org.openmetadata.service.util.ResultList;
|
||||
import org.openmetadata.service.workflows.interfaces.Source;
|
||||
@ -94,11 +93,10 @@ public class PaginatedDataInsightSource implements Source<ResultList<ReportData>
|
||||
public ResultList<ReportData> getReportDataPagination(String entityFQN, int limit, String after) {
|
||||
// workaround. Should be fixed in https://github.com/open-metadata/OpenMetadata/issues/12298
|
||||
String upperCaseFQN = StringUtils.capitalize(entityFQN);
|
||||
int reportDataCount = dao.entityExtensionTimeSeriesDao().listCount(EntityUtil.hash(upperCaseFQN));
|
||||
int reportDataCount = dao.entityExtensionTimeSeriesDao().listCount(upperCaseFQN);
|
||||
List<CollectionDAO.ReportDataRow> reportDataList =
|
||||
dao.entityExtensionTimeSeriesDao()
|
||||
.getAfterExtension(
|
||||
EntityUtil.hash(upperCaseFQN), limit + 1, after == null ? "0" : RestUtil.decodeCursor(after));
|
||||
.getAfterExtension(upperCaseFQN, limit + 1, after == null ? "0" : RestUtil.decodeCursor(after));
|
||||
return getAfterExtensionList(reportDataList, after, limit, reportDataCount);
|
||||
}
|
||||
|
||||
|
||||
@ -56,7 +56,6 @@ import org.openmetadata.service.search.openSearch.OpenSearchDataInsightProcessor
|
||||
import org.openmetadata.service.search.openSearch.OpenSearchEntitiesProcessor;
|
||||
import org.openmetadata.service.search.openSearch.OpenSearchIndexSink;
|
||||
import org.openmetadata.service.socket.WebSocketManager;
|
||||
import org.openmetadata.service.util.EntityUtil;
|
||||
import org.openmetadata.service.util.JsonUtils;
|
||||
import org.openmetadata.service.util.ReIndexingHandler;
|
||||
import org.openmetadata.service.util.ResultList;
|
||||
@ -335,16 +334,12 @@ public class SearchIndexWorkflow implements Runnable {
|
||||
|
||||
public void updateRecordToDb() throws IOException {
|
||||
String recordString =
|
||||
dao.entityExtensionTimeSeriesDao()
|
||||
.getExtension(EntityUtil.hash(jobData.getId().toString()), REINDEXING_JOB_EXTENSION);
|
||||
dao.entityExtensionTimeSeriesDao().getExtension(jobData.getId().toString(), REINDEXING_JOB_EXTENSION);
|
||||
EventPublisherJob lastRecord = JsonUtils.readValue(recordString, EventPublisherJob.class);
|
||||
long originalLastUpdate = lastRecord.getTimestamp();
|
||||
dao.entityExtensionTimeSeriesDao()
|
||||
.update(
|
||||
EntityUtil.hash(jobData.getId().toString()),
|
||||
REINDEXING_JOB_EXTENSION,
|
||||
JsonUtils.pojoToJson(jobData),
|
||||
originalLastUpdate);
|
||||
jobData.getId().toString(), REINDEXING_JOB_EXTENSION, JsonUtils.pojoToJson(jobData), originalLastUpdate);
|
||||
}
|
||||
|
||||
private void reCreateIndexes(String entityType) {
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user