From 922c3d2713daca2eaabb5704ce5e6b4ef763d188 Mon Sep 17 00:00:00 2001 From: harshsoni2024 <64592571+harshsoni2024@users.noreply.github.com> Date: Wed, 18 Dec 2024 16:39:06 +0530 Subject: [PATCH] powerbi refactor, avoid prepare bulk data (#19123) --- .../source/dashboard/powerbi/client.py | 26 ++- .../source/dashboard/powerbi/metadata.py | 211 +++++++----------- 2 files changed, 97 insertions(+), 140 deletions(-) diff --git a/ingestion/src/metadata/ingestion/source/dashboard/powerbi/client.py b/ingestion/src/metadata/ingestion/source/dashboard/powerbi/client.py index d1acc7901ac..68a7aad0047 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/powerbi/client.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/powerbi/client.py @@ -127,7 +127,13 @@ class PowerBiApiClient: List[PowerBIDashboard] """ try: - response_data = self.client.get(f"/myorg/groups/{group_id}/dashboards") + admin = "admin/" if self.config.useAdminApis else "" + response_data = self.client.get( + f"/myorg/{admin}groups/{group_id}/dashboards" + ) + if not response_data: + logger.debug(f"No dashboards found for workspace_id: {group_id}") + return None response = DashboardsResponse(**response_data) return response.value except Exception as exc: # pylint: disable=broad-except @@ -142,7 +148,11 @@ class PowerBiApiClient: List[PowerBIReport] """ try: - response_data = self.client.get(f"/myorg/groups/{group_id}/reports") + admin = "admin/" if self.config.useAdminApis else "" + response_data = self.client.get(f"/myorg/{admin}groups/{group_id}/reports") + if not response_data: + logger.debug(f"No reports found for workspace_id: {group_id}") + return None response = ReportsResponse(**response_data) return response.value except Exception as exc: # pylint: disable=broad-except @@ -157,7 +167,11 @@ class PowerBiApiClient: List[Dataset] """ try: - response_data = self.client.get(f"/myorg/groups/{group_id}/datasets") + admin = "admin/" if self.config.useAdminApis else "" + response_data = self.client.get(f"/myorg/{admin}groups/{group_id}/datasets") + if not response_data: + logger.debug(f"No datasets found for workspace_id: {group_id}") + return None response = DatasetResponse(**response_data) return response.value except Exception as exc: # pylint: disable=broad-except @@ -174,9 +188,13 @@ class PowerBiApiClient: List[Tile] """ try: + admin = "admin/" if self.config.useAdminApis else "" response_data = self.client.get( - f"/myorg/groups/{group_id}/dashboards/{dashboard_id}/tiles" + f"/myorg/{admin}dashboards/{dashboard_id}/tiles" ) + if not response_data: + logger.debug(f"No dashboard tiles found for workspace_id: {group_id}") + return None response = TilesResponse(**response_data) return response.value except Exception as exc: # pylint: disable=broad-except diff --git a/ingestion/src/metadata/ingestion/source/dashboard/powerbi/metadata.py b/ingestion/src/metadata/ingestion/source/dashboard/powerbi/metadata.py index 685e699c281..ea66b1b279f 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/powerbi/metadata.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/powerbi/metadata.py @@ -92,12 +92,6 @@ class PowerbiSource(DashboardServiceSource): self.datamodel_file_mappings = [] def prepare(self): - if self.service_connection.useAdminApis: - groups = self.get_admin_workspace_data() - else: - groups = self.get_org_workspace_data() - if groups: - self.workspace_data = self.get_filtered_workspaces(groups) return super().prepare() def close(self): @@ -123,91 +117,6 @@ class PowerbiSource(DashboardServiceSource): filtered_groups.append(group) return filtered_groups - def get_org_workspace_data(self) -> Optional[List[Group]]: - """ - fetch all the group workspace ids - """ - groups = self.client.api_client.fetch_all_workspaces() - for group in groups: - # add the dashboards to the groups - group.dashboards.extend( - self.client.api_client.fetch_all_org_dashboards(group_id=group.id) or [] - ) - for dashboard in group.dashboards: - # add the tiles to the dashboards - dashboard.tiles.extend( - self.client.api_client.fetch_all_org_tiles( - group_id=group.id, dashboard_id=dashboard.id - ) - or [] - ) - - # add the reports to the groups - group.reports.extend( - self.client.api_client.fetch_all_org_reports(group_id=group.id) or [] - ) - - # add the datasets to the groups - group.datasets.extend( - self.client.api_client.fetch_all_org_datasets(group_id=group.id) or [] - ) - for dataset in group.datasets: - # add the tables to the datasets - dataset.tables.extend( - self.client.api_client.fetch_dataset_tables( - group_id=group.id, dataset_id=dataset.id - ) - or [] - ) - return groups - - def get_admin_workspace_data(self) -> Optional[List[Group]]: - """ - fetch all the workspace ids - """ - groups = [] - workspaces = self.client.api_client.fetch_all_workspaces() - if workspaces: - workspace_id_list = [workspace.id for workspace in workspaces] - - # Start the scan of the available workspaces for dashboard metadata - workspace_paginated_list = [ - workspace_id_list[i : i + self.pagination_entity_per_page] - for i in range( - 0, len(workspace_id_list), self.pagination_entity_per_page - ) - ] - count = 1 - for workspace_ids_chunk in workspace_paginated_list: - logger.info( - f"Scanning {count}/{len(workspace_paginated_list)} set of workspaces" - ) - workspace_scan = self.client.api_client.initiate_workspace_scan( - workspace_ids_chunk - ) - - # Keep polling the scan status endpoint to check if scan is succeeded - workspace_scan_status = self.client.api_client.wait_for_scan_complete( - scan_id=workspace_scan.id - ) - if workspace_scan_status: - response = self.client.api_client.fetch_workspace_scan_result( - scan_id=workspace_scan.id - ) - groups.extend( - [ - active_workspace - for active_workspace in response.workspaces - if active_workspace.state == "Active" - ] - ) - else: - logger.error("Error in fetching dashboards and charts") - count += 1 - else: - logger.error("Unable to fetch any PowerBI workspaces") - return groups or None - @classmethod def create( cls, config_dict, metadata: OpenMetadata, pipeline_name: Optional[str] = None @@ -220,12 +129,51 @@ class PowerbiSource(DashboardServiceSource): ) return cls(config, metadata) + def prepare_workspace_data(self, workspace: Group): + """prepare one workspace data at a time""" + # add the dashboards to the groups + workspace.dashboards.extend( + self.client.api_client.fetch_all_org_dashboards(group_id=workspace.id) or [] + ) + for dashboard in workspace.dashboards: + # add the tiles to the dashboards + dashboard.tiles.extend( + self.client.api_client.fetch_all_org_tiles( + group_id=workspace.id, dashboard_id=dashboard.id + ) + or [] + ) + + # add the reports to the groups + workspace.reports.extend( + self.client.api_client.fetch_all_org_reports(group_id=workspace.id) or [] + ) + + # add the datasets to the groups + workspace.datasets.extend( + self.client.api_client.fetch_all_org_datasets(group_id=workspace.id) or [] + ) + for dataset in workspace.datasets: + # add the tables to the datasets + dataset.tables.extend( + self.client.api_client.fetch_dataset_tables( + group_id=workspace.id, dataset_id=dataset.id + ) + or [] + ) + def get_dashboard(self) -> Any: """ Method to iterate through dashboard lists filter dashboards & yield dashboard details """ - for workspace in self.workspace_data: + # fetch all workspaces/groups & apply filter pattern + all_workspaces = self.client.api_client.fetch_all_workspaces() + all_workspaces = self.get_filtered_workspaces(all_workspaces) + for workspace in all_workspaces: + # prepare additional data for specific workspace (datasets, reports, dashboards) + self.prepare_workspace_data(workspace) self.context.get().workspace = workspace + self.workspace_data.append(workspace) for dashboard in self.get_dashboards_list(): try: dashboard_details = self.get_dashboard_details(dashboard) @@ -309,54 +257,45 @@ class PowerbiSource(DashboardServiceSource): f"{workspace_id}/{chart_url_postfix}" ) - def list_datamodels(self) -> Iterable[Dataset]: + def yield_datamodel( + self, dashboard_details: Union[PowerBIDashboard, PowerBIReport] + ) -> Iterable[Either[CreateDashboardRequest]]: """ - Get All the Powerbi Datasets + Method to yield datamodel for each workspace """ - if self.source_config.includeDataModels: + workspace_datasets = self.context.get().workspace.datasets + for dataset in workspace_datasets: + if filter_by_datamodel( + self.source_config.dataModelFilterPattern, dataset.name + ): + self.status.filter(dataset.name, "Data model filtered out.") + continue try: - for workspace in self.workspace_data: - for dataset in workspace.datasets or []: - if filter_by_datamodel( - self.source_config.dataModelFilterPattern, dataset.name - ): - self.status.filter(dataset.name, "Data model filtered out.") - continue - yield dataset - except Exception as err: - logger.debug(traceback.format_exc()) - logger.error(f"Unexpected error fetching PowerBI datasets - {err}") - - def yield_bulk_datamodel( - self, dataset: Dataset - ) -> Iterable[Either[CreateDashboardDataModelRequest]]: - """ - Method to fetch DataModels in bulk - """ - try: - data_model_request = CreateDashboardDataModelRequest( - name=EntityName(dataset.id), - displayName=dataset.name, - description=Markdown(dataset.description) - if dataset.description - else None, - service=FullyQualifiedEntityName(self.context.get().dashboard_service), - dataModelType=DataModelType.PowerBIDataModel.value, - serviceType=DashboardServiceType.PowerBI.value, - columns=self._get_column_info(dataset), - project=self._fetch_dataset_workspace(dataset_id=dataset.id), - ) - yield Either(right=data_model_request) - self.register_record_datamodel(datamodel_request=data_model_request) - - except Exception as exc: - yield Either( - left=StackTraceError( - name=dataset.name, - error=f"Error yielding Data Model [{dataset.name}]: {exc}", - stackTrace=traceback.format_exc(), + data_model_request = CreateDashboardDataModelRequest( + name=EntityName(dataset.id), + displayName=dataset.name, + description=Markdown(dataset.description) + if dataset.description + else None, + service=FullyQualifiedEntityName( + self.context.get().dashboard_service + ), + dataModelType=DataModelType.PowerBIDataModel.value, + serviceType=DashboardServiceType.PowerBI.value, + columns=self._get_column_info(dataset), + project=self.get_project_name(dashboard_details), + ) + yield Either(right=data_model_request) + self.register_record_datamodel(datamodel_request=data_model_request) + + except Exception as exc: + yield Either( + left=StackTraceError( + name=dataset.name, + error=f"Error yielding Data Model [{dataset.name}]: {exc}", + stackTrace=traceback.format_exc(), + ) ) - ) def _get_child_columns(self, table: PowerBiTable) -> List[Column]: """