fix: move service retrival to workflow (#12109)

This commit is contained in:
Teddy 2023-06-23 06:40:32 +02:00 committed by GitHub
parent 525180c0e2
commit 6f33465b7c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 49 additions and 55 deletions

View File

@ -39,10 +39,10 @@ from metadata.generated.schema.entity.services.connections.metadata.openMetadata
from metadata.generated.schema.entity.services.connections.serviceConnection import (
ServiceConnection,
)
from metadata.generated.schema.entity.services.databaseService import DatabaseService
from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import (
PipelineState,
)
from metadata.generated.schema.entity.services.serviceType import ServiceType
from metadata.generated.schema.metadataIngestion.testSuitePipeline import (
TestSuitePipeline,
)
@ -57,10 +57,7 @@ from metadata.ingestion.api.parser import parse_workflow_config_gracefully
from metadata.ingestion.api.processor import ProcessorStatus
from metadata.ingestion.ometa.client_utils import create_ometa_client
from metadata.utils import entity_link
from metadata.utils.class_helper import (
get_service_class_from_service_type,
get_service_type_from_source_type,
)
from metadata.utils.fqn import split
from metadata.utils.importer import get_sink
from metadata.utils.logger import test_suite_logger
from metadata.utils.workflow_output_handler import print_test_suite_status
@ -104,8 +101,10 @@ class TestSuiteWorkflow(WorkflowStatusMixin):
)
self.metadata = create_ometa_client(self.metadata_config)
self._retrieve_service_connection()
self.source_config: TestSuitePipeline = self.config.source.sourceConfig.config
self.service: DatabaseService = self._retrieve_service()
self._retrieve_service_connection()
self.processor_config: TestSuiteProcessorConfig = (
TestSuiteProcessorConfig.parse_obj(
self.config.processor.dict().get("config")
@ -147,6 +146,34 @@ class TestSuiteWorkflow(WorkflowStatusMixin):
)
raise err
def _retrieve_service(self) -> DatabaseService:
"""Get service object from source config `entityFullyQualifiedName`"""
fully_qualified_name = self.source_config.entityFullyQualifiedName.__root__
try:
service_name = split(fully_qualified_name)[0]
except IndexError as exc:
logger.debug(traceback.format_exc())
raise IndexError(
f"Could not retrieve service name from entity fully qualified name {fully_qualified_name}: {exc}"
)
try:
service = self.metadata.get_by_name(DatabaseService, service_name)
if not service:
raise ConnectionError(
f"Could not retrieve service with name `{service_name}`. "
"Typically caused by the `entityFullyQualifiedName` does not exists in OpenMetadata "
"or the JWT Token is invalid."
)
except ConnectionError as exc:
raise exc
except Exception as exc:
logger.debug(traceback.format_exc())
logger.error(
f"Error getting service connection for service name [{service_name}]"
f" using the secrets manager provider [{self.metadata.config.secretsManagerProvider}]: {exc}"
)
return service
def _get_table_entity(self, entity_fqn: str) -> Optional[Table]:
"""given an entity fqn return the table entity
@ -383,7 +410,7 @@ class TestSuiteWorkflow(WorkflowStatusMixin):
openmetadata_test_cases = self.filter_for_om_test_cases(test_cases)
test_suite_runner = test_suite_source_factory.create(
self.config.source.type.lower(),
self.service.serviceType.value.lower(),
self.config,
self.metadata,
self.table_entity,
@ -412,37 +439,13 @@ class TestSuiteWorkflow(WorkflowStatusMixin):
in OM. When it is configured, we retrieve the service connection from the secrets' manager. Otherwise, we get it
from the service object itself through the default `SecretsManager`.
"""
service_type: ServiceType = get_service_type_from_source_type(
self.config.source.type
)
if (
not self.config.source.serviceConnection
and not self.metadata.config.forceEntityOverwriting
):
service_name = self.config.source.serviceName
try:
service = self.metadata.get_by_name(
get_service_class_from_service_type(service_type),
service_name,
)
if not service:
raise ConnectionError(
f"Could not retrieve service with name `{service_name}`. "
"Typically caused by the `serviceName` does not exists in OpenMetadata "
"or the JWT Token is invalid."
)
if service:
self.config.source.serviceConnection = ServiceConnection(
__root__=service.connection
)
except ConnectionError as exc:
raise exc
except Exception as exc:
logger.debug(traceback.format_exc())
logger.error(
f"Error getting service connection for service name [{service_name}]"
f" using the secrets manager provider [{self.metadata.config.secretsManagerProvider}]: {exc}"
)
self.config.source.serviceConnection = ServiceConnection(
__root__=self.service.connection
)
def execute(self):
"""Execute test suite workflow"""

View File

@ -50,7 +50,7 @@ from metadata.ingestion.ometa.ometa_api import OpenMetadata
test_suite_config = {
"source": {
"type": "custom-database",
"serviceName": "test_suite_service_test",
"serviceName": "MyRabdomWorkflow",
"sourceConfig": {
"config": {
"type": "TestSuite",

View File

@ -15,7 +15,7 @@ import json
import uuid
from datetime import datetime, timedelta
from functools import partial
from typing import Callable, cast
from typing import Callable
import airflow
from airflow import DAG
@ -119,6 +119,16 @@ def build_source(ingestion_pipeline: IngestionPipeline) -> WorkflowSource:
entity_class = None
try:
if service_type == "testSuite":
# check we can access OM server
metadata.health_check()
return WorkflowSource(
type=service_type,
serviceName=ingestion_pipeline.service.name,
sourceConfig=ingestion_pipeline.sourceConfig,
serviceConnection=None, # retrieved from the test suite workflow using the `sourceConfig.config.entityFullyQualifiedName`
)
if service_type == "databaseService":
entity_class = DatabaseService
service: DatabaseService = metadata.get_by_name(
@ -154,24 +164,6 @@ def build_source(ingestion_pipeline: IngestionPipeline) -> WorkflowSource:
service: StorageService = metadata.get_by_name(
entity=entity_class, fqn=ingestion_pipeline.service.name
)
elif service_type == "testSuite":
entity_class = DatabaseService
ingestion_pipeline.sourceConfig.config = cast(
TestSuitePipeline, ingestion_pipeline.sourceConfig.config
)
split_fqn = split(
ingestion_pipeline.sourceConfig.config.entityFullyQualifiedName.__root__
)
try:
service_fqn = split_fqn[0]
except IndexError:
raise ParsingConfigurationError(
"Invalid fully qualified name "
f"{ingestion_pipeline.sourceConfig.config.entityFullyQualifiedName.__root__}"
)
service: DatabaseService = metadata.get_by_name(
entity=entity_class, fqn=service_fqn
)
else:
raise InvalidServiceException(f"Invalid Service Type: {service_type}")
except ValidationError as original_error:

View File

@ -340,4 +340,3 @@ class OMetaServiceTest(TestCase):
config = json.loads(workflow_config.json(encoder=show_secrets_encoder))
parse_workflow_config_gracefully(config)
assert workflow_config.source.type == "mysql"