diff --git a/ingestion/src/metadata/ingestion/source/dashboard/powerbi/client.py b/ingestion/src/metadata/ingestion/source/dashboard/powerbi/client.py
index c9a38236f5f..cfaa47379ef 100644
--- a/ingestion/src/metadata/ingestion/source/dashboard/powerbi/client.py
+++ b/ingestion/src/metadata/ingestion/source/dashboard/powerbi/client.py
@@ -47,7 +47,10 @@ from metadata.utils.logger import utils_logger
 
 logger = utils_logger()
 
-
+GETGROUPS_DEFAULT_PARAMS = {"$top": "1", "$skip": "0"}
+API_RESPONSE_MESSAGE_KEY = "message"
+AUTH_TOKEN_MAX_RETRIES = 5
+AUTH_TOKEN_RETRY_WAIT = 120
 # Similar inner methods with mode client. That's fine.
 # pylint: disable=duplicate-code
 class PowerBiApiClient:
@@ -59,6 +62,9 @@ class PowerBiApiClient:
 
     def __init__(self, config: PowerBIConnection):
         self.config = config
+        self.pagination_entity_per_page = min(
+            100, self.config.pagination_entity_per_page
+        )
         self.msal_client = msal.ConfidentialClientApplication(
             client_id=self.config.clientId,
             client_credential=self.config.clientSecret.get_secret_value(),
@@ -82,15 +88,11 @@ class PowerBiApiClient:
         """
         logger.info("Generating PowerBi access token")
 
-        response_data = self.msal_client.acquire_token_silent(
-            scopes=self.config.scope, account=None
-        )
-
+        response_data = self.get_auth_token_from_cache()
         if not response_data:
             logger.info("Token does not exist in the cache. Getting a new token.")
-            response_data = self.msal_client.acquire_token_for_client(
-                scopes=self.config.scope
-            )
+            response_data = self.generate_new_auth_token()
+        response_data = response_data or {}
         auth_response = PowerBiToken(**response_data)
         if not auth_response.access_token:
             raise InvalidSourceException(
@@ -100,6 +102,59 @@ class PowerBiApiClient:
         logger.info("PowerBi Access Token generated successfully")
         return auth_response.access_token, auth_response.expires_in
 
+    def generate_new_auth_token(self) -> Optional[dict]:
+        """generate new auth token"""
+        retry = AUTH_TOKEN_MAX_RETRIES
+        while retry:
+            try:
+                response_data = self.msal_client.acquire_token_for_client(
+                    scopes=self.config.scope
+                )
+                return response_data
+            except Exception as exc:
+                logger.debug(traceback.format_exc())
+                logger.warning(f"Error generating new auth token: {exc}")
+                # wait for time and retry
+                retry -= 1
+                if retry:
+                    logger.warning(
+                        f"Error generating new token: {exc}, "
+                        f"sleep {AUTH_TOKEN_RETRY_WAIT} seconds retrying {retry} more times.."
+                    )
+                    sleep(AUTH_TOKEN_RETRY_WAIT)
+                else:
+                    logger.warning(
+                        "Could not generate new token after maximum retries, "
+                        "Please check provided configs"
+                    )
+        return None
+
+    def get_auth_token_from_cache(self) -> Optional[dict]:
+        """fetch auth token from cache"""
+        retry = AUTH_TOKEN_MAX_RETRIES
+        while retry:
+            try:
+                response_data = self.msal_client.acquire_token_silent(
+                    scopes=self.config.scope, account=None
+                )
+                return response_data
+            except Exception as exc:
+                logger.debug(traceback.format_exc())
+                logger.warning(f"Error getting token from cache: {exc}")
+                retry -= 1
+                if retry:
+                    logger.warning(
+                        f"Error getting token from cache: {exc}, "
+                        f"sleep {AUTH_TOKEN_RETRY_WAIT} seconds retrying {retry} more times.."
+                    )
+                    sleep(AUTH_TOKEN_RETRY_WAIT)
+                else:
+                    logger.warning(
+                        "Could not get token from cache after maximum retries, "
+                        "Please check provided configs"
+                    )
+        return None
+
     def fetch_dashboards(self) -> Optional[List[PowerBIDashboard]]:
         """Get dashboards method
         Returns:
@@ -198,6 +253,7 @@ class PowerBiApiClient:
 
         return None
 
+    # pylint: disable=too-many-branches,too-many-statements
     def fetch_all_workspaces(self) -> Optional[List[Group]]:
         """Method to fetch all powerbi workspace details
         Returns:
@@ -206,28 +262,94 @@ class PowerBiApiClient:
         try:
             admin = "admin/" if self.config.useAdminApis else ""
             api_url = f"/myorg/{admin}groups"
-            entities_per_page = self.config.pagination_entity_per_page
-            params_data = {"$top": "1"}
-            response_data = self.client.get(api_url, data=params_data)
-            response = GroupsResponse(**response_data)
-            count = response.odata_count
+            entities_per_page = self.pagination_entity_per_page
+            failed_indexes = []
+            params_data = GETGROUPS_DEFAULT_PARAMS
+            response = self.client.get(api_url, data=params_data)
+            if (
+                not response
+                or API_RESPONSE_MESSAGE_KEY in response
+                or len(response) != len(GroupsResponse.__annotations__)
+            ):
+                logger.warning("Error fetching workspaces between results: (0, 1)")
+                if response and response.get(API_RESPONSE_MESSAGE_KEY):
+                    logger.warning(
+                        "Error message from API response: "
+                        f"{str(response.get(API_RESPONSE_MESSAGE_KEY))}"
+                    )
+                failed_indexes.append(params_data)
+                count = 0
+            else:
+                try:
+                    response = GroupsResponse(**response)
+                    count = response.odata_count
+                except Exception as exc:
+                    logger.warning(f"Error processing GetGroups response: {exc}")
+                    count = 0
             indexes = math.ceil(count / entities_per_page)
-
             workspaces = []
             for index in range(indexes):
                 params_data = {
                     "$top": str(entities_per_page),
                     "$skip": str(index * entities_per_page),
                 }
-                response_data = self.client.get(api_url, data=params_data)
-                if not response_data:
-                    logger.error(
-                        "Error fetching workspaces between results: "
-                        f"{str(index * entities_per_page)} - {str(entities_per_page)}"
+                response = self.client.get(api_url, data=params_data)
+                if (
+                    not response
+                    or API_RESPONSE_MESSAGE_KEY in response
+                    or len(response) != len(GroupsResponse.__annotations__)
+                ):
+                    index_range = (
+                        int(params_data.get("$skip")),
+                        int(params_data.get("$skip")) + int(params_data.get("$top")),
                     )
+                    logger.warning(
+                        f"Error fetching workspaces between results: {str(index_range)}"
+                    )
+                    if response and response.get(API_RESPONSE_MESSAGE_KEY):
+                        logger.warning(
+                            "Error message from API response: "
+                            f"{str(response.get(API_RESPONSE_MESSAGE_KEY))}"
+                        )
+                    failed_indexes.append(params_data)
                     continue
-                response = GroupsResponse(**response_data)
-                workspaces.extend(response.value)
+                try:
+                    response = GroupsResponse(**response)
+                    workspaces.extend(response.value)
+                except Exception as exc:
+                    logger.warning(f"Error processing GetGroups response: {exc}")
+
+            if failed_indexes:
+                logger.info(
+                    "Retrying one more time on failed indexes to get workspaces"
+                )
+                for params_data in failed_indexes:
+                    response = self.client.get(api_url, data=params_data)
+                    if (
+                        not response
+                        or API_RESPONSE_MESSAGE_KEY in response
+                        or len(response) != len(GroupsResponse.__annotations__)
+                    ):
+                        index_range = (
+                            int(params_data.get("$skip")),
+                            int(params_data.get("$skip"))
+                            + int(params_data.get("$top")),
+                        )
+                        logger.warning(
+                            f"Workspaces between results {str(index_range)} "
+                            "could not be fetched on multiple attempts"
+                        )
+                        if response and response.get(API_RESPONSE_MESSAGE_KEY):
+                            logger.warning(
+                                "Error message from API response: "
+                                f"{str(response.get(API_RESPONSE_MESSAGE_KEY))}"
+                            )
+                        continue
+                    try:
+                        response = GroupsResponse(**response)
+                        workspaces.extend(response.value)
+                    except Exception as exc:
+                        logger.warning(f"Error processing GetGroups response: {exc}")
             return workspaces
         except Exception as exc:  # pylint: disable=broad-except
             logger.debug(traceback.format_exc())