Fix #12302: Add ElasticSearch Ingestion Source (#12892)

This commit is contained in:
Mayur Singal 2023-08-18 15:10:31 +05:30 committed by GitHub
parent 9f4440c562
commit 2f7a2193e0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 981 additions and 31 deletions

View File

@ -0,0 +1,20 @@
source:
type: elasticsearch
serviceName: local_elasticsearch
serviceConnection:
config:
type: ElasticSearch
hostPort: localhost:9200
sourceConfig:
config:
type: SearchMetadata
sink:
type: metadata-rest
config: {}
workflowConfig:
# loggerLevel: DEBUG
openMetadataServerConfig:
hostPort: http://localhost:8585/api
authProvider: openmetadata
securityConfig:
jwtToken: "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg"

View File

@ -0,0 +1,25 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Model required to ingest search index sample data
"""
from pydantic import BaseModel
from metadata.generated.schema.entity.data.searchIndex import (
SearchIndex,
SearchIndexSampleData,
)
class OMetaIndexSampleData(BaseModel):
entity: SearchIndex
data: SearchIndexSampleData

View File

@ -0,0 +1,75 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Mixin class containing Search Index specific methods
To be used by OpenMetadata class
"""
import traceback
from typing import Optional
from metadata.generated.schema.entity.data.searchIndex import (
SearchIndex,
SearchIndexSampleData,
)
from metadata.ingestion.ometa.client import REST
from metadata.utils.logger import ometa_logger
logger = ometa_logger()
class OMetaSearchIndexMixin:
"""
OpenMetadata API methods related to search index.
To be inherited by OpenMetadata
"""
client: REST
def ingest_search_index_sample_data(
self, search_index: SearchIndex, sample_data: SearchIndexSampleData
) -> Optional[SearchIndexSampleData]:
"""
PUT sample data for a search index
:param search_index: SearchIndex Entity to update
:param sample_data: Data to add
"""
resp = None
try:
resp = self.client.put(
f"{self.get_suffix(SearchIndex)}/{search_index.id.__root__}/sampleData",
data=sample_data.json(),
)
except Exception as exc:
logger.debug(traceback.format_exc())
logger.warning(
f"Error trying to PUT sample data for {search_index.fullyQualifiedName.__root__}: {exc}"
)
if resp:
try:
return SearchIndexSampleData(**resp["sampleData"])
except UnicodeError as err:
logger.debug(traceback.format_exc())
logger.warning(
"Unicode Error parsing the sample data response "
f"from {search_index.fullyQualifiedName.__root__}: {err}"
)
except Exception as exc:
logger.debug(traceback.format_exc())
logger.warning(
"Error trying to parse sample data results"
f"from {search_index.fullyQualifiedName.__root__}: {exc}"
)
return None

View File

