Fix #2875 - Profiler API Sink (#3011)

Pere Miquel Brull 2022-03-02 16:46:28 +01:00 committed by GitHub
parent d959a49f8d
commit 71207de362
39 changed files with 1284 additions and 753 deletions


@ -37,13 +37,13 @@ lint: ## Run pylint on the Python sources to analyze the codebase
.PHONY: py_format
py_format: ## Run black and isort to format the Python codebase
isort $(PY_SOURCE) --skip $(PY_SOURCE)/metadata/generated --skip ingestion/build --profile black --multi-line 3
black $(PY_SOURCE) --extend-exclude $(PY_SOURCE)/metadata/generated
isort ingestion/ --skip $(PY_SOURCE)/metadata/generated --skip ingestion/build --profile black --multi-line 3
black ingestion/ --extend-exclude $(PY_SOURCE)/metadata/generated
.PHONY: py_format_check
py_format_check: ## Check if Python sources are correctly formatted
black --check --diff ingestion/ --extend-exclude $(PY_SOURCE)/metadata/generated
isort --check-only ingestion/ --skip $(PY_SOURCE)/metadata/generated --skip ingestion/build --profile black --multi-line 3
black --check --diff ingestion/ --extend-exclude $(PY_SOURCE)/metadata/generated
## Ingestion models generation
.PHONY: generate


@ -0,0 +1,100 @@
[
{
"table": "bigquery_gcp.shopify.dim_staff",
"tableTests": [
{
"description": "Rows should always be 100 because of something",
"testCase": {
"config": {
"value": 120
},
"tableTestType": "tableRowCountToEqual"
},
"executionFrequency": "Daily",
"result": {
"executionTime": 1646221199,
"testCaseStatus": "Failed",
"result": "Found 100.0 rows vs. the expected 120"
}
},
{
"description": "Rows should always be 100 because of something",
"testCase": {
"config": {
"value": 120
},
"tableTestType": "tableRowCountToEqual"
},
"executionFrequency": "Daily",
"result": {
"executionTime": 1646220190,
"testCaseStatus": "Success",
"result": "Found 120.0 rows vs. the expected 120"
}
},
{
"description": "We expect certain columns",
"testCase": {
"config": {
"value": 5
},
"tableTestType": "tableColumnCountToEqual"
},
"executionFrequency": "Daily",
"result": {
"executionTime": 1646221199,
"testCaseStatus": "Success",
"result": "Found 5.0 columns vs. the expected 5"
}
}
],
"columnTests": [
{
"columnName": "user_id",
"description": "user_id should be positive",
"testCase": {
"config": {
"minValue": 0
},
"columnTestType": "columnValuesToBeBetween"
},
"executionFrequency": "Daily",
"result": {
"executionTime": 1646221199,
"testCaseStatus": "Success",
"result": "Found min=1.0 vs. the expected min=0"
}
},
{
"columnName": "user_id",
"description": "user_id should be positive",
"testCase": {
"config": {
"minValue": 0
},
"columnTestType": "columnValuesToBeBetween"
},
"executionFrequency": "Daily",
"result": {
"executionTime": 1646220190,
"testCaseStatus": "Success",
"result": "Found min=1.0 vs. the expected min=0"
}
},
{
"columnName": "email",
"description": "emails should be unique",
"testCase": {
"config": {},
"columnTestType": "columnValuesToBeUnique"
},
"executionFrequency": "Daily",
"result": {
"executionTime": 1646221199,
"testCaseStatus": "Success",
"result": "Found uniqueCount=100.0 vs. valuesCount=100.0"
}
}
]
}
]
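Each record above maps directly onto the new test request models. A minimal sketch of that mapping (it mirrors the `ingest_table_tests` logic added later in this commit; the local `tableTests.json` path is assumed for illustration):

import json

from metadata.generated.schema.api.tests.createTableTest import CreateTableTestRequest
from metadata.generated.schema.tests.basic import TestCaseResult
from metadata.generated.schema.tests.tableTest import TableTestCase

# Load the sample definitions shown above (path assumed for illustration)
with open("tableTests.json", "r") as sample:
    table_tests = json.load(sample)

first_table = table_tests[0]
first_test = first_table["tableTests"][0]

# Build the API request the same way the SampleDataSource does
create_table_test = CreateTableTestRequest(
    description=first_test.get("description"),
    testCase=TableTestCase.parse_obj(first_test["testCase"]),
    executionFrequency=first_test["executionFrequency"],
    result=TestCaseResult.parse_obj(first_test["result"]),
)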


@ -5855,6 +5855,66 @@
"query": "select email from dim_staff",
"duration": 0.294
}
],
"tableProfile": [
{
"profileDate": "2021-10-16",
"columnCount": 5,
"rowCount": 100,
"columnProfile": [
{
"name": "user_id",
"uniqueCount": 100.0,
"uniqueProportion": 1.0,
"nullCount": 0.0,
"nullProportion": 0.0,
"sum": 5050,
"min": 1,
"max": 100,
"mean": 50.5,
"stddev": 28.86607004772212
},
{
"name": "shop_id",
"uniqueCount": 100.0,
"uniqueProportion": 1.0,
"nullCount": 0.0,
"nullProportion": 0.0,
"sum": 5050,
"min": 1,
"max": 100,
"mean": 50.5,
"stddev": 28.86607004772212
},
{
"name": "first_name",
"uniqueCount": 79.0,
"uniqueProportion": 0.79,
"nullCount": 1,
"nullProportion": 0.01,
"minLength": 5,
"maxLength": 19
},
{
"name": "last_name",
"uniqueCount": 100.0,
"uniqueProportion": 1.0,
"nullCount": 0,
"nullProportion": 0.0,
"minLength": 7,
"maxLength": 15
},
{
"name": "email",
"uniqueCount": 100.0,
"uniqueProportion": 1.0,
"nullCount": 0,
"nullProportion": 0.0,
"minLength": 12,
"maxLength": 30
}
]
}
]
},
{


@ -45,7 +45,6 @@ base_requirements = {
"Jinja2>=2.11.3",
"PyYAML",
"jsonschema",
"parsimonious==0.8.1",
"sqllineage==1.3.3",
}


@ -58,11 +58,15 @@ def metadata(debug: bool, log_level: str) -> None:
if debug:
logging.getLogger().setLevel(logging.INFO)
logging.getLogger("metadata").setLevel(logging.DEBUG)
logging.getLogger("ORM Profiler Workflow").setLevel(logging.DEBUG)
logging.getLogger("Profiler").setLevel(logging.DEBUG)
elif log_level:
logging.getLogger().setLevel(log_level)
else:
logging.getLogger().setLevel(logging.WARNING)
logging.getLogger("metadata").setLevel(logging.INFO)
logging.getLogger("ORM Profiler Workflow").setLevel(logging.INFO)
logging.getLogger("Profiler").setLevel(logging.INFO)
@metadata.command()


@ -8,29 +8,22 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Distinct Count Metric definition
This Model is required to ingest the sample data
from sample_data.py source.
The usual process to add new tests is either via the UI
or by using the ORM Profiler.
"""
from sqlalchemy import distinct, func
from typing import Optional
from metadata.orm_profiler.metrics.core import StaticMetric, _label
from pydantic import BaseModel
from metadata.generated.schema.api.tests.createColumnTest import CreateColumnTestRequest
from metadata.generated.schema.api.tests.createTableTest import CreateTableTestRequest
class DistinctCount(StaticMetric):
"""
Distinct COUNT Metric
Given a column, return the Distinct count. Ignores NULL values
"""
@classmethod
def name(cls):
return "distinctCount"
def metric_type(self):
return int
@_label
def fn(self):
return func.count(distinct(self.col))
class OMetaTableTest(BaseModel):
table_name: str
table_test: Optional[CreateTableTestRequest] = None
column_test: Optional[CreateColumnTestRequest] = None


@ -4,8 +4,10 @@ Mixin class containing Table specific methods
To be used by OpenMetadata class
"""
import logging
from typing import List
from typing import List, Union
from metadata.generated.schema.api.tests.createColumnTest import CreateColumnTestRequest
from metadata.generated.schema.api.tests.createTableTest import CreateTableTestRequest
from metadata.generated.schema.entity.data.location import Location
from metadata.generated.schema.entity.data.table import (
DataModel,
@ -15,6 +17,8 @@ from metadata.generated.schema.entity.data.table import (
TableJoins,
TableProfile,
)
from metadata.generated.schema.tests.columnTest import ColumnTest
from metadata.generated.schema.tests.tableTest import TableTest
from metadata.ingestion.models.table_queries import TableUsageRequest
from metadata.ingestion.ometa.client import REST
@ -131,3 +135,45 @@ class OMetaTableMixin:
data=table_join_request.json(),
)
logger.debug("published frequently joined with %s", resp)
def _add_tests(
self,
table: Table,
test: Union[CreateTableTestRequest, CreateColumnTestRequest],
path: str,
) -> Table:
"""
Internal function to add test data
:param table: Table instance
:param test: TableTest or ColumnTest to add
:param path: tableTest or columnTest str
:return: Updated Table instance
"""
resp = self.client.put(
f"{self.get_suffix(Table)}/{table.id.__root__}/{path}", data=test.json()
)
return Table(**resp)
def add_table_test(self, table: Table, table_test: CreateTableTestRequest) -> Table:
"""
For a given table, PUT new TableTest definitions and results
:param table: Table instance
:param table_test: table test data
:return: Updated Table instance
"""
return self._add_tests(table=table, test=table_test, path="tableTest")
def add_column_test(self, table: Table, col_test: CreateColumnTestRequest) -> Table:
"""
For a given table, PUT new ColumnTest definitions and results
:param table: Table instance
:param col_test: column test data
:return: Updated Table instance
"""
return self._add_tests(table=table, test=col_test, path="columnTest")
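Usage sketch for the two new mixin methods (assuming an already instantiated `OpenMetadata` client called `metadata` and an ingested `table` entity), mirroring the integration tests added further down in this commit:

from metadata.generated.schema.api.tests.createColumnTest import CreateColumnTestRequest
from metadata.generated.schema.api.tests.createTableTest import CreateTableTestRequest
from metadata.generated.schema.tests.column.columnValuesToBeBetween import (
    ColumnValuesToBeBetween,
)
from metadata.generated.schema.tests.columnTest import ColumnTestCase, ColumnTestType
from metadata.generated.schema.tests.table.tableRowCountToEqual import TableRowCountToEqual
from metadata.generated.schema.tests.tableTest import TableTestCase, TableTestType

# PUT a table-level test definition (and, optionally, its results)
table = metadata.add_table_test(
    table=table,
    table_test=CreateTableTestRequest(
        testCase=TableTestCase(
            config=TableRowCountToEqual(value=100),
            tableTestType=TableTestType.tableRowCountToEqual,
        ),
    ),
)

# PUT a column-level test definition; the column must exist in the table
table = metadata.add_column_test(
    table=table,
    col_test=CreateColumnTestRequest(
        columnName="id",
        testCase=ColumnTestCase(
            config=ColumnValuesToBeBetween(minValue=1, maxValue=3),
            columnTestType=ColumnTestType.columnValuesToBeBetween,
        ),
    ),
)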


@ -11,7 +11,7 @@
import logging
import traceback
from typing import TypeVar
from typing import TypeVar, Union
from pydantic import BaseModel, ValidationError
@ -29,6 +29,8 @@ from metadata.generated.schema.api.policies.createPolicy import CreatePolicyRequ
from metadata.generated.schema.api.teams.createRole import CreateRoleRequest
from metadata.generated.schema.api.teams.createTeam import CreateTeamRequest
from metadata.generated.schema.api.teams.createUser import CreateUserRequest
from metadata.generated.schema.api.tests.createColumnTest import CreateColumnTestRequest
from metadata.generated.schema.api.tests.createTableTest import CreateTableTestRequest
from metadata.generated.schema.entity.data.chart import ChartType
from metadata.generated.schema.entity.data.location import Location
from metadata.generated.schema.entity.data.pipeline import Pipeline
@ -41,6 +43,7 @@ from metadata.ingestion.api.sink import Sink, SinkStatus
from metadata.ingestion.models.ometa_policy import OMetaPolicy
from metadata.ingestion.models.ometa_table_db import OMetaDatabaseAndTable
from metadata.ingestion.models.table_metadata import Chart, Dashboard, DeleteTable
from metadata.ingestion.models.table_tests import OMetaTableTest
from metadata.ingestion.models.user import OMetaUserProfile
from metadata.ingestion.ometa.client import APIError
from metadata.ingestion.ometa.ometa_api import OpenMetadata
@ -74,6 +77,9 @@ class MetadataRestSink(Sink[Entity]):
config: MetadataRestSinkConfig
status: SinkStatus
# We want to catch any errors that might happen during the sink
# pylint: disable=broad-except
def __init__(
self,
ctx: WorkflowContext,
@ -121,6 +127,8 @@ class MetadataRestSink(Sink[Entity]):
self.write_ml_model(record)
elif isinstance(record, DeleteTable):
self.delete_table(record)
elif isinstance(record, OMetaTableTest):
self.write_table_tests(record)
else:
logging.info(
f"Ignoring the record due to unknown Record type {type(record)}"
@ -476,6 +484,33 @@ class MetadataRestSink(Sink[Entity]):
logger.debug(traceback.print_exc())
logger.error(err)
def write_table_tests(self, record: OMetaTableTest) -> None:
"""
Iterate over all table_tests and column_tests
for the given table and add them to the backend.
:param record: Sample data record
"""
try:
# Fetch the table that we have already ingested
table = self.metadata.get_by_name(entity=Table, fqdn=record.table_name)
test = None
if record.table_test:
self.metadata.add_table_test(table=table, table_test=record.table_test)
test = record.table_test.testCase.tableTestType.value
if record.column_test:
self.metadata.add_column_test(table=table, col_test=record.column_test)
test = record.column_test.testCase.columnTestType.value
logger.info(f"Table Tests: {record.table_name}.{test}")
self.status.records_written(f"Table Tests: {record.table_name}.{test}")
except Exception as err:
logger.debug(traceback.format_exc())
logger.error(err)
def get_status(self):
return self.status


@ -28,6 +28,8 @@ from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.api.teams.createRole import CreateRoleRequest
from metadata.generated.schema.api.teams.createTeam import CreateTeamRequest
from metadata.generated.schema.api.teams.createUser import CreateUserRequest
from metadata.generated.schema.api.tests.createColumnTest import CreateColumnTestRequest
from metadata.generated.schema.api.tests.createTableTest import CreateTableTestRequest
from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.data.location import Location, LocationType
from metadata.generated.schema.entity.data.pipeline import Pipeline
@ -36,12 +38,16 @@ from metadata.generated.schema.entity.services.databaseService import (
DatabaseServiceType,
)
from metadata.generated.schema.entity.teams.user import User
from metadata.generated.schema.tests.basic import TestCaseResult
from metadata.generated.schema.tests.columnTest import ColumnTestCase
from metadata.generated.schema.tests.tableTest import TableTestCase
from metadata.generated.schema.type.entityLineage import EntitiesEdge
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.common import Entity
from metadata.ingestion.api.source import Source, SourceStatus
from metadata.ingestion.models.ometa_table_db import OMetaDatabaseAndTable
from metadata.ingestion.models.table_metadata import Chart, Dashboard
from metadata.ingestion.models.table_tests import OMetaTableTest
from metadata.ingestion.models.user import OMetaUserProfile
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig
@ -168,6 +174,11 @@ class SampleTableMetadataGenerator:
class SampleDataSource(Source[Entity]):
"""
Loads JSON data and prepares the required
python objects to be sent to the Sink.
"""
def __init__(
self, config: SampleDataSourceConfig, metadata_config: MetadataServerConfig, ctx
):
@ -268,6 +279,9 @@ class SampleDataSource(Source[Entity]):
open(self.config.sample_data_folder + "/models/models.json", "r")
)
self.user_entity = {}
self.table_tests = json.load(
open(self.config.sample_data_folder + "/datasets/tableTests.json", "r")
)
@classmethod
def create(cls, config_dict, metadata_config_dict, ctx):
@ -283,6 +297,7 @@ class SampleDataSource(Source[Entity]):
yield from self.ingest_locations()
yield from self.ingest_glue()
yield from self.ingest_tables()
yield from self.ingest_table_tests()
yield from self.ingest_topics()
yield from self.ingest_charts()
yield from self.ingest_dashboards()
@ -484,6 +499,41 @@ class SampleDataSource(Source[Entity]):
except Exception as err:
logger.error(err)
def ingest_table_tests(self) -> Iterable[OMetaTableTest]:
"""
Iterate over all tables and fetch
the sample test data. For each table test and column
test, prepare all TestCaseResults
"""
try:
for test_def in self.table_tests:
table_name = test_def["table"]
for table_test in test_def["tableTests"]:
create_table_test = CreateTableTestRequest(
description=table_test.get("description"),
testCase=TableTestCase.parse_obj(table_test["testCase"]),
executionFrequency=table_test["executionFrequency"],
result=TestCaseResult.parse_obj(table_test["result"]),
)
yield OMetaTableTest(
table_name=table_name, table_test=create_table_test
)
for col_test in test_def["columnTests"]:
create_column_test = CreateColumnTestRequest(
description=col_test.get("description"),
columnName=col_test["columnName"],
testCase=ColumnTestCase.parse_obj(col_test["testCase"]),
executionFrequency=col_test["executionFrequency"],
result=TestCaseResult.parse_obj(col_test["result"]),
)
yield OMetaTableTest(
table_name=table_name, column_test=create_column_test
)
except Exception as err: # pylint: disable=broad-except
logger.error(err)
def close(self):
pass


@ -15,32 +15,25 @@ Return types for Profiler workflow execution.
We need to define this class as we end up having
multiple profilers per table and columns.
"""
from typing import List, Optional
from pydantic import BaseModel
from typing import Optional
from metadata.config.common import ConfigModel
from metadata.generated.schema.entity.data.table import Table, TableProfile
from metadata.orm_profiler.profiles.core import Profiler
from metadata.orm_profiler.profiles.models import ProfilerDef
from metadata.orm_profiler.validations.models import TestDef
from metadata.orm_profiler.validations.models import TestDef, TestSuite
class WorkflowResult(BaseModel):
class Config:
arbitrary_types_allowed = True
class ProfilerProcessorConfig(BaseModel):
class ProfilerProcessorConfig(ConfigModel):
"""
Defines how we read the processor information
from the workflow JSON definition
"""
profiler: Optional[ProfilerDef] = None
tests: Optional[TestDef] = None
test_suite: Optional[TestSuite] = None
class ProfileAndTests(BaseModel):
class ProfilerResponse(ConfigModel):
"""
ORM Profiler processor response.
@ -50,4 +43,4 @@ class ProfileAndTests(BaseModel):
table: Table
profile: TableProfile
tests: Optional[TestDef] = None
record_tests: Optional[TestDef] = None
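For reference, the processor `config` block that `ProfilerProcessorConfig` now parses could look like the following (a sketch based on the workflow configuration used in the updated tests below; the table FQDN and values are placeholders):

{
  "profiler": {
    "name": "my_profiler",
    "metrics": ["row_count", "min", "max", "COUNT", "null_count"]
  },
  "test_suite": {
    "name": "My Test Suite",
    "tests": [
      {
        "table": "test_sqlite.main.users",
        "table_tests": [
          {
            "testCase": {
              "config": {"value": 100},
              "tableTestType": "tableRowCountToEqual"
            }
          }
        ],
        "column_tests": [
          {
            "columnName": "age",
            "testCase": {
              "config": {"minValue": 0, "maxValue": 99},
              "columnTestType": "columnValuesToBeBetween"
            }
          }
        ]
      }
    ]
  }
}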


@ -42,7 +42,7 @@ from metadata.ingestion.source.sql_source_common import (
SQLConnectionConfig,
SQLSourceStatus,
)
from metadata.orm_profiler.api.models import ProfileAndTests, ProfilerProcessorConfig
from metadata.orm_profiler.api.models import ProfilerProcessorConfig, ProfilerResponse
from metadata.orm_profiler.engines import create_and_bind_session, get_engine
from metadata.orm_profiler.utils import logger
@ -191,7 +191,8 @@ class ProfilerWorkflow:
self.metadata.list_entities(
entity=Table,
fields=[
"tableProfile"
"tableProfile",
"tests",
], # We will need it for window metrics to check past data
params={
"database": f"{self.source_config.service_name}.{database.name.__root__}"
@ -210,7 +211,7 @@ class ProfilerWorkflow:
Run the profiling and tests
"""
for entity in self.list_entities():
profile_and_tests: ProfileAndTests = self.processor.process(entity)
profile_and_tests: ProfilerResponse = self.processor.process(entity)
if hasattr(self, "sink"):
self.sink.write_record(profile_and_tests)


@ -16,7 +16,7 @@ from typing import Any, Dict, Optional, Tuple
from metadata.orm_profiler.metrics.core import ComposedMetric
from metadata.orm_profiler.metrics.static.count import Count
from metadata.orm_profiler.metrics.static.distinct_count import DistinctCount
from metadata.orm_profiler.metrics.static.unique_count import UniqueCount
class DuplicateCount(ComposedMetric):
@ -31,7 +31,7 @@ class DuplicateCount(ComposedMetric):
@classmethod
def required_metrics(cls) -> Tuple[str, ...]:
return Count.name(), DistinctCount.name()
return Count.name(), UniqueCount.name()
@property
def metric_type(self):
@ -43,9 +43,9 @@ class DuplicateCount(ComposedMetric):
results of other Metrics
"""
count = res.get(Count.name())
distinct_count = res.get(DistinctCount.name())
unique_count = res.get(UniqueCount.name())
if count is not None and distinct_count is not None:
return count - distinct_count
if count is not None and unique_count is not None:
return count - unique_count
return None


@ -24,7 +24,6 @@ from metadata.orm_profiler.metrics.composed.like_ratio import LikeRatio
from metadata.orm_profiler.metrics.composed.null_ratio import NullRatio
from metadata.orm_profiler.metrics.composed.unique_ratio import UniqueRatio
from metadata.orm_profiler.metrics.static.count import Count
from metadata.orm_profiler.metrics.static.distinct_count import DistinctCount
from metadata.orm_profiler.metrics.static.histogram import Histogram
from metadata.orm_profiler.metrics.static.ilike_count import ILikeCount
from metadata.orm_profiler.metrics.static.like_count import LikeCount
@ -51,7 +50,6 @@ class Metrics(MetricRegistry):
# Static Metrics
MEAN = Mean
COUNT = Count
DISTINCT_COUNT = DistinctCount
HISTOGRAM = Histogram
ILIKE_COUNT = ILikeCount
LIKE_COUNT = LikeCount


@ -15,6 +15,7 @@ Max Metric definition
from sqlalchemy import func
from metadata.orm_profiler.metrics.core import StaticMetric, _label
from metadata.orm_profiler.orm.registry import is_quantifiable
class Max(StaticMetric):
@ -30,4 +31,6 @@ class Max(StaticMetric):
@_label
def fn(self):
if not is_quantifiable(self.col.type):
return None
return func.max(self.col)


@ -63,7 +63,7 @@ class MaxLength(StaticMetric):
if is_concatenable(self.col.type):
return MaxLengthFn(self.col)
logger.warning(
logger.debug(
f"Don't know how to process type {self.col.type} when computing MAX_LENGTH"
)
return None


@ -15,6 +15,7 @@ Min Metric definition
from sqlalchemy import func
from metadata.orm_profiler.metrics.core import StaticMetric, _label
from metadata.orm_profiler.orm.registry import is_quantifiable
class Min(StaticMetric):
@ -30,4 +31,6 @@ class Min(StaticMetric):
@_label
def fn(self):
if not is_quantifiable(self.col.type):
return None
return func.min(self.col)


@ -63,7 +63,7 @@ class MinLength(StaticMetric):
if is_concatenable(self.col.type):
return MinLengthFn(self.col)
logger.warning(
logger.debug(
f"Don't know how to process type {self.col.type} when computing MIN_LENGTH"
)
return None


@ -70,7 +70,7 @@ class StdDev(StaticMetric):
if is_quantifiable(self.col.type):
return StdDevFn(self.col)
logger.info(
logger.debug(
f"{self.col} has type {self.col.type}, which is not listed as quantifiable."
+ " We won't compute STDDEV for it."
)


@ -15,27 +15,33 @@ Defines the ORM Profiler processor
For each table, we compute its profiler
and run the validations.
"""
import logging
from dataclasses import dataclass, field
from typing import List
from datetime import datetime
from typing import List, Optional, Union
from sqlalchemy.orm import DeclarativeMeta, InstrumentedAttribute, Session
from sqlalchemy.orm import DeclarativeMeta, Session
from metadata.generated.schema.api.tests.createColumnTest import CreateColumnTestRequest
from metadata.generated.schema.api.tests.createTableTest import CreateTableTestRequest
from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.data.table import Table, TableProfile
from metadata.generated.schema.tests.basic import TestCaseResult, TestCaseStatus
from metadata.generated.schema.tests.columnTest import ColumnTest, ColumnTestCase
from metadata.generated.schema.tests.tableTest import TableTest, TableTestCase
from metadata.ingestion.api.common import WorkflowContext
from metadata.ingestion.api.processor import Processor, ProcessorStatus
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig
from metadata.orm_profiler.api.models import ProfileAndTests, ProfilerProcessorConfig
from metadata.orm_profiler.api.models import ProfilerProcessorConfig, ProfilerResponse
from metadata.orm_profiler.metrics.registry import Metrics
from metadata.orm_profiler.orm.converter import ometa_to_orm
from metadata.orm_profiler.profiles.core import Profiler
from metadata.orm_profiler.profiles.default import DefaultProfiler
from metadata.orm_profiler.utils import logger
from metadata.orm_profiler.validations.core import Validation
from metadata.orm_profiler.validations.models import TestDef
from metadata.orm_profiler.validations.core import validate
from metadata.orm_profiler.validations.models import TestDef, TestSuite
logger = logger()
logger = logging.getLogger("ORM Profiler Workflow")
@dataclass
@ -77,6 +83,8 @@ class OrmProfilerProcessor(Processor[Table]):
self.report = {"tests": {}}
self.execution_date = datetime.now()
# OpenMetadata client to fetch tables
self.metadata = OpenMetadata(self.metadata_config)
@ -119,6 +127,7 @@ class OrmProfilerProcessor(Processor[Table]):
*metrics,
session=self.session,
table=orm,
profile_date=self.execution_date,
)
def profile_entity(self, orm, table: Table) -> TableProfile:
@ -141,69 +150,259 @@ class OrmProfilerProcessor(Processor[Table]):
self.status.processed(table.fullyQualifiedName)
return profiler.get_profile()
def log_validation(self, name: str, validation: Validation) -> None:
def log_test_result(self, name: str, result: TestCaseResult) -> None:
"""
Log validation results
Log test case results
"""
self.status.tested(name)
if not validation.valid:
self.status.failure(
f"{name}: Expected value {validation.value} vs. Real value {validation.computed_metric}"
if not result.testCaseStatus == TestCaseStatus.Success:
self.status.failure(f"{name}: {result.result}")
@staticmethod
def get_test_name(table: Table, test_type: str, column_name: str = None):
"""
Build a unique identifier to log the test
in the shape of FQDN.[column].test_type
:param table: Table Entity
:param test_type: We expect one test type per table & column
:param column_name: Column name, if logging a column test
:return: Unique name for this execution
"""
col = f".{column_name}." if column_name else "."
return table.fullyQualifiedName + col + test_type
def run_table_test(
self, table: Table, test_case: TableTestCase, profiler_results: TableProfile
) -> Optional[TestCaseResult]:
"""
Run & log the table test against the TableProfile.
:param table: Table Entity being processed
:param test_case: Table Test Case to run
:param profiler_results: Table profiler with informed metrics
:return: TestCaseResult
"""
test_name = self.get_test_name(
table=table, test_type=test_case.tableTestType.value
)
if test_name in self.status.tests:
logger.info(
f"Test {test_name} has already been computed in this execution."
)
return None
test_case_result: TestCaseResult = validate(
test_case.config,
table_profile=profiler_results,
execution_date=self.execution_date,
)
self.log_test_result(name=test_name, result=test_case_result)
return test_case_result
def run_column_test(
self,
table: Table,
column: str,
test_case: ColumnTestCase,
profiler_results: TableProfile,
) -> Optional[TestCaseResult]:
"""
Run & log the column test against the ColumnProfile
:param table: Table Entity being processed
:param column: Column being tested
:param test_case: Column Test Case to run
:param profiler_results: Table profiler with informed metrics
:return: TestCaseResult
"""
test_name = self.get_test_name(
table=table, test_type=test_case.columnTestType.value, column_name=column
)
if test_name in self.status.tests:
logger.info(
f"Test {test_name} has already been computed in this execution."
)
return None
# Check if we computed a profile for the required column
col_profiler_res = next(
iter(
col_profiler
for col_profiler in profiler_results.columnProfile
if col_profiler.name == column
),
None,
)
if col_profiler_res is None:
msg = (
f"Cannot find a profiler that computed the column {column}"
+ f" Skipping validation {test_name}"
)
self.status.failure(msg)
return TestCaseResult(
executionTime=self.execution_date.timestamp(),
testCaseStatus=TestCaseStatus.Aborted,
result=msg,
)
def validate_entity(self, orm, profiler_results: TableProfile) -> TestDef:
test_case_result: TestCaseResult = validate(
test_case.config,
col_profiler_res,
execution_date=self.execution_date,
)
self.log_test_result(name=test_name, result=test_case_result)
return test_case_result
def validate_config_tests(
self, table: Table, profiler_results: TableProfile
) -> Optional[TestDef]:
"""
Given a table, check if it has any tests pending.
If so, run the Validations against the profiler_results
and return the computed Validations.
The result will have the shape {test_name: Validation}
Type of entity is DeclarativeMeta
Here we take care of new incoming tests in the workflow
definition. Run them and prepare the new TestDef
of the record, that will be sent to the sink to
update the Table Entity.
"""
if not isinstance(orm, DeclarativeMeta):
raise ValueError(f"Entity {orm} should be a DeclarativeMeta.")
logger.info(f"Checking validations for {orm}...")
logger.info(f"Checking validations for {table.fullyQualifiedName}...")
# We have all validations parsed at read-time
test_def: TestDef = self.config.tests
test_suite: TestSuite = self.config.test_suite
# Check if I have tests for the table I am processing
my_record_tests = next(
iter(
test
for test in test_suite.tests
if test.table == table.fullyQualifiedName
),
None,
)
if not my_record_tests:
return None
# Compute all validations against the profiler results
for test in test_def.table_tests:
for validation in test.expression:
# Pass the whole dict, although we just want columnCount & rowCount
validation.validate(profiler_results.dict())
self.log_validation(name=test.name, validation=validation)
for table_test in my_record_tests.table_tests:
test_case_result = self.run_table_test(
table=table,
test_case=table_test.testCase,
profiler_results=profiler_results,
)
if test_case_result:
table_test.result = test_case_result
for column_res in test_def.column_tests:
for test in column_res.columns:
col_profiler_res = next(
iter(
col_profiler
for col_profiler in profiler_results.columnProfile
if col_profiler.name == test.column
),
None,
for column_test in my_record_tests.column_tests:
test_case_result = self.run_column_test(
table=table,
column=column_test.columnName,
test_case=column_test.testCase,
profiler_results=profiler_results,
)
if test_case_result:
column_test.result = test_case_result
return my_record_tests
def validate_entity_tests(
self,
table: Table,
profiler_results: TableProfile,
config_tests: Optional[TestDef],
) -> Optional[TestDef]:
"""
This method checks the tests that are
configured at entity level, i.e., have been
stored via the API at some other point in time.
If we find a test that has already been run
from the workflow config, we will skip it
and trust the workflow input.
:param table: OpenMetadata Table Entity being processed
:param profiler_results: TableProfile with computed metrics
:param config_tests: Results of running the configuration tests
"""
# We need to keep track of all ran tests, so let's initialize
# a TestDef class with either what we have from the incoming
# config, or leaving it empty.
# During the Entity processing, we will add here
# any tests we discover from the Entity side.
record_tests = (
TestDef(
table=config_tests.table,
table_tests=config_tests.table_tests
if config_tests.table_tests
else [],
column_tests=config_tests.column_tests
if config_tests.column_tests
else [],
)
if config_tests
else TestDef(
table=table.fullyQualifiedName, table_tests=[], column_tests=[]
)
)
# Note that the tests are configured in the Entity as `TableTest` and
# `ColumnTest`. However, to PUT the results we need the API form:
# `CreateTableTestRequest` and `CreateColumnTestRequest`.
# We will convert the found tests before running them.
# Fetch all table tests, if any
table_tests = (
table_test for table_test in (table.tableTests or [])
) # tableTests are optional, so it might be a list or None
for table_test in table_tests:
test_case_result = self.run_table_test(
table=table,
test_case=table_test.testCase,
profiler_results=profiler_results,
)
if test_case_result:
create_table_test = CreateTableTestRequest(
description=table_test.description,
testCase=table_test.testCase,
executionFrequency=table_test.executionFrequency,
owner=table_test.owner,
result=test_case_result,
)
if col_profiler_res is None:
self.status.warning(
f"Cannot find a profiler that computed the column {test.column} for {column_res.table}."
+ f" Skipping validation {test}"
record_tests.table_tests.append(create_table_test)
# For all columns, check if any of them has tests and fetch them
col_tests = (
col_test
for col in table.columns
for col_test in (
col.columnTests or []
) # columnTests is optional, so it might be a list or None
if col.columnTests
)
for column_test in col_tests:
if column_test:
test_case_result = self.run_column_test(
table=table,
column=column_test.columnName,
test_case=column_test.testCase,
profiler_results=profiler_results,
)
if test_case_result:
create_column_test = CreateColumnTestRequest(
columnName=column_test.columnName,
description=column_test.description,
testCase=column_test.testCase,
executionFrequency=column_test.executionFrequency,
owner=column_test.owner,
result=test_case_result,
)
record_tests.column_tests.append(create_column_test)
for validation in test.expression:
validation.validate(col_profiler_res.dict())
self.log_validation(name=test.name, validation=validation)
return record_tests
return test_def
def process(self, record: Table) -> ProfileAndTests:
def process(self, record: Table) -> ProfilerResponse:
"""
Run the profiling and tests
"""
# Convert entity to ORM. Fetch the db by ID to make sure we use the proper db name
database = self.metadata.get_by_id(
entity=Database, entity_id=record.database.id
@ -212,14 +411,18 @@ class OrmProfilerProcessor(Processor[Table]):
entity_profile = self.profile_entity(orm_table, record)
entity_validations = None
if self.config.tests:
entity_validations = self.validate_entity(orm_table, entity_profile)
# First, check if we have any tests directly configured in the workflow
config_tests = None
if self.config.test_suite:
config_tests = self.validate_config_tests(record, entity_profile)
res = ProfileAndTests(
# Then, Check if the entity has any tests
record_tests = self.validate_entity_tests(record, entity_profile, config_tests)
res = ProfilerResponse(
table=record,
profile=entity_profile,
tests=entity_validations,
record_tests=record_tests,
)
return res


@ -187,7 +187,7 @@ class Profiler(Generic[MetricType]):
This should only execute column metrics.
"""
logger.info("Running SQL Profiler...")
logger.debug("Running SQL Profiler...")
col_metrics = self.get_col_metrics(self.static_metrics)
@ -242,7 +242,7 @@ class Profiler(Generic[MetricType]):
Data should be saved under self.results
"""
logger.info("Running post Profiler...")
logger.debug("Running post Profiler...")
current_col_results: Dict[str, Any] = self._column_results.get(col.name)
if not current_col_results:


@ -39,7 +39,6 @@ class DefaultProfiler(Profiler):
# Column Metrics
Metrics.MEAN.value,
Metrics.COUNT.value,
Metrics.DISTINCT_COUNT.value,
Metrics.MIN.value,
Metrics.MIN_LENGTH.value,
Metrics.MAX.value,


@ -20,24 +20,6 @@ from pydantic import BaseModel, validator
from metadata.orm_profiler.metrics.registry import Metrics
class TimeMetricDef(BaseModel):
"""
Model representing the input of a time metric
"""
name: str # metric name
window: int # time delta in days to apply
class CustomMetricDef(BaseModel):
"""
Model representing the input of a time metric
"""
name: str # metric name
sql: str # custom SQL query to run
class ProfilerDef(BaseModel):
"""
Incoming profiler definition from the
@ -46,8 +28,9 @@ class ProfilerDef(BaseModel):
name: str # Profiler name
metrics: List[str] # names of currently supported Static and Composed metrics
time_metrics: List[TimeMetricDef] = None
custom_metrics: List[CustomMetricDef] = None
# TBD:
# time_metrics: List[TimeMetricDef] = None
# custom_metrics: List[CustomMetricDef] = None
# rule_metrics: ...
@validator("metrics", each_item=True)


@ -18,7 +18,7 @@ from pathlib import Path
from metadata.config.common import ConfigModel
from metadata.ingestion.api.common import Entity, WorkflowContext
from metadata.ingestion.api.sink import Sink, SinkStatus
from metadata.orm_profiler.api.models import ProfileAndTests
from metadata.orm_profiler.api.models import ProfilerResponse
from metadata.orm_profiler.utils import logger
logger = logger()
@ -54,7 +54,7 @@ class FileSink(Sink[Entity]):
config = FileSinkConfig.parse_obj(config_dict)
return cls(ctx, config)
def write_record(self, record: ProfileAndTests) -> None:
def write_record(self, record: ProfilerResponse) -> None:
if self.wrote_something:
self.file.write("\n")
@ -62,25 +62,9 @@ class FileSink(Sink[Entity]):
self.file.write(f"Profile for: {record.table.fullyQualifiedName}\n")
self.file.write(f"{record.profile.json()}\n")
if record.tests:
if record.record_tests:
self.file.write(f"\nTest results:\n")
for test in record.tests.table_tests:
self.file.write(f"\tTable Tests results:\n")
for validation in test.expression:
self.file.write(
f"\t\t{test.name}: {validation.valid}, (Real) {validation.computed_metric}"
+ f" <{validation.operator.__name__}> {validation.value} (expected)\n"
)
for col_test in record.tests.column_tests:
self.file.write(f"\tColumn Tests results:\n")
for column in col_test.columns:
for validation in column.expression:
self.file.write(
f"\t\t[{column.column}] - {column.name}: {validation.valid}, (Real) {validation.computed_metric}"
+ f" <{validation.operator.__name__}> {validation.value} (expected)\n"
)
self.file.write(f"{record.record_tests.json()}\n")
self.wrote_something = True
self.report.records_written(record.table.fullyQualifiedName)


@ -12,11 +12,15 @@
"""
OpenMetadata REST Sink implementation for the ORM Profiler results
"""
import traceback
from metadata.config.common import ConfigModel
from metadata.ingestion.api.common import Entity, WorkflowContext
from metadata.ingestion.api.sink import Sink, SinkStatus
from metadata.ingestion.ometa.client import APIError
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig
from metadata.orm_profiler.api.models import ProfilerResponse
from metadata.orm_profiler.utils import logger
logger = logger()
@ -56,7 +60,31 @@ class MetadataRestSink(Sink[Entity]):
return self.status
def close(self) -> None:
pass
self.metadata.close()
def write_record(self, record: Entity) -> None:
pass
def write_record(self, record: ProfilerResponse) -> None:
try:
self.metadata.ingest_table_profile_data(
table=record.table, table_profile=[record.profile]
)
if record.record_tests:
for table_test in record.record_tests.table_tests:
self.metadata.add_table_test(
table=record.table, table_test=table_test
)
for col_test in record.record_tests.column_tests:
self.metadata.add_column_test(table=record.table, col_test=col_test)
logger.info(
f"Successfully ingested profiler & test data for {record.table.fullyQualifiedName}"
)
self.status.records_written(f"Table: {record.table.fullyQualifiedName}")
except APIError as err:
logger.error(
f"Failed to sink profiler & test data for {record.table.fullyQualifiedName} - {err}"
)
logger.debug(traceback.format_exc())
self.status.failure(f"Table: {record.table.fullyQualifiedName}")


@ -0,0 +1,66 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
ColumnValuesToBeBetween validation implementation
"""
from datetime import datetime
from metadata.generated.schema.entity.data.table import ColumnProfile
from metadata.generated.schema.tests.basic import TestCaseResult, TestCaseStatus
from metadata.generated.schema.tests.column.columnValuesToBeBetween import (
ColumnValuesToBeBetween,
)
from metadata.orm_profiler.utils import logger
logger = logger()
def column_values_to_be_between(
test_case: ColumnValuesToBeBetween,
col_profile: ColumnProfile,
execution_date: datetime,
) -> TestCaseResult:
"""
Validate Column Values metric
:param test_case: ColumnValuesToBeBetween
:param col_profile: should contain MIN & MAX metrics
:param execution_date: Datetime when the tests ran
:return: TestCaseResult with status and results
"""
if col_profile.min is None or col_profile.max is None:
msg = (
"We expect `min` & `max` to be informed on the profiler for ColumnValuesToBeBetween"
+ f" but got min={col_profile.min}, max={col_profile.max}."
)
logger.error(msg)
return TestCaseResult(
executionTime=execution_date.timestamp(),
testCaseStatus=TestCaseStatus.Aborted,
result=msg,
)
status = (
TestCaseStatus.Success
if col_profile.min >= test_case.minValue
and col_profile.max <= test_case.maxValue
else TestCaseStatus.Failed
)
result = (
f"Found min={col_profile.min}, max={col_profile.max} vs."
+ f" the expected min={test_case.minValue}, max={test_case.maxValue}."
)
return TestCaseResult(
executionTime=execution_date.timestamp(), testCaseStatus=status, result=result
)


@ -0,0 +1,65 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
ColumnValuesToBeUnique validation implementation
"""
from datetime import datetime
from metadata.generated.schema.entity.data.table import ColumnProfile
from metadata.generated.schema.tests.basic import TestCaseResult, TestCaseStatus
from metadata.generated.schema.tests.column.columnValuesToBeUnique import (
ColumnValuesToBeUnique,
)
from metadata.orm_profiler.utils import logger
logger = logger()
def column_values_to_be_unique(
_: ColumnValuesToBeUnique,
col_profile: ColumnProfile,
execution_date: datetime,
) -> TestCaseResult:
"""
Validate Column Values metric
:param _: ColumnValuesToBeUnique. Just used to trigger singledispatch
:param col_profile: should contain count and distinct count metrics
:param execution_date: Datetime when the tests ran
:return: TestCaseResult with status and results
"""
if col_profile.valuesCount is None or col_profile.uniqueCount is None:
msg = (
"We expect `valuesCount` & `uniqueCount` to be informed on the profiler for ColumnValuesToBeUnique"
+ f" but got valuesCount={col_profile.valuesCount}, uniqueCount={col_profile.uniqueCount}."
)
logger.error(msg)
return TestCaseResult(
executionTime=execution_date.timestamp(),
testCaseStatus=TestCaseStatus.Aborted,
result=msg,
)
status = (
TestCaseStatus.Success
if col_profile.valuesCount == col_profile.uniqueCount
else TestCaseStatus.Failed
)
result = (
f"Found valuesCount={col_profile.valuesCount} vs. uniqueCount={col_profile.uniqueCount}."
+ f" Both counts should be equal for column values to be unique."
)
return TestCaseResult(
executionTime=execution_date.timestamp(), testCaseStatus=status, result=result
)


@ -10,137 +10,48 @@
# limitations under the License.
"""
Core Validation definitions
Core Validation definitions.
In this module we define how to check specific test case
behavior based on the computed metrics of the profiler.
These functions should not raise an error, but rather
mark the test as Failure/Aborted and pass a proper
result string. The ORM Processor will be the one in charge
of logging these issues.
"""
import operator as op
from typing import Any, Callable, Dict, Union
from functools import singledispatch
from pydantic import ValidationError
from metadata.orm_profiler.metrics.registry import Metrics
from metadata.generated.schema.tests.basic import TestCaseResult
from metadata.orm_profiler.utils import logger
from metadata.orm_profiler.validations.column.column_values_to_be_between import (
column_values_to_be_between,
)
from metadata.orm_profiler.validations.column.column_values_to_be_unique import (
column_values_to_be_unique,
)
from metadata.orm_profiler.validations.table.table_row_count_to_equal import (
table_row_count_to_equal,
)
logger = logger()
class ValidationConversionException(Exception):
@singledispatch
def validate(test_case, *args, **kwargs) -> TestCaseResult:
"""
Issue converting parser results to our Metrics and Validations
Default function to validate test cases.
Note that the first argument should be a positional argument.
"""
raise NotImplementedError(
f"Missing test case validation implementation for {type(test_case)}."
)
class MissingMetricException(Exception):
"""
The required Metric is not available in the profiler results
"""
# Table Tests
validate.register(table_row_count_to_equal)
_OPERATOR_MAP = {
"<": op.lt,
">": op.gt,
"==": op.eq,
"!=": op.ne,
"<=": op.le,
">=": op.ge,
}
class Validation:
"""
Base class for Validation definition.
This data will be extracted from the TestDef expression.
We will map here the results from parsing with
the grammar.
"""
def __init__(self, metric: str, operator: Callable, value: Union[float, int, str]):
self.metric = metric
self.operator = operator
self.value = value
self.valid = None
self.computed_metric = None
@classmethod
def create(cls, raw_validation: Dict[str, str]) -> "Validation":
"""
Given the results of the grammar parser, convert
them to a Validation class with the assigned Metric,
the right operator and the casted value.
"""
raw_metric = raw_validation.get("metric")
if not raw_metric:
raise ValidationConversionException(
f"Missing metric information in {raw_validation}."
)
metric = Metrics.get(raw_metric.upper())
if not metric:
logger.error("Error trying to get Metric from Registry.")
raise ValidationConversionException(
f"Cannot find metric from {raw_validation} in the Registry."
)
metric_name = metric.name()
operator = _OPERATOR_MAP.get(raw_validation.get("operation"))
if not operator:
logger.error("Invalid Operator when converting to validation.")
raise ValidationConversionException(
f"Cannot find operator from {raw_validation}."
)
raw_value = raw_validation.get("value")
if not raw_value:
logger.error("Missing or Empty value")
raise ValidationConversionException(
f"Missing or empty value in {raw_validation}."
)
if raw_value.isdigit():
value = int(raw_value) # Check if int
else:
try:
value = float(raw_value) # Otherwise, might be float
except ValueError:
value = raw_value # If not, leave as string
try:
validation = cls(metric=metric_name, operator=operator, value=value)
except ValidationError as err:
logger.error(
"Error trying to convert a RAW validation to a Validation model"
)
raise err
return validation
def validate(self, results: Dict[str, Any]) -> "Validation":
"""
Given a Validation and the profiler results,
compare their data.
We will call this function for each validation
that we received.
Each time we will return a Validation object
with a properly informed `valid` field,
containing the result of the validation.
"""
self.computed_metric = results.get(self.metric)
if not self.computed_metric:
raise MissingMetricException(
f"The required metric {self.metric} is not available"
+ f" in the profiler results: {results}."
)
is_valid = self.operator(
self.computed_metric, self.value # Order matters. Real value vs. expected
)
self.valid = is_valid # Either True / False
return self # return self for convenience
# Column Tests
validate.register(column_values_to_be_between)
validate.register(column_values_to_be_unique)
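A short sketch of how the new dispatch is meant to be called (matching the calls made from the ORM Processor; the profile values below are made up for illustration):

from datetime import datetime

from metadata.generated.schema.entity.data.table import TableProfile
from metadata.generated.schema.tests.table.tableRowCountToEqual import TableRowCountToEqual
from metadata.orm_profiler.validations.core import validate

# singledispatch picks table_row_count_to_equal based on the first argument's type
res = validate(
    TableRowCountToEqual(value=120),
    table_profile=TableProfile(profileDate="2021-10-16", rowCount=100),
    execution_date=datetime.now(),
)
# res.testCaseStatus is Failed; res.result explains the mismatch (100 rows found vs. 120 expected)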


@ -1,112 +0,0 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Validation grammar definition.
We will use this to convert the test cases specified
in the JSON workflows to our internal metrics, profilers
and validations.
We need to parse expressions such as:
- "row_count > 1000"
- "qty_sold_per_day > 100"
Test definitions are going to be pointing to tables or columns.
This means that we will have that information beforehand.
The job of the grammar is to make it as easy as possible
for users to define their tests and link that logic
to the profiler objects.
"""
from typing import Dict, List
from parsimonious import NodeVisitor
from parsimonious.grammar import Grammar
# To understand the grammar definition better, read it from bottom to top.
# Line by line, we are doing:
# 1. ws: define the whitespace regex
# 2. operand: set of supported operands. Only one can match
# 3. operation: [space]operand[space] or just operand. Spaces are optional
# 4. value: what do we match in the expression. It can be any word, e.g., 100, Food, Seat500, 0.3
# Note that we need to start checking the number. Otherwise, `word` would match but leave
# unparsed string in decimals.
# 5. metric: letters and underscores. We will match them to Metrics afterwards
grammar = Grammar(
"""
test_def = (rule / sep)*
rule = metric operation value
sep = ws? "&" ws?
metric = ~r"[a-zA-Z_]+"
value = number / word
word = ~r"[-\w]+"
number = ~r"\d+.?(\d+)?"
operation = ws? operand ws?
operand = "==" / "!=" / "<=" / ">=" / "<" / ">"
ws = ~"\s*"
"""
)
class ExpVisitor(NodeVisitor):
"""
Visit the extracted nodes from our
test expression definition
"""
@staticmethod
def visit_test_def(_, visited_children):
"""Returns the overall output."""
validations = [child[0] for child in visited_children if child[0]]
return validations
@staticmethod
def visit_rule(_, visited_children):
rule = {key: value for key, value in visited_children}
return rule
@staticmethod
def visit_sep(*args, **kwargs):
pass
@staticmethod
def visit_metric(node, _):
return node.expr_name, node.text
@staticmethod
def visit_operation(node, _):
return node.expr_name, node.text.strip()
@staticmethod
def visit_value(node, _):
return node.expr_name, node.text
def generic_visit(self, node, visited_children):
"""
Generic visit method. Return the children
or the node if there are no children
"""
return visited_children or node
def parse(expression: str, visitor: ExpVisitor) -> List[Dict[str, str]]:
"""
Given an expression, parse it with
our grammar and return the visiting
result
"""
tree = grammar.parse(expression)
return visitor.visit(tree)


@ -13,82 +13,32 @@
Models to map tests and validations from
JSON workflows to the profiler
"""
from typing import List
from typing import List, Optional
from parsimonious import ParseError
from pydantic import BaseModel, validator
from metadata.orm_profiler.utils import logger
from metadata.orm_profiler.validations.core import Validation
from metadata.orm_profiler.validations.grammar import ExpVisitor, parse
logger = logger()
visitor = ExpVisitor()
from metadata.config.common import ConfigModel
from metadata.generated.schema.api.tests.createColumnTest import CreateColumnTestRequest
from metadata.generated.schema.api.tests.createTableTest import CreateTableTestRequest
class ExpressionModel(BaseModel):
class TestDef(ConfigModel):
"""
We use this model to convert expression to Validations
on the fly while we are parsing the JSON configuration.
Table test definition
It is a comfortable and fast way to know if we'll be
able to parse the tests with our grammar.
We expect:
- table name
- List of table tests
- List of column tests
"""
expression: List[Validation]
enabled: bool = True
@validator("expression", pre=True)
def convert_expression(cls, value): # cls as per pydantic docs
"""
Make sure that we can parse the expression to a
validation
"""
try:
raw_validation = parse(value, visitor)
except ParseError as err:
logger.error(f"Cannot parse expression properly: {value}")
raise err
return [Validation.create(val) for val in raw_validation]
class Config:
arbitrary_types_allowed = True
table: str # Table FQDN
table_tests: Optional[List[CreateTableTestRequest]] = None
column_tests: Optional[List[CreateColumnTestRequest]] = None
class TableTest(ExpressionModel):
class TestSuite(ConfigModel):
"""
Incoming table test definition from the workflow
Config test definition
"""
name: str
table: str # fullyQualifiedName
class ColumnTestExpression(ExpressionModel):
"""
Column test expression definition
"""
name: str
column: str
class ColumnTest(BaseModel):
"""
Incoming column test definition from the workflow
"""
table: str # fullyQualifiedName
name: str # Set of col tests
columns: List[ColumnTestExpression]
class TestDef(BaseModel):
"""
Base Test definition
"""
name: str
table_tests: List[TableTest] = None
column_tests: List[ColumnTest] = None
name: str # Test Suite name
tests: List[TestDef]


@ -0,0 +1,58 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
TableRowCountToEqual validation implementation
"""
from datetime import datetime
from metadata.generated.schema.entity.data.table import TableProfile
from metadata.generated.schema.tests.basic import TestCaseResult, TestCaseStatus
from metadata.generated.schema.tests.table.tableRowCountToEqual import (
TableRowCountToEqual,
)
from metadata.orm_profiler.utils import logger
logger = logger()
def table_row_count_to_equal(
test_case: TableRowCountToEqual,
table_profile: TableProfile,
execution_date: datetime,
) -> TestCaseResult:
"""
Validate row count metric
:param test_case: TableRowCountToEqual
:param table_profile: should contain row count metric
:param execution_date: Datetime when the tests ran
:return: TestCaseResult with status and results
"""
if table_profile.rowCount is None:
msg = "rowCount should not be None for TableRowCountToEqual"
logger.error(msg)
return TestCaseResult(
executionTime=execution_date.timestamp(),
testCaseStatus=TestCaseStatus.Aborted,
result=msg,
)
status = (
TestCaseStatus.Success
if table_profile.rowCount == test_case.value
else TestCaseStatus.Failed
)
result = f"Found {table_profile.rowCount} rows vs. the expected {test_case.value}"
return TestCaseResult(
executionTime=execution_date.timestamp(), testCaseStatus=status, result=result
)


@ -16,12 +16,16 @@ import uuid
from datetime import datetime
from unittest import TestCase
import pytest
from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequest
from metadata.generated.schema.api.data.createTable import CreateTableRequest
from metadata.generated.schema.api.services.createDatabaseService import (
CreateDatabaseServiceRequest,
)
from metadata.generated.schema.api.teams.createUser import CreateUserRequest
from metadata.generated.schema.api.tests.createColumnTest import CreateColumnTestRequest
from metadata.generated.schema.api.tests.createTableTest import CreateTableTestRequest
from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.data.table import (
Column,
@ -39,8 +43,26 @@ from metadata.generated.schema.entity.services.databaseService import (
DatabaseService,
DatabaseServiceType,
)
from metadata.generated.schema.tests.basic import TestCaseResult, TestCaseStatus
from metadata.generated.schema.tests.column.columnValuesToBeBetween import (
ColumnValuesToBeBetween,
)
from metadata.generated.schema.tests.columnTest import (
ColumnTest,
ColumnTestCase,
ColumnTestType,
)
from metadata.generated.schema.tests.table.tableRowCountToEqual import (
TableRowCountToEqual,
)
from metadata.generated.schema.tests.tableTest import (
TableTest,
TableTestCase,
TableTestType,
)
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.models.table_queries import TableUsageRequest
from metadata.ingestion.ometa.client import APIError
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig
@ -382,3 +404,123 @@ class OMetaTableTest(TestCase):
)
assert res.id == entity_ref.id
def test_add_table_tests(self):
"""
Add tableTests to table instance
"""
table = self.metadata.create_or_update(data=self.create)
table_test = CreateTableTestRequest(
description="Testing something",
testCase=TableTestCase(
config=TableRowCountToEqual(value=100),
tableTestType=TableTestType.tableRowCountToEqual,
),
)
table_with_test = self.metadata.add_table_test(
table=table, table_test=table_test
)
assert len(table_with_test.tableTests) == 1
assert table_with_test.tableTests[0].testCase == table_test.testCase
test_case_result = TestCaseResult(
result="some result",
executionTime=datetime.now().timestamp(),
testCaseStatus=TestCaseStatus.Success,
)
table_test_with_res = CreateTableTestRequest(
description="Testing something",
testCase=TableTestCase(
config=TableRowCountToEqual(value=100),
tableTestType=TableTestType.tableRowCountToEqual,
),
result=test_case_result,
)
table_with_test_and_res = self.metadata.add_table_test(
table=table, table_test=table_test_with_res
)
assert len(table_with_test_and_res.tableTests[0].results) == 1
assert (
table_with_test_and_res.tableTests[0].results[0].testCaseStatus
== TestCaseStatus.Success
)
def test_add_column_tests(self):
"""
Add columnTests to table instance
"""
table = self.metadata.create_or_update(data=self.create)
col_test = CreateColumnTestRequest(
columnName="id",
testCase=ColumnTestCase(
config=ColumnValuesToBeBetween(minValue=1, maxValue=3),
columnTestType=ColumnTestType.columnValuesToBeBetween,
),
)
updated_table = self.metadata.add_column_test(table=table, col_test=col_test)
id_test = next(
iter([col for col in updated_table.columns if col.name.__root__ == "id"]),
None,
)
assert len(id_test.columnTests) == 1
assert id_test.columnTests[0].testCase == col_test.testCase
# Column needs to exist in the table!
with pytest.raises(APIError):
ko_test = CreateColumnTestRequest(
columnName="random_column",
testCase=ColumnTestCase(
config=ColumnValuesToBeBetween(minValue=1, maxValue=3),
columnTestType=ColumnTestType.columnValuesToBeBetween,
),
)
self.metadata.add_column_test(table=table, col_test=ko_test)
col_test_res = TestCaseResult(
result="some result",
executionTime=datetime.now().timestamp(),
testCaseStatus=TestCaseStatus.Success,
)
col_test_with_res = CreateColumnTestRequest(
columnName="id",
testCase=ColumnTestCase(
config=ColumnValuesToBeBetween(minValue=1, maxValue=3),
columnTestType=ColumnTestType.columnValuesToBeBetween,
),
result=col_test_res,
)
table_with_test_and_res = self.metadata.add_column_test(
table=table, col_test=col_test_with_res
)
id_test_res = next(
iter(
[
col
for col in table_with_test_and_res.columns
if col.name.__root__ == "id"
]
),
None,
)
assert len(id_test_res.columnTests[0].results) == 1
assert (
id_test_res.columnTests[0].results[0].testCaseStatus
== TestCaseStatus.Success
)
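Outside the test harness, the new helper boils down to the condensed sketch below; `metadata` stands for an already-configured OpenMetadata client and `table` for an existing table entity with an `id` column, both assumed from the fixtures above:

from datetime import datetime

from metadata.generated.schema.api.tests.createColumnTest import CreateColumnTestRequest
from metadata.generated.schema.tests.basic import TestCaseResult, TestCaseStatus
from metadata.generated.schema.tests.column.columnValuesToBeBetween import (
    ColumnValuesToBeBetween,
)
from metadata.generated.schema.tests.columnTest import ColumnTestCase, ColumnTestType

# `metadata` (OpenMetadata client) and `table` are assumed to exist, as in the
# test fixtures above.
col_test = CreateColumnTestRequest(
    columnName="id",
    testCase=ColumnTestCase(
        config=ColumnValuesToBeBetween(minValue=1, maxValue=3),
        columnTestType=ColumnTestType.columnValuesToBeBetween,
    ),
    # Optionally ship a first result together with the test definition
    result=TestCaseResult(
        result="some result",
        executionTime=datetime.now().timestamp(),
        testCaseStatus=TestCaseStatus.Success,
    ),
)
updated_table = metadata.add_column_test(table=table, col_test=col_test)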

View File

@ -118,34 +118,36 @@ class ProfilerWorkflowTest(TestCase):
"config": {
"profiler": {
"name": "my_profiler",
"metrics": ["row_count", "min", "COUNT", "null_count"],
"metrics": ["row_count", "min", "max", "COUNT", "null_count"],
},
"tests": {
"name": "my_tests",
"table_tests": [
"test_suite": {
"name": "My Test Suite",
"tests": [
{
"name": "check row number",
"table": "test_sqlite.main.users",
"expression": "row_count == 2",
}
],
"column_tests": [
{
"name": "some column tests",
"table": "test_sqlite.main.users",
"columns": [
"table": "test_sqlite.main.users", # FQDN
"table_tests": [
{
"name": "check name count",
"column": "name",
"expression": "count < 10",
},
{
"name": "check null count",
"column": "nickname",
"expression": "null_count == 0",
"testCase": {
"config": {
"value": 100,
},
"tableTestType": "tableRowCountToEqual",
},
},
],
}
"column_tests": [
{
"columnName": "age",
"testCase": {
"config": {
"minValue": 0,
"maxValue": 99,
},
"columnTestType": "columnValuesToBeBetween",
},
}
],
},
],
},
},
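Since the removed and added configuration lines are interleaved in the hunk above, the new shape of the processor `config` block is shown on its own below, with the same illustrative values the test uses (the surrounding source and sink keys are omitted):

# New processor configuration layout: one profiler definition plus a
# test_suite whose entries group table and column tests per table FQDN.
config = {
    "profiler": {
        "name": "my_profiler",
        "metrics": ["row_count", "min", "max", "COUNT", "null_count"],
    },
    "test_suite": {
        "name": "My Test Suite",
        "tests": [
            {
                "table": "test_sqlite.main.users",  # FQDN
                "table_tests": [
                    {
                        "testCase": {
                            "config": {"value": 100},
                            "tableTestType": "tableRowCountToEqual",
                        },
                    },
                ],
                "column_tests": [
                    {
                        "columnName": "age",
                        "testCase": {
                            "config": {"minValue": 0, "maxValue": 99},
                            "columnTestType": "columnValuesToBeBetween",
                        },
                    },
                ],
            },
        ],
    },
}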

View File

@ -1,83 +0,0 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Test the Test Definition grammar
"""
import pytest
from parsimonious import ParseError
from metadata.orm_profiler.validations.grammar import ExpVisitor, parse
visitor = ExpVisitor()
def test_simple_parsing():
"""
Play with simple expressions
"""
res = parse("row_count == 100", visitor)
assert res == [{"metric": "row_count", "operation": "==", "value": "100"}]
res = parse("something_else > random", visitor)
assert res == [{"metric": "something_else", "operation": ">", "value": "random"}]
res = parse("something_else <= random", visitor)
assert res == [{"metric": "something_else", "operation": "<=", "value": "random"}]
# No spaces are needed
res = parse("hello!=99", visitor)
assert res == [{"metric": "hello", "operation": "!=", "value": "99"}]
res = parse("random==Seat500", visitor)
assert res == [{"metric": "random", "operation": "==", "value": "Seat500"}]
def test_multiple_parsing():
"""
We can also evaluate multiple test definitions
"""
res = parse("metric_a < value1 & metric_b == value2 & metric_c != value3", visitor)
expected = [
{"metric": "metric_a", "operation": "<", "value": "value1"},
{"metric": "metric_b", "operation": "==", "value": "value2"},
{"metric": "metric_c", "operation": "!=", "value": "value3"},
]
assert res == expected
# No spaces are needed
res = parse("metric_a<value1&metric_b==value2&metric_c!=value3", visitor)
assert res == expected
def test_complex_values():
"""
Check all the values we can cover
"""
res = parse("ratio < 0.1", visitor)
assert res == [{"metric": "ratio", "operation": "<", "value": "0.1"}]
def test_parse_error():
"""
Check formats that won't be parsed
by the grammar
"""
with pytest.raises(ParseError):
parse("wont match", visitor)
with pytest.raises(ParseError):
parse("ok not_an_operand ok", visitor)
with pytest.raises(ParseError):
parse("ok == !!!", visitor)

View File

@ -172,30 +172,17 @@ class MetricsTest(TestCase):
assert res.get(User.comments.name)[Metrics.MEAN.name] == 15.0
def test_distinct(self):
"""
Check distinct count
"""
dist = Metrics.DISTINCT_COUNT.value
res = (
Profiler(dist, session=self.session, table=User, use_cols=[User.age])
.execute()
._column_results
)
assert res.get(User.age.name)[Metrics.DISTINCT_COUNT.name] == 2
def test_duplicate_count(self):
"""
Check composed duplicate count
"""
count = Metrics.COUNT.value
dist = Metrics.DISTINCT_COUNT.value
unique = Metrics.UNIQUE_COUNT.value
dup_count = Metrics.DUPLICATE_COUNT.value
res = (
Profiler(
count,
dist,
unique,
dup_count,
session=self.session,
table=User,
@ -331,13 +318,14 @@ class MetricsTest(TestCase):
assert res.get(User.age.name)[Metrics.MAX.name] == 31
res = (
Profiler(_max, session=self.session, table=User, use_cols=[User.name])
.execute()
._column_results
)
# TMP disable min/max on strings
# res = (
# Profiler(_max, session=self.session, table=User, use_cols=[User.name])
# .execute()
# ._column_results
# )
assert res.get(User.name.name)[Metrics.MAX.name] == "John"
# assert res.get(User.name.name)[Metrics.MAX.name] == "John"
def test_min_length(self):
"""

View File

@ -93,7 +93,6 @@ class ProfilerTest(TestCase):
missingCount=None,
uniqueCount=2,
uniqueProportion=1.0,
distinctCount=2,
min=30.0,
max=31.0,
minLength=None,

View File

@ -8,111 +8,199 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Test that we can safely convert to validation
and check results
"""
import operator as op
import pytest
"""
Test Table and Column Tests' validate implementations.
from metadata.orm_profiler.validations.core import (
Validation,
ValidationConversionException,
Each test should validate the Success, Failure and Aborted statuses
"""
from datetime import datetime
from metadata.generated.schema.entity.data.table import ColumnProfile, TableProfile
from metadata.generated.schema.tests.basic import TestCaseResult, TestCaseStatus
from metadata.generated.schema.tests.column.columnValuesToBeBetween import (
ColumnValuesToBeBetween,
)
from metadata.orm_profiler.validations.grammar import ExpVisitor, parse
from metadata.generated.schema.tests.column.columnValuesToBeUnique import (
ColumnValuesToBeUnique,
)
from metadata.generated.schema.tests.table.tableRowCountToEqual import (
TableRowCountToEqual,
)
from metadata.orm_profiler.validations.core import validate
visitor = ExpVisitor()
EXECUTION_DATE = datetime.strptime("2021-07-03", "%Y-%m-%d")
def test_model_conversion():
def test_table_row_count_to_equal():
"""
Check that we can properly convert to a Validation model
Check TableRowCountToEqual
"""
raw_validation = parse("count == 100", visitor)[0]
model = Validation.create(raw_validation)
table_profile = TableProfile(
profileDate=EXECUTION_DATE.strftime("%Y-%m-%d"),
rowCount=100,
)
assert model.metric == "valuesCount"
assert model.operator == op.eq
assert model.value == 100
res_ok = validate(
TableRowCountToEqual(value=100),
table_profile=table_profile,
execution_date=EXECUTION_DATE,
)
assert res_ok == TestCaseResult(
executionTime=EXECUTION_DATE.timestamp(),
testCaseStatus=TestCaseStatus.Success,
result="Found 100.0 rows vs. the expected 100",
)
raw_validation = parse("min > something", visitor)[0]
model = Validation.create(raw_validation)
res_ko = validate(
TableRowCountToEqual(value=50),
table_profile=table_profile,
execution_date=EXECUTION_DATE,
)
assert model.metric == "min"
assert model.operator == op.gt
assert model.value == "something"
assert res_ko == TestCaseResult(
executionTime=EXECUTION_DATE.timestamp(),
testCaseStatus=TestCaseStatus.Failed,
result="Found 100.0 rows vs. the expected 50",
)
raw_validation = parse("null_ratio < 0.2", visitor)[0]
model = Validation.create(raw_validation)
table_profile_aborted = TableProfile(
profileDate=EXECUTION_DATE.strftime("%Y-%m-%d"),
)
assert model.metric == "nullProportion"
assert model.operator == op.lt
assert model.value == 0.2
res_aborted = validate(
TableRowCountToEqual(value=100),
table_profile=table_profile_aborted,
execution_date=EXECUTION_DATE,
)
# This validation does not make sense, but we are just checking cases
raw_validation = parse("null_ratio >= 5.4", visitor)[0]
model = Validation.create(raw_validation)
assert model.metric == "nullProportion"
assert model.operator == op.ge
assert model.value == 5.4
assert res_aborted == TestCaseResult(
executionTime=EXECUTION_DATE.timestamp(),
testCaseStatus=TestCaseStatus.Aborted,
result="rowCount should not be None for TableRowCountToEqual",
)
def test_model_conversion_exceptions():
def test_column_values_to_be_between():
"""
Check that we cannot pass malformed data
Check ColumnValuesToBeBetween
"""
# No info at all
with pytest.raises(ValidationConversionException):
Validation.create({})
column_profile = ColumnProfile(
min=1,
max=3,
)
# Invalid metric, cannot be found in Registry
with pytest.raises(ValidationConversionException):
Validation.create({"metric": "not a valid metric"})
res_ok = validate(
ColumnValuesToBeBetween(
minValue=0,
maxValue=3,
),
col_profile=column_profile,
execution_date=EXECUTION_DATE,
)
assert res_ok == TestCaseResult(
executionTime=EXECUTION_DATE.timestamp(),
testCaseStatus=TestCaseStatus.Success,
result="Found min=1.0, max=3.0 vs. the expected min=0, max=3.",
)
# Missing Operation key
with pytest.raises(ValidationConversionException):
Validation.create({"metric": "min"})
res_ko = validate(
ColumnValuesToBeBetween(
minValue=0,
maxValue=2,
),
col_profile=column_profile,
execution_date=EXECUTION_DATE,
)
# Invalid Operation value
with pytest.raises(ValidationConversionException):
Validation.create({"metric": "min", "operation": "invalid operation"})
assert res_ko == TestCaseResult(
executionTime=EXECUTION_DATE.timestamp(),
testCaseStatus=TestCaseStatus.Failed,
result="Found min=1.0, max=3.0 vs. the expected min=0, max=2.",
)
# Missing value key
with pytest.raises(ValidationConversionException):
Validation.create({"metric": "min", "operation": "=="})
column_profile_aborted = ColumnProfile(
min=1,
)
# Empty value
with pytest.raises(ValidationConversionException):
Validation.create({"metric": "min", "operation": "==", "value": ""})
res_aborted = validate(
ColumnValuesToBeBetween(
minValue=0,
maxValue=3,
),
col_profile=column_profile_aborted,
execution_date=EXECUTION_DATE,
)
assert res_aborted == TestCaseResult(
executionTime=EXECUTION_DATE.timestamp(),
testCaseStatus=TestCaseStatus.Aborted,
result=(
"We expect `min` & `max` to be informed on the profiler for ColumnValuesToBeBetween"
+ " but got min=1.0, max=None."
),
)
def test_validate():
def test_column_values_to_be_unique():
"""
Make sure that we are properly flagging validation results
Check ColumnValuesToBeUnique
"""
results = {"valuesCount": 100}
raw_validation = parse("count == 100", visitor)[0]
validation = Validation.create(raw_validation)
column_profile = ColumnProfile(
valuesCount=10,
uniqueCount=10,
)
assert validation.validate(results).valid
res_ok = validate(
ColumnValuesToBeUnique(),
col_profile=column_profile,
execution_date=EXECUTION_DATE,
)
assert res_ok == TestCaseResult(
executionTime=EXECUTION_DATE.timestamp(),
testCaseStatus=TestCaseStatus.Success,
result=(
"Found valuesCount=10.0 vs. uniqueCount=10.0."
+ " Both counts should be equal for column values to be unique."
),
)
raw_validation = parse("count != 100", visitor)[0]
validation = Validation.create(raw_validation)
column_profile_ko = ColumnProfile(
valuesCount=10,
uniqueCount=5,
)
assert not validation.validate(results).valid
res_ko = validate(
ColumnValuesToBeUnique(),
col_profile=column_profile_ko,
execution_date=EXECUTION_DATE,
)
results = {"nullProportion": 0.2}
assert res_ko == TestCaseResult(
executionTime=EXECUTION_DATE.timestamp(),
testCaseStatus=TestCaseStatus.Failed,
result=(
"Found valuesCount=10.0 vs. uniqueCount=5.0."
+ " Both counts should be equal for column values to be unique."
),
)
raw_validation = parse("Null_Ratio < 0.3", visitor)[0]
validation = Validation.create(raw_validation)
column_profile_aborted = ColumnProfile()
assert validation.validate(results).valid
res_aborted = validate(
ColumnValuesToBeUnique(),
col_profile=column_profile_aborted,
execution_date=EXECUTION_DATE,
)
raw_validation = parse("Null_Ratio >= 0.3", visitor)[0]
validation = Validation.create(raw_validation)
assert not validation.validate(results).valid
assert res_aborted == TestCaseResult(
executionTime=EXECUTION_DATE.timestamp(),
testCaseStatus=TestCaseStatus.Aborted,
result=(
"We expect `valuesCount` & `uniqueCount` to be informed on the profiler for ColumnValuesToBeUnique"
+ " but got valuesCount=None, uniqueCount=None."
),
)
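As a companion to test_column_values_to_be_between above, this is the between-bounds handler those expectations imply; the standalone function name is hypothetical (the module exposes it through validate()), and only the branching mirrors the asserted results:

from datetime import datetime

from metadata.generated.schema.entity.data.table import ColumnProfile
from metadata.generated.schema.tests.basic import TestCaseResult, TestCaseStatus
from metadata.generated.schema.tests.column.columnValuesToBeBetween import (
    ColumnValuesToBeBetween,
)


def validate_values_to_be_between(  # hypothetical name; dispatched via validate()
    test_case: ColumnValuesToBeBetween,
    col_profile: ColumnProfile,
    execution_date: datetime,
) -> TestCaseResult:
    # Abort when the profiler did not compute min/max for the column
    if col_profile.min is None or col_profile.max is None:
        return TestCaseResult(
            executionTime=execution_date.timestamp(),
            testCaseStatus=TestCaseStatus.Aborted,
            result=(
                "We expect `min` & `max` to be informed on the profiler for"
                f" ColumnValuesToBeBetween but got min={col_profile.min},"
                f" max={col_profile.max}."
            ),
        )

    # Success when the profiled range sits inside the expected bounds
    status = (
        TestCaseStatus.Success
        if test_case.minValue <= col_profile.min
        and col_profile.max <= test_case.maxValue
        else TestCaseStatus.Failed
    )
    return TestCaseResult(
        executionTime=execution_date.timestamp(),
        testCaseStatus=status,
        result=(
            f"Found min={col_profile.min}, max={col_profile.max} vs. the expected"
            f" min={test_case.minValue}, max={test_case.maxValue}."
        ),
    )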

View File

@ -18,7 +18,25 @@ from copy import deepcopy
import sqlalchemy as sqa
from sqlalchemy.orm import declarative_base
from metadata.generated.schema.api.tests.createColumnTest import CreateColumnTestRequest
from metadata.generated.schema.api.tests.createTableTest import CreateTableTestRequest
from metadata.generated.schema.entity.data.table import Column, DataType, Table
from metadata.generated.schema.tests.column.columnValuesToBeBetween import (
ColumnValuesToBeBetween,
)
from metadata.generated.schema.tests.columnTest import (
ColumnTest,
ColumnTestCase,
ColumnTestType,
)
from metadata.generated.schema.tests.table.tableRowCountToEqual import (
TableRowCountToEqual,
)
from metadata.generated.schema.tests.tableTest import (
TableTest,
TableTestCase,
TableTestType,
)
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig
from metadata.ingestion.source.sqlite import SQLiteConfig
@ -26,6 +44,7 @@ from metadata.orm_profiler.api.workflow import ProfilerWorkflow
from metadata.orm_profiler.processor.orm_profiler import OrmProfilerProcessor
from metadata.orm_profiler.profiles.default import DefaultProfiler
from metadata.orm_profiler.profiles.models import ProfilerDef
from metadata.orm_profiler.validations.models import TestDef, TestSuite
config = {
"source": {"type": "sqlite", "config": {"service_name": "my_service"}},
@ -52,7 +71,7 @@ def test_init_workflow():
assert isinstance(workflow.processor, OrmProfilerProcessor)
assert workflow.processor.config.profiler is None
assert workflow.processor.config.tests is None
assert workflow.processor.config.test_suite is None
def test_filter_entities():
@ -188,110 +207,66 @@ def test_tests_def():
Validate the test case definition
"""
test_config = deepcopy(config)
test_config["processor"]["config"]["tests"] = {
"name": "my_tests",
"table_tests": [
test_config["processor"]["config"]["test_suite"] = {
"name": "My Test Suite",
"tests": [
{
"name": "first_test",
"table": "service.db.name",
"expression": "row_count > 100",
"enabled": False,
},
{
"name": "another_test",
"table": "service.db.name",
"expression": "row_count > 1000 & row_count < 2000",
},
],
"column_tests": [
{
"table": "service.db.name",
"name": "set_of_col_tests",
"columns": [
"table": "service.db.name", # FQDN
"table_tests": [
{
"name": "first_col_test",
"column": "column_name_1",
"expression": "min > 5",
},
{
"name": "another_col_test",
"column": "column_name_1",
"expression": "min > 5 & min < 10",
},
{
"name": "second_col_test",
"column": "column_name_2",
"expression": "null_ratio < 0.1",
"testCase": {
"config": {
"value": 100,
},
"tableTestType": "tableRowCountToEqual",
},
},
],
}
"column_tests": [
{
"columnName": "age",
"testCase": {
"config": {
"minValue": 0,
"maxValue": 99,
},
"columnTestType": "columnValuesToBeBetween",
},
}
],
},
],
}
test_workflow = ProfilerWorkflow.create(test_config)
assert isinstance(test_workflow.processor, OrmProfilerProcessor)
tests = test_workflow.processor.config.tests
suite = test_workflow.processor.config.test_suite
assert tests.name == "my_tests"
expected = TestSuite(
name="My Test Suite",
tests=[
TestDef(
table="service.db.name",
table_tests=[
CreateTableTestRequest(
testCase=TableTestCase(
config=TableRowCountToEqual(value=100),
tableTestType=TableTestType.tableRowCountToEqual,
),
)
],
column_tests=[
CreateColumnTestRequest(
columnName="age",
testCase=ColumnTestCase(
config=ColumnValuesToBeBetween(minValue=0, maxValue=99),
columnTestType=ColumnTestType.columnValuesToBeBetween,
),
)
],
)
],
)
# Check cardinality
assert len(tests.table_tests) == 2
assert len(tests.column_tests) == 1
assert len(tests.column_tests[0].columns) == 3
assert tests.table_tests[0].name == "first_test"
assert tests.table_tests[0].table == "service.db.name"
assert tests.table_tests[0].expression[0].metric == "rowCount"
assert not tests.table_tests[0].enabled
assert tests.column_tests[0].columns[0].name == "first_col_test"
assert tests.column_tests[0].columns[0].column == "column_name_1"
assert tests.column_tests[0].columns[0].expression[0].metric == "min"
assert tests.column_tests[0].columns[0].enabled
# We cannot do a 1:1 general assertion because we are dynamically
# creating the Validation classes, so the internal IDs don't match
# and the assertion fails. For visual reference, however, the
# resulting class looks as follows:
# TestDef(
# name="my_tests",
# table_tests=[
# # I can have multiple tests on the same table
# TableTest(
# name="first_test",
# table="service.db.name",
# expression="row_number > 100", # This will be one Validation
# enabled=False,
# ),
# TableTest(
# name="another_test",
# table="service.db.name",
# expression="row_number > 1000 & row_number < 2000", # This will be two Validations
# ),
# ],
# column_tests=[
# ColumnTest(
# table="service.db.name",
# name="set_of_col_tests",
# columns=[
# ColumnTestExpression(
# name="first_col_test",
# column="column_name_1",
# expression="min > 5", # One Validation
# ),
# ColumnTestExpression(
# name="another_col_test",
# column="column_name_1",
# expression="min > 5 & min < 10", # Two Validations
# ),
# ColumnTestExpression(
# name="second_col_test",
# column="column_name_2",
# expression="null_ratio < 0.1", # One Validation
# ),
# ],
# )
# ],
# )
assert suite == expected
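The expected TestSuite above implies roughly the following Pydantic models in metadata.orm_profiler.validations.models; the field names come from the usage in this test, while the exact types, defaults and validators are inferred, so treat this as a sketch rather than the literal module contents:

from typing import List, Optional

from pydantic import BaseModel

from metadata.generated.schema.api.tests.createColumnTest import (
    CreateColumnTestRequest,
)
from metadata.generated.schema.api.tests.createTableTest import CreateTableTestRequest


class TestDef(BaseModel):
    """Table and column tests to run against a single table FQDN."""

    table: str  # fully qualified name, e.g. service.db.table
    table_tests: Optional[List[CreateTableTestRequest]] = None
    column_tests: Optional[List[CreateColumnTestRequest]] = None


class TestSuite(BaseModel):
    """Named group of TestDef entries handled by the profiler processor."""

    name: str
    tests: List[TestDef]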