From db2081cf4abc958e67cb3f255719a777b6fe83ce Mon Sep 17 00:00:00 2001
From: harshsoni2024 <64592571+harshsoni2024@users.noreply.github.com>
Date: Wed, 18 Jun 2025 20:46:17 +0530
Subject: [PATCH] feat-21712: PowerBI internal entities & cross workspace
 lineage (#21837)

---
 .../source/dashboard/powerbi/metadata.py     | 259 +++++++++++++++---
 .../source/dashboard/powerbi/models.py       |  13 +
 .../unit/topology/dashboard/test_powerbi.py  |  35 +++
 3 files changed, 270 insertions(+), 37 deletions(-)

diff --git a/ingestion/src/metadata/ingestion/source/dashboard/powerbi/metadata.py b/ingestion/src/metadata/ingestion/source/dashboard/powerbi/metadata.py
index 1afaa4f14f0..1412d0a6e04 100644
--- a/ingestion/src/metadata/ingestion/source/dashboard/powerbi/metadata.py
+++ b/ingestion/src/metadata/ingestion/source/dashboard/powerbi/metadata.py
@@ -239,6 +239,7 @@ class PowerbiSource(DashboardServiceSource):
             self.workspace_data.append(workspace)
             self.context.get().workspace = workspace
             self.filtered_dashboards = []
+            self.filtered_datamodels = []
             for dashboard in self.get_dashboards_list() or []:
                 dashboard_details = self.get_dashboard_details(dashboard)
                 dashboard_name = self.get_dashboard_name(dashboard_details)
@@ -531,6 +532,7 @@ class PowerbiSource(DashboardServiceSource):
                 ):
                     self.status.filter(dataset.name, "Data model filtered out.")
                     continue
+                self.filtered_datamodels.append(dataset)
                 if isinstance(dataset, Dataset):
                     data_model_type = DataModelType.PowerBIDataModel.value
                     datamodel_columns = self._get_column_info(dataset)
@@ -605,7 +607,7 @@ class PowerbiSource(DashboardServiceSource):
         except Exception as exc:  # pylint: disable=broad-except
             yield Either(
                 left=StackTraceError(
-                    name="Lineage",
+                    name="Report and Dashboard Lineage",
                     error=f"Error to yield report and dashboard lineage details: {exc}",
                     stackTrace=traceback.format_exc(),
                 )
             )
@@ -613,7 +615,6 @@ class PowerbiSource(DashboardServiceSource):
 
     def create_datamodel_report_lineage(
         self,
-        db_service_name: Optional[str],
         dashboard_details: PowerBIReport,
     ) -> Iterable[Either[CreateDashboardRequest]]:
         """
@@ -630,45 +631,28 @@ class PowerbiSource(DashboardServiceSource):
                 entity=Dashboard,
                 fqn=report_fqn,
             )
+            datamodel_fqn = fqn.build(
+                self.metadata,
+                entity_type=DashboardDataModel,
+                service_name=self.context.get().dashboard_service,
+                data_model_name=dashboard_details.datasetId,
+            )
+            datamodel_entity = self.metadata.get_by_name(
+                entity=DashboardDataModel,
+                fqn=datamodel_fqn,
+            )
-
-            dataset = self._fetch_dataset_from_workspace(dashboard_details.datasetId)
-            if dataset:
-                datamodel_fqn = fqn.build(
-                    self.metadata,
-                    entity_type=DashboardDataModel,
-                    service_name=self.context.get().dashboard_service,
-                    data_model_name=dataset.id,
-                )
-                datamodel_entity = self.metadata.get_by_name(
-                    entity=DashboardDataModel,
-                    fqn=datamodel_fqn,
+            if datamodel_entity and report_entity:
+                yield self._get_add_lineage_request(
+                    to_entity=report_entity, from_entity=datamodel_entity
                 )
-
-                if datamodel_entity and report_entity:
-                    yield self._get_add_lineage_request(
-                        to_entity=report_entity, from_entity=datamodel_entity
-                    )
-
-                for table in dataset.tables or []:
-                    yield from self._get_table_and_datamodel_lineage(
-                        db_service_name=db_service_name,
-                        table=table,
-                        datamodel_entity=datamodel_entity,
-                    )
-
-                # create the lineage between table and datamodel using the pbit files
-                if self.client.file_client:
-                    yield from self.create_table_datamodel_lineage_from_files(
-                        db_service_name=db_service_name,
-                        datamodel_entity=datamodel_entity,
-                    )
         except Exception as exc:  # pylint: disable=broad-except
             yield Either(
                 left=StackTraceError(
-                    name=f"{db_service_name} Report Lineage",
+                    name=f"Datamodel and Report Lineage",
                     error=(
-                        "Error to yield datamodel and report lineage details for DB "
-                        f"service name [{db_service_name}]: {exc}"
+                        f"Error to yield datamodel and report lineage details: {exc}"
                     ),
                     stackTrace=traceback.format_exc(),
                 )
             )
@@ -891,12 +875,153 @@ class PowerbiSource(DashboardServiceSource):
                     name="DataModel Lineage",
                     error=(
                         "Error to yield datamodel lineage details for DB "
-                        f"service name [{db_service_name}]: {exc}"
+                        f"service name [{str(db_service_name)}]: {exc}"
                     ),
                     stackTrace=traceback.format_exc(),
                 )
             )
 
+    def create_dataset_upstream_dataflow_lineage(
+        self, datamodel: Dataset, datamodel_entity: DashboardDataModel
+    ) -> Iterable[Either[AddLineageRequest]]:
+        """
+        Create lineage between dataset and upstreamDataflow
+        """
+        for upstream_dataflow in datamodel.upstreamDataflows or []:
+            try:
+                if not upstream_dataflow.targetDataflowId:
+                    logger.debug(
+                        f"No targetDataflowId found for upstreamDataflow in "
+                        f"datamodel [{datamodel_entity.name.root}], "
+                        f"Moving to next upstreamDataflow"
+                    )
+                    continue
+                upstream_dataflow_fqn = fqn.build(
+                    self.metadata,
+                    entity_type=DashboardDataModel,
+                    service_name=self.context.get().dashboard_service,
+                    data_model_name=upstream_dataflow.targetDataflowId,
+                )
+                upstream_dataflow_entity = self.metadata.get_by_name(
+                    entity=DashboardDataModel,
+                    fqn=upstream_dataflow_fqn,
+                )
+                if upstream_dataflow_entity and datamodel_entity:
+                    yield self._get_add_lineage_request(
+                        from_entity=upstream_dataflow_entity,
+                        to_entity=datamodel_entity,
+                    )
+                else:
+                    logger.debug(
+                        f"No upstreamDataflow entity with id={str(upstream_dataflow.targetDataflowId)} "
+                        f"found for datamodel [{datamodel_entity.name.root}]"
+                    )
+            except Exception as exc:  # pylint: disable=broad-except
+                yield Either(
+                    left=StackTraceError(
+                        name="Dataset and UpstreamDataflow Lineage",
+                        error=(
+                            "Error to yield dataset and upstreamDataflow lineage "
+                            f"between [{datamodel_entity.name.root}, {str(upstream_dataflow.targetDataflowId)}]: {exc}"
+                        ),
+                        stackTrace=traceback.format_exc(),
+                    )
+                )
+
+    def create_dataset_upstream_dataset_lineage(
+        self, datamodel: Dataset, datamodel_entity: DashboardDataModel
+    ) -> Iterable[Either[AddLineageRequest]]:
+        """
+        Create lineage between dataset and upstreamDataset
+        """
+        for upstream_dataset in datamodel.upstreamDatasets or []:
+            try:
+                if not upstream_dataset.targetDatasetId:
+                    logger.debug(
+                        f"No targetDatasetId found for upstreamDataset in "
+                        f"datamodel [{datamodel_entity.name.root}], "
+                        f"Moving to next upstreamDataset"
+                    )
+                    continue
+                upstream_dataset_fqn = fqn.build(
+                    self.metadata,
+                    entity_type=DashboardDataModel,
+                    service_name=self.context.get().dashboard_service,
+                    data_model_name=upstream_dataset.targetDatasetId,
+                )
+                upstream_dataset_entity = self.metadata.get_by_name(
+                    entity=DashboardDataModel,
+                    fqn=upstream_dataset_fqn,
+                )
+                if upstream_dataset_entity and datamodel_entity:
+                    yield self._get_add_lineage_request(
+                        from_entity=upstream_dataset_entity,
+                        to_entity=datamodel_entity,
+                    )
+                else:
+                    logger.debug(
+                        f"No upstreamDataset entity with id={str(upstream_dataset.targetDatasetId)} "
+                        f"found for datamodel [{datamodel_entity.name.root}]"
+                    )
+            except Exception as exc:  # pylint: disable=broad-except
+                yield Either(
+                    left=StackTraceError(
+                        name="Dataset and UpstreamDataset Lineage",
+                        error=(
+                            "Error to yield dataset and upstreamDataset lineage "
+                            f"between [{datamodel_entity.name.root}, {str(upstream_dataset.targetDatasetId)}]: {exc}"
+                        ),
+                        stackTrace=traceback.format_exc(),
+                    )
+                )
+
+    def create_dataflow_upstream_dataflow_lineage(
+        self, datamodel: Dataflow, datamodel_entity: DashboardDataModel
+    ) -> Iterable[Either[AddLineageRequest]]:
+        """
+        Create lineage between dataflow and upstreamDataflow
+        """
+        for upstream_dataflow in datamodel.upstreamDataflows or []:
+            try:
+                if not upstream_dataflow.targetDataflowId:
+                    logger.debug(
+                        f"No targetDataflowId found for upstreamDataflow in "
+                        f"datamodel [{datamodel_entity.name.root}], "
+                        f"Moving to next upstreamDataflow"
+                    )
+                    continue
+                upstream_dataflow_fqn = fqn.build(
+                    self.metadata,
+                    entity_type=DashboardDataModel,
+                    service_name=self.context.get().dashboard_service,
+                    data_model_name=upstream_dataflow.targetDataflowId,
+                )
+                upstream_dataflow_entity = self.metadata.get_by_name(
+                    entity=DashboardDataModel,
+                    fqn=upstream_dataflow_fqn,
+                )
+                if upstream_dataflow_entity and datamodel_entity:
+                    yield self._get_add_lineage_request(
+                        from_entity=upstream_dataflow_entity,
+                        to_entity=datamodel_entity,
+                    )
+                else:
+                    logger.debug(
+                        f"No upstreamDataflow entity with id={str(upstream_dataflow.targetDataflowId)} "
+                        f"found for datamodel [{datamodel_entity.name.root}]"
+                    )
+            except Exception as exc:  # pylint: disable=broad-except
+                yield Either(
+                    left=StackTraceError(
+                        name="Dataflow and UpstreamDataflow Lineage",
+                        error=(
+                            f"Error to yield dataflow and upstreamDataflow lineage "
+                            f"between [{datamodel_entity.name.root}, {str(upstream_dataflow.targetDataflowId)}]: {exc}"
+                        ),
+                        stackTrace=traceback.format_exc(),
+                    )
+                )
+
     def yield_dashboard_lineage_details(
         self,
         dashboard_details: Group,
@@ -911,7 +1036,6 @@ class PowerbiSource(DashboardServiceSource):
         try:
             if isinstance(dashboard_details, PowerBIReport):
                 yield from self.create_datamodel_report_lineage(
-                    db_service_name=db_service_name,
                     dashboard_details=dashboard_details,
                 )
             if isinstance(dashboard_details, PowerBIDashboard):
@@ -922,7 +1046,68 @@ class PowerbiSource(DashboardServiceSource):
             yield Either(
                 left=StackTraceError(
                     name="Dashboard Lineage",
-                    error=f"Error to yield dashboard lineage details for DB service name [{db_service_name}]: {exc}",
+                    error=f"Error to yield dashboard lineage details for DB service name [{str(db_service_name)}]: {exc}",
                     stackTrace=traceback.format_exc(),
                 )
             )
+        """
+        Iterate loop for filtered datamodels so datamodels which are not connected to
+        any report but have tables would be eligible for a dataset-db_table lineage.
+        Also create below lineages:
+        1. dataset-db_table
+        2. dataset-upstreamDataflow
+        3. dataset-upstreamDataset
+        4. dataflow-upstreamDataflow
+        """
+        for datamodel in self.filtered_datamodels or []:
+            try:
+                datamodel_fqn = fqn.build(
+                    self.metadata,
+                    entity_type=DashboardDataModel,
+                    service_name=self.context.get().dashboard_service,
+                    data_model_name=datamodel.id,
+                )
+                datamodel_entity = self.metadata.get_by_name(
+                    entity=DashboardDataModel,
+                    fqn=datamodel_fqn,
+                )
+                if datamodel_entity:
+                    if isinstance(datamodel, Dataset):
+                        # 1. datamodel-db_table lineage
+                        for table in datamodel.tables or []:
+                            yield from self._get_table_and_datamodel_lineage(
+                                db_service_name=db_service_name,
+                                table=table,
+                                datamodel_entity=datamodel_entity,
+                            )
+                        # 2. dataset-upstreamDataflow lineage
+                        yield from self.create_dataset_upstream_dataflow_lineage(
+                            datamodel, datamodel_entity
+                        )
+                        # 3. dataset-upstreamDataset lineage
+                        yield from self.create_dataset_upstream_dataset_lineage(
+                            datamodel, datamodel_entity
+                        )
+                        # create the lineage between table and datamodel using the pbit files
+                        if self.client.file_client:
+                            yield from self.create_table_datamodel_lineage_from_files(
+                                db_service_name=db_service_name,
+                                datamodel_entity=datamodel_entity,
+                            )
+                    elif isinstance(datamodel, Dataflow):
+                        # create dataflow-upstreamDataflow lineage
+                        yield from self.create_dataflow_upstream_dataflow_lineage(
+                            datamodel, datamodel_entity
+                        )
+                    else:
+                        logger.warning(
+                            f"Unknown datamodel type: {type(datamodel)}, name: {datamodel.name}"
+                        )
+            except Exception as exc:  # pylint: disable=broad-except
+                yield Either(
+                    left=StackTraceError(
+                        name="Datamodel Lineage",
+                        error=f"Error to yield datamodel lineage details for DB service name [{str(db_service_name)}]: {exc}",
+                        stackTrace=traceback.format_exc(),
+                    )
+                )
diff --git a/ingestion/src/metadata/ingestion/source/dashboard/powerbi/models.py b/ingestion/src/metadata/ingestion/source/dashboard/powerbi/models.py
index e988b6a3fd7..2eca4099eb4 100644
--- a/ingestion/src/metadata/ingestion/source/dashboard/powerbi/models.py
+++ b/ingestion/src/metadata/ingestion/source/dashboard/powerbi/models.py
@@ -174,6 +174,16 @@ class DatasetExpression(BaseModel):
     expression: Optional[str] = None
 
 
+class UpstreaDataflow(BaseModel):
+    groupId: Optional[str] = None
+    targetDataflowId: Optional[str] = None
+
+
+class UpstreaDataset(BaseModel):
+    groupId: Optional[str] = None
+    targetDatasetId: Optional[str] = None
+
+
 class Dataset(BaseModel):
     """
     PowerBI Dataset Model
@@ -187,6 +197,8 @@ class Dataset(BaseModel):
     users: Optional[List[PowerBIUser]] = []
     expressions: Optional[List[DatasetExpression]] = []
     configuredBy: Optional[str] = None
+    upstreamDataflows: Optional[List[UpstreaDataflow]] = []
+    upstreamDatasets: Optional[List[UpstreaDataset]] = []
 
 
 class DatasetResponse(BaseModel):
@@ -205,6 +217,7 @@ class Dataflow(BaseModel):
     description: Optional[str] = None
     users: Optional[List[PowerBIUser]] = []
     modifiedBy: Optional[str] = None
+    upstreamDataflows: Optional[List[UpstreaDataflow]] = []
 
 
 class Group(BaseModel):
diff --git a/ingestion/tests/unit/topology/dashboard/test_powerbi.py b/ingestion/tests/unit/topology/dashboard/test_powerbi.py
index 957dec257e2..0c5669f5061 100644
--- a/ingestion/tests/unit/topology/dashboard/test_powerbi.py
+++ b/ingestion/tests/unit/topology/dashboard/test_powerbi.py
@@ -16,11 +16,14 @@ from metadata.generated.schema.type.entityReferenceList import EntityReferenceLi
 from metadata.ingestion.ometa.ometa_api import OpenMetadata
 from metadata.ingestion.source.dashboard.powerbi.metadata import PowerbiSource
 from metadata.ingestion.source.dashboard.powerbi.models import (
+    Dataflow,
     Dataset,
     PowerBIDashboard,
     PowerBiTable,
     PowerBITableSource,
+    UpstreaDataflow,
 )
+from metadata.utils import fqn
 
 MOCK_REDSHIFT_EXP = """
 let
@@ -191,6 +194,12 @@ MOCK_DASHBOARD_DATA_MODEL = DashboardDataModel(
     columns=[],
     dataModelType=DataModelType.PowerBIDataModel.value,
 )
+MOCK_DATAMODEL_ENTITY = DashboardDataModel(
+    name="dummy_dataflow_id_a",
+    id=uuid.uuid4(),
+    dataModelType=DataModelType.PowerBIDataFlow.value,
+    columns=[],
+)
 
 
 class PowerBIUnitTest(TestCase):
@@ -383,3 +392,29 @@ class PowerBIUnitTest(TestCase):
         self.assertIsNone(result["database"])
         self.assertIsNone(result["schema"])
         self.assertEqual(result["table"], "CUSTOMER_TABLE")
+
+    @pytest.mark.order(5)
+    @patch.object(OpenMetadata, "get_by_name", return_value=MOCK_DATAMODEL_ENTITY)
+    @patch.object(fqn, "build", return_value=None)
+    def test_upstream_dataflow_lineage(self, *_):
+        MOCK_DATAMODEL_ENTITY_2 = DashboardDataModel(
+            name="dummy_dataflow_id_b",
+            id=uuid.uuid4(),
+            dataModelType=DataModelType.PowerBIDataFlow.value,
+            columns=[],
+        )
+        MOCK_DATAMODEL_2 = Dataflow(
+            name="dataflow_b",
+            objectId="dummy_dataflow_id_b",
+            upstreamDataflows=[
+                UpstreaDataflow(
+                    targetDataflowId="dataflow_a",
+                )
+            ],
+        )
+        lineage_request = list(
+            self.powerbi.create_dataflow_upstream_dataflow_lineage(
+                MOCK_DATAMODEL_2, MOCK_DATAMODEL_ENTITY_2
+            )
+        )
+        assert lineage_request[0].right is not None
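
Usage sketch (illustrative, not part of the diff above): it shows how an upstreamDataflows / upstreamDatasets payload is parsed by the new models in models.py and which target ids the new lineage helpers walk. It assumes the openmetadata-ingestion package from this branch is importable; the ids, names, and workspace values below are made up.

# illustrative_upstream_lineage.py -- standalone sketch, values are invented
from metadata.ingestion.source.dashboard.powerbi.models import Dataflow, Dataset

# A dataset that depends on a dataflow and a dataset from other workspaces,
# i.e. the cross-workspace case this patch targets. Field names follow models.py;
# the dict values are hypothetical, not a verbatim PowerBI API response.
dataset = Dataset(
    id="dataset-id-1",
    name="Sales Dataset",
    upstreamDataflows=[{"groupId": "workspace-a", "targetDataflowId": "dataflow-id-1"}],
    upstreamDatasets=[{"groupId": "workspace-b", "targetDatasetId": "dataset-id-0"}],
)

# The connector only needs the target ids: for each one it builds a
# DashboardDataModel FQN and, if that upstream entity was already ingested,
# yields an AddLineageRequest towards the current datamodel. The prints below
# stand in for those lineage edges.
for upstream in dataset.upstreamDataflows or []:
    print("dataflow", upstream.targetDataflowId, "-> dataset", dataset.id)
for upstream in dataset.upstreamDatasets or []:
    print("dataset", upstream.targetDatasetId, "-> dataset", dataset.id)

# Dataflow-to-dataflow chains are handled the same way by
# create_dataflow_upstream_dataflow_lineage.
dataflow = Dataflow(
    objectId="dataflow-id-1",
    name="Orders Dataflow",
    upstreamDataflows=[{"targetDataflowId": "dataflow-id-0"}],
)
for upstream in dataflow.upstreamDataflows or []:
    print("dataflow", upstream.targetDataflowId, "-> dataflow", dataflow.objectId)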