feat(CLI): added migration logic cmd (#9437)

Teddy 2022-12-21 08:55:18 +01:00 committed by GitHub
parent 9a3d599f30
commit 9e01fe0636
4 changed files with 366 additions and 0 deletions

File: metadata/cli/openmetadata_dag_config_migration.py (new)

@@ -0,0 +1,109 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
utility to update dat config file for migration from v0.12.3 to 0.13
"""
import json
import os
from copy import deepcopy
from metadata.utils.logger import cli_logger

logger = cli_logger()


def run_openmetadata_dag_config_migration(dir_path: str, keep_backups: bool) -> None:
"""Update DAG config file by removing dbtConfig key and
supportMetadataExtraction keys
Args:
dir_path (str): path directory defaults to `/opt/airflow/dag_generated_configs`
"""
for root, _, files in os.walk(dir_path):
filenames = [file_ for file_ in files if os.path.splitext(file_)[1] == ".json"]
logger.info(
f"{len(filenames)} files found in `{root}`."
"\nChecking config. in the following files:\n\t{file_list}".format(
file_list="\n\t".join(filenames)
)
)
for filename in filenames:
logger.info(f"Checking config. file: {filename}")
with open(
os.path.join(root, filename), "r", encoding="utf-8"
) as config_file:
try:
orig_fle_data = json.loads(config_file.read())
except json.JSONDecodeError:
logger.error(
f"Error decoding file {filename}. "
"The file will be skipped. You should verify if the file needs any update manually."
)
continue
fle_data = deepcopy(
orig_fle_data
) # We keep a copy of the original file data to see if any change was made
try:
fle_data["sourceConfig"]["config"]["dbtConfigSource"]
except KeyError:
logger.error(
f"Could not find the key `dbtConfigSource` in {filename}. Skipping key deletion."
)
else:
del fle_data["sourceConfig"]["config"]["dbtConfigSource"]
logger.info(f"Successfully removed key `dbtConfigSource` in {filename}")
try:
fle_data["openMetadataServerConnection"]["supportsMetadataExtraction"]
except KeyError:
logger.error(
f"Could not find the key `supportsMetadataExtraction` in {filename}. Skipping key deletion."
)
else:
del fle_data["openMetadataServerConnection"][
"supportsMetadataExtraction"
]
logger.info(
f"Successfully removed key `supportsMetadataExtraction` in {filename}"
)
try:
fle_data["sourceConfig"]["config"]["markDeletedTablesFromFilterOnly"]
except KeyError:
logger.error(
f"Could not find the key `markDeletedTablesFromFilterOnly` in {filename}. Skipping key deletion."
)
else:
del fle_data["sourceConfig"]["config"][
"markDeletedTablesFromFilterOnly"
]
logger.info(
f"Successfully removed key `markDeletedTablesFromFilterOnly` in {filename}"
)
if orig_fle_data != fle_data:
with open(
os.path.join(root, filename), "w", encoding="utf-8"
) as config_file:
config_file.write(json.dumps(fle_data))
logger.info(f"File {filename} successfuly updated")
if keep_backups:
with open(
os.path.join(root, f"{filename}.bak"), "w", encoding="utf-8"
) as bak_config_file:
bak_config_file.write(json.dumps(orig_fle_data))
logger.info(f"Backup File {filename}.bak successfuly updated")

File: metadata/cmd.py

@@ -22,6 +22,9 @@ from metadata.cli.backup import UploadDestinationType, run_backup
from metadata.cli.dataquality import run_test
from metadata.cli.docker import BACKEND_DATABASES, DockerActions, run_docker
from metadata.cli.ingest import run_ingest
from metadata.cli.openmetadata_dag_config_migration import (
    run_openmetadata_dag_config_migration,
)
from metadata.cli.openmetadata_imports_migration import (
    run_openmetadata_imports_migration,
)
@@ -42,6 +45,7 @@ class MetadataCommands(Enum):
    RESTORE = "restore"
    WEBHOOK = "webhook"
    OPENMETADATA_IMPORTS_MIGRATION = "openmetadata_imports_migration"
    OPENMETADATA_DAG_CONFIG_MIGRATION = "openmetadata_dag_config_migration"
OM_IMPORTS_MIGRATION = """
@@ -52,6 +56,13 @@ OM_IMPORTS_MIGRATION = """
This small CLI utility allows you to update both elements.
"""
OM_DAG_CONFIG_MIGRATION = """
Update DAG config files generated when creating workflows in version 0.12 and earlier.
In 0.13 certain keys of the DAG config files have been removed. This small
utility command allows you to update legacy DAG config files. Note that this can
also be done manually through the UI by clicking `Redeploy`.
"""
BACKUP_HELP = """
Run a backup for the metadata DB. Uses a custom dump strategy for OpenMetadata tables.
@@ -104,6 +115,22 @@ def create_openmetadata_imports_migration_args(parser: argparse.ArgumentParser):
    )


def create_openmetadata_dag_config_migration_args(parser: argparse.ArgumentParser):
    parser.add_argument(
        "-d",
        "--dir-path",
        default="/opt/airflow/dag_generated_configs",
        type=pathlib.Path,
        help="Path to the DAG folder. Defaults to `/opt/airflow/dag_generated_configs`",
    )
    parser.add_argument(
        "--keep-backups",
        help="Flag option. If passed, original files will be kept as `<filename>.json.bak` backups",
        action="store_true",
    )


def docker_args(parser: argparse.ArgumentParser):
    """
    Additional Parser Arguments for Docker
@@ -324,6 +351,12 @@ def get_parser(args=None):
            help=OM_IMPORTS_MIGRATION,
        )
    )
    create_openmetadata_dag_config_migration_args(
        sub_parser.add_parser(
            MetadataCommands.OPENMETADATA_DAG_CONFIG_MIGRATION.value,
            help=OM_DAG_CONFIG_MIGRATION,
        )
    )
    docker_args(
        sub_parser.add_parser(MetadataCommands.DOCKER.value, help="Docker Quickstart")
    )
@@ -445,3 +478,8 @@ def metadata(args=None):
        run_openmetadata_imports_migration(
            contains_args.get("dir_path"), contains_args.get("change_config_file_path")
        )
    if metadata_workflow == MetadataCommands.OPENMETADATA_DAG_CONFIG_MIGRATION.value:
        run_openmetadata_dag_config_migration(
            contains_args.get("dir_path"), contains_args.get("keep_backups")
        )
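
With the subcommand wired into the parser, the migration can also be driven programmatically; a minimal sketch, assuming the ingestion package is installed (the directory path is illustrative):

from metadata.cmd import metadata

# Equivalent to the shell invocation:
#   metadata openmetadata_dag_config_migration \
#       -d /opt/airflow/dag_generated_configs --keep-backups
metadata(
    [
        "openmetadata_dag_config_migration",
        "-d",
        "/opt/airflow/dag_generated_configs",
        "--keep-backups",
    ]
)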

File: sample generated DAG config JSON, a test resource under the unit test `resources` directory (new)

@@ -0,0 +1,110 @@
{
  "id": "55048a4e-d23e-4c18-b684-03bfb51308a8",
  "name": "rds_metadata_LTu8aqU4",
  "displayName": "rds_metadata_LTu8aqU4",
  "description": null,
  "pipelineType": "metadata",
  "owner": {
    "id": "9f815a7d-df5a-4cf3-969d-8579becce8e4",
    "type": "user",
    "name": "admin",
    "fullyQualifiedName": "admin",
    "description": null,
    "displayName": null,
    "deleted": false,
    "href": null
  },
  "fullyQualifiedName": "rds.rds_metadata_LTu8aqU4",
  "sourceConfig": {
    "config": {
      "type": "DatabaseMetadata",
      "markDeletedTables": true,
      "markDeletedTablesFromFilterOnly": false,
      "includeTables": true,
      "includeViews": false,
      "includeTags": false,
      "useFqnForFiltering": false,
      "schemaFilterPattern": {
        "includes": [
          "dbt_jaffle"
        ],
        "excludes": null
      },
      "tableFilterPattern": null,
      "databaseFilterPattern": null,
      "dbtConfigSource": {
        "dbtCatalogFilePath": "/opt/airflow/catalog.json",
        "dbtManifestFilePath": "/opt/airflow/manifest.json",
        "dbtRunResultsFilePath": "",
        "dbtUpdateDescriptions": false
      }
    }
  },
  "openMetadataServerConnection": {
    "clusterName": "openmetadata",
    "type": "OpenMetadata",
    "hostPort": "http://openmetadata-server:8585/api",
    "authProvider": "openmetadata",
    "verifySSL": "no-ssl",
    "sslConfig": null,
    "securityConfig": {
      "jwtToken": "myToken"
    },
    "secretsManagerProvider": "noop",
    "secretsManagerCredentials": null,
    "apiVersion": "v1",
    "includeTopics": true,
    "includeTables": true,
    "includeDashboards": true,
    "includePipelines": true,
    "includeMlModels": true,
    "includeUsers": true,
    "includeTeams": true,
    "includeGlossaryTerms": true,
    "includeTags": true,
    "includePolicy": true,
    "includeMessagingServices": true,
    "enableVersionValidation": true,
    "includeDatabaseServices": true,
    "includePipelineServices": true,
    "limitRecords": 1000,
    "forceEntityOverwriting": false,
    "supportsMetadataExtraction": true
  },
  "airflowConfig": {
    "pausePipeline": false,
    "concurrency": 1,
    "startDate": null,
    "endDate": null,
    "pipelineTimezone": "UTC",
    "retries": 3,
    "retryDelay": 300,
    "pipelineCatchup": false,
    "scheduleInterval": "0 * * * *",
    "maxActiveRuns": 1,
    "workflowTimeout": null,
    "workflowDefaultView": "tree",
    "workflowDefaultViewOrientation": "LR",
    "email": null
  },
  "service": {
    "id": "a8517b60-e69b-4841-b123-2398fedf9f28",
    "type": "databaseService",
    "name": "rds",
    "fullyQualifiedName": "rds",
    "description": "",
    "displayName": null,
    "deleted": false,
    "href": null
  },
  "pipelineStatuses": null,
  "loggerLevel": "INFO",
  "deployed": null,
  "enabled": true,
  "href": "http://localhost:8585/api/v1/services/ingestionPipelines/55048a4e-d23e-4c18-b684-03bfb51308a8",
  "version": 0.1,
  "updatedAt": 1671541748444,
  "updatedBy": "admin",
  "changeDescription": null,
  "deleted": false
}
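
This sample carries all three legacy keys (`dbtConfigSource`, `supportsMetadataExtraction`, `markDeletedTablesFromFilterOnly`); a minimal sketch of the transformation the migration applies, with a stripped-down dict standing in for the loaded JSON:

# Stand-in for json.loads(...) on a config like the sample above
config = {
    "sourceConfig": {
        "config": {
            "dbtConfigSource": {"dbtCatalogFilePath": "/opt/airflow/catalog.json"},
            "markDeletedTablesFromFilterOnly": False,
        }
    },
    "openMetadataServerConnection": {"supportsMetadataExtraction": True},
}

# pop(..., None) mirrors the script's try/del pattern: absent keys are tolerated
config["sourceConfig"]["config"].pop("dbtConfigSource", None)
config["sourceConfig"]["config"].pop("markDeletedTablesFromFilterOnly", None)
config["openMetadataServerConnection"].pop("supportsMetadataExtraction", None)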

File: unit tests for the DAG config migration CLI command (new)

@@ -0,0 +1,109 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
test dag config migration cli script
"""
import json
import os
from pathlib import Path
from unittest import TestCase
from metadata.cli.openmetadata_dag_config_migration import (
    run_openmetadata_dag_config_migration,
)


class TestOpenmetadataDagConfigMigration(TestCase):
    """Test class for the DAG config migration CLI script"""

    store = dict()
    resources_path = Path(__file__).parent.absolute() / "resources"

    @classmethod
    def setUpClass(cls) -> None:
        for root, _, filenames in os.walk(cls.resources_path):
            for filename in filenames:
                with open(os.path.join(root, filename), "r", encoding="utf-8") as fle:
                    cls.store[os.path.join(root, filename)] = fle.read()

    def test_run_openmetadata_dag_config_migration_w_bak(self):
        """test the run_openmetadata_dag_config_migration function, keeping backups"""
        run_openmetadata_dag_config_migration(self.resources_path, True)
        failures = []
        for root, _, filenames in os.walk(self.resources_path):
            assert any(".json.bak" in filename for filename in filenames)
            for filename in filenames:
                if os.path.splitext(filename)[1] == ".json":
                    with open(
                        os.path.join(root, filename), "r", encoding="utf-8"
                    ) as fle:
                        data = json.loads(fle.read())
                    # The migrated .json files must no longer contain the legacy keys
                    try:
                        data["sourceConfig"]["config"]["dbtConfigSource"]
                    except KeyError:
                        pass
                    else:
                        failures.append(filename)
                    try:
                        data["openMetadataServerConnection"][
                            "supportsMetadataExtraction"
                        ]
                    except KeyError:
                        pass
                    else:
                        failures.append(filename)
                    try:
                        data["sourceConfig"]["config"][
                            "markDeletedTablesFromFilterOnly"
                        ]
                    except KeyError:
                        pass
                    else:
                        failures.append(filename)
                if os.path.splitext(filename)[1] == ".bak":
                    with open(
                        os.path.join(root, filename), "r", encoding="utf-8"
                    ) as fle:
                        data = json.loads(fle.read())
                    # The .bak files must still contain the original keys
                    try:
                        data["sourceConfig"]["config"]["dbtConfigSource"]
                    except KeyError:
                        failures.append(filename)
                    try:
                        data["openMetadataServerConnection"][
                            "supportsMetadataExtraction"
                        ]
                    except KeyError:
                        failures.append(filename)
                    try:
                        data["sourceConfig"]["config"][
                            "markDeletedTablesFromFilterOnly"
                        ]
                    except KeyError:
                        failures.append(filename)
        assert not failures

    @classmethod
    def tearDownClass(cls) -> None:
        # Restore the original resource files and remove the generated backups
        for file_path, file_content in cls.store.items():
            with open(file_path, "w", encoding="utf-8") as fle:
                fle.write(file_content)
        for root, _, filenames in os.walk(cls.resources_path):
            bak_files = [
                file_ for file_ in filenames if os.path.splitext(file_)[1] == ".bak"
            ]
            for bak_file in bak_files:
                os.remove(os.path.join(root, bak_file))
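
To run this test module in isolation, a sketch (the discovery pattern assumes the file name on disk contains `dag_config_migration`; adjust to the actual path in the repo):

import unittest

# Discover and run only the DAG config migration tests
suite = unittest.defaultTestLoader.discover(
    "tests/unit", pattern="*dag_config_migration*.py"
)
unittest.TextTestRunner(verbosity=2).run(suite)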