diff --git a/Makefile b/Makefile index 1f8a214bbc0..ded4bed04d7 100644 --- a/Makefile +++ b/Makefile @@ -37,13 +37,13 @@ lint: ## Run pylint on the Python sources to analyze the codebase .PHONY: py_format py_format: ## Run black and isort to format the Python codebase - isort $(PY_SOURCE) --skip $(PY_SOURCE)/metadata/generated --skip ingestion/build --profile black --multi-line 3 - black $(PY_SOURCE) --extend-exclude $(PY_SOURCE)/metadata/generated + isort ingestion/ --skip $(PY_SOURCE)/metadata/generated --skip ingestion/build --profile black --multi-line 3 + black ingestion/ --extend-exclude $(PY_SOURCE)/metadata/generated .PHONY: py_format_check py_format_check: ## Check if Python sources are correctly formatted - black --check --diff ingestion/ --extend-exclude $(PY_SOURCE)/metadata/generated isort --check-only ingestion/ --skip $(PY_SOURCE)/metadata/generated --skip ingestion/build --profile black --multi-line 3 + black --check --diff ingestion/ --extend-exclude $(PY_SOURCE)/metadata/generated ## Ingestion models generation .PHONY: generate diff --git a/ingestion/examples/sample_data/datasets/tableTests.json b/ingestion/examples/sample_data/datasets/tableTests.json new file mode 100644 index 00000000000..ff58f61c173 --- /dev/null +++ b/ingestion/examples/sample_data/datasets/tableTests.json @@ -0,0 +1,100 @@ +[ + { + "table": "bigquery_gcp.shopify.dim_staff", + "tableTests": [ + { + "description": "Rows should always be 100 because of something", + "testCase": { + "config": { + "value": 120 + }, + "tableTestType": "tableRowCountToEqual" + }, + "executionFrequency": "Daily", + "result": { + "executionTime": 1646221199, + "testCaseStatus": "Failed", + "result": "Found 100.0 rows vs. the expected 120" + } + }, + { + "description": "Rows should always be 100 because of something", + "testCase": { + "config": { + "value": 120 + }, + "tableTestType": "tableRowCountToEqual" + }, + "executionFrequency": "Daily", + "result": { + "executionTime": 1646220190, + "testCaseStatus": "Success", + "result": "Found 120.0 rows vs. the expected 120" + } + }, + { + "description": "We expect certain columns", + "testCase": { + "config": { + "value": 5 + }, + "tableTestType": "tableColumnCountToEqual" + }, + "executionFrequency": "Daily", + "result": { + "executionTime": 1646221199, + "testCaseStatus": "Success", + "result": "Found 5.0 columns vs. the expected 5" + } + } + ], + "columnTests": [ + { + "columnName": "user_id", + "description": "user_id should be positive", + "testCase": { + "config": { + "minValue": 0 + }, + "columnTestType": "columnValuesToBeBetween" + }, + "executionFrequency": "Daily", + "result": { + "executionTime": 1646221199, + "testCaseStatus": "Success", + "result": "Found min=1.0 vs. the expected min=0" + } + }, + { + "columnName": "user_id", + "description": "user_id should be positive", + "testCase": { + "config": { + "minValue": 0 + }, + "columnTestType": "columnValuesToBeBetween" + }, + "executionFrequency": "Daily", + "result": { + "executionTime": 1646220190, + "testCaseStatus": "Success", + "result": "Found min=1.0 vs. the expected min=0" + } + }, + { + "columnName": "email", + "description": "emails should be unique", + "testCase": { + "config": {}, + "columnTestType": "columnValuesToBeUnique" + }, + "executionFrequency": "Daily", + "result": { + "executionTime": 1646221199, + "testCaseStatus": "Success", + "result": "Found uniqueCount=100.0 vs. 
valuesCount=100.0" + } + } + ] + } +] \ No newline at end of file diff --git a/ingestion/examples/sample_data/datasets/tables.json b/ingestion/examples/sample_data/datasets/tables.json index ee3f4080b74..6df99597a42 100644 --- a/ingestion/examples/sample_data/datasets/tables.json +++ b/ingestion/examples/sample_data/datasets/tables.json @@ -5855,6 +5855,66 @@ "query": "select email from dim_staff", "duration": 0.294 } + ], + "tableProfile": [ + { + "profileDate": "2021-10-16", + "columnCount": 5, + "rowCount": 100, + "columnProfile": [ + { + "name": "user_id", + "uniqueCount": 100.0, + "uniqueProportion": 1.0, + "nullCount": 0.0, + "nullProportion": 0.0, + "sum": 5050, + "min": 1, + "max": 100, + "mean": 50.5, + "stddev": 28.86607004772212 + }, + { + "name": "shop_id", + "uniqueCount": 100.0, + "uniqueProportion": 1.0, + "nullCount": 0.0, + "nullProportion": 0.0, + "sum": 5050, + "min": 1, + "max": 100, + "mean": 50.5, + "stddev": 28.86607004772212 + }, + { + "name": "first_name", + "uniqueCount": 79.0, + "uniqueProportion": 0.79, + "nullCount": 1, + "nullProportion": 0.01, + "minLength": 5, + "maxLength": 19 + }, + { + "name": "last_name", + "uniqueCount": 100.0, + "uniqueProportion": 1.0, + "nullCount": 0, + "nullProportion": 0.0, + "minLength": 7, + "maxLength": 15 + }, + { + "name": "email", + "uniqueCount": 100.0, + "uniqueProportion": 1.0, + "nullCount": 0, + "nullProportion": 0.0, + "minLength": 12, + "maxLength": 30 + } + ] + } ] }, { diff --git a/ingestion/setup.py b/ingestion/setup.py index 8e8dadeb0be..d60de1c3358 100644 --- a/ingestion/setup.py +++ b/ingestion/setup.py @@ -45,7 +45,6 @@ base_requirements = { "Jinja2>=2.11.3", "PyYAML", "jsonschema", - "parsimonious==0.8.1", "sqllineage==1.3.3", } diff --git a/ingestion/src/metadata/cmd.py b/ingestion/src/metadata/cmd.py index d8d494059cd..690cbd12c4a 100644 --- a/ingestion/src/metadata/cmd.py +++ b/ingestion/src/metadata/cmd.py @@ -58,11 +58,15 @@ def metadata(debug: bool, log_level: str) -> None: if debug: logging.getLogger().setLevel(logging.INFO) logging.getLogger("metadata").setLevel(logging.DEBUG) + logging.getLogger("ORM Profiler Workflow").setLevel(logging.DEBUG) + logging.getLogger("Profiler").setLevel(logging.DEBUG) elif log_level: logging.getLogger().setLevel(log_level) else: logging.getLogger().setLevel(logging.WARNING) logging.getLogger("metadata").setLevel(logging.INFO) + logging.getLogger("ORM Profiler Workflow").setLevel(logging.INFO) + logging.getLogger("Profiler").setLevel(logging.INFO) @metadata.command() diff --git a/ingestion/src/metadata/orm_profiler/metrics/static/distinct_count.py b/ingestion/src/metadata/ingestion/models/table_tests.py similarity index 50% rename from ingestion/src/metadata/orm_profiler/metrics/static/distinct_count.py rename to ingestion/src/metadata/ingestion/models/table_tests.py index 50cb2355f1a..1d414dab9aa 100644 --- a/ingestion/src/metadata/orm_profiler/metrics/static/distinct_count.py +++ b/ingestion/src/metadata/ingestion/models/table_tests.py @@ -8,29 +8,22 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - """ -Distinct Count Metric definition +This Model is required to ingest the sample data +from sample_data.py source. + +The usual process to add new tests is either via the UI +or by using the ORM Profiler. 
""" -from sqlalchemy import distinct, func +from typing import Optional -from metadata.orm_profiler.metrics.core import StaticMetric, _label +from pydantic import BaseModel + +from metadata.generated.schema.api.tests.createColumnTest import CreateColumnTestRequest +from metadata.generated.schema.api.tests.createTableTest import CreateTableTestRequest -class DistinctCount(StaticMetric): - """ - Distinct COUNT Metric - - Given a column, return the Distinct count. Ignores NULL values - """ - - @classmethod - def name(cls): - return "distinctCount" - - def metric_type(self): - return int - - @_label - def fn(self): - return func.count(distinct(self.col)) +class OMetaTableTest(BaseModel): + table_name: str + table_test: Optional[CreateTableTestRequest] = None + column_test: Optional[CreateColumnTestRequest] = None diff --git a/ingestion/src/metadata/ingestion/ometa/mixins/table_mixin.py b/ingestion/src/metadata/ingestion/ometa/mixins/table_mixin.py index 044382ca497..affec3ba231 100644 --- a/ingestion/src/metadata/ingestion/ometa/mixins/table_mixin.py +++ b/ingestion/src/metadata/ingestion/ometa/mixins/table_mixin.py @@ -4,8 +4,10 @@ Mixin class containing Table specific methods To be used by OpenMetadata class """ import logging -from typing import List +from typing import List, Union +from metadata.generated.schema.api.tests.createColumnTest import CreateColumnTestRequest +from metadata.generated.schema.api.tests.createTableTest import CreateTableTestRequest from metadata.generated.schema.entity.data.location import Location from metadata.generated.schema.entity.data.table import ( DataModel, @@ -15,6 +17,8 @@ from metadata.generated.schema.entity.data.table import ( TableJoins, TableProfile, ) +from metadata.generated.schema.tests.columnTest import ColumnTest +from metadata.generated.schema.tests.tableTest import TableTest from metadata.ingestion.models.table_queries import TableUsageRequest from metadata.ingestion.ometa.client import REST @@ -131,3 +135,45 @@ class OMetaTableMixin: data=table_join_request.json(), ) logger.debug("published frequently joined with %s", resp) + + def _add_tests( + self, + table: Table, + test: Union[CreateTableTestRequest, CreateColumnTestRequest], + path: str, + ) -> Table: + """ + Internal function to add test data + + :param table: Table instance + :param test: TableTest or ColumnTest to add + :param path: tableTest or columnTest str + :return: Updated Table instance + """ + resp = self.client.put( + f"{self.get_suffix(Table)}/{table.id.__root__}/{path}", data=test.json() + ) + + return Table(**resp) + + def add_table_test(self, table: Table, table_test: CreateTableTestRequest) -> Table: + """ + For a given table, PUT new TableTest definitions and results + + :param table: Table instance + :param table_test: table test data + :return: Updates Table instance + """ + + return self._add_tests(table=table, test=table_test, path="tableTest") + + def add_column_test(self, table: Table, col_test: CreateColumnTestRequest) -> Table: + """ + For a given table, PUT new TableTest definitions and results + + :param table: Table instance + :param col_test: column test data + :return: Updates Table instance + """ + + return self._add_tests(table=table, test=col_test, path="columnTest") diff --git a/ingestion/src/metadata/ingestion/sink/metadata_rest.py b/ingestion/src/metadata/ingestion/sink/metadata_rest.py index cb166b3b837..58cbf928687 100644 --- a/ingestion/src/metadata/ingestion/sink/metadata_rest.py +++ b/ingestion/src/metadata/ingestion/sink/metadata_rest.py @@ 
-11,7 +11,7 @@ import logging import traceback -from typing import TypeVar +from typing import TypeVar, Union from pydantic import BaseModel, ValidationError @@ -29,6 +29,8 @@ from metadata.generated.schema.api.policies.createPolicy import CreatePolicyRequ from metadata.generated.schema.api.teams.createRole import CreateRoleRequest from metadata.generated.schema.api.teams.createTeam import CreateTeamRequest from metadata.generated.schema.api.teams.createUser import CreateUserRequest +from metadata.generated.schema.api.tests.createColumnTest import CreateColumnTestRequest +from metadata.generated.schema.api.tests.createTableTest import CreateTableTestRequest from metadata.generated.schema.entity.data.chart import ChartType from metadata.generated.schema.entity.data.location import Location from metadata.generated.schema.entity.data.pipeline import Pipeline @@ -41,6 +43,7 @@ from metadata.ingestion.api.sink import Sink, SinkStatus from metadata.ingestion.models.ometa_policy import OMetaPolicy from metadata.ingestion.models.ometa_table_db import OMetaDatabaseAndTable from metadata.ingestion.models.table_metadata import Chart, Dashboard, DeleteTable +from metadata.ingestion.models.table_tests import OMetaTableTest from metadata.ingestion.models.user import OMetaUserProfile from metadata.ingestion.ometa.client import APIError from metadata.ingestion.ometa.ometa_api import OpenMetadata @@ -74,6 +77,9 @@ class MetadataRestSink(Sink[Entity]): config: MetadataRestSinkConfig status: SinkStatus + # We want to catch any errors that might happen during the sink + # pylint: disable=broad-except + def __init__( self, ctx: WorkflowContext, @@ -121,6 +127,8 @@ class MetadataRestSink(Sink[Entity]): self.write_ml_model(record) elif isinstance(record, DeleteTable): self.delete_table(record) + elif isinstance(record, OMetaTableTest): + self.write_table_tests(record) else: logging.info( f"Ignoring the record due to unknown Record type {type(record)}" @@ -476,6 +484,33 @@ class MetadataRestSink(Sink[Entity]): logger.debug(traceback.print_exc()) logger.error(err) + def write_table_tests(self, record: OMetaTableTest) -> None: + """ + Iterate over all table_tests and column_tests + for the given table and add them to the backend. 
+ + :param record: Sample data record + """ + try: + # Fetch the table that we have already ingested + table = self.metadata.get_by_name(entity=Table, fqdn=record.table_name) + + test = None + if record.table_test: + self.metadata.add_table_test(table=table, table_test=record.table_test) + test = record.table_test.testCase.tableTestType.value + + if record.column_test: + self.metadata.add_column_test(table=table, col_test=record.column_test) + test = record.column_test.testCase.columnTestType.value + + logger.info(f"Table Tests: {record.table_name}.{test}") + self.status.records_written(f"Table Tests: {record.table_name}.{test}") + except Exception as err: + logger.debug(traceback.format_exc()) + logger.debug(traceback.print_exc()) + logger.error(err) + def get_status(self): return self.status diff --git a/ingestion/src/metadata/ingestion/source/sample_data.py b/ingestion/src/metadata/ingestion/source/sample_data.py index 66f7fdf5e49..c41acc21e0b 100644 --- a/ingestion/src/metadata/ingestion/source/sample_data.py +++ b/ingestion/src/metadata/ingestion/source/sample_data.py @@ -28,6 +28,8 @@ from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest from metadata.generated.schema.api.teams.createRole import CreateRoleRequest from metadata.generated.schema.api.teams.createTeam import CreateTeamRequest from metadata.generated.schema.api.teams.createUser import CreateUserRequest +from metadata.generated.schema.api.tests.createColumnTest import CreateColumnTestRequest +from metadata.generated.schema.api.tests.createTableTest import CreateTableTestRequest from metadata.generated.schema.entity.data.database import Database from metadata.generated.schema.entity.data.location import Location, LocationType from metadata.generated.schema.entity.data.pipeline import Pipeline @@ -36,12 +38,16 @@ from metadata.generated.schema.entity.services.databaseService import ( DatabaseServiceType, ) from metadata.generated.schema.entity.teams.user import User +from metadata.generated.schema.tests.basic import TestCaseResult +from metadata.generated.schema.tests.columnTest import ColumnTestCase +from metadata.generated.schema.tests.tableTest import TableTestCase from metadata.generated.schema.type.entityLineage import EntitiesEdge from metadata.generated.schema.type.entityReference import EntityReference from metadata.ingestion.api.common import Entity from metadata.ingestion.api.source import Source, SourceStatus from metadata.ingestion.models.ometa_table_db import OMetaDatabaseAndTable from metadata.ingestion.models.table_metadata import Chart, Dashboard +from metadata.ingestion.models.table_tests import OMetaTableTest from metadata.ingestion.models.user import OMetaUserProfile from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig @@ -168,6 +174,11 @@ class SampleTableMetadataGenerator: class SampleDataSource(Source[Entity]): + """ + Loads JSON data and prepares the required + python objects to be sent to the Sink. 
+ """ + def __init__( self, config: SampleDataSourceConfig, metadata_config: MetadataServerConfig, ctx ): @@ -268,6 +279,9 @@ class SampleDataSource(Source[Entity]): open(self.config.sample_data_folder + "/models/models.json", "r") ) self.user_entity = {} + self.table_tests = json.load( + open(self.config.sample_data_folder + "/datasets/tableTests.json", "r") + ) @classmethod def create(cls, config_dict, metadata_config_dict, ctx): @@ -283,6 +297,7 @@ class SampleDataSource(Source[Entity]): yield from self.ingest_locations() yield from self.ingest_glue() yield from self.ingest_tables() + yield from self.ingest_table_tests() yield from self.ingest_topics() yield from self.ingest_charts() yield from self.ingest_dashboards() @@ -484,6 +499,41 @@ class SampleDataSource(Source[Entity]): except Exception as err: logger.error(err) + def ingest_table_tests(self) -> Iterable[OMetaTableTest]: + """ + Iterate over all tables and fetch + the sample test data. For each table test and column + test, prepare all TestCaseResults + """ + try: + for test_def in self.table_tests: + table_name = test_def["table"] + + for table_test in test_def["tableTests"]: + create_table_test = CreateTableTestRequest( + description=table_test.get("description"), + testCase=TableTestCase.parse_obj(table_test["testCase"]), + executionFrequency=table_test["executionFrequency"], + result=TestCaseResult.parse_obj(table_test["result"]), + ) + yield OMetaTableTest( + table_name=table_name, table_test=create_table_test + ) + + for col_test in test_def["columnTests"]: + create_column_test = CreateColumnTestRequest( + description=col_test.get("description"), + columnName=col_test["columnName"], + testCase=ColumnTestCase.parse_obj(col_test["testCase"]), + executionFrequency=col_test["executionFrequency"], + result=TestCaseResult.parse_obj(col_test["result"]), + ) + yield OMetaTableTest( + table_name=table_name, column_test=create_column_test + ) + except Exception as err: # pylint: disable=broad-except + logger.error(err) + def close(self): pass diff --git a/ingestion/src/metadata/orm_profiler/api/models.py b/ingestion/src/metadata/orm_profiler/api/models.py index c790c73503a..045156979fe 100644 --- a/ingestion/src/metadata/orm_profiler/api/models.py +++ b/ingestion/src/metadata/orm_profiler/api/models.py @@ -15,32 +15,25 @@ Return types for Profiler workflow execution. We need to define this class as we end up having multiple profilers per table and columns. """ -from typing import List, Optional - -from pydantic import BaseModel +from typing import Optional +from metadata.config.common import ConfigModel from metadata.generated.schema.entity.data.table import Table, TableProfile -from metadata.orm_profiler.profiles.core import Profiler from metadata.orm_profiler.profiles.models import ProfilerDef -from metadata.orm_profiler.validations.models import TestDef +from metadata.orm_profiler.validations.models import TestDef, TestSuite -class WorkflowResult(BaseModel): - class Config: - arbitrary_types_allowed = True - - -class ProfilerProcessorConfig(BaseModel): +class ProfilerProcessorConfig(ConfigModel): """ Defines how we read the processor information from the workflow JSON definition """ profiler: Optional[ProfilerDef] = None - tests: Optional[TestDef] = None + test_suite: Optional[TestSuite] = None -class ProfileAndTests(BaseModel): +class ProfilerResponse(ConfigModel): """ ORM Profiler processor response. 
@@ -50,4 +43,4 @@ class ProfileAndTests(BaseModel): table: Table profile: TableProfile - tests: Optional[TestDef] = None + record_tests: Optional[TestDef] = None diff --git a/ingestion/src/metadata/orm_profiler/api/workflow.py b/ingestion/src/metadata/orm_profiler/api/workflow.py index 23311b1f9e5..c4c310a0485 100644 --- a/ingestion/src/metadata/orm_profiler/api/workflow.py +++ b/ingestion/src/metadata/orm_profiler/api/workflow.py @@ -42,7 +42,7 @@ from metadata.ingestion.source.sql_source_common import ( SQLConnectionConfig, SQLSourceStatus, ) -from metadata.orm_profiler.api.models import ProfileAndTests, ProfilerProcessorConfig +from metadata.orm_profiler.api.models import ProfilerProcessorConfig, ProfilerResponse from metadata.orm_profiler.engines import create_and_bind_session, get_engine from metadata.orm_profiler.utils import logger @@ -191,7 +191,8 @@ class ProfilerWorkflow: self.metadata.list_entities( entity=Table, fields=[ - "tableProfile" + "tableProfile", + "tests", ], # We will need it for window metrics to check past data params={ "database": f"{self.source_config.service_name}.{database.name.__root__}" @@ -210,7 +211,7 @@ class ProfilerWorkflow: Run the profiling and tests """ for entity in self.list_entities(): - profile_and_tests: ProfileAndTests = self.processor.process(entity) + profile_and_tests: ProfilerResponse = self.processor.process(entity) if hasattr(self, "sink"): self.sink.write_record(profile_and_tests) diff --git a/ingestion/src/metadata/orm_profiler/metrics/composed/duplicate_count.py b/ingestion/src/metadata/orm_profiler/metrics/composed/duplicate_count.py index 35c9cc5296e..e5a65a47e86 100644 --- a/ingestion/src/metadata/orm_profiler/metrics/composed/duplicate_count.py +++ b/ingestion/src/metadata/orm_profiler/metrics/composed/duplicate_count.py @@ -16,7 +16,7 @@ from typing import Any, Dict, Optional, Tuple from metadata.orm_profiler.metrics.core import ComposedMetric from metadata.orm_profiler.metrics.static.count import Count -from metadata.orm_profiler.metrics.static.distinct_count import DistinctCount +from metadata.orm_profiler.metrics.static.unique_count import UniqueCount class DuplicateCount(ComposedMetric): @@ -31,7 +31,7 @@ class DuplicateCount(ComposedMetric): @classmethod def required_metrics(cls) -> Tuple[str, ...]: - return Count.name(), DistinctCount.name() + return Count.name(), UniqueCount.name() @property def metric_type(self): @@ -43,9 +43,9 @@ class DuplicateCount(ComposedMetric): results of other Metrics """ count = res.get(Count.name()) - distinct_count = res.get(DistinctCount.name()) + unique_count = res.get(UniqueCount.name()) - if count is not None and distinct_count is not None: - return count - distinct_count + if count is not None and unique_count is not None: + return count - unique_count return None diff --git a/ingestion/src/metadata/orm_profiler/metrics/registry.py b/ingestion/src/metadata/orm_profiler/metrics/registry.py index 37a8926f7ae..efc6c41df5d 100644 --- a/ingestion/src/metadata/orm_profiler/metrics/registry.py +++ b/ingestion/src/metadata/orm_profiler/metrics/registry.py @@ -24,7 +24,6 @@ from metadata.orm_profiler.metrics.composed.like_ratio import LikeRatio from metadata.orm_profiler.metrics.composed.null_ratio import NullRatio from metadata.orm_profiler.metrics.composed.unique_ratio import UniqueRatio from metadata.orm_profiler.metrics.static.count import Count -from metadata.orm_profiler.metrics.static.distinct_count import DistinctCount from metadata.orm_profiler.metrics.static.histogram import Histogram from 
metadata.orm_profiler.metrics.static.ilike_count import ILikeCount from metadata.orm_profiler.metrics.static.like_count import LikeCount @@ -51,7 +50,6 @@ class Metrics(MetricRegistry): # Static Metrics MEAN = Mean COUNT = Count - DISTINCT_COUNT = DistinctCount HISTOGRAM = Histogram ILIKE_COUNT = ILikeCount LIKE_COUNT = LikeCount diff --git a/ingestion/src/metadata/orm_profiler/metrics/static/max.py b/ingestion/src/metadata/orm_profiler/metrics/static/max.py index 439c310ba04..6bc333a1403 100644 --- a/ingestion/src/metadata/orm_profiler/metrics/static/max.py +++ b/ingestion/src/metadata/orm_profiler/metrics/static/max.py @@ -15,6 +15,7 @@ Max Metric definition from sqlalchemy import func from metadata.orm_profiler.metrics.core import StaticMetric, _label +from metadata.orm_profiler.orm.registry import is_quantifiable class Max(StaticMetric): @@ -30,4 +31,6 @@ class Max(StaticMetric): @_label def fn(self): + if not is_quantifiable(self.col.type): + return None return func.max(self.col) diff --git a/ingestion/src/metadata/orm_profiler/metrics/static/max_length.py b/ingestion/src/metadata/orm_profiler/metrics/static/max_length.py index c945258909e..7d9fe3d7fc6 100644 --- a/ingestion/src/metadata/orm_profiler/metrics/static/max_length.py +++ b/ingestion/src/metadata/orm_profiler/metrics/static/max_length.py @@ -63,7 +63,7 @@ class MaxLength(StaticMetric): if is_concatenable(self.col.type): return MaxLengthFn(self.col) - logger.warning( + logger.debug( f"Don't know how to process type {self.col.type} when computing MAX_LENGTH" ) return None diff --git a/ingestion/src/metadata/orm_profiler/metrics/static/min.py b/ingestion/src/metadata/orm_profiler/metrics/static/min.py index 89df9f847e3..a94b5093966 100644 --- a/ingestion/src/metadata/orm_profiler/metrics/static/min.py +++ b/ingestion/src/metadata/orm_profiler/metrics/static/min.py @@ -15,6 +15,7 @@ Min Metric definition from sqlalchemy import func from metadata.orm_profiler.metrics.core import StaticMetric, _label +from metadata.orm_profiler.orm.registry import is_quantifiable class Min(StaticMetric): @@ -30,4 +31,6 @@ class Min(StaticMetric): @_label def fn(self): + if not is_quantifiable(self.col.type): + return None return func.min(self.col) diff --git a/ingestion/src/metadata/orm_profiler/metrics/static/min_length.py b/ingestion/src/metadata/orm_profiler/metrics/static/min_length.py index 1dfe9256e65..025a26d411d 100644 --- a/ingestion/src/metadata/orm_profiler/metrics/static/min_length.py +++ b/ingestion/src/metadata/orm_profiler/metrics/static/min_length.py @@ -63,7 +63,7 @@ class MinLength(StaticMetric): if is_concatenable(self.col.type): return MinLengthFn(self.col) - logger.warning( + logger.debug( f"Don't know how to process type {self.col.type} when computing MIN_LENGTH" ) return None diff --git a/ingestion/src/metadata/orm_profiler/metrics/static/stddev.py b/ingestion/src/metadata/orm_profiler/metrics/static/stddev.py index 29faac08443..a438fe4fb68 100644 --- a/ingestion/src/metadata/orm_profiler/metrics/static/stddev.py +++ b/ingestion/src/metadata/orm_profiler/metrics/static/stddev.py @@ -70,7 +70,7 @@ class StdDev(StaticMetric): if is_quantifiable(self.col.type): return StdDevFn(self.col) - logger.info( + logger.debug( f"{self.col} has type {self.col.type}, which is not listed as quantifiable." + " We won't compute STDDEV for it." 
) diff --git a/ingestion/src/metadata/orm_profiler/processor/orm_profiler.py b/ingestion/src/metadata/orm_profiler/processor/orm_profiler.py index b042364470d..f3f8cded45e 100644 --- a/ingestion/src/metadata/orm_profiler/processor/orm_profiler.py +++ b/ingestion/src/metadata/orm_profiler/processor/orm_profiler.py @@ -15,27 +15,33 @@ Defines the ORM Profiler processor For each table, we compute its profiler and run the validations. """ +import logging from dataclasses import dataclass, field -from typing import List +from datetime import datetime +from typing import List, Optional, Union -from sqlalchemy.orm import DeclarativeMeta, InstrumentedAttribute, Session +from sqlalchemy.orm import DeclarativeMeta, Session +from metadata.generated.schema.api.tests.createColumnTest import CreateColumnTestRequest +from metadata.generated.schema.api.tests.createTableTest import CreateTableTestRequest from metadata.generated.schema.entity.data.database import Database from metadata.generated.schema.entity.data.table import Table, TableProfile +from metadata.generated.schema.tests.basic import TestCaseResult, TestCaseStatus +from metadata.generated.schema.tests.columnTest import ColumnTest, ColumnTestCase +from metadata.generated.schema.tests.tableTest import TableTest, TableTestCase from metadata.ingestion.api.common import WorkflowContext from metadata.ingestion.api.processor import Processor, ProcessorStatus from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig -from metadata.orm_profiler.api.models import ProfileAndTests, ProfilerProcessorConfig +from metadata.orm_profiler.api.models import ProfilerProcessorConfig, ProfilerResponse from metadata.orm_profiler.metrics.registry import Metrics from metadata.orm_profiler.orm.converter import ometa_to_orm from metadata.orm_profiler.profiles.core import Profiler from metadata.orm_profiler.profiles.default import DefaultProfiler -from metadata.orm_profiler.utils import logger -from metadata.orm_profiler.validations.core import Validation -from metadata.orm_profiler.validations.models import TestDef +from metadata.orm_profiler.validations.core import validate +from metadata.orm_profiler.validations.models import TestDef, TestSuite -logger = logger() +logger = logging.getLogger("ORM Profiler Workflow") @dataclass @@ -77,6 +83,8 @@ class OrmProfilerProcessor(Processor[Table]): self.report = {"tests": {}} + self.execution_date = datetime.now() + # OpenMetadata client to fetch tables self.metadata = OpenMetadata(self.metadata_config) @@ -119,6 +127,7 @@ class OrmProfilerProcessor(Processor[Table]): *metrics, session=self.session, table=orm, + profile_date=self.execution_date, ) def profile_entity(self, orm, table: Table) -> TableProfile: @@ -141,69 +150,259 @@ class OrmProfilerProcessor(Processor[Table]): self.status.processed(table.fullyQualifiedName) return profiler.get_profile() - def log_validation(self, name: str, validation: Validation) -> None: + def log_test_result(self, name: str, result: TestCaseResult) -> None: """ - Log validation results + Log test case results """ self.status.tested(name) - if not validation.valid: - self.status.failure( - f"{name}: Expected value {validation.value} vs. 
Real value {validation.computed_metric}" + if not result.testCaseStatus == TestCaseStatus.Success: + self.status.failure(f"{name}: {result.result}") + + @staticmethod + def get_test_name(table: Table, test_type: str, column_name: str = None): + """ + Build a unique identifier to log the test + in the shape of FQDN.[column].test_type + + :param table: Table Entity + :param test_type: We expected one test type per table & column + :param column_name: Column name, if logging a column test + :return: Unique name for this execution + """ + col = f".{column_name}." if column_name else "." + return table.fullyQualifiedName + col + test_type + + def run_table_test( + self, table: Table, test_case: TableTestCase, profiler_results: TableProfile + ) -> Optional[TestCaseResult]: + """ + Run & log the table test against the TableProfile. + + :param table: Table Entity being processed + :param test_case: Table Test Case to run + :param profiler_results: Table profiler with informed metrics + :return: TestCaseResult + """ + test_name = self.get_test_name( + table=table, test_type=test_case.tableTestType.value + ) + if test_name in self.status.tests: + logger.info( + f"Test {test_name} has already been computed in this execution." + ) + return None + + test_case_result: TestCaseResult = validate( + test_case.config, + table_profile=profiler_results, + execution_date=self.execution_date, + ) + self.log_test_result(name=test_name, result=test_case_result) + return test_case_result + + def run_column_test( + self, + table: Table, + column: str, + test_case: ColumnTestCase, + profiler_results: TableProfile, + ) -> Optional[TestCaseResult]: + """ + Run & log the column test against the ColumnProfile + + :param table: Table Entity being processed + :param column: Column being tested + :param test_case: Column Test Case to run + :param profiler_results: Table profiler with informed metrics + :return: TestCaseResult + """ + test_name = self.get_test_name( + table=table, test_type=test_case.columnTestType.value, column_name=column + ) + if test_name in self.status.tests: + logger.info( + f"Test {test_name} has already been computed in this execution." + ) + return None + + # Check if we computed a profile for the required column + col_profiler_res = next( + iter( + col_profiler + for col_profiler in profiler_results.columnProfile + if col_profiler.name == column + ), + None, + ) + if col_profiler_res is None: + msg = ( + f"Cannot find a profiler that computed the column {column}" + + f" Skipping validation {test_name}" + ) + self.status.failure(msg) + return TestCaseResult( + executionTime=self.execution_date.timestamp(), + status=TestCaseStatus.Aborted, + result=msg, ) - def validate_entity(self, orm, profiler_results: TableProfile) -> TestDef: + test_case_result: TestCaseResult = validate( + test_case.config, + col_profiler_res, + execution_date=self.execution_date, + ) + self.log_test_result(name=test_name, result=test_case_result) + return test_case_result + + def validate_config_tests( + self, table: Table, profiler_results: TableProfile + ) -> Optional[TestDef]: """ - Given a table, check if it has any tests pending. - - If so, run the Validations against the profiler_results - and return the computed Validations. - - The result will have the shape {test_name: Validation} - - Type of entity is DeclarativeMeta + Here we take care of new incoming tests in the workflow + definition. Run them and prepare the new TestDef + of the record, that will be sent to the sink to + update the Table Entity. 
""" - if not isinstance(orm, DeclarativeMeta): - raise ValueError(f"Entity {orm} should be a DeclarativeMeta.") - logger.info(f"Checking validations for {orm}...") + logger.info(f"Checking validations for {table.fullyQualifiedName}...") - # We have all validations parsed at read-time - test_def: TestDef = self.config.tests + test_suite: TestSuite = self.config.test_suite + + # Check if I have tests for the table I am processing + my_record_tests = next( + iter( + test + for test in test_suite.tests + if test.table == table.fullyQualifiedName + ), + None, + ) + + if not my_record_tests: + return None # Compute all validations against the profiler results - for test in test_def.table_tests: - for validation in test.expression: - # Pass the whole dict, although we just want columnCount & rowCount - validation.validate(profiler_results.dict()) - self.log_validation(name=test.name, validation=validation) + for table_test in my_record_tests.table_tests: + test_case_result = self.run_table_test( + table=table, + test_case=table_test.testCase, + profiler_results=profiler_results, + ) + if test_case_result: + table_test.result = test_case_result - for column_res in test_def.column_tests: - for test in column_res.columns: - col_profiler_res = next( - iter( - col_profiler - for col_profiler in profiler_results.columnProfile - if col_profiler.name == test.column - ), - None, + for column_test in my_record_tests.column_tests: + test_case_result = self.run_column_test( + table=table, + column=column_test.columnName, + test_case=column_test.testCase, + profiler_results=profiler_results, + ) + if test_case_result: + column_test.result = test_case_result + + return my_record_tests + + def validate_entity_tests( + self, + table: Table, + profiler_results: TableProfile, + config_tests: Optional[TestDef], + ) -> Optional[TestDef]: + """ + This method checks the tests that are + configured at entity level, i.e., have been + stored via the API at some other point in time. + + If we find a test that has already been run + from the workflow config, we will skip it + and trust the workflow input. + + :param table: OpenMetadata Table Entity being processed + :param profiler_results: TableProfile with computed metrics + :param config_tests: Results of running the configuration tests + """ + + # We need to keep track of all ran tests, so let's initialize + # a TestDef class with either what we have from the incoming + # config, or leaving it empty. + # During the Entity processing, we will add here + # any tests we discover from the Entity side. + record_tests = ( + TestDef( + table=config_tests.table, + table_tests=config_tests.table_tests + if config_tests.table_tests + else [], + column_tests=config_tests.column_tests + if config_tests.column_tests + else [], + ) + if config_tests + else TestDef( + table=table.fullyQualifiedName, table_tests=[], column_tests=[] + ) + ) + + # Note that the tests configured in the Entity as `TableTest` and + # `ColumnTest`. However, to PUT the results we need the API form: + # `CreateTableTestRequest` and `CreateColumnTestRequest`. + # We will convert the found tests before running them. 
+ + # Fetch all table tests, if any + table_tests = ( + table_test for table_test in (table.tableTests or []) + ) # tableTests are optional, so it might be a list or None + for table_test in table_tests: + test_case_result = self.run_table_test( + table=table, + test_case=table_test.testCase, + profiler_results=profiler_results, + ) + if test_case_result: + create_table_test = CreateTableTestRequest( + description=table_test.description, + testCase=table_test.testCase, + executionFrequency=table_test.executionFrequency, + owner=table_test.owner, + result=test_case_result, ) - if col_profiler_res is None: - self.status.warning( - f"Cannot find a profiler that computed the column {test.column} for {column_res.table}." - + f" Skipping validation {test}" + record_tests.table_tests.append(create_table_test) + + # For all columns, check if any of them has tests and fetch them + col_tests = ( + col_test + for col in table.columns + for col_test in ( + col.columnTests or [] + ) # columnTests is optional, so it might be a list or None + if col.columnTests + ) + for column_test in col_tests: + if column_test: + test_case_result = self.run_column_test( + table=table, + column=column_test.columnName, + test_case=column_test.testCase, + profiler_results=profiler_results, + ) + if test_case_result: + create_column_test = CreateColumnTestRequest( + columnName=column_test.columnName, + description=column_test.description, + testCase=column_test.testCase, + executionFrequency=column_test.executionFrequency, + owner=column_test.owner, + result=test_case_result, ) + record_tests.column_tests.append(create_column_test) - for validation in test.expression: - validation.validate(col_profiler_res.dict()) - self.log_validation(name=test.name, validation=validation) + return record_tests - return test_def - - def process(self, record: Table) -> ProfileAndTests: + def process(self, record: Table) -> ProfilerResponse: """ Run the profiling and tests """ - # Convert entity to ORM. Fetch the db by ID to make sure we use the proper db name database = self.metadata.get_by_id( entity=Database, entity_id=record.database.id @@ -212,14 +411,18 @@ class OrmProfilerProcessor(Processor[Table]): entity_profile = self.profile_entity(orm_table, record) - entity_validations = None - if self.config.tests: - entity_validations = self.validate_entity(orm_table, entity_profile) + # First, check if we have any tests directly configured in the workflow + config_tests = None + if self.config.test_suite: + config_tests = self.validate_config_tests(record, entity_profile) - res = ProfileAndTests( + # Then, Check if the entity has any tests + record_tests = self.validate_entity_tests(record, entity_profile, config_tests) + + res = ProfilerResponse( table=record, profile=entity_profile, - tests=entity_validations, + record_tests=record_tests, ) return res diff --git a/ingestion/src/metadata/orm_profiler/profiles/core.py b/ingestion/src/metadata/orm_profiler/profiles/core.py index 78f06fd7adf..f892c987be7 100644 --- a/ingestion/src/metadata/orm_profiler/profiles/core.py +++ b/ingestion/src/metadata/orm_profiler/profiles/core.py @@ -187,7 +187,7 @@ class Profiler(Generic[MetricType]): This should only execute column metrics. 
""" - logger.info("Running SQL Profiler...") + logger.debug("Running SQL Profiler...") col_metrics = self.get_col_metrics(self.static_metrics) @@ -242,7 +242,7 @@ class Profiler(Generic[MetricType]): Data should be saved under self.results """ - logger.info("Running post Profiler...") + logger.debug("Running post Profiler...") current_col_results: Dict[str, Any] = self._column_results.get(col.name) if not current_col_results: diff --git a/ingestion/src/metadata/orm_profiler/profiles/default.py b/ingestion/src/metadata/orm_profiler/profiles/default.py index 16cfc6d586d..8a51003ab6d 100644 --- a/ingestion/src/metadata/orm_profiler/profiles/default.py +++ b/ingestion/src/metadata/orm_profiler/profiles/default.py @@ -39,7 +39,6 @@ class DefaultProfiler(Profiler): # Column Metrics Metrics.MEAN.value, Metrics.COUNT.value, - Metrics.DISTINCT_COUNT.value, Metrics.MIN.value, Metrics.MIN_LENGTH.value, Metrics.MAX.value, diff --git a/ingestion/src/metadata/orm_profiler/profiles/models.py b/ingestion/src/metadata/orm_profiler/profiles/models.py index 730ef834901..f48307a4e5d 100644 --- a/ingestion/src/metadata/orm_profiler/profiles/models.py +++ b/ingestion/src/metadata/orm_profiler/profiles/models.py @@ -20,24 +20,6 @@ from pydantic import BaseModel, validator from metadata.orm_profiler.metrics.registry import Metrics -class TimeMetricDef(BaseModel): - """ - Model representing the input of a time metric - """ - - name: str # metric name - window: int # time delta in days to apply - - -class CustomMetricDef(BaseModel): - """ - Model representing the input of a time metric - """ - - name: str # metric name - sql: str # custom SQL query to run - - class ProfilerDef(BaseModel): """ Incoming profiler definition from the @@ -46,8 +28,9 @@ class ProfilerDef(BaseModel): name: str # Profiler name metrics: List[str] # names of currently supported Static and Composed metrics - time_metrics: List[TimeMetricDef] = None - custom_metrics: List[CustomMetricDef] = None + # TBD: + # time_metrics: List[TimeMetricDef] = None + # custom_metrics: List[CustomMetricDef] = None # rule_metrics: ... 
@validator("metrics", each_item=True) diff --git a/ingestion/src/metadata/orm_profiler/sink/file.py b/ingestion/src/metadata/orm_profiler/sink/file.py index 4346d96a98f..34c89ffaad0 100644 --- a/ingestion/src/metadata/orm_profiler/sink/file.py +++ b/ingestion/src/metadata/orm_profiler/sink/file.py @@ -18,7 +18,7 @@ from pathlib import Path from metadata.config.common import ConfigModel from metadata.ingestion.api.common import Entity, WorkflowContext from metadata.ingestion.api.sink import Sink, SinkStatus -from metadata.orm_profiler.api.models import ProfileAndTests +from metadata.orm_profiler.api.models import ProfilerResponse from metadata.orm_profiler.utils import logger logger = logger() @@ -54,7 +54,7 @@ class FileSink(Sink[Entity]): config = FileSinkConfig.parse_obj(config_dict) return cls(ctx, config) - def write_record(self, record: ProfileAndTests) -> None: + def write_record(self, record: ProfilerResponse) -> None: if self.wrote_something: self.file.write("\n") @@ -62,25 +62,9 @@ class FileSink(Sink[Entity]): self.file.write(f"Profile for: {record.table.fullyQualifiedName}\n") self.file.write(f"{record.profile.json()}\n") - if record.tests: + if record.record_tests: self.file.write(f"\nTest results:\n") - - for test in record.tests.table_tests: - self.file.write(f"\tTable Tests results:\n") - for validation in test.expression: - self.file.write( - f"\t\t{test.name}: {validation.valid}, (Real) {validation.computed_metric}" - + f" <{validation.operator.__name__}> {validation.value} (expected)\n" - ) - - for col_test in record.tests.column_tests: - self.file.write(f"\tColumn Tests results:\n") - for column in col_test.columns: - for validation in column.expression: - self.file.write( - f"\t\t[{column.column}] - {column.name}: {validation.valid}, (Real) {validation.computed_metric}" - + f" <{validation.operator.__name__}> {validation.value} (expected)\n" - ) + self.file.write(f"{record.record_tests.json()}\n") self.wrote_something = True self.report.records_written(record.table.fullyQualifiedName) diff --git a/ingestion/src/metadata/orm_profiler/sink/metadata_rest.py b/ingestion/src/metadata/orm_profiler/sink/metadata_rest.py index 9e3d422210d..69e050cb76f 100644 --- a/ingestion/src/metadata/orm_profiler/sink/metadata_rest.py +++ b/ingestion/src/metadata/orm_profiler/sink/metadata_rest.py @@ -12,11 +12,15 @@ """ OpenMetadata REST Sink implementation for the ORM Profiler results """ +import traceback + from metadata.config.common import ConfigModel from metadata.ingestion.api.common import Entity, WorkflowContext from metadata.ingestion.api.sink import Sink, SinkStatus +from metadata.ingestion.ometa.client import APIError from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig +from metadata.orm_profiler.api.models import ProfilerResponse from metadata.orm_profiler.utils import logger logger = logger() @@ -56,7 +60,31 @@ class MetadataRestSink(Sink[Entity]): return self.status def close(self) -> None: - pass + self.metadata.close() - def write_record(self, record: Entity) -> None: - pass + def write_record(self, record: ProfilerResponse) -> None: + try: + self.metadata.ingest_table_profile_data( + table=record.table, table_profile=[record.profile] + ) + + if record.record_tests: + for table_test in record.record_tests.table_tests: + self.metadata.add_table_test( + table=record.table, table_test=table_test + ) + + for col_test in record.record_tests.column_tests: + 
self.metadata.add_column_test(table=record.table, col_test=col_test) + + logger.info( + f"Successfully ingested profiler & test data for {record.table.fullyQualifiedName}" + ) + self.status.records_written(f"Table: {record.table.fullyQualifiedName}") + + except APIError as err: + logger.error( + f"Failed to sink profiler & test data for {record.table.fullyQualifiedName} - {err}" + ) + logger.debug(traceback.print_exc()) + self.status.failure(f"Table: {record.table.fullyQualifiedName}") diff --git a/ingestion/src/metadata/orm_profiler/validations/column/__init__.py b/ingestion/src/metadata/orm_profiler/validations/column/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/ingestion/src/metadata/orm_profiler/validations/column/column_values_to_be_between.py b/ingestion/src/metadata/orm_profiler/validations/column/column_values_to_be_between.py new file mode 100644 index 00000000000..30a0bd9b487 --- /dev/null +++ b/ingestion/src/metadata/orm_profiler/validations/column/column_values_to_be_between.py @@ -0,0 +1,66 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +ColumnValuesToBeBetween validation implementation +""" + +from datetime import datetime + +from metadata.generated.schema.entity.data.table import ColumnProfile +from metadata.generated.schema.tests.basic import TestCaseResult, TestCaseStatus +from metadata.generated.schema.tests.column.columnValuesToBeBetween import ( + ColumnValuesToBeBetween, +) +from metadata.orm_profiler.utils import logger + +logger = logger() + + +def column_values_to_be_between( + test_case: ColumnValuesToBeBetween, + col_profile: ColumnProfile, + execution_date: datetime, +) -> TestCaseResult: + """ + Validate Column Values metric + :param test_case: ColumnValuesToBeBetween + :param col_profile: should contain MIN & MAX metrics + :param execution_date: Datetime when the tests ran + :return: TestCaseResult with status and results + """ + + if col_profile.min is None or col_profile.max is None: + msg = ( + "We expect `min` & `max` to be informed on the profiler for ColumnValuesToBeBetween" + + f" but got min={col_profile.min}, max={col_profile.max}." + ) + logger.error(msg) + return TestCaseResult( + executionTime=execution_date.timestamp(), + testCaseStatus=TestCaseStatus.Aborted, + result=msg, + ) + + status = ( + TestCaseStatus.Success + if col_profile.min >= test_case.minValue + and col_profile.max <= test_case.maxValue + else TestCaseStatus.Failed + ) + result = ( + f"Found min={col_profile.min}, max={col_profile.max} vs." + + f" the expected min={test_case.minValue}, max={test_case.maxValue}." 
+ ) + + return TestCaseResult( + executionTime=execution_date.timestamp(), testCaseStatus=status, result=result + ) diff --git a/ingestion/src/metadata/orm_profiler/validations/column/column_values_to_be_unique.py b/ingestion/src/metadata/orm_profiler/validations/column/column_values_to_be_unique.py new file mode 100644 index 00000000000..7bef060382f --- /dev/null +++ b/ingestion/src/metadata/orm_profiler/validations/column/column_values_to_be_unique.py @@ -0,0 +1,65 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +ColumnValuesToBeUnique validation implementation +""" + +from datetime import datetime + +from metadata.generated.schema.entity.data.table import ColumnProfile +from metadata.generated.schema.tests.basic import TestCaseResult, TestCaseStatus +from metadata.generated.schema.tests.column.columnValuesToBeUnique import ( + ColumnValuesToBeUnique, +) +from metadata.orm_profiler.utils import logger + +logger = logger() + + +def column_values_to_be_unique( + _: ColumnValuesToBeUnique, + col_profile: ColumnProfile, + execution_date: datetime, +) -> TestCaseResult: + """ + Validate Column Values metric + :param _: ColumnValuesToBeUnique. Just used to trigger singledispatch + :param col_profile: should contain count and distinct count metrics + :param execution_date: Datetime when the tests ran + :return: TestCaseResult with status and results + """ + + if col_profile.valuesCount is None or col_profile.uniqueCount is None: + msg = ( + "We expect `valuesCount` & `uniqueCount` to be informed on the profiler for ColumnValuesToBeUnique" + + f" but got valuesCount={col_profile.valuesCount}, uniqueCount={col_profile.uniqueCount}." + ) + logger.error(msg) + return TestCaseResult( + executionTime=execution_date.timestamp(), + testCaseStatus=TestCaseStatus.Aborted, + result=msg, + ) + + status = ( + TestCaseStatus.Success + if col_profile.valuesCount == col_profile.uniqueCount + else TestCaseStatus.Failed + ) + result = ( + f"Found valuesCount={col_profile.valuesCount} vs. uniqueCount={col_profile.uniqueCount}." + + f" Both counts should be equal for column values to be unique." + ) + + return TestCaseResult( + executionTime=execution_date.timestamp(), testCaseStatus=status, result=result + ) diff --git a/ingestion/src/metadata/orm_profiler/validations/core.py b/ingestion/src/metadata/orm_profiler/validations/core.py index badc87104a5..75285b38436 100644 --- a/ingestion/src/metadata/orm_profiler/validations/core.py +++ b/ingestion/src/metadata/orm_profiler/validations/core.py @@ -10,137 +10,48 @@ # limitations under the License. """ -Core Validation definitions +Core Validation definitions. + +In this module we define how to check specific test case +behavior based on the computed metrics of the profiler. + +These functions should not raise an error, but rather +mark the test as Failure/Aborted and pass a proper +result string. The ORM Processor will be the one in charge +of logging these issues. 
""" -import operator as op -from typing import Any, Callable, Dict, Union +from functools import singledispatch -from pydantic import ValidationError - -from metadata.orm_profiler.metrics.registry import Metrics +from metadata.generated.schema.tests.basic import TestCaseResult from metadata.orm_profiler.utils import logger +from metadata.orm_profiler.validations.column.column_values_to_be_between import ( + column_values_to_be_between, +) +from metadata.orm_profiler.validations.column.column_values_to_be_unique import ( + column_values_to_be_unique, +) +from metadata.orm_profiler.validations.table.table_row_count_to_equal import ( + table_row_count_to_equal, +) logger = logger() -class ValidationConversionException(Exception): +@singledispatch +def validate(test_case, *args, **kwargs) -> TestCaseResult: """ - Issue converting parser results to our Metrics and Validations + Default function to validate test cases. + + Note that the first argument should be a positional argument. """ + raise NotImplementedError( + f"Missing test case validation implementation for {type(test_case)}." + ) -class MissingMetricException(Exception): - """ - The required Metric is not available in the profiler results - """ +# Table Tests +validate.register(table_row_count_to_equal) - -_OPERATOR_MAP = { - "<": op.lt, - ">": op.gt, - "==": op.eq, - "!=": op.ne, - "<=": op.le, - ">=": op.ge, -} - - -class Validation: - """ - Base class for Validation definition. - - This data will be extracted from the TestDef expression. - - We will map here the results from parsing with - the grammar. - """ - - def __init__(self, metric: str, operator: Callable, value: Union[float, int, str]): - self.metric = metric - self.operator = operator - self.value = value - self.valid = None - self.computed_metric = None - - @classmethod - def create(cls, raw_validation: Dict[str, str]) -> "Validation": - """ - Given the results of the grammar parser, convert - them to a Validation class with the assigned Metric, - the right operator and the casted value. - """ - - raw_metric = raw_validation.get("metric") - if not raw_metric: - raise ValidationConversionException( - f"Missing metric information in {raw_validation}." - ) - - metric = Metrics.get(raw_metric.upper()) - if not metric: - logger.error("Error trying to get Metric from Registry.") - raise ValidationConversionException( - f"Cannot find metric from {raw_validation} in the Registry." - ) - - metric_name = metric.name() - - operator = _OPERATOR_MAP.get(raw_validation.get("operation")) - if not operator: - logger.error("Invalid Operator when converting to validation.") - raise ValidationConversionException( - f"Cannot find operator from {raw_validation}." - ) - - raw_value = raw_validation.get("value") - if not raw_value: - logger.error("Missing or Empty value") - raise ValidationConversionException( - f"Missing or empty value in {raw_validation}." - ) - - if raw_value.isdigit(): - value = int(raw_value) # Check if int - else: - try: - value = float(raw_value) # Otherwise, might be float - except ValueError: - value = raw_value # If not, leave as string - - try: - validation = cls(metric=metric_name, operator=operator, value=value) - except ValidationError as err: - logger.error( - "Error trying to convert a RAW validation to a Validation model" - ) - raise err - - return validation - - def validate(self, results: Dict[str, Any]) -> "Validation": - """ - Given a Validation and the profiler results, - compare their data. 
- - We will call this function for each validation - that we received. - - Each time we will return a Validation object - with a properly informed `valid` field, - containing the result of the validation. - """ - self.computed_metric = results.get(self.metric) - - if not self.computed_metric: - raise MissingMetricException( - f"The required metric {self.metric} is not available" - + f" in the profiler results: {results}." - ) - - is_valid = self.operator( - self.computed_metric, self.value # Order matters. Real value vs. expected - ) - - self.valid = is_valid # Either True / False - - return self # return self for convenience +# Column Tests +validate.register(column_values_to_be_between) +validate.register(column_values_to_be_unique) diff --git a/ingestion/src/metadata/orm_profiler/validations/grammar.py b/ingestion/src/metadata/orm_profiler/validations/grammar.py deleted file mode 100644 index d445ef0be3d..00000000000 --- a/ingestion/src/metadata/orm_profiler/validations/grammar.py +++ /dev/null @@ -1,112 +0,0 @@ -# Copyright 2021 Collate -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# http://www.apache.org/licenses/LICENSE-2.0 -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -""" -Validation grammar definition. - -We will use this to convert the test cases specified -in the JSON workflows to our internal metrics, profilers -and validations. - -We need to parse expressions such as: - - "row_count > 1000" - - "qty_sold_per_day > 100" - -Test definitions are going to be pointing to tables or columns. -This means that we will have that information beforehand. - -The job of the grammar is to make it as easy as possible -for users to define their tests and link that logic -to the profiler objects. -""" -from typing import Dict, List - -from parsimonious import NodeVisitor -from parsimonious.grammar import Grammar - -# To understand the grammar definition better, read it from bottom to top. -# Line by line, we are doing: -# 1. ws: define the whitespace regex -# 2. operand: set of supported operands. Only one can match -# 3. operation: [space]operand[space] or just operand. Spaces are optional -# 4. value: what do we match in the expression. It can be any word, e.g., 100, Food, Seat500, 0.3 -# Note that we need to start checking the number. Otherwise, `would` match but leave -# unparsed string in decimals. -# 5. metric: letters and underscores. We will match them to Metrics afterwards -grammar = Grammar( - """ - test_def = (rule / sep)* - - rule = metric operation value - sep = ws? "&" ws? - metric = ~r"[a-zA-Z_]+" - value = number / word - - word = ~r"[-\w]+" - number = ~r"\d+.?(\d+)?" - operation = ws? operand ws? 
- operand = "==" / "!=" / "<=" / ">=" / "<" / ">" - - ws = ~"\s*" - """ -) - - -class ExpVisitor(NodeVisitor): - """ - Visit the extracted nodes from our - test expression definition - """ - - @staticmethod - def visit_test_def(_, visited_children): - """Returns the overall output.""" - - validations = [child[0] for child in visited_children if child[0]] - - return validations - - @staticmethod - def visit_rule(_, visited_children): - rule = {key: value for key, value in visited_children} - return rule - - @staticmethod - def visit_sep(*args, **kwargs): - pass - - @staticmethod - def visit_metric(node, _): - return node.expr_name, node.text - - @staticmethod - def visit_operation(node, _): - return node.expr_name, node.text.strip() - - @staticmethod - def visit_value(node, _): - return node.expr_name, node.text - - def generic_visit(self, node, visited_children): - """ - Generic visit method. Return the children - or the node if there are no children - """ - return visited_children or node - - -def parse(expression: str, visitor: ExpVisitor) -> List[Dict[str, str]]: - """ - Given an expression, parse it with - our grammar and return the visiting - result - """ - tree = grammar.parse(expression) - return visitor.visit(tree) diff --git a/ingestion/src/metadata/orm_profiler/validations/models.py b/ingestion/src/metadata/orm_profiler/validations/models.py index 0f36597e5c5..2bb9aab51df 100644 --- a/ingestion/src/metadata/orm_profiler/validations/models.py +++ b/ingestion/src/metadata/orm_profiler/validations/models.py @@ -13,82 +13,32 @@ Models to map tests and validations from JSON workflows to the profiler """ -from typing import List +from typing import List, Optional -from parsimonious import ParseError -from pydantic import BaseModel, validator - -from metadata.orm_profiler.utils import logger -from metadata.orm_profiler.validations.core import Validation -from metadata.orm_profiler.validations.grammar import ExpVisitor, parse - -logger = logger() -visitor = ExpVisitor() +from metadata.config.common import ConfigModel +from metadata.generated.schema.api.tests.createColumnTest import CreateColumnTestRequest +from metadata.generated.schema.api.tests.createTableTest import CreateTableTestRequest -class ExpressionModel(BaseModel): +class TestDef(ConfigModel): """ - We use this model to convert expression to Validations - on the fly while we are parsing the JSON configuration. + Table test definition - It is a comfortable and fast way to know if we'll be - able to parse the tests with our grammar. 
+ We expect: + - table name + - List of table tests + - List of column tests """ - expression: List[Validation] - enabled: bool = True - - @validator("expression", pre=True) - def convert_expression(cls, value): # cls as per pydantic docs - """ - Make sure that we can parse the expression to a - validation - """ - try: - raw_validation = parse(value, visitor) - except ParseError as err: - logger.error(f"Cannot parse expression properly: {value}") - raise err - - return [Validation.create(val) for val in raw_validation] - - class Config: - arbitrary_types_allowed = True + table: str # Table FQDN + table_tests: Optional[List[CreateTableTestRequest]] = None + column_tests: Optional[List[CreateColumnTestRequest]] = None -class TableTest(ExpressionModel): +class TestSuite(ConfigModel): """ - Incoming table test definition from the workflow + Config test definition """ - name: str - table: str # fullyQualifiedName - - -class ColumnTestExpression(ExpressionModel): - """ - Column test expression definition - """ - - name: str - column: str - - -class ColumnTest(BaseModel): - """ - Incoming column test definition from the workflow - """ - - table: str # fullyQualifiedName - name: str # Set of col tests - columns: List[ColumnTestExpression] - - -class TestDef(BaseModel): - """ - Base Test definition - """ - - name: str - table_tests: List[TableTest] = None - column_tests: List[ColumnTest] = None + name: str # Test Suite name + tests: List[TestDef] diff --git a/ingestion/src/metadata/orm_profiler/validations/table/__init__.py b/ingestion/src/metadata/orm_profiler/validations/table/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/ingestion/src/metadata/orm_profiler/validations/table/table_row_count_to_equal.py b/ingestion/src/metadata/orm_profiler/validations/table/table_row_count_to_equal.py new file mode 100644 index 00000000000..45b4effe5cb --- /dev/null +++ b/ingestion/src/metadata/orm_profiler/validations/table/table_row_count_to_equal.py @@ -0,0 +1,58 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
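TestSuite is a named list of TestDef entries, and each TestDef carries the table FQDN plus optional lists of table-level and column-level test requests, so a definition may declare only table tests, only column tests, or both. A small sketch of that nesting with hypothetical pydantic stand-ins, where plain dicts replace the generated CreateTableTestRequest / CreateColumnTestRequest models:

from typing import Dict, List, Optional

from pydantic import BaseModel


class TestDefSketch(BaseModel):
    """Hypothetical stand-in mirroring TestDef: one table plus its tests."""

    table: str  # Table FQDN
    table_tests: Optional[List[Dict]] = None
    column_tests: Optional[List[Dict]] = None


class TestSuiteSketch(BaseModel):
    """Hypothetical stand-in mirroring TestSuite: a named list of test definitions."""

    name: str
    tests: List[TestDefSketch]


suite = TestSuiteSketch.parse_obj(
    {
        "name": "My Test Suite",
        "tests": [
            # Only table tests for the first table, only column tests for the second
            {
                "table": "service.db.users",
                "table_tests": [{"testCase": {"config": {"value": 100}}}],
            },
            {
                "table": "service.db.orders",
                "column_tests": [{"columnName": "id", "testCase": {"config": {}}}],
            },
        ],
    }
)
assert suite.tests[0].column_tests is None
assert suite.tests[1].table_tests is None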
+ +""" +TableRowCountToEqual validation implementation +""" +from datetime import datetime + +from metadata.generated.schema.entity.data.table import TableProfile +from metadata.generated.schema.tests.basic import TestCaseResult, TestCaseStatus +from metadata.generated.schema.tests.table.tableRowCountToEqual import ( + TableRowCountToEqual, +) +from metadata.orm_profiler.utils import logger + +logger = logger() + + +def table_row_count_to_equal( + test_case: TableRowCountToEqual, + table_profile: TableProfile, + execution_date: datetime, +) -> TestCaseResult: + """ + Validate row count metric + :param test_case: TableRowCountToEqual + :param table_profile: should contain row count metric + :param execution_date: Datetime when the tests ran + :return: TestCaseResult with status and results + """ + + if table_profile.rowCount is None: + msg = "rowCount should not be None for TableRowCountToEqual" + logger.error(msg) + return TestCaseResult( + executionTime=execution_date.timestamp(), + testCaseStatus=TestCaseStatus.Aborted, + result=msg, + ) + + status = ( + TestCaseStatus.Success + if table_profile.rowCount == test_case.value + else TestCaseStatus.Failed + ) + result = f"Found {table_profile.rowCount} rows vs. the expected {test_case.value}" + + return TestCaseResult( + executionTime=execution_date.timestamp(), testCaseStatus=status, result=result + ) diff --git a/ingestion/tests/integration/ometa/test_ometa_table_api.py b/ingestion/tests/integration/ometa/test_ometa_table_api.py index bd671fc20c1..0b12d071171 100644 --- a/ingestion/tests/integration/ometa/test_ometa_table_api.py +++ b/ingestion/tests/integration/ometa/test_ometa_table_api.py @@ -16,12 +16,16 @@ import uuid from datetime import datetime from unittest import TestCase +import pytest + from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequest from metadata.generated.schema.api.data.createTable import CreateTableRequest from metadata.generated.schema.api.services.createDatabaseService import ( CreateDatabaseServiceRequest, ) from metadata.generated.schema.api.teams.createUser import CreateUserRequest +from metadata.generated.schema.api.tests.createColumnTest import CreateColumnTestRequest +from metadata.generated.schema.api.tests.createTableTest import CreateTableTestRequest from metadata.generated.schema.entity.data.database import Database from metadata.generated.schema.entity.data.table import ( Column, @@ -39,8 +43,26 @@ from metadata.generated.schema.entity.services.databaseService import ( DatabaseService, DatabaseServiceType, ) +from metadata.generated.schema.tests.basic import TestCaseResult, TestCaseStatus +from metadata.generated.schema.tests.column.columnValuesToBeBetween import ( + ColumnValuesToBeBetween, +) +from metadata.generated.schema.tests.columnTest import ( + ColumnTest, + ColumnTestCase, + ColumnTestType, +) +from metadata.generated.schema.tests.table.tableRowCountToEqual import ( + TableRowCountToEqual, +) +from metadata.generated.schema.tests.tableTest import ( + TableTest, + TableTestCase, + TableTestType, +) from metadata.generated.schema.type.entityReference import EntityReference from metadata.ingestion.models.table_queries import TableUsageRequest +from metadata.ingestion.ometa.client import APIError from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig @@ -382,3 +404,123 @@ class OMetaTableTest(TestCase): ) assert res.id == entity_ref.id + + def test_add_table_tests(self): + """ + Add 
tableTests to table instance + """ + + table = self.metadata.create_or_update(data=self.create) + + table_test = CreateTableTestRequest( + description="Testing something", + testCase=TableTestCase( + config=TableRowCountToEqual(value=100), + tableTestType=TableTestType.tableRowCountToEqual, + ), + ) + + table_with_test = self.metadata.add_table_test( + table=table, table_test=table_test + ) + + assert len(table_with_test.tableTests) == 1 + assert table_with_test.tableTests[0].testCase == table_test.testCase + + test_case_result = TestCaseResult( + result="some result", + executionTime=datetime.now().timestamp(), + testCaseStatus=TestCaseStatus.Success, + ) + + table_test_with_res = CreateTableTestRequest( + description="Testing something", + testCase=TableTestCase( + config=TableRowCountToEqual(value=100), + tableTestType=TableTestType.tableRowCountToEqual, + ), + result=test_case_result, + ) + + table_with_test_and_res = self.metadata.add_table_test( + table=table, table_test=table_test_with_res + ) + + assert len(table_with_test_and_res.tableTests[0].results) == 1 + assert ( + table_with_test_and_res.tableTests[0].results[0].testCaseStatus + == TestCaseStatus.Success + ) + + def test_add_column_tests(self): + """ + Add columnTests to table instance + """ + + table = self.metadata.create_or_update(data=self.create) + + col_test = CreateColumnTestRequest( + columnName="id", + testCase=ColumnTestCase( + config=ColumnValuesToBeBetween(minValue=1, maxValue=3), + columnTestType=ColumnTestType.columnValuesToBeBetween, + ), + ) + + updated_table = self.metadata.add_column_test(table=table, col_test=col_test) + + id_test = next( + iter([col for col in updated_table.columns if col.name.__root__ == "id"]), + None, + ) + + assert len(id_test.columnTests) == 1 + assert id_test.columnTests[0].testCase == col_test.testCase + + # Column needs to exist in the table! 
+ with pytest.raises(APIError): + ko_test = CreateColumnTestRequest( + columnName="random_column", + testCase=ColumnTestCase( + config=ColumnValuesToBeBetween(minValue=1, maxValue=3), + columnTestType=ColumnTestType.columnValuesToBeBetween, + ), + ) + + self.metadata.add_column_test(table=table, col_test=ko_test) + + col_test_res = TestCaseResult( + result="some result", + executionTime=datetime.now().timestamp(), + testCaseStatus=TestCaseStatus.Success, + ) + + col_test_with_res = CreateColumnTestRequest( + columnName="id", + testCase=ColumnTestCase( + config=ColumnValuesToBeBetween(minValue=1, maxValue=3), + columnTestType=ColumnTestType.columnValuesToBeBetween, + ), + result=col_test_res, + ) + + table_with_test_and_res = self.metadata.add_column_test( + table=table, col_test=col_test_with_res + ) + + id_test_res = next( + iter( + [ + col + for col in table_with_test_and_res.columns + if col.name.__root__ == "id" + ] + ), + None, + ) + + assert len(id_test_res.columnTests[0].results) == 1 + assert ( + id_test_res.columnTests[0].results[0].testCaseStatus + == TestCaseStatus.Success + ) diff --git a/ingestion/tests/integration/orm_profiler/test_orm_profiler.py b/ingestion/tests/integration/orm_profiler/test_orm_profiler.py index 5bca6f666b3..449db6b9ebb 100644 --- a/ingestion/tests/integration/orm_profiler/test_orm_profiler.py +++ b/ingestion/tests/integration/orm_profiler/test_orm_profiler.py @@ -118,34 +118,36 @@ class ProfilerWorkflowTest(TestCase): "config": { "profiler": { "name": "my_profiler", - "metrics": ["row_count", "min", "COUNT", "null_count"], + "metrics": ["row_count", "min", "max", "COUNT", "null_count"], }, - "tests": { - "name": "my_tests", - "table_tests": [ + "test_suite": { + "name": "My Test Suite", + "tests": [ { - "name": "check row number", - "table": "test_sqlite.main.users", - "expression": "row_count == 2", - } - ], - "column_tests": [ - { - "name": "some column tests", - "table": "test_sqlite.main.users", - "columns": [ + "table": "test_sqlite.main.users", # FQDN + "table_tests": [ { - "name": "check name count", - "column": "name", - "expression": "count < 10", - }, - { - "name": "check null count", - "column": "nickname", - "expression": "null_count == 0", + "testCase": { + "config": { + "value": 100, + }, + "tableTestType": "tableRowCountToEqual", + }, }, ], - } + "column_tests": [ + { + "columnName": "age", + "testCase": { + "config": { + "minValue": 0, + "maxValue": 99, + }, + "columnTestType": "columnValuesToBeBetween", + }, + } + ], + }, ], }, }, diff --git a/ingestion/tests/unit/profiler/test_grammar.py b/ingestion/tests/unit/profiler/test_grammar.py deleted file mode 100644 index ffd93f9740b..00000000000 --- a/ingestion/tests/unit/profiler/test_grammar.py +++ /dev/null @@ -1,83 +0,0 @@ -# Copyright 2021 Collate -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# http://www.apache.org/licenses/LICENSE-2.0 -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
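Every validation implementation follows the same contract as table_row_count_to_equal above: return an Aborted result when the profiler did not compute the metric the test needs, otherwise compare the profiled value against the expected one and report Success or Failed. A dependency-free sketch of that contract, with a plain dataclass standing in for the generated TestCaseResult model:

from dataclasses import dataclass
from datetime import datetime
from typing import Optional


@dataclass
class ResultSketch:
    """Plain stand-in for the generated TestCaseResult model."""

    executionTime: float
    testCaseStatus: str  # "Success" | "Failed" | "Aborted"
    result: str


def row_count_to_equal_sketch(
    expected: int, row_count: Optional[int], execution_date: datetime
) -> ResultSketch:
    # Abort when the profiler did not compute the metric this test needs
    if row_count is None:
        return ResultSketch(
            execution_date.timestamp(),
            "Aborted",
            "rowCount should not be None for TableRowCountToEqual",
        )
    status = "Success" if row_count == expected else "Failed"
    return ResultSketch(
        execution_date.timestamp(),
        status,
        f"Found {row_count} rows vs. the expected {expected}",
    )


when = datetime(2022, 3, 2)
print(row_count_to_equal_sketch(100, 100, when).testCaseStatus)  # Success
print(row_count_to_equal_sketch(120, 100, when).testCaseStatus)  # Failed
print(row_count_to_equal_sketch(100, None, when).testCaseStatus)  # Aborted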
-""" -Test the Test Definition grammar -""" -import pytest -from parsimonious import ParseError - -from metadata.orm_profiler.validations.grammar import ExpVisitor, parse - -visitor = ExpVisitor() - - -def test_simple_parsing(): - """ - Play with simple expressions - """ - res = parse("row_count == 100", visitor) - assert res == [{"metric": "row_count", "operation": "==", "value": "100"}] - - res = parse("something_else > random", visitor) - assert res == [{"metric": "something_else", "operation": ">", "value": "random"}] - - res = parse("something_else <= random", visitor) - assert res == [{"metric": "something_else", "operation": "<=", "value": "random"}] - - # No spaces are needed - res = parse("hello!=99", visitor) - assert res == [{"metric": "hello", "operation": "!=", "value": "99"}] - - res = parse("random==Seat500", visitor) - assert res == [{"metric": "random", "operation": "==", "value": "Seat500"}] - - -def test_multiple_parsing(): - """ - We can also evaluate multiple test definitions - """ - res = parse("metric_a < value1 & metric_b == value2 & metric_c != value3", visitor) - - expected = [ - {"metric": "metric_a", "operation": "<", "value": "value1"}, - {"metric": "metric_b", "operation": "==", "value": "value2"}, - {"metric": "metric_c", "operation": "!=", "value": "value3"}, - ] - - assert res == expected - - # No spaces are needed - res = parse("metric_a something", visitor)[0] - model = Validation.create(raw_validation) + res_ko = validate( + TableRowCountToEqual(value=50), + table_profile=table_profile, + execution_date=EXECUTION_DATE, + ) - assert model.metric == "min" - assert model.operator == op.gt - assert model.value == "something" + assert res_ko == TestCaseResult( + executionTime=EXECUTION_DATE.timestamp(), + testCaseStatus=TestCaseStatus.Failed, + result="Found 100.0 rows vs. the expected 50", + ) - raw_validation = parse("null_ratio < 0.2", visitor)[0] - model = Validation.create(raw_validation) + table_profile_aborted = TableProfile( + profileDate=EXECUTION_DATE.strftime("%Y-%m-%d"), + ) - assert model.metric == "nullProportion" - assert model.operator == op.lt - assert model.value == 0.2 + res_aborted = validate( + TableRowCountToEqual(value=100), + table_profile=table_profile_aborted, + execution_date=EXECUTION_DATE, + ) - # This validation does not make sense, but we are just checking cases - raw_validation = parse("null_ratio >= 5.4", visitor)[0] - model = Validation.create(raw_validation) - - assert model.metric == "nullProportion" - assert model.operator == op.ge - assert model.value == 5.4 + assert res_aborted == TestCaseResult( + executionTime=EXECUTION_DATE.timestamp(), + testCaseStatus=TestCaseStatus.Aborted, + result="rowCount should not be None for TableRowCountToEqual", + ) -def test_model_conversion_exceptions(): +def test_column_values_to_be_between(): """ - Check that we cannot pass malformed data + Check ColumnValuesToBeBetween """ - # No info at all - with pytest.raises(ValidationConversionException): - Validation.create({}) + column_profile = ColumnProfile( + min=1, + max=3, + ) - # Invalid metric, cannot be found in Registry - with pytest.raises(ValidationConversionException): - Validation.create({"metric": "not a valid metric"}) + res_ok = validate( + ColumnValuesToBeBetween( + minValue=0, + maxValue=3, + ), + col_profile=column_profile, + execution_date=EXECUTION_DATE, + ) + assert res_ok == TestCaseResult( + executionTime=EXECUTION_DATE.timestamp(), + testCaseStatus=TestCaseStatus.Success, + result="Found min=1.0, max=3.0 vs. 
the expected min=0, max=3.", + ) - # Missing Operation key - with pytest.raises(ValidationConversionException): - Validation.create({"metric": "min"}) + res_ko = validate( + ColumnValuesToBeBetween( + minValue=0, + maxValue=2, + ), + col_profile=column_profile, + execution_date=EXECUTION_DATE, + ) - # Invalid Operation value - with pytest.raises(ValidationConversionException): - Validation.create({"metric": "min", "operation": "invalid operation"}) + assert res_ko == TestCaseResult( + executionTime=EXECUTION_DATE.timestamp(), + testCaseStatus=TestCaseStatus.Failed, + result="Found min=1.0, max=3.0 vs. the expected min=0, max=2.", + ) - # Missing value key - with pytest.raises(ValidationConversionException): - Validation.create({"metric": "min", "operation": "=="}) + column_profile_aborted = ColumnProfile( + min=1, + ) - # Empty value - with pytest.raises(ValidationConversionException): - Validation.create({"metric": "min", "operation": "==", "value": ""}) + res_aborted = validate( + ColumnValuesToBeBetween( + minValue=0, + maxValue=3, + ), + col_profile=column_profile_aborted, + execution_date=EXECUTION_DATE, + ) + + assert res_aborted == TestCaseResult( + executionTime=EXECUTION_DATE.timestamp(), + testCaseStatus=TestCaseStatus.Aborted, + result=( + "We expect `min` & `max` to be informed on the profiler for ColumnValuesToBeBetween" + + " but got min=1.0, max=None." + ), + ) -def test_validate(): +def test_column_values_to_be_unique(): """ - Make sure that we are properly flagging validation results + Check ColumnValuesToBeUnique """ - results = {"valuesCount": 100} - raw_validation = parse("count == 100", visitor)[0] - validation = Validation.create(raw_validation) + column_profile = ColumnProfile( + valuesCount=10, + uniqueCount=10, + ) - assert validation.validate(results).valid + res_ok = validate( + ColumnValuesToBeUnique(), + col_profile=column_profile, + execution_date=EXECUTION_DATE, + ) + assert res_ok == TestCaseResult( + executionTime=EXECUTION_DATE.timestamp(), + testCaseStatus=TestCaseStatus.Success, + result=( + "Found valuesCount=10.0 vs. uniqueCount=10.0." + + " Both counts should be equal for column values to be unique." + ), + ) - raw_validation = parse("count != 100", visitor)[0] - validation = Validation.create(raw_validation) + column_profile_ko = ColumnProfile( + valuesCount=10, + uniqueCount=5, + ) - assert not validation.validate(results).valid + res_ko = validate( + ColumnValuesToBeUnique(), + col_profile=column_profile_ko, + execution_date=EXECUTION_DATE, + ) - results = {"nullProportion": 0.2} + assert res_ko == TestCaseResult( + executionTime=EXECUTION_DATE.timestamp(), + testCaseStatus=TestCaseStatus.Failed, + result=( + "Found valuesCount=10.0 vs. uniqueCount=5.0." + + " Both counts should be equal for column values to be unique." 
+ ), + ) - raw_validation = parse("Null_Ratio < 0.3", visitor)[0] - validation = Validation.create(raw_validation) + column_profile_aborted = ColumnProfile() - assert validation.validate(results).valid + res_aborted = validate( + ColumnValuesToBeUnique(), + col_profile=column_profile_aborted, + execution_date=EXECUTION_DATE, + ) - raw_validation = parse("Null_Ratio >= 0.3", visitor)[0] - validation = Validation.create(raw_validation) - - assert not validation.validate(results).valid + assert res_aborted == TestCaseResult( + executionTime=EXECUTION_DATE.timestamp(), + testCaseStatus=TestCaseStatus.Aborted, + result=( + "We expect `valuesCount` & `uniqueCount` to be informed on the profiler for ColumnValuesToBeUnique" + + " but got valuesCount=None, uniqueCount=None." + ), + ) diff --git a/ingestion/tests/unit/profiler/test_workflow.py b/ingestion/tests/unit/profiler/test_workflow.py index cde4981bf6d..e82167df0a7 100644 --- a/ingestion/tests/unit/profiler/test_workflow.py +++ b/ingestion/tests/unit/profiler/test_workflow.py @@ -18,7 +18,25 @@ from copy import deepcopy import sqlalchemy as sqa from sqlalchemy.orm import declarative_base +from metadata.generated.schema.api.tests.createColumnTest import CreateColumnTestRequest +from metadata.generated.schema.api.tests.createTableTest import CreateTableTestRequest from metadata.generated.schema.entity.data.table import Column, DataType, Table +from metadata.generated.schema.tests.column.columnValuesToBeBetween import ( + ColumnValuesToBeBetween, +) +from metadata.generated.schema.tests.columnTest import ( + ColumnTest, + ColumnTestCase, + ColumnTestType, +) +from metadata.generated.schema.tests.table.tableRowCountToEqual import ( + TableRowCountToEqual, +) +from metadata.generated.schema.tests.tableTest import ( + TableTest, + TableTestCase, + TableTestType, +) from metadata.generated.schema.type.entityReference import EntityReference from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig from metadata.ingestion.source.sqlite import SQLiteConfig @@ -26,6 +44,7 @@ from metadata.orm_profiler.api.workflow import ProfilerWorkflow from metadata.orm_profiler.processor.orm_profiler import OrmProfilerProcessor from metadata.orm_profiler.profiles.default import DefaultProfiler from metadata.orm_profiler.profiles.models import ProfilerDef +from metadata.orm_profiler.validations.models import TestDef, TestSuite config = { "source": {"type": "sqlite", "config": {"service_name": "my_service"}}, @@ -52,7 +71,7 @@ def test_init_workflow(): assert isinstance(workflow.processor, OrmProfilerProcessor) assert workflow.processor.config.profiler is None - assert workflow.processor.config.tests is None + assert workflow.processor.config.test_suite is None def test_filter_entities(): @@ -188,110 +207,66 @@ def test_tests_def(): Validate the test case definition """ test_config = deepcopy(config) - test_config["processor"]["config"]["tests"] = { - "name": "my_tests", - "table_tests": [ + test_config["processor"]["config"]["test_suite"] = { + "name": "My Test Suite", + "tests": [ { - "name": "first_test", - "table": "service.db.name", - "expression": "row_count > 100", - "enabled": False, - }, - { - "name": "another_test", - "table": "service.db.name", - "expression": "row_count > 1000 & row_count < 2000", - }, - ], - "column_tests": [ - { - "table": "service.db.name", - "name": "set_of_col_tests", - "columns": [ + "table": "service.db.name", # FQDN + "table_tests": [ { - "name": "first_col_test", - "column": "column_name_1", - "expression": "min > 
5", - }, - { - "name": "another_col_test", - "column": "column_name_1", - "expression": "min > 5 & min < 10", - }, - { - "name": "second_col_test", - "column": "column_name_2", - "expression": "null_ratio < 0.1", + "testCase": { + "config": { + "value": 100, + }, + "tableTestType": "tableRowCountToEqual", + }, }, ], - } + "column_tests": [ + { + "columnName": "age", + "testCase": { + "config": { + "minValue": 0, + "maxValue": 99, + }, + "columnTestType": "columnValuesToBeBetween", + }, + } + ], + }, ], } test_workflow = ProfilerWorkflow.create(test_config) assert isinstance(test_workflow.processor, OrmProfilerProcessor) - tests = test_workflow.processor.config.tests + suite = test_workflow.processor.config.test_suite - assert tests.name == "my_tests" + expected = TestSuite( + name="My Test Suite", + tests=[ + TestDef( + table="service.db.name", + table_tests=[ + CreateTableTestRequest( + testCase=TableTestCase( + config=TableRowCountToEqual(value=100), + tableTestType=TableTestType.tableRowCountToEqual, + ), + ) + ], + column_tests=[ + CreateColumnTestRequest( + columnName="age", + testCase=ColumnTestCase( + config=ColumnValuesToBeBetween(minValue=0, maxValue=99), + columnTestType=ColumnTestType.columnValuesToBeBetween, + ), + ) + ], + ) + ], + ) - # Check cardinality - assert len(tests.table_tests) == 2 - assert len(tests.column_tests) == 1 - assert len(tests.column_tests[0].columns) == 3 - - assert tests.table_tests[0].name == "first_test" - assert tests.table_tests[0].table == "service.db.name" - assert tests.table_tests[0].expression[0].metric == "rowCount" - assert not tests.table_tests[0].enabled - - assert tests.column_tests[0].columns[0].name == "first_col_test" - assert tests.column_tests[0].columns[0].column == "column_name_1" - assert tests.column_tests[0].columns[0].expression[0].metric == "min" - assert tests.column_tests[0].columns[0].enabled - - # We cannot do a 1:1 general assertion because we are dynamically - # creating the Validation classes. Then, the internal IDs don't match - # and the assertion fails. However, and for visual representation, - # the resulting class looks like follows: - - # TestDef( - # name="my_tests", - # table_tests=[ - # # I can have multiple tests on the same table - # TableTest( - # name="first_test", - # table="service.db.name", - # expression="row_number > 100", # This will be one Validation - # enabled=False, - # ), - # TableTest( - # name="another_test", - # table="service.db.name", - # expression="row_number > 1000 & row_number < 2000", # This will be two Validations - # ), - # ], - # column_tests=[ - # ColumnTest( - # table="service.db.name", - # name="set_of_col_tests", - # columns=[ - # ColumnTestExpression( - # name="first_col_test", - # column="column_name_1", - # expression="min > 5", # One Validation - # ), - # ColumnTestExpression( - # name="another_col_test", - # column="column_name_1", - # expression="min > 5 & min < 10", # Two Validations - # ), - # ColumnTestExpression( - # name="second_col_test", - # column="column_name_2", - # expression="null_ratio < 0.1", # One Validation - # ), - # ], - # ) - # ], - # ) + assert suite == expected