# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Helper functions to handle columns when we extract
their raw information from the source
"""


def remove_table_from_column_name(table_name: str, raw_column_name: str) -> str:
    """
    Given a column `table.column`, return only `column`.

    Only a leading `<table_name>.` qualifier is stripped. Columns may have
    real dots "." in their name, so we can neither split on "." nor use
    `str.replace`: the latter would also delete any later occurrence of
    `<table_name>.` inside the column name itself
    (e.g. table `t`, column `t.cat.name` would become `caname`).

    :param table_name: name of the table the column belongs to
    :param raw_column_name: column name as reported by the source,
        possibly qualified as `table.column`
    :return: the column name without the leading table qualifier; the
        input is returned unchanged when it is not qualified with
        `table_name` or when `table_name` is empty
    """
    # An empty table name means there is no qualifier to strip.
    if not table_name:
        return raw_column_name

    prefix = f"{table_name}."
    if raw_column_name.startswith(prefix):
        return raw_column_name[len(prefix) :]
    return raw_column_name
def get_impala_columns(self, connection, table_name, schema=None, **kwargs):
    """
    Extracted from the Impala Dialect. We'll tune the implementation.

    By default, this gives us the column name as `table.column`. We just
    want to get `column`.

    The columns are discovered by running `SELECT * ... LIMIT 0` against the
    table and reading the cursor description of the (empty) result.

    :param connection: connection used to execute the query
    :param table_name: table to inspect
    :param schema: optional schema used to qualify the table name
    :return: list of dicts with the keys `name`, `type`, `nullable`
        and `autoincrement`, one per column
    """
    # pylint: disable=unused-argument
    full_table_name = f"{schema}.{table_name}" if schema is not None else table_name
    query = f"SELECT * FROM {full_table_name} LIMIT 0"
    cursor = connection.execute(query)
    # Keep the description in its own name instead of rebinding the
    # `schema` argument.
    description = cursor.cursor.description
    # We need to fetch the empty results otherwise these queries remain in
    # flight
    cursor.fetchall()

    column_info = []
    # Guard against a missing description so we return an empty list
    # instead of failing on iteration.
    for col in description or []:
        col_name = remove_table_from_column_name(table_name, col[0])
        raw_type = col[1]
        try:
            # Using Hive's map instead of Impala's, as we are pointing to a Hive Server
            # Passing the lower as Hive's map is based on lower strings.
            col_type = _type_map[raw_type.lower()]
        except KeyError:
            # Same fallback the Hive `get_columns` uses: an unknown type must
            # not abort the reflection of the whole table.
            util.warn(f"Did not recognize type '{raw_type}' of column '{col_name}'")
            col_type = types.NullType
        column_info.append(
            {
                "name": col_name,
                "type": col_type,
                "nullable": True,
                "autoincrement": False,
            }
        )
    return column_info


HiveDialect.get_columns = get_columns
HiveDialect.get_table_comment = get_table_comment

ImpalaDialect.get_columns = get_impala_columns
ImpalaDialect.get_table_comment = get_table_comment

HIVE_VERSION_WITH_VIEW_SUPPORT = "2.2.0"
a/ingestion/src/metadata/profiler/orm/registry.py b/ingestion/src/metadata/profiler/orm/registry.py index 66019850677..0218be32c17 100644 --- a/ingestion/src/metadata/profiler/orm/registry.py +++ b/ingestion/src/metadata/profiler/orm/registry.py @@ -37,31 +37,34 @@ class Dialects(Enum): Map the service types from DatabaseServiceType to the dialect scheme name used for ingesting and profiling data. + + Keep this alphabetically ordered """ - Hive = b"hive" # Hive requires bytes - Postgres = "postgresql" - BigQuery = "bigquery" - MySQL = "mysql" - Redshift = "redshift" - Snowflake = "snowflake" - MSSQL = "mssql" - Oracle = "oracle" Athena = "awsathena" - Presto = "presto" - Trino = "trino" - Vertica = "vertica" - Glue = "glue" - MariaDB = "mariadb" - Druid = "druid" - Db2 = "db2" + AzureSQL = "azuresql" + BigQuery = "bigquery" ClickHouse = "clickhouse" Databricks = "databricks" + Db2 = "db2" + Druid = "druid" DynamoDB = "dynamoDB" - AzureSQL = "azuresql" + Glue = "glue" + Hive = b"hive" # Hive requires bytes + IbmDbSa = "ibm_db_sa" + Impala = "impala" + MariaDB = "mariadb" + MSSQL = "mssql" + MySQL = "mysql" + Oracle = "oracle" + Postgres = "postgresql" + Presto = "presto" + Redshift = "redshift" SingleStore = "singlestore" SQLite = "sqlite" - IbmDbSa = "ibm_db_sa" + Snowflake = "snowflake" + Trino = "trino" + Vertica = "vertica" # Sometimes we want to skip certain types for computing metrics. diff --git a/ingestion/tests/unit/source/test_column_helpers.py b/ingestion/tests/unit/source/test_column_helpers.py new file mode 100644 index 00000000000..ec6385838ae --- /dev/null +++ b/ingestion/tests/unit/source/test_column_helpers.py @@ -0,0 +1,44 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Test col helpers functions
"""
from unittest import TestCase

from metadata.ingestion.source.database.column_helpers import (
    remove_table_from_column_name,
)


class TestColumnHelpers(TestCase):
    """
    Checks for the column helper utilities
    """

    def test_remove_table_from_column_name(self):
        """
        The table qualifier is dropped: table.column -> column
        """
        # Plain `table.column` input
        simple_result = remove_table_from_column_name(
            table_name="table",
            raw_column_name="table.column",
        )
        self.assertEqual(simple_result, "column")

        # The column name itself contains dots, which must be kept
        dotted_result = remove_table_from_column_name(
            table_name="table",
            raw_column_name="table.column.with.dots",
        )
        self.assertEqual(dotted_result, "column.with.dots")