mirror of
https://github.com/datahub-project/datahub.git
synced 2025-07-30 21:10:07 +00:00
feat(ingest/looker): support emitting unused explores (#9159)
This commit is contained in:
parent
ec9725026d
commit
148ad1ad9f
@ -388,7 +388,7 @@ class LookerUtil:
|
||||
|
||||
# if still not found, log and continue
|
||||
if type_class is None:
|
||||
logger.info(
|
||||
logger.debug(
|
||||
f"The type '{native_type}' is not recognized for field type, setting as NullTypeClass.",
|
||||
)
|
||||
type_class = NullTypeClass
|
||||
|
@ -205,6 +205,10 @@ class LookerDashboardSourceConfig(
|
||||
False,
|
||||
description="Extract looks which are not part of any Dashboard. To enable this flag the stateful_ingestion should also be enabled.",
|
||||
)
|
||||
emit_used_explores_only: bool = Field(
|
||||
True,
|
||||
description="When enabled, only explores that are used by a Dashboard/Look will be ingested.",
|
||||
)
|
||||
|
||||
@validator("external_base_url", pre=True, always=True)
|
||||
def external_url_defaults_to_api_config_base_url(
|
||||
|
@ -59,6 +59,7 @@ class LookerAPIStats(BaseModel):
|
||||
lookml_model_calls: int = 0
|
||||
all_dashboards_calls: int = 0
|
||||
all_looks_calls: int = 0
|
||||
all_models_calls: int = 0
|
||||
get_query_calls: int = 0
|
||||
search_looks_calls: int = 0
|
||||
search_dashboards_calls: int = 0
|
||||
@ -155,6 +156,12 @@ class LookerAPI:
|
||||
transport_options=self.transport_options,
|
||||
)
|
||||
|
||||
def all_lookml_models(self) -> Sequence[LookmlModel]:
|
||||
self.client_stats.all_models_calls += 1
|
||||
return self.client.all_lookml_models(
|
||||
transport_options=self.transport_options,
|
||||
)
|
||||
|
||||
def lookml_model_explore(self, model: str, explore_name: str) -> LookmlModelExplore:
|
||||
self.client_stats.explore_calls += 1
|
||||
return self.client.lookml_model_explore(
|
||||
|
@ -147,9 +147,12 @@ class LookerDashboardSource(TestableSource, StatefulIngestionSourceBase):
|
||||
)
|
||||
self.reporter._looker_explore_registry = self.explore_registry
|
||||
self.reporter._looker_api = self.looker_api
|
||||
|
||||
self.reachable_look_registry = set()
|
||||
|
||||
self.explores_to_fetch_set: Dict[Tuple[str, str], List[str]] = {}
|
||||
# (model, explore) -> list of charts/looks/dashboards that reference this explore
|
||||
# The list values are used purely for debugging purposes.
|
||||
self.reachable_explores: Dict[Tuple[str, str], List[str]] = {}
|
||||
|
||||
# Keep stat generators to generate entity stat aspect later
|
||||
stat_generator_config: looker_usage.StatGeneratorConfig = (
|
||||
@ -378,11 +381,11 @@ class LookerDashboardSource(TestableSource, StatefulIngestionSourceBase):
|
||||
|
||||
return result
|
||||
|
||||
def add_explore_to_fetch(self, model: str, explore: str, via: str) -> None:
|
||||
if (model, explore) not in self.explores_to_fetch_set:
|
||||
self.explores_to_fetch_set[(model, explore)] = []
|
||||
def add_reachable_explore(self, model: str, explore: str, via: str) -> None:
|
||||
if (model, explore) not in self.reachable_explores:
|
||||
self.reachable_explores[(model, explore)] = []
|
||||
|
||||
self.explores_to_fetch_set[(model, explore)].append(via)
|
||||
self.reachable_explores[(model, explore)].append(via)
|
||||
|
||||
def _get_looker_dashboard_element( # noqa: C901
|
||||
self, element: DashboardElement
|
||||
@ -403,7 +406,7 @@ class LookerDashboardSource(TestableSource, StatefulIngestionSourceBase):
|
||||
f"Element {element.title}: Explores added via query: {explores}"
|
||||
)
|
||||
for exp in explores:
|
||||
self.add_explore_to_fetch(
|
||||
self.add_reachable_explore(
|
||||
model=element.query.model,
|
||||
explore=exp,
|
||||
via=f"look:{element.look_id}:query:{element.dashboard_id}",
|
||||
@ -439,7 +442,7 @@ class LookerDashboardSource(TestableSource, StatefulIngestionSourceBase):
|
||||
explores = [element.look.query.view]
|
||||
logger.debug(f"Element {title}: Explores added via look: {explores}")
|
||||
for exp in explores:
|
||||
self.add_explore_to_fetch(
|
||||
self.add_reachable_explore(
|
||||
model=element.look.query.model,
|
||||
explore=exp,
|
||||
via=f"Look:{element.look_id}:query:{element.dashboard_id}",
|
||||
@ -483,7 +486,7 @@ class LookerDashboardSource(TestableSource, StatefulIngestionSourceBase):
|
||||
)
|
||||
|
||||
for exp in explores:
|
||||
self.add_explore_to_fetch(
|
||||
self.add_reachable_explore(
|
||||
model=element.result_maker.query.model,
|
||||
explore=exp,
|
||||
via=f"Look:{element.look_id}:resultmaker:query",
|
||||
@ -495,7 +498,7 @@ class LookerDashboardSource(TestableSource, StatefulIngestionSourceBase):
|
||||
if filterable.view is not None and filterable.model is not None:
|
||||
model = filterable.model
|
||||
explores.append(filterable.view)
|
||||
self.add_explore_to_fetch(
|
||||
self.add_reachable_explore(
|
||||
model=filterable.model,
|
||||
explore=filterable.view,
|
||||
via=f"Look:{element.look_id}:resultmaker:filterable",
|
||||
@ -694,20 +697,26 @@ class LookerDashboardSource(TestableSource, StatefulIngestionSourceBase):
|
||||
def _make_explore_metadata_events(
|
||||
self,
|
||||
) -> Iterable[Union[MetadataChangeEvent, MetadataChangeProposalWrapper]]:
|
||||
if self.source_config.emit_used_explores_only:
|
||||
explores_to_fetch = list(self.reachable_explores.keys())
|
||||
else:
|
||||
explores_to_fetch = list(self.list_all_explores())
|
||||
explores_to_fetch.sort()
|
||||
|
||||
with concurrent.futures.ThreadPoolExecutor(
|
||||
max_workers=self.source_config.max_threads
|
||||
) as async_executor:
|
||||
self.reporter.total_explores = len(self.explores_to_fetch_set)
|
||||
self.reporter.total_explores = len(explores_to_fetch)
|
||||
|
||||
explore_futures = {
|
||||
async_executor.submit(self.fetch_one_explore, model, explore): (
|
||||
model,
|
||||
explore,
|
||||
)
|
||||
for (model, explore) in self.explores_to_fetch_set
|
||||
for (model, explore) in explores_to_fetch
|
||||
}
|
||||
|
||||
for future in concurrent.futures.as_completed(explore_futures):
|
||||
for future in concurrent.futures.wait(explore_futures).done:
|
||||
events, explore_id, start_time, end_time = future.result()
|
||||
del explore_futures[future]
|
||||
self.reporter.explores_scanned += 1
|
||||
@ -717,6 +726,17 @@ class LookerDashboardSource(TestableSource, StatefulIngestionSourceBase):
|
||||
f"Running time of fetch_one_explore for {explore_id}: {(end_time - start_time).total_seconds()}"
|
||||
)
|
||||
|
||||
def list_all_explores(self) -> Iterable[Tuple[str, str]]:
|
||||
# returns a list of (model, explore) tuples
|
||||
|
||||
for model in self.looker_api.all_lookml_models():
|
||||
if model.name is None or model.explores is None:
|
||||
continue
|
||||
for explore in model.explores:
|
||||
if explore.name is None:
|
||||
continue
|
||||
yield (model.name, explore.name)
|
||||
|
||||
def fetch_one_explore(
|
||||
self, model: str, explore: str
|
||||
) -> Tuple[
|
||||
@ -954,7 +974,7 @@ class LookerDashboardSource(TestableSource, StatefulIngestionSourceBase):
|
||||
)
|
||||
if explore is not None:
|
||||
# add this to the list of explores to finally generate metadata for
|
||||
self.add_explore_to_fetch(
|
||||
self.add_reachable_explore(
|
||||
input_field.model, input_field.explore, entity_urn
|
||||
)
|
||||
entity_urn = explore.get_explore_urn(self.source_config)
|
||||
|
Loading…
x
Reference in New Issue
Block a user