From d2adde53c6d0cddacbd913cbb67b42f8db1f3b72 Mon Sep 17 00:00:00 2001 From: Milan Bariya <52292922+MilanBariya@users.noreply.github.com> Date: Fri, 30 Sep 2022 18:32:16 +0530 Subject: [PATCH] Add: Add metadata restore CLI (#7596) * Add: Add metadata restore CLI * Fix: Query execution error * Fix: Query execution error * Fix: Change based on comments * Update ingestion/src/metadata/cli/restore.py Co-authored-by: Pere Miquel Brull --- ingestion/src/metadata/cli/backup.py | 33 ++-------- ingestion/src/metadata/cli/db_dump.py | 42 ++++++++++++- ingestion/src/metadata/cli/restore.py | 77 +++++++++++++++++++++++ ingestion/src/metadata/cli/utils.py | 35 +++++++++++ ingestion/src/metadata/cmd.py | 91 +++++++++++++++++++++++++++ 5 files changed, 248 insertions(+), 30 deletions(-) create mode 100644 ingestion/src/metadata/cli/restore.py create mode 100644 ingestion/src/metadata/cli/utils.py diff --git a/ingestion/src/metadata/cli/backup.py b/ingestion/src/metadata/cli/backup.py index 01b19602fdc..94ec57be7a9 100644 --- a/ingestion/src/metadata/cli/backup.py +++ b/ingestion/src/metadata/cli/backup.py @@ -18,17 +18,9 @@ from pathlib import Path from typing import List, Optional, Tuple import click -from sqlalchemy.engine import Engine from metadata.cli.db_dump import dump -from metadata.generated.schema.entity.services.connections.database.mysqlConnection import ( - MysqlConnection, -) -from metadata.generated.schema.entity.services.connections.database.postgresConnection import ( - PostgresConnection, -) -from metadata.utils.connections import get_connection -from metadata.utils.helpers import list_to_dict +from metadata.cli.utils import get_engine from metadata.utils.logger import cli_logger logger = cli_logger() @@ -138,26 +130,9 @@ def run_backup( out = get_output(output) - connection_options = list_to_dict(options) - connection_arguments = list_to_dict(arguments) - - connection_dict = { - "hostPort": f"{host}:{port}", - "username": user, - "password": password, - 
"connectionOptions": connection_options if connection_options else None, - "connectionArguments": connection_arguments if connection_arguments else None, - } - - if not schema: - connection_dict["databaseSchema"] = database - connection = MysqlConnection(**connection_dict) - else: - connection_dict["database"] = database - connection = PostgresConnection(**connection_dict) - - engine: Engine = get_connection(connection) - + engine = get_engine( + host, port, user, password, options, arguments, schema, database + ) dump(engine=engine, output=out, schema=schema) click.secho( diff --git a/ingestion/src/metadata/cli/db_dump.py b/ingestion/src/metadata/cli/db_dump.py index 7b18f45c11d..accf5cbd092 100644 --- a/ingestion/src/metadata/cli/db_dump.py +++ b/ingestion/src/metadata/cli/db_dump.py @@ -11,7 +11,10 @@ TABLES_DUMP_ALL = { "entity_extension", "field_relationship", "tag_usage", + "openmetadata_settings", } + +CUSTOM_TABLES = {"entity_extension_time_series": {"exclude_columns": ["timestamp"]}} NOT_MIGRATE = {"DATABASE_CHANGE_LOG"} STATEMENT_JSON = "SELECT json FROM {table}" @@ -64,6 +67,40 @@ def dump_all(tables: List[str], engine: Engine, output: Path) -> None: file.write(insert) +def dump_entity_custom(engine: Engine, output: Path, inspector) -> None: + """ + This function is used to dump entities with custom handling + """ + with open(output, "a") as file: + for table, data in CUSTOM_TABLES.items(): + + truncate = STATEMENT_TRUNCATE.format(table=table) + file.write(truncate) + + columns = inspector.get_columns(table_name=table) + + STATEMENT_ALL_NEW = "SELECT {cols} FROM {table}".format( + cols=",".join( + col["name"] + for col in columns + if col["name"] not in data["exclude_columns"] + ), + table=table, + ) + res = engine.execute(text(STATEMENT_ALL_NEW)).all() + for row in res: + insert = "INSERT INTO {table} ({cols}) VALUES ({data});\n".format( + table=table, + data=",".join(clean_col(col) for col in row), + cols=",".join( + col["name"] + for col in columns + 
if col["name"] not in data["exclude_columns"] + ), + ) + file.write(insert) + + def dump(engine: Engine, output: Path, schema: str = None) -> None: """ Get all tables from the database and dump @@ -77,8 +114,11 @@ def dump(engine: Engine, output: Path, schema: str = None) -> None: dump_json_tables = [ table for table in tables - if table not in TABLES_DUMP_ALL and table not in NOT_MIGRATE + if table not in TABLES_DUMP_ALL + and table not in NOT_MIGRATE + and table not in CUSTOM_TABLES.keys() ] dump_all(tables=list(TABLES_DUMP_ALL), engine=engine, output=output) dump_json(tables=dump_json_tables, engine=engine, output=output) + dump_entity_custom(engine=engine, output=output, inspector=inspector) diff --git a/ingestion/src/metadata/cli/restore.py b/ingestion/src/metadata/cli/restore.py new file mode 100644 index 00000000000..8a53392420b --- /dev/null +++ b/ingestion/src/metadata/cli/restore.py @@ -0,0 +1,77 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Restore utility for the metadata CLI +""" +from pathlib import Path +from typing import List, Optional + +import click +from sqlalchemy.engine import Engine + +from metadata.cli.utils import get_engine +from metadata.utils.logger import cli_logger + +logger = cli_logger() + + +def execute_sql_file(engine: Engine, input: Path, schema: str = None) -> None: + + with open(input, encoding="utf-8") as f: + for query in f.readlines(): + # `%` is a reserved syntax in SQLAlchemy to bind parameters. 
Escaping it with `%%`
+            clean_query = query.replace("%", "%%")
+
+            with engine.connect() as conn:
+                conn.execute(clean_query)
+
+
+def run_restore(
+    host: str,
+    user: str,
+    password: str,
+    database: str,
+    port: str,
+    input: str,
+    options: List[str],
+    arguments: List[str],
+    schema: Optional[str] = None,
+) -> None:
+    """
+    Run and restore the
+    backup from a local file.
+
+    :param host: service host
+    :param user: service user
+    :param password: service pwd
+    :param database: database to restore
+    :param port: database service port
+    :param input: local path of file to restore the backup
+    :param options: list of other connection options
+    :param arguments: list of connection arguments
+    :param schema: Run the process against Postgres with the given schema
+    """
+    click.secho(
+        f"Restoring OpenMetadata backup for {host}:{port}/{database}...",
+        fg="bright_green",
+    )
+
+    engine = get_engine(
+        host, port, user, password, options, arguments, schema, database
+    )
+
+    execute_sql_file(engine=engine, input=input, schema=schema)
+
+    click.secho(
+        f"Backup restored from {input}",
+        fg="bright_green",
+    )
diff --git a/ingestion/src/metadata/cli/utils.py b/ingestion/src/metadata/cli/utils.py
new file mode 100644
index 00000000000..014a6ad81f4
--- /dev/null
+++ b/ingestion/src/metadata/cli/utils.py
@@ -0,0 +1,35 @@
+from sqlalchemy.engine import Engine
+
+from metadata.generated.schema.entity.services.connections.database.mysqlConnection import (
+    MysqlConnection,
+)
+from metadata.generated.schema.entity.services.connections.database.postgresConnection import (
+    PostgresConnection,
+)
+from metadata.utils.connections import get_connection
+from metadata.utils.helpers import list_to_dict
+
+
+def get_engine(host, port, user, password, options, arguments, schema, database):
+
+    connection_options = list_to_dict(options)
+    connection_arguments = list_to_dict(arguments)
+
+    connection_dict = {
+        "hostPort": f"{host}:{port}",
+        "username": user,
+        
"password": password, + "connectionOptions": connection_options if connection_options else None, + "connectionArguments": connection_arguments if connection_arguments else None, + } + + if not schema: + connection_dict["databaseSchema"] = database + connection = MysqlConnection(**connection_dict) + else: + connection_dict["database"] = database + connection = PostgresConnection(**connection_dict) + + engine: Engine = get_connection(connection) + + return engine diff --git a/ingestion/src/metadata/cmd.py b/ingestion/src/metadata/cmd.py index b2bd9f181f3..ac8a835356d 100644 --- a/ingestion/src/metadata/cmd.py +++ b/ingestion/src/metadata/cmd.py @@ -26,6 +26,7 @@ from metadata.cli.ingest import run_ingest from metadata.cli.openmetadata_imports_migration import ( run_openmetadata_imports_migration, ) +from metadata.cli.restore import run_restore from metadata.config.common import load_config_file from metadata.orm_profiler.api.workflow import ProfilerWorkflow from metadata.test_suite.api.workflow import TestSuiteWorkflow @@ -343,4 +344,94 @@ def openmetadata_imports_migration( run_openmetadata_imports_migration(dir_path) +@metadata.command() +@click.option( + "-h", + "--host", + help="Host that runs the database", + required=True, +) +@click.option( + "-u", + "--user", + help="User to run the restore backup", + required=True, +) +@click.option( + "-p", + "--password", + help="Credentials for the user", + required=True, +) +@click.option( + "-d", + "--database", + help="Database to restore", + required=True, +) +@click.option( + "--port", + help="Database service port", + default="3306", + required=False, +) +@click.option( + "--input", + help="Local backup file path for restore", + type=click.Path(exists=False, dir_okay=True), + default=None, + required=True, +) +@click.option( + "-o", + "--options", + multiple=True, + default=None, +) +@click.option( + "-a", + "--arguments", + multiple=True, + default=None, +) +@click.option( + "-s", + "--schema", + default=None, + 
required=False, +) +def restore( + host: str, + user: str, + password: str, + database: str, + port: str, + input: str, + options: List[str], + arguments: List[str], + schema: str, +) -> None: + """ + Run a restore for the metadata DB. + + We can pass as many connection options as required with `-o , -o [...]` + Same with connection arguments `-a , -a [...]` + + If `-s` or `--schema` is provided, we will trigger a Postgres Restore instead + of a MySQL restore. This is the value of the schema containing the OpenMetadata + tables. + """ + run_restore( + host, + user, + password, + database, + port, + input, + options, + arguments, + schema, + ) + + metadata.add_command(check)