feat(ingest): lookml - adding support for only emitting reachable views from explores (#5333)

This commit is contained in:
Shirshanka Das 2022-07-05 10:14:12 -07:00 committed by GitHub
parent afc9842e32
commit e93e4691fb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 917 additions and 33 deletions

View File

@ -162,8 +162,8 @@ class LookerDashboardSourceConfig(LookerAPIConfig, LookerCommonConfig):
@validator("external_base_url", pre=True, always=True)
def external_url_defaults_to_api_config_base_url(
cls, v: Optional[str], *, values: Dict[str, Any], **kwargs: Dict[str, Any]
) -> str:
return v or values["base_url"]
) -> Optional[str]:
return v or values.get("base_url")
@validator("platform_instance")
def platform_instance_not_supported(cls, v: str) -> str:

View File

@ -494,20 +494,23 @@ class LookerExplore:
return field_match.findall(sql_fragment)
@classmethod
def __from_dict(cls, model_name: str, dict: Dict) -> "LookerExplore":
def from_dict(cls, model_name: str, dict: Dict) -> "LookerExplore":
view_names = set()
joins = None
# always add the explore's name or the name from the from clause as the view on which this explore is built
view_names.add(dict.get("from", dict.get("name")))
if dict.get("joins", {}) != {}:
# additionally for join-based explores, pull in the linked views
assert "joins" in dict
view_names = set()
for join in dict["joins"]:
join_from = join.get("from")
view_names.add(join_from or join["name"])
sql_on = join.get("sql_on", None)
if sql_on is not None:
fields = cls._get_fields_from_sql_equality(sql_on)
joins = fields
for f in fields:
view_names.add(LookerUtil._extract_view_from_field(f))
else:
# non-join explore, get view_name from `from` field if possible, default to explore name
view_names = set(dict.get("from", dict.get("name")))
return LookerExplore(
model_name=model_name,
name=dict["name"],

View File

@ -27,6 +27,7 @@ from datahub.ingestion.api.decorators import (
)
from datahub.ingestion.source.looker_common import (
LookerCommonConfig,
LookerExplore,
LookerUtil,
LookerViewId,
ViewField,
@ -177,6 +178,10 @@ class LookMLSourceConfig(LookerCommonConfig):
512000, # 512KB should be plenty
description="When extracting the view definition from a lookml file, the maximum number of characters to extract.",
)
emit_reachable_views_only: bool = Field(
False,
description="When enabled, only views that are reachable from explores defined in the model files are emitted",
)
@validator("platform_instance")
def platform_instance_not_supported(cls, v: str) -> str:
@ -341,7 +346,9 @@ class LookerModel:
or included_file.endswith(".dashboard.lookml")
or included_file.endswith(".dashboard.lkml")
):
logger.debug(f"include '{inc}' is a dashboard, skipping it")
logger.debug(
f"include '{included_file}' is a dashboard, skipping it"
)
continue
logger.debug(
@ -1081,9 +1088,10 @@ class LookMLSource(Source):
str(self.source_config.base_folder), self.reporter
)
# some views can be mentioned by multiple 'include' statements, so this set is used to prevent
# creating duplicate MCE messages
processed_view_files: Set[str] = set()
# some views can be mentioned by multiple 'include' statements and can be included via different connections.
# So this map (connection -> set of already-processed view files) is used to prevent creating duplicate events
processed_view_map: Dict[str, Set[str]] = {}
view_connection_map: Dict[str, Tuple[str, str]] = {}
# The ** means "this directory and all subdirectories", and hence should
# include all the files we want.
@ -1119,20 +1127,43 @@ class LookMLSource(Source):
self.reporter.report_models_dropped(model_name)
continue
explore_reachable_views = set()
for explore_dict in model.explores:
explore: LookerExplore = LookerExplore.from_dict(
model_name, explore_dict
)
if explore.upstream_views:
for view_name in explore.upstream_views:
explore_reachable_views.add(view_name)
processed_view_files = processed_view_map.get(model.connection)
if processed_view_files is None:
processed_view_map[model.connection] = set()
processed_view_files = processed_view_map[model.connection]
project_name = self.get_project_name(model_name)
logger.debug(f"Model: {model_name}; Includes: {model.resolved_includes}")
for include in model.resolved_includes:
logger.debug(f"Considering {include} for model {model_name}")
if include in processed_view_files:
logger.debug(f"view '{include}' already processed, skipping it")
continue
logger.debug(f"Attempting to load view file: {include}")
looker_viewfile = viewfile_loader.load_viewfile(
include, connectionDefinition, self.reporter
)
if looker_viewfile is not None:
for raw_view in looker_viewfile.views:
if (
self.source_config.emit_reachable_views_only
and raw_view["name"] not in explore_reachable_views
):
logger.debug(
f"view {raw_view['name']} is not reachable from an explore, skipping.."
)
continue
self.reporter.report_views_scanned()
try:
maybe_looker_view = LookerView.from_looker_dict(
@ -1157,24 +1188,49 @@ class LookMLSource(Source):
if self.source_config.view_pattern.allowed(
maybe_looker_view.id.view_name
):
mce = self._build_dataset_mce(maybe_looker_view)
workunit = MetadataWorkUnit(
id=f"lookml-view-{maybe_looker_view.id}",
mce=mce,
view_connection_mapping = view_connection_map.get(
maybe_looker_view.id.view_name
)
self.reporter.report_workunit(workunit)
processed_view_files.add(include)
yield workunit
for mcp in self._build_dataset_mcps(maybe_looker_view):
# We want to treat mcp aspects as optional, so allowing failures in this aspect to be treated as warnings rather than failures
if not view_connection_mapping:
view_connection_map[
maybe_looker_view.id.view_name
] = (model_name, model.connection)
# first time we are discovering this view
mce = self._build_dataset_mce(maybe_looker_view)
workunit = MetadataWorkUnit(
id=f"lookml-view-{mcp.aspectName}-{maybe_looker_view.id}",
mcp=mcp,
treat_errors_as_warnings=True,
id=f"lookml-view-{maybe_looker_view.id}",
mce=mce,
)
processed_view_files.add(include)
self.reporter.report_workunit(workunit)
yield workunit
for mcp in self._build_dataset_mcps(
maybe_looker_view
):
# We want to treat mcp aspects as optional, so allowing failures in this aspect to be treated as warnings rather than failures
workunit = MetadataWorkUnit(
id=f"lookml-view-{mcp.aspectName}-{maybe_looker_view.id}",
mcp=mcp,
treat_errors_as_warnings=True,
)
self.reporter.report_workunit(workunit)
yield workunit
else:
(
prev_model_name,
prev_model_connection,
) = view_connection_mapping
if prev_model_connection != model.connection:
# this view has previously been discovered and emitted using a different connection
logger.warning(
f"view {maybe_looker_view.id.view_name} from model {model_name}, connection {model.connection} was previously processed via model {prev_model_name}, connection {prev_model_connection} and will likely lead to incorrect lineage to the underlying tables"
)
if (
not self.source_config.emit_reachable_views_only
):
logger.warning(
"Consider enabling the `emit_reachable_views_only` flag to handle this case."
)
else:
self.reporter.report_views_dropped(
str(maybe_looker_view.id)

View File

@ -30,4 +30,4 @@ explore: data_model {
value: "day"
}
}
}
}

View File

@ -0,0 +1,10 @@
# Model bound to the "my_other_connection" connection; pulls in every *.view.lkml file.
connection: "my_other_connection"
include: "**/*.view.lkml"
# Explore whose underlying view is given by `from` (my_view2), not by the explore's own name.
explore: aliased_explore2 {
from: my_view2
}
# Explore whose underlying view is my_view.
explore: duplicate_explore {
from: my_view
}

View File

@ -0,0 +1,47 @@
# View backed by a derived table that selects five columns from my_table.
view: my_view2 {
derived_table: {
sql:
SELECT
is_latest,
country,
city,
timestamp,
measurement
FROM
my_table ;;
}
# Dimensions exposing the derived table's columns.
dimension: country {
type: string
description: "The country"
sql: ${TABLE}.country ;;
}
dimension: city {
type: string
description: "City"
sql: ${TABLE}.city ;;
}
dimension: is_latest {
type: yesno
description: "Is latest data"
sql: ${TABLE}.is_latest ;;
}
# Time-based dimension group over the timestamp column.
dimension_group: timestamp {
group_label: "Timestamp"
type: time
description: "Timestamp of measurement"
sql: ${TABLE}.timestamp ;;
timeframes: [hour, date, week, day_of_week]
}
# Average of the measurement column.
measure: average_measurement {
group_label: "Measurement"
type: average
description: "My measurement"
sql: ${TABLE}.measurement ;;
}
}

