mirror of
				https://github.com/datahub-project/datahub.git
				synced 2025-11-04 12:51:23 +00:00 
			
		
		
		
	fix(): DUE Producer Configuration & tracking message validation (#13427)
Co-authored-by: Pedro Silva <pedro@acryl.io>
This commit is contained in:
		
							parent
							
								
									294ad23500
								
							
						
					
					
						commit
						131de0f026
					
				@ -61,7 +61,8 @@ public class TrackingController extends Controller {
 | 
			
		||||
    } catch (Exception e) {
 | 
			
		||||
      logger.error(
 | 
			
		||||
          String.format(
 | 
			
		||||
              "Failed to emit product analytics event. actor: %s, event: %s", actor, event));
 | 
			
		||||
              "Failed to emit product analytics event. actor: %s, event: %s", actor, event),
 | 
			
		||||
          e);
 | 
			
		||||
      return internalServerError(e.getMessage());
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
@ -443,9 +443,11 @@ public class AuthServiceController {
 | 
			
		||||
      log.error("Failed to parse json while attempting to track analytics event", e);
 | 
			
		||||
      return CompletableFuture.completedFuture(new ResponseEntity<>(HttpStatus.BAD_REQUEST));
 | 
			
		||||
    }
 | 
			
		||||
    if (bodyJson == null) {
 | 
			
		||||
    if (!bodyJson.has("type")) {
 | 
			
		||||
      log.warn("Invalid tracking request: missing `type` field");
 | 
			
		||||
      return CompletableFuture.completedFuture(new ResponseEntity<>(HttpStatus.BAD_REQUEST));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    return CompletableFuture.supplyAsync(
 | 
			
		||||
        () -> {
 | 
			
		||||
          try {
 | 
			
		||||
 | 
			
		||||
@ -7,7 +7,9 @@ import static org.mockito.ArgumentMatchers.any;
 | 
			
		||||
import static org.mockito.ArgumentMatchers.anyLong;
 | 
			
		||||
import static org.mockito.ArgumentMatchers.anyString;
 | 
			
		||||
import static org.mockito.ArgumentMatchers.eq;
 | 
			
		||||
import static org.mockito.Mockito.doThrow;
 | 
			
		||||
import static org.mockito.Mockito.mock;
 | 
			
		||||
import static org.mockito.Mockito.never;
 | 
			
		||||
import static org.mockito.Mockito.verify;
 | 
			
		||||
import static org.mockito.Mockito.when;
 | 
			
		||||
import static org.testng.Assert.*;
 | 
			
		||||
@ -21,6 +23,7 @@ import com.datahub.authentication.invite.InviteTokenService;
 | 
			
		||||
import com.datahub.authentication.token.StatelessTokenService;
 | 
			
		||||
import com.datahub.authentication.token.TokenType;
 | 
			
		||||
import com.datahub.authentication.user.NativeUserService;
 | 
			
		||||
import com.datahub.telemetry.TrackingService;
 | 
			
		||||
import com.fasterxml.jackson.databind.JsonNode;
 | 
			
		||||
import com.fasterxml.jackson.databind.ObjectMapper;
 | 
			
		||||
import com.fasterxml.jackson.databind.node.ObjectNode;
 | 
			
		||||
@ -74,6 +77,7 @@ public class AuthServiceControllerTest extends AbstractTestNGSpringContextTests
 | 
			
		||||
  @Autowired private Tracer mockTracer;
 | 
			
		||||
  @Autowired private SpanContext mockSpanContext;
 | 
			
		||||
  @Autowired private ObjectMapper objectMapper;
 | 
			
		||||
  @Autowired private TrackingService mockTrackingService;
 | 
			
		||||
 | 
			
		||||
  private final String PREFERRED_JWS_ALGORITHM = "preferredJwsAlgorithm";
 | 
			
		||||
 | 
			
		||||
@ -518,4 +522,136 @@ public class AuthServiceControllerTest extends AbstractTestNGSpringContextTests
 | 
			
		||||
    assertTrue(jsonNode.get("extractJwtAccessTokenClaims").asBoolean());
 | 
			
		||||
    assertEquals("RS256", jsonNode.get("preferredJwsAlgorithm").asText());
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  @Test
 | 
			
		||||
  public void testTrackSuccess() throws Exception {
 | 
			
		||||
    // Setup
 | 
			
		||||
    String eventPayload = "{\"type\":\"page_view\",\"properties\":{\"page\":\"dashboard\"}}";
 | 
			
		||||
    HttpEntity<String> httpEntity = new HttpEntity<>(eventPayload);
 | 
			
		||||
 | 
			
		||||
    // Mock tracking service (already @Autowired in the test class)
 | 
			
		||||
    // No need to configure behavior as the method doesn't return anything
 | 
			
		||||
 | 
			
		||||
    // Execute
 | 
			
		||||
    ResponseEntity<String> response = authServiceController.track(httpEntity).join();
 | 
			
		||||
 | 
			
		||||
    // Verify
 | 
			
		||||
    assertEquals(HttpStatus.OK, response.getStatusCode());
 | 
			
		||||
 | 
			
		||||
    // Verify tracking service was called with correct parameters
 | 
			
		||||
    ArgumentCaptor<JsonNode> jsonCaptor = ArgumentCaptor.forClass(JsonNode.class);
 | 
			
		||||
    verify(mockTrackingService)
 | 
			
		||||
        .emitAnalyticsEvent(eq(systemOperationContext), jsonCaptor.capture());
 | 
			
		||||
 | 
			
		||||
    JsonNode capturedJson = jsonCaptor.getValue();
 | 
			
		||||
    assertTrue(capturedJson.has("type"));
 | 
			
		||||
    assertEquals("page_view", capturedJson.get("type").asText());
 | 
			
		||||
    assertTrue(capturedJson.has("properties"));
 | 
			
		||||
    assertTrue(capturedJson.get("properties").has("page"));
 | 
			
		||||
    assertEquals("dashboard", capturedJson.get("properties").get("page").asText());
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  @Test
 | 
			
		||||
  public void testTrackBadRequest() throws Exception {
 | 
			
		||||
    // Setup - invalid JSON
 | 
			
		||||
    String invalidJson = "{malformed json";
 | 
			
		||||
    HttpEntity<String> httpEntity = new HttpEntity<>(invalidJson);
 | 
			
		||||
 | 
			
		||||
    // Execute
 | 
			
		||||
    ResponseEntity<String> response = authServiceController.track(httpEntity).join();
 | 
			
		||||
 | 
			
		||||
    // Verify
 | 
			
		||||
    assertEquals(HttpStatus.BAD_REQUEST, response.getStatusCode());
 | 
			
		||||
 | 
			
		||||
    // Verify tracking service was not called
 | 
			
		||||
    verify(mockTrackingService, never()).emitAnalyticsEvent(any(), any());
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  @Test
 | 
			
		||||
  public void testTrackMissingRequiredFields() throws Exception {
 | 
			
		||||
    // Setup - missing type fields
 | 
			
		||||
    String missingFieldsJson = "{\"user\":\"testUser\"}";
 | 
			
		||||
    HttpEntity<String> httpEntity = new HttpEntity<>(missingFieldsJson);
 | 
			
		||||
 | 
			
		||||
    // Execute
 | 
			
		||||
    ResponseEntity<String> response = authServiceController.track(httpEntity).join();
 | 
			
		||||
 | 
			
		||||
    // Verify
 | 
			
		||||
    assertEquals(HttpStatus.BAD_REQUEST, response.getStatusCode());
 | 
			
		||||
 | 
			
		||||
    // Verify tracking service was not called
 | 
			
		||||
    verify(mockTrackingService, never()).emitAnalyticsEvent(any(), any());
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  @Test
 | 
			
		||||
  public void testTrackServiceException() throws Exception {
 | 
			
		||||
    // Setup
 | 
			
		||||
    String eventPayload = "{\"type\":\"error_event\"}";
 | 
			
		||||
    HttpEntity<String> httpEntity = new HttpEntity<>(eventPayload);
 | 
			
		||||
 | 
			
		||||
    // Mock tracking service to throw exception
 | 
			
		||||
    doThrow(new RuntimeException("Test exception"))
 | 
			
		||||
        .when(mockTrackingService)
 | 
			
		||||
        .emitAnalyticsEvent(eq(systemOperationContext), any(JsonNode.class));
 | 
			
		||||
 | 
			
		||||
    // Execute
 | 
			
		||||
    ResponseEntity<String> response = authServiceController.track(httpEntity).join();
 | 
			
		||||
 | 
			
		||||
    // Verify
 | 
			
		||||
    assertEquals(HttpStatus.INTERNAL_SERVER_ERROR, response.getStatusCode());
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  @Test
 | 
			
		||||
  public void testTrackWithComplexPageViewEventPayload() throws Exception {
 | 
			
		||||
    // Setup
 | 
			
		||||
    String complexPayload =
 | 
			
		||||
        "{\n"
 | 
			
		||||
            + "  \"title\" : \"DataHub\",\n"
 | 
			
		||||
            + "  \"url\" : \"http://localhost:9002/\",\n"
 | 
			
		||||
            + "  \"path\" : \"/\",\n"
 | 
			
		||||
            + "  \"hash\" : \"\",\n"
 | 
			
		||||
            + "  \"search\" : \"\",\n"
 | 
			
		||||
            + "  \"width\" : 1785,\n"
 | 
			
		||||
            + "  \"height\" : 857,\n"
 | 
			
		||||
            + "  \"referrer\" : \"http://localhost:9002/\",\n"
 | 
			
		||||
            + "  \"prevPathname\" : \"http://localhost:9002/\",\n"
 | 
			
		||||
            + "  \"type\" : \"PageViewEvent\",\n"
 | 
			
		||||
            + "  \"actorUrn\" : \"urn:li:corpuser:datahub\",\n"
 | 
			
		||||
            + "  \"timestamp\" : 1746475429127,\n"
 | 
			
		||||
            + "  \"date\" : \"Mon May 05 2025 15:03:49 GMT-0500 (Central Daylight Time)\",\n"
 | 
			
		||||
            + "  \"userAgent\" : \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/135.0.0.0 Safari/537.36\",\n"
 | 
			
		||||
            + "  \"browserId\" : \"xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx\",\n"
 | 
			
		||||
            + "  \"origin\" : \"http://localhost:9002\",\n"
 | 
			
		||||
            + "  \"isThemeV2Enabled\" : true,\n"
 | 
			
		||||
            + "  \"userPersona\" : \"urn:li:dataHubPersona:businessUser\",\n"
 | 
			
		||||
            + "  \"serverVersion\" : \"v1.1.0\"\n"
 | 
			
		||||
            + "}";
 | 
			
		||||
 | 
			
		||||
    HttpEntity<String> httpEntity = new HttpEntity<>(complexPayload);
 | 
			
		||||
 | 
			
		||||
    // Execute
 | 
			
		||||
    ResponseEntity<String> response = authServiceController.track(httpEntity).join();
 | 
			
		||||
 | 
			
		||||
    // Verify response status
 | 
			
		||||
    assertEquals(HttpStatus.OK, response.getStatusCode());
 | 
			
		||||
 | 
			
		||||
    // Verify tracking service was called with correct parameters
 | 
			
		||||
    ArgumentCaptor<JsonNode> jsonCaptor = ArgumentCaptor.forClass(JsonNode.class);
 | 
			
		||||
    verify(mockTrackingService)
 | 
			
		||||
        .emitAnalyticsEvent(eq(systemOperationContext), jsonCaptor.capture());
 | 
			
		||||
 | 
			
		||||
    // Verify the complex JSON structure was correctly parsed and passed to the tracking service
 | 
			
		||||
    JsonNode capturedJson = jsonCaptor.getValue();
 | 
			
		||||
 | 
			
		||||
    // Verify key fields from the complex payload
 | 
			
		||||
    assertEquals("DataHub", capturedJson.get("title").asText());
 | 
			
		||||
    assertEquals("http://localhost:9002/", capturedJson.get("url").asText());
 | 
			
		||||
    assertEquals("PageViewEvent", capturedJson.get("type").asText());
 | 
			
		||||
    assertEquals("urn:li:corpuser:datahub", capturedJson.get("actorUrn").asText());
 | 
			
		||||
    assertEquals(1746475429127L, capturedJson.get("timestamp").asLong());
 | 
			
		||||
    assertEquals("xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx", capturedJson.get("browserId").asText());
 | 
			
		||||
    assertEquals("urn:li:dataHubPersona:businessUser", capturedJson.get("userPersona").asText());
 | 
			
		||||
    assertEquals("v1.1.0", capturedJson.get("serverVersion").asText());
 | 
			
		||||
    assertTrue(capturedJson.get("isThemeV2Enabled").asBoolean());
 | 
			
		||||
  }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -5,7 +5,6 @@ import com.linkedin.metadata.config.kafka.KafkaConfiguration;
 | 
			
		||||
import com.linkedin.metadata.config.kafka.ProducerConfiguration;
 | 
			
		||||
import java.util.Arrays;
 | 
			
		||||
import java.util.Map;
 | 
			
		||||
import java.util.Properties;
 | 
			
		||||
import org.apache.avro.generic.IndexedRecord;
 | 
			
		||||
import org.apache.kafka.clients.producer.KafkaProducer;
 | 
			
		||||
import org.apache.kafka.clients.producer.Producer;
 | 
			
		||||
@ -39,11 +38,16 @@ public class DataHubKafkaProducerFactory {
 | 
			
		||||
   */
 | 
			
		||||
  @Bean(name = "dataHubUsageProducer")
 | 
			
		||||
  protected Producer<String, String> createDUEProducer(
 | 
			
		||||
      @Qualifier("configurationProvider") ConfigurationProvider provider) {
 | 
			
		||||
      @Qualifier("configurationProvider") ConfigurationProvider provider,
 | 
			
		||||
      KafkaProperties properties) {
 | 
			
		||||
 | 
			
		||||
    KafkaConfiguration kafkaConfiguration = provider.getKafka();
 | 
			
		||||
    final ProducerConfiguration producerConfiguration = kafkaConfiguration.getProducer();
 | 
			
		||||
    final Properties props = new Properties();
 | 
			
		||||
 | 
			
		||||
    // Initialize with Spring Kafka production configuration
 | 
			
		||||
    Map<String, Object> props = properties.buildProducerProperties(null);
 | 
			
		||||
 | 
			
		||||
    // Apply DUE specifics
 | 
			
		||||
    props.put(ProducerConfig.CLIENT_ID_CONFIG, "datahub-analytics");
 | 
			
		||||
    props.put(
 | 
			
		||||
        ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG,
 | 
			
		||||
 | 
			
		||||
@ -0,0 +1,46 @@
 | 
			
		||||
package com.linkedin.gms.factory.kafka;
 | 
			
		||||
 | 
			
		||||
import static org.testng.Assert.assertEquals;
 | 
			
		||||
import static org.testng.Assert.assertNotNull;
 | 
			
		||||
 | 
			
		||||
import com.linkedin.gms.factory.config.ConfigurationProvider;
 | 
			
		||||
import com.linkedin.gms.factory.kafka.schemaregistry.KafkaSchemaRegistryFactory;
 | 
			
		||||
import java.lang.reflect.Field;
 | 
			
		||||
import org.apache.kafka.clients.producer.KafkaProducer;
 | 
			
		||||
import org.apache.kafka.clients.producer.Producer;
 | 
			
		||||
import org.apache.kafka.clients.producer.ProducerConfig;
 | 
			
		||||
import org.springframework.beans.factory.annotation.Autowired;
 | 
			
		||||
import org.springframework.beans.factory.annotation.Qualifier;
 | 
			
		||||
import org.springframework.boot.test.context.SpringBootTest;
 | 
			
		||||
import org.springframework.test.context.testng.AbstractTestNGSpringContextTests;
 | 
			
		||||
import org.testng.annotations.Test;
 | 
			
		||||
 | 
			
		||||
@SpringBootTest(
 | 
			
		||||
    properties = {
 | 
			
		||||
      "kafka.schemaRegistry.type=KAFKA",
 | 
			
		||||
      "spring.kafka.properties.security.protocol=SSL"
 | 
			
		||||
    },
 | 
			
		||||
    classes = {
 | 
			
		||||
      DataHubKafkaProducerFactory.class,
 | 
			
		||||
      KafkaSchemaRegistryFactory.class,
 | 
			
		||||
      ConfigurationProvider.class
 | 
			
		||||
    })
 | 
			
		||||
public class DataHubKafkaProducerFactoryTest extends AbstractTestNGSpringContextTests {
 | 
			
		||||
  @Autowired
 | 
			
		||||
  @Qualifier("dataHubUsageProducer")
 | 
			
		||||
  Producer<String, String> dataHubUsageProducer;
 | 
			
		||||
 | 
			
		||||
  @Test
 | 
			
		||||
  void testInitialization() throws NoSuchFieldException, IllegalAccessException {
 | 
			
		||||
    assertNotNull(dataHubUsageProducer);
 | 
			
		||||
 | 
			
		||||
    // Use reflection to access the internal properties of the KafkaProducer
 | 
			
		||||
    Field producerConfigField = KafkaProducer.class.getDeclaredField("producerConfig");
 | 
			
		||||
    producerConfigField.setAccessible(true);
 | 
			
		||||
    ProducerConfig producerConfig = (ProducerConfig) producerConfigField.get(dataHubUsageProducer);
 | 
			
		||||
 | 
			
		||||
    // Use the ProducerConfig.get() method to access specific properties
 | 
			
		||||
    String securityProtocol = producerConfig.getString("security.protocol");
 | 
			
		||||
    assertEquals("SSL", securityProtocol, "SSL security protocol should be set");
 | 
			
		||||
  }
 | 
			
		||||
}
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user