From 595e5c1b890c2948944ac4e38e96e9cba6bf93e3 Mon Sep 17 00:00:00 2001 From: Teddy Date: Fri, 30 Sep 2022 17:57:03 +0200 Subject: [PATCH] Added checks to see if DAG can reach OM (#7794) --- ingestion/src/metadata/test_suite/api/workflow.py | 8 +++++++- .../workflows/ingestion/common.py | 10 +++++++++- 2 files changed, 16 insertions(+), 2 deletions(-) diff --git a/ingestion/src/metadata/test_suite/api/workflow.py b/ingestion/src/metadata/test_suite/api/workflow.py index abfa0335c8f..e712855802b 100644 --- a/ingestion/src/metadata/test_suite/api/workflow.py +++ b/ingestion/src/metadata/test_suite/api/workflow.py @@ -15,6 +15,7 @@ Workflow definition for the test suite from __future__ import annotations +import sys import traceback from copy import deepcopy from logging import Logger @@ -277,8 +278,9 @@ class TestSuiteWorkflow: processor.config.testSuites """ test_suite_entities = [] + test_suites = self.processor_config.testSuites or [] - for test_suite in self.processor_config.testSuites: + for test_suite in test_suites: test_suite_entity = self.metadata.get_by_name( entity=TestSuite, fqn=test_suite.name, @@ -388,6 +390,10 @@ class TestSuiteWorkflow: self.get_test_suite_entity_for_ui_workflow() or self.get_or_create_test_suite_entity_for_cli_workflow() ) + if not test_suites: + logger.warning("No testSuite found in configuration file. Exiting.") + sys.exit(1) + test_cases = self.get_test_cases_from_test_suite(test_suites) if self.processor_config.testSuites: cli_config_test_cases_def = self.get_test_case_from_cli_config() diff --git a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/common.py b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/common.py index e6334dd94d2..1f23101ea3e 100644 --- a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/common.py +++ b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/common.py @@ -24,6 +24,7 @@ from metadata.generated.schema.entity.services.databaseService import DatabaseSe from metadata.generated.schema.entity.services.messagingService import MessagingService from metadata.generated.schema.entity.services.mlmodelService import MlModelService from metadata.generated.schema.entity.services.pipelineService import PipelineService +from metadata.generated.schema.tests.testSuite import TestSuite from metadata.generated.schema.type import basic from metadata.ingestion.models.encoders import show_secrets_encoder from metadata.ingestion.ometa.ometa_api import OpenMetadata @@ -92,13 +93,20 @@ def build_source(ingestion_pipeline: IngestionPipeline) -> WorkflowSource: ] = None if service_type == "testSuite": + service = metadata.get_by_name( + entity=TestSuite, fqn=ingestion_pipeline.service.name + ) # check we are able to access OM server + if not service: + raise InvalidServiceException( + f"Could not get service from type {service_type}" + ) return WorkflowSource( type=service_type, serviceName=ingestion_pipeline.service.name, sourceConfig=ingestion_pipeline.sourceConfig, ) - elif service_type == "databaseService": + if service_type == "databaseService": service: DatabaseService = metadata.get_by_name( entity=DatabaseService, fqn=ingestion_pipeline.service.name )