feat(ingest): ensure payload size constraints for queryProperties, querySubjects and upstreamLineage aspects (#14919)

Co-authored-by: Claude <noreply@anthropic.com>
Sergio Gómez Villamor 2025-10-06 20:10:59 +02:00 committed by GitHub
parent 40b51ac2da
commit e847b58472
2 changed files with 746 additions and 0 deletions

View File

@@ -1,5 +1,6 @@
import json
import logging
import os
from typing import TYPE_CHECKING, Iterable, List
from datahub.emitter.rest_emitter import INGEST_MAX_PAYLOAD_BYTES
@@ -7,15 +8,36 @@ from datahub.emitter.serialization_helper import pre_json_transform
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.metadata.schema_classes import (
DatasetProfileClass,
QueryPropertiesClass,
QuerySubjectsClass,
SchemaFieldClass,
SchemaMetadataClass,
UpstreamLineageClass,
)
if TYPE_CHECKING:
from datahub.ingestion.api.source import SourceReport
# TODO: ordering
# In cases where we trim collections of data (e.g. fields in schema, upstream lineage, query subjects), those
# collections are typically unordered, so we should consider sorting them by some criterion (e.g. size, alphabetically)
# to make the trimming deterministic, predictable and, more importantly, consistent across executions.
# This is especially relevant for schemaMetadata, where we may currently trim fields while adding nested ones,
# which can lead to poor schema rendering in the UI.
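# One possible direction for the TODO above (illustrative only, not implemented here): sort each collection by a
# stable criterion such as serialized size before applying the budget, e.g.
#   subjects = sorted(subjects, key=lambda s: len(json.dumps(pre_json_transform(s.to_obj()))))
# so that, for the same input, the set of trimmed items is the same on every execution.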
logger = logging.getLogger(__name__)
DEFAULT_QUERY_PROPERTIES_STATEMENT_MAX_PAYLOAD_BYTES = 5 * 1024 * 1024 # 5MB
QUERY_PROPERTIES_STATEMENT_MAX_PAYLOAD_BYTES = int(
os.environ.get(
"QUERY_PROPERTIES_STATEMENT_MAX_PAYLOAD_BYTES",
DEFAULT_QUERY_PROPERTIES_STATEMENT_MAX_PAYLOAD_BYTES,
)
)
QUERY_STATEMENT_TRUNCATION_BUFFER = 100
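# Usage note (a sketch, not part of the processor logic): the override is read from the environment at import
# time, so it must be set before the ingestion process starts, e.g.
#
#   QUERY_PROPERTIES_STATEMENT_MAX_PAYLOAD_BYTES=1048576 datahub ingest -c recipe.yml  # example 1MB cap; recipe name is hypothetical
#
# The effective cap applied to query statements is then
# min(QUERY_PROPERTIES_STATEMENT_MAX_PAYLOAD_BYTES, INGEST_MAX_PAYLOAD_BYTES), as computed in
# ensure_query_properties_size() below.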
class EnsureAspectSizeProcessor:
def __init__(
@@ -81,6 +103,274 @@
schema.fields = accepted_fields
def ensure_query_subjects_size(
self, entity_urn: str, query_subjects: QuerySubjectsClass
) -> None:
"""
Ensure query subjects aspect does not exceed allowed size by removing column-level lineage first,
then table lineage if necessary.
"""
if not query_subjects.subjects:
return
total_subjects_size = 0
accepted_table_level_subjects = []
accepted_column_level_subjects = []
column_level_subjects_with_sizes = []
table_level_subjects_with_sizes = []
# Separate column-level and table-level subjects
for subject in query_subjects.subjects:
subject_size = len(json.dumps(pre_json_transform(subject.to_obj())))
if subject.entity.startswith("urn:li:schemaField:"):
column_level_subjects_with_sizes.append((subject, subject_size))
else:
table_level_subjects_with_sizes.append((subject, subject_size))
# Once we find an item that doesn't fit, stop adding anything further to prevent inconsistencies
first_skip_done = False
# First, try to include all table-level subjects
for subject, subject_size in table_level_subjects_with_sizes:
if total_subjects_size + subject_size < self.payload_constraint:
accepted_table_level_subjects.append(subject)
total_subjects_size += subject_size
else:
first_skip_done = True
break
# Then, add column-level subjects if there's remaining space
# Only process if we successfully included all table-level subjects
if not first_skip_done:
for subject, subject_size in column_level_subjects_with_sizes:
if total_subjects_size + subject_size < self.payload_constraint:
accepted_column_level_subjects.append(subject)
total_subjects_size += subject_size
else:
first_skip_done = True
break
if first_skip_done:
# Log aggregate warnings
table_level_skipped_count = len(table_level_subjects_with_sizes) - len(
accepted_table_level_subjects
)
column_level_skipped_count = len(column_level_subjects_with_sizes) - len(
accepted_column_level_subjects
)
self._maybe_warn_query_subjects(
entity_urn, table_level_skipped_count, "table-level lineage subjects"
)
self._maybe_warn_query_subjects(
entity_urn, column_level_skipped_count, "column-level lineage subjects"
)
query_subjects.subjects = (
accepted_table_level_subjects + accepted_column_level_subjects
)
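# Worked example (illustrative only, assuming a payload_constraint of 100 bytes): two table-level subjects of
# 40 bytes each are accepted (running total 80); a 30-byte column-level subject is then skipped because
# 80 + 30 is not strictly below 100, and no further column-level subjects are considered, so the kept subjects
# are always the complete higher-priority tier plus, at most, a prefix of the next tier.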
def _maybe_warn_query_subjects(
self, entity_urn: str, skipped_count: int, item_type: str
) -> None:
"""Log warning for query subjects truncation if any items were skipped."""
if skipped_count > 0:
self.report.warning(
title="Query subjects truncated due to size constraint",
message="Query subjects contained too much data and would have caused ingestion to fail",
context=f"Skipped {skipped_count} {item_type} for {entity_urn} due to aspect size constraints",
)
def _maybe_warn_upstream_lineage(
self, entity_urn: str, skipped_count: int, item_type: str
) -> None:
"""Log warning for upstream lineage truncation if any items were skipped."""
if skipped_count > 0:
self.report.warning(
title="Upstream lineage truncated due to size constraint",
message="Upstream lineage contained too much data and would have caused ingestion to fail",
context=f"Skipped {skipped_count} {item_type} for {entity_urn} due to aspect size constraints",
)
def ensure_upstream_lineage_size( # noqa: C901
self, entity_urn: str, upstream_lineage: UpstreamLineageClass
) -> None:
"""
Ensure upstream lineage aspect does not exceed allowed size by removing lineage in priority order:
first NONE fine-grained lineages (lowest priority), then FIELD_SET fine-grained lineages,
then DATASET fine-grained lineages, and finally upstreams (highest priority).
"""
if not upstream_lineage.fineGrainedLineages and not upstream_lineage.upstreams:
return
total_lineage_size = 0
accepted_upstreams = []
accepted_dataset_fg_lineages = []
accepted_field_set_fg_lineages = []
accepted_none_fg_lineages = []
upstream_items_with_sizes = []
dataset_fg_items_with_sizes = []
field_set_fg_items_with_sizes = []
none_fg_items_with_sizes = []
# Add upstreams (highest priority)
if upstream_lineage.upstreams:
for upstream in upstream_lineage.upstreams:
upstream_size = len(json.dumps(pre_json_transform(upstream.to_obj())))
upstream_items_with_sizes.append((upstream, upstream_size))
# Separate fine-grained lineage items by upstreamType: DATASET > FIELD_SET > NONE
if upstream_lineage.fineGrainedLineages:
for fg_lineage in upstream_lineage.fineGrainedLineages:
fg_lineage_size = len(
json.dumps(pre_json_transform(fg_lineage.to_obj()))
)
upstream_type_str = str(fg_lineage.upstreamType)
if upstream_type_str == "DATASET":
dataset_fg_items_with_sizes.append((fg_lineage, fg_lineage_size))
elif upstream_type_str == "FIELD_SET":
field_set_fg_items_with_sizes.append((fg_lineage, fg_lineage_size))
elif upstream_type_str == "NONE":
none_fg_items_with_sizes.append((fg_lineage, fg_lineage_size))
# Once we find an item that doesn't fit, stop adding anything further to prevent inconsistencies
first_skip_done = False
# First, include all upstreams (highest priority)
for item, item_size in upstream_items_with_sizes:
if total_lineage_size + item_size < self.payload_constraint:
accepted_upstreams.append(item)
total_lineage_size += item_size
else:
first_skip_done = True
break
# Second, include DATASET fine-grained lineages if no upstreams were skipped
if not first_skip_done:
for fg_lineage, fg_lineage_size in dataset_fg_items_with_sizes:
if total_lineage_size + fg_lineage_size < self.payload_constraint:
accepted_dataset_fg_lineages.append(fg_lineage)
total_lineage_size += fg_lineage_size
else:
first_skip_done = True
break
# Third, include FIELD_SET fine-grained lineages if no higher priority items were skipped
if not first_skip_done:
for fg_lineage, fg_lineage_size in field_set_fg_items_with_sizes:
if total_lineage_size + fg_lineage_size < self.payload_constraint:
accepted_field_set_fg_lineages.append(fg_lineage)
total_lineage_size += fg_lineage_size
else:
first_skip_done = True
break
# Finally, include NONE fine-grained lineages if no higher priority items were skipped
if not first_skip_done:
for fg_lineage, fg_lineage_size in none_fg_items_with_sizes:
if total_lineage_size + fg_lineage_size < self.payload_constraint:
accepted_none_fg_lineages.append(fg_lineage)
total_lineage_size += fg_lineage_size
else:
first_skip_done = True
break
# Log aggregate warnings instead of per-item warnings
if first_skip_done:
upstreams_skipped_count = len(upstream_items_with_sizes) - len(
accepted_upstreams
)
dataset_fg_skipped_count = len(dataset_fg_items_with_sizes) - len(
accepted_dataset_fg_lineages
)
field_set_fg_skipped_count = len(field_set_fg_items_with_sizes) - len(
accepted_field_set_fg_lineages
)
none_fg_skipped_count = len(none_fg_items_with_sizes) - len(
accepted_none_fg_lineages
)
self._maybe_warn_upstream_lineage(
entity_urn, upstreams_skipped_count, "upstream datasets"
)
self._maybe_warn_upstream_lineage(
entity_urn,
dataset_fg_skipped_count,
"dataset-level fine-grained lineages",
)
self._maybe_warn_upstream_lineage(
entity_urn,
field_set_fg_skipped_count,
"field-set-level fine-grained lineages",
)
self._maybe_warn_upstream_lineage(
entity_urn, none_fg_skipped_count, "none-level fine-grained lineages"
)
# Combine all accepted fine-grained lineages
accepted_fine_grained_lineages = (
accepted_dataset_fg_lineages
+ accepted_field_set_fg_lineages
+ accepted_none_fg_lineages
)
upstream_lineage.upstreams = accepted_upstreams
upstream_lineage.fineGrainedLineages = (
accepted_fine_grained_lineages if accepted_fine_grained_lineages else None
)
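# Worked example (illustrative only): with a tight payload constraint, upstreams are kept first; DATASET
# fine-grained lineages are then added until one no longer fits, at which point the remaining DATASET items and
# all FIELD_SET and NONE items are dropped, so a lower-priority tier is only ever populated when every
# higher-priority tier was accepted in full.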
def ensure_query_properties_size(
self, entity_urn: str, query_properties: QueryPropertiesClass
) -> None:
"""
Ensure query properties aspect does not exceed allowed size by truncating the query statement value.
Uses a configurable max payload size that is the minimum between QUERY_PROPERTIES_STATEMENT_MAX_PAYLOAD_BYTES
and INGEST_MAX_PAYLOAD_BYTES.
We have seen surprisingly large query statements (e.g. 20MB+) that caused ingestion to fail,
typically INSERT INTO ... VALUES statements with huge lists of values.
"""
if not query_properties.statement or not query_properties.statement.value:
return
max_payload_size = min(
QUERY_PROPERTIES_STATEMENT_MAX_PAYLOAD_BYTES, self.payload_constraint
)
current_size = len(json.dumps(pre_json_transform(query_properties.to_obj())))
if current_size < max_payload_size:
return
reduction_needed = (
current_size - max_payload_size + QUERY_STATEMENT_TRUNCATION_BUFFER
)
statement_value_size = len(query_properties.statement.value)
original_statement_size = statement_value_size
# Only truncate if reduction is actually needed and possible
if statement_value_size > reduction_needed > 0:
new_statement_length = statement_value_size - reduction_needed
truncated_statement = query_properties.statement.value[
:new_statement_length
]
truncation_message = f"... [original value was {original_statement_size} bytes and truncated to {new_statement_length} bytes]"
query_properties.statement.value = truncated_statement + truncation_message
self.report.warning(
title="Query properties truncated due to size constraint",
message="Query properties contained too much data and would have caused ingestion to fail",
context=f"Query statement was truncated from {original_statement_size} to {new_statement_length} characters for {entity_urn} due to aspect size constraints",
)
else:
logger.warning(
f"Cannot truncate query statement for {entity_urn} as it is smaller than or equal to the required reduction size {reduction_needed}. That means that 'ensure_query_properties_size' must be extended to trim other fields different than statement."
)
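# Worked example (illustrative only): if the serialized aspect is 6_000_000 bytes and the effective cap is
# 5_242_880 bytes (5MB), reduction_needed is 6_000_000 - 5_242_880 + 100 = 757_220, so a 5_900_000-character
# statement is cut to 5_142_780 characters before the truncation message is appended.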
def ensure_aspect_size(
self,
stream: Iterable[MetadataWorkUnit],
@@ -96,4 +386,10 @@ class EnsureAspectSizeProcessor:
self.ensure_schema_metadata_size(wu.get_urn(), schema)
elif profile := wu.get_aspect_of_type(DatasetProfileClass):
self.ensure_dataset_profile_size(wu.get_urn(), profile)
elif query_subjects := wu.get_aspect_of_type(QuerySubjectsClass):
self.ensure_query_subjects_size(wu.get_urn(), query_subjects)
elif upstream_lineage := wu.get_aspect_of_type(UpstreamLineageClass):
self.ensure_upstream_lineage_size(wu.get_urn(), upstream_lineage)
elif query_properties := wu.get_aspect_of_type(QueryPropertiesClass):
self.ensure_query_properties_size(wu.get_urn(), query_properties)
yield wu
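A minimal sketch of how this processor sits in a source's workunit stream, assuming the constructor takes a SourceReport and an optional payload constraint (as the attributes used above and the tests' processor fixture suggest); the URNs and aspect values are hypothetical:

    from datahub.emitter.mcp import MetadataChangeProposalWrapper
    from datahub.ingestion.api.auto_work_units.auto_ensure_aspect_size import (
        EnsureAspectSizeProcessor,
    )
    from datahub.ingestion.api.source import SourceReport
    from datahub.metadata.schema_classes import QuerySubjectClass, QuerySubjectsClass

    report = SourceReport()
    processor = EnsureAspectSizeProcessor(report)

    workunits = [
        MetadataChangeProposalWrapper(
            entityUrn="urn:li:query:example-query",  # hypothetical query URN
            aspect=QuerySubjectsClass(
                subjects=[
                    QuerySubjectClass(
                        entity="urn:li:dataset:(urn:li:dataPlatform:hive,db1.table1,PROD)"
                    )
                ]
            ),
        ).as_workunit()
    ]

    # Oversized querySubjects, upstreamLineage and queryProperties aspects are trimmed in place;
    # any truncation is surfaced as warnings on the report.
    for wu in processor.ensure_aspect_size(workunits):
        pass  # hand the (possibly trimmed) workunit to the rest of the pipeline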

View File

@@ -15,20 +15,33 @@ from datahub.ingestion.api.source import SourceReport
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent
from datahub.metadata.schema_classes import (
AuditStampClass,
ChangeTypeClass,
DatasetFieldProfileClass,
DatasetLineageTypeClass,
DatasetProfileClass,
DatasetSnapshotClass,
FineGrainedLineageClass,
FineGrainedLineageDownstreamTypeClass,
FineGrainedLineageUpstreamTypeClass,
GenericAspectClass,
MetadataChangeProposalClass,
NumberTypeClass,
OtherSchemaClass,
QueryLanguageClass,
QueryPropertiesClass,
QuerySourceClass,
QueryStatementClass,
QuerySubjectClass,
QuerySubjectsClass,
SchemaFieldClass,
SchemaFieldDataTypeClass,
SchemaMetadataClass,
StatusClass,
StringTypeClass,
SubTypesClass,
UpstreamClass,
UpstreamLineageClass,
)
@@ -112,6 +125,192 @@ def proper_schema_metadata() -> SchemaMetadataClass:
)
def proper_query_subjects() -> QuerySubjectsClass:
subjects = [
QuerySubjectClass(
entity="urn:li:dataset:(urn:li:dataPlatform:hive,db1.table1,PROD)"
),
QuerySubjectClass(
entity="urn:li:dataset:(urn:li:dataPlatform:hive,db1.table2,PROD)"
),
QuerySubjectClass(
entity="urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,db1.table1,PROD),col1)"
),
QuerySubjectClass(
entity="urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,db1.table2,PROD),col2)"
),
]
return QuerySubjectsClass(subjects=subjects)
def too_big_query_subjects() -> QuerySubjectsClass:
subjects = []
# Add a few table-level subjects
for i in range(5):
subjects.append(
QuerySubjectClass(
entity=f"urn:li:dataset:(urn:li:dataPlatform:hive,db.table{i},PROD)"
)
)
# Add many column-level subjects with very large entity URNs to exceed the 15MB constraint
# Each URN will be about 40KB, so 500 subjects should create ~20MB of data
for i in range(500):
large_table_name = "a" * 20000 # Very large table name
large_column_name = "b" * 20000 # Very large column name
subjects.append(
QuerySubjectClass(
entity=f"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,{large_table_name}_{i},PROD),{large_column_name}_{i})"
)
)
return QuerySubjectsClass(subjects=subjects)
def proper_upstream_lineage() -> UpstreamLineageClass:
upstreams = [
UpstreamClass(
dataset="urn:li:dataset:(urn:li:dataPlatform:hive,db1.table1,PROD)",
type=DatasetLineageTypeClass.TRANSFORMED,
),
UpstreamClass(
dataset="urn:li:dataset:(urn:li:dataPlatform:hive,db1.table2,PROD)",
type=DatasetLineageTypeClass.TRANSFORMED,
),
]
fine_grained_lineages = [
FineGrainedLineageClass(
upstreamType=FineGrainedLineageUpstreamTypeClass.DATASET,
downstreamType=FineGrainedLineageDownstreamTypeClass.FIELD,
upstreams=["urn:li:dataset:(urn:li:dataPlatform:hive,db1.table3,PROD)"],
downstreams=[
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,db1.target,PROD),col1)"
],
),
FineGrainedLineageClass(
upstreamType=FineGrainedLineageUpstreamTypeClass.FIELD_SET,
downstreamType=FineGrainedLineageDownstreamTypeClass.FIELD,
upstreams=[
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,db1.table4,PROD),col2)"
],
downstreams=[
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,db1.target,PROD),col2)"
],
),
]
return UpstreamLineageClass(
upstreams=upstreams, fineGrainedLineages=fine_grained_lineages
)
def too_big_upstream_lineage() -> UpstreamLineageClass:
upstreams = []
fine_grained_lineages = []
# Add upstreams (highest priority)
for i in range(5):
upstreams.append(
UpstreamClass(
dataset=f"urn:li:dataset:(urn:li:dataPlatform:hive,upstream_table_{i},PROD)",
type=DatasetLineageTypeClass.TRANSFORMED,
)
)
# Add DATASET fine-grained lineages with large URNs
for i in range(200):
large_dataset_name = "a" * 20000
large_downstream_name = "b" * 20000
fine_grained_lineages.append(
FineGrainedLineageClass(
upstreamType=FineGrainedLineageUpstreamTypeClass.DATASET,
downstreamType=FineGrainedLineageDownstreamTypeClass.FIELD,
upstreams=[
f"urn:li:dataset:(urn:li:dataPlatform:hive,{large_dataset_name}_{i},PROD)"
],
downstreams=[
f"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,target,PROD),{large_downstream_name}_{i})"
],
)
)
# Add FIELD_SET fine-grained lineages with large URNs
for i in range(200):
large_upstream_name = "c" * 20000
large_downstream_name = "d" * 20000
fine_grained_lineages.append(
FineGrainedLineageClass(
upstreamType=FineGrainedLineageUpstreamTypeClass.FIELD_SET,
downstreamType=FineGrainedLineageDownstreamTypeClass.FIELD,
upstreams=[
f"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,source,PROD),{large_upstream_name}_{i})"
],
downstreams=[
f"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,target,PROD),{large_downstream_name}_{i})"
],
)
)
# Add NONE fine-grained lineages with large URNs (lowest priority)
for i in range(200):
large_downstream_name = "e" * 20000
fine_grained_lineages.append(
FineGrainedLineageClass(
upstreamType=FineGrainedLineageUpstreamTypeClass.NONE,
downstreamType=FineGrainedLineageDownstreamTypeClass.FIELD,
downstreams=[
f"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,target,PROD),{large_downstream_name}_{i})"
],
)
)
return UpstreamLineageClass(
upstreams=upstreams, fineGrainedLineages=fine_grained_lineages
)
def proper_query_properties() -> QueryPropertiesClass:
# Create a query properties with a reasonably sized statement (~1KB)
query_statement = (
"SELECT * FROM table1 WHERE column1 = 'value' AND column2 > 100;" * 20
)
return QueryPropertiesClass(
statement=QueryStatementClass(
value=query_statement,
language=QueryLanguageClass.SQL,
),
source=QuerySourceClass.SYSTEM,
created=AuditStampClass(time=1000000000000, actor="urn:li:corpuser:test"),
lastModified=AuditStampClass(time=1000000000000, actor="urn:li:corpuser:test"),
)
def too_big_query_properties() -> QueryPropertiesClass:
# Create a query properties with a very large statement (~5.5MB, exceeding the 5MB default limit)
# This is larger than the QUERY_PROPERTIES_STATEMENT_MAX_PAYLOAD_BYTES default (5MB)
large_query_statement = (
"SELECT col1, col2, col3, col4, col5, col6, col7, col8, col9, col10, "
"col11, col12, col13, col14, col15, col16, col17, col18, col19, col20 "
"FROM very_long_table_name_with_lots_of_characters_to_make_it_big "
"WHERE condition1 = 'some_very_long_value_with_lots_of_text' "
"AND condition2 IN ('value1', 'value2', 'value3', 'value4') "
"ORDER BY col1, col2, col3, col4, col5 LIMIT 1000;"
) * 15000  # ~5.5MB
return QueryPropertiesClass(
statement=QueryStatementClass(
value=large_query_statement,
language=QueryLanguageClass.SQL,
),
source=QuerySourceClass.SYSTEM,
created=AuditStampClass(time=1000000000000, actor="urn:li:corpuser:test"),
lastModified=AuditStampClass(time=1000000000000, actor="urn:li:corpuser:test"),
name="Large Test Query",
description="A test query with a very large statement",
)
def proper_dataset_profile() -> DatasetProfileClass:
sample_values = [
"23483295",
@@ -344,3 +543,254 @@ def test_wu_processor_not_triggered_by_unhandled_aspects(
]
ensure_schema_metadata_size_mock.assert_not_called()
ensure_dataset_profile_size_mock.assert_not_called()
@freeze_time("2023-01-02 00:00:00")
def test_ensure_size_of_proper_query_subjects(processor):
query_subjects = proper_query_subjects()
orig_repr = json.dumps(query_subjects.to_obj())
processor.ensure_query_subjects_size(
"urn:li:query:(urn:li:dataPlatform:hive, dummy_query, DEV)", query_subjects
)
assert orig_repr == json.dumps(query_subjects.to_obj()), (
"Aspect was modified in case where workunit processor should have been no-op"
)
@freeze_time("2023-01-02 00:00:00")
def test_ensure_size_of_too_big_query_subjects(processor):
query_subjects = too_big_query_subjects()
assert len(query_subjects.subjects) == 505 # 5 table + 500 column subjects
# Verify that the initial size exceeds the default payload constraint
initial_size = len(json.dumps(query_subjects.to_obj()))
expected_size = 20 * 1024 * 1024 # 20MB
assert initial_size == pytest.approx(expected_size, rel=0.05), (
f"Initial size {initial_size} should be around 20MB (±5%), got {initial_size / (1024 * 1024):.1f}MB"
)
assert initial_size > INGEST_MAX_PAYLOAD_BYTES, (
f"Initial size {initial_size} should exceed payload constraint {INGEST_MAX_PAYLOAD_BYTES}"
)
processor.ensure_query_subjects_size(
"urn:li:query:(urn:li:dataPlatform:hive, dummy_query, DEV)", query_subjects
)
# Should be significantly reduced due to size constraints
# With ~20MB of data needing to be reduced to ~15MB, we expect ~25% reduction (125 subjects)
# So final count should be around 380, using 400 as upper bound with buffer
assert len(query_subjects.subjects) < 400, (
"Query subjects has not been properly truncated"
)
# Check that table-level subjects are prioritized (should still be present)
table_subjects = [
s
for s in query_subjects.subjects
if not s.entity.startswith("urn:li:schemaField:")
]
assert len(table_subjects) > 0, (
"Table-level subjects should be prioritized and present"
)
# The aspect should not exceed acceptable size
assert len(json.dumps(query_subjects.to_obj())) < INGEST_MAX_PAYLOAD_BYTES, (
"Aspect exceeded acceptable size"
)
@freeze_time("2023-01-02 00:00:00")
@patch(
"datahub.ingestion.api.auto_work_units.auto_ensure_aspect_size.EnsureAspectSizeProcessor.ensure_query_subjects_size"
)
def test_wu_processor_triggered_by_query_subjects_aspect(
ensure_query_subjects_size_mock, processor
):
ret = [ # noqa: F841
*processor.ensure_aspect_size(
[
MetadataChangeProposalWrapper(
entityUrn="urn:li:query:(urn:li:dataPlatform:hive, dummy_query, DEV)",
aspect=proper_query_subjects(),
).as_workunit()
]
)
]
ensure_query_subjects_size_mock.assert_called_once()
@freeze_time("2023-01-02 00:00:00")
def test_ensure_size_of_proper_upstream_lineage(processor):
upstream_lineage = proper_upstream_lineage()
orig_repr = json.dumps(upstream_lineage.to_obj())
processor.ensure_upstream_lineage_size(
"urn:li:dataset:(urn:li:dataPlatform:hive, dummy_dataset, DEV)",
upstream_lineage,
)
assert orig_repr == json.dumps(upstream_lineage.to_obj()), (
"Aspect was modified in case where workunit processor should have been no-op"
)
@freeze_time("2023-01-02 00:00:00")
def test_ensure_size_of_too_big_upstream_lineage(processor):
upstream_lineage = too_big_upstream_lineage()
assert len(upstream_lineage.upstreams) == 5 # 5 upstreams
assert upstream_lineage.fineGrainedLineages is not None
assert (
len(upstream_lineage.fineGrainedLineages) == 600
) # 200 DATASET + 200 FIELD_SET + 200 NONE
# Verify that the initial size exceeds the default payload constraint
initial_size = len(json.dumps(upstream_lineage.to_obj()))
expected_size = 20 * 1024 * 1024 # 20MB
assert initial_size == pytest.approx(expected_size, rel=0.05), (
f"Initial size {initial_size} should be around 20MB (±5%), got {initial_size / (1024 * 1024):.1f}MB"
)
assert initial_size > INGEST_MAX_PAYLOAD_BYTES, (
f"Initial size {initial_size} should exceed payload constraint {INGEST_MAX_PAYLOAD_BYTES}"
)
processor.ensure_upstream_lineage_size(
"urn:li:dataset:(urn:li:dataPlatform:hive, dummy_dataset, DEV)",
upstream_lineage,
)
# Should be significantly reduced due to size constraints
# With ~20MB of data needing to be reduced to ~15MB, we expect ~25% reduction
# Total items: 5 upstreams + 600 fine-grained = 605, expect around ~450 after 25% reduction
total_items = len(upstream_lineage.upstreams) + (
len(upstream_lineage.fineGrainedLineages)
if upstream_lineage.fineGrainedLineages
else 0
)
assert total_items < 500, "Upstream lineage has not been properly truncated"
# Check that upstreams are prioritized (should still be present)
assert len(upstream_lineage.upstreams) > 0, (
"Upstreams should be prioritized and present"
)
# Check that DATASET fine-grained lineages are prioritized over FIELD_SET and NONE
if upstream_lineage.fineGrainedLineages:
dataset_count = sum(
1
for fg in upstream_lineage.fineGrainedLineages
if str(fg.upstreamType) == "DATASET"
)
field_set_count = sum(
1
for fg in upstream_lineage.fineGrainedLineages
if str(fg.upstreamType) == "FIELD_SET"
)
none_count = sum(
1
for fg in upstream_lineage.fineGrainedLineages
if str(fg.upstreamType) == "NONE"
)
# DATASET should be prioritized over FIELD_SET and NONE
assert dataset_count >= field_set_count, (
"DATASET fine-grained lineages should be prioritized"
)
assert dataset_count >= none_count, (
"DATASET fine-grained lineages should be prioritized over NONE"
)
# The aspect should not exceed acceptable size
assert len(json.dumps(upstream_lineage.to_obj())) < INGEST_MAX_PAYLOAD_BYTES, (
"Aspect exceeded acceptable size"
)
@freeze_time("2023-01-02 00:00:00")
@patch(
"datahub.ingestion.api.auto_work_units.auto_ensure_aspect_size.EnsureAspectSizeProcessor.ensure_upstream_lineage_size"
)
def test_wu_processor_triggered_by_upstream_lineage_aspect(
ensure_upstream_lineage_size_mock, processor
):
ret = [ # noqa: F841
*processor.ensure_aspect_size(
[
MetadataChangeProposalWrapper(
entityUrn="urn:li:dataset:(urn:li:dataPlatform:hive, dummy_dataset, DEV)",
aspect=proper_upstream_lineage(),
).as_workunit()
]
)
]
ensure_upstream_lineage_size_mock.assert_called_once()
@freeze_time("2023-01-02 00:00:00")
def test_ensure_size_of_proper_query_properties(processor):
query_properties = proper_query_properties()
original_statement = query_properties.statement.value
# Verify initial size is reasonable (under 5MB)
initial_size = len(json.dumps(query_properties.to_obj()))
assert initial_size < 5 * 1024 * 1024, "Test query properties should be under 5MB"
processor.ensure_query_properties_size("urn:li:query:test", query_properties)
# Statement should remain unchanged for properly sized query properties
assert query_properties.statement.value == original_statement
assert len(processor.report.warnings) == 0
@freeze_time("2023-01-02 00:00:00")
def test_ensure_size_of_too_big_query_properties(processor):
query_properties = too_big_query_properties()
original_statement_size = len(query_properties.statement.value)
# Verify the initial size is about 5.5MB and definitely exceeds the 5MB limit
initial_size = len(json.dumps(query_properties.to_obj()))
expected_initial_size = 5.5 * 1024 * 1024 # ~5.5MB
assert initial_size == pytest.approx(expected_initial_size, rel=0.1), (
f"Expected initial size ~{expected_initial_size}, got {initial_size}"
)
assert initial_size > 5 * 1024 * 1024, "Test data should exceed 5MB limit"
processor.ensure_query_properties_size("urn:li:query:test", query_properties)
# Statement should be truncated
assert len(query_properties.statement.value) < original_statement_size
# Should contain truncation message
assert "... [original value was" in query_properties.statement.value
assert (
f"{original_statement_size} bytes and truncated to"
in query_properties.statement.value
)
assert query_properties.statement.value.endswith(" bytes]")
# Final size should be within constraints, i.e. <= 5MB + buffer
final_size = len(json.dumps(query_properties.to_obj()))
expected_final_size = 5 * 1024 * 1024 + 100 # 5MB + buffer
assert final_size <= expected_final_size, (
f"Final size {final_size} should be <= {expected_final_size}"
)
# Should have logged a warning
assert len(processor.report.warnings) == 1
@freeze_time("2023-01-02 00:00:00")
@patch(
"datahub.ingestion.api.auto_work_units.auto_ensure_aspect_size.EnsureAspectSizeProcessor.ensure_query_properties_size"
)
def test_wu_processor_triggered_by_query_properties_aspect(
ensure_query_properties_size_mock, processor
):
list(
processor.ensure_aspect_size(
[
MetadataChangeProposalWrapper(
entityUrn="urn:li:query:test",
aspect=proper_query_properties(),
).as_workunit()
]
)
)
ensure_query_properties_size_mock.assert_called_once()