Rel to #10927 - Looker DataModel (#10945)

* Organise calls

* Prepare skeleton

* Add looker model handling

* Parse files as sql

* Handle labels

* Linting

* Format

* Fix version

* Also check the API for explore lineage
Pere Miquel Brull 2023-04-11 08:44:00 +02:00, committed by GitHub
parent 1a451fe2a8
commit 09b283818d
20 changed files with 1035 additions and 136 deletions


@ -180,7 +180,7 @@ plugins: Dict[str, Set[str]] = {
"kafka": {*COMMONS["kafka"]},
"kinesis": {VERSIONS["boto3"]},
"ldap-users": {"ldap3==2.9.1"},
"looker": {"looker-sdk>=22.20.0"},
"looker": {"looker-sdk>=22.20.0", "lkml~=1.3"},
"mlflow": {"mlflow-skinny~=1.30", "alembic~=1.10.2"},
"mssql": {"sqlalchemy-pytds~=0.3"},
"mssql-odbc": {VERSIONS["pyodbc"]},


@ -102,9 +102,26 @@ class DashboardServiceTopology(ServiceTopology):
nullable=True,
),
],
children=["dashboard"],
children=["bulk_data_model", "dashboard"],
post_process=["mark_dashboards_as_deleted"],
)
# Dashboard Services have very different approaches
# when dealing with data models. Tableau has the models
# tightly coupled with dashboards, while Looker
# handles them as independent entities.
# When configuring a new source, we will either implement
# the yield_bulk_datamodel or yield_datamodel functions.
bulk_data_model = TopologyNode(
producer="list_datamodels",
stages=[
NodeStage(
type_=DashboardDataModel,
context="dataModel",
processor="yield_bulk_datamodel",
consumer=["dashboard_service"],
)
],
)
dashboard = TopologyNode(
producer="get_dashboard",
stages=[
@ -238,6 +255,13 @@ class DashboardServiceSource(TopologyRunnerMixin, Source, ABC):
Get Dashboard Details
"""
def list_datamodels(self) -> Iterable[Any]:
"""
Optional Node producer for processing datamodels in bulk
before the dashboards
"""
return []
def yield_datamodel(self, _) -> Optional[Iterable[CreateDashboardDataModelRequest]]:
"""
Method to fetch DataModel linked to Dashboard
@ -247,6 +271,17 @@ class DashboardServiceSource(TopologyRunnerMixin, Source, ABC):
f"DataModel is not supported for {self.service_connection.type.name}"
)
def yield_bulk_datamodel(
self, _
) -> Optional[Iterable[CreateDashboardDataModelRequest]]:
"""
Method to fetch DataModels in bulk
"""
logger.debug(
f"DataModel is not supported for {self.service_connection.type.name}"
)
def yield_dashboard_lineage(
self, dashboard_details: Any
) -> Optional[Iterable[AddLineageRequest]]:
@ -255,6 +290,10 @@ class DashboardServiceSource(TopologyRunnerMixin, Source, ABC):
We will look for the data in all the database services
that were specified in the configuration.
TODO: We'll need to make this independent
of the dbServiceNames, since our lineage will now be
model -> dashboard
"""
for db_service_name in self.source_config.dbServiceNames or []:
yield from self.yield_dashboard_lineage_details(

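As a reference for connector authors, a hypothetical source wiring into the new bulk node could look like the sketch below; the class name, client call, and model attributes are illustrative, not part of this change (the Looker source further down is the real implementation):

from typing import Any, Iterable, Optional

from metadata.generated.schema.api.data.createDashboardDataModel import (
    CreateDashboardDataModelRequest,
)
from metadata.ingestion.source.dashboard.dashboard_service import (
    DashboardServiceSource,
)


class MyDashboardSource(DashboardServiceSource):  # hypothetical connector
    def list_datamodels(self) -> Iterable[Any]:
        # Produce every raw model up front, before any dashboard is processed
        yield from self.client.list_models()  # assumed client method

    def yield_bulk_datamodel(
        self, model: Any
    ) -> Optional[Iterable[CreateDashboardDataModelRequest]]:
        # Turn each raw model into an OpenMetadata create request
        yield CreateDashboardDataModelRequest(
            name=model.name,  # assumed attribute on the raw model
            service=self.context.dashboard_service.fullyQualifiedName.__root__,
            dataModelType="LookMlExplore",  # one of the DataModelType values
            serviceType="Looker",
            columns=[],
        )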

@ -0,0 +1,109 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Looker general utilities
"""
from typing import List, Sequence, Union, cast
from looker_sdk.sdk.api40.models import LookmlModelExplore, LookmlModelExploreField
from metadata.generated.schema.entity.data.table import Column, DataType
from metadata.ingestion.source.dashboard.looker.models import LookMlView
# Some docs on types https://cloud.google.com/looker/docs/reference/param-dimension-filter-parameter-types
LOOKER_TYPE_MAP = {
"average": DataType.NUMBER,
"average_distinct": DataType.NUMBER,
"bin": DataType.ARRAY,
"count": DataType.NUMBER,
"count_distinct": DataType.NUMBER,
"date": DataType.DATE,
"date_date": DataType.DATE,
"date_day_of_month": DataType.NUMBER,
"date_day_of_week": DataType.NUMBER,
"date_day_of_week_index": DataType.NUMBER,
"date_fiscal_month_num": DataType.NUMBER,
"date_fiscal_quarter": DataType.DATE,
"date_fiscal_quarter_of_year": DataType.NUMBER,
"date_hour": DataType.TIME,
"date_hour_of_day": DataType.NUMBER,
"date_month": DataType.DATE,
"date_month_num": DataType.NUMBER,
"date_month_name": DataType.NUMBER,
"date_quarter": DataType.DATE,
"date_quarter_of_year": DataType.DATE,
"date_time": DataType.TIME,
"date_time_of_day": DataType.TIME,
"date_microsecond": DataType.TIME,
"date_millisecond": DataType.TIME,
"date_minute": DataType.TIME,
"date_raw": DataType.STRING,
"date_second": DataType.TIME,
"date_week": DataType.TIME,
"date_year": DataType.TIME,
"date_day_of_year": DataType.NUMBER,
"date_week_of_year": DataType.NUMBER,
"date_fiscal_year": DataType.DATE,
"duration_day": DataType.STRING,
"duration_hour": DataType.STRING,
"duration_minute": DataType.STRING,
"duration_month": DataType.STRING,
"duration_quarter": DataType.STRING,
"duration_second": DataType.STRING,
"duration_week": DataType.STRING,
"duration_year": DataType.STRING,
"distance": DataType.NUMBER,
"duration": DataType.NUMBER,
"int": DataType.NUMBER,
"list": DataType.ARRAY,
"location": DataType.UNION,
"max": DataType.NUMBER,
"median": DataType.NUMBER,
"median_distinct": DataType.NUMBER,
"min": DataType.NUMBER,
"number": DataType.NUMBER,
"percent_of_previous": DataType.NUMBER,
"percent_of_total": DataType.NUMBER,
"percentile": DataType.NUMBER,
"percentile_distinct": DataType.NUMBER,
"running_total": DataType.NUMBER,
"sum": DataType.NUMBER,
"sum_distinct": DataType.NUMBER,
"string": DataType.STRING,
"tier": DataType.ENUM,
"time": DataType.TIME,
"unknown": DataType.UNKNOWN,
"unquoted": DataType.STRING,
"yesno": DataType.BOOLEAN,
"zipcode": DataType.STRING,
}
def get_columns_from_model(
model: Union[LookmlModelExplore, LookMlView]
) -> List[Column]:
"""
Obtain the columns (measures and dimensions) from the models
"""
columns = []
# Explores keep their fields under `fields`, while our LookMlView
# pydantic model exposes dimensions and measures at the top level
if isinstance(model, LookmlModelExplore):
    all_fields = (model.fields.dimensions or []) + (model.fields.measures or [])
else:
    all_fields = (model.dimensions or []) + (model.measures or [])
for field in cast(Sequence[LookmlModelExploreField], all_fields):
columns.append(
Column(
name=field.name,
displayName=getattr(field, "label_short", field.label),
dataType=LOOKER_TYPE_MAP.get(field.type, DataType.UNKNOWN),
dataTypeDisplay=field.type,
description=field.description,
)
)
return columns
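A quick usage sketch of the mapping, using the LookMlView and LookMlField pydantic models added later in this change (the view itself is made up for illustration):

from metadata.ingestion.source.dashboard.looker.models import LookMlField, LookMlView

view = LookMlView(
    name="cats",
    dimensions=[LookMlField(name="name", type="string")],
    measures=[LookMlField(name="total", type="count")],
)
for column in get_columns_from_model(view):
    print(column.name.__root__, column.dataType)
# name DataType.STRING
# total DataType.NUMBER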


@ -21,25 +21,33 @@ Notes:
import traceback
from datetime import datetime
from typing import Iterable, List, Optional, Set, cast
from typing import Iterable, List, Optional, Sequence, Set, Union, cast
from looker_sdk.error import SDKError
from looker_sdk.sdk.api31.models import Query
from looker_sdk.sdk.api40.methods import Looker40SDK
from looker_sdk.sdk.api40.models import Dashboard as LookerDashboard
from looker_sdk.sdk.api40.models import (
DashboardBase,
DashboardElement,
LookmlModel,
LookmlModelExplore,
LookmlModelNavExplore,
)
from pydantic import ValidationError
from metadata.generated.schema.api.data.createChart import CreateChartRequest
from metadata.generated.schema.api.data.createDashboard import CreateDashboardRequest
from metadata.generated.schema.api.data.createDashboardDataModel import (
CreateDashboardDataModelRequest,
)
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.entity.data.chart import Chart
from metadata.generated.schema.entity.data.dashboard import (
Dashboard as MetadataDashboard,
)
from metadata.generated.schema.entity.data.dashboardDataModel import (
DashboardDataModel,
DataModelType,
)
from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.entity.services.connections.dashboard.lookerConnection import (
LookerConnection,
@ -47,9 +55,13 @@ from metadata.generated.schema.entity.services.connections.dashboard.lookerConne
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
from metadata.generated.schema.entity.services.dashboardService import (
DashboardServiceType,
)
from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.generated.schema.type.entityLineage import EntitiesEdge
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.generated.schema.type.usageRequest import UsageRequest
from metadata.ingestion.api.source import InvalidSourceException
@ -57,6 +69,14 @@ from metadata.ingestion.source.dashboard.dashboard_service import (
DashboardServiceSource,
DashboardUsage,
)
from metadata.ingestion.source.dashboard.looker.columns import get_columns_from_model
from metadata.ingestion.source.dashboard.looker.models import (
Includes,
LookMlView,
ViewName,
)
from metadata.ingestion.source.dashboard.looker.parser import LkmlParser
from metadata.readers.github import GitHubReader
from metadata.utils import fqn
from metadata.utils.filters import filter_by_chart
from metadata.utils.helpers import clean_uri, get_standard_chart_type
@ -81,6 +101,20 @@ GET_DASHBOARD_FIELDS = [
]
def clean_dashboard_name(name: str) -> str:
"""
Clean known invalid Looker characters in ids
"""
return name.replace("::", "_")
def build_datamodel_name(model_name: str, explore_name: str) -> str:
"""
Build the explore name using the model name
"""
return clean_dashboard_name(model_name + "_" + explore_name)
class LookerSource(DashboardServiceSource):
"""
Looker Source Class.
@ -100,8 +134,8 @@ class LookerSource(DashboardServiceSource):
super().__init__(config, metadata_config)
self.today = datetime.now().strftime("%Y-%m-%d")
# Owners cache. The key will be the user_id and the value its OM user EntityRef
self._owners_ref = {}
self._parser = None
self._explores_cache = {}
@classmethod
def create(
@ -115,6 +149,166 @@ class LookerSource(DashboardServiceSource):
)
return cls(config, metadata_config)
@property
def parser(self) -> Optional[LkmlParser]:
if not self._parser and self.service_connection.githubCredentials:
self._parser = LkmlParser(
reader=GitHubReader(self.service_connection.githubCredentials)
)
return self._parser
def list_datamodels(self) -> Iterable[LookmlModelExplore]:
"""
Fetch explores with the SDK
"""
# First, pick up all the LookML Models
all_lookml_models: Sequence[LookmlModel] = self.client.all_lookml_models()
# Then, fetch the explores for each of them
for lookml_model in all_lookml_models:
# Each LookML model has a list of explores we'll be ingesting
for explore_nav in (
cast(Sequence[LookmlModelNavExplore], lookml_model.explores) or []
):
explore = self.client.lookml_model_explore(
lookml_model_name=lookml_model.name, explore_name=explore_nav.name
)
yield explore
def yield_bulk_datamodel(
self, model: LookmlModelExplore
) -> Optional[Iterable[CreateDashboardDataModelRequest]]:
"""
Get the Explore and View information and prepare
the model creation request
"""
try:
explore_datamodel = CreateDashboardDataModelRequest(
name=build_datamodel_name(model.model_name, model.name),
displayName=model.name,
description=model.description,
service=self.context.dashboard_service.fullyQualifiedName.__root__,
dataModelType=DataModelType.LookMlExplore.value,
serviceType=DashboardServiceType.Looker.value,
columns=get_columns_from_model(model),
sql=self._get_explore_sql(model),
)
yield explore_datamodel
self.status.scanned(f"Data Model Scanned: {model.name}")
# Maybe use the project_name as key too?
# Save the explores for when we create the lineage with the dashboards and views
self._explores_cache[
explore_datamodel.name.__root__
] = self.context.dataModel # This is the newly created explore
# We can get VIEWs from the JOINs to know the dependencies
# We will only try and fetch if we have the credentials
if self.service_connection.githubCredentials:
for view in model.joins:
yield from self._process_view(
view_name=ViewName(view.name), explore=model
)
except ValidationError as err:
error = f"Validation error yielding Data Model [{model.name}]: {err}"
logger.debug(traceback.format_exc())
logger.error(error)
self.status.failed(
name=model.name, error=error, stack_trace=traceback.format_exc()
)
except Exception as err:
error = f"Wild error yielding Data Model [{model.name}]: {err}"
logger.debug(traceback.format_exc())
logger.error(error)
self.status.failed(
name=model.name, error=error, stack_trace=traceback.format_exc()
)
def _get_explore_sql(self, explore: LookmlModelExplore) -> Optional[str]:
"""
If GitHub creds are sent, we can fetch the explore
file definition and add it here
"""
# Only attempt to parse if creds are provided
if self.service_connection.githubCredentials:
try:
# This will only parse if the file has not been parsed yet
self.parser.parse_file(Includes(explore.source_file))
return self.parser.parsed_files.get(Includes(explore.source_file))
except Exception as err:
logger.warning(f"Exception getting the model sql: {err}")
return None
def _process_view(self, view_name: ViewName, explore: LookmlModelExplore):
"""
For each view referenced in the JOINs of the explore,
we first load the explore file from GitHub, then:
1. Fetch the view from the GitHub files (search in includes)
2. Yield the view as a dashboard Model
3. Yield the lineage between the View -> Explore and Source -> View
Every visited view will be cached so that we don't need to process
everything again.
"""
view: Optional[LookMlView] = self.parser.find_view(
view_name=view_name, path=Includes(explore.source_file)
)
if view:
yield CreateDashboardDataModelRequest(
name=build_datamodel_name(explore.model_name, view.name),
displayName=view.name,
description=view.description,
service=self.context.dashboard_service.fullyQualifiedName.__root__,
dataModelType=DataModelType.LookMlView.value,
serviceType=DashboardServiceType.Looker.value,
columns=get_columns_from_model(view),
sql=self.parser.parsed_files.get(Includes(view.source_file)),
)
self.status.scanned(f"Data Model Scanned: {view.name}")
yield from self.add_view_lineage(view, explore)
def add_view_lineage(
self, view: LookMlView, explore: LookmlModelExplore
) -> Iterable[AddLineageRequest]:
"""
Add the lineage source -> view -> explore
"""
try:
# TODO: column-level lineage parsing the explore columns with the format `view_name.col`
# Now the context has the newly created view
yield AddLineageRequest(
edge=EntitiesEdge(
fromEntity=EntityReference(
id=self.context.dataModel.id.__root__, type="dashboardDataModel"
),
toEntity=EntityReference(
id=self._explores_cache[explore.name].id.__root__,
type="dashboardDataModel",
),
)
)
if view.sql_table_name:
source_table_name = self._clean_table_name(view.sql_table_name)
# View to the source is only there if we are informing the dbServiceNames
for db_service_name in self.source_config.dbServiceNames or []:
yield self.build_lineage_request(
source=source_table_name,
db_service_name=db_service_name,
to_entity=self.context.dataModel,
)
except Exception as err:
logger.debug(traceback.format_exc())
logger.error(
f"Error to yield lineage details for view [{view.name}]: {err}"
)
def get_dashboards_list(self) -> List[DashboardBase]:
"""
Get List of all dashboards
@ -160,27 +354,17 @@ class LookerSource(DashboardServiceSource):
Optional[EntityReference]
"""
try:
if (
dashboard_details.user_id is not None
and dashboard_details.user_id not in self._owners_ref
):
if dashboard_details.user_id is not None:
dashboard_owner = self.client.user(dashboard_details.user_id)
user = self.metadata.get_user_by_email(dashboard_owner.email)
if user: # Save the EntityRef
self._owners_ref[dashboard_details.user_id] = EntityReference(
id=user.id, type="user"
)
else: # Otherwise, flag the user as missing in OM
self._owners_ref[dashboard_details.user_id] = None
logger.debug(
f"User {dashboard_owner.email} not found in OpenMetadata."
)
if user:
return EntityReference(id=user.id.__root__, type="user")
except Exception as err:
logger.debug(traceback.format_exc())
logger.warning(f"Could not fetch owner data due to {err}")
return self._owners_ref.get(dashboard_details.user_id)
return None
def yield_dashboard(
self, dashboard_details: LookerDashboard
@ -190,7 +374,7 @@ class LookerSource(DashboardServiceSource):
"""
dashboard_request = CreateDashboardRequest(
name=dashboard_details.id.replace("::", "_"),
name=clean_dashboard_name(dashboard_details.id),
displayName=dashboard_details.title,
description=dashboard_details.description or None,
charts=[
@ -219,33 +403,10 @@ class LookerSource(DashboardServiceSource):
return table_name.lower().split("as")[0].strip()
def _add_sql_table(self, query: Query, dashboard_sources: Set[str]):
@staticmethod
def get_dashboard_sources(dashboard_details: LookerDashboard) -> Set[str]:
"""
Add the SQL table information to the dashboard_sources.
Updates the seen dashboards.
:param query: Looker query, from a look or result_maker
:param dashboard_sources: seen tables so far
"""
try:
explore: LookmlModelExplore = self.client.lookml_model_explore(
query.model, query.view
)
table_name = explore.sql_table_name
if table_name:
dashboard_sources.add(self._clean_table_name(table_name))
except SDKError as err:
logger.debug(traceback.format_exc())
logger.warning(
f"Cannot get explore from model={query.model}, view={query.view}: {err}"
)
def get_dashboard_sources(self, dashboard_details: LookerDashboard) -> Set[str]:
"""
Set of source tables to build lineage for the processed dashboard
Set of explores to build lineage for the processed dashboard
"""
dashboard_sources: Set[str] = set()
@ -253,20 +414,42 @@ class LookerSource(DashboardServiceSource):
Iterable[DashboardElement], dashboard_details.dashboard_elements
):
if chart.query and chart.query.view:
self._add_sql_table(chart.query, dashboard_sources)
dashboard_sources.add(
build_datamodel_name(chart.query.model, chart.query.view)
)
if chart.look and chart.look.query and chart.look.query.view:
self._add_sql_table(chart.look.query, dashboard_sources)
dashboard_sources.add(
build_datamodel_name(chart.look.query.model, chart.look.query.view)
)
if (
chart.result_maker
and chart.result_maker.query
and chart.result_maker.query.view
):
self._add_sql_table(chart.result_maker.query, dashboard_sources)
dashboard_sources.add(
build_datamodel_name(
chart.result_maker.query.model, chart.result_maker.query.view
)
)
return dashboard_sources
def get_explore(self, explore_name: str) -> Optional[DashboardDataModel]:
"""
Get the dashboard model from cache or API
"""
return self._explores_cache.get(explore_name) or self.metadata.get_by_name(
entity=DashboardDataModel,
fqn=fqn.build(
self.metadata,
entity_type=DashboardDataModel,
service_name=self.context.dashboard_service.fullyQualifiedName.__root__,
data_model_name=explore_name,
),
)
def yield_dashboard_lineage_details(
self, dashboard_details: LookerDashboard, db_service_name: str
self, dashboard_details: LookerDashboard, _: str
) -> Optional[Iterable[AddLineageRequest]]:
"""
Get lineage between charts and data sources.
@ -276,35 +459,38 @@ class LookerSource(DashboardServiceSource):
- chart.look (chart.look.query)
- chart.result_maker
"""
datasource_list = self.get_dashboard_sources(dashboard_details)
to_fqn = fqn.build(
self.metadata,
entity_type=MetadataDashboard,
service_name=self.config.serviceName,
dashboard_name=dashboard_details.id.replace("::", "_"),
)
to_entity = self.metadata.get_by_name(
entity=MetadataDashboard,
fqn=to_fqn,
)
try:
source_explore_list = self.get_dashboard_sources(dashboard_details)
for explore_name in source_explore_list:
cached_explore = self.get_explore(explore_name)
if cached_explore:
yield AddLineageRequest(
edge=EntitiesEdge(
fromEntity=EntityReference(
id=cached_explore.id.__root__,
type="dashboardDataModel",
),
toEntity=EntityReference(
id=self.context.dashboard.id.__root__,
type="dashboard",
),
)
)
for source in datasource_list:
try:
yield self.build_lineage_request(
source=source,
db_service_name=db_service_name,
to_entity=to_entity,
)
except (Exception, IndexError) as err:
logger.debug(traceback.format_exc())
logger.warning(
f"Error building lineage for database service [{db_service_name}]: {err}"
)
except Exception as exc:
error = f"Unexpected exception yielding lineage from [{self.context.dashboard.displayName}]: {exc}"
logger.debug(traceback.format_exc())
logger.warning(error)
self.status.failed(
self.context.dashboard.displayName, error, traceback.format_exc()
)
def build_lineage_request(
self, source: str, db_service_name: str, to_entity: MetadataDashboard
self,
source: str,
db_service_name: str,
to_entity: Union[MetadataDashboard, DashboardDataModel],
) -> Optional[AddLineageRequest]:
"""
Once we have a list of origin data sources, check their components

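Putting the pieces together, the lineage this source now emits chains up roughly as follows (a sketch of the entity graph, not code from this change):

# Table (db service)  ->  LookML View          ->  LookML Explore       ->  Dashboard
#                         (DashboardDataModel)     (DashboardDataModel)
# table -> view edges need dbServiceNames; view -> explore comes from the JOINs;
# explore -> dashboard comes from the charts' queries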

@ -0,0 +1,50 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Looker pydantic models
"""
from typing import List, NewType, Optional
from pydantic import BaseModel, Field
Includes = NewType("Includes", str)
ViewName = NewType("ViewName", str)
class LookMlField(BaseModel):
description: Optional[str] = Field(None, description="Field description")
label: Optional[str] = Field(None, description="Field display name")
type: Optional[str] = Field(None, description="Field type to be mapped to OM")
name: str = Field(..., description="Field name")
class LookMlView(BaseModel):
name: ViewName = Field(..., description="View name")
description: Optional[str] = Field(None, description="View description")
sql_table_name: Optional[str] = Field(
None, description="To track lineage with the source"
)
measures: List[LookMlField] = Field([], description="Measures to ingest as cols")
dimensions: List[LookMlField] = Field(
[], description="Dimensions to ingest as cols"
)
source_file: Optional[Includes] = Field(None, description="lkml file path")
class LkmlFile(BaseModel):
"""
It might also have explores, but we don't care:
we'll pick the explores up from the API
"""
includes: List[Includes] = Field([], description="Full include list")
views: List[LookMlView] = Field([], description="Views we want to parse")
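These models are a thin validation layer on top of the dicts that lkml produces; a minimal round-trip sketch (the LookML snippet is illustrative):

import lkml

raw = lkml.load("""
include: "views/cats.view.lkml"

view: dogs {
  sql_table_name: dogs ;;
  dimension: name {
    type: string
    sql: ${TABLE}.name ;;
  }
}
""")
lkml_file = LkmlFile.parse_obj(raw)
print(lkml_file.includes)       # ['views/cats.view.lkml']
print(lkml_file.views[0].name)  # dogs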


@ -0,0 +1,136 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
.lkml files parser
"""
import traceback
from typing import Dict, List, Optional
import lkml
from pydantic import ValidationError
from metadata.ingestion.source.dashboard.looker.models import (
Includes,
LkmlFile,
LookMlView,
ViewName,
)
from metadata.readers.base import Reader, ReadException
from metadata.utils.logger import ingestion_logger
logger = ingestion_logger()
class LkmlParser:
"""
Parses and caches the visited files & views.
Here we'll just parse VIEWs, as we are already getting
the explores from the API.
The goal is to make sure we parse files only once
and store the results of the processed views.
We want to parse the minimum number of files each time
until we find the view we are looking for.
Approach:
When we parse, it is because we are looking for a view definition, then
1. When parsing any source file:
   a. Look for any defined views, and cache them
   b. Return the list of includes as the parsing result
2. Is my view present in the cache?
   - Yes: return it.
   - No: keep parsing the `includes` until the answer is yes.
"""
def __init__(self, reader: Reader):
self._views_cache: Dict[ViewName, LookMlView] = {}
self._visited_files: Dict[Includes, List[Includes]] = {}
# To store the raw string of the lkml explores
self.parsed_files: Dict[Includes, str] = {}
self.reader = reader
def parse_file(self, path: Includes) -> Optional[List[Includes]]:
"""
Internal parser. Parse the file and cache the views
If an lkml include starts with //, it means it is pointing to
an external repository, so we won't send it to the reader
"""
# If visited, return its includes to continue parsing
if path in self._visited_files:
return self._visited_files[path]
# If the path starts with //, we will ignore it for now
if path.startswith("//"):
logger.info(f"We do not support external includes yet. Skipping {path}")
return []
try:
file = self.reader.read(path)
lkml_file = LkmlFile.parse_obj(lkml.load(file))
self.parsed_files[path] = file
# Cache everything
self._visited_files[path] = lkml_file.includes
for view in lkml_file.views:
view.source_file = path
self._views_cache[view.name] = view
return lkml_file.includes
except ReadException as err:
logger.debug(traceback.format_exc())
logger.error(f"Error trying to read the file [{path}]: {err}")
except ValidationError as err:
logger.error(
f"Validation error building the .lkml file from [{path}]: {err}"
)
except Exception as err:
logger.debug(traceback.format_exc())
logger.error(f"Unknown error building the .lkml file from [{path}]: {err}")
return None
def get_view_from_cache(self, view_name: ViewName) -> Optional[LookMlView]:
"""
Check if view is cached, and return it.
Otherwise, return None
"""
if view_name in self._views_cache:
return self._views_cache[view_name]
return None
def find_view(self, view_name: ViewName, path: Includes) -> Optional[LookMlView]:
"""
Parse an incoming file (either from a `source_file` or an `include`),
cache its views, and keep walking the includes if
we still haven't found the view by then
"""
cached_view = self.get_view_from_cache(view_name)
if cached_view:
return cached_view
for include in self.parse_file(path) or []:
cached_view = self.get_view_from_cache(view_name)
if cached_view:
return cached_view
# Recursively parse inner includes
self.find_view(view_name, include)
# We might not find the view ever
return self.get_view_from_cache(view_name)


@ -0,0 +1,29 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Base local reader
"""
from abc import ABC, abstractmethod
class ReadException(Exception):
"""
To be raised by any errors with the read calls
"""
class Reader(ABC):
@abstractmethod
def read(self, path: str) -> str:
"""
Given a file path, return its contents as a string
"""
raise NotImplementedError("Missing read implementation")
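Any file source only needs to satisfy this small interface; a hypothetical in-memory implementation (illustrative only, handy for tests):

from typing import Dict


class DictReader(Reader):  # hypothetical implementation
    """Serve file contents from an in-memory mapping"""

    def __init__(self, files: Dict[str, str]):
        self.files = files

    def read(self, path: str) -> str:
        try:
            return self.files[path]
        except KeyError:
            raise ReadException(f"File [{path}] not found")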


@ -0,0 +1,101 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
GitHub client to read files with token auth
"""
import base64
import traceback
from enum import Enum
from typing import Any, Dict
import requests
from metadata.generated.schema.security.credentials.githubCredentials import (
GitHubCredentials,
)
from metadata.readers.base import Reader, ReadException
from metadata.utils.constants import UTF_8
from metadata.utils.logger import ingestion_logger
logger = ingestion_logger()
HOST = "https://api.github.com"
class UrlParts(Enum):
REPOS = "repos"
CONTENTS = "contents"
class GitHubReader(Reader):
"""
Handle calls to the GitHub API against a repo
"""
def __init__(self, credentials: GitHubCredentials):
self.credentials = credentials
self._auth_headers = None
@property
def auth_headers(self) -> Dict[str, str]:
"""
Build the headers to authenticate
to the API
"""
if self._auth_headers is None:
self._auth_headers = {"Authentication": f"token {self.credentials.token}"}
return self._auth_headers
@staticmethod
def _build_url(*parts: str):
"""
Build URL parts
"""
return "/".join(parts)
@staticmethod
def _decode_content(json_response: Dict[str, Any]) -> str:
"""
Return the content of the response
If there is no `content`, we let the KeyError propagate
"""
return base64.b64decode(json_response["content"]).decode(UTF_8)
def read(self, path: str) -> str:
"""
Read a file from a GitHub Repo and return its
contents as a string
https://docs.github.com/en/rest/repos/contents?apiVersion=2022-11-28#get-repository-content
"""
try:
res = requests.get(
self._build_url(
HOST,
UrlParts.REPOS.value,
self.credentials.repositoryOwner,
self.credentials.repositoryName,
UrlParts.CONTENTS.value,
path,
),
headers=self.auth_headers,
timeout=30,
)
if res.status_code == 200:
return self._decode_content(res.json())
except Exception as err:
logger.debug(traceback.format_exc())
raise ReadException(f"Error fetching file [{path}] from repo: {err}")
raise ReadException(f"Could not fetch file [{path}] from repo")


@ -0,0 +1,42 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Local Reader
"""
import traceback
from pathlib import Path
from metadata.readers.base import Reader, ReadException
from metadata.utils.constants import UTF_8
from metadata.utils.logger import ingestion_logger
logger = ingestion_logger()
class LocalReader(Reader):
"""
Read files locally
"""
def __init__(self, base_path: Path):
self.base_path = base_path
def read(self, path: str) -> str:
"""
simple local reader
"""
try:
with open(self.base_path / path, encoding=UTF_8) as file:
return file.read()
except Exception as err:
logger.debug(traceback.format_exc())
raise ReadException(f"Error reading file [{path}] locally: {err}")


@ -0,0 +1,29 @@
include: "views/cats.view.lkml"
include: "views/dogs.view.lkml"
explore: cats {
label: "Cats"
join: dogs {
relationship: many_to_one
sql_on: ${cats.name} = ${dogs.name} ;;
}
}
view: birds {
sql_table_name: birds ;;
dimension: name {
type: string
sql: ${TABLE}.name ;;
}
}
explore: birds {
label: "Birds"
join: dogs {
relationship: many_to_one
sql_on: ${cats.name} = ${birds.name} ;;
}
}


@ -0,0 +1,11 @@
include: "views/kittens.view.lkml"
include: "views/dogs.view.lkml"
explore: kittens {
label: "Kittens"
join: dogs {
relationship: many_to_one
sql_on: ${kittens.name} = ${kittens.name} ;;
}
}


@ -0,0 +1,13 @@
view: cats {
sql_table_name: cats ;;
dimension: name {
type: string
sql: ${TABLE}.name ;;
}
dimension: age {
type: int
sql: ${TABLE}.age ;;
}
}


@ -0,0 +1,18 @@
view: dogs {
sql_table_name: dogs ;;
dimension: name {
type: string
sql: ${TABLE}.name ;;
}
dimension: age {
type: number
sql: ${TABLE}.age ;;
}
dimension: ball {
type: yesno
sql: ${TABLE}.ball ;;
}
}


@ -0,0 +1,10 @@
include: "views/cats.view.lkml"
view: kittens {
extends: [cats]
dimension: parent {
type: string
sql: ${TABLE}.name ;;
}
}


@ -247,16 +247,7 @@ class LookerUnitTest(TestCase):
"""
Check how we pick or not the owner
"""
# First, validate that we return the user if we have it
# in the ref
ref = EntityReference(id=uuid.uuid4(), type="user")
self.looker._owners_ref["user_id"] = ref
self.assertEqual(self.looker.get_owner_details(MOCK_LOOKER_DASHBOARD), ref)
# Now check that we are storing the ref properly
self.looker._owners_ref = {}
with patch.object(Looker40SDK, "user", return_value=MOCK_USER), patch.object(
# This does not really return a ref, but for simplicity
@ -265,8 +256,6 @@ class LookerUnitTest(TestCase):
return_value=ref,
):
self.assertEqual(self.looker.get_owner_details(MOCK_LOOKER_DASHBOARD), ref)
# The call also updates the _owners_ref dict
self.assertEqual(self.looker._owners_ref.get("user_id"), ref)
def raise_something_bad():
raise RuntimeError("Something bad")
@ -309,51 +298,6 @@ class LookerUnitTest(TestCase):
self.assertEqual(self.looker._clean_table_name("TABLE AS ALIAS"), "table")
def test_add_sql_table(self):
"""
Check how we get the table name from the explore
"""
# Check how we get the table name back
with patch.object(
Looker40SDK,
"lookml_model_explore",
return_value=LookmlModelExplore(sql_table_name="MY_TABLE"),
):
dashboard_sources = set()
self.looker._add_sql_table(
query=Query(model="model", view="view"),
dashboard_sources=dashboard_sources,
)
self.assertEqual(dashboard_sources, {"my_table"})
# If there's no name, nothing happens
with patch.object(
Looker40SDK,
"lookml_model_explore",
return_value=LookmlModelExplore(sql_table_name=None),
):
dashboard_sources = set()
self.looker._add_sql_table(
query=Query(model="model", view="view"),
dashboard_sources=dashboard_sources,
)
self.assertEqual(len(dashboard_sources), 0)
def something_bad(*_):
raise SDKError("something bad")
# We don't raise on errors, just log
with patch.object(
Looker40SDK, "lookml_model_explore", side_effect=something_bad
):
self.looker._add_sql_table(
query=Query(model="model", view="view"),
dashboard_sources=dashboard_sources,
)
def test_get_dashboard_sources(self):
"""
Check how we are building the sources
@ -361,10 +305,13 @@ class LookerUnitTest(TestCase):
with patch.object(
Looker40SDK,
"lookml_model_explore",
return_value=LookmlModelExplore(sql_table_name="MY_TABLE"),
return_value=LookmlModelExplore(
sql_table_name="MY_TABLE", model_name="model2", view_name="view"
),
):
dashboard_sources = self.looker.get_dashboard_sources(MOCK_LOOKER_DASHBOARD)
self.assertEqual(dashboard_sources, {"my_table"})
# Picks it up from the chart, not here
self.assertEqual(dashboard_sources, {"model_view"})
def test_build_lineage_request(self):
"""


@ -0,0 +1,138 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Test the lkml parser
"""
from pathlib import Path
from unittest import TestCase
from metadata.ingestion.source.dashboard.looker.parser import (
Includes,
LkmlParser,
ViewName,
)
from metadata.readers.local import LocalReader
BASE_PATH = Path(__file__).parent.parent.parent / "resources/lkml"
class TestLkmlParser(TestCase):
"""
Check the parser with the local reader
"""
def test_local_parser_with_birds(self):
"""
Birds view is declared in the explore file.
Nothing else to parse, so no visited files
"""
reader = LocalReader(BASE_PATH)
parser = LkmlParser(reader)
view = parser.find_view(
view_name=ViewName("birds"), path=Includes("cats.explore.lkml")
)
self.assertIsNotNone(view)
self.assertEqual(view.name, "birds")
self.assertEqual(parser.get_view_from_cache(ViewName("birds")).name, "birds")
self.assertEqual(
parser._visited_files,
{
"cats.explore.lkml": ["views/cats.view.lkml", "views/dogs.view.lkml"],
},
)
def test_local_parser_with_cats(self):
"""
Cats view is in a separate view file which we need to visit.
We go there before jumping into the dogs, and stop.
"""
reader = LocalReader(BASE_PATH)
parser = LkmlParser(reader)
view = parser.find_view(
view_name=ViewName("cats"), path=Includes("cats.explore.lkml")
)
self.assertIsNotNone(view)
self.assertEqual(view.name, "cats")
self.assertEqual(parser.get_view_from_cache(ViewName("cats")).name, "cats")
self.assertEqual(
parser._visited_files,
{
"cats.explore.lkml": ["views/cats.view.lkml", "views/dogs.view.lkml"],
"views/cats.view.lkml": [],
},
)
def test_local_parser_with_dogs(self):
"""
Dogs view is in a separate view file which we need to visit.
We go after visiting the cats, so we need to parse both
"""
reader = LocalReader(BASE_PATH)
parser = LkmlParser(reader)
view = parser.find_view(
view_name=ViewName("dogs"), path=Includes("cats.explore.lkml")
)
self.assertIsNotNone(view)
self.assertEqual(view.name, "dogs")
self.assertEqual(parser.get_view_from_cache(ViewName("dogs")).name, "dogs")
self.assertEqual(
parser._visited_files,
{
"cats.explore.lkml": ["views/cats.view.lkml", "views/dogs.view.lkml"],
"views/cats.view.lkml": [],
"views/dogs.view.lkml": [],
},
)
# Now I can directly get the cats view from the cache as well
# as I already visited that file
self.assertEqual(parser.get_view_from_cache(ViewName("cats")).name, "cats")
def test_local_parser_with_kittens(self):
"""
We will now parse the kittens explore looking for the cats view.
This requires two jumps: Kittens explore -> Kittens View -> Cats View
"""
reader = LocalReader(BASE_PATH)
parser = LkmlParser(reader)
view = parser.find_view(
view_name=ViewName("cats"), path=Includes("kittens.explore.lkml")
)
self.assertIsNotNone(view)
self.assertEqual(view.name, "cats")
self.assertEqual(parser.get_view_from_cache(ViewName("cats")).name, "cats")
self.assertEqual(
parser._visited_files,
{
"kittens.explore.lkml": [
"views/kittens.view.lkml",
"views/dogs.view.lkml",
],
"views/kittens.view.lkml": ["views/cats.view.lkml"],
"views/cats.view.lkml": [],
},
)


@ -16,7 +16,9 @@
"enum": [
"TableauSheet",
"SupersetDataModel",
"MetabaseDataModel"
"MetabaseDataModel",
"LookMlView",
"LookMlExplore"
],
"javaEnums": [
{
@ -27,6 +29,12 @@
},
{
"name": "MetabaseDataModel"
},
{
"name": "LookMlView"
},
{
"name": "LookMlExplore"
}
]
}


@ -38,6 +38,11 @@
"type": "string",
"format": "uri"
},
"githubCredentials": {
"title": "GitHub Credentials",
"description": "Credentials to extract the .lkml files from a repository. This is required to get all the lineage and definitions.",
"$ref": "../../../../security/credentials/githubCredentials.json"
},
"supportsMetadataExtraction": {
"title": "Supports Metadata Extraction",
"$ref": "../connectionBasicType.json#/definitions/supportsMetadataExtraction"


@ -0,0 +1,28 @@
{
"$id": "https://open-metadata.org/security/credentials/githubCredentials.json",
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "GitHubCredentials",
"description": "Credentials for a GitHub repository",
"type": "object",
"javaType": "org.openmetadata.schema.security.credentials.GitHubCredentials",
"properties": {
"repositoryOwner": {
"title": "Repository Owner",
"description": "The owner (user or organization) of a GitHub repository. For example, in https://github.com/open-metadata/OpenMetadata, the owner is `open-metadata`.",
"type": "string"
},
"repositoryName": {
"title": "Repository Name",
"description": "The name of a GitHub repository. For example, in https://github.com/open-metadata/OpenMetadata, the name is `OpenMetadata`.",
"type": "string"
},
"token": {
"title": "API Token",
"description": "Token to use the API. This is required for private repositories and to ensure we don't hit API limits.",
"type": "string",
"format": "password"
}
},
"additionalProperties": false,
"required": ["repositoryOwner", "repositoryName"]
}