Fix: Quicksight Dashboard and chart Metadata Ingestion (#9786)

* Fix QuickSight JSON schema and errors

* Fix Quicksight lineage

* add pagination

* Generalize Pagination

* Fix Pylint errors

* update max results

* fix tests

* Add Pydantic models for QuickSight responses

* Add fqn builder

* Fix Pylint

* Fix Tests
This commit is contained in:
Ayush Shah 2023-01-18 20:37:41 +05:30 committed by GitHub
parent f2649041f2
commit 02147b6502
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 192 additions and 69 deletions

View File

@ -13,12 +13,14 @@
import traceback
from typing import Any, Iterable, List, Optional
from pydantic import ValidationError
from metadata.generated.schema.api.data.createChart import CreateChartRequest
from metadata.generated.schema.api.data.createDashboard import CreateDashboardRequest
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.entity.data.chart import ChartType
from metadata.generated.schema.entity.data.dashboard import Dashboard
from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.entity.services.connections.dashboard.quickSightConnection import (
QuickSightConnection,
)
@ -30,26 +32,36 @@ from metadata.generated.schema.metadataIngestion.workflow import (
)
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.source import InvalidSourceException, SourceStatus
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.dashboard.dashboard_service import DashboardServiceSource
from metadata.ingestion.source.dashboard.quicksight.models import DataSourceResp
from metadata.utils import fqn
from metadata.utils.filters import filter_by_chart
from metadata.utils.logger import ingestion_logger
logger = ingestion_logger()
# MaxResults is bounded: 0 <= MaxResults <= 100. Use the upper bound as the page size.
QUICKSIGHT_MAXRESULTS = 100
class QuickSightSource(DashboardServiceSource):
class QuicksightSource(DashboardServiceSource):
"""
QuickSight Source Class
"""
config: WorkflowSource
metadata: OpenMetadataConnection
metadata: OpenMetadata
status: SourceStatus
def __init__(self, config: WorkflowSource, metadata_config: OpenMetadataConnection):
super().__init__(config, metadata_config)
self.aws_account_id = self.service_connection.awsAccountId
self.dashboard_url = None
self.default_args = {
"AwsAccountId": self.aws_account_id,
"MaxResults": QUICKSIGHT_MAXRESULTS,
}
@classmethod
def create(cls, config_dict, metadata_config: OpenMetadataConnection):
@ -61,21 +73,42 @@ class QuickSightSource(DashboardServiceSource):
)
return cls(config, metadata_config)
def _check_pagination(self, listing_method, entity_key) -> Optional[List]:
    """
    Collect the entries of a paginated listing call into a single list

    Args:
        listing_method: callable taking one dict of keyword arguments and
            returning a response mapping for a single page
        entity_key: key of the response under which the page entries live
    Returns:
        Entries gathered from the first page plus every following page
        that could be fetched
    """
    # The first page is requested outside the try block on purpose:
    # an error here propagates to the caller.
    page = listing_method(self.default_args)
    collected = list(page[entity_key])
    next_token = page.get("NextToken")

    while next_token:
        try:
            # Work on a fresh dict so the shared default args are never mutated
            page = listing_method({**self.default_args, "NextToken": next_token})
            collected.extend(page[entity_key])
        except Exception as err:
            # Best effort: keep what was gathered so far and stop paginating
            logger.error(f"Pagination Failed with error: {err}")
            logger.debug(traceback.format_exc())
            break
        next_token = page.get("NextToken")

    return collected
