diff --git a/ingestion/src/metadata/ingestion/api/topology_runner.py b/ingestion/src/metadata/ingestion/api/topology_runner.py index c053170665e..5268cfcd9e9 100644 --- a/ingestion/src/metadata/ingestion/api/topology_runner.py +++ b/ingestion/src/metadata/ingestion/api/topology_runner.py @@ -170,7 +170,9 @@ class TopologyRunnerMixin(Generic[C]): yield entity_request # Improve validation logic entity = self.metadata.get_by_name( - entity=stage.type_, fqn=entity_fqn + entity=stage.type_, + fqn=entity_fqn, + fields=["*"], # Get all the available data from the Entity ) tries -= 1 else: diff --git a/ingestion/src/metadata/ingestion/ometa/mixins/dashboard_mixin.py b/ingestion/src/metadata/ingestion/ometa/mixins/dashboard_mixin.py new file mode 100644 index 00000000000..796315f50ec --- /dev/null +++ b/ingestion/src/metadata/ingestion/ometa/mixins/dashboard_mixin.py @@ -0,0 +1,47 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +Mixin class containing Table specific methods + +To be used by OpenMetadata class +""" + +from metadata.generated.schema.entity.data.dashboard import Dashboard +from metadata.generated.schema.type.usageRequest import UsageRequest +from metadata.ingestion.ometa.client import REST +from metadata.ingestion.ometa.utils import ometa_logger + +logger = ometa_logger() + + +class OMetaDashboardMixin: + """ + OpenMetadata API methods related to Dashboards and Charts. + + To be inherited by OpenMetadata + """ + + client: REST + + def publish_dashboard_usage( + self, dashboard: Dashboard, dashboard_usage_request: UsageRequest + ) -> None: + """ + POST usage details for a Dashboard + + :param dashboard: Table Entity to update + :param dashboard_usage_request: Usage data to add + """ + resp = self.client.post( + f"/usage/dashboard/{dashboard.id.__root__}", + data=dashboard_usage_request.json(), + ) + logger.debug("published dashboard usage %s", resp) diff --git a/ingestion/src/metadata/ingestion/ometa/ometa_api.py b/ingestion/src/metadata/ingestion/ometa/ometa_api.py index 4026cb4db28..88de9ecbb60 100644 --- a/ingestion/src/metadata/ingestion/ometa/ometa_api.py +++ b/ingestion/src/metadata/ingestion/ometa/ometa_api.py @@ -18,6 +18,8 @@ working with OpenMetadata entities. import urllib from typing import Dict, Generic, Iterable, List, Optional, Type, TypeVar, Union +from metadata.ingestion.ometa.mixins.dashboard_mixin import OMetaDashboardMixin + try: from typing import get_args except ImportError: @@ -129,6 +131,7 @@ class OpenMetadata( OMetaServiceMixin, ESMixin, OMetaServerMixin, + OMetaDashboardMixin, Generic[T, C], ): """ @@ -537,7 +540,7 @@ class OpenMetadata( entity: Type[T], fields: Optional[List[str]] = None, after: str = None, - limit: int = 1000, + limit: int = 100, params: Optional[Dict[str, str]] = None, ) -> EntityList[T]: """ diff --git a/ingestion/src/metadata/ingestion/sink/metadata_rest.py b/ingestion/src/metadata/ingestion/sink/metadata_rest.py index 383e4de2ced..bf153fe2712 100644 --- a/ingestion/src/metadata/ingestion/sink/metadata_rest.py +++ b/ingestion/src/metadata/ingestion/sink/metadata_rest.py @@ -54,6 +54,7 @@ from metadata.ingestion.models.table_tests import OMetaTableTest from metadata.ingestion.models.user import OMetaUserProfile from metadata.ingestion.ometa.client import APIError from metadata.ingestion.ometa.ometa_api import OpenMetadata +from metadata.ingestion.source.dashboard.dashboard_service import DashboardUsage from metadata.ingestion.source.database.database_service import DataModelLink from metadata.utils.logger import ingestion_logger from metadata.utils.sql_lineage import ( @@ -140,6 +141,8 @@ class MetadataRestSink(Sink[Entity]): self.write_pipeline_status(record) elif isinstance(record, DataModelLink): self.write_datamodel(record) + elif isinstance(record, DashboardUsage): + self.write_dashboard_usage(record) else: logging.debug(f"Processing Create request {type(record)}") self.write_create_request(record) @@ -172,6 +175,20 @@ class MetadataRestSink(Sink[Entity]): table=table, data_model=datamodel_link.datamodel ) + def write_dashboard_usage(self, dashboard_usage: DashboardUsage) -> None: + """ + Send a UsageRequest update to a dashboard entity + :param dashboard_usage: dashboard entity and usage request + """ + + self.metadata.publish_dashboard_usage( + dashboard=dashboard_usage.dashboard, + dashboard_usage_request=dashboard_usage.usage, + ) + logger.info( + f"Successfully ingested usage for {dashboard_usage.dashboard.fullyQualifiedName.__root__}" + ) + def write_tables(self, db_schema_and_table: OMetaDatabaseAndTable): try: db_request = CreateDatabaseRequest( diff --git a/ingestion/src/metadata/ingestion/source/dashboard/dashboard_service.py b/ingestion/src/metadata/ingestion/source/dashboard/dashboard_service.py index 6a1039689eb..5ddb0d39687 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/dashboard_service.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/dashboard_service.py @@ -15,6 +15,8 @@ from abc import ABC, abstractmethod from dataclasses import dataclass from typing import Any, Iterable, List, Optional +from pydantic import BaseModel + from metadata.generated.schema.api.data.createChart import CreateChartRequest from metadata.generated.schema.api.data.createDashboard import CreateDashboardRequest from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest @@ -34,6 +36,7 @@ from metadata.generated.schema.metadataIngestion.dashboardServiceMetadataPipelin from metadata.generated.schema.metadataIngestion.workflow import ( Source as WorkflowSource, ) +from metadata.generated.schema.type.usageRequest import UsageRequest from metadata.ingestion.api.source import Source, SourceStatus from metadata.ingestion.api.topology_runner import TopologyRunnerMixin from metadata.ingestion.models.ometa_tag_category import OMetaTagAndCategory @@ -51,6 +54,15 @@ from metadata.utils.logger import ingestion_logger logger = ingestion_logger() +class DashboardUsage(BaseModel): + """ + Wrapper to handle type at the sink + """ + + dashboard: Dashboard + usage: UsageRequest + + class DashboardServiceTopology(ServiceTopology): """ Defines the hierarchy in Dashboard Services. @@ -110,6 +122,14 @@ class DashboardServiceTopology(ServiceTopology): ack_sink=False, nullable=True, ), + NodeStage( + type_=UsageRequest, + context="usage", + processor="yield_dashboard_usage", + consumer=["dashboard_service"], + ack_sink=False, + nullable=True, + ), ], ) @@ -144,7 +164,7 @@ class DashboardServiceSource(TopologyRunnerMixin, Source, ABC): """ @abstractmethod - def yield_dashboard_lineage( + def yield_dashboard_lineage_details( self, dashboard_details: Any ) -> Optional[Iterable[AddLineageRequest]]: """ @@ -177,17 +197,34 @@ class DashboardServiceSource(TopologyRunnerMixin, Source, ABC): Get Dashboard Details """ + def yield_dashboard_lineage( + self, dashboard_details: Any + ) -> Optional[Iterable[AddLineageRequest]]: + """ + Yields lineage if config is enabled + """ + if self.source_config.dbServiceName: + yield from self.yield_dashboard_lineage_details(dashboard_details) + def yield_tag(self, *args, **kwargs) -> Optional[Iterable[OMetaTagAndCategory]]: """ Method to fetch dashboard tags """ - return # Dashboard does not supports fetching tags except Tableau + return # Dashboard does not support fetching tags except Tableau - def yield_owner(self, *args, **kwargs) -> Optional[CreateUserRequest]: + def yield_owner(self, *args, **kwargs) -> Optional[Iterable[CreateUserRequest]]: """ Method to fetch dashboard owner """ - return # Dashboard does not supports fetching owner details except Tableau + return # Dashboard does not support fetching owner details except Tableau + + def yield_dashboard_usage( + self, *args, **kwargs + ) -> Optional[Iterable[DashboardUsage]]: + """ + Method to pick up dashboard usage data + """ + return # Dashboard usage currently only available for Looker status: DashboardSourceStatus source_config: DashboardServiceMetadataPipeline @@ -242,7 +279,7 @@ class DashboardServiceSource(TopologyRunnerMixin, Source, ABC): self.get_dashboard_name(dashboard_details), ): self.status.filter( - self.get_dashboard_name(dashboard), + self.get_dashboard_name(dashboard_details), "Dashboard Pattern not Allowed", ) continue diff --git a/ingestion/src/metadata/ingestion/source/dashboard/looker.py b/ingestion/src/metadata/ingestion/source/dashboard/looker.py index ed716ac8453..aa24a6ac615 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/looker.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/looker.py @@ -8,13 +8,27 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - import traceback -from typing import Any, Iterable, List, Optional +from datetime import datetime +from typing import Iterable, List, Optional, Set, cast + +from looker_sdk.error import SDKError +from looker_sdk.sdk.api31.models import Query +from looker_sdk.sdk.api40.models import Dashboard as LookerDashboard +from looker_sdk.sdk.api40.models import ( + DashboardBase, + DashboardElement, + LookmlModelExplore, +) from metadata.generated.schema.api.data.createChart import CreateChartRequest from metadata.generated.schema.api.data.createDashboard import CreateDashboardRequest from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest +from metadata.generated.schema.entity.data.dashboard import Dashboard +from metadata.generated.schema.entity.data.dashboard import ( + Dashboard as LineageDashboard, +) +from metadata.generated.schema.entity.data.table import Table from metadata.generated.schema.entity.services.connections.dashboard.lookerConnection import ( LookerConnection, ) @@ -24,17 +38,23 @@ from metadata.generated.schema.entity.services.connections.metadata.openMetadata from metadata.generated.schema.metadataIngestion.workflow import ( Source as WorkflowSource, ) +from metadata.generated.schema.type.entityLineage import EntitiesEdge from metadata.generated.schema.type.entityReference import EntityReference +from metadata.generated.schema.type.usageRequest import UsageRequest from metadata.ingestion.api.source import InvalidSourceException -from metadata.ingestion.source.dashboard.dashboard_source import DashboardSourceService +from metadata.ingestion.source.dashboard.dashboard_service import ( + DashboardServiceSource, + DashboardUsage, +) +from metadata.utils import fqn from metadata.utils.filters import filter_by_chart -from metadata.utils.helpers import get_chart_entities_from_id, get_standard_chart_type +from metadata.utils.helpers import get_standard_chart_type from metadata.utils.logger import ingestion_logger logger = ingestion_logger() -class LookerSource(DashboardSourceService): +class LookerSource(DashboardServiceSource): config: WorkflowSource metadata_config: OpenMetadataConnection @@ -44,7 +64,7 @@ class LookerSource(DashboardSourceService): metadata_config: OpenMetadataConnection, ): super().__init__(config, metadata_config) - self.charts = [] + self.today = datetime.now().strftime("%Y-%m-%d") @classmethod def create(cls, config_dict: dict, metadata_config: OpenMetadataConnection): @@ -56,19 +76,19 @@ class LookerSource(DashboardSourceService): ) return cls(config, metadata_config) - def get_dashboards_list(self) -> Optional[List[Any]]: + def get_dashboards_list(self) -> Optional[List[DashboardBase]]: """ Get List of all dashboards """ return self.client.all_dashboards(fields="id") - def get_dashboard_name(self, dashboard_details: object) -> str: + def get_dashboard_name(self, dashboard_details: DashboardBase) -> str: """ Get Dashboard Name """ return dashboard_details.id - def get_dashboard_details(self, dashboard: object) -> dict: + def get_dashboard_details(self, dashboard: DashboardBase) -> LookerDashboard: """ Get Dashboard Details """ @@ -78,63 +98,267 @@ class LookerSource(DashboardSourceService): "dashboard_elements", "dashboard_filters", "view_count", + "description", + "folder", ] return self.client.dashboard(dashboard_id=dashboard.id, fields=",".join(fields)) - def get_dashboard_entity(self, dashboard_details: Any) -> CreateDashboardRequest: + def yield_dashboard( + self, dashboard_details: LookerDashboard + ) -> CreateDashboardRequest: """ Method to Get Dashboard Entity """ + yield CreateDashboardRequest( name=dashboard_details.id, displayName=dashboard_details.title, description=dashboard_details.description or "", - charts=get_chart_entities_from_id( - chart_ids=self.charts, - metadata=self.metadata, - service_name=self.config.serviceName, - ), + charts=[ + EntityReference(id=chart.id.__root__, type="chart") + for chart in self.context.charts + ], dashboardUrl=f"/dashboards/{dashboard_details.id}", - service=EntityReference(id=self.service.id, type="dashboardService"), + service=EntityReference( + id=self.context.dashboard_service.id.__root__, type="dashboardService" + ), ) - def get_lineage(self, dashboard_details) -> Optional[AddLineageRequest]: + @staticmethod + def _clean_table_name(table_name: str) -> str: """ - Get lineage between dashboard and data sources + sql_table_names might be renamed when defining + an explore. E.g., customers as cust + :param table_name: explore table name + :return: clean table name """ - logger.info("Lineage not implemented for Looker") - return None - def fetch_dashboard_charts( - self, dashboard_details + return table_name.lower().split("as")[0].strip() + + def _add_sql_table(self, query: Query, dashboard_sources: Set[str]): + """ + Add the SQL table information to the dashboard_sources. + + Updates the seen dashboards. + + :param query: Looker query, from a look or result_maker + :param dashboard_sources: seen tables so far + """ + try: + explore: LookmlModelExplore = self.client.lookml_model_explore( + query.model, query.view + ) + table_name = explore.sql_table_name + + if table_name: + dashboard_sources.add(self._clean_table_name(table_name)) + + except SDKError as err: + logger.error( + f"Cannot get explore from model={query.model}, view={query.view} - {err}" + ) + + def get_dashboard_sources(self, dashboard_details: LookerDashboard) -> Set[str]: + """ + Set of source tables to build lineage for the processed dashboard + """ + dashboard_sources: Set[str] = set() + + for chart in cast( + Iterable[DashboardElement], dashboard_details.dashboard_elements + ): + if chart.query and chart.query.view: + self._add_sql_table(chart.query, dashboard_sources) + if chart.look and chart.look.query and chart.look.query.view: + self._add_sql_table(chart.look.query, dashboard_sources) + if ( + chart.result_maker + and chart.result_maker.query + and chart.result_maker.query.view + ): + self._add_sql_table(chart.result_maker.query, dashboard_sources) + + return dashboard_sources + + def yield_dashboard_lineage_details( + self, dashboard_details: LookerDashboard + ) -> Optional[Iterable[AddLineageRequest]]: + """ + Get lineage between charts and data sources. + + We look at: + - chart.query + - chart.look (chart.look.query) + - chart.result_maker + """ + datasource_list = self.get_dashboard_sources(dashboard_details) + + to_fqn = fqn.build( + self.metadata, + entity_type=LineageDashboard, + service_name=self.config.serviceName, + dashboard_name=dashboard_details.id, + ) + to_entity = self.metadata.get_by_name( + entity=LineageDashboard, + fqn=to_fqn, + ) + + for source in datasource_list: + try: + source_elements = fqn.split_table_name(table_name=source) + + from_fqn = fqn.build( + self.metadata, + entity_type=Table, + service_name=self.source_config.dbServiceName, + database_name=source_elements["database"], + schema_name=source_elements["database_schema"], + table_name=source_elements["table"], + ) + from_entity = self.metadata.get_by_name( + entity=Table, + fqn=from_fqn, + ) + + if from_entity and to_entity: + lineage = AddLineageRequest( + edge=EntitiesEdge( + fromEntity=EntityReference( + id=from_entity.id.__root__, type="table" + ), + toEntity=EntityReference( + id=to_entity.id.__root__, type="dashboard" + ), + ) + ) + yield lineage + + except (Exception, IndexError) as err: + logger.debug(traceback.format_exc()) + logger.error(f"Error building lineage - {err}") + + def yield_dashboard_chart( + self, dashboard_details: LookerDashboard ) -> Optional[Iterable[CreateChartRequest]]: """ - Metod to fetch charts linked to dashboard + Method to fetch charts linked to dashboard """ - self.charts = [] - for dashboard_elements in dashboard_details.dashboard_elements: + for chart in cast( + Iterable[DashboardElement], dashboard_details.dashboard_elements + ): try: if filter_by_chart( chart_filter_pattern=self.source_config.chartFilterPattern, - chart_name=dashboard_elements.id, + chart_name=chart.id, ): - self.status.filter(dashboard_elements.id, "Chart filtered out") + self.status.filter(chart.id, "Chart filtered out") continue - om_dashboard_elements = CreateChartRequest( - name=dashboard_elements.id, - displayName=dashboard_elements.title or dashboard_elements.id, + + if not chart.id: + logger.debug(f"Found chart {chart} without id. Skipping.") + continue + + yield CreateChartRequest( + name=chart.id, + displayName=chart.title or chart.id, description="", - chartType=get_standard_chart_type(dashboard_elements.type).value, - chartUrl=f"/dashboard_elements/{dashboard_elements.id}", + chartType=get_standard_chart_type(chart.type).value, + chartUrl=f"/dashboard_elements/{chart.id}", service=EntityReference( - id=self.service.id, type="dashboardService" + id=self.context.dashboard_service.id.__root__, + type="dashboardService", ), ) - if not dashboard_elements.id: - raise ValueError("Chart(Dashboard Element) without ID") - self.status.scanned(dashboard_elements.id) - yield om_dashboard_elements - self.charts.append(dashboard_elements.id) + self.status.scanned(chart.id) + except Exception as err: logger.debug(traceback.format_exc()) logger.error(err) + + def yield_dashboard_usage( + self, dashboard_details: LookerDashboard + ) -> Optional[DashboardUsage]: + """ + The dashboard.view_count gives us the total number of views. However, we need to + pass the views for each day (execution). + + In this function we will first validate if the usageSummary + returns us some usage for today's date. If so, we will stop the + execution. + + Otherwise, we will add the difference between the usage from the last time + the usage was reported and today's view_count from the dashboard. + + Example usage summary from OM API: + "usageSummary": { + "dailyStats": { + "count": 51, + "percentileRank": 0.0 + }, + "date": "2022-06-23", + "monthlyStats": { + "count": 105, + "percentileRank": 0.0 + }, + "weeklyStats": { + "count": 105, + "percentileRank": 0.0 + } + }, + :param dashboard_details: Looker Dashboard + :return: UsageRequest, if not computed + """ + + dashboard: Dashboard = self.context.dashboard + + try: + current_views = dashboard_details.view_count + + if not current_views: + logger.debug(f"No usage to report for {dashboard_details.title}") + + if not dashboard.usageSummary: + logger.info( + f"Yielding fresh usage for {dashboard.fullyQualifiedName.__root__}" + ) + yield DashboardUsage( + dashboard=dashboard, + usage=UsageRequest(date=self.today, count=current_views), + ) + + elif ( + str(dashboard.usageSummary.date.__root__) != self.today + or not dashboard.usageSummary.dailyStats.count + ): + + latest_usage = dashboard.usageSummary.dailyStats.count + + new_usage = current_views - latest_usage + if new_usage < 0: + raise ValueError( + f"Wrong computation of usage difference. Got new_usage={new_usage}." + ) + + logger.info( + f"Yielding new usage for {dashboard.fullyQualifiedName.__root__}" + ) + yield DashboardUsage( + dashboard=dashboard, + usage=UsageRequest( + date=self.today, count=current_views - latest_usage + ), + ) + + else: + logger.debug( + f"Latest usage {dashboard.usageSummary} vs. today {self.today}. Nothing to compute." + ) + logger.info( + f"Usage already informed for {dashboard.fullyQualifiedName.__root__}" + ) + + except Exception as err: + logger.error( + f"Exception computing dashboard usage for {dashboard.fullyQualifiedName.__root__} - {err}" + ) diff --git a/ingestion/src/metadata/ingestion/source/dashboard/metabase.py b/ingestion/src/metadata/ingestion/source/dashboard/metabase.py index adee1318b6d..5c976be048d 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/metabase.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/metabase.py @@ -193,7 +193,7 @@ class MetabaseSource(DashboardServiceSource): logger.debug(traceback.format_exc()) continue - def yield_dashboard_lineage( + def yield_dashboard_lineage_details( self, dashboard_details: dict ) -> Optional[Iterable[AddLineageRequest]]: """Get lineage method diff --git a/ingestion/src/metadata/ingestion/source/dashboard/mode.py b/ingestion/src/metadata/ingestion/source/dashboard/mode.py index 5465b3f7e46..07abd031024 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/mode.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/mode.py @@ -124,7 +124,7 @@ class ModeSource(DashboardServiceSource): ), ) - def yield_dashboard_lineage( + def yield_dashboard_lineage_details( self, dashboard_details: dict ) -> Optional[Iterable[AddLineageRequest]]: """Get lineage method diff --git a/ingestion/src/metadata/ingestion/source/dashboard/powerbi.py b/ingestion/src/metadata/ingestion/source/dashboard/powerbi.py index 5a260840547..045f34c2bb4 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/powerbi.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/powerbi.py @@ -114,14 +114,12 @@ class PowerbiSource(DashboardServiceSource): ), ) - def yield_dashboard_lineage( + def yield_dashboard_lineage_details( self, dashboard_details: dict ) -> Optional[Iterable[AddLineageRequest]]: """ Get lineage between dashboard and data sources """ - if not self.source_config.dbServiceName: - return try: charts = self.client.fetch_charts(dashboard_id=dashboard_details["id"]).get( "value" diff --git a/ingestion/src/metadata/ingestion/source/dashboard/redash.py b/ingestion/src/metadata/ingestion/source/dashboard/redash.py index 50c45795bf0..4efdb0e2155 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/redash.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/redash.py @@ -110,7 +110,7 @@ class RedashSource(DashboardServiceSource): ) self.status.scanned(dashboard_details["name"]) - def yield_dashboard_lineage( + def yield_dashboard_lineage_details( self, dashboard_details: dict ) -> Optional[Iterable[AddLineageRequest]]: """ @@ -118,8 +118,6 @@ class RedashSource(DashboardServiceSource): In redash we do not get table, database_schema or database name but we do get query the lineage is being generated based on the query """ - if not self.source_config.dbServiceName: - return for widgets in dashboard_details.get("widgets", []): visualization = widgets.get("visualization") if not visualization.get("query"): diff --git a/ingestion/src/metadata/ingestion/source/dashboard/superset.py b/ingestion/src/metadata/ingestion/source/dashboard/superset.py index ad6fd94e71c..b62aa4506c4 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/superset.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/superset.py @@ -202,14 +202,12 @@ class SupersetSource(DashboardServiceSource): ] return [] - def yield_dashboard_lineage( + def yield_dashboard_lineage_details( self, dashboard_details: dict ) -> Optional[Iterable[AddLineageRequest]]: """ Get lineage between dashboard and data sources """ - if not self.source_config.dbServiceName: - return for chart_id in self._get_charts_of_dashboard(dashboard_details): chart_json = self.all_charts.get(chart_id) datasource_fqn = ( diff --git a/ingestion/src/metadata/ingestion/source/dashboard/tableau.py b/ingestion/src/metadata/ingestion/source/dashboard/tableau.py index 1ea2dda551b..1360d42248b 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/tableau.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/tableau.py @@ -150,11 +150,13 @@ class TableauSource(DashboardServiceSource): """ return dashboard - def yield_owner(self, dashboard_details: dict) -> Optional[EntityReference]: + def yield_owner( + self, dashboard_details: dict + ) -> Optional[Iterable[CreateUserRequest]]: """Get dashboard owner Args: - owner: + dashboard_details: Returns: Optional[EntityReference] """ @@ -222,20 +224,30 @@ class TableauSource(DashboardServiceSource): ), ) - def yield_dashboard_lineage( + def yield_dashboard_lineage_details( self, dashboard_details: dict ) -> Optional[Iterable[AddLineageRequest]]: """ Get lineage between dashboard and data sources """ - if not self.source_config.dbServiceName: - return datasource_list = ( get_workbook_connections_dataframe(self.client, dashboard_details.get("id")) .get("datasource_name") .tolist() ) dashboard_name = dashboard_details.get("name") + + to_fqn = fqn.build( + self.metadata, + entity_type=LineageDashboard, + service_name=self.config.serviceName, + dashboard_name=dashboard_name, + ) + to_entity = self.metadata.get_by_name( + entity=LineageDashboard, + fqn=to_fqn, + ) + for datasource in datasource_list: try: schema_and_table_name = ( @@ -255,16 +267,6 @@ class TableauSource(DashboardServiceSource): entity=Table, fqn=from_fqn, ) - to_fqn = fqn.build( - self.metadata, - entity_type=LineageDashboard, - service_name=self.config.serviceName, - dashboard_name=dashboard_name, - ) - to_entity = self.metadata.get_by_name( - entity=LineageDashboard, - fqn=to_fqn, - ) if from_entity and to_entity: lineage = AddLineageRequest( edge=EntitiesEdge( diff --git a/ingestion/src/metadata/ingestion/source/database/snowflake_usage.py b/ingestion/src/metadata/ingestion/source/database/snowflake_usage.py index e8e5a95aa07..687e7a417ba 100644 --- a/ingestion/src/metadata/ingestion/source/database/snowflake_usage.py +++ b/ingestion/src/metadata/ingestion/source/database/snowflake_usage.py @@ -11,7 +11,6 @@ """ Snowflake usage module """ - from typing import Iterable, Iterator, Union from metadata.generated.schema.entity.services.connections.database.snowflakeConnection import ( diff --git a/ingestion/src/metadata/utils/connections.py b/ingestion/src/metadata/utils/connections.py index 3f36dcd5bdd..409a237419b 100644 --- a/ingestion/src/metadata/utils/connections.py +++ b/ingestion/src/metadata/utils/connections.py @@ -608,7 +608,7 @@ def _(connection: LookerConnection, verbose: bool = False): os.environ["LOOKERSDK_CLIENT_SECRET"] = connection.password.get_secret_value() if not os.environ.get("LOOKERSDK_BASE_URL"): os.environ["LOOKERSDK_BASE_URL"] = connection.hostPort - client = looker_sdk.init31() + client = looker_sdk.init40() return LookerClient(client=client) diff --git a/ingestion/src/metadata/utils/fqn.py b/ingestion/src/metadata/utils/fqn.py index 4588d609186..ababab1b2b2 100644 --- a/ingestion/src/metadata/utils/fqn.py +++ b/ingestion/src/metadata/utils/fqn.py @@ -14,7 +14,7 @@ Filter information has been taken from the ES indexes definitions """ import re -from typing import List, Optional, Type, TypeVar, Union +from typing import Dict, List, Optional, Type, TypeVar, Union from antlr4.CommonTokenStream import CommonTokenStream from antlr4.error.ErrorStrategy import BailErrorStrategy @@ -270,3 +270,19 @@ def _( column_name: str, ) -> str: return _build(service_name, database_name, schema_name, table_name, column_name) + + +def split_table_name(table_name: str) -> Dict[str, Optional[str]]: + """ + Given a table name, try to extract database, schema and + table info + :param table_name: raw table name + :return: dict with data + """ + + details: List[str] = split(table_name) + # Pad None to the left until size of list is 3 + full_details: List[Optional[str]] = ([None] * (3 - len(details))) + details + + database, database_schema, table = full_details + return {"database": database, "database_schema": database_schema, "table": table} diff --git a/ingestion/tests/unit/filter_pattern.py b/ingestion/tests/unit/filter_pattern.py index 335edaa6e89..0d1f0ecdb85 100644 --- a/ingestion/tests/unit/filter_pattern.py +++ b/ingestion/tests/unit/filter_pattern.py @@ -15,7 +15,7 @@ Validate filter patterns from unittest import TestCase from metadata.generated.schema.type.filterPattern import FilterPattern -from metadata.utils.filters import filter_by_fqn +from metadata.utils.filters import filter_by_dashboard, filter_by_fqn class FilterPatternTests(TestCase): @@ -23,7 +23,8 @@ class FilterPatternTests(TestCase): Validate filter patterns """ - def test_filter_by_fqn(self): + @staticmethod + def test_filter_by_fqn(): """ Check FQN filters """ @@ -36,3 +37,17 @@ class FilterPatternTests(TestCase): assert not filter_by_fqn(fqn_filter_schema, "service.my_db.my_schema.table") assert filter_by_fqn(fqn_filter_schema, "service.another_db.my_schema.table") + + @staticmethod + def test_filter_numbers(): + """ + Check numeric filtering + """ + + num_filter = FilterPattern(includes=["^[4]"]) + + assert not filter_by_dashboard(num_filter, "40") + assert not filter_by_dashboard(num_filter, "41") + + assert filter_by_dashboard(num_filter, "50") + assert filter_by_dashboard(num_filter, "54")