datahub/smoke-test/tests/schema_fields/test_schemafields.py
2025-01-17 23:50:13 +05:30

169 lines
5.0 KiB
Python

import logging
import os
import tempfile
import time
from random import randint
import pytest
import datahub.metadata.schema_classes as models
from datahub.emitter.mce_builder import make_dataset_urn, make_schema_field_urn
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.common import PipelineContext, RecordEnvelope
from datahub.ingestion.api.sink import NoopWriteCallback
from datahub.ingestion.graph.client import DataHubGraph
from datahub.ingestion.sink.file import FileSink, FileSinkConfig
from tests.utils import (
delete_urns_from_file,
get_sleep_info,
ingest_file_via_rest,
wait_for_writes_to_sync,
)
# Module-level logger named after this test module.
logger = logging.getLogger(__name__)

# Random starting suffix, drawn once at import time, used to name the tables below.
start_index = randint(10, 10000)

# Ten consecutive snowflake dataset URNs: table_foo_<start_index> .. table_foo_<start_index + 9>.
dataset_urns = [
    make_dataset_urn("snowflake", f"table_foo_{start_index + offset}")
    for offset in range(10)
]
class FileEmitter:
    """Small helper that writes metadata events to a file through a ``FileSink``."""

    def __init__(self, filename: str) -> None:
        """Create the underlying ``FileSink`` configured to write to *filename*."""
        pipeline_ctx = PipelineContext(run_id="create_test_data")
        sink_config = FileSinkConfig(filename=filename)
        self.sink: FileSink = FileSink(ctx=pipeline_ctx, config=sink_config)

    def emit(self, event):
        """Wrap *event* in a ``RecordEnvelope`` with empty metadata and pass it to the sink."""
        envelope = RecordEnvelope(record=event, metadata={})
        self.sink.write_record_async(
            record_envelope=envelope,
            write_callback=NoopWriteCallback(),
        )

    def close(self):
        """Close the underlying sink."""
        self.sink.close()
@pytest.fixture(scope="module")
def chart_urn():
    """Module-scoped fixture returning the chart URN used by the tests in this file."""
    looker_chart = "urn:li:chart:(looker,chart_foo)"
    return looker_chart
@pytest.fixture(scope="module")
def upstream_schema_field_urn():
    """Module-scoped fixture returning the schema-field URN for ``field1`` of ``table_bar``."""
    parent_dataset = make_dataset_urn("snowflake", "table_bar")
    return make_schema_field_urn(parent_dataset, "field1")
def create_test_data(filename: str, chart_urn: str, upstream_schema_field_urn: str):
    """Write the metadata change proposals used by this test module to *filename*.

    Two proposals are emitted, in this order:

    1. A ``Documentation`` aspect on ``upstream_schema_field_urn`` carrying the
       text ``"test documentation"`` together with an attribution record.
    2. An ``InputFields`` aspect on ``chart_urn`` with a single input field
       that points at ``upstream_schema_field_urn``.

    Args:
        filename: Path of the file the ``FileEmitter`` writes to.
        chart_urn: URN of the chart that receives the ``InputFields`` aspect.
        upstream_schema_field_urn: URN of the schema field that receives the
            documentation and is referenced by the chart's input field.
    """
    documentation_mcp = MetadataChangeProposalWrapper(
        entityUrn=upstream_schema_field_urn,
        aspect=models.DocumentationClass(
            documentations=[
                models.DocumentationAssociationClass(
                    documentation="test documentation",
                    attribution=models.MetadataAttributionClass(
                        # Attribution timestamp in milliseconds since the epoch.
                        time=int(time.time() * 1000),
                        actor="urn:li:corpuser:datahub",
                        source="urn:li:dataHubAction:documentation_propagation",
                    ),
                )
            ]
        ),
    )

    input_fields_mcp = MetadataChangeProposalWrapper(
        entityUrn=chart_urn,
        aspect=models.InputFieldsClass(
            fields=[
                models.InputFieldClass(
                    schemaFieldUrn=upstream_schema_field_urn,
                    schemaField=models.SchemaFieldClass(
                        fieldPath="field1",
                        type=models.SchemaFieldDataTypeClass(models.StringTypeClass()),
                        nativeDataType="STRING",
                    ),
                )
            ]
        ),
    )

    file_emitter = FileEmitter(filename)
    try:
        for mcp in (documentation_mcp, input_fields_mcp):
            file_emitter.emit(mcp)
    finally:
        # Close the emitter even when an emit call raises, so the sink is
        # never left open behind a failed write.
        file_emitter.close()
