GEN-1911: Quicksight lineage source fix (#18348)

This commit is contained in:
harshsoni2024 2024-10-24 11:41:37 +05:30 committed by harshsoni2024
parent d5028e9397
commit 44c5dd46a9
3 changed files with 387 additions and 83 deletions

View File

@ -17,13 +17,20 @@ from pydantic import ValidationError
from metadata.generated.schema.api.data.createChart import CreateChartRequest
from metadata.generated.schema.api.data.createDashboard import CreateDashboardRequest
from metadata.generated.schema.api.data.createDashboardDataModel import (
CreateDashboardDataModelRequest,
)
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.entity.data.chart import Chart, ChartType
from metadata.generated.schema.entity.data.dashboard import Dashboard
from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.entity.data.dashboardDataModel import (
DashboardDataModel,
DataModelType,
)
from metadata.generated.schema.entity.data.table import Column, Table
from metadata.generated.schema.entity.services.connections.dashboard.quickSightConnection import (
QuickSightConnection,
)
from metadata.generated.schema.entity.services.databaseService import DatabaseService
from metadata.generated.schema.entity.services.ingestionPipelines.status import (
StackTraceError,
)
@ -35,17 +42,31 @@ from metadata.generated.schema.type.basic import (
FullyQualifiedEntityName,
Markdown,
SourceUrl,
Uuid,
)
from metadata.generated.schema.type.entityLineage import EntitiesEdge, LineageDetails
from metadata.generated.schema.type.entityLineage import Source as LineageSource
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.models import Either
from metadata.ingestion.api.steps import InvalidSourceException
from metadata.ingestion.lineage.models import ConnectionTypeDialectMapper
from metadata.ingestion.lineage.parser import LineageParser
from metadata.ingestion.lineage.sql_lineage import search_table_entities
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.dashboard.dashboard_service import DashboardServiceSource
from metadata.ingestion.source.dashboard.dashboard_service import (
LINEAGE_MAP,
DashboardServiceSource,
)
from metadata.ingestion.source.dashboard.quicksight.models import (
DashboardDetail,
DashboardResp,
DataSourceModel,
DataSourceResp,
DataSourceRespQuery,
DataSourceRespS3,
DescribeDataSourceResponse,
)
from metadata.ingestion.source.database.column_type_parser import ColumnTypeParser
from metadata.utils import fqn
from metadata.utils.filters import filter_by_chart
from metadata.utils.logger import ingestion_logger
@ -209,12 +230,283 @@ class QuicksightSource(DashboardServiceSource):
)
)
def _get_database_service(self, db_service_name: str):
    """Look up the database service entity named ``db_service_name``.

    Returns whatever ``self.metadata.get_by_name`` returns for the
    ``DatabaseService`` entity type.
    """
    service_entity = self.metadata.get_by_name(DatabaseService, db_service_name)
    return service_entity
def _describe_data_sets(
    self, dataset_id, dashboard_details: DashboardDetail
) -> List:
    """Return the physical tables of one dataset.

    Calls the client's ``describe_data_set`` for ``dataset_id`` and returns
    the values of ``DataSet.PhysicalTableMap`` from the response as a list.
    Any failure is logged at info level (mentioning the dashboard name) and
    an empty list is returned instead.
    """
    try:
        response = self.client.describe_data_set(
            AwsAccountId=self.aws_account_id, DataSetId=dataset_id
        )
        physical_tables = response["DataSet"]["PhysicalTableMap"]
        return list(physical_tables.values())
    except Exception as err:
        logger.info(
            f"Cannot parse lineage from the dashboard: {dashboard_details.Name} to dataset due to: {err}"
        )
    return []
