MINOR - Docs and deprecations (#16118)

* MINOR - Docs and deprecations

* clean
This commit is contained in:
Pere Miquel Brull 2024-05-03 10:18:29 +02:00 committed by GitHub
parent f4cf6ca14a
commit a0db32c18a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 53 additions and 1147 deletions

View File

@ -1,212 +0,0 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
PII application
"""
import traceback
from typing import Iterable, List, Optional
from metadata.generated.schema.entity.applications.configuration.external.autoTaggerAppConfig import (
AutoTaggerAppConfig,
)
from metadata.generated.schema.entity.data.table import Column, Table, TableData
from metadata.generated.schema.entity.services.ingestionPipelines.status import (
StackTraceError,
)
from metadata.generated.schema.metadataIngestion.application import (
OpenMetadataApplicationConfig,
)
from metadata.generated.schema.type.tagLabel import (
LabelType,
State,
TagFQN,
TagLabel,
TagSource,
)
from metadata.ingestion.models.table_metadata import ColumnTag
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.pii.constants import PII
from metadata.pii.scanners.column_name_scanner import ColumnNameScanner
from metadata.pii.scanners.ner_scanner import NERScanner
from metadata.utils.logger import app_logger
from metadata.workflow.application import AppRunner, InvalidAppConfiguration
logger = app_logger()

# Default confidence level (0-100) used when the app config does not set one
DEFAULT_CONFIDENCE = 80
class AutoTaggerApp(AppRunner):
    """
    PII Application: scans each table's column names and sample data and
    PATCHes PII tags on the columns where a scanner finds a match.

    You can execute it with `metadata app -c <path-to-yaml>`
    with a YAML file like:

    sourcePythonClass: metadata.applications.auto_tagger.AutoTaggerApp
    appConfig:
      type: AutoTagger
      confidenceLevel: 80
    workflowConfig:
      loggerLevel: INFO
      openMetadataServerConfig:
        hostPort: http://localhost:8585/api
        authProvider: openmetadata
        securityConfig:
          jwtToken: "..."
    """

    def __init__(self, config: OpenMetadataApplicationConfig, metadata: OpenMetadata):
        """Validate the app configuration and prepare the runner state."""
        super().__init__(config, metadata)

        if not isinstance(self.app_config, AutoTaggerAppConfig):
            raise InvalidAppConfiguration(
                f"AutoTagger Runner expects an AutoTaggerAppConfig, we got [{config}]"
            )

        # The NER scanner is lazy-loaded on first access (see `ner_scanner`)
        self._ner_scanner = None
        # Minimum confidence (0-100) a scan result needs to be applied as a tag
        self.confidence_threshold = (
            self.app_config.confidenceLevel or DEFAULT_CONFIDENCE
        )

    @property
    def name(self) -> str:
        """Human-readable application name."""
        return "Auto Tagger"

    @staticmethod
    def build_column_tag(tag_fqn: str, column_fqn: str) -> ColumnTag:
        """
        Build the ColumnTag model used to PATCH a column.

        Tags are created as Suggested + Automated so users can review them.

        :param tag_fqn: fully qualified name of the tag to apply
        :param column_fqn: fully qualified name of the target column
        :return: ColumnTag ready to be patched
        """
        tag_label = TagLabel(
            tagFQN=TagFQN(__root__=tag_fqn),
            source=TagSource.Classification,
            state=State.Suggested,
            labelType=LabelType.Automated,
        )

        return ColumnTag(column_fqn=column_fqn, tag_label=tag_label)

    @property
    def ner_scanner(self) -> NERScanner:
        """Load the NER Scanner only if called"""
        if self._ner_scanner is None:
            self._ner_scanner = NERScanner()

        return self._ner_scanner

    def process_column(
        self,
        idx: int,
        column: Column,
        table_data: Optional[TableData],
        confidence_threshold: float,
    ) -> Optional[List[ColumnTag]]:
        """
        Tag a column with PII if we find it using our scanners.

        :param idx: position of the column inside the sample-data rows
        :param column: column being processed
        :param table_data: table sample data, when available
        :param confidence_threshold: minimum confidence (0-100) to tag
        :return: list of ColumnTag to apply, or None when nothing matched
        """
        # First, check if the column we are about to process
        # already has PII tags or not
        column_has_pii_tag = any(
            (PII in tag.tagFQN.__root__ for tag in column.tags or [])
        )

        # If it has PII tags, we skip the processing
        # for the column
        if column_has_pii_tag is True:
            return None

        # Scan by column name. If no results there, check the sample data, if any
        tag_and_confidence = ColumnNameScanner.scan(column.name.__root__) or (
            self.ner_scanner.scan([row[idx] for row in table_data.rows])
            if table_data
            else None
        )

        if (
            tag_and_confidence
            and tag_and_confidence.tag_fqn
            and tag_and_confidence.confidence >= confidence_threshold / 100
        ):
            # We support returning +1 tags for a single column in _run
            return [
                self.build_column_tag(
                    tag_fqn=tag_and_confidence.tag_fqn,
                    column_fqn=column.fullyQualifiedName.__root__,
                )
            ]

        return None

    def process_table(self, table: Table) -> Optional[List[ColumnTag]]:
        """Run the patching of the table"""
        column_tags = []
        for idx, column in enumerate(table.columns):
            try:
                col_tags = self.process_column(
                    idx=idx,
                    column=column,
                    table_data=table.sampleData,
                    confidence_threshold=self.confidence_threshold,
                )
                if col_tags:
                    column_tags.extend(col_tags)
            except Exception as err:
                # Record the failure and keep processing the remaining columns
                self.status.failed(
                    StackTraceError(
                        name=table.fullyQualifiedName.__root__,
                        error=f"Error computing PII tags for [{column}] - [{err}]",
                        stackTrace=traceback.format_exc(),
                    )
                )

        if column_tags:
            return column_tags

        return None

    def patch_columns(self, table: Table, column_tags: List[ColumnTag]) -> None:
        """Patch columns with PII"""
        patched = self.metadata.patch_column_tags(table=table, column_tags=column_tags)
        if not patched:
            self.status.failed(
                StackTraceError(
                    name=table.fullyQualifiedName.__root__,
                    error="Error patching tags for table",
                )
            )
        else:
            self.status.scanned(table)
            logger.debug(
                f"Successfully patched tag {column_tags} for {table.fullyQualifiedName.__root__}"
            )

    def run(self) -> None:
        """
        The PII Application will:
        1. List tables
        2. Check their column names and sample data (if any)
        3. PATCH PII tags when needed
        """
        tables: Iterable[Table] = self.metadata.list_all_entities(
            entity=Table, fields=["sampleData", "tags"]
        )
        for table in tables:
            column_tags = self.process_table(table)
            if column_tags:
                self.patch_columns(table=table, column_tags=column_tags)
            else:
                # No tags computed: mark the table as filtered, not failed
                self.status.filter(
                    key=table.fullyQualifiedName.__root__, reason="No PII found"
                )

    def close(self) -> None:
        """Nothing to close"""

View File

@ -31,7 +31,6 @@ def run_app(config_path: Path) -> None:
:param config_path: Path to load JSON config
"""
config_dict = None
try:
config_dict = load_config_file(config_path)
workflow = ApplicationWorkflow.create(config_dict)

View File

@ -1,209 +0,0 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Backup utility for the metadata CLI
"""
import traceback
from datetime import datetime
from enum import Enum
from pathlib import Path
from typing import Optional, Tuple
from metadata.cli.db_dump import dump
from metadata.cli.utils import get_engine
from metadata.utils.helpers import BackupRestoreArgs
from metadata.utils.logger import ANSI, cli_logger, log_ansi_encoded_string
class UploadDestinationType(Enum):
    """Supported destinations when uploading the backup file."""

    AWS = "AWS"
    AZURE = "Azure"


logger = cli_logger()
def get_output(output: Optional[str] = None, filename: Optional[str] = None) -> Path:
    """
    Build the path and name of the output backup file.

    Creates the output directory when it does not exist yet.

    :param output: local directory to store the backup
    :param filename: explicit name for the backup file
    :return: backup file path
    """
    timestamp = datetime.now().strftime("%Y%m%d%H%M")
    backup_name = filename if filename else f"openmetadata_{timestamp}_backup.sql"

    if not output:
        return Path(backup_name)

    target_dir = Path(output).expanduser()
    # Make sure the destination directory exists before handing back the path
    if not target_dir.is_dir():
        target_dir.mkdir(parents=True, exist_ok=True)
    return target_dir / backup_name
def upload_backup_aws(endpoint: str, bucket: str, key: str, file: Path) -> None:
    """
    Upload the mysqldump backup file to S3.

    We will use boto3 to upload the file to the endpoint
    and the key provided.

    :param endpoint: S3 endpoint
    :param bucket: S3 bucket to upload the file to
    :param key: S3 key to upload the backup file
    :param file: file to upload
    :raises ModuleNotFoundError: when boto3 is not installed
    """
    try:
        # We just want to force boto3 install if uploading backup
        # pylint: disable=import-outside-toplevel
        import boto3
        from boto3.exceptions import S3UploadFailedError
    except ModuleNotFoundError as err:
        logger.debug(traceback.format_exc())
        logger.error(
            "Trying to import boto3 to run the backup upload."
            + " Please install openmetadata-ingestion[backup]."
        )
        raise err

    # The object key is the provided prefix plus the local file name
    s3_key = Path(key) / file.name
    log_ansi_encoded_string(
        color=ANSI.GREEN,
        bold=False,
        message=f"Uploading {file} to {endpoint}/{bucket}/{str(s3_key)}...",
    )

    try:
        resource = boto3.resource(service_name="s3", endpoint_url=endpoint)
        resource.Object(bucket, str(s3_key)).upload_file(str(file.absolute()))
    except ValueError as err:
        # Bad --upload values (e.g. malformed endpoint URL)
        logger.debug(traceback.format_exc())
        logger.error("Revisit the values of --upload")
        raise err
    except S3UploadFailedError as err:
        logger.debug(traceback.format_exc())
        logger.error(
            "Error when uploading the backup to S3. Revisit the config and permissions."
            + " You should have set the environment values for AWS_ACCESS_KEY_ID"
            + " and AWS_SECRET_ACCESS_KEY"
        )
        raise err
def upload_backup_azure(account_url: str, container: str, file: Path) -> None:
    """
    Upload the mysqldump backup file to Azure Blob Storage.

    :param account_url: Azure account url
    :param container: Azure container to upload file to
    :param file: file to upload
    :raises ModuleNotFoundError: when the azure SDK is not installed
    """
    try:
        # Import lazily: the azure SDK is only required when uploading
        # pylint: disable=import-outside-toplevel
        from azure.identity import DefaultAzureCredential
        from azure.storage.blob import BlobServiceClient

        default_credential = DefaultAzureCredential()

        # Create the BlobServiceClient object
        blob_service_client = BlobServiceClient(
            account_url, credential=default_credential
        )
    except ModuleNotFoundError as err:
        logger.debug(traceback.format_exc())
        logger.error(
            "Trying to import DefaultAzureCredential to run the backup upload."
        )
        raise err

    log_ansi_encoded_string(
        color=ANSI.GREEN,
        message=f"Uploading {file} to {account_url}/{container}...",
    )

    try:
        # Create a blob client using the local file name as the name for the blob
        blob_client = blob_service_client.get_blob_client(
            container=container, blob=file.name
        )

        # Upload the created file
        with open(file=file, mode="rb") as data:
            blob_client.upload_blob(data)
    except ValueError as err:
        logger.debug(traceback.format_exc())
        logger.error("Revisit the values of --upload")
        raise err
    except Exception as err:
        # Broad catch: surface any Azure SDK failure, then re-raise
        logger.debug(traceback.format_exc())
        logger.error(err)
        raise err
def run_backup(
    common_backup_obj_instance: BackupRestoreArgs,
    output: Optional[str],
    filename: Optional[str],
    upload_destination_type: Optional[UploadDestinationType],
    upload: Optional[Tuple[str, str, str]],
) -> None:
    """
    Run `mysqldump` against the metadata database and store the
    output. Optionally, upload it to S3 or Azure Blob Storage.

    :param common_backup_obj_instance: cls instance to fetch common args
    :param output: local path to store the backup
    :param filename: filename to store the backup
    :param upload_destination_type: Azure or AWS Destination Type. NOTE: in
        practice argparse passes the enum member *name* as a plain string
        ("AWS"/"AZURE"), not an enum instance
    :param upload: URI to upload result file
    """
    # backup has been deprecated -- warn loudly before doing anything.
    # (A trailing space after "instead." is required: adjacent string
    # literals are concatenated with no separator.)
    log_ansi_encoded_string(
        color=ANSI.BRIGHT_RED,
        bold=True,
        message="WARNING: backup is deprecated starting 1.4.0. Use database native dump tools instead. "
        "For more information, please visit: "
        "https://docs.open-metadata.org/v1.4.x/deployment/backup-restore-metadata",
    )
    log_ansi_encoded_string(
        color=ANSI.GREEN,
        bold=False,
        message="Creating OpenMetadata backup for "
        f"{common_backup_obj_instance.host}:{common_backup_obj_instance.port}/{common_backup_obj_instance.database}...",
    )

    out = get_output(output, filename)
    engine = get_engine(common_args=common_backup_obj_instance)
    dump(engine=engine, output=out, schema=common_backup_obj_instance.schema)

    log_ansi_encoded_string(
        color=ANSI.GREEN, bold=False, message=f"Backup stored locally under {out}"
    )

    if upload:
        # Normalize once: the CLI passes the member name ("AWS"/"AZURE"),
        # while the enum values are "AWS"/"Azure". Comparing upper-cased
        # strings keeps both branches consistent and case-insensitive.
        destination = str(upload_destination_type or "").upper()
        if destination == UploadDestinationType.AWS.value.upper():
            endpoint, bucket, key = upload
            upload_backup_aws(endpoint, bucket, key, out)
        elif destination == UploadDestinationType.AZURE.value.upper():
            # only need two parameters from upload, key would be null
            account_url, container, _ = upload
            upload_backup_azure(account_url, container, out)

View File

@ -1,239 +0,0 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Database Dumping utility for the metadata CLI
"""
import json
from functools import singledispatch
from pathlib import Path
from typing import Iterable, List, Optional, Union
from sqlalchemy import inspect, text
from sqlalchemy.engine import Engine, Row
from metadata.utils.constants import UTF_8
# Tables whose full contents (every column) must be dumped
TABLES_DUMP_ALL = {
    "task_sequence",
    "entity_usage",
    "entity_relationship",
    "entity_extension",
    "field_relationship",
    "tag_usage",
    "openmetadata_settings",
    "profiler_data_time_series",
}

# Tables with custom handling: columns listed here are excluded from the dump
CUSTOM_TABLES = {"entity_extension_time_series": {"exclude_columns": ["timestamp"]}}
# Server migration / change-log bookkeeping tables that are not migrated
NOT_MIGRATE = {"DATABASE_CHANGE_LOG", "SERVER_MIGRATION_SQL_LOGS", "SERVER_CHANGE_LOG"}

# SQL statement templates used to build the dump file
STATEMENT_JSON = "SELECT json FROM {table}"
STATEMENT_HASH_JSON = "SELECT json, {hash_column_name} FROM {table}"
STATEMENT_ALL = "SELECT * FROM {table}"
STATEMENT_TRUNCATE = "TRUNCATE TABLE {table};\n"
STATEMENT_ALL_NEW = "SELECT {cols} FROM {table}"

MYSQL_ENGINE_NAME = "mysql"
# A table carries at most one of these two hash columns
FQN_HASH_COLUMN = "fqnHash"
NAME_HASH_COLUMN = "nameHash"
def single_quote_wrap(raw: str) -> str:
    """Wrap a string in single quotes: `str` -> `'str'`."""
    return "'" + raw + "'"
def clean_str(raw: str, engine: Engine) -> str:
    """
    Escape a raw string so it can be embedded in a SQL statement.

    The rules differ between MySQL and Postgres:
    - single quotes are doubled for both, e.g. `Mysql's data`
      becomes `Mysql''s data`
    - MySQL additionally needs literal backslashes escaped with another one
      (this applies to `\n` and `\"` inside inner JSON payloads); Postgres
      does not require it
    """
    escaped = raw.replace("'", "''")
    if engine.name != MYSQL_ENGINE_NAME:
        return escaped
    return escaped.replace("\\", "\\\\")
@singledispatch
def clean_col(column_raw: Optional[Union[dict, str]], engine: Engine) -> str:
    """Render a scalar column value as a NULL-safe, quoted, escaped SQL literal."""
    if column_raw is None:
        return "null"
    return single_quote_wrap(clean_str(str(column_raw), engine))
@clean_col.register(dict)
@clean_col.register(list)
def _(column_raw: Optional[Union[dict, list]], engine: Engine) -> str:
    """
    Prepare a JSON (dict/list) column value to be inserted into MySQL.

    Handles:
    - quotes
    - True/False values
    """
    if column_raw is None:
        return "null"
    # default=str: if we don't know how to serialize a value, convert it to str
    serialized = json.dumps(column_raw, default=str)
    return single_quote_wrap(clean_str(serialized, engine))
def get_hash_column_name(engine: Engine, table_name: str) -> Optional[str]:
    """
    Return the name of the table's hash column (fqnHash or nameHash),
    or None when the table has neither.
    """
    hash_names = {FQN_HASH_COLUMN.lower(), NAME_HASH_COLUMN.lower()}
    for column in inspect(engine).get_columns(table_name):
        if column["name"].lower() in hash_names:
            return column["name"]
    return None
def run_query_iter(engine: Engine, query: str) -> Iterable[Row]:
    """Stream rows one at a time, buffering at most 100 rows in memory."""
    with engine.connect() as conn:
        streaming_conn = conn.execution_options(
            stream_results=True, max_row_buffer=100
        )
        yield from streaming_conn.execute(text(query))
def dump_json(tables: List[str], engine: Engine, output: Path) -> None:
    """
    Dump the JSON column (plus the hash column, when the table has one) of
    each table as TRUNCATE + INSERT statements appended to *output*.

    Postgres: engine.name == "postgresql"
    MySQL: engine.name == "mysql"

    :param tables: tables to dump
    :param engine: SQLAlchemy engine pointing at the metadata DB
    :param output: file the statements are appended to
    """
    with open(output, "a", encoding=UTF_8) as file:
        for table in tables:
            file.write(STATEMENT_TRUNCATE.format(table=table))

            hash_column_name = get_hash_column_name(engine=engine, table_name=table)
            if hash_column_name:
                query = STATEMENT_HASH_JSON.format(
                    table=table, hash_column_name=hash_column_name
                )
                for row in run_query_iter(engine=engine, query=query):
                    insert = f"INSERT INTO {table} (json, {hash_column_name}) VALUES ({clean_col(row.json, engine)}, {clean_col(row[1], engine)});\n"  # pylint: disable=line-too-long
                    file.write(insert)
            else:
                # Stream here too: the previous `engine.execute(...).all()`
                # call loaded the whole table into memory and relied on the
                # legacy Engine.execute API removed in SQLAlchemy 2.x.
                query = STATEMENT_JSON.format(table=table)
                for row in run_query_iter(engine=engine, query=query):
                    insert = (
                        f"INSERT INTO {table} (json) "
                        f"VALUES ({clean_col(row.json, engine)});\n"
                    )
                    file.write(insert)
def dump_all(tables: List[str], engine: Engine, output: Path) -> None:
    """
    Dump full rows (TRUNCATE + INSERT of every column) for the tables
    that need to keep all their data.
    """
    with open(output, "a", encoding=UTF_8) as file:
        for table in tables:
            file.write(STATEMENT_TRUNCATE.format(table=table))

            select_all = STATEMENT_ALL.format(table=table)
            for row in run_query_iter(engine=engine, query=select_all):
                values = ",".join(clean_col(col, engine) for col in row)
                file.write(f"INSERT INTO {table} VALUES ({values});\n")
def dump_entity_custom(engine: Engine, output: Path, inspector) -> None:
    """
    Dump the tables in CUSTOM_TABLES, excluding their configured
    `exclude_columns` from both the SELECT list and the INSERT statements.

    :param engine: SQLAlchemy engine pointing at the metadata DB
    :param inspector: SQLAlchemy inspector used to list the table columns
    :param output: file the statements are appended to
    """
    with open(output, "a", encoding=UTF_8) as file:
        for table, data in CUSTOM_TABLES.items():
            file.write(STATEMENT_TRUNCATE.format(table=table))

            # Compute the filtered column list once per table: it was
            # previously rebuilt for the SELECT and again for every INSERT.
            columns = inspector.get_columns(table_name=table)
            cols = ",".join(
                col["name"]
                for col in columns
                if col["name"] not in data["exclude_columns"]
            )

            query = STATEMENT_ALL_NEW.format(cols=cols, table=table)
            for row in run_query_iter(engine=engine, query=query):
                # Let's use .format here to not add more variables
                # pylint: disable=consider-using-f-string
                insert = "INSERT INTO {table} ({cols}) VALUES ({data});\n".format(
                    table=table,
                    cols=cols,
                    data=",".join(clean_col(col, engine) for col in row),
                )
                file.write(insert)
def get_lower_table_names(tables: Iterable[str]) -> List[str]:
    """Lower-case every table name, preserving the input order."""
    return [name.lower() for name in tables]
def dump(engine: Engine, output: Path, schema: Optional[str] = None) -> None:
    """
    Get all tables from the database and dump
    only the JSON column for the required tables.

    :param engine: SQLAlchemy engine pointing at the metadata DB
    :param output: file the dump statements are appended to
    :param schema: Postgres schema holding the OpenMetadata tables
        (None when running against MySQL)
    """
    inspector = inspect(engine)
    tables = (
        inspector.get_table_names(schema) if schema else inspector.get_table_names()
    )
    lower_tables = get_lower_table_names(tables)
    # Tables that must NOT go through the plain JSON dump: they are either
    # dumped fully, skipped entirely, or handled with custom column filtering.
    all_non_json_tables = (
        get_lower_table_names(TABLES_DUMP_ALL)
        + get_lower_table_names(NOT_MIGRATE)
        + get_lower_table_names(CUSTOM_TABLES)
    )
    dump_json_tables = [
        table for table in lower_tables if table not in all_non_json_tables
    ]

    dump_all(tables=list(TABLES_DUMP_ALL), engine=engine, output=output)
    dump_json(tables=dump_json_tables, engine=engine, output=output)
    dump_entity_custom(engine=engine, output=output, inspector=inspector)

View File

@ -1,56 +0,0 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Utils module for the metadata backup and restore process
"""
from sqlalchemy.engine import Engine
from metadata.generated.schema.entity.services.connections.database.common.basicAuth import (
BasicAuth,
)
from metadata.generated.schema.entity.services.connections.database.mysqlConnection import (
MysqlConnection,
)
from metadata.generated.schema.entity.services.connections.database.postgresConnection import (
PostgresConnection,
)
from metadata.ingestion.source.connections import get_connection
from metadata.utils.helpers import BackupRestoreArgs, list_to_dict
def get_engine(common_args: BackupRestoreArgs):
    """
    Get the database connection engine.

    When `--schema` is passed we build a Postgres connection (the schema
    holds the OpenMetadata tables); otherwise we build a MySQL connection.

    :param common_args: CLI connection arguments (host, port, user, ...)
    :return: SQLAlchemy Engine for the configured database
    """
    connection_options = list_to_dict(common_args.options)
    connection_arguments = list_to_dict(common_args.arguments)

    connection_dict = {
        "hostPort": f"{common_args.host}:{common_args.port}",
        "username": common_args.user,
        "authType": BasicAuth(password=common_args.password),
        "connectionOptions": connection_options if connection_options else None,
        "connectionArguments": connection_arguments if connection_arguments else None,
    }

    if not common_args.schema:
        # MySQL: the target database maps to `databaseSchema`
        connection_dict["databaseSchema"] = common_args.database
        connection = MysqlConnection(**connection_dict)
    else:
        connection_dict["database"] = common_args.database
        connection = PostgresConnection(**connection_dict)

    engine: Engine = get_connection(connection)
    return engine

View File

@ -19,15 +19,12 @@ from pathlib import Path
from metadata.__version__ import get_metadata_version
from metadata.cli.app import run_app
from metadata.cli.backup import UploadDestinationType, run_backup
from metadata.cli.dataquality import run_test
from metadata.cli.ingest import run_ingest
from metadata.cli.insight import run_insight
from metadata.cli.lineage import run_lineage
from metadata.cli.profile import run_profiler
from metadata.cli.restore import run_restore
from metadata.cli.usage import run_usage
from metadata.utils.helpers import BackupRestoreArgs
from metadata.utils.logger import cli_logger, set_loggers_level
logger = cli_logger()
@ -38,8 +35,6 @@ class MetadataCommands(Enum):
USAGE = "usage"
PROFILE = "profile"
TEST = "test"
BACKUP = "backup"
RESTORE = "restore"
WEBHOOK = "webhook"
INSIGHT = "insight"
LINEAGE = "lineage"
@ -57,32 +52,6 @@ RUN_PATH_METHODS = {
}
BACKUP_HELP = """
Run a backup for the metadata DB. Uses a custom dump strategy for OpenMetadata tables.
We can pass as many connection options as required with `-o <opt1>, -o <opt2> [...]`
Same with connection arguments `-a <arg1>, -a <arg2> [...]`
To run the upload, provide the information as
`--upload endpoint bucket key` and properly configure the environment
variables AWS_ACCESS_KEY_ID & AWS_SECRET_ACCESS_KEY.
If `-s` or `--schema` is provided, we will trigger a Postgres backup instead
of a MySQL backup. This is the value of the schema containing the OpenMetadata
tables.
"""
RESTORE_HELP = """
Run a restore for the metadata DB.
We can pass as many connection options as required with `-o <opt1>, -o <opt2> [...]`
Same with connection arguments `-a <arg1>, -a <arg2> [...]`
If `-s` or `--schema` is provided, we will trigger a Postgres Restore instead
of a MySQL restore. This is the value of the schema containing the OpenMetadata
tables.
"""
def create_common_config_parser_args(parser: argparse.ArgumentParser):
parser.add_argument(
"-c",
@ -103,125 +72,6 @@ def webhook_args(parser: argparse.ArgumentParser):
parser.add_argument("-p", "--port", help="Webserver Port", type=int, default=8000)
def backup_args(parser: argparse.ArgumentParser):
    """
    Additional Parser Arguments for Backup
    """
    # Required connection coordinates
    parser.add_argument(
        "-H", "--host", help="Host that runs the database", required=True
    )
    parser.add_argument("-u", "--user", help="User to run the backup", required=True)
    parser.add_argument(
        "-p", "--password", help="Credentials for the user", required=True
    )
    parser.add_argument("-d", "--database", help="Database to backup", required=True)
    parser.add_argument("--port", help="Database service port", default="3306")

    # Where to store the dump locally
    parser.add_argument(
        "--output", help="Local path to store the backup", type=Path, default=None
    )
    parser.add_argument("--filename", help="Filename to store the backup", default=None)

    # Optional cloud upload configuration
    parser.add_argument(
        "--upload-destination-type",
        help="AWS or AZURE",
        choices=UploadDestinationType.__members__,
        default=None,
    )
    parser.add_argument(
        "--upload",
        help="S3 endpoint, bucket & key to upload the backup file",
        nargs=3,
        default=None,
    )

    # Repeatable engine options/arguments and the optional Postgres schema
    parser.add_argument("-o", "--options", default=None, action="append")
    parser.add_argument("-a", "--arguments", default=None, action="append")
    parser.add_argument("-s", "--schema", default=None)
def restore_args(parser: argparse.ArgumentParser):
    """
    Additional Parser Arguments for Restore
    """
    # Required connection coordinates
    parser.add_argument(
        "-H", "--host", help="Host that runs the database", required=True
    )
    parser.add_argument(
        "-u", "--user", help="User to run the restore backup", required=True
    )
    parser.add_argument(
        "-p", "--password", help="Credentials for the user", required=True
    )
    parser.add_argument("-d", "--database", help="Database to restore", required=True)
    parser.add_argument(
        "--port", help="Database service port", default="3306", required=False
    )

    # The SQL file produced by the backup command
    parser.add_argument(
        "--input",
        help="Local backup file path for restore",
        type=Path,
        required=True,
    )

    # Repeatable engine options/arguments and the optional Postgres schema
    parser.add_argument("-o", "--options", default=None, action="append")
    parser.add_argument("-a", "--arguments", default=None, action="append")
    parser.add_argument("-s", "--schema", default=None, required=False)
def add_metadata_args(parser: argparse.ArgumentParser):
"""
Additional Parser Arguments for Metadata
@ -274,18 +124,6 @@ def get_parser(args=None):
help="Workflow for running external applications",
)
)
backup_args(
sub_parser.add_parser(
MetadataCommands.BACKUP.value,
help=BACKUP_HELP,
)
)
restore_args(
sub_parser.add_parser(
MetadataCommands.RESTORE.value,
help=RESTORE_HELP,
)
)
webhook_args(
sub_parser.add_parser(
MetadataCommands.WEBHOOK.value,
@ -323,37 +161,6 @@ def metadata(args=None):
if metadata_workflow in RUN_PATH_METHODS:
RUN_PATH_METHODS[metadata_workflow](path)
if metadata_workflow == MetadataCommands.BACKUP.value:
run_backup(
common_backup_obj_instance=BackupRestoreArgs(
host=contains_args.get("host"),
user=contains_args.get("user"),
password=contains_args.get("password"),
database=contains_args.get("database"),
port=contains_args.get("port"),
options=contains_args.get("options"),
arguments=contains_args.get("arguments"),
schema=contains_args.get("schema"),
),
output=contains_args.get("output"),
filename=contains_args.get("filename"),
upload_destination_type=contains_args.get("upload_destination_type"),
upload=contains_args.get("upload"),
)
if metadata_workflow == MetadataCommands.RESTORE.value:
run_restore(
common_restore_obj_instance=BackupRestoreArgs(
host=contains_args.get("host"),
user=contains_args.get("user"),
password=contains_args.get("password"),
database=contains_args.get("database"),
port=contains_args.get("port"),
options=contains_args.get("options"),
arguments=contains_args.get("arguments"),
schema=contains_args.get("schema"),
),
sql_file=contains_args.get("input"),
)
if metadata_workflow == MetadataCommands.WEBHOOK.value:
class WebhookHandler(BaseHTTPRequestHandler):

View File

@ -1,199 +0,0 @@
# Installation and deployment instructions (using Postgres as example)
Below are the instructions for connecting a Postgres server. The installation steps should be the same for connecting all kinds of servers. Different servers would require different configurations in the .yaml or DAG files. See https://docs.open-metadata.org/integrations/connectors for your configuration.
# Goal: To run Postgres metadata ingestion and quality tests with OpenMetadata using Airflow scheduler
Note: This procedure does not support Windows, because Windows does not implement "signal.SIGALRM". **It is highly recommended to use WSL 2 if you are on Windows**.
## Requirements:
See https://docs.open-metadata.org/overview/run-openmetadata-with-prefect "Requirements" section
## Installation:
1. Clone the OpenMetadata GitHub repo:
`git clone https://github.com/open-metadata/OpenMetadata.git`
2. Cd to ~/.../openmetadata/docker/metadata
3. Start the OpenMetadata containers. This will allow you to run OpenMetadata in Docker:
`docker compose up -d`
- To check the status of services, run `docker compose ps`
- To access the UI: http://localhost:8585
4. Install the OpenMetadata ingestion package.
- (optional but highly recommended): Before installing this package, it is recommended to create and activate a virtual environment. To do this, run:
`python -m venv env` and `source env/bin/activate`
- To install the OpenMetadata ingestion package:
`pip install --upgrade "openmetadata-ingestion[docker]==0.10.3"` (specify the release version to ensure compatibility)
5. Install Airflow:
- 5A: Install Airflow Lineage Backend: `pip3 install "openmetadata-ingestion[airflow-container]"==0.10.3`
- 5B: Install Airflow postgres connector module: `pip3 install "openmetadata-ingestion[postgres]"==0.10.3`
- 5C: Install Airflow APIs: `pip3 install "openmetadata-airflow-managed-apis"==0.10.3`
- 5D: Install necessary Airflow plugins:
- 1) Download the latest openmetadata-airflow-apis-plugins release from https://github.com/open-metadata/OpenMetadata/releases
- 2) Untar it under your {AIRFLOW_HOME} directory (usually c/Users/Yourname/airflow). This will create and setup a plugins directory under {AIRFLOW_HOME} .
- 3) `cp -r {AIRFLOW_HOME}/plugins/dag_templates {AIRFLOW_HOME}`
- 4) `mkdir -p {AIRFLOW_HOME}/dag_generated_configs`
- 5) (re)start the airflow webserver and scheduler
6. Configure Airflow:
- 6A: configure airflow.cfg in your AIRFLOW_HOME directory. Check and make all the folder directories point to the right places. For instance, dags_folder = YOUR_AIRFLOW_HOME/dags
- 6B: configure openmetadata.yaml and update the airflowConfiguration section. See: https://docs.open-metadata.org/integrations/airflow/configure-airflow-in-the-openmetadata-server
## To run a metadata ingestion workflow with Airflow ingestion DAGs on Postgres data:
1. Prepare the Ingestion DAG:
To see a more complete tutorial on ingestion DAG, see https://docs.open-metadata.org/integrations/connectors/postgres/run-postgres-connector-with-the-airflow-sdk
To be brief, below is my own DAG. Copy & Paste the following into a python file (postgres_demo.py):
```
import json
from datetime import timedelta

from airflow import DAG

try:
    # Airflow 2.x
    from airflow.operators.python import PythonOperator
except ModuleNotFoundError:
    # Airflow 1.10.x fallback
    from airflow.operators.python_operator import PythonOperator

from airflow.utils.dates import days_ago

from metadata.ingestion.api.workflow import Workflow

default_args = {
    "owner": "user_name",
    "email": ["username@org.com"],
    "email_on_failure": False,
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
    "execution_timeout": timedelta(minutes=60),
}

# NOTE: replace username, password, hostPort and database below with your own
# connection details. Each flag under sourceConfig.config can be set to
# "true" or "false". This string must remain valid JSON so that the
# json.loads() call in metadata_ingestion_workflow() can parse it.
config = """
{
  "source": {
    "type": "postgres",
    "serviceName": "postgres_demo",
    "serviceConnection": {
      "config": {
        "type": "Postgres",
        "username": "postgres",
        "password": "postgres",
        "hostPort": "192.168.1.55:5432",
        "database": "surveillance_hub"
      }
    },
    "sourceConfig": {
      "config": {
        "enableDataProfiler": "true",
        "markDeletedTables": "true",
        "includeTables": "true",
        "includeViews": "true",
        "generateSampleData": "true"
      }
    }
  },
  "sink": {
    "type": "metadata-rest",
    "config": {}
  },
  "workflowConfig": {
    "openMetadataServerConfig": {
      "hostPort": "http://localhost:8585/api",
      "authProvider": "no-auth"
    }
  }
}
"""


def metadata_ingestion_workflow():
    """Parse the JSON config above and run the OpenMetadata ingestion workflow."""
    workflow_config = json.loads(config)
    workflow = Workflow.create(workflow_config)
    workflow.execute()
    workflow.raise_from_status()
    workflow.print_status()
    workflow.stop()


with DAG(
    "sample_data",
    default_args=default_args,
    description="An example DAG which runs an OpenMetadata ingestion workflow",
    start_date=days_ago(1),
    is_paused_upon_creation=False,
    schedule_interval="*/5 * * * *",
    catchup=False,
) as dag:
    ingest_task = PythonOperator(
        task_id="ingest_using_recipe",
        python_callable=metadata_ingestion_workflow,
    )

if __name__ == "__main__":
    metadata_ingestion_workflow()
```
2. Run the DAG:
`
python postgres_demo.py
`
- Alternatively, we could run it without the Airflow SDK, using OpenMetadata's own methods. Run `metadata ingest -c /Your_Path_To_Json/.json`
The json configuration is exactly the same as the json configuration in the DAG.
- Or, we could also run it with `metadata ingest -c /Your_Path_To_Yaml/.yaml`
  The YAML configuration would be exactly the same, except without the curly brackets and double quotes.
## To run a profiler workflow on Postgres data
1. Prepare the DAG OR configure the yaml/json:
- To configure the quality tests in json/yaml, see https://docs.open-metadata.org/data-quality/data-quality-overview/tests
- To prepare the DAG, see https://github.com/open-metadata/OpenMetadata/tree/0.10.3-release/data-quality/data-quality-overview
Example yaml I was using:
```
source:
type: postgres
serviceName: your_service_name
serviceConnection:
config:
type: Postgres
username: your_username
password: your_password
hostPort:
database: your_database
sourceConfig:
config:
type: Profiler
processor:
type: orm-profiler
config:
test_suite:
name: demo_test
tests:
- table: your_table_name (FQN)
column_tests:
- columnName: id
testCase:
columnTestType: columnValuesToBeBetween
config:
minValue: 0
maxValue: 10
sink:
type: metadata-rest
config: {}
workflowConfig:
openMetadataServerConfig:
hostPort: http://localhost:8585/api
authProvider: no-auth
```
Note that the table name must be FQN and match exactly with the table path on the OpenMetadata UI.
2. Run it with
`metadata profile -c /path_to_yaml/.yaml`
Make sure to refresh the OpenMetadata UI and click on the Data Quality tab to see the results.

View File

@ -19,38 +19,18 @@ You can learn more about how the migration process works [here](/deployment/upgr
{% /note %}
- To run the backup and restore commands, please make sure that you are always in the latest `openmetadata-ingestion` version to have all the improvements shipped in the CLI.
- Also, make sure you have connectivity between your database (MySQL / PostgreSQL) and the host machine where you will be running the below commands.
Since version 1.4.0, **OpenMetadata encourages using the built-in tools for creating logical backups of the metadata**:
**1. Create a Virtual Environment and Install the Backup CLI**
- [mysqldump](https://dev.mysql.com/doc/refman/8.0/en/mysqldump.html) for MySQL
- [pg_dump](https://www.postgresql.org/docs/current/app-pgdump.html) for Postgres
```bash
python -m venv venv
source venv/bin/activate
pip install openmetadata-ingestion~=1.3.0
```
For PROD deployment we recommend users to rely on cloud services for their databases, be it [AWS RDS](https://docs.aws.amazon.com/rds/),
[Azure SQL](https://azure.microsoft.com/en-in/products/azure-sql/database) or [GCP Cloud SQL](https://cloud.google.com/sql/).
Validate the installed metadata version with `python -m metadata --version`
**2. Run the Backup**
If using MySQL:
```bash
python -m metadata backup -u openmetadata_user -p openmetadata_password -H mysql -d openmetadata_db --port 3306
```
If using Postgres:
```bash
python -m metadata backup -u openmetadata_user -p openmetadata_password -H postgresql -d openmetadata_db --port 5432 -s public
```
**3. Store the backup file somewhere safe**
The above command will generate a backup file with extension as `.sql`. You can copy the name from the backup command output.
Make sure to store it somewhere safe in case you need to restore the data later.
If you're a user of these services, you can leverage their backup capabilities directly:
- [Creating a DB snapshot in AWS](https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/USER_CreateSnapshot.html)
- [Backup and restore in Azure MySQL](https://learn.microsoft.com/en-us/azure/mysql/single-server/concepts-backup)
- [About GCP Cloud SQL backup](https://cloud.google.com/sql/docs/mysql/backup-recovery/backups)
You can refer to the following guide to get more details about the backup and restore:
@ -104,9 +84,6 @@ After the migration is finished, you can revert this changes.
# Deprecation Notice
- Check the updated [docs](/connectors/pipeline/airflow/configuring-lineage#configuring-dag-lineage) on how to configure Airflow DAG's lineage.
We will deprecate the dictionary annotation in the 1.4 release, since the new annotation allows you to define lineage between
assets other than Tables.
# Breaking Changes

View File

@ -25,6 +25,8 @@ the relationship.
This lineage configuration method is available for OpenMetadata release 1.2.3 or higher.
We support lineage for the following entities: `Table`, `Container`, `Dashboard`, `DashboardDataModel`, `Pipeline`, `Topic`, `SearchIndex` and `MlModel`.
{% /note %}
Let's take a look at the following example:

View File

@ -25,6 +25,8 @@ the relationship.
This lineage configuration method is available for OpenMetadata release 1.2.3 or higher.
We support lineage for the following entities: `Table`, `Container`, `Dashboard`, `DashboardDataModel`, `Pipeline`, `Topic`, `SearchIndex` and `MlModel`.
{% /note %}
Let's take a look at the following example:

View File

@ -7,17 +7,51 @@ slug: /deployment/backup-restore-metadata
## Introduction
Since version 1.4.0, OpenMetadata encourages using the built-in tools for creating logical backups of the metadata:
Before upgrading your OpenMetadata version we strongly recommend backing up the metadata.
- `mysqldump` for MySQL
- `pg_dump` for Postgres
The source of truth is stored in the underlying database (MySQL and Postgres supported). During each version upgrade there
is a database migration process that needs to run. It will run directly against your database and update the shape of the
data to the newest OpenMetadata release.
It is important to back up the data because, if you face any unexpected issues during the upgrade process,
you will be able to roll back to the previous version without any loss.
{% note %}
You can learn more about how the migration process works [here](/deployment/upgrade/how-does-it-work).
{% /note %}
Since version 1.4.0, **OpenMetadata encourages using the built-in tools for creating logical backups of the metadata**:
- [mysqldump](https://dev.mysql.com/doc/refman/8.0/en/mysqldump.html) for MySQL
- [pg_dump](https://www.postgresql.org/docs/current/app-pgdump.html) for Postgres
For PROD deployment we recommend users to rely on cloud services for their databases, be it [AWS RDS](https://docs.aws.amazon.com/rds/),
[Azure SQL](https://azure.microsoft.com/en-in/products/azure-sql/database) or [GCP Cloud SQL](https://cloud.google.com/sql/).
If you're a user of these services, you can leverage their backup capabilities directly:
- [Creating a DB snapshot in AWS](https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/USER_CreateSnapshot.html)
- [Backup and restore in Azure MySQL](https://learn.microsoft.com/en-us/azure/mysql/single-server/concepts-backup)
- [About GCP Cloud SQL backup](https://cloud.google.com/sql/docs/mysql/backup-recovery/backups)
## Requirements
- mysqldump 8.3 or higher (ingestion container is shipped with mysqldump 8.3)
- pg_dump 13.3 or higher
- `mysqldump` 8.3 or higher
- `pg_dump` 13.3 or higher
# Example
If you're running the project using docker compose, the `ingestion` container already comes packaged with the
correct `mysqldump` and `pg_dump` versions ready to use.
## Storing the backup files
It's important that when you back up your database, you keep the snapshot safe in case you need it later.
You can check these two examples on how to:
- Use pipes to stream the result directly to S3 (or AWS blob storage) ([link](https://devcoops.com/pg_dump-to-s3-directly/?utm_content=cmp-true)).
- Dump to a file and copy to storage ([link](https://gist.github.com/bbcoimbra/0914c7e0f96e8ad53dfad79c64863c87)).
# Example with Docker
Start a local instance of OpenMetadata using the `docker-compose` file provided in the repository. Then, we can use the following commands to backup the metadata: