diff --git a/datahub-frontend/app/controllers/TrackingController.java b/datahub-frontend/app/controllers/TrackingController.java index 5d12c96ed7..2f538c5787 100644 --- a/datahub-frontend/app/controllers/TrackingController.java +++ b/datahub-frontend/app/controllers/TrackingController.java @@ -61,7 +61,8 @@ public class TrackingController extends Controller { } catch (Exception e) { logger.error( String.format( - "Failed to emit product analytics event. actor: %s, event: %s", actor, event)); + "Failed to emit product analytics event. actor: %s, event: %s", actor, event), + e); return internalServerError(e.getMessage()); } } diff --git a/metadata-service/auth-servlet-impl/src/main/java/com/datahub/auth/authentication/AuthServiceController.java b/metadata-service/auth-servlet-impl/src/main/java/com/datahub/auth/authentication/AuthServiceController.java index 52be3cd3b1..9a818b8910 100644 --- a/metadata-service/auth-servlet-impl/src/main/java/com/datahub/auth/authentication/AuthServiceController.java +++ b/metadata-service/auth-servlet-impl/src/main/java/com/datahub/auth/authentication/AuthServiceController.java @@ -443,9 +443,11 @@ public class AuthServiceController { log.error("Failed to parse json while attempting to track analytics event", e); return CompletableFuture.completedFuture(new ResponseEntity<>(HttpStatus.BAD_REQUEST)); } - if (bodyJson == null) { + if (!bodyJson.has("type")) { + log.warn("Invalid tracking request: missing `type` field"); return CompletableFuture.completedFuture(new ResponseEntity<>(HttpStatus.BAD_REQUEST)); } + return CompletableFuture.supplyAsync( () -> { try { diff --git a/metadata-service/auth-servlet-impl/src/test/java/com/datahub/auth/authentication/AuthServiceControllerTest.java b/metadata-service/auth-servlet-impl/src/test/java/com/datahub/auth/authentication/AuthServiceControllerTest.java index 02d4ae5399..e94768e02c 100644 --- a/metadata-service/auth-servlet-impl/src/test/java/com/datahub/auth/authentication/AuthServiceControllerTest.java +++ b/metadata-service/auth-servlet-impl/src/test/java/com/datahub/auth/authentication/AuthServiceControllerTest.java @@ -7,7 +7,9 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.testng.Assert.*; @@ -21,6 +23,7 @@ import com.datahub.authentication.invite.InviteTokenService; import com.datahub.authentication.token.StatelessTokenService; import com.datahub.authentication.token.TokenType; import com.datahub.authentication.user.NativeUserService; +import com.datahub.telemetry.TrackingService; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; @@ -74,6 +77,7 @@ public class AuthServiceControllerTest extends AbstractTestNGSpringContextTests @Autowired private Tracer mockTracer; @Autowired private SpanContext mockSpanContext; @Autowired private ObjectMapper objectMapper; + @Autowired private TrackingService mockTrackingService; private final String PREFERRED_JWS_ALGORITHM = "preferredJwsAlgorithm"; @@ -518,4 +522,136 @@ public class AuthServiceControllerTest extends AbstractTestNGSpringContextTests assertTrue(jsonNode.get("extractJwtAccessTokenClaims").asBoolean()); assertEquals("RS256", jsonNode.get("preferredJwsAlgorithm").asText()); } + + @Test + public void testTrackSuccess() throws Exception { + // Setup + String eventPayload = "{\"type\":\"page_view\",\"properties\":{\"page\":\"dashboard\"}}"; + HttpEntity httpEntity = new HttpEntity<>(eventPayload); + + // Mock tracking service (already @Autowired in the test class) + // No need to configure behavior as the method doesn't return anything + + // Execute + ResponseEntity response = authServiceController.track(httpEntity).join(); + + // Verify + assertEquals(HttpStatus.OK, response.getStatusCode()); + + // Verify tracking service was called with correct parameters + ArgumentCaptor jsonCaptor = ArgumentCaptor.forClass(JsonNode.class); + verify(mockTrackingService) + .emitAnalyticsEvent(eq(systemOperationContext), jsonCaptor.capture()); + + JsonNode capturedJson = jsonCaptor.getValue(); + assertTrue(capturedJson.has("type")); + assertEquals("page_view", capturedJson.get("type").asText()); + assertTrue(capturedJson.has("properties")); + assertTrue(capturedJson.get("properties").has("page")); + assertEquals("dashboard", capturedJson.get("properties").get("page").asText()); + } + + @Test + public void testTrackBadRequest() throws Exception { + // Setup - invalid JSON + String invalidJson = "{malformed json"; + HttpEntity httpEntity = new HttpEntity<>(invalidJson); + + // Execute + ResponseEntity response = authServiceController.track(httpEntity).join(); + + // Verify + assertEquals(HttpStatus.BAD_REQUEST, response.getStatusCode()); + + // Verify tracking service was not called + verify(mockTrackingService, never()).emitAnalyticsEvent(any(), any()); + } + + @Test + public void testTrackMissingRequiredFields() throws Exception { + // Setup - missing type fields + String missingFieldsJson = "{\"user\":\"testUser\"}"; + HttpEntity httpEntity = new HttpEntity<>(missingFieldsJson); + + // Execute + ResponseEntity response = authServiceController.track(httpEntity).join(); + + // Verify + assertEquals(HttpStatus.BAD_REQUEST, response.getStatusCode()); + + // Verify tracking service was not called + verify(mockTrackingService, never()).emitAnalyticsEvent(any(), any()); + } + + @Test + public void testTrackServiceException() throws Exception { + // Setup + String eventPayload = "{\"type\":\"error_event\"}"; + HttpEntity httpEntity = new HttpEntity<>(eventPayload); + + // Mock tracking service to throw exception + doThrow(new RuntimeException("Test exception")) + .when(mockTrackingService) + .emitAnalyticsEvent(eq(systemOperationContext), any(JsonNode.class)); + + // Execute + ResponseEntity response = authServiceController.track(httpEntity).join(); + + // Verify + assertEquals(HttpStatus.INTERNAL_SERVER_ERROR, response.getStatusCode()); + } + + @Test + public void testTrackWithComplexPageViewEventPayload() throws Exception { + // Setup + String complexPayload = + "{\n" + + " \"title\" : \"DataHub\",\n" + + " \"url\" : \"http://localhost:9002/\",\n" + + " \"path\" : \"/\",\n" + + " \"hash\" : \"\",\n" + + " \"search\" : \"\",\n" + + " \"width\" : 1785,\n" + + " \"height\" : 857,\n" + + " \"referrer\" : \"http://localhost:9002/\",\n" + + " \"prevPathname\" : \"http://localhost:9002/\",\n" + + " \"type\" : \"PageViewEvent\",\n" + + " \"actorUrn\" : \"urn:li:corpuser:datahub\",\n" + + " \"timestamp\" : 1746475429127,\n" + + " \"date\" : \"Mon May 05 2025 15:03:49 GMT-0500 (Central Daylight Time)\",\n" + + " \"userAgent\" : \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/135.0.0.0 Safari/537.36\",\n" + + " \"browserId\" : \"xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx\",\n" + + " \"origin\" : \"http://localhost:9002\",\n" + + " \"isThemeV2Enabled\" : true,\n" + + " \"userPersona\" : \"urn:li:dataHubPersona:businessUser\",\n" + + " \"serverVersion\" : \"v1.1.0\"\n" + + "}"; + + HttpEntity httpEntity = new HttpEntity<>(complexPayload); + + // Execute + ResponseEntity response = authServiceController.track(httpEntity).join(); + + // Verify response status + assertEquals(HttpStatus.OK, response.getStatusCode()); + + // Verify tracking service was called with correct parameters + ArgumentCaptor jsonCaptor = ArgumentCaptor.forClass(JsonNode.class); + verify(mockTrackingService) + .emitAnalyticsEvent(eq(systemOperationContext), jsonCaptor.capture()); + + // Verify the complex JSON structure was correctly parsed and passed to the tracking service + JsonNode capturedJson = jsonCaptor.getValue(); + + // Verify key fields from the complex payload + assertEquals("DataHub", capturedJson.get("title").asText()); + assertEquals("http://localhost:9002/", capturedJson.get("url").asText()); + assertEquals("PageViewEvent", capturedJson.get("type").asText()); + assertEquals("urn:li:corpuser:datahub", capturedJson.get("actorUrn").asText()); + assertEquals(1746475429127L, capturedJson.get("timestamp").asLong()); + assertEquals("xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx", capturedJson.get("browserId").asText()); + assertEquals("urn:li:dataHubPersona:businessUser", capturedJson.get("userPersona").asText()); + assertEquals("v1.1.0", capturedJson.get("serverVersion").asText()); + assertTrue(capturedJson.get("isThemeV2Enabled").asBoolean()); + } } diff --git a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/DataHubKafkaProducerFactory.java b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/DataHubKafkaProducerFactory.java index a558974933..be9fbf2d92 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/DataHubKafkaProducerFactory.java +++ b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/DataHubKafkaProducerFactory.java @@ -5,7 +5,6 @@ import com.linkedin.metadata.config.kafka.KafkaConfiguration; import com.linkedin.metadata.config.kafka.ProducerConfiguration; import java.util.Arrays; import java.util.Map; -import java.util.Properties; import org.apache.avro.generic.IndexedRecord; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; @@ -39,11 +38,16 @@ public class DataHubKafkaProducerFactory { */ @Bean(name = "dataHubUsageProducer") protected Producer createDUEProducer( - @Qualifier("configurationProvider") ConfigurationProvider provider) { + @Qualifier("configurationProvider") ConfigurationProvider provider, + KafkaProperties properties) { KafkaConfiguration kafkaConfiguration = provider.getKafka(); final ProducerConfiguration producerConfiguration = kafkaConfiguration.getProducer(); - final Properties props = new Properties(); + + // Initialize with Spring Kafka production configuration + Map props = properties.buildProducerProperties(null); + + // Apply DUE specifics props.put(ProducerConfig.CLIENT_ID_CONFIG, "datahub-analytics"); props.put( ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, diff --git a/metadata-service/factories/src/test/java/com/linkedin/gms/factory/kafka/DataHubKafkaProducerFactoryTest.java b/metadata-service/factories/src/test/java/com/linkedin/gms/factory/kafka/DataHubKafkaProducerFactoryTest.java new file mode 100644 index 0000000000..5d5c7ca72f --- /dev/null +++ b/metadata-service/factories/src/test/java/com/linkedin/gms/factory/kafka/DataHubKafkaProducerFactoryTest.java @@ -0,0 +1,46 @@ +package com.linkedin.gms.factory.kafka; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; + +import com.linkedin.gms.factory.config.ConfigurationProvider; +import com.linkedin.gms.factory.kafka.schemaregistry.KafkaSchemaRegistryFactory; +import java.lang.reflect.Field; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.test.context.testng.AbstractTestNGSpringContextTests; +import org.testng.annotations.Test; + +@SpringBootTest( + properties = { + "kafka.schemaRegistry.type=KAFKA", + "spring.kafka.properties.security.protocol=SSL" + }, + classes = { + DataHubKafkaProducerFactory.class, + KafkaSchemaRegistryFactory.class, + ConfigurationProvider.class + }) +public class DataHubKafkaProducerFactoryTest extends AbstractTestNGSpringContextTests { + @Autowired + @Qualifier("dataHubUsageProducer") + Producer dataHubUsageProducer; + + @Test + void testInitialization() throws NoSuchFieldException, IllegalAccessException { + assertNotNull(dataHubUsageProducer); + + // Use reflection to access the internal properties of the KafkaProducer + Field producerConfigField = KafkaProducer.class.getDeclaredField("producerConfig"); + producerConfigField.setAccessible(true); + ProducerConfig producerConfig = (ProducerConfig) producerConfigField.get(dataHubUsageProducer); + + // Use the ProducerConfig.get() method to access specific properties + String securityProtocol = producerConfig.getString("security.protocol"); + assertEquals("SSL", securityProtocol, "SSL security protocol should be set"); + } +}