feat(ingest): bigquery - enhance logging while processing audit logs (#4101)

This commit is contained in:
Ravindra Lanka 2022-02-09 13:34:52 -08:00 committed by GitHub
parent 0f56bc5d48
commit 2d7452d64a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -317,6 +317,7 @@ class BigQuerySource(SQLAlchemySource):
)
def _compute_bigquery_lineage_via_gcp_logging(self) -> None:
logger.info("Populating lineage info via GCP audit logs")
try:
_clients: List[GCPLoggingClient] = self._make_bigquery_client()
log_entries: Iterable[AuditLogEntry] = self._get_bigquery_log_entries(
@ -333,6 +334,7 @@ class BigQuerySource(SQLAlchemySource):
)
def _compute_bigquery_lineage_via_exported_bigquery_audit_metadata(self) -> None:
logger.info("Populating lineage info via exported GCP audit logs")
try:
_client: BigQueryClient = BigQueryClient(project=self.config.project_id)
exported_bigquery_audit_metadata: Iterable[
@ -441,11 +443,15 @@ class BigQuerySource(SQLAlchemySource):
def _parse_bigquery_log_entries(
self, entries: Iterable[AuditLogEntry]
) -> Iterable[QueryEvent]:
num_total_log_entries: int = 0
num_parsed_log_entires: int = 0
for entry in entries:
num_total_log_entries += 1
event: Optional[QueryEvent] = None
try:
if QueryEvent.can_parse_entry(entry):
event = QueryEvent.from_entry(entry)
num_parsed_log_entires += 1
else:
raise RuntimeError("Unable to parse log entry as QueryEvent.")
except Exception as e:
@ -456,6 +462,10 @@ class BigQuerySource(SQLAlchemySource):
logger.error("Unable to parse GCP log entry.", e)
if event is not None:
yield event
logger.info(
f"Parsing BigQuery log entries: Number of log entries scanned={num_total_log_entries}, "
f"number of log entries successfully parsed={num_parsed_log_entires}"
)
def _parse_exported_bigquery_audit_metadata(
self, audit_metadata_rows: Iterable[BigQueryAuditMetadata]
@ -482,18 +492,29 @@ class BigQuerySource(SQLAlchemySource):
def _create_lineage_map(self, entries: Iterable[QueryEvent]) -> Dict[str, Set[str]]:
    """Build a map from destination table to the tables it was derived from.

    Each entry contributes edges from its ``destinationTable`` to every table
    in its ``referencedTables``. Both sides are passed through
    ``remove_extras()`` and converted with ``str()`` before being stored.

    An entry is counted as skipped when:
      * it has no destination table,
      * its destination table reports ``is_anonymous()``,
      * it has no referenced tables, or
      * every referenced table stringifies to the destination itself
        (a pure self-reference adds no lineage).

    Args:
        entries: Query events to fold into the lineage map.

    Returns:
        A ``defaultdict(set)`` keyed by destination table string, whose values
        are the sets of referenced table strings. Only destinations with at
        least one non-self reference appear as keys.
    """
    lineage_map: Dict[str, Set[str]] = collections.defaultdict(set)
    num_entries: int = 0
    num_skipped_entries: int = 0
    for e in entries:
        num_entries += 1
        if (
            e.destinationTable is None
            or e.destinationTable.is_anonymous()
            or not e.referencedTables
        ):
            num_skipped_entries += 1
            continue
        # The destination is identical for every referenced table of this
        # entry, so compute its string form once instead of per reference.
        destination_table_str = str(e.destinationTable.remove_extras())
        upstream_table_strs: Set[str] = {
            str(ref_table.remove_extras()) for ref_table in e.referencedTables
        }
        # A table reading from itself is not a lineage edge.
        upstream_table_strs.discard(destination_table_str)
        if upstream_table_strs:
            # Only touch the defaultdict when there is something to add, so
            # no empty entry is created for a self-referencing destination.
            lineage_map[destination_table_str].update(upstream_table_strs)
        else:
            num_skipped_entries += 1
    logger.info(
        f"Creating lineage map: total number of entries={num_entries}, number skipped={num_skipped_entries}."
    )
    return lineage_map
def get_latest_partition(