diff --git a/CLAUDE.MD b/CLAUDE.MD
index f110d74da8..5e30a59211 100644
--- a/CLAUDE.MD
+++ b/CLAUDE.MD
@@ -26,6 +26,7 @@ This file provides guidance to Claude Code (claude.ai/code) or any other agent w
 - `metadata-ingestion/build.gradle` contains our build.gradle that has gradle tasks defined for this module
 - `pyproject.toml`, `setup.py`, `setup.cfg` in the folder contain rules about the code style for the repository
 - The `.md` files at top level in this folder gives you important information about the concepts of ingestion
+- You can see examples of how to define various aspect types in `metadata-ingestion/src/datahub/emitter/mcp_builder.py`
 - Source code goes in `metadata-ingestion/src/`
 - Tests go in `metadata-ingestion/tests/` (not in `src/`)
 - **Testing conventions for metadata-ingestion**:
diff --git a/metadata-ingestion/src/datahub/ingestion/api/report.py b/metadata-ingestion/src/datahub/ingestion/api/report.py
index 8cfca5782b..f4f353be9a 100644
--- a/metadata-ingestion/src/datahub/ingestion/api/report.py
+++ b/metadata-ingestion/src/datahub/ingestion/api/report.py
@@ -2,17 +2,24 @@ import dataclasses
 import json
 import logging
 import pprint
-from dataclasses import dataclass
+from collections import defaultdict
+from dataclasses import dataclass, field
 from datetime import datetime, timedelta
 from enum import Enum
-from typing import Any, Optional, runtime_checkable
+from typing import Any, Dict, Optional, Set, cast, runtime_checkable
 
 import humanfriendly
 import pydantic
 from pydantic import BaseModel
 from typing_extensions import Literal, Protocol
 
+from datahub.emitter.mcp_builder import mcps_from_mce
+from datahub.ingestion.api.closeable import Closeable
 from datahub.ingestion.api.report_helpers import format_datetime_relative
+from datahub.ingestion.api.workunit import MetadataWorkUnit
+from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent
+from datahub.metadata.schema_classes import SubTypesClass, UpstreamLineageClass
+from datahub.utilities.file_backed_collections import FileBackedDict
 from datahub.utilities.lossy_collections import LossyList
 
 logger = logging.getLogger(__name__)
@@ -90,6 +97,14 @@ class Report(SupportsAsObj):
     # TODO add helper method for warning / failure status + counts?
 
 
+@dataclass
+class SourceReportSubtypes:
+    urn: str
+    entity_type: str
+    subType: str = field(default="unknown")
+    aspects: Set[str] = field(default_factory=set)
+
+
 class ReportAttribute(BaseModel):
     severity: LogLevel = "DEBUG"
     help: Optional[str] = None
@@ -108,6 +123,112 @@ class ReportAttribute(BaseModel):
         logger.log(level=self.logger_sev, msg=msg, stacklevel=3)
 
 
+@dataclass
+class ExamplesReport(Report, Closeable):
+    _urns_seen: Set[str] = field(default_factory=set)
+    entities: Dict[str, list] = field(default_factory=lambda: defaultdict(LossyList))
+    aspects: Dict[str, Dict[str, int]] = field(
+        default_factory=lambda: defaultdict(lambda: defaultdict(int))
+    )
+    aspects_by_subtypes: Dict[str, Dict[str, Dict[str, int]]] = field(
+        default_factory=lambda: defaultdict(
+            lambda: defaultdict(lambda: defaultdict(int))
+        )
+    )
+    aspect_urn_samples: Dict[str, Dict[str, LossyList[str]]] = field(
+        default_factory=lambda: defaultdict(lambda: defaultdict(LossyList))
+    )
+    _file_based_dict: Optional[FileBackedDict[SourceReportSubtypes]] = None
+
+    def __post_init__(self) -> None:
+        self._file_based_dict = FileBackedDict(
+            tablename="urn_aspects",
+            extra_columns={
+                "urn": lambda val: val.urn,
+                "entityType": lambda val: val.entity_type,
+                "subTypes": lambda val: val.subType,
+                "aspects": lambda val: json.dumps(sorted(list(val.aspects))),
+            },
+        )
+
+    def close(self) -> None:
+        self.compute_stats()
+        if self._file_based_dict is not None:
+            self._file_based_dict.close()
+            self._file_based_dict = None
+
+    def _store_workunit_data(self, wu: MetadataWorkUnit) -> None:
+        urn = wu.get_urn()
+
+        if not isinstance(wu.metadata, MetadataChangeEvent):
+            mcps = [wu.metadata]
+        else:
+            mcps = list(mcps_from_mce(wu.metadata))
+
+        for mcp in mcps:
+            entityType = mcp.entityType
+            aspectName = mcp.aspectName
+
+            if urn not in self._urns_seen:
+                self._urns_seen.add(urn)
+                self.entities[entityType].append(urn)
+
+            if aspectName is None:
+                continue
+            self.aspects[entityType][aspectName] += 1
+            self.aspect_urn_samples[entityType][aspectName].append(urn)
+            sub_type = "unknown"
+            if isinstance(mcp.aspect, UpstreamLineageClass):
+                upstream_lineage = cast(UpstreamLineageClass, mcp.aspect)
+                if upstream_lineage.fineGrainedLineages:
+                    self.aspect_urn_samples[entityType]["fineGrainedLineages"].append(
+                        urn
+                    )
+                    self.aspects[entityType]["fineGrainedLineages"] += 1
+            elif isinstance(mcp.aspect, SubTypesClass):
+                sub_type = mcp.aspect.typeNames[0]
+            assert self._file_based_dict is not None
+            if urn in self._file_based_dict:
+                if sub_type != "unknown":
+                    self._file_based_dict[urn].subType = sub_type
+                self._file_based_dict[urn].aspects.add(aspectName)
+                self._file_based_dict.mark_dirty(urn)
+            else:
+                self._file_based_dict[urn] = SourceReportSubtypes(
+                    urn=urn,
+                    entity_type=entityType,
+                    subType=sub_type,
+                    aspects={aspectName},
+                )
+
+    def compute_stats(self) -> None:
+        if self._file_based_dict is None:
+            return
+        query = """
+            SELECT entityType, subTypes, aspects, count(*) as count
+            FROM urn_aspects
+            group by entityType, subTypes, aspects
+        """
+
+        entity_subtype_aspect_counts: Dict[str, Dict[str, Dict[str, int]]] = (
+            defaultdict(lambda: defaultdict(lambda: defaultdict(int)))
+        )
+        for row in self._file_based_dict.sql_query(query):
+            entity_type = row["entityType"]
+            sub_type = row["subTypes"]
+            count = row["count"]
+            aspects_raw = row["aspects"] or "[]"
+
+            aspects = json.loads(aspects_raw)
+            for aspect in aspects:
+                entity_subtype_aspect_counts[entity_type][sub_type][aspect] += count
+
+        self.aspects_by_subtypes.clear()
+        for entity_type, subtype_counts in entity_subtype_aspect_counts.items():
+            for sub_type, aspect_counts in subtype_counts.items():
+                self.aspects_by_subtypes[entity_type][sub_type] = dict(aspect_counts)
+
+
 class EntityFilterReport(ReportAttribute):
     type: str
 
diff --git a/metadata-ingestion/src/datahub/ingestion/api/source.py b/metadata-ingestion/src/datahub/ingestion/api/source.py
index 705538c4de..f986493caf 100644
--- a/metadata-ingestion/src/datahub/ingestion/api/source.py
+++ b/metadata-ingestion/src/datahub/ingestion/api/source.py
@@ -2,7 +2,6 @@ import contextlib
 import datetime
 import logging
 from abc import ABCMeta, abstractmethod
-from collections import defaultdict
 from dataclasses import dataclass, field
 from enum import Enum
 from functools import partial
@@ -15,7 +14,6 @@ from typing import (
     List,
     Optional,
     Sequence,
-    Set,
     Type,
     TypeVar,
     Union,
@@ -28,7 +26,6 @@ from typing_extensions import LiteralString, Self
 from datahub.configuration.common import ConfigModel
 from datahub.configuration.source_common import PlatformInstanceConfigMixin
 from datahub.emitter.mcp import MetadataChangeProposalWrapper
-from datahub.emitter.mcp_builder import mcps_from_mce
 from datahub.ingestion.api.auto_work_units.auto_dataset_properties_aspect import (
     auto_patch_last_modified,
 )
@@ -37,7 +34,7 @@ from datahub.ingestion.api.auto_work_units.auto_ensure_aspect_size import (
 )
 from datahub.ingestion.api.closeable import Closeable
 from datahub.ingestion.api.common import PipelineContext, RecordEnvelope, WorkUnit
-from datahub.ingestion.api.report import Report
+from datahub.ingestion.api.report import ExamplesReport, Report
 from datahub.ingestion.api.source_helpers import (
     AutoSystemMetadata,
     auto_browse_path_v2,
@@ -50,9 +47,8 @@ from datahub.ingestion.api.source_helpers import (
     auto_workunit_reporter,
 )
 from datahub.ingestion.api.workunit import MetadataWorkUnit
-from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent
-from datahub.metadata.schema_classes import UpstreamLineageClass
 from datahub.sdk.entity import Entity
+from datahub.telemetry import stats
 from datahub.utilities.lossy_collections import LossyDict, LossyList
 from datahub.utilities.type_annotations import get_class_from_annotation
 
@@ -191,20 +187,11 @@ class StructuredLogs(Report):
 
 
 @dataclass
-class SourceReport(Report):
+class SourceReport(ExamplesReport):
     event_not_produced_warn: bool = True
     events_produced: int = 0
     events_produced_per_sec: int = 0
 
-    _urns_seen: Set[str] = field(default_factory=set)
-    entities: Dict[str, list] = field(default_factory=lambda: defaultdict(LossyList))
-    aspects: Dict[str, Dict[str, int]] = field(
-        default_factory=lambda: defaultdict(lambda: defaultdict(int))
-    )
-    aspect_urn_samples: Dict[str, Dict[str, LossyList[str]]] = field(
-        default_factory=lambda: defaultdict(lambda: defaultdict(LossyList))
-    )
-
     _structured_logs: StructuredLogs = field(default_factory=StructuredLogs)
 
     @property
@@ -221,34 +208,10 @@ class SourceReport(Report):
 
     def report_workunit(self, wu: WorkUnit) -> None:
         self.events_produced += 1
+        if not isinstance(wu, MetadataWorkUnit):
+            return
-
-        if isinstance(wu, MetadataWorkUnit):
-            urn = wu.get_urn()
-
-            # Specialized entity reporting.
-            if not isinstance(wu.metadata, MetadataChangeEvent):
-                mcps = [wu.metadata]
-            else:
-                mcps = list(mcps_from_mce(wu.metadata))
-
-            for mcp in mcps:
-                entityType = mcp.entityType
-                aspectName = mcp.aspectName
-
-                if urn not in self._urns_seen:
-                    self._urns_seen.add(urn)
-                    self.entities[entityType].append(urn)
-
-                if aspectName is not None:  # usually true
-                    self.aspects[entityType][aspectName] += 1
-                    self.aspect_urn_samples[entityType][aspectName].append(urn)
-                    if isinstance(mcp.aspect, UpstreamLineageClass):
-                        upstream_lineage = cast(UpstreamLineageClass, mcp.aspect)
-                        if upstream_lineage.fineGrainedLineages:
-                            self.aspect_urn_samples[entityType][
-                                "fineGrainedLineages"
-                            ].append(urn)
-                            self.aspects[entityType]["fineGrainedLineages"] += 1
+        super()._store_workunit_data(wu)
 
     def report_warning(
         self,
@@ -327,6 +290,7 @@ class SourceReport(Report):
     )
 
     def __post_init__(self) -> None:
+        super().__post_init__()
         self.start_time = datetime.datetime.now()
         self.running_time: datetime.timedelta = datetime.timedelta(seconds=0)
 
@@ -339,11 +303,41 @@ class SourceReport(Report):
             "infos": Report.to_pure_python_obj(self.infos),
         }
 
+    @staticmethod
+    def _discretize_dict_values(
+        nested_dict: Dict[str, Dict[str, int]],
+    ) -> Dict[str, Dict[str, int]]:
+        """Helper method to discretize values in a nested dictionary structure."""
+        result = {}
+        for outer_key, inner_dict in nested_dict.items():
+            discretized_dict: Dict[str, int] = {}
+            for inner_key, count in inner_dict.items():
+                discretized_dict[inner_key] = stats.discretize(count)
+            result[outer_key] = discretized_dict
+        return result
+
     def get_aspects_dict(self) -> Dict[str, Dict[str, int]]:
         """Convert the nested defaultdict aspects to a regular dict for serialization."""
+        return self._discretize_dict_values(self.aspects)
+
+    def get_aspects_by_subtypes_dict(self) -> Dict[str, Dict[str, Dict[str, int]]]:
+        """Get aspect counts grouped by entity type and subtype."""
+        return self._discretize_dict_values_nested(self.aspects_by_subtypes)
+
+    @staticmethod
+    def _discretize_dict_values_nested(
+        nested_dict: Dict[str, Dict[str, Dict[str, int]]],
+    ) -> Dict[str, Dict[str, Dict[str, int]]]:
+        """Helper method to discretize values in a nested dictionary structure with three levels."""
         result = {}
-        for entity_type, aspect_counts in self.aspects.items():
-            result[entity_type] = dict(aspect_counts)
+        for outer_key, middle_dict in nested_dict.items():
+            discretized_middle_dict: Dict[str, Dict[str, int]] = {}
+            for middle_key, inner_dict in middle_dict.items():
+                discretized_inner_dict: Dict[str, int] = {}
+                for inner_key, count in inner_dict.items():
+                    discretized_inner_dict[inner_key] = stats.discretize(count)
+                discretized_middle_dict[middle_key] = discretized_inner_dict
+            result[outer_key] = discretized_middle_dict
         return result
 
     def compute_stats(self) -> None:
@@ -512,7 +506,7 @@ class Source(Closeable, metaclass=ABCMeta):
         pass
 
     def close(self) -> None:
-        pass
+        self.get_report().close()
 
     def _infer_platform(self) -> Optional[str]:
         config = self.get_config()
diff --git a/metadata-ingestion/src/datahub/ingestion/run/pipeline.py b/metadata-ingestion/src/datahub/ingestion/run/pipeline.py
index dd5c418303..3310c6703e 100644
--- a/metadata-ingestion/src/datahub/ingestion/run/pipeline.py
+++ b/metadata-ingestion/src/datahub/ingestion/run/pipeline.py
@@ -579,16 +579,16 @@ class Pipeline:
         sink_warnings = len(self.sink.get_report().warnings)
         global_warnings = len(get_global_warnings())
         source_aspects = self.source.get_report().get_aspects_dict()
-        entity_dict: Dict[str, int]
-        for _, entity_dict in source_aspects.items():
-            for aspect_name, aspect_count in entity_dict.items():
-                entity_dict[aspect_name] = stats.discretize(aspect_count)
+        source_aspects_by_subtype = (
+            self.source.get_report().get_aspects_by_subtypes_dict()
+        )
 
         telemetry_instance.ping(
             "ingest_stats",
             {
                 "source_type": self.source_type,
                 "source_aspects": source_aspects,
+                "source_aspects_by_subtype": source_aspects_by_subtype,
                 "sink_type": self.sink_type,
                 "transformer_types": [
                     transformer.type for transformer in self.config.transformers or []
diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_queries.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_queries.py
index 779036aab9..ab540822b6 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_queries.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_queries.py
@@ -94,3 +94,4 @@ class BigQueryQueriesSource(Source):
     def close(self) -> None:
         self.queries_extractor.close()
         self.connection.close()
+        super().close()
diff --git a/metadata-ingestion/src/datahub/ingestion/source/mock_data/datahub_mock_data.py b/metadata-ingestion/src/datahub/ingestion/source/mock_data/datahub_mock_data.py
index 8ba4e8a1c3..8ff61ed866 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/mock_data/datahub_mock_data.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/mock_data/datahub_mock_data.py
@@ -266,11 +266,8 @@ class DataHubMockDataSource(Source):
             fan_out, hops, fan_out_after_first
         )
 
-        logger.info(
-            f"About to create {tables_to_be_created} tables for lineage testing"
-        )
+        logger.info(f"About to create {tables_to_be_created} mock datasets")
 
-        current_progress = 0
         for i in range(hops + 1):
             tables_at_level = tables_at_levels[i]
 
@@ -291,12 +288,6 @@ class DataHubMockDataSource(Source):
                 tables_at_levels=tables_at_levels,
             )
 
-            current_progress += 1
-            if current_progress % 1000 == 0:
-                logger.info(
-                    f"Progress: {current_progress}/{tables_to_be_created} tables processed"
-                )
-
     def _generate_lineage_for_table(
         self,
         table_name: str,
diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_queries.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_queries.py
index 5dd8793ce3..52d3bd3dab 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_queries.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_queries.py
@@ -663,6 +663,7 @@ class SnowflakeQueriesSource(Source):
     def close(self) -> None:
         self.connection.close()
         self.queries_extractor.close()
+        super().close()
 
 
 # Make sure we don't try to generate too much info for a single query.
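Editor's note on the close() changes above (illustration only, not part of the patch): Source.close() now calls self.get_report().close(), which flushes the report's FileBackedDict and computes the aspects_by_subtypes rollup. A source that overrides close() therefore has to chain to super().close(), as the BigQuery and Snowflake query sources now do. A minimal sketch of that pattern, using a hypothetical MyQueriesSource:

    from datahub.ingestion.api.source import Source

    class MyQueriesSource(Source):  # hypothetical example; abstract methods omitted
        def close(self) -> None:
            # Release source-specific resources first ...
            # self.connection.close()
            # ... then let Source.close() close the report and its FileBackedDict.
            super().close()
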
diff --git a/metadata-ingestion/tests/integration/bigquery_v2/test_bigquery_queries.py b/metadata-ingestion/tests/integration/bigquery_v2/test_bigquery_queries.py
index bbd8956b47..179ace6c44 100644
--- a/metadata-ingestion/tests/integration/bigquery_v2/test_bigquery_queries.py
+++ b/metadata-ingestion/tests/integration/bigquery_v2/test_bigquery_queries.py
@@ -107,4 +107,6 @@ def test_source_close_cleans_tmp(projects_client, client, tmp_path):
         assert len(os.listdir(tmp_path)) > 0
         # This closes QueriesExtractor which in turn closes SqlParsingAggregator
         source.close()
-        assert len(os.listdir(tmp_path)) == 0
+        assert len(os.listdir(tmp_path)) == 0, (
+            f"Files left in {tmp_path}: {os.listdir(tmp_path)}"
+        )
diff --git a/metadata-ingestion/tests/unit/test_source.py b/metadata-ingestion/tests/unit/test_source.py
index d2ed21fccb..21b3a095d9 100644
--- a/metadata-ingestion/tests/unit/test_source.py
+++ b/metadata-ingestion/tests/unit/test_source.py
@@ -4,23 +4,30 @@ from datahub.emitter.mcp import MetadataChangeProposalWrapper
 from datahub.ingestion.api.common import PipelineContext
 from datahub.ingestion.api.source import Source, SourceReport
 from datahub.ingestion.api.workunit import MetadataWorkUnit
-from datahub.metadata.schema_classes import StatusClass
+from datahub.metadata.schema_classes import (
+    StatusClass,
+    SubTypesClass,
+)
 from datahub.utilities.urns.dataset_urn import DatasetUrn
 
 
+def _get_urn() -> str:
+    return str(
+        DatasetUrn.create_from_ids(
+            platform_id="elasticsearch",
+            table_name="fooIndex",
+            env="PROD",
+        )
+    )
+
+
 class FakeSource(Source):
     def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
         return [
             MetadataWorkUnit(
                 id="test-workunit",
                 mcp=MetadataChangeProposalWrapper(
-                    entityUrn=str(
-                        DatasetUrn.create_from_ids(
-                            platform_id="elasticsearch",
-                            table_name="fooIndex",
-                            env="PROD",
-                        )
-                    ),
+                    entityUrn=_get_urn(),
                     aspect=StatusClass(removed=False),
                 ),
             )
@@ -39,3 +46,56 @@ class FakeSource(Source):
 
     def close(self) -> None:
         return super().close()
+
+
+def test_aspects_by_subtypes():
+    source = FakeSource(PipelineContext(run_id="test_aspects_by_subtypes"))
+    for wu in source.get_workunits_internal():
+        source.source_report.report_workunit(wu)
+
+    source.source_report.compute_stats()
+    assert source.source_report.get_aspects_by_subtypes_dict() == {
+        "dataset": {
+            "unknown": {"status": 1},
+        }
+    }
+    source.source_report.report_workunit(
+        MetadataChangeProposalWrapper(
+            entityUrn=_get_urn(),
+            aspect=SubTypesClass(typeNames=["Table"]),
+        ).as_workunit()
+    )
+    source.source_report.compute_stats()
+    assert source.source_report.get_aspects_by_subtypes_dict() == {
+        "dataset": {
+            "Table": {"status": 1, "subTypes": 1},
+        }
+    }
+
+
+def test_discretize_dict_values():
+    """Test the _discretize_dict_values static method."""
+    test_dict = {
+        "dataset": {
+            "schemaMetadata": 5,
+            "status": 12,
+            "ownership": 3,
+        },
+        "chart": {
+            "status": 8,
+            "ownership": 1,
+        },
+    }
+
+    result = SourceReport._discretize_dict_values(test_dict)
+    assert result == {
+        "dataset": {
+            "schemaMetadata": 4,
+            "status": 8,
+            "ownership": 2,
+        },
+        "chart": {
+            "status": 8,
+            "ownership": 1,
+        },
+    }
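
Editor's note (illustration only, not part of the patch): the new "source_aspects_by_subtype" telemetry field is a three-level mapping of entity type -> subtype -> aspect name -> discretized count, built by get_aspects_by_subtypes_dict(). Judging from the expectations in test_discretize_dict_values above, stats.discretize rounds each count down to a power of two, so a payload would look roughly like:

    # Hypothetical payload shape; counts are discretized buckets, not raw totals.
    source_aspects_by_subtype = {
        "dataset": {
            "Table": {"status": 8, "subTypes": 8, "schemaMetadata": 4},
            "unknown": {"status": 1},
        },
    }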