feat(ingestion): backend changes from saas

This commit is contained in:
Victor Tarasevich 2025-06-17 15:10:43 +03:00 committed by v-tarasevich-blitz-brain
parent b28a65bdf2
commit bae4f90386
3 changed files with 115 additions and 109 deletions

View File

@ -26,6 +26,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import javax.annotation.Nullable;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@ -41,7 +42,7 @@ public class ListExecutionRequestsResolver
private static final String EXECUTION_REQUEST_INGESTION_SOURCE_FIELD = "ingestionSource";
private static final String INGESTION_SOURCE_SOURCE_TYPE_FIELD = "sourceType";
private static final String INGESTION_SOURCE_SOURCE_TYPE_SYSTEM = "SYSTEM";
private static final Integer NUMBER_OF_SYSTEM_INGESTION_SOURCES_TO_FETCH = 1000;
private static final Integer NUMBER_OF_INGESTION_SOURCES_TO_CHECK = 1000;
private final EntityClient _entityClient;
@ -58,7 +59,7 @@ public class ListExecutionRequestsResolver
final Integer count = input.getCount() == null ? DEFAULT_COUNT : input.getCount();
final String query = input.getQuery() == null ? DEFAULT_QUERY : input.getQuery();
List<FacetFilterInput> filters =
input.getFilters() == null ? Collections.emptyList() : input.getFilters();
input.getFilters() == null ? new ArrayList<>() : input.getFilters();
// construct sort criteria, defaulting to systemCreated
final SortCriterion sortCriterion;
@ -78,7 +79,7 @@ public class ListExecutionRequestsResolver
() -> {
try {
// Add additional filters to show only or hide all system ingestion sources
addSystemIngestionSourceFilter(filters, input.getSystemSources(), context);
addDefaultFilters(context, filters, input.getSystemSources());
// First, get all execution request Urns.
final SearchResult gmsResult =
_entityClient.search(
@ -122,31 +123,40 @@ public class ListExecutionRequestsResolver
return results;
}
private void addSystemIngestionSourceFilter(
List<FacetFilterInput> filters, final Boolean systemSources, final QueryContext context)
private void addDefaultFilters(
final QueryContext context,
List<FacetFilterInput> filters,
@Nullable final Boolean systemSources)
throws Exception {
if (systemSources != null) {
List<Urn> urns = getUrnsOfSystemIngestionSources(context);
filters.add(
new FacetFilterInput(
EXECUTION_REQUEST_INGESTION_SOURCE_FIELD,
null,
urns.stream().map(Urn::toString).toList(),
!systemSources,
FilterOperator.EQUAL));
}
addAccessibleIngestionSourceFilter(context, filters, systemSources); // Saas only
}
private List<Urn> getUrnsOfSystemIngestionSources(final QueryContext context) throws Exception {
private void addAccessibleIngestionSourceFilter(
QueryContext context, List<FacetFilterInput> filters, @Nullable Boolean systemSources)
throws Exception {
List<Urn> sourceUrns = getUrnsOfIngestionSources(context, systemSources);
filters.add(
new FacetFilterInput(
EXECUTION_REQUEST_INGESTION_SOURCE_FIELD,
null,
sourceUrns.stream().map(Urn::toString).toList(),
false,
FilterOperator.EQUAL));
}
private List<Urn> getUrnsOfIngestionSources(
final QueryContext context, @Nullable final Boolean systemSources) throws Exception {
List<FacetFilterInput> filters =
List.of(
new FacetFilterInput(
INGESTION_SOURCE_SOURCE_TYPE_FIELD,
null,
ImmutableList.of(INGESTION_SOURCE_SOURCE_TYPE_SYSTEM),
false,
FilterOperator.EQUAL));
systemSources != null
? List.of(
new FacetFilterInput(
INGESTION_SOURCE_SOURCE_TYPE_FIELD,
null,
ImmutableList.of(INGESTION_SOURCE_SOURCE_TYPE_SYSTEM),
!systemSources,
FilterOperator.EQUAL))
: Collections.emptyList();
final SearchResult gmsResult =
_entityClient.search(
context.getOperationContext(),
@ -155,7 +165,7 @@ public class ListExecutionRequestsResolver
buildFilter(filters, Collections.emptyList()),
null,
0,
NUMBER_OF_SYSTEM_INGESTION_SOURCES_TO_FETCH);
NUMBER_OF_INGESTION_SOURCES_TO_CHECK);
return gmsResult.getEntities().stream().map(SearchEntity::getEntity).toList();
}

View File

@ -94,6 +94,7 @@ public class ExecutionRequestMapper implements ModelMapper<EntityResponse, Execu
if (executionRequestInput.hasExecutorId()) {
inputResult.setExecutorId(executionRequestInput.getExecutorId());
}
result.setInput(inputResult);
}
@ -134,8 +135,8 @@ public class ExecutionRequestMapper implements ModelMapper<EntityResponse, Execu
* @param execRequestResult the ExecutionRequestResult to map
* @return the mapped GraphQL ExecutionRequestResult object
*/
private com.linkedin.datahub.graphql.generated.ExecutionRequestResult mapExecutionRequestResult(
final ExecutionRequestResult execRequestResult) {
public static com.linkedin.datahub.graphql.generated.ExecutionRequestResult
mapExecutionRequestResult(final ExecutionRequestResult execRequestResult) {
final com.linkedin.datahub.graphql.generated.ExecutionRequestResult result =
new com.linkedin.datahub.graphql.generated.ExecutionRequestResult();
result.setStatus(execRequestResult.getStatus());
@ -154,7 +155,8 @@ public class ExecutionRequestMapper implements ModelMapper<EntityResponse, Execu
* @param structuredReport the StructuredExecutionReport to map
* @return the mapped GraphQL StructuredReport object
*/
private StructuredReport mapStructuredReport(final StructuredExecutionReport structuredReport) {
private static StructuredReport mapStructuredReport(
final StructuredExecutionReport structuredReport) {
StructuredReport structuredReportResult = new StructuredReport();
structuredReportResult.setType(structuredReport.getType());
structuredReportResult.setSerializedValue(structuredReport.getSerializedValue());

View File

@ -2,7 +2,7 @@ package com.linkedin.datahub.graphql.resolvers.ingest.execution;
import static com.linkedin.datahub.graphql.resolvers.ResolverUtils.buildFilter;
import static com.linkedin.datahub.graphql.resolvers.ingest.IngestTestUtils.*;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.*;
import static org.testng.Assert.*;
import com.google.common.collect.ImmutableList;
@ -23,6 +23,7 @@ import graphql.schema.DataFetchingEnvironment;
import java.util.Collections;
import java.util.List;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.mockito.Mockito;
import org.testng.annotations.Test;
@ -36,7 +37,7 @@ public class ListExecutionRequestsResolverTest {
@Test
public void testGetSuccess() throws Exception {
// Create resolver
EntityClient mockClient = Mockito.mock(EntityClient.class);
EntityClient mockClient = getTestEntityClient(null);
// Mock search response
Mockito.when(
@ -82,10 +83,11 @@ public class ListExecutionRequestsResolverTest {
@Test
public void testGetEntityClientException() throws Exception {
// Create resolver
EntityClient mockClient = Mockito.mock(EntityClient.class);
EntityClient mockClient = getTestEntityClient(null);
Mockito.doThrow(RemoteInvocationException.class)
.when(mockClient)
.batchGetV2(any(), Mockito.any(), Mockito.anySet(), Mockito.anySet());
.search(
any(), eq(Constants.EXECUTION_REQUEST_ENTITY_NAME), any(), any(), any(), eq(0), eq(20));
ListExecutionRequestsResolver resolver = new ListExecutionRequestsResolver(mockClient);
// Execute resolver
@ -100,7 +102,7 @@ public class ListExecutionRequestsResolverTest {
@Test
public void testGetWithCustomQuery() throws Exception {
EntityClient mockClient = Mockito.mock(EntityClient.class);
EntityClient mockClient = getTestEntityClient(null);
ListExecutionRequestsInput customInput =
new ListExecutionRequestsInput(0, 20, "custom-query", null, null, null);
@ -135,67 +137,14 @@ public class ListExecutionRequestsResolverTest {
@Test
public void testGetWithSystemSourcesOnly() throws Exception {
EntityClient mockClient = Mockito.mock(EntityClient.class);
EntityClient mockClient =
getTestEntityClient(
new FacetFilterInput(
"sourceType", null, ImmutableList.of("SYSTEM"), false, FilterOperator.EQUAL));
ListExecutionRequestsInput inputWithSystemSourcesOnly =
new ListExecutionRequestsInput(0, 20, "*", List.of(), null, true);
Mockito.when(
mockClient.search(
any(),
Mockito.eq(Constants.INGESTION_SOURCE_ENTITY_NAME),
Mockito.eq("*"),
Mockito.eq(
buildFilter(
Stream.of(
new FacetFilterInput(
"sourceType",
null,
ImmutableList.of("SYSTEM"),
false,
FilterOperator.EQUAL))
.toList(),
Collections.emptyList())),
Mockito.any(),
Mockito.eq(0),
Mockito.eq(1000)))
.thenReturn(
new SearchResult()
.setFrom(0)
.setPageSize(0)
.setNumEntities(0)
.setEntities(
new SearchEntityArray(
ImmutableSet.of(
new SearchEntity()
.setEntity(
Urn.createFromString("urn:li:dataHubIngestionSource:id-1"))))));
Mockito.when(
mockClient.search(
any(),
Mockito.eq(Constants.EXECUTION_REQUEST_ENTITY_NAME),
Mockito.eq("*"),
Mockito.eq(
buildFilter(
Stream.of(
new FacetFilterInput(
"ingestionSource",
null,
ImmutableList.of("urn:li:dataHubIngestionSource:id-1"),
false,
FilterOperator.EQUAL))
.toList(),
Collections.emptyList())),
Mockito.any(),
Mockito.eq(0),
Mockito.eq(20)))
.thenReturn(
new SearchResult()
.setFrom(0)
.setPageSize(0)
.setNumEntities(0)
.setEntities(new SearchEntityArray()));
ListExecutionRequestsResolver resolver = new ListExecutionRequestsResolver(mockClient);
DataFetchingEnvironment mockEnv = Mockito.mock(DataFetchingEnvironment.class);
@ -209,11 +158,68 @@ public class ListExecutionRequestsResolverTest {
@Test
public void testGetWithoutSystemSources() throws Exception {
EntityClient mockClient = Mockito.mock(EntityClient.class);
EntityClient mockClient =
getTestEntityClient(
new FacetFilterInput(
"sourceType", null, ImmutableList.of("SYSTEM"), true, FilterOperator.EQUAL));
ListExecutionRequestsInput inputWithSystemSourcesOnly =
new ListExecutionRequestsInput(0, 20, "*", List.of(), null, false);
ListExecutionRequestsResolver resolver = new ListExecutionRequestsResolver(mockClient);
DataFetchingEnvironment mockEnv = Mockito.mock(DataFetchingEnvironment.class);
QueryContext mockContext = getMockAllowContext();
Mockito.when(mockEnv.getArgument(Mockito.eq("input"))).thenReturn(inputWithSystemSourcesOnly);
Mockito.when(mockEnv.getContext()).thenReturn(mockContext);
var result = resolver.get(mockEnv).get();
assertEquals(result.getExecutionRequests().size(), 0);
}
@Test
public void testGetWithFilteringByAccessibleIngestionSourcesWhenNoPermissions() throws Exception {
EntityClient mockClient = getTestEntityClient(null);
ListExecutionRequestsInput inputWithSystemSourcesOnly =
new ListExecutionRequestsInput(0, 20, "*", List.of(), null, null);
DataFetchingEnvironment mockEnv = Mockito.mock(DataFetchingEnvironment.class);
QueryContext mockContext = getMockDenyContext();
Mockito.when(mockEnv.getArgument(Mockito.eq("input"))).thenReturn(inputWithSystemSourcesOnly);
Mockito.when(mockEnv.getContext()).thenReturn(mockContext);
ListExecutionRequestsResolver resolver = new ListExecutionRequestsResolver(mockClient);
var result = resolver.get(mockEnv).get();
assertEquals(result.getExecutionRequests().size(), 0);
}
@Test
public void testGetWithFilteringByAccessibleSystemIngestionSourcesWhenNoPermissions()
throws Exception {
EntityClient mockClient =
getTestEntityClient(
new FacetFilterInput(
"sourceType", null, ImmutableList.of("SYSTEM"), false, FilterOperator.EQUAL));
ListExecutionRequestsInput inputWithSystemSourcesOnly =
new ListExecutionRequestsInput(0, 20, "*", List.of(), null, true);
DataFetchingEnvironment mockEnv = Mockito.mock(DataFetchingEnvironment.class);
QueryContext mockContext = getMockDenyContext();
Mockito.when(mockEnv.getArgument(Mockito.eq("input"))).thenReturn(inputWithSystemSourcesOnly);
Mockito.when(mockEnv.getContext()).thenReturn(mockContext);
ListExecutionRequestsResolver resolver = new ListExecutionRequestsResolver(mockClient);
var result = resolver.get(mockEnv).get();
assertEquals(result.getExecutionRequests().size(), 0);
}
private EntityClient getTestEntityClient(@Nullable FacetFilterInput ingestionSourceFilter)
throws Exception {
EntityClient mockClient = Mockito.mock(EntityClient.class);
Mockito.when(
mockClient.search(
any(),
@ -221,14 +227,9 @@ public class ListExecutionRequestsResolverTest {
Mockito.eq("*"),
Mockito.eq(
buildFilter(
Stream.of(
new FacetFilterInput(
"sourceType",
null,
ImmutableList.of("SYSTEM"),
false,
FilterOperator.EQUAL))
.toList(),
ingestionSourceFilter != null
? Stream.of(ingestionSourceFilter).toList()
: Collections.emptyList(),
Collections.emptyList())),
Mockito.any(),
Mockito.eq(0),
@ -244,6 +245,7 @@ public class ListExecutionRequestsResolverTest {
new SearchEntity()
.setEntity(
Urn.createFromString("urn:li:dataHubIngestionSource:id-1"))))));
Mockito.when(
mockClient.search(
any(),
@ -256,7 +258,7 @@ public class ListExecutionRequestsResolverTest {
"ingestionSource",
null,
ImmutableList.of("urn:li:dataHubIngestionSource:id-1"),
true,
false,
FilterOperator.EQUAL))
.toList(),
Collections.emptyList())),
@ -270,14 +272,6 @@ public class ListExecutionRequestsResolverTest {
.setNumEntities(0)
.setEntities(new SearchEntityArray()));
ListExecutionRequestsResolver resolver = new ListExecutionRequestsResolver(mockClient);
DataFetchingEnvironment mockEnv = Mockito.mock(DataFetchingEnvironment.class);
QueryContext mockContext = getMockAllowContext();
Mockito.when(mockEnv.getArgument(Mockito.eq("input"))).thenReturn(inputWithSystemSourcesOnly);
Mockito.when(mockEnv.getContext()).thenReturn(mockContext);
var result = resolver.get(mockEnv).get();
assertEquals(result.getExecutionRequests().size(), 0);
return mockClient;
}
}