mirror of
https://github.com/datahub-project/datahub.git
synced 2025-12-29 10:57:52 +00:00
fix(ingest): add mutator for ownership types (#13002)
This commit is contained in:
parent
ee4827e1b2
commit
68b4fcb054
@ -0,0 +1,102 @@
|
||||
package com.linkedin.metadata.aspect.hooks;
|
||||
|
||||
import static com.linkedin.metadata.Constants.OWNERSHIP_ASPECT_NAME;
|
||||
|
||||
import com.linkedin.common.*;
|
||||
import com.linkedin.common.urn.Urn;
|
||||
import com.linkedin.metadata.aspect.ReadItem;
|
||||
import com.linkedin.metadata.aspect.RetrieverContext;
|
||||
import com.linkedin.metadata.aspect.batch.ChangeMCP;
|
||||
import com.linkedin.metadata.aspect.plugins.config.AspectPluginConfig;
|
||||
import com.linkedin.metadata.aspect.plugins.hooks.MutationHook;
|
||||
import com.linkedin.util.Pair;
|
||||
import java.util.*;
|
||||
import java.util.stream.Stream;
|
||||
import javax.annotation.Nonnull;
|
||||
import lombok.Getter;
|
||||
import lombok.Setter;
|
||||
import lombok.experimental.Accessors;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
@Slf4j
|
||||
@Getter
|
||||
@Setter
|
||||
@Accessors(chain = true)
|
||||
public class OwnershipOwnerTypes extends MutationHook {
|
||||
@Nonnull private AspectPluginConfig config;
|
||||
|
||||
protected Stream<Pair<ChangeMCP, Boolean>> writeMutation(
|
||||
@Nonnull Collection<ChangeMCP> changeMCPS, @Nonnull RetrieverContext retrieverContext) {
|
||||
return changeMCPS.stream()
|
||||
.map(
|
||||
item ->
|
||||
aspectFilter(item)
|
||||
? Pair.of(item, processOwnershipAspect(item))
|
||||
: Pair.of(item, false));
|
||||
}
|
||||
|
||||
private static boolean aspectFilter(ReadItem item) {
|
||||
return item.getAspectName().equals(OWNERSHIP_ASPECT_NAME);
|
||||
}
|
||||
|
||||
public static Map<String, List<Urn>> toMap(UrnArrayMap map) {
|
||||
Map<String, List<Urn>> result = new HashMap<>();
|
||||
map.forEach(
|
||||
((s, urns) -> {
|
||||
result.put(s, new ArrayList<>(urns));
|
||||
}));
|
||||
return result;
|
||||
}
|
||||
|
||||
public static UrnArrayMap toUrnArrayMap(Map<String, List<Urn>> map) {
|
||||
UrnArrayMap result = new UrnArrayMap();
|
||||
map.forEach(
|
||||
(key, urns) -> {
|
||||
result.put(key, new UrnArray(urns));
|
||||
});
|
||||
return result;
|
||||
}
|
||||
|
||||
public static boolean processOwnershipAspect(ChangeMCP item) {
|
||||
boolean mutated = false;
|
||||
Ownership ownership = item.getAspect(Ownership.class);
|
||||
if (ownership == null) {
|
||||
return false;
|
||||
}
|
||||
UrnArrayMap ownerTypes = ownership.getOwnerTypes();
|
||||
Map<String, List<Urn>> ownerTypesMap;
|
||||
if (ownerTypes == null) {
|
||||
ownerTypesMap = new HashMap<>();
|
||||
mutated = true;
|
||||
} else {
|
||||
ownerTypesMap = toMap(ownerTypes);
|
||||
}
|
||||
OwnerArray owners = ownership.getOwners();
|
||||
for (Owner owner : owners) {
|
||||
String typeKey =
|
||||
Optional.ofNullable(owner.getTypeUrn())
|
||||
.map(Urn::toString)
|
||||
.orElseGet(
|
||||
() ->
|
||||
"urn:li:ownershipType:__system__" + owner.getType().toString().toLowerCase());
|
||||
|
||||
List<Urn> ownerOfType;
|
||||
if (ownerTypesMap.containsKey(typeKey)) {
|
||||
ownerOfType = ownerTypesMap.get(typeKey);
|
||||
} else {
|
||||
ownerOfType = new ArrayList<>();
|
||||
ownerTypesMap.put(typeKey, ownerOfType);
|
||||
mutated = true;
|
||||
}
|
||||
Urn ownerUrn = owner.getOwner();
|
||||
if (!ownerOfType.contains(ownerUrn)) {
|
||||
ownerOfType.add(ownerUrn);
|
||||
mutated = true;
|
||||
}
|
||||
}
|
||||
if (mutated) {
|
||||
ownership.setOwnerTypes((toUrnArrayMap(ownerTypesMap)));
|
||||
}
|
||||
return mutated;
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,151 @@
|
||||
package com.linkedin.metadata.aspect.hooks;
|
||||
|
||||
import static com.linkedin.metadata.Constants.OWNERSHIP_ASPECT_NAME;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
import static org.testng.Assert.assertEquals;
|
||||
|
||||
import com.linkedin.common.*;
|
||||
import com.linkedin.common.urn.DatasetUrn;
|
||||
import com.linkedin.common.urn.Urn;
|
||||
import com.linkedin.common.urn.UrnUtils;
|
||||
import com.linkedin.events.metadata.ChangeType;
|
||||
import com.linkedin.metadata.aspect.AspectRetriever;
|
||||
import com.linkedin.metadata.aspect.GraphRetriever;
|
||||
import com.linkedin.metadata.aspect.RetrieverContext;
|
||||
import com.linkedin.metadata.aspect.batch.ChangeMCP;
|
||||
import com.linkedin.metadata.aspect.plugins.config.AspectPluginConfig;
|
||||
import com.linkedin.metadata.models.registry.EntityRegistry;
|
||||
import com.linkedin.test.metadata.aspect.TestEntityRegistry;
|
||||
import com.linkedin.test.metadata.aspect.batch.TestMCP;
|
||||
import com.linkedin.util.Pair;
|
||||
import java.net.URISyntaxException;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import org.testng.annotations.BeforeTest;
|
||||
import org.testng.annotations.Test;
|
||||
|
||||
public class OwnershipOwnerTypeTest {
|
||||
|
||||
private EntityRegistry entityRegistry;
|
||||
private RetrieverContext mockRetrieverContext;
|
||||
private DatasetUrn testDatasetUrn;
|
||||
private final OwnershipOwnerTypes test =
|
||||
new OwnershipOwnerTypes().setConfig(mock(AspectPluginConfig.class));
|
||||
|
||||
@BeforeTest
|
||||
public void init() throws URISyntaxException {
|
||||
testDatasetUrn =
|
||||
DatasetUrn.createFromUrn(
|
||||
UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:hive,test,PROD)"));
|
||||
|
||||
entityRegistry = new TestEntityRegistry();
|
||||
AspectRetriever mockAspectRetriever = mock(AspectRetriever.class);
|
||||
when(mockAspectRetriever.getEntityRegistry()).thenReturn(entityRegistry);
|
||||
GraphRetriever mockGraphRetriever = mock(GraphRetriever.class);
|
||||
mockRetrieverContext = mock(RetrieverContext.class);
|
||||
when(mockRetrieverContext.getAspectRetriever()).thenReturn(mockAspectRetriever);
|
||||
when(mockRetrieverContext.getGraphRetriever()).thenReturn(mockGraphRetriever);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNoChange() throws URISyntaxException {
|
||||
Ownership ownership = getOwnership(true, true, false);
|
||||
TestMCP mcp = getTestMcpBuilder().recordTemplate(ownership).build();
|
||||
|
||||
List<Pair<ChangeMCP, Boolean>> result =
|
||||
test.writeMutation(Set.of(mcp), mockRetrieverContext).toList();
|
||||
|
||||
assertEquals(result.size(), 1);
|
||||
Pair<ChangeMCP, Boolean> resulted = result.get(0);
|
||||
assertEquals(resulted.getSecond(), false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testChangeAddBusinessOwnerType() throws URISyntaxException {
|
||||
Ownership ownership = getOwnership(true, false, false);
|
||||
TestMCP mcp = getTestMcpBuilder().recordTemplate(ownership).build();
|
||||
|
||||
List<Pair<ChangeMCP, Boolean>> result =
|
||||
test.writeMutation(Set.of(mcp), mockRetrieverContext).toList();
|
||||
|
||||
assertEquals(result.size(), 1);
|
||||
Pair<ChangeMCP, Boolean> resulted = result.get(0);
|
||||
assertEquals(resulted.getSecond(), true);
|
||||
Ownership resultOwnership = resulted.getFirst().getAspect(Ownership.class);
|
||||
assertEquals(resultOwnership.getOwnerTypes().keySet().size(), 1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testChangeAddBusinessOwnerTypeUrn() throws URISyntaxException {
|
||||
Ownership ownership = getOwnership(false, false, true);
|
||||
TestMCP mcp = getTestMcpBuilder().recordTemplate(ownership).build();
|
||||
|
||||
List<Pair<ChangeMCP, Boolean>> result =
|
||||
test.writeMutation(Set.of(mcp), mockRetrieverContext).toList();
|
||||
|
||||
assertEquals(result.size(), 1);
|
||||
Pair<ChangeMCP, Boolean> resulted = result.get(0);
|
||||
assertEquals(resulted.getSecond(), true);
|
||||
Ownership resultOwnership = resulted.getFirst().getAspect(Ownership.class);
|
||||
Set<String> ownerTypes = resultOwnership.getOwnerTypes().keySet();
|
||||
assertEquals(ownerTypes, Set.of("urn:li:ownershipType:__system__business_owner"));
|
||||
assertEquals(
|
||||
resultOwnership
|
||||
.getOwnerTypes()
|
||||
.get("urn:li:ownershipType:__system__business_owner")
|
||||
.get(0)
|
||||
.toString(),
|
||||
"urn:li:corpGroup:business_group");
|
||||
}
|
||||
|
||||
private TestMCP.TestMCPBuilder getTestMcpBuilder() {
|
||||
return TestMCP.builder()
|
||||
.changeType(ChangeType.UPSERT)
|
||||
.urn(testDatasetUrn)
|
||||
.entitySpec(entityRegistry.getEntitySpec(testDatasetUrn.getEntityType()))
|
||||
.aspectSpec(
|
||||
entityRegistry
|
||||
.getEntitySpec(testDatasetUrn.getEntityType())
|
||||
.getAspectSpec(OWNERSHIP_ASPECT_NAME));
|
||||
}
|
||||
|
||||
private Ownership getOwnership(
|
||||
final boolean has_business_owner_with_type,
|
||||
final boolean has_business_owner_in_owner_type,
|
||||
final boolean has_business_owner_with_type_urn)
|
||||
throws URISyntaxException {
|
||||
|
||||
Ownership ownership = new Ownership();
|
||||
OwnerArray ownerArray = new OwnerArray();
|
||||
if (has_business_owner_with_type) {
|
||||
Owner businessOwner =
|
||||
new Owner().setOwner(getBusinessOwner()).setType(OwnershipType.BUSINESS_OWNER);
|
||||
ownerArray.add(businessOwner);
|
||||
}
|
||||
if (has_business_owner_with_type_urn) {
|
||||
Owner businessOwner =
|
||||
new Owner()
|
||||
.setOwner(getBusinessOwner())
|
||||
.setTypeUrn(new Urn("urn:li:ownershipType:__system__business_owner"));
|
||||
ownerArray.add(businessOwner);
|
||||
}
|
||||
|
||||
if (!ownerArray.isEmpty()) {
|
||||
ownership.setOwners(ownerArray);
|
||||
}
|
||||
UrnArrayMap ownerTypes = new UrnArrayMap();
|
||||
if (has_business_owner_in_owner_type) {
|
||||
ownerTypes.put(
|
||||
"urn:li:ownershipType:__system__business_owner", new UrnArray(getBusinessOwner()));
|
||||
}
|
||||
if (!ownerTypes.isEmpty()) {
|
||||
ownership.setOwnerTypes(ownerTypes);
|
||||
}
|
||||
return ownership;
|
||||
}
|
||||
|
||||
private Urn getBusinessOwner() throws URISyntaxException {
|
||||
return new Urn("urn:li:corpGroup:business_group");
|
||||
}
|
||||
}
|
||||
@ -757,6 +757,17 @@ plugins:
|
||||
aspectName: 'schemaMetadata'
|
||||
- entityName: '*'
|
||||
aspectName: 'editableSchemaMetadata'
|
||||
- className: 'com.linkedin.metadata.aspect.hooks.OwnershipOwnerTypes'
|
||||
enabled: true
|
||||
supportedOperations:
|
||||
- CREATE
|
||||
- UPSERT
|
||||
- UPDATE
|
||||
- RESTATE
|
||||
- PATCH
|
||||
supportedEntityAspectNames:
|
||||
- entityName: '*'
|
||||
aspectName: 'ownership'
|
||||
- className: 'com.linkedin.metadata.aspect.plugins.hooks.MutationHook'
|
||||
enabled: true
|
||||
spring:
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user