Sonar flagged issues clean up (#6622)

This commit is contained in:
Suresh Srinivas 2022-08-06 12:50:23 -07:00 committed by GitHub
parent a6ecef3464
commit f8edbafccf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 114 additions and 145 deletions

View File

@ -7,7 +7,6 @@ import java.util.List;
import org.openmetadata.catalog.type.ResourceDescriptor;
public class ResourceRegistry {
private static final ResourceRegistry registry = new ResourceRegistry();
private static final List<ResourceDescriptor> RESOURCE_DESCRIPTORS = new ArrayList<>();
private ResourceRegistry() {}

View File

@ -118,7 +118,7 @@ public class ChangeEventHandler implements EventHandler {
EntityLink about = EntityLink.parse(thread.getAbout());
feedDao.create(thread, entity.getId(), owner, about);
String jsonThread = mapper.writeValueAsString(thread);
WebSocketManager.getInstance().broadCastMessageToAll(WebSocketManager.feedBroadcastChannel, jsonThread);
WebSocketManager.getInstance().broadCastMessageToAll(WebSocketManager.FEED_BROADCAST_CHANNEL, jsonThread);
}
}
}
@ -140,23 +140,23 @@ public class ChangeEventHandler implements EventHandler {
if (thread.getPostsCount() == 0) {
List<EntityReference> assignees = thread.getTask().getAssignees();
assignees.forEach(
(e) -> {
e -> {
if (Entity.USER.equals(e.getType())) {
WebSocketManager.getInstance()
.sendToOne(e.getId(), WebSocketManager.taskBroadcastChannel, jsonThread);
.sendToOne(e.getId(), WebSocketManager.TASK_BROADCAST_CHANNEL, jsonThread);
} else if (Entity.TEAM.equals(e.getType())) {
// fetch all that are there in the team
List<EntityRelationshipRecord> records =
dao.relationshipDAO()
.findTo(e.getId().toString(), TEAM, Relationship.HAS.ordinal(), Entity.USER);
WebSocketManager.getInstance()
.sendToManyWithString(records, WebSocketManager.taskBroadcastChannel, jsonThread);
.sendToManyWithString(records, WebSocketManager.TASK_BROADCAST_CHANNEL, jsonThread);
}
});
return;
}
break;
case Conversation:
WebSocketManager.getInstance().broadCastMessageToAll(WebSocketManager.feedBroadcastChannel, jsonThread);
WebSocketManager.getInstance().broadCastMessageToAll(WebSocketManager.FEED_BROADCAST_CHANNEL, jsonThread);
List<EntityLink> mentions;
if (thread.getPostsCount() == 0) {
mentions = MessageParser.getEntityLinks(thread.getMessage());
@ -165,29 +165,25 @@ public class ChangeEventHandler implements EventHandler {
mentions = MessageParser.getEntityLinks(latestPost.getMessage());
}
mentions.forEach(
(entityLink) -> {
entityLink -> {
String fqn = entityLink.getEntityFQN();
switch (entityLink.getEntityType()) {
case USER:
User user = dao.userDAO().findEntityByName(fqn);
WebSocketManager.getInstance()
.sendToOne(user.getId(), WebSocketManager.mentionChannel, jsonThread);
break;
case TEAM:
Team team = dao.teamDAO().findEntityByName(fqn);
// fetch all that are there in the team
List<EntityRelationshipRecord> records =
dao.relationshipDAO()
.findTo(team.getId().toString(), TEAM, Relationship.HAS.ordinal(), Entity.USER);
WebSocketManager.getInstance()
.sendToManyWithString(records, WebSocketManager.mentionChannel, jsonThread);
break;
if (USER.equals(entityLink.getEntityType())) {
User user = dao.userDAO().findEntityByName(fqn);
WebSocketManager.getInstance()
.sendToOne(user.getId(), WebSocketManager.MENTION_CHANNEL, jsonThread);
} else if (TEAM.equals(entityLink.getEntityType())) {
Team team = dao.teamDAO().findEntityByName(fqn);
// fetch all that are there in the team
List<EntityRelationshipRecord> records =
dao.relationshipDAO().findTo(team.getId().toString(), TEAM, Relationship.HAS.ordinal(), USER);
WebSocketManager.getInstance()
.sendToManyWithString(records, WebSocketManager.MENTION_CHANNEL, jsonThread);
}
});
return;
break;
case Announcement:
default:
return;
break;
}
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
@ -234,13 +230,10 @@ public class ChangeEventHandler implements EventHandler {
String entityType = entityReference.getType();
String entityFQN = entityReference.getFullyQualifiedName();
EventType eventType = null;
switch (changeType) {
case RestUtil.ENTITY_UPDATED:
eventType = ENTITY_UPDATED;
break;
case RestUtil.ENTITY_SOFT_DELETED:
eventType = ENTITY_SOFT_DELETED;
break;
if (RestUtil.ENTITY_UPDATED.equals(changeType)) {
eventType = ENTITY_UPDATED;
} else if (RestUtil.ENTITY_SOFT_DELETED.equals(changeType)) {
eventType = ENTITY_SOFT_DELETED;
}
return getChangeEvent(eventType, entityType, entityInterface)

View File

@ -2130,14 +2130,12 @@ public interface CollectionDAO {
@Override
default int listCount(ListFilter filter) {
String team = filter.getQueryParam("team");
Boolean isAdmin = null;
Boolean isBot = null;
String isBotStr = filter.getQueryParam("isBot");
String isAdminStr = filter.getQueryParam("isAdmin");
String mySqlCondition = filter.getCondition("ue");
String postgresCondition = filter.getCondition("ue");
if (isAdminStr != null) {
isAdmin = Boolean.parseBoolean(isAdminStr);
boolean isAdmin = Boolean.parseBoolean(isAdminStr);
if (isAdmin) {
mySqlCondition = String.format("%s AND JSON_EXTRACT(ue.json, '$.isAdmin') = TRUE ", mySqlCondition);
postgresCondition = String.format("%s AND ((ue.json#>'{isAdmin}')::boolean) = TRUE ", postgresCondition);
@ -2153,7 +2151,7 @@ public interface CollectionDAO {
}
}
if (isBotStr != null) {
isBot = Boolean.parseBoolean(isBotStr);
boolean isBot = Boolean.parseBoolean(isBotStr);
if (isBot) {
mySqlCondition = String.format("%s AND JSON_EXTRACT(ue.json, '$.isBot') = TRUE ", mySqlCondition);
postgresCondition = String.format("%s AND ((ue.json#>'{isBot}')::boolean) = TRUE ", postgresCondition);
@ -2167,7 +2165,7 @@ public interface CollectionDAO {
"%s AND ue.json#>'{isBot}' IS NULL OR ((ue.json#>'{isBot}')::boolean) = FALSE ", postgresCondition);
}
}
if (team == null && isAdmin == null && isBot == null) {
if (team == null && isAdminStr == null && isBotStr == null) {
return EntityDAO.super.listCount(filter);
}
return listCount(
@ -2177,14 +2175,12 @@ public interface CollectionDAO {
@Override
default List<String> listBefore(ListFilter filter, int limit, String before) {
String team = filter.getQueryParam("team");
Boolean isAdmin = null;
Boolean isBot = null;
String isBotStr = filter.getQueryParam("isBot");
String isAdminStr = filter.getQueryParam("isAdmin");
String mySqlCondition = filter.getCondition("ue");
String postgresCondition = filter.getCondition("ue");
if (isAdminStr != null) {
isAdmin = Boolean.parseBoolean(isAdminStr);
boolean isAdmin = Boolean.parseBoolean(isAdminStr);
if (isAdmin) {
mySqlCondition = String.format("%s AND JSON_EXTRACT(ue.json, '$.isAdmin') = TRUE ", mySqlCondition);
postgresCondition = String.format("%s AND ((ue.json#>'{isAdmin}')::boolean) = TRUE ", postgresCondition);
@ -2200,7 +2196,7 @@ public interface CollectionDAO {
}
}
if (isBotStr != null) {
isBot = Boolean.parseBoolean(isBotStr);
boolean isBot = Boolean.parseBoolean(isBotStr);
if (isBot) {
mySqlCondition = String.format("%s AND JSON_EXTRACT(ue.json, '$.isBot') = TRUE ", mySqlCondition);
postgresCondition = String.format("%s AND ((ue.json#>'{isBot}')::boolean) = TRUE ", postgresCondition);
@ -2214,7 +2210,7 @@ public interface CollectionDAO {
"%s AND ue.json#>'{isBot}' IS NULL OR ((ue.json#>'{isBot}')::boolean) = FALSE ", postgresCondition);
}
}
if (team == null && isAdmin == null && isBot == null) {
if (team == null && isAdminStr == null && isBotStr == null) {
return EntityDAO.super.listBefore(filter, limit, before);
}
return listBefore(
@ -2231,14 +2227,12 @@ public interface CollectionDAO {
@Override
default List<String> listAfter(ListFilter filter, int limit, String after) {
String team = filter.getQueryParam("team");
Boolean isAdmin = null;
Boolean isBot = null;
String isBotStr = filter.getQueryParam("isBot");
String isAdminStr = filter.getQueryParam("isAdmin");
String mySqlCondition = filter.getCondition("ue");
String postgresCondition = filter.getCondition("ue");
if (isAdminStr != null) {
isAdmin = Boolean.parseBoolean(isAdminStr);
boolean isAdmin = Boolean.parseBoolean(isAdminStr);
if (isAdmin) {
mySqlCondition = String.format("%s AND JSON_EXTRACT(ue.json, '$.isAdmin') = TRUE ", mySqlCondition);
postgresCondition = String.format("%s AND ((ue.json#>'{isAdmin}')::boolean) = TRUE ", postgresCondition);
@ -2254,7 +2248,7 @@ public interface CollectionDAO {
}
}
if (isBotStr != null) {
isBot = Boolean.parseBoolean(isBotStr);
boolean isBot = Boolean.parseBoolean(isBotStr);
if (isBot) {
mySqlCondition = String.format("%s AND JSON_EXTRACT(ue.json, '$.isBot') = TRUE ", mySqlCondition);
postgresCondition = String.format("%s AND ((ue.json#>'{isBot}')::boolean) = TRUE ", postgresCondition);
@ -2268,7 +2262,7 @@ public interface CollectionDAO {
"%s AND ue.json#>'{isBot}' IS NULL OR ((ue.json#>'{isBot}')::boolean) = FALSE ", postgresCondition);
}
}
if (team == null && isAdmin == null && isBot == null) {
if (team == null && isAdminStr == null && isBotStr == null) {
return EntityDAO.super.listAfter(filter, limit, after);
}
return listAfter(
@ -2703,7 +2697,7 @@ public interface CollectionDAO {
if (entityFqn != null) {
condition = String.format("%s AND entityFqn='%s' ", condition, entityFqn);
}
if (startTs != null & endTs != null) {
if (startTs != null && endTs != null) {
condition =
String.format(
"%s AND timestamp BETWEEN %d and %d ", condition, Long.parseLong(startTs), Long.parseLong(endTs));
@ -2737,7 +2731,7 @@ public interface CollectionDAO {
if (entityFqn != null) {
condition = String.format("%s AND entityFqn='%s' ", condition, entityFqn);
}
if (startTs != null & endTs != null) {
if (startTs != null && endTs != null) {
condition =
String.format(
"%s AND timestamp BETWEEN %d and %d ", condition, Long.parseLong(startTs), Long.parseLong(endTs));
@ -2769,7 +2763,7 @@ public interface CollectionDAO {
if (entityFqn != null) {
condition = String.format("%s AND entityFqn='%s' ", condition, entityFqn);
}
if (startTs != null & endTs != null) {
if (startTs != null && endTs != null) {
condition =
String.format(
"%s AND timestamp BETWEEN %d and %d ", condition, Long.parseLong(startTs), Long.parseLong(endTs));

View File

@ -98,6 +98,9 @@ public class TableRepository extends EntityRepository<Table> {
public static final String FIELD_RELATION_COLUMN_TYPE = "table.columns.column";
public static final String FIELD_RELATION_TABLE_TYPE = "table";
public static final String TABLE_PROFILE_EXTENSION = "table.tableProfile";
public static final String TABLE_SAMPLE_DATA_EXTENSION = "table.sampleData";
public static final String TABLE_PROFILER_CONFIG_EXTENSION = "table.tableProfilerConfig";
public TableRepository(CollectionDAO daoCollection) {
super(
@ -208,7 +211,7 @@ public class TableRepository extends EntityRepository<Table> {
daoCollection
.entityExtensionDAO()
.insert(tableId.toString(), "table.sampleData", "tableData", JsonUtils.pojoToJson(tableData));
.insert(tableId.toString(), TABLE_SAMPLE_DATA_EXTENSION, "tableData", JsonUtils.pojoToJson(tableData));
setFields(table, Fields.EMPTY_FIELDS);
return table.withSampleData(tableData);
}
@ -216,7 +219,7 @@ public class TableRepository extends EntityRepository<Table> {
@Transaction
public TableProfilerConfig getTableProfilerConfig(Table table) throws IOException {
return JsonUtils.readValue(
daoCollection.entityExtensionDAO().getExtension(table.getId().toString(), "table.tableProfilerConfig"),
daoCollection.entityExtensionDAO().getExtension(table.getId().toString(), TABLE_PROFILER_CONFIG_EXTENSION),
TableProfilerConfig.class);
}
@ -242,7 +245,7 @@ public class TableRepository extends EntityRepository<Table> {
.entityExtensionDAO()
.insert(
tableId.toString(),
"table.tableProfilerConfig",
TABLE_PROFILER_CONFIG_EXTENSION,
"tableProfilerConfig",
JsonUtils.pojoToJson(tableProfilerConfig));
setFields(table, Fields.EMPTY_FIELDS);
@ -254,7 +257,7 @@ public class TableRepository extends EntityRepository<Table> {
// Validate the request content
Table table = dao.findEntityById(tableId);
daoCollection.entityExtensionDAO().delete(tableId.toString(), "table.tableProfilerConfig");
daoCollection.entityExtensionDAO().delete(tableId.toString(), TABLE_PROFILER_CONFIG_EXTENSION);
setFields(table, Fields.EMPTY_FIELDS);
return table;
}
@ -273,14 +276,14 @@ public class TableRepository extends EntityRepository<Table> {
JsonUtils.readValue(
daoCollection
.entityExtensionTimeSeriesDao()
.getExtensionAtTimestamp(tableId.toString(), "table.tableProfile", tableProfile.getTimestamp()),
.getExtensionAtTimestamp(tableId.toString(), TABLE_PROFILE_EXTENSION, tableProfile.getTimestamp()),
TableProfile.class);
if (storedTableProfile != null) {
daoCollection
.entityExtensionTimeSeriesDao()
.update(
tableId.toString(),
"table.tableProfile",
TABLE_PROFILE_EXTENSION,
JsonUtils.pojoToJson(tableProfile),
tableProfile.getTimestamp());
} else {
@ -289,7 +292,7 @@ public class TableRepository extends EntityRepository<Table> {
.insert(
tableId.toString(),
table.getFullyQualifiedName(),
"table.tableProfile",
TABLE_PROFILE_EXTENSION,
"tableProfile",
JsonUtils.pojoToJson(tableProfile));
setFields(table, Fields.EMPTY_FIELDS);
@ -305,12 +308,12 @@ public class TableRepository extends EntityRepository<Table> {
JsonUtils.readValue(
daoCollection
.entityExtensionTimeSeriesDao()
.getExtensionAtTimestamp(tableId.toString(), "table.tableProfile", timestamp),
.getExtensionAtTimestamp(tableId.toString(), TABLE_PROFILE_EXTENSION, timestamp),
TableProfile.class);
if (storedTableProfile != null) {
daoCollection
.entityExtensionTimeSeriesDao()
.deleteAtTimestamp(tableId.toString(), "table.tableProfile", timestamp);
.deleteAtTimestamp(tableId.toString(), TABLE_PROFILE_EXTENSION, timestamp);
table.setTableProfile(storedTableProfile);
return table;
}
@ -991,12 +994,15 @@ public class TableRepository extends EntityRepository<Table> {
private TableData getSampleData(Table table) throws IOException {
return JsonUtils.readValue(
daoCollection.entityExtensionDAO().getExtension(table.getId().toString(), "table.sampleData"), TableData.class);
daoCollection.entityExtensionDAO().getExtension(table.getId().toString(), TABLE_SAMPLE_DATA_EXTENSION),
TableData.class);
}
private TableProfile getTableProfile(Table table) throws IOException {
return JsonUtils.readValue(
daoCollection.entityExtensionTimeSeriesDao().getLatestExtension(table.getId().toString(), "table.tableProfile"),
daoCollection
.entityExtensionTimeSeriesDao()
.getLatestExtension(table.getId().toString(), TABLE_PROFILE_EXTENSION),
TableProfile.class);
}

View File

@ -223,6 +223,11 @@ public class TeamRepository extends EntityRepository<Team> {
}
List<Team> children = getTeams(childrenRefs);
switch (team.getTeamType()) {
case GROUP:
if (!children.isEmpty()) {
throw new IllegalArgumentException(CatalogExceptionMessage.createGroup());
}
break;
case DEPARTMENT:
validateChildren(team, children, DEPARTMENT);
break;
@ -249,6 +254,7 @@ public class TeamRepository extends EntityRepository<Team> {
}
List<Team> parents = getTeams(parentRefs);
switch (team.getTeamType()) {
case GROUP:
case DEPARTMENT:
validateParents(team, parents, DEPARTMENT, DIVISION, BUSINESS_UNIT, ORGANIZATION);
break;

View File

@ -3,6 +3,7 @@ package org.openmetadata.catalog.jdbi3;
import static org.openmetadata.catalog.Entity.TEST_DEFINITION;
import java.io.IOException;
import org.openmetadata.catalog.Entity;
import org.openmetadata.catalog.resources.dqtests.TestDefinitionResource;
import org.openmetadata.catalog.tests.TestDefinition;
import org.openmetadata.catalog.type.EntityReference;
@ -25,7 +26,7 @@ public class TestDefinitionRepository extends EntityRepository<TestDefinition> {
@Override
public TestDefinition setFields(TestDefinition entity, EntityUtil.Fields fields) throws IOException {
entity.setOwner(fields.contains("owner") ? getOwner(entity) : null);
entity.setOwner(fields.contains(Entity.FIELD_OWNER) ? getOwner(entity) : null);
return entity;
}

View File

@ -392,9 +392,7 @@ public class UserResource extends EntityResource<User, UserRepository> {
throws IOException {
User user = dao.get(uriInfo, id, Fields.EMPTY_FIELDS);
if (!user.getIsBot()) {
throw new IllegalArgumentException("Generating JWT token is only supported for bot users");
}
authorizeGenerateJWT(user);
authorizer.authorizeAdmin(securityContext, false);
JWTAuthMechanism jwtAuthMechanism =
jwtTokenGenerator.generateJWTToken(user, generateTokenRequest.getJWTTokenExpiry());
@ -427,9 +425,7 @@ public class UserResource extends EntityResource<User, UserRepository> {
throws IOException {
User user = dao.get(uriInfo, id, Fields.EMPTY_FIELDS);
if (!user.getIsBot()) {
throw new IllegalArgumentException("Generating JWT token is only supported for bot users");
}
authorizeGenerateJWT(user);
authorizer.authorizeAdmin(securityContext, false);
JWTAuthMechanism jwtAuthMechanism = new JWTAuthMechanism().withJWTToken(StringUtils.EMPTY);
AuthenticationMechanism authenticationMechanism =
@ -460,7 +456,7 @@ public class UserResource extends EntityResource<User, UserRepository> {
throws IOException {
User user = dao.get(uriInfo, id, new Fields(List.of("authenticationMechanism")));
if (!user.getIsBot()) {
if (!Boolean.TRUE.equals(user.getIsBot())) {
throw new IllegalArgumentException("JWT token is only supported for bot users");
}
authorizer.authorizeAdmin(securityContext, false);
@ -567,4 +563,10 @@ public class UserResource extends EntityResource<User, UserRepository> {
.withTeams(EntityUtil.toEntityReferences(create.getTeams(), Entity.TEAM))
.withRoles(EntityUtil.toEntityReferences(create.getRoles(), Entity.ROLE));
}
private void authorizeGenerateJWT(User user) {
if (!Boolean.TRUE.equals(user.getIsBot())) {
throw new IllegalArgumentException("Generating JWT token is only supported for bot users");
}
}
}

View File

@ -59,9 +59,9 @@ public class DefaultAuthorizer implements Authorizer {
this.testUsers = new HashSet<>(config.getTestPrincipals());
this.principalDomain = config.getPrincipalDomain();
SubjectCache.getInstance().initialize();
PolicyCache.getInstance().initialize();
RoleCache.getInstance().initialize();
SubjectCache.initialize();
PolicyCache.initialize();
RoleCache.initialize();
LOG.debug("Admin users: {}", adminUsers);
initializeUsers();
}

View File

@ -47,7 +47,7 @@ public class PolicyCache {
}
/** To be called during application startup by Default Authorizer */
public void initialize() {
public static void initialize() {
if (!INITIALIZED) {
POLICY_CACHE = CacheBuilder.newBuilder().maximumSize(100).build(new PolicyLoader());
POLICY_REPOSITORY = Entity.getEntityRepository(Entity.POLICY);
@ -88,7 +88,7 @@ public class PolicyCache {
return rules;
}
public void cleanUp() {
public static void cleanUp() {
POLICY_CACHE.cleanUp();
INITIALIZED = false;
}

View File

@ -31,7 +31,7 @@ import org.openmetadata.catalog.util.EntityUtil.Fields;
/** Subject context used for Access Control Policies */
@Slf4j
public class RoleCache {
private static RoleCache INSTANCE = new RoleCache();
private static final RoleCache INSTANCE = new RoleCache();
private static volatile boolean INITIALIZED = false;
protected static LoadingCache<UUID, Role> ROLE_CACHE;
private static EntityRepository<Role> ROLE_REPOSITORY;
@ -42,7 +42,7 @@ public class RoleCache {
}
/** To be called only once during the application start from DefaultAuthorizer */
public void initialize() {
public static void initialize() {
if (!INITIALIZED) {
ROLE_CACHE = CacheBuilder.newBuilder().maximumSize(100).build(new RoleLoader());
ROLE_REPOSITORY = Entity.getEntityRepository(Entity.ROLE);
@ -68,9 +68,6 @@ public class RoleCache {
}
static class RoleLoader extends CacheLoader<UUID, Role> {
private static final EntityRepository<Role> ROLE_REPOSITORY = Entity.getEntityRepository(Entity.ROLE);
private static final Fields FIELDS = ROLE_REPOSITORY.getFields("policies");
@Override
public Role load(@CheckForNull UUID roleId) throws IOException {
Role role = ROLE_REPOSITORY.get(null, roleId.toString(), FIELDS);

View File

@ -34,7 +34,7 @@ import org.openmetadata.catalog.util.EntityUtil.Fields;
/** Subject context used for Access Control Policies */
@Slf4j
public class SubjectCache {
private static SubjectCache INSTANCE = new SubjectCache();
private static final SubjectCache INSTANCE = new SubjectCache();
private static volatile boolean INITIALIZED = false;
protected static LoadingCache<String, SubjectContext> USER_CACHE;
@ -46,7 +46,7 @@ public class SubjectCache {
protected static Fields TEAM_FIELDS;
// Expected to be called only once from the DefaultAuthorizer
public void initialize() {
public static void initialize() {
if (!INITIALIZED) {
USER_CACHE =
CacheBuilder.newBuilder().maximumSize(1000).expireAfterAccess(1, TimeUnit.MINUTES).build(new UserLoader());
@ -80,7 +80,7 @@ public class SubjectCache {
}
}
public void cleanUp() {
public static void cleanUp() {
USER_CACHE.invalidateAll();
TEAM_CACHE.invalidateAll();
INITIALIZED = false;

View File

@ -27,7 +27,7 @@ public class HeaderRequestWrapper extends HttpServletRequestWrapper {
super(request);
}
private Map<String, String> headerMap = new HashMap<>();
private final Map<String, String> headerMap = new HashMap<>();
public void addHeader(String name, String value) {
headerMap.put(name, value);

View File

@ -26,14 +26,13 @@ import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import javax.servlet.http.HttpServletRequest;
import lombok.extern.slf4j.Slf4j;
import org.openmetadata.catalog.security.AuthenticationConfiguration;
import org.openmetadata.catalog.security.AuthorizerConfiguration;
import org.openmetadata.catalog.security.JwtFilter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Slf4j
public class SocketAddressFilter implements Filter {
private static final Logger LOG = LoggerFactory.getLogger(SocketAddressFilter.class);
private JwtFilter jwtFilter;
private final boolean enableSecureSocketConnection;

View File

@ -1,5 +1,7 @@
package org.openmetadata.catalog.socket;
import static org.openmetadata.common.utils.CommonUtil.listOrEmpty;
import io.socket.engineio.server.EngineIoServer;
import io.socket.engineio.server.EngineIoServerOptions;
import io.socket.socketio.server.SocketIoNamespace;
@ -7,61 +9,46 @@ import io.socket.socketio.server.SocketIoServer;
import io.socket.socketio.server.SocketIoSocket;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.openmetadata.catalog.jdbi3.CollectionDAO.EntityRelationshipRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Slf4j
public class WebSocketManager {
private static final Logger LOG = LoggerFactory.getLogger(WebSocketManager.class);
private static WebSocketManager INSTANCE;
private final EngineIoServer mEngineIoServer;
private final SocketIoServer mSocketIoServer;
public static final String feedBroadcastChannel = "activityFeed";
public static final String taskBroadcastChannel = "taskChannel";
public static final String mentionChannel = "mentionChannel";
@Getter private final EngineIoServer engineIoServer;
@Getter private final SocketIoServer socketIoServer;
public static final String FEED_BROADCAST_CHANNEL = "activityFeed";
public static final String TASK_BROADCAST_CHANNEL = "taskChannel";
public static final String MENTION_CHANNEL = "mentionChannel";
private final Map<UUID, Map<String, SocketIoSocket>> activityFeedEndpoints = new ConcurrentHashMap<>();
private WebSocketManager(EngineIoServerOptions eiOptions) {
mEngineIoServer = new EngineIoServer(eiOptions);
mSocketIoServer = new SocketIoServer(mEngineIoServer);
engineIoServer = new EngineIoServer(eiOptions);
socketIoServer = new SocketIoServer(engineIoServer);
initializeHandlers();
}
private void initializeHandlers() {
SocketIoNamespace ns = mSocketIoServer.namespace("/");
SocketIoNamespace ns = socketIoServer.namespace("/");
// On Connection
ns.on(
"connection",
args -> {
SocketIoSocket socket = (SocketIoSocket) args[0];
final String userId;
String tempId;
try {
tempId = socket.getInitialHeaders().get("UserId").get(0);
} catch (Exception ex) {
tempId = socket.getInitialQuery().get("userId");
}
userId = tempId;
List<String> remoteAddress = socket.getInitialHeaders().get("RemoteAddress");
Map<String, List<String>> initialHeaders = socket.getInitialHeaders();
List<String> userIdHeaders = listOrEmpty(initialHeaders.get("UserId"));
String userId = userIdHeaders.isEmpty() ? socket.getInitialQuery().get("userId") : userIdHeaders.get(0);
if (userId != null && !userId.equals("")) {
LOG.info(
"Client :"
+ userId
+ "with Remote Address :"
+ socket.getInitialHeaders().get("RemoteAddress")
+ "connected."
+ socket.getInitialQuery());
LOG.info("Client : {} with Remote Address:{} connected {} ", userId, remoteAddress, initialHeaders);
// On Socket Disconnect
socket.on(
"disconnect",
args1 -> {
LOG.info(
"Client from:"
+ userId
+ "with Remote Address :"
+ socket.getInitialHeaders().get("RemoteAddress")
+ " disconnected.");
LOG.info("Client from: {} with Remote Address:{} disconnected.", userId, remoteAddress);
UUID id = UUID.fromString(userId);
Map<String, SocketIoSocket> allUserConnection = activityFeedEndpoints.get(id);
allUserConnection.remove(socket.getId());
@ -71,20 +58,14 @@ public class WebSocketManager {
"connect_error",
args1 ->
LOG.error(
"Connection ERROR for user:"
+ userId
+ "with Remote Address :"
+ socket.getInitialHeaders().get("RemoteAddress")
+ " disconnected."));
"Connection ERROR for user:{} with Remote Address:{} disconnected", userId, remoteAddress));
socket.on(
"connect_failed",
args1 ->
LOG.error(
"Connection failed ERROR for user:"
+ userId
+ "with Remote Address :"
+ socket.getInitialHeaders().get("RemoteAddress")
+ " disconnected."));
"Connection failed ERROR for user: {} with Remote Address: {} disconnected",
userId,
remoteAddress));
UUID id = UUID.fromString(userId);
Map<String, SocketIoSocket> userSocketConnections;
@ -101,14 +82,6 @@ public class WebSocketManager {
return INSTANCE;
}
public SocketIoServer getSocketIoServer() {
return mSocketIoServer;
}
public EngineIoServer getEngineIoServer() {
return mEngineIoServer;
}
public Map<UUID, Map<String, SocketIoSocket>> getActivityFeedEndpoints() {
return activityFeedEndpoints;
}

View File

@ -49,7 +49,6 @@ import org.openmetadata.catalog.type.FieldChange;
public final class ChangeEventParser {
public static final String FEED_ADD_MARKER = "<!add>";
public static final String FEED_REMOVE_MARKER = "<!remove>";
public static final String SLACK_STRIKE_MARKER = "~%s~ ";
public static final String FEED_BOLD = "**%s**";
public static final String SLACK_BOLD = "*%s* ";
public static final String FEED_SPAN_ADD = "<span class=\"diff-added\">";
@ -94,12 +93,12 @@ public final class ChangeEventParser {
Map<EntityLink, String> messages =
getFormattedMessages(PUBLISH_TO.SLACK, event.getChangeDescription(), (EntityInterface) event.getEntity());
List<SlackAttachment> attachmentList = new ArrayList<>();
for (var entryset : messages.entrySet()) {
for (var entry : messages.entrySet()) {
SlackAttachment attachment = new SlackAttachment();
List<String> mark = new ArrayList<>();
mark.add("text");
attachment.setMarkdownIn(mark);
attachment.setText(entryset.getValue());
attachment.setText(entry.getValue());
attachmentList.add(attachment);
}
slackMessage.setAttachments(attachmentList.toArray(new SlackAttachment[0]));
@ -352,9 +351,8 @@ public final class ChangeEventParser {
if (oldValue == null || oldValue.toString().isEmpty()) {
return String.format(
"Updated " + (publishTo == PUBLISH_TO.FEED ? FEED_BOLD : SLACK_BOLD) + " to %s",
updatedField,
getFieldValue(newValue));
"Updated %s to %s",
publishTo == PUBLISH_TO.FEED ? FEED_BOLD : SLACK_BOLD, updatedField, getFieldValue(newValue));
} else if (updatedField.contains("tags") || updatedField.contains(FIELD_OWNER)) {
return getPlainTextUpdateMessage(publishTo, updatedField, getFieldValue(oldValue), getFieldValue(newValue));
}

View File

@ -78,7 +78,7 @@ public final class RestUtil {
}
public static String replaceSpaces(String s) {
s = s.replaceAll(" ", "%20");
s = s.replace(" ", "%20");
return s;
}

View File

@ -71,6 +71,7 @@ public class AirflowConfigValidationImpl implements ConstraintValidator<AirflowC
case OPENMETADATA:
OpenMetadataJWTClientConfig openMetadataJWTClientConfig = authConfig.getOpenmetadata();
checkRequiredField(JWT_TOKEN, openMetadataJWTClientConfig.getJwtToken(), authProvider, message);
break;
case NO_AUTH:
break;
default:

View File

@ -96,8 +96,8 @@ public abstract class CatalogApplicationTest {
APP.after();
APP.getEnvironment().getApplicationContext().getServer().stop();
}
SubjectCache.getInstance().cleanUp();
PolicyCache.getInstance().cleanUp();
SubjectCache.cleanUp();
PolicyCache.cleanUp();
RoleCache.cleanUp();
}