OpenMetadata Operations Command Line Utility (#14504)

* Add new OpenMetadataSetup command line application to migrate/deploy and re-index

* Add new OpenMetadataSetup command line application to migrate/deploy and re-index

* Add deployPipelines option

* Add reIndex option

* add subcommands

* add provision to store upgrade metrics

* rename bootstrap script

* fix styling checks

* Add changelog and store metrics into SERVER_CHANGE_LOG

* Cast jsonb

* Cast jsonb

---------

Co-authored-by: Pere Miquel Brull <peremiquelbrull@gmail.com>
Sriharsha Chintalapani 2023-12-28 23:22:58 -08:00 committed by GitHub
parent 2f679a9a48
commit 0303b44b9c
18 changed files with 796 additions and 50 deletions
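This PR adds a picocli-based operations entry point (OpenMetadataOperations) behind a new wrapper script. Because the command declares mixinStandardHelpOptions, the quickest way to list the registered subcommands after a build is the standard help flag; a minimal check, assuming you run it from the root of an unpacked OpenMetadata distribution:

# print global usage and the available subcommands
./bootstrap/openmetadata-ops.sh --help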

bootstrap/openmetadata-ops.sh Executable file

@@ -0,0 +1,50 @@
#!/usr/bin/env bash
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Resolve links - $0 may be a softlink
PRG="${0}"
while [ -h "${PRG}" ]; do
ls=`ls -ld "${PRG}"`
link=`expr "$ls" : '.*-> \(.*\)$'`
if expr "$link" : '/.*' > /dev/null; then
PRG="$link"
else
PRG=`dirname "${PRG}"`/"$link"
fi
done
BOOTSTRAP_DIR=`dirname ${PRG}`
CONFIG_FILE_PATH=${BOOTSTRAP_DIR}/../conf/openmetadata.yaml
# Which java to use
if [ -z "${JAVA_HOME}" ]; then
JAVA="java"
else
JAVA="${JAVA_HOME}/bin/java"
fi
OPENMETADATA_SETUP_MAIN_CLASS=org.openmetadata.service.util.OpenMetadataOperations
LIBS_DIR="${BOOTSTRAP_DIR}"/../libs/
if [ ${debug} ] ; then
echo $LIBS_DIR
fi
if [ -d "${LIBS_DIR}" ]; then
for file in "${LIBS_DIR}"*.jar;
do
CLASSPATH="$CLASSPATH":"$file"
done
else
CLASSPATH=`mvn -pl openmetadata-service -q exec:exec -Dexec.executable=echo -Dexec.args="%classpath"`
fi
${JAVA} -Dbootstrap.dir=$BOOTSTRAP_DIR -cp ${CLASSPATH} ${OPENMETADATA_SETUP_MAIN_CLASS} -c $CONFIG_FILE_PATH "$@"
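The wrapper resolves conf/openmetadata.yaml relative to its own location and always passes it with -c before forwarding the remaining arguments, so callers only supply a subcommand and its options. A few typical invocations, assuming the distribution layout above:

# list applied and pending Flyway migrations
./bootstrap/openmetadata-ops.sh info
# run the Flyway and system data migrations, then print the stored change log
./bootstrap/openmetadata-ops.sh migrate
# rebuild the search indexes with a larger batch size
./bootstrap/openmetadata-ops.sh reindex --batch-size 200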


@@ -0,0 +1 @@
ALTER TABLE SERVER_CHANGE_LOG ADD COLUMN metrics JSON;


@@ -0,0 +1 @@
ALTER TABLE SERVER_CHANGE_LOG ADD COLUMN metrics jsonb;
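Both migrations add a metrics column to the server change log (JSON on MySQL, jsonb on Postgres), which upsertServerMigration populates and the changelog subcommand reads back. A quick way to eyeball the stored values outside the tool, with hypothetical connection details for a default Postgres setup:

# inspect the per-version migration metrics written during "migrate"
psql -h localhost -U openmetadata_user -d openmetadata_db \
  -c 'SELECT version, installed_on, metrics FROM server_change_log ORDER BY version;'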


@@ -91,7 +91,19 @@ server:
logging:
level: ${LOG_LEVEL:-INFO}
loggers:
io.swagger: DEBUG
org.openmetadata.service.util.OpenMetadataSetup:
level: INFO
appenders:
- type: console
logFormat: "%msg%n"
timeZone: UTC
- type: file
logFormat: "%level [%d{ISO8601,UTC}] [%t] %logger{5} - %msg%n"
currentLogFilename: ./logs/openmetadata-operations.log
archivedLogFilenamePattern: ./logs/openmetadata-operations-%d{yyyy-MM-dd}-%i.log.gz
archivedFileCount: 7
timeZone: UTC
maxFileSize: 50MB
appenders:
- type: console
threshold: TRACE
@@ -249,7 +261,7 @@ pipelineServiceClientConfiguration:
# If we don't need this, set "org.openmetadata.service.clients.pipeline.noop.NoopClient"
className: ${PIPELINE_SERVICE_CLIENT_CLASS_NAME:-"org.openmetadata.service.clients.pipeline.airflow.AirflowRESTClient"}
apiEndpoint: ${PIPELINE_SERVICE_CLIENT_ENDPOINT:-http://localhost:8080}
metadataApiEndpoint: ${SERVER_HOST_API_URL:-http://localhost:8585/api}
metadataApiEndpoint: ${SERVER_HOST_API_URL:-http://host.docker.internal:8585/api}
ingestionIpInfoEnabled: ${PIPELINE_SERVICE_IP_INFO_ENABLED:-false}
hostIp: ${PIPELINE_SERVICE_CLIENT_HOST_IP:-""}
healthCheckInterval: ${PIPELINE_SERVICE_CLIENT_HEALTH_CHECK_INTERVAL:-300}
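The metadataApiEndpoint default now points at host.docker.internal, presumably so pipelines deployed from a containerized Airflow can reach a server running on the host. Since the operations tool parses the same YAML with an EnvironmentVariableSubstitutor, the value can still be overridden per invocation; for example (hostname assumed):

# point redeployed pipelines at a non-default OpenMetadata API endpoint
SERVER_HOST_API_URL=http://openmetadata-server:8585/api ./bootstrap/openmetadata-ops.sh deploy-pipelines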


@@ -485,6 +485,10 @@
<version>1.9.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>info.picocli</groupId>
<artifactId>picocli</artifactId>
</dependency>
</dependencies>
<profiles>


@@ -297,7 +297,7 @@ public final class Entity {
ENTITY_LIST.add(entity);
Collections.sort(ENTITY_LIST);
LOG.info("Registering entity {} {}", clazz, entity);
LOG.debug("Registering entity {} {}", clazz, entity);
}
public static <T extends EntityTimeSeriesInterface> void registerEntity(
@@ -309,7 +309,7 @@
ENTITY_LIST.add(entity);
Collections.sort(ENTITY_LIST);
LOG.info("Registering entity time series {} {}", clazz, entity);
LOG.debug("Registering entity time series {} {}", clazz, entity);
}
public static void registerResourcePermissions(


@@ -13,6 +13,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import lombok.Getter;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
@@ -54,6 +55,40 @@ import org.quartz.JobExecutionContext;
@Slf4j
public class SearchIndexApp extends AbstractNativeApplication {
private static final String ALL = "all";
private static final Set<String> ALL_ENTITIES =
Set.of(
"table",
"dashboard",
"topic",
"pipeline",
"searchIndex",
"user",
"team",
"glossaryTerm",
"mlmodel",
"tag",
"classification",
"query",
"container",
"database",
"databaseSchema",
"testCase",
"testSuite",
"chart",
"dashboardDataModel",
"databaseService",
"messagingService",
"dashboardService",
"pipelineService",
"mlmodelService",
"searchService",
"entityReportData",
"webAnalyticEntityViewReportData",
"webAnalyticUserActivityReportData",
"domain",
"storedProcedure");
private static final String ENTITY_TYPE_ERROR_MSG = "EntityType: %s %n Cause: %s %n Stack: %s";
private final List<PaginatedEntitiesSource> paginatedEntitiesSources = new ArrayList<>();
private final List<PaginatedDataInsightSource> paginatedDataInsightSources = new ArrayList<>();
@@ -67,12 +102,14 @@ public class SearchIndexApp extends AbstractNativeApplication {
@Override
public void init(App app, CollectionDAO dao, SearchRepository searchRepository) {
super.init(app, dao, searchRepository);
// request for reindexing
EventPublisherJob request =
JsonUtils.convertValue(app.getAppConfiguration(), EventPublisherJob.class)
.withStats(new Stats())
.withFailure(new Failure());
if (request.getEntities().contains(ALL)) {
request.setEntities(ALL_ENTITIES);
}
int totalRecords = getTotalRequestToProcess(request.getEntities(), collectionDAO);
this.jobData = request;
this.jobData.setStats(


@@ -35,7 +35,7 @@ public final class PipelineServiceClientFactory {
}
String pipelineServiceClientClass = config.getClassName();
LOG.info("Registering PipelineServiceClient: {}", pipelineServiceClientClass);
LOG.debug("Registering PipelineServiceClient: {}", pipelineServiceClientClass);
try {
pipelineServiceClient =


@@ -3,13 +3,20 @@ package org.openmetadata.service.jdbi3;
import static org.openmetadata.service.jdbi3.locator.ConnectionType.MYSQL;
import static org.openmetadata.service.jdbi3.locator.ConnectionType.POSTGRES;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;
import java.util.Optional;
import lombok.Builder;
import lombok.Getter;
import lombok.Setter;
import org.jdbi.v3.core.mapper.RowMapper;
import org.jdbi.v3.core.statement.StatementContext;
import org.jdbi.v3.core.statement.StatementException;
import org.jdbi.v3.sqlobject.SingleValue;
import org.jdbi.v3.sqlobject.config.RegisterRowMapper;
import org.jdbi.v3.sqlobject.customizer.Bind;
import org.jdbi.v3.sqlobject.statement.SqlQuery;
import org.openmetadata.service.jdbi3.locator.ConnectionAwareSqlQuery;
import org.openmetadata.service.jdbi3.locator.ConnectionAwareSqlUpdate;
@@ -53,26 +60,29 @@ public interface MigrationDAO {
@ConnectionAwareSqlUpdate(
value =
"INSERT INTO SERVER_CHANGE_LOG (version, migrationFileName, checksum, installed_on)"
+ "VALUES (:version, :migrationFileName, :checksum, CURRENT_TIMESTAMP) "
"INSERT INTO SERVER_CHANGE_LOG (version, migrationFileName, checksum, metrics, installed_on)"
+ "VALUES (:version, :migrationFileName, :checksum, :metrics, CURRENT_TIMESTAMP) "
+ "ON DUPLICATE KEY UPDATE "
+ "migrationFileName = :migrationFileName, "
+ "checksum = :checksum, "
+ "metrics = :metrics,"
+ "installed_on = CURRENT_TIMESTAMP",
connectionType = MYSQL)
@ConnectionAwareSqlUpdate(
value =
"INSERT INTO server_change_log (version, migrationFileName, checksum, installed_on)"
+ "VALUES (:version, :migrationFileName, :checksum, current_timestamp) "
"INSERT INTO server_change_log (version, migrationFileName, checksum, metrics, installed_on)"
+ "VALUES (:version, :migrationFileName, :checksum, to_jsonb(:metrics::text), current_timestamp) "
+ "ON CONFLICT (version) DO UPDATE SET "
+ "migrationFileName = EXCLUDED.migrationFileName, "
+ "metrics = to_jsonb(:metrics::text),"
+ "checksum = EXCLUDED.checksum, "
+ "installed_on = EXCLUDED.installed_on",
connectionType = POSTGRES)
void upsertServerMigration(
@Bind("version") String version,
@Bind("migrationFileName") String migrationFileName,
@Bind("checksum") String checksum);
@Bind("checksum") String checksum,
@Bind("metrics") String metrics);
@ConnectionAwareSqlUpdate(
value =
@@ -113,6 +123,11 @@
connectionType = POSTGRES)
String checkIfQueryPreviouslyRan(@Bind("checksum") String checksum);
@SqlQuery(
"SELECT installed_rank, version, migrationFileName, checksum, installed_on, metrics FROM SERVER_CHANGE_LOG ORDER BY version ASC")
@RegisterRowMapper(FromServerChangeLogMapper.class)
List<ServerChangeLog> listMetricsFromDBMigrations();
@Getter
@Setter
class ServerMigrationSQLTable {
@@ -120,4 +135,30 @@ public interface MigrationDAO {
private String sqlStatement;
private String checkSum;
}
@Getter
@Setter
@Builder
class ServerChangeLog {
private Integer installedRank;
private String version;
private String migrationFileName;
private String checksum;
private String installedOn;
private String metrics;
}
class FromServerChangeLogMapper implements RowMapper<ServerChangeLog> {
@Override
public ServerChangeLog map(ResultSet rs, StatementContext ctx) throws SQLException {
return ServerChangeLog.builder()
.installedRank(rs.getInt("installed_rank"))
.version(rs.getString("version"))
.migrationFileName(rs.getString("migrationFileName"))
.checksum(rs.getString("checksum"))
.installedOn(rs.getString("installed_on"))
.metrics(rs.getString("metrics"))
.build();
}
}
}


@@ -7,13 +7,14 @@ import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import lombok.extern.slf4j.Slf4j;
import org.jdbi.v3.core.Handle;
import org.jdbi.v3.core.Jdbi;
import org.json.JSONObject;
import org.openmetadata.service.jdbi3.MigrationDAO;
import org.openmetadata.service.jdbi3.locator.ConnectionType;
import org.openmetadata.service.migration.context.MigrationContext;
import org.openmetadata.service.migration.context.MigrationWorkflowContext;
import org.openmetadata.service.migration.utils.MigrationFile;
@@ -29,6 +30,8 @@ public class MigrationWorkflow {
private final boolean forceMigrations;
private Optional<String> currentMaxMigrationVersion;
public MigrationWorkflow(
Jdbi jdbi,
String nativeSQLScriptRootPath,
@@ -86,26 +89,26 @@
return Stream.concat(
availableOMNativeMigrations.stream(), availableExtensionMigrations.stream())
.sorted()
.collect(Collectors.toList());
.toList();
}
public List<MigrationFile> getMigrationFilesFromPath(String path, ConnectionType connectionType) {
return Arrays.stream(Objects.requireNonNull(new File(path).listFiles(File::isDirectory)))
.map(dir -> new MigrationFile(dir, migrationDAO, connectionType))
.sorted()
.collect(Collectors.toList());
.toList();
}
private List<MigrationProcess> filterAndGetMigrationsToRun(
List<MigrationFile> availableMigrations) {
LOG.debug("Filtering Server Migrations");
Optional<String> previousMaxMigration = migrationDAO.getMaxServerMigrationVersion();
currentMaxMigrationVersion = migrationDAO.getMaxServerMigrationVersion();
List<MigrationFile> applyMigrations;
if (previousMaxMigration.isPresent() && !forceMigrations) {
if (currentMaxMigrationVersion.isPresent() && !forceMigrations) {
applyMigrations =
availableMigrations.stream()
.filter(migration -> migration.biggerThan(previousMaxMigration.get()))
.collect(Collectors.toList());
.filter(migration -> migration.biggerThan(currentMaxMigrationVersion.get()))
.toList();
} else {
applyMigrations = availableMigrations;
}
@@ -125,14 +128,16 @@
return processes;
}
@SuppressWarnings("unused")
private void initializeMigrationWorkflow() {}
public void runMigrationWorkflows() {
try (Handle transactionHandler = jdbi.open()) {
LOG.info("[MigrationWorkflow] WorkFlow Started");
MigrationWorkflowContext context = new MigrationWorkflowContext(transactionHandler);
context.computeInitialContext();
if (currentMaxMigrationVersion.isPresent()) {
LOG.debug("Current Max version {}", currentMaxMigrationVersion.get());
context.computeInitialContext(currentMaxMigrationVersion.get());
} else {
context.computeInitialContext("1.1.0");
}
try {
for (MigrationProcess process : migrations) {
// Initialise Migration Steps
@@ -176,7 +181,7 @@
process.getVersion(),
process.getDatabaseConnectionType(),
process.getMigrationsPath());
updateMigrationStepInDB(process);
updateMigrationStepInDB(process, context);
}
} catch (Exception e) {
@@ -190,17 +195,14 @@
LOG.info("[MigrationWorkflow] WorkFlow Completed");
}
public void closeMigrationWorkflow() {
// 1. Write to DB table the version we upgraded to
// should be the current server version
// 2. Commit Transaction on completion
}
public void updateMigrationStepInDB(MigrationProcess step) {
public void updateMigrationStepInDB(
MigrationProcess step, MigrationWorkflowContext workflowContext) {
MigrationContext context = workflowContext.getMigrationContext().get(step.getVersion());
JSONObject metrics = new JSONObject(context.getResults());
migrationDAO.upsertServerMigration(
step.getVersion(), step.getMigrationsPath(), UUID.randomUUID().toString());
step.getVersion(),
step.getMigrationsPath(),
UUID.randomUUID().toString(),
metrics.toString());
}
public void migrateSearchIndexes() {}
}


@@ -2,7 +2,6 @@ package org.openmetadata.service.migration.context;
import java.util.HashMap;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
@@ -20,8 +19,7 @@ public class MigrationContext {
public MigrationContext(String version, List<MigrationOps> migrationOps, Handle handle) {
this.version = version;
this.migrationOps =
Stream.concat(migrationOps.stream(), CommonMigrationOps.getCommonOps().stream())
.collect(Collectors.toList());
Stream.concat(migrationOps.stream(), CommonMigrationOps.getCommonOps().stream()).toList();
this.handle = handle;
}


@@ -10,19 +10,15 @@ import org.openmetadata.service.migration.api.MigrationProcess;
@Slf4j
public class MigrationWorkflowContext {
@Getter private final HashMap<String, MigrationContext> migrationContext;
private final MigrationContext initialContext;
private final Handle handle;
public MigrationWorkflowContext(Handle handle) {
this.migrationContext = new HashMap<>();
this.handle = handle;
// Initialize the context only with the common ops
this.initialContext = new MigrationContext("initial", List.of(), handle);
}
public void computeInitialContext() {
computeMigrationSafely(this.initialContext);
public void computeInitialContext(String currentMaxMigrationVersion) {
computeMigrationSafely(new MigrationContext(currentMaxMigrationVersion, List.of(), handle));
}
public void computeMigrationContext(MigrationProcess process) {


@@ -861,6 +861,8 @@ public class AppResource extends EntityResource<App, AppRepository> {
pipelineServiceClient.deployPipeline(ingestionPipeline, service);
if (status.getCode() == 200) {
ingestionPipelineRepository.createOrUpdate(uriInfo, ingestionPipeline);
} else {
ingestionPipeline.setDeployed(false);
}
return Response.status(status.getCode()).entity(status).build();
}


@@ -885,8 +885,6 @@ public class IngestionPipelineResource
UUID id, UriInfo uriInfo, SecurityContext securityContext) {
Fields fields = getFields(FIELD_OWNER);
IngestionPipeline ingestionPipeline = repository.get(uriInfo, id, fields);
ingestionPipeline.setOpenMetadataServerConnection(
new OpenMetadataConnectionBuilder(openMetadataApplicationConfig).build());
decryptOrNullify(securityContext, ingestionPipeline, true);
ServiceEntityInterface service =
Entity.getEntity(ingestionPipeline.getService(), "", Include.NON_DELETED);


@@ -0,0 +1,126 @@
package org.openmetadata.service.util;
import java.util.ArrayList;
import java.util.List;
public class AsciiTable {
private static final String DEFAULT_COLUMN_NAME = "(No column name)";
private static final String DEFAULT_NO_VALUE = "-";
private final List<String> columns;
private final List<List<String>> rows;
private final boolean printHeader;
private final String nullText;
private final String emptyText;
public AsciiTable(
List<String> columns,
List<List<String>> rows,
boolean printHeader,
String nullText,
String emptyText) {
this.columns = ensureValidColumns(columns);
this.rows = rows;
this.printHeader = printHeader;
this.nullText = nullText;
this.emptyText = emptyText;
}
private static List<String> ensureValidColumns(List<String> columns) {
List<String> validColumns = new ArrayList<>();
for (String column : columns) {
validColumns.add(column != null ? column : DEFAULT_COLUMN_NAME);
}
return validColumns;
}
/**
* @return The table rendered with column header and row data.
*/
public String render() {
List<Integer> widths = new ArrayList<>();
for (String column : columns) {
widths.add(column.length());
}
for (List<String> row : rows) {
for (int i = 0; i < row.size(); i++) {
widths.set(i, Math.max(widths.get(i), getValue(row, i).length()));
}
}
StringBuilder ruler = new StringBuilder("+");
for (Integer width : widths) {
ruler.append("-").append(trimOrPad("", width, '-')).append("-+");
}
ruler.append("\n");
StringBuilder result = new StringBuilder();
if (printHeader) {
StringBuilder header = new StringBuilder("|");
for (int i = 0; i < widths.size(); i++) {
header.append(" ").append(trimOrPad(columns.get(i), widths.get(i), ' ')).append(" |");
}
header.append("\n");
result.append(ruler);
result.append(header);
}
result.append(ruler);
if (rows.isEmpty()) {
result
.append("| ")
.append(trimOrPad(emptyText, ruler.length() - Math.min(ruler.length(), 5)))
.append(" |\n");
} else {
for (List<String> row : rows) {
StringBuilder r = new StringBuilder("|");
for (int i = 0; i < widths.size(); i++) {
r.append(" ").append(trimOrPad(getValue(row, i), widths.get(i), ' ')).append(" |");
}
r.append("\n");
result.append(r);
}
}
result.append(ruler);
return result.toString();
}
private String getValue(List<String> row, int i) {
try {
String value = row.get(i);
if (value == null) {
value = nullText;
}
return value;
} catch (IndexOutOfBoundsException e) {
return DEFAULT_NO_VALUE;
}
}
private String trimOrPad(String str, int length, char padChar) {
StringBuilder result;
if (str == null) {
result = new StringBuilder();
} else {
result = new StringBuilder(str);
}
if (result.length() > length) {
return result.substring(0, length);
}
while (result.length() < length) {
result.append(padChar);
}
return result.toString();
}
private String trimOrPad(String str, int length) {
return trimOrPad(str, length, ' ');
}
}


@@ -0,0 +1,467 @@
package org.openmetadata.service.util;
import static org.flywaydb.core.internal.info.MigrationInfoDumper.dumpToAsciiTable;
import static org.openmetadata.service.Entity.FIELD_OWNER;
import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.Logger;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import io.dropwizard.configuration.EnvironmentVariableSubstitutor;
import io.dropwizard.configuration.FileConfigurationSourceProvider;
import io.dropwizard.configuration.SubstitutingSourceProvider;
import io.dropwizard.configuration.YamlConfigurationFactory;
import io.dropwizard.db.DataSourceFactory;
import io.dropwizard.jackson.Jackson;
import io.dropwizard.jersey.validation.Validators;
import java.io.File;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Scanner;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
import javax.validation.Validator;
import lombok.extern.slf4j.Slf4j;
import org.flywaydb.core.Flyway;
import org.flywaydb.core.api.MigrationVersion;
import org.jdbi.v3.core.Jdbi;
import org.jdbi.v3.sqlobject.SqlObjectPlugin;
import org.jdbi.v3.sqlobject.SqlObjects;
import org.openmetadata.schema.ServiceEntityInterface;
import org.openmetadata.schema.entity.app.App;
import org.openmetadata.schema.entity.app.AppSchedule;
import org.openmetadata.schema.entity.app.ScheduledExecutionContext;
import org.openmetadata.schema.entity.services.ingestionPipelines.IngestionPipeline;
import org.openmetadata.schema.services.connections.metadata.OpenMetadataConnection;
import org.openmetadata.schema.system.EventPublisherJob;
import org.openmetadata.schema.type.Include;
import org.openmetadata.sdk.PipelineServiceClient;
import org.openmetadata.service.Entity;
import org.openmetadata.service.OpenMetadataApplicationConfig;
import org.openmetadata.service.apps.scheduler.AppScheduler;
import org.openmetadata.service.clients.pipeline.PipelineServiceClientFactory;
import org.openmetadata.service.fernet.Fernet;
import org.openmetadata.service.jdbi3.CollectionDAO;
import org.openmetadata.service.jdbi3.IngestionPipelineRepository;
import org.openmetadata.service.jdbi3.ListFilter;
import org.openmetadata.service.jdbi3.MigrationDAO;
import org.openmetadata.service.jdbi3.locator.ConnectionAwareAnnotationSqlLocator;
import org.openmetadata.service.jdbi3.locator.ConnectionType;
import org.openmetadata.service.migration.api.MigrationWorkflow;
import org.openmetadata.service.resources.databases.DatasourceConfig;
import org.openmetadata.service.search.SearchIndexFactory;
import org.openmetadata.service.search.SearchRepository;
import org.openmetadata.service.secrets.SecretsManager;
import org.openmetadata.service.secrets.SecretsManagerFactory;
import org.openmetadata.service.util.jdbi.DatabaseAuthenticationProviderFactory;
import org.slf4j.LoggerFactory;
import picocli.CommandLine;
import picocli.CommandLine.Command;
import picocli.CommandLine.Option;
@Slf4j
@Command(
name = "OpenMetadataSetup",
mixinStandardHelpOptions = true,
version = "OpenMetadataSetup 1.3",
description =
"Creates or Migrates Database/Search Indexes. ReIndex the existing data into Elastic Search "
+ "or OpenSearch. Re-Deploys the service pipelines.")
public class OpenMetadataOperations implements Callable<Integer> {
private OpenMetadataApplicationConfig config;
private Flyway flyway;
private Jdbi jdbi;
private SearchRepository searchRepository;
private String nativeSQLScriptRootPath;
private String extensionSQLScriptRootPath;
private SecretsManager secretsManager;
private CollectionDAO collectionDAO;
@Option(
names = {"-d", "--debug"},
defaultValue = "false")
private boolean debug;
@Option(
names = {"-c", "--config"},
required = true)
private String configFilePath;
@Override
public Integer call() {
LOG.info(
"Subcommand needed: 'info', 'validate', 'repair', 'check-connection', "
+ "'drop-create', 'migrate', 'reindex', 'deploy-pipelines'");
return 0;
}
@Command(
name = "info",
description =
"Shows the list of migrations applied and the pending migration "
+ "waiting to be applied on the target database")
public Integer info() {
try {
parseConfig();
LOG.info(dumpToAsciiTable(flyway.info().all()));
return 0;
} catch (Exception e) {
LOG.error("Failed due to ", e);
return 1;
}
}
@Command(
name = "validate",
description =
"Checks if the all the migrations haven been applied " + "on the target database.")
public Integer validate() {
try {
parseConfig();
flyway.validate();
return 0;
} catch (Exception e) {
LOG.error("Database migration validation failed due to ", e);
return 1;
}
}
@Command(
name = "repair",
description =
"Repairs the DATABASE_CHANGE_LOG table which is used to track"
+ "all the migrations on the target database This involves removing entries for the failed migrations and update"
+ "the checksum of migrations already applied on the target database")
public Integer repair() {
try {
parseConfig();
flyway.repair();
return 0;
} catch (Exception e) {
LOG.error("Repair of CHANGE_LOG failed due to ", e);
return 1;
}
}
@Command(
name = "check-connection",
description =
"Checks if a connection can be successfully " + "obtained for the target database")
public Integer checkConnection() {
try {
parseConfig();
flyway.getConfiguration().getDataSource().getConnection();
return 0;
} catch (Exception e) {
LOG.error("Failed to check connection due to ", e);
return 1;
}
}
@Command(
name = "drop-create",
description =
"Deletes any tables in configured database and creates a new tables "
+ "based on current version of OpenMetadata. This command also re-creates the search indexes.")
public Integer dropCreate() {
try {
promptUserForDelete();
parseConfig();
LOG.info("Deleting all the OpenMetadata tables.");
flyway.clean();
LOG.info("Creating the OpenMetadata Schema.");
flyway.migrate();
validateAndRunSystemDataMigrations(true);
LOG.info("OpenMetadata Database Schema is Updated.");
return 0;
} catch (Exception e) {
LOG.error("Failed to drop create due to ", e);
return 1;
}
}
@Command(
name = "migrate",
description = "Migrates the OpenMetadata database schema and search index mappings.")
public Integer migrate(
@Option(
names = {"--force"},
description = "Forces migrations to be run again, even if they have ran previously",
defaultValue = "false")
boolean force) {
try {
LOG.info("Migrating the OpenMetadata Schema.");
parseConfig();
flyway.migrate();
validateAndRunSystemDataMigrations(force);
printChangeLog();
return 0;
} catch (Exception e) {
LOG.error("Failed to db migration due to ", e);
return 1;
}
}
@Command(name = "changelog", description = "Prints the change log of database migration.")
public Integer changelog() {
try {
parseConfig();
printChangeLog();
return 0;
} catch (Exception e) {
LOG.error("Failed to fetch db change log due to ", e);
return 1;
}
}
@Command(name = "reindex", description = "Re Indexes data into search engine from command line.")
public Integer reIndex(
@Option(
names = {"-b", "--batch-size"},
defaultValue = "100")
int batchSize,
@Option(
names = {"--recreate-indexes"},
defaultValue = "true")
boolean recreateIndexes) {
try {
parseConfig();
AppScheduler.initialize(collectionDAO, searchRepository);
App searchIndexApp =
new App()
.withId(UUID.randomUUID())
.withName("SearchIndexApp")
.withClassName("org.openmetadata.service.apps.bundles.searchIndex.SearchIndexApp")
.withAppSchedule(
new AppSchedule().withScheduleType(AppSchedule.ScheduleTimeline.DAILY))
.withAppConfiguration(
new EventPublisherJob()
.withEntities(new HashSet<>(List.of("all")))
.withRecreateIndex(recreateIndexes)
.withBatchSize(batchSize)
.withSearchIndexMappingLanguage(
config.getElasticSearchConfiguration().getSearchIndexMappingLanguage()))
.withRuntime(new ScheduledExecutionContext().withEnabled(true));
AppScheduler.getInstance().triggerOnDemandApplication(searchIndexApp);
return 0;
} catch (Exception e) {
LOG.error("Failed to reindex due to ", e);
return 1;
}
}
@Command(name = "deploy-pipelines", description = "Deploy all the service pipelines.")
public Integer deployPipelines() {
try {
LOG.info("Deploying Pipelines");
parseConfig();
PipelineServiceClient pipelineServiceClient =
PipelineServiceClientFactory.createPipelineServiceClient(
config.getPipelineServiceClientConfiguration());
IngestionPipelineRepository pipelineRepository =
(IngestionPipelineRepository) Entity.getEntityRepository(Entity.INGESTION_PIPELINE);
List<IngestionPipeline> pipelines =
pipelineRepository.listAll(
new EntityUtil.Fields(Set.of(FIELD_OWNER, "service")),
new ListFilter(Include.NON_DELETED));
LOG.debug(String.format("Pipelines %d", pipelines.size()));
List<String> columns = Arrays.asList("Name", "Type", "Service Name", "Status");
List<List<String>> pipelineStatuses = new ArrayList<>();
for (IngestionPipeline pipeline : pipelines) {
deployPipeline(pipeline, pipelineServiceClient, pipelineStatuses);
}
printToAsciiTable(columns, pipelineStatuses, "No Pipelines Found");
return 0;
} catch (Exception e) {
LOG.error("Failed to deploy pipelines due to ", e);
return 1;
}
}
private void deployPipeline(
IngestionPipeline pipeline,
PipelineServiceClient pipelineServiceClient,
List<List<String>> pipelineStatuses) {
try {
LOG.debug(String.format("deploying pipeline %s", pipeline.getName()));
pipeline.setOpenMetadataServerConnection(new OpenMetadataConnectionBuilder(config).build());
secretsManager.decryptIngestionPipeline(pipeline);
OpenMetadataConnection openMetadataServerConnection =
new OpenMetadataConnectionBuilder(config).build();
pipeline.setOpenMetadataServerConnection(
secretsManager.encryptOpenMetadataConnection(openMetadataServerConnection, false));
ServiceEntityInterface service =
Entity.getEntity(pipeline.getService(), "", Include.NON_DELETED);
pipelineServiceClient.deployPipeline(pipeline, service);
} catch (Exception e) {
LOG.error(
String.format(
"Failed to deploy pipeline %s of type %s for service %s",
pipeline.getName(),
pipeline.getPipelineType().value(),
pipeline.getService().getName()),
e);
pipeline.setDeployed(false);
} finally {
LOG.debug("update the pipeline");
collectionDAO.ingestionPipelineDAO().update(pipeline);
pipelineStatuses.add(
Arrays.asList(
pipeline.getName(),
pipeline.getPipelineType().value(),
pipeline.getService().getName(),
pipeline.getDeployed().toString()));
}
}
private void parseConfig() throws Exception {
if (debug) {
Logger root = (Logger) LoggerFactory.getLogger(org.slf4j.Logger.ROOT_LOGGER_NAME);
root.setLevel(Level.DEBUG);
}
ObjectMapper objectMapper = Jackson.newObjectMapper();
Validator validator = Validators.newValidator();
YamlConfigurationFactory<OpenMetadataApplicationConfig> factory =
new YamlConfigurationFactory<>(
OpenMetadataApplicationConfig.class, validator, objectMapper, "dw");
config =
factory.build(
new SubstitutingSourceProvider(
new FileConfigurationSourceProvider(), new EnvironmentVariableSubstitutor(false)),
configFilePath);
Fernet.getInstance().setFernetKey(config);
DataSourceFactory dataSourceFactory = config.getDataSourceFactory();
if (dataSourceFactory == null) {
throw new IllegalArgumentException("No database in config file");
}
// Check for db auth providers.
DatabaseAuthenticationProviderFactory.get(dataSourceFactory.getUrl())
.ifPresent(
databaseAuthenticationProvider -> {
String token =
databaseAuthenticationProvider.authenticate(
dataSourceFactory.getUrl(),
dataSourceFactory.getUser(),
dataSourceFactory.getPassword());
dataSourceFactory.setPassword(token);
});
String jdbcUrl = dataSourceFactory.getUrl();
String user = dataSourceFactory.getUser();
String password = dataSourceFactory.getPassword();
assert user != null && password != null;
String flywayRootPath = config.getMigrationConfiguration().getFlywayPath();
String location =
"filesystem:"
+ flywayRootPath
+ File.separator
+ config.getDataSourceFactory().getDriverClass();
flyway =
Flyway.configure()
.encoding(StandardCharsets.UTF_8)
.table("DATABASE_CHANGE_LOG")
.sqlMigrationPrefix("v")
.validateOnMigrate(false)
.outOfOrder(false)
.baselineOnMigrate(true)
.baselineVersion(MigrationVersion.fromVersion("000"))
.cleanOnValidationError(false)
.locations(location)
.dataSource(jdbcUrl, user, password)
.cleanDisabled(false)
.load();
nativeSQLScriptRootPath = config.getMigrationConfiguration().getNativePath();
extensionSQLScriptRootPath = config.getMigrationConfiguration().getExtensionPath();
jdbi = Jdbi.create(jdbcUrl, user, password);
jdbi.installPlugin(new SqlObjectPlugin());
jdbi.getConfig(SqlObjects.class)
.setSqlLocator(
new ConnectionAwareAnnotationSqlLocator(
config.getDataSourceFactory().getDriverClass()));
searchRepository =
new SearchRepository(config.getElasticSearchConfiguration(), new SearchIndexFactory());
// Initialize secrets manager
secretsManager =
SecretsManagerFactory.createSecretsManager(
config.getSecretsManagerConfiguration(), config.getClusterName());
collectionDAO = jdbi.onDemand(CollectionDAO.class);
Entity.setCollectionDAO(collectionDAO);
Entity.initializeRepositories(config, jdbi);
}
private void promptUserForDelete() {
LOG.info(
"""
You are about to drop all the data in the database. ALL METADATA WILL BE DELETED.\s
This is not recommended for a Production setup or any deployment where you have collected\s
a lot of information from the users, such as descriptions, tags, etc.
""");
String input = "";
Scanner scanner = new Scanner(System.in);
while (!input.equals("DELETE")) {
LOG.info("Enter QUIT to quit. If you still want to continue, please enter DELETE: ");
input = scanner.next();
if (input.equals("QUIT")) {
LOG.info("Exiting without deleting data");
System.exit(1);
}
}
}
private void validateAndRunSystemDataMigrations(boolean force) {
ConnectionType connType = ConnectionType.from(config.getDataSourceFactory().getDriverClass());
DatasourceConfig.initialize(connType.label);
MigrationWorkflow workflow =
new MigrationWorkflow(
jdbi, nativeSQLScriptRootPath, connType, extensionSQLScriptRootPath, force);
workflow.loadMigrations();
workflow.runMigrationWorkflows();
Entity.cleanup();
}
private void printToAsciiTable(List<String> columns, List<List<String>> rows, String emptyText) {
LOG.info(new AsciiTable(columns, rows, true, "", emptyText).render());
}
private void printChangeLog() {
MigrationDAO migrationDAO = jdbi.onDemand(MigrationDAO.class);
List<MigrationDAO.ServerChangeLog> serverChangeLogs =
migrationDAO.listMetricsFromDBMigrations();
Set<String> columns = new LinkedHashSet<>(Set.of("version", "installedOn"));
List<List<String>> rows = new ArrayList<>();
for (MigrationDAO.ServerChangeLog serverChangeLog : serverChangeLogs) {
List<String> row = new ArrayList<>();
JsonObject metricsJson = new Gson().fromJson(serverChangeLog.getMetrics(), JsonObject.class);
Set<String> keys = metricsJson.keySet();
columns.addAll(keys);
row.add(serverChangeLog.getVersion());
row.add(serverChangeLog.getInstalledOn());
row.addAll(
metricsJson.entrySet().stream()
.map(Map.Entry::getValue)
.map(JsonElement::toString)
.toList());
rows.add(row);
}
printToAsciiTable(columns.stream().toList(), rows, "No Server Change log found");
}
public static void main(String... args) {
int exitCode =
new CommandLine(new org.openmetadata.service.util.OpenMetadataOperations()).execute(args);
System.exit(exitCode);
}
}
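The remaining subcommands follow the same pattern as the wrapper examples above; a few more sketches, assuming the bundled script:

# re-run the system data migrations even if their versions were already recorded
./bootstrap/openmetadata-ops.sh migrate --force
# print the SERVER_CHANGE_LOG versions and metrics as an ASCII table without migrating
./bootstrap/openmetadata-ops.sh changelog
# redeploy every non-deleted ingestion pipeline through the configured pipeline service client
./bootstrap/openmetadata-ops.sh deploy-pipelines
# drop and recreate the schema and search indexes; the command asks you to type DELETE to confirm
./bootstrap/openmetadata-ops.sh drop-create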


@@ -2,15 +2,20 @@
<configuration>
<variable name="ROOT_LOG_LEVEL" value="${BOOTSTRAP_LOG_LEVEL:-INFO}" />
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
<level>${LOG_LEVEL:-INFO}</level>
</filter>
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{5} - %msg%n
</pattern>
<pattern>%msg%n</pattern>
</encoder>
</appender>
<appender name="file" class="ch.qos.logback.core.FileAppender">
<file>./logs/openmetadata-operation.log</file>
<append>true</append>
<immediateFlush>true</immediateFlush>
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
</encoder>
</appender>
<root level="${LOG_LEVEL:-INFO}">
<appender-ref ref="STDOUT" />
<appender-ref ref="file"/>
</root>
</configuration>


@@ -150,6 +150,7 @@
<woodstox.version>5.4.0</woodstox.version>
<slack.version>1.29.2</slack.version>
<spotless.version>2.41.1</spotless.version>
<picocli.version>4.7.5</picocli.version>
</properties>
<dependencyManagement>
<dependencies>
@@ -480,6 +481,11 @@
<artifactId>resilience4j-retry</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>info.picocli</groupId>
<artifactId>picocli</artifactId>
<version>${picocli.version}</version>
</dependency>
<!-- avoid security issue https://security.snyk.io/vuln/SNYK-JAVA-ORGECLIPSEJETTY-1090340 -->
<dependency>