mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-10-27 00:31:42 +00:00
parent
f4ae0a5b8f
commit
6f66431587
@ -41,6 +41,8 @@ STATEMENT_ALL = "SELECT * FROM {table}"
|
||||
STATEMENT_TRUNCATE = "TRUNCATE TABLE {table};\n"
|
||||
STATEMENT_ALL_NEW = "SELECT {cols} FROM {table}"
|
||||
|
||||
MYSQL_ENGINE_NAME = "mysql"
|
||||
|
||||
|
||||
def single_quote_wrap(raw: str) -> str:
|
||||
"""
|
||||
@ -49,14 +51,37 @@ def single_quote_wrap(raw: str) -> str:
|
||||
return f"'{raw}'"
|
||||
|
||||
|
||||
def clean_str(raw: str, engine: Engine) -> str:
|
||||
"""
|
||||
String cleaning for SQL parsing.
|
||||
|
||||
Logic is different between MySQL and Postgres
|
||||
|
||||
- descriptions/comments with single quotes, e.g., `Mysql's data`.
|
||||
get converted to `Mysql''s data`
|
||||
- To insert a literal backlash in MySQL you need to escape with another one. This applies for `\n` and `\"` in
|
||||
inner JSONs for a field. This is not required for postgres
|
||||
"""
|
||||
quoted_str = raw.replace("'", "''")
|
||||
|
||||
if engine.name == MYSQL_ENGINE_NAME:
|
||||
quoted_str = quoted_str.replace("\\", "\\\\")
|
||||
|
||||
return quoted_str
|
||||
|
||||
|
||||
@singledispatch
|
||||
def clean_col(column_raw: Optional[Union[dict, str]]) -> str:
|
||||
return single_quote_wrap(str(column_raw)) if column_raw is not None else "null"
|
||||
def clean_col(column_raw: Optional[Union[dict, str]], engine: Engine) -> str:
|
||||
return (
|
||||
single_quote_wrap(clean_str(str(column_raw), engine))
|
||||
if column_raw is not None
|
||||
else "null"
|
||||
)
|
||||
|
||||
|
||||
@clean_col.register(dict)
|
||||
@clean_col.register(list)
|
||||
def _(column_raw: Optional[Union[dict, list]]) -> str:
|
||||
def _(column_raw: Optional[Union[dict, list]], engine: Engine) -> str:
|
||||
"""
|
||||
Prepare the JSON column to be inserted to MySQL
|
||||
|
||||
@ -66,9 +91,13 @@ def _(column_raw: Optional[Union[dict, list]]) -> str:
|
||||
"""
|
||||
return (
|
||||
single_quote_wrap(
|
||||
json.dumps(
|
||||
column_raw, default=str
|
||||
) # If we don't know how to serialize, convert to str
|
||||
clean_str(
|
||||
json.dumps(
|
||||
column_raw,
|
||||
default=str, # If we don't know how to serialize, convert to str
|
||||
),
|
||||
engine,
|
||||
)
|
||||
)
|
||||
if column_raw is not None
|
||||
else "null"
|
||||
@ -77,7 +106,10 @@ def _(column_raw: Optional[Union[dict, list]]) -> str:
|
||||
|
||||
def dump_json(tables: List[str], engine: Engine, output: Path) -> None:
|
||||
"""
|
||||
Dumps JSON data
|
||||
Dumps JSON data.
|
||||
|
||||
Postgres: engine.name == "postgresql"
|
||||
MySQL: engine.name == "mysql"
|
||||
"""
|
||||
with open(output, "a", encoding=UTF_8) as file:
|
||||
for table in tables:
|
||||
@ -87,7 +119,7 @@ def dump_json(tables: List[str], engine: Engine, output: Path) -> None:
|
||||
|
||||
res = engine.execute(text(STATEMENT_JSON.format(table=table))).all()
|
||||
for row in res:
|
||||
insert = f"INSERT INTO {table} (json) VALUES ({clean_col(row.json)});\n"
|
||||
insert = f"INSERT INTO {table} (json) VALUES ({clean_col(row.json, engine)});\n"
|
||||
file.write(insert)
|
||||
|
||||
|
||||
@ -103,7 +135,7 @@ def dump_all(tables: List[str], engine: Engine, output: Path) -> None:
|
||||
|
||||
res = engine.execute(text(STATEMENT_ALL.format(table=table))).all()
|
||||
for row in res:
|
||||
data = ",".join(clean_col(col) for col in row)
|
||||
data = ",".join(clean_col(col, engine) for col in row)
|
||||
|
||||
insert = f"INSERT INTO {table} VALUES ({data});\n"
|
||||
file.write(insert)
|
||||
@ -131,12 +163,11 @@ def dump_entity_custom(engine: Engine, output: Path, inspector) -> None:
|
||||
)
|
||||
res = engine.execute(text(statement)).all()
|
||||
for row in res:
|
||||
|
||||
# Let's use .format here to not add more variables
|
||||
# pylint: disable=consider-using-f-string
|
||||
insert = "INSERT INTO {table} ({cols}) VALUES ({data});\n".format(
|
||||
table=table,
|
||||
data=",".join(clean_col(col) for col in row),
|
||||
data=",".join(clean_col(col, engine) for col in row),
|
||||
cols=",".join(
|
||||
col["name"]
|
||||
for col in columns
|
||||
|
||||
@ -52,12 +52,11 @@ def execute_sql_file(engine: Engine, sql_file: str) -> None:
|
||||
logger.warning(
|
||||
f"Error processing the following query while restoring - {err}"
|
||||
)
|
||||
logger.warning(clean_query)
|
||||
|
||||
print_ansi_encoded_string(
|
||||
color=ANSI.GREEN,
|
||||
bold=False,
|
||||
message=f"Restore finished. {failed_queries} queries failed.",
|
||||
message=f"Restore finished. {failed_queries} queries failed from {len(all_queries)}.",
|
||||
)
|
||||
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user