Fixed db2 profiler (#9679)

This commit is contained in:
Onkar Ravgan 2023-01-11 20:40:25 +05:30 committed by GitHub
parent 1ec324e43e
commit 4de2dacbe2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 108 additions and 4 deletions

View File

@ -0,0 +1,44 @@
source:
type: db2
serviceName: local_db2
serviceConnection:
config:
type: Db2
username: openmetadata_user
password: openmetadata_password
hostPort: localhost:50000
database: default
sourceConfig:
config:
type: Profiler
generateSampleData: true
databaseFilterPattern:
includes:
- database
schemaFilterPattern:
includes:
- schema_one
excludes:
- schema_two
tableFilterPattern:
includes:
- orders
- customers
processor:
type: "orm-profiler"
config:
tableConfig:
- fullyQualifiedName: local_db2.database.schema_one.orders
profileSample: 85
columnConfig:
includeColumns:
- columnName: order_id
- columnName: order_date
- columnName: status
sink:
type: metadata-rest
config: {}
workflowConfig:
openMetadataServerConfig:
hostPort: http://localhost:8585/api
authProvider: no-auth

View File

@ -15,9 +15,32 @@ Table Column Count Metric definition
# pylint: disable=duplicate-code # pylint: disable=duplicate-code
from sqlalchemy import inspect, literal from sqlalchemy import inspect, literal
from sqlalchemy.ext.compiler import compiles
from sqlalchemy.orm import DeclarativeMeta from sqlalchemy.orm import DeclarativeMeta
from sqlalchemy.sql.functions import FunctionElement
from metadata.orm_profiler.metrics.core import StaticMetric, _label from metadata.orm_profiler.metrics.core import CACHE, StaticMetric, _label
from metadata.orm_profiler.orm.registry import Dialects
class ColunCountFn(FunctionElement):
    """SQL function element wrapping the table column-count expression.

    Wrapping the expression in its own element type lets the ``@compiles``
    hooks registered for this class render it differently per dialect.
    """

    # NOTE(review): "Colun" looks like a typo for "Column"; the name is left
    # untouched because other code in this module refers to it.
    # The function name is the class's own qualified name.
    name = __qualname__
    # Cache behaviour follows the shared CACHE value imported from
    # metadata.orm_profiler.metrics.core.
    inherit_cache = CACHE
@compiles(ColunCountFn)
def _(element, compiler, **kw):
    """Default rendering: emit the wrapped clauses exactly as compiled."""
    rendered = compiler.process(element.clauses, **kw)
    return rendered
@compiles(ColunCountFn, Dialects.IbmDbSa)
@compiles(ColunCountFn, Dialects.Db2)
def _(element, compiler, **kw):
    """Render the column count for the Db2 dialects with an explicit cast.

    Db2 raises an error when the value is not given a type, so the compiled
    clauses are wrapped in a ``CAST`` to ``BIGINT``.
    """
    rendered = compiler.process(element.clauses, **kw)
    return f"CAST({rendered} AS BIGINT)"
class ColumnCount(StaticMetric): class ColumnCount(StaticMetric):
@ -54,7 +77,7 @@ class ColumnCount(StaticMetric):
raise AttributeError( raise AttributeError(
"Column Count requires a table to be set: add_props(table=...)(Metrics.COLUMN_COUNT)" "Column Count requires a table to be set: add_props(table=...)(Metrics.COLUMN_COUNT)"
) )
return literal(len(inspect(self.table).c)) return ColunCountFn(literal(len(inspect(self.table).c)))
@_label @_label
def dl_fn(self, data_frame=None): def dl_fn(self, data_frame=None):

View File

@ -16,9 +16,32 @@ Table Column Count Metric definition
import sqlalchemy import sqlalchemy
from sqlalchemy import inspect, literal from sqlalchemy import inspect, literal
from sqlalchemy.ext.compiler import compiles
from sqlalchemy.orm import DeclarativeMeta from sqlalchemy.orm import DeclarativeMeta
from sqlalchemy.sql.functions import FunctionElement
from metadata.orm_profiler.metrics.core import StaticMetric, _label from metadata.orm_profiler.metrics.core import CACHE, StaticMetric, _label
from metadata.orm_profiler.orm.registry import Dialects
class ColunNameFn(FunctionElement):
    """SQL function element wrapping the table column-names expression.

    Wrapping the expression in its own element type lets the ``@compiles``
    hooks registered for this class render it differently per dialect.
    """

    # NOTE(review): "Colun" looks like a typo for "Column"; the name is left
    # untouched because other code in this module refers to it.
    # The function name is the class's own qualified name.
    name = __qualname__
    # Cache behaviour follows the shared CACHE value imported from
    # metadata.orm_profiler.metrics.core.
    inherit_cache = CACHE
@compiles(ColunNameFn)
def _(element, compiler, **kw):
    """Default rendering: emit the wrapped clauses exactly as compiled."""
    rendered = compiler.process(element.clauses, **kw)
    return rendered
@compiles(ColunNameFn, Dialects.IbmDbSa)
@compiles(ColunNameFn, Dialects.Db2)
def _(element, compiler, **kw):
    """Render the column names for the Db2 dialects with an explicit cast.

    Db2 raises an error when the value is not given a type, so the compiled
    clauses are wrapped in a ``CAST`` to ``VARCHAR``.
    """
    rendered = compiler.process(element.clauses, **kw)
    return f"CAST({rendered} AS VARCHAR)"
class ColumnNames(StaticMetric): class ColumnNames(StaticMetric):
@ -57,7 +80,7 @@ class ColumnNames(StaticMetric):
) )
col_names = ",".join(inspect(self.table).c.keys()) col_names = ",".join(inspect(self.table).c.keys())
return literal(col_names, type_=sqlalchemy.types.String) return ColunNameFn(literal(col_names, type_=sqlalchemy.types.String))
@_label @_label
def dl_fn(self, data_frame=None): def dl_fn(self, data_frame=None):

