# datahub/metadata-ingestion/tests/unit/test_rest_sink.py
# (313 lines, 13 KiB, Python)

import contextlib
import json
from datetime import datetime, timezone
import pytest
import requests
from freezegun import freeze_time
import datahub.metadata.schema_classes as models
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
# Fake GMS host; requests_mock intercepts all calls, so nothing is contacted.
MOCK_GMS_ENDPOINT = "http://fakegmshost:8080"
# Frozen wall-clock time in epoch *milliseconds*; also the expected
# `lastObserved` value inside the systemMetadata of the snapshots below.
FROZEN_TIME = 1618987484580

# Audit stamp shared by several test records; its `time` matches FROZEN_TIME.
basicAuditStamp = models.AuditStampClass(
    time=1618987484580,
    actor="urn:li:corpuser:datahub",
    impersonator=None,
)
# Each case is (record to emit, expected GMS path, expected JSON request body).
# The expected bodies are exact payloads — the matcher below compares them
# verbatim against what DatahubRestEmitter serializes.
@pytest.mark.parametrize(
    "record,path,snapshot",
    [
        (
            # Simple test.
            models.MetadataChangeEventClass(
                proposedSnapshot=models.DatasetSnapshotClass(
                    urn="urn:li:dataset:(urn:li:dataPlatform:bigquery,downstream,PROD)",
                    aspects=[
                        models.UpstreamLineageClass(
                            upstreams=[
                                models.UpstreamClass(
                                    auditStamp=basicAuditStamp,
                                    dataset="urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream1,PROD)",
                                    type="TRANSFORMED",
                                ),
                                models.UpstreamClass(
                                    auditStamp=basicAuditStamp,
                                    dataset="urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream2,PROD)",
                                    type="TRANSFORMED",
                                ),
                            ]
                        )
                    ],
                ),
            ),
            "/entities?action=ingest",
            {
                "entity": {
                    "value": {
                        "com.linkedin.metadata.snapshot.DatasetSnapshot": {
                            "urn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,downstream,PROD)",
                            "aspects": [
                                {
                                    "com.linkedin.dataset.UpstreamLineage": {
                                        "upstreams": [
                                            {
                                                "auditStamp": {
                                                    "time": 1618987484580,
                                                    "actor": "urn:li:corpuser:datahub",
                                                },
                                                "dataset": "urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream1,PROD)",
                                                "type": "TRANSFORMED",
                                            },
                                            {
                                                "auditStamp": {
                                                    "time": 1618987484580,
                                                    "actor": "urn:li:corpuser:datahub",
                                                },
                                                "dataset": "urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream2,PROD)",
                                                "type": "TRANSFORMED",
                                            },
                                        ]
                                    }
                                }
                            ],
                        }
                    }
                },
                "systemMetadata": {
                    "lastObserved": FROZEN_TIME,
                    "lastRunId": "no-run-id-provided",
                    "properties": {
                        "clientId": "acryl-datahub",
                        "clientVersion": "1!0.0.0.dev0",
                    },
                    "runId": "no-run-id-provided",
                },
            },
        ),
        (
            # Verify the serialization behavior with chart type enums.
            models.MetadataChangeEventClass(
                proposedSnapshot=models.ChartSnapshotClass(
                    urn="urn:li:chart:(superset,227)",
                    aspects=[
                        models.ChartInfoClass(
                            title="Weekly Messages",
                            description="",
                            lastModified=models.ChangeAuditStampsClass(
                                created=basicAuditStamp,
                                lastModified=basicAuditStamp,
                            ),
                            type=models.ChartTypeClass.SCATTER,
                        ),
                    ],
                )
            ),
            "/entities?action=ingest",
            {
                "entity": {
                    "value": {
                        "com.linkedin.metadata.snapshot.ChartSnapshot": {
                            "urn": "urn:li:chart:(superset,227)",
                            "aspects": [
                                {
                                    "com.linkedin.chart.ChartInfo": {
                                        "customProperties": {},
                                        "title": "Weekly Messages",
                                        "description": "",
                                        "lastModified": {
                                            "created": {
                                                "time": 1618987484580,
                                                "actor": "urn:li:corpuser:datahub",
                                            },
                                            "lastModified": {
                                                "time": 1618987484580,
                                                "actor": "urn:li:corpuser:datahub",
                                            },
                                        },
                                        # Enum serializes as its plain string name.
                                        "type": "SCATTER",
                                    }
                                }
                            ],
                        }
                    }
                },
                "systemMetadata": {
                    "lastObserved": FROZEN_TIME,
                    "lastRunId": "no-run-id-provided",
                    "properties": {
                        "clientId": "acryl-datahub",
                        "clientVersion": "1!0.0.0.dev0",
                    },
                    "runId": "no-run-id-provided",
                },
            },
        ),
        (
            # Verify that DataJobInfo is serialized properly (particularly it's union type).
            models.MetadataChangeEventClass(
                proposedSnapshot=models.DataJobSnapshotClass(
                    urn="urn:li:dataJob:(urn:li:dataFlow:(airflow,dag_abc,PROD),task_456)",
                    aspects=[
                        models.DataJobInfoClass(
                            name="User Deletions",
                            description="Constructs the fct_users_deleted from logging_events",
                            type=models.AzkabanJobTypeClass.SQL,
                        )
                    ],
                )
            ),
            "/entities?action=ingest",
            {
                "entity": {
                    "value": {
                        "com.linkedin.metadata.snapshot.DataJobSnapshot": {
                            "urn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,dag_abc,PROD),task_456)",
                            "aspects": [
                                {
                                    "com.linkedin.datajob.DataJobInfo": {
                                        "customProperties": {},
                                        "name": "User Deletions",
                                        "description": "Constructs the fct_users_deleted from logging_events",
                                        # Union member is wrapped in a type-tagged object.
                                        "type": {"string": "SQL"},
                                    }
                                }
                            ],
                        }
                    }
                },
                "systemMetadata": {
                    "lastObserved": FROZEN_TIME,
                    "lastRunId": "no-run-id-provided",
                    "properties": {
                        "clientId": "acryl-datahub",
                        "clientVersion": "1!0.0.0.dev0",
                    },
                    "runId": "no-run-id-provided",
                },
            },
        ),
        (
            # Usage stats ingestion test.
            # Note: this record type routes to a different endpoint and is
            # expected to raise a DeprecationWarning (asserted in the test body).
            models.UsageAggregationClass(
                bucket=1623826800000,
                duration="DAY",
                resource="urn:li:dataset:(urn:li:dataPlatform:kafka,SampleKafkaDataset,PROD)",
                metrics=models.UsageAggregationMetricsClass(
                    uniqueUserCount=2,
                    users=[
                        models.UserUsageCountsClass(
                            user="urn:li:corpuser:jdoe",
                            count=5,
                        ),
                        models.UserUsageCountsClass(
                            user="urn:li:corpuser:unknown",
                            count=3,
                            userEmail="foo@example.com",
                        ),
                    ],
                    totalSqlQueries=1,
                    topSqlQueries=["SELECT * FROM foo"],
                ),
            ),
            "/usageStats?action=batchIngest",
            {
                "buckets": [
                    {
                        "bucket": 1623826800000,
                        "duration": "DAY",
                        "resource": "urn:li:dataset:(urn:li:dataPlatform:kafka,SampleKafkaDataset,PROD)",
                        "metrics": {
                            "uniqueUserCount": 2,
                            "users": [
                                {"count": 5, "user": "urn:li:corpuser:jdoe"},
                                {
                                    "count": 3,
                                    "user": "urn:li:corpuser:unknown",
                                    "userEmail": "foo@example.com",
                                },
                            ],
                            "totalSqlQueries": 1,
                            "topSqlQueries": ["SELECT * FROM foo"],
                        },
                    }
                ]
            },
        ),
        (
            # MCP (MetadataChangeProposal) path: the aspect is serialized to a
            # JSON *string* inside the proposal envelope.
            MetadataChangeProposalWrapper(
                entityUrn="urn:li:dataset:(urn:li:dataPlatform:foo,bar,PROD)",
                aspect=models.OwnershipClass(
                    owners=[
                        models.OwnerClass(
                            owner="urn:li:corpuser:fbar",
                            type=models.OwnershipTypeClass.DATAOWNER,
                        )
                    ],
                    lastModified=models.AuditStampClass(
                        time=0,
                        actor="urn:li:corpuser:fbar",
                    ),
                ),
            ),
            "/aspects?action=ingestProposal",
            {
                "proposal": {
                    "entityType": "dataset",
                    "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:foo,bar,PROD)",
                    "changeType": "UPSERT",
                    "aspectName": "ownership",
                    "aspect": {
                        "value": '{"owners": [{"owner": "urn:li:corpuser:fbar", "type": "DATAOWNER"}], "ownerTypes": {}, "lastModified": {"time": 0, "actor": "urn:li:corpuser:fbar"}}',
                        "contentType": "application/json",
                    },
                    "systemMetadata": {
                        "lastObserved": FROZEN_TIME,
                        "lastRunId": "no-run-id-provided",
                        "properties": {
                            "clientId": "acryl-datahub",
                            "clientVersion": "1!0.0.0.dev0",
                        },
                        "runId": "no-run-id-provided",
                    },
                }
            },
        ),
    ],
)
# Freeze time so the emitter's auto-generated `lastObserved` systemMetadata
# matches the FROZEN_TIME baked into the expected payloads (ms -> s for
# fromtimestamp).
@freeze_time(datetime.fromtimestamp(FROZEN_TIME / 1000, tz=timezone.utc))
def test_datahub_rest_emitter(requests_mock, record, path, snapshot):
    """Emit `record` through DatahubRestEmitter against a mocked GMS and
    assert that the serialized request body sent to `path` equals `snapshot`
    exactly.

    Uses the `requests_mock` fixture; no real network traffic occurs.
    """

    def match_request_text(request: requests.Request) -> bool:
        # requests_mock additional_matcher hook. We assert (rather than just
        # return False) so a mismatch fails with a readable payload diff
        # instead of an opaque "no matcher found" error.
        requested_snapshot = request.json()
        assert requested_snapshot == snapshot, (
            f"Expected snapshot to be {json.dumps(snapshot)}, got {json.dumps(requested_snapshot)}"
        )
        return True

    requests_mock.post(
        f"{MOCK_GMS_ENDPOINT}{path}",
        # The emitter must speak Rest.li protocol 2.0.0.
        request_headers={"X-RestLi-Protocol-Version": "2.0.0"},
        additional_matcher=match_request_text,
    )

    with contextlib.ExitStack() as stack:
        if isinstance(record, models.UsageAggregationClass):
            # The legacy usage-aggregation endpoint is deprecated; emitting
            # this record type must raise the matching DeprecationWarning.
            stack.enter_context(
                pytest.warns(
                    DeprecationWarning,
                    match="Use emit with a datasetUsageStatistics aspect instead",
                )
            )

        # This test specifically exercises the restli emitter endpoints.
        # We have additional tests specifically for the OpenAPI emitter
        # and its request format.
        emitter = DatahubRestEmitter(MOCK_GMS_ENDPOINT, openapi_ingestion=False)
        stack.enter_context(emitter)
        emitter.emit(record)