Fix ES reindex (#10663)

This commit is contained in:
Pere Miquel Brull 2023-03-20 13:00:51 +01:00 committed by GitHub
parent 6d717c6c33
commit 40b4c9c487
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 66 additions and 36 deletions

View File

@ -20,6 +20,9 @@ from openmetadata_managed_apis.workflows.ingestion.common import (
ClientInitializationError, ClientInitializationError,
build_dag, build_dag,
) )
from openmetadata_managed_apis.workflows.ingestion.elasticsearch_sink import (
build_elasticsearch_sink,
)
from metadata.data_insight.api.workflow import DataInsightWorkflow from metadata.data_insight.api.workflow import DataInsightWorkflow
from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import ( from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import (
@ -85,26 +88,10 @@ def build_data_insight_workflow_config(
"Could not retrieve the OpenMetadata service! This should not happen." "Could not retrieve the OpenMetadata service! This should not happen."
) )
elasticsearch_service_config_dict = ( sink = build_elasticsearch_sink(
openmetadata_service.connection.config.elasticsSearch.config.dict() openmetadata_service.connection.config, ingestion_pipeline
) )
elasticsearch_source_config_dict = {
ES_SOURCE_TO_ES_OBJ_ARGS[key]: value
for key, value in ingestion_pipeline.sourceConfig.config.dict().items()
if value and key != "type"
}
sink = Sink(
type="elasticsearch",
config=ComponentConfig(
**elasticsearch_service_config_dict,
**elasticsearch_source_config_dict,
),
)
openmetadata_service = cast(MetadataService, openmetadata_service)
workflow_config = OpenMetadataWorkflowConfig( workflow_config = OpenMetadataWorkflowConfig(
source=WorkflowSource( source=WorkflowSource(
type="dataInsight", type="dataInsight",

View File

@ -0,0 +1,54 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Build the elasticsearch sink
"""
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import (
IngestionPipeline,
)
from metadata.generated.schema.metadataIngestion.workflow import Sink
from metadata.generated.schema.type.basic import ComponentConfig
from metadata.utils.constants import ES_SOURCE_TO_ES_OBJ_ARGS
def build_elasticsearch_sink(
    openmetadata_service_connection: OpenMetadataConnection,
    ingestion_pipeline: IngestionPipeline,
) -> Sink:
    """
    Assemble the ``elasticsearch`` workflow sink.

    The sink configuration is the union of two sources:

    * the Elasticsearch settings stored on the OpenMetadata service
      connection (``elasticsSearch.config``), passed through as-is;
    * the ingestion pipeline ``sourceConfig.config`` entries, whose
      JSON Schema property names are renamed through
      ``ES_SOURCE_TO_ES_OBJ_ARGS`` to the argument names the Python
      elasticsearch sink expects.

    Pipeline entries with a falsy value, and the ``type`` entry, are
    left out of the sink configuration.
    """
    service_kwargs = openmetadata_service_connection.elasticsSearch.config.dict()

    source_kwargs = {}
    for schema_name, setting in ingestion_pipeline.sourceConfig.config.dict().items():
        # Skip unset/falsy options and the "type" entry.
        if not setting or schema_name == "type":
            continue
        source_kwargs[ES_SOURCE_TO_ES_OBJ_ARGS[schema_name]] = setting

    # Both dicts are unpacked as keyword arguments into a single config object.
    sink_config = ComponentConfig(**service_kwargs, **source_kwargs)
    return Sink(type="elasticsearch", config=sink_config)

View File

@ -17,6 +17,9 @@ from openmetadata_managed_apis.workflows.ingestion.common import (
build_dag, build_dag,
metadata_ingestion_workflow, metadata_ingestion_workflow,
) )
from openmetadata_managed_apis.workflows.ingestion.elasticsearch_sink import (
build_elasticsearch_sink,
)
from metadata.generated.schema.entity.services.connections.metadata.metadataESConnection import ( from metadata.generated.schema.entity.services.connections.metadata.metadataESConnection import (
MetadataESConnection, MetadataESConnection,
@ -64,17 +67,9 @@ def build_es_reindex_workflow_config(
"Could not retrieve the OpenMetadata service! This should not happen." "Could not retrieve the OpenMetadata service! This should not happen."
) )
om_service_elasticsearch_dict = { sink = build_elasticsearch_sink(
key: value openmetadata_service.connection.config, ingestion_pipeline
for key, value in openmetadata_service.connection.config.elasticsSearch.config.dict().items() )
if value
}
ingestion_pipeline_elasticsearch_source_config = {
key: value
for key, value in ingestion_pipeline.sourceConfig.config.dict().items()
if value and key != "type"
}
workflow_config = OpenMetadataWorkflowConfig( workflow_config = OpenMetadataWorkflowConfig(
source=WorkflowSource( source=WorkflowSource(
@ -83,13 +78,7 @@ def build_es_reindex_workflow_config(
serviceConnection=MetadataConnection(config=MetadataESConnection()), serviceConnection=MetadataConnection(config=MetadataESConnection()),
sourceConfig=SourceConfig(), sourceConfig=SourceConfig(),
), ),
sink=Sink( sink=sink,
type="elasticsearch",
config=ComponentConfig(
**om_service_elasticsearch_dict,
**ingestion_pipeline_elasticsearch_source_config,
),
),
workflowConfig=WorkflowConfig( workflowConfig=WorkflowConfig(
loggerLevel=ingestion_pipeline.loggerLevel or LogLevels.INFO, loggerLevel=ingestion_pipeline.loggerLevel or LogLevels.INFO,
openMetadataServerConfig=ingestion_pipeline.openMetadataServerConnection, openMetadataServerConfig=ingestion_pipeline.openMetadataServerConnection,