diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/elasticsearch/ElasticSearchEventHandler.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/elasticsearch/ElasticSearchEventHandler.java new file mode 100644 index 00000000000..bdf9350cad0 --- /dev/null +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/elasticsearch/ElasticSearchEventHandler.java @@ -0,0 +1,298 @@ +/* + * Copyright 2021 Collate + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.openmetadata.catalog.elasticsearch; + +import com.fasterxml.jackson.core.JsonProcessingException; +import org.apache.commons.lang3.StringUtils; +import org.apache.http.HttpHost; +import org.apache.http.auth.AuthScope; +import org.apache.http.auth.UsernamePasswordCredentials; +import org.apache.http.client.CredentialsProvider; +import org.apache.http.impl.client.BasicCredentialsProvider; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.update.UpdateRequest; +import org.elasticsearch.action.update.UpdateResponse; +import org.elasticsearch.client.RequestOptions; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.client.RestClientBuilder; +import org.elasticsearch.client.RestHighLevelClient; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.script.Script; +import org.elasticsearch.script.ScriptType; +import org.jdbi.v3.core.Jdbi; +import org.openmetadata.catalog.CatalogApplicationConfig; +import org.openmetadata.catalog.ElasticSearchConfiguration; +import org.openmetadata.catalog.Entity; +import org.openmetadata.catalog.elasticsearch.ElasticSearchIndexDefinition.ElasticSearchIndexType; +import org.openmetadata.catalog.entity.data.Dashboard; +import org.openmetadata.catalog.entity.data.DbtModel; +import org.openmetadata.catalog.entity.data.Pipeline; +import org.openmetadata.catalog.entity.data.Table; +import org.openmetadata.catalog.entity.data.Topic; +import org.openmetadata.catalog.events.EventHandler; +import org.openmetadata.catalog.type.ChangeDescription; +import org.openmetadata.catalog.type.ChangeEvent; +import org.openmetadata.catalog.type.EntityReference; +import org.openmetadata.catalog.type.FieldChange; +import org.openmetadata.catalog.util.JsonUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.ws.rs.container.ContainerRequestContext; +import javax.ws.rs.container.ContainerResponseContext; +import javax.ws.rs.core.Response; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; + + +public class ElasticSearchEventHandler implements EventHandler { + private static final Logger LOG = LoggerFactory.getLogger(ElasticSearchEventHandler.class); + private RestHighLevelClient client; + private ElasticSearchIndexDefinition esIndexDefinition; + + private final ActionListener listener = new ActionListener<>() { + @Override + public void onResponse(UpdateResponse updateResponse) { + LOG.info("Updated Elastic Search {}", updateResponse); + } + + @Override + public void onFailure(Exception e) { + LOG.error("Failed to update Elastic Search", e); + } + }; + + public void init(CatalogApplicationConfig config, Jdbi jdbi) { + ElasticSearchConfiguration esConfig = config.getElasticSearchConfiguration(); + RestClientBuilder restClientBuilder = RestClient.builder(new HttpHost(esConfig.getHost(), esConfig.getPort(), + "http")); + if(StringUtils.isNotEmpty(esConfig.getUsername())){ + CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); + credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(esConfig.getUsername(), + esConfig.getPassword())); + restClientBuilder.setHttpClientConfigCallback(httpAsyncClientBuilder -> { + httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider); + return httpAsyncClientBuilder; + }); + } + client = new RestHighLevelClient(restClientBuilder); + esIndexDefinition = new ElasticSearchIndexDefinition(client); + esIndexDefinition.createIndexes(); + } + + + public Void process(ContainerRequestContext requestContext, + ContainerResponseContext responseContext) { + try { + LOG.info("request Context "+ requestContext.toString()); + if (responseContext.getEntity() != null) { + Object entity = responseContext.getEntity(); + UpdateRequest updateRequest = null; + String entityClass = entity.getClass().toString(); + if (entityClass.toLowerCase().endsWith(Entity.TABLE.toLowerCase())) { + boolean exists = + esIndexDefinition.checkIndexExistsOrCreate(ElasticSearchIndexType.TABLE_SEARCH_INDEX); + if (exists) { + Table instance = (Table) entity; + updateRequest = updateTable(instance, responseContext); + } + } else if (entityClass.toLowerCase().endsWith(Entity.DASHBOARD.toLowerCase())) { + boolean exists = + esIndexDefinition.checkIndexExistsOrCreate(ElasticSearchIndexType.DASHBOARD_SEARCH_INDEX); + if (exists) { + Dashboard instance = (Dashboard) entity; + updateRequest = updateDashboard(instance, responseContext); + } + } else if (entityClass.toLowerCase().endsWith(Entity.TOPIC.toLowerCase())) { + boolean exists = + esIndexDefinition.checkIndexExistsOrCreate(ElasticSearchIndexType.TOPIC_SEARCH_INDEX); + if (exists) { + Topic instance = (Topic) entity; + updateRequest = updateTopic(instance, responseContext); + } + } else if (entityClass.toLowerCase().endsWith(Entity.PIPELINE.toLowerCase())) { + boolean exists = + esIndexDefinition.checkIndexExistsOrCreate(ElasticSearchIndexType.PIPELINE_SEARCH_INDEX); + if (exists) { + Pipeline instance = (Pipeline) entity; + updateRequest = updatePipeline(instance, responseContext); + } + } else if (entityClass.toLowerCase().endsWith(Entity.DBTMODEL.toLowerCase())) { + boolean exists = + esIndexDefinition.checkIndexExistsOrCreate(ElasticSearchIndexType.DBT_MODEL_SEARCH_INDEX); + if (exists) { + DbtModel instance = (DbtModel) entity; + updateRequest = updateDbtModel(instance, responseContext); + } + } else if (entityClass.toLowerCase().equalsIgnoreCase(ChangeEvent.class.toString())) { + ChangeEvent changeEvent = (ChangeEvent) entity; + updateRequest = applyChangeEvent(changeEvent); + } + if (updateRequest != null) { + client.updateAsync(updateRequest, RequestOptions.DEFAULT, listener); + } + } + } catch (Exception e) { + LOG.error("failed to update ES doc", e); + } + return null; + } + + private UpdateRequest applyChangeEvent(ChangeEvent event) { + String entityType = event.getEntityType(); + ElasticSearchIndexType esIndexType = esIndexDefinition.getIndexMappingByEntityType(entityType); + UUID entityId = event.getEntityId(); + ChangeDescription changeDescription = event.getChangeDescription(); + List fieldsAdded = changeDescription.getFieldsAdded(); + StringBuilder scriptTxt = new StringBuilder(); + Map fieldAddParams = new HashMap<>(); + for (FieldChange fieldChange: fieldsAdded) { + if (fieldChange.getName().equalsIgnoreCase("followers")) { + List entityReferences = (List) fieldChange.getNewValue(); + List newFollowers = new ArrayList<>(); + for (EntityReference follower : entityReferences) { + newFollowers.add(follower.getId().toString()); + } + fieldAddParams.put(fieldChange.getName(), newFollowers); + scriptTxt.append("ctx._source.followers.addAll(params.followers);"); + } + } + + for (FieldChange fieldChange: changeDescription.getFieldsDeleted()) { + if (fieldChange.getName().equalsIgnoreCase("followers")) { + List entityReferences = (List) fieldChange.getOldValue(); + for (EntityReference follower : entityReferences) { + fieldAddParams.put(fieldChange.getName(), follower.getId().toString()); + } + + scriptTxt.append("ctx._source.followers.removeAll(Collections.singleton(params.followers))"); + } + } + + if (!scriptTxt.toString().isEmpty()) { + Script script = new Script(ScriptType.INLINE, "painless", + scriptTxt.toString(), + fieldAddParams); + UpdateRequest updateRequest = new UpdateRequest(esIndexType.indexName, entityId.toString()); + updateRequest.script(script); + return updateRequest; + } else { + return null; + } + } + + private UpdateRequest updateTable(Table instance, ContainerResponseContext responseContext) + throws JsonProcessingException { + int responseCode = responseContext.getStatus(); + TableESIndex tableESIndex = TableESIndex.builder(instance, responseCode).build(); + UpdateRequest updateRequest = new UpdateRequest(ElasticSearchIndexType.TABLE_SEARCH_INDEX.indexName, + instance.getId().toString()); + if (responseCode != Response.Status.CREATED.getStatusCode()) { + scriptedUpsert(tableESIndex, updateRequest); + } else { + String json = JsonUtils.pojoToJson(tableESIndex); + updateRequest.doc(json, XContentType.JSON); + updateRequest.docAsUpsert(true); + } + return updateRequest; + } + + private UpdateRequest updateTopic(Topic instance, ContainerResponseContext responseContext) + throws JsonProcessingException { + int responseCode = responseContext.getStatus(); + TopicESIndex topicESIndex = TopicESIndex.builder(instance, responseCode).build(); + UpdateRequest updateRequest = new UpdateRequest(ElasticSearchIndexType.TOPIC_SEARCH_INDEX.indexName, + instance.getId().toString()); + if (responseCode != Response.Status.CREATED.getStatusCode()) { + scriptedUpsert(topicESIndex, updateRequest); + } else { + //only upsert if its a new entity + updateRequest.doc(JsonUtils.pojoToJson(topicESIndex), XContentType.JSON); + updateRequest.docAsUpsert(true); + } + return updateRequest; + } + + private UpdateRequest updateDashboard(Dashboard instance, ContainerResponseContext responseContext) + throws JsonProcessingException { + int responseCode = responseContext.getStatus(); + DashboardESIndex dashboardESIndex = DashboardESIndex.builder(instance, responseCode).build(); + UpdateRequest updateRequest = new UpdateRequest(ElasticSearchIndexType.DASHBOARD_SEARCH_INDEX.indexName, + instance.getId().toString()); + if (responseCode != Response.Status.CREATED.getStatusCode()) { + scriptedUpsert(dashboardESIndex, updateRequest); + } else { + updateRequest.doc(JsonUtils.pojoToJson(dashboardESIndex), XContentType.JSON); + updateRequest.docAsUpsert(true); + } + return updateRequest; + } + + private UpdateRequest updatePipeline(Pipeline instance, ContainerResponseContext responseContext) + throws JsonProcessingException { + int responseCode = responseContext.getStatus(); + PipelineESIndex pipelineESIndex = PipelineESIndex.builder(instance, responseCode).build(); + UpdateRequest updateRequest = new UpdateRequest(ElasticSearchIndexType.PIPELINE_SEARCH_INDEX.indexName, + instance.getId().toString()); + if (responseCode != Response.Status.CREATED.getStatusCode()) { + scriptedUpsert(pipelineESIndex, updateRequest); + } else { + //only upsert if it's a new entity + updateRequest.doc(JsonUtils.pojoToJson(pipelineESIndex), XContentType.JSON); + updateRequest.docAsUpsert(true); + } + return updateRequest; + } + + private UpdateRequest updateDbtModel(DbtModel instance, ContainerResponseContext responseContext) + throws JsonProcessingException { + int responseCode = responseContext.getStatus(); + DbtModelESIndex dbtModelESIndex = DbtModelESIndex.builder(instance, responseCode).build(); + UpdateRequest updateRequest = new UpdateRequest(ElasticSearchIndexType.DBT_MODEL_SEARCH_INDEX.indexName, + instance.getId().toString()); + //only append to changeDescriptions on updates + if (responseCode != Response.Status.CREATED.getStatusCode()) { + scriptedUpsert(dbtModelESIndex, updateRequest); + } else { + updateRequest.doc(JsonUtils.pojoToJson(dbtModelESIndex), XContentType.JSON); + updateRequest.docAsUpsert(true); + } + return updateRequest; + } + + private void scriptedUpsert(Object index, UpdateRequest updateRequest) { + String scriptTxt= "for (k in params.keySet()) {if (k == 'change_descriptions') " + + "{ ctx._source.change_descriptions.addAll(params.change_descriptions) } " + + "else { ctx._source.put(k, params.get(k)) }}"; + Map doc = JsonUtils.getMap(index); + Script script = new Script(ScriptType.INLINE, "painless", scriptTxt, + doc); + updateRequest.script(script); + updateRequest.scriptedUpsert(true); + } + + public void close() { + try { + this.client.close(); + } catch (Exception e) { + LOG.error("Failed to close elastic search", e); + } + } + +} diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/elasticsearch/ElasticSearchIndexDefinition.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/elasticsearch/ElasticSearchIndexDefinition.java new file mode 100644 index 00000000000..e72fd7a2619 --- /dev/null +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/elasticsearch/ElasticSearchIndexDefinition.java @@ -0,0 +1,684 @@ +package org.openmetadata.catalog.elasticsearch; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.JsonProcessingException; +import lombok.Builder; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.Setter; +import lombok.Value; +import lombok.experimental.SuperBuilder; +import org.elasticsearch.client.RequestOptions; +import org.elasticsearch.client.RestHighLevelClient; +import org.elasticsearch.client.indices.CreateIndexRequest; +import org.elasticsearch.client.indices.CreateIndexResponse; +import org.elasticsearch.client.indices.GetIndexRequest; +import org.elasticsearch.common.xcontent.XContentType; +import org.openmetadata.catalog.Entity; +import org.openmetadata.catalog.entity.data.Dashboard; +import org.openmetadata.catalog.entity.data.DbtModel; +import org.openmetadata.catalog.entity.data.Pipeline; +import org.openmetadata.catalog.entity.data.Table; +import org.openmetadata.catalog.entity.data.Topic; +import org.openmetadata.catalog.type.Column; +import org.openmetadata.catalog.type.EntityReference; +import org.openmetadata.catalog.type.FieldChange; +import org.openmetadata.catalog.type.TagLabel; +import org.openmetadata.catalog.type.Task; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.ws.rs.core.Response; +import java.io.IOException; +import java.net.URISyntaxException; +import java.net.URL; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.function.Predicate; +import java.util.stream.Collectors; + +public class ElasticSearchIndexDefinition { + Map elasticSearchIndexes = new HashMap<>(); + private final RestHighLevelClient client; + private static final Logger LOG = LoggerFactory.getLogger(ElasticSearchIndexDefinition.class); + + public ElasticSearchIndexDefinition(RestHighLevelClient client) { + this.client = client; + for (ElasticSearchIndexType elasticSearchIndexType : ElasticSearchIndexType.values()) { + elasticSearchIndexes.put(elasticSearchIndexType, ElasticSearchIndexStatus.NOT_CREATED); + } + } + + public enum ElasticSearchIndexStatus { + CREATED, + NOT_CREATED, + FAILED + } + + public enum ElasticSearchIndexType { + TABLE_SEARCH_INDEX("table_search_index", "elasticsearch/table_index_mapping.json"), + TOPIC_SEARCH_INDEX("topic_search_index", "elasticsearch/topic_index_mapping.json"), + DASHBOARD_SEARCH_INDEX("dashboard_search_index", "elasticsearch/dashboard_index_mapping.json"), + PIPELINE_SEARCH_INDEX("pipeline_search_index", "elasticsearch/pipeline_index_mapping.json"), + DBT_MODEL_SEARCH_INDEX("dbt_model_search_index", "elasticsearch/dbt_index_mapping.json"); + + public final String indexName; + public final String indexMappingFile; + + ElasticSearchIndexType(String indexName, String indexMappingFile) { + this.indexName = indexName; + this.indexMappingFile = indexMappingFile; + } + } + + public void createIndexes() { + try { + for (ElasticSearchIndexType elasticSearchIndexType : ElasticSearchIndexType.values()) { + createIndex(elasticSearchIndexType); + } + } catch(Exception e) { + LOG.error("Failed to created Elastic Search indexes due to", e); + } + } + + public boolean checkIndexExistsOrCreate(ElasticSearchIndexType indexType) { + boolean exists = elasticSearchIndexes.get(indexType) == ElasticSearchIndexStatus.CREATED; + if (!exists) { + exists = createIndex(indexType); + } + return exists; + } + + private boolean createIndex(ElasticSearchIndexType elasticSearchIndexType) { + try { + GetIndexRequest gRequest = new GetIndexRequest(elasticSearchIndexType.indexName); + gRequest.local(false); + boolean exists = client.indices().exists(gRequest, RequestOptions.DEFAULT); + if (!exists) { + String elasticSearchIndexMapping = getIndexMapping(elasticSearchIndexType); + CreateIndexRequest request = new CreateIndexRequest(elasticSearchIndexType.indexName); + request.mapping(elasticSearchIndexMapping, XContentType.JSON); + CreateIndexResponse createIndexResponse = client.indices().create(request, RequestOptions.DEFAULT); + LOG.info(elasticSearchIndexType.indexName + " Created " + createIndexResponse.isAcknowledged()); + } + setIndexStatus(elasticSearchIndexType, ElasticSearchIndexStatus.CREATED); + } catch(Exception e) { + setIndexStatus(elasticSearchIndexType, ElasticSearchIndexStatus.FAILED); + LOG.error("Failed to created Elastic Search indexes due to", e); + return false; + } + return true; + } + + private void setIndexStatus(ElasticSearchIndexType indexType, ElasticSearchIndexStatus elasticSearchIndexStatus) { + elasticSearchIndexes.put(indexType, elasticSearchIndexStatus); + } + + public String getIndexMapping(ElasticSearchIndexType elasticSearchIndexType) + throws URISyntaxException, IOException { + URL resource = ElasticSearchIndexDefinition.class + .getClassLoader().getResource(elasticSearchIndexType.indexMappingFile); + Path path = Paths.get(resource.toURI()); + return new String(Files.readAllBytes(path)); + } + + public ElasticSearchIndexType getIndexMappingByEntityType(String type) { + if (type.equalsIgnoreCase(Entity.TABLE)) { + return ElasticSearchIndexType.TABLE_SEARCH_INDEX; + } else if (type.equalsIgnoreCase(Entity.DASHBOARD)) { + return ElasticSearchIndexType.DASHBOARD_SEARCH_INDEX; + } else if (type.equalsIgnoreCase(Entity.PIPELINE)) { + return ElasticSearchIndexType.PIPELINE_SEARCH_INDEX; + } else if (type.equalsIgnoreCase(Entity.TOPIC)) { + return ElasticSearchIndexType.TOPIC_SEARCH_INDEX; + } else if (type.equalsIgnoreCase(Entity.DBTMODEL)) { + return ElasticSearchIndexType.DBT_MODEL_SEARCH_INDEX; + } + throw new RuntimeException("Failed to find index doc for type {}".format(type)); + } + +} + +@SuperBuilder +@Data +class ElasticSearchIndex { + @JsonProperty("display_name") + String displayName; + String fqdn; + String service; + @JsonProperty("service_type") + String serviceType; + @JsonProperty("service_category") + String serviceCategory; + @JsonProperty("entity_type") + String entityType; + List suggest; + String description; + String tier; + List tags; + String owner; + List followers; + @JsonProperty("last_updated_timestamp") + @Builder.Default + Long lastUpdatedTimestamp = System.currentTimeMillis(); + @JsonProperty("change_descriptions") + List changeDescriptions; + + + public void parseTags(List tags) { + if (!tags.isEmpty()) { + List tagsList = new ArrayList<>(tags); + String tierTag = null; + for (String tag : tagsList) { + if (tag.toLowerCase().matches("(.*)tier(.*)")) { + tierTag = tag; + break; + } + } + if (tierTag != null) { + tagsList.remove(tierTag); + this.tier = tierTag; + } + this.tags = tagsList; + } else { + this.tags = tags; + } + } +} + +@Getter +@Builder +class ElasticSearchSuggest { + String input; + Integer weight; +} + +@Getter +@Builder +class FlattenColumn { + String name; + String description; + List tags; +} + +@Getter +@Setter +@Builder +class ESChangeDescription { + String updatedBy; + Long updatedAt; + List fieldsAdded; + List fieldsUpdated; + List fieldsDeleted; +} + +@EqualsAndHashCode(callSuper = true) +@Getter +@SuperBuilder(builderMethodName = "internalBuilder") +@Value +@JsonInclude(JsonInclude.Include.NON_NULL) +class TableESIndex extends ElasticSearchIndex { + @JsonProperty("table_name") + String tableName; + @JsonProperty("table_id") + String tableId; + String database; + @JsonProperty("table_type") + String tableType; + @JsonProperty("column_names") + List columnNames; + @JsonProperty("column_descriptions") + List columnDescriptions; + @JsonProperty("monthly_stats") + Integer monthlyStats; + @JsonProperty("monthly_percentile_rank") + Integer monthlyPercentileRank; + @JsonProperty("weekly_stats") + Integer weeklyStats; + @JsonProperty("weekly_percentile_rank") + Integer weeklyPercentileRank; + @JsonProperty("daily_stats") + Integer dailyStats; + @JsonProperty("daily_percentile_rank") + Integer dailyPercentileRank; + + + public static TableESIndexBuilder builder(Table table, int responseCode) throws JsonProcessingException { + String tableId = table.getId().toString(); + String tableName = table.getName(); + String description = table.getDescription(); + List tags = new ArrayList<>(); + List columnNames = new ArrayList<>(); + List columnDescriptions = new ArrayList<>(); + List suggest = new ArrayList<>(); + suggest.add(ElasticSearchSuggest.builder().input(table.getFullyQualifiedName()).weight(5).build()); + suggest.add(ElasticSearchSuggest.builder().input(table.getName()).weight(10).build()); + + if (table.getTags() != null) { + table.getTags().forEach(tag -> tags.add(tag.getTagFQN())); + } + + if (table.getColumns() != null) { + List cols = new ArrayList<>(); + cols = parseColumns(table.getColumns(), cols, null); + + for (FlattenColumn col : cols) { + if (col.getTags() != null) { + tags.addAll(col.getTags()); + } + columnDescriptions.add(col.getDescription()); + columnNames.add(col.getName()); + } + } + TableESIndexBuilder tableESIndexBuilder = internalBuilder().tableId(tableId) + .tableName(tableName) + .displayName(tableName) + .description(description) + .fqdn(table.getFullyQualifiedName()) + .suggest(suggest) + .entityType("table") + .serviceCategory("databaseService") + .columnNames(columnNames) + .columnDescriptions(columnDescriptions) + .tableType(table.getTableType().toString()) + .tags(tags); + + if (table.getDatabase() != null) { + tableESIndexBuilder.database(table.getDatabase().getName()); + } + + if (table.getService() != null) { + tableESIndexBuilder.service(table.getService().getName()); + tableESIndexBuilder.serviceType(table.getServiceType().toString()); + } + + if (table.getUsageSummary() != null) { + tableESIndexBuilder.weeklyStats(table.getUsageSummary().getWeeklyStats().getCount()) + .weeklyPercentileRank(table.getUsageSummary().getWeeklyStats().getPercentileRank().intValue()) + .dailyStats(table.getUsageSummary().getDailyStats().getCount()) + .dailyPercentileRank(table.getUsageSummary().getDailyStats().getPercentileRank().intValue()) + .monthlyStats(table.getUsageSummary().getMonthlyStats().getCount()) + .monthlyPercentileRank(table.getUsageSummary().getMonthlyStats().getPercentileRank().intValue()); + } + if (table.getFollowers() != null) { + tableESIndexBuilder.followers(table.getFollowers().stream().map(item -> + item.getId().toString()).collect(Collectors.toList())); + } else if (responseCode == Response.Status.CREATED.getStatusCode()) { + tableESIndexBuilder.followers(Collections.emptyList()); + } + + if (table.getOwner() != null) { + tableESIndexBuilder.owner(table.getOwner().getId().toString()); + } + + ESChangeDescription esChangeDescription = null; + + if (table.getChangeDescription() != null) { + esChangeDescription = ESChangeDescription.builder().updatedAt(table.getUpdatedAt().getTime()) + .updatedBy(table.getUpdatedBy()).build(); + esChangeDescription.setFieldsAdded(table.getChangeDescription().getFieldsAdded()); + esChangeDescription.setFieldsDeleted(table.getChangeDescription().getFieldsDeleted()); + esChangeDescription.setFieldsUpdated(table.getChangeDescription().getFieldsUpdated()); + } else if (responseCode == Response.Status.CREATED.getStatusCode()) { + esChangeDescription = ESChangeDescription.builder().updatedAt(table.getUpdatedAt().getTime()) + .updatedBy(table.getUpdatedBy()).build(); + } + tableESIndexBuilder.changeDescriptions(esChangeDescription != null ? List.of(esChangeDescription) : null); + return tableESIndexBuilder; + } + + + private static List parseColumns(List columns, List flattenColumns, + String parentColumn) { + Optional optParentColumn = Optional.ofNullable(parentColumn).filter(Predicate.not(String::isEmpty)); + List tags = new ArrayList<>(); + for (Column col: columns) { + String columnName = col.getName(); + if (optParentColumn.isPresent()) { + columnName = optParentColumn.get() + "." + columnName; + } + if (col.getTags() != null) { + tags = col.getTags().stream().map(TagLabel::getTagFQN).collect(Collectors.toList()); + } + + FlattenColumn flattenColumn = FlattenColumn.builder() + .name(columnName) + .description(col.getDescription()).build(); + + if (!tags.isEmpty()) { + flattenColumn.tags = tags; + } + flattenColumns.add(flattenColumn); + if (col.getChildren() != null) { + parseColumns(col.getChildren(), flattenColumns, col.getName()); + } + } + return flattenColumns; + } +} + + +@EqualsAndHashCode(callSuper = true) +@Getter +@SuperBuilder(builderMethodName = "internalBuilder") +@Value +@JsonInclude(JsonInclude.Include.NON_NULL) +class TopicESIndex extends ElasticSearchIndex { + @JsonProperty("topic_name") + String topicName; + @JsonProperty("topic_id") + String topicId; + + public static TopicESIndexBuilder builder(Topic topic, int responseCode) throws JsonProcessingException { + List tags = new ArrayList<>(); + List suggest = new ArrayList<>(); + suggest.add(ElasticSearchSuggest.builder().input(topic.getFullyQualifiedName()).weight(5).build()); + suggest.add(ElasticSearchSuggest.builder().input(topic.getName()).weight(10).build()); + + if (topic.getTags() != null) { + topic.getTags().forEach(tag -> tags.add(tag.getTagFQN())); + } + + TopicESIndexBuilder topicESIndexBuilder = internalBuilder().topicId(topic.getId().toString()) + .topicName(topic.getName()) + .displayName(topic.getDisplayName()) + .description(topic.getDescription()) + .fqdn(topic.getFullyQualifiedName()) + .suggest(suggest) + .service(topic.getService().getName()) + .serviceType(topic.getServiceType().toString()) + .serviceCategory("messagingService") + .entityType("topic") + .tags(tags); + + if (topic.getFollowers() != null) { + topicESIndexBuilder.followers(topic.getFollowers().stream().map(item -> + item.getId().toString()).collect(Collectors.toList())); + } else if (responseCode == Response.Status.CREATED.getStatusCode()) { + topicESIndexBuilder.followers(Collections.emptyList()); + } + + if (topic.getOwner() != null) { + topicESIndexBuilder.owner(topic.getOwner().getId().toString()); + } + + ESChangeDescription esChangeDescription = null; + + if (topic.getChangeDescription() != null) { + esChangeDescription = ESChangeDescription.builder().updatedAt(topic.getUpdatedAt().getTime()) + .updatedBy(topic.getUpdatedBy()).build(); + esChangeDescription.setFieldsAdded(topic.getChangeDescription().getFieldsAdded()); + esChangeDescription.setFieldsDeleted(topic.getChangeDescription().getFieldsDeleted()); + esChangeDescription.setFieldsUpdated(topic.getChangeDescription().getFieldsUpdated()); + } else if (responseCode == Response.Status.CREATED.getStatusCode()) { + esChangeDescription = ESChangeDescription.builder().updatedAt(topic.getUpdatedAt().getTime()) + .updatedBy(topic.getUpdatedBy()).build(); + } + + topicESIndexBuilder.changeDescriptions(esChangeDescription != null ? List.of(esChangeDescription) : null); + + return topicESIndexBuilder; + } +} + + +@EqualsAndHashCode(callSuper = true) +@Getter +@SuperBuilder(builderMethodName = "internalBuilder") +@Value +@JsonInclude(JsonInclude.Include.NON_NULL) +class DashboardESIndex extends ElasticSearchIndex { + @JsonProperty("dashboard_name") + String dashboardName; + @JsonProperty("dashboard_id") + String dashboardId; + @JsonProperty("chart_names") + List chartNames; + @JsonProperty("chart_descriptions") + List chartDescriptions; + @JsonProperty("monthly_stats") + Integer monthlyStats; + @JsonProperty("monthly_percentile_rank") + Integer monthlyPercentileRank; + @JsonProperty("weekly_stats") + Integer weeklyStats; + @JsonProperty("weekly_percentile_rank") + Integer weeklyPercentileRank; + @JsonProperty("daily_stats") + Integer dailyStats; + @JsonProperty("daily_percentile_rank") + Integer dailyPercentileRank; + + public static DashboardESIndexBuilder builder(Dashboard dashboard, int responseCode) throws JsonProcessingException { + List tags = new ArrayList<>(); + List chartNames = new ArrayList<>(); + List chartDescriptions = new ArrayList<>(); + List suggest = new ArrayList<>(); + suggest.add(ElasticSearchSuggest.builder().input(dashboard.getFullyQualifiedName()).weight(5).build()); + suggest.add(ElasticSearchSuggest.builder().input(dashboard.getDisplayName()).weight(10).build()); + + if (dashboard.getTags() != null) { + dashboard.getTags().forEach(tag -> tags.add(tag.getTagFQN())); + } + + for (EntityReference chart: dashboard.getCharts()) { + chartNames.add(chart.getDisplayName()); + chartDescriptions.add(chart.getDescription()); + } + + DashboardESIndexBuilder dashboardESIndexBuilder = internalBuilder().dashboardId(dashboard.getId().toString()) + .dashboardName(dashboard.getDisplayName()) + .displayName(dashboard.getDisplayName()) + .description(dashboard.getDescription()) + .fqdn(dashboard.getFullyQualifiedName()) + .chartNames(chartNames) + .chartDescriptions(chartDescriptions) + .entityType("dashboard") + .suggest(suggest) + .service(dashboard.getService().getName()) + .serviceType(dashboard.getServiceType().toString()) + .serviceCategory("dashboardService") + .tags(tags); + + if (dashboard.getUsageSummary() != null) { + dashboardESIndexBuilder.weeklyStats(dashboard.getUsageSummary().getWeeklyStats().getCount()) + .weeklyPercentileRank(dashboard.getUsageSummary().getWeeklyStats().getPercentileRank().intValue()) + .dailyStats(dashboard.getUsageSummary().getDailyStats().getCount()) + .dailyPercentileRank(dashboard.getUsageSummary().getDailyStats().getPercentileRank().intValue()) + .monthlyStats(dashboard.getUsageSummary().getMonthlyStats().getCount()) + .monthlyPercentileRank(dashboard.getUsageSummary().getMonthlyStats().getPercentileRank().intValue()); + } + + if (dashboard.getFollowers() != null) { + dashboardESIndexBuilder.followers(dashboard.getFollowers().stream().map(item -> + item.getId().toString()).collect(Collectors.toList())); + } else if (responseCode == Response.Status.CREATED.getStatusCode()) { + dashboardESIndexBuilder.followers(Collections.emptyList()); + } + if (dashboard.getOwner() != null) { + dashboardESIndexBuilder.owner(dashboard.getOwner().getId().toString()); + } + ESChangeDescription esChangeDescription = null; + if (dashboard.getChangeDescription() != null) { + esChangeDescription = ESChangeDescription.builder() + .updatedAt(dashboard.getUpdatedAt().getTime()) + .updatedBy(dashboard.getUpdatedBy()).build(); + esChangeDescription.setFieldsAdded(dashboard.getChangeDescription().getFieldsAdded()); + esChangeDescription.setFieldsDeleted(dashboard.getChangeDescription().getFieldsDeleted()); + esChangeDescription.setFieldsUpdated(dashboard.getChangeDescription().getFieldsUpdated()); + } else if (responseCode == Response.Status.CREATED.getStatusCode()) { + esChangeDescription = ESChangeDescription.builder() + .updatedAt(dashboard.getUpdatedAt().getTime()) + .updatedBy(dashboard.getUpdatedBy()).build(); + } + dashboardESIndexBuilder.changeDescriptions(esChangeDescription != null ? List.of(esChangeDescription) : null); + return dashboardESIndexBuilder; + } +} + + +@EqualsAndHashCode(callSuper = true) +@Getter +@SuperBuilder(builderMethodName = "internalBuilder") +@Value +@JsonInclude(JsonInclude.Include.NON_NULL) +class PipelineESIndex extends ElasticSearchIndex { + @JsonProperty("pipeline_name") + String pipelineName; + @JsonProperty("pipeine_id") + String pipelineId; + @JsonProperty("task_names") + List taskNames; + @JsonProperty("task_descriptions") + List taskDescriptions; + + public static PipelineESIndexBuilder builder(Pipeline pipeline, int responseCode) throws JsonProcessingException { + List tags = new ArrayList<>(); + List taskNames = new ArrayList<>(); + List taskDescriptions = new ArrayList<>(); + List suggest = new ArrayList<>(); + suggest.add(ElasticSearchSuggest.builder().input(pipeline.getFullyQualifiedName()).weight(5).build()); + suggest.add(ElasticSearchSuggest.builder().input(pipeline.getDisplayName()).weight(10).build()); + + if (pipeline.getTags() != null) { + pipeline.getTags().forEach(tag -> tags.add(tag.getTagFQN())); + } + + for (Task task: pipeline.getTasks()) { + taskNames.add(task.getDisplayName()); + taskDescriptions.add(task.getDescription()); + } + + PipelineESIndexBuilder pipelineESIndexBuilder = internalBuilder().pipelineId(pipeline.getId().toString()) + .pipelineName(pipeline.getDisplayName()) + .displayName(pipeline.getDisplayName()) + .description(pipeline.getDescription()) + .fqdn(pipeline.getFullyQualifiedName()) + .taskNames(taskNames) + .taskDescriptions(taskDescriptions) + .entityType("pipeline") + .suggest(suggest) + .service(pipeline.getService().getName()) + .serviceType(pipeline.getServiceType().toString()) + .serviceCategory("pipelineService") + .tags(tags); + + if (pipeline.getFollowers() != null) { + pipelineESIndexBuilder.followers(pipeline.getFollowers().stream().map(item -> + item.getId().toString()).collect(Collectors.toList())); + } else if (responseCode == Response.Status.CREATED.getStatusCode()) { + pipelineESIndexBuilder.followers(Collections.emptyList()); + } + + if (pipeline.getOwner() != null) { + pipelineESIndexBuilder.owner(pipeline.getOwner().getId().toString()); + } + + ESChangeDescription esChangeDescription = null; + if (pipeline.getChangeDescription() != null) { + esChangeDescription = ESChangeDescription.builder() + .updatedAt(pipeline.getUpdatedAt().getTime()) + .updatedBy(pipeline.getUpdatedBy()).build(); + esChangeDescription.setFieldsAdded(pipeline.getChangeDescription().getFieldsAdded()); + esChangeDescription.setFieldsDeleted(pipeline.getChangeDescription().getFieldsDeleted()); + esChangeDescription.setFieldsUpdated(pipeline.getChangeDescription().getFieldsUpdated()); + } else if (responseCode == Response.Status.CREATED.getStatusCode()) { + esChangeDescription = ESChangeDescription.builder() + .updatedAt(pipeline.getUpdatedAt().getTime()) + .updatedBy(pipeline.getUpdatedBy()).build(); + } + pipelineESIndexBuilder.changeDescriptions(esChangeDescription != null ? List.of(esChangeDescription) : null); + return pipelineESIndexBuilder; + } +} + + +@Getter +@SuperBuilder(builderMethodName = "internalBuilder") +@Value +@JsonInclude(JsonInclude.Include.NON_NULL) +class DbtModelESIndex extends ElasticSearchIndex { + @JsonProperty("dbt_model_name") + String dbtModelName; + @JsonProperty("dbt_model_id") + String dbtModelId; + @JsonProperty("column_names") + List columnNames; + @JsonProperty("column_descriptions") + List columnDescriptions; + String database; + @JsonProperty("node_type") + String nodeType; + + public static DbtModelESIndexBuilder builder(DbtModel dbtModel, int responseCode) throws JsonProcessingException { + List tags = new ArrayList<>(); + List columnNames = new ArrayList<>(); + List columnDescriptions = new ArrayList<>(); + List suggest = new ArrayList<>(); + suggest.add(ElasticSearchSuggest.builder().input(dbtModel.getFullyQualifiedName()).weight(5).build()); + suggest.add(ElasticSearchSuggest.builder().input(dbtModel.getName()).weight(10).build()); + + if (dbtModel.getTags() != null) { + dbtModel.getTags().forEach(tag -> tags.add(tag.getTagFQN())); + } + + for (Column column: dbtModel.getColumns()) { + columnNames.add(column.getName()); + columnDescriptions.add(column.getDescription()); + } + + DbtModelESIndexBuilder dbtModelESIndexBuilder = internalBuilder().dbtModelId(dbtModel.getId().toString()) + .dbtModelName(dbtModel.getName()) + .displayName(dbtModel.getName()) + .description(dbtModel.getDescription()) + .fqdn(dbtModel.getFullyQualifiedName()) + .columnNames(columnNames) + .columnDescriptions(columnDescriptions) + .entityType("dbtmodel") + .suggest(suggest) + .serviceCategory("databaseService") + .nodeType(dbtModel.getDbtNodeType().toString()) + .database(dbtModel.getDatabase().getName()) + .tags(tags); + + + if (dbtModel.getFollowers() != null) { + dbtModelESIndexBuilder.followers(dbtModel.getFollowers().stream().map(item -> + item.getId().toString()).collect(Collectors.toList())); + } else if (responseCode == Response.Status.CREATED.getStatusCode()) { + dbtModelESIndexBuilder.followers(Collections.emptyList()); + } + if (dbtModel.getOwner() != null) { + dbtModelESIndexBuilder.owner(dbtModel.getOwner().getId().toString()); + } + ESChangeDescription esChangeDescription = null; + if (dbtModel.getChangeDescription() != null) { + esChangeDescription = ESChangeDescription.builder() + .updatedAt(dbtModel.getUpdatedAt().getTime()) + .updatedBy(dbtModel.getUpdatedBy()).build(); + esChangeDescription.setFieldsAdded(dbtModel.getChangeDescription().getFieldsAdded()); + esChangeDescription.setFieldsDeleted(dbtModel.getChangeDescription().getFieldsDeleted()); + esChangeDescription.setFieldsUpdated(dbtModel.getChangeDescription().getFieldsUpdated()); + + } else if (responseCode == Response.Status.CREATED.getStatusCode()) { + // add an entry when the entity gets created first-time + esChangeDescription = ESChangeDescription.builder() + .updatedAt(dbtModel.getUpdatedAt().getTime()) + .updatedBy(dbtModel.getUpdatedBy()).build(); + } + + dbtModelESIndexBuilder.changeDescriptions(esChangeDescription != null ? List.of(esChangeDescription): null); + return dbtModelESIndexBuilder; + } +} + + diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/events/ElasticSearchEventHandler.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/events/ElasticSearchEventHandler.java deleted file mode 100644 index 3800f2e0ed7..00000000000 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/events/ElasticSearchEventHandler.java +++ /dev/null @@ -1,415 +0,0 @@ -/* - * Copyright 2021 Collate - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.openmetadata.catalog.events; - -import lombok.Builder; -import lombok.Getter; -import org.apache.commons.lang3.StringUtils; -import org.apache.http.HttpHost; -import org.apache.http.auth.AuthScope; -import org.apache.http.auth.UsernamePasswordCredentials; -import org.apache.http.client.CredentialsProvider; -import org.apache.http.impl.client.BasicCredentialsProvider; -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.update.UpdateRequest; -import org.elasticsearch.action.update.UpdateResponse; -import org.elasticsearch.client.RequestOptions; -import org.elasticsearch.client.RestClient; -import org.elasticsearch.client.RestClientBuilder; -import org.elasticsearch.client.RestHighLevelClient; -import org.elasticsearch.script.Script; -import org.elasticsearch.script.ScriptType; -import org.jdbi.v3.core.Jdbi; -import org.openmetadata.catalog.CatalogApplicationConfig; -import org.openmetadata.catalog.ElasticSearchConfiguration; -import org.openmetadata.catalog.Entity; -import org.openmetadata.catalog.entity.data.Dashboard; -import org.openmetadata.catalog.entity.data.Pipeline; -import org.openmetadata.catalog.entity.data.Table; -import org.openmetadata.catalog.entity.data.Topic; -import org.openmetadata.catalog.type.ChangeDescription; -import org.openmetadata.catalog.type.ChangeEvent; -import org.openmetadata.catalog.type.Column; -import org.openmetadata.catalog.type.EntityReference; -import org.openmetadata.catalog.type.FieldChange; -import org.openmetadata.catalog.type.TagLabel; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.ws.rs.container.ContainerRequestContext; -import javax.ws.rs.container.ContainerResponseContext; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Set; -import java.util.UUID; -import java.util.function.Predicate; -import java.util.stream.Collectors; - - -public class ElasticSearchEventHandler implements EventHandler { - private static final Logger LOG = LoggerFactory.getLogger(AuditEventHandler.class); - private RestHighLevelClient client; - private final ActionListener listener = new ActionListener<>() { - @Override - public void onResponse(UpdateResponse updateResponse) { - LOG.info("Updated Elastic Search {}", updateResponse); - } - - @Override - public void onFailure(Exception e) { - LOG.error("Failed to update Elastic Search", e); - } - }; - - public void init(CatalogApplicationConfig config, Jdbi jdbi) { - ElasticSearchConfiguration esConfig = config.getElasticSearchConfiguration(); - RestClientBuilder restClientBuilder = RestClient.builder(new HttpHost(esConfig.getHost(), esConfig.getPort(), "http")); - if(StringUtils.isNotEmpty(esConfig.getUsername())){ - CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); - credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(esConfig.getUsername(), esConfig.getPassword())); - restClientBuilder.setHttpClientConfigCallback(httpAsyncClientBuilder -> { - httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider); - return httpAsyncClientBuilder; - }); - } - this.client = new RestHighLevelClient(restClientBuilder); - } - - public Void process(ContainerRequestContext requestContext, - ContainerResponseContext responseContext) { - try { - LOG.info("request Context "+ requestContext.toString()); - String changeEventClazz = ChangeEvent.class.toString(); - if (responseContext.getEntity() != null) { - Object entity = responseContext.getEntity(); - UpdateRequest updateRequest = null; - String entityClass = entity.getClass().toString(); - if (entityClass.toLowerCase().endsWith(Entity.TABLE.toLowerCase())) { - Table instance = (Table) entity; - updateRequest = updateTable(instance); - } else if (entityClass.toLowerCase().endsWith(Entity.DASHBOARD.toLowerCase())) { - Dashboard instance = (Dashboard) entity; - updateRequest = updateDashboard(instance); - } else if (entityClass.toLowerCase().endsWith(Entity.TOPIC.toLowerCase())) { - Topic instance = (Topic) entity; - updateRequest = updateTopic(instance); - } else if (entityClass.toLowerCase().endsWith(Entity.PIPELINE.toLowerCase())) { - Pipeline instance = (Pipeline) entity; - updateRequest = updatePipeline(instance); - } else if (entityClass.toLowerCase().equalsIgnoreCase(ChangeEvent.class.toString())) { - ChangeEvent changeEvent = (ChangeEvent) entity; - updateRequest = applyChangeEvent(changeEvent); - } - if (updateRequest != null) { - client.updateAsync(updateRequest, RequestOptions.DEFAULT, listener); - } - } - } catch (Exception e) { - LOG.error("failed to update ES doc", e); - } - return null; - } - - private UpdateRequest applyChangeEvent(ChangeEvent event) { - String entityType = event.getEntityType(); - String esIndex = getESIndex(entityType); - UUID entityId = event.getEntityId(); - ChangeDescription changeDescription = event.getChangeDescription(); - List fieldsAdded = changeDescription.getFieldsAdded(); - StringBuilder scriptTxt = new StringBuilder(); - Map fieldAddParams = new HashMap<>(); - for (FieldChange fieldChange: fieldsAdded) { - if (fieldChange.getName().equalsIgnoreCase("followers")) { - List entityReferences = (List) fieldChange.getNewValue(); - List newFollowers = new ArrayList<>(); - for (EntityReference follower : entityReferences) { - newFollowers.add(follower.getId().toString()); - } - fieldAddParams.put(fieldChange.getName(), newFollowers); - scriptTxt.append("ctx._source.followers.addAll(params.followers);"); - } - } - - for (FieldChange fieldChange: changeDescription.getFieldsDeleted()) { - if (fieldChange.getName().equalsIgnoreCase("followers")) { - List entityReferences = (List) fieldChange.getOldValue(); - for (EntityReference follower : entityReferences) { - fieldAddParams.put(fieldChange.getName(), follower.getId().toString()); - } - - scriptTxt.append("ctx._source.followers.removeAll(Collections.singleton(params.followers))"); - } - } - - if (!scriptTxt.toString().isEmpty()) { - Script script = new Script(ScriptType.INLINE, "painless", - scriptTxt.toString(), - fieldAddParams); - UpdateRequest updateRequest = new UpdateRequest(esIndex, entityId.toString()); - updateRequest.script(script); - return updateRequest; - } else { - return null; - } - } - - private UpdateRequest updateTable(Table instance) { - Map jsonMap = new HashMap<>(); - jsonMap.put("description", instance.getDescription()); - Set tags = new HashSet<>(); - List columnDescriptions = new ArrayList<>(); - List columnNames = new ArrayList<>(); - if (instance.getTags() != null) { - instance.getTags().forEach(tag -> tags.add(tag.getTagFQN())); - } - if (instance.getColumns() != null) { - - List cols = new ArrayList<>(); - cols = parseColumns(instance.getColumns(), cols, null); - - for (FlattenColumn col : cols) { - if (col.getTags() != null) { - tags.addAll(col.getTags()); - } - columnDescriptions.add(col.getDescription()); - columnNames.add(col.getName()); - } - } - - if (!tags.isEmpty()) { - List tagsList = new ArrayList<>(tags); - String tierTag = null; - for (String tag: tagsList) { - if (tag.toLowerCase().matches("(.*)tier(.*)")) { - tierTag = tag; - break; - } - } - if (tierTag != null) { - tagsList.remove(tierTag); - jsonMap.put("tier", tierTag); - } - jsonMap.put("tags", tagsList); - } else { - jsonMap.put("tags", tags); - } - - if (!columnNames.isEmpty()) { - jsonMap.put("column_names", columnNames); - } - if (!columnDescriptions.isEmpty()) { - jsonMap.put("column_descriptions", columnDescriptions); - } - if(instance.getOwner() != null) { - jsonMap.put("owner", instance.getOwner().getId().toString()); - } - if (instance.getFollowers() != null) { - List followers = new ArrayList<>(); - for(EntityReference follower: instance.getFollowers()) { - followers.add(follower.getId().toString()); - } - jsonMap.put("followers", followers); - } - jsonMap.put("last_updated_timestamp", System.currentTimeMillis()); - UpdateRequest updateRequest = new UpdateRequest("table_search_index", instance.getId().toString()); - updateRequest.doc(jsonMap); - return updateRequest; - } - - private UpdateRequest updateTopic(Topic instance) { - Map jsonMap = new HashMap<>(); - jsonMap.put("description", instance.getDescription()); - Set tags = new HashSet<>(); - - if (instance.getTags() != null) { - instance.getTags().forEach(tag -> tags.add(tag.getTagFQN())); - } - if (!tags.isEmpty()) { - List tagsList = new ArrayList<>(tags); - String tierTag = null; - for (String tag: tagsList) { - if (tag.toLowerCase().matches("(.*)tier(.*)")) { - tierTag = tag; - break; - } - } - if (tierTag != null) { - tagsList.remove(tierTag); - jsonMap.put("tier", tierTag); - } - jsonMap.put("tags", tagsList); - } else { - jsonMap.put("tags", tags); - } - - if(instance.getOwner() != null) { - jsonMap.put("owner", instance.getOwner().getId().toString()); - } - if (instance.getFollowers() != null) { - List followers = new ArrayList<>(); - for(EntityReference follower: instance.getFollowers()) { - followers.add(follower.getId().toString()); - } - jsonMap.put("followers", followers); - } - jsonMap.put("last_updated_timestamp", System.currentTimeMillis()); - UpdateRequest updateRequest = new UpdateRequest("topic_search_index", instance.getId().toString()); - updateRequest.doc(jsonMap); - return updateRequest; - } - - private UpdateRequest updateDashboard(Dashboard instance) { - Map jsonMap = new HashMap<>(); - jsonMap.put("description", instance.getDescription()); - Set tags = new HashSet<>(); - if (instance.getTags() != null) { - instance.getTags().forEach(tag -> tags.add(tag.getTagFQN())); - } - - if (!tags.isEmpty()) { - List tagsList = new ArrayList<>(tags); - String tierTag = null; - for (String tag: tagsList) { - if (tag.toLowerCase().matches("(.*)tier(.*)")) { - tierTag = tag; - break; - } - } - if (tierTag != null) { - tagsList.remove(tierTag); - jsonMap.put("tier", tierTag); - } - jsonMap.put("tags", tagsList); - } else { - jsonMap.put("tags", tags); - } - - if(instance.getOwner() != null) { - jsonMap.put("owner", instance.getOwner().getId().toString()); - } - if (instance.getFollowers() != null) { - List followers = new ArrayList<>(); - for(EntityReference follower: instance.getFollowers()) { - followers.add(follower.getId().toString()); - } - jsonMap.put("followers", followers); - } - jsonMap.put("last_updated_timestamp", System.currentTimeMillis()); - UpdateRequest updateRequest = new UpdateRequest("dashboard_search_index", instance.getId().toString()); - updateRequest.doc(jsonMap); - return updateRequest; - } - - private UpdateRequest updatePipeline(Pipeline instance) { - Map jsonMap = new HashMap<>(); - jsonMap.put("description", instance.getDescription()); - Set tags = new HashSet<>(); - if (instance.getTags() != null) { - instance.getTags().forEach(tag -> tags.add(tag.getTagFQN())); - } - - if (!tags.isEmpty()) { - List tagsList = new ArrayList<>(tags); - String tierTag = null; - for (String tag: tagsList) { - if (tag.toLowerCase().matches("(.*)tier(.*)")) { - tierTag = tag; - break; - } - } - if (tierTag != null) { - tagsList.remove(tierTag); - jsonMap.put("tier", tierTag); - } - jsonMap.put("tags", tagsList); - } else { - jsonMap.put("tags", tags); - } - - if(instance.getOwner() != null) { - jsonMap.put("owner", instance.getOwner().getId().toString()); - } - if (instance.getFollowers() != null) { - List followers = new ArrayList<>(); - for(EntityReference follower: instance.getFollowers()) { - followers.add(follower.getId().toString()); - } - jsonMap.put("followers", followers); - } - jsonMap.put("last_updated_timestamp", System.currentTimeMillis()); - UpdateRequest updateRequest = new UpdateRequest("pipeline_search_index", instance.getId().toString()); - updateRequest.doc(jsonMap); - return updateRequest; - } - - private List parseColumns(List columns, List flattenColumns, - String parentColumn) { - Optional optParentColumn = Optional.ofNullable(parentColumn).filter(Predicate.not(String::isEmpty)); - List tags = new ArrayList<>(); - for (Column col: columns) { - String columnName = col.getName(); - if (optParentColumn.isPresent()) { - columnName = optParentColumn.get() + "." + columnName; - } - if (col.getTags() != null) { - tags = col.getTags().stream().map(TagLabel::getTagFQN).collect(Collectors.toList()); - } - - FlattenColumn flattenColumn = FlattenColumn.builder() - .name(columnName) - .description(col.getDescription()).build(); - if (!tags.isEmpty()) { - flattenColumn.tags = tags; - } - flattenColumns.add(flattenColumn); - if (col.getChildren() != null) { - parseColumns(col.getChildren(), flattenColumns, col.getName()); - } - } - return flattenColumns; - } - - private String getESIndex(String type) { - if (type.toLowerCase().equals("table")) { - return "table_search_index"; - } else if (type.equalsIgnoreCase("dashboard")) { - return "dashboard_search_index"; - } else if (type.equalsIgnoreCase("pipeline")) { - return "pipeline_search_index"; - } else if (type.equalsIgnoreCase("topic")) { - return "topic_search_index"; - } - throw new RuntimeException("Failed to find index doc for type {}".format(type)); - } - @Getter - @Builder - public static class FlattenColumn { - String name; - String description; - List tags; - } - - public void close() { - try { - this.client.close(); - } catch (Exception e) { - LOG.error("Failed to close elastic search", e); - } - } - -} diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/DashboardRepository.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/DashboardRepository.java index 25bed1bffa1..f969647a935 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/DashboardRepository.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/DashboardRepository.java @@ -141,6 +141,7 @@ public class DashboardRepository extends EntityRepository { dashboard.setFullyQualifiedName(getFQN(dashboard)); EntityUtil.populateOwner(dao.userDAO(), dao.teamDAO(), dashboard.getOwner()); // Validate owner dashboard.setTags(EntityUtil.addDerivedTags(dao.tagDAO(), dashboard.getTags())); + dashboard.setCharts(getCharts(dashboard.getCharts())); } @Override @@ -221,7 +222,26 @@ public class DashboardRepository extends EntityRepository { return charts.isEmpty() ? null : charts; } - public void updateCharts(Dashboard original, Dashboard updated, EntityUpdater updater) throws JsonProcessingException { + /** + This method is used to populate the dashboard entity with all details of Chart EntityReference + Users/Tools can send minimum details required to set relationship as id, type are the only required + fields in entity reference, whereas we need to send fully populated object such that ElasticSearch index + has all the details. + */ + private List getCharts(List charts) throws IOException { + if (charts == null) { + return null; + } + List chartRefs = new ArrayList<>(); + for (EntityReference chart: charts) { + EntityReference chartRef = dao.chartDAO().findEntityReferenceById(chart.getId()); + chartRefs.add(chartRef); + } + return chartRefs.isEmpty() ? null : chartRefs; + } + + public void updateCharts(Dashboard original, Dashboard updated, EntityUpdater updater) + throws JsonProcessingException { String dashboardId = updated.getId().toString(); // Remove all charts associated with this dashboard @@ -302,7 +322,7 @@ public class DashboardRepository extends EntityRepository { public Dashboard getEntity() { return entity; } @Override - public void setId(UUID id) { entity.setId(id);} + public void setId(UUID id) { entity.setId(id); } @Override public void setDescription(String description) { diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/TableRepository.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/TableRepository.java index a327fbb8cbd..adf9e92a9ff 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/TableRepository.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/TableRepository.java @@ -207,8 +207,8 @@ public class TableRepository extends EntityRepository { public void addQuery(UUID tableId, SQLQuery query) throws IOException { // Validate the request content try { - byte[] checksum = MessageDigest.getInstance("MD5").digest(query.getQuery().getBytes()); - query.setChecksum(Hex.encodeHexString(checksum)); + byte[] checksum = MessageDigest.getInstance("MD5").digest(query.getQuery().getBytes()); + query.setChecksum(Hex.encodeHexString(checksum)); } catch(NoSuchAlgorithmException e) { throw new RuntimeException(e); } @@ -397,6 +397,14 @@ public class TableRepository extends EntityRepository
{ return dao.databaseDAO().findEntityReferenceById(UUID.fromString(result.get(0))); } + private EntityReference getDatabaseService(UUID databaseId) throws IOException { + // Find database for the table + EntityReference serviceRef = EntityUtil.getService(dao.relationshipDAO(), databaseId, + Entity.DATABASE_SERVICE); + serviceRef = dao.dbServiceDAO().findEntityReferenceById(serviceRef.getId()); + return serviceRef; + } + private EntityReference getLocation(UUID tableId) throws IOException { // Find the location of the table List result = dao.relationshipDAO().findTo(tableId.toString(), Relationship.HAS.ordinal(), Entity.LOCATION); @@ -773,7 +781,8 @@ public class TableRepository extends EntityRepository
{ // Carry forward the user generated metadata from existing columns to new columns for (Column updated : updatedColumns) { // Find stored column matching name, data type and ordinal position - Column stored = origColumns.stream().filter(c -> EntityUtil.columnMatch.test(c, updated)).findAny().orElse(null); + Column stored = origColumns.stream().filter(c -> + EntityUtil.columnMatch.test(c, updated)).findAny().orElse(null); if (stored == null) { // New column added continue; } diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/search/SearchResource.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/search/SearchResource.java index 518bf24578e..726d2017beb 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/search/SearchResource.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/search/SearchResource.java @@ -126,6 +126,7 @@ public class SearchResource { if (sortOrderParam.equals("asc")) { sortOrder = SortOrder.ASC; } + if (index.equals("topic_search_index")) { searchSourceBuilder = buildTopicSearchBuilder(query, from, size); } else if (index.equals("dashboard_search_index")) { @@ -134,8 +135,10 @@ public class SearchResource { searchSourceBuilder = buildPipelineSearchBuilder(query, from, size); } else if (index.equals("dbt_model_search_index")) { searchSourceBuilder = buildDbtModelSearchBuilder(query, from, size); - } else { + } else if (index.equals("table_search_index")) { searchSourceBuilder = buildTableSearchBuilder(query, from, size); + } else { + searchSourceBuilder = buildAggregateSearchBuilder(query, from, size); } if (sortFieldParam != null && !sortFieldParam.isEmpty()) { @@ -184,6 +187,20 @@ public class SearchResource { .build(); } + private SearchSourceBuilder buildAggregateSearchBuilder(String query, int from, int size) { + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.query(QueryBuilders.queryStringQuery(query) + .lenient(true)) + .aggregation(AggregationBuilders.terms("Service").field("service_type")) + .aggregation(AggregationBuilders.terms("ServiceCategory").field("service_category")) + .aggregation(AggregationBuilders.terms("EntityType").field("entity_type")) + .aggregation(AggregationBuilders.terms("Tier").field("tier")) + .aggregation(AggregationBuilders.terms("Tags").field("tags")) + .from(from).size(size); + + return searchSourceBuilder; + } + private SearchSourceBuilder buildTableSearchBuilder(String query, int from, int size) { SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); HighlightBuilder.Field highlightTableName = diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/util/JsonUtils.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/util/JsonUtils.java index 91f88238b9a..6ee4a1b4835 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/util/JsonUtils.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/util/JsonUtils.java @@ -140,6 +140,10 @@ public final class JsonUtils { return OBJECT_MAPPER.convertValue(o, JsonStructure.class); } + public static Map getMap(Object o) { + return OBJECT_MAPPER.convertValue(o, Map.class); + } + public static T readValue(String json, Class clz) throws IOException { if (json == null) { return null; diff --git a/catalog-rest-service/src/main/resources/elasticsearch/dashboard_index_mapping.json b/catalog-rest-service/src/main/resources/elasticsearch/dashboard_index_mapping.json new file mode 100644 index 00000000000..92c56957837 --- /dev/null +++ b/catalog-rest-service/src/main/resources/elasticsearch/dashboard_index_mapping.json @@ -0,0 +1,74 @@ +{ + "properties": { + "dashboard_name": { + "type":"text" + }, + "display_name": { + "type": "text" + }, + "owner": { + "type": "keyword" + }, + "fqdn": { + "type": "keyword" + }, + "followers": { + "type": "keyword" + }, + "last_updated_timestamp": { + "type": "date", + "format": "epoch_second" + }, + "description": { + "type": "text" + }, + "chart_names": { + "type":"text" + }, + "chart_descriptions": { + "type": "text" + }, + "tier": { + "type": "keyword" + }, + "tags": { + "type": "keyword" + }, + "service": { + "type": "keyword" + }, + "service_type": { + "type": "keyword" + }, + "service_category": { + "type": "keyword" + }, + "entity_type": { + "type": "keyword" + }, + "suggest": { + "type": "completion" + }, + "monthly_stats":{ + "type": "long" + }, + "monthly_percentile_rank":{ + "type": "long" + }, + "weekly_stats":{ + "type": "long" + }, + "weekly_percentile_rank":{ + "type": "long" + }, + "daily_percentile_rank": { + "type": "long" + }, + "daily_stats": { + "type": "long" + }, + "change_descriptions": { + "type": "nested" + } + } +} \ No newline at end of file diff --git a/catalog-rest-service/src/main/resources/elasticsearch/dbt_index_mapping.json b/catalog-rest-service/src/main/resources/elasticsearch/dbt_index_mapping.json new file mode 100644 index 00000000000..916bd4783e1 --- /dev/null +++ b/catalog-rest-service/src/main/resources/elasticsearch/dbt_index_mapping.json @@ -0,0 +1,76 @@ +{ + "properties": { + "dbt_model_name": { + "type":"text" + }, + "display_name": { + "type": "text" + }, + "fqdn": { + "type": "keyword" + }, + "owner": { + "type": "text" + }, + "followers": { + "type": "keyword" + }, + "last_updated_timestamp": { + "type": "date", + "format": "epoch_second" + }, + "description": { + "type": "text" + }, + "tier": { + "type": "keyword" + }, + "column_names": { + "type":"text" + }, + "column_descriptions": { + "type": "text" + }, + "tags": { + "type": "keyword" + }, + "service": { + "type": "keyword" + }, + "service_type": { + "type": "keyword" + }, + "service_category": { + "type": "keyword" + }, + "entity_type": { + "type": "keyword" + }, + "database": { + "type": "text" + }, + "suggest": { + "type": "completion" + }, + "change_descriptions": { + "type": "nested", + "properties": { + "updatedAt": { + "type": "long" + }, + "updatedBy": { + "type": "text" + }, + "fieldsAdded": { + "type": "text" + }, + "fieldsDeleted": { + "type": "text" + }, + "fieldsUpdated": { + "type": "text" + } + } + } + } +} \ No newline at end of file diff --git a/catalog-rest-service/src/main/resources/elasticsearch/pipeline_index_mapping.json b/catalog-rest-service/src/main/resources/elasticsearch/pipeline_index_mapping.json new file mode 100644 index 00000000000..3397d8c3014 --- /dev/null +++ b/catalog-rest-service/src/main/resources/elasticsearch/pipeline_index_mapping.json @@ -0,0 +1,56 @@ +{ + "properties": { + "pipeline_name": { + "type":"text" + }, + "display_name": { + "type": "text" + }, + "owner": { + "type": "keyword" + }, + "followers": { + "type": "keyword" + }, + "last_updated_timestamp": { + "type": "date", + "format": "epoch_second" + }, + "description": { + "type": "text" + }, + "fqdn": { + "type": "keyword" + }, + "task_names": { + "type":"text" + }, + "task_descriptions": { + "type": "text" + }, + "tier": { + "type": "keyword" + }, + "tags": { + "type": "keyword" + }, + "service": { + "type": "keyword" + }, + "service_type": { + "type": "keyword" + }, + "service_category": { + "type": "keyword" + }, + "entity_type": { + "type": "keyword" + }, + "suggest": { + "type": "completion" + }, + "change_descriptions": { + "type": "nested" + } + } +} \ No newline at end of file diff --git a/catalog-rest-service/src/main/resources/elasticsearch/table_index_mapping.json b/catalog-rest-service/src/main/resources/elasticsearch/table_index_mapping.json new file mode 100644 index 00000000000..3cc26c774c9 --- /dev/null +++ b/catalog-rest-service/src/main/resources/elasticsearch/table_index_mapping.json @@ -0,0 +1,77 @@ +{ + "properties": { + "table_name": { + "type":"text" + }, + "display_name": { + "type": "text" + }, + "owner": { + "type": "text" + }, + "fqdn": { + "type": "keyword" + }, + "followers": { + "type": "keyword" + }, + "last_updated_timestamp": { + "type": "date", + "format": "epoch_second" + }, + "description": { + "type": "text" + }, + "tier": { + "type": "keyword" + }, + "column_names": { + "type":"text" + }, + "column_descriptions": { + "type": "text" + }, + "tags": { + "type": "keyword" + }, + "service": { + "type": "keyword" + }, + "service_type": { + "type": "keyword" + }, + "service_category": { + "type": "keyword" + }, + "entity_type": { + "type": "keyword" + }, + "database": { + "type": "text" + }, + "suggest": { + "type": "completion" + }, + "monthly_stats":{ + "type": "long" + }, + "monthly_percentile_rank":{ + "type": "long" + }, + "weekly_stats":{ + "type": "long" + }, + "weekly_percentile_rank":{ + "type": "long" + }, + "daily_percentile_rank": { + "type": "long" + }, + "daily_stats": { + "type": "long" + }, + "change_descriptions": { + "type": "nested" + } + } +} \ No newline at end of file diff --git a/catalog-rest-service/src/main/resources/elasticsearch/topic_index_mapping.json b/catalog-rest-service/src/main/resources/elasticsearch/topic_index_mapping.json new file mode 100644 index 00000000000..7a29a3ca82d --- /dev/null +++ b/catalog-rest-service/src/main/resources/elasticsearch/topic_index_mapping.json @@ -0,0 +1,50 @@ +{ + "properties": { + "topic_name": { + "type":"text" + }, + "display_name": { + "type": "text" + }, + "owner": { + "type": "text" + }, + "fqdn": { + "type": "keyword" + }, + "followers": { + "type": "keyword" + }, + "last_updated_timestamp": { + "type": "date", + "format": "epoch_second" + }, + "description": { + "type": "text" + }, + "tier": { + "type": "keyword" + }, + "tags": { + "type": "keyword" + }, + "service": { + "type": "keyword" + }, + "service_type": { + "type": "keyword" + }, + "service_category": { + "type": "keyword" + }, + "entity_type": { + "type": "keyword" + }, + "suggest": { + "type": "completion" + }, + "change_descriptions": { + "type": "nested" + } + } +} \ No newline at end of file diff --git a/conf/openmetadata-security.yaml b/conf/openmetadata-security.yaml index ab4482ee563..3f72ca4be37 100644 --- a/conf/openmetadata-security.yaml +++ b/conf/openmetadata-security.yaml @@ -144,7 +144,8 @@ airflowConfiguration: eventHandlerConfiguration: eventHandlerClassNames: - "org.openmetadata.catalog.events.AuditEventHandler" - - "org.openmetadata.catalog.events.ElasticSearchEventHandler" + - "org.openmetadata.catalog.elasticsearch.ElasticSearchEventHandler" + - "org.openmetadata.catalog.events.ChangeEventHandler" health: delayedShutdownHandlerEnabled: true diff --git a/conf/openmetadata.yaml b/conf/openmetadata.yaml index ba0ebc674ff..a8c8eca3600 100644 --- a/conf/openmetadata.yaml +++ b/conf/openmetadata.yaml @@ -118,7 +118,7 @@ elasticsearch: eventHandlerConfiguration: eventHandlerClassNames: - "org.openmetadata.catalog.events.AuditEventHandler" - - "org.openmetadata.catalog.events.ElasticSearchEventHandler" + - "org.openmetadata.catalog.elasticsearch.ElasticSearchEventHandler" - "org.openmetadata.catalog.events.ChangeEventHandler" airflowConfiguration: diff --git a/docs/install/configuration.md b/docs/install/configuration.md index 28f5c402e35..1ff62db75d3 100644 --- a/docs/install/configuration.md +++ b/docs/install/configuration.md @@ -52,7 +52,7 @@ elasticsearch: eventHandlerConfiguration: eventHandlerClassNames: - "org.openmetadata.catalog.events.AuditEventHandler" - - "org.openmetadata.catalog.events.ElasticSearchEventHandler" + - "org.openmetadata.catalog.elasticsearch.ElasticSearchEventHandler" health: delayedShutdownHandlerEnabled: true @@ -114,7 +114,7 @@ ElasticSearch is one of the pre-requisites to run OpenMetadata. Default configur eventHandlerConfiguration: eventHandlerClassNames: - "org.openmetadata.catalog.events.AuditEventHandler" - - "org.openmetadata.catalog.events.ElasticSearchEventHandler" + - "org.openmetadata.catalog.elasticsearch.ElasticSearchEventHandler" ``` EventHandler configuration is optional. It will update the AuditLog in MySQL DB and also ElasticSearch indexes whenever any entity is updated either through UI or API interactions. We recommend you leave it there as it enhances the user experience. diff --git a/ingestion/src/metadata/ingestion/models/table_metadata.py b/ingestion/src/metadata/ingestion/models/table_metadata.py index 8ca4074805a..11b72c29668 100644 --- a/ingestion/src/metadata/ingestion/models/table_metadata.py +++ b/ingestion/src/metadata/ingestion/models/table_metadata.py @@ -32,7 +32,7 @@ class TableESDocument(BaseModel): service_type: str service_category: str entity_type: str = "table" - table_name: str + name: str suggest: List[dict] description: Optional[str] = None table_type: Optional[str] = None @@ -48,7 +48,6 @@ class TableESDocument(BaseModel): tags: List[str] fqdn: str tier: Optional[str] = None - schema_description: Optional[str] = None owner: str followers: List[str] @@ -61,14 +60,13 @@ class TopicESDocument(BaseModel): service_type: str service_category: str entity_type: str = "topic" - topic_name: str + name: str suggest: List[dict] description: Optional[str] = None last_updated_timestamp: Optional[int] tags: List[str] fqdn: str tier: Optional[str] = None - schema_description: Optional[str] = None owner: str followers: List[str] @@ -81,7 +79,7 @@ class DashboardESDocument(BaseModel): service_type: str service_category: str entity_type: str = "dashboard" - dashboard_name: str + name: str suggest: List[dict] description: Optional[str] = None last_updated_timestamp: Optional[int] @@ -108,7 +106,7 @@ class PipelineESDocument(BaseModel): service_type: str service_category: str entity_type: str = "pipeline" - pipeline_name: str + name: str suggest: List[dict] description: Optional[str] = None last_updated_timestamp: Optional[int] @@ -130,7 +128,7 @@ class DbtModelESDocument(BaseModel): service_type: str service_category: str entity_type: str = "dbtmodel" - dbt_model_name: str + name: str suggest: List[dict] description: Optional[str] = None dbt_type: Optional[str] = None @@ -140,7 +138,6 @@ class DbtModelESDocument(BaseModel): tags: List[str] fqdn: str tier: Optional[str] = None - schema_description: Optional[str] = None owner: str followers: List[str] diff --git a/ingestion/src/metadata/ingestion/sink/elasticsearch.py b/ingestion/src/metadata/ingestion/sink/elasticsearch.py index b9079ffedda..24965fe826a 100644 --- a/ingestion/src/metadata/ingestion/sink/elasticsearch.py +++ b/ingestion/src/metadata/ingestion/sink/elasticsearch.py @@ -235,7 +235,7 @@ class ElasticsearchSink(Sink): service=service_entity.name, service_type=service_entity.serviceType.name, service_category="databaseService", - table_name=table.name.__root__, + name=table.name.__root__, suggest=suggest, description=table.description, table_type=table_type, @@ -251,7 +251,6 @@ class ElasticsearchSink(Sink): tier=tier, tags=list(tags), fqdn=fqdn, - schema_description=None, owner=table_owner, followers=table_followers, ) @@ -285,7 +284,7 @@ class ElasticsearchSink(Sink): service=service_entity.name, service_type=service_entity.serviceType.name, service_category="messagingService", - topic_name=topic.name.__root__, + name=topic.name.__root__, suggest=suggest, description=topic.description, last_updated_timestamp=timestamp, @@ -335,7 +334,7 @@ class ElasticsearchSink(Sink): service=service_entity.name, service_type=service_entity.serviceType.name, service_category="dashboardService", - dashboard_name=dashboard.displayName, + name=dashboard.displayName, chart_names=chart_names, chart_descriptions=chart_descriptions, suggest=suggest, @@ -393,7 +392,7 @@ class ElasticsearchSink(Sink): service=service_entity.name, service_type=service_entity.serviceType.name, service_category="pipelineService", - pipeline_name=pipeline.displayName, + name=pipeline.displayName, task_names=task_names, task_descriptions=task_descriptions, suggest=suggest, @@ -453,7 +452,7 @@ class ElasticsearchSink(Sink): service=service_entity.name, service_type=service_entity.serviceType.name, service_category="databaseService", - dbt_model_name=dbt_model.name.__root__, + name=dbt_model.name.__root__, suggest=suggest, description=dbt_model.description, dbt_model_type=dbt_node_type, diff --git a/ingestion/src/metadata/ingestion/sink/elasticsearch_constants.py b/ingestion/src/metadata/ingestion/sink/elasticsearch_constants.py index 0ff1d3d57aa..96cc2795c04 100644 --- a/ingestion/src/metadata/ingestion/sink/elasticsearch_constants.py +++ b/ingestion/src/metadata/ingestion/sink/elasticsearch_constants.py @@ -13,21 +13,12 @@ import textwrap TABLE_ELASTICSEARCH_INDEX_MAPPING = textwrap.dedent( """ - { + { "mappings":{ "properties": { "table_name": { "type":"text" }, - "schema": { - "type":"text", - "analyzer": "simple", - "fields": { - "raw": { - "type": "keyword" - } - } - }, "display_name": { "type": "text" }, @@ -37,6 +28,9 @@ TABLE_ELASTICSEARCH_INDEX_MAPPING = textwrap.dedent( "followers": { "type": "keyword" }, + "fqdn": { + "type": "keyword" + }, "last_updated_timestamp": { "type": "date", "format": "epoch_second" @@ -56,9 +50,6 @@ TABLE_ELASTICSEARCH_INDEX_MAPPING = textwrap.dedent( "tags": { "type": "keyword" }, - "badges": { - "type": "text" - }, "service": { "type": "keyword" }, @@ -96,7 +87,7 @@ TABLE_ELASTICSEARCH_INDEX_MAPPING = textwrap.dedent( "type": "long" } } - } + } } """ ) @@ -109,15 +100,6 @@ TOPIC_ELASTICSEARCH_INDEX_MAPPING = textwrap.dedent( "topic_name": { "type":"text" }, - "schema": { - "type":"text", - "analyzer": "simple", - "fields": { - "raw": { - "type": "keyword" - } - } - }, "display_name": { "type": "text" }, @@ -127,6 +109,9 @@ TOPIC_ELASTICSEARCH_INDEX_MAPPING = textwrap.dedent( "followers": { "type": "keyword" }, + "fqdn": { + "type": "keyword" + }, "last_updated_timestamp": { "type": "date", "format": "epoch_second" @@ -175,6 +160,9 @@ DASHBOARD_ELASTICSEARCH_INDEX_MAPPING = textwrap.dedent( "owner": { "type": "keyword" }, + "fqdn": { + "type": "keyword" + }, "followers": { "type": "keyword" }, @@ -247,6 +235,9 @@ PIPELINE_ELASTICSEARCH_INDEX_MAPPING = textwrap.dedent( "display_name": { "type": "text" }, + "fqdn": { + "type": "keyword" + }, "owner": { "type": "keyword" }, @@ -302,15 +293,6 @@ DBT_ELASTICSEARCH_INDEX_MAPPING = textwrap.dedent( "dbt_model_name": { "type":"text" }, - "schema": { - "type":"text", - "analyzer": "simple", - "fields": { - "raw": { - "type": "keyword" - } - } - }, "display_name": { "type": "text" }, @@ -320,6 +302,9 @@ DBT_ELASTICSEARCH_INDEX_MAPPING = textwrap.dedent( "followers": { "type": "keyword" }, + "fqdn": { + "type": "keyword" + }, "last_updated_timestamp": { "type": "date", "format": "epoch_second" diff --git a/openmetadata-ui/src/main/resources/ui/src/components/app-bar/Suggestions.tsx b/openmetadata-ui/src/main/resources/ui/src/components/app-bar/Suggestions.tsx index 97ebe5f808b..943b59cbc33 100644 --- a/openmetadata-ui/src/main/resources/ui/src/components/app-bar/Suggestions.tsx +++ b/openmetadata-ui/src/main/resources/ui/src/components/app-bar/Suggestions.tsx @@ -29,6 +29,7 @@ type SuggestionProp = { type CommonSource = { fqdn: string; service_type: string; + name: string; }; type TableSource = { @@ -185,7 +186,7 @@ const Suggestions = ({ searchText, isOpen, setIsOpen }: SuggestionProp) => { {tableSuggestions.map((suggestion: TableSource) => { const fqdn = suggestion.fqdn; - const name = suggestion.table_name; + const name = suggestion.name; const serviceType = suggestion.service_type; return getSuggestionElement( @@ -203,7 +204,7 @@ const Suggestions = ({ searchText, isOpen, setIsOpen }: SuggestionProp) => { {topicSuggestions.map((suggestion: TopicSource) => { const fqdn = suggestion.fqdn; - const name = suggestion.topic_name; + const name = suggestion.name; const serviceType = suggestion.service_type; return getSuggestionElement( @@ -221,7 +222,7 @@ const Suggestions = ({ searchText, isOpen, setIsOpen }: SuggestionProp) => { {dashboardSuggestions.map((suggestion: DashboardSource) => { const fqdn = suggestion.fqdn; - const name = suggestion.dashboard_name; + const name = suggestion.name; const serviceType = suggestion.service_type; return getSuggestionElement( @@ -239,7 +240,7 @@ const Suggestions = ({ searchText, isOpen, setIsOpen }: SuggestionProp) => { {pipelineSuggestions.map((suggestion: PipelineSource) => { const fqdn = suggestion.fqdn; - const name = suggestion.pipeline_name; + const name = suggestion.name; const serviceType = suggestion.service_type; return getSuggestionElement( @@ -257,7 +258,7 @@ const Suggestions = ({ searchText, isOpen, setIsOpen }: SuggestionProp) => { {DBTModelSuggestions.map((suggestion: DBTModelSource) => { const fqdn = suggestion.fqdn; - const name = suggestion.dbt_model_name; + const name = suggestion.name; const serviceType = suggestion.service_type; return getSuggestionElement( diff --git a/openmetadata-ui/src/main/resources/ui/src/utils/APIUtils.js b/openmetadata-ui/src/main/resources/ui/src/utils/APIUtils.js index a4950e79042..1be68bd4509 100644 --- a/openmetadata-ui/src/main/resources/ui/src/utils/APIUtils.js +++ b/openmetadata-ui/src/main/resources/ui/src/utils/APIUtils.js @@ -25,12 +25,7 @@ export const formatDataResponse = (hits) => { hit._source.topic_id || hit._source.dashboard_id || hit._source.pipeline_id; - newData.name = - hit._source.table_name || - hit._source.topic_name || - hit._source.dashboard_name || - hit._source.pipeline_name || - hit._source.dbt_model_name; + newData.name = hit._source.name; newData.description = hit._source.description; newData.fullyQualifiedName = hit._source.fqdn; newData.tableType = hit._source.table_type;