Fixes #6891 by implementing support for VARIANT DType (#7084)

Teddy 2022-08-31 19:01:00 +02:00 committed by GitHub
parent 01309249c8
commit 811f640a18
7 changed files with 54 additions and 13 deletions

View File

@@ -90,7 +90,8 @@
       "GEOGRAPHY",
       "ENUM",
       "JSON",
-      "UUID"
+      "UUID",
+      "VARIANT"
     ]
   },
   "constraint": {

View File

@@ -24,7 +24,8 @@ echo "Prepare Docker volume for the operators"
 cd docker/local-metadata

 echo "Starting Local Docker Containers"
-docker compose down && docker compose up --build -d
+echo "Using ingestion dependency: ${INGESTION_DEPENDENCY:-all}"
+docker compose down && docker compose build --build-arg INGESTION_DEPENDENCY="${INGESTION_DEPENDENCY:-all}" && docker compose up -d

 until curl -s -f "http://localhost:9200/_cat/indices/team_search_index"; do
     printf 'Checking if Elastic Search instance is up...\n'

View File

@@ -31,9 +31,6 @@ ENV CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints
 # Add docker provider for the DockerOperator
 RUN pip install "apache-airflow[docker]==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"
-RUN pip install "openmetadata-ingestion[all]"
-RUN pip uninstall openmetadata-ingestion -y
-
 FROM airflow as apis
 WORKDIR /openmetadata-airflow-apis
 COPY openmetadata-airflow-apis /openmetadata-airflow-apis
@@ -44,7 +41,7 @@ FROM apis as ingestion
 WORKDIR /ingestion
 COPY ingestion /ingestion

-ARG INGESTION_DEPENDENCY=all
+ARG INGESTION_DEPENDENCY
 RUN pip install --upgrade ".[${INGESTION_DEPENDENCY}]"
 # Uninstalling psycopg2-binary and installing psycopg2 instead
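
With the `all` default dropped from the Dockerfile, the ingestion extras are now chosen at build time: the local script above passes the value through `docker compose build --build-arg INGESTION_DEPENDENCY="${INGESTION_DEPENDENCY:-all}"`, so exporting `INGESTION_DEPENDENCY` before running it selects which dependency set gets installed, falling back to `all`.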

View File

@@ -70,6 +70,23 @@ _TYPE_MAP = {
 SQA_RESERVED_ATTRIBUTES = ["metadata"]
+
+
+def map_types(col: Column, table_service_type):
+    """returns an ORM type"""
+    if col.arrayDataType:
+        return _TYPE_MAP.get(col.dataType)(item_type=col.arrayDataType)
+
+    if (
+        table_service_type == databaseService.DatabaseServiceType.Snowflake
+        and col.dataType == DataType.JSON
+    ):
+        from snowflake.sqlalchemy import VARIANT
+
+        return VARIANT
+
+    return _TYPE_MAP.get(col.dataType)
+
 def check_snowflake_case_sensitive(table_service_type, table_or_col) -> Optional[bool]:
     """Check whether the column or table name is not uppercase for a Snowflake table.
     If so, then force quoting; if not, return None to let the engine backend handle the logic.
@@ -97,11 +114,10 @@ def build_orm_col(idx: int, col: Column, table_service_type) -> sqlalchemy.Column:
     As this is only used for INSERT/UPDATE/DELETE,
     there is no impact for our read-only purposes.
     """
     return sqlalchemy.Column(
         name=str(col.name.__root__),
-        type_=_TYPE_MAP.get(col.dataType)
-        if not col.arrayDataType
-        else _TYPE_MAP.get(col.dataType)(item_type=col.arrayDataType),
+        type_=map_types(col, table_service_type),
         primary_key=not bool(idx),  # The first col seen is used as PK
         quote=check_snowflake_case_sensitive(table_service_type, col.name.__root__),
         key=str(
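
The dispatch above is easiest to read in isolation. The following self-contained sketch shows the same idea: Snowflake JSON columns resolve to snowflake-sqlalchemy's VARIANT via a lazy import, and everything else falls back to the plain lookup table (array handling omitted). The two enums are stand-ins for OpenMetadata's generated DataType and DatabaseServiceType; sqlalchemy and snowflake-sqlalchemy are assumed to be installed.

# Illustrative sketch of the type dispatch introduced by this commit
# (enums below are stand-ins for the project's generated models).
from enum import Enum

import sqlalchemy


class DataType(Enum):
    STRING = "STRING"
    JSON = "JSON"


class DatabaseServiceType(Enum):
    Snowflake = "Snowflake"
    Postgres = "Postgres"


_TYPE_MAP = {
    DataType.STRING: sqlalchemy.String,
    DataType.JSON: sqlalchemy.JSON,
}


def map_types(data_type, service_type):
    """Return the SQLAlchemy type for a column, special-casing Snowflake JSON."""
    if service_type is DatabaseServiceType.Snowflake and data_type is DataType.JSON:
        # Lazy import: only Snowflake deployments need snowflake-sqlalchemy.
        from snowflake.sqlalchemy import VARIANT

        return VARIANT
    return _TYPE_MAP.get(data_type)


print(map_types(DataType.JSON, DatabaseServiceType.Postgres))   # generic JSON type
print(map_types(DataType.JSON, DatabaseServiceType.Snowflake))  # VARIANT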

View File

@@ -110,6 +110,17 @@ class TestSuiteWorkflow:
             )
             raise err

+    def _filter_test_cases_for_table_entity(
+        self, table_fqn: str, test_cases: List[TestCase]
+    ) -> List[TestCase]:
+        """Filter test cases for a specific table entity"""
+        return [
+            test_case
+            for test_case in test_cases
+            if test_case.entityLink.__root__.split("::")[2].replace(">", "")
+            == table_fqn
+        ]
+
     def _get_unique_table_entities(self, test_cases: List[TestCase]) -> Set:
         """from a list of test cases extract unique table entities"""
         table_fqns = [
@@ -240,6 +251,7 @@
             entity=TestSuite,
             fqn=self.config.source.serviceName,
         )

-        return [test_suite]
+        if test_suite:
+            return [test_suite]
         return None
@@ -377,7 +389,9 @@
         for table_fqn in unique_table_fqns:
             try:
                 sqa_interface = self._create_sqa_tests_runner_interface(table_fqn)
-                for test_case in test_cases:
+                for test_case in self._filter_test_cases_for_table_entity(
+                    table_fqn, test_cases
+                ):
                     try:
                         data_test_runner = self._create_data_tests_runner(sqa_interface)
                         test_result = data_test_runner.run_and_handle(test_case)
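
The new filter keys on the table FQN embedded in each test case's entity link. Assuming OpenMetadata's usual entity-link shape, `<#E::table::<fqn>>` optionally extended with `::columns::<name>`, the split-and-strip expression recovers the table FQN from either form; a standalone check:

# Standalone sketch of the entity-link parsing used by
# _filter_test_cases_for_table_entity (link format assumed from
# OpenMetadata's "<#E::table::<fqn>[::columns::<name>]" convention).
def table_fqn_from_entity_link(entity_link: str) -> str:
    """Extract the table FQN from an entity link string."""
    return entity_link.split("::")[2].replace(">", "")


# Table-level link: the FQN is the last segment, so the ">" must be stripped.
assert table_fqn_from_entity_link(
    "<#E::table::service.db.schema.users>"
) == "service.db.schema.users"

# Column-level link: the FQN sits mid-string and carries no ">".
assert table_fqn_from_entity_link(
    "<#E::table::service.db.schema.users::columns::email>"
) == "service.db.schema.users"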

View File

@@ -85,6 +85,18 @@ def column_value_length_to_be_between(
         ],
     )

+    if not max_value_length_value_res or not min_value_length_value_res:
+        msg = f"Error computing {test_case.name} for {runner.table.__tablename__}: missing max value length or min value length"
+        return TestCaseResult(
+            timestamp=execution_date,
+            testCaseStatus=TestCaseStatus.Aborted,
+            result=msg,
+            testResultValue=[
+                TestResultValue(name="minValueLength", value=None),
+                TestResultValue(name="maxValueLength", value=None),
+            ],
+        )
+
     min_bound = next(
         (
             float(param.value)
@@ -102,8 +114,8 @@ def column_value_length_to_be_between(

     status = (
         TestCaseStatus.Success
-        if min_bound >= min_value_length_value_res
-        and max_bound <= max_value_length_value_res
+        if min_bound <= min_value_length_value_res
+        and max_bound >= max_value_length_value_res
         else TestCaseStatus.Failed
     )
     result = (
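
The comparison fix is easiest to see with numbers: the test should pass when every observed length lies inside the configured bounds, i.e. `min_bound <= observed_min` and `observed_max <= max_bound`. A quick check of both versions, using the diff's variable names with invented values:

# Worked check of the corrected bounds logic (values invented for
# illustration): observed lengths between 2 and 10, bounds [1, 20].
min_bound, max_bound = 1.0, 20.0   # configured test bounds
min_value_length_value_res = 2     # shortest observed value
max_value_length_value_res = 10    # longest observed value

# Old (buggy) condition: 1 >= 2 and 20 <= 10 -> False, wrongly failing.
old = min_bound >= min_value_length_value_res and max_bound <= max_value_length_value_res

# New condition: 1 <= 2 and 20 >= 10 -> True, correctly passing.
new = min_bound <= min_value_length_value_res and max_bound >= max_value_length_value_res

assert not old and new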

View File

@@ -56,7 +56,7 @@ regex expression to filter tables.
 Sampling percentage to apply for profiling tables.

 **Thread Count**
-Number of thread to use when computing metrics for the profiler
+Number of threads to use when computing metrics for the profiler. For Snowflake users we recommend setting it to 1; there is a known issue with one of its dependencies (`snowflake-connector-python`) affecting projects in certain environments.

 **Ingest Sample Data**
 Whether the profiler should ingest sample data