Fix #1356: Add new entities to ES index from OpenMetadata server (#1423)

* Fix #1356: Add new entities to ES index from OpenMetadata server

Co-authored-by: Sachin-chaurasiya <sachinchaurasiyachotey87@gmail.com>
This commit is contained in:
Sriharsha Chintalapani 2021-12-04 08:45:17 -08:00 committed by GitHub
parent 128a69045d
commit 63a93c8944
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 1410 additions and 482 deletions

View File

@ -0,0 +1,298 @@
/*
* Copyright 2021 Collate
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.openmetadata.catalog.elasticsearch;
import com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptType;
import org.jdbi.v3.core.Jdbi;
import org.openmetadata.catalog.CatalogApplicationConfig;
import org.openmetadata.catalog.ElasticSearchConfiguration;
import org.openmetadata.catalog.Entity;
import org.openmetadata.catalog.elasticsearch.ElasticSearchIndexDefinition.ElasticSearchIndexType;
import org.openmetadata.catalog.entity.data.Dashboard;
import org.openmetadata.catalog.entity.data.DbtModel;
import org.openmetadata.catalog.entity.data.Pipeline;
import org.openmetadata.catalog.entity.data.Table;
import org.openmetadata.catalog.entity.data.Topic;
import org.openmetadata.catalog.events.EventHandler;
import org.openmetadata.catalog.type.ChangeDescription;
import org.openmetadata.catalog.type.ChangeEvent;
import org.openmetadata.catalog.type.EntityReference;
import org.openmetadata.catalog.type.FieldChange;
import org.openmetadata.catalog.util.JsonUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.ws.rs.container.ContainerRequestContext;
import javax.ws.rs.container.ContainerResponseContext;
import javax.ws.rs.core.Response;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
public class ElasticSearchEventHandler implements EventHandler {
private static final Logger LOG = LoggerFactory.getLogger(ElasticSearchEventHandler.class);
private RestHighLevelClient client;
private ElasticSearchIndexDefinition esIndexDefinition;
private final ActionListener<UpdateResponse> listener = new ActionListener<>() {
@Override
public void onResponse(UpdateResponse updateResponse) {
LOG.info("Updated Elastic Search {}", updateResponse);
}
@Override
public void onFailure(Exception e) {
LOG.error("Failed to update Elastic Search", e);
}
};
public void init(CatalogApplicationConfig config, Jdbi jdbi) {
ElasticSearchConfiguration esConfig = config.getElasticSearchConfiguration();
RestClientBuilder restClientBuilder = RestClient.builder(new HttpHost(esConfig.getHost(), esConfig.getPort(),
"http"));
if(StringUtils.isNotEmpty(esConfig.getUsername())){
CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(esConfig.getUsername(),
esConfig.getPassword()));
restClientBuilder.setHttpClientConfigCallback(httpAsyncClientBuilder -> {
httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
return httpAsyncClientBuilder;
});
}
client = new RestHighLevelClient(restClientBuilder);
esIndexDefinition = new ElasticSearchIndexDefinition(client);
esIndexDefinition.createIndexes();
}
public Void process(ContainerRequestContext requestContext,
ContainerResponseContext responseContext) {
try {
LOG.info("request Context "+ requestContext.toString());
if (responseContext.getEntity() != null) {
Object entity = responseContext.getEntity();
UpdateRequest updateRequest = null;
String entityClass = entity.getClass().toString();
if (entityClass.toLowerCase().endsWith(Entity.TABLE.toLowerCase())) {
boolean exists =
esIndexDefinition.checkIndexExistsOrCreate(ElasticSearchIndexType.TABLE_SEARCH_INDEX);
if (exists) {
Table instance = (Table) entity;
updateRequest = updateTable(instance, responseContext);
}
} else if (entityClass.toLowerCase().endsWith(Entity.DASHBOARD.toLowerCase())) {
boolean exists =
esIndexDefinition.checkIndexExistsOrCreate(ElasticSearchIndexType.DASHBOARD_SEARCH_INDEX);
if (exists) {
Dashboard instance = (Dashboard) entity;
updateRequest = updateDashboard(instance, responseContext);
}
} else if (entityClass.toLowerCase().endsWith(Entity.TOPIC.toLowerCase())) {
boolean exists =
esIndexDefinition.checkIndexExistsOrCreate(ElasticSearchIndexType.TOPIC_SEARCH_INDEX);
if (exists) {
Topic instance = (Topic) entity;
updateRequest = updateTopic(instance, responseContext);
}
} else if (entityClass.toLowerCase().endsWith(Entity.PIPELINE.toLowerCase())) {
boolean exists =
esIndexDefinition.checkIndexExistsOrCreate(ElasticSearchIndexType.PIPELINE_SEARCH_INDEX);
if (exists) {
Pipeline instance = (Pipeline) entity;
updateRequest = updatePipeline(instance, responseContext);
}
} else if (entityClass.toLowerCase().endsWith(Entity.DBTMODEL.toLowerCase())) {
boolean exists =
esIndexDefinition.checkIndexExistsOrCreate(ElasticSearchIndexType.DBT_MODEL_SEARCH_INDEX);
if (exists) {
DbtModel instance = (DbtModel) entity;
updateRequest = updateDbtModel(instance, responseContext);
}
} else if (entityClass.toLowerCase().equalsIgnoreCase(ChangeEvent.class.toString())) {
ChangeEvent changeEvent = (ChangeEvent) entity;
updateRequest = applyChangeEvent(changeEvent);
}
if (updateRequest != null) {
client.updateAsync(updateRequest, RequestOptions.DEFAULT, listener);
}
}
} catch (Exception e) {
LOG.error("failed to update ES doc", e);
}
return null;
}
private UpdateRequest applyChangeEvent(ChangeEvent event) {
String entityType = event.getEntityType();
ElasticSearchIndexType esIndexType = esIndexDefinition.getIndexMappingByEntityType(entityType);
UUID entityId = event.getEntityId();
ChangeDescription changeDescription = event.getChangeDescription();
List<FieldChange> fieldsAdded = changeDescription.getFieldsAdded();
StringBuilder scriptTxt = new StringBuilder();
Map<String, Object> fieldAddParams = new HashMap<>();
for (FieldChange fieldChange: fieldsAdded) {
if (fieldChange.getName().equalsIgnoreCase("followers")) {
List<EntityReference> entityReferences = (List<EntityReference>) fieldChange.getNewValue();
List<String> newFollowers = new ArrayList<>();
for (EntityReference follower : entityReferences) {
newFollowers.add(follower.getId().toString());
}
fieldAddParams.put(fieldChange.getName(), newFollowers);
scriptTxt.append("ctx._source.followers.addAll(params.followers);");
}
}
for (FieldChange fieldChange: changeDescription.getFieldsDeleted()) {
if (fieldChange.getName().equalsIgnoreCase("followers")) {
List<EntityReference> entityReferences = (List<EntityReference>) fieldChange.getOldValue();
for (EntityReference follower : entityReferences) {
fieldAddParams.put(fieldChange.getName(), follower.getId().toString());
}
scriptTxt.append("ctx._source.followers.removeAll(Collections.singleton(params.followers))");
}
}
if (!scriptTxt.toString().isEmpty()) {
Script script = new Script(ScriptType.INLINE, "painless",
scriptTxt.toString(),
fieldAddParams);
UpdateRequest updateRequest = new UpdateRequest(esIndexType.indexName, entityId.toString());
updateRequest.script(script);
return updateRequest;
} else {
return null;
}
}
private UpdateRequest updateTable(Table instance, ContainerResponseContext responseContext)
throws JsonProcessingException {
int responseCode = responseContext.getStatus();
TableESIndex tableESIndex = TableESIndex.builder(instance, responseCode).build();
UpdateRequest updateRequest = new UpdateRequest(ElasticSearchIndexType.TABLE_SEARCH_INDEX.indexName,
instance.getId().toString());
if (responseCode != Response.Status.CREATED.getStatusCode()) {
scriptedUpsert(tableESIndex, updateRequest);
} else {
String json = JsonUtils.pojoToJson(tableESIndex);
updateRequest.doc(json, XContentType.JSON);
updateRequest.docAsUpsert(true);
}
return updateRequest;
}
private UpdateRequest updateTopic(Topic instance, ContainerResponseContext responseContext)
throws JsonProcessingException {
int responseCode = responseContext.getStatus();
TopicESIndex topicESIndex = TopicESIndex.builder(instance, responseCode).build();
UpdateRequest updateRequest = new UpdateRequest(ElasticSearchIndexType.TOPIC_SEARCH_INDEX.indexName,
instance.getId().toString());
if (responseCode != Response.Status.CREATED.getStatusCode()) {
scriptedUpsert(topicESIndex, updateRequest);
} else {
//only upsert if its a new entity
updateRequest.doc(JsonUtils.pojoToJson(topicESIndex), XContentType.JSON);
updateRequest.docAsUpsert(true);
}
return updateRequest;
}
private UpdateRequest updateDashboard(Dashboard instance, ContainerResponseContext responseContext)
throws JsonProcessingException {
int responseCode = responseContext.getStatus();
DashboardESIndex dashboardESIndex = DashboardESIndex.builder(instance, responseCode).build();
UpdateRequest updateRequest = new UpdateRequest(ElasticSearchIndexType.DASHBOARD_SEARCH_INDEX.indexName,
instance.getId().toString());
if (responseCode != Response.Status.CREATED.getStatusCode()) {
scriptedUpsert(dashboardESIndex, updateRequest);
} else {
updateRequest.doc(JsonUtils.pojoToJson(dashboardESIndex), XContentType.JSON);
updateRequest.docAsUpsert(true);
}
return updateRequest;
}
private UpdateRequest updatePipeline(Pipeline instance, ContainerResponseContext responseContext)
throws JsonProcessingException {
int responseCode = responseContext.getStatus();
PipelineESIndex pipelineESIndex = PipelineESIndex.builder(instance, responseCode).build();
UpdateRequest updateRequest = new UpdateRequest(ElasticSearchIndexType.PIPELINE_SEARCH_INDEX.indexName,
instance.getId().toString());
if (responseCode != Response.Status.CREATED.getStatusCode()) {
scriptedUpsert(pipelineESIndex, updateRequest);
} else {
//only upsert if it's a new entity
updateRequest.doc(JsonUtils.pojoToJson(pipelineESIndex), XContentType.JSON);
updateRequest.docAsUpsert(true);
}
return updateRequest;
}
private UpdateRequest updateDbtModel(DbtModel instance, ContainerResponseContext responseContext)
throws JsonProcessingException {
int responseCode = responseContext.getStatus();
DbtModelESIndex dbtModelESIndex = DbtModelESIndex.builder(instance, responseCode).build();
UpdateRequest updateRequest = new UpdateRequest(ElasticSearchIndexType.DBT_MODEL_SEARCH_INDEX.indexName,
instance.getId().toString());
//only append to changeDescriptions on updates
if (responseCode != Response.Status.CREATED.getStatusCode()) {
scriptedUpsert(dbtModelESIndex, updateRequest);
} else {
updateRequest.doc(JsonUtils.pojoToJson(dbtModelESIndex), XContentType.JSON);
updateRequest.docAsUpsert(true);
}
return updateRequest;
}
private void scriptedUpsert(Object index, UpdateRequest updateRequest) {
String scriptTxt= "for (k in params.keySet()) {if (k == 'change_descriptions') " +
"{ ctx._source.change_descriptions.addAll(params.change_descriptions) } " +
"else { ctx._source.put(k, params.get(k)) }}";
Map<String, Object> doc = JsonUtils.getMap(index);
Script script = new Script(ScriptType.INLINE, "painless", scriptTxt,
doc);
updateRequest.script(script);
updateRequest.scriptedUpsert(true);
}
public void close() {
try {
this.client.close();
} catch (Exception e) {
LOG.error("Failed to close elastic search", e);
}
}
}

View File

@ -0,0 +1,684 @@
package org.openmetadata.catalog.elasticsearch;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
import lombok.Builder;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
import lombok.Value;
import lombok.experimental.SuperBuilder;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.common.xcontent.XContentType;
import org.openmetadata.catalog.Entity;
import org.openmetadata.catalog.entity.data.Dashboard;
import org.openmetadata.catalog.entity.data.DbtModel;
import org.openmetadata.catalog.entity.data.Pipeline;
import org.openmetadata.catalog.entity.data.Table;
import org.openmetadata.catalog.entity.data.Topic;
import org.openmetadata.catalog.type.Column;
import org.openmetadata.catalog.type.EntityReference;
import org.openmetadata.catalog.type.FieldChange;
import org.openmetadata.catalog.type.TagLabel;
import org.openmetadata.catalog.type.Task;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.ws.rs.core.Response;
import java.io.IOException;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Predicate;
import java.util.stream.Collectors;
public class ElasticSearchIndexDefinition {
Map<ElasticSearchIndexType, ElasticSearchIndexStatus> elasticSearchIndexes = new HashMap<>();
private final RestHighLevelClient client;
private static final Logger LOG = LoggerFactory.getLogger(ElasticSearchIndexDefinition.class);
public ElasticSearchIndexDefinition(RestHighLevelClient client) {
this.client = client;
for (ElasticSearchIndexType elasticSearchIndexType : ElasticSearchIndexType.values()) {
elasticSearchIndexes.put(elasticSearchIndexType, ElasticSearchIndexStatus.NOT_CREATED);
}
}
public enum ElasticSearchIndexStatus {
CREATED,
NOT_CREATED,
FAILED
}
public enum ElasticSearchIndexType {
TABLE_SEARCH_INDEX("table_search_index", "elasticsearch/table_index_mapping.json"),
TOPIC_SEARCH_INDEX("topic_search_index", "elasticsearch/topic_index_mapping.json"),
DASHBOARD_SEARCH_INDEX("dashboard_search_index", "elasticsearch/dashboard_index_mapping.json"),
PIPELINE_SEARCH_INDEX("pipeline_search_index", "elasticsearch/pipeline_index_mapping.json"),
DBT_MODEL_SEARCH_INDEX("dbt_model_search_index", "elasticsearch/dbt_index_mapping.json");
public final String indexName;
public final String indexMappingFile;
ElasticSearchIndexType(String indexName, String indexMappingFile) {
this.indexName = indexName;
this.indexMappingFile = indexMappingFile;
}
}
public void createIndexes() {
try {
for (ElasticSearchIndexType elasticSearchIndexType : ElasticSearchIndexType.values()) {
createIndex(elasticSearchIndexType);
}
} catch(Exception e) {
LOG.error("Failed to created Elastic Search indexes due to", e);
}
}
public boolean checkIndexExistsOrCreate(ElasticSearchIndexType indexType) {
boolean exists = elasticSearchIndexes.get(indexType) == ElasticSearchIndexStatus.CREATED;
if (!exists) {
exists = createIndex(indexType);
}
return exists;
}
private boolean createIndex(ElasticSearchIndexType elasticSearchIndexType) {
try {
GetIndexRequest gRequest = new GetIndexRequest(elasticSearchIndexType.indexName);
gRequest.local(false);
boolean exists = client.indices().exists(gRequest, RequestOptions.DEFAULT);
if (!exists) {
String elasticSearchIndexMapping = getIndexMapping(elasticSearchIndexType);
CreateIndexRequest request = new CreateIndexRequest(elasticSearchIndexType.indexName);
request.mapping(elasticSearchIndexMapping, XContentType.JSON);
CreateIndexResponse createIndexResponse = client.indices().create(request, RequestOptions.DEFAULT);
LOG.info(elasticSearchIndexType.indexName + " Created " + createIndexResponse.isAcknowledged());
}
setIndexStatus(elasticSearchIndexType, ElasticSearchIndexStatus.CREATED);
} catch(Exception e) {
setIndexStatus(elasticSearchIndexType, ElasticSearchIndexStatus.FAILED);
LOG.error("Failed to created Elastic Search indexes due to", e);
return false;
}
return true;
}
private void setIndexStatus(ElasticSearchIndexType indexType, ElasticSearchIndexStatus elasticSearchIndexStatus) {
elasticSearchIndexes.put(indexType, elasticSearchIndexStatus);
}
public String getIndexMapping(ElasticSearchIndexType elasticSearchIndexType)
throws URISyntaxException, IOException {
URL resource = ElasticSearchIndexDefinition.class
.getClassLoader().getResource(elasticSearchIndexType.indexMappingFile);
Path path = Paths.get(resource.toURI());
return new String(Files.readAllBytes(path));
}
public ElasticSearchIndexType getIndexMappingByEntityType(String type) {
if (type.equalsIgnoreCase(Entity.TABLE)) {
return ElasticSearchIndexType.TABLE_SEARCH_INDEX;
} else if (type.equalsIgnoreCase(Entity.DASHBOARD)) {
return ElasticSearchIndexType.DASHBOARD_SEARCH_INDEX;
} else if (type.equalsIgnoreCase(Entity.PIPELINE)) {
return ElasticSearchIndexType.PIPELINE_SEARCH_INDEX;
} else if (type.equalsIgnoreCase(Entity.TOPIC)) {
return ElasticSearchIndexType.TOPIC_SEARCH_INDEX;
} else if (type.equalsIgnoreCase(Entity.DBTMODEL)) {
return ElasticSearchIndexType.DBT_MODEL_SEARCH_INDEX;
}
throw new RuntimeException("Failed to find index doc for type {}".format(type));
}
}
@SuperBuilder
@Data
class ElasticSearchIndex {
@JsonProperty("display_name")
String displayName;
String fqdn;
String service;
@JsonProperty("service_type")
String serviceType;
@JsonProperty("service_category")
String serviceCategory;
@JsonProperty("entity_type")
String entityType;
List<ElasticSearchSuggest> suggest;
String description;
String tier;
List<String> tags;
String owner;
List<String> followers;
@JsonProperty("last_updated_timestamp")
@Builder.Default
Long lastUpdatedTimestamp = System.currentTimeMillis();
@JsonProperty("change_descriptions")
List<ESChangeDescription> changeDescriptions;
public void parseTags(List<String> tags) {
if (!tags.isEmpty()) {
List<String> tagsList = new ArrayList<>(tags);
String tierTag = null;
for (String tag : tagsList) {
if (tag.toLowerCase().matches("(.*)tier(.*)")) {
tierTag = tag;
break;
}
}
if (tierTag != null) {
tagsList.remove(tierTag);
this.tier = tierTag;
}
this.tags = tagsList;
} else {
this.tags = tags;
}
}
}
@Getter
@Builder
class ElasticSearchSuggest {
String input;
Integer weight;
}
@Getter
@Builder
class FlattenColumn {
String name;
String description;
List<String> tags;
}
@Getter
@Setter
@Builder
class ESChangeDescription {
String updatedBy;
Long updatedAt;
List<FieldChange> fieldsAdded;
List<FieldChange> fieldsUpdated;
List<FieldChange> fieldsDeleted;
}
@EqualsAndHashCode(callSuper = true)
@Getter
@SuperBuilder(builderMethodName = "internalBuilder")
@Value
@JsonInclude(JsonInclude.Include.NON_NULL)
class TableESIndex extends ElasticSearchIndex {
@JsonProperty("table_name")
String tableName;
@JsonProperty("table_id")
String tableId;
String database;
@JsonProperty("table_type")
String tableType;
@JsonProperty("column_names")
List<String> columnNames;
@JsonProperty("column_descriptions")
List<String> columnDescriptions;
@JsonProperty("monthly_stats")
Integer monthlyStats;
@JsonProperty("monthly_percentile_rank")
Integer monthlyPercentileRank;
@JsonProperty("weekly_stats")
Integer weeklyStats;
@JsonProperty("weekly_percentile_rank")
Integer weeklyPercentileRank;
@JsonProperty("daily_stats")
Integer dailyStats;
@JsonProperty("daily_percentile_rank")
Integer dailyPercentileRank;
public static TableESIndexBuilder builder(Table table, int responseCode) throws JsonProcessingException {
String tableId = table.getId().toString();
String tableName = table.getName();
String description = table.getDescription();
List<String> tags = new ArrayList<>();
List<String> columnNames = new ArrayList<>();
List<String> columnDescriptions = new ArrayList<>();
List<ElasticSearchSuggest> suggest = new ArrayList<>();
suggest.add(ElasticSearchSuggest.builder().input(table.getFullyQualifiedName()).weight(5).build());
suggest.add(ElasticSearchSuggest.builder().input(table.getName()).weight(10).build());
if (table.getTags() != null) {
table.getTags().forEach(tag -> tags.add(tag.getTagFQN()));
}
if (table.getColumns() != null) {
List<FlattenColumn> cols = new ArrayList<>();
cols = parseColumns(table.getColumns(), cols, null);
for (FlattenColumn col : cols) {
if (col.getTags() != null) {
tags.addAll(col.getTags());
}
columnDescriptions.add(col.getDescription());
columnNames.add(col.getName());
}
}
TableESIndexBuilder tableESIndexBuilder = internalBuilder().tableId(tableId)
.tableName(tableName)
.displayName(tableName)
.description(description)
.fqdn(table.getFullyQualifiedName())
.suggest(suggest)
.entityType("table")
.serviceCategory("databaseService")
.columnNames(columnNames)
.columnDescriptions(columnDescriptions)
.tableType(table.getTableType().toString())
.tags(tags);
if (table.getDatabase() != null) {
tableESIndexBuilder.database(table.getDatabase().getName());
}
if (table.getService() != null) {
tableESIndexBuilder.service(table.getService().getName());
tableESIndexBuilder.serviceType(table.getServiceType().toString());
}
if (table.getUsageSummary() != null) {
tableESIndexBuilder.weeklyStats(table.getUsageSummary().getWeeklyStats().getCount())
.weeklyPercentileRank(table.getUsageSummary().getWeeklyStats().getPercentileRank().intValue())
.dailyStats(table.getUsageSummary().getDailyStats().getCount())
.dailyPercentileRank(table.getUsageSummary().getDailyStats().getPercentileRank().intValue())
.monthlyStats(table.getUsageSummary().getMonthlyStats().getCount())
.monthlyPercentileRank(table.getUsageSummary().getMonthlyStats().getPercentileRank().intValue());
}
if (table.getFollowers() != null) {
tableESIndexBuilder.followers(table.getFollowers().stream().map(item ->
item.getId().toString()).collect(Collectors.toList()));
} else if (responseCode == Response.Status.CREATED.getStatusCode()) {
tableESIndexBuilder.followers(Collections.emptyList());
}
if (table.getOwner() != null) {
tableESIndexBuilder.owner(table.getOwner().getId().toString());
}
ESChangeDescription esChangeDescription = null;
if (table.getChangeDescription() != null) {
esChangeDescription = ESChangeDescription.builder().updatedAt(table.getUpdatedAt().getTime())
.updatedBy(table.getUpdatedBy()).build();
esChangeDescription.setFieldsAdded(table.getChangeDescription().getFieldsAdded());
esChangeDescription.setFieldsDeleted(table.getChangeDescription().getFieldsDeleted());
esChangeDescription.setFieldsUpdated(table.getChangeDescription().getFieldsUpdated());
} else if (responseCode == Response.Status.CREATED.getStatusCode()) {
esChangeDescription = ESChangeDescription.builder().updatedAt(table.getUpdatedAt().getTime())
.updatedBy(table.getUpdatedBy()).build();
}
tableESIndexBuilder.changeDescriptions(esChangeDescription != null ? List.of(esChangeDescription) : null);
return tableESIndexBuilder;
}
private static List<FlattenColumn> parseColumns(List<Column> columns, List<FlattenColumn> flattenColumns,
String parentColumn) {
Optional<String> optParentColumn = Optional.ofNullable(parentColumn).filter(Predicate.not(String::isEmpty));
List<String> tags = new ArrayList<>();
for (Column col: columns) {
String columnName = col.getName();
if (optParentColumn.isPresent()) {
columnName = optParentColumn.get() + "." + columnName;
}
if (col.getTags() != null) {
tags = col.getTags().stream().map(TagLabel::getTagFQN).collect(Collectors.toList());
}
FlattenColumn flattenColumn = FlattenColumn.builder()
.name(columnName)
.description(col.getDescription()).build();
if (!tags.isEmpty()) {
flattenColumn.tags = tags;
}
flattenColumns.add(flattenColumn);
if (col.getChildren() != null) {
parseColumns(col.getChildren(), flattenColumns, col.getName());
}
}
return flattenColumns;
}
}
@EqualsAndHashCode(callSuper = true)
@Getter
@SuperBuilder(builderMethodName = "internalBuilder")
@Value
@JsonInclude(JsonInclude.Include.NON_NULL)
class TopicESIndex extends ElasticSearchIndex {
@JsonProperty("topic_name")
String topicName;
@JsonProperty("topic_id")
String topicId;
public static TopicESIndexBuilder builder(Topic topic, int responseCode) throws JsonProcessingException {
List<String> tags = new ArrayList<>();
List<ElasticSearchSuggest> suggest = new ArrayList<>();
suggest.add(ElasticSearchSuggest.builder().input(topic.getFullyQualifiedName()).weight(5).build());
suggest.add(ElasticSearchSuggest.builder().input(topic.getName()).weight(10).build());
if (topic.getTags() != null) {
topic.getTags().forEach(tag -> tags.add(tag.getTagFQN()));
}
TopicESIndexBuilder topicESIndexBuilder = internalBuilder().topicId(topic.getId().toString())
.topicName(topic.getName())
.displayName(topic.getDisplayName())
.description(topic.getDescription())
.fqdn(topic.getFullyQualifiedName())
.suggest(suggest)
.service(topic.getService().getName())
.serviceType(topic.getServiceType().toString())
.serviceCategory("messagingService")
.entityType("topic")
.tags(tags);
if (topic.getFollowers() != null) {
topicESIndexBuilder.followers(topic.getFollowers().stream().map(item ->
item.getId().toString()).collect(Collectors.toList()));
} else if (responseCode == Response.Status.CREATED.getStatusCode()) {
topicESIndexBuilder.followers(Collections.emptyList());
}
if (topic.getOwner() != null) {
topicESIndexBuilder.owner(topic.getOwner().getId().toString());
}
ESChangeDescription esChangeDescription = null;
if (topic.getChangeDescription() != null) {
esChangeDescription = ESChangeDescription.builder().updatedAt(topic.getUpdatedAt().getTime())
.updatedBy(topic.getUpdatedBy()).build();
esChangeDescription.setFieldsAdded(topic.getChangeDescription().getFieldsAdded());
esChangeDescription.setFieldsDeleted(topic.getChangeDescription().getFieldsDeleted());
esChangeDescription.setFieldsUpdated(topic.getChangeDescription().getFieldsUpdated());
} else if (responseCode == Response.Status.CREATED.getStatusCode()) {
esChangeDescription = ESChangeDescription.builder().updatedAt(topic.getUpdatedAt().getTime())
.updatedBy(topic.getUpdatedBy()).build();
}
topicESIndexBuilder.changeDescriptions(esChangeDescription != null ? List.of(esChangeDescription) : null);
return topicESIndexBuilder;
}
}
@EqualsAndHashCode(callSuper = true)
@Getter
@SuperBuilder(builderMethodName = "internalBuilder")
@Value
@JsonInclude(JsonInclude.Include.NON_NULL)
class DashboardESIndex extends ElasticSearchIndex {
@JsonProperty("dashboard_name")
String dashboardName;
@JsonProperty("dashboard_id")
String dashboardId;
@JsonProperty("chart_names")
List<String> chartNames;
@JsonProperty("chart_descriptions")
List<String> chartDescriptions;
@JsonProperty("monthly_stats")
Integer monthlyStats;
@JsonProperty("monthly_percentile_rank")
Integer monthlyPercentileRank;
@JsonProperty("weekly_stats")
Integer weeklyStats;
@JsonProperty("weekly_percentile_rank")
Integer weeklyPercentileRank;
@JsonProperty("daily_stats")
Integer dailyStats;
@JsonProperty("daily_percentile_rank")
Integer dailyPercentileRank;
public static DashboardESIndexBuilder builder(Dashboard dashboard, int responseCode) throws JsonProcessingException {
List<String> tags = new ArrayList<>();
List<String> chartNames = new ArrayList<>();
List<String> chartDescriptions = new ArrayList<>();
List<ElasticSearchSuggest> suggest = new ArrayList<>();
suggest.add(ElasticSearchSuggest.builder().input(dashboard.getFullyQualifiedName()).weight(5).build());
suggest.add(ElasticSearchSuggest.builder().input(dashboard.getDisplayName()).weight(10).build());
if (dashboard.getTags() != null) {
dashboard.getTags().forEach(tag -> tags.add(tag.getTagFQN()));
}
for (EntityReference chart: dashboard.getCharts()) {
chartNames.add(chart.getDisplayName());
chartDescriptions.add(chart.getDescription());
}
DashboardESIndexBuilder dashboardESIndexBuilder = internalBuilder().dashboardId(dashboard.getId().toString())
.dashboardName(dashboard.getDisplayName())
.displayName(dashboard.getDisplayName())
.description(dashboard.getDescription())
.fqdn(dashboard.getFullyQualifiedName())
.chartNames(chartNames)
.chartDescriptions(chartDescriptions)
.entityType("dashboard")
.suggest(suggest)
.service(dashboard.getService().getName())
.serviceType(dashboard.getServiceType().toString())
.serviceCategory("dashboardService")
.tags(tags);
if (dashboard.getUsageSummary() != null) {
dashboardESIndexBuilder.weeklyStats(dashboard.getUsageSummary().getWeeklyStats().getCount())
.weeklyPercentileRank(dashboard.getUsageSummary().getWeeklyStats().getPercentileRank().intValue())
.dailyStats(dashboard.getUsageSummary().getDailyStats().getCount())
.dailyPercentileRank(dashboard.getUsageSummary().getDailyStats().getPercentileRank().intValue())
.monthlyStats(dashboard.getUsageSummary().getMonthlyStats().getCount())
.monthlyPercentileRank(dashboard.getUsageSummary().getMonthlyStats().getPercentileRank().intValue());
}
if (dashboard.getFollowers() != null) {
dashboardESIndexBuilder.followers(dashboard.getFollowers().stream().map(item ->
item.getId().toString()).collect(Collectors.toList()));
} else if (responseCode == Response.Status.CREATED.getStatusCode()) {
dashboardESIndexBuilder.followers(Collections.emptyList());
}
if (dashboard.getOwner() != null) {
dashboardESIndexBuilder.owner(dashboard.getOwner().getId().toString());
}
ESChangeDescription esChangeDescription = null;
if (dashboard.getChangeDescription() != null) {
esChangeDescription = ESChangeDescription.builder()
.updatedAt(dashboard.getUpdatedAt().getTime())
.updatedBy(dashboard.getUpdatedBy()).build();
esChangeDescription.setFieldsAdded(dashboard.getChangeDescription().getFieldsAdded());
esChangeDescription.setFieldsDeleted(dashboard.getChangeDescription().getFieldsDeleted());
esChangeDescription.setFieldsUpdated(dashboard.getChangeDescription().getFieldsUpdated());
} else if (responseCode == Response.Status.CREATED.getStatusCode()) {
esChangeDescription = ESChangeDescription.builder()
.updatedAt(dashboard.getUpdatedAt().getTime())
.updatedBy(dashboard.getUpdatedBy()).build();
}
dashboardESIndexBuilder.changeDescriptions(esChangeDescription != null ? List.of(esChangeDescription) : null);
return dashboardESIndexBuilder;
}
}
@EqualsAndHashCode(callSuper = true)
@Getter
@SuperBuilder(builderMethodName = "internalBuilder")
@Value
@JsonInclude(JsonInclude.Include.NON_NULL)
class PipelineESIndex extends ElasticSearchIndex {
@JsonProperty("pipeline_name")
String pipelineName;
@JsonProperty("pipeine_id")
String pipelineId;
@JsonProperty("task_names")
List<String> taskNames;
@JsonProperty("task_descriptions")
List<String> taskDescriptions;
public static PipelineESIndexBuilder builder(Pipeline pipeline, int responseCode) throws JsonProcessingException {
List<String> tags = new ArrayList<>();
List<String> taskNames = new ArrayList<>();
List<String> taskDescriptions = new ArrayList<>();
List<ElasticSearchSuggest> suggest = new ArrayList<>();
suggest.add(ElasticSearchSuggest.builder().input(pipeline.getFullyQualifiedName()).weight(5).build());
suggest.add(ElasticSearchSuggest.builder().input(pipeline.getDisplayName()).weight(10).build());
if (pipeline.getTags() != null) {
pipeline.getTags().forEach(tag -> tags.add(tag.getTagFQN()));
}
for (Task task: pipeline.getTasks()) {
taskNames.add(task.getDisplayName());
taskDescriptions.add(task.getDescription());
}
PipelineESIndexBuilder pipelineESIndexBuilder = internalBuilder().pipelineId(pipeline.getId().toString())
.pipelineName(pipeline.getDisplayName())
.displayName(pipeline.getDisplayName())
.description(pipeline.getDescription())
.fqdn(pipeline.getFullyQualifiedName())
.taskNames(taskNames)
.taskDescriptions(taskDescriptions)
.entityType("pipeline")
.suggest(suggest)
.service(pipeline.getService().getName())
.serviceType(pipeline.getServiceType().toString())
.serviceCategory("pipelineService")
.tags(tags);
if (pipeline.getFollowers() != null) {
pipelineESIndexBuilder.followers(pipeline.getFollowers().stream().map(item ->
item.getId().toString()).collect(Collectors.toList()));
} else if (responseCode == Response.Status.CREATED.getStatusCode()) {
pipelineESIndexBuilder.followers(Collections.emptyList());
}
if (pipeline.getOwner() != null) {
pipelineESIndexBuilder.owner(pipeline.getOwner().getId().toString());
}
ESChangeDescription esChangeDescription = null;
if (pipeline.getChangeDescription() != null) {
esChangeDescription = ESChangeDescription.builder()
.updatedAt(pipeline.getUpdatedAt().getTime())
.updatedBy(pipeline.getUpdatedBy()).build();
esChangeDescription.setFieldsAdded(pipeline.getChangeDescription().getFieldsAdded());
esChangeDescription.setFieldsDeleted(pipeline.getChangeDescription().getFieldsDeleted());
esChangeDescription.setFieldsUpdated(pipeline.getChangeDescription().getFieldsUpdated());
} else if (responseCode == Response.Status.CREATED.getStatusCode()) {
esChangeDescription = ESChangeDescription.builder()
.updatedAt(pipeline.getUpdatedAt().getTime())
.updatedBy(pipeline.getUpdatedBy()).build();
}
pipelineESIndexBuilder.changeDescriptions(esChangeDescription != null ? List.of(esChangeDescription) : null);
return pipelineESIndexBuilder;
}
}
@Getter
@SuperBuilder(builderMethodName = "internalBuilder")
@Value
@JsonInclude(JsonInclude.Include.NON_NULL)
class DbtModelESIndex extends ElasticSearchIndex {
@JsonProperty("dbt_model_name")
String dbtModelName;
@JsonProperty("dbt_model_id")
String dbtModelId;
@JsonProperty("column_names")
List<String> columnNames;
@JsonProperty("column_descriptions")
List<String> columnDescriptions;
String database;
@JsonProperty("node_type")
String nodeType;
public static DbtModelESIndexBuilder builder(DbtModel dbtModel, int responseCode) throws JsonProcessingException {
List<String> tags = new ArrayList<>();
List<String> columnNames = new ArrayList<>();
List<String> columnDescriptions = new ArrayList<>();
List<ElasticSearchSuggest> suggest = new ArrayList<>();
suggest.add(ElasticSearchSuggest.builder().input(dbtModel.getFullyQualifiedName()).weight(5).build());
suggest.add(ElasticSearchSuggest.builder().input(dbtModel.getName()).weight(10).build());
if (dbtModel.getTags() != null) {
dbtModel.getTags().forEach(tag -> tags.add(tag.getTagFQN()));
}
for (Column column: dbtModel.getColumns()) {
columnNames.add(column.getName());
columnDescriptions.add(column.getDescription());
}
DbtModelESIndexBuilder dbtModelESIndexBuilder = internalBuilder().dbtModelId(dbtModel.getId().toString())
.dbtModelName(dbtModel.getName())
.displayName(dbtModel.getName())
.description(dbtModel.getDescription())
.fqdn(dbtModel.getFullyQualifiedName())
.columnNames(columnNames)
.columnDescriptions(columnDescriptions)
.entityType("dbtmodel")
.suggest(suggest)
.serviceCategory("databaseService")
.nodeType(dbtModel.getDbtNodeType().toString())
.database(dbtModel.getDatabase().getName())
.tags(tags);
if (dbtModel.getFollowers() != null) {
dbtModelESIndexBuilder.followers(dbtModel.getFollowers().stream().map(item ->
item.getId().toString()).collect(Collectors.toList()));
} else if (responseCode == Response.Status.CREATED.getStatusCode()) {
dbtModelESIndexBuilder.followers(Collections.emptyList());
}
if (dbtModel.getOwner() != null) {
dbtModelESIndexBuilder.owner(dbtModel.getOwner().getId().toString());
}
ESChangeDescription esChangeDescription = null;
if (dbtModel.getChangeDescription() != null) {
esChangeDescription = ESChangeDescription.builder()
.updatedAt(dbtModel.getUpdatedAt().getTime())
.updatedBy(dbtModel.getUpdatedBy()).build();
esChangeDescription.setFieldsAdded(dbtModel.getChangeDescription().getFieldsAdded());
esChangeDescription.setFieldsDeleted(dbtModel.getChangeDescription().getFieldsDeleted());
esChangeDescription.setFieldsUpdated(dbtModel.getChangeDescription().getFieldsUpdated());
} else if (responseCode == Response.Status.CREATED.getStatusCode()) {
// add an entry when the entity gets created first-time
esChangeDescription = ESChangeDescription.builder()
.updatedAt(dbtModel.getUpdatedAt().getTime())
.updatedBy(dbtModel.getUpdatedBy()).build();
}
dbtModelESIndexBuilder.changeDescriptions(esChangeDescription != null ? List.of(esChangeDescription): null);
return dbtModelESIndexBuilder;
}
}

View File

@ -1,415 +0,0 @@
/*
* Copyright 2021 Collate
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.openmetadata.catalog.events;
import lombok.Builder;
import lombok.Getter;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptType;
import org.jdbi.v3.core.Jdbi;
import org.openmetadata.catalog.CatalogApplicationConfig;
import org.openmetadata.catalog.ElasticSearchConfiguration;
import org.openmetadata.catalog.Entity;
import org.openmetadata.catalog.entity.data.Dashboard;
import org.openmetadata.catalog.entity.data.Pipeline;
import org.openmetadata.catalog.entity.data.Table;
import org.openmetadata.catalog.entity.data.Topic;
import org.openmetadata.catalog.type.ChangeDescription;
import org.openmetadata.catalog.type.ChangeEvent;
import org.openmetadata.catalog.type.Column;
import org.openmetadata.catalog.type.EntityReference;
import org.openmetadata.catalog.type.FieldChange;
import org.openmetadata.catalog.type.TagLabel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.ws.rs.container.ContainerRequestContext;
import javax.ws.rs.container.ContainerResponseContext;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.function.Predicate;
import java.util.stream.Collectors;
public class ElasticSearchEventHandler implements EventHandler {
private static final Logger LOG = LoggerFactory.getLogger(AuditEventHandler.class);
private RestHighLevelClient client;
private final ActionListener<UpdateResponse> listener = new ActionListener<>() {
@Override
public void onResponse(UpdateResponse updateResponse) {
LOG.info("Updated Elastic Search {}", updateResponse);
}
@Override
public void onFailure(Exception e) {
LOG.error("Failed to update Elastic Search", e);
}
};
public void init(CatalogApplicationConfig config, Jdbi jdbi) {
ElasticSearchConfiguration esConfig = config.getElasticSearchConfiguration();
RestClientBuilder restClientBuilder = RestClient.builder(new HttpHost(esConfig.getHost(), esConfig.getPort(), "http"));
if(StringUtils.isNotEmpty(esConfig.getUsername())){
CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(esConfig.getUsername(), esConfig.getPassword()));
restClientBuilder.setHttpClientConfigCallback(httpAsyncClientBuilder -> {
httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
return httpAsyncClientBuilder;
});
}
this.client = new RestHighLevelClient(restClientBuilder);
}
public Void process(ContainerRequestContext requestContext,
ContainerResponseContext responseContext) {
try {
LOG.info("request Context "+ requestContext.toString());
String changeEventClazz = ChangeEvent.class.toString();
if (responseContext.getEntity() != null) {
Object entity = responseContext.getEntity();
UpdateRequest updateRequest = null;
String entityClass = entity.getClass().toString();
if (entityClass.toLowerCase().endsWith(Entity.TABLE.toLowerCase())) {
Table instance = (Table) entity;
updateRequest = updateTable(instance);
} else if (entityClass.toLowerCase().endsWith(Entity.DASHBOARD.toLowerCase())) {
Dashboard instance = (Dashboard) entity;
updateRequest = updateDashboard(instance);
} else if (entityClass.toLowerCase().endsWith(Entity.TOPIC.toLowerCase())) {
Topic instance = (Topic) entity;
updateRequest = updateTopic(instance);
} else if (entityClass.toLowerCase().endsWith(Entity.PIPELINE.toLowerCase())) {
Pipeline instance = (Pipeline) entity;
updateRequest = updatePipeline(instance);
} else if (entityClass.toLowerCase().equalsIgnoreCase(ChangeEvent.class.toString())) {
ChangeEvent changeEvent = (ChangeEvent) entity;
updateRequest = applyChangeEvent(changeEvent);
}
if (updateRequest != null) {
client.updateAsync(updateRequest, RequestOptions.DEFAULT, listener);
}
}
} catch (Exception e) {
LOG.error("failed to update ES doc", e);
}
return null;
}
private UpdateRequest applyChangeEvent(ChangeEvent event) {
String entityType = event.getEntityType();
String esIndex = getESIndex(entityType);
UUID entityId = event.getEntityId();
ChangeDescription changeDescription = event.getChangeDescription();
List<FieldChange> fieldsAdded = changeDescription.getFieldsAdded();
StringBuilder scriptTxt = new StringBuilder();
Map<String, Object> fieldAddParams = new HashMap<>();
for (FieldChange fieldChange: fieldsAdded) {
if (fieldChange.getName().equalsIgnoreCase("followers")) {
List<EntityReference> entityReferences = (List<EntityReference>) fieldChange.getNewValue();
List<String> newFollowers = new ArrayList<>();
for (EntityReference follower : entityReferences) {
newFollowers.add(follower.getId().toString());
}
fieldAddParams.put(fieldChange.getName(), newFollowers);
scriptTxt.append("ctx._source.followers.addAll(params.followers);");
}
}
for (FieldChange fieldChange: changeDescription.getFieldsDeleted()) {
if (fieldChange.getName().equalsIgnoreCase("followers")) {
List<EntityReference> entityReferences = (List<EntityReference>) fieldChange.getOldValue();
for (EntityReference follower : entityReferences) {
fieldAddParams.put(fieldChange.getName(), follower.getId().toString());
}
scriptTxt.append("ctx._source.followers.removeAll(Collections.singleton(params.followers))");
}
}
if (!scriptTxt.toString().isEmpty()) {
Script script = new Script(ScriptType.INLINE, "painless",
scriptTxt.toString(),
fieldAddParams);
UpdateRequest updateRequest = new UpdateRequest(esIndex, entityId.toString());
updateRequest.script(script);
return updateRequest;
} else {
return null;
}
}
private UpdateRequest updateTable(Table instance) {
Map<String, Object> jsonMap = new HashMap<>();
jsonMap.put("description", instance.getDescription());
Set<String> tags = new HashSet<>();
List<String> columnDescriptions = new ArrayList<>();
List<String> columnNames = new ArrayList<>();
if (instance.getTags() != null) {
instance.getTags().forEach(tag -> tags.add(tag.getTagFQN()));
}
if (instance.getColumns() != null) {
List<FlattenColumn> cols = new ArrayList<>();
cols = parseColumns(instance.getColumns(), cols, null);
for (FlattenColumn col : cols) {
if (col.getTags() != null) {
tags.addAll(col.getTags());
}
columnDescriptions.add(col.getDescription());
columnNames.add(col.getName());
}
}
if (!tags.isEmpty()) {
List<String> tagsList = new ArrayList<>(tags);
String tierTag = null;
for (String tag: tagsList) {
if (tag.toLowerCase().matches("(.*)tier(.*)")) {
tierTag = tag;
break;
}
}
if (tierTag != null) {
tagsList.remove(tierTag);
jsonMap.put("tier", tierTag);
}
jsonMap.put("tags", tagsList);
} else {
jsonMap.put("tags", tags);
}
if (!columnNames.isEmpty()) {
jsonMap.put("column_names", columnNames);
}
if (!columnDescriptions.isEmpty()) {
jsonMap.put("column_descriptions", columnDescriptions);
}
if(instance.getOwner() != null) {
jsonMap.put("owner", instance.getOwner().getId().toString());
}
if (instance.getFollowers() != null) {
List<String> followers = new ArrayList<>();
for(EntityReference follower: instance.getFollowers()) {
followers.add(follower.getId().toString());
}
jsonMap.put("followers", followers);
}
jsonMap.put("last_updated_timestamp", System.currentTimeMillis());
UpdateRequest updateRequest = new UpdateRequest("table_search_index", instance.getId().toString());
updateRequest.doc(jsonMap);
return updateRequest;
}
private UpdateRequest updateTopic(Topic instance) {
Map<String, Object> jsonMap = new HashMap<>();
jsonMap.put("description", instance.getDescription());
Set<String> tags = new HashSet<>();
if (instance.getTags() != null) {
instance.getTags().forEach(tag -> tags.add(tag.getTagFQN()));
}
if (!tags.isEmpty()) {
List<String> tagsList = new ArrayList<>(tags);
String tierTag = null;
for (String tag: tagsList) {
if (tag.toLowerCase().matches("(.*)tier(.*)")) {
tierTag = tag;
break;
}
}
if (tierTag != null) {
tagsList.remove(tierTag);
jsonMap.put("tier", tierTag);
}
jsonMap.put("tags", tagsList);
} else {
jsonMap.put("tags", tags);
}
if(instance.getOwner() != null) {
jsonMap.put("owner", instance.getOwner().getId().toString());
}
if (instance.getFollowers() != null) {
List<String> followers = new ArrayList<>();
for(EntityReference follower: instance.getFollowers()) {
followers.add(follower.getId().toString());
}
jsonMap.put("followers", followers);
}
jsonMap.put("last_updated_timestamp", System.currentTimeMillis());
UpdateRequest updateRequest = new UpdateRequest("topic_search_index", instance.getId().toString());
updateRequest.doc(jsonMap);
return updateRequest;
}
private UpdateRequest updateDashboard(Dashboard instance) {
Map<String, Object> jsonMap = new HashMap<>();
jsonMap.put("description", instance.getDescription());
Set<String> tags = new HashSet<>();
if (instance.getTags() != null) {
instance.getTags().forEach(tag -> tags.add(tag.getTagFQN()));
}
if (!tags.isEmpty()) {
List<String> tagsList = new ArrayList<>(tags);
String tierTag = null;
for (String tag: tagsList) {
if (tag.toLowerCase().matches("(.*)tier(.*)")) {
tierTag = tag;
break;
}
}
if (tierTag != null) {
tagsList.remove(tierTag);
jsonMap.put("tier", tierTag);
}
jsonMap.put("tags", tagsList);
} else {
jsonMap.put("tags", tags);
}
if(instance.getOwner() != null) {
jsonMap.put("owner", instance.getOwner().getId().toString());
}
if (instance.getFollowers() != null) {
List<String> followers = new ArrayList<>();
for(EntityReference follower: instance.getFollowers()) {
followers.add(follower.getId().toString());
}
jsonMap.put("followers", followers);
}
jsonMap.put("last_updated_timestamp", System.currentTimeMillis());
UpdateRequest updateRequest = new UpdateRequest("dashboard_search_index", instance.getId().toString());
updateRequest.doc(jsonMap);
return updateRequest;
}
private UpdateRequest updatePipeline(Pipeline instance) {
Map<String, Object> jsonMap = new HashMap<>();
jsonMap.put("description", instance.getDescription());
Set<String> tags = new HashSet<>();
if (instance.getTags() != null) {
instance.getTags().forEach(tag -> tags.add(tag.getTagFQN()));
}
if (!tags.isEmpty()) {
List<String> tagsList = new ArrayList<>(tags);
String tierTag = null;
for (String tag: tagsList) {
if (tag.toLowerCase().matches("(.*)tier(.*)")) {
tierTag = tag;
break;
}
}
if (tierTag != null) {
tagsList.remove(tierTag);
jsonMap.put("tier", tierTag);
}
jsonMap.put("tags", tagsList);
} else {
jsonMap.put("tags", tags);
}
if(instance.getOwner() != null) {
jsonMap.put("owner", instance.getOwner().getId().toString());
}
if (instance.getFollowers() != null) {
List<String> followers = new ArrayList<>();
for(EntityReference follower: instance.getFollowers()) {
followers.add(follower.getId().toString());
}
jsonMap.put("followers", followers);
}
jsonMap.put("last_updated_timestamp", System.currentTimeMillis());
UpdateRequest updateRequest = new UpdateRequest("pipeline_search_index", instance.getId().toString());
updateRequest.doc(jsonMap);
return updateRequest;
}
private List<FlattenColumn> parseColumns(List<Column> columns, List<FlattenColumn> flattenColumns,
String parentColumn) {
Optional<String> optParentColumn = Optional.ofNullable(parentColumn).filter(Predicate.not(String::isEmpty));
List<String> tags = new ArrayList<>();
for (Column col: columns) {
String columnName = col.getName();
if (optParentColumn.isPresent()) {
columnName = optParentColumn.get() + "." + columnName;
}
if (col.getTags() != null) {
tags = col.getTags().stream().map(TagLabel::getTagFQN).collect(Collectors.toList());
}
FlattenColumn flattenColumn = FlattenColumn.builder()
.name(columnName)
.description(col.getDescription()).build();
if (!tags.isEmpty()) {
flattenColumn.tags = tags;
}
flattenColumns.add(flattenColumn);
if (col.getChildren() != null) {
parseColumns(col.getChildren(), flattenColumns, col.getName());
}
}
return flattenColumns;
}
private String getESIndex(String type) {
if (type.toLowerCase().equals("table")) {
return "table_search_index";
} else if (type.equalsIgnoreCase("dashboard")) {
return "dashboard_search_index";
} else if (type.equalsIgnoreCase("pipeline")) {
return "pipeline_search_index";
} else if (type.equalsIgnoreCase("topic")) {
return "topic_search_index";
}
throw new RuntimeException("Failed to find index doc for type {}".format(type));
}
@Getter
@Builder
public static class FlattenColumn {
String name;
String description;
List<String> tags;
}
public void close() {
try {
this.client.close();
} catch (Exception e) {
LOG.error("Failed to close elastic search", e);
}
}
}

View File

@ -141,6 +141,7 @@ public class DashboardRepository extends EntityRepository<Dashboard> {
dashboard.setFullyQualifiedName(getFQN(dashboard));
EntityUtil.populateOwner(dao.userDAO(), dao.teamDAO(), dashboard.getOwner()); // Validate owner
dashboard.setTags(EntityUtil.addDerivedTags(dao.tagDAO(), dashboard.getTags()));
dashboard.setCharts(getCharts(dashboard.getCharts()));
}
@Override
@ -221,7 +222,26 @@ public class DashboardRepository extends EntityRepository<Dashboard> {
return charts.isEmpty() ? null : charts;
}
public void updateCharts(Dashboard original, Dashboard updated, EntityUpdater updater) throws JsonProcessingException {
/**
This method is used to populate the dashboard entity with all details of Chart EntityReference
Users/Tools can send minimum details required to set relationship as id, type are the only required
fields in entity reference, whereas we need to send fully populated object such that ElasticSearch index
has all the details.
*/
private List<EntityReference> getCharts(List<EntityReference> charts) throws IOException {
if (charts == null) {
return null;
}
List<EntityReference> chartRefs = new ArrayList<>();
for (EntityReference chart: charts) {
EntityReference chartRef = dao.chartDAO().findEntityReferenceById(chart.getId());
chartRefs.add(chartRef);
}
return chartRefs.isEmpty() ? null : chartRefs;
}
public void updateCharts(Dashboard original, Dashboard updated, EntityUpdater updater)
throws JsonProcessingException {
String dashboardId = updated.getId().toString();
// Remove all charts associated with this dashboard
@ -302,7 +322,7 @@ public class DashboardRepository extends EntityRepository<Dashboard> {
public Dashboard getEntity() { return entity; }
@Override
public void setId(UUID id) { entity.setId(id);}
public void setId(UUID id) { entity.setId(id); }
@Override
public void setDescription(String description) {

View File

@ -207,8 +207,8 @@ public class TableRepository extends EntityRepository<Table> {
public void addQuery(UUID tableId, SQLQuery query) throws IOException {
// Validate the request content
try {
byte[] checksum = MessageDigest.getInstance("MD5").digest(query.getQuery().getBytes());
query.setChecksum(Hex.encodeHexString(checksum));
byte[] checksum = MessageDigest.getInstance("MD5").digest(query.getQuery().getBytes());
query.setChecksum(Hex.encodeHexString(checksum));
} catch(NoSuchAlgorithmException e) {
throw new RuntimeException(e);
}
@ -397,6 +397,14 @@ public class TableRepository extends EntityRepository<Table> {
return dao.databaseDAO().findEntityReferenceById(UUID.fromString(result.get(0)));
}
private EntityReference getDatabaseService(UUID databaseId) throws IOException {
// Find database for the table
EntityReference serviceRef = EntityUtil.getService(dao.relationshipDAO(), databaseId,
Entity.DATABASE_SERVICE);
serviceRef = dao.dbServiceDAO().findEntityReferenceById(serviceRef.getId());
return serviceRef;
}
private EntityReference getLocation(UUID tableId) throws IOException {
// Find the location of the table
List<String> result = dao.relationshipDAO().findTo(tableId.toString(), Relationship.HAS.ordinal(), Entity.LOCATION);
@ -773,7 +781,8 @@ public class TableRepository extends EntityRepository<Table> {
// Carry forward the user generated metadata from existing columns to new columns
for (Column updated : updatedColumns) {
// Find stored column matching name, data type and ordinal position
Column stored = origColumns.stream().filter(c -> EntityUtil.columnMatch.test(c, updated)).findAny().orElse(null);
Column stored = origColumns.stream().filter(c ->
EntityUtil.columnMatch.test(c, updated)).findAny().orElse(null);
if (stored == null) { // New column added
continue;
}

View File

@ -126,6 +126,7 @@ public class SearchResource {
if (sortOrderParam.equals("asc")) {
sortOrder = SortOrder.ASC;
}
if (index.equals("topic_search_index")) {
searchSourceBuilder = buildTopicSearchBuilder(query, from, size);
} else if (index.equals("dashboard_search_index")) {
@ -134,8 +135,10 @@ public class SearchResource {
searchSourceBuilder = buildPipelineSearchBuilder(query, from, size);
} else if (index.equals("dbt_model_search_index")) {
searchSourceBuilder = buildDbtModelSearchBuilder(query, from, size);
} else {
} else if (index.equals("table_search_index")) {
searchSourceBuilder = buildTableSearchBuilder(query, from, size);
} else {
searchSourceBuilder = buildAggregateSearchBuilder(query, from, size);
}
if (sortFieldParam != null && !sortFieldParam.isEmpty()) {
@ -184,6 +187,20 @@ public class SearchResource {
.build();
}
private SearchSourceBuilder buildAggregateSearchBuilder(String query, int from, int size) {
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(QueryBuilders.queryStringQuery(query)
.lenient(true))
.aggregation(AggregationBuilders.terms("Service").field("service_type"))
.aggregation(AggregationBuilders.terms("ServiceCategory").field("service_category"))
.aggregation(AggregationBuilders.terms("EntityType").field("entity_type"))
.aggregation(AggregationBuilders.terms("Tier").field("tier"))
.aggregation(AggregationBuilders.terms("Tags").field("tags"))
.from(from).size(size);
return searchSourceBuilder;
}
private SearchSourceBuilder buildTableSearchBuilder(String query, int from, int size) {
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
HighlightBuilder.Field highlightTableName =

View File

@ -140,6 +140,10 @@ public final class JsonUtils {
return OBJECT_MAPPER.convertValue(o, JsonStructure.class);
}
public static Map<String, Object> getMap(Object o) {
return OBJECT_MAPPER.convertValue(o, Map.class);
}
public static <T> T readValue(String json, Class<T> clz) throws IOException {
if (json == null) {
return null;

View File

@ -0,0 +1,74 @@
{
"properties": {
"dashboard_name": {
"type":"text"
},
"display_name": {
"type": "text"
},
"owner": {
"type": "keyword"
},
"fqdn": {
"type": "keyword"
},
"followers": {
"type": "keyword"
},
"last_updated_timestamp": {
"type": "date",
"format": "epoch_second"
},
"description": {
"type": "text"
},
"chart_names": {
"type":"text"
},
"chart_descriptions": {
"type": "text"
},
"tier": {
"type": "keyword"
},
"tags": {
"type": "keyword"
},
"service": {
"type": "keyword"
},
"service_type": {
"type": "keyword"
},
"service_category": {
"type": "keyword"
},
"entity_type": {
"type": "keyword"
},
"suggest": {
"type": "completion"
},
"monthly_stats":{
"type": "long"
},
"monthly_percentile_rank":{
"type": "long"
},
"weekly_stats":{
"type": "long"
},
"weekly_percentile_rank":{
"type": "long"
},
"daily_percentile_rank": {
"type": "long"
},
"daily_stats": {
"type": "long"
},
"change_descriptions": {
"type": "nested"
}
}
}

View File

@ -0,0 +1,76 @@
{
"properties": {
"dbt_model_name": {
"type":"text"
},
"display_name": {
"type": "text"
},
"fqdn": {
"type": "keyword"
},
"owner": {
"type": "text"
},
"followers": {
"type": "keyword"
},
"last_updated_timestamp": {
"type": "date",
"format": "epoch_second"
},
"description": {
"type": "text"
},
"tier": {
"type": "keyword"
},
"column_names": {
"type":"text"
},
"column_descriptions": {
"type": "text"
},
"tags": {
"type": "keyword"
},
"service": {
"type": "keyword"
},
"service_type": {
"type": "keyword"
},
"service_category": {
"type": "keyword"
},
"entity_type": {
"type": "keyword"
},
"database": {
"type": "text"
},
"suggest": {
"type": "completion"
},
"change_descriptions": {
"type": "nested",
"properties": {
"updatedAt": {
"type": "long"
},
"updatedBy": {
"type": "text"
},
"fieldsAdded": {
"type": "text"
},
"fieldsDeleted": {
"type": "text"
},
"fieldsUpdated": {
"type": "text"
}
}
}
}
}

View File

@ -0,0 +1,56 @@
{
"properties": {
"pipeline_name": {
"type":"text"
},
"display_name": {
"type": "text"
},
"owner": {
"type": "keyword"
},
"followers": {
"type": "keyword"
},
"last_updated_timestamp": {
"type": "date",
"format": "epoch_second"
},
"description": {
"type": "text"
},
"fqdn": {
"type": "keyword"
},
"task_names": {
"type":"text"
},
"task_descriptions": {
"type": "text"
},
"tier": {
"type": "keyword"
},
"tags": {
"type": "keyword"
},
"service": {
"type": "keyword"
},
"service_type": {
"type": "keyword"
},
"service_category": {
"type": "keyword"
},
"entity_type": {
"type": "keyword"
},
"suggest": {
"type": "completion"
},
"change_descriptions": {
"type": "nested"
}
}
}

View File

@ -0,0 +1,77 @@
{
"properties": {
"table_name": {
"type":"text"
},
"display_name": {
"type": "text"
},
"owner": {
"type": "text"
},
"fqdn": {
"type": "keyword"
},
"followers": {
"type": "keyword"
},
"last_updated_timestamp": {
"type": "date",
"format": "epoch_second"
},
"description": {
"type": "text"
},
"tier": {
"type": "keyword"
},
"column_names": {
"type":"text"
},
"column_descriptions": {
"type": "text"
},
"tags": {
"type": "keyword"
},
"service": {
"type": "keyword"
},
"service_type": {
"type": "keyword"
},
"service_category": {
"type": "keyword"
},
"entity_type": {
"type": "keyword"
},
"database": {
"type": "text"
},
"suggest": {
"type": "completion"
},
"monthly_stats":{
"type": "long"
},
"monthly_percentile_rank":{
"type": "long"
},
"weekly_stats":{
"type": "long"
},
"weekly_percentile_rank":{
"type": "long"
},
"daily_percentile_rank": {
"type": "long"
},
"daily_stats": {
"type": "long"
},
"change_descriptions": {
"type": "nested"
}
}
}

View File

@ -0,0 +1,50 @@
{
"properties": {
"topic_name": {
"type":"text"
},
"display_name": {
"type": "text"
},
"owner": {
"type": "text"
},
"fqdn": {
"type": "keyword"
},
"followers": {
"type": "keyword"
},
"last_updated_timestamp": {
"type": "date",
"format": "epoch_second"
},
"description": {
"type": "text"
},
"tier": {
"type": "keyword"
},
"tags": {
"type": "keyword"
},
"service": {
"type": "keyword"
},
"service_type": {
"type": "keyword"
},
"service_category": {
"type": "keyword"
},
"entity_type": {
"type": "keyword"
},
"suggest": {
"type": "completion"
},
"change_descriptions": {
"type": "nested"
}
}
}

View File

@ -144,7 +144,8 @@ airflowConfiguration:
eventHandlerConfiguration:
eventHandlerClassNames:
- "org.openmetadata.catalog.events.AuditEventHandler"
- "org.openmetadata.catalog.events.ElasticSearchEventHandler"
- "org.openmetadata.catalog.elasticsearch.ElasticSearchEventHandler"
- "org.openmetadata.catalog.events.ChangeEventHandler"
health:
delayedShutdownHandlerEnabled: true

View File

@ -118,7 +118,7 @@ elasticsearch:
eventHandlerConfiguration:
eventHandlerClassNames:
- "org.openmetadata.catalog.events.AuditEventHandler"
- "org.openmetadata.catalog.events.ElasticSearchEventHandler"
- "org.openmetadata.catalog.elasticsearch.ElasticSearchEventHandler"
- "org.openmetadata.catalog.events.ChangeEventHandler"
airflowConfiguration:

View File

@ -52,7 +52,7 @@ elasticsearch:
eventHandlerConfiguration:
eventHandlerClassNames:
- "org.openmetadata.catalog.events.AuditEventHandler"
- "org.openmetadata.catalog.events.ElasticSearchEventHandler"
- "org.openmetadata.catalog.elasticsearch.ElasticSearchEventHandler"
health:
delayedShutdownHandlerEnabled: true
@ -114,7 +114,7 @@ ElasticSearch is one of the pre-requisites to run OpenMetadata. Default configur
eventHandlerConfiguration:
eventHandlerClassNames:
- "org.openmetadata.catalog.events.AuditEventHandler"
- "org.openmetadata.catalog.events.ElasticSearchEventHandler"
- "org.openmetadata.catalog.elasticsearch.ElasticSearchEventHandler"
```
EventHandler configuration is optional. It will update the AuditLog in MySQL DB and also ElasticSearch indexes whenever any entity is updated either through UI or API interactions. We recommend you leave it there as it enhances the user experience.

View File

@ -32,7 +32,7 @@ class TableESDocument(BaseModel):
service_type: str
service_category: str
entity_type: str = "table"
table_name: str
name: str
suggest: List[dict]
description: Optional[str] = None
table_type: Optional[str] = None
@ -48,7 +48,6 @@ class TableESDocument(BaseModel):
tags: List[str]
fqdn: str
tier: Optional[str] = None
schema_description: Optional[str] = None
owner: str
followers: List[str]
@ -61,14 +60,13 @@ class TopicESDocument(BaseModel):
service_type: str
service_category: str
entity_type: str = "topic"
topic_name: str
name: str
suggest: List[dict]
description: Optional[str] = None
last_updated_timestamp: Optional[int]
tags: List[str]
fqdn: str
tier: Optional[str] = None
schema_description: Optional[str] = None
owner: str
followers: List[str]
@ -81,7 +79,7 @@ class DashboardESDocument(BaseModel):
service_type: str
service_category: str
entity_type: str = "dashboard"
dashboard_name: str
name: str
suggest: List[dict]
description: Optional[str] = None
last_updated_timestamp: Optional[int]
@ -108,7 +106,7 @@ class PipelineESDocument(BaseModel):
service_type: str
service_category: str
entity_type: str = "pipeline"
pipeline_name: str
name: str
suggest: List[dict]
description: Optional[str] = None
last_updated_timestamp: Optional[int]
@ -130,7 +128,7 @@ class DbtModelESDocument(BaseModel):
service_type: str
service_category: str
entity_type: str = "dbtmodel"
dbt_model_name: str
name: str
suggest: List[dict]
description: Optional[str] = None
dbt_type: Optional[str] = None
@ -140,7 +138,6 @@ class DbtModelESDocument(BaseModel):
tags: List[str]
fqdn: str
tier: Optional[str] = None
schema_description: Optional[str] = None
owner: str
followers: List[str]

View File

@ -235,7 +235,7 @@ class ElasticsearchSink(Sink):
service=service_entity.name,
service_type=service_entity.serviceType.name,
service_category="databaseService",
table_name=table.name.__root__,
name=table.name.__root__,
suggest=suggest,
description=table.description,
table_type=table_type,
@ -251,7 +251,6 @@ class ElasticsearchSink(Sink):
tier=tier,
tags=list(tags),
fqdn=fqdn,
schema_description=None,
owner=table_owner,
followers=table_followers,
)
@ -285,7 +284,7 @@ class ElasticsearchSink(Sink):
service=service_entity.name,
service_type=service_entity.serviceType.name,
service_category="messagingService",
topic_name=topic.name.__root__,
name=topic.name.__root__,
suggest=suggest,
description=topic.description,
last_updated_timestamp=timestamp,
@ -335,7 +334,7 @@ class ElasticsearchSink(Sink):
service=service_entity.name,
service_type=service_entity.serviceType.name,
service_category="dashboardService",
dashboard_name=dashboard.displayName,
name=dashboard.displayName,
chart_names=chart_names,
chart_descriptions=chart_descriptions,
suggest=suggest,
@ -393,7 +392,7 @@ class ElasticsearchSink(Sink):
service=service_entity.name,
service_type=service_entity.serviceType.name,
service_category="pipelineService",
pipeline_name=pipeline.displayName,
name=pipeline.displayName,
task_names=task_names,
task_descriptions=task_descriptions,
suggest=suggest,
@ -453,7 +452,7 @@ class ElasticsearchSink(Sink):
service=service_entity.name,
service_type=service_entity.serviceType.name,
service_category="databaseService",
dbt_model_name=dbt_model.name.__root__,
name=dbt_model.name.__root__,
suggest=suggest,
description=dbt_model.description,
dbt_model_type=dbt_node_type,

View File

@ -13,21 +13,12 @@ import textwrap
TABLE_ELASTICSEARCH_INDEX_MAPPING = textwrap.dedent(
"""
{
{
"mappings":{
"properties": {
"table_name": {
"type":"text"
},
"schema": {
"type":"text",
"analyzer": "simple",
"fields": {
"raw": {
"type": "keyword"
}
}
},
"display_name": {
"type": "text"
},
@ -37,6 +28,9 @@ TABLE_ELASTICSEARCH_INDEX_MAPPING = textwrap.dedent(
"followers": {
"type": "keyword"
},
"fqdn": {
"type": "keyword"
},
"last_updated_timestamp": {
"type": "date",
"format": "epoch_second"
@ -56,9 +50,6 @@ TABLE_ELASTICSEARCH_INDEX_MAPPING = textwrap.dedent(
"tags": {
"type": "keyword"
},
"badges": {
"type": "text"
},
"service": {
"type": "keyword"
},
@ -96,7 +87,7 @@ TABLE_ELASTICSEARCH_INDEX_MAPPING = textwrap.dedent(
"type": "long"
}
}
}
}
}
"""
)
@ -109,15 +100,6 @@ TOPIC_ELASTICSEARCH_INDEX_MAPPING = textwrap.dedent(
"topic_name": {
"type":"text"
},
"schema": {
"type":"text",
"analyzer": "simple",
"fields": {
"raw": {
"type": "keyword"
}
}
},
"display_name": {
"type": "text"
},
@ -127,6 +109,9 @@ TOPIC_ELASTICSEARCH_INDEX_MAPPING = textwrap.dedent(
"followers": {
"type": "keyword"
},
"fqdn": {
"type": "keyword"
},
"last_updated_timestamp": {
"type": "date",
"format": "epoch_second"
@ -175,6 +160,9 @@ DASHBOARD_ELASTICSEARCH_INDEX_MAPPING = textwrap.dedent(
"owner": {
"type": "keyword"
},
"fqdn": {
"type": "keyword"
},
"followers": {
"type": "keyword"
},
@ -247,6 +235,9 @@ PIPELINE_ELASTICSEARCH_INDEX_MAPPING = textwrap.dedent(
"display_name": {
"type": "text"
},
"fqdn": {
"type": "keyword"
},
"owner": {
"type": "keyword"
},
@ -302,15 +293,6 @@ DBT_ELASTICSEARCH_INDEX_MAPPING = textwrap.dedent(
"dbt_model_name": {
"type":"text"
},
"schema": {
"type":"text",
"analyzer": "simple",
"fields": {
"raw": {
"type": "keyword"
}
}
},
"display_name": {
"type": "text"
},
@ -320,6 +302,9 @@ DBT_ELASTICSEARCH_INDEX_MAPPING = textwrap.dedent(
"followers": {
"type": "keyword"
},
"fqdn": {
"type": "keyword"
},
"last_updated_timestamp": {
"type": "date",
"format": "epoch_second"

View File

@ -29,6 +29,7 @@ type SuggestionProp = {
type CommonSource = {
fqdn: string;
service_type: string;
name: string;
};
type TableSource = {
@ -185,7 +186,7 @@ const Suggestions = ({ searchText, isOpen, setIsOpen }: SuggestionProp) => {
{tableSuggestions.map((suggestion: TableSource) => {
const fqdn = suggestion.fqdn;
const name = suggestion.table_name;
const name = suggestion.name;
const serviceType = suggestion.service_type;
return getSuggestionElement(
@ -203,7 +204,7 @@ const Suggestions = ({ searchText, isOpen, setIsOpen }: SuggestionProp) => {
{topicSuggestions.map((suggestion: TopicSource) => {
const fqdn = suggestion.fqdn;
const name = suggestion.topic_name;
const name = suggestion.name;
const serviceType = suggestion.service_type;
return getSuggestionElement(
@ -221,7 +222,7 @@ const Suggestions = ({ searchText, isOpen, setIsOpen }: SuggestionProp) => {
{dashboardSuggestions.map((suggestion: DashboardSource) => {
const fqdn = suggestion.fqdn;
const name = suggestion.dashboard_name;
const name = suggestion.name;
const serviceType = suggestion.service_type;
return getSuggestionElement(
@ -239,7 +240,7 @@ const Suggestions = ({ searchText, isOpen, setIsOpen }: SuggestionProp) => {
{pipelineSuggestions.map((suggestion: PipelineSource) => {
const fqdn = suggestion.fqdn;
const name = suggestion.pipeline_name;
const name = suggestion.name;
const serviceType = suggestion.service_type;
return getSuggestionElement(
@ -257,7 +258,7 @@ const Suggestions = ({ searchText, isOpen, setIsOpen }: SuggestionProp) => {
{DBTModelSuggestions.map((suggestion: DBTModelSource) => {
const fqdn = suggestion.fqdn;
const name = suggestion.dbt_model_name;
const name = suggestion.name;
const serviceType = suggestion.service_type;
return getSuggestionElement(

View File

@ -25,12 +25,7 @@ export const formatDataResponse = (hits) => {
hit._source.topic_id ||
hit._source.dashboard_id ||
hit._source.pipeline_id;
newData.name =
hit._source.table_name ||
hit._source.topic_name ||
hit._source.dashboard_name ||
hit._source.pipeline_name ||
hit._source.dbt_model_name;
newData.name = hit._source.name;
newData.description = hit._source.description;
newData.fullyQualifiedName = hit._source.fqdn;
newData.tableType = hit._source.table_type;