diff --git a/ingestion/src/metadata/data_quality/source/test_suite.py b/ingestion/src/metadata/data_quality/source/test_suite.py index 4adba6acc12..4fc62248d6a 100644 --- a/ingestion/src/metadata/data_quality/source/test_suite.py +++ b/ingestion/src/metadata/data_quality/source/test_suite.py @@ -14,16 +14,11 @@ Test Suite Workflow Source The main goal is to get the configured table from the API. """ -import traceback from typing import Iterable, List, Optional, cast from metadata.data_quality.api.models import TableAndTests from metadata.generated.schema.api.tests.createTestSuite import CreateTestSuiteRequest from metadata.generated.schema.entity.data.table import Table -from metadata.generated.schema.entity.services.connections.serviceConnection import ( - ServiceConnection, -) -from metadata.generated.schema.entity.services.databaseService import DatabaseService from metadata.generated.schema.metadataIngestion.testSuitePipeline import ( TestSuitePipeline, ) @@ -38,7 +33,6 @@ from metadata.ingestion.api.step import Step from metadata.ingestion.api.steps import Source from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.utils import fqn -from metadata.utils.fqn import split from metadata.utils.logger import test_suite_logger logger = test_suite_logger() @@ -61,56 +55,8 @@ class TestSuiteSource(Source): self.source_config: TestSuitePipeline = self.config.source.sourceConfig.config - self.service: DatabaseService = self._retrieve_service() - self._retrieve_service_connection() - self.test_connection() - def _retrieve_service(self) -> DatabaseService: - """Get service object from source config `entityFullyQualifiedName`""" - fully_qualified_name = self.source_config.entityFullyQualifiedName.__root__ - try: - service_name = split(fully_qualified_name)[0] - except IndexError as exc: - logger.debug(traceback.format_exc()) - raise IndexError( - f"Could not retrieve service name from entity fully qualified name {fully_qualified_name}: {exc}" - ) - try: 
- service = self.metadata.get_by_name(DatabaseService, service_name) - if not service: - raise ConnectionError( - f"Could not retrieve service with name `{service_name}`. " - "Typically caused by the `entityFullyQualifiedName` does not exists in OpenMetadata " - "or the JWT Token is invalid." - ) - - return service - - except ConnectionError as exc: - raise exc - except Exception as exc: - logger.debug(traceback.format_exc()) - logger.error( - f"Error getting service connection for service name [{service_name}]" - f" using the secrets manager provider [{self.metadata.config.secretsManagerProvider}]: {exc}" - ) - raise exc - - def _retrieve_service_connection(self) -> None: - """ - We override the current `serviceConnection` source config object if source workflow service already exists - in OM. When it is configured, we retrieve the service connection from the secrets' manager. Otherwise, we get it - from the service object itself through the default `SecretsManager`. - """ - if ( - not self.config.source.serviceConnection - and not self.metadata.config.forceEntityOverwriting - ): - self.config.source.serviceConnection = ServiceConnection( - __root__=self.service.connection - ) - def _get_table_entity(self) -> Optional[Table]: """given an entity fqn return the table entity @@ -166,9 +112,7 @@ class TestSuiteSource(Source): """ Check that the table has the proper test suite built in """ - # If there is no executable test suite yet for the table, we'll need to create one - executable_test_suite = None if not table.testSuite: executable_test_suite = CreateTestSuiteRequest( name=fqn.build( @@ -184,7 +128,7 @@ class TestSuiteSource(Source): yield Either( right=TableAndTests( executable_test_suite=executable_test_suite, - service_type=self.service.serviceType.value, + service_type=self.config.source.serviceConnection.__root__.config.type.value, ) ) @@ -205,7 +149,7 @@ class TestSuiteSource(Source): right=TableAndTests( table=table, test_cases=test_suite_cases, - 
service_type=self.service.serviceType.value, + service_type=self.config.source.serviceConnection.__root__.config.type.value, ) ) diff --git a/ingestion/src/metadata/workflow/base.py b/ingestion/src/metadata/workflow/base.py index edc90d79453..49a1c121385 100644 --- a/ingestion/src/metadata/workflow/base.py +++ b/ingestion/src/metadata/workflow/base.py @@ -284,33 +284,56 @@ class BaseWorkflow(ABC, WorkflowStatusMixin): workflow has prepared the necessary components, and we will update the SUCCESS/FAILED status at the end of the flow. """ - maybe_pipeline: Optional[IngestionPipeline] = self.metadata.get_by_name( - entity=IngestionPipeline, fqn=self.config.ingestionPipelineFQN - ) + try: + maybe_pipeline: Optional[IngestionPipeline] = self.metadata.get_by_name( + entity=IngestionPipeline, fqn=self.config.ingestionPipelineFQN + ) - _, pipeline_name = fqn.split( - self.config.ingestionPipelineFQN - ) # Get the name from <service>.<name> - service = self.metadata.get_by_name( + if maybe_pipeline: + return maybe_pipeline + + # Get the name from <service>.<name> or, for test suites, <tableFQN>.testSuite + *_, pipeline_name = fqn.split(self.config.ingestionPipelineFQN) + + service = self._get_ingestion_pipeline_service() + + if service is not None: + + return self.metadata.create_or_update( + CreateIngestionPipelineRequest( + name=pipeline_name, + service=EntityReference( + id=service.id, + type=get_reference_type_from_service_type( + self.service_type + ), + ), + pipelineType=get_pipeline_type_from_source_config( + self.config.source.sourceConfig.config + ), + sourceConfig=self.config.source.sourceConfig, + airflowConfig=AirflowConfig(), + ) + ) + + return maybe_pipeline + + except Exception as exc: + logger.error( + f"Error trying to get or create the Ingestion Pipeline due to [{exc}]" + ) + return None + + def _get_ingestion_pipeline_service(self) -> Optional[T]: + """ + Ingestion Pipelines are linked to either an EntityService (DatabaseService, MessagingService,...) + or a Test Suite. 
+ + Depending on the Source Config Type, we'll need to GET one or the other to create + the Ingestion Pipeline + """ + + return self.metadata.get_by_name( entity=get_service_class_from_service_type(self.service_type), fqn=self.config.source.serviceName, ) - - if maybe_pipeline is None and service is not None: - - return self.metadata.create_or_update( - CreateIngestionPipelineRequest( - name=pipeline_name, - service=EntityReference( - id=service.id, - type=get_reference_type_from_service_type(self.service_type), - ), - pipelineType=get_pipeline_type_from_source_config( - self.config.source.sourceConfig.config - ), - sourceConfig=self.config.source.sourceConfig, - airflowConfig=AirflowConfig(), - ) - ) - - return maybe_pipeline diff --git a/ingestion/src/metadata/workflow/data_quality.py b/ingestion/src/metadata/workflow/data_quality.py index 099ab305b87..ca9ddd25867 100644 --- a/ingestion/src/metadata/workflow/data_quality.py +++ b/ingestion/src/metadata/workflow/data_quality.py @@ -11,12 +11,21 @@ """ Workflow definition for the Data Quality """ +import traceback +from typing import Optional + from metadata.data_quality.processor.test_case_runner import TestCaseRunner from metadata.data_quality.source.test_suite import TestSuiteSource +from metadata.generated.schema.entity.services.connections.serviceConnection import ( + ServiceConnection, +) +from metadata.generated.schema.entity.services.databaseService import DatabaseService +from metadata.generated.schema.tests.testSuite import ServiceType, TestSuite from metadata.ingestion.api.steps import Processor, Sink +from metadata.utils import fqn from metadata.utils.importer import import_sink_class from metadata.utils.logger import test_suite_logger -from metadata.workflow.base import BaseWorkflow +from metadata.workflow.base import BaseWorkflow, T logger = test_suite_logger() @@ -48,3 +57,59 @@ class TestSuiteWorkflow(BaseWorkflow): def _get_test_runner_processor(self) -> Processor: return 
TestCaseRunner.create(self.config.dict(), self.metadata) + + def _retrieve_service_connection_if_needed(self, service_type: ServiceType) -> None: + """Get service object from source config `entityFullyQualifiedName`""" + if ( + not self.config.source.serviceConnection + and not self.metadata.config.forceEntityOverwriting + ): + fully_qualified_name = ( + self.config.source.sourceConfig.config.entityFullyQualifiedName.__root__ + ) + try: + service_name = fqn.split(fully_qualified_name)[0] + except IndexError as exc: + logger.debug(traceback.format_exc()) + raise IndexError( + f"Could not retrieve service name from entity fully qualified name {fully_qualified_name}: {exc}" + ) + try: + service: DatabaseService = self.metadata.get_by_name( + DatabaseService, service_name + ) + if not service: + raise ConnectionError( + f"Could not retrieve service with name `{service_name}`. " + "Typically caused by the `entityFullyQualifiedName` does not exists in OpenMetadata " + "or the JWT Token is invalid." + ) + + self.config.source.serviceConnection = ServiceConnection( + __root__=service.connection + ) + + except Exception as exc: + logger.debug(traceback.format_exc()) + logger.error( + f"Error getting service connection for service name [{service_name}]" + f" using the secrets manager provider [{self.metadata.config.secretsManagerProvider}]: {exc}" + ) + raise exc + + def _get_ingestion_pipeline_service(self) -> Optional[T]: + """ + Ingestion Pipelines are linked to either an EntityService (DatabaseService, MessagingService,...) + or a Test Suite. 
+ + Depending on the Source Config Type, we'll need to GET one or the other to create + the Ingestion Pipeline + """ + return self.metadata.get_by_name( + entity=TestSuite, + fqn=fqn.build( + metadata=None, + entity_type=TestSuite, + table_fqn=self.config.source.sourceConfig.config.entityFullyQualifiedName, + ), + ) diff --git a/ingestion/src/metadata/workflow/workflow_status_mixin.py b/ingestion/src/metadata/workflow/workflow_status_mixin.py index 12056832de5..05373d26ffd 100644 --- a/ingestion/src/metadata/workflow/workflow_status_mixin.py +++ b/ingestion/src/metadata/workflow/workflow_status_mixin.py @@ -26,6 +26,7 @@ from metadata.generated.schema.metadataIngestion.workflow import ( from metadata.ingestion.api.step import Step from metadata.ingestion.api.steps import Source from metadata.ingestion.ometa.ometa_api import OpenMetadata +from metadata.utils.helpers import datetime_to_ts SUCCESS_THRESHOLD_VALUE = 90 @@ -97,8 +98,8 @@ class WorkflowStatusMixin: pipeline_status = PipelineStatus( runId=self.run_id, pipelineState=state, - startDate=datetime.now().timestamp() * 1000, - timestamp=datetime.now().timestamp() * 1000, + startDate=datetime_to_ts(datetime.now()), + timestamp=datetime_to_ts(datetime.now()), ) else: pipeline_status = self.metadata.get_pipeline_status(