Mirror of https://github.com/datahub-project/datahub.git (synced 2025-11-05 21:29:48 +00:00)
feat(ingest/databricks): report unique query count from usage (#11576)
This commit is contained in:
parent: 8c53c54081
commit: 20d0999601
@@ -12,8 +12,10 @@ from datahub.utilities.perf_timer import PerfTimer
|
|||||||
class UnityCatalogUsagePerfReport(Report):
|
class UnityCatalogUsagePerfReport(Report):
|
||||||
get_queries_timer: PerfTimer = field(default_factory=PerfTimer)
|
get_queries_timer: PerfTimer = field(default_factory=PerfTimer)
|
||||||
sql_parsing_timer: PerfTimer = field(default_factory=PerfTimer)
|
sql_parsing_timer: PerfTimer = field(default_factory=PerfTimer)
|
||||||
|
spark_sql_parsing_timer: PerfTimer = field(default_factory=PerfTimer)
|
||||||
aggregator_add_event_timer: PerfTimer = field(default_factory=PerfTimer)
|
aggregator_add_event_timer: PerfTimer = field(default_factory=PerfTimer)
|
||||||
gen_operation_timer: PerfTimer = field(default_factory=PerfTimer)
|
gen_operation_timer: PerfTimer = field(default_factory=PerfTimer)
|
||||||
|
query_fingerprinting_timer: PerfTimer = field(default_factory=PerfTimer)
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
@@ -32,6 +34,7 @@ class UnityCatalogReport(IngestionStageReport, ProfilingSqlReport):
|
|||||||
num_external_upstreams_unsupported: int = 0
|
num_external_upstreams_unsupported: int = 0
|
||||||
|
|
||||||
num_queries: int = 0
|
num_queries: int = 0
|
||||||
|
num_unique_queries: int = 0
|
||||||
num_queries_dropped_parse_failure: int = 0
|
num_queries_dropped_parse_failure: int = 0
|
||||||
num_queries_missing_table: int = 0 # Can be due to pattern filter
|
num_queries_missing_table: int = 0 # Can be due to pattern filter
|
||||||
num_queries_duplicate_table: int = 0
|
num_queries_duplicate_table: int = 0
|
||||||
|
|||||||
@@ -22,6 +22,7 @@ from datahub.ingestion.source.unity.proxy_types import (
|
|||||||
from datahub.ingestion.source.unity.report import UnityCatalogReport
|
from datahub.ingestion.source.unity.report import UnityCatalogReport
|
||||||
from datahub.ingestion.source.usage.usage_common import UsageAggregator
|
from datahub.ingestion.source.usage.usage_common import UsageAggregator
|
||||||
from datahub.metadata.schema_classes import OperationClass
|
from datahub.metadata.schema_classes import OperationClass
|
||||||
|
from datahub.sql_parsing.sqlglot_utils import get_query_fingerprint
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
@@ -76,6 +77,7 @@ class UnityCatalogUsageExtractor:
|
|||||||
self, table_refs: Set[TableReference]
|
self, table_refs: Set[TableReference]
|
||||||
) -> Iterable[MetadataWorkUnit]:
|
) -> Iterable[MetadataWorkUnit]:
|
||||||
table_map = defaultdict(list)
|
table_map = defaultdict(list)
|
||||||
|
query_hashes = set()
|
||||||
for ref in table_refs:
|
for ref in table_refs:
|
||||||
table_map[ref.table].append(ref)
|
table_map[ref.table].append(ref)
|
||||||
table_map[f"{ref.schema}.{ref.table}"].append(ref)
|
table_map[f"{ref.schema}.{ref.table}"].append(ref)
|
||||||
@@ -85,6 +87,13 @@ class UnityCatalogUsageExtractor:
|
|||||||
for query in self._get_queries():
|
for query in self._get_queries():
|
||||||
self.report.num_queries += 1
|
self.report.num_queries += 1
|
||||||
with current_timer.pause():
|
with current_timer.pause():
|
||||||
|
with self.report.usage_perf_report.query_fingerprinting_timer:
|
||||||
|
query_hashes.add(
|
||||||
|
get_query_fingerprint(
|
||||||
|
query.query_text, "databricks", fast=True
|
||||||
|
)
|
||||||
|
)
|
||||||
|
self.report.num_unique_queries = len(query_hashes)
|
||||||
table_info = self._parse_query(query, table_map)
|
table_info = self._parse_query(query, table_map)
|
||||||
if table_info is not None:
|
if table_info is not None:
|
||||||
if self.config.include_operational_stats:
|
if self.config.include_operational_stats:
|
||||||
@@ -166,7 +175,8 @@ class UnityCatalogUsageExtractor:
|
|||||||
with self.report.usage_perf_report.sql_parsing_timer:
|
with self.report.usage_perf_report.sql_parsing_timer:
|
||||||
table_info = self._parse_query_via_lineage_runner(query.query_text)
|
table_info = self._parse_query_via_lineage_runner(query.query_text)
|
||||||
if table_info is None and query.statement_type == QueryStatementType.SELECT:
|
if table_info is None and query.statement_type == QueryStatementType.SELECT:
|
||||||
table_info = self._parse_query_via_spark_sql_plan(query.query_text)
|
with self.report.usage_perf_report.spark_sql_parsing_timer:
|
||||||
|
table_info = self._parse_query_via_spark_sql_plan(query.query_text)
|
||||||
|
|
||||||
if table_info is None:
|
if table_info is None:
|
||||||
self.report.num_queries_dropped_parse_failure += 1
|
self.report.num_queries_dropped_parse_failure += 1
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user