View File

@ -44,6 +44,8 @@ def _(element, compiler, **kw):
@compiles(LenFn, Dialects.Presto) @compiles(LenFn, Dialects.Presto)
@compiles(LenFn, Dialects.BigQuery) @compiles(LenFn, Dialects.BigQuery)
@compiles(LenFn, Dialects.Oracle) @compiles(LenFn, Dialects.Oracle)
@compiles(LenFn, Dialects.IbmDbSa)
@compiles(LenFn, Dialects.Db2)
def _(element, compiler, **kw): def _(element, compiler, **kw):
return "LENGTH(%s)" % compiler.process(element.clauses, **kw) return "LENGTH(%s)" % compiler.process(element.clauses, **kw)

View File

@ -56,6 +56,8 @@ def _(element, compiler, **kw):
@compiles(ModuloFn, Dialects.Oracle) @compiles(ModuloFn, Dialects.Oracle)
@compiles(ModuloFn, Dialects.Presto) @compiles(ModuloFn, Dialects.Presto)
@compiles(ModuloFn, Dialects.Trino) @compiles(ModuloFn, Dialects.Trino)
@compiles(ModuloFn, Dialects.IbmDbSa)
@compiles(ModuloFn, Dialects.Db2)
def _(element, compiler, **kw): def _(element, compiler, **kw):
"""Modulo function for specific dialect""" """Modulo function for specific dialect"""
value, base = validate_and_compile(element, compiler, **kw) value, base = validate_and_compile(element, compiler, **kw)

View File

@ -43,6 +43,8 @@ def _(*_, **__):
@compiles(RandomNumFn, Dialects.Hive) @compiles(RandomNumFn, Dialects.Hive)
@compiles(RandomNumFn, Dialects.MySQL) @compiles(RandomNumFn, Dialects.MySQL)
@compiles(RandomNumFn, Dialects.IbmDbSa)
@compiles(RandomNumFn, Dialects.Db2)
def _(*_, **__): def _(*_, **__):
return "ABS(RAND()) * 100" return "ABS(RAND()) * 100"

View File

@ -58,3 +58,11 @@ def _(element, compiler, **kw):
"""These database types have all int types as alias for int64 so don't need a cast""" """These database types have all int types as alias for int64 so don't need a cast"""
proc = compiler.process(element.clauses, **kw) proc = compiler.process(element.clauses, **kw)
return f"SUM({proc})" return f"SUM({proc})"
@compiles(SumFn, Dialects.IbmDbSa)
@compiles(SumFn, Dialects.Db2)
def _(element, compiler, **kw):
    """Render ``SUM`` for the Db2 dialects, which need explicit type casts.

    Each ``?`` in the compiled clauses is cast to ``INT``, and the whole
    expression is cast to ``BIGINT`` before being summed.
    """
    compiled = compiler.process(element.clauses, **kw)
    # NOTE(review): this is a plain text substitution, so any "?" in the
    # compiled SQL that is not a placeholder would be rewritten too — confirm
    # that cannot occur here.
    typed = compiled.replace("?", "CAST(? AS INT)")
    return f"SUM(CAST({typed} AS BIGINT))"