fix(ingest): bigquery - fix hitting the limit when there are too many partitioned tables (#4056)

Tamas Nemeth 2022-02-04 23:02:36 +01:00 committed by GitHub
parent c8922b39d6
commit cc32c30b2a
2 changed files with 36 additions and 9 deletions
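
In short: get_latest_partitions_for_schema ran BQ_GET_LATEST_PARTITION_TEMPLATE once per schema, so a schema with enough partitioned tables could hit a result limit. The fix pages through the query with LIMIT/OFFSET, 500 rows at a time, until a short page signals the end. A minimal sketch of that pagination pattern, assuming a generic fetch_page(limit, offset) callable (the helper name and types are illustrative, not from the commit):

from typing import Any, Callable, Dict, List

def fetch_all(
    fetch_page: Callable[[int, int], List[Dict[str, Any]]],
    page_size: int = 500,
) -> List[Dict[str, Any]]:
    # Collect every row by paging until a short page appears.
    rows: List[Dict[str, Any]] = []
    offset = 0
    page = fetch_page(page_size, offset)
    rows.extend(page)
    # A page shorter than page_size means the source is exhausted;
    # a full page may or may not be the last one, so ask again.
    while len(page) == page_size:
        offset += page_size
        page = fetch_page(page_size, offset)
        rows.extend(page)
    return rows

The commit's while res_size == query_limit loop below has exactly this shape, with get_partition_columns returning the row count instead of the rows.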


@@ -748,7 +748,6 @@ class DatahubGEProfiler:
        custom_sql: str = None,
        **kwargs: Any,
    ) -> Optional[DatasetProfileClass]:
-       logger.info(f"Profiling {pretty_name}")
        bigquery_temp_table: Optional[str] = None
        ge_config = {
            "schema": schema,


@@ -94,12 +94,20 @@ where
    is_partitioning_column = 'YES'
    -- Filter out special partitions (https://cloud.google.com/bigquery/docs/partitioned-tables#date_timestamp_partitioned_tables)
    and p.partition_id not in ('__NULL__', '__UNPARTITIONED__')
+   and STORAGE_TIER='ACTIVE'
group by
    c.table_catalog,
    c.table_schema,
    c.table_name,
    c.column_name,
    c.data_type
+order by
+   c.table_catalog,
+   c.table_schema,
+   c.table_name,
+   c.column_name
+limit {limit}
+offset {offset}
""".strip()
SHARDED_TABLE_REGEX = r"^(.+)[_](\d{4}|\d{6}|\d{8}|\d{10})$"
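
The {limit} and {offset} placeholders are filled with str.format before execution. A trimmed-down rendering for illustration; the abbreviated TEMPLATE below stands in for the much longer BQ_GET_LATEST_PARTITION_TEMPLATE, and the literal project/dataset names are made up:

# Abbreviated stand-in for BQ_GET_LATEST_PARTITION_TEMPLATE; the real
# template selects, groups, and orders the columns shown in the diff above.
TEMPLATE = """
select c.table_name, c.column_name
from `{project_id}.{schema}.INFORMATION_SCHEMA.COLUMNS` as c
where is_partitioning_column = 'YES'
order by c.table_name, c.column_name
limit {limit}
offset {offset}
""".strip()

# Page 3 of the results (offset = 2 * limit).
sql = TEMPLATE.format(
    project_id="my-project", schema="my_dataset", limit=500, offset=1000
)

The order by added alongside limit/offset is what makes the paging sound: without a deterministic ordering, consecutive pages could repeat or skip rows.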
@@ -467,19 +475,39 @@ class BigQuerySource(SQLAlchemySource):
        return lineage_map

    def get_latest_partitions_for_schema(self, schema: str) -> None:
+       query_limit: int = 500
+       offset: int = 0
        url = self.config.get_sql_alchemy_url()
        engine = create_engine(url, **self.config.options)
        with engine.connect() as con:
            inspector = inspect(con)
+           partitions = {}
+
+           def get_partition_columns(
+               project_id: str, schema: str, limit: int, offset: int
+           ) -> int:
                sql = BQ_GET_LATEST_PARTITION_TEMPLATE.format(
-                   project_id=self.get_db_name(inspector),
+                   project_id=project_id,
                    schema=schema,
+                   limit=limit,
+                   offset=offset,
                )
                result = con.execute(sql)
-               partitions = {}
+               row_count: int = 0
                for row in result:
                    partition = BigQueryPartitionColumn(**row)
                    partitions[partition.table_name] = partition
+                   row_count = row_count + 1
+               return row_count
+
+           res_size = get_partition_columns(
+               self.get_db_name(inspector), schema, query_limit, offset
+           )
+           while res_size == query_limit:
+               offset = offset + query_limit
+               res_size = get_partition_columns(
+                   self.get_db_name(inspector), schema, query_limit, offset
+               )
+
            self.partiton_columns[schema] = partitions
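
One edge case of the while res_size == query_limit termination test: when the total row count is an exact multiple of query_limit, the last full page triggers one extra query that returns zero rows and ends the loop, so the result is still correct at the cost of a single extra round trip. An in-memory stand-in for the query (illustrative only) that shows the call pattern:

# Fake result set: 1,000 rows, i.e. exactly two full pages of 500.
data = list(range(1000))

def fake_page(limit: int, offset: int) -> list:
    return data[offset : offset + limit]

sizes = []
offset, limit = 0, 500
size = len(fake_page(limit, offset))
sizes.append(size)
while size == limit:
    offset += limit
    size = len(fake_page(limit, offset))
    sizes.append(size)

assert sizes == [500, 500, 0]  # the third, empty query is the terminator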