diff --git a/ingestion/src/metadata/examples/workflows/db2_profiler.yaml b/ingestion/src/metadata/examples/workflows/db2_profiler.yaml
new file mode 100644
index 00000000000..1c8121d4ad5
--- /dev/null
+++ b/ingestion/src/metadata/examples/workflows/db2_profiler.yaml
@@ -0,0 +1,44 @@
+source:
+  type: db2
+  serviceName: local_db2
+  serviceConnection:
+    config:
+      type: Db2
+      username: openmetadata_user
+      password: openmetadata_password
+      hostPort: localhost:50000
+      database: default
+  sourceConfig:
+    config:
+      type: Profiler
+      generateSampleData: true
+      databaseFilterPattern:
+        includes:
+          - database
+      schemaFilterPattern:
+        includes:
+          - schema_one
+        excludes:
+          - schema_two
+      tableFilterPattern:
+        includes:
+          - orders
+          - customers
+processor:
+  type: "orm-profiler"
+  config:
+    tableConfig:
+      - fullyQualifiedName: local_db2.database.schema_one.orders
+        profileSample: 85
+        columnConfig:
+          includeColumns:
+            - columnName: order_id
+            - columnName: order_date
+            - columnName: status
+sink:
+  type: metadata-rest
+  config: {}
+workflowConfig:
+  openMetadataServerConfig:
+    hostPort: http://localhost:8585/api
+    authProvider: no-auth
diff --git a/ingestion/src/metadata/orm_profiler/metrics/static/column_count.py b/ingestion/src/metadata/orm_profiler/metrics/static/column_count.py
index 4234f25a029..573ef4e82d2 100644
--- a/ingestion/src/metadata/orm_profiler/metrics/static/column_count.py
+++ b/ingestion/src/metadata/orm_profiler/metrics/static/column_count.py
@@ -15,9 +15,38 @@ Table Column Count Metric definition
 # pylint: disable=duplicate-code
 
 from sqlalchemy import inspect, literal
+from sqlalchemy.ext.compiler import compiles
 from sqlalchemy.orm import DeclarativeMeta
+from sqlalchemy.sql.functions import FunctionElement
 
-from metadata.orm_profiler.metrics.core import StaticMetric, _label
+from metadata.orm_profiler.metrics.core import CACHE, StaticMetric, _label
+from metadata.orm_profiler.orm.registry import Dialects
+
+
+# NOTE(review): "Colun" looks like a typo for "Column"; the name is kept so that
+# anything referencing it keeps working.
+class ColunCountFn(FunctionElement):
+    """Function element wrapping the column count so dialects can compile it."""
+
+    name = __qualname__
+    inherit_cache = CACHE
+
+
+@compiles(ColunCountFn)
+def _(element, compiler, **kw):
+    """Default compilation: emit the wrapped clause unchanged."""
+    return compiler.process(element.clauses, **kw)
+
+
+@compiles(ColunCountFn, Dialects.IbmDbSa)
+@compiles(ColunCountFn, Dialects.Db2)
+def _(element, compiler, **kw):
+    """Compile the column count for Db2 as CAST(<clause> AS BIGINT).
+
+    Db2 throws an error when the value is not cast explicitly.
+    """
+    proc = compiler.process(element.clauses, **kw)
+    return f"CAST({proc} AS BIGINT)"
 
 
 class ColumnCount(StaticMetric):
@@ -54,7 +83,7 @@ class ColumnCount(StaticMetric):
             raise AttributeError(
                 "Column Count requires a table to be set: add_props(table=...)(Metrics.COLUMN_COUNT)"
             )
-        return literal(len(inspect(self.table).c))
+        return ColunCountFn(literal(len(inspect(self.table).c)))
 
     @_label
     def dl_fn(self, data_frame=None):
diff --git a/ingestion/src/metadata/orm_profiler/metrics/static/column_names.py b/ingestion/src/metadata/orm_profiler/metrics/static/column_names.py
index e2d5dd723ef..0e65cdf59e4 100644
--- a/ingestion/src/metadata/orm_profiler/metrics/static/column_names.py
+++ b/ingestion/src/metadata/orm_profiler/metrics/static/column_names.py
@@ -16,9 +16,38 @@ Table Column Count Metric definition
 
 import sqlalchemy
 from sqlalchemy import inspect, literal
+from sqlalchemy.ext.compiler import compiles
 from sqlalchemy.orm import DeclarativeMeta
+from sqlalchemy.sql.functions import FunctionElement
 