View File

@ -0,0 +1,659 @@
[
{
"auditHeader": null,
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": {
"urn": "urn:li:dataset:(urn:li:dataPlatform:looker,lkml_samples.view.my_view,PROD)",
"aspects": [
{
"com.linkedin.pegasus2avro.common.BrowsePaths": {
"paths": [
"/prod/looker/lkml_samples/views/my_view"
]
}
},
{
"com.linkedin.pegasus2avro.common.Status": {
"removed": false
}
},
{
"com.linkedin.pegasus2avro.dataset.UpstreamLineage": {
"upstreams": [
{
"auditStamp": {
"time": 0,
"actor": "urn:li:corpuser:unknown",
"impersonator": null
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:snowflake,warehouse.default_db.default_schema.my_table,DEV)",
"type": "VIEW"
}
],
"fineGrainedLineages": null
}
},
{
"com.linkedin.pegasus2avro.schema.SchemaMetadata": {
"schemaName": "my_view",
"platform": "urn:li:dataPlatform:looker",
"version": 0,
"created": {
"time": 0,
"actor": "urn:li:corpuser:unknown",
"impersonator": null
},
"lastModified": {
"time": 0,
"actor": "urn:li:corpuser:unknown",
"impersonator": null
},
"deleted": null,
"dataset": null,
"cluster": null,
"hash": "",
"platformSchema": {
"com.linkedin.pegasus2avro.schema.OtherSchema": {
"rawSchema": ""
}
},
"fields": [
{
"fieldPath": "country",
"jsonPath": null,
"nullable": false,
"description": "The country",
"created": null,
"lastModified": null,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.StringType": {}
}
},
"nativeDataType": "string",
"recursive": false,
"globalTags": {
"tags": [
{
"tag": "urn:li:tag:Dimension",
"context": null
}
]
},
"glossaryTerms": null,
"isPartOfKey": false,
"isPartitioningKey": null,
"jsonProps": null
},
{
"fieldPath": "city",
"jsonPath": null,
"nullable": false,
"description": "City",
"created": null,
"lastModified": null,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.StringType": {}
}
},
"nativeDataType": "string",
"recursive": false,
"globalTags": {
"tags": [
{
"tag": "urn:li:tag:Dimension",
"context": null
}
]
},
"glossaryTerms": null,
"isPartOfKey": false,
"isPartitioningKey": null,
"jsonProps": null
},
{
"fieldPath": "is_latest",
"jsonPath": null,
"nullable": false,
"description": "Is latest data",
"created": null,
"lastModified": null,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.BooleanType": {}
}
},
"nativeDataType": "yesno",
"recursive": false,
"globalTags": {
"tags": [
{
"tag": "urn:li:tag:Dimension",
"context": null
}
]
},
"glossaryTerms": null,
"isPartOfKey": false,
"isPartitioningKey": null,
"jsonProps": null
},
{
"fieldPath": "timestamp",
"jsonPath": null,
"nullable": false,
"description": "Timestamp of measurement",
"created": null,
"lastModified": null,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.TimeType": {}
}
},
"nativeDataType": "time",
"recursive": false,
"globalTags": {
"tags": [
{
"tag": "urn:li:tag:Dimension",
"context": null
},
{
"tag": "urn:li:tag:Temporal",
"context": null
}
]
},
"glossaryTerms": null,
"isPartOfKey": false,
"isPartitioningKey": null,
"jsonProps": null
},
{
"fieldPath": "average_measurement",
"jsonPath": null,
"nullable": false,
"description": "My measurement",
"created": null,
"lastModified": null,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.NumberType": {}
}
},
"nativeDataType": "average",
"recursive": false,
"globalTags": {
"tags": [
{
"tag": "urn:li:tag:Measure",
"context": null
}
]
},
"glossaryTerms": null,
"isPartOfKey": false,
"isPartitioningKey": null,
"jsonProps": null
}
],
"primaryKeys": [],
"foreignKeysSpecs": null,
"foreignKeys": null
}
},
{
"com.linkedin.pegasus2avro.dataset.DatasetProperties": {
"customProperties": {
"looker.file.path": "foo.view.lkml"
},
"externalUrl": null,
"name": "my_view",
"qualifiedName": null,
"description": null,
"uri": null,
"tags": []
}
}
]
}
},
"proposedDelta": null,
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "lookml-test",
"registryName": null,
"registryVersion": null,
"properties": null
}
},
{
"auditHeader": null,
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:looker,lkml_samples.view.my_view,PROD)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "subTypes",
"aspect": {
"value": "{\"typeNames\": [\"view\"]}",
"contentType": "application/json"
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "lookml-test",
"registryName": null,
"registryVersion": null,
"properties": null
}
},
{
"auditHeader": null,
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:looker,lkml_samples.view.my_view,PROD)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "viewProperties",
"aspect": {
"value": "{\"materialized\": false, \"viewLogic\": \"SELECT\\n is_latest,\\n country,\\n city,\\n timestamp,\\n measurement\\n FROM\\n my_table\", \"viewLanguage\": \"sql\"}",
"contentType": "application/json"
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "lookml-test",
"registryName": null,
"registryVersion": null,
"properties": null
}
},
{
"auditHeader": null,
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": {
"urn": "urn:li:dataset:(urn:li:dataPlatform:looker,lkml_samples.view.my_view2,PROD)",
"aspects": [
{
"com.linkedin.pegasus2avro.common.BrowsePaths": {
"paths": [
"/prod/looker/lkml_samples/views/my_view2"
]
}
},
{
"com.linkedin.pegasus2avro.common.Status": {
"removed": false
}
},
{
"com.linkedin.pegasus2avro.dataset.UpstreamLineage": {
"upstreams": [
{
"auditStamp": {
"time": 0,
"actor": "urn:li:corpuser:unknown",
"impersonator": null
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:redshift,rs_warehouse.default_db.default_schema.my_table,DEV)",
"type": "VIEW"
}
],
"fineGrainedLineages": null
}
},
{
"com.linkedin.pegasus2avro.schema.SchemaMetadata": {
"schemaName": "my_view2",
"platform": "urn:li:dataPlatform:looker",
"version": 0,
"created": {
"time": 0,
"actor": "urn:li:corpuser:unknown",
"impersonator": null
},
"lastModified": {
"time": 0,
"actor": "urn:li:corpuser:unknown",
"impersonator": null
},
"deleted": null,
"dataset": null,
"cluster": null,
"hash": "",
"platformSchema": {
"com.linkedin.pegasus2avro.schema.OtherSchema": {
"rawSchema": ""
}
},
"fields": [
{
"fieldPath": "country",
"jsonPath": null,
"nullable": false,
"description": "The country",
"created": null,
"lastModified": null,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.StringType": {}
}
},
"nativeDataType": "string",
"recursive": false,
"globalTags": {
"tags": [
{
"tag": "urn:li:tag:Dimension",
"context": null
}
]
},
"glossaryTerms": null,
"isPartOfKey": false,
"isPartitioningKey": null,
"jsonProps": null
},
{
"fieldPath": "city",
"jsonPath": null,
"nullable": false,
"description": "City",
"created": null,
"lastModified": null,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.StringType": {}
}
},
"nativeDataType": "string",
"recursive": false,
"globalTags": {
"tags": [
{
"tag": "urn:li:tag:Dimension",
"context": null
}
]
},
"glossaryTerms": null,
"isPartOfKey": false,
"isPartitioningKey": null,
"jsonProps": null
},
{
"fieldPath": "is_latest",
"jsonPath": null,
"nullable": false,
"description": "Is latest data",
"created": null,
"lastModified": null,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.BooleanType": {}
}
},
"nativeDataType": "yesno",
"recursive": false,
"globalTags": {
"tags": [
{
"tag": "urn:li:tag:Dimension",
"context": null
}
]
},
"glossaryTerms": null,
"isPartOfKey": false,
"isPartitioningKey": null,
"jsonProps": null
},
{
"fieldPath": "timestamp",
"jsonPath": null,
"nullable": false,
"description": "Timestamp of measurement",
"created": null,
"lastModified": null,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.TimeType": {}
}
},
"nativeDataType": "time",
"recursive": false,
"globalTags": {
"tags": [
{
"tag": "urn:li:tag:Dimension",
"context": null
},
{
"tag": "urn:li:tag:Temporal",
"context": null
}
]
},
"glossaryTerms": null,
"isPartOfKey": false,
"isPartitioningKey": null,
"jsonProps": null
},
{
"fieldPath": "average_measurement",
"jsonPath": null,
"nullable": false,
"description": "My measurement",
"created": null,
"lastModified": null,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.NumberType": {}
}
},
"nativeDataType": "average",
"recursive": false,
"globalTags": {
"tags": [
{
"tag": "urn:li:tag:Measure",
"context": null
}
]
},
"glossaryTerms": null,
"isPartOfKey": false,
"isPartitioningKey": null,
"jsonProps": null
}
],
"primaryKeys": [],
"foreignKeysSpecs": null,
"foreignKeys": null
}
},
{
"com.linkedin.pegasus2avro.dataset.DatasetProperties": {
"customProperties": {
"looker.file.path": "foo2.view.lkml"
},
"externalUrl": null,
"name": "my_view2",
"qualifiedName": null,
"description": null,
"uri": null,
"tags": []
}
}
]
}
},
"proposedDelta": null,
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "lookml-test",
"registryName": null,
"registryVersion": null,
"properties": null
}
},
{
"auditHeader": null,
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:looker,lkml_samples.view.my_view2,PROD)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "subTypes",
"aspect": {
"value": "{\"typeNames\": [\"view\"]}",
"contentType": "application/json"
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "lookml-test",
"registryName": null,
"registryVersion": null,
"properties": null
}
},
{
"auditHeader": null,
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:looker,lkml_samples.view.my_view2,PROD)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "viewProperties",
"aspect": {
"value": "{\"materialized\": false, \"viewLogic\": \"SELECT\\n is_latest,\\n country,\\n city,\\n timestamp,\\n measurement\\n FROM\\n my_table\", \"viewLanguage\": \"sql\"}",
"contentType": "application/json"
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "lookml-test",
"registryName": null,
"registryVersion": null,
"properties": null
}
},
{
"auditHeader": null,
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.TagSnapshot": {
"urn": "urn:li:tag:Dimension",
"aspects": [
{
"com.linkedin.pegasus2avro.common.Ownership": {
"owners": [
{
"owner": "urn:li:corpuser:datahub",
"type": "DATAOWNER",
"source": null
}
],
"lastModified": {
"time": 0,
"actor": "urn:li:corpuser:unknown",
"impersonator": null
}
}
},
{
"com.linkedin.pegasus2avro.tag.TagProperties": {
"name": "Dimension",
"description": "A tag that is applied to all dimension fields.",
"colorHex": null
}
}
]
}
},
"proposedDelta": null,
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "lookml-test",
"registryName": null,
"registryVersion": null,
"properties": null
}
},
{
"auditHeader": null,
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.TagSnapshot": {
"urn": "urn:li:tag:Temporal",
"aspects": [
{
"com.linkedin.pegasus2avro.common.Ownership": {
"owners": [
{
"owner": "urn:li:corpuser:datahub",
"type": "DATAOWNER",
"source": null
}
],
"lastModified": {
"time": 0,
"actor": "urn:li:corpuser:unknown",
"impersonator": null
}
}
},
{
"com.linkedin.pegasus2avro.tag.TagProperties": {
"name": "Temporal",
"description": "A tag that is applied to all time-based (temporal) fields such as timestamps or durations.",
"colorHex": null
}
}
]
}
},
"proposedDelta": null,
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "lookml-test",
"registryName": null,
"registryVersion": null,
"properties": null
}
},
{
"auditHeader": null,
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.TagSnapshot": {
"urn": "urn:li:tag:Measure",
"aspects": [
{
"com.linkedin.pegasus2avro.common.Ownership": {
"owners": [
{
"owner": "urn:li:corpuser:datahub",
"type": "DATAOWNER",
"source": null
}
],
"lastModified": {
"time": 0,
"actor": "urn:li:corpuser:unknown",
"impersonator": null
}
}
},
{
"com.linkedin.pegasus2avro.tag.TagProperties": {
"name": "Measure",
"description": "A tag that is applied to all measures (metrics). Measures are typically the columns that you aggregate on",
"colorHex": null
}
}
]
}
},
"proposedDelta": null,
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "lookml-test",
"registryName": null,
"registryVersion": null,
"properties": null
}
}
]

