diff --git a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/common.py b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/common.py index 05dfeb89035..370826283cc 100644 --- a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/common.py +++ b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/common.py @@ -15,7 +15,7 @@ import json import uuid from datetime import datetime, timedelta from functools import partial -from typing import Callable +from typing import Callable, cast import airflow from airflow import DAG @@ -30,9 +30,12 @@ from metadata.generated.schema.entity.services.metadataService import MetadataSe from metadata.generated.schema.entity.services.mlmodelService import MlModelService from metadata.generated.schema.entity.services.pipelineService import PipelineService from metadata.generated.schema.entity.services.storageService import StorageService -from metadata.generated.schema.tests.testSuite import TestSuite +from metadata.generated.schema.metadataIngestion.testSuitePipeline import ( + TestSuitePipeline, +) from metadata.ingestion.models.encoders import show_secrets_encoder from metadata.ingestion.ometa.ometa_api import OpenMetadata +from metadata.utils.fqn import split try: from airflow.operators.python import PythonOperator @@ -151,6 +154,24 @@ def build_source(ingestion_pipeline: IngestionPipeline) -> WorkflowSource: service: StorageService = metadata.get_by_name( entity=entity_class, fqn=ingestion_pipeline.service.name ) + elif service_type == "testSuite": + entity_class = DatabaseService + ingestion_pipeline.sourceConfig.config = cast( + TestSuitePipeline, ingestion_pipeline.sourceConfig.config + ) + split_fqn = split( + ingestion_pipeline.sourceConfig.config.entityFullyQualifiedName.__root__ + ) + try: + service_fqn = split_fqn[0] + except IndexError: + raise ParsingConfigurationError( + "Invalid fully qualified name " + f"{ingestion_pipeline.sourceConfig.config.entityFullyQualifiedName.__root__}" + ) + service: DatabaseService = metadata.get_by_name( + entity=entity_class, fqn=service_fqn + ) else: raise InvalidServiceException(f"Invalid Service Type: {service_type}") except ValidationError as original_error: diff --git a/openmetadata-airflow-apis/tests/unit/ingestion_pipeline/test_workflow_creation.py b/openmetadata-airflow-apis/tests/unit/ingestion_pipeline/test_workflow_creation.py index 38b89801bed..9473e5f8726 100644 --- a/openmetadata-airflow-apis/tests/unit/ingestion_pipeline/test_workflow_creation.py +++ b/openmetadata-airflow-apis/tests/unit/ingestion_pipeline/test_workflow_creation.py @@ -322,7 +322,7 @@ class OMetaServiceTest(TestCase): sourceConfig=SourceConfig( config=TestSuitePipeline( type="TestSuite", - entityFullyQualifiedName="service.database.schema.table", + entityFullyQualifiedName=self.service.name.__root__, ) ), openMetadataServerConnection=self.server_config, @@ -330,9 +330,9 @@ class OMetaServiceTest(TestCase): startDate="2022-06-10T15:06:47+00:00", ), service=EntityReference( - id=self.service.id, - type="databaseService", - name=self.service.name.__root__, + id=uuid.uuid4(), + type="testSuite", + name="test_test_suite_workflow", ), ) @@ -340,3 +340,4 @@ class OMetaServiceTest(TestCase): config = json.loads(workflow_config.json(encoder=show_secrets_encoder)) parse_workflow_config_gracefully(config) + assert workflow_config.source.type == "mysql"