Add: Add metadata restore CLI (#7596)

* Add: Add metadata restore CLI

* Fix: Query execution error

* Fix: Query execution error

* Fix: Change based on comments

* Update ingestion/src/metadata/cli/restore.py

Co-authored-by: Pere Miquel Brull <peremiquelbrull@gmail.com>
This commit is contained in:
Milan Bariya 2022-09-30 18:32:16 +05:30 committed by GitHub
parent c4b0bb0538
commit d2adde53c6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 248 additions and 30 deletions

View File

@ -18,17 +18,9 @@ from pathlib import Path
from typing import List, Optional, Tuple
import click
from sqlalchemy.engine import Engine
from metadata.cli.db_dump import dump
from metadata.generated.schema.entity.services.connections.database.mysqlConnection import (
MysqlConnection,
)
from metadata.generated.schema.entity.services.connections.database.postgresConnection import (
PostgresConnection,
)
from metadata.utils.connections import get_connection
from metadata.utils.helpers import list_to_dict
from metadata.cli.utils import get_engine
from metadata.utils.logger import cli_logger
logger = cli_logger()
@ -138,26 +130,9 @@ def run_backup(
out = get_output(output)
connection_options = list_to_dict(options)
connection_arguments = list_to_dict(arguments)
connection_dict = {
"hostPort": f"{host}:{port}",
"username": user,
"password": password,
"connectionOptions": connection_options if connection_options else None,
"connectionArguments": connection_arguments if connection_arguments else None,
}
if not schema:
connection_dict["databaseSchema"] = database
connection = MysqlConnection(**connection_dict)
else:
connection_dict["database"] = database
connection = PostgresConnection(**connection_dict)
engine: Engine = get_connection(connection)
engine = get_engine(
host, port, user, password, options, arguments, schema, database
)
dump(engine=engine, output=out, schema=schema)
click.secho(

View File

@ -11,7 +11,10 @@ TABLES_DUMP_ALL = {
"entity_extension",
"field_relationship",
"tag_usage",
"openmetadata_settings",
}
CUSTOM_TABLES = {"entity_extension_time_series": {"exclude_columns": ["timestamp"]}}
NOT_MIGRATE = {"DATABASE_CHANGE_LOG"}
STATEMENT_JSON = "SELECT json FROM {table}"
@ -64,6 +67,40 @@ def dump_all(tables: List[str], engine: Engine, output: Path) -> None:
file.write(insert)
def dump_entity_custom(engine: Engine, output: Path, inspector) -> None:
    """
    Dump tables that need custom handling, as configured in
    CUSTOM_TABLES (e.g., excluding specific columns such as
    auto-generated timestamps).

    Appends a TRUNCATE statement followed by one INSERT per row
    to the output file.

    :param engine: SQLAlchemy engine connected to the source database
    :param output: file path the SQL statements are appended to
    :param inspector: SQLAlchemy inspector used to list table columns
    """
    with open(output, "a") as file:
        for table, data in CUSTOM_TABLES.items():
            truncate = STATEMENT_TRUNCATE.format(table=table)
            file.write(truncate)
            # Compute the kept-column list once per table instead of
            # rebuilding it for every row.
            columns = inspector.get_columns(table_name=table)
            kept_cols = ",".join(
                col["name"]
                for col in columns
                if col["name"] not in data["exclude_columns"]
            )
            select_statement = "SELECT {cols} FROM {table}".format(
                cols=kept_cols, table=table
            )
            res = engine.execute(text(select_statement)).all()
            for row in res:
                insert = "INSERT INTO {table} ({cols}) VALUES ({data});\n".format(
                    table=table,
                    cols=kept_cols,
                    data=",".join(clean_col(col) for col in row),
                )
                file.write(insert)
def dump(engine: Engine, output: Path, schema: str = None) -> None:
"""
Get all tables from the database and dump
@ -77,8 +114,11 @@ def dump(engine: Engine, output: Path, schema: str = None) -> None:
dump_json_tables = [
table
for table in tables
if table not in TABLES_DUMP_ALL and table not in NOT_MIGRATE
if table not in TABLES_DUMP_ALL
and table not in NOT_MIGRATE
and table not in CUSTOM_TABLES.keys()
]
dump_all(tables=list(TABLES_DUMP_ALL), engine=engine, output=output)
dump_json(tables=dump_json_tables, engine=engine, output=output)
dump_entity_custom(engine=engine, output=output, inspector=inspector)

View File

@ -0,0 +1,77 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Restore utility for the metadata CLI
"""
from pathlib import Path
from typing import List, Optional
import click
from sqlalchemy.engine import Engine
from metadata.cli.utils import get_engine
from metadata.utils.logger import cli_logger
logger = cli_logger()
def execute_sql_file(engine: "Engine", input: Path, schema: str = None) -> None:
    """
    Replay every SQL statement of a backup file against the database.

    The file is expected to hold one statement per line (the format
    produced by the dump utilities). Blank lines are skipped.

    :param engine: SQLAlchemy engine pointing at the target database
    :param input: local path of the SQL backup file to replay
    :param schema: unused here; kept for interface symmetry with the dump side
    """
    # Open a single connection for the whole file instead of one
    # connection per statement.
    with engine.connect() as conn:
        with open(input, encoding="utf-8") as file:
            # Iterate lazily rather than materializing with readlines()
            for query in file:
                if not query.strip():
                    # Empty lines are not valid SQL statements
                    continue
                # `%` is a reserved syntax in SQLAlchemy to bind parameters.
                # Escaping it with `%%` keeps literal percent signs intact.
                conn.execute(query.replace("%", "%%"))
def run_restore(
    host: str,
    user: str,
    password: str,
    database: str,
    port: str,
    input: str,
    options: List[str],
    arguments: List[str],
    schema: Optional[str] = None,
) -> None:
    """
    Restore a metadata backup by replaying a local SQL dump file
    against the target database.

    :param host: service host
    :param user: service user
    :param password: service pwd
    :param database: database to restore into
    :param port: database service port
    :param input: local path of the SQL file to restore the backup from
    :param options: list of other connection options
    :param arguments: list of connection arguments
    :param schema: Run the process against Postgres with the given schema
    """
    click.secho(
        f"Restoring OpenMetadata backup for {host}:{port}/{database}...",
        fg="bright_green",
    )
    # Schema selects a Postgres connection; without it MySQL is assumed
    engine = get_engine(
        host, port, user, password, options, arguments, schema, database
    )
    execute_sql_file(engine=engine, input=input, schema=schema)
    click.secho(
        f"Backup restored from {input}",
        fg="bright_green",
    )

View File

@ -0,0 +1,35 @@
from sqlalchemy.engine import Engine
from metadata.generated.schema.entity.services.connections.database.mysqlConnection import (
MysqlConnection,
)
from metadata.generated.schema.entity.services.connections.database.postgresConnection import (
PostgresConnection,
)
from metadata.utils.connections import get_connection
from metadata.utils.helpers import list_to_dict
def get_engine(host, port, user, password, options, arguments, schema, database):
    """
    Build a SQLAlchemy engine for the backup/restore commands.

    When no schema is given the target is MySQL and the database name
    goes into ``databaseSchema``; when a schema is provided the target
    is Postgres and the database name goes into ``database``.
    """
    opts = list_to_dict(options)
    args = list_to_dict(arguments)
    params = {
        "hostPort": f"{host}:{port}",
        "username": user,
        "password": password,
        "connectionOptions": opts or None,
        "connectionArguments": args or None,
    }
    if schema:
        connection = PostgresConnection(**params, database=database)
    else:
        connection = MysqlConnection(**params, databaseSchema=database)
    return get_connection(connection)

View File

@ -26,6 +26,7 @@ from metadata.cli.ingest import run_ingest
from metadata.cli.openmetadata_imports_migration import (
run_openmetadata_imports_migration,
)
from metadata.cli.restore import run_restore
from metadata.config.common import load_config_file
from metadata.orm_profiler.api.workflow import ProfilerWorkflow
from metadata.test_suite.api.workflow import TestSuiteWorkflow
@ -343,4 +344,94 @@ def openmetadata_imports_migration(
run_openmetadata_imports_migration(dir_path)
# CLI entry point: `metadata restore`. Collects connection details and
# delegates the actual work to run_restore.
@metadata.command()
@click.option(
    "-h",
    "--host",
    help="Host that runs the database",
    required=True,
)
@click.option(
    "-u",
    "--user",
    help="User to run the restore backup",
    required=True,
)
@click.option(
    "-p",
    "--password",
    help="Credentials for the user",
    required=True,
)
@click.option(
    "-d",
    "--database",
    help="Database to restore",
    required=True,
)
@click.option(
    "--port",
    help="Database service port",
    # Default matches the standard MySQL port
    default="3306",
    required=False,
)
@click.option(
    "--input",
    help="Local backup file path for restore",
    # NOTE(review): exists=False accepts a path that does not exist and
    # dir_okay=True accepts a directory, yet this is a file we read from.
    # Consider exists=True, dir_okay=False — confirm intent.
    type=click.Path(exists=False, dir_okay=True),
    default=None,
    required=True,
)
@click.option(
    "-o",
    "--options",
    # May be passed multiple times: -o key=value -o key2=value2
    multiple=True,
    default=None,
)
@click.option(
    "-a",
    "--arguments",
    # May be passed multiple times: -a key=value -a key2=value2
    multiple=True,
    default=None,
)
@click.option(
    "-s",
    "--schema",
    default=None,
    required=False,
)
def restore(
    host: str,
    user: str,
    password: str,
    database: str,
    port: str,
    input: str,
    options: List[str],
    arguments: List[str],
    schema: str,
) -> None:
    """
    Run a restore for the metadata DB.
    We can pass as many connection options as required with `-o <opt1>, -o <opt2> [...]`
    Same with connection arguments `-a <arg1>, -a <arg2> [...]`
    If `-s` or `--schema` is provided, we will trigger a Postgres Restore instead
    of a MySQL restore. This is the value of the schema containing the OpenMetadata
    tables.
    """
    run_restore(
        host,
        user,
        password,
        database,
        port,
        input,
        options,
        arguments,
        schema,
    )
metadata.add_command(check)