View File

@ -39,6 +39,7 @@ def test_lookml_ingest(pytestconfig, tmp_path, mock_time):
"parse_table_names_from_sql": True,
"tag_measures_and_dimensions": False,
"project_name": "lkml_samples",
"model_pattern": {"deny": ["data2"]},
},
},
"sink": {
@ -82,6 +83,7 @@ def test_lookml_ingest_offline(pytestconfig, tmp_path, mock_time):
},
"parse_table_names_from_sql": True,
"project_name": "lkml_samples",
"model_pattern": {"deny": ["data2"]},
},
},
"sink": {
@ -171,6 +173,7 @@ def test_lookml_ingest_offline_platform_instance(pytestconfig, tmp_path, mock_ti
},
"parse_table_names_from_sql": True,
"project_name": "lkml_samples",
"model_pattern": {"deny": ["data2"]},
},
},
"sink": {
@ -250,6 +253,7 @@ def ingestion_test(
"base_url": "fake_account.looker.com",
},
"parse_table_names_from_sql": True,
"model_pattern": {"deny": ["data2"]},
},
},
"sink": {
@ -342,6 +346,7 @@ def test_lookml_github_info(pytestconfig, tmp_path, mock_time):
},
"parse_table_names_from_sql": True,
"project_name": "lkml_samples",
"model_pattern": {"deny": ["data2"]},
"github_info": {"repo": "datahub/looker-demo", "branch": "master"},
},
},
@ -362,3 +367,68 @@ def test_lookml_github_info(pytestconfig, tmp_path, mock_time):
output_path=tmp_path / mce_out,
golden_path=test_resources_dir / mce_out,
)
@freeze_time(FROZEN_TIME)
@pytest.mark.skipif(sys.version_info < (3, 7), reason="lkml requires Python 3.7+")
def test_reachable_views(pytestconfig, tmp_path, mock_time):
    """Ingest lkml_samples with `emit_reachable_views_only` enabled.

    The run output is compared against the golden file, and the set of
    dataset urns in the output must be exactly the two views `my_view`
    and `my_view2`.
    """
    test_resources_dir = pytestconfig.rootpath / "tests/integration/lookml"
    mce_out = "lookml_reachable_views.json"
    produced_file = tmp_path / mce_out

    # One platform mapping per connection name used by the sample models.
    snowflake_mapping = {
        "platform": "snowflake",
        "platform_instance": "warehouse",
        "platform_env": "dev",
        "default_db": "default_db",
        "default_schema": "default_schema",
    }
    redshift_mapping = {
        "platform": "redshift",
        "platform_instance": "rs_warehouse",
        "platform_env": "dev",
        "default_db": "default_db",
        "default_schema": "default_schema",
    }
    source_config = {
        "base_folder": str(test_resources_dir / "lkml_samples"),
        "connection_to_platform_map": {
            "my_connection": snowflake_mapping,
            "my_other_connection": redshift_mapping,
        },
        "parse_table_names_from_sql": True,
        "project_name": "lkml_samples",
        "emit_reachable_views_only": True,
    }
    recipe = {
        "run_id": "lookml-test",
        "source": {
            "type": "lookml",
            "config": source_config,
        },
        "sink": {
            "type": "file",
            "config": {
                "filename": f"{tmp_path}/{mce_out}",
            },
        },
    }

    pipeline = Pipeline.create(recipe)
    pipeline.run()
    pipeline.pretty_print_summary()
    pipeline.raise_from_status(raise_warnings=True)

    mce_helpers.check_golden_file(
        pytestconfig,
        output_path=produced_file,
        golden_path=test_resources_dir / mce_out,
    )

    entity_urns = mce_helpers.get_entity_urns(produced_file)
    # Only two views should be discoverable.
    expected_urns = (
        "urn:li:dataset:(urn:li:dataPlatform:looker,lkml_samples.view.my_view,PROD)",
        "urn:li:dataset:(urn:li:dataPlatform:looker,lkml_samples.view.my_view2,PROD)",
    )
    assert len(entity_urns) == 2
    for expected_urn in expected_urns:
        assert expected_urn in entity_urns

