diff --git a/ingestion/src/metadata/cli/backup.py b/ingestion/src/metadata/cli/backup.py
index 7d97ed650e1..7d59476e9d6 100644
--- a/ingestion/src/metadata/cli/backup.py
+++ b/ingestion/src/metadata/cli/backup.py
@@ -46,11 +46,12 @@ def get_output(output: Optional[str] = None) -> Path:
     name = f"openmetadata_{now}_backup.sql"
 
     if output:
+        path = Path(output).expanduser()
         # Create the output directory if it does not exist
-        if not Path(output).is_dir():
-            Path(output).mkdir(parents=True, exist_ok=True)
+        if not path.is_dir():
+            path.mkdir(parents=True, exist_ok=True)
 
-        return Path(output) / name
+        return path / name
 
     return Path(name)
 
diff --git a/ingestion/src/metadata/cli/dataquality.py b/ingestion/src/metadata/cli/dataquality.py
index 7a97317ab36..edff3964e25 100644
--- a/ingestion/src/metadata/cli/dataquality.py
+++ b/ingestion/src/metadata/cli/dataquality.py
@@ -12,9 +12,9 @@
 """
 Data quality utility for the metadata CLI
 """
-import pathlib
 import sys
 import traceback
+from pathlib import Path
 
 from metadata.config.common import load_config_file
 from metadata.utils.logger import cli_logger
@@ -28,17 +28,16 @@ from metadata.workflow.workflow_output_handler import (
 logger = cli_logger()
 
 
-def run_test(config_path: str) -> None:
+def run_test(config_path: Path) -> None:
     """
     Run the Data Quality Test Suites workflow from a config path
     to a JSON or YAML file
     :param config_path: Path to load JSON config
     """
 
-    config_file = pathlib.Path(config_path)
     workflow_config_dict = None
     try:
-        workflow_config_dict = load_config_file(config_file)
+        workflow_config_dict = load_config_file(config_path)
         logger.debug(f"Using config: {workflow_config_dict}")
         workflow = TestSuiteWorkflow.create(workflow_config_dict)
     except Exception as exc:
diff --git a/ingestion/src/metadata/cli/ingest.py b/ingestion/src/metadata/cli/ingest.py
index ef22c51d49b..82c83f183fb 100644
--- a/ingestion/src/metadata/cli/ingest.py
+++ b/ingestion/src/metadata/cli/ingest.py
@@ -12,9 +12,9 @@
 """
 Profiler utility for the metadata CLI
 """
-import pathlib
 import sys
 import traceback
+from pathlib import Path
 
 from metadata.config.common import load_config_file
 from metadata.utils.logger import cli_logger
@@ -28,17 +28,16 @@ from metadata.workflow.workflow_output_handler import (
 logger = cli_logger()
 
 
-def run_ingest(config_path: str) -> None:
+def run_ingest(config_path: Path) -> None:
     """
     Run the ingestion workflow from a config path
     to a JSON or YAML file
     :param config_path: Path to load JSON config
     """
 
-    config_file = pathlib.Path(config_path)
     config_dict = None
     try:
-        config_dict = load_config_file(config_file)
+        config_dict = load_config_file(config_path)
         workflow = MetadataWorkflow.create(config_dict)
         logger.debug(f"Using config: {workflow.config}")
     except Exception as exc:
diff --git a/ingestion/src/metadata/cli/insight.py b/ingestion/src/metadata/cli/insight.py
index 2ed508d594d..fb026e32ddb 100644
--- a/ingestion/src/metadata/cli/insight.py
+++ b/ingestion/src/metadata/cli/insight.py
@@ -12,9 +12,9 @@
 """
 Data Insigt utility for the metadata CLI
 """
-import pathlib
 import sys
 import traceback
+from pathlib import Path
 
 from metadata.config.common import load_config_file
 from metadata.utils.logger import cli_logger
@@ -28,17 +28,16 @@ from metadata.workflow.workflow_output_handler import (
 logger = cli_logger()
 
 
-def run_insight(config_path: str) -> None:
+def run_insight(config_path: Path) -> None:
     """
     Run the Data Insigt workflow from a config path
     to a JSON or YAML file
     :param config_path: Path to load JSON config
     """
 
-    config_file = pathlib.Path(config_path)
     config_dict = None
     try:
-        config_dict = load_config_file(config_file)
+        config_dict = load_config_file(config_path)
         workflow = DataInsightWorkflow.create(config_dict)
         logger.debug(f"Using config: {workflow.config}")
     except Exception as exc:
diff --git a/ingestion/src/metadata/cli/lineage.py b/ingestion/src/metadata/cli/lineage.py
index 41cea5b5e25..c1dcbd06e29 100644
--- a/ingestion/src/metadata/cli/lineage.py
+++ b/ingestion/src/metadata/cli/lineage.py
@@ -12,9 +12,9 @@
 """
 Lineage utility for the metadata CLI
 """
-import pathlib
 import sys
 import traceback
+from pathlib import Path
 from typing import Optional
 
 from pydantic import BaseModel
@@ -38,17 +38,16 @@ class LineageWorkflow(BaseModel):
     parseTimeout: Optional[int] = 5 * 60  # default parsing timeout to be 5 mins
 
 
-def run_lineage(config_path: str) -> None:
+def run_lineage(config_path: Path) -> None:
     """
     Run the ingestion workflow from a config path
     to a JSON or YAML file
     :param config_path: Path to load JSON config
     """
 
-    config_file = pathlib.Path(config_path)
     config_dict = None
     try:
-        config_dict = load_config_file(config_file)
+        config_dict = load_config_file(config_path)
         workflow = LineageWorkflow.parse_obj(config_dict)
     except Exception as exc:
diff --git a/ingestion/src/metadata/cli/profile.py b/ingestion/src/metadata/cli/profile.py
index 0ca17da5bbe..106780ef7f0 100644
--- a/ingestion/src/metadata/cli/profile.py
+++ b/ingestion/src/metadata/cli/profile.py
@@ -12,9 +12,9 @@
 """
 Ingest utility for the metadata CLI
 """
-import pathlib
 import sys
 import traceback
+from pathlib import Path
 
 from metadata.config.common import load_config_file
 from metadata.utils.logger import cli_logger
@@ -28,17 +28,16 @@ from metadata.workflow.workflow_output_handler import (
 logger = cli_logger()
 
 
-def run_profiler(config_path: str) -> None:
+def run_profiler(config_path: Path) -> None:
     """
     Run the Profiler workflow from a config path
     to a JSON or YAML file
     :param config_path: Path to load JSON config
     """
 
-    config_file = pathlib.Path(config_path)
     workflow_config_dict = None
     try:
-        workflow_config_dict = load_config_file(config_file)
+        workflow_config_dict = load_config_file(config_path)
         logger.debug(f"Using config: {workflow_config_dict}")
         workflow = ProfilerWorkflow.create(workflow_config_dict)
     except Exception as exc:
diff --git a/ingestion/src/metadata/cli/usage.py b/ingestion/src/metadata/cli/usage.py
index 07f22041b82..20a87a56273 100644
--- a/ingestion/src/metadata/cli/usage.py
+++ b/ingestion/src/metadata/cli/usage.py
@@ -12,9 +12,9 @@
 """
 Usage utility for the metadata CLI
 """
-import pathlib
 import sys
 import traceback
+from pathlib import Path
 
 from metadata.config.common import load_config_file
 from metadata.utils.logger import cli_logger
@@ -28,17 +28,16 @@ from metadata.workflow.workflow_output_handler import (
 logger = cli_logger()
 
 
-def run_usage(config_path: str) -> None:
+def run_usage(config_path: Path) -> None:
     """
     Run the usage workflow from a config path
     to a JSON or YAML file
     :param config_path: Path to load JSON config
     """
 
-    config_file = pathlib.Path(config_path)
     config_dict = None
     try:
-        config_dict = load_config_file(config_file)
+        config_dict = load_config_file(config_path)
         workflow = UsageWorkflow.create(config_dict)
         logger.debug(f"Using config: {workflow.config}")
     except Exception as exc:
diff --git a/ingestion/src/metadata/cmd.py b/ingestion/src/metadata/cmd.py
index 1f7728ad607..39883dbaf2f 100644
--- a/ingestion/src/metadata/cmd.py
+++ b/ingestion/src/metadata/cmd.py
@@ -13,9 +13,9 @@ This module defines the CLI commands for OpenMetada
 """
 import argparse
 import logging
-import pathlib
 from enum import Enum
 from http.server import BaseHTTPRequestHandler, HTTPServer
+from pathlib import Path
 
 from metadata.__version__ import get_metadata_version
 from metadata.cli.backup import UploadDestinationType, run_backup
@@ -100,7 +100,7 @@ def create_common_config_parser_args(parser: argparse.ArgumentParser):
         "-c",
         "--config",
         help="path to the config file",
-        type=pathlib.Path,
+        type=Path,
         required=True,
     )
 
@@ -110,7 +110,7 @@ def create_openmetadata_imports_migration_args(parser: argparse.ArgumentParser):
         "-d",
         "--dir-path",
         default="/opt/airflow/dags",
-        type=pathlib.Path,
+        type=Path,
         help="Path to the DAG folder. Default to `/opt/airflow/dags`",
     )
 
@@ -126,7 +126,7 @@ def create_openmetadata_dag_config_migration_args(parser: argparse.ArgumentParse
         "-d",
         "--dir-path",
         default="/opt/airflow/dag_generated_configs",
-        type=pathlib.Path,
+        type=Path,
         help="Path to the DAG folder. Default to `/opt/airflow/dag_generated_configs`",
     )
 
@@ -164,14 +164,14 @@ def docker_args(parser: argparse.ArgumentParser):
         "-f",
         "--file-path",
         help="Path to Local docker-compose.yml",
-        type=pathlib.Path,
+        type=Path,
         required=False,
     )
     parser.add_argument(
         "-env-file",
         "--env-file-path",
         help="Path to env file containing the environment variables",
-        type=pathlib.Path,
+        type=Path,
         required=False,
     )
     parser.add_argument(
@@ -233,7 +233,7 @@ def backup_args(parser: argparse.ArgumentParser):
     parser.add_argument(
         "--output",
         help="Local path to store the backup",
-        type=pathlib.Path,
+        type=Path,
         default=None,
     )
     parser.add_argument(
@@ -259,7 +259,7 @@ def restore_args(parser: argparse.ArgumentParser):
     """
-    Addtional Parser Arguments for Restore
+    Additional Parser Arguments for Restore
     """
     parser.add_argument(
         "-H",
         "--host",
@@ -298,7 +298,7 @@
     parser.add_argument(
         "--input",
         help="Local backup file path for restore",
-        type=pathlib.Path,
+        type=Path,
         required=True,
     )
 
@@ -411,6 +411,7 @@ def metadata(args=None):  # pylint: disable=too-many-branches
     contains_args = vars(get_parser(args))
     metadata_workflow = contains_args.get("command")
     config_file = contains_args.get("config")
+    path = Path(config_file).expanduser()
     if contains_args.get("debug"):
         set_loggers_level(logging.DEBUG)
     elif contains_args.get("log_level"):
@@ -419,17 +420,17 @@ def metadata(args=None):  # pylint: disable=too-many-branches
         set_loggers_level(logging.INFO)
 
     if metadata_workflow == MetadataCommands.INGEST.value:
-        run_ingest(config_path=config_file)
+        run_ingest(config_path=path)
     if metadata_workflow == MetadataCommands.USAGE.value:
-        run_usage(config_path=config_file)
+        run_usage(config_path=path)
     if metadata_workflow == MetadataCommands.LINEAGE.value:
-        run_lineage(config_path=config_file)
+        run_lineage(config_path=path)
     if metadata_workflow == MetadataCommands.INSIGHT.value:
-        run_insight(config_path=config_file)
+        run_insight(config_path=path)
     if metadata_workflow == MetadataCommands.PROFILE.value:
-        run_profiler(config_path=config_file)
+        run_profiler(config_path=path)
     if metadata_workflow == MetadataCommands.TEST.value:
-        run_test(config_path=config_file)
+        run_test(config_path=path)
     if metadata_workflow == MetadataCommands.BACKUP.value:
         run_backup(
             common_backup_obj_instance=BackupRestoreArgs(
diff --git a/openmetadata-docs/content/v1.1.x/connectors/pipeline/airflow/mwaa.md b/openmetadata-docs/content/v1.1.x/connectors/pipeline/airflow/mwaa.md
index f2db0668afa..22280eb9398 100644
--- a/openmetadata-docs/content/v1.1.x/connectors/pipeline/airflow/mwaa.md
+++ b/openmetadata-docs/content/v1.1.x/connectors/pipeline/airflow/mwaa.md
@@ -326,6 +326,12 @@ with DAG(
 
 After running the DAG, we can store the connection details and remove the dag file from S3.
 
+
+Note that logging the value of `conf.get("core", "sql_alchemy_conn", fallback=None)` might result in either:
+1. An empty string, depending on the Airflow version. If that's the case, update the line to `conf.get("database", "sql_alchemy_conn", fallback=None)`.
+2. The password being masked as `****`. If that's the case, you can use `sqlalchemy_conn = list(conf.get("core", "sql_alchemy_conn", fallback=None))`,
+   which will log the connection string as a comma-separated list of characters, bypassing the masking.
+
 #### Preparing the metadata extraction
 
 We will use ECS here as well to get the metadata out of MWAA. The only important detail is to ensure that we are
diff --git a/openmetadata-docs/content/v1.2.x-SNAPSHOT/connectors/pipeline/airflow/mwaa.md b/openmetadata-docs/content/v1.2.x-SNAPSHOT/connectors/pipeline/airflow/mwaa.md
index 9ae90e6f73c..8c5e96b79bc 100644
--- a/openmetadata-docs/content/v1.2.x-SNAPSHOT/connectors/pipeline/airflow/mwaa.md
+++ b/openmetadata-docs/content/v1.2.x-SNAPSHOT/connectors/pipeline/airflow/mwaa.md
@@ -326,6 +326,12 @@ with DAG(
 
 After running the DAG, we can store the connection details and remove the dag file from S3.
 
+Note that logging the value of `conf.get("core", "sql_alchemy_conn", fallback=None)` might result in either:
+1. An empty string, depending on the Airflow version. If that's the case, update the line to `conf.get("database", "sql_alchemy_conn", fallback=None)`.
+2. The password being masked as `****`. If that's the case, you can use `sqlalchemy_conn = list(conf.get("core", "sql_alchemy_conn", fallback=None))`,
+   which will log the connection string as a comma-separated list of characters, bypassing the masking.
+
+
 #### Preparing the metadata extraction
 
 We will use ECS here as well to get the metadata out of MWAA. The only important detail is to ensure that we are
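
For context on the `expanduser()` calls introduced in `backup.py` and `cmd.py` above: a bare `Path` does not expand `~`, so options like `--output ~/backups` or `--config ~/ingest.yaml` were previously treated as literal paths. A minimal standalone sketch of the difference (not part of the diff; the directory name is illustrative):

```python
from pathlib import Path

raw = Path("~/om_backups")
print(raw)               # ~/om_backups -> mkdir() here would create a literal "~" folder
print(raw.expanduser())  # e.g. /home/airflow/om_backups, resolved against the user's home
```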
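
The MWAA note added to both docs above can be exercised with a small helper inside the DAG. This is only a sketch under the assumptions stated in that note (the `[core]`/`[database]` section move across Airflow versions and the `****` masking of secrets in task logs); `show_sql_alchemy_conn` is an illustrative name, not part of OpenMetadata or this change:

```python
from airflow.configuration import conf


def show_sql_alchemy_conn() -> None:
    # Newer Airflow versions expose the key under [database] instead of [core]
    conn = conf.get("core", "sql_alchemy_conn", fallback=None) or conf.get(
        "database", "sql_alchemy_conn", fallback=None
    )
    if conn:
        # Printing the string one character at a time sidesteps the secret masking
        print(list(conn))
    else:
        print("sql_alchemy_conn not found in [core] or [database]")
```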