Fixes #15543: Implemented ElasticSearch Index Template Ingestion (#18686)

This commit is contained in:
Keshav Mohta 2024-11-26 14:42:43 +05:30 committed by GitHub
parent 8f33a1b92d
commit 10a7f4ea6c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 169 additions and 19 deletions

View File

@ -12,6 +12,7 @@
Elasticsearch source to extract metadata
"""
import shutil
import traceback
from pathlib import Path
from typing import Any, Iterable, Optional
@ -21,6 +22,7 @@ from metadata.generated.schema.api.data.createSearchIndex import (
CreateSearchIndexRequest,
)
from metadata.generated.schema.entity.data.searchIndex import (
IndexType,
SearchIndex,
SearchIndexSampleData,
)
@ -103,6 +105,7 @@ class ElasticsearchSource(SearchServiceSource):
fields=parse_es_index_mapping(
search_index_details.get(index_name, {}).get("mappings")
),
indexType=IndexType.Index,
)
yield Either(right=search_index_request)
self.register_record(search_index_request=search_index_request)
@ -143,6 +146,56 @@ class ElasticsearchSource(SearchServiceSource):
)
)
def get_search_index_template_list(self) -> Iterable[dict]:
"""
Get List of all search index template
"""
yield from self.client.indices.get_index_template().get("index_templates", [])
def get_search_index_template_name(
self, search_index_template_details: dict
) -> Optional[str]:
"""
Get Search Index Template Name
"""
return search_index_template_details and search_index_template_details["name"]
def yield_search_index_template(
self, search_index_template_details: Any
) -> Iterable[Either[CreateSearchIndexRequest]]:
"""
Method to Get Search Index Template Entity
"""
try:
if self.source_config.includeIndexTemplate:
index_name = self.get_search_index_template_name(
search_index_template_details
)
index_template = search_index_template_details["index_template"]
if index_name:
search_index_template_request = CreateSearchIndexRequest(
name=EntityName(index_name),
displayName=index_name,
searchIndexSettings=index_template.get("template", {}).get(
"settings", {}
),
service=FullyQualifiedEntityName(
self.context.get().search_service
),
fields=parse_es_index_mapping(
index_template.get("template", {}).get("mappings")
),
indexType=IndexType.IndexTemplate,
description=index_template.get("_meta", {}).get("description"),
)
yield Either(right=search_index_template_request)
self.register_record(
search_index_request=search_index_template_request
)
except Exception as exc:
logger.debug(traceback.format_exc())
logger.error(f"Could not include index templates due to {exc}")
def close(self):
try:
if Path(self.service_connection.sslConfig.certificates.stagingDir).exists():

View File

@ -83,7 +83,7 @@ class SearchServiceTopology(ServiceTopology):
cache_entities=True,
),
],
children=["search_index"],
children=["search_index", "search_index_template"],
post_process=["mark_search_indexes_as_deleted"],
)
search_index: Annotated[
@ -107,6 +107,21 @@ class SearchServiceTopology(ServiceTopology):
],
)
search_index_template: Annotated[
TopologyNode, Field(description="Search Index Template Processing Node")
] = TopologyNode(
producer="get_search_index_template",
stages=[
NodeStage(
type_=SearchIndex,
context="search_index_template",
processor="yield_search_index_template",
consumer=["search_service"],
use_cache=True,
)
],
)
class SearchServiceSource(TopologyRunnerMixin, Source, ABC):
"""
@ -178,6 +193,34 @@ class SearchServiceSource(TopologyRunnerMixin, Source, ABC):
continue
yield index_details
def yield_search_index_template(
self, search_index_template_details: Any
) -> Iterable[Either[CreateSearchIndexRequest]]:
"""Method to Get Search Index Templates"""
def get_search_index_template_list(self) -> Optional[List[Any]]:
"""Get list of all search index templates"""
def get_search_index_template_name(self, search_index_template_details: Any) -> str:
"""Get Search Index Template Name"""
def get_search_index_template(self) -> Any:
if self.source_config.includeIndexTemplate:
for index_template_details in self.get_search_index_template_list():
if search_index_template_name := self.get_search_index_template_name(
index_template_details
):
if filter_by_search_index(
self.source_config.searchIndexFilterPattern,
search_index_template_name,
):
self.status.filter(
search_index_template_name,
"Search Index Template Filtered Out",
)
continue
yield index_template_details
def yield_create_request_search_service(
self, config: WorkflowSource
) -> Iterable[Either[CreateSearchServiceRequest]]:

View File

@ -389,6 +389,7 @@ public class SearchIndexRepository extends EntityRepository<SearchIndex> {
original.getSearchIndexSettings(),
updated.getSearchIndexSettings());
recordChange("sourceHash", original.getSourceHash(), updated.getSourceHash());
recordChange("indexType", original.getIndexType(), updated.getIndexType());
}
private void updateSearchIndexFields(

View File

@ -628,11 +628,14 @@ public class SearchIndexResource extends EntityResource<SearchIndex, SearchIndex
}
private SearchIndex getSearchIndex(CreateSearchIndex create, String user) {
return repository
.copy(new SearchIndex(), create, user)
.withService(getEntityReference(Entity.SEARCH_SERVICE, create.getService()))
.withFields(create.getFields())
.withSearchIndexSettings(create.getSearchIndexSettings())
.withSourceHash(create.getSourceHash());
SearchIndex searchIndex =
repository
.copy(new SearchIndex(), create, user)
.withService(getEntityReference(Entity.SEARCH_SERVICE, create.getService()))
.withFields(create.getFields())
.withSearchIndexSettings(create.getSearchIndexSettings())
.withSourceHash(create.getSourceHash())
.withIndexType(create.getIndexType());
return searchIndex;
}
}

