diff --git a/.github/workflows/py-tests.yml b/.github/workflows/py-tests.yml
index e0139c31cd5..81d8181137d 100644
--- a/.github/workflows/py-tests.yml
+++ b/.github/workflows/py-tests.yml
@@ -16,10 +16,13 @@ on:
     branches: [main]
     paths:
       - ingestion/**
+      - catalog-rest-service/src/main/resources/json/schema/**
   pull_request_target:
     branches: [main]
     paths:
       - ingestion/**
+      - catalog-rest-service/src/main/resources/json/schema/**
+
 jobs:
   py-run-tests:
     runs-on: ubuntu-latest
diff --git a/Makefile b/Makefile
index 022523468d6..b71c68b5c52 100644
--- a/Makefile
+++ b/Makefile
@@ -35,8 +35,9 @@ black_check:
 
 generate:
 	@echo "Running Datamodel Code Generator"
-	@echo "Make sure to first run `make install_dev`"
+	@echo "Make sure to first run the install_dev recipe"
 	datamodel-codegen --input catalog-rest-service/src/main/resources/json --input-file-type jsonschema --output ingestion/src/metadata/generated
+	make install
 
 run_ometa_integration_tests:
 	coverage run -m pytest -c ingestion/setup.cfg --doctest-modules --junitxml=ingestion/junit/test-results-integration.xml --override-ini=testpaths="ingestion/tests/integration/ometa ingestion/tests/integration/stage"
diff --git a/docker/run_local_docker.sh b/docker/run_local_docker.sh
index f044a34bbf9..3c6d717cfef 100755
--- a/docker/run_local_docker.sh
+++ b/docker/run_local_docker.sh
@@ -14,7 +14,6 @@ cd "$( dirname "${BASH_SOURCE[0]}" )"
 echo "Maven Build - Skipping Tests"
 cd ../ && mvn -DskipTests clean package
 echo "Prepare Docker volume for the operators"
-make generate
 cd docker/local-metadata
 echo "Starting Local Docker Containers"
 docker-compose down && docker-compose up --build -d
diff --git a/ingestion/setup.py b/ingestion/setup.py
index 890c78dbe88..c0f0e2abd0a 100644
--- a/ingestion/setup.py
+++ b/ingestion/setup.py
@@ -66,6 +66,7 @@ plugins: Dict[str, Set[str]] = {
     "bigquery-usage": {"google-cloud-logging", "cachetools"},
     # "docker": {"docker==5.0.3"},
     "docker": {"python_on_whales==0.34.0"},
+    "backup": {"boto3~=1.19.12"},
     "dbt": {},
     "druid": {"pydruid>=0.6.2"},
     "elasticsearch": {"elasticsearch~=7.13.1"},
diff --git a/ingestion/src/metadata/cli/__init__.py b/ingestion/src/metadata/cli/__init__.py
new file mode 100644
index 00000000000..e69de29bb2d
diff --git a/ingestion/src/metadata/cli/backup.py b/ingestion/src/metadata/cli/backup.py
new file mode 100644
index 00000000000..f5ecc1930cd
--- /dev/null
+++ b/ingestion/src/metadata/cli/backup.py
@@ -0,0 +1,132 @@
+# Copyright 2021 Collate
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Backup utility for the metadata CLI
+"""
+import logging
+import subprocess
+from datetime import datetime
+from pathlib import Path
+from typing import List, Optional, Tuple
+
+import boto3
+import click
+from boto3.exceptions import S3UploadFailedError
+
+logger = logging.getLogger(__name__)
+logger.setLevel(logging.INFO)
+
+
+def get_output(output: Optional[str] = None) -> Path:
+    """
+    Helper function to prepare the output backup file
+    path and name.
+
+    It will create the output dir if it does not exist.
+
+    :param output: local path to store the backup
+    :return: backup file name
+    """
+    now = datetime.now().strftime("%Y%m%d%H%M")
+    name = f"openmetadata_{now}_backup.sql"
+
+    if output:
+        # Create the output directory if it does not exist
+        if not Path(output).is_dir():
+            Path(output).mkdir(parents=True, exist_ok=True)
+
+        return Path(output) / name
+
+    return Path(name)
+
+
+def upload_backup(endpoint: str, bucket: str, key: str, file: Path) -> None:
+    """
+    Upload the mysqldump backup file.
+    We will use boto3 to upload the file to the endpoint
+    and the key provided.
+
+    :param endpoint: S3 endpoint
+    :param bucket: S3 bucket to upload the file to
+    :param key: S3 key to upload the backup file
+    :param file: file to upload
+    """
+
+    s3_key = Path(key) / file.name
+    click.secho(
+        f"Uploading {file} to {endpoint}/{bucket}/{str(s3_key)}...",
+        fg="bright_green",
+    )
+
+    try:
+        resource = boto3.resource(service_name="s3", endpoint_url=endpoint)
+        resource.Object(bucket, str(s3_key)).upload_file(file.absolute().name)
+
+    except ValueError as err:
+        logger.error("Revisit the values of --upload")
+        raise err
+    except S3UploadFailedError as err:
+        logger.error(
+            "Error when uploading the backup to S3. Revisit the config and permissions."
+            + " You should have set the environment values for AWS_ACCESS_KEY_ID"
+            + " and AWS_SECRET_ACCESS_KEY"
+        )
+        raise err
+
+
+def run_backup(
+    host: str,
+    user: str,
+    password: str,
+    database: str,
+    port: str,
+    output: Optional[str],
+    upload: Optional[Tuple[str, str, str]],
+    options: List[str],
+) -> None:
+    """
+    Run `mysqldump` against the MySQL database and store the
+    output. Optionally, upload it to S3.
+
+    :param host: service host
+    :param user: service user
+    :param password: service pwd
+    :param database: database to backup
+    :param port: database service port
+    :param output: local path to store the backup
+    :param upload: URI to upload result file
+    :param options: list of other options to pass to mysqldump
+    """
+    click.secho(
+        f"Creating OpenMetadata backup for {host}:{port}/{database}...",
+        fg="bright_green",
+    )
+
+    out = get_output(output)
+
+    mysqldump_root = f"mysqldump -h {host} -u {user} -p{password}"
+    port_opt = f"-P {port}" if port else ""
+
+    command = " ".join([mysqldump_root, port_opt, *options, database, f"> {out.name}"])
+
+    res = subprocess.run(command, shell=True)
+    if res.returncode != 0:
+        raise RuntimeError("Error encountered when running mysqldump!")
+
+    click.secho(
+        f"Backup stored locally under {out}",
+        fg="bright_green",
+    )
+
+    if upload:
+        endpoint, bucket, key = upload
+        upload_backup(endpoint, bucket, key, out)
diff --git a/ingestion/src/metadata/utils/docker.py b/ingestion/src/metadata/cli/docker.py
similarity index 95%
rename from ingestion/src/metadata/utils/docker.py
rename to ingestion/src/metadata/cli/docker.py
index 4e5912652fd..58a89f786d3 100644
--- a/ingestion/src/metadata/utils/docker.py
+++ b/ingestion/src/metadata/cli/docker.py
@@ -1,22 +1,25 @@
-import pathlib
-import sys
-import time
 import logging
 import pathlib
-import click
-import requests as requests
+import sys
 import tempfile
+import time
 import traceback
 from datetime import timedelta
 
+import click
+import requests as requests
+
 logger = logging.getLogger(__name__)
 logging.getLogger("urllib3").setLevel(logging.WARN)
 
 # Configure logger.
-BASE_LOGGING_FORMAT = (
-    "[%(asctime)s] %(levelname)-8s {%(name)s:%(lineno)d} - %(message)s"
+handler = logging.StreamHandler()
+handler.setFormatter(
+    logging.Formatter(
+        "[%(asctime)s] %(levelname)-8s {%(name)s:%(lineno)d} - %(message)s"
+    )
 )
-logging.basicConfig(format=BASE_LOGGING_FORMAT)
+logger.addHandler(handler)
 
 calc_gb = 1024 * 1024 * 1000
 min_memory_limit = 3 * calc_gb
@@ -96,7 +99,7 @@ def run_docker(start, stop, pause, resume, clean, file_path):
                     ometa_client.get(f"/tables/name/bigquery_gcp.shopify.dim_customer")
                     break
                 except Exception as err:
                     sys.stdout.write(".")
                     sys.stdout.flush()
                     time.sleep(5)
             elapsed = time.time() - start_time
diff --git a/ingestion/src/metadata/cmd.py b/ingestion/src/metadata/cmd.py
index 56463a27e92..f7de4480bdf 100644
--- a/ingestion/src/metadata/cmd.py
+++ b/ingestion/src/metadata/cmd.py
@@ -13,16 +13,17 @@ import logging
 import os
 import pathlib
 import sys
+from typing import List, Optional, Tuple
 
 import click
 from pydantic import ValidationError
 
 from metadata.__version__ import get_metadata_version
+from metadata.cli.backup import run_backup
+from metadata.cli.docker import run_docker
 from metadata.config.common import load_config_file
 from metadata.ingestion.api.workflow import Workflow
-from metadata.profiler.profiler_metadata import ProfileResult
 from metadata.profiler.profiler_runner import ProfilerRunner
-from metadata.utils.docker import run_docker
 
 logger = logging.getLogger(__name__)
 
@@ -187,4 +188,76 @@ def docker(start, stop, pause, resume, clean, file_path) -> None:
     run_docker(start, stop, pause, resume, clean, file_path)
 
 
+@metadata.command()
+@click.option(
+    "-h",
+    "--host",
+    help="Host that runs the database",
+    required=True,
+)
+@click.option(
+    "-u",
+    "--user",
+    help="User to run the backup",
+    required=True,
+)
+@click.option(
+    "-p",
+    "--password",
+    help="Credentials for the user",
+    required=True,
+)
+@click.option(
+    "-d",
+    "--database",
+    help="Database to backup",
+    required=True,
+)
+@click.option(
+    "--port",
+    help="Database service port",
+    default="3306",
+    required=False,
+)
+@click.option(
+    "--output",
+    help="Local path to store the backup",
+    type=click.Path(exists=False, dir_okay=True),
+    default=None,
+    required=False,
+)
+@click.option(
+    "--upload",
+    help="S3 endpoint, bucket & key to upload the backup file",
+    nargs=3,
+    type=click.Tuple([str, str, str]),
+    default=None,
+    required=False,
+)
+@click.option(
+    "-o", "--options", multiple=True, default=["--protocol=tcp", "--no-tablespaces"]
+)
+def backup(
+    host: str,
+    user: str,
+    password: str,
+    database: str,
+    port: str,
+    output: Optional[str],
+    upload: Optional[Tuple[str, str, str]],
+    options: List[str],
+) -> None:
+    """
+    Run a backup for the metadata DB.
+    Requires mysqldump installed on the host.
+
+    We can pass as many options as required with `-o <opt1>, -o <opt2> [...]`
+
+    To run the upload, provide the information as
+    `--upload endpoint bucket key` and properly configure the environment
+    variables AWS_ACCESS_KEY_ID & AWS_SECRET_ACCESS_KEY
+    """
+    run_backup(host, user, password, database, port, output, upload, options)
+
+
 metadata.add_command(check)
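
Usage note (not part of the patch): the new command wired up in ingestion/src/metadata/cmd.py can be exercised roughly as sketched below. The host, credentials, database name, endpoint, bucket and key are placeholders, not values taken from this diff:

    metadata backup \
        -h localhost -u openmetadata_user -p openmetadata_password \
        -d openmetadata_db --port 3306 \
        --output /tmp/om-backups \
        --upload http://localhost:9000 my-backup-bucket openmetadata \
        -o --protocol=tcp -o --no-tablespaces

This shells out to mysqldump, writes an openmetadata_<timestamp>_backup.sql file, and, because --upload is given, pushes it to <endpoint>/<bucket>/<key>/ through boto3; AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY must be set in the environment for the upload step.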
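
The same backup can also be driven programmatically. A minimal sketch, assuming the module path added by this diff (ingestion/src/metadata/cli/backup.py); nothing below beyond run_backup's signature comes from the patch, and all connection values are placeholders:

    from metadata.cli.backup import run_backup

    # Placeholder MySQL connection details -- replace with the target instance.
    run_backup(
        host="localhost",
        user="openmetadata_user",
        password="openmetadata_password",
        database="openmetadata_db",
        port="3306",
        output="/tmp/om-backups",  # directory is created if it does not exist
        upload=None,  # or ("http://localhost:9000", "my-backup-bucket", "openmetadata")
        options=["--protocol=tcp", "--no-tablespaces"],
    )

Like the CLI path, this needs the mysqldump binary on the host and, when upload is set, boto3 installed via the new "backup" plugin entry added in ingestion/setup.py.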