Fix #5317: Fixed Redash Lineage (#5318)

This commit is contained in:
Mayur Singal 2022-06-07 10:48:09 +05:30 committed by GitHub
parent b44da7733e
commit a2e25d7cc3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 86 additions and 110 deletions

View File

@ -460,7 +460,6 @@ class MetadataRestSink(Sink[Entity]):
def write_lineage(self, add_lineage: AddLineageRequest): def write_lineage(self, add_lineage: AddLineageRequest):
try: try:
logger.info(add_lineage)
created_lineage = self.metadata.add_lineage(add_lineage) created_lineage = self.metadata.add_lineage(add_lineage)
logger.info(f"Successfully added Lineage {created_lineage}") logger.info(f"Successfully added Lineage {created_lineage}")
self.status.records_written(f"Lineage: {created_lineage}") self.status.records_written(f"Lineage: {created_lineage}")

View File

@ -55,12 +55,6 @@ class DashboardSourceService(Source, ABC):
Get lineage between dashboard and data sources Get lineage between dashboard and data sources
""" """
@abstractmethod
def process_charts(self) -> Optional[Iterable[Chart]]:
"""
Method to fetch Charts
"""
@abstractmethod @abstractmethod
def fetch_dashboard_charts(self, dashboard: Any) -> Optional[Iterable[Chart]]: def fetch_dashboard_charts(self, dashboard: Any) -> Optional[Iterable[Chart]]:
""" """
@ -91,7 +85,6 @@ class DashboardSourceService(Source, ABC):
def next_record(self) -> Iterable[Entity]: def next_record(self) -> Iterable[Entity]:
yield from self.process_dashboards() yield from self.process_dashboards()
yield from self.process_charts() or []
def process_dashboards( def process_dashboards(
self, self,

View File

@ -125,13 +125,6 @@ class LookerSource(DashboardSourceService):
logger.info("Lineage not implemented for Looker") logger.info("Lineage not implemented for Looker")
return None return None
def process_charts(self) -> Optional[Iterable[Chart]]:
"""
Get lineage between dashboard and data sources
"""
logger.info("Fetch Charts Not implemented for Looker")
return None
def fetch_dashboard_charts(self, dashboard_details) -> Optional[Iterable[Chart]]: def fetch_dashboard_charts(self, dashboard_details) -> Optional[Iterable[Chart]]:
""" """
Metod to fetch charts linked to dashboard Metod to fetch charts linked to dashboard

View File

@ -122,9 +122,6 @@ class MetabaseSource(DashboardSourceService):
resp_dashboard = self.req_get(f"/api/dashboard/{dashboard['id']}") resp_dashboard = self.req_get(f"/api/dashboard/{dashboard['id']}")
return resp_dashboard.json() return resp_dashboard.json()
def process_charts(self) -> Optional[Iterable[Chart]]:
return []
def get_dashboard_entity(self, dashboard_details: dict) -> Dashboard: def get_dashboard_entity(self, dashboard_details: dict) -> Dashboard:
""" """
Method to Get Dashboard Entity Method to Get Dashboard Entity

View File

@ -162,12 +162,6 @@ class PowerbiSource(DashboardSourceService):
logger.debug(traceback.format_exc()) logger.debug(traceback.format_exc())
logger.error(err) logger.error(err)
def process_charts(self) -> Iterable[Chart]:
"""
Method to fetch Charts
"""
logger.info("Fetch Charts Not implemented for PowerBi")
def fetch_dashboard_charts(self, dashboard_details: dict) -> Iterable[Chart]: def fetch_dashboard_charts(self, dashboard_details: dict) -> Iterable[Chart]:
"""Get chart method """Get chart method
Args: Args:

View File

@ -13,10 +13,13 @@ import uuid
from dataclasses import dataclass, field from dataclasses import dataclass, field
from typing import Dict, Iterable, List, Optional from typing import Dict, Iterable, List, Optional
import requests from sql_metadata import Parser
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.entity.data.chart import Chart from metadata.generated.schema.entity.data.chart import Chart
from metadata.generated.schema.entity.data.dashboard import (
Dashboard as Lineage_Dashboard,
)
from metadata.generated.schema.entity.services.connections.dashboard.redashConnection import ( from metadata.generated.schema.entity.services.connections.dashboard.redashConnection import (
RedashConnection, RedashConnection,
) )
@ -26,13 +29,16 @@ from metadata.generated.schema.entity.services.connections.metadata.openMetadata
from metadata.generated.schema.metadataIngestion.workflow import ( from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource, Source as WorkflowSource,
) )
from metadata.generated.schema.type.entityLineage import EntitiesEdge
from metadata.generated.schema.type.entityReference import EntityReference from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.common import Entity from metadata.ingestion.api.common import Entity
from metadata.ingestion.api.source import InvalidSourceException, Source, SourceStatus from metadata.ingestion.api.source import InvalidSourceException, SourceStatus
from metadata.ingestion.models.table_metadata import Chart as ModelChart from metadata.ingestion.models.table_metadata import Chart as ModelChart
from metadata.ingestion.models.table_metadata import Dashboard from metadata.ingestion.models.table_metadata import Dashboard
from metadata.ingestion.source.dashboard.dashboard_source import DashboardSourceService from metadata.ingestion.source.dashboard.dashboard_source import DashboardSourceService
from metadata.utils import fqn
from metadata.utils.logger import ingestion_logger from metadata.utils.logger import ingestion_logger
from metadata.utils.sql_lineage import search_table_entities
logger = ingestion_logger() logger = ingestion_logger()
@ -92,90 +98,98 @@ class RedashSource(DashboardSourceService):
""" """
Get Dashboard Details Get Dashboard Details
""" """
return dashboard return self.client.get_dashboard(dashboard["slug"])
def get_dashboard_entity(self, dashboard_details: dict) -> Dashboard: def get_dashboard_entity(self, dashboard_details: dict) -> Dashboard:
""" """
Method to Get Dashboard Entity Method to Get Dashboard Entity
""" """
yield from self.fetch_dashboard_charts(dashboard_details) yield from self.fetch_dashboard_charts(dashboard_details)
dashboard_id = dashboard_details["id"] self.status.item_scanned_status()
if dashboard_id is not None: dashboard_description = ""
self.status.item_scanned_status() for widgets in dashboard_details.get("widgets", []):
dashboard_data = self.client.get_dashboard(dashboard_id) dashboard_description = widgets.get("text")
dashboard_description = "" yield Dashboard(
for widgets in dashboard_data.get("widgets", []): id=uuid.uuid4(),
dashboard_description = widgets.get("text") name=dashboard_details.get("id"),
yield Dashboard( displayName=dashboard_details["name"],
id=uuid.uuid4(), description=dashboard_description if dashboard_details else "",
name=dashboard_id, charts=self.dashboards_to_charts[dashboard_details.get("id")],
displayName=dashboard_details["name"], usageSummary=None,
description=dashboard_description if dashboard_details else "", service=EntityReference(id=self.service.id, type="dashboardService"),
charts=self.dashboards_to_charts[dashboard_id], url=f"/dashboard/{dashboard_details.get('slug', '')}",
usageSummary=None, )
service=EntityReference(id=self.service.id, type="dashboardService"),
url=f"/dashboard/{dashboard_data.get('slug', '')}",
)
def get_lineage(self, dashboard_details: dict) -> Optional[AddLineageRequest]: def get_lineage(
self, dashboard_details: dict
) -> Optional[Iterable[AddLineageRequest]]:
""" """
Get lineage between dashboard and data sources Get lineage between dashboard and data sources
In redash we do not get table, database_schema or database name but we do get query
the lineage is being generated based on the query
""" """
logger.info("Lineage not implemented for redash") for widgets in dashboard_details.get("widgets", []):
return None visualization = widgets.get("visualization")
if not visualization.get("query"):
def process_charts(self) -> Optional[Iterable[Chart]]: continue
""" table_list = []
Metod to fetch Charts if visualization.get("query", {}).get("query"):
""" table_list = Parser(visualization["query"]["query"])
query_info = self.client.queries() for table in table_list.tables:
for query_info in query_info["results"]: dataabase_schema = None
query_id = query_info["id"] print(table)
query_name = query_info["name"] if "." in table:
query_data = requests.get( dataabase_schema, table = fqn.split(table)[-2:]
f"{self.service_connection.hostPort}/api/queries/{query_id}" table_entities = search_table_entities(
).json() metadata=self.metadata,
for visualization in query_data.get("Visualizations", []): database=None,
chart_type = visualization.get("type", "") service_name=self.source_config.dbServiceName,
chart_description = ( database_schema=dataabase_schema,
visualization.get("description", "") table=table,
if visualization.get("description", "")
else ""
)
yield Chart(
id=uuid.uuid4(),
name=query_id,
displayName=query_name,
chartType=chart_type,
service=EntityReference(
id=self.service.id, type="dashboardService"
),
description=chart_description,
) )
for from_entity in table_entities:
to_entity = self.metadata.get_by_name(
entity=Lineage_Dashboard,
fqn=fqn.build(
self.metadata,
Lineage_Dashboard,
service_name=self.config.serviceName,
dashboard_name=str(dashboard_details.get("id")),
),
)
if from_entity and to_entity:
lineage = AddLineageRequest(
edge=EntitiesEdge(
fromEntity=EntityReference(
id=from_entity.id.__root__, type="table"
),
toEntity=EntityReference(
id=to_entity.id.__root__, type="dashboard"
),
)
)
yield lineage
def fetch_dashboard_charts(self, dashboard: dict) -> Optional[Iterable[Chart]]: def fetch_dashboard_charts(
self, dashboard_details: dict
) -> Optional[Iterable[Chart]]:
""" """
Metod to fetch charts linked to dashboard Metod to fetch charts linked to dashboard
""" """
dashboard_id = dashboard["id"] self.dashboards_to_charts[dashboard_details.get("id")] = []
if dashboard_id is not None: for widgets in dashboard_details.get("widgets", []):
dashboard_data = self.client.get_dashboard(dashboard_id) visualization = widgets.get("visualization")
self.dashboards_to_charts[dashboard_id] = [] self.dashboards_to_charts[dashboard_details.get("id")].append(widgets["id"])
for widgets in dashboard_data.get("widgets", []): yield ModelChart(
visualization = widgets.get("visualization") name=widgets["id"],
self.dashboards_to_charts[dashboard_id].append(widgets["id"]) displayName=visualization["query"]["name"]
yield ModelChart( if visualization and visualization["query"]
name=widgets["id"], else "",
displayName=visualization["query"]["name"] chart_type=visualization["type"] if visualization else "",
if visualization and visualization["query"] service=EntityReference(id=self.service.id, type="dashboardService"),
else "", url=f"/dashboard/{dashboard_details.get('slug', '')}",
chart_type=visualization["type"] if visualization else "", description=visualization["description"] if visualization else "",
service=EntityReference( )
id=self.service.id, type="dashboardService"
),
url=f"/dashboard/{dashboard_data.get('slug', '')}",
description=visualization["description"] if visualization else "",
)
def close(self): def close(self):
self.client.session.close() self.client.session.close()

View File

@ -14,7 +14,7 @@ Superset source module
import json import json
import traceback import traceback
from typing import Iterable, List, Optional from typing import List, Optional
import dateutil.parser as dateparser import dateutil.parser as dateparser
@ -346,17 +346,3 @@ class SupersetSource(DashboardSourceService):
) )
yield from self._check_lineage(chart_id, chart_json.get("datasource_name_text")) yield from self._check_lineage(chart_id, chart_json.get("datasource_name_text"))
yield chart yield chart
def process_charts(self) -> Optional[Iterable[Chart]]:
current_page = 0
page_size = 25
total_charts = self.client.fetch_total_charts()
while current_page * page_size <= total_charts:
charts = self.client.fetch_charts(current_page, page_size)
current_page += 1
for chart_json in charts["result"]:
try:
yield from self._build_chart(chart_json)
except Exception as err:
logger.debug(traceback.format_exc())
logger.error(err)