diff --git a/metadata-ingestion/src/datahub/ingestion/api/report.py b/metadata-ingestion/src/datahub/ingestion/api/report.py
index e2efdba1fd..ad9834b830 100644
--- a/metadata-ingestion/src/datahub/ingestion/api/report.py
+++ b/metadata-ingestion/src/datahub/ingestion/api/report.py
@@ -1,15 +1,26 @@
 import json
 import pprint
+import sys
 from dataclasses import dataclass
+from typing import Dict
+
+# The sort_dicts option was added in Python 3.8.
+if sys.version_info >= (3, 8):
+    PPRINT_OPTIONS = {"sort_dicts": False}
+else:
+    PPRINT_OPTIONS: Dict = {}
 
 
 @dataclass
 class Report:
     def as_obj(self) -> dict:
-        return self.__dict__
+        return {
+            key: value.as_obj() if hasattr(value, "as_obj") else value
+            for (key, value) in self.__dict__.items()
+        }
 
     def as_string(self) -> str:
-        return pprint.pformat(self.as_obj(), width=150)
+        return pprint.pformat(self.as_obj(), width=150, **PPRINT_OPTIONS)
 
     def as_json(self) -> str:
         return json.dumps(self.as_obj())
diff --git a/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py b/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py
index 18b07d13b6..8659c3038b 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py
@@ -722,6 +722,8 @@ class DatahubGEProfiler:
                 f"Profiling {len(requests)} table(s) finished in {(timer.elapsed_seconds()):.3f} seconds"
             )
 
+        self.report.report_from_query_combiner(query_combiner.report)
+
     def _generate_profile_from_request(
         self,
         query_combiner: SQLAlchemyQueryCombiner,
diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py
index 4fbf5f58e3..28af83929c 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py
@@ -47,6 +47,7 @@ from datahub.metadata.com.linkedin.pegasus2avro.schema import (
     TimeTypeClass,
 )
 from datahub.metadata.schema_classes import ChangeTypeClass, DatasetPropertiesClass
+from datahub.utilities.sqlalchemy_query_combiner import SQLAlchemyQueryCombinerReport
 
 if TYPE_CHECKING:
     from datahub.ingestion.source.ge_data_profiler import (
@@ -124,6 +125,8 @@ class SQLSourceReport(SourceReport):
     views_scanned: int = 0
     filtered: List[str] = field(default_factory=list)
 
+    query_combiner: Optional[SQLAlchemyQueryCombinerReport] = None
+
     def report_entity_scanned(self, name: str, ent_type: str = "table") -> None:
         """
         Entity could be a view or a table
@@ -138,6 +141,11 @@ class SQLSourceReport(SourceReport):
     def report_dropped(self, ent_name: str) -> None:
         self.filtered.append(ent_name)
 
+    def report_from_query_combiner(
+        self, query_combiner_report: SQLAlchemyQueryCombinerReport
+    ) -> None:
+        self.query_combiner = query_combiner_report
+
 
 class SQLAlchemyConfig(ConfigModel):
     env: str = DEFAULT_ENV
diff --git a/metadata-ingestion/src/datahub/utilities/sqlalchemy_query_combiner.py b/metadata-ingestion/src/datahub/utilities/sqlalchemy_query_combiner.py
index d9eff6afee..eab8e76e3e 100644
--- a/metadata-ingestion/src/datahub/utilities/sqlalchemy_query_combiner.py
+++ b/metadata-ingestion/src/datahub/utilities/sqlalchemy_query_combiner.py
@@ -17,6 +17,8 @@ from sqlalchemy.engine import Connection
 from sqlalchemy.orm.exc import MultipleResultsFound, NoResultFound
 from typing_extensions import ParamSpec
 
+from datahub.ingestion.api.report import Report
+
 logger: logging.Logger = logging.getLogger(__name__)
 
 P = ParamSpec("P")
@@ -113,6 +115,17 @@ def get_query_columns(query: Any) -> List[Any]:
     return list(query.columns)
 
 
+@dataclasses.dataclass
+class SQLAlchemyQueryCombinerReport(Report):
+    total_queries: int = 0
+    uncombined_queries_issued: int = 0
+
+    combined_queries_issued: int = 0
+    queries_combined: int = 0
+
+    query_exceptions: int = 0
+
+
 @dataclasses.dataclass
 class SQLAlchemyQueryCombiner:
     """
@@ -126,6 +139,12 @@ class SQLAlchemyQueryCombiner:
     is_single_row_query_method: Callable[[Any], bool]
     serial_execution_fallback_enabled: bool
 
+    # The Python GIL ensures that modifications to the report's counters
+    # are safe.
+    report: SQLAlchemyQueryCombinerReport = dataclasses.field(
+        default_factory=SQLAlchemyQueryCombinerReport
+    )
+
     # There will be one main greenlet per thread. As such, queries will be
     # queued according to the main greenlet's thread ID. We also keep track
     # of the greenlets we spawn for bookkeeping purposes.
@@ -202,6 +221,7 @@ class SQLAlchemyQueryCombiner:
         query_id = SQLAlchemyQueryCombiner._generate_sql_safe_identifier()
         query_future = _QueryFuture(conn, query, multiparams, params)
         queue[query_id] = query_future
+        self.report.queries_combined += 1
 
         # Yield control back to the main greenlet until the query is done.
         # We assume that the main greenlet will be the one that actually executes the query.
@@ -217,6 +237,7 @@ class SQLAlchemyQueryCombiner:
             conn: Connection, query: Any, *args: Any, **kwargs: Any
         ) -> Any:
             try:
+                self.report.total_queries += 1
                 handled, result = self._handle_execute(conn, query, args, kwargs)
             except Exception as e:
                 if not self.catch_exceptions:
@@ -224,6 +245,7 @@ class SQLAlchemyQueryCombiner:
                 logger.exception(
                     f"Failed to execute query normally, using fallback: {str(query)}"
                 )
+                self.report.query_exceptions += 1
                 return _sa_execute_underlying_method(conn, query, *args, **kwargs)
             else:
                 if handled:
@@ -234,6 +256,7 @@ class SQLAlchemyQueryCombiner:
                     return result.res
                 else:
                     logger.debug(f"Executing query normally: {str(query)}")
+                    self.report.uncombined_queries_issued += 1
                     return _sa_execute_underlying_method(conn, query, *args, **kwargs)
 
         with _sa_execute_method_patching_lock:
@@ -289,6 +312,7 @@ class SQLAlchemyQueryCombiner:
                 combined_query.append_from(cte)
 
             logger.debug(f"Executing combined query: {str(combined_query)}")
+            self.report.combined_queries_issued += 1
             sa_res = _sa_execute_underlying_method(queue_item.conn, combined_query)
 
             # Fetch the results and ensure that exactly one row is returned.
@@ -322,6 +346,7 @@ class SQLAlchemyQueryCombiner:
                 continue
 
             logger.debug(f"Executing query via fallback: {str(query_future.query)}")
+            self.report.uncombined_queries_issued += 1
             try:
                 res = _sa_execute_underlying_method(
                     query_future.conn,
@@ -351,6 +376,7 @@ class SQLAlchemyQueryCombiner:
                 if not self.serial_execution_fallback_enabled:
                     raise e
                 logger.exception(f"Failed to execute queue using combiner: {str(e)}")
+                self.report.query_exceptions += 1
                 self._execute_queue_fallback(main_greenlet)
 
         for let in list(pool):