diff --git a/ingestion/setup.py b/ingestion/setup.py index 3e076306165..a1316020d73 100644 --- a/ingestion/setup.py +++ b/ingestion/setup.py @@ -119,7 +119,7 @@ plugins: Dict[str, Set[str]] = { "protobuf", }, "ldap-users": {"ldap3==2.9.1"}, - "looker": {"looker-sdk>=22.4.0"}, + "looker": {"looker-sdk>=22.20.0"}, "mssql": {"sqlalchemy-pytds>=0.3"}, "pymssql": {"pymssql==2.2.5"}, "mssql-odbc": {"pyodbc"}, diff --git a/ingestion/src/metadata/ingestion/ometa/mixins/user_mixin.py b/ingestion/src/metadata/ingestion/ometa/mixins/user_mixin.py index cac3195b76d..21b531d3b83 100644 --- a/ingestion/src/metadata/ingestion/ometa/mixins/user_mixin.py +++ b/ingestion/src/metadata/ingestion/ometa/mixins/user_mixin.py @@ -13,6 +13,7 @@ Mixin class containing User specific methods To be used by OpenMetadata class """ +from typing import Optional from metadata.generated.schema.entity.teams.user import User from metadata.ingestion.ometa.client import REST @@ -30,7 +31,7 @@ class OMetaUserMixin: client: REST - def get_user_by_email(self, email: str) -> None: + def get_user_by_email(self, email: str) -> Optional[User]: """ GET user entity by name @@ -40,8 +41,7 @@ class OMetaUserMixin: name = email.split("@")[0] users = self.es_search_from_fqn(entity_type=User, fqn_search_string=name) - if users: - for user in users: - if user.email.__root__ == email: - return user + for user in users or []: + if user.email.__root__ == email: + return user return None diff --git a/ingestion/src/metadata/ingestion/source/dashboard/dashboard_service.py b/ingestion/src/metadata/ingestion/source/dashboard/dashboard_service.py index 02a0ecc5510..bcb1255d2d2 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/dashboard_service.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/dashboard_service.py @@ -183,9 +183,9 @@ class DashboardServiceSource(TopologyRunnerMixin, Source, ABC): """ @abstractmethod - def get_dashboard_name(self, dashboard_details: Any) -> str: + def get_dashboard_name(self, dashboard: Any) -> str: """ - Get Dashboard Name + Get Dashboard Name from each element coming from `get_dashboards_list` """ @abstractmethod @@ -198,7 +198,10 @@ class DashboardServiceSource(TopologyRunnerMixin, Source, ABC): self, dashboard_details: Any ) -> Optional[Iterable[AddLineageRequest]]: """ - Yields lineage if config is enabled + Yields lineage if config is enabled. + + We will look for the data in all the services + we have informed. """ for db_service_name in self.source_config.dbServiceNames or []: yield from self.yield_dashboard_lineage_details( @@ -265,7 +268,10 @@ class DashboardServiceSource(TopologyRunnerMixin, Source, ABC): entity=DashboardService, config=config ) - def _get_add_lineage_request(self, to_entity: Dashboard, from_entity: Table): + @staticmethod + def _get_add_lineage_request( + to_entity: Dashboard, from_entity: Table + ) -> Optional[AddLineageRequest]: if from_entity and to_entity: return AddLineageRequest( edge=EntitiesEdge( @@ -281,10 +287,21 @@ class DashboardServiceSource(TopologyRunnerMixin, Source, ABC): def get_dashboard(self) -> Any: """ - Method to iterate through dashboard lists filter dashbaords & yield dashboard details + Method to iterate through dashboard lists filter dashboards & yield dashboard details """ for dashboard in self.get_dashboards_list(): + dashboard_name = self.get_dashboard_name(dashboard) + if filter_by_dashboard( + self.source_config.dashboardFilterPattern, + dashboard_name, + ): + self.status.filter( + dashboard_name, + "Dashboard Filtered Out", + ) + continue + try: dashboard_details = self.get_dashboard_details(dashboard) except Exception as exc: @@ -293,17 +310,7 @@ class DashboardServiceSource(TopologyRunnerMixin, Source, ABC): f"Cannot extract dashboard details from {dashboard}: {exc}" ) continue - dashboard_name = self.get_dashboard_name(dashboard_details) - if filter_by_dashboard( - self.source_config.dashboardFilterPattern, - dashboard_name, - ): - self.status.filter( - dashboard_name, - "Dashboard Fltered Out", - ) - continue yield dashboard_details def test_connection(self) -> None: diff --git a/ingestion/src/metadata/ingestion/source/dashboard/domodashboard.py b/ingestion/src/metadata/ingestion/source/dashboard/domodashboard.py index 97aa12ea190..289c1d00278 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/domodashboard.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/domodashboard.py @@ -69,8 +69,8 @@ class DomodashboardSource(DashboardServiceSource): dashboards = self.domo_client.page_list() return dashboards - def get_dashboard_name(self, dashboard_details: dict) -> str: - return dashboard_details["name"] + def get_dashboard_name(self, dashboard: dict) -> str: + return dashboard["name"] def get_dashboard_details(self, dashboard: dict) -> dict: return dashboard diff --git a/ingestion/src/metadata/ingestion/source/dashboard/looker.py b/ingestion/src/metadata/ingestion/source/dashboard/looker.py index 960c180784d..474a476f84b 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/looker.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/looker.py @@ -8,7 +8,16 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -"""Looker source module""" +""" +Looker source module. +Supports: +- owner +- lineage +- usage + +Notes: +- Filtering is applied on the Dashboard title or ID, if the title is missing +""" import traceback from datetime import datetime @@ -16,6 +25,7 @@ from typing import Iterable, List, Optional, Set, cast from looker_sdk.error import SDKError from looker_sdk.sdk.api31.models import Query +from looker_sdk.sdk.api40.methods import Looker40SDK from looker_sdk.sdk.api40.models import Dashboard as LookerDashboard from looker_sdk.sdk.api40.models import ( DashboardBase, @@ -54,13 +64,32 @@ from metadata.utils.logger import ingestion_logger logger = ingestion_logger() +LIST_DASHBOARD_FIELDS = ["id", "title"] + +# Here we can update the fields to get further information, such as: +# created_at, updated_at, last_updater_id, deleted_at, deleter_id, favorite_count, last_viewed_at +GET_DASHBOARD_FIELDS = [ + "id", + "title", + "dashboard_elements", + "dashboard_filters", + "view_count", + "description", + "folder", + "user_id", # Use as owner +] + + class LookerSource(DashboardServiceSource): """ - Looker Source Class + Looker Source Class. + + Its client uses Looker 40 from the SDK: client = looker_sdk.init40() """ config: WorkflowSource metadata_config: OpenMetadataConnection + client: Looker40SDK def __init__( self, @@ -70,6 +99,9 @@ class LookerSource(DashboardServiceSource): super().__init__(config, metadata_config) self.today = datetime.now().strftime("%Y-%m-%d") + # Owners cache. The key will be the user_id and the value its OM user EntityRef + self._owners_ref = {} + @classmethod def create(cls, config_dict: dict, metadata_config: OpenMetadataConnection): config = WorkflowSource.parse_obj(config_dict) @@ -80,32 +112,73 @@ class LookerSource(DashboardServiceSource): ) return cls(config, metadata_config) - def get_dashboards_list(self) -> Optional[List[DashboardBase]]: + def get_dashboards_list(self) -> List[DashboardBase]: """ Get List of all dashboards """ - return self.client.all_dashboards(fields="id") + try: + return list( + self.client.all_dashboards(fields=",".join(LIST_DASHBOARD_FIELDS)) + ) + except Exception as err: + logger.debug(traceback.format_exc()) + logger.error(f"Wild error trying to obtain dashboard list {err}") + # If we cannot list the dashboards, let's blow up + raise err - def get_dashboard_name(self, dashboard_details: DashboardBase) -> str: + def get_dashboard_name(self, dashboard: DashboardBase) -> str: """ - Get Dashboard Name + Get Dashboard Title. This will be used for filtering. + If the title is not present, we'll send the ID """ - return dashboard_details.id + return dashboard.title or dashboard.id def get_dashboard_details(self, dashboard: DashboardBase) -> LookerDashboard: """ Get Dashboard Details """ - fields = [ - "id", - "title", - "dashboard_elements", - "dashboard_filters", - "view_count", - "description", - "folder", - ] - return self.client.dashboard(dashboard_id=dashboard.id, fields=",".join(fields)) + return self.client.dashboard( + dashboard_id=dashboard.id, fields=",".join(GET_DASHBOARD_FIELDS) + ) + + def get_owner_details( + self, dashboard_details: LookerDashboard + ) -> Optional[EntityReference]: + """Get dashboard owner + + Store the visited users in the _owners_ref cache, even if we found them + in OM or not. + + If the user has not yet been visited, store it and return from cache. + + Args: + dashboard_details: LookerDashboard + Returns: + Optional[EntityReference] + """ + + try: + if ( + dashboard_details.user_id is not None + and dashboard_details.user_id not in self._owners_ref + ): + dashboard_owner = self.client.user(dashboard_details.user_id) + user = self.metadata.get_user_by_email(dashboard_owner.email) + if user: # Save the EntityRef + self._owners_ref[dashboard_details.user_id] = EntityReference( + id=user.id, type="user" + ) + else: # Otherwise, flag the user as missing in OM + self._owners_ref[dashboard_details.user_id] = None + logger.debug( + f"User {dashboard_owner.email} not found in OpenMetadata." + ) + + except Exception as err: + logger.debug(traceback.format_exc()) + logger.warning(f"Could not fetch owner data due to {err}") + + return self._owners_ref.get(dashboard_details.user_id) def yield_dashboard( self, dashboard_details: LookerDashboard @@ -117,7 +190,7 @@ class LookerSource(DashboardServiceSource): yield CreateDashboardRequest( name=dashboard_details.id.replace("::", "_"), displayName=dashboard_details.title, - description=dashboard_details.description or "", + description=dashboard_details.description or None, charts=[ EntityReference(id=chart.id.__root__, type="chart") for chart in self.context.charts @@ -126,6 +199,7 @@ class LookerSource(DashboardServiceSource): service=EntityReference( id=self.context.dashboard_service.id.__root__, type="dashboardService" ), + owner=self.get_owner_details(dashboard_details), ) @staticmethod @@ -186,7 +260,7 @@ class LookerSource(DashboardServiceSource): return dashboard_sources def yield_dashboard_lineage_details( - self, dashboard_details: LookerDashboard, db_service_name + self, dashboard_details: LookerDashboard, db_service_name: str ) -> Optional[Iterable[AddLineageRequest]]: """ Get lineage between charts and data sources. @@ -211,22 +285,10 @@ class LookerSource(DashboardServiceSource): for source in datasource_list: try: - source_elements = fqn.split_table_name(table_name=source) - - from_fqn = fqn.build( - self.metadata, - entity_type=Table, - service_name=db_service_name, - database_name=source_elements["database"], - schema_name=source_elements["database_schema"], - table_name=source_elements["table"], - ) - from_entity = self.metadata.get_by_name( - entity=Table, - fqn=from_fqn, - ) - yield self._get_add_lineage_request( - to_entity=to_entity, from_entity=from_entity + yield self.build_lineage_request( + source=source, + db_service_name=db_service_name, + to_entity=to_entity, ) except (Exception, IndexError) as err: @@ -235,15 +297,53 @@ class LookerSource(DashboardServiceSource): f"Error building lineage for database service [{db_service_name}]: {err}" ) + def build_lineage_request( + self, source: str, db_service_name: str, to_entity: MetadataDashboard + ) -> Optional[AddLineageRequest]: + """ + Once we have a list of origin data sources, check their components + and build the lineage request. + + We will try searching in ES with and without the `database` + + Args: + source: table name from the source list + db_service_name: name of the service from the config + to_entity: Dashboard Entity being used + """ + + source_elements = fqn.split_table_name(table_name=source) + + for database_name in [source_elements["database"], None]: + + from_fqn = fqn.build( + self.metadata, + entity_type=Table, + service_name=db_service_name, + database_name=database_name, + schema_name=source_elements["database_schema"], + table_name=source_elements["table"], + ) + + from_entity: Table = self.metadata.get_by_name( + entity=Table, + fqn=from_fqn, + ) + + if from_entity: + return self._get_add_lineage_request( + to_entity=to_entity, from_entity=from_entity + ) + + return None + def yield_dashboard_chart( self, dashboard_details: LookerDashboard ) -> Optional[Iterable[CreateChartRequest]]: """ Method to fetch charts linked to dashboard """ - for chart in cast( - Iterable[DashboardElement], dashboard_details.dashboard_elements - ): + for chart in dashboard_details.dashboard_elements: try: if filter_by_chart( chart_filter_pattern=self.source_config.chartFilterPattern, @@ -259,7 +359,7 @@ class LookerSource(DashboardServiceSource): yield CreateChartRequest( name=chart.id, displayName=chart.title or chart.id, - description="", + description=self.build_chart_description(chart) or None, chartType=get_standard_chart_type(chart.type).value, chartUrl=f"/dashboard_elements/{chart.id}", service=EntityReference( @@ -273,6 +373,28 @@ class LookerSource(DashboardServiceSource): logger.debug(traceback.format_exc()) logger.warning(f"Error creating chart [{chart}]: {exc}") + @staticmethod + def build_chart_description(chart: DashboardElement) -> Optional[str]: + """ + Chart descriptions will be based on the subtitle + note_text, if exists. + If the chart is a text tile, we will add the text as the chart description as well. + This should keep the dashboard searchable without breaking the original metadata structure. + """ + + # If the string is None or empty, filter it out. + try: + return "; ".join( + filter( + lambda string: string, + [chart.subtitle_text, chart.body_text, chart.note_text], + ) + or [] + ) + except Exception as err: + logger.debug(traceback.format_exc()) + logger.error(f"Error getting chart description: {err}") + return None + def yield_dashboard_usage( # pylint: disable=W0221 self, dashboard_details: LookerDashboard ) -> Optional[DashboardUsage]: diff --git a/ingestion/src/metadata/ingestion/source/dashboard/metabase.py b/ingestion/src/metadata/ingestion/source/dashboard/metabase.py index 7e0021cc310..080055d150f 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/metabase.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/metabase.py @@ -83,11 +83,11 @@ class MetabaseSource(DashboardServiceSource): return resp_dashboards.json() return [] - def get_dashboard_name(self, dashboard_details: dict) -> str: + def get_dashboard_name(self, dashboard: dict) -> str: """ Get Dashboard Name """ - return dashboard_details["name"] + return dashboard["name"] def get_dashboard_details(self, dashboard: dict) -> dict: """ diff --git a/ingestion/src/metadata/ingestion/source/dashboard/mode.py b/ingestion/src/metadata/ingestion/source/dashboard/mode.py index e5f2de26831..082a5730419 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/mode.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/mode.py @@ -72,11 +72,11 @@ class ModeSource(DashboardServiceSource): """ return self.client.fetch_all_reports(self.workspace_name) - def get_dashboard_name(self, dashboard_details: dict) -> str: + def get_dashboard_name(self, dashboard: dict) -> str: """ Get Dashboard Name """ - return dashboard_details.get(mode_client.NAME) + return dashboard.get(mode_client.NAME) def get_dashboard_details(self, dashboard: dict) -> dict: """ diff --git a/ingestion/src/metadata/ingestion/source/dashboard/powerbi.py b/ingestion/src/metadata/ingestion/source/dashboard/powerbi.py index 3ade6e210e0..ef4021c8559 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/powerbi.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/powerbi.py @@ -128,11 +128,11 @@ class PowerbiSource(DashboardServiceSource): """ return self.context.workspace.get("dashboards") - def get_dashboard_name(self, dashboard_details: dict) -> str: + def get_dashboard_name(self, dashboard: dict) -> str: """ Get Dashboard Name """ - return dashboard_details["displayName"] + return dashboard["displayName"] def get_dashboard_details(self, dashboard: dict) -> dict: """ diff --git a/ingestion/src/metadata/ingestion/source/dashboard/quicksight.py b/ingestion/src/metadata/ingestion/source/dashboard/quicksight.py index 3dac1fdc981..9173e4d9f16 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/quicksight.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/quicksight.py @@ -80,11 +80,11 @@ class QuickSightSource(DashboardServiceSource): ] return dashboards - def get_dashboard_name(self, dashboard_details: dict) -> str: + def get_dashboard_name(self, dashboard: dict) -> str: """ Get Dashboard Name """ - return dashboard_details["Name"] + return dashboard["Name"] def get_dashboard_details(self, dashboard: dict) -> dict: """ diff --git a/ingestion/src/metadata/ingestion/source/dashboard/redash.py b/ingestion/src/metadata/ingestion/source/dashboard/redash.py index 22ef3a81d0d..14c11b30c81 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/redash.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/redash.py @@ -64,11 +64,11 @@ class RedashSource(DashboardServiceSource): dashboard_info = self.client.dashboards() return dashboard_info["results"] - def get_dashboard_name(self, dashboard_details: dict) -> str: + def get_dashboard_name(self, dashboard: dict) -> str: """ Get Dashboard Name """ - return dashboard_details["name"] + return dashboard["name"] def get_dashboard_details(self, dashboard: dict) -> dict: """ diff --git a/ingestion/src/metadata/ingestion/source/dashboard/superset.py b/ingestion/src/metadata/ingestion/source/dashboard/superset.py index effb9779049..a1593eb8023 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/superset.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/superset.py @@ -137,11 +137,11 @@ class SupersetSource(DashboardServiceSource): for dashboard in dashboards["result"]: yield dashboard - def get_dashboard_name(self, dashboard_details: dict) -> str: + def get_dashboard_name(self, dashboard: dict) -> str: """ Get Dashboard Name """ - return dashboard_details["dashboard_title"] + return dashboard["dashboard_title"] def get_dashboard_details(self, dashboard: dict) -> dict: """ diff --git a/ingestion/src/metadata/ingestion/source/dashboard/tableau.py b/ingestion/src/metadata/ingestion/source/dashboard/tableau.py index 5b4c29a0d27..322a19a1e78 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/tableau.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/tableau.py @@ -146,11 +146,11 @@ class TableauSource(DashboardServiceSource): """ return self.workbooks.values() - def get_dashboard_name(self, dashboard_details: dict) -> str: + def get_dashboard_name(self, dashboard: dict) -> str: """ Get Dashboard Name """ - return dashboard_details.get("name") + return dashboard.get("name") def get_dashboard_details(self, dashboard: dict) -> dict: """ diff --git a/ingestion/tests/integration/utils/test_fqn.py b/ingestion/tests/integration/utils/test_fqn.py index d0a083f2723..541d24311e9 100644 --- a/ingestion/tests/integration/utils/test_fqn.py +++ b/ingestion/tests/integration/utils/test_fqn.py @@ -182,3 +182,27 @@ class FQNBuildTest(TestCase): str(context.exception), "Service Name and Table Name should be informed, but got service=`None`, table=`None`", ) + + def test_split_table_name(self): + """Different tables are properly partitioned""" + + self.assertEqual( + {"database": "database", "database_schema": "schema", "table": "table"}, + fqn.split_table_name(table_name="database.schema.table"), + ) + + self.assertEqual( + {"database": None, "database_schema": "schema", "table": "table"}, + fqn.split_table_name(table_name="schema.table"), + ) + + self.assertEqual( + {"database": None, "database_schema": None, "table": "table"}, + fqn.split_table_name(table_name="table"), + ) + + # We also clean quotes + self.assertEqual( + {"database": "database", "database_schema": "schema", "table": "table"}, + fqn.split_table_name(table_name='database."schema".table'), + )