feat(ingest): looker - reduce memory requirements (#5815)

Shirshanka Das 2022-09-02 01:45:51 -07:00 committed by GitHub
parent c91e29b052
commit ae4fb7c2e1
3 changed files with 78 additions and 75 deletions


@@ -785,6 +785,38 @@ class LookerExplore:
return [mce, mcp]
class LookerExploreRegistry:
"""A caching registry of Looker Explores"""
def __init__(
self,
client: Looker31SDK,
report: SourceReport,
transport_options: Optional[TransportOptions],
):
self.client = client
self.report = report
self.transport_options = transport_options
self.explore_cache: Dict[Tuple[str, str], Optional[LookerExplore]] = {}
def get_explore(self, model: str, explore: str) -> Optional[LookerExplore]:
if (model, explore) not in self.explore_cache:
looker_explore = LookerExplore.from_api(
model,
explore,
self.client,
self.report,
transport_options=self.transport_options,
)
self.explore_cache[(model, explore)] = looker_explore
return self.explore_cache[(model, explore)]
def get_all_explores(self) -> Iterable[LookerExplore]:
for key, value in self.explore_cache.items():
if value is not None:
yield value
class StageLatency(Report):
name: str
start_time: Optional[datetime.datetime]
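A minimal standalone sketch of the memoization pattern LookerExploreRegistry implements (CachingRegistry and its fetch callable are hypothetical stand-ins for the registry and LookerExplore.from_api; they are not part of the commit). Lookups are keyed by (model, explore), and failed lookups are cached as None so they are not retried:

from typing import Callable, Dict, Iterable, Optional, Tuple

class CachingRegistry:
    """Sketch: memoize (model, explore) lookups, caching failures as None."""

    def __init__(self, fetch: Callable[[str, str], Optional[object]]):
        # fetch stands in for LookerExplore.from_api plus its client/report args.
        self._fetch = fetch
        self._cache: Dict[Tuple[str, str], Optional[object]] = {}

    def get(self, model: str, explore: str) -> Optional[object]:
        key = (model, explore)
        if key not in self._cache:
            # A failed fetch is cached as None so the same missing explore
            # is not requested from the API again on every lookup.
            self._cache[key] = self._fetch(model, explore)
        return self._cache[key]

    def get_all(self) -> Iterable[object]:
        # Yield only explores that resolved successfully.
        return (v for v in self._cache.values() if v is not None)

Caching the None result matters for latency as well as memory: without it, every dashboard element referencing a broken explore would trigger a fresh API round trip.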


@@ -46,6 +46,7 @@ from datahub.ingestion.source.looker.looker_common import (
LookerCommonConfig,
LookerDashboardSourceReport,
LookerExplore,
LookerExploreRegistry,
LookerUtil,
ViewField,
ViewFieldType,
@@ -163,8 +164,6 @@ class LookerDashboardSource(Source):
client: Looker31SDK
user_registry: LookerUserRegistry
explores_to_fetch_set: Dict[Tuple[str, str], List[str]] = {}
resolved_explores_map: Dict[Tuple[str, str], LookerExplore] = {}
resolved_dashboards_map: Dict[str, LookerDashboard] = {}
accessed_dashboards: int = 0
resolved_user_ids: int = 0
email_ids_missing: int = 0 # resolved users with missing email addresses
@@ -176,6 +175,13 @@
looker_api: LookerAPI = LookerAPI(self.source_config)
self.client = looker_api.get_client()
self.user_registry = LookerUserRegistry(looker_api)
self.explore_registry = LookerExploreRegistry(
self.client,
self.reporter,
self.source_config.transport_options.get_transport_options()
if self.source_config.transport_options
else None,
)
# Keep stat generators to generate entity stat aspect later
stat_generator_config: looker_usage.StatGeneratorConfig = (
looker_usage.StatGeneratorConfig(
@@ -353,7 +359,6 @@
explore=exp,
via=f"look:{element.look_id}:query:{element.dashboard_id}",
)
# self.explores_to_fetch_set.add((element.query.model, exp))
return LookerDashboardElement(
id=element.id,
@@ -436,9 +441,6 @@
explore=exp,
via=f"Look:{element.look_id}:resultmaker:query",
)
# self.explores_to_fetch_set.add(
# (element.result_maker.query.model, exp)
# )
# In addition to the query, filters can point to fields as well
assert element.result_maker.filterables is not None
@@ -451,7 +453,6 @@
explore=filterable.view,
via=f"Look:{element.look_id}:resultmaker:filterable",
)
# self.explores_to_fetch_set.add((filterable.model, filterable.view))
listen = filterable.listen
query = element.result_maker.query
if listen is not None:
@@ -574,9 +575,7 @@
max_workers=self.source_config.max_threads
) as async_executor:
explore_futures = [
async_executor.submit(
self.fetch_one_explore, model, explore, self.resolved_explores_map
)
async_executor.submit(self.fetch_one_explore, model, explore)
for (model, explore) in self.explores_to_fetch_set
]
for future in concurrent.futures.as_completed(explore_futures):
@@ -590,10 +589,7 @@
return explore_events
def fetch_one_explore(
self,
model: str,
explore: str,
resolved_explores_map: Dict[Tuple[str, str], LookerExplore],
self, model: str, explore: str
) -> Tuple[
List[Union[MetadataChangeEvent, MetadataChangeProposalWrapper]],
str,
@@ -602,17 +598,8 @@
]:
start_time = datetime.datetime.now()
events: List[Union[MetadataChangeEvent, MetadataChangeProposalWrapper]] = []
looker_explore = LookerExplore.from_api(
model,
explore,
self.client,
self.reporter,
transport_options=self.source_config.transport_options.get_transport_options()
if self.source_config.transport_options is not None
else None,
)
looker_explore = self.explore_registry.get_explore(model, explore)
if looker_explore is not None:
resolved_explores_map[(model, explore)] = looker_explore
events = (
looker_explore._to_metadata_events(
self.source_config, self.reporter, self.source_config.base_url
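With the resolved_explores_map parameter removed, every worker thread now resolves explores through the shared registry. A minimal sketch of that fan-out, assuming a registry exposing get_explore as above (resolve_explores, to_fetch, and max_workers are hypothetical names):

import concurrent.futures
from typing import Iterable, Tuple

def resolve_explores(
    registry,  # any object with get_explore(model, explore) -> Optional[...]
    to_fetch: Iterable[Tuple[str, str]],
    max_workers: int = 10,
) -> Iterable[object]:
    # Fan resolution out over a thread pool; all workers share one cache.
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [
            pool.submit(registry.get_explore, model, explore)
            for model, explore in to_fetch
        ]
        for future in concurrent.futures.as_completed(futures):
            explore = future.result()
            if explore is not None:
                yield explore

If two threads miss the cache for the same key, both fetch and both write the same value; under CPython the individual dict operations are atomic, so the duplicate fetch costs an extra API call but not correctness. A lock would be needed only to guarantee at-most-once fetching.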
@@ -811,13 +798,8 @@
return user
def process_metrics_dimensions_and_fields_for_dashboard(
self, dashboard_id: str
) -> Tuple[List[MetadataWorkUnit], str, datetime.datetime, datetime.datetime]:
start_time = datetime.datetime.now()
dashboard = self.resolved_dashboards_map.get(dashboard_id)
if dashboard is None:
return [], dashboard_id, start_time, datetime.datetime.now()
self, dashboard: LookerDashboard
) -> List[MetadataWorkUnit]:
chart_mcps = [
self._make_metrics_dimensions_chart_mcp(element, dashboard)
@@ -837,7 +819,7 @@
for mcp in mcps
]
return workunits, dashboard_id, start_time, datetime.datetime.now()
return workunits
def _input_fields_from_dashboard_element(
self, dashboard_element: LookerDashboardElement
@@ -858,10 +840,14 @@
view_field_for_reference = input_field.view_field
if input_field.view_field is None:
explore = self.resolved_explores_map.get(
(input_field.model, input_field.explore)
explore = self.explore_registry.get_explore(
input_field.model, input_field.explore
)
if explore is not None:
# add this to the list of explores to finally generate metadata for
self.add_explore_to_fetch(
input_field.model, input_field.explore, entity_urn
)
entity_urn = explore.get_explore_urn(self.source_config)
explore_fields = (
explore.fields if explore.fields is not None else []
@@ -977,9 +963,7 @@
return [], None, dashboard_id, start_time, datetime.datetime.now()
looker_dashboard = self._get_looker_dashboard(dashboard_object, self.client)
self.resolved_dashboards_map[looker_dashboard.id] = looker_dashboard
mces = self._make_dashboard_and_chart_mces(looker_dashboard)
# for mce in mces:
workunits = [
MetadataWorkUnit(id=f"looker-{mce.proposedSnapshot.urn}", mce=mce)
if isinstance(mce, MetadataChangeEvent)
@@ -988,6 +972,13 @@
)
for mce in mces
]
# add on metrics, dimensions, fields events
metric_dim_workunits = self.process_metrics_dimensions_and_fields_for_dashboard(
looker_dashboard
)
workunits.extend(metric_dim_workunits)
return (
workunits,
dashboard_object,
@@ -1183,30 +1174,5 @@
yield workunit
self.reporter.report_stage_end("usage_extraction")
# after fetching explores, we need to go back and enrich each chart and dashboard with
# metadata about the fields
self.reporter.report_stage_start("field_metadata")
with concurrent.futures.ThreadPoolExecutor(
max_workers=self.source_config.max_threads
) as async_executor:
async_workunits = [
async_executor.submit(
self.process_metrics_dimensions_and_fields_for_dashboard, # type: ignore
dashboard_id,
)
for dashboard_id in dashboard_ids
if dashboard_id is not None
]
for async_workunit in concurrent.futures.as_completed(async_workunits):
work_units, dashboard_id, start_time, end_time = async_workunit.result() # type: ignore
logger.debug(
f"Running time of process_metrics_dimensions_and_fields_for_dashboard for {dashboard_id} = {(end_time - start_time).total_seconds()}"
)
self.reporter.report_upstream_latency(start_time, end_time)
for mwu in work_units:
yield mwu
self.reporter.report_workunit(mwu)
self.reporter.report_stage_end("field_metadata")
def get_report(self) -> SourceReport:
return self.reporter
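The last two hunks are two halves of one change: metric, dimension, and field work units are now emitted inline while each dashboard is processed, so the separate threaded field_metadata stage, and the resolved_dashboards_map it depended on, can be dropped. A minimal sketch of the resulting single-pass shape (every name below is a hypothetical stand-in, not the source's API):

from typing import Iterable, List

def resolve_dashboard(dashboard_id: str) -> dict:
    # Stand-in for the Looker API call that builds a LookerDashboard.
    return {"id": dashboard_id, "elements": []}

def make_dashboard_workunits(dashboard: dict) -> List[str]:
    return [f"dashboard-{dashboard['id']}"]

def make_field_workunits(dashboard: dict) -> List[str]:
    return [f"fields-{dashboard['id']}"]

def ingest(dashboard_ids: Iterable[str]) -> Iterable[str]:
    for dashboard_id in dashboard_ids:
        dashboard = resolve_dashboard(dashboard_id)
        workunits = make_dashboard_workunits(dashboard)
        # Field enrichment happens in the same pass, so the dashboard can be
        # garbage-collected as soon as its work units are yielded; no
        # id -> dashboard map has to stay alive for a later stage.
        workunits.extend(make_field_workunits(dashboard))
        yield from workunits

This is the heart of the memory reduction in the commit title: peak usage scales with one dashboard at a time plus the explore cache, instead of every resolved dashboard and explore held simultaneously.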


@@ -261,21 +261,26 @@ class BaseStatGenerator(ABC):
yield self.id_vs_model[id], aspect
def _execute_query(self, query: LookerQuery, query_name: str) -> List[Dict]:
start_time = datetime.datetime.now()
rows = self.config.looker_api_wrapper.execute_query(
write_query=query.to_write_query()
)
end_time = datetime.datetime.now()
rows = []
try:
start_time = datetime.datetime.now()
rows = self.config.looker_api_wrapper.execute_query(
write_query=query.to_write_query()
)
end_time = datetime.datetime.now()
logger.debug(
f"{self.ctx}: Retrieved {len(rows)} rows in {(end_time - start_time).total_seconds()} seconds"
)
self.report.report_query_latency(
f"{self.ctx}:{query_name}", (end_time - start_time).total_seconds()
)
if self.post_filter:
rows = [r for r in rows if self.get_id_from_row(r) in self.id_vs_model]
logger.debug(f"Filtered down to {len(rows)} rows")
except Exception as e:
logger.warning(f"Failed to execute {query_name} query", e)
logger.debug(
f"{self.ctx}: Retrieved {len(rows)} rows in {(end_time - start_time).total_seconds()} seconds"
)
self.report.report_query_latency(
f"{self.ctx}:{query_name}", (end_time - start_time).total_seconds()
)
if self.post_filter:
rows = [r for r in rows if self.get_id_from_row(r) in self.id_vs_model]
logger.debug(f"Filtered down to {len(rows)} rows")
return rows
def _append_filters(self, query: LookerQuery) -> LookerQuery:
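The reworked _execute_query moves timing and reporting inside a try/except so one failing usage query yields an empty result and a warning instead of aborting ingestion. A minimal standalone sketch of the pattern (execute_safely and run_query are hypothetical; run_query stands in for looker_api_wrapper.execute_query):

import datetime
import logging
from typing import Callable, Dict, List

logger = logging.getLogger(__name__)

def execute_safely(run_query: Callable[[], List[Dict]], query_name: str) -> List[Dict]:
    """Sketch: time a query, log failures, and fall back to an empty result."""
    rows: List[Dict] = []
    try:
        start_time = datetime.datetime.now()
        rows = run_query()
        elapsed = (datetime.datetime.now() - start_time).total_seconds()
        logger.debug(f"{query_name}: retrieved {len(rows)} rows in {elapsed} seconds")
    except Exception:
        # exc_info=True attaches the traceback to the log record.
        logger.warning(f"Failed to execute {query_name} query", exc_info=True)
    return rows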