mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-08-22 07:58:06 +00:00
parent
dfd6286cb4
commit
eb906589fd
@ -52,7 +52,7 @@ class PostgresSource(SQLSource):
|
|||||||
def get_status(self) -> SourceStatus:
|
def get_status(self) -> SourceStatus:
|
||||||
return self.status
|
return self.status
|
||||||
|
|
||||||
def _is_partition(self, table_name: str, schema_name: str) -> bool:
|
def _is_partition(self, table_name: str, schema: str, inspector) -> bool:
|
||||||
cur = self.pgconn.cursor()
|
cur = self.pgconn.cursor()
|
||||||
cur.execute(
|
cur.execute(
|
||||||
"""
|
"""
|
||||||
@ -62,7 +62,7 @@ class PostgresSource(SQLSource):
|
|||||||
WHERE c.relname = %s
|
WHERE c.relname = %s
|
||||||
AND n.nspname = %s
|
AND n.nspname = %s
|
||||||
""",
|
""",
|
||||||
(table_name, schema_name),
|
(table_name, schema),
|
||||||
)
|
)
|
||||||
is_partition = cur.fetchone()[0]
|
is_partition = cur.fetchone()[0]
|
||||||
return is_partition
|
return is_partition
|
||||||
|
@ -69,9 +69,9 @@ class Histogram(QueryMetric):
|
|||||||
|
|
||||||
step = dict(bins.first())["step"]
|
step = dict(bins.first())["step"]
|
||||||
|
|
||||||
if step == 0:
|
if not step: # step == 0 or None for empty tables
|
||||||
logger.debug(
|
logger.debug(
|
||||||
f"MIN({col.name}) == MAX({col.name}). Aborting histogram computation."
|
f"MIN({col.name}) == MAX({col.name}) or EMPTY table. Aborting histogram computation."
|
||||||
)
|
)
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
@ -20,7 +20,7 @@ from metadata.generated.schema.entity.services.databaseService import (
|
|||||||
DatabaseServiceType,
|
DatabaseServiceType,
|
||||||
)
|
)
|
||||||
from metadata.orm_profiler.metrics.core import CACHE, StaticMetric, _label
|
from metadata.orm_profiler.metrics.core import CACHE, StaticMetric, _label
|
||||||
from metadata.orm_profiler.orm.registry import is_quantifiable
|
from metadata.orm_profiler.orm.registry import Dialects, is_quantifiable
|
||||||
from metadata.orm_profiler.utils import logger
|
from metadata.orm_profiler.utils import logger
|
||||||
|
|
||||||
logger = logger()
|
logger = logger()
|
||||||
@ -36,12 +36,12 @@ def _(element, compiler, **kw):
|
|||||||
return "STDDEV_POP(%s)" % compiler.process(element.clauses, **kw)
|
return "STDDEV_POP(%s)" % compiler.process(element.clauses, **kw)
|
||||||
|
|
||||||
|
|
||||||
@compiles(StdDevFn, DatabaseServiceType.MSSQL.value.lower())
|
@compiles(StdDevFn, Dialects.MSSQL)
|
||||||
def _(element, compiler, **kw):
|
def _(element, compiler, **kw):
|
||||||
return "STDEVP(%s)" % compiler.process(element.clauses, **kw)
|
return "STDEVP(%s)" % compiler.process(element.clauses, **kw)
|
||||||
|
|
||||||
|
|
||||||
@compiles(StdDevFn, DatabaseServiceType.SQLite.value.lower()) # Needed for unit tests
|
@compiles(StdDevFn, Dialects.SQLite) # Needed for unit tests
|
||||||
def _(element, compiler, **kw):
|
def _(element, compiler, **kw):
|
||||||
"""
|
"""
|
||||||
This actually returns the squared STD, but as
|
This actually returns the squared STD, but as
|
||||||
|
@ -32,7 +32,7 @@ _TYPE_MAP = {
|
|||||||
DataType.INT: sqlalchemy.INT,
|
DataType.INT: sqlalchemy.INT,
|
||||||
DataType.BIGINT: sqlalchemy.BIGINT,
|
DataType.BIGINT: sqlalchemy.BIGINT,
|
||||||
DataType.BYTEINT: sqlalchemy.SMALLINT,
|
DataType.BYTEINT: sqlalchemy.SMALLINT,
|
||||||
DataType.BYTES: CustomTypes.BYTES,
|
DataType.BYTES: CustomTypes.BYTES.value,
|
||||||
DataType.FLOAT: sqlalchemy.FLOAT,
|
DataType.FLOAT: sqlalchemy.FLOAT,
|
||||||
DataType.DOUBLE: sqlalchemy.DECIMAL,
|
DataType.DOUBLE: sqlalchemy.DECIMAL,
|
||||||
DataType.DECIMAL: sqlalchemy.DECIMAL,
|
DataType.DECIMAL: sqlalchemy.DECIMAL,
|
||||||
@ -61,7 +61,7 @@ _TYPE_MAP = {
|
|||||||
# DataType.GEOGRAPHY: ...,
|
# DataType.GEOGRAPHY: ...,
|
||||||
DataType.ENUM: sqlalchemy.Enum,
|
DataType.ENUM: sqlalchemy.Enum,
|
||||||
DataType.JSON: sqlalchemy.JSON,
|
DataType.JSON: sqlalchemy.JSON,
|
||||||
DataType.UUID: CustomTypes.UUID,
|
DataType.UUID: CustomTypes.UUID.value,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -19,6 +19,7 @@ from metadata.generated.schema.entity.services.databaseService import (
|
|||||||
DatabaseServiceType,
|
DatabaseServiceType,
|
||||||
)
|
)
|
||||||
from metadata.orm_profiler.metrics.core import CACHE
|
from metadata.orm_profiler.metrics.core import CACHE
|
||||||
|
from metadata.orm_profiler.orm.registry import Dialects
|
||||||
from metadata.orm_profiler.utils import logger
|
from metadata.orm_profiler.utils import logger
|
||||||
|
|
||||||
logger = logger()
|
logger = logger()
|
||||||
@ -33,9 +34,9 @@ def _(element, compiler, **kw):
|
|||||||
return "CONCAT(%s)" % compiler.process(element.clauses, **kw)
|
return "CONCAT(%s)" % compiler.process(element.clauses, **kw)
|
||||||
|
|
||||||
|
|
||||||
@compiles(ConcatFn, DatabaseServiceType.Redshift.value.lower())
|
@compiles(ConcatFn, Dialects.Redshift)
|
||||||
@compiles(ConcatFn, DatabaseServiceType.SQLite.value.lower())
|
@compiles(ConcatFn, Dialects.SQLite)
|
||||||
@compiles(ConcatFn, DatabaseServiceType.Vertica.value.lower())
|
@compiles(ConcatFn, Dialects.Vertica)
|
||||||
def _(element, compiler, **kw):
|
def _(element, compiler, **kw):
|
||||||
"""
|
"""
|
||||||
This actually returns the squared STD, but as
|
This actually returns the squared STD, but as
|
||||||
|
@ -15,10 +15,8 @@ Define Length function
|
|||||||
from sqlalchemy.ext.compiler import compiles
|
from sqlalchemy.ext.compiler import compiles
|
||||||
from sqlalchemy.sql.functions import FunctionElement
|
from sqlalchemy.sql.functions import FunctionElement
|
||||||
|
|
||||||
from metadata.generated.schema.entity.services.databaseService import (
|
|
||||||
DatabaseServiceType,
|
|
||||||
)
|
|
||||||
from metadata.orm_profiler.metrics.core import CACHE
|
from metadata.orm_profiler.metrics.core import CACHE
|
||||||
|
from metadata.orm_profiler.orm.registry import Dialects
|
||||||
from metadata.orm_profiler.utils import logger
|
from metadata.orm_profiler.utils import logger
|
||||||
|
|
||||||
logger = logger()
|
logger = logger()
|
||||||
@ -33,10 +31,9 @@ def _(element, compiler, **kw):
|
|||||||
return "LEN(%s)" % compiler.process(element.clauses, **kw)
|
return "LEN(%s)" % compiler.process(element.clauses, **kw)
|
||||||
|
|
||||||
|
|
||||||
@compiles(LenFn, DatabaseServiceType.SQLite.value.lower())
|
@compiles(LenFn, Dialects.SQLite)
|
||||||
@compiles(LenFn, DatabaseServiceType.Vertica.value.lower())
|
@compiles(LenFn, Dialects.Vertica)
|
||||||
@compiles(
|
@compiles(LenFn, Dialects.Hive) # For some reason hive's dialect is in bytes...
|
||||||
LenFn, DatabaseServiceType.Hive.value.lower().encode()
|
@compiles(LenFn, Dialects.Postgres)
|
||||||
) # For some reason hive's dialect is in bytes...
|
|
||||||
def _(element, compiler, **kw):
|
def _(element, compiler, **kw):
|
||||||
return "LENGTH(%s)" % compiler.process(element.clauses, **kw)
|
return "LENGTH(%s)" % compiler.process(element.clauses, **kw)
|
||||||
|
@ -23,6 +23,7 @@ from metadata.generated.schema.entity.services.databaseService import (
|
|||||||
DatabaseServiceType,
|
DatabaseServiceType,
|
||||||
)
|
)
|
||||||
from metadata.orm_profiler.metrics.core import CACHE
|
from metadata.orm_profiler.metrics.core import CACHE
|
||||||
|
from metadata.orm_profiler.orm.registry import Dialects
|
||||||
from metadata.orm_profiler.utils import logger
|
from metadata.orm_profiler.utils import logger
|
||||||
|
|
||||||
logger = logger()
|
logger = logger()
|
||||||
@ -43,12 +44,12 @@ def _(*_, **__):
|
|||||||
return "ABS(RANDOM()) * 100"
|
return "ABS(RANDOM()) * 100"
|
||||||
|
|
||||||
|
|
||||||
@compiles(RandomNumFn, DatabaseServiceType.MySQL.value.lower())
|
@compiles(RandomNumFn, Dialects.MySQL)
|
||||||
def _(*_, **__):
|
def _(*_, **__):
|
||||||
return "ABS(RAND()) * 100"
|
return "ABS(RAND()) * 100"
|
||||||
|
|
||||||
|
|
||||||
@compiles(RandomNumFn, DatabaseServiceType.SQLite.value.lower())
|
@compiles(RandomNumFn, Dialects.SQLite)
|
||||||
def _(*_, **__):
|
def _(*_, **__):
|
||||||
"""
|
"""
|
||||||
SQLite random returns a number between -9223372036854775808
|
SQLite random returns a number between -9223372036854775808
|
||||||
@ -57,7 +58,7 @@ def _(*_, **__):
|
|||||||
return "ABS(RANDOM()) % 100"
|
return "ABS(RANDOM()) % 100"
|
||||||
|
|
||||||
|
|
||||||
@compiles(RandomNumFn, DatabaseServiceType.MSSQL.value.lower())
|
@compiles(RandomNumFn, Dialects.MSSQL)
|
||||||
def _(*_, **__):
|
def _(*_, **__):
|
||||||
"""
|
"""
|
||||||
MSSQL RANDOM() function returns the same single
|
MSSQL RANDOM() function returns the same single
|
||||||
|
@ -15,7 +15,7 @@ without having an import mess
|
|||||||
"""
|
"""
|
||||||
import sqlalchemy
|
import sqlalchemy
|
||||||
from sqlalchemy import Integer, Numeric
|
from sqlalchemy import Integer, Numeric
|
||||||
from sqlalchemy.sql.sqltypes import Concatenable
|
from sqlalchemy.sql.sqltypes import Concatenable, Enum
|
||||||
|
|
||||||
from metadata.orm_profiler.orm.types.hex_byte_string import HexByteString
|
from metadata.orm_profiler.orm.types.hex_byte_string import HexByteString
|
||||||
from metadata.orm_profiler.orm.types.uuid import UUIDString
|
from metadata.orm_profiler.orm.types.uuid import UUIDString
|
||||||
@ -27,6 +27,37 @@ class CustomTypes(TypeRegistry):
|
|||||||
UUID = UUIDString
|
UUID = UUIDString
|
||||||
|
|
||||||
|
|
||||||
|
class Dialects(Enum):
|
||||||
|
"""
|
||||||
|
Map the service types from DatabaseServiceType
|
||||||
|
to the dialect scheme name used for ingesting
|
||||||
|
and profiling data.
|
||||||
|
"""
|
||||||
|
|
||||||
|
Hive = b"hive"
|
||||||
|
Postgres = "postgresql"
|
||||||
|
BigQuery = "bigquery"
|
||||||
|
MySQL = "mysql"
|
||||||
|
Redshift = "redshift"
|
||||||
|
Snowflake = "snowflake"
|
||||||
|
MSSQL = "mssql"
|
||||||
|
Oracle = "oracle"
|
||||||
|
Athena = "athena"
|
||||||
|
Presto = "presto"
|
||||||
|
Trino = "Trino"
|
||||||
|
Vertica = "vertica"
|
||||||
|
Glue = "glue"
|
||||||
|
MariaDB = "mariadb"
|
||||||
|
Druid = "druid"
|
||||||
|
Db2 = "db2"
|
||||||
|
ClickHouse = "clickhouse"
|
||||||
|
Databricks = "databricks"
|
||||||
|
DynamoDB = "dynamoDB"
|
||||||
|
AzureSQL = "azuresql"
|
||||||
|
SingleStore = "singlestore"
|
||||||
|
SQLite = "sqlite"
|
||||||
|
|
||||||
|
|
||||||
# Sometimes we want to skip certain types for computing metrics.
|
# Sometimes we want to skip certain types for computing metrics.
|
||||||
# If the type is NULL, then we won't run the metric execution
|
# If the type is NULL, then we won't run the metric execution
|
||||||
# in the profiler.
|
# in the profiler.
|
||||||
@ -34,6 +65,7 @@ class CustomTypes(TypeRegistry):
|
|||||||
NOT_COMPUTE = {
|
NOT_COMPUTE = {
|
||||||
sqlalchemy.types.NullType,
|
sqlalchemy.types.NullType,
|
||||||
sqlalchemy.ARRAY,
|
sqlalchemy.ARRAY,
|
||||||
|
sqlalchemy.JSON,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -70,6 +70,7 @@ class ColumnTypeParser:
|
|||||||
"DATETIMEOFFSET": "DATETIME",
|
"DATETIMEOFFSET": "DATETIME",
|
||||||
"DECIMAL": "DECIMAL",
|
"DECIMAL": "DECIMAL",
|
||||||
"DOUBLE PRECISION": "DOUBLE",
|
"DOUBLE PRECISION": "DOUBLE",
|
||||||
|
"DOUBLE_PRECISION": "DOUBLE",
|
||||||
"DOUBLE": "DOUBLE",
|
"DOUBLE": "DOUBLE",
|
||||||
"ENUM": "ENUM",
|
"ENUM": "ENUM",
|
||||||
"FLOAT": "FLOAT",
|
"FLOAT": "FLOAT",
|
||||||
@ -101,7 +102,6 @@ class ColumnTypeParser:
|
|||||||
"INTERVAL DAY TO SECOND": "INTERVAL",
|
"INTERVAL DAY TO SECOND": "INTERVAL",
|
||||||
"INTERVAL YEAR TO MONTH": "INTERVAL",
|
"INTERVAL YEAR TO MONTH": "INTERVAL",
|
||||||
"INTERVAL": "INTERVAL",
|
"INTERVAL": "INTERVAL",
|
||||||
"JSON": "JSON",
|
|
||||||
"LONG RAW": "BINARY",
|
"LONG RAW": "BINARY",
|
||||||
"LONG VARCHAR": "VARCHAR",
|
"LONG VARCHAR": "VARCHAR",
|
||||||
"LONGBLOB": "LONGBLOB",
|
"LONGBLOB": "LONGBLOB",
|
||||||
|
@ -574,3 +574,28 @@ class MetricsTest(TestCase):
|
|||||||
)
|
)
|
||||||
|
|
||||||
assert res.get(User.name.name)[Metrics.COUNT_IN_SET.name] == 3
|
assert res.get(User.name.name)[Metrics.COUNT_IN_SET.name] == 3
|
||||||
|
|
||||||
|
def test_histogram_empty(self):
|
||||||
|
"""
|
||||||
|
Run the histogram on an empty table
|
||||||
|
"""
|
||||||
|
|
||||||
|
class EmptyUser(Base):
|
||||||
|
__tablename__ = "empty_users"
|
||||||
|
id = Column(Integer, primary_key=True)
|
||||||
|
name = Column(String(256))
|
||||||
|
fullname = Column(String(256))
|
||||||
|
nickname = Column(String(256))
|
||||||
|
comments = Column(TEXT)
|
||||||
|
age = Column(Integer)
|
||||||
|
|
||||||
|
EmptyUser.__table__.create(bind=self.engine)
|
||||||
|
|
||||||
|
hist = add_props(bins=5)(Metrics.HISTOGRAM.value)
|
||||||
|
res = (
|
||||||
|
Profiler(hist, session=self.session, table=EmptyUser, use_cols=[User.age])
|
||||||
|
.execute()
|
||||||
|
._column_results
|
||||||
|
)
|
||||||
|
|
||||||
|
assert res.get(User.age.name).get(Metrics.HISTOGRAM.name) is None
|
||||||
|
Loading…
x
Reference in New Issue
Block a user