Fix #5303: Add an option to ElasticSearch connector to re-create indexes (#5304)

This commit is contained in:
Sriharsha Chintalapani 2022-06-05 13:47:44 -07:00 committed by GitHub
parent a62e57c6dc
commit a056ccd9a0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 32 additions and 23 deletions

View File

@@ -153,9 +153,14 @@ public class ElasticSearchIndexDefinition {
private void deleteIndex(ElasticSearchIndexType elasticSearchIndexType) { private void deleteIndex(ElasticSearchIndexType elasticSearchIndexType) {
try { try {
GetIndexRequest gRequest = new GetIndexRequest(elasticSearchIndexType.indexName);
gRequest.local(false);
boolean exists = client.indices().exists(gRequest, RequestOptions.DEFAULT);
if (exists) {
DeleteIndexRequest request = new DeleteIndexRequest(elasticSearchIndexType.indexName); DeleteIndexRequest request = new DeleteIndexRequest(elasticSearchIndexType.indexName);
AcknowledgedResponse deleteIndexResponse = client.indices().delete(request, RequestOptions.DEFAULT); AcknowledgedResponse deleteIndexResponse = client.indices().delete(request, RequestOptions.DEFAULT);
LOG.info("{} Deleted {}", elasticSearchIndexType.indexName, deleteIndexResponse.isAcknowledged()); LOG.info("{} Deleted {}", elasticSearchIndexType.indexName, deleteIndexResponse.isAcknowledged());
}
} catch (IOException e) { } catch (IOException e) {
LOG.error("Failed to delete Elastic Search indexes due to", e); LOG.error("Failed to delete Elastic Search indexes due to", e);
} }

View File

@@ -4,12 +4,7 @@
"serviceName": "openMetadata", "serviceName": "openMetadata",
"serviceConnection": { "serviceConnection": {
"config":{ "config":{
"type":"MetadataES", "type":"MetadataES"
"includeTables": "true",
"includeUsers": "true",
"includeTopics": "true",
"includeDashboards": "true",
"limitRecords": 10
} }
}, },
"sourceConfig":{"config":{}} "sourceConfig":{"config":{}}
@@ -17,11 +12,9 @@
"sink": { "sink": {
"type": "elasticsearch", "type": "elasticsearch",
"config": { "config": {
"index_tables": "true",
"index_topics": "true",
"index_dashboards": "true",
"es_host": "localhost", "es_host": "localhost",
"es_port": 9200 "es_port": 9200,
"recreate_indexes": true
} }
}, },
"workflowConfig": { "workflowConfig": {

View File

@@ -91,6 +91,7 @@ class ElasticSearchConfig(ConfigModel):
verify_certs: bool = False verify_certs: bool = False
timeout: int = 30 timeout: int = 30
ca_certs: Optional[str] = None ca_certs: Optional[str] = None
recreate_indexes: Optional[bool] = False
class ElasticsearchSink(Sink[Entity]): class ElasticsearchSink(Sink[Entity]):
@@ -176,7 +177,10 @@ class ElasticsearchSink(Sink[Entity]):
Retrieve all indices that currently have {elasticsearch_alias} alias Retrieve all indices that currently have {elasticsearch_alias} alias
:return: list of elasticsearch indices :return: list of elasticsearch indices
""" """
if self.elasticsearch_client.indices.exists(index_name): if (
self.elasticsearch_client.indices.exists(index_name)
and not self.config.recreate_indexes
):
mapping = self.elasticsearch_client.indices.get_mapping() mapping = self.elasticsearch_client.indices.get_mapping()
if not mapping[index_name]["mappings"]: if not mapping[index_name]["mappings"]:
logger.debug( logger.debug(
@@ -197,6 +201,9 @@ class ElasticsearchSink(Sink[Entity]):
+ "The index doesn't exist for a newly created ES. It's OK on first run." + "The index doesn't exist for a newly created ES. It's OK on first run."
) )
# create new index with mapping # create new index with mapping
self.elasticsearch_client.indices.delete(
index=index_name, request_timeout=self.config.timeout
)
self.elasticsearch_client.indices.create( self.elasticsearch_client.indices.create(
index=index_name, body=es_mapping, request_timeout=self.config.timeout index=index_name, body=es_mapping, request_timeout=self.config.timeout
) )
@@ -316,7 +323,7 @@ class ElasticsearchSink(Sink[Entity]):
name=table.name.__root__, name=table.name.__root__,
suggest=suggest, suggest=suggest,
database_schema=str(database_schema_entity.name.__root__), database_schema=str(database_schema_entity.name.__root__),
description=table.description, description=str(table.description.__root__),
table_type=table_type, table_type=table_type,
last_updated_timestamp=timestamp, last_updated_timestamp=timestamp,
column_names=column_names, column_names=column_names,
@@ -366,7 +373,7 @@ class ElasticsearchSink(Sink[Entity]):
service_category="messagingService", service_category="messagingService",
name=topic.name.__root__, name=topic.name.__root__,
suggest=suggest, suggest=suggest,
description=topic.description, description=topic.description.__root__,
last_updated_timestamp=timestamp, last_updated_timestamp=timestamp,
tier=tier, tier=tier,
tags=list(tags), tags=list(tags),
@@ -400,7 +407,7 @@ class ElasticsearchSink(Sink[Entity]):
for chart in charts: for chart in charts:
chart_names.append(chart.displayName) chart_names.append(chart.displayName)
if chart.description is not None: if chart.description is not None:
chart_descriptions.append(chart.description) chart_descriptions.append(chart.description.__root__)
if len(chart.tags) > 0: if len(chart.tags) > 0:
for col_tag in chart.tags: for col_tag in chart.tags:
tags.add(col_tag.tagFQN.__root__) tags.add(col_tag.tagFQN.__root__)
@@ -415,7 +422,7 @@ class ElasticsearchSink(Sink[Entity]):
chart_names=chart_names, chart_names=chart_names,
chart_descriptions=chart_descriptions, chart_descriptions=chart_descriptions,
suggest=suggest, suggest=suggest,
description=dashboard.description, description=dashboard.description.__root__,
last_updated_timestamp=timestamp, last_updated_timestamp=timestamp,
tier=tier, tier=tier,
tags=list(tags), tags=list(tags),
@@ -456,7 +463,7 @@ class ElasticsearchSink(Sink[Entity]):
for task in tasks: for task in tasks:
task_names.append(task.displayName) task_names.append(task.displayName)
if task.description: if task.description:
task_descriptions.append(task.description) task_descriptions.append(task.description.__root__)
if tags in task and len(task.tags) > 0: if tags in task and len(task.tags) > 0:
for col_tag in task.tags: for col_tag in task.tags:
tags.add(col_tag.tagFQN) tags.add(col_tag.tagFQN)
@@ -471,7 +478,7 @@ class ElasticsearchSink(Sink[Entity]):
task_names=task_names, task_names=task_names,
task_descriptions=task_descriptions, task_descriptions=task_descriptions,
suggest=suggest, suggest=suggest,
description=pipeline.description, description=pipeline.description.__root__,
last_updated_timestamp=timestamp, last_updated_timestamp=timestamp,
tier=tier, tier=tier,
tags=list(tags), tags=list(tags),
@@ -548,7 +555,9 @@ class ElasticsearchSink(Sink[Entity]):
{"input": [glossary_term.name], "weight": 10}, {"input": [glossary_term.name], "weight": 10},
] ]
timestamp = glossary_term.updatedAt.__root__ timestamp = glossary_term.updatedAt.__root__
description = glossary_term.description if glossary_term.description else "" description = (
glossary_term.description.__root__ if glossary_term.description else ""
)
glossary_term_doc = GlossaryTermESDocument( glossary_term_doc = GlossaryTermESDocument(
glossary_term_id=str(glossary_term.id.__root__), glossary_term_id=str(glossary_term.id.__root__),
deleted=glossary_term.deleted, deleted=glossary_term.deleted,
@@ -591,7 +600,7 @@ class ElasticsearchSink(Sink[Entity]):
) )
column_names.append(col_name) column_names.append(col_name)
if column.description: if column.description:
column_descriptions.append(column.description) column_descriptions.append(column.description.__root__)
if len(column.tags) > 0: if len(column.tags) > 0:
for col_tag in column.tags: for col_tag in column.tags:
tags.add(col_tag.tagFQN.__root__) tags.add(col_tag.tagFQN.__root__)

View File

@@ -272,6 +272,8 @@ class MetadataRestSink(Sink[Entity]):
try: try:
topic_request = CreateTopicRequest( topic_request = CreateTopicRequest(
name=topic.name, name=topic.name,
displayName=topic.displayName,
description=topic.description,
service=topic.service, service=topic.service,
partitions=topic.partitions, partitions=topic.partitions,
replicationFactor=topic.replicationFactor, replicationFactor=topic.replicationFactor,