Multiple fixes - Looker, Ingestion fix, pyarrow dependency (#2385)

* Added pyarrow dependency

* Fix #2378

* Fix Looker

* Fix Looker, secretstr to str
This commit is contained in:
Ayush Shah 2022-01-25 01:59:54 +05:30 committed by GitHub
parent 05c8d1ebb6
commit b689c2d632
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 65 additions and 78 deletions

View File

@ -62,7 +62,7 @@ base_plugins = {
plugins: Dict[str, Set[str]] = {
"amundsen": {"neo4j~=4.4.0"},
"athena": {"PyAthena[SQLAlchemy]"},
"bigquery": {"sqlalchemy-bigquery==1.2.2"},
"bigquery": {"sqlalchemy-bigquery==1.2.2", "pyarrow-6.0.1"},
"bigquery-usage": {"google-cloud-logging", "cachetools"},
# "docker": {"docker==5.0.3"},
"docker": {"python_on_whales==0.34.0"},

View File

@ -13,16 +13,11 @@ import logging
import os
import uuid
from dataclasses import dataclass, field
from typing import Iterable, List, Optional
from typing import Iterable, List
import looker_sdk
from looker_sdk.error import SDKError
from looker_sdk.sdk.api31.models import Dashboard as LookerDashboard
from looker_sdk.sdk.api31.models import DashboardElement
from pydantic import SecretStr
from metadata.generated.schema.entity.data.chart import Chart
from metadata.generated.schema.entity.data.dashboard import Dashboard
from metadata.generated.schema.entity.services.dashboardService import (
DashboardServiceType,
)
@ -34,6 +29,7 @@ from metadata.ingestion.api.common import (
WorkflowContext,
)
from metadata.ingestion.api.source import Source, SourceStatus
from metadata.ingestion.models.table_metadata import Chart, Dashboard
from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig
from metadata.utils.helpers import get_dashboard_service_or_create
@ -97,13 +93,26 @@ class LookerSource(Source[Entity]):
metadata_config,
)
def check_env(self, env_key):
    """Report whether the environment variable ``env_key`` has a non-empty value.

    Args:
        env_key: Name of the environment variable to look up in ``os.environ``.

    Returns:
        ``True`` when the variable is set to a non-empty string, ``False``
        when it is unset or empty.
    """
    # bool() yields a real boolean instead of the earlier True/None pair; both
    # None and False are falsy, so ``if not self.check_env(...)`` is unaffected.
    return bool(os.environ.get(env_key))
def looker_client(self):
os.environ["LOOKERSDK_CLIENT_ID"] = self.config.username
os.environ["LOOKERSDK_CLIENT_SECRET"] = self.config.password
os.environ["LOOKERSDK_BASE_URL"] = self.config.url
client = looker_sdk.init31()
client.me()
return client
try:
if not self.check_env("LOOKERSDK_CLIENT_ID"):
os.environ["LOOKERSDK_CLIENT_ID"] = self.config.username
if not self.check_env("LOOKERSDK_CLIENT_SECRET"):
os.environ[
"LOOKERSDK_CLIENT_SECRET"
] = self.config.password.get_secret_value()
if not self.check_env("LOOKERSDK_BASE_URL"):
os.environ["LOOKERSDK_BASE_URL"] = self.config.url
client = looker_sdk.init31()
client.me()
return client
except Exception as err:
logger.error(f"ERROR: {repr(err)}")
@classmethod
def create(
@ -117,69 +126,30 @@ class LookerSource(Source[Entity]):
pass
def next_record(self) -> Iterable[Entity]:
yield from self._get_looker_charts()
yield from self._get_looker_dashboards()
def _yield_charts(self, chart: DashboardElement):
yield Chart(
def _get_dashboard_elements(self, dashboard_elements):
if not self.config.dashboard_pattern.included(dashboard_elements.id):
self.status.charts_dropped_status(dashboard_elements.id)
return None
self.status.charts_scanned_status(dashboard_elements.id)
return Chart(
id=uuid.uuid4(),
name=chart.id,
displayName=chart.title,
chartType=chart.type,
name=dashboard_elements.id,
displayName=dashboard_elements.id,
description=dashboard_elements.title,
chart_type=dashboard_elements.type,
url=self.config.url,
service=EntityReference(id=self.service.id, type="dashboardService"),
description=chart.subtitle_text,
)
def _get_looker_charts(self) -> Optional[Chart]:
for child_dashboard in self.client.all_dashboards(fields="id"):
fields = [
"id",
"title",
"dashboard_elements",
"dashboard_filters",
"view_count",
]
charts = self.client.dashboard_dashboard_elements(
dashboard_id=child_dashboard.id, fields=",".join(fields)
)
for chart in charts:
self._yield_charts(chart)
def looker_dashboard(self, dashboard: LookerDashboard) -> Dashboard:
charts: List[Chart] = []
if dashboard.dashboard_elements is not None:
for chart in dashboard.dashboard_elements:
if chart.id is not None:
if not self.config.chart_pattern.allowed(chart.id):
self.status.charts_dropped_status(chart.id)
continue
self.status.charts_scanned_status(chart.id)
chart = self._yield_charts(chart)
if chart is not None:
charts.append(chart)
yield Dashboard(
id=uuid.uuid4(),
name=dashboard.id,
displayName=dashboard.title,
description=dashboard.description,
charts=charts,
usageSummary=dashboard.view_count,
service=EntityReference(
id=self.service.id, type="dashboardService"
),
href=dashboard.slug,
)
else:
logger.warning(f"No charts under Dashboard: {dashboard.title}")
return None
def _get_looker_dashboards(self):
for child_dashboard in self.client.all_dashboards(fields="id"):
if not self.config.dashboard_pattern.allowed(child_dashboard.id):
self.status.dashboards_dropped_status(child_dashboard.id)
continue
self.status.dashboards_scanned_status(child_dashboard.id)
all_dashboards = self.client.all_dashboards(fields="id")
for child_dashboard in all_dashboards:
try:
if not self.config.dashboard_pattern.included(child_dashboard.id):
self.status.dashboards_dropped_status(child_dashboard.id)
continue
fields = [
"id",
"title",
@ -188,14 +158,27 @@ class LookerSource(Source[Entity]):
"view_count",
]
dashboard = self.client.dashboard(
dashboard_id=child_dashboard, fields=",".join(fields)
dashboard_id=child_dashboard.id, fields=",".join(fields)
)
except SDKError:
self.status.warning(
child_dashboard,
f"Error occurred while loading dashboard {child_dashboard}.",
charts = []
for iter_chart in dashboard.dashboard_elements:
chart = self._get_dashboard_elements(iter_chart)
if chart:
charts.append(chart)
yield Dashboard(
id=uuid.uuid4(),
name=dashboard.id,
displayName=dashboard.title,
description="temp",
charts=charts,
url=self.config.url,
service=EntityReference(
id=self.service.id, type="dashboardService"
),
)
return self.looker_dashboard(dashboard)
self.status.dashboards_scanned_status(child_dashboard.id)
except Exception as err:
logger.error(repr(err))
def get_status(self) -> SourceStatus:
    """Return the status object stored on this source as ``self.status``."""
    current_status = self.status
    return current_status

View File

@ -478,7 +478,6 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
col_data_length = None
arr_data_type = None
parsed_string = None
print(column["raw_data_type"])
if (
"raw_data_type" in column
and column["raw_data_type"] is not None

View File

@ -53,7 +53,7 @@ def build_sql_source_connection_url(
if username is not None:
url += f"{username}"
if password is not None:
url += f":{quote_plus(password.get_secret_value())}"
url += f":{quote_plus(password)}"
url += "@"
url += f"{host_port}"
if database:
@ -106,7 +106,7 @@ class SQLConnectionConfig(ConfigModel):
host_port=self.host_port,
scheme=self.scheme,
username=self.username,
password=self.password,
password=self.password.get_secret_value(),
database=self.database,
options=self.options,
)

View File

@ -12,6 +12,8 @@
from datetime import datetime, timedelta
from typing import List
from pydantic import SecretStr
from metadata.generated.schema.api.services.createDashboardService import (
CreateDashboardServiceEntityRequest,
)
@ -61,11 +63,14 @@ def get_database_service_or_create(
if service:
return service
else:
password = (
config.password.get_secret_value() if hasattr(config, "password") else None
)
service = {
"databaseConnection": {
"hostPort": config.host_port if hasattr(config, "host_port") else None,
"username": config.username if hasattr(config, "username") else None,
"password": config.password if hasattr(config, "password") else None,
"password": password,
"database": config.database if hasattr(config, "database") else None,
"connectionOptions": config.options
if hasattr(config, "options")