MINOR - Added quicksight pydantic models (#16269)

* Added quicksight pydantic models

* pyformat

* resolved type hints

* Renamed sheet -> chart in models
This commit is contained in:
Suman Maharana 2024-05-17 12:10:20 +05:30 committed by GitHub
parent 1798b647c3
commit bd3f47a563
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 151 additions and 95 deletions

View File

@ -11,7 +11,7 @@
"""QuickSight source module"""
import traceback
from typing import Any, Iterable, List, Optional
from typing import Iterable, List, Optional
from pydantic import ValidationError
@ -34,7 +34,12 @@ from metadata.ingestion.api.models import Either
from metadata.ingestion.api.steps import InvalidSourceException
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.dashboard.dashboard_service import DashboardServiceSource
from metadata.ingestion.source.dashboard.quicksight.models import DataSourceResp
from metadata.ingestion.source.dashboard.quicksight.models import (
DashboardDetail,
DashboardResp,
DataSourceResp,
DescribeDataSourceResponse,
)
from metadata.utils import fqn
from metadata.utils.filters import filter_by_chart
from metadata.utils.logger import ingestion_logger
@ -109,36 +114,40 @@ class QuicksightSource(DashboardServiceSource):
dashboard["DashboardId"] for dashboard in dashboard_summary_list
}
dashboards = [
self.client.describe_dashboard(
DashboardResp(
**self.client.describe_dashboard(
AwsAccountId=self.aws_account_id, DashboardId=dashboard_id
)["Dashboard"]
)
).Dashboard
for dashboard_id in dashboard_set
]
return dashboards
def get_dashboard_name(self, dashboard: dict) -> str:
def get_dashboard_name(self, dashboard: DashboardDetail) -> str:
"""
Get Dashboard Name
"""
return dashboard["Name"]
return dashboard.Name
def get_dashboard_details(self, dashboard: dict) -> dict:
def get_dashboard_details(self, dashboard: dict) -> DashboardDetail:
"""
Get Dashboard Details
"""
return dashboard
return DashboardDetail(**dashboard)
def yield_dashboard(
self, dashboard_details: dict
self, dashboard_details: DashboardDetail
) -> Iterable[Either[CreateDashboardRequest]]:
"""
Method to Get Dashboard Entity
"""
dashboard_request = CreateDashboardRequest(
name=dashboard_details["DashboardId"],
name=dashboard_details.DashboardId,
sourceUrl=self.dashboard_url,
displayName=dashboard_details["Name"],
description=dashboard_details["Version"].get("Description"),
displayName=dashboard_details.Name,
description=dashboard_details.Version.Description
if dashboard_details.Version
else None,
charts=[
fqn.build(
self.metadata,
@ -155,29 +164,29 @@ class QuicksightSource(DashboardServiceSource):
self.register_record(dashboard_request=dashboard_request)
def yield_dashboard_chart(
self, dashboard_details: Any
self, dashboard_details: DashboardDetail
) -> Iterable[Either[CreateChartRequest]]:
"""Get chart method"""
# Each dashboard is guaranteed to have at least one sheet, which represents
# a chart in the context of QuickSight
# https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/quicksight.html#QuickSight.Client.describe_dashboard
charts = dashboard_details["Version"]["Sheets"]
for chart in charts:
if dashboard_details.Version:
for chart in dashboard_details.Version.Charts or []:
try:
if filter_by_chart(
self.source_config.chartFilterPattern, chart["Name"]
self.source_config.chartFilterPattern, chart.Name
):
self.status.filter(chart["Name"], "Chart Pattern not allowed")
self.status.filter(chart.Name, "Chart Pattern not allowed")
continue
self.dashboard_url = (
f"https://{self.aws_region}.quicksight.aws.amazon.com/sn/dashboards"
f'/{dashboard_details.get("DashboardId")}'
f"/{dashboard_details.DashboardId}"
)
yield Either(
right=CreateChartRequest(
name=chart["SheetId"],
displayName=chart["Name"],
name=chart.ChartId,
displayName=chart.Name,
chartType=ChartType.Other.value,
sourceUrl=self.dashboard_url,
service=self.context.get().dashboard_service,
@ -193,7 +202,7 @@ class QuicksightSource(DashboardServiceSource):
)
def yield_dashboard_lineage_details( # pylint: disable=too-many-locals
self, dashboard_details: dict, db_service_name: str
self, dashboard_details: DashboardDetail, db_service_name: str
) -> Iterable[Either[AddLineageRequest]]:
"""
Get lineage between dashboard and data sources
@ -208,28 +217,26 @@ class QuicksightSource(DashboardServiceSource):
)
dataset_ids = {
dataset["DataSetId"]
for dataset in data_set_summary_list
if dataset.get("Arn") in dashboard_details["Version"]["DataSetArns"]
for dataset in data_set_summary_list or []
if dataset.get("Arn") in dashboard_details.Version.DataSetArns
}
for dataset_id in dataset_ids:
for data_source in list(
for dataset_id in dataset_ids or []:
for data_source in (
list(
self.client.describe_data_set(
AwsAccountId=self.aws_account_id, DataSetId=dataset_id
)["DataSet"]["PhysicalTableMap"].values()
)
or []
):
try:
if not data_source.get("RelationalTable"):
raise KeyError(
f"We currently don't support lineage to {list(data_source.keys())}"
)
data_source_relational_table = data_source["RelationalTable"]
data_source_resp = DataSourceResp(
datasource_arn=data_source_relational_table[
"DataSourceArn"
],
schema_name=data_source_relational_table["Schema"],
table_name=data_source_relational_table["Name"],
**data_source["RelationalTable"]
)
except (KeyError, ValidationError) as err:
yield Either(
@ -257,21 +264,25 @@ class QuicksightSource(DashboardServiceSource):
data_source_ids = [
data_source_arn["DataSourceId"]
for data_source_arn in data_source_summary_list
for data_source_arn in data_source_summary_list or []
if data_source_arn["Arn"] in data_source_resp.datasource_arn
]
for data_source_id in data_source_ids:
data_source_dict = self.client.describe_data_source(
for data_source_id in data_source_ids or []:
data_source_resp = DescribeDataSourceResponse(
**self.client.describe_data_source(
AwsAccountId=self.aws_account_id,
DataSourceId=data_source_id,
)["DataSource"]["DataSourceParameters"]
for db in data_source_dict.keys():
)
).DataSource
if data_source_resp and data_source_resp.DataSourceParameters:
data_source_dict = data_source_resp.DataSourceParameters
for db in data_source_dict.keys() or []:
from_fqn = fqn.build(
self.metadata,
entity_type=Table,
service_name=db_service_name,
database_name=data_source_dict[db]["Database"],
database_name=data_source_dict[db].get("Database"),
schema_name=schema_name,
table_name=table_name,
skip_es_search=True,
@ -284,12 +295,13 @@ class QuicksightSource(DashboardServiceSource):
self.metadata,
entity_type=Dashboard,
service_name=self.config.serviceName,
dashboard_name=dashboard_details["DashboardId"],
dashboard_name=dashboard_details.DashboardId,
)
to_entity = self.metadata.get_by_name(
entity=Dashboard,
fqn=to_fqn,
)
if from_entity is not None and to_entity is not None:
yield self._get_add_lineage_request(
to_entity=to_entity, from_entity=from_entity
)

View File

@ -12,11 +12,50 @@
Pydantic Model to validate Quick Sight responses
"""
from typing import List, Optional
from pydantic import BaseModel
from pydantic import BaseModel, Field
class DataSourceResp(BaseModel):
datasource_arn: str
schema_name: str
table_name: str
datasource_arn: str = Field(alias="DataSourceArn")
schema_name: str = Field(alias="Schema")
table_name: str = Field(alias="Name")
class VersionSheet(BaseModel):
ChartId: Optional[str] = Field(alias="SheetId")
Name: Optional[str]
class DashboardVersion(BaseModel):
Status: Optional[str]
Arn: Optional[str]
SourceEntityArn: Optional[str]
DataSetArns: Optional[List]
Description: Optional[str]
Charts: Optional[List[VersionSheet]] = Field(alias="Sheets")
class DashboardDetail(BaseModel):
DashboardId: str
Arn: Optional[str]
Name: str
Version: Optional[DashboardVersion]
class DashboardResp(BaseModel):
Dashboard: DashboardDetail
Status: Optional[int]
RequestId: Optional[str]
class DataSource(BaseModel):
DataSourceId: str
DataSourceParameters: Optional[dict]
class DescribeDataSourceResponse(BaseModel):
DataSource: Optional[DataSource]
RequestId: Optional[str]
Status: Optional[int]

View File

@ -35,6 +35,7 @@ from metadata.generated.schema.type.basic import FullyQualifiedEntityName
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.models import Either
from metadata.ingestion.source.dashboard.quicksight.metadata import QuicksightSource
from metadata.ingestion.source.dashboard.quicksight.models import DashboardDetail
mock_file_path = (
Path(__file__).parent.parent.parent / "resources/datasets/quicksight_dataset.json"
@ -171,7 +172,9 @@ class QuickSightUnitTest(TestCase):
@pytest.mark.order(1)
def test_dashboard(self):
dashboard_list = []
results = self.quicksight.yield_dashboard(MOCK_DASHBOARD_DETAILS)
results = self.quicksight.yield_dashboard(
DashboardDetail(**MOCK_DASHBOARD_DETAILS)
)
for result in results:
if isinstance(result, Either) and result.right:
dashboard_list.append(result.right)
@ -180,14 +183,16 @@ class QuickSightUnitTest(TestCase):
@pytest.mark.order(2)
def test_dashboard_name(self):
assert (
self.quicksight.get_dashboard_name(MOCK_DASHBOARD_DETAILS)
self.quicksight.get_dashboard_name(
DashboardDetail(**MOCK_DASHBOARD_DETAILS)
)
== mock_data["Name"]
)
@pytest.mark.order(3)
def test_chart(self):
dashboard_details = MOCK_DASHBOARD_DETAILS
dashboard_details["Version"]["Sheets"] = mock_data["Version"]["Sheets"]
dashboard_details = DashboardDetail(**MOCK_DASHBOARD_DETAILS)
dashboard_details.Version.Charts = mock_data["Version"]["Sheets"]
results = self.quicksight.yield_dashboard_chart(dashboard_details)
chart_list = []
for result in results: