From 6f33465b7c31deeeff97210deb69f639e434d4a5 Mon Sep 17 00:00:00 2001 From: Teddy Date: Fri, 23 Jun 2023 06:40:32 +0200 Subject: [PATCH] fix: move service retrival to workflow (#12109) --- .../src/metadata/data_quality/api/workflow.py | 71 ++++++++++--------- .../test_suite/test_e2e_workflow.py | 2 +- .../workflows/ingestion/common.py | 30 +++----- .../test_workflow_creation.py | 1 - 4 files changed, 49 insertions(+), 55 deletions(-) diff --git a/ingestion/src/metadata/data_quality/api/workflow.py b/ingestion/src/metadata/data_quality/api/workflow.py index 8d6ec7bc8e1..1493b19475e 100644 --- a/ingestion/src/metadata/data_quality/api/workflow.py +++ b/ingestion/src/metadata/data_quality/api/workflow.py @@ -39,10 +39,10 @@ from metadata.generated.schema.entity.services.connections.metadata.openMetadata from metadata.generated.schema.entity.services.connections.serviceConnection import ( ServiceConnection, ) +from metadata.generated.schema.entity.services.databaseService import DatabaseService from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import ( PipelineState, ) -from metadata.generated.schema.entity.services.serviceType import ServiceType from metadata.generated.schema.metadataIngestion.testSuitePipeline import ( TestSuitePipeline, ) @@ -57,10 +57,7 @@ from metadata.ingestion.api.parser import parse_workflow_config_gracefully from metadata.ingestion.api.processor import ProcessorStatus from metadata.ingestion.ometa.client_utils import create_ometa_client from metadata.utils import entity_link -from metadata.utils.class_helper import ( - get_service_class_from_service_type, - get_service_type_from_source_type, -) +from metadata.utils.fqn import split from metadata.utils.importer import get_sink from metadata.utils.logger import test_suite_logger from metadata.utils.workflow_output_handler import print_test_suite_status @@ -104,8 +101,10 @@ class TestSuiteWorkflow(WorkflowStatusMixin): ) self.metadata = create_ometa_client(self.metadata_config) - self._retrieve_service_connection() self.source_config: TestSuitePipeline = self.config.source.sourceConfig.config + self.service: DatabaseService = self._retrieve_service() + self._retrieve_service_connection() + self.processor_config: TestSuiteProcessorConfig = ( TestSuiteProcessorConfig.parse_obj( self.config.processor.dict().get("config") @@ -147,6 +146,34 @@ class TestSuiteWorkflow(WorkflowStatusMixin): ) raise err + def _retrieve_service(self) -> DatabaseService: + """Get service object from source config `entityFullyQualifiedName`""" + fully_qualified_name = self.source_config.entityFullyQualifiedName.__root__ + try: + service_name = split(fully_qualified_name)[0] + except IndexError as exc: + logger.debug(traceback.format_exc()) + raise IndexError( + f"Could not retrieve service name from entity fully qualified name {fully_qualified_name}: {exc}" + ) + try: + service = self.metadata.get_by_name(DatabaseService, service_name) + if not service: + raise ConnectionError( + f"Could not retrieve service with name `{service_name}`. " + "Typically caused by the `entityFullyQualifiedName` does not exists in OpenMetadata " + "or the JWT Token is invalid." + ) + except ConnectionError as exc: + raise exc + except Exception as exc: + logger.debug(traceback.format_exc()) + logger.error( + f"Error getting service connection for service name [{service_name}]" + f" using the secrets manager provider [{self.metadata.config.secretsManagerProvider}]: {exc}" + ) + return service + def _get_table_entity(self, entity_fqn: str) -> Optional[Table]: """given an entity fqn return the table entity @@ -383,7 +410,7 @@ class TestSuiteWorkflow(WorkflowStatusMixin): openmetadata_test_cases = self.filter_for_om_test_cases(test_cases) test_suite_runner = test_suite_source_factory.create( - self.config.source.type.lower(), + self.service.serviceType.value.lower(), self.config, self.metadata, self.table_entity, @@ -412,37 +439,13 @@ class TestSuiteWorkflow(WorkflowStatusMixin): in OM. When it is configured, we retrieve the service connection from the secrets' manager. Otherwise, we get it from the service object itself through the default `SecretsManager`. """ - service_type: ServiceType = get_service_type_from_source_type( - self.config.source.type - ) if ( not self.config.source.serviceConnection and not self.metadata.config.forceEntityOverwriting ): - service_name = self.config.source.serviceName - try: - service = self.metadata.get_by_name( - get_service_class_from_service_type(service_type), - service_name, - ) - if not service: - raise ConnectionError( - f"Could not retrieve service with name `{service_name}`. " - "Typically caused by the `serviceName` does not exists in OpenMetadata " - "or the JWT Token is invalid." - ) - if service: - self.config.source.serviceConnection = ServiceConnection( - __root__=service.connection - ) - except ConnectionError as exc: - raise exc - except Exception as exc: - logger.debug(traceback.format_exc()) - logger.error( - f"Error getting service connection for service name [{service_name}]" - f" using the secrets manager provider [{self.metadata.config.secretsManagerProvider}]: {exc}" - ) + self.config.source.serviceConnection = ServiceConnection( + __root__=self.service.connection + ) def execute(self): """Execute test suite workflow""" diff --git a/ingestion/tests/integration/test_suite/test_e2e_workflow.py b/ingestion/tests/integration/test_suite/test_e2e_workflow.py index 0c5bd8749f9..c9193dab628 100644 --- a/ingestion/tests/integration/test_suite/test_e2e_workflow.py +++ b/ingestion/tests/integration/test_suite/test_e2e_workflow.py @@ -50,7 +50,7 @@ from metadata.ingestion.ometa.ometa_api import OpenMetadata test_suite_config = { "source": { "type": "custom-database", - "serviceName": "test_suite_service_test", + "serviceName": "MyRabdomWorkflow", "sourceConfig": { "config": { "type": "TestSuite", diff --git a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/common.py b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/common.py index 370826283cc..54b84de1e3a 100644 --- a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/common.py +++ b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/common.py @@ -15,7 +15,7 @@ import json import uuid from datetime import datetime, timedelta from functools import partial -from typing import Callable, cast +from typing import Callable import airflow from airflow import DAG @@ -119,6 +119,16 @@ def build_source(ingestion_pipeline: IngestionPipeline) -> WorkflowSource: entity_class = None try: + if service_type == "testSuite": + # check we can access OM server + metadata.health_check() + return WorkflowSource( + type=service_type, + serviceName=ingestion_pipeline.service.name, + sourceConfig=ingestion_pipeline.sourceConfig, + serviceConnection=None, # retrieved from the test suite workflow using the `sourceConfig.config.entityFullyQualifiedName` + ) + if service_type == "databaseService": entity_class = DatabaseService service: DatabaseService = metadata.get_by_name( @@ -154,24 +164,6 @@ def build_source(ingestion_pipeline: IngestionPipeline) -> WorkflowSource: service: StorageService = metadata.get_by_name( entity=entity_class, fqn=ingestion_pipeline.service.name ) - elif service_type == "testSuite": - entity_class = DatabaseService - ingestion_pipeline.sourceConfig.config = cast( - TestSuitePipeline, ingestion_pipeline.sourceConfig.config - ) - split_fqn = split( - ingestion_pipeline.sourceConfig.config.entityFullyQualifiedName.__root__ - ) - try: - service_fqn = split_fqn[0] - except IndexError: - raise ParsingConfigurationError( - "Invalid fully qualified name " - f"{ingestion_pipeline.sourceConfig.config.entityFullyQualifiedName.__root__}" - ) - service: DatabaseService = metadata.get_by_name( - entity=entity_class, fqn=service_fqn - ) else: raise InvalidServiceException(f"Invalid Service Type: {service_type}") except ValidationError as original_error: diff --git a/openmetadata-airflow-apis/tests/unit/ingestion_pipeline/test_workflow_creation.py b/openmetadata-airflow-apis/tests/unit/ingestion_pipeline/test_workflow_creation.py index 9473e5f8726..4baf2a86335 100644 --- a/openmetadata-airflow-apis/tests/unit/ingestion_pipeline/test_workflow_creation.py +++ b/openmetadata-airflow-apis/tests/unit/ingestion_pipeline/test_workflow_creation.py @@ -340,4 +340,3 @@ class OMetaServiceTest(TestCase): config = json.loads(workflow_config.json(encoder=show_secrets_encoder)) parse_workflow_config_gracefully(config) - assert workflow_config.source.type == "mysql"