chore(ingest): remove calls to deprecated methods (#13009)

Harshal Sheth 2025-03-28 13:42:54 -07:00 committed by GitHub
parent d3ef859802
commit 5bc8a895f9
5 changed files with 12 additions and 168 deletions
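
The two deprecated patterns removed here, as a minimal sketch (the assert value follows from the URNs used in the diffs below; illustration only, not a full description of the emitter API):

    from datahub.emitter.mcp import MetadataChangeProposalWrapper
    from datahub.metadata.schema_classes import StatusClass
    from datahub.utilities.urns.urn import Urn

    urn = "urn:li:dataset:(urn:li:dataPlatform:bigquery,example1,PROD)"

    # 1. Urn.get_type() is deprecated; the entity_type property replaces it.
    assert Urn.from_string(urn).entity_type == "dataset"

    # 2. entityType and aspectName no longer need to be spelled out on
    #    MetadataChangeProposalWrapper (they are derived from the URN and the
    #    aspect object), and changeType already defaults to UPSERT.
    mcp = MetadataChangeProposalWrapper(entityUrn=urn, aspect=StatusClass(removed=False))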

View File

@@ -12,11 +12,9 @@ from typing import (
     Optional,
     Tuple,
     TypeVar,
-    Union,
 )

 import pydantic
-from deprecated import deprecated
 from pydantic.fields import Field

 import datahub.emitter.mce_builder as builder
@@ -28,19 +26,13 @@ from datahub.configuration.time_window_config import (
 )
 from datahub.emitter.mcp import MetadataChangeProposalWrapper
 from datahub.ingestion.api.workunit import MetadataWorkUnit
-from datahub.metadata.com.linkedin.pegasus2avro.dataset import DatasetUsageStatistics
 from datahub.metadata.schema_classes import (
-    CalendarIntervalClass,
     DatasetFieldUsageCountsClass,
     DatasetUsageStatisticsClass,
     DatasetUserUsageCountsClass,
     TimeWindowSizeClass,
-    UsageAggregationClass,
-    WindowDurationClass,
 )
 from datahub.utilities.sql_formatter import format_sql_query, trim_query
-from datahub.utilities.urns.dataset_urn import DatasetUrn
-from datahub.utilities.urns.urn import guess_entity_type

 logger = logging.getLogger(__name__)
@@ -295,60 +287,3 @@ class UsageAggregator(Generic[ResourceType]):
                     user_urn_builder=user_urn_builder,
                     queries_character_limit=self.config.queries_character_limit,
                 )
-
-
-@deprecated
-def convert_usage_aggregation_class(
-    obj: UsageAggregationClass,
-) -> MetadataChangeProposalWrapper:
-    # Legacy usage aggregation only supported dataset usage stats
-    if guess_entity_type(obj.resource) == DatasetUrn.ENTITY_TYPE:
-        aspect = DatasetUsageStatistics(
-            timestampMillis=obj.bucket,
-            eventGranularity=TimeWindowSizeClass(
-                unit=convert_window_to_interval(obj.duration)
-            ),
-            uniqueUserCount=obj.metrics.uniqueUserCount,
-            totalSqlQueries=obj.metrics.totalSqlQueries,
-            topSqlQueries=obj.metrics.topSqlQueries,
-            userCounts=(
-                [
-                    DatasetUserUsageCountsClass(
-                        user=u.user, count=u.count, userEmail=u.userEmail
-                    )
-                    for u in obj.metrics.users
-                    if u.user is not None
-                ]
-                if obj.metrics.users
-                else None
-            ),
-            fieldCounts=(
-                [
-                    DatasetFieldUsageCountsClass(fieldPath=f.fieldName, count=f.count)
-                    for f in obj.metrics.fields
-                ]
-                if obj.metrics.fields
-                else None
-            ),
-        )
-        return MetadataChangeProposalWrapper(entityUrn=obj.resource, aspect=aspect)
-    else:
-        raise Exception(
-            f"Skipping unsupported usage aggregation - invalid entity type: {obj}"
-        )
-
-
-@deprecated
-def convert_window_to_interval(window: Union[str, WindowDurationClass]) -> str:
-    if window == WindowDurationClass.YEAR:
-        return CalendarIntervalClass.YEAR
-    elif window == WindowDurationClass.MONTH:
-        return CalendarIntervalClass.MONTH
-    elif window == WindowDurationClass.WEEK:
-        return CalendarIntervalClass.WEEK
-    elif window == WindowDurationClass.DAY:
-        return CalendarIntervalClass.DAY
-    elif window == WindowDurationClass.HOUR:
-        return CalendarIntervalClass.HOUR
-    else:
-        raise Exception(f"Unsupported window duration: {window}")
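
With convert_usage_aggregation_class() and convert_window_to_interval() deleted, there is no conversion path left from the legacy UsageAggregationClass; usage sources emit DatasetUsageStatisticsClass aspects directly, as UsageAggregator.generate_workunits above already does. A hypothetical sketch of that direct emission, with invented field values:

    from datahub.emitter.mcp import MetadataChangeProposalWrapper
    from datahub.metadata.schema_classes import (
        CalendarIntervalClass,
        DatasetUsageStatisticsClass,
        DatasetUserUsageCountsClass,
        TimeWindowSizeClass,
    )

    usage_mcp = MetadataChangeProposalWrapper(
        entityUrn="urn:li:dataset:(urn:li:dataPlatform:bigquery,example1,PROD)",
        aspect=DatasetUsageStatisticsClass(
            timestampMillis=1672531200000,  # start of the usage bucket, in ms
            eventGranularity=TimeWindowSizeClass(unit=CalendarIntervalClass.DAY),
            uniqueUserCount=1,
            totalSqlQueries=10,
            userCounts=[
                DatasetUserUsageCountsClass(user="urn:li:corpuser:abc", count=10)
            ],
        ),
    )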

View File

@@ -324,7 +324,7 @@ def assert_entity_mce_aspect(
 ) -> int:
     # TODO: Replace with read_metadata_file()
     test_output = load_json_file(file)
-    entity_type = Urn.from_string(entity_urn).get_type()
+    entity_type = Urn.from_string(entity_urn).entity_type
     assert isinstance(test_output, list)
     # mce urns
     mces: List[MetadataChangeEventClass] = [
@@ -347,7 +347,7 @@ def assert_entity_mcp_aspect(
 ) -> int:
     # TODO: Replace with read_metadata_file()
     test_output = load_json_file(file)
-    entity_type = Urn.from_string(entity_urn).get_type()
+    entity_type = Urn.from_string(entity_urn).entity_type
     assert isinstance(test_output, list)
     # mcps that match entity_urn
     mcps: List[MetadataChangeProposalWrapper] = [

View File

@@ -44,16 +44,12 @@ def make_mce_datajob(
     )


-def make_mcpw(
+def make_status_mcpw(
     entity_urn: str = "urn:li:dataset:(urn:li:dataPlatform:bigquery,example1,PROD)",
-    aspect_name: str = "status",
     aspect: Any = StatusClass(removed=False),
 ) -> MetadataChangeProposalWrapper:
     return MetadataChangeProposalWrapper(
         entityUrn=entity_urn,
-        entityType=Urn.from_string(entity_urn).get_type(),
-        aspectName=aspect_name,
-        changeType="UPSERT",
         aspect=aspect,
     )
@@ -65,7 +61,7 @@ def make_mcpc(
 ) -> MetadataChangeProposalClass:
     return MetadataChangeProposalClass(
         entityUrn=entity_urn,
-        entityType=Urn.from_string(entity_urn).get_type(),
+        entityType=Urn.from_string(entity_urn).entity_type,
         aspectName=aspect_name,
         changeType="UPSERT",
         aspect=aspect,
@@ -127,8 +123,8 @@ class TestDummyGenericAspectTransformer(unittest.TestCase):
         assert inputs[2] == outputs[3].record

     def test_add_generic_aspect_when_mcpw_received(self):
-        mcpw_dataset = make_mcpw()
-        mcpw_datajob = make_mcpw(
+        mcpw_dataset = make_status_mcpw()
+        mcpw_datajob = make_status_mcpw(
             entity_urn="urn:li:dataJob:(urn:li:dataFlow:(airflow,dag_abc,PROD),task_456)"
         )
         inputs = [mcpw_dataset, mcpw_datajob, EndOfStream()]
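
The rename from make_mcpw to make_status_mcpw tracks the dropped aspect_name parameter: with aspectName no longer passed through, the helper is hard-wired to the status aspect. A minimal sketch of the inference the slimmed-down constructor relies on, assuming the wrapper fills in the omitted fields at construction time:

    from datahub.emitter.mcp import MetadataChangeProposalWrapper
    from datahub.metadata.schema_classes import StatusClass

    mcp = MetadataChangeProposalWrapper(
        entityUrn="urn:li:dataJob:(urn:li:dataFlow:(airflow,dag_abc,PROD),task_456)",
        aspect=StatusClass(removed=False),
    )
    assert mcp.entityType == "dataJob"  # derived from the URN
    assert mcp.aspectName == "status"  # derived from the aspect type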

View File

@@ -117,14 +117,10 @@ def make_generic_dataset(
 def make_generic_dataset_mcp(
     entity_urn: str = "urn:li:dataset:(urn:li:dataPlatform:bigquery,example1,PROD)",
-    aspect_name: str = "status",
     aspect: Any = models.StatusClass(removed=False),
 ) -> MetadataChangeProposalWrapper:
     return MetadataChangeProposalWrapper(
         entityUrn=entity_urn,
-        entityType=Urn.from_string(entity_urn).get_type(),
-        aspectName=aspect_name,
-        changeType="UPSERT",
         aspect=aspect,
     )
@@ -138,7 +134,7 @@ def make_generic_container_mcp(
         aspect = models.StatusClass(removed=False)
     return MetadataChangeProposalWrapper(
         entityUrn=entity_urn,
-        entityType=Urn.from_string(entity_urn).get_type(),
+        entityType=Urn.from_string(entity_urn).entity_type,
         aspectName=aspect_name,
         changeType="UPSERT",
         aspect=aspect,
@@ -497,7 +493,6 @@ def test_mark_status_dataset(tmp_path):
     assert status_aspect.removed is False

     mcp = make_generic_dataset_mcp(
-        aspect_name="datasetProperties",
         aspect=DatasetPropertiesClass(description="Test dataset"),
     )
     events_file = create_and_run_test_pipeline(
@@ -597,7 +592,6 @@
         events=[
             make_generic_dataset(aspects=[test_status_aspect]),
             make_generic_dataset_mcp(
-                aspect_name="datasetProperties",
                 aspect=DatasetPropertiesClass(description="test dataset"),
             ),
         ],
@@ -633,7 +627,7 @@
     events_file = create_and_run_test_pipeline(
         events=[
             make_generic_dataset(aspects=[test_dataset_props_aspect]),
-            make_generic_dataset_mcp(aspect_name="globalTags", aspect=test_mcp_aspect),
+            make_generic_dataset_mcp(aspect=test_mcp_aspect),
         ],
         transformers=[{"type": "mark_dataset_status", "config": {"removed": True}}],
         path=tmp_path,
@@ -1834,7 +1828,6 @@ def test_mcp_add_tags_missing(mock_time):

 def test_mcp_add_tags_existing(mock_time):
     dataset_mcp = make_generic_dataset_mcp(
-        aspect_name="globalTags",
         aspect=GlobalTagsClass(
             tags=[TagAssociationClass(tag=builder.make_tag_urn("Test"))]
         ),
@@ -2074,7 +2067,6 @@ class SuppressingTransformer(BaseTransformer, SingleAspectTransformer):
 def test_supression_works():
     dataset_mce = make_generic_dataset()
     dataset_mcp = make_generic_dataset_mcp(
-        aspect_name="datasetProperties",
         aspect=DatasetPropertiesClass(description="supressable description"),
     )
     transformer = SuppressingTransformer.create(
@@ -2305,9 +2297,8 @@ def run_dataset_transformer_pipeline(
         )
     else:
         assert aspect
-        dataset = make_generic_dataset_mcp(
-            aspect=aspect, aspect_name=transformer.aspect_name()
-        )
+        assert transformer.aspect_name() == aspect.ASPECT_NAME
+        dataset = make_generic_dataset_mcp(aspect=aspect)

     outputs = list(
         transformer.transform(
@@ -2910,7 +2901,7 @@ def test_pattern_container_and_dataset_domain_transformation(
     pipeline_context.graph.get_aspect = fake_get_aspect  # type: ignore

     with_domain_aspect = make_generic_dataset_mcp(
-        aspect=models.DomainsClass(domains=[datahub_domain]), aspect_name="domains"
+        aspect=models.DomainsClass(domains=[datahub_domain])
     )
     no_domain_aspect = make_generic_dataset_mcp(
         entity_urn="urn:li:dataset:(urn:li:dataPlatform:bigquery,example2,PROD)"
@@ -3008,7 +2999,7 @@ def test_pattern_container_and_dataset_domain_transformation_with_no_container(
     pipeline_context.graph.get_aspect = fake_get_aspect  # type: ignore

     with_domain_aspect = make_generic_dataset_mcp(
-        aspect=models.DomainsClass(domains=[datahub_domain]), aspect_name="domains"
+        aspect=models.DomainsClass(domains=[datahub_domain])
     )
     no_domain_aspect = make_generic_dataset_mcp(
         entity_urn="urn:li:dataset:(urn:li:dataPlatform:bigquery,example2,PROD)"

View File

@@ -1,8 +1,6 @@
-import time
 from datetime import datetime

 import pytest
-from freezegun import freeze_time
 from pydantic import ValidationError

 import datahub.ingestion.source.usage.usage_common
@@ -15,19 +13,9 @@ from datahub.ingestion.source.usage.usage_common import (
     DEFAULT_QUERIES_CHARACTER_LIMIT,
     BaseUsageConfig,
     GenericAggregatedDataset,
-    convert_usage_aggregation_class,
 )
 from datahub.metadata.schema_classes import (
-    CalendarIntervalClass,
-    DatasetFieldUsageCountsClass,
     DatasetUsageStatisticsClass,
-    DatasetUserUsageCountsClass,
-    FieldUsageCountsClass,
-    TimeWindowSizeClass,
-    UsageAggregationClass,
-    UsageAggregationMetricsClass,
-    UserUsageCountsClass,
-    WindowDurationClass,
 )
 from datahub.testing.doctest import assert_doctest
@@ -311,71 +299,5 @@ def test_make_usage_workunit_include_top_n_queries():
     assert du.topSqlQueries is None


-@freeze_time("2023-01-01 00:00:00")
-def test_convert_usage_aggregation_class():
-    urn = make_dataset_urn_with_platform_instance(
-        "platform", "test_db.test_schema.test_table", None
-    )
-    usage_aggregation = UsageAggregationClass(
-        bucket=int(time.time() * 1000),
-        duration=WindowDurationClass.DAY,
-        resource=urn,
-        metrics=UsageAggregationMetricsClass(
-            uniqueUserCount=5,
-            users=[
-                UserUsageCountsClass(count=3, user="abc", userEmail="abc@acryl.io"),
-                UserUsageCountsClass(count=2),
-                UserUsageCountsClass(count=1, user="def"),
-            ],
-            totalSqlQueries=10,
-            topSqlQueries=["SELECT * FROM my_table", "SELECT col from a.b.c"],
-            fields=[FieldUsageCountsClass("col", 7), FieldUsageCountsClass("col2", 0)],
-        ),
-    )
-    assert convert_usage_aggregation_class(
-        usage_aggregation
-    ) == MetadataChangeProposalWrapper(
-        entityUrn=urn,
-        aspect=DatasetUsageStatisticsClass(
-            timestampMillis=int(time.time() * 1000),
-            eventGranularity=TimeWindowSizeClass(unit=CalendarIntervalClass.DAY),
-            uniqueUserCount=5,
-            totalSqlQueries=10,
-            topSqlQueries=["SELECT * FROM my_table", "SELECT col from a.b.c"],
-            userCounts=[
-                DatasetUserUsageCountsClass(
-                    user="abc", count=3, userEmail="abc@acryl.io"
-                ),
-                DatasetUserUsageCountsClass(user="def", count=1),
-            ],
-            fieldCounts=[
-                DatasetFieldUsageCountsClass(fieldPath="col", count=7),
-                DatasetFieldUsageCountsClass(fieldPath="col2", count=0),
-            ],
-        ),
-    )
-
-    empty_urn = make_dataset_urn_with_platform_instance(
-        "platform",
-        "test_db.test_schema.empty_table",
-        None,
-    )
-    empty_usage_aggregation = UsageAggregationClass(
-        bucket=int(time.time() * 1000) - 1000 * 60 * 60 * 24,
-        duration=WindowDurationClass.MONTH,
-        resource=empty_urn,
-        metrics=UsageAggregationMetricsClass(),
-    )
-    assert convert_usage_aggregation_class(
-        empty_usage_aggregation
-    ) == MetadataChangeProposalWrapper(
-        entityUrn=empty_urn,
-        aspect=DatasetUsageStatisticsClass(
-            timestampMillis=int(time.time() * 1000) - 1000 * 60 * 60 * 24,
-            eventGranularity=TimeWindowSizeClass(unit=CalendarIntervalClass.MONTH),
-        ),
-    )


 def test_extract_user_email():
     assert_doctest(datahub.ingestion.source.usage.usage_common)