mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-10-23 23:04:23 +00:00
fix: explicitly state USE CATALOG
for databricks connection (#10940)
This commit is contained in:
parent
4683bee91a
commit
06b8d8e7ce
@ -68,6 +68,7 @@ class SQATestSuiteInterface(SQAInterfaceMixin, TestSuiteProtocol):
|
|||||||
get_connection(self.service_connection_config)
|
get_connection(self.service_connection_config)
|
||||||
)
|
)
|
||||||
self.set_session_tag(self.session)
|
self.set_session_tag(self.session)
|
||||||
|
self.set_catalog(self.session)
|
||||||
|
|
||||||
self._table = self._convert_table_to_orm_object(sqa_metadata_obj)
|
self._table = self._convert_table_to_orm_object(sqa_metadata_obj)
|
||||||
|
|
||||||
|
@ -20,6 +20,9 @@ from typing import Optional
|
|||||||
from sqlalchemy import Column, MetaData, inspect
|
from sqlalchemy import Column, MetaData, inspect
|
||||||
from sqlalchemy.orm import DeclarativeMeta
|
from sqlalchemy.orm import DeclarativeMeta
|
||||||
|
|
||||||
|
from metadata.generated.schema.entity.services.connections.database.databricksConnection import (
|
||||||
|
DatabricksConnection,
|
||||||
|
)
|
||||||
from metadata.generated.schema.entity.services.connections.database.snowflakeConnection import (
|
from metadata.generated.schema.entity.services.connections.database.snowflakeConnection import (
|
||||||
SnowflakeType,
|
SnowflakeType,
|
||||||
)
|
)
|
||||||
@ -80,6 +83,19 @@ class SQAInterfaceMixin:
|
|||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
def set_catalog(self, session) -> None:
|
||||||
|
"""Set catalog for the session. Right now only databricks requires it
|
||||||
|
|
||||||
|
Args:
|
||||||
|
session (Session): sqa session object
|
||||||
|
"""
|
||||||
|
if isinstance(self.service_connection_config, DatabricksConnection):
|
||||||
|
bind = session.get_bind()
|
||||||
|
bind.execute(
|
||||||
|
"USE CATALOG %(catalog)s;",
|
||||||
|
{"catalog": self.service_connection_config.catalog},
|
||||||
|
).first()
|
||||||
|
|
||||||
def close(self):
|
def close(self):
|
||||||
"""close session"""
|
"""close session"""
|
||||||
self.session.close()
|
self.session.close()
|
||||||
|
@ -100,6 +100,7 @@ class SQAProfilerInterface(ProfilerProtocol, SQAInterfaceMixin):
|
|||||||
self.session_factory = self._session_factory(service_connection_config)
|
self.session_factory = self._session_factory(service_connection_config)
|
||||||
self.session = self.session_factory()
|
self.session = self.session_factory()
|
||||||
self.set_session_tag(self.session)
|
self.set_session_tag(self.session)
|
||||||
|
self.set_catalog(self.session)
|
||||||
|
|
||||||
self.profile_sample_config = profile_sample_config
|
self.profile_sample_config = profile_sample_config
|
||||||
self.profile_query = sample_query
|
self.profile_query = sample_query
|
||||||
@ -392,6 +393,7 @@ class SQAProfilerInterface(ProfilerProtocol, SQAInterfaceMixin):
|
|||||||
Session = self.session_factory # pylint: disable=invalid-name
|
Session = self.session_factory # pylint: disable=invalid-name
|
||||||
with Session() as session:
|
with Session() as session:
|
||||||
self.set_session_tag(session)
|
self.set_session_tag(session)
|
||||||
|
self.set_catalog(session)
|
||||||
sampler = self._create_thread_safe_sampler(
|
sampler = self._create_thread_safe_sampler(
|
||||||
session,
|
session,
|
||||||
table,
|
table,
|
||||||
|
Loading…
x
Reference in New Issue
Block a user