refactor(ingest/sigma): make some error cases more clear (#13110)

Co-authored-by: Tamas Nemeth <treff7es@gmail.com>
Co-authored-by: Sergio Gómez Villamor <sgomezvillamor@gmail.com>
Harshal Sheth authored on 2025-04-09 12:03:17 -07:00, committed by GitHub
parent 7b6ab3ba15
commit 9f0c0aa3dd
4 changed files with 235 additions and 102 deletions


@@ -11,64 +11,72 @@ This source extracts the following:
1. Refer to the [doc](https://help.sigmacomputing.com/docs/generate-api-client-credentials) to generate API client credentials.
2. Provide the generated Client ID and Secret in the recipe.
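For reference, a minimal recipe sketch (the `api_url` default matches `DEFAULT_API_URL` in the source's config below; the `client_id`/`client_secret` field names are assumed from the steps above, so check the source's config reference before use):
```yml
source:
  type: sigma
  config:
    # Default API endpoint; override it for other Sigma deployments.
    api_url: "https://aws-api.sigmacomputing.com/v2"
    # Assumed field names for the generated credentials.
    client_id: "${SIGMA_CLIENT_ID}"
    client_secret: "${SIGMA_CLIENT_SECRET}"
```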
We have observed issues with the Sigma API where certain endpoints do not return the expected results, even when the user is an admin. In those cases, a workaround is to manually add the user associated with the Client ID/Secret to each workspace with missing metadata.

Empty workspaces are listed in the ingestion report (visible in the logs) under the key `empty_workspaces`.
## Concept mapping
| Sigma | DataHub | Notes |
| ----------- | ------------------------------------------------------------- | --------------------------- |
| `Workspace` | [Container](../../metamodel/entities/container.md) | SubType `"Sigma Workspace"` |
| `Workbook` | [Dashboard](../../metamodel/entities/dashboard.md) | SubType `"Sigma Workbook"` |
| `Page` | [Dashboard](../../metamodel/entities/dashboard.md) | |
| `Element` | [Chart](../../metamodel/entities/chart.md) | |
| `Dataset` | [Dataset](../../metamodel/entities/dataset.md) | SubType `"Sigma Dataset"` |
| `User` | [User (a.k.a CorpUser)](../../metamodel/entities/corpuser.md) | Optionally Extracted |
## Advanced Configurations
### Chart source platform mapping
If you want to provide platform details (platform name, platform instance, and env) for all of a chart's external upstream data sources, you can use `chart_sources_platform_mapping` as shown below:
#### Example - For just one specific chart's external upstream data sources
```yml
chart_sources_platform_mapping:
  "workspace_name/workbook_name/chart_name_1":
    data_source_platform: snowflake
    platform_instance: new_instance
    env: PROD
  "workspace_name/folder_name/workbook_name/chart_name_2":
    data_source_platform: postgres
    platform_instance: cloud_instance
    env: DEV
```
#### Example - For all charts within one specific workbook
```yml
chart_sources_platform_mapping:
  "workspace_name/workbook_name_1":
    data_source_platform: snowflake
    platform_instance: new_instance
    env: PROD
  "workspace_name/folder_name/workbook_name_2":
    data_source_platform: snowflake
    platform_instance: new_instance
    env: PROD
```
#### Example - For all workbooks' charts within one specific workspace
```yml
chart_sources_platform_mapping:
  "workspace_name":
    data_source_platform: snowflake
    platform_instance: new_instance
    env: PROD
```
#### Example - All workbooks use the same connection
```yml
chart_sources_platform_mapping:
  "*":
    data_source_platform: snowflake
    platform_instance: new_instance
    env: PROD
```


@@ -1,8 +1,9 @@
import logging
from dataclasses import dataclass, field
from typing import Dict, List, Optional

import pydantic
from pydantic import BaseModel, Field

from datahub.configuration.common import AllowDenyPattern
from datahub.configuration.source_common import (
@@ -17,6 +18,7 @@ from datahub.ingestion.source.state.stale_entity_removal_handler import (
from datahub.ingestion.source.state.stateful_ingestion_base import (
    StatefulIngestionConfigBase,
)
from datahub.utilities.lossy_collections import LossyDict

logger = logging.getLogger(__name__)
@@ -53,15 +55,82 @@ class Constant:
    DEFAULT_API_URL = "https://aws-api.sigmacomputing.com/v2"


class WorkspaceCounts(BaseModel):
    workbooks_count: int = 0
    datasets_count: int = 0
    elements_count: int = 0
    pages_count: int = 0

    def is_empty(self) -> bool:
        return (
            self.workbooks_count == 0
            and self.datasets_count == 0
            and self.elements_count == 0
            and self.pages_count == 0
        )

    def as_obj(self) -> dict:
        return {
            "workbooks_count": self.workbooks_count,
            "datasets_count": self.datasets_count,
            "elements_count": self.elements_count,
            "pages_count": self.pages_count,
        }
class SigmaWorkspaceEntityFilterReport(EntityFilterReport):
    type: str = "workspace"

    workspace_counts: LossyDict[str, WorkspaceCounts] = Field(
        default_factory=LossyDict,
        description="Counts of workbooks, datasets, elements and pages in each workspace.",
    )

    def increment_workbooks_count(self, workspace_id: str) -> None:
        if workspace_id not in self.workspace_counts:
            self.workspace_counts[workspace_id] = WorkspaceCounts()
        self.workspace_counts[workspace_id].workbooks_count += 1

    def increment_datasets_count(self, workspace_id: str) -> None:
        if workspace_id not in self.workspace_counts:
            self.workspace_counts[workspace_id] = WorkspaceCounts()
        self.workspace_counts[workspace_id].datasets_count += 1

    def increment_elements_count(self, workspace_id: str) -> None:
        if workspace_id not in self.workspace_counts:
            self.workspace_counts[workspace_id] = WorkspaceCounts()
        self.workspace_counts[workspace_id].elements_count += 1

    def increment_pages_count(self, workspace_id: str) -> None:
        if workspace_id not in self.workspace_counts:
            self.workspace_counts[workspace_id] = WorkspaceCounts()
        self.workspace_counts[workspace_id].pages_count += 1

    def as_obj(self) -> dict:
        return {
            "filtered": self.dropped_entities.as_obj(),
            "processed": self.processed_entities.as_obj(),
            "workspace_counts": {
                key: item.as_obj() for key, item in self.workspace_counts.items()
            },
        }
@dataclass
class SigmaSourceReport(StaleEntityRemovalSourceReport):
    workspaces: SigmaWorkspaceEntityFilterReport = field(
        default_factory=SigmaWorkspaceEntityFilterReport
    )
    non_accessible_workspaces_count: int = 0
    datasets: EntityFilterReport = EntityFilterReport.field(type="dataset")
    datasets_without_workspace: int = 0
    workbooks: EntityFilterReport = EntityFilterReport.field(type="workbook")
    workbooks_without_workspace: int = 0
    number_of_files_metadata: Dict[str, int] = field(default_factory=dict)
    empty_workspaces: List[str] = field(default_factory=list)
class PlatformDetail(PlatformInstanceConfigMixin, EnvConfigMixin):
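The `increment_*` helpers above all repeat the same check-then-create step before counting. A minimal standalone sketch of that pattern, and of how `is_empty()` later flags workspaces that never received any entity (a plain `dict` stands in for `LossyDict`; all names here are illustrative):

```python
from pydantic import BaseModel


class Counts(BaseModel):
    workbooks: int = 0
    datasets: int = 0

    def is_empty(self) -> bool:
        return self.workbooks == 0 and self.datasets == 0


workspace_counts: dict[str, Counts] = {}

# setdefault collapses the "create if missing, then increment" step
# that each increment_* helper spells out explicitly.
workspace_counts.setdefault("ws-1", Counts()).workbooks += 1

print(workspace_counts["ws-1"].is_empty())                # False
print(workspace_counts.get("ws-2", Counts()).is_empty())  # True: never counted
```

The same `get(..., WorkspaceCounts())` lookup is what the source uses below to decide whether to warn about an empty workspace.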


@@ -35,6 +35,7 @@ from datahub.ingestion.source.sigma.config import (
    PlatformDetail,
    SigmaSourceConfig,
    SigmaSourceReport,
    WorkspaceCounts,
)
from datahub.ingestion.source.sigma.data_classes import (
    Element,
@@ -163,7 +164,6 @@ class SigmaSource(StatefulIngestionSourceBase, TestableSource):
    def _get_allowed_workspaces(self) -> List[Workspace]:
        all_workspaces = self.sigma_api.workspaces.values()
        logger.info(f"Number of workspaces = {len(all_workspaces)}")

        allowed_workspaces = []
        for workspace in all_workspaces:
@@ -285,6 +285,7 @@ class SigmaSource(StatefulIngestionSourceBase, TestableSource):
        yield self._gen_dataset_properties(dataset_urn, dataset)

        if dataset.workspaceId:
            self.reporter.workspaces.increment_datasets_count(dataset.workspaceId)
            yield from add_entity_to_container(
                container_key=self._gen_workspace_key(dataset.workspaceId),
                entity_type="dataset",
@@ -468,6 +469,8 @@ class SigmaSource(StatefulIngestionSourceBase, TestableSource):
            ).as_workunit()

            if workbook.workspaceId:
                self.reporter.workspaces.increment_elements_count(workbook.workspaceId)

            yield self._gen_entity_browsepath_aspect(
                entity_urn=chart_urn,
                parent_entity_urn=builder.make_container_urn(
@@ -525,6 +528,7 @@ class SigmaSource(StatefulIngestionSourceBase, TestableSource):
        all_input_fields: List[InputFieldClass] = []

        if workbook.workspaceId:
            self.reporter.workspaces.increment_pages_count(workbook.workspaceId)
            yield self._gen_entity_browsepath_aspect(
                entity_urn=dashboard_urn,
                parent_entity_urn=builder.make_container_urn(
@@ -614,6 +618,8 @@ class SigmaSource(StatefulIngestionSourceBase, TestableSource):
        paths = workbook.path.split("/")[1:]

        if workbook.workspaceId:
            self.reporter.workspaces.increment_workbooks_count(workbook.workspaceId)

        yield self._gen_entity_browsepath_aspect(
            entity_urn=dashboard_urn,
            parent_entity_urn=builder.make_container_urn(
@@ -667,6 +673,15 @@ class SigmaSource(StatefulIngestionSourceBase, TestableSource):
                f"{workspace.name} ({workspace.workspaceId})"
            )
            yield from self._gen_workspace_workunit(workspace)

            if self.reporter.workspaces.workspace_counts.get(
                workspace.workspaceId, WorkspaceCounts()
            ).is_empty():
                logger.warning(
                    f"Workspace {workspace.name} ({workspace.workspaceId}) is empty. If this is not expected, add the user associated with the Client ID/Secret to each workspace with missing metadata"
                )
                self.reporter.empty_workspaces.append(
                    f"{workspace.name} ({workspace.workspaceId})"
                )

        yield from self._gen_sigma_dataset_upstream_lineage_workunit()

    def get_report(self) -> SourceReport:


@@ -95,22 +95,22 @@ class SigmaAPI:
        return get_response

    def get_workspace(self, workspace_id: str) -> Optional[Workspace]:
        if workspace_id in self.workspaces:
            return self.workspaces[workspace_id]

        logger.debug(f"Fetching workspace metadata with id '{workspace_id}'")
        try:
            response = self._get_api_call(
                f"{self.config.api_url}/workspaces/{workspace_id}"
            )
            if response.status_code == 403:
                logger.debug(f"Workspace {workspace_id} not accessible.")
                self.report.non_accessible_workspaces_count += 1
                return None
            response.raise_for_status()
            workspace = Workspace.parse_obj(response.json())
            self.workspaces[workspace.workspaceId] = workspace
            return workspace
        except Exception as e:
            self._log_http_error(
                message=f"Unable to fetch workspace '{workspace_id}'. Exception: {e}"
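The refactor hoists the cache check above the `try` block, so repeat lookups return immediately without logging or another HTTP round trip, and a 403 is downgraded to a report counter instead of an exception. A standalone sketch of the same shape, using `requests` directly (function and cache names are illustrative, not the source's API):

```python
import logging
from typing import Dict, Optional

import requests

logger = logging.getLogger(__name__)
_workspace_cache: Dict[str, dict] = {}


def fetch_workspace(api_url: str, workspace_id: str) -> Optional[dict]:
    if workspace_id in _workspace_cache:  # cache hit: no log line, no HTTP call
        return _workspace_cache[workspace_id]

    logger.debug("Fetching workspace metadata with id '%s'", workspace_id)
    try:
        response = requests.get(f"{api_url}/workspaces/{workspace_id}")
        if response.status_code == 403:  # inaccessible, but not fatal
            logger.debug("Workspace %s not accessible.", workspace_id)
            return None
        response.raise_for_status()
        _workspace_cache[workspace_id] = response.json()
        return _workspace_cache[workspace_id]
    except Exception as e:
        logger.warning("Unable to fetch workspace '%s': %s", workspace_id, e)
        return None
```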
@@ -187,7 +187,9 @@ class SigmaAPI:
    @functools.lru_cache
    def _get_files_metadata(self, file_type: str) -> Dict[str, File]:
        logger.debug(f"Fetching file metadata with type {file_type}.")
        file_url = url = (
            f"{self.config.api_url}/files?permissionFilter=view&typeFilters={file_type}"
        )
        try:
            files_metadata: Dict[str, File] = {}
            while True:
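Because `_get_files_metadata` is decorated with `functools.lru_cache`, each distinct `file_type` triggers one API call per `SigmaAPI` instance and later calls are served from the cache. A small self-contained illustration of that behavior (bare `@functools.lru_cache` requires Python 3.8+; the class here is a stand-in):

```python
import functools


class FilesClient:
    @functools.lru_cache
    def get_files(self, file_type: str) -> list:
        # Executes once per (instance, file_type) pair.
        print(f"fetching files of type {file_type}")
        return []


client = FilesClient()
client.get_files("dataset")  # prints: fetching files of type dataset
client.get_files("dataset")  # served from the cache; nothing printed
```

A known caveat of `lru_cache` on methods is that the cache holds a reference to `self`; that is harmless here since the API client lives only for the duration of ingestion.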
@@ -225,31 +227,50 @@ class SigmaAPI:
                for dataset_dict in response_dict[Constant.ENTRIES]:
                    dataset = SigmaDataset.parse_obj(dataset_dict)

                    if dataset.datasetId not in dataset_files_metadata:
                        self.report.datasets.dropped(
                            f"{dataset.name} ({dataset.datasetId}) (missing file metadata)"
                        )
                        continue

                    dataset.workspaceId = dataset_files_metadata[
                        dataset.datasetId
                    ].workspaceId
                    dataset.path = dataset_files_metadata[dataset.datasetId].path
                    dataset.badge = dataset_files_metadata[dataset.datasetId].badge

                    workspace = None
                    if dataset.workspaceId:
                        workspace = self.get_workspace(dataset.workspaceId)

                    if workspace:
                        if self.config.workspace_pattern.allowed(workspace.name):
                            self.report.datasets.processed(
                                f"{dataset.name} ({dataset.datasetId}) in {workspace.name}"
                            )
                            datasets.append(dataset)
                        else:
                            self.report.datasets.dropped(
                                f"{dataset.name} ({dataset.datasetId}) in {workspace.name}"
                            )
                    elif self.config.ingest_shared_entities:
                        # If no workspace for dataset we can consider it as shared entity
                        self.report.datasets_without_workspace += 1
                        self.report.datasets.processed(
                            f"{dataset.name} ({dataset.datasetId}) in workspace id {dataset.workspaceId or 'unknown'}"
                        )
                        datasets.append(dataset)
                    else:
                        self.report.datasets.dropped(
                            f"{dataset.name} ({dataset.datasetId}) in workspace id {dataset.workspaceId or 'unknown'}"
                        )

                if response_dict[Constant.NEXTPAGE]:
                    url = f"{dataset_url}?page={response_dict[Constant.NEXTPAGE]}"
                else:
                    break

            return datasets
        except Exception as e:
            self._log_http_error(
@@ -381,34 +402,54 @@ class SigmaAPI:
                for workbook_dict in response_dict[Constant.ENTRIES]:
                    workbook = Workbook.parse_obj(workbook_dict)

                    if workbook.workbookId not in workbook_files_metadata:
                        # Due to a bug in the Sigma API, it seems like the /files endpoint does not
                        # return file metadata when the user has access via admin permissions. In
                        # those cases, the user associated with the token needs to be manually added
                        # to the workspace.
                        self.report.workbooks.dropped(
                            f"{workbook.name} ({workbook.workbookId}) (missing file metadata; path: {workbook.path}; likely need to manually add user to workspace)"
                        )
                        continue

                    workbook.workspaceId = workbook_files_metadata[
                        workbook.workbookId
                    ].workspaceId
                    workbook.badge = workbook_files_metadata[workbook.workbookId].badge

                    workspace = None
                    if workbook.workspaceId:
                        workspace = self.get_workspace(workbook.workspaceId)

                    if workspace:
                        if self.config.workspace_pattern.allowed(workspace.name):
                            self.report.workbooks.processed(
                                f"{workbook.name} ({workbook.workbookId}) in {workspace.name}"
                            )
                            workbook.pages = self.get_workbook_pages(workbook)
                            workbooks.append(workbook)
                        else:
                            self.report.workbooks.dropped(
                                f"{workbook.name} ({workbook.workbookId}) in {workspace.name}"
                            )
                    elif self.config.ingest_shared_entities:
                        # If no workspace for workbook we can consider it as shared entity
                        self.report.workbooks_without_workspace += 1
                        self.report.workbooks.processed(
                            f"{workbook.name} ({workbook.workbookId}) in workspace id {workbook.workspaceId or 'unknown'}"
                        )
                        workbook.pages = self.get_workbook_pages(workbook)
                        workbooks.append(workbook)
                    else:
                        self.report.workbooks.dropped(
                            f"{workbook.name} ({workbook.workbookId}) in workspace id {workbook.workspaceId or 'unknown'}"
                        )

                if response_dict[Constant.NEXTPAGE]:
                    url = f"{workbook_url}?page={response_dict[Constant.NEXTPAGE]}"
                else:
                    break

            return workbooks
        except Exception as e:
            self._log_http_error(
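Both the dataset and workbook loops above share the same cursor-pagination shape around `Constant.NEXTPAGE`. A condensed standalone sketch of that loop, using `requests` directly (the `entries`/`nextPage` key names are assumptions inferred from the constant names):

```python
from typing import Iterator

import requests


def iter_entries(base_url: str) -> Iterator[dict]:
    """Yield every entry across pages, following the nextPage cursor."""
    url = base_url
    while True:
        response = requests.get(url)
        response.raise_for_status()
        data = response.json()
        yield from data["entries"]  # assumed Constant.ENTRIES == "entries"
        if data.get("nextPage"):    # assumed Constant.NEXTPAGE == "nextPage"
            url = f"{base_url}?page={data['nextPage']}"
        else:
            break
```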