From e93e4691fba930f6700a068f83970d2fab3dce98 Mon Sep 17 00:00:00 2001 From: Shirshanka Das Date: Tue, 5 Jul 2022 10:14:12 -0700 Subject: [PATCH] feat(ingest): lookml - adding support for only emitting reachable views from explores (#5333) --- .../src/datahub/ingestion/source/looker.py | 4 +- .../datahub/ingestion/source/looker_common.py | 17 +- .../src/datahub/ingestion/source/lookml.py | 92 ++- .../lookml/lkml_samples/data.model.lkml | 2 +- .../lookml/lkml_samples/data2.model.lkml | 10 + .../lookml/lkml_samples/foo2.view.lkml | 47 ++ .../lookml/lookml_reachable_views.json | 659 ++++++++++++++++++ .../tests/integration/lookml/test_lookml.py | 70 ++ .../tests/test_helpers/mce_helpers.py | 49 +- 9 files changed, 917 insertions(+), 33 deletions(-) create mode 100644 metadata-ingestion/tests/integration/lookml/lkml_samples/data2.model.lkml create mode 100644 metadata-ingestion/tests/integration/lookml/lkml_samples/foo2.view.lkml create mode 100644 metadata-ingestion/tests/integration/lookml/lookml_reachable_views.json diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker.py b/metadata-ingestion/src/datahub/ingestion/source/looker.py index d472131add..68f8471832 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/looker.py +++ b/metadata-ingestion/src/datahub/ingestion/source/looker.py @@ -162,8 +162,8 @@ class LookerDashboardSourceConfig(LookerAPIConfig, LookerCommonConfig): @validator("external_base_url", pre=True, always=True) def external_url_defaults_to_api_config_base_url( cls, v: Optional[str], *, values: Dict[str, Any], **kwargs: Dict[str, Any] - ) -> str: - return v or values["base_url"] + ) -> Optional[str]: + return v or values.get("base_url") @validator("platform_instance") def platform_instance_not_supported(cls, v: str) -> str: diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker_common.py b/metadata-ingestion/src/datahub/ingestion/source/looker_common.py index 0c8c496a90..287e4dacc2 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/looker_common.py +++ b/metadata-ingestion/src/datahub/ingestion/source/looker_common.py @@ -494,20 +494,23 @@ class LookerExplore: return field_match.findall(sql_fragment) @classmethod - def __from_dict(cls, model_name: str, dict: Dict) -> "LookerExplore": + def from_dict(cls, model_name: str, dict: Dict) -> "LookerExplore": + view_names = set() + joins = None + # always add the explore's name or the name from the from clause as the view on which this explore is built + view_names.add(dict.get("from", dict.get("name"))) + if dict.get("joins", {}) != {}: + # additionally for join-based explores, pull in the linked views assert "joins" in dict - view_names = set() for join in dict["joins"]: + join_from = join.get("from") + view_names.add(join_from or join["name"]) sql_on = join.get("sql_on", None) if sql_on is not None: fields = cls._get_fields_from_sql_equality(sql_on) joins = fields - for f in fields: - view_names.add(LookerUtil._extract_view_from_field(f)) - else: - # non-join explore, get view_name from `from` field if possible, default to explore name - view_names = set(dict.get("from", dict.get("name"))) + return LookerExplore( model_name=model_name, name=dict["name"], diff --git a/metadata-ingestion/src/datahub/ingestion/source/lookml.py b/metadata-ingestion/src/datahub/ingestion/source/lookml.py index a08b0bf30b..19409ad210 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/lookml.py +++ b/metadata-ingestion/src/datahub/ingestion/source/lookml.py @@ -27,6 +27,7 @@ from datahub.ingestion.api.decorators import ( ) from datahub.ingestion.source.looker_common import ( LookerCommonConfig, + LookerExplore, LookerUtil, LookerViewId, ViewField, @@ -177,6 +178,10 @@ class LookMLSourceConfig(LookerCommonConfig): 512000, # 512KB should be plenty description="When extracting the view definition from a lookml file, the maximum number of characters to extract.", ) + emit_reachable_views_only: bool = Field( + False, + description="When enabled, only views that are reachable from explores defined in the model files are emitted", + ) @validator("platform_instance") def platform_instance_not_supported(cls, v: str) -> str: @@ -341,7 +346,9 @@ class LookerModel: or included_file.endswith(".dashboard.lookml") or included_file.endswith(".dashboard.lkml") ): - logger.debug(f"include '{inc}' is a dashboard, skipping it") + logger.debug( + f"include '{included_file}' is a dashboard, skipping it" + ) continue logger.debug( @@ -1081,9 +1088,10 @@ class LookMLSource(Source): str(self.source_config.base_folder), self.reporter ) - # some views can be mentioned by multiple 'include' statements, so this set is used to prevent - # creating duplicate MCE messages - processed_view_files: Set[str] = set() + # some views can be mentioned by multiple 'include' statements and can be included via different connections. + # So this set is used to prevent creating duplicate events + processed_view_map: Dict[str, Set[str]] = {} + view_connection_map: Dict[str, Tuple[str, str]] = {} # The ** means "this directory and all subdirectories", and hence should # include all the files we want. @@ -1119,20 +1127,43 @@ class LookMLSource(Source): self.reporter.report_models_dropped(model_name) continue + explore_reachable_views = set() + for explore_dict in model.explores: + explore: LookerExplore = LookerExplore.from_dict( + model_name, explore_dict + ) + if explore.upstream_views: + for view_name in explore.upstream_views: + explore_reachable_views.add(view_name) + + processed_view_files = processed_view_map.get(model.connection) + if processed_view_files is None: + processed_view_map[model.connection] = set() + processed_view_files = processed_view_map[model.connection] + project_name = self.get_project_name(model_name) + logger.debug(f"Model: {model_name}; Includes: {model.resolved_includes}") for include in model.resolved_includes: logger.debug(f"Considering {include} for model {model_name}") if include in processed_view_files: logger.debug(f"view '{include}' already processed, skipping it") continue - logger.debug(f"Attempting to load view file: {include}") looker_viewfile = viewfile_loader.load_viewfile( include, connectionDefinition, self.reporter ) if looker_viewfile is not None: for raw_view in looker_viewfile.views: + if ( + self.source_config.emit_reachable_views_only + and raw_view["name"] not in explore_reachable_views + ): + logger.debug( + f"view {raw_view['name']} is not reachable from an explore, skipping.." + ) + continue + self.reporter.report_views_scanned() try: maybe_looker_view = LookerView.from_looker_dict( @@ -1157,24 +1188,49 @@ class LookMLSource(Source): if self.source_config.view_pattern.allowed( maybe_looker_view.id.view_name ): - mce = self._build_dataset_mce(maybe_looker_view) - workunit = MetadataWorkUnit( - id=f"lookml-view-{maybe_looker_view.id}", - mce=mce, + view_connection_mapping = view_connection_map.get( + maybe_looker_view.id.view_name ) - self.reporter.report_workunit(workunit) - processed_view_files.add(include) - yield workunit - - for mcp in self._build_dataset_mcps(maybe_looker_view): - # We want to treat mcp aspects as optional, so allowing failures in this aspect to be treated as warnings rather than failures + if not view_connection_mapping: + view_connection_map[ + maybe_looker_view.id.view_name + ] = (model_name, model.connection) + # first time we are discovering this view + mce = self._build_dataset_mce(maybe_looker_view) workunit = MetadataWorkUnit( - id=f"lookml-view-{mcp.aspectName}-{maybe_looker_view.id}", - mcp=mcp, - treat_errors_as_warnings=True, + id=f"lookml-view-{maybe_looker_view.id}", + mce=mce, ) + processed_view_files.add(include) self.reporter.report_workunit(workunit) yield workunit + for mcp in self._build_dataset_mcps( + maybe_looker_view + ): + # We want to treat mcp aspects as optional, so allowing failures in this aspect to be treated as warnings rather than failures + workunit = MetadataWorkUnit( + id=f"lookml-view-{mcp.aspectName}-{maybe_looker_view.id}", + mcp=mcp, + treat_errors_as_warnings=True, + ) + self.reporter.report_workunit(workunit) + yield workunit + else: + ( + prev_model_name, + prev_model_connection, + ) = view_connection_mapping + if prev_model_connection != model.connection: + # this view has previously been discovered and emitted using a different connection + logger.warning( + f"view {maybe_looker_view.id.view_name} from model {model_name}, connection {model.connection} was previously processed via model {prev_model_name}, connection {prev_model_connection} and will likely lead to incorrect lineage to the underlying tables" + ) + if ( + not self.source_config.emit_reachable_views_only + ): + logger.warning( + "Consider enabling the `emit_reachable_views_only` flag to handle this case." + ) else: self.reporter.report_views_dropped( str(maybe_looker_view.id) diff --git a/metadata-ingestion/tests/integration/lookml/lkml_samples/data.model.lkml b/metadata-ingestion/tests/integration/lookml/lkml_samples/data.model.lkml index f28fe84d15..f733633264 100644 --- a/metadata-ingestion/tests/integration/lookml/lkml_samples/data.model.lkml +++ b/metadata-ingestion/tests/integration/lookml/lkml_samples/data.model.lkml @@ -30,4 +30,4 @@ explore: data_model { value: "day" } } -} +} \ No newline at end of file diff --git a/metadata-ingestion/tests/integration/lookml/lkml_samples/data2.model.lkml b/metadata-ingestion/tests/integration/lookml/lkml_samples/data2.model.lkml new file mode 100644 index 0000000000..b1cd88dffd --- /dev/null +++ b/metadata-ingestion/tests/integration/lookml/lkml_samples/data2.model.lkml @@ -0,0 +1,10 @@ +connection: "my_other_connection" +include: "**/*.view.lkml" + +explore: aliased_explore2 { + from: my_view2 +} + +explore: duplicate_explore { + from: my_view +} \ No newline at end of file diff --git a/metadata-ingestion/tests/integration/lookml/lkml_samples/foo2.view.lkml b/metadata-ingestion/tests/integration/lookml/lkml_samples/foo2.view.lkml new file mode 100644 index 0000000000..6edbada817 --- /dev/null +++ b/metadata-ingestion/tests/integration/lookml/lkml_samples/foo2.view.lkml @@ -0,0 +1,47 @@ +view: my_view2 { + derived_table: { + sql: + SELECT + is_latest, + country, + city, + timestamp, + measurement + FROM + my_table ;; + } + + dimension: country { + type: string + description: "The country" + sql: ${TABLE}.country ;; + } + + dimension: city { + type: string + description: "City" + sql: ${TABLE}.city ;; + } + + dimension: is_latest { + type: yesno + description: "Is latest data" + sql: ${TABLE}.is_latest ;; + } + + dimension_group: timestamp { + group_label: "Timestamp" + type: time + description: "Timestamp of measurement" + sql: ${TABLE}.timestamp ;; + timeframes: [hour, date, week, day_of_week] + } + + measure: average_measurement { + group_label: "Measurement" + type: average + description: "My measurement" + sql: ${TABLE}.measurement ;; + } + +} diff --git a/metadata-ingestion/tests/integration/lookml/lookml_reachable_views.json b/metadata-ingestion/tests/integration/lookml/lookml_reachable_views.json new file mode 100644 index 0000000000..d3e376c499 --- /dev/null +++ b/metadata-ingestion/tests/integration/lookml/lookml_reachable_views.json @@ -0,0 +1,659 @@ +[ +{ + "auditHeader": null, + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": { + "urn": "urn:li:dataset:(urn:li:dataPlatform:looker,lkml_samples.view.my_view,PROD)", + "aspects": [ + { + "com.linkedin.pegasus2avro.common.BrowsePaths": { + "paths": [ + "/prod/looker/lkml_samples/views/my_view" + ] + } + }, + { + "com.linkedin.pegasus2avro.common.Status": { + "removed": false + } + }, + { + "com.linkedin.pegasus2avro.dataset.UpstreamLineage": { + "upstreams": [ + { + "auditStamp": { + "time": 0, + "actor": "urn:li:corpuser:unknown", + "impersonator": null + }, + "dataset": "urn:li:dataset:(urn:li:dataPlatform:snowflake,warehouse.default_db.default_schema.my_table,DEV)", + "type": "VIEW" + } + ], + "fineGrainedLineages": null + } + }, + { + "com.linkedin.pegasus2avro.schema.SchemaMetadata": { + "schemaName": "my_view", + "platform": "urn:li:dataPlatform:looker", + "version": 0, + "created": { + "time": 0, + "actor": "urn:li:corpuser:unknown", + "impersonator": null + }, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:unknown", + "impersonator": null + }, + "deleted": null, + "dataset": null, + "cluster": null, + "hash": "", + "platformSchema": { + "com.linkedin.pegasus2avro.schema.OtherSchema": { + "rawSchema": "" + } + }, + "fields": [ + { + "fieldPath": "country", + "jsonPath": null, + "nullable": false, + "description": "The country", + "created": null, + "lastModified": null, + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "nativeDataType": "string", + "recursive": false, + "globalTags": { + "tags": [ + { + "tag": "urn:li:tag:Dimension", + "context": null + } + ] + }, + "glossaryTerms": null, + "isPartOfKey": false, + "isPartitioningKey": null, + "jsonProps": null + }, + { + "fieldPath": "city", + "jsonPath": null, + "nullable": false, + "description": "City", + "created": null, + "lastModified": null, + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "nativeDataType": "string", + "recursive": false, + "globalTags": { + "tags": [ + { + "tag": "urn:li:tag:Dimension", + "context": null + } + ] + }, + "glossaryTerms": null, + "isPartOfKey": false, + "isPartitioningKey": null, + "jsonProps": null + }, + { + "fieldPath": "is_latest", + "jsonPath": null, + "nullable": false, + "description": "Is latest data", + "created": null, + "lastModified": null, + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.BooleanType": {} + } + }, + "nativeDataType": "yesno", + "recursive": false, + "globalTags": { + "tags": [ + { + "tag": "urn:li:tag:Dimension", + "context": null + } + ] + }, + "glossaryTerms": null, + "isPartOfKey": false, + "isPartitioningKey": null, + "jsonProps": null + }, + { + "fieldPath": "timestamp", + "jsonPath": null, + "nullable": false, + "description": "Timestamp of measurement", + "created": null, + "lastModified": null, + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.TimeType": {} + } + }, + "nativeDataType": "time", + "recursive": false, + "globalTags": { + "tags": [ + { + "tag": "urn:li:tag:Dimension", + "context": null + }, + { + "tag": "urn:li:tag:Temporal", + "context": null + } + ] + }, + "glossaryTerms": null, + "isPartOfKey": false, + "isPartitioningKey": null, + "jsonProps": null + }, + { + "fieldPath": "average_measurement", + "jsonPath": null, + "nullable": false, + "description": "My measurement", + "created": null, + "lastModified": null, + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.NumberType": {} + } + }, + "nativeDataType": "average", + "recursive": false, + "globalTags": { + "tags": [ + { + "tag": "urn:li:tag:Measure", + "context": null + } + ] + }, + "glossaryTerms": null, + "isPartOfKey": false, + "isPartitioningKey": null, + "jsonProps": null + } + ], + "primaryKeys": [], + "foreignKeysSpecs": null, + "foreignKeys": null + } + }, + { + "com.linkedin.pegasus2avro.dataset.DatasetProperties": { + "customProperties": { + "looker.file.path": "foo.view.lkml" + }, + "externalUrl": null, + "name": "my_view", + "qualifiedName": null, + "description": null, + "uri": null, + "tags": [] + } + } + ] + } + }, + "proposedDelta": null, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "lookml-test", + "registryName": null, + "registryVersion": null, + "properties": null + } +}, +{ + "auditHeader": null, + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:looker,lkml_samples.view.my_view,PROD)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "value": "{\"typeNames\": [\"view\"]}", + "contentType": "application/json" + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "lookml-test", + "registryName": null, + "registryVersion": null, + "properties": null + } +}, +{ + "auditHeader": null, + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:looker,lkml_samples.view.my_view,PROD)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "viewProperties", + "aspect": { + "value": "{\"materialized\": false, \"viewLogic\": \"SELECT\\n is_latest,\\n country,\\n city,\\n timestamp,\\n measurement\\n FROM\\n my_table\", \"viewLanguage\": \"sql\"}", + "contentType": "application/json" + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "lookml-test", + "registryName": null, + "registryVersion": null, + "properties": null + } +}, +{ + "auditHeader": null, + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": { + "urn": "urn:li:dataset:(urn:li:dataPlatform:looker,lkml_samples.view.my_view2,PROD)", + "aspects": [ + { + "com.linkedin.pegasus2avro.common.BrowsePaths": { + "paths": [ + "/prod/looker/lkml_samples/views/my_view2" + ] + } + }, + { + "com.linkedin.pegasus2avro.common.Status": { + "removed": false + } + }, + { + "com.linkedin.pegasus2avro.dataset.UpstreamLineage": { + "upstreams": [ + { + "auditStamp": { + "time": 0, + "actor": "urn:li:corpuser:unknown", + "impersonator": null + }, + "dataset": "urn:li:dataset:(urn:li:dataPlatform:redshift,rs_warehouse.default_db.default_schema.my_table,DEV)", + "type": "VIEW" + } + ], + "fineGrainedLineages": null + } + }, + { + "com.linkedin.pegasus2avro.schema.SchemaMetadata": { + "schemaName": "my_view2", + "platform": "urn:li:dataPlatform:looker", + "version": 0, + "created": { + "time": 0, + "actor": "urn:li:corpuser:unknown", + "impersonator": null + }, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:unknown", + "impersonator": null + }, + "deleted": null, + "dataset": null, + "cluster": null, + "hash": "", + "platformSchema": { + "com.linkedin.pegasus2avro.schema.OtherSchema": { + "rawSchema": "" + } + }, + "fields": [ + { + "fieldPath": "country", + "jsonPath": null, + "nullable": false, + "description": "The country", + "created": null, + "lastModified": null, + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "nativeDataType": "string", + "recursive": false, + "globalTags": { + "tags": [ + { + "tag": "urn:li:tag:Dimension", + "context": null + } + ] + }, + "glossaryTerms": null, + "isPartOfKey": false, + "isPartitioningKey": null, + "jsonProps": null + }, + { + "fieldPath": "city", + "jsonPath": null, + "nullable": false, + "description": "City", + "created": null, + "lastModified": null, + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "nativeDataType": "string", + "recursive": false, + "globalTags": { + "tags": [ + { + "tag": "urn:li:tag:Dimension", + "context": null + } + ] + }, + "glossaryTerms": null, + "isPartOfKey": false, + "isPartitioningKey": null, + "jsonProps": null + }, + { + "fieldPath": "is_latest", + "jsonPath": null, + "nullable": false, + "description": "Is latest data", + "created": null, + "lastModified": null, + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.BooleanType": {} + } + }, + "nativeDataType": "yesno", + "recursive": false, + "globalTags": { + "tags": [ + { + "tag": "urn:li:tag:Dimension", + "context": null + } + ] + }, + "glossaryTerms": null, + "isPartOfKey": false, + "isPartitioningKey": null, + "jsonProps": null + }, + { + "fieldPath": "timestamp", + "jsonPath": null, + "nullable": false, + "description": "Timestamp of measurement", + "created": null, + "lastModified": null, + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.TimeType": {} + } + }, + "nativeDataType": "time", + "recursive": false, + "globalTags": { + "tags": [ + { + "tag": "urn:li:tag:Dimension", + "context": null + }, + { + "tag": "urn:li:tag:Temporal", + "context": null + } + ] + }, + "glossaryTerms": null, + "isPartOfKey": false, + "isPartitioningKey": null, + "jsonProps": null + }, + { + "fieldPath": "average_measurement", + "jsonPath": null, + "nullable": false, + "description": "My measurement", + "created": null, + "lastModified": null, + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.NumberType": {} + } + }, + "nativeDataType": "average", + "recursive": false, + "globalTags": { + "tags": [ + { + "tag": "urn:li:tag:Measure", + "context": null + } + ] + }, + "glossaryTerms": null, + "isPartOfKey": false, + "isPartitioningKey": null, + "jsonProps": null + } + ], + "primaryKeys": [], + "foreignKeysSpecs": null, + "foreignKeys": null + } + }, + { + "com.linkedin.pegasus2avro.dataset.DatasetProperties": { + "customProperties": { + "looker.file.path": "foo2.view.lkml" + }, + "externalUrl": null, + "name": "my_view2", + "qualifiedName": null, + "description": null, + "uri": null, + "tags": [] + } + } + ] + } + }, + "proposedDelta": null, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "lookml-test", + "registryName": null, + "registryVersion": null, + "properties": null + } +}, +{ + "auditHeader": null, + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:looker,lkml_samples.view.my_view2,PROD)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "value": "{\"typeNames\": [\"view\"]}", + "contentType": "application/json" + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "lookml-test", + "registryName": null, + "registryVersion": null, + "properties": null + } +}, +{ + "auditHeader": null, + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:looker,lkml_samples.view.my_view2,PROD)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "viewProperties", + "aspect": { + "value": "{\"materialized\": false, \"viewLogic\": \"SELECT\\n is_latest,\\n country,\\n city,\\n timestamp,\\n measurement\\n FROM\\n my_table\", \"viewLanguage\": \"sql\"}", + "contentType": "application/json" + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "lookml-test", + "registryName": null, + "registryVersion": null, + "properties": null + } +}, +{ + "auditHeader": null, + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.TagSnapshot": { + "urn": "urn:li:tag:Dimension", + "aspects": [ + { + "com.linkedin.pegasus2avro.common.Ownership": { + "owners": [ + { + "owner": "urn:li:corpuser:datahub", + "type": "DATAOWNER", + "source": null + } + ], + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:unknown", + "impersonator": null + } + } + }, + { + "com.linkedin.pegasus2avro.tag.TagProperties": { + "name": "Dimension", + "description": "A tag that is applied to all dimension fields.", + "colorHex": null + } + } + ] + } + }, + "proposedDelta": null, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "lookml-test", + "registryName": null, + "registryVersion": null, + "properties": null + } +}, +{ + "auditHeader": null, + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.TagSnapshot": { + "urn": "urn:li:tag:Temporal", + "aspects": [ + { + "com.linkedin.pegasus2avro.common.Ownership": { + "owners": [ + { + "owner": "urn:li:corpuser:datahub", + "type": "DATAOWNER", + "source": null + } + ], + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:unknown", + "impersonator": null + } + } + }, + { + "com.linkedin.pegasus2avro.tag.TagProperties": { + "name": "Temporal", + "description": "A tag that is applied to all time-based (temporal) fields such as timestamps or durations.", + "colorHex": null + } + } + ] + } + }, + "proposedDelta": null, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "lookml-test", + "registryName": null, + "registryVersion": null, + "properties": null + } +}, +{ + "auditHeader": null, + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.TagSnapshot": { + "urn": "urn:li:tag:Measure", + "aspects": [ + { + "com.linkedin.pegasus2avro.common.Ownership": { + "owners": [ + { + "owner": "urn:li:corpuser:datahub", + "type": "DATAOWNER", + "source": null + } + ], + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:unknown", + "impersonator": null + } + } + }, + { + "com.linkedin.pegasus2avro.tag.TagProperties": { + "name": "Measure", + "description": "A tag that is applied to all measures (metrics). Measures are typically the columns that you aggregate on", + "colorHex": null + } + } + ] + } + }, + "proposedDelta": null, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "lookml-test", + "registryName": null, + "registryVersion": null, + "properties": null + } +} +] \ No newline at end of file diff --git a/metadata-ingestion/tests/integration/lookml/test_lookml.py b/metadata-ingestion/tests/integration/lookml/test_lookml.py index da00d27481..b9a53614e4 100644 --- a/metadata-ingestion/tests/integration/lookml/test_lookml.py +++ b/metadata-ingestion/tests/integration/lookml/test_lookml.py @@ -39,6 +39,7 @@ def test_lookml_ingest(pytestconfig, tmp_path, mock_time): "parse_table_names_from_sql": True, "tag_measures_and_dimensions": False, "project_name": "lkml_samples", + "model_pattern": {"deny": ["data2"]}, }, }, "sink": { @@ -82,6 +83,7 @@ def test_lookml_ingest_offline(pytestconfig, tmp_path, mock_time): }, "parse_table_names_from_sql": True, "project_name": "lkml_samples", + "model_pattern": {"deny": ["data2"]}, }, }, "sink": { @@ -171,6 +173,7 @@ def test_lookml_ingest_offline_platform_instance(pytestconfig, tmp_path, mock_ti }, "parse_table_names_from_sql": True, "project_name": "lkml_samples", + "model_pattern": {"deny": ["data2"]}, }, }, "sink": { @@ -250,6 +253,7 @@ def ingestion_test( "base_url": "fake_account.looker.com", }, "parse_table_names_from_sql": True, + "model_pattern": {"deny": ["data2"]}, }, }, "sink": { @@ -342,6 +346,7 @@ def test_lookml_github_info(pytestconfig, tmp_path, mock_time): }, "parse_table_names_from_sql": True, "project_name": "lkml_samples", + "model_pattern": {"deny": ["data2"]}, "github_info": {"repo": "datahub/looker-demo", "branch": "master"}, }, }, @@ -362,3 +367,68 @@ def test_lookml_github_info(pytestconfig, tmp_path, mock_time): output_path=tmp_path / mce_out, golden_path=test_resources_dir / mce_out, ) + + +@freeze_time(FROZEN_TIME) +@pytest.mark.skipif(sys.version_info < (3, 7), reason="lkml requires Python 3.7+") +def test_reachable_views(pytestconfig, tmp_path, mock_time): + """Test for reachable views""" + test_resources_dir = pytestconfig.rootpath / "tests/integration/lookml" + mce_out = "lookml_reachable_views.json" + pipeline = Pipeline.create( + { + "run_id": "lookml-test", + "source": { + "type": "lookml", + "config": { + "base_folder": str(test_resources_dir / "lkml_samples"), + "connection_to_platform_map": { + "my_connection": { + "platform": "snowflake", + "platform_instance": "warehouse", + "platform_env": "dev", + "default_db": "default_db", + "default_schema": "default_schema", + }, + "my_other_connection": { + "platform": "redshift", + "platform_instance": "rs_warehouse", + "platform_env": "dev", + "default_db": "default_db", + "default_schema": "default_schema", + }, + }, + "parse_table_names_from_sql": True, + "project_name": "lkml_samples", + "emit_reachable_views_only": True, + }, + }, + "sink": { + "type": "file", + "config": { + "filename": f"{tmp_path}/{mce_out}", + }, + }, + } + ) + pipeline.run() + pipeline.pretty_print_summary() + pipeline.raise_from_status(raise_warnings=True) + + mce_helpers.check_golden_file( + pytestconfig, + output_path=tmp_path / mce_out, + golden_path=test_resources_dir / mce_out, + ) + + entity_urns = mce_helpers.get_entity_urns(tmp_path / mce_out) + # we should only have two views discoverable + assert len(entity_urns) == 2 + assert ( + "urn:li:dataset:(urn:li:dataPlatform:looker,lkml_samples.view.my_view,PROD)" + in entity_urns + ) + assert ( + "urn:li:dataset:(urn:li:dataPlatform:looker,lkml_samples.view.my_view2,PROD)" + in entity_urns + ) diff --git a/metadata-ingestion/tests/test_helpers/mce_helpers.py b/metadata-ingestion/tests/test_helpers/mce_helpers.py index 9da9bb7e68..cad0503103 100644 --- a/metadata-ingestion/tests/test_helpers/mce_helpers.py +++ b/metadata-ingestion/tests/test_helpers/mce_helpers.py @@ -3,7 +3,7 @@ import logging import os import pprint import shutil -from typing import Any, Callable, Dict, List, Optional, Tuple, Type, Union +from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Type, Union import deepdiff @@ -91,11 +91,23 @@ def assert_mces_equal( exclude_regex_paths=ignore_paths, ignore_order=True, ) - if clean_diff != diff: - logger.warning( - f"MCE-s differ, clean MCE-s are fine\n{pprint.pformat(diff)}" - ) + if not clean_diff: + logger.debug(f"MCE-s differ, clean MCE-s are fine\n{pprint.pformat(diff)}") diff = clean_diff + if diff: + # do some additional processing to emit helpful messages + output_urns = _get_entity_urns(output) + golden_urns = _get_entity_urns(golden) + in_golden_but_not_in_output = golden_urns - output_urns + in_output_but_not_in_golden = output_urns - golden_urns + if in_golden_but_not_in_output: + logger.info( + f"Golden file has {len(in_golden_but_not_in_output)} more urns: {in_golden_but_not_in_output}" + ) + if in_output_but_not_in_golden: + logger.info( + f"Golden file has {len(in_output_but_not_in_golden)} more urns: {in_output_but_not_in_golden}" + ) assert ( not diff @@ -192,6 +204,33 @@ def _element_matches_pattern( return (True, re.search(pattern, str(element)) is not None) +def get_entity_urns(events_file: str) -> Set[str]: + events = load_json_file(events_file) + assert isinstance(events, list) + return _get_entity_urns(events) + + +def _get_entity_urns(events_list: List[Dict]) -> Set[str]: + entity_type = "dataset" + # mce urns + mce_urns = set( + [ + _get_element(x, _get_mce_urn_path_spec(entity_type)) + for x in events_list + if _get_filter(mce=True, entity_type=entity_type)(x) + ] + ) + mcp_urns = set( + [ + _get_element(x, _get_mcp_urn_path_spec()) + for x in events_list + if _get_filter(mcp=True, entity_type=entity_type)(x) + ] + ) + all_urns = mce_urns.union(mcp_urns) + return all_urns + + def assert_mcp_entity_urn( filter: str, entity_type: str, regex_pattern: str, file: str ) -> int: