fix(ingest): Use lower-case dataset names in the dataset urns for all SQL-styled datasets. (#4140)

Ravindra Lanka 2022-02-16 19:45:07 -08:00 committed by GitHub
parent ede6d91534
commit 6c75185445
6 changed files with 1122 additions and 1002 deletions
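In short: after this change, the urn builders lower-case the dataset name for the SQL-style platforms enumerated in SQL_STYLE_PLATFORMS, and leave every other platform's names untouched. A minimal sketch of the new behavior, with names taken from the golden files and tests in this commit (assuming the default env resolves to PROD, as it does there):

    from datahub.emitter.mce_builder import make_dataset_urn

    # SQL-style platform: the dataset name is lower-cased inside the urn.
    assert (
        make_dataset_urn(platform="oracle", name="Oracle Eloqua.contacts")
        == "urn:li:dataset:(urn:li:dataPlatform:oracle,oracle eloqua.contacts,PROD)"
    )

    # Non-SQL platform (e.g. Kafka): the original casing is preserved.
    assert (
        make_dataset_urn(platform="kafka", name="MyKafkaTopic")
        == "urn:li:dataset:(urn:li:dataPlatform:kafka,MyKafkaTopic,PROD)"
    )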

View File

@@ -5,7 +5,7 @@ import re
 import time
 from enum import Enum
 from hashlib import md5
-from typing import Any, List, Optional, Type, TypeVar, Union, cast, get_type_hints
+from typing import Any, List, Optional, Set, Type, TypeVar, Union, cast, get_type_hints
 import typing_inspect
 from avrogen.dict_wrapper import DictWrapper
@@ -35,7 +35,20 @@ from datahub.metadata.schema_classes import (
 DEFAULT_ENV = DEFAULT_ENV_CONFIGURATION
 DEFAULT_FLOW_CLUSTER = "prod"
 UNKNOWN_USER = "urn:li:corpuser:unknown"
+SQL_STYLE_PLATFORMS: Set[str] = {
+    "athena",
+    "bigquery",
+    "druid",
+    "hive",
+    "mariadb",
+    "mssql",
+    "mysql",
+    "oracle",
+    "postgres",
+    "redshift",
+    "snowflake",
+    "trino",
+}
 
 logger = logging.getLogger(__name__)
@@ -57,6 +70,9 @@ def make_data_platform_urn(platform: str) -> str:
 def make_dataset_urn(platform: str, name: str, env: str = DEFAULT_ENV) -> str:
+    # Use lower-case name for all SQL style datasets
+    if platform in SQL_STYLE_PLATFORMS:
+        name = name.lower()
     return f"urn:li:dataset:({make_data_platform_urn(platform)},{name},{env})"
@@ -70,7 +86,12 @@ def make_dataplatform_instance_urn(platform: str, instance: str) -> str:
 def make_dataset_urn_with_platform_instance(
     platform: str, name: str, platform_instance: Optional[str], env: str = DEFAULT_ENV
 ) -> str:
     if platform_instance:
+        # Use lower-case name for all SQL style datasets
+        if platform in SQL_STYLE_PLATFORMS:
+            name = name.lower()
         return f"urn:li:dataset:({make_data_platform_urn(platform)},{platform_instance}.{name},{env})"
     else:
         return make_dataset_urn(platform=platform, name=name, env=env)
@@ -190,7 +211,6 @@ def make_domain_urn(domain: str) -> str:
 def make_ml_primary_key_urn(feature_table_name: str, primary_key_name: str) -> str:
     return f"urn:li:mlPrimaryKey:({feature_table_name},{primary_key_name})"
@@ -198,7 +218,6 @@ def make_ml_feature_urn(
     feature_table_name: str,
     feature_name: str,
 ) -> str:
     return f"urn:li:mlFeature:({feature_table_name},{feature_name})"

View File

@@ -67791,7 +67791,7 @@
 "changeType": "UPSERT",
 "aspectName": "upstreamLineage",
 "aspect": {
-"value": "{\"upstreams\": [{\"auditStamp\": {\"time\": 0, \"actor\": \"urn:li:corpuser:unknown\"}, \"dataset\": \"urn:li:dataset:(urn:li:dataPlatform:oracle,Oracle Eloqua.activitiesAll,PROD)\", \"type\": \"TRANSFORMED\"}, {\"auditStamp\": {\"time\": 0, \"actor\": \"urn:li:corpuser:unknown\"}, \"dataset\": \"urn:li:dataset:(urn:li:dataPlatform:oracle,Oracle Eloqua.contacts,PROD)\", \"type\": \"TRANSFORMED\"}]}",
+"value": "{\"upstreams\": [{\"auditStamp\": {\"time\": 0, \"actor\": \"urn:li:corpuser:unknown\"}, \"dataset\": \"urn:li:dataset:(urn:li:dataPlatform:oracle,oracle eloqua.activitiesall,PROD)\", \"type\": \"TRANSFORMED\"}, {\"auditStamp\": {\"time\": 0, \"actor\": \"urn:li:corpuser:unknown\"}, \"dataset\": \"urn:li:dataset:(urn:li:dataPlatform:oracle,oracle eloqua.contacts,PROD)\", \"type\": \"TRANSFORMED\"}]}",
 "contentType": "application/json"
 },
 "systemMetadata": {
@@ -87188,7 +87188,7 @@
 "auditHeader": null,
 "proposedSnapshot": {
 "com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": {
-"urn": "urn:li:dataset:(urn:li:dataPlatform:oracle,Oracle Eloqua.activitiesAll,PROD)",
+"urn": "urn:li:dataset:(urn:li:dataPlatform:oracle,oracle eloqua.activitiesall,PROD)",
 "aspects": [
 {
 "com.linkedin.pegasus2avro.common.BrowsePaths": {
@@ -87703,7 +87703,7 @@
 "auditHeader": null,
 "proposedSnapshot": {
 "com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": {
-"urn": "urn:li:dataset:(urn:li:dataPlatform:oracle,Oracle Eloqua.contacts,PROD)",
+"urn": "urn:li:dataset:(urn:li:dataPlatform:oracle,oracle eloqua.contacts,PROD)",
 "aspects": [
 {
 "com.linkedin.pegasus2avro.common.BrowsePaths": {
@@ -104195,7 +104195,7 @@
 "auditHeader": null,
 "proposedSnapshot": {
 "com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": {
-"urn": "urn:li:dataset:(urn:li:dataPlatform:oracle,Oracle Eloqua.activitiesAll,PROD)",
+"urn": "urn:li:dataset:(urn:li:dataPlatform:oracle,oracle eloqua.activitiesall,PROD)",
 "aspects": [
 {
 "com.linkedin.pegasus2avro.common.BrowsePaths": {
@@ -104710,7 +104710,7 @@
 "auditHeader": null,
 "proposedSnapshot": {
 "com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": {
-"urn": "urn:li:dataset:(urn:li:dataPlatform:oracle,Oracle Eloqua.contacts,PROD)",
+"urn": "urn:li:dataset:(urn:li:dataPlatform:oracle,oracle eloqua.contacts,PROD)",
 "aspects": [
 {
 "com.linkedin.pegasus2avro.common.BrowsePaths": {
@@ -124265,7 +124265,7 @@
 "auditHeader": null,
 "proposedSnapshot": {
 "com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": {
-"urn": "urn:li:dataset:(urn:li:dataPlatform:oracle,Oracle Eloqua.activitiesAll,PROD)",
+"urn": "urn:li:dataset:(urn:li:dataPlatform:oracle,oracle eloqua.activitiesall,PROD)",
 "aspects": [
 {
 "com.linkedin.pegasus2avro.common.BrowsePaths": {
@@ -124780,7 +124780,7 @@
 "auditHeader": null,
 "proposedSnapshot": {
 "com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": {
-"urn": "urn:li:dataset:(urn:li:dataPlatform:oracle,Oracle Eloqua.contacts,PROD)",
+"urn": "urn:li:dataset:(urn:li:dataPlatform:oracle,oracle eloqua.contacts,PROD)",
 "aspects": [
 {
 "com.linkedin.pegasus2avro.common.BrowsePaths": {

View File

@@ -1,3 +1,7 @@
+from typing import Dict, Optional, Tuple
+
+import pytest
+
 import datahub.emitter.mce_builder as builder
 from datahub.metadata.schema_classes import (
     DataFlowInfoClass,
@@ -21,3 +25,83 @@ def test_can_add_aspect():
     assert builder.can_add_aspect(dataset_mce, DatasetPropertiesClass)
     assert builder.can_add_aspect(dataset_mce, OwnershipClass)
     assert not builder.can_add_aspect(dataset_mce, DataFlowInfoClass)
+
+
+test_make_dataset_urns_params: Dict[
+    str, Tuple[Tuple[str, str, Optional[str], str], str]
+] = {
+    "athena": (
+        ("athena", "ATABLE", "MY_INSTANCE", "PROD"),
+        "urn:li:dataset:(urn:li:dataPlatform:athena,MY_INSTANCE.atable,PROD)",
+    ),
+    "bigquery": (
+        ("bigquery", "ATable", "MY_INSTANCE", "PROD"),
+        "urn:li:dataset:(urn:li:dataPlatform:bigquery,MY_INSTANCE.atable,PROD)",
+    ),
+    "bigquery_no_instance": (
+        ("bigquery", "ATable", None, "PROD"),
+        "urn:li:dataset:(urn:li:dataPlatform:bigquery,atable,PROD)",
+    ),
+    "druid": (
+        ("druid", "AtaBLE", "MY_INSTANCE", "PROD"),
+        "urn:li:dataset:(urn:li:dataPlatform:druid,MY_INSTANCE.atable,PROD)",
+    ),
+    "hive": (
+        ("hive", "ataBLE", "MY_INSTANCE", "PROD"),
+        "urn:li:dataset:(urn:li:dataPlatform:hive,MY_INSTANCE.atable,PROD)",
+    ),
+    "mariadb": (
+        ("mariadb", "aTAble", "MY_INSTANCE", "PROD"),
+        "urn:li:dataset:(urn:li:dataPlatform:mariadb,MY_INSTANCE.atable,PROD)",
+    ),
+    "mssql": (
+        ("mssql", "aTAblE", "MY_INSTANCE", "PROD"),
+        "urn:li:dataset:(urn:li:dataPlatform:mssql,MY_INSTANCE.atable,PROD)",
+    ),
+    "mysql": (
+        ("mysql", "aTABlE", "MY_INSTANCE", "PROD"),
+        "urn:li:dataset:(urn:li:dataPlatform:mysql,MY_INSTANCE.atable,PROD)",
+    ),
+    "oracle": (
+        ("oracle", "AtAbLe", "MY_INSTANCE", "PROD"),
+        "urn:li:dataset:(urn:li:dataPlatform:oracle,MY_INSTANCE.atable,PROD)",
+    ),
+    "postgres": (
+        ("postgres", "AtAbLE", "MY_INSTANCE", "PROD"),
+        "urn:li:dataset:(urn:li:dataPlatform:postgres,MY_INSTANCE.atable,PROD)",
+    ),
+    "redshift": (
+        ("redshift", "atAbLE", "MY_INSTANCE", "PROD"),
+        "urn:li:dataset:(urn:li:dataPlatform:redshift,MY_INSTANCE.atable,PROD)",
+    ),
+    "snowflake": (
+        ("snowflake", "atABle", "MY_INSTANCE", "PROD"),
+        "urn:li:dataset:(urn:li:dataPlatform:snowflake,MY_INSTANCE.atable,PROD)",
+    ),
+    "trino": (
+        ("trino", "AtaBle", "MY_INSTANCE", "PROD"),
+        "urn:li:dataset:(urn:li:dataPlatform:trino,MY_INSTANCE.atable,PROD)",
+    ),
+    "kafka_no_lower_casing": (
+        ("kafka", "MyKafkaTopic", "MY_INSTANCE", "PROD"),
+        "urn:li:dataset:(urn:li:dataPlatform:kafka,MY_INSTANCE.MyKafkaTopic,PROD)",
+    ),
+    "kafka_no_instance_no_lower_casing": (
+        ("kafka", "MyKafkaTopic", None, "PROD"),
+        "urn:li:dataset:(urn:li:dataPlatform:kafka,MyKafkaTopic,PROD)",
+    ),
+}
+
+
+@pytest.mark.parametrize(
+    "urnParts, expected",
+    test_make_dataset_urns_params.values(),
+    ids=test_make_dataset_urns_params.keys(),
+)
+def test_make_dataset_urns(
+    urnParts: Tuple[str, str, Optional[str], str], expected: str
+) -> None:
+    dataset_urn = builder.make_dataset_urn_with_platform_instance(
+        urnParts[0], urnParts[1], urnParts[2], urnParts[3]
+    )
+    assert dataset_urn == expected
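The new cases can be run in isolation via pytest's keyword filter; a minimal sketch (the test module's path is not shown in this diff, so run it from wherever the file lives):

    import pytest

    # Equivalent to `pytest -v -k test_make_dataset_urns` on the command line.
    pytest.main(["-v", "-k", "test_make_dataset_urns"])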

View File

@@ -524,7 +524,7 @@ def test_get_known_viz_chart_snapshot(mocked_data_source):
             ),
         ),
         chartUrl="http://localhost:5000/queries/4#10",
-        inputs=["urn:li:dataset:(urn:li:dataPlatform:mysql,Rfam,PROD)"],
+        inputs=["urn:li:dataset:(urn:li:dataPlatform:mysql,rfam,PROD)"],
         type="PIE",
     )
 ],
@@ -557,7 +557,7 @@ def test_get_unknown_viz_chart_snapshot(mocked_data_source):
             ),
         ),
         chartUrl="http://localhost:5000/queries/4#9",
-        inputs=["urn:li:dataset:(urn:li:dataPlatform:mysql,Rfam,PROD)"],
+        inputs=["urn:li:dataset:(urn:li:dataPlatform:mysql,rfam,PROD)"],
         type="TABLE",
     )
 ],
@@ -685,9 +685,9 @@ def test_get_chart_snapshot_parse_table_names_from_sql(mocked_data_source):
         ),
         chartUrl="http://localhost:5000/queries/4#10",
         inputs=[
-            "urn:li:dataset:(urn:li:dataPlatform:mysql,Rfam.order_items,PROD)",
-            "urn:li:dataset:(urn:li:dataPlatform:mysql,Rfam.orders,PROD)",
-            "urn:li:dataset:(urn:li:dataPlatform:mysql,Rfam.staffs,PROD)",
+            "urn:li:dataset:(urn:li:dataPlatform:mysql,rfam.order_items,PROD)",
+            "urn:li:dataset:(urn:li:dataPlatform:mysql,rfam.orders,PROD)",
+            "urn:li:dataset:(urn:li:dataPlatform:mysql,rfam.staffs,PROD)",
         ],
         type="PIE",
     )

View File

@@ -1,17 +1,31 @@
 import json
-import urllib
 import time
+import urllib
+
 import pytest
 import requests
+
 from datahub.cli.docker import check_local_docker_containers
-from tests.utils import ingest_file_via_rest
-from datahub.metadata.schema_classes import AssertionResultTypeClass, DatasetAssertionInfoClass, PartitionTypeClass, AssertionInfoClass, AssertionTypeClass, DatasetAssertionScopeClass, DatasetColumnAssertionClass, AssertionStdOperatorClass, DatasetColumnStdAggFuncClass, AssertionRunEventClass, PartitionSpecClass, AssertionResultClass, AssertionRunStatusClass
-from datahub.emitter.mce_builder import make_schema_field_urn, make_dataset_urn
+from datahub.emitter.mce_builder import make_dataset_urn, make_schema_field_urn
 from datahub.emitter.mcp import MetadataChangeProposalWrapper
+from datahub.ingestion.api.common import PipelineContext, RecordEnvelope
+from datahub.ingestion.api.sink import NoopWriteCallback
 from datahub.ingestion.sink.file import FileSink, FileSinkConfig
-from datahub.ingestion.api.sink import NoopWriteCallback, WriteCallback
-from datahub.ingestion.api.common import RecordEnvelope
-from datahub.ingestion.api.common import PipelineContext
+from datahub.metadata.schema_classes import (AssertionInfoClass,
+                                             AssertionResultClass,
+                                             AssertionResultTypeClass,
+                                             AssertionRunEventClass,
+                                             AssertionRunStatusClass,
+                                             AssertionStdOperatorClass,
+                                             AssertionTypeClass,
+                                             DatasetAssertionInfoClass,
+                                             DatasetAssertionScopeClass,
+                                             DatasetColumnAssertionClass,
+                                             DatasetColumnStdAggFuncClass,
+                                             PartitionSpecClass,
+                                             PartitionTypeClass)
+from tests.utils import ingest_file_via_rest
 
 GMS_ENDPOINT = "http://localhost:8080"
 
 restli_default_headers = {
@@ -21,37 +35,39 @@ restli_default_headers = {
 
 def create_test_data(test_file):
     assertion_urn = "urn:li:assertion:2d3b06a6e77e1f24adc9860a05ea089b"
-    dataset_urn=make_dataset_urn(platform="postgres", name="fooTable")
+    dataset_urn = make_dataset_urn(platform="postgres", name="fooTable")
     assertion_info = AssertionInfoClass(
         type=AssertionTypeClass.DATASET,
-        customProperties={
-            "suite_name": "demo_suite"
-        },
+        customProperties={"suite_name": "demo_suite"},
         datasetAssertion=DatasetAssertionInfoClass(
-            fields=[make_schema_field_urn(dataset_urn,"col1")],
+            fields=[make_schema_field_urn(dataset_urn, "col1")],
             datasets=[dataset_urn],
             scope=DatasetAssertionScopeClass.DATASET_COLUMN,
             columnAssertion=DatasetColumnAssertionClass(
                 stdOperator=AssertionStdOperatorClass.LESS_THAN,
                 nativeOperator="column_value_is_less_than",
                 stdAggFunc=DatasetColumnStdAggFuncClass.IDENTITY,
             ),
         ),
-        parameters={
-            "max_value": "99"
-        },
+        parameters={"max_value": "99"},
     )
     # The assertion definition
     mcp1 = MetadataChangeProposalWrapper(
         entityType="assertion",
         changeType="UPSERT",
         entityUrn=assertion_urn,
         aspectName="assertionInfo",
-        aspect=assertion_info
+        aspect=assertion_info,
     )
-    timestamps = [1643794280350, 1643794280352, 1643794280354, 1643880726872, 1643880726874, 1643880726875]
-    msg_ids = []
+    timestamps = [
+        1643794280350,
+        1643794280352,
+        1643794280354,
+        1643880726872,
+        1643880726874,
+        1643880726875,
+    ]
     # The assertion run event attached to the dataset
     mcp2 = MetadataChangeProposalWrapper(
         entityType="dataset",
         entityUrn=dataset_urn,
@@ -62,7 +78,7 @@ def create_test_data(test_file):
             partitionSpec=PartitionSpecClass(
                 partition="[{'country': 'IN'}]",
                 type=PartitionTypeClass.PARTITION,
-                ),
+            ),
             messageId=str(timestamps[0]),
             assertionUrn=assertion_urn,
             asserteeUrn=dataset_urn,
@@ -70,10 +86,10 @@ def create_test_data(test_file):
                 type=AssertionResultTypeClass.SUCCESS,
                 actualAggValue=90,
                 externalUrl="http://example.com/uuid1",
             ),
             runId="uuid1",
-            status=AssertionRunStatusClass.COMPLETE
-        )
+            status=AssertionRunStatusClass.COMPLETE,
+        ),
     )
 
     mcp3 = MetadataChangeProposalWrapper(
@@ -86,7 +102,7 @@ def create_test_data(test_file):
             partitionSpec=PartitionSpecClass(
                 partition="[{'country': 'US'}]",
                 type=PartitionTypeClass.PARTITION,
-                ),
+            ),
             messageId=str(timestamps[1]),
             assertionUrn=assertion_urn,
             asserteeUrn=dataset_urn,
@@ -94,10 +110,10 @@ def create_test_data(test_file):
                 type=AssertionResultTypeClass.FAILURE,
                 actualAggValue=101,
                 externalUrl="http://example.com/uuid1",
             ),
             runId="uuid1",
-            status=AssertionRunStatusClass.COMPLETE
-        )
+            status=AssertionRunStatusClass.COMPLETE,
+        ),
     )
     # Result of evaluating this assertion on the whole dataset
     mcp4 = MetadataChangeProposalWrapper(
@@ -110,7 +126,7 @@ def create_test_data(test_file):
             partitionSpec=PartitionSpecClass(
                 partition="FULL_TABLE_SNAPSHOT",
                 type=PartitionTypeClass.FULL_TABLE,
-                ),
+            ),
             messageId=str(timestamps[2]),
             assertionUrn=assertion_urn,
             asserteeUrn=dataset_urn,
@@ -118,10 +134,10 @@ def create_test_data(test_file):
                 type=AssertionResultTypeClass.SUCCESS,
                 actualAggValue=93,
                 externalUrl="http://example.com/uuid1",
             ),
             runId="uuid1",
-            status=AssertionRunStatusClass.COMPLETE
-        )
+            status=AssertionRunStatusClass.COMPLETE,
+        ),
     )
 
     mcp5 = MetadataChangeProposalWrapper(
@@ -134,7 +150,7 @@ def create_test_data(test_file):
             partitionSpec=PartitionSpecClass(
                 partition="[{'country': 'IN'}]",
                 type=PartitionTypeClass.PARTITION,
-                ),
+            ),
             messageId=str(timestamps[3]),
             assertionUrn=assertion_urn,
             asserteeUrn=dataset_urn,
@@ -142,10 +158,10 @@ def create_test_data(test_file):
                 type=AssertionResultTypeClass.SUCCESS,
                 actualAggValue=90,
                 externalUrl="http://example.com/uuid1",
             ),
             runId="uuid1",
-            status=AssertionRunStatusClass.COMPLETE
-        )
+            status=AssertionRunStatusClass.COMPLETE,
+        ),
     )
 
     mcp6 = MetadataChangeProposalWrapper(
         entityType="dataset",
@@ -157,7 +173,7 @@ def create_test_data(test_file):
             partitionSpec=PartitionSpecClass(
                 partition="[{'country': 'US'}]",
                 type=PartitionTypeClass.PARTITION,
-                ),
+            ),
             messageId=str(timestamps[4]),
             assertionUrn=assertion_urn,
             asserteeUrn=dataset_urn,
@@ -165,10 +181,10 @@ def create_test_data(test_file):
                 type=AssertionResultTypeClass.FAILURE,
                 actualAggValue=101,
                 externalUrl="http://example.com/uuid1",
             ),
             runId="uuid1",
-            status=AssertionRunStatusClass.COMPLETE
-        )
+            status=AssertionRunStatusClass.COMPLETE,
+        ),
     )
 
     # Result of evaluating this assertion on the whole dataset
@@ -182,7 +198,7 @@ def create_test_data(test_file):
             partitionSpec=PartitionSpecClass(
                 partition="FULL_TABLE_SNAPSHOT",
                 type=PartitionTypeClass.FULL_TABLE,
-                ),
+            ),
             messageId=str(timestamps[5]),
             assertionUrn=assertion_urn,
             asserteeUrn=dataset_urn,
@@ -190,20 +206,22 @@ def create_test_data(test_file):
                 type=AssertionResultTypeClass.SUCCESS,
                 actualAggValue=93,
                 externalUrl="http://example.com/uuid1",
             ),
             runId="uuid1",
-            status=AssertionRunStatusClass.COMPLETE
-        )
+            status=AssertionRunStatusClass.COMPLETE,
+        ),
     )
 
-    fileSink: FileSink = FileSink.create(FileSinkConfig(filename=test_file), ctx=PipelineContext(run_id="test-file"))
+    fileSink: FileSink = FileSink.create(
+        FileSinkConfig(filename=test_file), ctx=PipelineContext(run_id="test-file")
+    )
     for mcp in [mcp1, mcp2, mcp3, mcp4, mcp5, mcp6, mcp7]:
-        fileSink.write_record_async(RecordEnvelope(record=mcp, metadata={}), write_callback=NoopWriteCallback())
+        fileSink.write_record_async(
+            RecordEnvelope(record=mcp, metadata={}), write_callback=NoopWriteCallback()
+        )
     fileSink.close()
 
 
 @pytest.fixture(scope="session")
 def generate_test_data(tmp_path_factory):
     """Generates metadata events data and stores into a test file"""
@@ -211,7 +229,7 @@ def generate_test_data(tmp_path_factory):
     file_name = dir_name / "test_dq_events.json"
     create_test_data(test_file=str(file_name))
     yield str(file_name)
 
 @pytest.fixture(scope="session")
 def wait_for_healthchecks(generate_test_data):
@@ -233,7 +251,7 @@ def test_run_ingestion(wait_for_healthchecks, generate_test_data):
 @pytest.mark.dependency(depends=["test_healthchecks", "test_run_ingestion"])
 def test_gms_get_latest_assertions_results_by_partition():
-    urn = "urn:li:dataset:(urn:li:dataPlatform:postgres,fooTable,PROD)"
+    urn = "urn:li:dataset:(urn:li:dataPlatform:postgres,footable,PROD)"
     # sleep for elasticsearch indices to be updated
     time.sleep(5)
@@ -258,9 +276,7 @@ def test_gms_get_latest_assertions_results_by_partition():
                 }
             ]
         },
-        "metrics": [
-            {"fieldPath": "result", "aggregationType": "LATEST"}
-        ],
+        "metrics": [{"fieldPath": "result", "aggregationType": "LATEST"}],
         "buckets": [
             {"key": "asserteeUrn", "type": "STRING_GROUPING_BUCKET"},
             {"key": "partitionSpec.partition", "type": "STRING_GROUPING_BUCKET"},
@@ -303,7 +319,7 @@
 @pytest.mark.dependency(depends=["test_healthchecks", "test_run_ingestion"])
 def test_gms_get_assertions_on_dataset():
     """lists all assertion urns including those which may not have executed"""
-    urn = "urn:li:dataset:(urn:li:dataPlatform:postgres,fooTable,PROD)"
+    urn = "urn:li:dataset:(urn:li:dataPlatform:postgres,footable,PROD)"
     response = requests.get(
         f"{GMS_ENDPOINT}/relationships?direction=INCOMING&urn={urllib.parse.quote(urn)}&types=Asserts"
     )
@@ -316,7 +332,7 @@
 @pytest.mark.dependency(depends=["test_healthchecks", "test_run_ingestion"])
 def test_gms_get_assertions_on_dataset_field():
     """lists all assertion urns including those which may not have executed"""
-    dataset_urn=make_dataset_urn("postgres","fooTable")
+    dataset_urn = make_dataset_urn("postgres", "fooTable")
     field_urn = make_schema_field_urn(dataset_urn, "col1")
     response = requests.get(
         f"{GMS_ENDPOINT}/relationships?direction=INCOMING&urn={urllib.parse.quote(field_urn)}&types=Asserts"
@@ -342,5 +358,6 @@ def test_gms_get_assertion_info():
     assert data["aspect"]
     assert data["aspect"]["com.linkedin.assertion.AssertionInfo"]
     assert data["aspect"]["com.linkedin.assertion.AssertionInfo"]["type"] == "DATASET"
-    assert data["aspect"]["com.linkedin.assertion.AssertionInfo"]["datasetAssertion"]["scope"]
+    assert data["aspect"]["com.linkedin.assertion.AssertionInfo"]["datasetAssertion"][
+        "scope"
+    ]
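One practical consequence, visible in the smoke-test changes above: differently-cased spellings of the same SQL table now collapse to a single urn, which is why expected urns like fooTable became footable while callers can keep passing the original name. A small sketch of that equivalence (the FOOTABLE spelling is illustrative):

    from datahub.emitter.mce_builder import make_dataset_urn

    # Both spellings now resolve to the same (lower-cased) urn.
    urn_a = make_dataset_urn(platform="postgres", name="fooTable")
    urn_b = make_dataset_urn(platform="postgres", name="FOOTABLE")
    assert urn_a == urn_b == "urn:li:dataset:(urn:li:dataPlatform:postgres,footable,PROD)"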