fix(ingest/lookml): emit all views with same name and different file path (#9279)

This commit is contained in:
Mayuri Nehate 2023-11-25 04:29:24 +05:30 committed by GitHub
parent 298b9becb0
commit a34fdfd8b7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 755 additions and 6 deletions

View File

@ -1982,9 +1982,16 @@ class LookMLSource(StatefulIngestionSourceBase):
self.reporter, self.reporter,
) )
# some views can be mentioned by multiple 'include' statements and can be included via different connections. # Some views can be mentioned by multiple 'include' statements and can be included via different connections.
# So this set is used to prevent creating duplicate events
# This map is used to keep track of which view files have already been processed
# for a connection in order to prevent creating duplicate events.
# Key: connection name, Value: view file paths
processed_view_map: Dict[str, Set[str]] = {} processed_view_map: Dict[str, Set[str]] = {}
# This map is used to keep track of the connection that a view is processed with.
# Key: view unique identifier - determined by variables present in config `view_naming_pattern`
# Value: Tuple(model file name, connection name)
view_connection_map: Dict[str, Tuple[str, str]] = {} view_connection_map: Dict[str, Tuple[str, str]] = {}
# The ** means "this directory and all subdirectories", and hence should # The ** means "this directory and all subdirectories", and hence should
@ -2148,13 +2155,17 @@ class LookMLSource(StatefulIngestionSourceBase):
if self.source_config.view_pattern.allowed( if self.source_config.view_pattern.allowed(
maybe_looker_view.id.view_name maybe_looker_view.id.view_name
): ):
view_urn = maybe_looker_view.id.get_urn(
self.source_config
)
view_connection_mapping = view_connection_map.get( view_connection_mapping = view_connection_map.get(
maybe_looker_view.id.view_name view_urn
) )
if not view_connection_mapping: if not view_connection_mapping:
view_connection_map[ view_connection_map[view_urn] = (
maybe_looker_view.id.view_name model_name,
] = (model_name, model.connection) model.connection,
)
# first time we are discovering this view # first time we are discovering this view
logger.debug( logger.debug(
f"Generating MCP for view {raw_view['name']}" f"Generating MCP for view {raw_view['name']}"

View File

@ -0,0 +1,7 @@
# Test fixture model: pulls in the view file under path1/ and exposes its
# `my_view` view through the explore `aliased_explore`.
connection: "my_connection"
include: "path1/foo.view.lkml"
# The explore name differs from the view name; `from` names the underlying view.
explore: aliased_explore {
from: my_view
}

View File

@ -0,0 +1,6 @@
# Test fixture model: pulls in the view file under path2/ and exposes its
# `my_view` view through the explore `duplicate_explore`.
connection: "my_connection"
include: "path2/foo.view.lkml"
# The explore name differs from the view name; `from` names the underlying view.
explore: duplicate_explore {
from: my_view
}

View File

@ -0,0 +1,47 @@
# Test fixture view `my_view`, backed by a derived table that selects from
# `my_table`. This variant declares a `country` dimension in addition to
# `city`, `is_latest`, the `timestamp` dimension group and the
# `average_measurement` measure.
# NOTE(review): presumably this is path1/foo.view.lkml, since the golden output
# lists `country` only for the path1 view - confirm against the repository.
view: my_view {
derived_table: {
sql:
SELECT
is_latest,
country,
city,
timestamp,
measurement
FROM
my_table ;;
}
dimension: country {
type: string
description: "The country"
sql: ${TABLE}.country ;;
}
dimension: city {
type: string
description: "City"
sql: ${TABLE}.city ;;
}
dimension: is_latest {
type: yesno
description: "Is latest data"
sql: ${TABLE}.is_latest ;;
}
dimension_group: timestamp {
group_label: "Timestamp"
type: time
description: "Timestamp of measurement"
sql: ${TABLE}.timestamp ;;
timeframes: [hour, date, week, day_of_week]
}
measure: average_measurement {
group_label: "Measurement"
type: average
description: "My measurement"
sql: ${TABLE}.measurement ;;
}
}

View File

@ -0,0 +1,41 @@
# Test fixture view `my_view`, backed by a derived table that selects from
# `my_table`. This variant declares `city`, `is_latest`, the `timestamp`
# dimension group and the `average_measurement` measure, but no `country`
# dimension (the derived-table SQL still selects the `country` column).
# NOTE(review): presumably this is path2/foo.view.lkml, since the golden output
# for the path2 view has no `country` field - confirm against the repository.
view: my_view {
derived_table: {
sql:
SELECT
is_latest,
country,
city,
timestamp,
measurement
FROM
my_table ;;
}
dimension: city {
type: string
description: "City"
sql: ${TABLE}.city ;;
}
dimension: is_latest {
type: yesno
description: "Is latest data"
sql: ${TABLE}.is_latest ;;
}
dimension_group: timestamp {
group_label: "Timestamp"
type: time
description: "Timestamp of measurement"
sql: ${TABLE}.timestamp ;;
timeframes: [hour, date, week, day_of_week]
}
measure: average_measurement {
group_label: "Measurement"
type: average
description: "My measurement"
sql: ${TABLE}.measurement ;;
}
}

View File

@ -0,0 +1,587 @@
[
{
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": {
"urn": "urn:li:dataset:(urn:li:dataPlatform:looker,lkml_samples.path1.foo.view.my_view,PROD)",
"aspects": [
{
"com.linkedin.pegasus2avro.common.BrowsePaths": {
"paths": [
"/prod/looker/lkml_samples/path1/foo.view.lkml/views"
]
}
},
{
"com.linkedin.pegasus2avro.common.Status": {
"removed": false
}
},
{
"com.linkedin.pegasus2avro.dataset.UpstreamLineage": {
"upstreams": [
{
"auditStamp": {
"time": 1586847600000,
"actor": "urn:li:corpuser:datahub"
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:snowflake,warehouse.default_db.default_schema.my_table,DEV)",
"type": "VIEW"
}
]
}
},
{
"com.linkedin.pegasus2avro.schema.SchemaMetadata": {
"schemaName": "my_view",
"platform": "urn:li:dataPlatform:looker",
"version": 0,
"created": {
"time": 0,
"actor": "urn:li:corpuser:unknown"
},
"lastModified": {
"time": 0,
"actor": "urn:li:corpuser:unknown"
},
"hash": "",
"platformSchema": {
"com.linkedin.pegasus2avro.schema.OtherSchema": {
"rawSchema": ""
}
},
"fields": [
{
"fieldPath": "country",
"nullable": false,
"description": "The country",
"label": "",
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.StringType": {}
}
},
"nativeDataType": "string",
"recursive": false,
"globalTags": {
"tags": [
{
"tag": "urn:li:tag:Dimension"
}
]
},
"isPartOfKey": false
},
{
"fieldPath": "city",
"nullable": false,
"description": "City",
"label": "",
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.StringType": {}
}
},
"nativeDataType": "string",
"recursive": false,
"globalTags": {
"tags": [
{
"tag": "urn:li:tag:Dimension"
}
]
},
"isPartOfKey": false
},
{
"fieldPath": "is_latest",
"nullable": false,
"description": "Is latest data",
"label": "",
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.BooleanType": {}
}
},
"nativeDataType": "yesno",
"recursive": false,
"globalTags": {
"tags": [
{
"tag": "urn:li:tag:Dimension"
}
]
},
"isPartOfKey": false
},
{
"fieldPath": "timestamp",
"nullable": false,
"description": "Timestamp of measurement",
"label": "",
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.TimeType": {}
}
},
"nativeDataType": "time",
"recursive": false,
"globalTags": {
"tags": [
{
"tag": "urn:li:tag:Dimension"
},
{
"tag": "urn:li:tag:Temporal"
}
]
},
"isPartOfKey": false
},
{
"fieldPath": "average_measurement",
"nullable": false,
"description": "My measurement",
"label": "",
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.NumberType": {}
}
},
"nativeDataType": "average",
"recursive": false,
"globalTags": {
"tags": [
{
"tag": "urn:li:tag:Measure"
}
]
},
"isPartOfKey": false
}
],
"primaryKeys": []
}
},
{
"com.linkedin.pegasus2avro.dataset.DatasetProperties": {
"customProperties": {
"looker.file.path": "path1/foo.view.lkml"
},
"name": "my_view",
"tags": []
}
}
]
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "lookml-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:looker,lkml_samples.path1.foo.view.my_view,PROD)",
"changeType": "UPSERT",
"aspectName": "subTypes",
"aspect": {
"json": {
"typeNames": [
"View"
]
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "lookml-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:looker,lkml_samples.path1.foo.view.my_view,PROD)",
"changeType": "UPSERT",
"aspectName": "viewProperties",
"aspect": {
"json": {
"materialized": false,
"viewLogic": "SELECT\n is_latest,\n country,\n city,\n timestamp,\n measurement\n FROM\n my_table",
"viewLanguage": "sql"
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "lookml-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:looker,lkml_samples.path1.foo.view.my_view,PROD)",
"changeType": "UPSERT",
"aspectName": "browsePathsV2",
"aspect": {
"json": {
"path": [
{
"id": "looker"
},
{
"id": "lkml_samples"
},
{
"id": "path1"
},
{
"id": "foo.view.lkml"
},
{
"id": "views"
}
]
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "lookml-test",
"lastRunId": "no-run-id-provided"
}
},
{
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": {
"urn": "urn:li:dataset:(urn:li:dataPlatform:looker,lkml_samples.path2.foo.view.my_view,PROD)",
"aspects": [
{
"com.linkedin.pegasus2avro.common.BrowsePaths": {
"paths": [
"/prod/looker/lkml_samples/path2/foo.view.lkml/views"
]
}
},
{
"com.linkedin.pegasus2avro.common.Status": {
"removed": false
}
},
{
"com.linkedin.pegasus2avro.dataset.UpstreamLineage": {
"upstreams": [
{
"auditStamp": {
"time": 1586847600000,
"actor": "urn:li:corpuser:datahub"
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:snowflake,warehouse.default_db.default_schema.my_table,DEV)",
"type": "VIEW"
}
]
}
},
{
"com.linkedin.pegasus2avro.schema.SchemaMetadata": {
"schemaName": "my_view",
"platform": "urn:li:dataPlatform:looker",
"version": 0,
"created": {
"time": 0,
"actor": "urn:li:corpuser:unknown"
},
"lastModified": {
"time": 0,
"actor": "urn:li:corpuser:unknown"
},
"hash": "",
"platformSchema": {
"com.linkedin.pegasus2avro.schema.OtherSchema": {
"rawSchema": ""
}
},
"fields": [
{
"fieldPath": "city",
"nullable": false,
"description": "City",
"label": "",
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.StringType": {}
}
},
"nativeDataType": "string",
"recursive": false,
"globalTags": {
"tags": [
{
"tag": "urn:li:tag:Dimension"
}
]
},
"isPartOfKey": false
},
{
"fieldPath": "is_latest",
"nullable": false,
"description": "Is latest data",
"label": "",
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.BooleanType": {}
}
},
"nativeDataType": "yesno",
"recursive": false,
"globalTags": {
"tags": [
{
"tag": "urn:li:tag:Dimension"
}
]
},
"isPartOfKey": false
},
{
"fieldPath": "timestamp",
"nullable": false,
"description": "Timestamp of measurement",
"label": "",
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.TimeType": {}
}
},
"nativeDataType": "time",
"recursive": false,
"globalTags": {
"tags": [
{
"tag": "urn:li:tag:Dimension"
},
{
"tag": "urn:li:tag:Temporal"
}
]
},
"isPartOfKey": false
},
{
"fieldPath": "average_measurement",
"nullable": false,
"description": "My measurement",
"label": "",
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.NumberType": {}
}
},
"nativeDataType": "average",
"recursive": false,
"globalTags": {
"tags": [
{
"tag": "urn:li:tag:Measure"
}
]
},
"isPartOfKey": false
}
],
"primaryKeys": []
}
},
{
"com.linkedin.pegasus2avro.dataset.DatasetProperties": {
"customProperties": {
"looker.file.path": "path2/foo.view.lkml"
},
"name": "my_view",
"tags": []
}
}
]
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "lookml-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:looker,lkml_samples.path2.foo.view.my_view,PROD)",
"changeType": "UPSERT",
"aspectName": "subTypes",
"aspect": {
"json": {
"typeNames": [
"View"
]
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "lookml-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:looker,lkml_samples.path2.foo.view.my_view,PROD)",
"changeType": "UPSERT",
"aspectName": "viewProperties",
"aspect": {
"json": {
"materialized": false,
"viewLogic": "SELECT\n is_latest,\n country,\n city,\n timestamp,\n measurement\n FROM\n my_table",
"viewLanguage": "sql"
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "lookml-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:looker,lkml_samples.path2.foo.view.my_view,PROD)",
"changeType": "UPSERT",
"aspectName": "browsePathsV2",
"aspect": {
"json": {
"path": [
{
"id": "looker"
},
{
"id": "lkml_samples"
},
{
"id": "path2"
},
{
"id": "foo.view.lkml"
},
{
"id": "views"
}
]
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "lookml-test",
"lastRunId": "no-run-id-provided"
}
},
{
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.TagSnapshot": {
"urn": "urn:li:tag:Dimension",
"aspects": [
{
"com.linkedin.pegasus2avro.tag.TagProperties": {
"name": "Dimension",
"description": "A tag that is applied to all dimension fields."
}
}
]
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "lookml-test",
"lastRunId": "no-run-id-provided"
}
},
{
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.TagSnapshot": {
"urn": "urn:li:tag:Temporal",
"aspects": [
{
"com.linkedin.pegasus2avro.tag.TagProperties": {
"name": "Temporal",
"description": "A tag that is applied to all time-based (temporal) fields such as timestamps or durations."
}
}
]
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "lookml-test",
"lastRunId": "no-run-id-provided"
}
},
{
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.TagSnapshot": {
"urn": "urn:li:tag:Measure",
"aspects": [
{
"com.linkedin.pegasus2avro.tag.TagProperties": {
"name": "Measure",
"description": "A tag that is applied to all measures (metrics). Measures are typically the columns that you aggregate on"
}
}
]
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "lookml-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "tag",
"entityUrn": "urn:li:tag:Dimension",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "lookml-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "tag",
"entityUrn": "urn:li:tag:Measure",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "lookml-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "tag",
"entityUrn": "urn:li:tag:Temporal",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "lookml-test",
"lastRunId": "no-run-id-provided"
}
}
]

