From 273b3e9c4b799c5990da4633e20732852888a307 Mon Sep 17 00:00:00 2001 From: Mayur Singal <39544459+ulixius9@users.noreply.github.com> Date: Tue, 7 Jun 2022 12:34:10 +0530 Subject: [PATCH] Fix #5260: Refactor Superset Lineage (#5290) --- .../metadata/ingestion/sink/metadata_rest.py | 1 + .../ingestion/source/dashboard/superset.py | 206 +++++++++--------- ingestion/src/metadata/utils/sql_lineage.py | 33 +-- 3 files changed, 115 insertions(+), 125 deletions(-) diff --git a/ingestion/src/metadata/ingestion/sink/metadata_rest.py b/ingestion/src/metadata/ingestion/sink/metadata_rest.py index 6500ff63970..629cadacde3 100644 --- a/ingestion/src/metadata/ingestion/sink/metadata_rest.py +++ b/ingestion/src/metadata/ingestion/sink/metadata_rest.py @@ -609,6 +609,7 @@ class MetadataRestSink(Sink[Entity]): f"{db_schema.name.__root__}.{to_table_name}", db.service.name, db_schema_and_table.database.name.__root__, + db_schema_and_table.table.viewDefinition.__root__, ) except Exception as e: logger.error("Failed to create view lineage") diff --git a/ingestion/src/metadata/ingestion/source/dashboard/superset.py b/ingestion/src/metadata/ingestion/source/dashboard/superset.py index 8e3302d811e..68932678f35 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/superset.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/superset.py @@ -30,7 +30,6 @@ from metadata.generated.schema.entity.services.connections.metadata.openMetadata OpenMetadataConnection, ) from metadata.generated.schema.entity.services.dashboardService import ( - DashboardService, DashboardServiceType, ) from metadata.generated.schema.metadataIngestion.workflow import ( @@ -38,11 +37,10 @@ from metadata.generated.schema.metadataIngestion.workflow import ( ) from metadata.generated.schema.type.entityLineage import EntitiesEdge from metadata.generated.schema.type.entityReference import EntityReference -from metadata.ingestion.api.common import Entity from metadata.ingestion.api.source import InvalidSourceException, SourceStatus from metadata.ingestion.models.table_metadata import Chart, Dashboard, DashboardOwner from metadata.ingestion.source.dashboard.dashboard_source import DashboardSourceService -from metadata.utils.fqn import FQN_SEPARATOR +from metadata.utils import fqn from metadata.utils.logger import ingestion_logger logger = ingestion_logger() @@ -106,46 +104,6 @@ def get_owners(owners_obj): return owners -# pylint: disable=too-many-return-statements, too-many-branches -def get_service_type_from_database_uri(uri: str) -> str: - """ - Get service type from database URI - - Args: - uri (str): - - Returns: - str - """ - if uri.startswith("bigquery"): - return "bigquery" - if uri.startswith("druid"): - return "druid" - if uri.startswith("mssql"): - return "mssql" - if uri.startswith("jdbc:postgres:") and uri.index("redshift.amazonaws") > 0: - return "redshift" - if uri.startswith("snowflake"): - return "snowflake" - if uri.startswith("presto"): - return "presto" - if uri.startswith("trino"): - return "trino" - if uri.startswith("postgresql"): - return "postgres" - if uri.startswith("pinot"): - return "pinot" - if uri.startswith("oracle"): - return "oracle" - if uri.startswith("mysql"): - return "mysql" - if uri.startswith("mongodb"): - return "mongodb" - if uri.startswith("hive"): - return "hive" - return "external" - - class SupersetSource(DashboardSourceService): """ Superset source class @@ -187,6 +145,22 @@ class SupersetSource(DashboardSourceService): ) return cls(config, metadata_config) + def prepare(self): + """ + Fetching all charts available in superset + this step is done because fetch_total_charts api fetches all + the required information which is not available in fetch_charts_with_id api + """ + self.all_charts = {} + current_page = 0 + page_size = 25 + total_charts = self.client.fetch_total_charts() + while current_page * page_size <= total_charts: + charts = self.client.fetch_charts(current_page, page_size) + current_page += 1 + for i in range(len(charts["result"])): + self.all_charts[charts["ids"][i]] = charts["result"][i] + def get_dashboards_list(self) -> Optional[List[object]]: """ Get List of all dashboards @@ -204,7 +178,7 @@ class SupersetSource(DashboardSourceService): """ Get Dashboard Name """ - return dashboard_details["id"] + return dashboard_details["dashboard_title"] def get_dashboard_details(self, dashboard: dict) -> dict: """ @@ -216,7 +190,7 @@ class SupersetSource(DashboardSourceService): """ Method to Get Dashboard Entity """ - self.fetch_dashboard_charts(dashboard_details) + yield from self.fetch_dashboard_charts(dashboard_details) last_modified = ( dateparser.parse(dashboard_details.get("changed_on_utc", "now")).timestamp() * 1000 @@ -232,81 +206,94 @@ class SupersetSource(DashboardSourceService): lastModified=last_modified, ) + def _get_charts_of_dashboard(self, dashboard_details: dict) -> List[str]: + """ + Method to fetch chart ids linked to dashboard + """ + raw_position_data = dashboard_details.get("position_json", {}) + if raw_position_data: + position_data = json.loads(raw_position_data) + return [ + value.get("meta", {}).get("chartId", "unknown") + for key, value in position_data.items() + if key.startswith("CHART-") + ] + return [] + def get_lineage(self, dashboard_details: dict) -> Optional[AddLineageRequest]: """ Get lineage between dashboard and data sources """ - logger.info("Lineage not implemented for superset") - return None + for chart_id in self._get_charts_of_dashboard(dashboard_details): + chart_json = self.all_charts.get(chart_id) + datasource_fqn = ( + self._get_datasource_fqn(chart_json.get("datasource_id")) + if chart_json.get("datasource_id") + else None + ) + if not datasource_fqn: + continue + from_entity = self.metadata.get_by_name( + entity=Table, + fqn=datasource_fqn, + ) + try: + dashboard_fqn = fqn.build( + self.metadata, + entity_type=Lineage_Dashboard, + service_name=self.config.serviceName, + dashboard_name=str(dashboard_details["id"]), + ) + to_entity = self.metadata.get_by_name( + entity=Lineage_Dashboard, + fqn=dashboard_fqn, + ) + if from_entity and to_entity: + lineage = AddLineageRequest( + edge=EntitiesEdge( + fromEntity=EntityReference( + id=from_entity.id.__root__, type="table" + ), + toEntity=EntityReference( + id=to_entity.id.__root__, type="dashboard" + ), + ) + ) + yield lineage + + except Exception as err: + logger.debug(traceback.format_exc()) + logger.error(err) def fetch_dashboard_charts(self, dashboard_details: dict) -> None: """ Metod to fetch charts linked to dashboard """ - raw_position_data = dashboard_details.get("position_json", "{}") self.charts = [] - if raw_position_data is not None: - position_data = json.loads(raw_position_data) - for key, value in position_data.items(): - if not key.startswith("CHART-"): - continue - chart_id = value.get("meta", {}).get("chartId", "unknown") - self.charts.append(chart_id) + for chart_id in self._get_charts_of_dashboard(dashboard_details): + yield from self._build_chart(self.all_charts.get(chart_id)) + self.charts.append(chart_id) - def _get_service_type_from_database_id(self, database_id): - database_json = self.client.fetch_database(database_id) - sqlalchemy_uri = database_json.get("result", {}).get("sqlalchemy_uri") - return get_service_type_from_database_uri(sqlalchemy_uri) - - def _get_datasource_from_id(self, datasource_id): - datasource_json = self.client.fetch_datasource(datasource_id) - schema_name = datasource_json.get("result", {}).get("schema") - table_name = datasource_json.get("result", {}).get("table_name") - database_id = datasource_json.get("result", {}).get("database", {}).get("id") - database_name = ( - datasource_json.get("result", {}).get("database", {}).get("database_name") - ) - - if database_id and table_name: - platform = self._get_service_type_from_database_id(database_id) - dataset_fqn = ( - f"{platform}{FQN_SEPARATOR}{database_name + FQN_SEPARATOR if database_name else ''}" - f"{schema_name + FQN_SEPARATOR if schema_name else ''}" - f"{table_name}" + def _get_datasource_fqn(self, datasource_id: str) -> Optional[str]: + if not self.source_config.dbServiceName: + return + try: + datasource_json = self.client.fetch_datasource(datasource_id) + database_json = self.client.fetch_database( + datasource_json["result"]["database"]["id"] + ) + dataset_fqn = fqn.build( + self.metadata, + entity_type=Table, + table_name=datasource_json["result"]["table_name"], + schema_name=datasource_json["result"]["schema"], + database_name=database_json["result"]["parameters"]["database"], + service_name=self.source_config.dbServiceName, ) return dataset_fqn - return None - - def _check_lineage(self, chart_id, datasource_text): - if datasource_text and hasattr(self.service_connection, "dbServiceName"): - chart_data = self.client.fetch_charts_with_id(chart_id) - dashboards = chart_data["result"].get("dashboards") - for dashboard in dashboards: - try: - from_entity = self.metadata.get_by_name( - entity=Table, - fqn=f"{self.service_connection.dbServiceName}.{datasource_text}", - ) - to_entity = self.metadata.get_by_name( - entity=Lineage_Dashboard, - fqn=f"{self.config.serviceName}.{dashboard['id']}", - ) - if from_entity and to_entity: - lineage = AddLineageRequest( - edge=EntitiesEdge( - fromEntity=EntityReference( - id=from_entity.id.__root__, type="table" - ), - toEntity=EntityReference( - id=to_entity.id.__root__, type="dashboard" - ), - ) - ) - yield lineage - - except Exception as err: - logger.debug(traceback.format_exc()) - logger.error(err) + except KeyError: + logger.warning(f"Failed to fetch Datasource with id: {datasource_id}") + return None # pylint: disable=too-many-locals def _build_chart(self, chart_json: dict) -> Chart: @@ -339,10 +326,11 @@ class SupersetSource(DashboardSourceService): chart_type=chart_json["viz_type"], url=chart_json["url"], owners=get_owners(chart_json["owners"]), - datasource_fqn=self._get_datasource_from_id(chart_json["datasource_id"]), + datasource_fqn=self._get_datasource_fqn(chart_json["datasource_id"]) + if chart_json["datasource_id"] + else None, lastModified=last_modified, service=EntityReference(id=self.service.id, type="dashboardService"), custom_props=custom_properties, ) - yield from self._check_lineage(chart_id, chart_json.get("datasource_name_text")) yield chart diff --git a/ingestion/src/metadata/utils/sql_lineage.py b/ingestion/src/metadata/utils/sql_lineage.py index 42199a1a575..c5c5e188869 100644 --- a/ingestion/src/metadata/utils/sql_lineage.py +++ b/ingestion/src/metadata/utils/sql_lineage.py @@ -130,7 +130,7 @@ def _create_lineage_by_table_name( metadata=metadata, service_name=service_name, ) - to_raw_name = get_formatted_entity_name(str(from_table)) + to_raw_name = get_formatted_entity_name(str(to_table)) to_table_obj = split_raw_table_name(database=database, raw_name=to_raw_name) to_entities = search_table_entities( table=to_table_obj.get("table"), @@ -154,22 +154,23 @@ def _create_lineage_by_table_name( lineage_details = LineageDetails( sqlQuery=query, columnsLineage=col_lineage ) - lineage = AddLineageRequest( - edge=EntitiesEdge( - fromEntity=EntityReference( - id=from_entity.id.__root__, - type="table", - ), - toEntity=EntityReference( - id=to_entity.id.__root__, - type="table", - ), + if from_entity and to_entity: + lineage = AddLineageRequest( + edge=EntitiesEdge( + fromEntity=EntityReference( + id=from_entity.id.__root__, + type="table", + ), + toEntity=EntityReference( + id=to_entity.id.__root__, + type="table", + ), + ) ) - ) - if lineage_details: - lineage.edge.lineageDetails = lineage_details - created_lineage = metadata.add_lineage(lineage) - logger.info(f"Successfully added Lineage {created_lineage}") + if lineage_details: + lineage.edge.lineageDetails = lineage_details + created_lineage = metadata.add_lineage(lineage) + logger.info(f"Successfully added Lineage {created_lineage}") except Exception as err: logger.debug(traceback.format_exc())