def _yield_lineage_from_query(
    self,
    data_model_entity,
    data_source_resp: DataSourceModel,
    dashboard_details: DashboardDetail,
    db_service_entity,
) -> Iterable[Either[AddLineageRequest]]:
    """Yield lineage from the tables referenced by a custom SQL query to a data model.

    The SQL text is read from ``data_source_resp.data_source_resp.query`` and
    parsed with ``LineageParser``. Each source table found is searched in
    every database named under ``DataSourceParameters`` and, for every match,
    an ``AddLineageRequest`` (table -> data model) with column lineage is
    yielded.

    Args:
        data_model_entity: target data model entity; nothing is yielded when
            it is ``None``.
        data_source_resp: described data source carrying the query and the
            connection parameters.
        dashboard_details: dashboard being processed (used for error reports).
        db_service_entity: database service entity; when falsy the method
            logs and yields nothing.

    Yields:
        ``Either`` with the lineage request on the right, or a
        ``StackTraceError`` on the left if parsing/searching fails.
    """
    if not db_service_entity:
        logger.debug("db service is not ingested")
        return None
    sql_query = data_source_resp.data_source_resp.query
    source_database_names = []
    try:
        data_source_dict = data_source_resp.DataSourceParameters
        if data_source_dict:
            for source_params in data_source_dict.values():
                source_database_names.append(source_params.get("Database"))
    except Exception as err:
        logger.info(f"Error to parse database names from source:{err}")
        return None
    try:
        # db_service_entity is known to be truthy here (guard above), so the
        # dialect can be resolved unconditionally.
        lineage_parser = LineageParser(
            sql_query,
            ConnectionTypeDialectMapper.dialect_of(
                db_service_entity.serviceType.value
            ),
        )
        for db_name in source_database_names:
            for source_table in lineage_parser.source_tables:
                database_schema_name, table_name = fqn.split(str(source_table))[
                    -2:
                ]
                database_schema_name = self.check_database_schema_name(
                    database_schema_name
                )
                from_entities = search_table_entities(
                    metadata=self.metadata,
                    database=db_name,
                    service_name=db_service_entity.name.root,
                    database_schema=database_schema_name,
                    table=table_name,
                )
                # Defensive: treat a missing search result as "no tables"
                # rather than aborting the remaining tables with a TypeError.
                for from_entity in from_entities or []:
                    if from_entity is None or data_model_entity is None:
                        continue
                    columns = [col.name.root for col in data_model_entity.columns]
                    column_lineage = self._get_column_lineage(
                        from_entity, data_model_entity, columns
                    )
                    # Build a fresh LineageDetails per edge: sharing a single
                    # instance and mutating its columnsLineage would make every
                    # previously yielded request point at the last table's
                    # column lineage.
                    lineage_details = LineageDetails(
                        source=LineageSource.DashboardLineage,
                        sqlQuery=sql_query,
                        columnsLineage=column_lineage,
                    )
                    yield Either(
                        right=AddLineageRequest(
                            edge=EntitiesEdge(
                                fromEntity=EntityReference(
                                    id=Uuid(from_entity.id.root),
                                    type=LINEAGE_MAP[type(from_entity)],
                                ),
                                toEntity=EntityReference(
                                    id=Uuid(data_model_entity.id.root),
                                    type=LINEAGE_MAP[type(data_model_entity)],
                                ),
                                lineageDetails=lineage_details,
                            )
                        )
                    )
    except Exception as err:
        yield Either(
            left=StackTraceError(
                name=dashboard_details.DashboardId,
                error=f"Wild error ingesting table(query) <-> datamodel lineage {dashboard_details} - {err}",
                stackTrace=traceback.format_exc(),
            )
        )
def _yield_lineage_from_s3(
    self,
    data_model_entity,
    data_source_resp: DataSourceModel,
    dashboard_details: DashboardDetail,
) -> Iterable[Either[AddLineageRequest]]:
    """Yield lineage from storage containers to a data model for S3 sources.

    For every entry of ``DataSourceParameters`` the manifest bucket and key
    are read from ``ManifestFileLocation``; containers are then searched by
    the path ``s3://<bucket>/<key>`` and one ``AddLineageRequest``
    (container -> data model) is yielded per container found.

    Args:
        data_model_entity: target data model entity; nothing is yielded when
            it is ``None``.
        data_source_resp: described data source carrying the S3 parameters.
        dashboard_details: dashboard being processed (used for error reports).

    Yields:
        ``Either`` with the lineage request on the right, or a
        ``StackTraceError`` on the left if anything fails.
    """
    try:
        if data_source_resp and data_source_resp.DataSourceParameters:
            for s3_params in data_source_resp.DataSourceParameters.values():
                manifest_location = s3_params.get("ManifestFileLocation") or {}
                bucket_name = manifest_location.get("Bucket")
                key_name = manifest_location.get("Key")
                if not bucket_name or not key_name:
                    # Without both parts the path would degrade to
                    # "s3://None/None", which cannot match any container.
                    logger.debug(
                        "Skipping S3 lineage: manifest file location is missing bucket or key"
                    )
                    continue
                containers = self.metadata.es_search_container_by_path(
                    full_path=f"s3://{bucket_name}/{key_name}"
                )
                # Defensive: an empty/None search result simply means there
                # is no container to link.
                for container in containers or []:
                    if container is None or data_model_entity is None:
                        continue
                    storage_entity = EntityReference(
                        id=Uuid(container.id.root),
                        type="container",
                    )
                    yield Either(
                        right=AddLineageRequest(
                            edge=EntitiesEdge(
                                fromEntity=storage_entity,
                                toEntity=EntityReference(
                                    id=Uuid(data_model_entity.id.root),
                                    type=LINEAGE_MAP[type(data_model_entity)],
                                ),
                            )
                        )
                    )
    except Exception as err:
        yield Either(
            left=StackTraceError(
                name=dashboard_details.DashboardId,
                error=f"Wild error ingesting s3 <-> datamodel lineage {dashboard_details} - {err}",
                stackTrace=traceback.format_exc(),
            )
        )
def _yield_lineage_from_table(
    self,
    data_model_entity,
    data_source_resp: DataSourceModel,
    dashboard_details: DashboardDetail,
    db_service_entity,
) -> Iterable[Either[AddLineageRequest]]:
    """Yield lineage from a relational table to a data model.

    The schema and table names come from
    ``data_source_resp.data_source_resp``; the database name comes from each
    entry of ``DataSourceParameters``. For every table entity that can be
    resolved, an ``AddLineageRequest`` (table -> data model) with column
    lineage is yielded.

    Args:
        data_model_entity: target data model entity; nothing is yielded when
            it is ``None``.
        data_source_resp: described data source carrying the table reference
            and the connection parameters.
        dashboard_details: dashboard being processed (used for error reports).
        db_service_entity: database service entity; when falsy the method
            logs and yields nothing, like the query-based variant does.

    Yields:
        ``Either`` with the lineage request on the right, or a
        ``StackTraceError`` on the left if anything fails.
    """
    if not db_service_entity:
        # Without the service no table FQN can be built; skip quietly instead
        # of reporting an AttributeError for every data model.
        logger.debug("db service is not ingested")
        return None
    try:
        # Validate the response before dereferencing it.
        if not (data_source_resp and data_source_resp.DataSourceParameters):
            return None
        table_resp = data_source_resp.data_source_resp
        schema_name = table_resp.schema_name
        table_name = table_resp.table_name
        for source_params in data_source_resp.DataSourceParameters.values():
            from_fqn = fqn.build(
                self.metadata,
                entity_type=Table,
                service_name=db_service_entity.name.root,
                database_name=source_params.get("Database"),
                schema_name=schema_name,
                table_name=table_name,
            )
            if not from_fqn:
                # No FQN could be built for this database; try the next one.
                continue
            from_entity = self.metadata.get_by_name(
                entity=Table,
                fqn=from_fqn,
            )
            if from_entity is not None and data_model_entity is not None:
                columns = [col.name.root for col in data_model_entity.columns]
                column_lineage = self._get_column_lineage(
                    from_entity, data_model_entity, columns
                )
                yield self._get_add_lineage_request(
                    to_entity=data_model_entity,
                    from_entity=from_entity,
                    column_lineage=column_lineage,
                )
    except Exception as err:
        yield Either(
            left=StackTraceError(
                name=dashboard_details.DashboardId,
                error=f"Wild error ingesting table <-> datamodel lineage {dashboard_details} - {err}",
                stackTrace=traceback.format_exc(),
            )
        )
def _get_datamodel(self, datamodel_id: str):
    """Fetch the data model entity for ``datamodel_id`` in the current dashboard service.

    Builds the ``DashboardDataModel`` FQN from the dashboard service held in
    the context and ``datamodel_id``. Returns the result of
    ``self.metadata.get_by_name`` for that FQN, or ``None`` when no FQN
    could be built.
    """
    model_fqn = fqn.build(
        self.metadata,
        entity_type=DashboardDataModel,
        service_name=self.context.get().dashboard_service,
        data_model_name=datamodel_id,
    )
    if not model_fqn:
        return None
    return self.metadata.get_by_name(entity=DashboardDataModel, fqn=model_fqn)
def yield_dashboard_lineage_details(  # pylint: disable=too-many-locals
    self, dashboard_details: DashboardDetail, db_service_name: str
) -> Iterable[Either[AddLineageRequest]]:
    """
    Yield lineage between the collected data models and their sources.

    Each entry of ``self.data_models`` is dispatched on the type of its
    ``data_source_resp``: custom SQL, S3 or relational table. Any exception
    raised while handling one data model is reported as a
    ``StackTraceError`` and processing continues with the next one.
    """
    db_service_entity = self.metadata.get_by_name(
        entity=DatabaseService, fqn=db_service_name
    )
    for datamodel in self.data_models or []:
        try:
            data_source = datamodel.DataSource
            data_model_entity = self._get_datamodel(
                datamodel_id=data_source.DataSourceId
            )
            source_resp = data_source.data_source_resp
            if isinstance(source_resp, DataSourceRespQuery):
                lineage = self._yield_lineage_from_query(
                    data_model_entity,
                    data_source,
                    dashboard_details,
                    db_service_entity,
                )
            elif isinstance(source_resp, DataSourceRespS3):
                lineage = self._yield_lineage_from_s3(
                    data_model_entity, data_source, dashboard_details
                )
            elif isinstance(source_resp, DataSourceResp):
                lineage = self._yield_lineage_from_table(
                    data_model_entity,
                    data_source,
                    dashboard_details,
                    db_service_entity,
                )
            else:
                # Unknown source type: nothing to yield for this data model.
                continue
            yield from lineage
        except Exception as exc:  # pylint: disable=broad-except
            yield Either(
                left=StackTraceError(
                    name="Lineage",
                    error=f"Error to yield dashboard lineage details for DB service name [{db_service_name}] and dashboard_name [{dashboard_details.Name}]: {exc}",
                    stackTrace=traceback.format_exc(),
                )
            )
def _get_column_info(self, data_model: DescribeDataSourceResponse):
    """Build the ``Column`` list for a data model.

    Iterates the ``columns`` of ``data_model.DataSource.data_source_resp``
    (treated as empty when falsy), parses each entry's ``Type`` and creates a
    ``Column`` named after the entry's ``Name``. Entries that fail to parse
    are logged and skipped.
    """
    datasource_columns = []
    for field in data_model.DataSource.data_source_resp.columns or []:
        try:
            parsed_type = ColumnTypeParser._parse_datatype_string(  # pylint: disable=protected-access
                field.get("Type")
            )
            column_name = field.get("Name")
            datasource_columns.append(
                Column(
                    name=column_name,
                    displayName=column_name,
                    dataType=parsed_type.get("dataType"),
                )
            )
        except Exception as exc:
            logger.debug(traceback.format_exc())
            logger.warning(f"Error to yield datamodel column: {exc}")
    return datasource_columns
