feat: add QuickSight connector (#8442)

This commit is contained in:
michizhou 2022-11-08 06:24:49 -08:00 committed by GitHub
parent 1538972341
commit da06d183db
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 596 additions and 1 deletions

View File

@ -21,6 +21,7 @@ from metadata.clients.connection_clients import (
GlueDBClient,
GluePipelineClient,
KinesisClient,
QuickSightClient,
SageMakerClient,
)
from metadata.generated.schema.security.credentials.awsCredentials import AWSCredentials
@ -99,3 +100,6 @@ class AWSClient:
def get_kinesis_client(self) -> KinesisClient:
return KinesisClient(self.get_client("kinesis"))
def get_quicksight_client(self) -> QuickSightClient:
return QuickSightClient(self.get_client("quicksight"))

View File

@ -97,6 +97,12 @@ class LookerClient:
self.client = client
@dataclass
class QuickSightClient:
def __init__(self, client) -> None:
self.client = client
@dataclass
class DatalakeClient:
def __init__(self, client, config) -> None:

View File

@ -0,0 +1,24 @@
source:
type: quicksight
serviceName: local_quicksight
serviceConnection:
config:
type: QuickSight
awsConfig:
awsAccessKeyId: aws_access_key_id
awsSecretAccessKey: aws_secret_access_key
awsRegion: aws region
endPointURL: https://quicksight.<region_name>.amazonaws.com
awsAccountId: aws_account_id
sourceConfig:
config:
type: DashboardMetadata
dashboardFilterPattern: {}
chartFilterPattern: {}
sink:
type: metadata-rest
config: {}
workflowConfig:
openMetadataServerConfig:
hostPort: http://localhost:8585/api
authProvider: no-auth

View File

@ -0,0 +1,224 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""QuickSight source module"""
import traceback
from typing import Any, Iterable, List, Optional
from metadata.generated.schema.api.data.createChart import CreateChartRequest
from metadata.generated.schema.api.data.createDashboard import CreateDashboardRequest
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.entity.data.chart import ChartType
from metadata.generated.schema.entity.data.dashboard import Dashboard
from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.services.connections.dashboard.quickSightConnection import (
QuickSightConnection,
)
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.source import InvalidSourceException, SourceStatus
from metadata.ingestion.source.dashboard.dashboard_service import DashboardServiceSource
from metadata.utils import fqn
from metadata.utils.filters import filter_by_chart
from metadata.utils.logger import ingestion_logger
logger = ingestion_logger()
class QuickSightSource(DashboardServiceSource):
"""
QuickSight Source Class
"""
config: WorkflowSource
metadata: OpenMetadataConnection
status: SourceStatus
def __init__(self, config: WorkflowSource, metadata_config: OpenMetadataConnection):
super().__init__(config, metadata_config)
self.quicksight = self.connection.client
self.aws_account_id = self.service_connection.awsAccountId
@classmethod
def create(cls, config_dict, metadata_config: OpenMetadataConnection):
config = WorkflowSource.parse_obj(config_dict)
connection: QuickSightConnection = config.serviceConnection.__root__.config
if not isinstance(connection, QuickSightConnection):
raise InvalidSourceException(
f"Expected QuickSightConnection, but got {connection}"
)
return cls(config, metadata_config)
def get_dashboards_list(self) -> Optional[List[dict]]:
"""
Get List of all dashboards
"""
dashboard_ids = [
dashboard["DashboardId"]
for dashboard in self.quicksight.list_dashboards(
AwsAccountId=self.aws_account_id
)["DashboardSummaryList"]
]
dashboards = [
self.quicksight.describe_dashboard(
AwsAccountId=self.aws_account_id, DashboardId=dashboard_id
)["Dashboard"]
for dashboard_id in dashboard_ids
]
return dashboards
def get_dashboard_name(self, dashboard_details: dict) -> str:
"""
Get Dashboard Name
"""
return dashboard_details["Name"]
def get_dashboard_details(self, dashboard: dict) -> dict:
"""
Get Dashboard Details
"""
return dashboard
def yield_dashboard(
self, dashboard_details: dict
) -> Iterable[CreateDashboardRequest]:
"""
Method to Get Dashboard Entity
"""
dashboard_url = self.quicksight.get_dashboard_embed_url(
AwsAccountId=self.aws_account_id,
DashboardId=dashboard_details["DashboardId"],
IdentityType="ANONYMOUS",
)["EmbedUrl"]
yield CreateDashboardRequest(
name=dashboard_details["DashboardId"],
dashboardUrl=dashboard_url,
displayName=dashboard_details["Name"],
description=dashboard_details["Version"].get("Description", ""),
charts=[
EntityReference(id=chart.id.__root__, type="chart")
for chart in self.context.charts
],
service=EntityReference(
id=self.context.dashboard_service.id.__root__, type="dashboardService"
),
)
def yield_dashboard_chart(
self, dashboard_details: Any
) -> Optional[Iterable[CreateChartRequest]]:
"""Get chart method
Args:
dashboard_details:
Returns:
Iterable[CreateChartRequest]
"""
dashboard_url = self.quicksight.get_dashboard_embed_url(
AwsAccountId=self.aws_account_id,
DashboardId=dashboard_details["DashboardId"],
IdentityType="ANONYMOUS",
)["EmbedUrl"]
# Each dashboard is guaranteed to have at least one sheet, which represents
# a chart in the context of QuickSight
# https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/quicksight.html#QuickSight.Client.describe_dashboard
charts = dashboard_details["Version"]["Sheets"]
for chart in charts:
try:
if filter_by_chart(
self.source_config.chartFilterPattern, chart["Name"]
):
self.status.filter(chart["Name"], "Chart Pattern not allowed")
continue
yield CreateChartRequest(
name=chart["Name"],
displayName=chart["Name"],
description="",
chartType=ChartType.Other.value,
chartUrl=f"{dashboard_url}/sheets/{chart['SheetId']}",
service=EntityReference(
id=self.context.dashboard_service.id.__root__,
type="dashboardService",
),
)
self.status.scanned(chart["Name"])
except Exception as exc:
logger.debug(traceback.format_exc())
logger.warning(f"Error creating chart [{chart}]: {exc}")
continue
def yield_dashboard_lineage_details(
self, dashboard_details: dict, db_service_name: str
) -> Optional[Iterable[AddLineageRequest]]:
"""
Get lineage between dashboard and data sources
"""
try:
dataset_arns = dashboard_details["Version"]["DatasetArns"]
dataset_ids = [
dataset["DataSetId"]
for dataset in self.quicksight.list_data_sets(
AwsAccountId=self.aws_account_id
)
if dataset["Arn"] in dataset_arns
]
data_source_arns = set()
for dataset_id in dataset_ids:
for data_source in list(
self.quicksight.describe_data_set(
AwsAccountId=self.aws_account_id, DataSetId=dataset_id
)["Dataset"]["PhysicalTableMap"].values()
)[0]:
data_source_arns.add(data_source["DataSourceArn"])
data_sources = [
data_source
for data_source in self.quicksight.list_data_sources(
AwsAccountId=self.aws_account_id
)["DataSources"]
if data_source["Arn"] in data_source_arns
]
for data_source in data_sources:
database_name = data_source["Name"]
from_fqn = fqn.build(
self.metadata,
entity_type=Database,
service_name=db_service_name,
database_name=database_name,
)
from_entity = self.metadata.get_by_name(
entity=Database,
fqn=from_fqn,
)
to_fqn = fqn.build(
self.metadata,
entity_type=Dashboard,
service_name=self.config.serviceName,
dashboard_name=dashboard_details["DashboardId"],
)
to_entity = self.metadata.get_by_name(
entity=Dashboard,
fqn=to_fqn,
)
yield self._get_add_lineage_request(
to_entity=to_entity, from_entity=from_entity
)
except Exception as exc: # pylint: disable=broad-except
logger.debug(traceback.format_exc())
logger.error(
f"Error to yield dashboard lineage details for DB service name [{db_service_name}]: {exc}"
)

View File

@ -47,6 +47,7 @@ from metadata.clients.connection_clients import (
ModeClient,
NifiClientWrapper,
PowerBiClient,
QuickSightClient,
RedashClient,
SageMakerClient,
SalesforceClient,
@ -72,6 +73,9 @@ from metadata.generated.schema.entity.services.connections.dashboard.modeConnect
from metadata.generated.schema.entity.services.connections.dashboard.powerBIConnection import (
PowerBIConnection,
)
from metadata.generated.schema.entity.services.connections.dashboard.quickSightConnection import (
QuickSightConnection,
)
from metadata.generated.schema.entity.services.connections.dashboard.redashConnection import (
RedashConnection,
)
@ -847,6 +851,36 @@ def _(connection: LookerClient) -> None:
raise SourceConnectionException(msg) from exc
@get_connection.register
def _(
connection: QuickSightConnection,
verbose: bool = False, # pylint: disable=unused-argument
) -> QuickSightClient:
from metadata.clients.aws_client import AWSClient
quicksight_connection = AWSClient(connection.awsConfig).get_quicksight_client()
return quicksight_connection
@test_connection.register
def _(connection: QuickSightClient) -> None:
"""
Test that we can connect to the QuickSight source using the given aws resource
:param engine: boto service resource to test
:return: None or raise an exception if we cannot connect
"""
from botocore.client import ClientError
try:
connection.client.list_dashboards(AwsAccountId=connection.awsAccountId)
except ClientError as err:
msg = f"Connection error for {connection}: {err}. Check the connection details."
raise SourceConnectionException(msg) from err
except Exception as exc:
msg = f"Unknown error connecting with {connection}: {exc}."
raise SourceConnectionException(msg) from exc
@test_connection.register
def _(connection: DatalakeClient) -> None:
"""

View File

@ -0,0 +1,37 @@
{
"DashboardId":"552315335",
"Arn":"arn:aws:quicksight:us-east-2:673353295256:dashboard/85648f16-52ca-4a3d-9516-bf3298715a2e",
"Name":"New Dashboard",
"Version": {
"CreatedTime": "2022-10-07T12:05:38.903Z",
"Errors": [],
"VersionNumber": 123,
"Status": "UPDATE_SUCCESSFUL",
"Arn": "arn:aws:quicksight:us-east-2:673353295256:dashboard/85648f16-52ca-4a3d-9516-bf3298715a2e",
"SourceEntityArn": "99429b3c-36eb-40d8-90a5-a59d76e4d242",
"DataSetArns": [
"arn:aws:quicksight:us-east-2:673353295256:dataset/1eb93e0f-a262-4f62-a948-ad9a52b5d0b5",
"arn:aws:quicksight:us-east-2:673353295256:dataset/2e41e76b-fc02-480d-a932-91bdbea40fe5",
"arn:aws:quicksight:us-east-2:673353295256:dataset/eb319fef-58ee-4dcb-986d-90a885269bc6"
],
"Description": "",
"ThemeArn": "21a5c41c-da65-4531-83cb-e6d3425c4ec7",
"Sheets":[
{
"SheetId":"1108771657",
"Name":"Top Salespeople"
},
{
"SheetId":"1985861713",
"Name":"Milan Datasets"
},
{
"SheetId":"2025899139",
"Name":"Page Fans"
}
]
},
"CreatedTime": "2022-10-06T13:32:59.000Z",
"LastPublishedTime": "2022-10-07T12:05:38.903Z",
"LastUpdatedTime": "2022-10-07T12:05:38.903Z"
}

View File

@ -0,0 +1,220 @@
"""
Test QuickSight using the topology
"""
import json
from pathlib import Path
from unittest import TestCase
from unittest.mock import patch
from metadata.generated.schema.api.data.createChart import CreateChartRequest
from metadata.generated.schema.api.data.createDashboard import CreateDashboardRequest
from metadata.generated.schema.entity.data.dashboard import Dashboard
from metadata.generated.schema.entity.services.dashboardService import (
DashboardConnection,
DashboardService,
DashboardServiceType,
)
from metadata.generated.schema.metadataIngestion.workflow import (
OpenMetadataWorkflowConfig,
)
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.source.dashboard.quicksight import QuickSightSource
mock_file_path = (
Path(__file__).parent.parent.parent / "resources/datasets/quicksight_dataset.json"
)
with open(mock_file_path, encoding="UTF-8") as file:
mock_data: dict = json.load(file)
MOCK_DASHBOARD_SERVICE = DashboardService(
id="c3eb265f-5445-4ad3-ba5e-797d3a3071bb",
name="quicksight_source_test",
connection=DashboardConnection(),
serviceType=DashboardServiceType.QuickSight,
)
MOCK_DASHBOARD = Dashboard(
id="a58b1856-729c-493b-bc87-6d2269b43ec0",
name="do_it_all_with_default_config",
fullyQualifiedName="quicksight_source.do_it_all_with_default_config",
displayName="do_it_all_with_default_config",
description="",
service=EntityReference(
id="85811038-099a-11ed-861d-0242ac120002", type="dashboardService"
),
)
mock_quicksight_config = {
"source": {
"type": "quicksight",
"serviceName": "local_quicksight",
"serviceConnection": {
"config": {
"type": "QuickSight",
"awsConfig": {
"awsAccessKeyId": "aws_access_key_id",
"awsSecretAccessKey": "aws_secret_access_key",
"awsRegion": "us-east-2",
"endPointURL": "https://endpoint.com/",
},
"awsAccountId": "6733-5329-5256",
}
},
"sourceConfig": {
"config": {"dashboardFilterPattern": {}, "chartFilterPattern": {}}
},
},
"sink": {"type": "metadata-rest", "config": {}},
"workflowConfig": {
"openMetadataServerConfig": {
"hostPort": "http://localhost:8585/api",
"authProvider": "openmetadata",
"securityConfig": {
"jwtToken": "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg"
},
}
},
}
MOCK_DASHBOARD_DETAILS = {
"DashboardId": "552315335",
"Name": "New Dashboard",
"Version": {
"Sheets": [],
},
}
EXPECTED_DASHBOARD = CreateDashboardRequest(
name="552315335",
displayName="New Dashboard",
description="",
dashboardUrl="https://dashboards.example.com/embed/1234",
charts=[],
tags=None,
owner=None,
service=EntityReference(
id="c3eb265f-5445-4ad3-ba5e-797d3a3071bb",
type="dashboardService",
name=None,
fullyQualifiedName=None,
description=None,
displayName=None,
deleted=None,
href=None,
),
extension=None,
)
EXPECTED_DASHBOARDS = [
CreateChartRequest(
name="Top Salespeople",
displayName="Top Salespeople",
description="",
chartType="Other",
chartUrl="https://dashboards.example.com/embed/1234/sheets/1108771657",
tables=None,
tags=None,
owner=None,
service=EntityReference(
id="c3eb265f-5445-4ad3-ba5e-797d3a3071bb",
type="dashboardService",
name=None,
fullyQualifiedName=None,
description=None,
displayName=None,
deleted=None,
href=None,
),
),
CreateChartRequest(
name="Milan Datasets",
displayName="Milan Datasets",
description="",
chartType="Other",
chartUrl="https://dashboards.example.com/embed/1234/sheets/1985861713",
tables=None,
tags=None,
owner=None,
service=EntityReference(
id="c3eb265f-5445-4ad3-ba5e-797d3a3071bb",
type="dashboardService",
name=None,
fullyQualifiedName=None,
description=None,
displayName=None,
deleted=None,
href=None,
),
),
CreateChartRequest(
name="Page Fans",
displayName="Page Fans",
description="",
chartType="Other",
chartUrl="https://dashboards.example.com/embed/1234/sheets/2025899139",
tables=None,
tags=None,
owner=None,
service=EntityReference(
id="c3eb265f-5445-4ad3-ba5e-797d3a3071bb",
type="dashboardService",
name=None,
fullyQualifiedName=None,
description=None,
displayName=None,
deleted=None,
href=None,
),
),
]
def mock_get_dashboard_embed_url(AwsAccountId, DashboardId, IdentityType):
return {"EmbedUrl": "https://dashboards.example.com/embed/1234"}
class QuickSightUnitTest(TestCase):
"""
Implements the necessary methods to extract
QuickSight Unit Test
"""
@patch("metadata.ingestion.source.dashboard.dashboard_service.test_connection")
def __init__(self, methodName, test_connection) -> None:
super().__init__(methodName)
test_connection.return_value = False
self.config = OpenMetadataWorkflowConfig.parse_obj(mock_quicksight_config)
self.quicksight = QuickSightSource.create(
mock_quicksight_config["source"],
self.config.workflowConfig.openMetadataServerConfig,
)
self.quicksight.context.__dict__["dashboard"] = MOCK_DASHBOARD
self.quicksight.context.__dict__["dashboard_service"] = MOCK_DASHBOARD_SERVICE
self.quicksight.quicksight.get_dashboard_embed_url = (
mock_get_dashboard_embed_url
)
def test_dashboard(self):
dashboard_list = []
results = self.quicksight.yield_dashboard(MOCK_DASHBOARD_DETAILS)
for result in results:
if isinstance(result, CreateDashboardRequest):
dashboard_list.append(result)
self.assertEqual(EXPECTED_DASHBOARD, dashboard_list[0])
def test_dashboard_name(self):
assert (
self.quicksight.get_dashboard_name(MOCK_DASHBOARD_DETAILS)
== mock_data["Name"]
)
def test_chart(self):
dashboard_details = MOCK_DASHBOARD_DETAILS
dashboard_details["Version"]["Sheets"] = mock_data["Version"]["Sheets"]
results = self.quicksight.yield_dashboard_chart(dashboard_details)
chart_list = []
for result in results:
if isinstance(result, CreateChartRequest):
chart_list.append(result)
for _, (expected, original) in enumerate(zip(EXPECTED_DASHBOARDS, chart_list)):
self.assertEqual(expected, original)

View File

@ -0,0 +1,39 @@
{
"$id": "https://open-metadata.org/schema/entity/services/connections/dashboard/quickSightConnection.json",
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "QuickSightConnection",
"description": "QuickSight Connection Config",
"type": "object",
"javaType": "org.openmetadata.schema.services.connections.dashboard.QuickSightConnection",
"definitions": {
"quickSightType": {
"description": "QuickSight service type",
"type": "string",
"enum": ["QuickSight"],
"default": "QuickSight"
}
},
"properties": {
"type": {
"title": "Service Type",
"description": "Service Type",
"$ref": "#/definitions/quickSightType",
"default": "QuickSight"
},
"awsConfig": {
"title": "AWS Credentials Configuration",
"$ref": "../../../../security/credentials/awsCredentials.json"
},
"awsAccountId": {
"title": "AWS Account ID",
"description": "AWS Account ID",
"type": "string"
},
"supportsMetadataExtraction": {
"title": "Supports Metadata Extraction",
"$ref": "../connectionBasicType.json#/definitions/supportsMetadataExtraction"
}
},
"additionalProperties": false,
"required": ["awsConfig", "awsAccountId"]
}

View File

@ -23,7 +23,8 @@
"PowerBI",
"Mode",
"CustomDashboard",
"DomoDashboard"
"DomoDashboard",
"QuickSight"
],
"javaEnums": [
{
@ -52,6 +53,9 @@
},
{
"name": "DomoDashboard"
},
{
"name": "QuickSight"
}
]
},
@ -92,6 +96,9 @@
},
{
"$ref": "./connections/dashboard/domodashboardConnection.json"
},
{
"$ref": "./connections/dashboard/quickSightConnection.json"
}
]
}