diff --git a/ingestion/src/metadata/ingestion/source/database/hive/metadata.py b/ingestion/src/metadata/ingestion/source/database/hive/metadata.py
index d0f564e1679..8119ce0ec91 100644
--- a/ingestion/src/metadata/ingestion/source/database/hive/metadata.py
+++ b/ingestion/src/metadata/ingestion/source/database/hive/metadata.py
@@ -30,11 +30,9 @@ from metadata.generated.schema.metadataIngestion.workflow import (
     Source as WorkflowSource,
 )
 from metadata.ingestion.api.source import InvalidSourceException
-from metadata.ingestion.source.database.column_helpers import (
-    remove_table_from_column_name,
-)
 from metadata.ingestion.source.database.common_db_source import CommonDbSourceService
 from metadata.ingestion.source.database.hive.queries import HIVE_GET_COMMENTS
+from metadata.profiler.orm.registry import Dialects


 complex_data_types = ["struct", "map", "array", "union"]
@@ -184,33 +182,121 @@ def get_table_comment(  # pylint: disable=unused-argument
     return {"text": None}


-def get_impala_columns(self, connection, table_name, schema=None, **kwargs):
+def get_impala_table_or_view_names(connection, schema=None, target_type="table"):
+    """
+    Depending on the target_type, returns either the views or the tables,
+    since they share the same method for getting their names.
+    """
+    query = "show tables"
+    if schema:
+        query += " IN " + schema
+
+    cursor = connection.execute(query)
+    results = cursor.fetchall()
+    tables_and_views = [result[0] for result in results]
+
+    retvalue = []
+
+    for table_view in tables_and_views:
+        query = f"describe formatted `{schema}`.`{table_view}`"
+        cursor = connection.execute(query)
+        results = cursor.fetchall()
+
+        for result in list(results):
+            data = result
+            if data[0].strip() == "Table Type:":
+                if target_type.lower() in data[1].lower():
+                    retvalue.append(table_view)
+    return retvalue
+
+
+def get_impala_view_names(
+    self, connection, schema=None, **kw
+):  # pylint: disable=unused-argument
+    results = get_impala_table_or_view_names(connection, schema, "view")
+    return results
+
+
+def get_impala_table_names(
+    self, connection, schema=None, **kw
+):  # pylint: disable=unused-argument
+    results = get_impala_table_or_view_names(connection, schema, "table")
+    return results
+
+
+def get_impala_table_comment(
+    self, connection, table_name, schema_name, **kw
+):  # pylint: disable=unused-argument
+    """
+    Gets the table comment from the describe formatted query result,
+    under the Table Parameters section.
+    """
+    full_table_name = (
+        f"{schema_name}.{table_name}" if schema_name is not None else table_name
+    )
+    split_name = full_table_name.split(".")
+    query = f"describe formatted `{split_name[0]}`.`{split_name[1]}`"
+    cursor = connection.execute(query)
+    results = cursor.fetchall()
+
+    found_table_parameters = False
+    try:
+        for result in list(results):
+            data = result
+            if not found_table_parameters and data[0].strip() == "Table Parameters:":
+                found_table_parameters = True
+            if found_table_parameters:
+                coltext = data[1].strip() if data[1] is not None else ""
+                if coltext == "comment":
+                    return {"text": data[2]}
+    except Exception:
+        return {"text": None}
+    return {"text": None}
+
+
+def get_impala_columns(
+    self, connection, table_name, schema=None, **kwargs
+):  # pylint: disable=unused-argument
+    # pylint: disable=too-many-locals
     """
     Extracted from the Impala Dialect. We'll tune the implementation.

     By default, this gives us the column name as `table.column`. We just
     want to get `column`.
     """
-    # pylint: disable=unused-argument
     full_table_name = f"{schema}.{table_name}" if schema is not None else table_name
-    query = f"SELECT * FROM {full_table_name} LIMIT 0"
-    cursor = connection.execute(query)
-    schema = cursor.cursor.description
-    # We need to fetch the empty results otherwise these queries remain in
-    # flight
-    cursor.fetchall()
+    split_name = full_table_name.split(".")
+    query = f"DESCRIBE `{split_name[0]}`.`{split_name[1]}`"
+    describe_table_rows = connection.execute(query)
     column_info = []
-    for col in schema:
-        column_info.append(
-            {
-                "name": remove_table_from_column_name(table_name, col[0]),
-                # Using Hive's map instead of Impala's, as we are pointing to a Hive Server
-                # Passing the lower as Hive's map is based on lower strings.
-                "type": _type_map[col[1].lower()],
-                "nullable": True,
-                "autoincrement": False,
-            }
-        )
+    ordinal_pos = 0
+    for col in describe_table_rows:
+        ordinal_pos = ordinal_pos + 1
+        col_raw = col[1]
+        attype = re.sub(r"\(.*\)", "", col[1])
+        col_type = re.search(r"^\w+", col[1]).group(0)
+        try:
+            coltype = _type_map[col_type]
+        except KeyError:
+            util.warn(f"Did not recognize type '{col_raw}' of column '{col[0]}'")
+            coltype = types.NullType
+        charlen = re.search(r"\(([\d,]+)\)", col_raw.lower())
+        if charlen:
+            charlen = charlen.group(1)
+            if attype == "decimal":
+                prec, scale = charlen.split(",")
+                args = (int(prec), int(scale))
+            else:
+                args = (int(charlen),)
+            coltype = coltype(*args)
+        add_column = {
+            "name": col[0],
+            "type": coltype,
+            "comment": col[2],
+            "nullable": True,
+            "autoincrement": False,
+            "ordinalPosition": ordinal_pos,
+        }
+        column_info.append(add_column)
     return column_info


@@ -218,7 +304,7 @@ HiveDialect.get_columns = get_columns
 HiveDialect.get_table_comment = get_table_comment

 ImpalaDialect.get_columns = get_impala_columns
-ImpalaDialect.get_table_comment = get_table_comment
+ImpalaDialect.get_table_comment = get_impala_table_comment


 HIVE_VERSION_WITH_VIEW_SUPPORT = "2.2.0"
@@ -251,13 +337,20 @@ class HiveSource(CommonDbSourceService):
         Fetching views in hive server with query
         "SHOW VIEWS" was possible only after hive 2.2.0 version
         """
-        result = dict(self.engine.execute("SELECT VERSION()").fetchone())
-        version = result.get("_c0", "").split()
-        if version and self._parse_version(version[0]) >= self._parse_version(
-            HIVE_VERSION_WITH_VIEW_SUPPORT
-        ):
-            HiveDialect.get_table_names = get_table_names
-            HiveDialect.get_view_names = get_view_names
+        if self.engine.driver == Dialects.Impala:
+            ImpalaDialect.get_table_names = get_impala_table_names
+            ImpalaDialect.get_view_names = get_impala_view_names
+            ImpalaDialect.get_table_comment = get_impala_table_comment
+            ImpalaDialect.get_columns = get_impala_columns
         else:
-            HiveDialect.get_table_names = get_table_names_older_versions
-            HiveDialect.get_view_names = get_view_names_older_versions
+            result = dict(self.engine.execute("SELECT VERSION()").fetchone())
+
+            version = result.get("_c0", "").split()
+            if version and self._parse_version(version[0]) >= self._parse_version(
+                HIVE_VERSION_WITH_VIEW_SUPPORT
+            ):
+                HiveDialect.get_table_names = get_table_names
+                HiveDialect.get_view_names = get_view_names
+            else:
+                HiveDialect.get_table_names = get_table_names_older_versions
+                HiveDialect.get_view_names = get_view_names_older_versions
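
The table/view split above hinges on the "Table Type:" row that `describe formatted` returns for each object. A minimal standalone sketch of that check (the sample rows are hypothetical and stand in for what a live Impala cursor would yield, one tuple per output row):

    # Standalone sketch, not part of the patch: how the "Table Type:" row of
    # `describe formatted` output separates tables from views.
    DESCRIBE_FORMATTED_ROWS = [
        ("# Detailed Table Information", None, None),
        ("Database:", "default", None),
        ("Table Type:", "VIRTUAL_VIEW", None),
    ]

    def is_target_type(rows, target_type="table"):
        """Return True when the Table Type row matches the requested target_type."""
        for name, value, _ in rows:
            if name and name.strip() == "Table Type:":
                return target_type.lower() in (value or "").lower()
        return False

    print(is_target_type(DESCRIBE_FORMATTED_ROWS, "view"))   # True
    print(is_target_type(DESCRIBE_FORMATTED_ROWS, "table"))  # False
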
""" - # pylint: disable=unused-argument full_table_name = f"{schema}.{table_name}" if schema is not None else table_name - query = f"SELECT * FROM {full_table_name} LIMIT 0" - cursor = connection.execute(query) - schema = cursor.cursor.description - # We need to fetch the empty results otherwise these queries remain in - # flight - cursor.fetchall() + split_name = full_table_name.split(".") + query = f"DESCRIBE `{split_name[0]}`.`{split_name[1]}`" + describe_table_rows = connection.execute(query) column_info = [] - for col in schema: - column_info.append( - { - "name": remove_table_from_column_name(table_name, col[0]), - # Using Hive's map instead of Impala's, as we are pointing to a Hive Server - # Passing the lower as Hive's map is based on lower strings. - "type": _type_map[col[1].lower()], - "nullable": True, - "autoincrement": False, - } - ) + ordinal_pos = 0 + for col in describe_table_rows: + ordinal_pos = ordinal_pos + 1 + col_raw = col[1] + attype = re.sub(r"\(.*\)", "", col[1]) + col_type = re.search(r"^\w+", col[1]).group(0) + try: + coltype = _type_map[col_type] + except KeyError: + util.warn(f"Did not recognize type '{col_raw}' of column '{col[0]}'") + coltype = types.NullType + charlen = re.search(r"\(([\d,]+)\)", col_raw.lower()) + if charlen: + charlen = charlen.group(1) + if attype == "decimal": + prec, scale = charlen.split(",") + args = (int(prec), int(scale)) + else: + args = (int(charlen),) + coltype = coltype(*args) + add_column = { + "name": col[0], + "type": coltype, + "comment": col[2], + "nullable": True, + "autoincrement": False, + "ordinalPosition": ordinal_pos, + } + column_info.append(add_column) return column_info @@ -218,7 +304,7 @@ HiveDialect.get_columns = get_columns HiveDialect.get_table_comment = get_table_comment ImpalaDialect.get_columns = get_impala_columns -ImpalaDialect.get_table_comment = get_table_comment +ImpalaDialect.get_table_comment = get_impala_table_comment HIVE_VERSION_WITH_VIEW_SUPPORT = "2.2.0" @@ -251,13 +337,20 @@ class HiveSource(CommonDbSourceService): Fetching views in hive server with query "SHOW VIEWS" was possible only after hive 2.2.0 version """ - result = dict(self.engine.execute("SELECT VERSION()").fetchone()) - version = result.get("_c0", "").split() - if version and self._parse_version(version[0]) >= self._parse_version( - HIVE_VERSION_WITH_VIEW_SUPPORT - ): - HiveDialect.get_table_names = get_table_names - HiveDialect.get_view_names = get_view_names + if self.engine.driver == Dialects.Impala: + ImpalaDialect.get_table_names = get_impala_table_names + ImpalaDialect.get_view_names = get_impala_view_names + ImpalaDialect.get_table_comment = get_impala_table_comment + ImpalaDialect.get_columns = get_impala_columns else: - HiveDialect.get_table_names = get_table_names_older_versions - HiveDialect.get_view_names = get_view_names_older_versions + result = dict(self.engine.execute("SELECT VERSION()").fetchone()) + + version = result.get("_c0", "").split() + if version and self._parse_version(version[0]) >= self._parse_version( + HIVE_VERSION_WITH_VIEW_SUPPORT + ): + HiveDialect.get_table_names = get_table_names + HiveDialect.get_view_names = get_view_names + else: + HiveDialect.get_table_names = get_table_names_older_versions + HiveDialect.get_view_names = get_view_names_older_versions diff --git a/ingestion/src/metadata/ingestion/source/database/sql_column_handler.py b/ingestion/src/metadata/ingestion/source/database/sql_column_handler.py index f7b8faf0c2a..5f4cb0745b9 100644 --- 
diff --git a/ingestion/src/metadata/profiler/orm/converter.py b/ingestion/src/metadata/profiler/orm/converter.py
index 641845583e2..d84db486238 100644
--- a/ingestion/src/metadata/profiler/orm/converter.py
+++ b/ingestion/src/metadata/profiler/orm/converter.py
@@ -105,6 +105,21 @@ def check_snowflake_case_sensitive(table_service_type, table_or_col) -> Optional
     return None


+def check_if_should_quote_column_name(table_service_type) -> Optional[bool]:
+    """Check whether the column name should be quoted when passed into the SQL command build-up.
+    This is important when a column name is the same as a reserved word and causes a SQL error.
+
+    Args:
+        table_service_type: the main SQL engine, to determine if we should always quote.
+    Return: True or False
+    """
+
+    if table_service_type == databaseService.DatabaseServiceType.Hive:
+        return True
+
+    return None
+
+
 def build_orm_col(
     idx: int, col: Column, table_service_type, parent: Optional[str] = None
 ) -> sqlalchemy.Column:
@@ -119,7 +134,6 @@ def build_orm_col(
     As this is only used for INSERT/UPDATE/DELETE, there
     is no impact for our read-only purposes.
     """
-
     if parent:
         name = f"{parent}.{col.name.__root__}"
     else:
@@ -129,7 +143,8 @@ def build_orm_col(
         name=str(name),
         type_=map_types(col, table_service_type),
         primary_key=not bool(idx),  # The first col seen is used as PK
-        quote=check_snowflake_case_sensitive(table_service_type, col.name.__root__),
+        quote=check_if_should_quote_column_name(table_service_type)
+        or check_snowflake_case_sensitive(table_service_type, col.name.__root__),
         key=str(
             col.name.__root__
         ).lower(),  # Add lowercase column name as key for snowflake case sensitive columns
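
The `or` chain above means Hive columns are always quoted, while every other service falls back to the existing Snowflake case-sensitivity rule. A rough sketch of that decision; both checks are stubbed and the service type is a plain string rather than the DatabaseServiceType enum used by the real functions:

    # Rough sketch of the combined quote decision (stubbed checks, string service types).
    def check_if_should_quote_column_name(table_service_type):
        # Hive column names can collide with reserved words, so always quote.
        return True if table_service_type == "Hive" else None

    def check_snowflake_case_sensitive(table_service_type, column_name):
        # Stand-in for the existing Snowflake case-sensitivity rule.
        return None

    for service, column in [("Hive", "timestamp"), ("Mysql", "id")]:
        quote = check_if_should_quote_column_name(service) or check_snowflake_case_sensitive(
            service, column
        )
        print(service, column, quote)
    # Hive timestamp True
    # Mysql id None
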
diff --git a/ingestion/src/metadata/profiler/orm/functions/median.py b/ingestion/src/metadata/profiler/orm/functions/median.py
index 2eeadfd37fa..829e17abbfd 100644
--- a/ingestion/src/metadata/profiler/orm/functions/median.py
+++ b/ingestion/src/metadata/profiler/orm/functions/median.py
@@ -73,7 +73,6 @@ def _(elements, compiler, **kwargs):


 @compiles(MedianFn, Dialects.Hive)
-@compiles(MedianFn, Dialects.Impala)
 def _(elements, compiler, **kwargs):
     """Median computation for Hive"""
     col, _, percentile = [
@@ -82,6 +81,39 @@ def _(elements, compiler, **kwargs):
     return "percentile(cast(%s as BIGINT), %s)" % (col, percentile)


+@compiles(MedianFn, Dialects.Impala)
+def _(elements, compiler, **kwargs):
+    """Median computation for Impala.
+    Median computation for Impala uses the appx_median function.
+    OM uses this median function to also compute the first and third quartiles.
+    These calculations are not supported with a simple function inside Impala.
+    The if statement returns null when we are not looking for the .5 percentile.
+    In Impala, to get the first quartile, a full SQL statement like this is necessary:
+        with ntiles as
+        (
+            select filesize, ntile(4) over (order by filesize) as quarter
+            from hdfs_files
+        )
+        , quarters as
+        (
+            select 1 as grp, max(filesize) as quartile_value, quarter
+            from ntiles
+            group by quarter
+        )
+        select max(case when quarter = 1 then quartile_value end) as first_q
+        , max(case when quarter = 2 then quartile_value end) as second_q
+        , max(case when quarter = 3 then quartile_value end) as third_q
+        , max(case when quarter = 4 then quartile_value end) as fourth_q
+        from quarters
+        group by grp
+        ;
+    """
+    col, _, percentile = [
+        compiler.process(element, **kwargs) for element in elements.clauses
+    ]
+    return "if(%s = .5, appx_median(%s), null)" % (percentile, col)
+
+
 @compiles(MedianFn, Dialects.MySQL)
 def _(elements, compiler, **kwargs):  # pylint: disable=unused-argument
     """Median computation for MySQL currently not supported
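
Because the profiler reuses MedianFn for the .25, .5 and .75 percentiles, the Impala expression only resolves for the median and deliberately yields NULL for the quartiles. A quick sketch of the SQL fragments the compile hook above emits (the column name and percentile literals are illustrative):

    # Sketch of the fragments produced by the Impala compile hook above;
    # "filesize" and the percentile literals are just illustrative values.
    col = "filesize"
    for percentile in (".25", ".5", ".75"):
        print("if(%s = .5, appx_median(%s), null)" % (percentile, col))
    # if(.25 = .5, appx_median(filesize), null)   -> NULL (first quartile unsupported)
    # if(.5 = .5, appx_median(filesize), null)    -> approximate median
    # if(.75 = .5, appx_median(filesize), null)   -> NULL (third quartile unsupported)
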