fix: DBX profiler struct support + nested column retrieval for profiler (#17267)

This commit is contained in:
Teddy 2024-08-02 11:34:10 +02:00 committed by GitHub
parent 4bb6d7ec1c
commit 2a854f90e1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 31 additions and 14 deletions

View File

@ -17,7 +17,6 @@ from typing import List
from pyhive.sqlalchemy_hive import HiveCompiler
from sqlalchemy import Column, inspect
from sqlalchemy.sql import column
from metadata.generated.schema.entity.data.table import Column as OMColumn
from metadata.generated.schema.entity.data.table import ColumnName, DataType, TableData
@ -61,20 +60,26 @@ class DatabricksProfilerInterface(SQAProfilerInterface):
columns_list = []
for col in columns:
if col.dataType != DataType.STRUCT:
col.name = ColumnName(f"{parent}.{col.name.root}")
col = build_orm_col(
idx=1, col=col, table_service_type=DatabaseServiceType.Databricks
# For DBX struct we need to quote the column name as `a`.`b`.`c`
# otherwise the driver will quote it as `a.b.c`
col_name = ".".join([f"`{part}`" for part in parent.split(".")])
col.name = ColumnName(f"{col_name}.`{col.name.root}`")
# Set `_quote` to False to avoid quoting the column name again when compiled
sqa_col = build_orm_col(
idx=1,
col=col,
table_service_type=DatabaseServiceType.Databricks,
_quote=False,
)
col._set_parent( # pylint: disable=protected-access
sqa_col._set_parent( # pylint: disable=protected-access
self.table.__table__
)
columns_list.append(column(col.label(col.name.replace(".", "_"))))
columns_list.append(sqa_col)
else:
col = self._get_struct_columns(
cols = self._get_struct_columns(
col.children, f"{parent}.{col.name.root}"
)
columns_list.extend(col)
columns_list.extend(cols)
return columns_list
def get_columns(self) -> Column:
@ -86,7 +91,7 @@ class DatabricksProfilerInterface(SQAProfilerInterface):
self._get_struct_columns(column_obj.children, column_obj.name.root)
)
else:
col = build_orm_col(idx, column, DatabaseServiceType.Databricks)
col = build_orm_col(idx, column_obj, DatabaseServiceType.Databricks)
col._set_parent( # pylint: disable=protected-access
self.table.__table__
)

View File

@ -64,7 +64,9 @@ def check_if_should_quote_column_name(table_service_type) -> Optional[bool]:
return None
def build_orm_col(idx: int, col: Column, table_service_type) -> sqlalchemy.Column:
def build_orm_col(
idx: int, col: Column, table_service_type, *, _quote=None
) -> sqlalchemy.Column:
"""
Cook the ORM column from our metadata instance
information.
@ -76,14 +78,20 @@ def build_orm_col(idx: int, col: Column, table_service_type) -> sqlalchemy.Colum
As this is only used for INSERT/UPDATE/DELETE,
there is no impact for our read-only purposes.
"""
if _quote is not None:
quote = _quote
else:
quote = check_if_should_quote_column_name(
table_service_type
) or check_snowflake_case_sensitive(table_service_type, col.name.root)
return sqlalchemy.Column(
name=str(col.name.root),
type_=converter_registry[table_service_type]().map_types(
col, table_service_type
),
primary_key=not bool(idx), # The first col seen is used as PK
quote=check_if_should_quote_column_name(table_service_type)
or check_snowflake_case_sensitive(table_service_type, col.name.root),
quote=quote,
key=str(
col.name.root
).lower(), # Add lowercase column name as key for snowflake case sensitive columns

View File

@ -181,7 +181,11 @@ public class FullyQualifiedName {
public static String getTableFQN(String columnFQN) {
// Split columnFQN of format databaseServiceName.databaseName.tableName.columnName
String[] split = split(columnFQN);
if (split.length != 5) {
// Column FQNs for struct columns are of format
// service.database.schema.table.column.child1.child2
// and not service.database.schema.table."column.child1.child2", so the split length should be 5 or
// more
if (split.length < 5) {
throw new IllegalArgumentException("Invalid fully qualified column name " + columnFQN);
}
// Return table FQN of format databaseService.tableName