mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-12-04 11:33:07 +00:00
parent
d2473c8ca0
commit
8a390d4dbb
@ -33,28 +33,28 @@ public class EventPubSub {
|
||||
private static Disruptor<ChangeEventHolder> disruptor;
|
||||
private static ExecutorService executor;
|
||||
private static RingBuffer<ChangeEventHolder> ringBuffer;
|
||||
private static boolean STARTED = false;
|
||||
private static boolean started = false;
|
||||
|
||||
public static void start() {
|
||||
if (!STARTED) {
|
||||
if (!started) {
|
||||
disruptor = new Disruptor<>(ChangeEventHolder::new, 1024, DaemonThreadFactory.INSTANCE);
|
||||
disruptor.setDefaultExceptionHandler(new DefaultExceptionHandler());
|
||||
executor = Executors.newCachedThreadPool(DaemonThreadFactory.INSTANCE);
|
||||
ringBuffer = disruptor.start();
|
||||
LOG.info("Disruptor started");
|
||||
STARTED = true;
|
||||
started = true;
|
||||
}
|
||||
}
|
||||
|
||||
public static void shutdown() throws InterruptedException {
|
||||
if (STARTED) {
|
||||
if (started) {
|
||||
disruptor.shutdown();
|
||||
disruptor.halt();
|
||||
executor.shutdownNow();
|
||||
executor.awaitTermination(10, TimeUnit.SECONDS);
|
||||
disruptor = null;
|
||||
ringBuffer = null;
|
||||
STARTED = false;
|
||||
started = false;
|
||||
LOG.info("Disruptor stopped");
|
||||
}
|
||||
}
|
||||
|
||||
@ -70,8 +70,7 @@ public class AirflowConfiguration {
|
||||
}
|
||||
|
||||
public String getSecretKey() {
|
||||
String secretKey = "";
|
||||
return secretKey;
|
||||
return "";
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@ -36,7 +36,7 @@ public class AirflowRESTClient {
|
||||
private final String username;
|
||||
private final String password;
|
||||
private final HttpClient client;
|
||||
private final String authHeader = "Bearer %s";
|
||||
private static final String authHeader = "Bearer %s";
|
||||
|
||||
public AirflowRESTClient(CatalogApplicationConfig config) {
|
||||
AirflowConfiguration airflowConfig = config.getAirflowConfiguration();
|
||||
|
||||
@ -221,7 +221,8 @@ public abstract class EntityRepository<T> {
|
||||
}
|
||||
int total = dao.listCount(fqnPrefix);
|
||||
|
||||
String beforeCursor, afterCursor = null;
|
||||
String beforeCursor;
|
||||
String afterCursor = null;
|
||||
beforeCursor = after == null ? null : getFullyQualifiedName(entities.get(0));
|
||||
if (entities.size() > limitParam) { // If extra result exists, then next page exists - return after cursor
|
||||
entities.remove(limitParam);
|
||||
@ -243,7 +244,8 @@ public abstract class EntityRepository<T> {
|
||||
}
|
||||
int total = dao.listCount(fqnPrefix);
|
||||
|
||||
String beforeCursor = null, afterCursor;
|
||||
String beforeCursor = null;
|
||||
String afterCursor;
|
||||
if (entities.size() > limitParam) { // If extra result exists, then previous page exists - return before cursor
|
||||
entities.remove(0);
|
||||
beforeCursor = getFullyQualifiedName(entities.get(0));
|
||||
@ -341,7 +343,7 @@ public abstract class EntityRepository<T> {
|
||||
|
||||
// Validate follower
|
||||
User user = daoCollection.userDAO().findEntityById(userId);
|
||||
if (user.getDeleted()) {
|
||||
if (Boolean.TRUE.equals(user.getDeleted())) {
|
||||
throw new IllegalArgumentException(CatalogExceptionMessage.deactivatedUser(userId));
|
||||
}
|
||||
|
||||
@ -376,7 +378,7 @@ public abstract class EntityRepository<T> {
|
||||
List<EntityReference> contains =
|
||||
daoCollection.relationshipDAO().findTo(id.toString(), entityName, Relationship.CONTAINS.ordinal());
|
||||
|
||||
if (contains.size() > 0) {
|
||||
if (!contains.isEmpty()) {
|
||||
if (!recursive) {
|
||||
throw new IllegalArgumentException(entityName + " is not empty");
|
||||
}
|
||||
|
||||
@ -102,7 +102,8 @@ public class LocationRepository extends EntityRepository<Location> {
|
||||
.listPrefixesCount(
|
||||
daoCollection.locationDAO().getTableName(), daoCollection.locationDAO().getNameColumn(), fqn, service);
|
||||
|
||||
String beforeCursor = null, afterCursor;
|
||||
String beforeCursor = null;
|
||||
String afterCursor;
|
||||
if (entities.size() > limitParam) { // If extra result exists, then previous page exists - return before cursor
|
||||
entities.remove(0);
|
||||
beforeCursor = getFullyQualifiedName(entities.get(0));
|
||||
@ -137,7 +138,8 @@ public class LocationRepository extends EntityRepository<Location> {
|
||||
.listPrefixesCount(
|
||||
daoCollection.locationDAO().getTableName(), daoCollection.locationDAO().getNameColumn(), fqn, service);
|
||||
|
||||
String beforeCursor, afterCursor = null;
|
||||
String beforeCursor;
|
||||
String afterCursor = null;
|
||||
beforeCursor = after == null ? null : getFullyQualifiedName(entities.get(0));
|
||||
if (entities.size() > limitParam) { // If extra result exists, then next page exists - return after cursor
|
||||
entities.remove(limitParam);
|
||||
|
||||
@ -620,7 +620,7 @@ public class TableRepository extends EntityRepository<Table> {
|
||||
.listFromByPrefix(
|
||||
table.getFullyQualifiedName(), "table.columns.column", "table.columns.column", JOINED_WITH.ordinal()));
|
||||
|
||||
if (list.size() == 0) { // No join information found. Return empty list
|
||||
if (list.isEmpty()) { // No join information found. Return empty list
|
||||
return tableJoins;
|
||||
}
|
||||
|
||||
|
||||
@ -235,7 +235,7 @@ public class TagRepository {
|
||||
Tag childTag = setFields(JsonUtils.readValue(json, Tag.class), fields);
|
||||
tagList.add(populateChildrenTags(childTag, fields));
|
||||
}
|
||||
return tag.withChildren(tagList.size() > 0 ? tagList : null);
|
||||
return tag.withChildren(!tagList.isEmpty() ? tagList : null);
|
||||
}
|
||||
|
||||
private TagCategory setFields(TagCategory category, Fields fields) {
|
||||
|
||||
@ -114,7 +114,7 @@ public class WebhookRepository extends EntityRepository<Webhook> {
|
||||
}
|
||||
|
||||
public void addWebhookPublisher(Webhook webhook) {
|
||||
if (!webhook.getEnabled()) { // Only add webhook that is enabled for publishing events
|
||||
if (Boolean.FALSE.equals(webhook.getEnabled())) { // Only add webhook that is enabled for publishing events
|
||||
webhook.setStatus(Status.NOT_STARTED);
|
||||
return;
|
||||
}
|
||||
@ -126,7 +126,7 @@ public class WebhookRepository extends EntityRepository<Webhook> {
|
||||
}
|
||||
|
||||
public void updateWebhookPublisher(Webhook webhook) throws InterruptedException {
|
||||
if (webhook.getEnabled()) { // Only add webhook that is enabled for publishing
|
||||
if (Boolean.TRUE.equals(webhook.getEnabled())) { // Only add webhook that is enabled for publishing
|
||||
// If there was a previous webhook either in disabled state or stopped due
|
||||
// to errors, update it and restart publishing
|
||||
WebhookPublisher previousPublisher = getPublisher(webhook.getId());
|
||||
@ -396,12 +396,7 @@ public class WebhookRepository extends EntityRepository<Webhook> {
|
||||
|
||||
private void initFilter() {
|
||||
filter.clear();
|
||||
webhook
|
||||
.getEventFilters()
|
||||
.forEach(
|
||||
f -> { // Set up filters
|
||||
filter.put(f.getEventType(), f.getEntities());
|
||||
});
|
||||
webhook.getEventFilters().forEach(f -> filter.put(f.getEventType(), f.getEntities()));
|
||||
}
|
||||
|
||||
private void setErrorStatus(Long attemptTime, Integer statusCode, String reason) throws IOException {
|
||||
@ -491,7 +486,7 @@ public class WebhookRepository extends EntityRepository<Webhook> {
|
||||
updatedWebhook
|
||||
.withStatus(publisher.getWebhook().getStatus())
|
||||
.withFailureDetails(publisher.getWebhook().getFailureDetails());
|
||||
if (updatedWebhook.getEnabled() == false) {
|
||||
if (Boolean.FALSE.equals(updatedWebhook.getEnabled())) {
|
||||
updatedWebhook.setStatus(Status.NOT_STARTED);
|
||||
}
|
||||
}
|
||||
|
||||
@ -47,7 +47,7 @@ import org.slf4j.LoggerFactory;
|
||||
*/
|
||||
public final class CollectionRegistry {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(CollectionRegistry.class);
|
||||
private static CollectionRegistry INSTANCE = null;
|
||||
private static CollectionRegistry instance = null;
|
||||
|
||||
/** Map of collection endpoint path to collection details */
|
||||
private final Map<String, CollectionDetails> collectionMap = new HashMap<>();
|
||||
@ -58,11 +58,11 @@ public final class CollectionRegistry {
|
||||
private CollectionRegistry() {}
|
||||
|
||||
public static CollectionRegistry getInstance() {
|
||||
if (INSTANCE == null) {
|
||||
INSTANCE = new CollectionRegistry();
|
||||
INSTANCE.initialize();
|
||||
if (instance == null) {
|
||||
instance = new CollectionRegistry();
|
||||
instance.initialize();
|
||||
}
|
||||
return INSTANCE;
|
||||
return instance;
|
||||
}
|
||||
|
||||
private void initialize() {
|
||||
@ -144,7 +144,9 @@ public final class CollectionRegistry {
|
||||
|
||||
/** Get collection details based on annotations in Resource classes */
|
||||
private static CollectionDetails getCollection(Class<?> cl) {
|
||||
String href, doc, name;
|
||||
String href;
|
||||
String doc;
|
||||
String name;
|
||||
href = null;
|
||||
doc = null;
|
||||
name = null;
|
||||
|
||||
@ -234,7 +234,7 @@ public class WebhookResource {
|
||||
throws IOException {
|
||||
SecurityUtil.checkAdminOrBotRole(authorizer, securityContext);
|
||||
Webhook webhook = getWebhook(securityContext, create);
|
||||
webhook.setStatus(webhook.getEnabled() ? Status.STARTED : Status.NOT_STARTED);
|
||||
webhook.setStatus(Boolean.TRUE.equals(webhook.getEnabled()) ? Status.STARTED : Status.NOT_STARTED);
|
||||
webhook = dao.create(uriInfo, webhook);
|
||||
dao.addWebhookPublisher(webhook);
|
||||
return Response.created(webhook.getHref()).entity(webhook).build();
|
||||
@ -259,7 +259,7 @@ public class WebhookResource {
|
||||
// SecurityUtil.checkAdminOrBotRole(authorizer, securityContext);
|
||||
// Table table = getTable(securityContext, create);
|
||||
Webhook webhook = getWebhook(securityContext, create);
|
||||
webhook.setStatus(webhook.getEnabled() ? Status.STARTED : Status.NOT_STARTED);
|
||||
webhook.setStatus(Boolean.TRUE.equals(webhook.getEnabled()) ? Status.STARTED : Status.NOT_STARTED);
|
||||
PutResponse<Webhook> putResponse = dao.createOrUpdate(uriInfo, webhook);
|
||||
dao.updateWebhookPublisher(webhook);
|
||||
return putResponse.toResponse();
|
||||
|
||||
@ -431,7 +431,7 @@ public class IngestionResource {
|
||||
}
|
||||
|
||||
private void deploy(Ingestion ingestion) {
|
||||
if (ingestion.getForceDeploy()) {
|
||||
if (Boolean.TRUE.equals(ingestion.getForceDeploy())) {
|
||||
airflowRESTClient.deploy(ingestion, config);
|
||||
}
|
||||
}
|
||||
|
||||
@ -140,8 +140,7 @@ public class MessagingServiceResource {
|
||||
public MessagingService get(
|
||||
@Context UriInfo uriInfo, @Context SecurityContext securityContext, @PathParam("id") String id)
|
||||
throws IOException, ParseException {
|
||||
MessagingService service = dao.get(uriInfo, id, null);
|
||||
return service;
|
||||
return dao.get(uriInfo, id, null);
|
||||
}
|
||||
|
||||
@GET
|
||||
|
||||
@ -122,7 +122,7 @@ public class TagResource {
|
||||
}
|
||||
|
||||
static final String FIELDS = "usageCount";
|
||||
public static final List<String> FIELD_LIST = Arrays.asList(FIELDS.replaceAll(" ", "").split(","));
|
||||
protected static final List<String> FIELD_LIST = Arrays.asList(FIELDS.replaceAll(" ", "").split(","));
|
||||
|
||||
@GET
|
||||
@Operation(
|
||||
|
||||
@ -99,7 +99,7 @@ public class TeamResource {
|
||||
}
|
||||
}
|
||||
|
||||
public static final String FIELDS = "profile,users,owns";
|
||||
protected static final String FIELDS = "profile,users,owns";
|
||||
public static final List<String> FIELD_LIST = Arrays.asList(FIELDS.replaceAll(" ", "").split(","));
|
||||
|
||||
@GET
|
||||
|
||||
@ -282,9 +282,7 @@ public class TopicResource {
|
||||
public Response create(@Context UriInfo uriInfo, @Context SecurityContext securityContext, @Valid CreateTopic create)
|
||||
throws IOException {
|
||||
SecurityUtil.checkAdminOrBotRole(authorizer, securityContext);
|
||||
System.out.println("XXX topic retention time" + create.getRetentionTime());
|
||||
Topic topic = getTopic(securityContext, create);
|
||||
System.out.println("XXX topic retention time" + topic.getRetentionTime());
|
||||
|
||||
topic = addHref(uriInfo, dao.create(uriInfo, topic));
|
||||
return Response.created(topic.getHref()).entity(topic).build();
|
||||
@ -336,9 +334,7 @@ public class TopicResource {
|
||||
@Context UriInfo uriInfo, @Context SecurityContext securityContext, @Valid CreateTopic create)
|
||||
throws IOException, ParseException {
|
||||
|
||||
System.out.println("XXX topic retention time" + create.getRetentionTime());
|
||||
Topic topic = getTopic(securityContext, create);
|
||||
System.out.println("XXX topic retention time" + topic.getRetentionTime());
|
||||
PutResponse<Topic> response = dao.createOrUpdate(uriInfo, topic);
|
||||
addHref(uriInfo, response.getEntity());
|
||||
return response.toResponse();
|
||||
|
||||
@ -42,7 +42,7 @@ public class DefaultAuthorizer implements Authorizer {
|
||||
|
||||
private String principalDomain;
|
||||
private UserRepository userRepository;
|
||||
private final String fieldsParam = "teams";
|
||||
private static final String fieldsParam = "teams";
|
||||
|
||||
@Override
|
||||
public void init(AuthorizerConfiguration config, Jdbi dbi) {
|
||||
|
||||
@ -5,12 +5,12 @@ package org.openmetadata.catalog.security.policyevaluator;
|
||||
* PolicyEvaluator}
|
||||
*/
|
||||
class CommonFields {
|
||||
static String ALLOW = "allow";
|
||||
static String ENTITY_TAGS = "entityTags";
|
||||
static String ENTITY_TYPE = "entityType";
|
||||
static String OPERATION = "operation";
|
||||
static String USER_ROLES = "userRoles";
|
||||
static final String ALLOW = "allow";
|
||||
static final String ENTITY_TAGS = "entityTags";
|
||||
static final String ENTITY_TYPE = "entityType";
|
||||
static final String OPERATION = "operation";
|
||||
static final String USER_ROLES = "userRoles";
|
||||
|
||||
// By default, if no rule matches, do not grant access.
|
||||
static boolean DEFAULT_ACCESS = false;
|
||||
static final boolean DEFAULT_ACCESS = false;
|
||||
}
|
||||
|
||||
@ -30,6 +30,7 @@ import javax.json.JsonArray;
|
||||
import javax.json.JsonArrayBuilder;
|
||||
import javax.json.JsonObject;
|
||||
import javax.json.JsonPatch;
|
||||
import javax.json.JsonReader;
|
||||
import javax.json.JsonStructure;
|
||||
import javax.json.JsonValue;
|
||||
import javax.ws.rs.core.MediaType;
|
||||
@ -153,18 +154,18 @@ public final class JsonUtils {
|
||||
|
||||
// Apply sortedPatch
|
||||
JsonValue patchedJson = sortedPatch.apply(targetJson);
|
||||
return JsonUtils.convertValue(patchedJson, clz);
|
||||
}
|
||||
|
||||
public static <T> T convertValue(JsonValue patched, Class<T> clz) {
|
||||
return OBJECT_MAPPER.convertValue(patched, clz);
|
||||
return OBJECT_MAPPER.convertValue(patchedJson, clz);
|
||||
}
|
||||
|
||||
public static JsonPatch getJsonPatch(String v1, String v2) {
|
||||
System.out.println(v1);
|
||||
System.out.println(v2);
|
||||
JsonValue source = Json.createReader(new StringReader(v1)).readValue();
|
||||
JsonValue dest = Json.createReader(new StringReader(v2)).readValue();
|
||||
JsonValue source = readJson(v1);
|
||||
JsonValue dest = readJson(v2);
|
||||
return Json.createDiff(source.asJsonObject(), dest.asJsonObject());
|
||||
}
|
||||
|
||||
public static JsonValue readJson(String s) {
|
||||
try (JsonReader reader = Json.createReader(new StringReader(s))) {
|
||||
return reader.readValue();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -28,6 +28,7 @@ import java.util.Enumeration;
|
||||
import java.util.List;
|
||||
import java.util.regex.Pattern;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
import java.util.zip.ZipEntry;
|
||||
import java.util.zip.ZipFile;
|
||||
import org.slf4j.Logger;
|
||||
@ -71,16 +72,18 @@ public final class CommonUtil {
|
||||
|
||||
public static Collection<String> getResourcesFromDirectory(File file, Pattern pattern) throws IOException {
|
||||
final Path root = Path.of(file.getPath());
|
||||
return Files.walk(Paths.get(file.getPath()))
|
||||
.filter(Files::isRegularFile)
|
||||
.filter(path -> pattern.matcher(path.toString()).matches())
|
||||
.map(
|
||||
path -> {
|
||||
String relativePath = root.relativize(path).toString();
|
||||
LOG.info("Adding directory file {}", relativePath);
|
||||
return relativePath;
|
||||
})
|
||||
.collect(Collectors.toSet());
|
||||
try (Stream<Path> paths = Files.walk(Paths.get(file.getPath()))) {
|
||||
return paths
|
||||
.filter(Files::isRegularFile)
|
||||
.filter(path -> pattern.matcher(path.toString()).matches())
|
||||
.map(
|
||||
path -> {
|
||||
String relativePath = root.relativize(path).toString();
|
||||
LOG.info("Adding directory file {}", relativePath);
|
||||
return relativePath;
|
||||
})
|
||||
.collect(Collectors.toSet());
|
||||
}
|
||||
}
|
||||
|
||||
/** Get date after {@code days} from the given date or before i{@code days} when it is negative */
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user