diff --git a/ingestion/src/metadata/ingestion/source/dashboard/quicksight/metadata.py b/ingestion/src/metadata/ingestion/source/dashboard/quicksight/metadata.py index 1efadbfb979..8f4861f714d 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/quicksight/metadata.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/quicksight/metadata.py @@ -13,12 +13,14 @@ import traceback from typing import Any, Iterable, List, Optional +from pydantic import ValidationError + from metadata.generated.schema.api.data.createChart import CreateChartRequest from metadata.generated.schema.api.data.createDashboard import CreateDashboardRequest from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest from metadata.generated.schema.entity.data.chart import ChartType from metadata.generated.schema.entity.data.dashboard import Dashboard -from metadata.generated.schema.entity.data.database import Database +from metadata.generated.schema.entity.data.table import Table from metadata.generated.schema.entity.services.connections.dashboard.quickSightConnection import ( QuickSightConnection, ) @@ -30,26 +32,36 @@ from metadata.generated.schema.metadataIngestion.workflow import ( ) from metadata.generated.schema.type.entityReference import EntityReference from metadata.ingestion.api.source import InvalidSourceException, SourceStatus +from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.dashboard.dashboard_service import DashboardServiceSource +from metadata.ingestion.source.dashboard.quicksight.models import DataSourceResp from metadata.utils import fqn from metadata.utils.filters import filter_by_chart from metadata.utils.logger import ingestion_logger logger = ingestion_logger() +# BoundLimit for MaxResults = MaxResults >= 0 and MaxResults <= 100 +QUICKSIGHT_MAXRESULTS = 100 -class QuickSightSource(DashboardServiceSource): + +class QuicksightSource(DashboardServiceSource): """ QuickSight Source Class """ config: WorkflowSource - metadata: OpenMetadataConnection + metadata: OpenMetadata status: SourceStatus def __init__(self, config: WorkflowSource, metadata_config: OpenMetadataConnection): super().__init__(config, metadata_config) self.aws_account_id = self.service_connection.awsAccountId + self.dashboard_url = None + self.default_args = { + "AwsAccountId": self.aws_account_id, + "MaxResults": QUICKSIGHT_MAXRESULTS, + } @classmethod def create(cls, config_dict, metadata_config: OpenMetadataConnection): @@ -61,21 +73,42 @@ class QuickSightSource(DashboardServiceSource): ) return cls(config, metadata_config) + def _check_pagination(self, listing_method, entity_key) -> Optional[List]: + entity_summary_list = [] + entity_response = listing_method(self.default_args) + entity_summary_list.extend(entity_response[entity_key]) + while entity_response.get("NextToken"): + try: + copied_def_args = self.default_args.copy() + copied_def_args.update({"NextToken": entity_response.get("NextToken")}) + entity_response = listing_method(copied_def_args) + entity_summary_list.extend(entity_response[entity_key]) + except Exception as err: + logger.error(f"Pagination Failed with error: {err}") + logger.debug(traceback.format_exc()) + break + return entity_summary_list + def get_dashboards_list(self) -> Optional[List[dict]]: """ Get List of all dashboards """ - dashboard_ids = [ - dashboard["DashboardId"] - for dashboard in self.client.list_dashboards( - AwsAccountId=self.aws_account_id - )["DashboardSummaryList"] - ] + list_dashboards_func = lambda kwargs: self.client.list_dashboards( # pylint: disable=unnecessary-lambda-assignment + **kwargs + ) + + dashboard_summary_list = self._check_pagination( + listing_method=list_dashboards_func, + entity_key="DashboardSummaryList", + ) + dashboard_set = { + dashboard["DashboardId"] for dashboard in dashboard_summary_list + } dashboards = [ self.client.describe_dashboard( AwsAccountId=self.aws_account_id, DashboardId=dashboard_id )["Dashboard"] - for dashboard_id in dashboard_ids + for dashboard_id in dashboard_set ] return dashboards @@ -97,15 +130,17 @@ class QuickSightSource(DashboardServiceSource): """ Method to Get Dashboard Entity """ - dashboard_url = self.client.get_dashboard_embed_url( + self.dashboard_url = self.client.get_dashboard_embed_url( AwsAccountId=self.aws_account_id, DashboardId=dashboard_details["DashboardId"], - IdentityType="ANONYMOUS", + IdentityType=self.config.serviceConnection.__root__.config.identityType.value, + Namespace=self.config.serviceConnection.__root__.config.namespace + or "default", )["EmbedUrl"] yield CreateDashboardRequest( name=dashboard_details["DashboardId"], - dashboardUrl=dashboard_url, + dashboardUrl=self.dashboard_url, displayName=dashboard_details["Name"], description=dashboard_details["Version"].get("Description", ""), charts=[ @@ -127,11 +162,6 @@ class QuickSightSource(DashboardServiceSource): Returns: Iterable[CreateChartRequest] """ - dashboard_url = self.client.get_dashboard_embed_url( - AwsAccountId=self.aws_account_id, - DashboardId=dashboard_details["DashboardId"], - IdentityType="ANONYMOUS", - )["EmbedUrl"] # Each dashboard is guaranteed to have at least one sheet, which represents # a chart in the context of QuickSight # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/quicksight.html#QuickSight.Client.describe_dashboard @@ -149,7 +179,7 @@ class QuickSightSource(DashboardServiceSource): displayName=chart["Name"], description="", chartType=ChartType.Other.value, - chartUrl=f"{dashboard_url}/sheets/{chart['SheetId']}", + chartUrl=f"{self.dashboard_url}/sheets/{chart['SheetId']}", service=EntityReference( id=self.context.dashboard_service.id.__root__, type="dashboardService", @@ -161,61 +191,106 @@ class QuickSightSource(DashboardServiceSource): logger.warning(f"Error creating chart [{chart}]: {exc}") continue - def yield_dashboard_lineage_details( + def yield_dashboard_lineage_details( # pylint: disable=too-many-locals self, dashboard_details: dict, db_service_name: str ) -> Optional[Iterable[AddLineageRequest]]: """ Get lineage between dashboard and data sources """ try: - dataset_arns = dashboard_details["Version"]["DatasetArns"] - dataset_ids = [ + list_data_set_func = lambda kwargs: self.client.list_data_sets( # pylint: disable=unnecessary-lambda-assignment + **kwargs + ) + data_set_summary_list = self._check_pagination( + listing_method=list_data_set_func, + entity_key="DataSetSummaries", + ) + dataset_ids = { dataset["DataSetId"] - for dataset in self.client.list_data_sets( - AwsAccountId=self.aws_account_id - ) - if dataset["Arn"] in dataset_arns - ] - data_source_arns = set() + for dataset in data_set_summary_list + if dataset.get("Arn") in dashboard_details["Version"]["DataSetArns"] + } + for dataset_id in dataset_ids: for data_source in list( self.client.describe_data_set( AwsAccountId=self.aws_account_id, DataSetId=dataset_id - )["Dataset"]["PhysicalTableMap"].values() - )[0]: - data_source_arns.add(data_source["DataSourceArn"]) - data_sources = [ - data_source - for data_source in self.client.list_data_sources( - AwsAccountId=self.aws_account_id - )["DataSources"] - if data_source["Arn"] in data_source_arns - ] - for data_source in data_sources: - database_name = data_source["Name"] - from_fqn = fqn.build( - self.metadata, - entity_type=Database, - service_name=db_service_name, - database_name=database_name, - ) - from_entity = self.metadata.get_by_name( - entity=Database, - fqn=from_fqn, - ) - to_fqn = fqn.build( - self.metadata, - entity_type=Dashboard, - service_name=self.config.serviceName, - dashboard_name=dashboard_details["DashboardId"], - ) - to_entity = self.metadata.get_by_name( - entity=Dashboard, - fqn=to_fqn, - ) - yield self._get_add_lineage_request( - to_entity=to_entity, from_entity=from_entity - ) + )["DataSet"]["PhysicalTableMap"].values() + ): + try: + if not data_source.get("RelationalTable"): + raise KeyError( + f"We currently don't support lineage to {list(data_source.keys())}" + ) + data_source_relational_table = data_source["RelationalTable"] + data_source_resp = DataSourceResp( + datasource_arn=data_source_relational_table[ + "DataSourceArn" + ], + schema_name=data_source_relational_table["Schema"], + table_name=data_source_relational_table["Name"], + ) + except KeyError as err: + logger.error(err) + continue + except ValidationError as err: + logger.error( + f"{err} - error while trying to fetch lineage data source" + ) + logger.debug(traceback.format_exc()) + continue + + # db_name = data_source + schema_name = data_source_resp.schema_name + table_name = data_source_resp.table_name + + list_data_source_func = lambda kwargs: self.client.list_data_sources( # pylint: disable=unnecessary-lambda-assignment + **kwargs + ) + + data_source_summary_list = self._check_pagination( + listing_method=list_data_source_func, + entity_key="DataSources", + ) + + data_source_ids = [ + data_source_arn["DataSourceId"] + for data_source_arn in data_source_summary_list + if data_source_arn["Arn"] in data_source_resp.datasource_arn + ] + + for data_source_id in data_source_ids: + data_source_dict = self.client.describe_data_source( + AwsAccountId=self.aws_account_id, + DataSourceId=data_source_id, + )["DataSource"]["DataSourceParameters"] + for db in data_source_dict.keys(): + from_fqn = fqn.build( + self.metadata, + entity_type=Table, + service_name=db_service_name, + database_name=data_source_dict[db]["Database"], + schema_name=schema_name, + table_name=table_name, + skip_es_search=True, + ) + from_entity = self.metadata.get_by_name( + entity=Table, + fqn=from_fqn, + ) + to_fqn = fqn.build( + self.metadata, + entity_type=Dashboard, + service_name=self.config.serviceName, + dashboard_name=dashboard_details["DashboardId"], + ) + to_entity = self.metadata.get_by_name( + entity=Dashboard, + fqn=to_fqn, + ) + yield self._get_add_lineage_request( + to_entity=to_entity, from_entity=from_entity + ) except Exception as exc: # pylint: disable=broad-except logger.debug(traceback.format_exc()) logger.error( diff --git a/ingestion/src/metadata/ingestion/source/dashboard/quicksight/models.py b/ingestion/src/metadata/ingestion/source/dashboard/quicksight/models.py new file mode 100644 index 00000000000..f090c358116 --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/dashboard/quicksight/models.py @@ -0,0 +1,22 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +Pydantic Model to validate Quick Sight responses +""" + + +from pydantic import BaseModel + + +class DataSourceResp(BaseModel): + datasource_arn: str + schema_name: str + table_name: str diff --git a/ingestion/tests/unit/topology/dashboard/test_quicksight.py b/ingestion/tests/unit/topology/dashboard/test_quicksight.py index b92ec6310fa..17d37210d6a 100644 --- a/ingestion/tests/unit/topology/dashboard/test_quicksight.py +++ b/ingestion/tests/unit/topology/dashboard/test_quicksight.py @@ -7,6 +7,8 @@ from pathlib import Path from unittest import TestCase from unittest.mock import patch +import pytest + from metadata.generated.schema.api.data.createChart import CreateChartRequest from metadata.generated.schema.api.data.createDashboard import CreateDashboardRequest from metadata.generated.schema.entity.data.dashboard import Dashboard @@ -19,7 +21,7 @@ from metadata.generated.schema.metadataIngestion.workflow import ( OpenMetadataWorkflowConfig, ) from metadata.generated.schema.type.entityReference import EntityReference -from metadata.ingestion.source.dashboard.quicksight.metadata import QuickSightSource +from metadata.ingestion.source.dashboard.quicksight.metadata import QuicksightSource mock_file_path = ( Path(__file__).parent.parent.parent / "resources/datasets/quicksight_dataset.json" @@ -169,7 +171,7 @@ EXPECTED_DASHBOARDS = [ ] -def mock_get_dashboard_embed_url(AwsAccountId, DashboardId, IdentityType): +def mock_get_dashboard_embed_url(AwsAccountId, DashboardId, IdentityType, Namespace): return {"EmbedUrl": "https://dashboards.example.com/embed/1234"} @@ -186,14 +188,16 @@ class QuickSightUnitTest(TestCase): super().__init__(methodName) test_connection.return_value = False self.config = OpenMetadataWorkflowConfig.parse_obj(mock_quicksight_config) - self.quicksight = QuickSightSource.create( + self.quicksight = QuicksightSource.create( mock_quicksight_config["source"], self.config.workflowConfig.openMetadataServerConfig, ) + self.quicksight.dashboard_url = "https://dashboards.example.com/embed/1234" self.quicksight.context.__dict__["dashboard"] = MOCK_DASHBOARD self.quicksight.context.__dict__["dashboard_service"] = MOCK_DASHBOARD_SERVICE self.quicksight.client.get_dashboard_embed_url = mock_get_dashboard_embed_url + @pytest.mark.order(1) def test_dashboard(self): dashboard_list = [] results = self.quicksight.yield_dashboard(MOCK_DASHBOARD_DETAILS) @@ -202,12 +206,14 @@ class QuickSightUnitTest(TestCase): dashboard_list.append(result) self.assertEqual(EXPECTED_DASHBOARD, dashboard_list[0]) + @pytest.mark.order(2) def test_dashboard_name(self): assert ( self.quicksight.get_dashboard_name(MOCK_DASHBOARD_DETAILS) == mock_data["Name"] ) + @pytest.mark.order(3) def test_chart(self): dashboard_details = MOCK_DASHBOARD_DETAILS dashboard_details["Version"]["Sheets"] = mock_data["Version"]["Sheets"] diff --git a/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/dashboard/quickSightConnection.json b/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/dashboard/quickSightConnection.json index c07c5eaef4b..e553f507bc8 100644 --- a/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/dashboard/quickSightConnection.json +++ b/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/dashboard/quickSightConnection.json @@ -9,7 +9,9 @@ "quickSightType": { "description": "QuickSight service type", "type": "string", - "enum": ["QuickSight"], + "enum": [ + "QuickSight" + ], "default": "QuickSight" } }, @@ -29,11 +31,29 @@ "description": "AWS Account ID", "type": "string" }, + "identityType": { + "title": "Identity Type", + "description": "The authentication method that the user uses to sign in.", + "type": "string", + "enum": [ + "IAM", + "QUICKSIGHT", + "ANONYMOUS" + ], + "default": "IAM" + }, + "namespace": { + "description": "The Amazon QuickSight namespace that contains the dashboard IDs in this request ( To be provided when identityType is `ANONYMOUS` )", + "type": "string" + }, "supportsMetadataExtraction": { "title": "Supports Metadata Extraction", "$ref": "../connectionBasicType.json#/definitions/supportsMetadataExtraction" } }, "additionalProperties": false, - "required": ["awsConfig", "awsAccountId"] -} + "required": [ + "awsConfig", + "awsAccountId" + ] +} \ No newline at end of file