From a0db32c18abbe9d7f54753b5d9d0e68e96192b3d Mon Sep 17 00:00:00 2001 From: Pere Miquel Brull Date: Fri, 3 May 2024 10:18:29 +0200 Subject: [PATCH] MINOR - Docs and deprecations (#16118) * MINOR - Docs and deprecations * clean --- .../src/metadata/applications/auto_tagger.py | 212 ---------------- ingestion/src/metadata/cli/app.py | 1 - ingestion/src/metadata/cli/backup.py | 209 --------------- ingestion/src/metadata/cli/db_dump.py | 239 ------------------ ingestion/src/metadata/cli/utils.py | 56 ---- ingestion/src/metadata/cmd.py | 193 -------------- installation_deployment_postgres_demo.md | 199 --------------- .../upgrade/upgrade-prerequisites.md | 41 +-- .../pipeline/airflow/configuring-lineage.md | 2 + .../pipeline/airflow/configuring-lineage.md | 2 + .../deployment/backup-restore-metadata.md | 46 +++- 11 files changed, 53 insertions(+), 1147 deletions(-) delete mode 100644 ingestion/src/metadata/applications/auto_tagger.py delete mode 100644 ingestion/src/metadata/cli/backup.py delete mode 100644 ingestion/src/metadata/cli/db_dump.py delete mode 100644 ingestion/src/metadata/cli/utils.py delete mode 100644 installation_deployment_postgres_demo.md diff --git a/ingestion/src/metadata/applications/auto_tagger.py b/ingestion/src/metadata/applications/auto_tagger.py deleted file mode 100644 index 6719cde48e3..00000000000 --- a/ingestion/src/metadata/applications/auto_tagger.py +++ /dev/null @@ -1,212 +0,0 @@ -# Copyright 2021 Collate -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# http://www.apache.org/licenses/LICENSE-2.0 -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -""" -PII application -""" -import traceback -from typing import Iterable, List, Optional - -from metadata.generated.schema.entity.applications.configuration.external.autoTaggerAppConfig import ( - AutoTaggerAppConfig, -) -from metadata.generated.schema.entity.data.table import Column, Table, TableData -from metadata.generated.schema.entity.services.ingestionPipelines.status import ( - StackTraceError, -) -from metadata.generated.schema.metadataIngestion.application import ( - OpenMetadataApplicationConfig, -) -from metadata.generated.schema.type.tagLabel import ( - LabelType, - State, - TagFQN, - TagLabel, - TagSource, -) -from metadata.ingestion.models.table_metadata import ColumnTag -from metadata.ingestion.ometa.ometa_api import OpenMetadata -from metadata.pii.constants import PII -from metadata.pii.scanners.column_name_scanner import ColumnNameScanner -from metadata.pii.scanners.ner_scanner import NERScanner -from metadata.utils.logger import app_logger -from metadata.workflow.application import AppRunner, InvalidAppConfiguration - -logger = app_logger() - -DEFAULT_CONFIDENCE = 80 - - -class AutoTaggerApp(AppRunner): - """ - PII Application - You can execute it with `metadata app -c ` - with a YAML file like: - - sourcePythonClass: metadata.applications.auto_tagger.AutoTaggerApp - appConfig: - type: AutoTagger - confidenceLevel: 80 - workflowConfig: - loggerLevel: INFO - openMetadataServerConfig: - hostPort: http://localhost:8585/api - authProvider: openmetadata - securityConfig: - jwtToken: "..." 
- """ - - def __init__(self, config: OpenMetadataApplicationConfig, metadata: OpenMetadata): - super().__init__(config, metadata) - - if not isinstance(self.app_config, AutoTaggerAppConfig): - raise InvalidAppConfiguration( - f"AutoTagger Runner expects an AutoTaggerAppConfig, we got [{config}]" - ) - - self._ner_scanner = None - self.confidence_threshold = ( - self.app_config.confidenceLevel or DEFAULT_CONFIDENCE - ) - - @property - def name(self) -> str: - return "Auto Tagger" - - @staticmethod - def build_column_tag(tag_fqn: str, column_fqn: str) -> ColumnTag: - """ - Build the tag and run the PATCH - """ - tag_label = TagLabel( - tagFQN=TagFQN(__root__=tag_fqn), - source=TagSource.Classification, - state=State.Suggested, - labelType=LabelType.Automated, - ) - - return ColumnTag(column_fqn=column_fqn, tag_label=tag_label) - - @property - def ner_scanner(self) -> NERScanner: - """Load the NER Scanner only if called""" - if self._ner_scanner is None: - self._ner_scanner = NERScanner() - - return self._ner_scanner - - def process_column( - self, - idx: int, - column: Column, - table_data: Optional[TableData], - confidence_threshold: float, - ) -> Optional[List[ColumnTag]]: - """ - Tag a column with PII if we find it using our scanners - """ - - # First, check if the column we are about to process - # already has PII tags or not - column_has_pii_tag = any( - (PII in tag.tagFQN.__root__ for tag in column.tags or []) - ) - - # If it has PII tags, we skip the processing - # for the column - if column_has_pii_tag is True: - return None - - # Scan by column name. If no results there, check the sample data, if any - tag_and_confidence = ColumnNameScanner.scan(column.name.__root__) or ( - self.ner_scanner.scan([row[idx] for row in table_data.rows]) - if table_data - else None - ) - - if ( - tag_and_confidence - and tag_and_confidence.tag_fqn - and tag_and_confidence.confidence >= confidence_threshold / 100 - ): - # We support returning +1 tags for a single column in _run - return [ - self.build_column_tag( - tag_fqn=tag_and_confidence.tag_fqn, - column_fqn=column.fullyQualifiedName.__root__, - ) - ] - - return None - - def process_table(self, table: Table) -> Optional[List[ColumnTag]]: - """Run the patching of the table""" - column_tags = [] - for idx, column in enumerate(table.columns): - try: - col_tags = self.process_column( - idx=idx, - column=column, - table_data=table.sampleData, - confidence_threshold=self.confidence_threshold, - ) - if col_tags: - column_tags.extend(col_tags) - except Exception as err: - self.status.failed( - StackTraceError( - name=table.fullyQualifiedName.__root__, - error=f"Error computing PII tags for [{column}] - [{err}]", - stackTrace=traceback.format_exc(), - ) - ) - - if column_tags: - return column_tags - - return None - - def patch_columns(self, table: Table, column_tags: List[ColumnTag]) -> None: - """Patch columns with PII""" - patched = self.metadata.patch_column_tags(table=table, column_tags=column_tags) - if not patched: - self.status.failed( - StackTraceError( - name=table.fullyQualifiedName.__root__, - error="Error patching tags for table", - ) - ) - else: - self.status.scanned(table) - logger.debug( - f"Successfully patched tag {column_tags} for {table.fullyQualifiedName.__root__}" - ) - - def run(self) -> None: - """ - The PII Application will: - 1. List tables - 2. Check their column names and sample data (if any) - 3. 
PATCH PII tags when needed - """ - tables: Iterable[Table] = self.metadata.list_all_entities( - entity=Table, fields=["sampleData", "tags"] - ) - for table in tables: - column_tags = self.process_table(table) - if column_tags: - self.patch_columns(table=table, column_tags=column_tags) - else: - self.status.filter( - key=table.fullyQualifiedName.__root__, reason="No PII found" - ) - - def close(self) -> None: - """Nothing to close""" diff --git a/ingestion/src/metadata/cli/app.py b/ingestion/src/metadata/cli/app.py index 7cf3f7c0d52..39a4a1b01da 100644 --- a/ingestion/src/metadata/cli/app.py +++ b/ingestion/src/metadata/cli/app.py @@ -31,7 +31,6 @@ def run_app(config_path: Path) -> None: :param config_path: Path to load JSON config """ - config_dict = None try: config_dict = load_config_file(config_path) workflow = ApplicationWorkflow.create(config_dict) diff --git a/ingestion/src/metadata/cli/backup.py b/ingestion/src/metadata/cli/backup.py deleted file mode 100644 index 94441a723eb..00000000000 --- a/ingestion/src/metadata/cli/backup.py +++ /dev/null @@ -1,209 +0,0 @@ -# Copyright 2021 Collate -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# http://www.apache.org/licenses/LICENSE-2.0 -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -Backup utility for the metadata CLI -""" -import traceback -from datetime import datetime -from enum import Enum -from pathlib import Path -from typing import Optional, Tuple - -from metadata.cli.db_dump import dump -from metadata.cli.utils import get_engine -from metadata.utils.helpers import BackupRestoreArgs -from metadata.utils.logger import ANSI, cli_logger, log_ansi_encoded_string - - -class UploadDestinationType(Enum): - AWS = "AWS" - AZURE = "Azure" - - -logger = cli_logger() - - -def get_output(output: Optional[str] = None, filename: Optional[str] = None) -> Path: - """ - Helper function to prepare the output backup file - path and name. - - It will create the output dir if it does not exist. - - :param output: local path to store the backup - :param filename: name of the backup file - :return: backup file name - """ - now = datetime.now().strftime("%Y%m%d%H%M") - name = filename if filename else f"openmetadata_{now}_backup.sql" - - if output: - path = Path(output).expanduser() - # Create the output directory if it does not exist - if not path.is_dir(): - path.mkdir(parents=True, exist_ok=True) - - return path / name - - return Path(name) - - -def upload_backup_aws(endpoint: str, bucket: str, key: str, file: Path) -> None: - """ - Upload the mysqldump backup file. - We will use boto3 to upload the file to the endpoint - and the key provided. - - :param endpoint: S3 endpoint - :param bucket: S3 bucket to upload the file to - :param key: S3 key to upload the backup file - :param file: file to upload - """ - - try: - # We just want to force boto3 install if uploading backup - # pylint: disable=import-outside-toplevel - import boto3 - from boto3.exceptions import S3UploadFailedError - except ModuleNotFoundError as err: - logger.debug(traceback.format_exc()) - logger.error( - "Trying to import boto3 to run the backup upload." 
- + " Please install openmetadata-ingestion[backup]." - ) - raise err - - s3_key = Path(key) / file.name - log_ansi_encoded_string( - color=ANSI.GREEN, - bold=False, - message=f"Uploading {file} to {endpoint}/{bucket}/{str(s3_key)}...", - ) - - try: - resource = boto3.resource(service_name="s3", endpoint_url=endpoint) - resource.Object(bucket, str(s3_key)).upload_file(str(file.absolute())) - - except ValueError as err: - logger.debug(traceback.format_exc()) - logger.error("Revisit the values of --upload") - raise err - except S3UploadFailedError as err: - logger.debug(traceback.format_exc()) - logger.error( - "Error when uploading the backup to S3. Revisit the config and permissions." - + " You should have set the environment values for AWS_ACCESS_KEY_ID" - + " and AWS_SECRET_ACCESS_KEY" - ) - raise err - - -def upload_backup_azure(account_url: str, container: str, file: Path) -> None: - """ - Upload the mysqldump backup file. - - :param account_url: Azure account url - :param container: Azure container to upload file to - :param file: file to upload - """ - - try: - # pylint: disable=import-outside-toplevel - from azure.identity import DefaultAzureCredential - from azure.storage.blob import BlobServiceClient - - default_credential = DefaultAzureCredential() - # Create the BlobServiceClient object - blob_service_client = BlobServiceClient( - account_url, credential=default_credential - ) - except ModuleNotFoundError as err: - logger.debug(traceback.format_exc()) - logger.error( - "Trying to import DefaultAzureCredential to run the backup upload." - ) - raise err - - log_ansi_encoded_string( - color=ANSI.GREEN, - message=f"Uploading {file} to {account_url}/{container}...", - ) - - try: - # Create a blob client using the local file name as the name for the blob - blob_client = blob_service_client.get_blob_client( - container=container, blob=file.name - ) - - # Upload the created file - with open(file=file, mode="rb") as data: - blob_client.upload_blob(data) - - except ValueError as err: - logger.debug(traceback.format_exc()) - logger.error("Revisit the values of --upload") - raise err - except Exception as err: - logger.debug(traceback.format_exc()) - logger.error(err) - raise err - - -def run_backup( - common_backup_obj_instance: BackupRestoreArgs, - output: Optional[str], - filename: Optional[str], - upload_destination_type: Optional[UploadDestinationType], - upload: Optional[Tuple[str, str, str]], -) -> None: - """ - Run `mysqldump` to MySQL database and store the - output. Optionally, upload it to S3. - - :param common_backup_obj_instance: cls instance to fetch common args - :param output: local path to store the backup - :param filename: filename to store the backup - :param upload_destination_type: Azure or AWS Destination Type - :param upload: URI to upload result file - - """ - log_ansi_encoded_string( - color=ANSI.BRIGHT_RED, - bold=True, - message="WARNING: backup is deprecated starting 1.4.0. Use database native dump tools instead." 
- "For more information, please visit: " - "https://docs.open-metadata.org/v1.4.x/deployment/backup-restore-metadata", - ) - log_ansi_encoded_string( - color=ANSI.GREEN, - bold=False, - message="Creating OpenMetadata backup for " - f"{common_backup_obj_instance.host}:{common_backup_obj_instance.port}/{common_backup_obj_instance.database}...", - ) - - out = get_output(output, filename) - - engine = get_engine(common_args=common_backup_obj_instance) - dump(engine=engine, output=out, schema=common_backup_obj_instance.schema) - - log_ansi_encoded_string( - color=ANSI.GREEN, bold=False, message=f"Backup stored locally under {out}" - ) - - if upload: - if upload_destination_type == UploadDestinationType.AWS.value: - endpoint, bucket, key = upload - upload_backup_aws(endpoint, bucket, key, out) - elif upload_destination_type.title() == UploadDestinationType.AZURE.value: - # only need two parameters from upload, key would be null - account_url, container, key = upload - upload_backup_azure(account_url, container, out) diff --git a/ingestion/src/metadata/cli/db_dump.py b/ingestion/src/metadata/cli/db_dump.py deleted file mode 100644 index d7215dce8cd..00000000000 --- a/ingestion/src/metadata/cli/db_dump.py +++ /dev/null @@ -1,239 +0,0 @@ -# Copyright 2021 Collate -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# http://www.apache.org/licenses/LICENSE-2.0 -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -Database Dumping utility for the metadata CLI -""" - -import json -from functools import singledispatch -from pathlib import Path -from typing import Iterable, List, Optional, Union - -from sqlalchemy import inspect, text -from sqlalchemy.engine import Engine, Row - -from metadata.utils.constants import UTF_8 - -TABLES_DUMP_ALL = { - "task_sequence", - "entity_usage", - "entity_relationship", - "entity_extension", - "field_relationship", - "tag_usage", - "openmetadata_settings", - "profiler_data_time_series", -} - -CUSTOM_TABLES = {"entity_extension_time_series": {"exclude_columns": ["timestamp"]}} -NOT_MIGRATE = {"DATABASE_CHANGE_LOG", "SERVER_MIGRATION_SQL_LOGS", "SERVER_CHANGE_LOG"} - -STATEMENT_JSON = "SELECT json FROM {table}" -STATEMENT_HASH_JSON = "SELECT json, {hash_column_name} FROM {table}" -STATEMENT_ALL = "SELECT * FROM {table}" -STATEMENT_TRUNCATE = "TRUNCATE TABLE {table};\n" -STATEMENT_ALL_NEW = "SELECT {cols} FROM {table}" - -MYSQL_ENGINE_NAME = "mysql" -FQN_HASH_COLUMN = "fqnHash" -NAME_HASH_COLUMN = "nameHash" - - -def single_quote_wrap(raw: str) -> str: - """ - Add single quote wrap to string. From `str` to `'str'` - """ - return f"'{raw}'" - - -def clean_str(raw: str, engine: Engine) -> str: - """ - String cleaning for SQL parsing. - - Logic is different between MySQL and Postgres - - - descriptions/comments with single quotes, e.g., `Mysql's data`. - get converted to `Mysql''s data` - - To insert a literal backlash in MySQL you need to escape with another one. This applies for `\n` and `\"` in - inner JSONs for a field. 
This is not required for postgres - """ - quoted_str = raw.replace("'", "''") - - if engine.name == MYSQL_ENGINE_NAME: - quoted_str = quoted_str.replace("\\", "\\\\") - - return quoted_str - - -@singledispatch -def clean_col(column_raw: Optional[Union[dict, str]], engine: Engine) -> str: - return ( - single_quote_wrap(clean_str(str(column_raw), engine)) - if column_raw is not None - else "null" - ) - - -@clean_col.register(dict) -@clean_col.register(list) -def _(column_raw: Optional[Union[dict, list]], engine: Engine) -> str: - """ - Prepare the JSON column to be inserted to MySQL - - Handle: - - quotes - - True/False values - """ - return ( - single_quote_wrap( - clean_str( - json.dumps( - column_raw, - default=str, # If we don't know how to serialize, convert to str - ), - engine, - ) - ) - if column_raw is not None - else "null" - ) - - -def get_hash_column_name(engine: Engine, table_name: str) -> Optional[str]: - """ - Method to get name of the hash column (fqnHash or nameHash) - """ - inspector = inspect(engine) - columns = inspector.get_columns(table_name) - for column in columns: - if column["name"].lower() == FQN_HASH_COLUMN.lower(): - return column["name"] - if column["name"].lower() == NAME_HASH_COLUMN.lower(): - return column["name"] - return None - - -def run_query_iter(engine: Engine, query: str) -> Iterable[Row]: - """Return a generator of rows, one row at a time, with a limit of 100 in-mem rows""" - with engine.connect() as conn: - result = conn.execution_options( - stream_results=True, max_row_buffer=100 - ).execute(text(query)) - for row in result: - yield row - - -def dump_json(tables: List[str], engine: Engine, output: Path) -> None: - """ - Dumps JSON data. - - Postgres: engine.name == "postgresql" - MySQL: engine.name == "mysql" - """ - with open(output, "a", encoding=UTF_8) as file: - for table in tables: - truncate = STATEMENT_TRUNCATE.format(table=table) - file.write(truncate) - - hash_column_name = get_hash_column_name(engine=engine, table_name=table) - if hash_column_name: - query = STATEMENT_HASH_JSON.format( - table=table, hash_column_name=hash_column_name - ) - for row in run_query_iter(engine=engine, query=query): - insert = f"INSERT INTO {table} (json, {hash_column_name}) VALUES ({clean_col(row.json, engine)}, {clean_col(row[1], engine)});\n" # pylint: disable=line-too-long - file.write(insert) - else: - res = engine.execute(text(STATEMENT_JSON.format(table=table))).all() - for row in res: - insert = f"INSERT INTO {table} (json) VALUES ({clean_col(row.json, engine)});\n" - file.write(insert) - - -def dump_all(tables: List[str], engine: Engine, output: Path) -> None: - """ - Dump tables that need to store all data - """ - with open(output, "a", encoding=UTF_8) as file: - for table in tables: - truncate = STATEMENT_TRUNCATE.format(table=table) - file.write(truncate) - - query = STATEMENT_ALL.format(table=table) - for row in run_query_iter(engine=engine, query=query): - data = ",".join(clean_col(col, engine) for col in row) - - insert = f"INSERT INTO {table} VALUES ({data});\n" - file.write(insert) - - -def dump_entity_custom(engine: Engine, output: Path, inspector) -> None: - """ - This function is used to dump entities with custom handling - """ - with open(output, "a", encoding=UTF_8) as file: - for table, data in CUSTOM_TABLES.items(): - truncate = STATEMENT_TRUNCATE.format(table=table) - file.write(truncate) - - columns = inspector.get_columns(table_name=table) - - query = STATEMENT_ALL_NEW.format( - cols=",".join( - col["name"] - for col in columns - if 
col["name"] not in data["exclude_columns"] - ), - table=table, - ) - for row in run_query_iter(engine=engine, query=query): - # Let's use .format here to not add more variables - # pylint: disable=consider-using-f-string - insert = "INSERT INTO {table} ({cols}) VALUES ({data});\n".format( - table=table, - data=",".join(clean_col(col, engine) for col in row), - cols=",".join( - col["name"] - for col in columns - if col["name"] not in data["exclude_columns"] - ), - ) - file.write(insert) - - -def get_lower_table_names(tables): - return [table.lower() for table in tables] - - -def dump(engine: Engine, output: Path, schema: str = None) -> None: - """ - Get all tables from the database and dump - only the JSON column for the required tables - """ - inspector = inspect(engine) - tables = ( - inspector.get_table_names(schema) if schema else inspector.get_table_names() - ) - lower_tables = get_lower_table_names(tables) - all_non_json_tables = ( - get_lower_table_names(TABLES_DUMP_ALL) - + get_lower_table_names(NOT_MIGRATE) - + get_lower_table_names(CUSTOM_TABLES) - ) - - dump_json_tables = [ - table for table in lower_tables if table not in all_non_json_tables - ] - - dump_all(tables=list(TABLES_DUMP_ALL), engine=engine, output=output) - dump_json(tables=dump_json_tables, engine=engine, output=output) - dump_entity_custom(engine=engine, output=output, inspector=inspector) diff --git a/ingestion/src/metadata/cli/utils.py b/ingestion/src/metadata/cli/utils.py deleted file mode 100644 index ebabae174cf..00000000000 --- a/ingestion/src/metadata/cli/utils.py +++ /dev/null @@ -1,56 +0,0 @@ -# Copyright 2021 Collate -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# http://www.apache.org/licenses/LICENSE-2.0 -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -""" -Utils module for the metadata backup and restore process -""" - -from sqlalchemy.engine import Engine - -from metadata.generated.schema.entity.services.connections.database.common.basicAuth import ( - BasicAuth, -) -from metadata.generated.schema.entity.services.connections.database.mysqlConnection import ( - MysqlConnection, -) -from metadata.generated.schema.entity.services.connections.database.postgresConnection import ( - PostgresConnection, -) -from metadata.ingestion.source.connections import get_connection -from metadata.utils.helpers import BackupRestoreArgs, list_to_dict - - -def get_engine(common_args: BackupRestoreArgs): - """ - Get the database connection engine - """ - - connection_options = list_to_dict(common_args.options) - connection_arguments = list_to_dict(common_args.arguments) - - connection_dict = { - "hostPort": f"{common_args.host}:{common_args.port}", - "username": common_args.user, - "authType": BasicAuth(password=common_args.password), - "connectionOptions": connection_options if connection_options else None, - "connectionArguments": connection_arguments if connection_arguments else None, - } - - if not common_args.schema: - connection_dict["databaseSchema"] = common_args.database - connection = MysqlConnection(**connection_dict) - else: - connection_dict["database"] = common_args.database - connection = PostgresConnection(**connection_dict) - - engine: Engine = get_connection(connection) - - return engine diff --git a/ingestion/src/metadata/cmd.py b/ingestion/src/metadata/cmd.py index 53860512367..3ecd18c3fab 100644 --- a/ingestion/src/metadata/cmd.py +++ b/ingestion/src/metadata/cmd.py @@ -19,15 +19,12 @@ from pathlib import Path from metadata.__version__ import get_metadata_version from metadata.cli.app import run_app -from metadata.cli.backup import UploadDestinationType, run_backup from metadata.cli.dataquality import run_test from metadata.cli.ingest import run_ingest from metadata.cli.insight import run_insight from metadata.cli.lineage import run_lineage from metadata.cli.profile import run_profiler -from metadata.cli.restore import run_restore from metadata.cli.usage import run_usage -from metadata.utils.helpers import BackupRestoreArgs from metadata.utils.logger import cli_logger, set_loggers_level logger = cli_logger() @@ -38,8 +35,6 @@ class MetadataCommands(Enum): USAGE = "usage" PROFILE = "profile" TEST = "test" - BACKUP = "backup" - RESTORE = "restore" WEBHOOK = "webhook" INSIGHT = "insight" LINEAGE = "lineage" @@ -57,32 +52,6 @@ RUN_PATH_METHODS = { } -BACKUP_HELP = """ - Run a backup for the metadata DB. Uses a custom dump strategy for OpenMetadata tables. - - We can pass as many connection options as required with `-o , -o [...]` - Same with connection arguments `-a , -a [...]` - - To run the upload, provide the information as - `--upload endpoint bucket key` and properly configure the environment - variables AWS_ACCESS_KEY_ID & AWS_SECRET_ACCESS_KEY. - - If `-s` or `--schema` is provided, we will trigger a Postgres backup instead - of a MySQL backup. This is the value of the schema containing the OpenMetadata - tables. - """ -RESTORE_HELP = """ - Run a restore for the metadata DB. - - We can pass as many connection options as required with `-o , -o [...]` - Same with connection arguments `-a , -a [...]` - - If `-s` or `--schema` is provided, we will trigger a Postgres Restore instead - of a MySQL restore. This is the value of the schema containing the OpenMetadata - tables. 
- """ - - def create_common_config_parser_args(parser: argparse.ArgumentParser): parser.add_argument( "-c", @@ -103,125 +72,6 @@ def webhook_args(parser: argparse.ArgumentParser): parser.add_argument("-p", "--port", help="Webserver Port", type=int, default=8000) -def backup_args(parser: argparse.ArgumentParser): - """ - Additional Parser Arguments for Backup - """ - parser.add_argument( - "-H", "--host", help="Host that runs the database", required=True - ) - parser.add_argument( - "-u", - "--user", - help="User to run the backup", - required=True, - ) - parser.add_argument( - "-p", - "--password", - help="Credentials for the user", - required=True, - ) - parser.add_argument( - "-d", - "--database", - help="Database to backup", - required=True, - ) - parser.add_argument( - "--port", - help="Database service port", - default="3306", - ) - parser.add_argument( - "--output", - help="Local path to store the backup", - type=Path, - default=None, - ) - parser.add_argument( - "--filename", - help="Filename to store the backup", - default=None, - ) - parser.add_argument( - "--upload-destination-type", - help="AWS or AZURE", - choices=UploadDestinationType.__members__, - default=None, - ) - parser.add_argument( - "--upload", - help="S3 endpoint, bucket & key to upload the backup file", - nargs=3, - default=None, - ) - parser.add_argument("-o", "--options", default=None, action="append") - parser.add_argument("-a", "--arguments", default=None, action="append") - parser.add_argument( - "-s", - "--schema", - default=None, - ) - - -def restore_args(parser: argparse.ArgumentParser): - """ - Additional Parser Arguments for Restore - """ - parser.add_argument( - "-H", - "--host", - help="Host that runs the database", - required=True, - ) - parser.add_argument( - "-u", - "--user", - help="User to run the restore backup", - required=True, - ) - - parser.add_argument( - "-p", - "--password", - help="Credentials for the user", - required=True, - ) - - parser.add_argument( - "-d", - "--database", - help="Database to restore", - required=True, - ) - - parser.add_argument( - "--port", - help="Database service port", - default="3306", - required=False, - ) - - parser.add_argument( - "--input", - help="Local backup file path for restore", - type=Path, - required=True, - ) - - parser.add_argument("-o", "--options", default=None, action="append") - - parser.add_argument("-a", "--arguments", default=None, action="append") - - parser.add_argument( - "-s", - "--schema", - default=None, - required=False, - ) - - def add_metadata_args(parser: argparse.ArgumentParser): """ Additional Parser Arguments for Metadata @@ -274,18 +124,6 @@ def get_parser(args=None): help="Workflow for running external applications", ) ) - backup_args( - sub_parser.add_parser( - MetadataCommands.BACKUP.value, - help=BACKUP_HELP, - ) - ) - restore_args( - sub_parser.add_parser( - MetadataCommands.RESTORE.value, - help=RESTORE_HELP, - ) - ) webhook_args( sub_parser.add_parser( MetadataCommands.WEBHOOK.value, @@ -323,37 +161,6 @@ def metadata(args=None): if metadata_workflow in RUN_PATH_METHODS: RUN_PATH_METHODS[metadata_workflow](path) - if metadata_workflow == MetadataCommands.BACKUP.value: - run_backup( - common_backup_obj_instance=BackupRestoreArgs( - host=contains_args.get("host"), - user=contains_args.get("user"), - password=contains_args.get("password"), - database=contains_args.get("database"), - port=contains_args.get("port"), - options=contains_args.get("options"), - arguments=contains_args.get("arguments"), - 
schema=contains_args.get("schema"), - ), - output=contains_args.get("output"), - filename=contains_args.get("filename"), - upload_destination_type=contains_args.get("upload_destination_type"), - upload=contains_args.get("upload"), - ) - if metadata_workflow == MetadataCommands.RESTORE.value: - run_restore( - common_restore_obj_instance=BackupRestoreArgs( - host=contains_args.get("host"), - user=contains_args.get("user"), - password=contains_args.get("password"), - database=contains_args.get("database"), - port=contains_args.get("port"), - options=contains_args.get("options"), - arguments=contains_args.get("arguments"), - schema=contains_args.get("schema"), - ), - sql_file=contains_args.get("input"), - ) if metadata_workflow == MetadataCommands.WEBHOOK.value: class WebhookHandler(BaseHTTPRequestHandler): diff --git a/installation_deployment_postgres_demo.md b/installation_deployment_postgres_demo.md deleted file mode 100644 index 260593bda10..00000000000 --- a/installation_deployment_postgres_demo.md +++ /dev/null @@ -1,199 +0,0 @@ -# Installation and deployment instructions (using Postgres as example) - -Below are the instructions for connecting a Postgress server. The installation steps should be the same for connecting all kinds of servers. Different servers would require different configurations in the .yaml or DAG files. See https://docs.open-metadata.org/integrations/connectors for your configuration. - -# Goal: To run Postgres metadata ingestion and quality tests with OpenMetadata using Airflow scheduler - -Note: This procedure does not support Windows, because Windows does not implement "signal.SIGALRM". **It is highly recommended to use WSL 2 if you are on Windows**. - -## Requirements: -See https://docs.open-metadata.org/overview/run-openmetadata-with-prefect "Requirements" section - -## Installation: -1. Clone this git hub repo: -`git clone https://github.com/open-metadata/OpenMetadata.git` - -2. Cd to ~/.../openmetadata/docker/metadata - -3. Start the OpenMetadata containers. This will allow you run OpenMetadata in Docker: -`docker compose up -d` -- To check the status of services, run `docker compose ps` -- To access the UI: http://localhost:8585 - -4. Install the OpenMetadata ingestion package. -- (optional but highly recommended): Before installing this package, it is recommended to create and activate a virtual environment. To do this, run: -`python -m venv env` and `source env/bin/activate` - -- To install the OpenMetadata ingestion package: -`pip install --upgrade "openmetadata-ingestion[docker]==0.10.3"` (specify the release version to ensure compatibility) - -5. Install Airflow: -- 5A: Install Airflow Lineage Backend: `pip3 install "openmetadata-ingestion[airflow-container]"==0.10.3` -- 5B: Install Airflow postgres connector module: `pip3 install "openmetadata-ingestion[postgres]"==0.10.3` -- 5C: Install Airflow APIs: `pip3 install "openmetadata-airflow-managed-apis"==0.10.3` -- 5D: Install necessary Airflow plugins: - - 1) Download the latest openmetadata-airflow-apis-plugins release from https://github.com/open-metadata/OpenMetadata/releases - - 2) Untar it under your {AIRFLOW_HOME} directory (usually c/Users/Yourname/airflow). This will create and setup a plugins directory under {AIRFLOW_HOME} . - - 3) `cp -r {AIRFLOW_HOME}/plugins/dag_templates {AIRFLOW_HOME}` - - 4) `mkdir -p {AIRFLOW_HOME}/dag_generated_configs` - - 5) (re)start the airflow webserver and scheduler - -6. Configure Airflow: -- 6A: configure airflow.cfg in your AIRFLOW_HOME directory. 
Check and make all the folder directories point to the right places. For instance, dags_folder = YOUR_AIRFLOW_HOME/dags -- 6B: configure openmetadata.yaml and update the airflowConfiguration section. See: https://docs.open-metadata.org/integrations/airflow/configure-airflow-in-the-openmetadata-server - -## To run a metadata ingestion workflow with Airflow ingestion DAGs on Postgres data: - -1. Prepare the Ingestion DAG: -To see a more complete tutorial on ingestion DAG, see https://docs.open-metadata.org/integrations/connectors/postgres/run-postgres-connector-with-the-airflow-sdk -To be brief, below is my own DAG. Copy & Paste the following into a python file (postgres_demo.py): - -``` -import pathlib -import json -from datetime import timedelta -from airflow import DAG - -try: - from airflow.operators.python import PythonOperator -except ModuleNotFoundError: - from airflow.operators.python_operator import PythonOperator - -from metadata.config.common import load_config_file -from metadata.ingestion.api.workflow import Workflow -from airflow.utils.dates import days_ago - -default_args = { - "owner": "user_name", - "email": ["username@org.com"], - "email_on_failure": False, - "retries": 3, - "retry_delay": timedelta(minutes=5), - "execution_timeout": timedelta(minutes=60) -} - -config = """ -{ - "source":{ - "type": "postgres", - "serviceName": "postgres_demo", - "serviceConnection": { - "config": { - "type": "Postgres", - "username": "postgres", (change to your username) - "password": "postgres", (change to your password) - "hostPort": "192.168.1.55:5432", (change to your hostPort) - "database": "surveillance_hub" (change to your database) - } - }, - "sourceConfig":{ - "config": { (all of the following can switch to true or false) - "enableDataProfiler": "true" or "false", - "markDeletedTables": "true" or "false", - "includeTables": "true" or "false", - "includeViews": "true" or "false", - "generateSampleData": "true" or "false" - } - } - }, - "sink":{ - "type": "metadata-rest", - "config": {} - }, - "workflowConfig": { - "openMetadataServerConfig": { - "hostPort": "http://localhost:8585/api", - "authProvider": "no-auth" - } - } - - -} -""" - -def metadata_ingestion_workflow(): - workflow_config = json.loads(config) - workflow = Workflow.create(workflow_config) - workflow.execute() - workflow.raise_from_status() - workflow.print_status() - workflow.stop() - - -with DAG( - "sample_data", - default_args=default_args, - description="An example DAG which runs a OpenMetadata ingestion workflow", - start_date=days_ago(1), - is_paused_upon_creation=False, - schedule_interval='*/5 * * * *', - catchup=False, -) as dag: - ingest_task = PythonOperator( - task_id="ingest_using_recipe", - python_callable=metadata_ingestion_workflow, - ) - -if __name__ == "__main__": - metadata_ingestion_workflow() -``` - -2. Run the DAG: -` -python postgres_demo.py -` - -- Alternatively, we could run without Airflow SDK and with OpenMetadata's own methods. Run `metadata ingest -c /Your_Path_To_Json/.json` -The json configuration is exactly the same as the json configuration in the DAG. -- Or, we could also run it with `metadata ingest -c /Your_Path_To_Yaml/.yaml` -The yaml configuration would be the exact same except without the curly brackets and the double quotes. - -## To run a profiler workflow on Postgres data -1. 
Prepare the DAG OR configure the yaml/json: -- To configure the quality tests in json/yaml, see https://docs.open-metadata.org/data-quality/data-quality-overview/tests -- To prepare the DAG, see https://github.com/open-metadata/OpenMetadata/tree/0.10.3-release/data-quality/data-quality-overview - -Example yaml I was using: -``` -source: - type: postgres - serviceName: your_service_name - serviceConnection: - config: - type: Postgres - username: your_username - password: your_password - hostPort: - database: your_database - sourceConfig: - config: - type: Profiler - -processor: - type: orm-profiler - config: - test_suite: - name: demo_test - tests: - - table: your_table_name (FQN) - column_tests: - - columnName: id - testCase: - columnTestType: columnValuesToBeBetween - config: - minValue: 0 - maxValue: 10 -sink: - type: metadata-rest - config: {} -workflowConfig: - openMetadataServerConfig: - hostPort: http://localhost:8585/api - authProvider: no-auth -``` -Note that the table name must be FQN and match exactly with the table path on the OpenMetadata UI. - -2. Run it with -`metadata profile -c /path_to_yaml/.yaml` - -Make sure to refresh the OpenMetadata UI and click on the Data Quality tab to see the results. diff --git a/openmetadata-docs/content/partials/v1.4/deployment/upgrade/upgrade-prerequisites.md b/openmetadata-docs/content/partials/v1.4/deployment/upgrade/upgrade-prerequisites.md index 36e38bdb6aa..dda306fe374 100644 --- a/openmetadata-docs/content/partials/v1.4/deployment/upgrade/upgrade-prerequisites.md +++ b/openmetadata-docs/content/partials/v1.4/deployment/upgrade/upgrade-prerequisites.md @@ -19,38 +19,18 @@ You can learn more about how the migration process works [here](/deployment/upgr {% /note %} -- To run the backup and restore commands, please make sure that you are always in the latest `openmetadata-ingestion` version to have all the improvements shipped in the CLI. -- Also, make sure you have connectivity between your database (MySQL / PostgreSQL) and the host machine where you will be running the below commands. +Since version 1.4.0, **OpenMetadata encourages using the builtin-tools for creating logical backups of the metadata**: -**1. Create a Virtual Environment and Install the Backup CLI** +- [mysqldump](https://dev.mysql.com/doc/refman/8.0/en/mysqldump.html) for MySQL +- [pg_dump](https://www.postgresql.org/docs/current/app-pgdump.html) for Postgres -```python -python -m venv venv -source venv/bin/activate -pip install openmetadata-ingestion~=1.3.0 -``` +For PROD deployment we recommend users to rely on cloud services for their databases, be it [AWS RDS](https://docs.aws.amazon.com/rds/), +[Azure SQL](https://azure.microsoft.com/en-in/products/azure-sql/database) or [GCP Cloud SQL](https://cloud.google.com/sql/). -Validate the installed metadata version with `python -m metadata --version` - -**2. Run the Backup** - -If using MySQL: - -```bash -python -m metadata backup -u openmetadata_user -p openmetadata_password -H mysql -d openmetadata_db --port 3306 -``` - -If using Postgres: - -```bash -python -m metadata backup -u openmetadata_user -p openmetadata_password -H postgresql -d openmetadata_db --port 5432 -s public -``` - -**3. Store the backup file somewhere safe** - -The above command will generate a backup file with extension as `.sql`. You can copy the name from the backup command output. - -Make sure to store it somewhere safe in case you need to restore the data later. 
+If you're a user of these services, you can leverage their backup capabilities directly: +- [Creating a DB snapshot in AWS](https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/USER_CreateSnapshot.html) +- [Backup and restore in Azure MySQL](https://learn.microsoft.com/en-us/azure/mysql/single-server/concepts-backup) +- [About GCP Cloud SQL backup](https://cloud.google.com/sql/docs/mysql/backup-recovery/backups) You can refer to the following guide to get more details about the backup and restore: @@ -104,9 +84,6 @@ After the migration is finished, you can revert this changes. # Deprecation Notice -- Check the updated [docs](/connectors/pipeline/airflow/configuring-lineage#configuring-dag-lineage) on how to configure Airflow DAG's lineage. - We will deprecate the dictionary annotation in the 1.4 release, since the new annotation allows you to define lineage between - assets other than Tables. # Breaking Changes diff --git a/openmetadata-docs/content/v1.3.x/connectors/pipeline/airflow/configuring-lineage.md b/openmetadata-docs/content/v1.3.x/connectors/pipeline/airflow/configuring-lineage.md index b02001ee51e..832534af340 100644 --- a/openmetadata-docs/content/v1.3.x/connectors/pipeline/airflow/configuring-lineage.md +++ b/openmetadata-docs/content/v1.3.x/connectors/pipeline/airflow/configuring-lineage.md @@ -25,6 +25,8 @@ the relationship. This lineage configuration method is available for OpenMetadata release 1.2.3 or higher. +We support lineage for the following entities: `Table`, `Container`, `Dashboard`, `DashboardDataModel`, `Pipeline`, `Topic`, `SearchIndex` and `MlModel`. + {% /note %} Let's take a look at the following example: diff --git a/openmetadata-docs/content/v1.4.x-SNAPSHOT/connectors/pipeline/airflow/configuring-lineage.md b/openmetadata-docs/content/v1.4.x-SNAPSHOT/connectors/pipeline/airflow/configuring-lineage.md index b02001ee51e..832534af340 100644 --- a/openmetadata-docs/content/v1.4.x-SNAPSHOT/connectors/pipeline/airflow/configuring-lineage.md +++ b/openmetadata-docs/content/v1.4.x-SNAPSHOT/connectors/pipeline/airflow/configuring-lineage.md @@ -25,6 +25,8 @@ the relationship. This lineage configuration method is available for OpenMetadata release 1.2.3 or higher. +We support lineage for the following entities: `Table`, `Container`, `Dashboard`, `DashboardDataModel`, `Pipeline`, `Topic`, `SearchIndex` and `MlModel`. + {% /note %} Let's take a look at the following example: diff --git a/openmetadata-docs/content/v1.4.x-SNAPSHOT/deployment/backup-restore-metadata.md b/openmetadata-docs/content/v1.4.x-SNAPSHOT/deployment/backup-restore-metadata.md index 1355e4947a8..766ed334728 100644 --- a/openmetadata-docs/content/v1.4.x-SNAPSHOT/deployment/backup-restore-metadata.md +++ b/openmetadata-docs/content/v1.4.x-SNAPSHOT/deployment/backup-restore-metadata.md @@ -7,17 +7,51 @@ slug: /deployment/backup-restore-metadata ## Introduction -Since version 1.4.0, OpenMetadata's encourages using the builtin-tools for creating logical backups of the metadata: +Before upgrading your OpenMetadata version we strongly recommend backing up the metadata. -- `mysqldump` for MySQL -- `pg_dump` for Postgres +The source of truth is stored in the underlying database (MySQL and Postgres supported). During each version upgrade there +is a database migration process that needs to run. It will directly attack your database and update the shape of the +data to the newest OpenMetadata release. 
+ +It is important that we backup the data because if we face any unexpected issues during the upgrade process, +you will be able to get back to the previous version without any loss. + +{% note %} + +You can learn more about how the migration process works [here](/deployment/upgrade/how-does-it-work). + +{% /note %} + +Since version 1.4.0, **OpenMetadata encourages using the builtin-tools for creating logical backups of the metadata**: + +- [mysqldump](https://dev.mysql.com/doc/refman/8.0/en/mysqldump.html) for MySQL +- [pg_dump](https://www.postgresql.org/docs/current/app-pgdump.html) for Postgres + +For PROD deployment we recommend users to rely on cloud services for their databases, be it [AWS RDS](https://docs.aws.amazon.com/rds/), +[Azure SQL](https://azure.microsoft.com/en-in/products/azure-sql/database) or [GCP Cloud SQL](https://cloud.google.com/sql/). + +If you're a user of these services, you can leverage their backup capabilities directly: +- [Creating a DB snapshot in AWS](https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/USER_CreateSnapshot.html) +- [Backup and restore in Azure MySQL](https://learn.microsoft.com/en-us/azure/mysql/single-server/concepts-backup) +- [About GCP Cloud SQL backup](https://cloud.google.com/sql/docs/mysql/backup-recovery/backups) ## Requirements -- mysqldump 8.3 or higher (ingestion container is shipped with mysqldump 8.3) -- pg_dump 13.3 or higher +- `mysqldump` 8.3 or higher +- `pg_dump` 13.3 or higher -# Example +If you're running the project using docker compose, the `ingestion` container already comes packaged with the +correct `mysqldump` and `pg_dump` versions ready to use. + +## Storing the backup files + +It's important that when you backup your database, you keep the snapshot safe in case you need in later. + +You can check these two examples on how to: +- Use pipes to stream the result directly to S3 (or AWS blob storage) ([link](https://devcoops.com/pg_dump-to-s3-directly/?utm_content=cmp-true)). +- Dump to a file and copy to storage ([link](https://gist.github.com/bbcoimbra/0914c7e0f96e8ad53dfad79c64863c87)). + +# Example with Docker Start a local instance of OpenMetadata using the `docker-compose` file provided in the repository. Then, we can use the following commands to backup the metadata:
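
As a non-authoritative sketch of what such a logical dump can look like, assuming the default local credentials (`openmetadata_user` / `openmetadata_password`, database `openmetadata_db`) and the `mysql`, `postgresql` and `ingestion` service names from the bundled `docker-compose` file:

```bash
# Run from inside the ingestion container, which ships with suitable client versions,
# e.g. `docker compose exec ingestion bash`. Adjust host, port and credentials to your deployment.

# MySQL: logical dump with mysqldump (8.3 or higher)
mysqldump --host=mysql --port=3306 \
  --user=openmetadata_user --password=openmetadata_password \
  --databases openmetadata_db > openmetadata_backup.sql

# Postgres: logical dump with pg_dump (13.3 or higher), password passed via PGPASSWORD
PGPASSWORD=openmetadata_password pg_dump --host=postgresql --port=5432 \
  --username=openmetadata_user --dbname=openmetadata_db \
  --schema=public > openmetadata_backup.sql
```

Restoring is the reverse operation: feed the resulting `.sql` file back through the `mysql` or `psql` client against a clean database before starting the new OpenMetadata version.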