feat(ingest) - add audit actor urn to auditStamp (#5264)

This commit is contained in:
neojunjie 2022-07-09 00:21:16 +08:00 committed by GitHub
parent 39a4ef842d
commit 5bb7fe3691
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 43 additions and 16 deletions

View File

@ -2,6 +2,7 @@ apply plugin: 'java'
dependencies {
compile project(':metadata-service:auth-api')
compile project(':metadata-service:factories')
compile externalDependency.reflections

View File

@ -2,6 +2,8 @@ package io.datahubproject.openapi.entities;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.datahub.authentication.Authentication;
import com.datahub.authentication.AuthenticationContext;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.linkedin.common.urn.Urn;
import com.linkedin.common.urn.UrnUtils;
@ -104,9 +106,12 @@ public class EntitiesController {
@RequestBody @Nonnull List<UpsertAspectRequest> aspectRequests) {
log.info("INGEST PROPOSAL proposal: {}", aspectRequests);
Authentication authentication = AuthenticationContext.getAuthentication();
String actorUrnStr = authentication.getActor().toUrnStr();
List<Pair<String, Boolean>> responses = aspectRequests.stream()
.map(MappingUtil::mapToProposal)
.map(proposal -> MappingUtil.ingestProposal(proposal, _entityService, _objectMapper))
.map(proposal -> MappingUtil.ingestProposal(proposal, actorUrnStr, _entityService, _objectMapper))
.collect(Collectors.toList());
if (responses.stream().anyMatch(Pair::getSecond)) {
return ResponseEntity.status(HttpStatus.CREATED)
@ -140,10 +145,14 @@ public class EntitiesController {
List<UpsertAspectRequest> deleteRequests = entityUrns.stream()
.map(entityUrn -> MappingUtil.createStatusRemoval(entityUrn, _entityService))
.collect(Collectors.toList());
Authentication authentication = AuthenticationContext.getAuthentication();
String actorUrnStr = authentication.getActor().toUrnStr();
return ResponseEntity.ok(Collections.singletonList(RollbackRunResultDto.builder()
.rowsRolledBack(deleteRequests.stream()
.map(MappingUtil::mapToProposal)
.map(proposal -> MappingUtil.ingestProposal(proposal, _entityService, _objectMapper))
.map(proposal -> MappingUtil.ingestProposal(proposal, actorUrnStr, _entityService, _objectMapper))
.filter(Pair::getSecond)
.map(Pair::getFirst)
.map(urnString -> new AspectRowSummary().urn(urnString))

View File

@ -1,5 +1,7 @@
package io.datahubproject.openapi.platform.entities;
import com.datahub.authentication.Authentication;
import com.datahub.authentication.AuthenticationContext;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.util.Pair;
@ -44,8 +46,11 @@ public class PlatformEntitiesController {
@RequestBody @Nonnull List<MetadataChangeProposal> metadataChangeProposals) {
log.info("INGEST PROPOSAL proposal: {}", metadataChangeProposals);
Authentication authentication = AuthenticationContext.getAuthentication();
String actorUrnStr = authentication.getActor().toUrnStr();
List<Pair<String, Boolean>> responses = metadataChangeProposals.stream()
.map(proposal -> MappingUtil.ingestProposal(proposal, _entityService, _objectMapper))
.map(proposal -> MappingUtil.ingestProposal(proposal, actorUrnStr, _entityService, _objectMapper))
.collect(Collectors.toList());
if (responses.stream().anyMatch(Pair::getSecond)) {
return ResponseEntity.status(HttpStatus.CREATED)

View File

@ -14,7 +14,6 @@ import com.linkedin.data.DataMap;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.entity.Aspect;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.metadata.Constants;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.entity.RollbackRunResult;
import com.linkedin.metadata.entity.ValidationException;
@ -246,13 +245,13 @@ public class MappingUtil {
}
}
public static Pair<String, Boolean> ingestProposal(MetadataChangeProposal metadataChangeProposal, EntityService entityService,
public static Pair<String, Boolean> ingestProposal(MetadataChangeProposal metadataChangeProposal, String actorUrn, EntityService entityService,
ObjectMapper objectMapper) {
// TODO: Use the actor present in the IC.
Timer.Context context = MetricUtils.timer("postEntity").time();
final com.linkedin.common.AuditStamp auditStamp =
new com.linkedin.common.AuditStamp().setTime(System.currentTimeMillis())
.setActor(UrnUtils.getUrn(Constants.UNKNOWN_ACTOR));
.setActor(UrnUtils.getUrn(actorUrn));
io.datahubproject.openapi.generated.KafkaAuditHeader auditHeader = metadataChangeProposal.getAuditHeader();
com.linkedin.mxe.MetadataChangeProposal serviceProposal =

View File

@ -1,5 +1,9 @@
package entities;
import com.datahub.authentication.Actor;
import com.datahub.authentication.ActorType;
import com.datahub.authentication.Authentication;
import com.datahub.authentication.AuthenticationContext;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.linkedin.metadata.entity.AspectDao;
import com.linkedin.metadata.event.EventProducer;
@ -34,6 +38,7 @@ import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import static com.linkedin.metadata.Constants.*;
import static org.mockito.Mockito.when;
public class EntitiesControllerTest {
@ -53,6 +58,9 @@ public class EntitiesControllerTest {
EventProducer mockEntityEventProducer = Mockito.mock(EventProducer.class);
MockEntityService mockEntityService = new MockEntityService(aspectDao, mockEntityEventProducer, mockEntityRegistry);
_entitiesController = new EntitiesController(mockEntityService, new ObjectMapper());
Authentication authentication = Mockito.mock(Authentication.class);
when(authentication.getActor()).thenReturn(new Actor(ActorType.USER, "datahub"));
AuthenticationContext.setAuthentication(authentication);
}
EntitiesController _entitiesController;

View File

@ -33,6 +33,7 @@ dependencies {
}
compile project(':metadata-service:restli-api')
compile project(':metadata-service:auth-api')
compile project(path: ':metadata-service:restli-api', configuration: 'dataTemplate')
compile project(':li-utils')
compile project(':metadata-models')

View File

@ -1,10 +1,11 @@
package com.linkedin.metadata.resources.entity;
import com.codahale.metrics.MetricRegistry;
import com.datahub.authentication.Authentication;
import com.datahub.authentication.AuthenticationContext;
import com.linkedin.aspect.GetTimeseriesAspectValuesResponse;
import com.linkedin.common.AuditStamp;
import com.linkedin.common.urn.Urn;
import com.linkedin.metadata.Constants;
import com.linkedin.metadata.aspect.EnvelopedAspectArray;
import com.linkedin.metadata.aspect.VersionedAspect;
import com.linkedin.metadata.entity.EntityService;
@ -124,9 +125,10 @@ public class AspectResource extends CollectionResourceTaskTemplate<String, Versi
@ActionParam(PARAM_PROPOSAL) @Nonnull MetadataChangeProposal metadataChangeProposal) throws URISyntaxException {
log.info("INGEST PROPOSAL proposal: {}", metadataChangeProposal);
// TODO: Use the actor present in the IC.
final AuditStamp auditStamp =
new AuditStamp().setTime(_clock.millis()).setActor(Urn.createFromString(Constants.UNKNOWN_ACTOR));
Authentication authentication = AuthenticationContext.getAuthentication();
String actorUrnStr = authentication.getActor().toUrnStr();
final AuditStamp auditStamp = new AuditStamp().setTime(_clock.millis()).setActor(Urn.createFromString(actorUrnStr));
final List<MetadataChangeProposal> additionalChanges =
AspectUtils.getAdditionalChanges(metadataChangeProposal, _entityService);

View File

@ -1,13 +1,14 @@
package com.linkedin.metadata.resources.entity;
import com.codahale.metrics.MetricRegistry;
import com.datahub.authentication.Authentication;
import com.datahub.authentication.AuthenticationContext;
import com.linkedin.common.AuditStamp;
import com.linkedin.common.UrnArray;
import com.linkedin.common.urn.Urn;
import com.linkedin.data.template.LongMap;
import com.linkedin.data.template.StringArray;
import com.linkedin.entity.Entity;
import com.linkedin.metadata.Constants;
import com.linkedin.metadata.browse.BrowseResult;
import com.linkedin.metadata.entity.DeleteEntityService;
import com.linkedin.metadata.entity.EntityService;
@ -196,9 +197,9 @@ public class EntityResource extends CollectionResourceTaskTemplate<String, Entit
SystemMetadata systemMetadata = populateDefaultFieldsIfEmpty(providedSystemMetadata);
// TODO Correctly audit ingestions.
final AuditStamp auditStamp =
new AuditStamp().setTime(_clock.millis()).setActor(Urn.createFromString(Constants.UNKNOWN_ACTOR));
Authentication authentication = AuthenticationContext.getAuthentication();
String actorUrnStr = authentication.getActor().toUrnStr();
final AuditStamp auditStamp = new AuditStamp().setTime(_clock.millis()).setActor(Urn.createFromString(actorUrnStr));
// variables referenced in lambdas are required to be final
final SystemMetadata finalSystemMetadata = systemMetadata;
@ -222,8 +223,9 @@ public class EntityResource extends CollectionResourceTaskTemplate<String, Entit
}
}
final AuditStamp auditStamp =
new AuditStamp().setTime(_clock.millis()).setActor(Urn.createFromString(Constants.UNKNOWN_ACTOR));
Authentication authentication = AuthenticationContext.getAuthentication();
String actorUrnStr = authentication.getActor().toUrnStr();
final AuditStamp auditStamp = new AuditStamp().setTime(_clock.millis()).setActor(Urn.createFromString(actorUrnStr));
if (systemMetadataList == null) {
systemMetadataList = new SystemMetadata[entities.length];