From 44c5dd46a9a151a64312ba835bfc312e5648735d Mon Sep 17 00:00:00 2001 From: harshsoni2024 <64592571+harshsoni2024@users.noreply.github.com> Date: Thu, 24 Oct 2024 11:41:37 +0530 Subject: [PATCH] GEN-1911: Quicksight lineage source fix (#18348) --- .../source/dashboard/quicksight/metadata.py | 444 ++++++++++++++---- .../source/dashboard/quicksight/models.py | 20 +- .../entity/data/dashboardDataModel.json | 6 +- 3 files changed, 387 insertions(+), 83 deletions(-) diff --git a/ingestion/src/metadata/ingestion/source/dashboard/quicksight/metadata.py b/ingestion/src/metadata/ingestion/source/dashboard/quicksight/metadata.py index fe091527d08..1d036743002 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/quicksight/metadata.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/quicksight/metadata.py @@ -17,13 +17,20 @@ from pydantic import ValidationError from metadata.generated.schema.api.data.createChart import CreateChartRequest from metadata.generated.schema.api.data.createDashboard import CreateDashboardRequest +from metadata.generated.schema.api.data.createDashboardDataModel import ( + CreateDashboardDataModelRequest, +) from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest from metadata.generated.schema.entity.data.chart import Chart, ChartType -from metadata.generated.schema.entity.data.dashboard import Dashboard -from metadata.generated.schema.entity.data.table import Table +from metadata.generated.schema.entity.data.dashboardDataModel import ( + DashboardDataModel, + DataModelType, +) +from metadata.generated.schema.entity.data.table import Column, Table from metadata.generated.schema.entity.services.connections.dashboard.quickSightConnection import ( QuickSightConnection, ) +from metadata.generated.schema.entity.services.databaseService import DatabaseService from metadata.generated.schema.entity.services.ingestionPipelines.status import ( StackTraceError, ) @@ -35,17 +42,31 @@ from metadata.generated.schema.type.basic import ( FullyQualifiedEntityName, Markdown, SourceUrl, + Uuid, ) +from metadata.generated.schema.type.entityLineage import EntitiesEdge, LineageDetails +from metadata.generated.schema.type.entityLineage import Source as LineageSource +from metadata.generated.schema.type.entityReference import EntityReference from metadata.ingestion.api.models import Either from metadata.ingestion.api.steps import InvalidSourceException +from metadata.ingestion.lineage.models import ConnectionTypeDialectMapper +from metadata.ingestion.lineage.parser import LineageParser +from metadata.ingestion.lineage.sql_lineage import search_table_entities from metadata.ingestion.ometa.ometa_api import OpenMetadata -from metadata.ingestion.source.dashboard.dashboard_service import DashboardServiceSource +from metadata.ingestion.source.dashboard.dashboard_service import ( + LINEAGE_MAP, + DashboardServiceSource, +) from metadata.ingestion.source.dashboard.quicksight.models import ( DashboardDetail, DashboardResp, + DataSourceModel, DataSourceResp, + DataSourceRespQuery, + DataSourceRespS3, DescribeDataSourceResponse, ) +from metadata.ingestion.source.database.column_type_parser import ColumnTypeParser from metadata.utils import fqn from metadata.utils.filters import filter_by_chart from metadata.utils.logger import ingestion_logger @@ -209,12 +230,283 @@ class QuicksightSource(DashboardServiceSource): ) ) + def _get_database_service(self, db_service_name: str): + return self.metadata.get_by_name(DatabaseService, db_service_name) + + def _describe_data_sets( + self, dataset_id, dashboard_details: DashboardDetail + ) -> List: + """call botocore's describe api for datasets""" + try: + return list( + self.client.describe_data_set( + AwsAccountId=self.aws_account_id, DataSetId=dataset_id + )["DataSet"]["PhysicalTableMap"].values() + ) + except Exception as err: + logger.info( + f"Cannot parse lineage from the dashboard: {dashboard_details.Name} to dataset due to: {err}" + ) + return [] + + def _yield_lineage_from_query( + self, + data_model_entity, + data_source_resp: DataSourceModel, + dashboard_details: DashboardDetail, + db_service_entity, + ) -> Iterable[Either[AddLineageRequest]]: + """yield lineage from table(parsed form query source) <-> dashboard""" + if not db_service_entity: + logger.debug(f"db service is not ingested") + return None + sql_query = data_source_resp.data_source_resp.query + source_database_names = [] + try: + if data_source_resp.DataSourceParameters: + data_source_dict = data_source_resp.DataSourceParameters + for db in data_source_dict.keys() or []: + source_database_names.append(data_source_dict[db].get("Database")) + except Exception as err: + logger.info(f"Error to parse database names from source:{err}") + return None + + try: + lineage_parser = LineageParser( + sql_query, + ConnectionTypeDialectMapper.dialect_of( + db_service_entity.serviceType.value + ) + if db_service_entity + else None, + ) + lineage_details = LineageDetails( + source=LineageSource.DashboardLineage, sqlQuery=sql_query + ) + for db_name in source_database_names: + for table in lineage_parser.source_tables: + database_schema_name, table = fqn.split(str(table))[-2:] + database_schema_name = self.check_database_schema_name( + database_schema_name + ) + from_entities = search_table_entities( + metadata=self.metadata, + database=db_name, + service_name=db_service_entity.name.root, + database_schema=database_schema_name, + table=table, + ) + for from_entity in from_entities: + if from_entity is not None and data_model_entity is not None: + columns = [ + col.name.root for col in data_model_entity.columns + ] + column_lineage = self._get_column_lineage( + from_entity, data_model_entity, columns + ) + lineage_details.columnsLineage = column_lineage + yield Either( + right=AddLineageRequest( + edge=EntitiesEdge( + fromEntity=EntityReference( + id=Uuid(from_entity.id.root), + type=LINEAGE_MAP[type(from_entity)], + ), + toEntity=EntityReference( + id=Uuid(data_model_entity.id.root), + type=LINEAGE_MAP[type(data_model_entity)], + ), + lineageDetails=lineage_details, + ) + ) + ) + + except Exception as err: + yield Either( + left=StackTraceError( + name=dashboard_details.DashboardId, + error=f"Wild error ingesting table(query) <-> datamodel lineage {dashboard_details} - {err}", + stackTrace=traceback.format_exc(), + ) + ) + + def _yield_lineage_from_s3( + self, + data_model_entity, + data_source_resp: DataSourceModel, + dashboard_details: DashboardDetail, + ) -> Iterable[Either[AddLineageRequest]]: + """yield lineage from s3 container <-> dashboard""" + try: + if data_source_resp and data_source_resp.DataSourceParameters: + data_source_dict = data_source_resp.DataSourceParameters + for s3_param in data_source_dict.keys() or []: + bucket_name = ( + data_source_dict[s3_param] + .get("ManifestFileLocation", {}) + .get("Bucket") + ) + key_name = ( + data_source_dict[s3_param] + .get("ManifestFileLocation", {}) + .get("Key") + ) + containers = self.metadata.es_search_container_by_path( + full_path=f"s3://{bucket_name}/{key_name}" + ) + for container in containers: + if container is not None and data_model_entity is not None: + storage_entity = EntityReference( + id=Uuid(container.id.root), + type="container", + ) + yield Either( + right=AddLineageRequest( + edge=EntitiesEdge( + fromEntity=storage_entity, + toEntity=EntityReference( + id=Uuid(data_model_entity.id.root), + type=LINEAGE_MAP[type(data_model_entity)], + ), + ) + ) + ) + except Exception as err: + yield Either( + left=StackTraceError( + name=dashboard_details.DashboardId, + error=f"Wild error ingesting s3 <-> datamodel lineage {dashboard_details} - {err}", + stackTrace=traceback.format_exc(), + ) + ) + + def _yield_lineage_from_table( + self, + data_model_entity, + data_source_resp: DataSourceModel, + dashboard_details: DashboardDetail, + db_service_entity, + ) -> Iterable[Either[AddLineageRequest]]: + """yield lineage from table <-> dashboard""" + try: + schema_name = data_source_resp.data_source_resp.schema_name + table_name = data_source_resp.data_source_resp.table_name + if data_source_resp and data_source_resp.DataSourceParameters: + data_source_dict = data_source_resp.DataSourceParameters + for db in data_source_dict.keys() or []: + from_fqn = fqn.build( + self.metadata, + entity_type=Table, + service_name=db_service_entity.name.root, + database_name=data_source_dict[db].get("Database"), + schema_name=schema_name, + table_name=table_name, + ) + from_entity = self.metadata.get_by_name( + entity=Table, + fqn=from_fqn, + ) + if from_entity is not None and data_model_entity is not None: + columns = [col.name.root for col in data_model_entity.columns] + column_lineage = self._get_column_lineage( + from_entity, data_model_entity, columns + ) + yield self._get_add_lineage_request( + to_entity=data_model_entity, + from_entity=from_entity, + column_lineage=column_lineage, + ) + except Exception as err: + yield Either( + left=StackTraceError( + name=dashboard_details.DashboardId, + error=f"Wild error ingesting table <-> datamodel lineage {dashboard_details} - {err}", + stackTrace=traceback.format_exc(), + ) + ) + + def _get_datamodel(self, datamodel_id: str): + datamodel_fqn = fqn.build( + self.metadata, + entity_type=DashboardDataModel, + service_name=self.context.get().dashboard_service, + data_model_name=datamodel_id, + ) + if datamodel_fqn: + return self.metadata.get_by_name( + entity=DashboardDataModel, + fqn=datamodel_fqn, + ) + return None + def yield_dashboard_lineage_details( # pylint: disable=too-many-locals self, dashboard_details: DashboardDetail, db_service_name: str ) -> Iterable[Either[AddLineageRequest]]: """ Get lineage between dashboard and data sources """ + db_service_entity = self.metadata.get_by_name( + entity=DatabaseService, fqn=db_service_name + ) + for datamodel in self.data_models or []: + try: + data_model_entity = self._get_datamodel( + datamodel_id=datamodel.DataSource.DataSourceId + ) + if isinstance( + datamodel.DataSource.data_source_resp, DataSourceRespQuery + ): + yield from self._yield_lineage_from_query( + data_model_entity, + datamodel.DataSource, + dashboard_details, + db_service_entity, + ) + elif isinstance( + datamodel.DataSource.data_source_resp, DataSourceRespS3 + ): + yield from self._yield_lineage_from_s3( + data_model_entity, datamodel.DataSource, dashboard_details + ) + elif isinstance(datamodel.DataSource.data_source_resp, DataSourceResp): + yield from self._yield_lineage_from_table( + data_model_entity, + datamodel.DataSource, + dashboard_details, + db_service_entity, + ) + except Exception as exc: # pylint: disable=broad-except + yield Either( + left=StackTraceError( + name="Lineage", + error=f"Error to yield dashboard lineage details for DB service name [{db_service_name}] and dashboard_name [{dashboard_details.Name}]: {exc}", + stackTrace=traceback.format_exc(), + ) + ) + + def _get_column_info(self, data_model: DescribeDataSourceResponse): + """Get column info""" + datasource_columns = [] + for field in data_model.DataSource.data_source_resp.columns or []: + try: + col_parse = ColumnTypeParser._parse_datatype_string( # pylint: disable=protected-access + field.get("Type") + ) + parsed_fields = { + "name": field.get("Name"), + "displayName": field.get("Name"), + "dataType": col_parse.get("dataType"), + } + datasource_columns.append(Column(**parsed_fields)) + except Exception as exc: + logger.debug(traceback.format_exc()) + logger.warning(f"Error to yield datamodel column: {exc}") + return datasource_columns + + def _get_dashboard_datamodels(self, dashboard_details: DashboardDetail) -> list: + """Get dashboard datamodels""" + data_models = [] + dataset_ids = [] try: list_data_set_func = lambda kwargs: self.client.list_data_sets( # pylint: disable=unnecessary-lambda-assignment **kwargs @@ -228,105 +520,95 @@ class QuicksightSource(DashboardServiceSource): for dataset in data_set_summary_list or [] if dataset.get("Arn") in dashboard_details.Version.DataSetArns } + except Exception as exc: + logger.debug(traceback.format_exc()) + logger.warning( + f"Error while processing datamodels for dashboard: {dashboard_details.Name}: {exc}" + ) - for dataset_id in dataset_ids or []: - for data_source in ( - list( - self.client.describe_data_set( - AwsAccountId=self.aws_account_id, DataSetId=dataset_id - )["DataSet"]["PhysicalTableMap"].values() - ) - or [] - ): - try: - if not data_source.get("RelationalTable"): - raise KeyError( - f"We currently don't support lineage to {list(data_source.keys())}" - ) + for dataset_id in dataset_ids or []: + data_source_list = self._describe_data_sets(dataset_id, dashboard_details) + for data_source in data_source_list: + try: + if data_source.get("RelationalTable"): data_source_resp = DataSourceResp( **data_source["RelationalTable"] ) - except (KeyError, ValidationError) as err: - data_source_resp = None - yield Either( - left=StackTraceError( - name="Lineage", - error=( - "Error to yield dashboard lineage details for DB service" - f" name [{db_service_name}]: {err}" - ), - stackTrace=traceback.format_exc(), - ) + elif data_source.get("CustomSql"): + data_source_resp = DataSourceRespQuery( + **data_source["CustomSql"] ) - if data_source_resp: - schema_name = data_source_resp.schema_name - table_name = data_source_resp.table_name - + elif data_source.get("S3Source"): + data_source_resp = DataSourceRespS3(**data_source["S3Source"]) + else: + raise KeyError( + f"We currently don't support data sources: {list(data_source.keys())}" + ) + except (KeyError, ValidationError) as err: + data_source_resp = None + logger.info( + f"Error while processing datamodels for dashboard {dashboard_details.Name}: {err}" + ) + continue + if data_source_resp: + try: list_data_source_func = lambda kwargs: self.client.list_data_sources( # pylint: disable=unnecessary-lambda-assignment **kwargs ) - data_source_summary_list = self._check_pagination( listing_method=list_data_source_func, entity_key="DataSources", ) - data_source_ids = [ data_source_arn["DataSourceId"] for data_source_arn in data_source_summary_list or [] if data_source_arn["Arn"] in data_source_resp.datasource_arn ] - for data_source_id in data_source_ids or []: - data_source_resp = DescribeDataSourceResponse( + desribed_source = DescribeDataSourceResponse( **self.client.describe_data_source( AwsAccountId=self.aws_account_id, DataSourceId=data_source_id, ) - ).DataSource - if ( + ) + desribed_source.DataSource.data_source_resp = ( data_source_resp - and data_source_resp.DataSourceParameters - ): - data_source_dict = data_source_resp.DataSourceParameters - for db in data_source_dict.keys() or []: - from_fqn = fqn.build( - self.metadata, - entity_type=Table, - service_name=db_service_name, - database_name=data_source_dict[db].get( - "Database" - ), - schema_name=schema_name, - table_name=table_name, - skip_es_search=True, - ) - from_entity = self.metadata.get_by_name( - entity=Table, - fqn=from_fqn, - ) - to_fqn = fqn.build( - self.metadata, - entity_type=Dashboard, - service_name=self.config.serviceName, - dashboard_name=dashboard_details.DashboardId, - ) - to_entity = self.metadata.get_by_name( - entity=Dashboard, - fqn=to_fqn, - ) - if ( - from_entity is not None - and to_entity is not None - ): - yield self._get_add_lineage_request( - to_entity=to_entity, from_entity=from_entity - ) - except Exception as exc: # pylint: disable=broad-except - yield Either( - left=StackTraceError( - name="Lineage", - error=f"Error to yield dashboard lineage details for DB service name [{db_service_name}]: {exc}", - stackTrace=traceback.format_exc(), + ) + data_models.append(desribed_source) + except Exception as err: + logger.info( + f"Error while processing data sources for dashboard {dashboard_details.Name}: {err}" + ) + return data_models + + def yield_datamodel( + self, dashboard_details: DashboardDetail + ) -> Iterable[Either[CreateDashboardDataModelRequest]]: + """ + Method to ingest the Datasources(Published and Embedded) as DataModels from Quicksight + """ + self.data_models: List[ + DescribeDataSourceResponse + ] = self._get_dashboard_datamodels(dashboard_details) + for data_model in self.data_models: + try: + data_model_request = CreateDashboardDataModelRequest( + name=EntityName(data_model.DataSource.DataSourceId), + displayName=data_model.DataSource.Name, + service=FullyQualifiedEntityName( + self.context.get().dashboard_service + ), + dataModelType=DataModelType.QuickSightDataModel.value, + serviceType=self.service_connection.type.value, + columns=self._get_column_info(data_model), + ) + yield Either(right=data_model_request) + self.register_record_datamodel(datamodel_request=data_model_request) + except Exception as exc: + yield Either( + left=StackTraceError( + name=data_model.DataSource.Name, + error=f"Error yielding Data Model [{data_model.DataSource.Name}]: {exc}", + stackTrace=traceback.format_exc(), + ) ) - ) diff --git a/ingestion/src/metadata/ingestion/source/dashboard/quicksight/models.py b/ingestion/src/metadata/ingestion/source/dashboard/quicksight/models.py index fa279af2dc5..37b71035a2a 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/quicksight/models.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/quicksight/models.py @@ -12,7 +12,7 @@ Pydantic Model to validate Quick Sight responses """ -from typing import List, Optional +from typing import List, Optional, Union from pydantic import BaseModel, Field @@ -21,6 +21,19 @@ class DataSourceResp(BaseModel): datasource_arn: str = Field(alias="DataSourceArn") schema_name: str = Field(alias="Schema") table_name: str = Field(alias="Name") + columns: Optional[list] = Field(alias="InputColumns") + + +class DataSourceRespQuery(BaseModel): + datasource_arn: str = Field(alias="DataSourceArn") + query: str = Field(alias="SqlQuery") + table_name: str = Field(alias="Name") + columns: Optional[list] = Field(alias="Columns") + + +class DataSourceRespS3(BaseModel): + datasource_arn: str = Field(alias="DataSourceArn") + columns: Optional[list] = Field(alias="InputColumns") class VersionSheet(BaseModel): @@ -51,8 +64,13 @@ class DashboardResp(BaseModel): class DataSourceModel(BaseModel): + Name: str + Type: str DataSourceId: str DataSourceParameters: Optional[dict] = None + data_source_resp: Optional[ + Union[DataSourceRespS3, DataSourceRespQuery, DataSourceResp] + ] = None class DescribeDataSourceResponse(BaseModel): diff --git a/openmetadata-spec/src/main/resources/json/schema/entity/data/dashboardDataModel.json b/openmetadata-spec/src/main/resources/json/schema/entity/data/dashboardDataModel.json index a8092ac328e..20d0803afb0 100644 --- a/openmetadata-spec/src/main/resources/json/schema/entity/data/dashboardDataModel.json +++ b/openmetadata-spec/src/main/resources/json/schema/entity/data/dashboardDataModel.json @@ -22,7 +22,8 @@ "LookMlView", "LookMlExplore", "PowerBIDataModel", - "QlikDataModel" + "QlikDataModel", + "QuickSightDataModel" ], "javaEnums": [ { @@ -45,6 +46,9 @@ }, { "name": "QlikDataModel" + }, + { + "name": "QuickSightDataModel" } ] }