diff --git a/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py b/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py index d9bd7e52df..6c92ad8653 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py +++ b/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py @@ -24,6 +24,7 @@ from typing import ( ) import sqlalchemy as sa +import sqlalchemy.sql.compiler from great_expectations.core.util import convert_to_json_serializable from great_expectations.data_context import BaseDataContext from great_expectations.data_context.types.base import ( @@ -58,6 +59,7 @@ from datahub.metadata.schema_classes import ( from datahub.telemetry import stats, telemetry from datahub.utilities.perf_timer import PerfTimer from datahub.utilities.sqlalchemy_query_combiner import ( + IS_SQLALCHEMY_1_4, SQLAlchemyQueryCombiner, get_query_columns, ) @@ -683,6 +685,21 @@ class DatahubGEProfiler: # make the threading code work correctly. As such, we need to make sure we've # got an engine here. self.base_engine = conn.engine + + if IS_SQLALCHEMY_1_4: + # SQLAlchemy 1.4 added a statement "linter", which issues warnings about cartesian products in SELECT statements. + # Changelog: https://docs.sqlalchemy.org/en/14/changelog/migration_14.html#change-4737. + # Code: https://github.com/sqlalchemy/sqlalchemy/blob/2f91dd79310657814ad28b6ef64f91fff7a007c9/lib/sqlalchemy/sql/compiler.py#L549 + # + # The query combiner does indeed produce queries with cartesian products, but they are + # safe because each "FROM" clause only returns one row, so the cartesian product + # is also always a single row. As such, we disable the linter here. 
+ + # Modified from https://github.com/sqlalchemy/sqlalchemy/blob/2f91dd79310657814ad28b6ef64f91fff7a007c9/lib/sqlalchemy/engine/create.py#L612 + self.base_engine.dialect.compiler_linting &= ( # type: ignore[attr-defined] + ~sqlalchemy.sql.compiler.COLLECT_CARTESIAN_PRODUCTS # type: ignore[attr-defined] + ) + self.platform = platform @contextlib.contextmanager diff --git a/metadata-ingestion/src/datahub/utilities/sqlalchemy_query_combiner.py b/metadata-ingestion/src/datahub/utilities/sqlalchemy_query_combiner.py index 29a51007b1..31d10a09c2 100644 --- a/metadata-ingestion/src/datahub/utilities/sqlalchemy_query_combiner.py +++ b/metadata-ingestion/src/datahub/utilities/sqlalchemy_query_combiner.py @@ -13,6 +13,7 @@ import greenlet import sqlalchemy import sqlalchemy.engine import sqlalchemy.sql +from packaging import version from sqlalchemy.engine import Connection from sqlalchemy.orm.exc import MultipleResultsFound, NoResultFound @@ -20,6 +21,12 @@ from datahub.ingestion.api.report import Report logger: logging.Logger = logging.getLogger(__name__) +# The type annotations for SA 1.3.x don't have the __version__ attribute, +# so we need to ignore the error here. +SQLALCHEMY_VERSION = sqlalchemy.__version__ # type: ignore[attr-defined] +IS_SQLALCHEMY_1_4 = version.parse(SQLALCHEMY_VERSION) >= version.parse("1.4.0") + + MAX_QUERIES_TO_COMBINE_AT_ONCE = 40 @@ -333,7 +340,11 @@ class SQLAlchemyQueryCombiner: # Extract the results into a result for each query. index = 0 for _, query_future in pending_queue.items(): - cols = query_future.query.columns + query = query_future.query + if IS_SQLALCHEMY_1_4: + # SQLAlchemy 1.4 emits a warning when .columns is read directly from a select, so wrap the query with subquery() first. + query = query.subquery() # type: ignore + cols = query.columns data = {} for col in cols: