feat(ingest/looker): add browse paths for charts (#9639)

Co-authored-by: Tamas Nemeth <treff7es@gmail.com>
This commit is contained in:
Harshal Sheth 2024-01-23 08:28:08 -08:00 committed by GitHub
parent 59674b5457
commit 35c4df1e9b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 24 additions and 4 deletions

View File

@ -1,6 +1,6 @@
import dataclasses import dataclasses
import json import json
from typing import TYPE_CHECKING, List, Optional, Tuple, Union from typing import TYPE_CHECKING, List, Optional, Sequence, Tuple, Union
from datahub.emitter.aspect import ASPECT_MAP, JSON_CONTENT_TYPE, TIMESERIES_ASPECT_MAP from datahub.emitter.aspect import ASPECT_MAP, JSON_CONTENT_TYPE, TIMESERIES_ASPECT_MAP
from datahub.emitter.serialization_helper import post_json_transform, pre_json_transform from datahub.emitter.serialization_helper import post_json_transform, pre_json_transform
@ -100,7 +100,7 @@ class MetadataChangeProposalWrapper:
@classmethod @classmethod
def construct_many( def construct_many(
cls, entityUrn: str, aspects: List[Optional[_Aspect]] cls, entityUrn: str, aspects: Sequence[Optional[_Aspect]]
) -> List["MetadataChangeProposalWrapper"]: ) -> List["MetadataChangeProposalWrapper"]:
return [cls(entityUrn=entityUrn, aspect=aspect) for aspect in aspects if aspect] return [cls(entityUrn=entityUrn, aspect=aspect) for aspect in aspects if aspect]

View File

@ -608,8 +608,14 @@ class LookerDashboardSource(TestableSource, StatefulIngestionSourceBase):
else "" else ""
}, },
) )
chart_snapshot.aspects.append(chart_info) chart_snapshot.aspects.append(chart_info)
if dashboard and dashboard.folder_path is not None:
browse_path = BrowsePathsClass(
paths=[f"/looker/{dashboard.folder_path}/{dashboard.title}"]
)
chart_snapshot.aspects.append(browse_path)
if dashboard is not None: if dashboard is not None:
ownership = self.get_ownership(dashboard) ownership = self.get_ownership(dashboard)
if ownership is not None: if ownership is not None:

View File

@ -2,6 +2,7 @@ from __future__ import annotations
import collections import collections
import concurrent.futures import concurrent.futures
import logging
import time import time
from concurrent.futures import Future, ThreadPoolExecutor from concurrent.futures import Future, ThreadPoolExecutor
from threading import BoundedSemaphore from threading import BoundedSemaphore
@ -20,6 +21,7 @@ from typing import (
from datahub.ingestion.api.closeable import Closeable from datahub.ingestion.api.closeable import Closeable
logger = logging.getLogger(__name__)
_R = TypeVar("_R") _R = TypeVar("_R")
_PARTITION_EXECUTOR_FLUSH_SLEEP_INTERVAL = 0.05 _PARTITION_EXECUTOR_FLUSH_SLEEP_INTERVAL = 0.05
@ -102,7 +104,19 @@ class PartitionExecutor(Closeable):
fn, args, kwargs, user_done_callback = self._pending_by_key[ fn, args, kwargs, user_done_callback = self._pending_by_key[
key key
].popleft() ].popleft()
try:
self._submit_nowait(key, fn, args, kwargs, user_done_callback) self._submit_nowait(key, fn, args, kwargs, user_done_callback)
except RuntimeError as e:
if self._executor._shutdown:
# If we're in shutdown mode, then we can't submit any more requests.
# That means we'll need to drop requests on the floor, which is to
# be expected in shutdown mode.
# The only reason we'd normally be in shutdown here is during
# Python exit (e.g. KeyboardInterrupt), so this is reasonable.
logger.debug("Dropping request due to shutdown")
else:
raise e
else: else:
# If there are no pending requests for this key, mark the key # If there are no pending requests for this key, mark the key