def _get_dashboard_datamodels(self, dashboard_details: DashboardDetail) -> list:
"""Get dashboard datamodels"""
data_models = []
dataset_ids = []
try:
list_data_set_func = lambda kwargs: self.client.list_data_sets( # pylint: disable=unnecessary-lambda-assignment
**kwargs
@ -228,105 +520,95 @@ class QuicksightSource(DashboardServiceSource):
for dataset in data_set_summary_list or []
if dataset.get("Arn") in dashboard_details.Version.DataSetArns
}
except Exception as exc:
logger.debug(traceback.format_exc())
logger.warning(
f"Error while processing datamodels for dashboard: {dashboard_details.Name}: {exc}"
)
for dataset_id in dataset_ids or []:
for data_source in (
list(
self.client.describe_data_set(
AwsAccountId=self.aws_account_id, DataSetId=dataset_id
)["DataSet"]["PhysicalTableMap"].values()
)
or []
):
try:
if not data_source.get("RelationalTable"):
raise KeyError(
f"We currently don't support lineage to {list(data_source.keys())}"
)
for dataset_id in dataset_ids or []:
data_source_list = self._describe_data_sets(dataset_id, dashboard_details)
for data_source in data_source_list:
try:
if data_source.get("RelationalTable"):
data_source_resp = DataSourceResp(
**data_source["RelationalTable"]
)
except (KeyError, ValidationError) as err:
data_source_resp = None
yield Either(
left=StackTraceError(
name="Lineage",
error=(
"Error to yield dashboard lineage details for DB service"
f" name [{db_service_name}]: {err}"
),
stackTrace=traceback.format_exc(),
)
elif data_source.get("CustomSql"):
data_source_resp = DataSourceRespQuery(
**data_source["CustomSql"]
)
if data_source_resp:
schema_name = data_source_resp.schema_name
table_name = data_source_resp.table_name
elif data_source.get("S3Source"):
data_source_resp = DataSourceRespS3(**data_source["S3Source"])
else:
raise KeyError(
f"We currently don't support data sources: {list(data_source.keys())}"
)
except (KeyError, ValidationError) as err:
data_source_resp = None
logger.info(
f"Error while processing datamodels for dashboard {dashboard_details.Name}: {err}"
)
continue
if data_source_resp:
try:
list_data_source_func = lambda kwargs: self.client.list_data_sources( # pylint: disable=unnecessary-lambda-assignment
**kwargs
)
data_source_summary_list = self._check_pagination(
listing_method=list_data_source_func,
entity_key="DataSources",
)
data_source_ids = [
data_source_arn["DataSourceId"]
for data_source_arn in data_source_summary_list or []
if data_source_arn["Arn"] in data_source_resp.datasource_arn
]
for data_source_id in data_source_ids or []:
data_source_resp = DescribeDataSourceResponse(
desribed_source = DescribeDataSourceResponse(
**self.client.describe_data_source(
AwsAccountId=self.aws_account_id,
DataSourceId=data_source_id,
)
).DataSource
if (
)
desribed_source.DataSource.data_source_resp = (
data_source_resp
and data_source_resp.DataSourceParameters
):
data_source_dict = data_source_resp.DataSourceParameters
for db in data_source_dict.keys() or []:
from_fqn = fqn.build(
self.metadata,
entity_type=Table,
service_name=db_service_name,
database_name=data_source_dict[db].get(
"Database"
),
schema_name=schema_name,
table_name=table_name,
skip_es_search=True,
)
from_entity = self.metadata.get_by_name(
entity=Table,
fqn=from_fqn,
)
to_fqn = fqn.build(
self.metadata,
entity_type=Dashboard,
service_name=self.config.serviceName,
dashboard_name=dashboard_details.DashboardId,
)
to_entity = self.metadata.get_by_name(
entity=Dashboard,
fqn=to_fqn,
)
if (
from_entity is not None
and to_entity is not None
):
yield self._get_add_lineage_request(
to_entity=to_entity, from_entity=from_entity
)
except Exception as exc: # pylint: disable=broad-except
yield Either(
left=StackTraceError(
name="Lineage",
error=f"Error to yield dashboard lineage details for DB service name [{db_service_name}]: {exc}",
stackTrace=traceback.format_exc(),
)
data_models.append(desribed_source)
except Exception as err:
logger.info(
f"Error while processing data sources for dashboard {dashboard_details.Name}: {err}"
)
return data_models
def yield_datamodel(
    self, dashboard_details: DashboardDetail
) -> Iterable[Either[CreateDashboardDataModelRequest]]:
    """
    Yield one data model creation request per data source of the dashboard.

    The data sources are collected with ``_get_dashboard_datamodels`` and
    stored on ``self.data_models``. Each successfully built request is
    yielded and then registered via ``register_record_datamodel``; failures
    are yielded as ``StackTraceError``.
    """
    self.data_models: List[
        DescribeDataSourceResponse
    ] = self._get_dashboard_datamodels(dashboard_details)
    for data_model in self.data_models:
        try:
            model_name = EntityName(data_model.DataSource.DataSourceId)
            display_name = data_model.DataSource.Name
            service_fqn = FullyQualifiedEntityName(
                self.context.get().dashboard_service
            )
            model_type = DataModelType.QuickSightDataModel.value
            service_type = self.service_connection.type.value
            model_columns = self._get_column_info(data_model)
            data_model_request = CreateDashboardDataModelRequest(
                name=model_name,
                displayName=display_name,
                service=service_fqn,
                dataModelType=model_type,
                serviceType=service_type,
                columns=model_columns,
            )
            yield Either(right=data_model_request)
            self.register_record_datamodel(datamodel_request=data_model_request)
        except Exception as exc:
            yield Either(
                left=StackTraceError(
                    name=data_model.DataSource.Name,
                    error=f"Error yielding Data Model [{data_model.DataSource.Name}]: {exc}",
                    stackTrace=traceback.format_exc(),
                )
            )
)

View File

@ -12,7 +12,7 @@
Pydantic Model to validate Quick Sight responses
"""
from typing import List, Optional
from typing import List, Optional, Union
from pydantic import BaseModel, Field
@ -21,6 +21,19 @@ class DataSourceResp(BaseModel):
datasource_arn: str = Field(alias="DataSourceArn")
schema_name: str = Field(alias="Schema")
table_name: str = Field(alias="Name")
columns: Optional[list] = Field(alias="InputColumns")
class DataSourceRespQuery(BaseModel):
    """Custom SQL entry of a dataset's physical table map (built from the ``CustomSql`` payload)."""

    datasource_arn: str = Field(alias="DataSourceArn")
    query: str = Field(alias="SqlQuery")
    table_name: str = Field(alias="Name")
    # An explicit default keeps the field truly optional: without it,
    # pydantic v2 still requires the "Columns" key even though the type is
    # Optional, and a payload lacking it would fail validation.
    columns: Optional[list] = Field(default=None, alias="Columns")
class DataSourceRespS3(BaseModel):
    """S3 entry of a dataset's physical table map (built from the ``S3Source`` payload)."""

    datasource_arn: str = Field(alias="DataSourceArn")
    # An explicit default keeps the field truly optional: without it,
    # pydantic v2 still requires the "InputColumns" key even though the type
    # is Optional.
    columns: Optional[list] = Field(default=None, alias="InputColumns")
class VersionSheet(BaseModel):
@ -51,8 +64,13 @@ class DashboardResp(BaseModel):
class DataSourceModel(BaseModel):
    """Data source description, optionally carrying the physical table it backs."""

    Name: str
    Type: str
    DataSourceId: str
    # Connection parameters keyed by source kind; absent by default.
    DataSourceParameters: Optional[dict] = None
    # Physical table (S3, custom SQL or relational) attached after the
    # data source has been described; absent by default.
    data_source_resp: Optional[
        Union[DataSourceRespS3, DataSourceRespQuery, DataSourceResp]
    ] = None
class DescribeDataSourceResponse(BaseModel):

View File

@ -22,7 +22,8 @@
"LookMlView",
"LookMlExplore",
"PowerBIDataModel",
"QlikDataModel"
"QlikDataModel",
"QuickSightDataModel"
],
"javaEnums": [
{
@ -45,6 +46,9 @@
},
{
"name": "QlikDataModel"
},
{
"name": "QuickSightDataModel"
}
]
}