diff --git a/metadata-ingestion/src/datahub/utilities/sqlalchemy_query_combiner.py b/metadata-ingestion/src/datahub/utilities/sqlalchemy_query_combiner.py index eab8e76e3e..3c8b54f3d2 100644 --- a/metadata-ingestion/src/datahub/utilities/sqlalchemy_query_combiner.py +++ b/metadata-ingestion/src/datahub/utilities/sqlalchemy_query_combiner.py @@ -23,6 +23,8 @@ logger: logging.Logger = logging.getLogger(__name__) P = ParamSpec("P") +MAX_QUERIES_TO_COMBINE_AT_ONCE = 40 + # We need to make sure that only one query combiner attempts to patch # the SQLAlchemy execute method at a time so that they don't interfere. @@ -286,6 +288,11 @@ class SQLAlchemyQueryCombiner: full_queue = self._get_queue(main_greenlet) pending_queue = {k: v for k, v in full_queue.items() if not v.done} + + pending_queue = dict( + itertools.islice(pending_queue.items(), MAX_QUERIES_TO_COMBINE_AT_ONCE) + ) + if pending_queue: queue_item = next(iter(pending_queue.values()))