View File

@ -3,7 +3,7 @@ import logging
import os
import pprint
import shutil
from typing import Any, Callable, Dict, List, Optional, Tuple, Type, Union
from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Type, Union
import deepdiff
@ -91,11 +91,23 @@ def assert_mces_equal(
exclude_regex_paths=ignore_paths,
ignore_order=True,
)
if clean_diff != diff:
logger.warning(
f"MCE-s differ, clean MCE-s are fine\n{pprint.pformat(diff)}"
)
if not clean_diff:
logger.debug(f"MCE-s differ, clean MCE-s are fine\n{pprint.pformat(diff)}")
diff = clean_diff
if diff:
    # do some additional processing to emit helpful messages:
    # report the urns that appear on only one side of the comparison so the
    # assertion that follows is easier to interpret.
    output_urns = _get_entity_urns(output)
    golden_urns = _get_entity_urns(golden)
    in_golden_but_not_in_output = golden_urns - output_urns
    in_output_but_not_in_golden = output_urns - golden_urns
    if in_golden_but_not_in_output:
        logger.info(
            f"Golden file has {len(in_golden_but_not_in_output)} more urns: {in_golden_but_not_in_output}"
        )
    if in_output_but_not_in_golden:
        # These urns are extra in the output, so name the output file here
        # (the message previously said "Golden file" for both directions).
        logger.info(
            f"Output file has {len(in_output_but_not_in_golden)} more urns: {in_output_but_not_in_golden}"
        )
