feat(ingest): Ingest Previews for Looker Charts, Dashboards, and Explores (#6941)

This commit is contained in:
John Joyce 2023-01-13 10:25:48 -08:00 committed by GitHub
parent d12bac6dd8
commit b8d8d198c5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 383 additions and 59 deletions

View File

@ -21,6 +21,7 @@ from datahub.metadata.com.linkedin.pegasus2avro.container import ContainerProper
from datahub.metadata.schema_classes import (
ContainerClass,
DomainsClass,
EmbedClass,
GlobalTagsClass,
MetadataChangeEventClass,
OwnerClass,
@ -309,3 +310,10 @@ def mcps_from_mce(
aspect=aspect,
systemMetadata=mce.systemMetadata,
)
def create_embed_mcp(urn: str, embed_url: str) -> MetadataChangeProposalWrapper:
    """Build a metadata change proposal carrying an embed aspect for an entity.

    Args:
        urn: URN of the entity the embed aspect is attached to.
        embed_url: URL stored as the aspect's ``renderUrl``.

    Returns:
        A proposal wrapper whose aspect is an ``EmbedClass`` for ``urn``.
    """
    embed_aspect = EmbedClass(renderUrl=embed_url)
    return MetadataChangeProposalWrapper(entityUrn=urn, aspect=embed_aspect)

View File

@ -30,6 +30,7 @@ from datahub.configuration import ConfigModel
from datahub.configuration.common import ConfigurationError
from datahub.configuration.source_common import DatasetSourceConfigBase
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.mcp_builder import create_embed_mcp
from datahub.ingestion.api.report import Report
from datahub.ingestion.api.source import SourceReport
from datahub.ingestion.source.looker.looker_lib_wrapper import LookerAPI
@ -82,6 +83,7 @@ from datahub.metadata.schema_classes import (
TagSnapshotClass,
)
from datahub.utilities.lossy_collections import LossyList, LossySet
from datahub.utilities.url_util import remove_port_from_url
if TYPE_CHECKING:
from datahub.ingestion.source.looker.lookml_source import (
@ -163,12 +165,10 @@ class LookerCommonConfig(DatasetSourceConfigBase):
description=f"Pattern for providing dataset names to explores. {LookerNamingPattern.allowed_docstring()}",
default=LookerNamingPattern(pattern="{model}.explore.{name}"),
)
explore_browse_pattern: LookerNamingPattern = pydantic.Field(
description=f"Pattern for providing browse paths to explores. {LookerNamingPattern.allowed_docstring()}",
default=LookerNamingPattern(pattern="/{env}/{platform}/{project}/explores"),
)
view_naming_pattern: LookerNamingPattern = Field(
LookerNamingPattern(pattern="{project}.view.{name}"),
description=f"Pattern for providing dataset names to views. {LookerNamingPattern.allowed_docstring()}",
@ -177,7 +177,6 @@ class LookerCommonConfig(DatasetSourceConfigBase):
LookerNamingPattern(pattern="/{env}/{platform}/{project}/views"),
description=f"Pattern for providing browse paths to views. {LookerNamingPattern.allowed_docstring()}",
)
tag_measures_and_dimensions: bool = Field(
True,
description="When enabled, attaches tags to measures, dimensions and dimension groups to make them more discoverable. When disabled, adds this information to the description of the column.",
@ -756,14 +755,19 @@ class LookerExplore:
return browse_path
def _get_url(self, base_url):
# If the base_url contains a port number (like https://company.looker.com:19999) remove the port number
m = re.match("^(.*):([0-9]+)$", base_url)
if m is not None:
base_url = m[1]
base_url = remove_port_from_url(base_url)
return f"{base_url}/explore/{self.model_name}/{self.name}"
def _get_embed_url(self, base_url: str) -> str:
    """Return the embed URL for this explore.

    The port suffix (if any) is stripped from ``base_url`` before the
    ``/embed/explore/<model>/<name>`` path is appended.
    """
    portless_base = remove_port_from_url(base_url)
    embed_path = f"/embed/explore/{self.model_name}/{self.name}"
    return f"{portless_base}{embed_path}"
def _to_metadata_events( # noqa: C901
self, config: LookerCommonConfig, reporter: SourceReport, base_url: str
self,
config: LookerCommonConfig,
reporter: SourceReport,
base_url: str,
extract_embed_urls: bool,
) -> Optional[List[Union[MetadataChangeEvent, MetadataChangeProposalWrapper]]]:
# We only generate MCE-s for explores that contain from clauses and do NOT contain joins
# All other explores (passthrough explores and joins) end in correct resolution of lineage, and don't need additional nodes in the graph.
@ -862,7 +866,19 @@ class LookerExplore:
aspect=SubTypesClass(typeNames=["explore"]),
)
return [mce, mcp]
proposals: List[Union[MetadataChangeEvent, MetadataChangeProposalWrapper]] = [
mce,
mcp,
]
# If extracting embeds is enabled, produce an MCP for embed URL.
if extract_embed_urls:
embed_mcp = create_embed_mcp(
dataset_snapshot.urn, self._get_embed_url(base_url)
)
proposals.append(embed_mcp)
return proposals
class LookerExploreRegistry:
@ -1048,15 +1064,21 @@ class LookerDashboardElement:
def url(self, base_url: str) -> str:
# A dashboard element can use a look or just a raw query against an explore
# If the base_url contains a port number (like https://company.looker.com:19999) remove the port number
m = re.match("^(.*):([0-9]+)$", base_url)
if m is not None:
base_url = m[1]
base_url = remove_port_from_url(base_url)
if self.look_id is not None:
return f"{base_url}/looks/{self.look_id}"
else:
return f"{base_url}/x/{self.query_slug}"
def embed_url(self, base_url: str) -> Optional[str]:
    """Return the embed URL for this dashboard element, if it has one.

    A dashboard element is backed either by a look or by a raw query
    against an explore. Only look-backed elements get an embed URL here;
    for anything else ``None`` is returned.
    """
    # Strip the port first, exactly as the non-embed URL builder does.
    portless_base = remove_port_from_url(base_url)
    if self.look_id is None:
        # No embeddable URL
        return None
    return f"{portless_base}/embed/looks/{self.look_id}"
def get_urn_element_id(self):
# A dashboard element can use a look or just a raw query against an explore
return f"dashboard_elements.{self.id}"
@ -1095,12 +1117,13 @@ class LookerDashboard:
last_viewed_at: Optional[datetime.datetime] = None
def url(self, base_url):
# If the base_url contains a port number (like https://company.looker.com:19999) remove the port number
m = re.match("^(.*):([0-9]+)$", base_url)
if m is not None:
base_url = m[1]
base_url = remove_port_from_url(base_url)
return f"{base_url}/dashboards/{self.id}"
def embed_url(self, base_url: str) -> str:
    """Return the embed URL for this dashboard.

    The port suffix (if any) is stripped from ``base_url`` before the
    ``/embed/dashboards/<id>`` path is appended.
    """
    portless_base = remove_port_from_url(base_url)
    embed_path = f"/embed/dashboards/{self.id}"
    return f"{portless_base}{embed_path}"
def get_urn_dashboard_id(self):
return get_urn_looker_dashboard_id(self.id)

View File

@ -26,6 +26,7 @@ import datahub.emitter.mce_builder as builder
from datahub.configuration.common import AllowDenyPattern, ConfigurationError
from datahub.configuration.validate_field_removal import pydantic_removed_field
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.mcp_builder import create_embed_mcp
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.decorators import (
SupportStatus,
@ -149,7 +150,10 @@ class LookerDashboardSourceConfig(
"30 days",
description="Used only if extract_usage_history is set to True. Interval to extract looker dashboard usage history for. See https://docs.looker.com/reference/filter-expressions#date_and_time.",
)
extract_embed_urls: bool = Field(
True,
description="Produce URLs used to render Looker Explores as Previews inside of DataHub UI. Embeds must be enabled inside of Looker to use this feature.",
)
stateful_ingestion: Optional[StatefulStaleMetadataRemovalConfig] = Field(
default=None, description=""
)
@ -650,9 +654,9 @@ class LookerDashboardSource(TestableSource, StatefulIngestionSourceBase):
return chart_type
def _make_chart_mce(
def _make_chart_metadata_events(
self, dashboard_element: LookerDashboardElement, dashboard: LookerDashboard
) -> MetadataChangeEvent:
) -> List[Union[MetadataChangeEvent, MetadataChangeProposalWrapper]]:
chart_urn = builder.make_chart_urn(
self.source_config.platform_name, dashboard_element.get_urn_element_id()
)
@ -684,7 +688,81 @@ class LookerDashboardSource(TestableSource, StatefulIngestionSourceBase):
if ownership is not None:
chart_snapshot.aspects.append(ownership)
return MetadataChangeEvent(proposedSnapshot=chart_snapshot)
chart_mce = MetadataChangeEvent(proposedSnapshot=chart_snapshot)
proposals: List[Union[MetadataChangeEvent, MetadataChangeProposalWrapper]] = [
chart_mce
]
# If extracting embeds is enabled, produce an MCP for embed URL.
if (
self.source_config.extract_embed_urls
and self.source_config.external_base_url
):
maybe_embed_url = dashboard_element.embed_url(
self.source_config.external_base_url
)
if maybe_embed_url:
proposals.append(
create_embed_mcp(
chart_snapshot.urn,
maybe_embed_url,
)
)
return proposals
def _make_dashboard_metadata_events(
    self, looker_dashboard: LookerDashboard, chart_urns: List[str]
) -> List[Union[MetadataChangeEvent, MetadataChangeProposalWrapper]]:
    """Build the metadata events describing a single Looker dashboard.

    Args:
        looker_dashboard: The dashboard to describe.
        chart_urns: URNs of the charts recorded on the dashboard info aspect.

    Returns:
        A list that always starts with the dashboard snapshot MCE and, when
        embed extraction is enabled and an external base URL is configured,
        also contains an embed MCP for the dashboard.
    """
    config = self.source_config

    snapshot = DashboardSnapshot(
        urn=builder.make_dashboard_urn(
            config.platform_name, looker_dashboard.get_urn_dashboard_id()
        ),
        aspects=[],
    )

    # Core dashboard info aspect.
    snapshot.aspects.append(
        DashboardInfoClass(
            description=looker_dashboard.description or "",
            title=looker_dashboard.title,
            charts=chart_urns,
            lastModified=self._get_change_audit_stamps(looker_dashboard),
            dashboardUrl=looker_dashboard.url(config.external_base_url),
        )
    )

    # Browse path is only emitted when the dashboard has a folder path.
    folder_path = looker_dashboard.folder_path
    if folder_path is not None:
        snapshot.aspects.append(BrowsePathsClass(paths=[f"/looker/{folder_path}"]))

    owners = self.get_ownership(looker_dashboard)
    if owners is not None:
        snapshot.aspects.append(owners)

    snapshot.aspects.append(Status(removed=looker_dashboard.is_deleted))

    events: List[Union[MetadataChangeEvent, MetadataChangeProposalWrapper]] = [
        MetadataChangeEvent(proposedSnapshot=snapshot)
    ]

    # If extracting embeds is enabled, produce an MCP for embed URL.
    if not (config.extract_embed_urls and config.external_base_url):
        return events

    events.append(
        create_embed_mcp(
            snapshot.urn,
            looker_dashboard.embed_url(config.external_base_url),
        )
    )
    return events
def _make_explore_metadata_events(
self,
@ -726,55 +804,52 @@ class LookerDashboardSource(TestableSource, StatefulIngestionSourceBase):
if looker_explore is not None:
events = (
looker_explore._to_metadata_events(
self.source_config, self.reporter, self.source_config.base_url
self.source_config,
self.reporter,
self.source_config.base_url,
self.source_config.extract_embed_urls,
)
or events
)
return events, f"{model}:{explore}", start_time, datetime.datetime.now()
def _extract_event_urn(
    self, event: Union[MetadataChangeEvent, MetadataChangeProposalWrapper]
) -> Optional[str]:
    """Return the entity URN carried by a metadata event.

    For a ``MetadataChangeEvent`` the URN is read from its proposed
    snapshot; for any other event it is read from ``entityUrn``.
    """
    if not isinstance(event, MetadataChangeEvent):
        return event.entityUrn
    return event.proposedSnapshot.urn
def _make_dashboard_and_chart_mces(
self, looker_dashboard: LookerDashboard
) -> Iterable[Union[MetadataChangeEvent, MetadataChangeProposalWrapper]]:
chart_mces = [
self._make_chart_mce(element, looker_dashboard)
for element in looker_dashboard.dashboard_elements
if element.type == "vis"
]
for chart_mce in chart_mces:
yield chart_mce
dashboard_urn = builder.make_dashboard_urn(
self.source_config.platform_name, looker_dashboard.get_urn_dashboard_id()
# Step 1: Emit metadata for each Chart inside the Dashboard.
chart_events = []
for element in looker_dashboard.dashboard_elements:
if element.type == "vis":
chart_events.extend(
self._make_chart_metadata_events(element, looker_dashboard)
)
yield from chart_events
# Step 2: Emit metadata events for the Dashboard itself.
chart_urns: Set[
str
] = set() # Collect the unique child chart urns for dashboard input lineage.
for chart_event in chart_events:
chart_event_urn = self._extract_event_urn(chart_event)
if chart_event_urn:
chart_urns.add(chart_event_urn)
dashboard_events = self._make_dashboard_metadata_events(
looker_dashboard, list(chart_urns)
)
dashboard_snapshot = DashboardSnapshot(
urn=dashboard_urn,
aspects=[],
)
dashboard_info = DashboardInfoClass(
description=looker_dashboard.description or "",
title=looker_dashboard.title,
charts=[mce.proposedSnapshot.urn for mce in chart_mces],
lastModified=self._get_change_audit_stamps(looker_dashboard),
dashboardUrl=looker_dashboard.url(self.source_config.external_base_url),
)
dashboard_snapshot.aspects.append(dashboard_info)
if looker_dashboard.folder_path is not None:
browse_path = BrowsePathsClass(
paths=[f"/looker/{looker_dashboard.folder_path}"]
)
dashboard_snapshot.aspects.append(browse_path)
ownership = self.get_ownership(looker_dashboard)
if ownership is not None:
dashboard_snapshot.aspects.append(ownership)
dashboard_snapshot.aspects.append(Status(removed=looker_dashboard.is_deleted))
dashboard_mce = MetadataChangeEvent(proposedSnapshot=dashboard_snapshot)
yield dashboard_mce
for dashboard_event in dashboard_events:
yield dashboard_event
def get_ownership(
self, looker_dashboard: LookerDashboard

View File

@ -0,0 +1,8 @@
import re
def remove_port_from_url(base_url: str) -> str:
    """Strip a trailing ``:<port>`` from a base URL.

    For example ``https://company.looker.com:19999`` becomes
    ``https://company.looker.com``. Trailing slashes after the port are
    tolerated and preserved, so ``https://company.looker.com:19999/``
    becomes ``https://company.looker.com/`` instead of keeping its port.

    Args:
        base_url: The URL to normalize.

    Returns:
        ``base_url`` without its port suffix, or ``base_url`` unchanged when
        it does not end in ``:<digits>`` (optionally followed by slashes).
    """
    # Group 1 is everything before the final ":<digits>"; group 3 captures any
    # trailing slashes so they can be re-attached after the port is dropped.
    m = re.match(r"^(.*):([0-9]+)(/*)$", base_url)
    if m is None:
        return base_url
    return m[1] + m[3]

View File

@ -37,6 +37,20 @@
"runId": "looker-test"
}
},
{
"entityType": "dashboard",
"entityUrn": "urn:li:dashboard:(looker,dashboards.11)",
"changeType": "UPSERT",
"aspectName": "embed",
"aspect": {
"value": "{\"renderUrl\": \"https://looker.company.com/embed/dashboards/11\"}",
"contentType": "application/json"
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "looker-test"
}
},
{
"entityType": "chart",
"entityUrn": "urn:li:chart:(looker,dashboard_elements.2)",
@ -189,6 +203,20 @@
"runId": "looker-test"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:looker,data.explore.my_view,PROD)",
"changeType": "UPSERT",
"aspectName": "embed",
"aspect": {
"value": "{\"renderUrl\": \"https://looker.company.com/embed/explore/data/my_view\"}",
"contentType": "application/json"
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "looker-test"
}
},
{
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": {
@ -299,6 +327,20 @@
"runId": "looker-test"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:looker,bogus data.explore.my_view,PROD)",
"changeType": "UPSERT",
"aspectName": "embed",
"aspect": {
"value": "{\"renderUrl\": \"https://looker.company.com/embed/explore/bogus data/my_view\"}",
"contentType": "application/json"
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "looker-test"
}
},
{
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.TagSnapshot": {

View File

@ -37,6 +37,20 @@
"runId": "looker-test"
}
},
{
"entityType": "dashboard",
"entityUrn": "urn:li:dashboard:(looker,dashboards.11)",
"changeType": "UPSERT",
"aspectName": "embed",
"aspect": {
"value": "{\"renderUrl\": \"https://looker.company.com/embed/dashboards/11\"}",
"contentType": "application/json"
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "looker-test"
}
},
{
"entityType": "chart",
"entityUrn": "urn:li:chart:(looker,dashboard_elements.2)",
@ -175,6 +189,20 @@
"runId": "looker-test"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:looker,data.explore.my_view,PROD)",
"changeType": "UPSERT",
"aspectName": "embed",
"aspect": {
"value": "{\"renderUrl\": \"https://looker.company.com/embed/explore/data/my_view\"}",
"contentType": "application/json"
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "looker-test"
}
},
{
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.TagSnapshot": {

View File

@ -37,6 +37,20 @@
"runId": "looker-test"
}
},
{
"entityType": "dashboard",
"entityUrn": "urn:li:dashboard:(looker,dashboards.1)",
"changeType": "UPSERT",
"aspectName": "embed",
"aspect": {
"value": "{\"renderUrl\": \"https://looker.company.com/embed/dashboards/1\"}",
"contentType": "application/json"
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "looker-test"
}
},
{
"entityType": "chart",
"entityUrn": "urn:li:chart:(looker,dashboard_elements.2)",
@ -175,6 +189,20 @@
"runId": "looker-test"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:looker,data.explore.my_view,PROD)",
"changeType": "UPSERT",
"aspectName": "embed",
"aspect": {
"value": "{\"renderUrl\": \"https://looker.company.com/embed/explore/data/my_view\"}",
"contentType": "application/json"
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "looker-test"
}
},
{
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.TagSnapshot": {

View File

@ -37,6 +37,20 @@
"runId": "looker-test"
}
},
{
"entityType": "dashboard",
"entityUrn": "urn:li:dashboard:(looker,dashboards.1)",
"changeType": "UPSERT",
"aspectName": "embed",
"aspect": {
"value": "{\"renderUrl\": \"https://looker.company.com/embed/dashboards/1\"}",
"contentType": "application/json"
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "looker-test"
}
},
{
"entityType": "chart",
"entityUrn": "urn:li:chart:(looker,dashboard_elements.2)",
@ -199,6 +213,20 @@
"runId": "looker-test"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:looker,data.explore.my_view,PROD)",
"changeType": "UPSERT",
"aspectName": "embed",
"aspect": {
"value": "{\"renderUrl\": \"https://looker.company.com/embed/explore/data/my_view\"}",
"contentType": "application/json"
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "looker-test"
}
},
{
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.TagSnapshot": {

View File

@ -37,6 +37,20 @@
"runId": "looker-test"
}
},
{
"entityType": "dashboard",
"entityUrn": "urn:li:dashboard:(looker,dashboards.1)",
"changeType": "UPSERT",
"aspectName": "embed",
"aspect": {
"value": "{\"renderUrl\": \"https://looker.company.com/embed/dashboards/1\"}",
"contentType": "application/json"
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "looker-test"
}
},
{
"entityType": "chart",
"entityUrn": "urn:li:chart:(looker,dashboard_elements.2)",
@ -191,6 +205,20 @@
"runId": "looker-test"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:looker,data.explore.my_view,PROD)",
"changeType": "UPSERT",
"aspectName": "embed",
"aspect": {
"value": "{\"renderUrl\": \"https://looker.company.com/embed/explore/data/my_view\"}",
"contentType": "application/json"
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "looker-test"
}
},
{
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.TagSnapshot": {

View File

@ -37,6 +37,20 @@
"runId": "looker-test"
}
},
{
"entityType": "dashboard",
"entityUrn": "urn:li:dashboard:(looker,dashboards.1)",
"changeType": "UPSERT",
"aspectName": "embed",
"aspect": {
"value": "{\"renderUrl\": \"https://looker.company.com/embed/dashboards/1\"}",
"contentType": "application/json"
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "looker-test"
}
},
{
"entityType": "chart",
"entityUrn": "urn:li:chart:(looker,dashboard_elements.2)",
@ -175,6 +189,20 @@
"runId": "looker-test"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:looker,data.explore.my_view,PROD)",
"changeType": "UPSERT",
"aspectName": "embed",
"aspect": {
"value": "{\"renderUrl\": \"https://looker.company.com/embed/explore/data/my_view\"}",
"contentType": "application/json"
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "looker-test"
}
},
{
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.TagSnapshot": {

View File

@ -37,6 +37,20 @@
"runId": "looker-test"
}
},
{
"entityType": "dashboard",
"entityUrn": "urn:li:dashboard:(looker,dashboards.1)",
"changeType": "UPSERT",
"aspectName": "embed",
"aspect": {
"value": "{\"renderUrl\": \"https://looker.company.com/embed/dashboards/1\"}",
"contentType": "application/json"
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "looker-test"
}
},
{
"entityType": "chart",
"entityUrn": "urn:li:chart:(looker,dashboard_elements.2)",
@ -175,6 +189,20 @@
"runId": "looker-test"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:looker,data.explore.my_view,PROD)",
"changeType": "UPSERT",
"aspectName": "embed",
"aspect": {
"value": "{\"renderUrl\": \"https://looker.company.com/embed/explore/data/my_view\"}",
"contentType": "application/json"
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "looker-test"
}
},
{
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.TagSnapshot": {