From 5bc8a895f90e7f9a86ccf624d41ef815fdd60c1f Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Fri, 28 Mar 2025 13:42:54 -0700 Subject: [PATCH] chore(ingest): remove calls to deprecated methods (#13009) --- .../ingestion/source/usage/usage_common.py | 65 ---------------- .../tests/test_helpers/mce_helpers.py | 4 +- .../unit/test_generic_aspect_transformer.py | 12 +-- .../tests/unit/test_transform_dataset.py | 21 ++--- .../tests/unit/test_usage_common.py | 78 ------------------- 5 files changed, 12 insertions(+), 168 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/usage/usage_common.py b/metadata-ingestion/src/datahub/ingestion/source/usage/usage_common.py index 73e7e415e2..02ddc1735c 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/usage/usage_common.py +++ b/metadata-ingestion/src/datahub/ingestion/source/usage/usage_common.py @@ -12,11 +12,9 @@ from typing import ( Optional, Tuple, TypeVar, - Union, ) import pydantic -from deprecated import deprecated from pydantic.fields import Field import datahub.emitter.mce_builder as builder @@ -28,19 +26,13 @@ from datahub.configuration.time_window_config import ( ) from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.ingestion.api.workunit import MetadataWorkUnit -from datahub.metadata.com.linkedin.pegasus2avro.dataset import DatasetUsageStatistics from datahub.metadata.schema_classes import ( - CalendarIntervalClass, DatasetFieldUsageCountsClass, DatasetUsageStatisticsClass, DatasetUserUsageCountsClass, TimeWindowSizeClass, - UsageAggregationClass, - WindowDurationClass, ) from datahub.utilities.sql_formatter import format_sql_query, trim_query -from datahub.utilities.urns.dataset_urn import DatasetUrn -from datahub.utilities.urns.urn import guess_entity_type logger = logging.getLogger(__name__) @@ -295,60 +287,3 @@ class UsageAggregator(Generic[ResourceType]): user_urn_builder=user_urn_builder, queries_character_limit=self.config.queries_character_limit, ) - - -@deprecated -def convert_usage_aggregation_class( - obj: UsageAggregationClass, -) -> MetadataChangeProposalWrapper: - # Legacy usage aggregation only supported dataset usage stats - if guess_entity_type(obj.resource) == DatasetUrn.ENTITY_TYPE: - aspect = DatasetUsageStatistics( - timestampMillis=obj.bucket, - eventGranularity=TimeWindowSizeClass( - unit=convert_window_to_interval(obj.duration) - ), - uniqueUserCount=obj.metrics.uniqueUserCount, - totalSqlQueries=obj.metrics.totalSqlQueries, - topSqlQueries=obj.metrics.topSqlQueries, - userCounts=( - [ - DatasetUserUsageCountsClass( - user=u.user, count=u.count, userEmail=u.userEmail - ) - for u in obj.metrics.users - if u.user is not None - ] - if obj.metrics.users - else None - ), - fieldCounts=( - [ - DatasetFieldUsageCountsClass(fieldPath=f.fieldName, count=f.count) - for f in obj.metrics.fields - ] - if obj.metrics.fields - else None - ), - ) - return MetadataChangeProposalWrapper(entityUrn=obj.resource, aspect=aspect) - else: - raise Exception( - f"Skipping unsupported usage aggregation - invalid entity type: {obj}" - ) - - -@deprecated -def convert_window_to_interval(window: Union[str, WindowDurationClass]) -> str: - if window == WindowDurationClass.YEAR: - return CalendarIntervalClass.YEAR - elif window == WindowDurationClass.MONTH: - return CalendarIntervalClass.MONTH - elif window == WindowDurationClass.WEEK: - return CalendarIntervalClass.WEEK - elif window == WindowDurationClass.DAY: - return CalendarIntervalClass.DAY - elif window == WindowDurationClass.HOUR: - return CalendarIntervalClass.HOUR - else: - raise Exception(f"Unsupported window duration: {window}") diff --git a/metadata-ingestion/tests/test_helpers/mce_helpers.py b/metadata-ingestion/tests/test_helpers/mce_helpers.py index 6082c1bb25..a2d2eaa782 100644 --- a/metadata-ingestion/tests/test_helpers/mce_helpers.py +++ b/metadata-ingestion/tests/test_helpers/mce_helpers.py @@ -324,7 +324,7 @@ def assert_entity_mce_aspect( ) -> int: # TODO: Replace with read_metadata_file() test_output = load_json_file(file) - entity_type = Urn.from_string(entity_urn).get_type() + entity_type = Urn.from_string(entity_urn).entity_type assert isinstance(test_output, list) # mce urns mces: List[MetadataChangeEventClass] = [ @@ -347,7 +347,7 @@ def assert_entity_mcp_aspect( ) -> int: # TODO: Replace with read_metadata_file() test_output = load_json_file(file) - entity_type = Urn.from_string(entity_urn).get_type() + entity_type = Urn.from_string(entity_urn).entity_type assert isinstance(test_output, list) # mcps that match entity_urn mcps: List[MetadataChangeProposalWrapper] = [ diff --git a/metadata-ingestion/tests/unit/test_generic_aspect_transformer.py b/metadata-ingestion/tests/unit/test_generic_aspect_transformer.py index 52d7aa7f50..93dd421532 100644 --- a/metadata-ingestion/tests/unit/test_generic_aspect_transformer.py +++ b/metadata-ingestion/tests/unit/test_generic_aspect_transformer.py @@ -44,16 +44,12 @@ def make_mce_datajob( ) -def make_mcpw( +def make_status_mcpw( entity_urn: str = "urn:li:dataset:(urn:li:dataPlatform:bigquery,example1,PROD)", - aspect_name: str = "status", aspect: Any = StatusClass(removed=False), ) -> MetadataChangeProposalWrapper: return MetadataChangeProposalWrapper( entityUrn=entity_urn, - entityType=Urn.from_string(entity_urn).get_type(), - aspectName=aspect_name, - changeType="UPSERT", aspect=aspect, ) @@ -65,7 +61,7 @@ def make_mcpc( ) -> MetadataChangeProposalClass: return MetadataChangeProposalClass( entityUrn=entity_urn, - entityType=Urn.from_string(entity_urn).get_type(), + entityType=Urn.from_string(entity_urn).entity_type, aspectName=aspect_name, changeType="UPSERT", aspect=aspect, @@ -127,8 +123,8 @@ class TestDummyGenericAspectTransformer(unittest.TestCase): assert inputs[2] == outputs[3].record def test_add_generic_aspect_when_mcpw_received(self): - mcpw_dataset = make_mcpw() - mcpw_datajob = make_mcpw( + mcpw_dataset = make_status_mcpw() + mcpw_datajob = make_status_mcpw( entity_urn="urn:li:dataJob:(urn:li:dataFlow:(airflow,dag_abc,PROD),task_456)" ) inputs = [mcpw_dataset, mcpw_datajob, EndOfStream()] diff --git a/metadata-ingestion/tests/unit/test_transform_dataset.py b/metadata-ingestion/tests/unit/test_transform_dataset.py index 290e57b46f..50a7340d4e 100644 --- a/metadata-ingestion/tests/unit/test_transform_dataset.py +++ b/metadata-ingestion/tests/unit/test_transform_dataset.py @@ -117,14 +117,10 @@ def make_generic_dataset( def make_generic_dataset_mcp( entity_urn: str = "urn:li:dataset:(urn:li:dataPlatform:bigquery,example1,PROD)", - aspect_name: str = "status", aspect: Any = models.StatusClass(removed=False), ) -> MetadataChangeProposalWrapper: return MetadataChangeProposalWrapper( entityUrn=entity_urn, - entityType=Urn.from_string(entity_urn).get_type(), - aspectName=aspect_name, - changeType="UPSERT", aspect=aspect, ) @@ -138,7 +134,7 @@ def make_generic_container_mcp( aspect = models.StatusClass(removed=False) return MetadataChangeProposalWrapper( entityUrn=entity_urn, - entityType=Urn.from_string(entity_urn).get_type(), + entityType=Urn.from_string(entity_urn).entity_type, aspectName=aspect_name, changeType="UPSERT", aspect=aspect, @@ -497,7 +493,6 @@ def test_mark_status_dataset(tmp_path): assert status_aspect.removed is False mcp = make_generic_dataset_mcp( - aspect_name="datasetProperties", aspect=DatasetPropertiesClass(description="Test dataset"), ) events_file = create_and_run_test_pipeline( @@ -597,7 +592,6 @@ def test_mark_status_dataset(tmp_path): events=[ make_generic_dataset(aspects=[test_status_aspect]), make_generic_dataset_mcp( - aspect_name="datasetProperties", aspect=DatasetPropertiesClass(description="test dataset"), ), ], @@ -633,7 +627,7 @@ def test_mark_status_dataset(tmp_path): events_file = create_and_run_test_pipeline( events=[ make_generic_dataset(aspects=[test_dataset_props_aspect]), - make_generic_dataset_mcp(aspect_name="globalTags", aspect=test_mcp_aspect), + make_generic_dataset_mcp(aspect=test_mcp_aspect), ], transformers=[{"type": "mark_dataset_status", "config": {"removed": True}}], path=tmp_path, @@ -1834,7 +1828,6 @@ def test_mcp_add_tags_missing(mock_time): def test_mcp_add_tags_existing(mock_time): dataset_mcp = make_generic_dataset_mcp( - aspect_name="globalTags", aspect=GlobalTagsClass( tags=[TagAssociationClass(tag=builder.make_tag_urn("Test"))] ), @@ -2074,7 +2067,6 @@ class SuppressingTransformer(BaseTransformer, SingleAspectTransformer): def test_supression_works(): dataset_mce = make_generic_dataset() dataset_mcp = make_generic_dataset_mcp( - aspect_name="datasetProperties", aspect=DatasetPropertiesClass(description="supressable description"), ) transformer = SuppressingTransformer.create( @@ -2305,9 +2297,8 @@ def run_dataset_transformer_pipeline( ) else: assert aspect - dataset = make_generic_dataset_mcp( - aspect=aspect, aspect_name=transformer.aspect_name() - ) + assert transformer.aspect_name() == aspect.ASPECT_NAME + dataset = make_generic_dataset_mcp(aspect=aspect) outputs = list( transformer.transform( @@ -2910,7 +2901,7 @@ def test_pattern_container_and_dataset_domain_transformation( pipeline_context.graph.get_aspect = fake_get_aspect # type: ignore with_domain_aspect = make_generic_dataset_mcp( - aspect=models.DomainsClass(domains=[datahub_domain]), aspect_name="domains" + aspect=models.DomainsClass(domains=[datahub_domain]) ) no_domain_aspect = make_generic_dataset_mcp( entity_urn="urn:li:dataset:(urn:li:dataPlatform:bigquery,example2,PROD)" @@ -3008,7 +2999,7 @@ def test_pattern_container_and_dataset_domain_transformation_with_no_container( pipeline_context.graph.get_aspect = fake_get_aspect # type: ignore with_domain_aspect = make_generic_dataset_mcp( - aspect=models.DomainsClass(domains=[datahub_domain]), aspect_name="domains" + aspect=models.DomainsClass(domains=[datahub_domain]) ) no_domain_aspect = make_generic_dataset_mcp( entity_urn="urn:li:dataset:(urn:li:dataPlatform:bigquery,example2,PROD)" diff --git a/metadata-ingestion/tests/unit/test_usage_common.py b/metadata-ingestion/tests/unit/test_usage_common.py index bd6d194835..1cef2e0a8e 100644 --- a/metadata-ingestion/tests/unit/test_usage_common.py +++ b/metadata-ingestion/tests/unit/test_usage_common.py @@ -1,8 +1,6 @@ -import time from datetime import datetime import pytest -from freezegun import freeze_time from pydantic import ValidationError import datahub.ingestion.source.usage.usage_common @@ -15,19 +13,9 @@ from datahub.ingestion.source.usage.usage_common import ( DEFAULT_QUERIES_CHARACTER_LIMIT, BaseUsageConfig, GenericAggregatedDataset, - convert_usage_aggregation_class, ) from datahub.metadata.schema_classes import ( - CalendarIntervalClass, - DatasetFieldUsageCountsClass, DatasetUsageStatisticsClass, - DatasetUserUsageCountsClass, - FieldUsageCountsClass, - TimeWindowSizeClass, - UsageAggregationClass, - UsageAggregationMetricsClass, - UserUsageCountsClass, - WindowDurationClass, ) from datahub.testing.doctest import assert_doctest @@ -311,71 +299,5 @@ def test_make_usage_workunit_include_top_n_queries(): assert du.topSqlQueries is None -@freeze_time("2023-01-01 00:00:00") -def test_convert_usage_aggregation_class(): - urn = make_dataset_urn_with_platform_instance( - "platform", "test_db.test_schema.test_table", None - ) - usage_aggregation = UsageAggregationClass( - bucket=int(time.time() * 1000), - duration=WindowDurationClass.DAY, - resource=urn, - metrics=UsageAggregationMetricsClass( - uniqueUserCount=5, - users=[ - UserUsageCountsClass(count=3, user="abc", userEmail="abc@acryl.io"), - UserUsageCountsClass(count=2), - UserUsageCountsClass(count=1, user="def"), - ], - totalSqlQueries=10, - topSqlQueries=["SELECT * FROM my_table", "SELECT col from a.b.c"], - fields=[FieldUsageCountsClass("col", 7), FieldUsageCountsClass("col2", 0)], - ), - ) - assert convert_usage_aggregation_class( - usage_aggregation - ) == MetadataChangeProposalWrapper( - entityUrn=urn, - aspect=DatasetUsageStatisticsClass( - timestampMillis=int(time.time() * 1000), - eventGranularity=TimeWindowSizeClass(unit=CalendarIntervalClass.DAY), - uniqueUserCount=5, - totalSqlQueries=10, - topSqlQueries=["SELECT * FROM my_table", "SELECT col from a.b.c"], - userCounts=[ - DatasetUserUsageCountsClass( - user="abc", count=3, userEmail="abc@acryl.io" - ), - DatasetUserUsageCountsClass(user="def", count=1), - ], - fieldCounts=[ - DatasetFieldUsageCountsClass(fieldPath="col", count=7), - DatasetFieldUsageCountsClass(fieldPath="col2", count=0), - ], - ), - ) - - empty_urn = make_dataset_urn_with_platform_instance( - "platform", - "test_db.test_schema.empty_table", - None, - ) - empty_usage_aggregation = UsageAggregationClass( - bucket=int(time.time() * 1000) - 1000 * 60 * 60 * 24, - duration=WindowDurationClass.MONTH, - resource=empty_urn, - metrics=UsageAggregationMetricsClass(), - ) - assert convert_usage_aggregation_class( - empty_usage_aggregation - ) == MetadataChangeProposalWrapper( - entityUrn=empty_urn, - aspect=DatasetUsageStatisticsClass( - timestampMillis=int(time.time() * 1000) - 1000 * 60 * 60 * 24, - eventGranularity=TimeWindowSizeClass(unit=CalendarIntervalClass.MONTH), - ), - ) - - def test_extract_user_email(): assert_doctest(datahub.ingestion.source.usage.usage_common)