Revert "fix(ingest): Use lower-case dataset names in the dataset urns for all SQL-styled datasets. (#4140)" (#4218)

This reverts commit 6c75185445bbb23974932ff64cb142ee6bf5b51b.
Ravindra Lanka 2022-02-22 16:21:40 -08:00 committed by GitHub
parent 84005d3848
commit 7f4cb87c57
6 changed files with 998 additions and 1118 deletions
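
For orientation, the behavioral change being undone here, sketched from the hunks below (the example URNs match the updated golden files; this is an illustration, not part of the commit):

import datahub.emitter.mce_builder as builder

# With #4140 applied (now reverted), dataset names for SQL-style platforms
# (athena, bigquery, hive, oracle, postgres, snowflake, ...) were lower-cased:
#   builder.make_dataset_urn("oracle", "Oracle Eloqua.contacts")
#   -> "urn:li:dataset:(urn:li:dataPlatform:oracle,oracle eloqua.contacts,PROD)"
# After this revert, the dataset name is emitted exactly as given:
#   builder.make_dataset_urn("oracle", "Oracle Eloqua.contacts")
#   -> "urn:li:dataset:(urn:li:dataPlatform:oracle,Oracle Eloqua.contacts,PROD)"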

View File

@@ -5,7 +5,7 @@ import re
import time
from enum import Enum
from hashlib import md5
from typing import Any, List, Optional, Set, Type, TypeVar, Union, cast, get_type_hints
from typing import Any, List, Optional, Type, TypeVar, Union, cast, get_type_hints
import typing_inspect
from avrogen.dict_wrapper import DictWrapper
@@ -35,20 +35,7 @@ from datahub.metadata.schema_classes import (
DEFAULT_ENV = DEFAULT_ENV_CONFIGURATION
DEFAULT_FLOW_CLUSTER = "prod"
UNKNOWN_USER = "urn:li:corpuser:unknown"
SQL_STYLE_PLATFORMS: Set[str] = {
"athena",
"bigquery",
"druid",
"hive",
"mariadb",
"mssql",
"mysql",
"oracle",
"postgres",
"redshift",
"snowflake",
"trino",
}
logger = logging.getLogger(__name__)
@@ -70,9 +57,6 @@ def make_data_platform_urn(platform: str) -> str:
def make_dataset_urn(platform: str, name: str, env: str = DEFAULT_ENV) -> str:
# Use lower-case name for all SQL style datasets
if platform in SQL_STYLE_PLATFORMS:
name = name.lower()
return f"urn:li:dataset:({make_data_platform_urn(platform)},{name},{env})"
@@ -86,12 +70,7 @@ def make_dataplatform_instance_urn(platform: str, instance: str) -> str:
def make_dataset_urn_with_platform_instance(
platform: str, name: str, platform_instance: Optional[str], env: str = DEFAULT_ENV
) -> str:
if platform_instance:
# Use lower-case name for all SQL style datasets
if platform in SQL_STYLE_PLATFORMS:
name = name.lower()
return f"urn:li:dataset:({make_data_platform_urn(platform)},{platform_instance}.{name},{env})"
else:
return make_dataset_urn(platform=platform, name=name, env=env)
@@ -211,6 +190,7 @@ def make_domain_urn(domain: str) -> str:
def make_ml_primary_key_urn(feature_table_name: str, primary_key_name: str) -> str:
return f"urn:li:mlPrimaryKey:({feature_table_name},{primary_key_name})"
@@ -218,6 +198,7 @@ def make_ml_feature_urn(
feature_table_name: str,
feature_name: str,
) -> str:
return f"urn:li:mlFeature:({feature_table_name},{feature_name})"

View File

@@ -67791,7 +67791,7 @@
"changeType": "UPSERT",
"aspectName": "upstreamLineage",
"aspect": {
"value": "{\"upstreams\": [{\"auditStamp\": {\"time\": 0, \"actor\": \"urn:li:corpuser:unknown\"}, \"dataset\": \"urn:li:dataset:(urn:li:dataPlatform:oracle,oracle eloqua.activitiesall,PROD)\", \"type\": \"TRANSFORMED\"}, {\"auditStamp\": {\"time\": 0, \"actor\": \"urn:li:corpuser:unknown\"}, \"dataset\": \"urn:li:dataset:(urn:li:dataPlatform:oracle,oracle eloqua.contacts,PROD)\", \"type\": \"TRANSFORMED\"}]}",
"value": "{\"upstreams\": [{\"auditStamp\": {\"time\": 0, \"actor\": \"urn:li:corpuser:unknown\"}, \"dataset\": \"urn:li:dataset:(urn:li:dataPlatform:oracle,Oracle Eloqua.activitiesAll,PROD)\", \"type\": \"TRANSFORMED\"}, {\"auditStamp\": {\"time\": 0, \"actor\": \"urn:li:corpuser:unknown\"}, \"dataset\": \"urn:li:dataset:(urn:li:dataPlatform:oracle,Oracle Eloqua.contacts,PROD)\", \"type\": \"TRANSFORMED\"}]}",
"contentType": "application/json"
},
"systemMetadata": {
@@ -87188,7 +87188,7 @@
"auditHeader": null,
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": {
"urn": "urn:li:dataset:(urn:li:dataPlatform:oracle,oracle eloqua.activitiesall,PROD)",
"urn": "urn:li:dataset:(urn:li:dataPlatform:oracle,Oracle Eloqua.activitiesAll,PROD)",
"aspects": [
{
"com.linkedin.pegasus2avro.common.BrowsePaths": {
@@ -87703,7 +87703,7 @@
"auditHeader": null,
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": {
"urn": "urn:li:dataset:(urn:li:dataPlatform:oracle,oracle eloqua.contacts,PROD)",
"urn": "urn:li:dataset:(urn:li:dataPlatform:oracle,Oracle Eloqua.contacts,PROD)",
"aspects": [
{
"com.linkedin.pegasus2avro.common.BrowsePaths": {
@@ -104195,7 +104195,7 @@
"auditHeader": null,
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": {
"urn": "urn:li:dataset:(urn:li:dataPlatform:oracle,oracle eloqua.activitiesall,PROD)",
"urn": "urn:li:dataset:(urn:li:dataPlatform:oracle,Oracle Eloqua.activitiesAll,PROD)",
"aspects": [
{
"com.linkedin.pegasus2avro.common.BrowsePaths": {
@@ -104710,7 +104710,7 @@
"auditHeader": null,
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": {
"urn": "urn:li:dataset:(urn:li:dataPlatform:oracle,oracle eloqua.contacts,PROD)",
"urn": "urn:li:dataset:(urn:li:dataPlatform:oracle,Oracle Eloqua.contacts,PROD)",
"aspects": [
{
"com.linkedin.pegasus2avro.common.BrowsePaths": {
@@ -124265,7 +124265,7 @@
"auditHeader": null,
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": {
"urn": "urn:li:dataset:(urn:li:dataPlatform:oracle,oracle eloqua.activitiesall,PROD)",
"urn": "urn:li:dataset:(urn:li:dataPlatform:oracle,Oracle Eloqua.activitiesAll,PROD)",
"aspects": [
{
"com.linkedin.pegasus2avro.common.BrowsePaths": {
@@ -124780,7 +124780,7 @@
"auditHeader": null,
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": {
"urn": "urn:li:dataset:(urn:li:dataPlatform:oracle,oracle eloqua.contacts,PROD)",
"urn": "urn:li:dataset:(urn:li:dataPlatform:oracle,Oracle Eloqua.contacts,PROD)",
"aspects": [
{
"com.linkedin.pegasus2avro.common.BrowsePaths": {

View File

@@ -1,7 +1,3 @@
from typing import Dict, Optional, Tuple
import pytest
import datahub.emitter.mce_builder as builder
from datahub.metadata.schema_classes import (
DataFlowInfoClass,
@@ -25,83 +21,3 @@ def test_can_add_aspect():
assert builder.can_add_aspect(dataset_mce, DatasetPropertiesClass)
assert builder.can_add_aspect(dataset_mce, OwnershipClass)
assert not builder.can_add_aspect(dataset_mce, DataFlowInfoClass)
test_make_dataset_urns_params: Dict[
str, Tuple[Tuple[str, str, Optional[str], str], str]
] = {
"athena": (
("athena", "ATABLE", "MY_INSTANCE", "PROD"),
"urn:li:dataset:(urn:li:dataPlatform:athena,MY_INSTANCE.atable,PROD)",
),
"bigquery": (
("bigquery", "ATable", "MY_INSTANCE", "PROD"),
"urn:li:dataset:(urn:li:dataPlatform:bigquery,MY_INSTANCE.atable,PROD)",
),
"bigquery_no_instance": (
("bigquery", "ATable", None, "PROD"),
"urn:li:dataset:(urn:li:dataPlatform:bigquery,atable,PROD)",
),
"druid": (
("druid", "AtaBLE", "MY_INSTANCE", "PROD"),
"urn:li:dataset:(urn:li:dataPlatform:druid,MY_INSTANCE.atable,PROD)",
),
"hive": (
("hive", "ataBLE", "MY_INSTANCE", "PROD"),
"urn:li:dataset:(urn:li:dataPlatform:hive,MY_INSTANCE.atable,PROD)",
),
"mariadb": (
("mariadb", "aTAble", "MY_INSTANCE", "PROD"),
"urn:li:dataset:(urn:li:dataPlatform:mariadb,MY_INSTANCE.atable,PROD)",
),
"mssql": (
("mssql", "aTAblE", "MY_INSTANCE", "PROD"),
"urn:li:dataset:(urn:li:dataPlatform:mssql,MY_INSTANCE.atable,PROD)",
),
"mysql": (
("mysql", "aTABlE", "MY_INSTANCE", "PROD"),
"urn:li:dataset:(urn:li:dataPlatform:mysql,MY_INSTANCE.atable,PROD)",
),
"oracle": (
("oracle", "AtAbLe", "MY_INSTANCE", "PROD"),
"urn:li:dataset:(urn:li:dataPlatform:oracle,MY_INSTANCE.atable,PROD)",
),
"postgres": (
("postgres", "AtAbLE", "MY_INSTANCE", "PROD"),
"urn:li:dataset:(urn:li:dataPlatform:postgres,MY_INSTANCE.atable,PROD)",
),
"redshift": (
("redshift", "atAbLE", "MY_INSTANCE", "PROD"),
"urn:li:dataset:(urn:li:dataPlatform:redshift,MY_INSTANCE.atable,PROD)",
),
"snowflake": (
("snowflake", "atABle", "MY_INSTANCE", "PROD"),
"urn:li:dataset:(urn:li:dataPlatform:snowflake,MY_INSTANCE.atable,PROD)",
),
"trino": (
("trino", "AtaBle", "MY_INSTANCE", "PROD"),
"urn:li:dataset:(urn:li:dataPlatform:trino,MY_INSTANCE.atable,PROD)",
),
"kafka_no_lower_casing": (
("kafka", "MyKafkaTopic", "MY_INSTANCE", "PROD"),
"urn:li:dataset:(urn:li:dataPlatform:kafka,MY_INSTANCE.MyKafkaTopic,PROD)",
),
"kafka_no_instance_no_lower_casing": (
("kafka", "MyKafkaTopic", None, "PROD"),
"urn:li:dataset:(urn:li:dataPlatform:kafka,MyKafkaTopic,PROD)",
),
}
@pytest.mark.parametrize(
"urnParts, expected",
test_make_dataset_urns_params.values(),
ids=test_make_dataset_urns_params.keys(),
)
def test_make_dataset_urns(
urnParts: Tuple[str, str, Optional[str], str], expected: str
) -> None:
dataset_urn = builder.make_dataset_urn_with_platform_instance(
urnParts[0], urnParts[1], urnParts[2], urnParts[3]
)
assert dataset_urn == expected
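
The parametrized test removed above pinned the lower-casing behavior of #4140; a minimal post-revert counterpart (hypothetical test, not part of this commit) would assert that case is preserved instead:

import datahub.emitter.mce_builder as builder

def test_make_dataset_urn_preserves_case() -> None:  # hypothetical name
    urn = builder.make_dataset_urn_with_platform_instance(
        "snowflake", "atABle", "MY_INSTANCE", "PROD"
    )
    # Post-revert, the mixed-case table name passes through unchanged.
    assert urn == "urn:li:dataset:(urn:li:dataPlatform:snowflake,MY_INSTANCE.atABle,PROD)"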

View File

@@ -524,7 +524,7 @@ def test_get_known_viz_chart_snapshot(mocked_data_source):
),
),
chartUrl="http://localhost:5000/queries/4#10",
inputs=["urn:li:dataset:(urn:li:dataPlatform:mysql,rfam,PROD)"],
inputs=["urn:li:dataset:(urn:li:dataPlatform:mysql,Rfam,PROD)"],
type="PIE",
)
],
@@ -557,7 +557,7 @@ def test_get_unknown_viz_chart_snapshot(mocked_data_source):
),
),
chartUrl="http://localhost:5000/queries/4#9",
inputs=["urn:li:dataset:(urn:li:dataPlatform:mysql,rfam,PROD)"],
inputs=["urn:li:dataset:(urn:li:dataPlatform:mysql,Rfam,PROD)"],
type="TABLE",
)
],
@@ -685,9 +685,9 @@ def test_get_chart_snapshot_parse_table_names_from_sql(mocked_data_source):
),
chartUrl="http://localhost:5000/queries/4#10",
inputs=[
"urn:li:dataset:(urn:li:dataPlatform:mysql,rfam.order_items,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:mysql,rfam.orders,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:mysql,rfam.staffs,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:mysql,Rfam.order_items,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:mysql,Rfam.orders,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:mysql,Rfam.staffs,PROD)",
],
type="PIE",
)

View File

@@ -1,31 +1,17 @@
import json
import time
import urllib
import time
import pytest
import requests
from datahub.cli.docker import check_local_docker_containers
from datahub.emitter.mce_builder import make_dataset_urn, make_schema_field_urn
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.common import PipelineContext, RecordEnvelope
from datahub.ingestion.api.sink import NoopWriteCallback
from datahub.ingestion.sink.file import FileSink, FileSinkConfig
from datahub.metadata.schema_classes import (AssertionInfoClass,
AssertionResultClass,
AssertionResultTypeClass,
AssertionRunEventClass,
AssertionRunStatusClass,
AssertionStdOperatorClass,
AssertionTypeClass,
DatasetAssertionInfoClass,
DatasetAssertionScopeClass,
DatasetColumnAssertionClass,
DatasetColumnStdAggFuncClass,
PartitionSpecClass,
PartitionTypeClass)
from tests.utils import ingest_file_via_rest
from datahub.metadata.schema_classes import AssertionResultTypeClass, DatasetAssertionInfoClass, PartitionTypeClass, AssertionInfoClass, AssertionTypeClass, DatasetAssertionScopeClass, DatasetColumnAssertionClass, AssertionStdOperatorClass, DatasetColumnStdAggFuncClass, AssertionRunEventClass, PartitionSpecClass, AssertionResultClass, AssertionRunStatusClass
from datahub.emitter.mce_builder import make_schema_field_urn, make_dataset_urn
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.sink.file import FileSink, FileSinkConfig
from datahub.ingestion.api.sink import NoopWriteCallback, WriteCallback
from datahub.ingestion.api.common import RecordEnvelope
from datahub.ingestion.api.common import PipelineContext
GMS_ENDPOINT = "http://localhost:8080"
restli_default_headers = {
@@ -35,39 +21,37 @@ restli_default_headers = {
def create_test_data(test_file):
assertion_urn = "urn:li:assertion:2d3b06a6e77e1f24adc9860a05ea089b"
dataset_urn = make_dataset_urn(platform="postgres", name="fooTable")
dataset_urn=make_dataset_urn(platform="postgres", name="fooTable")
assertion_info = AssertionInfoClass(
type=AssertionTypeClass.DATASET,
customProperties={"suite_name": "demo_suite"},
customProperties={
"suite_name": "demo_suite"
},
datasetAssertion=DatasetAssertionInfoClass(
fields=[make_schema_field_urn(dataset_urn, "col1")],
fields=[make_schema_field_urn(dataset_urn,"col1")],
datasets=[dataset_urn],
scope=DatasetAssertionScopeClass.DATASET_COLUMN,
columnAssertion=DatasetColumnAssertionClass(
stdOperator=AssertionStdOperatorClass.LESS_THAN,
nativeOperator="column_value_is_less_than",
stdAggFunc=DatasetColumnStdAggFuncClass.IDENTITY,
),
stdOperator=AssertionStdOperatorClass.LESS_THAN,
nativeOperator="column_value_is_less_than",
stdAggFunc=DatasetColumnStdAggFuncClass.IDENTITY,
),
parameters={"max_value": "99"},
)
),
parameters={
"max_value": "99"
},
)
# The assertion definition
mcp1 = MetadataChangeProposalWrapper(
entityType="assertion",
changeType="UPSERT",
entityUrn=assertion_urn,
aspectName="assertionInfo",
aspect=assertion_info,
aspect=assertion_info
)
timestamps = [
1643794280350,
1643794280352,
1643794280354,
1643880726872,
1643880726874,
1643880726875,
]
# The assertion run event attached to the dataset
timestamps = [1643794280350, 1643794280352, 1643794280354, 1643880726872, 1643880726874, 1643880726875]
msg_ids = []
# The assertion run event attached to the dataset
mcp2 = MetadataChangeProposalWrapper(
entityType="dataset",
entityUrn=dataset_urn,
@@ -78,7 +62,7 @@ def create_test_data(test_file):
partitionSpec=PartitionSpecClass(
partition="[{'country': 'IN'}]",
type=PartitionTypeClass.PARTITION,
),
),
messageId=str(timestamps[0]),
assertionUrn=assertion_urn,
asserteeUrn=dataset_urn,
@@ -86,10 +70,10 @@ def create_test_data(test_file):
type=AssertionResultTypeClass.SUCCESS,
actualAggValue=90,
externalUrl="http://example.com/uuid1",
),
),
runId="uuid1",
status=AssertionRunStatusClass.COMPLETE,
),
status=AssertionRunStatusClass.COMPLETE
)
)
mcp3 = MetadataChangeProposalWrapper(
@@ -102,7 +86,7 @@ def create_test_data(test_file):
partitionSpec=PartitionSpecClass(
partition="[{'country': 'US'}]",
type=PartitionTypeClass.PARTITION,
),
),
messageId=str(timestamps[1]),
assertionUrn=assertion_urn,
asserteeUrn=dataset_urn,
@@ -110,10 +94,10 @@ def create_test_data(test_file):
type=AssertionResultTypeClass.FAILURE,
actualAggValue=101,
externalUrl="http://example.com/uuid1",
),
),
runId="uuid1",
status=AssertionRunStatusClass.COMPLETE,
),
status=AssertionRunStatusClass.COMPLETE
)
)
# Result of evaluating this assertion on the whole dataset
mcp4 = MetadataChangeProposalWrapper(
@@ -126,7 +110,7 @@ def create_test_data(test_file):
partitionSpec=PartitionSpecClass(
partition="FULL_TABLE_SNAPSHOT",
type=PartitionTypeClass.FULL_TABLE,
),
),
messageId=str(timestamps[2]),
assertionUrn=assertion_urn,
asserteeUrn=dataset_urn,
@@ -134,10 +118,10 @@ def create_test_data(test_file):
type=AssertionResultTypeClass.SUCCESS,
actualAggValue=93,
externalUrl="http://example.com/uuid1",
),
),
runId="uuid1",
status=AssertionRunStatusClass.COMPLETE,
),
status=AssertionRunStatusClass.COMPLETE
)
)
mcp5 = MetadataChangeProposalWrapper(
@@ -150,7 +134,7 @@ def create_test_data(test_file):
partitionSpec=PartitionSpecClass(
partition="[{'country': 'IN'}]",
type=PartitionTypeClass.PARTITION,
),
),
messageId=str(timestamps[3]),
assertionUrn=assertion_urn,
asserteeUrn=dataset_urn,
@@ -158,10 +142,10 @@ def create_test_data(test_file):
type=AssertionResultTypeClass.SUCCESS,
actualAggValue=90,
externalUrl="http://example.com/uuid1",
),
),
runId="uuid1",
status=AssertionRunStatusClass.COMPLETE,
),
status=AssertionRunStatusClass.COMPLETE
)
)
mcp6 = MetadataChangeProposalWrapper(
entityType="dataset",
@@ -173,7 +157,7 @@ def create_test_data(test_file):
partitionSpec=PartitionSpecClass(
partition="[{'country': 'US'}]",
type=PartitionTypeClass.PARTITION,
),
),
messageId=str(timestamps[4]),
assertionUrn=assertion_urn,
asserteeUrn=dataset_urn,
@@ -181,10 +165,10 @@ def create_test_data(test_file):
type=AssertionResultTypeClass.FAILURE,
actualAggValue=101,
externalUrl="http://example.com/uuid1",
),
),
runId="uuid1",
status=AssertionRunStatusClass.COMPLETE,
),
status=AssertionRunStatusClass.COMPLETE
)
)
# Result of evaluating this assertion on the whole dataset
@@ -198,7 +182,7 @@ def create_test_data(test_file):
partitionSpec=PartitionSpecClass(
partition="FULL_TABLE_SNAPSHOT",
type=PartitionTypeClass.FULL_TABLE,
),
),
messageId=str(timestamps[5]),
assertionUrn=assertion_urn,
asserteeUrn=dataset_urn,
@@ -206,22 +190,20 @@ def create_test_data(test_file):
type=AssertionResultTypeClass.SUCCESS,
actualAggValue=93,
externalUrl="http://example.com/uuid1",
),
),
runId="uuid1",
status=AssertionRunStatusClass.COMPLETE,
),
status=AssertionRunStatusClass.COMPLETE
)
)
fileSink: FileSink = FileSink.create(
FileSinkConfig(filename=test_file), ctx=PipelineContext(run_id="test-file")
)
fileSink: FileSink = FileSink.create(FileSinkConfig(filename=test_file), ctx=PipelineContext(run_id="test-file"))
for mcp in [mcp1, mcp2, mcp3, mcp4, mcp5, mcp6, mcp7]:
fileSink.write_record_async(
RecordEnvelope(record=mcp, metadata={}), write_callback=NoopWriteCallback()
)
fileSink.write_record_async(RecordEnvelope(record=mcp, metadata={}), write_callback=NoopWriteCallback())
fileSink.close()
@pytest.fixture(scope="session")
def generate_test_data(tmp_path_factory):
"""Generates metadata events data and stores into a test file"""
@@ -229,7 +211,7 @@ def generate_test_data(tmp_path_factory):
file_name = dir_name / "test_dq_events.json"
create_test_data(test_file=str(file_name))
yield str(file_name)
@pytest.fixture(scope="session")
def wait_for_healthchecks(generate_test_data):
@@ -251,7 +233,7 @@ def test_run_ingestion(wait_for_healthchecks, generate_test_data):
@pytest.mark.dependency(depends=["test_healthchecks", "test_run_ingestion"])
def test_gms_get_latest_assertions_results_by_partition():
urn = "urn:li:dataset:(urn:li:dataPlatform:postgres,footable,PROD)"
urn = "urn:li:dataset:(urn:li:dataPlatform:postgres,fooTable,PROD)"
# sleep for elasticsearch indices to be updated
time.sleep(5)
@@ -276,7 +258,9 @@ def test_gms_get_latest_assertions_results_by_partition():
}
]
},
"metrics": [{"fieldPath": "result", "aggregationType": "LATEST"}],
"metrics": [
{"fieldPath": "result", "aggregationType": "LATEST"}
],
"buckets": [
{"key": "asserteeUrn", "type": "STRING_GROUPING_BUCKET"},
{"key": "partitionSpec.partition", "type": "STRING_GROUPING_BUCKET"},
@@ -319,7 +303,7 @@ def test_gms_get_latest_assertions_results_by_partition():
@pytest.mark.dependency(depends=["test_healthchecks", "test_run_ingestion"])
def test_gms_get_assertions_on_dataset():
"""lists all assertion urns including those which may not have executed"""
urn = "urn:li:dataset:(urn:li:dataPlatform:postgres,footable,PROD)"
urn = "urn:li:dataset:(urn:li:dataPlatform:postgres,fooTable,PROD)"
response = requests.get(
f"{GMS_ENDPOINT}/relationships?direction=INCOMING&urn={urllib.parse.quote(urn)}&types=Asserts"
)
@@ -332,7 +316,7 @@ def test_gms_get_assertions_on_dataset():
@pytest.mark.dependency(depends=["test_healthchecks", "test_run_ingestion"])
def test_gms_get_assertions_on_dataset_field():
"""lists all assertion urns including those which may not have executed"""
dataset_urn = make_dataset_urn("postgres", "fooTable")
dataset_urn=make_dataset_urn("postgres","fooTable")
field_urn = make_schema_field_urn(dataset_urn, "col1")
response = requests.get(
f"{GMS_ENDPOINT}/relationships?direction=INCOMING&urn={urllib.parse.quote(field_urn)}&types=Asserts"
@@ -358,6 +342,5 @@ def test_gms_get_assertion_info():
assert data["aspect"]
assert data["aspect"]["com.linkedin.assertion.AssertionInfo"]
assert data["aspect"]["com.linkedin.assertion.AssertionInfo"]["type"] == "DATASET"
assert data["aspect"]["com.linkedin.assertion.AssertionInfo"]["datasetAssertion"][
"scope"
]
assert data["aspect"]["com.linkedin.assertion.AssertionInfo"]["datasetAssertion"]["scope"]