diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_audit.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_audit.py
index 0a1e40bd18..de22f92a55 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_audit.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_audit.py
@@ -336,7 +336,7 @@ class QueryEvent:
         if raw_dest_table:
             query_event.destinationTable = BigQueryTableRef.from_spec_obj(
                 raw_dest_table
-            )
+            ).get_sanitized_table_ref()
         # statementType
         # referencedTables
         job_stats: Dict = job["jobStatistics"]
@@ -346,13 +346,15 @@ class QueryEvent:
         raw_ref_tables = job_stats.get("referencedTables")
         if raw_ref_tables:
             query_event.referencedTables = [
-                BigQueryTableRef.from_spec_obj(spec) for spec in raw_ref_tables
+                BigQueryTableRef.from_spec_obj(spec).get_sanitized_table_ref()
+                for spec in raw_ref_tables
             ]
         # referencedViews
         raw_ref_views = job_stats.get("referencedViews")
         if raw_ref_views:
             query_event.referencedViews = [
-                BigQueryTableRef.from_spec_obj(spec) for spec in raw_ref_views
+                BigQueryTableRef.from_spec_obj(spec).get_sanitized_table_ref()
+                for spec in raw_ref_views
             ]
         # payload
         query_event.payload = entry.payload if debug_include_full_payloads else None
@@ -415,18 +417,20 @@ class QueryEvent:
         if raw_dest_table:
             query_event.destinationTable = BigQueryTableRef.from_string_name(
                 raw_dest_table
-            )
+            ).get_sanitized_table_ref()
         # referencedTables
         raw_ref_tables = query_stats.get("referencedTables")
         if raw_ref_tables:
             query_event.referencedTables = [
-                BigQueryTableRef.from_string_name(spec) for spec in raw_ref_tables
+                BigQueryTableRef.from_string_name(spec).get_sanitized_table_ref()
+                for spec in raw_ref_tables
             ]
         # referencedViews
         raw_ref_views = query_stats.get("referencedViews")
         if raw_ref_views:
             query_event.referencedViews = [
-                BigQueryTableRef.from_string_name(spec) for spec in raw_ref_views
+                BigQueryTableRef.from_string_name(spec).get_sanitized_table_ref()
+                for spec in raw_ref_views
             ]
         # payload
         query_event.payload = payload if debug_include_full_payloads else None
@@ -479,19 +483,21 @@ class QueryEvent:
         if raw_dest_table:
             query_event.destinationTable = BigQueryTableRef.from_string_name(
                 raw_dest_table
-            )
+            ).get_sanitized_table_ref()
         # statementType
         # referencedTables
         raw_ref_tables = query_stats.get("referencedTables")
         if raw_ref_tables:
             query_event.referencedTables = [
-                BigQueryTableRef.from_string_name(spec) for spec in raw_ref_tables
+                BigQueryTableRef.from_string_name(spec).get_sanitized_table_ref()
+                for spec in raw_ref_tables
             ]
         # referencedViews
         raw_ref_views = query_stats.get("referencedViews")
         if raw_ref_views:
             query_event.referencedViews = [
-                BigQueryTableRef.from_string_name(spec) for spec in raw_ref_views
+                BigQueryTableRef.from_string_name(spec).get_sanitized_table_ref()
+                for spec in raw_ref_views
             ]
         # payload
         query_event.payload = payload if debug_include_full_payloads else None
@@ -568,10 +574,14 @@ class ReadEvent:
         if readReason == "JOB":
             jobName = readInfo.get("jobName")

