From a056ccd9a0352d19cd3b4640bb1f117e4c543ea2 Mon Sep 17 00:00:00 2001 From: Sriharsha Chintalapani Date: Sun, 5 Jun 2022 13:47:44 -0700 Subject: [PATCH] Fix #5303: Add an option to ElasticSearch connector to re-create indexes (#5304) --- .../ElasticSearchIndexDefinition.java | 11 +++++--- ingestion/pipelines/metadata_to_es.json | 15 +++-------- .../metadata/ingestion/sink/elasticsearch.py | 27 ++++++++++++------- .../metadata/ingestion/sink/metadata_rest.py | 2 ++ 4 files changed, 32 insertions(+), 23 deletions(-) diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/elasticsearch/ElasticSearchIndexDefinition.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/elasticsearch/ElasticSearchIndexDefinition.java index 55f29b57a04..05b90408de0 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/elasticsearch/ElasticSearchIndexDefinition.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/elasticsearch/ElasticSearchIndexDefinition.java @@ -153,9 +153,14 @@ public class ElasticSearchIndexDefinition { private void deleteIndex(ElasticSearchIndexType elasticSearchIndexType) { try { - DeleteIndexRequest request = new DeleteIndexRequest(elasticSearchIndexType.indexName); - AcknowledgedResponse deleteIndexResponse = client.indices().delete(request, RequestOptions.DEFAULT); - LOG.info("{} Deleted {}", elasticSearchIndexType.indexName, deleteIndexResponse.isAcknowledged()); + GetIndexRequest gRequest = new GetIndexRequest(elasticSearchIndexType.indexName); + gRequest.local(false); + boolean exists = client.indices().exists(gRequest, RequestOptions.DEFAULT); + if (exists) { + DeleteIndexRequest request = new DeleteIndexRequest(elasticSearchIndexType.indexName); + AcknowledgedResponse deleteIndexResponse = client.indices().delete(request, RequestOptions.DEFAULT); + LOG.info("{} Deleted {}", elasticSearchIndexType.indexName, deleteIndexResponse.isAcknowledged()); + } } catch (IOException e) { LOG.error("Failed to delete Elastic Search indexes due to", e); } diff --git a/ingestion/pipelines/metadata_to_es.json b/ingestion/pipelines/metadata_to_es.json index 70876378b8d..be33fea7f23 100644 --- a/ingestion/pipelines/metadata_to_es.json +++ b/ingestion/pipelines/metadata_to_es.json @@ -4,24 +4,17 @@ "serviceName": "openMetadata", "serviceConnection": { "config":{ - "type":"MetadataES", - "includeTables": "true", - "includeUsers": "true", - "includeTopics": "true", - "includeDashboards": "true", - "limitRecords": 10 - } + "type":"MetadataES" + } }, "sourceConfig":{"config":{}} }, "sink": { "type": "elasticsearch", "config": { - "index_tables": "true", - "index_topics": "true", - "index_dashboards": "true", "es_host": "localhost", - "es_port": 9200 + "es_port": 9200, + "recreate_indexes": true } }, "workflowConfig": { diff --git a/ingestion/src/metadata/ingestion/sink/elasticsearch.py b/ingestion/src/metadata/ingestion/sink/elasticsearch.py index 55caa35da4b..d3fae125b45 100644 --- a/ingestion/src/metadata/ingestion/sink/elasticsearch.py +++ b/ingestion/src/metadata/ingestion/sink/elasticsearch.py @@ -91,6 +91,7 @@ class ElasticSearchConfig(ConfigModel): verify_certs: bool = False timeout: int = 30 ca_certs: Optional[str] = None + recreate_indexes: Optional[bool] = False class ElasticsearchSink(Sink[Entity]): @@ -176,7 +177,10 @@ class ElasticsearchSink(Sink[Entity]): Retrieve all indices that currently have {elasticsearch_alias} alias :return: list of elasticsearch indices """ - if self.elasticsearch_client.indices.exists(index_name): + if 
(
+            self.elasticsearch_client.indices.exists(index_name)
+            and not self.config.recreate_indexes
+        ):
             mapping = self.elasticsearch_client.indices.get_mapping()
             if not mapping[index_name]["mappings"]:
                 logger.debug(
@@ -197,6 +201,9 @@ class ElasticsearchSink(Sink[Entity]):
                     + "The index doesn't exist for a newly created ES. It's OK on first run."
                 )
             # create new index with mapping
+            self.elasticsearch_client.indices.delete(
+                index=index_name, request_timeout=self.config.timeout
+            )
             self.elasticsearch_client.indices.create(
                 index=index_name, body=es_mapping, request_timeout=self.config.timeout
             )
@@ -316,7 +323,7 @@ class ElasticsearchSink(Sink[Entity]):
             name=table.name.__root__,
             suggest=suggest,
             database_schema=str(database_schema_entity.name.__root__),
-            description=table.description,
+            description=str(table.description.__root__),
             table_type=table_type,
             last_updated_timestamp=timestamp,
             column_names=column_names,
@@ -366,7 +373,7 @@ class ElasticsearchSink(Sink[Entity]):
             service_category="messagingService",
             name=topic.name.__root__,
             suggest=suggest,
-            description=topic.description,
+            description=topic.description.__root__,
             last_updated_timestamp=timestamp,
             tier=tier,
             tags=list(tags),
@@ -400,7 +407,7 @@ class ElasticsearchSink(Sink[Entity]):
         for chart in charts:
             chart_names.append(chart.displayName)
             if chart.description is not None:
-                chart_descriptions.append(chart.description)
+                chart_descriptions.append(chart.description.__root__)
             if len(chart.tags) > 0:
                 for col_tag in chart.tags:
                     tags.add(col_tag.tagFQN.__root__)
@@ -415,7 +422,7 @@ class ElasticsearchSink(Sink[Entity]):
             chart_names=chart_names,
             chart_descriptions=chart_descriptions,
             suggest=suggest,
-            description=dashboard.description,
+            description=dashboard.description.__root__,
             last_updated_timestamp=timestamp,
             tier=tier,
             tags=list(tags),
@@ -456,7 +463,7 @@ class ElasticsearchSink(Sink[Entity]):
         for task in tasks:
             task_names.append(task.displayName)
             if task.description:
-                task_descriptions.append(task.description)
+                task_descriptions.append(task.description.__root__)
             if tags in task and len(task.tags) > 0:
                 for col_tag in task.tags:
                     tags.add(col_tag.tagFQN)
@@ -471,7 +478,7 @@ class ElasticsearchSink(Sink[Entity]):
             task_names=task_names,
             task_descriptions=task_descriptions,
             suggest=suggest,
-            description=pipeline.description,
+            description=pipeline.description.__root__,
             last_updated_timestamp=timestamp,
             tier=tier,
             tags=list(tags),
@@ -548,7 +555,9 @@ class ElasticsearchSink(Sink[Entity]):
             {"input": [glossary_term.name], "weight": 10},
         ]
         timestamp = glossary_term.updatedAt.__root__
-        description = glossary_term.description if glossary_term.description else ""
+        description = (
+            glossary_term.description.__root__ if glossary_term.description else ""
+        )
         glossary_term_doc = GlossaryTermESDocument(
             glossary_term_id=str(glossary_term.id.__root__),
             deleted=glossary_term.deleted,
@@ -591,7 +600,7 @@ class ElasticsearchSink(Sink[Entity]):
                 )
                 column_names.append(col_name)
                 if column.description:
-                    column_descriptions.append(column.description)
+                    column_descriptions.append(column.description.__root__)
                 if len(column.tags) > 0:
                     for col_tag in column.tags:
                         tags.add(col_tag.tagFQN.__root__)
diff --git a/ingestion/src/metadata/ingestion/sink/metadata_rest.py b/ingestion/src/metadata/ingestion/sink/metadata_rest.py
index 2728b69080d..2416a16b39f 100644
--- a/ingestion/src/metadata/ingestion/sink/metadata_rest.py
+++ b/ingestion/src/metadata/ingestion/sink/metadata_rest.py
@@ -272,6 +272,8 @@ class MetadataRestSink(Sink[Entity]):
         try:
             topic_request = 
CreateTopicRequest(
                     name=topic.name,
+                    displayName=topic.displayName,
+                    description=topic.description,
                     service=topic.service,
                     partitions=topic.partitions,
                     replicationFactor=topic.replicationFactor,
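
Note on the new option: pulling the hunks above together, the elasticsearch sink's check-or-create flow with recreate_indexes enabled is roughly the sketch below. This is a simplified illustration, not the method body from the patch; check_or_create_index, its parameters, and the client/es_mapping placeholders are illustrative names, while the indices.exists / indices.delete / indices.create calls and the recreate_indexes and timeout settings are the ones the patch itself uses.

from elasticsearch import Elasticsearch

def check_or_create_index(client: Elasticsearch, index_name: str, es_mapping: str,
                          recreate_indexes: bool = False, timeout: int = 30) -> None:
    # Reuse the existing index only when it exists and recreation was not requested.
    if client.indices.exists(index_name) and not recreate_indexes:
        return
    # Otherwise drop any stale index (guarded with exists() here so a first run
    # does not try to delete a missing index) and rebuild it from the mapping.
    if client.indices.exists(index_name):
        client.indices.delete(index=index_name, request_timeout=timeout)
    client.indices.create(index=index_name, body=es_mapping, request_timeout=timeout)

As the updated ingestion/pipelines/metadata_to_es.json shows, the option is enabled from the sink config with "recreate_indexes": true; it defaults to false, so existing deployments keep reusing their indexes unless they opt in.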