mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-10-26 08:13:11 +00:00
Minor: Fix Databricks & Unity Catalog, Table or View Not found (#23014)
(cherry picked from commit 29bf750bde252c5a30eec4c7966217de5ab02274)
This commit is contained in:
parent
0808d37efa
commit
dd7682a3b5
@ -85,10 +85,28 @@ class DatabricksProfilerInterface(SQAProfilerInterface):
|
|||||||
result += "`.`".join(splitted_result)
|
result += "`.`".join(splitted_result)
|
||||||
return result
|
return result
|
||||||
|
|
||||||
|
def visit_table(self, *args, **kwargs):
|
||||||
|
result = super( # pylint: disable=bad-super-call
|
||||||
|
HiveCompiler, self
|
||||||
|
).visit_table(*args, **kwargs)
|
||||||
|
# Handle table references with hyphens in database/schema names
|
||||||
|
# Format: `database`.`schema`.`table` for Unity Catalog/Databricks
|
||||||
|
if "." in result and not result.startswith("`"):
|
||||||
|
parts = result.split(".")
|
||||||
|
quoted_parts = []
|
||||||
|
for part in parts:
|
||||||
|
if "-" in part and not (part.startswith("`") and part.endswith("`")):
|
||||||
|
quoted_parts.append(f"`{part}`")
|
||||||
|
else:
|
||||||
|
quoted_parts.append(part)
|
||||||
|
result = ".".join(quoted_parts)
|
||||||
|
return result
|
||||||
|
|
||||||
def __init__(self, service_connection_config, **kwargs):
|
def __init__(self, service_connection_config, **kwargs):
|
||||||
super().__init__(service_connection_config=service_connection_config, **kwargs)
|
super().__init__(service_connection_config=service_connection_config, **kwargs)
|
||||||
self.set_catalog(self.session)
|
self.set_catalog(self.session)
|
||||||
HiveCompiler.visit_column = DatabricksProfilerInterface.visit_column
|
HiveCompiler.visit_column = DatabricksProfilerInterface.visit_column
|
||||||
|
HiveCompiler.visit_table = DatabricksProfilerInterface.visit_table
|
||||||
|
|
||||||
def _get_struct_columns(self, columns: List[OMColumn], parent: str):
|
def _get_struct_columns(self, columns: List[OMColumn], parent: str):
|
||||||
"""Get struct columns"""
|
"""Get struct columns"""
|
||||||
|
|||||||
@ -14,6 +14,8 @@ Interfaces with database for all database engine
|
|||||||
supporting sqlalchemy abstraction layer
|
supporting sqlalchemy abstraction layer
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
from sqlalchemy import event
|
||||||
|
from sqlalchemy.orm import scoped_session, sessionmaker
|
||||||
|
|
||||||
from metadata.ingestion.source.database.databricks.connection import (
|
from metadata.ingestion.source.database.databricks.connection import (
|
||||||
get_connection as databricks_get_connection,
|
get_connection as databricks_get_connection,
|
||||||
@ -26,5 +28,17 @@ from metadata.profiler.interface.sqlalchemy.databricks.profiler_interface import
|
|||||||
class UnityCatalogProfilerInterface(DatabricksProfilerInterface):
|
class UnityCatalogProfilerInterface(DatabricksProfilerInterface):
|
||||||
def create_session(self):
|
def create_session(self):
|
||||||
self.connection = databricks_get_connection(self.service_connection_config)
|
self.connection = databricks_get_connection(self.service_connection_config)
|
||||||
super().create_session()
|
|
||||||
self.set_catalog(self.session)
|
# Create custom session factory with after_begin event to set catalog
|
||||||
|
session_maker = sessionmaker(bind=self.connection)
|
||||||
|
|
||||||
|
@event.listens_for(session_maker, "after_begin")
|
||||||
|
def set_catalog(session, transaction, connection):
|
||||||
|
# Safely quote the catalog name to prevent SQL injection
|
||||||
|
quoted_catalog = connection.dialect.identifier_preparer.quote(
|
||||||
|
self.service_connection_config.catalog
|
||||||
|
)
|
||||||
|
connection.execute(f"USE CATALOG {quoted_catalog};")
|
||||||
|
|
||||||
|
self.session_factory = scoped_session(session_maker)
|
||||||
|
self.session = self.session_factory()
|
||||||
|
|||||||
@ -24,7 +24,7 @@ SYSTEM_QUERY = textwrap.dedent(
|
|||||||
'{database}' AS database,
|
'{database}' AS database,
|
||||||
'{schema}' AS schema,
|
'{schema}' AS schema,
|
||||||
'{table}' AS table
|
'{table}' AS table
|
||||||
FROM (DESCRIBE HISTORY {database}.{schema}.{table})
|
FROM (DESCRIBE HISTORY `{database}`.`{schema}`.`{table}`)
|
||||||
WHERE operation IN ({operations}) AND timestamp > DATEADD(day, -1, CURRENT_TIMESTAMP())
|
WHERE operation IN ({operations}) AND timestamp > DATEADD(day, -1, CURRENT_TIMESTAMP())
|
||||||
"""
|
"""
|
||||||
)
|
)
|
||||||
|
|||||||
@ -11,7 +11,8 @@
|
|||||||
"""
|
"""
|
||||||
Helper module to handle data sampling for the profiler
|
Helper module to handle data sampling for the profiler
|
||||||
"""
|
"""
|
||||||
from sqlalchemy import Column, text
|
from sqlalchemy import Column, event, text
|
||||||
|
from sqlalchemy.orm import scoped_session, sessionmaker
|
||||||
|
|
||||||
from metadata.ingestion.source.database.databricks.connection import (
|
from metadata.ingestion.source.database.databricks.connection import (
|
||||||
get_connection as databricks_get_connection,
|
get_connection as databricks_get_connection,
|
||||||
@ -25,6 +26,17 @@ class DatabricksSamplerInterface(SQASampler):
|
|||||||
"""Initialize with a single Databricks connection"""
|
"""Initialize with a single Databricks connection"""
|
||||||
super().__init__(*args, **kwargs)
|
super().__init__(*args, **kwargs)
|
||||||
self.connection = databricks_get_connection(self.service_connection_config)
|
self.connection = databricks_get_connection(self.service_connection_config)
|
||||||
|
session_maker = sessionmaker(bind=self.connection)
|
||||||
|
|
||||||
|
@event.listens_for(session_maker, "after_begin")
|
||||||
|
def set_catalog(session, transaction, connection):
|
||||||
|
# Safely quote the catalog name to prevent SQL injection
|
||||||
|
quoted_catalog = connection.dialect.identifier_preparer.quote(
|
||||||
|
self.service_connection_config.catalog
|
||||||
|
)
|
||||||
|
connection.execute(f"USE CATALOG {quoted_catalog};")
|
||||||
|
|
||||||
|
self.session_factory = scoped_session(session_maker)
|
||||||
|
|
||||||
def get_client(self):
|
def get_client(self):
|
||||||
"""client is the session for SQA"""
|
"""client is the session for SQA"""
|
||||||
|
|||||||
@ -14,47 +14,14 @@ Interfaces with database for all database engine
|
|||||||
supporting sqlalchemy abstraction layer
|
supporting sqlalchemy abstraction layer
|
||||||
"""
|
"""
|
||||||
|
|
||||||
from sqlalchemy import Column, text
|
|
||||||
|
|
||||||
from metadata.ingestion.source.database.databricks.connection import (
|
from metadata.sampler.sqlalchemy.databricks.sampler import DatabricksSamplerInterface
|
||||||
get_connection as databricks_get_connection,
|
|
||||||
)
|
|
||||||
from metadata.profiler.orm.types.custom_array import CustomArray
|
|
||||||
from metadata.sampler.sqlalchemy.sampler import SQASampler
|
|
||||||
|
|
||||||
|
|
||||||
class UnityCatalogSamplerInterface(SQASampler):
|
class UnityCatalogSamplerInterface(DatabricksSamplerInterface):
|
||||||
|
"""
|
||||||
|
Unity Catalog Sampler Interface
|
||||||
|
"""
|
||||||
|
|
||||||
def __init__(self, *args, **kwargs):
|
def __init__(self, *args, **kwargs):
|
||||||
"""Initialize with a single Databricks connection"""
|
|
||||||
super().__init__(*args, **kwargs)
|
super().__init__(*args, **kwargs)
|
||||||
self.connection = databricks_get_connection(self.service_connection_config)
|
|
||||||
|
|
||||||
def get_client(self):
|
|
||||||
"""client is the session for SQA"""
|
|
||||||
client = super().get_client()
|
|
||||||
self.set_catalog(client)
|
|
||||||
return client
|
|
||||||
|
|
||||||
def _handle_array_column(self, column: Column) -> bool:
|
|
||||||
"""Check if a column is an array type"""
|
|
||||||
return isinstance(column.type, CustomArray)
|
|
||||||
|
|
||||||
def _get_slice_expression(self, column: Column):
|
|
||||||
"""Generate SQL expression to slice array elements at query level
|
|
||||||
|
|
||||||
Args:
|
|
||||||
column_name: Name of the column
|
|
||||||
max_elements: Maximum number of elements to extract
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
SQL expression string for array slicing
|
|
||||||
"""
|
|
||||||
max_elements = self._get_max_array_elements()
|
|
||||||
return text(
|
|
||||||
f"""
|
|
||||||
CASE
|
|
||||||
WHEN `{column.name}` IS NULL THEN NULL
|
|
||||||
ELSE slice(`{column.name}`, 1, {max_elements})
|
|
||||||
END AS `{column._label}`
|
|
||||||
"""
|
|
||||||
)
|
|
||||||
|
|||||||
@ -212,6 +212,8 @@ def get_log_name(record: Entity) -> Optional[str]:
|
|||||||
try:
|
try:
|
||||||
if hasattr(record, "name"):
|
if hasattr(record, "name"):
|
||||||
return f"{type(record).__name__} [{getattr(record, 'name').root}]"
|
return f"{type(record).__name__} [{getattr(record, 'name').root}]"
|
||||||
|
if hasattr(record, "table") and hasattr(record.table, "name"):
|
||||||
|
return f"{type(record).__name__} [{record.table.name.root}]"
|
||||||
return f"{type(record).__name__} [{record.entity.name.root}]"
|
return f"{type(record).__name__} [{record.entity.name.root}]"
|
||||||
except Exception:
|
except Exception:
|
||||||
return str(record)
|
return str(record)
|
||||||
|
|||||||
@ -73,7 +73,7 @@ class UnityCatalogSamplerTest(TestCase):
|
|||||||
)
|
)
|
||||||
|
|
||||||
@patch(
|
@patch(
|
||||||
"metadata.sampler.sqlalchemy.unitycatalog.sampler.SQASampler.build_table_orm"
|
"metadata.sampler.sqlalchemy.unitycatalog.sampler.UnityCatalogSamplerInterface.build_table_orm"
|
||||||
)
|
)
|
||||||
def test_handle_array_column(self, mock_build_table_orm):
|
def test_handle_array_column(self, mock_build_table_orm):
|
||||||
"""Test array column detection"""
|
"""Test array column detection"""
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user