Fixes 12947: Add Support For DQ and Profiler in Databricks Unity Catalog (#14424)

This commit is contained in:
Ayush Shah 2023-12-20 21:18:05 +05:30 committed by GitHub
parent 9388c39b46
commit ebc0a551e5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 386 additions and 32 deletions

View File

@ -0,0 +1,29 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Test suite interface for Databricks, built on the generic
SQLAlchemy test suite interface
"""
from metadata.data_quality.interface.sqlalchemy.sqa_test_suite_interface import (
SQATestSuiteInterface,
)
class DatabricksTestSuiteInterface(SQATestSuiteInterface):
    """Test suite interface for Databricks connections.

    Behaves like ``SQATestSuiteInterface`` except that ``set_catalog`` is
    called on the session right after it is created.

    The constructor is inherited unchanged: an ``__init__`` that only forwards
    ``*args`` / ``**kwargs`` to the parent adds nothing.
    """

    def create_session(self):
        """Create the session through the parent class, then set its catalog."""
        super().create_session()
        self.set_catalog(self.session)

View File

@ -0,0 +1,29 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Test suite interface for Snowflake, built on the generic
SQLAlchemy test suite interface
"""
from metadata.data_quality.interface.sqlalchemy.sqa_test_suite_interface import (
SQATestSuiteInterface,
)
class SnowflakeTestSuiteInterface(SQATestSuiteInterface):
    """Test suite interface for Snowflake connections.

    Behaves like ``SQATestSuiteInterface`` except that ``set_session_tag`` is
    called on the session right after it is created.

    The constructor is inherited unchanged: an ``__init__`` that only forwards
    ``*args`` / ``**kwargs`` to the parent adds nothing.
    """

    def create_session(self):
        """Create the session through the parent class, then set the session tag."""
        super().create_session()
        self.set_session_tag(self.session)

View File

@ -58,12 +58,7 @@ class SQATestSuiteInterface(SQAInterfaceMixin, TestSuiteInterface):
self.ometa_client = ometa_client
self.table_entity = table_entity
self.service_connection_config = service_connection_config
self.session = create_and_bind_session(
get_connection(self.service_connection_config)
)
self.set_session_tag(self.session)
self.set_catalog(self.session)
self.create_session()
self._table = self._convert_table_to_orm_object(sqa_metadata)
(
@ -75,6 +70,11 @@ class SQATestSuiteInterface(SQAInterfaceMixin, TestSuiteInterface):
self._sampler = self._create_sampler()
self._runner = self._create_runner()
def create_session(self):
    """Create the session for this interface and store it on ``self.session``.

    The connection is obtained from ``self.service_connection_config`` and
    handed to ``create_and_bind_session``. Subclasses override this hook to
    run extra per-session setup after calling it.
    """
    connection = get_connection(self.service_connection_config)
    self.session = create_and_bind_session(connection)
@property
def sample(self) -> Union[DeclarativeMeta, AliasedClass]:
"""_summary_

View File

@ -0,0 +1,35 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Test suite interface for Unity Catalog, built on the generic
SQLAlchemy test suite interface
"""
from metadata.data_quality.interface.sqlalchemy.sqa_test_suite_interface import (
SQATestSuiteInterface,
)
from metadata.ingestion.connections.session import create_and_bind_session
from metadata.ingestion.source.database.databricks.connection import (
get_connection as databricks_get_connection,
)
class UnityCatalogTestSuiteInterface(SQATestSuiteInterface):
    """Test suite interface for Unity Catalog connections.

    Unlike the parent class, the session is bound to the connection returned
    by the Databricks ``get_connection`` helper, and ``set_catalog`` is called
    on the session once it exists.

    The constructor is inherited unchanged: an ``__init__`` that only forwards
    ``*args`` / ``**kwargs`` to the parent adds nothing.
    """

    def create_session(self):
        """Create a session from the Databricks connection and set its catalog."""
        # Deliberately does not call super().create_session(): the parent
        # resolves the connection through the generic get_connection instead.
        self.session = create_and_bind_session(
            databricks_get_connection(self.service_connection_config)
        )
        self.set_catalog(self.session)

View File

@ -17,14 +17,32 @@ from logging import Logger
from metadata.data_quality.interface.pandas.pandas_test_suite_interface import (
PandasTestSuiteInterface,
)
from metadata.data_quality.interface.sqlalchemy.databricks.test_suite_interface import (
DatabricksTestSuiteInterface,
)
from metadata.data_quality.interface.sqlalchemy.snowflake.test_suite_interface import (
SnowflakeTestSuiteInterface,
)
from metadata.data_quality.interface.sqlalchemy.sqa_test_suite_interface import (
SQATestSuiteInterface,
)
from metadata.data_quality.interface.sqlalchemy.unity_catalog.test_suite_interface import (
UnityCatalogTestSuiteInterface,
)
from metadata.data_quality.interface.test_suite_interface import TestSuiteInterface
from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.entity.services.connections.database.databricksConnection import (
DatabricksConnection,
)
from metadata.generated.schema.entity.services.connections.database.datalakeConnection import (
DatalakeConnection,
)
from metadata.generated.schema.entity.services.connections.database.snowflakeConnection import (
SnowflakeConnection,
)
from metadata.generated.schema.entity.services.connections.database.unityCatalogConnection import (
UnityCatalogConnection,
)
from metadata.generated.schema.entity.services.databaseService import DatabaseConnection
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.utils.logger import test_suite_logger
@ -51,6 +69,17 @@ class TestSuiteInterfaceFactory:
"""
self._interface_type[interface_type] = interface
def register_many(self, interface_dict):
    """
    Register several test suite interfaces in one call.

    Args:
        interface_dict: mapping of connection class name (string) to the
            test suite interface class that should handle it. Each pair is
            passed to ``register`` in the mapping's iteration order.
    """
    for name_and_interface in interface_dict.items():
        self.register(*name_and_interface)
def create(
self,
service_connection_config: DatabaseConnection,
@ -86,3 +115,14 @@ class TestSuiteInterfaceFactory:
# Module-level factory instance, populated below.
test_suite_interface_factory = TestSuiteInterfaceFactory()
# Connection class name -> test suite interface class registered for it.
test_suite_interface = {
DatabaseConnection.__name__: SQATestSuiteInterface,
DatalakeConnection.__name__: PandasTestSuiteInterface,
SnowflakeConnection.__name__: SnowflakeTestSuiteInterface,
UnityCatalogConnection.__name__: UnityCatalogTestSuiteInterface,
DatabricksConnection.__name__: DatabricksTestSuiteInterface,
}
# Register every entry of the mapping on the factory in one call.
test_suite_interface_factory.register_many(test_suite_interface)

View File

@ -26,6 +26,9 @@ from metadata.generated.schema.entity.services.connections.database.databricksCo
from metadata.generated.schema.entity.services.connections.database.snowflakeConnection import (
SnowflakeType,
)
from metadata.generated.schema.entity.services.connections.database.unityCatalogConnection import (
UnityCatalogConnection,
)
from metadata.ingestion.source.connections import get_connection
from metadata.ingestion.source.database.snowflake.queries import (
SNOWFLAKE_SESSION_TAG_QUERY,
@ -84,12 +87,16 @@ class SQAInterfaceMixin:
)
def set_catalog(self, session) -> None:
"""Set catalog for the session. Right now only databricks requires it
"""Set catalog for the session. Right now only databricks and unity catalog requires it
Args:
session (Session): sqa session object
"""
if isinstance(self.service_connection_config, DatabricksConnection):
if not isinstance(
self.service_connection_config,
(UnityCatalogConnection, DatabricksConnection),
):
return
bind = session.get_bind()
bind.execute(
"USE CATALOG %(catalog)s;",

View File

@ -18,6 +18,9 @@ from typing import cast
from metadata.generated.schema.entity.services.connections.database.bigQueryConnection import (
BigQueryConnection,
)
from metadata.generated.schema.entity.services.connections.database.databricksConnection import (
DatabricksConnection,
)
from metadata.generated.schema.entity.services.connections.database.datalakeConnection import (
DatalakeConnection,
)
@ -30,6 +33,9 @@ from metadata.generated.schema.entity.services.connections.database.snowflakeCon
from metadata.generated.schema.entity.services.connections.database.trinoConnection import (
TrinoConnection,
)
from metadata.generated.schema.entity.services.connections.database.unityCatalogConnection import (
UnityCatalogConnection,
)
from metadata.generated.schema.entity.services.databaseService import DatabaseConnection
from metadata.profiler.interface.pandas.profiler_interface import (
PandasProfilerInterface,
@ -38,6 +44,9 @@ from metadata.profiler.interface.profiler_interface import ProfilerInterface
from metadata.profiler.interface.sqlalchemy.bigquery.profiler_interface import (
BigQueryProfilerInterface,
)
from metadata.profiler.interface.sqlalchemy.databricks.profiler_interface import (
DatabricksProfilerInterface,
)
from metadata.profiler.interface.sqlalchemy.profiler_interface import (
SQAProfilerInterface,
)
@ -50,6 +59,9 @@ from metadata.profiler.interface.sqlalchemy.snowflake.profiler_interface import
from metadata.profiler.interface.sqlalchemy.trino.profiler_interface import (
TrinoProfilerInterface,
)
from metadata.profiler.interface.sqlalchemy.unity_catalog.profiler_interface import (
UnityCatalogProfilerInterface,
)
class ProfilerInterfaceFactory:
@ -62,6 +74,17 @@ class ProfilerInterfaceFactory:
"""Register a new interface"""
self._interface_type[interface_type] = interface_class
def register_many(self, interface_dict):
    """
    Register several profiler interfaces in one call.

    Args:
        interface_dict: mapping of connection class name (string) to the
            profiler interface class that should handle it. Each pair is
            passed to ``register`` in the mapping's iteration order.
    """
    for name_and_interface in interface_dict.items():
        self.register(*name_and_interface)
def create(self, interface_type: str, *args, **kwargs):
"""Create interface object based on interface type"""
interface_class = self._interface_type.get(interface_type)
@ -72,17 +95,15 @@ class ProfilerInterfaceFactory:
profiler_interface_factory = ProfilerInterfaceFactory()
profiler_interface_factory.register(DatabaseConnection.__name__, SQAProfilerInterface)
profiler_interface_factory.register(
BigQueryConnection.__name__, BigQueryProfilerInterface
)
profiler_interface_factory.register(
SingleStoreConnection.__name__, SingleStoreProfilerInterface
)
profiler_interface_factory.register(
DatalakeConnection.__name__, PandasProfilerInterface
)
profiler_interface_factory.register(
SnowflakeConnection.__name__, SnowflakeProfilerInterface
)
profiler_interface_factory.register(TrinoConnection.__name__, TrinoProfilerInterface)
# Connection class name -> profiler interface class registered for it.
profilers = {
DatabaseConnection.__name__: SQAProfilerInterface,
BigQueryConnection.__name__: BigQueryProfilerInterface,
SingleStoreConnection.__name__: SingleStoreProfilerInterface,
DatalakeConnection.__name__: PandasProfilerInterface,
SnowflakeConnection.__name__: SnowflakeProfilerInterface,
TrinoConnection.__name__: TrinoProfilerInterface,
UnityCatalogConnection.__name__: UnityCatalogProfilerInterface,
DatabricksConnection.__name__: DatabricksProfilerInterface,
}
# Register every entry of the mapping on the factory in one call.
profiler_interface_factory.register_many(profilers)

View File

@ -0,0 +1,26 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Profiler interface for Databricks, built on the generic
SQLAlchemy profiler interface
"""
from metadata.profiler.interface.sqlalchemy.profiler_interface import (
SQAProfilerInterface,
)
class DatabricksProfilerInterface(SQAProfilerInterface):
    """Profiler interface for Databricks connections.

    Behaves like ``SQAProfilerInterface`` except that ``set_catalog`` is
    called on the session right after it is created.
    """

    def __init__(self, service_connection_config, **kwargs):
        """Forward the connection config and remaining keyword arguments to the parent.

        Args:
            service_connection_config: connection configuration, passed to the
                parent by keyword.
            **kwargs: passed through to the parent constructor untouched.
        """
        super().__init__(service_connection_config=service_connection_config, **kwargs)

    def create_session(self):
        """Create the session through the parent class, then set its catalog.

        The catalog is set here rather than at the end of ``__init__`` so this
        class follows the same ``create_session`` override pattern as the
        Snowflake and Unity Catalog interfaces.
        """
        super().create_session()
        self.set_catalog(self.session)

View File

@ -104,10 +104,11 @@ class SQAProfilerInterface(ProfilerInterface, SQAInterfaceMixin):
)
self._table = self._convert_table_to_orm_object(sqa_metadata)
self.create_session()
def create_session(self):
self.session_factory = self._session_factory()
self.session = self.session_factory()
self.set_session_tag(self.session)
self.set_catalog(self.session)
@property
def table(self):

View File

@ -29,6 +29,13 @@ class SnowflakeProfilerInterface(SQAProfilerInterface):
sqlalchemy.
"""
def create_session(self):
    """Create the session through the parent class, then set the session tag.

    No ``__init__`` is defined alongside this method: one that only forwards
    ``*args`` / ``**kwargs`` to the parent is equivalent to inheriting the
    parent constructor.
    """
    super().create_session()
    self.set_session_tag(self.session)
def _programming_error_static_metric(self, runner, column, exc, session, metrics):
if exc.orig and exc.orig.errno in OVERFLOW_ERROR_CODES.get(
session.bind.dialect.name

View File

@ -0,0 +1,33 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Profiler interface for Unity Catalog, built on the generic
SQLAlchemy profiler interface
"""
from metadata.ingestion.source.database.databricks.connection import (
get_connection as databricks_get_connection,
)
from metadata.profiler.interface.sqlalchemy.profiler_interface import (
SQAProfilerInterface,
)
class UnityCatalogProfilerInterface(SQAProfilerInterface):
    """Profiler interface for Unity Catalog connections.

    Stores the connection returned by the Databricks ``get_connection`` helper
    on ``self.connection`` before the parent creates the session, then calls
    ``set_catalog`` on that session.
    """

    def __init__(self, service_connection_config, **kwargs):
        """Forward the connection config and remaining keyword arguments to the parent."""
        super().__init__(
            service_connection_config=service_connection_config,
            **kwargs,
        )

    def create_session(self):
        """Assign the Databricks connection, create the session, set its catalog."""
        config = self.service_connection_config
        # Assigned before delegating, as in the original ordering.
        self.connection = databricks_get_connection(config)
        super().create_session()
        session = self.session
        self.set_catalog(session)

View File

@ -30,6 +30,28 @@ from metadata.generated.schema.entity.services.connections.connectionBasicType i
DataStorageConfig,
SampleDataStorageConfig,
)
from metadata.generated.schema.entity.services.connections.database.bigQueryConnection import (
BigQueryConnection,
)
from metadata.generated.schema.entity.services.connections.database.databricksConnection import (
DatabricksConnection,
)
from metadata.generated.schema.entity.services.connections.database.datalakeConnection import (
DatalakeConnection,
)
from metadata.generated.schema.entity.services.connections.database.singleStoreConnection import (
SingleStoreConnection,
)
from metadata.generated.schema.entity.services.connections.database.snowflakeConnection import (
SnowflakeConnection,
)
from metadata.generated.schema.entity.services.connections.database.trinoConnection import (
TrinoConnection,
)
from metadata.generated.schema.entity.services.connections.database.unityCatalogConnection import (
UnityCatalogConnection,
)
from metadata.generated.schema.entity.services.databaseService import DatabaseConnection
from metadata.generated.schema.metadataIngestion.databaseServiceProfilerPipeline import (
DatabaseServiceProfilerPipeline,
)
@ -40,7 +62,34 @@ from metadata.profiler.api.models import (
ProfileSampleConfig,
TableConfig,
)
from metadata.profiler.interface.pandas.profiler_interface import (
PandasProfilerInterface,
)
from metadata.profiler.interface.profiler_interface import ProfilerInterface
from metadata.profiler.interface.profiler_interface_factory import (
ProfilerInterfaceFactory,
)
from metadata.profiler.interface.sqlalchemy.bigquery.profiler_interface import (
BigQueryProfilerInterface,
)
from metadata.profiler.interface.sqlalchemy.databricks.profiler_interface import (
DatabricksProfilerInterface,
)
from metadata.profiler.interface.sqlalchemy.profiler_interface import (
SQAProfilerInterface,
)
from metadata.profiler.interface.sqlalchemy.single_store.profiler_interface import (
SingleStoreProfilerInterface,
)
from metadata.profiler.interface.sqlalchemy.snowflake.profiler_interface import (
SnowflakeProfilerInterface,
)
from metadata.profiler.interface.sqlalchemy.trino.profiler_interface import (
TrinoProfilerInterface,
)
from metadata.profiler.interface.sqlalchemy.unity_catalog.profiler_interface import (
UnityCatalogProfilerInterface,
)
class ProfilerInterfaceTest(TestCase):
@ -305,3 +354,31 @@ class ProfilerInterfaceTest(TestCase):
schema_config, table_fqn="demo"
),
)
def test_register_many(self):
    """``register_many`` on a fresh factory stores exactly the supplied entries."""
    fresh_factory = ProfilerInterfaceFactory()

    # Connection class name -> profiler interface class to register
    mapping = {
        DatabaseConnection.__name__: SQAProfilerInterface,
        BigQueryConnection.__name__: BigQueryProfilerInterface,
        SingleStoreConnection.__name__: SingleStoreProfilerInterface,
        DatalakeConnection.__name__: PandasProfilerInterface,
        SnowflakeConnection.__name__: SnowflakeProfilerInterface,
        TrinoConnection.__name__: TrinoProfilerInterface,
        UnityCatalogConnection.__name__: UnityCatalogProfilerInterface,
        DatabricksConnection.__name__: DatabricksProfilerInterface,
    }

    fresh_factory.register_many(mapping)

    # The registered names are exactly the supplied names
    wanted_names = set(mapping.keys())
    registered_names = set(fresh_factory._interface_type.keys())
    assert wanted_names == registered_names

    # Every name resolves to the class it was registered with
    for name in mapping:
        assert fresh_factory._interface_type[name] == mapping[name]

View File

@ -18,12 +18,25 @@ from pytest import mark
from metadata.data_quality.interface.pandas.pandas_test_suite_interface import (
PandasTestSuiteInterface,
)
from metadata.data_quality.interface.sqlalchemy.databricks.test_suite_interface import (
DatabricksTestSuiteInterface,
)
from metadata.data_quality.interface.sqlalchemy.snowflake.test_suite_interface import (
SnowflakeTestSuiteInterface,
)
from metadata.data_quality.interface.sqlalchemy.sqa_test_suite_interface import (
SQATestSuiteInterface,
)
from metadata.data_quality.interface.sqlalchemy.unity_catalog.test_suite_interface import (
UnityCatalogTestSuiteInterface,
)
from metadata.data_quality.interface.test_suite_interface_factory import (
TestSuiteInterfaceFactory,
test_suite_interface_factory,
)
from metadata.generated.schema.entity.services.connections.database.databricksConnection import (
DatabricksConnection,
)
from metadata.generated.schema.entity.services.connections.database.datalake.s3Config import (
S3Config,
)
@ -33,6 +46,12 @@ from metadata.generated.schema.entity.services.connections.database.datalakeConn
from metadata.generated.schema.entity.services.connections.database.mysqlConnection import (
MysqlConnection,
)
from metadata.generated.schema.entity.services.connections.database.snowflakeConnection import (
SnowflakeConnection,
)
from metadata.generated.schema.entity.services.connections.database.unityCatalogConnection import (
UnityCatalogConnection,
)
from metadata.generated.schema.security.credentials.awsCredentials import AWSCredentials
MYSQL_CONNECTION_CONFIG = MysqlConnection(
@ -73,3 +92,29 @@ def test_interface_factory(
table_entity=None, # type: ignore
)
assert interface.__class__ == expected_interface
def test_register_many():
    """``register_many`` on a fresh factory stores exactly the supplied entries."""
    fresh_factory = TestSuiteInterfaceFactory()

    # Registry key -> test suite interface class to register
    mapping = {
        "base": SQATestSuiteInterface,
        DatalakeConnection.__name__: PandasTestSuiteInterface,
        SnowflakeConnection.__name__: SnowflakeTestSuiteInterface,
        UnityCatalogConnection.__name__: UnityCatalogTestSuiteInterface,
        DatabricksConnection.__name__: DatabricksTestSuiteInterface,
    }

    fresh_factory.register_many(mapping)

    # The registered keys are exactly the supplied keys
    wanted_names = set(mapping.keys())
    registered_names = set(fresh_factory._interface_type.keys())
    assert wanted_names == registered_names

    # Every key resolves to the test suite interface it was registered with
    for name in mapping:
        assert fresh_factory._interface_type[name] == mapping[name]

View File

@ -24,7 +24,7 @@
"title": "Service Type",
"description": "Service Type",
"$ref": "#/definitions/databricksType",
"default": "Databricks"
"default": "UnityCatalog"
},
"scheme": {
"title": "Connection Scheme",
@ -85,6 +85,10 @@
"title": "Supports Metadata Extraction",
"$ref": "../connectionBasicType.json#/definitions/supportsMetadataExtraction"
},
"supportsProfiler": {
"title": "Supports Profiler",
"$ref": "../connectionBasicType.json#/definitions/supportsProfiler"
},
"supportsDatabase": {
"title": "Supports Database",
"$ref": "../connectionBasicType.json#/definitions/supportsDatabase"