assert (
not diff
@ -192,6 +204,33 @@ def _element_matches_pattern(
return (True, re.search(pattern, str(element)) is not None)
def get_entity_urns(events_file: str) -> Set[str]:
    """Load the events in `events_file` and return the entity urns found in them.

    The loaded JSON content must be a list; the urn extraction itself is
    delegated to `_get_entity_urns`.
    """
    loaded_events = load_json_file(events_file)
    assert isinstance(loaded_events, list)
    return _get_entity_urns(loaded_events)
def _get_entity_urns(events_list: List[Dict]) -> Set[str]:
    """Return the urns of all "dataset" events in `events_list`.

    Events accepted by the MCE filter contribute the element at the MCE urn
    path; events accepted by the MCP filter contribute the element at the
    MCP urn path. The result is the union of both groups.
    """
    entity_type = "dataset"
    # urns carried by MCE events
    urns_from_mces = {
        _get_element(event, _get_mce_urn_path_spec(entity_type))
        for event in events_list
        if _get_filter(mce=True, entity_type=entity_type)(event)
    }
    # urns carried by MCP events
    urns_from_mcps = {
        _get_element(event, _get_mcp_urn_path_spec())
        for event in events_list
        if _get_filter(mcp=True, entity_type=entity_type)(event)
    }
    return urns_from_mces | urns_from_mcps
def assert_mcp_entity_urn(
filter: str, entity_type: str, regex_pattern: str, file: str
) -> int: