[issue-2322] - Backup CLI (#2349)

* Prepare cli module

* Check call

* Prepare backup

* Run pytest on schema changes

* Remove traces

* Add backup plugin

* run generate during CI

* Install after generate

* Address logger hotspot
Pere Miquel Brull 2022-01-22 21:08:14 +01:00 committed by GitHub
parent 8fca53ec52
commit 1eb3f1ad41
8 changed files with 225 additions and 13 deletions


@@ -16,10 +16,13 @@ on:
branches: [main]
paths:
- ingestion/**
- catalog-rest-service/src/main/resources/json/schema/**
pull_request_target:
branches: [main]
paths:
- ingestion/**
- catalog-rest-service/src/main/resources/json/schema/**
jobs:
py-run-tests:
runs-on: ubuntu-latest


@@ -35,8 +35,9 @@ black_check:
generate:
@echo "Running Datamodel Code Generator"
@echo "Make sure to first run `make install_dev`"
@echo "Make sure to first run the install_dev recipe"
datamodel-codegen --input catalog-rest-service/src/main/resources/json --input-file-type jsonschema --output ingestion/src/metadata/generated
make install
run_ometa_integration_tests:
coverage run -m pytest -c ingestion/setup.cfg --doctest-modules --junitxml=ingestion/junit/test-results-integration.xml --override-ini=testpaths="ingestion/tests/integration/ometa ingestion/tests/integration/stage"


@@ -14,7 +14,6 @@ cd "$( dirname "${BASH_SOURCE[0]}" )"
echo "Maven Build - Skipping Tests"
cd ../ && mvn -DskipTests clean package
echo "Prepare Docker volume for the operators"
make generate
cd docker/local-metadata
echo "Starting Local Docker Containers"
docker-compose down && docker-compose up --build -d


@@ -66,6 +66,7 @@ plugins: Dict[str, Set[str]] = {
"bigquery-usage": {"google-cloud-logging", "cachetools"},
# "docker": {"docker==5.0.3"},
"docker": {"python_on_whales==0.34.0"},
"backup": {"boto3~=1.19.12"},
"dbt": {},
"druid": {"pydruid>=0.6.2"},
"elasticsearch": {"elasticsearch~=7.13.1"},

@@ -0,0 +1,132 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Backup utility for the metadata CLI
"""
import logging
import subprocess
from datetime import datetime
from pathlib import Path
from typing import List, Optional, Tuple

import boto3
import click
from boto3.exceptions import S3UploadFailedError

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)


def get_output(output: Optional[str] = None) -> Path:
    """
    Helper function to prepare the output backup file
    path and name.

    It will create the output dir if it does not exist.

    :param output: local path to store the backup
    :return: backup file name
    """
    now = datetime.now().strftime("%Y%m%d%H%M")
    name = f"openmetadata_{now}_backup.sql"

    if output:
        # Create the output directory if it does not exist
        if not Path(output).is_dir():
            Path(output).mkdir(parents=True, exist_ok=True)
        return Path(output) / name

    return Path(name)


def upload_backup(endpoint: str, bucket: str, key: str, file: Path) -> None:
    """
    Upload the mysqldump backup file.

    We will use boto3 to upload the file to the endpoint
    and the key provided.

    :param endpoint: S3 endpoint
    :param bucket: S3 bucket to upload the file to
    :param key: S3 key to upload the backup file
    :param file: file to upload
    """
    s3_key = Path(key) / file.name
    click.secho(
        f"Uploading {file} to {endpoint}/{bucket}/{str(s3_key)}...",
        fg="bright_green",
    )

    try:
        resource = boto3.resource(service_name="s3", endpoint_url=endpoint)
        resource.Object(bucket, str(s3_key)).upload_file(file.absolute().name)
    except ValueError as err:
        logger.error("Revisit the values of --upload")
        raise err
    except S3UploadFailedError as err:
        logger.error(
            "Error when uploading the backup to S3. Revisit the config and permissions."
            + " You should have set the environment values for AWS_ACCESS_KEY_ID"
            + " and AWS_SECRET_ACCESS_KEY"
        )
        raise err


def run_backup(
    host: str,
    user: str,
    password: str,
    database: str,
    port: str,
    output: Optional[str],
    upload: Optional[Tuple[str, str, str]],
    options: List[str],
) -> None:
    """
    Run `mysqldump` to MySQL database and store the
    output. Optionally, upload it to S3.

    :param host: service host
    :param user: service user
    :param password: service pwd
    :param database: database to backup
    :param port: database service port
    :param output: local path to store the backup
    :param upload: URI to upload result file
    :param options: list of other options to pass to mysqldump
    """
    click.secho(
        f"Creating OpenMetadata backup for {host}:{port}/{database}...",
        fg="bright_green",
    )

    out = get_output(output)

    mysqldump_root = f"mysqldump -h {host} -u {user} -p{password}"
    port_opt = f"-P {port}" if port else ""

    command = " ".join([mysqldump_root, port_opt, *options, database, f"> {out.name}"])
    res = subprocess.run(command, shell=True)

    if res.returncode != 0:
        raise RuntimeError("Error encountered when running mysqldump!")

    click.secho(
        f"Backup stored locally under {out}",
        fg="bright_green",
    )

    if upload:
        endpoint, bucket, key = upload
        upload_backup(endpoint, bucket, key, out)
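For reference, a minimal sketch of driving this new module directly from Python. The connection values, local path, and bucket below are hypothetical placeholders; `mysqldump` must be available on the host:

# Hypothetical example values; adjust to your own MySQL service and S3 setup.
from metadata.cli.backup import run_backup

run_backup(
    host="localhost",
    user="openmetadata_user",
    password="openmetadata_password",
    database="openmetadata_db",
    port="3306",
    output="/tmp/om_backups",  # optional local directory for the dump
    upload=None,  # or ("https://s3.<region>.amazonaws.com", "my-bucket", "backups")
    options=["--protocol=tcp", "--no-tablespaces"],
)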


@@ -1,22 +1,25 @@
import pathlib
import sys
import time
import logging
import pathlib
import click
import requests as requests
import sys
import tempfile
import time
import traceback
from datetime import timedelta
import click
import requests as requests
logger = logging.getLogger(__name__)
logging.getLogger("urllib3").setLevel(logging.WARN)
# Configure logger.
BASE_LOGGING_FORMAT = (
    "[%(asctime)s] %(levelname)-8s {%(name)s:%(lineno)d} - %(message)s"
)
logging.basicConfig(format=BASE_LOGGING_FORMAT)
handler = logging.StreamHandler()
handler.setFormatter(
    logging.Formatter(
        "[%(asctime)s] %(levelname)-8s {%(name)s:%(lineno)d} - %(message)s"
    )
)
logger.addHandler(handler)
calc_gb = 1024 * 1024 * 1000
min_memory_limit = 3 * calc_gb
@@ -96,7 +99,7 @@ def run_docker(start, stop, pause, resume, clean, file_path):
ometa_client.get(f"/tables/name/bigquery_gcp.shopify.dim_customer")
break
except Exception as err:
sys.stdout.write(".")
sys.stdout.write("../utils")
sys.stdout.flush()
time.sleep(5)
elapsed = time.time() - start_time
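The "Address logger hotspot" change above replaces a module-level logging.basicConfig(...) call with a handler attached to the module's own logger, so importing this module no longer reconfigures the root logger for every other package. A standalone sketch of the pattern, taken from the diff:

import logging

# Configure only this module's logger instead of the root logger,
# so importing the module does not affect other packages' logging.
logger = logging.getLogger(__name__)
handler = logging.StreamHandler()
handler.setFormatter(
    logging.Formatter(
        "[%(asctime)s] %(levelname)-8s {%(name)s:%(lineno)d} - %(message)s"
    )
)
logger.addHandler(handler)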


@@ -13,16 +13,17 @@ import logging
import os
import pathlib
import sys
from typing import List, Optional, Tuple
import click
from pydantic import ValidationError
from metadata.__version__ import get_metadata_version
from metadata.cli.backup import run_backup
from metadata.cli.docker import run_docker
from metadata.config.common import load_config_file
from metadata.ingestion.api.workflow import Workflow
from metadata.profiler.profiler_metadata import ProfileResult
from metadata.profiler.profiler_runner import ProfilerRunner
from metadata.utils.docker import run_docker
logger = logging.getLogger(__name__)
@@ -187,4 +188,76 @@ def docker(start, stop, pause, resume, clean, file_path) -> None:
    run_docker(start, stop, pause, resume, clean, file_path)


@metadata.command()
@click.option(
    "-h",
    "--host",
    help="Host that runs the database",
    required=True,
)
@click.option(
    "-u",
    "--user",
    help="User to run the backup",
    required=True,
)
@click.option(
    "-p",
    "--password",
    help="Credentials for the user",
    required=True,
)
@click.option(
    "-d",
    "--database",
    help="Database to backup",
    required=True,
)
@click.option(
    "--port",
    help="Database service port",
    default="3306",
    required=False,
)
@click.option(
    "--output",
    help="Local path to store the backup",
    type=click.Path(exists=False, dir_okay=True),
    default=None,
    required=False,
)
@click.option(
    "--upload",
    help="S3 endpoint, bucket & key to upload the backup file",
    nargs=3,
    type=click.Tuple([str, str, str]),
    default=None,
    required=False,
)
@click.option(
    "-o", "--options", multiple=True, default=["--protocol=tcp", "--no-tablespaces"]
)
def backup(
    host: str,
    user: str,
    password: str,
    database: str,
    port: str,
    output: Optional[str],
    upload: Optional[Tuple[str, str, str]],
    options: List[str],
) -> None:
    """
    Run a backup for the metadata DB.
    Requires mysqldump installed on the host.

    We can pass as many options as required with `-o <opt1>, -o <opt2> [...]`

    To run the upload, provide the information as
    `--upload endpoint bucket key` and properly configure the environment
    variables AWS_ACCESS_KEY_ID & AWS_SECRET_ACCESS_KEY
    """
    run_backup(host, user, password, database, port, output, upload, options)


metadata.add_command(check)
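Finally, a sketch of exercising the new command through click's test runner. The import path of the CLI group is an assumption (only the `metadata` group object is visible in this diff), and the credential values are placeholders; running it performs a real mysqldump:

from click.testing import CliRunner

from metadata.cmd import metadata  # assumed module exposing the click group above

runner = CliRunner()
result = runner.invoke(
    metadata,
    [
        "backup",
        "-h", "localhost",
        "-u", "openmetadata_user",
        "-p", "openmetadata_password",
        "-d", "openmetadata_db",
        "--output", "/tmp/om_backups",
        "-o", "--protocol=tcp",
        "-o", "--no-tablespaces",
    ],
)
print(result.exit_code, result.output)

Adding `--upload endpoint bucket key` to the argument list triggers the S3 upload path, which, as the command docstring notes, expects AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY in the environment.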