# Values supplied by the shared test utilities. In the code visible here they are
# only mentioned by the commented-out tenacity retry decorator further down, which
# would use them as the wait between attempts and the attempt limit
# (presumably that is their intended meaning; confirm against tests.utils).
sleep_sec, sleep_times = get_sleep_info()
@pytest.fixture(scope="module")
def ingest_cleanup_data(
    auth_session, graph_client, request, chart_urn, upstream_schema_field_urn
):
    """Ingest the schema-field test data before the tests and remove it afterwards.

    Setup writes the test proposals to a temporary ``.json`` file and ingests
    that file over REST. Teardown deletes the URNs listed in the file, waits
    for writes to sync, and finally removes the temporary file.

    Args:
        auth_session: Session passed to ``ingest_file_via_rest``.
        graph_client: Client passed to ``delete_urns_from_file``.
        request: Pytest request object; not used by the body.
        chart_urn: URN of the chart written into the test data.
        upstream_schema_field_urn: URN of the schema field written into the
            test data.
    """
    fd, filename = tempfile.mkstemp(suffix=".json")
    try:
        # mkstemp hands back an already-open OS-level descriptor. Only the path
        # is used from here on, so release the descriptor immediately instead
        # of leaking it for the rest of the test session.
        os.close(fd)
        create_test_data(filename, chart_urn, upstream_schema_field_urn)
        print("ingesting schema fields test data")
        ingest_file_via_rest(auth_session, filename)
        yield
        print("removing schema fields test data")
        delete_urns_from_file(graph_client, filename)
        wait_for_writes_to_sync()
    finally:
        # Remove the temporary file whether setup, the tests, or teardown failed.
        os.remove(filename)
def get_gql_query(filename: str) -> str:
    """Return the full text of the query file at *filename*.

    The file is decoded as UTF-8 explicitly so the result does not depend on
    the locale's default encoding.

    Args:
        filename: Path of the file to read.

    Returns:
        The complete file contents as a string.

    Raises:
        OSError: If the file cannot be opened.
    """
    with open(filename, encoding="utf-8") as fp:
        return fp.read()
def validate_schema_field_urn_for_chart(
    graph: DataHubGraph, chart_urn: str, upstream_schema_field_urn: str
) -> None:
    """Assert that the chart's GraphQL response exposes the expected input field.

    Runs the ``get_chart_field.gql`` query for ``chart_urn`` and checks that
    the chart has exactly one input field whose schema-field entity carries
    ``upstream_schema_field_urn``, the field path ``"field1"`` and the
    documentation text ``"test documentation"``.
    """
    # Validate listing
    variables = {"urn": chart_urn}
    result = graph.execute_graphql(
        get_gql_query("tests/schema_fields/queries/get_chart_field.gql"),
        variables,
    )

    assert "chart" in result
    chart = result["chart"]
    assert "inputFields" in chart
    input_fields = chart["inputFields"]["fields"]
    assert len(input_fields) == 1

    # Exactly one field is present; inspect it and its resolved entity.
    input_field = input_fields[0]
    schema_field_entity = input_field["schemaField"]["schemaFieldEntity"]

    assert schema_field_entity["urn"] == upstream_schema_field_urn
    assert schema_field_entity["fieldPath"] == "field1"
    assert input_field["schemaFieldUrn"] == upstream_schema_field_urn

    documentations = schema_field_entity["documentation"]["documentations"]
    assert documentations[0]["documentation"] == "test documentation"
# @tenacity.retry(
# stop=tenacity.stop_after_attempt(sleep_times), wait=tenacity.wait_fixed(sleep_sec)
# )
def test_schema_field_gql_mapper_for_charts(
    graph_client, ingest_cleanup_data, chart_urn, upstream_schema_field_urn
):
    """Check the chart's input field via GraphQL once the test data has been ingested."""
    validate_schema_field_urn_for_chart(
        graph=graph_client,
        chart_urn=chart_urn,
        upstream_schema_field_urn=upstream_schema_field_urn,
    )