Added Looker refactoring changes (#3963)

This commit is contained in:
Ayush Shah 2022-04-08 12:08:23 -07:00 committed by GitHub
parent 22bbc407ad
commit 21d8ba72a8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 44 additions and 58 deletions

View File

@ -42,6 +42,10 @@
"description": "Looker actor.", "description": "Looker actor.",
"type": "string" "type": "string"
}, },
"env": {
"description": "Looker Environment",
"type": "string"
},
"supportedPipelineTypes": { "supportedPipelineTypes": {
"description": "Supported Metadata Extraction Pipelines.", "description": "Supported Metadata Extraction Pipelines.",
"type": "string", "type": "string",

View File

@ -12,81 +12,52 @@
import logging import logging
import os import os
import traceback import traceback
from dataclasses import dataclass, field from typing import Iterable
from typing import Iterable, List
import looker_sdk import looker_sdk
from pydantic import SecretStr
from metadata.generated.schema.entity.services.connections.dashboard import (
lookerConnection,
)
from metadata.generated.schema.entity.services.dashboardService import ( from metadata.generated.schema.entity.services.dashboardService import (
DashboardServiceType, DashboardServiceType,
) )
from metadata.generated.schema.metadataIngestion.workflow import ( from metadata.generated.schema.metadataIngestion.workflow import (
OpenMetadataServerConfig, OpenMetadataServerConfig,
) )
from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.generated.schema.type.entityReference import EntityReference from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.common import ConfigModel, Entity, IncludeFilterPattern from metadata.ingestion.api.common import Entity
from metadata.ingestion.api.source import Source, SourceStatus from metadata.ingestion.api.source import InvalidSourceException, Source, SourceStatus
from metadata.ingestion.models.table_metadata import Chart, Dashboard from metadata.ingestion.models.table_metadata import Chart, Dashboard
from metadata.utils.filters import filter_by_chart, filter_by_dashboard
from metadata.utils.helpers import get_dashboard_service_or_create from metadata.utils.helpers import get_dashboard_service_or_create
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class LookerSourceConfig(ConfigModel):
username: str
password: SecretStr
url: str
platform_name: str = "looker"
actor: str = ""
dashboard_pattern: IncludeFilterPattern = IncludeFilterPattern.allow_all()
chart_pattern: IncludeFilterPattern = IncludeFilterPattern.allow_all()
env: str = "DEV"
service_name: str
service_type: str
@dataclass
class LookerDashboardSourceStatus(SourceStatus):
dashboards_scanned: List[str] = field(default_factory=list)
charts_scanned: List[str] = field(default_factory=list)
filtered_dashboards: List[str] = field(default_factory=list)
filtered_charts: List[str] = field(default_factory=list)
def dashboards_scanned_status(self, id) -> None:
self.dashboards_scanned.append(id)
def charts_scanned_status(self, id) -> None:
self.charts_scanned.append(id)
def dashboards_dropped_status(self, model: str) -> None:
self.filtered_dashboards.append(model)
def charts_dropped_status(self, view: str) -> None:
self.filtered_charts.append(view)
class LookerSource(Source[Entity]): class LookerSource(Source[Entity]):
config: LookerSourceConfig config: WorkflowSource
metadata_config: OpenMetadataServerConfig metadata_config: OpenMetadataServerConfig
status: LookerDashboardSourceStatus
def __init__( def __init__(
self, self,
config: LookerSourceConfig, config: WorkflowSource,
metadata_config: OpenMetadataServerConfig, metadata_config: OpenMetadataServerConfig,
): ):
super().__init__() super().__init__()
self.config = config self.config = config
self.source_config = config.sourceConfig.config
self.service_connection = config.serviceConnection.__root__.config
self.metadata_config = metadata_config self.metadata_config = metadata_config
self.client = self.looker_client() self.client = self.looker_client()
self.status = LookerDashboardSourceStatus() self.status = SourceStatus()
self.service = get_dashboard_service_or_create( self.service = get_dashboard_service_or_create(
service_name=config.service_name, service_name=config.serviceName,
dashboard_service_type=DashboardServiceType.Looker.name, dashboard_service_type=DashboardServiceType.Looker.name,
username=config.username, config=self.service_connection.dict(),
password=config.password.get_secret_value(),
dashboard_url=config.url,
metadata_config=metadata_config, metadata_config=metadata_config,
) )
@ -98,13 +69,13 @@ class LookerSource(Source[Entity]):
def looker_client(self): def looker_client(self):
try: try:
if not self.check_env("LOOKERSDK_CLIENT_ID"): if not self.check_env("LOOKERSDK_CLIENT_ID"):
os.environ["LOOKERSDK_CLIENT_ID"] = self.config.username os.environ["LOOKERSDK_CLIENT_ID"] = self.service_connection.username
if not self.check_env("LOOKERSDK_CLIENT_SECRET"): if not self.check_env("LOOKERSDK_CLIENT_SECRET"):
os.environ[ os.environ[
"LOOKERSDK_CLIENT_SECRET" "LOOKERSDK_CLIENT_SECRET"
] = self.config.password.get_secret_value() ] = self.service_connection.password.get_secret_value()
if not self.check_env("LOOKERSDK_BASE_URL"): if not self.check_env("LOOKERSDK_BASE_URL"):
os.environ["LOOKERSDK_BASE_URL"] = self.config.url os.environ["LOOKERSDK_BASE_URL"] = self.service_connection.url
client = looker_sdk.init31() client = looker_sdk.init31()
client.me() client.me()
return client return client
@ -113,7 +84,12 @@ class LookerSource(Source[Entity]):
@classmethod @classmethod
def create(cls, config_dict: dict, metadata_config: OpenMetadataServerConfig): def create(cls, config_dict: dict, metadata_config: OpenMetadataServerConfig):
config = LookerSourceConfig.parse_obj(config_dict) config = WorkflowSource.parse_obj(config_dict)
connection: lookerConnection = config.serviceConnection.__root__.config
if not isinstance(connection, lookerConnection):
raise InvalidSourceException(
f"Expected LookerConnection, but got {connection}"
)
return cls(config, metadata_config) return cls(config, metadata_config)
def prepare(self): def prepare(self):
@ -123,20 +99,23 @@ class LookerSource(Source[Entity]):
yield from self._get_looker_dashboards() yield from self._get_looker_dashboards()
def _get_dashboard_elements(self, dashboard_elements): def _get_dashboard_elements(self, dashboard_elements):
if not self.config.chart_pattern.included(dashboard_elements.id): if not filter_by_chart(
self.status.charts_dropped_status(dashboard_elements.id) chart_filter_pattern=self.source_config.chartFilterPattern,
chart_name=dashboard_elements.id,
):
self.status.failures(dashboard_elements.id)
return None return None
om_dashboard_elements = Chart( om_dashboard_elements = Chart(
name=dashboard_elements.id, name=dashboard_elements.id,
displayName=dashboard_elements.title or "", displayName=dashboard_elements.title or "",
description="", description="",
chart_type=dashboard_elements.type, chart_type=dashboard_elements.type,
url=f"{self.config.url}/dashboard_elements/{dashboard_elements.id}", url=f"{self.service_connection.url}/dashboard_elements/{dashboard_elements.id}",
service=EntityReference(id=self.service.id, type="dashboardService"), service=EntityReference(id=self.service.id, type="dashboardService"),
) )
if not dashboard_elements.id: if not dashboard_elements.id:
raise ValueError("Chart(Dashboard Element) without ID") raise ValueError("Chart(Dashboard Element) without ID")
self.status.charts_scanned_status(dashboard_elements.id) self.status.scanned(dashboard_elements.id)
yield om_dashboard_elements yield om_dashboard_elements
self.charts.append(om_dashboard_elements) self.charts.append(om_dashboard_elements)
self.chart_names.append(dashboard_elements.id) self.chart_names.append(dashboard_elements.id)
@ -145,8 +124,11 @@ class LookerSource(Source[Entity]):
all_dashboards = self.client.all_dashboards(fields="id") all_dashboards = self.client.all_dashboards(fields="id")
for child_dashboard in all_dashboards: for child_dashboard in all_dashboards:
try: try:
if not self.config.dashboard_pattern.included(child_dashboard.id): if not filter_by_dashboard(
self.status.dashboards_dropped_status(child_dashboard.id) dashboard_filter_pattern=self.source_config.dashboardFilterPattern,
dashboard_name=child_dashboard.id,
):
self.status.failures(child_dashboard.id)
continue continue
fields = [ fields = [
"id", "id",
@ -171,12 +153,12 @@ class LookerSource(Source[Entity]):
displayName=dashboard.title, displayName=dashboard.title,
description=dashboard.description or "", description=dashboard.description or "",
charts=self.chart_names, charts=self.chart_names,
url=f"{self.config.url}/dashboards/{dashboard.id}", url=f"{self.service_connection.url}/dashboards/{dashboard.id}",
service=EntityReference( service=EntityReference(
id=self.service.id, type="dashboardService" id=self.service.id, type="dashboardService"
), ),
) )
self.status.dashboards_scanned_status(child_dashboard.id) self.status.failures(child_dashboard.id)
except Exception as err: except Exception as err:
logger.error(repr(err)) logger.error(repr(err))