feat(ingest): additional limits on ingestProposalBatch (#12130)

Harshal Sheth 2024-12-14 10:09:04 -05:00 committed by GitHub
parent ab15fb92d2
commit f9ca305600
2 changed files with 29 additions and 5 deletions


@@ -46,8 +46,18 @@ _DEFAULT_RETRY_MAX_TIMES = int(
     os.getenv("DATAHUB_REST_EMITTER_DEFAULT_RETRY_MAX_TIMES", "4")
 )
-# The limit is 16mb. We will use a max of 15mb to have some space for overhead.
-_MAX_BATCH_INGEST_PAYLOAD_SIZE = 15 * 1024 * 1024
+# The limit is 16mb. We will use a max of 15mb to have some space
+# for overhead like request headers.
+# This applies to pretty much all calls to GMS.
+INGEST_MAX_PAYLOAD_BYTES = 15 * 1024 * 1024
+
+# This limit is somewhat arbitrary. All GMS endpoints will timeout
+# and return a 500 if processing takes too long. To avoid sending
+# too much to the backend and hitting a timeout, we try to limit
+# the number of MCPs we send in a batch.
+BATCH_INGEST_MAX_PAYLOAD_LENGTH = int(
+    os.getenv("DATAHUB_REST_EMITTER_BATCH_MAX_PAYLOAD_LENGTH", 200)
+)
 
 class DataHubRestEmitter(Closeable, Emitter):
@@ -290,11 +300,14 @@ class DataHubRestEmitter(Closeable, Emitter):
         # As a safety mechanism, we need to make sure we don't exceed the max payload size for GMS.
         # If we will exceed the limit, we need to break it up into chunks.
         mcp_obj_chunks: List[List[str]] = []
-        current_chunk_size = _MAX_BATCH_INGEST_PAYLOAD_SIZE
+        current_chunk_size = INGEST_MAX_PAYLOAD_BYTES
         for mcp_obj in mcp_objs:
             mcp_obj_size = len(json.dumps(mcp_obj))
-            if mcp_obj_size + current_chunk_size > _MAX_BATCH_INGEST_PAYLOAD_SIZE:
+            if (
+                mcp_obj_size + current_chunk_size > INGEST_MAX_PAYLOAD_BYTES
+                or len(mcp_obj_chunks[-1]) >= BATCH_INGEST_MAX_PAYLOAD_LENGTH
+            ):
                 mcp_obj_chunks.append([])
                 current_chunk_size = 0
             mcp_obj_chunks[-1].append(mcp_obj)
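For reference, below is a minimal, self-contained sketch of the chunking behavior this hunk produces in the REST emitter (the second file imports it as datahub.emitter.rest_emitter). The standalone chunk_mcps wrapper, the sample input, and the current_chunk_size += mcp_obj_size accumulation are illustrative assumptions not shown in the hunk; the constants and the split condition mirror the diff.

import json
from typing import Any, Dict, List

# Mirrors the constants added in the hunk above.
INGEST_MAX_PAYLOAD_BYTES = 15 * 1024 * 1024
BATCH_INGEST_MAX_PAYLOAD_LENGTH = 200


def chunk_mcps(mcp_objs: List[Dict[str, Any]]) -> List[List[Dict[str, Any]]]:
    """Split serialized MCPs into chunks bounded by both byte size and item count."""
    chunks: List[List[Dict[str, Any]]] = []
    # Starting "full" forces the first MCP to open a fresh chunk, so chunks[-1]
    # is never evaluated on an empty list (the `or` short-circuits).
    current_chunk_size = INGEST_MAX_PAYLOAD_BYTES
    for mcp_obj in mcp_objs:
        mcp_obj_size = len(json.dumps(mcp_obj))
        if (
            mcp_obj_size + current_chunk_size > INGEST_MAX_PAYLOAD_BYTES
            or len(chunks[-1]) >= BATCH_INGEST_MAX_PAYLOAD_LENGTH
        ):
            chunks.append([])
            current_chunk_size = 0
        chunks[-1].append(mcp_obj)
        current_chunk_size += mcp_obj_size  # assumed; not visible in the hunk
    return chunks


if __name__ == "__main__":
    fake_mcps = [{"entityUrn": f"urn:li:dataset:{i}", "aspect": "x" * 100} for i in range(450)]
    print([len(c) for c in chunk_mcps(fake_mcps)])  # -> [200, 200, 50]

Both limits matter here: the byte cap keeps each request under the 16mb GMS payload limit, while the new count cap keeps individual batch calls small enough that GMS does not time out and return a 500.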


@@ -18,7 +18,10 @@ from datahub.configuration.common import (
 )
 from datahub.emitter.mcp import MetadataChangeProposalWrapper
 from datahub.emitter.mcp_builder import mcps_from_mce
-from datahub.emitter.rest_emitter import DataHubRestEmitter
+from datahub.emitter.rest_emitter import (
+    BATCH_INGEST_MAX_PAYLOAD_LENGTH,
+    DataHubRestEmitter,
+)
 from datahub.ingestion.api.common import RecordEnvelope, WorkUnit
 from datahub.ingestion.api.sink import (
     NoopWriteCallback,
@@ -71,6 +74,14 @@ class DatahubRestSinkConfig(DatahubClientConfig):
     # Only applies in async batch mode.
     max_per_batch: pydantic.PositiveInt = 100
 
+    @pydantic.validator("max_per_batch", always=True)
+    def validate_max_per_batch(cls, v):
+        if v > BATCH_INGEST_MAX_PAYLOAD_LENGTH:
+            raise ValueError(
+                f"max_per_batch must be less than or equal to {BATCH_INGEST_MAX_PAYLOAD_LENGTH}"
+            )
+        return v
 
 @dataclasses.dataclass
 class DataHubRestSinkReport(SinkReport):
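A hedged usage sketch of the new validator follows. It assumes the sink config lives at datahub.ingestion.sink.datahub_rest and that passing server alone is enough to construct it; both are inferences from the class names and imports in the hunks, not confirmed by the diff.

# Illustrative only; module path and constructor arguments are assumptions.
import pydantic
from datahub.ingestion.sink.datahub_rest import DatahubRestSinkConfig

try:
    # Larger than BATCH_INGEST_MAX_PAYLOAD_LENGTH (200 by default), so the
    # new validator should reject it at construction time.
    DatahubRestSinkConfig(server="http://localhost:8080", max_per_batch=500)
except pydantic.ValidationError as err:
    print(err)

# Values at or below the limit pass through unchanged.
ok = DatahubRestSinkConfig(server="http://localhost:8080", max_per_batch=150)
print(ok.max_per_batch)  # 150

Per the first hunk, the cap itself can be tuned through the DATAHUB_REST_EMITTER_BATCH_MAX_PAYLOAD_LENGTH environment variable, which BATCH_INGEST_MAX_PAYLOAD_LENGTH reads at import time.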