mirror of
https://github.com/datahub-project/datahub.git
synced 2025-12-28 02:17:53 +00:00
feat(ingest): Looker view and dashboard ingestion (#2493)
This commit is contained in:
parent
89535d7dea
commit
133577557c
@ -31,29 +31,31 @@ If you run into an error, try checking the [_common setup issues_](./developing.

We use a plugin architecture so that you can install only the dependencies you actually need.

| Plugin Name   | Install Command                                            | Provides                   |
| ------------- | ---------------------------------------------------------- | -------------------------- |
| file          | _included by default_                                      | File source and sink       |
| console       | _included by default_                                      | Console sink               |
| athena        | `pip install 'acryl-datahub[athena]'`                      | AWS Athena source          |
| bigquery      | `pip install 'acryl-datahub[bigquery]'`                    | BigQuery source            |
| glue          | `pip install 'acryl-datahub[glue]'`                        | AWS Glue source            |
| hive          | `pip install 'acryl-datahub[hive]'`                        | Hive source                |
| mssql         | `pip install 'acryl-datahub[mssql]'`                       | SQL Server source          |
| mysql         | `pip install 'acryl-datahub[mysql]'`                       | MySQL source               |
| oracle        | `pip install 'acryl-datahub[oracle]'`                      | Oracle source              |
| postgres      | `pip install 'acryl-datahub[postgres]'`                    | Postgres source            |
| Plugin Name   | Install Command                                            | Provides                            |
| ------------- | ---------------------------------------------------------- | ----------------------------------- |
| file          | _included by default_                                      | File source and sink                |
| console       | _included by default_                                      | Console sink                        |
| athena        | `pip install 'acryl-datahub[athena]'`                      | AWS Athena source                   |
| bigquery      | `pip install 'acryl-datahub[bigquery]'`                    | BigQuery source                     |
| glue          | `pip install 'acryl-datahub[glue]'`                        | AWS Glue source                     |
| hive          | `pip install 'acryl-datahub[hive]'`                        | Hive source                         |
| mssql         | `pip install 'acryl-datahub[mssql]'`                       | SQL Server source                   |
| mysql         | `pip install 'acryl-datahub[mysql]'`                       | MySQL source                        |
| oracle        | `pip install 'acryl-datahub[oracle]'`                      | Oracle source                       |
| postgres      | `pip install 'acryl-datahub[postgres]'`                    | Postgres source                     |
| redshift      | `pip install 'acryl-datahub[redshift]'`                    | Redshift source                     |
| sqlalchemy    | `pip install 'acryl-datahub[sqlalchemy]'`                  | Generic SQLAlchemy source           |
| snowflake     | `pip install 'acryl-datahub[snowflake]'`                   | Snowflake source                    |
| superset      | `pip install 'acryl-datahub[superset]'`                    | Superset source                     |
| mongodb       | `pip install 'acryl-datahub[mongodb]'`                     | MongoDB source                      |
| ldap          | `pip install 'acryl-datahub[ldap]'` ([extra requirements]) | LDAP source                         |
| kafka         | `pip install 'acryl-datahub[kafka]'`                       | Kafka source                        |
| druid         | `pip install 'acryl-datahub[druid]'`                       | Druid Source                        |
| dbt           | _no additional dependencies_                               | DBT source                          |
| datahub-rest  | `pip install 'acryl-datahub[datahub-rest]'`                | DataHub sink over REST API          |
| datahub-kafka | `pip install 'acryl-datahub[datahub-kafka]'`               | DataHub sink over Kafka             |
| sqlalchemy    | `pip install 'acryl-datahub[sqlalchemy]'`                  | Generic SQLAlchemy source           |
| snowflake     | `pip install 'acryl-datahub[snowflake]'`                   | Snowflake source                    |
| superset      | `pip install 'acryl-datahub[superset]'`                    | Superset source                     |
| mongodb       | `pip install 'acryl-datahub[mongodb]'`                     | MongoDB source                      |
| ldap          | `pip install 'acryl-datahub[ldap]'` ([extra requirements]) | LDAP source                         |
| looker        | `pip install 'acryl-datahub[looker]'`                      | Looker source                       |
| lookml        | `pip install 'acryl-datahub[lookml]'`                      | LookML source, requires Python 3.7+ |
| kafka         | `pip install 'acryl-datahub[kafka]'`                       | Kafka source                        |
| druid         | `pip install 'acryl-datahub[druid]'`                       | Druid Source                        |
| dbt           | _no additional dependencies_                               | DBT source                          |
| datahub-rest  | `pip install 'acryl-datahub[datahub-rest]'`                | DataHub sink over REST API          |
| datahub-kafka | `pip install 'acryl-datahub[datahub-kafka]'`               | DataHub sink over Kafka             |

These plugins can be mixed and matched as desired. For example:
@ -514,6 +516,57 @@ source:
    filter: "(objectClass=*)" # optional field
```

### LookML `lookml`

Note! This plugin uses a package that requires Python 3.7+!

Extracts:

- LookML views from model files
- Name, upstream table names, dimensions, measures, and dimension groups

```yml
source:
  type: "lookml"
  config:
    base_folder: /path/to/model/files # Where the *.model.lkml and *.view.lkml files are stored.
    connection_to_platform_map: # Mapping from connection names in the model files to platform names.
      my_snowflake_conn: snowflake
    platform_name: looker_views # Optional, default is "looker_views"
    actor: "urn:li:corpuser:etl" # Optional, default is "urn:li:corpuser:etl"
    model_pattern: {}
    view_pattern: {}
    env: "PROD" # Optional, default is "PROD"
    parse_table_names_from_sql: False # See note below.
```

Note! The integration can use [`sql-metadata`](https://pypi.org/project/sql-metadata/) to try to parse the tables the
views depend on. As these SQL statements can be complicated, and the package doesn't officially support all the SQL dialects
that Looker supports, the result might not be correct. This parsing is disabled by default, but can be enabled by setting
`parse_table_names_from_sql: True`.
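For a concrete sense of what that parsing does, here is a minimal sketch that calls `sql-metadata` directly. The SQL snippet is illustrative (it mirrors the `my_view` test fixture further down); only `get_query_tables` itself comes from the package the plugin pins.

```python
# Assumes: pip install 'sql-metadata==1.12.0' (the version pinned by the lookml plugin).
from sql_metadata import get_query_tables

# An illustrative derived-table SQL block, similar to the my_view fixture below.
sql = """
    SELECT
      is_latest,
      country,
      city,
      timestamp,
      measurement
    FROM
      my_table
"""

# Returns the table names referenced by the query, e.g. ['my_table'].
print(get_query_tables(sql))
```

The `lookml` source applies the same call to each `derived_table` SQL block and then filters out temporary tables defined in `WITH` clauses, so dialect-specific constructs can still slip through.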

### Looker dashboards `looker`

Extracts:

- Looker dashboards and dashboard elements (charts)
- Names, descriptions, URLs, chart types, input views for the charts

```yml
source:
  type: "looker"
  config:
    client_id: str # Your Looker API client ID. Ask your Looker admin.
    client_secret: str # Your Looker API client secret. Ask your Looker admin.
    base_url: str # The URL of your Looker instance: https://company.looker.com:19999 or https://looker.company.com, or similar.
    platform_name: str = "looker" # Optional, default is "looker"
    view_platform_name: str = "looker_views" # Optional, default is "looker_views". Should be the same as `platform_name` in the `lookml` source, if that source is also run.
    actor: str = "urn:li:corpuser:etl" # Optional, default is "urn:li:corpuser:etl"
    dashboard_pattern: AllowDenyPattern = AllowDenyPattern.allow_all()
    chart_pattern: AllowDenyPattern = AllowDenyPattern.allow_all()
    env: str = "PROD" # Optional, default is "PROD"
```
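For orientation, the sketch below shows roughly which Looker SDK calls the `looker` source makes (they mirror `looker.py` later in this diff). The base URL and credentials are placeholders.

```python
import os

import looker_sdk

# The source exports its config values as the environment variables the SDK reads.
os.environ["LOOKERSDK_BASE_URL"] = "https://looker.company.com"  # placeholder
os.environ["LOOKERSDK_CLIENT_ID"] = "placeholder-client-id"
os.environ["LOOKERSDK_CLIENT_SECRET"] = "placeholder-client-secret"

client = looker_sdk.init31()  # Looker API 3.1 client, as used by the source

# List dashboard ids, then fetch each dashboard with only the fields the source needs.
dashboard_ids = [d.id for d in client.all_dashboards(fields="id") if d.id is not None]
for dashboard_id in dashboard_ids:
    dashboard = client.dashboard(
        dashboard_id=dashboard_id,
        fields=",".join(["id", "title", "dashboard_elements", "dashboard_filters"]),
    )
    print(dashboard.title, len(dashboard.dashboard_elements or []))
```

Chart inputs are emitted as dataset URNs under `view_platform_name` (for example `urn:li:dataset:(urn:li:dataPlatform:looker_views,my_view,PROD)`), which is why it should match the `platform_name` used by the `lookml` source when both sources run.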

### File `file`

Pulls metadata from a previously generated file. Note that the file sink
@ -1,8 +1,12 @@
|
||||
import os
|
||||
import sys
|
||||
from typing import Dict, Set
|
||||
|
||||
import setuptools
|
||||
|
||||
is_py37_or_newer = sys.version_info >= (3, 7)
|
||||
|
||||
|
||||
package_metadata: dict = {}
|
||||
with open("./src/datahub/__init__.py") as fp:
|
||||
exec(fp.read(), package_metadata)
|
||||
@ -78,11 +82,15 @@ plugins: Dict[str, Set[str]] = {
|
||||
"snowflake": sql_common | {"snowflake-sqlalchemy"},
|
||||
"oracle": sql_common | {"cx_Oracle"},
|
||||
"ldap": {"python-ldap>=2.4"},
|
||||
"looker": {"looker-sdk==21.6.0"},
|
||||
"druid": sql_common | {"pydruid>=0.6.2"},
|
||||
"mongodb": {"pymongo>=3.11"},
|
||||
"superset": {"requests"},
|
||||
"glue": {"boto3"},
|
||||
}
|
||||
if is_py37_or_newer:
|
||||
plugins["lookml"] = {"lkml>=1.1.0", "sql-metadata==1.12.0"}
|
||||
|
||||
|
||||
base_dev_requirements = {
|
||||
*base_requirements,
|
||||
@ -110,6 +118,7 @@ base_dev_requirements = {
|
||||
"mssql",
|
||||
"mongodb",
|
||||
"ldap",
|
||||
"looker",
|
||||
"glue",
|
||||
"datahub-kafka",
|
||||
"datahub-rest",
|
||||
@ -119,6 +128,11 @@ base_dev_requirements = {
|
||||
),
|
||||
}
|
||||
|
||||
if is_py37_or_newer:
|
||||
base_dev_requirements = base_dev_requirements.union(
|
||||
{dependency for plugin in ["lookml"] for dependency in plugins[plugin]}
|
||||
)
|
||||
|
||||
dev_requirements = {
|
||||
*base_dev_requirements,
|
||||
"apache-airflow==1.10.15",
|
||||
@ -131,6 +145,49 @@ dev_requirements_airflow_2 = {
|
||||
}
|
||||
|
||||
|
||||
entry_points = {
|
||||
"console_scripts": ["datahub = datahub.entrypoints:datahub"],
|
||||
"datahub.ingestion.source.plugins": [
|
||||
"file = datahub.ingestion.source.mce_file:MetadataFileSource",
|
||||
"sqlalchemy = datahub.ingestion.source.sql_generic:SQLAlchemyGenericSource",
|
||||
"athena = datahub.ingestion.source.athena:AthenaSource",
|
||||
"bigquery = datahub.ingestion.source.bigquery:BigQuerySource",
|
||||
"dbt = datahub.ingestion.source.dbt:DBTSource",
|
||||
"druid = datahub.ingestion.source.druid:DruidSource",
|
||||
"glue = datahub.ingestion.source.glue:GlueSource",
|
||||
"hive = datahub.ingestion.source.hive:HiveSource",
|
||||
"kafka = datahub.ingestion.source.kafka:KafkaSource",
|
||||
"ldap = datahub.ingestion.source.ldap:LDAPSource",
|
||||
"looker = datahub.ingestion.source.looker:LookerDashboardSource",
|
||||
"mongodb = datahub.ingestion.source.mongodb:MongoDBSource",
|
||||
"mssql = datahub.ingestion.source.mssql:SQLServerSource",
|
||||
"mysql = datahub.ingestion.source.mysql:MySQLSource",
|
||||
"oracle = datahub.ingestion.source.oracle:OracleSource",
|
||||
"postgres = datahub.ingestion.source.postgres:PostgresSource",
|
||||
"redshift = datahub.ingestion.source.redshift:RedshiftSource",
|
||||
"snowflake = datahub.ingestion.source.snowflake:SnowflakeSource",
|
||||
"superset = datahub.ingestion.source.superset:SupersetSource",
|
||||
],
|
||||
"datahub.ingestion.sink.plugins": [
|
||||
"file = datahub.ingestion.sink.file:FileSink",
|
||||
"console = datahub.ingestion.sink.console:ConsoleSink",
|
||||
"datahub-kafka = datahub.ingestion.sink.datahub_kafka:DatahubKafkaSink",
|
||||
"datahub-rest = datahub.ingestion.sink.datahub_rest:DatahubRestSink",
|
||||
],
|
||||
"apache_airflow_provider": [
|
||||
"provider_info=datahub.integrations.airflow.get_provider_info:get_provider_info"
|
||||
],
|
||||
"airflow.plugins": [
|
||||
"datahub = datahub.integrations.airflow.get_provider_info:DatahubAirflowPlugin"
|
||||
],
|
||||
}
|
||||
|
||||
if is_py37_or_newer:
|
||||
entry_points["datahub.ingestion.source.plugins"].append(
|
||||
"lookml = datahub.ingestion.source.lookml:LookMLSource"
|
||||
)
|
||||
|
||||
|
||||
setuptools.setup(
|
||||
# Package metadata.
|
||||
name=package_metadata["__package_name__"],
|
||||
@ -175,38 +232,7 @@ setuptools.setup(
|
||||
"datahub": ["py.typed"],
|
||||
"datahub.metadata": ["schema.avsc"],
|
||||
},
|
||||
entry_points={
|
||||
"console_scripts": ["datahub = datahub.entrypoints:datahub"],
|
||||
"datahub.ingestion.source.plugins": [
|
||||
"file = datahub.ingestion.source.mce_file:MetadataFileSource",
|
||||
"sqlalchemy = datahub.ingestion.source.sql_generic:SQLAlchemyGenericSource",
|
||||
"athena = datahub.ingestion.source.athena:AthenaSource",
|
||||
"bigquery = datahub.ingestion.source.bigquery:BigQuerySource",
|
||||
"dbt = datahub.ingestion.source.dbt:DBTSource",
|
||||
"druid = datahub.ingestion.source.druid:DruidSource",
|
||||
"glue = datahub.ingestion.source.glue:GlueSource",
|
||||
"hive = datahub.ingestion.source.hive:HiveSource",
|
||||
"kafka = datahub.ingestion.source.kafka:KafkaSource",
|
||||
"ldap = datahub.ingestion.source.ldap:LDAPSource",
|
||||
"mongodb = datahub.ingestion.source.mongodb:MongoDBSource",
|
||||
"mssql = datahub.ingestion.source.mssql:SQLServerSource",
|
||||
"mysql = datahub.ingestion.source.mysql:MySQLSource",
|
||||
"oracle = datahub.ingestion.source.oracle:OracleSource",
|
||||
"postgres = datahub.ingestion.source.postgres:PostgresSource",
|
||||
"redshift = datahub.ingestion.source.redshift:RedshiftSource",
|
||||
"snowflake = datahub.ingestion.source.snowflake:SnowflakeSource",
|
||||
"superset = datahub.ingestion.source.superset:SupersetSource",
|
||||
],
|
||||
"datahub.ingestion.sink.plugins": [
|
||||
"file = datahub.ingestion.sink.file:FileSink",
|
||||
"console = datahub.ingestion.sink.console:ConsoleSink",
|
||||
"datahub-kafka = datahub.ingestion.sink.datahub_kafka:DatahubKafkaSink",
|
||||
"datahub-rest = datahub.ingestion.sink.datahub_rest:DatahubRestSink",
|
||||
],
|
||||
"apache_airflow_provider": [
|
||||
"provider_info = datahub_provider:get_provider_info"
|
||||
],
|
||||
},
|
||||
entry_points=entry_points,
|
||||
# Dependencies.
|
||||
install_requires=list(base_requirements | framework_common),
|
||||
extras_require={
|
||||
|
||||
467
metadata-ingestion/src/datahub/ingestion/source/looker.py
Normal file
@ -0,0 +1,467 @@
|
||||
import datetime
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import time
|
||||
from dataclasses import dataclass
|
||||
from dataclasses import field as dataclass_field
|
||||
from typing import Any, Iterable, List, MutableMapping, Optional, Sequence
|
||||
|
||||
import looker_sdk
|
||||
from looker_sdk.error import SDKError
|
||||
from looker_sdk.sdk.api31.models import (
|
||||
Dashboard,
|
||||
DashboardElement,
|
||||
LookWithQuery,
|
||||
Query,
|
||||
)
|
||||
|
||||
from datahub.configuration import ConfigModel
|
||||
from datahub.configuration.common import AllowDenyPattern
|
||||
from datahub.ingestion.api.common import PipelineContext
|
||||
from datahub.ingestion.api.source import Source, SourceReport
|
||||
from datahub.ingestion.source.metadata_common import MetadataWorkUnit
|
||||
from datahub.metadata.com.linkedin.pegasus2avro.common import (
|
||||
AuditStamp,
|
||||
ChangeAuditStamps,
|
||||
Status,
|
||||
)
|
||||
from datahub.metadata.com.linkedin.pegasus2avro.metadata.snapshot import (
|
||||
ChartSnapshot,
|
||||
DashboardSnapshot,
|
||||
)
|
||||
from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent
|
||||
from datahub.metadata.schema_classes import (
|
||||
AuditStampClass,
|
||||
ChartInfoClass,
|
||||
ChartTypeClass,
|
||||
DashboardInfoClass,
|
||||
OwnerClass,
|
||||
OwnershipClass,
|
||||
OwnershipTypeClass,
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class LookerDashboardSourceConfig(ConfigModel):
|
||||
client_id: str
|
||||
client_secret: str
|
||||
base_url: str
|
||||
platform_name: str = "looker"
|
||||
# The datahub platform where looker views are stored, must be the same as `platform_name` in lookml source
|
||||
view_platform_name: str = "looker_views"
|
||||
actor: str = "urn:li:corpuser:etl"
|
||||
dashboard_pattern: AllowDenyPattern = AllowDenyPattern.allow_all()
|
||||
chart_pattern: AllowDenyPattern = AllowDenyPattern.allow_all()
|
||||
env: str = "PROD"
|
||||
|
||||
|
||||
@dataclass
|
||||
class LookerDashboardSourceReport(SourceReport):
|
||||
dashboards_scanned: int = 0
|
||||
charts_scanned: int = 0
|
||||
filtered_dashboards: List[str] = dataclass_field(default_factory=list)
|
||||
filtered_charts: List[str] = dataclass_field(default_factory=list)
|
||||
|
||||
def report_dashboards_scanned(self) -> None:
|
||||
self.dashboards_scanned += 1
|
||||
|
||||
def report_charts_scanned(self) -> None:
|
||||
self.charts_scanned += 1
|
||||
|
||||
def report_dashboards_dropped(self, model: str) -> None:
|
||||
self.filtered_dashboards.append(model)
|
||||
|
||||
def report_charts_dropped(self, view: str) -> None:
|
||||
self.filtered_charts.append(view)
|
||||
|
||||
|
||||
@dataclass
|
||||
class LookerDashboardElement:
|
||||
id: str
|
||||
title: str
|
||||
query_slug: str
|
||||
looker_views: List[str]
|
||||
look_id: Optional[str]
|
||||
type: Optional[str] = None
|
||||
description: Optional[str] = None
|
||||
|
||||
def url(self, base_url: str) -> str:
|
||||
# A dashboard element can use a look or just a raw query against an explore
|
||||
if self.look_id is not None:
|
||||
return base_url + "/looks/" + self.look_id
|
||||
else:
|
||||
return base_url + "/x/" + self.query_slug
|
||||
|
||||
def get_urn_element_id(self):
|
||||
# A dashboard element can use a look or just a raw query against an explore
|
||||
return f"dashboard_elements.{self.id}"
|
||||
|
||||
def get_view_urns(self, platform_name: str) -> List[str]:
|
||||
return [
|
||||
f"urn:li:dataset:(urn:li:dataPlatform:{platform_name},{v},PROD)"
|
||||
for v in self.looker_views
|
||||
]
|
||||
|
||||
|
||||
@dataclass
|
||||
class LookerDashboard:
|
||||
id: str
|
||||
title: str
|
||||
dashboard_elements: List[LookerDashboardElement]
|
||||
created_at: Optional[datetime.datetime]
|
||||
description: Optional[str] = None
|
||||
is_deleted: bool = False
|
||||
is_hidden: bool = False
|
||||
|
||||
def url(self, base_url):
|
||||
return base_url + "/dashboards/" + self.id
|
||||
|
||||
def get_urn_dashboard_id(self):
|
||||
return f"dashboards.{self.id}"
|
||||
|
||||
|
||||
class LookerDashboardSource(Source):
|
||||
source_config: LookerDashboardSourceConfig
|
||||
report = LookerDashboardSourceReport()
|
||||
|
||||
def __init__(self, config: LookerDashboardSourceConfig, ctx: PipelineContext):
|
||||
super().__init__(ctx)
|
||||
self.source_config = config
|
||||
self.reporter = LookerDashboardSourceReport()
|
||||
|
||||
@staticmethod
|
||||
def _extract_view_from_field(field: str) -> str:
|
||||
assert (
|
||||
field.count(".") == 1
|
||||
), f"Error: A field must be prefixed by a view name, field is: {field}"
|
||||
view_name = field.split(".")[0]
|
||||
return view_name
|
||||
|
||||
def _get_views_from_query(self, query: Optional[Query]) -> List[str]:
|
||||
if query is None:
|
||||
return []
|
||||
|
||||
all_views = set()
|
||||
|
||||
# query.dynamic_fields can contain:
|
||||
# - looker table calculations: https://docs.looker.com/exploring-data/using-table-calculations
|
||||
# - looker custom measures: https://docs.looker.com/de/exploring-data/adding-fields/custom-measure
|
||||
# - looker custom dimensions: https://docs.looker.com/exploring-data/adding-fields/custom-measure#creating_a_custom_dimension_using_a_looker_expression
|
||||
dynamic_fields = json.loads(
|
||||
query.dynamic_fields if query.dynamic_fields is not None else "[]"
|
||||
)
|
||||
custom_field_to_underlying_field = {}
|
||||
for field in dynamic_fields:
|
||||
# Table calculations can only reference fields used in the fields section, so this will always be a subset of the query.fields
|
||||
if "table_calculation" in field:
|
||||
continue
|
||||
# Looker custom measures can reference fields in arbitrary views, so this needs to be parsed to find the underlying view field the custom measure is based on
|
||||
if "measure" in field:
|
||||
measure = field["measure"]
|
||||
based_on = field["based_on"]
|
||||
custom_field_to_underlying_field[measure] = based_on
|
||||
|
||||
# Looker custom dimensions can reference fields in arbitrary views, so this needs to be parsed to find the underlying view field the custom measure is based on
|
||||
# However, unlike custom measures custom dimensions can be defined using an arbitrary expression
|
||||
# We are not going to support parsing arbitrary Looker expressions here, so going to ignore these fields for now
|
||||
if "dimension" in field:
|
||||
dimension = field["dimension"]
|
||||
expression = field["expression"] # noqa: F841
|
||||
custom_field_to_underlying_field[dimension] = None
|
||||
|
||||
# A query uses fields defined in views, find the views those fields use
|
||||
fields: Sequence[str] = query.fields if query.fields is not None else []
|
||||
for field in fields:
|
||||
# If the field is a custom field, look up the field it is based on
|
||||
field_name = (
|
||||
custom_field_to_underlying_field[field]
|
||||
if field in custom_field_to_underlying_field
|
||||
else field
|
||||
)
|
||||
if field_name is None:
|
||||
continue
|
||||
|
||||
try:
|
||||
view_name = self._extract_view_from_field(field_name)
|
||||
except AssertionError:
|
||||
self.reporter.report_warning(
|
||||
key=f"chart-field-{field_name}",
|
||||
reason="The field was not prefixed by a view name. This can happen when the field references another dynamic field.",
|
||||
)
|
||||
continue
|
||||
all_views.add(view_name)
|
||||
|
||||
# A query uses fields for filtering and those fields are defined in views, find the views those fields use
|
||||
filters: MutableMapping[str, Any] = (
|
||||
query.filters if query.filters is not None else {}
|
||||
)
|
||||
for field in filters.keys():
|
||||
# If the field is a custom field, look up the field it is based on
|
||||
field_name = (
|
||||
custom_field_to_underlying_field[field]
|
||||
if field in custom_field_to_underlying_field
|
||||
else field
|
||||
)
|
||||
if field_name is None:
|
||||
continue
|
||||
try:
|
||||
view_name = self._extract_view_from_field(field_name)
|
||||
except AssertionError:
|
||||
self.reporter.report_warning(
|
||||
f"chart-field-{field_name}",
|
||||
"The field was not prefixed by a view name. This can happen when the field references another dynamic field.",
|
||||
)
|
||||
continue
|
||||
all_views.add(view_name)
|
||||
|
||||
return list(all_views)
|
||||
|
||||
def _get_views_from_look(self, look: LookWithQuery) -> List[str]:
|
||||
return self._get_views_from_query(look.query)
|
||||
|
||||
def _get_looker_dashboard_element(
|
||||
self,
|
||||
element: DashboardElement,
|
||||
) -> Optional[LookerDashboardElement]:
|
||||
# Dashboard elements can use raw queries against explores
|
||||
if element.id is None:
|
||||
raise ValueError("Element ID can't be None")
|
||||
|
||||
if element.query is not None:
|
||||
views = self._get_views_from_query(element.query)
|
||||
return LookerDashboardElement(
|
||||
id=element.id,
|
||||
title=element.title if element.title is not None else "",
|
||||
type=element.type,
|
||||
description=element.subtitle_text,
|
||||
look_id=None,
|
||||
query_slug=element.query.slug if element.query.slug is not None else "",
|
||||
looker_views=views,
|
||||
)
|
||||
|
||||
# Dashboard elements can *alternatively* link to an existing look
|
||||
if element.look is not None:
|
||||
views = self._get_views_from_look(element.look)
|
||||
if element.look.query and element.look.query.slug:
|
||||
slug = element.look.query.slug
|
||||
else:
|
||||
slug = ""
|
||||
return LookerDashboardElement(
|
||||
id=element.id,
|
||||
title=element.title if element.title is not None else "",
|
||||
type=element.type,
|
||||
description=element.subtitle_text,
|
||||
look_id=element.look_id,
|
||||
query_slug=slug,
|
||||
looker_views=views,
|
||||
)
|
||||
|
||||
return None
|
||||
|
||||
def _get_chart_type(
|
||||
self, dashboard_element: LookerDashboardElement
|
||||
) -> Optional[str]:
|
||||
type_mapping = {
|
||||
"looker_column": ChartTypeClass.BAR,
|
||||
"looker_scatter": ChartTypeClass.SCATTER,
|
||||
"looker_line": ChartTypeClass.LINE,
|
||||
"looker_area": ChartTypeClass.AREA,
|
||||
"looker_pie": ChartTypeClass.PIE,
|
||||
"looker_donut_multiples": ChartTypeClass.PIE,
|
||||
"looker_funnel": ChartTypeClass.BAR,
|
||||
"looker_timeline": ChartTypeClass.BAR,
|
||||
"looker_waterfall": ChartTypeClass.BAR,
|
||||
"text": ChartTypeClass.TEXT,
|
||||
"single_value": ChartTypeClass.TEXT,
|
||||
"looker_single_record": ChartTypeClass.TABLE,
|
||||
"table": ChartTypeClass.TABLE,
|
||||
"looker_grid": ChartTypeClass.TABLE,
|
||||
"looker_map": None,
|
||||
"looker_geo_coordinates": None,
|
||||
"looker_geo_choropleth": None,
|
||||
"looker_boxplot": ChartTypeClass.BOX_PLOT,
|
||||
"vis": None,
|
||||
}
|
||||
type_str = dashboard_element.type
|
||||
if not type_str:
|
||||
self.reporter.report_warning(
|
||||
key=f"looker-chart-{dashboard_element.id}",
|
||||
reason=f"Chart type {type_str} is missing. Setting to None",
|
||||
)
|
||||
return None
|
||||
try:
|
||||
chart_type = type_mapping[type_str]
|
||||
except KeyError:
|
||||
self.reporter.report_warning(
|
||||
key=f"looker-chart-{dashboard_element.id}",
|
||||
reason=f"Chart type {type_str} not supported. Setting to None",
|
||||
)
|
||||
chart_type = None
|
||||
|
||||
return chart_type
|
||||
|
||||
def _make_chart_mce(
|
||||
self, dashboard_element: LookerDashboardElement
|
||||
) -> MetadataChangeEvent:
|
||||
actor = self.source_config.actor
|
||||
sys_time = int(time.time()) * 1000
|
||||
chart_urn = f"urn:li:chart:({self.source_config.platform_name},{dashboard_element.get_urn_element_id()})"
|
||||
chart_snapshot = ChartSnapshot(
|
||||
urn=chart_urn,
|
||||
aspects=[],
|
||||
)
|
||||
|
||||
last_modified = ChangeAuditStamps(
|
||||
created=AuditStamp(time=sys_time, actor=actor),
|
||||
lastModified=AuditStamp(time=sys_time, actor=actor),
|
||||
)
|
||||
|
||||
chart_type = self._get_chart_type(dashboard_element)
|
||||
|
||||
chart_info = ChartInfoClass(
|
||||
type=chart_type,
|
||||
description=dashboard_element.description
|
||||
if dashboard_element.description is not None
|
||||
else "",
|
||||
title=dashboard_element.title
|
||||
if dashboard_element.title is not None
|
||||
else "",
|
||||
lastModified=last_modified,
|
||||
chartUrl=dashboard_element.url(self.source_config.base_url),
|
||||
inputs=dashboard_element.get_view_urns(self.source_config.platform_name),
|
||||
)
|
||||
chart_snapshot.aspects.append(chart_info)
|
||||
|
||||
return MetadataChangeEvent(proposedSnapshot=chart_snapshot)
|
||||
|
||||
def _make_dashboard_and_chart_mces(
|
||||
self, looker_dashboard: LookerDashboard
|
||||
) -> List[MetadataChangeEvent]:
|
||||
actor = self.source_config.actor
|
||||
sys_time = int(time.time()) * 1000
|
||||
|
||||
chart_mces = [
|
||||
self._make_chart_mce(element)
|
||||
for element in looker_dashboard.dashboard_elements
|
||||
]
|
||||
|
||||
dashboard_urn = f"urn:li:dashboard:({self.source_config.platform_name},{looker_dashboard.get_urn_dashboard_id()})"
|
||||
dashboard_snapshot = DashboardSnapshot(
|
||||
urn=dashboard_urn,
|
||||
aspects=[],
|
||||
)
|
||||
|
||||
last_modified = ChangeAuditStamps(
|
||||
created=AuditStamp(time=sys_time, actor=actor),
|
||||
lastModified=AuditStamp(time=sys_time, actor=actor),
|
||||
)
|
||||
|
||||
dashboard_info = DashboardInfoClass(
|
||||
description=looker_dashboard.description
|
||||
if looker_dashboard.description is not None
|
||||
else "",
|
||||
title=looker_dashboard.title,
|
||||
charts=[mce.proposedSnapshot.urn for mce in chart_mces],
|
||||
lastModified=last_modified,
|
||||
dashboardUrl=looker_dashboard.url(self.source_config.base_url),
|
||||
)
|
||||
|
||||
dashboard_snapshot.aspects.append(dashboard_info)
|
||||
owners = [OwnerClass(owner=actor, type=OwnershipTypeClass.DATAOWNER)]
|
||||
dashboard_snapshot.aspects.append(
|
||||
OwnershipClass(
|
||||
owners=owners,
|
||||
lastModified=AuditStampClass(
|
||||
time=sys_time, actor=self.source_config.actor
|
||||
),
|
||||
)
|
||||
)
|
||||
dashboard_snapshot.aspects.append(Status(removed=looker_dashboard.is_deleted))
|
||||
|
||||
dashboard_mce = MetadataChangeEvent(proposedSnapshot=dashboard_snapshot)
|
||||
|
||||
return chart_mces + [dashboard_mce]
|
||||
|
||||
def _get_looker_dashboard(self, dashboard: Dashboard) -> LookerDashboard:
|
||||
dashboard_elements: List[LookerDashboardElement] = []
|
||||
elements = (
|
||||
dashboard.dashboard_elements
|
||||
if dashboard.dashboard_elements is not None
|
||||
else []
|
||||
)
|
||||
for element in elements:
|
||||
self.reporter.report_charts_scanned()
|
||||
if element.id is not None and not self.source_config.chart_pattern.allowed(
|
||||
element.id
|
||||
):
|
||||
self.reporter.report_charts_dropped(element.id)
|
||||
continue
|
||||
looker_dashboard_element = self._get_looker_dashboard_element(element)
|
||||
if looker_dashboard_element is not None:
|
||||
dashboard_elements.append(looker_dashboard_element)
|
||||
|
||||
if dashboard.id is None or dashboard.title is None:
|
||||
raise ValueError("Both dashboard ID and title are None")
|
||||
|
||||
looker_dashboard = LookerDashboard(
|
||||
id=dashboard.id,
|
||||
title=dashboard.title,
|
||||
description=dashboard.description,
|
||||
dashboard_elements=dashboard_elements,
|
||||
created_at=dashboard.created_at,
|
||||
is_deleted=dashboard.deleted if dashboard.deleted is not None else False,
|
||||
is_hidden=dashboard.hidden if dashboard.hidden is not None else False,
|
||||
)
|
||||
return looker_dashboard
|
||||
|
||||
def _get_looker_client(self):
|
||||
# The Looker SDK wants these as environment variables
|
||||
os.environ["LOOKERSDK_CLIENT_ID"] = self.source_config.client_id
|
||||
os.environ["LOOKERSDK_CLIENT_SECRET"] = self.source_config.client_secret
|
||||
os.environ["LOOKERSDK_BASE_URL"] = self.source_config.base_url
|
||||
|
||||
return looker_sdk.init31()
|
||||
|
||||
def get_workunits(self) -> Iterable[MetadataWorkUnit]:
|
||||
client = self._get_looker_client()
|
||||
dashboard_ids = [
|
||||
dashboard_base.id
|
||||
for dashboard_base in client.all_dashboards(fields="id")
|
||||
if dashboard_base.id is not None
|
||||
]
|
||||
|
||||
for dashboard_id in dashboard_ids:
|
||||
self.reporter.report_dashboards_scanned()
|
||||
if not self.source_config.dashboard_pattern.allowed(dashboard_id):
|
||||
self.reporter.report_dashboards_dropped(dashboard_id)
|
||||
continue
|
||||
try:
|
||||
fields = ["id", "title", "dashboard_elements", "dashboard_filters"]
|
||||
dashboard_object = client.dashboard(
|
||||
dashboard_id=dashboard_id, fields=",".join(fields)
|
||||
)
|
||||
except SDKError:
|
||||
# A looker dashboard could be deleted in between the list and the get
|
||||
logger.warning(
|
||||
f"Error occuried while loading dashboard {dashboard_id}. Skipping."
|
||||
)
|
||||
continue
|
||||
|
||||
looker_dashboard = self._get_looker_dashboard(dashboard_object)
|
||||
mces = self._make_dashboard_and_chart_mces(looker_dashboard)
|
||||
for mce in mces:
|
||||
workunit = MetadataWorkUnit(
|
||||
id=f"looker-{mce.proposedSnapshot.urn}", mce=mce
|
||||
)
|
||||
self.reporter.report_workunit(workunit)
|
||||
yield workunit
|
||||
|
||||
@classmethod
|
||||
def create(cls, config_dict, ctx):
|
||||
config = LookerDashboardSourceConfig.parse_obj(config_dict)
|
||||
return cls(config, ctx)
|
||||
|
||||
def get_report(self) -> SourceReport:
|
||||
return self.reporter
|
||||
575
metadata-ingestion/src/datahub/ingestion/source/lookml.py
Normal file
@ -0,0 +1,575 @@
|
||||
import glob
|
||||
import logging
|
||||
import re
|
||||
import sys
|
||||
import time
|
||||
from dataclasses import dataclass
|
||||
from dataclasses import field as dataclass_field
|
||||
from dataclasses import replace
|
||||
from enum import Enum
|
||||
from pathlib import Path
|
||||
from typing import Dict, Iterable, List, Optional, Tuple
|
||||
|
||||
if sys.version_info[1] >= 7:
|
||||
import lkml
|
||||
else:
|
||||
logging.warning("This plugin requres Python 3.7 or newer.")
|
||||
from sql_metadata import get_query_tables
|
||||
|
||||
from datahub.configuration import ConfigModel
|
||||
from datahub.configuration.common import AllowDenyPattern
|
||||
from datahub.ingestion.api.common import PipelineContext
|
||||
from datahub.ingestion.api.source import Source, SourceReport
|
||||
from datahub.ingestion.source.metadata_common import MetadataWorkUnit
|
||||
from datahub.metadata.com.linkedin.pegasus2avro.common import AuditStamp, Status
|
||||
from datahub.metadata.com.linkedin.pegasus2avro.dataset import (
|
||||
DatasetLineageTypeClass,
|
||||
UpstreamClass,
|
||||
UpstreamLineage,
|
||||
)
|
||||
from datahub.metadata.com.linkedin.pegasus2avro.metadata.snapshot import DatasetSnapshot
|
||||
from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent
|
||||
from datahub.metadata.com.linkedin.pegasus2avro.schema import (
|
||||
ArrayTypeClass,
|
||||
BooleanTypeClass,
|
||||
DateTypeClass,
|
||||
NullTypeClass,
|
||||
NumberTypeClass,
|
||||
OtherSchema,
|
||||
SchemaField,
|
||||
SchemaFieldDataType,
|
||||
SchemaMetadata,
|
||||
StringTypeClass,
|
||||
TimeTypeClass,
|
||||
UnionTypeClass,
|
||||
)
|
||||
from datahub.metadata.schema_classes import EnumTypeClass, SchemaMetadataClass
|
||||
|
||||
assert sys.version_info[1] >= 7 # needed for mypy
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class LookMLSourceConfig(ConfigModel): # pragma: no cover
|
||||
base_folder: str
|
||||
connection_to_platform_map: Dict[str, str]
|
||||
platform_name: str = "looker_views"
|
||||
actor: str = "urn:li:corpuser:etl"
|
||||
model_pattern: AllowDenyPattern = AllowDenyPattern.allow_all()
|
||||
view_pattern: AllowDenyPattern = AllowDenyPattern.allow_all()
|
||||
env: str = "PROD"
|
||||
parse_table_names_from_sql: bool = False
|
||||
|
||||
|
||||
@dataclass
|
||||
class LookMLSourceReport(SourceReport): # pragma: no cover
|
||||
models_scanned: int = 0
|
||||
views_scanned: int = 0
|
||||
filtered_models: List[str] = dataclass_field(default_factory=list)
|
||||
filtered_views: List[str] = dataclass_field(default_factory=list)
|
||||
|
||||
def report_models_scanned(self) -> None:
|
||||
self.models_scanned += 1
|
||||
|
||||
def report_views_scanned(self) -> None:
|
||||
self.views_scanned += 1
|
||||
|
||||
def report_models_dropped(self, model: str) -> None:
|
||||
self.filtered_models.append(model)
|
||||
|
||||
def report_views_dropped(self, view: str) -> None:
|
||||
self.filtered_views.append(view)
|
||||
|
||||
|
||||
@dataclass
|
||||
class LookerModel: # pragma: no cover
|
||||
connection: str
|
||||
includes: List[str]
|
||||
resolved_includes: List[str]
|
||||
|
||||
@staticmethod
|
||||
def from_looker_dict(looker_model_dict: dict, base_folder: str) -> "LookerModel":
|
||||
connection = looker_model_dict["connection"]
|
||||
includes = looker_model_dict["includes"]
|
||||
resolved_includes = LookerModel.resolve_includes(includes, base_folder)
|
||||
|
||||
return LookerModel(
|
||||
connection=connection,
|
||||
includes=includes,
|
||||
resolved_includes=resolved_includes,
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def resolve_includes(includes: List, base_folder: str) -> List[str]:
|
||||
resolved = []
|
||||
for inc in includes:
|
||||
# Massage the looker include into a valid glob wildcard expression
|
||||
glob_expr = f"{base_folder}/{inc}"
|
||||
outputs = glob.glob(glob_expr)
|
||||
resolved.extend(outputs)
|
||||
return resolved
|
||||
|
||||
|
||||
@dataclass
|
||||
class LookerViewFile: # pragma: no cover
|
||||
absolute_file_path: str
|
||||
connection: Optional[str]
|
||||
includes: List[str]
|
||||
resolved_includes: List[str]
|
||||
views: List[Dict]
|
||||
|
||||
@staticmethod
|
||||
def from_looker_dict(
|
||||
absolute_file_path: str, looker_view_file_dict: dict, base_folder: str
|
||||
) -> "LookerViewFile":
|
||||
includes = looker_view_file_dict.get("includes", [])
|
||||
resolved_includes = LookerModel.resolve_includes(includes, base_folder)
|
||||
views = looker_view_file_dict.get("views", [])
|
||||
|
||||
return LookerViewFile(
|
||||
absolute_file_path=absolute_file_path,
|
||||
connection=None,
|
||||
includes=includes,
|
||||
resolved_includes=resolved_includes,
|
||||
views=views,
|
||||
)
|
||||
|
||||
|
||||
class LookerViewFileLoader: # pragma: no cover
|
||||
"""
|
||||
Loads the looker viewfile at a path and caches the LookerViewFile in memory
|
||||
This is to avoid reloading the same file off of disk many times during the recursive include resolution process
|
||||
"""
|
||||
|
||||
def __init__(self, base_folder: str) -> None:
|
||||
self.viewfile_cache: Dict[str, LookerViewFile] = {}
|
||||
self._base_folder = base_folder
|
||||
|
||||
def is_view_seen(self, path: str) -> bool:
|
||||
return path in self.viewfile_cache
|
||||
|
||||
def _load_viewfile(self, path: str) -> Optional[LookerViewFile]:
|
||||
if self.is_view_seen(path):
|
||||
return self.viewfile_cache[path]
|
||||
|
||||
try:
|
||||
with open(path, "r") as file:
|
||||
parsed = lkml.load(file)
|
||||
looker_viewfile = LookerViewFile.from_looker_dict(
|
||||
path, parsed, self._base_folder
|
||||
)
|
||||
self.viewfile_cache[path] = looker_viewfile
|
||||
return looker_viewfile
|
||||
except Exception:
|
||||
logger.warning(f"Error processing view file {path}. Skipping it")
|
||||
return None
|
||||
|
||||
def load_viewfile(self, path: str, connection: str) -> Optional[LookerViewFile]:
|
||||
viewfile = self._load_viewfile(path)
|
||||
if viewfile is None:
|
||||
return None
|
||||
|
||||
return replace(viewfile, connection=connection)
|
||||
|
||||
|
||||
class ViewFieldType(Enum):
|
||||
DIMENSION = "Dimension"
|
||||
DIMENSION_GROUP = "Dimension Group"
|
||||
MEASURE = "Measure"
|
||||
|
||||
|
||||
@dataclass
|
||||
class ViewField:
|
||||
name: str
|
||||
type: str
|
||||
description: str
|
||||
field_type: ViewFieldType
|
||||
is_primary_key: bool = False
|
||||
|
||||
|
||||
@dataclass
|
||||
class LookerView: # pragma: no cover
|
||||
absolute_file_path: str
|
||||
connection: str
|
||||
view_name: str
|
||||
sql_table_names: List[str]
|
||||
fields: List[ViewField]
|
||||
|
||||
@classmethod
|
||||
def _get_sql_table_names(cls, sql: str) -> List[str]:
|
||||
sql_tables: List[str] = get_query_tables(sql)
|
||||
|
||||
# Remove temporary tables from WITH statements
|
||||
sql_table_names = [
|
||||
t
|
||||
for t in sql_tables
|
||||
if not re.search(
|
||||
fr"WITH(.*,)?\s+{t}(\s*\([\w\s,]+\))?\s+AS\s+\(",
|
||||
sql,
|
||||
re.IGNORECASE | re.DOTALL,
|
||||
)
|
||||
]
|
||||
|
||||
# Remove quotes from tables
|
||||
sql_table_names = [t.replace('"', "") for t in sql_table_names]
|
||||
|
||||
return sql_table_names
|
||||
|
||||
@classmethod
|
||||
def _get_fields(
|
||||
cls, field_list: List[Dict], type_cls: ViewFieldType
|
||||
) -> List[ViewField]:
|
||||
fields = []
|
||||
for field_dict in field_list:
|
||||
is_primary_key = field_dict.get("primary_key", "no") == "yes"
|
||||
name = field_dict["name"]
|
||||
native_type = field_dict.get("type", "string")
|
||||
description = field_dict.get("description", "")
|
||||
field = ViewField(
|
||||
name=name,
|
||||
type=native_type,
|
||||
description=description,
|
||||
is_primary_key=is_primary_key,
|
||||
field_type=type_cls,
|
||||
)
|
||||
fields.append(field)
|
||||
return fields
|
||||
|
||||
@classmethod
|
||||
def from_looker_dict(
|
||||
cls,
|
||||
looker_view: dict,
|
||||
connection: str,
|
||||
looker_viewfile: LookerViewFile,
|
||||
looker_viewfile_loader: LookerViewFileLoader,
|
||||
parse_table_names_from_sql: bool = False,
|
||||
) -> Optional["LookerView"]:
|
||||
view_name = looker_view["name"]
|
||||
sql_table_name = looker_view.get("sql_table_name", None)
|
||||
# Some sql_table_name fields contain quotes like: optimizely."group", just remove the quotes
|
||||
sql_table_name = (
|
||||
sql_table_name.replace('"', "") if sql_table_name is not None else None
|
||||
)
|
||||
derived_table = looker_view.get("derived_table", None)
|
||||
|
||||
dimensions = cls._get_fields(
|
||||
looker_view.get("dimensions", []), ViewFieldType.DIMENSION
|
||||
)
|
||||
dimension_groups = cls._get_fields(
|
||||
looker_view.get("dimension_groups", []), ViewFieldType.DIMENSION_GROUP
|
||||
)
|
||||
measures = cls._get_fields(
|
||||
looker_view.get("measures", []), ViewFieldType.MEASURE
|
||||
)
|
||||
fields: List[ViewField] = dimensions + dimension_groups + measures
|
||||
|
||||
# Parse SQL from derived tables to extract dependencies
|
||||
if derived_table is not None:
|
||||
if parse_table_names_from_sql and "sql" in derived_table:
|
||||
# Get the list of tables in the query
|
||||
sql_table_names = cls._get_sql_table_names(derived_table["sql"])
|
||||
else:
|
||||
sql_table_names = []
|
||||
|
||||
return LookerView(
|
||||
absolute_file_path=looker_viewfile.absolute_file_path,
|
||||
connection=connection,
|
||||
view_name=view_name,
|
||||
sql_table_names=sql_table_names,
|
||||
fields=fields,
|
||||
)
|
||||
|
||||
# There is a single dependency in the view, on the sql_table_name
|
||||
if sql_table_name is not None:
|
||||
return LookerView(
|
||||
absolute_file_path=looker_viewfile.absolute_file_path,
|
||||
connection=connection,
|
||||
view_name=view_name,
|
||||
sql_table_names=[sql_table_name],
|
||||
fields=fields,
|
||||
)
|
||||
|
||||
# The sql_table_name might be defined in another view and this view is extending that view, try to find it
|
||||
else:
|
||||
extends = looker_view.get("extends", looker_view.get("extends__all", []))
|
||||
if len(extends) == 0:
|
||||
# The view is malformed: it is not a derived table and contains neither a sql_table_name nor an extends
|
||||
logger.warning(
|
||||
f"Skipping malformed with view_name: {view_name} ({looker_viewfile.absolute_file_path}). View should have a sql_table_name if it is not a derived table"
|
||||
)
|
||||
return None
|
||||
|
||||
extends_to_looker_view = []
|
||||
|
||||
# The base view could live in the same file
|
||||
for raw_view in looker_viewfile.views:
|
||||
raw_view_name = raw_view["name"]
|
||||
# Make sure to skip loading view we are currently trying to resolve
|
||||
if raw_view_name != view_name:
|
||||
maybe_looker_view = LookerView.from_looker_dict(
|
||||
raw_view,
|
||||
connection,
|
||||
looker_viewfile,
|
||||
looker_viewfile_loader,
|
||||
parse_table_names_from_sql,
|
||||
)
|
||||
if (
|
||||
maybe_looker_view is not None
|
||||
and maybe_looker_view.view_name in extends
|
||||
):
|
||||
extends_to_looker_view.append(maybe_looker_view)
|
||||
|
||||
# Or it could live in one of the included files, we do not know which file the base view lives in, try them all!
|
||||
for include in looker_viewfile.resolved_includes:
|
||||
maybe_looker_viewfile = looker_viewfile_loader.load_viewfile(
|
||||
include, connection
|
||||
)
|
||||
if maybe_looker_viewfile is not None:
|
||||
for view in maybe_looker_viewfile.views:
|
||||
maybe_looker_view = LookerView.from_looker_dict(
|
||||
view,
|
||||
connection,
|
||||
looker_viewfile,
|
||||
looker_viewfile_loader,
|
||||
parse_table_names_from_sql,
|
||||
)
|
||||
if maybe_looker_view is None:
|
||||
continue
|
||||
|
||||
if (
|
||||
maybe_looker_view is not None
|
||||
and maybe_looker_view.view_name in extends
|
||||
):
|
||||
extends_to_looker_view.append(maybe_looker_view)
|
||||
|
||||
if len(extends_to_looker_view) != 1:
|
||||
logger.warning(
|
||||
f"Skipping malformed view with view_name: {view_name}. View should have a single view in a view inheritance chain with a sql_table_name"
|
||||
)
|
||||
return None
|
||||
|
||||
output_looker_view = LookerView(
|
||||
absolute_file_path=looker_viewfile.absolute_file_path,
|
||||
connection=connection,
|
||||
view_name=view_name,
|
||||
sql_table_names=extends_to_looker_view[0].sql_table_names,
|
||||
fields=fields,
|
||||
)
|
||||
return output_looker_view
|
||||
|
||||
|
||||
class LookMLSource(Source): # pragma: no cover
|
||||
source_config: LookMLSourceConfig
|
||||
report = LookMLSourceReport()
|
||||
|
||||
def __init__(self, config: LookMLSourceConfig, ctx: PipelineContext):
|
||||
super().__init__(ctx)
|
||||
self.source_config = config
|
||||
self.reporter = LookMLSourceReport()
|
||||
|
||||
@classmethod
|
||||
def create(cls, config_dict, ctx):
|
||||
config = LookMLSourceConfig.parse_obj(config_dict)
|
||||
return cls(config, ctx)
|
||||
|
||||
def _load_model(self, path: str) -> LookerModel:
|
||||
with open(path, "r") as file:
|
||||
parsed = lkml.load(file)
|
||||
looker_model = LookerModel.from_looker_dict(
|
||||
parsed, self.source_config.base_folder
|
||||
)
|
||||
return looker_model
|
||||
|
||||
def _construct_datalineage_urn(self, sql_table_name: str, connection: str) -> str:
|
||||
platform = self._get_platform_based_on_connection(connection)
|
||||
return f"urn:li:dataset:(urn:li:dataPlatform:{platform},{sql_table_name},{self.source_config.env})"
|
||||
|
||||
def _get_platform_based_on_connection(self, connection: str) -> str:
|
||||
if connection in self.source_config.connection_to_platform_map:
|
||||
return self.source_config.connection_to_platform_map[connection]
|
||||
else:
|
||||
raise Exception(
|
||||
f"Could not find a platform for looker view with connection: {connection}"
|
||||
)
|
||||
|
||||
def _get_upstream_lineage(
|
||||
self, looker_view: LookerView, actor: str, sys_time: int
|
||||
) -> UpstreamLineage:
|
||||
upstreams = []
|
||||
for sql_table_name in looker_view.sql_table_names:
|
||||
upstream = UpstreamClass(
|
||||
dataset=self._construct_datalineage_urn(
|
||||
sql_table_name, looker_view.connection
|
||||
),
|
||||
auditStamp=AuditStamp(actor=actor, time=sys_time),
|
||||
type=DatasetLineageTypeClass.TRANSFORMED,
|
||||
)
|
||||
upstreams.append(upstream)
|
||||
|
||||
upstream_lineage = UpstreamLineage(upstreams=upstreams)
|
||||
|
||||
return upstream_lineage
|
||||
|
||||
def _get_field_type(self, native_type: str) -> SchemaFieldDataType:
|
||||
field_type_mapping = {
|
||||
"date": DateTypeClass,
|
||||
"date_time": TimeTypeClass,
|
||||
"distance": NumberTypeClass,
|
||||
"duration": NumberTypeClass,
|
||||
"location": UnionTypeClass,
|
||||
"number": NumberTypeClass,
|
||||
"string": StringTypeClass,
|
||||
"tier": EnumTypeClass,
|
||||
"time": TimeTypeClass,
|
||||
"unquoted": StringTypeClass,
|
||||
"yesno": BooleanTypeClass,
|
||||
"zipcode": EnumTypeClass,
|
||||
"int": NumberTypeClass,
|
||||
"average": NumberTypeClass,
|
||||
"average_distinct": NumberTypeClass,
|
||||
"count": NumberTypeClass,
|
||||
"count_distinct": NumberTypeClass,
|
||||
"list": ArrayTypeClass,
|
||||
"max": NumberTypeClass,
|
||||
"median": NumberTypeClass,
|
||||
"median_distinct": NumberTypeClass,
|
||||
"min": NumberTypeClass,
|
||||
"percent_of_previous": NumberTypeClass,
|
||||
"percent_of_total": NumberTypeClass,
|
||||
"percentile": NumberTypeClass,
|
||||
"percentile_distinct": NumberTypeClass,
|
||||
"running_total": NumberTypeClass,
|
||||
"sum": NumberTypeClass,
|
||||
"sum_distinct": NumberTypeClass,
|
||||
}
|
||||
|
||||
if native_type in field_type_mapping:
|
||||
type_class = field_type_mapping[native_type]
|
||||
else:
|
||||
self.reporter.report_warning(
|
||||
native_type,
|
||||
f"The type '{native_type}' is not recognised for field type, setting as NullTypeClass.",
|
||||
)
|
||||
type_class = NullTypeClass
|
||||
data_type = SchemaFieldDataType(type=type_class())
|
||||
return data_type
|
||||
|
||||
def _get_fields_and_primary_keys(
|
||||
self, looker_view: LookerView
|
||||
) -> Tuple[List[SchemaField], List[str]]:
|
||||
fields: List[SchemaField] = []
|
||||
primary_keys: List = []
|
||||
for field in looker_view.fields:
|
||||
schema_field = SchemaField(
|
||||
fieldPath=field.name,
|
||||
type=self._get_field_type(field.type),
|
||||
nativeDataType=field.type,
|
||||
description=f"{field.field_type.value}. {field.description}",
|
||||
)
|
||||
fields.append(schema_field)
|
||||
if field.is_primary_key:
|
||||
primary_keys.append(schema_field.fieldPath)
|
||||
return fields, primary_keys
|
||||
|
||||
def _get_schema(
|
||||
self, looker_view: LookerView, actor: str, sys_time: int
|
||||
) -> SchemaMetadataClass:
|
||||
fields, primary_keys = self._get_fields_and_primary_keys(looker_view)
|
||||
stamp = AuditStamp(time=sys_time, actor=actor)
|
||||
schema_metadata = SchemaMetadata(
|
||||
schemaName=looker_view.view_name,
|
||||
platform=self.source_config.platform_name,
|
||||
version=0,
|
||||
fields=fields,
|
||||
primaryKeys=primary_keys,
|
||||
created=stamp,
|
||||
lastModified=stamp,
|
||||
hash="",
|
||||
platformSchema=OtherSchema(rawSchema="looker-view"),
|
||||
)
|
||||
return schema_metadata
|
||||
|
||||
def _build_dataset_mce(self, looker_view: LookerView) -> MetadataChangeEvent:
|
||||
"""
|
||||
Creates MetadataChangeEvent for the dataset, creating upstream lineage links
|
||||
"""
|
||||
logger.debug(f"looker_view = {looker_view.view_name}")
|
||||
|
||||
dataset_name = looker_view.view_name
|
||||
actor = self.source_config.actor
|
||||
sys_time = int(time.time()) * 1000
|
||||
|
||||
dataset_snapshot = DatasetSnapshot(
|
||||
urn=f"urn:li:dataset:(urn:li:dataPlatform:{self.source_config.platform_name}, {dataset_name}, {self.source_config.env})",
|
||||
aspects=[], # we append to this list later on
|
||||
)
|
||||
dataset_snapshot.aspects.append(Status(removed=False))
|
||||
dataset_snapshot.aspects.append(
|
||||
self._get_upstream_lineage(looker_view, actor, sys_time)
|
||||
)
|
||||
dataset_snapshot.aspects.append(self._get_schema(looker_view, actor, sys_time))
|
||||
|
||||
mce = MetadataChangeEvent(proposedSnapshot=dataset_snapshot)
|
||||
|
||||
return mce
|
||||
|
||||
def get_workunits(self) -> Iterable[MetadataWorkUnit]:
|
||||
viewfile_loader = LookerViewFileLoader(self.source_config.base_folder)
|
||||
|
||||
model_files = sorted(
|
||||
f
|
||||
for f in glob.glob(
|
||||
f"{self.source_config.base_folder}/**/*.model.lkml", recursive=True
|
||||
)
|
||||
)
|
||||
for file_path in model_files:
|
||||
model_name = Path(file_path).stem
|
||||
self.reporter.report_models_scanned()
|
||||
if not self.source_config.model_pattern.allowed(model_name):
|
||||
self.reporter.report_models_dropped(model_name)
|
||||
continue
|
||||
try:
|
||||
model = self._load_model(file_path)
|
||||
except Exception:
|
||||
self.reporter.report_warning(
|
||||
"LookML", f"unable to parse Looker model: {file_path}"
|
||||
)
|
||||
continue
|
||||
|
||||
for include in model.resolved_includes:
|
||||
is_view_seen = viewfile_loader.is_view_seen(include)
|
||||
if is_view_seen:
|
||||
continue
|
||||
looker_viewfile = viewfile_loader.load_viewfile(
|
||||
include, model.connection
|
||||
)
|
||||
if looker_viewfile is not None:
|
||||
for raw_view in looker_viewfile.views:
|
||||
maybe_looker_view = LookerView.from_looker_dict(
|
||||
raw_view,
|
||||
model.connection,
|
||||
looker_viewfile,
|
||||
viewfile_loader,
|
||||
self.source_config.parse_table_names_from_sql,
|
||||
)
|
||||
if maybe_looker_view:
|
||||
self.reporter.report_views_scanned()
|
||||
if self.source_config.view_pattern.allowed(
|
||||
maybe_looker_view.view_name
|
||||
):
|
||||
mce = self._build_dataset_mce(maybe_looker_view)
|
||||
workunit = MetadataWorkUnit(
|
||||
id=f"lookml-{maybe_looker_view.view_name}", mce=mce
|
||||
)
|
||||
self.reporter.report_workunit(workunit)
|
||||
yield workunit
|
||||
else:
|
||||
self.reporter.report_views_dropped(
|
||||
maybe_looker_view.view_name
|
||||
)
|
||||
|
||||
def get_report(self):
|
||||
return self.reporter
|
||||
|
||||
def close(self):
|
||||
pass
|
||||
@ -0,0 +1,59 @@
|
||||
[
|
||||
{
|
||||
"auditHeader": null,
|
||||
"proposedSnapshot": {
|
||||
"com.linkedin.pegasus2avro.metadata.snapshot.DashboardSnapshot": {
|
||||
"urn": "urn:li:dashboard:(looker,dashboards.1)",
|
||||
"aspects": [
|
||||
{
|
||||
"com.linkedin.pegasus2avro.dashboard.DashboardInfo": {
|
||||
"customProperties": {},
|
||||
"externalUrl": null,
|
||||
"title": "foo",
|
||||
"description": "lorem ipsum",
|
||||
"charts": [],
|
||||
"lastModified": {
|
||||
"created": {
|
||||
"time": 1615443388000,
|
||||
"actor": "urn:li:corpuser:etl",
|
||||
"impersonator": null
|
||||
},
|
||||
"lastModified": {
|
||||
"time": 1615443388000,
|
||||
"actor": "urn:li:corpuser:etl",
|
||||
"impersonator": null
|
||||
},
|
||||
"deleted": null
|
||||
},
|
||||
"dashboardUrl": "https://looker.company.com/dashboards/1",
|
||||
"access": null,
|
||||
"lastRefreshed": null
|
||||
}
|
||||
},
|
||||
{
|
||||
"com.linkedin.pegasus2avro.common.Ownership": {
|
||||
"owners": [
|
||||
{
|
||||
"owner": "urn:li:corpuser:etl",
|
||||
"type": "DATAOWNER",
|
||||
"source": null
|
||||
}
|
||||
],
|
||||
"lastModified": {
|
||||
"time": 1615443388000,
|
||||
"actor": "urn:li:corpuser:etl",
|
||||
"impersonator": null
|
||||
}
|
||||
}
|
||||
},
|
||||
{
|
||||
"com.linkedin.pegasus2avro.common.Status": {
|
||||
"removed": false
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
},
|
||||
"proposedDelta": null
|
||||
}
|
||||
]
|
||||
65
metadata-ingestion/tests/integration/looker/test_looker.py
Normal file
@ -0,0 +1,65 @@
|
||||
import time
|
||||
from datetime import datetime
|
||||
from unittest import mock
|
||||
|
||||
from looker_sdk.sdk.api31.models import Dashboard, DashboardElement, Query
|
||||
|
||||
from datahub.ingestion.run.pipeline import Pipeline
|
||||
from tests.test_helpers import mce_helpers
|
||||
|
||||
|
||||
def test_looker_ingest(pytestconfig, tmp_path, mock_time):
|
||||
mocked_client = mock.MagicMock()
|
||||
with mock.patch(
|
||||
"datahub.ingestion.source.looker.LookerDashboardSource._get_looker_client",
|
||||
mocked_client,
|
||||
):
|
||||
mocked_client.return_value.all_dashboards.return_value = [Dashboard(id="1")]
|
||||
mocked_client.return_value.dashboard.return_value = Dashboard(
|
||||
id="1",
|
||||
title="foo",
|
||||
created_at=datetime.utcfromtimestamp(time.time()),
|
||||
description="lorem ipsum",
|
||||
dashboard_elements=[
|
||||
DashboardElement(
|
||||
id="2",
|
||||
type="",
|
||||
subtitle_text="Some text",
|
||||
query=Query(
|
||||
model="data",
|
||||
view="my_view",
|
||||
dynamic_fields='[{"table_calculation":"calc","label":"foobar","expression":"offset(${my_table.value},1)","value_format":null,"value_format_name":"eur","_kind_hint":"measure","_type_hint":"number"}]',
|
||||
),
|
||||
)
|
||||
],
|
||||
)
|
||||
|
||||
test_resources_dir = pytestconfig.rootpath / "tests/integration/looker"
|
||||
|
||||
pipeline = Pipeline.create(
|
||||
{
|
||||
"run_id": "looker-test",
|
||||
"source": {
|
||||
"type": "looker",
|
||||
"config": {
|
||||
"base_url": "https://looker.company.com",
|
||||
"client_id": "foo",
|
||||
"client_secret": "bar",
|
||||
},
|
||||
},
|
||||
"sink": {
|
||||
"type": "file",
|
||||
"config": {
|
||||
"filename": f"{tmp_path}/looker_mces.json",
|
||||
},
|
||||
},
|
||||
}
|
||||
)
|
||||
pipeline.run()
|
||||
pipeline.raise_from_status()
|
||||
|
||||
output = mce_helpers.load_json_file(str(tmp_path / "looker_mces.json"))
|
||||
expected = mce_helpers.load_json_file(
|
||||
str(test_resources_dir / "expected_output.json")
|
||||
)
|
||||
mce_helpers.assert_mces_equal(output, expected)
|
||||
19
metadata-ingestion/tests/integration/lookml/data.model.lkml
Normal file
@ -0,0 +1,19 @@
|
||||
connection: "my_connection"
|
||||
|
||||
include: "foo.view.lkml"
|
||||
|
||||
explore: data_model {
|
||||
label: "Data model!"
|
||||
description: "Lorem ipsum"
|
||||
|
||||
always_filter: {
|
||||
filters: {
|
||||
field: is_latest_forecast
|
||||
value: "TRUE"
|
||||
}
|
||||
filters: {
|
||||
field: granularity
|
||||
value: "day"
|
||||
}
|
||||
}
|
||||
}
|
||||
138
metadata-ingestion/tests/integration/lookml/expected_output.json
Normal file
@ -0,0 +1,138 @@
|
||||
[
|
||||
{
|
||||
"auditHeader": null,
|
||||
"proposedSnapshot": {
|
||||
"com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": {
|
||||
"urn": "urn:li:dataset:(urn:li:dataPlatform:looker_views, my_view, PROD)",
|
||||
"aspects": [
|
||||
{
|
||||
"com.linkedin.pegasus2avro.common.Status": {
|
||||
"removed": false
|
||||
}
|
||||
},
|
||||
{
|
||||
"com.linkedin.pegasus2avro.dataset.UpstreamLineage": {
|
||||
"upstreams": [
|
||||
{
|
||||
"auditStamp": {
|
||||
"time": 1615443388000,
|
||||
"actor": "urn:li:corpuser:etl",
|
||||
"impersonator": null
|
||||
},
|
||||
"dataset": "urn:li:dataset:(urn:li:dataPlatform:conn,my_table,PROD)",
|
||||
"type": "TRANSFORMED"
|
||||
}
|
||||
]
|
||||
}
|
||||
},
|
||||
{
|
||||
"com.linkedin.pegasus2avro.schema.SchemaMetadata": {
|
||||
"schemaName": "my_view",
|
||||
"platform": "looker_views",
|
||||
"version": 0,
|
||||
"created": {
|
||||
"time": 1615443388000,
|
||||
"actor": "urn:li:corpuser:etl",
|
||||
"impersonator": null
|
||||
},
|
||||
"lastModified": {
|
||||
"time": 1615443388000,
|
||||
"actor": "urn:li:corpuser:etl",
|
||||
"impersonator": null
|
||||
},
|
||||
"deleted": null,
|
||||
"dataset": null,
|
||||
"cluster": null,
|
||||
"hash": "",
|
||||
"platformSchema": {
|
||||
"com.linkedin.pegasus2avro.schema.OtherSchema": {
|
||||
"rawSchema": "looker-view"
|
||||
}
|
||||
},
|
||||
"fields": [
|
||||
{
|
||||
"fieldPath": "country",
|
||||
"jsonPath": null,
|
||||
"nullable": false,
|
||||
"description": "Dimension. The country",
|
||||
"type": {
|
||||
"type": {
|
||||
"com.linkedin.pegasus2avro.schema.StringType": {}
|
||||
}
|
||||
},
|
||||
"nativeDataType": "string",
|
||||
"recursive": false,
|
||||
"globalTags": null,
|
||||
"glossaryTerms": null
|
||||
},
|
||||
{
|
||||
"fieldPath": "city",
|
||||
"jsonPath": null,
|
||||
"nullable": false,
|
||||
"description": "Dimension. City",
|
||||
"type": {
|
||||
"type": {
|
||||
"com.linkedin.pegasus2avro.schema.StringType": {}
|
||||
}
|
||||
},
|
||||
"nativeDataType": "string",
|
||||
"recursive": false,
|
||||
"globalTags": null,
|
||||
"glossaryTerms": null
|
||||
},
|
||||
{
|
||||
"fieldPath": "is_latest",
|
||||
"jsonPath": null,
|
||||
"nullable": false,
|
||||
"description": "Dimension. Is latest data",
|
||||
"type": {
|
||||
"type": {
|
||||
"com.linkedin.pegasus2avro.schema.BooleanType": {}
|
||||
}
|
||||
},
|
||||
"nativeDataType": "yesno",
|
||||
"recursive": false,
|
||||
"globalTags": null,
|
||||
"glossaryTerms": null
|
||||
},
|
||||
{
|
||||
"fieldPath": "timestamp",
|
||||
"jsonPath": null,
|
||||
"nullable": false,
|
||||
"description": "Dimension Group. Timestamp of measurement",
|
||||
"type": {
|
||||
"type": {
|
||||
"com.linkedin.pegasus2avro.schema.TimeType": {}
|
||||
}
|
||||
},
|
||||
"nativeDataType": "time",
|
||||
"recursive": false,
|
||||
"globalTags": null,
|
||||
"glossaryTerms": null
|
||||
},
|
||||
{
|
||||
"fieldPath": "average_measurement",
|
||||
"jsonPath": null,
|
||||
"nullable": false,
|
||||
"description": "Measure. My measurement",
|
||||
"type": {
|
||||
"type": {
|
||||
"com.linkedin.pegasus2avro.schema.NumberType": {}
|
||||
}
|
||||
},
|
||||
"nativeDataType": "average",
|
||||
"recursive": false,
|
||||
"globalTags": null,
|
||||
"glossaryTerms": null
|
||||
}
|
||||
],
|
||||
"primaryKeys": [],
|
||||
"foreignKeysSpecs": null
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
},
|
||||
"proposedDelta": null
|
||||
}
|
||||
]
|
||||
52
metadata-ingestion/tests/integration/lookml/foo.view.lkml
Normal file
@ -0,0 +1,52 @@
|
||||
###
|
||||
### Please update the docs below after editing this:
|
||||
### https://woltwide.atlassian.net/wiki/spaces/AN/pages/429195880/Weather+Forecasts
|
||||
###
|
||||
|
||||
view: my_view {
|
||||
derived_table: {
|
||||
sql:
|
||||
SELECT
|
||||
is_latest,
|
||||
country,
|
||||
city,
|
||||
timestamp,
|
||||
measurement
|
||||
FROM
|
||||
my_table ;;
|
||||
}
|
||||
|
||||
dimension: country {
|
||||
type: string
|
||||
description: "The country"
|
||||
sql: ${TABLE}.country ;;
|
||||
}
|
||||
|
||||
dimension: city {
|
||||
type: string
|
||||
description: "City"
|
||||
sql: ${TABLE}.city ;;
|
||||
}
|
||||
|
||||
dimension: is_latest {
|
||||
type: yesno
|
||||
description: "Is latest data"
|
||||
sql: ${TABLE}.is_latest ;;
|
||||
}
|
||||
|
||||
dimension_group: timestamp {
|
||||
group_label: "Timestamp"
|
||||
type: time
|
||||
description: "Timestamp of measurement"
|
||||
sql: ${TABLE}.timestamp ;;
|
||||
timeframes: [hour, date, week, day_of_week]
|
||||
}
|
||||
|
||||
measure: average_measurement {
|
||||
group_label: "Measurement"
|
||||
type: average
|
||||
description: "My measurement"
|
||||
sql: ${TABLE}.measurement ;;
|
||||
}
|
||||
|
||||
}
|
||||
42
metadata-ingestion/tests/integration/lookml/test_lookml.py
Normal file
@ -0,0 +1,42 @@
|
||||
import logging
|
||||
import sys
|
||||
|
||||
import pytest
|
||||
|
||||
from datahub.ingestion.run.pipeline import Pipeline
|
||||
from tests.test_helpers import mce_helpers
|
||||
|
||||
logging.getLogger("lkml").setLevel(logging.INFO)
|
||||
|
||||
|
||||
@pytest.mark.skipif(sys.version_info < (3, 7), reason="lkml requires Python 3.7+")
|
||||
def test_lookml_ingest(pytestconfig, tmp_path, mock_time):
|
||||
test_resources_dir = pytestconfig.rootpath / "tests/integration/lookml"
|
||||
|
||||
pipeline = Pipeline.create(
|
||||
{
|
||||
"run_id": "lookml-test",
|
||||
"source": {
|
||||
"type": "lookml",
|
||||
"config": {
|
||||
"base_folder": str(test_resources_dir),
|
||||
"connection_to_platform_map": {"my_connection": "conn"},
|
||||
"parse_table_names_from_sql": True,
|
||||
},
|
||||
},
|
||||
"sink": {
|
||||
"type": "file",
|
||||
"config": {
|
||||
"filename": f"{tmp_path}/lookml_mces.json",
|
||||
},
|
||||
},
|
||||
}
|
||||
)
|
||||
pipeline.run()
|
||||
pipeline.raise_from_status()
|
||||
|
||||
output = mce_helpers.load_json_file(str(tmp_path / "lookml_mces.json"))
|
||||
expected = mce_helpers.load_json_file(
|
||||
str(test_resources_dir / "expected_output.json")
|
||||
)
|
||||
mce_helpers.assert_mces_equal(output, expected)
|
||||