From cc32c30b2a0c2e46fc83d1085f66c0f6b938c8a2 Mon Sep 17 00:00:00 2001 From: Tamas Nemeth Date: Fri, 4 Feb 2022 23:02:36 +0100 Subject: [PATCH] fix(ingest): bigquery - fix for hitting limit if there are too many partitioned tables (#4056) --- .../ingestion/source/ge_data_profiler.py | 1 - .../datahub/ingestion/source/sql/bigquery.py | 44 +++++++++++++++---- 2 files changed, 36 insertions(+), 9 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py b/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py index aad762d30c..2387d4683e 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py +++ b/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py @@ -748,7 +748,6 @@ class DatahubGEProfiler: custom_sql: str = None, **kwargs: Any, ) -> Optional[DatasetProfileClass]: - logger.info(f"Profiling {pretty_name}") bigquery_temp_table: Optional[str] = None ge_config = { "schema": schema, diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/bigquery.py b/metadata-ingestion/src/datahub/ingestion/source/sql/bigquery.py index 30784ebfc4..e4de376cb5 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/bigquery.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/bigquery.py @@ -94,12 +94,20 @@ where is_partitioning_column = 'YES' -- Filter out special partitions (https://cloud.google.com/bigquery/docs/partitioned-tables#date_timestamp_partitioned_tables) and p.partition_id not in ('__NULL__', '__UNPARTITIONED__') + and STORAGE_TIER='ACTIVE' group by c.table_catalog, c.table_schema, c.table_name, c.column_name, c.data_type +order by + c.table_catalog, + c.table_schema, + c.table_name, + c.column_name +limit {limit} +offset {offset} """.strip() SHARDED_TABLE_REGEX = r"^(.+)[_](\d{4}|\d{6}|\d{8}|\d{10})$" @@ -467,19 +475,39 @@ class BigQuerySource(SQLAlchemySource): return lineage_map def get_latest_partitions_for_schema(self, schema: str) -> None: + query_limit: int = 500 + offset: int = 0 url = self.config.get_sql_alchemy_url() engine = create_engine(url, **self.config.options) with engine.connect() as con: inspector = inspect(con) - sql = BQ_GET_LATEST_PARTITION_TEMPLATE.format( - project_id=self.get_db_name(inspector), - schema=schema, - ) - result = con.execute(sql) partitions = {} - for row in result: - partition = BigQueryPartitionColumn(**row) - partitions[partition.table_name] = partition + + def get_partition_columns( + project_id: str, schema: str, limit: int, offset: int + ) -> int: + sql = BQ_GET_LATEST_PARTITION_TEMPLATE.format( + project_id=project_id, + schema=schema, + limit=limit, + offset=offset, + ) + result = con.execute(sql) + row_count: int = 0 + for row in result: + partition = BigQueryPartitionColumn(**row) + partitions[partition.table_name] = partition + row_count = row_count + 1 + return row_count + + res_size = get_partition_columns( + self.get_db_name(inspector), schema, query_limit, offset + ) + while res_size == query_limit: + offset = offset + query_limit + res_size = get_partition_columns( + self.get_db_name(inspector), schema, query_limit, offset + ) self.partiton_columns[schema] = partitions