diff --git a/ingestion/src/metadata/ingestion/source/dashboard/powerbi/client.py b/ingestion/src/metadata/ingestion/source/dashboard/powerbi/client.py
index bd4d24d28b6..381094e1801 100644
--- a/ingestion/src/metadata/ingestion/source/dashboard/powerbi/client.py
+++ b/ingestion/src/metadata/ingestion/source/dashboard/powerbi/client.py
@@ -127,13 +127,7 @@ class PowerBiApiClient:
             List[PowerBIDashboard]
         """
         try:
-            admin = "admin/" if self.config.useAdminApis else ""
-            response_data = self.client.get(
-                f"/myorg/{admin}groups/{group_id}/dashboards"
-            )
-            if not response_data:
-                logger.debug(f"No dashboards found for workspace_id: {group_id}")
-                return None
+            response_data = self.client.get(f"/myorg/groups/{group_id}/dashboards")
             response = DashboardsResponse(**response_data)
             return response.value
         except Exception as exc:  # pylint: disable=broad-except
@@ -148,11 +142,7 @@ class PowerBiApiClient:
             List[PowerBIReport]
         """
         try:
-            admin = "admin/" if self.config.useAdminApis else ""
-            response_data = self.client.get(f"/myorg/{admin}groups/{group_id}/reports")
-            if not response_data:
-                logger.debug(f"No reports found for workspace_id: {group_id}")
-                return None
+            response_data = self.client.get(f"/myorg/groups/{group_id}/reports")
             response = ReportsResponse(**response_data)
             return response.value
         except Exception as exc:  # pylint: disable=broad-except
@@ -167,11 +157,7 @@ class PowerBiApiClient:
             List[Dataset]
         """
         try:
-            admin = "admin/" if self.config.useAdminApis else ""
-            response_data = self.client.get(f"/myorg/{admin}groups/{group_id}/datasets")
-            if not response_data:
-                logger.debug(f"No datasets found for workspace_id: {group_id}")
-                return None
+            response_data = self.client.get(f"/myorg/groups/{group_id}/datasets")
             response = DatasetResponse(**response_data)
             return response.value
         except Exception as exc:  # pylint: disable=broad-except
@@ -188,13 +174,9 @@ class PowerBiApiClient:
             List[Tile]
         """
         try:
-            admin = "admin/" if self.config.useAdminApis else ""
             response_data = self.client.get(
-                f"/myorg/{admin}dashboards/{dashboard_id}/tiles"
+                f"/myorg/groups/{group_id}/dashboards/{dashboard_id}/tiles"
             )
-            if not response_data:
-                logger.debug(f"No dashboard tiles found for workspace_id: {group_id}")
-                return None
             response = TilesResponse(**response_data)
             return response.value
         except Exception as exc:  # pylint: disable=broad-except
@@ -234,9 +216,6 @@ class PowerBiApiClient:
             entities_per_page = self.config.pagination_entity_per_page
             params_data = {"$top": "1"}
             response_data = self.client.get(api_url, data=params_data)
-            if not response_data:
-                logger.debug("No groups/workspaces found")
-                return None
             response = GroupsResponse(**response_data)
             count = response.odata_count
             indexes = math.ceil(count / entities_per_page)
@@ -249,7 +228,10 @@ class PowerBiApiClient:
                 }
                 response_data = self.client.get(api_url, data=params_data)
                 if not response_data:
-                    logger.debug("No more groups/workspaces found")
+                    logger.error(
+                        "Error fetching workspaces between results: "
+                        f"{str(index * entities_per_page)} - {str(entities_per_page)}"
+                    )
                     continue
                 response = GroupsResponse(**response_data)
                 workspaces.extend(response.value)
@@ -286,7 +268,6 @@ class PowerBiApiClient:
     def fetch_workspace_scan_status(
         self, scan_id: str
     ) -> Optional[WorkSpaceScanResponse]:
-        # deprecated in favour to avoide bulk data prepare
         """Get Workspace scan status by id method
         Args:
             scan_id:
@@ -305,7 +286,6 @@ class PowerBiApiClient:
         return None

     def fetch_workspace_scan_result(self, scan_id: str) -> Optional[Workspaces]:
-        # deprecated in favour to avoide bulk data prepare
         """Get Workspace scan result by id method
         Args:
             scan_id:
@@ -327,7 +307,6 @@ class PowerBiApiClient:
         """
         Method to poll the scan status endpoint until the timeout
         """
-        # deprecated in favour to avoide bulk data prepare
         min_sleep_time = 3
         if min_sleep_time > timeout:
             logger.info(f"Timeout is set to minimum sleep time: {timeout}")
