Update dq ingestion workflow (#12037)

* fix: fetch service from sourceConfig.entityFullyQualifiedName

* fix: python linting
This commit is contained in:
Teddy 2023-06-19 18:01:26 +02:00 committed by GitHub
parent f07c421264
commit 73808bf29b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 28 additions and 6 deletions

View File

@ -15,7 +15,7 @@ import json
import uuid
from datetime import datetime, timedelta
from functools import partial
from typing import Callable
from typing import Callable, cast
import airflow
from airflow import DAG
@ -30,9 +30,12 @@ from metadata.generated.schema.entity.services.metadataService import MetadataSe
from metadata.generated.schema.entity.services.mlmodelService import MlModelService
from metadata.generated.schema.entity.services.pipelineService import PipelineService
from metadata.generated.schema.entity.services.storageService import StorageService
from metadata.generated.schema.tests.testSuite import TestSuite
from metadata.generated.schema.metadataIngestion.testSuitePipeline import (
TestSuitePipeline,
)
from metadata.ingestion.models.encoders import show_secrets_encoder
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.utils.fqn import split
try:
from airflow.operators.python import PythonOperator
@ -151,6 +154,24 @@ def build_source(ingestion_pipeline: IngestionPipeline) -> WorkflowSource:
service: StorageService = metadata.get_by_name(
entity=entity_class, fqn=ingestion_pipeline.service.name
)
elif service_type == "testSuite":
entity_class = DatabaseService
ingestion_pipeline.sourceConfig.config = cast(
TestSuitePipeline, ingestion_pipeline.sourceConfig.config
)
split_fqn = split(
ingestion_pipeline.sourceConfig.config.entityFullyQualifiedName.__root__
)
try:
service_fqn = split_fqn[0]
except IndexError:
raise ParsingConfigurationError(
"Invalid fully qualified name "
f"{ingestion_pipeline.sourceConfig.config.entityFullyQualifiedName.__root__}"
)
service: DatabaseService = metadata.get_by_name(
entity=entity_class, fqn=service_fqn
)
else:
raise InvalidServiceException(f"Invalid Service Type: {service_type}")
except ValidationError as original_error:

View File

@ -322,7 +322,7 @@ class OMetaServiceTest(TestCase):
sourceConfig=SourceConfig(
config=TestSuitePipeline(
type="TestSuite",
entityFullyQualifiedName="service.database.schema.table",
entityFullyQualifiedName=self.service.name.__root__,
)
),
openMetadataServerConnection=self.server_config,
@ -330,9 +330,9 @@ class OMetaServiceTest(TestCase):
startDate="2022-06-10T15:06:47+00:00",
),
service=EntityReference(
id=self.service.id,
type="databaseService",
name=self.service.name.__root__,
id=uuid.uuid4(),
type="testSuite",
name="test_test_suite_workflow",
),
)
@ -340,3 +340,4 @@ class OMetaServiceTest(TestCase):
config = json.loads(workflow_config.json(encoder=show_secrets_encoder))
parse_workflow_config_gracefully(config)
assert workflow_config.source.type == "mysql"