Pyimpala fix colnames, comments and dialect sql compilation (#10470)

* Fix col names and comments for impala hive

* Fix cols, comments and impala sql compilation

* Handle hive types

* Format
This commit is contained in:
Pere Miquel Brull 2023-03-08 14:13:06 +01:00 committed by GitHub
parent c199f13ed0
commit a05e56feba
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 128 additions and 18 deletions

View File

@ -0,0 +1,24 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Helper functions to handle columns when we extract
their raw information from the source
"""
def remove_table_from_column_name(table_name: str, raw_column_name: str) -> str:
    """
    Given a column `table.column`, return only `column`.

    Note that we might have columns which have real dots
    "." in the name, so we cannot just split.

    Only the leading `table.` prefix is stripped: using `str.replace`
    would also drop any `table.` occurrence embedded later in the
    column name (e.g. `t.a.t.b` -> `a.b` instead of the correct `a.t.b`).
    """
    prefix = table_name + "."
    if raw_column_name.startswith(prefix):
        return raw_column_name[len(prefix):]
    # Column was not qualified with this table name; leave it untouched
    return raw_column_name

View File

@ -15,6 +15,7 @@ Hive source methods.
import re import re
from typing import Tuple from typing import Tuple
from impala.sqlalchemy import ImpalaDialect
from pyhive.sqlalchemy_hive import HiveDialect, _type_map from pyhive.sqlalchemy_hive import HiveDialect, _type_map
from sqlalchemy import types, util from sqlalchemy import types, util
from sqlalchemy.engine import reflection from sqlalchemy.engine import reflection
@ -29,6 +30,9 @@ from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource, Source as WorkflowSource,
) )
from metadata.ingestion.api.source import InvalidSourceException from metadata.ingestion.api.source import InvalidSourceException
from metadata.ingestion.source.database.column_helpers import (
remove_table_from_column_name,
)
from metadata.ingestion.source.database.common_db_source import CommonDbSourceService from metadata.ingestion.source.database.common_db_source import CommonDbSourceService
from metadata.ingestion.source.database.hive.queries import HIVE_GET_COMMENTS from metadata.ingestion.source.database.hive.queries import HIVE_GET_COMMENTS
@ -55,7 +59,6 @@ def get_columns(
rows = [[col.strip() if col else None for col in row] for row in rows] rows = [[col.strip() if col else None for col in row] for row in rows]
rows = [row for row in rows if row[0] and row[0] != "# col_name"] rows = [row for row in rows if row[0] and row[0] != "# col_name"]
result = [] result = []
args = ()
for (col_name, col_type, comment) in rows: for (col_name, col_type, comment) in rows:
if col_name == "# Partition Information": if col_name == "# Partition Information":
break break
@ -182,9 +185,42 @@ def get_table_comment( # pylint: disable=unused-argument
return {"text": None} return {"text": None}
def get_impala_columns(self, connection, table_name, schema=None, **kwargs):
    """
    Extracted from the Impala Dialect. We'll tune the implementation.

    By default, this gives us the column name as `table.column`. We just
    want to get `column`.

    Returns a list of SQLAlchemy-style column dicts with keys
    `name`, `type`, `nullable` and `autoincrement`.
    """
    # pylint: disable=unused-argument
    full_table_name = f"{schema}.{table_name}" if schema is not None else table_name
    # LIMIT 0 returns no rows but still populates the cursor description
    query = f"SELECT * FROM {full_table_name} LIMIT 0"
    cursor = connection.execute(query)
    # Keep the DB-API description in its own name instead of shadowing
    # the `schema` parameter, which is still meaningful above.
    description = cursor.cursor.description
    # We need to fetch the empty results otherwise these queries remain in
    # flight
    cursor.fetchall()
    return [
        {
            "name": remove_table_from_column_name(table_name, col[0]),
            # Using Hive's map instead of Impala's, as we are pointing to a Hive Server
            # Passing the lower as Hive's map is based on lower strings.
            "type": _type_map[col[1].lower()],
            # NOTE(review): nullability/autoincrement are not reported by the
            # cursor description, so they are hard-coded as in the upstream dialect
            "nullable": True,
            "autoincrement": False,
        }
        for col in description
    ]
HiveDialect.get_columns = get_columns HiveDialect.get_columns = get_columns
HiveDialect.get_table_comment = get_table_comment HiveDialect.get_table_comment = get_table_comment
ImpalaDialect.get_columns = get_impala_columns
ImpalaDialect.get_table_comment = get_table_comment
HIVE_VERSION_WITH_VIEW_SUPPORT = "2.2.0" HIVE_VERSION_WITH_VIEW_SUPPORT = "2.2.0"

View File

@ -36,6 +36,7 @@ def _(element, compiler, **kw):
@compiles(LenFn, Dialects.SQLite) @compiles(LenFn, Dialects.SQLite)
@compiles(LenFn, Dialects.Vertica) @compiles(LenFn, Dialects.Vertica)
@compiles(LenFn, Dialects.Hive) @compiles(LenFn, Dialects.Hive)
@compiles(LenFn, Dialects.Impala)
@compiles(LenFn, Dialects.Databricks) @compiles(LenFn, Dialects.Databricks)
@compiles(LenFn, Dialects.MySQL) @compiles(LenFn, Dialects.MySQL)
@compiles(LenFn, Dialects.MariaDB) @compiles(LenFn, Dialects.MariaDB)

View File

@ -73,6 +73,7 @@ def _(elements, compiler, **kwargs):
@compiles(MedianFn, Dialects.Hive) @compiles(MedianFn, Dialects.Hive)
@compiles(MedianFn, Dialects.Impala)
def _(elements, compiler, **kwargs): def _(elements, compiler, **kwargs):
"""Median computation for Hive""" """Median computation for Hive"""
col, _, percentile = [ col, _, percentile = [

View File

@ -42,6 +42,7 @@ def _(*_, **__):
@compiles(RandomNumFn, Dialects.Hive) @compiles(RandomNumFn, Dialects.Hive)
@compiles(RandomNumFn, Dialects.Impala)
@compiles(RandomNumFn, Dialects.MySQL) @compiles(RandomNumFn, Dialects.MySQL)
@compiles(RandomNumFn, Dialects.IbmDbSa) @compiles(RandomNumFn, Dialects.IbmDbSa)
@compiles(RandomNumFn, Dialects.Db2) @compiles(RandomNumFn, Dialects.Db2)

View File

@ -37,31 +37,34 @@ class Dialects(Enum):
Map the service types from DatabaseServiceType Map the service types from DatabaseServiceType
to the dialect scheme name used for ingesting to the dialect scheme name used for ingesting
and profiling data. and profiling data.
Keep this alphabetically ordered
""" """
Hive = b"hive" # Hive requires bytes
Postgres = "postgresql"
BigQuery = "bigquery"
MySQL = "mysql"
Redshift = "redshift"
Snowflake = "snowflake"
MSSQL = "mssql"
Oracle = "oracle"
Athena = "awsathena" Athena = "awsathena"
Presto = "presto" AzureSQL = "azuresql"
Trino = "trino" BigQuery = "bigquery"
Vertica = "vertica"
Glue = "glue"
MariaDB = "mariadb"
Druid = "druid"
Db2 = "db2"
ClickHouse = "clickhouse" ClickHouse = "clickhouse"
Databricks = "databricks" Databricks = "databricks"
Db2 = "db2"
Druid = "druid"
DynamoDB = "dynamoDB" DynamoDB = "dynamoDB"
AzureSQL = "azuresql" Glue = "glue"
Hive = b"hive" # Hive requires bytes
IbmDbSa = "ibm_db_sa"
Impala = "impala"
MariaDB = "mariadb"
MSSQL = "mssql"
MySQL = "mysql"
Oracle = "oracle"
Postgres = "postgresql"
Presto = "presto"
Redshift = "redshift"
SingleStore = "singlestore" SingleStore = "singlestore"
SQLite = "sqlite" SQLite = "sqlite"
IbmDbSa = "ibm_db_sa" Snowflake = "snowflake"
Trino = "trino"
Vertica = "vertica"
# Sometimes we want to skip certain types for computing metrics. # Sometimes we want to skip certain types for computing metrics.

View File

@ -0,0 +1,44 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Test col helpers functions
"""
from unittest import TestCase
from metadata.ingestion.source.database.column_helpers import (
remove_table_from_column_name,
)
class TestColumnHelpers(TestCase):
    """
    Validate column helpers
    """

    def test_remove_table_from_column_name(self):
        """
        From table.column -> column
        """
        # (table_name, raw_column_name, expected) triples
        cases = [
            ("table", "table.column", "column"),
            ("table", "table.column.with.dots", "column.with.dots"),
        ]
        for table, raw, expected in cases:
            self.assertEqual(
                remove_table_from_column_name(
                    table_name=table,
                    raw_column_name=raw,
                ),
                expected,
            )