feat-21712: PowerBI internal entities & cross workspace lineage (#21837)

This commit is contained in:
harshsoni2024 2025-06-18 20:46:17 +05:30 committed by harshsoni2024
parent cfb2b4f4c8
commit db2081cf4a
3 changed files with 270 additions and 37 deletions

View File

@ -239,6 +239,7 @@ class PowerbiSource(DashboardServiceSource):
self.workspace_data.append(workspace)
self.context.get().workspace = workspace
self.filtered_dashboards = []
self.filtered_datamodels = []
for dashboard in self.get_dashboards_list() or []:
dashboard_details = self.get_dashboard_details(dashboard)
dashboard_name = self.get_dashboard_name(dashboard_details)
@ -531,6 +532,7 @@ class PowerbiSource(DashboardServiceSource):
):
self.status.filter(dataset.name, "Data model filtered out.")
continue
self.filtered_datamodels.append(dataset)
if isinstance(dataset, Dataset):
data_model_type = DataModelType.PowerBIDataModel.value
datamodel_columns = self._get_column_info(dataset)
@ -605,7 +607,7 @@ class PowerbiSource(DashboardServiceSource):
except Exception as exc: # pylint: disable=broad-except
yield Either(
left=StackTraceError(
name="Lineage",
name="Report and Dashboard Lineage",
error=f"Error to yield report and dashboard lineage details: {exc}",
stackTrace=traceback.format_exc(),
)
@ -613,7 +615,6 @@ class PowerbiSource(DashboardServiceSource):
def create_datamodel_report_lineage(
self,
db_service_name: Optional[str],
dashboard_details: PowerBIReport,
) -> Iterable[Either[CreateDashboardRequest]]:
"""
@ -630,14 +631,11 @@ class PowerbiSource(DashboardServiceSource):
entity=Dashboard,
fqn=report_fqn,
)
dataset = self._fetch_dataset_from_workspace(dashboard_details.datasetId)
if dataset:
datamodel_fqn = fqn.build(
self.metadata,
entity_type=DashboardDataModel,
service_name=self.context.get().dashboard_service,
data_model_name=dataset.id,
data_model_name=dashboard_details.datasetId,
)
datamodel_entity = self.metadata.get_by_name(
entity=DashboardDataModel,
@ -649,26 +647,12 @@ class PowerbiSource(DashboardServiceSource):
to_entity=report_entity, from_entity=datamodel_entity
)
for table in dataset.tables or []:
yield from self._get_table_and_datamodel_lineage(
db_service_name=db_service_name,
table=table,
datamodel_entity=datamodel_entity,
)
# create the lineage between table and datamodel using the pbit files
if self.client.file_client:
yield from self.create_table_datamodel_lineage_from_files(
db_service_name=db_service_name,
datamodel_entity=datamodel_entity,
)
except Exception as exc: # pylint: disable=broad-except
yield Either(
left=StackTraceError(
name=f"{db_service_name} Report Lineage",
name=f"Datamodel and Report Lineage",
error=(
"Error to yield datamodel and report lineage details for DB "
f"service name [{db_service_name}]: {exc}"
f"Error to yield datamodel and report lineage details: {exc}"
),
stackTrace=traceback.format_exc(),
)
@ -891,7 +875,148 @@ class PowerbiSource(DashboardServiceSource):
name="DataModel Lineage",
error=(
"Error to yield datamodel lineage details for DB "
f"service name [{db_service_name}]: {exc}"
f"service name [{str(db_service_name)}]: {exc}"
),
stackTrace=traceback.format_exc(),
)
)
def create_dataset_upstream_dataflow_lineage(
    self, datamodel: Dataset, datamodel_entity: DashboardDataModel
) -> Iterable[Either[AddLineageRequest]]:
    """
    Create lineage between dataset and upstreamDataflow

    For every entry of ``datamodel.upstreamDataflows`` that carries a
    ``targetDataflowId``, the upstream dataflow is looked up by name as a
    DashboardDataModel under the dashboard service held in the current
    context. When it is found, a lineage request going from the upstream
    dataflow to ``datamodel_entity`` is yielded.

    Args:
        datamodel: PowerBI dataset whose upstream dataflows are walked.
        datamodel_entity: DashboardDataModel entity resolved for
            ``datamodel``; it is the ``to_entity`` of every lineage request.

    Yields:
        Whatever ``_get_add_lineage_request`` returns for each resolved
        upstream dataflow, or an ``Either`` whose ``left`` is a
        ``StackTraceError`` when handling one upstream entry fails.
        Failures are reported per entry and do not stop the iteration.
    """
    # Resolve the display name once and defensively. It is used in the log
    # and error messages, including inside the ``except`` handler: a failing
    # attribute access there would escape the handler and abort the whole
    # generator instead of reporting a single failed entry.
    datamodel_name = getattr(getattr(datamodel_entity, "name", None), "root", None)

    for upstream_dataflow in datamodel.upstreamDataflows or []:
        # Bound before the ``try`` so the handler can always reference it.
        target_dataflow_id = None
        try:
            target_dataflow_id = upstream_dataflow.targetDataflowId
            if not target_dataflow_id:
                logger.debug(
                    "No targetDataflowId found for upstreamDataflow in "
                    f"datamodel [{datamodel_name}], "
                    "Moving to next upstreamDataflow"
                )
                continue
            upstream_dataflow_fqn = fqn.build(
                self.metadata,
                entity_type=DashboardDataModel,
                service_name=self.context.get().dashboard_service,
                data_model_name=target_dataflow_id,
            )
            upstream_dataflow_entity = self.metadata.get_by_name(
                entity=DashboardDataModel,
                fqn=upstream_dataflow_fqn,
            )
            if upstream_dataflow_entity and datamodel_entity:
                yield self._get_add_lineage_request(
                    from_entity=upstream_dataflow_entity,
                    to_entity=datamodel_entity,
                )
            else:
                logger.debug(
                    f"No upstreamDataflow entity with id={target_dataflow_id} "
                    f"found for datamodel [{datamodel_name}]"
                )
        except Exception as exc:  # pylint: disable=broad-except
            yield Either(
                left=StackTraceError(
                    name="Dataset and UpstreamDataflow Lineage",
                    error=(
                        "Error to yield dataset and upstreamDataflow lineage "
                        f"between [{datamodel_name}, {target_dataflow_id}]: {exc}"
                    ),
                    stackTrace=traceback.format_exc(),
                )
            )
def create_dataset_upstream_dataset_lineage(
    self, datamodel: Dataset, datamodel_entity: DashboardDataModel
) -> Iterable[Either[AddLineageRequest]]:
    """
    Create lineage between dataset and upstreamDataset

    For every entry of ``datamodel.upstreamDatasets`` that carries a
    ``targetDatasetId``, the upstream dataset is looked up by name as a
    DashboardDataModel under the dashboard service held in the current
    context. When it is found, a lineage request going from the upstream
    dataset to ``datamodel_entity`` is yielded.

    Args:
        datamodel: PowerBI dataset whose upstream datasets are walked.
        datamodel_entity: DashboardDataModel entity resolved for
            ``datamodel``; it is the ``to_entity`` of every lineage request.

    Yields:
        Whatever ``_get_add_lineage_request`` returns for each resolved
        upstream dataset, or an ``Either`` whose ``left`` is a
        ``StackTraceError`` when handling one upstream entry fails.
        Failures are reported per entry and do not stop the iteration.
    """
    # Resolve the display name once and defensively. It is used in the log
    # and error messages, including inside the ``except`` handler: a failing
    # attribute access there would escape the handler and abort the whole
    # generator instead of reporting a single failed entry.
    datamodel_name = getattr(getattr(datamodel_entity, "name", None), "root", None)

    for upstream_dataset in datamodel.upstreamDatasets or []:
        # Bound before the ``try`` so the handler can always reference it.
        target_dataset_id = None
        try:
            target_dataset_id = upstream_dataset.targetDatasetId
            if not target_dataset_id:
                logger.debug(
                    "No targetDatasetId found for upstreamDataset in "
                    f"datamodel [{datamodel_name}], "
                    "Moving to next upstreamDataset"
                )
                continue
            upstream_dataset_fqn = fqn.build(
                self.metadata,
                entity_type=DashboardDataModel,
                service_name=self.context.get().dashboard_service,
                data_model_name=target_dataset_id,
            )
            upstream_dataset_entity = self.metadata.get_by_name(
                entity=DashboardDataModel,
                fqn=upstream_dataset_fqn,
            )
            if upstream_dataset_entity and datamodel_entity:
                yield self._get_add_lineage_request(
                    from_entity=upstream_dataset_entity,
                    to_entity=datamodel_entity,
                )
            else:
                logger.debug(
                    f"No upstreamDataset entity with id={target_dataset_id} "
                    f"found for datamodel [{datamodel_name}]"
                )
        except Exception as exc:  # pylint: disable=broad-except
            yield Either(
                left=StackTraceError(
                    name="Dataset and UpstreamDataset Lineage",
                    error=(
                        "Error to yield dataset and upstreamDataset lineage "
                        f"between [{datamodel_name}, {target_dataset_id}]: {exc}"
                    ),
                    stackTrace=traceback.format_exc(),
                )
            )
def create_dataflow_upstream_dataflow_lineage(
self, datamodel: Dataflow, datamodel_entity: DashboardDataModel
) -> Iterable[Either[AddLineageRequest]]:
"""
Create lineage between dataflow and upstreamDataflow
"""
for upstream_dataflow in datamodel.upstreamDataflows or []:
try:
if not upstream_dataflow.targetDataflowId:
logger.debug(
f"No targetDataflowId found for upstreamDataflow in "
f"datamodel [{datamodel_entity.name.root}], "
f"Moving to next upstreamDataflow"
)
continue
upstream_dataflow_fqn = fqn.build(
self.metadata,
entity_type=DashboardDataModel,
service_name=self.context.get().dashboard_service,
data_model_name=upstream_dataflow.targetDataflowId,
)
upstream_dataflow_entity = self.metadata.get_by_name(
entity=DashboardDataModel,
fqn=upstream_dataflow_fqn,
)
if upstream_dataflow_entity and datamodel_entity:
yield self._get_add_lineage_request(
from_entity=upstream_dataflow_entity,
to_entity=datamodel_entity,
)
else:
logger.debug(
f"No upstreamDataflow entity with id={str(upstream_dataflow.targetDataflowId)} "
f"found for datamodel [{datamodel_entity.name.root}]"
)
except Exception as exc: # pylint: disable=broad-except
yield Either(
left=StackTraceError(
name="Dataflow and UpstreamDataflow Lineage",
error=(
f"Error to yield dataflow and upstreamDataflow lineage "
f"between [{datamodel_entity.name.root}, {str(upstream_dataflow.targetDataflowId)}]: {exc}"
),
stackTrace=traceback.format_exc(),
)
@ -911,7 +1036,6 @@ class PowerbiSource(DashboardServiceSource):
try:
if isinstance(dashboard_details, PowerBIReport):
yield from self.create_datamodel_report_lineage(
db_service_name=db_service_name,
dashboard_details=dashboard_details,
)
if isinstance(dashboard_details, PowerBIDashboard):
@ -922,7 +1046,68 @@ class PowerbiSource(DashboardServiceSource):
yield Either(
left=StackTraceError(
name="Dashboard Lineage",
error=f"Error to yield dashboard lineage details for DB service name [{db_service_name}]: {exc}",
error=f"Error to yield dashboard lineage details for DB service name [{str(db_service_name)}]: {exc}",
stackTrace=traceback.format_exc(),
)
)
"""
Iterate loop for filtered datamodels so datamodels which are not connected to
any report but have tables would be eligible for a dataset-db_table lineage.
Also create below lineages:
1. dataset-db_table
2. dataset-upstreamDataflow
3. dataset-upstreamDataset
4. dataflow-upstreamDataflow
"""
for datamodel in self.filtered_datamodels or []:
try:
datamodel_fqn = fqn.build(
self.metadata,
entity_type=DashboardDataModel,
service_name=self.context.get().dashboard_service,
data_model_name=datamodel.id,
)
datamodel_entity = self.metadata.get_by_name(
entity=DashboardDataModel,
fqn=datamodel_fqn,
)
if datamodel_entity:
if isinstance(datamodel, Dataset):
# 1. datamodel-db_table lineage
for table in datamodel.tables or []:
yield from self._get_table_and_datamodel_lineage(
db_service_name=db_service_name,
table=table,
datamodel_entity=datamodel_entity,
)
# 2. dataset-upstreamDataflow lineage
yield from self.create_dataset_upstream_dataflow_lineage(
datamodel, datamodel_entity
)
# 3. dataset-upstreamDataset lineage
yield from self.create_dataset_upstream_dataset_lineage(
datamodel, datamodel_entity
)
# create the lineage between table and datamodel using the pbit files
if self.client.file_client:
yield from self.create_table_datamodel_lineage_from_files(
db_service_name=db_service_name,
datamodel_entity=datamodel_entity,
)
elif isinstance(datamodel, Dataflow):
# create dataflow-upstreamDataflow lineage
yield from self.create_dataflow_upstream_dataflow_lineage(
datamodel, datamodel_entity
)
else:
logger.warning(
f"Unknown datamodel type: {type(datamodel)}, name: {datamodel.name}"
)
except Exception as exc: # pylint: disable=broad-except
yield Either(
left=StackTraceError(
name="Datamodel Lineage",
error=f"Error to yield datamodel lineage details for DB service name [{str(db_service_name)}]: {exc}",
stackTrace=traceback.format_exc(),
)
)

View File

@ -174,6 +174,16 @@ class DatasetExpression(BaseModel):
expression: Optional[str] = None
class UpstreaDataflow(BaseModel):
    """
    PowerBI Upstream Dataflow Model

    One entry of the ``upstreamDataflows`` list of a dataset or dataflow.
    Both fields are optional and default to ``None``.
    """

    # NOTE(review): presumably the id of the group (workspace) that owns the
    # upstream dataflow -- confirm against the PowerBI API response.
    groupId: Optional[str] = None
    # Id of the upstream dataflow; the lineage code uses it as the data model
    # name when resolving the upstream DashboardDataModel entity.
    targetDataflowId: Optional[str] = None
class UpstreaDataset(BaseModel):
    """
    PowerBI Upstream Dataset Model

    One entry of the ``upstreamDatasets`` list of a dataset.
    Both fields are optional and default to ``None``.
    """

    # NOTE(review): presumably the id of the group (workspace) that owns the
    # upstream dataset -- confirm against the PowerBI API response.
    groupId: Optional[str] = None
    # Id of the upstream dataset; the lineage code uses it as the data model
    # name when resolving the upstream DashboardDataModel entity.
    targetDatasetId: Optional[str] = None
class Dataset(BaseModel):
"""
PowerBI Dataset Model
@ -187,6 +197,8 @@ class Dataset(BaseModel):
users: Optional[List[PowerBIUser]] = []
expressions: Optional[List[DatasetExpression]] = []
configuredBy: Optional[str] = None
upstreamDataflows: Optional[List[UpstreaDataflow]] = []
upstreamDatasets: Optional[List[UpstreaDataset]] = []
class DatasetResponse(BaseModel):
@ -205,6 +217,7 @@ class Dataflow(BaseModel):
description: Optional[str] = None
users: Optional[List[PowerBIUser]] = []
modifiedBy: Optional[str] = None
upstreamDataflows: Optional[List[UpstreaDataflow]] = []
class Group(BaseModel):

View File

@ -16,11 +16,14 @@ from metadata.generated.schema.type.entityReferenceList import EntityReferenceLi
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.dashboard.powerbi.metadata import PowerbiSource
from metadata.ingestion.source.dashboard.powerbi.models import (
Dataflow,
Dataset,
PowerBIDashboard,
PowerBiTable,
PowerBITableSource,
UpstreaDataflow,
)
from metadata.utils import fqn
MOCK_REDSHIFT_EXP = """
let
@ -191,6 +194,12 @@ MOCK_DASHBOARD_DATA_MODEL = DashboardDataModel(
columns=[],
dataModelType=DataModelType.PowerBIDataModel.value,
)
# Dataflow-typed data model entity with no columns; the upstream dataflow
# lineage test patches ``OpenMetadata.get_by_name`` to return it.
MOCK_DATAMODEL_ENTITY = DashboardDataModel(
    id=uuid.uuid4(),
    name="dummy_dataflow_id_a",
    columns=[],
    dataModelType=DataModelType.PowerBIDataFlow.value,
)
class PowerBIUnitTest(TestCase):
@ -383,3 +392,29 @@ class PowerBIUnitTest(TestCase):
self.assertIsNone(result["database"])
self.assertIsNone(result["schema"])
self.assertEqual(result["table"], "CUSTOMER_TABLE")
@pytest.mark.order(5)
@patch.object(OpenMetadata, "get_by_name", return_value=MOCK_DATAMODEL_ENTITY)
@patch.object(fqn, "build", return_value=None)
def test_upstream_dataflow_lineage(self, *_):
    """
    A dataflow that declares one upstream dataflow produces a lineage request.

    ``fqn.build`` is patched to return None and ``OpenMetadata.get_by_name``
    is patched to return MOCK_DATAMODEL_ENTITY, so the upstream lookup always
    resolves. The test checks that the first yielded item has ``right`` set.
    """
    downstream_dataflow = Dataflow(
        name="dataflow_b",
        objectId="dummy_dataflow_id_b",
        upstreamDataflows=[UpstreaDataflow(targetDataflowId="dataflow_a")],
    )
    downstream_entity = DashboardDataModel(
        name="dummy_dataflow_id_b",
        id=uuid.uuid4(),
        dataModelType=DataModelType.PowerBIDataFlow.value,
        columns=[],
    )

    yielded = list(
        self.powerbi.create_dataflow_upstream_dataflow_lineage(
            downstream_dataflow, downstream_entity
        )
    )

    first_result = yielded[0]
    assert first_result.right is not None