feat(ingest): use urn builders in looker and validate data platforms (#2939)

Harshal Sheth 2021-07-22 21:50:44 -07:00 committed by GitHub
parent d39030dab2
commit 01982310be
4 changed files with 47 additions and 18 deletions

View File

@@ -705,12 +705,12 @@ source:
base_folder: /path/to/model/files # where the *.model.lkml and *.view.lkml files are stored
connection_to_platform_map: # mappings between connection names in the model files to platform names
connection_name: platform_name (or platform_name.database_name) # for ex. my_snowflake_conn: snowflake.my_database
platform_name: "looker" # optional, default is "looker"
actor: "urn:li:corpuser:etl" # optional, default is "urn:li:corpuser:etl"
model_pattern: {}
view_pattern: {}
env: "PROD" # optional, default is "PROD"
parse_table_names_from_sql: False # see note below
platform_name: "looker" # optional, default is "looker"
```
Note! The integration can use [`sql-metadata`](https://pypi.org/project/sql-metadata/) to try to parse the tables the
@@ -731,14 +731,14 @@ See the [Looker authentication docs](https://docs.looker.com/reference/api-and-i
source:
type: "looker"
config:
client_id: str # Your Looker API3 client ID
client_secret: str # Your Looker API3 client secret
base_url: str # The url to your Looker instance: https://company.looker.com:19999 or https://looker.company.com, or similar.
dashboard_pattern: AllowDenyPattern = AllowDenyPattern.allow_all()
chart_pattern: AllowDenyPattern = AllowDenyPattern.allow_all()
actor: str = "urn:li:corpuser:etl" # Optional, "urn:li:corpuser:etl"
env: str = "PROD" # Optional, default is "PROD"
platform_name: str = "looker" # Optional, default is "looker"
client_id: # Your Looker API3 client ID
client_secret: # Your Looker API3 client secret
base_url: # The url to your Looker instance: https://company.looker.com:19999 or https://looker.company.com, or similar.
dashboard_pattern: # supports allow/deny regexes
chart_pattern: # supports allow/deny regexes
actor: urn:li:corpuser:etl # Optional, defaults to urn:li:corpuser:etl
env: "PROD" # Optional, default is "PROD"
platform_name: "looker" # Optional, default is "looker"
```
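As a usage sketch, the same recipe can also be driven programmatically. This assumes `Pipeline.create` from `datahub.ingestion.run.pipeline` accepts the recipe as a dict and that a `console` sink is available; the credentials and base URL are placeholders:

```python
from datahub.ingestion.run.pipeline import Pipeline

# Assumed API: Pipeline.create takes the same shape as the YAML recipe above.
pipeline = Pipeline.create(
    {
        "source": {
            "type": "looker",
            "config": {
                "client_id": "<looker-api3-client-id>",          # placeholder
                "client_secret": "<looker-api3-client-secret>",  # placeholder
                "base_url": "https://company.looker.com",
                "platform_name": "looker",
                "env": "PROD",
            },
        },
        "sink": {"type": "console", "config": {}},
    }
)
pipeline.run()
```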
### File `file`

View File

@@ -1,4 +1,5 @@
"""Convenience functions for creating MCEs"""
import logging
import time
from typing import List, Optional, Type, TypeVar, get_type_hints
@@ -18,12 +19,23 @@ DEFAULT_FLOW_CLUSTER = "prod"
UNKNOWN_USER = "urn:li:corpuser:unknown"
logger = logging.getLogger(__name__)
def get_sys_time() -> int:
# TODO deprecate this
return int(time.time() * 1000)
def _check_data_platform_name(platform_name: str) -> None:
if not platform_name.isalpha():
logger.warning(f"improperly formatted data platform: {platform_name}")
def make_data_platform_urn(platform: str) -> str:
if platform.startswith("urn:li:dataPlatform:"):
return platform
_check_data_platform_name(platform)
return f"urn:li:dataPlatform:{platform}"
@@ -57,6 +69,18 @@ def make_data_job_urn(
)
def make_dashboard_urn(platform: str, name: str) -> str:
# FIXME: dashboards don't currently include data platform urn prefixes.
_check_data_platform_name(platform)
return f"urn:li:dashboard:({platform},{name})"
def make_chart_urn(platform: str, name: str) -> str:
# FIXME: charts don't currently include data platform urn prefixes.
_check_data_platform_name(platform)
return f"urn:li:chart:({platform},{name})"
def make_ml_primary_key_urn(feature_table_name: str, primary_key_name: str) -> str:
return f"urn:li:mlPrimaryKey:({feature_table_name},{primary_key_name})"

View File

@@ -15,9 +15,9 @@ from looker_sdk.sdk.api31.models import (
Query,
)
import datahub.emitter.mce_builder as builder
from datahub.configuration import ConfigModel
from datahub.configuration.common import AllowDenyPattern
from datahub.emitter.mce_builder import DEFAULT_ENV
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.source import Source, SourceReport
from datahub.ingestion.api.workunit import MetadataWorkUnit
@@ -47,7 +47,7 @@ class LookerDashboardSourceConfig(ConfigModel):
actor: str = "urn:li:corpuser:etl"
dashboard_pattern: AllowDenyPattern = AllowDenyPattern.allow_all()
chart_pattern: AllowDenyPattern = AllowDenyPattern.allow_all()
env: str = DEFAULT_ENV
env: str = builder.DEFAULT_ENV
@dataclass
@@ -91,10 +91,9 @@ class LookerDashboardElement:
# A dashboard element can use a look or just a raw query against an explore
return f"dashboard_elements.{self.id}"
def get_view_urns(self, platform_name: str) -> List[str]:
def get_view_urns(self, platform_name: str, env: str) -> List[str]:
return [
f"urn:li:dataset:(urn:li:dataPlatform:{platform_name},{v},PROD)"
for v in self.looker_views
builder.make_dataset_urn(platform_name, v, env) for v in self.looker_views
]
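For context, `builder.make_dataset_urn` (assuming it keeps the `(platform, name, env)` signature it has elsewhere in `mce_builder`) reproduces the old hard-coded string when `env` is `"PROD"`, while letting other environments through; the view name is illustrative:

```python
import datahub.emitter.mce_builder as builder

builder.make_dataset_urn("snowflake", "my_view", "PROD")
# -> "urn:li:dataset:(urn:li:dataPlatform:snowflake,my_view,PROD)"

builder.make_dataset_urn("snowflake", "my_view", "DEV")
# -> "urn:li:dataset:(urn:li:dataPlatform:snowflake,my_view,DEV)"
```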
@@ -298,7 +297,9 @@ class LookerDashboardSource(Source):
def _make_chart_mce(
self, dashboard_element: LookerDashboardElement
) -> MetadataChangeEvent:
chart_urn = f"urn:li:chart:({self.source_config.platform_name},{dashboard_element.get_urn_element_id()})"
chart_urn = builder.make_chart_urn(
self.source_config.platform_name, dashboard_element.get_urn_element_id()
)
chart_snapshot = ChartSnapshot(
urn=chart_urn,
aspects=[],
@@ -312,7 +313,9 @@
title=dashboard_element.title or "",
lastModified=ChangeAuditStamps(),
chartUrl=dashboard_element.url(self.source_config.base_url),
inputs=dashboard_element.get_view_urns(self.source_config.platform_name),
inputs=dashboard_element.get_view_urns(
self.source_config.platform_name, self.source_config.env
),
)
chart_snapshot.aspects.append(chart_info)
@@ -328,7 +331,9 @@
for element in looker_dashboard.dashboard_elements
]
dashboard_urn = f"urn:li:dashboard:({self.source_config.platform_name},{looker_dashboard.get_urn_dashboard_id()})"
dashboard_urn = builder.make_dashboard_urn(
self.source_config.platform_name, looker_dashboard.get_urn_dashboard_id()
)
dashboard_snapshot = DashboardSnapshot(
urn=dashboard_urn,
aspects=[],
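Taken together, the two call-site changes in this file are a pure refactor: the builders emit the same strings the old f-strings produced. A sketch of that equivalence, with made-up stand-ins for `get_urn_element_id()` and `get_urn_dashboard_id()`:

```python
import datahub.emitter.mce_builder as builder

platform_name = "looker"
element_id = "dashboard_elements.123"   # stand-in for get_urn_element_id()
dashboard_id = "dashboards.456"         # stand-in for get_urn_dashboard_id()

# Same strings the old hard-coded f-strings produced.
assert builder.make_chart_urn(platform_name, element_id) == (
    f"urn:li:chart:({platform_name},{element_id})"
)
assert builder.make_dashboard_urn(platform_name, dashboard_id) == (
    f"urn:li:dashboard:({platform_name},{dashboard_id})"
)
```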

View File

@@ -84,7 +84,7 @@ class SnowflakeConfig(BaseSnowflakeConfig, SQLAlchemyConfig):
def get_identifier(self, schema: str, table: str) -> str:
regular = super().get_identifier(schema, table)
return f"{self.database}.{regular}"
return f"{self.database.lower()}.{regular}"
class SnowflakeSource(SQLAlchemySource):
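A minimal illustration of the lowercasing change, with made-up values; `regular` stands in for whatever `super().get_identifier(schema, table)` returns:

```python
database = "LONG_TAIL_COMPANIONS"        # self.database from the Snowflake config
regular = "adoption.pets"                # result of super().get_identifier(schema, table)

old_identifier = f"{database}.{regular}"          # "LONG_TAIL_COMPANIONS.adoption.pets"
new_identifier = f"{database.lower()}.{regular}"  # "long_tail_companions.adoption.pets"
```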