Added checks to see if DAG can reach OM (#7794)

This commit is contained in:
Teddy 2022-09-30 17:57:03 +02:00 committed by GitHub
parent 4520c138f8
commit 595e5c1b89
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 16 additions and 2 deletions

View File

@ -15,6 +15,7 @@ Workflow definition for the test suite
from __future__ import annotations from __future__ import annotations
import sys
import traceback import traceback
from copy import deepcopy from copy import deepcopy
from logging import Logger from logging import Logger
@ -277,8 +278,9 @@ class TestSuiteWorkflow:
processor.config.testSuites processor.config.testSuites
""" """
test_suite_entities = [] test_suite_entities = []
test_suites = self.processor_config.testSuites or []
for test_suite in self.processor_config.testSuites: for test_suite in test_suites:
test_suite_entity = self.metadata.get_by_name( test_suite_entity = self.metadata.get_by_name(
entity=TestSuite, entity=TestSuite,
fqn=test_suite.name, fqn=test_suite.name,
@ -388,6 +390,10 @@ class TestSuiteWorkflow:
self.get_test_suite_entity_for_ui_workflow() self.get_test_suite_entity_for_ui_workflow()
or self.get_or_create_test_suite_entity_for_cli_workflow() or self.get_or_create_test_suite_entity_for_cli_workflow()
) )
if not test_suites:
logger.warning("No testSuite found in configuration file. Exiting.")
sys.exit(1)
test_cases = self.get_test_cases_from_test_suite(test_suites) test_cases = self.get_test_cases_from_test_suite(test_suites)
if self.processor_config.testSuites: if self.processor_config.testSuites:
cli_config_test_cases_def = self.get_test_case_from_cli_config() cli_config_test_cases_def = self.get_test_case_from_cli_config()

View File

@ -24,6 +24,7 @@ from metadata.generated.schema.entity.services.databaseService import DatabaseSe
from metadata.generated.schema.entity.services.messagingService import MessagingService from metadata.generated.schema.entity.services.messagingService import MessagingService
from metadata.generated.schema.entity.services.mlmodelService import MlModelService from metadata.generated.schema.entity.services.mlmodelService import MlModelService
from metadata.generated.schema.entity.services.pipelineService import PipelineService from metadata.generated.schema.entity.services.pipelineService import PipelineService
from metadata.generated.schema.tests.testSuite import TestSuite
from metadata.generated.schema.type import basic from metadata.generated.schema.type import basic
from metadata.ingestion.models.encoders import show_secrets_encoder from metadata.ingestion.models.encoders import show_secrets_encoder
from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.ometa.ometa_api import OpenMetadata
@ -92,13 +93,20 @@ def build_source(ingestion_pipeline: IngestionPipeline) -> WorkflowSource:
] = None ] = None
if service_type == "testSuite": if service_type == "testSuite":
service = metadata.get_by_name(
entity=TestSuite, fqn=ingestion_pipeline.service.name
) # check we are able to access OM server
if not service:
raise InvalidServiceException(
f"Could not get service from type {service_type}"
)
return WorkflowSource( return WorkflowSource(
type=service_type, type=service_type,
serviceName=ingestion_pipeline.service.name, serviceName=ingestion_pipeline.service.name,
sourceConfig=ingestion_pipeline.sourceConfig, sourceConfig=ingestion_pipeline.sourceConfig,
) )
elif service_type == "databaseService": if service_type == "databaseService":
service: DatabaseService = metadata.get_by_name( service: DatabaseService = metadata.get_by_name(
entity=DatabaseService, fqn=ingestion_pipeline.service.name entity=DatabaseService, fqn=ingestion_pipeline.service.name
) )