MINOR - Add App Limits (#18072)

* App Limits - Prep extension

* App Limits - Prep extension

* App Limits - Allow to search by name

* App Limits - Allow to search by name

* fix postgres sql

* comments
This commit is contained in:
Pere Miquel Brull 2024-10-03 11:00:29 +02:00 committed by GitHub
parent 5da3f4592b
commit e81efc8d98
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
19 changed files with 685 additions and 70 deletions

View File

@ -42,3 +42,4 @@ SET json = JSON_REMOVE(json, '$.testCaseResult');
UPDATE installed_apps SET json = JSON_SET(json, '$.supportsInterrupt', true) where name = 'SearchIndexingApplication';
UPDATE apps_marketplace SET json = JSON_SET(json, '$.supportsInterrupt', true) where name = 'SearchIndexingApplication';
ALTER TABLE apps_extension_time_series ADD COLUMN appName VARCHAR(256) GENERATED ALWAYS AS (json ->> '$.appName') STORED NOT NULL;

View File

@ -0,0 +1,8 @@
-- Extend app extension for limits
ALTER TABLE apps_extension_time_series ADD COLUMN extension VARCHAR(255);
UPDATE apps_extension_time_series SET extension = 'status' WHERE extension IS NULL;
ALTER TABLE apps_extension_time_series MODIFY COLUMN extension VARCHAR(255) NOT NULL;
CREATE INDEX apps_extension_time_series_extension ON apps_extension_time_series(extension);
-- Clean dangling workflows not removed after test connection
truncate automations_workflow;

View File

@ -57,4 +57,6 @@ SET json = jsonb_set(
'{supportsInterrupt}',
to_jsonb(true)
)
where name = 'SearchIndexingApplication';
where name = 'SearchIndexingApplication';
ALTER TABLE apps_extension_time_series ADD COLUMN appName VARCHAR(256) GENERATED ALWAYS AS (json ->> 'appName') STORED NOT NULL;

View File

@ -0,0 +1,8 @@
-- Extend app extension for limits
ALTER TABLE apps_extension_time_series ADD COLUMN extension VARCHAR(255);
UPDATE apps_extension_time_series SET extension = 'status' WHERE extension IS NULL;
ALTER TABLE apps_extension_time_series ALTER COLUMN extension SET NOT NULL;
CREATE INDEX IF NOT EXISTS apps_extension_time_series_extension ON apps_extension_time_series(extension);
-- Clean dangling workflows not removed after test connection
truncate automations_workflow;

View File

@ -6,6 +6,9 @@ import java.util.HashMap;
import java.util.Map;
import lombok.Getter;
import lombok.Setter;
import org.openmetadata.schema.entity.app.App;
import org.openmetadata.schema.entity.app.AppExtension;
import org.openmetadata.service.util.JsonUtils;
public class AppUtil {
public enum RunType {
@ -94,4 +97,17 @@ public class AppUtil {
private AppRunStatus status;
private String runType;
}
public static AppExtension buildExtension(
Object object, App app, long timestamp, AppExtension.ExtensionType extensionType) {
Map<String, Object> jsonData = JsonUtils.getMap(object);
AppExtension data =
new AppExtension()
.withAppId(app.getId())
.withAppName(app.getName())
.withTimestamp(timestamp)
.withExtension(extensionType);
jsonData.forEach(data::setAdditionalProperty);
return data;
}
}

View File

@ -8,10 +8,12 @@ import java.util.UUID;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.openmetadata.schema.entity.app.App;
import org.openmetadata.schema.entity.app.AppExtension;
import org.openmetadata.schema.entity.app.AppRunRecord;
import org.openmetadata.schema.entity.app.FailureContext;
import org.openmetadata.schema.entity.app.SuccessContext;
import org.openmetadata.service.apps.ApplicationHandler;
import org.openmetadata.service.jdbi3.AppRepository;
import org.openmetadata.service.jdbi3.CollectionDAO;
import org.openmetadata.service.util.JsonUtils;
import org.quartz.JobDataMap;
@ -22,12 +24,14 @@ import org.quartz.JobListener;
@Slf4j
public abstract class AbstractOmAppJobListener implements JobListener {
private final CollectionDAO collectionDAO;
private final AppRepository repository;
private static final String SCHEDULED_APP_RUN_EXTENSION = "AppScheduleRun";
public static final String APP_RUN_STATS = "AppRunStats";
public static final String JOB_LISTENER_NAME = "OM_JOB_LISTENER";
protected AbstractOmAppJobListener(CollectionDAO dao) {
this.collectionDAO = dao;
this.repository = new AppRepository();
}
@Override
@ -48,6 +52,7 @@ public abstract class AbstractOmAppJobListener implements JobListener {
AppRunRecord runRecord =
new AppRunRecord()
.withAppId(jobApp.getId())
.withAppName(jobApp.getName())
.withStartTime(jobStartTime)
.withTimestamp(jobStartTime)
.withRunType(runType)
@ -57,9 +62,8 @@ public abstract class AbstractOmAppJobListener implements JobListener {
boolean update = false;
if (jobExecutionContext.isRecovering()) {
AppRunRecord latestRunRecord =
JsonUtils.readValue(
collectionDAO.appExtensionTimeSeriesDao().getLatestAppRun(jobApp.getId()),
AppRunRecord.class);
repository.getLatestExtensionById(
jobApp, AppRunRecord.class, AppExtension.ExtensionType.STATUS);
if (latestRunRecord != null) {
runRecord = latestRunRecord;
}
@ -147,9 +151,14 @@ public abstract class AbstractOmAppJobListener implements JobListener {
collectionDAO
.appExtensionTimeSeriesDao()
.update(
appId.toString(), JsonUtils.pojoToJson(appRunRecord), appRunRecord.getTimestamp());
appId.toString(),
JsonUtils.pojoToJson(appRunRecord),
appRunRecord.getTimestamp(),
AppExtension.ExtensionType.STATUS.toString());
} else {
collectionDAO.appExtensionTimeSeriesDao().insert(JsonUtils.pojoToJson(appRunRecord));
collectionDAO
.appExtensionTimeSeriesDao()
.insert(JsonUtils.pojoToJson(appRunRecord), AppExtension.ExtensionType.STATUS.toString());
}
}

View File

@ -1,10 +1,11 @@
package org.openmetadata.service.exception;
import javax.ws.rs.core.Response;
import org.openmetadata.schema.entity.app.AppExtension;
import org.openmetadata.sdk.exception.WebServiceException;
public class AppException extends WebServiceException {
public static final String APP_RUN_RECORD_NOT_FOUND = "No Available Application Run Records.";
public static final String APP_EXTENSION_NOT_FOUND = "No Available Application Extension";
private static final String ERROR_TYPE = "APP_ERROR";
public AppException(String message) {
@ -18,4 +19,9 @@ public class AppException extends WebServiceException {
public static AppException byMessage(Response.Status status, String errorMessage) {
return new AppException(status, errorMessage);
}
public static AppException byExtension(AppExtension.ExtensionType extensionType) {
return new AppException(
String.format("%s: %s", APP_EXTENSION_NOT_FOUND, extensionType.toString()));
}
}

View File

@ -1,6 +1,6 @@
package org.openmetadata.service.jdbi3;
import static org.openmetadata.service.exception.AppException.APP_RUN_RECORD_NOT_FOUND;
import static org.openmetadata.common.utils.CommonUtil.nullOrEmpty;
import static org.openmetadata.service.util.UserUtil.getUser;
import java.util.ArrayList;
@ -12,6 +12,7 @@ import org.openmetadata.schema.auth.JWTAuthMechanism;
import org.openmetadata.schema.auth.JWTTokenExpiry;
import org.openmetadata.schema.entity.Bot;
import org.openmetadata.schema.entity.app.App;
import org.openmetadata.schema.entity.app.AppExtension;
import org.openmetadata.schema.entity.app.AppRunRecord;
import org.openmetadata.schema.entity.teams.AuthenticationMechanism;
import org.openmetadata.schema.entity.teams.User;
@ -166,6 +167,16 @@ public class AppRepository extends EntityRepository<App> {
}
}
@Override
protected void postDelete(App entity) {
// Delete the status stored in the app extension
// Note that we don't want to delete the LIMITS, since we want to keep them
// between different app installations
daoCollection
.appExtensionTimeSeriesDao()
.delete(entity.getId().toString(), AppExtension.ExtensionType.STATUS.toString());
}
public final List<App> listAll() {
// forward scrolling, if after == null then first page is being asked
List<String> jsons = dao.listAfterWithOffset(Integer.MAX_VALUE, 0);
@ -177,18 +188,39 @@ public class AppRepository extends EntityRepository<App> {
return entities;
}
public ResultList<AppRunRecord> listAppRuns(UUID appId, int limitParam, int offset) {
int total = daoCollection.appExtensionTimeSeriesDao().listAppRunRecordCount(appId.toString());
List<AppRunRecord> entities = new ArrayList<>();
public ResultList<AppRunRecord> listAppRuns(App app, int limitParam, int offset) {
return listAppExtensionById(
app, limitParam, offset, AppRunRecord.class, AppExtension.ExtensionType.STATUS);
}
public AppRunRecord getLatestAppRuns(App app) {
return getLatestExtensionById(app, AppRunRecord.class, AppExtension.ExtensionType.STATUS);
}
public AppRunRecord getLatestAppRunsAfterStartTime(App app, long startTime) {
return getLatestExtensionAfterStartTimeById(
app, startTime, AppRunRecord.class, AppExtension.ExtensionType.STATUS);
}
public <T> ResultList<T> listAppExtensionByName(
App app,
int limitParam,
int offset,
Class<T> clazz,
AppExtension.ExtensionType extensionType) {
int total =
daoCollection
.appExtensionTimeSeriesDao()
.listAppExtensionCountByName(app.getName(), extensionType.toString());
List<T> entities = new ArrayList<>();
if (limitParam > 0) {
// forward scrolling, if after == null then first page is being asked
List<String> jsons =
daoCollection
.appExtensionTimeSeriesDao()
.listAppRunRecord(appId.toString(), limitParam, offset);
.listAppExtensionByName(app.getName(), limitParam, offset, extensionType.toString());
for (String json : jsons) {
AppRunRecord entity = JsonUtils.readValue(json, AppRunRecord.class);
T entity = JsonUtils.readValue(json, clazz);
entities.add(entity);
}
@ -199,6 +231,148 @@ public class AppRepository extends EntityRepository<App> {
}
}
public <T> ResultList<T> listAppExtensionById(
App app,
int limitParam,
int offset,
Class<T> clazz,
AppExtension.ExtensionType extensionType) {
int total =
daoCollection
.appExtensionTimeSeriesDao()
.listAppExtensionCount(app.getId().toString(), extensionType.toString());
List<T> entities = new ArrayList<>();
if (limitParam > 0) {
// forward scrolling, if after == null then first page is being asked
List<String> jsons =
daoCollection
.appExtensionTimeSeriesDao()
.listAppExtension(
app.getId().toString(), limitParam, offset, extensionType.toString());
for (String json : jsons) {
T entity = JsonUtils.readValue(json, clazz);
entities.add(entity);
}
return new ResultList<>(entities, offset, total);
} else {
// limit == 0 , return total count of entity.
return new ResultList<>(entities, null, total);
}
}
public <T> ResultList<T> listAppExtensionAfterTimeByName(
App app,
long startTime,
int limitParam,
int offset,
Class<T> clazz,
AppExtension.ExtensionType extensionType) {
int total =
daoCollection
.appExtensionTimeSeriesDao()
.listAppExtensionCountAfterTimeByName(
app.getName(), startTime, extensionType.toString());
List<T> entities = new ArrayList<>();
if (limitParam > 0) {
// forward scrolling, if after == null then first page is being asked
List<String> jsons =
daoCollection
.appExtensionTimeSeriesDao()
.listAppExtensionAfterTimeByName(
app.getName(), limitParam, offset, startTime, extensionType.toString());
for (String json : jsons) {
T entity = JsonUtils.readValue(json, clazz);
entities.add(entity);
}
return new ResultList<>(entities, offset, total);
} else {
// limit == 0 , return total count of entity.
return new ResultList<>(entities, null, total);
}
}
public <T> ResultList<T> listAppExtensionAfterTimeById(
App app,
long startTime,
int limitParam,
int offset,
Class<T> clazz,
AppExtension.ExtensionType extensionType) {
int total =
daoCollection
.appExtensionTimeSeriesDao()
.listAppExtensionCountAfterTime(
app.getId().toString(), startTime, extensionType.toString());
List<T> entities = new ArrayList<>();
if (limitParam > 0) {
// forward scrolling, if after == null then first page is being asked
List<String> jsons =
daoCollection
.appExtensionTimeSeriesDao()
.listAppExtensionAfterTime(
app.getId().toString(), limitParam, offset, startTime, extensionType.toString());
for (String json : jsons) {
T entity = JsonUtils.readValue(json, clazz);
entities.add(entity);
}
return new ResultList<>(entities, offset, total);
} else {
// limit == 0 , return total count of entity.
return new ResultList<>(entities, null, total);
}
}
public <T> T getLatestExtensionByName(
App app, Class<T> clazz, AppExtension.ExtensionType extensionType) {
List<String> result =
daoCollection
.appExtensionTimeSeriesDao()
.listAppExtensionByName(app.getName(), 1, 0, extensionType.toString());
if (nullOrEmpty(result)) {
throw AppException.byExtension(extensionType);
}
return JsonUtils.readValue(result.get(0), clazz);
}
public <T> T getLatestExtensionById(
App app, Class<T> clazz, AppExtension.ExtensionType extensionType) {
List<String> result =
daoCollection
.appExtensionTimeSeriesDao()
.listAppExtension(app.getId().toString(), 1, 0, extensionType.toString());
if (nullOrEmpty(result)) {
throw AppException.byExtension(extensionType);
}
return JsonUtils.readValue(result.get(0), clazz);
}
public <T> T getLatestExtensionAfterStartTimeByName(
App app, long startTime, Class<T> clazz, AppExtension.ExtensionType extensionType) {
List<String> result =
daoCollection
.appExtensionTimeSeriesDao()
.listAppExtensionAfterTimeByName(
app.getName(), 1, 0, startTime, extensionType.toString());
if (nullOrEmpty(result)) {
throw AppException.byExtension(extensionType);
}
return JsonUtils.readValue(result.get(0), clazz);
}
public <T> T getLatestExtensionAfterStartTimeById(
App app, long startTime, Class<T> clazz, AppExtension.ExtensionType extensionType) {
List<String> result =
daoCollection
.appExtensionTimeSeriesDao()
.listAppExtensionAfterTime(
app.getId().toString(), 1, 0, startTime, extensionType.toString());
if (nullOrEmpty(result)) {
throw AppException.byExtension(extensionType);
}
return JsonUtils.readValue(result.get(0), clazz);
}
@Override
protected void cleanup(App app) {
// Remove the Pipelines for Application
@ -209,22 +383,6 @@ public class AppRepository extends EntityRepository<App> {
super.cleanup(app);
}
public AppRunRecord getLatestAppRuns(UUID appId) {
String json = daoCollection.appExtensionTimeSeriesDao().getLatestAppRun(appId);
if (json == null) {
throw new AppException(APP_RUN_RECORD_NOT_FOUND);
}
return JsonUtils.readValue(json, AppRunRecord.class);
}
public AppRunRecord getLatestAppRunsAfterStartTime(UUID appId, long startTime) {
String json = daoCollection.appExtensionTimeSeriesDao().getLatestAppRun(appId, startTime);
if (json == null) {
throw new AppException(APP_RUN_RECORD_NOT_FOUND);
}
return JsonUtils.readValue(json, AppRunRecord.class);
}
@Override
public EntityUpdater getUpdater(App original, App updated, Operation operation) {
return new AppRepository.AppUpdater(original, updated, operation);

View File

@ -4245,65 +4245,102 @@ public interface CollectionDAO {
interface AppExtensionTimeSeries {
@ConnectionAwareSqlUpdate(
value = "INSERT INTO apps_extension_time_series(json) VALUES (:json)",
value =
"INSERT INTO apps_extension_time_series(json, extension) VALUES (:json, :extension)",
connectionType = MYSQL)
@ConnectionAwareSqlUpdate(
value = "INSERT INTO apps_extension_time_series(json) VALUES ((:json :: jsonb))",
value =
"INSERT INTO apps_extension_time_series(json, extension) VALUES (:json :: jsonb, :extension)",
connectionType = POSTGRES)
void insert(@Bind("json") String json);
void insert(@Bind("json") String json, @Bind("extension") String extension);
@ConnectionAwareSqlUpdate(
value =
"UPDATE apps_extension_time_series SET json = JSON_SET(json, '$.status', 'stopped') where appId=:appId AND JSON_UNQUOTE(JSON_EXTRACT(json_column_name, '$.status')) = 'running'",
"UPDATE apps_extension_time_series SET json = JSON_SET(json, '$.status', 'stopped') where appId=:appId AND JSON_UNQUOTE(JSON_EXTRACT(json_column_name, '$.status')) = 'running' AND extension = 'status'",
connectionType = MYSQL)
@ConnectionAwareSqlUpdate(
value =
"UPDATE apps_extension_time_series SET json = jsonb_set(json, '{status}', '\"stopped\"') WHERE appId = :appId AND json->>'status' = 'running'",
"UPDATE apps_extension_time_series SET json = jsonb_set(json, '{status}', '\"stopped\"') WHERE appId = :appId AND json->>'status' = 'running' AND extension = 'status'",
connectionType = POSTGRES)
void markStaleEntriesStopped(@Bind("appId") String appId);
@ConnectionAwareSqlUpdate(
value =
"UPDATE apps_extension_time_series set json = :json where appId=:appId and timestamp=:timestamp",
"UPDATE apps_extension_time_series set json = :json where appId=:appId and timestamp=:timestamp and extension=:extension",
connectionType = MYSQL)
@ConnectionAwareSqlUpdate(
value =
"UPDATE apps_extension_time_series set json = (:json :: jsonb) where appId=:appId and timestamp=:timestamp",
"UPDATE apps_extension_time_series set json = (:json :: jsonb) where appId=:appId and timestamp=:timestamp and extension=:extension",
connectionType = POSTGRES)
void update(
@Bind("appId") String appId, @Bind("json") String json, @Bind("timestamp") Long timestamp);
@Bind("appId") String appId,
@Bind("json") String json,
@Bind("timestamp") Long timestamp,
@Bind("extension") String extension);
@SqlQuery("SELECT count(*) FROM apps_extension_time_series where appId = :appId")
int listAppRunRecordCount(@Bind("appId") String appId);
@SqlUpdate(
"DELETE FROM apps_extension_time_series WHERE appId = :appId AND extension = :extension")
void delete(@Bind("appId") String appId, @Bind("extension") String extension);
@SqlQuery(
"SELECT json FROM apps_extension_time_series where appId = :appId ORDER BY timestamp DESC LIMIT :limit OFFSET :offset")
List<String> listAppRunRecord(
@Bind("appId") String appId, @Bind("limit") int limit, @Bind("offset") int offset);
"SELECT count(*) FROM apps_extension_time_series where appId = :appId and extension = :extension")
int listAppExtensionCount(@Bind("appId") String appId, @Bind("extension") String extension);
@SqlQuery(
"SELECT json FROM apps_extension_time_series where appId = :appId AND timestamp > :startTime ORDER BY timestamp DESC LIMIT :limit OFFSET :offset")
List<String> listAppRunRecordAfterTime(
"SELECT count(*) FROM apps_extension_time_series where appId = :appId and extension = :extension AND timestamp > :startTime")
int listAppExtensionCountAfterTime(
@Bind("appId") String appId,
@Bind("startTime") long startTime,
@Bind("extension") String extension);
@SqlQuery(
"SELECT json FROM apps_extension_time_series where appId = :appId AND extension = :extension ORDER BY timestamp DESC LIMIT :limit OFFSET :offset")
List<String> listAppExtension(
@Bind("appId") String appId,
@Bind("limit") int limit,
@Bind("offset") int offset,
@Bind("startTime") long startTime);
@Bind("extension") String extension);
default String getLatestAppRun(UUID appId) {
List<String> result = listAppRunRecord(appId.toString(), 1, 0);
if (!nullOrEmpty(result)) {
return result.get(0);
}
return null;
}
@SqlQuery(
"SELECT json FROM apps_extension_time_series where appId = :appId AND extension = :extension AND timestamp > :startTime ORDER BY timestamp DESC LIMIT :limit OFFSET :offset")
List<String> listAppExtensionAfterTime(
@Bind("appId") String appId,
@Bind("limit") int limit,
@Bind("offset") int offset,
@Bind("startTime") long startTime,
@Bind("extension") String extension);
default String getLatestAppRun(UUID appId, long startTime) {
List<String> result = listAppRunRecordAfterTime(appId.toString(), 1, 0, startTime);
if (!nullOrEmpty(result)) {
return result.get(0);
}
return null;
}
// Prepare methods to get extension by name instead of ID
// For example, for limits we need to fetch by app name to ensure if we reinstall the app,
// they'll still be taken into account
@SqlQuery(
"SELECT count(*) FROM apps_extension_time_series where appName = :appName and extension = :extension")
int listAppExtensionCountByName(
@Bind("appName") String appName, @Bind("extension") String extension);
@SqlQuery(
"SELECT count(*) FROM apps_extension_time_series where appName = :appName and extension = :extension AND timestamp > :startTime")
int listAppExtensionCountAfterTimeByName(
@Bind("appName") String appName,
@Bind("startTime") long startTime,
@Bind("extension") String extension);
@SqlQuery(
"SELECT json FROM apps_extension_time_series where appName = :appName AND extension = :extension ORDER BY timestamp DESC LIMIT :limit OFFSET :offset")
List<String> listAppExtensionByName(
@Bind("appName") String appName,
@Bind("limit") int limit,
@Bind("offset") int offset,
@Bind("extension") String extension);
@SqlQuery(
"SELECT json FROM apps_extension_time_series where appName = :appName AND extension = :extension AND timestamp > :startTime ORDER BY timestamp DESC LIMIT :limit OFFSET :offset")
List<String> listAppExtensionAfterTimeByName(
@Bind("appName") String appName,
@Bind("limit") int limit,
@Bind("offset") int offset,
@Bind("startTime") long startTime,
@Bind("extension") String extension);
}
interface ReportDataTimeSeriesDAO extends EntityTimeSeriesDAO {

View File

@ -0,0 +1,81 @@
package org.openmetadata.service.limits;
import javax.ws.rs.core.SecurityContext;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.openmetadata.schema.entity.app.App;
import org.openmetadata.schema.entity.app.AppExtension;
import org.openmetadata.service.exception.LimitsException;
import org.openmetadata.service.jdbi3.AppRepository;
import org.openmetadata.service.jdbi3.CollectionDAO;
import org.openmetadata.service.security.policyevaluator.OperationContext;
import org.openmetadata.service.security.policyevaluator.ResourceContextInterface;
import org.openmetadata.service.util.JsonUtils;
import org.openmetadata.service.util.ResultList;
@Slf4j
public abstract class AppLimits {
private final CollectionDAO.AppExtensionTimeSeries dao;
private final @Getter AppRepository repository;
private @Getter App app;
public AppLimits(CollectionDAO collectionDAO) {
this.dao = collectionDAO.appExtensionTimeSeriesDao();
this.repository = new AppRepository();
}
// This can only happen with runtime loaded apps with the private config
public void init(App app) {
this.app = app;
try {
parseAppLimits();
} catch (Exception e) {
LOG.error("Error parsing limits config file: {}", e.getMessage());
}
}
public AppExtension getLatestLimit() {
return repository.getLatestExtensionByName(
this.app, AppExtension.class, AppExtension.ExtensionType.LIMITS);
}
public AppExtension getLatestLimit(long startTime) {
return repository.getLatestExtensionAfterStartTimeByName(
this.app, startTime, AppExtension.class, AppExtension.ExtensionType.LIMITS);
}
public ResultList<AppExtension> listLimits(int limitParam, int offset) {
return repository.listAppExtensionByName(
this.app, limitParam, offset, AppExtension.class, AppExtension.ExtensionType.LIMITS);
}
public void insertLimit(AppExtension limitsExtension) {
try {
if (limitsExtension.getAppId() != getApp().getId()) {
LOG.error(
"App ID mismatch. You can't manage limits from another app: {} != {}",
limitsExtension.getAppId(),
getApp().getId());
return;
}
// Ensure the passed extension is an updated limit
limitsExtension.setTimestamp(System.currentTimeMillis());
limitsExtension.setExtension(AppExtension.ExtensionType.LIMITS);
this.dao.insert(
JsonUtils.pojoToJson(limitsExtension), AppExtension.ExtensionType.LIMITS.toString());
} catch (Exception e) {
LOG.error("Error inserting app limits for {}: {}", this.getApp().getName(), e.getMessage());
}
}
// Parse the app limits defined in the Private Configuration
// Let each App parse and store however is needed
public abstract void parseAppLimits() throws LimitsException;
// Enforce limits for the app
public abstract void enforceLimits(
SecurityContext securityContext,
ResourceContextInterface resourceContext,
OperationContext operationContext);
}

View File

@ -0,0 +1,20 @@
package org.openmetadata.service.migration.mysql.v160;
import static org.openmetadata.service.migration.utils.v160.MigrationUtil.addAppExtensionName;
import lombok.SneakyThrows;
import org.openmetadata.service.migration.api.MigrationProcessImpl;
import org.openmetadata.service.migration.utils.MigrationFile;
public class Migration extends MigrationProcessImpl {
public Migration(MigrationFile migrationFile) {
super(migrationFile);
}
@Override
@SneakyThrows
public void runDataMigration() {
addAppExtensionName(handle, collectionDAO, authenticationConfiguration, false);
}
}

View File

@ -0,0 +1,20 @@
package org.openmetadata.service.migration.postgres.v160;
import static org.openmetadata.service.migration.utils.v160.MigrationUtil.addAppExtensionName;
import lombok.SneakyThrows;
import org.openmetadata.service.migration.api.MigrationProcessImpl;
import org.openmetadata.service.migration.utils.MigrationFile;
public class Migration extends MigrationProcessImpl {
public Migration(MigrationFile migrationFile) {
super(migrationFile);
}
@Override
@SneakyThrows
public void runDataMigration() {
addAppExtensionName(handle, collectionDAO, authenticationConfiguration, true);
}
}

View File

@ -0,0 +1,83 @@
package org.openmetadata.service.migration.utils.v160;
import java.util.UUID;
import javax.json.JsonObject;
import lombok.extern.slf4j.Slf4j;
import org.jdbi.v3.core.Handle;
import org.jdbi.v3.core.statement.Update;
import org.openmetadata.schema.api.security.AuthenticationConfiguration;
import org.openmetadata.schema.entity.app.App;
import org.openmetadata.schema.entity.app.AppExtension;
import org.openmetadata.schema.type.Include;
import org.openmetadata.service.Entity;
import org.openmetadata.service.exception.EntityNotFoundException;
import org.openmetadata.service.jdbi3.AppRepository;
import org.openmetadata.service.jdbi3.CollectionDAO;
import org.openmetadata.service.util.JsonUtils;
@Slf4j
public class MigrationUtil {
// Just list status to make this ignore the new `limits` extension if it ever runs again
private static final String SELECT_ALL_APP_EXTENSION_TIME_SERIES =
"SELECT appId, json FROM apps_extension_time_series where extension = 'status'";
private static final String UPDATE_MYSQL_APP_EXTENSION =
"UPDATE apps_extension_time_series SET json = JSON_SET(json, '$.appName', :appName) WHERE appId = :appId AND extension = 'status'";
private static final String UPDATE_PG_APP_EXTENSION =
"UPDATE apps_extension_time_series SET json = jsonb_set(json_data, '{appName}', :appName::jsonb) WHERE appId = :appId AND extension = 'status'";
// We'll list the entries in app_extension_time_series, clean those whose appId
// is not installed, and for those that appId matches from installed Apps, we'll
// add the appName to the JSON data.
// Note that we only want to clean up old status data.
public static void addAppExtensionName(
Handle handle,
CollectionDAO daoCollection,
AuthenticationConfiguration config,
boolean postgres) {
LOG.info("Migrating app extension name...");
AppRepository appRepository = (AppRepository) Entity.getEntityRepository(Entity.APPLICATION);
try {
handle
.createQuery(SELECT_ALL_APP_EXTENSION_TIME_SERIES)
.mapToMap()
.forEach(
row -> {
try {
UUID appId = UUID.fromString(row.get("appid").toString());
// Ignore if this has already been migrated
JsonObject json = JsonUtils.readJson((String) row.get("json")).asJsonObject();
if (json.containsKey("appName")) {
return;
}
// Else, update the name
App app = appRepository.find(appId, Include.ALL);
updateAppExtension(handle, app, postgres);
} catch (EntityNotFoundException ex) {
// Clean up the old status data
daoCollection
.appExtensionTimeSeriesDao()
.delete(
row.get("appid").toString(),
AppExtension.ExtensionType.STATUS.toString());
} catch (Exception ex) {
LOG.warn(
String.format("Error migrating app extension [%s] due to [%s]", row, ex));
}
});
} catch (Exception ex) {
LOG.warn("Error running app extension migration ", ex);
}
}
private static void updateAppExtension(Handle handle, App app, boolean postgres) {
Update update;
if (postgres) {
update = handle.createUpdate(UPDATE_PG_APP_EXTENSION);
} else {
update = handle.createUpdate(UPDATE_MYSQL_APP_EXTENSION);
}
update.bind("appId", app.getId().toString()).bind("appName", app.getName()).execute();
}
}

View File

@ -47,6 +47,7 @@ import org.openmetadata.common.utils.CommonUtil;
import org.openmetadata.schema.ServiceEntityInterface;
import org.openmetadata.schema.api.data.RestoreEntity;
import org.openmetadata.schema.entity.app.App;
import org.openmetadata.schema.entity.app.AppExtension;
import org.openmetadata.schema.entity.app.AppMarketPlaceDefinition;
import org.openmetadata.schema.entity.app.AppRunRecord;
import org.openmetadata.schema.entity.app.AppType;
@ -289,7 +290,7 @@ public class AppResource extends EntityResource<App, AppRepository> {
App installation = repository.getByName(uriInfo, name, repository.getFields("id,pipelines"));
if (installation.getAppType().equals(AppType.Internal)) {
return Response.status(Response.Status.OK)
.entity(repository.listAppRuns(installation.getId(), limitParam, offset))
.entity(repository.listAppRuns(installation, limitParam, offset))
.build();
}
if (!installation.getPipelines().isEmpty()) {
@ -308,6 +309,77 @@ public class AppResource extends EntityResource<App, AppRepository> {
throw new IllegalArgumentException("App does not have an associated pipeline.");
}
@GET
@Path("/name/{name}/extension")
@Operation(
operationId = "listAppExtension",
summary = "List App Extension data",
description =
"Get a list of applications Extension data."
+ " Use cursor-based pagination to limit the number "
+ "entries in the list using `offset` query params.",
responses = {
@ApiResponse(
responseCode = "200",
description = "List of Installed Applications Runs",
content =
@Content(
mediaType = "application/json",
schema = @Schema(implementation = AppExtension.class)))
})
public Response listAppExtension(
@Context UriInfo uriInfo,
@Context SecurityContext securityContext,
@Parameter(description = "Name of the App", schema = @Schema(type = "string"))
@PathParam("name")
String name,
@Parameter(description = "Limit records. (1 to 1000000, default = 10)")
@DefaultValue("10")
@QueryParam("limit")
@Min(0)
@Max(1000000)
int limitParam,
@Parameter(description = "Offset records. (0 to 1000000, default = 0)")
@DefaultValue("0")
@QueryParam("offset")
@Min(0)
@Max(1000000)
int offset,
@Parameter(
description = "Filter pipeline status after the given start timestamp",
schema = @Schema(type = "number"))
@QueryParam("startTs")
Long startTs,
@Parameter(description = "Get the extension type", schema = @Schema(type = "string"))
@QueryParam("extensionType")
AppExtension.ExtensionType extensionType,
@Parameter(
description = "List extensions by name instead of id",
schema = @Schema(type = "boolean"))
@QueryParam("byName")
@DefaultValue("false")
boolean byName) {
App installation = repository.getByName(uriInfo, name, repository.getFields("id"));
if (startTs != null) {
ResultList<AppExtension> appExtensionList =
byName
? repository.listAppExtensionAfterTimeByName(
installation, startTs, limitParam, offset, AppExtension.class, extensionType)
: repository.listAppExtensionAfterTimeById(
installation, startTs, limitParam, offset, AppExtension.class, extensionType);
return Response.status(Response.Status.OK).entity(appExtensionList).build();
}
ResultList<AppExtension> appExtensionList =
byName
? repository.listAppExtensionByName(
installation, limitParam, offset, AppExtension.class, extensionType)
: repository.listAppExtensionById(
installation, limitParam, offset, AppExtension.class, extensionType);
return Response.status(Response.Status.OK).entity(appExtensionList).build();
}
@GET
@Path("/name/{name}/logs")
@Operation(
@ -336,7 +408,7 @@ public class AppResource extends EntityResource<App, AppRepository> {
App installation = repository.getByName(uriInfo, name, repository.getFields("id,pipelines"));
if (installation.getAppType().equals(AppType.Internal)) {
return Response.status(Response.Status.OK)
.entity(repository.getLatestAppRuns(installation.getId()))
.entity(repository.getLatestAppRuns(installation))
.build();
} else {
if (!installation.getPipelines().isEmpty()) {
@ -385,7 +457,7 @@ public class AppResource extends EntityResource<App, AppRepository> {
App installation = repository.getByName(uriInfo, name, repository.getFields("id,pipelines"));
if (installation.getAppType().equals(AppType.Internal)) {
return Response.status(Response.Status.OK)
.entity(repository.getLatestAppRuns(installation.getId()))
.entity(repository.getLatestAppRuns(installation))
.build();
} else {
if (!installation.getPipelines().isEmpty()) {

View File

@ -338,8 +338,7 @@ public class OpenMetadataOperations implements Callable<Integer> {
try {
AppRepository appRepository =
(AppRepository) Entity.getEntityRepository(Entity.APPLICATION);
appRunRecord =
appRepository.getLatestAppRunsAfterStartTime(searchIndexApp.getId(), startTime);
appRunRecord = appRepository.getLatestAppRunsAfterStartTime(searchIndexApp, startTime);
if (isRunCompleted(appRunRecord)) {
List<String> columns =
new ArrayList<>(

View File

@ -36,6 +36,7 @@ import org.openmetadata.schema.analytics.type.WebAnalyticEventType;
import org.openmetadata.schema.api.data.CreateTableProfile;
import org.openmetadata.schema.api.services.CreateDatabaseService;
import org.openmetadata.schema.entity.app.App;
import org.openmetadata.schema.entity.app.AppExtension;
import org.openmetadata.schema.entity.app.AppMarketPlaceDefinition;
import org.openmetadata.schema.entity.app.AppRunRecord;
import org.openmetadata.schema.entity.app.AppSchedule;
@ -336,6 +337,7 @@ public class AppsResourceTest extends EntityResourceTest<App, CreateApp> {
String appName = "SearchIndexingApplication";
postTriggerApp(appName, ADMIN_AUTH_HEADERS);
assertAppStatusAvailableAfterTrigger(appName);
assertListExtension(appName, AppExtension.ExtensionType.STATUS);
assertAppRanAfterTriggerWithStatus(appName, AppRunRecord.Status.SUCCESS);
}
@ -352,6 +354,19 @@ public class AppsResourceTest extends EntityResourceTest<App, CreateApp> {
APP_TRIGGER_RETRY);
}
private void assertListExtension(String appName, AppExtension.ExtensionType extensionType) {
assertEventually(
"appIsRunning",
() -> {
try {
assert Objects.nonNull(listAppExtension(appName, extensionType, ADMIN_AUTH_HEADERS));
} catch (HttpResponseException ex) {
throw new AssertionError(ex);
}
},
APP_TRIGGER_RETRY);
}
private void assertAppRanAfterTriggerWithStatus(String appName, AppRunRecord.Status status) {
assertEventually(
"appStatus",
@ -420,4 +435,13 @@ public class AppsResourceTest extends EntityResourceTest<App, CreateApp> {
WebTarget target = getResource(String.format("apps/name/%s/runs/latest", appName));
return TestUtils.get(target, AppRunRecord.class, authHeaders);
}
private AppExtension listAppExtension(
String appName, AppExtension.ExtensionType extensionType, Map<String, String> authHeaders)
throws HttpResponseException {
WebTarget target =
getResource(
String.format("apps/name/%s/extension?extensionType=%s", appName, extensionType));
return TestUtils.get(target, AppExtension.class, authHeaders);
}
}

View File

@ -0,0 +1,37 @@
{
"$id": "https://open-metadata.org/schema/entity/applications/appExtension.json",
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "AppExtension",
"javaType": "org.openmetadata.schema.entity.app.AppExtension",
"description": "App Extension Object.",
"type": "object",
"definitions": {
"extensionType": {
"description": "Extension type.",
"type": "string",
"enum": [
"status",
"limits"
]
}
},
"properties": {
"appId": {
"description": "Unique identifier of this application.",
"$ref": "../../type/basic.json#/definitions/uuid"
},
"appName": {
"description": "Name of the application.",
"$ref": "../../type/basic.json#/definitions/entityName"
},
"timestamp": {
"description": "Start of the job status.",
"$ref": "../../type/basic.json#/definitions/timestamp"
},
"extension": {
"$ref": "#/definitions/extensionType"
}
},
"additionalProperties": true,
"required": ["appId", "appName", "extension"]
}

View File

@ -10,6 +10,19 @@
"description": "Unique identifier of this application for which the job is ran.",
"$ref": "../../type/basic.json#/definitions/uuid"
},
"appName": {
"description": "Name of the application.",
"$ref": "../../type/basic.json#/definitions/entityName"
},
"timestamp": {
"description": "Update time of the job status.",
"$ref": "../../type/basic.json#/definitions/timestamp"
},
"extension": {
"description": "Extension type.",
"type": "string",
"default": "status"
},
"status": {
"description": "Status for the Job.",
"type": "string",
@ -40,10 +53,6 @@
"description": "Execution time of the job status.",
"$ref": "../../type/basic.json#/definitions/timestamp"
},
"timestamp": {
"description": "Update time of the job status.",
"$ref": "../../type/basic.json#/definitions/timestamp"
},
"failureContext": {
"description": "Failure Context for the Application.",
"type": "object"

View File

@ -5,6 +5,26 @@
"description": "PRivate Configuration for the MetaPilot External Application.",
"type": "object",
"javaType": "org.openmetadata.schema.entity.app.external.MetaPilotAppPrivateConfig",
"definitions": {
"metaPilotLimits": {
"description": "Limits for the MetaPilot Application.",
"type": "object",
"properties": {
"descriptions": {
"description": "Maximum number of descriptions generated by the MetaPilot",
"type": "integer"
},
"queries": {
"description": "Maximum number of queries generated by MetaPilot.",
"type": "integer"
},
"billingCycleStart": {
"description": "Start of the billing cycle.",
"$ref": "../../../../../type/basic.json#/definitions/date"
}
}
}
},
"properties": {
"waiiInstance": {
"title": "WAII Instance",
@ -24,8 +44,13 @@
"description": "WAII API Token",
"type": "string",
"format": "password"
},
"limits": {
"title": "Limits",
"description": "Limits for the MetaPilot Application.",
"$ref": "#/definitions/metaPilotLimits"
}
},
"additionalProperties": false,
"required": ["waiiInstance", "collateURL", "token"]
"required": ["waiiInstance", "collateURL", "token", "limits"]
}