@ -95,6 +95,7 @@ from metadata.ingestion.ometa.mixins.patch_mixin import OMetaPatchMixin
from metadata.ingestion.ometa.mixins.pipeline_mixin import OMetaPipelineMixin
from metadata.ingestion.ometa.mixins.query_mixin import OMetaQueryMixin
from metadata.ingestion.ometa.mixins.role_policy_mixin import OMetaRolePolicyMixin
from metadata.ingestion.ometa.mixins.search_index_mixin import OMetaSearchIndexMixin
from metadata.ingestion.ometa.mixins.server_mixin import OMetaServerMixin
from metadata.ingestion.ometa.mixins.service_mixin import OMetaServiceMixin
from metadata.ingestion.ometa.mixins.table_mixin import OMetaTableMixin
@ -171,6 +172,7 @@ class OpenMetadata(
OMetaUserMixin,
OMetaQueryMixin,
OMetaRolePolicyMixin,
OMetaSearchIndexMixin,
Generic[T, C],
):
"""

View File

@ -41,6 +41,7 @@ from metadata.ingestion.models.ometa_classification import OMetaTagAndClassifica
from metadata.ingestion.models.ometa_topic_data import OMetaTopicSampleData
from metadata.ingestion.models.pipeline_status import OMetaPipelineStatus
from metadata.ingestion.models.profile_data import OMetaTableProfileSampleData
from metadata.ingestion.models.search_index_data import OMetaIndexSampleData
from metadata.ingestion.models.tests_data import (
OMetaLogicalTestSuiteSample,
OMetaTestCaseResultsSample,
@ -109,6 +110,9 @@ class MetadataRestSink(Sink[Entity]):
OMetaTestCaseResultsSample, self.write_test_case_results_sample
)
self.write_record.register(OMetaTopicSampleData, self.write_topic_sample_data)
self.write_record.register(
OMetaIndexSampleData, self.write_search_index_sample_data
)
@classmethod
def create(cls, config_dict: dict, metadata_config: OpenMetadataConnection):
@ -467,5 +471,27 @@ class MetadataRestSink(Sink[Entity]):
f"Unexpected error while ingesting sample data for topic [{record.topic.name.__root__}]: {exc}"
)
def write_search_index_sample_data(self, record: OMetaIndexSampleData):
"""
Ingest Search Index Sample Data
"""
try:
if record.data.messages:
self.metadata.ingest_search_index_sample_data(
record.entity,
record.data,
)
logger.debug(
f"Successfully ingested sample data for {record.entity.name.__root__}"
)
self.status.records_written(
f"SearchIndexSampleData: {record.entity.name.__root__}"
)
except Exception as exc:
logger.debug(traceback.format_exc())
logger.error(
f"Unexpected error while ingesting sample data for search index [{record.entity.name.__root__}]: {exc}"
)
def close(self):
pass

View File

@ -0,0 +1,83 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Source connection handler
"""
from typing import Optional
from elasticsearch import Elasticsearch
from metadata.generated.schema.entity.automations.workflow import (
Workflow as AutomationWorkflow,
)
from metadata.generated.schema.entity.services.connections.search.elasticSearchConnection import (
ApiAuthentication,
BasicAuthentication,
ElasticsearchConnection,
)
from metadata.ingestion.connections.builders import init_empty_connection_arguments
from metadata.ingestion.connections.test_connections import test_connection_steps
from metadata.ingestion.ometa.ometa_api import OpenMetadata
def get_connection(connection: ElasticsearchConnection) -> Elasticsearch:
"""
Create connection
"""
basic_auth = None
api_key = None
if isinstance(connection.authType, BasicAuthentication):
basic_auth = (
connection.authType.username,
connection.authType.password.get_secret_value(),
)
if isinstance(connection.authType, ApiAuthentication):
api_key = (
connection.authType.apiKeyId,
connection.authType.apiKey.get_secret_value(),
)
if not connection.connectionArguments:
connection.connectionArguments = init_empty_connection_arguments()
return Elasticsearch(
[connection.hostPort],
basic_auth=basic_auth,
api_key=api_key,
scheme=connection.scheme.value,
**connection.connectionArguments.__root__
)
def test_connection(
metadata: OpenMetadata,
client: Elasticsearch,
service_connection: ElasticsearchConnection,
automation_workflow: Optional[AutomationWorkflow] = None,
) -> None:
"""
Test connection. This can be executed either as part
of a metadata workflow or during an Automation Workflow
"""
test_fn = {
"CheckAccess": client.info,
"GetSearchIndexes": client.indices.get_alias,
}
test_connection_steps(
metadata=metadata,
test_fn=test_fn,
service_type=service_connection.type.value,
automation_workflow=automation_workflow,
)

View File

