diff --git a/ingestion/src/metadata/ingestion/source/dashboard/sigma/client.py b/ingestion/src/metadata/ingestion/source/dashboard/sigma/client.py index dafbd1ea292..4a2870ce620 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/sigma/client.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/sigma/client.py @@ -22,7 +22,6 @@ from metadata.generated.schema.entity.services.connections.dashboard.sigmaConnec from metadata.ingestion.ometa.client import REST, ClientConfig from metadata.ingestion.source.dashboard.sigma.models import ( AuthToken, - EdgeSource, EdgeSourceResponse, Elements, ElementsResponse, @@ -161,7 +160,7 @@ class SigmaApiClient: def get_lineage_details( self, workbook_id: str, element_id: str - ) -> Optional[List[EdgeSource]]: + ) -> Optional[List[NodeDetails]]: """ method to fetch dashboards lineage details from api """ diff --git a/ingestion/src/metadata/ingestion/source/dashboard/sigma/metadata.py b/ingestion/src/metadata/ingestion/source/dashboard/sigma/metadata.py index cce758aea18..e44cf5e29a6 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/sigma/metadata.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/sigma/metadata.py @@ -15,11 +15,15 @@ from typing import Iterable, List, Optional from metadata.generated.schema.api.data.createChart import CreateChartRequest from metadata.generated.schema.api.data.createDashboard import CreateDashboardRequest -from metadata.generated.schema.entity.data.chart import Chart -from metadata.generated.schema.entity.data.dashboard import ( - Dashboard as LineageDashboard, +from metadata.generated.schema.api.data.createDashboardDataModel import ( + CreateDashboardDataModelRequest, ) -from metadata.generated.schema.entity.data.table import Table +from metadata.generated.schema.entity.data.chart import Chart +from metadata.generated.schema.entity.data.dashboardDataModel import ( + DashboardDataModel, + DataModelType, +) +from metadata.generated.schema.entity.data.table import Column, DataType, Table from metadata.generated.schema.entity.services.connections.dashboard.sigmaConnection import ( SigmaConnection, ) @@ -40,7 +44,12 @@ from metadata.ingestion.api.models import Either from metadata.ingestion.api.steps import InvalidSourceException from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.dashboard.dashboard_service import DashboardServiceSource -from metadata.ingestion.source.dashboard.sigma.models import Workbook, WorkbookDetails +from metadata.ingestion.source.dashboard.sigma.models import ( + Elements, + NodeDetails, + Workbook, + WorkbookDetails, +) from metadata.utils import fqn from metadata.utils.filters import filter_by_chart from metadata.utils.helpers import get_standard_chart_type @@ -69,6 +78,14 @@ class SigmaSource(DashboardServiceSource): ) return cls(config, metadata) + def __init__( + self, + config: WorkflowSource, + metadata: OpenMetadata, + ): + super().__init__(config, metadata) + self.data_models: List[Elements] = [] + def get_dashboards_list(self) -> Optional[List[Workbook]]: """ get list of dashboard @@ -164,6 +181,46 @@ class SigmaSource(DashboardServiceSource): ) ) + def _get_datamodel(self, datamodel_id: str): + datamodel_fqn = fqn.build( + self.metadata, + entity_type=DashboardDataModel, + service_name=self.context.get().dashboard_service, + data_model_name=datamodel_id, + ) + if datamodel_fqn: + return self.metadata.get_by_name( + entity=DashboardDataModel, + fqn=datamodel_fqn, + ) + return None + + def _get_table_entity_from_node( + self, node: NodeDetails, db_service_name: str + ) -> Optional[Table]: + """ + Get the table entity for lineage + """ + if node.node_schema: + try: + table_fqn = fqn.build( + self.metadata, + entity_type=Table, + service_name=db_service_name, + schema_name=node.node_schema, + table_name=node.name, + database_name="", + ) + if table_fqn: + return self.metadata.get_by_name( + entity=Table, + fqn=table_fqn, + ) + except Exception as exc: + logger.debug(traceback.format_exc()) + logger.warning(f"Error occured while finding table fqn: {exc}") + return None + def yield_dashboard_lineage_details( self, dashboard_details: WorkbookDetails, db_service_name: Optional[str] ): @@ -172,48 +229,90 @@ class SigmaSource(DashboardServiceSource): """ if not db_service_name: return - to_fqn = fqn.build( - self.metadata, - entity_type=LineageDashboard, - service_name=self.config.serviceName, - dashboard_name=str(dashboard_details.workbookId), - ) - to_entity = self.metadata.get_by_name( - entity=LineageDashboard, - fqn=to_fqn, - ) - for chart in self.context.get().charts or []: - nodes = self.client.get_lineage_details(dashboard_details.workbookId, chart) - for node in nodes: - if node.node_schema: - try: - from_fqn = fqn.build( - self.metadata, - entity_type=Table, - service_name=db_service_name, - schema_name=node.node_schema, - table_name=node.name, - database_name="", + # charts and datamodels are same here as we are using charts as metadata for datamodels + for data_model in self.data_models or []: + try: + data_model_entity = self._get_datamodel( + datamodel_id=data_model.elementId + ) + if data_model_entity: + nodes = self.client.get_lineage_details( + dashboard_details.workbookId, data_model.elementId + ) + for node in nodes: + table_entity = self._get_table_entity_from_node( + node, db_service_name ) - from_entity = self.metadata.get_by_name( - entity=Table, - fqn=from_fqn, - ) - if from_entity and to_entity: + if table_entity and data_model.columns: + columns_list = data_model.columns + column_lineage = self._get_column_lineage( + table_entity, data_model_entity, columns_list + ) yield self._get_add_lineage_request( - to_entity=to_entity, from_entity=from_entity - ) - except Exception as exc: - yield Either( - left=StackTraceError( - name="Lineage", - error=( - "Error to yield dashboard lineage details for DB " - f"service name [{db_service_name}]: {exc}" - ), - stackTrace=traceback.format_exc(), + to_entity=data_model_entity, + from_entity=table_entity, + column_lineage=column_lineage, ) + except Exception as exc: + yield Either( + left=StackTraceError( + name=f"{dashboard_details.name} Lineage", + error=( + "Error to yield dashboard lineage details for DB " + f"service name [{db_service_name}]: {exc}" + ), + stackTrace=traceback.format_exc(), + ) + ) + + def get_column_info(self, element: Elements) -> Optional[List[Column]]: + """Build data model columns""" + datamodel_columns = [] + for col in element.columns or []: + try: + datamodel_columns.append( + Column( + name=col, + displayName=col, + dataType=DataType.UNKNOWN, + dataTypeDisplay="Sigma Field", + ) + ) + except Exception as exc: + logger.debug(traceback.format_exc()) + logger.warning(f"Error to yield datamodel column: {exc}") + return datamodel_columns + + def yield_datamodel( + self, dashboard_details: WorkbookDetails + ) -> Iterable[Either[DashboardDataModel]]: + if self.source_config.includeDataModels: + # we are ingesting charts/Elements as datamodels here + self.data_models = self.client.get_chart_details( + dashboard_details.workbookId + ) + for data_model in self.data_models or []: + try: + data_model_request = CreateDashboardDataModelRequest( + name=EntityName(data_model.elementId), + displayName=data_model.name, + service=FullyQualifiedEntityName( + self.context.get().dashboard_service + ), + dataModelType=DataModelType.SigmaDataModel.value, + serviceType=self.service_connection.type.value, + columns=self.get_column_info(data_model), + ) + yield Either(right=data_model_request) + self.register_record_datamodel(datamodel_request=data_model_request) + except Exception as exc: + yield Either( + left=StackTraceError( + name=data_model.elementId, + error=f"Error yielding Data Model [{data_model.elementId}]: {exc}", + stackTrace=traceback.format_exc(), ) + ) def get_owner_ref( self, dashboard_details: WorkbookDetails diff --git a/ingestion/src/metadata/ingestion/source/dashboard/sigma/models.py b/ingestion/src/metadata/ingestion/source/dashboard/sigma/models.py index db1cf528785..756ab318e91 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/sigma/models.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/sigma/models.py @@ -61,6 +61,7 @@ class Elements(BaseModel): elementId: str name: Optional[str] = None vizualizationType: Optional[str] = None + columns: Optional[List[str]] = [] class ElementsResponse(BaseModel): diff --git a/openmetadata-spec/src/main/resources/json/schema/entity/data/dashboardDataModel.json b/openmetadata-spec/src/main/resources/json/schema/entity/data/dashboardDataModel.json index 1831dc3006c..60208a19542 100644 --- a/openmetadata-spec/src/main/resources/json/schema/entity/data/dashboardDataModel.json +++ b/openmetadata-spec/src/main/resources/json/schema/entity/data/dashboardDataModel.json @@ -23,7 +23,8 @@ "LookMlExplore", "PowerBIDataModel", "QlikDataModel", - "QuickSightDataModel" + "QuickSightDataModel", + "SigmaDataModel" ], "javaEnums": [ { @@ -55,6 +56,9 @@ }, { "name": "QuickSightDataModel" + }, + { + "name": "SigmaDataModel" } ] }