mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-11-02 19:48:17 +00:00
powerbi refactor, avoid prepare bulk data (#19123)
This commit is contained in:
parent
dfe34e191c
commit
922c3d2713
@ -127,7 +127,13 @@ class PowerBiApiClient:
|
||||
List[PowerBIDashboard]
|
||||
"""
|
||||
try:
|
||||
response_data = self.client.get(f"/myorg/groups/{group_id}/dashboards")
|
||||
admin = "admin/" if self.config.useAdminApis else ""
|
||||
response_data = self.client.get(
|
||||
f"/myorg/{admin}groups/{group_id}/dashboards"
|
||||
)
|
||||
if not response_data:
|
||||
logger.debug(f"No dashboards found for workspace_id: {group_id}")
|
||||
return None
|
||||
response = DashboardsResponse(**response_data)
|
||||
return response.value
|
||||
except Exception as exc: # pylint: disable=broad-except
|
||||
@ -142,7 +148,11 @@ class PowerBiApiClient:
|
||||
List[PowerBIReport]
|
||||
"""
|
||||
try:
|
||||
response_data = self.client.get(f"/myorg/groups/{group_id}/reports")
|
||||
admin = "admin/" if self.config.useAdminApis else ""
|
||||
response_data = self.client.get(f"/myorg/{admin}groups/{group_id}/reports")
|
||||
if not response_data:
|
||||
logger.debug(f"No reports found for workspace_id: {group_id}")
|
||||
return None
|
||||
response = ReportsResponse(**response_data)
|
||||
return response.value
|
||||
except Exception as exc: # pylint: disable=broad-except
|
||||
@ -157,7 +167,11 @@ class PowerBiApiClient:
|
||||
List[Dataset]
|
||||
"""
|
||||
try:
|
||||
response_data = self.client.get(f"/myorg/groups/{group_id}/datasets")
|
||||
admin = "admin/" if self.config.useAdminApis else ""
|
||||
response_data = self.client.get(f"/myorg/{admin}groups/{group_id}/datasets")
|
||||
if not response_data:
|
||||
logger.debug(f"No datasets found for workspace_id: {group_id}")
|
||||
return None
|
||||
response = DatasetResponse(**response_data)
|
||||
return response.value
|
||||
except Exception as exc: # pylint: disable=broad-except
|
||||
@ -174,9 +188,13 @@ class PowerBiApiClient:
|
||||
List[Tile]
|
||||
"""
|
||||
try:
|
||||
admin = "admin/" if self.config.useAdminApis else ""
|
||||
response_data = self.client.get(
|
||||
f"/myorg/groups/{group_id}/dashboards/{dashboard_id}/tiles"
|
||||
f"/myorg/{admin}dashboards/{dashboard_id}/tiles"
|
||||
)
|
||||
if not response_data:
|
||||
logger.debug(f"No dashboard tiles found for workspace_id: {group_id}")
|
||||
return None
|
||||
response = TilesResponse(**response_data)
|
||||
return response.value
|
||||
except Exception as exc: # pylint: disable=broad-except
|
||||
|
||||
@ -92,12 +92,6 @@ class PowerbiSource(DashboardServiceSource):
|
||||
self.datamodel_file_mappings = []
|
||||
|
||||
def prepare(self):
|
||||
if self.service_connection.useAdminApis:
|
||||
groups = self.get_admin_workspace_data()
|
||||
else:
|
||||
groups = self.get_org_workspace_data()
|
||||
if groups:
|
||||
self.workspace_data = self.get_filtered_workspaces(groups)
|
||||
return super().prepare()
|
||||
|
||||
def close(self):
|
||||
@ -123,91 +117,6 @@ class PowerbiSource(DashboardServiceSource):
|
||||
filtered_groups.append(group)
|
||||
return filtered_groups
|
||||
|
||||
def get_org_workspace_data(self) -> Optional[List[Group]]:
|
||||
"""
|
||||
fetch all the group workspace ids
|
||||
"""
|
||||
groups = self.client.api_client.fetch_all_workspaces()
|
||||
for group in groups:
|
||||
# add the dashboards to the groups
|
||||
group.dashboards.extend(
|
||||
self.client.api_client.fetch_all_org_dashboards(group_id=group.id) or []
|
||||
)
|
||||
for dashboard in group.dashboards:
|
||||
# add the tiles to the dashboards
|
||||
dashboard.tiles.extend(
|
||||
self.client.api_client.fetch_all_org_tiles(
|
||||
group_id=group.id, dashboard_id=dashboard.id
|
||||
)
|
||||
or []
|
||||
)
|
||||
|
||||
# add the reports to the groups
|
||||
group.reports.extend(
|
||||
self.client.api_client.fetch_all_org_reports(group_id=group.id) or []
|
||||
)
|
||||
|
||||
# add the datasets to the groups
|
||||
group.datasets.extend(
|
||||
self.client.api_client.fetch_all_org_datasets(group_id=group.id) or []
|
||||
)
|
||||
for dataset in group.datasets:
|
||||
# add the tables to the datasets
|
||||
dataset.tables.extend(
|
||||
self.client.api_client.fetch_dataset_tables(
|
||||
group_id=group.id, dataset_id=dataset.id
|
||||
)
|
||||
or []
|
||||
)
|
||||
return groups
|
||||
|
||||
def get_admin_workspace_data(self) -> Optional[List[Group]]:
|
||||
"""
|
||||
fetch all the workspace ids
|
||||
"""
|
||||
groups = []
|
||||
workspaces = self.client.api_client.fetch_all_workspaces()
|
||||
if workspaces:
|
||||
workspace_id_list = [workspace.id for workspace in workspaces]
|
||||
|
||||
# Start the scan of the available workspaces for dashboard metadata
|
||||
workspace_paginated_list = [
|
||||
workspace_id_list[i : i + self.pagination_entity_per_page]
|
||||
for i in range(
|
||||
0, len(workspace_id_list), self.pagination_entity_per_page
|
||||
)
|
||||
]
|
||||
count = 1
|
||||
for workspace_ids_chunk in workspace_paginated_list:
|
||||
logger.info(
|
||||
f"Scanning {count}/{len(workspace_paginated_list)} set of workspaces"
|
||||
)
|
||||
workspace_scan = self.client.api_client.initiate_workspace_scan(
|
||||
workspace_ids_chunk
|
||||
)
|
||||
|
||||
# Keep polling the scan status endpoint to check if scan is succeeded
|
||||
workspace_scan_status = self.client.api_client.wait_for_scan_complete(
|
||||
scan_id=workspace_scan.id
|
||||
)
|
||||
if workspace_scan_status:
|
||||
response = self.client.api_client.fetch_workspace_scan_result(
|
||||
scan_id=workspace_scan.id
|
||||
)
|
||||
groups.extend(
|
||||
[
|
||||
active_workspace
|
||||
for active_workspace in response.workspaces
|
||||
if active_workspace.state == "Active"
|
||||
]
|
||||
)
|
||||
else:
|
||||
logger.error("Error in fetching dashboards and charts")
|
||||
count += 1
|
||||
else:
|
||||
logger.error("Unable to fetch any PowerBI workspaces")
|
||||
return groups or None
|
||||
|
||||
@classmethod
|
||||
def create(
|
||||
cls, config_dict, metadata: OpenMetadata, pipeline_name: Optional[str] = None
|
||||
@ -220,12 +129,51 @@ class PowerbiSource(DashboardServiceSource):
|
||||
)
|
||||
return cls(config, metadata)
|
||||
|
||||
def prepare_workspace_data(self, workspace: Group):
|
||||
"""prepare one workspace data at a time"""
|
||||
# add the dashboards to the groups
|
||||
workspace.dashboards.extend(
|
||||
self.client.api_client.fetch_all_org_dashboards(group_id=workspace.id) or []
|
||||
)
|
||||
for dashboard in workspace.dashboards:
|
||||
# add the tiles to the dashboards
|
||||
dashboard.tiles.extend(
|
||||
self.client.api_client.fetch_all_org_tiles(
|
||||
group_id=workspace.id, dashboard_id=dashboard.id
|
||||
)
|
||||
or []
|
||||
)
|
||||
|
||||
# add the reports to the groups
|
||||
workspace.reports.extend(
|
||||
self.client.api_client.fetch_all_org_reports(group_id=workspace.id) or []
|
||||
)
|
||||
|
||||
# add the datasets to the groups
|
||||
workspace.datasets.extend(
|
||||
self.client.api_client.fetch_all_org_datasets(group_id=workspace.id) or []
|
||||
)
|
||||
for dataset in workspace.datasets:
|
||||
# add the tables to the datasets
|
||||
dataset.tables.extend(
|
||||
self.client.api_client.fetch_dataset_tables(
|
||||
group_id=workspace.id, dataset_id=dataset.id
|
||||
)
|
||||
or []
|
||||
)
|
||||
|
||||
def get_dashboard(self) -> Any:
|
||||
"""
|
||||
Method to iterate through dashboard lists filter dashboards & yield dashboard details
|
||||
"""
|
||||
for workspace in self.workspace_data:
|
||||
# fetch all workspaces/groups & apply filter pattern
|
||||
all_workspaces = self.client.api_client.fetch_all_workspaces()
|
||||
all_workspaces = self.get_filtered_workspaces(all_workspaces)
|
||||
for workspace in all_workspaces:
|
||||
# prepare additional data for specific workspace (datasets, reports, dashboards)
|
||||
self.prepare_workspace_data(workspace)
|
||||
self.context.get().workspace = workspace
|
||||
self.workspace_data.append(workspace)
|
||||
for dashboard in self.get_dashboards_list():
|
||||
try:
|
||||
dashboard_details = self.get_dashboard_details(dashboard)
|
||||
@ -309,54 +257,45 @@ class PowerbiSource(DashboardServiceSource):
|
||||
f"{workspace_id}/{chart_url_postfix}"
|
||||
)
|
||||
|
||||
def list_datamodels(self) -> Iterable[Dataset]:
|
||||
def yield_datamodel(
|
||||
self, dashboard_details: Union[PowerBIDashboard, PowerBIReport]
|
||||
) -> Iterable[Either[CreateDashboardRequest]]:
|
||||
"""
|
||||
Get All the Powerbi Datasets
|
||||
Method to yield datamodel for each workspace
|
||||
"""
|
||||
if self.source_config.includeDataModels:
|
||||
workspace_datasets = self.context.get().workspace.datasets
|
||||
for dataset in workspace_datasets:
|
||||
if filter_by_datamodel(
|
||||
self.source_config.dataModelFilterPattern, dataset.name
|
||||
):
|
||||
self.status.filter(dataset.name, "Data model filtered out.")
|
||||
continue
|
||||
try:
|
||||
for workspace in self.workspace_data:
|
||||
for dataset in workspace.datasets or []:
|
||||
if filter_by_datamodel(
|
||||
self.source_config.dataModelFilterPattern, dataset.name
|
||||
):
|
||||
self.status.filter(dataset.name, "Data model filtered out.")
|
||||
continue
|
||||
yield dataset
|
||||
except Exception as err:
|
||||
logger.debug(traceback.format_exc())
|
||||
logger.error(f"Unexpected error fetching PowerBI datasets - {err}")
|
||||
|
||||
def yield_bulk_datamodel(
|
||||
self, dataset: Dataset
|
||||
) -> Iterable[Either[CreateDashboardDataModelRequest]]:
|
||||
"""
|
||||
Method to fetch DataModels in bulk
|
||||
"""
|
||||
try:
|
||||
data_model_request = CreateDashboardDataModelRequest(
|
||||
name=EntityName(dataset.id),
|
||||
displayName=dataset.name,
|
||||
description=Markdown(dataset.description)
|
||||
if dataset.description
|
||||
else None,
|
||||
service=FullyQualifiedEntityName(self.context.get().dashboard_service),
|
||||
dataModelType=DataModelType.PowerBIDataModel.value,
|
||||
serviceType=DashboardServiceType.PowerBI.value,
|
||||
columns=self._get_column_info(dataset),
|
||||
project=self._fetch_dataset_workspace(dataset_id=dataset.id),
|
||||
)
|
||||
yield Either(right=data_model_request)
|
||||
self.register_record_datamodel(datamodel_request=data_model_request)
|
||||
|
||||
except Exception as exc:
|
||||
yield Either(
|
||||
left=StackTraceError(
|
||||
name=dataset.name,
|
||||
error=f"Error yielding Data Model [{dataset.name}]: {exc}",
|
||||
stackTrace=traceback.format_exc(),
|
||||
data_model_request = CreateDashboardDataModelRequest(
|
||||
name=EntityName(dataset.id),
|
||||
displayName=dataset.name,
|
||||
description=Markdown(dataset.description)
|
||||
if dataset.description
|
||||
else None,
|
||||
service=FullyQualifiedEntityName(
|
||||
self.context.get().dashboard_service
|
||||
),
|
||||
dataModelType=DataModelType.PowerBIDataModel.value,
|
||||
serviceType=DashboardServiceType.PowerBI.value,
|
||||
columns=self._get_column_info(dataset),
|
||||
project=self.get_project_name(dashboard_details),
|
||||
)
|
||||
yield Either(right=data_model_request)
|
||||
self.register_record_datamodel(datamodel_request=data_model_request)
|
||||
|
||||
except Exception as exc:
|
||||
yield Either(
|
||||
left=StackTraceError(
|
||||
name=dataset.name,
|
||||
error=f"Error yielding Data Model [{dataset.name}]: {exc}",
|
||||
stackTrace=traceback.format_exc(),
|
||||
)
|
||||
)
|
||||
)
|
||||
|
||||
def _get_child_columns(self, table: PowerBiTable) -> List[Column]:
|
||||
"""
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user