@ -0,0 +1,123 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Elasticsearch source to extract metadata
"""
from typing import Any, Iterable, Optional
from elasticsearch import Elasticsearch
from metadata.generated.schema.api.data.createSearchIndex import (
CreateSearchIndexRequest,
)
from metadata.generated.schema.entity.data.searchIndex import SearchIndexSampleData
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
from metadata.generated.schema.entity.services.connections.search.elasticSearchConnection import (
ElasticsearchConnection,
)
from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.ingestion.api.source import InvalidSourceException, Source
from metadata.ingestion.models.search_index_data import OMetaIndexSampleData
from metadata.ingestion.source.search.elasticsearch.parser import parse_es_index_mapping
from metadata.ingestion.source.search.search_service import SearchServiceSource
from metadata.utils.logger import ingestion_logger
logger = ingestion_logger()
WILDCARD_SEARCH = "*"
class ElasticsearchSource(SearchServiceSource):
"""
Implements the necessary methods ot extract
Search Index metadata from Elastic Search
"""
def __init__(self, config: Source, metadata_config: OpenMetadataConnection):
super().__init__(config, metadata_config)
self.client: Elasticsearch = self.connection
@classmethod
def create(cls, config_dict, metadata_config: OpenMetadataConnection):
config: WorkflowSource = WorkflowSource.parse_obj(config_dict)
connection: ElasticsearchConnection = config.serviceConnection.__root__.config
if not isinstance(connection, ElasticsearchConnection):
raise InvalidSourceException(
f"Expected ElasticsearchConnection, but got {connection}"
)
return cls(config, metadata_config)
def get_search_index_list(self) -> Iterable[dict]:
"""
Get List of all search index
"""
index_list = self.client.indices.get_alias() or {}
for index in index_list.keys():
yield self.client.indices.get(index)
def get_search_index_name(self, search_index_details: dict) -> Optional[str]:
"""
Get Search Index Name
"""
if search_index_details and len(search_index_details) == 1:
return list(search_index_details.keys())[0]
return None
def yield_search_index(
self, search_index_details: Any
) -> Iterable[CreateSearchIndexRequest]:
"""
Method to Get Search Index Entity
"""
index_name = self.get_search_index_name(search_index_details)
if index_name:
yield CreateSearchIndexRequest(
name=index_name,
displayName=index_name,
searchIndexSettings=search_index_details.get(index_name, {}).get(
"settings", {}
),
service=self.context.search_service.fullyQualifiedName.__root__,
fields=parse_es_index_mapping(
search_index_details.get(index_name, {}).get("mappings")
),
)
def yield_search_index_sample_data(
self, search_index_details: Any
) -> Iterable[OMetaIndexSampleData]:
"""
Method to Get Sample Data of Search Index Entity
"""
if self.source_config.includeSampleData and self.context.search_index:
sample_data = self.client.search(
index=self.context.search_index.name.__root__,
q=WILDCARD_SEARCH,
size=self.source_config.sampleSize,
request_timeout=self.service_connection.connectionTimeoutSecs,
)
yield OMetaIndexSampleData(
entity=self.context.search_index,
data=SearchIndexSampleData(
messages=[
str(message)
for message in sample_data.get("hits", {}).get("hits", [])
]
),
)

View File

@ -0,0 +1,62 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Utils module to parse the jsonschema
"""
import traceback
from typing import List, Optional
from metadata.generated.schema.entity.data.searchIndex import DataType, SearchIndexField
from metadata.utils.logger import ingestion_logger
logger = ingestion_logger()
# If any type of ES field is not recognized mark it as unknown
# pylint: disable=no-member,unused-argument,protected-access
@classmethod
def _missing_(cls, value):
return cls.UNKNOWN
DataType._missing_ = _missing_
def parse_es_index_mapping(mapping: dict) -> Optional[List[SearchIndexField]]:
"""
Recursively convert the parsed schema into required models
"""
field_models = []
try:
properties = mapping.get("properties", {})
for key, value in properties.items():
data_type = (
DataType(value.get("type").upper())
if value.get("type")
else DataType.OBJECT
)
field_models.append(
SearchIndexField(
name=key,
dataType=data_type,
description=value.get("description"),
children=parse_es_index_mapping(value)
if value.get("properties")
else None,
)
)
except Exception as exc: # pylint: disable=broad-except
logger.debug(traceback.format_exc())
logger.warning(f"Unable to parse the index properties: {exc}")
return field_models

View File

