feat(ingest): fix bugs in SqlParsingAggregator (#9926)

This commit is contained in:
Harshal Sheth 2024-02-28 06:06:33 -08:00 committed by GitHub
parent 92b1cfa194
commit 1736edf8f5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 43 additions and 15 deletions

View File

@ -80,7 +80,7 @@ class SchemaResolver(Closeable, SchemaResolverInterface):
def schema_count(self) -> int:
return int(
self._schema_cache.sql_query(
f"SELECT COUNT(*) FROM {self._schema_cache.tablename} WHERE is_missing"
f"SELECT COUNT(*) FROM {self._schema_cache.tablename} WHERE NOT is_missing"
)[0][0]
)

View File

@ -115,6 +115,7 @@ class SqlAggregatorReport(Report):
_aggregator: "SqlParsingAggregator"
query_log_path: Optional[str] = None
# Observed queries.
num_observed_queries: int = 0
num_observed_queries_failed: int = 0
num_observed_queries_column_timeout: int = 0
@ -123,6 +124,7 @@ class SqlAggregatorReport(Report):
default_factory=LossyList
)
# Views.
num_view_definitions: int = 0
num_views_failed: int = 0
num_views_column_timeout: int = 0
@ -131,28 +133,30 @@ class SqlAggregatorReport(Report):
default_factory=LossyDict
)
# Other lineage loading metrics.
num_known_query_lineage: int = 0
num_known_mapping_lineage: int = 0
num_table_renames: int = 0
num_queries_with_temp_tables_in_session: int = 0
num_unique_query_fingerprints: Optional[int] = None
# Lineage-related.
num_urns_with_lineage: Optional[int] = None
# Temp tables.
num_temp_sessions: Optional[int] = None
num_inferred_temp_schemas: Optional[int] = None
num_queries_with_temp_tables_in_session: int = 0
queries_with_temp_upstreams: LossyDict[QueryId, LossyList] = dataclasses.field(
default_factory=LossyDict
)
# Lineage-related.
schema_resolver_count: Optional[int] = None
num_unique_query_fingerprints: Optional[int] = None
num_urns_with_lineage: Optional[int] = None
num_queries_entities_generated: int = 0
# Usage-related.
usage_skipped_missing_timestamp: int = 0
def compute_stats(self) -> None:
self.schema_resolver_count = self._aggregator._schema_resolver.schema_count()
self.num_unique_query_fingerprints = len(self._aggregator._query_map)
self.num_urns_with_lineage = len(self._aggregator._lineage_map)
@ -865,6 +869,9 @@ class SqlParsingAggregator:
confidenceScore=queries_map[query_id].confidence_score,
)
)
upstream_aspect.fineGrainedLineages = (
upstream_aspect.fineGrainedLineages or None
)
yield MetadataChangeProposalWrapper(
entityUrn=downstream_urn,

View File

@ -19,8 +19,7 @@
"dataset": "urn:li:dataset:(urn:li:dataPlatform:s3,bucket1/key1,PROD)",
"type": "COPY"
}
],
"fineGrainedLineages": []
]
}
}
},
@ -44,8 +43,7 @@
"dataset": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.bar,PROD)",
"type": "COPY"
}
],
"fineGrainedLineages": []
]
}
}
},
@ -69,8 +67,7 @@
"dataset": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo,PROD)",
"type": "COPY"
}
],
"fineGrainedLineages": []
]
}
}
}

View File

@ -205,8 +205,7 @@
"type": "TRANSFORMED",
"query": "urn:li:query:3e85e6f353c7fa33d6514cb090482852064d23df6491c9a8ae28be0d990a3c71"
}
],
"fineGrainedLineages": []
]
}
}
},

View File

@ -1,6 +1,31 @@
from datahub.sql_parsing.schema_resolver import SchemaResolver, _TableName
def test_basic_schema_resolver():
schema_resolver = SchemaResolver(
platform="redshift",
env="PROD",
graph=None,
)
schema_resolver.add_raw_schema_info(
urn="urn:li:dataset:(urn:li:dataPlatform:redshift,my_db.public.test_table,PROD)",
schema_info={"name": "STRING"},
)
urn, schema = schema_resolver.resolve_table(
_TableName(database="my_db", db_schema="public", table="test_table")
)
assert (
urn
== "urn:li:dataset:(urn:li:dataPlatform:redshift,my_db.public.test_table,PROD)"
)
assert schema
assert schema["name"]
assert schema_resolver.schema_count() == 1
def test_get_urn_for_table_lowercase():
schema_resolver = SchemaResolver(
platform="mssql",