+        resource = BigQueryTableRef.from_string_name(
+            resourceName
+        ).get_sanitized_table_ref()
+
         readEvent = ReadEvent(
             actor_email=user,
             timestamp=entry.timestamp,
-            resource=BigQueryTableRef.from_string_name(resourceName),
+            resource=resource,
             fieldsRead=fields,
             readReason=readReason,
             jobName=jobName,
@@ -602,10 +612,14 @@ class ReadEvent:
         if readReason == "JOB":
             jobName = readInfo.get("jobName")

+        resource = BigQueryTableRef.from_string_name(
+            resourceName
+        ).get_sanitized_table_ref()
+
         readEvent = ReadEvent(
             actor_email=user,
             timestamp=row["timestamp"],
-            resource=BigQueryTableRef.from_string_name(resourceName),
+            resource=resource,
             fieldsRead=fields,
             readReason=readReason,
             jobName=jobName,
diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_report.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_report.py
index cdd4a2f9de..69e68c59fc 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_report.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_report.py
@@ -10,7 +10,7 @@ import pydantic
 from datahub.ingestion.source.sql.sql_generic_profiler import ProfilingSqlReport
 from datahub.utilities.lossy_collections import LossyDict, LossyList
 from datahub.utilities.perf_timer import PerfTimer
-from datahub.utilities.stats_collections import TopKDict
+from datahub.utilities.stats_collections import TopKDict, int_top_k_dict

 logger: logging.Logger = logging.getLogger(__name__)

@@ -19,24 +19,22 @@
 class BigQueryV2Report(ProfilingSqlReport):
     num_total_lineage_entries: TopKDict[str, int] = field(default_factory=TopKDict)
     num_skipped_lineage_entries_missing_data: TopKDict[str, int] = field(
-        default_factory=TopKDict
+        default_factory=int_top_k_dict
     )
     num_skipped_lineage_entries_not_allowed: TopKDict[str, int] = field(
-        default_factory=TopKDict
+        default_factory=int_top_k_dict
     )
     num_lineage_entries_sql_parser_failure: TopKDict[str, int] = field(
-        default_factory=TopKDict
-    )
-    num_lineage_entries_sql_parser_success: TopKDict[str, int] = field(
-        default_factory=TopKDict
+        default_factory=int_top_k_dict
     )
     num_skipped_lineage_entries_other: TopKDict[str, int] = field(
-        default_factory=TopKDict
+        default_factory=int_top_k_dict
+    )
+    num_total_log_entries: TopKDict[str, int] = field(default_factory=int_top_k_dict)
+    num_parsed_log_entries: TopKDict[str, int] = field(default_factory=int_top_k_dict)
+    num_lineage_log_parse_failures: TopKDict[str, int] = field(
+        default_factory=int_top_k_dict
     )
-    num_total_log_entries: TopKDict[str, int] = field(default_factory=TopKDict)
-    num_parsed_log_entries: TopKDict[str, int] = field(default_factory=TopKDict)
-    num_total_audit_entries: TopKDict[str, int] = field(default_factory=TopKDict)
-    num_parsed_audit_entries: TopKDict[str, int] = field(default_factory=TopKDict)
     bigquery_audit_metadata_datasets_missing: Optional[bool] = None
     lineage_failed_extraction: LossyList[str] = field(default_factory=LossyList)
     lineage_metadata_entries: TopKDict[str, int] = field(default_factory=TopKDict)
diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/lineage.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/lineage.py
index cd9ebed231..4a17d25026 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/lineage.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/lineage.py
@@ -3,7 +3,7 @@ import logging
 import textwrap
 from dataclasses import dataclass
 from datetime import datetime
-from typing import Dict, Iterable, List, Optional, Set, Tuple, Union
+from typing import Any, Callable, Dict, Iterable, List, Optional, Set, Tuple, Union

 import humanfriendly
 from google.cloud.bigquery import Client as BigQueryClient
@@ -18,6 +18,7 @@ from datahub.ingestion.source.bigquery_v2.bigquery_audit import (
     BigqueryTableIdentifier,
     BigQueryTableRef,
     QueryEvent,
+    ReadEvent,
 )
 from datahub.ingestion.source.bigquery_v2.bigquery_config import BigQueryV2Config
 from datahub.ingestion.source.bigquery_v2.bigquery_report import BigQueryV2Report
@@ -87,7 +88,6 @@ timestamp < "{end_time}"
     def __init__(self, config: BigQueryV2Config, report: BigQueryV2Report):
         self.config = config
         self.report = report
-        self.loaded_project_ids: List[str] = []

     def error(self, log: logging.Logger, key: str, reason: str) -> None:
         self.report.report_failure(key, reason)
@@ -154,54 +154,7 @@ timestamp < "{end_time}"

         return textwrap.dedent(query)

-    def compute_bigquery_lineage_via_gcp_logging(
-        self, project_id: str
-    ) -> Dict[str, Set[LineageEdge]]:
-        logger.info(f"Populating lineage info via GCP audit logs for {project_id}")
-        try:
-            clients: GCPLoggingClient = _make_gcp_logging_client(project_id)
-
-            log_entries: Iterable[AuditLogEntry] = self._get_bigquery_log_entries(
-                clients
-            )
-            logger.info("Log Entries loaded")
-            parsed_entries: Iterable[QueryEvent] = self._parse_bigquery_log_entries(
-                log_entries
-            )
-            return self._create_lineage_map(parsed_entries)
-        except Exception as e:
-            self.error(
-                logger,
-                "lineage-gcp-logs",
-                f"Failed to get lineage gcp logging for {project_id}. The error message was {e}",
-            )
-            raise e
-
-    def compute_bigquery_lineage_via_exported_bigquery_audit_metadata(
-        self,
-    ) -> Dict[str, Set[LineageEdge]]:
-        logger.info("Populating lineage info via exported GCP audit logs")
-        try:
-            # For exported logs we want to submit queries with the credentials project_id.
-            _client: BigQueryClient = get_bigquery_client(self.config)
-            exported_bigquery_audit_metadata: Iterable[
-                BigQueryAuditMetadata
-            ] = self._get_exported_bigquery_audit_metadata(_client)
-            parsed_entries: Iterable[
-                QueryEvent
-            ] = self._parse_exported_bigquery_audit_metadata(
-                exported_bigquery_audit_metadata
-            )
-            return self._create_lineage_map(parsed_entries)
-        except Exception as e:
-            self.error(
-                logger,
-                "lineage-exported-gcp-audit-logs",
-                f"Error: {e}",
-            )
-            raise e
-
-    def compute_bigquery_lineage_via_catalog_lineage_api(
+    def lineage_via_catalog_lineage_api(
         self, project_id: str
     ) -> Dict[str, Set[LineageEdge]]:
         """
@@ -310,9 +263,33 @@ timestamp < "{end_time}"
             )
             raise e

+    def _get_parsed_audit_log_events(self, project_id: str) -> Iterable[QueryEvent]:
+        parse_fn: Callable[[Any], Optional[Union[ReadEvent, QueryEvent]]]
+        if self.config.use_exported_bigquery_audit_metadata:
+            logger.info("Populating lineage info via exported GCP audit logs")
+            bq_client = get_bigquery_client(self.config)
+            entries = self._get_exported_bigquery_audit_metadata(bq_client)
+            parse_fn = self._parse_exported_bigquery_audit_metadata
+        else:
+            logger.info("Populating lineage info via exported GCP audit logs")
+            logging_client = _make_gcp_logging_client(project_id)
+            entries = self._get_bigquery_log_entries(logging_client)
+            parse_fn = self._parse_bigquery_log_entries
+
+        for entry in entries:
+            self.report.num_total_log_entries[project_id] += 1
+            try:
+                event = parse_fn(entry)
+                if event:
+                    self.report.num_parsed_log_entries[project_id] += 1
+                    yield event
+            except Exception as e:
+                logger.warning(f"Unable to parse log entry `{entry}`: {e}")
+                self.report.num_lineage_log_parse_failures[project_id] += 1
+
     def _get_bigquery_log_entries(
         self, client: GCPLoggingClient, limit: Optional[int] = None
-    ) -> Union[Iterable[AuditLogEntry], Iterable[BigQueryAuditMetadata]]:
+    ) -> Union[Iterable[AuditLogEntry]]:
         self.report.num_total_log_entries[client.project] = 0

         # Add a buffer to start and end time to account for delays in logging events.
         start_time = (self.config.start_time - self.config.max_query_duration).strftime(
@@ -420,67 +397,52 @@ timestamp < "{end_time}"
     # events to also create field level lineage.
     def _parse_bigquery_log_entries(
         self,
-        entries: Union[Iterable[AuditLogEntry], Iterable[BigQueryAuditMetadata]],
-    ) -> Iterable[QueryEvent]:
-        for entry in entries:
-            event: Optional[QueryEvent] = None
+        entry: AuditLogEntry,
+    ) -> Optional[QueryEvent]:
+        event: Optional[QueryEvent] = None

-            missing_entry = QueryEvent.get_missing_key_entry(entry=entry)
-            if missing_entry is None:
-                event = QueryEvent.from_entry(
-                    entry,
-                    debug_include_full_payloads=self.config.debug_include_full_payloads,
-                )
-
-            missing_entry_v2 = QueryEvent.get_missing_key_entry_v2(entry=entry)
-            if event is None and missing_entry_v2 is None:
-                event = QueryEvent.from_entry_v2(
-                    entry, self.config.debug_include_full_payloads
-                )
-
-            if event is None:
-                self.error(
-                    logger,
-                    f"{entry.log_name}-{entry.insert_id}",
-                    f"Unable to parse log missing {missing_entry}, missing v2 {missing_entry_v2} for {entry}",
-                )
-            else:
-                self.report.num_parsed_log_entries[event.project_id] = (
-                    self.report.num_parsed_log_entries.get(event.project_id, 0) + 1
-                )
-                yield event
-
-    def _parse_exported_bigquery_audit_metadata(
-        self, audit_metadata_rows: Iterable[BigQueryAuditMetadata]
-    ) -> Iterable[QueryEvent]:
-        for audit_metadata in audit_metadata_rows:
-            event: Optional[QueryEvent] = None
-
-            missing_exported_audit = (
-                QueryEvent.get_missing_key_exported_bigquery_audit_metadata(
-                    audit_metadata
-                )
+        missing_entry = QueryEvent.get_missing_key_entry(entry=entry)
+        if missing_entry is None:
+            event = QueryEvent.from_entry(
+                entry,
+                debug_include_full_payloads=self.config.debug_include_full_payloads,
             )

-            if missing_exported_audit is None:
-                event = QueryEvent.from_exported_bigquery_audit_metadata(
-                    audit_metadata, self.config.debug_include_full_payloads
-                )
+        missing_entry_v2 = QueryEvent.get_missing_key_entry_v2(entry=entry)
+        if event is None and missing_entry_v2 is None:
+            event = QueryEvent.from_entry_v2(
+                entry, self.config.debug_include_full_payloads
+            )

-            if event is None:
-                self.error(
-                    logger,
-                    f"{audit_metadata['logName']}-{audit_metadata['insertId']}",
-                    f"Unable to parse audit metadata missing {missing_exported_audit} for {audit_metadata}",
-                )
-            else:
-                self.report.num_parsed_audit_entries[event.project_id] = (
-                    self.report.num_parsed_audit_entries.get(event.project_id, 0) + 1
-                )
-                self.report.num_total_audit_entries[event.project_id] = (
-                    self.report.num_total_audit_entries.get(event.project_id, 0) + 1
-                )
-                yield event
+        if event is None:
+            logger.warning(
+                f"Unable to parse log missing {missing_entry}, missing v2 {missing_entry_v2} for {entry}",
+            )
+            return None
+        else:
+            return event
+
+    def _parse_exported_bigquery_audit_metadata(
+        self, audit_metadata: BigQueryAuditMetadata
+    ) -> Optional[QueryEvent]:
+        event: Optional[QueryEvent] = None
+
+        missing_exported_audit = (
+            QueryEvent.get_missing_key_exported_bigquery_audit_metadata(audit_metadata)
+        )
+
+        if missing_exported_audit is None:
+            event = QueryEvent.from_exported_bigquery_audit_metadata(
+                audit_metadata, self.config.debug_include_full_payloads
+            )
+
+        if event is None:
+            logger.warning(
+                f"Unable to parse audit metadata missing {missing_exported_audit} for {audit_metadata}",
+            )
+            return None
+        else:
+            return event

     def _create_lineage_map(
         self, entries: Iterable[QueryEvent]
@@ -495,59 +457,34 @@ timestamp < "{end_time}"
             if e.destinationTable is None or not (
                 e.referencedTables or e.referencedViews
             ):
-                self.report.num_skipped_lineage_entries_missing_data[e.project_id] = (
-                    self.report.num_skipped_lineage_entries_missing_data.get(
-                        e.project_id, 0
-                    )
-                    + 1
-                )
+                self.report.num_skipped_lineage_entries_missing_data[e.project_id] += 1
                 continue
-            # Skip if schema/table pattern don't allow the destination table
-            try:
-                destination_table = e.destinationTable.get_sanitized_table_ref()
-            except Exception:
-                self.report.num_skipped_lineage_entries_missing_data[e.project_id] = (
-                    self.report.num_skipped_lineage_entries_missing_data.get(
-                        e.project_id, 0
-                    )
-                    + 1
-                )
-                continue
-
-            destination_table_str = str(
-                BigQueryTableRef(table_identifier=destination_table.table_identifier)
-            )
+
             if not self.config.dataset_pattern.allowed(
-                destination_table.table_identifier.dataset
+                e.destinationTable.table_identifier.dataset
             ) or not self.config.table_pattern.allowed(
-                destination_table.table_identifier.get_table_name()
+                e.destinationTable.table_identifier.get_table_name()
             ):
-                self.report.num_skipped_lineage_entries_not_allowed[e.project_id] = (
-                    self.report.num_skipped_lineage_entries_not_allowed.get(
-                        e.project_id, 0
-                    )
-                    + 1
-                )
+                self.report.num_skipped_lineage_entries_not_allowed[e.project_id] += 1
                 continue
+
+            destination_table_str = str(e.destinationTable)
             has_table = False
             for ref_table in e.referencedTables:
-                ref_table_str = str(ref_table.get_sanitized_table_ref())
-                if ref_table_str != destination_table_str:
+                if str(ref_table) != destination_table_str:
                     lineage_map[destination_table_str].add(
                         LineageEdge(
-                            table=ref_table_str,
+                            table=str(ref_table),
                             auditStamp=e.end_time if e.end_time else datetime.now(),
                         )
                     )
                     has_table = True
             has_view = False
             for ref_view in e.referencedViews:
-                ref_view_str = str(ref_view.get_sanitized_table_ref())
-                if ref_view_str != destination_table_str:
+                if str(ref_view) != destination_table_str:
                     lineage_map[destination_table_str].add(
                         LineageEdge(
-                            table=ref_view_str,
+                            table=str(ref_view),
                             auditStamp=e.end_time if e.end_time else datetime.now(),
                         )
                     )
@@ -569,12 +506,9 @@ timestamp < "{end_time}"
                     logger.debug(
                         f"Sql Parser failed on query: {e.query}. It won't cause any issue except table/view lineage can't be detected reliably. The error was {ex}."
                     )
-                    self.report.num_lineage_entries_sql_parser_failure[e.project_id] = (
-                        self.report.num_lineage_entries_sql_parser_failure.get(
-                            e.project_id, 0
-                        )
-                        + 1
-                    )
+                    self.report.num_lineage_entries_sql_parser_failure[
+                        e.project_id
+                    ] += 1
                     continue
                 curr_lineage = lineage_map[destination_table_str]
                 new_lineage = set()
@@ -584,10 +518,7 @@ timestamp < "{end_time}"
                         new_lineage.add(lineage)
                 lineage_map[destination_table_str] = new_lineage
             if not (has_table or has_view):
-                self.report.num_skipped_lineage_entries_other[e.project_id] = (
-                    self.report.num_skipped_lineage_entries_other.get(e.project_id, 0)
-                    + 1
-                )
+                self.report.num_skipped_lineage_entries_other[e.project_id] += 1

         logger.info("Exiting create lineage map function")
         return lineage_map
@@ -641,42 +572,23 @@ timestamp < "{end_time}"
         return list(parsed_tables)

     def _compute_bigquery_lineage(self, project_id: str) -> Dict[str, Set[LineageEdge]]:
-        lineage_extractor: BigqueryLineageExtractor = BigqueryLineageExtractor(
-            config=self.config, report=self.report
-        )
         lineage_metadata: Dict[str, Set[LineageEdge]]
         try:
             if self.config.extract_lineage_from_catalog and self.config.include_tables:
-                lineage_metadata = (
-                    lineage_extractor.compute_bigquery_lineage_via_catalog_lineage_api(
-                        project_id
-                    )
-                )
+                lineage_metadata = self.lineage_via_catalog_lineage_api(project_id)
             else:
-                if self.config.use_exported_bigquery_audit_metadata:
-                    # Exported bigquery_audit_metadata should contain every projects' audit metada
-                    if self.loaded_project_ids:
-                        return {}
-                    lineage_metadata = (
-                        lineage_extractor.compute_bigquery_lineage_via_exported_bigquery_audit_metadata()
-                    )
-                else:
-                    lineage_metadata = (
-                        lineage_extractor.compute_bigquery_lineage_via_gcp_logging(
-                            project_id
-                        )
-                    )
+                events = self._get_parsed_audit_log_events(project_id)
+                lineage_metadata = self._create_lineage_map(events)
         except Exception as e:
             if project_id:
                 self.report.lineage_failed_extraction.append(project_id)
-            logger.error(
-                f"Unable to extract lineage for project {project_id} due to error {e}"
+            self.error(
+                logger,
+                "lineage",
+                f"{project_id}: {e}",
             )
             lineage_metadata = {}

-        if lineage_metadata is None:
-            lineage_metadata = {}
-
         self.report.lineage_mem_size[project_id] = humanfriendly.format_size(
             memory_footprint.total_size(lineage_metadata)
         )
diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/profiler.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/profiler.py
index 71afceb4d3..c9dcb4fe35 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/profiler.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/profiler.py
@@ -244,12 +244,7 @@ WHERE
                 dataset_name, table.last_altered, table.size_in_bytes, table.rows_count
             ):
                 profile_table_level_only = True
-                self.report.num_tables_not_eligible_profiling[f"{project}.{dataset}"] = (
-                    self.report.num_tables_not_eligible_profiling.get(
-                        f"{project}.{dataset}", 0
-                    )
-                    + 1
-                )
+                self.report.num_tables_not_eligible_profiling[f"{project}.{dataset}"] += 1

             if not table.column_count:
                 skip_profiling = True
diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/usage.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/usage.py
index 2995ea0bea..c4ac516d01 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/usage.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/usage.py
@@ -5,7 +5,7 @@ import time
 import traceback
 from dataclasses import dataclass
 from datetime import datetime
-from typing import Any, Dict, Iterable, List, MutableMapping, Optional, Union
+from typing import Any, Callable, Dict, Iterable, List, MutableMapping, Optional, Union

 import cachetools
 from google.cloud.bigquery import Client as BigQueryClient
@@ -132,7 +132,7 @@ def bigquery_audit_metadata_query_template(
     """
     audit_log_filter_timestamps = """AND (timestamp >= "{start_time}"
        AND timestamp < "{end_time}"
-    );
+    )
    """
    audit_log_filter_query_complete = f"""
    AND (
@@ -193,15 +193,7 @@ class BigQueryUsageExtractor:
        parsed_events: Iterable[Union[ReadEvent, QueryEvent]]
        with PerfTimer() as timer:
            try:
-                bigquery_log_entries = self._get_parsed_bigquery_log_events(project_id)
-                if self.config.use_exported_bigquery_audit_metadata:
-                    parsed_events = self._parse_exported_bigquery_audit_metadata(
-                        bigquery_log_entries
-                    )
-                else:
-                    parsed_events = self._parse_bigquery_log_entries(
-                        bigquery_log_entries
-                    )
+                parsed_events = self._get_parsed_bigquery_log_events(project_id)
                hydrated_read_events = self._join_events_by_job_id(parsed_events)

                # storing it all in one big object.
@@ -213,7 +205,7 @@ class BigQueryUsageExtractor:
                self.report.num_operational_stats_workunits_emitted = 0
                for event in hydrated_read_events:
                    if self.config.usage.include_operational_stats:
-                        operational_wu = self._create_operation_aspect_work_unit(event)
+                        operational_wu = self._create_operation_workunit(event)
                        if operational_wu:
                            yield operational_wu
                            self.report.num_operational_stats_workunits_emitted += 1
@@ -308,7 +300,10 @@ class BigQueryUsageExtractor:
            )

            query = bigquery_audit_metadata_query_template(
-                dataset, self.config.use_date_sharded_audit_log_tables, allow_filter
+                dataset,
+                self.config.use_date_sharded_audit_log_tables,
+                allow_filter,
+                limit,
            ).format(
                start_time=start_time,
                end_time=end_time,
@@ -326,14 +321,14 @@ class BigQueryUsageExtractor:
    def _get_bigquery_log_entries_via_gcp_logging(
        self, client: GCPLoggingClient, limit: Optional[int] = None
-    ) -> Iterable[Union[AuditLogEntry, BigQueryAuditMetadata]]:
+    ) -> Iterable[AuditLogEntry]:
        self.report.total_query_log_entries = 0

        filter = self._generate_filter(BQ_AUDIT_V2)
        logger.debug(filter)

        try:
-            list_entries: Iterable[Union[AuditLogEntry, BigQueryAuditMetadata]]
+            list_entries: Iterable[AuditLogEntry]
            rate_limiter: Optional[RateLimiter] = None
            if self.config.rate_limit:
                # client.list_entries is a generator, does api calls to GCP Logging when it runs out of entries and needs to fetch more from GCP Logging
@@ -437,9 +432,9 @@ class BigQueryUsageExtractor:
            and event.query_event
            and event.query_event.destinationTable
        ):
-            return event.query_event.destinationTable.get_sanitized_table_ref()
+            return event.query_event.destinationTable
        elif event.read_event:
-            return event.read_event.resource.get_sanitized_table_ref()
+            return event.read_event.resource
        else:
            # TODO: CREATE_SCHEMA operation ends up here, maybe we should capture that as well
            # but it is tricky as we only get the query so it can't be tied to anything
@@ -492,7 +487,7 @@ class BigQueryUsageExtractor:
        else:
            return None

-    def _create_operation_aspect_work_unit(
+    def _create_operation_workunit(
        self, event: AuditEvent
    ) -> Optional[MetadataWorkUnit]:
        if not event.read_event and not event.query_event:
@@ -518,15 +513,7 @@ class BigQueryUsageExtractor:
        affected_datasets = []
        if event.query_event and event.query_event.referencedTables:
            for table in event.query_event.referencedTables:
-                try:
-                    affected_datasets.append(
-                        table.get_sanitized_table_ref().to_urn(self.config.env)
-                    )
-                except Exception as e:
-                    self.report.report_warning(
-                        str(table),
-                        f"Failed to clean up table, {e}",
-                    )
+                affected_datasets.append(table.to_urn(self.config.env))

        operation_aspect = OperationClass(
            timestampMillis=reported_time,
@@ -587,92 +574,76 @@ class BigQueryUsageExtractor:
        return custom_properties

-    def _parse_bigquery_log_entries(
-        self, entries: Iterable[Union[AuditLogEntry, BigQueryAuditMetadata]]
-    ) -> Iterable[Union[ReadEvent, QueryEvent]]:
+    def _parse_bigquery_log_entry(
+        self, entry: Union[AuditLogEntry, BigQueryAuditMetadata]
+    ) -> Optional[Union[ReadEvent, QueryEvent]]:
        self.report.num_read_events = 0
        self.report.num_query_events = 0
        self.report.num_filtered_read_events = 0
        self.report.num_filtered_query_events = 0
-        for entry in entries:
-            event: Optional[Union[ReadEvent, QueryEvent]] = None
+        event: Optional[Union[ReadEvent, QueryEvent]] = None

-            missing_read_entry = ReadEvent.get_missing_key_entry(entry)
-            if missing_read_entry is None:
-                event = ReadEvent.from_entry(
-                    entry, self.config.debug_include_full_payloads
+        missing_read_entry = ReadEvent.get_missing_key_entry(entry)
+        if missing_read_entry is None:
+            event = ReadEvent.from_entry(entry, self.config.debug_include_full_payloads)
+            if not self._is_table_allowed(event.resource):
+                self.report.num_filtered_read_events += 1
+                return None
+
+            if event.readReason:
+                self.report.read_reasons_stat[event.readReason] = (
+                    self.report.read_reasons_stat.get(event.readReason, 0) + 1
                )
-                if not self._is_table_allowed(event.resource):
-                    self.report.num_filtered_read_events += 1
-                    continue
+            self.report.num_read_events += 1

-                if event.readReason:
-                    self.report.read_reasons_stat[event.readReason] = (
-                        self.report.read_reasons_stat.get(event.readReason, 0) + 1
-                    )
-                self.report.num_read_events += 1
+        missing_query_entry = QueryEvent.get_missing_key_entry(entry)
+        if event is None and missing_query_entry is None:
+            event = QueryEvent.from_entry(entry)
+            self.report.num_query_events += 1

-            missing_query_entry = QueryEvent.get_missing_key_entry(entry)
-            if event is None and missing_query_entry is None:
-                event = QueryEvent.from_entry(entry)
-                self.report.num_query_events += 1
+        missing_query_entry_v2 = QueryEvent.get_missing_key_entry_v2(entry)

-            missing_query_entry_v2 = QueryEvent.get_missing_key_entry_v2(entry)
+        if event is None and missing_query_entry_v2 is None:
+            event = QueryEvent.from_entry_v2(
+                entry, self.config.debug_include_full_payloads
+            )
+            self.report.num_query_events += 1

-            if event is None and missing_query_entry_v2 is None:
-                event = QueryEvent.from_entry_v2(
-                    entry, self.config.debug_include_full_payloads
-                )
-                self.report.num_query_events += 1
-
-            if event is None:
-                logger.warning(
-                    f"Unable to parse {type(entry)} missing read {missing_query_entry}, missing query {missing_query_entry} missing v2 {missing_query_entry_v2} for {entry}"
-                )
-            else:
-                yield event
-
-        logger.info(
-            f"Parsed {self.report.num_read_events} ReadEvents and {self.report.num_query_events} QueryEvents"
-        )
+        if event is None:
+            logger.warning(
+                f"Unable to parse {type(entry)} missing read {missing_query_entry}, missing query {missing_query_entry} missing v2 {missing_query_entry_v2} for {entry}"
+            )
+            return None
+        else:
+            return event

    def _parse_exported_bigquery_audit_metadata(
-        self, audit_metadata_rows: Iterable[BigQueryAuditMetadata]
-    ) -> Iterable[Union[ReadEvent, QueryEvent]]:
-        for audit_metadata in audit_metadata_rows:
-            event: Optional[Union[QueryEvent, ReadEvent]] = None
-            missing_query_event_exported_audit = (
-                QueryEvent.get_missing_key_exported_bigquery_audit_metadata(
-                    audit_metadata
-                )
+        self, audit_metadata: BigQueryAuditMetadata
+    ) -> Optional[Union[ReadEvent, QueryEvent]]:
+        event: Optional[Union[QueryEvent, ReadEvent]] = None
+        missing_query_event_exported_audit = (
+            QueryEvent.get_missing_key_exported_bigquery_audit_metadata(audit_metadata)
+        )
+        if missing_query_event_exported_audit is None:
+            event = QueryEvent.from_exported_bigquery_audit_metadata(
+                audit_metadata, self.config.debug_include_full_payloads
            )
-            if missing_query_event_exported_audit is None:
-                event = QueryEvent.from_exported_bigquery_audit_metadata(
-                    audit_metadata, self.config.debug_include_full_payloads
-                )
-            missing_read_event_exported_audit = (
-                ReadEvent.get_missing_key_exported_bigquery_audit_metadata(
-                    audit_metadata
-                )
+        missing_read_event_exported_audit = (
+            ReadEvent.get_missing_key_exported_bigquery_audit_metadata(audit_metadata)
+        )
+        if missing_read_event_exported_audit is None:
+            event = ReadEvent.from_exported_bigquery_audit_metadata(
+                audit_metadata, self.config.debug_include_full_payloads
            )
-            if missing_read_event_exported_audit is None:
-                event = ReadEvent.from_exported_bigquery_audit_metadata(
-                    audit_metadata, self.config.debug_include_full_payloads
-                )
-            if event is not None:
-                yield event
-            else:
-                self.error(
-                    logger,
-                    "usage-extraction",
-                    f"{audit_metadata['logName']}-{audit_metadata['insertId']} Unable to parse audit metadata missing QueryEvent keys:{str(missing_query_event_exported_audit)} ReadEvent keys: {str(missing_read_event_exported_audit)} for {audit_metadata}",
-                )
-
-    def error(self, log: logging.Logger, key: str, reason: str) -> Any:
-        self.report.report_failure(key, reason)
-        log.error(f"{key} => {reason}")
+        if event is None:
+            logger.warning(
+                f"{audit_metadata['logName']}-{audit_metadata['insertId']} Unable to parse audit metadata missing QueryEvent keys:{str(missing_query_event_exported_audit)} ReadEvent keys: {str(missing_read_event_exported_audit)} for {audit_metadata}",
+            )
+            return None
+        else:
+            return event

    def _join_events_by_job_id(
        self, events: Iterable[Union[ReadEvent, QueryEvent]]
@@ -755,23 +726,13 @@ class BigQueryUsageExtractor:
        floored_ts = get_time_bucket(
            event.read_event.timestamp, self.config.bucket_duration
        )
-        resource: Optional[BigQueryTableRef] = None
-        try:
-            resource = event.read_event.resource.get_sanitized_table_ref()
-            if (
-                resource.table_identifier.dataset not in tables
-                or resource.table_identifier.get_table_name()
-                not in tables[resource.table_identifier.dataset]
-            ):
-                logger.debug(f"Skipping non existing {resource} from usage")
-                return
-        except Exception as e:
-            self.report.report_warning(
-                str(event.read_event.resource), f"Failed to clean up resource, {e}"
-            )
-            logger.warning(
-                f"Failed to process event {str(event.read_event.resource)} - {e}"
-            )
+        resource = event.read_event.resource
+        if (
+            resource.table_identifier.dataset not in tables
+            or resource.table_identifier.get_table_name()
+            not in tables[resource.table_identifier.dataset]
+        ):
+            logger.debug(f"Skipping non existing {resource} from usage")
            return

        if resource.is_temporary_table([self.config.temp_table_dataset_prefix]):
@@ -814,23 +775,42 @@ class BigQueryUsageExtractor:

    def _get_parsed_bigquery_log_events(
        self, project_id: str, limit: Optional[int] = None
-    ) -> Iterable[Union[ReadEvent, QueryEvent, MetadataWorkUnit]]:
+    ) -> Iterable[Union[ReadEvent, QueryEvent]]:
+        parse_fn: Callable[[Any], Optional[Union[ReadEvent, QueryEvent]]]
        if self.config.use_exported_bigquery_audit_metadata:
-            _client: BigQueryClient = BigQueryClient(project=project_id)
-            return self._get_exported_bigquery_audit_metadata(
-                bigquery_client=_client,
+            bq_client = BigQueryClient(project=project_id)
+            entries = self._get_exported_bigquery_audit_metadata(
+                bigquery_client=bq_client,
                allow_filter=self.config.get_table_pattern(
                    self.config.table_pattern.allow
                ),
                limit=limit,
            )
+            parse_fn = self._parse_exported_bigquery_audit_metadata
        else:
-            logging_client: GCPLoggingClient = _make_gcp_logging_client(
+            logging_client = _make_gcp_logging_client(
                project_id, self.config.extra_client_options
            )
-            return self._get_bigquery_log_entries_via_gcp_logging(
+            entries = self._get_bigquery_log_entries_via_gcp_logging(
                logging_client, limit=limit
            )
+            parse_fn = self._parse_bigquery_log_entry
+
+        log_entry_parse_failures = 0
+        for entry in entries:
+            try:
+                event = parse_fn(entry)
+                if event:
+                    yield event
+            except Exception as e:
+                logger.warning(f"Unable to parse log entry `{entry}`: {e}")
+                log_entry_parse_failures += 1
+
+        if log_entry_parse_failures:
+            self.report.report_warning(
+                "usage-extraction",
+                f"Failed to parse {log_entry_parse_failures} audit log entries for project {project_id}.",
+            )

    def test_capability(self, project_id: str) -> None:
        for entry in self._get_parsed_bigquery_log_events(project_id, limit=1):
diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_generic_profiler.py b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_generic_profiler.py
index b54f7cbe33..63403c2655 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_generic_profiler.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_generic_profiler.py
@@ -17,16 +17,24 @@ from datahub.ingestion.source.sql.sql_generic import BaseTable, BaseView
 from datahub.ingestion.source.state.profiling_state_handler import ProfilingHandler
 from datahub.metadata.com.linkedin.pegasus2avro.dataset import DatasetProfile
 from datahub.metadata.schema_classes import DatasetProfileClass
-from datahub.utilities.stats_collections import TopKDict
+from datahub.utilities.stats_collections import TopKDict, int_top_k_dict


 @dataclass
 class DetailedProfilerReportMixin:
-    profiling_skipped_not_updated: TopKDict[str, int] = field(default_factory=TopKDict)
-    profiling_skipped_size_limit: TopKDict[str, int] = field(default_factory=TopKDict)
+    profiling_skipped_not_updated: TopKDict[str, int] = field(
+        default_factory=int_top_k_dict
+    )
+    profiling_skipped_size_limit: TopKDict[str, int] = field(
+        default_factory=int_top_k_dict
+    )

-    profiling_skipped_row_limit: TopKDict[str, int] = field(default_factory=TopKDict)
-    num_tables_not_eligible_profiling: Dict[str, int] = field(default_factory=TopKDict)
+    profiling_skipped_row_limit: TopKDict[str, int] = field(
+        default_factory=int_top_k_dict
+    )
+    num_tables_not_eligible_profiling: Dict[str, int] = field(
+        default_factory=int_top_k_dict
+    )


 class ProfilingSqlReport(DetailedProfilerReportMixin, SQLSourceReport):
@@ -163,9 +171,7 @@ class GenericProfiler:
        if (threshold_time is not None) and (
            last_altered is not None and last_altered < threshold_time
        ):
-            self.report.profiling_skipped_not_updated[schema_name] = (
-                self.report.profiling_skipped_not_updated.get(schema_name, 0) + 1
-            )
+            self.report.profiling_skipped_not_updated[schema_name] += 1
            return False

        if self.config.profiling.profile_table_size_limit is not None and (
@@ -173,18 +179,14 @@ class GenericProfiler:
            size_in_bytes is None
            or size_in_bytes / (2**30) > self.config.profiling.profile_table_size_limit
        ):
-            self.report.profiling_skipped_size_limit[schema_name] = (
-                self.report.profiling_skipped_size_limit.get(schema_name, 0) + 1
-            )
+            self.report.profiling_skipped_size_limit[schema_name] += 1
            return False

        if self.config.profiling.profile_table_row_limit is not None and (
            rows_count is None
            or rows_count > self.config.profiling.profile_table_row_limit
        ):
-            self.report.profiling_skipped_row_limit[schema_name] = (
-                self.report.profiling_skipped_row_limit.get(schema_name, 0) + 1
-            )
+            self.report.profiling_skipped_row_limit[schema_name] += 1
            return False

        return True
diff --git a/metadata-ingestion/src/datahub/utilities/stats_collections.py b/metadata-ingestion/src/datahub/utilities/stats_collections.py
index 1d1d6d6b65..a41139c501 100644
--- a/metadata-ingestion/src/datahub/utilities/stats_collections.py
+++ b/metadata-ingestion/src/datahub/utilities/stats_collections.py
@@ -1,16 +1,31 @@
-from typing import Any, Dict, TypeVar, Union
+from typing import Any, Callable, DefaultDict, Dict, Optional, TypeVar
+
+from typing_extensions import Protocol
+
+_CT = TypeVar("_CT")
+
+
+class Comparable(Protocol):
+    def __lt__(self: _CT, other: _CT) -> bool:
+        pass
+

-T = TypeVar("T")
 _KT = TypeVar("_KT")
-_VT = TypeVar("_VT")
+_VT = TypeVar("_VT", bound=Comparable)


-class TopKDict(Dict[_KT, _VT]):
+class TopKDict(DefaultDict[_KT, _VT]):
     """A structure that only prints the top K items from the dictionary. Not lossy."""

-    def __init__(self, top_k: int = 10) -> None:
-        super().__init__()
-        self.top_k = 10
+    def __init__(
+        self,
+        default_factory: Optional[Callable[[], _VT]] = None,
+        *args: Any,
+        top_k: int = 10,
+        **kwargs: Any,
+    ) -> None:
+        super().__init__(default_factory, *args, **kwargs)
+        self.top_k = top_k

     def __repr__(self) -> str:
         return repr(self.as_obj())
@@ -18,18 +33,19 @@
     def __str__(self) -> str:
         return self.__repr__()

-    @staticmethod
-    def _trim_dictionary(big_dict: Dict[str, Any]) -> Dict[str, Any]:
-        if big_dict is not None and len(big_dict) > 10:
-            dict_as_tuples = [(k, v) for k, v in big_dict.items()]
-            sorted_tuples = sorted(dict_as_tuples, key=lambda x: x[1], reverse=True)
-            dict_as_tuples = sorted_tuples[:10]
-            trimmed_dict = {k: v for k, v in dict_as_tuples}
-            trimmed_dict[f"... top(10) of total {len(big_dict)} entries"] = ""
+    def as_obj(self) -> Dict[_KT, _VT]:
+        if len(self) <= self.top_k:
+            return dict(self)
+        else:
+            try:
+                trimmed_dict = dict(
+                    sorted(self.items(), key=lambda x: x[1], reverse=True)[: self.top_k]
+                )
+            except TypeError:
+                trimmed_dict = dict(list(self.items())[: self.top_k])
+            trimmed_dict[f"... top {self.top_k} of total {len(self)} entries"] = ""  # type: ignore
             return trimmed_dict
-        return big_dict

-    def as_obj(self) -> Dict[Union[_KT, str], Union[_VT, str]]:
-        base_dict: Dict[Union[_KT, str], Union[_VT, str]] = super().copy()  # type: ignore
-        return self._trim_dictionary(base_dict)  # type: ignore
+
+def int_top_k_dict() -> TopKDict[str, int]:
+    return TopKDict(int)