Fix #12321 - Deprecate impala scheme from Hive (#12327)

* Deprecate impala scheme from Hive

* Add migrations
This commit is contained in:
Pere Miquel Brull 2023-07-11 17:38:32 +02:00 committed by GitHub
parent c930427390
commit 1b7c5e3233
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 20 additions and 155 deletions

View File

@ -21,3 +21,7 @@ CREATE TABLE IF NOT EXISTS data_product_entity (
PRIMARY KEY (id), PRIMARY KEY (id),
UNIQUE (fqnHash) UNIQUE (fqnHash)
); );
UPDATE dbservice_entity
SET json = JSON_REPLACE(json, '$.connection.config.scheme', 'hive')
WHERE JSON_EXTRACT(json, '$.connection.config.scheme') IN ('impala', 'impala4');

View File

@ -21,3 +21,7 @@ CREATE TABLE IF NOT EXISTS data_product_entity (
PRIMARY KEY (id), PRIMARY KEY (id),
UNIQUE (fqnHash) UNIQUE (fqnHash)
); );
update dbservice_entity
set json = jsonb_set(json::jsonb, '{connection,config,scheme}', '"hive"')
where json#>>'{connection,config,scheme}' in ('impala', 'impala4');

View File

@ -15,7 +15,6 @@ Hive source methods.
import re import re
from typing import Tuple from typing import Tuple
from impala.sqlalchemy import ImpalaDialect
from pyhive.sqlalchemy_hive import HiveDialect, _type_map from pyhive.sqlalchemy_hive import HiveDialect, _type_map
from sqlalchemy import types, util from sqlalchemy import types, util
from sqlalchemy.engine import reflection from sqlalchemy.engine import reflection
@ -32,7 +31,6 @@ from metadata.generated.schema.metadataIngestion.workflow import (
from metadata.ingestion.api.source import InvalidSourceException from metadata.ingestion.api.source import InvalidSourceException
from metadata.ingestion.source.database.common_db_source import CommonDbSourceService from metadata.ingestion.source.database.common_db_source import CommonDbSourceService
from metadata.ingestion.source.database.hive.queries import HIVE_GET_COMMENTS from metadata.ingestion.source.database.hive.queries import HIVE_GET_COMMENTS
from metadata.profiler.orm.registry import Dialects
complex_data_types = ["struct", "map", "array", "union"] complex_data_types = ["struct", "map", "array", "union"]
@ -182,124 +180,6 @@ def get_table_comment( # pylint: disable=unused-argument
return {"text": None} return {"text": None}
def get_impala_table_or_view_names(connection, schema=None, target_type="table"):
"""
Depending on the targetType returns either the Views or Tables
since they share the same method for getting their names.
"""
query = "show tables"
if schema:
query += " IN " + schema
cursor = connection.execute(query)
results = cursor.fetchall()
tables_and_views = [result[0] for result in results]
retvalue = []
for table_view in tables_and_views:
query = f"describe formatted `{schema}`.`{table_view}`"
cursor = connection.execute(query)
results = cursor.fetchall()
for result in list(results):
data = result
if data[0].strip() == "Table Type:":
if target_type.lower() in data[1].lower():
retvalue.append(table_view)
return retvalue
def get_impala_view_names(
self, connection, schema=None, **kw
): # pylint: disable=unused-argument
results = get_impala_table_or_view_names(connection, schema, "view")
return results
def get_impala_table_names(
self, connection, schema=None, **kw
): # pylint: disable=unused-argument
results = get_impala_table_or_view_names(connection, schema, "table")
return results
def get_impala_table_comment(
self, connection, table_name, schema_name, **kw
): # pylint: disable=unused-argument
"""
Gets the table comment from the describe formatted query result under the Table Parameters section.
"""
full_table_name = (
f"{schema_name}.{table_name}" if schema_name is not None else table_name
)
split_name = full_table_name.split(".")
query = f"describe formatted `{split_name[0]}`.`{split_name[1]}`"
cursor = connection.execute(query)
results = cursor.fetchall()
found_table_parameters = False
try:
for result in list(results):
data = result
if not found_table_parameters and data[0].strip() == "Table Parameters:":
found_table_parameters = True
if found_table_parameters:
coltext = data[1].strip() if data[1] is not None else ""
if coltext == "comment":
return {"text": data[2]}
except Exception:
return {"text": None}
return {"text": None}
def get_impala_columns(
self, connection, table_name, schema=None, **kwargs
): # pylint: disable=unused-argument
# pylint: disable=too-many-locals
"""
Extracted from the Impala Dialect. We'll tune the implementation.
By default, this gives us the column name as `table.column`. We just
want to get `column`.
"""
full_table_name = f"{schema}.{table_name}" if schema is not None else table_name
split_name = full_table_name.split(".")
query = f"DESCRIBE `{split_name[0]}`.`{split_name[1]}`"
describe_table_rows = connection.execute(query)
column_info = []
ordinal_pos = 0
for col in describe_table_rows:
ordinal_pos = ordinal_pos + 1
col_raw = col[1]
attype = re.sub(r"\(.*\)", "", col[1])
col_type = re.search(r"^\w+", col[1]).group(0)
try:
coltype = _type_map[col_type]
except KeyError:
util.warn(f"Did not recognize type '{col_raw}' of column '{col[0]}'")
coltype = types.NullType
charlen = re.search(r"\(([\d,]+)\)", col_raw.lower())
if charlen:
charlen = charlen.group(1)
if attype == "decimal":
prec, scale = charlen.split(",")
args = (int(prec), int(scale))
else:
args = (int(charlen),)
coltype = coltype(*args)
add_column = {
"name": col[0],
"type": coltype,
"comment": col[2],
"nullable": True,
"autoincrement": False,
"ordinalPosition": ordinal_pos,
}
column_info.append(add_column)
return column_info
# pylint: disable=unused-argument # pylint: disable=unused-argument
@reflection.cache @reflection.cache
def get_view_definition(self, connection, view_name, schema=None, **kw): def get_view_definition(self, connection, view_name, schema=None, **kw):
@ -313,25 +193,9 @@ def get_view_definition(self, connection, view_name, schema=None, **kw):
return None return None
# pylint: disable=unused-argument
@reflection.cache
def get_impala_view_definition(self, connection, view_name, schema=None, **kw):
"""
Gets the view definition
"""
full_view_name = f"`{view_name}`" if not schema else f"`{schema}`.`{view_name}`"
res = connection.execute(f"SHOW CREATE VIEW {full_view_name}").fetchall()
if res:
return "\n".join(i[0] for i in res)
return None
HiveDialect.get_columns = get_columns HiveDialect.get_columns = get_columns
HiveDialect.get_table_comment = get_table_comment HiveDialect.get_table_comment = get_table_comment
ImpalaDialect.get_columns = get_impala_columns
ImpalaDialect.get_table_comment = get_impala_table_comment
ImpalaDialect.get_view_definition = get_impala_view_definition
HIVE_VERSION_WITH_VIEW_SUPPORT = "2.2.0" HIVE_VERSION_WITH_VIEW_SUPPORT = "2.2.0"
@ -363,13 +227,6 @@ class HiveSource(CommonDbSourceService):
Fetching views in hive server with query "SHOW VIEWS" was possible Fetching views in hive server with query "SHOW VIEWS" was possible
only after hive 2.2.0 version only after hive 2.2.0 version
""" """
if self.engine.driver == Dialects.Impala:
ImpalaDialect.get_table_names = get_impala_table_names
ImpalaDialect.get_view_names = get_impala_view_names
ImpalaDialect.get_table_comment = get_impala_table_comment
ImpalaDialect.get_columns = get_impala_columns
ImpalaDialect.get_view_definition = get_impala_view_definition
else:
result = dict(self.engine.execute("SELECT VERSION()").fetchone()) result = dict(self.engine.execute("SELECT VERSION()").fetchone())
version = result.get("_c0", "").split() version = result.get("_c0", "").split()

View File

@ -15,7 +15,7 @@
"hiveScheme": { "hiveScheme": {
"description": "SQLAlchemy driver scheme options.", "description": "SQLAlchemy driver scheme options.",
"type": "string", "type": "string",
"enum": ["hive", "hive+http", "hive+https", "impala", "impala4"], "enum": ["hive", "hive+http", "hive+https"],
"default": "hive" "default": "hive"
} }
}, },