From 14b9439b982b9a5f4faeb12d70ba91448e9f3cb9 Mon Sep 17 00:00:00 2001 From: Pedro Sereno Date: Sat, 23 Jul 2022 08:44:31 -0300 Subject: [PATCH] feature: add metabase sqllineage extraction (#6257) * feature: add metabase sqllineage extraction this solves issue-6190 and also fixes a bug in the existing lineage that was trying to access key with a typo for the database name * fix: sqllineage import --- .../ingestion/source/dashboard/metabase.py | 139 +++++++++++++----- 1 file changed, 103 insertions(+), 36 deletions(-) diff --git a/ingestion/src/metadata/ingestion/source/dashboard/metabase.py b/ingestion/src/metadata/ingestion/source/dashboard/metabase.py index 5c976be048d..1cb0c4b9e10 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/metabase.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/metabase.py @@ -11,10 +11,17 @@ """Metabase source module""" import traceback +from logging.config import DictConfigurator from typing import Iterable, List, Optional import requests +# Prevent sqllineage from modifying the logger config +# Disable the DictConfigurator.configure method while importing LineageRunner +configure = DictConfigurator.configure +DictConfigurator.configure = lambda _: None +from sqllineage.runner import LineageRunner + from metadata.generated.schema.api.data.createChart import CreateChartRequest from metadata.generated.schema.api.data.createDashboard import CreateDashboardRequest from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest @@ -46,6 +53,7 @@ from metadata.utils.helpers import ( replace_special_with, ) from metadata.utils.logger import ingestion_logger +from metadata.utils.sql_lineage import search_table_entities HEADERS = {"Content-Type": "application/json", "Accept": "*/*"} @@ -210,45 +218,104 @@ class MetabaseSource(DashboardServiceSource): for chart in chart_list: try: chart_details = chart["card"] - if not chart_details.get("table_id"): + if not "dataset_query" in chart_details: continue - resp_tables = self.req_get(f"/api/table/{chart_details['table_id']}") - if resp_tables.status_code == 200: - table = resp_tables.json() - from_fqn = fqn.build( - self.metadata, - entity_type=Table, - service_name=self.source_config.dbServiceName, - database_name=table["db"]["details"]["db"], - schema_name=table.get("schema"), - table_name=table.get("display_name"), + if not "type" in chart_details["dataset_query"]: + continue + chart_dataset_query_type = chart_details["dataset_query"]["type"] + if chart_dataset_query_type == "native": + if not chart_details.get("database_id"): + continue + resp_database = self.req_get( + f"/api/database/{chart_details['database_id']}" ) - from_entity = self.metadata.get_by_name( - entity=Table, - fqn=from_fqn, - ) - to_fqn = fqn.build( - self.metadata, - entity_type=LineageDashboard, - service_name=self.config.serviceName, - dashboard_name=dashboard_name, - ) - to_entity = self.metadata.get_by_name( - entity=LineageDashboard, - fqn=to_fqn, - ) - if from_entity and to_entity: - lineage = AddLineageRequest( - edge=EntitiesEdge( - fromEntity=EntityReference( - id=from_entity.id.__root__, type="table" - ), - toEntity=EntityReference( - id=to_entity.id.__root__, type="dashboard" - ), - ) + if resp_database.status_code != 200: + continue + database = resp_database.json() + + query = chart_details["dataset_query"]["native"]["query"] + table_list = LineageRunner(query) + for table in table_list.source_tables: + database_schema_name, table = fqn.split(str(table))[-2:] + database_schema_name = ( + None + if database_schema_name == "" + else database_schema_name ) - yield lineage + from_entities = search_table_entities( + metadata=self.metadata, + database=database["details"]["dbname"], + service_name=self.source_config.dbServiceName, + database_schema=database_schema_name, + table=table, + ) + for from_entity in from_entities: + to_fqn = fqn.build( + self.metadata, + entity_type=LineageDashboard, + service_name=self.config.serviceName, + dashboard_name=dashboard_name, + ) + to_entity = self.metadata.get_by_name( + entity=LineageDashboard, + fqn=to_fqn, + ) + if from_entity and to_entity: + lineage = AddLineageRequest( + edge=EntitiesEdge( + fromEntity=EntityReference( + id=from_entity.id.__root__, type="table" + ), + toEntity=EntityReference( + id=to_entity.id.__root__, type="dashboard" + ), + ) + ) + yield lineage + # TODO: this method below only gets a single table, but if the chart of type query has a join the other + # table_ids will be ignored within a nested object + elif chart_dataset_query_type == "query": + if not chart_details.get("table_id"): + continue + resp_tables = self.req_get( + f"/api/table/{chart_details['table_id']}" + ) + if resp_tables.status_code == 200: + table = resp_tables.json() + from_fqn = fqn.build( + self.metadata, + entity_type=Table, + service_name=self.source_config.dbServiceName, + database_name=table["db"]["details"]["dbname"], + schema_name=table.get("schema"), + table_name=table.get("display_name"), + ) + from_entity = self.metadata.get_by_name( + entity=Table, + fqn=from_fqn, + ) + to_fqn = fqn.build( + self.metadata, + entity_type=LineageDashboard, + service_name=self.config.serviceName, + dashboard_name=dashboard_name, + ) + to_entity = self.metadata.get_by_name( + entity=LineageDashboard, + fqn=to_fqn, + ) + if from_entity and to_entity: + lineage = AddLineageRequest( + edge=EntitiesEdge( + fromEntity=EntityReference( + id=from_entity.id.__root__, type="table" + ), + toEntity=EntityReference( + id=to_entity.id.__root__, type="dashboard" + ), + ) + ) + yield lineage except Exception as err: # pylint: disable=broad-except,unused-variable logger.debug(traceback.format_exc()) logger.error(err)