From 44663fa03556f260045e0520a41a0524cb1fca29 Mon Sep 17 00:00:00 2001
From: Andrew Sikowitz
Date: Thu, 6 Apr 2023 13:24:22 -0700
Subject: [PATCH] fix(ingest/bigquery): Raise report_failure threshold; add robustness around table parsing (#7772)

- Converted fetching of views and tables to iterators (see the batching sketch after the diff)
- Catches the exception raised when a table's expiration time is too far in the future to represent in Python (see the expiration sketch after the diff)
---
 .../ingestion/source/bigquery_v2/bigquery.py  |  47 +++---
 .../source/bigquery_v2/bigquery_schema.py     | 139 +++++++++++-------
 .../ingestion/source/bigquery_v2/lineage.py   |   2 +-
 .../ingestion/source/bigquery_v2/usage.py     |   4 +-
 .../tests/unit/test_bigquery_source.py        |  14 +-
 5 files changed, 116 insertions(+), 90 deletions(-)

diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py
index 4130bf0f29..015b0fff89 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py
@@ -308,7 +308,7 @@ class BigqueryV2Source(StatefulIngestionSourceBase, TestableSource):
                 tables={},
                 with_data_read_permission=config.profiling.enabled,
             )
-            if len(tables) == 0:
+            if len(list(tables)) == 0:
                 return CapabilityReport(
                     capable=False,
                     failure_reason=f"Tables query did not return any table. Dataset {project_id}.{result[0].name} is either empty or contains no tables.",
@@ -759,8 +759,8 @@ class BigqueryV2Source(StatefulIngestionSourceBase, TestableSource):
             )

         if self.config.include_tables:
-            db_tables[dataset_name] = self.get_tables_for_dataset(
-                conn, project_id, dataset_name
+            db_tables[dataset_name] = list(
+                self.get_tables_for_dataset(conn, project_id, dataset_name)
             )

             for table in db_tables[dataset_name]:
@@ -773,8 +773,10 @@ class BigqueryV2Source(StatefulIngestionSourceBase, TestableSource):
                 )

         if self.config.include_views:
-            db_views[dataset_name] = BigQueryDataDictionary.get_views_for_dataset(
-                conn, project_id, dataset_name, self.config.profiling.enabled
+            db_views[dataset_name] = list(
+                BigQueryDataDictionary.get_views_for_dataset(
+                    conn, project_id, dataset_name, self.config.profiling.enabled
+                )
             )

             for view in db_views[dataset_name]:
@@ -1178,12 +1180,9 @@ class BigqueryV2Source(StatefulIngestionSourceBase, TestableSource):
         conn: bigquery.Client,
         project_id: str,
         dataset_name: str,
-    ) -> List[BigqueryTable]:
-        bigquery_tables: Optional[List[BigqueryTable]] = []
-
+    ) -> Iterable[BigqueryTable]:
         # In BigQuery there is no way to query all tables in a project id
         with PerfTimer() as timer:
-            bigquery_tables = []
             # The partitions view throws an exception if we query partition info for too many tables,
             # so we have to limit the number of tables we query partition info for.
# The conn.list_tables returns table infos that information_schema doesn't contain and this @@ -1203,34 +1202,28 @@ class BigqueryV2Source(StatefulIngestionSourceBase, TestableSource): for table_item in table_items.keys(): items_to_get[table_item] = table_items[table_item] if len(items_to_get) % max_batch_size == 0: - bigquery_tables.extend( - BigQueryDataDictionary.get_tables_for_dataset( - conn, - project_id, - dataset_name, - items_to_get, - with_data_read_permission=self.config.profiling.enabled, - ) - ) - items_to_get.clear() - - if items_to_get: - bigquery_tables.extend( - BigQueryDataDictionary.get_tables_for_dataset( + yield from BigQueryDataDictionary.get_tables_for_dataset( conn, project_id, dataset_name, items_to_get, with_data_read_permission=self.config.profiling.enabled, ) + items_to_get.clear() + + if items_to_get: + yield from BigQueryDataDictionary.get_tables_for_dataset( + conn, + project_id, + dataset_name, + items_to_get, + with_data_read_permission=self.config.profiling.enabled, ) self.report.metadata_extraction_sec[f"{project_id}.{dataset_name}"] = round( timer.elapsed_seconds(), 2 ) - return bigquery_tables - def get_core_table_details( self, conn: bigquery.Client, dataset_name: str, project_id: str ) -> Dict[str, TableListItem]: @@ -1297,7 +1290,3 @@ class BigqueryV2Source(StatefulIngestionSourceBase, TestableSource): self.report.use_exported_bigquery_audit_metadata = ( self.config.use_exported_bigquery_audit_metadata ) - - def warn(self, log: logging.Logger, key: str, reason: str) -> None: - self.report.report_warning(key, reason) - log.warning(f"{key} => {reason}") diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py index eb3c4c1db4..15c54302e8 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py @@ -2,7 +2,7 @@ import logging from collections import defaultdict from dataclasses import dataclass, field from datetime import datetime, timezone -from typing import Any, Dict, List, Optional, cast +from typing import Any, Dict, Iterator, List, Optional from google.cloud import bigquery from google.cloud.bigquery.table import ( @@ -13,6 +13,7 @@ from google.cloud.bigquery.table import ( ) from datahub.ingestion.source.bigquery_v2.bigquery_audit import BigqueryTableIdentifier +from datahub.ingestion.source.bigquery_v2.bigquery_report import BigQueryV2Report from datahub.ingestion.source.sql.sql_generic import BaseColumn, BaseTable, BaseView logger: logging.Logger = logging.getLogger(__name__) @@ -362,8 +363,6 @@ class BigQueryDataDictionary: def get_datasets_for_project_id( conn: bigquery.Client, project_id: str, maxResults: Optional[int] = None ) -> List[BigqueryDataset]: - # FIXME: Due to a bug in BigQuery's type annotations, we need to cast here. 
- maxResults = cast(int, maxResults) datasets = conn.list_datasets(project_id, max_results=maxResults) return [BigqueryDataset(name=d.dataset_id, labels=d.labels) for d in datasets] @@ -399,7 +398,8 @@ class BigQueryDataDictionary: dataset_name: str, tables: Dict[str, TableListItem], with_data_read_permission: bool = False, - ) -> List[BigqueryTable]: + report: Optional[BigQueryV2Report] = None, + ) -> Iterator[BigqueryTable]: filter: str = ", ".join(f"'{table}'" for table in tables.keys()) if with_data_read_permission: @@ -424,42 +424,61 @@ class BigQueryDataDictionary: table_filter=f" and t.table_name in ({filter})" if filter else "", ), ) - # Some property we want to capture only available from the TableListItem we get from an earlier query of - # the list of tables. - return [ - BigqueryTable( - name=table.table_name, - created=table.created, - last_altered=datetime.fromtimestamp( - table.get("last_altered") / 1000, tz=timezone.utc + + for table in cur: + try: + yield BigQueryDataDictionary._make_bigquery_table( + table, tables.get(table.table_name) ) - if table.get("last_altered") is not None - else table.created, - size_in_bytes=table.get("bytes"), - rows_count=table.get("row_count"), - comment=table.comment, - ddl=table.ddl, - expires=tables[table.table_name].expires if tables else None, - labels=tables[table.table_name].labels if tables else None, - partition_info=PartitionInfo.from_table_info(tables[table.table_name]) - if tables - else None, - clustering_fields=tables[table.table_name].clustering_fields - if tables - else None, - max_partition_id=table.get("max_partition_id"), - max_shard_id=BigqueryTableIdentifier.get_table_and_shard( - table.table_name - )[1] - if len(BigqueryTableIdentifier.get_table_and_shard(table.table_name)) - == 2 - else None, - num_partitions=table.get("num_partitions"), - active_billable_bytes=table.get("active_billable_bytes"), - long_term_billable_bytes=table.get("long_term_billable_bytes"), + except Exception as e: + table_name = f"{project_id}.{dataset_name}.{table.table_name}" + logger.warning( + f"Error while processing table {table_name}", + exc_info=True, + ) + if report: + report.report_warning( + "metadata-extraction", + f"Failed to get table {table_name}: {e}", + ) + + @staticmethod + def _make_bigquery_table( + table: bigquery.Row, table_basic: Optional[TableListItem] + ) -> BigqueryTable: + # Some properties we want to capture are only available from the TableListItem + # we get from an earlier query of the list of tables. 
+ try: + expiration = table_basic.expires if table_basic else None + except OverflowError: + logger.info(f"Invalid expiration time for table {table.table_name}.") + expiration = None + + _, shard = BigqueryTableIdentifier.get_table_and_shard(table.table_name) + return BigqueryTable( + name=table.table_name, + created=table.created, + last_altered=datetime.fromtimestamp( + table.get("last_altered") / 1000, tz=timezone.utc ) - for table in cur - ] + if table.get("last_altered") is not None + else table.created, + size_in_bytes=table.get("bytes"), + rows_count=table.get("row_count"), + comment=table.comment, + ddl=table.ddl, + expires=expiration, + labels=table_basic.labels if table_basic else None, + partition_info=PartitionInfo.from_table_info(table_basic) + if table_basic + else None, + clustering_fields=table_basic.clustering_fields if table_basic else None, + max_partition_id=table.get("max_partition_id"), + max_shard_id=shard, + num_partitions=table.get("num_partitions"), + active_billable_bytes=table.get("active_billable_bytes"), + long_term_billable_bytes=table.get("long_term_billable_bytes"), + ) @staticmethod def get_views_for_dataset( @@ -467,7 +486,8 @@ class BigQueryDataDictionary: project_id: str, dataset_name: str, has_data_read: bool, - ) -> List[BigqueryView]: + report: Optional[BigQueryV2Report] = None, + ) -> Iterator[BigqueryView]: if has_data_read: cur = BigQueryDataDictionary.get_query_result( conn, @@ -483,21 +503,35 @@ class BigQueryDataDictionary: ), ) - return [ - BigqueryView( - name=table.table_name, - created=table.created, - last_altered=datetime.fromtimestamp( - table.get("last_altered") / 1000, tz=timezone.utc + for table in cur: + try: + yield BigQueryDataDictionary._make_bigquery_view(table) + except Exception as e: + view_name = f"{project_id}.{dataset_name}.{table.table_name}" + logger.warning( + f"Error while processing view {view_name}", + exc_info=True, ) - if table.get("last_altered") is not None - else table.created, - comment=table.comment, - view_definition=table.view_definition, - materialized=table.table_type == BigqueryTableType.MATERIALIZED_VIEW, + if report: + report.report_warning( + "metadata-extraction", + f"Failed to get view {view_name}: {e}", + ) + + @staticmethod + def _make_bigquery_view(view: bigquery.Row) -> BigqueryView: + return BigqueryView( + name=view.table_name, + created=view.created, + last_altered=datetime.fromtimestamp( + view.get("last_altered") / 1000, tz=timezone.utc ) - for table in cur - ] + if view.get("last_altered") is not None + else view.created, + comment=view.comment, + view_definition=view.view_definition, + materialized=view.table_type == BigqueryTableType.MATERIALIZED_VIEW, + ) @staticmethod def get_columns_for_dataset( @@ -577,7 +611,6 @@ class BigQueryDataDictionary: logger.warning( f"{table_identifier.project_id}.{table_identifier.dataset}.{column.table_name} contains more than {column_limit} columns, only processing {column_limit} columns" ) - last_seen_table = column.table_name else: columns.append( BigqueryColumn( diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/lineage.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/lineage.py index e70521396d..5eae458d42 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/lineage.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/lineage.py @@ -90,7 +90,7 @@ timestamp < "{end_time}" self.report = report def error(self, log: logging.Logger, key: str, reason: str) -> None: - 
self.report.report_failure(key, reason)
+        self.report.report_warning(key, reason)
         log.error(f"{key} => {reason}")

     @staticmethod
diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/usage.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/usage.py
index 747872da24..94e9b48c77 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/usage.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/usage.py
@@ -245,7 +245,7 @@ class BigQueryUsageExtractor:
             logger.warning(
                 f"Encountered exception retrieving AuditLogEntries for project {client.project} - {e}"
             )
-            self.report.report_failure(
+            self.report.report_warning(
                 "lineage-extraction",
                 f"{client.project} - unable to retrieve log entries {e}",
             )
@@ -346,7 +346,7 @@ class BigQueryUsageExtractor:
             logger.warning(
                 f"Encountered exception retrieving AuditLogEntries for project {client.project} - {e}"
             )
-            self.report.report_failure(
+            self.report.report_warning(
                 "usage-extraction",
                 f"{client.project} - unable to retrieve log entries {e}",
             )
diff --git a/metadata-ingestion/tests/unit/test_bigquery_source.py b/metadata-ingestion/tests/unit/test_bigquery_source.py
index 5278dec717..e6b89d345d 100644
--- a/metadata-ingestion/tests/unit/test_bigquery_source.py
+++ b/metadata-ingestion/tests/unit/test_bigquery_source.py
@@ -330,8 +330,10 @@ def test_table_processing_logic(client_mock, data_dictionary_mock):
     source = BigqueryV2Source(config=config, ctx=PipelineContext(run_id="test"))

-    _ = source.get_tables_for_dataset(
-        conn=client_mock, project_id="test-project", dataset_name="test-dataset"
+    _ = list(
+        source.get_tables_for_dataset(
+            conn=client_mock, project_id="test-project", dataset_name="test-dataset"
+        )
     )

     assert data_dictionary_mock.call_count == 1
@@ -400,8 +402,10 @@ def test_table_processing_logic_date_named_tables(client_mock, data_dictionary_m
     source = BigqueryV2Source(config=config, ctx=PipelineContext(run_id="test"))

-    _ = source.get_tables_for_dataset(
-        conn=client_mock, project_id="test-project", dataset_name="test-dataset"
+    _ = list(
+        source.get_tables_for_dataset(
+            conn=client_mock, project_id="test-project", dataset_name="test-dataset"
+        )
     )

     assert data_dictionary_mock.call_count == 1
@@ -487,7 +491,7 @@ def test_get_views_for_dataset(
         dataset_name="test-dataset",
         has_data_read=False,
     )
-    assert views == [bigquery_view_1, bigquery_view_2]
+    assert list(views) == [bigquery_view_1, bigquery_view_2]


@patch.object(BigqueryV2Source, "gen_dataset_workunits", lambda *args, **kwargs: [])
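
Note on the iterator conversion: get_tables_for_dataset now accumulates table
references, flushes one metadata query per full batch, and streams results with
`yield from`, so callers consume rows as they arrive instead of waiting on one
large list. A minimal sketch of the pattern, assuming a hypothetical run_query
helper and an illustrative batch size in place of the real BigQuery query code:

    from typing import Dict, Iterator, List

    MAX_BATCH_SIZE = 500  # illustrative; the real limit is configured in the source

    def run_query(batch: Dict[str, str]) -> List[str]:
        # Hypothetical stand-in for BigQueryDataDictionary.get_tables_for_dataset,
        # which issues one information_schema query per batch of table names.
        return list(batch)

    def iterate_in_batches(table_items: Dict[str, str]) -> Iterator[str]:
        # Accumulate table references, flush a query whenever a batch fills,
        # and stream each batch's rows instead of building one big list.
        items_to_get: Dict[str, str] = {}
        for name, item in table_items.items():
            items_to_get[name] = item
            if len(items_to_get) % MAX_BATCH_SIZE == 0:
                yield from run_query(items_to_get)
                items_to_get.clear()
        if items_to_get:  # flush the final, partially filled batch
            yield from run_query(items_to_get)

    # Callers that still need a list (as the updated tests do) wrap the iterator:
    assert len(list(iterate_in_batches({f"t{i}": str(i) for i in range(1200)}))) == 1200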
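
Note on the expiration guard: TableListItem.expires is derived from the API's
millisecond expirationTime field, and a table set to expire far enough in the
future overflows Python's datetime range (capped at year 9999), raising
OverflowError on attribute access. A self-contained sketch of the failure mode
and the guard added in _make_bigquery_table; FakeTableListItem is a made-up
stand-in, not the google-cloud-bigquery class:

    from datetime import datetime, timedelta, timezone
    from typing import Optional

    EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

    class FakeTableListItem:
        # Made-up stand-in: the real class computes `expires` from a
        # millisecond epoch value in roughly this way.
        def __init__(self, expiration_ms: int) -> None:
            self._expiration_ms = expiration_ms

        @property
        def expires(self) -> datetime:
            # Building or adding the timedelta raises OverflowError once the
            # result would land past datetime.max.
            return EPOCH + timedelta(milliseconds=self._expiration_ms)

    def safe_expiration(item: Optional[FakeTableListItem]) -> Optional[datetime]:
        # Mirrors the new try/except: treat an unrepresentable expiration as
        # "no expiration" rather than failing the whole table's extraction.
        try:
            return item.expires if item else None
        except OverflowError:
            return None

    assert safe_expiration(FakeTableListItem(10**18)) is None  # absurdly far future
    assert safe_expiration(FakeTableListItem(1_700_000_000_000)) is not None  # Nov 2023
    assert safe_expiration(None) is None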