diff --git a/ingestion/src/metadata/ingestion/source/dashboard/powerbi/metadata.py b/ingestion/src/metadata/ingestion/source/dashboard/powerbi/metadata.py
index f8ec367b462..a918b46fd8a 100644
--- a/ingestion/src/metadata/ingestion/source/dashboard/powerbi/metadata.py
+++ b/ingestion/src/metadata/ingestion/source/dashboard/powerbi/metadata.py
@@ -49,7 +49,6 @@ from metadata.generated.schema.type.basic import (
 )
 from metadata.ingestion.api.models import Either
 from metadata.ingestion.api.steps import InvalidSourceException
-from metadata.ingestion.models.ometa_lineage import OMetaLineageRequest
 from metadata.ingestion.ometa.ometa_api import OpenMetadata
 from metadata.ingestion.ometa.utils import model_str
 from metadata.ingestion.source.dashboard.dashboard_service import DashboardServiceSource
@@ -93,6 +92,17 @@ class PowerbiSource(DashboardServiceSource):
         self.datamodel_file_mappings = []

     def prepare(self):
+        """
+        - Since we get all the required info i.e. reports, dashboards, charts, datasets
+          with workflow scan approach, we are populating bulk data for workspace.
+        - Some individual APIs are not able to yield data with details.
+        """
+        if self.service_connection.useAdminApis:
+            groups = self.get_admin_workspace_data()
+        else:
+            groups = self.get_org_workspace_data()
+        if groups:
+            self.workspace_data = self.get_filtered_workspaces(groups)
         return super().prepare()

     def close(self):
@@ -118,6 +128,108 @@ class PowerbiSource(DashboardServiceSource):
                 filtered_groups.append(group)
         return filtered_groups

+    def get_org_workspace_data(self) -> Optional[List[Group]]:
+        """
+        fetch all the group workspace ids
+        """
+        groups = self.client.api_client.fetch_all_workspaces()
+        for group in groups:
+            # add the dashboards to the groups
+            group.dashboards.extend(
+                self.client.api_client.fetch_all_org_dashboards(group_id=group.id) or []
+            )
+            for dashboard in group.dashboards:
+                # add the tiles to the dashboards
+                dashboard.tiles.extend(
+                    self.client.api_client.fetch_all_org_tiles(
+                        group_id=group.id, dashboard_id=dashboard.id
+                    )
+                    or []
+                )
+
+            # add the reports to the groups
+            group.reports.extend(
+                self.client.api_client.fetch_all_org_reports(group_id=group.id) or []
+            )
+
+            # add the datasets to the groups
+            group.datasets.extend(
+                self.client.api_client.fetch_all_org_datasets(group_id=group.id) or []
+            )
+            for dataset in group.datasets:
+                # add the tables to the datasets
+                dataset.tables.extend(
+                    self.client.api_client.fetch_dataset_tables(
+                        group_id=group.id, dataset_id=dataset.id
+                    )
+                    or []
+                )
+        return groups
+
+    def get_admin_workspace_data(self) -> Optional[List[Group]]:
+        """
+        fetch all the workspace ids
+        """
+        groups = []
+        workspaces = self.client.api_client.fetch_all_workspaces()
+        if workspaces:
+            workspace_id_list = [workspace.id for workspace in workspaces]
+
+            # Start the scan of the available workspaces for dashboard metadata
+            workspace_paginated_list = [
+                workspace_id_list[i : i + self.pagination_entity_per_page]
+                for i in range(
+                    0, len(workspace_id_list), self.pagination_entity_per_page
+                )
+            ]
+            count = 1
+            for workspace_ids_chunk in workspace_paginated_list:
+                logger.info(
+                    f"Scanning {count}/{len(workspace_paginated_list)} set of workspaces"
+                )
+                workspace_scan = self.client.api_client.initiate_workspace_scan(
+                    workspace_ids_chunk
+                )
+                if not workspace_scan:
+                    logger.error(
+                        f"Error initiating workspace scan for ids:{str(workspace_ids_chunk)}\n moving to next set of workspaces"
+                    )
+                    count += 1
+                    continue
+
+                # Keep polling the scan status endpoint to check if scan is succeeded
+                workspace_scan_status = self.client.api_client.wait_for_scan_complete(
+                    scan_id=workspace_scan.id
+                )
+                if not workspace_scan_status:
+                    logger.error(
+                        f"Max poll hit to scan status for scan_id: {workspace_scan.id}, moving to next set of workspaces"
+                    )
+                    count += 1
+                    continue
+
+                # Get scan result for successful scan
+                response = self.client.api_client.fetch_workspace_scan_result(
+                    scan_id=workspace_scan.id
+                )
+                if not response:
+                    logger.error(
+                        f"Error getting workspace scan result for scan_id: {workspace_scan.id}"
+                    )
+                    count += 1
+                    continue
+                groups.extend(
+                    [
+                        active_workspace
+                        for active_workspace in response.workspaces
+                        if active_workspace.state == "Active"
+                    ]
+                )
+                count += 1
+        else:
+            logger.error("Unable to fetch any PowerBI workspaces")
+        return groups or None
+
     @classmethod
     def create(
         cls, config_dict, metadata: OpenMetadata, pipeline_name: Optional[str] = None
@@ -130,51 +242,12 @@ class PowerbiSource(DashboardServiceSource):
         )
         return cls(config, metadata)

-    def prepare_workspace_data(self, workspace: Group):
-        """prepare one workspace data at a time"""
-        # add the dashboards to the groups
-        workspace.dashboards.extend(
-            self.client.api_client.fetch_all_org_dashboards(group_id=workspace.id) or []
-        )
-        for dashboard in workspace.dashboards:
-            # add the tiles to the dashboards
-            dashboard.tiles.extend(
-                self.client.api_client.fetch_all_org_tiles(
-                    group_id=workspace.id, dashboard_id=dashboard.id
-                )
-                or []
-            )
-
-        # add the reports to the groups
-        workspace.reports.extend(
-            self.client.api_client.fetch_all_org_reports(group_id=workspace.id) or []
-        )
-
-        # add the datasets to the groups
-        workspace.datasets.extend(
-            self.client.api_client.fetch_all_org_datasets(group_id=workspace.id) or []
-        )
-        for dataset in workspace.datasets:
-            # add the tables to the datasets
-            dataset.tables.extend(
-                self.client.api_client.fetch_dataset_tables(
-                    group_id=workspace.id, dataset_id=dataset.id
-                )
-                or []
-            )
-
     def get_dashboard(self) -> Any:
         """
         Method to iterate through dashboard lists filter dashboards & yield dashboard details
         """
-        # fetch all workspaces/groups & apply filter pattern
-        all_workspaces = self.client.api_client.fetch_all_workspaces() or []
-        all_workspaces = self.get_filtered_workspaces(all_workspaces)
-        for workspace in all_workspaces:
-            # prepare additional data for specific workspace (datasets, reports, dashboards)
-            self.prepare_workspace_data(workspace)
+        for workspace in self.workspace_data:
             self.context.get().workspace = workspace
-            self.workspace_data.append(workspace)
             for dashboard in self.get_dashboards_list():
                 try:
                     dashboard_details = self.get_dashboard_details(dashboard)
@@ -258,45 +331,54 @@ class PowerbiSource(DashboardServiceSource):
                 f"{workspace_id}/{chart_url_postfix}"
             )

-    def yield_datamodel(
-        self, dashboard_details: Union[PowerBIDashboard, PowerBIReport]
-    ) -> Iterable[Either[CreateDashboardRequest]]:
+    def list_datamodels(self) -> Iterable[Dataset]:
         """
-        Method to yield datamodel for each workspace
+        Get All the Powerbi Datasets
         """
-        workspace_datasets = self.context.get().workspace.datasets
-        for dataset in workspace_datasets:
-            if filter_by_datamodel(
-                self.source_config.dataModelFilterPattern, dataset.name
-            ):
-                self.status.filter(dataset.name, "Data model filtered out.")
-                continue
+        if self.source_config.includeDataModels:
             try:
-                data_model_request = CreateDashboardDataModelRequest(
-                    name=EntityName(dataset.id),
-                    displayName=dataset.name,
-                    description=Markdown(dataset.description)
-                    if dataset.description
-                    else None,
-                    service=FullyQualifiedEntityName(
-                        self.context.get().dashboard_service
-                    ),
-                    dataModelType=DataModelType.PowerBIDataModel.value,
-                    serviceType=DashboardServiceType.PowerBI.value,
-                    columns=self._get_column_info(dataset),
-                    project=self.get_project_name(dashboard_details),
-                )
-                yield Either(right=data_model_request)
-                self.register_record_datamodel(datamodel_request=data_model_request)
+                for workspace in self.workspace_data:
+                    for dataset in workspace.datasets or []:
+                        if filter_by_datamodel(
+                            self.source_config.dataModelFilterPattern, dataset.name
+                        ):
+                            self.status.filter(dataset.name, "Data model filtered out.")
+                            continue
+                        yield dataset
+            except Exception as err:
+                logger.debug(traceback.format_exc())
+                logger.error(f"Unexpected error fetching PowerBI datasets - {err}")

-            except Exception as exc:
-                yield Either(
-                    left=StackTraceError(
-                        name=dataset.name,
-                        error=f"Error yielding Data Model [{dataset.name}]: {exc}",
-                        stackTrace=traceback.format_exc(),
-                    )
+    def yield_bulk_datamodel(
+        self, dataset: Dataset
+    ) -> Iterable[Either[CreateDashboardDataModelRequest]]:
+        """
+        Method to fetch DataModels in bulk
+        """
+        try:
+            data_model_request = CreateDashboardDataModelRequest(
+                name=EntityName(dataset.id),
+                displayName=dataset.name,
+                description=Markdown(dataset.description)
+                if dataset.description
+                else None,
+                service=FullyQualifiedEntityName(self.context.get().dashboard_service),
+                dataModelType=DataModelType.PowerBIDataModel.value,
+                serviceType=DashboardServiceType.PowerBI.value,
+                columns=self._get_column_info(dataset),
+                project=self._fetch_dataset_workspace(dataset_id=dataset.id),
+            )
+            yield Either(right=data_model_request)
+            self.register_record_datamodel(datamodel_request=data_model_request)
+
+        except Exception as exc:
+            yield Either(
+                left=StackTraceError(
+                    name=dataset.name,
+                    error=f"Error yielding Data Model [{dataset.name}]: {exc}",
+                    stackTrace=traceback.format_exc(),
                 )
+            )

     def _get_child_columns(self, table: PowerBiTable) -> List[Column]:
         """
@@ -762,85 +844,3 @@ class PowerbiSource(DashboardServiceSource):
                 f"Error fetching project name for {dashboard_details.id}: {exc}"
             )
         return None
-
-    def yield_dashboard_lineage(
-        self, dashboard_details: Any
-    ) -> Iterable[Either[OMetaLineageRequest]]:
-        """
-        Yields lineage if config is enabled.
-
-        We will look for the data in all the services
-        we have informed.
- """ - for lineage in self.yield_datamodel_dashboard_lineage(dashboard_details) or []: - if lineage.right is not None: - yield Either( - right=OMetaLineageRequest( - lineage_request=lineage.right, - override_lineage=self.source_config.overrideLineage, - ) - ) - else: - yield lineage - - db_service_names = self.get_db_service_names() - for db_service_name in db_service_names or []: - yield from self.yield_dashboard_lineage_details( - dashboard_details, db_service_name - ) or [] - - def yield_datamodel_dashboard_lineage( - self, dashboard_details: Any - ) -> Iterable[Either[AddLineageRequest]]: - """ - Returns: - Lineage request between Data Models and Dashboards - """ - dashboard_fqn = fqn.build( - self.metadata, - entity_type=Dashboard, - service_name=self.context.get().dashboard_service, - dashboard_name=dashboard_details.id, - ) - dashboard_entity = self.metadata.get_by_name( - entity=Dashboard, fqn=dashboard_fqn - ) - if isinstance(dashboard_details, PowerBIReport): - datamodel_fqn = fqn.build( - metadata=self.metadata, - entity_type=DashboardDataModel, - service_name=self.context.get().dashboard_service, - data_model_name=dashboard_details.datasetId, - ) - datamodel_entity = self.metadata.get_by_name( - entity=DashboardDataModel, fqn=datamodel_fqn - ) - if dashboard_entity and datamodel_entity: - yield self._get_add_lineage_request( - to_entity=dashboard_entity, from_entity=datamodel_entity - ) - else: - if ( - hasattr(self.context.get(), "dataModels") - and self.context.get().dataModels - ): - for datamodel in self.context.get().dataModels: - try: - datamodel_fqn = fqn.build( - metadata=self.metadata, - entity_type=DashboardDataModel, - service_name=self.context.get().dashboard_service, - data_model_name=datamodel, - ) - datamodel_entity = self.metadata.get_by_name( - entity=DashboardDataModel, fqn=datamodel_fqn - ) - if dashboard_entity and datamodel_entity: - yield self._get_add_lineage_request( - to_entity=dashboard_entity, from_entity=datamodel_entity - ) - except Exception as err: - logger.debug(traceback.format_exc()) - logger.error( - f"Error to yield dashboard lineage details for data model name [{str(datamodel)}]: {err}" - )