Fixes Mssql Ntext, text and Image (#12490)

Ayush Shah 2023-07-20 13:34:35 +05:30 committed by GitHub
parent fee567d714
commit ab1ec50c2c
53 changed files with 399 additions and 183 deletions

View File

@@ -116,7 +116,7 @@ class ColumnTypeParser:
"GEOGRAPHY": "GEOGRAPHY",
"GEOMETRY": "GEOMETRY",
"HYPERLOGLOG": "BINARY",
"IMAGE": "BINARY",
"IMAGE": "IMAGE",
"INT": "INT",
"INT2": "SMALLINT",
"INT4": "INT",
@@ -150,7 +150,7 @@ class ColumnTypeParser:
"MEDIUMTEXT": "MEDIUMTEXT",
"MONEY": "NUMBER",
"NCHAR": "CHAR",
"NTEXT": "TEXT",
"NTEXT": "NTEXT",
"NULL": "NULL",
"NUMBER": "NUMBER",
"NUMERIC": "NUMERIC",
@@ -290,18 +290,14 @@ class ColumnTypeParser:
@staticmethod
def get_column_type(column_type: Any) -> str:
column_type_result = ColumnTypeParser.get_column_type_mapping(column_type)
if column_type_result:
return column_type_result
column_type_result = ColumnTypeParser.get_source_type_mapping(column_type)
if column_type_result:
return column_type_result
column_type_result = ColumnTypeParser.get_source_type_containes_brackets(
column_type
)
if column_type_result:
return column_type_result
for func in [
ColumnTypeParser.get_column_type_mapping,
ColumnTypeParser.get_source_type_mapping,
ColumnTypeParser.get_source_type_containes_brackets,
]:
column_type_result = func(column_type)
if column_type_result:
return column_type_result
return ColumnTypeParser._SOURCE_TYPE_TO_OM_TYPE.get("UNKNOWN")
@staticmethod
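For context, a minimal sketch of how the refactored fallback chain resolves a type string (hedged: it assumes the string-keyed lookup tables shown above; the UNKNOWN sentinel is whatever the class defines):

from metadata.ingestion.source.database.column_type_parser import ColumnTypeParser

# Direct hits in _SOURCE_TYPE_TO_OM_TYPE resolve on the first matching resolver
assert ColumnTypeParser.get_column_type("NTEXT") == "NTEXT"
assert ColumnTypeParser.get_column_type("IMAGE") == "IMAGE"
# Unmapped inputs fall through every resolver to the UNKNOWN sentinel
print(ColumnTypeParser.get_column_type("NO_SUCH_TYPE"))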

View File

@@ -42,7 +42,10 @@ from metadata.utils.sqlalchemy_utils import (
logger = ingestion_logger()
# The ntext, text, and image data types will be removed in a future version of SQL Server.
# Avoid using these data types in new development work, and plan to modify applications that currently use them.
# Use nvarchar(max), varchar(max), and varbinary(max) instead.
# ref: https://learn.microsoft.com/en-us/sql/t-sql/data-types/ntext-text-and-image-transact-sql?view=sql-server-ver16
ischema_names.update(
{
"nvarchar": create_sqlalchemy_type("NVARCHAR"),

View File

@@ -30,7 +30,7 @@ from metadata.ingestion.source.connections import get_connection
from metadata.ingestion.source.database.snowflake.queries import (
SNOWFLAKE_SESSION_TAG_QUERY,
)
from metadata.profiler.orm.converter import ometa_to_sqa_orm
from metadata.profiler.orm.converter.base import ometa_to_sqa_orm
class SQAInterfaceMixin:

View File

@@ -27,7 +27,6 @@ from metadata.profiler.orm.functions.length import LenFn
from metadata.profiler.orm.registry import is_concatenable, is_quantifiable
from metadata.utils.helpers import format_large_string_numbers
from metadata.utils.logger import profiler_logger
from metadata.utils.sqa_utils import handle_array
logger = profiler_logger()
@@ -166,7 +165,7 @@ class Histogram(HybridMetric):
ending_bin_bound = res_min + bin_width
if is_concatenable(self.col.type):
col = LenFn(column(self.col.name))
col = LenFn(column(self.col.name, self.col.type))
else:
col = column(self.col.name) # type: ignore

View File

@@ -18,6 +18,7 @@ Count Metric definition
from sqlalchemy import column, func
from metadata.profiler.metrics.core import StaticMetric, _label
from metadata.profiler.orm.functions.count import CountFn
from metadata.utils.logger import profiler_logger
logger = profiler_logger()
@@ -41,7 +42,7 @@ class Count(StaticMetric):
@_label
def fn(self):
"""sqlalchemy function"""
return func.count(column(self.col.name))
return func.count(CountFn(column(self.col.name, self.col.type)))
def df_fn(self, dfs=None):
"""pandas function"""

View File

@@ -18,6 +18,7 @@ Distinct Count Metric definition
from sqlalchemy import column, distinct, func
from metadata.profiler.metrics.core import StaticMetric, _label
from metadata.profiler.orm.functions.count import CountFn
from metadata.utils.logger import profiler_logger
logger = profiler_logger()
@@ -40,7 +41,7 @@ class DistinctCount(StaticMetric):
@_label
def fn(self):
return func.count(distinct(column(self.col.name)))
return func.count(distinct(CountFn(column(self.col.name, self.col.type))))
def df_fn(self, dfs=None):
from collections import Counter # pylint: disable=import-outside-toplevel

View File

@@ -54,7 +54,7 @@ class Max(StaticMetric):
def fn(self):
"""sqlalchemy function"""
if is_concatenable(self.col.type):
return MaxFn(LenFn(column(self.col.name)))
return MaxFn(LenFn(column(self.col.name, self.col.type)))
if (not is_quantifiable(self.col.type)) and (not is_date_time(self.col.type)):
return None
return MaxFn(column(self.col.name))

View File

@@ -49,7 +49,7 @@ class MaxLength(StaticMetric):
def fn(self):
"""sqlalchemy function"""
if self._is_concatenable():
return func.max(LenFn(column(self.col.name)))
return func.max(LenFn(column(self.col.name, self.col.type)))
logger.debug(
f"Don't know how to process type {self.col.type} when computing MAX_LENGTH"

View File

@@ -77,7 +77,7 @@ class Mean(StaticMetric):
return func.avg(column(self.col.name))
if is_concatenable(self.col.type):
return func.avg(LenFn(column(self.col.name)))
return func.avg(LenFn(column(self.col.name, self.col.type)))
logger.debug(
f"Don't know how to process type {self.col.type} when computing MEAN"

View File

@@ -53,7 +53,7 @@ class Min(StaticMetric):
def fn(self):
"""sqlalchemy function"""
if is_concatenable(self.col.type):
return MinFn(LenFn(column(self.col.name)))
return MinFn(LenFn(column(self.col.name, self.col.type)))
if (not is_quantifiable(self.col.type)) and (not is_date_time(self.col.type)):
return None

View File

@@ -49,7 +49,7 @@ class MinLength(StaticMetric):
def fn(self):
"""sqlalchemy function"""
if self._is_concatenable():
return func.min(LenFn(column(self.col.name)))
return func.min(LenFn(column(self.col.name, self.col.type)))
logger.debug(
f"Don't know how to process type {self.col.type} when computing MIN_LENGTH"

View File

@@ -86,7 +86,7 @@ class StdDev(StaticMetric):
return StdDevFn(column(self.col.name))
if is_concatenable(self.col.type):
return StdDevFn(LenFn(column(self.col.name)))
return StdDevFn(LenFn(column(self.col.name, self.col.type)))
logger.debug(
f"{self.col} has type {self.col.type}, which is not listed as quantifiable."

View File

@@ -42,7 +42,7 @@ class Sum(StaticMetric):
return SumFn(column(self.col.name))
if is_concatenable(self.col.type):
return SumFn(LenFn(column(self.col.name)))
return SumFn(LenFn(column(self.col.name, self.col.type)))
return None

View File

@@ -14,11 +14,14 @@ Unique Count Metric definition
"""
from typing import Optional
from sqlalchemy import column, func
from sqlalchemy import NVARCHAR, TEXT, column, func, literal_column
from sqlalchemy.orm import DeclarativeMeta, Session
from metadata.profiler.metrics.core import QueryMetric
from metadata.profiler.orm.converter.mssql.converter import cast_dict
from metadata.profiler.orm.functions.count import CountFn
from metadata.profiler.orm.registry import NOT_COMPUTE
from metadata.profiler.orm.types.custom_image import CustomImage
from metadata.utils.logger import profiler_logger
logger = profiler_logger()
@@ -54,14 +57,28 @@ class UniqueCount(QueryMetric):
return None
# Run all queries on top of the sampled data
col = column(self.col.name)
only_once = (
session.query(func.count(col))
.select_from(sample)
.group_by(col)
.having(func.count(col) == 1) # Values that appear only once
col = column(self.col.name, self.col.type)
is_mssql = (
hasattr(session.bind, "dialect") and session.bind.dialect.name == "mssql"
)
is_mssql_deprecated_datatype = isinstance(
self.col.type, (CustomImage, TEXT, NVARCHAR)
)
count_fn = CountFn(col) if is_mssql and is_mssql_deprecated_datatype else col
group_by_col = (
func.convert(literal_column(cast_dict.get(type(self.col.type))), col)
if is_mssql and is_mssql_deprecated_datatype
else col
)
only_once = (
session.query(func.count(count_fn))
.select_from(sample)
.group_by(group_by_col)
.having(func.count(count_fn) == 1)
)
only_once_cte = only_once.cte("only_once")
return session.query(func.count().label(self.name())).select_from(only_once_cte)
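For intuition: MSSQL cannot compare, sort, or GROUP BY ntext/text/image values directly, which is why both the counted and the grouped expressions are cast first. Roughly, for an NTEXT column the query above takes this shape (a sketch with a hypothetical column name, not the exact emitted statement):

# SELECT COUNT(*) AS uniqueCount FROM (
#     SELECT COUNT(CAST(comment AS [nvarchar]))
#     FROM <sampled rows>
#     GROUP BY CONVERT(NVARCHAR(max), comment)
#     HAVING COUNT(CAST(comment AS [nvarchar])) = 1
# ) AS only_once
# On every other dialect, count_fn and group_by_col are simply the raw column.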

View File

@@ -61,7 +61,7 @@ class FirstQuartile(StaticMetric):
if is_concatenable(self.col.type):
return MedianFn(
LenFn(column(self.col.name)),
LenFn(column(self.col.name, self.col.type)),
self.col.table.fullname if self.col.table is not None else None,
0.25,
)

View File

@@ -61,7 +61,7 @@ class Median(StaticMetric):
if is_concatenable(self.col.type):
return MedianFn(
LenFn(column(self.col.name)),
LenFn(column(self.col.name, self.col.type)),
self.col.table.fullname if self.col.table is not None else None,
0.5,
)

View File

@@ -61,7 +61,7 @@ class ThirdQuartile(StaticMetric):
if is_concatenable(self.col.type):
return MedianFn(
LenFn(column(self.col.name)),
LenFn(column(self.col.name, self.col.type)),
self.col.table.fullname if self.col.table is not None else None,
0.75,
)

View File

@@ -13,90 +13,22 @@
Converter logic to transform an OpenMetadata Table Entity
to an SQLAlchemy ORM class.
"""
from typing import Optional, cast
import sqlalchemy
from sqlalchemy import MetaData
from sqlalchemy.orm import DeclarativeMeta, declarative_base
from metadata.generated.schema.entity.data.database import Database, databaseService
from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema
from metadata.generated.schema.entity.data.table import Column, DataType, Table
from metadata.generated.schema.entity.data.table import Table
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source import sqa_types
from metadata.profiler.orm.registry import CustomTypes
from metadata.profiler.source.bigquery.type_mapper import bigquery_type_mapper
Base = declarative_base()
_TYPE_MAP = {
DataType.NUMBER: sqlalchemy.NUMERIC,
DataType.TINYINT: sqlalchemy.SMALLINT,
DataType.SMALLINT: sqlalchemy.SMALLINT,
DataType.INT: sqlalchemy.INT,
DataType.BIGINT: sqlalchemy.BIGINT,
DataType.BYTEINT: sqlalchemy.SMALLINT,
DataType.BYTES: CustomTypes.BYTES.value,
DataType.FLOAT: sqlalchemy.FLOAT,
DataType.DOUBLE: sqlalchemy.DECIMAL,
DataType.DECIMAL: sqlalchemy.DECIMAL,
DataType.NUMERIC: sqlalchemy.NUMERIC,
DataType.TIMESTAMP: CustomTypes.TIMESTAMP.value,
DataType.TIME: sqlalchemy.TIME,
DataType.DATE: sqlalchemy.DATE,
DataType.DATETIME: sqlalchemy.DATETIME,
DataType.INTERVAL: sqlalchemy.Interval,
DataType.STRING: sqlalchemy.String,
DataType.MEDIUMTEXT: sqlalchemy.TEXT,
DataType.TEXT: sqlalchemy.TEXT,
DataType.CHAR: sqlalchemy.CHAR,
DataType.VARCHAR: sqlalchemy.VARCHAR,
DataType.BOOLEAN: sqlalchemy.BOOLEAN,
DataType.BINARY: sqlalchemy.LargeBinary,
DataType.VARBINARY: sqlalchemy.VARBINARY,
DataType.ARRAY: CustomTypes.ARRAY.value,
DataType.BLOB: CustomTypes.BYTES.value,
DataType.LONGBLOB: sqlalchemy.LargeBinary,
DataType.MEDIUMBLOB: sqlalchemy.LargeBinary,
DataType.MAP: sqa_types.SQAMap,
DataType.STRUCT: sqa_types.SQAStruct,
DataType.UNION: sqa_types.SQAUnion,
DataType.SET: sqa_types.SQASet,
DataType.GEOGRAPHY: sqa_types.SQASGeography,
DataType.ENUM: sqlalchemy.Enum,
DataType.JSON: sqlalchemy.JSON,
DataType.UUID: CustomTypes.UUID.value,
DataType.BYTEA: CustomTypes.BYTEA.value,
}
SQA_RESERVED_ATTRIBUTES = ["metadata"]
def map_types(col: Column, table_service_type):
"""returns an ORM type"""
if col.arrayDataType:
return _TYPE_MAP.get(col.dataType)(item_type=col.arrayDataType)
if (
table_service_type == databaseService.DatabaseServiceType.Snowflake
and col.dataType == DataType.JSON
):
# pylint: disable=import-outside-toplevel
from snowflake.sqlalchemy import VARIANT
return VARIANT
if (
table_service_type == databaseService.DatabaseServiceType.BigQuery
and col.dataType == DataType.STRUCT
):
return bigquery_type_mapper(_TYPE_MAP, col)
return _TYPE_MAP.get(col.dataType)
def check_snowflake_case_sensitive(table_service_type, table_or_col) -> Optional[bool]:
"""Check whether column or table name are not uppercase for snowflake table.
If so, then force quoting, If not return None to let engine backend handle the logic.
@@ -130,31 +62,6 @@ def check_if_should_quote_column_name(table_service_type) -> Optional[bool]:
return None
def build_orm_col(idx: int, col: Column, table_service_type) -> sqlalchemy.Column:
"""
Cook the ORM column from our metadata instance
information.
The first parsed column will be used arbitrarily
as the PK, as SQLAlchemy forces us to specify
at least one PK.
As this is only used for INSERT/UPDATE/DELETE,
there is no impact for our read-only purposes.
"""
return sqlalchemy.Column(
name=str(col.name.__root__),
type_=map_types(col, table_service_type),
primary_key=not bool(idx), # The first col seen is used as PK
quote=check_if_should_quote_column_name(table_service_type)
or check_snowflake_case_sensitive(table_service_type, col.name.__root__),
key=str(
col.name.__root__
).lower(), # Add lowercase column name as key for snowflake case sensitive columns
)
def ometa_to_sqa_orm(
table: Table, metadata: OpenMetadata, sqa_metadata_obj: Optional[MetaData] = None
) -> DeclarativeMeta:
@@ -167,6 +74,8 @@ def ometa_to_sqa_orm(
`type` and passing SQLAlchemy `Base` class
as the bases tuple for inheritance.
"""
# pylint: disable=import-outside-toplevel,cyclic-import
from metadata.profiler.orm.converter.dispatch_converter import build_orm_col
table.serviceType = cast(
databaseService.DatabaseServiceType, table.serviceType

View File

@@ -0,0 +1,31 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Converter logic to transform an OpenMetadata Table Entity for Bigquery
to an SQLAlchemy ORM class.
"""
from metadata.generated.schema.entity.data.database import databaseService
from metadata.generated.schema.entity.data.table import Column, DataType
from metadata.profiler.orm.converter.common import CommonMapTypes
from metadata.profiler.source.bigquery.type_mapper import bigquery_type_mapper
class BigqueryMapTypes(CommonMapTypes):
def return_custom_type(self, col: Column, table_service_type):
if (
table_service_type == databaseService.DatabaseServiceType.BigQuery
and col.dataType == DataType.STRUCT
):
return bigquery_type_mapper(self._TYPE_MAP, col)
return super().return_custom_type(col, table_service_type)

View File

@@ -0,0 +1,78 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Common class for profiler type converters.
"""
import sqlalchemy
from metadata.generated.schema.entity.data.table import Column, DataType
from metadata.ingestion.source import sqa_types
from metadata.profiler.orm.registry import CustomTypes
class CommonMapTypes:
"""
Base Class for mapping types
"""
_TYPE_MAP = {
DataType.NUMBER: sqlalchemy.NUMERIC,
DataType.TINYINT: sqlalchemy.SMALLINT,
DataType.SMALLINT: sqlalchemy.SMALLINT,
DataType.INT: sqlalchemy.INT,
DataType.BIGINT: sqlalchemy.BIGINT,
DataType.BYTEINT: sqlalchemy.SMALLINT,
DataType.BYTES: CustomTypes.BYTES.value,
DataType.FLOAT: sqlalchemy.FLOAT,
DataType.DOUBLE: sqlalchemy.DECIMAL,
DataType.DECIMAL: sqlalchemy.DECIMAL,
DataType.NUMERIC: sqlalchemy.NUMERIC,
DataType.TIMESTAMP: CustomTypes.TIMESTAMP.value,
DataType.TIME: sqlalchemy.TIME,
DataType.DATE: sqlalchemy.DATE,
DataType.DATETIME: sqlalchemy.DATETIME,
DataType.INTERVAL: sqlalchemy.Interval,
DataType.STRING: sqlalchemy.String,
DataType.MEDIUMTEXT: sqlalchemy.TEXT,
DataType.TEXT: sqlalchemy.TEXT,
DataType.CHAR: sqlalchemy.CHAR,
DataType.VARCHAR: sqlalchemy.VARCHAR,
DataType.BOOLEAN: sqlalchemy.BOOLEAN,
DataType.BINARY: sqlalchemy.LargeBinary,
DataType.VARBINARY: sqlalchemy.VARBINARY,
DataType.ARRAY: CustomTypes.ARRAY.value,
DataType.BLOB: CustomTypes.BYTES.value,
DataType.LONGBLOB: sqlalchemy.LargeBinary,
DataType.MEDIUMBLOB: sqlalchemy.LargeBinary,
DataType.MAP: sqa_types.SQAMap,
DataType.STRUCT: sqa_types.SQAStruct,
DataType.UNION: sqa_types.SQAUnion,
DataType.SET: sqa_types.SQASet,
DataType.GEOGRAPHY: sqa_types.SQASGeography,
DataType.ENUM: sqlalchemy.Enum,
DataType.JSON: sqlalchemy.JSON,
DataType.UUID: CustomTypes.UUID.value,
DataType.BYTEA: CustomTypes.BYTEA.value,
DataType.NTEXT: sqlalchemy.NVARCHAR,
DataType.IMAGE: CustomTypes.IMAGE.value,
}
def map_types(self, col: Column, table_service_type):
"""returns an ORM type"""
if col.arrayDataType:
return self._TYPE_MAP.get(col.dataType)(item_type=col.arrayDataType)
return self.return_custom_type(col, table_service_type)
def return_custom_type(self, col: Column, _):
return self._TYPE_MAP.get(col.dataType)
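A minimal usage sketch (the Column construction is illustrative; field coercion depends on the generated pydantic models):

import sqlalchemy
from metadata.generated.schema.entity.data.table import Column, DataType
from metadata.profiler.orm.converter.common import CommonMapTypes

col = Column(name="notes", dataType=DataType.NTEXT)
# NTEXT now maps to NVARCHAR instead of being lumped in with TEXT
assert CommonMapTypes().map_types(col, table_service_type=None) is sqlalchemy.NVARCHAR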

View File

@@ -0,0 +1,60 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Dispatch logic to select a Converter base class based on dialect
"""
import sqlalchemy
from metadata.generated.schema.entity.data.table import Column
from metadata.generated.schema.entity.services.databaseService import (
DatabaseServiceType,
)
from metadata.profiler.orm.converter.bigquery.converter import BigqueryMapTypes
from metadata.profiler.orm.converter.snowflake.converter import SnowflakeMapTypes
converter_registry = {
DatabaseServiceType.BigQuery: BigqueryMapTypes,
DatabaseServiceType.Snowflake: SnowflakeMapTypes,
}
def build_orm_col(idx: int, col: Column, table_service_type) -> sqlalchemy.Column:
"""
Cook the ORM column from our metadata instance
information.
The first parsed column will be used arbitrarily
as the PK, as SQLAlchemy forces us to specify
at least one PK.
As this is only used for INSERT/UPDATE/DELETE,
there is no impact for our read-only purposes.
"""
# pylint: disable=import-outside-toplevel
from metadata.profiler.orm.converter.base import (
check_if_should_quote_column_name,
check_snowflake_case_sensitive,
)
from metadata.profiler.orm.converter.common import CommonMapTypes
return sqlalchemy.Column(
name=str(col.name.__root__),
type_=converter_registry.get(table_service_type, CommonMapTypes)().map_types(
col, table_service_type
),
primary_key=not bool(idx), # The first col seen is used as PK
quote=check_if_should_quote_column_name(table_service_type)
or check_snowflake_case_sensitive(table_service_type, col.name.__root__),
key=str(
col.name.__root__
).lower(), # Add lowercase column name as key for snowflake case sensitive columns
)
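Services without a dedicated converter fall back to CommonMapTypes, so MSSQL picks up the new NTEXT/IMAGE entries without needing its own subclass. A hedged sketch of the dispatch (assuming the enum member is spelled Mssql):

from metadata.generated.schema.entity.services.databaseService import (
    DatabaseServiceType,
)
from metadata.profiler.orm.converter.common import CommonMapTypes
from metadata.profiler.orm.converter.dispatch_converter import converter_registry

# No MSSQL-specific entry in the registry, so the common mapper is used
mapper = converter_registry.get(DatabaseServiceType.Mssql, CommonMapTypes)()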

View File

@@ -0,0 +1,25 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Type map used to convert/cast deprecated MSSQL data types to supported equivalents
"""
from sqlalchemy import NVARCHAR, TEXT
from metadata.profiler.orm.registry import CustomImage
cast_dict = {
CustomImage: "VARBINARY(max)",
TEXT: "VARCHAR(max)",
NVARCHAR: "NVARCHAR(max)",
}
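The unique_count metric above feeds this mapping straight into a CONVERT call, along these lines (hypothetical column name):

from sqlalchemy import TEXT, column, func, literal_column
from metadata.profiler.orm.converter.mssql.converter import cast_dict

col = column("comment", TEXT())
# Renders as CONVERT(VARCHAR(max), comment) on MSSQL
group_by_col = func.convert(literal_column(cast_dict.get(type(col.type))), col)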

View File

@@ -0,0 +1,14 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Registry logic to map a Converter base class based on dialect
"""

View File

@@ -0,0 +1,33 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Converter logic to transform an OpenMetadata Table Entity for Snowflake
to an SQLAlchemy ORM class.
"""
from metadata.generated.schema.entity.data.database import databaseService
from metadata.generated.schema.entity.data.table import Column, DataType
from metadata.profiler.orm.converter.common import CommonMapTypes
class SnowflakeMapTypes(CommonMapTypes):
def return_custom_type(self, col: Column, table_service_type):
if (
table_service_type == databaseService.DatabaseServiceType.Snowflake
and col.dataType == DataType.JSON
):
# pylint: disable=import-outside-toplevel
from snowflake.sqlalchemy import VARIANT
return VARIANT
return super().return_custom_type(col, table_service_type)

View File

@@ -0,0 +1,46 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Define Count function
"""
from sqlalchemy.ext.compiler import compiles
from sqlalchemy.sql.functions import FunctionElement
from sqlalchemy.sql.sqltypes import NVARCHAR, TEXT
from metadata.profiler.metrics.core import CACHE
from metadata.profiler.orm.registry import Dialects
# Keep SQA docs style defining custom constructs
# pylint: disable=consider-using-f-string,duplicate-code
from metadata.profiler.orm.types.custom_image import CustomImage
from metadata.utils.logger import profiler_logger
logger = profiler_logger()
class CountFn(FunctionElement):
inherit_cache = CACHE
@compiles(CountFn)
def _(element, compiler, **kw):
return compiler.process(element.clauses, **kw)
@compiles(CountFn, Dialects.MSSQL)
def _(element, compiler, **kw):
col_type = element.clauses.clauses[0].type
if isinstance(col_type, (NVARCHAR, TEXT)):
return "cast(%s as [nvarchar])" % compiler.process(element.clauses, **kw)
if isinstance(col_type, CustomImage):
return "cast(%s as [varbinary])" % compiler.process(element.clauses, **kw)
return compiler.process(element.clauses, **kw)
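A hedged sketch of what the MSSQL hook emits; compiling against the dialect needs no live connection (column name is hypothetical):

from sqlalchemy import column
from sqlalchemy.dialects import mssql
from sqlalchemy.sql.sqltypes import NVARCHAR
from metadata.profiler.orm.functions.count import CountFn

expr = CountFn(column("notes", NVARCHAR()))
print(expr.compile(dialect=mssql.dialect()))  # expected: cast(notes as [nvarchar])
# On other dialects the default compiler above passes the column through untouched.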

View File

@@ -15,6 +15,7 @@ Define Length function
# Keep SQA docs style defining custom constructs
# pylint: disable=consider-using-f-string,duplicate-code
from sqlalchemy.ext.compiler import compiles
from sqlalchemy.sql import sqltypes
from sqlalchemy.sql.functions import FunctionElement
from metadata.profiler.metrics.core import CACHE
@@ -61,3 +62,10 @@ def _(element, compiler, **kw):
def _(element, compiler, **kw):
"""Handles lenght function for ClickHouse"""
return "length(%s)" % compiler.process(element.clauses, **kw)
@compiles(LenFn, Dialects.MSSQL)
def _(element, compiler, **kw):
if isinstance(element.clauses.clauses[0].type, (sqltypes.TEXT, sqltypes.NVARCHAR)):
return "LEN(CAST(%s as [nvarchar]))" % compiler.process(element.clauses, **kw)
return "LEN(%s)" % compiler.process(element.clauses, **kw)

View File

@@ -21,6 +21,7 @@ from metadata.generated.schema.entity.data.table import DataType
from metadata.ingestion.source import sqa_types
from metadata.profiler.orm.types.bytea_to_string import ByteaToHex
from metadata.profiler.orm.types.custom_array import CustomArray
from metadata.profiler.orm.types.custom_image import CustomImage
from metadata.profiler.orm.types.custom_timestamp import CustomTimestamp
from metadata.profiler.orm.types.hex_byte_string import HexByteString
from metadata.profiler.orm.types.uuid import UUIDString
@@ -34,6 +35,7 @@ class CustomTypes(TypeRegistry):
BYTEA = ByteaToHex
ARRAY = CustomArray
TIMESTAMP = CustomTimestamp
IMAGE = CustomImage
class Dialects(Enum):

View File

@@ -0,0 +1,30 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# pylint: disable=abstract-method
"""
Expand sqlalchemy types to map them to OpenMetadata DataType
"""
from sqlalchemy.sql.sqltypes import VARBINARY, TypeDecorator
from metadata.utils.logger import profiler_logger
logger = profiler_logger()
class CustomImage(TypeDecorator):
"""
Custom type for the MSSQL IMAGE data type, implemented on top of VARBINARY
"""
impl = VARBINARY
cache_ok = True
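Since impl is VARBINARY, CustomImage behaves as plain binary everywhere except where the MSSQL compile hooks above single it out with isinstance; for example (hypothetical column name):

from sqlalchemy import column
from sqlalchemy.dialects import mssql
from metadata.profiler.orm.functions.count import CountFn
from metadata.profiler.orm.types.custom_image import CustomImage

expr = CountFn(column("photo", CustomImage()))
print(expr.compile(dialect=mssql.dialect()))  # expected: cast(photo as [varbinary])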

View File

@@ -19,7 +19,6 @@ from typing import Optional
from sqlalchemy.engine import Engine
from metadata.generated.schema.entity.data.table import SystemProfile
from metadata.ingestion.api.sink import SinkStatus
from metadata.ingestion.api.source import SourceStatus
from metadata.ingestion.api.workflow import Workflow

View File

@@ -18,14 +18,11 @@ from typing import List
import pytest
from metadata.ingestion.api.sink import SinkStatus
from metadata.ingestion.api.source import SourceStatus
from metadata.ingestion.api.workflow import Workflow
from .base.e2e_types import E2EType
from .base.test_cli import PATH_TO_RESOURCES
from .common.test_cli_db import CliCommonDB
from .common_e2e_sqa_mixins import SQACommonMethods
class DatalakeCliTest(CliCommonDB.TestSuite):

View File

@@ -15,8 +15,6 @@ Hive E2E tests
from typing import List
from metadata.generated.schema.entity.data.table import Histogram
from .common.test_cli_db import CliCommonDB
from .common_e2e_sqa_mixins import SQACommonMethods

View File

@@ -12,7 +12,6 @@
"""
OpenMetadata API initialization
"""
import pytest
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,

View File

@@ -46,7 +46,7 @@ from metadata.generated.schema.security.client.openMetadataJWTClientConfig impor
OpenMetadataJWTClientConfig,
)
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.profiler.orm.converter import ometa_to_sqa_orm
from metadata.profiler.orm.converter.base import ometa_to_sqa_orm
from metadata.profiler.orm.types.custom_timestamp import CustomTimestamp

View File

@@ -16,9 +16,6 @@ import os
from unittest import TestCase, mock
from uuid import uuid4
import boto3
import botocore
import moto
from sqlalchemy import TEXT, Column, Date, DateTime, Integer, String, Time
from sqlalchemy.orm import declarative_base
@@ -27,7 +24,6 @@ from metadata.generated.schema.entity.data.table import ColumnName, DataType, Ta
from metadata.profiler.interface.pandas.profiler_interface import (
PandasProfilerInterface,
)
from metadata.profiler.interface.profiler_interface import ProfilerInterface
from metadata.profiler.metrics.core import add_props
from metadata.profiler.metrics.registry import Metrics
from metadata.profiler.processor.core import Profiler

View File

@@ -36,17 +36,10 @@ from metadata.generated.schema.entity.data.table import (
TableProfile,
TableProfilerConfig,
)
from metadata.generated.schema.entity.services.connections.database.sqliteConnection import (
SQLiteConnection,
SQLiteScheme,
)
from metadata.ingestion.source import sqa_types
from metadata.profiler.interface.pandas.profiler_interface import (
PandasProfilerInterface,
)
from metadata.profiler.interface.sqlalchemy.profiler_interface import (
SQAProfilerInterface,
)
from metadata.profiler.metrics.core import MetricTypes, add_props
from metadata.profiler.metrics.registry import Metrics
from metadata.profiler.processor.core import MissingMetricException, Profiler

View File

@@ -17,7 +17,7 @@ from unittest import TestCase, mock
from uuid import uuid4
import pytest
from sqlalchemy import TEXT, Column, Integer, String, func
from sqlalchemy import TEXT, Column, Integer, String
from sqlalchemy.orm import declarative_base
from metadata.generated.schema.entity.data.table import Column as EntityColumn

View File

@@ -14,7 +14,7 @@ Test we map correctly struct columns for BQ
"""
from metadata.generated.schema.entity.data.table import Column
from metadata.profiler.orm.converter import _TYPE_MAP
from metadata.profiler.orm.converter.common import CommonMapTypes
from metadata.profiler.source.bigquery.type_mapper import bigquery_type_mapper
@@ -56,7 +56,7 @@ def test_map_struct():
}
)
type_ = bigquery_type_mapper(_TYPE_MAP, column)
type_ = bigquery_type_mapper(CommonMapTypes()._TYPE_MAP, column)
assert (
type_.__repr__()
== "STRUCT(col1=String(), col2=STRUCT(col3=String(), col4=STRUCT(col5=CustomArray(<DataType.STRING: 'STRING'>))))"

View File

@@ -17,18 +17,16 @@ from unittest.mock import patch
from uuid import UUID
from pytest import mark
from sqlalchemy import Column as SQAColumn
from sqlalchemy.sql.sqltypes import INTEGER, String
from metadata.generated.schema.entity.data.table import Column, DataType, Table
from metadata.generated.schema.entity.services.databaseService import (
DatabaseServiceType,
)
from metadata.profiler.orm.converter import ometa_to_sqa_orm
from metadata.profiler.orm.converter.base import ometa_to_sqa_orm
@patch("metadata.profiler.orm.converter.get_orm_schema", return_value="schema")
@patch("metadata.profiler.orm.converter.get_orm_database", return_value="database")
@patch("metadata.profiler.orm.converter.base.get_orm_schema", return_value="schema")
@patch("metadata.profiler.orm.converter.base.get_orm_database", return_value="database")
@mark.parametrize(
"column_definition, table_name",
[
@@ -87,8 +85,8 @@ def test_snowflake_case_sensitive_orm(
assert hasattr(orm_table, name)
@patch("metadata.profiler.orm.converter.get_orm_schema", return_value="schema")
@patch("metadata.profiler.orm.converter.get_orm_database", return_value="database")
@patch("metadata.profiler.orm.converter.base.get_orm_schema", return_value="schema")
@patch("metadata.profiler.orm.converter.base.get_orm_database", return_value="database")
def test_metadata_column(mock_schema, mock_database):
"""Test that snowflake case sensitive orm table
are enforced correctly

View File

@@ -17,8 +17,8 @@ from datetime import datetime
from unittest import TestCase
import pytest
from sqlalchemy import Column, create_engine
from sqlalchemy.orm import declarative_base, sessionmaker
from sqlalchemy import Column
from sqlalchemy.orm import declarative_base
from sqlalchemy.sql.sqltypes import Integer, String
from metadata.profiler.metrics.hybrid.histogram import Histogram
@@ -31,7 +31,7 @@ from metadata.utils.profiler_utils import (
get_value_from_cache,
set_cache,
)
from metadata.utils.sqa_utils import handle_array, is_array
from metadata.utils.sqa_utils import is_array
from .conftest import Row

View File

@@ -18,7 +18,6 @@ from unittest.mock import patch
import sqlalchemy as sqa
from pytest import raises
from sqlalchemy import MetaData
from sqlalchemy.orm import declarative_base
from metadata.generated.schema.entity.data.table import (
@@ -31,7 +30,6 @@ from metadata.generated.schema.entity.services.connections.metadata.openMetadata
OpenMetadataConnection,
)
from metadata.generated.schema.entity.services.databaseService import (
DatabaseConnection,
DatabaseService,
DatabaseServiceType,
)
@@ -41,7 +39,6 @@ from metadata.generated.schema.metadataIngestion.databaseServiceProfilerPipeline
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.profiler.api.models import ProfilerProcessorConfig
from metadata.profiler.api.workflow import ProfilerWorkflow
from metadata.profiler.interface.profiler_interface import ProfilerInterface
from metadata.profiler.interface.sqlalchemy.profiler_interface import (
SQAProfilerInterface,
)

View File

@@ -20,7 +20,6 @@ from metadata.generated.schema.security.credentials.bitbucketCredentials import
from metadata.generated.schema.security.credentials.githubCredentials import (
GitHubCredentials,
)
from metadata.ingestion.models.custom_pydantic import CustomSecretStr
from metadata.readers.credentials import (
get_credentials_from_url,
update_repository_name,

View File

@@ -16,8 +16,6 @@ import logging
import os
from unittest import TestCase
import pytest
from metadata.generated.schema.entity.data.table import DataType
from metadata.ingestion.source.database.column_type_parser import ColumnTypeParser
from metadata.ingestion.source.database.datalake.metadata import DatalakeSource

View File

@@ -14,13 +14,8 @@ import unittest
from unittest.mock import patch
import pyarrow.parquet as pq
import pytest
from pandas import DataFrame
from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.entity.services.connections.database.datalakeConnection import (
DatalakeConnection,
)
from metadata.generated.schema.metadataIngestion.workflow import (
OpenMetadataWorkflowConfig,
)

View File

@@ -277,7 +277,6 @@ def test_suite_validation_datalake(
request,
):
"""Generic test runner for test validations"""
import pandas as pd
test_case = request.getfixturevalue(test_case_name)
type_, val_1, val_2, status = expected

View File

@@ -16,7 +16,6 @@ Usage via query logs tests
from pathlib import Path
from unittest import TestCase
from unittest.mock import patch
from metadata.generated.schema.metadataIngestion.workflow import (
OpenMetadataWorkflowConfig,

View File

@@ -16,7 +16,6 @@ from datetime import datetime, timedelta
from unittest import TestCase
from unittest.mock import patch
from looker_sdk.error import SDKError
from looker_sdk.sdk.api40.methods import Looker40SDK
from looker_sdk.sdk.api40.models import Dashboard as LookerDashboard
from looker_sdk.sdk.api40.models import (

View File

@@ -37,9 +37,6 @@ from metadata.ingestion.source.database.postgres.metadata import (
POLYGON,
PostgresSource,
)
from metadata.ingestion.source.database.postgres.query_parser import (
PostgresQueryParserSource,
)
from metadata.ingestion.source.database.postgres.usage import PostgresUsageSource
mock_postgres_config = {

View File

@@ -17,8 +17,6 @@ Here we don't need to patch, as we can just create our own metastore
from unittest import TestCase
from unittest.mock import patch
from sqlalchemy.types import VARCHAR
from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema
from metadata.generated.schema.entity.data.table import (

View File

@@ -146,7 +146,9 @@
"POLYGON",
"TUPLE",
"SPATIAL",
"TABLE"
"TABLE",
"NTEXT",
"IMAGE"
]
},
"constraint": {