From 9e01fe0636bf2c36009984d84ef4002aad7e7618 Mon Sep 17 00:00:00 2001
From: Teddy
Date: Wed, 21 Dec 2022 08:55:18 +0100
Subject: [PATCH] feat(CLI): added migration logic cmd (#9437)

---
 .../cli/openmetadata_dag_config_migration.py  | 109 +++++++++++++++++
 ingestion/src/metadata/cmd.py                 |  38 ++++++
 .../metadata/cli/resources/dag_config.json    | 110 ++++++++++++++++++
 .../test_openmetadata_dag_config_migration.py | 109 +++++++++++++++++
 4 files changed, 366 insertions(+)
 create mode 100644 ingestion/src/metadata/cli/openmetadata_dag_config_migration.py
 create mode 100644 ingestion/tests/unit/metadata/cli/resources/dag_config.json
 create mode 100644 ingestion/tests/unit/metadata/cli/test_openmetadata_dag_config_migration.py

diff --git a/ingestion/src/metadata/cli/openmetadata_dag_config_migration.py b/ingestion/src/metadata/cli/openmetadata_dag_config_migration.py
new file mode 100644
index 00000000000..63f9ed7d8cd
--- /dev/null
+++ b/ingestion/src/metadata/cli/openmetadata_dag_config_migration.py
@@ -0,0 +1,109 @@
+# Copyright 2021 Collate
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Utility to update DAG config files for the migration from v0.12.3 to v0.13
+"""
+
+import json
+import os
+from copy import deepcopy
+
+from metadata.utils.logger import cli_logger
+
+logger = cli_logger()
+
+
+def run_openmetadata_dag_config_migration(dir_path: str, keep_backups: bool) -> None:
+    """Update DAG config files by removing the `dbtConfigSource`,
+    `supportsMetadataExtraction` and `markDeletedTablesFromFilterOnly` keys
+
+    Args:
+        dir_path (str): directory path; defaults to `/opt/airflow/dag_generated_configs`
+        keep_backups (bool): whether to keep the original files as `.json.bak` backups
+    """
+    for root, _, files in os.walk(dir_path):
+        filenames = [file_ for file_ in files if os.path.splitext(file_)[1] == ".json"]
+        logger.info(
+            f"{len(filenames)} files found in `{root}`."
+            "\nChecking config in the following files:\n\t"
+            + "\n\t".join(filenames)
+        )
+
+        for filename in filenames:
+            logger.info(f"Checking config file: {filename}")
+            with open(
+                os.path.join(root, filename), "r", encoding="utf-8"
+            ) as config_file:
+                try:
+                    orig_file_data = json.loads(config_file.read())
+                except json.JSONDecodeError:
+                    logger.error(
+                        f"Error decoding file {filename}. "
+                        "The file will be skipped. You should manually verify whether it needs any update."
+                    )
+                    continue
+
+            # Keep a copy of the original file data to detect whether any change was made
+            file_data = deepcopy(orig_file_data)
+
+            try:
+                file_data["sourceConfig"]["config"]["dbtConfigSource"]
+            except KeyError:
+                logger.error(
+                    f"Could not find the key `dbtConfigSource` in {filename}. Skipping key deletion."
+                )
+            else:
+                del file_data["sourceConfig"]["config"]["dbtConfigSource"]
+                logger.info(f"Successfully removed key `dbtConfigSource` in {filename}")
+
+            try:
+                file_data["openMetadataServerConnection"]["supportsMetadataExtraction"]
+            except KeyError:
+                logger.error(
+                    f"Could not find the key `supportsMetadataExtraction` in {filename}. Skipping key deletion."
+ ) + else: + del fle_data["openMetadataServerConnection"][ + "supportsMetadataExtraction" + ] + logger.info( + f"Successfully removed key `supportsMetadataExtraction` in {filename}" + ) + + try: + fle_data["sourceConfig"]["config"]["markDeletedTablesFromFilterOnly"] + except KeyError: + logger.error( + f"Could not find the key `markDeletedTablesFromFilterOnly` in {filename}. Skipping key deletion." + ) + else: + del fle_data["sourceConfig"]["config"][ + "markDeletedTablesFromFilterOnly" + ] + logger.info( + f"Successfully removed key `markDeletedTablesFromFilterOnly` in {filename}" + ) + + if orig_fle_data != fle_data: + with open( + os.path.join(root, filename), "w", encoding="utf-8" + ) as config_file: + config_file.write(json.dumps(fle_data)) + logger.info(f"File {filename} successfuly updated") + + if keep_backups: + with open( + os.path.join(root, f"{filename}.bak"), "w", encoding="utf-8" + ) as bak_config_file: + bak_config_file.write(json.dumps(orig_fle_data)) + logger.info(f"Backup File {filename}.bak successfuly updated") diff --git a/ingestion/src/metadata/cmd.py b/ingestion/src/metadata/cmd.py index 5335e784f68..34cec218d17 100644 --- a/ingestion/src/metadata/cmd.py +++ b/ingestion/src/metadata/cmd.py @@ -22,6 +22,9 @@ from metadata.cli.backup import UploadDestinationType, run_backup from metadata.cli.dataquality import run_test from metadata.cli.docker import BACKEND_DATABASES, DockerActions, run_docker from metadata.cli.ingest import run_ingest +from metadata.cli.openmetadata_dag_config_migration import ( + run_openmetadata_dag_config_migration, +) from metadata.cli.openmetadata_imports_migration import ( run_openmetadata_imports_migration, ) @@ -42,6 +45,7 @@ class MetadataCommands(Enum): RESTORE = "restore" WEBHOOK = "webhook" OPENMETADATA_IMPORTS_MIGRATION = "openmetadata_imports_migration" + OPENMETADATA_DAG_CONFIG_MIGRATION = "openmetadata_dag_config_migration" OM_IMPORTS_MIGRATION = """ @@ -52,6 +56,13 @@ OM_IMPORTS_MIGRATION = """ This small CLI utility allows you to update both elements. """ +OM_DAG_CONFIG_MIGRATION = """ + Update DAG Config files generated after creating workflow in 0.12 and before. + In 0.13 certains keys of the dag config. files have been removed. This small + utility command allows you to update legacy dag config files. Note this can + also be done manually through the UI by clicking on `redeploy` + """ + BACKUP_HELP = """ Run a backup for the metadata DB. Uses a custom dump strategy for OpenMetadata tables. @@ -104,6 +115,22 @@ def create_openmetadata_imports_migration_args(parser: argparse.ArgumentParser): ) +def create_openmetadata_dag_config_migration_args(parser: argparse.ArgumentParser): + parser.add_argument( + "-d", + "--dir-path", + default="/opt/airflow/dag_generated_configs", + type=pathlib.Path, + help="Path to the DAG folder. Default to `/opt/airflow/dag_generated_configs`", + ) + + parser.add_argument( + "--keep-backups", + help="Flag option. 
+        help="Flag option. If passed, the original files are kept as `.json.bak` backups",
+        action="store_true",
+    )
+
+
 def docker_args(parser: argparse.ArgumentParser):
     """
     Addtional Parser Arguments for Docker
@@ -324,6 +351,12 @@ def get_parser(args=None):
             help=OM_IMPORTS_MIGRATION,
         )
     )
+    create_openmetadata_dag_config_migration_args(
+        sub_parser.add_parser(
+            MetadataCommands.OPENMETADATA_DAG_CONFIG_MIGRATION.value,
+            help=OM_DAG_CONFIG_MIGRATION,
+        )
+    )
     docker_args(
         sub_parser.add_parser(MetadataCommands.DOCKER.value, help="Docker Quickstart")
     )
@@ -445,3 +478,8 @@ def metadata(args=None):
         run_openmetadata_imports_migration(
             contains_args.get("dir_path"), contains_args.get("change_config_file_path")
         )
+
+    if metadata_workflow == MetadataCommands.OPENMETADATA_DAG_CONFIG_MIGRATION.value:
+        run_openmetadata_dag_config_migration(
+            contains_args.get("dir_path"), contains_args.get("keep_backups")
+        )
diff --git a/ingestion/tests/unit/metadata/cli/resources/dag_config.json b/ingestion/tests/unit/metadata/cli/resources/dag_config.json
new file mode 100644
index 00000000000..d7b45eac2d4
--- /dev/null
+++ b/ingestion/tests/unit/metadata/cli/resources/dag_config.json
@@ -0,0 +1,110 @@
+{
+  "id": "55048a4e-d23e-4c18-b684-03bfb51308a8",
+  "name": "rds_metadata_LTu8aqU4",
+  "displayName": "rds_metadata_LTu8aqU4",
+  "description": null,
+  "pipelineType": "metadata",
+  "owner": {
+    "id": "9f815a7d-df5a-4cf3-969d-8579becce8e4",
+    "type": "user",
+    "name": "admin",
+    "fullyQualifiedName": "admin",
+    "description": null,
+    "displayName": null,
+    "deleted": false,
+    "href": null
+  },
+  "fullyQualifiedName": "rds.rds_metadata_LTu8aqU4",
+  "sourceConfig": {
+    "config": {
+      "type": "DatabaseMetadata",
+      "markDeletedTables": true,
+      "markDeletedTablesFromFilterOnly": false,
+      "includeTables": true,
+      "includeViews": false,
+      "includeTags": false,
+      "useFqnForFiltering": false,
+      "schemaFilterPattern": {
+        "includes": [
+          "dbt_jaffle"
+        ],
+        "excludes": null
+      },
+      "tableFilterPattern": null,
+      "databaseFilterPattern": null,
+      "dbtConfigSource": {
+        "dbtCatalogFilePath": "/opt/airflow/catalog.json",
+        "dbtManifestFilePath": "/opt/airflow/manifest.json",
+        "dbtRunResultsFilePath": "",
+        "dbtUpdateDescriptions": false
+      }
+    }
+  },
+  "openMetadataServerConnection": {
+    "clusterName": "openmetadata",
+    "type": "OpenMetadata",
+    "hostPort": "http://openmetadata-server:8585/api",
+    "authProvider": "openmetadata",
+    "verifySSL": "no-ssl",
+    "sslConfig": null,
+    "securityConfig": {
+      "jwtToken": "myToken"
+    },
+    "secretsManagerProvider": "noop",
+    "secretsManagerCredentials": null,
+    "apiVersion": "v1",
+    "includeTopics": true,
+    "includeTables": true,
+    "includeDashboards": true,
+    "includePipelines": true,
+    "includeMlModels": true,
+    "includeUsers": true,
+    "includeTeams": true,
+    "includeGlossaryTerms": true,
+    "includeTags": true,
+    "includePolicy": true,
+    "includeMessagingServices": true,
+    "enableVersionValidation": true,
+    "includeDatabaseServices": true,
+    "includePipelineServices": true,
+    "limitRecords": 1000,
+    "forceEntityOverwriting": false,
+    "supportsMetadataExtraction": true
+  },
+  "airflowConfig": {
+    "pausePipeline": false,
+    "concurrency": 1,
+    "startDate": null,
+    "endDate": null,
+    "pipelineTimezone": "UTC",
+    "retries": 3,
+    "retryDelay": 300,
+    "pipelineCatchup": false,
+    "scheduleInterval": "0 * * * *",
+    "maxActiveRuns": 1,
+    "workflowTimeout": null,
+    "workflowDefaultView": "tree",
+    "workflowDefaultViewOrientation": "LR",
+    "email": null
+  },
+  "service": {
+    "id": "a8517b60-e69b-4841-b123-2398fedf9f28",
+    "type": "databaseService",
"databaseService", + "name": "rds", + "fullyQualifiedName": "rds", + "description": "", + "displayName": null, + "deleted": false, + "href": null + }, + "pipelineStatuses": null, + "loggerLevel": "INFO", + "deployed": null, + "enabled": true, + "href": "http://localhost:8585/api/v1/services/ingestionPipelines/55048a4e-d23e-4c18-b684-03bfb51308a8", + "version": 0.1, + "updatedAt": 1671541748444, + "updatedBy": "admin", + "changeDescription": null, + "deleted": false +} \ No newline at end of file diff --git a/ingestion/tests/unit/metadata/cli/test_openmetadata_dag_config_migration.py b/ingestion/tests/unit/metadata/cli/test_openmetadata_dag_config_migration.py new file mode 100644 index 00000000000..556f7b77db8 --- /dev/null +++ b/ingestion/tests/unit/metadata/cli/test_openmetadata_dag_config_migration.py @@ -0,0 +1,109 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +test dag config migration cli script +""" +import json +import os +from pathlib import Path +from unittest import TestCase + +from metadata.cli.openmetadata_dag_config_migration import ( + run_openmetadata_dag_config_migration, +) + + +class TestOpenmetadataImportsMigration(TestCase): + """Test class for the cli scrip test""" + + store = dict() + resources_path = Path(__file__).parent.absolute() / "resources" + + @classmethod + def setUpClass(cls) -> None: + for root, _, filenames in os.walk(cls.resources_path): + for filename in filenames: + with open(os.path.join(root, filename), "r", encoding="utf-8") as fle: + cls.store[os.path.join(root, filename)] = fle.read() + + def test_run_openmetadata_imports_migration_w_bak(self): + """test the run openmetadata function""" + run_openmetadata_dag_config_migration(self.resources_path, True) + failures = [] + + for root, _, filenames in os.walk(self.resources_path): + assert any(".json.bak" in filename for filename in filenames) + for filename in filenames: + if os.path.splitext(filename)[1] == ".json": + with open( + os.path.join(root, filename), "r", encoding="utf-8" + ) as fle: + data = json.loads(fle.read()) + try: + data["sourceConfig"]["config"]["dbtConfigSource"] + except KeyError: + pass + else: + failures.append(filename) + try: + data["openMetadataServerConnection"][ + "supportsMetadataExtraction" + ] + except KeyError: + pass + else: + failures.append(filename) + try: + data["sourceConfig"]["config"][ + "markDeletedTablesFromFilterOnly" + ] + except KeyError: + pass + else: + failures.append(filename) + + if os.path.splitext(filename)[1] == ".bak": + with open( + os.path.join(root, filename), "r", encoding="utf-8" + ) as fle: + data = json.loads(fle.read()) + try: + data["sourceConfig"]["config"]["dbtConfigSource"] + except KeyError: + failures.append(filename) + try: + data["openMetadataServerConnection"][ + "supportsMetadataExtraction" + ] + except KeyError: + failures.append(filename) + try: + data["sourceConfig"]["config"][ + "markDeletedTablesFromFilterOnly" + ] + except KeyError: + failures.append(filename) + + assert not failures + + 
+    @classmethod
+    def tearDownClass(cls) -> None:
+        for file_path, file_content in cls.store.items():
+            with open(file_path, "w", encoding="utf-8") as fle:
+                fle.write(file_content)
+
+        for root, _, filenames in os.walk(cls.resources_path):
+            bak_files = [
+                file_ for file_ in filenames if os.path.splitext(file_)[1] == ".bak"
+            ]
+            for bak_file in bak_files:
+                os.remove(os.path.join(root, bak_file))
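
For reference, the subcommand wired up in `cmd.py` above can be exercised as follows once the patch is installed. This is a usage sketch, not part of the patch itself; the directory path shown is simply the default declared by `create_openmetadata_dag_config_migration_args` and may differ in your deployment:

    metadata openmetadata_dag_config_migration --dir-path /opt/airflow/dag_generated_configs --keep-backups

The same migration can also be driven directly from Python, which is what the unit test does:

    from metadata.cli.openmetadata_dag_config_migration import (
        run_openmetadata_dag_config_migration,
    )

    # Remove the deprecated keys from every generated DAG config,
    # keeping the original files as `.json.bak` backups.
    run_openmetadata_dag_config_migration("/opt/airflow/dag_generated_configs", True)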