fix(bigquery): add rate limiting for api calls made (#4967)

Aseem Bansal 2022-05-23 20:14:36 +05:30 committed by GitHub
parent 0f7ff79368
commit b5e1ed739c
6 changed files with 49 additions and 12 deletions
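
For context, the commit pulls in the `ratelimiter` package and wraps the BigQuery/Cloud Logging API calls in its `RateLimiter` context manager, which throttles so that at most `max_calls` blocks are entered per `period` seconds. A minimal sketch of the library's context-manager API (the numbers are illustrative and not the commit's exact usage, which constructs the limiter next to each call):

    import time

    from ratelimiter import RateLimiter

    # Allow at most 2 entries into the block per 5-second window (illustrative values).
    limiter = RateLimiter(max_calls=2, period=5)

    for i in range(4):
        # Once the window's budget is used up, entering the block sleeps until it frees.
        with limiter:
            print(f"call {i} at {time.monotonic():.1f}s")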

View File

@@ -50,6 +50,7 @@ framework_common = {
     "termcolor>=1.0.0",
     "types-termcolor>=1.0.0",
     "psutil>=5.8.0",
+    "ratelimiter",
     # Markupsafe breaking change broke Jinja and some other libs
     # Pinning it to a version which works even though we are not using explicitly
     # https://github.com/aws/aws-sam-cli/issues/3661

View File

@@ -15,6 +15,7 @@ import sqlalchemy_bigquery
 from dateutil.relativedelta import relativedelta
 from google.cloud.bigquery import Client as BigQueryClient
 from google.cloud.logging_v2.client import Client as GCPLoggingClient
+from ratelimiter import RateLimiter
 from sqlalchemy import create_engine, inspect
 from sqlalchemy.engine.reflection import Inspector
@@ -456,9 +457,15 @@ class BigQuerySource(SQLAlchemySource):
             f"Start loading log entries from BigQuery start_time={start_time} and end_time={end_time}"
         )
         for client in clients:
-            entries = client.list_entries(
-                filter_=filter, page_size=self.config.log_page_size
-            )
+            if self.config.rate_limit:
+                with RateLimiter(max_calls=self.config.requests_per_min, period=60):
+                    entries = client.list_entries(
+                        filter_=filter, page_size=self.config.log_page_size
+                    )
+            else:
+                entries = client.list_entries(
+                    filter_=filter, page_size=self.config.log_page_size
+                )
             for entry in entries:
                 self.report.num_total_log_entries += 1
                 yield entry
@@ -519,7 +526,11 @@
             logger.info(
                 f"Finished loading log entries from BigQueryAuditMetadata in {dataset}"
             )
-            yield from query_job
+            if self.config.rate_limit:
+                with RateLimiter(max_calls=self.config.requests_per_min, period=60):
+                    yield from query_job
+            else:
+                yield from query_job

         # Currently we only parse JobCompleted events but in future we would want to parse other
         # events to also create field level lineage.

View File

@@ -15,6 +15,7 @@ import cachetools
 from google.cloud.bigquery import Client as BigQueryClient
 from google.cloud.logging_v2.client import Client as GCPLoggingClient
 from more_itertools import partition
+from ratelimiter import RateLimiter

 import datahub.emitter.mce_builder as builder
 from datahub.configuration.time_window_config import get_time_bucket
@@ -817,7 +818,11 @@ class BigQueryUsageSource(Source):
             logger.info(
                 f"Finished loading log entries from BigQueryAuditMetadata in {dataset}"
             )
-            yield from query_job
+            if self.config.rate_limit:
+                with RateLimiter(max_calls=self.config.requests_per_min, period=60):
+                    yield from query_job
+            else:
+                yield from query_job

     def _get_entry_timestamp(
         self, entry: Union[AuditLogEntry, BigQueryAuditMetadata]
@@ -884,11 +889,16 @@
         ] = list()
         for client in clients:
             try:
-                list_entries: Iterable[
-                    Union[AuditLogEntry, BigQueryAuditMetadata]
-                ] = client.list_entries(
-                    filter_=filter, page_size=self.config.log_page_size
-                )
+                list_entries: Iterable[Union[AuditLogEntry, BigQueryAuditMetadata]]
+                if self.config.rate_limit:
+                    with RateLimiter(max_calls=self.config.requests_per_min, period=60):
+                        list_entries = client.list_entries(
+                            filter_=filter, page_size=self.config.log_page_size
+                        )
+                else:
+                    list_entries = client.list_entries(
+                        filter_=filter, page_size=self.config.log_page_size
+                    )
                 list_entry_generators_across_clients.append(list_entries)
             except Exception as e:
                 logger.warning(
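
Both sources repeat the same gate around the API call. Purely as an illustration of the pattern (not part of this commit), it could be factored into a small helper; `maybe_rate_limited` and the names in the usage comment are hypothetical:

    from typing import Callable, TypeVar

    from ratelimiter import RateLimiter

    T = TypeVar("T")


    def maybe_rate_limited(rate_limit: bool, requests_per_min: int, call: Callable[[], T]) -> T:
        """Hypothetical helper: run `call` under a per-minute limiter when enabled."""
        if rate_limit:
            with RateLimiter(max_calls=requests_per_min, period=60):
                return call()
        return call()


    # e.g. entries = maybe_rate_limited(
    #     config.rate_limit,
    #     config.requests_per_min,
    #     lambda: client.list_entries(filter_=filter, page_size=config.log_page_size),
    # )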

View File

@@ -0,0 +1,13 @@
+import pydantic
+
+from datahub.configuration.common import ConfigModel
+
+
+class BigQueryBaseConfig(ConfigModel):
+    rate_limit: bool = pydantic.Field(
+        default=False, description="Should we rate limit requests made to the API."
+    )
+    requests_per_min: int = pydantic.Field(
+        default=60,
+        description="Used to control number of API calls made per min. Only used when `rate_limit` is set to `True`.",
+    )
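
The new `BigQueryBaseConfig` is a plain pydantic model, so the two options can be exercised directly; a small sketch using the module path shown in the imports further down (`datahub.ingestion.source_config.bigquery`):

    from datahub.ingestion.source_config.bigquery import BigQueryBaseConfig

    # Rate limiting is off by default.
    assert BigQueryBaseConfig().rate_limit is False

    # When enabled, the log-entry API calls are capped at `requests_per_min` per 60-second window.
    config = BigQueryBaseConfig.parse_obj({"rate_limit": True, "requests_per_min": 30})
    assert config.requests_per_min == 30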

View File

@@ -8,12 +8,13 @@ import pydantic
 from datahub.configuration.common import ConfigurationError
 from datahub.configuration.time_window_config import BaseTimeWindowConfig
 from datahub.ingestion.source.sql.sql_common import SQLAlchemyConfig
+from datahub.ingestion.source_config.bigquery import BigQueryBaseConfig
 from datahub.ingestion.source_config.usage.bigquery_usage import BigQueryCredential

 logger = logging.getLogger(__name__)


-class BigQueryConfig(BaseTimeWindowConfig, SQLAlchemyConfig):
+class BigQueryConfig(BigQueryBaseConfig, BaseTimeWindowConfig, SQLAlchemyConfig):
     scheme: str = "bigquery"
     project_id: Optional[str] = pydantic.Field(
         default=None,
View File

@@ -11,6 +11,7 @@ from datahub.configuration import ConfigModel
 from datahub.configuration.common import AllowDenyPattern, ConfigurationError
 from datahub.configuration.source_common import DatasetSourceConfigBase
 from datahub.ingestion.source.usage.usage_common import BaseUsageConfig
+from datahub.ingestion.source_config.bigquery import BigQueryBaseConfig

 logger = logging.getLogger(__name__)
@@ -57,7 +58,7 @@ class BigQueryCredential(ConfigModel):
         return fp.name


-class BigQueryUsageConfig(DatasetSourceConfigBase, BaseUsageConfig):
+class BigQueryUsageConfig(BigQueryBaseConfig, DatasetSourceConfigBase, BaseUsageConfig):
     projects: Optional[List[str]] = pydantic.Field(
         default=None,
         description="List of project ids to ingest usage from. If not specified, will infer from environment.",