mirror of
https://github.com/datahub-project/datahub.git
synced 2025-10-11 08:54:00 +00:00
build(ingest): remove ratelimiter dependency (#9008)
This commit is contained in:
parent
78b342f441
commit
c81a339bfc
@ -38,7 +38,6 @@ framework_common = {
|
||||
"progressbar2",
|
||||
"termcolor>=1.0.0",
|
||||
"psutil>=5.8.0",
|
||||
"ratelimiter",
|
||||
"Deprecated",
|
||||
"humanfriendly",
|
||||
"packaging",
|
||||
|
@ -4,7 +4,6 @@ from typing import Callable, Iterable, List, Optional
|
||||
|
||||
from google.cloud import bigquery
|
||||
from google.cloud.logging_v2.client import Client as GCPLoggingClient
|
||||
from ratelimiter import RateLimiter
|
||||
|
||||
from datahub.ingestion.source.bigquery_v2.bigquery_audit import (
|
||||
AuditLogEntry,
|
||||
@ -17,6 +16,7 @@ from datahub.ingestion.source.bigquery_v2.common import (
|
||||
BQ_DATE_SHARD_FORMAT,
|
||||
BQ_DATETIME_FORMAT,
|
||||
)
|
||||
from datahub.utilities.ratelimiter import RateLimiter
|
||||
|
||||
logger: logging.Logger = logging.getLogger(__name__)
|
||||
|
||||
|
56
metadata-ingestion/src/datahub/utilities/ratelimiter.py
Normal file
56
metadata-ingestion/src/datahub/utilities/ratelimiter.py
Normal file
@ -0,0 +1,56 @@
|
||||
import collections
|
||||
import threading
|
||||
import time
|
||||
from contextlib import AbstractContextManager
|
||||
from typing import Any, Deque
|
||||
|
||||
|
||||
# Modified version of https://github.com/RazerM/ratelimiter/blob/master/ratelimiter/_sync.py
|
||||
class RateLimiter(AbstractContextManager):
    """Context manager that throttles how often a guarded operation may run.

    Each ``with limiter:`` block records a timestamp when it exits. Once
    ``max_calls`` timestamps are held within the current window, the next
    entry sleeps before the guarded body is allowed to start.

    The internal lock is held while sleeping, so concurrent entrants are
    serialized behind the one that is waiting.
    """

    def __init__(self, max_calls: int, period: float = 1.0) -> None:
        """Create a limiter allowing ``max_calls`` operations per ``period``.

        Args:
            max_calls: Number of recorded calls after which entries start
                to be delayed. Must be > 0.
            period: Window length in seconds (may be fractional). Must be > 0.

        Raises:
            ValueError: If ``period`` or ``max_calls`` is not positive.
        """
        if period <= 0:
            raise ValueError("Rate limiting period should be > 0")
        if max_calls <= 0:
            raise ValueError("Rate limiting number of calls should be > 0")

        # A deque is used for its constant-time removal from the front, not
        # for its maxlen feature. Holds time.monotonic() readings, oldest first.
        self.calls: Deque[float] = collections.deque()

        self.period = period
        self.max_calls = max_calls
        self._lock = threading.Lock()

    def __enter__(self) -> "RateLimiter":
        """Block, if needed, until another call is allowed; return ``self``."""
        with self._lock:
            # The window currently holds the maximum number of calls: wait for
            # the part of the period that the recorded calls do not yet span.
            if len(self.calls) >= self.max_calls:
                sleeptime = self.period - self._timespan
                if sleeptime > 0:
                    time.sleep(sleeptime)
            return self

    def __exit__(self, exc_type: Any, exc: Any, traceback: Any) -> None:
        """Record the finished call and slide the window forward.

        Returns ``None``, so an exception raised inside the ``with`` body is
        not suppressed; the call is recorded either way.
        """
        with self._lock:
            # Timestamp of the operation that just finished. A monotonic clock
            # is used because only differences between readings matter and
            # they must not be affected by system clock adjustments.
            self.calls.append(time.monotonic())

            # Drop the oldest timestamps until the remaining ones span less
            # than one period. A single remaining entry has a span of 0, which
            # is always < period, so the deque never becomes empty here.
            while self._timespan >= self.period:
                self.calls.popleft()

    @property
    def _timespan(self) -> float:
        """Seconds between the oldest and newest recorded call (0.0 if none)."""
        if not self.calls:
            return 0.0
        return self.calls[-1] - self.calls[0]
|
20
metadata-ingestion/tests/unit/utilities/test_ratelimiter.py
Normal file
20
metadata-ingestion/tests/unit/utilities/test_ratelimiter.py
Normal file
@ -0,0 +1,20 @@
|
||||
from collections import defaultdict
|
||||
from datetime import datetime
|
||||
from typing import Dict
|
||||
|
||||
from datahub.utilities.ratelimiter import RateLimiter
|
||||
|
||||
|
||||
def test_rate_is_limited():
    """Check that RateLimiter spreads a burst of calls over several seconds.

    Every call is counted under the wall-clock second in which it ran. The
    test then checks the number of distinct seconds used, the per-second
    maximum, and that no call was lost.
    """
    import math

    MAX_CALLS_PER_SEC = 5
    TOTAL_CALLS = 18
    # Maps a whole-second timestamp to the number of calls made in it.
    actual_calls: Dict[float, int] = defaultdict(lambda: 0)

    ratelimiter = RateLimiter(max_calls=MAX_CALLS_PER_SEC, period=1)
    for _ in range(TOTAL_CALLS):
        with ratelimiter:
            actual_calls[datetime.now().replace(microsecond=0).timestamp()] += 1

    # A partially filled last second still counts as a bucket, so round up;
    # round() would under-count whenever the fractional part is below 0.5.
    assert len(actual_calls) == math.ceil(TOTAL_CALLS / MAX_CALLS_PER_SEC)
    assert all(calls <= MAX_CALLS_PER_SEC for calls in actual_calls.values())
    assert sum(actual_calls.values()) == TOTAL_CALLS
|
Loading…
x
Reference in New Issue
Block a user