Mirror of https://github.com/datahub-project/datahub.git, synced 2025-06-27 05:03:31 +00:00
feat(operations): add es raw operations endpoints (#13855)
parent 83b9eca358
commit 677182daf7
@@ -579,6 +579,24 @@ public class ESGraphQueryDAO {
    }
  }

  /**
   * Executes a search request against the graph index. This method is exposed for use by other
   * graph service methods that need direct search access.
   *
   * @param searchRequest The search request to execute
   * @return The search response from Elasticsearch
   * @throws ESQueryException if the search fails
   */
  SearchResponse executeSearch(@Nonnull SearchRequest searchRequest) {
    try {
      MetricUtils.counter(this.getClass(), SEARCH_EXECUTIONS_METRIC).inc();
      return client.search(searchRequest, RequestOptions.DEFAULT);
    } catch (Exception e) {
      log.error("Search query failed", e);
      throw new ESQueryException("Search query failed:", e);
    }
  }

  @VisibleForTesting
  public QueryBuilder getLineageQuery(
      @Nonnull OperationContext opContext,
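A minimal usage sketch for the new executeSearch helper, assuming a caller living in the same package (the method is package-private); the index name and match-all query here are illustrative, not part of the commit:

import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.search.builder.SearchSourceBuilder;

class ExecuteSearchUsageSketch {
  // Routes an ad-hoc match-all query through executeSearch so the execution
  // counter is incremented and failures surface as ESQueryException.
  static SearchResponse matchAll(ESGraphQueryDAO dao, String indexName) {
    SearchRequest request = new SearchRequest(indexName);
    request.source(new SearchSourceBuilder().query(QueryBuilders.matchAllQuery()));
    return dao.executeSearch(request);
  }
}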
@@ -44,6 +44,7 @@ import com.linkedin.util.Pair;
import io.datahubproject.metadata.context.OperationContext;
import io.opentelemetry.instrumentation.annotations.WithSpan;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
@@ -57,12 +58,16 @@ import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.index.query.BoolQueryBuilder;
import org.opensearch.index.query.QueryBuilder;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.script.Script;
import org.opensearch.script.ScriptType;
import org.opensearch.search.SearchHit;
import org.opensearch.search.SearchHits;
import org.opensearch.search.builder.SearchSourceBuilder;

@Slf4j
@RequiredArgsConstructor
@@ -353,6 +358,100 @@ public class ElasticSearchGraphService implements GraphService, ElasticSearchInd
        .build();
  }

  /**
   * Returns list of edge documents for the given graph node and relationship tuples. Non-directed.
   *
   * @param opContext operation context
   * @param edgeTuples Non-directed nodes and relationship types
   * @return list of documents matching the input criteria
   */
  @Override
  public List<Map<String, Object>> raw(OperationContext opContext, List<EdgeTuple> edgeTuples) {

    if (edgeTuples == null || edgeTuples.isEmpty()) {
      return Collections.emptyList();
    }

    List<Map<String, Object>> results = new ArrayList<>();

    // Build a single query for all edge tuples
    BoolQueryBuilder mainQuery = QueryBuilders.boolQuery();

    // For each edge tuple, create a query that matches edges in either direction
    for (EdgeTuple tuple : edgeTuples) {
      if (tuple.getA() == null || tuple.getB() == null || tuple.getRelationshipType() == null) {
        continue;
      }

      // Create a query for this specific edge tuple (non-directed)
      BoolQueryBuilder tupleQuery = QueryBuilders.boolQuery();

      // Match relationship type
      tupleQuery.filter(
          QueryBuilders.termQuery(EDGE_FIELD_RELNSHIP_TYPE, tuple.getRelationshipType()));

      // Match nodes in either direction: (a->b) OR (b->a)
      BoolQueryBuilder directionQuery = QueryBuilders.boolQuery();

      // Direction 1: a is source, b is destination
      BoolQueryBuilder direction1 = QueryBuilders.boolQuery();
      direction1.filter(QueryBuilders.termQuery(EDGE_FIELD_SOURCE + ".urn", tuple.getA()));
      direction1.filter(QueryBuilders.termQuery(EDGE_FIELD_DESTINATION + ".urn", tuple.getB()));

      // Direction 2: b is source, a is destination
      BoolQueryBuilder direction2 = QueryBuilders.boolQuery();
      direction2.filter(QueryBuilders.termQuery(EDGE_FIELD_SOURCE + ".urn", tuple.getB()));
      direction2.filter(QueryBuilders.termQuery(EDGE_FIELD_DESTINATION + ".urn", tuple.getA()));

      // Either direction is acceptable
      directionQuery.should(direction1);
      directionQuery.should(direction2);
      directionQuery.minimumShouldMatch(1);

      // Combine relationship type and direction queries
      tupleQuery.filter(directionQuery);

      // Add this tuple query as a "should" clause (OR condition)
      mainQuery.should(tupleQuery);
    }

    // At least one of the edge tuples must match
    mainQuery.minimumShouldMatch(1);

    // Build search request
    SearchRequest searchRequest = new SearchRequest();
    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();

    searchSourceBuilder.query(mainQuery);
    // Set a reasonable size limit - adjust based on expected number of edges
    searchSourceBuilder.size(getGraphServiceConfig().getLimit().getResults().getApiDefault());

    searchRequest.source(searchSourceBuilder);
    searchRequest.indices(indexConvention.getIndexName(INDEX_NAME));

    // Execute search using the graphReadDAO's search client
    SearchResponse searchResponse = graphReadDAO.executeSearch(searchRequest);
    SearchHits hits = searchResponse.getHits();

    // Process each hit
    for (SearchHit hit : hits.getHits()) {
      Map<String, Object> sourceMap = hit.getSourceAsMap();

      results.add(sourceMap);
    }

    // Log if we hit the size limit
    if (hits.getTotalHits() != null && hits.getTotalHits().value > hits.getHits().length) {
      log.warn(
          "Total hits {} exceeds returned size {}. Some edges may be missing. "
              + "Consider implementing pagination or increasing the size limit.",
          hits.getTotalHits().value,
          hits.getHits().length);
    }

    return results;
  }

  private static List<RelatedEntities> searchHitsToRelatedEntities(
      SearchHit[] searchHits, RelationshipDirection relationshipDirection) {
    return Arrays.stream(searchHits)
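A minimal caller sketch for the new non-directed raw() lookup; the URNs and relationship type are taken from the controller's @Schema example later in this commit, and the graphService/opContext wiring is assumed:

import com.linkedin.metadata.graph.GraphService;
import com.linkedin.metadata.graph.GraphService.EdgeTuple;
import io.datahubproject.metadata.context.OperationContext;
import java.util.List;
import java.util.Map;

class GraphRawUsageSketch {
  // Fetches raw edge documents for one dataset pair; direction is ignored,
  // so (a -> b) and (b -> a) edges both match.
  static List<Map<String, Object>> fetchEdges(
      GraphService graphService, OperationContext opContext) {
    List<EdgeTuple> tuples =
        List.of(
            new EdgeTuple(
                "urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)",
                "urn:li:dataset:(urn:li:dataPlatform:hive,logging_events,PROD)",
                "DownstreamOf"));
    return graphService.raw(opContext, tuples);
  }
}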
@@ -1072,4 +1072,9 @@ public class Neo4jGraphService implements GraphService {
        .scrollId(nextScrollId)
        .build();
  }

  @Override
  public List<Map<String, Object>> raw(OperationContext opContext, List<EdgeTuple> edgeTuples) {
    return List.of();
  }
}
@@ -6,6 +6,7 @@ import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.ImmutableMap;
import com.linkedin.common.urn.Urn;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.data.template.SetMode;
import com.linkedin.metadata.config.SystemMetadataServiceConfig;
import com.linkedin.metadata.run.AspectRowSummary;
@@ -18,10 +19,12 @@ import com.linkedin.metadata.utils.elasticsearch.IndexConvention;
import com.linkedin.mxe.SystemMetadata;
import com.linkedin.structured.StructuredPropertyDefinition;
import com.linkedin.util.Pair;
import io.datahubproject.metadata.context.OperationContext;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.Collection;
@@ -268,6 +271,79 @@ public class ElasticSearchSystemMetadataService
        QueryBuilders.matchAllQuery(), true, _indexConvention.getIndexName(INDEX_NAME));
  }

  @Override
  public Map<Urn, Map<String, Map<String, Object>>> raw(
      OperationContext opContext, Map<String, Set<String>> urnAspects) {

    if (urnAspects == null || urnAspects.isEmpty()) {
      return Collections.emptyMap();
    }

    Map<Urn, Map<String, Map<String, Object>>> result = new HashMap<>();

    // Build a list of all document IDs we need to fetch
    List<String> docIds = new ArrayList<>();
    for (Map.Entry<String, Set<String>> entry : urnAspects.entrySet()) {
      String urnString = entry.getKey();
      Set<String> aspects = entry.getValue();

      if (aspects != null && !aspects.isEmpty()) {
        for (String aspect : aspects) {
          docIds.add(toDocId(urnString, aspect));
        }
      }
    }

    if (docIds.isEmpty()) {
      return Collections.emptyMap();
    }

    // Query for all documents by their IDs
    BoolQueryBuilder query = QueryBuilders.boolQuery();
    query.filter(QueryBuilders.idsQuery().addIds(docIds.toArray(new String[0])));

    // Use scroll to retrieve all matching documents
    SearchResponse searchResponse =
        _esDAO.scroll(
            query,
            true,
            null, // scrollId
            null, // pitId
            null, // keepAlive
            systemMetadataServiceConfig.getLimit().getResults().getApiDefault());

    if (searchResponse != null && searchResponse.getHits() != null) {
      SearchHits hits = searchResponse.getHits();

      // Process each hit
      Arrays.stream(hits.getHits())
          .forEach(
              hit -> {
                Map<String, Object> sourceMap = hit.getSourceAsMap();
                String urnString = (String) sourceMap.get(FIELD_URN);
                String aspectName = (String) sourceMap.get(FIELD_ASPECT);

                if (urnString != null && aspectName != null) {
                  try {
                    Urn urn = UrnUtils.getUrn(urnString);

                    // Get or create the aspect map for this URN
                    Map<String, Map<String, Object>> aspectDocuments =
                        result.computeIfAbsent(urn, k -> new HashMap<>());

                    // Store the raw document for this aspect
                    aspectDocuments.put(aspectName, sourceMap);

                  } catch (Exception e) {
                    log.error("Error parsing URN {} in raw method: {}", urnString, e.getMessage());
                  }
                }
              });
    }

    return result;
  }

  private static List<AspectRowSummary> toAspectRowSummary(SearchResponse searchResponse) {
    if (searchResponse != null) {
      SearchHits hits = searchResponse.getHits();
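A minimal caller sketch for the system-metadata raw() lookup; the URN and aspect names mirror the integration test later in this commit, and the service/opContext wiring is assumed:

import com.linkedin.common.urn.Urn;
import com.linkedin.metadata.systemmetadata.SystemMetadataService;
import io.datahubproject.metadata.context.OperationContext;
import java.util.Map;
import java.util.Set;

class SystemMetadataRawUsageSketch {
  // Fetches the raw index documents backing two aspects of a single chart;
  // the result is keyed by Urn, then by aspect name.
  static Map<Urn, Map<String, Map<String, Object>>> fetch(
      SystemMetadataService service, OperationContext opContext) {
    Map<String, Set<String>> urnAspects =
        Map.of("urn:li:chart:1", Set.of("chartKey", "ChartInfo"));
    return service.raw(opContext, urnAspects);
  }
}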
@@ -8,6 +8,7 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.linkedin.common.urn.Urn;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.data.ByteString;
import com.linkedin.metadata.aspect.EnvelopedAspect;
import com.linkedin.metadata.config.ConfigUtils;
@@ -759,6 +760,100 @@ public class ElasticSearchTimeseriesAspectService
        .build();
  }

  @Override
  public Map<Urn, Map<String, Map<String, Object>>> raw(
      OperationContext opContext, Map<String, Set<String>> urnAspects) {

    if (urnAspects == null || urnAspects.isEmpty()) {
      return Collections.emptyMap();
    }

    Map<Urn, Map<String, Map<String, Object>>> result = new HashMap<>();

    // Process each URN and its timeseries aspects
    for (Map.Entry<String, Set<String>> entry : urnAspects.entrySet()) {
      String urnString = entry.getKey();
      Set<String> aspectNames =
          Optional.ofNullable(entry.getValue()).orElse(Collections.emptySet()).stream()
              .filter(
                  aspectName -> {
                    AspectSpec aspectSpec = entityRegistry.getAspectSpecs().get(aspectName);
                    return aspectSpec != null && aspectSpec.isTimeseries();
                  })
              .collect(Collectors.toSet());

      if (aspectNames.isEmpty()) {
        continue;
      }

      try {
        Urn urn = UrnUtils.getUrn(urnString);
        String entityName = urn.getEntityType();
        Map<String, Map<String, Object>> aspectDocuments = new HashMap<>();

        // For each aspect, find the latest document
        for (String aspectName : aspectNames) {
          try {
            // Build query to find the latest document for this URN and aspect
            BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery();
            queryBuilder.must(QueryBuilders.termQuery(MappingsBuilder.URN_FIELD, urnString));

            // Build search request
            SearchRequest searchRequest = new SearchRequest();
            SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();

            searchSourceBuilder.query(queryBuilder);
            searchSourceBuilder.size(1); // Only need the latest document
            // Sort by timestamp descending to get the most recent document
            searchSourceBuilder.sort(
                SortBuilders.fieldSort(MappingsBuilder.TIMESTAMP_FIELD).order(SortOrder.DESC));

            searchRequest.source(searchSourceBuilder);

            // Get the index name for this entity and aspect
            String indexName =
                opContext
                    .getSearchContext()
                    .getIndexConvention()
                    .getTimeseriesAspectIndexName(entityName, aspectName);
            searchRequest.indices(indexName);

            // Execute search
            SearchResponse searchResponse =
                searchClient.search(searchRequest, RequestOptions.DEFAULT);
            SearchHits hits = searchResponse.getHits();

            if (hits.getTotalHits() != null
                && hits.getTotalHits().value > 0
                && hits.getHits().length > 0) {
              SearchHit latestHit = hits.getHits()[0];
              Map<String, Object> sourceMap = latestHit.getSourceAsMap();

              // Store the raw document for this aspect
              aspectDocuments.put(aspectName, sourceMap);
            }

          } catch (IOException e) {
            log.error(
                "Error fetching latest document for URN: {}, aspect: {}", urnString, aspectName, e);
            // Continue processing other aspects
          }
        }

        // Only add to result if we found documents
        if (!aspectDocuments.isEmpty()) {
          result.put(urn, aspectDocuments);
        }

      } catch (Exception e) {
        log.error("Error parsing URN {} in raw method: {}", urnString, e.getMessage(), e);
        // Continue processing other URNs
      }
    }

    return result;
  }

  private SearchResponse executeScrollSearchQuery(
      @Nonnull OperationContext opContext,
      @Nonnull final String entityName,
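A minimal caller sketch for the timeseries raw() lookup; the URN and aspect name mirror the unit tests later in this commit, and the service/opContext wiring is assumed:

import com.linkedin.common.urn.Urn;
import com.linkedin.metadata.timeseries.TimeseriesAspectService;
import io.datahubproject.metadata.context.OperationContext;
import java.util.Map;
import java.util.Set;

class TimeseriesRawUsageSketch {
  // Fetches the latest raw document per requested timeseries aspect; aspect
  // names that are not timeseries aspects are filtered out by raw() itself.
  static Map<Urn, Map<String, Map<String, Object>>> fetchLatest(
      TimeseriesAspectService service, OperationContext opContext) {
    Map<String, Set<String>> urnAspects =
        Map.of("urn:li:dataset:123", Set.of("datasetProfile"));
    return service.raw(opContext, urnAspects);
  }
}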
@@ -21,6 +21,7 @@ import com.linkedin.metadata.aspect.models.graph.RelatedEntities;
import com.linkedin.metadata.aspect.models.graph.RelatedEntitiesScrollResult;
import com.linkedin.metadata.entity.TestEntityRegistry;
import com.linkedin.metadata.graph.GraphFilters;
import com.linkedin.metadata.graph.GraphService.EdgeTuple;
import com.linkedin.metadata.graph.RelatedEntitiesResult;
import com.linkedin.metadata.models.registry.EntityRegistry;
import com.linkedin.metadata.models.registry.LineageRegistry;
@@ -30,6 +31,7 @@ import com.linkedin.metadata.search.elasticsearch.update.ESBulkProcessor;
import com.linkedin.metadata.utils.elasticsearch.IndexConventionImpl;
import io.datahubproject.metadata.context.OperationContext;
import io.datahubproject.test.metadata.context.TestOperationContexts;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -37,6 +39,7 @@ import java.util.Map;
import java.util.Set;
import org.apache.lucene.search.TotalHits;
import org.mockito.ArgumentCaptor;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.index.query.BoolQueryBuilder;
import org.opensearch.index.query.ExistsQueryBuilder;
@@ -62,7 +65,7 @@ public class ElasticSearchGraphServiceTest {
    mockESBulkProcessor = mock(ESBulkProcessor.class);
    mockWriteDAO = mock(ESGraphWriteDAO.class);
    mockReadDAO = mock(ESGraphQueryDAO.class);
    when(mockReadDAO.getGraphServiceConfig()).thenReturn(TEST_GRAPH_SERVICE_CONFIG);

    test =
        new ElasticSearchGraphService(
            new LineageRegistry(entityRegistry),
@@ -77,6 +80,7 @@ public class ElasticSearchGraphServiceTest {
  @BeforeMethod
  public void beforeMethod() {
    reset(mockESBulkProcessor, mockWriteDAO, mockReadDAO);
    when(mockReadDAO.getGraphServiceConfig()).thenReturn(TEST_GRAPH_SERVICE_CONFIG);
  }

  @Test
@@ -237,6 +241,113 @@ public class ElasticSearchGraphServiceTest {
    assertTrue(result.getEntities().isEmpty());
  }

  @Test
  public void testRaw() {
    // Create test edge tuples
    List<EdgeTuple> edgeTuples =
        Arrays.asList(
            new EdgeTuple("urn:li:dataset:1", "urn:li:dataset:2", "Produces"),
            new EdgeTuple("urn:li:dataset:3", "urn:li:dataset:4", "Consumes"));

    // Create mock search response
    SearchResponse mockResponse = mock(SearchResponse.class);
    SearchHits mockHits = mock(SearchHits.class);
    when(mockResponse.getHits()).thenReturn(mockHits);

    TotalHits totalHits = new TotalHits(2L, TotalHits.Relation.EQUAL_TO);
    when(mockHits.getTotalHits()).thenReturn(totalHits);

    // Create search hits
    SearchHit[] searchHits = new SearchHit[2];
    searchHits[0] = createMockSearchHit("urn:li:dataset:1", "urn:li:dataset:2", "Produces", null);
    searchHits[1] =
        createMockSearchHit(
            "urn:li:dataset:4", "urn:li:dataset:3", "Consumes", "urn:li:dataset:via");
    when(mockHits.getHits()).thenReturn(searchHits);

    // Mock the executeSearch method
    when(mockReadDAO.executeSearch(any(SearchRequest.class))).thenReturn(mockResponse);

    // Execute the method
    OperationContext opContext = TestOperationContexts.systemContextNoValidate();
    List<Map<String, Object>> results = test.raw(opContext, edgeTuples);

    // Verify the search request was made
    ArgumentCaptor<SearchRequest> requestCaptor = ArgumentCaptor.forClass(SearchRequest.class);
    verify(mockReadDAO).executeSearch(requestCaptor.capture());

    // Verify results
    assertEquals(results.size(), 2);

    // Verify first result
    Map<String, Object> firstResult = results.get(0);
    Map<String, Object> source = (Map<String, Object>) firstResult.get("source");
    Map<String, Object> destination = (Map<String, Object>) firstResult.get("destination");
    assertEquals(source.get("urn"), "urn:li:dataset:1");
    assertEquals(destination.get("urn"), "urn:li:dataset:2");
    assertEquals(firstResult.get("relationshipType"), "Produces");

    // Verify second result
    Map<String, Object> secondResult = results.get(1);
    source = (Map<String, Object>) secondResult.get("source");
    destination = (Map<String, Object>) secondResult.get("destination");
    assertEquals(source.get("urn"), "urn:li:dataset:4");
    assertEquals(destination.get("urn"), "urn:li:dataset:3");
    assertEquals(secondResult.get("relationshipType"), "Consumes");
    assertEquals(secondResult.get("via"), "urn:li:dataset:via");
  }

  @Test
  public void testRawWithNullInput() {
    OperationContext opContext = TestOperationContexts.systemContextNoValidate();

    // Test with null input
    List<Map<String, Object>> results = test.raw(opContext, null);
    assertTrue(results.isEmpty());
  }

  @Test
  public void testRawWithEmptyInput() {
    OperationContext opContext = TestOperationContexts.systemContextNoValidate();

    // Test with empty list
    List<Map<String, Object>> results = test.raw(opContext, Collections.emptyList());
    assertTrue(results.isEmpty());
  }

  @Test
  public void testRawWithInvalidEdgeTuples() {
    // Create edge tuples with null values
    List<EdgeTuple> edgeTuples =
        Arrays.asList(
            new EdgeTuple(null, "urn:li:dataset:2", "Produces"),
            new EdgeTuple("urn:li:dataset:1", null, "Consumes"),
            new EdgeTuple("urn:li:dataset:1", "urn:li:dataset:2", null),
            new EdgeTuple("urn:li:dataset:3", "urn:li:dataset:4", "ValidRel"));

    // Mock response for the valid tuple only
    SearchResponse mockResponse = mock(SearchResponse.class);
    SearchHits mockHits = mock(SearchHits.class);
    when(mockResponse.getHits()).thenReturn(mockHits);

    TotalHits totalHits = new TotalHits(1L, TotalHits.Relation.EQUAL_TO);
    when(mockHits.getTotalHits()).thenReturn(totalHits);

    SearchHit[] searchHits = new SearchHit[1];
    searchHits[0] = createMockSearchHit("urn:li:dataset:3", "urn:li:dataset:4", "ValidRel", null);
    when(mockHits.getHits()).thenReturn(searchHits);

    when(mockReadDAO.executeSearch(any(SearchRequest.class))).thenReturn(mockResponse);

    OperationContext opContext = TestOperationContexts.systemContextNoValidate();
    List<Map<String, Object>> results = test.raw(opContext, edgeTuples);

    // Should only return results for the valid tuple
    assertEquals(results.size(), 1);
    Map<String, Object> result = results.get(0);
    assertEquals(((Map<String, Object>) result.get("source")).get("urn"), "urn:li:dataset:3");
  }

  // Helper method to create mock search hits
  private SearchHit createMockSearchHit(
      String sourceUrn, String destUrn, String relType, String via) {
@@ -3,7 +3,10 @@ package com.linkedin.metadata.systemmetadata;
import static io.datahubproject.test.search.SearchTestUtils.TEST_SYSTEM_METADATA_SERVICE_CONFIG;
import static io.datahubproject.test.search.SearchTestUtils.syncAfterWrite;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;

import com.linkedin.common.urn.Urn;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.metadata.run.AspectRowSummary;
import com.linkedin.metadata.run.IngestionRunSummary;
import com.linkedin.metadata.search.elasticsearch.indexbuilder.ESIndexBuilder;
@@ -11,8 +14,13 @@ import com.linkedin.metadata.search.elasticsearch.update.ESBulkProcessor;
import com.linkedin.metadata.utils.elasticsearch.IndexConvention;
import com.linkedin.metadata.utils.elasticsearch.IndexConventionImpl;
import com.linkedin.mxe.SystemMetadata;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nonnull;
import org.opensearch.client.RestHighLevelClient;
import org.springframework.test.context.testng.AbstractTestNGSpringContextTests;
@@ -196,4 +204,119 @@ public abstract class SystemMetadataServiceTestBase extends AbstractTestNGSpring

    assertEquals(runs.size(), 0);
  }

  @Test
  public void testRaw() throws Exception {
    // Create test data with various system metadata
    SystemMetadata metadata1 = new SystemMetadata();
    metadata1.setRunId("abc-123");
    metadata1.setLastObserved(Long.valueOf(120L));
    metadata1.setRegistryName("test-registry");
    metadata1.setRegistryVersion("1.0.0");

    SystemMetadata metadata2 = new SystemMetadata();
    metadata2.setRunId("abc-456");
    metadata2.setLastObserved(Long.valueOf(240L));
    metadata2.setRegistryName("test-registry");
    metadata2.setRegistryVersion("2.0.0");

    // Insert test data
    _client.insert(metadata1, "urn:li:chart:1", "chartKey");
    _client.insert(metadata1, "urn:li:chart:1", "ChartInfo");
    _client.insert(metadata1, "urn:li:chart:1", "Ownership");

    _client.insert(metadata2, "urn:li:chart:2", "chartKey");
    _client.insert(metadata2, "urn:li:chart:2", "ChartInfo");

    _client.insert(metadata1, "urn:li:dataset:3", "DatasetKey");
    _client.insert(metadata1, "urn:li:dataset:3", "DatasetProperties");

    syncAfterWrite(getBulkProcessor());

    // Test 1: Query for specific URN with specific aspects
    Map<String, Set<String>> urnAspects1 = new HashMap<>();
    urnAspects1.put("urn:li:chart:1", new HashSet<>(Arrays.asList("chartKey", "ChartInfo")));

    Map<Urn, Map<String, Map<String, Object>>> result1 = _client.raw(null, urnAspects1);

    assertEquals(result1.size(), 1);
    assertTrue(result1.containsKey(UrnUtils.getUrn("urn:li:chart:1")));

    Map<String, Map<String, Object>> chart1Aspects = result1.get(UrnUtils.getUrn("urn:li:chart:1"));
    assertEquals(chart1Aspects.size(), 2);
    assertTrue(chart1Aspects.containsKey("chartKey"));
    assertTrue(chart1Aspects.containsKey("ChartInfo"));

    // Verify content of returned documents
    Map<String, Object> chartKeyDoc = chart1Aspects.get("chartKey");
    assertEquals(chartKeyDoc.get("urn"), "urn:li:chart:1");
    assertEquals(chartKeyDoc.get("aspect"), "chartKey");
    assertEquals(chartKeyDoc.get("runId"), "abc-123");
    assertEquals(chartKeyDoc.get("lastUpdated"), 120);
    assertEquals(chartKeyDoc.get("registryName"), "test-registry");
    assertEquals(chartKeyDoc.get("registryVersion"), "1.0.0");

    // Test 2: Query for multiple URNs
    Map<String, Set<String>> urnAspects2 = new HashMap<>();
    urnAspects2.put("urn:li:chart:1", new HashSet<>(Arrays.asList("Ownership")));
    urnAspects2.put("urn:li:chart:2", new HashSet<>(Arrays.asList("chartKey", "ChartInfo")));
    urnAspects2.put("urn:li:dataset:3", new HashSet<>(Arrays.asList("DatasetKey")));

    Map<Urn, Map<String, Map<String, Object>>> result2 = _client.raw(null, urnAspects2);

    assertEquals(result2.size(), 3);
    assertTrue(result2.containsKey(UrnUtils.getUrn("urn:li:chart:1")));
    assertTrue(result2.containsKey(UrnUtils.getUrn("urn:li:chart:2")));
    assertTrue(result2.containsKey(UrnUtils.getUrn("urn:li:dataset:3")));

    // Verify chart:1 has only Ownership
    assertEquals(result2.get(UrnUtils.getUrn("urn:li:chart:1")).size(), 1);
    assertTrue(result2.get(UrnUtils.getUrn("urn:li:chart:1")).containsKey("Ownership"));

    // Verify chart:2 has both aspects
    assertEquals(result2.get(UrnUtils.getUrn("urn:li:chart:2")).size(), 2);
    assertTrue(result2.get(UrnUtils.getUrn("urn:li:chart:2")).containsKey("chartKey"));
    assertTrue(result2.get(UrnUtils.getUrn("urn:li:chart:2")).containsKey("ChartInfo"));

    // Verify dataset:3 has DatasetKey
    assertEquals(result2.get(UrnUtils.getUrn("urn:li:dataset:3")).size(), 1);
    assertTrue(result2.get(UrnUtils.getUrn("urn:li:dataset:3")).containsKey("DatasetKey"));

    // Test 3: Query for non-existent URN
    Map<String, Set<String>> urnAspects3 = new HashMap<>();
    urnAspects3.put("urn:li:chart:999", new HashSet<>(Arrays.asList("chartKey")));

    Map<Urn, Map<String, Map<String, Object>>> result3 = _client.raw(null, urnAspects3);
    assertTrue(result3.isEmpty());

    // Test 4: Query for existing URN but non-existent aspect
    Map<String, Set<String>> urnAspects4 = new HashMap<>();
    urnAspects4.put("urn:li:chart:1", new HashSet<>(Arrays.asList("NonExistentAspect")));

    Map<Urn, Map<String, Map<String, Object>>> result4 = _client.raw(null, urnAspects4);
    assertTrue(result4.isEmpty());

    // Test 5: Empty input map
    Map<String, Set<String>> urnAspects5 = new HashMap<>();
    Map<Urn, Map<String, Map<String, Object>>> result5 = _client.raw(null, urnAspects5);
    assertTrue(result5.isEmpty());

    // Test 6: Null input
    Map<Urn, Map<String, Map<String, Object>>> result6 = _client.raw(null, null);
    assertTrue(result6.isEmpty());

    // Test 7: URN with null aspects set
    Map<String, Set<String>> urnAspects7 = new HashMap<>();
    urnAspects7.put("urn:li:chart:1", null);

    Map<Urn, Map<String, Map<String, Object>>> result7 = _client.raw(null, urnAspects7);
    assertTrue(result7.isEmpty());

    // Test 8: URN with empty aspects set
    Map<String, Set<String>> urnAspects8 = new HashMap<>();
    urnAspects8.put("urn:li:chart:1", new HashSet<>());

    Map<Urn, Map<String, Map<String, Object>>> result8 = _client.raw(null, urnAspects8);
    assertTrue(result8.isEmpty());
  }
}
@@ -11,6 +11,7 @@ import com.fasterxml.jackson.databind.node.NumericNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.linkedin.common.urn.Urn;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.metadata.models.AspectSpec;
import com.linkedin.metadata.models.registry.EntityRegistry;
import com.linkedin.metadata.query.filter.Filter;
import com.linkedin.metadata.query.filter.SortCriterion;
@@ -68,7 +69,9 @@ public class TimeseriesAspectServiceUnitTest {
  private final IndexConvention indexConvention = mock(IndexConvention.class);
  private final ESBulkProcessor bulkProcessor = mock(ESBulkProcessor.class);
  private final RestClient restClient = mock(RestClient.class);
-  private final EntityRegistry entityRegistry = mock(EntityRegistry.class);
+  private final OperationContext opContext =
+      TestOperationContexts.systemContextNoSearchAuthorization(indexConvention);
+  private final EntityRegistry entityRegistry = opContext.getEntityRegistry();
  private final ESIndexBuilder indexBuilder = mock(ESIndexBuilder.class);
  private final TimeseriesAspectService _timeseriesAspectService =
      new ElasticSearchTimeseriesAspectService(
@@ -80,14 +83,12 @@ public class TimeseriesAspectServiceUnitTest {
          entityRegistry,
          indexConvention,
          indexBuilder);
-  private final OperationContext opContext =
-      TestOperationContexts.systemContextNoSearchAuthorization(indexConvention);

  private static final String INDEX_PATTERN = "indexPattern";

  @BeforeMethod
  public void resetMocks() {
-    reset(searchClient, indexConvention, bulkProcessor, restClient, entityRegistry, indexBuilder);
+    reset(searchClient, indexConvention, bulkProcessor, restClient, indexBuilder);
  }

  @Test
@@ -507,4 +508,284 @@ public class TimeseriesAspectServiceUnitTest {
      Assert.assertTrue(e.getCause() instanceof ExecutionException);
    }
  }

  @Test
  public void testRawWithNullUrnAspects() {
    // Test with null input
    Map<Urn, Map<String, Map<String, Object>>> result =
        _timeseriesAspectService.raw(opContext, null);
    Assert.assertEquals(result, Collections.emptyMap());
  }

  @Test
  public void testRawWithEmptyUrnAspects() {
    // Test with empty input
    Map<Urn, Map<String, Map<String, Object>>> result =
        _timeseriesAspectService.raw(opContext, Collections.emptyMap());
    Assert.assertEquals(result, Collections.emptyMap());
  }

  @Test
  public void testRawWithNonTimeseriesAspects() {
    Map<String, Set<String>> urnAspects = new HashMap<>();
    urnAspects.put("urn:li:dataset:123", new HashSet<>(Arrays.asList("status")));

    // Execute
    Map<Urn, Map<String, Map<String, Object>>> result =
        _timeseriesAspectService.raw(opContext, urnAspects);

    // Verify - should be empty since no timeseries aspects
    Assert.assertEquals(result, Collections.emptyMap());
  }

  @Test
  public void testRawWithValidTimeseriesAspect() throws IOException {
    // Setup
    String urnString = "urn:li:dataset:123";
    String aspectName = "datasetProfile";
    String indexName = "dataset_datasetProfile_index_v1";

    when(indexConvention.getTimeseriesAspectIndexName(eq("dataset"), eq(aspectName)))
        .thenReturn(indexName);

    // Mock search response
    SearchHit mockHit = mock(SearchHit.class);
    Map<String, Object> sourceMap = new HashMap<>();
    sourceMap.put(MappingsBuilder.URN_FIELD, urnString);
    sourceMap.put(MappingsBuilder.TIMESTAMP_FIELD, 1234567890L);
    sourceMap.put("field1", "value1");
    sourceMap.put("field2", "value2");
    when(mockHit.getSourceAsMap()).thenReturn(sourceMap);

    SearchHits mockHits = mock(SearchHits.class);
    when(mockHits.getTotalHits()).thenReturn(new TotalHits(1, TotalHits.Relation.EQUAL_TO));
    when(mockHits.getHits()).thenReturn(new SearchHit[] {mockHit});

    SearchResponse mockResponse = mock(SearchResponse.class);
    when(mockResponse.getHits()).thenReturn(mockHits);
    when(searchClient.search(any(), any())).thenReturn(mockResponse);

    Map<String, Set<String>> urnAspects = new HashMap<>();
    urnAspects.put(urnString, new HashSet<>(Arrays.asList(aspectName)));

    // Execute
    Map<Urn, Map<String, Map<String, Object>>> result =
        _timeseriesAspectService.raw(opContext, urnAspects);

    // Verify
    Assert.assertEquals(result.size(), 1);
    Urn expectedUrn = UrnUtils.getUrn(urnString);
    Assert.assertTrue(result.containsKey(expectedUrn));
    Assert.assertTrue(result.get(expectedUrn).containsKey(aspectName));
    Assert.assertEquals(result.get(expectedUrn).get(aspectName), sourceMap);
  }

  @Test
  public void testRawWithMultipleAspects() throws IOException {
    // Setup
    String urnString = "urn:li:dataset:123";
    String aspectName1 = "datasetProfile";
    String aspectName2 = "operation";

    // Mock aspect specs
    AspectSpec mockSpec1 = mock(AspectSpec.class);
    when(mockSpec1.isTimeseries()).thenReturn(true);
    AspectSpec mockSpec2 = mock(AspectSpec.class);
    when(mockSpec2.isTimeseries()).thenReturn(true);

    when(indexConvention.getTimeseriesAspectIndexName(eq("dataset"), eq(aspectName1)))
        .thenReturn("dataset_datasetProfile_index_v1");
    when(indexConvention.getTimeseriesAspectIndexName(eq("dataset"), eq(aspectName2)))
        .thenReturn("dataset_operation_index_v1");

    // Mock search responses for both aspects
    Map<String, Object> sourceMap1 = new HashMap<>();
    sourceMap1.put(MappingsBuilder.URN_FIELD, urnString);
    sourceMap1.put(MappingsBuilder.TIMESTAMP_FIELD, 1234567890L);
    sourceMap1.put(MappingsBuilder.TIMESTAMP_MILLIS_FIELD, 1234567890L);
    sourceMap1.put("profileField", "profileValue");

    Map<String, Object> sourceMap2 = new HashMap<>();
    sourceMap2.put(MappingsBuilder.URN_FIELD, urnString);
    sourceMap2.put(MappingsBuilder.TIMESTAMP_FIELD, 1234567891L);
    sourceMap2.put(MappingsBuilder.TIMESTAMP_MILLIS_FIELD, 1234567891L);
    sourceMap2.put("operationField", "operationValue");

    SearchHit mockHit1 = mock(SearchHit.class);
    when(mockHit1.getSourceAsMap()).thenReturn(sourceMap1);
    SearchHits mockHits1 = mock(SearchHits.class);
    when(mockHits1.getTotalHits()).thenReturn(new TotalHits(1, TotalHits.Relation.EQUAL_TO));
    when(mockHits1.getHits()).thenReturn(new SearchHit[] {mockHit1});
    SearchResponse mockResponse1 = mock(SearchResponse.class);
    when(mockResponse1.getHits()).thenReturn(mockHits1);

    SearchHit mockHit2 = mock(SearchHit.class);
    when(mockHit2.getSourceAsMap()).thenReturn(sourceMap2);
    SearchHits mockHits2 = mock(SearchHits.class);
    when(mockHits2.getTotalHits()).thenReturn(new TotalHits(1, TotalHits.Relation.EQUAL_TO));
    when(mockHits2.getHits()).thenReturn(new SearchHit[] {mockHit2});
    SearchResponse mockResponse2 = mock(SearchResponse.class);
    when(mockResponse2.getHits()).thenReturn(mockHits2);

    when(searchClient.search(any(), any())).thenReturn(mockResponse1).thenReturn(mockResponse2);

    Map<String, Set<String>> urnAspects = new HashMap<>();
    urnAspects.put(urnString, new HashSet<>(Arrays.asList(aspectName1, aspectName2)));

    // Execute
    Map<Urn, Map<String, Map<String, Object>>> result =
        _timeseriesAspectService.raw(opContext, urnAspects);

    // Verify
    Assert.assertEquals(result.size(), 1);
    Urn expectedUrn = UrnUtils.getUrn(urnString);
    Assert.assertTrue(result.containsKey(expectedUrn));
    Assert.assertEquals(result.get(expectedUrn).size(), 2);
    Assert.assertTrue(result.get(expectedUrn).containsKey(aspectName1));
    Assert.assertTrue(result.get(expectedUrn).containsKey(aspectName2));
    Assert.assertEquals(result.get(expectedUrn).get(aspectName1), sourceMap1);
    Assert.assertEquals(result.get(expectedUrn).get(aspectName2), sourceMap2);
  }

  @Test
  public void testRawWithNoResults() throws IOException {
    // Setup
    String urnString = "urn:li:dataset:123";
    String aspectName = "datasetProfile";

    when(indexConvention.getTimeseriesAspectIndexName(eq("dataset"), eq(aspectName)))
        .thenReturn("dataset_datasetProfile_index_v1");

    // Mock empty search response
    SearchHits mockHits = mock(SearchHits.class);
    when(mockHits.getTotalHits()).thenReturn(new TotalHits(0, TotalHits.Relation.EQUAL_TO));
    when(mockHits.getHits()).thenReturn(new SearchHit[] {});

    SearchResponse mockResponse = mock(SearchResponse.class);
    when(mockResponse.getHits()).thenReturn(mockHits);
    when(searchClient.search(any(), any())).thenReturn(mockResponse);

    Map<String, Set<String>> urnAspects = new HashMap<>();
    urnAspects.put(urnString, new HashSet<>(Arrays.asList(aspectName)));

    // Execute
    Map<Urn, Map<String, Map<String, Object>>> result =
        _timeseriesAspectService.raw(opContext, urnAspects);

    // Verify - should be empty since no documents found
    Assert.assertEquals(result, Collections.emptyMap());
  }

  @Test
  public void testRawWithSearchException() throws IOException {
    // Setup
    String urnString = "urn:li:dataset:123";
    String aspectName = "datasetProfile";

    when(indexConvention.getTimeseriesAspectIndexName(eq("dataset"), eq(aspectName)))
        .thenReturn("dataset_datasetProfile_index_v1");

    // Mock search to throw IOException
    when(searchClient.search(any(), any())).thenThrow(new IOException("Search failed"));

    Map<String, Set<String>> urnAspects = new HashMap<>();
    urnAspects.put(urnString, new HashSet<>(Arrays.asList(aspectName)));

    // Execute
    Map<Urn, Map<String, Map<String, Object>>> result =
        _timeseriesAspectService.raw(opContext, urnAspects);

    // Verify - should return empty map and not throw exception
    Assert.assertEquals(result, Collections.emptyMap());
  }

  @Test
  public void testRawWithInvalidUrn() {
    // Setup
    String invalidUrnString = "invalid:urn:format";
    String aspectName = "datasetProfile";

    Map<String, Set<String>> urnAspects = new HashMap<>();
    urnAspects.put(invalidUrnString, new HashSet<>(Arrays.asList(aspectName)));

    // Execute
    Map<Urn, Map<String, Map<String, Object>>> result =
        _timeseriesAspectService.raw(opContext, urnAspects);

    // Verify - should return empty map due to URN parsing error
    Assert.assertEquals(result, Collections.emptyMap());
  }

  @Test
  public void testRawWithNullAspectSet() throws IOException {
    // Setup
    String urnString = "urn:li:dataset:123";

    Map<String, Set<String>> urnAspects = new HashMap<>();
    urnAspects.put(urnString, null); // null aspect set

    // Execute
    Map<Urn, Map<String, Map<String, Object>>> result =
        _timeseriesAspectService.raw(opContext, urnAspects);

    // Verify - should handle null gracefully and return empty map
    Assert.assertEquals(result, Collections.emptyMap());
  }

  @Test
  public void testRawWithMultipleUrns() throws IOException {
    // Setup
    String urnString1 = "urn:li:dataset:123";
    String urnString2 = "urn:li:dataset:456";
    String aspectName = "datasetProfile";

    when(indexConvention.getTimeseriesAspectIndexName(eq("dataset"), eq(aspectName)))
        .thenReturn("dataset_datasetProfile_index_v1");

    // Mock search responses for both URNs
    Map<String, Object> sourceMap1 = new HashMap<>();
    sourceMap1.put(MappingsBuilder.URN_FIELD, urnString1);
    sourceMap1.put(MappingsBuilder.TIMESTAMP_FIELD, 1234567890L);
    sourceMap1.put("data", "value1");

    Map<String, Object> sourceMap2 = new HashMap<>();
    sourceMap2.put(MappingsBuilder.URN_FIELD, urnString2);
    sourceMap2.put(MappingsBuilder.TIMESTAMP_FIELD, 1234567891L);
    sourceMap2.put("data", "value2");

    SearchHit mockHit1 = mock(SearchHit.class);
    when(mockHit1.getSourceAsMap()).thenReturn(sourceMap1);
    SearchHits mockHits1 = mock(SearchHits.class);
    when(mockHits1.getTotalHits()).thenReturn(new TotalHits(1, TotalHits.Relation.EQUAL_TO));
    when(mockHits1.getHits()).thenReturn(new SearchHit[] {mockHit1});
    SearchResponse mockResponse1 = mock(SearchResponse.class);
    when(mockResponse1.getHits()).thenReturn(mockHits1);

    SearchHit mockHit2 = mock(SearchHit.class);
    when(mockHit2.getSourceAsMap()).thenReturn(sourceMap2);
    SearchHits mockHits2 = mock(SearchHits.class);
    when(mockHits2.getTotalHits()).thenReturn(new TotalHits(1, TotalHits.Relation.EQUAL_TO));
    when(mockHits2.getHits()).thenReturn(new SearchHit[] {mockHit2});
    SearchResponse mockResponse2 = mock(SearchResponse.class);
    when(mockResponse2.getHits()).thenReturn(mockHits2);

    when(searchClient.search(any(), any())).thenReturn(mockResponse1).thenReturn(mockResponse2);

    Map<String, Set<String>> urnAspects = new HashMap<>();
    urnAspects.put(urnString1, new HashSet<>(Arrays.asList(aspectName)));
    urnAspects.put(urnString2, new HashSet<>(Arrays.asList(aspectName)));

    // Execute
    Map<Urn, Map<String, Map<String, Object>>> result =
        _timeseriesAspectService.raw(opContext, urnAspects);

    // Verify
    Assert.assertEquals(result.size(), 2);
    Urn expectedUrn1 = UrnUtils.getUrn(urnString1);
    Urn expectedUrn2 = UrnUtils.getUrn(urnString2);
    Assert.assertTrue(result.containsKey(expectedUrn1));
    Assert.assertTrue(result.containsKey(expectedUrn2));
    Assert.assertEquals(result.get(expectedUrn1).get(aspectName), sourceMap1);
    Assert.assertEquals(result.get(expectedUrn2).get(aspectName), sourceMap2);
  }
}
@@ -75,6 +75,24 @@ public class GlobalControllerExceptionHandler extends DefaultHandlerExceptionRes
    return new ResponseEntity<>(Map.of("error", e.getMessage()), HttpStatus.FORBIDDEN);
  }

  @ExceptionHandler(RuntimeException.class)
  public ResponseEntity<Map<String, String>> handleRuntimeException(
      RuntimeException e, HttpServletRequest request) {

    // Check if this exception originates from UrnUtils.getUrn()
    for (StackTraceElement element : e.getStackTrace()) {
      if (element.getClassName().equals("com.linkedin.common.urn.UrnUtils")
          && element.getMethodName().equals("getUrn")) {
        log.error("Invalid URN format in request: {}", request.getRequestURI(), e);
        return new ResponseEntity<>(
            Map.of("error", "Invalid URN format: " + e.getMessage()), HttpStatus.BAD_REQUEST);
      }
    }

    // For other RuntimeExceptions, let them bubble up to the generic handler
    return handleGenericException(e, request);
  }

  @Override
  protected void logException(Exception ex, HttpServletRequest request) {
    log.error("Error while resolving request: {}", request.getRequestURI(), ex);
@@ -13,7 +13,6 @@ import com.deblock.jsondiff.matcher.StrictPrimitivePartialMatcher;
import com.deblock.jsondiff.viewer.PatchDiffViewer;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.linkedin.common.urn.Urn;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.metadata.authorization.PoliciesConfig;
import com.linkedin.metadata.entity.EntityService;
@@ -32,16 +31,12 @@ import io.datahubproject.metadata.context.RequestContext;
import io.datahubproject.openapi.util.ElasticsearchUtils;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.media.Content;
import io.swagger.v3.oas.annotations.media.Schema;
import io.swagger.v3.oas.annotations.responses.ApiResponse;
import io.swagger.v3.oas.annotations.tags.Tag;
import jakarta.servlet.http.HttpServletRequest;
import java.net.URISyntaxException;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
@@ -181,53 +176,6 @@ public class ElasticsearchController {
    return ResponseEntity.ok(j.toString());
  }

-  @PostMapping(path = "/entity/raw", produces = MediaType.APPLICATION_JSON_VALUE)
-  @Operation(
-      description =
-          "Retrieves raw Elasticsearch documents for the provided URNs. Requires MANAGE_SYSTEM_OPERATIONS_PRIVILEGE.",
-      responses = {
-        @ApiResponse(
-            responseCode = "200",
-            description = "Successfully retrieved raw documents",
-            content = @Content(mediaType = MediaType.APPLICATION_JSON_VALUE)),
-        @ApiResponse(
-            responseCode = "403",
-            description = "Caller not authorized to access raw documents"),
-        @ApiResponse(responseCode = "400", description = "Invalid URN format provided")
-      })
-  public ResponseEntity<Map<Urn, Map<String, Object>>> getEntityRaw(
-      HttpServletRequest request,
-      @RequestBody
-          @Nonnull
-          @Schema(
-              description = "Set of URN strings to fetch raw documents for",
-              example = "[\"urn:li:dataset:(urn:li:dataPlatform:hive,SampleTable,PROD)\"]")
-          Set<String> urnStrs) {
-
-    Set<Urn> urns = urnStrs.stream().map(UrnUtils::getUrn).collect(Collectors.toSet());
-
-    Authentication authentication = AuthenticationContext.getAuthentication();
-    String actorUrnStr = authentication.getActor().toUrnStr();
-    OperationContext opContext =
-        systemOperationContext.asSession(
-            RequestContext.builder()
-                .buildOpenapi(
-                    actorUrnStr,
-                    request,
-                    "getRawEntity",
-                    urns.stream().map(Urn::getEntityType).distinct().toList()),
-            authorizerChain,
-            authentication);
-
-    if (!AuthUtil.isAPIOperationsAuthorized(
-        opContext, PoliciesConfig.MANAGE_SYSTEM_OPERATIONS_PRIVILEGE)) {
-      log.error("{} is not authorized to get raw ES documents", actorUrnStr);
-      return ResponseEntity.status(HttpStatus.FORBIDDEN).body(null);
-    }
-
-    return ResponseEntity.ok(searchService.raw(opContext, urns));
-  }
-
  @GetMapping(path = "/explainSearchQuery", produces = MediaType.APPLICATION_JSON_VALUE)
  @Operation(summary = "Explain Search Query")
  public ResponseEntity<ExplainResponse> explainSearchQuery(
@ -0,0 +1,259 @@
|
||||
package io.datahubproject.openapi.operations.elastic;
|
||||
|
||||
import com.datahub.authentication.Authentication;
|
||||
import com.datahub.authentication.AuthenticationContext;
|
||||
import com.datahub.authorization.AuthUtil;
|
||||
import com.datahub.authorization.AuthorizerChain;
|
||||
import com.linkedin.common.urn.Urn;
|
||||
import com.linkedin.common.urn.UrnUtils;
|
||||
import com.linkedin.metadata.authorization.PoliciesConfig;
|
||||
import com.linkedin.metadata.graph.GraphService;
|
||||
import com.linkedin.metadata.search.EntitySearchService;
|
||||
import com.linkedin.metadata.systemmetadata.SystemMetadataService;
|
||||
import com.linkedin.metadata.timeseries.TimeseriesAspectService;
|
||||
import io.datahubproject.metadata.context.OperationContext;
|
||||
import io.datahubproject.metadata.context.RequestContext;
|
||||
import io.swagger.v3.oas.annotations.Operation;
|
||||
import io.swagger.v3.oas.annotations.media.Content;
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
import io.swagger.v3.oas.annotations.responses.ApiResponse;
|
||||
import io.swagger.v3.oas.annotations.tags.Tag;
|
||||
import jakarta.servlet.http.HttpServletRequest;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
import javax.annotation.Nonnull;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.http.HttpStatus;
|
||||
import org.springframework.http.MediaType;
|
||||
import org.springframework.http.ResponseEntity;
|
||||
import org.springframework.web.bind.annotation.PostMapping;
|
||||
import org.springframework.web.bind.annotation.RequestBody;
|
||||
import org.springframework.web.bind.annotation.RequestMapping;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
|
||||
@RestController
|
||||
@RequestMapping("/openapi/operations/elasticSearch")
|
||||
@Slf4j
|
||||
@Tag(name = "ElasticSearch Raw Operations", description = "An API debugging raw ES documents")
|
||||
public class ElasticsearchRawController {
|
||||
private final AuthorizerChain authorizerChain;
|
||||
private final OperationContext systemOperationContext;
|
||||
private final SystemMetadataService systemMetadataService;
|
||||
private final TimeseriesAspectService timeseriesAspectService;
|
||||
private final EntitySearchService searchService;
|
||||
private final GraphService graphService;
|
||||
|
||||
public ElasticsearchRawController(
|
||||
OperationContext systemOperationContext,
|
||||
SystemMetadataService systemMetadataService,
|
||||
TimeseriesAspectService timeseriesAspectService,
|
||||
EntitySearchService searchService,
|
||||
AuthorizerChain authorizerChain,
|
||||
GraphService graphService) {
|
||||
this.systemOperationContext = systemOperationContext;
|
||||
this.authorizerChain = authorizerChain;
|
||||
this.systemMetadataService = systemMetadataService;
|
||||
this.timeseriesAspectService = timeseriesAspectService;
|
||||
this.searchService = searchService;
|
||||
this.graphService = graphService;
|
||||
}
|
||||
|
||||
@PostMapping(path = "/entity/raw", produces = MediaType.APPLICATION_JSON_VALUE)
|
||||
@Operation(
|
||||
description =
|
||||
"Retrieves raw Elasticsearch documents for the provided URNs. Requires MANAGE_SYSTEM_OPERATIONS_PRIVILEGE.",
|
||||
responses = {
|
||||
@ApiResponse(
|
||||
responseCode = "200",
|
||||
description = "Successfully retrieved raw documents",
|
||||
content = @Content(mediaType = MediaType.APPLICATION_JSON_VALUE)),
|
||||
@ApiResponse(
|
||||
responseCode = "403",
|
||||
description = "Caller not authorized to access raw documents"),
|
||||
@ApiResponse(responseCode = "400", description = "Invalid URN format provided")
|
||||
})
|
||||
public ResponseEntity<Map<Urn, Map<String, Object>>> getEntityRaw(
|
||||
HttpServletRequest request,
|
||||
@RequestBody
|
||||
@Nonnull
|
||||
@Schema(
|
||||
description = "Set of URN strings to fetch raw documents for",
|
||||
example = "[\"urn:li:dataset:(urn:li:dataPlatform:hive,SampleTable,PROD)\"]")
|
||||
Set<String> urnStrs) {
|
||||
|
||||
Set<Urn> urns = urnStrs.stream().map(UrnUtils::getUrn).collect(Collectors.toSet());
|
||||
|
||||
Authentication authentication = AuthenticationContext.getAuthentication();
|
||||
String actorUrnStr = authentication.getActor().toUrnStr();
|
||||
OperationContext opContext =
|
||||
systemOperationContext.asSession(
|
||||
RequestContext.builder()
|
||||
.buildOpenapi(
|
||||
actorUrnStr,
|
||||
request,
|
||||
"getRawEntity",
|
||||
urns.stream().map(Urn::getEntityType).distinct().toList()),
|
||||
authorizerChain,
|
||||
authentication);
|
||||
|
||||
if (!AuthUtil.isAPIOperationsAuthorized(
|
||||
opContext, PoliciesConfig.MANAGE_SYSTEM_OPERATIONS_PRIVILEGE)) {
|
||||
log.error("{} is not authorized to get raw ES documents", actorUrnStr);
|
||||
return ResponseEntity.status(HttpStatus.FORBIDDEN).body(null);
|
||||
}
|
||||
|
||||
return ResponseEntity.ok(searchService.raw(opContext, urns));
|
||||
}
|
||||
|
||||
@PostMapping(path = "/systemmetadata/raw", produces = MediaType.APPLICATION_JSON_VALUE)
|
||||
@Operation(
|
||||
description =
|
||||
"Retrieves raw Elasticsearch system metadata document for the provided URN & aspect. Requires MANAGE_SYSTEM_OPERATIONS_PRIVILEGE.",
|
||||
responses = {
|
||||
@ApiResponse(
|
||||
responseCode = "200",
|
||||
description = "Successfully retrieved raw documents",
|
||||
content = @Content(mediaType = MediaType.APPLICATION_JSON_VALUE)),
|
||||
@ApiResponse(
|
||||
responseCode = "403",
|
||||
description = "Caller not authorized to access raw documents"),
|
||||
@ApiResponse(responseCode = "400", description = "Invalid URN format provided")
|
||||
})
|
||||
public ResponseEntity<Map<Urn, Map<String, Map<String, Object>>>> getSystemMetadataRaw(
|
||||
HttpServletRequest request,
|
||||
@RequestBody
|
||||
@Nonnull
|
||||
@Schema(
|
||||
description = "Map of URNs to aspect names to fetch raw documents for",
|
||||
example =
|
||||
"{\"urn:li:dataset:(urn:li:dataPlatform:hive,SampleTable,PROD)\":[\"status\",\"datasetProperties\"]}")
|
||||
Map<String, Set<String>> urnAspects) {
|
||||
|
||||
Set<Urn> urns = urnAspects.keySet().stream().map(UrnUtils::getUrn).collect(Collectors.toSet());
|
||||
|
||||
Authentication authentication = AuthenticationContext.getAuthentication();
|
||||
String actorUrnStr = authentication.getActor().toUrnStr();
|
||||
OperationContext opContext =
|
||||
systemOperationContext.asSession(
|
||||
RequestContext.builder()
|
||||
.buildOpenapi(
|
||||
actorUrnStr,
|
||||
request,
|
||||
"getRawSystemMetadata",
|
||||
urns.stream().map(Urn::getEntityType).distinct().toList()),
|
||||
authorizerChain,
|
||||
authentication);
|
||||
|
||||
if (!AuthUtil.isAPIOperationsAuthorized(
|
||||
opContext, PoliciesConfig.MANAGE_SYSTEM_OPERATIONS_PRIVILEGE)) {
|
||||
log.error("{} is not authorized to get raw ES documents", actorUrnStr);
|
||||
return ResponseEntity.status(HttpStatus.FORBIDDEN).body(null);
|
||||
}
|
||||
|
||||
return ResponseEntity.ok(systemMetadataService.raw(opContext, urnAspects));
|
||||
}
|
||||
|
||||
@PostMapping(path = "/timeseries/raw", produces = MediaType.APPLICATION_JSON_VALUE)
|
||||
@Operation(
|
||||
description =
|
||||
"Retrieves raw Elasticsearch timeseries document for the provided URN & aspect. Requires MANAGE_SYSTEM_OPERATIONS_PRIVILEGE.",
|
||||
responses = {
|
||||
@ApiResponse(
|
||||
responseCode = "200",
|
||||
description = "Successfully retrieved raw documents",
|
||||
content = @Content(mediaType = MediaType.APPLICATION_JSON_VALUE)),
|
||||
@ApiResponse(
|
||||
responseCode = "403",
|
||||
description = "Caller not authorized to access raw documents"),
|
||||
@ApiResponse(responseCode = "400", description = "Invalid URN format provided")
|
||||
})
|
||||
public ResponseEntity<Map<Urn, Map<String, Map<String, Object>>>> getTimeseriesRaw(
|
||||
HttpServletRequest request,
|
||||
@RequestBody
|
||||
@Nonnull
|
||||
@Schema(
|
||||
description = "Map of URNs to aspect names to fetch raw documents for",
|
||||
example =
|
||||
"{\"urn:li:dataset:(urn:li:dataPlatform:hive,SampleTable,PROD)\":[\"datasetProfile\"]}")
|
||||
Map<String, Set<String>> urnAspects) {
|
||||
|
||||
Set<Urn> urns = urnAspects.keySet().stream().map(UrnUtils::getUrn).collect(Collectors.toSet());
|
||||
|
||||
Authentication authentication = AuthenticationContext.getAuthentication();
|
||||
String actorUrnStr = authentication.getActor().toUrnStr();
|
||||
OperationContext opContext =
|
||||
systemOperationContext.asSession(
|
||||
RequestContext.builder()
|
||||
.buildOpenapi(
|
||||
actorUrnStr,
|
||||
request,
|
||||
"getRawTimeseries",
|
||||
urns.stream().map(Urn::getEntityType).distinct().toList()),
|
||||
authorizerChain,
|
||||
authentication);
|
||||
|
||||
if (!AuthUtil.isAPIOperationsAuthorized(
|
||||
opContext, PoliciesConfig.MANAGE_SYSTEM_OPERATIONS_PRIVILEGE)) {
|
||||
log.error("{} is not authorized to get raw ES documents", actorUrnStr);
|
||||
return ResponseEntity.status(HttpStatus.FORBIDDEN).body(null);
|
||||
}
|
||||
|
||||
return ResponseEntity.ok(timeseriesAspectService.raw(opContext, urnAspects));
|
||||
}
|
||||
|
||||
@PostMapping(path = "/graph/raw", produces = MediaType.APPLICATION_JSON_VALUE)
|
||||
@Operation(
|
||||
description =
|
||||
"Retrieves raw Elasticsearch graph edge document for the provided graph nodes and relationship type. Requires MANAGE_SYSTEM_OPERATIONS_PRIVILEGE.",
|
||||
responses = {
|
||||
@ApiResponse(
|
||||
responseCode = "200",
|
||||
description = "Successfully retrieved raw documents",
|
||||
content = @Content(mediaType = MediaType.APPLICATION_JSON_VALUE)),
|
||||
@ApiResponse(
|
||||
responseCode = "403",
|
||||
description = "Caller not authorized to access raw documents"),
|
||||
@ApiResponse(responseCode = "400", description = "Invalid URN format provided")
|
||||
})
|
||||
public ResponseEntity<List<Map<String, Object>>> getGraphRaw(
|
||||
HttpServletRequest request,
|
||||
@RequestBody
|
||||
@Nonnull
|
||||
@Schema(
|
||||
description =
|
||||
"List of node and relationship tuples. Non-directed, multiple edges may be returned.",
|
||||
example =
|
||||
"[{\"a\":\"urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)\", \"b\": \"urn:li:dataset:(urn:li:dataPlatform:hive,logging_events,PROD)\", \"relationshipType\": \"DownstreamOf\"}]")
|
||||
List<GraphService.EdgeTuple> edgeTuples) {
|
||||
|
||||
Set<Urn> urns =
|
||||
edgeTuples.stream()
|
||||
.flatMap(tuple -> Stream.of(tuple.getA(), tuple.getB()))
|
||||
.map(UrnUtils::getUrn)
|
||||
.collect(Collectors.toSet());
|
||||
|
||||
Authentication authentication = AuthenticationContext.getAuthentication();
|
||||
String actorUrnStr = authentication.getActor().toUrnStr();
|
||||
OperationContext opContext =
|
||||
systemOperationContext.asSession(
|
||||
RequestContext.builder()
|
||||
.buildOpenapi(
|
||||
actorUrnStr,
|
||||
request,
|
||||
"getRawGraph",
|
||||
urns.stream().map(Urn::getEntityType).distinct().toList()),
|
||||
authorizerChain,
|
||||
authentication);
|
||||
|
||||
if (!AuthUtil.isAPIOperationsAuthorized(
|
||||
opContext, PoliciesConfig.MANAGE_SYSTEM_OPERATIONS_PRIVILEGE)) {
|
||||
log.error("{} is not authorized to get raw ES documents", actorUrnStr);
|
||||
return ResponseEntity.status(HttpStatus.FORBIDDEN).body(null);
|
||||
}
|
||||
|
||||
return ResponseEntity.ok(graphService.raw(opContext, edgeTuples));
|
||||
}
|
||||
}
|
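
For orientation, here is a minimal client-side sketch of calling the new entity/raw endpoint. This is not part of the change itself; the host and token values are illustrative assumptions, and the request body is a JSON array of URN strings, matching what the controller tests below send.

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class RawEntityClientExample {
  public static void main(String[] args) throws Exception {
    // Hypothetical GMS host and access token; substitute your own deployment values.
    String gmsHost = "http://localhost:8080";
    String token = "<personal-access-token>";

    // JSON array of entity URNs to fetch raw Elasticsearch documents for.
    String body = "[\"urn:li:dataset:(urn:li:dataPlatform:hive,SampleTable,PROD)\"]";

    HttpRequest request =
        HttpRequest.newBuilder()
            .uri(URI.create(gmsHost + "/openapi/operations/elasticSearch/entity/raw"))
            .header("Content-Type", "application/json")
            .header("Authorization", "Bearer " + token)
            .POST(HttpRequest.BodyPublishers.ofString(body))
            .build();

    // On success (200) the body is a JSON map of URN -> raw document; callers without
    // MANAGE_SYSTEM_OPERATIONS_PRIVILEGE receive 403, and malformed URNs yield 400.
    HttpResponse<String> response =
        HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
    System.out.println(response.statusCode() + ": " + response.body());
  }
}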
@ -23,6 +23,7 @@ import com.linkedin.common.urn.Urn;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.entity.restoreindices.RestoreIndicesResult;
import com.linkedin.metadata.graph.GraphService;
import com.linkedin.metadata.models.registry.EntityRegistry;
import com.linkedin.metadata.search.EntitySearchService;
import com.linkedin.metadata.systemmetadata.SystemMetadataService;
@ -395,6 +396,7 @@ public class ElasticsearchControllerTest extends AbstractTestNGSpringContextTest
  @MockBean public TimeseriesAspectService timeseriesAspectService;
  @MockBean public EntitySearchService searchService;
  @MockBean public EntityService<?> entityService;
  @MockBean public GraphService graphService;

  @Bean
  public ObjectMapper objectMapper() {
@ -443,5 +445,11 @@ public class ElasticsearchControllerTest extends AbstractTestNGSpringContextTest

      return authorizerChain;
    }

    @Bean
    @Primary
    public GraphService graphService() {
      return graphService;
    }
  }
}
@ -0,0 +1,363 @@
package io.datahubproject.openapi.operations.elastic;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.ArgumentMatchers.anyMap;
import static org.mockito.ArgumentMatchers.anySet;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
import static org.testng.Assert.assertNotNull;

import com.datahub.authentication.Actor;
import com.datahub.authentication.ActorType;
import com.datahub.authentication.Authentication;
import com.datahub.authentication.AuthenticationContext;
import com.datahub.authorization.AuthUtil;
import com.datahub.authorization.AuthorizationResult;
import com.datahub.authorization.AuthorizerChain;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.linkedin.common.urn.Urn;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.metadata.graph.GraphService;
import com.linkedin.metadata.search.EntitySearchService;
import com.linkedin.metadata.systemmetadata.SystemMetadataService;
import com.linkedin.metadata.timeseries.TimeseriesAspectService;
import io.datahubproject.metadata.context.OperationContext;
import io.datahubproject.openapi.config.GlobalControllerExceptionHandler;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc;
import org.springframework.boot.test.autoconfigure.web.servlet.WebMvcTest;
import org.springframework.context.annotation.Import;
import org.springframework.http.MediaType;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.testng.AbstractTestNGSpringContextTests;
import org.springframework.test.web.servlet.MockMvc;
import org.springframework.test.web.servlet.request.MockMvcRequestBuilders;
import org.springframework.test.web.servlet.result.MockMvcResultMatchers;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@WebMvcTest(ElasticsearchRawController.class)
@ContextConfiguration(classes = ElasticsearchControllerTest.ElasticsearchControllerTestConfig.class)
@Import({AuthUtil.class, GlobalControllerExceptionHandler.class})
@AutoConfigureMockMvc
public class ElasticsearchRawControllerTest extends AbstractTestNGSpringContextTests {

  private static final Urn TEST_URN_1 =
      UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:hive,SampleTable,PROD)");
  private static final Urn TEST_URN_2 =
      UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:snowflake,test.table,PROD)");
  private static final Urn TEST_URN_3 =
      UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:hdfs,/path/to/data,PROD)");

  @Autowired private ElasticsearchRawController elasticsearchRawController;

  @Autowired private MockMvc mockMvc;

  @Autowired private SystemMetadataService mockSystemMetadataService;

  @Autowired private TimeseriesAspectService mockTimeseriesAspectService;

  @Autowired private EntitySearchService mockSearchService;

  @Autowired private GraphService mockGraphService;

  @Autowired private AuthorizerChain authorizerChain;

  @BeforeMethod
  public void setupMocks() {
    // Setup Authentication
    Authentication authentication = mock(Authentication.class);
    when(authentication.getActor()).thenReturn(new Actor(ActorType.USER, "datahub"));
    AuthenticationContext.setAuthentication(authentication);

    // Setup AuthorizerChain to allow access
    when(authorizerChain.authorize(any()))
        .thenReturn(new AuthorizationResult(null, AuthorizationResult.Type.ALLOW, ""));
  }

  @Test
  public void initTest() {
    assertNotNull(elasticsearchRawController);
  }

  @Test
  public void testGetEntityRaw() throws Exception {
    // Mock raw entity response
    Map<Urn, Map<String, Object>> rawEntityMap = new HashMap<>();

    Map<String, Object> entity1Map = new HashMap<>();
    entity1Map.put("urn", TEST_URN_1.toString());
    entity1Map.put("name", "Sample Table");
    entity1Map.put("platform", "hive");
    entity1Map.put("_index", "datahub_entity_v2");
    entity1Map.put("_id", TEST_URN_1.toString());

    Map<String, Object> entity2Map = new HashMap<>();
    entity2Map.put("urn", TEST_URN_2.toString());
    entity2Map.put("name", "Test Table");
    entity2Map.put("platform", "snowflake");
    entity2Map.put("_index", "datahub_entity_v2");
    entity2Map.put("_id", TEST_URN_2.toString());

    rawEntityMap.put(TEST_URN_1, entity1Map);
    rawEntityMap.put(TEST_URN_2, entity2Map);

    when(mockSearchService.raw(any(OperationContext.class), anySet())).thenReturn(rawEntityMap);

    // Prepare request body
    Set<String> urnStrs = new HashSet<>();
    urnStrs.add(TEST_URN_1.toString());
    urnStrs.add(TEST_URN_2.toString());

    ObjectMapper objectMapper = new ObjectMapper();
    String requestBody = objectMapper.writeValueAsString(urnStrs);

    // Test the endpoint
    mockMvc
        .perform(
            MockMvcRequestBuilders.post("/openapi/operations/elasticSearch/entity/raw")
                .content(requestBody)
                .contentType(MediaType.APPLICATION_JSON)
                .accept(MediaType.APPLICATION_JSON))
        .andExpect(status().isOk())
        .andDo(
            result -> {
              String responseContent = result.getResponse().getContentAsString();
              System.out.println("Response content: " + responseContent);
            })
        .andExpect(
            MockMvcResultMatchers.jsonPath("$['" + TEST_URN_1.toString() + "'].name")
                .value("Sample Table"))
        .andExpect(
            MockMvcResultMatchers.jsonPath("$['" + TEST_URN_1.toString() + "'].platform")
                .value("hive"))
        .andExpect(
            MockMvcResultMatchers.jsonPath("$['" + TEST_URN_2.toString() + "'].name")
                .value("Test Table"))
        .andExpect(
            MockMvcResultMatchers.jsonPath("$['" + TEST_URN_2.toString() + "'].platform")
                .value("snowflake"));
  }

  @Test
  public void testGetEntityRawUnauthorized() throws Exception {
    // Setup AuthorizerChain to deny access
    when(authorizerChain.authorize(any()))
        .thenReturn(new AuthorizationResult(null, AuthorizationResult.Type.DENY, ""));

    // Prepare request body
    Set<String> urnStrs = new HashSet<>();
    urnStrs.add(TEST_URN_1.toString());

    ObjectMapper objectMapper = new ObjectMapper();
    String requestBody = objectMapper.writeValueAsString(urnStrs);

    // Test the endpoint - should return 403
    mockMvc
        .perform(
            MockMvcRequestBuilders.post("/openapi/operations/elasticSearch/entity/raw")
                .content(requestBody)
                .contentType(MediaType.APPLICATION_JSON)
                .accept(MediaType.APPLICATION_JSON))
        .andExpect(status().isForbidden());
  }

  @Test
  public void testGetSystemMetadataRaw() throws Exception {
    // Mock raw system metadata response
    Map<Urn, Map<String, Map<String, Object>>> rawSystemMetadataMap = new HashMap<>();

    Map<String, Map<String, Object>> urn1Aspects = new HashMap<>();
    Map<String, Object> statusAspect = new HashMap<>();
    statusAspect.put("removed", false);
    statusAspect.put("_index", "system_metadata_service_v1");
    statusAspect.put("_id", TEST_URN_1.toString() + "_status");
    urn1Aspects.put("status", statusAspect);

    Map<String, Object> propertiesAspect = new HashMap<>();
    propertiesAspect.put("description", "Sample dataset");
    propertiesAspect.put("_index", "system_metadata_service_v1");
    propertiesAspect.put("_id", TEST_URN_1.toString() + "_datasetProperties");
    urn1Aspects.put("datasetProperties", propertiesAspect);

    rawSystemMetadataMap.put(TEST_URN_1, urn1Aspects);

    when(mockSystemMetadataService.raw(any(OperationContext.class), anyMap()))
        .thenReturn(rawSystemMetadataMap);

    // Prepare request body
    Map<String, Set<String>> urnAspects = new HashMap<>();
    Set<String> aspects = new HashSet<>();
    aspects.add("status");
    aspects.add("datasetProperties");
    urnAspects.put(TEST_URN_1.toString(), aspects);

    ObjectMapper objectMapper = new ObjectMapper();
    String requestBody = objectMapper.writeValueAsString(urnAspects);

    // Test the endpoint
    mockMvc
        .perform(
            MockMvcRequestBuilders.post("/openapi/operations/elasticSearch/systemmetadata/raw")
                .content(requestBody)
                .contentType(MediaType.APPLICATION_JSON)
                .accept(MediaType.APPLICATION_JSON))
        .andExpect(status().isOk())
        .andDo(
            result -> {
              String responseContent = result.getResponse().getContentAsString();
              System.out.println("Response content: " + responseContent);
            })
        .andExpect(
            MockMvcResultMatchers.jsonPath("$['" + TEST_URN_1.toString() + "'].status.removed")
                .value(false))
        .andExpect(
            MockMvcResultMatchers.jsonPath(
                    "$['" + TEST_URN_1.toString() + "'].datasetProperties.description")
                .value("Sample dataset"));
  }

  @Test
  public void testGetTimeseriesRaw() throws Exception {
    // Mock raw timeseries response
    Map<Urn, Map<String, Map<String, Object>>> rawTimeseriesMap = new HashMap<>();

    Map<String, Map<String, Object>> urn1Aspects = new HashMap<>();
    Map<String, Object> profileAspect = new HashMap<>();
    profileAspect.put("timestampMillis", 1234567890L);
    profileAspect.put("rowCount", 1000000L);
    profileAspect.put("columnCount", 25);
    profileAspect.put("_index", "dataset_datasetprofileaspect_v1");
    profileAspect.put("_id", TEST_URN_1.toString() + "_datasetProfile_1234567890");
    urn1Aspects.put("datasetProfile", profileAspect);

    rawTimeseriesMap.put(TEST_URN_1, urn1Aspects);

    when(mockTimeseriesAspectService.raw(any(OperationContext.class), anyMap()))
        .thenReturn(rawTimeseriesMap);

    // Prepare request body
    Map<String, Set<String>> urnAspects = new HashMap<>();
    Set<String> aspects = new HashSet<>();
    aspects.add("datasetProfile");
    urnAspects.put(TEST_URN_1.toString(), aspects);

    ObjectMapper objectMapper = new ObjectMapper();
    String requestBody = objectMapper.writeValueAsString(urnAspects);

    // Test the endpoint
    mockMvc
        .perform(
            MockMvcRequestBuilders.post("/openapi/operations/elasticSearch/timeseries/raw")
                .content(requestBody)
                .contentType(MediaType.APPLICATION_JSON)
                .accept(MediaType.APPLICATION_JSON))
        .andExpect(status().isOk())
        .andDo(
            result -> {
              String responseContent = result.getResponse().getContentAsString();
              System.out.println("Response content: " + responseContent);
            })
        .andExpect(
            MockMvcResultMatchers.jsonPath(
                    "$['" + TEST_URN_1.toString() + "'].datasetProfile.timestampMillis")
                .value(1234567890L))
        .andExpect(
            MockMvcResultMatchers.jsonPath(
                    "$['" + TEST_URN_1.toString() + "'].datasetProfile.rowCount")
                .value(1000000L))
        .andExpect(
            MockMvcResultMatchers.jsonPath(
                    "$['" + TEST_URN_1.toString() + "'].datasetProfile.columnCount")
                .value(25));
  }

  @Test
  public void testGetGraphRaw() throws Exception {
    // Mock raw graph response
    List<Map<String, Object>> rawGraphList = new ArrayList<>();

    Map<String, Object> edge1 = new HashMap<>();
    edge1.put("source", TEST_URN_1.toString());
    edge1.put("destination", TEST_URN_2.toString());
    edge1.put("relationshipType", "DownstreamOf");
    edge1.put("_index", "graph_service_v1");
    edge1.put("_id", "edge1_id");

    Map<String, Object> edge2 = new HashMap<>();
    edge2.put("source", TEST_URN_2.toString());
    edge2.put("destination", TEST_URN_3.toString());
    edge2.put("relationshipType", "DownstreamOf");
    edge2.put("_index", "graph_service_v1");
    edge2.put("_id", "edge2_id");

    rawGraphList.add(edge1);
    rawGraphList.add(edge2);

    when(mockGraphService.raw(any(OperationContext.class), anyList())).thenReturn(rawGraphList);

    // Prepare request body
    List<GraphService.EdgeTuple> edgeTuples = new ArrayList<>();
    GraphService.EdgeTuple tuple1 = new GraphService.EdgeTuple();
    tuple1.setA(TEST_URN_1.toString());
    tuple1.setB(TEST_URN_2.toString());
    tuple1.setRelationshipType("DownstreamOf");
    edgeTuples.add(tuple1);

    GraphService.EdgeTuple tuple2 = new GraphService.EdgeTuple();
    tuple2.setA(TEST_URN_2.toString());
    tuple2.setB(TEST_URN_3.toString());
    tuple2.setRelationshipType("DownstreamOf");
    edgeTuples.add(tuple2);

    ObjectMapper objectMapper = new ObjectMapper();
    String requestBody = objectMapper.writeValueAsString(edgeTuples);

    // Test the endpoint
    mockMvc
        .perform(
            MockMvcRequestBuilders.post("/openapi/operations/elasticSearch/graph/raw")
                .content(requestBody)
                .contentType(MediaType.APPLICATION_JSON)
                .accept(MediaType.APPLICATION_JSON))
        .andExpect(status().isOk())
        .andDo(
            result -> {
              String responseContent = result.getResponse().getContentAsString();
              System.out.println("Response content: " + responseContent);
            })
        .andExpect(MockMvcResultMatchers.jsonPath("$").isArray())
        .andExpect(MockMvcResultMatchers.jsonPath("$.length()").value(2))
        .andExpect(MockMvcResultMatchers.jsonPath("$[0].source").value(TEST_URN_1.toString()))
        .andExpect(MockMvcResultMatchers.jsonPath("$[0].destination").value(TEST_URN_2.toString()))
        .andExpect(MockMvcResultMatchers.jsonPath("$[0].relationshipType").value("DownstreamOf"))
        .andExpect(MockMvcResultMatchers.jsonPath("$[1].source").value(TEST_URN_2.toString()))
        .andExpect(MockMvcResultMatchers.jsonPath("$[1].destination").value(TEST_URN_3.toString()));
  }

  @Test
  public void testInvalidUrnFormatWithProperExceptionHandling() throws Exception {
    Set<String> urnStrs = new HashSet<>();
    urnStrs.add("invalid:urn:format");

    ObjectMapper objectMapper = new ObjectMapper();
    String requestBody = objectMapper.writeValueAsString(urnStrs);

    mockMvc
        .perform(
            MockMvcRequestBuilders.post("/openapi/operations/elasticSearch/entity/raw")
                .content(requestBody)
                .contentType(MediaType.APPLICATION_JSON)
                .accept(MediaType.APPLICATION_JSON))
        .andExpect(status().isBadRequest())
        .andExpect(MockMvcResultMatchers.jsonPath("$.error").exists());
  }
}
@ -156,4 +156,10 @@ public class MockTimeseriesAspectService implements TimeseriesAspectService {
      @Nullable Long endTimeMillis) {
    return TimeseriesScrollResult.builder().build();
  }

  @Override
  public Map<Urn, Map<String, Map<String, Object>>> raw(
      OperationContext opContext, Map<String, Set<String>> urnAspects) {
    return Map.of();
  }
}
@ -12,9 +12,13 @@ import com.linkedin.metadata.query.filter.RelationshipFilter;
import com.linkedin.metadata.query.filter.SortCriterion;
import io.datahubproject.metadata.context.OperationContext;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

public interface GraphService {

@ -259,4 +263,22 @@ public interface GraphService {
      @Nullable Integer count,
      @Nullable Long startTimeMillis,
      @Nullable Long endTimeMillis);

  /**
   * Returns the list of edge documents for the given graph node and relationship tuples. The
   * lookup is non-directed: edges are matched in either direction.
   *
   * @param opContext operation context
   * @param edgeTuples non-directed node pairs and relationship types
   * @return list of edge documents matching the input criteria
   */
  List<Map<String, Object>> raw(OperationContext opContext, List<EdgeTuple> edgeTuples);

  @AllArgsConstructor
  @NoArgsConstructor
  @Data
  class EdgeTuple {
    String a;
    String b;
    String relationshipType;
  }
}
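
A minimal sketch of invoking the new GraphService#raw method directly, mirroring the controller test above; the opContext and graphService references are assumed to be an existing OperationContext and an injected GraphService implementation.

// EdgeTuple is a Lombok @Data class, so the all-args constructor and setters both work.
List<GraphService.EdgeTuple> tuples =
    List.of(
        new GraphService.EdgeTuple(
            "urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)",
            "urn:li:dataset:(urn:li:dataPlatform:hive,logging_events,PROD)",
            "DownstreamOf"));

// The lookup is non-directed: edges stored as (a -> b) or (b -> a) for the
// relationship type may both be returned as raw edge documents.
List<Map<String, Object>> edges = graphService.raw(opContext, tuples);
edges.forEach(doc -> System.out.println(doc.get("source") + " -> " + doc.get("destination")));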
@ -5,9 +5,11 @@ import com.linkedin.metadata.config.SystemMetadataServiceConfig;
import com.linkedin.metadata.run.AspectRowSummary;
import com.linkedin.metadata.run.IngestionRunSummary;
import com.linkedin.mxe.SystemMetadata;
import io.datahubproject.metadata.context.OperationContext;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.opensearch.client.tasks.GetTaskResponse;
@ -60,4 +62,14 @@ public interface SystemMetadataService {
  default void configure() {}

  void clear();

  /**
   * Returns raw Elasticsearch documents for the given URNs and aspect names.
   *
   * @param opContext operation context
   * @param urnAspects the map of URNs to aspect names
   * @return map of URNs to aspect names to raw Elasticsearch documents
   */
  Map<Urn, Map<String, Map<String, Object>>> raw(
      OperationContext opContext, Map<String, Set<String>> urnAspects);
}
@ -249,4 +249,14 @@ public interface TimeseriesAspectService {
      @Nullable Integer count,
      @Nullable Long startTimeMillis,
      @Nullable Long endTimeMillis);

  /**
   * Returns the latest raw timeseries document for each requested aspect.
   *
   * @param opContext operation context
   * @param urnAspects the map of URNs to timeseries aspect names to retrieve
   * @return the raw Elasticsearch documents, keyed by URN and aspect name
   */
  Map<Urn, Map<String, Map<String, Object>>> raw(
      OperationContext opContext, Map<String, Set<String>> urnAspects);
}
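
Both SystemMetadataService#raw and TimeseriesAspectService#raw take the same URN-to-aspect-names map. A minimal sketch of building that input and invoking both, assuming injected service instances and an existing OperationContext:

Map<String, Set<String>> urnAspects =
    Map.of(
        "urn:li:dataset:(urn:li:dataPlatform:hive,SampleTable,PROD)",
        Set.of("status", "datasetProperties"));

// Raw system metadata documents, keyed by URN and then aspect name.
Map<Urn, Map<String, Map<String, Object>>> systemDocs =
    systemMetadataService.raw(opContext, urnAspects);

// For timeseries aspects, the latest raw document per requested aspect is returned.
Map<Urn, Map<String, Map<String, Object>>> timeseriesDocs =
    timeseriesAspectService.raw(
        opContext,
        Map.of(
            "urn:li:dataset:(urn:li:dataPlatform:hive,SampleTable,PROD)",
            Set.of("datasetProfile")));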