diff --git a/ingestion/src/metadata/ingestion/sink/metadata_rest.py b/ingestion/src/metadata/ingestion/sink/metadata_rest.py index 2416a16b39f..6500ff63970 100644 --- a/ingestion/src/metadata/ingestion/sink/metadata_rest.py +++ b/ingestion/src/metadata/ingestion/sink/metadata_rest.py @@ -460,7 +460,6 @@ class MetadataRestSink(Sink[Entity]): def write_lineage(self, add_lineage: AddLineageRequest): try: - logger.info(add_lineage) created_lineage = self.metadata.add_lineage(add_lineage) logger.info(f"Successfully added Lineage {created_lineage}") self.status.records_written(f"Lineage: {created_lineage}") diff --git a/ingestion/src/metadata/ingestion/source/dashboard/dashboard_source.py b/ingestion/src/metadata/ingestion/source/dashboard/dashboard_source.py index 022c836be2c..5831af0e702 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/dashboard_source.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/dashboard_source.py @@ -55,12 +55,6 @@ class DashboardSourceService(Source, ABC): Get lineage between dashboard and data sources """ - @abstractmethod - def process_charts(self) -> Optional[Iterable[Chart]]: - """ - Method to fetch Charts - """ - @abstractmethod def fetch_dashboard_charts(self, dashboard: Any) -> Optional[Iterable[Chart]]: """ @@ -91,7 +85,6 @@ class DashboardSourceService(Source, ABC): def next_record(self) -> Iterable[Entity]: yield from self.process_dashboards() - yield from self.process_charts() or [] def process_dashboards( self, diff --git a/ingestion/src/metadata/ingestion/source/dashboard/looker.py b/ingestion/src/metadata/ingestion/source/dashboard/looker.py index 15e3c90ce02..92bcfe1a0e1 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/looker.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/looker.py @@ -125,13 +125,6 @@ class LookerSource(DashboardSourceService): logger.info("Lineage not implemented for Looker") return None - def process_charts(self) -> Optional[Iterable[Chart]]: - """ - Get lineage between dashboard and data sources - """ - logger.info("Fetch Charts Not implemented for Looker") - return None - def fetch_dashboard_charts(self, dashboard_details) -> Optional[Iterable[Chart]]: """ Metod to fetch charts linked to dashboard diff --git a/ingestion/src/metadata/ingestion/source/dashboard/metabase.py b/ingestion/src/metadata/ingestion/source/dashboard/metabase.py index 5f2c5d39a03..f048ca68bf3 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/metabase.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/metabase.py @@ -122,9 +122,6 @@ class MetabaseSource(DashboardSourceService): resp_dashboard = self.req_get(f"/api/dashboard/{dashboard['id']}") return resp_dashboard.json() - def process_charts(self) -> Optional[Iterable[Chart]]: - return [] - def get_dashboard_entity(self, dashboard_details: dict) -> Dashboard: """ Method to Get Dashboard Entity diff --git a/ingestion/src/metadata/ingestion/source/dashboard/powerbi.py b/ingestion/src/metadata/ingestion/source/dashboard/powerbi.py index 6441acdad40..4b003f2eeda 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/powerbi.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/powerbi.py @@ -162,12 +162,6 @@ class PowerbiSource(DashboardSourceService): logger.debug(traceback.format_exc()) logger.error(err) - def process_charts(self) -> Iterable[Chart]: - """ - Method to fetch Charts - """ - logger.info("Fetch Charts Not implemented for PowerBi") - def fetch_dashboard_charts(self, dashboard_details: dict) -> Iterable[Chart]: """Get chart method Args: diff --git a/ingestion/src/metadata/ingestion/source/dashboard/redash.py b/ingestion/src/metadata/ingestion/source/dashboard/redash.py index 6f7581169ab..114957e2d5b 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/redash.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/redash.py @@ -13,10 +13,13 @@ import uuid from dataclasses import dataclass, field from typing import Dict, Iterable, List, Optional -import requests +from sql_metadata import Parser from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest from metadata.generated.schema.entity.data.chart import Chart +from metadata.generated.schema.entity.data.dashboard import ( + Dashboard as Lineage_Dashboard, +) from metadata.generated.schema.entity.services.connections.dashboard.redashConnection import ( RedashConnection, ) @@ -26,13 +29,16 @@ from metadata.generated.schema.entity.services.connections.metadata.openMetadata from metadata.generated.schema.metadataIngestion.workflow import ( Source as WorkflowSource, ) +from metadata.generated.schema.type.entityLineage import EntitiesEdge from metadata.generated.schema.type.entityReference import EntityReference from metadata.ingestion.api.common import Entity -from metadata.ingestion.api.source import InvalidSourceException, Source, SourceStatus +from metadata.ingestion.api.source import InvalidSourceException, SourceStatus from metadata.ingestion.models.table_metadata import Chart as ModelChart from metadata.ingestion.models.table_metadata import Dashboard from metadata.ingestion.source.dashboard.dashboard_source import DashboardSourceService +from metadata.utils import fqn from metadata.utils.logger import ingestion_logger +from metadata.utils.sql_lineage import search_table_entities logger = ingestion_logger() @@ -92,90 +98,98 @@ class RedashSource(DashboardSourceService): """ Get Dashboard Details """ - return dashboard + return self.client.get_dashboard(dashboard["slug"]) def get_dashboard_entity(self, dashboard_details: dict) -> Dashboard: """ Method to Get Dashboard Entity """ yield from self.fetch_dashboard_charts(dashboard_details) - dashboard_id = dashboard_details["id"] - if dashboard_id is not None: - self.status.item_scanned_status() - dashboard_data = self.client.get_dashboard(dashboard_id) - dashboard_description = "" - for widgets in dashboard_data.get("widgets", []): - dashboard_description = widgets.get("text") - yield Dashboard( - id=uuid.uuid4(), - name=dashboard_id, - displayName=dashboard_details["name"], - description=dashboard_description if dashboard_details else "", - charts=self.dashboards_to_charts[dashboard_id], - usageSummary=None, - service=EntityReference(id=self.service.id, type="dashboardService"), - url=f"/dashboard/{dashboard_data.get('slug', '')}", - ) + self.status.item_scanned_status() + dashboard_description = "" + for widgets in dashboard_details.get("widgets", []): + dashboard_description = widgets.get("text") + yield Dashboard( + id=uuid.uuid4(), + name=dashboard_details.get("id"), + displayName=dashboard_details["name"], + description=dashboard_description if dashboard_details else "", + charts=self.dashboards_to_charts[dashboard_details.get("id")], + usageSummary=None, + service=EntityReference(id=self.service.id, type="dashboardService"), + url=f"/dashboard/{dashboard_details.get('slug', '')}", + ) - def get_lineage(self, dashboard_details: dict) -> Optional[AddLineageRequest]: + def get_lineage( + self, dashboard_details: dict + ) -> Optional[Iterable[AddLineageRequest]]: """ Get lineage between dashboard and data sources + In redash we do not get table, database_schema or database name but we do get query + the lineage is being generated based on the query """ - logger.info("Lineage not implemented for redash") - return None - - def process_charts(self) -> Optional[Iterable[Chart]]: - """ - Metod to fetch Charts - """ - query_info = self.client.queries() - for query_info in query_info["results"]: - query_id = query_info["id"] - query_name = query_info["name"] - query_data = requests.get( - f"{self.service_connection.hostPort}/api/queries/{query_id}" - ).json() - for visualization in query_data.get("Visualizations", []): - chart_type = visualization.get("type", "") - chart_description = ( - visualization.get("description", "") - if visualization.get("description", "") - else "" - ) - yield Chart( - id=uuid.uuid4(), - name=query_id, - displayName=query_name, - chartType=chart_type, - service=EntityReference( - id=self.service.id, type="dashboardService" - ), - description=chart_description, + for widgets in dashboard_details.get("widgets", []): + visualization = widgets.get("visualization") + if not visualization.get("query"): + continue + table_list = [] + if visualization.get("query", {}).get("query"): + table_list = Parser(visualization["query"]["query"]) + for table in table_list.tables: + dataabase_schema = None + print(table) + if "." in table: + dataabase_schema, table = fqn.split(table)[-2:] + table_entities = search_table_entities( + metadata=self.metadata, + database=None, + service_name=self.source_config.dbServiceName, + database_schema=dataabase_schema, + table=table, ) + for from_entity in table_entities: + to_entity = self.metadata.get_by_name( + entity=Lineage_Dashboard, + fqn=fqn.build( + self.metadata, + Lineage_Dashboard, + service_name=self.config.serviceName, + dashboard_name=str(dashboard_details.get("id")), + ), + ) + if from_entity and to_entity: + lineage = AddLineageRequest( + edge=EntitiesEdge( + fromEntity=EntityReference( + id=from_entity.id.__root__, type="table" + ), + toEntity=EntityReference( + id=to_entity.id.__root__, type="dashboard" + ), + ) + ) + yield lineage - def fetch_dashboard_charts(self, dashboard: dict) -> Optional[Iterable[Chart]]: + def fetch_dashboard_charts( + self, dashboard_details: dict + ) -> Optional[Iterable[Chart]]: """ Metod to fetch charts linked to dashboard """ - dashboard_id = dashboard["id"] - if dashboard_id is not None: - dashboard_data = self.client.get_dashboard(dashboard_id) - self.dashboards_to_charts[dashboard_id] = [] - for widgets in dashboard_data.get("widgets", []): - visualization = widgets.get("visualization") - self.dashboards_to_charts[dashboard_id].append(widgets["id"]) - yield ModelChart( - name=widgets["id"], - displayName=visualization["query"]["name"] - if visualization and visualization["query"] - else "", - chart_type=visualization["type"] if visualization else "", - service=EntityReference( - id=self.service.id, type="dashboardService" - ), - url=f"/dashboard/{dashboard_data.get('slug', '')}", - description=visualization["description"] if visualization else "", - ) + self.dashboards_to_charts[dashboard_details.get("id")] = [] + for widgets in dashboard_details.get("widgets", []): + visualization = widgets.get("visualization") + self.dashboards_to_charts[dashboard_details.get("id")].append(widgets["id"]) + yield ModelChart( + name=widgets["id"], + displayName=visualization["query"]["name"] + if visualization and visualization["query"] + else "", + chart_type=visualization["type"] if visualization else "", + service=EntityReference(id=self.service.id, type="dashboardService"), + url=f"/dashboard/{dashboard_details.get('slug', '')}", + description=visualization["description"] if visualization else "", + ) def close(self): self.client.session.close() diff --git a/ingestion/src/metadata/ingestion/source/dashboard/superset.py b/ingestion/src/metadata/ingestion/source/dashboard/superset.py index e9903ec42c1..8e3302d811e 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/superset.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/superset.py @@ -14,7 +14,7 @@ Superset source module import json import traceback -from typing import Iterable, List, Optional +from typing import List, Optional import dateutil.parser as dateparser @@ -346,17 +346,3 @@ class SupersetSource(DashboardSourceService): ) yield from self._check_lineage(chart_id, chart_json.get("datasource_name_text")) yield chart - - def process_charts(self) -> Optional[Iterable[Chart]]: - current_page = 0 - page_size = 25 - total_charts = self.client.fetch_total_charts() - while current_page * page_size <= total_charts: - charts = self.client.fetch_charts(current_page, page_size) - current_page += 1 - for chart_json in charts["result"]: - try: - yield from self._build_chart(chart_json) - except Exception as err: - logger.debug(traceback.format_exc()) - logger.error(err)