Fixed Issue #10943: Impala query engine metadata ingestion and median function profiler (#10944)

* updated metadata to work with the impala query engine.
Uses the describe function to grab column names, data types, and comments.

* added the ordinalPosition data point into the Column constructor.

* renamed variable to better describe its usage.

* updated profile errors.
Hive connections now comment columns by default.

* removed print statements

* Cleaned up code by pulling check into its own function

* Updated median function to return null when it is being used for first and third quartiles.

* removed print statements and ran make py_format

* updated to fix some pylint errors.
imported Dialects to remove string compare to "impala" engine

* moved huge comment into function docstring.
This comment shows us the sql to get quartiles in Impala
This commit is contained in:
Keith Sirmons 2023-04-06 11:07:42 -05:00 committed by GitHub
parent 6f055fdb34
commit 42000053aa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 176 additions and 35 deletions

View File

@ -30,11 +30,9 @@ from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.ingestion.api.source import InvalidSourceException
from metadata.ingestion.source.database.column_helpers import (
remove_table_from_column_name,
)
from metadata.ingestion.source.database.common_db_source import CommonDbSourceService
from metadata.ingestion.source.database.hive.queries import HIVE_GET_COMMENTS
from metadata.profiler.orm.registry import Dialects
complex_data_types = ["struct", "map", "array", "union"]
@ -184,33 +182,121 @@ def get_table_comment( # pylint: disable=unused-argument
return {"text": None}
def get_impala_columns(self, connection, table_name, schema=None, **kwargs):
def get_impala_table_or_view_names(connection, schema=None, target_type="table"):
"""
Depending on the targetType returns either the Views or Tables
since they share the same method for getting their names.
"""
query = "show tables"
if schema:
query += " IN " + schema
cursor = connection.execute(query)
results = cursor.fetchall()
tables_and_views = [result[0] for result in results]
retvalue = []
for table_view in tables_and_views:
query = f"describe formatted `{schema}`.`{table_view}`"
cursor = connection.execute(query)
results = cursor.fetchall()
for result in list(results):
data = result
if data[0].strip() == "Table Type:":
if target_type.lower() in data[1].lower():
retvalue.append(table_view)
return retvalue
def get_impala_view_names(
self, connection, schema=None, **kw
): # pylint: disable=unused-argument
results = get_impala_table_or_view_names(connection, schema, "view")
return results
def get_impala_table_names(
self, connection, schema=None, **kw
): # pylint: disable=unused-argument
results = get_impala_table_or_view_names(connection, schema, "table")
return results
def get_impala_table_comment(
self, connection, table_name, schema_name, **kw
): # pylint: disable=unused-argument
"""
Gets the table comment from the describe formatted query result under the Table Parameters section.
"""
full_table_name = (
f"{schema_name}.{table_name}" if schema_name is not None else table_name
)
split_name = full_table_name.split(".")
query = f"describe formatted `{split_name[0]}`.`{split_name[1]}`"
cursor = connection.execute(query)
results = cursor.fetchall()
found_table_parameters = False
try:
for result in list(results):
data = result
if not found_table_parameters and data[0].strip() == "Table Parameters:":
found_table_parameters = True
if found_table_parameters:
coltext = data[1].strip() if data[1] is not None else ""
if coltext == "comment":
return {"text": data[2]}
except Exception:
return {"text": None}
return {"text": None}
def get_impala_columns(
self, connection, table_name, schema=None, **kwargs
): # pylint: disable=unused-argument
# pylint: disable=too-many-locals
"""
Extracted from the Impala Dialect. We'll tune the implementation.
By default, this gives us the column name as `table.column`. We just
want to get `column`.
"""
# pylint: disable=unused-argument
full_table_name = f"{schema}.{table_name}" if schema is not None else table_name
query = f"SELECT * FROM {full_table_name} LIMIT 0"
cursor = connection.execute(query)
schema = cursor.cursor.description
# We need to fetch the empty results otherwise these queries remain in
# flight
cursor.fetchall()
split_name = full_table_name.split(".")
query = f"DESCRIBE `{split_name[0]}`.`{split_name[1]}`"
describe_table_rows = connection.execute(query)
column_info = []
for col in schema:
column_info.append(
{
"name": remove_table_from_column_name(table_name, col[0]),
# Using Hive's map instead of Impala's, as we are pointing to a Hive Server
# Passing the lower as Hive's map is based on lower strings.
"type": _type_map[col[1].lower()],
"nullable": True,
"autoincrement": False,
}
)
ordinal_pos = 0
for col in describe_table_rows:
ordinal_pos = ordinal_pos + 1
col_raw = col[1]
attype = re.sub(r"\(.*\)", "", col[1])
col_type = re.search(r"^\w+", col[1]).group(0)
try:
coltype = _type_map[col_type]
except KeyError:
util.warn(f"Did not recognize type '{col_raw}' of column '{col[0]}'")
coltype = types.NullType
charlen = re.search(r"\(([\d,]+)\)", col_raw.lower())
if charlen:
charlen = charlen.group(1)
if attype == "decimal":
prec, scale = charlen.split(",")
args = (int(prec), int(scale))
else:
args = (int(charlen),)
coltype = coltype(*args)
add_column = {
"name": col[0],
"type": coltype,
"comment": col[2],
"nullable": True,
"autoincrement": False,
"ordinalPosition": ordinal_pos,
}
column_info.append(add_column)
return column_info
@ -218,7 +304,7 @@ HiveDialect.get_columns = get_columns
HiveDialect.get_table_comment = get_table_comment
ImpalaDialect.get_columns = get_impala_columns
ImpalaDialect.get_table_comment = get_table_comment
ImpalaDialect.get_table_comment = get_impala_table_comment
HIVE_VERSION_WITH_VIEW_SUPPORT = "2.2.0"
@ -251,13 +337,20 @@ class HiveSource(CommonDbSourceService):
Fetching views in hive server with query "SHOW VIEWS" was possible
only after hive 2.2.0 version
"""
result = dict(self.engine.execute("SELECT VERSION()").fetchone())
version = result.get("_c0", "").split()
if version and self._parse_version(version[0]) >= self._parse_version(
HIVE_VERSION_WITH_VIEW_SUPPORT
):
HiveDialect.get_table_names = get_table_names
HiveDialect.get_view_names = get_view_names
if self.engine.driver == Dialects.Impala:
ImpalaDialect.get_table_names = get_impala_table_names
ImpalaDialect.get_view_names = get_impala_view_names
ImpalaDialect.get_table_comment = get_impala_table_comment
ImpalaDialect.get_columns = get_impala_columns
else:
HiveDialect.get_table_names = get_table_names_older_versions
HiveDialect.get_view_names = get_view_names_older_versions
result = dict(self.engine.execute("SELECT VERSION()").fetchone())
version = result.get("_c0", "").split()
if version and self._parse_version(version[0]) >= self._parse_version(
HIVE_VERSION_WITH_VIEW_SUPPORT
):
HiveDialect.get_table_names = get_table_names
HiveDialect.get_view_names = get_view_names
else:
HiveDialect.get_table_names = get_table_names_older_versions
HiveDialect.get_view_names = get_view_names_older_versions

View File

@ -282,6 +282,7 @@ class SqlColumnHandlerMixin:
constraint=col_constraint,
children=children,
arrayDataType=arr_data_type,
ordinalPosition=column.get("ordinalPosition"),
)
if precision:
om_column.precision = precision[0]

View File

@ -105,6 +105,21 @@ def check_snowflake_case_sensitive(table_service_type, table_or_col) -> Optional
return None
def check_if_should_quote_column_name(table_service_type) -> Optional[bool]:
"""Check whether column name should be quoted when passed into the sql command build up.
This is important when a column name is the same as a reserve word and causes a sql error.
Args:
table_service_type: the main sql engine to determine if we should always quote.
Return: True or False
"""
if table_service_type == databaseService.DatabaseServiceType.Hive:
return True
return None
def build_orm_col(
idx: int, col: Column, table_service_type, parent: Optional[str] = None
) -> sqlalchemy.Column:
@ -119,7 +134,6 @@ def build_orm_col(
As this is only used for INSERT/UPDATE/DELETE,
there is no impact for our read-only purposes.
"""
if parent:
name = f"{parent}.{col.name.__root__}"
else:
@ -129,7 +143,8 @@ def build_orm_col(
name=str(name),
type_=map_types(col, table_service_type),
primary_key=not bool(idx), # The first col seen is used as PK
quote=check_snowflake_case_sensitive(table_service_type, col.name.__root__),
quote=check_if_should_quote_column_name(table_service_type)
or check_snowflake_case_sensitive(table_service_type, col.name.__root__),
key=str(
col.name.__root__
).lower(), # Add lowercase column name as key for snowflake case sensitive columns

View File

@ -73,7 +73,6 @@ def _(elements, compiler, **kwargs):
@compiles(MedianFn, Dialects.Hive)
@compiles(MedianFn, Dialects.Impala)
def _(elements, compiler, **kwargs):
"""Median computation for Hive"""
col, _, percentile = [
@ -82,6 +81,39 @@ def _(elements, compiler, **kwargs):
return "percentile(cast(%s as BIGINT), %s)" % (col, percentile)
@compiles(MedianFn, Dialects.Impala)
def _(elements, compiler, **kwargs):
"""Median computation for Impala
Median compution for Impala uses the appx_median function.
OM uses this median function to also compute first and third quartiles.
These calculations are not supported with a simple function inside Impala.
The if statement returns null when we are not looking for the .5 precentile
In Impala to get the first quartile a full SQL statement like this is necessary:
with ntiles as
(
select filesize, ntile(4) over (order by filesize) as quarter
from hdfs_files
)
, quarters as
(
select 1 as grp, max(filesize) as quartile_value, quarter
from ntiles
group by quarter
)
select max(case when quarter = 1 then quartile_value end) as first_q
, max(case when quarter = 2 then quartile_value end) as second_q
, max(case when quarter = 3 then quartile_value end) as third_q
, max(case when quarter = 4 then quartile_value end) as fourth_q
from quarters
group by grp
;
"""
col, _, percentile = [
compiler.process(element, **kwargs) for element in elements.clauses
]
return "if(%s = .5, appx_median(%s), null)" % (percentile, col)
@compiles(MedianFn, Dialects.MySQL)
def _(elements, compiler, **kwargs): # pylint: disable=unused-argument
"""Median computation for MySQL currently not supported