-from metadata.orm_profiler.metrics.core import StaticMetric, _label
+from metadata.orm_profiler.metrics.core import CACHE, StaticMetric, _label
+from metadata.orm_profiler.orm.registry import Dialects
+
+
+# NOTE(review): "Colun" looks like a typo for "Column"; the name is kept so that
+# anything referencing it keeps working.
+class ColunNameFn(FunctionElement):
+    """Function element wrapping the column names so dialects can compile it."""
+
+    name = __qualname__
+    inherit_cache = CACHE
+
+
+@compiles(ColunNameFn)
+def _(element, compiler, **kw):
+    """Default compilation: emit the wrapped clause unchanged."""
+    return compiler.process(element.clauses, **kw)
+
+
+@compiles(ColunNameFn, Dialects.IbmDbSa)
+@compiles(ColunNameFn, Dialects.Db2)
+def _(element, compiler, **kw):
+    """Compile the column names for Db2 as CAST(<clause> AS VARCHAR).
+
+    Db2 throws an error when the value is not cast explicitly.
+    """
+    proc = compiler.process(element.clauses, **kw)
+    return f"CAST({proc} AS VARCHAR)"
 
 
 class ColumnNames(StaticMetric):
@@ -57,7 +86,7 @@ class ColumnNames(StaticMetric):
             )
 
         col_names = ",".join(inspect(self.table).c.keys())
-        return literal(col_names, type_=sqlalchemy.types.String)
+        return ColunNameFn(literal(col_names, type_=sqlalchemy.types.String))
 
     @_label
     def dl_fn(self, data_frame=None):
diff --git a/ingestion/src/metadata/orm_profiler/orm/functions/length.py b/ingestion/src/metadata/orm_profiler/orm/functions/length.py
index 74d4f9c99ee..717d4e2495e 100644
--- a/ingestion/src/metadata/orm_profiler/orm/functions/length.py
+++ b/ingestion/src/metadata/orm_profiler/orm/functions/length.py
@@ -44,6 +44,8 @@ def _(element, compiler, **kw):
 @compiles(LenFn, Dialects.Presto)
 @compiles(LenFn, Dialects.BigQuery)
 @compiles(LenFn, Dialects.Oracle)
+@compiles(LenFn, Dialects.IbmDbSa)
+@compiles(LenFn, Dialects.Db2)
 def _(element, compiler, **kw):
     return "LENGTH(%s)" % compiler.process(element.clauses, **kw)
 
diff --git a/ingestion/src/metadata/orm_profiler/orm/functions/modulo.py b/ingestion/src/metadata/orm_profiler/orm/functions/modulo.py
index 5d40186c575..d7a5c6eec28 100644
--- a/ingestion/src/metadata/orm_profiler/orm/functions/modulo.py
+++ b/ingestion/src/metadata/orm_profiler/orm/functions/modulo.py
@@ -56,6 +56,8 @@ def _(element, compiler, **kw):
 @compiles(ModuloFn, Dialects.Oracle)
 @compiles(ModuloFn, Dialects.Presto)
 @compiles(ModuloFn, Dialects.Trino)
+@compiles(ModuloFn, Dialects.IbmDbSa)
+@compiles(ModuloFn, Dialects.Db2)
 def _(element, compiler, **kw):
     """Modulo function for specific dialect"""
     value, base = validate_and_compile(element, compiler, **kw)
diff --git a/ingestion/src/metadata/orm_profiler/orm/functions/random_num.py b/ingestion/src/metadata/orm_profiler/orm/functions/random_num.py
index d26e5ccf364..011909b0e6a 100644
--- a/ingestion/src/metadata/orm_profiler/orm/functions/random_num.py
+++ b/ingestion/src/metadata/orm_profiler/orm/functions/random_num.py
@@ -43,6 +43,8 @@ def _(*_, **__):
 
 @compiles(RandomNumFn, Dialects.Hive)
 @compiles(RandomNumFn, Dialects.MySQL)
+@compiles(RandomNumFn, Dialects.IbmDbSa)
+@compiles(RandomNumFn, Dialects.Db2)
 def _(*_, **__):
     return "ABS(RAND()) * 100"
 
diff --git a/ingestion/src/metadata/orm_profiler/orm/functions/sum.py b/ingestion/src/metadata/orm_profiler/orm/functions/sum.py
index d967c9e9ba3..30abe11e85f 100644
--- a/ingestion/src/metadata/orm_profiler/orm/functions/sum.py
+++ b/ingestion/src/metadata/orm_profiler/orm/functions/sum.py
@@ -58,3 +58,17 @@ def _(element, compiler, **kw):
     """These database types have all int types as alias for int64 so don't need a cast"""
     proc = compiler.process(element.clauses, **kw)
     return f"SUM({proc})"
+
+
+@compiles(SumFn, Dialects.IbmDbSa)
+@compiles(SumFn, Dialects.Db2)
+def _(element, compiler, **kw):
+    """Compile SUM for Db2, which requires the variables to be type cast.
+
+    Each "?" in the compiled clause is wrapped as CAST(? AS INT) and the whole
+    clause is cast to BIGINT before summing.
+    """
+    # NOTE(review): str.replace rewrites every "?" in the compiled clause, not
+    # only bind markers; confirm "?" cannot appear in identifiers or literals.
+    proc = compiler.process(element.clauses, **kw).replace("?", "CAST(? AS INT)")
+    return f"SUM(CAST({proc} AS BIGINT))"