From 33c752d2f5a73d02e758aa87a2e24ae4fe5d8942 Mon Sep 17 00:00:00 2001 From: Sriharsha Chintalapani Date: Mon, 26 Jun 2023 00:33:44 -0700 Subject: [PATCH] interfaces for db migrations (#12057) * interfaces for db migrations * Add Data Migration for entities to 1.1 * Migration for FQN hash * Added Tag Usage And Fixed some transaction issue in testing * Added SERVER_CHANGE_LOG validations * Update Path for files * Added Postgres Migration * Test Suite Migration [WIP] * remove ingestion pipeline * moved Migration to Table Initializer * init Data source config in TableInitializer * make checksum ignore configurable --------- Co-authored-by: mohitdeuex --- .../v013__create_db_connection_info.sql | 104 +---- .../v013__create_db_connection_info.sql | 101 +---- .../service/OpenMetadataApplication.java | 6 +- .../service/jdbi3/CollectionDAO.java | 183 ++++++++- .../service/jdbi3/MigrationDAO.java | 73 ++++ .../service/migration/MigrationFile.java | 12 + .../service/migration/MigrationUtil.java | 360 ++++++++++++++++++ .../service/migration/api/MigrationStep.java | 30 ++ .../migration/api/MigrationWorkflow.java | 156 ++++++++ .../versions/mysql/v110/MySQLMigration.java | 178 +++++++++ .../postgres/v110/PostgresMigration.java | 181 +++++++++ .../resources/databases/DatasourceConfig.java | 10 + .../service/util/TablesInitializer.java | 51 +++ 13 files changed, 1266 insertions(+), 179 deletions(-) create mode 100644 openmetadata-service/src/main/java/org/openmetadata/service/migration/MigrationFile.java create mode 100644 openmetadata-service/src/main/java/org/openmetadata/service/migration/MigrationUtil.java create mode 100644 openmetadata-service/src/main/java/org/openmetadata/service/migration/api/MigrationStep.java create mode 100644 openmetadata-service/src/main/java/org/openmetadata/service/migration/api/MigrationWorkflow.java create mode 100644 openmetadata-service/src/main/java/org/openmetadata/service/migration/versions/mysql/v110/MySQLMigration.java create mode 100644 openmetadata-service/src/main/java/org/openmetadata/service/migration/versions/postgres/v110/PostgresMigration.java diff --git a/bootstrap/sql/com.mysql.cj.jdbc.Driver/v013__create_db_connection_info.sql b/bootstrap/sql/com.mysql.cj.jdbc.Driver/v013__create_db_connection_info.sql index ea4de05d029..a0142f68fb1 100644 --- a/bootstrap/sql/com.mysql.cj.jdbc.Driver/v013__create_db_connection_info.sql +++ b/bootstrap/sql/com.mysql.cj.jdbc.Driver/v013__create_db_connection_info.sql @@ -125,95 +125,6 @@ where serviceType in ('Postgres', 'Mysql'); -- Clean old test connections TRUNCATE automations_workflow; - --- add fullyQualifiedName hash and remove existing columns - --- update the OM system tables - -ALTER TABLE field_relationship DROP KEY `PRIMARY`, ADD COLUMN fromFQNHash VARCHAR(256), ADD COLUMN toFQNHash VARCHAR(256), -DROP INDEX from_index, DROP INDEX to_index, ADD INDEX from_fqnhash_index(fromFQNHash, relation), ADD INDEX to_fqnhash_index(toFQNHash, relation), - ADD CONSTRAINT `field_relationship_primary` PRIMARY KEY(fromFQNHash, toFQNHash, relation), MODIFY fromFQN VARCHAR(2096) NOT NULL, - MODIFY toFQN VARCHAR(2096) NOT NULL; - -ALTER TABLE entity_extension_time_series DROP COLUMN entityFQN, ADD COLUMN entityFQNHash VARCHAR (256) NOT NULL; - -ALTER TABLE type_entity DROP KEY `name`, ADD COLUMN nameHash VARCHAR(256) NOT NULL, ADD UNIQUE (nameHash); - -ALTER TABLE event_subscription_entity DROP KEY `name`, ADD COLUMN nameHash VARCHAR(256) NOT NULL, ADD UNIQUE (nameHash); - -ALTER TABLE test_definition DROP KEY `name`, ADD 
COLUMN nameHash VARCHAR(256) NOT NULL, ADD UNIQUE (nameHash); -ALTER TABLE test_suite DROP KEY `name`, ADD COLUMN nameHash VARCHAR(256) NOT NULL, ADD UNIQUE (nameHash); -ALTER TABLE test_case DROP COLUMN fullyQualifiedName, ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> '$.name') NOT NULL, - ADD COLUMN fqnHash VARCHAR(256) NOT NULL, ADD UNIQUE (fqnHash); - -ALTER TABLE web_analytic_event DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256) NOT NULL, ADD UNIQUE (fqnHash); -ALTER TABLE data_insight_chart DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256) NOT NULL, ADD UNIQUE (fqnHash); -ALTER TABLE kpi_entity DROP KEY `name`, ADD COLUMN nameHash VARCHAR(256) NOT NULL, ADD UNIQUE (nameHash); - -ALTER TABLE classification DROP KEY `name`, ADD COLUMN nameHash VARCHAR(256) NOT NULL, ADD UNIQUE (nameHash);; - -ALTER TABLE glossary_term_entity DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256) NOT NULL, ADD UNIQUE (fqnHash), - ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> '$.name') NOT NULL; - -ALTER TABLE tag DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256) NOT NULL, ADD UNIQUE (fqnHash), - ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> '$.name') NOT NULL; - -ALTER TABLE tag_usage DROP index `source`, DROP COLUMN targetFQN, ADD COLUMN tagFQNHash VARCHAR(256), ADD COLUMN targetFQNHash VARCHAR(256), - ADD UNIQUE KEY `tag_usage_key` (source, tagFQNHash, targetFQNHash); - -ALTER TABLE policy_entity DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256) NOT NULL, ADD UNIQUE (fqnHash), - ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> '$.name') NOT NULL; - -ALTER TABLE role_entity DROP KEY `name`, ADD COLUMN nameHash VARCHAR(256) NOT NULL, ADD UNIQUE (nameHash); -ALTER TABLE automations_workflow DROP KEY `name`, ADD COLUMN nameHash VARCHAR(256) NOT NULL, ADD UNIQUE (nameHash); -ALTER TABLE test_connection_definition ADD COLUMN nameHash VARCHAR(256) NOT NULL, ADD UNIQUE (nameHash), - ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> '$.name') NOT NULL; - - --- update services -ALTER TABLE dbservice_entity DROP KEY `name`, ADD COLUMN nameHash VARCHAR(256) NOT NULL, ADD UNIQUE (nameHash); -ALTER TABLE messaging_service_entity DROP KEY `name`, ADD COLUMN nameHash VARCHAR(256) NOT NULL, ADD UNIQUE (nameHash); -ALTER TABLE dashboard_service_entity DROP KEY `name`, ADD COLUMN nameHash VARCHAR(256) NOT NULL, ADD UNIQUE (nameHash); -ALTER TABLE pipeline_service_entity DROP KEY `name`, ADD COLUMN nameHash VARCHAR(256) NOT NULL, ADD UNIQUE (nameHash); -ALTER TABLE storage_service_entity DROP KEY `name`, ADD COLUMN nameHash VARCHAR(256) NOT NULL, ADD UNIQUE (nameHash); -ALTER TABLE metadata_service_entity DROP KEY `name`, ADD COLUMN nameHash VARCHAR(256) NOT NULL, ADD UNIQUE (nameHash); -ALTER TABLE mlmodel_service_entity DROP KEY `name`, ADD COLUMN nameHash VARCHAR(256) NOT NULL, ADD UNIQUE (nameHash); - - --- all entity tables -ALTER TABLE database_entity DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256) NOT NULL, ADD UNIQUE (fqnHash), - ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> '$.name') NOT NULL; -ALTER TABLE database_schema_entity DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256) NOT NULL, ADD UNIQUE (fqnHash), - ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> '$.name') NOT NULL; -ALTER TABLE table_entity DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256) NOT NULL, ADD UNIQUE (fqnHash), - ADD COLUMN name VARCHAR(256) 
GENERATED ALWAYS AS (json ->> '$.name') NOT NULL; -ALTER TABLE metric_entity DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256) NOT NULL, ADD UNIQUE (fqnHash), - ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> '$.name') NOT NULL; -ALTER TABLE report_entity DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256) NOT NULL, ADD UNIQUE (fqnHash), - ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> '$.name') NOT NULL; -ALTER TABLE dashboard_entity DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256) NOT NULL, ADD UNIQUE (fqnHash), - ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> '$.name') NOT NULL; -ALTER TABLE chart_entity DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256) NOT NULL, ADD UNIQUE (fqnHash), - ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> '$.name') NOT NULL; -ALTER TABLE ml_model_entity DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256) NOT NULL, ADD UNIQUE (fqnHash), - ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> '$.name') NOT NULL; -ALTER TABLE pipeline_entity DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256) NOT NULL, ADD UNIQUE (fqnHash), - ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> '$.name') NOT NULL; -ALTER TABLE topic_entity DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256) NOT NULL, ADD UNIQUE (fqnHash), - ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> '$.name') NOT NULL; -ALTER TABLE ingestion_pipeline_entity DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256) NOT NULL, ADD UNIQUE (fqnHash), - ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> '$.name') NOT NULL; -ALTER TABLE storage_container_entity DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256) NOT NULL, ADD UNIQUE (fqnHash), - ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> '$.name') NOT NULL; -ALTER TABLE dashboard_data_model_entity DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256) NOT NULL, ADD UNIQUE (fqnHash), - ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> '$.name') NOT NULL; - -ALTER TABLE query_entity DROP KEY `name`, ADD COLUMN nameHash VARCHAR(256) NOT NULL, ADD UNIQUE (nameHash); -ALTER TABLE team_entity DROP KEY `name`, ADD COLUMN nameHash VARCHAR(256) NOT NULL, ADD UNIQUE (nameHash); -ALTER TABLE user_entity DROP KEY `name`, ADD COLUMN nameHash VARCHAR(256) NOT NULL, ADD UNIQUE (nameHash); -ALTER TABLE bot_entity DROP KEY `name`, ADD COLUMN nameHash VARCHAR(256) NOT NULL, ADD UNIQUE (nameHash); -ALTER TABLE glossary_entity DROP KEY `name`, ADD COLUMN nameHash VARCHAR(256) NOT NULL, ADD UNIQUE (nameHash); - -- Remove sourceUrl in pipeline_entity from DatabricksPipeline & Fivetran UPDATE pipeline_entity SET json = JSON_REMOVE(json, '$.sourceUrl') @@ -223,3 +134,18 @@ WHERE JSON_EXTRACT(json, '$.serviceType') in ('DatabricksPipeline','Fivetran'); UPDATE dashboard_entity SET json = JSON_REMOVE(json, '$.sourceUrl') WHERE JSON_EXTRACT(json, '$.serviceType') in ('Mode'); + +CREATE TABLE IF NOT EXISTS SERVER_CHANGE_LOG ( + installed_rank SERIAL, + version VARCHAR(256) PRIMARY KEY, + migrationFileName VARCHAR(256) NOT NULL, + checksum VARCHAR(256) NOT NULL, + installed_on TIMESTAMP DEFAULT CURRENT_TIMESTAMP +); + +CREATE TABLE IF NOT EXISTS SERVER_MIGRATION_SQL_LOGS ( + version VARCHAR(256) NOT NULL, + sqlStatement VARCHAR(10000) NOT NULL, + checksum VARCHAR(256) PRIMARY KEY, + executedAt TIMESTAMP DEFAULT CURRENT_TIMESTAMP +); \ No newline at end of file diff --git 
a/bootstrap/sql/org.postgresql.Driver/v013__create_db_connection_info.sql b/bootstrap/sql/org.postgresql.Driver/v013__create_db_connection_info.sql index 10051369976..7731ef81eb0 100644 --- a/bootstrap/sql/org.postgresql.Driver/v013__create_db_connection_info.sql +++ b/bootstrap/sql/org.postgresql.Driver/v013__create_db_connection_info.sql @@ -87,92 +87,6 @@ WHERE serviceType IN ('Postgres', 'Mysql') -- Clean old test connections TRUNCATE automations_workflow; -DROP INDEX field_relationship_from_index, field_relationship_to_index; -ALTER TABLE field_relationship DROP CONSTRAINT field_relationship_pkey, ADD COLUMN fromFQNHash VARCHAR(256), ADD COLUMN toFQNHash VARCHAR(256), - ADD CONSTRAINT field_relationship_pkey PRIMARY KEY(fromFQNHash, toFQNHash, relation), -ALTER fromFQN TYPE VARCHAR(2096), ALTER toFQN TYPE VARCHAR(2096); -CREATE INDEX IF NOT EXISTS field_relationship_from_index ON field_relationship(fromFQNHash, relation); -CREATE INDEX IF NOT EXISTS field_relationship_to_index ON field_relationship(toFQNHash, relation); - -ALTER TABLE entity_extension_time_series DROP COLUMN entityFQN, ADD COLUMN entityFQNHash VARCHAR (256) NOT NULL; - -ALTER TABLE type_entity DROP CONSTRAINT type_entity_name_key, ADD COLUMN nameHash VARCHAR(256) NOT NULL, ADD UNIQUE (nameHash); - -ALTER TABLE event_subscription_entity DROP CONSTRAINT event_subscription_entity_name_key, ADD COLUMN nameHash VARCHAR(256) NOT NULL, ADD UNIQUE (nameHash); - -ALTER TABLE test_definition DROP CONSTRAINT test_definition_name_key, ADD COLUMN nameHash VARCHAR(256) NOT NULL, ADD UNIQUE (nameHash); -ALTER TABLE test_suite DROP CONSTRAINT test_suite_name_key, ADD COLUMN nameHash VARCHAR(256) NOT NULL, ADD UNIQUE (nameHash); -ALTER TABLE test_case DROP COLUMN fullyQualifiedName, ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> 'name') STORED NOT NULL, - ADD COLUMN fqnHash VARCHAR(256) NOT NULL, ADD UNIQUE (fqnHash); - -ALTER TABLE web_analytic_event DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256) NOT NULL, ADD UNIQUE (fqnHash); -ALTER TABLE data_insight_chart DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256) NOT NULL, ADD UNIQUE (fqnHash); -ALTER TABLE kpi_entity DROP CONSTRAINT kpi_entity_name_key, ADD COLUMN nameHash VARCHAR(256) NOT NULL, ADD UNIQUE (nameHash); - -ALTER TABLE classification DROP CONSTRAINT tag_category_name_key, ADD COLUMN nameHash VARCHAR(256) NOT NULL, ADD UNIQUE (nameHash);; - -ALTER TABLE glossary_term_entity DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256) NOT NULL, ADD UNIQUE (fqnHash), - ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> 'name') STORED NOT NULL; - -ALTER TABLE tag DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256) NOT NULL, ADD UNIQUE (fqnHash), - ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> 'name') STORED NOT NULL; - -ALTER TABLE tag_usage DROP CONSTRAINT tag_usage_source_tagfqn_targetfqn_key, DROP COLUMN targetFQN, ADD COLUMN tagFQNHash VARCHAR(256), ADD COLUMN targetFQNHash VARCHAR(256), - ADD UNIQUE (source, tagFQNHash, targetFQNHash); - -ALTER TABLE policy_entity DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256) NOT NULL, ADD UNIQUE (fqnHash), - ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> 'name') STORED NOT NULL; - -ALTER TABLE role_entity DROP CONSTRAINT role_entity_name_key, ADD COLUMN nameHash VARCHAR(256) NOT NULL, ADD UNIQUE (nameHash); -ALTER TABLE automations_workflow DROP CONSTRAINT automations_workflow_name_key, ADD COLUMN nameHash VARCHAR(256) NOT NULL, 
ADD UNIQUE (nameHash); -ALTER TABLE test_connection_definition ADD COLUMN nameHash VARCHAR(256) NOT NULL, ADD UNIQUE (nameHash), - ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> 'name') STORED NOT NULL; - - --- update services -ALTER TABLE dbservice_entity DROP CONSTRAINT dbservice_entity_name_key, ADD COLUMN nameHash VARCHAR(256) NOT NULL, ADD UNIQUE (nameHash); -ALTER TABLE messaging_service_entity DROP CONSTRAINT messaging_service_entity_name_key, ADD COLUMN nameHash VARCHAR(256) NOT NULL, ADD UNIQUE (nameHash); -ALTER TABLE dashboard_service_entity DROP CONSTRAINT dashboard_service_entity_name_key, ADD COLUMN nameHash VARCHAR(256) NOT NULL, ADD UNIQUE (nameHash); -ALTER TABLE pipeline_service_entity DROP CONSTRAINT pipeline_service_entity_name_key, ADD COLUMN nameHash VARCHAR(256) NOT NULL, ADD UNIQUE (nameHash); -ALTER TABLE storage_service_entity DROP CONSTRAINT storage_service_entity_name_key, ADD COLUMN nameHash VARCHAR(256) NOT NULL, ADD UNIQUE (nameHash); -ALTER TABLE metadata_service_entity DROP CONSTRAINT metadata_service_entity_name_key, ADD COLUMN nameHash VARCHAR(256) NOT NULL, ADD UNIQUE (nameHash); -ALTER TABLE mlmodel_service_entity DROP CONSTRAINT mlmodel_service_entity_name_key, ADD COLUMN nameHash VARCHAR(256) NOT NULL, ADD UNIQUE (nameHash); - - --- all entity tables -ALTER TABLE database_entity DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256) NOT NULL, ADD UNIQUE (fqnHash), - ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> 'name') STORED NOT NULL; -ALTER TABLE database_schema_entity DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256) NOT NULL, ADD UNIQUE (fqnHash), - ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> 'name') STORED NOT NULL; -ALTER TABLE table_entity DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256) NOT NULL, ADD UNIQUE (fqnHash), - ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> 'name') STORED NOT NULL; -ALTER TABLE metric_entity DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256) NOT NULL, ADD UNIQUE (fqnHash), - ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> 'name') STORED NOT NULL; -ALTER TABLE report_entity DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256) NOT NULL, ADD UNIQUE (fqnHash), - ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> 'name') STORED NOT NULL; -ALTER TABLE dashboard_entity DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256) NOT NULL, ADD UNIQUE (fqnHash), - ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> 'name') STORED NOT NULL; -ALTER TABLE chart_entity DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256) NOT NULL, ADD UNIQUE (fqnHash), - ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> 'name') STORED NOT NULL; -ALTER TABLE ml_model_entity DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256) NOT NULL, ADD UNIQUE (fqnHash), - ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> 'name') STORED NOT NULL; -ALTER TABLE pipeline_entity DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256) NOT NULL, ADD UNIQUE (fqnHash), - ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> 'name') STORED NOT NULL; -ALTER TABLE topic_entity DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256) NOT NULL, ADD UNIQUE (fqnHash), - ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> 'name') STORED NOT NULL; -ALTER TABLE ingestion_pipeline_entity DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256) NOT NULL, ADD UNIQUE 
(fqnHash), - ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> 'name') STORED NOT NULL; -ALTER TABLE storage_container_entity DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256) NOT NULL, ADD UNIQUE (fqnHash), - ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> 'name') STORED NOT NULL; -ALTER TABLE dashboard_data_model_entity DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256) NOT NULL, ADD UNIQUE (fqnHash), - ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> 'name') STORED NOT NULL; - -ALTER TABLE query_entity ADD COLUMN nameHash VARCHAR(256) NOT NULL, ADD UNIQUE (nameHash); -ALTER TABLE team_entity DROP CONSTRAINT team_entity_name_key, ADD COLUMN nameHash VARCHAR(256) NOT NULL, ADD UNIQUE (nameHash); -ALTER TABLE user_entity DROP CONSTRAINT user_entity_name_key, ADD COLUMN nameHash VARCHAR(256) NOT NULL, ADD UNIQUE (nameHash); -ALTER TABLE bot_entity DROP CONSTRAINT bot_entity_name_key, ADD COLUMN nameHash VARCHAR(256) NOT NULL, ADD UNIQUE (nameHash); -ALTER TABLE glossary_entity DROP CONSTRAINT glossary_entity_name_key, ADD COLUMN nameHash VARCHAR(256) NOT NULL, ADD UNIQUE (nameHash); - -- Remove sourceUrl in pipeline_entity from DatabricksPipeline & Fivetran UPDATE pipeline_entity SET json = json::jsonb #- '{sourceUrl}' @@ -183,3 +97,18 @@ where json #> '{serviceType}' in ('"DatabricksPipeline"','"Fivetran"'); UPDATE dashboard_entity SET json = json::jsonb #- '{sourceUrl}' where json #> '{serviceType}' in ('"Mode"'); + +CREATE TABLE IF NOT EXISTS SERVER_CHANGE_LOG ( + installed_rank SERIAL, + version VARCHAR(256) PRIMARY KEY, + migrationFileName VARCHAR(256) NOT NULL, + checksum VARCHAR(256) NOT NULL, + installed_on TIMESTAMP DEFAULT CURRENT_TIMESTAMP +); + +CREATE TABLE IF NOT EXISTS SERVER_MIGRATION_SQL_LOGS ( + version VARCHAR(256) NOT NULL, + sqlStatement VARCHAR(10000) NOT NULL, + checksum VARCHAR(256) PRIMARY KEY, + executedAt TIMESTAMP DEFAULT CURRENT_TIMESTAMP +); \ No newline at end of file diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/OpenMetadataApplication.java b/openmetadata-service/src/main/java/org/openmetadata/service/OpenMetadataApplication.java index f6cdb08bf91..4137359df07 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/OpenMetadataApplication.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/OpenMetadataApplication.java @@ -130,6 +130,9 @@ public class OpenMetadataApplication extends Application listAll(); + @SqlQuery( "SELECT fromFQN, toFQN, json FROM field_relationship WHERE " + "fromFQNHash = :fqnHash AND fromType = :type AND toType = :otherType AND relation = :relation " @@ -1272,6 +1302,37 @@ public interface CollectionDAO { return Triple.of(rs.getString("fromFQN"), rs.getString("toFQN"), rs.getString("json")); } } + + class FieldRelationShipMapper implements RowMapper { + @Override + public FieldRelationship map(ResultSet rs, StatementContext ctx) throws SQLException { + FieldRelationship result = new FieldRelationship(); + result.setFromFQNHash(rs.getString("fromFQNHash")); + result.setToFQNHash(rs.getString("toFQNHash")); + result.setFromFQN(rs.getString("fromFQN")); + result.setToFQN(rs.getString("toFQN")); + result.setFromType(rs.getString("fromType")); + result.setToType(rs.getString("toType")); + result.setRelation(rs.getInt("relation")); + result.setJsonSchema(rs.getString("jsonSchema")); + result.setJson(rs.getString("json")); + return result; + } + } + + @Getter + @Setter + class FieldRelationship { + private String fromFQNHash; 
+ private String toFQNHash; + private String fromFQN; + private String toFQN; + private String fromType; + private String toType; + private int relation; + private String jsonSchema; + private String json; + } } interface BotDAO extends EntityDAO<Bot> { @@ -1348,7 +1409,7 @@ public interface CollectionDAO { interface MetricsDAO extends EntityDAO<Metrics> { @Override default String getTableName() { - return "metrics_entity"; + return "metric_entity"; } @Override @@ -1782,6 +1843,11 @@ public interface CollectionDAO { "SELECT source, tagFQN, labelType, state FROM tag_usage WHERE targetFQNHash = :targetFQNHash ORDER BY tagFQN") List<TagLabel> getTagsInternal(@Bind("targetFQNHash") String targetFQNHash); + @SqlQuery("SELECT * FROM tag_usage") + @Deprecated(since = "Release 1.1") + @RegisterRowMapper(TagLabelMapperMigration.class) + List<TagLabelMigration> listAll(); + @SqlQuery( "SELECT COUNT(*) FROM tag_usage " + "WHERE (tagFQNHash LIKE CONCAT(:tagFqnHash, '.%') OR tagFQNHash = :tagFqnHash) " @@ -1801,6 +1867,29 @@ "DELETE FROM tag_usage where targetFQNHash = :targetFQNHash OR targetFQNHash LIKE CONCAT(:targetFQNHash, '.%')") void deleteTagLabelsByTargetPrefix(@Bind("targetFQNHash") String targetFQNHash); + @Deprecated(since = "Release 1.1") + @ConnectionAwareSqlUpdate( value = "INSERT INTO tag_usage (source, tagFQN, tagFQNHash, targetFQNHash, labelType, state, targetFQN)" + "VALUES (:source, :tagFQN, :tagFQNHash, :targetFQNHash, :labelType, :state, :targetFQN) " + "ON DUPLICATE KEY UPDATE tagFQNHash = :tagFQNHash, targetFQNHash = :targetFQNHash", connectionType = MYSQL) + @ConnectionAwareSqlUpdate( value = "INSERT INTO tag_usage (source, tagFQN, tagFQNHash, targetFQNHash, labelType, state, targetFQN) " + "VALUES (:source, :tagFQN, :tagFQNHash, :targetFQNHash, :labelType, :state, :targetFQN) " + "ON CONFLICT (source, tagFQN, targetFQN) " + "DO UPDATE SET tagFQNHash = EXCLUDED.tagFQNHash, targetFQNHash = EXCLUDED.targetFQNHash", connectionType = POSTGRES) + void upsertFQNHash( @Bind("source") int source, @Bind("tagFQN") String tagFQN, @Bind("tagFQNHash") String tagFQNHash, @Bind("targetFQNHash") String targetFQNHash, @Bind("labelType") int labelType, @Bind("state") int state, @Bind("targetFQN") String targetFQN); /** Update all the tagFQN starting with oldPrefix to start with newPrefix due to tag or glossary name change */ default void updateTagPrefix(int source, String oldPrefix, String newPrefix) { String update = @@ -1846,6 +1935,50 @@ .withTagFQN(r.getString("tagFQN")); } } + + @Getter + @Setter + @Deprecated(since = "Release 1.1") + class TagLabelMigration { + public int source; + public String tagFQN; + public String targetFQN; + public int labelType; + public int state; + private String tagFQNHash; + public String targetFQNHash; + } + + @Deprecated(since = "Release 1.1") + class TagLabelMapperMigration implements RowMapper<TagLabelMigration> { + @Override + public TagLabelMigration map(ResultSet r, StatementContext ctx) throws SQLException { + TagLabelMigration tagLabel = new TagLabelMigration(); + + tagLabel.setSource(r.getInt("source")); + tagLabel.setLabelType(r.getInt("labelType")); + tagLabel.setState(r.getInt("state")); + tagLabel.setTagFQN(r.getString("tagFQN")); + // TODO: Ugly, but these columns exist in the lower version and are removed in the higher version + try { + // This field is removed in the latest schema + tagLabel.setTargetFQN(r.getString("targetFQN")); + } catch (Exception ex) { + // Nothing to do + } + try { + tagLabel.setTagFQNHash(r.getString("tagFQNHash")); + } catch (Exception ex) { + // Nothing to do + } + try { + tagLabel.setTargetFQNHash(r.getString("targetFQNHash")); + } catch (Exception ex) { + // Nothing to do + } + return tagLabel; + } + } }
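The try/catch-per-column pattern in TagLabelMapperMigration (and in EntityExtensionTimeSeries below) exists because these mappers read rows both before and after the hashing DDL has run, so columns such as targetFQN or tagFQNHash may or may not be present. A minimal sketch of the same idea in isolation (the helper name is hypothetical, not part of this patch):

```java
import java.sql.ResultSet;

final class SafeResultSet {
  // Returns the column value when present, or null when the column does not
  // exist in the current schema version (before/after the migration DDL).
  static String getStringOrNull(ResultSet rs, String column) {
    try {
      return rs.getString(column);
    } catch (Exception ex) {
      return null; // column missing in this schema version
    }
  }
}
```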
interface RoleDAO extends EntityDAO<Role> { @@ -2830,6 +2963,20 @@ public interface CollectionDAO { @Bind("json") String json, @Bind("timestamp") Long timestamp); + @ConnectionAwareSqlUpdate( value = "UPDATE entity_extension_time_series set entityFQNHash = :entityFQNHash where entityFQN=:entityFQN and extension=:extension and timestamp=:timestamp", connectionType = MYSQL) + @ConnectionAwareSqlUpdate( value = "UPDATE entity_extension_time_series set entityFQNHash = :entityFQNHash where entityFQN=:entityFQN and extension=:extension and timestamp=:timestamp", connectionType = POSTGRES) + void updateEntityFQNHash( @Bind("entityFQNHash") String entityFQNHash, @Bind("entityFQN") String entityFQN, @Bind("extension") String extension, @Bind("timestamp") Long timestamp); + @ConnectionAwareSqlUpdate( value = "UPDATE entity_extension_time_series set json = :json where entityFQNHash=:entityFQNHash and extension=:extension and timestamp=:timestamp and json -> '$.operation' = :operation", @@ -3099,6 +3246,40 @@ return new ReportDataRow(rowNumber, reportData); } } + + @SqlQuery("select * from entity_extension_time_series") + @RegisterRowMapper(EntityExtensionTimeSeries.class) + List<EntityExtensionTimeSeriesTable> listAll(); + + @Getter + @Setter + class EntityExtensionTimeSeriesTable { + private String entityFQN; + private String extension; + private String jsonSchema; + private String json; + private long timestamp; + private String entityFQNHash; + } + class EntityExtensionTimeSeries implements RowMapper<EntityExtensionTimeSeriesTable> { + @Override + public EntityExtensionTimeSeriesTable map(ResultSet rs, StatementContext ctx) throws SQLException { + EntityExtensionTimeSeriesTable result = new EntityExtensionTimeSeriesTable(); + // TODO: Ugly, but the entityFQN column is removed once the migration completes + try { + result.setEntityFQN(rs.getString("entityFQN")); + } catch (Exception ex) { + // Nothing + } + result.setExtension(rs.getString("extension")); + result.setJsonSchema(rs.getString("jsonSchema")); + result.setJson(rs.getString("json")); + result.setTimestamp(rs.getLong("timestamp")); + result.setEntityFQNHash(rs.getString("entityFQNHash")); + return result; + } + } } class EntitiesCountRowMapper implements RowMapper { diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/MigrationDAO.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/MigrationDAO.java index 44176619fc3..12e92a91b38 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/MigrationDAO.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/MigrationDAO.java @@ -3,14 +3,87 @@ package org.openmetadata.service.jdbi3; import static org.openmetadata.service.jdbi3.locator.ConnectionType.MYSQL; import static org.openmetadata.service.jdbi3.locator.ConnectionType.POSTGRES; +import java.util.List; import java.util.Optional; +import lombok.Getter; +import lombok.Setter; import org.jdbi.v3.core.statement.StatementException; import org.jdbi.v3.sqlobject.SingleValue; +import org.jdbi.v3.sqlobject.customizer.Bind; import org.openmetadata.service.jdbi3.locator.ConnectionAwareSqlQuery; +import org.openmetadata.service.jdbi3.locator.ConnectionAwareSqlUpdate; public interface MigrationDAO { @ConnectionAwareSqlQuery(value = "SELECT MAX(version) FROM 
DATABASE_CHANGE_LOG", connectionType = MYSQL) @ConnectionAwareSqlQuery(value = "SELECT max(version) FROM \"DATABASE_CHANGE_LOG\"", connectionType = POSTGRES) @SingleValue Optional<String> getMaxVersion() throws StatementException; + + @ConnectionAwareSqlQuery( value = "SELECT checksum FROM SERVER_CHANGE_LOG where version = :version", connectionType = MYSQL) + @ConnectionAwareSqlQuery( value = "SELECT checksum FROM SERVER_CHANGE_LOG where version = :version", connectionType = POSTGRES) + String getVersionMigrationChecksum(@Bind("version") String version) throws StatementException; + + @ConnectionAwareSqlUpdate( value = "INSERT INTO SERVER_CHANGE_LOG (version, migrationFileName, checksum, installed_on)" + "VALUES (:version, :migrationFileName, :checksum, CURRENT_TIMESTAMP) " + "ON DUPLICATE KEY UPDATE " + "migrationFileName = :migrationFileName, " + "checksum = :checksum, " + "installed_on = CURRENT_TIMESTAMP", connectionType = MYSQL) + @ConnectionAwareSqlUpdate( value = "INSERT INTO server_change_log (version, migrationFileName, checksum, installed_on)" + "VALUES (:version, :migrationFileName, :checksum, current_timestamp) " + "ON CONFLICT (version) DO UPDATE SET " + "migrationFileName = EXCLUDED.migrationFileName, " + "checksum = EXCLUDED.checksum, " + "installed_on = EXCLUDED.installed_on", connectionType = POSTGRES) + void upsertServerMigration( @Bind("version") String version, @Bind("migrationFileName") String migrationFileName, @Bind("checksum") String checksum); + + @ConnectionAwareSqlUpdate( value = "INSERT INTO SERVER_MIGRATION_SQL_LOGS (version, sqlStatement, checksum, executedAt)" + "VALUES (:version, :sqlStatement, :checksum, CURRENT_TIMESTAMP) " + "ON DUPLICATE KEY UPDATE " + "version = :version, " + "sqlStatement = :sqlStatement, " + "executedAt = CURRENT_TIMESTAMP", connectionType = MYSQL) + @ConnectionAwareSqlUpdate( value = "INSERT INTO SERVER_MIGRATION_SQL_LOGS (version, sqlStatement, checksum, executedAt)" + "VALUES (:version, :sqlStatement, :checksum, current_timestamp) " + "ON CONFLICT (checksum) DO UPDATE SET " + "version = EXCLUDED.version, " + "sqlStatement = EXCLUDED.sqlStatement, " + "executedAt = EXCLUDED.executedAt", connectionType = POSTGRES) + void upsertServerMigrationSQL( @Bind("version") String version, @Bind("sqlStatement") String sqlStatement, @Bind("checksum") String checksum); + + @ConnectionAwareSqlQuery( value = "SELECT checksum FROM SERVER_MIGRATION_SQL_LOGS where version = :version", connectionType = MYSQL) + @ConnectionAwareSqlQuery( value = "SELECT checksum FROM SERVER_MIGRATION_SQL_LOGS where version = :version", connectionType = POSTGRES) + List<String> getServerMigrationSQLWithVersion(@Bind("version") String version); + + @Getter + @Setter + class ServerMigrationSQLTable { + private String version; + private String sqlStatement; + private String checkSum; + } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/migration/MigrationFile.java b/openmetadata-service/src/main/java/org/openmetadata/service/migration/MigrationFile.java new file mode 100644 index 00000000000..16080067fc5 --- /dev/null +++ b/openmetadata-service/src/main/java/org/openmetadata/service/migration/MigrationFile.java @@ -0,0 +1,12 @@ +package org.openmetadata.service.migration; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +@Retention(RetentionPolicy.RUNTIME) 
+@Target({ElementType.TYPE, ElementType.CONSTRUCTOR}) +public @interface MigrationFile { + String name(); +} diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/migration/MigrationUtil.java b/openmetadata-service/src/main/java/org/openmetadata/service/migration/MigrationUtil.java new file mode 100644 index 00000000000..b34d05334b5 --- /dev/null +++ b/openmetadata-service/src/main/java/org/openmetadata/service/migration/MigrationUtil.java @@ -0,0 +1,360 @@ +package org.openmetadata.service.migration; + +import static org.openmetadata.service.Entity.INGESTION_PIPELINE; +import static org.openmetadata.service.Entity.TEST_CASE; +import static org.openmetadata.service.Entity.TEST_SUITE; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.UUID; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.jdbi.v3.core.Handle; +import org.openmetadata.common.utils.CommonUtil; +import org.openmetadata.schema.CreateEntity; +import org.openmetadata.schema.EntityInterface; +import org.openmetadata.schema.analytics.WebAnalyticEvent; +import org.openmetadata.schema.api.tests.CreateTestSuite; +import org.openmetadata.schema.dataInsight.DataInsightChart; +import org.openmetadata.schema.dataInsight.kpi.Kpi; +import org.openmetadata.schema.entity.Bot; +import org.openmetadata.schema.entity.Type; +import org.openmetadata.schema.entity.automations.Workflow; +import org.openmetadata.schema.entity.classification.Classification; +import org.openmetadata.schema.entity.classification.Tag; +import org.openmetadata.schema.entity.data.Chart; +import org.openmetadata.schema.entity.data.Container; +import org.openmetadata.schema.entity.data.Dashboard; +import org.openmetadata.schema.entity.data.DashboardDataModel; +import org.openmetadata.schema.entity.data.Database; +import org.openmetadata.schema.entity.data.DatabaseSchema; +import org.openmetadata.schema.entity.data.Glossary; +import org.openmetadata.schema.entity.data.GlossaryTerm; +import org.openmetadata.schema.entity.data.Metrics; +import org.openmetadata.schema.entity.data.MlModel; +import org.openmetadata.schema.entity.data.Pipeline; +import org.openmetadata.schema.entity.data.Query; +import org.openmetadata.schema.entity.data.Report; +import org.openmetadata.schema.entity.data.Table; +import org.openmetadata.schema.entity.data.Topic; +import org.openmetadata.schema.entity.events.EventSubscription; +import org.openmetadata.schema.entity.policies.Policy; +import org.openmetadata.schema.entity.services.DashboardService; +import org.openmetadata.schema.entity.services.DatabaseService; +import org.openmetadata.schema.entity.services.MessagingService; +import org.openmetadata.schema.entity.services.MetadataService; +import org.openmetadata.schema.entity.services.MlModelService; +import org.openmetadata.schema.entity.services.PipelineService; +import org.openmetadata.schema.entity.services.StorageService; +import org.openmetadata.schema.entity.services.connections.TestConnectionDefinition; +import org.openmetadata.schema.entity.services.ingestionPipelines.IngestionPipeline; +import org.openmetadata.schema.entity.teams.Role; +import org.openmetadata.schema.entity.teams.Team; +import org.openmetadata.schema.entity.teams.User; +import org.openmetadata.schema.tests.TestCase; +import org.openmetadata.schema.tests.TestDefinition; +import org.openmetadata.schema.tests.TestSuite; +import org.openmetadata.schema.type.EntityReference; +import 
org.openmetadata.schema.type.Include; +import org.openmetadata.schema.type.Relationship; +import org.openmetadata.service.Entity; +import org.openmetadata.service.exception.EntityNotFoundException; +import org.openmetadata.service.jdbi3.CollectionDAO; +import org.openmetadata.service.jdbi3.EntityDAO; +import org.openmetadata.service.jdbi3.IngestionPipelineRepository; +import org.openmetadata.service.jdbi3.ListFilter; +import org.openmetadata.service.jdbi3.MigrationDAO; +import org.openmetadata.service.jdbi3.TableRepository; +import org.openmetadata.service.jdbi3.TestCaseRepository; +import org.openmetadata.service.jdbi3.TestSuiteRepository; +import org.openmetadata.service.migration.api.MigrationStep; +import org.openmetadata.service.resources.feeds.MessageParser; +import org.openmetadata.service.util.EntityUtil; +import org.openmetadata.service.util.FullyQualifiedName; +import org.openmetadata.service.util.JsonUtils; + +@Slf4j +public class MigrationUtil { + @SneakyThrows + public static <T extends EntityInterface> void updateFQNHashForEntity(Class<T> clazz, EntityDAO<T> dao) { + List<String> jsons = dao.listAfter(new ListFilter(Include.ALL), Integer.MAX_VALUE, ""); + for (String json : jsons) { + T entity = JsonUtils.readValue(json, clazz); + dao.update( entity.getId(), FullyQualifiedName.buildHash(entity.getFullyQualifiedName()), JsonUtils.pojoToJson(entity)); + } + } + + public static MigrationDAO.ServerMigrationSQLTable buildServerMigrationTable(String version, String statement) { + MigrationDAO.ServerMigrationSQLTable result = new MigrationDAO.ServerMigrationSQLTable(); + result.setVersion(String.valueOf(version)); + result.setSqlStatement(statement); + result.setCheckSum(EntityUtil.hash(statement)); + return result; + } + + public static List<MigrationDAO.ServerMigrationSQLTable> addInListIfToBeExecuted( String version, Set<String> lookUp, List<String> queries) { + List<MigrationDAO.ServerMigrationSQLTable> result = new ArrayList<>(); + for (String query : queries) { + MigrationDAO.ServerMigrationSQLTable tableContent = buildServerMigrationTable(version, query); + if (!lookUp.contains(tableContent.getCheckSum())) { + result.add(tableContent); + } else { + // TODO: log this better + LOG.debug("Query will be skipped in this Migration Step, as it has already been executed"); + } + } + return result; + } + + public static void dataMigrationFQNHashing(CollectionDAO collectionDAO) { + // Migration for Entities + updateFQNHashForEntity(Bot.class, collectionDAO.botDAO()); + updateFQNHashForEntity(Chart.class, collectionDAO.chartDAO()); + updateFQNHashForEntity(Classification.class, collectionDAO.classificationDAO()); + updateFQNHashForEntity(Container.class, collectionDAO.containerDAO()); + updateFQNHashForEntity(DashboardDataModel.class, collectionDAO.dashboardDataModelDAO()); + updateFQNHashForEntity(Dashboard.class, collectionDAO.dashboardDAO()); + updateFQNHashForEntity(DashboardService.class, collectionDAO.dashboardServiceDAO()); + updateFQNHashForEntity(DataInsightChart.class, collectionDAO.dataInsightChartDAO()); + updateFQNHashForEntity(Database.class, collectionDAO.databaseDAO()); + updateFQNHashForEntity(DatabaseSchema.class, collectionDAO.databaseSchemaDAO()); + updateFQNHashForEntity(DatabaseService.class, collectionDAO.dbServiceDAO()); + updateFQNHashForEntity(EventSubscription.class, collectionDAO.eventSubscriptionDAO()); + updateFQNHashForEntity(Glossary.class, collectionDAO.glossaryDAO()); + updateFQNHashForEntity(GlossaryTerm.class, collectionDAO.glossaryTermDAO()); + updateFQNHashForEntity(IngestionPipeline.class, collectionDAO.ingestionPipelineDAO()); + updateFQNHashForEntity(Kpi.class, 
collectionDAO.kpiDAO()); + updateFQNHashForEntity(MessagingService.class, collectionDAO.messagingServiceDAO()); + updateFQNHashForEntity(MetadataService.class, collectionDAO.metadataServiceDAO()); + updateFQNHashForEntity(Metrics.class, collectionDAO.metricsDAO()); + updateFQNHashForEntity(MlModel.class, collectionDAO.mlModelDAO()); + updateFQNHashForEntity(MlModelService.class, collectionDAO.mlModelServiceDAO()); + updateFQNHashForEntity(Pipeline.class, collectionDAO.pipelineDAO()); + updateFQNHashForEntity(PipelineService.class, collectionDAO.pipelineServiceDAO()); + updateFQNHashForEntity(Policy.class, collectionDAO.policyDAO()); + updateFQNHashForEntity(Query.class, collectionDAO.queryDAO()); + updateFQNHashForEntity(Report.class, collectionDAO.reportDAO()); + updateFQNHashForEntity(Role.class, collectionDAO.roleDAO()); + updateFQNHashForEntity(StorageService.class, collectionDAO.storageServiceDAO()); + updateFQNHashForEntity(Table.class, collectionDAO.tableDAO()); + updateFQNHashForEntity(Tag.class, collectionDAO.tagDAO()); + updateFQNHashForEntity(Team.class, collectionDAO.teamDAO()); + updateFQNHashForEntity(TestCase.class, collectionDAO.testCaseDAO()); + updateFQNHashForEntity(TestConnectionDefinition.class, collectionDAO.testConnectionDefinitionDAO()); + updateFQNHashForEntity(TestDefinition.class, collectionDAO.testDefinitionDAO()); + updateFQNHashForEntity(TestSuite.class, collectionDAO.testSuiteDAO()); + updateFQNHashForEntity(Topic.class, collectionDAO.topicDAO()); + updateFQNHashForEntity(Type.class, collectionDAO.typeEntityDAO()); + updateFQNHashForEntity(User.class, collectionDAO.userDAO()); + updateFQNHashForEntity(WebAnalyticEvent.class, collectionDAO.webAnalyticEventDAO()); + updateFQNHashForEntity(Workflow.class, collectionDAO.workflowDAO()); + + // Field Relationship + updateFQNHashForFieldRelationship(collectionDAO); + + // TimeSeries + updateFQNHashEntityExtensionTimeSeries(collectionDAO); + + // Tag Usage + updateFQNHashTagUsage(collectionDAO); + } + + private static void updateFQNHashForFieldRelationship(CollectionDAO collectionDAO) { + List fieldRelationships = + collectionDAO.fieldRelationshipDAO().listAll(); + for (CollectionDAO.FieldRelationshipDAO.FieldRelationship fieldRelationship : fieldRelationships) { + if (CommonUtil.nullOrEmpty(fieldRelationship.getFromFQNHash()) + && CommonUtil.nullOrEmpty(fieldRelationship.getToFQNHash())) { + collectionDAO + .fieldRelationshipDAO() + .upsertFQNHash( + FullyQualifiedName.buildHash(fieldRelationship.getFromFQN()), + FullyQualifiedName.buildHash(fieldRelationship.getToFQN()), + fieldRelationship.getFromFQN(), + fieldRelationship.getToFQN(), + fieldRelationship.getFromType(), + fieldRelationship.getToType(), + fieldRelationship.getRelation(), + fieldRelationship.getJsonSchema(), + fieldRelationship.getJson()); + } + } + } + + private static void updateFQNHashEntityExtensionTimeSeries(CollectionDAO collectionDAO) { + List timeSeriesTables = + collectionDAO.entityExtensionTimeSeriesDao().listAll(); + for (CollectionDAO.EntityExtensionTimeSeriesDAO.EntityExtensionTimeSeriesTable timeSeries : timeSeriesTables) { + if (CommonUtil.nullOrEmpty(timeSeries.getEntityFQNHash())) { + collectionDAO + .entityExtensionTimeSeriesDao() + .updateEntityFQNHash( + FullyQualifiedName.buildHash(timeSeries.getEntityFQN()), + timeSeries.getEntityFQN(), + timeSeries.getExtension(), + timeSeries.getTimestamp()); + } + } + } + + public static void updateFQNHashTagUsage(CollectionDAO collectionDAO) { + List tagLabelMigrationList = 
collectionDAO.tagUsageDAO().listAll(); + for (CollectionDAO.TagUsageDAO.TagLabelMigration tagLabel : tagLabelMigrationList) { + if (CommonUtil.nullOrEmpty(tagLabel.getTagFQNHash()) && CommonUtil.nullOrEmpty(tagLabel.getTargetFQNHash())) { + collectionDAO .tagUsageDAO() .upsertFQNHash( tagLabel.getSource(), tagLabel.getTagFQN(), FullyQualifiedName.buildHash(tagLabel.getTagFQN()), FullyQualifiedName.buildHash(tagLabel.getTargetFQN()), tagLabel.getLabelType(), tagLabel.getState(), tagLabel.getTargetFQN()); + } + } + }
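All of the updateFQNHash* helpers above delegate to FullyQualifiedName.buildHash, whose implementation is not part of this diff. The idea, as a sketch assuming an MD5-style hex digest: arbitrarily long FQNs (the raw columns needed VARCHAR(2096)) are mapped to short, fixed-length keys that are safe to index and to combine in unique constraints such as tag_usage_key.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;

final class FqnHashSketch {
  // Illustration only; the real implementation lives in
  // org.openmetadata.service.util.FullyQualifiedName.
  static String buildHash(String fqn) throws Exception {
    byte[] digest = MessageDigest.getInstance("MD5").digest(fqn.getBytes(StandardCharsets.UTF_8));
    StringBuilder hex = new StringBuilder();
    for (byte b : digest) {
      hex.append(String.format("%02x", b)); // 32 hex chars, well under VARCHAR(256)
    }
    return hex.toString();
  }
}
```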
+ public static void performSqlExecutionAndUpdation( MigrationStep step, MigrationDAO migrationDAO, Handle handle, List<String> queryList) { + // These are DDL statements and cause an implicit commit, so even when run inside a transaction they are committed in place + Set<String> executedSQLChecksums = new HashSet<>(migrationDAO.getServerMigrationSQLWithVersion(String.valueOf(step.getMigrationVersion()))); + // Execute the statements as a batch + List<MigrationDAO.ServerMigrationSQLTable> toBeExecuted = addInListIfToBeExecuted(String.valueOf(step.getMigrationVersion()), executedSQLChecksums, queryList); + + for (MigrationDAO.ServerMigrationSQLTable tableData : toBeExecuted) { + handle.execute(tableData.getSqlStatement()); + migrationDAO.upsertServerMigrationSQL( tableData.getVersion(), tableData.getSqlStatement(), tableData.getCheckSum()); + } + } + + public static TestSuite getTestSuite(CollectionDAO dao, CreateTestSuite create, String user) throws IOException { + TestSuite testSuite = copy(new TestSuite(), create, user) .withDescription(create.getDescription()) .withDisplayName(create.getDisplayName()) .withName(create.getName()); + if (create.getExecutableEntityReference() != null) { + TableRepository tableRepository = new TableRepository(dao); + Table table = JsonUtils.readValue( tableRepository.getDao().findJsonByFqn(create.getExecutableEntityReference(), Include.ALL), Table.class); + EntityReference entityReference = new EntityReference() .withId(table.getId()) .withFullyQualifiedName(table.getFullyQualifiedName()) .withName(table.getName()) .withType(Entity.TABLE); + testSuite.setExecutableEntityReference(entityReference); + } + return testSuite; + } + + public static TestSuite copy(TestSuite entity, CreateEntity request, String updatedBy) throws IOException { + entity.setId(UUID.randomUUID()); + entity.setName(request.getName()); + entity.setDisplayName(request.getDisplayName()); + entity.setDescription(request.getDescription()); + entity.setExtension(request.getExtension()); + entity.setUpdatedBy(updatedBy); + entity.setOwner(null); + entity.setUpdatedAt(System.currentTimeMillis()); + return entity; + } + + @SneakyThrows + public static void testSuitesMigration(CollectionDAO collectionDAO) { + IngestionPipelineRepository ingestionPipelineRepository = new IngestionPipelineRepository(collectionDAO); + TestSuiteRepository testSuiteRepository = new TestSuiteRepository(collectionDAO); + TestCaseRepository testCaseRepository = new TestCaseRepository(collectionDAO); + List<TestCase> testCases = testCaseRepository.listAll(new EntityUtil.Fields(List.of("id")), new ListFilter(Include.ALL)); + + for (TestCase test : testCases) { + + // Create New Executable Test Suites + MessageParser.EntityLink entityLink = MessageParser.EntityLink.parse(test.getEntityLink()); + // Create new Logical Test Suite + String testSuiteFqn = entityLink.getEntityFQN() + ".testSuite"; + try { + // Check if the test suite exists; this lookup fetches the data on a nameHash basis + TestSuite stored = testSuiteRepository.getByName( null, FullyQualifiedName.quoteName(testSuiteFqn), new EntityUtil.Fields(List.of("id")), Include.ALL); + testSuiteRepository.addRelationship(stored.getId(), test.getId(), TEST_SUITE, TEST_CASE, Relationship.CONTAINS); + } catch (EntityNotFoundException ex) { + // TODO: Need to update the executable field as well + TestSuite newExecutableTestSuite = getTestSuite( collectionDAO, new CreateTestSuite() .withName(testSuiteFqn) .withExecutableEntityReference(entityLink.getEntityFQN()), "ingestion-bot") .withExecutable(false); + TestSuite testSuitePutResponse = testSuiteRepository.create(null, newExecutableTestSuite); + // Here we are manually adding the executable relationship, since the table repository is not registered + // and resolves to null for entity type table + testSuiteRepository.addRelationship( newExecutableTestSuite.getExecutableEntityReference().getId(), newExecutableTestSuite.getId(), Entity.TABLE, TEST_SUITE, Relationship.CONTAINS); + + // add relationship from testSuite to TestCases + testSuiteRepository.addRelationship( newExecutableTestSuite.getId(), test.getId(), TEST_SUITE, TEST_CASE, Relationship.CONTAINS); + + // Not a good approach, but executable cannot be set to true before creation + TestSuite temp = testSuiteRepository .getDao() .findEntityByName(FullyQualifiedName.quoteName(newExecutableTestSuite.getName())); + temp.setExecutable(true); + testSuiteRepository.getDao().update(temp); + } + } + + // Update Test Suites + ListFilter filter = new ListFilter(Include.ALL); + filter.addQueryParam("testSuiteType", "logical"); + List<TestSuite> testSuites = testSuiteRepository.listAll(new EntityUtil.Fields(List.of("id")), filter); + + for (TestSuite testSuiteRecord : testSuites) { + TestSuite temp = testSuiteRepository.getDao().findEntityById(testSuiteRecord.getId()); + if (Boolean.FALSE.equals(temp.getExecutable())) { + temp.setExecutable(false); + testSuiteRepository.getDao().update(temp); + } + + // get Ingestion Pipelines + try { + List<CollectionDAO.EntityRelationshipRecord> ingestionPipelineRecords = collectionDAO .relationshipDAO() .findTo( testSuiteRecord.getId().toString(), TEST_SUITE, Relationship.CONTAINS.ordinal(), INGESTION_PIPELINE); + for (CollectionDAO.EntityRelationshipRecord ingestionRecord : ingestionPipelineRecords) { + // remove relationship + collectionDAO.relationshipDAO().deleteAll(ingestionRecord.getId().toString(), INGESTION_PIPELINE); + // Cannot use delete directly, as it uses other repositories internally + ingestionPipelineRepository.getDao().delete(ingestionRecord.getId().toString()); + } + } catch (EntityNotFoundException ex) { + // Already removed + } + } + } +}
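performSqlExecutionAndUpdation is what makes re-running a migration step safe: every executed statement is recorded in SERVER_MIGRATION_SQL_LOGS keyed by its checksum, and addInListIfToBeExecuted filters out statements whose checksum is already recorded. A sketch of how that bookkeeping could be inspected after a run, using the same Jdbi Handle the steps receive (the class and method names are hypothetical):

```java
import org.jdbi.v3.core.Handle;

final class MigrationAudit {
  // Lists the statements of a given version that have already been applied,
  // i.e. the ones a re-run would skip.
  static void printExecuted(Handle handle, String version) {
    handle
        .createQuery("SELECT sqlStatement, checksum FROM SERVER_MIGRATION_SQL_LOGS WHERE version = :version")
        .bind("version", version)
        .mapToMap() // Jdbi lower-cases the column keys by default
        .forEach(row -> System.out.println(row.get("checksum") + " -> " + row.get("sqlstatement")));
  }
}
```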
diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/migration/api/MigrationStep.java b/openmetadata-service/src/main/java/org/openmetadata/service/migration/api/MigrationStep.java new file mode 100644 index 00000000000..b0e60753ffe --- /dev/null +++ b/openmetadata-service/src/main/java/org/openmetadata/service/migration/api/MigrationStep.java @@ -0,0 +1,30 @@ +package org.openmetadata.service.migration.api; + +import org.jdbi.v3.core.Handle; +import org.openmetadata.service.jdbi3.locator.ConnectionType; + +public interface MigrationStep { + // This version should match the server version + // Ex: if the server is 1.0.0, the migration version for that server is 1.0.0 + // This version is used to sort all the upgrade migrations and apply them in order + String getMigrationVersion(); + + String getMigrationFileName(); + + String getFileUuid(); + + ConnectionType getDatabaseConnectionType(); + + void initialize(Handle handle); + + // Handle SQL that does not support transactions here, e.g. changes to table structure (DDL) + void preDDL(); + + // This method is to run code that fixes any data + void runDataMigration(); + + // This method is to run SQL that can be part of the transaction after the data migration + void postDDL(); + + void close(); +}
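Taken together, the contract is: initialize receives the shared Handle, preDDL and postDDL run auto-committing DDL outside the transaction, and runDataMigration runs inside it. A minimal skeleton of an implementation (the version and UUID here are hypothetical; see MySQLMigration below for the real 1.1.0 step):

```java
import org.jdbi.v3.core.Handle;
import org.openmetadata.service.jdbi3.locator.ConnectionType;
import org.openmetadata.service.migration.MigrationFile;
import org.openmetadata.service.migration.api.MigrationStep;

@MigrationFile(name = "v111_MySQLMigration") // hypothetical future step
public class NoOpMigration implements MigrationStep {
  private Handle handle;

  @Override public String getMigrationVersion() { return "1.1.1"; } // sorts after 1.1.0
  @Override public String getMigrationFileName() { return "v111_MySQLMigration"; }
  @Override public String getFileUuid() { return "00000000-0000-0000-0000-000000000000"; } // change when the file changes
  @Override public ConnectionType getDatabaseConnectionType() { return ConnectionType.MYSQL; }
  @Override public void initialize(Handle handle) { this.handle = handle; }
  @Override public void preDDL() { /* auto-committing DDL goes here */ }
  @Override public void runDataMigration() { /* runs inside the transaction */ }
  @Override public void postDDL() { /* DDL that depends on migrated data */ }
  @Override public void close() {}
}
```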
diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/migration/api/MigrationWorkflow.java b/openmetadata-service/src/main/java/org/openmetadata/service/migration/api/MigrationWorkflow.java new file mode 100644 index 00000000000..7e936e61d66 --- /dev/null +++ b/openmetadata-service/src/main/java/org/openmetadata/service/migration/api/MigrationWorkflow.java @@ -0,0 +1,156 @@ +package org.openmetadata.service.migration.api; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import lombok.extern.slf4j.Slf4j; +import org.jdbi.v3.core.Handle; +import org.jdbi.v3.core.Jdbi; +import org.openmetadata.service.jdbi3.MigrationDAO; +import org.openmetadata.service.jdbi3.locator.ConnectionType; + +@Slf4j +public class MigrationWorkflow { + private final List<MigrationStep> migrations; + private final MigrationDAO migrationDAO; + private final Jdbi jdbi; + private final ConnectionType workflowDatabaseConnectionType; + + private boolean ignoreFileChecksum = false; + + public MigrationWorkflow( Jdbi jdbi, ConnectionType type, List<MigrationStep> migrationSteps, boolean ignoreFileChecksum) { + this.jdbi = jdbi; + this.workflowDatabaseConnectionType = type; + this.migrationDAO = jdbi.onDemand(MigrationDAO.class); + this.ignoreFileChecksum = ignoreFileChecksum; + + // Validate Migrations + validateMigrations(migrationSteps); + + // Sort Migrations on the basis of version + migrationSteps.sort(Comparator.comparing(MigrationStep::getMigrationVersion)); + + // Filter Migrations to Be Run + this.migrations = filterAndGetMigrationsToRun(migrationSteps); + } + + private void validateMigrations(List<MigrationStep> migrationSteps) { + for (MigrationStep step : migrationSteps) { + if (!step.getDatabaseConnectionType().equals(this.workflowDatabaseConnectionType)) { + throw new IllegalArgumentException( String.format( "Provided Migration File is for Database Connection %s", step.getDatabaseConnectionType().toString())); + } + } + } + + private List<MigrationStep> filterAndGetMigrationsToRun(List<MigrationStep> migrations) { + LOG.debug("Filtering Server Migrations"); + String maxMigration = migrations.get(migrations.size() - 1).getMigrationVersion(); + List<MigrationStep> result = new ArrayList<>(); + + for (MigrationStep step : migrations) { + String checksum = migrationDAO.getVersionMigrationChecksum(String.valueOf(step.getMigrationVersion())); + if (checksum != null) { + // Version exists in the DB, so this step was run before + if (maxMigration.compareTo(step.getMigrationVersion()) < 0) { + // This is a new step file + result.add(step); + } else if (ignoreFileChecksum || !checksum.equals(step.getFileUuid())) { + // This migration step was run already; if the checksum matches, the step can be skipped + LOG.warn( "[Migration Workflow] You are changing an older Migration File. This is not advised. Add your changes to latest Migrations."); + result.add(step); + } + } else { + // Version does not exist in the DB, so this step was not run + result.add(step); + } + } + return result; + } + + @SuppressWarnings("unused") + private void initializeMigrationWorkflow() {} + + public void runMigrationWorkflows() { + try (Handle transactionHandler = jdbi.open()) { + LOG.info("[MigrationWorkflow] WorkFlow Started"); + try { + + for (MigrationStep step : migrations) { + // Initialize the Migration Step + LOG.info( "[MigrationStep] Initialized, Version: {}, DatabaseType: {}, FileName: {}", step.getMigrationVersion(), step.getDatabaseConnectionType(), step.getMigrationFileName()); + step.initialize(transactionHandler); + + LOG.info( "[MigrationStep] Running PreDataSQLs, Version: {}, DatabaseType: {}, FileName: {}", step.getMigrationVersion(), step.getDatabaseConnectionType(), step.getMigrationFileName()); + step.preDDL(); + + LOG.info("[MigrationStep] Transaction Started"); + + // Begin Transaction + transactionHandler.begin(); + + // Run the data migration for the Migration Step + LOG.info( "[MigrationStep] Running DataMigration, Version: {}, DatabaseType: {}, FileName: {}", step.getMigrationVersion(), step.getDatabaseConnectionType(), step.getMigrationFileName()); + step.runDataMigration(); + + LOG.info("[MigrationStep] Committing Transaction"); + transactionHandler.commit(); + + // Run the post-DDL statements for the Migration Step + LOG.info( "[MigrationStep] Running TransactionalPostDataSQLs, Version: {}, DatabaseType: {}, FileName: {}", step.getMigrationVersion(), step.getDatabaseConnectionType(), step.getMigrationFileName()); + step.postDDL(); + + // Handle Migration Closure + LOG.info( "[MigrationStep] Update Migration Status, Version: {}, DatabaseType: {}, FileName: {}", step.getMigrationVersion(), step.getDatabaseConnectionType(), step.getMigrationFileName()); + updateMigrationStepInDB(step); + } + + } catch (Exception e) { + // On any Exception, catch the error + // and roll back the transaction + LOG.error("Encountered Exception in MigrationWorkflow", e); + LOG.info("[MigrationWorkflow] Rolling Back Transaction"); + if (transactionHandler.isInTransaction()) { + transactionHandler.rollback(); + } + } + } + LOG.info("[MigrationWorkflow] WorkFlow Completed"); + } + + public void closeMigrationWorkflow() { + // 1. Write to the DB table the version we upgraded to; + // this should be the current server version + + // 2. Commit the transaction on completion + } + + public void updateMigrationStepInDB(MigrationStep step) { + migrationDAO.upsertServerMigration(step.getMigrationVersion(), step.getMigrationFileName(), step.getFileUuid()); + } + + public void migrateSearchIndexes() {} +}
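With the pieces above, bootstrapping reduces to constructing the workflow with the steps for the target database and running it; the actual call site is in TablesInitializer (changed in this patch but not shown in this excerpt). A sketch of the expected wiring, with ignoreFileChecksum set to false so a step is only re-run when its file checksum no longer matches the recorded one:

```java
import java.util.ArrayList;
import java.util.List;
import org.jdbi.v3.core.Jdbi;
import org.openmetadata.service.jdbi3.locator.ConnectionType;
import org.openmetadata.service.migration.api.MigrationStep;
import org.openmetadata.service.migration.api.MigrationWorkflow;
import org.openmetadata.service.migration.versions.mysql.v110.MySQLMigration;

final class RunServerMigrations {
  static void run(Jdbi jdbi) {
    // The list must be mutable: MigrationWorkflow sorts it by version.
    List<MigrationStep> steps = new ArrayList<>(List.of(new MySQLMigration()));
    MigrationWorkflow workflow = new MigrationWorkflow(jdbi, ConnectionType.MYSQL, steps, false);
    workflow.runMigrationWorkflows();
  }
}
```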
diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/migration/versions/mysql/v110/MySQLMigration.java b/openmetadata-service/src/main/java/org/openmetadata/service/migration/versions/mysql/v110/MySQLMigration.java new file mode 100644 index 00000000000..9741b894793 --- /dev/null +++ b/openmetadata-service/src/main/java/org/openmetadata/service/migration/versions/mysql/v110/MySQLMigration.java @@ -0,0 +1,178 @@ +package org.openmetadata.service.migration.versions.mysql.v110; + +import static org.openmetadata.service.migration.MigrationUtil.dataMigrationFQNHashing; +import static org.openmetadata.service.migration.MigrationUtil.performSqlExecutionAndUpdation; +import static org.openmetadata.service.migration.MigrationUtil.testSuitesMigration; + +import java.util.Arrays; +import java.util.List; +import lombok.extern.slf4j.Slf4j; +import org.jdbi.v3.core.Handle; +import org.openmetadata.service.jdbi3.CollectionDAO; +import org.openmetadata.service.jdbi3.MigrationDAO; +import org.openmetadata.service.jdbi3.locator.ConnectionType; +import org.openmetadata.service.migration.MigrationFile; +import org.openmetadata.service.migration.api.MigrationStep; + +@Slf4j +@MigrationFile(name = "v110_MySQLMigration") +@SuppressWarnings("unused") +public class MySQLMigration implements MigrationStep { + private CollectionDAO collectionDAO; + private MigrationDAO migrationDAO; + private Handle handle; + + @Override + public String getMigrationVersion() { + return "1.1.0"; + } + + @Override + public String getMigrationFileName() { + return "v110_MySQLMigration"; + } + + @Override + public String getFileUuid() { + return "ffcc502b-d4a0-4e5f-a562-0a6d4110c762"; + } + + @Override + public ConnectionType getDatabaseConnectionType() { + return ConnectionType.MYSQL; + } + + @Override + public void initialize(Handle handle) { + this.handle = handle; + this.collectionDAO = handle.attach(CollectionDAO.class); + this.migrationDAO = handle.attach(MigrationDAO.class); + } + + @Override + public void preDDL() { + preDDLFQNHashing(); + } + + @Override + public void runDataMigration() { + // FQN Hashing Migrations + dataMigrationFQNHashing(collectionDAO); + } + + @Override + public void postDDL() { + // These SQLs cannot be part of the transaction commit, as they depend on data that must be committed first + postDDLFQNHashing(); + testSuitesMigration(collectionDAO); + } + + @Override + public void close() {} + + private void preDDLFQNHashing() { + // These are DDL statements and cause an implicit commit, so even when run inside a transaction they are committed in place + List<String> queryList = Arrays.asList( "ALTER TABLE bot_entity DROP KEY `name`, ADD COLUMN nameHash VARCHAR(256) NOT NULL", + "ALTER TABLE chart_entity DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256) NOT NULL, ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> '$.name') NOT NULL", + "ALTER TABLE classification DROP KEY `name`, ADD COLUMN nameHash VARCHAR(256) NOT NULL", + "ALTER TABLE storage_container_entity DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256) NOT NULL, ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> '$.name') NOT NULL", + "ALTER TABLE dashboard_data_model_entity DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256) NOT NULL, ADD COLUMN name 
diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/migration/versions/mysql/v110/MySQLMigration.java b/openmetadata-service/src/main/java/org/openmetadata/service/migration/versions/mysql/v110/MySQLMigration.java
new file mode 100644
index 00000000000..9741b894793
--- /dev/null
+++ b/openmetadata-service/src/main/java/org/openmetadata/service/migration/versions/mysql/v110/MySQLMigration.java
@@ -0,0 +1,178 @@
+package org.openmetadata.service.migration.versions.mysql.v110;
+
+import static org.openmetadata.service.migration.MigrationUtil.dataMigrationFQNHashing;
+import static org.openmetadata.service.migration.MigrationUtil.performSqlExecutionAndUpdation;
+import static org.openmetadata.service.migration.MigrationUtil.testSuitesMigration;
+
+import java.util.Arrays;
+import java.util.List;
+import lombok.extern.slf4j.Slf4j;
+import org.jdbi.v3.core.Handle;
+import org.openmetadata.service.jdbi3.CollectionDAO;
+import org.openmetadata.service.jdbi3.MigrationDAO;
+import org.openmetadata.service.jdbi3.locator.ConnectionType;
+import org.openmetadata.service.migration.MigrationFile;
+import org.openmetadata.service.migration.api.MigrationStep;
+
+@Slf4j
+@MigrationFile(name = "v110_MySQLMigration")
+@SuppressWarnings("unused")
+public class MySQLMigration implements MigrationStep {
+  private CollectionDAO collectionDAO;
+  private MigrationDAO migrationDAO;
+  private Handle handle;
+
+  @Override
+  public String getMigrationVersion() {
+    return "1.1.0";
+  }
+
+  @Override
+  public String getMigrationFileName() {
+    return "v110_MySQLMigration";
+  }
+
+  @Override
+  public String getFileUuid() {
+    return "ffcc502b-d4a0-4e5f-a562-0a6d4110c762";
+  }
+
+  @Override
+  public ConnectionType getDatabaseConnectionType() {
+    return ConnectionType.MYSQL;
+  }
+
+  @Override
+  public void initialize(Handle handle) {
+    this.handle = handle;
+    this.collectionDAO = handle.attach(CollectionDAO.class);
+    this.migrationDAO = handle.attach(MigrationDAO.class);
+  }
+
+  @Override
+  public void preDDL() {
+    preDDLFQNHashing();
+  }
+
+  @Override
+  public void runDataMigration() {
+    // FQN Hashing Migrations
+    dataMigrationFQNHashing(collectionDAO);
+  }
+
+  @Override
+  public void postDDL() {
+    // These SQLs cannot run within the data-migration transaction, as they need that data to be committed first
+    postDDLFQNHashing();
+    testSuitesMigration(collectionDAO);
+  }
+
+  @Override
+  public void close() {}
+
+  private void preDDLFQNHashing() {
+    // DDL statements; MySQL issues an implicit commit for these, so they are committed in place even inside a transaction
+    List<String> queryList =
+        Arrays.asList(
+            "ALTER TABLE bot_entity DROP KEY `name`, ADD COLUMN nameHash VARCHAR(256) NOT NULL",
+            "ALTER TABLE chart_entity DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256) NOT NULL, ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> '$.name') NOT NULL",
+            "ALTER TABLE classification DROP KEY `name`, ADD COLUMN nameHash VARCHAR(256) NOT NULL",
+            "ALTER TABLE storage_container_entity DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256) NOT NULL, ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> '$.name') NOT NULL",
+            "ALTER TABLE dashboard_data_model_entity DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256) NOT NULL, ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> '$.name') NOT NULL",
+            "ALTER TABLE dashboard_entity DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256) NOT NULL, ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> '$.name') NOT NULL",
+            "ALTER TABLE dashboard_service_entity DROP KEY `name`, ADD COLUMN nameHash VARCHAR(256) NOT NULL",
+            "ALTER TABLE data_insight_chart DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256) NOT NULL",
+            "ALTER TABLE database_entity DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256) NOT NULL, ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> '$.name') NOT NULL",
+            "ALTER TABLE database_schema_entity DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256) NOT NULL, ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> '$.name') NOT NULL",
+            "ALTER TABLE dbservice_entity DROP KEY `name`, ADD COLUMN nameHash VARCHAR(256) NOT NULL",
+            "ALTER TABLE event_subscription_entity DROP KEY `name`, ADD COLUMN nameHash VARCHAR(256) NOT NULL",
+            "ALTER TABLE glossary_entity DROP KEY `name`, ADD COLUMN nameHash VARCHAR(256) NOT NULL",
+            "ALTER TABLE glossary_term_entity DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256) NOT NULL, ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> '$.name') NOT NULL",
+            "ALTER TABLE ingestion_pipeline_entity DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256) NOT NULL, ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> '$.name') NOT NULL",
+            "ALTER TABLE kpi_entity DROP KEY `name`, ADD COLUMN nameHash VARCHAR(256) NOT NULL",
+            "ALTER TABLE messaging_service_entity DROP KEY `name`, ADD COLUMN nameHash VARCHAR(256) NOT NULL",
+            "ALTER TABLE metadata_service_entity DROP KEY `name`, ADD COLUMN nameHash VARCHAR(256) NOT NULL",
+            "ALTER TABLE metric_entity DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256) NOT NULL, ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> '$.name') NOT NULL",
+            "ALTER TABLE ml_model_entity DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256) NOT NULL, ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> '$.name') NOT NULL",
+            "ALTER TABLE mlmodel_service_entity DROP KEY `name`, ADD COLUMN nameHash VARCHAR(256) NOT NULL",
+            "ALTER TABLE pipeline_entity DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256) NOT NULL, ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> '$.name') NOT NULL",
+            "ALTER TABLE pipeline_service_entity DROP KEY `name`, ADD COLUMN nameHash VARCHAR(256) NOT NULL",
+            "ALTER TABLE policy_entity DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256) NOT NULL, ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> '$.name') NOT NULL",
+            "ALTER TABLE query_entity DROP KEY `name`, ADD COLUMN nameHash VARCHAR(256) NOT NULL",
+            "ALTER TABLE report_entity DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256) NOT NULL, ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> '$.name') NOT NULL",
+            "ALTER TABLE role_entity DROP KEY `name`, ADD COLUMN nameHash VARCHAR(256) NOT NULL",
+            "ALTER TABLE storage_service_entity DROP KEY `name`, ADD COLUMN nameHash VARCHAR(256) NOT NULL",
+            "ALTER TABLE table_entity DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256) NOT NULL, ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> '$.name') NOT NULL",
+            "ALTER TABLE tag DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256) NOT NULL, ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> '$.name') NOT NULL",
+            "ALTER TABLE team_entity DROP KEY `name`, ADD COLUMN nameHash VARCHAR(256) NOT NULL",
+            "ALTER TABLE test_case DROP COLUMN fullyQualifiedName, ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> '$.name') NOT NULL, ADD COLUMN fqnHash VARCHAR(256) NOT NULL",
+            "ALTER TABLE test_connection_definition ADD COLUMN nameHash VARCHAR(256) NOT NULL, ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> '$.name') NOT NULL",
+            "ALTER TABLE test_definition DROP KEY `name`, ADD COLUMN nameHash VARCHAR(256) NOT NULL",
+            "ALTER TABLE test_suite DROP KEY `name`, ADD COLUMN nameHash VARCHAR(256) NOT NULL",
+            "ALTER TABLE topic_entity DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256) NOT NULL, ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> '$.name') NOT NULL",
+            "ALTER TABLE type_entity DROP KEY `name`, ADD COLUMN nameHash VARCHAR(256) NOT NULL",
+            "ALTER TABLE user_entity DROP KEY `name`, ADD COLUMN nameHash VARCHAR(256) NOT NULL",
+            "ALTER TABLE web_analytic_event DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256) NOT NULL",
+            "ALTER TABLE automations_workflow DROP KEY `name`, ADD COLUMN nameHash VARCHAR(256) NOT NULL",
+            // field_relationship
+            "ALTER TABLE field_relationship ADD COLUMN fromFQNHash VARCHAR(256), ADD COLUMN toFQNHash VARCHAR(256), DROP INDEX from_index, DROP INDEX to_index, ADD INDEX from_fqnhash_index(fromFQNHash, relation), ADD INDEX to_fqnhash_index(toFQNHash, relation)",
            // entity_extension_time_series
+            "ALTER TABLE entity_extension_time_series ADD COLUMN entityFQNHash VARCHAR(256) NOT NULL",
+            // tag_usage
+            "ALTER TABLE tag_usage ADD COLUMN tagFQNHash VARCHAR(256), ADD COLUMN targetFQNHash VARCHAR(256)");
+    performSqlExecutionAndUpdation(this, migrationDAO, handle, queryList);
+  }
+
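+  // The matching UNIQUE constraints are deliberately deferred to postDDLFQNHashing(),
+  // which runs only after the data migration has populated the new hash columns.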
+  private void postDDLFQNHashing() {
+    // DDL statements; MySQL issues an implicit commit for these, so they are committed in place even inside a transaction
+    List<String> queryList =
+        Arrays.asList(
+            "ALTER TABLE bot_entity ADD UNIQUE (nameHash)",
+            "ALTER TABLE chart_entity ADD UNIQUE (fqnHash)",
+            "ALTER TABLE classification ADD UNIQUE (nameHash)",
+            "ALTER TABLE storage_container_entity ADD UNIQUE (fqnHash)",
+            "ALTER TABLE dashboard_data_model_entity ADD UNIQUE (fqnHash)",
+            "ALTER TABLE dashboard_entity ADD UNIQUE (fqnHash)",
+            "ALTER TABLE dashboard_service_entity ADD UNIQUE (nameHash)",
+            "ALTER TABLE data_insight_chart ADD UNIQUE (fqnHash)",
+            "ALTER TABLE database_entity ADD UNIQUE (fqnHash)",
+            "ALTER TABLE database_schema_entity ADD UNIQUE (fqnHash)",
+            "ALTER TABLE dbservice_entity ADD UNIQUE (nameHash)",
+            "ALTER TABLE event_subscription_entity ADD UNIQUE (nameHash)",
+            "ALTER TABLE glossary_entity ADD UNIQUE (nameHash)",
+            "ALTER TABLE glossary_term_entity ADD UNIQUE (fqnHash)",
+            "ALTER TABLE ingestion_pipeline_entity ADD UNIQUE (fqnHash)",
+            "ALTER TABLE kpi_entity ADD UNIQUE (nameHash)",
+            "ALTER TABLE messaging_service_entity ADD UNIQUE (nameHash)",
+            "ALTER TABLE metadata_service_entity ADD UNIQUE (nameHash)",
+            "ALTER TABLE metric_entity ADD UNIQUE (fqnHash)",
+            "ALTER TABLE ml_model_entity ADD UNIQUE (fqnHash)",
+            "ALTER TABLE mlmodel_service_entity ADD UNIQUE (nameHash)",
+            "ALTER TABLE pipeline_entity ADD UNIQUE (fqnHash)",
+            "ALTER TABLE pipeline_service_entity ADD UNIQUE (nameHash)",
+            "ALTER TABLE policy_entity ADD UNIQUE (fqnHash)",
+            "ALTER TABLE query_entity ADD UNIQUE (nameHash)",
+            "ALTER TABLE report_entity ADD UNIQUE (fqnHash)",
+            "ALTER TABLE role_entity ADD UNIQUE (nameHash)",
+            "ALTER TABLE storage_service_entity ADD UNIQUE (nameHash)",
+            "ALTER TABLE table_entity ADD UNIQUE (fqnHash)",
+            "ALTER TABLE tag ADD UNIQUE (fqnHash)",
+            "ALTER TABLE team_entity ADD UNIQUE (nameHash)",
+            "ALTER TABLE test_case ADD UNIQUE (fqnHash)",
+            "ALTER TABLE test_connection_definition ADD UNIQUE (nameHash)",
+            "ALTER TABLE test_definition ADD UNIQUE (nameHash)",
+            "ALTER TABLE test_suite ADD UNIQUE (nameHash)",
+            "ALTER TABLE topic_entity ADD UNIQUE (fqnHash)",
+            "ALTER TABLE type_entity ADD UNIQUE (nameHash)",
+            "ALTER TABLE user_entity ADD UNIQUE (nameHash)",
+            "ALTER TABLE web_analytic_event ADD UNIQUE (fqnHash)",
+            "ALTER TABLE automations_workflow ADD UNIQUE (nameHash)",
+            // entity_extension_time_series
+            "ALTER TABLE entity_extension_time_series DROP COLUMN entityFQN",
+            // field_relationship
+            "ALTER TABLE field_relationship DROP KEY `PRIMARY`, ADD CONSTRAINT `field_relationship_primary` PRIMARY KEY(fromFQNHash, toFQNHash, relation), MODIFY fromFQN VARCHAR(2096) NOT NULL, MODIFY toFQN VARCHAR(2096) NOT NULL",
+            // tag_usage
+            "ALTER TABLE tag_usage DROP INDEX `source`, DROP COLUMN targetFQN, ADD UNIQUE KEY `tag_usage_key` (source, tagFQNHash, targetFQNHash)");
+    performSqlExecutionAndUpdation(this, migrationDAO, handle, queryList);
+  }
+}
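The data migration in between (dataMigrationFQNHashing) backfills the new hash columns from each entity's fullyQualifiedName. The real hash function lives in the server's FQN utilities; as a rough sketch of the idea, assuming an MD5-style hex digest:

    import java.nio.charset.StandardCharsets;
    import java.security.MessageDigest;

    class FqnHashSketch {
      // Illustrative only: derives a short, fixed-length key from an arbitrarily long
      // FQN so it fits the VARCHAR(256) hash columns and can carry a UNIQUE constraint.
      static String hashOf(String fqn) throws Exception {
        byte[] digest = MessageDigest.getInstance("MD5").digest(fqn.getBytes(StandardCharsets.UTF_8));
        StringBuilder hex = new StringBuilder();
        for (byte b : digest) hex.append(String.format("%02x", b));
        return hex.toString();
      }
    }
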
diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/migration/versions/postgres/v110/PostgresMigration.java b/openmetadata-service/src/main/java/org/openmetadata/service/migration/versions/postgres/v110/PostgresMigration.java
new file mode 100644
index 00000000000..a2d166752f7
--- /dev/null
+++ b/openmetadata-service/src/main/java/org/openmetadata/service/migration/versions/postgres/v110/PostgresMigration.java
@@ -0,0 +1,181 @@
+package org.openmetadata.service.migration.versions.postgres.v110;
+
+import static org.openmetadata.service.migration.MigrationUtil.dataMigrationFQNHashing;
+import static org.openmetadata.service.migration.MigrationUtil.performSqlExecutionAndUpdation;
+import static org.openmetadata.service.migration.MigrationUtil.testSuitesMigration;
+
+import java.util.Arrays;
+import java.util.List;
+import lombok.extern.slf4j.Slf4j;
+import org.jdbi.v3.core.Handle;
+import org.openmetadata.service.jdbi3.CollectionDAO;
+import org.openmetadata.service.jdbi3.MigrationDAO;
+import org.openmetadata.service.jdbi3.locator.ConnectionType;
+import org.openmetadata.service.migration.MigrationFile;
+import org.openmetadata.service.migration.api.MigrationStep;
+
+@Slf4j
+@MigrationFile(name = "v110_PostgresMigration")
+@SuppressWarnings("unused")
+public class PostgresMigration implements MigrationStep {
+  private CollectionDAO collectionDAO;
+  private MigrationDAO migrationDAO;
+  private Handle handle;
+
+  @Override
+  public String getMigrationVersion() {
+    return "1.1.0";
+  }
+
+  @Override
+  public String getMigrationFileName() {
+    return "v110_PostgresMigration";
+  }
+
+  @Override
+  public String getFileUuid() {
+    return "98b837ea-5941-4577-bb6d-99ca6a80ed13";
+  }
+
+  @Override
+  public ConnectionType getDatabaseConnectionType() {
+    return ConnectionType.POSTGRES;
+  }
+
+  @Override
+  public void initialize(Handle handle) {
+    this.handle = handle;
+    this.collectionDAO = handle.attach(CollectionDAO.class);
+    this.migrationDAO = handle.attach(MigrationDAO.class);
+  }
+
+  @Override
+  public void preDDL() {
+    // FQNHASH
+    preDDLFQNHashing();
+  }
+
+  @Override
+  public void runDataMigration() {
+    dataMigrationFQNHashing(collectionDAO);
+  }
+
+  @Override
+  public void postDDL() {
+    postDDLFQNHashing();
+    testSuitesMigration(collectionDAO);
+  }
+
+  @Override
+  public void close() {}
+
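+  // Same table set as the MySQL step, in Postgres dialect: DROP CONSTRAINT instead of
+  // DROP KEY, and generated columns use (json ->> 'name') STORED rather than '$.name'.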
+  private void preDDLFQNHashing() {
+    // DDL statements run ahead of the data migration (note: unlike MySQL, Postgres DDL does not implicitly commit)
+    List<String> queryList =
+        Arrays.asList(
+            "ALTER TABLE bot_entity DROP CONSTRAINT bot_entity_name_key, ADD COLUMN nameHash VARCHAR(256)",
+            "ALTER TABLE chart_entity DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256), ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> 'name') STORED NOT NULL",
+            "ALTER TABLE classification DROP CONSTRAINT tag_category_name_key, ADD COLUMN nameHash VARCHAR(256)",
+            "ALTER TABLE storage_container_entity DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256), ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> 'name') STORED NOT NULL",
+            "ALTER TABLE dashboard_data_model_entity DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256), ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> 'name') STORED NOT NULL",
+            "ALTER TABLE dashboard_entity DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256), ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> 'name') STORED NOT NULL",
+            "ALTER TABLE dashboard_service_entity DROP CONSTRAINT dashboard_service_entity_name_key, ADD COLUMN nameHash VARCHAR(256)",
+            "ALTER TABLE data_insight_chart DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256)",
+            "ALTER TABLE database_entity DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256), ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> 'name') STORED NOT NULL",
+            "ALTER TABLE database_schema_entity DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256), ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> 'name') STORED NOT NULL",
+            "ALTER TABLE dbservice_entity DROP CONSTRAINT dbservice_entity_name_key, ADD COLUMN nameHash VARCHAR(256)",
+            "ALTER TABLE event_subscription_entity DROP CONSTRAINT event_subscription_entity_name_key, ADD COLUMN nameHash VARCHAR(256)",
+            "ALTER TABLE glossary_entity DROP CONSTRAINT glossary_entity_name_key, ADD COLUMN nameHash VARCHAR(256)",
+            "ALTER TABLE glossary_term_entity DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256), ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> 'name') STORED NOT NULL",
+            "ALTER TABLE ingestion_pipeline_entity DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256), ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> 'name') STORED NOT NULL",
+            "ALTER TABLE kpi_entity DROP CONSTRAINT kpi_entity_name_key, ADD COLUMN nameHash VARCHAR(256)",
+            "ALTER TABLE messaging_service_entity DROP CONSTRAINT messaging_service_entity_name_key, ADD COLUMN nameHash VARCHAR(256)",
+            "ALTER TABLE metadata_service_entity DROP CONSTRAINT metadata_service_entity_name_key, ADD COLUMN nameHash VARCHAR(256)",
+            "ALTER TABLE metric_entity DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256), ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> 'name') STORED NOT NULL",
+            "ALTER TABLE ml_model_entity DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256), ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> 'name') STORED NOT NULL",
+            "ALTER TABLE mlmodel_service_entity DROP CONSTRAINT mlmodel_service_entity_name_key, ADD COLUMN nameHash VARCHAR(256)",
+            "ALTER TABLE pipeline_entity DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256), ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> 'name') STORED NOT NULL",
+            "ALTER TABLE pipeline_service_entity DROP CONSTRAINT pipeline_service_entity_name_key, ADD COLUMN nameHash VARCHAR(256)",
+            "ALTER TABLE policy_entity DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256), ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> 'name') STORED NOT NULL",
+            "ALTER TABLE query_entity DROP CONSTRAINT query_entity_name_key, ADD COLUMN nameHash VARCHAR(256)",
+            "ALTER TABLE report_entity DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256), ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> 'name') STORED NOT NULL",
+            "ALTER TABLE role_entity DROP CONSTRAINT role_entity_name_key, ADD COLUMN nameHash VARCHAR(256)",
+            "ALTER TABLE storage_service_entity DROP CONSTRAINT storage_service_entity_name_key, ADD COLUMN nameHash VARCHAR(256)",
+            "ALTER TABLE table_entity DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256), ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> 'name') STORED NOT NULL",
+            "ALTER TABLE tag DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256), ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> 'name') STORED NOT NULL",
+            "ALTER TABLE team_entity DROP CONSTRAINT team_entity_name_key, ADD COLUMN nameHash VARCHAR(256)",
+            "ALTER TABLE test_case DROP COLUMN fullyQualifiedName, ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> 'name') STORED NOT NULL, ADD COLUMN fqnHash VARCHAR(256)",
+            "ALTER TABLE test_connection_definition ADD COLUMN nameHash VARCHAR(256), ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> 'name') STORED NOT NULL",
+            "ALTER TABLE test_definition DROP CONSTRAINT test_definition_name_key, ADD COLUMN nameHash VARCHAR(256)",
+            "ALTER TABLE test_suite DROP CONSTRAINT test_suite_name_key, ADD COLUMN nameHash VARCHAR(256)",
+            "ALTER TABLE topic_entity DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256), ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> 'name') STORED NOT NULL",
+            "ALTER TABLE type_entity DROP CONSTRAINT type_entity_name_key, ADD COLUMN nameHash VARCHAR(256)",
+            "ALTER TABLE user_entity DROP CONSTRAINT user_entity_name_key, ADD COLUMN nameHash VARCHAR(256)",
+            "ALTER TABLE web_analytic_event DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256)",
+            "ALTER TABLE automations_workflow DROP CONSTRAINT automations_workflow_name_key, ADD COLUMN nameHash VARCHAR(256)",
+            // field_relationship
+            "DROP INDEX field_relationship_from_index, field_relationship_to_index",
+            "ALTER TABLE field_relationship ADD COLUMN fromFQNHash VARCHAR(256), ADD COLUMN toFQNHash VARCHAR(256)",
+            "CREATE INDEX IF NOT EXISTS field_relationship_from_index ON field_relationship(fromFQNHash, relation)",
+            "CREATE INDEX IF NOT EXISTS field_relationship_to_index ON field_relationship(toFQNHash, relation)",
+            // entity_extension_time_series
+            "ALTER TABLE entity_extension_time_series ADD COLUMN entityFQNHash VARCHAR(256)",
+            // tag_usage
+            "ALTER TABLE tag_usage ADD COLUMN tagFQNHash VARCHAR(256), ADD COLUMN targetFQNHash VARCHAR(256)");
+    performSqlExecutionAndUpdation(this, migrationDAO, handle, queryList);
+  }
+
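+  // Unlike the MySQL step, the new hash columns are added as nullable here; the
+  // NOT NULL constraints are applied in postDDLFQNHashing() via ALTER COLUMN ...
+  // SET NOT NULL once the data migration has backfilled them.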
+  private void postDDLFQNHashing() {
+    // Run outside the data-migration transaction: these statements depend on the backfilled hash data having been committed
+
+    List<String> queryList =
+        Arrays.asList(
+            "ALTER TABLE bot_entity ADD UNIQUE (nameHash), ALTER COLUMN nameHash SET NOT NULL",
+            "ALTER TABLE chart_entity ADD UNIQUE (fqnHash), ALTER COLUMN fqnHash SET NOT NULL",
+            "ALTER TABLE classification ADD UNIQUE (nameHash), ALTER COLUMN nameHash SET NOT NULL",
+            "ALTER TABLE storage_container_entity ADD UNIQUE (fqnHash), ALTER COLUMN fqnHash SET NOT NULL",
+            "ALTER TABLE dashboard_data_model_entity ADD UNIQUE (fqnHash), ALTER COLUMN fqnHash SET NOT NULL",
+            "ALTER TABLE dashboard_entity ADD UNIQUE (fqnHash), ALTER COLUMN fqnHash SET NOT NULL",
+            "ALTER TABLE dashboard_service_entity ADD UNIQUE (nameHash), ALTER COLUMN nameHash SET NOT NULL",
+            "ALTER TABLE data_insight_chart ADD UNIQUE (fqnHash), ALTER COLUMN fqnHash SET NOT NULL",
+            "ALTER TABLE database_entity ADD UNIQUE (fqnHash), ALTER COLUMN fqnHash SET NOT NULL",
+            "ALTER TABLE database_schema_entity ADD UNIQUE (fqnHash), ALTER COLUMN fqnHash SET NOT NULL",
+            "ALTER TABLE dbservice_entity ADD UNIQUE (nameHash), ALTER COLUMN nameHash SET NOT NULL",
+            "ALTER TABLE event_subscription_entity ADD UNIQUE (nameHash), ALTER COLUMN nameHash SET NOT NULL",
+            "ALTER TABLE glossary_entity ADD UNIQUE (nameHash), ALTER COLUMN nameHash SET NOT NULL",
+            "ALTER TABLE glossary_term_entity ADD UNIQUE (fqnHash), ALTER COLUMN fqnHash SET NOT NULL",
+            "ALTER TABLE ingestion_pipeline_entity ADD UNIQUE (fqnHash), ALTER COLUMN fqnHash SET NOT NULL",
+            "ALTER TABLE kpi_entity ADD UNIQUE (nameHash), ALTER COLUMN nameHash SET NOT NULL",
+            "ALTER TABLE messaging_service_entity ADD UNIQUE (nameHash), ALTER COLUMN nameHash SET NOT NULL",
+            "ALTER TABLE metadata_service_entity ADD UNIQUE (nameHash), ALTER COLUMN nameHash SET NOT NULL",
+            "ALTER TABLE metric_entity ADD UNIQUE (fqnHash), ALTER COLUMN fqnHash SET NOT NULL",
+            "ALTER TABLE ml_model_entity ADD UNIQUE (fqnHash), ALTER COLUMN fqnHash SET NOT NULL",
+            "ALTER TABLE mlmodel_service_entity ADD UNIQUE (nameHash), ALTER COLUMN nameHash SET NOT NULL",
+            "ALTER TABLE pipeline_entity ADD UNIQUE (fqnHash), ALTER COLUMN fqnHash SET NOT NULL",
+            "ALTER TABLE pipeline_service_entity ADD UNIQUE (nameHash), ALTER COLUMN nameHash SET NOT NULL",
+            "ALTER TABLE policy_entity ADD UNIQUE (fqnHash), ALTER COLUMN fqnHash SET NOT NULL",
+            "ALTER TABLE query_entity ADD UNIQUE (nameHash), ALTER COLUMN nameHash SET NOT NULL",
+            "ALTER TABLE report_entity ADD UNIQUE (fqnHash), ALTER COLUMN fqnHash SET NOT NULL",
+            "ALTER TABLE role_entity ADD UNIQUE (nameHash), ALTER COLUMN nameHash SET NOT NULL",
+            "ALTER TABLE storage_service_entity ADD UNIQUE (nameHash), ALTER COLUMN nameHash SET NOT NULL",
+            "ALTER TABLE table_entity ADD UNIQUE (fqnHash), ALTER COLUMN fqnHash SET NOT NULL",
+            "ALTER TABLE tag ADD UNIQUE (fqnHash), ALTER COLUMN fqnHash SET NOT NULL",
+            "ALTER TABLE team_entity ADD UNIQUE (nameHash), ALTER COLUMN nameHash SET NOT NULL",
+            "ALTER TABLE test_case ADD UNIQUE (fqnHash), ALTER COLUMN fqnHash SET NOT NULL",
+            "ALTER TABLE test_connection_definition ADD UNIQUE (nameHash), ALTER COLUMN nameHash SET NOT NULL",
+            "ALTER TABLE test_definition ADD UNIQUE (nameHash), ALTER COLUMN nameHash SET NOT NULL",
+            "ALTER TABLE test_suite ADD UNIQUE (nameHash), ALTER COLUMN nameHash SET NOT NULL",
+            "ALTER TABLE topic_entity ADD UNIQUE (fqnHash), ALTER COLUMN fqnHash SET NOT NULL",
+            "ALTER TABLE type_entity ADD UNIQUE (nameHash), ALTER COLUMN nameHash SET NOT NULL",
+            "ALTER TABLE user_entity ADD UNIQUE (nameHash), ALTER COLUMN nameHash SET NOT NULL",
+            "ALTER TABLE web_analytic_event ADD UNIQUE (fqnHash), ALTER COLUMN fqnHash SET NOT NULL",
+            "ALTER TABLE automations_workflow ADD UNIQUE (nameHash), ALTER COLUMN nameHash SET NOT NULL",
+            // field_relationship
+            "ALTER TABLE field_relationship DROP CONSTRAINT field_relationship_pkey, ADD CONSTRAINT field_relationship_pkey PRIMARY KEY(fromFQNHash, toFQNHash, relation), ALTER fromFQN TYPE VARCHAR(2096), ALTER toFQN TYPE VARCHAR(2096)",
+            // entity_extension_time_series
+            "ALTER TABLE entity_extension_time_series DROP COLUMN entityFQN, ALTER COLUMN entityFQNHash SET NOT NULL",
+            // tag_usage
+            "ALTER TABLE tag_usage DROP CONSTRAINT tag_usage_source_tagfqn_targetfqn_key, DROP COLUMN targetFQN, ADD UNIQUE (source, tagFQNHash, targetFQNHash)");
+    performSqlExecutionAndUpdation(this, migrationDAO, handle, queryList);
+  }
+}
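The two migration steps are intentionally symmetric; only the SQL dialect differs. For reference, the same generated name column as written in the statements above (string literals copied from the two files):

    // MySQL: JSON-path extraction; generated columns are VIRTUAL by default.
    String mysqlDdl = "ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> '$.name') NOT NULL";
    // Postgres: key extraction; generated columns must be declared STORED.
    String postgresDdl = "ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> 'name') STORED NOT NULL";
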
diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/resources/databases/DatasourceConfig.java b/openmetadata-service/src/main/java/org/openmetadata/service/resources/databases/DatasourceConfig.java
index 397f9c56291..cafdeb38fd9 100644
--- a/openmetadata-service/src/main/java/org/openmetadata/service/resources/databases/DatasourceConfig.java
+++ b/openmetadata-service/src/main/java/org/openmetadata/service/resources/databases/DatasourceConfig.java
@@ -24,4 +24,14 @@ public class DatasourceConfig {
   public Boolean isMySQL() {
     return ConnectionType.MYSQL.label.equals(dataSourceFactory.getDriverClass());
   }
+
+  public ConnectionType getDatabaseConnectionType() {
+    if (ConnectionType.MYSQL.label.equals(dataSourceFactory.getDriverClass())) {
+      return ConnectionType.MYSQL;
+    } else {
+      // Only MySQL and Postgres are currently supported, so there is no separate label check for
+      // Postgres: it is either one or the other
+      return ConnectionType.POSTGRES;
+    }
+  }
 }
diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/util/TablesInitializer.java b/openmetadata-service/src/main/java/org/openmetadata/service/util/TablesInitializer.java
index f06276a84e5..3eaec5d3553 100644
--- a/openmetadata-service/src/main/java/org/openmetadata/service/util/TablesInitializer.java
+++ b/openmetadata-service/src/main/java/org/openmetadata/service/util/TablesInitializer.java
@@ -29,7 +29,10 @@ import java.sql.Connection;
 import java.sql.DatabaseMetaData;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Scanner;
+import java.util.Set;
 import javax.validation.Validator;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
@@ -47,23 +50,35 @@ import org.openmetadata.service.elasticsearch.ElasticSearchIndexDefinition;
 import org.openmetadata.service.fernet.Fernet;
 import org.openmetadata.service.jdbi3.CollectionDAO;
 import org.openmetadata.service.jdbi3.locator.ConnectionAwareAnnotationSqlLocator;
+import org.openmetadata.service.migration.MigrationFile;
+import org.openmetadata.service.migration.api.MigrationStep;
+import org.openmetadata.service.migration.api.MigrationWorkflow;
+import org.openmetadata.service.resources.databases.DatasourceConfig;
 import org.openmetadata.service.search.IndexUtil;
 import org.openmetadata.service.search.SearchClient;
 import org.openmetadata.service.secrets.SecretsManagerFactory;
+import org.reflections.Reflections;
 
 public final class TablesInitializer {
   private static final String DEBUG_MODE_ENABLED = "debug_mode";
   private static final String OPTION_SCRIPT_ROOT_PATH = "script-root";
   private static final String OPTION_CONFIG_FILE_PATH = "config";
+  private static final String OPTION_IGNORE_SERVER_FILE_CHECKSUM = "ignoreCheckSum";
   private static final String DISABLE_VALIDATE_ON_MIGRATE = "disable-validate-on-migrate";
   private static final Options OPTIONS;
   private static boolean debugMode = false;
+  private static boolean ignoreServerFileChecksum = false;
 
   static {
     OPTIONS = new Options();
     OPTIONS.addOption("debug", DEBUG_MODE_ENABLED, false, "Enable Debug Mode");
     OPTIONS.addOption("s", OPTION_SCRIPT_ROOT_PATH, true, "Root directory of script path");
     OPTIONS.addOption("c", OPTION_CONFIG_FILE_PATH, true, "Config file path");
+    OPTIONS.addOption(
+        "ignoreCheckSum",
+        OPTION_IGNORE_SERVER_FILE_CHECKSUM,
+        true,
+        "Ignore the server checksum and rerun the same file during migrate");
     OPTIONS.addOption(null, SchemaMigrationOption.CREATE.toString(), false, "Run sql migrations from scratch");
     OPTIONS.addOption(null, SchemaMigrationOption.DROP.toString(), false, "Drop all the tables in the target database");
     OPTIONS.addOption(
@@ -110,6 +125,9 @@ public final class TablesInitializer {
     if (commandLine.hasOption(DEBUG_MODE_ENABLED)) {
       debugMode = true;
     }
+    if (commandLine.hasOption(OPTION_IGNORE_SERVER_FILE_CHECKSUM)) {
+      ignoreServerFileChecksum = Boolean.parseBoolean(commandLine.getOptionValue(OPTION_IGNORE_SERVER_FILE_CHECKSUM));
+    }
     boolean isSchemaMigrationOptionSpecified = false;
     SchemaMigrationOption schemaMigrationOptionSpecified = null;
     for (SchemaMigrationOption schemaMigrationOption : SchemaMigrationOption.values()) {
@@ -259,6 +277,8 @@ public final class TablesInitializer {
         break;
       case MIGRATE:
         flyway.migrate();
+        // Validate and run system data migrations
+        validateAndRunSystemDataMigrations(jdbi, config, ignoreServerFileChecksum);
         break;
       case INFO:
         printToConsoleMandatory(dumpToAsciiTable(flyway.info().all()));
@@ -308,6 +328,37 @@ public final class TablesInitializer {
     }
   }
 
+  private static void validateAndRunSystemDataMigrations(
+      Jdbi jdbi, OpenMetadataApplicationConfig config, boolean ignoreFileChecksum) {
+    DatasourceConfig.initialize(config);
+    List<MigrationStep> loadedMigrationFiles = getServerMigrationFiles();
+    MigrationWorkflow workflow =
+        new MigrationWorkflow(
+            jdbi, DatasourceConfig.getInstance().getDatabaseConnectionType(), loadedMigrationFiles, ignoreFileChecksum);
+    workflow.runMigrationWorkflows();
+  }
+
+  private static List<MigrationStep> getServerMigrationFiles() {
+    List<MigrationStep> migrations = new ArrayList<>();
+    try {
+      String prefix =
+          Boolean.TRUE.equals(DatasourceConfig.getInstance().isMySQL())
+              ? "org.openmetadata.service.migration.versions.mysql"
+              : "org.openmetadata.service.migration.versions.postgres";
+      Reflections reflections = new Reflections(prefix);
+      Set<Class<?>> migrationClasses = reflections.getTypesAnnotatedWith(MigrationFile.class);
+      for (Class<?> clazz : migrationClasses) {
+        MigrationStep step =
+            Class.forName(clazz.getCanonicalName()).asSubclass(MigrationStep.class).getConstructor().newInstance();
+        migrations.add(step);
+      }
+    } catch (Exception ex) {
+      printToConsoleMandatory("Failure in listing system migration files");
+      throw new RuntimeException(ex);
+    }
+    return migrations;
+  }
+
   private static void printError(String message) {
     System.err.println(message);
   }
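Taken together with the checksum filter at the top of MigrationWorkflow, passing -ignoreCheckSum true on the migrate command lets an operator force a re-run of a migration file whose recorded checksum already matches. A simplified sketch of that gating decision (hypothetical helper shape, not the actual server code):

    // Runs a step when it was never recorded, when its file checksum changed,
    // or when the operator passed -ignoreCheckSum true to the migrate command.
    static boolean shouldRun(String recordedChecksum, MigrationStep step, boolean ignoreChecksum) {
      if (recordedChecksum == null) return true; // never ran on this server
      return ignoreChecksum || !recordedChecksum.equals(step.getFileUuid());
    }
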