View File

@ -31,6 +31,7 @@ public record SearchEntityIndex(org.openmetadata.schema.entity.data.SearchIndex
doc.put("tags", parseTags.getTags());
doc.put("tier", parseTags.getTierTag());
doc.put("service", getEntityWithDisplayName(searchIndex.getService()));
doc.put("indexType", searchIndex.getIndexType());
doc.put("lineage", SearchIndex.getLineageData(searchIndex.getEntityReference()));
return doc;
}

View File

@ -494,7 +494,10 @@
"format": "strict_date_optional_time||epoch_millis"
}
}
}
},
"indexType": {
"type": "text"
}
}
}
}

View File

@ -491,7 +491,10 @@
"format": "strict_date_optional_time||epoch_millis"
}
}
}
},
"indexType": {
"type": "text"
}
}
}
}

View File

@ -482,7 +482,10 @@
"format": "strict_date_optional_time||epoch_millis"
}
}
}
},
"indexType": {
"type": "text"
}
}
}
}

View File

@ -74,6 +74,11 @@
"type": "string",
"minLength": 1,
"maxLength": 32
},
"indexType": {
"description": "Whether the entity is index or index template.",
"$ref": "../../entity/data/searchIndex.json#/properties/indexType",
"default": "Index"
}
},
"required": ["name", "service", "fields"],

View File

@ -6,14 +6,18 @@
"description": "A `SearchIndex` is a index mapping definition in ElasticSearch or OpenSearch",
"type": "object",
"javaType": "org.openmetadata.schema.entity.data.SearchIndex",
"javaInterfaces": ["org.openmetadata.schema.EntityInterface"],
"javaInterfaces": [
"org.openmetadata.schema.EntityInterface"
],
"definitions": {
"searchIndexSettings": {
"javaType": "org.openmetadata.schema.type.searchindex.SearchIndexSettings",
"description": "Contains key/value pair of SearchIndex Settings.",
"type": "object",
"additionalProperties": {
".{1,}": { "type": "string" }
".{1,}": {
"type": "string"
}
}
},
"searchIndexSampleData": {
@ -92,7 +96,9 @@
"searchIndexField": {
"type": "object",
"javaType": "org.openmetadata.schema.type.SearchIndexField",
"javaInterfaces": ["org.openmetadata.schema.FieldInterface"],
"javaInterfaces": [
"org.openmetadata.schema.FieldInterface"
],
"description": "This schema defines the type for a field in a searchIndex.",
"properties": {
"name": {
@ -232,15 +238,15 @@
"description": "Entity extension data with custom attributes added to the entity.",
"$ref": "../../type/basic.json#/definitions/entityExtension"
},
"domain" : {
"domain": {
"description": "Domain the SearchIndex belongs to. When not set, the SearchIndex inherits the domain from the messaging service it belongs to.",
"$ref": "../../type/entityReference.json"
},
"dataProducts" : {
"dataProducts": {
"description": "List of data products this entity is part of.",
"$ref" : "../../type/entityReferenceList.json"
"$ref": "../../type/entityReferenceList.json"
},
"votes" : {
"votes": {
"description": "Votes on the entity.",
"$ref": "../../type/votes.json"
},
@ -256,8 +262,20 @@
"type": "string",
"minLength": 1,
"maxLength": 32
},
"indexType": {
"description": "Whether the entity is index or index template.",
"type": "string",
"javaType": "org.openmetadata.schema.entity.type.SearchIndexType",
"enum": ["Index", "IndexTemplate"],
"default": "Index"
}
},
"required": ["id", "name", "service", "fields"],
"required": [
"id",
"name",
"service",
"fields"
],
"additionalProperties": false
}
}

View File

@ -50,6 +50,12 @@
"description": "Set the 'Override Metadata' toggle to control whether to override the existing metadata in the OpenMetadata server with the metadata fetched from the source. If the toggle is set to true, the metadata fetched from the source will override the existing metadata in the OpenMetadata server. If the toggle is set to false, the metadata fetched from the source will not override the existing metadata in the OpenMetadata server. This is applicable for fields like description, tags, owner and displayName",
"type": "boolean",
"default": false
},
"includeIndexTemplate":{
"title": "Include Index Template",
"description": "Enable the 'Include Index Template' toggle to manage the ingestion of index template data.",
"type": "boolean",
"default": false
}
},
"additionalProperties": false

View File

@ -53,6 +53,17 @@ This is applicable for fields like description, tags, owner and displayName
$$
$$section
### Include Index Template $(id="includeIndexTemplate")
`Include Index Template` toggle to manage the ingestion of index templates metadata from the source.
If the toggle is `enabled`, index templates metadata will be ingested from the source.
If the toggle is `disabled`, index templates metadata will not be ingested from the source.
$$
$$section
### Sample Size $(id="sampleSize")