Fixes #10775 -- BQ partitionning for datetime dtype (#10780)

* fix: BQ partitionning for datetime dtype

* Update ingestion/src/metadata/profiler/orm/functions/datetime.py

Co-authored-by: Pere Miquel Brull <peremiquelbrull@gmail.com>

* Update ingestion/src/metadata/profiler/orm/functions/datetime.py

Co-authored-by: Pere Miquel Brull <peremiquelbrull@gmail.com>

* Update ingestion/src/metadata/profiler/orm/functions/datetime.py

Co-authored-by: Pere Miquel Brull <peremiquelbrull@gmail.com>

---------

Co-authored-by: Pere Miquel Brull <peremiquelbrull@gmail.com>
This commit is contained in:
Teddy 2023-03-27 15:28:34 +02:00 committed by GitHub
parent 4db2a74c26
commit 31d4662abe
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 173 additions and 56 deletions

View File

@ -25,6 +25,9 @@ from metadata.utils.logger import profiler_logger
logger = profiler_logger()
# --------------
# Date Functions
# --------------
class DateAddFn(FunctionElement):
inherit_cache = CACHE
@ -97,12 +100,145 @@ def _(elements, compiler, **kwargs): # pylint: disable=unused-argument
return f"DATE({func.current_date()}, '-{interval} {interval_unit}')"
# ------------------
# Datetime Functions
# ------------------
class DatetimeAddFn(FunctionElement):
inherit_cache = CACHE
@compiles(DatetimeAddFn)
def _(elements, compiler, **kwargs):
"""generic date and datetime function"""
return generic_function(elements, compiler, **kwargs)
@compiles(DatetimeAddFn, Dialects.MySQL)
def _(elements, compiler, **kwargs):
"""MySQL date and datetime function"""
return mysql_function(elements, compiler, **kwargs)
@compiles(DatetimeAddFn, Dialects.BigQuery)
def _(elements, compiler, **kwargs): # pylint: disable=unused-argument
"""BigQuery date and datetime function"""
interval = elements.clauses.clauses[0].value
interval_unit = elements.clauses.clauses[1].text
return (
f"DATETIME_SUB({func.current_datetime()}, INTERVAL {interval} {interval_unit})"
)
@compiles(DatetimeAddFn, Dialects.Db2)
@compiles(DatetimeAddFn, Dialects.IbmDbSa)
def _(elements, compiler, **kwargs):
"""DB2 datetime function"""
return db2_function(elements, compiler, **kwargs)
@compiles(DatetimeAddFn, Dialects.ClickHouse)
def _(elements, compiler, **kwargs):
"""Clickhouse datetime function"""
return clickhouse_function(elements, compiler, **kwargs)
@compiles(DatetimeAddFn, Dialects.AzureSQL)
@compiles(DatetimeAddFn, Dialects.MSSQL)
@compiles(DatetimeAddFn, Dialects.Snowflake)
def _(elements, compiler, **kwargs):
"""AzreSQL, MSSQL, Snowflake datetime function"""
return azure_mssql_snflk_function(elements, compiler, **kwargs)
@compiles(DatetimeAddFn, Dialects.Redshift)
def _(elements, compiler, **kwargs):
"""Redshift datetime function"""
return redshift_function(elements, compiler, **kwargs)
@compiles(DatetimeAddFn, Dialects.SQLite)
def _(elements, compiler, **kwargs):
"""SQLite datetime function"""
return sqlite_function(elements, compiler, **kwargs)
# -------------------
# Timestamp Functions
# -------------------
class TimestampAddFn(FunctionElement):
inherit_cache = CACHE
@compiles(TimestampAddFn)
def _(elements, compiler, **kwargs):
"""Generic timestamp function"""
return generic_function(elements, compiler, **kwargs)
@compiles(TimestampAddFn, Dialects.BigQuery)
def _(elements, compiler, **kwargs): # pylint: disable=unused-argument
"""Bigquery timestamp function"""
interval = elements.clauses.clauses[0].value
interval_unit = elements.clauses.clauses[1].text
# bigquery does not support month or year interval for timestamp.
if interval_unit.lower() in {"year", "month"}:
raise ValueError(
"Bigquery does not support `month` or `year` interval for table partitioned on timestamp",
"field types. You can set the `interval_unit to day or hour directly from OpenMetadata UI`."
# pylint: disable=line-too-long
"Visit https://docs.open-metadata.org/connectors/ingestion/workflows/profiler#4-updating-profiler-setting-at-the-table-level for more details.",
)
return (
f"DATETIME_SUB({func.current_timestamp()}, INTERVAL {interval} {interval_unit})"
)
@compiles(TimestampAddFn, Dialects.MySQL)
def _(elements, compiler, **kwargs):
"""MySQL timestamp function"""
return mysql_function(elements, compiler, **kwargs)
@compiles(TimestampAddFn, Dialects.Db2)
@compiles(TimestampAddFn, Dialects.IbmDbSa)
def _(elements, compiler, **kwargs):
"""DB2 timestamp function"""
return db2_function(elements, compiler, **kwargs)
@compiles(TimestampAddFn, Dialects.ClickHouse)
def _(elements, compiler, **kwargs):
"""Clickhouse datetime function"""
return clickhouse_function(elements, compiler, **kwargs)
@compiles(TimestampAddFn, Dialects.AzureSQL)
@compiles(TimestampAddFn, Dialects.MSSQL)
@compiles(TimestampAddFn, Dialects.Snowflake)
def _(elements, compiler, **kwargs):
"""Azure SQL, MSSQL and Snowflake timestamp function"""
return azure_mssql_snflk_function(elements, compiler, **kwargs)
@compiles(TimestampAddFn, Dialects.Redshift)
def _(elements, compiler, **kwargs):
"""Redshift timestamp function"""
return redshift_function(elements, compiler, **kwargs)
@compiles(TimestampAddFn, Dialects.SQLite)
def _(elements, compiler, **kwargs):
"""SQLite timestamp function"""
return sqlite_function(elements, compiler, **kwargs)
# -----------------------------------
# Shared timestamp/datetime Functions
# -----------------------------------
def generic_function(elements, compiler, **kwargs):
"""generic date and datetime function"""
interval = elements.clauses.clauses[0].value
interval_unit = compiler.process(elements.clauses.clauses[1], **kwargs)
@ -111,9 +247,8 @@ def _(elements, compiler, **kwargs):
)
@compiles(DatetimeAddFn, Dialects.MySQL)
def _(elements, compiler, **kwargs):
"""MySQL date and datetime function"""
def mysql_function(elements, compiler, **kwargs):
"""MySQL timestamp and datetime function"""
interval = elements.clauses.clauses[0].value
interval_unit = compiler.process(elements.clauses.clauses[1], **kwargs)
return (
@ -121,56 +256,15 @@ def _(elements, compiler, **kwargs):
)
@compiles(DatetimeAddFn, Dialects.BigQuery)
def _(elements, compiler, **kwargs): # pylint: disable=unused-argument
"""generic date and datetime function"""
def sqlite_function(elements, compiler, **kwargs): # pylint: disable=unused-argument
"""SQLite timestamp and datetime function"""
interval = elements.clauses.clauses[0].value
interval_unit = elements.clauses.clauses[1].text
# bigquery does not support month or year interval for timestamp
# we'll do an approximation to get the interval in days.
if interval_unit.lower() in {"month", "year"}:
raise ValueError(
"Bigquery does not support `month` or `year` interval for table partitioned on timestamp",
"field types. You can set the `interval_unit to day directly from OpenMetadata UI`."
# pylint: disable=line-too-long
"Visit https://docs.open-metadata.org/connectors/ingestion/workflows/profiler#4-updating-profiler-setting-at-the-table-level for more details.",
)
return f"CAST(CURRENT_TIMESTAMP - interval {interval} {interval_unit} AS TIMESTAMP)"
return f"DATE({func.current_timestamp()}, '-{interval} {interval_unit}')"
@compiles(DatetimeAddFn, Dialects.Db2)
@compiles(DatetimeAddFn, Dialects.IbmDbSa)
def _(elements, compiler, **kwargs):
"""generic date and datetime function"""
interval, interval_unit = [
compiler.process(element, **kwargs) for element in elements.clauses
]
return f"CAST({func.current_timestamp()} - {interval} {interval_unit} AS TIMESTAMP)"
@compiles(DatetimeAddFn, Dialects.ClickHouse)
def _(elements, compiler, **kwargs):
"""generic date and datetime function"""
interval, interval_unit = [
compiler.process(element, **kwargs) for element in elements.clauses
]
return f"(NOW() - interval {interval} {interval_unit})"
@compiles(DatetimeAddFn, Dialects.AzureSQL)
@compiles(DatetimeAddFn, Dialects.MSSQL)
@compiles(DatetimeAddFn, Dialects.Snowflake)
def _(elements, compiler, **kwargs):
interval, interval_unit = [
compiler.process(element, **kwargs) for element in elements.clauses
]
return f"DATEADD({interval_unit}, -{interval}, {func.current_timestamp()})"
@compiles(DatetimeAddFn, Dialects.Redshift)
def _(elements, compiler, **kwargs):
def redshift_function(elements, compiler, **kwargs):
"""Redshift timestamp and datetime function"""
interval, interval_unit = [
compiler.process(element, **kwargs) for element in elements.clauses
]
@ -179,8 +273,25 @@ def _(elements, compiler, **kwargs):
)
@compiles(DatetimeAddFn, Dialects.SQLite)
def _(elements, compiler, **kwargs): # pylint: disable=unused-argument
interval = elements.clauses.clauses[0].value
interval_unit = elements.clauses.clauses[1].text
return f"DATE({func.current_timestamp()}, '-{interval} {interval_unit}')"
def azure_mssql_snflk_function(elements, compiler, **kwargs):
"""Azure, MSSQL and Snowflake timestamp and datetime function"""
interval, interval_unit = [
compiler.process(element, **kwargs) for element in elements.clauses
]
return f"DATEADD({interval_unit}, -{interval}, {func.current_timestamp()})"
def clickhouse_function(elements, compiler, **kwargs):
"""ClickHouse timestamp and datetime function"""
interval, interval_unit = [
compiler.process(element, **kwargs) for element in elements.clauses
]
return f"(NOW() - interval {interval} {interval_unit})"
def db2_function(elements, compiler, **kwargs):
"""DB2 timestamp and datetime function"""
interval, interval_unit = [
compiler.process(element, **kwargs) for element in elements.clauses
]
return f"CAST({func.current_timestamp()} - {interval} {interval_unit} AS TIMESTAMP)"

View File

@ -21,7 +21,11 @@ from sqlalchemy import Column, and_, or_
from sqlalchemy.sql.elements import BinaryExpression
from sqlalchemy.sql.expression import TextClause
from metadata.profiler.orm.functions.datetime import DateAddFn, DatetimeAddFn
from metadata.profiler.orm.functions.datetime import (
DateAddFn,
DatetimeAddFn,
TimestampAddFn,
)
from metadata.utils.logger import query_runner_logger
logger = query_runner_logger()
@ -135,7 +139,9 @@ def dispatch_to_date_or_datetime(
"""
if isinstance(type_, (sqlalchemy.DATE)):
return DateAddFn(partition_interval, partition_interval_unit)
return DatetimeAddFn(partition_interval, partition_interval_unit)
if isinstance(type_, sqlalchemy.DATETIME):
return DatetimeAddFn(partition_interval, partition_interval_unit)
return TimestampAddFn(partition_interval, partition_interval_unit)
def get_partition_col_type(partition_column_name: str, columns: List[Column]):