feat(ingest/sigma): add reporting on filtered workspaces (#12998)

This commit is contained in:
Harshal Sheth 2025-03-28 16:58:34 -07:00 committed by GitHub
parent a37a1a502e
commit 3e4d14734b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 13 additions and 10 deletions

View File

@ -9,6 +9,7 @@ from datahub.configuration.source_common import (
EnvConfigMixin,
PlatformInstanceConfigMixin,
)
from datahub.ingestion.api.report import EntityFilterReport
from datahub.ingestion.source.state.stale_entity_removal_handler import (
StaleEntityRemovalSourceReport,
StatefulStaleMetadataRemovalConfig,
@ -54,16 +55,14 @@ class Constant:
@dataclass
class SigmaSourceReport(StaleEntityRemovalSourceReport):
number_of_workspaces: int = 0
workspaces: EntityFilterReport = EntityFilterReport.field(type="workspace")
number_of_workspaces: Optional[int] = None
non_accessible_workspaces_count: int = 0
shared_entities_count: int = 0
number_of_datasets: int = 0
number_of_workbooks: int = 0
number_of_files_metadata: Dict[str, int] = field(default_factory=dict)
def report_number_of_workspaces(self, number_of_workspaces: int) -> None:
self.number_of_workspaces = number_of_workspaces
class PlatformDetail(PlatformInstanceConfigMixin, EnvConfigMixin):
data_source_platform: str = pydantic.Field(

View File

@ -162,14 +162,17 @@ class SigmaSource(StatefulIngestionSourceBase, TestableSource):
def _get_allowed_workspaces(self) -> List[Workspace]:
all_workspaces = self.sigma_api.workspaces.values()
allowed_workspaces = [
workspace
for workspace in all_workspaces
if self.config.workspace_pattern.allowed(workspace.name)
]
logger.info(f"Number of workspaces = {len(all_workspaces)}")
self.reporter.report_number_of_workspaces(len(all_workspaces))
self.reporter.number_of_workspaces = len(all_workspaces)
allowed_workspaces = []
for workspace in all_workspaces:
if self.config.workspace_pattern.allowed(workspace.name):
allowed_workspaces.append(workspace)
else:
self.reporter.workspaces.dropped(workspace.workspaceId)
logger.info(f"Number of allowed workspaces = {len(allowed_workspaces)}")
return allowed_workspaces
def _gen_workspace_workunit(
@ -658,6 +661,7 @@ class SigmaSource(StatefulIngestionSourceBase, TestableSource):
yield from self._gen_workbook_workunit(workbook)
for workspace in self._get_allowed_workspaces():
self.reporter.workspaces.processed(workspace.workspaceId)
yield from self._gen_workspace_workunit(workspace)
yield from self._gen_sigma_dataset_upstream_lineage_workunit()