Fix DQ Workflow (#13631)

* Fix DQ Workflow

* Fix DQ Workflow
This commit is contained in:
Pere Miquel Brull 2023-10-18 11:49:38 +02:00 committed by GitHub
parent d70cf2ea7a
commit 899cd7e1fe
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 120 additions and 87 deletions

View File

@ -14,16 +14,11 @@ Test Suite Workflow Source
The main goal is to get the configured table from the API.
"""
import traceback
from typing import Iterable, List, Optional, cast
from metadata.data_quality.api.models import TableAndTests
from metadata.generated.schema.api.tests.createTestSuite import CreateTestSuiteRequest
from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.entity.services.connections.serviceConnection import (
ServiceConnection,
)
from metadata.generated.schema.entity.services.databaseService import DatabaseService
from metadata.generated.schema.metadataIngestion.testSuitePipeline import (
TestSuitePipeline,
)
@ -38,7 +33,6 @@ from metadata.ingestion.api.step import Step
from metadata.ingestion.api.steps import Source
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.utils import fqn
from metadata.utils.fqn import split
from metadata.utils.logger import test_suite_logger
logger = test_suite_logger()
@ -61,56 +55,8 @@ class TestSuiteSource(Source):
self.source_config: TestSuitePipeline = self.config.source.sourceConfig.config
self.service: DatabaseService = self._retrieve_service()
self._retrieve_service_connection()
self.test_connection()
def _retrieve_service(self) -> DatabaseService:
    """
    Get the service object from the source config `entityFullyQualifiedName`.

    The service name is the first element returned by `split` on the entity FQN.
    That name is then looked up with `self.metadata.get_by_name`.

    Returns:
        The `DatabaseService` returned for the service name.

    Raises:
        IndexError: if the split FQN has no first element.
        ConnectionError: if the lookup returns nothing for the service name.
        Exception: any other lookup error is logged and re-raised unchanged.
    """
    fully_qualified_name = self.source_config.entityFullyQualifiedName.__root__
    try:
        service_name = split(fully_qualified_name)[0]
    except IndexError as exc:
        logger.debug(traceback.format_exc())
        # Chain explicitly so the original failure is kept as the direct cause.
        raise IndexError(
            f"Could not retrieve service name from entity fully qualified name {fully_qualified_name}: {exc}"
        ) from exc

    try:
        service = self.metadata.get_by_name(DatabaseService, service_name)
        if not service:
            raise ConnectionError(
                f"Could not retrieve service with name `{service_name}`. "
                "Typically caused by the `entityFullyQualifiedName` does not exists in OpenMetadata "
                "or the JWT Token is invalid."
            )
        return service
    except ConnectionError:
        # Connection errors already carry a descriptive message: propagate without extra logging.
        raise
    except Exception as exc:
        logger.debug(traceback.format_exc())
        logger.error(
            f"Error getting service connection for service name [{service_name}]"
            f" using the secrets manager provider [{self.metadata.config.secretsManagerProvider}]: {exc}"
        )
        # Bare raise re-raises the active exception without adding a traceback frame.
        raise
def _retrieve_service_connection(self) -> None:
    """
    Populate the source `serviceConnection` from the already retrieved service.

    The connection is only filled in when the workflow config does not carry a
    `serviceConnection` and `forceEntityOverwriting` is disabled in the metadata
    config. In that case it is wrapped from `self.service.connection`.
    """
    source = self.config.source

    # Keep whatever the config already provides, or skip entirely when overwriting is forced.
    if source.serviceConnection or self.metadata.config.forceEntityOverwriting:
        return

    source.serviceConnection = ServiceConnection(__root__=self.service.connection)
def _get_table_entity(self) -> Optional[Table]:
"""given an entity fqn return the table entity
@ -166,9 +112,7 @@ class TestSuiteSource(Source):
"""
Check that the table has the proper test suite built in
"""
# If there is no executable test suite yet for the table, we'll need to create one
executable_test_suite = None
if not table.testSuite:
executable_test_suite = CreateTestSuiteRequest(
name=fqn.build(
@ -184,7 +128,7 @@ class TestSuiteSource(Source):
yield Either(
right=TableAndTests(
executable_test_suite=executable_test_suite,
service_type=self.service.serviceType.value,
service_type=self.config.source.serviceConnection.__root__.config.type.value,
)
)
@ -205,7 +149,7 @@ class TestSuiteSource(Source):
right=TableAndTests(
table=table,
test_cases=test_suite_cases,
service_type=self.service.serviceType.value,
service_type=self.config.source.serviceConnection.__root__.config.type.value,
)
)

View File

@ -284,33 +284,56 @@ class BaseWorkflow(ABC, WorkflowStatusMixin):
workflow has prepared the necessary components, and we will update the SUCCESS/FAILED
status at the end of the flow.
"""
maybe_pipeline: Optional[IngestionPipeline] = self.metadata.get_by_name(
entity=IngestionPipeline, fqn=self.config.ingestionPipelineFQN
)
try:
maybe_pipeline: Optional[IngestionPipeline] = self.metadata.get_by_name(
entity=IngestionPipeline, fqn=self.config.ingestionPipelineFQN
)
_, pipeline_name = fqn.split(
self.config.ingestionPipelineFQN
) # Get the name from <service>.<name>
service = self.metadata.get_by_name(
if maybe_pipeline:
return maybe_pipeline
# Get the name from <service>.<name> or, for test suites, <tableFQN>.testSuite
*_, pipeline_name = fqn.split(self.config.ingestionPipelineFQN)
service = self._get_ingestion_pipeline_service()
if service is not None:
return self.metadata.create_or_update(
CreateIngestionPipelineRequest(
name=pipeline_name,
service=EntityReference(
id=service.id,
type=get_reference_type_from_service_type(
self.service_type
),
),
pipelineType=get_pipeline_type_from_source_config(
self.config.source.sourceConfig.config
),
sourceConfig=self.config.source.sourceConfig,
airflowConfig=AirflowConfig(),
)
)
return maybe_pipeline
except Exception as exc:
logger.error(
f"Error trying to get or create the Ingestion Pipeline due to [{exc}]"
)
return None
def _get_ingestion_pipeline_service(self) -> Optional[T]:
    """
    Fetch the entity the Ingestion Pipeline will be linked to.

    Ingestion Pipelines are linked to either an EntityService (DatabaseService,
    MessagingService,...) or a Test Suite. This implementation resolves the service
    class from `self.service_type` and looks it up by the source `serviceName`.
    """
    service_class = get_service_class_from_service_type(self.service_type)
    service_name = self.config.source.serviceName
    return self.metadata.get_by_name(entity=service_class, fqn=service_name)
if maybe_pipeline is None and service is not None:
return self.metadata.create_or_update(
CreateIngestionPipelineRequest(
name=pipeline_name,
service=EntityReference(
id=service.id,
type=get_reference_type_from_service_type(self.service_type),
),
pipelineType=get_pipeline_type_from_source_config(
self.config.source.sourceConfig.config
),
sourceConfig=self.config.source.sourceConfig,
airflowConfig=AirflowConfig(),
)
)
return maybe_pipeline

View File

@ -11,12 +11,21 @@
"""
Workflow definition for the Data Quality
"""
import traceback
from typing import Optional
from metadata.data_quality.processor.test_case_runner import TestCaseRunner
from metadata.data_quality.source.test_suite import TestSuiteSource
from metadata.generated.schema.entity.services.connections.serviceConnection import (
ServiceConnection,
)
from metadata.generated.schema.entity.services.databaseService import DatabaseService
from metadata.generated.schema.tests.testSuite import ServiceType, TestSuite
from metadata.ingestion.api.steps import Processor, Sink
from metadata.utils import fqn
from metadata.utils.importer import import_sink_class
from metadata.utils.logger import test_suite_logger
from metadata.workflow.base import BaseWorkflow
from metadata.workflow.base import BaseWorkflow, T
logger = test_suite_logger()
@ -48,3 +57,59 @@ class TestSuiteWorkflow(BaseWorkflow):
def _get_test_runner_processor(self) -> Processor:
    """Create the `TestCaseRunner` processor from the workflow config dict and `self.metadata`."""
    workflow_config = self.config.dict()
    return TestCaseRunner.create(workflow_config, self.metadata)
def _retrieve_service_connection_if_needed(self, service_type: ServiceType) -> None:
    """
    Fill in `self.config.source.serviceConnection` from the service when it is missing.

    Nothing happens if the workflow config already carries a `serviceConnection`
    or if `forceEntityOverwriting` is enabled in the metadata config. Otherwise, the
    service name is taken as the first element of the split `entityFullyQualifiedName`,
    the `DatabaseService` is looked up with `self.metadata.get_by_name`, and its
    connection is stored on the source config.

    Args:
        service_type: not read by this implementation.

    Raises:
        IndexError: if the split FQN has no first element.
        ConnectionError: if the lookup returns nothing for the service name.
        Exception: any error raised during the lookup is logged and re-raised.
    """
    if (
        self.config.source.serviceConnection
        or self.metadata.config.forceEntityOverwriting
    ):
        return

    fully_qualified_name = (
        self.config.source.sourceConfig.config.entityFullyQualifiedName.__root__
    )
    try:
        service_name = fqn.split(fully_qualified_name)[0]
    except IndexError as exc:
        logger.debug(traceback.format_exc())
        # Chain explicitly so the original failure is kept as the direct cause.
        raise IndexError(
            f"Could not retrieve service name from entity fully qualified name {fully_qualified_name}: {exc}"
        ) from exc

    try:
        service: DatabaseService = self.metadata.get_by_name(
            DatabaseService, service_name
        )
        if not service:
            raise ConnectionError(
                f"Could not retrieve service with name `{service_name}`. "
                "Typically caused by the `entityFullyQualifiedName` does not exists in OpenMetadata "
                "or the JWT Token is invalid."
            )
        self.config.source.serviceConnection = ServiceConnection(
            __root__=service.connection
        )
    except Exception as exc:
        logger.debug(traceback.format_exc())
        logger.error(
            f"Error getting service connection for service name [{service_name}]"
            f" using the secrets manager provider [{self.metadata.config.secretsManagerProvider}]: {exc}"
        )
        # Bare raise re-raises the active exception without adding a traceback frame.
        raise
def _get_ingestion_pipeline_service(self) -> Optional[T]:
    """
    Fetch the Test Suite the Ingestion Pipeline will be linked to.

    Ingestion Pipelines are linked to either an EntityService (DatabaseService,
    MessagingService,...) or a Test Suite. Here the Test Suite FQN is built from
    the source config `entityFullyQualifiedName` and looked up as a `TestSuite`.
    """
    # NOTE(review): other code in this module reads `entityFullyQualifiedName.__root__`;
    # here the attribute itself is passed - confirm `fqn.build` accepts it.
    table_fqn = self.config.source.sourceConfig.config.entityFullyQualifiedName
    test_suite_fqn = fqn.build(
        metadata=None,
        entity_type=TestSuite,
        table_fqn=table_fqn,
    )
    return self.metadata.get_by_name(entity=TestSuite, fqn=test_suite_fqn)

View File

@ -26,6 +26,7 @@ from metadata.generated.schema.metadataIngestion.workflow import (
from metadata.ingestion.api.step import Step
from metadata.ingestion.api.steps import Source
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.utils.helpers import datetime_to_ts
SUCCESS_THRESHOLD_VALUE = 90
@ -97,8 +98,8 @@ class WorkflowStatusMixin:
pipeline_status = PipelineStatus(
runId=self.run_id,
pipelineState=state,
startDate=datetime.now().timestamp() * 1000,
timestamp=datetime.now().timestamp() * 1000,
startDate=datetime_to_ts(datetime.now()),
timestamp=datetime_to_ts(datetime.now()),
)
else:
pipeline_status = self.metadata.get_pipeline_status(