def get_dashboards_list(self) -> Optional[List[dict]]:
"""
Get List of all dashboards
"""
dashboard_ids = [
dashboard["DashboardId"]
for dashboard in self.client.list_dashboards(
AwsAccountId=self.aws_account_id
)["DashboardSummaryList"]
]
list_dashboards_func = lambda kwargs: self.client.list_dashboards( # pylint: disable=unnecessary-lambda-assignment
**kwargs
)
dashboard_summary_list = self._check_pagination(
listing_method=list_dashboards_func,
entity_key="DashboardSummaryList",
)
dashboard_set = {
dashboard["DashboardId"] for dashboard in dashboard_summary_list
}
dashboards = [
self.client.describe_dashboard(
AwsAccountId=self.aws_account_id, DashboardId=dashboard_id
)["Dashboard"]
for dashboard_id in dashboard_ids
for dashboard_id in dashboard_set
]
return dashboards
@ -97,15 +130,17 @@ class QuickSightSource(DashboardServiceSource):
"""
Method to Get Dashboard Entity
"""
dashboard_url = self.client.get_dashboard_embed_url(
self.dashboard_url = self.client.get_dashboard_embed_url(
AwsAccountId=self.aws_account_id,
DashboardId=dashboard_details["DashboardId"],
IdentityType="ANONYMOUS",
IdentityType=self.config.serviceConnection.__root__.config.identityType.value,
Namespace=self.config.serviceConnection.__root__.config.namespace
or "default",
)["EmbedUrl"]
yield CreateDashboardRequest(
name=dashboard_details["DashboardId"],
dashboardUrl=dashboard_url,
dashboardUrl=self.dashboard_url,
displayName=dashboard_details["Name"],
description=dashboard_details["Version"].get("Description", ""),
charts=[
@ -127,11 +162,6 @@ class QuickSightSource(DashboardServiceSource):
Returns:
Iterable[CreateChartRequest]
"""
dashboard_url = self.client.get_dashboard_embed_url(
AwsAccountId=self.aws_account_id,
DashboardId=dashboard_details["DashboardId"],
IdentityType="ANONYMOUS",
)["EmbedUrl"]
# Each dashboard is guaranteed to have at least one sheet, which represents
# a chart in the context of QuickSight
# https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/quicksight.html#QuickSight.Client.describe_dashboard
@ -149,7 +179,7 @@ class QuickSightSource(DashboardServiceSource):
displayName=chart["Name"],
description="",
chartType=ChartType.Other.value,
chartUrl=f"{dashboard_url}/sheets/{chart['SheetId']}",
chartUrl=f"{self.dashboard_url}/sheets/{chart['SheetId']}",
service=EntityReference(
id=self.context.dashboard_service.id.__root__,
type="dashboardService",
@ -161,61 +191,106 @@ class QuickSightSource(DashboardServiceSource):
logger.warning(f"Error creating chart [{chart}]: {exc}")
continue
def yield_dashboard_lineage_details(
def yield_dashboard_lineage_details( # pylint: disable=too-many-locals
self, dashboard_details: dict, db_service_name: str
) -> Optional[Iterable[AddLineageRequest]]:
"""
Get lineage between dashboard and data sources
"""
try:
dataset_arns = dashboard_details["Version"]["DatasetArns"]
dataset_ids = [
list_data_set_func = lambda kwargs: self.client.list_data_sets( # pylint: disable=unnecessary-lambda-assignment
**kwargs
)
data_set_summary_list = self._check_pagination(
listing_method=list_data_set_func,
entity_key="DataSetSummaries",
)
dataset_ids = {
dataset["DataSetId"]
for dataset in self.client.list_data_sets(
AwsAccountId=self.aws_account_id
)
if dataset["Arn"] in dataset_arns
]
data_source_arns = set()
for dataset in data_set_summary_list
if dataset.get("Arn") in dashboard_details["Version"]["DataSetArns"]
}
for dataset_id in dataset_ids:
for data_source in list(
self.client.describe_data_set(
AwsAccountId=self.aws_account_id, DataSetId=dataset_id
)["Dataset"]["PhysicalTableMap"].values()
)[0]:
data_source_arns.add(data_source["DataSourceArn"])
data_sources = [
data_source
for data_source in self.client.list_data_sources(
AwsAccountId=self.aws_account_id
)["DataSources"]
if data_source["Arn"] in data_source_arns
]
for data_source in data_sources:
database_name = data_source["Name"]
from_fqn = fqn.build(
self.metadata,
entity_type=Database,
service_name=db_service_name,
database_name=database_name,
)
from_entity = self.metadata.get_by_name(
entity=Database,
fqn=from_fqn,
)
to_fqn = fqn.build(
self.metadata,
entity_type=Dashboard,
service_name=self.config.serviceName,
dashboard_name=dashboard_details["DashboardId"],
)
to_entity = self.metadata.get_by_name(
entity=Dashboard,
fqn=to_fqn,
)
yield self._get_add_lineage_request(
to_entity=to_entity, from_entity=from_entity
)
)["DataSet"]["PhysicalTableMap"].values()
):
try:
if not data_source.get("RelationalTable"):
raise KeyError(
f"We currently don't support lineage to {list(data_source.keys())}"
)
data_source_relational_table = data_source["RelationalTable"]
data_source_resp = DataSourceResp(
datasource_arn=data_source_relational_table[
"DataSourceArn"
],
schema_name=data_source_relational_table["Schema"],
table_name=data_source_relational_table["Name"],
)
except KeyError as err:
logger.error(err)
continue
except ValidationError as err:
logger.error(
f"{err} - error while trying to fetch lineage data source"
)
logger.debug(traceback.format_exc())
continue
# db_name = data_source
schema_name = data_source_resp.schema_name
table_name = data_source_resp.table_name
list_data_source_func = lambda kwargs: self.client.list_data_sources( # pylint: disable=unnecessary-lambda-assignment
**kwargs
)
data_source_summary_list = self._check_pagination(
listing_method=list_data_source_func,
entity_key="DataSources",
)
data_source_ids = [
data_source_arn["DataSourceId"]
for data_source_arn in data_source_summary_list
if data_source_arn["Arn"] in data_source_resp.datasource_arn
]
for data_source_id in data_source_ids:
data_source_dict = self.client.describe_data_source(
AwsAccountId=self.aws_account_id,
DataSourceId=data_source_id,
)["DataSource"]["DataSourceParameters"]
for db in data_source_dict.keys():
from_fqn = fqn.build(
self.metadata,
entity_type=Table,
service_name=db_service_name,
database_name=data_source_dict[db]["Database"],
schema_name=schema_name,
table_name=table_name,
skip_es_search=True,
)
from_entity = self.metadata.get_by_name(
entity=Table,
fqn=from_fqn,
)
to_fqn = fqn.build(
self.metadata,
entity_type=Dashboard,
service_name=self.config.serviceName,
dashboard_name=dashboard_details["DashboardId"],
)
to_entity = self.metadata.get_by_name(
entity=Dashboard,
fqn=to_fqn,
)
yield self._get_add_lineage_request(
to_entity=to_entity, from_entity=from_entity
)
except Exception as exc: # pylint: disable=broad-except
logger.debug(traceback.format_exc())
logger.error(

View File

@ -0,0 +1,22 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Pydantic Model to validate Quick Sight responses
"""
from pydantic import BaseModel
class DataSourceResp(BaseModel):
    """
    Validated fields taken from a data set's relational table entry
    """

    # Value of the "DataSourceArn" key of the relational table
    datasource_arn: str
    # Value of the "Schema" key of the relational table
    schema_name: str
    # Value of the "Name" key of the relational table
    table_name: str

View File

@ -7,6 +7,8 @@ from pathlib import Path
from unittest import TestCase
from unittest.mock import patch
import pytest
from metadata.generated.schema.api.data.createChart import CreateChartRequest
from metadata.generated.schema.api.data.createDashboard import CreateDashboardRequest
from metadata.generated.schema.entity.data.dashboard import Dashboard
@ -19,7 +21,7 @@ from metadata.generated.schema.metadataIngestion.workflow import (
OpenMetadataWorkflowConfig,
)
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.source.dashboard.quicksight.metadata import QuickSightSource
from metadata.ingestion.source.dashboard.quicksight.metadata import QuicksightSource
mock_file_path = (
Path(__file__).parent.parent.parent / "resources/datasets/quicksight_dataset.json"
@ -169,7 +171,7 @@ EXPECTED_DASHBOARDS = [
]
def mock_get_dashboard_embed_url(AwsAccountId, DashboardId, IdentityType):
def mock_get_dashboard_embed_url(AwsAccountId, DashboardId, IdentityType, Namespace):
return {"EmbedUrl": "https://dashboards.example.com/embed/1234"}
@ -186,14 +188,16 @@ class QuickSightUnitTest(TestCase):
super().__init__(methodName)
test_connection.return_value = False
self.config = OpenMetadataWorkflowConfig.parse_obj(mock_quicksight_config)
self.quicksight = QuickSightSource.create(
self.quicksight = QuicksightSource.create(
mock_quicksight_config["source"],
self.config.workflowConfig.openMetadataServerConfig,
)
self.quicksight.dashboard_url = "https://dashboards.example.com/embed/1234"
self.quicksight.context.__dict__["dashboard"] = MOCK_DASHBOARD
self.quicksight.context.__dict__["dashboard_service"] = MOCK_DASHBOARD_SERVICE
self.quicksight.client.get_dashboard_embed_url = mock_get_dashboard_embed_url
@pytest.mark.order(1)
def test_dashboard(self):
dashboard_list = []
results = self.quicksight.yield_dashboard(MOCK_DASHBOARD_DETAILS)
@ -202,12 +206,14 @@ class QuickSightUnitTest(TestCase):
dashboard_list.append(result)
self.assertEqual(EXPECTED_DASHBOARD, dashboard_list[0])
@pytest.mark.order(2)
def test_dashboard_name(self):
assert (
self.quicksight.get_dashboard_name(MOCK_DASHBOARD_DETAILS)
== mock_data["Name"]
)
@pytest.mark.order(3)
def test_chart(self):
dashboard_details = MOCK_DASHBOARD_DETAILS
dashboard_details["Version"]["Sheets"] = mock_data["Version"]["Sheets"]

View File

@ -9,7 +9,9 @@
"quickSightType": {
"description": "QuickSight service type",
"type": "string",
"enum": ["QuickSight"],
"enum": [
"QuickSight"
],
"default": "QuickSight"
}
},
@ -29,11 +31,29 @@
"description": "AWS Account ID",
"type": "string"
},
"identityType": {
"title": "Identity Type",
"description": "The authentication method that the user uses to sign in.",
"type": "string",
"enum": [
"IAM",
"QUICKSIGHT",
"ANONYMOUS"
],
"default": "IAM"
},
"namespace": {
"description": "The Amazon QuickSight namespace that contains the dashboard IDs in this request ( To be provided when identityType is `ANONYMOUS` )",
"type": "string"
},
"supportsMetadataExtraction": {
"title": "Supports Metadata Extraction",
"$ref": "../connectionBasicType.json#/definitions/supportsMetadataExtraction"
}
},
"additionalProperties": false,
"required": ["awsConfig", "awsAccountId"]
}
"required": [
"awsConfig",
"awsAccountId"
]
}