From df2cb94ed88f6a0437f8fd06eb147f0f32fd81cd Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Thu, 16 Dec 2021 20:34:46 -0500 Subject: [PATCH] feat(ingest): profiling add upper bound on combined query size (#3762) --- .../src/datahub/utilities/sqlalchemy_query_combiner.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/metadata-ingestion/src/datahub/utilities/sqlalchemy_query_combiner.py b/metadata-ingestion/src/datahub/utilities/sqlalchemy_query_combiner.py index eab8e76e3e..3c8b54f3d2 100644 --- a/metadata-ingestion/src/datahub/utilities/sqlalchemy_query_combiner.py +++ b/metadata-ingestion/src/datahub/utilities/sqlalchemy_query_combiner.py @@ -23,6 +23,8 @@ logger: logging.Logger = logging.getLogger(__name__) P = ParamSpec("P") +MAX_QUERIES_TO_COMBINE_AT_ONCE = 40 + # We need to make sure that only one query combiner attempts to patch # the SQLAlchemy execute method at a time so that they don't interfere. @@ -286,6 +288,11 @@ class SQLAlchemyQueryCombiner: full_queue = self._get_queue(main_greenlet) pending_queue = {k: v for k, v in full_queue.items() if not v.done} + + pending_queue = dict( + itertools.islice(pending_queue.items(), MAX_QUERIES_TO_COMBINE_AT_ONCE) + ) + if pending_queue: queue_item = next(iter(pending_queue.values()))