fix(ingest/bigquery): Raise report_failure threshold; add robustness around table parsing (#7772)

- Converted view and table retrieval to iterators
- Added handling for the OverflowError raised when a table's expiration time is too far in the future to be represented in Python
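
For context on the second bullet: BigQuery allows a table's expiration to be set so far in the future that Python's datetime cannot represent it, and the conversion raises OverflowError (or ValueError, depending on the platform). Below is a minimal, self-contained sketch of the pattern this commit adopts, yielding tables one at a time and falling back to "no expiration" when the timestamp is out of range; the RawTable and parse_tables names are illustrative and not part of the DataHub code.

    from dataclasses import dataclass
    from datetime import datetime, timezone
    from typing import Iterator, Optional, Tuple


    @dataclass
    class RawTable:
        name: str
        expires_epoch_seconds: Optional[float]  # may be absurdly far in the future


    def parse_expiration(raw: RawTable) -> Optional[datetime]:
        """Return the expiration as a datetime, or None if it cannot be represented."""
        if raw.expires_epoch_seconds is None:
            return None
        try:
            return datetime.fromtimestamp(raw.expires_epoch_seconds, tz=timezone.utc)
        except (OverflowError, OSError, ValueError):
            # Past year 9999 (or otherwise out of range): treat as "never expires".
            return None


    def parse_tables(raws: Iterator[RawTable]) -> Iterator[Tuple[str, Optional[datetime]]]:
        # Yield results one at a time instead of building a list, and keep going
        # when a single table fails to parse, mirroring the generator conversion
        # in this commit.
        for raw in raws:
            try:
                yield (raw.name, parse_expiration(raw))
            except Exception:
                continue


    if __name__ == "__main__":
        rows = iter(
            [
                RawTable("ok", 1_700_000_000.0),
                RawTable("far_future", 300_000_000_000.0),  # past year 9999
                RawTable("no_expiry", None),
            ]
        )
        for name, expires in parse_tables(rows):
            print(name, expires)
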
Andrew Sikowitz 2023-04-06 13:24:22 -07:00 committed by GitHub
parent dac540e691
commit 44663fa035
5 changed files with 116 additions and 90 deletions

View File

@@ -308,7 +308,7 @@ class BigqueryV2Source(StatefulIngestionSourceBase, TestableSource):
    tables={},
    with_data_read_permission=config.profiling.enabled,
)
-if len(tables) == 0:
+if len(list(tables)) == 0:
    return CapabilityReport(
        capable=False,
        failure_reason=f"Tables query did not return any table. It is either empty or no tables in project {project_id}.{result[0].name}",
@@ -759,8 +759,8 @@ class BigqueryV2Source(StatefulIngestionSourceBase, TestableSource):
    )
if self.config.include_tables:
-    db_tables[dataset_name] = self.get_tables_for_dataset(
-        conn, project_id, dataset_name
+    db_tables[dataset_name] = list(
+        self.get_tables_for_dataset(conn, project_id, dataset_name)
    )

    for table in db_tables[dataset_name]:
@@ -773,9 +773,11 @@ class BigqueryV2Source(StatefulIngestionSourceBase, TestableSource):
    )
if self.config.include_views:
-    db_views[dataset_name] = BigQueryDataDictionary.get_views_for_dataset(
+    db_views[dataset_name] = list(
+        BigQueryDataDictionary.get_views_for_dataset(
            conn, project_id, dataset_name, self.config.profiling.enabled
        )
+    )

    for view in db_views[dataset_name]:
        view_columns = columns.get(view.name, []) if columns else []
@@ -1178,12 +1180,9 @@ class BigqueryV2Source(StatefulIngestionSourceBase, TestableSource):
    conn: bigquery.Client,
    project_id: str,
    dataset_name: str,
-) -> List[BigqueryTable]:
-    bigquery_tables: Optional[List[BigqueryTable]] = []
+) -> Iterable[BigqueryTable]:
    # In bigquery there is no way to query all tables in a Project id
    with PerfTimer() as timer:
-        bigquery_tables = []
        # Partitions view throw exception if we try to query partition info for too many tables
        # so we have to limit the number of tables we query partition info.
        # The conn.list_tables returns table infos that information_schema doesn't contain and this
@@ -1203,34 +1202,28 @@ class BigqueryV2Source(StatefulIngestionSourceBase, TestableSource):
        for table_item in table_items.keys():
            items_to_get[table_item] = table_items[table_item]
            if len(items_to_get) % max_batch_size == 0:
-                bigquery_tables.extend(
-                    BigQueryDataDictionary.get_tables_for_dataset(
+                yield from BigQueryDataDictionary.get_tables_for_dataset(
                    conn,
                    project_id,
                    dataset_name,
                    items_to_get,
                    with_data_read_permission=self.config.profiling.enabled,
                )
-                )
                items_to_get.clear()

        if items_to_get:
-            bigquery_tables.extend(
-                BigQueryDataDictionary.get_tables_for_dataset(
+            yield from BigQueryDataDictionary.get_tables_for_dataset(
                conn,
                project_id,
                dataset_name,
                items_to_get,
                with_data_read_permission=self.config.profiling.enabled,
            )
-            )

    self.report.metadata_extraction_sec[f"{project_id}.{dataset_name}"] = round(
        timer.elapsed_seconds(), 2
    )

-    return bigquery_tables
-
def get_core_table_details(
    self, conn: bigquery.Client, dataset_name: str, project_id: str
) -> Dict[str, TableListItem]:
@@ -1297,7 +1290,3 @@ class BigqueryV2Source(StatefulIngestionSourceBase, TestableSource):
        self.report.use_exported_bigquery_audit_metadata = (
            self.config.use_exported_bigquery_audit_metadata
        )
-
-    def warn(self, log: logging.Logger, key: str, reason: str) -> None:
-        self.report.report_warning(key, reason)
-        log.warning(f"{key} => {reason}")

View File

@@ -2,7 +2,7 @@ import logging
from collections import defaultdict
from dataclasses import dataclass, field
from datetime import datetime, timezone
-from typing import Any, Dict, List, Optional, cast
+from typing import Any, Dict, Iterator, List, Optional

from google.cloud import bigquery
from google.cloud.bigquery.table import (
@@ -13,6 +13,7 @@ from google.cloud.bigquery.table import (
)

from datahub.ingestion.source.bigquery_v2.bigquery_audit import BigqueryTableIdentifier
+from datahub.ingestion.source.bigquery_v2.bigquery_report import BigQueryV2Report
from datahub.ingestion.source.sql.sql_generic import BaseColumn, BaseTable, BaseView

logger: logging.Logger = logging.getLogger(__name__)
@@ -362,8 +363,6 @@ class BigQueryDataDictionary:
    def get_datasets_for_project_id(
        conn: bigquery.Client, project_id: str, maxResults: Optional[int] = None
    ) -> List[BigqueryDataset]:
-        # FIXME: Due to a bug in BigQuery's type annotations, we need to cast here.
-        maxResults = cast(int, maxResults)
        datasets = conn.list_datasets(project_id, max_results=maxResults)
        return [BigqueryDataset(name=d.dataset_id, labels=d.labels) for d in datasets]
@@ -399,7 +398,8 @@ class BigQueryDataDictionary:
        dataset_name: str,
        tables: Dict[str, TableListItem],
        with_data_read_permission: bool = False,
-    ) -> List[BigqueryTable]:
+        report: Optional[BigQueryV2Report] = None,
+    ) -> Iterator[BigqueryTable]:
        filter: str = ", ".join(f"'{table}'" for table in tables.keys())

        if with_data_read_permission:
@@ -424,10 +424,38 @@ class BigQueryDataDictionary:
                    table_filter=f" and t.table_name in ({filter})" if filter else "",
                ),
            )
-        # Some property we want to capture only available from the TableListItem we get from an earlier query of
-        # the list of tables.
-        return [
-            BigqueryTable(
+
+        for table in cur:
+            try:
+                yield BigQueryDataDictionary._make_bigquery_table(
+                    table, tables.get(table.table_name)
+                )
+            except Exception as e:
+                table_name = f"{project_id}.{dataset_name}.{table.table_name}"
+                logger.warning(
+                    f"Error while processing table {table_name}",
+                    exc_info=True,
+                )
+                if report:
+                    report.report_warning(
+                        "metadata-extraction",
+                        f"Failed to get table {table_name}: {e}",
+                    )
+
+    @staticmethod
+    def _make_bigquery_table(
+        table: bigquery.Row, table_basic: Optional[TableListItem]
+    ) -> BigqueryTable:
+        # Some properties we want to capture are only available from the TableListItem
+        # we get from an earlier query of the list of tables.
+        try:
+            expiration = table_basic.expires if table_basic else None
+        except OverflowError:
+            logger.info(f"Invalid expiration time for table {table.table_name}.")
+            expiration = None
+
+        _, shard = BigqueryTableIdentifier.get_table_and_shard(table.table_name)
+        return BigqueryTable(
            name=table.table_name,
            created=table.created,
            last_altered=datetime.fromtimestamp(
@@ -439,27 +467,18 @@ class BigQueryDataDictionary:
            rows_count=table.get("row_count"),
            comment=table.comment,
            ddl=table.ddl,
-            expires=tables[table.table_name].expires if tables else None,
-            labels=tables[table.table_name].labels if tables else None,
-            partition_info=PartitionInfo.from_table_info(tables[table.table_name])
-            if tables
-            else None,
-            clustering_fields=tables[table.table_name].clustering_fields
-            if tables
-            else None,
+            expires=expiration,
+            labels=table_basic.labels if table_basic else None,
+            partition_info=PartitionInfo.from_table_info(table_basic)
+            if table_basic
+            else None,
+            clustering_fields=table_basic.clustering_fields if table_basic else None,
            max_partition_id=table.get("max_partition_id"),
-            max_shard_id=BigqueryTableIdentifier.get_table_and_shard(
-                table.table_name
-            )[1]
-            if len(BigqueryTableIdentifier.get_table_and_shard(table.table_name))
-            == 2
-            else None,
+            max_shard_id=shard,
            num_partitions=table.get("num_partitions"),
            active_billable_bytes=table.get("active_billable_bytes"),
            long_term_billable_bytes=table.get("long_term_billable_bytes"),
        )
-            for table in cur
-        ]

    @staticmethod
    def get_views_for_dataset(
@@ -467,7 +486,8 @@ class BigQueryDataDictionary:
        project_id: str,
        dataset_name: str,
        has_data_read: bool,
-    ) -> List[BigqueryView]:
+        report: Optional[BigQueryV2Report] = None,
+    ) -> Iterator[BigqueryView]:
        if has_data_read:
            cur = BigQueryDataDictionary.get_query_result(
                conn,
@@ -483,21 +503,35 @@ class BigQueryDataDictionary:
                ),
            )

-        return [
-            BigqueryView(
-                name=table.table_name,
-                created=table.created,
-                last_altered=datetime.fromtimestamp(
-                    table.get("last_altered") / 1000, tz=timezone.utc
-                )
-                if table.get("last_altered") is not None
-                else table.created,
-                comment=table.comment,
-                view_definition=table.view_definition,
-                materialized=table.table_type == BigqueryTableType.MATERIALIZED_VIEW,
-            )
-            for table in cur
-        ]
+        for table in cur:
+            try:
+                yield BigQueryDataDictionary._make_bigquery_view(table)
+            except Exception as e:
+                view_name = f"{project_id}.{dataset_name}.{table.table_name}"
+                logger.warning(
+                    f"Error while processing view {view_name}",
+                    exc_info=True,
+                )
+                if report:
+                    report.report_warning(
+                        "metadata-extraction",
+                        f"Failed to get view {view_name}: {e}",
+                    )
+
+    @staticmethod
+    def _make_bigquery_view(view: bigquery.Row) -> BigqueryView:
+        return BigqueryView(
+            name=view.table_name,
+            created=view.created,
+            last_altered=datetime.fromtimestamp(
+                view.get("last_altered") / 1000, tz=timezone.utc
+            )
+            if view.get("last_altered") is not None
+            else view.created,
+            comment=view.comment,
+            view_definition=view.view_definition,
+            materialized=view.table_type == BigqueryTableType.MATERIALIZED_VIEW,
+        )

    @staticmethod
    def get_columns_for_dataset(
@@ -577,7 +611,6 @@ class BigQueryDataDictionary:
                    logger.warning(
                        f"{table_identifier.project_id}.{table_identifier.dataset}.{column.table_name} contains more than {column_limit} columns, only processing {column_limit} columns"
                    )
-                    last_seen_table = column.table_name
                else:
                    columns.append(
                        BigqueryColumn(

View File

@@ -90,7 +90,7 @@ timestamp < "{end_time}"
        self.report = report

    def error(self, log: logging.Logger, key: str, reason: str) -> None:
-        self.report.report_failure(key, reason)
+        self.report.report_warning(key, reason)
        log.error(f"{key} => {reason}")

    @staticmethod

View File

@@ -245,7 +245,7 @@ class BigQueryUsageExtractor:
            logger.warning(
                f"Encountered exception retrieving AuditLogEntries for project {client.project} - {e}"
            )
-            self.report.report_failure(
+            self.report.report_warning(
                "lineage-extraction",
                f"{client.project} - unable to retrieve log entries {e}",
            )
@@ -346,7 +346,7 @@ class BigQueryUsageExtractor:
            logger.warning(
                f"Encountered exception retrieving AuditLogEntires for project {client.project} - {e}"
            )
-            self.report.report_failure(
+            self.report.report_warning(
                "usage-extraction",
                f"{client.project} - unable to retrive log entrires {e}",
            )

View File

@@ -330,9 +330,11 @@ def test_table_processing_logic(client_mock, data_dictionary_mock):
    source = BigqueryV2Source(config=config, ctx=PipelineContext(run_id="test"))

-    _ = source.get_tables_for_dataset(
+    _ = list(
+        source.get_tables_for_dataset(
            conn=client_mock, project_id="test-project", dataset_name="test-dataset"
        )
+    )

    assert data_dictionary_mock.call_count == 1
@@ -400,9 +402,11 @@ def test_table_processing_logic_date_named_tables(client_mock, data_dictionary_mock):
    source = BigqueryV2Source(config=config, ctx=PipelineContext(run_id="test"))

-    _ = source.get_tables_for_dataset(
+    _ = list(
+        source.get_tables_for_dataset(
            conn=client_mock, project_id="test-project", dataset_name="test-dataset"
        )
+    )

    assert data_dictionary_mock.call_count == 1
@@ -487,7 +491,7 @@ def test_get_views_for_dataset(
        dataset_name="test-dataset",
        has_data_read=False,
    )
-    assert views == [bigquery_view_1, bigquery_view_2]
+    assert list(views) == [bigquery_view_1, bigquery_view_2]


@patch.object(BigqueryV2Source, "gen_dataset_workunits", lambda *args, **kwargs: [])