Mirror of https://github.com/datahub-project/datahub.git
fix(profile): bigquery - check for each table whether it is partitioned, to avoid hitting the table quota (#4074)
commit 622d7bfccb
parent 782e66f5cf
Author: Tamas Nemeth
@@ -97,6 +97,7 @@ where
     -- Filter out special partitions (https://cloud.google.com/bigquery/docs/partitioned-tables#date_timestamp_partitioned_tables)
     and p.partition_id not in ('__NULL__', '__UNPARTITIONED__')
     and STORAGE_TIER='ACTIVE'
+    and p.table_name= '{table}'
 group by
     c.table_catalog,
     c.table_schema,
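The new predicate p.table_name = '{table}' narrows the INFORMATION_SCHEMA lookup to a single table, so the template is now rendered once per table rather than paged per schema. A minimal sketch of how the template might be rendered, assuming made-up project, dataset, and table names (only the placeholder names project_id, schema, and table come from this diff):

# Illustration only: the identifier values below are hypothetical.
sql = BQ_GET_LATEST_PARTITION_TEMPLATE.format(
    project_id="my-project",
    schema="my_dataset",
    table="events",
)
# The rendered SQL selects only active, non-special partitions
# (not '__NULL__' or '__UNPARTITIONED__') for the one named table.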
@@ -108,8 +109,6 @@ order by
     c.table_schema,
     c.table_name,
     c.column_name
-limit {limit}
-offset {offset}
 """.strip()
 
 SHARDED_TABLE_REGEX = r"^(.+)[_](\d{4}|\d{6}|\d{8}|\d{10})$"
@@ -289,7 +288,6 @@ class BigQueryDatasetKey(ProjectIdKey):
 
 class BigQuerySource(SQLAlchemySource):
     config: BigQueryConfig
-    partiton_columns: Dict[str, Dict[str, BigQueryPartitionColumn]] = dict()
     maximum_shard_ids: Dict[str, str] = dict()
     lineage_metadata: Optional[Dict[str, Set[str]]] = None
 
@@ -486,50 +484,23 @@ class BigQuerySource(SQLAlchemySource):
                     lineage_map[destination_table_str].add(ref_table_str)
         return lineage_map
 
-    def get_latest_partitions_for_schema(self, schema: str) -> None:
-        query_limit: int = 500
-        offset: int = 0
-        url = self.config.get_sql_alchemy_url()
-        engine = create_engine(url, **self.config.options)
-        with engine.connect() as con:
-            inspector = inspect(con)
-            partitions = {}
-
-            def get_partition_columns(
-                project_id: str, schema: str, limit: int, offset: int
-            ) -> int:
-                sql = BQ_GET_LATEST_PARTITION_TEMPLATE.format(
-                    project_id=project_id,
-                    schema=schema,
-                    limit=limit,
-                    offset=offset,
-                )
-                result = con.execute(sql)
-                row_count: int = 0
-                for row in result:
-                    partition = BigQueryPartitionColumn(**row)
-                    partitions[partition.table_name] = partition
-                    row_count = row_count + 1
-                return row_count
-
-            res_size = get_partition_columns(
-                self.get_db_name(inspector), schema, query_limit, offset
-            )
-            while res_size == query_limit:
-                offset = offset + query_limit
-                res_size = get_partition_columns(
-                    self.get_db_name(inspector), schema, query_limit, offset
-                )
-
-            self.partiton_columns[schema] = partitions
-
-    def get_latest_partition(
-        self, schema: str, table: str
-    ) -> Optional[BigQueryPartitionColumn]:
-        if schema not in self.partiton_columns:
-            self.get_latest_partitions_for_schema(schema)
-
-        return self.partiton_columns[schema].get(table)
+    def get_latest_partition(
+        self, schema: str, table: str
+    ) -> Optional[BigQueryPartitionColumn]:
+        url = self.config.get_sql_alchemy_url()
+        engine = create_engine(url, **self.config.options)
+        with engine.connect() as con:
+            inspector = inspect(con)
+            sql = BQ_GET_LATEST_PARTITION_TEMPLATE.format(
+                project_id=self.get_db_name(inspector), schema=schema, table=table
+            )
+            result = con.execute(sql)
+            # Bigquery only supports one partition column
+            # https://stackoverflow.com/questions/62886213/adding-multiple-partitioned-columns-to-bigquery-table-from-sql-query
+            row = result.fetchone()
+            if row:
+                return BigQueryPartitionColumn(**row)
+            return None
 
     def get_shard_from_table(self, table: str) -> Tuple[str, Optional[str]]:
         match = re.search(SHARDED_TABLE_REGEX, table, re.IGNORECASE)
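Taken together, get_latest_partition now issues one small per-table query and reads a single row, instead of paging through every table in the schema and caching the results on the class. A hedged usage sketch, assuming an already-configured BigQuerySource instance named source, and assuming the returned BigQueryPartitionColumn exposes the table_name and partition_id columns selected by the query (all names below are hypothetical):

# Hypothetical usage; "source" is assumed to be a configured BigQuerySource.
partition = source.get_latest_partition(schema="my_dataset", table="events")
if partition is not None:
    # partition_id is assumed to follow BigQuery's partition-id naming,
    # e.g. "20220131" for a daily-partitioned table.
    print(partition.table_name, partition.partition_id)
else:
    print("events is not partitioned, or has no active partitions")

Because BigQuery allows only a single partition column per table, as the inline comment in the new code notes, fetchone() is sufficient here.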
@@ -627,7 +598,7 @@ WHERE
 
         (project_id, schema, table) = dataset_name.split(".")
         if not self.is_latest_shard(project_id=project_id, table=table, schema=schema):
-            logger.warning(
+            logger.debug(
                 f"{dataset_name} is sharded but not the latest shard, skipping..."
             )
             return False