View File

@ -802,3 +802,53 @@ def test_lookml_base_folder():
pydantic.ValidationError, match=r"base_folder.+not provided.+deploy_key" pydantic.ValidationError, match=r"base_folder.+not provided.+deploy_key"
): ):
LookMLSourceConfig.parse_obj({"api": fake_api}) LookMLSourceConfig.parse_obj({"api": fake_api})
@freeze_time(FROZEN_TIME)
def test_same_name_views_different_file_path(pytestconfig, tmp_path, mock_time):
    """Run LookML ingestion on the same-name-views fixture and check the output.

    The pipeline reads the ``lkml_same_name_views_different_file_path_samples``
    project with a ``view_naming_pattern`` that includes ``{file_path}``, writes
    the emitted metadata to a file under ``tmp_path``, and compares that file
    with the golden file of the same name. Warnings fail the test.
    """
    lookml_resources = pytestconfig.rootpath / "tests/integration/lookml"
    mce_out = "lookml_same_name_views_different_file_path.json"

    # Platform details used to resolve upstream tables for "my_connection".
    connection_map = {
        "my_connection": {
            "platform": "snowflake",
            "platform_instance": "warehouse",
            "platform_env": "dev",
            "default_db": "default_db",
            "default_schema": "default_schema",
        },
    }

    source_config = {
        "base_folder": str(
            lookml_resources / "lkml_same_name_views_different_file_path_samples"
        ),
        "connection_to_platform_map": connection_map,
        "parse_table_names_from_sql": True,
        "project_name": "lkml_samples",
        "process_refinements": False,
        "view_naming_pattern": "{project}.{file_path}.view.{name}",
        "view_browse_pattern": "/{env}/{platform}/{project}/{file_path}/views",
    }

    recipe = {
        "run_id": "lookml-test",
        "source": {
            "type": "lookml",
            "config": source_config,
        },
        "sink": {
            "type": "file",
            "config": {
                "filename": f"{tmp_path}/{mce_out}",
            },
        },
    }

    pipeline = Pipeline.create(recipe)
    pipeline.run()
    pipeline.pretty_print_summary()
    # Treat warnings as failures so ingestion problems surface in the test.
    pipeline.raise_from_status(raise_warnings=True)

    mce_helpers.check_golden_file(
        pytestconfig,
        output_path=tmp_path / mce_out,
        golden_path=lookml_resources / mce_out,
    )