Refactor converter, unique count, add registry (#12939)

This commit is contained in:
Ayush Shah 2023-08-23 11:43:12 +05:30 committed by GitHub
parent 74e8f0b32c
commit 1e8ea26bbd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 118 additions and 108 deletions

View File

@ -14,14 +14,12 @@ Unique Count Metric definition
"""
from typing import Optional
from sqlalchemy import NVARCHAR, TEXT, column, func, literal_column
from sqlalchemy import column, func
from sqlalchemy.orm import DeclarativeMeta, Session
from metadata.profiler.metrics.core import QueryMetric
from metadata.profiler.orm.converter.mssql.converter import cast_dict
from metadata.profiler.orm.functions.count import CountFn
from metadata.profiler.orm.functions.unique_count import _unique_count_query_mapper
from metadata.profiler.orm.registry import NOT_COMPUTE
from metadata.profiler.orm.types.custom_image import CustomImage
from metadata.utils.logger import profiler_logger
logger = profiler_logger()
@ -58,28 +56,10 @@ class UniqueCount(QueryMetric):
# Run all queries on top of the sampled data
col = column(self.col.name, self.col.type)
is_mssql = (
hasattr(session.bind, "dialect") and session.bind.dialect.name == "mssql"
unique_count_query = _unique_count_query_mapper[session.bind.dialect.name](
col, session, sample
)
is_mssql_deprecated_datatype = isinstance(
self.col.type, (CustomImage, TEXT, NVARCHAR)
)
count_fn = CountFn(col) if is_mssql and is_mssql_deprecated_datatype else col
group_by_col = (
func.convert(literal_column(cast_dict.get(type(self.col.type))), col)
if is_mssql and is_mssql_deprecated_datatype
else col
)
only_once = (
session.query(func.count(count_fn))
.select_from(sample)
.group_by(group_by_col)
.having(func.count(count_fn) == 1)
)
only_once_cte = only_once.cte("only_once")
only_once_cte = unique_count_query.cte("only_once")
return session.query(func.count().label(self.name())).select_from(only_once_cte)
def df_fn(self, dfs=None):

View File

@ -15,13 +15,15 @@ to an SQLAlchemy ORM class.
"""
from typing import Optional, cast
import sqlalchemy
from sqlalchemy import MetaData
from sqlalchemy.orm import DeclarativeMeta, declarative_base
from metadata.generated.schema.entity.data.database import Database, databaseService
from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema
from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.entity.data.table import Column, Table
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.profiler.orm.converter.converter_registry import converter_registry
Base = declarative_base()
@ -62,6 +64,32 @@ def check_if_should_quote_column_name(table_service_type) -> Optional[bool]:
return None
def build_orm_col(idx: int, col: Column, table_service_type) -> sqlalchemy.Column:
"""
Cook the ORM column from our metadata instance
information.
The first parsed column will be used arbitrarily
as the PK, as SQLAlchemy forces us to specify
at least one PK.
As this is only used for INSERT/UPDATE/DELETE,
there is no impact for our read-only purposes.
"""
return sqlalchemy.Column(
name=str(col.name.__root__),
type_=converter_registry[table_service_type]().map_types(
col, table_service_type
),
primary_key=not bool(idx), # The first col seen is used as PK
quote=check_if_should_quote_column_name(table_service_type)
or check_snowflake_case_sensitive(table_service_type, col.name.__root__),
key=str(
col.name.__root__
).lower(), # Add lowercase column name as key for snowflake case sensitive columns
)
def ometa_to_sqa_orm(
table: Table, metadata: OpenMetadata, sqa_metadata_obj: Optional[MetaData] = None
) -> DeclarativeMeta:
@ -74,9 +102,6 @@ def ometa_to_sqa_orm(
`type` and passing SQLAlchemy `Base` class
as the bases tuple for inheritance.
"""
# pylint: disable=import-outside-toplevel,cyclic-import
from metadata.profiler.orm.converter.dispatch_converter import build_orm_col
table.serviceType = cast(
databaseService.DatabaseServiceType, table.serviceType
) # satisfy mypy

View File

@ -0,0 +1,26 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Dispatch logic to map an Converter base based on dialect
"""
from collections import defaultdict
from metadata.generated.schema.entity.services.databaseService import (
DatabaseServiceType,
)
from metadata.profiler.orm.converter.bigquery.converter import BigqueryMapTypes
from metadata.profiler.orm.converter.common import CommonMapTypes
from metadata.profiler.orm.converter.snowflake.converter import SnowflakeMapTypes
converter_registry = defaultdict(lambda: CommonMapTypes)
converter_registry[DatabaseServiceType.BigQuery] = BigqueryMapTypes
converter_registry[DatabaseServiceType.Snowflake] = SnowflakeMapTypes

View File

@ -1,60 +0,0 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Dispatch logic to map an Converter base based on dialect
"""
import sqlalchemy
from metadata.generated.schema.entity.data.table import Column
from metadata.generated.schema.entity.services.databaseService import (
DatabaseServiceType,
)
from metadata.profiler.orm.converter.bigquery.converter import BigqueryMapTypes
from metadata.profiler.orm.converter.snowflake.converter import SnowflakeMapTypes
converter_registry = {
DatabaseServiceType.BigQuery: BigqueryMapTypes,
DatabaseServiceType.Snowflake: SnowflakeMapTypes,
}
def build_orm_col(idx: int, col: Column, table_service_type) -> sqlalchemy.Column:
"""
Cook the ORM column from our metadata instance
information.
The first parsed column will be used arbitrarily
as the PK, as SQLAlchemy forces us to specify
at least one PK.
As this is only used for INSERT/UPDATE/DELETE,
there is no impact for our read-only purposes.
"""
# pylint: disable=import-outside-toplevel
from metadata.profiler.orm.converter.base import (
check_if_should_quote_column_name,
check_snowflake_case_sensitive,
)
from metadata.profiler.orm.converter.common import CommonMapTypes
return sqlalchemy.Column(
name=str(col.name.__root__),
type_=converter_registry.get(table_service_type, CommonMapTypes)().map_types(
col, table_service_type
),
primary_key=not bool(idx), # The first col seen is used as PK
quote=check_if_should_quote_column_name(table_service_type)
or check_snowflake_case_sensitive(table_service_type, col.name.__root__),
key=str(
col.name.__root__
).lower(), # Add lowercase column name as key for snowflake case sensitive columns
)

View File

@ -1,14 +0,0 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Registry logic to map an Converter base based on dialect
"""

View File

@ -10,11 +10,7 @@
# limitations under the License.
"""
Define Random Number function
Returns a column with random values
between 0 and 100 to help us draw sample
data.
Define Sum function
"""
from sqlalchemy.ext.compiler import compiles

View File

@ -0,0 +1,57 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Unique Count Metric functions
"""
from collections import defaultdict
from sqlalchemy import NVARCHAR, TEXT, func, literal_column
from metadata.profiler.orm.converter.mssql.converter import cast_dict
from metadata.profiler.orm.functions.count import CountFn
from metadata.profiler.orm.registry import Dialects
from metadata.profiler.orm.types.custom_image import CustomImage
def _unique_count_query(col, session, sample):
return (
session.query(func.count(col))
.select_from(sample)
.group_by(col)
.having(func.count(col) == 1)
)
def _unique_count_query_mssql(col, session, sample):
# The ntext, text, and image data types will be removed in a future version of SQL Server.
# Avoid using these data types in new development work, and plan to modify applications that currently use them.
# Use nvarchar(max), varchar(max), and varbinary(max) instead.
# ref:https://learn.microsoft.com/en-us/sql/t-sql/data-types/ntext-text-and-image-transact-sql?view=sql-server-ver16
is_mssql_deprecated_datatype = isinstance(col.type, (CustomImage, TEXT, NVARCHAR))
if is_mssql_deprecated_datatype:
count_fn = CountFn(col)
group_by_col = func.convert(literal_column(cast_dict.get(type(col.type))), col)
else:
count_fn = col
group_by_col = col
return (
session.query(func.count(count_fn))
.select_from(sample)
.group_by(group_by_col)
.having(func.count(count_fn) == 1)
)
_unique_count_query_mapper = defaultdict(lambda: _unique_count_query)
_unique_count_query_mapper[Dialects.MSSQL] = _unique_count_query_mssql