MINOR: powerbi fetch workspaces failure handle (#19785)

This commit is contained in:
harshsoni2024 2025-02-17 15:27:03 +05:30 committed by GitHub
parent 8d16657a15
commit e0237d68b8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -47,7 +47,10 @@ from metadata.utils.logger import utils_logger
logger = utils_logger()
GETGROUPS_DEFAULT_PARAMS = {"$top": "1", "$skip": "0"}
API_RESPONSE_MESSAGE_KEY = "message"
AUTH_TOKEN_MAX_RETRIES = 5
AUTH_TOKEN_RETRY_WAIT = 120
# Similar inner methods with mode client. That's fine.
# pylint: disable=duplicate-code
class PowerBiApiClient:
@ -59,6 +62,9 @@ class PowerBiApiClient:
def __init__(self, config: PowerBIConnection):
self.config = config
self.pagination_entity_per_page = min(
100, self.config.pagination_entity_per_page
)
self.msal_client = msal.ConfidentialClientApplication(
client_id=self.config.clientId,
client_credential=self.config.clientSecret.get_secret_value(),
@ -82,15 +88,11 @@ class PowerBiApiClient:
"""
logger.info("Generating PowerBi access token")
response_data = self.msal_client.acquire_token_silent(
scopes=self.config.scope, account=None
)
response_data = self.get_auth_token_from_cache()
if not response_data:
logger.info("Token does not exist in the cache. Getting a new token.")
response_data = self.msal_client.acquire_token_for_client(
scopes=self.config.scope
)
response_data = self.generate_new_auth_token()
response_data = response_data or {}
auth_response = PowerBiToken(**response_data)
if not auth_response.access_token:
raise InvalidSourceException(
@ -100,6 +102,59 @@ class PowerBiApiClient:
logger.info("PowerBi Access Token generated successfully")
return auth_response.access_token, auth_response.expires_in
def generate_new_auth_token(self) -> Optional[dict]:
"""generate new auth token"""
retry = AUTH_TOKEN_MAX_RETRIES
while retry:
try:
response_data = self.msal_client.acquire_token_for_client(
scopes=self.config.scope
)
return response_data
except Exception as exc:
logger.debug(traceback.format_exc())
logger.warning(f"Error generating new auth token: {exc}")
# wait for time and retry
retry -= 1
if retry:
logger.warning(
f"Error generating new token: {exc}, "
f"sleep {AUTH_TOKEN_RETRY_WAIT} seconds retrying {retry} more times.."
)
sleep(AUTH_TOKEN_RETRY_WAIT)
else:
logger.warning(
"Could not generate new token after maximum retries, "
"Please check provided configs"
)
return None
def get_auth_token_from_cache(self) -> Optional[dict]:
"""fetch auth token from cache"""
retry = AUTH_TOKEN_MAX_RETRIES
while retry:
try:
response_data = self.msal_client.acquire_token_silent(
scopes=self.config.scope, account=None
)
return response_data
except Exception as exc:
logger.debug(traceback.format_exc())
logger.warning(f"Error getting token from cache: {exc}")
retry -= 1
if retry:
logger.warning(
f"Error getting token from cache: {exc}, "
f"sleep {AUTH_TOKEN_RETRY_WAIT} seconds retrying {retry} more times.."
)
sleep(AUTH_TOKEN_RETRY_WAIT)
else:
logger.warning(
"Could not get token from cache after maximum retries, "
"Please check provided configs"
)
return None
def fetch_dashboards(self) -> Optional[List[PowerBIDashboard]]:
"""Get dashboards method
Returns:
@ -198,6 +253,7 @@ class PowerBiApiClient:
return None
# pylint: disable=too-many-branches,too-many-statements
def fetch_all_workspaces(self) -> Optional[List[Group]]:
"""Method to fetch all powerbi workspace details
Returns:
@ -206,28 +262,94 @@ class PowerBiApiClient:
try:
admin = "admin/" if self.config.useAdminApis else ""
api_url = f"/myorg/{admin}groups"
entities_per_page = self.config.pagination_entity_per_page
params_data = {"$top": "1"}
response_data = self.client.get(api_url, data=params_data)
response = GroupsResponse(**response_data)
count = response.odata_count
entities_per_page = self.pagination_entity_per_page
failed_indexes = []
params_data = GETGROUPS_DEFAULT_PARAMS
response = self.client.get(api_url, data=params_data)
if (
not response
or API_RESPONSE_MESSAGE_KEY in response
or len(response) != len(GroupsResponse.__annotations__)
):
logger.warning("Error fetching workspaces between results: (0, 1)")
if response and response.get(API_RESPONSE_MESSAGE_KEY):
logger.warning(
"Error message from API response: "
f"{str(response.get(API_RESPONSE_MESSAGE_KEY))}"
)
failed_indexes.append(params_data)
count = 0
else:
try:
response = GroupsResponse(**response)
count = response.odata_count
except Exception as exc:
logger.warning(f"Error processing GetGroups response: {exc}")
count = 0
indexes = math.ceil(count / entities_per_page)
workspaces = []
for index in range(indexes):
params_data = {
"$top": str(entities_per_page),
"$skip": str(index * entities_per_page),
}
response_data = self.client.get(api_url, data=params_data)
if not response_data:
logger.error(
"Error fetching workspaces between results: "
f"{str(index * entities_per_page)} - {str(entities_per_page)}"
response = self.client.get(api_url, data=params_data)
if (
not response
or API_RESPONSE_MESSAGE_KEY in response
or len(response) != len(GroupsResponse.__annotations__)
):
index_range = (
int(params_data.get("$skip")),
int(params_data.get("$skip")) + int(params_data.get("$top")),
)
logger.warning(
f"Error fetching workspaces between results: {str(index_range)}"
)
if response and response.get(API_RESPONSE_MESSAGE_KEY):
logger.warning(
"Error message from API response: "
f"{str(response.get(API_RESPONSE_MESSAGE_KEY))}"
)
failed_indexes.append(params_data)
continue
response = GroupsResponse(**response_data)
workspaces.extend(response.value)
try:
response = GroupsResponse(**response)
workspaces.extend(response.value)
except Exception as exc:
logger.warning(f"Error processing GetGroups response: {exc}")
if failed_indexes:
logger.info(
"Retrying one more time on failed indexes to get workspaces"
)
for params_data in failed_indexes:
response = self.client.get(api_url, data=params_data)
if (
not response
or API_RESPONSE_MESSAGE_KEY in response
or len(response) != len(GroupsResponse.__annotations__)
):
index_range = (
int(params_data.get("$skip")),
int(params_data.get("$skip"))
+ int(params_data.get("$top")),
)
logger.warning(
f"Workspaces between results {str(index_range)} "
"could not be fetched on multiple attempts"
)
if response and response.get(API_RESPONSE_MESSAGE_KEY):
logger.warning(
"Error message from API response: "
f"{str(response.get(API_RESPONSE_MESSAGE_KEY))}"
)
continue
try:
response = GroupsResponse(**response)
workspaces.extend(response.value)
except Exception as exc:
logger.warning(f"Error processing GetGroups response: {exc}")
return workspaces
except Exception as exc: # pylint: disable=broad-except
logger.debug(traceback.format_exc())