diff --git a/ingestion/src/metadata/cli/db_dump.py b/ingestion/src/metadata/cli/db_dump.py index ec39207cee6..576520bc711 100644 --- a/ingestion/src/metadata/cli/db_dump.py +++ b/ingestion/src/metadata/cli/db_dump.py @@ -16,10 +16,10 @@ Database Dumping utility for the metadata CLI import json from functools import singledispatch from pathlib import Path -from typing import List, Optional, Union +from typing import Iterable, List, Optional, Union from sqlalchemy import inspect, text -from sqlalchemy.engine import Engine +from sqlalchemy.engine import Engine, Row from metadata.utils.constants import UTF_8 @@ -121,6 +121,13 @@ def get_hash_column_name(engine: Engine, table_name: str) -> Optional[str]: return None +def run_query_iter(engine: Engine, query: str) -> Iterable[Row]: + """Return a generator of rows, one row at a time, with a limit of 100 in-mem rows""" + + for row in engine.execute(text(query)).yield_per(100): + yield row + + def dump_json(tables: List[str], engine: Engine, output: Path) -> None: """ Dumps JSON data. @@ -135,14 +142,10 @@ def dump_json(tables: List[str], engine: Engine, output: Path) -> None: hash_column_name = get_hash_column_name(engine=engine, table_name=table) if hash_column_name: - res = engine.execute( - text( - STATEMENT_HASH_JSON.format( - table=table, hash_column_name=hash_column_name - ) - ) - ).all() - for row in res: + query = STATEMENT_HASH_JSON.format( + table=table, hash_column_name=hash_column_name + ) + for row in run_query_iter(engine=engine, query=query): insert = f"INSERT INTO {table} (json, {hash_column_name}) VALUES ({clean_col(row.json, engine)}, {clean_col(row[1], engine)});\n" # pylint: disable=line-too-long file.write(insert) else: @@ -161,8 +164,8 @@ def dump_all(tables: List[str], engine: Engine, output: Path) -> None: truncate = STATEMENT_TRUNCATE.format(table=table) file.write(truncate) - res = engine.execute(text(STATEMENT_ALL.format(table=table))).all() - for row in res: + query = STATEMENT_ALL.format(table=table) + for row in run_query_iter(engine=engine, query=query): data = ",".join(clean_col(col, engine) for col in row) insert = f"INSERT INTO {table} VALUES ({data});\n" @@ -180,7 +183,7 @@ def dump_entity_custom(engine: Engine, output: Path, inspector) -> None: columns = inspector.get_columns(table_name=table) - statement = STATEMENT_ALL_NEW.format( + query = STATEMENT_ALL_NEW.format( cols=",".join( col["name"] for col in columns @@ -188,8 +191,7 @@ def dump_entity_custom(engine: Engine, output: Path, inspector) -> None: ), table=table, ) - res = engine.execute(text(statement)).all() - for row in res: + for row in run_query_iter(engine=engine, query=query): # Let's use .format here to not add more variables # pylint: disable=consider-using-f-string insert = "INSERT INTO {table} ({cols}) VALUES ({data});\n".format(