ES reindex Airflow workflow (#8723)

* Added support for 'Metadata Service' connectors

* Fixed failing unit tests

* Removed delete button for 'Metadata Service' connector of type OpenMetadataServer

* Code optimization for SelectServiceType component

* Fixed errors arrived due to backend changes related to OpenMetadata Services

* init reindex

* Added additional step for adding Metadata to ES configs while creating ingestion pipeline for Metadata Service

* Add ES reindex

* Add ES reindex

* Format

* Format

* Rename service

* Pick service name from IngestionPipeline

* fix ui conflicts

Co-authored-by: Aniket Katkar <aniketkatkar97@gmail.com>
Co-authored-by: Chirag Madlani <12962843+chirag-madlani@users.noreply.github.com>
This commit is contained in:
Pere Miquel Brull 2022-11-14 18:59:56 +01:00 committed by GitHub
parent b04485a5bc
commit f33003485e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 147 additions and 16 deletions

View File

@ -1,7 +1,7 @@
{
"source": {
"type": "metadata_elasticsearch",
"serviceName": "openMetadata",
"serviceName": "Openmetadata",
"serviceConnection": {
"config":{
"type":"MetadataES"

View File

@ -51,6 +51,7 @@ from metadata.generated.schema.entity.services.connections.metadata.openMetadata
from metadata.generated.schema.entity.services.dashboardService import DashboardService
from metadata.generated.schema.entity.services.databaseService import DatabaseService
from metadata.generated.schema.entity.services.messagingService import MessagingService
from metadata.generated.schema.entity.services.metadataService import MetadataService
from metadata.generated.schema.entity.services.mlmodelService import MlModelService
from metadata.generated.schema.entity.services.pipelineService import PipelineService
from metadata.generated.schema.entity.services.storageService import StorageService
@ -375,6 +376,14 @@ class OpenMetadata(
):
return "/services/mlmodelServices"
if issubclass(
entity,
get_args(
Union[MetadataService, self.get_create_entity_type(MetadataService)]
),
):
return "/services/metadataServices"
if issubclass(
entity,
get_args(

View File

@ -9,7 +9,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""
testSuite DAG function builder
Data Insights DAG function builder
"""
from airflow import DAG
@ -39,7 +39,8 @@ def build_data_insight_workflow_config(
workflow_config = OpenMetadataWorkflowConfig(
source=build_source(ingestion_pipeline),
sink=ingestion_pipeline.sink,
# ingestion_pipeline.service.serviceConnection.elasticsearch
sink=ingestion_pipeline.openMetadataServerConnection.elasticsSearch,
processor=Processor(
type="data-insight-processor",
config={},
@ -55,7 +56,7 @@ def build_data_insight_workflow_config(
def build_data_insight_dag(ingestion_pipeline: IngestionPipeline) -> DAG:
"""Build a simple testSuite DAG"""
"""Build a simple Data Insight DAG"""
workflow_config = build_data_insight_workflow_config(ingestion_pipeline)
dag = build_dag(
task_name="data_insight_task",

View File

@ -0,0 +1,113 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
ElasticSearch reindex DAG function builder
"""
from airflow import DAG
from openmetadata_managed_apis.workflows.ingestion.common import (
ClientInitializationError,
build_dag,
build_source,
metadata_ingestion_workflow,
)
from metadata.generated.schema.entity.services.connections.metadata.metadataESConnection import (
MetadataESConnection,
)
from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import (
IngestionPipeline,
)
from metadata.generated.schema.entity.services.metadataService import (
MetadataConnection,
MetadataService,
)
from metadata.generated.schema.metadataIngestion.workflow import (
LogLevels,
OpenMetadataWorkflowConfig,
Sink,
)
from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.generated.schema.metadataIngestion.workflow import (
SourceConfig,
WorkflowConfig,
)
from metadata.ingestion.ometa.ometa_api import OpenMetadata
def build_es_reindex_workflow_config(
ingestion_pipeline: IngestionPipeline,
) -> OpenMetadataWorkflowConfig:
"""
Given an airflow_pipeline, prepare the workflow config JSON
"""
try:
metadata = OpenMetadata(config=ingestion_pipeline.openMetadataServerConnection)
except Exception as exc:
raise ClientInitializationError(f"Failed to initialize the client: {exc}")
openmetadata_service: MetadataService = metadata.get_by_name(
entity=MetadataService, fqn=ingestion_pipeline.service.fullyQualifiedName
)
if not openmetadata_service:
raise ValueError(
"Could not retrieve the OpenMetadata service! This should not happen."
)
om_service_elasticsearch_dict = {
key: value
for key, value in openmetadata_service.connection.config.elasticsSearch.config.dict().items()
if value
}
ingestion_pipeline_elasticsearch_source_config = {
key: value
for key, value in ingestion_pipeline.sourceConfig.config.dict().items()
if value and key != "type"
}
workflow_config = OpenMetadataWorkflowConfig(
source=WorkflowSource(
type="metadata_elasticsearch",
serviceName=ingestion_pipeline.service.fullyQualifiedName,
serviceConnection=MetadataConnection(config=MetadataESConnection()),
sourceConfig=SourceConfig(),
),
sink=Sink(
type="elasticsearch",
config={
**om_service_elasticsearch_dict,
**ingestion_pipeline_elasticsearch_source_config,
},
),
workflowConfig=WorkflowConfig(
loggerLevel=ingestion_pipeline.loggerLevel or LogLevels.INFO,
openMetadataServerConfig=ingestion_pipeline.openMetadataServerConnection,
),
ingestionPipelineFQN=ingestion_pipeline.fullyQualifiedName.__root__,
)
return workflow_config
def build_es_reindex_dag(ingestion_pipeline: IngestionPipeline) -> DAG:
"""Build a simple Data Insight DAG"""
workflow_config = build_es_reindex_workflow_config(ingestion_pipeline)
dag = build_dag(
task_name="elasticsearch_reindex_task",
ingestion_pipeline=ingestion_pipeline,
workflow_config=workflow_config,
workflow_fn=metadata_ingestion_workflow,
)
return dag

View File

@ -16,6 +16,9 @@ Add a function for each type from PipelineType
from openmetadata_managed_apis.workflows.ingestion.data_insight import (
build_data_insight_dag,
)
from openmetadata_managed_apis.workflows.ingestion.es_reindex import (
build_es_reindex_dag,
)
from openmetadata_managed_apis.workflows.ingestion.lineage import build_lineage_dag
from openmetadata_managed_apis.workflows.ingestion.metadata import build_metadata_dag
from openmetadata_managed_apis.workflows.ingestion.profiler import build_profiler_dag
@ -36,4 +39,5 @@ build_registry.add(PipelineType.usage.value)(build_usage_dag)
build_registry.add(PipelineType.lineage.value)(build_lineage_dag)
build_registry.add(PipelineType.profiler.value)(build_profiler_dag)
build_registry.add(PipelineType.TestSuite.value)(build_test_suite_dag)
build_registry.add(PipelineType.dataInsight)(build_data_insight_dag)
build_registry.add(PipelineType.dataInsight.value)(build_data_insight_dag)
build_registry.add(PipelineType.elasticSearchReindex.value)(build_es_reindex_dag)

View File

@ -203,7 +203,7 @@
"additionalProperties": false,
"required": ["type"]
},
"supportMetadataToElasticSearchExtraction": {
"supportsDataInsightExtraction": {
"$ref": "../connectionBasicType.json#/definitions/supportsDataInsightExtraction"
},
"supportsElasticSearchReindexingExtraction": {

View File

@ -25,9 +25,6 @@
},
{
"$ref": "../mlmodelService.json#/definitions/mlModelConnection"
},
{
"$ref": "../metadataService.json#/definitions/metadataConnection"
}
]
}

View File

@ -18,12 +18,14 @@
"default": "MetadataToElasticSearch"
},
"useSSL": {
"description": "Indicates whether to use SSL",
"title": "Use SSL",
"description": "Indicates whether to use SSL when connecting to ElasticSearch. By default, we will ignore SSL settings.",
"type": "boolean",
"default": false
},
"verifyCerts": {
"description": "Indicates whether to verify certificates",
"title": "Validate Certificates",
"description": "Indicates whether to verify certificates when using SSL connection to ElasticSearch. Ignored by default. Is set to true, make sure to send the certificates in the property `CA Certificates`.",
"type": "boolean",
"default": false
},
@ -33,17 +35,22 @@
"default": 30
},
"caCerts": {
"description": "Certificate path to be added in configuration",
"type": "string"
"title": "CA Certificates",
"description": "Certificate path to be added in configuration. The path should be local in the Ingestion Container.",
"type": "string",
"default": null
},
"useAwsCredentials": {
"description": "Indicates whether to use aws credentials",
"title": "Use AWS Credentials",
"description": "Indicates whether to use aws credentials when connecting to OpenSearch in AWS.",
"type": "boolean",
"default": false
},
"regionName": {
"description": "Region name in case of useAwsCredentials",
"type": "string"
"title": "AWS Region Name",
"description": "Region name. Required when using AWS Credentials.",
"type": "string",
"default": null
}
},
"required": ["type"],