MINOR - Clean metadata CLI (#15631)

* Docs

* MINOR - Clean metadata CLI

* remove tests
Pere Miquel Brull 2024-03-26 16:36:47 +01:00 committed by GitHub
parent 813a9ca451
commit 9d7bfa363e
11 changed files with 14 additions and 1171 deletions

View File

@ -184,7 +184,6 @@ plugins: Dict[str, Set[str]] = {
*COMMONS["datalake"],
},
"deltalake": {"delta-spark<=2.3.0"},
"docker": {"python_on_whales==0.55.0"},
"domo": {VERSIONS["pydomo"]},
"doris": {"pydoris==1.0.2"},
"druid": {"pydruid>=0.6.5"},

View File

@ -1,382 +0,0 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Docker functions for CLI
"""
import json
import os
import pathlib
import shutil
import sys
import tempfile
import time
import traceback
from base64 import b64encode
from datetime import timedelta
from typing import Optional
import requests
from requests._internal_utils import to_native_string
from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
from metadata.generated.schema.security.client.openMetadataJWTClientConfig import (
OpenMetadataJWTClientConfig,
)
from metadata.ingestion.ometa.client import REST, ClientConfig
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.utils.client_version import get_client_version
from metadata.utils.helpers import DockerActions
from metadata.utils.logger import (
ANSI,
cli_logger,
log_ansi_encoded_string,
ometa_logger,
)
logger = cli_logger()
CALC_GB = 1024 * 1024 * 1024
MIN_MEMORY_LIMIT = 6 * CALC_GB
MAIN_DIR = "docker-volume"
RELEASE_BRANCH_VERSION = get_client_version()
REQUESTS_TIMEOUT = 60 * 5
DOCKER_URL_ROOT = (
"https://raw.githubusercontent.com/open-metadata/OpenMetadata/"
f"{RELEASE_BRANCH_VERSION}/docker/docker-compose-quickstart/"
)
DEFAULT_COMPOSE_FILE = "docker-compose.yml"
BACKEND_DATABASES = {
"mysql": DEFAULT_COMPOSE_FILE,
"postgres": "docker-compose-postgres.yml",
}
DEFAULT_JWT_TOKEN = (
"eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9"
".eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg"
"0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiE"
"C0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQh"
"yNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLw"
"Sl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTn"
"P49U493VanKpUAfzIiOiIbhg"
)
def docker_volume():
# create a main directory
if not os.path.exists(MAIN_DIR):
os.mkdir(MAIN_DIR)
def start_docker(docker, start_time, file_path, ingest_sample_data: bool):
"""
Method for starting up the docker containers
"""
logger.info("Running docker compose for OpenMetadata..")
docker_volume()
log_ansi_encoded_string(
color=ANSI.YELLOW, bold=False, message="It may take some time on the first run "
)
if file_path:
docker.compose.up(detach=True, build=True)
else:
docker.compose.up(detach=True)
logger.info("Ran docker compose for OpenMetadata successfully.")
if ingest_sample_data:
logger.info("Waiting for ingestion to complete..")
wait_for_containers(docker)
run_sample_data()
metadata_config = OpenMetadataConnection(
hostPort="http://localhost:8585/api",
authProvider="openmetadata",
securityConfig=OpenMetadataJWTClientConfig(jwtToken=DEFAULT_JWT_TOKEN),
)
ometa_logger().disabled = True
ometa_client = OpenMetadata(metadata_config)
while True:
try:
resp = ometa_client.get_by_name(
entity=Table, fqn="sample_data.ecommerce_db.shopify.dim_customer"
)
if not resp:
raise RuntimeError("Error")
break
except Exception:
sys.stdout.write(".")
sys.stdout.flush()
time.sleep(5)
ometa_logger().disabled = False
# Wait until docker is not only running, but the server is up
log_ansi_encoded_string(
color=ANSI.YELLOW,
bold=False,
message="Waiting for server to be up at http://localhost:8585 ",
)
while True:
try:
res = requests.get("http://localhost:8585", timeout=REQUESTS_TIMEOUT)
if res.status_code == 200:
break
except Exception:
pass
time.sleep(5)
elapsed = time.time() - start_time
logger.info(
f"Time taken to get OpenMetadata running: {str(timedelta(seconds=elapsed))}"
)
log_ansi_encoded_string(
color=ANSI.GREEN,
bold=False,
message="\n✅ OpenMetadata is up and running",
)
log_ansi_encoded_string(
color=ANSI.BLUE,
bold=False,
message="\nOpen http://localhost:8585 in your browser to access OpenMetadata."
"\nTo checkout Ingestion via Airflow, go to http://localhost:8080 "
"\n(username: admin, password: admin)",
)
log_ansi_encoded_string(
color=ANSI.MAGENTA,
bold=False,
message="We are available on Slack, https://slack.open-metadata.org/."
"Reach out to us if you have any questions."
"\nIf you like what we are doing, please consider giving us a star on github at"
" https://github.com/open-metadata/OpenMetadata. It helps OpenMetadata reach wider audience and helps"
" our community.\n",
)
def env_file_check(env_file_path) -> Optional[pathlib.Path]:
"""
Method for checking if the env file path is valid
"""
if env_file_path is not None:
if env_file_path == "":
raise ValueError("Please provide path to env file")
logger.info(f"Using env file from {env_file_path}")
return pathlib.Path(env_file_path)
return None
def file_path_check(file_path, database: str):
"""
Method for checking if the file path is valid
"""
docker_compose_file_name = BACKEND_DATABASES.get(database) or DEFAULT_COMPOSE_FILE
if file_path is None:
docker_compose_file_path = (
pathlib.Path(tempfile.gettempdir()) / docker_compose_file_name
)
if not docker_compose_file_path.exists():
logger.info(
f"Downloading latest docker compose file {docker_compose_file_name} from openmetadata repository..."
)
resp = requests.get(
f"{DOCKER_URL_ROOT}{docker_compose_file_name}", timeout=REQUESTS_TIMEOUT
)
with open(docker_compose_file_path, "wb") as docker_compose_file_handle:
docker_compose_file_handle.write(resp.content)
else:
if file_path == "":
raise ValueError("Please Provide Path to local docker-compose.yml file")
logger.info(f"Using docker compose file from {file_path}")
docker_compose_file_path = pathlib.Path(file_path)
return docker_compose_file_path
def run_docker( # pylint: disable=too-many-branches too-many-statements
docker_obj_instance: DockerActions,
file_path: str,
env_file_path: str,
ingest_sample_data: bool,
database: str,
):
"""
Main method for the OpenMetadata docker commands
"""
try:
# We just want to import docker client when needed
from python_on_whales import ( # pylint: disable=import-outside-toplevel
DockerClient,
)
docker = DockerClient(compose_project_name="openmetadata", compose_files=[])
logger.info("Checking if docker compose is installed...")
if not docker.compose.is_installed():
raise RuntimeError("Docker Compose CLI is not installed on the system.")
docker_info = docker.info()
logger.info("Checking if docker service is running...")
if not docker_info.id:
raise RuntimeError("Docker Service is not up and running.")
logger.info("Checking openmetadata memory constraints...")
if docker_info.mem_total < MIN_MEMORY_LIMIT:
raise MemoryError
# Check for -f <Path>
docker_compose_file_path = file_path_check(file_path, database)
env_file = env_file_check(env_file_path)
# Set up Docker Client Config with docker compose file path
docker = DockerClient(
compose_project_name="openmetadata",
compose_files=[docker_compose_file_path],
compose_env_file=env_file,
compose_project_directory=pathlib.Path(),
)
if docker_obj_instance.start:
start_docker(
docker=docker,
start_time=time.time(),
file_path=file_path,
ingest_sample_data=ingest_sample_data,
)
if docker_obj_instance.pause:
logger.info("Pausing docker compose for OpenMetadata...")
docker.compose.pause()
logger.info("Pausing docker compose for OpenMetadata successful.")
if docker_obj_instance.resume:
logger.info("Resuming docker compose for OpenMetadata...")
docker.compose.unpause()
logger.info("Resuming docker compose for OpenMetadata Successful.")
if docker_obj_instance.stop:
logger.info("Stopping docker compose for OpenMetadata...")
docker.compose.stop()
logger.info("Docker compose for OpenMetadata stopped successfully.")
if docker_obj_instance.reset_db:
reset_db_om(docker)
if docker_obj_instance.clean:
logger.info(
"Stopping docker compose for OpenMetadata and removing images, networks, volumes..."
)
logger.info(
"Do you want to delete the MySQL docker volume with the OpenMetadata data?"
)
user_response = input("Please enter [y/N]\n")
if user_response == "y":
try:
shutil.rmtree(MAIN_DIR)
except FileNotFoundError:
pass
docker.compose.down(remove_orphans=True, remove_images="all", volumes=True)
logger.info(
"Stopped docker compose for OpenMetadata and removing images, networks, volumes."
)
if file_path is None:
docker_compose_file_path.unlink()
except MemoryError:
logger.debug(traceback.format_exc())
log_ansi_encoded_string(
color=ANSI.BRIGHT_RED,
bold=False,
message="Please Allocate More memory to Docker.\nRecommended: 6GB+\nCurrent: "
f"{round(float(dict(docker_info).get('mem_total')) / CALC_GB)}",
)
except Exception as exc:
logger.debug(traceback.format_exc())
log_ansi_encoded_string(
color=ANSI.BRIGHT_RED, bold=False, message=f"{str(exc)}"
)
def reset_db_om(docker):
"""
Reset the OpenMetadata Database and clear everything in the DB
"""
if docker.container.inspect("openmetadata_server").state.running:
log_ansi_encoded_string(
color=ANSI.BRIGHT_RED,
bold=False,
message="Resetting OpenMetadata.\nThis will clear out all the data",
)
docker.container.execute(
container="openmetadata_server",
tty=True,
command=[
"/bin/bash",
"-c",
"./openmetadata-*/bootstrap/bootstrap_storage.sh drop-create-all",
],
)
else:
log_ansi_encoded_string(
color=ANSI.YELLOW,
bold=False,
message="OpenMetadata Instance is not up and running",
)
def wait_for_containers(docker) -> None:
"""
Wait until docker containers are running
"""
while True:
running = (
docker.container.inspect("openmetadata_server").state.running
and docker.container.inspect("openmetadata_ingestion").state.running
)
if running:
break
sys.stdout.write(".")
sys.stdout.flush()
time.sleep(5)
def run_sample_data() -> None:
"""
Trigger sample data DAGs
"""
base_url = "http://localhost:8080/api"
dags = ["sample_data", "sample_usage", "index_metadata"]
client_config = ClientConfig(
base_url=base_url,
auth_header="Authorization",
auth_token_mode="Basic",
access_token=to_native_string(
b64encode(b":".join(("admin".encode(), "admin".encode()))).strip()
),
)
client = REST(client_config)
timeout = time.time() + 60 * 5 # Timeout of 5 minutes
while True:
try:
resp = client.get("/dags")
if resp:
break
if time.time() > timeout:
raise TimeoutError("Ingestion container timed out")
except TimeoutError as err:
logger.debug(traceback.format_exc())
sys.stdout.write(str(err))
sys.exit(1)
except Exception:
sys.stdout.write(".")
time.sleep(5)
for dag in dags:
json_sample_data = {"is_paused": False}
client.patch(f"/dags/{dag}", data=json.dumps(json_sample_data))

View File

@ -1,109 +0,0 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
utility to update DAG config files for migration from v0.12.3 to 0.13
"""
import json
import os
from copy import deepcopy
from metadata.utils.logger import cli_logger
logger = cli_logger()
def run_openmetadata_dag_config_migration(dir_path: str, keep_backups: bool) -> None:
"""Update DAG config file by removing dbtConfig key and
supportMetadataExtraction keys
Args:
dir_path (str): path directory defaults to `/opt/airflow/dag_generated_configs`
"""
for root, _, files in os.walk(dir_path):
filenames = [file_ for file_ in files if os.path.splitext(file_)[1] == ".json"]
logger.info(
f"{len(filenames)} files found in `{root}`."
"\nChecking config. in the following files:\n\t{file_list}".format(
file_list="\n\t".join(filenames)
)
)
for filename in filenames:
logger.info(f"Checking config. file: {filename}")
with open(
os.path.join(root, filename), "r", encoding="utf-8"
) as config_file:
try:
orig_fle_data = json.loads(config_file.read())
except json.JSONDecodeError:
logger.error(
f"Error decoding file {filename}. "
"The file will be skipped. You should verify if the file needs any update manually."
)
continue
fle_data = deepcopy(
orig_fle_data
) # We keep a copy of the original file data to see if any change was made
try:
fle_data["sourceConfig"]["config"]["dbtConfigSource"]
except KeyError:
logger.error(
f"Could not find the key `dbtConfigSource` in {filename}. Skipping key deletion."
)
else:
del fle_data["sourceConfig"]["config"]["dbtConfigSource"]
logger.info(f"Successfully removed key `dbtConfigSource` in {filename}")
try:
fle_data["openMetadataServerConnection"]["supportsMetadataExtraction"]
except KeyError:
logger.error(
f"Could not find the key `supportsMetadataExtraction` in {filename}. Skipping key deletion."
)
else:
del fle_data["openMetadataServerConnection"][
"supportsMetadataExtraction"
]
logger.info(
f"Successfully removed key `supportsMetadataExtraction` in {filename}"
)
try:
fle_data["sourceConfig"]["config"]["markDeletedTablesFromFilterOnly"]
except KeyError:
logger.error(
f"Could not find the key `markDeletedTablesFromFilterOnly` in {filename}. Skipping key deletion."
)
else:
del fle_data["sourceConfig"]["config"][
"markDeletedTablesFromFilterOnly"
]
logger.info(
f"Successfully removed key `markDeletedTablesFromFilterOnly` in {filename}"
)
if orig_fle_data != fle_data:
with open(
os.path.join(root, filename), "w", encoding="utf-8"
) as config_file:
config_file.write(json.dumps(fle_data))
logger.info(f"File {filename} successfully updated")
if keep_backups:
with open(
os.path.join(root, f"{filename}.bak"), "w", encoding="utf-8"
) as bak_config_file:
bak_config_file.write(json.dumps(orig_fle_data))
logger.info(f"Backup File {filename}.bak successfully updated")

View File

@ -1,71 +0,0 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
utility to update imports for migration from v0.11.5 to 0.12
"""
import os
from metadata.utils.logger import cli_logger
logger = cli_logger()
V115_IMPORT_STRING = "from openmetadata."
V115_DAG_CONFIG_PATH = '"/airflow/dag_generated_configs/'
V12_IMPORT_STRING = "from openmetadata_managed_apis."
V12_DAG_CONFIG_PATH = '"/opt/airflow/dag_generated_configs/'
def run_openmetadata_imports_migration(
dir_path: str, change_config_file_path: bool
) -> None:
"""Given a path to the DAG folder we'll look for openmetadata import and update the package to
`openmetadata_managed_apis`
Args:
dir_path (str): path to the DAG folder
"""
for root, _, filenames in os.walk(dir_path):
filenames = [
filename for filename in filenames if os.path.splitext(filename)[1] == ".py"
]
logger.info(
f"{len(filenames)} files found in `{root}`."
"\nChecking for imports in the following files:\n\t{file_list}".format(
file_list="\n\t".join(filenames)
)
)
for filename in filenames:
logger.info(f"Checking imports in {filename}")
with open(os.path.join(root, filename), "r", encoding="utf-8") as dag_fle:
orig_fle_data = dag_fle.read()
fle_data = orig_fle_data # We keep a copy of the original file data to see if any change was made
if V115_IMPORT_STRING in fle_data:
fle_data = fle_data.replace(V115_IMPORT_STRING, V12_IMPORT_STRING)
logger.info(
f"Imports found in {filename}. Replaced `{V115_IMPORT_STRING}` with `{V12_IMPORT_STRING}`"
)
if change_config_file_path and V115_DAG_CONFIG_PATH in fle_data:
fle_data = fle_data.replace(V115_DAG_CONFIG_PATH, V12_DAG_CONFIG_PATH)
logger.info(
f"Old config path found. Replaced {V115_DAG_CONFIG_PATH} with {V12_DAG_CONFIG_PATH}."
)
if orig_fle_data != fle_data:
with open(
os.path.join(root, filename), "w", encoding="utf-8"
) as dag_file:
dag_file.write(fle_data)

View File

@ -9,7 +9,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""
This module defines the CLI commands for OpenMetada
This module defines the CLI commands for OpenMetadata
"""
import argparse
import logging
@ -21,16 +21,9 @@ from metadata.__version__ import get_metadata_version
from metadata.cli.app import run_app
from metadata.cli.backup import UploadDestinationType, run_backup
from metadata.cli.dataquality import run_test
from metadata.cli.docker import BACKEND_DATABASES, DockerActions, run_docker
from metadata.cli.ingest import run_ingest
from metadata.cli.insight import run_insight
from metadata.cli.lineage import run_lineage
from metadata.cli.openmetadata_dag_config_migration import (
run_openmetadata_dag_config_migration,
)
from metadata.cli.openmetadata_imports_migration import (
run_openmetadata_imports_migration,
)
from metadata.cli.profile import run_profiler
from metadata.cli.restore import run_restore
from metadata.cli.usage import run_usage
@ -45,15 +38,12 @@ class MetadataCommands(Enum):
USAGE = "usage"
PROFILE = "profile"
TEST = "test"
DOCKER = "docker"
BACKUP = "backup"
RESTORE = "restore"
WEBHOOK = "webhook"
INSIGHT = "insight"
LINEAGE = "lineage"
APP = "app"
OPENMETADATA_IMPORTS_MIGRATION = "openmetadata_imports_migration"
OPENMETADATA_DAG_CONFIG_MIGRATION = "openmetadata_dag_config_migration"
RUN_PATH_METHODS = {
@ -67,21 +57,6 @@ RUN_PATH_METHODS = {
}
OM_IMPORTS_MIGRATION = """
Update DAG files generated when creating workflows in 0.11 and before.
In 0.12 the Airflow managed API package name changed from `openmetadata` to
`openmetadata_managed_apis`, hence breaking existing DAGs.
The `dag_generated_config` folder also changed location in Docker.
This small CLI utility allows you to update both elements.
"""
OM_DAG_CONFIG_MIGRATION = """
Update DAG Config files generated after creating workflow in 0.12 and before.
In 0.13 certain keys of the DAG config files have been removed. This small
utility command allows you to update legacy DAG config files. Note this can
also be done manually through the UI by clicking on `redeploy`.
"""
BACKUP_HELP = """
Run a backup for the metadata DB. Uses a custom dump strategy for OpenMetadata tables.
@ -118,91 +93,6 @@ def create_common_config_parser_args(parser: argparse.ArgumentParser):
)
def create_openmetadata_imports_migration_args(parser: argparse.ArgumentParser):
parser.add_argument(
"-d",
"--dir-path",
default="/opt/airflow/dags",
type=Path,
help="Path to the DAG folder. Default to `/opt/airflow/dags`",
)
parser.add_argument(
"--change-config-file-path",
help="Flag option. If pass this will try to change the path of the dag config files",
type=bool,
)
def create_openmetadata_dag_config_migration_args(parser: argparse.ArgumentParser):
parser.add_argument(
"-d",
"--dir-path",
default="/opt/airflow/dag_generated_configs",
type=Path,
help="Path to the DAG folder. Default to `/opt/airflow/dag_generated_configs`",
)
parser.add_argument(
"--keep-backups",
help="Flag option. If passed, old files will be kept as backups <filename>.json.bak",
action="store_true",
)
def docker_args(parser: argparse.ArgumentParser):
"""
Additional Parser Arguments for Docker
"""
parser.add_argument(
"--start", help="Start release docker containers", action="store_true"
)
parser.add_argument(
"--stop", help="Stops openmetadata docker containers", action="store_true"
)
parser.add_argument(
"--pause", help="Pause openmetadata docker containers", action="store_true"
)
parser.add_argument(
"--resume",
help="Resume/Unpause openmetadata docker containers",
action="store_true",
)
parser.add_argument(
"--clean",
help="Stops and remove openmetadata docker containers along with images, volumes, networks associated",
action="store_true",
)
parser.add_argument(
"-f",
"--file-path",
help="Path to Local docker-compose.yml",
type=Path,
required=False,
)
parser.add_argument(
"-env-file",
"--env-file-path",
help="Path to env file containing the environment variables",
type=Path,
required=False,
)
parser.add_argument(
"--reset-db", help="Reset OpenMetadata Data", action="store_true"
)
parser.add_argument(
"--ingest-sample-data",
help="Enable the sample metadata ingestion",
action="store_true",
)
parser.add_argument(
"-db",
"--database",
choices=list(BACKEND_DATABASES.keys()),
default="mysql",
)
def webhook_args(parser: argparse.ArgumentParser):
"""
Additional Parser Arguments for Webhook
@ -384,21 +274,6 @@ def get_parser(args=None):
help="Workflow for running external applications",
)
)
create_openmetadata_imports_migration_args(
sub_parser.add_parser(
MetadataCommands.OPENMETADATA_IMPORTS_MIGRATION.value,
help=OM_IMPORTS_MIGRATION,
)
)
create_openmetadata_dag_config_migration_args(
sub_parser.add_parser(
MetadataCommands.OPENMETADATA_DAG_CONFIG_MIGRATION.value,
help=OM_DAG_CONFIG_MIGRATION,
)
)
docker_args(
sub_parser.add_parser(MetadataCommands.DOCKER.value, help="Docker Quickstart")
)
backup_args(
sub_parser.add_parser(
MetadataCommands.BACKUP.value,
@ -419,7 +294,7 @@ def get_parser(args=None):
)
create_common_config_parser_args(
sub_parser.add_parser(
MetadataCommands.INSIGHT.value, help="Data Insigt Workflow"
MetadataCommands.INSIGHT.value, help="Data Insights Workflow"
)
)
@ -479,21 +354,6 @@ def metadata(args=None):
),
sql_file=contains_args.get("input"),
)
if metadata_workflow == MetadataCommands.DOCKER.value:
run_docker(
docker_obj_instance=DockerActions(
start=contains_args.get("start"),
stop=contains_args.get("stop"),
pause=contains_args.get("pause"),
resume=contains_args.get("resume"),
clean=contains_args.get("clean"),
reset_db=contains_args.get("reset_db"),
),
file_path=contains_args.get("file_path"),
env_file_path=contains_args.get("env_file_path"),
ingest_sample_data=contains_args.get("ingest_sample_data"),
database=contains_args.get("database"),
)
if metadata_workflow == MetadataCommands.WEBHOOK.value:
class WebhookHandler(BaseHTTPRequestHandler):
@ -518,13 +378,3 @@ def metadata(args=None):
(contains_args.get("host"), contains_args.get("port")), WebhookHandler
) as server:
server.serve_forever()
if metadata_workflow == MetadataCommands.OPENMETADATA_IMPORTS_MIGRATION.value:
run_openmetadata_imports_migration(
contains_args.get("dir_path"), contains_args.get("change_config_file_path")
)
if metadata_workflow == MetadataCommands.OPENMETADATA_DAG_CONFIG_MIGRATION.value:
run_openmetadata_dag_config_migration(
contains_args.get("dir_path"), contains_args.get("keep_backups")
)

View File

@ -1,109 +0,0 @@
{
"id": "55048a4e-d23e-4c18-b684-03bfb51308a8",
"name": "rds_metadata_LTu8aqU4",
"displayName": "rds_metadata_LTu8aqU4",
"description": null,
"pipelineType": "metadata",
"owner": {
"id": "9f815a7d-df5a-4cf3-969d-8579becce8e4",
"type": "user",
"name": "admin",
"fullyQualifiedName": "admin",
"description": null,
"displayName": null,
"deleted": false,
"href": null
},
"fullyQualifiedName": "rds.rds_metadata_LTu8aqU4",
"sourceConfig": {
"config": {
"type": "DatabaseMetadata",
"markDeletedTables": true,
"markDeletedTablesFromFilterOnly": false,
"includeTables": true,
"includeViews": false,
"includeTags": false,
"useFqnForFiltering": false,
"schemaFilterPattern": {
"includes": [
"dbt_jaffle"
],
"excludes": null
},
"tableFilterPattern": null,
"databaseFilterPattern": null,
"dbtConfigSource": {
"dbtCatalogFilePath": "/opt/airflow/catalog.json",
"dbtManifestFilePath": "/opt/airflow/manifest.json",
"dbtRunResultsFilePath": "",
"dbtUpdateDescriptions": false
}
}
},
"openMetadataServerConnection": {
"clusterName": "openmetadata",
"type": "OpenMetadata",
"hostPort": "http://openmetadata-server:8585/api",
"authProvider": "openmetadata",
"verifySSL": "no-ssl",
"sslConfig": null,
"securityConfig": {
"jwtToken": "myToken"
},
"secretsManagerProvider": "noop",
"apiVersion": "v1",
"includeTopics": true,
"includeTables": true,
"includeDashboards": true,
"includePipelines": true,
"includeMlModels": true,
"includeUsers": true,
"includeTeams": true,
"includeGlossaryTerms": true,
"includeTags": true,
"includePolicy": true,
"includeMessagingServices": true,
"enableVersionValidation": true,
"includeDatabaseServices": true,
"includePipelineServices": true,
"limitRecords": 1000,
"forceEntityOverwriting": false,
"supportsMetadataExtraction": true
},
"airflowConfig": {
"pausePipeline": false,
"concurrency": 1,
"startDate": null,
"endDate": null,
"pipelineTimezone": "UTC",
"retries": 3,
"retryDelay": 300,
"pipelineCatchup": false,
"scheduleInterval": "0 * * * *",
"maxActiveRuns": 1,
"workflowTimeout": null,
"workflowDefaultView": "tree",
"workflowDefaultViewOrientation": "LR",
"email": null
},
"service": {
"id": "a8517b60-e69b-4841-b123-2398fedf9f28",
"type": "databaseService",
"name": "rds",
"fullyQualifiedName": "rds",
"description": "",
"displayName": null,
"deleted": false,
"href": null
},
"pipelineStatuses": null,
"loggerLevel": "INFO",
"deployed": null,
"enabled": true,
"href": "http://localhost:8585/api/v1/services/ingestionPipelines/55048a4e-d23e-4c18-b684-03bfb51308a8",
"version": 0.1,
"updatedAt": 1671541748444,
"updatedBy": "admin",
"changeDescription": null,
"deleted": false
}

View File

@ -1,109 +0,0 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
test dag config migration cli script
"""
import json
import os
from pathlib import Path
from unittest import TestCase
from metadata.cli.openmetadata_dag_config_migration import (
run_openmetadata_dag_config_migration,
)
class TestOpenmetadataImportsMigration(TestCase):
"""Test class for the cli scrip test"""
store = dict()
resources_path = Path(__file__).parent.absolute() / "resources"
@classmethod
def setUpClass(cls) -> None:
for root, _, filenames in os.walk(cls.resources_path):
for filename in filenames:
with open(os.path.join(root, filename), "r", encoding="utf-8") as fle:
cls.store[os.path.join(root, filename)] = fle.read()
def test_run_openmetadata_imports_migration_w_bak(self):
"""test the run openmetadata function"""
run_openmetadata_dag_config_migration(self.resources_path, True)
failures = []
for root, _, filenames in os.walk(self.resources_path):
assert any(".json.bak" in filename for filename in filenames)
for filename in filenames:
if os.path.splitext(filename)[1] == ".json":
with open(
os.path.join(root, filename), "r", encoding="utf-8"
) as fle:
data = json.loads(fle.read())
try:
data["sourceConfig"]["config"]["dbtConfigSource"]
except KeyError:
pass
else:
failures.append(filename)
try:
data["openMetadataServerConnection"][
"supportsMetadataExtraction"
]
except KeyError:
pass
else:
failures.append(filename)
try:
data["sourceConfig"]["config"][
"markDeletedTablesFromFilterOnly"
]
except KeyError:
pass
else:
failures.append(filename)
if os.path.splitext(filename)[1] == ".bak":
with open(
os.path.join(root, filename), "r", encoding="utf-8"
) as fle:
data = json.loads(fle.read())
try:
data["sourceConfig"]["config"]["dbtConfigSource"]
except KeyError:
failures.append(filename)
try:
data["openMetadataServerConnection"][
"supportsMetadataExtraction"
]
except KeyError:
failures.append(filename)
try:
data["sourceConfig"]["config"][
"markDeletedTablesFromFilterOnly"
]
except KeyError:
failures.append(filename)
assert not failures
@classmethod
def tearDownClass(cls) -> None:
for file_path, file_content in cls.store.items():
with open(file_path, "w", encoding="utf-8") as fle:
fle.write(file_content)
for root, _, filenames in os.walk(cls.resources_path):
bak_files = [
file_ for file_ in filenames if os.path.splitext(file_)[1] == ".bak"
]
for bak_file in bak_files:
os.remove(os.path.join(root, bak_file))

View File

@ -1,60 +0,0 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
test import migration cli script
"""
import os
from pathlib import Path
from unittest import TestCase
from metadata.cli.openmetadata_imports_migration import (
run_openmetadata_imports_migration,
)
class TestOpenmetadataImportsMigration(TestCase):
"""Test class for the cli scrip test"""
store = dict()
resources_path = Path(__file__).parent.absolute() / "resources"
@classmethod
def setUpClass(cls) -> None:
for root, _, filenames in os.walk(cls.resources_path):
for filename in filenames:
with open(os.path.join(root, filename), "r", encoding="utf-8") as fle:
cls.store[os.path.join(root, filename)] = fle.read()
def test_run_openmetadata_imports_migration(self):
"""test the run openmetadata function"""
run_openmetadata_imports_migration(self.resources_path, True)
failures = []
for root, _, filenames in os.walk(self.resources_path):
for filename in filenames:
if os.path.splitext(filename)[1] == ".py":
with open(
os.path.join(root, filename), "r", encoding="utf-8"
) as fle:
data = fle.read()
if "from openmetadata_managed_apis." not in data:
failures.append(filename)
if "/opt/airflow/dag_generated_configs" not in data:
failures.append(filename)
assert not failures
@classmethod
def tearDownClass(cls) -> None:
for file_path, file_content in cls.store.items():
with open(file_path, "w", encoding="utf-8") as fle:
fle.write(file_content)

View File

@ -108,6 +108,9 @@ After the migration is finished, you can revert this changes.
We will deprecate the dictionary annotation in the 1.4 release, since the new annotation allows you to define lineage between
assets other than Tables.
- In 1.4.0 we will deprecate the `metadata backup` and `metadata restore` commands in favor of the native backup & restore tools
from MySQL and PostgreSQL. We will provide a guide on how to use these tools to back up and restore OpenMetadata metadata.
# Breaking Changes
## 1.3.0

View File

@ -110,186 +110,17 @@ After the migration is finished, you can revert this changes.
# Breaking Changes
## 1.3.0
## 1.4.0
### New Alerts and Observability
### Metadata Docker CLI
{% note noteType="Warning" %}
For the past releases, we have been updating the documentation to point users to run the Docker quickstart
directly with the docker compose files from the release page ([docs](quick-start/local-docker-deployment)).
Upgrading to OpenMetadata 1.3.0 will REMOVE your existing Alerts. **You will need to recreate your alerts manually.**
In this release, we're completely removing the support for `metadata docker`.
{% /note %}
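If you relied on `metadata docker`, the quickstart flow can be reproduced in a few lines. The sketch below is only an illustration adapted from the removed `metadata/cli/docker.py` helper; the release tag and compose file name are assumptions you should adapt to your deployment.

```python
# Minimal sketch of the replacement workflow, adapted from the removed docker
# helper: fetch the quickstart compose file for a release and run it with the
# Docker Compose CLI. Release tag and compose file name are illustrative.
import pathlib
import subprocess
import tempfile

import requests

RELEASE = "1.4.0"  # assumption: pick the tag matching your target version
COMPOSE_FILE = "docker-compose.yml"  # or "docker-compose-postgres.yml"
URL = (
    "https://raw.githubusercontent.com/open-metadata/OpenMetadata/"
    f"{RELEASE}/docker/docker-compose-quickstart/{COMPOSE_FILE}"
)

# Download the compose file to a temporary location
compose_path = pathlib.Path(tempfile.gettempdir()) / COMPOSE_FILE
compose_path.write_bytes(requests.get(URL, timeout=300).content)

# Equivalent of the old `metadata docker --start`
subprocess.run(
    ["docker", "compose", "-f", str(compose_path), "up", "--detach"],
    check=True,
)
```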
### Metadata Backup & Restore
We have fully reworked how we manage alerts to make the experience easier for end users, with a more comprehensive
list of sources, filters and actions.
This process required a full backend rewrite, which means that there is no automatic way to migrate alerts from the old system.
{% image
src="/images/v1.4/deployment/upgrade/alerts.png"
alt="alerts"
caption="New Alerts UI"
/%}
### Secrets Manager
The Secrets Manager `noop` option has been renamed to `db`. You can find this in the config below:
```yaml
secretsManagerConfiguration:
secretsManager: ${SECRET_MANAGER:-db} # Possible values are "db", "managed-aws", "managed-aws-ssm"
prefix: ${SECRET_MANAGER_PREFIX:-""} # Define the secret key ID as /<prefix>/<clusterName>/<key>
tags: ${SECRET_MANAGER_TAGS:-[]} # Add tags to the created resource, e.g., in AWS. Format is `[key1:value1,key2:value2,...]`
```
Either update your YAMLs or the env var you are using under `SECRET_MANAGER`.
Note that we also added the possibility to define a `prefix` for the secret key ID in the external secrets managers, and
the option to tag the created resources.
### Docker user
In this release we updated the server [Dockerfile](https://github.com/open-metadata/OpenMetadata/blob/1.3.0/docker/development/Dockerfile#L34)
to work with `openmetadata` as a user instead of root.
If you're mapping volumes, especially when [configuring JWK](https://docs.open-metadata.org/v1.4.x/deployment/docker#add-docker-volumes-for-openmetadata-server-compose-service),
you will need to update the owner of the directory to get it working with the new `openmetadata` user.
You will need to run:
```bash
chown 1000 private_key.der
```
Otherwise, you'll see a similar error in your server logs:
```
ERROR [2024-02-08 15:29:36,792] [main] o.o.s.s.j.JWTTokenGenerator - Failed to initialize JWTTokenGenerator
java.nio.file.AccessDeniedException: /etc/openmetadata/jwtkeys/private_key.der
at java.base/sun.nio.fs.UnixException.translateToIOException(UnixException.java:90)
at java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:106)
...
```
### Elasticsearch reindex from Python
In 1.2.0 we introduced the Elasticsearch reindex job as part of the OpenMetadata server. In this release, we
removed triggering the ES job from Python workflows; everything now happens in the server. The image will no longer ship the `metadata_to_es` DAG.
### Ingestion & Ingestion Base Python Version
The `openmetadata/ingestion` and `openmetadata/ingestion-base` images now use Python 3.10.
Note that since release 1.3.0, the `openmetadata-ingestion` package has supported Python 3.11. We'll
migrate the images to 3.11 in the next release.
### Python SDK Auth Mechanisms
We cleaned up all the Python SDK code related to any auth system other than JWT tokens. Bots deprecated that behavior two releases ago
and have only supported JWT since; this is now reflected in the code.
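As a reference, here is a minimal sketch of a JWT-only client configuration, mirroring the connection the removed `metadata/cli/docker.py` helper used; the host and token are placeholders.

```python
# Minimal sketch of the JWT-only auth flow; hostPort and jwtToken are placeholders.
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
    OpenMetadataConnection,
)
from metadata.generated.schema.security.client.openMetadataJWTClientConfig import (
    OpenMetadataJWTClientConfig,
)
from metadata.ingestion.ometa.ometa_api import OpenMetadata

server_config = OpenMetadataConnection(
    hostPort="http://localhost:8585/api",
    authProvider="openmetadata",
    securityConfig=OpenMetadataJWTClientConfig(jwtToken="<jwt-token>"),
)
metadata = OpenMetadata(server_config)
assert metadata.health_check()  # verify the connection before using the SDK
```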
### Airflow Connection
We removed the `MSSQL` connection option for the Airflow backend database, since the option is experimental and
will be deprecated by the Airflow team. For more information, refer to the [link](https://airflow.apache.org/docs/apache-airflow/stable/howto/set-up-database.html#choosing-database-backend).
If you are using Airflow with an `MSSQL` backend, we recommend switching to one of the supported backends, e.g., `MYSQL` or `POSTGRES`.
This is what has been removed:
```yaml
...
connection:
type: Mssql
username: user
password: pass
hostPort: localhost:1433
database: dev
```
### Custom Connectors
In 1.3.0 we started registering more information from Ingestion Pipeline statuses in the platform. This required
us to create new JSON Schemas for the added properties, which before were only used in the Ingestion Framework.
Due to this, we need to update one import and one of its property names.
**StackTraceError**
- From `from metadata.ingestion.api.models import StackTraceError`
- To `from metadata.generated.schema.entity.services.ingestionPipelines.status import StackTraceError`
And we renamed its property `stack_trace` to `stackTrace` to follow the naming conventions in JSON Schemas.
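For example, a custom connector reporting a failure would now look roughly like this; only the import path and the `stackTrace` property name come from the change above, while the other field names and values are assumptions to adapt to your connector.

```python
# Sketch of the updated import and renamed property; field values are placeholders.
from metadata.generated.schema.entity.services.ingestionPipelines.status import (
    StackTraceError,
)

error = StackTraceError(
    name="my_entity",
    error="Failed to process entity",
    stackTrace="Traceback (most recent call last): ...",
)
```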
### SQL Lineage
In the `collate-sqllineage` dependency, we have renamed the `sqllineage` import to `collate_sqllineage`.
This change has been made to avoid any conflict with the open source version of `sqllineage`.
If you use this package directly in your Python scripts, please make sure to rename your imports:
- From `from sqllineage.xxx import xxx`
- To `from collate_sqllineage.xxx import xxx`
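A quick sketch of the renamed import, assuming the fork keeps the same module layout as the open source `sqllineage` package:

```python
# Before: from sqllineage.runner import LineageRunner
from collate_sqllineage.runner import LineageRunner  # assumes an unchanged module layout

runner = LineageRunner("INSERT INTO db.target SELECT * FROM db.source")
print(runner.source_tables())  # [db.source]
print(runner.target_tables())  # [db.target]
```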
## Service Connection Changes
### MongoDB Connection
We have removed connection-string authentication from the MongoDB service; we now only support
passing the authentication credentials by value.
{% note %}
Before the upgrade, review your MongoDB connection and make sure you have
provided the proper connection details/credentials to ensure a smooth migration.
{% /note %}
If you were using connection-string-based authentication, the structure of the connection details changes as follows:
#### From
```yaml
...
serviceConnection:
config: Mongo
connectionDetails:
connectionURI: mongodb+srv://user:pass@localhost:27017
```
#### To
```yaml
...
serviceConnection:
config: Mongo
scheme: mongodb+srv
username: username
password: password
hostPort: localhost:27017
```
If you were using connection-value-based authentication, the structure of the connection details changes as follows:
#### From
```yaml
...
serviceConnection:
config: Mongo
connectionDetails:
scheme: mongodb+srv
username: username
password: password
hostPort: localhost:27017
```
#### To
```yaml
...
serviceConnection:
config: Mongo
scheme: mongodb+srv
username: username
password: password
hostPort: localhost:27017
```
In the `metadata` CLI, we are deprecating the `backup` and `restore` commands, since users can now rely entirely
on the native database tools for both MySQL and PostgreSQL. Check the [docs](/deployment/backup-restore-metadata)
for more information.
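As a rough illustration of what the native-tools approach looks like, the sketch below dumps the OpenMetadata database with `mysqldump` or `pg_dump`; host, credentials, and database name are placeholders to adapt to your deployment.

```python
# Illustrative only: dump the OpenMetadata database with the native tools.
# Use mysqldump for MySQL or pg_dump for PostgreSQL; credentials are placeholders.
import subprocess

MYSQL_DUMP = [
    "mysqldump",
    "--host=localhost",
    "--user=openmetadata_user",
    "--password=openmetadata_password",
    "openmetadata_db",
]

POSTGRES_DUMP = [
    "pg_dump",
    "--host=localhost",
    "--username=openmetadata_user",
    "--dbname=openmetadata_db",
]

# Write the dump to a local file; swap in POSTGRES_DUMP for a PostgreSQL backend.
with open("openmetadata_backup.sql", "w", encoding="utf-8") as dump_file:
    subprocess.run(MYSQL_DUMP, stdout=dump_file, check=True)
```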