From e847b58472d889ee71b5101658c4b9748db2263a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sergio=20G=C3=B3mez=20Villamor?= Date: Mon, 6 Oct 2025 20:10:59 +0200 Subject: [PATCH] feat(ingest): ensure payload size constraints for queryProperties, querySubjects and upstreamLineage aspects (#14919) Co-authored-by: Claude --- .../auto_ensure_aspect_size.py | 296 ++++++++++++ .../source_helpers/test_ensure_aspect_size.py | 450 ++++++++++++++++++ 2 files changed, 746 insertions(+) diff --git a/metadata-ingestion/src/datahub/ingestion/api/auto_work_units/auto_ensure_aspect_size.py b/metadata-ingestion/src/datahub/ingestion/api/auto_work_units/auto_ensure_aspect_size.py index f9d1493259..7f266e3cbf 100644 --- a/metadata-ingestion/src/datahub/ingestion/api/auto_work_units/auto_ensure_aspect_size.py +++ b/metadata-ingestion/src/datahub/ingestion/api/auto_work_units/auto_ensure_aspect_size.py @@ -1,5 +1,6 @@ import json import logging +import os from typing import TYPE_CHECKING, Iterable, List from datahub.emitter.rest_emitter import INGEST_MAX_PAYLOAD_BYTES @@ -7,15 +8,36 @@ from datahub.emitter.serialization_helper import pre_json_transform from datahub.ingestion.api.workunit import MetadataWorkUnit from datahub.metadata.schema_classes import ( DatasetProfileClass, + QueryPropertiesClass, + QuerySubjectsClass, SchemaFieldClass, SchemaMetadataClass, + UpstreamLineageClass, ) if TYPE_CHECKING: from datahub.ingestion.api.source import SourceReport + +# TODO: ordering +# In the cases where we trim collections of data (e.g. fields in schema, upstream lineage, query subjects), given +# those collections are typically unordered, we should consider sorting them by some criteria (e.g. size, alphabetically) +# so that the trimming is deterministic and predictable and more importantly consistent across executions. +# In the case of schemaMetadata, that's more relevant as currently we may be trimming fields while adding nested ones, +# which may lead to poorly schema rendering in the UI. + logger = logging.getLogger(__name__) +DEFAULT_QUERY_PROPERTIES_STATEMENT_MAX_PAYLOAD_BYTES = 5 * 1024 * 1024 # 5MB +QUERY_PROPERTIES_STATEMENT_MAX_PAYLOAD_BYTES = int( + os.environ.get( + "QUERY_PROPERTIES_STATEMENT_MAX_PAYLOAD_BYTES", + DEFAULT_QUERY_PROPERTIES_STATEMENT_MAX_PAYLOAD_BYTES, + ) +) + +QUERY_STATEMENT_TRUNCATION_BUFFER = 100 + class EnsureAspectSizeProcessor: def __init__( @@ -81,6 +103,274 @@ class EnsureAspectSizeProcessor: schema.fields = accepted_fields + def ensure_query_subjects_size( + self, entity_urn: str, query_subjects: QuerySubjectsClass + ) -> None: + """ + Ensure query subjects aspect does not exceed allowed size by removing column-level lineage first, + then table lineage if necessary. 
+ """ + if not query_subjects.subjects: + return + + total_subjects_size = 0 + accepted_table_level_subjects = [] + accepted_column_level_subjects = [] + column_level_subjects_with_sizes = [] + table_level_subjects_with_sizes = [] + + # Separate column-level and table-level subjects + for subject in query_subjects.subjects: + subject_size = len(json.dumps(pre_json_transform(subject.to_obj()))) + + if subject.entity.startswith("urn:li:schemaField:"): + column_level_subjects_with_sizes.append((subject, subject_size)) + else: + table_level_subjects_with_sizes.append((subject, subject_size)) + + # Once we find one that doesn't fit, stop everything else to prevent inconsistencies + first_skip_done = False + + # First, try to include all table-level subjects + for subject, subject_size in table_level_subjects_with_sizes: + if total_subjects_size + subject_size < self.payload_constraint: + accepted_table_level_subjects.append(subject) + total_subjects_size += subject_size + else: + first_skip_done = True + break + + # Then, add column-level subjects if there's remaining space + # Only process if we successfully included all table-level subjects + if not first_skip_done: + for subject, subject_size in column_level_subjects_with_sizes: + if total_subjects_size + subject_size < self.payload_constraint: + accepted_column_level_subjects.append(subject) + total_subjects_size += subject_size + else: + first_skip_done = True + break + + if first_skip_done: + # Log aggregate warnings + table_level_skipped_count = len(table_level_subjects_with_sizes) - len( + accepted_table_level_subjects + ) + column_level_skipped_count = len(column_level_subjects_with_sizes) - len( + accepted_column_level_subjects + ) + + self._maybe_warn_query_subjects( + entity_urn, table_level_skipped_count, "table-level lineage subjects" + ) + self._maybe_warn_query_subjects( + entity_urn, column_level_skipped_count, "column-level lineage subjects" + ) + + query_subjects.subjects = ( + accepted_table_level_subjects + accepted_column_level_subjects + ) + + def _maybe_warn_query_subjects( + self, entity_urn: str, skipped_count: int, item_type: str + ) -> None: + """Log warning for query subjects truncation if any items were skipped.""" + if skipped_count > 0: + self.report.warning( + title="Query subjects truncated due to size constraint", + message="Query subjects contained too much data and would have caused ingestion to fail", + context=f"Skipped {skipped_count} {item_type} for {entity_urn} due to aspect size constraints", + ) + + def _maybe_warn_upstream_lineage( + self, entity_urn: str, skipped_count: int, item_type: str + ) -> None: + """Log warning for upstream lineage truncation if any items were skipped.""" + if skipped_count > 0: + self.report.warning( + title="Upstream lineage truncated due to size constraint", + message="Upstream lineage contained too much data and would have caused ingestion to fail", + context=f"Skipped {skipped_count} {item_type} for {entity_urn} due to aspect size constraints", + ) + + def ensure_upstream_lineage_size( # noqa: C901 + self, entity_urn: str, upstream_lineage: UpstreamLineageClass + ) -> None: + """ + Ensure upstream lineage aspect does not exceed allowed size by removing lineage in priority order: + first NONE fine-grained lineages (lowest priority), then FIELD_SET fine-grained lineages, + then DATASET fine-grained lineages, and finally upstreams (highest priority). 
+ """ + if not upstream_lineage.fineGrainedLineages and not upstream_lineage.upstreams: + return + + total_lineage_size = 0 + accepted_upstreams = [] + accepted_dataset_fg_lineages = [] + accepted_field_set_fg_lineages = [] + accepted_none_fg_lineages = [] + upstream_items_with_sizes = [] + dataset_fg_items_with_sizes = [] + field_set_fg_items_with_sizes = [] + none_fg_items_with_sizes = [] + + # Add upstreams (highest priority) + if upstream_lineage.upstreams: + for upstream in upstream_lineage.upstreams: + upstream_size = len(json.dumps(pre_json_transform(upstream.to_obj()))) + upstream_items_with_sizes.append((upstream, upstream_size)) + + # Separate fine-grained lineage items by upstreamType: DATASET > FIELD_SET > NONE + if upstream_lineage.fineGrainedLineages: + for fg_lineage in upstream_lineage.fineGrainedLineages: + fg_lineage_size = len( + json.dumps(pre_json_transform(fg_lineage.to_obj())) + ) + + upstream_type_str = str(fg_lineage.upstreamType) + if upstream_type_str == "DATASET": + dataset_fg_items_with_sizes.append((fg_lineage, fg_lineage_size)) + elif upstream_type_str == "FIELD_SET": + field_set_fg_items_with_sizes.append((fg_lineage, fg_lineage_size)) + elif upstream_type_str == "NONE": + none_fg_items_with_sizes.append((fg_lineage, fg_lineage_size)) + + # Once we find one that doesn't fit, stop everything else to prevent inconsistencies + first_skip_done = False + + # First, include all upstreams (highest priority) + for item, item_size in upstream_items_with_sizes: + if total_lineage_size + item_size < self.payload_constraint: + accepted_upstreams.append(item) + total_lineage_size += item_size + else: + first_skip_done = True + break + + # Second, include DATASET fine-grained lineages if no upstreams were skipped + if not first_skip_done: + for fg_lineage, fg_lineage_size in dataset_fg_items_with_sizes: + if total_lineage_size + fg_lineage_size < self.payload_constraint: + accepted_dataset_fg_lineages.append(fg_lineage) + total_lineage_size += fg_lineage_size + else: + first_skip_done = True + break + + # Third, include FIELD_SET fine-grained lineages if no higher priority items were skipped + if not first_skip_done: + for fg_lineage, fg_lineage_size in field_set_fg_items_with_sizes: + if total_lineage_size + fg_lineage_size < self.payload_constraint: + accepted_field_set_fg_lineages.append(fg_lineage) + total_lineage_size += fg_lineage_size + else: + first_skip_done = True + break + + # Finally, include NONE fine-grained lineages if no higher priority items were skipped + if not first_skip_done: + for fg_lineage, fg_lineage_size in none_fg_items_with_sizes: + if total_lineage_size + fg_lineage_size < self.payload_constraint: + accepted_none_fg_lineages.append(fg_lineage) + total_lineage_size += fg_lineage_size + else: + first_skip_done = True + break + + # Log aggregate warnings instead of per-item warnings + if first_skip_done: + upstreams_skipped_count = len(upstream_items_with_sizes) - len( + accepted_upstreams + ) + dataset_fg_skipped_count = len(dataset_fg_items_with_sizes) - len( + accepted_dataset_fg_lineages + ) + field_set_fg_skipped_count = len(field_set_fg_items_with_sizes) - len( + accepted_field_set_fg_lineages + ) + none_fg_skipped_count = len(none_fg_items_with_sizes) - len( + accepted_none_fg_lineages + ) + + self._maybe_warn_upstream_lineage( + entity_urn, upstreams_skipped_count, "upstream datasets" + ) + self._maybe_warn_upstream_lineage( + entity_urn, + dataset_fg_skipped_count, + "dataset-level fine-grained lineages", + ) + 
self._maybe_warn_upstream_lineage(
+                entity_urn,
+                field_set_fg_skipped_count,
+                "field-set-level fine-grained lineages",
+            )
+            self._maybe_warn_upstream_lineage(
+                entity_urn, none_fg_skipped_count, "none-level fine-grained lineages"
+            )
+
+        # Combine all accepted fine-grained lineages
+        accepted_fine_grained_lineages = (
+            accepted_dataset_fg_lineages
+            + accepted_field_set_fg_lineages
+            + accepted_none_fg_lineages
+        )
+
+        upstream_lineage.upstreams = accepted_upstreams
+        upstream_lineage.fineGrainedLineages = (
+            accepted_fine_grained_lineages if accepted_fine_grained_lineages else None
+        )
+
+    def ensure_query_properties_size(
+        self, entity_urn: str, query_properties: QueryPropertiesClass
+    ) -> None:
+        """
+        Ensure query properties aspect does not exceed allowed size by truncating the query statement value.
+        Uses a configurable max payload size that is the minimum between QUERY_PROPERTIES_STATEMENT_MAX_PAYLOAD_BYTES
+        and INGEST_MAX_PAYLOAD_BYTES.
+
+        We have found surprisingly large query statements (e.g. 20MB+) that caused ingestion to fail;
+        typically these were INSERT INTO ... VALUES statements with huge lists of literal values.
+        """
+        if not query_properties.statement or not query_properties.statement.value:
+            return
+
+        max_payload_size = min(
+            QUERY_PROPERTIES_STATEMENT_MAX_PAYLOAD_BYTES, self.payload_constraint
+        )
+
+        current_size = len(json.dumps(pre_json_transform(query_properties.to_obj())))
+
+        if current_size < max_payload_size:
+            return
+
+        reduction_needed = (
+            current_size - max_payload_size + QUERY_STATEMENT_TRUNCATION_BUFFER
+        )
+
+        statement_value_size = len(query_properties.statement.value)
+        original_statement_size = statement_value_size
+
+        # Only truncate if reduction is actually needed and possible
+        if statement_value_size > reduction_needed > 0:
+            new_statement_length = statement_value_size - reduction_needed
+            truncated_statement = query_properties.statement.value[
+                :new_statement_length
+            ]
+
+            truncation_message = f"... [original value was {original_statement_size} bytes and truncated to {new_statement_length} bytes]"
+            query_properties.statement.value = truncated_statement + truncation_message
+
+            self.report.warning(
+                title="Query properties truncated due to size constraint",
+                message="Query properties contained too much data and would have caused ingestion to fail",
+                context=f"Query statement was truncated from {original_statement_size} to {new_statement_length} characters for {entity_urn} due to aspect size constraints",
+            )
+        else:
+            logger.warning(
+                f"Cannot truncate query statement for {entity_urn} because the statement is smaller than or equal to the required reduction size {reduction_needed}. That means 'ensure_query_properties_size' must be extended to trim fields other than the statement."
+ ) + def ensure_aspect_size( self, stream: Iterable[MetadataWorkUnit], @@ -96,4 +386,10 @@ class EnsureAspectSizeProcessor: self.ensure_schema_metadata_size(wu.get_urn(), schema) elif profile := wu.get_aspect_of_type(DatasetProfileClass): self.ensure_dataset_profile_size(wu.get_urn(), profile) + elif query_subjects := wu.get_aspect_of_type(QuerySubjectsClass): + self.ensure_query_subjects_size(wu.get_urn(), query_subjects) + elif upstream_lineage := wu.get_aspect_of_type(UpstreamLineageClass): + self.ensure_upstream_lineage_size(wu.get_urn(), upstream_lineage) + elif query_properties := wu.get_aspect_of_type(QueryPropertiesClass): + self.ensure_query_properties_size(wu.get_urn(), query_properties) yield wu diff --git a/metadata-ingestion/tests/unit/api/source_helpers/test_ensure_aspect_size.py b/metadata-ingestion/tests/unit/api/source_helpers/test_ensure_aspect_size.py index 8a45efb468..652f46f49a 100644 --- a/metadata-ingestion/tests/unit/api/source_helpers/test_ensure_aspect_size.py +++ b/metadata-ingestion/tests/unit/api/source_helpers/test_ensure_aspect_size.py @@ -15,20 +15,33 @@ from datahub.ingestion.api.source import SourceReport from datahub.ingestion.api.workunit import MetadataWorkUnit from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent from datahub.metadata.schema_classes import ( + AuditStampClass, ChangeTypeClass, DatasetFieldProfileClass, + DatasetLineageTypeClass, DatasetProfileClass, DatasetSnapshotClass, + FineGrainedLineageClass, + FineGrainedLineageDownstreamTypeClass, + FineGrainedLineageUpstreamTypeClass, GenericAspectClass, MetadataChangeProposalClass, NumberTypeClass, OtherSchemaClass, + QueryLanguageClass, + QueryPropertiesClass, + QuerySourceClass, + QueryStatementClass, + QuerySubjectClass, + QuerySubjectsClass, SchemaFieldClass, SchemaFieldDataTypeClass, SchemaMetadataClass, StatusClass, StringTypeClass, SubTypesClass, + UpstreamClass, + UpstreamLineageClass, ) @@ -112,6 +125,192 @@ def proper_schema_metadata() -> SchemaMetadataClass: ) +def proper_query_subjects() -> QuerySubjectsClass: + subjects = [ + QuerySubjectClass( + entity="urn:li:dataset:(urn:li:dataPlatform:hive,db1.table1,PROD)" + ), + QuerySubjectClass( + entity="urn:li:dataset:(urn:li:dataPlatform:hive,db1.table2,PROD)" + ), + QuerySubjectClass( + entity="urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,db1.table1,PROD),col1)" + ), + QuerySubjectClass( + entity="urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,db1.table2,PROD),col2)" + ), + ] + return QuerySubjectsClass(subjects=subjects) + + +def too_big_query_subjects() -> QuerySubjectsClass: + subjects = [] + + # Add a few table-level subjects + for i in range(5): + subjects.append( + QuerySubjectClass( + entity=f"urn:li:dataset:(urn:li:dataPlatform:hive,db.table{i},PROD)" + ) + ) + + # Add many column-level subjects with very large entity URNs to exceed the 15MB constraint + # Each URN will be about 40KB, so 500 subjects should create ~20MB of data + for i in range(500): + large_table_name = "a" * 20000 # Very large table name + large_column_name = "b" * 20000 # Very large column name + subjects.append( + QuerySubjectClass( + entity=f"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,{large_table_name}_{i},PROD),{large_column_name}_{i})" + ) + ) + + return QuerySubjectsClass(subjects=subjects) + + +def proper_upstream_lineage() -> UpstreamLineageClass: + upstreams = [ + UpstreamClass( + dataset="urn:li:dataset:(urn:li:dataPlatform:hive,db1.table1,PROD)", + 
type=DatasetLineageTypeClass.TRANSFORMED, + ), + UpstreamClass( + dataset="urn:li:dataset:(urn:li:dataPlatform:hive,db1.table2,PROD)", + type=DatasetLineageTypeClass.TRANSFORMED, + ), + ] + fine_grained_lineages = [ + FineGrainedLineageClass( + upstreamType=FineGrainedLineageUpstreamTypeClass.DATASET, + downstreamType=FineGrainedLineageDownstreamTypeClass.FIELD, + upstreams=["urn:li:dataset:(urn:li:dataPlatform:hive,db1.table3,PROD)"], + downstreams=[ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,db1.target,PROD),col1)" + ], + ), + FineGrainedLineageClass( + upstreamType=FineGrainedLineageUpstreamTypeClass.FIELD_SET, + downstreamType=FineGrainedLineageDownstreamTypeClass.FIELD, + upstreams=[ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,db1.table4,PROD),col2)" + ], + downstreams=[ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,db1.target,PROD),col2)" + ], + ), + ] + return UpstreamLineageClass( + upstreams=upstreams, fineGrainedLineages=fine_grained_lineages + ) + + +def too_big_upstream_lineage() -> UpstreamLineageClass: + upstreams = [] + fine_grained_lineages = [] + + # Add upstreams (highest priority) + for i in range(5): + upstreams.append( + UpstreamClass( + dataset=f"urn:li:dataset:(urn:li:dataPlatform:hive,upstream_table_{i},PROD)", + type=DatasetLineageTypeClass.TRANSFORMED, + ) + ) + + # Add DATASET fine-grained lineages with large URNs + for i in range(200): + large_dataset_name = "a" * 20000 + large_downstream_name = "b" * 20000 + fine_grained_lineages.append( + FineGrainedLineageClass( + upstreamType=FineGrainedLineageUpstreamTypeClass.DATASET, + downstreamType=FineGrainedLineageDownstreamTypeClass.FIELD, + upstreams=[ + f"urn:li:dataset:(urn:li:dataPlatform:hive,{large_dataset_name}_{i},PROD)" + ], + downstreams=[ + f"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,target,PROD),{large_downstream_name}_{i})" + ], + ) + ) + + # Add FIELD_SET fine-grained lineages with large URNs + for i in range(200): + large_upstream_name = "c" * 20000 + large_downstream_name = "d" * 20000 + fine_grained_lineages.append( + FineGrainedLineageClass( + upstreamType=FineGrainedLineageUpstreamTypeClass.FIELD_SET, + downstreamType=FineGrainedLineageDownstreamTypeClass.FIELD, + upstreams=[ + f"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,source,PROD),{large_upstream_name}_{i})" + ], + downstreams=[ + f"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,target,PROD),{large_downstream_name}_{i})" + ], + ) + ) + + # Add NONE fine-grained lineages with large URNs (lowest priority) + for i in range(200): + large_downstream_name = "e" * 20000 + fine_grained_lineages.append( + FineGrainedLineageClass( + upstreamType=FineGrainedLineageUpstreamTypeClass.NONE, + downstreamType=FineGrainedLineageDownstreamTypeClass.FIELD, + downstreams=[ + f"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,target,PROD),{large_downstream_name}_{i})" + ], + ) + ) + + return UpstreamLineageClass( + upstreams=upstreams, fineGrainedLineages=fine_grained_lineages + ) + + +def proper_query_properties() -> QueryPropertiesClass: + # Create a query properties with a reasonably sized statement (~1KB) + query_statement = ( + "SELECT * FROM table1 WHERE column1 = 'value' AND column2 > 100;" * 20 + ) + + return QueryPropertiesClass( + statement=QueryStatementClass( + value=query_statement, + language=QueryLanguageClass.SQL, + ), + source=QuerySourceClass.SYSTEM, + created=AuditStampClass(time=1000000000000, actor="urn:li:corpuser:test"), + 
lastModified=AuditStampClass(time=1000000000000, actor="urn:li:corpuser:test"), + ) + + +def too_big_query_properties() -> QueryPropertiesClass: + # Create a query properties with a very large statement (~6MB, exceeding the 5MB default limit) + # This is larger than the QUERY_PROPERTIES_STATEMENT_MAX_PAYLOAD_BYTES default (5MB) + large_query_statement = ( + "SELECT col1, col2, col3, col4, col5, col6, col7, col8, col9, col10, " + "col11, col12, col13, col14, col15, col16, col17, col18, col19, col20 " + "FROM very_long_table_name_with_lots_of_characters_to_make_it_big " + "WHERE condition1 = 'some_very_long_value_with_lots_of_text' " + "AND condition2 IN ('value1', 'value2', 'value3', 'value4') " + "ORDER BY col1, col2, col3, col4, col5 LIMIT 1000;" + ) * 15000 # ~6MB + + return QueryPropertiesClass( + statement=QueryStatementClass( + value=large_query_statement, + language=QueryLanguageClass.SQL, + ), + source=QuerySourceClass.SYSTEM, + created=AuditStampClass(time=1000000000000, actor="urn:li:corpuser:test"), + lastModified=AuditStampClass(time=1000000000000, actor="urn:li:corpuser:test"), + name="Large Test Query", + description="A test query with a very large statement", + ) + + def proper_dataset_profile() -> DatasetProfileClass: sample_values = [ "23483295", @@ -344,3 +543,254 @@ def test_wu_processor_not_triggered_by_unhandled_aspects( ] ensure_schema_metadata_size_mock.assert_not_called() ensure_dataset_profile_size_mock.assert_not_called() + + +@freeze_time("2023-01-02 00:00:00") +def test_ensure_size_of_proper_query_subjects(processor): + query_subjects = proper_query_subjects() + orig_repr = json.dumps(query_subjects.to_obj()) + processor.ensure_query_subjects_size( + "urn:li:query:(urn:li:dataPlatform:hive, dummy_query, DEV)", query_subjects + ) + assert orig_repr == json.dumps(query_subjects.to_obj()), ( + "Aspect was modified in case where workunit processor should have been no-op" + ) + + +@freeze_time("2023-01-02 00:00:00") +def test_ensure_size_of_too_big_query_subjects(processor): + query_subjects = too_big_query_subjects() + assert len(query_subjects.subjects) == 505 # 5 table + 500 column subjects + + # Verify that the initial size exceeds the default payload constraint + initial_size = len(json.dumps(query_subjects.to_obj())) + expected_size = 20 * 1024 * 1024 # 20MB + assert initial_size == pytest.approx(expected_size, rel=0.05), ( + f"Initial size {initial_size} should be around 20MB (±5%), got {initial_size / (1024 * 1024):.1f}MB" + ) + assert initial_size > INGEST_MAX_PAYLOAD_BYTES, ( + f"Initial size {initial_size} should exceed payload constraint {INGEST_MAX_PAYLOAD_BYTES}" + ) + + processor.ensure_query_subjects_size( + "urn:li:query:(urn:li:dataPlatform:hive, dummy_query, DEV)", query_subjects + ) + + # Should be significantly reduced due to size constraints + # With ~20MB of data needing to be reduced to ~15MB, we expect ~25% reduction (125 subjects) + # So final count should be around 380, using 400 as upper bound with buffer + assert len(query_subjects.subjects) < 400, ( + "Query subjects has not been properly truncated" + ) + + # Check that table-level subjects are prioritized (should still be present) + table_subjects = [ + s + for s in query_subjects.subjects + if not s.entity.startswith("urn:li:schemaField:") + ] + assert len(table_subjects) > 0, ( + "Table-level subjects should be prioritized and present" + ) + + # The aspect should not exceed acceptable size + assert len(json.dumps(query_subjects.to_obj())) < INGEST_MAX_PAYLOAD_BYTES, ( + "Aspect 
exceeded acceptable size" + ) + + +@freeze_time("2023-01-02 00:00:00") +@patch( + "datahub.ingestion.api.auto_work_units.auto_ensure_aspect_size.EnsureAspectSizeProcessor.ensure_query_subjects_size" +) +def test_wu_processor_triggered_by_query_subjects_aspect( + ensure_query_subjects_size_mock, processor +): + ret = [ # noqa: F841 + *processor.ensure_aspect_size( + [ + MetadataChangeProposalWrapper( + entityUrn="urn:li:query:(urn:li:dataPlatform:hive, dummy_query, DEV)", + aspect=proper_query_subjects(), + ).as_workunit() + ] + ) + ] + ensure_query_subjects_size_mock.assert_called_once() + + +@freeze_time("2023-01-02 00:00:00") +def test_ensure_size_of_proper_upstream_lineage(processor): + upstream_lineage = proper_upstream_lineage() + orig_repr = json.dumps(upstream_lineage.to_obj()) + processor.ensure_upstream_lineage_size( + "urn:li:dataset:(urn:li:dataPlatform:hive, dummy_dataset, DEV)", + upstream_lineage, + ) + assert orig_repr == json.dumps(upstream_lineage.to_obj()), ( + "Aspect was modified in case where workunit processor should have been no-op" + ) + + +@freeze_time("2023-01-02 00:00:00") +def test_ensure_size_of_too_big_upstream_lineage(processor): + upstream_lineage = too_big_upstream_lineage() + assert len(upstream_lineage.upstreams) == 5 # 5 upstreams + assert upstream_lineage.fineGrainedLineages is not None + assert ( + len(upstream_lineage.fineGrainedLineages) == 600 + ) # 200 DATASET + 200 FIELD_SET + 200 NONE + + # Verify that the initial size exceeds the default payload constraint + initial_size = len(json.dumps(upstream_lineage.to_obj())) + expected_size = 20 * 1024 * 1024 # 20MB + assert initial_size == pytest.approx(expected_size, rel=0.05), ( + f"Initial size {initial_size} should be around 20MB (±5%), got {initial_size / (1024 * 1024):.1f}MB" + ) + assert initial_size > INGEST_MAX_PAYLOAD_BYTES, ( + f"Initial size {initial_size} should exceed payload constraint {INGEST_MAX_PAYLOAD_BYTES}" + ) + + processor.ensure_upstream_lineage_size( + "urn:li:dataset:(urn:li:dataPlatform:hive, dummy_dataset, DEV)", + upstream_lineage, + ) + + # Should be significantly reduced due to size constraints + # With ~20MB of data needing to be reduced to ~15MB, we expect ~25% reduction + # Total items: 5 upstreams + 600 fine-grained = 605, expect around ~450 after 25% reduction + total_items = len(upstream_lineage.upstreams) + ( + len(upstream_lineage.fineGrainedLineages) + if upstream_lineage.fineGrainedLineages + else 0 + ) + assert total_items < 500, "Upstream lineage has not been properly truncated" + + # Check that upstreams are prioritized (should still be present) + assert len(upstream_lineage.upstreams) > 0, ( + "Upstreams should be prioritized and present" + ) + + # Check that DATASET fine-grained lineages are prioritized over FIELD_SET and NONE + if upstream_lineage.fineGrainedLineages: + dataset_count = sum( + 1 + for fg in upstream_lineage.fineGrainedLineages + if str(fg.upstreamType) == "DATASET" + ) + field_set_count = sum( + 1 + for fg in upstream_lineage.fineGrainedLineages + if str(fg.upstreamType) == "FIELD_SET" + ) + none_count = sum( + 1 + for fg in upstream_lineage.fineGrainedLineages + if str(fg.upstreamType) == "NONE" + ) + + # DATASET should be prioritized over FIELD_SET and NONE + assert dataset_count >= field_set_count, ( + "DATASET fine-grained lineages should be prioritized" + ) + assert dataset_count >= none_count, ( + "DATASET fine-grained lineages should be prioritized over NONE" + ) + + # The aspect should not exceed acceptable size + assert 
len(json.dumps(upstream_lineage.to_obj())) < INGEST_MAX_PAYLOAD_BYTES, ( + "Aspect exceeded acceptable size" + ) + + +@freeze_time("2023-01-02 00:00:00") +@patch( + "datahub.ingestion.api.auto_work_units.auto_ensure_aspect_size.EnsureAspectSizeProcessor.ensure_upstream_lineage_size" +) +def test_wu_processor_triggered_by_upstream_lineage_aspect( + ensure_upstream_lineage_size_mock, processor +): + ret = [ # noqa: F841 + *processor.ensure_aspect_size( + [ + MetadataChangeProposalWrapper( + entityUrn="urn:li:dataset:(urn:li:dataPlatform:hive, dummy_dataset, DEV)", + aspect=proper_upstream_lineage(), + ).as_workunit() + ] + ) + ] + ensure_upstream_lineage_size_mock.assert_called_once() + + +@freeze_time("2023-01-02 00:00:00") +def test_ensure_size_of_proper_query_properties(processor): + query_properties = proper_query_properties() + original_statement = query_properties.statement.value + + # Verify initial size is reasonable (under 5MB) + initial_size = len(json.dumps(query_properties.to_obj())) + assert initial_size < 5 * 1024 * 1024, "Test query properties should be under 5MB" + + processor.ensure_query_properties_size("urn:li:query:test", query_properties) + + # Statement should remain unchanged for properly sized query properties + assert query_properties.statement.value == original_statement + assert len(processor.report.warnings) == 0 + + +@freeze_time("2023-01-02 00:00:00") +def test_ensure_size_of_too_big_query_properties(processor): + query_properties = too_big_query_properties() + original_statement_size = len(query_properties.statement.value) + + # Verify initial size exceeds it's about 5.5MB limit and definitely larger than 5MB + initial_size = len(json.dumps(query_properties.to_obj())) + expected_initial_size = 5.5 * 1024 * 1024 # ~5.5MB + assert initial_size == pytest.approx(expected_initial_size, rel=0.1), ( + f"Expected initial size ~{expected_initial_size}, got {initial_size}" + ) + assert initial_size > 5 * 1024 * 1024, "Test data should exceed 5MB limit" + + processor.ensure_query_properties_size("urn:li:query:test", query_properties) + + # Statement should be truncated + assert len(query_properties.statement.value) < original_statement_size + + # Should contain truncation message + assert "... [original value was" in query_properties.statement.value + assert ( + f"{original_statement_size} bytes and truncated to" + in query_properties.statement.value + ) + assert query_properties.statement.value.endswith(" bytes]") + + # Final size should be within constraints, ie <= 5MB + buffer + final_size = len(json.dumps(query_properties.to_obj())) + expected_final_size = 5 * 1024 * 1024 + 100 # 5MB + buffer + assert final_size <= expected_final_size, ( + f"Final size {final_size} should be <= {expected_final_size}" + ) + + # Should have logged a warning + assert len(processor.report.warnings) == 1 + + +@freeze_time("2023-01-02 00:00:00") +@patch( + "datahub.ingestion.api.auto_work_units.auto_ensure_aspect_size.EnsureAspectSizeProcessor.ensure_query_properties_size" +) +def test_wu_processor_triggered_by_query_properties_aspect( + ensure_query_properties_size_mock, processor +): + list( + processor.ensure_aspect_size( + [ + MetadataChangeProposalWrapper( + entityUrn="urn:li:query:test", + aspect=proper_query_properties(), + ).as_workunit() + ] + ) + ) + ensure_query_properties_size_mock.assert_called_once()
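
Both ensure_query_subjects_size and ensure_upstream_lineage_size follow the same trimming pattern: measure each item's serialized size, admit items greedily in priority order (table-level before column-level subjects; upstreams, then DATASET, FIELD_SET and NONE fine-grained lineages), and stop adding anything once the first item fails to fit, so lower-priority groups cannot leapfrog a skipped higher-priority one. The sketch below illustrates that pattern with trim_by_priority, a hypothetical helper that is not part of this patch or the DataHub API; serialized_size stands in for len(json.dumps(pre_json_transform(item.to_obj()))).

from typing import Callable, List, Sequence, Tuple, TypeVar

T = TypeVar("T")


def trim_by_priority(
    groups: Sequence[List[T]],            # highest-priority group first
    serialized_size: Callable[[T], int],  # byte size of one item when serialized
    payload_constraint: int,              # e.g. INGEST_MAX_PAYLOAD_BYTES
) -> Tuple[List[T], int]:
    """Return (accepted items, number of skipped items) under the byte budget."""
    accepted: List[T] = []
    total = 0
    skipped = 0
    stop = False
    for group in groups:
        for item in group:
            size = serialized_size(item)
            if not stop and total + size < payload_constraint:
                accepted.append(item)
                total += size
            else:
                # The first item that does not fit ends all further additions,
                # including every remaining lower-priority group.
                stop = True
                skipped += 1
    return accepted, skipped


# Example: table-level subjects (higher priority) before column-level subjects.
tables = ["t" * 40] * 5
columns = ["c" * 400] * 50
kept, dropped = trim_by_priority([tables, columns], len, payload_constraint=2_000)
print(len(kept), "kept,", dropped, "dropped")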
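
For the query statement cap, the truncation arithmetic from ensure_query_properties_size can be read in isolation with the following minimal, self-contained sketch. It approximates the serialized aspect size with a fixed wrapper_overhead instead of calling json.dumps(pre_json_transform(...)) on a real QueryPropertiesClass; truncate_statement and wrapper_overhead are illustrative names only, while the environment variable name, the 5MB default, and the 100-byte buffer come from the patch.

import os

DEFAULT_QUERY_PROPERTIES_STATEMENT_MAX_PAYLOAD_BYTES = 5 * 1024 * 1024  # 5MB, as in the patch
QUERY_STATEMENT_TRUNCATION_BUFFER = 100  # as in the patch


def truncate_statement(statement: str, payload_constraint: int, wrapper_overhead: int = 200) -> str:
    """Truncate `statement` so the (approximated) serialized aspect stays under the effective limit."""
    # Effective limit: min of the env-configurable statement cap and the overall ingest payload cap.
    max_payload_size = min(
        int(
            os.environ.get(
                "QUERY_PROPERTIES_STATEMENT_MAX_PAYLOAD_BYTES",
                DEFAULT_QUERY_PROPERTIES_STATEMENT_MAX_PAYLOAD_BYTES,
            )
        ),
        payload_constraint,
    )
    current_size = len(statement) + wrapper_overhead  # stand-in for the serialized aspect size
    if current_size < max_payload_size:
        return statement
    # Trim enough to get under the limit, plus a small buffer for the truncation message.
    reduction_needed = current_size - max_payload_size + QUERY_STATEMENT_TRUNCATION_BUFFER
    if not (len(statement) > reduction_needed > 0):
        return statement  # nothing sensible to trim; the real code logs a warning in this case
    new_length = len(statement) - reduction_needed
    return (
        statement[:new_length]
        + f"... [original value was {len(statement)} bytes and truncated to {new_length} bytes]"
    )


if __name__ == "__main__":
    huge = "INSERT INTO t VALUES " + "(1)," * 2_000_000  # ~8MB statement
    print(len(huge), "->", len(truncate_statement(huge, payload_constraint=15 * 1024 * 1024)))

Note that in the patch the environment variable is read once at import time of auto_ensure_aspect_size, so lowering the cap via QUERY_PROPERTIES_STATEMENT_MAX_PAYLOAD_BYTES has to happen before that module is imported.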