From ef438eb870716fed8c194905ed43b6a4cf7c3f4c Mon Sep 17 00:00:00 2001
From: Sriharsha Chintalapani
Date: Thu, 28 Oct 2021 20:46:12 -0700
Subject: [PATCH] Add bigquery, athena support for profiler (#974)

---
 .../openmetadata/common/database_common.py    |   5 +
 profiler/src/openmetadata/databases/athena.py | 100 ++++++++++++++++++
 .../src/openmetadata/databases/bigquery.py    |  71 +++++++++++++
 3 files changed, 176 insertions(+)
 create mode 100644 profiler/src/openmetadata/databases/athena.py
 create mode 100644 profiler/src/openmetadata/databases/bigquery.py

diff --git a/profiler/src/openmetadata/common/database_common.py b/profiler/src/openmetadata/common/database_common.py
index 65f7c89ae6d..a65d90647a7 100644
--- a/profiler/src/openmetadata/common/database_common.py
+++ b/profiler/src/openmetadata/common/database_common.py
@@ -122,6 +122,11 @@ class SQLExpressions(BaseModel):
     def escape_metacharacters(value: str):
         return re.sub(r"(\\.)", r"\\\1", value)
 
+    def literal_date(self, tdate: date):
+        """Render ``tdate`` as a SQL date literal such as ``DATE '2021-10-28'``."""
+        date_string = tdate.strftime("%Y-%m-%d")
+        return f"DATE '{date_string}'"
+
     def literal_number(self, value: Number):
         if value is None:
             return None
diff --git a/profiler/src/openmetadata/databases/athena.py b/profiler/src/openmetadata/databases/athena.py
new file mode 100644
index 00000000000..c66e87fc47a
--- /dev/null
+++ b/profiler/src/openmetadata/databases/athena.py
@@ -0,0 +1,100 @@
+#  Licensed to the Apache Software Foundation (ASF) under one or more
+#  contributor license agreements. See the NOTICE file distributed with
+#  this work for additional information regarding copyright ownership.
+#  The ASF licenses this file to You under the Apache License, Version 2.0
+#  (the "License"); you may not use this file except in compliance with
+#  the License. You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+import os
+from datetime import date
+from typing import Optional, Tuple
+from urllib.parse import quote_plus
+
+from openmetadata.common.database_common import (
+    DatabaseCommon,
+    SQLConnectionConfig,
+    SQLExpressions,
+)
+
+
+class AthenaConfig(SQLConnectionConfig):
+    """Connection settings used to build an ``awsathena+rest`` connection URL."""
+
+    scheme: str = "awsathena+rest"
+    username: Optional[str] = None
+    password: Optional[str] = None
+    database: Optional[str] = None
+    aws_region: str
+    s3_staging_dir: str
+    work_group: str
+    # Was "BigQuery", apparently copied from the BigQuery config.
+    service_type = "Athena"
+
+    def get_connection_url(self):
+        """Return the connection URL for the configured Athena endpoint.
+
+        Credentials, the staging directory and the work group are
+        URL-encoded. Without a username the credential part is a bare ``:``.
+        """
+        url = f"{self.scheme}://"
+        if self.username:
+            url += f"{quote_plus(self.username)}"
+            if self.password:
+                url += f":{quote_plus(self.password)}"
+        else:
+            url += ":"
+        url += f"@athena.{self.aws_region}.amazonaws.com:443/"
+        if self.database:
+            url += f"{self.database}"
+        url += f"?s3_staging_dir={quote_plus(self.s3_staging_dir)}"
+        url += f"&work_group={quote_plus(self.work_group)}"
+
+        return url
+
+
+class AthenaSQLExpressions(SQLExpressions):
+    """SQL expression templates overridden for Athena."""
+
+    avg_expr = "AVG(CAST({} as DECIMAL(38, 0)))"
+    sum_expr = "SUM(CAST ({} as DECIMAL(38, 0)))"
+
+    def literal_date(self, tdate: date):
+        """Render ``tdate`` as a ``DATE('YYYY-MM-DD')`` function call."""
+        date_string = tdate.strftime("%Y-%m-%d")
+        return f"DATE('{date_string}')"
+
+
+class Athena(DatabaseCommon):
+    """Profiler database configured by an :class:`AthenaConfig`."""
+
+    config: AthenaConfig = None
+    sql_exprs: AthenaSQLExpressions = AthenaSQLExpressions()
+
+    def __init__(self, config):
+        super().__init__(config)
+        self.config = config
+
+    @classmethod
+    def create(cls, config_dict):
+        """Build an instance from a plain dict parsed into ``AthenaConfig``."""
+        config = AthenaConfig.parse_obj(config_dict)
+        return cls(config)
+
+    def qualify_table_name(self, table_name: str) -> str:
+        """Return ``table_name`` quoted for use in Athena queries.
+
+        Athena rejects backtick-quoted identifiers in queries, so each name
+        part is wrapped in double quotes. The database prefix is left out
+        when no database is configured.
+        """
+        if self.config.database:
+            return f'"{self.config.database}"."{table_name}"'
+        return f'"{table_name}"'
diff --git a/profiler/src/openmetadata/databases/bigquery.py b/profiler/src/openmetadata/databases/bigquery.py
new file mode 100644
index 00000000000..d3e6e895bef
--- /dev/null
+++ b/profiler/src/openmetadata/databases/bigquery.py
@@ -0,0 +1,71 @@
+#  Licensed to the Apache Software Foundation (ASF) under one or more
+#  contributor license agreements. See the NOTICE file distributed with
+#  this work for additional information regarding copyright ownership.
+#  The ASF licenses this file to You under the Apache License, Version 2.0
+#  (the "License"); you may not use this file except in compliance with
+#  the License. You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+import os
+from typing import Optional, Tuple
+
+from openmetadata.common.database_common import (
+    DatabaseCommon,
+    SQLConnectionConfig,
+    SQLExpressions,
+)
+
+
+class BigqueryConfig(SQLConnectionConfig):
+    """Connection settings used to build a ``bigquery://`` connection URL."""
+
+    scheme = "bigquery"
+    project_id: Optional[str] = None
+    duration: int = 1
+    service_type = "BigQuery"
+
+    def get_connection_url(self):
+        """Return ``<scheme>://<project_id>``, or ``<scheme>://`` without a project."""
+        if self.project_id:
+            return f"{self.scheme}://{self.project_id}"
+        return f"{self.scheme}://"
+
+
+class BigquerySQLExpressions(SQLExpressions):
+    """SQL expression templates overridden for BigQuery."""
+
+    stddev_expr = "STDDEV_POP({})"
+    # Both placeholders are positional, like the other "{}" templates here. The
+    # earlier "{expr}" field raises KeyError when the template is filled with
+    # positional str.format() arguments only.
+    regex_like_pattern_expr = "REGEXP_CONTAINS({}, r'{}')"
+
+
+class Bigquery(DatabaseCommon):
+    """Profiler database configured by a :class:`BigqueryConfig`."""
+
+    config: BigqueryConfig = None
+    sql_exprs: BigquerySQLExpressions = BigquerySQLExpressions()
+
+    def __init__(self, config):
+        super().__init__(config)
+        self.config = config
+
+    @classmethod
+    def create(cls, config_dict):
+        """Build an instance from a plain dict parsed into ``BigqueryConfig``."""
+        config = BigqueryConfig.parse_obj(config_dict)
+        return cls(config)
+
+    def qualify_table_name(self, table_name: str) -> str:
+        """Return ``table_name`` backtick-quoted, prefixed by the database if set."""
+        if self.config.database:
+            return f"`{self.config.database}.{table_name}`"
+        return f"`{table_name}`"