From 1e2e2e078b8f96e519dbb3e57da9ab91b7721cc2 Mon Sep 17 00:00:00 2001 From: harshsoni2024 <64592571+harshsoni2024@users.noreply.github.com> Date: Mon, 19 May 2025 19:21:02 +0530 Subject: [PATCH] bugfix: powerbi lineage source parameter usecase fix, last active user ingestion fix (#21272) (cherry picked from commit a414e93163a9f6001fd90d298128cecc3a6f60b8) --- .../source/dashboard/powerbi/metadata.py | 135 +++++++++++++----- .../source/dashboard/powerbi/models.py | 15 ++ .../unit/topology/dashboard/test_powerbi.py | 75 +++++++++- 3 files changed, 186 insertions(+), 39 deletions(-) diff --git a/ingestion/src/metadata/ingestion/source/dashboard/powerbi/metadata.py b/ingestion/src/metadata/ingestion/source/dashboard/powerbi/metadata.py index b44aa5edf00..d14dbb644db 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/powerbi/metadata.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/powerbi/metadata.py @@ -297,7 +297,7 @@ class PowerbiSource(DashboardServiceSource): """ return ( f"{clean_uri(self.service_connection.hostPort)}/groups/" - f"{workspace_id}/reports/{dashboard_id}" + f"{workspace_id}/reports/{dashboard_id}/ReportSection?experience=power-bi" ) def _get_chart_url( @@ -692,6 +692,30 @@ class PowerbiSource(DashboardServiceSource): logger.debug(f"Error to get data_model_column_fqn {exc}") logger.debug(traceback.format_exc()) + def _parse_snowflake_regex_exp( + self, match: re.Match, datamodel_entity: DashboardDataModel + ) -> Optional[str]: + """parse snowflake regex expression""" + try: + if not match: + return None + elif match.group(1): + return match.group(1) + elif match.group(2): + dataset = self._fetch_dataset_from_workspace(datamodel_entity.name.root) + if dataset and dataset.expressions: + # find keyword from dataset expressions + for dexpression in dataset.expressions: + if dexpression.name == match.group(2): + pattern = r'DefaultValue="([^"]+)"' + kw_match = re.search(pattern, dexpression.expression) + if kw_match: + return kw_match.group(1) + except Exception as exc: + logger.debug(f"Error to parse snowflake regex expression: {exc}") + logger.debug(traceback.format_exc()) + return None + def _parse_redshift_source(self, source_expression: str) -> Optional[dict]: try: db_match = re.search( @@ -708,9 +732,7 @@ class PowerbiSource(DashboardServiceSource): schema = schema_table_match[0] if len(schema_table_match) > 0 else None table = schema_table_match[1] if len(schema_table_match) > 1 else None - if ( - schema and table - ): # atlease 2 entities schema and table should be fetched + if table: # atlease table should be fetched return {"database": database, "schema": schema, "table": table} return None except Exception as exc: @@ -718,30 +740,32 @@ class PowerbiSource(DashboardServiceSource): logger.debug(traceback.format_exc()) return None - def _parse_snowflake_source(self, source_expression: str) -> Optional[dict]: + def _parse_snowflake_source( + self, source_expression: str, datamodel_entity: DashboardDataModel + ) -> Optional[dict]: try: if "Snowflake.Databases" not in source_expression: # Not a snowflake valid expression return None db_match = re.search( - r'\[Name="([^"]+)",Kind="Database"\]', source_expression + r'\[Name=(?:"([^"]+)"|([^,]+)),Kind="Database"\]', source_expression ) schema_match = re.search( - r'\[Name="([^"]+)",Kind="Schema"\]', source_expression + r'\[Name=(?:"([^"]+)"|([^,]+)),Kind="Schema"\]', source_expression ) - view_match = re.search(r'\[Name="([^"]+)",Kind="View"\]', source_expression) table_match = re.search( - r'\[Name="([^"]+)",Kind="Table"\]', source_expression + r'\[Name=(?:"([^"]+)"|([^,]+)),Kind="Table"\]', source_expression + ) + view_match = re.search( + r'\[Name=(?:"([^"]+)"|([^,]+)),Kind="View"\]', source_expression ) - database = db_match.group(1) if db_match else None - schema = schema_match.group(1) if schema_match else None - view = view_match.group(1) if view_match else None - table = table_match.group(1) if table_match else None + database = self._parse_snowflake_regex_exp(db_match, datamodel_entity) + schema = self._parse_snowflake_regex_exp(schema_match, datamodel_entity) + table = self._parse_snowflake_regex_exp(table_match, datamodel_entity) + view = self._parse_snowflake_regex_exp(view_match, datamodel_entity) - if schema and ( - table or view - ): # atlease 2 entities schema and table should be fetched + if table or view: # atlease table or view should be fetched return { "database": database, "schema": schema, @@ -753,14 +777,18 @@ class PowerbiSource(DashboardServiceSource): logger.debug(traceback.format_exc()) return None - def _parse_table_info_from_source_exp(self, table: PowerBiTable) -> dict: + def _parse_table_info_from_source_exp( + self, table: PowerBiTable, datamodel_entity: DashboardDataModel + ) -> dict: try: if not isinstance(table.source, list): return {} source_expression = table.source[0].expression # parse snowflake source - table_info = self._parse_snowflake_source(source_expression) + table_info = self._parse_snowflake_source( + source_expression, datamodel_entity + ) if isinstance(table_info, dict): return table_info # parse redshift source @@ -783,12 +811,12 @@ class PowerbiSource(DashboardServiceSource): Method to create lineage between table and datamodels """ try: - table_info = self._parse_table_info_from_source_exp(table) + table_info = self._parse_table_info_from_source_exp(table, datamodel_entity) fqn_search_string = build_es_fqn_search_string( - database_name=table_info.get("database"), - schema_name=table_info.get("schema"), service_name=db_service_name or "*", table_name=table_info.get("table") or table.name, + schema_name=table_info.get("schema") or "*", + database_name=table_info.get("database") or "*", ) table_entity = self.metadata.search_in_any_service( entity_type=Table, @@ -970,27 +998,68 @@ class PowerbiSource(DashboardServiceSource): try: owner_ref_list = [] # to assign multiple owners to entity if they exist for owner in dashboard_details.users or []: + owner_ref = None + # put filtering conditions + if isinstance(dashboard_details, Dataset): + access_right = owner.datasetUserAccessRight + elif isinstance(dashboard_details, Dataflow): + access_right = owner.dataflowUserAccessRight + elif isinstance(dashboard_details, PowerBIReport): + access_right = owner.reportUserAccessRight + elif isinstance(dashboard_details, PowerBIDashboard): + access_right = owner.dashboardUserAccessRight + + if owner.userType != "Member" or ( + isinstance( + dashboard_details, (Dataflow, PowerBIReport, PowerBIDashboard) + ) + and access_right != "Owner" + ): + logger.warning( + f"User is not a member and has no access to the {dashboard_details.id}: ({owner.displayName}, {owner.email})" + ) + continue if owner.email: - owner_ref = None try: owner_email = EmailStr._validate(owner.email) - owner_ref = self.metadata.get_reference_by_email( - owner_email.lower() - ) except PydanticCustomError: - logger.warning( - f"Could not fetch owner data for email: {owner.email}" - ) - if owner.displayName: - owner_ref = self.metadata.get_reference_by_name( - name=owner.displayName + logger.warning(f"Invalid email for owner: {owner.email}") + owner_email = None + if owner_email: + try: + owner_ref = self.metadata.get_reference_by_email( + owner_email.lower() ) + except Exception as err: + logger.warning( + f"Could not fetch owner data with email {owner.email} in {dashboard_details.id}: {err}" + ) + elif owner.displayName: + try: + owner_ref = self.metadata.get_reference_by_name( + name=owner.displayName + ) except Exception as err: logger.warning( - f"Error processing current owner data in {dashboard_details.id}: {err}" + f"Could not process owner data with name {owner.displayName} in {dashboard_details.id}: {err}" ) - if owner_ref: + if owner_ref: + owner_ref_list.append(owner_ref.root[0]) + # check for last modified, configuredBy user + current_active_user = None + if isinstance(dashboard_details, Dataset): + current_active_user = dashboard_details.configuredBy + elif isinstance(dashboard_details, (Dataflow, PowerBIReport)): + current_active_user = dashboard_details.modifiedBy + if current_active_user: + try: + owner_ref = self.metadata.get_reference_by_email( + current_active_user.lower() + ) + if owner_ref and owner_ref.root[0] not in owner_ref_list: owner_ref_list.append(owner_ref.root[0]) + except Exception as err: + logger.warning(f"Could not fetch owner data due to {err}") if len(owner_ref_list) > 0: logger.debug( f"Successfully fetched owners data for {dashboard_details.id}" diff --git a/ingestion/src/metadata/ingestion/source/dashboard/powerbi/models.py b/ingestion/src/metadata/ingestion/source/dashboard/powerbi/models.py index 0429fc2fc7b..3e0fc9e4664 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/powerbi/models.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/powerbi/models.py @@ -39,6 +39,11 @@ class PowerBIUser(BaseModel): displayName: Optional[str] = None email: Optional[str] = Field(alias="emailAddress", default=None) + userType: Optional[str] = None + reportUserAccessRight: Optional[str] = None + datasetUserAccessRight: Optional[str] = None + dataflowUserAccessRight: Optional[str] = None + dashboardUserAccessRight: Optional[str] = None class PowerBIDashboard(BaseModel): @@ -65,6 +70,7 @@ class PowerBIReport(BaseModel): name: str datasetId: Optional[str] = None users: Optional[List[PowerBIUser]] = [] + modifiedBy: Optional[str] = None class DashboardsResponse(BaseModel): @@ -163,6 +169,11 @@ class TablesResponse(BaseModel): value: List[PowerBiTable] +class DatasetExpression(BaseModel): + name: str + expression: str + + class Dataset(BaseModel): """ PowerBI Dataset Model @@ -174,6 +185,8 @@ class Dataset(BaseModel): tables: Optional[List[PowerBiTable]] = [] description: Optional[str] = None users: Optional[List[PowerBIUser]] = [] + expressions: Optional[List[DatasetExpression]] = [] + configuredBy: Optional[str] = None class DatasetResponse(BaseModel): @@ -190,6 +203,8 @@ class Dataflow(BaseModel): id: str = Field(alias="objectId") name: str description: Optional[str] = None + users: Optional[List[PowerBIUser]] = [] + modifiedBy: Optional[str] = None class Group(BaseModel): diff --git a/ingestion/tests/unit/topology/dashboard/test_powerbi.py b/ingestion/tests/unit/topology/dashboard/test_powerbi.py index 33c89e6bb20..7c3897a3027 100644 --- a/ingestion/tests/unit/topology/dashboard/test_powerbi.py +++ b/ingestion/tests/unit/topology/dashboard/test_powerbi.py @@ -4,6 +4,10 @@ from unittest.mock import patch import pytest +from metadata.generated.schema.entity.data.dashboardDataModel import ( + DashboardDataModel, + DataModelType, +) from metadata.generated.schema.metadataIngestion.workflow import ( OpenMetadataWorkflowConfig, ) @@ -105,8 +109,18 @@ MOCK_DASHBOARD_WITH_OWNERS = { "embedUrl": "https://test.com/embed", "tiles": [], "users": [ - {"displayName": "John Doe", "emailAddress": "john.doe@example.com"}, - {"displayName": "Jane Smith", "emailAddress": "jane.smith@example.com"}, + { + "displayName": "John Doe", + "emailAddress": "john.doe@example.com", + "dashboardUserAccessRight": "Owner", + "userType": "Member", + }, + { + "displayName": "Jane Smith", + "emailAddress": "jane.smith@example.com", + "dashboardUserAccessRight": "Owner", + "userType": "Member", + }, ], } @@ -115,7 +129,14 @@ MOCK_DATASET_WITH_OWNERS = { "name": "Test Dataset", "tables": [], "description": "Test dataset description", - "users": [{"displayName": "John Doe", "emailAddress": "john.doe@example.com"}], + "users": [ + { + "displayName": "John Doe", + "emailAddress": "john.doe@example.com", + "datasetUserAccessRight": "Owner", + "userType": "Member", + } + ], } MOCK_USER_1_ENITYTY_REF_LIST = EntityReferenceList( @@ -125,6 +146,34 @@ MOCK_USER_2_ENITYTY_REF_LIST = EntityReferenceList( root=[EntityReference(id=uuid.uuid4(), name="Jane Smith", type="user")] ) +MOCK_SNOWFLAKE_EXP_V2 = 'let\n Source = Snowflake.Databases(Snowflake_URL,Warehouse,[Role=Role]),\n Database = Source{[Name=DB,Kind="Database"]}[Data],\n DB_Schema = Database{[Name=Schema,Kind="Schema"]}[Data],\n Table = DB_Schema{[Name="CUSTOMER_TABLE",Kind="Table"]}[Data],\n #"Andere entfernte Spalten" = Table.SelectColumns(Table,{"ID_BERICHTSMONAT", "ID_AKQUISE_VERMITTLER", "ID_AKQUISE_OE", "ID_SPARTE", "ID_RISIKOTRAEGER", "ID_KUNDE", "STUECK", "BBE"})\nin\n #"Andere entfernte Spalten"' +EXPECTED_SNOWFLAKE_RESULT_V2 = { + "database": "MY_DB", + "schema": "MY_SCHEMA", + "table": "CUSTOMER_TABLE", +} +MOCK_DATASET_FROM_WORKSPACE = Dataset( + id="testdataset", + name="Test Dataset", + tables=[], + expressions=[ + { + "name": "DB", + "expression": '"MY_DB" meta [IsParameterQuery=true, List={"MY_DB_DEV", "MY_DB", "MY_DB_PROD"}, DefaultValue="MY_DB", Type="Text", IsParameterQueryRequired=true]', + }, + { + "name": "Schema", + "expression": '"MY_SCHEMA" meta [IsParameterQuery=true, List={"MY_SCHEMA", "MY_SCHEMA_PROD"}, DefaultValue="MY_SCHEMA", Type="Text", IsParameterQueryRequired=true]', + }, + ], +) +MOCK_DASHBOARD_DATA_MODEL = DashboardDataModel( + name="dummy_datamodel", + id=uuid.uuid4(), + columns=[], + dataModelType=DataModelType.PowerBIDataModel.value, +) + class PowerBIUnitTest(TestCase): """ @@ -147,7 +196,12 @@ class PowerBIUnitTest(TestCase): ) @pytest.mark.order(1) - def test_parse_database_source(self): + @patch.object( + PowerbiSource, + "_fetch_dataset_from_workspace", + return_value=MOCK_DATASET_FROM_WORKSPACE, + ) + def test_parse_database_source(self, *_): # Test with valid redshift source result = self.powerbi._parse_redshift_source(MOCK_REDSHIFT_EXP) self.assertEqual(result, EXPECTED_REDSHIFT_RESULT) @@ -161,13 +215,22 @@ class PowerBIUnitTest(TestCase): self.assertEqual(result, None) # Test with valid snowflake source - result = self.powerbi._parse_snowflake_source(MOCK_SNOWFLAKE_EXP) + result = self.powerbi._parse_snowflake_source( + MOCK_SNOWFLAKE_EXP, MOCK_DASHBOARD_DATA_MODEL + ) self.assertEqual(result, EXPECTED_SNOWFLAKE_RESULT) # Test with invalid snowflake source - result = self.powerbi._parse_snowflake_source(MOCK_SNOWFLAKE_EXP_INVALID) + result = self.powerbi._parse_snowflake_source( + MOCK_SNOWFLAKE_EXP_INVALID, MOCK_DASHBOARD_DATA_MODEL + ) self.assertEqual(result, None) + result = self.powerbi._parse_snowflake_source( + MOCK_SNOWFLAKE_EXP_V2, MOCK_DASHBOARD_DATA_MODEL + ) + self.assertEqual(result, EXPECTED_SNOWFLAKE_RESULT_V2) + @pytest.mark.order(2) @patch("metadata.ingestion.ometa.ometa_api.OpenMetadata.get_reference_by_email") def test_owner_ingestion(self, get_reference_by_email):