fix(ingestion/databricks): Fix quoting of catalog/schema/table names for databricks (#14203)

This commit is contained in:
skrydal 2025-07-24 15:39:34 +02:00 committed by GitHub
parent 01df6e8723
commit 08c587d065
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 17 additions and 9 deletions

View File

@ -1497,9 +1497,17 @@ class DatahubGEProfiler:
logger.error(
f"Unexpected {pretty_name} while profiling. Should have 3 parts but has {len(name_parts)} parts."
)
if platform == DATABRICKS:
# TODO: Review logic for BigQuery as well, probably project.dataset.table should be quoted there as well
quoted_name = ".".join(
batch.engine.dialect.identifier_preparer.quote(part)
for part in name_parts
)
batch._table = sa.text(quoted_name)
logger.debug(f"Setting quoted table name to be {batch._table}")
# If we only have two parts that means the project_id is missing from the table name and we add it
# Temp tables has 3 parts while normal tables only has 2 parts
if len(str(batch._table).split(".")) == 2:
elif len(str(batch._table).split(".")) == 2:
batch._table = sa.text(f"{name_parts[0]}.{str(batch._table)}")
logger.debug(f"Setting table name to be {batch._table}")

View File

@ -521,9 +521,9 @@ class UnityCatalogApiProxy(UnityCatalogProxyProfilingMixin):
@cached(cachetools.FIFOCache(maxsize=100))
def get_schema_tags(self, catalog: str) -> Dict[str, List[UnityCatalogTag]]:
"""Optimized version using databricks-sql"""
logger.info(f"Fetching schema tags for catalog: {catalog}")
logger.info(f"Fetching schema tags for catalog: `{catalog}`")
query = f"SELECT * FROM {catalog}.information_schema.schema_tags"
query = f"SELECT * FROM `{catalog}`.information_schema.schema_tags"
rows = self._execute_sql_query(query)
result_dict: Dict[str, List[UnityCatalogTag]] = {}
@ -544,9 +544,9 @@ class UnityCatalogApiProxy(UnityCatalogProxyProfilingMixin):
@cached(cachetools.FIFOCache(maxsize=100))
def get_catalog_tags(self, catalog: str) -> Dict[str, List[UnityCatalogTag]]:
"""Optimized version using databricks-sql"""
logger.info(f"Fetching table tags for catalog: {catalog}")
logger.info(f"Fetching table tags for catalog: `{catalog}`")
query = f"SELECT * FROM {catalog}.information_schema.catalog_tags"
query = f"SELECT * FROM `{catalog}`.information_schema.catalog_tags"
rows = self._execute_sql_query(query)
result_dict: Dict[str, List[UnityCatalogTag]] = {}
@ -566,9 +566,9 @@ class UnityCatalogApiProxy(UnityCatalogProxyProfilingMixin):
@cached(cachetools.FIFOCache(maxsize=100))
def get_table_tags(self, catalog: str) -> Dict[str, List[UnityCatalogTag]]:
"""Optimized version using databricks-sql"""
logger.info(f"Fetching table tags for catalog: {catalog}")
logger.info(f"Fetching table tags for catalog: `{catalog}`")
query = f"SELECT * FROM {catalog}.information_schema.table_tags"
query = f"SELECT * FROM `{catalog}`.information_schema.table_tags"
rows = self._execute_sql_query(query)
result_dict: Dict[str, List[UnityCatalogTag]] = {}
@ -589,9 +589,9 @@ class UnityCatalogApiProxy(UnityCatalogProxyProfilingMixin):
@cached(cachetools.FIFOCache(maxsize=100))
def get_column_tags(self, catalog: str) -> Dict[str, List[UnityCatalogTag]]:
"""Optimized version using databricks-sql"""
logger.info(f"Fetching column tags for catalog: {catalog}")
logger.info(f"Fetching column tags for catalog: `{catalog}`")
query = f"SELECT * FROM {catalog}.information_schema.column_tags"
query = f"SELECT * FROM `{catalog}`.information_schema.column_tags"
rows = self._execute_sql_query(query)
result_dict: Dict[str, List[UnityCatalogTag]] = {}