bugfix(fivetran/unity): move UnityCatalogConnectionConfig import to avoid circular deps with ge_profiler (#14956)

This commit is contained in:
Anush Kumar 2025-10-08 23:45:56 -07:00 committed by GitHub
parent b6ff38d1c3
commit 2a4f57791b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 63 additions and 55 deletions

View File

@@ -29,9 +29,7 @@ from datahub.ingestion.source.state.stale_entity_removal_handler import (
 from datahub.ingestion.source.state.stateful_ingestion_base import (
     StatefulIngestionConfigBase,
 )
-from datahub.ingestion.source.unity.config import (
-    UnityCatalogConnectionConfig,
-)
+from datahub.ingestion.source.unity.connection import UnityCatalogConnectionConfig
 from datahub.utilities.lossy_collections import LossyList
 from datahub.utilities.perf_timer import PerfTimer

View File

@@ -2,7 +2,6 @@ import logging
 import os
 from datetime import datetime, timedelta, timezone
 from typing import Any, Dict, List, Optional, Union
-from urllib.parse import urlparse

 import pydantic
 from pydantic import Field
@@ -20,10 +19,8 @@ from datahub.configuration.source_common import (
 )
 from datahub.configuration.validate_field_removal import pydantic_removed_field
 from datahub.configuration.validate_field_rename import pydantic_renamed_field
-from datahub.ingestion.source.ge_data_profiler import DATABRICKS
 from datahub.ingestion.source.ge_profiling_config import GEProfilingConfig
 from datahub.ingestion.source.sql.sql_config import SQLCommonConfig
-from datahub.ingestion.source.sql.sqlalchemy_uri import make_sqlalchemy_uri
 from datahub.ingestion.source.state.stale_entity_removal_handler import (
     StatefulStaleMetadataRemovalConfig,
 )
@@ -31,6 +28,7 @@ from datahub.ingestion.source.state.stateful_ingestion_base import (
     StatefulIngestionConfigBase,
     StatefulProfilingConfigMixin,
 )
+from datahub.ingestion.source.unity.connection import UnityCatalogConnectionConfig
 from datahub.ingestion.source.usage.usage_common import BaseUsageConfig
 from datahub.ingestion.source_config.operation_config import (
     OperationConfig,
@@ -132,55 +130,6 @@ class UnityCatalogGEProfilerConfig(UnityCatalogProfilerConfig, GEProfilingConfig
     )


-class UnityCatalogConnectionConfig(ConfigModel):
-    """
-    Configuration for connecting to Databricks Unity Catalog.
-    Contains only connection-related fields that can be reused across different sources.
-    """
-
-    scheme: str = DATABRICKS
-    token: str = pydantic.Field(description="Databricks personal access token")
-    workspace_url: str = pydantic.Field(
-        description="Databricks workspace url. e.g. https://my-workspace.cloud.databricks.com"
-    )
-    warehouse_id: Optional[str] = pydantic.Field(
-        default=None,
-        description=(
-            "SQL Warehouse id, for running queries. Must be explicitly provided to enable SQL-based features. "
-            "Required for the following features that need SQL access: "
-            "1) Tag extraction (include_tags=True) - queries system.information_schema.tags "
-            "2) Hive Metastore catalog (include_hive_metastore=True) - queries legacy hive_metastore catalog "
-            "3) System table lineage (lineage_data_source=SYSTEM_TABLES) - queries system.access.table_lineage/column_lineage "
-            "4) Data profiling (profiling.enabled=True) - runs SELECT/ANALYZE queries on tables. "
-            "When warehouse_id is missing, these features will be automatically disabled (with warnings) to allow ingestion to continue."
-        ),
-    )
-    extra_client_options: Dict[str, Any] = Field(
-        default={},
-        description="Additional options to pass to Databricks SQLAlchemy client.",
-    )
-
-    def __init__(self, **data: Any):
-        super().__init__(**data)
-
-    def get_sql_alchemy_url(self, database: Optional[str] = None) -> str:
-        uri_opts = {"http_path": f"/sql/1.0/warehouses/{self.warehouse_id}"}
-        if database:
-            uri_opts["catalog"] = database
-        return make_sqlalchemy_uri(
-            scheme=self.scheme,
-            username="token",
-            password=self.token,
-            at=urlparse(self.workspace_url).netloc,
-            db=database,
-            uri_opts=uri_opts,
-        )
-
-    def get_options(self) -> dict:
-        return self.extra_client_options
-
-
 class UnityCatalogSourceConfig(
     UnityCatalogConnectionConfig,
     SQLCommonConfig,

View File

@@ -0,0 +1,61 @@
"""Databricks Unity Catalog connection configuration."""
from typing import Any, Dict, Optional
from urllib.parse import urlparse
import pydantic
from pydantic import Field
from datahub.configuration.common import ConfigModel
from datahub.ingestion.source.sql.sqlalchemy_uri import make_sqlalchemy_uri
# SQLAlchemy scheme/dialect name for Databricks, defined locally so this
# module does not need to import it from ge_data_profiler (avoids the
# circular dependency this file was introduced to break).
DATABRICKS = "databricks"
class UnityCatalogConnectionConfig(ConfigModel):
    """
    Configuration for connecting to Databricks Unity Catalog.

    Contains only connection-related fields so it can be reused across
    different sources without pulling in source-specific dependencies.
    """

    # SQLAlchemy scheme for the Databricks dialect.
    scheme: str = DATABRICKS
    token: str = pydantic.Field(description="Databricks personal access token")
    workspace_url: str = pydantic.Field(
        description="Databricks workspace url. e.g. https://my-workspace.cloud.databricks.com"
    )
    warehouse_id: Optional[str] = pydantic.Field(
        default=None,
        description=(
            "SQL Warehouse id, for running queries. Must be explicitly provided to enable SQL-based features. "
            "Required for the following features that need SQL access: "
            "1) Tag extraction (include_tags=True) - queries system.information_schema.tags "
            "2) Hive Metastore catalog (include_hive_metastore=True) - queries legacy hive_metastore catalog "
            "3) System table lineage (lineage_data_source=SYSTEM_TABLES) - queries system.access.table_lineage/column_lineage "
            "4) Data profiling (profiling.enabled=True) - runs SELECT/ANALYZE queries on tables. "
            "When warehouse_id is missing, these features will be automatically disabled (with warnings) to allow ingestion to continue."
        ),
    )
    # default_factory (not a shared `default={}` literal) is the documented
    # pydantic idiom for mutable defaults.
    extra_client_options: Dict[str, Any] = Field(
        default_factory=dict,
        description="Additional options to pass to Databricks SQLAlchemy client.",
    )

    # NOTE: the previous no-op `__init__(self, **data)` that only forwarded to
    # `super().__init__` was removed; pydantic generates an identical one.

    def get_sql_alchemy_url(self, database: Optional[str] = None) -> str:
        """Build the SQLAlchemy URI for the configured workspace.

        :param database: optional catalog to scope the connection to; when
            given it is passed both as the URI database and as a
            ``catalog`` query option.
        :return: a ``databricks``-scheme SQLAlchemy connection URI.
        """
        uri_opts = {"http_path": f"/sql/1.0/warehouses/{self.warehouse_id}"}
        if database:
            uri_opts["catalog"] = database
        return make_sqlalchemy_uri(
            scheme=self.scheme,
            # Databricks token auth uses the literal username "token".
            username="token",
            password=self.token,
            at=urlparse(self.workspace_url).netloc,
            db=database,
            uri_opts=uri_opts,
        )

    def get_options(self) -> dict:
        """Return extra options to forward to the SQLAlchemy client."""
        return self.extra_client_options