interfaces for db migrations (#12057)

* interfaces for db migrations

* Add Data Migration for entities to 1.1

* Migration for FQN hash

* Added Tag Usage and fixed some transaction issues in testing

* Added SERVER_CHANGE_LOG validations

* Update Path for files

* Added Postgres Migration

* Test Suite Migration [WIP]

* remove ingestion pipeline

* moved Migration to Table Initializer

* init Data source config in TableInitializer

* make checksum ignore configurable

---------

Co-authored-by: mohitdeuex <mohit.y@deuexsolutions.com>
This commit is contained in:
Sriharsha Chintalapani 2023-06-26 00:33:44 -07:00 committed by GitHub
parent 29d3de2520
commit 33c752d2f5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 1266 additions and 179 deletions

View File

@ -125,95 +125,6 @@ where serviceType in ('Postgres', 'Mysql');
-- Clean old test connections
TRUNCATE automations_workflow;
-- add fullyQualifiedName hash and remove existing columns
-- update the OM system tables
ALTER TABLE field_relationship DROP KEY `PRIMARY`, ADD COLUMN fromFQNHash VARCHAR(256), ADD COLUMN toFQNHash VARCHAR(256),
DROP INDEX from_index, DROP INDEX to_index, ADD INDEX from_fqnhash_index(fromFQNHash, relation), ADD INDEX to_fqnhash_index(toFQNHash, relation),
ADD CONSTRAINT `field_relationship_primary` PRIMARY KEY(fromFQNHash, toFQNHash, relation), MODIFY fromFQN VARCHAR(2096) NOT NULL,
MODIFY toFQN VARCHAR(2096) NOT NULL;
-- entityFQN is replaced by its hash; backfilled later by the Java data migration
ALTER TABLE entity_extension_time_series DROP COLUMN entityFQN, ADD COLUMN entityFQNHash VARCHAR(256) NOT NULL;
ALTER TABLE type_entity DROP KEY `name`, ADD COLUMN nameHash VARCHAR(256) NOT NULL, ADD UNIQUE (nameHash);
ALTER TABLE event_subscription_entity DROP KEY `name`, ADD COLUMN nameHash VARCHAR(256) NOT NULL, ADD UNIQUE (nameHash);
ALTER TABLE test_definition DROP KEY `name`, ADD COLUMN nameHash VARCHAR(256) NOT NULL, ADD UNIQUE (nameHash);
ALTER TABLE test_suite DROP KEY `name`, ADD COLUMN nameHash VARCHAR(256) NOT NULL, ADD UNIQUE (nameHash);
ALTER TABLE test_case DROP COLUMN fullyQualifiedName, ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> '$.name') NOT NULL,
ADD COLUMN fqnHash VARCHAR(256) NOT NULL, ADD UNIQUE (fqnHash);
ALTER TABLE web_analytic_event DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256) NOT NULL, ADD UNIQUE (fqnHash);
ALTER TABLE data_insight_chart DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256) NOT NULL, ADD UNIQUE (fqnHash);
ALTER TABLE kpi_entity DROP KEY `name`, ADD COLUMN nameHash VARCHAR(256) NOT NULL, ADD UNIQUE (nameHash);
-- fixed: statement previously ended with ';;', which produces an empty statement for splitters that run each ';'-terminated chunk
ALTER TABLE classification DROP KEY `name`, ADD COLUMN nameHash VARCHAR(256) NOT NULL, ADD UNIQUE (nameHash);
ALTER TABLE glossary_term_entity DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256) NOT NULL, ADD UNIQUE (fqnHash),
ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> '$.name') NOT NULL;
ALTER TABLE tag DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256) NOT NULL, ADD UNIQUE (fqnHash),
ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> '$.name') NOT NULL;
ALTER TABLE tag_usage DROP index `source`, DROP COLUMN targetFQN, ADD COLUMN tagFQNHash VARCHAR(256), ADD COLUMN targetFQNHash VARCHAR(256),
ADD UNIQUE KEY `tag_usage_key` (source, tagFQNHash, targetFQNHash);
ALTER TABLE policy_entity DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256) NOT NULL, ADD UNIQUE (fqnHash),
ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> '$.name') NOT NULL;
ALTER TABLE role_entity DROP KEY `name`, ADD COLUMN nameHash VARCHAR(256) NOT NULL, ADD UNIQUE (nameHash);
ALTER TABLE automations_workflow DROP KEY `name`, ADD COLUMN nameHash VARCHAR(256) NOT NULL, ADD UNIQUE (nameHash);
ALTER TABLE test_connection_definition ADD COLUMN nameHash VARCHAR(256) NOT NULL, ADD UNIQUE (nameHash),
ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> '$.name') NOT NULL;
-- update services
-- service tables keep the plain-text unique `name`; it is replaced by a unique hash (nameHash)
-- so that names longer than the index limit can still be stored; backfilled by the Java migration
ALTER TABLE dbservice_entity DROP KEY `name`, ADD COLUMN nameHash VARCHAR(256) NOT NULL, ADD UNIQUE (nameHash);
ALTER TABLE messaging_service_entity DROP KEY `name`, ADD COLUMN nameHash VARCHAR(256) NOT NULL, ADD UNIQUE (nameHash);
ALTER TABLE dashboard_service_entity DROP KEY `name`, ADD COLUMN nameHash VARCHAR(256) NOT NULL, ADD UNIQUE (nameHash);
ALTER TABLE pipeline_service_entity DROP KEY `name`, ADD COLUMN nameHash VARCHAR(256) NOT NULL, ADD UNIQUE (nameHash);
ALTER TABLE storage_service_entity DROP KEY `name`, ADD COLUMN nameHash VARCHAR(256) NOT NULL, ADD UNIQUE (nameHash);
ALTER TABLE metadata_service_entity DROP KEY `name`, ADD COLUMN nameHash VARCHAR(256) NOT NULL, ADD UNIQUE (nameHash);
ALTER TABLE mlmodel_service_entity DROP KEY `name`, ADD COLUMN nameHash VARCHAR(256) NOT NULL, ADD UNIQUE (nameHash);
-- all entity tables
-- entity tables drop the stored fullyQualifiedName in favor of a unique fqnHash, and add a
-- generated `name` column extracted from the JSON payload (MySQL JSON ->> '$.name')
ALTER TABLE database_entity DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256) NOT NULL, ADD UNIQUE (fqnHash),
ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> '$.name') NOT NULL;
ALTER TABLE database_schema_entity DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256) NOT NULL, ADD UNIQUE (fqnHash),
ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> '$.name') NOT NULL;
ALTER TABLE table_entity DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256) NOT NULL, ADD UNIQUE (fqnHash),
ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> '$.name') NOT NULL;
ALTER TABLE metric_entity DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256) NOT NULL, ADD UNIQUE (fqnHash),
ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> '$.name') NOT NULL;
ALTER TABLE report_entity DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256) NOT NULL, ADD UNIQUE (fqnHash),
ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> '$.name') NOT NULL;
ALTER TABLE dashboard_entity DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256) NOT NULL, ADD UNIQUE (fqnHash),
ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> '$.name') NOT NULL;
ALTER TABLE chart_entity DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256) NOT NULL, ADD UNIQUE (fqnHash),
ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> '$.name') NOT NULL;
ALTER TABLE ml_model_entity DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256) NOT NULL, ADD UNIQUE (fqnHash),
ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> '$.name') NOT NULL;
ALTER TABLE pipeline_entity DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256) NOT NULL, ADD UNIQUE (fqnHash),
ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> '$.name') NOT NULL;
ALTER TABLE topic_entity DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256) NOT NULL, ADD UNIQUE (fqnHash),
ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> '$.name') NOT NULL;
ALTER TABLE ingestion_pipeline_entity DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256) NOT NULL, ADD UNIQUE (fqnHash),
ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> '$.name') NOT NULL;
ALTER TABLE storage_container_entity DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256) NOT NULL, ADD UNIQUE (fqnHash),
ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> '$.name') NOT NULL;
ALTER TABLE dashboard_data_model_entity DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256) NOT NULL, ADD UNIQUE (fqnHash),
ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> '$.name') NOT NULL;
-- these tables already expose a `name` column; only the unique key moves to nameHash
ALTER TABLE query_entity DROP KEY `name`, ADD COLUMN nameHash VARCHAR(256) NOT NULL, ADD UNIQUE (nameHash);
ALTER TABLE team_entity DROP KEY `name`, ADD COLUMN nameHash VARCHAR(256) NOT NULL, ADD UNIQUE (nameHash);
ALTER TABLE user_entity DROP KEY `name`, ADD COLUMN nameHash VARCHAR(256) NOT NULL, ADD UNIQUE (nameHash);
ALTER TABLE bot_entity DROP KEY `name`, ADD COLUMN nameHash VARCHAR(256) NOT NULL, ADD UNIQUE (nameHash);
ALTER TABLE glossary_entity DROP KEY `name`, ADD COLUMN nameHash VARCHAR(256) NOT NULL, ADD UNIQUE (nameHash);
-- Remove sourceUrl in pipeline_entity from DatabricksPipeline & Fivetran
UPDATE pipeline_entity
SET json = JSON_REMOVE(json, '$.sourceUrl')
@ -223,3 +134,18 @@ WHERE JSON_EXTRACT(json, '$.serviceType') in ('DatabricksPipeline','Fivetran');
-- Remove sourceUrl from Mode dashboards (same cleanup as the pipeline_entity update above)
UPDATE dashboard_entity
SET json = JSON_REMOVE(json, '$.sourceUrl')
WHERE JSON_EXTRACT(json, '$.serviceType') in ('Mode');
-- Bookkeeping for server-side data migrations: one row per applied migration version.
-- `checksum` lets the server detect when a migration file changed after it was applied.
CREATE TABLE IF NOT EXISTS SERVER_CHANGE_LOG (
    installed_rank SERIAL,
    version VARCHAR(256) PRIMARY KEY,
    migrationFileName VARCHAR(256) NOT NULL,
    checksum VARCHAR(256) NOT NULL,
    installed_on TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- Log of individual SQL statements executed by a migration, keyed by statement checksum
-- so re-runs can skip statements that already succeeded.
CREATE TABLE IF NOT EXISTS SERVER_MIGRATION_SQL_LOGS (
    version VARCHAR(256) NOT NULL,
    sqlStatement VARCHAR(10000) NOT NULL,
    checksum VARCHAR(256) PRIMARY KEY,
    executedAt TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

View File

@ -87,92 +87,6 @@ WHERE serviceType IN ('Postgres', 'Mysql')
-- Clean old test connections
TRUNCATE automations_workflow;
-- Indexes must be dropped before the columns they cover are re-keyed; recreated on the hash columns below
DROP INDEX field_relationship_from_index, field_relationship_to_index;
ALTER TABLE field_relationship DROP CONSTRAINT field_relationship_pkey, ADD COLUMN fromFQNHash VARCHAR(256), ADD COLUMN toFQNHash VARCHAR(256),
ADD CONSTRAINT field_relationship_pkey PRIMARY KEY(fromFQNHash, toFQNHash, relation),
ALTER fromFQN TYPE VARCHAR(2096), ALTER toFQN TYPE VARCHAR(2096);
CREATE INDEX IF NOT EXISTS field_relationship_from_index ON field_relationship(fromFQNHash, relation);
CREATE INDEX IF NOT EXISTS field_relationship_to_index ON field_relationship(toFQNHash, relation);
-- entityFQN is replaced by its hash; backfilled later by the Java data migration
ALTER TABLE entity_extension_time_series DROP COLUMN entityFQN, ADD COLUMN entityFQNHash VARCHAR(256) NOT NULL;
ALTER TABLE type_entity DROP CONSTRAINT type_entity_name_key, ADD COLUMN nameHash VARCHAR(256) NOT NULL, ADD UNIQUE (nameHash);
ALTER TABLE event_subscription_entity DROP CONSTRAINT event_subscription_entity_name_key, ADD COLUMN nameHash VARCHAR(256) NOT NULL, ADD UNIQUE (nameHash);
ALTER TABLE test_definition DROP CONSTRAINT test_definition_name_key, ADD COLUMN nameHash VARCHAR(256) NOT NULL, ADD UNIQUE (nameHash);
ALTER TABLE test_suite DROP CONSTRAINT test_suite_name_key, ADD COLUMN nameHash VARCHAR(256) NOT NULL, ADD UNIQUE (nameHash);
ALTER TABLE test_case DROP COLUMN fullyQualifiedName, ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> 'name') STORED NOT NULL,
ADD COLUMN fqnHash VARCHAR(256) NOT NULL, ADD UNIQUE (fqnHash);
ALTER TABLE web_analytic_event DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256) NOT NULL, ADD UNIQUE (fqnHash);
ALTER TABLE data_insight_chart DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256) NOT NULL, ADD UNIQUE (fqnHash);
ALTER TABLE kpi_entity DROP CONSTRAINT kpi_entity_name_key, ADD COLUMN nameHash VARCHAR(256) NOT NULL, ADD UNIQUE (nameHash);
-- fixed: statement previously ended with ';;', which produces an empty statement for splitters that run each ';'-terminated chunk
ALTER TABLE classification DROP CONSTRAINT tag_category_name_key, ADD COLUMN nameHash VARCHAR(256) NOT NULL, ADD UNIQUE (nameHash);
ALTER TABLE glossary_term_entity DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256) NOT NULL, ADD UNIQUE (fqnHash),
ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> 'name') STORED NOT NULL;
ALTER TABLE tag DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256) NOT NULL, ADD UNIQUE (fqnHash),
ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> 'name') STORED NOT NULL;
ALTER TABLE tag_usage DROP CONSTRAINT tag_usage_source_tagfqn_targetfqn_key, DROP COLUMN targetFQN, ADD COLUMN tagFQNHash VARCHAR(256), ADD COLUMN targetFQNHash VARCHAR(256),
ADD UNIQUE (source, tagFQNHash, targetFQNHash);
ALTER TABLE policy_entity DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256) NOT NULL, ADD UNIQUE (fqnHash),
ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> 'name') STORED NOT NULL;
ALTER TABLE role_entity DROP CONSTRAINT role_entity_name_key, ADD COLUMN nameHash VARCHAR(256) NOT NULL, ADD UNIQUE (nameHash);
ALTER TABLE automations_workflow DROP CONSTRAINT automations_workflow_name_key, ADD COLUMN nameHash VARCHAR(256) NOT NULL, ADD UNIQUE (nameHash);
ALTER TABLE test_connection_definition ADD COLUMN nameHash VARCHAR(256) NOT NULL, ADD UNIQUE (nameHash),
ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> 'name') STORED NOT NULL;
-- update services
-- service tables: the unique constraint on plain-text name is replaced by a unique nameHash;
-- hashes are backfilled by the Java data migration
ALTER TABLE dbservice_entity DROP CONSTRAINT dbservice_entity_name_key, ADD COLUMN nameHash VARCHAR(256) NOT NULL, ADD UNIQUE (nameHash);
ALTER TABLE messaging_service_entity DROP CONSTRAINT messaging_service_entity_name_key, ADD COLUMN nameHash VARCHAR(256) NOT NULL, ADD UNIQUE (nameHash);
ALTER TABLE dashboard_service_entity DROP CONSTRAINT dashboard_service_entity_name_key, ADD COLUMN nameHash VARCHAR(256) NOT NULL, ADD UNIQUE (nameHash);
ALTER TABLE pipeline_service_entity DROP CONSTRAINT pipeline_service_entity_name_key, ADD COLUMN nameHash VARCHAR(256) NOT NULL, ADD UNIQUE (nameHash);
ALTER TABLE storage_service_entity DROP CONSTRAINT storage_service_entity_name_key, ADD COLUMN nameHash VARCHAR(256) NOT NULL, ADD UNIQUE (nameHash);
ALTER TABLE metadata_service_entity DROP CONSTRAINT metadata_service_entity_name_key, ADD COLUMN nameHash VARCHAR(256) NOT NULL, ADD UNIQUE (nameHash);
ALTER TABLE mlmodel_service_entity DROP CONSTRAINT mlmodel_service_entity_name_key, ADD COLUMN nameHash VARCHAR(256) NOT NULL, ADD UNIQUE (nameHash);
-- all entity tables
-- entity tables drop the stored fullyQualifiedName in favor of a unique fqnHash, and add a
-- STORED generated `name` column extracted from the jsonb payload (json ->> 'name')
ALTER TABLE database_entity DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256) NOT NULL, ADD UNIQUE (fqnHash),
ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> 'name') STORED NOT NULL;
ALTER TABLE database_schema_entity DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256) NOT NULL, ADD UNIQUE (fqnHash),
ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> 'name') STORED NOT NULL;
ALTER TABLE table_entity DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256) NOT NULL, ADD UNIQUE (fqnHash),
ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> 'name') STORED NOT NULL;
ALTER TABLE metric_entity DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256) NOT NULL, ADD UNIQUE (fqnHash),
ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> 'name') STORED NOT NULL;
ALTER TABLE report_entity DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256) NOT NULL, ADD UNIQUE (fqnHash),
ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> 'name') STORED NOT NULL;
ALTER TABLE dashboard_entity DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256) NOT NULL, ADD UNIQUE (fqnHash),
ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> 'name') STORED NOT NULL;
ALTER TABLE chart_entity DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256) NOT NULL, ADD UNIQUE (fqnHash),
ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> 'name') STORED NOT NULL;
ALTER TABLE ml_model_entity DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256) NOT NULL, ADD UNIQUE (fqnHash),
ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> 'name') STORED NOT NULL;
ALTER TABLE pipeline_entity DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256) NOT NULL, ADD UNIQUE (fqnHash),
ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> 'name') STORED NOT NULL;
ALTER TABLE topic_entity DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256) NOT NULL, ADD UNIQUE (fqnHash),
ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> 'name') STORED NOT NULL;
ALTER TABLE ingestion_pipeline_entity DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256) NOT NULL, ADD UNIQUE (fqnHash),
ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> 'name') STORED NOT NULL;
ALTER TABLE storage_container_entity DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256) NOT NULL, ADD UNIQUE (fqnHash),
ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> 'name') STORED NOT NULL;
ALTER TABLE dashboard_data_model_entity DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256) NOT NULL, ADD UNIQUE (fqnHash),
ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> 'name') STORED NOT NULL;
-- NOTE(review): unlike the MySQL file, query_entity does not drop a name constraint here — confirm intentional
ALTER TABLE query_entity ADD COLUMN nameHash VARCHAR(256) NOT NULL, ADD UNIQUE (nameHash);
ALTER TABLE team_entity DROP CONSTRAINT team_entity_name_key, ADD COLUMN nameHash VARCHAR(256) NOT NULL, ADD UNIQUE (nameHash);
ALTER TABLE user_entity DROP CONSTRAINT user_entity_name_key, ADD COLUMN nameHash VARCHAR(256) NOT NULL, ADD UNIQUE (nameHash);
ALTER TABLE bot_entity DROP CONSTRAINT bot_entity_name_key, ADD COLUMN nameHash VARCHAR(256) NOT NULL, ADD UNIQUE (nameHash);
ALTER TABLE glossary_entity DROP CONSTRAINT glossary_entity_name_key, ADD COLUMN nameHash VARCHAR(256) NOT NULL, ADD UNIQUE (nameHash);
-- Remove sourceUrl in pipeline_entity from DatabricksPipeline & Fivetran
UPDATE pipeline_entity
SET json = json::jsonb #- '{sourceUrl}'
@ -183,3 +97,18 @@ where json #> '{serviceType}' in ('"DatabricksPipeline"','"Fivetran"');
-- Remove sourceUrl from Mode dashboards (same cleanup as the pipeline_entity update above)
UPDATE dashboard_entity
SET json = json::jsonb #- '{sourceUrl}'
where json #> '{serviceType}' in ('"Mode"');
-- Bookkeeping for server-side data migrations: one row per applied migration version.
-- Unquoted identifiers fold to lowercase in Postgres (table is actually server_change_log).
CREATE TABLE IF NOT EXISTS SERVER_CHANGE_LOG (
    installed_rank SERIAL,
    version VARCHAR(256) PRIMARY KEY,
    migrationFileName VARCHAR(256) NOT NULL,
    checksum VARCHAR(256) NOT NULL,
    installed_on TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- Log of individual SQL statements executed by a migration, keyed by statement checksum
-- so re-runs can skip statements that already succeeded.
CREATE TABLE IF NOT EXISTS SERVER_MIGRATION_SQL_LOGS (
    version VARCHAR(256) NOT NULL,
    sqlStatement VARCHAR(10000) NOT NULL,
    checksum VARCHAR(256) PRIMARY KEY,
    executedAt TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

View File

@ -130,6 +130,9 @@ public class OpenMetadataApplication extends Application<OpenMetadataApplication
NoSuchAlgorithmException {
validateConfiguration(catalogConfig);
// init for dataSourceFactory
DatasourceConfig.initialize(catalogConfig);
ChangeEventConfig.initialize(catalogConfig);
final Jdbi jdbi = createAndSetupJDBI(environment, catalogConfig.getDataSourceFactory());
@ -163,9 +166,6 @@ public class OpenMetadataApplication extends Application<OpenMetadataApplication
// Register Authenticator
registerAuthenticator(catalogConfig);
// init for dataSourceFactory
DatasourceConfig.initialize(catalogConfig);
// Unregister dropwizard default exception mappers
((DefaultServerFactory) catalogConfig.getServerFactory()).setRegisterDefaultExceptionMappers(false);
environment.jersey().property(ServerProperties.RESPONSE_SET_STATUS_OVER_SEND_ERROR, true);

View File

@ -34,6 +34,7 @@ import java.util.UUID;
import java.util.stream.Collectors;
import lombok.Builder;
import lombok.Getter;
import lombok.Setter;
import org.apache.commons.lang3.tuple.Triple;
import org.jdbi.v3.core.mapper.RowMapper;
import org.jdbi.v3.core.statement.StatementContext;
@ -1197,6 +1198,31 @@ public interface CollectionDAO {
@Bind("jsonSchema") String jsonSchema,
@Bind("json") String json);
/**
 * Migration-only helper: backfills the new fromFQNHash/toFQNHash columns on existing
 * field_relationship rows. Marked deprecated for consistency with the matching
 * tag_usage upsertFQNHash, which carries @Deprecated(since = "Release 1.1").
 * NOTE(review): the Postgres ON CONFLICT target (fromFQN, toFQN, relation) refers to the
 * pre-migration primary key — confirm this runs before the PK is moved to the hash columns.
 */
@Deprecated(since = "Release 1.1")
@ConnectionAwareSqlUpdate(
    value =
        "INSERT INTO field_relationship(fromFQNHash, toFQNHash, fromFQN, toFQN, fromType, toType, relation, jsonSchema, json) "
            + "VALUES (:fromFQNHash, :toFQNHash, :fromFQN, :toFQN, :fromType, :toType, :relation, :jsonSchema, :json) "
            + "ON DUPLICATE KEY UPDATE fromFQNHash = :fromFQNHash,"
            + "toFQNHash = :toFQNHash",
    connectionType = MYSQL)
@ConnectionAwareSqlUpdate(
    value =
        "INSERT INTO field_relationship(fromFQNHash, toFQNHash, fromFQN, toFQN, fromType, toType, relation, jsonSchema, json) "
            + "VALUES (:fromFQNHash, :toFQNHash, :fromFQN, :toFQN, :fromType, :toType, :relation, :jsonSchema, (:json :: jsonb)) "
            + "ON CONFLICT (fromFQN, toFQN, relation) DO UPDATE SET fromFQNHash = EXCLUDED.fromFQNHash,"
            + "toFQNHash = EXCLUDED.toFQNHash",
    connectionType = POSTGRES)
void upsertFQNHash(
    @Bind("fromFQNHash") String fromFQNHash,
    @Bind("toFQNHash") String toFQNHash,
    @Bind("fromFQN") String fromFQN,
    @Bind("toFQN") String toFQN,
    @Bind("fromType") String fromType,
    @Bind("toType") String toType,
    @Bind("relation") int relation,
    @Bind("jsonSchema") String jsonSchema,
    @Bind("json") String json);
@SqlQuery(
"SELECT json FROM field_relationship WHERE "
+ "fromFQNHash = :fromFQNHash AND toFQNHash = :toFQNHash AND fromType = :fromType "
@ -1219,6 +1245,10 @@ public interface CollectionDAO {
@Bind("toType") String toType,
@Bind("relation") int relation);
// Migration-only: loads every field_relationship row so the new FQN hash columns can be
// backfilled. NOTE(review): unbounded table scan — acceptable only as a one-time migration step.
@SqlQuery("SELECT * FROM field_relationship")
@RegisterRowMapper(FieldRelationShipMapper.class)
List<FieldRelationship> listAll();
@SqlQuery(
"SELECT fromFQN, toFQN, json FROM field_relationship WHERE "
+ "fromFQNHash = :fqnHash AND fromType = :type AND toType = :otherType AND relation = :relation "
@ -1272,6 +1302,37 @@ public interface CollectionDAO {
return Triple.of(rs.getString("fromFQN"), rs.getString("toFQN"), rs.getString("json"));
}
}
/** Maps a raw field_relationship row onto the {@link FieldRelationship} migration DTO. */
class FieldRelationShipMapper implements RowMapper<FieldRelationship> {
  @Override
  public FieldRelationship map(ResultSet rs, StatementContext ctx) throws SQLException {
    FieldRelationship relationship = new FieldRelationship();
    // Endpoint identifiers: raw FQNs plus their hash counterparts.
    relationship.setFromFQN(rs.getString("fromFQN"));
    relationship.setToFQN(rs.getString("toFQN"));
    relationship.setFromFQNHash(rs.getString("fromFQNHash"));
    relationship.setToFQNHash(rs.getString("toFQNHash"));
    // Relationship metadata and payload.
    relationship.setFromType(rs.getString("fromType"));
    relationship.setToType(rs.getString("toType"));
    relationship.setRelation(rs.getInt("relation"));
    relationship.setJsonSchema(rs.getString("jsonSchema"));
    relationship.setJson(rs.getString("json"));
    return relationship;
  }
}
// In-memory snapshot of a field_relationship row; used only while backfilling FQN hashes.
@Getter
@Setter
class FieldRelationship {
private String fromFQNHash; // hash of fromFQN (new column)
private String toFQNHash; // hash of toFQN (new column)
private String fromFQN;
private String toFQN;
private String fromType;
private String toType;
private int relation;
private String jsonSchema;
private String json;
}
}
interface BotDAO extends EntityDAO<Bot> {
@ -1348,7 +1409,7 @@ public interface CollectionDAO {
interface MetricsDAO extends EntityDAO<Metrics> {
@Override
default String getTableName() {
return "metrics_entity";
return "metric_entity";
}
@Override
@ -1782,6 +1843,11 @@ public interface CollectionDAO {
"SELECT source, tagFQN, labelType, state FROM tag_usage WHERE targetFQNHash = :targetFQNHash ORDER BY tagFQN")
List<TagLabel> getTagsInternal(@Bind("targetFQNHash") String targetFQNHash);
// Migration-only: loads all tag_usage rows (including legacy columns the mapper reads
// best-effort) so tagFQNHash/targetFQNHash can be backfilled. Unbounded scan by design.
@SqlQuery("SELECT * FROM tag_usage")
@Deprecated(since = "Release 1.1")
@RegisterRowMapper(TagLabelMapperMigration.class)
List<TagLabelMigration> listAll();
@SqlQuery(
"SELECT COUNT(*) FROM tag_usage "
+ "WHERE (tagFQNHash LIKE CONCAT(:tagFqnHash, '.%') OR tagFQNHash = :tagFqnHash) "
@ -1801,6 +1867,29 @@ public interface CollectionDAO {
"DELETE FROM tag_usage where targetFQNHash = :targetFQNHash OR targetFQNHash LIKE CONCAT(:targetFQNHash, '.%')")
void deleteTagLabelsByTargetPrefix(@Bind("targetFQNHash") String targetFQNHash);
// Migration-only: backfills tagFQNHash/targetFQNHash on existing tag_usage rows.
// NOTE(review): the Postgres ON CONFLICT target (source, tagFQN, targetFQN) refers to the
// pre-migration unique constraint — confirm this runs before that constraint is replaced.
@Deprecated(since = "Release 1.1")
@ConnectionAwareSqlUpdate(
value =
"INSERT INTO tag_usage (source, tagFQN, tagFQNHash, targetFQNHash, labelType, state, targetFQN)"
+ "VALUES (:source, :tagFQN, :tagFQNHash, :targetFQNHash, :labelType, :state, :targetFQN) "
+ "ON DUPLICATE KEY UPDATE tagFQNHash = :tagFQNHash, targetFQNHash = :targetFQNHash",
connectionType = MYSQL)
@ConnectionAwareSqlUpdate(
value =
"INSERT INTO tag_usage (source, tagFQN, tagFQNHash, targetFQNHash, labelType, state, targetFQN) "
+ "VALUES (:source, :tagFQN, :tagFQNHash, :targetFQNHash, :labelType, :state, :targetFQN) "
+ "ON CONFLICT (source, tagFQN, targetFQN) "
+ "DO UPDATE SET tagFQNHash = EXCLUDED.tagFQNHash, targetFQNHash = EXCLUDED.targetFQNHash",
connectionType = POSTGRES)
void upsertFQNHash(
@Bind("source") int source,
@Bind("tagFQN") String tagFQN,
@Bind("tagFQNHash") String tagFQNHash,
@Bind("targetFQNHash") String targetFQNHash,
@Bind("labelType") int labelType,
@Bind("state") int state,
@Bind("targetFQN") String targetFQN);
/** Update all the tagFQN starting with oldPrefix to start with newPrefix due to tag or glossary name change */
default void updateTagPrefix(int source, String oldPrefix, String newPrefix) {
String update =
@ -1846,6 +1935,50 @@ public interface CollectionDAO {
.withTagFQN(r.getString("tagFQN"));
}
}
/**
 * Raw snapshot of a tag_usage row, used only by the 1.1 FQN-hash data migration.
 * Lombok generates accessors; the mapper below populates instances via the setters.
 */
@Getter
@Setter
@Deprecated(since = "Release 1.1")
class TagLabelMigration {
  public int source;
  public String tagFQN;
  public String targetFQN; // legacy column, dropped by the schema migration
  public int labelType;
  public int state;
  public String tagFQNHash; // was the lone private field; made public for consistency with the rest
  public String targetFQNHash;
}
// Migration-only row mapper that tolerates both the pre- and post-1.1 tag_usage schemas:
// columns that exist in only one version (targetFQN, tagFQNHash, targetFQNHash) are read
// best-effort and silently skipped when absent.
@Deprecated(since = "Release 1.1")
class TagLabelMapperMigration implements RowMapper<TagLabelMigration> {
@Override
public TagLabelMigration map(ResultSet r, StatementContext ctx) throws SQLException {
TagLabelMigration tagLabel = new TagLabelMigration();
tagLabel.setSource(r.getInt("source"));
tagLabel.setLabelType(r.getInt("labelType"));
tagLabel.setState(r.getInt("state"));
tagLabel.setTagFQN(r.getString("tagFQN"));
// TODO: ugly, but these columns exist only in one schema version each; the
// try/catch keeps the mapper usable against either version during migration
try {
// targetFQN is removed in the latest schema
tagLabel.setTargetFQN(r.getString("targetFQN"));
} catch (Exception ex) {
// column absent in this schema version — nothing to do
}
try {
tagLabel.setTagFQNHash(r.getString("tagFQNHash"));
} catch (Exception ex) {
// column absent in this schema version — nothing to do
}
try {
tagLabel.setTargetFQNHash(r.getString("targetFQNHash"));
} catch (Exception ex) {
// column absent in this schema version — nothing to do
}
return tagLabel;
}
}
}
interface RoleDAO extends EntityDAO<Role> {
@ -2830,6 +2963,20 @@ public interface CollectionDAO {
@Bind("json") String json,
@Bind("timestamp") Long timestamp);
// Migration-only: stamps entityFQNHash on the time-series row identified by
// (entityFQN, extension, timestamp).
// NOTE(review): the MySQL and Postgres statements are byte-identical; a single @SqlUpdate
// would suffice unless the locator requires per-dialect registration.
@ConnectionAwareSqlUpdate(
value =
"UPDATE entity_extension_time_series set entityFQNHash = :entityFQNHash where entityFQN=:entityFQN and extension=:extension and timestamp=:timestamp",
connectionType = MYSQL)
@ConnectionAwareSqlUpdate(
value =
"UPDATE entity_extension_time_series set entityFQNHash = :entityFQNHash where entityFQN=:entityFQN and extension=:extension and timestamp=:timestamp",
connectionType = POSTGRES)
void updateEntityFQNHash(
@Bind("entityFQNHash") String entityFQNHash,
@Bind("entityFQN") String entityFQN,
@Bind("extension") String extension,
@Bind("timestamp") Long timestamp);
@ConnectionAwareSqlUpdate(
value =
"UPDATE entity_extension_time_series set json = :json where entityFQNHash=:entityFQNHash and extension=:extension and timestamp=:timestamp and json -> '$.operation' = :operation",
@ -3099,6 +3246,40 @@ public interface CollectionDAO {
return new ReportDataRow(rowNumber, reportData);
}
}
// Migration-only: loads every entity_extension_time_series row for FQN-hash backfill.
// NOTE(review): unbounded table scan; this table can be large — acceptable only as a one-time step.
@SqlQuery("select * from entity_extension_time_series")
@RegisterRowMapper(EntityExtensionTimeSeries.class)
List<EntityExtensionTimeSeriesTable> listAll();
// In-memory snapshot of an entity_extension_time_series row used during the 1.1 migration.
@Getter
@Setter
class EntityExtensionTimeSeriesTable {
private String entityFQN; // legacy column, dropped by the schema migration
private String extension;
private String jsonSchema;
private String json;
private long timestamp;
private String entityFQNHash; // hash replacement for entityFQN
}
// Migration-only row mapper; tolerates the absence of the legacy entityFQN column so it
// can read rows both before and after the schema migration drops that column.
class EntityExtensionTimeSeries implements RowMapper<EntityExtensionTimeSeriesTable> {
@Override
public EntityExtensionTimeSeriesTable map(ResultSet rs, StatementContext ctx) throws SQLException {
EntityExtensionTimeSeriesTable result = new EntityExtensionTimeSeriesTable();
// TODO: ugly, but entityFQN exists only pre-migration; read it best-effort
try {
result.setEntityFQN(rs.getString("entityFQN"));
} catch (Exception ex) {
// column absent in this schema version — nothing to do
}
result.setExtension(rs.getString("extension"));
result.setJsonSchema(rs.getString("jsonSchema"));
result.setJson(rs.getString("json"));
result.setTimestamp(rs.getLong("timestamp"));
result.setEntityFQNHash(rs.getString("entityFQNHash"));
return result;
}
}
}
class EntitiesCountRowMapper implements RowMapper<EntitiesCount> {

View File

@ -3,14 +3,87 @@ package org.openmetadata.service.jdbi3;
import static org.openmetadata.service.jdbi3.locator.ConnectionType.MYSQL;
import static org.openmetadata.service.jdbi3.locator.ConnectionType.POSTGRES;
import java.util.List;
import java.util.Optional;
import lombok.Getter;
import lombok.Setter;
import org.jdbi.v3.core.statement.StatementException;
import org.jdbi.v3.sqlobject.SingleValue;
import org.jdbi.v3.sqlobject.customizer.Bind;
import org.openmetadata.service.jdbi3.locator.ConnectionAwareSqlQuery;
import org.openmetadata.service.jdbi3.locator.ConnectionAwareSqlUpdate;
/**
 * DAO for the server-side migration bookkeeping tables (SERVER_CHANGE_LOG and
 * SERVER_MIGRATION_SQL_LOGS) introduced with the 1.1 data migrations, plus the
 * legacy Flyway DATABASE_CHANGE_LOG version lookup.
 */
public interface MigrationDAO {
  /** Highest schema version recorded in the Flyway change-log table, if any. */
  @ConnectionAwareSqlQuery(value = "SELECT MAX(version) FROM DATABASE_CHANGE_LOG", connectionType = MYSQL)
  @ConnectionAwareSqlQuery(value = "SELECT max(version) FROM \"DATABASE_CHANGE_LOG\"", connectionType = POSTGRES)
  @SingleValue
  Optional<String> getMaxVersion() throws StatementException;

  /** Checksum stored for the given migration version, or null when the version was never applied. */
  @ConnectionAwareSqlQuery(
      value = "SELECT checksum FROM SERVER_CHANGE_LOG where version = :version",
      connectionType = MYSQL)
  @ConnectionAwareSqlQuery(
      value = "SELECT checksum FROM SERVER_CHANGE_LOG where version = :version",
      connectionType = POSTGRES)
  String getVersionMigrationChecksum(@Bind("version") String version) throws StatementException;

  /** Inserts or refreshes the change-log row for a migration run; rows are keyed by version. */
  @ConnectionAwareSqlUpdate(
      value =
          "INSERT INTO SERVER_CHANGE_LOG (version, migrationFileName, checksum, installed_on)"
              + "VALUES (:version, :migrationFileName, :checksum, CURRENT_TIMESTAMP) "
              + "ON DUPLICATE KEY UPDATE "
              + "migrationFileName = :migrationFileName, "
              + "checksum = :checksum, "
              + "installed_on = CURRENT_TIMESTAMP",
      connectionType = MYSQL)
  @ConnectionAwareSqlUpdate(
      value =
          "INSERT INTO server_change_log (version, migrationFileName, checksum, installed_on)"
              + "VALUES (:version, :migrationFileName, :checksum, current_timestamp) "
              + "ON CONFLICT (version) DO UPDATE SET "
              + "migrationFileName = EXCLUDED.migrationFileName, "
              + "checksum = EXCLUDED.checksum, "
              + "installed_on = EXCLUDED.installed_on",
      connectionType = POSTGRES)
  void upsertServerMigration(
      @Bind("version") String version,
      @Bind("migrationFileName") String migrationFileName,
      @Bind("checksum") String checksum);

  /** Logs one executed migration SQL statement; rows are keyed by the statement checksum. */
  @ConnectionAwareSqlUpdate(
      value =
          "INSERT INTO SERVER_MIGRATION_SQL_LOGS (version, sqlStatement, checksum, executedAt)"
              + "VALUES (:version, :sqlStatement, :checksum, CURRENT_TIMESTAMP) "
              + "ON DUPLICATE KEY UPDATE "
              + "version = :version, "
              + "sqlStatement = :sqlStatement, "
              + "executedAt = CURRENT_TIMESTAMP",
      connectionType = MYSQL)
  @ConnectionAwareSqlUpdate(
      value =
          "INSERT INTO SERVER_MIGRATION_SQL_LOGS (version, sqlStatement, checksum, executedAt)"
              + "VALUES (:version, :sqlStatement, :checksum, current_timestamp) "
              + "ON CONFLICT (checksum) DO UPDATE SET "
              + "version = EXCLUDED.version, "
              + "sqlStatement = EXCLUDED.sqlStatement, "
              + "executedAt = EXCLUDED.executedAt",
      connectionType = POSTGRES)
  // Parameter renamed from the misleading 'success' to 'checksum' — it binds :checksum.
  void upsertServerMigrationSQL(
      @Bind("version") String version, @Bind("sqlStatement") String sqlStatement, @Bind("checksum") String checksum);

  /** Checksums of all SQL statements already executed for the given migration version. */
  @ConnectionAwareSqlQuery(
      value = "SELECT checksum FROM SERVER_MIGRATION_SQL_LOGS where version = :version",
      connectionType = MYSQL)
  @ConnectionAwareSqlQuery(
      value = "SELECT checksum FROM SERVER_MIGRATION_SQL_LOGS where version = :version",
      connectionType = POSTGRES)
  List<String> getServerMigrationSQLWithVersion(@Bind("version") String version);

  /** In-memory view of a SERVER_MIGRATION_SQL_LOGS row. */
  @Getter
  @Setter
  class ServerMigrationSQLTable {
    private String version;
    private String sqlStatement;
    // NOTE(review): casing differs from the DB column 'checksum'; renaming would change the
    // Lombok accessor names (getCheckSum), so it is kept as-is for compatibility.
    private String checkSum;
  }
}

View File

@ -0,0 +1,12 @@
package org.openmetadata.service.migration;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
 * Runtime annotation placed on migration types or constructors to associate them with a
 * migration file by name — presumably the SQL/migration file the annotated code implements;
 * confirm against the migration loader that reads this annotation.
 */
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.CONSTRUCTOR})
public @interface MigrationFile {
// identifier of the migration file this annotated element corresponds to
String name();
}

View File

@ -0,0 +1,360 @@
package org.openmetadata.service.migration;
import static org.openmetadata.service.Entity.INGESTION_PIPELINE;
import static org.openmetadata.service.Entity.TEST_CASE;
import static org.openmetadata.service.Entity.TEST_SUITE;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.jdbi.v3.core.Handle;
import org.openmetadata.common.utils.CommonUtil;
import org.openmetadata.schema.CreateEntity;
import org.openmetadata.schema.EntityInterface;
import org.openmetadata.schema.analytics.WebAnalyticEvent;
import org.openmetadata.schema.api.tests.CreateTestSuite;
import org.openmetadata.schema.dataInsight.DataInsightChart;
import org.openmetadata.schema.dataInsight.kpi.Kpi;
import org.openmetadata.schema.entity.Bot;
import org.openmetadata.schema.entity.Type;
import org.openmetadata.schema.entity.automations.Workflow;
import org.openmetadata.schema.entity.classification.Classification;
import org.openmetadata.schema.entity.classification.Tag;
import org.openmetadata.schema.entity.data.Chart;
import org.openmetadata.schema.entity.data.Container;
import org.openmetadata.schema.entity.data.Dashboard;
import org.openmetadata.schema.entity.data.DashboardDataModel;
import org.openmetadata.schema.entity.data.Database;
import org.openmetadata.schema.entity.data.DatabaseSchema;
import org.openmetadata.schema.entity.data.Glossary;
import org.openmetadata.schema.entity.data.GlossaryTerm;
import org.openmetadata.schema.entity.data.Metrics;
import org.openmetadata.schema.entity.data.MlModel;
import org.openmetadata.schema.entity.data.Pipeline;
import org.openmetadata.schema.entity.data.Query;
import org.openmetadata.schema.entity.data.Report;
import org.openmetadata.schema.entity.data.Table;
import org.openmetadata.schema.entity.data.Topic;
import org.openmetadata.schema.entity.events.EventSubscription;
import org.openmetadata.schema.entity.policies.Policy;
import org.openmetadata.schema.entity.services.DashboardService;
import org.openmetadata.schema.entity.services.DatabaseService;
import org.openmetadata.schema.entity.services.MessagingService;
import org.openmetadata.schema.entity.services.MetadataService;
import org.openmetadata.schema.entity.services.MlModelService;
import org.openmetadata.schema.entity.services.PipelineService;
import org.openmetadata.schema.entity.services.StorageService;
import org.openmetadata.schema.entity.services.connections.TestConnectionDefinition;
import org.openmetadata.schema.entity.services.ingestionPipelines.IngestionPipeline;
import org.openmetadata.schema.entity.teams.Role;
import org.openmetadata.schema.entity.teams.Team;
import org.openmetadata.schema.entity.teams.User;
import org.openmetadata.schema.tests.TestCase;
import org.openmetadata.schema.tests.TestDefinition;
import org.openmetadata.schema.tests.TestSuite;
import org.openmetadata.schema.type.EntityReference;
import org.openmetadata.schema.type.Include;
import org.openmetadata.schema.type.Relationship;
import org.openmetadata.service.Entity;
import org.openmetadata.service.exception.EntityNotFoundException;
import org.openmetadata.service.jdbi3.CollectionDAO;
import org.openmetadata.service.jdbi3.EntityDAO;
import org.openmetadata.service.jdbi3.IngestionPipelineRepository;
import org.openmetadata.service.jdbi3.ListFilter;
import org.openmetadata.service.jdbi3.MigrationDAO;
import org.openmetadata.service.jdbi3.TableRepository;
import org.openmetadata.service.jdbi3.TestCaseRepository;
import org.openmetadata.service.jdbi3.TestSuiteRepository;
import org.openmetadata.service.migration.api.MigrationStep;
import org.openmetadata.service.resources.feeds.MessageParser;
import org.openmetadata.service.util.EntityUtil;
import org.openmetadata.service.util.FullyQualifiedName;
import org.openmetadata.service.util.JsonUtils;
/**
 * Helpers shared by the 1.1.0 migration steps (MySQL and Postgres): backfilling the new
 * fqnHash/nameHash columns, executing and logging DDL statements, and migrating test suites
 * to the new executable/logical model.
 */
@Slf4j
public class MigrationUtil {
  /**
   * Re-serializes every entity of the given type and writes it back via {@code dao.update} keyed
   * by the hash of its fullyQualifiedName, backfilling the new hash column for that entity table.
   *
   * NOTE(review): loads every row into memory in one call (limit Integer.MAX_VALUE) — confirm this
   * is acceptable for very large tables.
   */
  @SneakyThrows
  public static <T extends EntityInterface> void updateFQNHashForEntity(Class<T> clazz, EntityDAO<T> dao) {
    List<String> jsons = dao.listAfter(new ListFilter(Include.ALL), Integer.MAX_VALUE, "");
    for (String json : jsons) {
      T entity = JsonUtils.readValue(json, clazz);
      dao.update(
          entity.getId(), FullyQualifiedName.buildHash(entity.getFullyQualifiedName()), JsonUtils.pojoToJson(entity));
    }
  }

  /** Builds a SERVER_MIGRATION_SQL_LOGS row for the given version and statement, with the statement's checksum. */
  public static MigrationDAO.ServerMigrationSQLTable buildServerMigrationTable(String version, String statement) {
    MigrationDAO.ServerMigrationSQLTable result = new MigrationDAO.ServerMigrationSQLTable();
    result.setVersion(String.valueOf(version));
    result.setSqlStatement(statement);
    result.setCheckSum(EntityUtil.hash(statement));
    return result;
  }

  /**
   * Filters {@code queries} down to those whose checksum is NOT already in {@code lookUp}
   * (i.e. not yet executed for this version) and returns them wrapped as migration-log rows.
   */
  public static List<MigrationDAO.ServerMigrationSQLTable> addInListIfToBeExecuted(
      String version, Set<String> lookUp, List<String> queries) {
    List<MigrationDAO.ServerMigrationSQLTable> result = new ArrayList<>();
    for (String query : queries) {
      MigrationDAO.ServerMigrationSQLTable tableContent = buildServerMigrationTable(version, query);
      if (!lookUp.contains(tableContent.getCheckSum())) {
        result.add(tableContent);
      } else {
        // Checksum already recorded for this version: the statement was executed previously.
        LOG.debug("Query will be skipped in Migration Step , as this has already been executed");
      }
    }
    return result;
  }

  /**
   * Backfills the FQN/name hash columns for every entity type, then for the three special tables
   * (field_relationship, entity_extension_time_series, tag_usage). Must run after the preDDL step
   * added the hash columns and before postDDL adds the unique constraints on them.
   */
  public static void dataMigrationFQNHashing(CollectionDAO collectionDAO) {
    // Migration for Entities
    updateFQNHashForEntity(Bot.class, collectionDAO.botDAO());
    updateFQNHashForEntity(Chart.class, collectionDAO.chartDAO());
    updateFQNHashForEntity(Classification.class, collectionDAO.classificationDAO());
    updateFQNHashForEntity(Container.class, collectionDAO.containerDAO());
    updateFQNHashForEntity(DashboardDataModel.class, collectionDAO.dashboardDataModelDAO());
    updateFQNHashForEntity(Dashboard.class, collectionDAO.dashboardDAO());
    updateFQNHashForEntity(DashboardService.class, collectionDAO.dashboardServiceDAO());
    updateFQNHashForEntity(DataInsightChart.class, collectionDAO.dataInsightChartDAO());
    updateFQNHashForEntity(Database.class, collectionDAO.databaseDAO());
    updateFQNHashForEntity(DatabaseSchema.class, collectionDAO.databaseSchemaDAO());
    updateFQNHashForEntity(DatabaseService.class, collectionDAO.dbServiceDAO());
    updateFQNHashForEntity(EventSubscription.class, collectionDAO.eventSubscriptionDAO());
    updateFQNHashForEntity(Glossary.class, collectionDAO.glossaryDAO());
    updateFQNHashForEntity(GlossaryTerm.class, collectionDAO.glossaryTermDAO());
    updateFQNHashForEntity(IngestionPipeline.class, collectionDAO.ingestionPipelineDAO());
    updateFQNHashForEntity(Kpi.class, collectionDAO.kpiDAO());
    updateFQNHashForEntity(MessagingService.class, collectionDAO.messagingServiceDAO());
    updateFQNHashForEntity(MetadataService.class, collectionDAO.metadataServiceDAO());
    updateFQNHashForEntity(Metrics.class, collectionDAO.metricsDAO());
    updateFQNHashForEntity(MlModel.class, collectionDAO.mlModelDAO());
    updateFQNHashForEntity(MlModelService.class, collectionDAO.mlModelServiceDAO());
    updateFQNHashForEntity(Pipeline.class, collectionDAO.pipelineDAO());
    updateFQNHashForEntity(PipelineService.class, collectionDAO.pipelineServiceDAO());
    updateFQNHashForEntity(Policy.class, collectionDAO.policyDAO());
    updateFQNHashForEntity(Query.class, collectionDAO.queryDAO());
    updateFQNHashForEntity(Report.class, collectionDAO.reportDAO());
    updateFQNHashForEntity(Role.class, collectionDAO.roleDAO());
    updateFQNHashForEntity(StorageService.class, collectionDAO.storageServiceDAO());
    updateFQNHashForEntity(Table.class, collectionDAO.tableDAO());
    updateFQNHashForEntity(Tag.class, collectionDAO.tagDAO());
    updateFQNHashForEntity(Team.class, collectionDAO.teamDAO());
    updateFQNHashForEntity(TestCase.class, collectionDAO.testCaseDAO());
    updateFQNHashForEntity(TestConnectionDefinition.class, collectionDAO.testConnectionDefinitionDAO());
    updateFQNHashForEntity(TestDefinition.class, collectionDAO.testDefinitionDAO());
    updateFQNHashForEntity(TestSuite.class, collectionDAO.testSuiteDAO());
    updateFQNHashForEntity(Topic.class, collectionDAO.topicDAO());
    updateFQNHashForEntity(Type.class, collectionDAO.typeEntityDAO());
    updateFQNHashForEntity(User.class, collectionDAO.userDAO());
    updateFQNHashForEntity(WebAnalyticEvent.class, collectionDAO.webAnalyticEventDAO());
    updateFQNHashForEntity(Workflow.class, collectionDAO.workflowDAO());
    // Field Relationship
    updateFQNHashForFieldRelationship(collectionDAO);
    // TimeSeries
    updateFQNHashEntityExtensionTimeSeries(collectionDAO);
    // Tag Usage
    updateFQNHashTagUsage(collectionDAO);
  }

  /** Backfills fromFQNHash/toFQNHash for field_relationship rows that do not have them yet. */
  private static void updateFQNHashForFieldRelationship(CollectionDAO collectionDAO) {
    List<CollectionDAO.FieldRelationshipDAO.FieldRelationship> fieldRelationships =
        collectionDAO.fieldRelationshipDAO().listAll();
    for (CollectionDAO.FieldRelationshipDAO.FieldRelationship fieldRelationship : fieldRelationships) {
      // Only rows where BOTH hashes are missing are rewritten; this keeps the backfill idempotent.
      if (CommonUtil.nullOrEmpty(fieldRelationship.getFromFQNHash())
          && CommonUtil.nullOrEmpty(fieldRelationship.getToFQNHash())) {
        collectionDAO
            .fieldRelationshipDAO()
            .upsertFQNHash(
                FullyQualifiedName.buildHash(fieldRelationship.getFromFQN()),
                FullyQualifiedName.buildHash(fieldRelationship.getToFQN()),
                fieldRelationship.getFromFQN(),
                fieldRelationship.getToFQN(),
                fieldRelationship.getFromType(),
                fieldRelationship.getToType(),
                fieldRelationship.getRelation(),
                fieldRelationship.getJsonSchema(),
                fieldRelationship.getJson());
      }
    }
  }

  /** Backfills entityFQNHash for entity_extension_time_series rows that do not have it yet. */
  private static void updateFQNHashEntityExtensionTimeSeries(CollectionDAO collectionDAO) {
    List<CollectionDAO.EntityExtensionTimeSeriesDAO.EntityExtensionTimeSeriesTable> timeSeriesTables =
        collectionDAO.entityExtensionTimeSeriesDao().listAll();
    for (CollectionDAO.EntityExtensionTimeSeriesDAO.EntityExtensionTimeSeriesTable timeSeries : timeSeriesTables) {
      if (CommonUtil.nullOrEmpty(timeSeries.getEntityFQNHash())) {
        collectionDAO
            .entityExtensionTimeSeriesDao()
            .updateEntityFQNHash(
                FullyQualifiedName.buildHash(timeSeries.getEntityFQN()),
                timeSeries.getEntityFQN(),
                timeSeries.getExtension(),
                timeSeries.getTimestamp());
      }
    }
  }

  /** Backfills tagFQNHash/targetFQNHash for tag_usage rows that do not have them yet. */
  public static void updateFQNHashTagUsage(CollectionDAO collectionDAO) {
    List<CollectionDAO.TagUsageDAO.TagLabelMigration> tagLabelMigrationList = collectionDAO.tagUsageDAO().listAll();
    for (CollectionDAO.TagUsageDAO.TagLabelMigration tagLabel : tagLabelMigrationList) {
      if (CommonUtil.nullOrEmpty(tagLabel.getTagFQNHash()) && CommonUtil.nullOrEmpty(tagLabel.getTargetFQNHash())) {
        collectionDAO
            .tagUsageDAO()
            .upsertFQNHash(
                tagLabel.getSource(),
                tagLabel.getTagFQN(),
                FullyQualifiedName.buildHash(tagLabel.getTagFQN()),
                FullyQualifiedName.buildHash(tagLabel.getTargetFQN()),
                tagLabel.getLabelType(),
                tagLabel.getState(),
                tagLabel.getTargetFQN());
      }
    }
  }

  /**
   * Executes each not-yet-run statement from {@code queryList} on the given handle, recording its
   * checksum in SERVER_MIGRATION_SQL_LOGS so a re-run of the same step skips it.
   */
  public static void performSqlExecutionAndUpdation(
      MigrationStep step, MigrationDAO migrationDAO, Handle handle, List<String> queryList) {
    // These are DDL Statements and will cause an Implicit commit even if part of transaction still committed inplace
    Set<String> executedSQLChecksums =
        new HashSet<>(migrationDAO.getServerMigrationSQLWithVersion(String.valueOf(step.getMigrationVersion())));
    // Execute the Statements as batch
    List<MigrationDAO.ServerMigrationSQLTable> toBeExecuted =
        addInListIfToBeExecuted(String.valueOf(step.getMigrationVersion()), executedSQLChecksums, queryList);
    for (MigrationDAO.ServerMigrationSQLTable tableData : toBeExecuted) {
      // Execute first, then log: if execute throws, the statement is NOT marked as done.
      handle.execute(tableData.getSqlStatement());
      migrationDAO.upsertServerMigrationSQL(
          tableData.getVersion(), tableData.getSqlStatement(), tableData.getCheckSum());
    }
  }

  /**
   * Builds a TestSuite entity from the create request. When an executable entity reference (a table
   * FQN) is supplied, the referenced table is looked up and attached as the suite's executable
   * entity reference.
   */
  public static TestSuite getTestSuite(CollectionDAO dao, CreateTestSuite create, String user) throws IOException {
    TestSuite testSuite =
        copy(new TestSuite(), create, user)
            .withDescription(create.getDescription())
            .withDisplayName(create.getDisplayName())
            .withName(create.getName());
    if (create.getExecutableEntityReference() != null) {
      TableRepository tableRepository = new TableRepository(dao);
      Table table =
          JsonUtils.readValue(
              tableRepository.getDao().findJsonByFqn(create.getExecutableEntityReference(), Include.ALL), Table.class);
      EntityReference entityReference =
          new EntityReference()
              .withId(table.getId())
              .withFullyQualifiedName(table.getFullyQualifiedName())
              .withName(table.getName())
              .withType(Entity.TABLE);
      testSuite.setExecutableEntityReference(entityReference);
    }
    return testSuite;
  }

  /**
   * Copies the common CreateEntity fields onto a fresh TestSuite: new random id, updatedBy/updatedAt
   * set to the caller and "now", and owner explicitly cleared.
   */
  public static TestSuite copy(TestSuite entity, CreateEntity request, String updatedBy) throws IOException {
    entity.setId(UUID.randomUUID());
    entity.setName(request.getName());
    entity.setDisplayName(request.getDisplayName());
    entity.setDescription(request.getDescription());
    entity.setExtension(request.getExtension());
    entity.setUpdatedBy(updatedBy);
    entity.setOwner(null); // migration-created suites carry no owner
    entity.setUpdatedAt(System.currentTimeMillis());
    return entity;
  }

  /**
   * Migrates test suites to the 1.1 model: for every test case, ensures a suite named
   * "&lt;entityFQN&gt;.testSuite" exists (creating it and its table/test-case relationships when
   * missing), then detaches and deletes ingestion pipelines from logical suites.
   */
  @SneakyThrows
  public static void testSuitesMigration(CollectionDAO collectionDAO) {
    IngestionPipelineRepository ingestionPipelineRepository = new IngestionPipelineRepository(collectionDAO);
    TestSuiteRepository testSuiteRepository = new TestSuiteRepository(collectionDAO);
    TestCaseRepository testCaseRepository = new TestCaseRepository(collectionDAO);
    List<TestCase> testCases =
        testCaseRepository.listAll(new EntityUtil.Fields(List.of("id")), new ListFilter(Include.ALL));
    for (TestCase test : testCases) {
      // Create New Executable Test Suites
      MessageParser.EntityLink entityLink = MessageParser.EntityLink.parse(test.getEntityLink());
      // Create new Logical Test Suite
      String testSuiteFqn = entityLink.getEntityFQN() + ".testSuite";
      try {
        // Check if the test Suite Exists, this brings the data on nameHash basis
        TestSuite stored =
            testSuiteRepository.getByName(
                null, FullyQualifiedName.quoteName(testSuiteFqn), new EntityUtil.Fields(List.of("id")), Include.ALL);
        testSuiteRepository.addRelationship(stored.getId(), test.getId(), TEST_SUITE, TEST_CASE, Relationship.CONTAINS);
      } catch (EntityNotFoundException ex) {
        // TODO: Need to update the executable field as well
        TestSuite newExecutableTestSuite =
            getTestSuite(
                    collectionDAO,
                    new CreateTestSuite()
                        .withName(testSuiteFqn)
                        .withExecutableEntityReference(entityLink.getEntityFQN()),
                    "ingestion-bot")
                .withExecutable(false);
        // NOTE(review): the returned value is never read; create() presumably persists
        // newExecutableTestSuite in place (its id is used below) — confirm.
        TestSuite testSuitePutResponse = testSuiteRepository.create(null, newExecutableTestSuite);
        // Here we are manually adding executable relationship since the table Repository is not registered and result
        // into null for entity type table
        testSuiteRepository.addRelationship(
            newExecutableTestSuite.getExecutableEntityReference().getId(),
            newExecutableTestSuite.getId(),
            Entity.TABLE,
            TEST_SUITE,
            Relationship.CONTAINS);
        // add relationship from testSuite to TestCases
        testSuiteRepository.addRelationship(
            newExecutableTestSuite.getId(), test.getId(), TEST_SUITE, TEST_CASE, Relationship.CONTAINS);
        // Not a good approach but executable cannot be set true before
        TestSuite temp =
            testSuiteRepository
                .getDao()
                .findEntityByName(FullyQualifiedName.quoteName(newExecutableTestSuite.getName()));
        temp.setExecutable(true);
        testSuiteRepository.getDao().update(temp);
      }
    }
    // Update Test Suites
    ListFilter filter = new ListFilter(Include.ALL);
    filter.addQueryParam("testSuiteType", "logical");
    List<TestSuite> testSuites = testSuiteRepository.listAll(new EntityUtil.Fields(List.of("id")), filter);
    for (TestSuite testSuiteRecord : testSuites) {
      TestSuite temp = testSuiteRepository.getDao().findEntityById(testSuiteRecord.getId());
      // NOTE(review): the guard is only true when executable is ALREADY Boolean.FALSE, making
      // setExecutable(false) a no-op write; a null executable is skipped. Was `== null` intended?
      if (Boolean.FALSE.equals(temp.getExecutable())) {
        temp.setExecutable(false);
        testSuiteRepository.getDao().update(temp);
      }
      // get Ingestion Pipelines
      try {
        List<CollectionDAO.EntityRelationshipRecord> ingestionPipelineRecords =
            collectionDAO
                .relationshipDAO()
                .findTo(
                    testSuiteRecord.getId().toString(),
                    TEST_SUITE,
                    Relationship.CONTAINS.ordinal(),
                    INGESTION_PIPELINE);
        for (CollectionDAO.EntityRelationshipRecord ingestionRecord : ingestionPipelineRecords) {
          // remove relationship
          collectionDAO.relationshipDAO().deleteAll(ingestionRecord.getId().toString(), INGESTION_PIPELINE);
          // Cannot use Delete directly it uses other repos internally
          ingestionPipelineRepository.getDao().delete(ingestionRecord.getId().toString());
        }
      } catch (EntityNotFoundException ex) {
        // Already Removed
      }
    }
  }
}

View File

@ -0,0 +1,30 @@
package org.openmetadata.service.migration.api;
import org.jdbi.v3.core.Handle;
import org.openmetadata.service.jdbi3.locator.ConnectionType;
/**
 * Contract for a single server migration step. A step is executed in phases by
 * MigrationWorkflow: initialize → preDDL → runDataMigration (transactional) → postDDL.
 */
public interface MigrationStep {
  // This version should match the server version
  // Ex: if the server is 1.0.0 the migration version for that server is 1.0.0
  // This version is used to sort all the upgrade migrations and apply them in order
  String getMigrationVersion();

  // Logical name of the migration file/class (e.g. "v110_MySQLMigration").
  String getMigrationFileName();

  // Stable unique id for this migration file; recorded as the step's checksum on completion.
  String getFileUuid();

  // Which database engine (MYSQL / POSTGRES) this step's SQL targets.
  ConnectionType getDatabaseConnectionType();

  // Called once before any phase runs; implementations attach their DAOs to the given handle.
  void initialize(Handle handle);

  // Handle non-transaction-safe SQL here, e.g. DDL changes to table structure
  // (DDL causes an implicit commit in MySQL and cannot be rolled back).
  void preDDL();

  // This method is to run code to fix any data
  void runDataMigration();

  // This method is to run SQL which can be part of the transaction post data migrations
  void postDDL();

  // Release any per-step resources once the step has finished.
  void close();
}

View File

@ -0,0 +1,156 @@
package org.openmetadata.service.migration.api;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.jdbi.v3.core.Handle;
import org.jdbi.v3.core.Jdbi;
import org.openmetadata.service.jdbi3.MigrationDAO;
import org.openmetadata.service.jdbi3.locator.ConnectionType;
/**
 * Orchestrates server migration steps: validates them against the active database type, sorts
 * them by version, filters out steps already applied (by version + file checksum recorded in
 * the server migration table), then runs each step's preDDL / data-migration / postDDL phases.
 */
@Slf4j
public class MigrationWorkflow {
  private final List<MigrationStep> migrations; // steps remaining to be applied, sorted by version
  private final MigrationDAO migrationDAO;
  private final Jdbi jdbi;
  private final ConnectionType workflowDatabaseConnectionType; // MYSQL or POSTGRES for this deployment
  private boolean ignoreFileChecksum = false; // when true, changed migration files are re-run

  public MigrationWorkflow(
      Jdbi jdbi, ConnectionType type, List<MigrationStep> migrationSteps, boolean ignoreFileChecksum) {
    this.jdbi = jdbi;
    this.workflowDatabaseConnectionType = type;
    this.migrationDAO = jdbi.onDemand(MigrationDAO.class);
    this.ignoreFileChecksum = ignoreFileChecksum;
    // Validate Migration
    validateMigrations(migrationSteps);
    // Sort Migration on the basis of version
    // NOTE(review): versions are compared as strings, so "1.10.0" sorts before "1.2.0" —
    // confirm the version scheme never produces multi-digit components.
    migrationSteps.sort(Comparator.comparing(MigrationStep::getMigrationVersion));
    // Filter Migrations to Be Run
    this.migrations = filterAndGetMigrationsToRun(migrationSteps);
  }

  /** Rejects any step whose SQL targets a different database engine than this deployment uses. */
  private void validateMigrations(List<MigrationStep> migrationSteps) {
    for (MigrationStep step : migrationSteps) {
      if (!step.getDatabaseConnectionType().equals(this.workflowDatabaseConnectionType)) {
        throw new IllegalArgumentException(
            String.format(
                "Provided Migration File is for Database Connection %s", step.getDatabaseConnectionType().toString()));
      }
    }
  }

  /**
   * Keeps steps that were never run (no checksum in DB) or whose file changed since they ran
   * (checksum mismatch, unless ignoreFileChecksum forces a re-run).
   */
  private List<MigrationStep> filterAndGetMigrationsToRun(List<MigrationStep> migrations) {
    LOG.debug("Filtering Server Migrations");
    String maxMigration = migrations.get(migrations.size() - 1).getMigrationVersion();
    List<MigrationStep> result = new ArrayList<>();
    for (MigrationStep step : migrations) {
      String checksum = migrationDAO.getVersionMigrationChecksum(String.valueOf(step.getMigrationVersion()));
      if (checksum != null) {
        // Version Exist on DB this was run
        // NOTE(review): maxMigration is the largest version of this same in-memory list, so
        // maxMigration < step.version can never hold here — this branch looks dead. Was the
        // intent to compare against the max version recorded in the DB? Confirm.
        if (maxMigration.compareTo(step.getMigrationVersion()) < 0) {
          // This a new Step file
          result.add(step);
        } else if (ignoreFileChecksum || !checksum.equals(step.getFileUuid())) {
          // This migration step was ran already, if checksum is equal this step can be ignored
          LOG.warn(
              "[Migration Workflow] You are changing an older Migration File. This is not advised. Add your changes to latest Migrations.");
          result.add(step);
        }
      } else {
        // Version does not exist on DB, this step was not run
        result.add(step);
      }
    }
    return result;
  }

  @SuppressWarnings("unused")
  private void initializeMigrationWorkflow() {}

  /**
   * Runs each pending step on a single JDBI handle: preDDL (outside any transaction, since DDL
   * implicitly commits), then the data migration inside an explicit transaction, then postDDL,
   * then records the step as done.
   *
   * NOTE(review): exceptions are logged and rolled back but NOT rethrown — the method still logs
   * "WorkFlow Completed" and callers cannot tell a step failed. Confirm this is intentional.
   */
  public void runMigrationWorkflows() {
    try (Handle transactionHandler = jdbi.open()) {
      LOG.info("[MigrationWorkflow] WorkFlow Started");
      try {
        for (MigrationStep step : migrations) {
          // Initialise Migration Steps
          LOG.info(
              "[MigrationStep] Initialized, Version: {}, DatabaseType: {}, FileName: {}",
              step.getMigrationVersion(),
              step.getDatabaseConnectionType(),
              step.getMigrationFileName());
          step.initialize(transactionHandler);
          LOG.info(
              "[MigrationStep] Running PreDataSQLs, Version: {}, DatabaseType: {}, FileName: {}",
              step.getMigrationVersion(),
              step.getDatabaseConnectionType(),
              step.getMigrationFileName());
          step.preDDL();
          LOG.info("[MigrationStep] Transaction Started");
          // Begin Transaction
          transactionHandler.begin();
          // Run Database Migration for all the Migration Steps
          LOG.info(
              "[MigrationStep] Running DataMigration, Version: {}, DatabaseType: {}, FileName: {}",
              step.getMigrationVersion(),
              step.getDatabaseConnectionType(),
              step.getMigrationFileName());
          step.runDataMigration();
          LOG.info("[MigrationStep] Committing Transaction");
          transactionHandler.commit();
          // Run Database Migration for all the Migration Steps
          LOG.info(
              "[MigrationStep] Running TransactionalPostDataSQLs, Version: {}, DatabaseType: {}, FileName: {}",
              step.getMigrationVersion(),
              step.getDatabaseConnectionType(),
              step.getMigrationFileName());
          step.postDDL();
          // Handle Migration Closure
          LOG.info(
              "[MigrationStep] Update Migration Status, Version: {}, DatabaseType: {}, FileName: {}",
              step.getMigrationVersion(),
              step.getDatabaseConnectionType(),
              step.getMigrationFileName());
          updateMigrationStepInDB(step);
        }
      } catch (Exception e) {
        // Any Exception catch the error
        // Rollback the transaction
        LOG.error("Encountered Exception in MigrationWorkflow", e);
        LOG.info("[MigrationWorkflow] Rolling Back Transaction");
        if (transactionHandler.isInTransaction()) {
          transactionHandler.rollback();
        }
      }
    }
    LOG.info("[MigrationWorkflow] WorkFlow Completed");
  }

  public void closeMigrationWorkflow() {
    // 1. Write to DB table the version we upgraded to
    // should be the current server version
    // 2. Commit Transaction on completion
  }

  /** Records the completed step (version, file name, file uuid as checksum) in the migration table. */
  public void updateMigrationStepInDB(MigrationStep step) {
    migrationDAO.upsertServerMigration(step.getMigrationVersion(), step.getMigrationFileName(), step.getFileUuid());
  }

  public void migrateSearchIndexes() {}
}

View File

@ -0,0 +1,178 @@
package org.openmetadata.service.migration.versions.mysql.v110;
import static org.openmetadata.service.migration.MigrationUtil.dataMigrationFQNHashing;
import static org.openmetadata.service.migration.MigrationUtil.performSqlExecutionAndUpdation;
import static org.openmetadata.service.migration.MigrationUtil.testSuitesMigration;
import java.util.Arrays;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.jdbi.v3.core.Handle;
import org.openmetadata.service.jdbi3.CollectionDAO;
import org.openmetadata.service.jdbi3.MigrationDAO;
import org.openmetadata.service.jdbi3.locator.ConnectionType;
import org.openmetadata.service.migration.MigrationFile;
import org.openmetadata.service.migration.api.MigrationStep;
/**
 * MySQL-specific 1.1.0 migration step: preDDL adds the new nameHash/fqnHash columns (and drops
 * replaced keys/columns), runDataMigration backfills the hashes, and postDDL adds the unique
 * constraints and drops the columns the hashes replace. DDL statements are executed one by one
 * and checksummed in SERVER_MIGRATION_SQL_LOGS so a re-run skips completed ones.
 *
 * NOTE(review): these SQL strings are identity-by-checksum — editing even one character causes
 * the statement to re-execute on already-migrated installs. Do not "clean up" the strings.
 */
@Slf4j
@MigrationFile(name = "v110_MySQLMigration")
@SuppressWarnings("unused")
public class MySQLMigration implements MigrationStep {
  private CollectionDAO collectionDAO;
  private MigrationDAO migrationDAO;
  private Handle handle;

  @Override
  public String getMigrationVersion() {
    return "1.1.0";
  }

  @Override
  public String getMigrationFileName() {
    return "v110_MySQLMigration";
  }

  @Override
  public String getFileUuid() {
    return "ffcc502b-d4a0-4e5f-a562-0a6d4110c762";
  }

  @Override
  public ConnectionType getDatabaseConnectionType() {
    return ConnectionType.MYSQL;
  }

  @Override
  public void initialize(Handle handle) {
    // All phases share the single handle provided by the MigrationWorkflow.
    this.handle = handle;
    this.collectionDAO = handle.attach(CollectionDAO.class);
    this.migrationDAO = handle.attach(MigrationDAO.class);
  }

  @Override
  public void preDDL() {
    preDDLFQNHashing();
  }

  @Override
  public void runDataMigration() {
    // FQN Hashing Migrations
    dataMigrationFQNHashing(collectionDAO);
  }

  @Override
  public void postDDL() {
    // This SQLs cannot be part of the commit as these need some data to be committed
    postDDLFQNHashing();
    testSuitesMigration(collectionDAO);
  }

  @Override
  public void close() {}

  /**
   * Adds nameHash (for name-keyed entities) or fqnHash plus a generated `name` column (for
   * FQN-keyed entities), and prepares field_relationship / entity_extension_time_series /
   * tag_usage for hash backfill. Unique constraints are deliberately deferred to postDDL,
   * after the data migration has populated the hash columns.
   */
  private void preDDLFQNHashing() {
    // These are DDL Statements and will cause an Implicit commit even if part of transaction still committed inplace
    List<String> queryList =
        Arrays.asList(
            "ALTER TABLE bot_entity DROP KEY `name`, ADD COLUMN nameHash VARCHAR(256) NOT NULL",
            "ALTER TABLE chart_entity DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256) NOT NULL, ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> '$.name') NOT NULL",
            "ALTER TABLE classification DROP KEY `name`, ADD COLUMN nameHash VARCHAR(256) NOT NULL",
            "ALTER TABLE storage_container_entity DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256) NOT NULL, ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> '$.name') NOT NULL",
            "ALTER TABLE dashboard_data_model_entity DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256) NOT NULL, ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> '$.name') NOT NULL",
            "ALTER TABLE dashboard_entity DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256) NOT NULL, ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> '$.name') NOT NULL",
            "ALTER TABLE dashboard_service_entity DROP KEY `name`, ADD COLUMN nameHash VARCHAR(256) NOT NULL",
            "ALTER TABLE data_insight_chart DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256) NOT NULL",
            "ALTER TABLE database_entity DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256) NOT NULL, ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> '$.name') NOT NULL",
            "ALTER TABLE database_schema_entity DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256) NOT NULL, ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> '$.name') NOT NULL",
            "ALTER TABLE dbservice_entity DROP KEY `name`, ADD COLUMN nameHash VARCHAR(256) NOT NULL",
            "ALTER TABLE event_subscription_entity DROP KEY `name`, ADD COLUMN nameHash VARCHAR(256) NOT NULL",
            "ALTER TABLE glossary_entity DROP KEY `name`, ADD COLUMN nameHash VARCHAR(256) NOT NULL",
            "ALTER TABLE glossary_term_entity DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256) NOT NULL, ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> '$.name') NOT NULL",
            "ALTER TABLE ingestion_pipeline_entity DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256) NOT NULL, ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> '$.name') NOT NULL",
            "ALTER TABLE kpi_entity DROP KEY `name`, ADD COLUMN nameHash VARCHAR(256) NOT NULL",
            "ALTER TABLE messaging_service_entity DROP KEY `name`, ADD COLUMN nameHash VARCHAR(256) NOT NULL",
            "ALTER TABLE metadata_service_entity DROP KEY `name`, ADD COLUMN nameHash VARCHAR(256) NOT NULL",
            "ALTER TABLE metric_entity DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256) NOT NULL, ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> '$.name') NOT NULL",
            "ALTER TABLE ml_model_entity DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256) NOT NULL, ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> '$.name') NOT NULL",
            "ALTER TABLE mlmodel_service_entity DROP KEY `name`, ADD COLUMN nameHash VARCHAR(256) NOT NULL",
            "ALTER TABLE pipeline_entity DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256) NOT NULL, ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> '$.name') NOT NULL",
            "ALTER TABLE pipeline_service_entity DROP KEY `name`, ADD COLUMN nameHash VARCHAR(256) NOT NULL",
            "ALTER TABLE policy_entity DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256) NOT NULL, ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> '$.name') NOT NULL",
            "ALTER TABLE query_entity DROP KEY `name`, ADD COLUMN nameHash VARCHAR(256) NOT NULL",
            "ALTER TABLE report_entity DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256) NOT NULL, ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> '$.name') NOT NULL",
            "ALTER TABLE role_entity DROP KEY `name`, ADD COLUMN nameHash VARCHAR(256) NOT NULL",
            "ALTER TABLE storage_service_entity DROP KEY `name`, ADD COLUMN nameHash VARCHAR(256) NOT NULL",
            "ALTER TABLE table_entity DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256) NOT NULL, ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> '$.name') NOT NULL",
            "ALTER TABLE tag DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256) NOT NULL, ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> '$.name') NOT NULL",
            "ALTER TABLE team_entity DROP KEY `name`, ADD COLUMN nameHash VARCHAR(256) NOT NULL",
            "ALTER TABLE test_case DROP COLUMN fullyQualifiedName, ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> '$.name') NOT NULL, ADD COLUMN fqnHash VARCHAR(256) NOT NULL",
            "ALTER TABLE test_connection_definition ADD COLUMN nameHash VARCHAR(256) NOT NULL,ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> '$.name') NOT NULL",
            "ALTER TABLE test_definition DROP KEY `name`, ADD COLUMN nameHash VARCHAR(256) NOT NULL",
            "ALTER TABLE test_suite DROP KEY `name`, ADD COLUMN nameHash VARCHAR(256) NOT NULL",
            // NOTE(review): stray trailing ';' inside this statement (unlike its siblings) —
            // harmless to execute, but left as-is because changing it changes the checksum.
            "ALTER TABLE topic_entity DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256) NOT NULL, ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> '$.name') NOT NULL;",
            "ALTER TABLE type_entity DROP KEY `name`, ADD COLUMN nameHash VARCHAR(256) NOT NULL",
            "ALTER TABLE user_entity DROP KEY `name`, ADD COLUMN nameHash VARCHAR(256) NOT NULL",
            "ALTER TABLE web_analytic_event DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256) NOT NULL",
            "ALTER TABLE automations_workflow DROP KEY `name`, ADD COLUMN nameHash VARCHAR(256) NOT NULL",
            // field_relationship
            "ALTER TABLE field_relationship ADD COLUMN fromFQNHash VARCHAR(256), ADD COLUMN toFQNHash VARCHAR(256), DROP INDEX from_index, DROP INDEX to_index, ADD INDEX from_fqnhash_index(fromFQNHash, relation), ADD INDEX to_fqnhash_index(toFQNHash, relation)",
            // entity_extension_time_series
            "ALTER TABLE entity_extension_time_series ADD COLUMN entityFQNHash VARCHAR (256) NOT NULL",
            // tag_usage
            "ALTER TABLE tag_usage ADD COLUMN tagFQNHash VARCHAR(256), ADD COLUMN targetFQNHash VARCHAR(256)");
    performSqlExecutionAndUpdation(this, migrationDAO, handle, queryList);
  }

  /**
   * After the hashes are backfilled: adds the unique constraints on the hash columns, repoints
   * the field_relationship primary key and the tag_usage unique key at the hashes, and drops
   * the now-redundant entityFQN column.
   */
  private void postDDLFQNHashing() {
    // These are DDL Statements and will cause an Implicit commit even if part of transaction still committed inplace
    List<String> queryList =
        Arrays.asList(
            "ALTER TABLE bot_entity ADD UNIQUE (nameHash)",
            "ALTER TABLE chart_entity ADD UNIQUE (fqnHash)",
            "ALTER TABLE classification ADD UNIQUE (nameHash)",
            "ALTER TABLE storage_container_entity ADD UNIQUE (fqnHash)",
            "ALTER TABLE dashboard_data_model_entity ADD UNIQUE (fqnHash)",
            "ALTER TABLE dashboard_entity ADD UNIQUE (fqnHash)",
            "ALTER TABLE dashboard_service_entity ADD UNIQUE (nameHash)",
            "ALTER TABLE data_insight_chart ADD UNIQUE (fqnHash)",
            "ALTER TABLE database_entity ADD UNIQUE (fqnHash)",
            "ALTER TABLE database_schema_entity ADD UNIQUE (fqnHash)",
            "ALTER TABLE dbservice_entity ADD UNIQUE (nameHash)",
            "ALTER TABLE event_subscription_entity ADD UNIQUE (nameHash)",
            "ALTER TABLE glossary_entity ADD UNIQUE (nameHash)",
            "ALTER TABLE glossary_term_entity ADD UNIQUE (fqnHash)",
            "ALTER TABLE ingestion_pipeline_entity ADD UNIQUE (fqnHash)",
            "ALTER TABLE kpi_entity ADD UNIQUE (nameHash)",
            "ALTER TABLE messaging_service_entity ADD UNIQUE (nameHash)",
            "ALTER TABLE metadata_service_entity ADD UNIQUE (nameHash)",
            "ALTER TABLE metric_entity ADD UNIQUE (fqnHash)",
            "ALTER TABLE ml_model_entity ADD UNIQUE (fqnHash)",
            "ALTER TABLE mlmodel_service_entity ADD UNIQUE (nameHash)",
            "ALTER TABLE pipeline_entity ADD UNIQUE (fqnHash)",
            "ALTER TABLE pipeline_service_entity ADD UNIQUE (nameHash)",
            "ALTER TABLE policy_entity ADD UNIQUE (fqnHash)",
            "ALTER TABLE query_entity ADD UNIQUE (nameHash)",
            "ALTER TABLE report_entity ADD UNIQUE (fqnHash)",
            "ALTER TABLE role_entity ADD UNIQUE (nameHash)",
            "ALTER TABLE storage_service_entity ADD UNIQUE (nameHash)",
            "ALTER TABLE table_entity ADD UNIQUE (fqnHash)",
            "ALTER TABLE tag ADD UNIQUE (fqnHash)",
            "ALTER TABLE team_entity ADD UNIQUE (nameHash)",
            "ALTER TABLE test_case ADD UNIQUE (fqnHash)",
            "ALTER TABLE test_connection_definition ADD UNIQUE (nameHash)",
            "ALTER TABLE test_definition ADD UNIQUE (nameHash)",
            "ALTER TABLE test_suite ADD UNIQUE (nameHash)",
            "ALTER TABLE topic_entity ADD UNIQUE (fqnHash)",
            "ALTER TABLE type_entity ADD UNIQUE (nameHash)",
            "ALTER TABLE user_entity ADD UNIQUE (nameHash)",
            "ALTER TABLE web_analytic_event ADD UNIQUE (fqnHash)",
            "ALTER TABLE automations_workflow ADD UNIQUE (nameHash)",
            // entity_extension_time_series
            "ALTER TABLE entity_extension_time_series DROP COLUMN entityFQN",
            // field_relationship
            "ALTER TABLE field_relationship DROP KEY `PRIMARY`,ADD CONSTRAINT `field_relationship_primary` PRIMARY KEY(fromFQNHash, toFQNHash, relation), MODIFY fromFQN VARCHAR(2096) NOT NULL, MODIFY toFQN VARCHAR(2096) NOT NULL",
            // tag_usage
            "ALTER TABLE tag_usage DROP index `source`, DROP COLUMN targetFQN, ADD UNIQUE KEY `tag_usage_key` (source, tagFQNHash, targetFQNHash)");
    performSqlExecutionAndUpdation(this, migrationDAO, handle, queryList);
  }
}

View File

@ -0,0 +1,181 @@
package org.openmetadata.service.migration.versions.postgres.v110;
import static org.openmetadata.service.migration.MigrationUtil.dataMigrationFQNHashing;
import static org.openmetadata.service.migration.MigrationUtil.performSqlExecutionAndUpdation;
import static org.openmetadata.service.migration.MigrationUtil.testSuitesMigration;
import java.util.Arrays;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.jdbi.v3.core.Handle;
import org.openmetadata.service.jdbi3.CollectionDAO;
import org.openmetadata.service.jdbi3.MigrationDAO;
import org.openmetadata.service.jdbi3.locator.ConnectionType;
import org.openmetadata.service.migration.MigrationFile;
import org.openmetadata.service.migration.api.MigrationStep;
@Slf4j
@MigrationFile(name = "v110_PostgresMigration")
@SuppressWarnings("unused")
public class PostgresMigration implements MigrationStep {
  // Attached lazily in initialize(Handle); null until the workflow calls initialize.
  private CollectionDAO collectionDAO;
  private MigrationDAO migrationDAO;
  private Handle handle;

  /** Server version this migration step belongs to. */
  @Override
  public String getMigrationVersion() {
    return "1.1.0";
  }

  /** Logical file name recorded in SERVER_CHANGE_LOG for checksum tracking. */
  @Override
  public String getMigrationFileName() {
    return "v110_PostgresMigration";
  }

  /** Stable UUID identifying this migration file across runs. */
  @Override
  public String getFileUuid() {
    return "98b837ea-5941-4577-bb6d-99ca6a80ed13";
  }

  /** This step only runs against PostgreSQL; the MySQL twin lives in the mysql package. */
  @Override
  public ConnectionType getDatabaseConnectionType() {
    return ConnectionType.POSTGRES;
  }

  /**
   * Binds the JDBI handle and attaches the DAOs this step needs.
   *
   * @param handle open JDBI handle supplied by the migration workflow
   */
  @Override
  public void initialize(Handle handle) {
    this.handle = handle;
    this.collectionDAO = handle.attach(CollectionDAO.class);
    this.migrationDAO = handle.attach(MigrationDAO.class);
  }

  /** Phase 1: add the new hash columns (nullable for now) before data is backfilled. */
  @Override
  public void preDDL() {
    // FQNHASH
    preDDLFQNHashing();
  }

  /** Phase 2: backfill fqnHash/nameHash values for every entity row. */
  @Override
  public void runDataMigration() {
    dataMigrationFQNHashing(collectionDAO);
  }

  /** Phase 3: enforce NOT NULL + UNIQUE on the backfilled columns, then migrate test suites. */
  @Override
  public void postDDL() {
    postDDLFQNHashing();
    testSuitesMigration(collectionDAO);
  }

  @Override
  public void close() {}

  /**
   * Adds fqnHash/nameHash columns (and generated name columns) across all entity tables.
   * Columns are added nullable here; constraints are applied in postDDLFQNHashing() only
   * after runDataMigration() has populated them.
   */
  private void preDDLFQNHashing() {
    // These are DDL Statements and will cause an Implicit commit even if part of transaction still committed inplace
    List<String> queryList =
        Arrays.asList(
            "ALTER TABLE bot_entity DROP CONSTRAINT bot_entity_name_key, ADD COLUMN nameHash VARCHAR(256)",
            "ALTER TABLE chart_entity DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256), ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> 'name') STORED NOT NULL",
            "ALTER TABLE classification DROP CONSTRAINT tag_category_name_key, ADD COLUMN nameHash VARCHAR(256)",
            "ALTER TABLE storage_container_entity DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256), ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> 'name') STORED NOT NULL",
            "ALTER TABLE dashboard_data_model_entity DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256), ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> 'name') STORED NOT NULL",
            "ALTER TABLE dashboard_entity DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256), ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> 'name') STORED NOT NULL",
            "ALTER TABLE dashboard_service_entity DROP CONSTRAINT dashboard_service_entity_name_key, ADD COLUMN nameHash VARCHAR(256)",
            "ALTER TABLE data_insight_chart DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256)",
            "ALTER TABLE database_entity DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256), ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> 'name') STORED NOT NULL",
            "ALTER TABLE database_schema_entity DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256), ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> 'name') STORED NOT NULL",
            "ALTER TABLE dbservice_entity DROP CONSTRAINT dbservice_entity_name_key, ADD COLUMN nameHash VARCHAR(256)",
            "ALTER TABLE event_subscription_entity DROP CONSTRAINT event_subscription_entity_name_key, ADD COLUMN nameHash VARCHAR(256)",
            "ALTER TABLE glossary_entity DROP CONSTRAINT glossary_entity_name_key, ADD COLUMN nameHash VARCHAR(256)",
            "ALTER TABLE glossary_term_entity DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256), ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> 'name') STORED NOT NULL",
            "ALTER TABLE ingestion_pipeline_entity DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256), ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> 'name') STORED NOT NULL",
            "ALTER TABLE kpi_entity DROP CONSTRAINT kpi_entity_name_key, ADD COLUMN nameHash VARCHAR(256)",
            "ALTER TABLE messaging_service_entity DROP CONSTRAINT messaging_service_entity_name_key, ADD COLUMN nameHash VARCHAR(256)",
            "ALTER TABLE metadata_service_entity DROP CONSTRAINT metadata_service_entity_name_key, ADD COLUMN nameHash VARCHAR(256)",
            "ALTER TABLE metric_entity DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256), ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> 'name') STORED NOT NULL",
            "ALTER TABLE ml_model_entity DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256), ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> 'name') STORED NOT NULL",
            "ALTER TABLE mlmodel_service_entity DROP CONSTRAINT mlmodel_service_entity_name_key, ADD COLUMN nameHash VARCHAR(256)",
            "ALTER TABLE pipeline_entity DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256), ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> 'name') STORED NOT NULL",
            "ALTER TABLE pipeline_service_entity DROP CONSTRAINT pipeline_service_entity_name_key, ADD COLUMN nameHash VARCHAR(256)",
            "ALTER TABLE policy_entity DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256), ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> 'name') STORED NOT NULL",
            "ALTER TABLE query_entity DROP CONSTRAINT query_entity_name_key, ADD COLUMN nameHash VARCHAR(256)",
            "ALTER TABLE report_entity DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256), ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> 'name') STORED NOT NULL",
            "ALTER TABLE role_entity DROP CONSTRAINT role_entity_name_key, ADD COLUMN nameHash VARCHAR(256)",
            "ALTER TABLE storage_service_entity DROP CONSTRAINT storage_service_entity_name_key, ADD COLUMN nameHash VARCHAR(256)",
            "ALTER TABLE table_entity DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256), ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> 'name') STORED NOT NULL",
            "ALTER TABLE tag DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256), ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> 'name') STORED NOT NULL",
            "ALTER TABLE team_entity DROP CONSTRAINT team_entity_name_key, ADD COLUMN nameHash VARCHAR(256)",
            "ALTER TABLE test_case DROP COLUMN fullyQualifiedName, ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> 'name') STORED NOT NULL, ADD COLUMN fqnHash VARCHAR(256)",
            "ALTER TABLE test_connection_definition ADD COLUMN nameHash VARCHAR(256), ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> 'name') STORED NOT NULL",
            "ALTER TABLE test_definition DROP CONSTRAINT test_definition_name_key, ADD COLUMN nameHash VARCHAR(256)",
            "ALTER TABLE test_suite DROP CONSTRAINT test_suite_name_key, ADD COLUMN nameHash VARCHAR(256)",
            "ALTER TABLE topic_entity DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256), ADD COLUMN name VARCHAR(256) GENERATED ALWAYS AS (json ->> 'name') STORED NOT NULL",
            "ALTER TABLE type_entity DROP CONSTRAINT type_entity_name_key, ADD COLUMN nameHash VARCHAR(256)",
            "ALTER TABLE user_entity DROP CONSTRAINT user_entity_name_key, ADD COLUMN nameHash VARCHAR(256)",
            "ALTER TABLE web_analytic_event DROP COLUMN fullyQualifiedName, ADD COLUMN fqnHash VARCHAR(256)",
            "ALTER TABLE automations_workflow DROP CONSTRAINT automations_workflow_name_key, ADD COLUMN nameHash VARCHAR(256)",
            // field_relationship
            // IF EXISTS keeps this step idempotent (a re-run after a partial failure must not
            // abort here), matching the CREATE INDEX IF NOT EXISTS statements below.
            "DROP INDEX IF EXISTS field_relationship_from_index, field_relationship_to_index",
            "ALTER TABLE field_relationship ADD COLUMN fromFQNHash VARCHAR(256), ADD COLUMN toFQNHash VARCHAR(256)",
            "CREATE INDEX IF NOT EXISTS field_relationship_from_index ON field_relationship(fromFQNHash, relation)",
            "CREATE INDEX IF NOT EXISTS field_relationship_to_index ON field_relationship(toFQNHash, relation)",
            // entity_extension_time_series
            "ALTER TABLE entity_extension_time_series ADD COLUMN entityFQNHash VARCHAR (256)",
            // tag_usage
            "ALTER TABLE tag_usage ADD COLUMN tagFQNHash VARCHAR(256), ADD COLUMN targetFQNHash VARCHAR(256)");
    performSqlExecutionAndUpdation(this, migrationDAO, handle, queryList);
  }

  /**
   * Tightens the schema once hashes are backfilled: NOT NULL + UNIQUE on the hash columns,
   * rebuilds field_relationship's primary key on the hash columns, and drops the old
   * plain-FQN columns/constraints.
   */
  private void postDDLFQNHashing() {
    // These are DDL Statements and will cause an Implicit commit even if part of transaction still committed inplace
    List<String> queryList =
        Arrays.asList(
            "ALTER TABLE bot_entity ADD UNIQUE (nameHash), ALTER COLUMN nameHash SET NOT NULL",
            "ALTER TABLE chart_entity ADD UNIQUE (fqnHash), ALTER COLUMN fqnHash SET NOT NULL",
            "ALTER TABLE classification ADD UNIQUE (nameHash), ALTER COLUMN nameHash SET NOT NULL",
            "ALTER TABLE storage_container_entity ADD UNIQUE (fqnHash), ALTER COLUMN fqnHash SET NOT NULL",
            "ALTER TABLE dashboard_data_model_entity ADD UNIQUE (fqnHash), ALTER COLUMN fqnHash SET NOT NULL",
            "ALTER TABLE dashboard_entity ADD UNIQUE (fqnHash), ALTER COLUMN fqnHash SET NOT NULL",
            "ALTER TABLE dashboard_service_entity ADD UNIQUE (nameHash), ALTER COLUMN nameHash SET NOT NULL",
            "ALTER TABLE data_insight_chart ADD UNIQUE (fqnHash), ALTER COLUMN fqnHash SET NOT NULL",
            "ALTER TABLE database_entity ADD UNIQUE (fqnHash), ALTER COLUMN fqnHash SET NOT NULL",
            "ALTER TABLE database_schema_entity ADD UNIQUE (fqnHash), ALTER COLUMN fqnHash SET NOT NULL",
            "ALTER TABLE dbservice_entity ADD UNIQUE (nameHash), ALTER COLUMN nameHash SET NOT NULL",
            "ALTER TABLE event_subscription_entity ADD UNIQUE (nameHash), ALTER COLUMN nameHash SET NOT NULL",
            "ALTER TABLE glossary_entity ADD UNIQUE (nameHash), ALTER COLUMN nameHash SET NOT NULL",
            "ALTER TABLE glossary_term_entity ADD UNIQUE (fqnHash), ALTER COLUMN fqnHash SET NOT NULL",
            "ALTER TABLE ingestion_pipeline_entity ADD UNIQUE (fqnHash), ALTER COLUMN fqnHash SET NOT NULL",
            "ALTER TABLE kpi_entity ADD UNIQUE (nameHash), ALTER COLUMN nameHash SET NOT NULL",
            "ALTER TABLE messaging_service_entity ADD UNIQUE (nameHash), ALTER COLUMN nameHash SET NOT NULL",
            "ALTER TABLE metadata_service_entity ADD UNIQUE (nameHash), ALTER COLUMN nameHash SET NOT NULL",
            "ALTER TABLE metric_entity ADD UNIQUE (fqnHash), ALTER COLUMN fqnHash SET NOT NULL",
            "ALTER TABLE ml_model_entity ADD UNIQUE (fqnHash), ALTER COLUMN fqnHash SET NOT NULL",
            "ALTER TABLE mlmodel_service_entity ADD UNIQUE (nameHash), ALTER COLUMN nameHash SET NOT NULL",
            "ALTER TABLE pipeline_entity ADD UNIQUE (fqnHash), ALTER COLUMN fqnHash SET NOT NULL",
            "ALTER TABLE pipeline_service_entity ADD UNIQUE (nameHash), ALTER COLUMN nameHash SET NOT NULL",
            "ALTER TABLE policy_entity ADD UNIQUE (fqnHash), ALTER COLUMN fqnHash SET NOT NULL",
            "ALTER TABLE query_entity ADD UNIQUE (nameHash), ALTER COLUMN nameHash SET NOT NULL",
            "ALTER TABLE report_entity ADD UNIQUE (fqnHash), ALTER COLUMN fqnHash SET NOT NULL",
            "ALTER TABLE role_entity ADD UNIQUE (nameHash), ALTER COLUMN nameHash SET NOT NULL",
            "ALTER TABLE storage_service_entity ADD UNIQUE (nameHash), ALTER COLUMN nameHash SET NOT NULL",
            "ALTER TABLE table_entity ADD UNIQUE (fqnHash), ALTER COLUMN fqnHash SET NOT NULL",
            "ALTER TABLE tag ADD UNIQUE (fqnHash), ALTER COLUMN fqnHash SET NOT NULL",
            "ALTER TABLE team_entity ADD UNIQUE (nameHash), ALTER COLUMN nameHash SET NOT NULL",
            "ALTER TABLE test_case ADD UNIQUE (fqnHash), ALTER COLUMN fqnHash SET NOT NULL",
            "ALTER TABLE test_connection_definition ADD UNIQUE (nameHash), ALTER COLUMN nameHash SET NOT NULL",
            "ALTER TABLE test_definition ADD UNIQUE (nameHash), ALTER COLUMN nameHash SET NOT NULL",
            "ALTER TABLE test_suite ADD UNIQUE (nameHash), ALTER COLUMN nameHash SET NOT NULL",
            "ALTER TABLE topic_entity ADD UNIQUE (fqnHash), ALTER COLUMN fqnHash SET NOT NULL",
            "ALTER TABLE type_entity ADD UNIQUE (nameHash), ALTER COLUMN nameHash SET NOT NULL",
            "ALTER TABLE user_entity ADD UNIQUE (nameHash), ALTER COLUMN nameHash SET NOT NULL",
            "ALTER TABLE web_analytic_event ADD UNIQUE (fqnHash), ALTER COLUMN fqnHash SET NOT NULL",
            "ALTER TABLE automations_workflow ADD UNIQUE (nameHash), ALTER COLUMN nameHash SET NOT NULL",
            // field_relationship
            "ALTER TABLE field_relationship DROP CONSTRAINT field_relationship_pkey, ADD CONSTRAINT field_relationship_pkey PRIMARY KEY(fromFQNHash, toFQNHash, relation), ALTER fromFQN TYPE VARCHAR(2096), ALTER toFQN TYPE VARCHAR(2096)",
            // entity_extension_time_series
            "ALTER TABLE entity_extension_time_series DROP COLUMN entityFQN, ALTER COLUMN entityFQNHash SET NOT NULL",
            // tag_usage
            "ALTER TABLE tag_usage DROP CONSTRAINT tag_usage_source_tagfqn_targetfqn_key, DROP COLUMN targetFQN, ADD UNIQUE (source, tagFQNHash, targetFQNHash)");
    performSqlExecutionAndUpdation(this, migrationDAO, handle, queryList);
  }
}

View File

@ -24,4 +24,14 @@ public class DatasourceConfig {
/** Whether the configured JDBC driver class matches the MySQL connection-type label. */
public Boolean isMySQL() {
  String configuredDriver = dataSourceFactory.getDriverClass();
  return ConnectionType.MYSQL.label.equals(configuredDriver);
}
/**
 * Maps the configured JDBC driver class to a ConnectionType.
 * Only MySQL and Postgres are supported today, so anything that is not the MySQL
 * driver is treated as Postgres; no separate Postgres label check is needed yet.
 */
public ConnectionType getDatabaseConnectionType() {
  boolean isMysqlDriver = ConnectionType.MYSQL.label.equals(dataSourceFactory.getDriverClass());
  return isMysqlDriver ? ConnectionType.MYSQL : ConnectionType.POSTGRES;
}
}

View File

@ -29,7 +29,10 @@ import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Scanner;
import java.util.Set;
import javax.validation.Validator;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
@ -47,23 +50,35 @@ import org.openmetadata.service.elasticsearch.ElasticSearchIndexDefinition;
import org.openmetadata.service.fernet.Fernet;
import org.openmetadata.service.jdbi3.CollectionDAO;
import org.openmetadata.service.jdbi3.locator.ConnectionAwareAnnotationSqlLocator;
import org.openmetadata.service.migration.MigrationFile;
import org.openmetadata.service.migration.api.MigrationStep;
import org.openmetadata.service.migration.api.MigrationWorkflow;
import org.openmetadata.service.resources.databases.DatasourceConfig;
import org.openmetadata.service.search.IndexUtil;
import org.openmetadata.service.search.SearchClient;
import org.openmetadata.service.secrets.SecretsManagerFactory;
import org.reflections.Reflections;
public final class TablesInitializer {
private static final String DEBUG_MODE_ENABLED = "debug_mode";
private static final String OPTION_SCRIPT_ROOT_PATH = "script-root";
private static final String OPTION_CONFIG_FILE_PATH = "config";
private static final String OPTION_IGNORE_SERVER_FILE_CHECKSUM = "ignoreCheckSum";
private static final String DISABLE_VALIDATE_ON_MIGRATE = "disable-validate-on-migrate";
private static final Options OPTIONS;
private static boolean debugMode = false;
private static boolean ignoreServerFileChecksum = false;
static {
OPTIONS = new Options();
OPTIONS.addOption("debug", DEBUG_MODE_ENABLED, false, "Enable Debug Mode");
OPTIONS.addOption("s", OPTION_SCRIPT_ROOT_PATH, true, "Root directory of script path");
OPTIONS.addOption("c", OPTION_CONFIG_FILE_PATH, true, "Config file path");
OPTIONS.addOption(
"ignoreCheckSum",
OPTION_IGNORE_SERVER_FILE_CHECKSUM,
true,
"Ignore the server checksum and rerun same file in migrate");
OPTIONS.addOption(null, SchemaMigrationOption.CREATE.toString(), false, "Run sql migrations from scratch");
OPTIONS.addOption(null, SchemaMigrationOption.DROP.toString(), false, "Drop all the tables in the target database");
OPTIONS.addOption(
@ -110,6 +125,9 @@ public final class TablesInitializer {
if (commandLine.hasOption(DEBUG_MODE_ENABLED)) {
debugMode = true;
}
if (commandLine.hasOption(OPTION_IGNORE_SERVER_FILE_CHECKSUM)) {
ignoreServerFileChecksum = Boolean.parseBoolean(commandLine.getOptionValue(OPTION_IGNORE_SERVER_FILE_CHECKSUM));
}
boolean isSchemaMigrationOptionSpecified = false;
SchemaMigrationOption schemaMigrationOptionSpecified = null;
for (SchemaMigrationOption schemaMigrationOption : SchemaMigrationOption.values()) {
@ -259,6 +277,8 @@ public final class TablesInitializer {
break;
case MIGRATE:
flyway.migrate();
// Validate and Run System Data Migrations
validateAndRunSystemDataMigrations(jdbi, config, ignoreServerFileChecksum);
break;
case INFO:
printToConsoleMandatory(dumpToAsciiTable(flyway.info().all()));
@ -308,6 +328,37 @@ public final class TablesInitializer {
}
}
/**
 * Discovers the server-side migration steps for the configured database flavor and
 * runs them through a MigrationWorkflow.
 *
 * @param jdbi               JDBI instance connected to the target database
 * @param config             application config used to initialize the datasource singleton
 * @param ignoreFileChecksum when true, steps are re-run even if their recorded checksum matches
 */
private static void validateAndRunSystemDataMigrations(
    Jdbi jdbi, OpenMetadataApplicationConfig config, boolean ignoreFileChecksum) {
  // Publish the datasource settings first so the scanner can tell MySQL from Postgres.
  DatasourceConfig.initialize(config);
  List<MigrationStep> serverMigrations = getServerMigrationFiles();
  MigrationWorkflow migrationWorkflow =
      new MigrationWorkflow(
          jdbi, DatasourceConfig.getInstance().getDatabaseConnectionType(), serverMigrations, ignoreFileChecksum);
  migrationWorkflow.runMigrationWorkflows();
}
/**
 * Scans the classpath for classes annotated with @MigrationFile and instantiates one
 * MigrationStep per class. Only the package matching the active database flavor
 * (mysql vs postgres) is scanned, so steps written for the other engine are never loaded.
 *
 * @return instantiated migration steps (unordered, as returned by the classpath scan)
 * @throws RuntimeException if scanning or instantiation fails for any class
 */
private static List<MigrationStep> getServerMigrationFiles() {
  List<MigrationStep> migrations = new ArrayList<>();
  try {
    String prefix =
        Boolean.TRUE.equals(DatasourceConfig.getInstance().isMySQL())
            ? "org.openmetadata.service.migration.versions.mysql"
            : "org.openmetadata.service.migration.versions.postgres";
    Reflections reflections = new Reflections(prefix);
    Set<Class<?>> migrationClasses = reflections.getTypesAnnotatedWith(MigrationFile.class);
    for (Class<?> clazz : migrationClasses) {
      // Instantiate directly from the Class object Reflections already loaded. The previous
      // Class.forName(clazz.getCanonicalName()) round-trip was redundant and broke for nested
      // classes, whose canonical name ("Outer.Inner") is not a loadable binary name ("Outer$Inner").
      MigrationStep step = clazz.asSubclass(MigrationStep.class).getConstructor().newInstance();
      migrations.add(step);
    }
  } catch (Exception ex) {
    printToConsoleMandatory("Failure in list System Migration Files");
    throw new RuntimeException(ex);
  }
  return migrations;
}
/** Writes an error message to standard error, keeping it out of normal console output. */
private static void printError(String message) {
System.err.println(message);
}