# datahub/smoke-test/tests/schema_fields/schema_evolution.py

import time
from enum import Enum

import pytest
from tenacity import retry, stop_after_delay, wait_fixed

import datahub.metadata.schema_classes as models
from datahub.cli.cli_utils import get_aspects_for_entity
from datahub.emitter.mce_builder import make_dataset_urn, make_schema_field_urn
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.graph.client import DataHubGraph

from tests.utils import ingest_file_via_rest, wait_for_writes_to_sync

_MAX_DELAY_UNTIL_WRITES_VISIBLE_SECS = 30
_ATTEMPT_RETRY_INTERVAL_SECS = 1
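
# Ingested by the test_setup fixture below from schema_field_side_effect_data.json;
# test_large_schema asserts against this dataset's very wide schema.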
large_dataset_urn = "urn:li:dataset:(urn:li:dataPlatform:looker,test-large-schema,PROD)"


class FieldPathStyle(Enum):
    FLAT = "FLAT"
    NESTED = "NESTED"
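
# The NESTED style exercises DataHub's v2 fieldPath encoding, where type
# annotations such as [version=2.0].[type=Address] are embedded in the path itself.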


def _create_schema_with_fields(
    entity_urn: str,
    num_fields: int = 1,
    field_path_style: FieldPathStyle = FieldPathStyle.FLAT,
) -> models.SchemaMetadataClass:
    """
    A simple helper function to create a schema with a given number of fields.

    The fields are created with the following naming convention:
    - FLAT: field_0, field_1, field_2, ...
    - NESTED: [version=2.0].[type=Address].parent_field.[type=string].field_0,
        [version=2.0].[type=Address].parent_field.[type=string].field_1, ...

    TODO: Add support for more complex field types and descriptions beyond
    strings.
    """
    schema = models.SchemaMetadataClass(
        schemaName="my_schema",
        platformSchema=models.OtherSchemaClass(rawSchema=""),
        platform="urn:li:dataPlatform:bigquery",
        version=0,
        hash="",
        fields=[
            models.SchemaFieldClass(
                fieldPath=(
                    f"field_{i}"
                    if field_path_style == FieldPathStyle.FLAT
                    else f"[version=2.0].[type=Address].parent_field.[type=string].field_{i}"
                ),
                nativeDataType="STRING",
                type=models.SchemaFieldDataTypeClass(type=models.StringTypeClass()),
                description="",
                nullable=True,
            )
            for i in range(num_fields)
        ],
    )
    assert schema.validate()
    return schema
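
# Illustrative example: _create_schema_with_fields(urn, 2) yields fieldPaths
# "field_0" and "field_1"; with field_path_style=FieldPathStyle.NESTED the same
# call yields "[version=2.0].[type=Address].parent_field.[type=string].field_0"
# and the corresponding path for field_1.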


@pytest.fixture()
def test_setup(auth_session, graph_client):
    """Ingest a dataset with a large schema and verify its schemaMetadata is present."""
    session = graph_client._session
    gms_host = graph_client.config.server

    ingest_file_via_rest(
        auth_session, "tests/schema_fields/schema_field_side_effect_data.json"
    )

    assert "schemaMetadata" in get_aspects_for_entity(
        session,
        gms_host,
        entity_urn=large_dataset_urn,
        aspects=["schemaMetadata"],
        typed=False,
    )

    yield

    # Deleting takes way too long for CI:
    #
    # rollback_url = f"{gms_host}/runs?action=rollback"
    # session.post(
    #     rollback_url,
    #     data=json.dumps(
    #         {"runId": ingested_dataset_run_id, "dryRun": False, "hardDelete": True}
    #     ),
    # )
    #
    # wait_for_writes_to_sync()
    #
    # assert "schemaMetadata" not in get_aspects_for_entity(
    #     entity_urn=large_dataset_urn, aspects=["schemaMetadata"], typed=False
    # )
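

# Writes become visible asynchronously, so the assertion helpers below poll via
# tenacity for up to _MAX_DELAY_UNTIL_WRITES_VISIBLE_SECS seconds, retrying
# every _ATTEMPT_RETRY_INTERVAL_SECS second(s), before failing.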
@retry(
    stop=stop_after_delay(_MAX_DELAY_UNTIL_WRITES_VISIBLE_SECS),
    wait=wait_fixed(_ATTEMPT_RETRY_INTERVAL_SECS),
    reraise=True,
)
def assert_schema_field_exists(graph: DataHubGraph, urn: str, field_path: str):
    schema_field_urn = make_schema_field_urn(parent_urn=urn, field_path=field_path)
    assert graph.exists(schema_field_urn)
    status = graph.get_aspect(schema_field_urn, models.StatusClass)
    assert status is None or status.removed is False


@retry(
    stop=stop_after_delay(_MAX_DELAY_UNTIL_WRITES_VISIBLE_SECS),
    wait=wait_fixed(_ATTEMPT_RETRY_INTERVAL_SECS),
    reraise=True,
)
def assert_schema_field_soft_deleted(graph: DataHubGraph, urn: str, field_path: str):
    schema_field_urn = make_schema_field_urn(parent_urn=urn, field_path=field_path)
    status = graph.get_aspect(schema_field_urn, models.StatusClass)
    assert status and status.removed is True


@pytest.mark.parametrize(
    "field_path_style",
    [
        FieldPathStyle.NESTED,
        FieldPathStyle.FLAT,
    ],
)
def test_schema_evolution_field_dropped(
    graph_client: DataHubGraph, field_path_style: FieldPathStyle
):
    """
    Test that schema evolution works as expected:

    1. Create a schema with 2 fields.
    2. Wait until both schema fields are visible.
    3. Update the schema to have only 1 field.
    4. Assert that the remaining field exists and the dropped field is soft-deleted.
    """
    now = int(time.time())
    urn = make_dataset_urn("bigquery", f"my_dataset.my_table.{now}")
    print(urn)

    schema_with_2_fields = _create_schema_with_fields(
        urn, 2, field_path_style=field_path_style
    )
    field_names = [field.fieldPath for field in schema_with_2_fields.fields]
    graph_client.emit(
        MetadataChangeProposalWrapper(
            entityUrn=urn,
            aspect=schema_with_2_fields,
        )
    )

    for field_name in field_names:
        print("Checking field: ", field_name)
        assert_schema_field_exists(graph_client, urn, field_name)

    # Evolve the schema: drop one of the two fields.
    schema_with_1_field = _create_schema_with_fields(
        urn, 1, field_path_style=field_path_style
    )
    new_field_name = schema_with_1_field.fields[0].fieldPath
    field_names.remove(new_field_name)
    removed_field_name = field_names[0]

    graph_client.emit(
        MetadataChangeProposalWrapper(
            entityUrn=urn,
            aspect=schema_with_1_field,
        )
    )

    assert_schema_field_exists(graph_client, urn, new_field_name)
    assert_schema_field_soft_deleted(graph_client, urn, removed_field_name)
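
# Note: dropping a field from schemaMetadata soft-deletes the corresponding
# schemaField entity (Status.removed=True) rather than hard-deleting it.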


def test_soft_deleted_entity(graph_client: DataHubGraph):
    """
    Test that when a dataset is soft-deleted, its schema fields are
    marked with soft-deleted status as well:

    1. Create a schema with 2 fields and wait until both fields are visible.
    2. Soft delete the dataset.
    3. Assert that both schema fields are soft-deleted.
    """
    now = int(time.time())
    urn = make_dataset_urn("bigquery", f"my_dataset.my_table.{now}")
    print(urn)

    schema_with_2_fields = _create_schema_with_fields(urn, 2)
    field_names = [field.fieldPath for field in schema_with_2_fields.fields]
    graph_client.emit(
        MetadataChangeProposalWrapper(
            entityUrn=urn,
            aspect=schema_with_2_fields,
        )
    )

    for field_name in field_names:
        print("Checking field: ", field_name)
        assert_schema_field_exists(graph_client, urn, field_name)

    # Soft delete the dataset.
    graph_client.emit(
        MetadataChangeProposalWrapper(
            entityUrn=urn,
            aspect=models.StatusClass(removed=True),
        )
    )

    # Check that the fields are soft deleted as well.
    for field_name in field_names:
        assert_schema_field_soft_deleted(graph_client, urn, field_name)


# Note: Does not execute deletes, too slow for CI
@pytest.mark.dependency()
def test_large_schema(graph_client: DataHubGraph, test_setup):
    wait_for_writes_to_sync()
    assert_schema_field_exists(graph_client, large_dataset_urn, "last_of.6800_cols")