diff --git a/ingestion/setup.py b/ingestion/setup.py
index 5490a440eaa..94e0a6d86aa 100644
--- a/ingestion/setup.py
+++ b/ingestion/setup.py
@@ -180,7 +180,7 @@ plugins: Dict[str, Set[str]] = {
     "kafka": {*COMMONS["kafka"]},
     "kinesis": {VERSIONS["boto3"]},
     "ldap-users": {"ldap3==2.9.1"},
-    "looker": {"looker-sdk>=22.20.0"},
+    "looker": {"looker-sdk>=22.20.0", "lkml~=1.3"},
     "mlflow": {"mlflow-skinny~=1.30", "alembic~=1.10.2"},
     "mssql": {"sqlalchemy-pytds~=0.3"},
     "mssql-odbc": {VERSIONS["pyodbc"]},
diff --git a/ingestion/src/metadata/ingestion/source/dashboard/dashboard_service.py b/ingestion/src/metadata/ingestion/source/dashboard/dashboard_service.py
index ee46e03af48..9d105dd19ac 100644
--- a/ingestion/src/metadata/ingestion/source/dashboard/dashboard_service.py
+++ b/ingestion/src/metadata/ingestion/source/dashboard/dashboard_service.py
@@ -102,9 +102,26 @@ class DashboardServiceTopology(ServiceTopology):
                 nullable=True,
             ),
         ],
-        children=["dashboard"],
+        children=["bulk_data_model", "dashboard"],
         post_process=["mark_dashboards_as_deleted"],
     )
+    # Dashboard Services have very different approaches
+    # when dealing with data models. Tableau has the models
+    # tightly coupled with dashboards, while Looker
+    # handles them as independent entities.
+    # When configuring a new source, we will implement either
+    # the yield_bulk_datamodel or the yield_datamodel function.
+    bulk_data_model = TopologyNode(
+        producer="list_datamodels",
+        stages=[
+            NodeStage(
+                type_=DashboardDataModel,
+                context="dataModel",
+                processor="yield_bulk_datamodel",
+                consumer=["dashboard_service"],
+            )
+        ],
+    )
     dashboard = TopologyNode(
         producer="get_dashboard",
         stages=[
@@ -238,6 +255,13 @@ class DashboardServiceSource(TopologyRunnerMixin, Source, ABC):
         Get Dashboard Details
         """
 
+    def list_datamodels(self) -> Iterable[Any]:
+        """
+        Optional Node producer for processing datamodels in bulk
+        before the dashboards
+        """
+        return []
+
     def yield_datamodel(self, _) -> Optional[Iterable[CreateDashboardDataModelRequest]]:
         """
         Method to fetch DataModel linked to Dashboard
@@ -247,6 +271,17 @@ class DashboardServiceSource(TopologyRunnerMixin, Source, ABC):
             f"DataModel is not supported for {self.service_connection.type.name}"
         )
 
+    def yield_bulk_datamodel(
+        self, _
+    ) -> Optional[Iterable[CreateDashboardDataModelRequest]]:
+        """
+        Method to fetch DataModels in bulk
+        """
+
+        logger.debug(
+            f"DataModel is not supported for {self.service_connection.type.name}"
+        )
+
     def yield_dashboard_lineage(
         self, dashboard_details: Any
     ) -> Optional[Iterable[AddLineageRequest]]:
         """
         Get lineage between dashboard and data sources.
 
         We will look for the data in all the services
         we have informed.
+
+        TODO: We'll need to make this independent of the
+        dbServiceNames, since our lineage will now be
+        model -> dashboard
         """
         for db_service_name in self.source_config.dbServiceNames or []:
             yield from self.yield_dashboard_lineage_details(
diff --git a/ingestion/src/metadata/ingestion/source/dashboard/looker/columns.py b/ingestion/src/metadata/ingestion/source/dashboard/looker/columns.py
new file mode 100644
index 00000000000..7d00b92b016
--- /dev/null
+++ b/ingestion/src/metadata/ingestion/source/dashboard/looker/columns.py
@@ -0,0 +1,109 @@
+# Copyright 2021 Collate
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""
+Looker general utilities
+"""
+from typing import List, Sequence, Union, cast
+
+from looker_sdk.sdk.api40.models import LookmlModelExplore, LookmlModelExploreField
+
+from metadata.generated.schema.entity.data.table import Column, DataType
+from metadata.ingestion.source.dashboard.looker.models import LookMlView
+
+# Some docs on types https://cloud.google.com/looker/docs/reference/param-dimension-filter-parameter-types
+LOOKER_TYPE_MAP = {
+    "average": DataType.NUMBER,
+    "average_distinct": DataType.NUMBER,
+    "bin": DataType.ARRAY,
+    "count": DataType.NUMBER,
+    "count_distinct": DataType.NUMBER,
+    "date": DataType.DATE,
+    "date_date": DataType.DATE,
+    "date_day_of_month": DataType.NUMBER,
+    "date_day_of_week": DataType.NUMBER,
+    "date_day_of_week_index": DataType.NUMBER,
+    "date_fiscal_month_num": DataType.NUMBER,
+    "date_fiscal_quarter": DataType.DATE,
+    "date_fiscal_quarter_of_year": DataType.NUMBER,
+    "date_hour": DataType.TIME,
+    "date_hour_of_day": DataType.NUMBER,
+    "date_month": DataType.DATE,
+    "date_month_num": DataType.NUMBER,
+    "date_month_name": DataType.NUMBER,
+    "date_quarter": DataType.DATE,
+    "date_quarter_of_year": DataType.DATE,
+    "date_time": DataType.TIME,
+    "date_time_of_day": DataType.TIME,
+    "date_microsecond": DataType.TIME,
+    "date_millisecond": DataType.TIME,
+    "date_minute": DataType.TIME,
+    "date_raw": DataType.STRING,
+    "date_second": DataType.TIME,
+    "date_week": DataType.TIME,
+    "date_year": DataType.TIME,
+    "date_day_of_year": DataType.NUMBER,
+    "date_week_of_year": DataType.NUMBER,
+    "date_fiscal_year": DataType.DATE,
+    "duration_day": DataType.STRING,
+    "duration_hour": DataType.STRING,
+    "duration_minute": DataType.STRING,
+    "duration_month": DataType.STRING,
+    "duration_quarter": DataType.STRING,
+    "duration_second": DataType.STRING,
+    "duration_week": DataType.STRING,
+    "duration_year": DataType.STRING,
+    "distance": DataType.NUMBER,
+    "duration": DataType.NUMBER,
+    "int": DataType.NUMBER,
+    "list": DataType.ARRAY,
+    "location": DataType.UNION,
+    "max": DataType.NUMBER,
+    "median": DataType.NUMBER,
+    "median_distinct": DataType.NUMBER,
+    "min": DataType.NUMBER,
+    "number": DataType.NUMBER,
+    "percent_of_previous": DataType.NUMBER,
+    "percent_of_total": DataType.NUMBER,
+    "percentile": DataType.NUMBER,
+    "percentile_distinct": DataType.NUMBER,
+    "running_total": DataType.NUMBER,
+    "sum": DataType.NUMBER,
+    "sum_distinct": DataType.NUMBER,
+    "string": DataType.STRING,
+    "tier": DataType.ENUM,
+    "time": DataType.TIME,
+    "unknown": DataType.UNKNOWN,
+    "unquoted": DataType.STRING,
+    "yesno": DataType.BOOLEAN,
+    "zipcode": DataType.STRING,
+}
+
+
+def get_columns_from_model(
+    model: Union[LookmlModelExplore, LookMlView]
+) -> List[Column]:
+    """
+    Obtain the columns (measures and dimensions) from the models
+    """
+    columns = []
+    if isinstance(model, LookmlModelExplore):
+        all_fields = (model.fields.dimensions or []) + (model.fields.measures or [])
+    else:
+        # A LookMlView keeps its dimensions and measures at the top level
+        all_fields = (model.dimensions or []) + (model.measures or [])
+    for field in cast(Sequence[LookmlModelExploreField], all_fields):
+        columns.append(
+            Column(
+                name=field.name,
+                displayName=getattr(field, "label_short", field.label),
+                dataType=LOOKER_TYPE_MAP.get(field.type, DataType.UNKNOWN),
+                dataTypeDisplay=field.type,
+                description=field.description,
+            )
+        )
+
+    return columns
diff --git a/ingestion/src/metadata/ingestion/source/dashboard/looker/metadata.py b/ingestion/src/metadata/ingestion/source/dashboard/looker/metadata.py
index e43ad3d324a..4129d512bad 100644
--- a/ingestion/src/metadata/ingestion/source/dashboard/looker/metadata.py
+++ b/ingestion/src/metadata/ingestion/source/dashboard/looker/metadata.py
@@ -21,25 +21,33 @@ Notes:
 import traceback
 from datetime import datetime
-from typing import Iterable, List, Optional, Set, cast
+from typing import Iterable, List, Optional, Sequence, Set, Union, cast
 
-from looker_sdk.error import SDKError
-from looker_sdk.sdk.api31.models import Query
 from looker_sdk.sdk.api40.methods import Looker40SDK
 from looker_sdk.sdk.api40.models import Dashboard as LookerDashboard
 from looker_sdk.sdk.api40.models import (
     DashboardBase,
     DashboardElement,
+    LookmlModel,
     LookmlModelExplore,
+    LookmlModelNavExplore,
 )
+from pydantic import ValidationError
 
 from metadata.generated.schema.api.data.createChart import CreateChartRequest
 from metadata.generated.schema.api.data.createDashboard import CreateDashboardRequest
+from metadata.generated.schema.api.data.createDashboardDataModel import (
+    CreateDashboardDataModelRequest,
+)
 from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
 from metadata.generated.schema.entity.data.chart import Chart
 from metadata.generated.schema.entity.data.dashboard import (
     Dashboard as MetadataDashboard,
 )
+from metadata.generated.schema.entity.data.dashboardDataModel import (
+    DashboardDataModel,
+    DataModelType,
+)
 from metadata.generated.schema.entity.data.table import Table
 from metadata.generated.schema.entity.services.connections.dashboard.lookerConnection import (
     LookerConnection,
@@ -47,9 +55,13 @@ from metadata.generated.schema.entity.services.connections.dashboard.lookerConne
 from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
     OpenMetadataConnection,
 )
+from metadata.generated.schema.entity.services.dashboardService import (
+    DashboardServiceType,
+)
 from metadata.generated.schema.metadataIngestion.workflow import (
     Source as WorkflowSource,
 )
+from metadata.generated.schema.type.entityLineage import EntitiesEdge
 from metadata.generated.schema.type.entityReference import EntityReference
 from metadata.generated.schema.type.usageRequest import UsageRequest
 from metadata.ingestion.api.source import InvalidSourceException
@@ -57,6 +69,14 @@ from metadata.ingestion.source.dashboard.dashboard_service import (
     DashboardServiceSource,
     DashboardUsage,
 )
+from metadata.ingestion.source.dashboard.looker.columns import get_columns_from_model
+from metadata.ingestion.source.dashboard.looker.models import (
+    Includes,
+    LookMlView,
+    ViewName,
+)
+from metadata.ingestion.source.dashboard.looker.parser import LkmlParser
+from metadata.readers.github import GitHubReader
 from metadata.utils import fqn
 from metadata.utils.filters import filter_by_chart
 from metadata.utils.helpers import clean_uri, get_standard_chart_type
@@ -81,6 +101,20 @@ GET_DASHBOARD_FIELDS = [
 ]
 
 
+def clean_dashboard_name(name: str) -> str:
+    """
+    Clean up known invalid Looker characters in IDs
+    """
+    return name.replace("::", "_")
+
+
+def build_datamodel_name(model_name: str, explore_name: str) -> str:
+    """
+    Build the data model name from the model and explore names
+    """
+    return clean_dashboard_name(model_name + "_" + explore_name)
+
+
 class LookerSource(DashboardServiceSource):
""" Looker Source Class. @@ -100,8 +134,8 @@ class LookerSource(DashboardServiceSource): super().__init__(config, metadata_config) self.today = datetime.now().strftime("%Y-%m-%d") - # Owners cache. The key will be the user_id and the value its OM user EntityRef - self._owners_ref = {} + self._parser = None + self._explores_cache = {} @classmethod def create( @@ -115,6 +149,166 @@ class LookerSource(DashboardServiceSource): ) return cls(config, metadata_config) + @property + def parser(self) -> Optional[LkmlParser]: + if not self._parser and self.service_connection.githubCredentials: + self._parser = LkmlParser( + reader=GitHubReader(self.service_connection.githubCredentials) + ) + + return self._parser + + def list_datamodels(self) -> Iterable[LookmlModelExplore]: + """ + Fetch explores with the SDK + """ + # First, pick up all the LookML Models + all_lookml_models: Sequence[LookmlModel] = self.client.all_lookml_models() + + # Then, fetch the explores for each of them + for lookml_model in all_lookml_models: + # Each LookML model have a list of explores we'll be ingesting + for explore_nav in ( + cast(Sequence[LookmlModelNavExplore], lookml_model.explores) or [] + ): + explore = self.client.lookml_model_explore( + lookml_model_name=lookml_model.name, explore_name=explore_nav.name + ) + yield explore + + def yield_bulk_datamodel( + self, model: LookmlModelExplore + ) -> Optional[Iterable[CreateDashboardDataModelRequest]]: + """ + Get the Explore and View information and prepare + the model creation request + """ + try: + explore_datamodel = CreateDashboardDataModelRequest( + name=build_datamodel_name(model.model_name, model.name), + displayName=model.name, + description=model.description, + service=self.context.dashboard_service.fullyQualifiedName.__root__, + dataModelType=DataModelType.LookMlExplore.value, + serviceType=DashboardServiceType.Looker.value, + columns=get_columns_from_model(model), + sql=self._get_explore_sql(model), + ) + yield explore_datamodel + self.status.scanned(f"Data Model Scanned: {model.name}") + + # Maybe use the project_name as key too? 
+            # Save the explores for when we create the lineage with the dashboards and views
+            self._explores_cache[
+                explore_datamodel.name.__root__
+            ] = self.context.dataModel  # This is the newly created explore
+
+            # We can get VIEWs from the JOINs to know the dependencies
+            # We will only try and fetch if we have the credentials
+            if self.service_connection.githubCredentials:
+                for view in model.joins:
+                    yield from self._process_view(
+                        view_name=ViewName(view.name), explore=model
+                    )
+
+        except ValidationError as err:
+            error = f"Validation error yielding Data Model [{model.name}]: {err}"
+            logger.debug(traceback.format_exc())
+            logger.error(error)
+            self.status.failed(
+                name=model.name, error=error, stack_trace=traceback.format_exc()
+            )
+        except Exception as err:
+            error = f"Wild error yielding Data Model [{model.name}]: {err}"
+            logger.debug(traceback.format_exc())
+            logger.error(error)
+            self.status.failed(
+                name=model.name, error=error, stack_trace=traceback.format_exc()
+            )
+
+    def _get_explore_sql(self, explore: LookmlModelExplore) -> Optional[str]:
+        """
+        If GitHub creds are provided, we can pick up the explore
+        file definition and return it here
+        """
+        # Only try to parse if creds are provided
+        if self.service_connection.githubCredentials:
+            try:
+                # This will only parse if the file has not been parsed yet
+                self.parser.parse_file(Includes(explore.source_file))
+                return self.parser.parsed_files.get(Includes(explore.source_file))
+            except Exception as err:
+                logger.warning(f"Exception getting the model sql: {err}")
+
+        return None
+
+    def _process_view(self, view_name: ViewName, explore: LookmlModelExplore):
+        """
+        For each view referenced in the JOINs of the explore,
+        we first load the explore file from GitHub, then:
+        1. Fetch the view from the GitHub files (search in includes)
+        2. Yield the view as a dashboard Model
+        3. Yield the lineage between the View -> Explore and Source -> View
+        Every visited view will be cached so that we don't need to process
+        everything again.
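+
+        For instance (names illustrative): a `dogs` view joined into the
+        `cats` explore of model `pets` is yielded as data model `pets_dogs`,
+        with lineage `pets_dogs -> pets_cats`.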
+        """
+        view: Optional[LookMlView] = self.parser.find_view(
+            view_name=view_name, path=Includes(explore.source_file)
+        )
+
+        if view:
+            yield CreateDashboardDataModelRequest(
+                name=build_datamodel_name(explore.model_name, view.name),
+                displayName=view.name,
+                description=view.description,
+                service=self.context.dashboard_service.fullyQualifiedName.__root__,
+                dataModelType=DataModelType.LookMlView.value,
+                serviceType=DashboardServiceType.Looker.value,
+                columns=get_columns_from_model(view),
+                sql=self.parser.parsed_files.get(Includes(view.source_file)),
+            )
+            self.status.scanned(f"Data Model Scanned: {view.name}")
+
+            yield from self.add_view_lineage(view, explore)
+
+    def add_view_lineage(
+        self, view: LookMlView, explore: LookmlModelExplore
+    ) -> Iterable[AddLineageRequest]:
+        """
+        Add the lineage source -> view -> explore
+        """
+        try:
+            # TODO: column-level lineage parsing the explore columns with the format `view_name.col`
+            # Now the context has the newly created view
+            yield AddLineageRequest(
+                edge=EntitiesEdge(
+                    fromEntity=EntityReference(
+                        id=self.context.dataModel.id.__root__, type="dashboardDataModel"
+                    ),
+                    toEntity=EntityReference(
+                        id=self._explores_cache[explore.name].id.__root__,
+                        type="dashboardDataModel",
+                    ),
+                )
+            )
+
+            if view.sql_table_name:
+                source_table_name = self._clean_table_name(view.sql_table_name)
+
+                # View to the source is only there if we are informing the dbServiceNames
+                for db_service_name in self.source_config.dbServiceNames or []:
+                    yield self.build_lineage_request(
+                        source=source_table_name,
+                        db_service_name=db_service_name,
+                        to_entity=self.context.dataModel,
+                    )
+
+        except Exception as err:
+            logger.debug(traceback.format_exc())
+            logger.error(
+                f"Error yielding lineage details for view [{view.name}]: {err}"
+            )
+
     def get_dashboards_list(self) -> List[DashboardBase]:
         """
         Get List of all dashboards
@@ -160,27 +354,17 @@ class LookerSource(DashboardServiceSource):
         Optional[EntityReference]
         """
         try:
-            if (
-                dashboard_details.user_id is not None
-                and dashboard_details.user_id not in self._owners_ref
-            ):
+            if dashboard_details.user_id is not None:
                 dashboard_owner = self.client.user(dashboard_details.user_id)
                 user = self.metadata.get_user_by_email(dashboard_owner.email)
-                if user:  # Save the EntityRef
-                    self._owners_ref[dashboard_details.user_id] = EntityReference(
-                        id=user.id, type="user"
-                    )
-                else:  # Otherwise, flag the user as missing in OM
-                    self._owners_ref[dashboard_details.user_id] = None
-                    logger.debug(
-                        f"User {dashboard_owner.email} not found in OpenMetadata."
-                    )
+                if user:
+                    return EntityReference(id=user.id.__root__, type="user")
 
         except Exception as err:
             logger.debug(traceback.format_exc())
             logger.warning(f"Could not fetch owner data due to {err}")
 
-        return self._owners_ref.get(dashboard_details.user_id)
+        return None
 
     def yield_dashboard(
         self, dashboard_details: LookerDashboard
@@ -190,7 +374,7 @@ class LookerSource(DashboardServiceSource):
         """
 
         dashboard_request = CreateDashboardRequest(
-            name=dashboard_details.id.replace("::", "_"),
+            name=clean_dashboard_name(dashboard_details.id),
             displayName=dashboard_details.title,
             description=dashboard_details.description or None,
             charts=[
@@ -219,33 +403,10 @@ class LookerSource(DashboardServiceSource):
 
         return table_name.lower().split("as")[0].strip()
 
-    def _add_sql_table(self, query: Query, dashboard_sources: Set[str]):
+    @staticmethod
+    def get_dashboard_sources(dashboard_details: LookerDashboard) -> Set[str]:
         """
-        Add the SQL table information to the dashboard_sources.
-
-        Updates the seen dashboards.
-
-        :param query: Looker query, from a look or result_maker
-        :param dashboard_sources: seen tables so far
-        """
-        try:
-            explore: LookmlModelExplore = self.client.lookml_model_explore(
-                query.model, query.view
-            )
-            table_name = explore.sql_table_name
-
-            if table_name:
-                dashboard_sources.add(self._clean_table_name(table_name))
-
-        except SDKError as err:
-            logger.debug(traceback.format_exc())
-            logger.warning(
-                f"Cannot get explore from model={query.model}, view={query.view}: {err}"
-            )
-
-    def get_dashboard_sources(self, dashboard_details: LookerDashboard) -> Set[str]:
-        """
-        Set of source tables to build lineage for the processed dashboard
+        Set of explores to build lineage for the processed dashboard
         """
         dashboard_sources: Set[str] = set()
 
@@ -253,20 +414,42 @@ class LookerSource(DashboardServiceSource):
             Iterable[DashboardElement], dashboard_details.dashboard_elements
         ):
             if chart.query and chart.query.view:
-                self._add_sql_table(chart.query, dashboard_sources)
+                dashboard_sources.add(
+                    build_datamodel_name(chart.query.model, chart.query.view)
+                )
             if chart.look and chart.look.query and chart.look.query.view:
-                self._add_sql_table(chart.look.query, dashboard_sources)
+                dashboard_sources.add(
+                    build_datamodel_name(chart.look.query.model, chart.look.query.view)
+                )
             if (
                 chart.result_maker
                 and chart.result_maker.query
                 and chart.result_maker.query.view
            ):
-                self._add_sql_table(chart.result_maker.query, dashboard_sources)
+                dashboard_sources.add(
+                    build_datamodel_name(
+                        chart.result_maker.query.model, chart.result_maker.query.view
+                    )
+                )
 
         return dashboard_sources
 
+    def get_explore(self, explore_name: str) -> Optional[DashboardDataModel]:
+        """
+        Get the dashboard model from cache or API
+        """
+        return self._explores_cache.get(explore_name) or self.metadata.get_by_name(
+            entity=DashboardDataModel,
+            fqn=fqn.build(
+                self.metadata,
+                entity_type=DashboardDataModel,
+                service_name=self.context.dashboard_service.fullyQualifiedName.__root__,
+                data_model_name=explore_name,
+            ),
+        )
+
     def yield_dashboard_lineage_details(
-        self, dashboard_details: LookerDashboard, db_service_name: str
+        self, dashboard_details: LookerDashboard, _: str
     ) -> Optional[Iterable[AddLineageRequest]]:
         """
         Get lineage between charts and data sources.
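Note: a minimal sketch of how the two helpers above combine (names illustrative; `source` is a LookerSource mid-ingestion): a chart querying model `sales` and explore `orders` is keyed as `sales_orders`, which `get_explore` then resolves from the explore cache or, failing that, the OpenMetadata API.

    keys = source.get_dashboard_sources(dashboard_details)  # e.g. {"sales_orders"}
    explores = [source.get_explore(key) for key in keys]  # cache first, then OM API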
@@ -276,35 +459,38 @@ class LookerSource(DashboardServiceSource): - chart.look (chart.look.query) - chart.result_maker """ - datasource_list = self.get_dashboard_sources(dashboard_details) - to_fqn = fqn.build( - self.metadata, - entity_type=MetadataDashboard, - service_name=self.config.serviceName, - dashboard_name=dashboard_details.id.replace("::", "_"), - ) - to_entity = self.metadata.get_by_name( - entity=MetadataDashboard, - fqn=to_fqn, - ) + try: + source_explore_list = self.get_dashboard_sources(dashboard_details) + for explore_name in source_explore_list: + cached_explore = self.get_explore(explore_name) + if cached_explore: + yield AddLineageRequest( + edge=EntitiesEdge( + fromEntity=EntityReference( + id=cached_explore.id.__root__, + type="dashboardDataModel", + ), + toEntity=EntityReference( + id=self.context.dashboard.id.__root__, + type="dashboard", + ), + ) + ) - for source in datasource_list: - try: - yield self.build_lineage_request( - source=source, - db_service_name=db_service_name, - to_entity=to_entity, - ) - - except (Exception, IndexError) as err: - logger.debug(traceback.format_exc()) - logger.warning( - f"Error building lineage for database service [{db_service_name}]: {err}" - ) + except Exception as exc: + error = f"Unexpected exception yielding lineage from [{self.context.dashboard.displayName}]: {exc}" + logger.debug(traceback.format_exc()) + logger.warning(error) + self.status.failed( + self.context.dashboard.displayName, error, traceback.format_exc() + ) def build_lineage_request( - self, source: str, db_service_name: str, to_entity: MetadataDashboard + self, + source: str, + db_service_name: str, + to_entity: Union[MetadataDashboard, DashboardDataModel], ) -> Optional[AddLineageRequest]: """ Once we have a list of origin data sources, check their components diff --git a/ingestion/src/metadata/ingestion/source/dashboard/looker/models.py b/ingestion/src/metadata/ingestion/source/dashboard/looker/models.py new file mode 100644 index 00000000000..052c08c261e --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/dashboard/looker/models.py @@ -0,0 +1,50 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+""" +Looker pydantic models +""" + +from typing import List, NewType, Optional + +from pydantic import BaseModel, Field + +Includes = NewType("Includes", str) +ViewName = NewType("ViewName", str) + + +class LookMlField(BaseModel): + description: Optional[str] = Field(None, description="Field description") + label: Optional[str] = Field(None, description="Field display name") + type: Optional[str] = Field(None, description="Field type to be mapped to OM") + name: str = Field(..., description="Field name") + + +class LookMlView(BaseModel): + name: ViewName = Field(..., description="View name") + description: Optional[str] = Field(None, description="View description") + sql_table_name: Optional[str] = Field( + None, description="To track lineage with the source" + ) + measures: List[LookMlField] = Field([], description="Measures to ingest as cols") + dimensions: List[LookMlField] = Field( + [], description="Dimensions to ingest as cols" + ) + source_file: Optional[Includes] = Field(None, description="lkml file path") + + +class LkmlFile(BaseModel): + """ + it might also have explores, but we don't care. + We'll pick explores from the API + """ + + includes: List[Includes] = Field([], description="Full include list") + views: List[LookMlView] = Field([], description="Views we want to parse") diff --git a/ingestion/src/metadata/ingestion/source/dashboard/looker/parser.py b/ingestion/src/metadata/ingestion/source/dashboard/looker/parser.py new file mode 100644 index 00000000000..0da310558b6 --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/dashboard/looker/parser.py @@ -0,0 +1,136 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +.lkml files parser +""" +import traceback +from typing import Dict, List, Optional + +import lkml +from pydantic import ValidationError + +from metadata.ingestion.source.dashboard.looker.models import ( + Includes, + LkmlFile, + LookMlView, + ViewName, +) +from metadata.readers.base import Reader, ReadException +from metadata.utils.logger import ingestion_logger + +logger = ingestion_logger() + + +class LkmlParser: + """ + Parses and caches the visited files & views. + + Here we'll just parse VIEWs, as we are already getting + the explores from the API. + + The goal is to make sure we parse files only once + and store the results of the processed views. + + We want to parse the minimum number of files each time + until we find the view we are looking for. + + Approach: + When we parse, it is because we are looking for a view definition, then + 1. When parsing any source file the outcome is + a. Look for any defined view, and cache it + b. The parsing result is the list of includes + 2. Is my view present in the cache? + yes. Then return it + no. Then keep parsing `includes` until the response is yes. 
+    """
+
+    def __init__(self, reader: Reader):
+        self._views_cache: Dict[ViewName, LookMlView] = {}
+        self._visited_files: Dict[Includes, List[Includes]] = {}
+
+        # To store the raw string of the lkml explores
+        self.parsed_files: Dict[Includes, str] = {}
+
+        self.reader = reader
+
+    def parse_file(self, path: Includes) -> Optional[List[Includes]]:
+        """
+        Internal parser. Parse the file and cache the views
+
+        If an lkml include starts with //, it is pointing to
+        an external repository, so we won't send it to the reader
+        """
+
+        # If visited, return its includes to continue parsing
+        if path in self._visited_files:
+            return self._visited_files[path]
+
+        # If the path starts with //, we will ignore it for now
+        if path.startswith("//"):
+            logger.info(f"We do not support external includes yet. Skipping {path}")
+            return []
+
+        try:
+            file = self.reader.read(path)
+            lkml_file = LkmlFile.parse_obj(lkml.load(file))
+            self.parsed_files[path] = file
+
+            # Cache everything
+            self._visited_files[path] = lkml_file.includes
+            for view in lkml_file.views:
+                view.source_file = path
+                self._views_cache[view.name] = view
+
+            return lkml_file.includes
+
+        except ReadException as err:
+            logger.debug(traceback.format_exc())
+            logger.error(f"Error trying to read the file [{path}]: {err}")
+        except ValidationError as err:
+            logger.error(
+                f"Validation error building the .lkml file from [{path}]: {err}"
+            )
+        except Exception as err:
+            logger.debug(traceback.format_exc())
+            logger.error(f"Unknown error building the .lkml file from [{path}]: {err}")
+
+        return None
+
+    def get_view_from_cache(self, view_name: ViewName) -> Optional[LookMlView]:
+        """
+        Check if view is cached, and return it.
+        Otherwise, return None
+        """
+        if view_name in self._views_cache:
+            return self._views_cache[view_name]
+
+        return None
+
+    def find_view(self, view_name: ViewName, path: Includes) -> Optional[LookMlView]:
+        """
+        Parse an incoming file (either from a `source_file` or an `include`),
+        cache the views it defines, and keep recursing into its includes
+        until the requested view shows up in the cache (it might never)
+        """
+        cached_view = self.get_view_from_cache(view_name)
+        if cached_view:
+            return cached_view
+
+        for include in self.parse_file(path) or []:
+            cached_view = self.get_view_from_cache(view_name)
+            if cached_view:
+                return cached_view
+
+            # Recursively parse inner includes
+            self.find_view(view_name, include)
+
+        # We might not find the view ever
+        return self.get_view_from_cache(view_name)
diff --git a/ingestion/src/metadata/readers/__init__.py b/ingestion/src/metadata/readers/__init__.py
new file mode 100644
index 00000000000..e69de29bb2d
diff --git a/ingestion/src/metadata/readers/base.py b/ingestion/src/metadata/readers/base.py
new file mode 100644
index 00000000000..70a12e11605
--- /dev/null
+++ b/ingestion/src/metadata/readers/base.py
@@ -0,0 +1,29 @@
+# Copyright 2021 Collate
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+""" +Base local reader +""" +from abc import ABC, abstractmethod + + +class ReadException(Exception): + """ + To be raised by any errors with the read calls + """ + + +class Reader(ABC): + @abstractmethod + def read(self, path: str) -> str: + """ + Given a string, return a string + """ + raise NotImplementedError("Missing read implementation") diff --git a/ingestion/src/metadata/readers/github.py b/ingestion/src/metadata/readers/github.py new file mode 100644 index 00000000000..f3fc1372f87 --- /dev/null +++ b/ingestion/src/metadata/readers/github.py @@ -0,0 +1,101 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +GitHub client to read files with token auth +""" +import base64 +import traceback +from enum import Enum +from typing import Any, Dict + +import requests + +from metadata.generated.schema.security.credentials.githubCredentials import ( + GitHubCredentials, +) +from metadata.readers.base import Reader, ReadException +from metadata.utils.constants import UTF_8 +from metadata.utils.logger import ingestion_logger + +logger = ingestion_logger() + + +HOST = "https://api.github.com" + + +class UrlParts(Enum): + REPOS = "repos" + CONTENTS = "contents" + + +class GitHubReader(Reader): + """ + Handle calls to the GitHub API against a repo + """ + + def __init__(self, credentials: GitHubCredentials): + self.credentials = credentials + + self._auth_headers = None + + @property + def auth_headers(self) -> Dict[str, str]: + """ + Build the headers to authenticate + to the API + """ + if self._auth_headers is None: + self._auth_headers = {"Authentication": f"token {self.credentials.token}"} + + return self._auth_headers + + @staticmethod + def _build_url(*parts: str): + """ + Build URL parts + """ + return "/".join(parts) + + @staticmethod + def _decode_content(json_response: Dict[str, Any]) -> str: + """ + Return the content of the response + + If no `content` there, throw the KeyError + """ + return base64.b64decode(json_response["content"]).decode(UTF_8) + + def read(self, path: str) -> str: + """ + Read a file from a GitHub Repo and return its + contents as a string + https://docs.github.com/en/rest/repos/contents?apiVersion=2022-11-28#get-repository-content + """ + try: + res = requests.get( + self._build_url( + HOST, + UrlParts.REPOS.value, + self.credentials.repositoryOwner, + self.credentials.repositoryName, + UrlParts.CONTENTS.value, + path, + ), + timeout=30, + ) + if res.status_code == 200: + return self._decode_content(res.json()) + + except Exception as err: + logger.debug(traceback.format_exc()) + raise ReadException(f"Error fetching file [{path}] from repo: {err}") + + raise ReadException(f"Could not fetch file [{path}] from repo") diff --git a/ingestion/src/metadata/readers/local.py b/ingestion/src/metadata/readers/local.py new file mode 100644 index 00000000000..4ffe487606d --- /dev/null +++ b/ingestion/src/metadata/readers/local.py @@ -0,0 +1,42 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not 
use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +Local Reader +""" +import traceback +from pathlib import Path + +from metadata.readers.base import Reader, ReadException +from metadata.utils.constants import UTF_8 +from metadata.utils.logger import ingestion_logger + +logger = ingestion_logger() + + +class LocalReader(Reader): + """ + Read files locally + """ + + def __init__(self, base_path: Path): + self.base_path = base_path + + def read(self, path: str) -> str: + """ + simple local reader + """ + try: + with open(self.base_path / path, encoding=UTF_8) as file: + return file.read() + + except Exception as err: + logger.debug(traceback.format_exc()) + raise ReadException(f"Error reading file [{path}] locally: {err}") diff --git a/ingestion/tests/unit/resources/lkml/cats.explore.lkml b/ingestion/tests/unit/resources/lkml/cats.explore.lkml new file mode 100644 index 00000000000..5d22542b57b --- /dev/null +++ b/ingestion/tests/unit/resources/lkml/cats.explore.lkml @@ -0,0 +1,29 @@ +include: "views/cats.view.lkml" +include: "views/dogs.view.lkml" + + +explore: cats { + label: "Cats" + join: dogs { + relationship: many_to_one + sql_on: ${cats.name} = ${dogs.name} ;; + } +} + +view: birds { + sql_table_name: birds ;; + + dimension: name { + type: string + sql: ${TABLE}.name ;; + } +} + + +explore: birds { + label: "Birds" + join: dogs { + relationship: many_to_one + sql_on: ${cats.name} = ${birds.name} ;; + } +} diff --git a/ingestion/tests/unit/resources/lkml/kittens.explore.lkml b/ingestion/tests/unit/resources/lkml/kittens.explore.lkml new file mode 100644 index 00000000000..3f952ec16ab --- /dev/null +++ b/ingestion/tests/unit/resources/lkml/kittens.explore.lkml @@ -0,0 +1,11 @@ +include: "views/kittens.view.lkml" +include: "views/dogs.view.lkml" + +explore: kittens { + label: "Kittens" + join: dogs { + relationship: many_to_one + sql_on: ${kittens.name} = ${kittens.name} ;; + } +} + diff --git a/ingestion/tests/unit/resources/lkml/views/cats.view.lkml b/ingestion/tests/unit/resources/lkml/views/cats.view.lkml new file mode 100644 index 00000000000..b8541453ed8 --- /dev/null +++ b/ingestion/tests/unit/resources/lkml/views/cats.view.lkml @@ -0,0 +1,13 @@ +view: cats { + sql_table_name: cats ;; + + dimension: name { + type: string + sql: ${TABLE}.name ;; + } + + dimension: age { + type: int + sql: ${TABLE}.age ;; + } +} diff --git a/ingestion/tests/unit/resources/lkml/views/dogs.view.lkml b/ingestion/tests/unit/resources/lkml/views/dogs.view.lkml new file mode 100644 index 00000000000..77ddd76f8bf --- /dev/null +++ b/ingestion/tests/unit/resources/lkml/views/dogs.view.lkml @@ -0,0 +1,18 @@ +view: dogs { + sql_table_name: dogs ;; + + dimension: name { + type: string + sql: ${TABLE}.name ;; + } + + dimension: age { + type: number + sql: ${TABLE}.age ;; + } + + dimension: ball { + type: yesno + sql: ${TABLE}.ball ;; + } +} diff --git a/ingestion/tests/unit/resources/lkml/views/kittens.view.lkml b/ingestion/tests/unit/resources/lkml/views/kittens.view.lkml new file mode 100644 index 00000000000..887a4b26f2f --- /dev/null +++ 
b/ingestion/tests/unit/resources/lkml/views/kittens.view.lkml @@ -0,0 +1,10 @@ +include: "views/cats.view.lkml" + +view: kittens { + extends: [cats] + + dimension: parent { + type: string + sql: ${TABLE}.name ;; + } +} diff --git a/ingestion/tests/unit/topology/dashboard/test_looker.py b/ingestion/tests/unit/topology/dashboard/test_looker.py index ff4062602d9..68372ef5990 100644 --- a/ingestion/tests/unit/topology/dashboard/test_looker.py +++ b/ingestion/tests/unit/topology/dashboard/test_looker.py @@ -247,16 +247,7 @@ class LookerUnitTest(TestCase): """ Check how we pick or not the owner """ - - # First, validate that we return the user if we have it - # in the ref ref = EntityReference(id=uuid.uuid4(), type="user") - self.looker._owners_ref["user_id"] = ref - - self.assertEqual(self.looker.get_owner_details(MOCK_LOOKER_DASHBOARD), ref) - - # Now check that we are storing the ref properly - self.looker._owners_ref = {} with patch.object(Looker40SDK, "user", return_value=MOCK_USER), patch.object( # This does not really return a ref, but for simplicity @@ -265,8 +256,6 @@ class LookerUnitTest(TestCase): return_value=ref, ): self.assertEqual(self.looker.get_owner_details(MOCK_LOOKER_DASHBOARD), ref) - # The call also updates the _owners_ref dict - self.assertEqual(self.looker._owners_ref.get("user_id"), ref) def raise_something_bad(): raise RuntimeError("Something bad") @@ -309,51 +298,6 @@ class LookerUnitTest(TestCase): self.assertEqual(self.looker._clean_table_name("TABLE AS ALIAS"), "table") - def test_add_sql_table(self): - """ - Check how we get the table name from the explore - """ - - # Check how we get the table name back - with patch.object( - Looker40SDK, - "lookml_model_explore", - return_value=LookmlModelExplore(sql_table_name="MY_TABLE"), - ): - dashboard_sources = set() - - self.looker._add_sql_table( - query=Query(model="model", view="view"), - dashboard_sources=dashboard_sources, - ) - self.assertEqual(dashboard_sources, {"my_table"}) - - # If there's no name, nothing happens - with patch.object( - Looker40SDK, - "lookml_model_explore", - return_value=LookmlModelExplore(sql_table_name=None), - ): - dashboard_sources = set() - - self.looker._add_sql_table( - query=Query(model="model", view="view"), - dashboard_sources=dashboard_sources, - ) - self.assertEqual(len(dashboard_sources), 0) - - def something_bad(*_): - raise SDKError("something bad") - - # We don't raise on errors, just log - with patch.object( - Looker40SDK, "lookml_model_explore", side_effect=something_bad - ): - self.looker._add_sql_table( - query=Query(model="model", view="view"), - dashboard_sources=dashboard_sources, - ) - def test_get_dashboard_sources(self): """ Check how we are building the sources @@ -361,10 +305,13 @@ class LookerUnitTest(TestCase): with patch.object( Looker40SDK, "lookml_model_explore", - return_value=LookmlModelExplore(sql_table_name="MY_TABLE"), + return_value=LookmlModelExplore( + sql_table_name="MY_TABLE", model_name="model2", view_name="view" + ), ): dashboard_sources = self.looker.get_dashboard_sources(MOCK_LOOKER_DASHBOARD) - self.assertEqual(dashboard_sources, {"my_table"}) + # Picks it up from the chart, not here + self.assertEqual(dashboard_sources, {"model_view"}) def test_build_lineage_request(self): """ diff --git a/ingestion/tests/unit/topology/dashboard/test_looker_lkml_parser.py b/ingestion/tests/unit/topology/dashboard/test_looker_lkml_parser.py new file mode 100644 index 00000000000..113208f1dec --- /dev/null +++ 
b/ingestion/tests/unit/topology/dashboard/test_looker_lkml_parser.py @@ -0,0 +1,138 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +Test the lkml parser +""" +from pathlib import Path +from unittest import TestCase + +from metadata.ingestion.source.dashboard.looker.parser import ( + Includes, + LkmlParser, + ViewName, +) +from metadata.readers.local import LocalReader + +BASE_PATH = Path(__file__).parent.parent.parent / "resources/lkml" + + +class TestLkmlParser(TestCase): + """ + Check the parser with the local reader + """ + + def test_local_parser_with_birds(self): + """ + Birds view is declared in the explore file. + Nothing else to parse, so no visited files + """ + + reader = LocalReader(BASE_PATH) + parser = LkmlParser(reader) + + view = parser.find_view( + view_name=ViewName("birds"), path=Includes("cats.explore.lkml") + ) + + self.assertIsNotNone(view) + self.assertEqual(view.name, "birds") + + self.assertEqual(parser.get_view_from_cache(ViewName("birds")).name, "birds") + self.assertEqual( + parser._visited_files, + { + "cats.explore.lkml": ["views/cats.view.lkml", "views/dogs.view.lkml"], + }, + ) + + def test_local_parser_with_cats(self): + """ + Cats view is in a separate view file which we need to visit. + We go there before jumping into the dogs, and stop. + """ + + reader = LocalReader(BASE_PATH) + parser = LkmlParser(reader) + + view = parser.find_view( + view_name=ViewName("cats"), path=Includes("cats.explore.lkml") + ) + + self.assertIsNotNone(view) + self.assertEqual(view.name, "cats") + + self.assertEqual(parser.get_view_from_cache(ViewName("cats")).name, "cats") + self.assertEqual( + parser._visited_files, + { + "cats.explore.lkml": ["views/cats.view.lkml", "views/dogs.view.lkml"], + "views/cats.view.lkml": [], + }, + ) + + def test_local_parser_with_dogs(self): + """ + Dogs view is in a separate view file which we need to visit. + We go after visiting the cats, so we need to parse both + """ + + reader = LocalReader(BASE_PATH) + parser = LkmlParser(reader) + + view = parser.find_view( + view_name=ViewName("dogs"), path=Includes("cats.explore.lkml") + ) + + self.assertIsNotNone(view) + self.assertEqual(view.name, "dogs") + + self.assertEqual(parser.get_view_from_cache(ViewName("dogs")).name, "dogs") + self.assertEqual( + parser._visited_files, + { + "cats.explore.lkml": ["views/cats.view.lkml", "views/dogs.view.lkml"], + "views/cats.view.lkml": [], + "views/dogs.view.lkml": [], + }, + ) + + # Now I can directly get the cats view from the cache as well + # as I already visited that file + self.assertEqual(parser.get_view_from_cache(ViewName("cats")).name, "cats") + + def test_local_parser_with_kittens(self): + """ + We will now parse the kittens explore looking for the cats view. 
+ This requires two jumps: Kittens explore -> Kittens View -> Cats View + """ + + reader = LocalReader(BASE_PATH) + parser = LkmlParser(reader) + + view = parser.find_view( + view_name=ViewName("cats"), path=Includes("kittens.explore.lkml") + ) + + self.assertIsNotNone(view) + self.assertEqual(view.name, "cats") + + self.assertEqual(parser.get_view_from_cache(ViewName("cats")).name, "cats") + self.assertEqual( + parser._visited_files, + { + "kittens.explore.lkml": [ + "views/kittens.view.lkml", + "views/dogs.view.lkml", + ], + "views/kittens.view.lkml": ["views/cats.view.lkml"], + "views/cats.view.lkml": [], + }, + ) diff --git a/openmetadata-spec/src/main/resources/json/schema/entity/data/dashboardDataModel.json b/openmetadata-spec/src/main/resources/json/schema/entity/data/dashboardDataModel.json index 2e9f72320cf..4328aff9623 100644 --- a/openmetadata-spec/src/main/resources/json/schema/entity/data/dashboardDataModel.json +++ b/openmetadata-spec/src/main/resources/json/schema/entity/data/dashboardDataModel.json @@ -16,7 +16,9 @@ "enum": [ "TableauSheet", "SupersetDataModel", - "MetabaseDataModel" + "MetabaseDataModel", + "LookMlView", + "LookMlExplore" ], "javaEnums": [ { @@ -27,6 +29,12 @@ }, { "name": "MetabaseDataModel" + }, + { + "name": "LookMlView" + }, + { + "name": "LookMlExplore" } ] } diff --git a/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/dashboard/lookerConnection.json b/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/dashboard/lookerConnection.json index 59fefa45df7..411148ee8bf 100644 --- a/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/dashboard/lookerConnection.json +++ b/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/dashboard/lookerConnection.json @@ -38,6 +38,11 @@ "type": "string", "format": "uri" }, + "githubCredentials": { + "title": "GitHub Credentials", + "description": "Credentials to extract the .lkml files from a repository. This is required to get all the lineage and definitions.", + "$ref": "../../../../security/credentials/githubCredentials.json" + }, "supportsMetadataExtraction": { "title": "Supports Metadata Extraction", "$ref": "../connectionBasicType.json#/definitions/supportsMetadataExtraction" diff --git a/openmetadata-spec/src/main/resources/json/schema/security/credentials/githubCredentials.json b/openmetadata-spec/src/main/resources/json/schema/security/credentials/githubCredentials.json new file mode 100644 index 00000000000..7c08cf1cf7b --- /dev/null +++ b/openmetadata-spec/src/main/resources/json/schema/security/credentials/githubCredentials.json @@ -0,0 +1,28 @@ +{ + "$id": "https://open-metadata.org/security/credentials/githubCredentials.json", + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "GitHubCredentials", + "description": "Credentials for a GitHub repository", + "type": "object", + "javaType": "org.openmetadata.schema.security.credentials.GitHubCredentials", + "properties": { + "repositoryOwner": { + "title": "Repository Owner", + "description": "The owner (user or organization) of a GitHub repository. For example, in https://github.com/open-metadata/OpenMetadata, the owner is `open-metadata`.", + "type": "string" + }, + "repositoryName": { + "title": "Repository Name", + "description": "The name of a GitHub repository. 
For example, in https://github.com/open-metadata/OpenMetadata, the name is `OpenMetadata`.", + "type": "string" + }, + "token": { + "title": "API Token", + "description": "Token to use the API. This is required for private repositories and to ensure we don't hit API limits.", + "type": "string", + "format": "password" + } + }, + "additionalProperties": false, + "required": ["repositoryOwner", "repositoryName"] +}
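For reference, a minimal usage sketch of these credentials together with the GitHubReader introduced above (repository values are illustrative; the token can be omitted for public repos):

    from metadata.generated.schema.security.credentials.githubCredentials import (
        GitHubCredentials,
    )
    from metadata.readers.github import GitHubReader

    creds = GitHubCredentials(
        repositoryOwner="open-metadata",
        repositoryName="OpenMetadata",
    )
    reader = GitHubReader(creds)
    content = reader.read("README.md")  # file contents, or raises ReadException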