feature: add metabase sqllineage extraction (#6257)

* feature: add metabase sqllineage extraction

This resolves issue #6190 and also fixes a bug in the existing lineage extraction,
which read the database name using a mistyped key (`db` instead of `dbname`).

* fix: sqllineage import
This commit is contained in:
Pedro Sereno 2022-07-23 08:44:31 -03:00 committed by GitHub
parent 78dd44ed32
commit 14b9439b98
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -11,10 +11,17 @@
"""Metabase source module"""
import traceback
from logging.config import DictConfigurator
from typing import Iterable, List, Optional
import requests
# Prevent sqllineage from modifying the logger config.
# DictConfigurator.configure is replaced with a no-op while LineageRunner is
# imported (presumably because importing sqllineage applies its own logging
# dict config). The original method is kept in `configure`.
# NOTE(review): restoring it (DictConfigurator.configure = configure) is not
# visible in this excerpt; confirm it happens after the import.
configure = DictConfigurator.configure
DictConfigurator.configure = lambda _: None
from sqllineage.runner import LineageRunner
from metadata.generated.schema.api.data.createChart import CreateChartRequest
from metadata.generated.schema.api.data.createDashboard import CreateDashboardRequest
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
@ -46,6 +53,7 @@ from metadata.utils.helpers import (
replace_special_with,
)
from metadata.utils.logger import ingestion_logger
from metadata.utils.sql_lineage import search_table_entities
HEADERS = {"Content-Type": "application/json", "Accept": "*/*"}
@ -210,45 +218,104 @@ class MetabaseSource(DashboardServiceSource):
for chart in chart_list:
try:
chart_details = chart["card"]
if not chart_details.get("table_id"):
if not "dataset_query" in chart_details:
continue
resp_tables = self.req_get(f"/api/table/{chart_details['table_id']}")
if resp_tables.status_code == 200:
table = resp_tables.json()
from_fqn = fqn.build(
self.metadata,
entity_type=Table,
service_name=self.source_config.dbServiceName,
database_name=table["db"]["details"]["db"],
schema_name=table.get("schema"),
table_name=table.get("display_name"),
if not "type" in chart_details["dataset_query"]:
continue
chart_dataset_query_type = chart_details["dataset_query"]["type"]
if chart_dataset_query_type == "native":
if not chart_details.get("database_id"):
continue
resp_database = self.req_get(
f"/api/database/{chart_details['database_id']}"
)
from_entity = self.metadata.get_by_name(
entity=Table,
fqn=from_fqn,
)
to_fqn = fqn.build(
self.metadata,
entity_type=LineageDashboard,
service_name=self.config.serviceName,
dashboard_name=dashboard_name,
)
to_entity = self.metadata.get_by_name(
entity=LineageDashboard,
fqn=to_fqn,
)
if from_entity and to_entity:
lineage = AddLineageRequest(
edge=EntitiesEdge(
fromEntity=EntityReference(
id=from_entity.id.__root__, type="table"
),
toEntity=EntityReference(
id=to_entity.id.__root__, type="dashboard"
),
)
if resp_database.status_code != 200:
continue
database = resp_database.json()
query = chart_details["dataset_query"]["native"]["query"]
table_list = LineageRunner(query)
for table in table_list.source_tables:
database_schema_name, table = fqn.split(str(table))[-2:]
database_schema_name = (
None
if database_schema_name == "<default>"
else database_schema_name
)
yield lineage
from_entities = search_table_entities(
metadata=self.metadata,
database=database["details"]["dbname"],
service_name=self.source_config.dbServiceName,
database_schema=database_schema_name,
table=table,
)
for from_entity in from_entities:
to_fqn = fqn.build(
self.metadata,
entity_type=LineageDashboard,
service_name=self.config.serviceName,
dashboard_name=dashboard_name,
)
to_entity = self.metadata.get_by_name(
entity=LineageDashboard,
fqn=to_fqn,
)
if from_entity and to_entity:
lineage = AddLineageRequest(
edge=EntitiesEdge(
fromEntity=EntityReference(
id=from_entity.id.__root__, type="table"
),
toEntity=EntityReference(
id=to_entity.id.__root__, type="dashboard"
),
)
)
yield lineage
# TODO: the branch below only resolves a single table. If a chart of type "query" contains a join,
# the other table_ids (stored within a nested object) are ignored.
elif chart_dataset_query_type == "query":
if not chart_details.get("table_id"):
continue
resp_tables = self.req_get(
f"/api/table/{chart_details['table_id']}"
)
if resp_tables.status_code == 200:
table = resp_tables.json()
from_fqn = fqn.build(
self.metadata,
entity_type=Table,
service_name=self.source_config.dbServiceName,
database_name=table["db"]["details"]["dbname"],
schema_name=table.get("schema"),
table_name=table.get("display_name"),
)
from_entity = self.metadata.get_by_name(
entity=Table,
fqn=from_fqn,
)
to_fqn = fqn.build(
self.metadata,
entity_type=LineageDashboard,
service_name=self.config.serviceName,
dashboard_name=dashboard_name,
)
to_entity = self.metadata.get_by_name(
entity=LineageDashboard,
fqn=to_fqn,
)
if from_entity and to_entity:
lineage = AddLineageRequest(
edge=EntitiesEdge(
fromEntity=EntityReference(
id=from_entity.id.__root__, type="table"
),
toEntity=EntityReference(
id=to_entity.id.__root__, type="dashboard"
),
)
)
yield lineage
except Exception as err: # pylint: disable=broad-except,unused-variable
logger.debug(traceback.format_exc())
logger.error(err)