diff --git a/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py b/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py index c415e4fc42..f45517b1f0 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py +++ b/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py @@ -1,3 +1,4 @@ +import collections import concurrent.futures import contextlib import dataclasses @@ -747,7 +748,7 @@ class DatahubGEProfiler: "great_expectations.dataset.sqlalchemy_dataset.SqlAlchemyDataset._get_column_quantiles_bigquery", _get_column_quantiles_bigquery_patch, ): - async_profiles = [ + async_profiles = collections.deque( async_executor.submit( self._generate_profile_from_request, query_combiner, @@ -756,12 +757,13 @@ class DatahubGEProfiler: profiler_args=profiler_args, ) for request in requests - ] + ) # Avoid using as_completed so that the results are yielded in the # same order as the requests. # for async_profile in concurrent.futures.as_completed(async_profiles): - for async_profile in async_profiles: + while len(async_profiles) > 0: + async_profile = async_profiles.popleft() yield async_profile.result() total_time_taken = timer.elapsed_seconds()