diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/bigquery.py b/metadata-ingestion/src/datahub/ingestion/source/sql/bigquery.py
index acc98559b2..7e2afe1306 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/sql/bigquery.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/sql/bigquery.py
@@ -97,6 +97,7 @@ where
 -- Filter out special partitions (https://cloud.google.com/bigquery/docs/partitioned-tables#date_timestamp_partitioned_tables)
 and p.partition_id not in ('__NULL__', '__UNPARTITIONED__')
 and STORAGE_TIER='ACTIVE'
+and p.table_name = '{table}'
 group by
 c.table_catalog,
 c.table_schema,
@@ -108,8 +109,6 @@ order by
 c.table_schema,
 c.table_name,
 c.column_name
-limit {limit}
-offset {offset}
 """.strip()
 
 SHARDED_TABLE_REGEX = r"^(.+)[_](\d{4}|\d{6}|\d{8}|\d{10})$"
@@ -289,7 +288,6 @@ class BigQueryDatasetKey(ProjectIdKey):
 class BigQuerySource(SQLAlchemySource):
     config: BigQueryConfig
 
-    partiton_columns: Dict[str, Dict[str, BigQueryPartitionColumn]] = dict()
     maximum_shard_ids: Dict[str, str] = dict()
     lineage_metadata: Optional[Dict[str, Set[str]]] = None
 
@@ -486,50 +484,23 @@ class BigQuerySource(SQLAlchemySource):
                 lineage_map[destination_table_str].add(ref_table_str)
         return lineage_map
 
-    def get_latest_partitions_for_schema(self, schema: str) -> None:
-        query_limit: int = 500
-        offset: int = 0
+    def get_latest_partition(
+        self, schema: str, table: str
+    ) -> Optional[BigQueryPartitionColumn]:
         url = self.config.get_sql_alchemy_url()
         engine = create_engine(url, **self.config.options)
         with engine.connect() as con:
             inspector = inspect(con)
-            partitions = {}
-
-            def get_partition_columns(
-                project_id: str, schema: str, limit: int, offset: int
-            ) -> int:
-                sql = BQ_GET_LATEST_PARTITION_TEMPLATE.format(
-                    project_id=project_id,
-                    schema=schema,
-                    limit=limit,
-                    offset=offset,
-                )
-                result = con.execute(sql)
-                row_count: int = 0
-                for row in result:
-                    partition = BigQueryPartitionColumn(**row)
-                    partitions[partition.table_name] = partition
-                    row_count = row_count + 1
-                return row_count
-
-            res_size = get_partition_columns(
-                self.get_db_name(inspector), schema, query_limit, offset
+            sql = BQ_GET_LATEST_PARTITION_TEMPLATE.format(
+                project_id=self.get_db_name(inspector), schema=schema, table=table
             )
-            while res_size == query_limit:
-                offset = offset + query_limit
-                res_size = get_partition_columns(
-                    self.get_db_name(inspector), schema, query_limit, offset
-                )
-
-            self.partiton_columns[schema] = partitions
-
-    def get_latest_partition(
-        self, schema: str, table: str
-    ) -> Optional[BigQueryPartitionColumn]:
-        if schema not in self.partiton_columns:
-            self.get_latest_partitions_for_schema(schema)
-
-        return self.partiton_columns[schema].get(table)
+            result = con.execute(sql)
+            # BigQuery only supports one partition column
+            # https://stackoverflow.com/questions/62886213/adding-multiple-partitioned-columns-to-bigquery-table-from-sql-query
+            row = result.fetchone()
+            if row:
+                return BigQueryPartitionColumn(**row)
+            return None
 
     def get_shard_from_table(self, table: str) -> Tuple[str, Optional[str]]:
         match = re.search(SHARDED_TABLE_REGEX, table, re.IGNORECASE)
@@ -627,7 +598,7 @@ WHERE
 
         (project_id, schema, table) = dataset_name.split(".")
         if not self.is_latest_shard(project_id=project_id, table=table, schema=schema):
-            logger.warning(
+            logger.debug(
                 f"{dataset_name} is sharded but not the latest shard, skipping..."
            )
             return False
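
Note on the change above: get_latest_partition now issues one INFORMATION_SCHEMA query per table (filtered by p.table_name) instead of prefetching and caching partition columns for a whole schema through limit/offset pagination. The full text of BQ_GET_LATEST_PARTITION_TEMPLATE is not reproduced in this diff, so the following is only a minimal, self-contained sketch of the same per-table lookup pattern; LATEST_PARTITION_SQL, get_latest_partition_id, and the bigquery:// engine URL are illustrative assumptions, not code from the source.

from typing import Optional

from sqlalchemy import create_engine, text

# Simplified stand-in for BQ_GET_LATEST_PARTITION_TEMPLATE (whose full text is
# not shown in the diff): scan INFORMATION_SCHEMA.PARTITIONS for a single table
# rather than paging through every table in the schema.
LATEST_PARTITION_SQL = """
select table_name, partition_id
from `{project_id}.{schema}.INFORMATION_SCHEMA.PARTITIONS`
where table_name = '{table}'
-- Filter out special partitions, as the template above does
and partition_id not in ('__NULL__', '__UNPARTITIONED__')
and storage_tier = 'ACTIVE'
order by partition_id desc
limit 1
""".strip()


def get_latest_partition_id(project_id: str, schema: str, table: str) -> Optional[str]:
    # Assumes a BigQuery SQLAlchemy dialect (e.g. pybigquery) is installed and
    # application-default credentials are available.
    engine = create_engine(f"bigquery://{project_id}")
    with engine.connect() as con:
        row = con.execute(
            text(LATEST_PARTITION_SQL.format(project_id=project_id, schema=schema, table=table))
        ).fetchone()
        # A BigQuery table has at most one partition column, so a single row
        # (the largest partition_id, e.g. the latest YYYYMMDD suffix) suffices.
        return row.partition_id if row else None

Design-wise, the per-table filter pushes the work into BigQuery itself: the paginated while-loop and the per-schema cache (partiton_columns) disappear, and because a BigQuery table can have only one partition column, fetchone() on the filtered query is enough to recover the latest partition.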