mirror of
https://github.com/datahub-project/datahub.git
synced 2025-07-16 21:58:39 +00:00
145 lines
4.6 KiB
Python
145 lines
4.6 KiB
Python
![]() |
import json
import time
import urllib
import urllib.parse

import pytest
import requests
from datahub.cli.docker import check_local_docker_containers

from tests.utils import ingest_file_via_rest
|
||
|
|
||
|
# Sample metadata file ingested by test_run_ingestion before the query tests run.
bootstrap_sample_data = "test_resources/bootstrap_data_quality.json"

# Base URL of the GMS service that the tests in this module talk to.
GMS_ENDPOINT = "http://localhost:8080"

# Headers sent with the Rest.li requests made by the tests below.
restli_default_headers = {"X-RestLi-Protocol-Version": "2.0.0"}
|
||
|
|
||
|
|
||
|
@pytest.fixture(scope="session")
def wait_for_healthchecks():
    """Session-scoped fixture asserting the local containers are healthy.

    Despite its name, this fixture does not wait or retry: it calls
    ``check_local_docker_containers()`` once and fails if the result is
    truthy (presumably a collection of detected problems -- confirm against
    ``datahub.cli.docker``).
    """
    container_issues = check_local_docker_containers()
    assert not container_issues
    yield
|
||
|
|
||
|
|
||
|
@pytest.mark.dependency()
def test_healthchecks(wait_for_healthchecks):
    """Anchor test for the dependency chain.

    The body is intentionally empty: requesting the ``wait_for_healthchecks``
    fixture performs the actual health check.
    """
|
||
|
|
||
|
|
||
|
@pytest.mark.dependency(depends=["test_healthchecks"])
def test_run_ingestion(wait_for_healthchecks):
    """Ingest the bootstrap sample data file via ``ingest_file_via_rest``."""
    sample_file = bootstrap_sample_data
    ingest_file_via_rest(sample_file)
|
||
|
|
||
|
|
||
|
@pytest.mark.dependency(depends=["test_healthchecks", "test_run_ingestion"])
def test_gms_get_latest_assertions_results_by_partition():
    """Query the latest assertion results for one dataset and check the table.

    Posts a ``getTimeseriesStats`` request for the ``assertionResult`` aspect
    of the dataset, asking for the LATEST ``batchAssertionResult`` grouped by
    assertee, partition, day and assertion, then verifies the column names,
    the row count, and that the first row refers to the queried dataset.
    """
    urn = "urn:li:dataset:(urn:li:dataPlatform:postgres,fooTable,PROD)"

    # sleep for elasticsearch indices to be updated
    time.sleep(5)

    # Restrict the query to the dataset under test.
    urn_filter = {"field": "urn", "value": urn, "condition": "EQUAL"}

    # Latest value of the assertion result within each bucket.
    metrics = [{"fieldPath": "batchAssertionResult", "aggregationType": "LATEST"}]

    # Group by assertee, partition, calendar day and assertion.
    buckets = [
        {"key": "asserteeUrn", "type": "STRING_GROUPING_BUCKET"},
        {"key": "partitionSpec.partition", "type": "STRING_GROUPING_BUCKET"},
        {
            "key": "timestampMillis",
            "type": "DATE_GROUPING_BUCKET",
            "timeWindowSize": {"multiple": 1, "unit": "DAY"},
        },
        {"key": "assertionUrn", "type": "STRING_GROUPING_BUCKET"},
    ]

    payload = {
        "entityName": "dataset",
        "aspectName": "assertionResult",
        "filter": {"or": [{"and": [urn_filter]}]},
        "metrics": metrics,
        "buckets": buckets,
    }

    response = requests.post(
        f"{GMS_ENDPOINT}/analytics?action=getTimeseriesStats",
        data=json.dumps(payload),
        headers=restli_default_headers,
    )
    response.raise_for_status()
    body = response.json()

    assert body["value"]
    table = body["value"]["table"]
    assert table

    column_names = table["columnNames"]
    assert sorted(column_names) == [
        "asserteeUrn",
        "assertionUrn",
        "latest_batchAssertionResult",
        "partitionSpec.partition",
        "timestampMillis",
    ]

    rows = table["rows"]
    assert len(rows) == 6

    # The first row must belong to the dataset that was queried.
    assertee_column = column_names.index("asserteeUrn")
    assert rows[0][assertee_column] == urn
|
||
|
|
||
|
|
||
|
@pytest.mark.dependency(depends=["test_healthchecks", "test_run_ingestion"])
def test_gms_get_assertions_on_dataset():
    """Expect exactly one incoming ``Asserts`` relationship on the dataset.

    The relationships endpoint lists all assertion urns, including those
    which may not have executed.
    """
    urn = "urn:li:dataset:(urn:li:dataPlatform:postgres,fooTable,PROD)"
    encoded_urn = urllib.parse.quote(urn)
    url = f"{GMS_ENDPOINT}/relationships?direction=INCOMING&urn={encoded_urn}&types=Asserts"

    response = requests.get(url)
    response.raise_for_status()

    relationships = response.json()["relationships"]
    assert len(relationships) == 1
|
||
|
|
||
|
|
||
|
@pytest.mark.dependency(depends=["test_healthchecks", "test_run_ingestion"])
def test_gms_get_assertions_on_dataset_field():
    """Expect exactly one incoming ``Asserts`` relationship on the schema field.

    The relationships endpoint lists all assertion urns, including those
    which may not have executed.
    """
    urn = "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:postgres,fooTable,PROD), col1)"
    encoded_urn = urllib.parse.quote(urn)
    url = f"{GMS_ENDPOINT}/relationships?direction=INCOMING&urn={encoded_urn}&types=Asserts"

    response = requests.get(url)
    response.raise_for_status()

    relationships = response.json()["relationships"]
    assert len(relationships) == 1
|
||
|
|
||
|
|
||
|
@pytest.mark.dependency(depends=["test_healthchecks", "test_run_ingestion"])
def test_gms_get_assertion_info():
    """Fetch version 0 of the ``assertionInfo`` aspect for a known assertion.

    Requests the aspect from the GMS ``/aspects`` endpoint and verifies that
    the response carries a ``com.linkedin.assertion.AssertionInfo`` payload
    with a non-empty ``assertionType``.

    Raises:
        requests.HTTPError: if GMS answers with an error status.
    """
    assertion_urn = "urn:li:assertion:2d3b06a6e77e1f24adc9860a05ea089b"

    # Build the URL from adjacent literals instead of a backslash continuation
    # inside the string: a continuation would carry the next source line's
    # indentation into the URL between the urn and the query string.
    url = (
        f"{GMS_ENDPOINT}/aspects/{urllib.parse.quote(assertion_urn)}"
        "?aspect=assertionInfo&version=0"
    )

    response = requests.get(url, headers=restli_default_headers)
    response.raise_for_status()
    data = response.json()

    assert data["aspect"]
    assert data["aspect"]["com.linkedin.assertion.AssertionInfo"]
    assert data["aspect"]["com.linkedin.assertion.AssertionInfo"]["assertionType"]
|