Metabase lineage (#3003)

* metabase-lineage

* metabase-lineage-completed
This commit is contained in:
codingwithabhi 2022-02-27 04:26:09 +05:30 committed by GitHub
parent 105f2dd6cd
commit 83c9b75a5a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -19,6 +19,7 @@ from urllib.parse import quote
import requests
from pydantic import SecretStr
from sqllineage.runner import LineageRunner
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.entity.data.dashboard import Dashboard as Model_Dashboard
@ -39,11 +40,10 @@ from metadata.ingestion.models.table_metadata import Chart, Dashboard
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig
from metadata.ingestion.source.sql_source import SQLSourceStatus
from metadata.utils.helpers import get_dashboard_service_or_create
from metadata.utils.helpers import get_dashboard_service_or_create, ingest_lineage
HEADERS = {"Content-Type": "application/json", "Accept": "*/*"}
logger: logging.Logger = logging.getLogger(__name__)
@ -135,6 +135,7 @@ class MetabaseSource(Source[Entity]):
def next_record(self) -> Iterable[Entity]:
yield from self.get_dashboards()
self.get_cards()
def get_charts(self, charts) -> Iterable[Chart]:
"""Get chart method
@ -258,3 +259,36 @@ class MetabaseSource(Source[Entity]):
def prepare(self):
pass
def get_card_detail(self, card_list):
for card in card_list:
try:
card_details = card["card"]
card_detail_resp = self.req_get(f"/api/card/{card_details['id']}")
if card_detail_resp.status_code == 200:
card = card_detail_resp.json()
raw_query = (
card_details.get("dataset_query", {})
.get("native", {})
.get("query", "")
)
query_info = {
"sql": raw_query,
"from_type": "table",
"to_type": "table",
"service_name": self.config.service_name,
}
ingest_lineage(query_info, self.metadata_config)
except Exception as e:
logger.error(repr(e))
def get_cards(self):
"""Get cards method"""
resp_dashboards = self.req_get("/api/dashboard")
if resp_dashboards.status_code == 200:
for dashboard in resp_dashboards.json():
resp_dashboard = self.req_get(f"/api/dashboard/{dashboard['id']}")
dashboard_details = resp_dashboard.json()
card_list = dashboard_details["ordered_cards"]
self.get_card_detail(card_list)