From 2a4f57791b78ab64fd771c8cd0bb87c9f814b28c Mon Sep 17 00:00:00 2001 From: Anush Kumar Date: Wed, 8 Oct 2025 23:45:56 -0700 Subject: [PATCH] bugfix(fivetran/unity): move UnityCatalogConnectionConfig import to avoid circular deps with ge_profiler (#14956) --- .../ingestion/source/fivetran/config.py | 4 +- .../datahub/ingestion/source/unity/config.py | 53 +--------------- .../ingestion/source/unity/connection.py | 61 +++++++++++++++++++ 3 files changed, 63 insertions(+), 55 deletions(-) create mode 100644 metadata-ingestion/src/datahub/ingestion/source/unity/connection.py diff --git a/metadata-ingestion/src/datahub/ingestion/source/fivetran/config.py b/metadata-ingestion/src/datahub/ingestion/source/fivetran/config.py index 8d6e242376..72b758b768 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/fivetran/config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/fivetran/config.py @@ -29,9 +29,7 @@ from datahub.ingestion.source.state.stale_entity_removal_handler import ( from datahub.ingestion.source.state.stateful_ingestion_base import ( StatefulIngestionConfigBase, ) -from datahub.ingestion.source.unity.config import ( - UnityCatalogConnectionConfig, -) +from datahub.ingestion.source.unity.connection import UnityCatalogConnectionConfig from datahub.utilities.lossy_collections import LossyList from datahub.utilities.perf_timer import PerfTimer diff --git a/metadata-ingestion/src/datahub/ingestion/source/unity/config.py b/metadata-ingestion/src/datahub/ingestion/source/unity/config.py index 640c085601..db4958f371 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/unity/config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/unity/config.py @@ -2,7 +2,6 @@ import logging import os from datetime import datetime, timedelta, timezone from typing import Any, Dict, List, Optional, Union -from urllib.parse import urlparse import pydantic from pydantic import Field @@ -20,10 +19,8 @@ from datahub.configuration.source_common import ( ) from datahub.configuration.validate_field_removal import pydantic_removed_field from datahub.configuration.validate_field_rename import pydantic_renamed_field -from datahub.ingestion.source.ge_data_profiler import DATABRICKS from datahub.ingestion.source.ge_profiling_config import GEProfilingConfig from datahub.ingestion.source.sql.sql_config import SQLCommonConfig -from datahub.ingestion.source.sql.sqlalchemy_uri import make_sqlalchemy_uri from datahub.ingestion.source.state.stale_entity_removal_handler import ( StatefulStaleMetadataRemovalConfig, ) @@ -31,6 +28,7 @@ from datahub.ingestion.source.state.stateful_ingestion_base import ( StatefulIngestionConfigBase, StatefulProfilingConfigMixin, ) +from datahub.ingestion.source.unity.connection import UnityCatalogConnectionConfig from datahub.ingestion.source.usage.usage_common import BaseUsageConfig from datahub.ingestion.source_config.operation_config import ( OperationConfig, @@ -132,55 +130,6 @@ class UnityCatalogGEProfilerConfig(UnityCatalogProfilerConfig, GEProfilingConfig ) -class UnityCatalogConnectionConfig(ConfigModel): - """ - Configuration for connecting to Databricks Unity Catalog. - Contains only connection-related fields that can be reused across different sources. - """ - - scheme: str = DATABRICKS - token: str = pydantic.Field(description="Databricks personal access token") - workspace_url: str = pydantic.Field( - description="Databricks workspace url. e.g. https://my-workspace.cloud.databricks.com" - ) - warehouse_id: Optional[str] = pydantic.Field( - default=None, - description=( - "SQL Warehouse id, for running queries. Must be explicitly provided to enable SQL-based features. " - "Required for the following features that need SQL access: " - "1) Tag extraction (include_tags=True) - queries system.information_schema.tags " - "2) Hive Metastore catalog (include_hive_metastore=True) - queries legacy hive_metastore catalog " - "3) System table lineage (lineage_data_source=SYSTEM_TABLES) - queries system.access.table_lineage/column_lineage " - "4) Data profiling (profiling.enabled=True) - runs SELECT/ANALYZE queries on tables. " - "When warehouse_id is missing, these features will be automatically disabled (with warnings) to allow ingestion to continue." - ), - ) - - extra_client_options: Dict[str, Any] = Field( - default={}, - description="Additional options to pass to Databricks SQLAlchemy client.", - ) - - def __init__(self, **data: Any): - super().__init__(**data) - - def get_sql_alchemy_url(self, database: Optional[str] = None) -> str: - uri_opts = {"http_path": f"/sql/1.0/warehouses/{self.warehouse_id}"} - if database: - uri_opts["catalog"] = database - return make_sqlalchemy_uri( - scheme=self.scheme, - username="token", - password=self.token, - at=urlparse(self.workspace_url).netloc, - db=database, - uri_opts=uri_opts, - ) - - def get_options(self) -> dict: - return self.extra_client_options - - class UnityCatalogSourceConfig( UnityCatalogConnectionConfig, SQLCommonConfig, diff --git a/metadata-ingestion/src/datahub/ingestion/source/unity/connection.py b/metadata-ingestion/src/datahub/ingestion/source/unity/connection.py new file mode 100644 index 0000000000..3f4c43c5e2 --- /dev/null +++ b/metadata-ingestion/src/datahub/ingestion/source/unity/connection.py @@ -0,0 +1,61 @@ +"""Databricks Unity Catalog connection configuration.""" + +from typing import Any, Dict, Optional +from urllib.parse import urlparse + +import pydantic +from pydantic import Field + +from datahub.configuration.common import ConfigModel +from datahub.ingestion.source.sql.sqlalchemy_uri import make_sqlalchemy_uri + +DATABRICKS = "databricks" + + +class UnityCatalogConnectionConfig(ConfigModel): + """ + Configuration for connecting to Databricks Unity Catalog. + Contains only connection-related fields that can be reused across different sources. + """ + + scheme: str = DATABRICKS + token: str = pydantic.Field(description="Databricks personal access token") + workspace_url: str = pydantic.Field( + description="Databricks workspace url. e.g. https://my-workspace.cloud.databricks.com" + ) + warehouse_id: Optional[str] = pydantic.Field( + default=None, + description=( + "SQL Warehouse id, for running queries. Must be explicitly provided to enable SQL-based features. " + "Required for the following features that need SQL access: " + "1) Tag extraction (include_tags=True) - queries system.information_schema.tags " + "2) Hive Metastore catalog (include_hive_metastore=True) - queries legacy hive_metastore catalog " + "3) System table lineage (lineage_data_source=SYSTEM_TABLES) - queries system.access.table_lineage/column_lineage " + "4) Data profiling (profiling.enabled=True) - runs SELECT/ANALYZE queries on tables. " + "When warehouse_id is missing, these features will be automatically disabled (with warnings) to allow ingestion to continue." + ), + ) + + extra_client_options: Dict[str, Any] = Field( + default={}, + description="Additional options to pass to Databricks SQLAlchemy client.", + ) + + def __init__(self, **data: Any): + super().__init__(**data) + + def get_sql_alchemy_url(self, database: Optional[str] = None) -> str: + uri_opts = {"http_path": f"/sql/1.0/warehouses/{self.warehouse_id}"} + if database: + uri_opts["catalog"] = database + return make_sqlalchemy_uri( + scheme=self.scheme, + username="token", + password=self.token, + at=urlparse(self.workspace_url).netloc, + db=database, + uri_opts=uri_opts, + ) + + def get_options(self) -> dict: + return self.extra_client_options