diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/elasticsearch/ElasticSearchEventHandler.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/elasticsearch/ElasticSearchEventHandler.java index e0aa49693a9..596e563c1d2 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/elasticsearch/ElasticSearchEventHandler.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/elasticsearch/ElasticSearchEventHandler.java @@ -137,7 +137,17 @@ public class ElasticSearchEventHandler implements EventHandler { List fieldsAdded = changeDescription.getFieldsAdded(); StringBuilder scriptTxt = new StringBuilder(); Map fieldAddParams = new HashMap<>(); - + ESChangeDescription esChangeDescription = ESChangeDescription.builder() + .updatedAt(event.getDateTime().getTime()) + .updatedBy(event.getUserName()) + .fieldsAdded(changeDescription.getFieldsAdded()) + .fieldsUpdated(changeDescription.getFieldsUpdated()) + .fieldsDeleted(changeDescription.getFieldsDeleted()).build(); + Map esChangeDescriptionDoc = JsonUtils.getMap(esChangeDescription); + fieldAddParams.put("change_description", esChangeDescriptionDoc); + fieldAddParams.put("last_updated_timestamp", event.getDateTime().getTime()); + scriptTxt.append("ctx._source.change_descriptions.add(params.change_description); "); + scriptTxt.append("ctx._source.last_updated_timestamp=params.last_updated_timestamp;"); for (FieldChange fieldChange: fieldsAdded) { if (fieldChange.getName().equalsIgnoreCase("followers")) { List entityReferences = (List) fieldChange.getNewValue(); @@ -159,15 +169,7 @@ public class ElasticSearchEventHandler implements EventHandler { scriptTxt.append("ctx._source.followers.removeAll(Collections.singleton(params.followers));"); } } - ESChangeDescription esChangeDescription = ESChangeDescription.builder() - .updatedAt(event.getDateTime().getTime()) - .updatedBy(event.getUserName()).build(); - esChangeDescription.setFieldsAdded(changeDescription.getFieldsAdded()); - esChangeDescription.setFieldsDeleted(changeDescription.getFieldsDeleted()); - esChangeDescription.setFieldsUpdated(changeDescription.getFieldsUpdated()); - Map esChangeDescriptionDoc = JsonUtils.getMap(esChangeDescription); - fieldAddParams.put("change_description", esChangeDescriptionDoc); - scriptTxt.append("ctx._source.change_descriptions.add(params.change_description);"); + if (!scriptTxt.toString().isEmpty()) { Script script = new Script(ScriptType.INLINE, "painless", scriptTxt.toString(), diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/elasticsearch/ElasticSearchIndexDefinition.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/elasticsearch/ElasticSearchIndexDefinition.java index b286f868c6a..a48a77ad1f8 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/elasticsearch/ElasticSearchIndexDefinition.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/elasticsearch/ElasticSearchIndexDefinition.java @@ -66,8 +66,7 @@ public class ElasticSearchIndexDefinition { TABLE_SEARCH_INDEX("table_search_index", "/elasticsearch/table_index_mapping.json"), TOPIC_SEARCH_INDEX("topic_search_index", "/elasticsearch/topic_index_mapping.json"), DASHBOARD_SEARCH_INDEX("dashboard_search_index", "/elasticsearch/dashboard_index_mapping.json"), - PIPELINE_SEARCH_INDEX("pipeline_search_index", "/elasticsearch/pipeline_index_mapping.json"), - DBT_MODEL_SEARCH_INDEX("dbt_model_search_index", "/elasticsearch/dbt_index_mapping.json"); + PIPELINE_SEARCH_INDEX("pipeline_search_index", "/elasticsearch/pipeline_index_mapping.json"); public final String indexName; public final String indexMappingFile; @@ -212,9 +211,9 @@ class FlattenColumn { class ESChangeDescription { String updatedBy; Long updatedAt; - List fieldsAdded; - List fieldsUpdated; - List fieldsDeleted; + List fieldsAdded = new ArrayList<>(); + List fieldsUpdated = new ArrayList<>(); + List fieldsDeleted = new ArrayList<>(); } class ParseTags { @@ -351,7 +350,11 @@ class TableESIndex extends ElasticSearchIndex { esChangeDescription.setFieldsUpdated(table.getChangeDescription().getFieldsUpdated()); } else if (responseCode == Response.Status.CREATED.getStatusCode()) { esChangeDescription = ESChangeDescription.builder().updatedAt(table.getUpdatedAt().getTime()) - .updatedBy(table.getUpdatedBy()).build(); + .updatedBy(table.getUpdatedBy()) + .fieldsAdded(new ArrayList<>()) + .fieldsUpdated(new ArrayList<>()) + .fieldsDeleted(new ArrayList<>()) + .build(); } tableESIndexBuilder.changeDescriptions(esChangeDescription != null ? List.of(esChangeDescription) : null); return tableESIndexBuilder; diff --git a/catalog-rest-service/src/main/resources/elasticsearch/dashboard_index_mapping.json b/catalog-rest-service/src/main/resources/elasticsearch/dashboard_index_mapping.json index 957fd016a21..4a275b79f67 100644 --- a/catalog-rest-service/src/main/resources/elasticsearch/dashboard_index_mapping.json +++ b/catalog-rest-service/src/main/resources/elasticsearch/dashboard_index_mapping.json @@ -68,7 +68,27 @@ "type": "long" }, "change_descriptions": { - "type": "nested" + "type": "nested", + "properties": { + "updatedAt": { + "type": "long" + }, + "updatedBy": { + "type": "text" + }, + "fieldsAdded": { + "type": "object", + "enabled": false + }, + "fieldsDeleted": { + "type": "object", + "enabled": false + }, + "fieldsUpdated": { + "type": "object", + "enabled": false + } + } } } } \ No newline at end of file diff --git a/catalog-rest-service/src/main/resources/elasticsearch/dbt_index_mapping.json b/catalog-rest-service/src/main/resources/elasticsearch/dbt_index_mapping.json index a4eb3f86b73..cf20269ca2b 100644 --- a/catalog-rest-service/src/main/resources/elasticsearch/dbt_index_mapping.json +++ b/catalog-rest-service/src/main/resources/elasticsearch/dbt_index_mapping.json @@ -53,24 +53,7 @@ "type": "completion" }, "change_descriptions": { - "type": "nested", - "properties": { - "updatedAt": { - "type": "long" - }, - "updatedBy": { - "type": "text" - }, - "fieldsAdded": { - "type": "text" - }, - "fieldsDeleted": { - "type": "text" - }, - "fieldsUpdated": { - "type": "text" - } - } + "type": "nested" } } } \ No newline at end of file diff --git a/catalog-rest-service/src/main/resources/elasticsearch/pipeline_index_mapping.json b/catalog-rest-service/src/main/resources/elasticsearch/pipeline_index_mapping.json index 0ff1136c1a1..c713cbf4da4 100644 --- a/catalog-rest-service/src/main/resources/elasticsearch/pipeline_index_mapping.json +++ b/catalog-rest-service/src/main/resources/elasticsearch/pipeline_index_mapping.json @@ -50,7 +50,27 @@ "type": "completion" }, "change_descriptions": { - "type": "nested" + "type": "nested", + "properties": { + "updatedAt": { + "type": "long" + }, + "updatedBy": { + "type": "text" + }, + "fieldsAdded": { + "type": "object", + "enabled": false + }, + "fieldsDeleted": { + "type": "object", + "enabled": false + }, + "fieldsUpdated": { + "type": "object", + "enabled": false + } + } } } } \ No newline at end of file diff --git a/catalog-rest-service/src/main/resources/elasticsearch/table_index_mapping.json b/catalog-rest-service/src/main/resources/elasticsearch/table_index_mapping.json index 3d3c35d16a9..0ad3f81155d 100644 --- a/catalog-rest-service/src/main/resources/elasticsearch/table_index_mapping.json +++ b/catalog-rest-service/src/main/resources/elasticsearch/table_index_mapping.json @@ -71,7 +71,27 @@ "type": "long" }, "change_descriptions": { - "type": "nested" + "type": "nested", + "properties": { + "updatedAt": { + "type": "long" + }, + "updatedBy": { + "type": "text" + }, + "fieldsAdded": { + "type": "object", + "enabled": false + }, + "fieldsDeleted": { + "type": "object", + "enabled": false + }, + "fieldsUpdated": { + "type": "object", + "enabled": false + } + } } } } \ No newline at end of file diff --git a/catalog-rest-service/src/main/resources/elasticsearch/topic_index_mapping.json b/catalog-rest-service/src/main/resources/elasticsearch/topic_index_mapping.json index bf6de48ea72..b943059c20c 100644 --- a/catalog-rest-service/src/main/resources/elasticsearch/topic_index_mapping.json +++ b/catalog-rest-service/src/main/resources/elasticsearch/topic_index_mapping.json @@ -44,7 +44,27 @@ "type": "completion" }, "change_descriptions": { - "type": "nested" + "type": "nested", + "properties": { + "updatedAt": { + "type": "long" + }, + "updatedBy": { + "type": "text" + }, + "fieldsAdded": { + "type": "object", + "enabled": false + }, + "fieldsDeleted": { + "type": "object", + "enabled": false + }, + "fieldsUpdated": { + "type": "object", + "enabled": false + } + } } } } \ No newline at end of file diff --git a/ingestion/src/metadata/ingestion/sink/elasticsearch.py b/ingestion/src/metadata/ingestion/sink/elasticsearch.py index dbdd204dea9..c211135e1be 100644 --- a/ingestion/src/metadata/ingestion/sink/elasticsearch.py +++ b/ingestion/src/metadata/ingestion/sink/elasticsearch.py @@ -36,7 +36,6 @@ from metadata.ingestion.api.sink import Sink, SinkStatus from metadata.ingestion.models.table_metadata import ( ChangeDescription, DashboardESDocument, - DbtModelESDocument, PipelineESDocument, TableESDocument, TopicESDocument, @@ -45,7 +44,6 @@ from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig from metadata.ingestion.sink.elasticsearch_constants import ( DASHBOARD_ELASTICSEARCH_INDEX_MAPPING, - DBT_ELASTICSEARCH_INDEX_MAPPING, PIPELINE_ELASTICSEARCH_INDEX_MAPPING, TABLE_ELASTICSEARCH_INDEX_MAPPING, TOPIC_ELASTICSEARCH_INDEX_MAPPING, @@ -140,10 +138,6 @@ class ElasticsearchSink(Sink[Entity]): self._check_or_create_index( self.config.pipeline_index_name, PIPELINE_ELASTICSEARCH_INDEX_MAPPING ) - if self.config.index_dbt_models: - self._check_or_create_index( - self.config.dbt_index_name, DBT_ELASTICSEARCH_INDEX_MAPPING - ) def _check_or_create_index(self, index_name: str, es_mapping: str): """ @@ -226,7 +220,7 @@ class ElasticsearchSink(Sink[Entity]): column_descriptions = [] tags = set() - timestamp = time.time() + timestamp = int(table.updatedAt.__root__.timestamp()) tier = None for table_tag in table.tags: if "Tier" in table_tag.tagFQN: @@ -289,7 +283,7 @@ class ElasticsearchSink(Sink[Entity]): {"input": [topic_name], "weight": 10}, ] tags = set() - timestamp = time.time() + timestamp = topic.updatedAt.__root__.timestamp() service_entity = self.metadata.get_by_id( entity=MessagingService, entity_id=str(topic.service.id.__root__) ) @@ -321,7 +315,6 @@ class ElasticsearchSink(Sink[Entity]): followers=topic_followers, change_descriptions=change_descriptions, ) - print(topic_doc.json()) return topic_doc def _create_dashboard_es_doc(self, dashboard: Dashboard): @@ -329,7 +322,7 @@ class ElasticsearchSink(Sink[Entity]): dashboard_name = dashboard.name suggest = [{"input": [dashboard.displayName], "weight": 10}] tags = set() - timestamp = time.time() + timestamp = dashboard.updatedAt.__root__.timestamp() service_entity = self.metadata.get_by_id( entity=DashboardService, entity_id=str(dashboard.service.id.__root__) ) @@ -390,7 +383,7 @@ class ElasticsearchSink(Sink[Entity]): fqdn = pipeline.fullyQualifiedName suggest = [{"input": [pipeline.displayName], "weight": 10}] tags = set() - timestamp = time.time() + timestamp = pipeline.updatedAt.__root__.timestamp() service_entity = self.metadata.get_by_id( entity=PipelineService, entity_id=str(pipeline.service.id.__root__) ) diff --git a/ingestion/src/metadata/ingestion/sink/elasticsearch_constants.py b/ingestion/src/metadata/ingestion/sink/elasticsearch_constants.py index bd140beac46..c4ef945a67d 100644 --- a/ingestion/src/metadata/ingestion/sink/elasticsearch_constants.py +++ b/ingestion/src/metadata/ingestion/sink/elasticsearch_constants.py @@ -87,9 +87,30 @@ TABLE_ELASTICSEARCH_INDEX_MAPPING = textwrap.dedent( "type": "long" }, "change_descriptions": { - "type": "nested" + "type": "nested", + "properties": { + "updatedAt": { + "type": "long" + }, + "updatedBy": { + "type": "text" + }, + "fieldsAdded": { + "type": "object", + "enabled": false + }, + "fieldsDeleted": { + "type": "object", + "enabled": false + }, + "fieldsUpdated": { + "type": "object", + "enabled": false + } } } + } + } } } """ @@ -144,9 +165,30 @@ TOPIC_ELASTICSEARCH_INDEX_MAPPING = textwrap.dedent( "type": "completion" }, "change_descriptions": { - "type": "nested" + "type": "nested", + "properties": { + "updatedAt": { + "type": "long" + }, + "updatedBy": { + "type": "text" + }, + "fieldsAdded": { + "type": "object", + "enabled": false + }, + "fieldsDeleted": { + "type": "object", + "enabled": false + }, + "fieldsUpdated": { + "type": "object", + "enabled": false + } } } + } + } } } """ @@ -225,9 +267,30 @@ DASHBOARD_ELASTICSEARCH_INDEX_MAPPING = textwrap.dedent( "type": "long" }, "change_descriptions": { - "type": "nested" + "type": "nested", + "properties": { + "updatedAt": { + "type": "long" + }, + "updatedBy": { + "type": "text" + }, + "fieldsAdded": { + "type": "object", + "enabled": false + }, + "fieldsDeleted": { + "type": "object", + "enabled": false + }, + "fieldsUpdated": { + "type": "object", + "enabled": false + } } } + } + } } } """ @@ -288,77 +351,30 @@ PIPELINE_ELASTICSEARCH_INDEX_MAPPING = textwrap.dedent( "type": "completion" }, "change_descriptions": { - "type": "nested" + "type": "nested", + "properties": { + "updatedAt": { + "type": "long" + }, + "updatedBy": { + "type": "text" + }, + "fieldsAdded": { + "type": "object", + "enabled": false + }, + "fieldsDeleted": { + "type": "object", + "enabled": false + }, + "fieldsUpdated": { + "type": "object", + "enabled": false + } } } - } - } - """ -) - - -DBT_ELASTICSEARCH_INDEX_MAPPING = textwrap.dedent( - """ - { - "mappings":{ - "properties": { - "name": { - "type":"text" - }, - "display_name": { - "type": "text" - }, - "owner": { - "type": "text" - }, - "followers": { - "type": "keyword" - }, - "fqdn": { - "type": "keyword" - }, - "last_updated_timestamp": { - "type": "date", - "format": "epoch_second" - }, - "description": { - "type": "text" - }, - "tier": { - "type": "keyword" - }, - "column_names": { - "type":"text" - }, - "column_descriptions": { - "type": "text" - }, - "tags": { - "type": "keyword" - }, - "service": { - "type": "keyword" - }, - "service_type": { - "type": "keyword" - }, - "service_category": { - "type": "keyword" - }, - "entity_type": { - "type": "keyword" - }, - "database": { - "type": "text" - }, - "suggest": { - "type": "completion" - }, - "change_descriptions": { - "type": "nested" - } - } - } + } + } } """ )