Mirror of https://github.com/datahub-project/datahub.git, synced 2025-12-27 01:48:24 +00:00
refactor(ingest): Make get_workunits() return MetadataWorkUnits (#8051)
- Deprecates UsageAggregationClass, /usageStats?action=batchIngest, UsageStatsWorkUnit
- Removes parsing of UsageAggregationClass in file source, all sinks, and WorkUnitRecordExtractor
parent 25450ac82c
commit 7ba2d13087
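For orientation, a minimal source written against the new contract could look like the sketch below. This is illustrative only: the class name and dataset URN are made up and the aspect choice is arbitrary; the point is that `get_workunits()` now yields `MetadataWorkUnit` objects built from `MetadataChangeProposalWrapper` rather than bare `WorkUnit`s.

```python
from typing import Iterable

from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.source import Source, SourceReport
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.metadata.schema_classes import StatusClass


class MinimalSource(Source):
    """Hypothetical source illustrating the narrowed get_workunits() signature."""

    def __init__(self, ctx: PipelineContext):
        super().__init__(ctx)
        self.report = SourceReport()

    def get_workunits(self) -> Iterable[MetadataWorkUnit]:
        # Wrap an aspect in an MCP and convert it into a MetadataWorkUnit.
        yield MetadataChangeProposalWrapper(
            entityUrn="urn:li:dataset:(urn:li:dataPlatform:demo,example.table,PROD)",
            aspect=StatusClass(removed=False),
        ).as_workunit()

    def get_report(self) -> SourceReport:
        return self.report
```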
@@ -11,6 +11,8 @@ This file documents any backwards-incompatible changes in DataHub and assists pe

### Potential Downtime

### Deprecations

+- The signature of `Source.get_workunits()` is changed from `Iterable[WorkUnit]` to the more restrictive `Iterable[MetadataWorkUnit]`.
+- Legacy usage creation via the `UsageAggregation` aspect, `/usageStats?action=batchIngest` GMS endpoint, and `UsageStatsWorkUnit` metadata-ingestion class are all deprecated.

### Other notable Changes
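The replacement for the deprecated usage path is to write the same statistics as a regular `datasetUsageStatistics` aspect and emit it like any other metadata change proposal. A rough sketch follows; the class and field names come from this diff, but the GMS URL, dataset URN, users, and counts are made up for illustration.

```python
import time

from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.schema_classes import (
    CalendarIntervalClass,
    DatasetUsageStatisticsClass,
    DatasetUserUsageCountsClass,
    TimeWindowSizeClass,
)

# Assumed local GMS endpoint and an illustrative dataset URN.
emitter = DatahubRestEmitter("http://localhost:8080")
dataset_urn = "urn:li:dataset:(urn:li:dataPlatform:demo,example.table,PROD)"

usage_aspect = DatasetUsageStatisticsClass(
    timestampMillis=int(time.time() * 1000),
    eventGranularity=TimeWindowSizeClass(unit=CalendarIntervalClass.DAY),
    totalSqlQueries=10,
    uniqueUserCount=2,
    userCounts=[
        DatasetUserUsageCountsClass(user="urn:li:corpuser:alice", count=7),
        DatasetUserUsageCountsClass(user="urn:li:corpuser:bob", count=3),
    ],
)

# Regular MCP emission replaces emit_usage() / the usageStats?action=batchIngest endpoint.
emitter.emit(MetadataChangeProposalWrapper(entityUrn=dataset_urn, aspect=usage_aspect))
```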
@@ -7,6 +7,7 @@ from json.decoder import JSONDecodeError
from typing import Any, Callable, Dict, List, Optional, Tuple, Union

import requests
+from deprecated import deprecated
from requests.adapters import HTTPAdapter, Retry
from requests.exceptions import HTTPError, RequestException
@@ -231,17 +232,14 @@ class DataHubRestEmitter(Closeable):
        self._emit_generic(url, payload)

+    @deprecated
    def emit_usage(self, usageStats: UsageAggregation) -> None:
        url = f"{self._gms_server}/usageStats?action=batchIngest"

        raw_usage_obj = usageStats.to_obj()
        usage_obj = pre_json_transform(raw_usage_obj)

-        snapshot = {
-            "buckets": [
-                usage_obj,
-            ]
-        }
+        snapshot = {"buckets": [usage_obj]}
        payload = json.dumps(snapshot)
        self._emit_generic(url, payload)
@@ -156,7 +156,7 @@ class Source(Closeable, metaclass=ABCMeta):
        raise NotImplementedError('sources must implement "create"')

    @abstractmethod
-    def get_workunits(self) -> Iterable[WorkUnit]:
+    def get_workunits(self) -> Iterable[MetadataWorkUnit]:
        pass

    @abstractmethod
@@ -1,6 +1,8 @@
from dataclasses import dataclass
from typing import Iterable, Optional, Union, overload

+from deprecated import deprecated
+
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.common import WorkUnit
from datahub.metadata.com.linkedin.pegasus2avro.mxe import (
@@ -101,6 +103,7 @@ class MetadataWorkUnit(WorkUnit):
    ]


+@deprecated
@dataclass
class UsageStatsWorkUnit(WorkUnit):
    usageStats: UsageAggregationClass
@@ -1,3 +1,4 @@
+import logging
from typing import Iterable, Union

from datahub.configuration.common import ConfigModel
@@ -11,7 +12,8 @@ from datahub.metadata.com.linkedin.pegasus2avro.mxe import (
    MetadataChangeProposal,
    SystemMetadata,
)
from datahub.metadata.schema_classes import UsageAggregationClass

+logger = logging.getLogger(__name__)


def _try_reformat_with_black(code: str) -> str:
@@ -41,7 +43,6 @@ class WorkUnitRecordExtractor(
                MetadataChangeEvent,
                MetadataChangeProposal,
                MetadataChangeProposalWrapper,
-                UsageAggregationClass,
            ]
        ]
    ]:
@@ -85,18 +86,9 @@ class WorkUnitRecordExtractor(
                },
            )
        elif isinstance(workunit, UsageStatsWorkUnit):
-            if not workunit.usageStats.validate():
-                invalid_usage_stats = str(workunit.usageStats)
-                invalid_usage_stats = _try_reformat_with_black(invalid_usage_stats)
-
-                raise ValueError(
-                    f"source produced an invalid usage stat: {invalid_usage_stats}"
-                )
-            yield RecordEnvelope(
-                workunit.usageStats,
-                {
-                    "workunit_id": workunit.id,
-                },
+            logger.error(
+                "Dropping deprecated `UsageStatsWorkUnit`. "
+                "Emit a `MetadataWorkUnit` with the `datasetUsageStatistics` aspect instead."
            )
        else:
            raise ValueError(f"unknown WorkUnit type {type(workunit)}")
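The error message above points sources at the standard replacement: wrap a `datasetUsageStatistics` aspect in an MCP and turn it into a `MetadataWorkUnit`. A minimal helper along those lines might look like this sketch (the function name is made up for illustration):

```python
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.metadata.schema_classes import DatasetUsageStatisticsClass


def usage_workunit(
    dataset_urn: str, usage: DatasetUsageStatisticsClass
) -> MetadataWorkUnit:
    # The MCP-to-workunit path is what the extractor accepts,
    # in place of the dropped UsageStatsWorkUnit.
    return MetadataChangeProposalWrapper(
        entityUrn=dataset_urn, aspect=usage
    ).as_workunit()
```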
@@ -10,7 +10,6 @@ from datahub.metadata.com.linkedin.pegasus2avro.mxe import (
    MetadataChangeEvent,
    MetadataChangeProposal,
)
-from datahub.metadata.com.linkedin.pegasus2avro.usage import UsageAggregation

logger = logging.getLogger(__name__)

@@ -31,7 +30,6 @@ class DataHubLiteSink(Sink[DataHubLiteSinkConfig, SinkReport]):
                MetadataChangeEvent,
                MetadataChangeProposal,
                MetadataChangeProposalWrapper,
-                UsageAggregation,
            ]
        ],
        write_callback: WriteCallback,
@@ -25,7 +25,6 @@ from datahub.metadata.com.linkedin.pegasus2avro.mxe import (
    MetadataChangeEvent,
    MetadataChangeProposal,
)
-from datahub.metadata.com.linkedin.pegasus2avro.usage import UsageAggregation
from datahub.utilities.server_config_util import set_gms_config

logger = logging.getLogger(__name__)
@@ -186,7 +185,6 @@ class DatahubRestSink(Sink[DatahubRestSinkConfig, DataHubRestSinkReport]):
                MetadataChangeEvent,
                MetadataChangeProposal,
                MetadataChangeProposalWrapper,
-                UsageAggregation,
            ]
        ],
        write_callback: WriteCallback,
@@ -11,7 +11,6 @@ from datahub.metadata.com.linkedin.pegasus2avro.mxe import (
    MetadataChangeEvent,
    MetadataChangeProposal,
)
-from datahub.metadata.com.linkedin.pegasus2avro.usage import UsageAggregation

logger = logging.getLogger(__name__)

@@ -21,7 +20,6 @@ def _to_obj_for_file(
        MetadataChangeEvent,
        MetadataChangeProposal,
        MetadataChangeProposalWrapper,
-        UsageAggregation,
    ],
    simplified_structure: bool = True,
) -> dict:
@@ -50,7 +48,6 @@ class FileSink(Sink[FileSinkConfig, SinkReport]):
                MetadataChangeEvent,
                MetadataChangeProposal,
                MetadataChangeProposalWrapper,
-                UsageAggregation,
            ]
        ],
        write_callback: WriteCallback,
@@ -82,7 +79,6 @@ def write_metadata_file(
            MetadataChangeEvent,
            MetadataChangeProposal,
            MetadataChangeProposalWrapper,
-            UsageAggregation,
        ]
    ],
) -> None:
@@ -11,7 +11,7 @@ from datahub.emitter.mce_builder import (
    make_dataset_urn_with_platform_instance,
)
from datahub.emitter.mcp import MetadataChangeProposalWrapper
-from datahub.ingestion.api.common import PipelineContext, WorkUnit
+from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.decorators import (
    SourceCapability,
    SupportStatus,
@@ -314,7 +314,7 @@ class DeltaLakeSource(Source):
                break
        return

-    def get_workunits(self) -> Iterable[WorkUnit]:
+    def get_workunits(self) -> Iterable[MetadataWorkUnit]:
        self.container_WU_creator = ContainerWUCreator(
            self.source_config.platform,
            self.source_config.platform_instance,
@@ -1,7 +1,7 @@
from typing import Iterable

from datahub.configuration.common import ConfigModel
-from datahub.ingestion.api.common import PipelineContext, WorkUnit
+from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.decorators import (
    SupportStatus,
    config_class,
@@ -9,6 +9,7 @@ from datahub.ingestion.api.decorators import (
    support_status,
)
from datahub.ingestion.api.source import Source, SourceReport
+from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.file import FileSourceConfig, GenericFileSource
from datahub.utilities.sample_data import download_sample_data

@@ -30,7 +31,7 @@ class DemoDataSource(Source):
        file_config = FileSourceConfig(path=str(download_sample_data()))
        self.file_source = GenericFileSource(ctx, file_config)

-    def get_workunits(self) -> Iterable[WorkUnit]:
+    def get_workunits(self) -> Iterable[MetadataWorkUnit]:
        yield from self.file_source.get_workunits()

    def get_report(self) -> SourceReport:
@@ -7,7 +7,7 @@ from collections import defaultdict
from dataclasses import dataclass, field
from enum import auto
from io import BufferedReader
-from typing import Any, Dict, Iterable, Iterator, List, Optional, Tuple, Union
+from typing import Any, Dict, Iterable, Iterator, Optional, Tuple, Union
from urllib import parse

import ijson
@@ -32,7 +32,7 @@ from datahub.ingestion.api.source import (
    TestableSource,
    TestConnectionReport,
)
-from datahub.ingestion.api.workunit import MetadataWorkUnit, UsageStatsWorkUnit
+from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.metadata.com.linkedin.pegasus2avro.mxe import (
    MetadataChangeEvent,
    MetadataChangeProposal,
@@ -205,18 +205,16 @@ class GenericFileSource(TestableSource):
            self.report.total_num_files = 1
            return [str(self.config.path)]

-    def get_workunits(self) -> Iterable[Union[MetadataWorkUnit, UsageStatsWorkUnit]]:
+    def get_workunits(self) -> Iterable[MetadataWorkUnit]:
        return auto_workunit_reporter(self.report, self.get_workunits_internal())

    def get_workunits_internal(
        self,
-    ) -> Iterable[Union[MetadataWorkUnit, UsageStatsWorkUnit]]:
+    ) -> Iterable[MetadataWorkUnit]:
        for f in self.get_filenames():
            for i, obj in self.iterate_generic_file(f):
                id = f"file://{f}:{i}"
-                if isinstance(obj, UsageAggregationClass):
-                    yield UsageStatsWorkUnit(id, obj)
-                elif isinstance(
+                if isinstance(
                    obj, (MetadataChangeProposalWrapper, MetadataChangeProposal)
                ):
                    self.report.entity_type_counts[obj.entityType] += 1
@@ -335,7 +333,6 @@ class GenericFileSource(TestableSource):
                MetadataChangeEvent,
                MetadataChangeProposalWrapper,
                MetadataChangeProposal,
-                UsageAggregationClass,
            ],
        ]
    ]:
@@ -343,9 +340,12 @@ class GenericFileSource(TestableSource):
            try:
                deserialize_start_time = datetime.datetime.now()
                item = _from_obj_for_file(obj)
-                deserialize_duration = datetime.datetime.now() - deserialize_start_time
-                self.report.add_deserialize_time(deserialize_duration)
-                yield i, item
+                if item is not None:
+                    deserialize_duration = (
+                        datetime.datetime.now() - deserialize_start_time
+                    )
+                    self.report.add_deserialize_time(deserialize_duration)
+                    yield i, item
            except Exception as e:
                self.report.report_failure(f"path-{i}", str(e))

@@ -388,7 +388,7 @@ def _from_obj_for_file(
    MetadataChangeEvent,
    MetadataChangeProposal,
    MetadataChangeProposalWrapper,
    UsageAggregationClass,
+    None,
]:
    item: Union[
        MetadataChangeEvent,
@@ -406,22 +406,25 @@ def _from_obj_for_file(
    if not item.validate():
        raise ValueError(f"failed to parse: {obj}")

-    return item
+    if isinstance(item, UsageAggregationClass):
+        logger.warning(f"Dropping deprecated UsageAggregationClass: {item}")
+        return None
+    else:
+        return item


def read_metadata_file(
    file: pathlib.Path,
-) -> List[
+) -> Iterable[
    Union[
        MetadataChangeEvent,
        MetadataChangeProposal,
        MetadataChangeProposalWrapper,
        UsageAggregationClass,
    ]
]:
    # This simplified version of the FileSource can be used for testing purposes.
-    records = []
    with file.open("r") as f:
        for obj in json.load(f):
-            records.append(_from_obj_for_file(obj))
-    return records
+            item = _from_obj_for_file(obj)
+            if item:
+                yield item
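Note that `read_metadata_file` now yields records lazily (and drops deprecated `UsageAggregationClass` entries) instead of returning a list. Callers that relied on list semantics can materialize the generator. A small sketch, assuming the helper stays importable from the file source module and using a made-up path:

```python
import pathlib

from datahub.ingestion.source.file import read_metadata_file

# Hypothetical golden file path; wrap the generator in list() if you need
# len() or random access, since the function no longer returns a List.
records = list(read_metadata_file(pathlib.Path("tests/golden/example_mces.json")))
for record in records:
    print(type(record).__name__)
```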
@@ -6,7 +6,7 @@ from pydantic import Field, SecretStr, validator

from datahub.configuration.common import ConfigModel
from datahub.configuration.source_common import DatasetSourceConfigMixin
-from datahub.ingestion.api.common import PipelineContext, WorkUnit
+from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.decorators import (
    SupportStatus,
    capability,
@@ -15,6 +15,7 @@ from datahub.ingestion.api.decorators import (
    support_status,
)
from datahub.ingestion.api.source import Source, SourceCapability, SourceReport
+from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.aws.aws_common import AwsConnectionConfig
from datahub.ingestion.source.data_lake_common.config import PathSpecsConfigMixin
from datahub.ingestion.source.data_lake_common.data_lake_utils import PLATFORM_GCS
@@ -152,7 +153,7 @@ class GCSSource(Source):

        return source

-    def get_workunits(self) -> Iterable[WorkUnit]:
+    def get_workunits(self) -> Iterable[MetadataWorkUnit]:
        yield from auto_workunit_reporter(
            self.report, auto_status_aspect(self.s3_source.get_workunits())
        )
@@ -1,6 +1,6 @@
import logging
from dataclasses import dataclass, field
-from typing import Any, Dict, Iterable, List, Optional, Union
+from typing import Any, Dict, Iterable, List, Optional

from pydantic import validator
from pydantic.fields import Field
@@ -27,7 +27,7 @@ from datahub.ingestion.api.decorators import (
    support_status,
)
from datahub.ingestion.api.source import Source, SourceReport
-from datahub.ingestion.api.workunit import MetadataWorkUnit, UsageStatsWorkUnit
+from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.utilities.source_helpers import auto_workunit_reporter

logger = logging.getLogger(__name__)
@@ -100,12 +100,12 @@ class LineageFileSource(Source):
        lineage_config = LineageConfig.parse_obj(config)
        return lineage_config

-    def get_workunits(self) -> Iterable[Union[MetadataWorkUnit, UsageStatsWorkUnit]]:
+    def get_workunits(self) -> Iterable[MetadataWorkUnit]:
        return auto_workunit_reporter(self.report, self.get_workunits_internal())

    def get_workunits_internal(
        self,
-    ) -> Iterable[Union[MetadataWorkUnit, UsageStatsWorkUnit]]:
+    ) -> Iterable[MetadataWorkUnit]:
        config = self.load_lineage_config(self.config.file)
        logger.debug(config)
        for entity_node in config.lineage:
@@ -18,7 +18,7 @@ from datahub.configuration.common import (
from datahub.configuration.source_common import DatasetSourceConfigMixin
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.mcp_builder import add_domain_to_entity_wu
-from datahub.ingestion.api.common import PipelineContext, WorkUnit
+from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.decorators import (
    SourceCapability,
    SupportStatus,
@@ -28,6 +28,7 @@ from datahub.ingestion.api.decorators import (
    support_status,
)
from datahub.ingestion.api.source import Source, SourceReport
+from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.common.subtypes import DatasetSubTypes
from datahub.metadata.schema_classes import (
    AuditStampClass,
@@ -280,13 +281,15 @@ class SalesforceSource(Source):
            )
        )

-    def get_workunits(self) -> Iterable[WorkUnit]:
+    def get_workunits(self) -> Iterable[MetadataWorkUnit]:
        sObjects = self.get_salesforce_objects()

        for sObject in sObjects:
            yield from self.get_salesforce_object_workunits(sObject)

-    def get_salesforce_object_workunits(self, sObject: dict) -> Iterable[WorkUnit]:
+    def get_salesforce_object_workunits(
+        self, sObject: dict
+    ) -> Iterable[MetadataWorkUnit]:
        sObjectName = sObject["QualifiedApiName"]

        if not self.config.object_pattern.allowed(sObjectName):
@@ -367,7 +370,7 @@ class SalesforceSource(Source):

    def get_domain_workunit(
        self, dataset_name: str, datasetUrn: str
-    ) -> Iterable[WorkUnit]:
+    ) -> Iterable[MetadataWorkUnit]:
        domain_urn: Optional[str] = None

        for domain, pattern in self.config.domain.items():
@@ -379,7 +382,7 @@ class SalesforceSource(Source):
            domain_urn=domain_urn, entity_urn=datasetUrn
        )

-    def get_platform_instance_workunit(self, datasetUrn: str) -> WorkUnit:
+    def get_platform_instance_workunit(self, datasetUrn: str) -> MetadataWorkUnit:
        dataPlatformInstance = DataPlatformInstanceClass(
            builder.make_data_platform_urn(self.platform),
            instance=builder.make_dataplatform_instance_urn(
@@ -393,7 +396,7 @@ class SalesforceSource(Source):

    def get_operation_workunit(
        self, customObject: dict, datasetUrn: str
-    ) -> Iterable[WorkUnit]:
+    ) -> Iterable[MetadataWorkUnit]:
        reported_time: int = int(time.time() * 1000)

        if customObject.get("CreatedBy") and customObject.get("CreatedDate"):
@@ -440,7 +443,7 @@ class SalesforceSource(Source):

    def get_properties_workunit(
        self, sObject: dict, customObject: Dict[str, str], datasetUrn: str
-    ) -> WorkUnit:
+    ) -> MetadataWorkUnit:
        propertyLabels = {
            # from EntityDefinition
            "DurableId": "Durable Id",
@@ -477,7 +480,9 @@ class SalesforceSource(Source):
            entityUrn=datasetUrn, aspect=datasetProperties
        ).as_workunit()

-    def get_subtypes_workunit(self, sObjectName: str, datasetUrn: str) -> WorkUnit:
+    def get_subtypes_workunit(
+        self, sObjectName: str, datasetUrn: str
+    ) -> MetadataWorkUnit:
        subtypes: List[str] = []
        if sObjectName.endswith("__c"):
            subtypes.append(DatasetSubTypes.SALESFORCE_CUSTOM_OBJECT)
@@ -490,7 +495,7 @@ class SalesforceSource(Source):

    def get_profile_workunit(
        self, sObjectName: str, datasetUrn: str
-    ) -> Iterable[WorkUnit]:
+    ) -> Iterable[MetadataWorkUnit]:
        # Here approximate record counts as returned by recordCount API are used as rowCount
        # In future, count() SOQL query may be used instead, if required, might be more expensive
        sObject_records_count_url = (
@@ -615,7 +620,7 @@ class SalesforceSource(Source):

    def get_schema_metadata_workunit(
        self, sObjectName: str, sObject: dict, customObject: dict, datasetUrn: str
-    ) -> Iterable[WorkUnit]:
+    ) -> Iterable[MetadataWorkUnit]:
        sObject_fields_query_url = (
            self.base_url
            + "tooling/query?q=SELECT "
@@ -12,9 +12,11 @@ from typing import (
    Optional,
    Tuple,
    TypeVar,
+    Union,
)

import pydantic
+from deprecated import deprecated
from pydantic.fields import Field

import datahub.emitter.mce_builder as builder
@@ -26,13 +28,19 @@ from datahub.configuration.time_window_config import (
)
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.workunit import MetadataWorkUnit
+from datahub.metadata.com.linkedin.pegasus2avro.dataset import DatasetUsageStatistics
from datahub.metadata.schema_classes import (
+    CalendarIntervalClass,
    DatasetFieldUsageCountsClass,
    DatasetUsageStatisticsClass,
    DatasetUserUsageCountsClass,
    TimeWindowSizeClass,
+    UsageAggregationClass,
+    WindowDurationClass,
)
from datahub.utilities.sql_formatter import format_sql_query, trim_query
+from datahub.utilities.urns.dataset_urn import DatasetUrn
+from datahub.utilities.urns.urn import guess_entity_type

logger = logging.getLogger(__name__)
@@ -251,3 +259,56 @@ class UsageAggregator(Generic[ResourceType]):
            resource_urn_builder=resource_urn_builder,
            user_urn_builder=user_urn_builder,
        )
+
+
+@deprecated
+def convert_usage_aggregation_class(
+    obj: UsageAggregationClass,
+) -> MetadataChangeProposalWrapper:
+    # Legacy usage aggregation only supported dataset usage stats
+    if guess_entity_type(obj.resource) == DatasetUrn.ENTITY_TYPE:
+        aspect = DatasetUsageStatistics(
+            timestampMillis=obj.bucket,
+            eventGranularity=TimeWindowSizeClass(
+                unit=convert_window_to_interval(obj.duration)
+            ),
+            uniqueUserCount=obj.metrics.uniqueUserCount,
+            totalSqlQueries=obj.metrics.totalSqlQueries,
+            topSqlQueries=obj.metrics.topSqlQueries,
+            userCounts=[
+                DatasetUserUsageCountsClass(
+                    user=u.user, count=u.count, userEmail=u.userEmail
+                )
+                for u in obj.metrics.users
+                if u.user is not None
+            ]
+            if obj.metrics.users
+            else None,
+            fieldCounts=[
+                DatasetFieldUsageCountsClass(fieldPath=f.fieldName, count=f.count)
+                for f in obj.metrics.fields
+            ]
+            if obj.metrics.fields
+            else None,
+        )
+        return MetadataChangeProposalWrapper(entityUrn=obj.resource, aspect=aspect)
+    else:
+        raise Exception(
+            f"Skipping unsupported usage aggregation - invalid entity type: {obj}"
+        )
+
+
+@deprecated
+def convert_window_to_interval(window: Union[str, WindowDurationClass]) -> str:
+    if window == WindowDurationClass.YEAR:
+        return CalendarIntervalClass.YEAR
+    elif window == WindowDurationClass.MONTH:
+        return CalendarIntervalClass.MONTH
+    elif window == WindowDurationClass.WEEK:
+        return CalendarIntervalClass.WEEK
+    elif window == WindowDurationClass.DAY:
+        return CalendarIntervalClass.DAY
+    elif window == WindowDurationClass.HOUR:
+        return CalendarIntervalClass.HOUR
+    else:
+        raise Exception(f"Unsupported window duration: {window}")
@@ -6,7 +6,7 @@ from freezegun import freeze_time

from datahub.configuration.common import DynamicTypedConfig
from datahub.ingestion.api.committable import CommitPolicy, Committable
-from datahub.ingestion.api.common import RecordEnvelope, WorkUnit
+from datahub.ingestion.api.common import RecordEnvelope
from datahub.ingestion.api.source import Source, SourceReport
from datahub.ingestion.api.transform import Transformer
from datahub.ingestion.api.workunit import MetadataWorkUnit
@@ -343,7 +343,7 @@ class FakeSource(Source):
        assert not config_dict
        return cls()

-    def get_workunits(self) -> Iterable[WorkUnit]:
+    def get_workunits(self) -> Iterable[MetadataWorkUnit]:
        return self.work_units

    def get_report(self) -> SourceReport:
@@ -1,17 +1,17 @@
from typing import Iterable

from datahub.emitter.mcp import MetadataChangeProposalWrapper
-from datahub.ingestion.api import workunit
-from datahub.ingestion.api.common import PipelineContext, WorkUnit
+from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.source import Source, SourceReport
+from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.metadata.schema_classes import StatusClass
from datahub.utilities.urns.dataset_urn import DatasetUrn


class FakeSource(Source):
-    def get_workunits(self) -> Iterable[WorkUnit]:
+    def get_workunits(self) -> Iterable[MetadataWorkUnit]:
        return [
-            workunit.MetadataWorkUnit(
+            MetadataWorkUnit(
                id="test-workunit",
                mcp=MetadataChangeProposalWrapper(
                    entityUrn=str(
@@ -1,7 +1,9 @@
+import time
from datetime import datetime
from unittest import mock

import pytest
from freezegun import freeze_time
from pydantic import ValidationError

from datahub.configuration.common import AllowDenyPattern
@@ -12,8 +14,20 @@ from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.usage.usage_common import (
    BaseUsageConfig,
    GenericAggregatedDataset,
+    convert_usage_aggregation_class,
)
+from datahub.metadata.schema_classes import (
+    CalendarIntervalClass,
+    DatasetFieldUsageCountsClass,
+    DatasetUsageStatisticsClass,
+    DatasetUserUsageCountsClass,
+    FieldUsageCountsClass,
+    TimeWindowSizeClass,
+    UsageAggregationClass,
+    UsageAggregationMetricsClass,
+    UserUsageCountsClass,
+    WindowDurationClass,
+)
-from datahub.metadata.schema_classes import DatasetUsageStatisticsClass

_TestTableRef = str
@@ -290,3 +304,69 @@ def test_make_usage_workunit_include_top_n_queries():
    du: DatasetUsageStatisticsClass = wu.get_metadata()["metadata"].aspect
    assert du.totalSqlQueries == 1
    assert du.topSqlQueries is None
+
+
+@freeze_time("2023-01-01 00:00:00")
+def test_convert_usage_aggregation_class():
+    urn = make_dataset_urn_with_platform_instance(
+        "platform", "test_db.test_schema.test_table", None
+    )
+    usage_aggregation = UsageAggregationClass(
+        bucket=int(time.time() * 1000),
+        duration=WindowDurationClass.DAY,
+        resource=urn,
+        metrics=UsageAggregationMetricsClass(
+            uniqueUserCount=5,
+            users=[
+                UserUsageCountsClass(count=3, user="abc", userEmail="abc@acryl.io"),
+                UserUsageCountsClass(count=2),
+                UserUsageCountsClass(count=1, user="def"),
+            ],
+            totalSqlQueries=10,
+            topSqlQueries=["SELECT * FROM my_table", "SELECT col from a.b.c"],
+            fields=[FieldUsageCountsClass("col", 7), FieldUsageCountsClass("col2", 0)],
+        ),
+    )
+    assert convert_usage_aggregation_class(
+        usage_aggregation
+    ) == MetadataChangeProposalWrapper(
+        entityUrn=urn,
+        aspect=DatasetUsageStatisticsClass(
+            timestampMillis=int(time.time() * 1000),
+            eventGranularity=TimeWindowSizeClass(unit=CalendarIntervalClass.DAY),
+            uniqueUserCount=5,
+            totalSqlQueries=10,
+            topSqlQueries=["SELECT * FROM my_table", "SELECT col from a.b.c"],
+            userCounts=[
+                DatasetUserUsageCountsClass(
+                    user="abc", count=3, userEmail="abc@acryl.io"
+                ),
+                DatasetUserUsageCountsClass(user="def", count=1),
+            ],
+            fieldCounts=[
+                DatasetFieldUsageCountsClass(fieldPath="col", count=7),
+                DatasetFieldUsageCountsClass(fieldPath="col2", count=0),
+            ],
+        ),
+    )
+
+    empty_urn = make_dataset_urn_with_platform_instance(
+        "platform",
+        "test_db.test_schema.empty_table",
+        None,
+    )
+    empty_usage_aggregation = UsageAggregationClass(
+        bucket=int(time.time() * 1000) - 1000 * 60 * 60 * 24,
+        duration=WindowDurationClass.MONTH,
+        resource=empty_urn,
+        metrics=UsageAggregationMetricsClass(),
+    )
+    assert convert_usage_aggregation_class(
+        empty_usage_aggregation
+    ) == MetadataChangeProposalWrapper(
+        entityUrn=empty_urn,
+        aspect=DatasetUsageStatisticsClass(
+            timestampMillis=int(time.time() * 1000) - 1000 * 60 * 60 * 24,
+            eventGranularity=TimeWindowSizeClass(unit=CalendarIntervalClass.MONTH),
+        ),
+    )
@@ -6,6 +6,7 @@ import com.linkedin.common.Urn
/**
 * Usage data for a given resource, rolled up into a bucket.
 */
+@deprecated = "Use DatasetUsageStatistics, or other UsageStatistics records, instead"
record UsageAggregation {
  /** Bucket start time in milliseconds */
  bucket: long
@@ -7,6 +7,9 @@
  "simple" : {
    "supports" : [ ],
    "actions" : [ {
+      "annotations" : {
+        "deprecated" : { }
+      },
      "name" : "batchIngest",
      "parameters" : [ {
        "name" : "buckets",
@@ -102,7 +102,8 @@
      } ]
    },
    "doc" : " Metrics associated with this bucket "
-  } ]
+  } ],
+  "deprecated" : "Use DatasetUsageStatistics, or other UsageStatistics records, instead"
}, "com.linkedin.usage.UsageAggregationMetrics", {
  "type" : "record",
  "name" : "UsageQueryResult",
@@ -163,6 +164,9 @@
  "simple" : {
    "supports" : [ ],
    "actions" : [ {
+      "annotations" : {
+        "deprecated" : { }
+      },
      "name" : "batchIngest",
      "parameters" : [ {
        "name" : "buckets",
@@ -110,6 +110,7 @@ public class UsageStats extends SimpleResourceTemplate<UsageAggregation> {
      _entityRegistry.getEntitySpec(USAGE_STATS_ENTITY_NAME).getAspectSpec(USAGE_STATS_ASPECT_NAME);

  @Action(name = ACTION_BATCH_INGEST)
+  @Deprecated
  @Nonnull
  @WithSpan
  public Task<Void> batchIngest(@ActionParam(PARAM_BUCKETS) @Nonnull UsageAggregation[] buckets) {