diff --git a/ingestion/setup.py b/ingestion/setup.py index c0f0e2abd0a..625904a3087 100644 --- a/ingestion/setup.py +++ b/ingestion/setup.py @@ -62,7 +62,7 @@ base_plugins = { plugins: Dict[str, Set[str]] = { "amundsen": {"neo4j~=4.4.0"}, "athena": {"PyAthena[SQLAlchemy]"}, - "bigquery": {"sqlalchemy-bigquery==1.2.2"}, + "bigquery": {"sqlalchemy-bigquery==1.2.2", "pyarrow-6.0.1"}, "bigquery-usage": {"google-cloud-logging", "cachetools"}, # "docker": {"docker==5.0.3"}, "docker": {"python_on_whales==0.34.0"}, diff --git a/ingestion/src/metadata/ingestion/source/looker.py b/ingestion/src/metadata/ingestion/source/looker.py index f38d5a7b335..34a46f476fa 100644 --- a/ingestion/src/metadata/ingestion/source/looker.py +++ b/ingestion/src/metadata/ingestion/source/looker.py @@ -13,16 +13,11 @@ import logging import os import uuid from dataclasses import dataclass, field -from typing import Iterable, List, Optional +from typing import Iterable, List import looker_sdk -from looker_sdk.error import SDKError -from looker_sdk.sdk.api31.models import Dashboard as LookerDashboard -from looker_sdk.sdk.api31.models import DashboardElement from pydantic import SecretStr -from metadata.generated.schema.entity.data.chart import Chart -from metadata.generated.schema.entity.data.dashboard import Dashboard from metadata.generated.schema.entity.services.dashboardService import ( DashboardServiceType, ) @@ -34,6 +29,7 @@ from metadata.ingestion.api.common import ( WorkflowContext, ) from metadata.ingestion.api.source import Source, SourceStatus +from metadata.ingestion.models.table_metadata import Chart, Dashboard from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig from metadata.utils.helpers import get_dashboard_service_or_create @@ -97,13 +93,26 @@ class LookerSource(Source[Entity]): metadata_config, ) + def check_env(self, env_key): + if os.environ.get(env_key): + return True + return None + def looker_client(self): - os.environ["LOOKERSDK_CLIENT_ID"] = self.config.username - os.environ["LOOKERSDK_CLIENT_SECRET"] = self.config.password - os.environ["LOOKERSDK_BASE_URL"] = self.config.url - client = looker_sdk.init31() - client.me() - return client + try: + if not self.check_env("LOOKERSDK_CLIENT_ID"): + os.environ["LOOKERSDK_CLIENT_ID"] = self.config.username + if not self.check_env("LOOKERSDK_CLIENT_SECRET"): + os.environ[ + "LOOKERSDK_CLIENT_SECRET" + ] = self.config.password.get_secret_value() + if not self.check_env("LOOKERSDK_BASE_URL"): + os.environ["LOOKERSDK_BASE_URL"] = self.config.url + client = looker_sdk.init31() + client.me() + return client + except Exception as err: + logger.error(f"ERROR: {repr(err)}") @classmethod def create( @@ -117,69 +126,30 @@ class LookerSource(Source[Entity]): pass def next_record(self) -> Iterable[Entity]: - yield from self._get_looker_charts() yield from self._get_looker_dashboards() - def _yield_charts(self, chart: DashboardElement): - yield Chart( + def _get_dashboard_elements(self, dashboard_elements): + if not self.config.dashboard_pattern.included(dashboard_elements.id): + self.status.charts_dropped_status(dashboard_elements.id) + return None + self.status.charts_scanned_status(dashboard_elements.id) + return Chart( id=uuid.uuid4(), - name=chart.id, - displayName=chart.title, - chartType=chart.type, + name=dashboard_elements.id, + displayName=dashboard_elements.id, + description=dashboard_elements.title, + chart_type=dashboard_elements.type, + url=self.config.url, service=EntityReference(id=self.service.id, type="dashboardService"), - description=chart.subtitle_text, ) - def _get_looker_charts(self) -> Optional[Chart]: - for child_dashboard in self.client.all_dashboards(fields="id"): - fields = [ - "id", - "title", - "dashboard_elements", - "dashboard_filters", - "view_count", - ] - charts = self.client.dashboard_dashboard_elements( - dashboard_id=child_dashboard.id, fields=",".join(fields) - ) - for chart in charts: - self._yield_charts(chart) - - def looker_dashboard(self, dashboard: LookerDashboard) -> Dashboard: - charts: List[Chart] = [] - if dashboard.dashboard_elements is not None: - for chart in dashboard.dashboard_elements: - if chart.id is not None: - if not self.config.chart_pattern.allowed(chart.id): - self.status.charts_dropped_status(chart.id) - continue - self.status.charts_scanned_status(chart.id) - chart = self._yield_charts(chart) - if chart is not None: - charts.append(chart) - yield Dashboard( - id=uuid.uuid4(), - name=dashboard.id, - displayName=dashboard.title, - description=dashboard.description, - charts=charts, - usageSummary=dashboard.view_count, - service=EntityReference( - id=self.service.id, type="dashboardService" - ), - href=dashboard.slug, - ) - else: - logger.warning(f"No charts under Dashboard: {dashboard.title}") - return None - def _get_looker_dashboards(self): - for child_dashboard in self.client.all_dashboards(fields="id"): - if not self.config.dashboard_pattern.allowed(child_dashboard.id): - self.status.dashboards_dropped_status(child_dashboard.id) - continue - self.status.dashboards_scanned_status(child_dashboard.id) + all_dashboards = self.client.all_dashboards(fields="id") + for child_dashboard in all_dashboards: try: + if not self.config.dashboard_pattern.included(child_dashboard.id): + self.status.dashboards_dropped_status(child_dashboard.id) + continue fields = [ "id", "title", @@ -188,14 +158,27 @@ class LookerSource(Source[Entity]): "view_count", ] dashboard = self.client.dashboard( - dashboard_id=child_dashboard, fields=",".join(fields) + dashboard_id=child_dashboard.id, fields=",".join(fields) ) - except SDKError: - self.status.warning( - child_dashboard, - f"Error occurred while loading dashboard {child_dashboard}.", + charts = [] + for iter_chart in dashboard.dashboard_elements: + chart = self._get_dashboard_elements(iter_chart) + if chart: + charts.append(chart) + yield Dashboard( + id=uuid.uuid4(), + name=dashboard.id, + displayName=dashboard.title, + description="temp", + charts=charts, + url=self.config.url, + service=EntityReference( + id=self.service.id, type="dashboardService" + ), ) - return self.looker_dashboard(dashboard) + self.status.dashboards_scanned_status(child_dashboard.id) + except Exception as err: + logger.error(repr(err)) def get_status(self) -> SourceStatus: return self.status diff --git a/ingestion/src/metadata/ingestion/source/sql_source.py b/ingestion/src/metadata/ingestion/source/sql_source.py index db34af411bd..e4f91ae2a69 100644 --- a/ingestion/src/metadata/ingestion/source/sql_source.py +++ b/ingestion/src/metadata/ingestion/source/sql_source.py @@ -478,7 +478,6 @@ class SQLSource(Source[OMetaDatabaseAndTable]): col_data_length = None arr_data_type = None parsed_string = None - print(column["raw_data_type"]) if ( "raw_data_type" in column and column["raw_data_type"] is not None diff --git a/ingestion/src/metadata/ingestion/source/sql_source_common.py b/ingestion/src/metadata/ingestion/source/sql_source_common.py index 49593b6825b..32f1ad6756d 100644 --- a/ingestion/src/metadata/ingestion/source/sql_source_common.py +++ b/ingestion/src/metadata/ingestion/source/sql_source_common.py @@ -53,7 +53,7 @@ def build_sql_source_connection_url( if username is not None: url += f"{username}" if password is not None: - url += f":{quote_plus(password.get_secret_value())}" + url += f":{quote_plus(password)}" url += "@" url += f"{host_port}" if database: @@ -106,7 +106,7 @@ class SQLConnectionConfig(ConfigModel): host_port=self.host_port, scheme=self.scheme, username=self.username, - password=self.password, + password=self.password.get_secret_value(), database=self.database, options=self.options, ) diff --git a/ingestion/src/metadata/utils/helpers.py b/ingestion/src/metadata/utils/helpers.py index 96c3dc4afcd..050923daa0c 100644 --- a/ingestion/src/metadata/utils/helpers.py +++ b/ingestion/src/metadata/utils/helpers.py @@ -12,6 +12,8 @@ from datetime import datetime, timedelta from typing import List +from pydantic import SecretStr + from metadata.generated.schema.api.services.createDashboardService import ( CreateDashboardServiceEntityRequest, ) @@ -61,11 +63,14 @@ def get_database_service_or_create( if service: return service else: + password = ( + config.password.get_secret_value() if hasattr(config, "password") else None + ) service = { "databaseConnection": { "hostPort": config.host_port if hasattr(config, "host_port") else None, "username": config.username if hasattr(config, "username") else None, - "password": config.password if hasattr(config, "password") else None, + "password": password, "database": config.database if hasattr(config, "database") else None, "connectionOptions": config.options if hasattr(config, "options")