issue-20519: Support PowerBI Owners ingestion (#20525)

This commit is contained in:
harshsoni2024 2025-04-02 16:11:27 +05:30 committed by GitHub
parent 15ad58692d
commit f267d4ef01
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 166 additions and 3 deletions

View File

@ -14,6 +14,7 @@ REST Auth & Client for PowerBi
import json
import math
import traceback
from copy import deepcopy
from time import sleep
from typing import List, Optional, Tuple
@ -349,7 +350,13 @@ class PowerBiApiClient:
api_url = f"/myorg/{admin}groups"
entities_per_page = self.pagination_entity_per_page
failed_indexes = []
params_data = GETGROUPS_DEFAULT_PARAMS
parsed_filter_query = None
if filter_pattern:
parsed_filter_query = self.create_filter_query(filter_pattern)
logger.debug(f"Filter query applied = {parsed_filter_query}")
params_data = deepcopy(GETGROUPS_DEFAULT_PARAMS)
if parsed_filter_query:
params_data["$filter"] = parsed_filter_query
response = self.client.get(api_url, data=params_data)
if (
not response
@ -372,14 +379,18 @@ class PowerBiApiClient:
logger.warning(f"Error processing GetGroups response: {exc}")
count = 0
indexes = math.ceil(count / entities_per_page)
logger.debug(
f"Total {count} workspaces found, Will run {indexes} iterations fetching"
f" maximum {entities_per_page} workspaces in a single iteration"
)
workspaces = []
for index in range(indexes):
params_data = {
"$top": str(entities_per_page),
"$skip": str(index * entities_per_page),
}
if filter_pattern:
params_data["$filter"] = self.create_filter_query(filter_pattern)
if parsed_filter_query:
params_data["$filter"] = parsed_filter_query
response = self.client.get(api_url, data=params_data)
if (

View File

@ -14,6 +14,9 @@ import re
import traceback
from typing import Any, Iterable, List, Optional, Union
from pydantic import EmailStr
from pydantic_core import PydanticCustomError
from metadata.generated.schema.api.data.createChart import CreateChartRequest
from metadata.generated.schema.api.data.createDashboard import CreateDashboardRequest
from metadata.generated.schema.api.data.createDashboardDataModel import (
@ -48,6 +51,7 @@ from metadata.generated.schema.type.basic import (
Markdown,
SourceUrl,
)
from metadata.generated.schema.type.entityReferenceList import EntityReferenceList
from metadata.ingestion.api.models import Either
from metadata.ingestion.api.steps import InvalidSourceException
from metadata.ingestion.models.ometa_lineage import OMetaLineageRequest
@ -488,6 +492,7 @@ class PowerbiSource(DashboardServiceSource):
"name": table.name,
"displayName": table.name,
"description": table.description,
"children": [],
}
child_columns = self._get_child_columns(table=table)
child_measures = self._get_child_measures(table=table)
@ -534,6 +539,7 @@ class PowerbiSource(DashboardServiceSource):
serviceType=DashboardServiceType.PowerBI.value,
columns=self._get_column_info(dataset),
project=self.get_project_name(dashboard_details=dataset),
owners=self.get_owner_ref(dashboard_details=dataset),
)
yield Either(right=data_model_request)
self.register_record_datamodel(datamodel_request=data_model_request)
@ -939,3 +945,44 @@ class PowerbiSource(DashboardServiceSource):
f"Error fetching project name for {dashboard_details.id}: {exc}"
)
return None
def get_owner_ref(  # pylint: disable=unused-argument, useless-return
    self, dashboard_details: Any
) -> Optional[EntityReferenceList]:
    """
    Method to process the dashboard owners

    Walks ``dashboard_details.users`` and resolves each user that has an
    email to an entity reference:

    * a valid email is looked up (lower-cased) with
      ``self.metadata.get_reference_by_email``;
    * an email that fails validation falls back to
      ``self.metadata.get_reference_by_name`` when a display name exists.

    Users without an email are skipped. A failure while resolving one user
    is logged and does not affect the other users.

    Args:
        dashboard_details: object exposing ``users`` (iterable or ``None``)
            and ``id`` (used in log messages only).

    Returns:
        An ``EntityReferenceList`` holding the first reference resolved for
        each user, or ``None`` when no owner could be resolved.
    """
    try:
        owner_ref_list = []  # to assign multiple owners to entity if they exist
        for owner in dashboard_details.users or []:
            if not owner.email:
                continue
            owner_ref = None
            try:
                try:
                    owner_email = EmailStr._validate(owner.email)
                except PydanticCustomError:
                    logger.warning(
                        f"Could not fetch owner data for email: {owner.email}"
                    )
                    # The fallback lookup stays inside the per-owner guard so
                    # that an error here cannot discard the other owners.
                    if owner.displayName:
                        owner_ref = self.metadata.get_reference_by_name(
                            name=owner.displayName
                        )
                else:
                    owner_ref = self.metadata.get_reference_by_email(
                        owner_email.lower()
                    )
            except Exception as err:
                logger.warning(
                    f"Error processing current owner data in {dashboard_details.id}: {err}"
                )
            # Only index into the result when it actually holds a reference;
            # an empty ``root`` would otherwise raise IndexError and abort
            # the whole owner list.
            if owner_ref and owner_ref.root:
                owner_ref_list.append(owner_ref.root[0])
        if not owner_ref_list:
            return None
        logger.debug(f"Successfully fetched owners data for {dashboard_details.id}")
        return EntityReferenceList(root=owner_ref_list)
    except Exception as err:
        logger.debug(traceback.format_exc())
        logger.warning(f"Could not fetch owner data due to {err}")
    return None

View File

@ -32,6 +32,15 @@ class Tile(BaseModel):
reportId: Optional[str] = None
class PowerBIUser(BaseModel):
    """
    User entry attached to a PowerBI asset.

    ``email`` is read from the ``emailAddress`` key of the input payload;
    both fields are optional and default to ``None``.
    """

    displayName: Optional[str] = None
    email: Optional[str] = Field(default=None, alias="emailAddress")
class PowerBIDashboard(BaseModel):
"""
PowerBI PowerBIDashboard Model
@ -43,6 +52,7 @@ class PowerBIDashboard(BaseModel):
webUrl: Optional[str] = None
embedUrl: Optional[str] = None
tiles: Optional[List[Tile]] = []
users: Optional[List[PowerBIUser]] = []
class PowerBIReport(BaseModel):
@ -54,6 +64,7 @@ class PowerBIReport(BaseModel):
id: str
name: str
datasetId: Optional[str] = None
users: Optional[List[PowerBIUser]] = []
class DashboardsResponse(BaseModel):
@ -162,6 +173,7 @@ class Dataset(BaseModel):
name: str
tables: Optional[List[PowerBiTable]] = []
description: Optional[str] = None
users: Optional[List[PowerBIUser]] = []
class DatasetResponse(BaseModel):

View File

@ -1,3 +1,4 @@
import uuid
from unittest import TestCase
from unittest.mock import patch
@ -6,8 +7,11 @@ import pytest
from metadata.generated.schema.metadataIngestion.workflow import (
OpenMetadataWorkflowConfig,
)
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.generated.schema.type.entityReferenceList import EntityReferenceList
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.dashboard.powerbi.metadata import PowerbiSource
from metadata.ingestion.source.dashboard.powerbi.models import Dataset, PowerBIDashboard
MOCK_REDSHIFT_EXP = """
let
@ -94,6 +98,33 @@ mock_config = {
},
}
# Raw dashboard payload with two users; each user carries a display name and
# an "emailAddress" key.
MOCK_DASHBOARD_WITH_OWNERS = {
    "id": "dashboard1",
    "displayName": "Test Dashboard",
    "webUrl": "https://test.com",
    "embedUrl": "https://test.com/embed",
    "tiles": [],
    "users": [
        {"displayName": "John Doe", "emailAddress": "john.doe@example.com"},
        {"displayName": "Jane Smith", "emailAddress": "jane.smith@example.com"},
    ],
}

