Mirror of https://github.com/open-metadata/OpenMetadata.git (synced 2025-08-14 12:06:54 +00:00)
Commit 6701ae6416 (parent d6ffb0aa70)
@@ -12,13 +12,22 @@
 """
 Backup utility for the metadata CLI
 """
 import subprocess
 from datetime import datetime
 from pathlib import Path
 from typing import List, Optional, Tuple

 import click
+from sqlalchemy.engine import Engine

+from metadata.cli.db_dump import dump
+from metadata.generated.schema.entity.services.connections.database.mysqlConnection import (
+    MysqlConnection,
+)
+from metadata.generated.schema.entity.services.connections.database.postgresConnection import (
+    PostgresConnection,
+)
+from metadata.utils.connections import get_connection
+from metadata.utils.helpers import list_to_dict
 from metadata.utils.logger import cli_logger

 logger = cli_logger()
@@ -100,6 +109,8 @@ def run_backup(
     output: Optional[str],
     upload: Optional[Tuple[str, str, str]],
     options: List[str],
+    arguments: List[str],
+    schema: Optional[str] = None,
 ) -> None:
     """
     Run `mysqldump` to MySQL database and store the
@@ -108,11 +119,13 @@ def run_backup(
     :param host: service host
     :param user: service user
     :param password: service pwd
-    :param database: database to backup
+    :param database: database to back up
     :param port: database service port
     :param output: local path to store the backup
     :param upload: URI to upload result file
-    :param options: list of other options to pass to mysqldump
+    :param options: list of other connection options
+    :param arguments: list of connection arguments
+    :param schema: Run the process against Postgres with the given schema
     """
     click.secho(
         f"Creating OpenMetadata backup for {host}:{port}/{database}...",
@@ -121,14 +134,27 @@ def run_backup(

     out = get_output(output)

-    mysqldump_root = f"mysqldump -h {host} -u {user} -p{password}"
-    port_opt = f"-P {port}" if port else ""
+    connection_options = list_to_dict(options)
+    connection_arguments = list_to_dict(arguments)

-    command = " ".join([mysqldump_root, port_opt, *options, database, f"> {out}"])
+    connection_dict = {
+        "hostPort": f"{host}:{port}",
+        "username": user,
+        "password": password,
+        "connectionOptions": connection_options if connection_options else None,
+        "connectionArguments": connection_arguments if connection_arguments else None,
+    }

-    res = subprocess.run(command, shell=True)
-    if res.returncode != 0:
-        raise RuntimeError("Error encountered when running mysqldump!")
+    if not schema:
+        connection_dict["databaseSchema"] = database
+        connection = MysqlConnection(**connection_dict)
+    else:
+        connection_dict["database"] = database
+        connection = PostgresConnection(**connection_dict)
+
+    engine: Engine = get_connection(connection)
+
+    dump(engine=engine, output=out, schema=schema)

     click.secho(
         f"Backup stored locally under {out}",
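In short, `run_backup` now builds an OpenMetadata connection object from the CLI inputs and hands it to `dump`, instead of shelling out to `mysqldump`. A minimal sketch of calling it directly, assuming the module lives at `metadata.cli.backup`, with every value below invented for illustration:

```python
from metadata.cli.backup import run_backup  # assumed module path

# All values are illustrative; options/arguments are key=value strings
# that list_to_dict() folds into connectionOptions/connectionArguments.
run_backup(
    host="localhost",
    user="openmetadata_user",
    password="openmetadata_password",
    database="openmetadata_db",
    port="3306",
    output=None,  # get_output() presumably derives a default file name
    upload=None,  # or ("http://localhost:9000", "my-backup", "dir1/dir2")
    options=["ssl=true"],
    arguments=[],
    schema=None,  # e.g. "public" to take the Postgres branch instead of MySQL
)
```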
ingestion/src/metadata/cli/db_dump.py (new file, 84 lines)
@@ -0,0 +1,84 @@
from pathlib import Path
from typing import List

from sqlalchemy import inspect, text
from sqlalchemy.engine import Engine

TABLES_DUMP_ALL = {
    "task_sequence",
    "entity_usage",
    "entity_relationship",
    "entity_extension",
    "field_relationship",
    "tag_usage",
}
NOT_MIGRATE = {"DATABASE_CHANGE_LOG"}

STATEMENT_JSON = "SELECT json FROM {table}"
STATEMENT_ALL = "SELECT * FROM {table}"
STATEMENT_TRUNCATE = "TRUNCATE TABLE {table};\n"


def clean_col(column_raw: str) -> str:
    """
    Prepare the column to be inserted to MySQL
    """
    return (
        repr(str(column_raw)).replace('"', '\\"') if column_raw is not None else "null"
    )


def dump_json(tables: List[str], engine: Engine, output: Path) -> None:
    """
    Dumps JSON data
    """
    with open(output, "a") as file:
        for table in tables:

            truncate = STATEMENT_TRUNCATE.format(table=table)
            file.write(truncate)

            res = engine.execute(text(STATEMENT_JSON.format(table=table))).all()
            for row in res:
                insert = "INSERT INTO {table} (json) VALUES ({data});\n".format(
                    table=table, data=clean_col(row.json)
                )
                file.write(insert)


def dump_all(tables: List[str], engine: Engine, output: Path) -> None:
    """
    Dump tables that need to store all data
    """
    with open(output, "a") as file:
        for table in tables:

            truncate = STATEMENT_TRUNCATE.format(table=table)
            file.write(truncate)

            res = engine.execute(text(STATEMENT_ALL.format(table=table))).all()
            for row in res:
                insert = "INSERT INTO {table} VALUES ({data});\n".format(
                    table=table, data=",".join(clean_col(col) for col in row)
                )
                file.write(insert)


def dump(engine: Engine, output: Path, schema: str = None) -> None:
    """
    Get all tables from the database and dump
    only the JSON column for the required tables
    """
    inspector = inspect(engine)
    tables = (
        inspector.get_table_names(schema) if schema else inspector.get_table_names()
    )

    dump_json_tables = [
        table
        for table in tables
        if table not in TABLES_DUMP_ALL and table not in NOT_MIGRATE
    ]

    dump_all(tables=list(TABLES_DUMP_ALL), engine=engine, output=output)
    dump_json(tables=dump_json_tables, engine=engine, output=output)
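Because `dump` only needs a SQLAlchemy `Engine` and an output path, the module can also be exercised on its own. A minimal sketch, assuming a local MySQL instance and the `pymysql` driver (URL and credentials are invented):

```python
from pathlib import Path

from sqlalchemy import create_engine

from metadata.cli.db_dump import dump

# Invented connection URL; point it at your own OpenMetadata database.
engine = create_engine(
    "mysql+pymysql://openmetadata_user:openmetadata_password@localhost:3306/openmetadata_db"
)

# Appends a TRUNCATE + INSERT block per table to the output file.
dump(engine=engine, output=Path("openmetadata_backup.sql"))
```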
@@ -236,7 +236,22 @@ def docker(
     required=False,
 )
 @click.option(
-    "-o", "--options", multiple=True, default=["--protocol=tcp", "--no-tablespaces"]
+    "-o",
+    "--options",
+    multiple=True,
+    default=None,
+)
+@click.option(
+    "-a",
+    "--arguments",
+    multiple=True,
+    default=None,
+)
+@click.option(
+    "-s",
+    "--schema",
+    default=None,
+    required=False,
 )
 def backup(
     host: str,
@@ -247,18 +262,26 @@ def backup(
     output: Optional[str],
     upload: Optional[Tuple[str, str, str]],
     options: List[str],
+    arguments: List[str],
+    schema: str,
 ) -> None:
     """
-    Run a backup for the metadata DB.
-    Requires mysqldump installed on the host.
+    Run a backup for the metadata DB. Uses a custom dump strategy for OpenMetadata tables.

-    We can pass as many options as required with `-o <opt1>, -o <opt2> [...]`
+    We can pass as many connection options as required with `-o <opt1>, -o <opt2> [...]`
+    Same with connection arguments `-a <arg1>, -a <arg2> [...]`

     To run the upload, provide the information as
     `--upload endpoint bucket key` and properly configure the environment
-    variables AWS_ACCESS_KEY_ID & AWS_SECRET_ACCESS_KEY
+    variables AWS_ACCESS_KEY_ID & AWS_SECRET_ACCESS_KEY.
+
+    If `-s` or `--schema` is provided, we will trigger a Postgres backup instead
+    of a MySQL backup. This is the value of the schema containing the OpenMetadata
+    tables.
     """
-    run_backup(host, user, password, database, port, output, upload, options)
+    run_backup(
+        host, user, password, database, port, output, upload, options, arguments, schema
+    )


 metadata.add_command(check)
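With the new flags wired in, a Postgres backup could be triggered roughly as below. Only `-o`, `-a`, and `-s`/`--schema` are confirmed by this diff; the remaining flag spellings are assumptions based on the parameter names:

```
metadata backup --host localhost --user postgres --password secret \
    --database openmetadata_db --schema public
```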
@@ -281,3 +281,17 @@ def find_column_in_table(column_name: str, table: Table) -> Optional[Column]:
     return next(
         (col for col in table.columns if col.name.__root__ == column_name), None
     )
+
+
+def list_to_dict(original: Optional[List[str]], sep: str = "=") -> Dict[str, str]:
+    """
+    Given a list with strings that have a separator,
+    convert that to a dictionary of key-value pairs
+    """
+    if not original:
+        return {}
+
+    split_original = [
+        (elem.split(sep)[0], elem.split(sep)[1]) for elem in original if sep in elem
+    ]
+    return dict(split_original)
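In practice, this helper turns the repeated `-o`/`-a` CLI values into the connection dictionaries used above; a quick sketch with invented values:

```python
from metadata.utils.helpers import list_to_dict

list_to_dict(["ssl=true", "protocol=tcp"])  # {'ssl': 'true', 'protocol': 'tcp'}
list_to_dict(None)                          # {}
```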
ingestion/tests/unit/test_helpers.py (new file, 29 lines)
@@ -0,0 +1,29 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Test helpers module
"""
from unittest import TestCase

from metadata.utils.helpers import list_to_dict


class TestHelpers(TestCase):
    """
    Test helpers module
    """

    def test_list_to_dict(self):
        original = ["key=value", "a=b"]

        self.assertEqual(list_to_dict(original=original), {"key": "value", "a": "b"})
        self.assertEqual(list_to_dict([]), {})
        self.assertEqual(list_to_dict(None), {})
@@ -29,11 +29,29 @@ you can instead install the package with the backup plugin:
 pip install "openmetadata-ingestion[backup]"
 ```

-This tool acts as a wrapper around the powerful `mysqldump` utility with some commodity addons on top. `mysqldump` is part
-of the `mysql-client` package and can be installed on your machine as:
-
-- **macOS**: `brew install mysql-client`
-- **Ubuntu**: `sudo apt-get install mysql-client`
+## Requirements & Considerations
+
+This is a custom utility. As almost all tables contain `GENERATED` columns, directly using `mysqldump` is not an
+option out of the box, as it would require some further cleaning steps to get the data right.
+
+Instead, we have created a utility that will just dump the necessary data.
+
+The requirement for running the process is that the target database should have the Flyway migrations executed.
+
+The backup utility will provide an SQL file which will do two things:
+1. TRUNCATE the OpenMetadata tables
+2. INSERT the data that has been saved
+
+You can then run the script's statements to restore the data.
+
+<Note>
+
+Make sure that the migrations have been run correctly (find out how [here](/deployment/bare-metal#4-prepare-the-openmetadata-database-and-indexes)).
+
+Also, make sure that the target database does not already have any OpenMetadata data, or if it does, that you are OK
+replacing it with whatever comes from the SQL script.
+
+</Note>
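For reference, the generated script is plain SQL of roughly the following shape. `entity_relationship` is one of the fully-dumped tables from the utility; the placeholders are illustrative:

```
TRUNCATE TABLE entity_relationship;
INSERT INTO entity_relationship VALUES (<col1>,<col2>,...);
TRUNCATE TABLE <json_table>;
INSERT INTO <json_table> (json) VALUES ('<entity json>');
```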
## Backup CLI
@@ -43,13 +61,14 @@ After the installation, we can take a look at the different options to run the CLI:
 > metadata backup --help
 Usage: metadata backup [OPTIONS]

-  Run a backup for the metadata DB. Requires mysqldump installed on the
-  host.
+  Run a backup for the metadata DB. Uses a custom dump strategy for
+  OpenMetadata tables.

-  We can pass as many options as required with `-o <opt1>, -o <opt2> [...]`
+  We can pass as many connection options as required with `-o <opt1>, -o
+  <opt2> [...]` Same with connection arguments `-a <arg1>, -a <arg2> [...]`

-  To run the upload, provide the information as `--upload endpoint bucket
-  key` and properly configure the environment variables AWS_ACCESS_KEY_ID &
+  To run the upload, provide the information as `--upload endpoint bucket key`
+  and properly configure the environment variables AWS_ACCESS_KEY_ID &
   AWS_SECRET_ACCESS_KEY

 Options:
@@ -62,6 +81,8 @@ Options:
   --upload <TEXT TEXT TEXT>... S3 endpoint, bucket & key to upload the backup
                                file
   -o, --options TEXT
+  -a, --arguments TEXT
   --help                       Show this message and exit.
 ```

 ### Database Connection
@@ -83,13 +104,15 @@ We currently support uploading the backup files to S3. To run this, make sure to
 we can just use `--upload <endpoint> <bucket> <key>` to have the CLI upload the file. In this case, you'll get both the
 local dump file and the one in the cloud.

-### mysqldump options
+### Connection Options and Arguments

-`mysqldump` allows many options when running the command, and some of them might be required in different infrastructures.
-The `--options` parameters help us pass to `mysqldump` all of these required options via `-o <opt1>, -o <opt2> [...]`. An
-example of this could be the default values we have used for them: `--protocol=tcp` and `--no-tablespaces`, which are
-required to run the command pointing to the local Docker container with the database and the default `read-only` user
-OpenMetadata provides in the Docker Compose.
+You can pass any required connection options or arguments to the MySQL connection via `-o <opt1>, -o <opt2> [...]`
+or `-a <arg1>, -a <arg2> [...]`.
+
+### Backup Postgres
+
+If you are saving the data from Postgres, pass the argument `-s <schema>` or `--schema=<schema>` to indicate the
+schema containing the OpenMetadata tables. E.g., `-s public`.

 ### Trying it out
@@ -125,4 +148,3 @@ Uploading dir1/dir2/openmetadata_202201250823_backup.sql to http://localhost:900
 If we now head to the minio console and check the `my-backup` bucket, we'll see our SQL dump in there.

 <Image src="/images/deployment/backup/minio-example.png" alt="minio"/>
-
@@ -25,12 +25,13 @@ Note that it currently has a bug where we generate an entry:
 which is incorrect and should be removed when pasting this in.
 """

-import os
 import json
-import jsonschema2md
+import os
 from datetime import datetime
-from typing import List
 from pathlib import Path
+from typing import List
+
+import jsonschema2md

 SOURCES_ROOT = "catalog-rest-service/src/main/resources/json/schema"
 SINK_ROOT = "openmetadata-docs/content"
@@ -77,9 +78,11 @@ def prepare_menu(new_file: Path, is_file: bool) -> None:
     category_root = "- category: Main Concepts / Metadata Standard / Schemas / "
     category_suffix = str(new_file.parent).replace(SCHEMAS_ROOT, "")

-    title = ([to_tile(new_file.stem)] if is_file else [])
+    title = [to_tile(new_file.stem)] if is_file else []

-    category_suffix_list = list(map(lambda x: x.capitalize(), category_suffix.split("/"))) + title
+    category_suffix_list = (
+        list(map(lambda x: x.capitalize(), category_suffix.split("/"))) + title
+    )
     category = category_root + " / ".join(category_suffix_list)
     print(category)
     print(f"  url: {slug}")
@@ -104,16 +107,11 @@ def main() -> None:
     We build a list of (FilePath, True or False, if it is file or index)
     """

-    results = [
-        (file, True)
-        for file in Path(SOURCES_ROOT).rglob("*.json")
-    ]
+    results = [(file, True) for file in Path(SOURCES_ROOT).rglob("*.json")]

     directories = [Path(x[0]) for x in os.walk(SOURCES_ROOT)]

-    indexes = list(
-        (directory / "index.md", False) for directory in directories
-    )
+    indexes = list((directory / "index.md", False) for directory in directories)

     all_elems = results + indexes
     all_elems.sort()