Migrate the Looker dashboard source to the generated WorkflowSource config.

* lookerConnection.json: add an "env" string property to the connection schema.
* looker.py: remove LookerSourceConfig and LookerDashboardSourceStatus; read the
  credentials and base URL from the service connection, apply the chart and
  dashboard filter patterns via filter_by_chart / filter_by_dashboard, and
  report progress through the shared SourceStatus.
* create() validates the connection against the LookerConnection class, and
  successfully emitted dashboards are counted with SourceStatus.scanned().

diff --git a/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/dashboard/lookerConnection.json b/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/dashboard/lookerConnection.json
index e6d0d9596df..6f85b2b447a 100644
--- a/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/dashboard/lookerConnection.json
+++ b/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/dashboard/lookerConnection.json
@@ -42,6 +42,10 @@
       "description": "Looker actor.",
       "type": "string"
     },
+    "env": {
+      "description": "Looker Environment",
+      "type": "string"
+    },
     "supportedPipelineTypes": {
       "description": "Supported Metadata Extraction Pipelines.",
       "type": "string",
diff --git a/ingestion/src/metadata/ingestion/source/looker.py b/ingestion/src/metadata/ingestion/source/looker.py
index da0c098be02..facb7c317bf 100644
--- a/ingestion/src/metadata/ingestion/source/looker.py
+++ b/ingestion/src/metadata/ingestion/source/looker.py
@@ -12,81 +12,52 @@
 import logging
 import os
 import traceback
-from dataclasses import dataclass, field
-from typing import Iterable, List
+from typing import Iterable
 
 import looker_sdk
-from pydantic import SecretStr
 
+from metadata.generated.schema.entity.services.connections.dashboard.lookerConnection import (
+    LookerConnection,
+)
 from metadata.generated.schema.entity.services.dashboardService import (
     DashboardServiceType,
 )
 from metadata.generated.schema.metadataIngestion.workflow import (
     OpenMetadataServerConfig,
 )
+from metadata.generated.schema.metadataIngestion.workflow import (
+    Source as WorkflowSource,
+)
 from metadata.generated.schema.type.entityReference import EntityReference
-from metadata.ingestion.api.common import ConfigModel, Entity, IncludeFilterPattern
-from metadata.ingestion.api.source import Source, SourceStatus
+from metadata.ingestion.api.common import Entity
+from metadata.ingestion.api.source import InvalidSourceException, Source, SourceStatus
 from metadata.ingestion.models.table_metadata import Chart, Dashboard
+from metadata.utils.filters import filter_by_chart, filter_by_dashboard
 from metadata.utils.helpers import get_dashboard_service_or_create
 
 logger = logging.getLogger(__name__)
 
 
-class LookerSourceConfig(ConfigModel):
-    username: str
-    password: SecretStr
-    url: str
-    platform_name: str = "looker"
-    actor: str = ""
-    dashboard_pattern: IncludeFilterPattern = IncludeFilterPattern.allow_all()
-    chart_pattern: IncludeFilterPattern = IncludeFilterPattern.allow_all()
-    env: str = "DEV"
-    service_name: str
-    service_type: str
-
-
-@dataclass
-class LookerDashboardSourceStatus(SourceStatus):
-    dashboards_scanned: List[str] = field(default_factory=list)
-    charts_scanned: List[str] = field(default_factory=list)
-    filtered_dashboards: List[str] = field(default_factory=list)
-    filtered_charts: List[str] = field(default_factory=list)
-
-    def dashboards_scanned_status(self, id) -> None:
-        self.dashboards_scanned.append(id)
-
-    def charts_scanned_status(self, id) -> None:
-        self.charts_scanned.append(id)
-
-    def dashboards_dropped_status(self, model: str) -> None:
-        self.filtered_dashboards.append(model)
-
-    def charts_dropped_status(self, view: str) -> None:
-        self.filtered_charts.append(view)
-
-
 class LookerSource(Source[Entity]):
-    config: LookerSourceConfig
+    config: WorkflowSource
     metadata_config: OpenMetadataServerConfig
-    status: LookerDashboardSourceStatus
 
     def __init__(
         self,
-        config: LookerSourceConfig,
+        config: WorkflowSource,
         metadata_config: OpenMetadataServerConfig,
     ):
         super().__init__()
         self.config = config
+        self.source_config = config.sourceConfig.config
+        self.service_connection = config.serviceConnection.__root__.config
         self.metadata_config = metadata_config
         self.client = self.looker_client()
-        self.status = LookerDashboardSourceStatus()
+        self.status = SourceStatus()
         self.service = get_dashboard_service_or_create(
-            service_name=config.service_name,
+            service_name=config.serviceName,
             dashboard_service_type=DashboardServiceType.Looker.name,
-            username=config.username,
-            password=config.password.get_secret_value(),
-            dashboard_url=config.url,
+            config=self.service_connection.dict(),
             metadata_config=metadata_config,
         )
 
@@ -98,13 +69,13 @@ class LookerSource(Source[Entity]):
     def looker_client(self):
         try:
             if not self.check_env("LOOKERSDK_CLIENT_ID"):
-                os.environ["LOOKERSDK_CLIENT_ID"] = self.config.username
+                os.environ["LOOKERSDK_CLIENT_ID"] = self.service_connection.username
             if not self.check_env("LOOKERSDK_CLIENT_SECRET"):
                 os.environ[
                     "LOOKERSDK_CLIENT_SECRET"
-                ] = self.config.password.get_secret_value()
+                ] = self.service_connection.password.get_secret_value()
             if not self.check_env("LOOKERSDK_BASE_URL"):
