DQ base workflow (#13454)

* DQ BaseWorkflow

* Test suite runner

* test Suite workflow

* Refactor DQ for BaseWorkflow

* Lint

* Fix source

* Fix source

* Fix source

* Fix source

* Fix test

* Fix test

* Fix test
This commit is contained in:
Pere Miquel Brull 2023-10-06 18:29:18 +02:00 committed by GitHub
parent c0ababd8ad
commit aed9e3875f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 960 additions and 813 deletions

View File

@ -15,13 +15,13 @@ import os
import yaml
from metadata.data_quality.api.workflow import TestSuiteWorkflow
from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import (
PipelineType,
)
from metadata.generated.schema.metadataIngestion.workflow import LogLevels
from metadata.utils.logger import set_loggers_level
from metadata.workflow.data_insight import DataInsightWorkflow
from metadata.workflow.data_quality import TestSuiteWorkflow
from metadata.workflow.metadata import MetadataWorkflow
from metadata.workflow.profiler import ProfilerWorkflow
from metadata.workflow.usage import UsageWorkflow

View File

@ -17,12 +17,12 @@ import sys
import traceback
from metadata.config.common import load_config_file
from metadata.data_quality.api.workflow import TestSuiteWorkflow
from metadata.utils.logger import cli_logger
from metadata.workflow.data_quality import TestSuiteWorkflow
from metadata.workflow.workflow_output_handler import (
WorkflowType,
print_init_error,
print_test_suite_status,
print_status,
)
logger = cli_logger()
@ -48,5 +48,5 @@ def run_test(config_path: str) -> None:
workflow.execute()
workflow.stop()
print_test_suite_status(workflow)
print_status(workflow)
workflow.raise_from_status()

View File

@ -18,8 +18,13 @@ multiple test cases per workflow.
from typing import List, Optional
from pydantic import BaseModel, Field
from metadata.config.common import ConfigModel
from metadata.generated.schema.tests.testCase import TestCaseParameterValue
from metadata.generated.schema.api.tests.createTestSuite import CreateTestSuiteRequest
from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.tests.basic import TestCaseResult
from metadata.generated.schema.tests.testCase import TestCase, TestCaseParameterValue
class TestCaseDefinition(ConfigModel):
@ -38,3 +43,27 @@ class TestSuiteProcessorConfig(ConfigModel):
testCases: Optional[List[TestCaseDefinition]] = None
forceUpdate: Optional[bool] = False
class TestCaseResultResponse(BaseModel):
testCaseResult: TestCaseResult
testCase: TestCase
class TableAndTests(BaseModel):
"""Source response bringing together the table and test cases"""
table: Table = Field(None, description="Table being processed by the DQ workflow")
service_type: str = Field(..., description="Service type the table belongs to")
test_cases: Optional[List[TestCase]] = Field(
None, description="Test Cases already existing in the Test Suite, if any"
)
executable_test_suite: Optional[CreateTestSuiteRequest] = Field(
None, description="If no executable test suite is found, we'll create one"
)
class TestCaseResults(BaseModel):
"""Processor response with a list of computed Test Case Results"""
test_results: Optional[List[TestCaseResultResponse]]

View File

@ -1,511 +0,0 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Workflow definition for the test suite
"""
from __future__ import annotations
import traceback
from copy import deepcopy
from logging import Logger
from typing import List, Optional, cast
from pydantic import BaseModel, ValidationError
from metadata.config.common import WorkflowExecutionError
from metadata.data_quality.api.models import (
TestCaseDefinition,
TestSuiteProcessorConfig,
)
from metadata.data_quality.source.test_suite_source_factory import (
test_suite_source_factory,
)
from metadata.generated.schema.api.tests.createTestCase import CreateTestCaseRequest
from metadata.generated.schema.api.tests.createTestSuite import CreateTestSuiteRequest
from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
from metadata.generated.schema.entity.services.connections.serviceConnection import (
ServiceConnection,
)
from metadata.generated.schema.entity.services.databaseService import DatabaseService
from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import (
PipelineState,
)
from metadata.generated.schema.metadataIngestion.testSuitePipeline import (
TestSuitePipeline,
)
from metadata.generated.schema.metadataIngestion.workflow import (
OpenMetadataWorkflowConfig,
)
from metadata.generated.schema.tests.testCase import TestCase
from metadata.generated.schema.tests.testDefinition import TestDefinition, TestPlatform
from metadata.generated.schema.tests.testSuite import TestSuite
from metadata.generated.schema.type.basic import EntityLink, FullyQualifiedEntityName
from metadata.ingestion.api.models import StackTraceError
from metadata.ingestion.api.parser import parse_workflow_config_gracefully
from metadata.ingestion.api.processor import ProcessorStatus
from metadata.ingestion.ometa.client_utils import create_ometa_client
from metadata.utils import entity_link
from metadata.utils.fqn import split
from metadata.utils.importer import get_sink
from metadata.utils.logger import test_suite_logger
from metadata.workflow.workflow_output_handler import print_test_suite_status
from metadata.workflow.workflow_status_mixin import WorkflowStatusMixin
logger: Logger = test_suite_logger()
class TestCaseToCreate(BaseModel):
"""Test case to create"""
test_suite_name: str
test_case_name: str
entity_link: str
def __hash__(self):
"""make this base model hashable on unique_name"""
return hash(f"{self.test_suite_name}.{self.test_case_name}")
def __str__(self) -> str:
"""make this base model printable"""
return f"{self.test_suite_name}.{self.test_case_name}"
class TestSuiteWorkflow(WorkflowStatusMixin):
"""workflow to run the test suite"""
def __init__(self, config: OpenMetadataWorkflowConfig):
"""
Instantiate test suite workflow class
Args:
config: OM workflow configuration object
Attributes:
config: OM workflow configuration object
"""
self.config = config
self.metadata_config: OpenMetadataConnection = (
self.config.workflowConfig.openMetadataServerConfig
)
self.metadata = create_ometa_client(self.metadata_config)
self.source_config: TestSuitePipeline = self.config.source.sourceConfig.config
self.service: DatabaseService = self._retrieve_service()
self._retrieve_service_connection()
self.processor_config: TestSuiteProcessorConfig = (
TestSuiteProcessorConfig.parse_obj(
self.config.processor.dict().get("config")
)
)
self.set_ingestion_pipeline_status(state=PipelineState.running)
self.status = ProcessorStatus()
self.table_entity: Optional[Table] = self._get_table_entity(
self.source_config.entityFullyQualifiedName.__root__
)
if self.config.sink:
self.sink = get_sink(
sink_type=self.config.sink.type,
sink_config=self.config.sink,
metadata_config=self.metadata_config,
from_="data_quality",
)
@classmethod
def create(cls, config_dict) -> TestSuiteWorkflow:
"""
Instantiate a TestSuiteWorkflow object form a yaml or json config file
Args:
config_dict: json or yaml configuration file
Returns:
a test suite workflow
"""
try:
config = parse_workflow_config_gracefully(config_dict)
return cls(config)
except ValidationError as err:
logger.error(
f"Error trying to parse the Profiler Workflow configuration: {err}"
)
raise err
def _retrieve_service(self) -> DatabaseService:
"""Get service object from source config `entityFullyQualifiedName`"""
fully_qualified_name = self.source_config.entityFullyQualifiedName.__root__
try:
service_name = split(fully_qualified_name)[0]
except IndexError as exc:
logger.debug(traceback.format_exc())
raise IndexError(
f"Could not retrieve service name from entity fully qualified name {fully_qualified_name}: {exc}"
)
try:
service = self.metadata.get_by_name(DatabaseService, service_name)
if not service:
raise ConnectionError(
f"Could not retrieve service with name `{service_name}`. "
"Typically caused by the `entityFullyQualifiedName` does not exists in OpenMetadata "
"or the JWT Token is invalid."
)
except ConnectionError as exc:
raise exc
except Exception as exc:
logger.debug(traceback.format_exc())
logger.error(
f"Error getting service connection for service name [{service_name}]"
f" using the secrets manager provider [{self.metadata.config.secretsManagerProvider}]: {exc}"
)
return service
def _get_table_entity(self, entity_fqn: str) -> Optional[Table]:
"""given an entity fqn return the table entity
Args:
entity_fqn: entity fqn for the test case
"""
return self.metadata.get_by_name(
entity=Table,
fqn=entity_fqn,
fields=["tableProfilerConfig", "testSuite"],
)
def create_or_return_test_suite_entity(self) -> Optional[TestSuite]:
"""
try to get test suite name from source.servicName.
In the UI workflow we'll write the entity name (i.e. the test suite)
to source.serviceName.
"""
self.table_entity = cast(Table, self.table_entity) # satisfy type checker
test_suite = self.table_entity.testSuite
if test_suite and not test_suite.executable:
logger.debug(
f"Test suite {test_suite.fullyQualifiedName.__root__} is not executable."
)
return None
if self.processor_config.testCases and not test_suite:
# This should cover scenarios where we are running the tests from the CLI workflow
# and no corresponding tests suite exist in the platform. We, therefore, will need
# to create the test suite first.
logger.debug(
"Test suite name not found in the platform. Creating the test suite from processor config."
)
test_suite = self.metadata.create_or_update_executable_test_suite(
CreateTestSuiteRequest(
name=f"{self.source_config.entityFullyQualifiedName.__root__}.TestSuite",
displayName=f"{self.source_config.entityFullyQualifiedName.__root__} Test Suite",
description="Test Suite created from YAML processor config file",
owner=None,
executableEntityReference=self.source_config.entityFullyQualifiedName.__root__,
)
)
return test_suite
def get_test_cases_from_test_suite(
self, test_suite: TestSuite
) -> Optional[List[TestCase]]:
"""
Get test cases from test suite name
Args:
test_suite_name: the name of the test suite
"""
test_cases = self.metadata.list_entities(
entity=TestCase,
fields=["testSuite", "entityLink", "testDefinition"],
params={"testSuiteId": test_suite.id.__root__},
).entities
test_cases = cast(List[TestCase], test_cases) # satisfy type checker
if self.processor_config.testCases is not None:
cli_test_cases = self.get_test_case_from_cli_config() # type: ignore
cli_test_cases = cast(
List[TestCaseDefinition], cli_test_cases
) # satisfy type checker
test_cases = self.compare_and_create_test_cases(
cli_test_cases, test_cases, test_suite
)
return test_cases
def filter_for_om_test_cases(self, test_cases: List[TestCase]) -> List[TestCase]:
"""
Filter test cases for OM test cases only. This will prevent us from running non OM test cases
Args:
test_cases: list of test cases
"""
om_test_cases: List[TestCase] = []
for test_case in test_cases:
test_definition: TestDefinition = self.metadata.get_by_id(
TestDefinition, test_case.testDefinition.id
)
if TestPlatform.OpenMetadata not in test_definition.testPlatforms:
logger.debug(
f"Test case {test_case.name.__root__} is not an OpenMetadata test case."
)
continue
om_test_cases.append(test_case)
return om_test_cases
def get_test_case_from_cli_config(
self,
) -> Optional[List[TestCaseDefinition]]:
"""Get all the test cases names defined in the CLI config file"""
if self.processor_config.testCases is not None:
return list(self.processor_config.testCases)
return None
def _update_test_cases(
self, test_cases_to_update: List[TestCaseDefinition], test_cases: List[TestCase]
):
"""Given a list of CLI test definition patch test cases in the platform
Args:
test_cases_to_update (List[TestCaseDefinition]): list of test case definitions
"""
test_cases_to_update_names = {
test_case_to_update.name for test_case_to_update in test_cases_to_update
}
for indx, test_case in enumerate(deepcopy(test_cases)):
if test_case.name.__root__ in test_cases_to_update_names:
test_case_definition = next(
test_case_to_update
for test_case_to_update in test_cases_to_update
if test_case_to_update.name == test_case.name.__root__
)
updated_test_case = self.metadata.patch_test_case_definition(
source=test_case,
entity_link=entity_link.get_entity_link(
self.source_config.entityFullyQualifiedName.__root__,
test_case_definition.columnName,
),
test_case_parameter_values=test_case_definition.parameterValues,
)
if updated_test_case:
test_cases.pop(indx)
test_cases.append(updated_test_case)
return test_cases
def compare_and_create_test_cases(
self,
cli_test_cases_definitions: Optional[List[TestCaseDefinition]],
test_cases: List[TestCase],
test_suite: TestSuite,
) -> Optional[List[TestCase]]:
"""
compare test cases defined in CLI config workflow with test cases
defined on the server
Args:
cli_test_cases_definitions: test cases defined in CLI workflow associated with its test suite
test_cases: list of test cases entities fetch from the server using test suite names in the config file
"""
if not cli_test_cases_definitions:
return test_cases
test_cases = deepcopy(test_cases)
test_case_names = {test_case.name.__root__ for test_case in test_cases}
# we'll check the test cases defined in the CLI config file and not present in the platform
test_cases_to_create = [
cli_test_case_definition
for cli_test_case_definition in cli_test_cases_definitions
if cli_test_case_definition.name not in test_case_names
]
if self.processor_config and self.processor_config.forceUpdate:
test_cases_to_update = [
cli_test_case_definition
for cli_test_case_definition in cli_test_cases_definitions
if cli_test_case_definition.name in test_case_names
]
test_cases = self._update_test_cases(test_cases_to_update, test_cases)
if not test_cases_to_create:
return test_cases
for test_case_to_create in test_cases_to_create:
logger.debug(f"Creating test case with name {test_case_to_create.name}")
try:
test_case = self.metadata.create_or_update(
CreateTestCaseRequest(
name=test_case_to_create.name,
description=test_case_to_create.description,
displayName=test_case_to_create.displayName,
testDefinition=FullyQualifiedEntityName(
__root__=test_case_to_create.testDefinitionName
),
entityLink=EntityLink(
__root__=entity_link.get_entity_link(
self.source_config.entityFullyQualifiedName.__root__,
test_case_to_create.columnName,
)
),
testSuite=test_suite.fullyQualifiedName.__root__,
parameterValues=list(test_case_to_create.parameterValues)
if test_case_to_create.parameterValues
else None,
owner=None,
)
)
test_cases.append(test_case)
except Exception as exc:
error = (
f"Couldn't create test case name {test_case_to_create.name}: {exc}"
)
logger.error(error)
logger.debug(traceback.format_exc())
self.status.failed(
StackTraceError(
name=self.source_config.entityFullyQualifiedName.__root__,
error=error,
stack_trace=traceback.format_exc(),
)
)
return test_cases
def run_test_suite(self):
"""Main logic to run the tests"""
if not self.table_entity:
logger.debug(traceback.format_exc())
raise ValueError(
f"Could not retrieve table entity for {self.source_config.entityFullyQualifiedName.__root__}. "
"Make sure the table exists in OpenMetadata and/or the JWT Token provided is valid."
)
test_suite = self.create_or_return_test_suite_entity()
if not test_suite:
logger.debug(
f"No test suite found for table {self.source_config.entityFullyQualifiedName.__root__} "
"or test suite is not executable."
)
return
test_cases = self.get_test_cases_from_test_suite(test_suite)
if not test_cases:
logger.debug(
f"No test cases found for table {self.source_config.entityFullyQualifiedName.__root__}"
f"and test suite {test_suite.fullyQualifiedName.__root__}"
)
return
openmetadata_test_cases = self.filter_for_om_test_cases(test_cases)
test_suite_runner = test_suite_source_factory.create(
self.service.serviceType.value.lower(),
self.config,
self.metadata,
self.table_entity,
).get_data_quality_runner()
for test_case in openmetadata_test_cases:
try:
test_result = test_suite_runner.run_and_handle(test_case)
if not test_result:
continue
if hasattr(self, "sink"):
self.sink.write_record(test_result)
logger.debug(f"Successfully ran test case {test_case.name.__root__}")
self.status.processed(test_case.fullyQualifiedName.__root__)
except Exception as exc:
error = f"Could not run test case {test_case.name.__root__}: {exc}"
logger.debug(traceback.format_exc())
logger.error(error)
self.status.failed(
StackTraceError(
name=test_case.name.__root__,
error=error,
stack_trace=traceback.format_exc(),
)
)
def _retrieve_service_connection(self) -> None:
"""
We override the current `serviceConnection` source config object if source workflow service already exists
in OM. When it is configured, we retrieve the service connection from the secrets' manager. Otherwise, we get it
from the service object itself through the default `SecretsManager`.
"""
if (
not self.config.source.serviceConnection
and not self.metadata.config.forceEntityOverwriting
):
self.config.source.serviceConnection = ServiceConnection(
__root__=self.service.connection
)
def execute(self):
"""Execute test suite workflow"""
try:
self.run_test_suite()
# At the end of the `execute`, update the associated Ingestion Pipeline status as success
self.set_ingestion_pipeline_status(PipelineState.success)
# Any unhandled exception breaking the workflow should update the status
except Exception as err:
logger.debug(traceback.format_exc())
self.set_ingestion_pipeline_status(PipelineState.failed)
raise err
def print_status(self) -> None:
"""
Print the workflow results with click
"""
print_test_suite_status(self)
def result_status(self) -> int:
"""
Returns 1 if status is failed, 0 otherwise.
"""
if self.status.failures or (
hasattr(self, "sink") and self.sink.get_status().failures
):
return 1
return 0
def _raise_from_status_internal(self, raise_warnings=False):
"""
Check source, processor and sink status and raise if needed
Our profiler source will never log any failure, only filters,
as we are just picking up data from OM.
"""
if self.status.failures:
raise WorkflowExecutionError("Processor reported errors", self.status)
if hasattr(self, "sink") and self.sink.get_status().failures:
raise WorkflowExecutionError("Sink reported errors", self.sink.get_status())
if raise_warnings:
if self.status.warnings:
raise WorkflowExecutionError("Processor reported warnings", self.status)
if hasattr(self, "sink") and self.sink.get_status().warnings:
raise WorkflowExecutionError(
"Sink reported warnings", self.sink.get_status()
)
def stop(self):
"""
Close all connections
"""
self.metadata.close()

View File

@ -0,0 +1,307 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
This Processor is in charge of executing the test cases
"""
import traceback
from copy import deepcopy
from typing import List, Optional, cast
from metadata.data_quality.api.models import (
TableAndTests,
TestCaseDefinition,
TestCaseResultResponse,
TestCaseResults,
TestSuiteProcessorConfig,
)
from metadata.data_quality.runner.core import DataTestsRunner
from metadata.data_quality.runner.test_suite_source_factory import (
test_suite_source_factory,
)
from metadata.generated.schema.api.tests.createTestCase import CreateTestCaseRequest
from metadata.generated.schema.metadataIngestion.workflow import (
OpenMetadataWorkflowConfig,
)
from metadata.generated.schema.tests.testCase import TestCase
from metadata.generated.schema.tests.testDefinition import TestDefinition, TestPlatform
from metadata.generated.schema.tests.testSuite import TestSuite
from metadata.generated.schema.type.basic import EntityLink, FullyQualifiedEntityName
from metadata.ingestion.api.models import Either, StackTraceError
from metadata.ingestion.api.parser import parse_workflow_config_gracefully
from metadata.ingestion.api.step import Step
from metadata.ingestion.api.steps import Processor
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.utils import entity_link, fqn
from metadata.utils.logger import test_suite_logger
logger = test_suite_logger()
class TestCaseRunner(Processor):
"""Execute the test suite tests and create test cases from the YAML config"""
def __init__(self, config: OpenMetadataWorkflowConfig, metadata: OpenMetadata):
super().__init__()
self.config = config
self.metadata = metadata
self.processor_config: TestSuiteProcessorConfig = (
TestSuiteProcessorConfig.parse_obj(
self.config.processor.dict().get("config")
)
)
def _run(self, record: TableAndTests) -> Either:
# First, create the executable test suite if it does not exist yet
# This could happen if the process is executed from YAML and not the UI
if record.executable_test_suite:
# We pass the test suite request to the sink
return Either(right=record.executable_test_suite)
# Add the test cases from the YAML file, if any
test_cases = self.get_test_cases(
test_cases=record.test_cases,
test_suite_fqn=fqn.build(
None,
TestSuite,
table_fqn=record.table.fullyQualifiedName.__root__,
),
table_fqn=record.table.fullyQualifiedName.__root__,
)
if not test_cases:
return Either(
left=StackTraceError(
name="No test Cases",
error=f"No tests cases found for table {record.table.fullyQualifiedName.__root__}",
)
)
openmetadata_test_cases = self.filter_for_om_test_cases(test_cases)
test_suite_runner = test_suite_source_factory.create(
record.service_type.lower(),
self.config,
self.metadata,
record.table,
).get_data_quality_runner()
test_results = [
test_case_result
for test_case in openmetadata_test_cases
if (test_case_result := self._run_test_case(test_case, test_suite_runner))
]
return Either(right=TestCaseResults(test_results=test_results))
def get_test_cases(
self, test_cases: Optional[List[TestCase]], test_suite_fqn: str, table_fqn: str
) -> List[TestCase]:
"""
Based on the test suite test cases that we already know, pick up
the rest from the YAML config, compare and create the new ones
"""
if self.processor_config.testCases is not None:
cli_test_cases = self.get_test_case_from_cli_config() # type: ignore
cli_test_cases = cast(
List[TestCaseDefinition], cli_test_cases
) # satisfy type checker
return self.compare_and_create_test_cases(
cli_test_cases_definitions=cli_test_cases,
test_cases=test_cases,
test_suite_fqn=test_suite_fqn,
table_fqn=table_fqn,
)
return test_cases
def get_test_case_from_cli_config(
self,
) -> Optional[List[TestCaseDefinition]]:
"""Get all the test cases names defined in the CLI config file"""
if self.processor_config.testCases is not None:
return list(self.processor_config.testCases)
return None
def compare_and_create_test_cases(
self,
cli_test_cases_definitions: Optional[List[TestCaseDefinition]],
test_cases: List[TestCase],
table_fqn: str,
test_suite_fqn: str,
) -> List[TestCase]:
"""
compare test cases defined in CLI config workflow with test cases
defined on the server
Args:
cli_test_cases_definitions: test cases defined in CLI workflow associated with its test suite
test_cases: list of test cases entities fetch from the server using test suite names in the config file
table_fqn: table being tested
test_suite_fqn: FQN of the table + .testSuite
"""
if not cli_test_cases_definitions:
return test_cases
test_cases = deepcopy(test_cases) or []
test_case_names = (
{test_case.name.__root__ for test_case in test_cases}
if test_cases
else set()
)
# we'll check the test cases defined in the CLI config file and not present in the platform
test_cases_to_create = [
cli_test_case_definition
for cli_test_case_definition in cli_test_cases_definitions
if cli_test_case_definition.name not in test_case_names
]
if self.processor_config and self.processor_config.forceUpdate:
test_cases_to_update = [
cli_test_case_definition
for cli_test_case_definition in cli_test_cases_definitions
if cli_test_case_definition.name in test_case_names
]
test_cases = self._update_test_cases(
test_cases_to_update, test_cases, table_fqn
)
if not test_cases_to_create:
return test_cases
for test_case_to_create in test_cases_to_create:
logger.debug(f"Creating test case with name {test_case_to_create.name}")
try:
test_case = self.metadata.create_or_update(
CreateTestCaseRequest(
name=test_case_to_create.name,
description=test_case_to_create.description,
displayName=test_case_to_create.displayName,
testDefinition=FullyQualifiedEntityName(
__root__=test_case_to_create.testDefinitionName
),
entityLink=EntityLink(
__root__=entity_link.get_entity_link(
table_fqn,
test_case_to_create.columnName,
)
),
testSuite=test_suite_fqn,
parameterValues=list(test_case_to_create.parameterValues)
if test_case_to_create.parameterValues
else None,
owner=None,
)
)
test_cases.append(test_case)
except Exception as exc:
error = (
f"Couldn't create test case name {test_case_to_create.name}: {exc}"
)
logger.error(error)
logger.debug(traceback.format_exc())
self.status.failed(
StackTraceError(
name=table_fqn,
error=error,
stack_trace=traceback.format_exc(),
)
)
return test_cases
def _update_test_cases(
self,
test_cases_to_update: List[TestCaseDefinition],
test_cases: List[TestCase],
table_fqn: str,
):
"""Given a list of CLI test definition patch test cases in the platform
Args:
test_cases_to_update (List[TestCaseDefinition]): list of test case definitions
"""
test_cases_to_update_names = {
test_case_to_update.name for test_case_to_update in test_cases_to_update
}
for indx, test_case in enumerate(deepcopy(test_cases)):
if test_case.name.__root__ in test_cases_to_update_names:
test_case_definition = next(
test_case_to_update
for test_case_to_update in test_cases_to_update
if test_case_to_update.name == test_case.name.__root__
)
updated_test_case = self.metadata.patch_test_case_definition(
source=test_case,
entity_link=entity_link.get_entity_link(
table_fqn,
test_case_definition.columnName,
),
test_case_parameter_values=test_case_definition.parameterValues,
)
if updated_test_case:
test_cases.pop(indx)
test_cases.append(updated_test_case)
return test_cases
def filter_for_om_test_cases(self, test_cases: List[TestCase]) -> List[TestCase]:
"""
Filter test cases for OM test cases only. This will prevent us from running non OM test cases
Args:
test_cases: list of test cases
"""
om_test_cases: List[TestCase] = []
for test_case in test_cases:
test_definition: TestDefinition = self.metadata.get_by_id(
TestDefinition, test_case.testDefinition.id
)
if TestPlatform.OpenMetadata not in test_definition.testPlatforms:
logger.debug(
f"Test case {test_case.name.__root__} is not an OpenMetadata test case."
)
continue
om_test_cases.append(test_case)
return om_test_cases
def _run_test_case(
self, test_case: TestCase, test_suite_runner: DataTestsRunner
) -> Optional[TestCaseResultResponse]:
"""Execute the test case and return the result, if any"""
try:
test_result = test_suite_runner.run_and_handle(test_case)
self.status.scanned(test_case.fullyQualifiedName.__root__)
return test_result
except Exception as exc:
error = f"Could not run test case {test_case.name.__root__}: {exc}"
logger.debug(traceback.format_exc())
logger.error(error)
self.status.failed(
StackTraceError(
name=test_case.name.__root__,
error=error,
stack_trace=traceback.format_exc(),
)
)
return None
@classmethod
def create(cls, config_dict: dict, metadata: OpenMetadata) -> "Step":
config = parse_workflow_config_gracefully(config_dict)
return cls(config=config, metadata=metadata)
def close(self) -> None:
"""Nothing to close"""

View File

@ -36,7 +36,7 @@ from metadata.ingestion.ometa.ometa_api import OpenMetadata
NON_SQA_DATABASE_CONNECTIONS = (DatalakeConnection,)
class BaseTestSuiteSource:
class BaseTestSuiteRunner:
"""Base class for the data quality runner"""
def __init__(

View File

@ -14,8 +14,8 @@ Main class to run data tests
"""
from metadata.data_quality.api.models import TestCaseResultResponse
from metadata.data_quality.interface.test_suite_interface import TestSuiteInterface
from metadata.data_quality.runner.models import TestCaseResultResponse
from metadata.generated.schema.tests.testCase import TestCase
from metadata.utils.logger import test_suite_logger
@ -26,15 +26,15 @@ class DataTestsRunner:
"""class to execute the test validation"""
def __init__(self, test_runner_interface: TestSuiteInterface):
self.test_runner_interace = test_runner_interface
self.test_runner_interface = test_runner_interface
def run_and_handle(self, test_case: TestCase):
"""run and handle test case validation"""
logger.info(
f"Executing test case {test_case.name.__root__} "
f"for entity {self.test_runner_interace.table_entity.fullyQualifiedName.__root__}"
f"for entity {self.test_runner_interface.table_entity.fullyQualifiedName.__root__}"
)
test_result = self.test_runner_interace.run_test_case(
test_result = self.test_runner_interface.run_test_case(
test_case,
)

View File

@ -1,24 +0,0 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
test case result response object
"""
from pydantic import BaseModel
from metadata.generated.schema.tests.basic import TestCaseResult
from metadata.generated.schema.tests.testCase import TestCase
class TestCaseResultResponse(BaseModel):
testCaseResult: TestCaseResult
testCase: TestCase

View File

@ -13,20 +13,20 @@
Factory class for creating test suite source objects
"""
from metadata.data_quality.source.base_test_suite_source import BaseTestSuiteSource
from metadata.data_quality.runner.base_test_suite_source import BaseTestSuiteRunner
class TestSuiteSourceFactory:
class TestSuiteRunnerFactory:
"""Creational factory for test suite source objects"""
def __init__(self):
self._source_type = {"base": BaseTestSuiteSource}
self._source_type = {"base": BaseTestSuiteRunner}
def register_source(self, source_type: str, source_class):
"""Register a new source type"""
self._source_type[source_type] = source_class
def create(self, source_type: str, *args, **kwargs) -> BaseTestSuiteSource:
def create(self, source_type: str, *args, **kwargs) -> BaseTestSuiteRunner:
"""Create source object based on source type"""
source_class = self._source_type.get(source_type)
if not source_class:
@ -35,4 +35,4 @@ class TestSuiteSourceFactory:
return source_class(*args, **kwargs)
test_suite_source_factory = TestSuiteSourceFactory()
test_suite_source_factory = TestSuiteRunnerFactory()

View File

@ -1,90 +0,0 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
OpenMetadata REST Sink implementation for the ORM Profiler results
"""
import traceback
from typing import Optional
from metadata.config.common import ConfigModel
from metadata.data_quality.runner.models import TestCaseResultResponse
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
from metadata.ingestion.api.common import Entity
from metadata.ingestion.api.models import StackTraceError
from metadata.ingestion.api.sink import Sink
from metadata.ingestion.ometa.client import APIError
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.utils.logger import test_suite_logger
logger = test_suite_logger()
class MetadataRestSinkConfig(ConfigModel):
api_endpoint: Optional[str] = None
class MetadataRestSink(Sink[Entity]):
"""
Metadata Sink sending the test suite
to the OM API
"""
config: MetadataRestSinkConfig
def __init__(
self,
config: MetadataRestSinkConfig,
metadata_config: OpenMetadataConnection,
):
super().__init__()
self.config = config
self.metadata_config = metadata_config
self.wrote_something = False
self.metadata = OpenMetadata(self.metadata_config)
@classmethod
def create(cls, config_dict: dict, metadata_config: OpenMetadataConnection):
config = MetadataRestSinkConfig.parse_obj(config_dict)
return cls(config, metadata_config)
def close(self) -> None:
self.metadata.close()
def write_record(self, record: TestCaseResultResponse) -> None:
try:
self.metadata.add_test_case_results(
test_results=record.testCaseResult,
test_case_fqn=record.testCase.fullyQualifiedName.__root__,
)
logger.info(
f"Successfully ingested test case results for test case {record.testCase.name.__root__}"
)
self.status.records_written(
f"Test Case: {record.testCase.fullyQualifiedName.__root__}"
)
except APIError as err:
name = record.testCase.fullyQualifiedName.__root__
error = f"Failed to sink test case results for {name}: {err}"
logger.error(error)
logger.debug(traceback.format_exc())
self.status.failed(
StackTraceError(
name=name,
error=error,
stack_trace=traceback.format_exc(),
)
)

View File

@ -0,0 +1,218 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Test Suite Workflow Source
The main goal is to get the configured table from the API.
"""
import traceback
from typing import Iterable, List, Optional, cast
from metadata.data_quality.api.models import TableAndTests
from metadata.generated.schema.api.tests.createTestSuite import CreateTestSuiteRequest
from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.entity.services.connections.serviceConnection import (
ServiceConnection,
)
from metadata.generated.schema.entity.services.databaseService import DatabaseService
from metadata.generated.schema.metadataIngestion.testSuitePipeline import (
TestSuitePipeline,
)
from metadata.generated.schema.metadataIngestion.workflow import (
OpenMetadataWorkflowConfig,
)
from metadata.generated.schema.tests.testCase import TestCase
from metadata.generated.schema.tests.testSuite import TestSuite
from metadata.ingestion.api.models import Either, StackTraceError
from metadata.ingestion.api.parser import parse_workflow_config_gracefully
from metadata.ingestion.api.step import Step
from metadata.ingestion.api.steps import Source
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.utils import fqn
from metadata.utils.fqn import split
from metadata.utils.logger import test_suite_logger
logger = test_suite_logger()
class TestSuiteSource(Source):
"""
Gets the ingredients required to run the tests
"""
def __init__(
self,
config: OpenMetadataWorkflowConfig,
metadata: OpenMetadata,
):
super().__init__()
self.config = config
self.metadata = metadata
self.source_config: TestSuitePipeline = self.config.source.sourceConfig.config
self.service: DatabaseService = self._retrieve_service()
self._retrieve_service_connection()
self.test_connection()
def _retrieve_service(self) -> DatabaseService:
"""Get service object from source config `entityFullyQualifiedName`"""
fully_qualified_name = self.source_config.entityFullyQualifiedName.__root__
try:
service_name = split(fully_qualified_name)[0]
except IndexError as exc:
logger.debug(traceback.format_exc())
raise IndexError(
f"Could not retrieve service name from entity fully qualified name {fully_qualified_name}: {exc}"
)
try:
service = self.metadata.get_by_name(DatabaseService, service_name)
if not service:
raise ConnectionError(
f"Could not retrieve service with name `{service_name}`. "
"Typically caused by the `entityFullyQualifiedName` does not exists in OpenMetadata "
"or the JWT Token is invalid."
)
return service
except ConnectionError as exc:
raise exc
except Exception as exc:
logger.debug(traceback.format_exc())
logger.error(
f"Error getting service connection for service name [{service_name}]"
f" using the secrets manager provider [{self.metadata.config.secretsManagerProvider}]: {exc}"
)
raise exc
def _retrieve_service_connection(self) -> None:
"""
We override the current `serviceConnection` source config object if source workflow service already exists
in OM. When it is configured, we retrieve the service connection from the secrets' manager. Otherwise, we get it
from the service object itself through the default `SecretsManager`.
"""
if (
not self.config.source.serviceConnection
and not self.metadata.config.forceEntityOverwriting
):
self.config.source.serviceConnection = ServiceConnection(
__root__=self.service.connection
)
def _get_table_entity(self) -> Optional[Table]:
"""given an entity fqn return the table entity
Args:
entity_fqn: entity fqn for the test case
"""
table: Table = self.metadata.get_by_name(
entity=Table,
fqn=self.source_config.entityFullyQualifiedName.__root__,
fields=["tableProfilerConfig", "testSuite"],
)
return table
def _get_test_cases_from_test_suite(
self, test_suite: Optional[TestSuite]
) -> Optional[List[TestCase]]:
"""Return test cases if the test suite exists and has them"""
if test_suite:
test_cases = self.metadata.list_entities(
entity=TestCase,
fields=["testSuite", "entityLink", "testDefinition"],
params={"testSuiteId": test_suite.id.__root__},
).entities
test_cases = cast(List[TestCase], test_cases) # satisfy type checker
return test_cases
return None
def prepare(self):
"""Nothing to prepare"""
def test_connection(self) -> None:
self.metadata.health_check()
def _iter(self) -> Iterable[Either[TableAndTests]]:
table: Table = self._get_table_entity()
if table:
yield from self._process_table_suite(table)
else:
yield Either(
left=StackTraceError(
name="Missing Table",
error=f"Could not retrieve table entity for {self.source_config.entityFullyQualifiedName.__root__}."
" Make sure the table exists in OpenMetadata and/or the JWT Token provided is valid.",
)
)
def _process_table_suite(self, table: Table) -> Iterable[Either[TableAndTests]]:
"""
Check that the table has the proper test suite built in
"""
# If there is no executable test suite yet for the table, we'll need to create one
executable_test_suite = None
if not table.testSuite:
executable_test_suite = CreateTestSuiteRequest(
name=fqn.build(
None,
TestSuite,
table_fqn=self.source_config.entityFullyQualifiedName.__root__,
),
displayName=f"{self.source_config.entityFullyQualifiedName.__root__} Test Suite",
description="Test Suite created from YAML processor config file",
owner=None,
executableEntityReference=self.source_config.entityFullyQualifiedName.__root__,
)
yield Either(
right=TableAndTests(
executable_test_suite=executable_test_suite,
service_type=self.service.serviceType.value,
)
)
if table.testSuite and not table.testSuite.executable:
yield Either(
left=StackTraceError(
name="Non-executable Test Suite",
error=f"The table {self.source_config.entityFullyQualifiedName.__root__} "
"has a test suite that is not executable.",
)
)
else:
test_suite_cases = self._get_test_cases_from_test_suite(table.testSuite)
yield Either(
right=TableAndTests(
table=table,
test_cases=test_suite_cases,
service_type=self.service.serviceType.value,
)
)
@classmethod
def create(cls, config_dict: dict, metadata: OpenMetadata) -> "Step":
config = parse_workflow_config_gracefully(config_dict)
return cls(config=config, metadata=metadata)
def close(self) -> None:
"""Nothing to close"""

View File

@ -37,9 +37,13 @@ class Status(BaseModel):
def scanned(self, record: Any) -> None:
"""
Clean up the status results we want to show
Clean up the status results we want to show.
We allow to not consider specific records that
are not worth keeping record of.
"""
self.records.append(get_log_name(record))
if log_name := get_log_name(record):
self.records.append(log_name)
def warning(self, key: str, reason: str) -> None:
self.warnings.append({key: reason})

View File

@ -14,7 +14,6 @@ Abstract definition of each step
from abc import ABC, abstractmethod
from typing import Any
from metadata.ingestion.api.models import Either, Entity
from metadata.ingestion.api.step import BulkStep, IterStep, ReturnStep, StageStep
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.utils.logger import ingestion_logger
@ -51,23 +50,10 @@ class Source(IterStep, ABC):
class Sink(ReturnStep, ABC):
"""All Sinks must inherit this base class."""
@abstractmethod
def _run(self, record: Entity) -> Either:
"""
Send the data somewhere, e.g., the OM API
"""
class Processor(ReturnStep, ABC):
"""All Processor must inherit this base class"""
@abstractmethod
def _run(self, record: Entity) -> Either:
"""
Post process a given entity and return it
or a new one
"""
class Stage(StageStep, ABC):
"""All Stages must inherit this base class."""

View File

@ -9,9 +9,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Pydantic classes overwritten defaults ones of code generation
"""
Pydantic classes overwritten defaults ones of code generation.
This classes are used in the generated module, which should have NO
dependencies against any other metadata package. This class should
be self-sufficient with only pydantic at import time.
"""
import logging
import warnings
from typing import Any, Dict
@ -19,9 +23,7 @@ from pydantic.types import OptionalInt, SecretStr
from pydantic.utils import update_not_none
from pydantic.validators import constr_length_validator, str_validator
from metadata.utils.logger import ingestion_logger
logger = ingestion_logger()
logger = logging.getLogger("metadata")
class CustomSecretStr(SecretStr):
@ -101,6 +103,6 @@ class CustomSecretStr(SecretStr):
)
except Exception as exc:
logger.error(
f"Secret value [{secret_id}] not present in the configured secrets manages: {exc}"
f"Secret value [{secret_id}] not present in the configured secrets manager: {exc}"
)
return self._secret_value

View File

@ -22,6 +22,7 @@ from requests.exceptions import HTTPError
from metadata.config.common import ConfigModel
from metadata.data_insight.source.metadata import DataInsightRecord
from metadata.data_quality.api.models import TestCaseResultResponse, TestCaseResults
from metadata.generated.schema.analytics.reportData import ReportData
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.api.teams.createRole import CreateRoleRequest
@ -30,6 +31,7 @@ from metadata.generated.schema.api.teams.createUser import CreateUserRequest
from metadata.generated.schema.api.tests.createLogicalTestCases import (
CreateLogicalTestCases,
)
from metadata.generated.schema.api.tests.createTestSuite import CreateTestSuiteRequest
from metadata.generated.schema.dataInsight.kpi.basic import KpiResult
from metadata.generated.schema.entity.classification.tag import Tag
from metadata.generated.schema.entity.data.dashboard import Dashboard
@ -369,6 +371,18 @@ class MetadataRestSink(Sink):
)
return Either(right=record.test_case_results)
@_run_dispatch.register
def write_test_case_results(self, record: TestCaseResultResponse):
"""Write the test case result"""
res = self.metadata.add_test_case_results(
test_results=record.testCaseResult,
test_case_fqn=record.testCase.fullyQualifiedName.__root__,
)
logger.debug(
f"Successfully ingested test case results for test case {record.testCase.name.__root__}"
)
return Either(right=res)
@_run_dispatch.register
def write_data_insight_sample(
self, record: OMetaDataInsightSample
@ -379,7 +393,7 @@ class MetadataRestSink(Sink):
self.metadata.add_data_insight_report_data(
record.record,
)
return Either(left=None, right=record.record)
return Either(right=record.record)
@_run_dispatch.register
def write_data_insight(self, record: DataInsightRecord) -> Either[ReportData]:
@ -492,6 +506,29 @@ class MetadataRestSink(Sink):
return Either(right=table)
@_run_dispatch.register
def write_executable_test_suite(
self, record: CreateTestSuiteRequest
) -> Either[TestSuite]:
"""
From the test suite workflow we might need to create executable test suites
"""
test_suite = self.metadata.create_or_update_executable_test_suite(record)
return Either(right=test_suite)
@_run_dispatch.register
def write_test_case_result_list(self, record: TestCaseResults):
"""Record the list of test case result responses"""
for result in record.test_results or []:
self.metadata.add_test_case_results(
test_results=result.testCaseResult,
test_case_fqn=result.testCase.fullyQualifiedName.__root__,
)
self.status.scanned(result)
return Either(right=record)
def close(self):
"""
We don't have anything to close since we are using the given metadata client

View File

@ -41,6 +41,7 @@ from metadata.generated.schema.entity.data.topic import Topic
from metadata.generated.schema.entity.teams.team import Team
from metadata.generated.schema.entity.teams.user import User
from metadata.generated.schema.tests.testCase import TestCase
from metadata.generated.schema.tests.testSuite import TestSuite
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.utils.dispatch import class_register
from metadata.utils.elasticsearch import get_entity_from_es_result
@ -81,12 +82,15 @@ def split(s: str) -> List[str]:
return splitter.split()
def _build(*args) -> str:
def _build(*args, quote: bool = True) -> str:
"""
Equivalent of Java's FullyQualifiedName#build
"""
quoted = [quote_name(name) for name in args]
return FQN_SEPARATOR.join(quoted)
if quote:
quoted = [quote_name(name) for name in args]
return FQN_SEPARATOR.join(quoted)
return FQN_SEPARATOR.join(args)
def unquote_name(name: str) -> str:
@ -255,6 +259,16 @@ def _(
return _build(service_name, mlmodel_name)
@fqn_build_registry.add(TestSuite)
def _(_: Optional[OpenMetadata], *, table_fqn: str) -> str:
"""
We don't need to quote since this comes from a table FQN.
We're replicating the backend logic of the FQN generation in the TestSuiteRepository
for executable test suites.
"""
return _build(table_fqn, "testSuite", quote=False)
@fqn_build_registry.add(Topic)
def _(
_: Optional[OpenMetadata], # ES Index not necessary for Topic FQN building

View File

@ -18,6 +18,11 @@ from functools import singledispatch
from types import DynamicClassAttribute
from typing import Optional, Union
from metadata.data_quality.api.models import (
TableAndTests,
TestCaseResultResponse,
TestCaseResults,
)
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.ingestion.api.models import Entity
from metadata.ingestion.models.delete_entity import DeleteEntity
@ -171,7 +176,7 @@ def log_ansi_encoded_string(
@singledispatch
def get_log_name(record: Entity) -> str:
def get_log_name(record: Entity) -> Optional[str]:
try:
return f"{type(record).__name__} [{record.name.__root__}]"
except Exception:
@ -225,3 +230,22 @@ def _(record: OMetaLifeCycleData) -> str:
Capture the lifecycle changes of an Entity
"""
return f"{type(record.entity).__name__} Lifecycle [{record.entity.name.__root__}]"
@get_log_name.register
def _(record: TableAndTests) -> str:
if record.table:
return f"Tests for [{record.table.fullyQualifiedName.__root__}]"
return f"Test Suite [{record.executable_test_suite.name.__root__}]"
@get_log_name.register
def _(_: TestCaseResults) -> Optional[str]:
"""We don't want to log this in the status"""
return None
@get_log_name.register
def _(record: TestCaseResultResponse) -> str:
return record.testCase.fullyQualifiedName.__root__

View File

@ -0,0 +1,50 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Workflow definition for the Data Quality
"""
from metadata.data_quality.processor.test_case_runner import TestCaseRunner
from metadata.data_quality.source.test_suite import TestSuiteSource
from metadata.ingestion.api.steps import Processor, Sink
from metadata.utils.importer import import_sink_class
from metadata.utils.logger import test_suite_logger
from metadata.workflow.base import BaseWorkflow
logger = test_suite_logger()
class TestSuiteWorkflow(BaseWorkflow):
"""
DAta Quality ingestion workflow implementation
We check the source connection test when initializing
this workflow. No need to do anything here if this does not pass
"""
def set_steps(self):
self.source = TestSuiteSource.create(self.config.dict(), self.metadata)
test_runner_processor = self._get_test_runner_processor()
sink = self._get_sink()
self.steps = (test_runner_processor, sink)
def _get_sink(self) -> Sink:
sink_type = self.config.sink.type
sink_class = import_sink_class(sink_type=sink_type)
sink_config = self.config.sink.dict().get("config", {})
sink: Sink = sink_class.create(sink_config, self.metadata)
logger.debug(f"Sink type:{self.config.sink.type}, {sink_class} configured")
return sink
def _get_test_runner_processor(self) -> Processor:
return TestCaseRunner.create(self.config.dict(), self.metadata)

View File

@ -20,7 +20,6 @@ from datetime import datetime, timedelta
import sqlalchemy as sqa
from sqlalchemy.orm import Session, declarative_base
from metadata.data_quality.api.workflow import TestSuiteWorkflow
from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequest
from metadata.generated.schema.api.data.createDatabaseSchema import (
CreateDatabaseSchemaRequest,
@ -46,11 +45,12 @@ from metadata.generated.schema.entity.services.databaseService import (
)
from metadata.generated.schema.tests.testCase import TestCase
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.workflow.data_quality import TestSuiteWorkflow
test_suite_config = {
"source": {
"type": "custom-database",
"serviceName": "MyRabdomWorkflow",
"serviceName": "test_suite_service_test",
"sourceConfig": {
"config": {
"type": "TestSuite",
@ -146,7 +146,7 @@ class TestE2EWorkflow(unittest.TestCase):
)
)
table = cls.metadata.create_or_update(
cls.metadata.create_or_update(
CreateTableRequest(
name="users",
columns=[
@ -214,6 +214,7 @@ class TestE2EWorkflow(unittest.TestCase):
"""test cli workflow e2e"""
workflow = TestSuiteWorkflow.create(test_suite_config)
workflow.execute()
workflow.raise_from_status()
test_case_1 = self.metadata.get_by_name(
entity=TestCase,

View File

@ -14,44 +14,67 @@ Validate workflow configs and filters
"""
import unittest
from collections.abc import MutableSequence
from copy import deepcopy
import uuid
from typing import List
from metadata.data_quality.api.workflow import TestSuiteWorkflow
from metadata.generated.schema.entity.data.table import Table
from metadata.data_quality.api.models import TableAndTests
from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequest
from metadata.generated.schema.api.data.createDatabaseSchema import (
CreateDatabaseSchemaRequest,
)
from metadata.generated.schema.api.data.createTable import CreateTableRequest
from metadata.generated.schema.api.services.createDatabaseService import (
CreateDatabaseServiceRequest,
)
from metadata.generated.schema.api.tests.createTestCase import CreateTestCaseRequest
from metadata.generated.schema.api.tests.createTestSuite import CreateTestSuiteRequest
from metadata.generated.schema.entity.data.table import Column, DataType, Table
from metadata.generated.schema.entity.services.connections.database.common.basicAuth import (
BasicAuth,
)
from metadata.generated.schema.entity.services.connections.database.mysqlConnection import (
MysqlConnection,
)
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
from metadata.generated.schema.tests.testCase import TestCase
from metadata.generated.schema.tests.testSuite import TestSuite
from metadata.generated.schema.entity.services.databaseService import (
DatabaseConnection,
DatabaseService,
DatabaseServiceType,
)
from metadata.generated.schema.tests.testCase import TestCase, TestCaseParameterValue
from metadata.ingestion.api.models import Either
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.workflow.data_quality import TestSuiteWorkflow
sqlite_shared = "file:cachedb?mode=memory&cache=shared&check_same_thread=False"
test_suite_config = {
"source": {
"type": "custom-database",
"serviceName": "sample_data",
"sourceConfig": {
"config": {
"type": "TestSuite",
"entityFullyQualifiedName": "sample_data.ecommerce_db.shopify.dim_address",
def get_test_suite_config(service_name: str, table_name: str) -> dict:
return {
"source": {
"type": "custom-database",
"serviceName": service_name,
"sourceConfig": {
"config": {
"type": "TestSuite",
"entityFullyQualifiedName": table_name,
}
},
},
"processor": {"type": "orm-test-runner", "config": {}},
"sink": {"type": "metadata-rest", "config": {}},
"workflowConfig": {
"openMetadataServerConfig": {
"hostPort": "http://localhost:8585/api",
"authProvider": "openmetadata",
"securityConfig": {
"jwtToken": "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg"
},
}
},
},
"processor": {"type": "orm-test-runner", "config": {}},
"sink": {"type": "metadata-rest", "config": {}},
"workflowConfig": {
"openMetadataServerConfig": {
"hostPort": "http://localhost:8585/api",
"authProvider": "openmetadata",
"securityConfig": {
"jwtToken": "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg"
},
}
},
}
}
class TestSuiteWorkflowTests(unittest.TestCase):
@ -59,40 +82,161 @@ class TestSuiteWorkflowTests(unittest.TestCase):
metadata = OpenMetadata(
OpenMetadataConnection.parse_obj(
test_suite_config["workflowConfig"]["openMetadataServerConfig"]
get_test_suite_config("", "")["workflowConfig"]["openMetadataServerConfig"]
)
)
test_case_ids = []
test_suite_ids = []
def tearDown(self) -> None:
for test_case_id in self.test_case_ids:
self.metadata.delete(
entity=TestCase,
entity_id=test_case_id,
recursive=True,
hard_delete=True,
)
for test_suite_id in self.test_suite_ids:
self.metadata.delete_executable_test_suite(
entity=TestSuite,
entity_id=test_suite_id,
recursive=True,
hard_delete=True,
)
@classmethod
def setUpClass(cls) -> None:
"""Prepare ingredients"""
service = CreateDatabaseServiceRequest(
name=str(uuid.uuid4()),
serviceType=DatabaseServiceType.Mysql,
connection=DatabaseConnection(
config=MysqlConnection(
username="username",
authType=BasicAuth(
password="password",
),
hostPort="http://localhost:1234",
)
),
)
cls.service_entity: DatabaseService = cls.metadata.create_or_update(
data=service
)
self.test_case_ids = []
self.test_suite_ids = []
create_db = CreateDatabaseRequest(
name=str(uuid.uuid4()),
service=cls.service_entity.fullyQualifiedName,
)
create_db_entity = cls.metadata.create_or_update(data=create_db)
create_schema = CreateDatabaseSchemaRequest(
name=str(uuid.uuid4()),
database=create_db_entity.fullyQualifiedName,
)
cls.schema_entity = cls.metadata.create_or_update(data=create_schema)
create_table = CreateTableRequest(
name=str(uuid.uuid4()),
databaseSchema=cls.schema_entity.fullyQualifiedName,
columns=[Column(name="id", dataType=DataType.BIGINT)],
)
cls.table_with_suite: Table = cls.metadata.create_or_update(create_table)
cls.test_suite = cls.metadata.create_or_update_executable_test_suite(
data=CreateTestSuiteRequest(
name="test-suite",
executableEntityReference=cls.table_with_suite.fullyQualifiedName.__root__,
)
)
cls.metadata.create_or_update(
CreateTestCaseRequest(
name="testCaseForIntegration",
entityLink=f"<#E::table::{cls.table_with_suite.fullyQualifiedName.__root__}>",
testSuite=cls.test_suite.fullyQualifiedName,
testDefinition="tableRowCountToEqual",
parameterValues=[TestCaseParameterValue(name="value", value=10)],
)
)
create_table_2 = CreateTableRequest(
name=str(uuid.uuid4()),
databaseSchema=cls.schema_entity.fullyQualifiedName,
columns=[Column(name="id", dataType=DataType.BIGINT)],
)
cls.table: Table = cls.metadata.create_or_update(create_table_2)
@classmethod
def tearDownClass(cls) -> None:
"""Clean up"""
cls.metadata.delete(
entity=DatabaseService,
entity_id=cls.service_entity.id,
recursive=True,
hard_delete=True,
)
def test_create_workflow_object(self):
"""Test workflow object is correctly instantiated"""
TestSuiteWorkflow.create(test_suite_config)
assert True
TestSuiteWorkflow.create(
get_test_suite_config(
service_name=self.service_entity.name.__root__,
table_name=self.table_with_suite.fullyQualifiedName.__root__,
)
)
def test_create_workflow_object_from_cli_config(self):
def test_create_workflow_object_with_table_with_test_suite(self):
"""test workflow object is instantiated correctly from cli config"""
_test_suite_config = deepcopy(test_suite_config)
workflow = TestSuiteWorkflow.create(
get_test_suite_config(
service_name=self.service_entity.name.__root__,
table_name=self.table_with_suite.fullyQualifiedName.__root__,
)
)
table: Table = workflow.source._get_table_entity()
table_and_tests: TableAndTests = list(
workflow.source._process_table_suite(table=table)
)[0]
# If the table already has a test suite, we won't be generating one
self.assertIsNotNone(table.testSuite)
self.assertIsNone(table_and_tests.right.executable_test_suite)
self.assertTrue(len(table_and_tests.right.test_cases) >= 1)
# We will pick up the tests from it
self.assertTrue(
next(
(
test
for test in table_and_tests.right.test_cases
if test.name.__root__ == "testCaseForIntegration"
),
None,
)
)
def test_create_workflow_config_with_table_without_suite(self):
"""We'll prepare the test suite creation payload"""
workflow = TestSuiteWorkflow.create(
get_test_suite_config(
service_name=self.service_entity.name.__root__,
table_name=self.table.fullyQualifiedName.__root__,
)
)
# If the table does not have a test suite, we'll prepare the request to create one
table: Table = workflow.source._get_table_entity()
table_and_tests: Either[TableAndTests] = list(
workflow.source._process_table_suite(table=table)
)[0]
self.assertIsNone(table.testSuite)
self.assertEquals(
table_and_tests.right.executable_test_suite.name.__root__,
self.table.fullyQualifiedName.__root__ + ".testSuite",
)
def test_create_workflow_config_with_tests(self):
"""We'll get the tests from the workflow YAML"""
_test_suite_config = get_test_suite_config(
service_name=self.service_entity.name.__root__,
table_name=self.table_with_suite.fullyQualifiedName.__root__,
)
processor = {
"processor": {
@ -113,71 +257,42 @@ class TestSuiteWorkflowTests(unittest.TestCase):
}
_test_suite_config.update(processor)
workflow = TestSuiteWorkflow.create(_test_suite_config)
workflow_test_suite = workflow.create_or_return_test_suite_entity()
test_suite = self.metadata.get_by_name(
entity=TestSuite,
fqn="sample_data.ecommerce_db.shopify.dim_address.testSuite",
table: Table = workflow.source._get_table_entity()
table_and_tests: Either[TableAndTests] = list(
workflow.source._process_table_suite(table=table)
)[0]
test_cases: List[TestCase] = workflow.steps[0].get_test_cases(
test_cases=table_and_tests.right.test_cases,
test_suite_fqn=self.table_with_suite.fullyQualifiedName.__root__
+ ".testSuite",
table_fqn=self.table_with_suite.fullyQualifiedName.__root__,
)
assert workflow_test_suite.id == test_suite.id
self.test_suite_ids = [test_suite.id]
# 1 defined test cases + the new one in the YAML
self.assertTrue(len(table_and_tests.right.test_cases) >= 1)
def test_create_or_return_test_suite_entity(self):
"""test we can correctly retrieve a test suite"""
_test_suite_config = deepcopy(test_suite_config)
workflow = TestSuiteWorkflow.create(_test_suite_config)
test_suite = workflow.create_or_return_test_suite_entity()
expected_test_suite = self.metadata.get_by_name(
entity=TestSuite, fqn="critical_metrics_suite"
new_test_case = next(
(test for test in test_cases if test.name.__root__ == "my_test_case"), None
)
self.assertIsNotNone(new_test_case)
assert test_suite
def test_get_test_cases_from_test_suite(self):
"""test test cases are correctly returned for specific test suite"""
_test_suite_config = deepcopy(test_suite_config)
processor = {
"processor": {
"type": "orm-test-runner",
"config": {
"testCases": [
{
"name": "my_test_case",
"testDefinitionName": "tableColumnCountToBeBetween",
"parameterValues": [
{"name": "minColValue", "value": 1},
{"name": "maxColValue", "value": 5},
],
}
]
},
}
}
_test_suite_config.update(processor)
workflow = TestSuiteWorkflow.create(_test_suite_config)
test_suite = workflow.create_or_return_test_suite_entity()
test_cases = workflow.get_test_cases_from_test_suite(test_suite)
assert isinstance(test_cases, MutableSequence)
assert isinstance(test_cases[0], TestCase)
assert {"my_test_case"}.intersection(
{test_case.name.__root__ for test_case in test_cases}
# cleanup
self.metadata.delete(
entity=TestCase,
entity_id=new_test_case.id,
recursive=True,
hard_delete=True,
)
for test_case in test_cases:
self.metadata.delete(entity=TestCase, entity_id=test_case.id)
def test_get_test_case_names_from_cli_config(self):
"""test we can get all test case names from cli config"""
_test_suite_config = deepcopy(test_suite_config)
_test_suite_config = get_test_suite_config(
service_name=self.service_entity.name.__root__,
table_name=self.table_with_suite.fullyQualifiedName.__root__,
)
processor = {
"processor": {
@ -208,7 +323,7 @@ class TestSuiteWorkflowTests(unittest.TestCase):
_test_suite_config.update(processor)
workflow = TestSuiteWorkflow.create(_test_suite_config)
test_cases_def = workflow.get_test_case_from_cli_config()
test_cases_def = workflow.steps[0].get_test_case_from_cli_config()
assert [test_case_def.name for test_case_def in test_cases_def] == [
"my_test_case",
@ -217,7 +332,10 @@ class TestSuiteWorkflowTests(unittest.TestCase):
def test_compare_and_create_test_cases(self):
"""Test function creates the correct test case if they don't exists"""
_test_suite_config = deepcopy(test_suite_config)
_test_suite_config = get_test_suite_config(
service_name=self.service_entity.name.__root__,
table_name=self.table_with_suite.fullyQualifiedName.__root__,
)
processor = {
"processor": {
@ -235,7 +353,7 @@ class TestSuiteWorkflowTests(unittest.TestCase):
{
"name": "my_test_case_two",
"testDefinitionName": "columnValuesToBeBetween",
"columnName": "address_id",
"columnName": "id",
"parameterValues": [
{"name": "minValue", "value": 1},
{"name": "maxValue", "value": 5},
@ -251,62 +369,44 @@ class TestSuiteWorkflowTests(unittest.TestCase):
assert not self.metadata.get_by_name(
entity=TestCase,
fqn="sample_data.ecommerce_db.shopify.dim_address.my_test_case",
fqn=f"{self.table_with_suite.fullyQualifiedName.__root__}.my_test_case",
)
assert not self.metadata.get_by_name(
entity=TestCase,
fqn="sample_data.ecommerce_db.shopify.dim_address.address_id.my_test_case_two",
fqn=f"{self.table_with_suite.fullyQualifiedName.__root__}.my_test_case_two",
)
test_suite = workflow.create_or_return_test_suite_entity()
test_cases = self.metadata.list_entities(
entity=TestCase,
fields=["testSuite", "entityLink", "testDefinition"],
params={"testSuiteId": test_suite.id.__root__},
).entities
config_test_cases_def = workflow.get_test_case_from_cli_config()
created_test_case = workflow.compare_and_create_test_cases(
config_test_cases_def, test_cases, test_suite
table: Table = workflow.source._get_table_entity()
table_and_tests: Either[TableAndTests] = list(
workflow.source._process_table_suite(table=table)
)[0]
config_test_cases_def = workflow.steps[0].get_test_case_from_cli_config()
created_test_case = workflow.steps[0].compare_and_create_test_cases(
cli_test_cases_definitions=config_test_cases_def,
test_cases=table_and_tests.right.test_cases,
test_suite_fqn=f"{self.table_with_suite.fullyQualifiedName.__root__}.testSuite",
table_fqn=self.table_with_suite.fullyQualifiedName.__root__,
)
# clean up test
my_test_case = self.metadata.get_by_name(
entity=TestCase,
fqn="sample_data.ecommerce_db.shopify.dim_address.my_test_case",
fqn=f"{self.table_with_suite.fullyQualifiedName.__root__}.my_test_case",
fields=["testDefinition", "testSuite"],
)
my_test_case_two = self.metadata.get_by_name(
entity=TestCase,
fqn="sample_data.ecommerce_db.shopify.dim_address.address_id.my_test_case_two",
fqn=f"{self.table_with_suite.fullyQualifiedName.__root__}.id.my_test_case_two",
fields=["testDefinition", "testSuite"],
)
assert my_test_case
assert my_test_case_two
assert len(created_test_case) == 2
# We return the existing test & the 2 new ones
assert len(created_test_case) >= 3
self.metadata.delete(entity=TestCase, entity_id=my_test_case.id)
self.metadata.delete(entity=TestCase, entity_id=my_test_case_two.id)
def test_get_table_entity(self):
"""test get service connection returns correct info"""
workflow = TestSuiteWorkflow.create(test_suite_config)
service_connection = workflow._get_table_entity(
"sample_data.ecommerce_db.shopify.dim_address"
)
assert isinstance(service_connection, Table)
# def test_filter_for_om_test_cases(self):
# """test filter for OM test cases method"""
# om_test_case_1 = TestCase(
# name="om_test_case_1",
# testDefinition=self.metadata.get_entity_reference(
# TestDefinition,
# "columnValuesToMatchRegex"
# ),
# entityLink="<entityLink>",
# testSuite=self.metadata.get_entity_reference("sample_data.ecommerce_db.shopify.dim_address.TestSuite"),
# )

View File

@ -17,7 +17,6 @@ from airflow import DAG
from openmetadata_managed_apis.utils.logger import set_operator_logger
from openmetadata_managed_apis.workflows.ingestion.common import build_dag, build_source
from metadata.data_quality.api.workflow import TestSuiteWorkflow
from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import (
IngestionPipeline,
)
@ -29,6 +28,7 @@ from metadata.generated.schema.metadataIngestion.workflow import (
WorkflowConfig,
)
from metadata.ingestion.models.encoders import show_secrets_encoder
from metadata.workflow.data_quality import TestSuiteWorkflow
from metadata.workflow.workflow_output_handler import print_test_suite_status

View File

@ -33,7 +33,6 @@ from openmetadata_managed_apis.workflows.ingestion.usage import (
build_usage_workflow_config,
)
from metadata.data_quality.api.workflow import TestSuiteWorkflow
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
@ -69,6 +68,7 @@ from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.parser import parse_workflow_config_gracefully
from metadata.ingestion.models.encoders import show_secrets_encoder
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.workflow.data_quality import TestSuiteWorkflow
from metadata.workflow.metadata import MetadataWorkflow
from metadata.workflow.profiler import ProfilerWorkflow