diff --git a/ingestion/src/metadata/profiler/orm/functions/datetime.py b/ingestion/src/metadata/profiler/orm/functions/datetime.py index 974ce383036..cf0d94b649c 100644 --- a/ingestion/src/metadata/profiler/orm/functions/datetime.py +++ b/ingestion/src/metadata/profiler/orm/functions/datetime.py @@ -25,6 +25,9 @@ from metadata.utils.logger import profiler_logger logger = profiler_logger() +# -------------- +# Date Functions +# -------------- class DateAddFn(FunctionElement): inherit_cache = CACHE @@ -97,12 +100,145 @@ def _(elements, compiler, **kwargs): # pylint: disable=unused-argument return f"DATE({func.current_date()}, '-{interval} {interval_unit}')" +# ------------------ +# Datetime Functions +# ------------------ class DatetimeAddFn(FunctionElement): inherit_cache = CACHE @compiles(DatetimeAddFn) def _(elements, compiler, **kwargs): + """generic date and datetime function""" + return generic_function(elements, compiler, **kwargs) + + +@compiles(DatetimeAddFn, Dialects.MySQL) +def _(elements, compiler, **kwargs): + """MySQL date and datetime function""" + return mysql_function(elements, compiler, **kwargs) + + +@compiles(DatetimeAddFn, Dialects.BigQuery) +def _(elements, compiler, **kwargs): # pylint: disable=unused-argument + """BigQuery date and datetime function""" + interval = elements.clauses.clauses[0].value + interval_unit = elements.clauses.clauses[1].text + + return ( + f"DATETIME_SUB({func.current_datetime()}, INTERVAL {interval} {interval_unit})" + ) + + +@compiles(DatetimeAddFn, Dialects.Db2) +@compiles(DatetimeAddFn, Dialects.IbmDbSa) +def _(elements, compiler, **kwargs): + """DB2 datetime function""" + return db2_function(elements, compiler, **kwargs) + + +@compiles(DatetimeAddFn, Dialects.ClickHouse) +def _(elements, compiler, **kwargs): + """Clickhouse datetime function""" + return clickhouse_function(elements, compiler, **kwargs) + + +@compiles(DatetimeAddFn, Dialects.AzureSQL) +@compiles(DatetimeAddFn, Dialects.MSSQL) 
+@compiles(DatetimeAddFn, Dialects.Snowflake) +def _(elements, compiler, **kwargs): + """Azure SQL, MSSQL and Snowflake datetime function""" + return azure_mssql_snflk_function(elements, compiler, **kwargs) + + +@compiles(DatetimeAddFn, Dialects.Redshift) +def _(elements, compiler, **kwargs): + """Redshift datetime function""" + return redshift_function(elements, compiler, **kwargs) + + +@compiles(DatetimeAddFn, Dialects.SQLite) +def _(elements, compiler, **kwargs): + """SQLite datetime function""" + return sqlite_function(elements, compiler, **kwargs) + + +# ------------------- +# Timestamp Functions +# ------------------- +class TimestampAddFn(FunctionElement): + inherit_cache = CACHE + + +@compiles(TimestampAddFn) +def _(elements, compiler, **kwargs): + """Generic timestamp function""" + return generic_function(elements, compiler, **kwargs) + + +@compiles(TimestampAddFn, Dialects.BigQuery) +def _(elements, compiler, **kwargs): # pylint: disable=unused-argument + """Bigquery timestamp function""" + interval = elements.clauses.clauses[0].value + interval_unit = elements.clauses.clauses[1].text + + # bigquery does not support month or year interval for timestamp. + if interval_unit.lower() in {"year", "month"}: + raise ValueError( + "Bigquery does not support `month` or `year` interval for table partitioned on timestamp " + "field types. You can set the `interval_unit` to `day` or `hour` directly from the OpenMetadata UI. " 
+ # pylint: disable=line-too-long + "Visit https://docs.open-metadata.org/connectors/ingestion/workflows/profiler#4-updating-profiler-setting-at-the-table-level for more details.", + ) + + return ( + f"TIMESTAMP_SUB({func.current_timestamp()}, INTERVAL {interval} {interval_unit})" + ) + + +@compiles(TimestampAddFn, Dialects.MySQL) +def _(elements, compiler, **kwargs): + """MySQL timestamp function""" + return mysql_function(elements, compiler, **kwargs) + + +@compiles(TimestampAddFn, Dialects.Db2) +@compiles(TimestampAddFn, Dialects.IbmDbSa) +def _(elements, compiler, **kwargs): + """DB2 timestamp function""" + return db2_function(elements, compiler, **kwargs) + + +@compiles(TimestampAddFn, Dialects.ClickHouse) +def _(elements, compiler, **kwargs): + """Clickhouse timestamp function""" + return clickhouse_function(elements, compiler, **kwargs) + + +@compiles(TimestampAddFn, Dialects.AzureSQL) +@compiles(TimestampAddFn, Dialects.MSSQL) +@compiles(TimestampAddFn, Dialects.Snowflake) +def _(elements, compiler, **kwargs): + """Azure SQL, MSSQL and Snowflake timestamp function""" + return azure_mssql_snflk_function(elements, compiler, **kwargs) + + +@compiles(TimestampAddFn, Dialects.Redshift) +def _(elements, compiler, **kwargs): + """Redshift timestamp function""" + return redshift_function(elements, compiler, **kwargs) + + +@compiles(TimestampAddFn, Dialects.SQLite) +def _(elements, compiler, **kwargs): + """SQLite timestamp function""" + return sqlite_function(elements, compiler, **kwargs) + + +# ----------------------------------- +# Shared timestamp/datetime Functions +# ----------------------------------- +def generic_function(elements, compiler, **kwargs): """generic date and datetime function""" interval = elements.clauses.clauses[0].value interval_unit = compiler.process(elements.clauses.clauses[1], **kwargs) @@ -111,9 +247,8 @@ def _(elements, compiler, **kwargs): ) -@compiles(DatetimeAddFn, Dialects.MySQL) -def _(elements, compiler, **kwargs): - """MySQL date 
and datetime function""" +def mysql_function(elements, compiler, **kwargs): + """MySQL timestamp and datetime function""" interval = elements.clauses.clauses[0].value interval_unit = compiler.process(elements.clauses.clauses[1], **kwargs) return ( @@ -121,56 +256,15 @@ def _(elements, compiler, **kwargs): ) -@compiles(DatetimeAddFn, Dialects.BigQuery) -def _(elements, compiler, **kwargs): # pylint: disable=unused-argument - """generic date and datetime function""" +def sqlite_function(elements, compiler, **kwargs): # pylint: disable=unused-argument + """SQLite timestamp and datetime function""" interval = elements.clauses.clauses[0].value interval_unit = elements.clauses.clauses[1].text - - # bigquery does not support month or year interval for timestamp - # we'll do an approximation to get the interval in days. - if interval_unit.lower() in {"month", "year"}: - raise ValueError( - "Bigquery does not support `month` or `year` interval for table partitioned on timestamp", - "field types. You can set the `interval_unit to day directly from OpenMetadata UI`." 
- # pylint: disable=line-too-long - "Visit https://docs.open-metadata.org/connectors/ingestion/workflows/profiler#4-updating-profiler-setting-at-the-table-level for more details.", - ) - - return f"CAST(CURRENT_TIMESTAMP - interval {interval} {interval_unit} AS TIMESTAMP)" + return f"DATE({func.current_timestamp()}, '-{interval} {interval_unit}')" -@compiles(DatetimeAddFn, Dialects.Db2) -@compiles(DatetimeAddFn, Dialects.IbmDbSa) -def _(elements, compiler, **kwargs): - """generic date and datetime function""" - interval, interval_unit = [ - compiler.process(element, **kwargs) for element in elements.clauses - ] - return f"CAST({func.current_timestamp()} - {interval} {interval_unit} AS TIMESTAMP)" - - -@compiles(DatetimeAddFn, Dialects.ClickHouse) -def _(elements, compiler, **kwargs): - """generic date and datetime function""" - interval, interval_unit = [ - compiler.process(element, **kwargs) for element in elements.clauses - ] - return f"(NOW() - interval {interval} {interval_unit})" - - -@compiles(DatetimeAddFn, Dialects.AzureSQL) -@compiles(DatetimeAddFn, Dialects.MSSQL) -@compiles(DatetimeAddFn, Dialects.Snowflake) -def _(elements, compiler, **kwargs): - interval, interval_unit = [ - compiler.process(element, **kwargs) for element in elements.clauses - ] - return f"DATEADD({interval_unit}, -{interval}, {func.current_timestamp()})" - - -@compiles(DatetimeAddFn, Dialects.Redshift) -def _(elements, compiler, **kwargs): +def redshift_function(elements, compiler, **kwargs): + """Redshift timestamp and datetime function""" interval, interval_unit = [ compiler.process(element, **kwargs) for element in elements.clauses ] @@ -179,8 +273,25 @@ def _(elements, compiler, **kwargs): ) -@compiles(DatetimeAddFn, Dialects.SQLite) -def _(elements, compiler, **kwargs): # pylint: disable=unused-argument - interval = elements.clauses.clauses[0].value - interval_unit = elements.clauses.clauses[1].text - return f"DATE({func.current_timestamp()}, '-{interval} {interval_unit}')" +def 
azure_mssql_snflk_function(elements, compiler, **kwargs): + """Azure, MSSQL and Snowflake timestamp and datetime function""" + interval, interval_unit = [ + compiler.process(element, **kwargs) for element in elements.clauses + ] + return f"DATEADD({interval_unit}, -{interval}, {func.current_timestamp()})" + + +def clickhouse_function(elements, compiler, **kwargs): + """ClickHouse timestamp and datetime function""" + interval, interval_unit = [ + compiler.process(element, **kwargs) for element in elements.clauses + ] + return f"(NOW() - interval {interval} {interval_unit})" + + +def db2_function(elements, compiler, **kwargs): + """DB2 timestamp and datetime function""" + interval, interval_unit = [ + compiler.process(element, **kwargs) for element in elements.clauses + ] + return f"CAST({func.current_timestamp()} - {interval} {interval_unit} AS TIMESTAMP)" diff --git a/ingestion/src/metadata/utils/sqa_utils.py b/ingestion/src/metadata/utils/sqa_utils.py index 7259872869b..2e0fa0c55cf 100644 --- a/ingestion/src/metadata/utils/sqa_utils.py +++ b/ingestion/src/metadata/utils/sqa_utils.py @@ -21,7 +21,11 @@ from sqlalchemy import Column, and_, or_ from sqlalchemy.sql.elements import BinaryExpression from sqlalchemy.sql.expression import TextClause -from metadata.profiler.orm.functions.datetime import DateAddFn, DatetimeAddFn +from metadata.profiler.orm.functions.datetime import ( + DateAddFn, + DatetimeAddFn, + TimestampAddFn, +) from metadata.utils.logger import query_runner_logger logger = query_runner_logger() @@ -135,7 +139,9 @@ def dispatch_to_date_or_datetime( """ if isinstance(type_, (sqlalchemy.DATE)): return DateAddFn(partition_interval, partition_interval_unit) - return DatetimeAddFn(partition_interval, partition_interval_unit) + if isinstance(type_, sqlalchemy.DATETIME): + return DatetimeAddFn(partition_interval, partition_interval_unit) + return TimestampAddFn(partition_interval, partition_interval_unit) def get_partition_col_type(partition_column_name: str, 
columns: List[Column]):