diff --git a/.github/workflows/deploy-docs.yml b/.github/workflows/deploy-docs.yml deleted file mode 100644 index 3585c037530..00000000000 --- a/.github/workflows/deploy-docs.yml +++ /dev/null @@ -1,34 +0,0 @@ -# Copyright 2021 Collate -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# http://www.apache.org/licenses/LICENSE-2.0 -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -name: Deploy Docs -on: - push: - branches: - - 0.6.0 - paths: - - 'docs/**' - - '.github/workflows/deploy-docs.yml' - -jobs: - build-and-deploy: - runs-on: ubuntu-latest - steps: - - name: Checkout 🛎️ - uses: actions/checkout@v2.3.1 - - - name: Deploy 🚀 - uses: JamesIves/github-pages-deploy-action@4.1.6 - with: - GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} - BRANCH: main - FOLDER: docs - target-folder: docs diff --git a/catalog-rest-service/src/main/resources/json/schema/tests/column/columnValuesToBeNotInSet.json b/catalog-rest-service/src/main/resources/json/schema/tests/column/columnValuesToBeNotInSet.json index ac9e548d963..cde0839d45d 100644 --- a/catalog-rest-service/src/main/resources/json/schema/tests/column/columnValuesToBeNotInSet.json +++ b/catalog-rest-service/src/main/resources/json/schema/tests/column/columnValuesToBeNotInSet.json @@ -8,9 +8,7 @@ "properties": { "values": { "description": "An Array of values.", - "items": { - "type": "object" - } + "type": "array" } }, "required": ["values"], diff --git a/ingestion-core/src/metadata/_version.py b/ingestion-core/src/metadata/_version.py index ef768fcb340..724501b9d40 100644 --- a/ingestion-core/src/metadata/_version.py +++ b/ingestion-core/src/metadata/_version.py @@ -7,5 +7,5 @@ Provides metadata version information. 
from incremental import Version -__version__ = Version("metadata", 0, 9, 0, dev=17) +__version__ = Version("metadata", 0, 9, 0, dev=18) __all__ = ["__version__"] diff --git a/ingestion/src/metadata/orm_profiler/metrics/registry.py b/ingestion/src/metadata/orm_profiler/metrics/registry.py index efc6c41df5d..7bc36448dd3 100644 --- a/ingestion/src/metadata/orm_profiler/metrics/registry.py +++ b/ingestion/src/metadata/orm_profiler/metrics/registry.py @@ -23,7 +23,9 @@ from metadata.orm_profiler.metrics.composed.ilike_ratio import ILikeRatio from metadata.orm_profiler.metrics.composed.like_ratio import LikeRatio from metadata.orm_profiler.metrics.composed.null_ratio import NullRatio from metadata.orm_profiler.metrics.composed.unique_ratio import UniqueRatio +from metadata.orm_profiler.metrics.static.column_count import ColumnCount from metadata.orm_profiler.metrics.static.count import Count +from metadata.orm_profiler.metrics.static.count_in_set import CountInSet from metadata.orm_profiler.metrics.static.histogram import Histogram from metadata.orm_profiler.metrics.static.ilike_count import ILikeCount from metadata.orm_profiler.metrics.static.like_count import LikeCount @@ -50,6 +52,8 @@ class Metrics(MetricRegistry): # Static Metrics MEAN = Mean COUNT = Count + COUNT_IN_SET = CountInSet + COLUMN_COUNT = ColumnCount HISTOGRAM = Histogram ILIKE_COUNT = ILikeCount LIKE_COUNT = LikeCount diff --git a/ingestion/src/metadata/orm_profiler/metrics/static/column_count.py b/ingestion/src/metadata/orm_profiler/metrics/static/column_count.py new file mode 100644 index 00000000000..9507170d29d --- /dev/null +++ b/ingestion/src/metadata/orm_profiler/metrics/static/column_count.py @@ -0,0 +1,47 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Table Column Count Metric definition +""" +from sqlalchemy import func, inspect, literal + +from metadata.orm_profiler.metrics.core import StaticMetric, _label + + +class ColumnCount(StaticMetric): + """ + COLUMN_COUNT Metric + + Count all columns on a table + """ + + @classmethod + def name(cls): + return "columnCount" + + @classmethod + def is_col_metric(cls) -> bool: + """ + Mark the class as a Table Metric + """ + return False + + def metric_type(self): + return int + + @_label + def fn(self): + if not hasattr(self, "table"): + raise AttributeError( + "Column Count requires a table to be set: add_props(table=...)(Metrics.COLUMN_COUNT)" + ) + return literal(len(inspect(self.table).c)) diff --git a/ingestion/src/metadata/orm_profiler/metrics/static/count_in_set.py b/ingestion/src/metadata/orm_profiler/metrics/static/count_in_set.py new file mode 100644 index 00000000000..e9cf65ea66a --- /dev/null +++ b/ingestion/src/metadata/orm_profiler/metrics/static/count_in_set.py @@ -0,0 +1,52 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +CountInSet Metric definition +""" + +from sqlalchemy import case, func + +from metadata.orm_profiler.metrics.core import StaticMetric, _label +from metadata.orm_profiler.utils import logger + +logger = logger() + + +class CountInSet(StaticMetric): + """ + COUNT_IN_SET Metric + + Given a column, return the count of values in a given set. + """ + + @classmethod + def name(cls): + return "countInSet" + + def metric_type(self): + return int + + @_label + def fn(self): + if not hasattr(self, "values"): + raise AttributeError( + "CountInSet requires a set of values to validate: add_props(values=...)(Metrics.COUNT_IN_SET)" + ) + + try: + set_values = set(self.values) + return func.sum(case([(self.col.in_(set_values), 1)], else_=0)) + + except Exception as err: # pylint: disable=broad-except + logger.error(f"Error trying to run countInSet for {self.col} - {err}") + print(err) + return None diff --git a/ingestion/src/metadata/orm_profiler/metrics/static/ilike_count.py b/ingestion/src/metadata/orm_profiler/metrics/static/ilike_count.py index 66430844c33..dd95c01f0cd 100644 --- a/ingestion/src/metadata/orm_profiler/metrics/static/ilike_count.py +++ b/ingestion/src/metadata/orm_profiler/metrics/static/ilike_count.py @@ -36,6 +36,6 @@ class ILikeCount(StaticMetric): def fn(self): if not hasattr(self, "expression"): raise AttributeError( - "ILike Count requires an expression to be set: Metrics.ILIKE_COUNT(col, expression=...)" + "ILike Count requires an expression to be set: add_props(expression=...)(Metrics.ILIKE_COUNT)" ) return func.count(self.col.ilike(self.expression)) diff --git a/ingestion/src/metadata/orm_profiler/metrics/static/like_count.py b/ingestion/src/metadata/orm_profiler/metrics/static/like_count.py index 4dfd7f864f9..23db7b616a0 100644 --- a/ingestion/src/metadata/orm_profiler/metrics/static/like_count.py +++ b/ingestion/src/metadata/orm_profiler/metrics/static/like_count.py @@ -36,6 +36,6 @@ class LikeCount(StaticMetric): def fn(self): if not hasattr(self, "expression"): raise AttributeError( - "Like Count requires an expression to be set: Metrics.LIKE_COUNT(col, expression=...)" + "Like Count requires an expression to be set: add_props(expression=...)(Metrics.LIKE_COUNT)" ) return func.count(self.col.like(self.expression)) diff --git a/ingestion/src/metadata/orm_profiler/metrics/static/mean.py b/ingestion/src/metadata/orm_profiler/metrics/static/mean.py index 43e4ee94ee0..961c4ddf1af 100644 --- a/ingestion/src/metadata/orm_profiler/metrics/static/mean.py +++ b/ingestion/src/metadata/orm_profiler/metrics/static/mean.py @@ -66,7 +66,7 @@ class Mean(StaticMetric): if is_concatenable(self.col.type): return ConcatAvgFn(self.col) - logger.warning( - f"Don't know how to process type {self.col.type} when computing AVG" + logger.debug( + f"Don't know how to process type {self.col.type} when computing MEAN" ) return None diff --git a/ingestion/src/metadata/orm_profiler/orm/functions/concat.py b/ingestion/src/metadata/orm_profiler/orm/functions/concat.py index 63cfe9b17cd..c10c3783736 100644 --- a/ingestion/src/metadata/orm_profiler/orm/functions/concat.py +++ 
b/ingestion/src/metadata/orm_profiler/orm/functions/concat.py @@ -33,6 +33,7 @@ def _(element, compiler, **kw): return "CONCAT(%s)" % compiler.process(element.clauses, **kw) +@compiles(ConcatFn, DatabaseServiceType.Redshift.value.lower()) @compiles(ConcatFn, DatabaseServiceType.SQLite.value.lower()) def _(element, compiler, **kw): """ diff --git a/ingestion/src/metadata/orm_profiler/processor/orm_profiler.py b/ingestion/src/metadata/orm_profiler/processor/orm_profiler.py index f3f8cded45e..cbe51d503cd 100644 --- a/ingestion/src/metadata/orm_profiler/processor/orm_profiler.py +++ b/ingestion/src/metadata/orm_profiler/processor/orm_profiler.py @@ -173,12 +173,17 @@ class OrmProfilerProcessor(Processor[Table]): return table.fullyQualifiedName + col + test_type def run_table_test( - self, table: Table, test_case: TableTestCase, profiler_results: TableProfile + self, + table: Table, + orm_table, + test_case: TableTestCase, + profiler_results: TableProfile, ) -> Optional[TestCaseResult]: """ Run & log the table test against the TableProfile. :param table: Table Entity being processed + :param orm_table: Declarative Meta :param test_case: Table Test Case to run :param profiler_results: Table profiler with informed metrics :return: TestCaseResult @@ -196,6 +201,8 @@ class OrmProfilerProcessor(Processor[Table]): test_case.config, table_profile=profiler_results, execution_date=self.execution_date, + session=self.session, + table=orm_table, ) self.log_test_result(name=test_name, result=test_case_result) return test_case_result @@ -203,6 +210,7 @@ class OrmProfilerProcessor(Processor[Table]): def run_column_test( self, table: Table, + orm_table, column: str, test_case: ColumnTestCase, profiler_results: TableProfile, @@ -211,6 +219,7 @@ class OrmProfilerProcessor(Processor[Table]): Run & log the column test against the ColumnProfile :param table: Table Entity being processed + :param orm_table: Declarative Meta :param column: Column being tested :param test_case: Column Test Case to run :param profiler_results: Table profiler with informed metrics @@ -248,20 +257,26 @@ class OrmProfilerProcessor(Processor[Table]): test_case_result: TestCaseResult = validate( test_case.config, - col_profiler_res, + col_profile=col_profiler_res, execution_date=self.execution_date, + session=self.session, + table=orm_table, ) self.log_test_result(name=test_name, result=test_case_result) return test_case_result def validate_config_tests( - self, table: Table, profiler_results: TableProfile + self, table: Table, orm_table, profiler_results: TableProfile ) -> Optional[TestDef]: """ Here we take care of new incoming tests in the workflow definition. Run them and prepare the new TestDef of the record, that will be sent to the sink to update the Table Entity. 
+ + :param table: OpenMetadata Table Entity being processed + :param orm_table: Declarative Meta + :param profiler_results: TableProfile with computed metrics """ logger.info(f"Checking validations for {table.fullyQualifiedName}...") @@ -285,6 +300,7 @@ class OrmProfilerProcessor(Processor[Table]): for table_test in my_record_tests.table_tests: test_case_result = self.run_table_test( table=table, + orm_table=orm_table, test_case=table_test.testCase, profiler_results=profiler_results, ) @@ -294,6 +310,7 @@ class OrmProfilerProcessor(Processor[Table]): for column_test in my_record_tests.column_tests: test_case_result = self.run_column_test( table=table, + orm_table=orm_table, column=column_test.columnName, test_case=column_test.testCase, profiler_results=profiler_results, @@ -306,6 +323,7 @@ class OrmProfilerProcessor(Processor[Table]): def validate_entity_tests( self, table: Table, + orm_table, profiler_results: TableProfile, config_tests: Optional[TestDef], ) -> Optional[TestDef]: @@ -319,6 +337,7 @@ class OrmProfilerProcessor(Processor[Table]): and trust the workflow input. :param table: OpenMetadata Table Entity being processed + :param orm_table: Declarative Meta :param profiler_results: TableProfile with computed metrics :param config_tests: Results of running the configuration tests """ @@ -356,6 +375,7 @@ class OrmProfilerProcessor(Processor[Table]): for table_test in table_tests: test_case_result = self.run_table_test( table=table, + orm_table=orm_table, test_case=table_test.testCase, profiler_results=profiler_results, ) @@ -382,6 +402,7 @@ class OrmProfilerProcessor(Processor[Table]): if column_test: test_case_result = self.run_column_test( table=table, + orm_table=orm_table, column=column_test.columnName, test_case=column_test.testCase, profiler_results=profiler_results, @@ -414,10 +435,12 @@ class OrmProfilerProcessor(Processor[Table]): # First, check if we have any tests directly configured in the workflow config_tests = None if self.config.test_suite: - config_tests = self.validate_config_tests(record, entity_profile) + config_tests = self.validate_config_tests(record, orm_table, entity_profile) # Then, Check if the entity has any tests - record_tests = self.validate_entity_tests(record, entity_profile, config_tests) + record_tests = self.validate_entity_tests( + record, orm_table, entity_profile, config_tests + ) res = ProfilerResponse( table=record, diff --git a/ingestion/src/metadata/orm_profiler/profiles/core.py b/ingestion/src/metadata/orm_profiler/profiles/core.py index f892c987be7..2981b507f23 100644 --- a/ingestion/src/metadata/orm_profiler/profiles/core.py +++ b/ingestion/src/metadata/orm_profiler/profiles/core.py @@ -218,22 +218,28 @@ class Profiler(Generic[MetricType]): """ for metric in self.get_col_metrics(self.query_metrics): + try: + metric_query = metric(col).query(session=self.session) - metric_query = metric(col).query(session=self.session) + # We might not compute some metrics based on the column type. + # In those cases, the `query` function returns None + if not metric_query: + continue - # We might not compute some metrics based on the column type. - # In those cases, the `query` function returns None - if not metric_query: - continue + query_res = metric_query.all() - query_res = metric_query.all() + # query_res has the shape of List[Row], where each row is a dict, + # e.g., [{colA: 1, colB: 2},...] 
+ # We are going to transform this into a Dict[List] by pivoting, so that + # data = {colA: [1,2,3], colB: [4,5,6]...} + data = {k: [dic[k] for dic in query_res] for k in dict(query_res[0])} + self._column_results[col.name].update({metric.name(): data}) - # query_res has the shape of List[Row], where each row is a dict, - # e.g., [{colA: 1, colB: 2},...] - # We are going to transform this into a Dict[List] by pivoting, so that - # data = {colA: [1,2,3], colB: [4,5,6]...} - data = {k: [dic[k] for dic in query_res] for k in dict(query_res[0])} - self._column_results[col.name].update({metric.name(): data}) + except Exception as err: # pylint: disable=broad-except + logger.error( + f"Exception encountered computing {metric.name()} for {self.table.__tablename__}.{col.name} - {err}" + ) + self.session.rollback() def post_col_run(self, col: Column): """ @@ -299,8 +305,9 @@ class Profiler(Generic[MetricType]): self.execute_column(col) except Exception as exc: # pylint: disable=broad-except logger.error( - f"Error trying to compute profile for {self.table}.{col.name} - {exc}" + f"Error trying to compute profile for {self.table.__tablename__}.{col.name} - {exc}" ) + self.session.rollback() return self @@ -343,3 +350,7 @@ class Profiler(Generic[MetricType]): except ValidationError as err: logger.error(f"Cannot transform profiler results to TableProfile {err}") raise err + + @property + def column_results(self): + return self._column_results diff --git a/ingestion/src/metadata/orm_profiler/profiles/default.py b/ingestion/src/metadata/orm_profiler/profiles/default.py index 8a51003ab6d..dc04bbccd61 100644 --- a/ingestion/src/metadata/orm_profiler/profiles/default.py +++ b/ingestion/src/metadata/orm_profiler/profiles/default.py @@ -16,6 +16,7 @@ from typing import List, Optional from sqlalchemy.orm.session import Session +from metadata.orm_profiler.metrics.core import add_props from metadata.orm_profiler.metrics.registry import Metrics from metadata.orm_profiler.profiles.core import Profiler @@ -36,6 +37,7 @@ class DefaultProfiler(Profiler): _metrics = [ # Table Metrics Metrics.ROW_COUNT.value, + add_props(table=table)(Metrics.COLUMN_COUNT.value), # Column Metrics Metrics.MEAN.value, Metrics.COUNT.value, diff --git a/ingestion/src/metadata/orm_profiler/validations/column/column_values_not_in_set.py b/ingestion/src/metadata/orm_profiler/validations/column/column_values_not_in_set.py new file mode 100644 index 00000000000..9792c80c8a2 --- /dev/null +++ b/ingestion/src/metadata/orm_profiler/validations/column/column_values_not_in_set.py @@ -0,0 +1,87 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +""" +ColumnValuesToBeNotNull validation implementation +""" + +from datetime import datetime +from typing import Optional + +from sqlalchemy import inspect +from sqlalchemy.orm import DeclarativeMeta, Session + +from metadata.generated.schema.entity.data.table import ColumnProfile +from metadata.generated.schema.tests.basic import TestCaseResult, TestCaseStatus +from metadata.generated.schema.tests.column.columnValuesToBeNotInSet import ( + ColumnValuesToBeNotInSet, +) +from metadata.orm_profiler.metrics.core import add_props +from metadata.orm_profiler.metrics.registry import Metrics +from metadata.orm_profiler.profiles.core import Profiler +from metadata.orm_profiler.utils import logger + +logger = logger() + + +def column_values_not_in_set( + test_case: ColumnValuesToBeNotInSet, + col_profile: ColumnProfile, + execution_date: datetime, + session: Optional[Session] = None, + table: Optional[DeclarativeMeta] = None, +) -> TestCaseResult: + """ + Validate Column Values metric + :param test_case: ColumnValuesToBeUnique. Just used to trigger singledispatch + :param col_profile: should contain count and distinct count metrics + :param execution_date: Datetime when the tests ran + :param session: SQLAlchemy Session, for tests that need to compute new metrics + :param table: SQLAlchemy Table, for tests that need to compute new metrics + :return: TestCaseResult with status and results + """ + + set_count = add_props(values=test_case.values)(Metrics.COUNT_IN_SET.value) + + try: + col = next( + iter([col for col in inspect(table).c if col.name == col_profile.name]), + None, + ) + + if col is None: + raise ValueError( + f"Cannot find the configured column {col_profile.name} for ColumnValuesToBeNotInSet" + ) + + res = ( + Profiler(set_count, session=session, table=table, use_cols=[col]) + .execute() + .column_results + ) + set_count_res = res.get(col.name)[Metrics.COUNT_IN_SET.name] + + except Exception as err: # pylint: disable=broad-except + session.rollback() + msg = f"Error computing ColumnValuesToBeNotInSet for {col_profile.name} - {err}" + logger.error(msg) + return TestCaseResult( + executionTime=execution_date.timestamp(), + testCaseStatus=TestCaseStatus.Aborted, + result=msg, + ) + + status = TestCaseStatus.Success if set_count_res == 0 else TestCaseStatus.Failed + result = f"Found countInSet={set_count_res}. It should be 0." 
+ + return TestCaseResult( + executionTime=execution_date.timestamp(), testCaseStatus=status, result=result + ) diff --git a/ingestion/src/metadata/orm_profiler/validations/column/column_values_to_be_between.py b/ingestion/src/metadata/orm_profiler/validations/column/column_values_to_be_between.py index 30a0bd9b487..e8b98463272 100644 --- a/ingestion/src/metadata/orm_profiler/validations/column/column_values_to_be_between.py +++ b/ingestion/src/metadata/orm_profiler/validations/column/column_values_to_be_between.py @@ -29,6 +29,7 @@ def column_values_to_be_between( test_case: ColumnValuesToBeBetween, col_profile: ColumnProfile, execution_date: datetime, + **__, ) -> TestCaseResult: """ Validate Column Values metric diff --git a/ingestion/src/metadata/orm_profiler/validations/column/column_values_to_be_not_null.py b/ingestion/src/metadata/orm_profiler/validations/column/column_values_to_be_not_null.py new file mode 100644 index 00000000000..60612b7ee04 --- /dev/null +++ b/ingestion/src/metadata/orm_profiler/validations/column/column_values_to_be_not_null.py @@ -0,0 +1,58 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +ColumnValuesToBeNotNull validation implementation +""" + +from datetime import datetime + +from metadata.generated.schema.entity.data.table import ColumnProfile +from metadata.generated.schema.tests.basic import TestCaseResult, TestCaseStatus +from metadata.generated.schema.tests.column.columnValuesToBeNotNull import ( + ColumnValuesToBeNotNull, +) +from metadata.orm_profiler.utils import logger + +logger = logger() + + +def column_values_to_be_not_null( + _: ColumnValuesToBeNotNull, + col_profile: ColumnProfile, + execution_date: datetime, + **__, +) -> TestCaseResult: + """ + Validate Column Values metric + :param _: ColumnValuesToBeNotNull. Just used to trigger singledispatch + :param col_profile: should contain the nullCount metric + :param execution_date: Datetime when the tests ran + :return: TestCaseResult with status and results + """ + + if col_profile.nullCount is None: + msg = "We expect `nullCount` to be informed on the profiler for ColumnValuesToBeNotNull." + logger.error(msg) + return TestCaseResult( + executionTime=execution_date.timestamp(), + testCaseStatus=TestCaseStatus.Aborted, + result=msg, + ) + + status = ( + TestCaseStatus.Success if col_profile.nullCount == 0 else TestCaseStatus.Failed + ) + result = f"Found nullCount={col_profile.nullCount}. It should be 0."
+ + return TestCaseResult( + executionTime=execution_date.timestamp(), testCaseStatus=status, result=result + ) diff --git a/ingestion/src/metadata/orm_profiler/validations/column/column_values_to_be_unique.py b/ingestion/src/metadata/orm_profiler/validations/column/column_values_to_be_unique.py index 7bef060382f..2c649e60093 100644 --- a/ingestion/src/metadata/orm_profiler/validations/column/column_values_to_be_unique.py +++ b/ingestion/src/metadata/orm_profiler/validations/column/column_values_to_be_unique.py @@ -29,6 +29,7 @@ def column_values_to_be_unique( _: ColumnValuesToBeUnique, col_profile: ColumnProfile, execution_date: datetime, + **__, ) -> TestCaseResult: """ Validate Column Values metric diff --git a/ingestion/src/metadata/orm_profiler/validations/core.py b/ingestion/src/metadata/orm_profiler/validations/core.py index 75285b38436..85e6399db34 100644 --- a/ingestion/src/metadata/orm_profiler/validations/core.py +++ b/ingestion/src/metadata/orm_profiler/validations/core.py @@ -24,12 +24,24 @@ from functools import singledispatch from metadata.generated.schema.tests.basic import TestCaseResult from metadata.orm_profiler.utils import logger +from metadata.orm_profiler.validations.column.column_values_not_in_set import ( + column_values_not_in_set, +) from metadata.orm_profiler.validations.column.column_values_to_be_between import ( column_values_to_be_between, ) +from metadata.orm_profiler.validations.column.column_values_to_be_not_null import ( + column_values_to_be_not_null, +) from metadata.orm_profiler.validations.column.column_values_to_be_unique import ( column_values_to_be_unique, ) +from metadata.orm_profiler.validations.table.table_column_count_to_equal import ( + table_column_count_to_equal, +) +from metadata.orm_profiler.validations.table.table_row_count_to_be_between import ( + table_row_count_to_be_between, +) from metadata.orm_profiler.validations.table.table_row_count_to_equal import ( table_row_count_to_equal, ) @@ -38,7 +50,7 @@ logger = logger() @singledispatch -def validate(test_case, *args, **kwargs) -> TestCaseResult: +def validate(test_case, **kwargs) -> TestCaseResult: """ Default function to validate test cases. @@ -51,7 +63,13 @@ def validate(test_case, *args, **kwargs) -> TestCaseResult: # Table Tests validate.register(table_row_count_to_equal) +validate.register(table_row_count_to_be_between) +validate.register(table_column_count_to_equal) # Column Tests validate.register(column_values_to_be_between) validate.register(column_values_to_be_unique) +validate.register(column_values_to_be_not_null) + +# Column Session Tests +validate.register(column_values_not_in_set) diff --git a/ingestion/src/metadata/orm_profiler/validations/table/table_column_count_to_equal.py b/ingestion/src/metadata/orm_profiler/validations/table/table_column_count_to_equal.py new file mode 100644 index 00000000000..0f604354a8d --- /dev/null +++ b/ingestion/src/metadata/orm_profiler/validations/table/table_column_count_to_equal.py @@ -0,0 +1,61 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. + +""" +TableColumnCountToEqual validation implementation +""" +from datetime import datetime + +from metadata.generated.schema.entity.data.table import TableProfile +from metadata.generated.schema.tests.basic import TestCaseResult, TestCaseStatus +from metadata.generated.schema.tests.table.tableColumnCountToEqual import ( + TableColumnCountToEqual, +) +from metadata.orm_profiler.utils import logger + +logger = logger() + + +def table_column_count_to_equal( + test_case: TableColumnCountToEqual, + table_profile: TableProfile, + execution_date: datetime, + **__, +) -> TestCaseResult: + """ + Validate column count metric + :param test_case: TableColumnCountToEqual + :param table_profile: should contain columnCount metric + :param execution_date: Datetime when the tests ran + :return: TestCaseResult with status and results + """ + + if table_profile.columnCount is None: + msg = "columnCount should not be None for TableColumnCountToEqual" + logger.error(msg) + return TestCaseResult( + executionTime=execution_date.timestamp(), + testCaseStatus=TestCaseStatus.Aborted, + result=msg, + ) + + status = ( + TestCaseStatus.Success + if table_profile.columnCount == test_case.value + else TestCaseStatus.Failed + ) + result = ( + f"Found {table_profile.columnCount} columns vs. the expected {test_case.value}" + ) + + return TestCaseResult( + executionTime=execution_date.timestamp(), testCaseStatus=status, result=result + ) diff --git a/ingestion/src/metadata/orm_profiler/validations/table/table_row_count_to_be_between.py b/ingestion/src/metadata/orm_profiler/validations/table/table_row_count_to_be_between.py new file mode 100644 index 00000000000..6fa04361661 --- /dev/null +++ b/ingestion/src/metadata/orm_profiler/validations/table/table_row_count_to_be_between.py @@ -0,0 +1,59 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License.
+ +""" +TableRowCountToBeBetween validation implementation +""" +from datetime import datetime + +from metadata.generated.schema.entity.data.table import TableProfile +from metadata.generated.schema.tests.basic import TestCaseResult, TestCaseStatus +from metadata.generated.schema.tests.table.tableRowCountToBeBetween import ( + TableRowCountToBeBetween, +) +from metadata.orm_profiler.utils import logger + +logger = logger() + + +def table_row_count_to_be_between( + test_case: TableRowCountToBeBetween, + table_profile: TableProfile, + execution_date: datetime, + **__, +) -> TestCaseResult: + """ + Validate row count metric + :param test_case: TableRowCountToBeBetween + :param table_profile: should contain row count metric + :param execution_date: Datetime when the tests ran + :return: TestCaseResult with status and results + """ + + if table_profile.rowCount is None: + msg = "rowCount should not be None for TableRowCountToBeBetween" + logger.error(msg) + return TestCaseResult( + executionTime=execution_date.timestamp(), + testCaseStatus=TestCaseStatus.Aborted, + result=msg, + ) + + status = ( + TestCaseStatus.Success + if test_case.minValue <= table_profile.rowCount <= test_case.maxValue + else TestCaseStatus.Failed + ) + result = f"Found {table_profile.rowCount} rows vs. the expected range [{test_case.minValue}, {test_case.maxValue}]." + + return TestCaseResult( + executionTime=execution_date.timestamp(), testCaseStatus=status, result=result + ) diff --git a/ingestion/src/metadata/orm_profiler/validations/table/table_row_count_to_equal.py b/ingestion/src/metadata/orm_profiler/validations/table/table_row_count_to_equal.py index 45b4effe5cb..8385d493de9 100644 --- a/ingestion/src/metadata/orm_profiler/validations/table/table_row_count_to_equal.py +++ b/ingestion/src/metadata/orm_profiler/validations/table/table_row_count_to_equal.py @@ -28,6 +28,7 @@ def table_row_count_to_equal( test_case: TableRowCountToEqual, table_profile: TableProfile, execution_date: datetime, + **__, ) -> TestCaseResult: """ Validate row count metric diff --git a/ingestion/tests/unit/profiler/test_metrics.py b/ingestion/tests/unit/profiler/test_metrics.py index 5abfa0866e6..62482ee9e56 100644 --- a/ingestion/tests/unit/profiler/test_metrics.py +++ b/ingestion/tests/unit/profiler/test_metrics.py @@ -128,7 +128,7 @@ class MetricsTest(TestCase): res = profiler.execute()._column_results assert res.get(User.nickname.name).get(Metrics.NULL_RATIO.name) == 0.5 - def test_table_count(self): + def test_table_row_count(self): """ Check Table Metric run """ @@ -137,6 +137,15 @@ class MetricsTest(TestCase): res = profiler.execute()._table_results assert res.get(Metrics.ROW_COUNT.name) == 2 + def test_table_column_count(self): + """ + Check Column Count metric + """ + col_count = add_props(table=User)(Metrics.COLUMN_COUNT.value) + profiler = Profiler(col_count, session=self.session, table=User) + res = profiler.execute()._table_results + assert res.get(Metrics.COLUMN_COUNT.name) == 6 + def test_avg(self): """ Check avg for distinct types @@ -456,3 +465,26 @@ class MetricsTest(TestCase): ) assert res.get(User.age.name)[Metrics.UNIQUE_RATIO.name] == 1.0 + + def test_count_in_set(self): + """ + Check Count In Set metric + """ + + set_count = add_props(values=["John"])(Metrics.COUNT_IN_SET.value) + res = ( + Profiler(set_count, session=self.session, table=User, use_cols=[User.name]) + .execute() + ._column_results + ) + + assert res.get(User.name.name)[Metrics.COUNT_IN_SET.name] == 1.0 + + set_count = add_props(values=["John", 
"Jane"])(Metrics.COUNT_IN_SET.value) + res = ( + Profiler(set_count, session=self.session, table=User, use_cols=[User.name]) + .execute() + ._column_results + ) + + assert res.get(User.name.name)[Metrics.COUNT_IN_SET.name] == 2.0 diff --git a/ingestion/tests/unit/profiler/test_profiler.py b/ingestion/tests/unit/profiler/test_profiler.py index 17adec3e71b..d3937d6c2d6 100644 --- a/ingestion/tests/unit/profiler/test_profiler.py +++ b/ingestion/tests/unit/profiler/test_profiler.py @@ -59,7 +59,7 @@ class ProfilerTest(TestCase): cls.session.add_all(data) cls.session.commit() - def test_simple_profiler(self): + def test_default_profiler(self): """ Check our pre-cooked profiler """ @@ -69,6 +69,7 @@ class ProfilerTest(TestCase): profile = simple.get_profile() assert profile.rowCount == 2 + assert profile.columnCount == 5 age_profile = next( iter( diff --git a/ingestion/tests/unit/profiler/test_session_validations.py b/ingestion/tests/unit/profiler/test_session_validations.py new file mode 100644 index 00000000000..e2567325b01 --- /dev/null +++ b/ingestion/tests/unit/profiler/test_session_validations.py @@ -0,0 +1,126 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Test validations that need a session configured to run +""" +from datetime import datetime +from unittest import TestCase + +from sqlalchemy import TEXT, Column, Integer, String, create_engine +from sqlalchemy.orm import declarative_base + +from metadata.generated.schema.entity.data.table import ColumnProfile +from metadata.generated.schema.tests.basic import TestCaseResult, TestCaseStatus +from metadata.generated.schema.tests.column.columnValuesToBeNotInSet import ( + ColumnValuesToBeNotInSet, +) +from metadata.orm_profiler.engines import create_and_bind_session +from metadata.orm_profiler.validations.core import validate + +EXECUTION_DATE = datetime.strptime("2021-07-03", "%Y-%m-%d") +Base = declarative_base() + + +class User(Base): + __tablename__ = "users" + id = Column(Integer, primary_key=True) + name = Column(String(256)) + fullname = Column(String(256)) + nickname = Column(String(256)) + comments = Column(TEXT) + age = Column(Integer) + + +class MetricsTest(TestCase): + """ + Run checks on different metrics + """ + + engine = create_engine("sqlite+pysqlite:///:memory:", echo=False, future=True) + session = create_and_bind_session(engine) + + @classmethod + def setUpClass(cls) -> None: + """ + Prepare Ingredients + """ + User.__table__.create(bind=cls.engine) + + data = [ + User( + name="John", + fullname="John Doe", + nickname="johnny b goode", + comments="no comments", + age=30, + ), + User( + name="Jane", + fullname="Jone Doe", + nickname=None, + comments="maybe some comments", + age=31, + ), + ] + cls.session.add_all(data) + cls.session.commit() + + def test_column_values_not_in_set(self): + """ + Check that the metric runs and the results are correctly validated + """ + column_profile = ColumnProfile(name="name") # column name + + res_ok = validate( + ColumnValuesToBeNotInSet(values=["random", 
"forbidden"]), + col_profile=column_profile, + execution_date=EXECUTION_DATE, + session=self.session, + table=User, + ) + + assert res_ok == TestCaseResult( + executionTime=EXECUTION_DATE.timestamp(), + testCaseStatus=TestCaseStatus.Success, + result="Found countInSet=0. It should be 0.", + ) + + res_ko = validate( + ColumnValuesToBeNotInSet(values=["John", "forbidden"]), + col_profile=column_profile, + execution_date=EXECUTION_DATE, + session=self.session, + table=User, + ) + + assert res_ko == TestCaseResult( + executionTime=EXECUTION_DATE.timestamp(), + testCaseStatus=TestCaseStatus.Failed, + result="Found countInSet=1. It should be 0.", + ) + + res_aborted = validate( + ColumnValuesToBeNotInSet(values=["John", "forbidden"]), + col_profile=ColumnProfile(name="random"), + execution_date=EXECUTION_DATE, + session=self.session, + table=User, + ) + + assert res_aborted == TestCaseResult( + executionTime=EXECUTION_DATE.timestamp(), + testCaseStatus=TestCaseStatus.Aborted, + result=( + "Error computing ColumnValuesToBeNotInSet for random - Cannot find" + + " the configured column random for ColumnValuesToBeNotInSet" + ), + ) diff --git a/ingestion/tests/unit/profiler/test_validations.py b/ingestion/tests/unit/profiler/test_validations.py index 76bf61dba08..314982b13c4 100644 --- a/ingestion/tests/unit/profiler/test_validations.py +++ b/ingestion/tests/unit/profiler/test_validations.py @@ -22,9 +22,18 @@ from metadata.generated.schema.tests.basic import TestCaseResult, TestCaseStatus from metadata.generated.schema.tests.column.columnValuesToBeBetween import ( ColumnValuesToBeBetween, ) +from metadata.generated.schema.tests.column.columnValuesToBeNotNull import ( + ColumnValuesToBeNotNull, +) from metadata.generated.schema.tests.column.columnValuesToBeUnique import ( ColumnValuesToBeUnique, ) +from metadata.generated.schema.tests.table.tableColumnCountToEqual import ( + TableColumnCountToEqual, +) +from metadata.generated.schema.tests.table.tableRowCountToBeBetween import ( + TableRowCountToBeBetween, +) from metadata.generated.schema.tests.table.tableRowCountToEqual import ( TableRowCountToEqual, ) @@ -82,6 +91,104 @@ def test_table_row_count_to_equal(): ) +def test_table_row_count_to_be_between(): + """ + Check TableRowCountToEqual + """ + table_profile = TableProfile( + profileDate=EXECUTION_DATE.strftime("%Y-%m-%d"), + rowCount=100, + ) + + res_ok = validate( + TableRowCountToBeBetween(minValue=20, maxValue=120), + table_profile=table_profile, + execution_date=EXECUTION_DATE, + ) + assert res_ok == TestCaseResult( + executionTime=EXECUTION_DATE.timestamp(), + testCaseStatus=TestCaseStatus.Success, + result="Found 100.0 rows vs. the expected range [20, 120].", + ) + + res_ko = validate( + TableRowCountToBeBetween(minValue=120, maxValue=200), + table_profile=table_profile, + execution_date=EXECUTION_DATE, + ) + + assert res_ko == TestCaseResult( + executionTime=EXECUTION_DATE.timestamp(), + testCaseStatus=TestCaseStatus.Failed, + result="Found 100.0 rows vs. 
the expected range [120, 200].", + ) + + table_profile_aborted = TableProfile( + profileDate=EXECUTION_DATE.strftime("%Y-%m-%d"), + ) + + res_aborted = validate( + TableRowCountToBeBetween(minValue=120, maxValue=200), + table_profile=table_profile_aborted, + execution_date=EXECUTION_DATE, + ) + + assert res_aborted == TestCaseResult( + executionTime=EXECUTION_DATE.timestamp(), + testCaseStatus=TestCaseStatus.Aborted, + result="rowCount should not be None for TableRowCountToBeBetween", + ) + + +def test_table_column_count_to_equal(): + """ + Check TableColumnCountToEqual + """ + table_profile = TableProfile( + profileDate=EXECUTION_DATE.strftime("%Y-%m-%d"), + columnCount=5, + ) + + res_ok = validate( + TableColumnCountToEqual(value=5), + table_profile=table_profile, + execution_date=EXECUTION_DATE, + ) + assert res_ok == TestCaseResult( + executionTime=EXECUTION_DATE.timestamp(), + testCaseStatus=TestCaseStatus.Success, + result="Found 5.0 columns vs. the expected 5", + ) + + res_ko = validate( + TableColumnCountToEqual(value=20), + table_profile=table_profile, + execution_date=EXECUTION_DATE, + ) + + assert res_ko == TestCaseResult( + executionTime=EXECUTION_DATE.timestamp(), + testCaseStatus=TestCaseStatus.Failed, + result="Found 5.0 columns vs. the expected 20", + ) + + table_profile_aborted = TableProfile( + profileDate=EXECUTION_DATE.strftime("%Y-%m-%d"), + ) + + res_aborted = validate( + TableColumnCountToEqual(value=5), + table_profile=table_profile_aborted, + execution_date=EXECUTION_DATE, + ) + + assert res_aborted == TestCaseResult( + executionTime=EXECUTION_DATE.timestamp(), + testCaseStatus=TestCaseStatus.Aborted, + result="columnCount should not be None for TableColumnCountToEqual", + ) + + def test_column_values_to_be_between(): """ Check ColumnValuesToBeBetween @@ -204,3 +311,56 @@ def test_column_values_to_be_unique(): + " but got valuesCount=None, uniqueCount=None." ), ) + + +def test_column_values_to_be_not_null(): + """ + Check ColumnValuesToBeNotNull + """ + + column_profile = ColumnProfile( + nullCount=0, + ) + + res_ok = validate( + ColumnValuesToBeNotNull(), + col_profile=column_profile, + execution_date=EXECUTION_DATE, + ) + assert res_ok == TestCaseResult( + executionTime=EXECUTION_DATE.timestamp(), + testCaseStatus=TestCaseStatus.Success, + result=("Found nullCount=0.0. It should be 0."), + ) + + column_profile_ko = ColumnProfile( + nullCount=10, + ) + + res_ko = validate( + ColumnValuesToBeNotNull(), + col_profile=column_profile_ko, + execution_date=EXECUTION_DATE, + ) + + assert res_ko == TestCaseResult( + executionTime=EXECUTION_DATE.timestamp(), + testCaseStatus=TestCaseStatus.Failed, + result=("Found nullCount=10.0. It should be 0."), + ) + + column_profile_aborted = ColumnProfile() + + res_aborted = validate( + ColumnValuesToBeNotNull(), + col_profile=column_profile_aborted, + execution_date=EXECUTION_DATE, + ) + + assert res_aborted == TestCaseResult( + executionTime=EXECUTION_DATE.timestamp(), + testCaseStatus=TestCaseStatus.Aborted, + result=( + "We expect `nullCount` to be informed on the profiler for ColumnValuesToBeNotNull." + ), + )
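Usage note: a minimal sketch of how the session-based validation added in this patch is meant to be invoked through the `validate` singledispatch entrypoint registered in validations/core.py. The `session` and `User` names are assumptions standing in for an already-bound SQLAlchemy Session (e.g. from create_and_bind_session) and a declarative table like the one defined in test_session_validations.py.

from datetime import datetime

from metadata.generated.schema.entity.data.table import ColumnProfile
from metadata.generated.schema.tests.column.columnValuesToBeNotInSet import (
    ColumnValuesToBeNotInSet,
)
from metadata.orm_profiler.validations.core import validate

# session: assumed SQLAlchemy Session bound to the target database
# User: assumed declarative table under test, as in test_session_validations.py
result = validate(
    ColumnValuesToBeNotInSet(values=["forbidden"]),  # values the column must not contain
    col_profile=ColumnProfile(name="name"),  # identifies the column to check
    execution_date=datetime.utcnow(),
    session=session,
    table=User,
)
print(result.testCaseStatus, result.result)

This mirrors the keyword-only call shape that run_table_test and run_column_test in orm_profiler.py now forward (session plus the ORM table), which is also why the profile-only validators in this patch accept **__ to swallow the extra keyword arguments.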