mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-12-26 15:10:05 +00:00
GEN 1683 - Add Column Value to be At Expected Location Test (#18524)
* feat: added column value to be in expected location test * fix: renamed value -> values * doc: added 1.6 documentation entry * style: ran python linting * fix: move data packaging to pyproject.yaml * fix: add init file back for data package * fix: failing test case
This commit is contained in:
parent
f1ebe816b0
commit
d579008c99
@ -31,6 +31,7 @@ namespaces = true
|
||||
[tool.setuptools.package-data]
|
||||
"metadata.examples" = ["workflows/*.yaml"]
|
||||
"_openmetadata_testutils" = ["data/**/*"]
|
||||
"metadata.data_quality" = ["data/**/*"]
|
||||
|
||||
[project.scripts]
|
||||
metadata = "metadata.cmd:metadata"
|
||||
|
||||
@ -141,6 +141,7 @@ base_requirements = {
|
||||
"tabulate==0.9.0",
|
||||
"typing-inspect",
|
||||
"packaging", # For version parsing
|
||||
"shapely",
|
||||
}
|
||||
|
||||
plugins: Dict[str, Set[str]] = {
|
||||
|
||||
34957
ingestion/src/metadata/data_quality/data/fr-cities.json
Normal file
34957
ingestion/src/metadata/data_quality/data/fr-cities.json
Normal file
File diff suppressed because one or more lines are too long
@ -0,0 +1,297 @@
|
||||
# Copyright 2021 Collate
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
"""
|
||||
Validator for column value to be at expected location test case
|
||||
"""
|
||||
|
||||
import json
|
||||
import traceback
|
||||
from abc import abstractmethod
|
||||
from importlib import resources
|
||||
from typing import Any, Callable, Dict, List, Optional, Union
|
||||
|
||||
from shapely.geometry import MultiPolygon, Point, Polygon
|
||||
|
||||
from metadata.data_quality.validations.base_test_handler import BaseTestValidator
|
||||
from metadata.data_quality.validations.utils import casefold_if_string
|
||||
from metadata.generated.schema.tests.basic import (
|
||||
TestCaseResult,
|
||||
TestCaseStatus,
|
||||
TestResultValue,
|
||||
)
|
||||
from metadata.utils import entity_link
|
||||
from metadata.utils.logger import test_suite_logger
|
||||
|
||||
logger = test_suite_logger()
|
||||
|
||||
|
||||
class BaseColumnValuesToBeAtExpectedLocationValidator(BaseTestValidator):
    """Validator for column value to be at expected location test case.

    For every row, checks that the point (longitude, latitude) falls within
    the (buffered) polygon of the referenced location. Reference geometries
    are loaded from the packaged ``fr-cities.json`` file; the reference
    column is matched by city name (``CITY``) or postal code (``POSTAL_CODE``).
    """

    # pylint: disable=too-many-locals
    def run_validation(self) -> TestCaseResult:
        """Run validation for the given test case

        Returns:
            TestCaseResult: aggregated valid/invalid/unknown location counts
        """
        valid = True
        valid_count = 0
        invalid_count = 0
        unknown_count = 0
        try:
            radius: float = self.get_test_case_param_value(
                self.test_case.parameterValues,  # type: ignore
                "radius",
                float,
            )
            lon: str = self.get_test_case_param_value(
                self.test_case.parameterValues,  # type: ignore
                "longitudeColumnName",
                str,
            )
            lat: str = self.get_test_case_param_value(
                self.test_case.parameterValues,  # type: ignore
                "latitudeColumnName",
                str,
            )
            ref_type: str = self.get_test_case_param_value(
                self.test_case.parameterValues,  # type: ignore
                "locationReferenceType",
                str,
            )

            # column under test: holds the location reference (city/postal code)
            column_reference = entity_link.split(self.test_case.entityLink.root)[-1]
            columns = [column_reference, lon, lat]
            shapes = self._get_shapes(radius, ref_type)
            for data in self._fetch_data(columns):
                is_valid = self._validate_point(
                    data[column_reference],
                    ref_type,
                    data[lat],
                    data[lon],
                    shapes,
                )
                if is_valid is False:
                    valid = False
                    invalid_count += 1
                elif is_valid is None:
                    # missing coordinates or unresolved reference location
                    unknown_count += 1
                else:
                    valid_count += 1

        except (ValueError, RuntimeError) as exc:
            msg = f"Error computing {self.test_case.fullyQualifiedName}: {exc}"  # type: ignore
            logger.debug(traceback.format_exc())
            logger.warning(msg)
            return self.get_test_case_result_object(
                self.execution_date,
                TestCaseStatus.Aborted,
                msg,
                [
                    TestResultValue(
                        name="validLocation", value=None, predictedValue=None
                    ),
                    TestResultValue(
                        name="invalidLocation", value=None, predictedValue=None
                    ),
                    TestResultValue(
                        name="unknownLocation", value=None, predictedValue=None
                    ),
                ],
            )

        if self.test_case.computePassedFailedRowCount:
            # unknown locations are excluded from the total/failed row counts
            row_count, failed_rows = (valid_count + invalid_count), invalid_count
        else:
            row_count, failed_rows = None, None

        return self.get_test_case_result_object(
            self.execution_date,
            self.get_test_case_status(valid),
            (
                f"Found validLocation={valid_count}, invalidLocation={invalid_count},"
                f"unknownLocation={unknown_count} vs. expected 0 invalidLocation."
            ),
            [
                TestResultValue(
                    name="validLocation", value=str(valid_count), predictedValue=None
                ),
                TestResultValue(
                    name="invalidLocation",
                    value=str(invalid_count),
                    predictedValue=None,
                ),
                TestResultValue(
                    name="unknownLocation",
                    value=str(unknown_count),
                    predictedValue=None,
                ),
            ],
            row_count=row_count,
            failed_rows=failed_rows,
            min_bound=None,
            max_bound=None,
        )

    @abstractmethod
    def _fetch_data(self, columns: List[str]):
        """Yield mapping-like rows exposing the requested columns."""
        raise NotImplementedError

    def _get_shapes(self, radius: float, ref_type: str) -> List[Dict]:
        """Transform the json file into a list of shapes

        Args:
            radius (float): radius (in meters) to buffer the shapes
            ref_type (str): ``CITY`` or ``POSTAL_CODE``
        Returns:
            List[Dict]: ``{"geometry": ..., "properties": ...}`` entries,
            sorted by the reference property so ``_search_location`` can
            binary-search them
        """
        geojson_property = "libgeo" if ref_type == "CITY" else "codgeo"
        # pylint: disable=deprecated-method
        with resources.open_text("metadata.data_quality.data", "fr-cities.json") as f:
            data = json.load(f)

        # bring meters to coordinates degrees (e.g. 1000 meter = 0.01 degree)
        radius = radius / 100000
        shapes = []

        for feature in data.get("features"):
            type_ = feature["geometry"]["type"]

            if type_ == "Polygon":
                polygon = Polygon(feature["geometry"]["coordinates"][0])
            else:
                # anything else is treated as a MultiPolygon feature
                coordinates = [
                    Polygon(c[0]) for c in feature["geometry"]["coordinates"]
                ]
                polygon = MultiPolygon(coordinates)
            polygon = polygon.buffer(radius)
            properties = feature["properties"]
            shapes.append({"geometry": polygon, "properties": properties})

        return sorted(shapes, key=lambda x: x["properties"][geojson_property])

    def _search_location(
        self, shapes: List[Dict], ref: Any, ref_type: str
    ) -> Optional[List]:
        """Search for the location in the shapes list.

        Binary search over the (sorted) shapes; once a match is found,
        neighboring shapes with the same property value are collected so
        duplicated references (e.g. shared postal codes) are all returned.

        Args:
            shapes (Dict): list of shapes sorted by the reference property
            ref (Any): reference to search for
            ref_type (str): type of reference
        Returns:
            Optional[List]: all matching shapes, or an empty list
        """
        geojson_property = "libgeo" if ref_type == "CITY" else "codgeo"
        # comparisons are performed on the string form of both sides
        geotype = str

        if len(shapes) == 0:
            return []

        if len(shapes) == 1:
            return (
                shapes
                if self._compare_geojson_values(
                    self._get_geojson_value(shapes[0], geojson_property), ref, geotype
                )
                else []
            )

        n = len(shapes) // 2
        mid_value = casefold_if_string(
            self._get_geojson_value(shapes[n], geojson_property)
        )
        ref = casefold_if_string(ref)
        if self._compare_geojson_values(mid_value, ref, geotype):
            matches = [shapes[n]]
            left = n - 1

            while left >= 0 and self._compare_geojson_values(
                self._get_geojson_value(shapes[left], geojson_property), ref, geotype
            ):
                matches.append(shapes[left])
                left -= 1

            right = n + 1
            while right < len(shapes) and self._compare_geojson_values(
                self._get_geojson_value(shapes[right], geojson_property), ref, geotype
            ):
                matches.append(shapes[right])
                right += 1

            return matches

        if geotype(mid_value) > geotype(ref):
            return self._search_location(shapes[:n], ref, ref_type)
        return self._search_location(shapes[n:], ref, ref_type)

    def _get_geojson_value(self, shape: Dict, geojson_property: str):
        """Given a shape, return the geojson property value

        Args:
            shape (Dict): shape to extract the value from
            geojson_property (str): geojson property to extract
        """
        return shape.get("properties", {}).get(geojson_property, "")

    def _compare_geojson_values(self, value: Any, ref: Any, geotype: Callable) -> bool:
        """Compare the geojson values

        Args:
            value (Any): value to compare
            ref (Any): reference to compare to
            geotype (Callable): normalizer applied to both sides before comparing

        Returns:
            bool: True when both normalized values are equal
        """
        return geotype(casefold_if_string(value)) == geotype(casefold_if_string(ref))

    @staticmethod
    def _coerce_coordinate(value: Union[float, str]) -> Union[float, None]:
        """Convert a single coordinate to float, accepting both ``.`` and ``,``
        decimal separators (e.g. "1,7743058" or "1.7743058").

        Coordinates are converted independently: the previous combined
        try/except converted ``lon`` first, so when only ``lat`` used a comma
        separator the fallback called ``str.replace`` on an already-converted
        float and raised AttributeError.
        """
        if isinstance(value, str):
            try:
                return float(value)
            except ValueError:
                return float(value.replace(",", "."))
        return value

    def _validate_point(
        self,
        ref: Any,
        ref_type: str,
        lat: float,
        lon: Union[float, str],
        shapes: List[Dict],
    ) -> Optional[bool]:
        """Validate the point is within the shapes

        Args:
            ref (Any): reference to search for
            ref_type (str): type of reference
            lat (float): latitude
            lon (float): longitude
            shapes (List[Dict]): list of shapes

        Returns:
            Optional[bool]: True/False when the location is resolvable,
            None when coordinates are missing or the reference is unknown
        """
        if isinstance(lon, str) or isinstance(lat, str):
            # lat/lon can be represented as strings in format 1,7743058 or 1.7743058
            lon = self._coerce_coordinate(lon)
            lat = self._coerce_coordinate(lat)

        # falsy coordinates (None, empty, 0.0) are treated as missing data
        if not lon or not lat:
            return None
        point = Point(lon, lat)
        locations = self._search_location(shapes, ref, ref_type)
        if not locations:
            return None
        for location in locations:
            if location["geometry"].contains(point):
                return True

        return False
|
||||
@ -0,0 +1,40 @@
|
||||
# Copyright 2021 Collate
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
"""
|
||||
Pandas validator for column value to be at expected location test case
|
||||
"""
|
||||
|
||||
from typing import List, cast
|
||||
|
||||
from metadata.data_quality.validations.column.base.columnValuesToBeAtExpectedLocation import (
|
||||
BaseColumnValuesToBeAtExpectedLocationValidator,
|
||||
)
|
||||
from metadata.data_quality.validations.mixins.pandas_validator_mixin import (
|
||||
PandasValidatorMixin,
|
||||
)
|
||||
from metadata.utils.logger import test_suite_logger
|
||||
|
||||
logger = test_suite_logger()
|
||||
|
||||
|
||||
class ColumnValuesToBeAtExpectedLocationValidator(
    BaseColumnValuesToBeAtExpectedLocationValidator, PandasValidatorMixin
):
    """Validator for column value to be at expected location test case"""

    def _fetch_data(self, columns: List[str]):
        """Lazily yield one row at a time, restricted to *columns*,
        across every dataframe held by the runner."""
        from pandas import DataFrame  # pylint: disable=import-outside-toplevel

        self.runner = cast(List[DataFrame], self.runner)
        for frame in self.runner:
            for row_label in frame.index:
                yield frame.loc[row_label, columns]
|
||||
@ -0,0 +1,46 @@
|
||||
# Copyright 2021 Collate
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
"""
|
||||
SQA validator for column value to be at expected location test case
|
||||
"""
|
||||
|
||||
from typing import Iterator, List, cast
|
||||
|
||||
from sqlalchemy import Column, inspect
|
||||
|
||||
from metadata.data_quality.validations.column.base.columnValuesToBeAtExpectedLocation import (
|
||||
BaseColumnValuesToBeAtExpectedLocationValidator,
|
||||
)
|
||||
from metadata.data_quality.validations.mixins.sqa_validator_mixin import (
|
||||
SQAValidatorMixin,
|
||||
)
|
||||
from metadata.profiler.processor.runner import QueryRunner
|
||||
from metadata.utils.logger import test_suite_logger
|
||||
|
||||
logger = test_suite_logger()
|
||||
|
||||
|
||||
class ColumnValuesToBeAtExpectedLocationValidator(
    BaseColumnValuesToBeAtExpectedLocationValidator, SQAValidatorMixin
):
    """Validator for column value to be at expected location test case"""

    def _fetch_data(self, columns: List[str]) -> Iterator:
        """Fetch data from the runner object"""
        self.runner = cast(QueryRunner, self.runner)
        inspection = inspect(self.runner.table)
        table_columns: List[Column] = inspection.c if inspection is not None else []

        selected = []
        for col in table_columns:
            if col.name not in columns:
                continue
            # align the mapping key with the column name so rows can be
            # looked up by name in the base validator
            col.key = col.name
            selected.append(col)

        yield from self.runner.yield_from_sample(*selected)
|
||||
@ -2,7 +2,7 @@
|
||||
Data quality validation utility functions.
|
||||
"""
|
||||
|
||||
from typing import Callable, List, Optional, TypeVar, Union
|
||||
from typing import Any, Callable, List, Optional, TypeVar, Union
|
||||
|
||||
from metadata.generated.schema.tests.testCase import TestCaseParameterValue
|
||||
|
||||
@ -54,3 +54,14 @@ def get_bool_test_case_param(
|
||||
if str_val is None:
|
||||
return False
|
||||
return str_val.lower() == "true"
|
||||
|
||||
|
||||
def casefold_if_string(value: Any) -> Any:
    """Case fold the value if it is a string.

    Args:
        value (Any): value to case fold
    Returns:
        Any: the casefolded string, or the value unchanged when not a string
    """
    if isinstance(value, str):
        return value.casefold()
    return value
|
||||
|
||||
@ -22,7 +22,10 @@ from sqlalchemy import text
|
||||
from sqlalchemy.orm import DeclarativeMeta, Query, Session
|
||||
from sqlalchemy.orm.util import AliasedClass
|
||||
|
||||
from metadata.profiler.processor.handle_partition import partition_filter_handler
|
||||
from metadata.profiler.processor.handle_partition import (
|
||||
build_partition_predicate,
|
||||
partition_filter_handler,
|
||||
)
|
||||
from metadata.utils.logger import query_runner_logger
|
||||
from metadata.utils.sqa_utils import get_query_filter_for_runner
|
||||
|
||||
@ -121,6 +124,22 @@ class QueryRunner:
|
||||
def select_all_from_sample(self, *entities, **kwargs):
|
||||
return self._select_from_sample(*entities, **kwargs).all()
|
||||
|
||||
def yield_from_sample(self, *entities, **kwargs):
    """Stream rows from the sample select in batches of 1000.

    Builds the sample select once, applies the partition predicate when
    partition details are configured, and yields rows lazily so callers
    never hold the full result set in memory.

    Bug fix: the original discarded the result of ``query.filter(...)``
    (SQLAlchemy's ``Query.filter`` returns a *new* query) and then executed
    a freshly rebuilt, unfiltered select — so the partition predicate was
    never applied and the select was constructed twice.
    """
    query = self._select_from_sample(*entities, **kwargs)
    if self._partition_details:
        partition_filter = build_partition_predicate(
            self._partition_details,
            self.table.__table__.c,
        )
        # rebind: Query.filter is non-mutating
        query = query.filter(partition_filter)

    result = self._session.execute(query)
    while True:
        rows = result.fetchmany(1000)
        if not rows:
            break
        yield from rows
|
||||
|
||||
def dispatch_query_select_first(self, *entities, **kwargs):
|
||||
"""dispatch query to sample or all table"""
|
||||
if isinstance(self._sample, AliasedClass):
|
||||
|
||||
@ -260,6 +260,7 @@ def test_all_definition_exists(metadata, run_data_quality_workflow, db_service):
|
||||
"columnValuesToBeNotInSet",
|
||||
"columnValueMeanToBeBetween",
|
||||
"columnValuesToBeBetween",
|
||||
"columnValuesToBeAtExpectedLocation",
|
||||
"tableDiff",
|
||||
}
|
||||
missing = set()
|
||||
|
||||
@ -42,6 +42,7 @@ ENTITY_LINK_AGE = "<#E::table::service.db.users::columns::age>"
|
||||
ENTITY_LINK_NAME = "<#E::table::service.db.users::columns::name>"
|
||||
ENTITY_LINK_USER = "<#E::table::service.db.users>"
|
||||
ENTITY_LINK_INSERTED_DATE = "<#E::table::service.db.users::columns::inserted_date>"
|
||||
ENTITY_LINK_EXPECTED_LOCATION = "<#E::table::service.db.users::columns::postal_code>"
|
||||
|
||||
TABLE = Table(
|
||||
id=uuid4(),
|
||||
@ -55,6 +56,9 @@ TABLE = Table(
|
||||
Column(name="nickname", dataType=DataType.STRING), # type: ignore
|
||||
Column(name="age", dataType=DataType.INT), # type: ignore
|
||||
Column(name="inserted_date", dataType=DataType.DATE), # type: ignore
|
||||
Column(name="postal_code", dataType=DataType.INT), # type: ignore
|
||||
Column(name="lat", dataType=DataType.DECIMAL), # type: ignore
|
||||
Column(name="lon", dataType=DataType.DECIMAL), # type: ignore
|
||||
],
|
||||
database=EntityReference(id=uuid4(), name="db", type="database"), # type: ignore
|
||||
) # type: ignore
|
||||
@ -69,6 +73,9 @@ class User(Base):
|
||||
nickname = sqa.Column(sqa.String(256))
|
||||
age = sqa.Column(sqa.Integer)
|
||||
inserted_date = sqa.Column(sqa.DATE)
|
||||
postal_code = sqa.Column(sqa.INT)
|
||||
lat = sqa.Column(sqa.DECIMAL)
|
||||
lon = sqa.Column(sqa.DECIMAL)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
@ -105,6 +112,9 @@ def create_sqlite_table():
|
||||
nickname="",
|
||||
age=30,
|
||||
inserted_date=datetime.today() - timedelta(days=i),
|
||||
postal_code=60001,
|
||||
lat=49.6852237,
|
||||
lon=1.7743058,
|
||||
),
|
||||
User(
|
||||
name="Jane",
|
||||
@ -113,6 +123,9 @@ def create_sqlite_table():
|
||||
nickname="Johnny d",
|
||||
age=31,
|
||||
inserted_date=datetime.today() - timedelta(days=i),
|
||||
postal_code=19005,
|
||||
lat=45.2589385,
|
||||
lon=1.4731471,
|
||||
),
|
||||
User(
|
||||
name="John",
|
||||
@ -121,6 +134,9 @@ def create_sqlite_table():
|
||||
nickname=None,
|
||||
age=None,
|
||||
inserted_date=datetime.today() - timedelta(days=i),
|
||||
postal_code=11008,
|
||||
lat=42.9974445,
|
||||
lon=2.2518325,
|
||||
),
|
||||
]
|
||||
session.add_all(data)
|
||||
@ -464,7 +480,7 @@ def test_case_table_column_count_to_be_between():
|
||||
testDefinition=EntityReference(id=uuid4(), type="TestDefinition"), # type: ignore
|
||||
parameterValues=[
|
||||
TestCaseParameterValue(name="minColValue", value="2"),
|
||||
TestCaseParameterValue(name="maxColValue", value="10"),
|
||||
TestCaseParameterValue(name="maxColValue", value="11"),
|
||||
],
|
||||
) # type: ignore
|
||||
|
||||
@ -706,3 +722,20 @@ def test_case_column_values_to_be_between_datetime():
|
||||
TestCaseParameterValue(name="maxValue", value="1625171052000"),
|
||||
],
|
||||
) # type: ignore
|
||||
|
||||
|
||||
@pytest.fixture
def test_case_column_values_to_be_at_expected_location():
    """Build a TestCase fixture for the expected-location column validator."""
    parameters = [
        TestCaseParameterValue(name="locationReferenceType", value="POSTAL_CODE"),
        TestCaseParameterValue(name="longitudeColumnName", value="lon"),
        TestCaseParameterValue(name="latitudeColumnName", value="lat"),
        TestCaseParameterValue(name="radius", value="1000"),
    ]
    return TestCase(
        name=TEST_CASE_NAME,
        entityLink=ENTITY_LINK_EXPECTED_LOCATION,
        testSuite=EntityReference(id=uuid4(), type="TestSuite"),  # type: ignore
        testDefinition=EntityReference(id=uuid4(), type="TestDefinition"),  # type: ignore
        parameterValues=parameters,
        computePassedFailedRowCount=True,
    )  # type: ignore
|
||||
|
||||
@ -0,0 +1,53 @@
|
||||
# Copyright 2021 Collate
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
"""Validate column value at location."""
|
||||
|
||||
from datetime import datetime
|
||||
from typing import Dict, Iterator
|
||||
from unittest.mock import patch
|
||||
|
||||
from metadata.data_quality.validations.column.sqlalchemy.columnValuesToBeAtExpectedLocation import (
|
||||
ColumnValuesToBeAtExpectedLocationValidator,
|
||||
)
|
||||
from metadata.generated.schema.tests.basic import TestCaseResult, TestCaseStatus
|
||||
from metadata.generated.schema.type.basic import Timestamp
|
||||
|
||||
|
||||
def _fetch_data() -> Iterator[Dict]:
|
||||
rows = [
|
||||
{"postal_code": 60001, "lon": "1,7743058", "lat": "49,6852237"},
|
||||
{"postal_code": 44001, "lon": "-1,5244159", "lat": "47,5546432"},
|
||||
{"postal_code": 60001, "lon": "3,17932", "lat": "49,59686"},
|
||||
]
|
||||
|
||||
yield from rows
|
||||
|
||||
|
||||
def test_column_value_to_be_at_expected_location(
    test_case_column_values_to_be_at_expected_location,
):
    """Test column value to be at expected location validation."""
    execution_ts = Timestamp(
        root=int(datetime.strptime("2021-07-03", "%Y-%m-%d").timestamp())
    )
    validator = ColumnValuesToBeAtExpectedLocationValidator(
        None,
        test_case_column_values_to_be_at_expected_location,
        execution_ts,
    )

    fetch_target = (
        "metadata.data_quality.validations.column.sqlalchemy."
        "columnValuesToBeAtExpectedLocation."
        "ColumnValuesToBeAtExpectedLocationValidator._fetch_data"
    )
    with patch(fetch_target, return_value=_fetch_data()):
        result: TestCaseResult = validator.run_validation()

    # 2 valid rows, 1 invalid, 0 unknown -> overall failure
    assert result.testCaseStatus == TestCaseStatus.Failed
    assert result.testResultValue[0].value == "2"
    assert result.testResultValue[1].value == "1"
    assert result.testResultValue[2].value == "0"
|
||||
@ -303,13 +303,22 @@ EXECUTION_DATE = datetime.strptime("2021-07-03", "%Y-%m-%d")
|
||||
"test_case_table_column_count_to_be_between",
|
||||
"tableColumnCountToBeBetween",
|
||||
"TABLE",
|
||||
(TestCaseResult, "7", None, TestCaseStatus.Success, None, None, None, None),
|
||||
(
|
||||
TestCaseResult,
|
||||
"10",
|
||||
None,
|
||||
TestCaseStatus.Success,
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
),
|
||||
),
|
||||
(
|
||||
"test_case_table_column_count_to_equal",
|
||||
"tableColumnCountToEqual",
|
||||
"TABLE",
|
||||
(TestCaseResult, "7", None, TestCaseStatus.Failed, None, None, None, None),
|
||||
(TestCaseResult, "10", None, TestCaseStatus.Failed, None, None, None, None),
|
||||
),
|
||||
(
|
||||
"test_case_table_column_name_to_exist",
|
||||
@ -407,6 +416,21 @@ EXECUTION_DATE = datetime.strptime("2021-07-03", "%Y-%m-%d")
|
||||
None,
|
||||
),
|
||||
),
|
||||
(
|
||||
"test_case_column_values_to_be_at_expected_location",
|
||||
"columnValuesToBeAtExpectedLocation",
|
||||
"COLUMN",
|
||||
(
|
||||
TestCaseResult,
|
||||
"30",
|
||||
"0",
|
||||
TestCaseStatus.Success,
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
),
|
||||
),
|
||||
],
|
||||
)
|
||||
def test_suite_validation_database(
|
||||
|
||||
@ -33,6 +33,9 @@ DL_DATA = (
|
||||
"johnny b goode",
|
||||
30,
|
||||
datetime.today() - timedelta(days=1),
|
||||
60001,
|
||||
49.6852237,
|
||||
1.7743058,
|
||||
],
|
||||
[
|
||||
"2",
|
||||
@ -42,8 +45,22 @@ DL_DATA = (
|
||||
"Johnny d",
|
||||
31,
|
||||
datetime.today() - timedelta(days=2),
|
||||
19005,
|
||||
45.2589385,
|
||||
1.4731471,
|
||||
],
|
||||
[
|
||||
"3",
|
||||
"John",
|
||||
"Joh",
|
||||
"John Doe",
|
||||
None,
|
||||
None,
|
||||
datetime.today() - timedelta(days=3),
|
||||
11008,
|
||||
42.9974445,
|
||||
2.2518325,
|
||||
],
|
||||
["3", "John", "Joh", "John Doe", None, None, datetime.today() - timedelta(days=3)],
|
||||
)
|
||||
|
||||
|
||||
@ -57,6 +74,9 @@ DATALAKE_DATA_FRAME = lambda times_increase_sample_data: DataFrame(
|
||||
"nickname",
|
||||
"age",
|
||||
"inserted_date",
|
||||
"postal_code",
|
||||
"lat",
|
||||
"lon",
|
||||
],
|
||||
)
|
||||
|
||||
@ -381,13 +401,22 @@ DATALAKE_DATA_FRAME = lambda times_increase_sample_data: DataFrame(
|
||||
"test_case_table_column_count_to_be_between",
|
||||
"tableColumnCountToBeBetween",
|
||||
"TABLE",
|
||||
(TestCaseResult, "7", None, TestCaseStatus.Success, None, None, None, None),
|
||||
(
|
||||
TestCaseResult,
|
||||
"10",
|
||||
None,
|
||||
TestCaseStatus.Success,
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
),
|
||||
),
|
||||
(
|
||||
"test_case_table_column_count_to_equal",
|
||||
"tableColumnCountToEqual",
|
||||
"TABLE",
|
||||
(TestCaseResult, "7", None, TestCaseStatus.Failed, None, None, None, None),
|
||||
(TestCaseResult, "10", None, TestCaseStatus.Failed, None, None, None, None),
|
||||
),
|
||||
(
|
||||
"test_case_table_column_name_to_exist",
|
||||
@ -473,6 +502,21 @@ DATALAKE_DATA_FRAME = lambda times_increase_sample_data: DataFrame(
|
||||
None,
|
||||
),
|
||||
),
|
||||
(
|
||||
"test_case_column_values_to_be_at_expected_location",
|
||||
"columnValuesToBeAtExpectedLocation",
|
||||
"COLUMN",
|
||||
(
|
||||
TestCaseResult,
|
||||
"6000",
|
||||
"0",
|
||||
TestCaseStatus.Success,
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
),
|
||||
),
|
||||
],
|
||||
)
|
||||
def test_suite_validation_datalake(
|
||||
|
||||
@ -619,6 +619,7 @@ Tests applied on top of Column metrics. Here is the list of all column tests:
|
||||
- [Column Value Median to Be Between](#column-value-median-to-be-between)
|
||||
- [Column Values Sum to Be Between](#column-values-sum-to-be-between)
|
||||
- [Column Values Standard Deviation to Be Between](#column-values-standard-deviation-to-be-between)
|
||||
- [Column Values To Be At Expected Location](#column-values-to-be-at-expected-location)
|
||||
|
||||
### Column Values to Be Unique
|
||||
Makes sure that there are no duplicate values in a given column.
|
||||
@ -1501,3 +1502,70 @@ Accuracy
|
||||
]
|
||||
}
|
||||
```
|
||||
|
||||
### Column Values To Be At Expected Location
|
||||
Validate that the reference value for a column is at the expected geographic location
|
||||
> Data will be temporarily stored in memory while the test case is running to validate the location. No data will be permanently stored.
|
||||
> France is the only supported location at this time. To add any additional location please reach out to the team in our slack support channel
|
||||
|
||||
**Dimension**:
|
||||
Accuracy
|
||||
|
||||
**Properties**
|
||||
|
||||
* `locationReferenceType`: the type of location reference, `CITY` or `POSTAL_CODE`
|
||||
* `longitudeColumnName`: longitude column name
|
||||
* `latitudeColumnName`: latitude column name
|
||||
* `radius`: radius in meter from which the location can be from the expected lat/long -- acts as a buffer
|
||||
|
||||
**Behavior**
|
||||
|
||||
| Condition | Status |
|
||||
| ----------- | ----------- |
|
||||
|column values lat/long is **within** the polygon of the column reference (+/- radius) |Success ✅|
|
||||
|column values lat/long is **outside** the polygon of the column reference (+/- radius)|Failed ❌|
|
||||
|
||||
**YAML Config**
|
||||
|
||||
```yaml
|
||||
- name: ExpectedGeoLocation
|
||||
testDefinitionName: ColumnValuesToBeAtExpectedLocation
|
||||
columnName: "Code Insee"
|
||||
parameterValues:
|
||||
- name: locationReferenceType
|
||||
value: POSTAL_CODE
|
||||
- name: longitudeColumnName
|
||||
value: "Coordonnée Y"
|
||||
- name: latitudeColumnName
|
||||
value: "Coordonnée X"
|
||||
- name: radius
|
||||
value: "1000"
|
||||
```
|
||||
|
||||
**JSON Config**
|
||||
|
||||
```json
|
||||
{
|
||||
"name": "ExpectedGeoLocation",
|
||||
"testDefinitionName": "ColumnValuesToBeAtExpectedLocation",
|
||||
"columnName": "Code Insee",
|
||||
"parameterValues": [
|
||||
{
|
||||
"name": "locationReferenceType",
|
||||
"value": "POSTAL_CODE"
|
||||
},
|
||||
{
|
||||
"name": "longitudeColumnName",
|
||||
"value": "Coordonnée Y"
|
||||
},
|
||||
{
|
||||
"name": "latitudeColumnName",
|
||||
"value": "Coordonnée X"
|
||||
},
|
||||
{
|
||||
"name": "radius",
|
||||
"value": "1000"
|
||||
}
|
||||
]
|
||||
}
|
||||
```
|
||||
|
||||
@ -626,6 +626,7 @@ Tests applied on top of Column metrics. Here is the list of all column tests:
|
||||
- [Column Value Median to Be Between](#column-value-median-to-be-between)
|
||||
- [Column Values Sum to Be Between](#column-values-sum-to-be-between)
|
||||
- [Column Values Standard Deviation to Be Between](#column-values-standard-deviation-to-be-between)
|
||||
- [Column Values To Be At Expected Location](#column-values-to-be-at-expected-location)
|
||||
|
||||
### Column Values to Be Unique
|
||||
Makes sure that there are no duplicate values in a given column.
|
||||
@ -1508,3 +1509,70 @@ Accuracy
|
||||
]
|
||||
}
|
||||
```
|
||||
|
||||
### Column Values To Be At Expected Location
|
||||
Validate that the reference value for a column is at the expected geographic location
|
||||
> Data will be temporarily stored in memory while the test case is running to validate the location. No data will be permanently stored.
|
||||
> France is the only supported location at this time. To add any additional location please reach out to the team in our slack support channel
|
||||
|
||||
**Dimension**:
|
||||
Accuracy
|
||||
|
||||
**Properties**
|
||||
|
||||
* `locationReferenceType`: the type of location reference, `CITY` or `POSTAL_CODE`
|
||||
* `longitudeColumnName`: longitude column name
|
||||
* `latitudeColumnName`: latitude column name
|
||||
* `radius`: radius in meter from which the location can be from the expected lat/long -- acts as a buffer
|
||||
|
||||
**Behavior**
|
||||
|
||||
| Condition | Status |
|
||||
| ----------- | ----------- |
|
||||
|column values lat/long is **within** the polygon of the column reference (+/- radius) |Success ✅|
|
||||
|column values lat/long is **outside** the polygon of the column reference (+/- radius)|Failed ❌|
|
||||
|
||||
**YAML Config**
|
||||
|
||||
```yaml
|
||||
- name: ExpectedGeoLocation
|
||||
testDefinitionName: ColumnValuesToBeAtExpectedLocation
|
||||
columnName: "Code Insee"
|
||||
parameterValues:
|
||||
- name: locationReferenceType
|
||||
value: POSTAL_CODE
|
||||
- name: longitudeColumnName
|
||||
value: "Coordonnée Y"
|
||||
- name: latitudeColumnName
|
||||
value: "Coordonnée X"
|
||||
- name: radius
|
||||
value: "1000"
|
||||
```
|
||||
|
||||
**JSON Config**
|
||||
|
||||
```json
|
||||
{
|
||||
"name": "ExpectedGeoLocation",
|
||||
"testDefinitionName": "ColumnValuesToBeAtExpectedLocation",
|
||||
"columnName": "Code Insee",
|
||||
"parameterValues": [
|
||||
{
|
||||
"name": "locationReferenceType",
|
||||
"value": "POSTAL_CODE"
|
||||
},
|
||||
{
|
||||
"name": "longitudeColumnName",
|
||||
"value": "Coordonnée Y"
|
||||
},
|
||||
{
|
||||
"name": "latitudeColumnName",
|
||||
"value": "Coordonnée X"
|
||||
},
|
||||
{
|
||||
"name": "radius",
|
||||
"value": "1000"
|
||||
}
|
||||
]
|
||||
}
|
||||
```
|
||||
|
||||
@ -0,0 +1,44 @@
|
||||
{
|
||||
"name": "columnValuesToBeAtExpectedLocation",
|
||||
"fullyQualifiedName": "columnValuesToBeAtExpectedLocation",
|
||||
"displayName": "Column Values To Be At Expected Location",
|
||||
"description": "This schema defines the test ColumnValuesToBeAtExpectedLocation. Test the lat/long values in a column to be at the specified location in the reference column.",
|
||||
"entityType": "COLUMN",
|
||||
"testPlatforms": ["OpenMetadata"],
|
||||
"supportedDataTypes": ["BYTES", "STRING", "MEDIUMTEXT", "TEXT", "CHAR", "VARCHAR","NUMBER", "INT", "FLOAT", "DOUBLE", "DECIMAL", "TINYINT", "SMALLINT", "BIGINT", "BYTEINT"],
|
||||
"parameterDefinition": [
|
||||
{
|
||||
"name": "locationReferenceType",
|
||||
"displayName": "Location Reference Type",
|
||||
"description": "The type of the location reference column.",
|
||||
"dataType": "ARRAY",
|
||||
"optionValues": ["CITY", "POSTAL_CODE"],
|
||||
"required": true
|
||||
},
|
||||
{
|
||||
"name": "longitudeColumnName",
|
||||
"displayName": "Longitude Column Name (X)",
|
||||
"description": "The longitude column name in the table.",
|
||||
"dataType": "STRING",
|
||||
"required": true
|
||||
},
|
||||
{
|
||||
"name": "latitudeColumnName",
|
||||
"displayName": "Latitude Column Name (Y)",
|
||||
"description": "The latitude column name in the table.",
|
||||
"dataType": "STRING",
|
||||
"required": true
|
||||
},
|
||||
{
|
||||
"name": "radius",
|
||||
"displayName": "Radius (in meters) from the expected location",
|
||||
"description": "The radius in meters from the expected location. The test will check if the lat/long values are within the location + the radius.",
|
||||
"dataType": "FLOAT",
|
||||
"required": true
|
||||
}
|
||||
],
|
||||
"supportsRowLevelPassedFailed": false,
|
||||
"provider": "system",
|
||||
"dataQualityDimension": "Accuracy"
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user