metadata CLI accepts tilde for relative paths (#13487)

* metadata CLI accepts tilde for relative paths

* [Docs] - Extracting MWAA details
This commit is contained in:
Pere Miquel Brull 2023-10-09 09:45:50 +02:00 committed by GitHub
parent f1a3d2addd
commit d31db4e862
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 50 additions and 42 deletions

View File

@ -46,11 +46,12 @@ def get_output(output: Optional[str] = None) -> Path:
name = f"openmetadata_{now}_backup.sql" name = f"openmetadata_{now}_backup.sql"
if output: if output:
path = Path(output).expanduser()
# Create the output directory if it does not exist # Create the output directory if it does not exist
if not Path(output).is_dir(): if not path.is_dir():
Path(output).mkdir(parents=True, exist_ok=True) path.mkdir(parents=True, exist_ok=True)
return Path(output) / name return path / name
return Path(name) return Path(name)

View File

@ -12,9 +12,9 @@
""" """
Data quality utility for the metadata CLI Data quality utility for the metadata CLI
""" """
import pathlib
import sys import sys
import traceback import traceback
from pathlib import Path
from metadata.config.common import load_config_file from metadata.config.common import load_config_file
from metadata.utils.logger import cli_logger from metadata.utils.logger import cli_logger
@ -28,17 +28,16 @@ from metadata.workflow.workflow_output_handler import (
logger = cli_logger() logger = cli_logger()
def run_test(config_path: str) -> None: def run_test(config_path: Path) -> None:
""" """
Run the Data Quality Test Suites workflow from a config path Run the Data Quality Test Suites workflow from a config path
to a JSON or YAML file to a JSON or YAML file
:param config_path: Path to load JSON config :param config_path: Path to load JSON config
""" """
config_file = pathlib.Path(config_path)
workflow_config_dict = None workflow_config_dict = None
try: try:
workflow_config_dict = load_config_file(config_file) workflow_config_dict = load_config_file(config_path)
logger.debug(f"Using config: {workflow_config_dict}") logger.debug(f"Using config: {workflow_config_dict}")
workflow = TestSuiteWorkflow.create(workflow_config_dict) workflow = TestSuiteWorkflow.create(workflow_config_dict)
except Exception as exc: except Exception as exc:

View File

@ -12,9 +12,9 @@
""" """
Profiler utility for the metadata CLI Profiler utility for the metadata CLI
""" """
import pathlib
import sys import sys
import traceback import traceback
from pathlib import Path
from metadata.config.common import load_config_file from metadata.config.common import load_config_file
from metadata.utils.logger import cli_logger from metadata.utils.logger import cli_logger
@ -28,17 +28,16 @@ from metadata.workflow.workflow_output_handler import (
logger = cli_logger() logger = cli_logger()
def run_ingest(config_path: str) -> None: def run_ingest(config_path: Path) -> None:
""" """
Run the ingestion workflow from a config path Run the ingestion workflow from a config path
to a JSON or YAML file to a JSON or YAML file
:param config_path: Path to load JSON config :param config_path: Path to load JSON config
""" """
config_file = pathlib.Path(config_path)
config_dict = None config_dict = None
try: try:
config_dict = load_config_file(config_file) config_dict = load_config_file(config_path)
workflow = MetadataWorkflow.create(config_dict) workflow = MetadataWorkflow.create(config_dict)
logger.debug(f"Using config: {workflow.config}") logger.debug(f"Using config: {workflow.config}")
except Exception as exc: except Exception as exc:

View File

@ -12,9 +12,9 @@
""" """
Data Insight utility for the metadata CLI Data Insight utility for the metadata CLI
""" """
import pathlib
import sys import sys
import traceback import traceback
from pathlib import Path
from metadata.config.common import load_config_file from metadata.config.common import load_config_file
from metadata.utils.logger import cli_logger from metadata.utils.logger import cli_logger
@ -28,17 +28,16 @@ from metadata.workflow.workflow_output_handler import (
logger = cli_logger() logger = cli_logger()
def run_insight(config_path: str) -> None: def run_insight(config_path: Path) -> None:
""" """
Run the Data Insight workflow from a config path Run the Data Insight workflow from a config path
to a JSON or YAML file to a JSON or YAML file
:param config_path: Path to load JSON config :param config_path: Path to load JSON config
""" """
config_file = pathlib.Path(config_path)
config_dict = None config_dict = None
try: try:
config_dict = load_config_file(config_file) config_dict = load_config_file(config_path)
workflow = DataInsightWorkflow.create(config_dict) workflow = DataInsightWorkflow.create(config_dict)
logger.debug(f"Using config: {workflow.config}") logger.debug(f"Using config: {workflow.config}")
except Exception as exc: except Exception as exc:

View File

@ -12,9 +12,9 @@
""" """
Lineage utility for the metadata CLI Lineage utility for the metadata CLI
""" """
import pathlib
import sys import sys
import traceback import traceback
from pathlib import Path
from typing import Optional from typing import Optional
from pydantic import BaseModel from pydantic import BaseModel
@ -38,17 +38,16 @@ class LineageWorkflow(BaseModel):
parseTimeout: Optional[int] = 5 * 60 # default parsing timeout to be 5 mins parseTimeout: Optional[int] = 5 * 60 # default parsing timeout to be 5 mins
def run_lineage(config_path: str) -> None: def run_lineage(config_path: Path) -> None:
""" """
Run the ingestion workflow from a config path Run the ingestion workflow from a config path
to a JSON or YAML file to a JSON or YAML file
:param config_path: Path to load JSON config :param config_path: Path to load JSON config
""" """
config_file = pathlib.Path(config_path)
config_dict = None config_dict = None
try: try:
config_dict = load_config_file(config_file) config_dict = load_config_file(config_path)
workflow = LineageWorkflow.parse_obj(config_dict) workflow = LineageWorkflow.parse_obj(config_dict)
except Exception as exc: except Exception as exc:

View File

@ -12,9 +12,9 @@
""" """
Ingest utility for the metadata CLI Ingest utility for the metadata CLI
""" """
import pathlib
import sys import sys
import traceback import traceback
from pathlib import Path
from metadata.config.common import load_config_file from metadata.config.common import load_config_file
from metadata.utils.logger import cli_logger from metadata.utils.logger import cli_logger
@ -28,17 +28,16 @@ from metadata.workflow.workflow_output_handler import (
logger = cli_logger() logger = cli_logger()
def run_profiler(config_path: str) -> None: def run_profiler(config_path: Path) -> None:
""" """
Run the Profiler workflow from a config path Run the Profiler workflow from a config path
to a JSON or YAML file to a JSON or YAML file
:param config_path: Path to load JSON config :param config_path: Path to load JSON config
""" """
config_file = pathlib.Path(config_path)
workflow_config_dict = None workflow_config_dict = None
try: try:
workflow_config_dict = load_config_file(config_file) workflow_config_dict = load_config_file(config_path)
logger.debug(f"Using config: {workflow_config_dict}") logger.debug(f"Using config: {workflow_config_dict}")
workflow = ProfilerWorkflow.create(workflow_config_dict) workflow = ProfilerWorkflow.create(workflow_config_dict)
except Exception as exc: except Exception as exc:

View File

@ -12,9 +12,9 @@
""" """
Usage utility for the metadata CLI Usage utility for the metadata CLI
""" """
import pathlib
import sys import sys
import traceback import traceback
from pathlib import Path
from metadata.config.common import load_config_file from metadata.config.common import load_config_file
from metadata.utils.logger import cli_logger from metadata.utils.logger import cli_logger
@ -28,17 +28,16 @@ from metadata.workflow.workflow_output_handler import (
logger = cli_logger() logger = cli_logger()
def run_usage(config_path: str) -> None: def run_usage(config_path: Path) -> None:
""" """
Run the usage workflow from a config path Run the usage workflow from a config path
to a JSON or YAML file to a JSON or YAML file
:param config_path: Path to load JSON config :param config_path: Path to load JSON config
""" """
config_file = pathlib.Path(config_path)
config_dict = None config_dict = None
try: try:
config_dict = load_config_file(config_file) config_dict = load_config_file(config_path)
workflow = UsageWorkflow.create(config_dict) workflow = UsageWorkflow.create(config_dict)
logger.debug(f"Using config: {workflow.config}") logger.debug(f"Using config: {workflow.config}")
except Exception as exc: except Exception as exc:

View File

@ -13,9 +13,9 @@ This module defines the CLI commands for OpenMetada
""" """
import argparse import argparse
import logging import logging
import pathlib
from enum import Enum from enum import Enum
from http.server import BaseHTTPRequestHandler, HTTPServer from http.server import BaseHTTPRequestHandler, HTTPServer
from pathlib import Path
from metadata.__version__ import get_metadata_version from metadata.__version__ import get_metadata_version
from metadata.cli.backup import UploadDestinationType, run_backup from metadata.cli.backup import UploadDestinationType, run_backup
@ -100,7 +100,7 @@ def create_common_config_parser_args(parser: argparse.ArgumentParser):
"-c", "-c",
"--config", "--config",
help="path to the config file", help="path to the config file",
type=pathlib.Path, type=Path,
required=True, required=True,
) )
@ -110,7 +110,7 @@ def create_openmetadata_imports_migration_args(parser: argparse.ArgumentParser):
"-d", "-d",
"--dir-path", "--dir-path",
default="/opt/airflow/dags", default="/opt/airflow/dags",
type=pathlib.Path, type=Path,
help="Path to the DAG folder. Default to `/opt/airflow/dags`", help="Path to the DAG folder. Default to `/opt/airflow/dags`",
) )
@ -126,7 +126,7 @@ def create_openmetadata_dag_config_migration_args(parser: argparse.ArgumentParse
"-d", "-d",
"--dir-path", "--dir-path",
default="/opt/airflow/dag_generated_configs", default="/opt/airflow/dag_generated_configs",
type=pathlib.Path, type=Path,
help="Path to the DAG folder. Default to `/opt/airflow/dag_generated_configs`", help="Path to the DAG folder. Default to `/opt/airflow/dag_generated_configs`",
) )
@ -164,14 +164,14 @@ def docker_args(parser: argparse.ArgumentParser):
"-f", "-f",
"--file-path", "--file-path",
help="Path to Local docker-compose.yml", help="Path to Local docker-compose.yml",
type=pathlib.Path, type=Path,
required=False, required=False,
) )
parser.add_argument( parser.add_argument(
"-env-file", "-env-file",
"--env-file-path", "--env-file-path",
help="Path to env file containing the environment variables", help="Path to env file containing the environment variables",
type=pathlib.Path, type=Path,
required=False, required=False,
) )
parser.add_argument( parser.add_argument(
@ -233,7 +233,7 @@ def backup_args(parser: argparse.ArgumentParser):
parser.add_argument( parser.add_argument(
"--output", "--output",
help="Local path to store the backup", help="Local path to store the backup",
type=pathlib.Path, type=Path,
default=None, default=None,
) )
parser.add_argument( parser.add_argument(
@ -259,7 +259,7 @@ def backup_args(parser: argparse.ArgumentParser):
def restore_args(parser: argparse.ArgumentParser): def restore_args(parser: argparse.ArgumentParser):
""" """
Addtional Parser Arguments for Restore Additional Parser Arguments for Restore
""" """
parser.add_argument( parser.add_argument(
"-H", "-H",
@ -298,7 +298,7 @@ def restore_args(parser: argparse.ArgumentParser):
parser.add_argument( parser.add_argument(
"--input", "--input",
help="Local backup file path for restore", help="Local backup file path for restore",
type=pathlib.Path, type=Path,
required=True, required=True,
) )
@ -411,6 +411,7 @@ def metadata(args=None): # pylint: disable=too-many-branches
contains_args = vars(get_parser(args)) contains_args = vars(get_parser(args))
metadata_workflow = contains_args.get("command") metadata_workflow = contains_args.get("command")
config_file = contains_args.get("config") config_file = contains_args.get("config")
path = Path(config_file).expanduser()
if contains_args.get("debug"): if contains_args.get("debug"):
set_loggers_level(logging.DEBUG) set_loggers_level(logging.DEBUG)
elif contains_args.get("log_level"): elif contains_args.get("log_level"):
@ -419,17 +420,17 @@ def metadata(args=None): # pylint: disable=too-many-branches
set_loggers_level(logging.INFO) set_loggers_level(logging.INFO)
if metadata_workflow == MetadataCommands.INGEST.value: if metadata_workflow == MetadataCommands.INGEST.value:
run_ingest(config_path=config_file) run_ingest(config_path=path)
if metadata_workflow == MetadataCommands.USAGE.value: if metadata_workflow == MetadataCommands.USAGE.value:
run_usage(config_path=config_file) run_usage(config_path=path)
if metadata_workflow == MetadataCommands.LINEAGE.value: if metadata_workflow == MetadataCommands.LINEAGE.value:
run_lineage(config_path=config_file) run_lineage(config_path=path)
if metadata_workflow == MetadataCommands.INSIGHT.value: if metadata_workflow == MetadataCommands.INSIGHT.value:
run_insight(config_path=config_file) run_insight(config_path=path)
if metadata_workflow == MetadataCommands.PROFILE.value: if metadata_workflow == MetadataCommands.PROFILE.value:
run_profiler(config_path=config_file) run_profiler(config_path=path)
if metadata_workflow == MetadataCommands.TEST.value: if metadata_workflow == MetadataCommands.TEST.value:
run_test(config_path=config_file) run_test(config_path=path)
if metadata_workflow == MetadataCommands.BACKUP.value: if metadata_workflow == MetadataCommands.BACKUP.value:
run_backup( run_backup(
common_backup_obj_instance=BackupRestoreArgs( common_backup_obj_instance=BackupRestoreArgs(

View File

@ -326,6 +326,12 @@ with DAG(
After running the DAG, we can store the connection details and remove the dag file from S3. After running the DAG, we can store the connection details and remove the dag file from S3.
Note that trying to log the `conf.get("core", "sql_alchemy_conn", fallback=None)` details might either result in:
1. An empty string, depending on the Airflow version: If that's the case, you can update the line to be `conf.get("database", "sql_alchemy_conn", fallback=None)`.
2. The password masked in `****`. If that's the case, you can use `sqlalchemy_conn = list(conf.get("core", "sql_alchemy_conn", fallback=None))`,
which will return the results separated by commas.
#### Preparing the metadata extraction #### Preparing the metadata extraction
We will use ECS here as well to get the metadata out of MWAA. The only important detail is to ensure that we are We will use ECS here as well to get the metadata out of MWAA. The only important detail is to ensure that we are

View File

@ -326,6 +326,12 @@ with DAG(
After running the DAG, we can store the connection details and remove the dag file from S3. After running the DAG, we can store the connection details and remove the dag file from S3.
Note that trying to log the `conf.get("core", "sql_alchemy_conn", fallback=None)` details might either result in:
1. An empty string, depending on the Airflow version: If that's the case, you can update the line to be `conf.get("database", "sql_alchemy_conn", fallback=None)`.
2. The password masked in `****`. If that's the case, you can use `sqlalchemy_conn = list(conf.get("core", "sql_alchemy_conn", fallback=None))`,
which will return the results separated by commas.
#### Preparing the metadata extraction #### Preparing the metadata extraction
We will use ECS here as well to get the metadata out of MWAA. The only important detail is to ensure that we are We will use ECS here as well to get the metadata out of MWAA. The only important detail is to ensure that we are