From 677182daf754d4b7ea7fa976960dbd699ec0dbd5 Mon Sep 17 00:00:00 2001
From: david-leifker <114954101+david-leifker@users.noreply.github.com>
Date: Wed, 25 Jun 2025 12:30:41 -0500
Subject: [PATCH] feat(operations): add es raw operations endpoints (#13855)

---
 .../graph/elastic/ESGraphQueryDAO.java        |  18 +
 .../elastic/ElasticSearchGraphService.java    |  99 +++++
 .../graph/neo4j/Neo4jGraphService.java        |   5 +
 .../ElasticSearchSystemMetadataService.java   |  76 ++++
 .../ElasticSearchTimeseriesAspectService.java |  95 +++++
 .../ElasticSearchGraphServiceTest.java        | 113 +++++-
 .../SystemMetadataServiceTestBase.java        | 123 ++++++
 .../TimeseriesAspectServiceUnitTest.java      | 289 +++++++++++++-
 .../GlobalControllerExceptionHandler.java     |  18 +
 .../elastic/ElasticsearchController.java      |  52 ---
 .../elastic/ElasticsearchRawController.java   | 259 +++++++++++++
 .../elastic/ElasticsearchControllerTest.java  |   8 +
 .../ElasticsearchRawControllerTest.java       | 363 ++++++++++++++++++
 .../mock/MockTimeseriesAspectService.java     |   6 +
 .../linkedin/metadata/graph/GraphService.java |  22 ++
 .../systemmetadata/SystemMetadataService.java |  12 +
 .../timeseries/TimeseriesAspectService.java   |  10 +
 17 files changed, 1511 insertions(+), 57 deletions(-)
 create mode 100644 metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/operations/elastic/ElasticsearchRawController.java
 create mode 100644 metadata-service/openapi-servlet/src/test/java/io/datahubproject/openapi/operations/elastic/ElasticsearchRawControllerTest.java

diff --git a/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ESGraphQueryDAO.java b/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ESGraphQueryDAO.java
index a0264c16a2..2650e0f405 100644
--- a/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ESGraphQueryDAO.java
+++ b/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ESGraphQueryDAO.java
@@ -579,6 +579,24 @@ public class ESGraphQueryDAO {
     }
   }
 
+  /**
+   * Executes a search request against the graph index. This method is exposed for use by other
+   * graph service methods that need direct search access.
+   *
+   * @param searchRequest The search request to execute
+   * @return The search response from Elasticsearch
+   * @throws ESQueryException if the search fails
+   */
+  SearchResponse executeSearch(@Nonnull SearchRequest searchRequest) {
+    try {
+      MetricUtils.counter(this.getClass(), SEARCH_EXECUTIONS_METRIC).inc();
+      return client.search(searchRequest, RequestOptions.DEFAULT);
+    } catch (Exception e) {
+      log.error("Search query failed", e);
+      throw new ESQueryException("Search query failed:", e);
+    }
+  }
+
   @VisibleForTesting
   public QueryBuilder getLineageQuery(
       @Nonnull OperationContext opContext,
diff --git a/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ElasticSearchGraphService.java b/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ElasticSearchGraphService.java
index eac6ed54bb..6ef183860b 100644
--- a/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ElasticSearchGraphService.java
+++ b/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ElasticSearchGraphService.java
@@ -44,6 +44,7 @@ import com.linkedin.util.Pair;
 import io.datahubproject.metadata.context.OperationContext;
 import io.opentelemetry.instrumentation.annotations.WithSpan;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
@@ -57,12 +58,16 @@ import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
+import org.opensearch.action.search.SearchRequest;
 import org.opensearch.action.search.SearchResponse;
+import org.opensearch.index.query.BoolQueryBuilder;
 import org.opensearch.index.query.QueryBuilder;
 import org.opensearch.index.query.QueryBuilders;
 import org.opensearch.script.Script;
 import org.opensearch.script.ScriptType;
 import org.opensearch.search.SearchHit;
+import org.opensearch.search.SearchHits;
+import org.opensearch.search.builder.SearchSourceBuilder;
 
 @Slf4j
 @RequiredArgsConstructor
@@ -353,6 +358,100 @@ public class ElasticSearchGraphService implements GraphService, ElasticSearchIndexed {
         .build();
   }
 
+  /**
+   * Returns a list of edge documents for the given graph node and relationship tuples.
+   * Non-directed.
+   *
+   * @param opContext operation context
+   * @param edgeTuples Non-directed nodes and relationship types
+   * @return list of documents matching the input criteria
+   */
+  @Override
+  public List<Map<String, Object>> raw(OperationContext opContext, List<EdgeTuple> edgeTuples) {
+
+    if (edgeTuples == null || edgeTuples.isEmpty()) {
+      return Collections.emptyList();
+    }
+
+    List<Map<String, Object>> results = new ArrayList<>();
+
+    // Build a single query for all edge tuples
+    BoolQueryBuilder mainQuery = QueryBuilders.boolQuery();
+
+    // For each edge tuple, create a query that matches edges in either direction
+    for (EdgeTuple tuple : edgeTuples) {
+      if (tuple.getA() == null || tuple.getB() == null || tuple.getRelationshipType() == null) {
+        continue;
+      }
+
+      // Create a query for this specific edge tuple (non-directed)
+      BoolQueryBuilder tupleQuery = QueryBuilders.boolQuery();
+
+      // Match relationship type
+      tupleQuery.filter(
+          QueryBuilders.termQuery(EDGE_FIELD_RELNSHIP_TYPE, tuple.getRelationshipType()));
+
+      // Match nodes in either direction: (a->b) OR (b->a)
+      BoolQueryBuilder directionQuery = QueryBuilders.boolQuery();
+
+      // Direction 1: a is source, b is destination
+      BoolQueryBuilder direction1 = QueryBuilders.boolQuery();
+      direction1.filter(QueryBuilders.termQuery(EDGE_FIELD_SOURCE + ".urn", tuple.getA()));
+      direction1.filter(QueryBuilders.termQuery(EDGE_FIELD_DESTINATION + ".urn", tuple.getB()));
+
+      // Direction 2: b is source, a is destination
+      BoolQueryBuilder direction2 = QueryBuilders.boolQuery();
+      direction2.filter(QueryBuilders.termQuery(EDGE_FIELD_SOURCE + ".urn", tuple.getB()));
+      direction2.filter(QueryBuilders.termQuery(EDGE_FIELD_DESTINATION + ".urn", tuple.getA()));
+
+      // Either direction is acceptable
+      directionQuery.should(direction1);
+      directionQuery.should(direction2);
+      directionQuery.minimumShouldMatch(1);
+
+      // Combine relationship type and direction queries
+      tupleQuery.filter(directionQuery);
+
+      // Add this tuple query as a "should" clause (OR condition)
+      mainQuery.should(tupleQuery);
+    }
+
+    // At least one of the edge tuples must match
+    mainQuery.minimumShouldMatch(1);
+
+    // Build search request
+    SearchRequest searchRequest = new SearchRequest();
+    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
+
+    searchSourceBuilder.query(mainQuery);
+    // Set a reasonable size limit - adjust based on expected number of edges
+    searchSourceBuilder.size(getGraphServiceConfig().getLimit().getResults().getApiDefault());
+
+    searchRequest.source(searchSourceBuilder);
+    searchRequest.indices(indexConvention.getIndexName(INDEX_NAME));
+
+    // Execute search using the graphReadDAO's search client
+    SearchResponse searchResponse = graphReadDAO.executeSearch(searchRequest);
+    SearchHits hits = searchResponse.getHits();
+
+    // Process each hit
+    for (SearchHit hit : hits.getHits()) {
+      Map<String, Object> sourceMap = hit.getSourceAsMap();
+
+      results.add(sourceMap);
+    }
+
+    // Log if we hit the size limit
+    if (hits.getTotalHits() != null && hits.getTotalHits().value > hits.getHits().length) {
+      log.warn(
+          "Total hits {} exceeds returned size {}. Some edges may be missing. "
" + + "Consider implementing pagination or increasing the size limit.", + hits.getTotalHits().value, + hits.getHits().length); + } + + return results; + } + private static List searchHitsToRelatedEntities( SearchHit[] searchHits, RelationshipDirection relationshipDirection) { return Arrays.stream(searchHits) diff --git a/metadata-io/src/main/java/com/linkedin/metadata/graph/neo4j/Neo4jGraphService.java b/metadata-io/src/main/java/com/linkedin/metadata/graph/neo4j/Neo4jGraphService.java index dbf3b24cc9..e111b37d0f 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/graph/neo4j/Neo4jGraphService.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/graph/neo4j/Neo4jGraphService.java @@ -1072,4 +1072,9 @@ public class Neo4jGraphService implements GraphService { .scrollId(nextScrollId) .build(); } + + @Override + public List> raw(OperationContext opContext, List edgeTuples) { + return List.of(); + } } diff --git a/metadata-io/src/main/java/com/linkedin/metadata/systemmetadata/ElasticSearchSystemMetadataService.java b/metadata-io/src/main/java/com/linkedin/metadata/systemmetadata/ElasticSearchSystemMetadataService.java index fc66956e7a..b933ea86db 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/systemmetadata/ElasticSearchSystemMetadataService.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/systemmetadata/ElasticSearchSystemMetadataService.java @@ -6,6 +6,7 @@ import com.fasterxml.jackson.databind.node.JsonNodeFactory; import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.collect.ImmutableMap; import com.linkedin.common.urn.Urn; +import com.linkedin.common.urn.UrnUtils; import com.linkedin.data.template.SetMode; import com.linkedin.metadata.config.SystemMetadataServiceConfig; import com.linkedin.metadata.run.AspectRowSummary; @@ -18,10 +19,12 @@ import com.linkedin.metadata.utils.elasticsearch.IndexConvention; import com.linkedin.mxe.SystemMetadata; import com.linkedin.structured.StructuredPropertyDefinition; import com.linkedin.util.Pair; +import io.datahubproject.metadata.context.OperationContext; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; +import java.util.ArrayList; import java.util.Arrays; import java.util.Base64; import java.util.Collection; @@ -268,6 +271,79 @@ public class ElasticSearchSystemMetadataService QueryBuilders.matchAllQuery(), true, _indexConvention.getIndexName(INDEX_NAME)); } + @Override + public Map>> raw( + OperationContext opContext, Map> urnAspects) { + + if (urnAspects == null || urnAspects.isEmpty()) { + return Collections.emptyMap(); + } + + Map>> result = new HashMap<>(); + + // Build a list of all document IDs we need to fetch + List docIds = new ArrayList<>(); + for (Map.Entry> entry : urnAspects.entrySet()) { + String urnString = entry.getKey(); + Set aspects = entry.getValue(); + + if (aspects != null && !aspects.isEmpty()) { + for (String aspect : aspects) { + docIds.add(toDocId(urnString, aspect)); + } + } + } + + if (docIds.isEmpty()) { + return Collections.emptyMap(); + } + + // Query for all documents by their IDs + BoolQueryBuilder query = QueryBuilders.boolQuery(); + query.filter(QueryBuilders.idsQuery().addIds(docIds.toArray(new String[0]))); + + // Use scroll to retrieve all matching documents + SearchResponse searchResponse = + _esDAO.scroll( + query, + true, + null, // scrollId + null, // pitId + null, // keepAlive + 
diff --git a/metadata-io/src/main/java/com/linkedin/metadata/systemmetadata/ElasticSearchSystemMetadataService.java b/metadata-io/src/main/java/com/linkedin/metadata/systemmetadata/ElasticSearchSystemMetadataService.java
index fc66956e7a..b933ea86db 100644
--- a/metadata-io/src/main/java/com/linkedin/metadata/systemmetadata/ElasticSearchSystemMetadataService.java
+++ b/metadata-io/src/main/java/com/linkedin/metadata/systemmetadata/ElasticSearchSystemMetadataService.java
@@ -6,6 +6,7 @@ import com.fasterxml.jackson.databind.node.JsonNodeFactory;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.collect.ImmutableMap;
 import com.linkedin.common.urn.Urn;
+import com.linkedin.common.urn.UrnUtils;
 import com.linkedin.data.template.SetMode;
 import com.linkedin.metadata.config.SystemMetadataServiceConfig;
 import com.linkedin.metadata.run.AspectRowSummary;
@@ -18,10 +19,12 @@ import com.linkedin.metadata.utils.elasticsearch.IndexConvention;
 import com.linkedin.mxe.SystemMetadata;
 import com.linkedin.structured.StructuredPropertyDefinition;
 import com.linkedin.util.Pair;
+import io.datahubproject.metadata.context.OperationContext;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Base64;
 import java.util.Collection;
@@ -268,6 +271,79 @@ public class ElasticSearchSystemMetadataService
         QueryBuilders.matchAllQuery(), true, _indexConvention.getIndexName(INDEX_NAME));
   }
 
+  @Override
+  public Map<Urn, Map<String, Map<String, Object>>> raw(
+      OperationContext opContext, Map<String, Set<String>> urnAspects) {
+
+    if (urnAspects == null || urnAspects.isEmpty()) {
+      return Collections.emptyMap();
+    }
+
+    Map<Urn, Map<String, Map<String, Object>>> result = new HashMap<>();
+
+    // Build a list of all document IDs we need to fetch
+    List<String> docIds = new ArrayList<>();
+    for (Map.Entry<String, Set<String>> entry : urnAspects.entrySet()) {
+      String urnString = entry.getKey();
+      Set<String> aspects = entry.getValue();
+
+      if (aspects != null && !aspects.isEmpty()) {
+        for (String aspect : aspects) {
+          docIds.add(toDocId(urnString, aspect));
+        }
+      }
+    }
+
+    if (docIds.isEmpty()) {
+      return Collections.emptyMap();
+    }
+
+    // Query for all documents by their IDs
+    BoolQueryBuilder query = QueryBuilders.boolQuery();
+    query.filter(QueryBuilders.idsQuery().addIds(docIds.toArray(new String[0])));
+
+    // Use scroll to retrieve all matching documents
+    SearchResponse searchResponse =
+        _esDAO.scroll(
+            query,
+            true,
+            null, // scrollId
+            null, // pitId
+            null, // keepAlive
+            systemMetadataServiceConfig.getLimit().getResults().getApiDefault());
+
+    if (searchResponse != null && searchResponse.getHits() != null) {
+      SearchHits hits = searchResponse.getHits();
+
+      // Process each hit
+      Arrays.stream(hits.getHits())
+          .forEach(
+              hit -> {
+                Map<String, Object> sourceMap = hit.getSourceAsMap();
+                String urnString = (String) sourceMap.get(FIELD_URN);
+                String aspectName = (String) sourceMap.get(FIELD_ASPECT);
+
+                if (urnString != null && aspectName != null) {
+                  try {
+                    Urn urn = UrnUtils.getUrn(urnString);
+
+                    // Get or create the aspect map for this URN
+                    Map<String, Map<String, Object>> aspectDocuments =
+                        result.computeIfAbsent(urn, k -> new HashMap<>());
+
+                    // Store the raw document for this aspect
+                    aspectDocuments.put(aspectName, sourceMap);
+
+                  } catch (Exception e) {
+                    log.error("Error parsing URN {} in raw method: {}", urnString, e.getMessage());
+                  }
+                }
+              });
+    }
+
+    return result;
+  }
+
   private static List<AspectRowSummary> toAspectRowSummary(SearchResponse searchResponse) {
     if (searchResponse != null) {
       SearchHits hits = searchResponse.getHits();
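For illustration, a sketch of how a caller might use this lookup; the service wiring and the urn/aspect names are placeholders, not part of this patch:

// Sketch only: fetch raw system-metadata documents keyed by urn and aspect.
import com.linkedin.common.urn.Urn;
import com.linkedin.metadata.systemmetadata.SystemMetadataService;
import io.datahubproject.metadata.context.OperationContext;
import java.util.Map;
import java.util.Set;

class RawSystemMetadataSketch {
  static void printRunIds(SystemMetadataService service, OperationContext opContext) {
    Map<String, Set<String>> request = Map.of("urn:li:chart:1", Set.of("chartKey", "ChartInfo"));
    // Result shape: urn -> aspect name -> raw document fields (urn, aspect, runId, ...).
    Map<Urn, Map<String, Map<String, Object>>> result = service.raw(opContext, request);
    result.forEach(
        (urn, aspects) ->
            aspects.forEach(
                (aspect, doc) ->
                    System.out.println(urn + " " + aspect + " runId=" + doc.get("runId"))));
  }
}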
diff --git a/metadata-io/src/main/java/com/linkedin/metadata/timeseries/elastic/ElasticSearchTimeseriesAspectService.java b/metadata-io/src/main/java/com/linkedin/metadata/timeseries/elastic/ElasticSearchTimeseriesAspectService.java
index ff0c1fe5cc..d32fb43012 100644
--- a/metadata-io/src/main/java/com/linkedin/metadata/timeseries/elastic/ElasticSearchTimeseriesAspectService.java
+++ b/metadata-io/src/main/java/com/linkedin/metadata/timeseries/elastic/ElasticSearchTimeseriesAspectService.java
@@ -8,6 +8,7 @@ import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.linkedin.common.urn.Urn;
+import com.linkedin.common.urn.UrnUtils;
 import com.linkedin.data.ByteString;
 import com.linkedin.metadata.aspect.EnvelopedAspect;
 import com.linkedin.metadata.config.ConfigUtils;
@@ -759,6 +760,100 @@ public class ElasticSearchTimeseriesAspectService
         .build();
   }
 
+  @Override
+  public Map<Urn, Map<String, Map<String, Object>>> raw(
+      OperationContext opContext, Map<String, Set<String>> urnAspects) {
+
+    if (urnAspects == null || urnAspects.isEmpty()) {
+      return Collections.emptyMap();
+    }
+
+    Map<Urn, Map<String, Map<String, Object>>> result = new HashMap<>();
+
+    // Process each URN and its timeseries aspects
+    for (Map.Entry<String, Set<String>> entry : urnAspects.entrySet()) {
+      String urnString = entry.getKey();
+      Set<String> aspectNames =
+          Optional.ofNullable(entry.getValue()).orElse(Collections.emptySet()).stream()
+              .filter(
+                  aspectName -> {
+                    AspectSpec aspectSpec = entityRegistry.getAspectSpecs().get(aspectName);
+                    return aspectSpec != null && aspectSpec.isTimeseries();
+                  })
+              .collect(Collectors.toSet());
+
+      if (aspectNames.isEmpty()) {
+        continue;
+      }
+
+      try {
+        Urn urn = UrnUtils.getUrn(urnString);
+        String entityName = urn.getEntityType();
+        Map<String, Map<String, Object>> aspectDocuments = new HashMap<>();
+
+        // For each aspect, find the latest document
+        for (String aspectName : aspectNames) {
+          try {
+            // Build query to find the latest document for this URN and aspect
+            BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery();
+            queryBuilder.must(QueryBuilders.termQuery(MappingsBuilder.URN_FIELD, urnString));
+
+            // Build search request
+            SearchRequest searchRequest = new SearchRequest();
+            SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
+
+            searchSourceBuilder.query(queryBuilder);
+            searchSourceBuilder.size(1); // Only need the latest document
+            // Sort by timestamp descending to get the most recent document
+            searchSourceBuilder.sort(
+                SortBuilders.fieldSort(MappingsBuilder.TIMESTAMP_FIELD).order(SortOrder.DESC));
+
+            searchRequest.source(searchSourceBuilder);
+
+            // Get the index name for this entity and aspect
+            String indexName =
+                opContext
+                    .getSearchContext()
+                    .getIndexConvention()
+                    .getTimeseriesAspectIndexName(entityName, aspectName);
+            searchRequest.indices(indexName);
+
+            // Execute search
+            SearchResponse searchResponse =
+                searchClient.search(searchRequest, RequestOptions.DEFAULT);
+            SearchHits hits = searchResponse.getHits();
+
+            if (hits.getTotalHits() != null
+                && hits.getTotalHits().value > 0
+                && hits.getHits().length > 0) {
+              SearchHit latestHit = hits.getHits()[0];
+              Map<String, Object> sourceMap = latestHit.getSourceAsMap();
+
+              // Store the raw document for this aspect
+              aspectDocuments.put(aspectName, sourceMap);
+            }
+
+          } catch (IOException e) {
+            log.error(
+                "Error fetching latest document for URN: {}, aspect: {}", urnString, aspectName, e);
+            // Continue processing other aspects
+          }
+        }
+
+        // Only add to result if we found documents
+        if (!aspectDocuments.isEmpty()) {
+          result.put(urn, aspectDocuments);
+        }
+
+      } catch (Exception e) {
+        log.error("Error parsing URN {} in raw method: {}", urnString, e.getMessage(), e);
+        // Continue processing other URNs
+      }
+    }
+
+    return result;
+  }
+
   private SearchResponse executeScrollSearchQuery(
       @Nonnull OperationContext opContext,
       @Nonnull final String entityName,
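The "latest document per aspect" query above can be read in isolation as the following sketch; the index name is a placeholder and the MappingsBuilder import path is assumed from this module:

// Sketch only: a size-1 search sorted by timestamp descending returns the newest document.
import com.linkedin.metadata.timeseries.elastic.indexbuilder.MappingsBuilder;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.search.sort.SortBuilders;
import org.opensearch.search.sort.SortOrder;

class LatestTimeseriesDocSketch {
  static SearchRequest latestDocRequest(String indexName, String urn) {
    SearchSourceBuilder source =
        new SearchSourceBuilder()
            .query(QueryBuilders.termQuery(MappingsBuilder.URN_FIELD, urn))
            .size(1) // only the most recent document is needed
            .sort(SortBuilders.fieldSort(MappingsBuilder.TIMESTAMP_FIELD).order(SortOrder.DESC));
    return new SearchRequest(indexName).source(source);
  }
}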
diff --git a/metadata-io/src/test/java/com/linkedin/metadata/graph/elastic/ElasticSearchGraphServiceTest.java b/metadata-io/src/test/java/com/linkedin/metadata/graph/elastic/ElasticSearchGraphServiceTest.java
index a785f0829b..bea193dde0 100644
--- a/metadata-io/src/test/java/com/linkedin/metadata/graph/elastic/ElasticSearchGraphServiceTest.java
+++ b/metadata-io/src/test/java/com/linkedin/metadata/graph/elastic/ElasticSearchGraphServiceTest.java
@@ -21,6 +21,7 @@ import com.linkedin.metadata.aspect.models.graph.RelatedEntities;
 import com.linkedin.metadata.aspect.models.graph.RelatedEntitiesScrollResult;
 import com.linkedin.metadata.entity.TestEntityRegistry;
 import com.linkedin.metadata.graph.GraphFilters;
+import com.linkedin.metadata.graph.GraphService.EdgeTuple;
 import com.linkedin.metadata.graph.RelatedEntitiesResult;
 import com.linkedin.metadata.models.registry.EntityRegistry;
 import com.linkedin.metadata.models.registry.LineageRegistry;
@@ -30,6 +31,7 @@ import com.linkedin.metadata.search.elasticsearch.update.ESBulkProcessor;
 import com.linkedin.metadata.utils.elasticsearch.IndexConventionImpl;
 import io.datahubproject.metadata.context.OperationContext;
 import io.datahubproject.test.metadata.context.TestOperationContexts;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -37,6 +39,7 @@ import java.util.Map;
 import java.util.Set;
 import org.apache.lucene.search.TotalHits;
 import org.mockito.ArgumentCaptor;
+import org.opensearch.action.search.SearchRequest;
 import org.opensearch.action.search.SearchResponse;
 import org.opensearch.index.query.BoolQueryBuilder;
 import org.opensearch.index.query.ExistsQueryBuilder;
@@ -62,7 +65,7 @@ public class ElasticSearchGraphServiceTest {
     mockESBulkProcessor = mock(ESBulkProcessor.class);
     mockWriteDAO = mock(ESGraphWriteDAO.class);
     mockReadDAO = mock(ESGraphQueryDAO.class);
-    when(mockReadDAO.getGraphServiceConfig()).thenReturn(TEST_GRAPH_SERVICE_CONFIG);
+
     test =
         new ElasticSearchGraphService(
             new LineageRegistry(entityRegistry),
@@ -77,6 +80,7 @@ public class ElasticSearchGraphServiceTest {
   @BeforeMethod
   public void beforeMethod() {
     reset(mockESBulkProcessor, mockWriteDAO, mockReadDAO);
+    when(mockReadDAO.getGraphServiceConfig()).thenReturn(TEST_GRAPH_SERVICE_CONFIG);
   }
 
   @Test
@@ -237,6 +241,113 @@ public class ElasticSearchGraphServiceTest {
     assertTrue(result.getEntities().isEmpty());
   }
 
+  @Test
+  public void testRaw() {
+    // Create test edge tuples
+    List<EdgeTuple> edgeTuples =
+        Arrays.asList(
+            new EdgeTuple("urn:li:dataset:1", "urn:li:dataset:2", "Produces"),
+            new EdgeTuple("urn:li:dataset:3", "urn:li:dataset:4", "Consumes"));
+
+    // Create mock search response
+    SearchResponse mockResponse = mock(SearchResponse.class);
+    SearchHits mockHits = mock(SearchHits.class);
+    when(mockResponse.getHits()).thenReturn(mockHits);
+
+    TotalHits totalHits = new TotalHits(2L, TotalHits.Relation.EQUAL_TO);
+    when(mockHits.getTotalHits()).thenReturn(totalHits);
+
+    // Create search hits
+    SearchHit[] searchHits = new SearchHit[2];
+    searchHits[0] = createMockSearchHit("urn:li:dataset:1", "urn:li:dataset:2", "Produces", null);
+    searchHits[1] =
+        createMockSearchHit(
+            "urn:li:dataset:4", "urn:li:dataset:3", "Consumes", "urn:li:dataset:via");
+    when(mockHits.getHits()).thenReturn(searchHits);
+
+    // Mock the executeSearch method
+    when(mockReadDAO.executeSearch(any(SearchRequest.class))).thenReturn(mockResponse);
+
+    // Execute the method
+    OperationContext opContext = TestOperationContexts.systemContextNoValidate();
+    List<Map<String, Object>> results = test.raw(opContext, edgeTuples);
+
+    // Verify the search request was made
+    ArgumentCaptor<SearchRequest> requestCaptor = ArgumentCaptor.forClass(SearchRequest.class);
+    verify(mockReadDAO).executeSearch(requestCaptor.capture());
+
+    // Verify results
+    assertEquals(results.size(), 2);
+
+    // Verify first result
+    Map<String, Object> firstResult = results.get(0);
+    Map<String, Object> source = (Map<String, Object>) firstResult.get("source");
+    Map<String, Object> destination = (Map<String, Object>) firstResult.get("destination");
+    assertEquals(source.get("urn"), "urn:li:dataset:1");
+    assertEquals(destination.get("urn"), "urn:li:dataset:2");
+    assertEquals(firstResult.get("relationshipType"), "Produces");
+
+    // Verify second result
+    Map<String, Object> secondResult = results.get(1);
+    source = (Map<String, Object>) secondResult.get("source");
+    destination = (Map<String, Object>) secondResult.get("destination");
+    assertEquals(source.get("urn"), "urn:li:dataset:4");
+    assertEquals(destination.get("urn"), "urn:li:dataset:3");
+    assertEquals(secondResult.get("relationshipType"), "Consumes");
+    assertEquals(secondResult.get("via"), "urn:li:dataset:via");
+  }
+
+  @Test
+  public void testRawWithNullInput() {
+    OperationContext opContext = TestOperationContexts.systemContextNoValidate();
+
+    // Test with null input
+    List<Map<String, Object>> results = test.raw(opContext, null);
+    assertTrue(results.isEmpty());
+  }
+
+  @Test
+  public void testRawWithEmptyInput() {
+    OperationContext opContext = TestOperationContexts.systemContextNoValidate();
+
+    // Test with empty list
+    List<Map<String, Object>> results = test.raw(opContext, Collections.emptyList());
+    assertTrue(results.isEmpty());
+  }
+
+  @Test
+  public void testRawWithInvalidEdgeTuples() {
+    // Create edge tuples with null values
+    List<EdgeTuple> edgeTuples =
+        Arrays.asList(
+            new EdgeTuple(null, "urn:li:dataset:2", "Produces"),
+            new EdgeTuple("urn:li:dataset:1", null, "Consumes"),
+            new EdgeTuple("urn:li:dataset:1", "urn:li:dataset:2", null),
+            new EdgeTuple("urn:li:dataset:3", "urn:li:dataset:4", "ValidRel"));
+
+    // Mock response for the valid tuple only
+    SearchResponse mockResponse = mock(SearchResponse.class);
+    SearchHits mockHits = mock(SearchHits.class);
+    when(mockResponse.getHits()).thenReturn(mockHits);
+
+    TotalHits totalHits = new TotalHits(1L, TotalHits.Relation.EQUAL_TO);
+    when(mockHits.getTotalHits()).thenReturn(totalHits);
+
+    SearchHit[] searchHits = new SearchHit[1];
+    searchHits[0] = createMockSearchHit("urn:li:dataset:3", "urn:li:dataset:4", "ValidRel", null);
+    when(mockHits.getHits()).thenReturn(searchHits);
+
+    when(mockReadDAO.executeSearch(any(SearchRequest.class))).thenReturn(mockResponse);
+
+    OperationContext opContext = TestOperationContexts.systemContextNoValidate();
+    List<Map<String, Object>> results = test.raw(opContext, edgeTuples);
+
+    // Should only return results for the valid tuple
+    assertEquals(results.size(), 1);
+    Map<String, Object> result = results.get(0);
+    assertEquals(((Map<String, Object>) result.get("source")).get("urn"), "urn:li:dataset:3");
+  }
+
   // Helper method to create mock search hits
   private SearchHit createMockSearchHit(
       String sourceUrn, String destUrn, String relType, String via) {
diff --git a/metadata-io/src/test/java/com/linkedin/metadata/systemmetadata/SystemMetadataServiceTestBase.java b/metadata-io/src/test/java/com/linkedin/metadata/systemmetadata/SystemMetadataServiceTestBase.java
index cef2b16931..87962be624 100644
--- a/metadata-io/src/test/java/com/linkedin/metadata/systemmetadata/SystemMetadataServiceTestBase.java
+++ b/metadata-io/src/test/java/com/linkedin/metadata/systemmetadata/SystemMetadataServiceTestBase.java
@@ -3,7 +3,10 @@ package com.linkedin.metadata.systemmetadata;
 import static io.datahubproject.test.search.SearchTestUtils.TEST_SYSTEM_METADATA_SERVICE_CONFIG;
 import static io.datahubproject.test.search.SearchTestUtils.syncAfterWrite;
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
 
+import com.linkedin.common.urn.Urn;
+import com.linkedin.common.urn.UrnUtils;
 import com.linkedin.metadata.run.AspectRowSummary;
 import com.linkedin.metadata.run.IngestionRunSummary;
 import com.linkedin.metadata.search.elasticsearch.indexbuilder.ESIndexBuilder;
@@ -11,8 +14,13 @@ import com.linkedin.metadata.search.elasticsearch.update.ESBulkProcessor;
 import com.linkedin.metadata.utils.elasticsearch.IndexConvention;
 import com.linkedin.metadata.utils.elasticsearch.IndexConventionImpl;
 import com.linkedin.mxe.SystemMetadata;
+import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import javax.annotation.Nonnull;
 import org.opensearch.client.RestHighLevelClient;
 import org.springframework.test.context.testng.AbstractTestNGSpringContextTests;
@@ -196,4 +204,119 @@ public abstract class SystemMetadataServiceTestBase extends AbstractTestNGSpringContextTests {
 
     assertEquals(runs.size(), 0);
   }
+
+  @Test
+  public void testRaw() throws Exception {
+    // Create test data with various system metadata
+    SystemMetadata metadata1 = new SystemMetadata();
+    metadata1.setRunId("abc-123");
+    metadata1.setLastObserved(Long.valueOf(120L));
+    metadata1.setRegistryName("test-registry");
+    metadata1.setRegistryVersion("1.0.0");
+
+    SystemMetadata metadata2 = new SystemMetadata();
+    metadata2.setRunId("abc-456");
+    metadata2.setLastObserved(Long.valueOf(240L));
+    metadata2.setRegistryName("test-registry");
+    metadata2.setRegistryVersion("2.0.0");
+
+    // Insert test data
+    _client.insert(metadata1, "urn:li:chart:1", "chartKey");
+    _client.insert(metadata1, "urn:li:chart:1", "ChartInfo");
+    _client.insert(metadata1, "urn:li:chart:1", "Ownership");
+
+    _client.insert(metadata2, "urn:li:chart:2", "chartKey");
"ChartInfo"); + + _client.insert(metadata1, "urn:li:dataset:3", "DatasetKey"); + _client.insert(metadata1, "urn:li:dataset:3", "DatasetProperties"); + + syncAfterWrite(getBulkProcessor()); + + // Test 1: Query for specific URN with specific aspects + Map> urnAspects1 = new HashMap<>(); + urnAspects1.put("urn:li:chart:1", new HashSet<>(Arrays.asList("chartKey", "ChartInfo"))); + + Map>> result1 = _client.raw(null, urnAspects1); + + assertEquals(result1.size(), 1); + assertTrue(result1.containsKey(UrnUtils.getUrn("urn:li:chart:1"))); + + Map> chart1Aspects = result1.get(UrnUtils.getUrn("urn:li:chart:1")); + assertEquals(chart1Aspects.size(), 2); + assertTrue(chart1Aspects.containsKey("chartKey")); + assertTrue(chart1Aspects.containsKey("ChartInfo")); + + // Verify content of returned documents + Map chartKeyDoc = chart1Aspects.get("chartKey"); + assertEquals(chartKeyDoc.get("urn"), "urn:li:chart:1"); + assertEquals(chartKeyDoc.get("aspect"), "chartKey"); + assertEquals(chartKeyDoc.get("runId"), "abc-123"); + assertEquals(chartKeyDoc.get("lastUpdated"), 120); + assertEquals(chartKeyDoc.get("registryName"), "test-registry"); + assertEquals(chartKeyDoc.get("registryVersion"), "1.0.0"); + + // Test 2: Query for multiple URNs + Map> urnAspects2 = new HashMap<>(); + urnAspects2.put("urn:li:chart:1", new HashSet<>(Arrays.asList("Ownership"))); + urnAspects2.put("urn:li:chart:2", new HashSet<>(Arrays.asList("chartKey", "ChartInfo"))); + urnAspects2.put("urn:li:dataset:3", new HashSet<>(Arrays.asList("DatasetKey"))); + + Map>> result2 = _client.raw(null, urnAspects2); + + assertEquals(result2.size(), 3); + assertTrue(result2.containsKey(UrnUtils.getUrn("urn:li:chart:1"))); + assertTrue(result2.containsKey(UrnUtils.getUrn("urn:li:chart:2"))); + assertTrue(result2.containsKey(UrnUtils.getUrn("urn:li:dataset:3"))); + + // Verify chart:1 has only Ownership + assertEquals(result2.get(UrnUtils.getUrn("urn:li:chart:1")).size(), 1); + assertTrue(result2.get(UrnUtils.getUrn("urn:li:chart:1")).containsKey("Ownership")); + + // Verify chart:2 has both aspects + assertEquals(result2.get(UrnUtils.getUrn("urn:li:chart:2")).size(), 2); + assertTrue(result2.get(UrnUtils.getUrn("urn:li:chart:2")).containsKey("chartKey")); + assertTrue(result2.get(UrnUtils.getUrn("urn:li:chart:2")).containsKey("ChartInfo")); + + // Verify dataset:3 has DatasetKey + assertEquals(result2.get(UrnUtils.getUrn("urn:li:dataset:3")).size(), 1); + assertTrue(result2.get(UrnUtils.getUrn("urn:li:dataset:3")).containsKey("DatasetKey")); + + // Test 3: Query for non-existent URN + Map> urnAspects3 = new HashMap<>(); + urnAspects3.put("urn:li:chart:999", new HashSet<>(Arrays.asList("chartKey"))); + + Map>> result3 = _client.raw(null, urnAspects3); + assertTrue(result3.isEmpty()); + + // Test 4: Query for existing URN but non-existent aspect + Map> urnAspects4 = new HashMap<>(); + urnAspects4.put("urn:li:chart:1", new HashSet<>(Arrays.asList("NonExistentAspect"))); + + Map>> result4 = _client.raw(null, urnAspects4); + assertTrue(result4.isEmpty()); + + // Test 5: Empty input map + Map> urnAspects5 = new HashMap<>(); + Map>> result5 = _client.raw(null, urnAspects5); + assertTrue(result5.isEmpty()); + + // Test 6: Null input + Map>> result6 = _client.raw(null, null); + assertTrue(result6.isEmpty()); + + // Test 7: URN with null aspects set + Map> urnAspects7 = new HashMap<>(); + urnAspects7.put("urn:li:chart:1", null); + + Map>> result7 = _client.raw(null, urnAspects7); + assertTrue(result7.isEmpty()); + + // Test 8: URN with empty aspects set + Map> 
+    Map<String, Set<String>> urnAspects8 = new HashMap<>();
+    urnAspects8.put("urn:li:chart:1", new HashSet<>());
+
+    Map<Urn, Map<String, Map<String, Object>>> result8 = _client.raw(null, urnAspects8);
+    assertTrue(result8.isEmpty());
+  }
 }
diff --git a/metadata-io/src/test/java/com/linkedin/metadata/timeseries/search/TimeseriesAspectServiceUnitTest.java b/metadata-io/src/test/java/com/linkedin/metadata/timeseries/search/TimeseriesAspectServiceUnitTest.java
index f9ec3dd06a..5610ab897f 100644
--- a/metadata-io/src/test/java/com/linkedin/metadata/timeseries/search/TimeseriesAspectServiceUnitTest.java
+++ b/metadata-io/src/test/java/com/linkedin/metadata/timeseries/search/TimeseriesAspectServiceUnitTest.java
@@ -11,6 +11,7 @@ import com.fasterxml.jackson.databind.node.NumericNode;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.linkedin.common.urn.Urn;
 import com.linkedin.common.urn.UrnUtils;
+import com.linkedin.metadata.models.AspectSpec;
 import com.linkedin.metadata.models.registry.EntityRegistry;
 import com.linkedin.metadata.query.filter.Filter;
 import com.linkedin.metadata.query.filter.SortCriterion;
@@ -68,7 +69,9 @@ public class TimeseriesAspectServiceUnitTest {
   private final IndexConvention indexConvention = mock(IndexConvention.class);
   private final ESBulkProcessor bulkProcessor = mock(ESBulkProcessor.class);
   private final RestClient restClient = mock(RestClient.class);
-  private final EntityRegistry entityRegistry = mock(EntityRegistry.class);
+  private final OperationContext opContext =
+      TestOperationContexts.systemContextNoSearchAuthorization(indexConvention);
+  private final EntityRegistry entityRegistry = opContext.getEntityRegistry();
   private final ESIndexBuilder indexBuilder = mock(ESIndexBuilder.class);
   private final TimeseriesAspectService _timeseriesAspectService =
       new ElasticSearchTimeseriesAspectService(
@@ -80,14 +83,12 @@ public class TimeseriesAspectServiceUnitTest {
           entityRegistry,
           indexConvention,
           indexBuilder);
-  private final OperationContext opContext =
-      TestOperationContexts.systemContextNoSearchAuthorization(indexConvention);
 
   private static final String INDEX_PATTERN = "indexPattern";
 
   @BeforeMethod
   public void resetMocks() {
-    reset(searchClient, indexConvention, bulkProcessor, restClient, entityRegistry, indexBuilder);
+    reset(searchClient, indexConvention, bulkProcessor, restClient, indexBuilder);
   }
 
   @Test
@@ -507,4 +508,284 @@ public class TimeseriesAspectServiceUnitTest {
       Assert.assertTrue(e.getCause() instanceof ExecutionException);
     }
   }
+
+  @Test
+  public void testRawWithNullUrnAspects() {
+    // Test with null input
+    Map<Urn, Map<String, Map<String, Object>>> result =
+        _timeseriesAspectService.raw(opContext, null);
+    Assert.assertEquals(result, Collections.emptyMap());
+  }
+
+  @Test
+  public void testRawWithEmptyUrnAspects() {
+    // Test with empty input
+    Map<Urn, Map<String, Map<String, Object>>> result =
+        _timeseriesAspectService.raw(opContext, Collections.emptyMap());
+    Assert.assertEquals(result, Collections.emptyMap());
+  }
+
+  @Test
+  public void testRawWithNonTimeseriesAspects() {
+    Map<String, Set<String>> urnAspects = new HashMap<>();
+    urnAspects.put("urn:li:dataset:123", new HashSet<>(Arrays.asList("status")));
+
+    // Execute
+    Map<Urn, Map<String, Map<String, Object>>> result =
+        _timeseriesAspectService.raw(opContext, urnAspects);
+
+    // Verify - should be empty since no timeseries aspects
+    Assert.assertEquals(result, Collections.emptyMap());
+  }
+
+  @Test
+  public void testRawWithValidTimeseriesAspect() throws IOException {
+    // Setup
+    String urnString = "urn:li:dataset:123";
+    String aspectName = "datasetProfile";
+    String indexName = "dataset_datasetProfile_index_v1";
+
+    when(indexConvention.getTimeseriesAspectIndexName(eq("dataset"), eq(aspectName)))
+        .thenReturn(indexName);
+
+    // Mock search response
+    SearchHit mockHit = mock(SearchHit.class);
+    Map<String, Object> sourceMap = new HashMap<>();
+    sourceMap.put(MappingsBuilder.URN_FIELD, urnString);
+    sourceMap.put(MappingsBuilder.TIMESTAMP_FIELD, 1234567890L);
+    sourceMap.put("field1", "value1");
+    sourceMap.put("field2", "value2");
+    when(mockHit.getSourceAsMap()).thenReturn(sourceMap);
+
+    SearchHits mockHits = mock(SearchHits.class);
+    when(mockHits.getTotalHits()).thenReturn(new TotalHits(1, TotalHits.Relation.EQUAL_TO));
+    when(mockHits.getHits()).thenReturn(new SearchHit[] {mockHit});
+
+    SearchResponse mockResponse = mock(SearchResponse.class);
+    when(mockResponse.getHits()).thenReturn(mockHits);
+    when(searchClient.search(any(), any())).thenReturn(mockResponse);
+
+    Map<String, Set<String>> urnAspects = new HashMap<>();
+    urnAspects.put(urnString, new HashSet<>(Arrays.asList(aspectName)));
+
+    // Execute
+    Map<Urn, Map<String, Map<String, Object>>> result =
+        _timeseriesAspectService.raw(opContext, urnAspects);
+
+    // Verify
+    Assert.assertEquals(result.size(), 1);
+    Urn expectedUrn = UrnUtils.getUrn(urnString);
+    Assert.assertTrue(result.containsKey(expectedUrn));
+    Assert.assertTrue(result.get(expectedUrn).containsKey(aspectName));
+    Assert.assertEquals(result.get(expectedUrn).get(aspectName), sourceMap);
+  }
+
+  @Test
+  public void testRawWithMultipleAspects() throws IOException {
+    // Setup
+    String urnString = "urn:li:dataset:123";
+    String aspectName1 = "datasetProfile";
+    String aspectName2 = "operation";
+
+    // Mock aspect specs
+    AspectSpec mockSpec1 = mock(AspectSpec.class);
+    when(mockSpec1.isTimeseries()).thenReturn(true);
+    AspectSpec mockSpec2 = mock(AspectSpec.class);
+    when(mockSpec2.isTimeseries()).thenReturn(true);
+
+    when(indexConvention.getTimeseriesAspectIndexName(eq("dataset"), eq(aspectName1)))
+        .thenReturn("dataset_datasetProfile_index_v1");
+    when(indexConvention.getTimeseriesAspectIndexName(eq("dataset"), eq(aspectName2)))
+        .thenReturn("dataset_operation_index_v1");
+
+    // Mock search responses for both aspects
+    Map<String, Object> sourceMap1 = new HashMap<>();
+    sourceMap1.put(MappingsBuilder.URN_FIELD, urnString);
+    sourceMap1.put(MappingsBuilder.TIMESTAMP_FIELD, 1234567890L);
+    sourceMap1.put(MappingsBuilder.TIMESTAMP_MILLIS_FIELD, 1234567890L);
+    sourceMap1.put("profileField", "profileValue");
+
+    Map<String, Object> sourceMap2 = new HashMap<>();
+    sourceMap2.put(MappingsBuilder.URN_FIELD, urnString);
+    sourceMap2.put(MappingsBuilder.TIMESTAMP_FIELD, 1234567891L);
+    sourceMap2.put(MappingsBuilder.TIMESTAMP_MILLIS_FIELD, 1234567891L);
+    sourceMap2.put("operationField", "operationValue");
+
+    SearchHit mockHit1 = mock(SearchHit.class);
+    when(mockHit1.getSourceAsMap()).thenReturn(sourceMap1);
+    SearchHits mockHits1 = mock(SearchHits.class);
+    when(mockHits1.getTotalHits()).thenReturn(new TotalHits(1, TotalHits.Relation.EQUAL_TO));
+    when(mockHits1.getHits()).thenReturn(new SearchHit[] {mockHit1});
+    SearchResponse mockResponse1 = mock(SearchResponse.class);
+    when(mockResponse1.getHits()).thenReturn(mockHits1);
+
+    SearchHit mockHit2 = mock(SearchHit.class);
+    when(mockHit2.getSourceAsMap()).thenReturn(sourceMap2);
+    SearchHits mockHits2 = mock(SearchHits.class);
+    when(mockHits2.getTotalHits()).thenReturn(new TotalHits(1, TotalHits.Relation.EQUAL_TO));
+    when(mockHits2.getHits()).thenReturn(new SearchHit[] {mockHit2});
+    SearchResponse mockResponse2 = mock(SearchResponse.class);
+    when(mockResponse2.getHits()).thenReturn(mockHits2);
+
+    when(searchClient.search(any(), any())).thenReturn(mockResponse1).thenReturn(mockResponse2);
+
+    Map<String, Set<String>> urnAspects = new HashMap<>();
+    urnAspects.put(urnString, new HashSet<>(Arrays.asList(aspectName1, aspectName2)));
+
+    // Execute
+    Map<Urn, Map<String, Map<String, Object>>> result =
+        _timeseriesAspectService.raw(opContext, urnAspects);
+
+    // Verify
+    Assert.assertEquals(result.size(), 1);
+    Urn expectedUrn = UrnUtils.getUrn(urnString);
+    Assert.assertTrue(result.containsKey(expectedUrn));
+    Assert.assertEquals(result.get(expectedUrn).size(), 2);
+    Assert.assertTrue(result.get(expectedUrn).containsKey(aspectName1));
+    Assert.assertTrue(result.get(expectedUrn).containsKey(aspectName2));
+    Assert.assertEquals(result.get(expectedUrn).get(aspectName1), sourceMap1);
+    Assert.assertEquals(result.get(expectedUrn).get(aspectName2), sourceMap2);
+  }
+
+  @Test
+  public void testRawWithNoResults() throws IOException {
+    // Setup
+    String urnString = "urn:li:dataset:123";
+    String aspectName = "datasetProfile";
+
+    when(indexConvention.getTimeseriesAspectIndexName(eq("dataset"), eq(aspectName)))
+        .thenReturn("dataset_datasetProfile_index_v1");
+
+    // Mock empty search response
+    SearchHits mockHits = mock(SearchHits.class);
+    when(mockHits.getTotalHits()).thenReturn(new TotalHits(0, TotalHits.Relation.EQUAL_TO));
+    when(mockHits.getHits()).thenReturn(new SearchHit[] {});
+
+    SearchResponse mockResponse = mock(SearchResponse.class);
+    when(mockResponse.getHits()).thenReturn(mockHits);
+    when(searchClient.search(any(), any())).thenReturn(mockResponse);
+
+    Map<String, Set<String>> urnAspects = new HashMap<>();
+    urnAspects.put(urnString, new HashSet<>(Arrays.asList(aspectName)));
+
+    // Execute
+    Map<Urn, Map<String, Map<String, Object>>> result =
+        _timeseriesAspectService.raw(opContext, urnAspects);
+
+    // Verify - should be empty since no documents found
+    Assert.assertEquals(result, Collections.emptyMap());
+  }
+
+  @Test
+  public void testRawWithSearchException() throws IOException {
+    // Setup
+    String urnString = "urn:li:dataset:123";
+    String aspectName = "datasetProfile";
+
+    when(indexConvention.getTimeseriesAspectIndexName(eq("dataset"), eq(aspectName)))
+        .thenReturn("dataset_datasetProfile_index_v1");
+
+    // Mock search to throw IOException
+    when(searchClient.search(any(), any())).thenThrow(new IOException("Search failed"));
+
+    Map<String, Set<String>> urnAspects = new HashMap<>();
+    urnAspects.put(urnString, new HashSet<>(Arrays.asList(aspectName)));
+
+    // Execute
+    Map<Urn, Map<String, Map<String, Object>>> result =
+        _timeseriesAspectService.raw(opContext, urnAspects);
+
+    // Verify - should return empty map and not throw exception
+    Assert.assertEquals(result, Collections.emptyMap());
+  }
+
+  @Test
+  public void testRawWithInvalidUrn() {
+    // Setup
+    String invalidUrnString = "invalid:urn:format";
+    String aspectName = "datasetProfile";
+
+    Map<String, Set<String>> urnAspects = new HashMap<>();
+    urnAspects.put(invalidUrnString, new HashSet<>(Arrays.asList(aspectName)));
+
+    // Execute
+    Map<Urn, Map<String, Map<String, Object>>> result =
+        _timeseriesAspectService.raw(opContext, urnAspects);
+
+    // Verify - should return empty map due to URN parsing error
+    Assert.assertEquals(result, Collections.emptyMap());
+  }
+
+  @Test
+  public void testRawWithNullAspectSet() throws IOException {
+    // Setup
+    String urnString = "urn:li:dataset:123";
+
+    Map<String, Set<String>> urnAspects = new HashMap<>();
+    urnAspects.put(urnString, null); // null aspect set
+
+    // Execute
+    Map<Urn, Map<String, Map<String, Object>>> result =
+        _timeseriesAspectService.raw(opContext, urnAspects);
+
+    // Verify - should handle null gracefully and return empty map
+    Assert.assertEquals(result, Collections.emptyMap());
+  }
+
+  @Test
+  public void testRawWithMultipleUrns() throws IOException {
+    // Setup
+    String urnString1 = "urn:li:dataset:123";
+    String urnString2 = "urn:li:dataset:456";
+    String aspectName = "datasetProfile";
+
+    when(indexConvention.getTimeseriesAspectIndexName(eq("dataset"), eq(aspectName)))
+        .thenReturn("dataset_datasetProfile_index_v1");
+
+    // Mock search responses for both URNs
+    Map<String, Object> sourceMap1 = new HashMap<>();
+    sourceMap1.put(MappingsBuilder.URN_FIELD, urnString1);
+    sourceMap1.put(MappingsBuilder.TIMESTAMP_FIELD, 1234567890L);
+    sourceMap1.put("data", "value1");
+
+    Map<String, Object> sourceMap2 = new HashMap<>();
+    sourceMap2.put(MappingsBuilder.URN_FIELD, urnString2);
+    sourceMap2.put(MappingsBuilder.TIMESTAMP_FIELD, 1234567891L);
+    sourceMap2.put("data", "value2");
+
+    SearchHit mockHit1 = mock(SearchHit.class);
+    when(mockHit1.getSourceAsMap()).thenReturn(sourceMap1);
+    SearchHits mockHits1 = mock(SearchHits.class);
+    when(mockHits1.getTotalHits()).thenReturn(new TotalHits(1, TotalHits.Relation.EQUAL_TO));
+    when(mockHits1.getHits()).thenReturn(new SearchHit[] {mockHit1});
+    SearchResponse mockResponse1 = mock(SearchResponse.class);
+    when(mockResponse1.getHits()).thenReturn(mockHits1);
+
+    SearchHit mockHit2 = mock(SearchHit.class);
+    when(mockHit2.getSourceAsMap()).thenReturn(sourceMap2);
+    SearchHits mockHits2 = mock(SearchHits.class);
+    when(mockHits2.getTotalHits()).thenReturn(new TotalHits(1, TotalHits.Relation.EQUAL_TO));
+    when(mockHits2.getHits()).thenReturn(new SearchHit[] {mockHit2});
+    SearchResponse mockResponse2 = mock(SearchResponse.class);
+    when(mockResponse2.getHits()).thenReturn(mockHits2);
+
+    when(searchClient.search(any(), any())).thenReturn(mockResponse1).thenReturn(mockResponse2);
+
+    Map<String, Set<String>> urnAspects = new HashMap<>();
+    urnAspects.put(urnString1, new HashSet<>(Arrays.asList(aspectName)));
+    urnAspects.put(urnString2, new HashSet<>(Arrays.asList(aspectName)));
+
+    // Execute
+    Map<Urn, Map<String, Map<String, Object>>> result =
+        _timeseriesAspectService.raw(opContext, urnAspects);
+
+    // Verify
+    Assert.assertEquals(result.size(), 2);
+    Urn expectedUrn1 = UrnUtils.getUrn(urnString1);
+    Urn expectedUrn2 = UrnUtils.getUrn(urnString2);
+    Assert.assertTrue(result.containsKey(expectedUrn1));
+    Assert.assertTrue(result.containsKey(expectedUrn2));
+    Assert.assertEquals(result.get(expectedUrn1).get(aspectName), sourceMap1);
+    Assert.assertEquals(result.get(expectedUrn2).get(aspectName), sourceMap2);
+  }
 }
request: {}", request.getRequestURI(), e); + return new ResponseEntity<>( + Map.of("error", "Invalid URN format: " + e.getMessage()), HttpStatus.BAD_REQUEST); + } + } + + // For other RuntimeExceptions, let them bubble up to the generic handler + return handleGenericException(e, request); + } + @Override protected void logException(Exception ex, HttpServletRequest request) { log.error("Error while resolving request: {}", request.getRequestURI(), ex); diff --git a/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/operations/elastic/ElasticsearchController.java b/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/operations/elastic/ElasticsearchController.java index 1fc6adb275..76edc186ef 100644 --- a/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/operations/elastic/ElasticsearchController.java +++ b/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/operations/elastic/ElasticsearchController.java @@ -13,7 +13,6 @@ import com.deblock.jsondiff.matcher.StrictPrimitivePartialMatcher; import com.deblock.jsondiff.viewer.PatchDiffViewer; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; -import com.linkedin.common.urn.Urn; import com.linkedin.common.urn.UrnUtils; import com.linkedin.metadata.authorization.PoliciesConfig; import com.linkedin.metadata.entity.EntityService; @@ -32,16 +31,12 @@ import io.datahubproject.metadata.context.RequestContext; import io.datahubproject.openapi.util.ElasticsearchUtils; import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.Parameter; -import io.swagger.v3.oas.annotations.media.Content; -import io.swagger.v3.oas.annotations.media.Schema; -import io.swagger.v3.oas.annotations.responses.ApiResponse; import io.swagger.v3.oas.annotations.tags.Tag; import jakarta.servlet.http.HttpServletRequest; import java.net.URISyntaxException; import java.net.URLEncoder; import java.nio.charset.StandardCharsets; import java.util.List; -import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; @@ -181,53 +176,6 @@ public class ElasticsearchController { return ResponseEntity.ok(j.toString()); } - @PostMapping(path = "/entity/raw", produces = MediaType.APPLICATION_JSON_VALUE) - @Operation( - description = - "Retrieves raw Elasticsearch documents for the provided URNs. 
diff --git a/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/operations/elastic/ElasticsearchController.java b/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/operations/elastic/ElasticsearchController.java
index 1fc6adb275..76edc186ef 100644
--- a/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/operations/elastic/ElasticsearchController.java
+++ b/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/operations/elastic/ElasticsearchController.java
@@ -13,7 +13,6 @@ import com.deblock.jsondiff.matcher.StrictPrimitivePartialMatcher;
 import com.deblock.jsondiff.viewer.PatchDiffViewer;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.linkedin.common.urn.Urn;
 import com.linkedin.common.urn.UrnUtils;
 import com.linkedin.metadata.authorization.PoliciesConfig;
 import com.linkedin.metadata.entity.EntityService;
@@ -32,16 +31,12 @@ import io.datahubproject.metadata.context.RequestContext;
 import io.datahubproject.openapi.util.ElasticsearchUtils;
 import io.swagger.v3.oas.annotations.Operation;
 import io.swagger.v3.oas.annotations.Parameter;
-import io.swagger.v3.oas.annotations.media.Content;
-import io.swagger.v3.oas.annotations.media.Schema;
-import io.swagger.v3.oas.annotations.responses.ApiResponse;
 import io.swagger.v3.oas.annotations.tags.Tag;
 import jakarta.servlet.http.HttpServletRequest;
 import java.net.URISyntaxException;
 import java.net.URLEncoder;
 import java.nio.charset.StandardCharsets;
 import java.util.List;
-import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
@@ -181,53 +176,6 @@ public class ElasticsearchController {
     return ResponseEntity.ok(j.toString());
   }
 
-  @PostMapping(path = "/entity/raw", produces = MediaType.APPLICATION_JSON_VALUE)
-  @Operation(
-      description =
-          "Retrieves raw Elasticsearch documents for the provided URNs. Requires MANAGE_SYSTEM_OPERATIONS_PRIVILEGE.",
-      responses = {
-        @ApiResponse(
-            responseCode = "200",
-            description = "Successfully retrieved raw documents",
-            content = @Content(mediaType = MediaType.APPLICATION_JSON_VALUE)),
-        @ApiResponse(
-            responseCode = "403",
-            description = "Caller not authorized to access raw documents"),
-        @ApiResponse(responseCode = "400", description = "Invalid URN format provided")
-      })
-  public ResponseEntity<Map<Urn, Map<String, Object>>> getEntityRaw(
-      HttpServletRequest request,
-      @RequestBody
-          @Nonnull
-          @Schema(
-              description = "Set of URN strings to fetch raw documents for",
-              example = "[\"urn:li:dataset:(urn:li:dataPlatform:hive,SampleTable,PROD)\"]")
-          Set<String> urnStrs) {
-
-    Set<Urn> urns = urnStrs.stream().map(UrnUtils::getUrn).collect(Collectors.toSet());
-
-    Authentication authentication = AuthenticationContext.getAuthentication();
-    String actorUrnStr = authentication.getActor().toUrnStr();
-    OperationContext opContext =
-        systemOperationContext.asSession(
-            RequestContext.builder()
-                .buildOpenapi(
-                    actorUrnStr,
-                    request,
-                    "getRawEntity",
-                    urns.stream().map(Urn::getEntityType).distinct().toList()),
-            authorizerChain,
-            authentication);
-
-    if (!AuthUtil.isAPIOperationsAuthorized(
-        opContext, PoliciesConfig.MANAGE_SYSTEM_OPERATIONS_PRIVILEGE)) {
-      log.error("{} is not authorized to get raw ES documents", actorUrnStr);
-      return ResponseEntity.status(HttpStatus.FORBIDDEN).body(null);
-    }
-
-    return ResponseEntity.ok(searchService.raw(opContext, urns));
-  }
-
   @GetMapping(path = "/explainSearchQuery", produces = MediaType.APPLICATION_JSON_VALUE)
   @Operation(summary = "Explain Search Query")
   public ResponseEntity<String> explainSearchQuery(
diff --git a/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/operations/elastic/ElasticsearchRawController.java b/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/operations/elastic/ElasticsearchRawController.java
new file mode 100644
index 0000000000..1211c5240f
--- /dev/null
+++ b/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/operations/elastic/ElasticsearchRawController.java
@@ -0,0 +1,259 @@
+package io.datahubproject.openapi.operations.elastic;
+
+import com.datahub.authentication.Authentication;
+import com.datahub.authentication.AuthenticationContext;
+import com.datahub.authorization.AuthUtil;
+import com.datahub.authorization.AuthorizerChain;
+import com.linkedin.common.urn.Urn;
+import com.linkedin.common.urn.UrnUtils;
+import com.linkedin.metadata.authorization.PoliciesConfig;
+import com.linkedin.metadata.graph.GraphService;
+import com.linkedin.metadata.search.EntitySearchService;
+import com.linkedin.metadata.systemmetadata.SystemMetadataService;
+import com.linkedin.metadata.timeseries.TimeseriesAspectService;
+import io.datahubproject.metadata.context.OperationContext;
+import io.datahubproject.metadata.context.RequestContext;
+import io.swagger.v3.oas.annotations.Operation;
+import io.swagger.v3.oas.annotations.media.Content;
+import io.swagger.v3.oas.annotations.media.Schema;
+import io.swagger.v3.oas.annotations.responses.ApiResponse;
+import io.swagger.v3.oas.annotations.tags.Tag;
+import jakarta.servlet.http.HttpServletRequest;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import javax.annotation.Nonnull;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.MediaType;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+@RestController
+@RequestMapping("/openapi/operations/elasticSearch")
+@Slf4j
+@Tag(name = "ElasticSearch Raw Operations", description = "An API for debugging raw ES documents")
+public class ElasticsearchRawController {
+  private final AuthorizerChain authorizerChain;
+  private final OperationContext systemOperationContext;
+  private final SystemMetadataService systemMetadataService;
+  private final TimeseriesAspectService timeseriesAspectService;
+  private final EntitySearchService searchService;
+  private final GraphService graphService;
+
+  public ElasticsearchRawController(
+      OperationContext systemOperationContext,
+      SystemMetadataService systemMetadataService,
+      TimeseriesAspectService timeseriesAspectService,
+      EntitySearchService searchService,
+      AuthorizerChain authorizerChain,
+      GraphService graphService) {
+    this.systemOperationContext = systemOperationContext;
+    this.authorizerChain = authorizerChain;
+    this.systemMetadataService = systemMetadataService;
+    this.timeseriesAspectService = timeseriesAspectService;
+    this.searchService = searchService;
+    this.graphService = graphService;
+  }
+
+  @PostMapping(path = "/entity/raw", produces = MediaType.APPLICATION_JSON_VALUE)
+  @Operation(
+      description =
+          "Retrieves raw Elasticsearch documents for the provided URNs. Requires MANAGE_SYSTEM_OPERATIONS_PRIVILEGE.",
+      responses = {
+        @ApiResponse(
+            responseCode = "200",
+            description = "Successfully retrieved raw documents",
+            content = @Content(mediaType = MediaType.APPLICATION_JSON_VALUE)),
+        @ApiResponse(
+            responseCode = "403",
+            description = "Caller not authorized to access raw documents"),
+        @ApiResponse(responseCode = "400", description = "Invalid URN format provided")
+      })
+  public ResponseEntity<Map<Urn, Map<String, Object>>> getEntityRaw(
+      HttpServletRequest request,
+      @RequestBody
+          @Nonnull
+          @Schema(
+              description = "Set of URN strings to fetch raw documents for",
+              example = "[\"urn:li:dataset:(urn:li:dataPlatform:hive,SampleTable,PROD)\"]")
+          Set<String> urnStrs) {
+
+    Set<Urn> urns = urnStrs.stream().map(UrnUtils::getUrn).collect(Collectors.toSet());
+
+    Authentication authentication = AuthenticationContext.getAuthentication();
+    String actorUrnStr = authentication.getActor().toUrnStr();
+    OperationContext opContext =
+        systemOperationContext.asSession(
+            RequestContext.builder()
+                .buildOpenapi(
+                    actorUrnStr,
+                    request,
+                    "getRawEntity",
+                    urns.stream().map(Urn::getEntityType).distinct().toList()),
+            authorizerChain,
+            authentication);
+
+    if (!AuthUtil.isAPIOperationsAuthorized(
+        opContext, PoliciesConfig.MANAGE_SYSTEM_OPERATIONS_PRIVILEGE)) {
+      log.error("{} is not authorized to get raw ES documents", actorUrnStr);
+      return ResponseEntity.status(HttpStatus.FORBIDDEN).body(null);
+    }
+
+    return ResponseEntity.ok(searchService.raw(opContext, urns));
+  }
+
+  @PostMapping(path = "/systemmetadata/raw", produces = MediaType.APPLICATION_JSON_VALUE)
+  @Operation(
+      description =
+          "Retrieves raw Elasticsearch system metadata document for the provided URN & aspect. Requires MANAGE_SYSTEM_OPERATIONS_PRIVILEGE.",
+      responses = {
+        @ApiResponse(
+            responseCode = "200",
+            description = "Successfully retrieved raw documents",
+            content = @Content(mediaType = MediaType.APPLICATION_JSON_VALUE)),
+        @ApiResponse(
+            responseCode = "403",
+            description = "Caller not authorized to access raw documents"),
+        @ApiResponse(responseCode = "400", description = "Invalid URN format provided")
+      })
+  public ResponseEntity<Map<Urn, Map<String, Map<String, Object>>>> getSystemMetadataRaw(
+      HttpServletRequest request,
+      @RequestBody
+          @Nonnull
+          @Schema(
+              description = "Map of URNs to aspect names to fetch raw documents for",
+              example =
+                  "{\"urn:li:dataset:(urn:li:dataPlatform:hive,SampleTable,PROD)\":[\"status\",\"datasetProperties\"]}")
+          Map<String, Set<String>> urnAspects) {
+
+    Set<Urn> urns = urnAspects.keySet().stream().map(UrnUtils::getUrn).collect(Collectors.toSet());
+
+    Authentication authentication = AuthenticationContext.getAuthentication();
+    String actorUrnStr = authentication.getActor().toUrnStr();
+    OperationContext opContext =
+        systemOperationContext.asSession(
+            RequestContext.builder()
+                .buildOpenapi(
+                    actorUrnStr,
+                    request,
+                    "getRawSystemMetadata",
+                    urns.stream().map(Urn::getEntityType).distinct().toList()),
+            authorizerChain,
+            authentication);
+
+    if (!AuthUtil.isAPIOperationsAuthorized(
+        opContext, PoliciesConfig.MANAGE_SYSTEM_OPERATIONS_PRIVILEGE)) {
+      log.error("{} is not authorized to get raw ES documents", actorUrnStr);
+      return ResponseEntity.status(HttpStatus.FORBIDDEN).body(null);
+    }
+
+    return ResponseEntity.ok(systemMetadataService.raw(opContext, urnAspects));
+  }
+
+  @PostMapping(path = "/timeseries/raw", produces = MediaType.APPLICATION_JSON_VALUE)
+  @Operation(
+      description =
+          "Retrieves raw Elasticsearch timeseries document for the provided URN & aspect. Requires MANAGE_SYSTEM_OPERATIONS_PRIVILEGE.",
+      responses = {
+        @ApiResponse(
+            responseCode = "200",
+            description = "Successfully retrieved raw documents",
+            content = @Content(mediaType = MediaType.APPLICATION_JSON_VALUE)),
+        @ApiResponse(
+            responseCode = "403",
+            description = "Caller not authorized to access raw documents"),
+        @ApiResponse(responseCode = "400", description = "Invalid URN format provided")
+      })
+  public ResponseEntity<Map<Urn, Map<String, Map<String, Object>>>> getTimeseriesRaw(
+      HttpServletRequest request,
+      @RequestBody
+          @Nonnull
+          @Schema(
+              description = "Map of URNs to aspect names to fetch raw documents for",
+              example =
+                  "{\"urn:li:dataset:(urn:li:dataPlatform:hive,SampleTable,PROD)\":[\"datasetProfile\"]}")
+          Map<String, Set<String>> urnAspects) {
+
+    Set<Urn> urns = urnAspects.keySet().stream().map(UrnUtils::getUrn).collect(Collectors.toSet());
+
+    Authentication authentication = AuthenticationContext.getAuthentication();
+    String actorUrnStr = authentication.getActor().toUrnStr();
+    OperationContext opContext =
+        systemOperationContext.asSession(
+            RequestContext.builder()
+                .buildOpenapi(
+                    actorUrnStr,
+                    request,
+                    "getRawTimeseries",
+                    urns.stream().map(Urn::getEntityType).distinct().toList()),
+            authorizerChain,
+            authentication);
+
+    if (!AuthUtil.isAPIOperationsAuthorized(
+        opContext, PoliciesConfig.MANAGE_SYSTEM_OPERATIONS_PRIVILEGE)) {
+      log.error("{} is not authorized to get raw ES documents", actorUrnStr);
+      return ResponseEntity.status(HttpStatus.FORBIDDEN).body(null);
+    }
+
+    return ResponseEntity.ok(timeseriesAspectService.raw(opContext, urnAspects));
+  }
+
+  @PostMapping(path = "/graph/raw", produces = MediaType.APPLICATION_JSON_VALUE)
+  @Operation(
+      description =
+          "Retrieves raw Elasticsearch graph edge document for the provided graph nodes and relationship type. Requires MANAGE_SYSTEM_OPERATIONS_PRIVILEGE.",
+      responses = {
+        @ApiResponse(
+            responseCode = "200",
+            description = "Successfully retrieved raw documents",
+            content = @Content(mediaType = MediaType.APPLICATION_JSON_VALUE)),
+        @ApiResponse(
+            responseCode = "403",
+            description = "Caller not authorized to access raw documents"),
+        @ApiResponse(responseCode = "400", description = "Invalid URN format provided")
+      })
+  public ResponseEntity<List<Map<String, Object>>> getGraphRaw(
+      HttpServletRequest request,
+      @RequestBody
+          @Nonnull
+          @Schema(
+              description =
+                  "List of node and relationship tuples. Non-directed, multiple edges may be returned.",
+              example =
+                  "[{\"a\":\"urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)\", \"b\": \"urn:li:dataset:(urn:li:dataPlatform:hive,logging_events,PROD)\", \"relationshipType\": \"DownstreamOf\"}]")
+          List<GraphService.EdgeTuple> edgeTuples) {
+
+    Set<Urn> urns =
+        edgeTuples.stream()
+            .flatMap(tuple -> Stream.of(tuple.getA(), tuple.getB()))
+            .map(UrnUtils::getUrn)
+            .collect(Collectors.toSet());
+
+    Authentication authentication = AuthenticationContext.getAuthentication();
+    String actorUrnStr = authentication.getActor().toUrnStr();
+    OperationContext opContext =
+        systemOperationContext.asSession(
+            RequestContext.builder()
+                .buildOpenapi(
+                    actorUrnStr,
+                    request,
+                    "getRawGraph",
+                    urns.stream().map(Urn::getEntityType).distinct().toList()),
+            authorizerChain,
+            authentication);
+
+    if (!AuthUtil.isAPIOperationsAuthorized(
+        opContext, PoliciesConfig.MANAGE_SYSTEM_OPERATIONS_PRIVILEGE)) {
+      log.error("{} is not authorized to get raw ES documents", actorUrnStr);
+      return ResponseEntity.status(HttpStatus.FORBIDDEN).body(null);
+    }
+
+    return ResponseEntity.ok(graphService.raw(opContext, edgeTuples));
+  }
+}
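For illustration, a client-side sketch of exercising the new endpoint over HTTP. The host, port, and token are placeholders, and the caller must hold MANAGE_SYSTEM_OPERATIONS_PRIVILEGE:

// Sketch only: POST an edge tuple to /openapi/operations/elasticSearch/graph/raw.
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

class RawEndpointClientSketch {
  public static void main(String[] args) throws Exception {
    String body =
        "[{\"a\":\"urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)\","
            + "\"b\":\"urn:li:dataset:(urn:li:dataPlatform:hive,logging_events,PROD)\","
            + "\"relationshipType\":\"DownstreamOf\"}]";
    HttpRequest request =
        HttpRequest.newBuilder()
            .uri(URI.create("http://localhost:8080/openapi/operations/elasticSearch/graph/raw"))
            .header("Content-Type", "application/json")
            .header("Authorization", "Bearer <personal-access-token>") // placeholder
            .POST(HttpRequest.BodyPublishers.ofString(body))
            .build();
    HttpResponse<String> response =
        HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
    System.out.println(response.statusCode() + ": " + response.body());
  }
}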
diff --git a/metadata-service/openapi-servlet/src/test/java/io/datahubproject/openapi/operations/elastic/ElasticsearchControllerTest.java b/metadata-service/openapi-servlet/src/test/java/io/datahubproject/openapi/operations/elastic/ElasticsearchControllerTest.java
index 15e02ecbde..1ad73da6e4 100644
--- a/metadata-service/openapi-servlet/src/test/java/io/datahubproject/openapi/operations/elastic/ElasticsearchControllerTest.java
+++ b/metadata-service/openapi-servlet/src/test/java/io/datahubproject/openapi/operations/elastic/ElasticsearchControllerTest.java
@@ -23,6 +23,7 @@ import com.linkedin.common.urn.Urn;
 import com.linkedin.common.urn.UrnUtils;
 import com.linkedin.metadata.entity.EntityService;
 import com.linkedin.metadata.entity.restoreindices.RestoreIndicesResult;
+import com.linkedin.metadata.graph.GraphService;
 import com.linkedin.metadata.models.registry.EntityRegistry;
 import com.linkedin.metadata.search.EntitySearchService;
 import com.linkedin.metadata.systemmetadata.SystemMetadataService;
@@ -395,6 +396,7 @@ public class ElasticsearchControllerTest extends AbstractTestNGSpringContextTests {
   @MockBean public TimeseriesAspectService timeseriesAspectService;
   @MockBean public EntitySearchService searchService;
   @MockBean public EntityService entityService;
+  @MockBean public GraphService graphService;
 
   @Bean
   public ObjectMapper objectMapper() {
@@ -443,5 +445,11 @@ public class ElasticsearchControllerTest extends AbstractTestNGSpringContextTests {
       return authorizerChain;
     }
+
+    @Bean
+    @Primary
+    public GraphService graphService() {
+      return graphService;
+    }
   }
 }
diff --git a/metadata-service/openapi-servlet/src/test/java/io/datahubproject/openapi/operations/elastic/ElasticsearchRawControllerTest.java b/metadata-service/openapi-servlet/src/test/java/io/datahubproject/openapi/operations/elastic/ElasticsearchRawControllerTest.java
new file mode 100644
index 0000000000..2477a31c29
--- /dev/null
+++ b/metadata-service/openapi-servlet/src/test/java/io/datahubproject/openapi/operations/elastic/ElasticsearchRawControllerTest.java
@@ -0,0 +1,363 @@
+package io.datahubproject.openapi.operations.elastic;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyList;
+import static org.mockito.ArgumentMatchers.anyMap;
+import static org.mockito.ArgumentMatchers.anySet;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
+import static org.testng.Assert.assertNotNull;
+
+import com.datahub.authentication.Actor;
+import com.datahub.authentication.ActorType;
+import com.datahub.authentication.Authentication;
+import com.datahub.authentication.AuthenticationContext;
+import com.datahub.authorization.AuthUtil;
+import com.datahub.authorization.AuthorizationResult;
+import com.datahub.authorization.AuthorizerChain;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.linkedin.common.urn.Urn;
+import com.linkedin.common.urn.UrnUtils;
+import com.linkedin.metadata.graph.GraphService;
+import com.linkedin.metadata.search.EntitySearchService;
+import com.linkedin.metadata.systemmetadata.SystemMetadataService;
+import com.linkedin.metadata.timeseries.TimeseriesAspectService;
+import io.datahubproject.metadata.context.OperationContext;
+import io.datahubproject.openapi.config.GlobalControllerExceptionHandler;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc;
+import org.springframework.boot.test.autoconfigure.web.servlet.WebMvcTest;
+import org.springframework.context.annotation.Import;
+import org.springframework.http.MediaType;
+import org.springframework.test.context.ContextConfiguration;
+import org.springframework.test.context.testng.AbstractTestNGSpringContextTests;
+import org.springframework.test.web.servlet.MockMvc;
+import org.springframework.test.web.servlet.request.MockMvcRequestBuilders;
+import org.springframework.test.web.servlet.result.MockMvcResultMatchers;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@WebMvcTest(ElasticsearchRawController.class)
+@ContextConfiguration(classes = ElasticsearchControllerTest.ElasticsearchControllerTestConfig.class)
+@Import({AuthUtil.class, GlobalControllerExceptionHandler.class})
+@AutoConfigureMockMvc
+public class ElasticsearchRawControllerTest extends AbstractTestNGSpringContextTests {
+
+  private static final Urn TEST_URN_1 =
+      UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:hive,SampleTable,PROD)");
+  private static final Urn TEST_URN_2 =
+      UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:snowflake,test.table,PROD)");
+  private static final Urn TEST_URN_3 =
+      UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:hdfs,/path/to/data,PROD)");
+
+  @Autowired private ElasticsearchRawController elasticsearchRawController;
+
+  @Autowired private MockMvc mockMvc;
+
+  @Autowired private SystemMetadataService mockSystemMetadataService;
+
+  @Autowired private TimeseriesAspectService mockTimeseriesAspectService;
+
+  @Autowired private EntitySearchService mockSearchService;
+
+  @Autowired private GraphService mockGraphService;
+
+  @Autowired private AuthorizerChain authorizerChain;
+
+  @BeforeMethod
+  public void setupMocks() {
+    // Setup Authentication
+    Authentication authentication = mock(Authentication.class);
+    when(authentication.getActor()).thenReturn(new Actor(ActorType.USER, "datahub"));
+    AuthenticationContext.setAuthentication(authentication);
+
+    // Setup AuthorizerChain to allow access
+    when(authorizerChain.authorize(any()))
+        .thenReturn(new AuthorizationResult(null, AuthorizationResult.Type.ALLOW, ""));
+  }
+
+  @Test
+  public void initTest() {
+    assertNotNull(elasticsearchRawController);
+  }
+
+  @Test
+  public void testGetEntityRaw() throws Exception {
+    // Mock raw entity response
+    Map<Urn, Map<String, Object>> rawEntityMap = new HashMap<>();
+
+    Map<String, Object> entity1Map = new HashMap<>();
+    entity1Map.put("urn", TEST_URN_1.toString());
+    entity1Map.put("name", "Sample Table");
+    entity1Map.put("platform", "hive");
+    entity1Map.put("_index", "datahub_entity_v2");
+    entity1Map.put("_id", TEST_URN_1.toString());
+
+    Map<String, Object> entity2Map = new HashMap<>();
+    entity2Map.put("urn", TEST_URN_2.toString());
+    entity2Map.put("name", "Test Table");
+    entity2Map.put("platform", "snowflake");
+    entity2Map.put("_index", "datahub_entity_v2");
+    entity2Map.put("_id", TEST_URN_2.toString());
+
+    rawEntityMap.put(TEST_URN_1, entity1Map);
+    rawEntityMap.put(TEST_URN_2, entity2Map);
+
+    when(mockSearchService.raw(any(OperationContext.class), anySet())).thenReturn(rawEntityMap);
+
+    // Prepare request body
+    Set<String> urnStrs = new HashSet<>();
+    urnStrs.add(TEST_URN_1.toString());
+    urnStrs.add(TEST_URN_2.toString());
+
+    ObjectMapper objectMapper = new ObjectMapper();
+    String requestBody = objectMapper.writeValueAsString(urnStrs);
+
+    // Test the endpoint
+    mockMvc
+        .perform(
+            MockMvcRequestBuilders.post("/openapi/operations/elasticSearch/entity/raw")
+                .content(requestBody)
+                .contentType(MediaType.APPLICATION_JSON)
+                .accept(MediaType.APPLICATION_JSON))
+        .andExpect(status().isOk())
+        .andDo(
+            result -> {
+              String responseContent = result.getResponse().getContentAsString();
+              System.out.println("Response content: " + responseContent);
+            })
+        .andExpect(
+            MockMvcResultMatchers.jsonPath("$['" + TEST_URN_1.toString() + "'].name")
+                .value("Sample Table"))
+        .andExpect(
+            MockMvcResultMatchers.jsonPath("$['" + TEST_URN_1.toString() + "'].platform")
+                .value("hive"))
+        .andExpect(
+            MockMvcResultMatchers.jsonPath("$['" + TEST_URN_2.toString() + "'].name")
+                .value("Test Table"))
+        .andExpect(
+            MockMvcResultMatchers.jsonPath("$['" + TEST_URN_2.toString() + "'].platform")
+                .value("snowflake"));
+  }
+
+  @Test
+  public void testGetEntityRawUnauthorized() throws Exception {
+    // Setup AuthorizerChain to deny access
+    when(authorizerChain.authorize(any()))
+        .thenReturn(new AuthorizationResult(null, AuthorizationResult.Type.DENY, ""));
+
+    // Prepare request body
+    Set<String> urnStrs = new HashSet<>();
+    urnStrs.add(TEST_URN_1.toString());
+
+    ObjectMapper objectMapper = new ObjectMapper();
+    String requestBody = objectMapper.writeValueAsString(urnStrs);
+
+    // Test the endpoint - should return 403
+    mockMvc
+        .perform(
+            MockMvcRequestBuilders.post("/openapi/operations/elasticSearch/entity/raw")
+                .content(requestBody)
+                .contentType(MediaType.APPLICATION_JSON)
+                .accept(MediaType.APPLICATION_JSON))
+        .andExpect(status().isForbidden());
+  }
+
+  @Test
+  public void testGetSystemMetadataRaw() throws Exception {
+    // Mock raw system metadata response
+    Map<Urn, Map<String, Map<String, Object>>> rawSystemMetadataMap = new HashMap<>();
+
+    Map<String, Map<String, Object>> urn1Aspects = new HashMap<>();
+    Map<String, Object> statusAspect = new HashMap<>();
+    statusAspect.put("removed", false);
+    statusAspect.put("_index", "system_metadata_service_v1");
+    statusAspect.put("_id", TEST_URN_1.toString() + "_status");
+    urn1Aspects.put("status", statusAspect);
+
+    Map<String, Object> propertiesAspect = new HashMap<>();
+    propertiesAspect.put("description", "Sample dataset");
+    propertiesAspect.put("_index", "system_metadata_service_v1");
+    propertiesAspect.put("_id", TEST_URN_1.toString() + "_datasetProperties");
+    urn1Aspects.put("datasetProperties", propertiesAspect);
+
+    rawSystemMetadataMap.put(TEST_URN_1, urn1Aspects);
+
+    when(mockSystemMetadataService.raw(any(OperationContext.class), anyMap()))
+        .thenReturn(rawSystemMetadataMap);
+
+    // Prepare request body
+    Map<String, Set<String>> urnAspects = new HashMap<>();
+    Set<String> aspects = new HashSet<>();
+    aspects.add("status");
+    aspects.add("datasetProperties");
+    urnAspects.put(TEST_URN_1.toString(), aspects);
+
+    ObjectMapper objectMapper = new ObjectMapper();
+    String requestBody = objectMapper.writeValueAsString(urnAspects);
+
+    // Test the endpoint
+    mockMvc
+        .perform(
+            MockMvcRequestBuilders.post("/openapi/operations/elasticSearch/systemmetadata/raw")
+                .content(requestBody)
+                .contentType(MediaType.APPLICATION_JSON)
+                .accept(MediaType.APPLICATION_JSON))
+        .andExpect(status().isOk())
+        .andDo(
+            result -> {
+              String responseContent = result.getResponse().getContentAsString();
+              System.out.println("Response content: " + responseContent);
+            })
+        .andExpect(
+            MockMvcResultMatchers.jsonPath("$['" + TEST_URN_1.toString() + "'].status.removed")
+                .value(false))
+        .andExpect(
+            MockMvcResultMatchers.jsonPath(
+                    "$['" + TEST_URN_1.toString() + "'].datasetProperties.description")
+                .value("Sample dataset"));
+  }
+
+  @Test
+  public void testGetTimeseriesRaw() throws Exception {
+    // Mock raw timeseries response
+    Map<Urn, Map<String, Map<String, Object>>> rawTimeseriesMap = new HashMap<>();
+
+    Map<String, Map<String, Object>> urn1Aspects = new HashMap<>();
+    Map<String, Object> profileAspect = new HashMap<>();
+    profileAspect.put("timestampMillis", 1234567890L);
+    profileAspect.put("rowCount", 1000000L);
+    profileAspect.put("columnCount", 25);
+    profileAspect.put("_index", "dataset_datasetprofileaspect_v1");
+    profileAspect.put("_id", TEST_URN_1.toString() + "_datasetProfile_1234567890");
+    urn1Aspects.put("datasetProfile", profileAspect);
+
+    rawTimeseriesMap.put(TEST_URN_1, urn1Aspects);
+
+    when(mockTimeseriesAspectService.raw(any(OperationContext.class), anyMap()))
+        .thenReturn(rawTimeseriesMap);
+
+    // Prepare request body
+    Map<String, Set<String>> urnAspects = new HashMap<>();
+    Set<String> aspects = new HashSet<>();
+    aspects.add("datasetProfile");
+    urnAspects.put(TEST_URN_1.toString(), aspects);
+
+    ObjectMapper objectMapper = new ObjectMapper();
+    String requestBody = objectMapper.writeValueAsString(urnAspects);
+
+    // Test the endpoint
+    mockMvc
+        .perform(
+            MockMvcRequestBuilders.post("/openapi/operations/elasticSearch/timeseries/raw")
+                .content(requestBody)
+                .contentType(MediaType.APPLICATION_JSON)
+                .accept(MediaType.APPLICATION_JSON))
+        .andExpect(status().isOk())
+        .andDo(
+            result -> {
+              String responseContent = result.getResponse().getContentAsString();
+              System.out.println("Response content: " + responseContent);
+            })
+        .andExpect(
+            MockMvcResultMatchers.jsonPath(
+                    "$['" + TEST_URN_1.toString() + "'].datasetProfile.timestampMillis")
+                .value(1234567890L))
+        .andExpect(
+            MockMvcResultMatchers.jsonPath(
+                    "$['" + TEST_URN_1.toString() + "'].datasetProfile.rowCount")
+                .value(1000000L))
+        .andExpect(
+            MockMvcResultMatchers.jsonPath(
+                    "$['" + TEST_URN_1.toString() + "'].datasetProfile.columnCount")
+                .value(25));
+  }
+
+  @Test
+  public void testGetGraphRaw() throws Exception {
+    // Mock raw graph response
+    List<Map<String, Object>> rawGraphList = new ArrayList<>();
+
+    Map<String, Object> edge1 = new HashMap<>();
+    edge1.put("source", TEST_URN_1.toString());
+    edge1.put("destination", TEST_URN_2.toString());
+    edge1.put("relationshipType", "DownstreamOf");
+    edge1.put("_index", "graph_service_v1");
+    edge1.put("_id", "edge1_id");
+
+    Map<String, Object> edge2 = new HashMap<>();
+    edge2.put("source", TEST_URN_2.toString());
+    edge2.put("destination", TEST_URN_3.toString());
+    edge2.put("relationshipType", "DownstreamOf");
+    edge2.put("_index", "graph_service_v1");
+    edge2.put("_id", "edge2_id");
+
+    rawGraphList.add(edge1);
+    rawGraphList.add(edge2);
+
+    when(mockGraphService.raw(any(OperationContext.class), anyList())).thenReturn(rawGraphList);
+
+    // Prepare request body
+    List<GraphService.EdgeTuple> edgeTuples = new ArrayList<>();
+    GraphService.EdgeTuple tuple1 = new GraphService.EdgeTuple();
+    tuple1.setA(TEST_URN_1.toString());
+    tuple1.setB(TEST_URN_2.toString());
+    tuple1.setRelationshipType("DownstreamOf");
+    edgeTuples.add(tuple1);
+
+    GraphService.EdgeTuple tuple2 = new GraphService.EdgeTuple();
+    tuple2.setA(TEST_URN_2.toString());
+    tuple2.setB(TEST_URN_3.toString());
+    tuple2.setRelationshipType("DownstreamOf");
+    edgeTuples.add(tuple2);
+
+    ObjectMapper objectMapper = new ObjectMapper();
+    String requestBody = objectMapper.writeValueAsString(edgeTuples);
+
+    // Test the endpoint
+    mockMvc
+        .perform(
+            MockMvcRequestBuilders.post("/openapi/operations/elasticSearch/graph/raw")
+                .content(requestBody)
+                .contentType(MediaType.APPLICATION_JSON)
+                .accept(MediaType.APPLICATION_JSON))
+        .andExpect(status().isOk())
+        .andDo(
+            result -> {
+              String responseContent = result.getResponse().getContentAsString();
+              System.out.println("Response content: " + responseContent);
+            })
+        .andExpect(MockMvcResultMatchers.jsonPath("$").isArray())
+        .andExpect(MockMvcResultMatchers.jsonPath("$.length()").value(2))
+        .andExpect(MockMvcResultMatchers.jsonPath("$[0].source").value(TEST_URN_1.toString()))
+        .andExpect(MockMvcResultMatchers.jsonPath("$[0].destination").value(TEST_URN_2.toString()))
+        .andExpect(MockMvcResultMatchers.jsonPath("$[0].relationshipType").value("DownstreamOf"))
+        .andExpect(MockMvcResultMatchers.jsonPath("$[1].source").value(TEST_URN_2.toString()))
+        .andExpect(MockMvcResultMatchers.jsonPath("$[1].destination").value(TEST_URN_3.toString()));
+  }
+
+  @Test
+  public void testInvalidUrnFormatWithProperExceptionHandling() throws Exception {
+    Set<String> urnStrs = new HashSet<>();
+    urnStrs.add("invalid:urn:format");
+
+    ObjectMapper objectMapper = new ObjectMapper();
+    String requestBody = objectMapper.writeValueAsString(urnStrs);
+
+    mockMvc
+        .perform(
+            MockMvcRequestBuilders.post("/openapi/operations/elasticSearch/entity/raw")
+                .content(requestBody)
+                .contentType(MediaType.APPLICATION_JSON)
+                .accept(MediaType.APPLICATION_JSON))
+        .andExpect(status().isBadRequest())
+        .andExpect(MockMvcResultMatchers.jsonPath("$.error").exists());
+  }
+}
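One caveat in the test class above: setupMocks() installs a mocked Authentication into the thread-local AuthenticationContext but nothing clears it afterwards. A possible cleanup, sketched here as a fragment to drop into the test class (with import org.testng.annotations.AfterMethod) and explicitly not part of this patch; it assumes AuthenticationContext exposes a remove() helper, as other DataHub test classes use:

// Hypothetical addition to ElasticsearchRawControllerTest, not included in this patch.
// Clears the thread-local written by setupMocks() so the mocked credentials cannot
// leak into other tests that run on the same TestNG worker thread.
@AfterMethod
public void clearAuthentication() {
  AuthenticationContext.remove(); // assumes this helper exists on AuthenticationContext
}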
diff --git a/metadata-service/restli-servlet-impl/src/test/java/mock/MockTimeseriesAspectService.java b/metadata-service/restli-servlet-impl/src/test/java/mock/MockTimeseriesAspectService.java
index 8a0c368430..051f31004c 100644
--- a/metadata-service/restli-servlet-impl/src/test/java/mock/MockTimeseriesAspectService.java
+++ b/metadata-service/restli-servlet-impl/src/test/java/mock/MockTimeseriesAspectService.java
@@ -156,4 +156,10 @@ public class MockTimeseriesAspectService implements TimeseriesAspectService {
       @Nullable Long endTimeMillis) {
     return TimeseriesScrollResult.builder().build();
   }
+
+  @Override
+  public Map<Urn, Map<String, Map<String, Object>>> raw(
+      OperationContext opContext, Map<String, Set<String>> urnAspects) {
+    return Map.of();
+  }
 }
diff --git a/metadata-service/services/src/main/java/com/linkedin/metadata/graph/GraphService.java b/metadata-service/services/src/main/java/com/linkedin/metadata/graph/GraphService.java
index dc5f7abca4..8701d754ef 100644
--- a/metadata-service/services/src/main/java/com/linkedin/metadata/graph/GraphService.java
+++ b/metadata-service/services/src/main/java/com/linkedin/metadata/graph/GraphService.java
@@ -12,9 +12,13 @@ import com.linkedin.metadata.query.filter.RelationshipFilter;
 import com.linkedin.metadata.query.filter.SortCriterion;
 import io.datahubproject.metadata.context.OperationContext;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
 
 public interface GraphService {
 
@@ -259,4 +263,22 @@ public interface GraphService {
       @Nullable Integer count,
       @Nullable Long startTimeMillis,
       @Nullable Long endTimeMillis);
+
+  /**
+   * Returns list of edge documents for the given graph node and relationship tuples (non-directed).
+   *
+   * @param opContext operation context
+   * @param edgeTuples non-directed nodes and relationship types
+   * @return list of documents matching the input criteria
+   */
+  List<Map<String, Object>> raw(OperationContext opContext, List<EdgeTuple> edgeTuples);
+
+  @AllArgsConstructor
+  @NoArgsConstructor
+  @Data
+  class EdgeTuple {
+    String a;
+    String b;
+    String relationshipType;
+  }
 }
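Since EdgeTuple carries Lombok's @AllArgsConstructor, @NoArgsConstructor, and @Data, callers can build tuples either with setters (as the tests above do) or in one shot. A brief sketch of the caller side of the new contract; the graphService and opContext instances are assumed to come from the caller's environment and are not defined in this patch:

import com.linkedin.metadata.graph.GraphService;
import io.datahubproject.metadata.context.OperationContext;
import java.util.List;
import java.util.Map;

public class EdgeTupleUsageSketch {
  // Illustrative only: graphService and opContext are assumed to be supplied
  // by the caller's environment (e.g. Spring wiring).
  static List<Map<String, Object>> downstreamEdges(
      GraphService graphService, OperationContext opContext) {
    // Lombok's @AllArgsConstructor generates this three-argument constructor;
    // @Data supplies the getters and setters used elsewhere in the patch.
    GraphService.EdgeTuple tuple =
        new GraphService.EdgeTuple(
            "urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)",
            "urn:li:dataset:(urn:li:dataPlatform:hive,logging_events,PROD)",
            "DownstreamOf");
    // The lookup is non-directed, so edges stored as (a -> b) or (b -> a) with
    // the given relationship type may both be returned.
    return graphService.raw(opContext, List.of(tuple));
  }
}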
diff --git a/metadata-service/services/src/main/java/com/linkedin/metadata/systemmetadata/SystemMetadataService.java b/metadata-service/services/src/main/java/com/linkedin/metadata/systemmetadata/SystemMetadataService.java
index 63df67f208..4f9334b16c 100644
--- a/metadata-service/services/src/main/java/com/linkedin/metadata/systemmetadata/SystemMetadataService.java
+++ b/metadata-service/services/src/main/java/com/linkedin/metadata/systemmetadata/SystemMetadataService.java
@@ -5,9 +5,11 @@ import com.linkedin.metadata.config.SystemMetadataServiceConfig;
 import com.linkedin.metadata.run.AspectRowSummary;
 import com.linkedin.metadata.run.IngestionRunSummary;
 import com.linkedin.mxe.SystemMetadata;
+import io.datahubproject.metadata.context.OperationContext;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import org.opensearch.client.tasks.GetTaskResponse;
@@ -60,4 +62,14 @@ public interface SystemMetadataService {
   default void configure() {}
 
   void clear();
+
+  /**
+   * Returns raw Elasticsearch documents.
+   *
+   * @param opContext operation context
+   * @param urnAspects the map of urns to aspect names
+   * @return map of urn to aspect name to raw Elasticsearch document
+   */
+  Map<Urn, Map<String, Map<String, Object>>> raw(
+      OperationContext opContext, Map<String, Set<String>> urnAspects);
 }
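A small sketch of the urnAspects input this contract expects, mirroring the request shape exercised by the controller tests above; the service and opContext instances are assumed to come from the caller's environment:

import com.linkedin.common.urn.Urn;
import com.linkedin.metadata.systemmetadata.SystemMetadataService;
import io.datahubproject.metadata.context.OperationContext;
import java.util.Map;
import java.util.Set;

public class SystemMetadataRawSketch {
  // Illustrative only: service and opContext are assumed to be supplied by the caller.
  static void printRawAspects(SystemMetadataService service, OperationContext opContext) {
    // Input keys are urn strings; values are the aspect names whose raw documents we want.
    Map<String, Set<String>> urnAspects =
        Map.of(
            "urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)",
            Set.of("status", "datasetProperties"));
    // Output is keyed by parsed Urn, then aspect name, then raw document fields
    // (including ES metadata such as _index and _id, per the tests above).
    Map<Urn, Map<String, Map<String, Object>>> raw = service.raw(opContext, urnAspects);
    raw.forEach((urn, aspects) -> System.out.println(urn + " -> " + aspects.keySet()));
  }
}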
diff --git a/metadata-service/services/src/main/java/com/linkedin/metadata/timeseries/TimeseriesAspectService.java b/metadata-service/services/src/main/java/com/linkedin/metadata/timeseries/TimeseriesAspectService.java
index 242210ee3c..aaf978d096 100644
--- a/metadata-service/services/src/main/java/com/linkedin/metadata/timeseries/TimeseriesAspectService.java
+++ b/metadata-service/services/src/main/java/com/linkedin/metadata/timeseries/TimeseriesAspectService.java
@@ -249,4 +249,14 @@ public interface TimeseriesAspectService {
       @Nullable Integer count,
       @Nullable Long startTimeMillis,
       @Nullable Long endTimeMillis);
+
+  /**
+   * Returns the latest raw timeseries document per requested aspect.
+   *
+   * @param opContext operation context
+   * @param urnAspects map of urn to timeseries aspect names to retrieve
+   * @return the raw ES documents
+   */
+  Map<Urn, Map<String, Map<String, Object>>> raw(
+      OperationContext opContext, Map<String, Set<String>> urnAspects);
 }
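Finally, a sketch of the timeseries contract. Note the javadoc's "latest" wording: each requested aspect maps to the single most recent raw document, not a full history. The service and opContext are again assumed to be supplied by the caller, and the "rowCount" field name follows the datasetProfile document used in the tests above:

import com.linkedin.common.urn.Urn;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.metadata.timeseries.TimeseriesAspectService;
import io.datahubproject.metadata.context.OperationContext;
import java.util.Map;
import java.util.Set;

public class TimeseriesRawSketch {
  // Illustrative only: service and opContext come from the caller's environment.
  static Object latestRowCount(TimeseriesAspectService service, OperationContext opContext) {
    String urnStr = "urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)";
    Map<Urn, Map<String, Map<String, Object>>> raw =
        service.raw(opContext, Map.of(urnStr, Set.of("datasetProfile")));
    // Each aspect name maps to its latest raw ES document only.
    Urn urn = UrnUtils.getUrn(urnStr);
    return raw.getOrDefault(urn, Map.of())
        .getOrDefault("datasetProfile", Map.of())
        .get("rowCount");
  }
}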