Fix #13660 - Metadata Backup Memory Exhaustion (#13935)

Pere Miquel Brull 2023-11-10 12:21:48 +01:00 committed by GitHub
parent 431fbce098
commit ae969aa4f3

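The memory exhaustion came from the dump helpers materializing every query with .all(), which loads the complete result set of a table into memory before a single INSERT statement is written to the backup file. The diff below introduces a run_query_iter helper that streams rows through SQLAlchemy's yield_per(100) instead, so, per the new helper's docstring, at most 100 rows sit in memory at a time. A minimal sketch of the difference, assuming SQLAlchemy 1.4 (where the legacy Engine.execute style used by this module is still available); the table name and data are illustrative, not part of the commit:

from sqlalchemy import create_engine, text

# In-memory SQLite stands in for the real OpenMetadata database.
engine = create_engine("sqlite://")
engine.execute(text("CREATE TABLE entity (id INTEGER, json TEXT)"))
engine.execute(text("INSERT INTO entity VALUES (1, '{}'), (2, '{}')"))

# Before: .all() pulls every row into a Python list up front.
rows = engine.execute(text("SELECT * FROM entity")).all()

# After: the result is consumed through yield_per(100), the pattern run_query_iter wraps.
for row in engine.execute(text("SELECT * FROM entity")).yield_per(100):
    print(row.id, row.json)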

@@ -16,10 +16,10 @@ Database Dumping utility for the metadata CLI
 import json
 from functools import singledispatch
 from pathlib import Path
-from typing import List, Optional, Union
+from typing import Iterable, List, Optional, Union

 from sqlalchemy import inspect, text
-from sqlalchemy.engine import Engine
+from sqlalchemy.engine import Engine, Row

 from metadata.utils.constants import UTF_8
@@ -121,6 +121,13 @@ def get_hash_column_name(engine: Engine, table_name: str) -> Optional[str]:
     return None


+def run_query_iter(engine: Engine, query: str) -> Iterable[Row]:
+    """Return a generator of rows, one row at a time, with a limit of 100 in-mem rows"""
+    for row in engine.execute(text(query)).yield_per(100):
+        yield row
+
+
 def dump_json(tables: List[str], engine: Engine, output: Path) -> None:
     """
     Dumps JSON data.
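All three dump paths (dump_json, dump_all, dump_entity_custom) reuse this single helper: they build the SQL string first and hand it to run_query_iter rather than executing it inline, and the Iterable[Row] return annotation is why Row is now imported from sqlalchemy.engine. Per the helper's docstring, yield_per(100) caps the number of rows held in memory at 100, which is what resolves the backup memory exhaustion on large tables.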
@@ -135,14 +142,10 @@ def dump_json(tables: List[str], engine: Engine, output: Path) -> None:
             hash_column_name = get_hash_column_name(engine=engine, table_name=table)

             if hash_column_name:
-                res = engine.execute(
-                    text(
-                        STATEMENT_HASH_JSON.format(
-                            table=table, hash_column_name=hash_column_name
-                        )
-                    )
-                ).all()
-                for row in res:
+                query = STATEMENT_HASH_JSON.format(
+                    table=table, hash_column_name=hash_column_name
+                )
+                for row in run_query_iter(engine=engine, query=query):
                     insert = f"INSERT INTO {table} (json, {hash_column_name}) VALUES ({clean_col(row.json, engine)}, {clean_col(row[1], engine)});\n"  # pylint: disable=line-too-long
                     file.write(insert)
             else:
@@ -161,8 +164,8 @@ def dump_all(tables: List[str], engine: Engine, output: Path) -> None:
             truncate = STATEMENT_TRUNCATE.format(table=table)
             file.write(truncate)

-            res = engine.execute(text(STATEMENT_ALL.format(table=table))).all()
-            for row in res:
+            query = STATEMENT_ALL.format(table=table)
+            for row in run_query_iter(engine=engine, query=query):
                 data = ",".join(clean_col(col, engine) for col in row)
                 insert = f"INSERT INTO {table} VALUES ({data});\n"
@@ -180,7 +183,7 @@ def dump_entity_custom(engine: Engine, output: Path, inspector) -> None:
             columns = inspector.get_columns(table_name=table)

-            statement = STATEMENT_ALL_NEW.format(
+            query = STATEMENT_ALL_NEW.format(
                 cols=",".join(
                     col["name"]
                     for col in columns
@@ -188,8 +191,7 @@ def dump_entity_custom(engine: Engine, output: Path, inspector) -> None:
                 ),
                 table=table,
             )
-            res = engine.execute(text(statement)).all()
-            for row in res:
+            for row in run_query_iter(engine=engine, query=query):
                 # Let's use .format here to not add more variables
                 # pylint: disable=consider-using-f-string
                 insert = "INSERT INTO {table} ({cols}) VALUES ({data});\n".format(