@ -0,0 +1,226 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Base class for ingesting search index services
"""
from abc import ABC, abstractmethod
from typing import Any, Iterable, List, Optional, Set
from metadata.generated.schema.api.data.createSearchIndex import (
CreateSearchIndexRequest,
)
from metadata.generated.schema.entity.data.searchIndex import (
SearchIndex,
SearchIndexSampleData,
)
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
from metadata.generated.schema.entity.services.searchService import (
SearchConnection,
SearchService,
)
from metadata.generated.schema.metadataIngestion.searchServiceMetadataPipeline import (
SearchServiceMetadataPipeline,
)
from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.ingestion.api.source import Source
from metadata.ingestion.api.topology_runner import TopologyRunnerMixin
from metadata.ingestion.models.delete_entity import (
DeleteEntity,
delete_entity_from_source,
)
from metadata.ingestion.models.search_index_data import OMetaIndexSampleData
from metadata.ingestion.models.topology import (
NodeStage,
ServiceTopology,
TopologyNode,
create_source_context,
)
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.connections import get_connection, get_test_connection_fn
from metadata.utils import fqn
from metadata.utils.filters import filter_by_search_index
from metadata.utils.logger import ingestion_logger
logger = ingestion_logger()
class SearchServiceTopology(ServiceTopology):
"""
Defines the hierarchy in Search Services.
We could have a topology validator. We can only consume
data that has been produced by any parent node.
"""
root = TopologyNode(
producer="get_services",
stages=[
NodeStage(
type_=SearchService,
context="search_service",
processor="yield_create_request_search_service",
overwrite=False,
must_return=True,
),
],
children=["search_index"],
post_process=["mark_search_indexes_as_deleted"],
)
search_index = TopologyNode(
producer="get_search_index",
stages=[
NodeStage(
type_=SearchIndex,
context="search_index",
processor="yield_search_index",
consumer=["search_service"],
),
NodeStage(
type_=OMetaIndexSampleData,
context="search_index_sample_data",
processor="yield_search_index_sample_data",
consumer=["search_service"],
ack_sink=False,
nullable=True,
),
],
)
class SearchServiceSource(TopologyRunnerMixin, Source, ABC):
"""
Base class for Search Services.
It implements the topology and context.
"""
source_config: SearchServiceMetadataPipeline
config: WorkflowSource
# Big union of types we want to fetch dynamically
service_connection: SearchConnection.__fields__["config"].type_
topology = SearchServiceTopology()
context = create_source_context(topology)
index_source_state: Set = set()
def __init__(
self,
config: WorkflowSource,
metadata_config: OpenMetadataConnection,
):
super().__init__()
self.config = config
self.metadata_config = metadata_config
self.metadata = OpenMetadata(metadata_config)
self.source_config: SearchServiceMetadataPipeline = (
self.config.sourceConfig.config
)
self.service_connection = self.config.serviceConnection.__root__.config
self.connection = get_connection(self.service_connection)
# Flag the connection for the test connection
self.connection_obj = self.connection
self.test_connection()
@abstractmethod
def yield_search_index(
self, search_index_details: Any
) -> Iterable[CreateSearchIndexRequest]:
"""
Method to Get Search Index Entity
"""
def yield_search_index_sample_data(
self, search_index_details: Any
) -> Iterable[SearchIndexSampleData]:
"""
Method to Get Sample Data of Search Index Entity
"""
@abstractmethod
def get_search_index_list(self) -> Optional[List[Any]]:
"""
Get List of all search index
"""
@abstractmethod
def get_search_index_name(self, search_index_details: Any) -> str:
"""
Get Search Index Name
"""
def get_search_index(self) -> Any:
for index_details in self.get_search_index_list():
search_index_name = self.get_search_index_name(index_details)
if filter_by_search_index(
self.source_config.searchIndexFilterPattern,
search_index_name,
):
self.status.filter(
search_index_name,
"Search Index Filtered Out",
)
continue
yield index_details
def yield_create_request_search_service(self, config: WorkflowSource):
yield self.metadata.get_create_service_from_source(
entity=SearchService, config=config
)
def get_services(self) -> Iterable[WorkflowSource]:
yield self.config
def prepare(self):
"""
Nothing to prepare by default
"""
def test_connection(self) -> None:
test_connection_fn = get_test_connection_fn(self.service_connection)
test_connection_fn(self.metadata, self.connection_obj, self.service_connection)
def mark_search_indexes_as_deleted(self) -> Iterable[DeleteEntity]:
"""
Method to mark the search index as deleted
"""
if self.source_config.markDeletedSearchIndexes:
yield from delete_entity_from_source(
metadata=self.metadata,
entity_type=SearchIndex,
entity_source_state=self.index_source_state,
mark_deleted_entity=self.source_config.markDeletedSearchIndexes,
params={
"service": self.context.search_service.fullyQualifiedName.__root__
},
)
def register_record(self, search_index_request: CreateSearchIndexRequest) -> None:
"""
Mark the search index record as scanned and update the index_source_state
"""
index_fqn = fqn.build(
self.metadata,
entity_type=SearchIndex,
service_name=search_index_request.service.__root__,
search_index_name=search_index_request.name.__root__,
)
self.index_source_state.add(index_fqn)
self.status.scanned(search_index_request.name.__root__)
def close(self):
"""
Nothing to close by default
"""

View File

@ -238,3 +238,18 @@ def filter_by_datamodel(
:return: True for filtering, False otherwise
"""
return _filter(datamodel_filter_pattern, datamodel_name)
def filter_by_search_index(
search_index_filter_pattern: Optional[FilterPattern], search_index_name: str
) -> bool:
"""
Return True if the models needs to be filtered, False otherwise
Include takes precedence over exclude
:param search_index_filter_pattern: Model defining search index filtering logic
:param search_index_name: search index name
:return: True for filtering, False otherwise
"""
return _filter(search_index_filter_pattern, search_index_name)

