feat(databricks): adds Azure oauth to Databricks (#15117)

Co-authored-by: pjain155_uhg <anshul_p@optum.com>
P Anshul Jain 2025-11-06 19:46:34 +05:30 committed by GitHub
parent 71daaa9ac0
commit bdb46d9909
9 changed files with 345 additions and 34 deletions


@@ -4,27 +4,44 @@
- Create a [Databricks Service Principal](https://docs.databricks.com/administration-guide/users-groups/service-principals.html#what-is-a-service-principal)
- You can skip this step and use your own account to get things running quickly,
but we strongly recommend creating a dedicated service principal for production use.
#### Authentication Options
You can authenticate with Databricks using either a Personal Access Token or Azure authentication:
**Option 1: Personal Access Token (PAT)**
- Generate a Databricks Personal Access Token by following one of these guides:
- [Service Principals](https://docs.databricks.com/administration-guide/users-groups/service-principals.html#personal-access-tokens)
- [Personal Access Tokens](https://docs.databricks.com/dev-tools/auth.html#databricks-personal-access-tokens)
**Option 2: Azure Authentication (for Azure Databricks)**
- Create an Azure Active Directory application:
- Follow the [Azure AD app registration guide](https://docs.microsoft.com/en-us/azure/active-directory/develop/quickstart-register-app)
- Note down the `client_id` (Application ID), `tenant_id` (Directory ID), and create a `client_secret`
- Grant the Azure AD application access to your Databricks workspace:
- Add the service principal to your Databricks workspace following [this guide](https://docs.databricks.com/administration-guide/users-groups/service-principals.html#add-a-service-principal-to-your-azure-databricks-account-using-the-account-console)
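
If you want to verify the Azure AD application credentials before running ingestion, you can request a Databricks-scoped token directly. Below is a minimal sketch using the `azure-identity` package; the placeholder values are yours to fill in, and the resource ID is the well-known Azure Databricks application ID (an assumption outside this PR, verify against the Azure Databricks docs for your cloud):

```python
# Hedged sketch: confirm the service principal can obtain an Azure AD token
# scoped to Azure Databricks. Requires `pip install azure-identity`.
from azure.identity import ClientSecretCredential

# Well-known Azure Databricks resource (application) ID -- assumption,
# not part of this PR.
DATABRICKS_RESOURCE_ID = "2ff814a6-3304-4ab8-85cb-cd0e6f879c1d"

credential = ClientSecretCredential(
    tenant_id="<azure_tenant_id>",        # placeholder values
    client_id="<azure_client_id>",
    client_secret="<azure_client_secret>",
)
token = credential.get_token(f"{DATABRICKS_RESOURCE_ID}/.default")
print("Got token, expires at:", token.expires_on)
```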
#### Provision your service account:
- To ingest your workspace's metadata and lineage, your service principal must have all of the following:
- One of: metastore admin role, ownership of, or `USE CATALOG` privilege on any catalogs you want to ingest
- One of: metastore admin role, ownership of, or `USE SCHEMA` privilege on any schemas you want to ingest
- Ownership of or `SELECT` privilege on any tables and views you want to ingest
- [Ownership documentation](https://docs.databricks.com/data-governance/unity-catalog/manage-privileges/ownership.html)
- [Privileges documentation](https://docs.databricks.com/data-governance/unity-catalog/manage-privileges/privileges.html)
- To ingest legacy hive_metastore catalog (`include_hive_metastore` - enabled by default), your service principal must have all of the following:
- `READ_METADATA` and `USAGE` privilege on `hive_metastore` catalog
- `READ_METADATA` and `USAGE` privilege on schemas you want to ingest
- `READ_METADATA` and `USAGE` privilege on tables and views you want to ingest
- [Hive Metastore Privileges documentation](https://docs.databricks.com/en/sql/language-manual/sql-ref-privileges-hms.html)
- To ingest your workspace's notebooks and respective lineage, your service principal must have `CAN_READ` privileges on the folders containing the notebooks you want to ingest: [guide](https://docs.databricks.com/en/security/auth-authz/access-control/workspace-acl.html#folder-permissions).
- To `include_usage_statistics` (enabled by default), your service principal must have one of the following:
- `CAN_MANAGE` permissions on any SQL Warehouses you want to ingest: [guide](https://docs.databricks.com/security/auth-authz/access-control/sql-endpoint-acl.html).
- When `usage_data_source` is set to `SYSTEM_TABLES` or `AUTO` (default) with `warehouse_id` configured: `SELECT` privilege on `system.query.history` table for improved performance with large query volumes and multi-workspace setups.
- To ingest `profiling` information with `method: ge`, you need `SELECT` privileges on all profiled tables.
- To ingest `profiling` information with `method: analyze` and `call_analyze: true` (enabled by default), your service principal must have ownership or `MODIFY` privilege on any tables you want to profile.
- Alternatively, you can run [ANALYZE TABLE](https://docs.databricks.com/sql/language-manual/sql-ref-syntax-aux-analyze-table.html) yourself on any tables you want to profile, then set `call_analyze` to `false`.
You will still need `SELECT` privilege on those tables to fetch the results.
- Check the starter recipe below and replace `workspace_url` and either `token` (for PAT authentication) or `azure_auth` credentials (for Azure authentication) with your information from the previous steps.


@@ -2,7 +2,16 @@ source:
  type: databricks
  config:
    workspace_url: https://my-workspace.cloud.databricks.com
    # Authentication Option 1: Personal Access Token
    token: "<token>"
    # Authentication Option 2: Azure Authentication (for Azure Databricks)
    # Uncomment the following section and comment out the token above to use Azure auth
    # azure_auth:
    #   client_id: "<azure_client_id>"
    #   tenant_id: "<azure_tenant_id>"
    #   client_secret: "<azure_client_secret>"
    include_metastore: false
    include_ownership: true
    include_ml_model_aliases: false
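
The same recipe can also be driven programmatically. A hedged sketch using DataHub's `Pipeline` API with the Azure option; the sink endpoint and all credential values are illustrative assumptions:

```python
# Hedged sketch: run the recipe above via DataHub's Python API instead of
# the `datahub ingest` CLI. Placeholder credentials and sink endpoint.
from datahub.ingestion.run.pipeline import Pipeline

pipeline = Pipeline.create(
    {
        "source": {
            "type": "databricks",
            "config": {
                "workspace_url": "https://my-workspace.cloud.databricks.com",
                "azure_auth": {  # or instead: "token": "<token>"
                    "client_id": "<azure_client_id>",
                    "tenant_id": "<azure_tenant_id>",
                    "client_secret": "<azure_client_secret>",
                },
            },
        },
        "sink": {
            "type": "datahub-rest",
            "config": {"server": "http://localhost:8080"},  # assumed local GMS
        },
    }
)
pipeline.run()
pipeline.raise_from_status()
```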


@@ -0,0 +1,15 @@
from pydantic import Field, SecretStr

from datahub.configuration import ConfigModel


class AzureAuthConfig(ConfigModel):
    client_secret: SecretStr = Field(
        description="Azure application client secret used for authentication. This is a confidential credential that should be kept secure."
    )
    client_id: str = Field(
        description="Azure application (client) ID. This is the unique identifier for the registered Azure AD application.",
    )
    tenant_id: str = Field(
        description="Azure tenant (directory) ID. This identifies the Azure AD tenant where the application is registered.",
    )
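
Why `SecretStr` for `client_secret`: pydantic redacts the value in reprs and serialized output, so the raw secret only escapes through an explicit `get_secret_value()` call, which is exactly what the API proxy does when building the `WorkspaceClient`. A minimal sketch with a plain pydantic model standing in for DataHub's `ConfigModel` (field values are made up):

```python
from pydantic import BaseModel, SecretStr

class DemoAuth(BaseModel):  # stand-in for AzureAuthConfig above
    client_id: str
    tenant_id: str
    client_secret: SecretStr

auth = DemoAuth(client_id="cid", tenant_id="tid", client_secret="s3cret!")
print(auth)                                   # client_secret=SecretStr('**********')
print(auth.client_secret.get_secret_value())  # s3cret! -- explicit unwrap only
```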


@@ -413,6 +413,24 @@ class UnityCatalogSourceConfig(
            )
        return workspace_url

    @model_validator(mode="before")
    def either_token_or_azure_auth_provided(cls, values: dict) -> dict:
        token = values.get("token")
        azure_auth = values.get("azure_auth")

        # Check if exactly one of the authentication methods is provided
        if not token and not azure_auth:
            raise ValueError(
                "Either 'azure_auth' or 'token' (personal access token) must be provided in the configuration."
            )
        if token and azure_auth:
            raise ValueError(
                "Cannot specify both 'token' and 'azure_auth'. Please provide only one authentication method."
            )
        return values

    @field_validator("include_metastore", mode="after")
    @classmethod
    def include_metastore_warning(cls, v: bool) -> bool:
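
The `mode="before"` validator runs on the raw input dict, so it can enforce the token/azure_auth exclusive-or before field parsing happens. A standalone sketch of the same pattern, using a simplified stand-in model (the field set is hypothetical):

```python
from typing import Optional
from pydantic import BaseModel, model_validator

class DemoSourceConfig(BaseModel):  # simplified stand-in for UnityCatalogSourceConfig
    token: Optional[str] = None
    azure_auth: Optional[dict] = None

    @model_validator(mode="before")
    @classmethod
    def exactly_one_auth(cls, values: dict) -> dict:
        # Reject zero or two credential styles; accept exactly one.
        if not values.get("token") and not values.get("azure_auth"):
            raise ValueError("Either 'azure_auth' or 'token' must be provided.")
        if values.get("token") and values.get("azure_auth"):
            raise ValueError("Cannot specify both 'token' and 'azure_auth'.")
        return values

DemoSourceConfig(token="dapi-abc")                 # OK: PAT only
DemoSourceConfig(azure_auth={"client_id": "cid"})  # OK: Azure only
# DemoSourceConfig() or passing both fields -> ValueError
```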


@@ -8,6 +8,7 @@ from pydantic import Field
from datahub.configuration.common import ConfigModel
from datahub.ingestion.source.sql.sqlalchemy_uri import make_sqlalchemy_uri
from datahub.ingestion.source.unity.azure_auth_config import AzureAuthConfig
DATABRICKS = "databricks"
@@ -19,7 +20,12 @@ class UnityCatalogConnectionConfig(ConfigModel):
    """

    scheme: str = DATABRICKS
    token: Optional[str] = pydantic.Field(
        default=None, description="Databricks personal access token"
    )
    azure_auth: Optional[AzureAuthConfig] = Field(
        default=None, description="Azure configuration"
    )
    workspace_url: str = pydantic.Field(
        description="Databricks workspace url. e.g. https://my-workspace.cloud.databricks.com"
    )


@@ -16,10 +16,10 @@ class UnityCatalogConnectionTest:
        self.report = UnityCatalogReport()
        self.proxy = UnityCatalogApiProxy(
            self.config.workspace_url,
            self.config.profiling.warehouse_id,
            report=self.report,
            databricks_api_page_size=self.config.databricks_api_page_size,
            personal_access_token=self.config.token,
        )

    def get_connection_test(self) -> TestConnectionReport:


@@ -44,6 +44,7 @@ from typing_extensions import assert_never
from datahub._version import nice_version_name
from datahub.api.entities.external.unity_catalog_external_entites import UnityCatalogTag
from datahub.emitter.mce_builder import parse_ts_millis
from datahub.ingestion.source.unity.azure_auth_config import AzureAuthConfig
from datahub.ingestion.source.unity.config import (
    LineageDataSource,
    UsageDataSource,
@@ -169,20 +170,31 @@ class UnityCatalogApiProxy(UnityCatalogProxyProfilingMixin):
    def __init__(
        self,
        workspace_url: str,
        warehouse_id: Optional[str],
        report: UnityCatalogReport,
        hive_metastore_proxy: Optional[HiveMetastoreProxy] = None,
        lineage_data_source: LineageDataSource = LineageDataSource.AUTO,
        usage_data_source: UsageDataSource = UsageDataSource.AUTO,
        databricks_api_page_size: int = 0,
        personal_access_token: Optional[str] = None,
        azure_auth: Optional[AzureAuthConfig] = None,
    ):
        if azure_auth:
            self._workspace_client = WorkspaceClient(
                host=workspace_url,
                azure_tenant_id=azure_auth.tenant_id,
                azure_client_id=azure_auth.client_id,
                azure_client_secret=azure_auth.client_secret.get_secret_value(),
                product="datahub",
                product_version=nice_version_name(),
            )
        else:
            self._workspace_client = WorkspaceClient(
                host=workspace_url,
                token=personal_access_token,
                product="datahub",
                product_version=nice_version_name(),
            )
        self.warehouse_id = warehouse_id or ""
        self.report = report
        self.hive_metastore_proxy = hive_metastore_proxy
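
The branch above maps one-to-one onto the Databricks SDK's authentication modes: the keyword trio `azure_tenant_id`/`azure_client_id`/`azure_client_secret` selects the SDK's Azure client-credentials OAuth flow, while `token` selects PAT auth. A hedged standalone sketch (host names and credential values are placeholders):

```python
from databricks.sdk import WorkspaceClient

# PAT path -- what the proxy builds when only `token` is configured.
w_pat = WorkspaceClient(
    host="https://my-workspace.cloud.databricks.com",  # placeholder
    token="<personal_access_token>",
)

# Azure service-principal path -- the SDK resolves these keyword arguments
# to its client-credentials OAuth flow against the given tenant.
w_azure = WorkspaceClient(
    host="https://adb-1234567890123456.7.azuredatabricks.net",  # placeholder
    azure_tenant_id="<tenant_id>",
    azure_client_id="<client_id>",
    azure_client_secret="<client_secret>",
)

# Either client exposes the same API surface; a quick smoke test:
print(w_azure.current_user.me().user_name)
```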


@@ -211,13 +211,14 @@ class UnityCatalogSource(StatefulIngestionSourceBase, TestableSource):
        self.unity_catalog_api_proxy = UnityCatalogApiProxy(
            config.workspace_url,
            config.warehouse_id,
            report=self.report,
            hive_metastore_proxy=self.hive_metastore_proxy,
            lineage_data_source=config.lineage_data_source,
            usage_data_source=config.usage_data_source,
            databricks_api_page_size=config.databricks_api_page_size,
            personal_access_token=config.token if config.token else None,
            azure_auth=config.azure_auth if config.azure_auth else None,
        )
        self.external_url_base = urljoin(self.config.workspace_url, "/explore/data")
self.external_url_base = urljoin(self.config.workspace_url, "/explore/data")


@@ -9,6 +9,30 @@ from datahub.ingestion.source.unity.source import UnityCatalogSource


class TestUnityCatalogSource:
    @pytest.mark.parametrize(
        "azure_auth_partial",
        [
            {"tenant_id": "tid", "client_secret": "sec"},  # missing client_id
            {"client_id": "cid", "client_secret": "sec"},  # missing tenant_id
            {"client_id": "cid", "tenant_id": "tid"},  # missing client_secret
        ],
    )
    def test_azure_auth_config_missing_fields(self, azure_auth_partial):
        """Test that missing any of client_id, tenant_id, or client_secret in azure_auth raises a validation error."""
        config_dict = {
            "workspace_url": "https://test.databricks.com",
            "warehouse_id": "test_warehouse",
            "azure_auth": azure_auth_partial,
        }
        with pytest.raises(Exception) as exc_info:
            UnityCatalogSourceConfig.parse_obj(config_dict)
        # Should mention the missing field in the error message
        assert (
            "client_id" in str(exc_info.value)
            or "tenant_id" in str(exc_info.value)
            or "client_secret" in str(exc_info.value)
        )

    @pytest.fixture
    def minimal_config(self):
        """Create a minimal config for testing."""
@@ -49,6 +73,42 @@ class TestUnityCatalogSource:
            }
        )

    @pytest.fixture
    def config_with_azure_auth(self):
        """Create a config with Azure authentication."""
        return UnityCatalogSourceConfig.parse_obj(
            {
                "workspace_url": "https://test.databricks.com",
                "warehouse_id": "test_warehouse",
                "include_hive_metastore": False,
                "databricks_api_page_size": 150,
                "azure_auth": {
                    "client_id": "test-client-id-12345",
                    "tenant_id": "test-tenant-id-67890",
                    "client_secret": "test-client-secret",
                },
            }
        )

    @pytest.fixture
    def config_with_azure_auth_and_ml_models(self):
        """Create a config with Azure authentication and ML model settings."""
        return UnityCatalogSourceConfig.parse_obj(
            {
                "workspace_url": "https://test.databricks.com",
                "warehouse_id": "test_warehouse",
                "include_hive_metastore": False,
                "include_ml_model_aliases": True,
                "ml_model_max_results": 1000,
                "databricks_api_page_size": 200,
                "azure_auth": {
                    "client_id": "azure-client-id-789",
                    "tenant_id": "azure-tenant-id-123",
                    "client_secret": "azure-secret-456",
                },
            }
        )

    @patch("datahub.ingestion.source.unity.source.UnityCatalogApiProxy")
    @patch("datahub.ingestion.source.unity.source.HiveMetastoreProxy")
    def test_source_constructor_passes_default_page_size_to_proxy(
@@ -62,13 +122,14 @@ class TestUnityCatalogSource:
        # Verify proxy was created with correct parameters including page size
        mock_unity_proxy.assert_called_once_with(
            minimal_config.workspace_url,
            minimal_config.warehouse_id,
            report=source.report,
            hive_metastore_proxy=source.hive_metastore_proxy,
            lineage_data_source=minimal_config.lineage_data_source,
            usage_data_source=minimal_config.usage_data_source,
            databricks_api_page_size=0,  # Default value
            personal_access_token=minimal_config.token,
            azure_auth=None,
        )

    @patch("datahub.ingestion.source.unity.source.UnityCatalogApiProxy")
@@ -83,13 +144,14 @@ class TestUnityCatalogSource:
        # Verify proxy was created with correct parameters including custom page size
        mock_unity_proxy.assert_called_once_with(
            config_with_page_size.workspace_url,
            config_with_page_size.warehouse_id,
            report=source.report,
            hive_metastore_proxy=source.hive_metastore_proxy,
            lineage_data_source=config_with_page_size.lineage_data_source,
            usage_data_source=config_with_page_size.usage_data_source,
            databricks_api_page_size=75,  # Custom value
            personal_access_token=config_with_page_size.token,
            azure_auth=None,
        )

    @patch("datahub.ingestion.source.unity.source.UnityCatalogApiProxy")
@@ -126,13 +188,14 @@ class TestUnityCatalogSource:
        # Verify proxy was created with correct page size even when hive metastore is disabled
        mock_unity_proxy.assert_called_once_with(
            config.workspace_url,
            config.warehouse_id,
            report=source.report,
            hive_metastore_proxy=None,  # Should be None when disabled
            lineage_data_source=config.lineage_data_source,
            usage_data_source=config.usage_data_source,
            databricks_api_page_size=200,
            personal_access_token=config.token,
            azure_auth=None,
        )

    def test_test_connection_with_page_size_config(self):
@@ -225,6 +288,176 @@ class TestUnityCatalogSource:
        assert connection_test_config.ml_model_max_results == 750
        assert connection_test_config.databricks_api_page_size == 200

    @patch("datahub.ingestion.source.unity.source.UnityCatalogApiProxy")
    @patch("datahub.ingestion.source.unity.source.HiveMetastoreProxy")
    def test_source_constructor_with_azure_auth(
        self, mock_hive_proxy, mock_unity_proxy, config_with_azure_auth
    ):
        """Test that UnityCatalogSource passes the Azure auth config to the proxy."""
        ctx = PipelineContext(run_id="test_run")
        source = UnityCatalogSource.create(config_with_azure_auth, ctx)

        # Verify proxy was created with Azure auth config
        mock_unity_proxy.assert_called_once_with(
            config_with_azure_auth.workspace_url,
            config_with_azure_auth.warehouse_id,
            report=source.report,
            hive_metastore_proxy=source.hive_metastore_proxy,
            lineage_data_source=config_with_azure_auth.lineage_data_source,
            usage_data_source=config_with_azure_auth.usage_data_source,
            databricks_api_page_size=150,
            personal_access_token=None,  # Should be None when using Azure auth
            azure_auth=config_with_azure_auth.azure_auth,
        )

    @patch("datahub.ingestion.source.unity.source.UnityCatalogApiProxy")
    @patch("datahub.ingestion.source.unity.source.HiveMetastoreProxy")
    def test_source_constructor_azure_auth_with_ml_models(
        self, mock_hive_proxy, mock_unity_proxy, config_with_azure_auth_and_ml_models
    ):
        """Test that UnityCatalogSource with Azure auth and ML model settings works correctly."""
        ctx = PipelineContext(run_id="test_run")
        source = UnityCatalogSource.create(config_with_azure_auth_and_ml_models, ctx)

        # Verify proxy was created with correct Azure auth and ML model configs
        mock_unity_proxy.assert_called_once_with(
            config_with_azure_auth_and_ml_models.workspace_url,
            config_with_azure_auth_and_ml_models.warehouse_id,
            report=source.report,
            hive_metastore_proxy=source.hive_metastore_proxy,
            lineage_data_source=config_with_azure_auth_and_ml_models.lineage_data_source,
            usage_data_source=config_with_azure_auth_and_ml_models.usage_data_source,
            databricks_api_page_size=200,
            personal_access_token=None,
            azure_auth=config_with_azure_auth_and_ml_models.azure_auth,
        )
        # Verify ML model settings are properly configured
        assert source.config.include_ml_model_aliases is True
        assert source.config.ml_model_max_results == 1000

    def test_azure_auth_config_validation(self):
        """Test that the Azure auth config validates required fields."""
        # Test a valid Azure auth config
        valid_config_dict = {
            "workspace_url": "https://test.databricks.com",
            "warehouse_id": "test_warehouse",
            "azure_auth": {
                "client_id": "test-client-id",
                "tenant_id": "test-tenant-id",
                "client_secret": "test-secret",
            },
        }
        config = UnityCatalogSourceConfig.parse_obj(valid_config_dict)
        assert config.azure_auth is not None
        assert config.azure_auth.client_id == "test-client-id"
        assert config.azure_auth.tenant_id == "test-tenant-id"
        assert config.azure_auth.client_secret.get_secret_value() == "test-secret"
        # A personal access token is not required when Azure auth is provided
        assert config.token is None

    def test_test_connection_with_azure_auth(self):
        """Test that test_connection properly handles Azure authentication."""
        config_dict = {
            "workspace_url": "https://test.databricks.com",
            "warehouse_id": "test_warehouse",
            "databricks_api_page_size": 100,
            "azure_auth": {
                "client_id": "test-client-id",
                "tenant_id": "test-tenant-id",
                "client_secret": "test-secret",
            },
        }
        with patch(
            "datahub.ingestion.source.unity.source.UnityCatalogConnectionTest"
        ) as mock_connection_test:
            mock_connection_test.return_value.get_connection_test.return_value = (
                "azure_test_report"
            )
            result = UnityCatalogSource.test_connection(config_dict)

            # Verify connection test was created with the Azure auth config
            assert result == "azure_test_report"
            mock_connection_test.assert_called_once()
            # Get the config that was passed to UnityCatalogConnectionTest
            connection_test_config = mock_connection_test.call_args[0][0]
            assert connection_test_config.azure_auth is not None
            assert connection_test_config.azure_auth.client_id == "test-client-id"
            assert connection_test_config.azure_auth.tenant_id == "test-tenant-id"
            assert (
                connection_test_config.azure_auth.client_secret.get_secret_value()
                == "test-secret"
            )
            assert connection_test_config.databricks_api_page_size == 100
            assert (
                connection_test_config.token is None
            )  # Should be None with Azure auth

    def test_source_creation_fails_without_authentication(self):
        """Test that UnityCatalogSource creation fails when neither token nor azure_auth is provided."""
        # With neither token nor azure_auth provided, config parsing should fail
        with pytest.raises(ValueError) as exc_info:
            UnityCatalogSourceConfig.parse_obj(
                {
                    "workspace_url": "https://test.databricks.com",
                    "warehouse_id": "test_warehouse",
                    "include_hive_metastore": False,
                    "databricks_api_page_size": 100,
                    # Neither token nor azure_auth provided
                }
            )
        # Should mention that an authentication method is required
        assert (
            "Either 'azure_auth' or 'token' (personal access token) must be provided"
            in str(exc_info.value)
        )

    def test_test_connection_fails_without_authentication(self):
        """Test that test_connection fails when neither token nor azure_auth is provided."""
        with pytest.raises(ValueError) as exc_info:
            UnityCatalogSourceConfig.parse_obj(
                {
                    "workspace_url": "https://test.databricks.com",
                    "warehouse_id": "test_warehouse",
                    "databricks_api_page_size": 100,
                    # Neither token nor azure_auth provided
                }
            )
        # The error should come from our validator
        assert (
            "Either 'azure_auth' or 'token' (personal access token) must be provided"
            in str(exc_info.value)
        )

    def test_source_creation_fails_with_both_authentication_methods(self):
        """Test that UnityCatalogSource creation fails when both token and azure_auth are provided."""
        # With both token and azure_auth provided, config parsing should fail
        with pytest.raises(ValueError) as exc_info:
            UnityCatalogSourceConfig.parse_obj(
                {
                    "workspace_url": "https://test.databricks.com",
                    "warehouse_id": "test_warehouse",
                    "include_hive_metastore": False,
                    "databricks_api_page_size": 100,
                    "token": "test_token",  # Both provided
                    "azure_auth": {
                        "client_id": "test-client-id",
                        "tenant_id": "test-tenant-id",
                        "client_secret": "test-secret",
                    },
                }
            )
        # Should mention that only one authentication method is allowed
        assert "Cannot specify both 'token' and 'azure_auth'" in str(exc_info.value)

    @patch("datahub.ingestion.source.unity.source.UnityCatalogApiProxy")
    @patch("datahub.ingestion.source.unity.source.HiveMetastoreProxy")
    def test_process_ml_model_generates_workunits(