2021-08-20 10:58:07 -07:00
|
|
|
package controllers;
|
2021-05-11 15:41:42 -07:00
|
|
|
|
2022-10-06 18:56:32 -07:00
|
|
|
import auth.Authenticator;
|
|
|
|
import client.AuthServiceClient;
|
2021-05-11 15:41:42 -07:00
|
|
|
import com.fasterxml.jackson.databind.JsonNode;
|
|
|
|
import com.typesafe.config.Config;
|
2022-10-06 18:56:32 -07:00
|
|
|
import java.util.Arrays;
|
|
|
|
import java.util.Collections;
|
|
|
|
import java.util.List;
|
|
|
|
import java.util.Optional;
|
|
|
|
import java.util.Properties;
|
|
|
|
import javax.annotation.Nonnull;
|
|
|
|
import javax.inject.Inject;
|
2021-05-27 16:37:57 -07:00
|
|
|
import org.apache.kafka.clients.CommonClientConfigs;
|
2021-05-11 15:41:42 -07:00
|
|
|
import org.apache.kafka.clients.producer.KafkaProducer;
|
|
|
|
import org.apache.kafka.clients.producer.ProducerConfig;
|
|
|
|
import org.apache.kafka.clients.producer.ProducerRecord;
|
2021-06-10 21:10:13 +02:00
|
|
|
import org.apache.kafka.common.config.SaslConfigs;
|
2021-05-27 16:37:57 -07:00
|
|
|
import org.apache.kafka.common.config.SslConfigs;
|
|
|
|
import org.apache.kafka.common.security.auth.SecurityProtocol;
|
2021-06-25 10:56:45 -07:00
|
|
|
import org.slf4j.Logger;
|
|
|
|
import org.slf4j.LoggerFactory;
|
2021-05-11 15:41:42 -07:00
|
|
|
import play.mvc.Controller;
|
2022-10-06 18:56:32 -07:00
|
|
|
import play.mvc.Http;
|
2021-05-11 15:41:42 -07:00
|
|
|
import play.mvc.Result;
|
|
|
|
import play.mvc.Security;
|
2021-07-08 15:11:40 -07:00
|
|
|
import utils.ConfigUtil;
|
|
|
|
|
2021-08-20 10:58:07 -07:00
|
|
|
import static auth.AuthUtils.*;
|
2021-05-11 15:41:42 -07:00
|
|
|
|
2021-08-20 10:58:07 -07:00
|
|
|
|
|
|
|
// TODO: Migrate this to metadata-service.
|
2021-05-11 15:41:42 -07:00
|
|
|
public class TrackingController extends Controller {
|
|
|
|
|
2021-06-25 10:56:45 -07:00
|
|
|
private final Logger _logger = LoggerFactory.getLogger(TrackingController.class.getName());
|
|
|
|
|
2021-06-10 21:10:13 +02:00
|
|
|
private static final List<String> KAFKA_SSL_PROTOCOLS = Collections.unmodifiableList(
|
2022-10-06 18:56:32 -07:00
|
|
|
Arrays.asList(SecurityProtocol.SSL.name(), SecurityProtocol.SASL_SSL.name(),
|
2021-08-05 22:08:50 -07:00
|
|
|
SecurityProtocol.SASL_PLAINTEXT.name()));
|
2021-06-10 21:10:13 +02:00
|
|
|
|
2021-05-11 15:41:42 -07:00
|
|
|
private final Boolean _isEnabled;
|
|
|
|
private final Config _config;
|
|
|
|
private final KafkaProducer<String, String> _producer;
|
|
|
|
private final String _topic;
|
|
|
|
|
2022-10-06 18:56:32 -07:00
|
|
|
@Inject
|
|
|
|
AuthServiceClient _authClient;
|
|
|
|
|
2021-05-11 15:41:42 -07:00
|
|
|
@Inject
|
|
|
|
public TrackingController(@Nonnull Config config) {
|
|
|
|
_config = config;
|
|
|
|
_isEnabled = !config.hasPath("analytics.enabled") || config.getBoolean("analytics.enabled");
|
|
|
|
if (_isEnabled) {
|
2021-06-25 10:56:45 -07:00
|
|
|
_logger.debug("Analytics tracking is enabled");
|
2021-05-11 15:41:42 -07:00
|
|
|
_producer = createKafkaProducer();
|
|
|
|
_topic = config.getString("analytics.tracking.topic");
|
|
|
|
} else {
|
|
|
|
_producer = null;
|
|
|
|
_topic = null;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
@Security.Authenticated(Authenticator.class)
|
|
|
|
@Nonnull
|
2022-10-06 18:56:32 -07:00
|
|
|
public Result track(Http.Request request) throws Exception {
|
2021-05-11 15:41:42 -07:00
|
|
|
if (!_isEnabled) {
|
|
|
|
// If tracking is disabled, simply return a 200.
|
|
|
|
return status(200);
|
|
|
|
}
|
|
|
|
|
|
|
|
JsonNode event;
|
|
|
|
try {
|
2022-10-06 18:56:32 -07:00
|
|
|
event = request.body().asJson();
|
2021-05-11 15:41:42 -07:00
|
|
|
} catch (Exception e) {
|
|
|
|
return badRequest();
|
|
|
|
}
|
2022-12-08 20:27:51 -06:00
|
|
|
final String actor = request.session().data().get(ACTOR);
|
2021-05-11 15:41:42 -07:00
|
|
|
try {
|
2021-06-25 10:56:45 -07:00
|
|
|
_logger.debug(String.format("Emitting product analytics event. actor: %s, event: %s", actor, event));
|
2021-05-11 15:41:42 -07:00
|
|
|
final ProducerRecord<String, String> record = new ProducerRecord<>(
|
2022-10-06 18:56:32 -07:00
|
|
|
_topic,
|
|
|
|
actor,
|
|
|
|
event.toString());
|
|
|
|
_producer.send(record);
|
|
|
|
_producer.flush();
|
|
|
|
_authClient.track(event.toString());
|
|
|
|
return ok();
|
2022-05-10 18:15:53 -05:00
|
|
|
} catch (Exception e) {
|
2021-06-25 10:56:45 -07:00
|
|
|
_logger.error(String.format("Failed to emit product analytics event. actor: %s, event: %s", actor, event));
|
2021-05-11 15:41:42 -07:00
|
|
|
return internalServerError(e.getMessage());
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
protected void finalize() {
|
|
|
|
_producer.close();
|
|
|
|
}
|
|
|
|
|
2021-07-08 15:11:40 -07:00
|
|
|
private void setConfig(Properties props, String key, String configKey) {
|
|
|
|
Optional.ofNullable(ConfigUtil.getString(_config, configKey, null))
|
|
|
|
.ifPresent(v -> props.put(key, v));
|
|
|
|
}
|
|
|
|
|
2021-05-11 15:41:42 -07:00
|
|
|
private KafkaProducer createKafkaProducer() {
|
|
|
|
final Properties props = new Properties();
|
|
|
|
props.put(ProducerConfig.CLIENT_ID_CONFIG, "datahub-frontend");
|
|
|
|
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, _config.getString("analytics.kafka.bootstrap.server"));
|
|
|
|
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); // Actor urn.
|
|
|
|
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); // JSON object.
|
2021-05-27 16:37:57 -07:00
|
|
|
|
|
|
|
final String securityProtocolConfig = "analytics.kafka.security.protocol";
|
|
|
|
if (_config.hasPath(securityProtocolConfig)
|
2021-06-10 21:10:13 +02:00
|
|
|
&& KAFKA_SSL_PROTOCOLS.contains(_config.getString(securityProtocolConfig))) {
|
2021-05-27 16:37:57 -07:00
|
|
|
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, _config.getString(securityProtocolConfig));
|
2021-07-08 15:11:40 -07:00
|
|
|
setConfig(props, SslConfigs.SSL_KEY_PASSWORD_CONFIG, "analytics.kafka.ssl.key.password");
|
2021-05-27 16:37:57 -07:00
|
|
|
|
2021-07-08 15:11:40 -07:00
|
|
|
setConfig(props, SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "analytics.kafka.ssl.keystore.type");
|
|
|
|
setConfig(props, SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "analytics.kafka.ssl.keystore.location");
|
|
|
|
setConfig(props, SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "analytics.kafka.ssl.keystore.password");
|
2021-05-27 16:37:57 -07:00
|
|
|
|
2021-07-08 15:11:40 -07:00
|
|
|
setConfig(props, SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "analytics.kafka.ssl.truststore.type");
|
|
|
|
setConfig(props, SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "analytics.kafka.ssl.truststore.location");
|
|
|
|
setConfig(props, SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "analytics.kafka.ssl.truststore.password");
|
2021-05-27 16:37:57 -07:00
|
|
|
|
2021-07-08 15:11:40 -07:00
|
|
|
setConfig(props, SslConfigs.SSL_PROTOCOL_CONFIG, "analytics.kafka.ssl.protocol");
|
|
|
|
setConfig(props, SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "analytics.kafka.ssl.endpoint.identification.algorithm");
|
2021-06-10 21:10:13 +02:00
|
|
|
|
2021-08-05 22:08:50 -07:00
|
|
|
final String securityProtocol = _config.getString(securityProtocolConfig);
|
|
|
|
if (securityProtocol.equals(SecurityProtocol.SASL_SSL.name())
|
|
|
|
|| securityProtocol.equals(SecurityProtocol.SASL_PLAINTEXT.name())) {
|
2021-07-08 15:11:40 -07:00
|
|
|
setConfig(props, SaslConfigs.SASL_MECHANISM, "analytics.kafka.sasl.mechanism");
|
|
|
|
setConfig(props, SaslConfigs.SASL_JAAS_CONFIG, "analytics.kafka.sasl.jaas.config");
|
2021-08-05 22:08:50 -07:00
|
|
|
setConfig(props, SaslConfigs.SASL_KERBEROS_SERVICE_NAME, "analytics.kafka.sasl.kerberos.service.name");
|
2021-09-15 18:08:54 -07:00
|
|
|
setConfig(props, SaslConfigs.SASL_LOGIN_CALLBACK_HANDLER_CLASS, "analytics.kafka.sasl.login.callback.handler.class");
|
2021-06-10 21:10:13 +02:00
|
|
|
}
|
2021-05-27 16:37:57 -07:00
|
|
|
}
|
|
|
|
|
2021-05-11 15:41:42 -07:00
|
|
|
return new KafkaProducer(props);
|
|
|
|
}
|
|
|
|
}
|