diff --git a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/data_insight.py b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/data_insight.py index 6862f4850c4..4df804b4fe5 100644 --- a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/data_insight.py +++ b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/data_insight.py @@ -20,6 +20,9 @@ from openmetadata_managed_apis.workflows.ingestion.common import ( ClientInitializationError, build_dag, ) +from openmetadata_managed_apis.workflows.ingestion.elasticsearch_sink import ( + build_elasticsearch_sink, +) from metadata.data_insight.api.workflow import DataInsightWorkflow from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import ( @@ -85,26 +88,10 @@ def build_data_insight_workflow_config( "Could not retrieve the OpenMetadata service! This should not happen." ) - elasticsearch_service_config_dict = ( - openmetadata_service.connection.config.elasticsSearch.config.dict() + sink = build_elasticsearch_sink( + openmetadata_service.connection.config, ingestion_pipeline ) - elasticsearch_source_config_dict = { - ES_SOURCE_TO_ES_OBJ_ARGS[key]: value - for key, value in ingestion_pipeline.sourceConfig.config.dict().items() - if value and key != "type" - } - - sink = Sink( - type="elasticsearch", - config=ComponentConfig( - **elasticsearch_service_config_dict, - **elasticsearch_source_config_dict, - ), - ) - - openmetadata_service = cast(MetadataService, openmetadata_service) - workflow_config = OpenMetadataWorkflowConfig( source=WorkflowSource( type="dataInsight", diff --git a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/elasticsearch_sink.py b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/elasticsearch_sink.py new file mode 100644 index 00000000000..9acaa4ae84e --- /dev/null +++ b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/elasticsearch_sink.py @@ -0,0 +1,54 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +Build the elasticsearch sink +""" +from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( + OpenMetadataConnection, +) +from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import ( + IngestionPipeline, +) +from metadata.generated.schema.metadataIngestion.workflow import Sink +from metadata.generated.schema.type.basic import ComponentConfig +from metadata.utils.constants import ES_SOURCE_TO_ES_OBJ_ARGS + + +def build_elasticsearch_sink( + openmetadata_service_connection: OpenMetadataConnection, + ingestion_pipeline: IngestionPipeline, +) -> Sink: + """ + Build the elasticsearch sink given the OM service and + the ingestion pipeline. + + Note that we need to map the JSON Schema properties names + to the arguments required by the elasticsearch sink in the + Python side. + """ + + elasticsearch_service_config_dict = ( + openmetadata_service_connection.elasticsSearch.config.dict() + ) + + elasticsearch_source_config_dict = { + ES_SOURCE_TO_ES_OBJ_ARGS[key]: value + for key, value in ingestion_pipeline.sourceConfig.config.dict().items() + if value and key != "type" + } + + return Sink( + type="elasticsearch", + config=ComponentConfig( + **elasticsearch_service_config_dict, + **elasticsearch_source_config_dict, + ), + ) diff --git a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/es_reindex.py b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/es_reindex.py index 24526792bc0..bcaf96e0636 100644 --- a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/es_reindex.py +++ b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/es_reindex.py @@ -17,6 +17,9 @@ from openmetadata_managed_apis.workflows.ingestion.common import ( build_dag, metadata_ingestion_workflow, ) +from openmetadata_managed_apis.workflows.ingestion.elasticsearch_sink import ( + build_elasticsearch_sink, +) from metadata.generated.schema.entity.services.connections.metadata.metadataESConnection import ( MetadataESConnection, @@ -64,17 +67,9 @@ def build_es_reindex_workflow_config( "Could not retrieve the OpenMetadata service! This should not happen." ) - om_service_elasticsearch_dict = { - key: value - for key, value in openmetadata_service.connection.config.elasticsSearch.config.dict().items() - if value - } - - ingestion_pipeline_elasticsearch_source_config = { - key: value - for key, value in ingestion_pipeline.sourceConfig.config.dict().items() - if value and key != "type" - } + sink = build_elasticsearch_sink( + openmetadata_service.connection.config, ingestion_pipeline + ) workflow_config = OpenMetadataWorkflowConfig( source=WorkflowSource( @@ -83,13 +78,7 @@ def build_es_reindex_workflow_config( serviceConnection=MetadataConnection(config=MetadataESConnection()), sourceConfig=SourceConfig(), ), - sink=Sink( - type="elasticsearch", - config=ComponentConfig( - **om_service_elasticsearch_dict, - **ingestion_pipeline_elasticsearch_source_config, - ), - ), + sink=sink, workflowConfig=WorkflowConfig( loggerLevel=ingestion_pipeline.loggerLevel or LogLevels.INFO, openMetadataServerConfig=ingestion_pipeline.openMetadataServerConnection,