diff --git a/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py b/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py
index 5b0a1e3c35..3e22522461 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py
@@ -1497,9 +1497,17 @@ class DatahubGEProfiler:
                 logger.error(
                     f"Unexpected {pretty_name} while profiling. Should have 3 parts but has {len(name_parts)} parts."
                 )
+            if platform == DATABRICKS:
+                # TODO: Review logic for BigQuery as well, probably project.dataset.table should be quoted there as well
+                quoted_name = ".".join(
+                    batch.engine.dialect.identifier_preparer.quote(part)
+                    for part in name_parts
+                )
+                batch._table = sa.text(quoted_name)
+                logger.debug(f"Setting quoted table name to be {batch._table}")
             # If we only have two parts that means the project_id is missing from the table name and we add it
             # Temp tables has 3 parts while normal tables only has 2 parts
-            if len(str(batch._table).split(".")) == 2:
+            elif len(str(batch._table).split(".")) == 2:
                 batch._table = sa.text(f"{name_parts[0]}.{str(batch._table)}")
                 logger.debug(f"Setting table name to be {batch._table}")
 
diff --git a/metadata-ingestion/src/datahub/ingestion/source/unity/proxy.py b/metadata-ingestion/src/datahub/ingestion/source/unity/proxy.py
index 0dfa61f2f9..5d31a6c1a8 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/unity/proxy.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/unity/proxy.py
@@ -521,9 +521,9 @@ class UnityCatalogApiProxy(UnityCatalogProxyProfilingMixin):
     @cached(cachetools.FIFOCache(maxsize=100))
     def get_schema_tags(self, catalog: str) -> Dict[str, List[UnityCatalogTag]]:
         """Optimized version using databricks-sql"""
-        logger.info(f"Fetching schema tags for catalog: {catalog}")
+        logger.info(f"Fetching schema tags for catalog: `{catalog}`")
 
-        query = f"SELECT * FROM {catalog}.information_schema.schema_tags"
+        query = f"SELECT * FROM `{catalog}`.information_schema.schema_tags"
         rows = self._execute_sql_query(query)
 
         result_dict: Dict[str, List[UnityCatalogTag]] = {}
@@ -544,9 +544,9 @@ class UnityCatalogApiProxy(UnityCatalogProxyProfilingMixin):
     @cached(cachetools.FIFOCache(maxsize=100))
     def get_catalog_tags(self, catalog: str) -> Dict[str, List[UnityCatalogTag]]:
         """Optimized version using databricks-sql"""
-        logger.info(f"Fetching table tags for catalog: {catalog}")
+        logger.info(f"Fetching table tags for catalog: `{catalog}`")
 
-        query = f"SELECT * FROM {catalog}.information_schema.catalog_tags"
+        query = f"SELECT * FROM `{catalog}`.information_schema.catalog_tags"
         rows = self._execute_sql_query(query)
 
         result_dict: Dict[str, List[UnityCatalogTag]] = {}
@@ -566,9 +566,9 @@ class UnityCatalogApiProxy(UnityCatalogProxyProfilingMixin):
     @cached(cachetools.FIFOCache(maxsize=100))
     def get_table_tags(self, catalog: str) -> Dict[str, List[UnityCatalogTag]]:
         """Optimized version using databricks-sql"""
-        logger.info(f"Fetching table tags for catalog: {catalog}")
+        logger.info(f"Fetching table tags for catalog: `{catalog}`")
 
-        query = f"SELECT * FROM {catalog}.information_schema.table_tags"
+        query = f"SELECT * FROM `{catalog}`.information_schema.table_tags"
        rows = self._execute_sql_query(query)
 
         result_dict: Dict[str, List[UnityCatalogTag]] = {}
@@ -589,9 +589,9 @@ class UnityCatalogApiProxy(UnityCatalogProxyProfilingMixin):
     @cached(cachetools.FIFOCache(maxsize=100))
     def get_column_tags(self, catalog: str) -> Dict[str, List[UnityCatalogTag]]:
         """Optimized version using databricks-sql"""
-        logger.info(f"Fetching column tags for catalog: {catalog}")
+        logger.info(f"Fetching column tags for catalog: `{catalog}`")
 
-        query = f"SELECT * FROM {catalog}.information_schema.column_tags"
+        query = f"SELECT * FROM `{catalog}`.information_schema.column_tags"
         rows = self._execute_sql_query(query)
 
         result_dict: Dict[str, List[UnityCatalogTag]] = {}