View File

@ -33,6 +33,7 @@ from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema
from metadata.generated.schema.entity.data.mlmodel import MlModel
from metadata.generated.schema.entity.data.pipeline import Pipeline
from metadata.generated.schema.entity.data.searchIndex import SearchIndex
from metadata.generated.schema.entity.data.table import Column, DataModel, Table
from metadata.generated.schema.entity.data.topic import Topic
from metadata.generated.schema.entity.teams.team import Team
@ -264,6 +265,20 @@ def _(
return _build(service_name, topic_name)
@fqn_build_registry.add(SearchIndex)
def _(
_: OpenMetadata, # ES Index not necessary for Search Index FQN building
*,
service_name: str,
search_index_name: str,
) -> str:
if not service_name or not search_index_name:
raise FQNBuildingException(
f"Args should be informed, but got service=`{service_name}`, search_index=`{search_index_name}``"
)
return _build(service_name, search_index_name)
@fqn_build_registry.add(Tag)
def _(
_: OpenMetadata, # ES Index not necessary for Tag FQN building

View File

@ -0,0 +1,218 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Test ES using the topology
"""
from unittest import TestCase
from unittest.mock import patch
from metadata.generated.schema.api.data.createSearchIndex import (
CreateSearchIndexRequest,
)
from metadata.generated.schema.entity.data.searchIndex import DataType, SearchIndexField
from metadata.generated.schema.entity.services.searchService import (
SearchConnection,
SearchService,
SearchServiceType,
)
from metadata.generated.schema.metadataIngestion.workflow import (
OpenMetadataWorkflowConfig,
)
from metadata.ingestion.source.search.elasticsearch.metadata import ElasticsearchSource
mock_es_config = {
"source": {
"type": "elasticsearch",
"serviceName": "local_elasticsearch",
"serviceConnection": {
"config": {
"type": "ElasticSearch",
"authType": {
"username": "username",
"password": "password",
},
"hostPort": "localhost:9200",
}
},
"sourceConfig": {"config": {"type": "SearchMetadata"}},
},
"sink": {"type": "metadata-rest", "config": {}},
"workflowConfig": {
"openMetadataServerConfig": {
"hostPort": "http://localhost:8585/api",
"authProvider": "openmetadata",
"securityConfig": {
"jwtToken": "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg"
},
}
},
}
MOCK_SETTINGS = {
"index": {
"routing": {"allocation": {"include": {"_tier_preference": "data_content"}}},
"number_of_shards": "1",
"provided_name": "test_case_search_index",
"creation_date": "1692181190239",
"analysis": {
"filter": {"om_stemmer": {"name": "english", "type": "stemmer"}},
"normalizer": {
"lowercase_normalizer": {
"filter": ["lowercase"],
"type": "custom",
"char_filter": [],
}
},
"analyzer": {
"om_ngram": {
"filter": ["lowercase"],
"min_gram": "1",
"max_gram": "2",
"tokenizer": "ngram",
},
"om_analyzer": {
"filter": ["lowercase", "om_stemmer"],
"tokenizer": "letter",
},
},
},
"number_of_replicas": "1",
"uuid": "8HAGhnVkSy-X__XwWFdJqg",
"version": {"created": "7160399"},
}
}
MOCK_DETAILS = {
"test_case_search_index": {
"aliases": {},
"mappings": {
"properties": {
"href": {"type": "text"},
"name": {
"type": "text",
"fields": {
"keyword": {"type": "keyword", "ignore_above": 256},
"ngram": {"type": "text", "analyzer": "om_ngram"},
},
"analyzer": "om_analyzer",
},
"owner": {
"properties": {
"deleted": {"type": "text"},
"description": {"type": "text"},
"displayName": {
"type": "text",
"fields": {
"keyword": {"type": "keyword", "ignore_above": 256}
},
},
"fullyQualifiedName": {"type": "text"},
"href": {"type": "text"},
"id": {"type": "text"},
"name": {
"type": "keyword",
"normalizer": "lowercase_normalizer",
"fields": {
"keyword": {"type": "keyword", "ignore_above": 256}
},
},
"type": {"type": "keyword"},
}
},
}
},
"settings": MOCK_SETTINGS,
}
}
MOCK_SEARCH_SERVICE = SearchService(
id="85811038-099a-11ed-861d-0242ac120002",
name="es_source",
fullyQualifiedName="es_source",
connection=SearchConnection(),
serviceType=SearchServiceType.ElasticSearch,
)
EXPECTED_RESULT = CreateSearchIndexRequest(
name="test_case_search_index",
displayName="test_case_search_index",
searchIndexSettings=MOCK_SETTINGS,
service="es_source",
fields=[
SearchIndexField(
name="href",
dataType=DataType.TEXT,
),
SearchIndexField(
name="name",
dataType=DataType.TEXT,
),
SearchIndexField(
name="owner",
dataType=DataType.OBJECT,
children=[
SearchIndexField(
name="deleted",
dataType=DataType.TEXT,
),
SearchIndexField(
name="description",
dataType=DataType.TEXT,
),
SearchIndexField(
name="displayName",
dataType=DataType.TEXT,
),
SearchIndexField(
name="fullyQualifiedName",
dataType=DataType.TEXT,
),
SearchIndexField(
name="href",
dataType=DataType.TEXT,
),
SearchIndexField(
name="id",
dataType=DataType.TEXT,
),
SearchIndexField(
name="name",
dataType=DataType.KEYWORD,
),
SearchIndexField(
name="type",
dataType=DataType.KEYWORD,
),
],
),
],
)
class ElasticSearchUnitTest(TestCase):
@patch(
"metadata.ingestion.source.search.search_service.SearchServiceSource.test_connection"
)
def __init__(self, methodName, test_connection) -> None:
super().__init__(methodName)
test_connection.return_value = False
self.config = OpenMetadataWorkflowConfig.parse_obj(mock_es_config)
self.es_source = ElasticsearchSource.create(
mock_es_config["source"],
self.config.workflowConfig.openMetadataServerConfig,
)
self.es_source.context.__dict__["search_service"] = MOCK_SEARCH_SERVICE
def test_partition_parse_columns(self):
actual_index = self.es_source.yield_search_index(MOCK_DETAILS)
self.assertEqual(list(actual_index), [EXPECTED_RESULT])

View File

@ -0,0 +1,21 @@
{
"name": "ElasticSearch",
"displayName": "ElasticSearch Test Connection",
"description": "This Test Connection validates the access against the server and basic metadata extraction of dashboards and charts.",
"steps": [
{
"name": "CheckAccess",
"description": "Validate that the API is accessible with the given credentials",
"errorMessage": "Failed to connect to elasticsearch, please validate the credentials",
"shortCircuit": true,
"mandatory": true
},
{
"name": "GetSearchIndexes",
"description": "Validate that the API is accessible with the given credentials",
"errorMessage": "Failed to connect to elasticsearch, please validate the credentials",
"shortCircuit": true,
"mandatory": true
}
]
}

View File

@ -11,7 +11,10 @@
"searchIndexSettings": {
"javaType": "org.openmetadata.schema.type.searchindex.SearchIndexSettings",
"description": "Contains key/value pair of SearchIndex Settings.",
"type": "object"
"type": "object",
"additionalProperties": {
".{1,}": { "type": "string" }
}
},
"searchIndexSampleData": {
"type": "object",
@ -63,7 +66,8 @@
"GEO_SHAPE",
"POINT",
"SHAPE",
"PERCOLATOR"
"PERCOLATOR",
"UNKNOWN"
]
},
"searchIndexFieldName": {

View File

@ -11,6 +11,42 @@
"type": "string",
"enum": ["ElasticSearch"],
"default": "ElasticSearch"
},
"connectionScheme": {
"description": "ElasticSearch Connection Scheme",
"type": "string",
"enum": ["http", "https"],
"default": "http"
},
"basicAuthentication": {
"properties": {
"username": {
"description": "Elastic Search Username for Login",
"type": "string"
},
"password": {
"description": "Elastic Search Password for Login",
"type": "string",
"format": "password"
}
},
"required": ["username","password"],
"type": "object"
},
"apiAuthentication": {
"type": "object",
"properties": {
"apiKeyId": {
"description": "Elastic Search API Key ID for API Authentication",
"type": "string"
},
"apiKey": {
"description": "Elastic Search API Key for API Authentication",
"type": "string",
"format": "password"
}
},
"required": ["apiKeyId","apiKey"]
}
},
"properties": {
@ -28,41 +64,25 @@
"scheme": {
"description": "Http/Https connection scheme",
"type": "string",
"$ref": "#/definitions/connectionScheme",
"default": "http"
},
"username": {
"description": "Elastic Search Username for Login",
"type": "string"
},
"password": {
"description": "Elastic Search Password for Login",
"type": "string"
},
"truststorePath": {
"description": "Truststore Path",
"type": "string"
},
"truststorePassword": {
"description": "Truststore Password",
"type": "string"
"authType": {
"title": "Auth Configuration Type",
"description": "Choose Auth Config Type.",
"oneOf": [
{
"$ref": "#/definitions/basicAuthentication"
},
{
"$ref": "#/definitions/apiAuthentication"
}
]
},
"connectionTimeoutSecs": {
"description": "Connection Timeout in Seconds",
"type": "integer",
"default": 5
},
"socketTimeoutSecs": {
"description": "Socket Timeout in Seconds",
"type": "integer",
"default": 60
},
"keepAliveTimeoutSecs": {
"description": "Keep Alive Timeout in Seconds",
"type": "integer"
},
"connectionOptions": {
"title": "Connection Options",
"$ref": "../connectionBasicType.json#/definitions/connectionOptions"
"default": 30
},
"connectionArguments": {
"title": "Connection Arguments",

View File

@ -21,6 +21,21 @@
"searchIndexFilterPattern": {
"description": "Regex to only fetch search indexes that matches the pattern.",
"$ref": "../type/filterPattern.json#/definitions/filterPattern"
},
"markDeletedSearchIndexes": {
"description": "Optional configuration to soft delete search indexes in OpenMetadata if the source search indexes are deleted. Also, if the search index is deleted, all the associated entities like lineage, etc., with that search index will be deleted",
"type": "boolean",
"default": true
},
"includeSampleData": {
"description": "Optional configuration to turn off fetching sample data for search index.",
"type": "boolean",
"default": true
},
"sampleSize": {
"description": "No. of rows of sample data we want to ingest.",
"default": 10,
"type": "integer"
}
},
"additionalProperties": false