Fix #5260: Refactor Superset Lineage (#5290)

This commit is contained in:
Mayur Singal 2022-06-07 12:34:10 +05:30 committed by GitHub
parent 519b5b3a6b
commit 273b3e9c4b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 115 additions and 125 deletions

View File

@ -609,6 +609,7 @@ class MetadataRestSink(Sink[Entity]):
f"{db_schema.name.__root__}.{to_table_name}",
db.service.name,
db_schema_and_table.database.name.__root__,
db_schema_and_table.table.viewDefinition.__root__,
)
except Exception as e:
logger.error("Failed to create view lineage")

View File

@ -30,7 +30,6 @@ from metadata.generated.schema.entity.services.connections.metadata.openMetadata
OpenMetadataConnection,
)
from metadata.generated.schema.entity.services.dashboardService import (
DashboardService,
DashboardServiceType,
)
from metadata.generated.schema.metadataIngestion.workflow import (
@ -38,11 +37,10 @@ from metadata.generated.schema.metadataIngestion.workflow import (
)
from metadata.generated.schema.type.entityLineage import EntitiesEdge
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.common import Entity
from metadata.ingestion.api.source import InvalidSourceException, SourceStatus
from metadata.ingestion.models.table_metadata import Chart, Dashboard, DashboardOwner
from metadata.ingestion.source.dashboard.dashboard_source import DashboardSourceService
from metadata.utils.fqn import FQN_SEPARATOR
from metadata.utils import fqn
from metadata.utils.logger import ingestion_logger
logger = ingestion_logger()
@ -106,46 +104,6 @@ def get_owners(owners_obj):
return owners
# pylint: disable=too-many-return-statements, too-many-branches
def get_service_type_from_database_uri(uri: str) -> str:
    """
    Get service type from database URI

    The service type is decided by the prefix of the URI. Redshift is the
    one special case: a ``jdbc:postgres:`` URI that also contains
    ``redshift.amazonaws``.

    Args:
        uri (str): connection URI of the database
    Returns:
        str: the matching service type, or "external" when the URI is empty,
        missing, or does not start with any known prefix
    """
    if not uri:
        return "external"

    # Substring test rather than ``str.index``: ``index`` raises ValueError
    # when the marker is absent, so every non-Redshift ``jdbc:postgres:`` URI
    # used to crash instead of falling through to "external".
    if uri.startswith("jdbc:postgres:") and "redshift.amazonaws" in uri:
        return "redshift"

    # (URI prefix, service type); none of the prefixes overlap, so the order
    # of the entries does not affect the result.
    prefix_to_service = (
        ("bigquery", "bigquery"),
        ("druid", "druid"),
        ("mssql", "mssql"),
        ("snowflake", "snowflake"),
        ("presto", "presto"),
        ("trino", "trino"),
        ("postgresql", "postgres"),
        ("pinot", "pinot"),
        ("oracle", "oracle"),
        ("mysql", "mysql"),
        ("mongodb", "mongodb"),
        ("hive", "hive"),
    )
    for prefix, service_type in prefix_to_service:
        if uri.startswith(prefix):
            return service_type
    return "external"
class SupersetSource(DashboardSourceService):
"""
Superset source class
@ -187,6 +145,22 @@ class SupersetSource(DashboardSourceService):
)
return cls(config, metadata_config)
def prepare(self):
    """
    Fetch all charts available in superset and cache them by chart id

    Charts are requested page by page through ``client.fetch_charts`` and
    stored in ``self.all_charts``. The original note on this step says the
    paged listing carries information that the fetch_charts_with_id api
    does not provide.
    """
    self.all_charts = {}
    page_size = 25
    total_charts = self.client.fetch_total_charts()
    current_page = 0
    # Page indexes start at 0 here, so page N begins at item N * page_size.
    # The strict comparison stops before requesting a page that would begin
    # past the last chart (``<=`` asked for one extra page whenever the
    # total was an exact multiple of the page size).
    while current_page * page_size < total_charts:
        charts = self.client.fetch_charts(current_page, page_size)
        current_page += 1
        # "ids" and "result" are read as parallel lists: id i describes result i.
        self.all_charts.update(zip(charts["ids"], charts["result"]))
def get_dashboards_list(self) -> Optional[List[object]]:
"""
Get List of all dashboards
@ -204,7 +178,7 @@ class SupersetSource(DashboardSourceService):
"""
Get Dashboard Name
"""
return dashboard_details["id"]
return dashboard_details["dashboard_title"]
def get_dashboard_details(self, dashboard: dict) -> dict:
"""
@ -216,7 +190,7 @@ class SupersetSource(DashboardSourceService):
"""
Method to Get Dashboard Entity
"""
self.fetch_dashboard_charts(dashboard_details)
yield from self.fetch_dashboard_charts(dashboard_details)
last_modified = (
dateparser.parse(dashboard_details.get("changed_on_utc", "now")).timestamp()
* 1000
@ -232,81 +206,94 @@ class SupersetSource(DashboardSourceService):
lastModified=last_modified,
)
def _get_charts_of_dashboard(self, dashboard_details: dict) -> List[str]:
    """
    Collect the chart ids referenced by a dashboard

    Reads the JSON text stored under "position_json" and returns the
    ``meta.chartId`` of every entry whose key starts with "CHART-". An
    entry without a chart id contributes the placeholder "unknown". A
    missing or empty "position_json" gives an empty list.
    """
    layout_json = dashboard_details.get("position_json", {})
    if not layout_json:
        return []

    chart_ids = []
    for component_key, component in json.loads(layout_json).items():
        if component_key.startswith("CHART-"):
            component_meta = component.get("meta", {})
            chart_ids.append(component_meta.get("chartId", "unknown"))
    return chart_ids
def get_lineage(self, dashboard_details: dict) -> Optional[AddLineageRequest]:
    """
    Get lineage between dashboard and data sources

    This is a generator: for every chart of the dashboard whose datasource
    resolves to a Table entity, it yields one AddLineageRequest with the
    table as source and the dashboard as target. Charts that are not in
    ``self.all_charts``, have no "datasource_id", or whose datasource FQN
    cannot be built are skipped.

    Failures while resolving the dashboard entity or building the request
    are logged and do not stop the remaining charts.
    """
    # No early "not implemented" return here: it made the loop unreachable.
    for chart_id in self._get_charts_of_dashboard(dashboard_details):
        chart_json = self.all_charts.get(chart_id)
        if not chart_json:
            # Chart id absent from the cache (this includes the "unknown"
            # placeholder); nothing to resolve.
            continue
        datasource_id = chart_json.get("datasource_id")
        if not datasource_id:
            continue
        datasource_fqn = self._get_datasource_fqn(datasource_id)
        if not datasource_fqn:
            continue
        from_entity = self.metadata.get_by_name(
            entity=Table,
            fqn=datasource_fqn,
        )
        try:
            dashboard_fqn = fqn.build(
                self.metadata,
                entity_type=Lineage_Dashboard,
                service_name=self.config.serviceName,
                dashboard_name=str(dashboard_details["id"]),
            )
            to_entity = self.metadata.get_by_name(
                entity=Lineage_Dashboard,
                fqn=dashboard_fqn,
            )
            if from_entity and to_entity:
                lineage = AddLineageRequest(
                    edge=EntitiesEdge(
                        fromEntity=EntityReference(
                            id=from_entity.id.__root__, type="table"
                        ),
                        toEntity=EntityReference(
                            id=to_entity.id.__root__, type="dashboard"
                        ),
                    )
                )
                yield lineage
        except Exception as err:
            logger.debug(traceback.format_exc())
            logger.error(err)
def fetch_dashboard_charts(self, dashboard_details: dict) -> None:
    """
    Method to fetch charts linked to dashboard

    This is a generator: for each chart id of the dashboard it yields
    whatever ``_build_chart`` yields for the cached chart JSON, then records
    the id in ``self.charts``. ``self.charts`` is reset on every call.
    """
    self.charts = []
    # Chart ids come from _get_charts_of_dashboard only; also parsing
    # "position_json" inline here recorded every id twice.
    for chart_id in self._get_charts_of_dashboard(dashboard_details):
        chart_json = self.all_charts.get(chart_id)
        if chart_json is None:
            # Not cached by prepare() (or the "unknown" placeholder):
            # there is no chart JSON to build from.
            logger.warning(f"Chart with id {chart_id} not found, skipping")
            continue
        yield from self._build_chart(chart_json)
        self.charts.append(chart_id)
def _get_service_type_from_database_id(self, database_id):
    """
    Resolve the service type of a database from its id

    Fetches the database through the client and maps the "sqlalchemy_uri"
    found under "result" with get_service_type_from_database_uri.
    """
    database_result = self.client.fetch_database(database_id).get("result", {})
    return get_service_type_from_database_uri(database_result.get("sqlalchemy_uri"))
def _get_datasource_from_id(self, datasource_id):
    """
    Build a separator-joined dataset name for a datasource

    The name is made of the platform (service type of the datasource's
    database), the database name and schema when present, and the table
    name, joined with FQN_SEPARATOR.

    Returns:
        the assembled name, or None when the fetched datasource has no
        database id or no table name
    """
    datasource = self.client.fetch_datasource(datasource_id).get("result", {})
    schema_name = datasource.get("schema")
    table_name = datasource.get("table_name")
    database = datasource.get("database", {})
    database_id = database.get("id")
    database_name = database.get("database_name")
    if database_id and table_name:
        platform = self._get_service_type_from_database_id(database_id)
        dataset_fqn = (
            f"{platform}{FQN_SEPARATOR}{database_name + FQN_SEPARATOR if database_name else ''}"
            f"{schema_name + FQN_SEPARATOR if schema_name else ''}"
            f"{table_name}"
        )
        return dataset_fqn
    return None

def _get_datasource_fqn(self, datasource_id: str) -> Optional[str]:
    """
    Build the Table FQN of a datasource with ``fqn.build``

    The table and schema names come from the fetched datasource, the
    database name from the "parameters" of its database, and the service
    name from ``source_config.dbServiceName``.

    Returns:
        the FQN, or None when no dbServiceName is configured or an expected
        key is missing from the fetched JSON (a warning is logged then)
    """
    if not self.source_config.dbServiceName:
        return None
    try:
        datasource_json = self.client.fetch_datasource(datasource_id)
        database_json = self.client.fetch_database(
            datasource_json["result"]["database"]["id"]
        )
        dataset_fqn = fqn.build(
            self.metadata,
            entity_type=Table,
            table_name=datasource_json["result"]["table_name"],
            schema_name=datasource_json["result"]["schema"],
            database_name=database_json["result"]["parameters"]["database"],
            service_name=self.source_config.dbServiceName,
        )
        return dataset_fqn
    except KeyError:
        logger.warning(f"Failed to fetch Datasource with id: {datasource_id}")
    return None

def _check_lineage(self, chart_id, datasource_text):
    """
    Yield table -> dashboard lineage requests for the dashboards of a chart

    Nothing is yielded when ``datasource_text`` is empty or the service
    connection has no "dbServiceName" attribute. Otherwise the chart is
    fetched and, for each dashboard listed on it, one AddLineageRequest is
    yielded if both the table and the dashboard entity are found. Errors
    for one dashboard are logged and the remaining ones are still processed.
    """
    if datasource_text and hasattr(self.service_connection, "dbServiceName"):
        chart_data = self.client.fetch_charts_with_id(chart_id)
        # "dashboards" may be absent from the chart payload; treat as none.
        dashboards = chart_data["result"].get("dashboards") or []
        for dashboard in dashboards:
            try:
                from_entity = self.metadata.get_by_name(
                    entity=Table,
                    fqn=f"{self.service_connection.dbServiceName}.{datasource_text}",
                )
                to_entity = self.metadata.get_by_name(
                    entity=Lineage_Dashboard,
                    fqn=f"{self.config.serviceName}.{dashboard['id']}",
                )
                if from_entity and to_entity:
                    lineage = AddLineageRequest(
                        edge=EntitiesEdge(
                            fromEntity=EntityReference(
                                id=from_entity.id.__root__, type="table"
                            ),
                            toEntity=EntityReference(
                                id=to_entity.id.__root__, type="dashboard"
                            ),
                        )
                    )
                    yield lineage
            except Exception as err:
                logger.debug(traceback.format_exc())
                logger.error(err)
# pylint: disable=too-many-locals
def _build_chart(self, chart_json: dict) -> Chart:
@ -339,10 +326,11 @@ class SupersetSource(DashboardSourceService):
chart_type=chart_json["viz_type"],
url=chart_json["url"],
owners=get_owners(chart_json["owners"]),
datasource_fqn=self._get_datasource_from_id(chart_json["datasource_id"]),
datasource_fqn=self._get_datasource_fqn(chart_json["datasource_id"])
if chart_json["datasource_id"]
else None,
lastModified=last_modified,
service=EntityReference(id=self.service.id, type="dashboardService"),
custom_props=custom_properties,
)
yield from self._check_lineage(chart_id, chart_json.get("datasource_name_text"))
yield chart

View File

@ -130,7 +130,7 @@ def _create_lineage_by_table_name(
metadata=metadata,
service_name=service_name,
)
to_raw_name = get_formatted_entity_name(str(from_table))
to_raw_name = get_formatted_entity_name(str(to_table))
to_table_obj = split_raw_table_name(database=database, raw_name=to_raw_name)
to_entities = search_table_entities(
table=to_table_obj.get("table"),
@ -154,22 +154,23 @@ def _create_lineage_by_table_name(
lineage_details = LineageDetails(
sqlQuery=query, columnsLineage=col_lineage
)
lineage = AddLineageRequest(
edge=EntitiesEdge(
fromEntity=EntityReference(
id=from_entity.id.__root__,
type="table",
),
toEntity=EntityReference(
id=to_entity.id.__root__,
type="table",
),
if from_entity and to_entity:
lineage = AddLineageRequest(
edge=EntitiesEdge(
fromEntity=EntityReference(
id=from_entity.id.__root__,
type="table",
),
toEntity=EntityReference(
id=to_entity.id.__root__,
type="table",
),
)
)
)
if lineage_details:
lineage.edge.lineageDetails = lineage_details
created_lineage = metadata.add_lineage(lineage)
logger.info(f"Successfully added Lineage {created_lineage}")
if lineage_details:
lineage.edge.lineageDetails = lineage_details
created_lineage = metadata.add_lineage(lineage)
logger.info(f"Successfully added Lineage {created_lineage}")
except Exception as err:
logger.debug(traceback.format_exc())