-                os.environ["LOOKERSDK_BASE_URL"] = self.config.url
+                os.environ["LOOKERSDK_BASE_URL"] = self.service_connection.url
             client = looker_sdk.init31()
             client.me()
             return client
@@ -113,7 +84,12 @@ class LookerSource(Source[Entity]):
 
     @classmethod
     def create(cls, config_dict: dict, metadata_config: OpenMetadataServerConfig):
-        config = LookerSourceConfig.parse_obj(config_dict)
+        config = WorkflowSource.parse_obj(config_dict)
+        connection: LookerConnection = config.serviceConnection.__root__.config
+        if not isinstance(connection, LookerConnection):
+            raise InvalidSourceException(
+                f"Expected LookerConnection, but got {connection}"
+            )
         return cls(config, metadata_config)
 
     def prepare(self):
@@ -123,20 +99,23 @@ class LookerSource(Source[Entity]):
         yield from self._get_looker_dashboards()
 
     def _get_dashboard_elements(self, dashboard_elements):
-        if not self.config.chart_pattern.included(dashboard_elements.id):
-            self.status.charts_dropped_status(dashboard_elements.id)
+        if not filter_by_chart(
+            chart_filter_pattern=self.source_config.chartFilterPattern,
+            chart_name=dashboard_elements.id,
+        ):
+            self.status.failure(dashboard_elements.id, "Chart filtered out")
             return None
         om_dashboard_elements = Chart(
             name=dashboard_elements.id,
             displayName=dashboard_elements.title or "",
             description="",
             chart_type=dashboard_elements.type,
-            url=f"{self.config.url}/dashboard_elements/{dashboard_elements.id}",
+            url=f"{self.service_connection.url}/dashboard_elements/{dashboard_elements.id}",
             service=EntityReference(id=self.service.id, type="dashboardService"),
         )
         if not dashboard_elements.id:
             raise ValueError("Chart(Dashboard Element) without ID")
-        self.status.charts_scanned_status(dashboard_elements.id)
+        self.status.scanned(dashboard_elements.id)
         yield om_dashboard_elements
         self.charts.append(om_dashboard_elements)
         self.chart_names.append(dashboard_elements.id)
@@ -145,8 +124,11 @@ class LookerSource(Source[Entity]):
         all_dashboards = self.client.all_dashboards(fields="id")
         for child_dashboard in all_dashboards:
             try:
-                if not self.config.dashboard_pattern.included(child_dashboard.id):
-                    self.status.dashboards_dropped_status(child_dashboard.id)
+                if not filter_by_dashboard(
+                    dashboard_filter_pattern=self.source_config.dashboardFilterPattern,
+                    dashboard_name=child_dashboard.id,
+                ):
+                    self.status.failure(child_dashboard.id, "Dashboard filtered out")
                     continue
                 fields = [
                     "id",
@@ -171,12 +153,12 @@ class LookerSource(Source[Entity]):
                     displayName=dashboard.title,
                     description=dashboard.description or "",
                     charts=self.chart_names,
-                    url=f"{self.config.url}/dashboards/{dashboard.id}",
+                    url=f"{self.service_connection.url}/dashboards/{dashboard.id}",
                     service=EntityReference(
                         id=self.service.id, type="dashboardService"
                     ),
                 )
-                self.status.dashboards_scanned_status(child_dashboard.id)
+                self.status.scanned(child_dashboard.id)
             except Exception as err:
                 logger.error(repr(err))
 