mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-08-24 08:58:06 +00:00
parent
7fa47c31d8
commit
0144a1d46a
@ -170,7 +170,9 @@ class TopologyRunnerMixin(Generic[C]):
|
||||
yield entity_request
|
||||
# Improve validation logic
|
||||
entity = self.metadata.get_by_name(
|
||||
entity=stage.type_, fqn=entity_fqn
|
||||
entity=stage.type_,
|
||||
fqn=entity_fqn,
|
||||
fields=["*"], # Get all the available data from the Entity
|
||||
)
|
||||
tries -= 1
|
||||
else:
|
||||
|
@ -0,0 +1,47 @@
|
||||
# Copyright 2021 Collate
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
"""
|
||||
Mixin class containing Table specific methods
|
||||
|
||||
To be used by OpenMetadata class
|
||||
"""
|
||||
|
||||
from metadata.generated.schema.entity.data.dashboard import Dashboard
|
||||
from metadata.generated.schema.type.usageRequest import UsageRequest
|
||||
from metadata.ingestion.ometa.client import REST
|
||||
from metadata.ingestion.ometa.utils import ometa_logger
|
||||
|
||||
logger = ometa_logger()
|
||||
|
||||
|
||||
class OMetaDashboardMixin:
|
||||
"""
|
||||
OpenMetadata API methods related to Dashboards and Charts.
|
||||
|
||||
To be inherited by OpenMetadata
|
||||
"""
|
||||
|
||||
client: REST
|
||||
|
||||
def publish_dashboard_usage(
|
||||
self, dashboard: Dashboard, dashboard_usage_request: UsageRequest
|
||||
) -> None:
|
||||
"""
|
||||
POST usage details for a Dashboard
|
||||
|
||||
:param dashboard: Table Entity to update
|
||||
:param dashboard_usage_request: Usage data to add
|
||||
"""
|
||||
resp = self.client.post(
|
||||
f"/usage/dashboard/{dashboard.id.__root__}",
|
||||
data=dashboard_usage_request.json(),
|
||||
)
|
||||
logger.debug("published dashboard usage %s", resp)
|
@ -18,6 +18,8 @@ working with OpenMetadata entities.
|
||||
import urllib
|
||||
from typing import Dict, Generic, Iterable, List, Optional, Type, TypeVar, Union
|
||||
|
||||
from metadata.ingestion.ometa.mixins.dashboard_mixin import OMetaDashboardMixin
|
||||
|
||||
try:
|
||||
from typing import get_args
|
||||
except ImportError:
|
||||
@ -129,6 +131,7 @@ class OpenMetadata(
|
||||
OMetaServiceMixin,
|
||||
ESMixin,
|
||||
OMetaServerMixin,
|
||||
OMetaDashboardMixin,
|
||||
Generic[T, C],
|
||||
):
|
||||
"""
|
||||
@ -537,7 +540,7 @@ class OpenMetadata(
|
||||
entity: Type[T],
|
||||
fields: Optional[List[str]] = None,
|
||||
after: str = None,
|
||||
limit: int = 1000,
|
||||
limit: int = 100,
|
||||
params: Optional[Dict[str, str]] = None,
|
||||
) -> EntityList[T]:
|
||||
"""
|
||||
|
@ -54,6 +54,7 @@ from metadata.ingestion.models.table_tests import OMetaTableTest
|
||||
from metadata.ingestion.models.user import OMetaUserProfile
|
||||
from metadata.ingestion.ometa.client import APIError
|
||||
from metadata.ingestion.ometa.ometa_api import OpenMetadata
|
||||
from metadata.ingestion.source.dashboard.dashboard_service import DashboardUsage
|
||||
from metadata.ingestion.source.database.database_service import DataModelLink
|
||||
from metadata.utils.logger import ingestion_logger
|
||||
from metadata.utils.sql_lineage import (
|
||||
@ -140,6 +141,8 @@ class MetadataRestSink(Sink[Entity]):
|
||||
self.write_pipeline_status(record)
|
||||
elif isinstance(record, DataModelLink):
|
||||
self.write_datamodel(record)
|
||||
elif isinstance(record, DashboardUsage):
|
||||
self.write_dashboard_usage(record)
|
||||
else:
|
||||
logging.debug(f"Processing Create request {type(record)}")
|
||||
self.write_create_request(record)
|
||||
@ -172,6 +175,20 @@ class MetadataRestSink(Sink[Entity]):
|
||||
table=table, data_model=datamodel_link.datamodel
|
||||
)
|
||||
|
||||
def write_dashboard_usage(self, dashboard_usage: DashboardUsage) -> None:
|
||||
"""
|
||||
Send a UsageRequest update to a dashboard entity
|
||||
:param dashboard_usage: dashboard entity and usage request
|
||||
"""
|
||||
|
||||
self.metadata.publish_dashboard_usage(
|
||||
dashboard=dashboard_usage.dashboard,
|
||||
dashboard_usage_request=dashboard_usage.usage,
|
||||
)
|
||||
logger.info(
|
||||
f"Successfully ingested usage for {dashboard_usage.dashboard.fullyQualifiedName.__root__}"
|
||||
)
|
||||
|
||||
def write_tables(self, db_schema_and_table: OMetaDatabaseAndTable):
|
||||
try:
|
||||
db_request = CreateDatabaseRequest(
|
||||
|
@ -15,6 +15,8 @@ from abc import ABC, abstractmethod
|
||||
from dataclasses import dataclass
|
||||
from typing import Any, Iterable, List, Optional
|
||||
|
||||
from pydantic import BaseModel
|
||||
|
||||
from metadata.generated.schema.api.data.createChart import CreateChartRequest
|
||||
from metadata.generated.schema.api.data.createDashboard import CreateDashboardRequest
|
||||
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
|
||||
@ -34,6 +36,7 @@ from metadata.generated.schema.metadataIngestion.dashboardServiceMetadataPipelin
|
||||
from metadata.generated.schema.metadataIngestion.workflow import (
|
||||
Source as WorkflowSource,
|
||||
)
|
||||
from metadata.generated.schema.type.usageRequest import UsageRequest
|
||||
from metadata.ingestion.api.source import Source, SourceStatus
|
||||
from metadata.ingestion.api.topology_runner import TopologyRunnerMixin
|
||||
from metadata.ingestion.models.ometa_tag_category import OMetaTagAndCategory
|
||||
@ -51,6 +54,15 @@ from metadata.utils.logger import ingestion_logger
|
||||
logger = ingestion_logger()
|
||||
|
||||
|
||||
class DashboardUsage(BaseModel):
|
||||
"""
|
||||
Wrapper to handle type at the sink
|
||||
"""
|
||||
|
||||
dashboard: Dashboard
|
||||
usage: UsageRequest
|
||||
|
||||
|
||||
class DashboardServiceTopology(ServiceTopology):
|
||||
"""
|
||||
Defines the hierarchy in Dashboard Services.
|
||||
@ -110,6 +122,14 @@ class DashboardServiceTopology(ServiceTopology):
|
||||
ack_sink=False,
|
||||
nullable=True,
|
||||
),
|
||||
NodeStage(
|
||||
type_=UsageRequest,
|
||||
context="usage",
|
||||
processor="yield_dashboard_usage",
|
||||
consumer=["dashboard_service"],
|
||||
ack_sink=False,
|
||||
nullable=True,
|
||||
),
|
||||
],
|
||||
)
|
||||
|
||||
@ -144,7 +164,7 @@ class DashboardServiceSource(TopologyRunnerMixin, Source, ABC):
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
def yield_dashboard_lineage(
|
||||
def yield_dashboard_lineage_details(
|
||||
self, dashboard_details: Any
|
||||
) -> Optional[Iterable[AddLineageRequest]]:
|
||||
"""
|
||||
@ -177,17 +197,34 @@ class DashboardServiceSource(TopologyRunnerMixin, Source, ABC):
|
||||
Get Dashboard Details
|
||||
"""
|
||||
|
||||
def yield_dashboard_lineage(
|
||||
self, dashboard_details: Any
|
||||
) -> Optional[Iterable[AddLineageRequest]]:
|
||||
"""
|
||||
Yields lineage if config is enabled
|
||||
"""
|
||||
if self.source_config.dbServiceName:
|
||||
yield from self.yield_dashboard_lineage_details(dashboard_details)
|
||||
|
||||
def yield_tag(self, *args, **kwargs) -> Optional[Iterable[OMetaTagAndCategory]]:
|
||||
"""
|
||||
Method to fetch dashboard tags
|
||||
"""
|
||||
return # Dashboard does not supports fetching tags except Tableau
|
||||
return # Dashboard does not support fetching tags except Tableau
|
||||
|
||||
def yield_owner(self, *args, **kwargs) -> Optional[CreateUserRequest]:
|
||||
def yield_owner(self, *args, **kwargs) -> Optional[Iterable[CreateUserRequest]]:
|
||||
"""
|
||||
Method to fetch dashboard owner
|
||||
"""
|
||||
return # Dashboard does not supports fetching owner details except Tableau
|
||||
return # Dashboard does not support fetching owner details except Tableau
|
||||
|
||||
def yield_dashboard_usage(
|
||||
self, *args, **kwargs
|
||||
) -> Optional[Iterable[DashboardUsage]]:
|
||||
"""
|
||||
Method to pick up dashboard usage data
|
||||
"""
|
||||
return # Dashboard usage currently only available for Looker
|
||||
|
||||
status: DashboardSourceStatus
|
||||
source_config: DashboardServiceMetadataPipeline
|
||||
@ -242,7 +279,7 @@ class DashboardServiceSource(TopologyRunnerMixin, Source, ABC):
|
||||
self.get_dashboard_name(dashboard_details),
|
||||
):
|
||||
self.status.filter(
|
||||
self.get_dashboard_name(dashboard),
|
||||
self.get_dashboard_name(dashboard_details),
|
||||
"Dashboard Pattern not Allowed",
|
||||
)
|
||||
continue
|
||||
|
@ -8,13 +8,27 @@
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import traceback
|
||||
from typing import Any, Iterable, List, Optional
|
||||
from datetime import datetime
|
||||
from typing import Iterable, List, Optional, Set, cast
|
||||
|
||||
from looker_sdk.error import SDKError
|
||||
from looker_sdk.sdk.api31.models import Query
|
||||
from looker_sdk.sdk.api40.models import Dashboard as LookerDashboard
|
||||
from looker_sdk.sdk.api40.models import (
|
||||
DashboardBase,
|
||||
DashboardElement,
|
||||
LookmlModelExplore,
|
||||
)
|
||||
|
||||
from metadata.generated.schema.api.data.createChart import CreateChartRequest
|
||||
from metadata.generated.schema.api.data.createDashboard import CreateDashboardRequest
|
||||
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
|
||||
from metadata.generated.schema.entity.data.dashboard import Dashboard
|
||||
from metadata.generated.schema.entity.data.dashboard import (
|
||||
Dashboard as LineageDashboard,
|
||||
)
|
||||
from metadata.generated.schema.entity.data.table import Table
|
||||
from metadata.generated.schema.entity.services.connections.dashboard.lookerConnection import (
|
||||
LookerConnection,
|
||||
)
|
||||
@ -24,17 +38,23 @@ from metadata.generated.schema.entity.services.connections.metadata.openMetadata
|
||||
from metadata.generated.schema.metadataIngestion.workflow import (
|
||||
Source as WorkflowSource,
|
||||
)
|
||||
from metadata.generated.schema.type.entityLineage import EntitiesEdge
|
||||
from metadata.generated.schema.type.entityReference import EntityReference
|
||||
from metadata.generated.schema.type.usageRequest import UsageRequest
|
||||
from metadata.ingestion.api.source import InvalidSourceException
|
||||
from metadata.ingestion.source.dashboard.dashboard_source import DashboardSourceService
|
||||
from metadata.ingestion.source.dashboard.dashboard_service import (
|
||||
DashboardServiceSource,
|
||||
DashboardUsage,
|
||||
)
|
||||
from metadata.utils import fqn
|
||||
from metadata.utils.filters import filter_by_chart
|
||||
from metadata.utils.helpers import get_chart_entities_from_id, get_standard_chart_type
|
||||
from metadata.utils.helpers import get_standard_chart_type
|
||||
from metadata.utils.logger import ingestion_logger
|
||||
|
||||
logger = ingestion_logger()
|
||||
|
||||
|
||||
class LookerSource(DashboardSourceService):
|
||||
class LookerSource(DashboardServiceSource):
|
||||
config: WorkflowSource
|
||||
metadata_config: OpenMetadataConnection
|
||||
|
||||
@ -44,7 +64,7 @@ class LookerSource(DashboardSourceService):
|
||||
metadata_config: OpenMetadataConnection,
|
||||
):
|
||||
super().__init__(config, metadata_config)
|
||||
self.charts = []
|
||||
self.today = datetime.now().strftime("%Y-%m-%d")
|
||||
|
||||
@classmethod
|
||||
def create(cls, config_dict: dict, metadata_config: OpenMetadataConnection):
|
||||
@ -56,19 +76,19 @@ class LookerSource(DashboardSourceService):
|
||||
)
|
||||
return cls(config, metadata_config)
|
||||
|
||||
def get_dashboards_list(self) -> Optional[List[Any]]:
|
||||
def get_dashboards_list(self) -> Optional[List[DashboardBase]]:
|
||||
"""
|
||||
Get List of all dashboards
|
||||
"""
|
||||
return self.client.all_dashboards(fields="id")
|
||||
|
||||
def get_dashboard_name(self, dashboard_details: object) -> str:
|
||||
def get_dashboard_name(self, dashboard_details: DashboardBase) -> str:
|
||||
"""
|
||||
Get Dashboard Name
|
||||
"""
|
||||
return dashboard_details.id
|
||||
|
||||
def get_dashboard_details(self, dashboard: object) -> dict:
|
||||
def get_dashboard_details(self, dashboard: DashboardBase) -> LookerDashboard:
|
||||
"""
|
||||
Get Dashboard Details
|
||||
"""
|
||||
@ -78,63 +98,267 @@ class LookerSource(DashboardSourceService):
|
||||
"dashboard_elements",
|
||||
"dashboard_filters",
|
||||
"view_count",
|
||||
"description",
|
||||
"folder",
|
||||
]
|
||||
return self.client.dashboard(dashboard_id=dashboard.id, fields=",".join(fields))
|
||||
|
||||
def get_dashboard_entity(self, dashboard_details: Any) -> CreateDashboardRequest:
|
||||
def yield_dashboard(
|
||||
self, dashboard_details: LookerDashboard
|
||||
) -> CreateDashboardRequest:
|
||||
"""
|
||||
Method to Get Dashboard Entity
|
||||
"""
|
||||
|
||||
yield CreateDashboardRequest(
|
||||
name=dashboard_details.id,
|
||||
displayName=dashboard_details.title,
|
||||
description=dashboard_details.description or "",
|
||||
charts=get_chart_entities_from_id(
|
||||
chart_ids=self.charts,
|
||||
metadata=self.metadata,
|
||||
service_name=self.config.serviceName,
|
||||
),
|
||||
charts=[
|
||||
EntityReference(id=chart.id.__root__, type="chart")
|
||||
for chart in self.context.charts
|
||||
],
|
||||
dashboardUrl=f"/dashboards/{dashboard_details.id}",
|
||||
service=EntityReference(id=self.service.id, type="dashboardService"),
|
||||
service=EntityReference(
|
||||
id=self.context.dashboard_service.id.__root__, type="dashboardService"
|
||||
),
|
||||
)
|
||||
|
||||
def get_lineage(self, dashboard_details) -> Optional[AddLineageRequest]:
|
||||
@staticmethod
|
||||
def _clean_table_name(table_name: str) -> str:
|
||||
"""
|
||||
Get lineage between dashboard and data sources
|
||||
sql_table_names might be renamed when defining
|
||||
an explore. E.g., customers as cust
|
||||
:param table_name: explore table name
|
||||
:return: clean table name
|
||||
"""
|
||||
logger.info("Lineage not implemented for Looker")
|
||||
return None
|
||||
|
||||
def fetch_dashboard_charts(
|
||||
self, dashboard_details
|
||||
return table_name.lower().split("as")[0].strip()
|
||||
|
||||
def _add_sql_table(self, query: Query, dashboard_sources: Set[str]):
|
||||
"""
|
||||
Add the SQL table information to the dashboard_sources.
|
||||
|
||||
Updates the seen dashboards.
|
||||
|
||||
:param query: Looker query, from a look or result_maker
|
||||
:param dashboard_sources: seen tables so far
|
||||
"""
|
||||
try:
|
||||
explore: LookmlModelExplore = self.client.lookml_model_explore(
|
||||
query.model, query.view
|
||||
)
|
||||
table_name = explore.sql_table_name
|
||||
|
||||
if table_name:
|
||||
dashboard_sources.add(self._clean_table_name(table_name))
|
||||
|
||||
except SDKError as err:
|
||||
logger.error(
|
||||
f"Cannot get explore from model={query.model}, view={query.view} - {err}"
|
||||
)
|
||||
|
||||
def get_dashboard_sources(self, dashboard_details: LookerDashboard) -> Set[str]:
|
||||
"""
|
||||
Set of source tables to build lineage for the processed dashboard
|
||||
"""
|
||||
dashboard_sources: Set[str] = set()
|
||||
|
||||
for chart in cast(
|
||||
Iterable[DashboardElement], dashboard_details.dashboard_elements
|
||||
):
|
||||
if chart.query and chart.query.view:
|
||||
self._add_sql_table(chart.query, dashboard_sources)
|
||||
if chart.look and chart.look.query and chart.look.query.view:
|
||||
self._add_sql_table(chart.look.query, dashboard_sources)
|
||||
if (
|
||||
chart.result_maker
|
||||
and chart.result_maker.query
|
||||
and chart.result_maker.query.view
|
||||
):
|
||||
self._add_sql_table(chart.result_maker.query, dashboard_sources)
|
||||
|
||||
return dashboard_sources
|
||||
|
||||
def yield_dashboard_lineage_details(
|
||||
self, dashboard_details: LookerDashboard
|
||||
) -> Optional[Iterable[AddLineageRequest]]:
|
||||
"""
|
||||
Get lineage between charts and data sources.
|
||||
|
||||
We look at:
|
||||
- chart.query
|
||||
- chart.look (chart.look.query)
|
||||
- chart.result_maker
|
||||
"""
|
||||
datasource_list = self.get_dashboard_sources(dashboard_details)
|
||||
|
||||
to_fqn = fqn.build(
|
||||
self.metadata,
|
||||
entity_type=LineageDashboard,
|
||||
service_name=self.config.serviceName,
|
||||
dashboard_name=dashboard_details.id,
|
||||
)
|
||||
to_entity = self.metadata.get_by_name(
|
||||
entity=LineageDashboard,
|
||||
fqn=to_fqn,
|
||||
)
|
||||
|
||||
for source in datasource_list:
|
||||
try:
|
||||
source_elements = fqn.split_table_name(table_name=source)
|
||||
|
||||
from_fqn = fqn.build(
|
||||
self.metadata,
|
||||
entity_type=Table,
|
||||
service_name=self.source_config.dbServiceName,
|
||||
database_name=source_elements["database"],
|
||||
schema_name=source_elements["database_schema"],
|
||||
table_name=source_elements["table"],
|
||||
)
|
||||
from_entity = self.metadata.get_by_name(
|
||||
entity=Table,
|
||||
fqn=from_fqn,
|
||||
)
|
||||
|
||||
if from_entity and to_entity:
|
||||
lineage = AddLineageRequest(
|
||||
edge=EntitiesEdge(
|
||||
fromEntity=EntityReference(
|
||||
id=from_entity.id.__root__, type="table"
|
||||
),
|
||||
toEntity=EntityReference(
|
||||
id=to_entity.id.__root__, type="dashboard"
|
||||
),
|
||||
)
|
||||
)
|
||||
yield lineage
|
||||
|
||||
except (Exception, IndexError) as err:
|
||||
logger.debug(traceback.format_exc())
|
||||
logger.error(f"Error building lineage - {err}")
|
||||
|
||||
def yield_dashboard_chart(
|
||||
self, dashboard_details: LookerDashboard
|
||||
) -> Optional[Iterable[CreateChartRequest]]:
|
||||
"""
|
||||
Metod to fetch charts linked to dashboard
|
||||
Method to fetch charts linked to dashboard
|
||||
"""
|
||||
self.charts = []
|
||||
for dashboard_elements in dashboard_details.dashboard_elements:
|
||||
for chart in cast(
|
||||
Iterable[DashboardElement], dashboard_details.dashboard_elements
|
||||
):
|
||||
try:
|
||||
if filter_by_chart(
|
||||
chart_filter_pattern=self.source_config.chartFilterPattern,
|
||||
chart_name=dashboard_elements.id,
|
||||
chart_name=chart.id,
|
||||
):
|
||||
self.status.filter(dashboard_elements.id, "Chart filtered out")
|
||||
self.status.filter(chart.id, "Chart filtered out")
|
||||
continue
|
||||
om_dashboard_elements = CreateChartRequest(
|
||||
name=dashboard_elements.id,
|
||||
displayName=dashboard_elements.title or dashboard_elements.id,
|
||||
|
||||
if not chart.id:
|
||||
logger.debug(f"Found chart {chart} without id. Skipping.")
|
||||
continue
|
||||
|
||||
yield CreateChartRequest(
|
||||
name=chart.id,
|
||||
displayName=chart.title or chart.id,
|
||||
description="",
|
||||
chartType=get_standard_chart_type(dashboard_elements.type).value,
|
||||
chartUrl=f"/dashboard_elements/{dashboard_elements.id}",
|
||||
chartType=get_standard_chart_type(chart.type).value,
|
||||
chartUrl=f"/dashboard_elements/{chart.id}",
|
||||
service=EntityReference(
|
||||
id=self.service.id, type="dashboardService"
|
||||
id=self.context.dashboard_service.id.__root__,
|
||||
type="dashboardService",
|
||||
),
|
||||
)
|
||||
if not dashboard_elements.id:
|
||||
raise ValueError("Chart(Dashboard Element) without ID")
|
||||
self.status.scanned(dashboard_elements.id)
|
||||
yield om_dashboard_elements
|
||||
self.charts.append(dashboard_elements.id)
|
||||
self.status.scanned(chart.id)
|
||||
|
||||
except Exception as err:
|
||||
logger.debug(traceback.format_exc())
|
||||
logger.error(err)
|
||||
|
||||
def yield_dashboard_usage(
|
||||
self, dashboard_details: LookerDashboard
|
||||
) -> Optional[DashboardUsage]:
|
||||
"""
|
||||
The dashboard.view_count gives us the total number of views. However, we need to
|
||||
pass the views for each day (execution).
|
||||
|
||||
In this function we will first validate if the usageSummary
|
||||
returns us some usage for today's date. If so, we will stop the
|
||||
execution.
|
||||
|
||||
Otherwise, we will add the difference between the usage from the last time
|
||||
the usage was reported and today's view_count from the dashboard.
|
||||
|
||||
Example usage summary from OM API:
|
||||
"usageSummary": {
|
||||
"dailyStats": {
|
||||
"count": 51,
|
||||
"percentileRank": 0.0
|
||||
},
|
||||
"date": "2022-06-23",
|
||||
"monthlyStats": {
|
||||
"count": 105,
|
||||
"percentileRank": 0.0
|
||||
},
|
||||
"weeklyStats": {
|
||||
"count": 105,
|
||||
"percentileRank": 0.0
|
||||
}
|
||||
},
|
||||
:param dashboard_details: Looker Dashboard
|
||||
:return: UsageRequest, if not computed
|
||||
"""
|
||||
|
||||
dashboard: Dashboard = self.context.dashboard
|
||||
|
||||
try:
|
||||
current_views = dashboard_details.view_count
|
||||
|
||||
if not current_views:
|
||||
logger.debug(f"No usage to report for {dashboard_details.title}")
|
||||
|
||||
if not dashboard.usageSummary:
|
||||
logger.info(
|
||||
f"Yielding fresh usage for {dashboard.fullyQualifiedName.__root__}"
|
||||
)
|
||||
yield DashboardUsage(
|
||||
dashboard=dashboard,
|
||||
usage=UsageRequest(date=self.today, count=current_views),
|
||||
)
|
||||
|
||||
elif (
|
||||
str(dashboard.usageSummary.date.__root__) != self.today
|
||||
or not dashboard.usageSummary.dailyStats.count
|
||||
):
|
||||
|
||||
latest_usage = dashboard.usageSummary.dailyStats.count
|
||||
|
||||
new_usage = current_views - latest_usage
|
||||
if new_usage < 0:
|
||||
raise ValueError(
|
||||
f"Wrong computation of usage difference. Got new_usage={new_usage}."
|
||||
)
|
||||
|
||||
logger.info(
|
||||
f"Yielding new usage for {dashboard.fullyQualifiedName.__root__}"
|
||||
)
|
||||
yield DashboardUsage(
|
||||
dashboard=dashboard,
|
||||
usage=UsageRequest(
|
||||
date=self.today, count=current_views - latest_usage
|
||||
),
|
||||
)
|
||||
|
||||
else:
|
||||
logger.debug(
|
||||
f"Latest usage {dashboard.usageSummary} vs. today {self.today}. Nothing to compute."
|
||||
)
|
||||
logger.info(
|
||||
f"Usage already informed for {dashboard.fullyQualifiedName.__root__}"
|
||||
)
|
||||
|
||||
except Exception as err:
|
||||
logger.error(
|
||||
f"Exception computing dashboard usage for {dashboard.fullyQualifiedName.__root__} - {err}"
|
||||
)
|
||||
|
@ -193,7 +193,7 @@ class MetabaseSource(DashboardServiceSource):
|
||||
logger.debug(traceback.format_exc())
|
||||
continue
|
||||
|
||||
def yield_dashboard_lineage(
|
||||
def yield_dashboard_lineage_details(
|
||||
self, dashboard_details: dict
|
||||
) -> Optional[Iterable[AddLineageRequest]]:
|
||||
"""Get lineage method
|
||||
|
@ -124,7 +124,7 @@ class ModeSource(DashboardServiceSource):
|
||||
),
|
||||
)
|
||||
|
||||
def yield_dashboard_lineage(
|
||||
def yield_dashboard_lineage_details(
|
||||
self, dashboard_details: dict
|
||||
) -> Optional[Iterable[AddLineageRequest]]:
|
||||
"""Get lineage method
|
||||
|
@ -114,14 +114,12 @@ class PowerbiSource(DashboardServiceSource):
|
||||
),
|
||||
)
|
||||
|
||||
def yield_dashboard_lineage(
|
||||
def yield_dashboard_lineage_details(
|
||||
self, dashboard_details: dict
|
||||
) -> Optional[Iterable[AddLineageRequest]]:
|
||||
"""
|
||||
Get lineage between dashboard and data sources
|
||||
"""
|
||||
if not self.source_config.dbServiceName:
|
||||
return
|
||||
try:
|
||||
charts = self.client.fetch_charts(dashboard_id=dashboard_details["id"]).get(
|
||||
"value"
|
||||
|
@ -110,7 +110,7 @@ class RedashSource(DashboardServiceSource):
|
||||
)
|
||||
self.status.scanned(dashboard_details["name"])
|
||||
|
||||
def yield_dashboard_lineage(
|
||||
def yield_dashboard_lineage_details(
|
||||
self, dashboard_details: dict
|
||||
) -> Optional[Iterable[AddLineageRequest]]:
|
||||
"""
|
||||
@ -118,8 +118,6 @@ class RedashSource(DashboardServiceSource):
|
||||
In redash we do not get table, database_schema or database name but we do get query
|
||||
the lineage is being generated based on the query
|
||||
"""
|
||||
if not self.source_config.dbServiceName:
|
||||
return
|
||||
for widgets in dashboard_details.get("widgets", []):
|
||||
visualization = widgets.get("visualization")
|
||||
if not visualization.get("query"):
|
||||
|
@ -202,14 +202,12 @@ class SupersetSource(DashboardServiceSource):
|
||||
]
|
||||
return []
|
||||
|
||||
def yield_dashboard_lineage(
|
||||
def yield_dashboard_lineage_details(
|
||||
self, dashboard_details: dict
|
||||
) -> Optional[Iterable[AddLineageRequest]]:
|
||||
"""
|
||||
Get lineage between dashboard and data sources
|
||||
"""
|
||||
if not self.source_config.dbServiceName:
|
||||
return
|
||||
for chart_id in self._get_charts_of_dashboard(dashboard_details):
|
||||
chart_json = self.all_charts.get(chart_id)
|
||||
datasource_fqn = (
|
||||
|
@ -150,11 +150,13 @@ class TableauSource(DashboardServiceSource):
|
||||
"""
|
||||
return dashboard
|
||||
|
||||
def yield_owner(self, dashboard_details: dict) -> Optional[EntityReference]:
|
||||
def yield_owner(
|
||||
self, dashboard_details: dict
|
||||
) -> Optional[Iterable[CreateUserRequest]]:
|
||||
"""Get dashboard owner
|
||||
|
||||
Args:
|
||||
owner:
|
||||
dashboard_details:
|
||||
Returns:
|
||||
Optional[EntityReference]
|
||||
"""
|
||||
@ -222,20 +224,30 @@ class TableauSource(DashboardServiceSource):
|
||||
),
|
||||
)
|
||||
|
||||
def yield_dashboard_lineage(
|
||||
def yield_dashboard_lineage_details(
|
||||
self, dashboard_details: dict
|
||||
) -> Optional[Iterable[AddLineageRequest]]:
|
||||
"""
|
||||
Get lineage between dashboard and data sources
|
||||
"""
|
||||
if not self.source_config.dbServiceName:
|
||||
return
|
||||
datasource_list = (
|
||||
get_workbook_connections_dataframe(self.client, dashboard_details.get("id"))
|
||||
.get("datasource_name")
|
||||
.tolist()
|
||||
)
|
||||
dashboard_name = dashboard_details.get("name")
|
||||
|
||||
to_fqn = fqn.build(
|
||||
self.metadata,
|
||||
entity_type=LineageDashboard,
|
||||
service_name=self.config.serviceName,
|
||||
dashboard_name=dashboard_name,
|
||||
)
|
||||
to_entity = self.metadata.get_by_name(
|
||||
entity=LineageDashboard,
|
||||
fqn=to_fqn,
|
||||
)
|
||||
|
||||
for datasource in datasource_list:
|
||||
try:
|
||||
schema_and_table_name = (
|
||||
@ -255,16 +267,6 @@ class TableauSource(DashboardServiceSource):
|
||||
entity=Table,
|
||||
fqn=from_fqn,
|
||||
)
|
||||
to_fqn = fqn.build(
|
||||
self.metadata,
|
||||
entity_type=LineageDashboard,
|
||||
service_name=self.config.serviceName,
|
||||
dashboard_name=dashboard_name,
|
||||
)
|
||||
to_entity = self.metadata.get_by_name(
|
||||
entity=LineageDashboard,
|
||||
fqn=to_fqn,
|
||||
)
|
||||
if from_entity and to_entity:
|
||||
lineage = AddLineageRequest(
|
||||
edge=EntitiesEdge(
|
||||
|
@ -11,7 +11,6 @@
|
||||
"""
|
||||
Snowflake usage module
|
||||
"""
|
||||
|
||||
from typing import Iterable, Iterator, Union
|
||||
|
||||
from metadata.generated.schema.entity.services.connections.database.snowflakeConnection import (
|
||||
|
@ -608,7 +608,7 @@ def _(connection: LookerConnection, verbose: bool = False):
|
||||
os.environ["LOOKERSDK_CLIENT_SECRET"] = connection.password.get_secret_value()
|
||||
if not os.environ.get("LOOKERSDK_BASE_URL"):
|
||||
os.environ["LOOKERSDK_BASE_URL"] = connection.hostPort
|
||||
client = looker_sdk.init31()
|
||||
client = looker_sdk.init40()
|
||||
return LookerClient(client=client)
|
||||
|
||||
|
||||
|
@ -14,7 +14,7 @@ Filter information has been taken from the
|
||||
ES indexes definitions
|
||||
"""
|
||||
import re
|
||||
from typing import List, Optional, Type, TypeVar, Union
|
||||
from typing import Dict, List, Optional, Type, TypeVar, Union
|
||||
|
||||
from antlr4.CommonTokenStream import CommonTokenStream
|
||||
from antlr4.error.ErrorStrategy import BailErrorStrategy
|
||||
@ -270,3 +270,19 @@ def _(
|
||||
column_name: str,
|
||||
) -> str:
|
||||
return _build(service_name, database_name, schema_name, table_name, column_name)
|
||||
|
||||
|
||||
def split_table_name(table_name: str) -> Dict[str, Optional[str]]:
|
||||
"""
|
||||
Given a table name, try to extract database, schema and
|
||||
table info
|
||||
:param table_name: raw table name
|
||||
:return: dict with data
|
||||
"""
|
||||
|
||||
details: List[str] = split(table_name)
|
||||
# Pad None to the left until size of list is 3
|
||||
full_details: List[Optional[str]] = ([None] * (3 - len(details))) + details
|
||||
|
||||
database, database_schema, table = full_details
|
||||
return {"database": database, "database_schema": database_schema, "table": table}
|
||||
|
@ -15,7 +15,7 @@ Validate filter patterns
|
||||
from unittest import TestCase
|
||||
|
||||
from metadata.generated.schema.type.filterPattern import FilterPattern
|
||||
from metadata.utils.filters import filter_by_fqn
|
||||
from metadata.utils.filters import filter_by_dashboard, filter_by_fqn
|
||||
|
||||
|
||||
class FilterPatternTests(TestCase):
|
||||
@ -23,7 +23,8 @@ class FilterPatternTests(TestCase):
|
||||
Validate filter patterns
|
||||
"""
|
||||
|
||||
def test_filter_by_fqn(self):
|
||||
@staticmethod
|
||||
def test_filter_by_fqn():
|
||||
"""
|
||||
Check FQN filters
|
||||
"""
|
||||
@ -36,3 +37,17 @@ class FilterPatternTests(TestCase):
|
||||
|
||||
assert not filter_by_fqn(fqn_filter_schema, "service.my_db.my_schema.table")
|
||||
assert filter_by_fqn(fqn_filter_schema, "service.another_db.my_schema.table")
|
||||
|
||||
@staticmethod
|
||||
def test_filter_numbers():
|
||||
"""
|
||||
Check numeric filtering
|
||||
"""
|
||||
|
||||
num_filter = FilterPattern(includes=["^[4]"])
|
||||
|
||||
assert not filter_by_dashboard(num_filter, "40")
|
||||
assert not filter_by_dashboard(num_filter, "41")
|
||||
|
||||
assert filter_by_dashboard(num_filter, "50")
|
||||
assert filter_by_dashboard(num_filter, "54")
|
||||
|
Loading…
x
Reference in New Issue
Block a user