mirror of
				https://github.com/datahub-project/datahub.git
				synced 2025-10-25 16:05:11 +00:00 
			
		
		
		
	 ecc01b9a46
			
		
	
	
		ecc01b9a46
		
			
		
	
	
	
	
		
			
			* fix(security): commons-text in frontend * refactor(restli): set threads based on cpu cores feat(mce-consumers): hit local restli endpoint * testing docker build * Add retry configuration options for entity client * Kafka debugging * fix(kafka-setup): parallelize topic creation * Adjust docker build * Docker build updates * WIP * fix(lint): metadata-ingestion lint * fix(gradle-docker): fix docker frontend dep * fix(elastic): fix race condition between gms and mae for index creation * Revert "fix(elastic): fix race condition between gms and mae for index creation" This reverts commit 9629d12c3bdb3c0dab87604d409ca4c642c9c6d3. * fix(test): fix datahub frontend test for clean/test cycle * fix(test): datahub-frontend missing assets in test * fix(security): set protobuf lib datahub-upgrade & mce/mae-consumer * gitingore update * fix(docker): remove platform on docker base image, set by buildx * refactor(kafka-producer): update kafka producer tracking/logging * updates per PR feedback * Add documentation around mce standalone consumer Kafka consumer concurrency to follow thread count for restli & sql connection pool Co-authored-by: leifker <dleifker@gmail.com> Co-authored-by: Pedro Silva <pedro@acryl.io>
		
			
				
	
	
		
			73 lines
		
	
	
		
			2.1 KiB
		
	
	
	
		
			Java
		
	
	
	
	
	
			
		
		
	
	
			73 lines
		
	
	
		
			2.1 KiB
		
	
	
	
		
			Java
		
	
	
	
	
	
| package controllers;
 | |
| 
 | |
| import auth.Authenticator;
 | |
| import client.AuthServiceClient;
 | |
| import com.fasterxml.jackson.databind.JsonNode;
 | |
| import com.typesafe.config.Config;
 | |
| import javax.annotation.Nonnull;
 | |
| import javax.inject.Inject;
 | |
| import javax.inject.Singleton;
 | |
| 
 | |
| 
 | |
| import org.apache.kafka.clients.producer.ProducerRecord;
 | |
| import org.slf4j.Logger;
 | |
| import org.slf4j.LoggerFactory;
 | |
| import play.mvc.Controller;
 | |
| import play.mvc.Http;
 | |
| import play.mvc.Result;
 | |
| import play.mvc.Security;
 | |
| import client.KafkaTrackingProducer;
 | |
| 
 | |
| import static auth.AuthUtils.ACTOR;
 | |
| 
 | |
| 
 | |
| // TODO: Migrate this to metadata-service.
 | |
| @Singleton
 | |
| public class TrackingController extends Controller {
 | |
| 
 | |
|     private final Logger _logger = LoggerFactory.getLogger(TrackingController.class.getName());
 | |
| 
 | |
|     private final String _topic;
 | |
| 
 | |
|     @Inject
 | |
|     KafkaTrackingProducer _producer;
 | |
| 
 | |
|     @Inject
 | |
|     AuthServiceClient _authClient;
 | |
| 
 | |
|     @Inject
 | |
|     public TrackingController(@Nonnull Config config) {
 | |
|         _topic = config.getString("analytics.tracking.topic");
 | |
|     }
 | |
| 
 | |
|     @Security.Authenticated(Authenticator.class)
 | |
|     @Nonnull
 | |
|     public Result track(Http.Request request) throws Exception {
 | |
|         if (!_producer.isEnabled()) {
 | |
|             // If tracking is disabled, simply return a 200.
 | |
|             return status(200);
 | |
|         }
 | |
| 
 | |
|         JsonNode event;
 | |
|         try {
 | |
|             event = request.body().asJson();
 | |
|         } catch (Exception e) {
 | |
|             return badRequest();
 | |
|         }
 | |
|         final String actor = request.session().data().get(ACTOR);
 | |
|         try {
 | |
|             _logger.debug(String.format("Emitting product analytics event. actor: %s, event: %s", actor, event));
 | |
|             final ProducerRecord<String, String> record = new ProducerRecord<>(
 | |
|                 _topic,
 | |
|                 actor,
 | |
|                 event.toString());
 | |
|             _producer.send(record);
 | |
|             _authClient.track(event.toString());
 | |
|             return ok();
 | |
|         } catch (Exception e) {
 | |
|             _logger.error(String.format("Failed to emit product analytics event. actor: %s, event: %s", actor, event));
 | |
|             return internalServerError(e.getMessage());
 | |
|         }
 | |
|     }
 | |
| }
 |