bugfix: powerbi lineage source parameter usecase fix, last active user ingestion fix (#21272)

(cherry picked from commit a414e93163a9f6001fd90d298128cecc3a6f60b8)
This commit is contained in:
harshsoni2024 2025-05-19 19:21:02 +05:30 committed by OpenMetadata Release Bot
parent a676636332
commit 1e2e2e078b
3 changed files with 186 additions and 39 deletions

View File

@ -297,7 +297,7 @@ class PowerbiSource(DashboardServiceSource):
""" """
return ( return (
f"{clean_uri(self.service_connection.hostPort)}/groups/" f"{clean_uri(self.service_connection.hostPort)}/groups/"
f"{workspace_id}/reports/{dashboard_id}" f"{workspace_id}/reports/{dashboard_id}/ReportSection?experience=power-bi"
) )
def _get_chart_url( def _get_chart_url(
@ -692,6 +692,30 @@ class PowerbiSource(DashboardServiceSource):
logger.debug(f"Error to get data_model_column_fqn {exc}") logger.debug(f"Error to get data_model_column_fqn {exc}")
logger.debug(traceback.format_exc()) logger.debug(traceback.format_exc())
def _parse_snowflake_regex_exp(
self, match: re.Match, datamodel_entity: DashboardDataModel
) -> Optional[str]:
"""parse snowflake regex expression"""
try:
if not match:
return None
elif match.group(1):
return match.group(1)
elif match.group(2):
dataset = self._fetch_dataset_from_workspace(datamodel_entity.name.root)
if dataset and dataset.expressions:
# find keyword from dataset expressions
for dexpression in dataset.expressions:
if dexpression.name == match.group(2):
pattern = r'DefaultValue="([^"]+)"'
kw_match = re.search(pattern, dexpression.expression)
if kw_match:
return kw_match.group(1)
except Exception as exc:
logger.debug(f"Error to parse snowflake regex expression: {exc}")
logger.debug(traceback.format_exc())
return None
def _parse_redshift_source(self, source_expression: str) -> Optional[dict]: def _parse_redshift_source(self, source_expression: str) -> Optional[dict]:
try: try:
db_match = re.search( db_match = re.search(
@ -708,9 +732,7 @@ class PowerbiSource(DashboardServiceSource):
schema = schema_table_match[0] if len(schema_table_match) > 0 else None schema = schema_table_match[0] if len(schema_table_match) > 0 else None
table = schema_table_match[1] if len(schema_table_match) > 1 else None table = schema_table_match[1] if len(schema_table_match) > 1 else None
if ( if table: # atlease table should be fetched
schema and table
): # atlease 2 entities schema and table should be fetched
return {"database": database, "schema": schema, "table": table} return {"database": database, "schema": schema, "table": table}
return None return None
except Exception as exc: except Exception as exc:
@ -718,30 +740,32 @@ class PowerbiSource(DashboardServiceSource):
logger.debug(traceback.format_exc()) logger.debug(traceback.format_exc())
return None return None
def _parse_snowflake_source(self, source_expression: str) -> Optional[dict]: def _parse_snowflake_source(
self, source_expression: str, datamodel_entity: DashboardDataModel
) -> Optional[dict]:
try: try:
if "Snowflake.Databases" not in source_expression: if "Snowflake.Databases" not in source_expression:
# Not a snowflake valid expression # Not a snowflake valid expression
return None return None
db_match = re.search( db_match = re.search(
r'\[Name="([^"]+)",Kind="Database"\]', source_expression r'\[Name=(?:"([^"]+)"|([^,]+)),Kind="Database"\]', source_expression
) )
schema_match = re.search( schema_match = re.search(
r'\[Name="([^"]+)",Kind="Schema"\]', source_expression r'\[Name=(?:"([^"]+)"|([^,]+)),Kind="Schema"\]', source_expression
) )
view_match = re.search(r'\[Name="([^"]+)",Kind="View"\]', source_expression)
table_match = re.search( table_match = re.search(
r'\[Name="([^"]+)",Kind="Table"\]', source_expression r'\[Name=(?:"([^"]+)"|([^,]+)),Kind="Table"\]', source_expression
)
view_match = re.search(
r'\[Name=(?:"([^"]+)"|([^,]+)),Kind="View"\]', source_expression
) )
database = db_match.group(1) if db_match else None database = self._parse_snowflake_regex_exp(db_match, datamodel_entity)
schema = schema_match.group(1) if schema_match else None schema = self._parse_snowflake_regex_exp(schema_match, datamodel_entity)
view = view_match.group(1) if view_match else None table = self._parse_snowflake_regex_exp(table_match, datamodel_entity)
table = table_match.group(1) if table_match else None view = self._parse_snowflake_regex_exp(view_match, datamodel_entity)
if schema and ( if table or view: # atlease table or view should be fetched
table or view
): # atlease 2 entities schema and table should be fetched
return { return {
"database": database, "database": database,
"schema": schema, "schema": schema,
@ -753,14 +777,18 @@ class PowerbiSource(DashboardServiceSource):
logger.debug(traceback.format_exc()) logger.debug(traceback.format_exc())
return None return None
def _parse_table_info_from_source_exp(self, table: PowerBiTable) -> dict: def _parse_table_info_from_source_exp(
self, table: PowerBiTable, datamodel_entity: DashboardDataModel
) -> dict:
try: try:
if not isinstance(table.source, list): if not isinstance(table.source, list):
return {} return {}
source_expression = table.source[0].expression source_expression = table.source[0].expression
# parse snowflake source # parse snowflake source
table_info = self._parse_snowflake_source(source_expression) table_info = self._parse_snowflake_source(
source_expression, datamodel_entity
)
if isinstance(table_info, dict): if isinstance(table_info, dict):
return table_info return table_info
# parse redshift source # parse redshift source
@ -783,12 +811,12 @@ class PowerbiSource(DashboardServiceSource):
Method to create lineage between table and datamodels Method to create lineage between table and datamodels
""" """
try: try:
table_info = self._parse_table_info_from_source_exp(table) table_info = self._parse_table_info_from_source_exp(table, datamodel_entity)
fqn_search_string = build_es_fqn_search_string( fqn_search_string = build_es_fqn_search_string(
database_name=table_info.get("database"),
schema_name=table_info.get("schema"),
service_name=db_service_name or "*", service_name=db_service_name or "*",
table_name=table_info.get("table") or table.name, table_name=table_info.get("table") or table.name,
schema_name=table_info.get("schema") or "*",
database_name=table_info.get("database") or "*",
) )
table_entity = self.metadata.search_in_any_service( table_entity = self.metadata.search_in_any_service(
entity_type=Table, entity_type=Table,
@ -970,27 +998,68 @@ class PowerbiSource(DashboardServiceSource):
try: try:
owner_ref_list = [] # to assign multiple owners to entity if they exist owner_ref_list = [] # to assign multiple owners to entity if they exist
for owner in dashboard_details.users or []: for owner in dashboard_details.users or []:
if owner.email:
owner_ref = None owner_ref = None
# put filtering conditions
if isinstance(dashboard_details, Dataset):
access_right = owner.datasetUserAccessRight
elif isinstance(dashboard_details, Dataflow):
access_right = owner.dataflowUserAccessRight
elif isinstance(dashboard_details, PowerBIReport):
access_right = owner.reportUserAccessRight
elif isinstance(dashboard_details, PowerBIDashboard):
access_right = owner.dashboardUserAccessRight
if owner.userType != "Member" or (
isinstance(
dashboard_details, (Dataflow, PowerBIReport, PowerBIDashboard)
)
and access_right != "Owner"
):
logger.warning(
f"User is not a member and has no access to the {dashboard_details.id}: ({owner.displayName}, {owner.email})"
)
continue
if owner.email:
try: try:
owner_email = EmailStr._validate(owner.email) owner_email = EmailStr._validate(owner.email)
except PydanticCustomError:
logger.warning(f"Invalid email for owner: {owner.email}")
owner_email = None
if owner_email:
try:
owner_ref = self.metadata.get_reference_by_email( owner_ref = self.metadata.get_reference_by_email(
owner_email.lower() owner_email.lower()
) )
except PydanticCustomError: except Exception as err:
logger.warning( logger.warning(
f"Could not fetch owner data for email: {owner.email}" f"Could not fetch owner data with email {owner.email} in {dashboard_details.id}: {err}"
) )
if owner.displayName: elif owner.displayName:
try:
owner_ref = self.metadata.get_reference_by_name( owner_ref = self.metadata.get_reference_by_name(
name=owner.displayName name=owner.displayName
) )
except Exception as err: except Exception as err:
logger.warning( logger.warning(
f"Error processing current owner data in {dashboard_details.id}: {err}" f"Could not process owner data with name {owner.displayName} in {dashboard_details.id}: {err}"
) )
if owner_ref: if owner_ref:
owner_ref_list.append(owner_ref.root[0]) owner_ref_list.append(owner_ref.root[0])
# check for last modified, configuredBy user
current_active_user = None
if isinstance(dashboard_details, Dataset):
current_active_user = dashboard_details.configuredBy
elif isinstance(dashboard_details, (Dataflow, PowerBIReport)):
current_active_user = dashboard_details.modifiedBy
if current_active_user:
try:
owner_ref = self.metadata.get_reference_by_email(
current_active_user.lower()
)
if owner_ref and owner_ref.root[0] not in owner_ref_list:
owner_ref_list.append(owner_ref.root[0])
except Exception as err:
logger.warning(f"Could not fetch owner data due to {err}")
if len(owner_ref_list) > 0: if len(owner_ref_list) > 0:
logger.debug( logger.debug(
f"Successfully fetched owners data for {dashboard_details.id}" f"Successfully fetched owners data for {dashboard_details.id}"

View File

@ -39,6 +39,11 @@ class PowerBIUser(BaseModel):
displayName: Optional[str] = None displayName: Optional[str] = None
email: Optional[str] = Field(alias="emailAddress", default=None) email: Optional[str] = Field(alias="emailAddress", default=None)
userType: Optional[str] = None
reportUserAccessRight: Optional[str] = None
datasetUserAccessRight: Optional[str] = None
dataflowUserAccessRight: Optional[str] = None
dashboardUserAccessRight: Optional[str] = None
class PowerBIDashboard(BaseModel): class PowerBIDashboard(BaseModel):
@ -65,6 +70,7 @@ class PowerBIReport(BaseModel):
name: str name: str
datasetId: Optional[str] = None datasetId: Optional[str] = None
users: Optional[List[PowerBIUser]] = [] users: Optional[List[PowerBIUser]] = []
modifiedBy: Optional[str] = None
class DashboardsResponse(BaseModel): class DashboardsResponse(BaseModel):
@ -163,6 +169,11 @@ class TablesResponse(BaseModel):
value: List[PowerBiTable] value: List[PowerBiTable]
class DatasetExpression(BaseModel):
name: str
expression: str
class Dataset(BaseModel): class Dataset(BaseModel):
""" """
PowerBI Dataset Model PowerBI Dataset Model
@ -174,6 +185,8 @@ class Dataset(BaseModel):
tables: Optional[List[PowerBiTable]] = [] tables: Optional[List[PowerBiTable]] = []
description: Optional[str] = None description: Optional[str] = None
users: Optional[List[PowerBIUser]] = [] users: Optional[List[PowerBIUser]] = []
expressions: Optional[List[DatasetExpression]] = []
configuredBy: Optional[str] = None
class DatasetResponse(BaseModel): class DatasetResponse(BaseModel):
@ -190,6 +203,8 @@ class Dataflow(BaseModel):
id: str = Field(alias="objectId") id: str = Field(alias="objectId")
name: str name: str
description: Optional[str] = None description: Optional[str] = None
users: Optional[List[PowerBIUser]] = []
modifiedBy: Optional[str] = None
class Group(BaseModel): class Group(BaseModel):

View File

@ -4,6 +4,10 @@ from unittest.mock import patch
import pytest import pytest
from metadata.generated.schema.entity.data.dashboardDataModel import (
DashboardDataModel,
DataModelType,
)
from metadata.generated.schema.metadataIngestion.workflow import ( from metadata.generated.schema.metadataIngestion.workflow import (
OpenMetadataWorkflowConfig, OpenMetadataWorkflowConfig,
) )
@ -105,8 +109,18 @@ MOCK_DASHBOARD_WITH_OWNERS = {
"embedUrl": "https://test.com/embed", "embedUrl": "https://test.com/embed",
"tiles": [], "tiles": [],
"users": [ "users": [
{"displayName": "John Doe", "emailAddress": "john.doe@example.com"}, {
{"displayName": "Jane Smith", "emailAddress": "jane.smith@example.com"}, "displayName": "John Doe",
"emailAddress": "john.doe@example.com",
"dashboardUserAccessRight": "Owner",
"userType": "Member",
},
{
"displayName": "Jane Smith",
"emailAddress": "jane.smith@example.com",
"dashboardUserAccessRight": "Owner",
"userType": "Member",
},
], ],
} }
@ -115,7 +129,14 @@ MOCK_DATASET_WITH_OWNERS = {
"name": "Test Dataset", "name": "Test Dataset",
"tables": [], "tables": [],
"description": "Test dataset description", "description": "Test dataset description",
"users": [{"displayName": "John Doe", "emailAddress": "john.doe@example.com"}], "users": [
{
"displayName": "John Doe",
"emailAddress": "john.doe@example.com",
"datasetUserAccessRight": "Owner",
"userType": "Member",
}
],
} }
MOCK_USER_1_ENITYTY_REF_LIST = EntityReferenceList( MOCK_USER_1_ENITYTY_REF_LIST = EntityReferenceList(
@ -125,6 +146,34 @@ MOCK_USER_2_ENITYTY_REF_LIST = EntityReferenceList(
root=[EntityReference(id=uuid.uuid4(), name="Jane Smith", type="user")] root=[EntityReference(id=uuid.uuid4(), name="Jane Smith", type="user")]
) )
MOCK_SNOWFLAKE_EXP_V2 = 'let\n Source = Snowflake.Databases(Snowflake_URL,Warehouse,[Role=Role]),\n Database = Source{[Name=DB,Kind="Database"]}[Data],\n DB_Schema = Database{[Name=Schema,Kind="Schema"]}[Data],\n Table = DB_Schema{[Name="CUSTOMER_TABLE",Kind="Table"]}[Data],\n #"Andere entfernte Spalten" = Table.SelectColumns(Table,{"ID_BERICHTSMONAT", "ID_AKQUISE_VERMITTLER", "ID_AKQUISE_OE", "ID_SPARTE", "ID_RISIKOTRAEGER", "ID_KUNDE", "STUECK", "BBE"})\nin\n #"Andere entfernte Spalten"'
EXPECTED_SNOWFLAKE_RESULT_V2 = {
"database": "MY_DB",
"schema": "MY_SCHEMA",
"table": "CUSTOMER_TABLE",
}
MOCK_DATASET_FROM_WORKSPACE = Dataset(
id="testdataset",
name="Test Dataset",
tables=[],
expressions=[
{
"name": "DB",
"expression": '"MY_DB" meta [IsParameterQuery=true, List={"MY_DB_DEV", "MY_DB", "MY_DB_PROD"}, DefaultValue="MY_DB", Type="Text", IsParameterQueryRequired=true]',
},
{
"name": "Schema",
"expression": '"MY_SCHEMA" meta [IsParameterQuery=true, List={"MY_SCHEMA", "MY_SCHEMA_PROD"}, DefaultValue="MY_SCHEMA", Type="Text", IsParameterQueryRequired=true]',
},
],
)
MOCK_DASHBOARD_DATA_MODEL = DashboardDataModel(
name="dummy_datamodel",
id=uuid.uuid4(),
columns=[],
dataModelType=DataModelType.PowerBIDataModel.value,
)
class PowerBIUnitTest(TestCase): class PowerBIUnitTest(TestCase):
""" """
@ -147,7 +196,12 @@ class PowerBIUnitTest(TestCase):
) )
@pytest.mark.order(1) @pytest.mark.order(1)
def test_parse_database_source(self): @patch.object(
PowerbiSource,
"_fetch_dataset_from_workspace",
return_value=MOCK_DATASET_FROM_WORKSPACE,
)
def test_parse_database_source(self, *_):
# Test with valid redshift source # Test with valid redshift source
result = self.powerbi._parse_redshift_source(MOCK_REDSHIFT_EXP) result = self.powerbi._parse_redshift_source(MOCK_REDSHIFT_EXP)
self.assertEqual(result, EXPECTED_REDSHIFT_RESULT) self.assertEqual(result, EXPECTED_REDSHIFT_RESULT)
@ -161,13 +215,22 @@ class PowerBIUnitTest(TestCase):
self.assertEqual(result, None) self.assertEqual(result, None)
# Test with valid snowflake source # Test with valid snowflake source
result = self.powerbi._parse_snowflake_source(MOCK_SNOWFLAKE_EXP) result = self.powerbi._parse_snowflake_source(
MOCK_SNOWFLAKE_EXP, MOCK_DASHBOARD_DATA_MODEL
)
self.assertEqual(result, EXPECTED_SNOWFLAKE_RESULT) self.assertEqual(result, EXPECTED_SNOWFLAKE_RESULT)
# Test with invalid snowflake source # Test with invalid snowflake source
result = self.powerbi._parse_snowflake_source(MOCK_SNOWFLAKE_EXP_INVALID) result = self.powerbi._parse_snowflake_source(
MOCK_SNOWFLAKE_EXP_INVALID, MOCK_DASHBOARD_DATA_MODEL
)
self.assertEqual(result, None) self.assertEqual(result, None)
result = self.powerbi._parse_snowflake_source(
MOCK_SNOWFLAKE_EXP_V2, MOCK_DASHBOARD_DATA_MODEL
)
self.assertEqual(result, EXPECTED_SNOWFLAKE_RESULT_V2)
@pytest.mark.order(2) @pytest.mark.order(2)
@patch("metadata.ingestion.ometa.ometa_api.OpenMetadata.get_reference_by_email") @patch("metadata.ingestion.ometa.ometa_api.OpenMetadata.get_reference_by_email")
def test_owner_ingestion(self, get_reference_by_email): def test_owner_ingestion(self, get_reference_by_email):