feat(ingest): add aspects by subtype in report, telemetry (#13921)

Aseem Bansal 2025-07-03 17:07:39 +05:30 committed by GitHub
parent 1561a6c8ca
commit a7c5895d98
9 changed files with 242 additions and 71 deletions

View File

@ -26,6 +26,7 @@ This file provides guidance to Claude Code (claude.ai/code) or any other agent w
- `metadata-ingestion/build.gradle` defines the Gradle tasks for this module
- `pyproject.toml`, `setup.py`, `setup.cfg` in the folder contain rules about the code style for the repository
- The `.md` files at the top level of this folder give you important information about ingestion concepts
- You can see examples of how to define various aspect types in `metadata-ingestion/src/datahub/emitter/mcp_builder.py` (a minimal illustrative sketch follows this list)
- Source code goes in `metadata-ingestion/src/`
- Tests go in `metadata-ingestion/tests/` (not in `src/`)
- **Testing conventions for metadata-ingestion**:
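
A minimal sketch of the aspect-emission pattern referenced above, using only APIs that appear elsewhere in this commit; the URN and values are invented for illustration. A work unit is typically a `MetadataChangeProposalWrapper` carrying a single aspect:

from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.metadata.schema_classes import StatusClass, SubTypesClass

# Hypothetical dataset URN, used only for this example.
urn = "urn:li:dataset:(urn:li:dataPlatform:hive,db.my_table,PROD)"

# Each MCP carries one aspect for one entity; as_workunit() wraps it so a
# source can yield it from get_workunits_internal().
status_wu = MetadataChangeProposalWrapper(
    entityUrn=urn,
    aspect=StatusClass(removed=False),
).as_workunit()
subtype_wu = MetadataChangeProposalWrapper(
    entityUrn=urn,
    aspect=SubTypesClass(typeNames=["Table"]),
).as_workunit()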

View File

@ -2,17 +2,24 @@ import dataclasses
import json
import logging
import pprint
from dataclasses import dataclass
from collections import defaultdict
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum
from typing import Any, Optional, runtime_checkable
from typing import Any, Dict, Optional, Set, cast, runtime_checkable
import humanfriendly
import pydantic
from pydantic import BaseModel
from typing_extensions import Literal, Protocol
from datahub.emitter.mcp_builder import mcps_from_mce
from datahub.ingestion.api.closeable import Closeable
from datahub.ingestion.api.report_helpers import format_datetime_relative
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent
from datahub.metadata.schema_classes import SubTypesClass, UpstreamLineageClass
from datahub.utilities.file_backed_collections import FileBackedDict
from datahub.utilities.lossy_collections import LossyList
logger = logging.getLogger(__name__)
@ -90,6 +97,14 @@ class Report(SupportsAsObj):
# TODO add helper method for warning / failure status + counts?
@dataclass
class SourceReportSubtypes:
urn: str
entity_type: str
subType: str = field(default="unknown")
aspects: Set[str] = field(default_factory=set)
class ReportAttribute(BaseModel):
severity: LogLevel = "DEBUG"
help: Optional[str] = None
@ -108,6 +123,112 @@ class ReportAttribute(BaseModel):
logger.log(level=self.logger_sev, msg=msg, stacklevel=3)
@dataclass
class ExamplesReport(Report, Closeable):
_urns_seen: Set[str] = field(default_factory=set)
entities: Dict[str, list] = field(default_factory=lambda: defaultdict(LossyList))
aspects: Dict[str, Dict[str, int]] = field(
default_factory=lambda: defaultdict(lambda: defaultdict(int))
)
aspects_by_subtypes: Dict[str, Dict[str, Dict[str, int]]] = field(
default_factory=lambda: defaultdict(
lambda: defaultdict(lambda: defaultdict(int))
)
)
aspect_urn_samples: Dict[str, Dict[str, LossyList[str]]] = field(
default_factory=lambda: defaultdict(lambda: defaultdict(LossyList))
)
_file_based_dict: Optional[FileBackedDict[SourceReportSubtypes]] = None
def __post_init__(self) -> None:
self._file_based_dict = FileBackedDict(
tablename="urn_aspects",
extra_columns={
"urn": lambda val: val.urn,
"entityType": lambda val: val.entity_type,
"subTypes": lambda val: val.subType,
"aspects": lambda val: json.dumps(sorted(list(val.aspects))),
},
)
def close(self) -> None:
self.compute_stats()
if self._file_based_dict is not None:
self._file_based_dict.close()
self._file_based_dict = None
def _store_workunit_data(self, wu: MetadataWorkUnit) -> None:
urn = wu.get_urn()
if not isinstance(wu.metadata, MetadataChangeEvent):
mcps = [wu.metadata]
else:
mcps = list(mcps_from_mce(wu.metadata))
for mcp in mcps:
entityType = mcp.entityType
aspectName = mcp.aspectName
if urn not in self._urns_seen:
self._urns_seen.add(urn)
self.entities[entityType].append(urn)
if aspectName is None:
continue
self.aspects[entityType][aspectName] += 1
self.aspect_urn_samples[entityType][aspectName].append(urn)
sub_type = "unknown"
if isinstance(mcp.aspect, UpstreamLineageClass):
upstream_lineage = cast(UpstreamLineageClass, mcp.aspect)
if upstream_lineage.fineGrainedLineages:
self.aspect_urn_samples[entityType]["fineGrainedLineages"].append(
urn
)
self.aspects[entityType]["fineGrainedLineages"] += 1
elif isinstance(mcp.aspect, SubTypesClass):
sub_type = mcp.aspect.typeNames[0]
assert self._file_based_dict is not None
if urn in self._file_based_dict:
if sub_type != "unknown":
self._file_based_dict[urn].subType = sub_type
self._file_based_dict[urn].aspects.add(aspectName)
self._file_based_dict.mark_dirty(urn)
else:
self._file_based_dict[urn] = SourceReportSubtypes(
urn=urn,
entity_type=entityType,
subType=sub_type,
aspects={aspectName},
)
def compute_stats(self) -> None:
if self._file_based_dict is None:
return
query = """
SELECT entityType, subTypes, aspects, count(*) as count
FROM urn_aspects
group by entityType, subTypes, aspects
"""
entity_subtype_aspect_counts: Dict[str, Dict[str, Dict[str, int]]] = (
defaultdict(lambda: defaultdict(lambda: defaultdict(int)))
)
for row in self._file_based_dict.sql_query(query):
entity_type = row["entityType"]
sub_type = row["subTypes"]
count = row["count"]
aspects_raw = row["aspects"] or "[]"
aspects = json.loads(aspects_raw)
for aspect in aspects:
entity_subtype_aspect_counts[entity_type][sub_type][aspect] += count
self.aspects_by_subtypes.clear()
for entity_type, subtype_counts in entity_subtype_aspect_counts.items():
for sub_type, aspect_counts in subtype_counts.items():
self.aspects_by_subtypes[entity_type][sub_type] = dict(aspect_counts)
class EntityFilterReport(ReportAttribute):
type: str
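
For orientation: `aspects_by_subtypes` is a three-level mapping of entity type -> subtype -> aspect name -> count, which `compute_stats` rebuilds from the per-URN rows persisted in the `FileBackedDict`. A small, self-contained sketch of how that nested structure accumulates (the entries are invented for illustration):

from collections import defaultdict

# entity type -> subtype -> aspect name -> count
aspects_by_subtypes = defaultdict(lambda: defaultdict(lambda: defaultdict(int)))

aspects_by_subtypes["dataset"]["Table"]["status"] += 1
aspects_by_subtypes["dataset"]["Table"]["subTypes"] += 1
aspects_by_subtypes["dataset"]["unknown"]["status"] += 1  # URN that never emitted a SubTypes aspect

# Roughly:
# {"dataset": {"Table": {"status": 1, "subTypes": 1}, "unknown": {"status": 1}}}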

View File

@ -2,7 +2,6 @@ import contextlib
import datetime
import logging
from abc import ABCMeta, abstractmethod
from collections import defaultdict
from dataclasses import dataclass, field
from enum import Enum
from functools import partial
@ -15,7 +14,6 @@ from typing import (
List,
Optional,
Sequence,
Set,
Type,
TypeVar,
Union,
@ -28,7 +26,6 @@ from typing_extensions import LiteralString, Self
from datahub.configuration.common import ConfigModel
from datahub.configuration.source_common import PlatformInstanceConfigMixin
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.mcp_builder import mcps_from_mce
from datahub.ingestion.api.auto_work_units.auto_dataset_properties_aspect import (
auto_patch_last_modified,
)
@ -37,7 +34,7 @@ from datahub.ingestion.api.auto_work_units.auto_ensure_aspect_size import (
)
from datahub.ingestion.api.closeable import Closeable
from datahub.ingestion.api.common import PipelineContext, RecordEnvelope, WorkUnit
from datahub.ingestion.api.report import Report
from datahub.ingestion.api.report import ExamplesReport, Report
from datahub.ingestion.api.source_helpers import (
AutoSystemMetadata,
auto_browse_path_v2,
@ -50,9 +47,8 @@ from datahub.ingestion.api.source_helpers import (
auto_workunit_reporter,
)
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent
from datahub.metadata.schema_classes import UpstreamLineageClass
from datahub.sdk.entity import Entity
from datahub.telemetry import stats
from datahub.utilities.lossy_collections import LossyDict, LossyList
from datahub.utilities.type_annotations import get_class_from_annotation
@ -191,20 +187,11 @@ class StructuredLogs(Report):
@dataclass
class SourceReport(Report):
class SourceReport(ExamplesReport):
event_not_produced_warn: bool = True
events_produced: int = 0
events_produced_per_sec: int = 0
_urns_seen: Set[str] = field(default_factory=set)
entities: Dict[str, list] = field(default_factory=lambda: defaultdict(LossyList))
aspects: Dict[str, Dict[str, int]] = field(
default_factory=lambda: defaultdict(lambda: defaultdict(int))
)
aspect_urn_samples: Dict[str, Dict[str, LossyList[str]]] = field(
default_factory=lambda: defaultdict(lambda: defaultdict(LossyList))
)
_structured_logs: StructuredLogs = field(default_factory=StructuredLogs)
@property
@ -221,34 +208,10 @@ class SourceReport(Report):
def report_workunit(self, wu: WorkUnit) -> None:
self.events_produced += 1
if not isinstance(wu, MetadataWorkUnit):
return
if isinstance(wu, MetadataWorkUnit):
urn = wu.get_urn()
# Specialized entity reporting.
if not isinstance(wu.metadata, MetadataChangeEvent):
mcps = [wu.metadata]
else:
mcps = list(mcps_from_mce(wu.metadata))
for mcp in mcps:
entityType = mcp.entityType
aspectName = mcp.aspectName
if urn not in self._urns_seen:
self._urns_seen.add(urn)
self.entities[entityType].append(urn)
if aspectName is not None: # usually true
self.aspects[entityType][aspectName] += 1
self.aspect_urn_samples[entityType][aspectName].append(urn)
if isinstance(mcp.aspect, UpstreamLineageClass):
upstream_lineage = cast(UpstreamLineageClass, mcp.aspect)
if upstream_lineage.fineGrainedLineages:
self.aspect_urn_samples[entityType][
"fineGrainedLineages"
].append(urn)
self.aspects[entityType]["fineGrainedLineages"] += 1
super()._store_workunit_data(wu)
def report_warning(
self,
@ -327,6 +290,7 @@ class SourceReport(Report):
)
def __post_init__(self) -> None:
super().__post_init__()
self.start_time = datetime.datetime.now()
self.running_time: datetime.timedelta = datetime.timedelta(seconds=0)
@ -339,11 +303,41 @@ class SourceReport(Report):
"infos": Report.to_pure_python_obj(self.infos),
}
@staticmethod
def _discretize_dict_values(
nested_dict: Dict[str, Dict[str, int]],
) -> Dict[str, Dict[str, int]]:
"""Helper method to discretize values in a nested dictionary structure."""
result = {}
for outer_key, inner_dict in nested_dict.items():
discretized_dict: Dict[str, int] = {}
for inner_key, count in inner_dict.items():
discretized_dict[inner_key] = stats.discretize(count)
result[outer_key] = discretized_dict
return result
def get_aspects_dict(self) -> Dict[str, Dict[str, int]]:
"""Convert the nested defaultdict aspects to a regular dict for serialization."""
return self._discretize_dict_values(self.aspects)
def get_aspects_by_subtypes_dict(self) -> Dict[str, Dict[str, Dict[str, int]]]:
"""Get aspect counts grouped by entity type and subtype."""
return self._discretize_dict_values_nested(self.aspects_by_subtypes)
@staticmethod
def _discretize_dict_values_nested(
nested_dict: Dict[str, Dict[str, Dict[str, int]]],
) -> Dict[str, Dict[str, Dict[str, int]]]:
"""Helper method to discretize values in a nested dictionary structure with three levels."""
result = {}
for entity_type, aspect_counts in self.aspects.items():
result[entity_type] = dict(aspect_counts)
for outer_key, middle_dict in nested_dict.items():
discretized_middle_dict: Dict[str, Dict[str, int]] = {}
for middle_key, inner_dict in middle_dict.items():
discretized_inner_dict: Dict[str, int] = {}
for inner_key, count in inner_dict.items():
discretized_inner_dict[inner_key] = stats.discretize(count)
discretized_middle_dict[middle_key] = discretized_inner_dict
result[outer_key] = discretized_middle_dict
return result
def compute_stats(self) -> None:
@ -512,7 +506,7 @@ class Source(Closeable, metaclass=ABCMeta):
pass
def close(self) -> None:
pass
self.get_report().close()
def _infer_platform(self) -> Optional[str]:
config = self.get_config()
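
The two discretization helpers above funnel every count through `stats.discretize` before it is reported; per the unit test at the end of this commit, that bucketing maps 5 -> 4, 12 -> 8, 3 -> 2, 8 -> 8, 1 -> 1. A hand-rolled sketch of what `_discretize_dict_values` does, with invented dictionary contents:

from datahub.telemetry import stats

raw = {"dataset": {"schemaMetadata": 5, "status": 12, "ownership": 3}}
coarse = {
    entity_type: {
        aspect: stats.discretize(count) for aspect, count in counts.items()
    }
    for entity_type, counts in raw.items()
}
# Expected per the test below: {"dataset": {"schemaMetadata": 4, "status": 8, "ownership": 2}}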

View File

@ -579,16 +579,16 @@ class Pipeline:
sink_warnings = len(self.sink.get_report().warnings)
global_warnings = len(get_global_warnings())
source_aspects = self.source.get_report().get_aspects_dict()
entity_dict: Dict[str, int]
for _, entity_dict in source_aspects.items():
for aspect_name, aspect_count in entity_dict.items():
entity_dict[aspect_name] = stats.discretize(aspect_count)
source_aspects_by_subtype = (
self.source.get_report().get_aspects_by_subtypes_dict()
)
telemetry_instance.ping(
"ingest_stats",
{
"source_type": self.source_type,
"source_aspects": source_aspects,
"source_aspects_by_subtype": source_aspects_by_subtype,
"sink_type": self.sink_type,
"transformer_types": [
transformer.type for transformer in self.config.transformers or []

View File

@ -94,3 +94,4 @@ class BigQueryQueriesSource(Source):
def close(self) -> None:
self.queries_extractor.close()
self.connection.close()
super().close()

View File

@ -266,11 +266,8 @@ class DataHubMockDataSource(Source):
fan_out, hops, fan_out_after_first
)
logger.info(
f"About to create {tables_to_be_created} tables for lineage testing"
)
logger.info(f"About to create {tables_to_be_created} mock datasets")
current_progress = 0
for i in range(hops + 1):
tables_at_level = tables_at_levels[i]
@ -291,12 +288,6 @@ class DataHubMockDataSource(Source):
tables_at_levels=tables_at_levels,
)
current_progress += 1
if current_progress % 1000 == 0:
logger.info(
f"Progress: {current_progress}/{tables_to_be_created} tables processed"
)
def _generate_lineage_for_table(
self,
table_name: str,

View File

@ -663,6 +663,7 @@ class SnowflakeQueriesSource(Source):
def close(self) -> None:
self.connection.close()
self.queries_extractor.close()
super().close()
# Make sure we don't try to generate too much info for a single query.

View File

@ -107,4 +107,6 @@ def test_source_close_cleans_tmp(projects_client, client, tmp_path):
assert len(os.listdir(tmp_path)) > 0
# This closes QueriesExtractor which in turn closes SqlParsingAggregator
source.close()
assert len(os.listdir(tmp_path)) == 0
assert len(os.listdir(tmp_path)) == 0, (
f"Files left in {tmp_path}: {os.listdir(tmp_path)}"
)

View File

@ -4,23 +4,30 @@ from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.source import Source, SourceReport
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.metadata.schema_classes import StatusClass
from datahub.metadata.schema_classes import (
StatusClass,
SubTypesClass,
)
from datahub.utilities.urns.dataset_urn import DatasetUrn
def _get_urn() -> str:
return str(
DatasetUrn.create_from_ids(
platform_id="elasticsearch",
table_name="fooIndex",
env="PROD",
)
)
class FakeSource(Source):
def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
return [
MetadataWorkUnit(
id="test-workunit",
mcp=MetadataChangeProposalWrapper(
entityUrn=str(
DatasetUrn.create_from_ids(
platform_id="elasticsearch",
table_name="fooIndex",
env="PROD",
)
),
entityUrn=_get_urn(),
aspect=StatusClass(removed=False),
),
)
@ -39,3 +46,56 @@ class FakeSource(Source):
def close(self) -> None:
return super().close()
def test_aspects_by_subtypes():
source = FakeSource(PipelineContext(run_id="test_aspects_by_subtypes"))
for wu in source.get_workunits_internal():
source.source_report.report_workunit(wu)
source.source_report.compute_stats()
assert source.source_report.get_aspects_by_subtypes_dict() == {
"dataset": {
"unknown": {"status": 1},
}
}
source.source_report.report_workunit(
MetadataChangeProposalWrapper(
entityUrn=_get_urn(),
aspect=SubTypesClass(typeNames=["Table"]),
).as_workunit()
)
source.source_report.compute_stats()
assert source.source_report.get_aspects_by_subtypes_dict() == {
"dataset": {
"Table": {"status": 1, "subTypes": 1},
}
}
def test_discretize_dict_values():
"""Test the _discretize_dict_values static method."""
test_dict = {
"dataset": {
"schemaMetadata": 5,
"status": 12,
"ownership": 3,
},
"chart": {
"status": 8,
"ownership": 1,
},
}
result = SourceReport._discretize_dict_values(test_dict)
assert result == {
"dataset": {
"schemaMetadata": 4,
"status": 8,
"ownership": 2,
},
"chart": {
"status": 8,
"ownership": 1,
},
}