From 1736edf8f5d8dd36ed005e19a2ded80b645099a7 Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Wed, 28 Feb 2024 06:06:33 -0800 Subject: [PATCH] feat(ingest): fix bugs in SqlParsingAggregator (#9926) --- .../datahub/sql_parsing/schema_resolver.py | 2 +- .../sql_parsing/sql_parsing_aggregator.py | 19 +++++++++----- .../test_known_lineage_mapping.json | 9 +++---- .../aggregator_goldens/test_temp_table.json | 3 +-- .../unit/sql_parsing/test_schemaresolver.py | 25 +++++++++++++++++++ 5 files changed, 43 insertions(+), 15 deletions(-) diff --git a/metadata-ingestion/src/datahub/sql_parsing/schema_resolver.py b/metadata-ingestion/src/datahub/sql_parsing/schema_resolver.py index ce43fb5da5..ec52e83921 100644 --- a/metadata-ingestion/src/datahub/sql_parsing/schema_resolver.py +++ b/metadata-ingestion/src/datahub/sql_parsing/schema_resolver.py @@ -80,7 +80,7 @@ class SchemaResolver(Closeable, SchemaResolverInterface): def schema_count(self) -> int: return int( self._schema_cache.sql_query( - f"SELECT COUNT(*) FROM {self._schema_cache.tablename} WHERE is_missing" + f"SELECT COUNT(*) FROM {self._schema_cache.tablename} WHERE NOT is_missing" )[0][0] ) diff --git a/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py b/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py index cecaef33ef..49b58ddd22 100644 --- a/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py +++ b/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py @@ -115,6 +115,7 @@ class SqlAggregatorReport(Report): _aggregator: "SqlParsingAggregator" query_log_path: Optional[str] = None + # Observed queries. num_observed_queries: int = 0 num_observed_queries_failed: int = 0 num_observed_queries_column_timeout: int = 0 @@ -123,6 +124,7 @@ class SqlAggregatorReport(Report): default_factory=LossyList ) + # Views. num_view_definitions: int = 0 num_views_failed: int = 0 num_views_column_timeout: int = 0 @@ -131,28 +133,30 @@ class SqlAggregatorReport(Report): default_factory=LossyDict ) + # Other lineage loading metrics. num_known_query_lineage: int = 0 num_known_mapping_lineage: int = 0 num_table_renames: int = 0 - num_queries_with_temp_tables_in_session: int = 0 - - num_unique_query_fingerprints: Optional[int] = None - - # Lineage-related. - num_urns_with_lineage: Optional[int] = None + # Temp tables. num_temp_sessions: Optional[int] = None num_inferred_temp_schemas: Optional[int] = None + num_queries_with_temp_tables_in_session: int = 0 queries_with_temp_upstreams: LossyDict[QueryId, LossyList] = dataclasses.field( default_factory=LossyDict ) + # Lineage-related. + schema_resolver_count: Optional[int] = None + num_unique_query_fingerprints: Optional[int] = None + num_urns_with_lineage: Optional[int] = None num_queries_entities_generated: int = 0 # Usage-related. usage_skipped_missing_timestamp: int = 0 def compute_stats(self) -> None: + self.schema_resolver_count = self._aggregator._schema_resolver.schema_count() self.num_unique_query_fingerprints = len(self._aggregator._query_map) self.num_urns_with_lineage = len(self._aggregator._lineage_map) @@ -865,6 +869,9 @@ class SqlParsingAggregator: confidenceScore=queries_map[query_id].confidence_score, ) ) + upstream_aspect.fineGrainedLineages = ( + upstream_aspect.fineGrainedLineages or None + ) yield MetadataChangeProposalWrapper( entityUrn=downstream_urn, diff --git a/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_known_lineage_mapping.json b/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_known_lineage_mapping.json index ab210c6f70..86d5ef34a8 100644 --- a/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_known_lineage_mapping.json +++ b/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_known_lineage_mapping.json @@ -19,8 +19,7 @@ "dataset": "urn:li:dataset:(urn:li:dataPlatform:s3,bucket1/key1,PROD)", "type": "COPY" } - ], - "fineGrainedLineages": [] + ] } } }, @@ -44,8 +43,7 @@ "dataset": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.bar,PROD)", "type": "COPY" } - ], - "fineGrainedLineages": [] + ] } } }, @@ -69,8 +67,7 @@ "dataset": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo,PROD)", "type": "COPY" } - ], - "fineGrainedLineages": [] + ] } } } diff --git a/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_temp_table.json b/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_temp_table.json index b93e7e0f52..5e61fb2b6a 100644 --- a/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_temp_table.json +++ b/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_temp_table.json @@ -205,8 +205,7 @@ "type": "TRANSFORMED", "query": "urn:li:query:3e85e6f353c7fa33d6514cb090482852064d23df6491c9a8ae28be0d990a3c71" } - ], - "fineGrainedLineages": [] + ] } } }, diff --git a/metadata-ingestion/tests/unit/sql_parsing/test_schemaresolver.py b/metadata-ingestion/tests/unit/sql_parsing/test_schemaresolver.py index 5786c135a8..5a33034f27 100644 --- a/metadata-ingestion/tests/unit/sql_parsing/test_schemaresolver.py +++ b/metadata-ingestion/tests/unit/sql_parsing/test_schemaresolver.py @@ -1,6 +1,31 @@ from datahub.sql_parsing.schema_resolver import SchemaResolver, _TableName +def test_basic_schema_resolver(): + schema_resolver = SchemaResolver( + platform="redshift", + env="PROD", + graph=None, + ) + + schema_resolver.add_raw_schema_info( + urn="urn:li:dataset:(urn:li:dataPlatform:redshift,my_db.public.test_table,PROD)", + schema_info={"name": "STRING"}, + ) + + urn, schema = schema_resolver.resolve_table( + _TableName(database="my_db", db_schema="public", table="test_table") + ) + assert ( + urn + == "urn:li:dataset:(urn:li:dataPlatform:redshift,my_db.public.test_table,PROD)" + ) + assert schema + assert schema["name"] + + assert schema_resolver.schema_count() == 1 + + def test_get_urn_for_table_lowercase(): schema_resolver = SchemaResolver( platform="mssql",