diff --git a/ingestion/pipelines/metadata_to_es.json b/ingestion/pipelines/metadata_to_es.json index eebf96cd1d7..3b76e998502 100644 --- a/ingestion/pipelines/metadata_to_es.json +++ b/ingestion/pipelines/metadata_to_es.json @@ -7,15 +7,13 @@ "type":"MetadataES" } }, - "sourceConfig":{"config":{}} + "sourceConfig":{"config":{ + "type": "MetadataToElasticSearch" + }} }, "sink": { - "type": "elasticsearch", - "config": { - "es_host": "localhost", - "es_port": 9200, - "recreate_indexes": true - } + "type": "metadata-rest", + "config": {} }, "workflowConfig": { "openMetadataServerConfig": { diff --git a/ingestion/pipelines/metadata_to_es.yaml b/ingestion/pipelines/metadata_to_es.yaml index f43d1b7d9e8..76003926170 100644 --- a/ingestion/pipelines/metadata_to_es.yaml +++ b/ingestion/pipelines/metadata_to_es.yaml @@ -5,13 +5,11 @@ source: config: type: MetadataES sourceConfig: - config: {} + config: + type: MetadataToElasticSearch sink: - type: elasticsearch - config: - es_host: localhost - es_port: 9200 - recreate_indexes: true + type: metadata-rest + config: {} workflowConfig: openMetadataServerConfig: hostPort: http://localhost:8585/api diff --git a/ingestion/src/metadata/ingestion/ometa/mixins/es_mixin.py b/ingestion/src/metadata/ingestion/ometa/mixins/es_mixin.py index e2696c91b62..b5898a09c7c 100644 --- a/ingestion/src/metadata/ingestion/ometa/mixins/es_mixin.py +++ b/ingestion/src/metadata/ingestion/ometa/mixins/es_mixin.py @@ -19,7 +19,11 @@ from typing import Generic, List, Optional, Type, TypeVar from pydantic import BaseModel -from metadata.ingestion.ometa.client import REST +from metadata.generated.schema.api.createEventPublisherJob import ( + CreateEventPublisherJob, +) +from metadata.generated.schema.system.eventPublisherJob import EventPublisherResult +from metadata.ingestion.ometa.client import REST, APIError from metadata.utils.elasticsearch import ES_INDEX_MAP from metadata.utils.logger import ometa_logger @@ -108,3 +112,30 @@ class ESMixin(Generic[T]): f"Elasticsearch search failed for query [{query_string}]: {exc}" ) return None + + def reindex_es( + self, + config: CreateEventPublisherJob, + ) -> Optional[EventPublisherResult]: + """ + Method to trigger elasticsearch reindex + """ + try: + resp = self.client.post(path="/search/reindex", data=config.json()) + return EventPublisherResult(**resp) + except APIError as err: + logger.debug(traceback.format_exc()) + logger.debug(f"Failed to trigger es reindex job due to {err}") + return None + + def get_reindex_job_status(self, job_id: str) -> Optional[EventPublisherResult]: + """ + Method to fetch the elasticsearch reindex job status + """ + try: + resp = self.client.get(path=f"/search/reindex/{job_id}") + return EventPublisherResult(**resp) + except APIError as err: + logger.debug(traceback.format_exc()) + logger.debug(f"Failed to fetch reindex job status due to {err}") + return None diff --git a/ingestion/src/metadata/ingestion/source/metadata/metadata.py b/ingestion/src/metadata/ingestion/source/metadata/metadata.py deleted file mode 100644 index 53bcb985a1a..00000000000 --- a/ingestion/src/metadata/ingestion/source/metadata/metadata.py +++ /dev/null @@ -1,227 +0,0 @@ -# Copyright 2021 Collate -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# http://www.apache.org/licenses/LICENSE-2.0 -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""Metadata source module""" -import traceback -from typing import Iterable - -from metadata.generated.schema.entity.classification.classification import ( - Classification, -) -from metadata.generated.schema.entity.data.container import Container -from metadata.generated.schema.entity.data.dashboard import Dashboard -from metadata.generated.schema.entity.data.glossary import Glossary -from metadata.generated.schema.entity.data.glossaryTerm import GlossaryTerm -from metadata.generated.schema.entity.data.mlmodel import MlModel -from metadata.generated.schema.entity.data.pipeline import Pipeline -from metadata.generated.schema.entity.data.query import Query -from metadata.generated.schema.entity.data.table import Table -from metadata.generated.schema.entity.data.topic import Topic -from metadata.generated.schema.entity.policies.policy import Policy -from metadata.generated.schema.entity.services.connections.metadata.metadataESConnection import ( - MetadataESConnection, -) -from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( - OpenMetadataConnection, -) -from metadata.generated.schema.entity.services.databaseService import DatabaseService -from metadata.generated.schema.entity.services.messagingService import MessagingService -from metadata.generated.schema.entity.services.pipelineService import PipelineService -from metadata.generated.schema.entity.services.storageService import StorageService -from metadata.generated.schema.entity.teams.team import Team -from metadata.generated.schema.entity.teams.user import User -from metadata.generated.schema.metadataIngestion.workflow import ( - Source as WorkflowSource, -) -from metadata.ingestion.api.common import Entity -from metadata.ingestion.api.source import Source -from metadata.ingestion.ometa.ometa_api import OpenMetadata -from metadata.utils.logger import ingestion_logger - -logger = ingestion_logger() - - -class MetadataSource(Source[Entity]): - """ - Metadata Source to Fetch All Entities from backend - """ - - config: WorkflowSource - - def __init__( - self, - config: WorkflowSource, - metadata_config: OpenMetadataConnection, - ): - super().__init__() - self.config = config - self.metadata_config = metadata_config - self.metadata = OpenMetadata(metadata_config) - self.service_connection: MetadataESConnection = ( - config.serviceConnection.__root__.config - ) - self.wrote_something = False - self.tables = None - self.topics = None - - def prepare(self): - pass - - @classmethod - def create(cls, config_dict, metadata_config: OpenMetadataConnection): - raise NotImplementedError("Create Method not implemented") - - def next_record(self) -> Iterable[Entity]: # pylint: disable=too-many-branches - if self.service_connection.includeTables: - yield from self.fetch_entities( - entity_class=Table, - fields=[ - "columns", - "tableConstraints", - "usageSummary", - "owner", - "tags", - "followers", - ], - ) - if self.service_connection.includeTopics: - yield from self.fetch_entities( - entity_class=Topic, - fields=["owner", "tags", "followers"], - ) - if self.service_connection.includeDashboards: - yield from self.fetch_entities( - entity_class=Dashboard, - fields=[ - "owner", - "tags", - "followers", - "charts", - "usageSummary", - ], - ) - - if self.service_connection.includePipelines: - yield from self.fetch_entities( - entity_class=Pipeline, - fields=["owner", "tags", "followers", "tasks"], - ) - if self.service_connection.includeMlModels: - yield from self.fetch_entities( - entity_class=MlModel, - fields=["owner", "tags", "followers"], - ) - if self.service_connection.includeUsers: - yield from self.fetch_entities( - entity_class=User, - fields=["teams", "roles"], - ) - - if self.service_connection.includeTeams: - yield from self.fetch_entities( - entity_class=Team, - fields=["users", "owns", "parents"], - ) - - if self.service_connection.includeGlossaryTerms: - yield from self.fetch_entities( - entity_class=GlossaryTerm, - fields=[], - ) - yield from self.fetch_entities( - entity_class=Glossary, - fields=["owner", "tags", "reviewers", "usageCount"], - ) - - if self.service_connection.includePolicy: - yield from self.fetch_entities( - entity_class=Policy, - fields=[], - ) - if self.service_connection.includeTags: - yield from self.fetch_entities( - entity_class=Classification, - fields=[], - ) - - if self.service_connection.includeMessagingServices: - yield from self.fetch_entities( - entity_class=MessagingService, - fields=["owner"], - ) - - if self.service_connection.includeDatabaseServices: - yield from self.fetch_entities( - entity_class=DatabaseService, - fields=["owner"], - ) - - if self.service_connection.includePipelineServices: - yield from self.fetch_entities( - entity_class=PipelineService, - fields=["owner"], - ) - - if self.service_connection.includeContainers: - yield from self.fetch_entities( - entity_class=Container, - fields=["owner"], - ) - - if self.service_connection.includeStorageServices: - yield from self.fetch_entities( - entity_class=StorageService, - fields=["owner"], - ) - - if self.service_connection.includeQueries: - yield from self.fetch_entities( - entity_class=Query, - fields=["owner"], - ) - - def fetch_entities(self, entity_class, fields): - """ - Args: - entity_class: class of the entities to be fetched - fields: fields that must be additionally fetched - - Returns: - A list of entities with the requested fields - """ - try: - after = None - while True: - entities_list = self.metadata.list_entities( - entity=entity_class, - fields=fields, - after=after, - limit=self.service_connection.limitRecords, - ) - for entity in entities_list.entities: - self.status.scanned( - f"{entity_class.__name__} Scanned {entity.name}" - ) - yield entity - if entities_list.after is None: - break - after = entities_list.after - - except Exception as exc: - logger.debug(traceback.format_exc()) - logger.error( - f"Fetching entities failed for [{entity_class.__name__}]: {exc}" - ) - - def close(self): - pass - - def test_connection(self) -> None: - pass diff --git a/ingestion/src/metadata/ingestion/source/metadata/metadata_elasticsearch/metadata.py b/ingestion/src/metadata/ingestion/source/metadata/metadata_elasticsearch/metadata.py index 598698a6b75..ad1faffdc8f 100644 --- a/ingestion/src/metadata/ingestion/source/metadata/metadata_elasticsearch/metadata.py +++ b/ingestion/src/metadata/ingestion/source/metadata/metadata_elasticsearch/metadata.py @@ -8,8 +8,15 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -"""Metadata source module""" +"""Metadata ES source module""" +import traceback +from time import sleep +from typing import Optional + +from metadata.generated.schema.api.createEventPublisherJob import ( + CreateEventPublisherJob, +) from metadata.generated.schema.entity.services.connections.metadata.metadataESConnection import ( MetadataESConnection, ) @@ -19,19 +26,47 @@ from metadata.generated.schema.entity.services.connections.metadata.openMetadata from metadata.generated.schema.metadataIngestion.workflow import ( Source as WorkflowSource, ) -from metadata.ingestion.api.source import InvalidSourceException -from metadata.ingestion.source.metadata.metadata import MetadataSource +from metadata.generated.schema.system.eventPublisherJob import ( + EventPublisherResult, + PublisherType, + RunMode, + Status, +) +from metadata.ingestion.api.common import Entity +from metadata.ingestion.api.source import InvalidSourceException, Source +from metadata.ingestion.ometa.ometa_api import OpenMetadata +from metadata.ingestion.ometa.utils import model_str from metadata.utils.logger import ingestion_logger logger = ingestion_logger() -class MetadataElasticsearchSource(MetadataSource): +class MetadataElasticsearchSource(Source[Entity]): """ Metadata Elasticsearch Source Used for metadata to ES pipeline """ + config: WorkflowSource + + def __init__( + self, + config: WorkflowSource, + metadata_config: OpenMetadataConnection, + ): + super().__init__() + self.config = config + self.metadata_config = metadata_config + self.metadata = OpenMetadata(metadata_config) + self.service_connection: MetadataESConnection = ( + config.serviceConnection.__root__.config + ) + self.source_config = self.config.sourceConfig.config + self.reindex_job: Optional[EventPublisherResult] = None + + def prepare(self): + pass + @classmethod def create(cls, config_dict, metadata_config: OpenMetadataConnection): config: WorkflowSource = WorkflowSource.parse_obj(config_dict) @@ -41,3 +76,73 @@ class MetadataElasticsearchSource(MetadataSource): f"Expected MetadataESConnection, but got {connection}" ) return cls(config, metadata_config) + + def create_reindex_job(self, job_config: CreateEventPublisherJob): + """ + Patch table constraints + """ + try: + self.reindex_job = self.metadata.reindex_es(config=job_config) + logger.debug("Successfully created the elasticsearch reindex job") + except Exception as exc: + logger.debug(traceback.format_exc()) + logger.error( + f"Unexpected error while triggering elasticsearch reindex job: {exc}" + ) + + def next_record(self) -> None: + job_config = CreateEventPublisherJob( + name=self.config.serviceName, + publisherType=PublisherType.elasticSearch, + runMode=RunMode.batch, + batchSize=self.source_config.batchSize, + searchIndexMappingLanguage=self.source_config.searchIndexMappingLanguage, + entities=self.service_connection.entities, + recreateIndex=self.source_config.recreateIndex, + ) + + self.create_reindex_job(job_config) + + if self.reindex_job: + self.log_reindex_status() + + yield from [] # nothing to yield + + def log_reindex_status(self) -> None: + """ + Method to log re-indexing job status. + """ + status = None + total_retries_count = 3 + current_try = 1 + + while status not in {Status.COMPLETED, Status.FAILED, Status.STOPPED}: + sleep(5) + job = self.metadata.get_reindex_job_status(model_str(self.reindex_job.id)) + if job and job.stats and job.stats.jobStats: + logger.info( + f"Processed {job.stats.jobStats.processedRecords} records," + f"{job.stats.jobStats.successRecords} succeeded" + f"and {job.stats.jobStats.failedRecords} failed." + ) + status = job.status + current_try = 1 + else: + logger.warning("Failed to fetch job stats") + current_try += 1 + + if current_try >= total_retries_count: + logger.error( + f"Failed to fetch job stats after {total_retries_count} retries" + ) + break + + if job.failure and job.failure.jobError: + logger.warning(f"Failure Context: {job.failure.jobError.context}") + logger.warning(f"Last Error: {job.failure.jobError.lastFailedReason}") + + def close(self): + self.metadata.close() + + def test_connection(self) -> None: + pass diff --git a/ingestion/src/metadata/ingestion/source/metadata/openmetadata/__init__.py b/ingestion/src/metadata/ingestion/source/metadata/openmetadata/__init__.py deleted file mode 100644 index e69de29bb2d..00000000000 diff --git a/ingestion/src/metadata/ingestion/source/metadata/openmetadata/metadata.py b/ingestion/src/metadata/ingestion/source/metadata/openmetadata/metadata.py deleted file mode 100644 index f3e977342d9..00000000000 --- a/ingestion/src/metadata/ingestion/source/metadata/openmetadata/metadata.py +++ /dev/null @@ -1,38 +0,0 @@ -# Copyright 2021 Collate -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# http://www.apache.org/licenses/LICENSE-2.0 -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""Metadata source module""" - - -from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( - OpenMetadataConnection, -) -from metadata.generated.schema.metadataIngestion.workflow import ( - Source as WorkflowSource, -) -from metadata.ingestion.api.source import InvalidSourceException -from metadata.ingestion.source.metadata.metadata import MetadataSource -from metadata.utils.logger import ingestion_logger - -logger = ingestion_logger() - - -class OpenmetadataSource(MetadataSource): - """Metadata source Class""" - - @classmethod - def create(cls, config_dict, metadata_config: OpenMetadataConnection): - config: WorkflowSource = WorkflowSource.parse_obj(config_dict) - connection: OpenMetadataConnection = config.serviceConnection.__root__.config - if not isinstance(connection, OpenMetadataConnection): - raise InvalidSourceException( - f"Expected OpenMetadataConnection, but got {connection}" - ) - return cls(config, metadata_config) diff --git a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/es_reindex.py b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/es_reindex.py index bcaf96e0636..f89ce954c38 100644 --- a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/es_reindex.py +++ b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/es_reindex.py @@ -17,9 +17,6 @@ from openmetadata_managed_apis.workflows.ingestion.common import ( build_dag, metadata_ingestion_workflow, ) -from openmetadata_managed_apis.workflows.ingestion.elasticsearch_sink import ( - build_elasticsearch_sink, -) from metadata.generated.schema.entity.services.connections.metadata.metadataESConnection import ( MetadataESConnection, @@ -39,11 +36,7 @@ from metadata.generated.schema.metadataIngestion.workflow import ( from metadata.generated.schema.metadataIngestion.workflow import ( Source as WorkflowSource, ) -from metadata.generated.schema.metadataIngestion.workflow import ( - SourceConfig, - WorkflowConfig, -) -from metadata.generated.schema.type.basic import ComponentConfig +from metadata.generated.schema.metadataIngestion.workflow import WorkflowConfig from metadata.ingestion.ometa.ometa_api import OpenMetadata @@ -67,16 +60,14 @@ def build_es_reindex_workflow_config( "Could not retrieve the OpenMetadata service! This should not happen." ) - sink = build_elasticsearch_sink( - openmetadata_service.connection.config, ingestion_pipeline - ) + sink = Sink(type="metadata-rest", config={}) workflow_config = OpenMetadataWorkflowConfig( source=WorkflowSource( type="metadata_elasticsearch", serviceName=ingestion_pipeline.service.fullyQualifiedName, serviceConnection=MetadataConnection(config=MetadataESConnection()), - sourceConfig=SourceConfig(), + sourceConfig=ingestion_pipeline.sourceConfig, ), sink=sink, workflowConfig=WorkflowConfig( diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/resources/search/SearchResource.java b/openmetadata-service/src/main/java/org/openmetadata/service/resources/search/SearchResource.java index 142d501e27a..752b709ccf8 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/resources/search/SearchResource.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/resources/search/SearchResource.java @@ -487,8 +487,8 @@ public class SearchResource { @Context SecurityContext securityContext, @Parameter(description = "jobId Id", schema = @Schema(type = "UUID")) @PathParam("jobId") UUID id) throws IOException { - // Only admins can issue a reindex request - authorizer.authorizeAdmin(securityContext); + // Only admins or bot can issue a reindex request + authorizer.authorizeAdminOrBot(securityContext); return Response.status(Response.Status.OK).entity(ReIndexingHandler.getInstance().getJob(id)).build(); } @@ -523,7 +523,8 @@ public class SearchResource { @Context UriInfo uriInfo, @Context SecurityContext securityContext, @Valid CreateEventPublisherJob createRequest) { - authorizer.authorizeAdmin(securityContext); + // Only admins or bot can issue a reindex request + authorizer.authorizeAdminOrBot(securityContext); return Response.status(Response.Status.CREATED) .entity( ReIndexingHandler.getInstance() diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/security/Authorizer.java b/openmetadata-service/src/main/java/org/openmetadata/service/security/Authorizer.java index a72f6477989..3a1e824c635 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/security/Authorizer.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/security/Authorizer.java @@ -44,7 +44,7 @@ public interface Authorizer { void authorizeAdmin(SecurityContext securityContext); - boolean decryptSecret(SecurityContext securityContext); + void authorizeAdminOrBot(SecurityContext securityContext); boolean shouldMaskPasswords(SecurityContext securityContext); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/security/DefaultAuthorizer.java b/openmetadata-service/src/main/java/org/openmetadata/service/security/DefaultAuthorizer.java index 90a1c1e1287..8c01fd0a231 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/security/DefaultAuthorizer.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/security/DefaultAuthorizer.java @@ -87,9 +87,12 @@ public class DefaultAuthorizer implements Authorizer { } @Override - public boolean decryptSecret(SecurityContext securityContext) { + public void authorizeAdminOrBot(SecurityContext securityContext) { SubjectContext subjectContext = getSubjectContext(securityContext); - return subjectContext.isAdmin() || subjectContext.isBot(); + if (subjectContext.isAdmin() || subjectContext.isBot()) { + return; + } + throw new AuthorizationException(notAdmin(securityContext.getUserPrincipal().getName())); } @Override diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/security/NoopAuthorizer.java b/openmetadata-service/src/main/java/org/openmetadata/service/security/NoopAuthorizer.java index 015c6366464..7e2b247571a 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/security/NoopAuthorizer.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/security/NoopAuthorizer.java @@ -101,8 +101,8 @@ public class NoopAuthorizer implements Authorizer { } @Override - public boolean decryptSecret(SecurityContext securityContext) { - return true; // Always decrypt + public void authorizeAdminOrBot(SecurityContext securityContext) { + /* Always authorize */ } @Override diff --git a/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/metadata/metadataESConnection.json b/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/metadata/metadataESConnection.json index 9ae94f3ef9b..1425a4562f2 100644 --- a/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/metadata/metadataESConnection.json +++ b/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/metadata/metadataESConnection.json @@ -9,7 +9,9 @@ "metadataESType": { "description": "Metadata to Elastic Search type", "type": "string", - "enum": ["MetadataES"], + "enum": [ + "MetadataES" + ], "default": "MetadataES" } }, @@ -19,94 +21,52 @@ "$ref": "#/definitions/metadataESType", "default": "MetadataES" }, - "includeTopics": { - "description": "Include Topics for Indexing", - "type": "boolean", - "default": "true" + "entities": { + "title": "Entities", + "description": "List of entities that you need to reindex", + "type": "array", + "items": { + "type": "string" + }, + "default": [ + "table", + "topic", + "dashboard", + "pipeline", + "mlmodel", + "user", + "team", + "glossaryTerm", + "tag", + "entityReportData", + "webAnalyticEntityViewReportData", + "webAnalyticUserActivityReportData", + "container", + "query" + ], + "uniqueItems": true }, - "includeTables": { - "description": "Include Tables for Indexing", - "type": "boolean", - "default": "true" - }, - "includeDashboards": { - "description": "Include Dashboards for Indexing", - "type": "boolean", - "default": "true" - }, - "includePipelines": { - "description": "Include Pipelines for Indexing", - "type": "boolean", - "default": "true" - }, - "includeMlModels": { - "description": "Include MlModels for Indexing", - "type": "boolean", - "default": "true" - }, - "includeUsers": { - "description": "Include Users for Indexing", - "type": "boolean", - "default": "true" - }, - "includeTeams": { - "description": "Include Teams for Indexing", - "type": "boolean", - "default": "true" - }, - "includeGlossaryTerms": { - "description": "Include Glossary Terms for Indexing", - "type": "boolean", - "default": "true" - }, - "includePolicy": { - "description": "Include Tags for Policy", + "recreateIndex": { + "title": "Recreate Indexes", "type": "boolean", "default": true }, - "includeMessagingServices": { - "description": "Include Messaging Services for Indexing", - "type": "boolean", - "default": true + "runMode": { + "$ref": "../../../../system/eventPublisherJob.json#/definitions/runMode" }, - "includeDatabaseServices": { - "description": "Include Database Services for Indexing", - "type": "boolean", - "default": true + "searchIndexMappingLanguage": { + "description": "Recreate Indexes with updated Language", + "$ref": "../../../../configuration/elasticSearchConfiguration.json#/definitions/searchIndexMappingLanguage" }, - "includePipelineServices": { - "description": "Include Pipeline Services for Indexing", - "type": "boolean", - "default": true - }, - "includeTags": { - "description": "Include Tags for Indexing", - "type": "boolean", - "default": true - }, - "includeContainers": { - "description": "Include Containers for Indexing", - "type": "boolean", - "default": true - }, - "includeStorageServices": { - "description": "Include Storage Services for Indexing", - "type": "boolean", - "default": true - }, - "includeQueries": { - "description": "Include Queries for Indexing", - "type": "boolean", - "default": true - }, - "limitRecords": { - "description": "Limit the number of records for Indexing.", + "batchSize": { + "title": "Batch Size", + "description": "Maximum number of events sentx in a batch (Default 10).", "type": "integer", - "default": "1000" + "default": 100 }, "supportsMetadataExtraction": { "$ref": "../connectionBasicType.json#/definitions/supportsMetadataExtraction" } }, "additionalProperties": false -} +} \ No newline at end of file diff --git a/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/metadataToElasticSearchPipeline.json b/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/metadataToElasticSearchPipeline.json index 97a007bc2d2..43c4b401570 100644 --- a/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/metadataToElasticSearchPipeline.json +++ b/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/metadataToElasticSearchPipeline.json @@ -51,6 +51,21 @@ "description": "Region name. Required when using AWS Credentials.", "type": "string", "default": null + }, + "searchIndexMappingLanguage": { + "description": "Recreate Indexes with updated Language", + "$ref": "../configuration/elasticSearchConfiguration.json#/definitions/searchIndexMappingLanguage" + }, + "batchSize": { + "title": "Batch Size", + "description": "Maximum number of events entities in a batch (Default 1000).", + "type": "integer", + "default": 1000 + }, + "recreateIndex": { + "title": "Recreate Indexes", + "type": "boolean", + "default": true } }, "required": ["type"], diff --git a/openmetadata-ui/src/main/resources/ui/public/locales/en-US/Metadata/workflows/elasticSearchReindex.md b/openmetadata-ui/src/main/resources/ui/public/locales/en-US/Metadata/workflows/elasticSearchReindex.md index 62f24cc21ea..5f9f6fc8943 100644 --- a/openmetadata-ui/src/main/resources/ui/public/locales/en-US/Metadata/workflows/elasticSearchReindex.md +++ b/openmetadata-ui/src/main/resources/ui/public/locales/en-US/Metadata/workflows/elasticSearchReindex.md @@ -11,46 +11,21 @@ $$ $$section -### CA Certificates $(id="caCerts") +### Batch Size $(id="batchSize") -The Certificate path needs to be added in the configuration. The path should be local in the Ingestion Container. +Maximum number of entities that are processed together in one iteration. $$ $$section -### Region Name $(id="regionName") +### Search Index Mapping Language $(id="searchIndexMappingLanguage") -Region name is required when using AWS Credentials. - -Each AWS Region is a separate geographic area in which AWS clusters data centers ([docs](https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/Concepts.RegionsAndAvailabilityZones.html)). - -As AWS can have instances in multiple regions, we need to know the region the service you want reach belongs to. +Select the default language for reindexing search. $$ $$section -### Timeout $(id="timeout") +### Recreate Index $(id="recreateIndex") -Connection Timeout. -$$ - -$$section - -### Use AWS Credentials $(id="useAwsCredentials") - -Indicates whether to use AWS credentials when connecting to OpenSearch in AWS. -$$ - -$$section - -### Verify Certificates $(id="useSSL") - -This indicates whether to use SSL when connecting to Elasticsearch. By default, we will ignore SSL settings. -$$ - -$$section - -### Enable Debug Logs $(id="verifyCerts") - -This indicates whether to verify certificates when using SSL connection to Elasticsearch. It's ignored by default and is set to true. Ensure that you send the certificates in the property `CA Certificates`. +This option if enabled, will delete the existing indexes and create them again. $$ \ No newline at end of file diff --git a/openmetadata-ui/src/main/resources/ui/src/components/AddIngestion/Steps/MetadataToESConfigForm/MetadataToESConfigForm.tsx b/openmetadata-ui/src/main/resources/ui/src/components/AddIngestion/Steps/MetadataToESConfigForm/MetadataToESConfigForm.tsx index 862580b54b5..42655b2cedf 100644 --- a/openmetadata-ui/src/main/resources/ui/src/components/AddIngestion/Steps/MetadataToESConfigForm/MetadataToESConfigForm.tsx +++ b/openmetadata-ui/src/main/resources/ui/src/components/AddIngestion/Steps/MetadataToESConfigForm/MetadataToESConfigForm.tsx @@ -11,8 +11,13 @@ * limitations under the License. */ -import { Button, Col, Divider, Form, Input, Row, Switch } from 'antd'; +import { Button, Col, Divider, Form, Input, Row, Select } from 'antd'; import { AddIngestionState } from 'components/AddIngestion/addIngestion.interface'; +import { + ELASTIC_SEARCH_INITIAL_VALUES, + RECREATE_INDEX_OPTIONS, + RE_INDEX_LANG_OPTIONS, +} from 'constants/elasticsearch.constant'; import React, { useMemo } from 'react'; import { useTranslation } from 'react-i18next'; import { ConfigClass } from '../../../../generated/entity/services/ingestionPipelines/ingestionPipeline'; @@ -47,12 +52,15 @@ const MetadataToESConfigForm = ({ const initialValues = useMemo( () => ({ - caCerts: data.metadataToESConfig?.caCerts, - regionName: data.metadataToESConfig?.regionName, - timeout: data.metadataToESConfig?.timeout, - useAwsCredentials: data.metadataToESConfig?.useAwsCredentials, - useSSL: data.metadataToESConfig?.useSSL, - verifyCerts: data.metadataToESConfig?.verifyCerts, + recreateIndex: + data.metadataToESConfig?.recreateIndex ?? + ELASTIC_SEARCH_INITIAL_VALUES.recreateIndexPipeline, + searchIndexMappingLanguage: + data.metadataToESConfig?.searchIndexMappingLanguage ?? + ELASTIC_SEARCH_INITIAL_VALUES.searchIndexMappingLanguage, + batchSize: + data.metadataToESConfig?.batchSize ?? + ELASTIC_SEARCH_INITIAL_VALUES.batchSize, }), [data] ); @@ -65,55 +73,17 @@ const MetadataToESConfigForm = ({ layout="vertical" onFinish={handleSubmit} onFocus={(e) => onFocus(e.target.id)}> - - + + - - + + - - - - - {t('label.use-aws-credential-plural')} - - - form.setFieldsValue({ useAwsCredentials: value }) - } - /> - - - - - - - {t('label.use-ssl-uppercase')} - - form.setFieldsValue({ useSSL: value })} - /> - - - - - - - {t('label.verify-cert-plural')} - - form.setFieldsValue({ verifyCerts: value })} - /> - - + +