diff --git a/ingestion/src/metadata/cli/db_dump.py b/ingestion/src/metadata/cli/db_dump.py
index 0339e7ade43..2889019667b 100644
--- a/ingestion/src/metadata/cli/db_dump.py
+++ b/ingestion/src/metadata/cli/db_dump.py
@@ -37,11 +37,14 @@ CUSTOM_TABLES = {"entity_extension_time_series": {"exclude_columns": ["timestamp
 NOT_MIGRATE = {"DATABASE_CHANGE_LOG", "SERVER_MIGRATION_SQL_LOGS", "SERVER_CHANGE_LOG"}
 
 STATEMENT_JSON = "SELECT json FROM {table}"
+STATEMENT_HASH_JSON = "SELECT json, {hash_column_name} FROM {table}"
 STATEMENT_ALL = "SELECT * FROM {table}"
 STATEMENT_TRUNCATE = "TRUNCATE TABLE {table};\n"
 STATEMENT_ALL_NEW = "SELECT {cols} FROM {table}"
 
 MYSQL_ENGINE_NAME = "mysql"
+FQN_HASH_COLUMN = "fqnHash"
+NAME_HASH_COLUMN = "nameHash"
 
 
 def single_quote_wrap(raw: str) -> str:
@@ -104,6 +107,20 @@ def _(column_raw: Optional[Union[dict, list]], engine: Engine) -> str:
     )
 
 
+def get_hash_column_name(engine: Engine, table_name: str) -> Optional[str]:
+    """
+    Method to get name of the hash column (fqnHash or nameHash)
+    """
+    inspector = inspect(engine)
+    columns = inspector.get_columns(table_name)
+    for column in columns:
+        if column["name"] == FQN_HASH_COLUMN:
+            return FQN_HASH_COLUMN
+        if column["name"] == NAME_HASH_COLUMN:
+            return NAME_HASH_COLUMN
+    return None
+
+
 def dump_json(tables: List[str], engine: Engine, output: Path) -> None:
     """
     Dumps JSON data.
@@ -116,10 +133,23 @@ def dump_json(tables: List[str], engine: Engine, output: Path) -> None:
             truncate = STATEMENT_TRUNCATE.format(table=table)
             file.write(truncate)
 
-            res = engine.execute(text(STATEMENT_JSON.format(table=table))).all()
-            for row in res:
-                insert = f"INSERT INTO {table} (json) VALUES ({clean_col(row.json, engine)});\n"
-                file.write(insert)
+            hash_column_name = get_hash_column_name(engine=engine, table_name=table)
+            if hash_column_name:
+                res = engine.execute(
+                    text(
+                        STATEMENT_HASH_JSON.format(
+                            table=table, hash_column_name=hash_column_name
+                        )
+                    )
+                ).all()
+                for row in res:
+                    insert = f"INSERT INTO {table} (json, {hash_column_name}) VALUES ({clean_col(row.json, engine)}, {clean_col(row[1], engine)});\n"  # pylint: disable=line-too-long
+                    file.write(insert)
+            else:
+                res = engine.execute(text(STATEMENT_JSON.format(table=table))).all()
+                for row in res:
+                    insert = f"INSERT INTO {table} (json) VALUES ({clean_col(row.json, engine)});\n"
+                    file.write(insert)
 
 
 def dump_all(tables: List[str], engine: Engine, output: Path) -> None:
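
For anyone who wants to exercise the detection logic in isolation, here is a minimal, self-contained sketch of the same inspector-based check, run against a throwaway in-memory SQLite engine. The toy table names ("with_hash", "without_hash") are illustrative assumptions for the demo, not part of the OpenMetadata schema.

# Standalone sketch of the hash-column detection used by get_hash_column_name.
# Assumes SQLAlchemy 1.4+; the tables created below are hypothetical, demo-only.
from typing import Optional

from sqlalchemy import create_engine, inspect, text
from sqlalchemy.engine import Engine

FQN_HASH_COLUMN = "fqnHash"
NAME_HASH_COLUMN = "nameHash"

def get_hash_column_name(engine: Engine, table_name: str) -> Optional[str]:
    # inspector.get_columns() returns one dict per column, keyed by "name";
    # return the first fqnHash/nameHash column found, or None if neither exists
    for column in inspect(engine).get_columns(table_name):
        if column["name"] == FQN_HASH_COLUMN:
            return FQN_HASH_COLUMN
        if column["name"] == NAME_HASH_COLUMN:
            return NAME_HASH_COLUMN
    return None

engine = create_engine("sqlite://")
with engine.begin() as conn:  # begin() commits the DDL on exit
    conn.execute(text("CREATE TABLE with_hash (json TEXT, fqnHash VARCHAR(256))"))
    conn.execute(text("CREATE TABLE without_hash (json TEXT)"))

print(get_hash_column_name(engine, "with_hash"))     # -> fqnHash
print(get_hash_column_name(engine, "without_hash"))  # -> None

The net effect on the dump path: tables that carry a fqnHash or nameHash column get the two-column INSERT built from STATEMENT_HASH_JSON, while everything else falls back to the original json-only INSERT, so tables without a hash column keep dumping exactly as before.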