feat(ingestion): add ownership aspect to ingestion source

This commit is contained in:
Victor Tarasevich 2025-05-29 19:29:00 +03:00
parent 393d5525ee
commit 05e0976c81
10 changed files with 91 additions and 27 deletions

View File

@ -1193,9 +1193,11 @@ public class GmsGraphQLEngine {
"revokeAccessToken",
new RevokeAccessTokenResolver(this.entityClient, this.statefulTokenService))
.dataFetcher(
"createIngestionSource", new UpsertIngestionSourceResolver(this.entityClient))
"createIngestionSource",
new UpsertIngestionSourceResolver(this.entityClient, this.entityService))
.dataFetcher(
"updateIngestionSource", new UpsertIngestionSourceResolver(this.entityClient))
"updateIngestionSource",
new UpsertIngestionSourceResolver(this.entityClient, this.entityService))
.dataFetcher(
"deleteIngestionSource", new DeleteIngestionSourceResolver(this.entityClient))
.dataFetcher(

View File

@ -11,15 +11,18 @@ import com.linkedin.datahub.graphql.concurrency.GraphQLConcurrencyUtils;
import com.linkedin.datahub.graphql.exception.AuthorizationException;
import com.linkedin.datahub.graphql.exception.DataHubGraphQLErrorCode;
import com.linkedin.datahub.graphql.exception.DataHubGraphQLException;
import com.linkedin.datahub.graphql.generated.OwnerEntityType;
import com.linkedin.datahub.graphql.generated.StringMapEntryInput;
import com.linkedin.datahub.graphql.generated.UpdateIngestionSourceConfigInput;
import com.linkedin.datahub.graphql.generated.UpdateIngestionSourceInput;
import com.linkedin.datahub.graphql.generated.UpdateIngestionSourceScheduleInput;
import com.linkedin.datahub.graphql.resolvers.ingest.IngestionAuthUtils;
import com.linkedin.datahub.graphql.resolvers.mutate.util.OwnerUtils;
import com.linkedin.entity.client.EntityClient;
import com.linkedin.ingestion.DataHubIngestionSourceConfig;
import com.linkedin.ingestion.DataHubIngestionSourceInfo;
import com.linkedin.ingestion.DataHubIngestionSourceSchedule;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.key.DataHubIngestionSourceKey;
import com.linkedin.mxe.MetadataChangeProposal;
import graphql.schema.DataFetcher;
@ -41,9 +44,12 @@ import org.springframework.scheduling.support.CronExpression;
public class UpsertIngestionSourceResolver implements DataFetcher<CompletableFuture<String>> {
private final EntityClient _entityClient;
private final EntityService<?> _entityService;
public UpsertIngestionSourceResolver(final EntityClient entityClient) {
public UpsertIngestionSourceResolver(
final EntityClient entityClient, final EntityService<?> entityService) {
_entityClient = entityClient;
_entityService = entityService;
}
@Override
@ -87,7 +93,15 @@ public class UpsertIngestionSourceResolver implements DataFetcher<CompletableFut
return GraphQLConcurrencyUtils.supplyAsync(
() -> {
try {
return _entityClient.ingestProposal(context.getOperationContext(), proposal, false);
String urn =
_entityClient.ingestProposal(context.getOperationContext(), proposal, false);
if (!ingestionSourceUrn.isPresent()) {
OwnerUtils.addCreatorAsOwner(context, urn, OwnerEntityType.CORP_USER, _entityService);
}
return urn;
} catch (Exception e) {
throw new RuntimeException(
String.format(

View File

@ -1,11 +1,13 @@
package com.linkedin.datahub.graphql.types.ingestion;
import com.linkedin.common.Ownership;
import com.linkedin.common.urn.Urn;
import com.linkedin.datahub.graphql.QueryContext;
import com.linkedin.datahub.graphql.generated.IngestionConfig;
import com.linkedin.datahub.graphql.generated.IngestionSchedule;
import com.linkedin.datahub.graphql.generated.IngestionSource;
import com.linkedin.datahub.graphql.generated.StringMapEntry;
import com.linkedin.datahub.graphql.types.common.mappers.OwnershipMapper;
import com.linkedin.datahub.graphql.types.mappers.ModelMapper;
import com.linkedin.entity.EntityResponse;
import com.linkedin.entity.EnvelopedAspect;
@ -50,7 +52,7 @@ public class IngestionSourceMapper implements ModelMapper<EntityResponse, Ingest
@Override
public IngestionSource apply(
@Nullable QueryContext context, @Nonnull EntityResponse entityResponse) {
return mapIngestionSource(entityResponse);
return mapIngestionSource(context, entityResponse);
}
/**
@ -59,9 +61,12 @@ public class IngestionSourceMapper implements ModelMapper<EntityResponse, Ingest
* @param ingestionSource The entity response to map.
* @return The mapped {@link IngestionSource}.
*/
private IngestionSource mapIngestionSource(final EntityResponse ingestionSource) {
private IngestionSource mapIngestionSource(
@Nullable QueryContext context, final EntityResponse ingestionSource) {
final Urn entityUrn = ingestionSource.getUrn();
final EnvelopedAspectMap aspects = ingestionSource.getAspects();
final IngestionSource result = new IngestionSource();
result.setUrn(entityUrn.toString());
// There should ALWAYS be an info aspect
// (excepting a case when the source with urn was deleted)
@ -71,31 +76,29 @@ public class IngestionSourceMapper implements ModelMapper<EntityResponse, Ingest
return null;
}
// Bind into a strongly typed object.
final DataHubIngestionSourceInfo ingestionSourceInfo =
new DataHubIngestionSourceInfo(envelopedInfo.getValue().data());
mapIngestionSourceInfo(result, envelopedInfo);
mapOwnership(result, context, entityUrn, aspects);
return mapIngestionSourceInfo(entityUrn, ingestionSourceInfo);
return result;
}
/**
* Maps {@link DataHubIngestionSourceInfo} to {@link IngestionSource}.
* Maps {@link EnvelopedAspect} to {@link IngestionSource}.
*
* @param urn the urn of the ingestion source
* @param info the {@link DataHubIngestionSourceInfo}
* @return the mapped {@link IngestionSource}
* @param result the {@link IngestionSource}
* @param envelopedInfo the {@link EnvelopedAspect}
*/
private IngestionSource mapIngestionSourceInfo(
final Urn urn, final DataHubIngestionSourceInfo info) {
final IngestionSource result = new IngestionSource();
result.setUrn(urn.toString());
private void mapIngestionSourceInfo(
final IngestionSource result, final EnvelopedAspect envelopedInfo) {
final DataHubIngestionSourceInfo info =
new DataHubIngestionSourceInfo(envelopedInfo.getValue().data());
result.setName(info.getName());
result.setType(info.getType());
result.setConfig(mapIngestionSourceConfig(info.getConfig()));
if (info.hasSchedule()) {
result.setSchedule(mapIngestionSourceSchedule(info.getSchedule()));
}
return result;
}
/**
@ -133,4 +136,16 @@ public class IngestionSourceMapper implements ModelMapper<EntityResponse, Ingest
result.setTimezone(schedule.getTimezone());
return result;
}
private void mapOwnership(
final IngestionSource result,
@Nullable QueryContext context,
final Urn urn,
final EnvelopedAspectMap aspects) {
final EnvelopedAspect envelopedOwnership = aspects.get(Constants.OWNERSHIP_ASPECT_NAME);
if (envelopedOwnership != null) {
result.setOwnership(
OwnershipMapper.map(context, new Ownership(envelopedOwnership.getValue().data()), urn));
}
}
}

View File

@ -31,7 +31,8 @@ public class IngestionSourceType
ImmutableSet.of(
Constants.INGESTION_SOURCE_ENTITY_NAME,
Constants.INGESTION_INFO_ASPECT_NAME,
Constants.INGESTION_SOURCE_KEY_ASPECT_NAME);
Constants.INGESTION_SOURCE_KEY_ASPECT_NAME,
Constants.OWNERSHIP_ASPECT_NAME);
/**
* Returns the class of the object that this type loads.

View File

@ -493,6 +493,11 @@ type IngestionSource {
Previous requests to execute the ingestion source
"""
executions(start: Int, count: Int): IngestionSourceExecutionRequests
"""
Ownership metadata of the ingestion source
"""
ownership: Ownership
}
"""

View File

@ -1,5 +1,6 @@
package com.linkedin.datahub.graphql.resolvers.ingest.secret;
import static com.linkedin.datahub.graphql.TestUtils.getMockEntityService;
import static com.linkedin.datahub.graphql.resolvers.ingest.IngestTestUtils.*;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
@ -13,6 +14,7 @@ import com.linkedin.datahub.graphql.resolvers.ingest.source.UpsertIngestionSourc
import com.linkedin.entity.client.EntityClient;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.metadata.Constants;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.key.DataHubSecretKey;
import com.linkedin.metadata.utils.GenericRecordUtils;
import com.linkedin.mxe.MetadataChangeProposal;
@ -75,7 +77,9 @@ public class CreateSecretResolverTest {
public void testGetUnauthorized() throws Exception {
// Create resolver
EntityClient mockClient = Mockito.mock(EntityClient.class);
UpsertIngestionSourceResolver resolver = new UpsertIngestionSourceResolver(mockClient);
EntityService<?> mockService = getMockEntityService();
UpsertIngestionSourceResolver resolver =
new UpsertIngestionSourceResolver(mockClient, mockService);
// Execute resolver
DataFetchingEnvironment mockEnv = Mockito.mock(DataFetchingEnvironment.class);
@ -91,10 +95,12 @@ public class CreateSecretResolverTest {
public void testGetEntityClientException() throws Exception {
// Create resolver
EntityClient mockClient = Mockito.mock(EntityClient.class);
EntityService<?> mockService = getMockEntityService();
Mockito.doThrow(RemoteInvocationException.class)
.when(mockClient)
.ingestProposal(any(), Mockito.any(), anyBoolean());
UpsertIngestionSourceResolver resolver = new UpsertIngestionSourceResolver(mockClient);
UpsertIngestionSourceResolver resolver =
new UpsertIngestionSourceResolver(mockClient, mockService);
// Execute resolver
DataFetchingEnvironment mockEnv = Mockito.mock(DataFetchingEnvironment.class);

View File

@ -5,6 +5,7 @@ import static org.mockito.ArgumentMatchers.any;
import static org.testng.Assert.*;
import com.google.common.collect.ImmutableSet;
import com.linkedin.common.Ownership;
import com.linkedin.datahub.graphql.QueryContext;
import com.linkedin.datahub.graphql.generated.ListIngestionSourcesInput;
import com.linkedin.entity.client.EntityClient;
@ -30,6 +31,7 @@ public class ListIngestionSourceResolverTest {
EntityClient mockClient = Mockito.mock(EntityClient.class);
DataHubIngestionSourceInfo returnedInfo = getTestIngestionSourceInfo();
Ownership ownership = getTestOwnership();
final DataHubIngestionSourceKey key = new DataHubIngestionSourceKey();
key.setId("test");

View File

@ -1,5 +1,6 @@
package com.linkedin.datahub.graphql.resolvers.ingest.source;
import static com.linkedin.datahub.graphql.TestUtils.getMockEntityService;
import static com.linkedin.datahub.graphql.TestUtils.verifyIngestProposal;
import static com.linkedin.datahub.graphql.resolvers.ingest.IngestTestUtils.*;
import static com.linkedin.metadata.Constants.*;
@ -17,6 +18,7 @@ import com.linkedin.entity.client.EntityClient;
import com.linkedin.ingestion.DataHubIngestionSourceConfig;
import com.linkedin.ingestion.DataHubIngestionSourceInfo;
import com.linkedin.ingestion.DataHubIngestionSourceSchedule;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.r2.RemoteInvocationException;
import graphql.schema.DataFetchingEnvironment;
import org.mockito.Mockito;
@ -40,7 +42,9 @@ public class UpsertIngestionSourceResolverTest {
public void testGetSuccess() throws Exception {
// Create resolver
EntityClient mockClient = Mockito.mock(EntityClient.class);
UpsertIngestionSourceResolver resolver = new UpsertIngestionSourceResolver(mockClient);
EntityService<?> mockService = getMockEntityService();
UpsertIngestionSourceResolver resolver =
new UpsertIngestionSourceResolver(mockClient, mockService);
// Execute resolver
QueryContext mockContext = getMockAllowContext();
@ -78,7 +82,9 @@ public class UpsertIngestionSourceResolverTest {
public void testGetUnauthorized() throws Exception {
// Create resolver
EntityClient mockClient = Mockito.mock(EntityClient.class);
UpsertIngestionSourceResolver resolver = new UpsertIngestionSourceResolver(mockClient);
EntityService<?> mockService = getMockEntityService();
UpsertIngestionSourceResolver resolver =
new UpsertIngestionSourceResolver(mockClient, mockService);
// Execute resolver
DataFetchingEnvironment mockEnv = Mockito.mock(DataFetchingEnvironment.class);
@ -94,10 +100,12 @@ public class UpsertIngestionSourceResolverTest {
public void testGetEntityClientException() throws Exception {
// Create resolver
EntityClient mockClient = Mockito.mock(EntityClient.class);
EntityService<?> mockService = getMockEntityService();
Mockito.doThrow(RemoteInvocationException.class)
.when(mockClient)
.ingestProposal(any(), any(), Mockito.eq(false));
UpsertIngestionSourceResolver resolver = new UpsertIngestionSourceResolver(mockClient);
UpsertIngestionSourceResolver resolver =
new UpsertIngestionSourceResolver(mockClient, mockService);
// Execute resolver
DataFetchingEnvironment mockEnv = Mockito.mock(DataFetchingEnvironment.class);
@ -115,7 +123,9 @@ public class UpsertIngestionSourceResolverTest {
// Create resolver
EntityClient mockClient = Mockito.mock(EntityClient.class);
UpsertIngestionSourceResolver resolver = new UpsertIngestionSourceResolver(mockClient);
EntityService<?> mockService = getMockEntityService();
UpsertIngestionSourceResolver resolver =
new UpsertIngestionSourceResolver(mockClient, mockService);
// Execute resolver
QueryContext mockContext = getMockAllowContext();
@ -140,7 +150,9 @@ public class UpsertIngestionSourceResolverTest {
// Create resolver
EntityClient mockClient = Mockito.mock(EntityClient.class);
UpsertIngestionSourceResolver resolver = new UpsertIngestionSourceResolver(mockClient);
EntityService<?> mockService = getMockEntityService();
UpsertIngestionSourceResolver resolver =
new UpsertIngestionSourceResolver(mockClient, mockService);
// Execute resolver
QueryContext mockContext = getMockAllowContext();

View File

@ -261,6 +261,7 @@ export const IngestionSourceList = () => {
},
platform: null,
executions: null,
ownership: null,
};
addToListIngestionSourcesCache(client, newSource, pageSize, query);
setTimeout(() => {

View File

@ -46,6 +46,9 @@ query listIngestionSources($input: ListIngestionSourcesInput!) {
}
}
}
ownership {
...ownershipFields
}
}
}
}
@ -97,6 +100,9 @@ query getIngestionSource($urn: String!, $runStart: Int, $runCount: Int) {
}
}
}
ownership {
...ownershipFields
}
}
}