diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker/looker_common.py b/metadata-ingestion/src/datahub/ingestion/source/looker/looker_common.py
index 997c6c6852..692c571d37 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/looker/looker_common.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/looker/looker_common.py
@@ -785,6 +785,38 @@ class LookerExplore:
         return [mce, mcp]
 
 
+class LookerExploreRegistry:
+    """A caching registry of Looker Explores"""
+
+    def __init__(
+        self,
+        client: Looker31SDK,
+        report: SourceReport,
+        transport_options: Optional[TransportOptions],
+    ):
+        self.client = client
+        self.report = report
+        self.transport_options = transport_options
+        self.explore_cache: Dict[Tuple[str, str], Optional[LookerExplore]] = {}
+
+    def get_explore(self, model: str, explore: str) -> Optional[LookerExplore]:
+        if (model, explore) not in self.explore_cache:
+            looker_explore = LookerExplore.from_api(
+                model,
+                explore,
+                self.client,
+                self.report,
+                transport_options=self.transport_options,
+            )
+            self.explore_cache[(model, explore)] = looker_explore
+        return self.explore_cache[(model, explore)]
+
+    def get_all_explores(self) -> Iterable[LookerExplore]:
+        for key, value in self.explore_cache.items():
+            if value is not None:
+                yield value
+
+
 class StageLatency(Report):
     name: str
     start_time: Optional[datetime.datetime]
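The registry above memoizes Explore lookups by their (model, explore) key,
and a failed LookerExplore.from_api call is cached as None so the same bad
pair is never re-requested from the API. A minimal usage sketch, assuming an
already-constructed SDK client, source report, and config; the model and
explore names are illustrative placeholders, not part of the patch:

    registry = LookerExploreRegistry(client, report, transport_options=None)

    explore = registry.get_explore("ecommerce", "orders")  # first call hits the Looker API
    explore = registry.get_explore("ecommerce", "orders")  # repeat call is served from the cache
    if explore is not None:
        urn = explore.get_explore_urn(config)  # config: the source's LookerCommonConfig

    # get_all_explores() yields only the lookups that succeeded.
    for resolved in registry.get_all_explores():
        print(resolved.name)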
diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker/looker_source.py b/metadata-ingestion/src/datahub/ingestion/source/looker/looker_source.py
index 3c903f63c9..858792ea46 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/looker/looker_source.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/looker/looker_source.py
@@ -46,6 +46,7 @@ from datahub.ingestion.source.looker.looker_common import (
     LookerCommonConfig,
     LookerDashboardSourceReport,
     LookerExplore,
+    LookerExploreRegistry,
     LookerUtil,
     ViewField,
     ViewFieldType,
@@ -163,8 +164,6 @@ class LookerDashboardSource(Source):
     client: Looker31SDK
     user_registry: LookerUserRegistry
     explores_to_fetch_set: Dict[Tuple[str, str], List[str]] = {}
-    resolved_explores_map: Dict[Tuple[str, str], LookerExplore] = {}
-    resolved_dashboards_map: Dict[str, LookerDashboard] = {}
     accessed_dashboards: int = 0
     resolved_user_ids: int = 0
     email_ids_missing: int = 0  # resolved users with missing email addresses
@@ -176,6 +175,13 @@ class LookerDashboardSource(Source):
         looker_api: LookerAPI = LookerAPI(self.source_config)
         self.client = looker_api.get_client()
         self.user_registry = LookerUserRegistry(looker_api)
+        self.explore_registry = LookerExploreRegistry(
+            self.client,
+            self.reporter,
+            self.source_config.transport_options.get_transport_options()
+            if self.source_config.transport_options
+            else None,
+        )
         # Keep stat generators to generate entity stat aspect later
         stat_generator_config: looker_usage.StatGeneratorConfig = (
             looker_usage.StatGeneratorConfig(
@@ -353,7 +359,6 @@ class LookerDashboardSource(Source):
                     explore=exp,
                     via=f"look:{element.look_id}:query:{element.dashboard_id}",
                 )
-                # self.explores_to_fetch_set.add((element.query.model, exp))
 
             return LookerDashboardElement(
                 id=element.id,
@@ -436,9 +441,6 @@ class LookerDashboardSource(Source):
                         explore=exp,
                         via=f"Look:{element.look_id}:resultmaker:query",
                     )
-                # self.explores_to_fetch_set.add(
-                #     (element.result_maker.query.model, exp)
-                # )
 
             # In addition to the query, filters can point to fields as well
             assert element.result_maker.filterables is not None
@@ -451,7 +453,6 @@ class LookerDashboardSource(Source):
                         explore=filterable.view,
                         via=f"Look:{element.look_id}:resultmaker:filterable",
                     )
-                    # self.explores_to_fetch_set.add((filterable.model, filterable.view))
                 listen = filterable.listen
                 query = element.result_maker.query
                 if listen is not None:
@@ -574,9 +575,7 @@ class LookerDashboardSource(Source):
             max_workers=self.source_config.max_threads
         ) as async_executor:
             explore_futures = [
-                async_executor.submit(
-                    self.fetch_one_explore, model, explore, self.resolved_explores_map
-                )
+                async_executor.submit(self.fetch_one_explore, model, explore)
                 for (model, explore) in self.explores_to_fetch_set
             ]
             for future in concurrent.futures.as_completed(explore_futures):
@@ -590,10 +589,7 @@ class LookerDashboardSource(Source):
         return explore_events
 
     def fetch_one_explore(
-        self,
-        model: str,
-        explore: str,
-        resolved_explores_map: Dict[Tuple[str, str], LookerExplore],
+        self, model: str, explore: str
     ) -> Tuple[
         List[Union[MetadataChangeEvent, MetadataChangeProposalWrapper]],
         str,
@@ -602,17 +598,8 @@ class LookerDashboardSource(Source):
     ]:
         start_time = datetime.datetime.now()
         events: List[Union[MetadataChangeEvent, MetadataChangeProposalWrapper]] = []
-        looker_explore = LookerExplore.from_api(
-            model,
-            explore,
-            self.client,
-            self.reporter,
-            transport_options=self.source_config.transport_options.get_transport_options()
-            if self.source_config.transport_options is not None
-            else None,
-        )
+        looker_explore = self.explore_registry.get_explore(model, explore)
         if looker_explore is not None:
-            resolved_explores_map[(model, explore)] = looker_explore
             events = (
                 looker_explore._to_metadata_events(
                     self.source_config, self.reporter, self.source_config.base_url
@@ -811,13 +798,8 @@ class LookerDashboardSource(Source):
         return user
 
     def process_metrics_dimensions_and_fields_for_dashboard(
-        self, dashboard_id: str
-    ) -> Tuple[List[MetadataWorkUnit], str, datetime.datetime, datetime.datetime]:
-        start_time = datetime.datetime.now()
-
-        dashboard = self.resolved_dashboards_map.get(dashboard_id)
-        if dashboard is None:
-            return [], dashboard_id, start_time, datetime.datetime.now()
+        self, dashboard: LookerDashboard
+    ) -> List[MetadataWorkUnit]:
 
         chart_mcps = [
             self._make_metrics_dimensions_chart_mcp(element, dashboard)
@@ -837,7 +819,7 @@ class LookerDashboardSource(Source):
             for mcp in mcps
         ]
 
-        return workunits, dashboard_id, start_time, datetime.datetime.now()
+        return workunits
 
     def _input_fields_from_dashboard_element(
         self, dashboard_element: LookerDashboardElement
@@ -858,10 +840,14 @@ class LookerDashboardSource(Source):
             view_field_for_reference = input_field.view_field
 
             if input_field.view_field is None:
-                explore = self.resolved_explores_map.get(
-                    (input_field.model, input_field.explore)
+                explore = self.explore_registry.get_explore(
+                    input_field.model, input_field.explore
                 )
                 if explore is not None:
+                    # add this to the list of explores to finally generate metadata for
+                    self.add_explore_to_fetch(
+                        input_field.model, input_field.explore, entity_urn
+                    )
                     entity_urn = explore.get_explore_urn(self.source_config)
                     explore_fields = (
                         explore.fields if explore.fields is not None else []
@@ -977,9 +963,7 @@ class LookerDashboardSource(Source):
             return [], None, dashboard_id, start_time, datetime.datetime.now()
 
         looker_dashboard = self._get_looker_dashboard(dashboard_object, self.client)
-        self.resolved_dashboards_map[looker_dashboard.id] = looker_dashboard
         mces = self._make_dashboard_and_chart_mces(looker_dashboard)
-        # for mce in mces:
         workunits = [
             MetadataWorkUnit(id=f"looker-{mce.proposedSnapshot.urn}", mce=mce)
             if isinstance(mce, MetadataChangeEvent)
@@ -988,6 +972,13 @@ class LookerDashboardSource(Source):
             )
             for mce in mces
         ]
+
+        # add on metrics, dimensions, fields events
+        metric_dim_workunits = self.process_metrics_dimensions_and_fields_for_dashboard(
+            looker_dashboard
+        )
+        workunits.extend(metric_dim_workunits)
+
         return (
             workunits,
             dashboard_object,
@@ -1183,30 +1174,5 @@ class LookerDashboardSource(Source):
                 yield workunit
             self.reporter.report_stage_end("usage_extraction")
 
-        # after fetching explores, we need to go back and enrich each chart and dashboard with
-        # metadata about the fields
-        self.reporter.report_stage_start("field_metadata")
-        with concurrent.futures.ThreadPoolExecutor(
-            max_workers=self.source_config.max_threads
-        ) as async_executor:
-            async_workunits = [
-                async_executor.submit(
-                    self.process_metrics_dimensions_and_fields_for_dashboard,  # type: ignore
-                    dashboard_id,
-                )
-                for dashboard_id in dashboard_ids
-                if dashboard_id is not None
-            ]
-            for async_workunit in concurrent.futures.as_completed(async_workunits):
-                work_units, dashboard_id, start_time, end_time = async_workunit.result()  # type: ignore
-                logger.debug(
-                    f"Running time of process_metrics_dimensions_and_fields_for_dashboard for {dashboard_id} = {(end_time - start_time).total_seconds()}"
-                )
-                self.reporter.report_upstream_latency(start_time, end_time)
-                for mwu in work_units:
-                    yield mwu
-                    self.reporter.report_workunit(mwu)
-        self.reporter.report_stage_end("field_metadata")
-
     def get_report(self) -> SourceReport:
         return self.reporter
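Two behavioral notes on the source changes above: explores are now resolved
through the shared explore_registry (including from fetch_one_explore, which
runs on a ThreadPoolExecutor), and the metrics/dimensions/fields workunits
are emitted inline while each dashboard is processed, which is what lets the
second "field_metadata" stage at the end be dropped. Plain dict reads and
writes are safe under CPython's GIL, but two threads that miss the cache at
the same time will both call the Looker API. A hypothetical lock-guarded
variant, if that duplication ever mattered; the subclass is illustrative and
not part of the patch:

    import threading

    class LockingExploreRegistry(LookerExploreRegistry):
        """Illustrative: serializes cache fills so each key is fetched at most once."""

        def __init__(self, *args, **kwargs):
            super().__init__(*args, **kwargs)
            self._lock = threading.Lock()

        def get_explore(self, model: str, explore: str):
            # Holding the lock across the API call trades parallelism of the
            # underlying fetches for at-most-once fetching per key.
            with self._lock:
                return super().get_explore(model, explore)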
diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker/looker_usage.py b/metadata-ingestion/src/datahub/ingestion/source/looker/looker_usage.py
index 7a5d022842..91fe2841ac 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/looker/looker_usage.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/looker/looker_usage.py
@@ -261,21 +261,26 @@ class BaseStatGenerator(ABC):
                 yield self.id_vs_model[id], aspect
 
     def _execute_query(self, query: LookerQuery, query_name: str) -> List[Dict]:
-        start_time = datetime.datetime.now()
-        rows = self.config.looker_api_wrapper.execute_query(
-            write_query=query.to_write_query()
-        )
-        end_time = datetime.datetime.now()
+        rows = []
+        try:
+            start_time = datetime.datetime.now()
+            rows = self.config.looker_api_wrapper.execute_query(
+                write_query=query.to_write_query()
+            )
+            end_time = datetime.datetime.now()
+
+            logger.debug(
+                f"{self.ctx}: Retrieved {len(rows)} rows in {(end_time - start_time).total_seconds()} seconds"
+            )
+            self.report.report_query_latency(
+                f"{self.ctx}:{query_name}", (end_time - start_time).total_seconds()
+            )
+            if self.post_filter:
+                rows = [r for r in rows if self.get_id_from_row(r) in self.id_vs_model]
+                logger.debug(f"Filtered down to {len(rows)} rows")
+        except Exception as e:
+            logger.warning(f"Failed to execute {query_name} query: {e}")
 
-        logger.debug(
-            f"{self.ctx}: Retrieved {len(rows)} rows in {(end_time - start_time).total_seconds()} seconds"
-        )
-        self.report.report_query_latency(
-            f"{self.ctx}:{query_name}", (end_time - start_time).total_seconds()
-        )
-        if self.post_filter:
-            rows = [r for r in rows if self.get_id_from_row(r) in self.id_vs_model]
-            logger.debug(f"Filtered down to {len(rows)} rows")
         return rows
 
     def _append_filters(self, query: LookerQuery) -> LookerQuery:
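With the change above, _execute_query degrades gracefully: a failing
system-activity query logs a warning and returns an empty row list instead
of raising and aborting the whole usage-extraction stage. The same idiom in
standalone form, keeping the traceback via exc_info; the run_query_safely
helper is illustrative, not part of the patch:

    import logging
    from typing import Callable, Dict, List

    logger = logging.getLogger(__name__)

    def run_query_safely(execute: Callable[[], List[Dict]]) -> List[Dict]:
        rows: List[Dict] = []
        try:
            rows = execute()
        except Exception:
            # exc_info=True attaches the full traceback to the log record;
            # logger.exception(...) does the same at ERROR level.
            logger.warning("Failed to execute query", exc_info=True)
        return rows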