mirror of
https://github.com/datahub-project/datahub.git
synced 2025-11-03 12:16:10 +00:00
feat(ingestion/tableau): parameter to have entity owners as email address of owner (#14724)
This commit is contained in:
parent
60dcd8ce21
commit
5be17c6444
@ -524,6 +524,10 @@ class TableauConfig(
|
||||
default=False,
|
||||
description="Ingest Owner from source. This will override Owner info entered from UI",
|
||||
)
|
||||
use_email_as_username: bool = Field(
|
||||
default=False,
|
||||
description="Use email address instead of username for entity owners. Requires ingest_owner to be True.",
|
||||
)
|
||||
ingest_tables_external: bool = Field(
|
||||
default=False,
|
||||
description="Ingest details for tables external to (not embedded in) tableau as entities.",
|
||||
@ -678,6 +682,14 @@ class TableauConfig(
|
||||
raise ValueError(
|
||||
"tags_for_hidden_assets is only allowed with ingest_tags enabled. Be aware that this will overwrite tags entered from the UI."
|
||||
)
|
||||
|
||||
use_email_as_username = values.get("use_email_as_username")
|
||||
ingest_owner = values.get("ingest_owner")
|
||||
if use_email_as_username and not ingest_owner:
|
||||
raise ValueError(
|
||||
"use_email_as_username requires ingest_owner to be enabled."
|
||||
)
|
||||
|
||||
return values
|
||||
|
||||
|
||||
@ -839,6 +851,9 @@ class TableauSourceReport(
|
||||
default_factory=(lambda: defaultdict(int))
|
||||
)
|
||||
|
||||
# Owner extraction statistics
|
||||
num_email_fallback_to_username: int = 0
|
||||
|
||||
|
||||
def report_user_role(report: TableauSourceReport, server: Server) -> None:
|
||||
title: str = "Insufficient Permissions"
|
||||
@ -2716,13 +2731,12 @@ class TableauSiteSource:
|
||||
dataset_snapshot.aspects.append(browse_paths)
|
||||
|
||||
# Ownership
|
||||
owner = (
|
||||
self._get_ownership(datasource_info[c.OWNER][c.USERNAME])
|
||||
if datasource_info
|
||||
and datasource_info.get(c.OWNER)
|
||||
and datasource_info[c.OWNER].get(c.USERNAME)
|
||||
owner_identifier = (
|
||||
self._get_owner_identifier(datasource_info[c.OWNER])
|
||||
if datasource_info and datasource_info.get(c.OWNER)
|
||||
else None
|
||||
)
|
||||
owner = self._get_ownership(owner_identifier) if owner_identifier else None
|
||||
if owner is not None:
|
||||
dataset_snapshot.aspects.append(owner)
|
||||
|
||||
@ -3127,7 +3141,7 @@ class TableauSiteSource:
|
||||
|
||||
creator: Optional[str] = None
|
||||
if workbook is not None and workbook.get(c.OWNER) is not None:
|
||||
creator = workbook[c.OWNER].get(c.USERNAME)
|
||||
creator = self._get_owner_identifier(workbook[c.OWNER])
|
||||
created_at = sheet.get(c.CREATED_AT, datetime.now())
|
||||
updated_at = sheet.get(c.UPDATED_AT, datetime.now())
|
||||
last_modified = self.get_last_modified(creator, created_at, updated_at)
|
||||
@ -3276,7 +3290,7 @@ class TableauSiteSource:
|
||||
|
||||
def emit_workbook_as_container(self, workbook: Dict) -> Iterable[MetadataWorkUnit]:
|
||||
workbook_container_key = self.gen_workbook_key(workbook[c.ID])
|
||||
creator = workbook.get(c.OWNER, {}).get(c.USERNAME)
|
||||
creator = self._get_owner_identifier(workbook.get(c.OWNER, {}))
|
||||
|
||||
owner_urn = (
|
||||
builder.make_user_urn(creator)
|
||||
@ -3458,7 +3472,7 @@ class TableauSiteSource:
|
||||
|
||||
creator: Optional[str] = None
|
||||
if workbook is not None and workbook.get(c.OWNER) is not None:
|
||||
creator = workbook[c.OWNER].get(c.USERNAME)
|
||||
creator = self._get_owner_identifier(workbook[c.OWNER])
|
||||
created_at = dashboard.get(c.CREATED_AT, datetime.now())
|
||||
updated_at = dashboard.get(c.UPDATED_AT, datetime.now())
|
||||
last_modified = self.get_last_modified(creator, created_at, updated_at)
|
||||
@ -3605,6 +3619,20 @@ class TableauSiteSource:
|
||||
)
|
||||
return last_modified
|
||||
|
||||
def _get_owner_identifier(self, owner_dict: dict) -> Optional[str]:
    """Pick the identifier (email or username) used for an entity's owner.

    Args:
        owner_dict: The ``owner`` object of a Tableau record. May be empty
            or ``None`` when the record carries no owner.

    Returns:
        The owner's email when ``config.use_email_as_username`` is enabled
        and an email is present; otherwise the owner's username. ``None``
        when there is no owner or no identifier at all.
    """
    if not owner_dict:
        return None

    username = owner_dict.get(c.USERNAME)
    if not self.config.use_email_as_username:
        return username

    email = owner_dict.get(c.EMAIL)
    if email:
        return email

    # Email was requested but is not available. Only count this as a
    # fallback when a username actually exists to fall back to; otherwise
    # the statistic would also include owners that have no identifier.
    if username:
        self.report.num_email_fallback_to_username += 1
    return username
|
||||
|
||||
@lru_cache(maxsize=None)
|
||||
def _get_ownership(self, user: str) -> Optional[OwnershipClass]:
|
||||
if self.config.ingest_owner and user:
|
||||
@ -3828,3 +3856,15 @@ class TableauSiteSource:
|
||||
self.report.emit_upstream_tables_timer[self.site_content_url] = (
|
||||
timer.elapsed_seconds(digits=2)
|
||||
)
|
||||
|
||||
# Log owner extraction statistics if there were fallbacks
|
||||
if (
|
||||
self.config.use_email_as_username
|
||||
and self.config.ingest_owner
|
||||
and self.report.num_email_fallback_to_username > 0
|
||||
):
|
||||
logger.info(
|
||||
f"Owner extraction summary for site '{self.site_content_url}': "
|
||||
f"{self.report.num_email_fallback_to_username} entities fell back from email to username "
|
||||
f"(email was not available)"
|
||||
)
|
||||
|
||||
@ -65,6 +65,7 @@ workbook_graphql_query = """
|
||||
projectName
|
||||
owner {
|
||||
username
|
||||
email
|
||||
}
|
||||
description
|
||||
uri
|
||||
@ -107,6 +108,7 @@ sheet_graphql_query = """
|
||||
luid
|
||||
owner {
|
||||
username
|
||||
email
|
||||
}
|
||||
}
|
||||
datasourceFields {
|
||||
@ -185,6 +187,7 @@ dashboard_graphql_query = """
|
||||
luid
|
||||
owner {
|
||||
username
|
||||
email
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -268,6 +271,7 @@ embedded_datasource_graphql_query = """
|
||||
luid
|
||||
owner {
|
||||
username
|
||||
email
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -424,6 +428,7 @@ published_datasource_graphql_query = """
|
||||
}
|
||||
owner {
|
||||
username
|
||||
email
|
||||
}
|
||||
description
|
||||
uri
|
||||
|
||||
@ -59,6 +59,7 @@ LUID = "luid"
|
||||
EMBEDDED_DATA_SOURCE = "EmbeddedDatasource"
|
||||
OWNER = "owner"
|
||||
USERNAME = "username"
|
||||
EMAIL = "email"
|
||||
HAS_EXTRACTS = "hasExtracts"
|
||||
EXTRACT_LAST_REFRESH_TIME = "extractLastRefreshTime"
|
||||
EXTRACT_LAST_INCREMENTAL_UPDATE_TIME = "extractLastIncrementalUpdateTime"
|
||||
|
||||
@ -1,4 +1,5 @@
|
||||
from dataclasses import dataclass
|
||||
from typing import Optional
|
||||
|
||||
from tableauserverclient import Server, UserItem
|
||||
|
||||
@ -10,6 +11,7 @@ class UserInfo:
|
||||
user_name: str
|
||||
site_role: str
|
||||
site_id: str
|
||||
email: Optional[str] = None
|
||||
|
||||
def has_site_administrator_explorer_privileges(self):
|
||||
return self.site_role in [
|
||||
@ -34,4 +36,5 @@ class UserInfo:
|
||||
user_name=user.name,
|
||||
site_role=user.site_role,
|
||||
site_id=server.site_id,
|
||||
email=user.email,
|
||||
)
|
||||
|
||||
@ -182,3 +182,34 @@ def test_extract_project_hierarchy(extract_project_hierarchy, allowed_projects):
|
||||
assert allowed_projects == [
|
||||
project.name for project in site_source.tableau_project_registry.values()
|
||||
]
|
||||
|
||||
|
||||
def test_use_email_as_username_requires_ingest_owner():
    """Config validation rejects email owners when owner ingestion is off."""
    overrides = {"ingest_owner": False, "use_email_as_username": True}
    raw_config = {**default_config, **overrides}
    expected_error = r".*use_email_as_username requires ingest_owner to be enabled.*"

    with pytest.raises(ValidationError, match=expected_error):
        TableauConfig.parse_obj(raw_config)
|
||||
|
||||
|
||||
def test_use_email_as_username_valid_config():
    """Both flags enabled together parse cleanly and keep their values."""
    raw_config = {
        **default_config,
        "ingest_owner": True,
        "use_email_as_username": True,
    }

    parsed = TableauConfig.parse_obj(raw_config)

    assert parsed.ingest_owner is True
    assert parsed.use_email_as_username is True
|
||||
|
||||
|
||||
def test_use_email_as_username_default_false():
    """An unspecified use_email_as_username comes out as False."""
    parsed = TableauConfig.parse_obj(default_config.copy())
    assert parsed.use_email_as_username is False
|
||||
|
||||
@ -711,3 +711,86 @@ class TestTableauPageSizeConfig:
|
||||
config = TableauPageSizeConfig(database_table_page_size=any_page_size)
|
||||
assert config.page_size == DEFAULT_PAGE_SIZE
|
||||
assert config.effective_database_table_page_size == any_page_size
|
||||
|
||||
|
||||
def test_get_owner_identifier_username():
    """With email owners disabled, the owner's username is the identifier."""
    raw_config = {**default_config, "use_email_as_username": False}

    source = TableauSiteSource(
        config=TableauConfig.parse_obj(raw_config),
        ctx=PipelineContext(run_id="test", pipeline_name="test"),
        site=SiteIdContentUrl(site_id="site1", site_content_url="site1"),
        report=TableauSourceReport(),
        server=mock.MagicMock(spec=Server),
        platform="tableau",
    )

    owner = {"username": "testuser", "email": "test@example.com"}
    assert source._get_owner_identifier(owner) == "testuser"
|
||||
|
||||
|
||||
def test_get_owner_identifier_email():
    """The owner's email is returned when email owners are enabled.

    ``ingest_owner`` is set explicitly because config validation rejects
    ``use_email_as_username`` without it; the test should not rely on what
    ``default_config`` happens to contain.
    """
    config_dict = default_config.copy()
    config_dict["ingest_owner"] = True
    config_dict["use_email_as_username"] = True
    config = TableauConfig.parse_obj(config_dict)

    context = PipelineContext(run_id="test", pipeline_name="test")
    site_source = TableauSiteSource(
        config=config,
        ctx=context,
        site=SiteIdContentUrl(site_id="site1", site_content_url="site1"),
        report=TableauSourceReport(),
        server=mock.MagicMock(spec=Server),
        platform="tableau",
    )

    owner_dict = {"username": "testuser", "email": "test@example.com"}
    result = site_source._get_owner_identifier(owner_dict)

    assert result == "test@example.com"
    # An available email must not be recorded as a fallback.
    assert site_source.report.num_email_fallback_to_username == 0
|
||||
|
||||
|
||||
def test_get_owner_identifier_email_fallback():
    """A missing email falls back to the username and is counted in the report.

    ``ingest_owner`` is set explicitly because config validation rejects
    ``use_email_as_username`` without it; the test should not rely on what
    ``default_config`` happens to contain.
    """
    config_dict = default_config.copy()
    config_dict["ingest_owner"] = True
    config_dict["use_email_as_username"] = True
    config = TableauConfig.parse_obj(config_dict)

    context = PipelineContext(run_id="test", pipeline_name="test")
    site_source = TableauSiteSource(
        config=config,
        ctx=context,
        site=SiteIdContentUrl(site_id="site1", site_content_url="site1"),
        report=TableauSourceReport(),
        server=mock.MagicMock(spec=Server),
        platform="tableau",
    )

    owner_dict = {"username": "testuser"}  # No email
    result = site_source._get_owner_identifier(owner_dict)

    assert result == "testuser"
    # The fallback statistic is the observable side effect of this path.
    assert site_source.report.num_email_fallback_to_username == 1
|
||||
|
||||
|
||||
def test_get_owner_identifier_empty_dict():
    """An empty owner dict yields no identifier and is not counted as a fallback.

    ``ingest_owner`` is set explicitly because config validation rejects
    ``use_email_as_username`` without it; the test should not rely on what
    ``default_config`` happens to contain.
    """
    config_dict = default_config.copy()
    config_dict["ingest_owner"] = True
    config_dict["use_email_as_username"] = True
    config = TableauConfig.parse_obj(config_dict)

    context = PipelineContext(run_id="test", pipeline_name="test")
    site_source = TableauSiteSource(
        config=config,
        ctx=context,
        site=SiteIdContentUrl(site_id="site1", site_content_url="site1"),
        report=TableauSourceReport(),
        server=mock.MagicMock(spec=Server),
        platform="tableau",
    )

    result = site_source._get_owner_identifier({})

    assert result is None
    # No owner at all is not an email-to-username fallback.
    assert site_source.report.num_email_fallback_to_username == 0
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user