| 
									
										
										
										
											2021-08-20 10:58:07 -07:00
										 |  |  | package controllers;
 | 
					
						
							| 
									
										
										
										
											2021-05-11 15:41:42 -07:00
										 |  |  | 
 | 
					
						
							|  |  |  | import com.fasterxml.jackson.databind.JsonNode;
 | 
					
						
							|  |  |  | import com.typesafe.config.Config;
 | 
					
						
							| 
									
										
										
										
											2021-05-27 16:37:57 -07:00
										 |  |  | import org.apache.kafka.clients.CommonClientConfigs;
 | 
					
						
							| 
									
										
										
										
											2021-05-11 15:41:42 -07:00
										 |  |  | import org.apache.kafka.clients.producer.KafkaProducer;
 | 
					
						
							|  |  |  | import org.apache.kafka.clients.producer.ProducerConfig;
 | 
					
						
							|  |  |  | import org.apache.kafka.clients.producer.ProducerRecord;
 | 
					
						
							| 
									
										
										
										
											2021-06-10 21:10:13 +02:00
										 |  |  | import org.apache.kafka.common.config.SaslConfigs;
 | 
					
						
							| 
									
										
										
										
											2021-05-27 16:37:57 -07:00
										 |  |  | import org.apache.kafka.common.config.SslConfigs;
 | 
					
						
							|  |  |  | import org.apache.kafka.common.security.auth.SecurityProtocol;
 | 
					
						
							| 
									
										
										
										
											2021-06-25 10:56:45 -07:00
										 |  |  | import org.slf4j.Logger;
 | 
					
						
							|  |  |  | import org.slf4j.LoggerFactory;
 | 
					
						
							| 
									
										
										
										
											2021-08-20 10:58:07 -07:00
										 |  |  | import auth.Authenticator;
 | 
					
						
							| 
									
										
										
										
											2021-05-11 15:41:42 -07:00
										 |  |  | import javax.annotation.Nonnull;
 | 
					
						
							|  |  |  | import javax.inject.Inject;
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | import play.mvc.Controller;
 | 
					
						
							|  |  |  | import play.mvc.Result;
 | 
					
						
							|  |  |  | import play.mvc.Security;
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-06-10 21:10:13 +02:00
										 |  |  | import java.util.Arrays;
 | 
					
						
							|  |  |  | import java.util.Collections;
 | 
					
						
							|  |  |  | import java.util.List;
 | 
					
						
							| 
									
										
										
										
											2021-07-08 15:11:40 -07:00
										 |  |  | import java.util.Optional;
 | 
					
						
							| 
									
										
										
										
											2021-05-11 15:41:42 -07:00
										 |  |  | import java.util.Properties;
 | 
					
						
							| 
									
										
										
										
											2021-07-08 15:11:40 -07:00
										 |  |  | import utils.ConfigUtil;
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-08-20 10:58:07 -07:00
										 |  |  | import static auth.AuthUtils.*;
 | 
					
						
							| 
									
										
										
										
											2021-05-11 15:41:42 -07:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-08-20 10:58:07 -07:00
										 |  |  | 
 | 
					
						
							|  |  |  | // TODO: Migrate this to metadata-service.
 | 
					
						
							| 
									
										
										
										
											2021-05-11 15:41:42 -07:00
										 |  |  | public class TrackingController extends Controller {
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-06-25 10:56:45 -07:00
										 |  |  |     private final Logger _logger = LoggerFactory.getLogger(TrackingController.class.getName());
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-06-10 21:10:13 +02:00
										 |  |  |     private static final List<String> KAFKA_SSL_PROTOCOLS = Collections.unmodifiableList(
 | 
					
						
							| 
									
										
										
										
											2021-08-05 22:08:50 -07:00
										 |  |  |             Arrays.asList(SecurityProtocol.SSL.name(),SecurityProtocol.SASL_SSL.name(),
 | 
					
						
							|  |  |  |             SecurityProtocol.SASL_PLAINTEXT.name()));
 | 
					
						
							| 
									
										
										
										
											2021-06-10 21:10:13 +02:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-05-11 15:41:42 -07:00
										 |  |  |     private final Boolean _isEnabled;
 | 
					
						
							|  |  |  |     private final Config _config;
 | 
					
						
							|  |  |  |     private final KafkaProducer<String, String> _producer;
 | 
					
						
							|  |  |  |     private final String _topic;
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     @Inject
 | 
					
						
							|  |  |  |     public TrackingController(@Nonnull Config config) {
 | 
					
						
							|  |  |  |         _config = config;
 | 
					
						
							|  |  |  |         _isEnabled = !config.hasPath("analytics.enabled") || config.getBoolean("analytics.enabled");
 | 
					
						
							|  |  |  |         if (_isEnabled) {
 | 
					
						
							| 
									
										
										
										
											2021-06-25 10:56:45 -07:00
										 |  |  |             _logger.debug("Analytics tracking is enabled");
 | 
					
						
							| 
									
										
										
										
											2021-05-11 15:41:42 -07:00
										 |  |  |             _producer = createKafkaProducer();
 | 
					
						
							|  |  |  |             _topic = config.getString("analytics.tracking.topic");
 | 
					
						
							|  |  |  |         } else {
 | 
					
						
							|  |  |  |             _producer = null;
 | 
					
						
							|  |  |  |             _topic = null;
 | 
					
						
							|  |  |  |         }
 | 
					
						
							|  |  |  |     }
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     @Security.Authenticated(Authenticator.class)
 | 
					
						
							|  |  |  |     @Nonnull
 | 
					
						
							|  |  |  |     public Result track() throws Exception {
 | 
					
						
							|  |  |  |         if (!_isEnabled) {
 | 
					
						
							|  |  |  |             // If tracking is disabled, simply return a 200.
 | 
					
						
							|  |  |  |             return status(200);
 | 
					
						
							|  |  |  |         }
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         JsonNode event;
 | 
					
						
							|  |  |  |         try {
 | 
					
						
							|  |  |  |             event = request().body().asJson();
 | 
					
						
							|  |  |  |         } catch (Exception e) {
 | 
					
						
							|  |  |  |             return badRequest();
 | 
					
						
							|  |  |  |         }
 | 
					
						
							| 
									
										
										
										
											2021-08-20 10:58:07 -07:00
										 |  |  |         final String actor = ctx().session().get(ACTOR);
 | 
					
						
							| 
									
										
										
										
											2021-05-11 15:41:42 -07:00
										 |  |  |         try {
 | 
					
						
							| 
									
										
										
										
											2021-06-25 10:56:45 -07:00
										 |  |  |             _logger.debug(String.format("Emitting product analytics event. actor: %s, event: %s", actor, event));
 | 
					
						
							| 
									
										
										
										
											2021-05-11 15:41:42 -07:00
										 |  |  |             final ProducerRecord<String, String> record = new ProducerRecord<>(
 | 
					
						
							|  |  |  |                     _topic,
 | 
					
						
							|  |  |  |                     actor,
 | 
					
						
							|  |  |  |                     event.toString());
 | 
					
						
							|  |  |  |              _producer.send(record);
 | 
					
						
							|  |  |  |              _producer.flush();
 | 
					
						
							|  |  |  |              return ok();
 | 
					
						
							|  |  |  |         } catch(Exception e) {
 | 
					
						
							| 
									
										
										
										
											2021-06-25 10:56:45 -07:00
										 |  |  |             _logger.error(String.format("Failed to emit product analytics event. actor: %s, event: %s", actor, event));
 | 
					
						
							| 
									
										
										
										
											2021-05-11 15:41:42 -07:00
										 |  |  |             return internalServerError(e.getMessage());
 | 
					
						
							|  |  |  |         }
 | 
					
						
							|  |  |  |     }
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     @Override
 | 
					
						
							|  |  |  |     protected void finalize() {
 | 
					
						
							|  |  |  |         _producer.close();
 | 
					
						
							|  |  |  |     }
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-07-08 15:11:40 -07:00
										 |  |  |     private void setConfig(Properties props, String key, String configKey) {
 | 
					
						
							|  |  |  |         Optional.ofNullable(ConfigUtil.getString(_config, configKey, null))
 | 
					
						
							|  |  |  |             .ifPresent(v -> props.put(key, v));
 | 
					
						
							|  |  |  |     }
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-05-11 15:41:42 -07:00
										 |  |  |     private KafkaProducer createKafkaProducer() {
 | 
					
						
							|  |  |  |         final Properties props = new Properties();
 | 
					
						
							|  |  |  |         props.put(ProducerConfig.CLIENT_ID_CONFIG, "datahub-frontend");
 | 
					
						
							|  |  |  |         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, _config.getString("analytics.kafka.bootstrap.server"));
 | 
					
						
							|  |  |  |         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); // Actor urn.
 | 
					
						
							|  |  |  |         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); // JSON object.
 | 
					
						
							| 
									
										
										
										
											2021-05-27 16:37:57 -07:00
										 |  |  | 
 | 
					
						
							|  |  |  |         final String securityProtocolConfig = "analytics.kafka.security.protocol";
 | 
					
						
							|  |  |  |         if (_config.hasPath(securityProtocolConfig)
 | 
					
						
							| 
									
										
										
										
											2021-06-10 21:10:13 +02:00
										 |  |  |                 && KAFKA_SSL_PROTOCOLS.contains(_config.getString(securityProtocolConfig))) {
 | 
					
						
							| 
									
										
										
										
											2021-05-27 16:37:57 -07:00
										 |  |  |             props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, _config.getString(securityProtocolConfig));
 | 
					
						
							| 
									
										
										
										
											2021-07-08 15:11:40 -07:00
										 |  |  |             setConfig(props, SslConfigs.SSL_KEY_PASSWORD_CONFIG, "analytics.kafka.ssl.key.password");
 | 
					
						
							| 
									
										
										
										
											2021-05-27 16:37:57 -07:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-07-08 15:11:40 -07:00
										 |  |  |             setConfig(props, SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "analytics.kafka.ssl.keystore.type");
 | 
					
						
							|  |  |  |             setConfig(props, SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "analytics.kafka.ssl.keystore.location");
 | 
					
						
							|  |  |  |             setConfig(props, SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "analytics.kafka.ssl.keystore.password");
 | 
					
						
							| 
									
										
										
										
											2021-05-27 16:37:57 -07:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-07-08 15:11:40 -07:00
										 |  |  |             setConfig(props, SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "analytics.kafka.ssl.truststore.type");
 | 
					
						
							|  |  |  |             setConfig(props, SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "analytics.kafka.ssl.truststore.location");
 | 
					
						
							|  |  |  |             setConfig(props, SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "analytics.kafka.ssl.truststore.password");
 | 
					
						
							| 
									
										
										
										
											2021-05-27 16:37:57 -07:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-07-08 15:11:40 -07:00
										 |  |  |             setConfig(props, SslConfigs.SSL_PROTOCOL_CONFIG, "analytics.kafka.ssl.protocol");
 | 
					
						
							|  |  |  |             setConfig(props, SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "analytics.kafka.ssl.endpoint.identification.algorithm");
 | 
					
						
							| 
									
										
										
										
											2021-06-10 21:10:13 +02:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-08-05 22:08:50 -07:00
										 |  |  |             final String securityProtocol = _config.getString(securityProtocolConfig);
 | 
					
						
							|  |  |  |             if (securityProtocol.equals(SecurityProtocol.SASL_SSL.name())
 | 
					
						
							|  |  |  |                     || securityProtocol.equals(SecurityProtocol.SASL_PLAINTEXT.name())) {
 | 
					
						
							| 
									
										
										
										
											2021-07-08 15:11:40 -07:00
										 |  |  |                 setConfig(props, SaslConfigs.SASL_MECHANISM, "analytics.kafka.sasl.mechanism");
 | 
					
						
							|  |  |  |                 setConfig(props, SaslConfigs.SASL_JAAS_CONFIG, "analytics.kafka.sasl.jaas.config");
 | 
					
						
							| 
									
										
										
										
											2021-08-05 22:08:50 -07:00
										 |  |  |                 setConfig(props, SaslConfigs.SASL_KERBEROS_SERVICE_NAME, "analytics.kafka.sasl.kerberos.service.name");
 | 
					
						
							| 
									
										
										
										
											2021-09-15 18:08:54 -07:00
										 |  |  |                 setConfig(props, SaslConfigs.SASL_LOGIN_CALLBACK_HANDLER_CLASS, "analytics.kafka.sasl.login.callback.handler.class");
 | 
					
						
							| 
									
										
										
										
											2021-06-10 21:10:13 +02:00
										 |  |  |             }
 | 
					
						
							| 
									
										
										
										
											2021-05-27 16:37:57 -07:00
										 |  |  |         }
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-05-11 15:41:42 -07:00
										 |  |  |         return new KafkaProducer(props);
 | 
					
						
							|  |  |  |     }
 | 
					
						
							|  |  |  | }
 |