[Backend] Websocket implementation added (#5440)

* [Backend] Websocket implementation added

* [Backend] Websocket implementation added [google formatter fix]

* [backend] Updated Reiew Comments

* [backend] Updated CheckStyle
This commit is contained in:
mohitdeuex 2022-06-15 10:14:47 +05:30 committed by GitHub
parent 0a9e36f1b4
commit be18feb928
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 349 additions and 16 deletions

View File

@ -390,6 +390,21 @@
<groupId>org.jeasy</groupId>
<artifactId>easy-rules-core</artifactId>
</dependency>
<dependency>
<groupId>io.socket</groupId>
<artifactId>socket.io-server</artifactId>
<version>4.0.1</version>
</dependency>
<dependency>
<groupId>io.socket</groupId>
<artifactId>engine.io-server-jetty</artifactId>
<version>6.2.1</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty.websocket</groupId>
<artifactId>websocket-server</artifactId>
<version>9.4.46.v20220331</version>
</dependency>
</dependencies>
<build>

View File

@ -32,6 +32,8 @@ import io.federecio.dropwizard.swagger.SwaggerBundleConfiguration;
import io.github.maksymdolgykh.dropwizard.micrometer.MicrometerBundle;
import io.github.maksymdolgykh.dropwizard.micrometer.MicrometerHttpFilter;
import io.github.maksymdolgykh.dropwizard.micrometer.MicrometerJdbiTimingCollector;
import io.socket.engineio.server.EngineIoServerOptions;
import io.socket.engineio.server.JettyWebSocketHandler;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.time.temporal.ChronoUnit;
@ -39,13 +41,18 @@ import java.util.EnumSet;
import java.util.Optional;
import javax.servlet.DispatcherType;
import javax.servlet.FilterRegistration;
import javax.servlet.ServletException;
import javax.ws.rs.container.ContainerRequestFilter;
import javax.ws.rs.container.ContainerResponseFilter;
import javax.ws.rs.core.Response;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.eclipse.jetty.http.pathmap.ServletPathSpec;
import org.eclipse.jetty.servlet.ErrorPageErrorHandler;
import org.eclipse.jetty.servlet.FilterHolder;
import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.websocket.server.WebSocketUpgradeFilter;
import org.glassfish.jersey.media.multipart.MultiPartFeature;
import org.glassfish.jersey.server.ServerProperties;
import org.jdbi.v3.core.Jdbi;
@ -74,12 +81,17 @@ import org.openmetadata.catalog.security.policyevaluator.PolicyEvaluator;
import org.openmetadata.catalog.security.policyevaluator.RoleEvaluator;
import org.openmetadata.catalog.slack.SlackPublisherConfiguration;
import org.openmetadata.catalog.slack.SlackWebhookEventPublisher;
import org.openmetadata.catalog.socket.FeedServlet;
import org.openmetadata.catalog.socket.SocketAddressFilter;
import org.openmetadata.catalog.socket.WebSocketManager;
/** Main catalog application */
@Slf4j
public class CatalogApplication extends Application<CatalogApplicationConfig> {
private Authorizer authorizer;
private SocketAddressFilter socketAddressFilter = null;
@Override
public void run(CatalogApplicationConfig catalogConfig, Environment environment)
throws ClassNotFoundException, IllegalAccessException, InstantiationException, NoSuchMethodException,
@ -151,6 +163,7 @@ public class CatalogApplication extends Application<CatalogApplicationConfig> {
FilterRegistration.Dynamic micrometerFilter =
environment.servlets().addFilter("MicrometerHttpFilter", new MicrometerHttpFilter());
micrometerFilter.addMappingForUrlPatterns(EnumSet.allOf(DispatcherType.class), true, "/*");
intializeWebsockets(environment);
}
@SneakyThrows
@ -201,7 +214,11 @@ public class CatalogApplication extends Application<CatalogApplicationConfig> {
InstantiationException {
AuthorizerConfiguration authorizerConf = catalogConfig.getAuthorizerConfiguration();
AuthenticationConfiguration authenticationConfiguration = catalogConfig.getAuthenticationConfiguration();
// to authenticate request while opening websocket connections
if (authorizerConf != null) {
if (authorizerConf.getEnableSecureSocketConnection()) {
socketAddressFilter = new SocketAddressFilter(authenticationConfiguration, authorizerConf);
}
authorizer =
Class.forName(authorizerConf.getClassName()).asSubclass(Authorizer.class).getConstructor().newInstance();
String filterClazzName = authorizerConf.getContainerRequestFilter();
@ -261,6 +278,29 @@ public class CatalogApplication extends Application<CatalogApplicationConfig> {
environment.getApplicationContext().setErrorHandler(eph);
}
private void intializeWebsockets(Environment environment) {
EngineIoServerOptions eioOptions = EngineIoServerOptions.newFromDefault();
eioOptions.setAllowedCorsOrigins(null);
WebSocketManager.WebSocketManagerBuilder.build(eioOptions);
environment.getApplicationContext().setContextPath("/");
if (socketAddressFilter != null)
environment
.getApplicationContext()
.addFilter(new FilterHolder(socketAddressFilter), "/api/v1/push/feed/*", EnumSet.of(DispatcherType.REQUEST));
environment.getApplicationContext().addServlet(new ServletHolder(new FeedServlet()), "/api/v1/push/feed/*");
// Upgrade connection to websocket from Http
try {
WebSocketUpgradeFilter webSocketUpgradeFilter =
WebSocketUpgradeFilter.configureContext(environment.getApplicationContext());
webSocketUpgradeFilter.addMapping(
new ServletPathSpec("/api/v1/push/feed/*"),
(servletUpgradeRequest, servletUpgradeResponse) ->
new JettyWebSocketHandler(WebSocketManager.getInstance().getEngineIoServer()));
} catch (ServletException ex) {
LOG.error("Websocket Upgrade Filter error : ", ex.getMessage());
}
}
public static void main(String[] args) throws Exception {
CatalogApplication catalogApplication = new CatalogApplication();
catalogApplication.run(args);

View File

@ -18,6 +18,7 @@ import static org.openmetadata.catalog.type.EventType.ENTITY_SOFT_DELETED;
import static org.openmetadata.catalog.type.EventType.ENTITY_UPDATED;
import static org.openmetadata.common.utils.CommonUtil.listOrEmpty;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@ -36,6 +37,7 @@ import org.openmetadata.catalog.entity.feed.Thread;
import org.openmetadata.catalog.jdbi3.CollectionDAO;
import org.openmetadata.catalog.jdbi3.FeedRepository;
import org.openmetadata.catalog.resources.feeds.MessageParser.EntityLink;
import org.openmetadata.catalog.socket.WebSocketManager;
import org.openmetadata.catalog.type.ChangeDescription;
import org.openmetadata.catalog.type.ChangeEvent;
import org.openmetadata.catalog.type.EntityReference;
@ -48,10 +50,12 @@ import org.openmetadata.catalog.util.RestUtil;
public class ChangeEventHandler implements EventHandler {
private CollectionDAO dao;
private FeedRepository feedDao;
private ObjectMapper mapper;
public void init(CatalogApplicationConfig config, Jdbi jdbi) {
this.dao = jdbi.onDemand(CollectionDAO.class);
this.feedDao = new FeedRepository(dao);
this.mapper = new ObjectMapper();
}
public Void process(ContainerRequestContext requestContext, ContainerResponseContext responseContext) {
@ -100,6 +104,8 @@ public class ChangeEventHandler implements EventHandler {
}
EntityLink about = EntityLink.parse(thread.getAbout());
feedDao.create(thread, entity.getId(), owner, about);
String json = mapper.writeValueAsString(thread);
WebSocketManager.getInstance().broadCastMessageToClients(json);
}
}
}

View File

@ -25,6 +25,7 @@ public class AuthorizerConfiguration {
@NotEmpty @Getter @Setter private Set<String> botPrincipals;
@NotEmpty @Getter @Setter private String principalDomain;
@NotEmpty @Getter @Setter private Boolean enforcePrincipalDomain;
@NotEmpty @Getter @Setter private Boolean enableSecureSocketConnection;
@Override
public String toString() {

View File

@ -56,7 +56,6 @@ public class JwtFilter implements ContainerRequestFilter {
public static final String AUTHORIZATION_HEADER = "Authorization";
public static final String TOKEN_PREFIX = "Bearer";
public static final String BOT_CLAIM = "isBot";
private List<String> jwtPrincipalClaims;
private JwkProvider jwkProvider;
private String principalDomain;
@ -104,10 +103,33 @@ public class JwtFilter implements ContainerRequestFilter {
String tokenFromHeader = extractToken(headers);
LOG.debug("Token from header:{}", tokenFromHeader);
DecodedJWT jwt = validateAndReturnDecodedJwtToken(tokenFromHeader);
Map<String, Claim> claims = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
claims.putAll(jwt.getClaims());
String userName = validateAndReturnUsername(claims);
// validate bot token
if (claims.containsKey(BOT_CLAIM) && claims.get(BOT_CLAIM).asBoolean()) {
validateBotToken(tokenFromHeader, userName);
}
// Setting Security Context
CatalogPrincipal catalogPrincipal = new CatalogPrincipal(userName);
String scheme = requestContext.getUriInfo().getRequestUri().getScheme();
CatalogSecurityContext catalogSecurityContext =
new CatalogSecurityContext(catalogPrincipal, scheme, SecurityContext.DIGEST_AUTH);
LOG.debug("SecurityContext {}", catalogSecurityContext);
requestContext.setSecurityContext(catalogSecurityContext);
}
@SneakyThrows
public DecodedJWT validateAndReturnDecodedJwtToken(String token) {
// Decode JWT Token
DecodedJWT jwt;
try {
jwt = JWT.decode(tokenFromHeader);
jwt = JWT.decode(token);
} catch (JWTDecodeException e) {
throw new AuthenticationException("Invalid token", e);
}
@ -127,10 +149,12 @@ public class JwtFilter implements ContainerRequestFilter {
} catch (RuntimeException runtimeException) {
throw new AuthenticationException("Invalid token");
}
return jwt;
}
@SneakyThrows
public String validateAndReturnUsername(Map<String, Claim> claims) {
// Get username from JWT token
Map<String, Claim> claims = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
claims.putAll(jwt.getClaims());
String jwtClaim =
jwtPrincipalClaims.stream()
.filter(claims::containsKey)
@ -141,6 +165,7 @@ public class JwtFilter implements ContainerRequestFilter {
() ->
new AuthenticationException(
"Invalid JWT token, none of the following claims are present " + jwtPrincipalClaims));
String userName;
String domain;
if (jwtClaim.contains("@")) {
@ -158,18 +183,7 @@ public class JwtFilter implements ContainerRequestFilter {
String.format("Not Authorized! Email does not match the principal domain %s", principalDomain));
}
}
// validate bot token
if (claims.containsKey(BOT_CLAIM) && claims.get(BOT_CLAIM).asBoolean()) {
validateBotToken(tokenFromHeader, userName);
}
// Setting Security Context
CatalogPrincipal catalogPrincipal = new CatalogPrincipal(userName);
String scheme = requestContext.getUriInfo().getRequestUri().getScheme();
CatalogSecurityContext catalogSecurityContext =
new CatalogSecurityContext(catalogPrincipal, scheme, SecurityContext.DIGEST_AUTH);
LOG.debug("SecurityContext {}", catalogSecurityContext);
requestContext.setSecurityContext(catalogSecurityContext);
return userName;
}
protected static String extractToken(MultivaluedMap<String, String> headers) {
@ -185,6 +199,18 @@ public class JwtFilter implements ContainerRequestFilter {
throw new AuthenticationException("Not Authorized! Token not present");
}
public static String extractToken(String tokenFromHeader) {
LOG.debug("Request Token:{}", tokenFromHeader);
if (Strings.isNullOrEmpty(tokenFromHeader)) {
throw new AuthenticationException("Not Authorized! Token not present");
}
// Extract the bearer token
if (tokenFromHeader.startsWith(TOKEN_PREFIX)) {
return tokenFromHeader.substring(TOKEN_PREFIX.length() + 1);
}
throw new AuthenticationException("Not Authorized! Token not present");
}
private void validateBotToken(String tokenFromHeader, String userName) throws IOException {
EntityRepository<User> userRepository = Entity.getEntityRepository(Entity.USER);
User user = userRepository.getByName(null, userName, new EntityUtil.Fields(List.of("authenticationMechanism")));

View File

@ -0,0 +1,40 @@
/*
* Copyright 2021 Collate
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.openmetadata.catalog.socket;
import java.io.IOException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletRequestWrapper;
import javax.servlet.http.HttpServletResponse;
@WebServlet("/api/v1/push/feed/*")
public class FeedServlet extends HttpServlet {
public FeedServlet() {}
@Override
protected void service(HttpServletRequest request, HttpServletResponse response) throws IOException {
WebSocketManager.getInstance()
.getEngineIoServer()
.handleRequest(
new HttpServletRequestWrapper(request) {
@Override
public boolean isAsyncSupported() {
return true;
}
},
response);
}
}

View File

@ -0,0 +1,60 @@
/*
* Copyright 2021 Collate
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.openmetadata.catalog.socket;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletRequestWrapper;
public class HeaderRequestWrapper extends HttpServletRequestWrapper {
public HeaderRequestWrapper(HttpServletRequest request) {
super(request);
}
private Map<String, String> headerMap = new HashMap<>();
public void addHeader(String name, String value) {
headerMap.put(name, value);
}
@Override
public String getHeader(String name) {
String headerValue = super.getHeader(name);
if (headerMap.containsKey(name)) {
headerValue = headerMap.get(name);
}
return headerValue;
}
@Override
public Enumeration<String> getHeaderNames() {
List<String> names = Collections.list(super.getHeaderNames());
names.addAll(headerMap.keySet());
return Collections.enumeration(names);
}
@Override
public Enumeration<String> getHeaders(String name) {
List<String> values = Collections.list(super.getHeaders(name));
if (headerMap.containsKey(name)) {
values.add(headerMap.get(name));
}
return Collections.enumeration(values);
}
}

View File

@ -0,0 +1,69 @@
/*
* Copyright 2021 Collate
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.openmetadata.catalog.socket;
import com.auth0.jwt.interfaces.Claim;
import com.auth0.jwt.interfaces.DecodedJWT;
import java.io.IOException;
import java.util.Map;
import java.util.TreeMap;
import javax.servlet.Filter;
import javax.servlet.FilterChain;
import javax.servlet.FilterConfig;
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import javax.servlet.http.HttpServletRequest;
import org.openmetadata.catalog.security.AuthenticationConfiguration;
import org.openmetadata.catalog.security.AuthorizerConfiguration;
import org.openmetadata.catalog.security.JwtFilter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SocketAddressFilter implements Filter {
private static final Logger LOG = LoggerFactory.getLogger(SocketAddressFilter.class);
private JwtFilter jwtFilter;
public SocketAddressFilter(
AuthenticationConfiguration authenticationConfiguration, AuthorizerConfiguration authorizerConf) {
jwtFilter = new JwtFilter(authenticationConfiguration, authorizerConf);
}
@Override
public void destroy() {}
@Override
public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
throws IOException, ServletException {
HttpServletRequest httpServletRequest = (HttpServletRequest) request;
String tokenWithType = httpServletRequest.getHeader("Authorization");
HeaderRequestWrapper requestWrapper = new HeaderRequestWrapper(httpServletRequest);
requestWrapper.addHeader("RemoteAddress", request.getRemoteAddr());
requestWrapper.addHeader("Authorization", tokenWithType);
String token = JwtFilter.extractToken(tokenWithType);
// validate token
DecodedJWT jwt = jwtFilter.validateAndReturnDecodedJwtToken(token);
// validate Domain and Username
Map<String, Claim> claims = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
claims.putAll(jwt.getClaims());
jwtFilter.validateAndReturnUsername(claims);
// Goes to default servlet.
chain.doFilter(requestWrapper, response);
}
@Override
public void init(FilterConfig filterConfig) throws ServletException {}
}

View File

@ -0,0 +1,71 @@
package org.openmetadata.catalog.socket;
import io.socket.engineio.server.EngineIoServer;
import io.socket.engineio.server.EngineIoServerOptions;
import io.socket.socketio.server.SocketIoNamespace;
import io.socket.socketio.server.SocketIoServer;
import io.socket.socketio.server.SocketIoSocket;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class WebSocketManager {
private static final Logger LOG = LoggerFactory.getLogger(WebSocketManager.class);
private static WebSocketManager INSTANCE;
private final EngineIoServer mEngineIoServer;
private final SocketIoServer mSocketIoServer;
private final String feedBroadcastChannel = "activityFeed";
private final Map<String, SocketIoSocket> activityFeedEndpoints = new ConcurrentHashMap<>();
private WebSocketManager(EngineIoServerOptions eiOptions) {
mEngineIoServer = new EngineIoServer(eiOptions);
mSocketIoServer = new SocketIoServer(mEngineIoServer);
intilizateHandlers();
}
private void intilizateHandlers() {
SocketIoNamespace ns = mSocketIoServer.namespace("/");
// On Connection
ns.on(
"connection",
args -> {
SocketIoSocket socket = (SocketIoSocket) args[0];
LOG.info(
"Client :"
+ socket.getId()
+ "with Remote Address :"
+ socket.getInitialHeaders().get("RemoteAddress")
+ "connected.");
activityFeedEndpoints.put(socket.getId(), socket);
});
}
public static WebSocketManager getInstance() {
return INSTANCE;
}
public SocketIoServer getSocketIoServer() {
return mSocketIoServer;
}
public EngineIoServer getEngineIoServer() {
return mEngineIoServer;
}
public Map<String, SocketIoSocket> getActivityFeedEndpoints() {
return activityFeedEndpoints;
}
public void broadCastMessageToClients(String message) {
for (Map.Entry<String, SocketIoSocket> endpoints : activityFeedEndpoints.entrySet()) {
endpoints.getValue().send(feedBroadcastChannel, message);
}
}
public static class WebSocketManagerBuilder {
public static void build(EngineIoServerOptions eiOptions) {
INSTANCE = new WebSocketManager(eiOptions);
}
}
}

View File

@ -129,6 +129,8 @@ authorizerConfiguration:
botPrincipals:
- "ingestion-bot"
principalDomain: "open-metadata.org"
enforcePrincipalDomain: false
enableSecureSocketConnection: false
authenticationConfiguration:
provider: "openID"

View File

@ -133,6 +133,7 @@ authorizerConfiguration:
botPrincipals: ${AUTHORIZER_INGESTION_PRINCIPALS:-[ingestion-bot]}
principalDomain: ${AUTHORIZER_PRINCIPAL_DOMAIN:-"openmetadata.org"}
enforcePrincipalDomain: ${AUTHORIZER_ENFORCE_PRINCIPAL_DOMAIN:-false}
enableSecureSocketConnection : ${AUTHORIZER_ENABLE_SECURE_SOCKET:-false}
authenticationConfiguration:
provider: ${AUTHENTICATION_PROVIDER:-no-auth}

View File

@ -65,6 +65,7 @@ services:
AUTHORIZER_INGESTION_PRINCIPALS: ${AUTHORIZER_INGESTION_PRINCIPALS:-[ingestion-bot]}
AUTHORIZER_PRINCIPAL_DOMAIN: ${AUTHORIZER_PRINCIPAL_DOMAIN:-""}
AUTHORIZER_ENFORCE_PRINCIPAL_DOMAIN: ${AUTHORIZER_ENFORCE_PRINCIPAL_DOMAIN:-false}
AUTHORIZER_ENABLE_SECURE_SOCKET: ${AUTHORIZER_ENABLE_SECURE_SOCKET:-false}
AUTHENTICATION_PROVIDER: ${AUTHENTICATION_PROVIDER:-no-auth}
CUSTOM_OIDC_AUTHENTICATION_PROVIDER_NAME: ${CUSTOM_OIDC_AUTHENTICATION_PROVIDER_NAME:-""}
AUTHENTICATION_PUBLIC_KEYS: ${AUTHENTICATION_PUBLIC_KEY:-[https://www.googleapis.com/oauth2/v3/certs]}

View File

@ -54,6 +54,7 @@ services:
AUTHORIZER_INGESTION_PRINCIPALS: ${AUTHORIZER_INGESTION_PRINCIPAL:-[ingestion-bot]}
AUTHORIZER_PRINCIPAL_DOMAIN: ${AUTHORIZER_PRINCIPAL_DOMAIN:-""}
AUTHORIZER_ENFORCE_PRINCIPAL_DOMAIN: ${AUTHORIZER_ENFORCE_PRINCIPAL_DOMAIN:-false}
AUTHORIZER_ENABLE_SECURE_SOCKET: ${AUTHORIZER_ENABLE_SECURE_SOCKET:-false}
AUTHENTICATION_PROVIDER: ${AUTHENTICATION_PROVIDER:-no-auth}
CUSTOM_OIDC_AUTHENTICATION_PROVIDER_NAME: ${CUSTOM_OIDC_AUTHENTICATION_PROVIDER_NAME:-""}
AUTHENTICATION_PUBLIC_KEYS: ${AUTHENTICATION_PUBLIC_KEY:-[https://www.googleapis.com/oauth2/v3/certs]}