# Raw dataset payload with a single user.
MOCK_DATASET_WITH_OWNERS = {
    "id": "dataset1",
    "name": "Test Dataset",
    "tables": [],
    "description": "Test dataset description",
    "users": [{"displayName": "John Doe", "emailAddress": "john.doe@example.com"}],
}

# Single-entry reference lists used as canned return values for the mocked
# owner lookup; each holds one "user" reference with a random UUID.
# NOTE(review): "ENITYTY" looks like a typo of "ENTITY"; the names are kept
# as-is because the tests reference them.
MOCK_USER_1_ENITYTY_REF_LIST = EntityReferenceList(
    root=[EntityReference(id=uuid.uuid4(), name="John Doe", type="user")]
)
MOCK_USER_2_ENITYTY_REF_LIST = EntityReferenceList(
    root=[EntityReference(id=uuid.uuid4(), name="Jane Smith", type="user")]
)
class PowerBIUnitTest(TestCase):
"""
@ -136,3 +167,65 @@ class PowerBIUnitTest(TestCase):
# Test with invalid snowflake source
result = self.powerbi._parse_snowflake_source(MOCK_SNOWFLAKE_EXP_INVALID)
self.assertEqual(result, None)
@pytest.mark.order(2)
@patch("metadata.ingestion.ometa.ometa_api.OpenMetadata.get_reference_by_email")
def test_owner_ingestion(self, get_reference_by_email):
    """
    Check owner resolution for a dashboard with two users, a dataset with
    one user, and a dashboard with no users.
    """
    email_lookup = self.powerbi.metadata.get_reference_by_email

    # --- Dashboard with two owners ---
    email_lookup.side_effect = [
        MOCK_USER_1_ENITYTY_REF_LIST,
        MOCK_USER_2_ENITYTY_REF_LIST,
    ]
    dashboard_owners = self.powerbi.get_owner_ref(
        PowerBIDashboard.model_validate(MOCK_DASHBOARD_WITH_OWNERS)
    )
    self.assertIsNotNone(dashboard_owners)
    self.assertEqual(len(dashboard_owners.root), 2)
    self.assertEqual(dashboard_owners.root[0].name, "John Doe")
    self.assertEqual(dashboard_owners.root[1].name, "Jane Smith")

    # Each user's email must have been looked up
    email_lookup.assert_any_call("john.doe@example.com")
    email_lookup.assert_any_call("jane.smith@example.com")

    # --- Dataset with a single owner ---
    email_lookup.reset_mock()
    email_lookup.side_effect = [MOCK_USER_1_ENITYTY_REF_LIST]
    dataset_owners = self.powerbi.get_owner_ref(
        Dataset.model_validate(MOCK_DATASET_WITH_OWNERS)
    )
    self.assertIsNotNone(dataset_owners.root)
    self.assertEqual(len(dataset_owners.root), 1)
    self.assertEqual(dataset_owners.root[0].name, "John Doe")

    # Exactly one lookup, for the dataset's only user
    email_lookup.assert_called_once_with("john.doe@example.com")

    # --- Dashboard without owners ---
    email_lookup.reset_mock()
    ownerless_payload = {
        "id": "dashboard2",
        "displayName": "Test Dashboard 2",
        "webUrl": "https://test.com",
        "embedUrl": "https://test.com/embed",
        "tiles": [],
        "users": [],
    }
    no_owners = self.powerbi.get_owner_ref(
        PowerBIDashboard.model_validate(ownerless_payload)
    )
    self.assertIsNone(no_owners)

    # No lookup is expected when the users list is empty
    email_lookup.assert_not_called()