refactor(ingest/bigquery): Standardize audit log parsing and make TopKDict a DefaultDict (#7738)

- Moves get_sanitized_table_ref calls to ReadEvent / QueryEvent creation
- Standardizes how the audit log is read and parsed, unifying the code paths for reading from GCP Logging vs. exported audit metadata logs
- Adds error handling around the parsing of each event to catch errors from the new get_sanitized_table_ref calls
- Makes TopKDict inherit from DefaultDict and cleans up call sites accordingly (a condensed sketch of these patterns follows the commit metadata below)
Andrew Sikowitz 2023-04-04 11:58:48 -07:00 committed by GitHub
parent ce1ac7fa12
commit 06bc1c32e0
7 changed files with 278 additions and 361 deletions
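
The two patterns this change leans on are easier to see outside the diff. The sketch below is a condensed illustration only, not DataHub code: the dict-based entries and parse_entry stand in for the real GCP Logging / exported-audit-metadata parsers, and the Report counters mirror the TopKDict / int_top_k_dict report fields touched in this commit.

# Illustrative sketch of the per-entry parsing pattern and defaultdict-style
# counters introduced in this commit; the names here are stand-ins, not DataHub APIs.
import logging
from collections import defaultdict
from dataclasses import dataclass, field
from typing import Dict, Iterable, Optional

logger = logging.getLogger(__name__)


@dataclass
class Report:
    # With an int default factory, call sites can simply write `counter[key] += 1`
    # instead of the old `d[k] = d.get(k, 0) + 1` dance.
    num_total_log_entries: Dict[str, int] = field(default_factory=lambda: defaultdict(int))
    num_parsed_log_entries: Dict[str, int] = field(default_factory=lambda: defaultdict(int))
    num_parse_failures: Dict[str, int] = field(default_factory=lambda: defaultdict(int))


def parse_entry(entry: dict) -> Optional[str]:
    """Stand-in parser: returns an event or None, and may raise on malformed input."""
    if "skip" in entry:
        return None
    return entry["name"]  # raises KeyError for malformed entries


def parsed_events(
    entries: Iterable[dict], project_id: str, report: Report
) -> Iterable[str]:
    # One loop serves both audit-log sources: every entry is counted, parsed
    # inside try/except so a single bad entry (e.g. an unparseable table ref)
    # does not abort the run, and only successfully parsed events are yielded.
    for entry in entries:
        report.num_total_log_entries[project_id] += 1
        try:
            event = parse_entry(entry)
            if event:
                report.num_parsed_log_entries[project_id] += 1
                yield event
        except Exception as e:
            logger.warning(f"Unable to parse log entry `{entry}`: {e}")
            report.num_parse_failures[project_id] += 1


if __name__ == "__main__":
    report = Report()
    entries = [{"name": "q1"}, {"skip": True}, {"bad": "entry"}, {"name": "q2"}]
    print(list(parsed_events(entries, "my-project", report)))  # ['q1', 'q2']
    print(dict(report.num_parse_failures))  # {'my-project': 1}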

---------------------------- changed file ----------------------------

@@ -336,7 +336,7 @@ class QueryEvent:
         if raw_dest_table:
             query_event.destinationTable = BigQueryTableRef.from_spec_obj(
                 raw_dest_table
-            )
+            ).get_sanitized_table_ref()
         # statementType
         # referencedTables
         job_stats: Dict = job["jobStatistics"]
@@ -346,13 +346,15 @@ class QueryEvent:
         raw_ref_tables = job_stats.get("referencedTables")
         if raw_ref_tables:
             query_event.referencedTables = [
-                BigQueryTableRef.from_spec_obj(spec) for spec in raw_ref_tables
+                BigQueryTableRef.from_spec_obj(spec).get_sanitized_table_ref()
+                for spec in raw_ref_tables
             ]
         # referencedViews
         raw_ref_views = job_stats.get("referencedViews")
         if raw_ref_views:
             query_event.referencedViews = [
-                BigQueryTableRef.from_spec_obj(spec) for spec in raw_ref_views
+                BigQueryTableRef.from_spec_obj(spec).get_sanitized_table_ref()
+                for spec in raw_ref_views
             ]
         # payload
         query_event.payload = entry.payload if debug_include_full_payloads else None
@@ -415,18 +417,20 @@ class QueryEvent:
         if raw_dest_table:
             query_event.destinationTable = BigQueryTableRef.from_string_name(
                 raw_dest_table
-            )
+            ).get_sanitized_table_ref()
         # referencedTables
         raw_ref_tables = query_stats.get("referencedTables")
         if raw_ref_tables:
             query_event.referencedTables = [
-                BigQueryTableRef.from_string_name(spec) for spec in raw_ref_tables
+                BigQueryTableRef.from_string_name(spec).get_sanitized_table_ref()
+                for spec in raw_ref_tables
             ]
         # referencedViews
         raw_ref_views = query_stats.get("referencedViews")
         if raw_ref_views:
             query_event.referencedViews = [
-                BigQueryTableRef.from_string_name(spec) for spec in raw_ref_views
+                BigQueryTableRef.from_string_name(spec).get_sanitized_table_ref()
+                for spec in raw_ref_views
             ]
         # payload
         query_event.payload = payload if debug_include_full_payloads else None
@@ -479,19 +483,21 @@ class QueryEvent:
         if raw_dest_table:
             query_event.destinationTable = BigQueryTableRef.from_string_name(
                 raw_dest_table
-            )
+            ).get_sanitized_table_ref()
         # statementType
         # referencedTables
         raw_ref_tables = query_stats.get("referencedTables")
         if raw_ref_tables:
             query_event.referencedTables = [
-                BigQueryTableRef.from_string_name(spec) for spec in raw_ref_tables
+                BigQueryTableRef.from_string_name(spec).get_sanitized_table_ref()
+                for spec in raw_ref_tables
             ]
         # referencedViews
         raw_ref_views = query_stats.get("referencedViews")
         if raw_ref_views:
             query_event.referencedViews = [
-                BigQueryTableRef.from_string_name(spec) for spec in raw_ref_views
+                BigQueryTableRef.from_string_name(spec).get_sanitized_table_ref()
+                for spec in raw_ref_views
             ]
         # payload
         query_event.payload = payload if debug_include_full_payloads else None
@@ -568,10 +574,14 @@ class ReadEvent:
         if readReason == "JOB":
             jobName = readInfo.get("jobName")
 
+        resource = BigQueryTableRef.from_string_name(
+            resourceName
+        ).get_sanitized_table_ref()
+
         readEvent = ReadEvent(
             actor_email=user,
             timestamp=entry.timestamp,
-            resource=BigQueryTableRef.from_string_name(resourceName),
+            resource=resource,
             fieldsRead=fields,
             readReason=readReason,
             jobName=jobName,
@@ -602,10 +612,14 @@ class ReadEvent:
         if readReason == "JOB":
             jobName = readInfo.get("jobName")
 
+        resource = BigQueryTableRef.from_string_name(
+            resourceName
+        ).get_sanitized_table_ref()
+
         readEvent = ReadEvent(
             actor_email=user,
             timestamp=row["timestamp"],
-            resource=BigQueryTableRef.from_string_name(resourceName),
+            resource=resource,
             fieldsRead=fields,
             readReason=readReason,
             jobName=jobName,

---------------------------- changed file ----------------------------

@@ -10,7 +10,7 @@ import pydantic
 from datahub.ingestion.source.sql.sql_generic_profiler import ProfilingSqlReport
 from datahub.utilities.lossy_collections import LossyDict, LossyList
 from datahub.utilities.perf_timer import PerfTimer
-from datahub.utilities.stats_collections import TopKDict
+from datahub.utilities.stats_collections import TopKDict, int_top_k_dict
 
 logger: logging.Logger = logging.getLogger(__name__)
 
@@ -19,24 +19,22 @@ logger: logging.Logger = logging.getLogger(__name__)
 class BigQueryV2Report(ProfilingSqlReport):
     num_total_lineage_entries: TopKDict[str, int] = field(default_factory=TopKDict)
     num_skipped_lineage_entries_missing_data: TopKDict[str, int] = field(
-        default_factory=TopKDict
+        default_factory=int_top_k_dict
     )
     num_skipped_lineage_entries_not_allowed: TopKDict[str, int] = field(
-        default_factory=TopKDict
+        default_factory=int_top_k_dict
     )
     num_lineage_entries_sql_parser_failure: TopKDict[str, int] = field(
-        default_factory=TopKDict
-    )
-    num_lineage_entries_sql_parser_success: TopKDict[str, int] = field(
-        default_factory=TopKDict
+        default_factory=int_top_k_dict
     )
     num_skipped_lineage_entries_other: TopKDict[str, int] = field(
-        default_factory=TopKDict
+        default_factory=int_top_k_dict
+    )
+    num_total_log_entries: TopKDict[str, int] = field(default_factory=int_top_k_dict)
+    num_parsed_log_entries: TopKDict[str, int] = field(default_factory=int_top_k_dict)
+    num_lineage_log_parse_failures: TopKDict[str, int] = field(
+        default_factory=int_top_k_dict
     )
-    num_total_log_entries: TopKDict[str, int] = field(default_factory=TopKDict)
-    num_parsed_log_entries: TopKDict[str, int] = field(default_factory=TopKDict)
-    num_total_audit_entries: TopKDict[str, int] = field(default_factory=TopKDict)
-    num_parsed_audit_entries: TopKDict[str, int] = field(default_factory=TopKDict)
     bigquery_audit_metadata_datasets_missing: Optional[bool] = None
     lineage_failed_extraction: LossyList[str] = field(default_factory=LossyList)
     lineage_metadata_entries: TopKDict[str, int] = field(default_factory=TopKDict)

---------------------------- changed file ----------------------------

@@ -3,7 +3,7 @@ import logging
 import textwrap
 from dataclasses import dataclass
 from datetime import datetime
-from typing import Dict, Iterable, List, Optional, Set, Tuple, Union
+from typing import Any, Callable, Dict, Iterable, List, Optional, Set, Tuple, Union
 
 import humanfriendly
 from google.cloud.bigquery import Client as BigQueryClient
@@ -18,6 +18,7 @@ from datahub.ingestion.source.bigquery_v2.bigquery_audit import (
     BigqueryTableIdentifier,
     BigQueryTableRef,
     QueryEvent,
+    ReadEvent,
 )
 from datahub.ingestion.source.bigquery_v2.bigquery_config import BigQueryV2Config
 from datahub.ingestion.source.bigquery_v2.bigquery_report import BigQueryV2Report
@@ -87,7 +88,6 @@ timestamp < "{end_time}"
     def __init__(self, config: BigQueryV2Config, report: BigQueryV2Report):
         self.config = config
         self.report = report
-        self.loaded_project_ids: List[str] = []
 
     def error(self, log: logging.Logger, key: str, reason: str) -> None:
         self.report.report_failure(key, reason)
@@ -154,54 +154,7 @@ timestamp < "{end_time}"
         return textwrap.dedent(query)
 
-    def compute_bigquery_lineage_via_gcp_logging(
-        self, project_id: str
-    ) -> Dict[str, Set[LineageEdge]]:
-        logger.info(f"Populating lineage info via GCP audit logs for {project_id}")
-        try:
-            clients: GCPLoggingClient = _make_gcp_logging_client(project_id)
-            log_entries: Iterable[AuditLogEntry] = self._get_bigquery_log_entries(
-                clients
-            )
-            logger.info("Log Entries loaded")
-            parsed_entries: Iterable[QueryEvent] = self._parse_bigquery_log_entries(
-                log_entries
-            )
-            return self._create_lineage_map(parsed_entries)
-        except Exception as e:
-            self.error(
-                logger,
-                "lineage-gcp-logs",
-                f"Failed to get lineage gcp logging for {project_id}. The error message was {e}",
-            )
-            raise e
-
-    def compute_bigquery_lineage_via_exported_bigquery_audit_metadata(
-        self,
-    ) -> Dict[str, Set[LineageEdge]]:
-        logger.info("Populating lineage info via exported GCP audit logs")
-        try:
-            # For exported logs we want to submit queries with the credentials project_id.
-            _client: BigQueryClient = get_bigquery_client(self.config)
-            exported_bigquery_audit_metadata: Iterable[
-                BigQueryAuditMetadata
-            ] = self._get_exported_bigquery_audit_metadata(_client)
-            parsed_entries: Iterable[
-                QueryEvent
-            ] = self._parse_exported_bigquery_audit_metadata(
-                exported_bigquery_audit_metadata
-            )
-            return self._create_lineage_map(parsed_entries)
-        except Exception as e:
-            self.error(
-                logger,
-                "lineage-exported-gcp-audit-logs",
-                f"Error: {e}",
-            )
-            raise e
-
-    def compute_bigquery_lineage_via_catalog_lineage_api(
+    def lineage_via_catalog_lineage_api(
         self, project_id: str
     ) -> Dict[str, Set[LineageEdge]]:
         """
@@ -310,9 +263,33 @@ timestamp < "{end_time}"
             )
             raise e
 
+    def _get_parsed_audit_log_events(self, project_id: str) -> Iterable[QueryEvent]:
+        parse_fn: Callable[[Any], Optional[Union[ReadEvent, QueryEvent]]]
+        if self.config.use_exported_bigquery_audit_metadata:
+            logger.info("Populating lineage info via exported GCP audit logs")
+            bq_client = get_bigquery_client(self.config)
+            entries = self._get_exported_bigquery_audit_metadata(bq_client)
+            parse_fn = self._parse_exported_bigquery_audit_metadata
+        else:
+            logger.info("Populating lineage info via exported GCP audit logs")
+            logging_client = _make_gcp_logging_client(project_id)
+            entries = self._get_bigquery_log_entries(logging_client)
+            parse_fn = self._parse_bigquery_log_entries
+
+        for entry in entries:
+            self.report.num_total_log_entries[project_id] += 1
+            try:
+                event = parse_fn(entry)
+                if event:
+                    self.report.num_parsed_log_entries[project_id] += 1
+                    yield event
+            except Exception as e:
+                logger.warning(f"Unable to parse log entry `{entry}`: {e}")
+                self.report.num_lineage_log_parse_failures[project_id] += 1
+
     def _get_bigquery_log_entries(
         self, client: GCPLoggingClient, limit: Optional[int] = None
-    ) -> Union[Iterable[AuditLogEntry], Iterable[BigQueryAuditMetadata]]:
+    ) -> Union[Iterable[AuditLogEntry]]:
         self.report.num_total_log_entries[client.project] = 0
         # Add a buffer to start and end time to account for delays in logging events.
         start_time = (self.config.start_time - self.config.max_query_duration).strftime(
@@ -420,67 +397,52 @@ timestamp < "{end_time}"
     # events to also create field level lineage.
     def _parse_bigquery_log_entries(
         self,
-        entries: Union[Iterable[AuditLogEntry], Iterable[BigQueryAuditMetadata]],
-    ) -> Iterable[QueryEvent]:
-        for entry in entries:
-            event: Optional[QueryEvent] = None
-
-            missing_entry = QueryEvent.get_missing_key_entry(entry=entry)
-            if missing_entry is None:
-                event = QueryEvent.from_entry(
-                    entry,
-                    debug_include_full_payloads=self.config.debug_include_full_payloads,
-                )
-
-            missing_entry_v2 = QueryEvent.get_missing_key_entry_v2(entry=entry)
-            if event is None and missing_entry_v2 is None:
-                event = QueryEvent.from_entry_v2(
-                    entry, self.config.debug_include_full_payloads
-                )
-
-            if event is None:
-                self.error(
-                    logger,
-                    f"{entry.log_name}-{entry.insert_id}",
-                    f"Unable to parse log missing {missing_entry}, missing v2 {missing_entry_v2} for {entry}",
-                )
-            else:
-                self.report.num_parsed_log_entries[event.project_id] = (
-                    self.report.num_parsed_log_entries.get(event.project_id, 0) + 1
-                )
-                yield event
-
-    def _parse_exported_bigquery_audit_metadata(
-        self, audit_metadata_rows: Iterable[BigQueryAuditMetadata]
-    ) -> Iterable[QueryEvent]:
-        for audit_metadata in audit_metadata_rows:
-            event: Optional[QueryEvent] = None
-
-            missing_exported_audit = (
-                QueryEvent.get_missing_key_exported_bigquery_audit_metadata(
-                    audit_metadata
-                )
-            )
-
-            if missing_exported_audit is None:
-                event = QueryEvent.from_exported_bigquery_audit_metadata(
-                    audit_metadata, self.config.debug_include_full_payloads
-                )
-
-            if event is None:
-                self.error(
-                    logger,
-                    f"{audit_metadata['logName']}-{audit_metadata['insertId']}",
-                    f"Unable to parse audit metadata missing {missing_exported_audit} for {audit_metadata}",
-                )
-            else:
-                self.report.num_parsed_audit_entries[event.project_id] = (
-                    self.report.num_parsed_audit_entries.get(event.project_id, 0) + 1
-                )
-                self.report.num_total_audit_entries[event.project_id] = (
-                    self.report.num_total_audit_entries.get(event.project_id, 0) + 1
-                )
-                yield event
+        entry: AuditLogEntry,
+    ) -> Optional[QueryEvent]:
+        event: Optional[QueryEvent] = None
+
+        missing_entry = QueryEvent.get_missing_key_entry(entry=entry)
+        if missing_entry is None:
+            event = QueryEvent.from_entry(
+                entry,
+                debug_include_full_payloads=self.config.debug_include_full_payloads,
+            )
+
+        missing_entry_v2 = QueryEvent.get_missing_key_entry_v2(entry=entry)
+        if event is None and missing_entry_v2 is None:
+            event = QueryEvent.from_entry_v2(
+                entry, self.config.debug_include_full_payloads
+            )
+
+        if event is None:
+            logger.warning(
+                f"Unable to parse log missing {missing_entry}, missing v2 {missing_entry_v2} for {entry}",
+            )
+            return None
+        else:
+            return event
+
+    def _parse_exported_bigquery_audit_metadata(
+        self, audit_metadata: BigQueryAuditMetadata
+    ) -> Optional[QueryEvent]:
+        event: Optional[QueryEvent] = None
+
+        missing_exported_audit = (
+            QueryEvent.get_missing_key_exported_bigquery_audit_metadata(audit_metadata)
+        )
+
+        if missing_exported_audit is None:
+            event = QueryEvent.from_exported_bigquery_audit_metadata(
+                audit_metadata, self.config.debug_include_full_payloads
+            )
+
+        if event is None:
+            logger.warning(
+                f"Unable to parse audit metadata missing {missing_exported_audit} for {audit_metadata}",
+            )
+            return None
+        else:
+            return event
 
     def _create_lineage_map(
         self, entries: Iterable[QueryEvent]
@@ -495,59 +457,34 @@ timestamp < "{end_time}"
             if e.destinationTable is None or not (
                 e.referencedTables or e.referencedViews
             ):
-                self.report.num_skipped_lineage_entries_missing_data[e.project_id] = (
-                    self.report.num_skipped_lineage_entries_missing_data.get(
-                        e.project_id, 0
-                    )
-                    + 1
-                )
+                self.report.num_skipped_lineage_entries_missing_data[e.project_id] += 1
                 continue
-
-            # Skip if schema/table pattern don't allow the destination table
-            try:
-                destination_table = e.destinationTable.get_sanitized_table_ref()
-            except Exception:
-                self.report.num_skipped_lineage_entries_missing_data[e.project_id] = (
-                    self.report.num_skipped_lineage_entries_missing_data.get(
-                        e.project_id, 0
-                    )
-                    + 1
-                )
-                continue
-
-            destination_table_str = str(
-                BigQueryTableRef(table_identifier=destination_table.table_identifier)
-            )
 
             if not self.config.dataset_pattern.allowed(
-                destination_table.table_identifier.dataset
+                e.destinationTable.table_identifier.dataset
             ) or not self.config.table_pattern.allowed(
-                destination_table.table_identifier.get_table_name()
+                e.destinationTable.table_identifier.get_table_name()
             ):
-                self.report.num_skipped_lineage_entries_not_allowed[e.project_id] = (
-                    self.report.num_skipped_lineage_entries_not_allowed.get(
-                        e.project_id, 0
-                    )
-                    + 1
-                )
+                self.report.num_skipped_lineage_entries_not_allowed[e.project_id] += 1
                 continue
+
+            destination_table_str = str(e.destinationTable)
             has_table = False
             for ref_table in e.referencedTables:
-                ref_table_str = str(ref_table.get_sanitized_table_ref())
-                if ref_table_str != destination_table_str:
+                if str(ref_table) != destination_table_str:
                     lineage_map[destination_table_str].add(
                         LineageEdge(
-                            table=ref_table_str,
+                            table=str(ref_table),
                             auditStamp=e.end_time if e.end_time else datetime.now(),
                         )
                     )
                     has_table = True
             has_view = False
             for ref_view in e.referencedViews:
-                ref_view_str = str(ref_view.get_sanitized_table_ref())
-                if ref_view_str != destination_table_str:
+                if str(ref_view) != destination_table_str:
                     lineage_map[destination_table_str].add(
                         LineageEdge(
-                            table=ref_view_str,
+                            table=str(ref_view),
                             auditStamp=e.end_time if e.end_time else datetime.now(),
                         )
                     )
@@ -569,12 +506,9 @@ timestamp < "{end_time}"
                     logger.debug(
                         f"Sql Parser failed on query: {e.query}. It won't cause any issue except table/view lineage can't be detected reliably. The error was {ex}."
                     )
-                    self.report.num_lineage_entries_sql_parser_failure[e.project_id] = (
-                        self.report.num_lineage_entries_sql_parser_failure.get(
-                            e.project_id, 0
-                        )
-                        + 1
-                    )
+                    self.report.num_lineage_entries_sql_parser_failure[
+                        e.project_id
+                    ] += 1
                     continue
                 curr_lineage = lineage_map[destination_table_str]
                 new_lineage = set()
@@ -584,10 +518,7 @@ timestamp < "{end_time}"
                         new_lineage.add(lineage)
                 lineage_map[destination_table_str] = new_lineage
             if not (has_table or has_view):
-                self.report.num_skipped_lineage_entries_other[e.project_id] = (
-                    self.report.num_skipped_lineage_entries_other.get(e.project_id, 0)
-                    + 1
-                )
+                self.report.num_skipped_lineage_entries_other[e.project_id] += 1
 
         logger.info("Exiting create lineage map function")
         return lineage_map
@@ -641,42 +572,23 @@ timestamp < "{end_time}"
         return list(parsed_tables)
 
     def _compute_bigquery_lineage(self, project_id: str) -> Dict[str, Set[LineageEdge]]:
-        lineage_extractor: BigqueryLineageExtractor = BigqueryLineageExtractor(
-            config=self.config, report=self.report
-        )
         lineage_metadata: Dict[str, Set[LineageEdge]]
         try:
             if self.config.extract_lineage_from_catalog and self.config.include_tables:
-                lineage_metadata = (
-                    lineage_extractor.compute_bigquery_lineage_via_catalog_lineage_api(
-                        project_id
-                    )
-                )
+                lineage_metadata = self.lineage_via_catalog_lineage_api(project_id)
             else:
-                if self.config.use_exported_bigquery_audit_metadata:
-                    # Exported bigquery_audit_metadata should contain every projects' audit metada
-                    if self.loaded_project_ids:
-                        return {}
-                    lineage_metadata = (
-                        lineage_extractor.compute_bigquery_lineage_via_exported_bigquery_audit_metadata()
-                    )
-                else:
-                    lineage_metadata = (
-                        lineage_extractor.compute_bigquery_lineage_via_gcp_logging(
-                            project_id
-                        )
-                    )
+                events = self._get_parsed_audit_log_events(project_id)
+                lineage_metadata = self._create_lineage_map(events)
         except Exception as e:
             if project_id:
                 self.report.lineage_failed_extraction.append(project_id)
-            logger.error(
-                f"Unable to extract lineage for project {project_id} due to error {e}"
+            self.error(
+                logger,
+                "lineage",
+                f"{project_id}: {e}",
             )
             lineage_metadata = {}
 
-        if lineage_metadata is None:
-            lineage_metadata = {}
-
         self.report.lineage_mem_size[project_id] = humanfriendly.format_size(
             memory_footprint.total_size(lineage_metadata)
         )

---------------------------- changed file ----------------------------

@@ -244,12 +244,7 @@ WHERE
                 dataset_name, table.last_altered, table.size_in_bytes, table.rows_count
             ):
                 profile_table_level_only = True
-                self.report.num_tables_not_eligible_profiling[f"{project}.{dataset}"] = (
-                    self.report.num_tables_not_eligible_profiling.get(
-                        f"{project}.{dataset}", 0
-                    )
-                    + 1
-                )
+                self.report.num_tables_not_eligible_profiling[f"{project}.{dataset}"] += 1
 
             if not table.column_count:
                 skip_profiling = True

---------------------------- changed file ----------------------------

@@ -5,7 +5,7 @@ import time
 import traceback
 from dataclasses import dataclass
 from datetime import datetime
-from typing import Any, Dict, Iterable, List, MutableMapping, Optional, Union
+from typing import Any, Callable, Dict, Iterable, List, MutableMapping, Optional, Union
 
 import cachetools
 from google.cloud.bigquery import Client as BigQueryClient
@@ -132,7 +132,7 @@ def bigquery_audit_metadata_query_template(
     """
     audit_log_filter_timestamps = """AND (timestamp >= "{start_time}"
         AND timestamp < "{end_time}"
-    );
+    )
     """
     audit_log_filter_query_complete = f"""
     AND (
@@ -193,15 +193,7 @@ class BigQueryUsageExtractor:
         parsed_events: Iterable[Union[ReadEvent, QueryEvent]]
         with PerfTimer() as timer:
             try:
-                bigquery_log_entries = self._get_parsed_bigquery_log_events(project_id)
-                if self.config.use_exported_bigquery_audit_metadata:
-                    parsed_events = self._parse_exported_bigquery_audit_metadata(
-                        bigquery_log_entries
-                    )
-                else:
-                    parsed_events = self._parse_bigquery_log_entries(
-                        bigquery_log_entries
-                    )
+                parsed_events = self._get_parsed_bigquery_log_events(project_id)
 
                 hydrated_read_events = self._join_events_by_job_id(parsed_events)
                 # storing it all in one big object.
@@ -213,7 +205,7 @@ class BigQueryUsageExtractor:
                 self.report.num_operational_stats_workunits_emitted = 0
                 for event in hydrated_read_events:
                     if self.config.usage.include_operational_stats:
-                        operational_wu = self._create_operation_aspect_work_unit(event)
+                        operational_wu = self._create_operation_workunit(event)
                         if operational_wu:
                             yield operational_wu
                             self.report.num_operational_stats_workunits_emitted += 1
@@ -308,7 +300,10 @@ class BigQueryUsageExtractor:
         )
 
         query = bigquery_audit_metadata_query_template(
-            dataset, self.config.use_date_sharded_audit_log_tables, allow_filter
+            dataset,
+            self.config.use_date_sharded_audit_log_tables,
+            allow_filter,
+            limit,
         ).format(
             start_time=start_time,
             end_time=end_time,
@@ -326,14 +321,14 @@ class BigQueryUsageExtractor:
     def _get_bigquery_log_entries_via_gcp_logging(
         self, client: GCPLoggingClient, limit: Optional[int] = None
-    ) -> Iterable[Union[AuditLogEntry, BigQueryAuditMetadata]]:
+    ) -> Iterable[AuditLogEntry]:
         self.report.total_query_log_entries = 0
 
         filter = self._generate_filter(BQ_AUDIT_V2)
         logger.debug(filter)
 
         try:
-            list_entries: Iterable[Union[AuditLogEntry, BigQueryAuditMetadata]]
+            list_entries: Iterable[AuditLogEntry]
             rate_limiter: Optional[RateLimiter] = None
             if self.config.rate_limit:
                 # client.list_entries is a generator, does api calls to GCP Logging when it runs out of entries and needs to fetch more from GCP Logging
@@ -437,9 +432,9 @@ class BigQueryUsageExtractor:
             and event.query_event
             and event.query_event.destinationTable
         ):
-            return event.query_event.destinationTable.get_sanitized_table_ref()
+            return event.query_event.destinationTable
         elif event.read_event:
-            return event.read_event.resource.get_sanitized_table_ref()
+            return event.read_event.resource
         else:
             # TODO: CREATE_SCHEMA operation ends up here, maybe we should capture that as well
             # but it is tricky as we only get the query so it can't be tied to anything
@@ -492,7 +487,7 @@ class BigQueryUsageExtractor:
         else:
             return None
 
-    def _create_operation_aspect_work_unit(
+    def _create_operation_workunit(
         self, event: AuditEvent
     ) -> Optional[MetadataWorkUnit]:
         if not event.read_event and not event.query_event:
@@ -518,15 +513,7 @@ class BigQueryUsageExtractor:
         affected_datasets = []
         if event.query_event and event.query_event.referencedTables:
             for table in event.query_event.referencedTables:
-                try:
-                    affected_datasets.append(
-                        table.get_sanitized_table_ref().to_urn(self.config.env)
-                    )
-                except Exception as e:
-                    self.report.report_warning(
-                        str(table),
-                        f"Failed to clean up table, {e}",
-                    )
+                affected_datasets.append(table.to_urn(self.config.env))
 
         operation_aspect = OperationClass(
             timestampMillis=reported_time,
@@ -587,92 +574,76 @@ class BigQueryUsageExtractor:
         return custom_properties
 
-    def _parse_bigquery_log_entries(
-        self, entries: Iterable[Union[AuditLogEntry, BigQueryAuditMetadata]]
-    ) -> Iterable[Union[ReadEvent, QueryEvent]]:
+    def _parse_bigquery_log_entry(
+        self, entry: Union[AuditLogEntry, BigQueryAuditMetadata]
+    ) -> Optional[Union[ReadEvent, QueryEvent]]:
         self.report.num_read_events = 0
         self.report.num_query_events = 0
         self.report.num_filtered_read_events = 0
         self.report.num_filtered_query_events = 0
 
-        for entry in entries:
-            event: Optional[Union[ReadEvent, QueryEvent]] = None
-
-            missing_read_entry = ReadEvent.get_missing_key_entry(entry)
-            if missing_read_entry is None:
-                event = ReadEvent.from_entry(
-                    entry, self.config.debug_include_full_payloads
-                )
-                if not self._is_table_allowed(event.resource):
-                    self.report.num_filtered_read_events += 1
-                    continue
-
-                if event.readReason:
-                    self.report.read_reasons_stat[event.readReason] = (
-                        self.report.read_reasons_stat.get(event.readReason, 0) + 1
-                    )
-                self.report.num_read_events += 1
-
-            missing_query_entry = QueryEvent.get_missing_key_entry(entry)
-            if event is None and missing_query_entry is None:
-                event = QueryEvent.from_entry(entry)
-                self.report.num_query_events += 1
-
-            missing_query_entry_v2 = QueryEvent.get_missing_key_entry_v2(entry)
-
-            if event is None and missing_query_entry_v2 is None:
-                event = QueryEvent.from_entry_v2(
-                    entry, self.config.debug_include_full_payloads
-                )
-                self.report.num_query_events += 1
-
-            if event is None:
-                logger.warning(
-                    f"Unable to parse {type(entry)} missing read {missing_query_entry}, missing query {missing_query_entry} missing v2 {missing_query_entry_v2} for {entry}"
-                )
-            else:
-                yield event
-
-        logger.info(
-            f"Parsed {self.report.num_read_events} ReadEvents and {self.report.num_query_events} QueryEvents"
-        )
+        event: Optional[Union[ReadEvent, QueryEvent]] = None
+
+        missing_read_entry = ReadEvent.get_missing_key_entry(entry)
+        if missing_read_entry is None:
+            event = ReadEvent.from_entry(entry, self.config.debug_include_full_payloads)
+            if not self._is_table_allowed(event.resource):
+                self.report.num_filtered_read_events += 1
+                return None
+
+            if event.readReason:
+                self.report.read_reasons_stat[event.readReason] = (
+                    self.report.read_reasons_stat.get(event.readReason, 0) + 1
+                )
+            self.report.num_read_events += 1
+
+        missing_query_entry = QueryEvent.get_missing_key_entry(entry)
+        if event is None and missing_query_entry is None:
+            event = QueryEvent.from_entry(entry)
+            self.report.num_query_events += 1
+
+        missing_query_entry_v2 = QueryEvent.get_missing_key_entry_v2(entry)
+
+        if event is None and missing_query_entry_v2 is None:
+            event = QueryEvent.from_entry_v2(
+                entry, self.config.debug_include_full_payloads
+            )
+            self.report.num_query_events += 1
+
+        if event is None:
+            logger.warning(
+                f"Unable to parse {type(entry)} missing read {missing_query_entry}, missing query {missing_query_entry} missing v2 {missing_query_entry_v2} for {entry}"
+            )
+            return None
+        else:
+            return event
 
     def _parse_exported_bigquery_audit_metadata(
-        self, audit_metadata_rows: Iterable[BigQueryAuditMetadata]
-    ) -> Iterable[Union[ReadEvent, QueryEvent]]:
-        for audit_metadata in audit_metadata_rows:
-            event: Optional[Union[QueryEvent, ReadEvent]] = None
-            missing_query_event_exported_audit = (
-                QueryEvent.get_missing_key_exported_bigquery_audit_metadata(
-                    audit_metadata
-                )
-            )
-            if missing_query_event_exported_audit is None:
-                event = QueryEvent.from_exported_bigquery_audit_metadata(
-                    audit_metadata, self.config.debug_include_full_payloads
-                )
-
-            missing_read_event_exported_audit = (
-                ReadEvent.get_missing_key_exported_bigquery_audit_metadata(
-                    audit_metadata
-                )
-            )
-            if missing_read_event_exported_audit is None:
-                event = ReadEvent.from_exported_bigquery_audit_metadata(
-                    audit_metadata, self.config.debug_include_full_payloads
-                )
-
-            if event is not None:
-                yield event
-            else:
-                self.error(
-                    logger,
-                    "usage-extraction",
-                    f"{audit_metadata['logName']}-{audit_metadata['insertId']} Unable to parse audit metadata missing QueryEvent keys:{str(missing_query_event_exported_audit)} ReadEvent keys: {str(missing_read_event_exported_audit)} for {audit_metadata}",
-                )
-
-    def error(self, log: logging.Logger, key: str, reason: str) -> Any:
-        self.report.report_failure(key, reason)
-        log.error(f"{key} => {reason}")
+        self, audit_metadata: BigQueryAuditMetadata
+    ) -> Optional[Union[ReadEvent, QueryEvent]]:
+        event: Optional[Union[QueryEvent, ReadEvent]] = None
+        missing_query_event_exported_audit = (
+            QueryEvent.get_missing_key_exported_bigquery_audit_metadata(audit_metadata)
+        )
+        if missing_query_event_exported_audit is None:
+            event = QueryEvent.from_exported_bigquery_audit_metadata(
+                audit_metadata, self.config.debug_include_full_payloads
+            )
+
+        missing_read_event_exported_audit = (
+            ReadEvent.get_missing_key_exported_bigquery_audit_metadata(audit_metadata)
+        )
+        if missing_read_event_exported_audit is None:
+            event = ReadEvent.from_exported_bigquery_audit_metadata(
+                audit_metadata, self.config.debug_include_full_payloads
+            )
+
+        if event is None:
+            logger.warning(
+                f"{audit_metadata['logName']}-{audit_metadata['insertId']} Unable to parse audit metadata missing QueryEvent keys:{str(missing_query_event_exported_audit)} ReadEvent keys: {str(missing_read_event_exported_audit)} for {audit_metadata}",
+            )
+            return None
+        else:
+            return event
 
     def _join_events_by_job_id(
         self, events: Iterable[Union[ReadEvent, QueryEvent]]
@@ -755,23 +726,13 @@ class BigQueryUsageExtractor:
             floored_ts = get_time_bucket(
                 event.read_event.timestamp, self.config.bucket_duration
             )
-            resource: Optional[BigQueryTableRef] = None
-            try:
-                resource = event.read_event.resource.get_sanitized_table_ref()
-                if (
-                    resource.table_identifier.dataset not in tables
-                    or resource.table_identifier.get_table_name()
-                    not in tables[resource.table_identifier.dataset]
-                ):
-                    logger.debug(f"Skipping non existing {resource} from usage")
-                    return
-            except Exception as e:
-                self.report.report_warning(
-                    str(event.read_event.resource), f"Failed to clean up resource, {e}"
-                )
-                logger.warning(
-                    f"Failed to process event {str(event.read_event.resource)} - {e}"
-                )
-                return
+            resource = event.read_event.resource
+            if (
+                resource.table_identifier.dataset not in tables
+                or resource.table_identifier.get_table_name()
+                not in tables[resource.table_identifier.dataset]
+            ):
+                logger.debug(f"Skipping non existing {resource} from usage")
+                return
 
             if resource.is_temporary_table([self.config.temp_table_dataset_prefix]):
@@ -814,23 +775,42 @@ class BigQueryUsageExtractor:
     def _get_parsed_bigquery_log_events(
         self, project_id: str, limit: Optional[int] = None
-    ) -> Iterable[Union[ReadEvent, QueryEvent, MetadataWorkUnit]]:
+    ) -> Iterable[Union[ReadEvent, QueryEvent]]:
+        parse_fn: Callable[[Any], Optional[Union[ReadEvent, QueryEvent]]]
         if self.config.use_exported_bigquery_audit_metadata:
-            _client: BigQueryClient = BigQueryClient(project=project_id)
-            return self._get_exported_bigquery_audit_metadata(
-                bigquery_client=_client,
+            bq_client = BigQueryClient(project=project_id)
+            entries = self._get_exported_bigquery_audit_metadata(
+                bigquery_client=bq_client,
                 allow_filter=self.config.get_table_pattern(
                     self.config.table_pattern.allow
                 ),
                 limit=limit,
             )
+            parse_fn = self._parse_exported_bigquery_audit_metadata
         else:
-            logging_client: GCPLoggingClient = _make_gcp_logging_client(
+            logging_client = _make_gcp_logging_client(
                 project_id, self.config.extra_client_options
             )
-            return self._get_bigquery_log_entries_via_gcp_logging(
+            entries = self._get_bigquery_log_entries_via_gcp_logging(
                 logging_client, limit=limit
             )
+            parse_fn = self._parse_bigquery_log_entry
+
+        log_entry_parse_failures = 0
+        for entry in entries:
+            try:
+                event = parse_fn(entry)
+                if event:
+                    yield event
+            except Exception as e:
+                logger.warning(f"Unable to parse log entry `{entry}`: {e}")
+                log_entry_parse_failures += 1
+
+        if log_entry_parse_failures:
+            self.report.report_warning(
+                "usage-extraction",
+                f"Failed to parse {log_entry_parse_failures} audit log entries for project {project_id}.",
+            )
 
     def test_capability(self, project_id: str) -> None:
         for entry in self._get_parsed_bigquery_log_events(project_id, limit=1):

---------------------------- changed file ----------------------------

@@ -17,16 +17,24 @@ from datahub.ingestion.source.sql.sql_generic import BaseTable, BaseView
 from datahub.ingestion.source.state.profiling_state_handler import ProfilingHandler
 from datahub.metadata.com.linkedin.pegasus2avro.dataset import DatasetProfile
 from datahub.metadata.schema_classes import DatasetProfileClass
-from datahub.utilities.stats_collections import TopKDict
+from datahub.utilities.stats_collections import TopKDict, int_top_k_dict
 
 
 @dataclass
 class DetailedProfilerReportMixin:
-    profiling_skipped_not_updated: TopKDict[str, int] = field(default_factory=TopKDict)
-    profiling_skipped_size_limit: TopKDict[str, int] = field(default_factory=TopKDict)
+    profiling_skipped_not_updated: TopKDict[str, int] = field(
+        default_factory=int_top_k_dict
+    )
+    profiling_skipped_size_limit: TopKDict[str, int] = field(
+        default_factory=int_top_k_dict
+    )
 
-    profiling_skipped_row_limit: TopKDict[str, int] = field(default_factory=TopKDict)
+    profiling_skipped_row_limit: TopKDict[str, int] = field(
+        default_factory=int_top_k_dict
+    )
 
-    num_tables_not_eligible_profiling: Dict[str, int] = field(default_factory=TopKDict)
+    num_tables_not_eligible_profiling: Dict[str, int] = field(
+        default_factory=int_top_k_dict
+    )
@@ -163,9 +171,7 @@ class GenericProfiler:
         if (threshold_time is not None) and (
             last_altered is not None and last_altered < threshold_time
         ):
-            self.report.profiling_skipped_not_updated[schema_name] = (
-                self.report.profiling_skipped_not_updated.get(schema_name, 0) + 1
-            )
+            self.report.profiling_skipped_not_updated[schema_name] += 1
             return False
 
         if self.config.profiling.profile_table_size_limit is not None and (
@@ -173,18 +179,14 @@ class GenericProfiler:
             or size_in_bytes / (2**30)
             > self.config.profiling.profile_table_size_limit
         ):
-            self.report.profiling_skipped_size_limit[schema_name] = (
-                self.report.profiling_skipped_size_limit.get(schema_name, 0) + 1
-            )
+            self.report.profiling_skipped_size_limit[schema_name] += 1
             return False
 
         if self.config.profiling.profile_table_row_limit is not None and (
             rows_count is None
            or rows_count > self.config.profiling.profile_table_row_limit
         ):
-            self.report.profiling_skipped_row_limit[schema_name] = (
-                self.report.profiling_skipped_row_limit.get(schema_name, 0) + 1
-            )
+            self.report.profiling_skipped_row_limit[schema_name] += 1
             return False
 
         return True

---------------------------- changed file ----------------------------

@@ -1,16 +1,31 @@
-from typing import Any, Dict, TypeVar, Union
+from typing import Any, Callable, DefaultDict, Dict, Optional, TypeVar
+
+from typing_extensions import Protocol
+
+_CT = TypeVar("_CT")
+
+
+class Comparable(Protocol):
+    def __lt__(self: _CT, other: _CT) -> bool:
+        pass
+
 
+T = TypeVar("T")
 _KT = TypeVar("_KT")
-_VT = TypeVar("_VT")
+_VT = TypeVar("_VT", bound=Comparable)
 
 
-class TopKDict(Dict[_KT, _VT]):
+class TopKDict(DefaultDict[_KT, _VT]):
     """A structure that only prints the top K items from the dictionary. Not lossy."""
 
-    def __init__(self, top_k: int = 10) -> None:
-        super().__init__()
-        self.top_k = 10
+    def __init__(
+        self,
+        default_factory: Optional[Callable[[], _VT]] = None,
+        *args: Any,
+        top_k: int = 10,
+        **kwargs: Any,
+    ) -> None:
+        super().__init__(default_factory, *args, **kwargs)
+        self.top_k = top_k
 
     def __repr__(self) -> str:
         return repr(self.as_obj())
@@ -18,18 +33,19 @@ class TopKDict(Dict[_KT, _VT]):
     def __str__(self) -> str:
         return self.__repr__()
 
-    @staticmethod
-    def _trim_dictionary(big_dict: Dict[str, Any]) -> Dict[str, Any]:
-        if big_dict is not None and len(big_dict) > 10:
-            dict_as_tuples = [(k, v) for k, v in big_dict.items()]
-            sorted_tuples = sorted(dict_as_tuples, key=lambda x: x[1], reverse=True)
-            dict_as_tuples = sorted_tuples[:10]
-            trimmed_dict = {k: v for k, v in dict_as_tuples}
-            trimmed_dict[f"... top(10) of total {len(big_dict)} entries"] = ""
-            return trimmed_dict
-        return big_dict
-
-    def as_obj(self) -> Dict[Union[_KT, str], Union[_VT, str]]:
-        base_dict: Dict[Union[_KT, str], Union[_VT, str]] = super().copy()  # type: ignore
-        return self._trim_dictionary(base_dict)  # type: ignore
+    def as_obj(self) -> Dict[_KT, _VT]:
+        if len(self) <= self.top_k:
+            return dict(self)
+        else:
+            try:
+                trimmed_dict = dict(
+                    sorted(self.items(), key=lambda x: x[1], reverse=True)[: self.top_k]
+                )
+            except TypeError:
+                trimmed_dict = dict(list(self.items())[: self.top_k])
+            trimmed_dict[f"... top {self.top_k} of total {len(self)} entries"] = ""  # type: ignore
+
+            return trimmed_dict
+
+
+def int_top_k_dict() -> TopKDict[str, int]:
+    return TopKDict(int)
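
A usage note on the reworked TopKDict (illustrative, based only on the code in this diff): because it now subclasses DefaultDict, the first positional argument is the default factory and top_k becomes keyword-only, which is what lets report call sites increment missing keys directly.

from datahub.utilities.stats_collections import TopKDict, int_top_k_dict

counts: TopKDict[str, int] = int_top_k_dict()  # equivalent to TopKDict(int)
counts["project-a"] += 1  # missing key defaults to 0, then increments
counts["project-b"] += 5
print(counts)  # small dicts print as-is: {'project-a': 1, 'project-b': 5}

big: TopKDict[str, int] = TopKDict(int, top_k=2)
for i in range(5):
    big[f"k{i}"] = i
print(big)  # two largest entries plus a "... top 2 of total 5 entries" marker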