diff --git a/metadata-ingestion/README.md b/metadata-ingestion/README.md
index 1365c1ba1d..e4e150b7bf 100644
--- a/metadata-ingestion/README.md
+++ b/metadata-ingestion/README.md
@@ -31,29 +31,31 @@ If you run into an error, try checking the [_common setup issues_](./developing.
 We use a plugin architecture so that you can install only the dependencies you actually need.
 
-| Plugin Name   | Install Command                                            | Provides                   |
-| ------------- | ---------------------------------------------------------- | -------------------------- |
-| file          | _included by default_                                      | File source and sink       |
-| console       | _included by default_                                      | Console sink               |
-| athena        | `pip install 'acryl-datahub[athena]'`                      | AWS Athena source          |
-| bigquery      | `pip install 'acryl-datahub[bigquery]'`                    | BigQuery source            |
-| glue          | `pip install 'acryl-datahub[glue]'`                        | AWS Glue source            |
-| hive          | `pip install 'acryl-datahub[hive]'`                        | Hive source                |
-| mssql         | `pip install 'acryl-datahub[mssql]'`                       | SQL Server source          |
-| mysql         | `pip install 'acryl-datahub[mysql]'`                       | MySQL source               |
-| oracle        | `pip install 'acryl-datahub[oracle]'`                      | Oracle source              |
-| postgres      | `pip install 'acryl-datahub[postgres]'`                    | Postgres source            |
+| Plugin Name   | Install Command                                            | Provides                            |
+| ------------- | ---------------------------------------------------------- | ----------------------------------- |
+| file          | _included by default_                                      | File source and sink                |
+| console       | _included by default_                                      | Console sink                        |
+| athena        | `pip install 'acryl-datahub[athena]'`                      | AWS Athena source                   |
+| bigquery      | `pip install 'acryl-datahub[bigquery]'`                    | BigQuery source                     |
+| glue          | `pip install 'acryl-datahub[glue]'`                        | AWS Glue source                     |
+| hive          | `pip install 'acryl-datahub[hive]'`                        | Hive source                         |
+| mssql         | `pip install 'acryl-datahub[mssql]'`                       | SQL Server source                   |
+| mysql         | `pip install 'acryl-datahub[mysql]'`                       | MySQL source                        |
+| oracle        | `pip install 'acryl-datahub[oracle]'`                      | Oracle source                       |
+| postgres      | `pip install 'acryl-datahub[postgres]'`                    | Postgres source                     |
 | redshift      | `pip install 'acryl-datahub[redshift]'`                    | Redshift source                     |
-| sqlalchemy    | `pip install 'acryl-datahub[sqlalchemy]'`                  | Generic SQLAlchemy source  |
-| snowflake     | `pip install 'acryl-datahub[snowflake]'`                   | Snowflake source           |
-| superset      | `pip install 'acryl-datahub[superset]'`                    | Supserset source           |
-| mongodb       | `pip install 'acryl-datahub[mongodb]'`                     | MongoDB source             |
-| ldap          | `pip install 'acryl-datahub[ldap]'` ([extra requirements]) | LDAP source                |
-| kafka         | `pip install 'acryl-datahub[kafka]'`                       | Kafka source               |
-| druid         | `pip install 'acryl-datahub[druid]'`                       | Druid Source               |
-| dbt           | _no additional dependencies_                               | DBT source                 |
-| datahub-rest  | `pip install 'acryl-datahub[datahub-rest]'`                | DataHub sink over REST API |
-| datahub-kafka | `pip install 'acryl-datahub[datahub-kafka]'`               | DataHub sink over Kafka    |
+| sqlalchemy    | `pip install 'acryl-datahub[sqlalchemy]'`                  | Generic SQLAlchemy source           |
+| snowflake     | `pip install 'acryl-datahub[snowflake]'`                   | Snowflake source                    |
+| superset      | `pip install 'acryl-datahub[superset]'`                    | Superset source                     |
+| mongodb       | `pip install 'acryl-datahub[mongodb]'`                     | MongoDB source                      |
+| ldap          | `pip install 'acryl-datahub[ldap]'` ([extra requirements]) | LDAP source                         |
+| looker        | `pip install 'acryl-datahub[looker]'`                      | Looker source                       |
+| lookml        | `pip install 'acryl-datahub[lookml]'`                      | LookML source, requires Python 3.7+ |
+| kafka         | `pip install 'acryl-datahub[kafka]'`                       | Kafka source                        |
+| druid         | `pip install 'acryl-datahub[druid]'`                       | Druid source                        |
+| dbt           | _no additional dependencies_                               | DBT source                          |
+| datahub-rest  | `pip install 'acryl-datahub[datahub-rest]'`                | DataHub sink over REST API          |
+| datahub-kafka | `pip install 'acryl-datahub[datahub-kafka]'`               | DataHub sink over Kafka             |
 
 These plugins can be mixed and matched as desired. For example:
@@ -514,6 +516,57 @@ source:
     filter: "(objectClass=*)" # optional field
 ```
 
+### LookML `lookml`
+
+Note! This plugin uses a package that requires Python 3.7+!
+
+Extracts:
+
+- LookML views from model files
+- Name, upstream table names, dimensions, measures, and dimension groups
+
+```yml
+source:
+  type: "lookml"
+  config:
+    base_folder: /path/to/model/files # Where the *.model.lkml and *.view.lkml files are stored.
+    connection_to_platform_map: # Maps connection names in the model files to platform names.
+      my_snowflake_conn: snowflake
+    platform_name: looker_views # Optional, default is "looker_views"
+    actor: "urn:li:corpuser:etl" # Optional, default is "urn:li:corpuser:etl"
+    model_pattern: {}
+    view_pattern: {}
+    env: "PROD" # Optional, default is "PROD"
+    parse_table_names_from_sql: False # See note below.
+```
+
+Note! The integration can use [`sql-metadata`](https://pypi.org/project/sql-metadata/) to try to parse the tables that the
+views depend on. Since this SQL can be complicated and the package doesn't officially support all the SQL dialects that
+Looker supports, the results might not be correct. This parsing is disabled by default, but can be enabled by setting
+`parse_table_names_from_sql: True`. A minimal end-to-end recipe for this source is sketched below, after the `file`
+section heading.
+
+### Looker dashboards `looker`
+
+Extracts:
+
+- Looker dashboards and dashboard elements (charts)
+- Names, descriptions, URLs, chart types, input views for the charts
+
+```yml
+source:
+  type: "looker"
+  config:
+    client_id: str # Your Looker API client ID. Ask your Looker admin for one.
+    client_secret: str # Your Looker API client secret. Ask your Looker admin for one.
+    base_url: str # The URL to your Looker instance: https://company.looker.com:19999 or https://looker.company.com, or similar.
+    platform_name: str = "looker" # Optional, default is "looker"
+    view_platform_name: str = "looker_views" # Optional, default is "looker_views". Should match the `platform_name` in the `lookml` source, if that source is also run.
+    actor: str = "urn:li:corpuser:etl" # Optional, default is "urn:li:corpuser:etl"
+    dashboard_pattern: AllowDenyPattern = AllowDenyPattern.allow_all()
+    chart_pattern: AllowDenyPattern = AllowDenyPattern.allow_all()
+    env: str = "PROD" # Optional, default is "PROD"
+```
+
 ### File `file`
 
 Pulls metadata from a previously generated file.
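+As referenced above, here is a minimal end-to-end recipe that runs the new `lookml` source and writes the events to a
+file, mirroring what this change's integration test does. The folder and connection names are made-up placeholders:
+
+```yml
+source:
+  type: "lookml"
+  config:
+    base_folder: /path/to/model/files
+    connection_to_platform_map:
+      my_snowflake_conn: snowflake
+
+sink:
+  type: "file"
+  config:
+    filename: ./lookml_mces.json
+```
+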
Note that the file sink diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py index 7ab2d72d7d..27dbc82a3e 100644 --- a/metadata-ingestion/setup.py +++ b/metadata-ingestion/setup.py @@ -1,8 +1,12 @@ import os +import sys from typing import Dict, Set import setuptools +is_py37_or_newer = sys.version_info >= (3, 7) + + package_metadata: dict = {} with open("./src/datahub/__init__.py") as fp: exec(fp.read(), package_metadata) @@ -78,11 +82,15 @@ plugins: Dict[str, Set[str]] = { "snowflake": sql_common | {"snowflake-sqlalchemy"}, "oracle": sql_common | {"cx_Oracle"}, "ldap": {"python-ldap>=2.4"}, + "looker": {"looker-sdk==21.6.0"}, "druid": sql_common | {"pydruid>=0.6.2"}, "mongodb": {"pymongo>=3.11"}, "superset": {"requests"}, "glue": {"boto3"}, } +if is_py37_or_newer: + plugins["lookml"] = {"lkml>=1.1.0", "sql-metadata==1.12.0"} + base_dev_requirements = { *base_requirements, @@ -110,6 +118,7 @@ base_dev_requirements = { "mssql", "mongodb", "ldap", + "looker", "glue", "datahub-kafka", "datahub-rest", @@ -119,6 +128,11 @@ base_dev_requirements = { ), } +if is_py37_or_newer: + base_dev_requirements = base_dev_requirements.union( + {dependency for plugin in ["lookml"] for dependency in plugins[plugin]} + ) + dev_requirements = { *base_dev_requirements, "apache-airflow==1.10.15", @@ -131,6 +145,49 @@ dev_requirements_airflow_2 = { } +entry_points = { + "console_scripts": ["datahub = datahub.entrypoints:datahub"], + "datahub.ingestion.source.plugins": [ + "file = datahub.ingestion.source.mce_file:MetadataFileSource", + "sqlalchemy = datahub.ingestion.source.sql_generic:SQLAlchemyGenericSource", + "athena = datahub.ingestion.source.athena:AthenaSource", + "bigquery = datahub.ingestion.source.bigquery:BigQuerySource", + "dbt = datahub.ingestion.source.dbt:DBTSource", + "druid = datahub.ingestion.source.druid:DruidSource", + "glue = datahub.ingestion.source.glue:GlueSource", + "hive = datahub.ingestion.source.hive:HiveSource", + "kafka = datahub.ingestion.source.kafka:KafkaSource", + "ldap = datahub.ingestion.source.ldap:LDAPSource", + "looker = datahub.ingestion.source.looker:LookerDashboardSource", + "mongodb = datahub.ingestion.source.mongodb:MongoDBSource", + "mssql = datahub.ingestion.source.mssql:SQLServerSource", + "mysql = datahub.ingestion.source.mysql:MySQLSource", + "oracle = datahub.ingestion.source.oracle:OracleSource", + "postgres = datahub.ingestion.source.postgres:PostgresSource", + "redshift = datahub.ingestion.source.redshift:RedshiftSource", + "snowflake = datahub.ingestion.source.snowflake:SnowflakeSource", + "superset = datahub.ingestion.source.superset:SupersetSource", + ], + "datahub.ingestion.sink.plugins": [ + "file = datahub.ingestion.sink.file:FileSink", + "console = datahub.ingestion.sink.console:ConsoleSink", + "datahub-kafka = datahub.ingestion.sink.datahub_kafka:DatahubKafkaSink", + "datahub-rest = datahub.ingestion.sink.datahub_rest:DatahubRestSink", + ], + "apache_airflow_provider": [ + "provider_info=datahub.integrations.airflow.get_provider_info:get_provider_info" + ], + "airflow.plugins": [ + "datahub = datahub.integrations.airflow.get_provider_info:DatahubAirflowPlugin" + ], +} + +if is_py37_or_newer: + entry_points["datahub.ingestion.source.plugins"].append( + "lookml = datahub.ingestion.source.lookml:LookMLSource" + ) + + setuptools.setup( # Package metadata. 
name=package_metadata["__package_name__"], @@ -175,38 +232,7 @@ setuptools.setup( "datahub": ["py.typed"], "datahub.metadata": ["schema.avsc"], }, - entry_points={ - "console_scripts": ["datahub = datahub.entrypoints:datahub"], - "datahub.ingestion.source.plugins": [ - "file = datahub.ingestion.source.mce_file:MetadataFileSource", - "sqlalchemy = datahub.ingestion.source.sql_generic:SQLAlchemyGenericSource", - "athena = datahub.ingestion.source.athena:AthenaSource", - "bigquery = datahub.ingestion.source.bigquery:BigQuerySource", - "dbt = datahub.ingestion.source.dbt:DBTSource", - "druid = datahub.ingestion.source.druid:DruidSource", - "glue = datahub.ingestion.source.glue:GlueSource", - "hive = datahub.ingestion.source.hive:HiveSource", - "kafka = datahub.ingestion.source.kafka:KafkaSource", - "ldap = datahub.ingestion.source.ldap:LDAPSource", - "mongodb = datahub.ingestion.source.mongodb:MongoDBSource", - "mssql = datahub.ingestion.source.mssql:SQLServerSource", - "mysql = datahub.ingestion.source.mysql:MySQLSource", - "oracle = datahub.ingestion.source.oracle:OracleSource", - "postgres = datahub.ingestion.source.postgres:PostgresSource", - "redshift = datahub.ingestion.source.redshift:RedshiftSource", - "snowflake = datahub.ingestion.source.snowflake:SnowflakeSource", - "superset = datahub.ingestion.source.superset:SupersetSource", - ], - "datahub.ingestion.sink.plugins": [ - "file = datahub.ingestion.sink.file:FileSink", - "console = datahub.ingestion.sink.console:ConsoleSink", - "datahub-kafka = datahub.ingestion.sink.datahub_kafka:DatahubKafkaSink", - "datahub-rest = datahub.ingestion.sink.datahub_rest:DatahubRestSink", - ], - "apache_airflow_provider": [ - "provider_info = datahub_provider:get_provider_info" - ], - }, + entry_points=entry_points, # Dependencies. 
install_requires=list(base_requirements | framework_common), extras_require={ diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker.py b/metadata-ingestion/src/datahub/ingestion/source/looker.py new file mode 100644 index 0000000000..67d9fb296e --- /dev/null +++ b/metadata-ingestion/src/datahub/ingestion/source/looker.py @@ -0,0 +1,467 @@ +import datetime +import json +import logging +import os +import time +from dataclasses import dataclass +from dataclasses import field as dataclass_field +from typing import Any, Iterable, List, MutableMapping, Optional, Sequence + +import looker_sdk +from looker_sdk.error import SDKError +from looker_sdk.sdk.api31.models import ( + Dashboard, + DashboardElement, + LookWithQuery, + Query, +) + +from datahub.configuration import ConfigModel +from datahub.configuration.common import AllowDenyPattern +from datahub.ingestion.api.common import PipelineContext +from datahub.ingestion.api.source import Source, SourceReport +from datahub.ingestion.source.metadata_common import MetadataWorkUnit +from datahub.metadata.com.linkedin.pegasus2avro.common import ( + AuditStamp, + ChangeAuditStamps, + Status, +) +from datahub.metadata.com.linkedin.pegasus2avro.metadata.snapshot import ( + ChartSnapshot, + DashboardSnapshot, +) +from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent +from datahub.metadata.schema_classes import ( + AuditStampClass, + ChartInfoClass, + ChartTypeClass, + DashboardInfoClass, + OwnerClass, + OwnershipClass, + OwnershipTypeClass, +) + +logger = logging.getLogger(__name__) + + +class LookerDashboardSourceConfig(ConfigModel): + client_id: str + client_secret: str + base_url: str + platform_name: str = "looker" + # The datahub platform where looker views are stored, must be the same as `platform_name` in lookml source + view_platform_name: str = "looker_views" + actor: str = "urn:li:corpuser:etl" + dashboard_pattern: AllowDenyPattern = AllowDenyPattern.allow_all() + chart_pattern: AllowDenyPattern = AllowDenyPattern.allow_all() + env: str = "PROD" + + +@dataclass +class LookerDashboardSourceReport(SourceReport): + dashboards_scanned: int = 0 + charts_scanned: int = 0 + filtered_dashboards: List[str] = dataclass_field(default_factory=list) + filtered_charts: List[str] = dataclass_field(default_factory=list) + + def report_dashboards_scanned(self) -> None: + self.dashboards_scanned += 1 + + def report_charts_scanned(self) -> None: + self.charts_scanned += 1 + + def report_dashboards_dropped(self, model: str) -> None: + self.filtered_dashboards.append(model) + + def report_charts_dropped(self, view: str) -> None: + self.filtered_charts.append(view) + + +@dataclass +class LookerDashboardElement: + id: str + title: str + query_slug: str + looker_views: List[str] + look_id: Optional[str] + type: Optional[str] = None + description: Optional[str] = None + + def url(self, base_url: str) -> str: + # A dashboard element can use a look or just a raw query against an explore + if self.look_id is not None: + return base_url + "/looks/" + self.look_id + else: + return base_url + "/x/" + self.query_slug + + def get_urn_element_id(self): + # A dashboard element can use a look or just a raw query against an explore + return f"dashboard_elements.{self.id}" + + def get_view_urns(self, platform_name: str) -> List[str]: + return [ + f"urn:li:dataset:(urn:li:dataPlatform:{platform_name},{v},PROD)" + for v in self.looker_views + ] + + +@dataclass +class LookerDashboard: + id: str + title: str + dashboard_elements: 
List[LookerDashboardElement]
+    created_at: Optional[datetime.datetime]
+    description: Optional[str] = None
+    is_deleted: bool = False
+    is_hidden: bool = False
+
+    def url(self, base_url):
+        return base_url + "/dashboards/" + self.id
+
+    def get_urn_dashboard_id(self):
+        return f"dashboards.{self.id}"
+
+
+class LookerDashboardSource(Source):
+    source_config: LookerDashboardSourceConfig
+    report = LookerDashboardSourceReport()
+
+    def __init__(self, config: LookerDashboardSourceConfig, ctx: PipelineContext):
+        super().__init__(ctx)
+        self.source_config = config
+        self.reporter = LookerDashboardSourceReport()
+
+    @staticmethod
+    def _extract_view_from_field(field: str) -> str:
+        assert (
+            field.count(".") == 1
+        ), f"Error: A field must be prefixed by a view name, field is: {field}"
+        view_name = field.split(".")[0]
+        return view_name
+
+    def _get_views_from_query(self, query: Optional[Query]) -> List[str]:
+        if query is None:
+            return []
+
+        all_views = set()
+
+        # query.dynamic_fields can contain:
+        # - looker table calculations: https://docs.looker.com/exploring-data/using-table-calculations
+        # - looker custom measures: https://docs.looker.com/de/exploring-data/adding-fields/custom-measure
+        # - looker custom dimensions: https://docs.looker.com/exploring-data/adding-fields/custom-measure#creating_a_custom_dimension_using_a_looker_expression
+        dynamic_fields = json.loads(
+            query.dynamic_fields if query.dynamic_fields is not None else "[]"
+        )
+        custom_field_to_underlying_field = {}
+        for field in dynamic_fields:
+            # Table calculations can only reference fields used in the fields section, so this will always be a subset of the query.fields
+            if "table_calculation" in field:
+                continue
+            # Looker custom measures can reference fields in arbitrary views, so this needs to be parsed to find the underlying view field the custom measure is based on
+            if "measure" in field:
+                measure = field["measure"]
+                based_on = field["based_on"]
+                custom_field_to_underlying_field[measure] = based_on
+
+            # Looker custom dimensions can reference fields in arbitrary views, so this needs to be parsed to find the underlying view field the custom dimension is based on
+            # However, unlike custom measures, custom dimensions can be defined using an arbitrary expression
+            # We are not going to support parsing arbitrary Looker expressions here, so these fields are ignored for now
+            if "dimension" in field:
+                dimension = field["dimension"]
+                expression = field["expression"]  # noqa: F841
+                custom_field_to_underlying_field[dimension] = None
+
+        # A query uses fields defined in views, find the views those fields use
+        fields: Sequence[str] = query.fields if query.fields is not None else []
+        for field in fields:
+            # If the field is a custom field, look up the field it is based on
+            field_name = (
+                custom_field_to_underlying_field[field]
+                if field in custom_field_to_underlying_field
+                else field
+            )
+            if field_name is None:
+                continue
+
+            try:
+                view_name = self._extract_view_from_field(field_name)
+            except AssertionError:
+                self.reporter.report_warning(
+                    key=f"chart-field-{field_name}",
+                    reason="The field was not prefixed by a view name.
This can happen when the field references another dynamic field.", + ) + continue + all_views.add(view_name) + + # A query uses fields for filtering and those fields are defined in views, find the views those fields use + filters: MutableMapping[str, Any] = ( + query.filters if query.filters is not None else {} + ) + for field in filters.keys(): + # If the field is a custom field, look up the field it is based on + field_name = ( + custom_field_to_underlying_field[field] + if field in custom_field_to_underlying_field + else field + ) + if field_name is None: + continue + try: + view_name = self._extract_view_from_field(field_name) + except AssertionError: + self.reporter.report_warning( + f"chart-field-{field_name}", + "The field was not prefixed by a view name. This can happen when the field references another dynamic field.", + ) + continue + all_views.add(view_name) + + return list(all_views) + + def _get_views_from_look(self, look: LookWithQuery) -> List[str]: + return self._get_views_from_query(look.query) + + def _get_looker_dashboard_element( + self, + element: DashboardElement, + ) -> Optional[LookerDashboardElement]: + # Dashboard elements can use raw queries against explores + if element.id is None: + raise ValueError("Element ID can't be None") + + if element.query is not None: + views = self._get_views_from_query(element.query) + return LookerDashboardElement( + id=element.id, + title=element.title if element.title is not None else "", + type=element.type, + description=element.subtitle_text, + look_id=None, + query_slug=element.query.slug if element.query.slug is not None else "", + looker_views=views, + ) + + # Dashboard elements can *alternatively* link to an existing look + if element.look is not None: + views = self._get_views_from_look(element.look) + if element.look.query and element.look.query.slug: + slug = element.look.query.slug + else: + slug = "" + return LookerDashboardElement( + id=element.id, + title=element.title if element.title is not None else "", + type=element.type, + description=element.subtitle_text, + look_id=element.look_id, + query_slug=slug, + looker_views=views, + ) + + return None + + def _get_chart_type( + self, dashboard_element: LookerDashboardElement + ) -> Optional[str]: + type_mapping = { + "looker_column": ChartTypeClass.BAR, + "looker_scatter": ChartTypeClass.SCATTER, + "looker_line": ChartTypeClass.LINE, + "looker_area": ChartTypeClass.AREA, + "looker_pie": ChartTypeClass.PIE, + "looker_donut_multiples": ChartTypeClass.PIE, + "looker_funnel": ChartTypeClass.BAR, + "looker_timeline": ChartTypeClass.BAR, + "looker_waterfall": ChartTypeClass.BAR, + "text": ChartTypeClass.TEXT, + "single_value": ChartTypeClass.TEXT, + "looker_single_record": ChartTypeClass.TABLE, + "table": ChartTypeClass.TABLE, + "looker_grid": ChartTypeClass.TABLE, + "looker_map": None, + "looker_geo_coordinates": None, + "looker_geo_choropleth": None, + "looker_boxplot": ChartTypeClass.BOX_PLOT, + "vis": None, + } + type_str = dashboard_element.type + if not type_str: + self.reporter.report_warning( + key=f"looker-chart-{dashboard_element.id}", + reason=f"Chart type {type_str} is missing. Setting to None", + ) + return None + try: + chart_type = type_mapping[type_str] + except KeyError: + self.reporter.report_warning( + key=f"looker-chart-{dashboard_element.id}", + reason=f"Chart type {type_str} not supported. 
Setting to None", + ) + chart_type = None + + return chart_type + + def _make_chart_mce( + self, dashboard_element: LookerDashboardElement + ) -> MetadataChangeEvent: + actor = self.source_config.actor + sys_time = int(time.time()) * 1000 + chart_urn = f"urn:li:chart:({self.source_config.platform_name},{dashboard_element.get_urn_element_id()})" + chart_snapshot = ChartSnapshot( + urn=chart_urn, + aspects=[], + ) + + last_modified = ChangeAuditStamps( + created=AuditStamp(time=sys_time, actor=actor), + lastModified=AuditStamp(time=sys_time, actor=actor), + ) + + chart_type = self._get_chart_type(dashboard_element) + + chart_info = ChartInfoClass( + type=chart_type, + description=dashboard_element.description + if dashboard_element.description is not None + else "", + title=dashboard_element.title + if dashboard_element.title is not None + else "", + lastModified=last_modified, + chartUrl=dashboard_element.url(self.source_config.base_url), + inputs=dashboard_element.get_view_urns(self.source_config.platform_name), + ) + chart_snapshot.aspects.append(chart_info) + + return MetadataChangeEvent(proposedSnapshot=chart_snapshot) + + def _make_dashboard_and_chart_mces( + self, looker_dashboard: LookerDashboard + ) -> List[MetadataChangeEvent]: + actor = self.source_config.actor + sys_time = int(time.time()) * 1000 + + chart_mces = [ + self._make_chart_mce(element) + for element in looker_dashboard.dashboard_elements + ] + + dashboard_urn = f"urn:li:dashboard:({self.source_config.platform_name},{looker_dashboard.get_urn_dashboard_id()})" + dashboard_snapshot = DashboardSnapshot( + urn=dashboard_urn, + aspects=[], + ) + + last_modified = ChangeAuditStamps( + created=AuditStamp(time=sys_time, actor=actor), + lastModified=AuditStamp(time=sys_time, actor=actor), + ) + + dashboard_info = DashboardInfoClass( + description=looker_dashboard.description + if looker_dashboard.description is not None + else "", + title=looker_dashboard.title, + charts=[mce.proposedSnapshot.urn for mce in chart_mces], + lastModified=last_modified, + dashboardUrl=looker_dashboard.url(self.source_config.base_url), + ) + + dashboard_snapshot.aspects.append(dashboard_info) + owners = [OwnerClass(owner=actor, type=OwnershipTypeClass.DATAOWNER)] + dashboard_snapshot.aspects.append( + OwnershipClass( + owners=owners, + lastModified=AuditStampClass( + time=sys_time, actor=self.source_config.actor + ), + ) + ) + dashboard_snapshot.aspects.append(Status(removed=looker_dashboard.is_deleted)) + + dashboard_mce = MetadataChangeEvent(proposedSnapshot=dashboard_snapshot) + + return chart_mces + [dashboard_mce] + + def _get_looker_dashboard(self, dashboard: Dashboard) -> LookerDashboard: + dashboard_elements: List[LookerDashboardElement] = [] + elements = ( + dashboard.dashboard_elements + if dashboard.dashboard_elements is not None + else [] + ) + for element in elements: + self.reporter.report_charts_scanned() + if element.id is not None and self.source_config.chart_pattern.allowed( + element.id + ): + self.reporter.report_charts_dropped(element.id) + continue + looker_dashboard_element = self._get_looker_dashboard_element(element) + if looker_dashboard_element is not None: + dashboard_elements.append(looker_dashboard_element) + + if dashboard.id is None or dashboard.title is None: + raise ValueError("Both dashboard ID and title are None") + + looker_dashboard = LookerDashboard( + id=dashboard.id, + title=dashboard.title, + description=dashboard.description, + dashboard_elements=dashboard_elements, + created_at=dashboard.created_at, + 
is_deleted=dashboard.deleted if dashboard.deleted is not None else False, + is_hidden=dashboard.deleted if dashboard.deleted is not None else False, + ) + return looker_dashboard + + def _get_looker_client(self): + # The Looker SDK looks wants these as environment variables + os.environ["LOOKERSDK_CLIENT_ID"] = self.source_config.client_id + os.environ["LOOKERSDK_CLIENT_SECRET"] = self.source_config.client_secret + os.environ["LOOKERSDK_BASE_URL"] = self.source_config.base_url + + return looker_sdk.init31() + + def get_workunits(self) -> Iterable[MetadataWorkUnit]: + client = self._get_looker_client() + dashboard_ids = [ + dashboard_base.id + for dashboard_base in client.all_dashboards(fields="id") + if dashboard_base.id is not None + ] + + for dashboard_id in dashboard_ids: + self.reporter.report_dashboards_scanned() + if not self.source_config.dashboard_pattern.allowed(dashboard_id): + self.reporter.report_dashboards_dropped(dashboard_id) + continue + try: + fields = ["id", "title", "dashboard_elements", "dashboard_filters"] + dashboard_object = client.dashboard( + dashboard_id=dashboard_id, fields=",".join(fields) + ) + except SDKError: + # A looker dashboard could be deleted in between the list and the get + logger.warning( + f"Error occuried while loading dashboard {dashboard_id}. Skipping." + ) + continue + + looker_dashboard = self._get_looker_dashboard(dashboard_object) + mces = self._make_dashboard_and_chart_mces(looker_dashboard) + for mce in mces: + workunit = MetadataWorkUnit( + id=f"looker-{mce.proposedSnapshot.urn}", mce=mce + ) + self.reporter.report_workunit(workunit) + yield workunit + + @classmethod + def create(cls, config_dict, ctx): + config = LookerDashboardSourceConfig.parse_obj(config_dict) + return cls(config, ctx) + + def get_report(self) -> SourceReport: + return self.reporter diff --git a/metadata-ingestion/src/datahub/ingestion/source/lookml.py b/metadata-ingestion/src/datahub/ingestion/source/lookml.py new file mode 100644 index 0000000000..11eaf9208e --- /dev/null +++ b/metadata-ingestion/src/datahub/ingestion/source/lookml.py @@ -0,0 +1,575 @@ +import glob +import logging +import re +import sys +import time +from dataclasses import dataclass +from dataclasses import field as dataclass_field +from dataclasses import replace +from enum import Enum +from pathlib import Path +from typing import Dict, Iterable, List, Optional, Tuple + +if sys.version_info[1] >= 7: + import lkml +else: + logging.warning("This plugin requres Python 3.7 or newer.") +from sql_metadata import get_query_tables + +from datahub.configuration import ConfigModel +from datahub.configuration.common import AllowDenyPattern +from datahub.ingestion.api.common import PipelineContext +from datahub.ingestion.api.source import Source, SourceReport +from datahub.ingestion.source.metadata_common import MetadataWorkUnit +from datahub.metadata.com.linkedin.pegasus2avro.common import AuditStamp, Status +from datahub.metadata.com.linkedin.pegasus2avro.dataset import ( + DatasetLineageTypeClass, + UpstreamClass, + UpstreamLineage, +) +from datahub.metadata.com.linkedin.pegasus2avro.metadata.snapshot import DatasetSnapshot +from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent +from datahub.metadata.com.linkedin.pegasus2avro.schema import ( + ArrayTypeClass, + BooleanTypeClass, + DateTypeClass, + NullTypeClass, + NumberTypeClass, + OtherSchema, + SchemaField, + SchemaFieldDataType, + SchemaMetadata, + StringTypeClass, + TimeTypeClass, + UnionTypeClass, +) +from 
datahub.metadata.schema_classes import EnumTypeClass, SchemaMetadataClass + +assert sys.version_info[1] >= 7 # needed for mypy + +logger = logging.getLogger(__name__) + + +class LookMLSourceConfig(ConfigModel): # pragma: no cover + base_folder: str + connection_to_platform_map: Dict[str, str] + platform_name: str = "looker_views" + actor: str = "urn:li:corpuser:etl" + model_pattern: AllowDenyPattern = AllowDenyPattern.allow_all() + view_pattern: AllowDenyPattern = AllowDenyPattern.allow_all() + env: str = "PROD" + parse_table_names_from_sql: bool = False + + +@dataclass +class LookMLSourceReport(SourceReport): # pragma: no cover + models_scanned: int = 0 + views_scanned: int = 0 + filtered_models: List[str] = dataclass_field(default_factory=list) + filtered_views: List[str] = dataclass_field(default_factory=list) + + def report_models_scanned(self) -> None: + self.models_scanned += 1 + + def report_views_scanned(self) -> None: + self.views_scanned += 1 + + def report_models_dropped(self, model: str) -> None: + self.filtered_models.append(model) + + def report_views_dropped(self, view: str) -> None: + self.filtered_views.append(view) + + +@dataclass +class LookerModel: # pragma: no cover + connection: str + includes: List[str] + resolved_includes: List[str] + + @staticmethod + def from_looker_dict(looker_model_dict: dict, base_folder: str) -> "LookerModel": + connection = looker_model_dict["connection"] + includes = looker_model_dict["includes"] + resolved_includes = LookerModel.resolve_includes(includes, base_folder) + + return LookerModel( + connection=connection, + includes=includes, + resolved_includes=resolved_includes, + ) + + @staticmethod + def resolve_includes(includes: List, base_folder: str) -> List[str]: + resolved = [] + for inc in includes: + # Massage the looker include into a valid glob wildcard expression + glob_expr = f"{base_folder}/{inc}" + outputs = glob.glob(glob_expr) + resolved.extend(outputs) + return resolved + + +@dataclass +class LookerViewFile: # pragma: no cover + absolute_file_path: str + connection: Optional[str] + includes: List[str] + resolved_includes: List[str] + views: List[Dict] + + @staticmethod + def from_looker_dict( + absolute_file_path: str, looker_view_file_dict: dict, base_folder: str + ) -> "LookerViewFile": + includes = looker_view_file_dict.get("includes", []) + resolved_includes = LookerModel.resolve_includes(includes, base_folder) + views = looker_view_file_dict.get("views", []) + + return LookerViewFile( + absolute_file_path=absolute_file_path, + connection=None, + includes=includes, + resolved_includes=resolved_includes, + views=views, + ) + + +class LookerViewFileLoader: # pragma: no cover + """ + Loads the looker viewfile at a :path and caches the LookerViewFile in memory + This is to avoid reloading the same file off of disk many times during the recursive include resolution process + """ + + def __init__(self, base_folder: str) -> None: + self.viewfile_cache: Dict[str, LookerViewFile] = {} + self._base_folder = base_folder + + def is_view_seen(self, path: str) -> bool: + return path in self.viewfile_cache + + def _load_viewfile(self, path: str) -> Optional[LookerViewFile]: + if self.is_view_seen(path): + return self.viewfile_cache[path] + + try: + with open(path, "r") as file: + parsed = lkml.load(file) + looker_viewfile = LookerViewFile.from_looker_dict( + path, parsed, self._base_folder + ) + self.viewfile_cache[path] = looker_viewfile + return looker_viewfile + except Exception: + logger.warning(f"Error processing view file {path}. 
Skipping it") + return None + + def load_viewfile(self, path: str, connection: str) -> Optional[LookerViewFile]: + viewfile = self._load_viewfile(path) + if viewfile is None: + return None + + return replace(viewfile, connection=connection) + + +class ViewFieldType(Enum): + DIMENSION = "Dimension" + DIMENSION_GROUP = "Dimension Group" + MEASURE = "Measure" + + +@dataclass +class ViewField: + name: str + type: str + description: str + field_type: ViewFieldType + is_primary_key: bool = False + + +@dataclass +class LookerView: # pragma: no cover + absolute_file_path: str + connection: str + view_name: str + sql_table_names: List[str] + fields: List[ViewField] + + @classmethod + def _get_sql_table_names(cls, sql: str) -> List[str]: + sql_tables: List[str] = get_query_tables(sql) + + # Remove temporary tables from WITH statements + sql_table_names = [ + t + for t in sql_tables + if not re.search( + fr"WITH(.*,)?\s+{t}(\s*\([\w\s,]+\))?\s+AS\s+\(", + sql, + re.IGNORECASE | re.DOTALL, + ) + ] + + # Remove quotes from tables + sql_table_names = [t.replace('"', "") for t in sql_table_names] + + return sql_table_names + + @classmethod + def _get_fields( + cls, field_list: List[Dict], type_cls: ViewFieldType + ) -> List[ViewField]: + fields = [] + for field_dict in field_list: + is_primary_key = field_dict.get("primary_key", "no") == "yes" + name = field_dict["name"] + native_type = field_dict.get("type", "string") + description = field_dict.get("description", "") + field = ViewField( + name=name, + type=native_type, + description=description, + is_primary_key=is_primary_key, + field_type=type_cls, + ) + fields.append(field) + return fields + + @classmethod + def from_looker_dict( + cls, + looker_view: dict, + connection: str, + looker_viewfile: LookerViewFile, + looker_viewfile_loader: LookerViewFileLoader, + parse_table_names_from_sql: bool = False, + ) -> Optional["LookerView"]: + view_name = looker_view["name"] + sql_table_name = looker_view.get("sql_table_name", None) + # Some sql_table_name fields contain quotes like: optimizely."group", just remove the quotes + sql_table_name = ( + sql_table_name.replace('"', "") if sql_table_name is not None else None + ) + derived_table = looker_view.get("derived_table", None) + + dimensions = cls._get_fields( + looker_view.get("dimensions", []), ViewFieldType.DIMENSION + ) + dimension_groups = cls._get_fields( + looker_view.get("dimension_groups", []), ViewFieldType.DIMENSION_GROUP + ) + measures = cls._get_fields( + looker_view.get("measures", []), ViewFieldType.MEASURE + ) + fields: List[ViewField] = dimensions + dimension_groups + measures + + # Parse SQL from derived tables to extract dependencies + if derived_table is not None: + if parse_table_names_from_sql and "sql" in derived_table: + # Get the list of tables in the query + sql_table_names = cls._get_sql_table_names(derived_table["sql"]) + else: + sql_table_names = [] + + return LookerView( + absolute_file_path=looker_viewfile.absolute_file_path, + connection=connection, + view_name=view_name, + sql_table_names=sql_table_names, + fields=fields, + ) + + # There is a single dependency in the view, on the sql_table_name + if sql_table_name is not None: + return LookerView( + absolute_file_path=looker_viewfile.absolute_file_path, + connection=connection, + view_name=view_name, + sql_table_names=[sql_table_name], + fields=fields, + ) + + # The sql_table_name might be defined in another view and this view is extending that view, try to find it + else: + extends = looker_view.get("extends", 
looker_view.get("extends__all", [])) + if len(extends) == 0: + # The view is malformed, the view is not a derived table, does not contain a sql_table_name or an extends + logger.warning( + f"Skipping malformed with view_name: {view_name} ({looker_viewfile.absolute_file_path}). View should have a sql_table_name if it is not a derived table" + ) + return None + + extends_to_looker_view = [] + + # The base view could live in the same file + for raw_view in looker_viewfile.views: + raw_view_name = raw_view["name"] + # Make sure to skip loading view we are currently trying to resolve + if raw_view_name != view_name: + maybe_looker_view = LookerView.from_looker_dict( + raw_view, + connection, + looker_viewfile, + looker_viewfile_loader, + parse_table_names_from_sql, + ) + if ( + maybe_looker_view is not None + and maybe_looker_view.view_name in extends + ): + extends_to_looker_view.append(maybe_looker_view) + + # Or it could live in one of the included files, we do not know which file the base view lives in, try them all! + for include in looker_viewfile.resolved_includes: + maybe_looker_viewfile = looker_viewfile_loader.load_viewfile( + include, connection + ) + if maybe_looker_viewfile is not None: + for view in looker_viewfile.views: + maybe_looker_view = LookerView.from_looker_dict( + view, + connection, + looker_viewfile, + looker_viewfile_loader, + parse_table_names_from_sql, + ) + if maybe_looker_view is None: + continue + + if ( + maybe_looker_view is not None + and maybe_looker_view.view_name in extends + ): + extends_to_looker_view.append(maybe_looker_view) + + if len(extends_to_looker_view) != 1: + logger.warning( + f"Skipping malformed view with view_name: {view_name}. View should have a single view in a view inheritance chain with a sql_table_name" + ) + return None + + output_looker_view = LookerView( + absolute_file_path=looker_viewfile.absolute_file_path, + connection=connection, + view_name=view_name, + sql_table_names=extends_to_looker_view[0].sql_table_names, + fields=fields, + ) + return output_looker_view + + +class LookMLSource(Source): # pragma: no cover + source_config: LookMLSourceConfig + report = LookMLSourceReport() + + def __init__(self, config: LookMLSourceConfig, ctx: PipelineContext): + super().__init__(ctx) + self.source_config = config + self.reporter = LookMLSourceReport() + + @classmethod + def create(cls, config_dict, ctx): + config = LookMLSourceConfig.parse_obj(config_dict) + return cls(config, ctx) + + def _load_model(self, path: str) -> LookerModel: + with open(path, "r") as file: + parsed = lkml.load(file) + looker_model = LookerModel.from_looker_dict( + parsed, self.source_config.base_folder + ) + return looker_model + + def _construct_datalineage_urn(self, sql_table_name: str, connection: str) -> str: + platform = self._get_platform_based_on_connection(connection) + return f"urn:li:dataset:(urn:li:dataPlatform:{platform},{sql_table_name},{self.source_config.env})" + + def _get_platform_based_on_connection(self, connection: str) -> str: + if connection in self.source_config.connection_to_platform_map: + return self.source_config.connection_to_platform_map[connection] + else: + raise Exception( + f"Could not find a platform for looker view with connection: {connection}" + ) + + def _get_upsteam_lineage( + self, looker_view: LookerView, actor: str, sys_time: int + ) -> UpstreamLineage: + upstreams = [] + for sql_table_name in looker_view.sql_table_names: + upstream = UpstreamClass( + dataset=self._construct_datalineage_urn( + sql_table_name, 
looker_view.connection + ), + auditStamp=AuditStamp(actor=actor, time=sys_time), + type=DatasetLineageTypeClass.TRANSFORMED, + ) + upstreams.append(upstream) + + upstream_lineage = UpstreamLineage(upstreams=upstreams) + + return upstream_lineage + + def _get_field_type(self, native_type: str) -> SchemaFieldDataType: + field_type_mapping = { + "date": DateTypeClass, + "date_time": TimeTypeClass, + "distance": NumberTypeClass, + "duration": NumberTypeClass, + "location": UnionTypeClass, + "number": NumberTypeClass, + "string": StringTypeClass, + "tier": EnumTypeClass, + "time": TimeTypeClass, + "unquoted": StringTypeClass, + "yesno": BooleanTypeClass, + "zipcode": EnumTypeClass, + "int": NumberTypeClass, + "average": NumberTypeClass, + "average_distinct": NumberTypeClass, + "count": NumberTypeClass, + "count_distinct": NumberTypeClass, + "list": ArrayTypeClass, + "max": NumberTypeClass, + "median": NumberTypeClass, + "median_distinct": NumberTypeClass, + "min": NumberTypeClass, + "percent_of_previous": NumberTypeClass, + "percent_of_total": NumberTypeClass, + "percentile": NumberTypeClass, + "percentile_distinct": NumberTypeClass, + "running_total": NumberTypeClass, + "sum": NumberTypeClass, + "sum_distinct": NumberTypeClass, + } + + if native_type in field_type_mapping: + type_class = field_type_mapping[native_type] + else: + self.reporter.report_warning( + native_type, + f"The type '{native_type}' is not recognised for field type, setting as NullTypeClass.", + ) + type_class = NullTypeClass + data_type = SchemaFieldDataType(type=type_class()) + return data_type + + def _get_fields_and_primary_keys( + self, looker_view: LookerView + ) -> Tuple[List[SchemaField], List[str]]: + fields: List[SchemaField] = [] + primary_keys: List = [] + for field in looker_view.fields: + schema_field = SchemaField( + fieldPath=field.name, + type=self._get_field_type(field.type), + nativeDataType=field.type, + description=f"{field.field_type.value}. 
{field.description}", + ) + fields.append(schema_field) + if field.is_primary_key: + primary_keys.append(schema_field.fieldPath) + return fields, primary_keys + + def _get_schema( + self, looker_view: LookerView, actor: str, sys_time: int + ) -> SchemaMetadataClass: + fields, primary_keys = self._get_fields_and_primary_keys(looker_view) + stamp = AuditStamp(time=sys_time, actor=actor) + schema_metadata = SchemaMetadata( + schemaName=looker_view.view_name, + platform=self.source_config.platform_name, + version=0, + fields=fields, + primaryKeys=primary_keys, + created=stamp, + lastModified=stamp, + hash="", + platformSchema=OtherSchema(rawSchema="looker-view"), + ) + return schema_metadata + + def _build_dataset_mce(self, looker_view: LookerView) -> MetadataChangeEvent: + """ + Creates MetadataChangeEvent for the dataset, creating upstream lineage links + """ + logger.debug(f"looker_view = {looker_view.view_name}") + + dataset_name = looker_view.view_name + actor = self.source_config.actor + sys_time = int(time.time()) * 1000 + + dataset_snapshot = DatasetSnapshot( + urn=f"urn:li:dataset:(urn:li:dataPlatform:{self.source_config.platform_name}, {dataset_name}, {self.source_config.env})", + aspects=[], # we append to this list later on + ) + dataset_snapshot.aspects.append(Status(removed=False)) + dataset_snapshot.aspects.append( + self._get_upsteam_lineage(looker_view, actor, sys_time) + ) + dataset_snapshot.aspects.append(self._get_schema(looker_view, actor, sys_time)) + + mce = MetadataChangeEvent(proposedSnapshot=dataset_snapshot) + + return mce + + def get_workunits(self) -> Iterable[MetadataWorkUnit]: + viewfile_loader = LookerViewFileLoader(self.source_config.base_folder) + + model_files = sorted( + f + for f in glob.glob( + f"{self.source_config.base_folder}/**/*.model.lkml", recursive=True + ) + ) + for file_path in model_files: + model_name = Path(file_path).stem + self.reporter.report_models_scanned() + if not self.source_config.model_pattern.allowed(model_name): + self.reporter.report_models_dropped(model_name) + continue + try: + model = self._load_model(file_path) + except Exception: + self.reporter.report_warning( + "LookML", f"unable to parse Looker model: {file_path}" + ) + continue + + for include in model.resolved_includes: + is_view_seen = viewfile_loader.is_view_seen(include) + if is_view_seen: + continue + looker_viewfile = viewfile_loader.load_viewfile( + include, model.connection + ) + if looker_viewfile is not None: + for raw_view in looker_viewfile.views: + maybe_looker_view = LookerView.from_looker_dict( + raw_view, + model.connection, + looker_viewfile, + viewfile_loader, + self.source_config.parse_table_names_from_sql, + ) + if maybe_looker_view: + self.reporter.report_views_scanned() + if self.source_config.view_pattern.allowed( + maybe_looker_view.view_name + ): + mce = self._build_dataset_mce(maybe_looker_view) + workunit = MetadataWorkUnit( + id=f"lookml-{maybe_looker_view.view_name}", mce=mce + ) + self.reporter.report_workunit(workunit) + yield workunit + else: + self.reporter.report_views_dropped( + maybe_looker_view.view_name + ) + + def get_report(self): + return self.report + + def close(self): + pass diff --git a/metadata-ingestion/tests/integration/looker/expected_output.json b/metadata-ingestion/tests/integration/looker/expected_output.json new file mode 100644 index 0000000000..16203bee47 --- /dev/null +++ b/metadata-ingestion/tests/integration/looker/expected_output.json @@ -0,0 +1,59 @@ +[ + { + "auditHeader": null, + "proposedSnapshot": { + 
"com.linkedin.pegasus2avro.metadata.snapshot.DashboardSnapshot": { + "urn": "urn:li:dashboard:(looker,dashboards.1)", + "aspects": [ + { + "com.linkedin.pegasus2avro.dashboard.DashboardInfo": { + "customProperties": {}, + "externalUrl": null, + "title": "foo", + "description": "lorem ipsum", + "charts": [], + "lastModified": { + "created": { + "time": 1615443388000, + "actor": "urn:li:corpuser:etl", + "impersonator": null + }, + "lastModified": { + "time": 1615443388000, + "actor": "urn:li:corpuser:etl", + "impersonator": null + }, + "deleted": null + }, + "dashboardUrl": "https://looker.company.com/dashboards/1", + "access": null, + "lastRefreshed": null + } + }, + { + "com.linkedin.pegasus2avro.common.Ownership": { + "owners": [ + { + "owner": "urn:li:corpuser:etl", + "type": "DATAOWNER", + "source": null + } + ], + "lastModified": { + "time": 1615443388000, + "actor": "urn:li:corpuser:etl", + "impersonator": null + } + } + }, + { + "com.linkedin.pegasus2avro.common.Status": { + "removed": false + } + } + ] + } + }, + "proposedDelta": null + } + ] \ No newline at end of file diff --git a/metadata-ingestion/tests/integration/looker/test_looker.py b/metadata-ingestion/tests/integration/looker/test_looker.py new file mode 100644 index 0000000000..93ad9a1b51 --- /dev/null +++ b/metadata-ingestion/tests/integration/looker/test_looker.py @@ -0,0 +1,65 @@ +import time +from datetime import datetime +from unittest import mock + +from looker_sdk.sdk.api31.models import Dashboard, DashboardElement, Query + +from datahub.ingestion.run.pipeline import Pipeline +from tests.test_helpers import mce_helpers + + +def test_looker_ingest(pytestconfig, tmp_path, mock_time): + mocked_client = mock.MagicMock() + with mock.patch( + "datahub.ingestion.source.looker.LookerDashboardSource._get_looker_client", + mocked_client, + ): + mocked_client.return_value.all_dashboards.return_value = [Dashboard(id="1")] + mocked_client.return_value.dashboard.return_value = Dashboard( + id="1", + title="foo", + created_at=datetime.utcfromtimestamp(time.time()), + description="lorem ipsum", + dashboard_elements=[ + DashboardElement( + id="2", + type="", + subtitle_text="Some text", + query=Query( + model="data", + view="my_view", + dynamic_fields='[{"table_calculation":"calc","label":"foobar","expression":"offset(${my_table.value},1)","value_format":null,"value_format_name":"eur","_kind_hint":"measure","_type_hint":"number"}', + ), + ) + ], + ) + + test_resources_dir = pytestconfig.rootpath / "tests/integration/looker" + + pipeline = Pipeline.create( + { + "run_id": "looker-test", + "source": { + "type": "looker", + "config": { + "base_url": "https://looker.company.com", + "client_id": "foo", + "client_secret": "bar", + }, + }, + "sink": { + "type": "file", + "config": { + "filename": f"{tmp_path}/looker_mces.json", + }, + }, + } + ) + pipeline.run() + pipeline.raise_from_status() + + output = mce_helpers.load_json_file(str(tmp_path / "looker_mces.json")) + expected = mce_helpers.load_json_file( + str(test_resources_dir / "expected_output.json") + ) + mce_helpers.assert_mces_equal(output, expected) diff --git a/metadata-ingestion/tests/integration/lookml/data.model.lkml b/metadata-ingestion/tests/integration/lookml/data.model.lkml new file mode 100644 index 0000000000..0e47898ae2 --- /dev/null +++ b/metadata-ingestion/tests/integration/lookml/data.model.lkml @@ -0,0 +1,19 @@ +connection: "my_connection" + +include: "foo.view.lkml" + +explore: data_model { + label: "Data model!" 
+ description: "Lorem ipsum" + + always_filter: { + filters: { + field: is_latest_forecast + value: "TRUE" + } + filters: { + field: granularity + value: "day" + } + } +} diff --git a/metadata-ingestion/tests/integration/lookml/expected_output.json b/metadata-ingestion/tests/integration/lookml/expected_output.json new file mode 100644 index 0000000000..4cc3ffff27 --- /dev/null +++ b/metadata-ingestion/tests/integration/lookml/expected_output.json @@ -0,0 +1,138 @@ +[ + { + "auditHeader": null, + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": { + "urn": "urn:li:dataset:(urn:li:dataPlatform:looker_views, my_view, PROD)", + "aspects": [ + { + "com.linkedin.pegasus2avro.common.Status": { + "removed": false + } + }, + { + "com.linkedin.pegasus2avro.dataset.UpstreamLineage": { + "upstreams": [ + { + "auditStamp": { + "time": 1615443388000, + "actor": "urn:li:corpuser:etl", + "impersonator": null + }, + "dataset": "urn:li:dataset:(urn:li:dataPlatform:conn,my_table,PROD)", + "type": "TRANSFORMED" + } + ] + } + }, + { + "com.linkedin.pegasus2avro.schema.SchemaMetadata": { + "schemaName": "my_view", + "platform": "looker_views", + "version": 0, + "created": { + "time": 1615443388000, + "actor": "urn:li:corpuser:etl", + "impersonator": null + }, + "lastModified": { + "time": 1615443388000, + "actor": "urn:li:corpuser:etl", + "impersonator": null + }, + "deleted": null, + "dataset": null, + "cluster": null, + "hash": "", + "platformSchema": { + "com.linkedin.pegasus2avro.schema.OtherSchema": { + "rawSchema": "looker-view" + } + }, + "fields": [ + { + "fieldPath": "country", + "jsonPath": null, + "nullable": false, + "description": "Dimension. The country", + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "nativeDataType": "string", + "recursive": false, + "globalTags": null, + "glossaryTerms": null + }, + { + "fieldPath": "city", + "jsonPath": null, + "nullable": false, + "description": "Dimension. City", + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "nativeDataType": "string", + "recursive": false, + "globalTags": null, + "glossaryTerms": null + }, + { + "fieldPath": "is_latest", + "jsonPath": null, + "nullable": false, + "description": "Dimension. Is latest data", + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.BooleanType": {} + } + }, + "nativeDataType": "yesno", + "recursive": false, + "globalTags": null, + "glossaryTerms": null + }, + { + "fieldPath": "timestamp", + "jsonPath": null, + "nullable": false, + "description": "Dimension Group. Timestamp of measurement", + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.TimeType": {} + } + }, + "nativeDataType": "time", + "recursive": false, + "globalTags": null, + "glossaryTerms": null + }, + { + "fieldPath": "average_measurement", + "jsonPath": null, + "nullable": false, + "description": "Measure. 
My measurement", + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.NumberType": {} + } + }, + "nativeDataType": "average", + "recursive": false, + "globalTags": null, + "glossaryTerms": null + } + ], + "primaryKeys": [], + "foreignKeysSpecs": null + } + } + ] + } + }, + "proposedDelta": null + } + ] \ No newline at end of file diff --git a/metadata-ingestion/tests/integration/lookml/foo.view.lkml b/metadata-ingestion/tests/integration/lookml/foo.view.lkml new file mode 100644 index 0000000000..b430d54d71 --- /dev/null +++ b/metadata-ingestion/tests/integration/lookml/foo.view.lkml @@ -0,0 +1,52 @@ +### +### Please update the docs below after editing this: +### https://woltwide.atlassian.net/wiki/spaces/AN/pages/429195880/Weather+Forecasts +### + +view: my_view { + derived_table: { + sql: + SELECT + is_latest, + country, + city, + timestamp, + measurement + FROM + my_table ;; + } + + dimension: country { + type: string + description: "The country" + sql: ${TABLE}.country ;; + } + + dimension: city { + type: string + description: "City" + sql: ${TABLE}.city ;; + } + + dimension: is_latest { + type: yesno + description: "Is latest data" + sql: ${TABLE}.is_latest ;; + } + + dimension_group: timestamp { + group_label: "Timestamp" + type: time + description: "Timestamp of measurement" + sql: ${TABLE}.timestamp ;; + timeframes: [hour, date, week, day_of_week] + } + + measure: average_measurement { + group_label: "Measurement" + type: average + description: "My measurement" + sql: ${TABLE}.measurement ;; + } + +} diff --git a/metadata-ingestion/tests/integration/lookml/test_lookml.py b/metadata-ingestion/tests/integration/lookml/test_lookml.py new file mode 100644 index 0000000000..2fdcced4e0 --- /dev/null +++ b/metadata-ingestion/tests/integration/lookml/test_lookml.py @@ -0,0 +1,42 @@ +import logging +import sys + +import pytest + +from datahub.ingestion.run.pipeline import Pipeline +from tests.test_helpers import mce_helpers + +logging.getLogger("lkml").setLevel(logging.INFO) + + +@pytest.mark.skipif(sys.version_info < (3, 7), reason="lkml requires Python 3.7+") +def test_lookml_ingest(pytestconfig, tmp_path, mock_time): + test_resources_dir = pytestconfig.rootpath / "tests/integration/lookml" + + pipeline = Pipeline.create( + { + "run_id": "lookml-test", + "source": { + "type": "lookml", + "config": { + "base_folder": str(test_resources_dir), + "connection_to_platform_map": {"my_connection": "conn"}, + "parse_table_names_from_sql": True, + }, + }, + "sink": { + "type": "file", + "config": { + "filename": f"{tmp_path}/lookml_mces.json", + }, + }, + } + ) + pipeline.run() + pipeline.raise_from_status() + + output = mce_helpers.load_json_file(str(tmp_path / "lookml_mces.json")) + expected = mce_helpers.load_json_file( + str(test_resources_dir / "expected_output.json") + ) + mce_helpers.assert_mces_equal(output, expected)