diff --git a/ingestion/src/metadata/ingestion/source/dashboard/quicksight/metadata.py b/ingestion/src/metadata/ingestion/source/dashboard/quicksight/metadata.py index e902457db68..afbf2fb7584 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/quicksight/metadata.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/quicksight/metadata.py @@ -11,7 +11,7 @@ """QuickSight source module""" import traceback -from typing import Any, Iterable, List, Optional +from typing import Iterable, List, Optional from pydantic import ValidationError @@ -34,7 +34,12 @@ from metadata.ingestion.api.models import Either from metadata.ingestion.api.steps import InvalidSourceException from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.dashboard.dashboard_service import DashboardServiceSource -from metadata.ingestion.source.dashboard.quicksight.models import DataSourceResp +from metadata.ingestion.source.dashboard.quicksight.models import ( + DashboardDetail, + DashboardResp, + DataSourceResp, + DescribeDataSourceResponse, +) from metadata.utils import fqn from metadata.utils.filters import filter_by_chart from metadata.utils.logger import ingestion_logger @@ -109,36 +114,40 @@ class QuicksightSource(DashboardServiceSource): dashboard["DashboardId"] for dashboard in dashboard_summary_list } dashboards = [ - self.client.describe_dashboard( - AwsAccountId=self.aws_account_id, DashboardId=dashboard_id - )["Dashboard"] + DashboardResp( + **self.client.describe_dashboard( + AwsAccountId=self.aws_account_id, DashboardId=dashboard_id + ) + ).Dashboard for dashboard_id in dashboard_set ] return dashboards - def get_dashboard_name(self, dashboard: dict) -> str: + def get_dashboard_name(self, dashboard: DashboardDetail) -> str: """ Get Dashboard Name """ - return dashboard["Name"] + return dashboard.Name - def get_dashboard_details(self, dashboard: dict) -> dict: + def get_dashboard_details(self, dashboard: dict) -> DashboardDetail: """ Get Dashboard Details """ - return dashboard + return DashboardDetail(**dashboard) def yield_dashboard( - self, dashboard_details: dict + self, dashboard_details: DashboardDetail ) -> Iterable[Either[CreateDashboardRequest]]: """ Method to Get Dashboard Entity """ dashboard_request = CreateDashboardRequest( - name=dashboard_details["DashboardId"], + name=dashboard_details.DashboardId, sourceUrl=self.dashboard_url, - displayName=dashboard_details["Name"], - description=dashboard_details["Version"].get("Description"), + displayName=dashboard_details.Name, + description=dashboard_details.Version.Description + if dashboard_details.Version + else None, charts=[ fqn.build( self.metadata, @@ -155,45 +164,45 @@ class QuicksightSource(DashboardServiceSource): self.register_record(dashboard_request=dashboard_request) def yield_dashboard_chart( - self, dashboard_details: Any + self, dashboard_details: DashboardDetail ) -> Iterable[Either[CreateChartRequest]]: """Get chart method""" # Each dashboard is guaranteed to have at least one sheet, which represents # a chart in the context of QuickSight # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/quicksight.html#QuickSight.Client.describe_dashboard - charts = dashboard_details["Version"]["Sheets"] - for chart in charts: - try: - if filter_by_chart( - self.source_config.chartFilterPattern, chart["Name"] - ): - self.status.filter(chart["Name"], "Chart Pattern not allowed") - continue + if dashboard_details.Version: + for chart in dashboard_details.Version.Charts or []: + try: + if filter_by_chart( + self.source_config.chartFilterPattern, chart.Name + ): + self.status.filter(chart.Name, "Chart Pattern not allowed") + continue - self.dashboard_url = ( - f"https://{self.aws_region}.quicksight.aws.amazon.com/sn/dashboards" - f'/{dashboard_details.get("DashboardId")}' - ) - yield Either( - right=CreateChartRequest( - name=chart["SheetId"], - displayName=chart["Name"], - chartType=ChartType.Other.value, - sourceUrl=self.dashboard_url, - service=self.context.get().dashboard_service, + self.dashboard_url = ( + f"https://{self.aws_region}.quicksight.aws.amazon.com/sn/dashboards" + f"/{dashboard_details.DashboardId}" ) - ) - except Exception as exc: - yield Either( - left=StackTraceError( - name="Chart", - error=f"Error creating chart [{chart}]: {exc}", - stackTrace=traceback.format_exc(), + yield Either( + right=CreateChartRequest( + name=chart.ChartId, + displayName=chart.Name, + chartType=ChartType.Other.value, + sourceUrl=self.dashboard_url, + service=self.context.get().dashboard_service, + ) + ) + except Exception as exc: + yield Either( + left=StackTraceError( + name="Chart", + error=f"Error creating chart [{chart}]: {exc}", + stackTrace=traceback.format_exc(), + ) ) - ) def yield_dashboard_lineage_details( # pylint: disable=too-many-locals - self, dashboard_details: dict, db_service_name: str + self, dashboard_details: DashboardDetail, db_service_name: str ) -> Iterable[Either[AddLineageRequest]]: """ Get lineage between dashboard and data sources @@ -208,28 +217,26 @@ class QuicksightSource(DashboardServiceSource): ) dataset_ids = { dataset["DataSetId"] - for dataset in data_set_summary_list - if dataset.get("Arn") in dashboard_details["Version"]["DataSetArns"] + for dataset in data_set_summary_list or [] + if dataset.get("Arn") in dashboard_details.Version.DataSetArns } - for dataset_id in dataset_ids: - for data_source in list( - self.client.describe_data_set( - AwsAccountId=self.aws_account_id, DataSetId=dataset_id - )["DataSet"]["PhysicalTableMap"].values() + for dataset_id in dataset_ids or []: + for data_source in ( + list( + self.client.describe_data_set( + AwsAccountId=self.aws_account_id, DataSetId=dataset_id + )["DataSet"]["PhysicalTableMap"].values() + ) + or [] ): try: if not data_source.get("RelationalTable"): raise KeyError( f"We currently don't support lineage to {list(data_source.keys())}" ) - data_source_relational_table = data_source["RelationalTable"] data_source_resp = DataSourceResp( - datasource_arn=data_source_relational_table[ - "DataSourceArn" - ], - schema_name=data_source_relational_table["Schema"], - table_name=data_source_relational_table["Name"], + **data_source["RelationalTable"] ) except (KeyError, ValidationError) as err: yield Either( @@ -257,42 +264,47 @@ class QuicksightSource(DashboardServiceSource): data_source_ids = [ data_source_arn["DataSourceId"] - for data_source_arn in data_source_summary_list + for data_source_arn in data_source_summary_list or [] if data_source_arn["Arn"] in data_source_resp.datasource_arn ] - for data_source_id in data_source_ids: - data_source_dict = self.client.describe_data_source( - AwsAccountId=self.aws_account_id, - DataSourceId=data_source_id, - )["DataSource"]["DataSourceParameters"] - for db in data_source_dict.keys(): - from_fqn = fqn.build( - self.metadata, - entity_type=Table, - service_name=db_service_name, - database_name=data_source_dict[db]["Database"], - schema_name=schema_name, - table_name=table_name, - skip_es_search=True, - ) - from_entity = self.metadata.get_by_name( - entity=Table, - fqn=from_fqn, - ) - to_fqn = fqn.build( - self.metadata, - entity_type=Dashboard, - service_name=self.config.serviceName, - dashboard_name=dashboard_details["DashboardId"], - ) - to_entity = self.metadata.get_by_name( - entity=Dashboard, - fqn=to_fqn, - ) - yield self._get_add_lineage_request( - to_entity=to_entity, from_entity=from_entity + for data_source_id in data_source_ids or []: + data_source_resp = DescribeDataSourceResponse( + **self.client.describe_data_source( + AwsAccountId=self.aws_account_id, + DataSourceId=data_source_id, ) + ).DataSource + if data_source_resp and data_source_resp.DataSourceParameters: + data_source_dict = data_source_resp.DataSourceParameters + for db in data_source_dict.keys() or []: + from_fqn = fqn.build( + self.metadata, + entity_type=Table, + service_name=db_service_name, + database_name=data_source_dict[db].get("Database"), + schema_name=schema_name, + table_name=table_name, + skip_es_search=True, + ) + from_entity = self.metadata.get_by_name( + entity=Table, + fqn=from_fqn, + ) + to_fqn = fqn.build( + self.metadata, + entity_type=Dashboard, + service_name=self.config.serviceName, + dashboard_name=dashboard_details.DashboardId, + ) + to_entity = self.metadata.get_by_name( + entity=Dashboard, + fqn=to_fqn, + ) + if from_entity is not None and to_entity is not None: + yield self._get_add_lineage_request( + to_entity=to_entity, from_entity=from_entity + ) except Exception as exc: # pylint: disable=broad-except yield Either( left=StackTraceError( diff --git a/ingestion/src/metadata/ingestion/source/dashboard/quicksight/models.py b/ingestion/src/metadata/ingestion/source/dashboard/quicksight/models.py index f090c358116..246df59ed4d 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/quicksight/models.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/quicksight/models.py @@ -12,11 +12,50 @@ Pydantic Model to validate Quick Sight responses """ +from typing import List, Optional -from pydantic import BaseModel +from pydantic import BaseModel, Field class DataSourceResp(BaseModel): - datasource_arn: str - schema_name: str - table_name: str + datasource_arn: str = Field(alias="DataSourceArn") + schema_name: str = Field(alias="Schema") + table_name: str = Field(alias="Name") + + +class VersionSheet(BaseModel): + ChartId: Optional[str] = Field(alias="SheetId") + Name: Optional[str] + + +class DashboardVersion(BaseModel): + Status: Optional[str] + Arn: Optional[str] + SourceEntityArn: Optional[str] + DataSetArns: Optional[List] + Description: Optional[str] + Charts: Optional[List[VersionSheet]] = Field(alias="Sheets") + + +class DashboardDetail(BaseModel): + DashboardId: str + Arn: Optional[str] + Name: str + Version: Optional[DashboardVersion] + + +class DashboardResp(BaseModel): + Dashboard: DashboardDetail + Status: Optional[int] + RequestId: Optional[str] + + +class DataSource(BaseModel): + DataSourceId: str + DataSourceParameters: Optional[dict] + + +class DescribeDataSourceResponse(BaseModel): + DataSource: Optional[DataSource] + RequestId: Optional[str] + Status: Optional[int] diff --git a/ingestion/tests/unit/topology/dashboard/test_quicksight.py b/ingestion/tests/unit/topology/dashboard/test_quicksight.py index 1b3f8f01389..d63e2c49262 100644 --- a/ingestion/tests/unit/topology/dashboard/test_quicksight.py +++ b/ingestion/tests/unit/topology/dashboard/test_quicksight.py @@ -35,6 +35,7 @@ from metadata.generated.schema.type.basic import FullyQualifiedEntityName from metadata.generated.schema.type.entityReference import EntityReference from metadata.ingestion.api.models import Either from metadata.ingestion.source.dashboard.quicksight.metadata import QuicksightSource +from metadata.ingestion.source.dashboard.quicksight.models import DashboardDetail mock_file_path = ( Path(__file__).parent.parent.parent / "resources/datasets/quicksight_dataset.json" @@ -171,7 +172,9 @@ class QuickSightUnitTest(TestCase): @pytest.mark.order(1) def test_dashboard(self): dashboard_list = [] - results = self.quicksight.yield_dashboard(MOCK_DASHBOARD_DETAILS) + results = self.quicksight.yield_dashboard( + DashboardDetail(**MOCK_DASHBOARD_DETAILS) + ) for result in results: if isinstance(result, Either) and result.right: dashboard_list.append(result.right) @@ -180,14 +183,16 @@ class QuickSightUnitTest(TestCase): @pytest.mark.order(2) def test_dashboard_name(self): assert ( - self.quicksight.get_dashboard_name(MOCK_DASHBOARD_DETAILS) + self.quicksight.get_dashboard_name( + DashboardDetail(**MOCK_DASHBOARD_DETAILS) + ) == mock_data["Name"] ) @pytest.mark.order(3) def test_chart(self): - dashboard_details = MOCK_DASHBOARD_DETAILS - dashboard_details["Version"]["Sheets"] = mock_data["Version"]["Sheets"] + dashboard_details = DashboardDetail(**MOCK_DASHBOARD_DETAILS) + dashboard_details.Version.Charts = mock_data["Version"]["Sheets"] results = self.quicksight.yield_dashboard_chart(dashboard_details) chart_list = [] for result in results: