feat(auditEvents): add in top level delete policy event by default (#13928)

This commit is contained in:
RyanHolstien 2025-07-01 17:46:25 -05:00 committed by GitHub
parent aa6e658cda
commit 1c34f96b6d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 255 additions and 8 deletions

View File

@ -133,6 +133,7 @@ Several event types track specific creation, update, and deletion actions:
- **RevokeAccessTokenEvent**: Tracks access token revocation - **RevokeAccessTokenEvent**: Tracks access token revocation
- **CreatePolicyEvent**: Tracks policy creation - **CreatePolicyEvent**: Tracks policy creation
- **UpdatePolicyEvent**: Tracks policy updates - **UpdatePolicyEvent**: Tracks policy updates
- **DeletePolicyEvent**: Tracks policy deletes
- **CreateIngestionSourceEvent**: Tracks ingestion source creation - **CreateIngestionSourceEvent**: Tracks ingestion source creation
- **UpdateIngestionSourceEvent**: Tracks ingestion source updates - **UpdateIngestionSourceEvent**: Tracks ingestion source updates
- **DeleteEntityEvent**: Tracks entity deletion - **DeleteEntityEvent**: Tracks entity deletion

View File

@ -2012,8 +2012,8 @@ public class EntityServiceImpl implements EntityService<ChangeItemImpl> {
}); });
} }
protected Attributes mapEventAttributes( @VisibleForTesting
MetadataChangeLog metadataChangeLog, OperationContext opContext) { Attributes mapEventAttributes(MetadataChangeLog metadataChangeLog, OperationContext opContext) {
AttributesBuilder attributesBuilder = Attributes.builder(); AttributesBuilder attributesBuilder = Attributes.builder();
Optional.ofNullable(metadataChangeLog.getSystemMetadata()) Optional.ofNullable(metadataChangeLog.getSystemMetadata())
@ -2055,7 +2055,8 @@ public class EntityServiceImpl implements EntityService<ChangeItemImpl> {
* Right now this is limited to target use cases so the logic is simplified, might make sense to * Right now this is limited to target use cases so the logic is simplified, might make sense to
* make this entity registry & model driven * make this entity registry & model driven
*/ */
protected void mapAspectToUsageEvent( @VisibleForTesting
void mapAspectToUsageEvent(
AttributesBuilder attributesBuilder, MetadataChangeLog metadataChangeLog) { AttributesBuilder attributesBuilder, MetadataChangeLog metadataChangeLog) {
String aspectName = String aspectName =
Optional.ofNullable(metadataChangeLog.getAspectName()).orElse(StringUtils.EMPTY); Optional.ofNullable(metadataChangeLog.getAspectName()).orElse(StringUtils.EMPTY);
@ -2100,6 +2101,9 @@ public class EntityServiceImpl implements EntityService<ChangeItemImpl> {
case ACCESS_TOKEN_KEY_ASPECT_NAME: case ACCESS_TOKEN_KEY_ASPECT_NAME:
eventType = DataHubUsageEventType.REVOKE_ACCESS_TOKEN_EVENT.getType(); eventType = DataHubUsageEventType.REVOKE_ACCESS_TOKEN_EVENT.getType();
break; break;
case DATAHUB_POLICY_KEY_ASPECT_NAME:
eventType = DataHubUsageEventType.DELETE_POLICY_EVENT.getType();
break;
default: default:
eventType = DataHubUsageEventType.DELETE_ENTITY_EVENT.getType(); eventType = DataHubUsageEventType.DELETE_ENTITY_EVENT.getType();
} }

View File

@ -1,9 +1,8 @@
package com.linkedin.metadata.entity; package com.linkedin.metadata.entity;
import static com.linkedin.metadata.Constants.DATASET_PROFILE_ASPECT_NAME; import static com.linkedin.metadata.Constants.*;
import static com.linkedin.metadata.Constants.STATUS_ASPECT_NAME;
import static com.linkedin.metadata.Constants.UPSTREAM_LINEAGE_ASPECT_NAME;
import static com.linkedin.metadata.entity.EntityServiceTest.TEST_AUDIT_STAMP; import static com.linkedin.metadata.entity.EntityServiceTest.TEST_AUDIT_STAMP;
import static com.linkedin.metadata.telemetry.OpenTelemetryKeyConstants.EVENT_TYPE_ATTR;
import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.ArgumentMatchers.argThat;
@ -40,6 +39,7 @@ import com.linkedin.metadata.aspect.SystemAspect;
import com.linkedin.metadata.aspect.batch.AspectsBatch; import com.linkedin.metadata.aspect.batch.AspectsBatch;
import com.linkedin.metadata.aspect.batch.ChangeMCP; import com.linkedin.metadata.aspect.batch.ChangeMCP;
import com.linkedin.metadata.config.PreProcessHooks; import com.linkedin.metadata.config.PreProcessHooks;
import com.linkedin.metadata.datahubusage.DataHubUsageEventType;
import com.linkedin.metadata.entity.ebean.EbeanAspectV2; import com.linkedin.metadata.entity.ebean.EbeanAspectV2;
import com.linkedin.metadata.entity.ebean.EbeanSystemAspect; import com.linkedin.metadata.entity.ebean.EbeanSystemAspect;
import com.linkedin.metadata.entity.ebean.PartitionedStream; import com.linkedin.metadata.entity.ebean.PartitionedStream;
@ -53,11 +53,15 @@ import com.linkedin.metadata.models.registry.EntityRegistry;
import com.linkedin.metadata.utils.GenericRecordUtils; import com.linkedin.metadata.utils.GenericRecordUtils;
import com.linkedin.metadata.utils.SystemMetadataUtils; import com.linkedin.metadata.utils.SystemMetadataUtils;
import com.linkedin.metadata.utils.metrics.MetricUtils; import com.linkedin.metadata.utils.metrics.MetricUtils;
import com.linkedin.mxe.MetadataChangeLog;
import com.linkedin.mxe.MetadataChangeProposal; import com.linkedin.mxe.MetadataChangeProposal;
import com.linkedin.mxe.SystemMetadata; import com.linkedin.mxe.SystemMetadata;
import com.linkedin.util.Pair; import com.linkedin.util.Pair;
import io.datahubproject.metadata.context.OperationContext; import io.datahubproject.metadata.context.OperationContext;
import io.datahubproject.test.metadata.context.TestOperationContexts; import io.datahubproject.test.metadata.context.TestOperationContexts;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.common.AttributesBuilder;
import jakarta.persistence.EntityNotFoundException; import jakarta.persistence.EntityNotFoundException;
import java.sql.Timestamp; import java.sql.Timestamp;
import java.util.ArrayList; import java.util.ArrayList;
@ -1036,4 +1040,143 @@ public class EntityServiceImplTest {
assertEquals(1, results.size()); assertEquals(1, results.size());
assertEquals(1, results.get(0).rowsMigrated); assertEquals(1, results.get(0).rowsMigrated);
} }
@Test
public void testMapAspectToUsageEvent() {
// Test cases for create/update operations
// Test ACCESS_TOKEN_KEY_ASPECT_NAME
AttributeKey<String> eventTypeAttrKey = AttributeKey.stringKey(EVENT_TYPE_ATTR);
AttributesBuilder attributesBuilder = Attributes.builder();
MetadataChangeLog mcl = new MetadataChangeLog();
mcl.setAspectName(ACCESS_TOKEN_KEY_ASPECT_NAME);
mcl.setChangeType(ChangeType.UPSERT);
entityService.mapAspectToUsageEvent(attributesBuilder, mcl);
assertEquals(
attributesBuilder.build().get(eventTypeAttrKey),
DataHubUsageEventType.CREATE_ACCESS_TOKEN_EVENT.getType());
// Test INGESTION_SOURCE_KEY_ASPECT_NAME
attributesBuilder = Attributes.builder();
mcl.setAspectName(INGESTION_SOURCE_KEY_ASPECT_NAME);
mcl.setChangeType(ChangeType.CREATE);
entityService.mapAspectToUsageEvent(attributesBuilder, mcl);
assertEquals(
attributesBuilder.build().get(eventTypeAttrKey),
DataHubUsageEventType.CREATE_INGESTION_SOURCE_EVENT.getType());
// Test INGESTION_INFO_ASPECT_NAME
attributesBuilder = Attributes.builder();
mcl.setAspectName(INGESTION_INFO_ASPECT_NAME);
mcl.setChangeType(ChangeType.UPDATE);
entityService.mapAspectToUsageEvent(attributesBuilder, mcl);
assertEquals(
attributesBuilder.build().get(eventTypeAttrKey),
DataHubUsageEventType.UPDATE_INGESTION_SOURCE_EVENT.getType());
// Test DATAHUB_POLICY_KEY_ASPECT_NAME
attributesBuilder = Attributes.builder();
mcl.setAspectName(DATAHUB_POLICY_KEY_ASPECT_NAME);
mcl.setChangeType(ChangeType.CREATE_ENTITY);
entityService.mapAspectToUsageEvent(attributesBuilder, mcl);
assertEquals(
attributesBuilder.build().get(eventTypeAttrKey),
DataHubUsageEventType.CREATE_POLICY_EVENT.getType());
// Test DATAHUB_POLICY_INFO_ASPECT_NAME
attributesBuilder = Attributes.builder();
mcl.setAspectName(DATAHUB_POLICY_INFO_ASPECT_NAME);
mcl.setChangeType(ChangeType.PATCH);
entityService.mapAspectToUsageEvent(attributesBuilder, mcl);
assertEquals(
attributesBuilder.build().get(eventTypeAttrKey),
DataHubUsageEventType.UPDATE_POLICY_EVENT.getType());
// Test CORP_USER_KEY_ASPECT_NAME
attributesBuilder = Attributes.builder();
mcl.setAspectName(CORP_USER_KEY_ASPECT_NAME);
mcl.setChangeType(ChangeType.UPSERT);
entityService.mapAspectToUsageEvent(attributesBuilder, mcl);
assertEquals(
attributesBuilder.build().get(eventTypeAttrKey),
DataHubUsageEventType.CREATE_USER_EVENT.getType());
// Test CORP_USER_INFO_ASPECT_NAME (should map to UPDATE_USER_EVENT)
attributesBuilder = Attributes.builder();
mcl.setAspectName(CORP_USER_INFO_ASPECT_NAME);
mcl.setChangeType(ChangeType.UPSERT);
entityService.mapAspectToUsageEvent(attributesBuilder, mcl);
assertEquals(
attributesBuilder.build().get(eventTypeAttrKey),
DataHubUsageEventType.UPDATE_USER_EVENT.getType());
// Test GROUP_MEMBERSHIP_ASPECT_NAME (should map to UPDATE_USER_EVENT)
attributesBuilder = Attributes.builder();
mcl.setAspectName(GROUP_MEMBERSHIP_ASPECT_NAME);
mcl.setChangeType(ChangeType.UPSERT);
entityService.mapAspectToUsageEvent(attributesBuilder, mcl);
assertEquals(
attributesBuilder.build().get(eventTypeAttrKey),
DataHubUsageEventType.UPDATE_USER_EVENT.getType());
// Test default case for create/update
attributesBuilder = Attributes.builder();
mcl.setAspectName("unknownAspect");
mcl.setChangeType(ChangeType.UPSERT);
entityService.mapAspectToUsageEvent(attributesBuilder, mcl);
assertEquals(
attributesBuilder.build().get(eventTypeAttrKey),
DataHubUsageEventType.UPDATE_ASPECT_EVENT.getType());
// Test DELETE operations
// Test ACCESS_TOKEN_KEY_ASPECT_NAME delete
attributesBuilder = Attributes.builder();
mcl.setAspectName(ACCESS_TOKEN_KEY_ASPECT_NAME);
mcl.setChangeType(ChangeType.DELETE);
entityService.mapAspectToUsageEvent(attributesBuilder, mcl);
assertEquals(
attributesBuilder.build().get(eventTypeAttrKey),
DataHubUsageEventType.REVOKE_ACCESS_TOKEN_EVENT.getType());
// Test DATAHUB_POLICY_KEY_ASPECT_NAME delete
attributesBuilder = Attributes.builder();
mcl.setAspectName(DATAHUB_POLICY_KEY_ASPECT_NAME);
mcl.setChangeType(ChangeType.DELETE);
entityService.mapAspectToUsageEvent(attributesBuilder, mcl);
assertEquals(
attributesBuilder.build().get(eventTypeAttrKey),
DataHubUsageEventType.DELETE_POLICY_EVENT.getType());
// Test default delete case
attributesBuilder = Attributes.builder();
mcl.setAspectName("unknownAspect");
mcl.setChangeType(ChangeType.DELETE);
entityService.mapAspectToUsageEvent(attributesBuilder, mcl);
assertEquals(
attributesBuilder.build().get(eventTypeAttrKey),
DataHubUsageEventType.DELETE_ENTITY_EVENT.getType());
// Test other change types (should default to ENTITY_EVENT)
attributesBuilder = Attributes.builder();
mcl.setAspectName("someAspect");
mcl.setChangeType(ChangeType.RESTATE);
entityService.mapAspectToUsageEvent(attributesBuilder, mcl);
assertEquals(
attributesBuilder.build().get(eventTypeAttrKey),
DataHubUsageEventType.ENTITY_EVENT.getType());
}
} }

View File

@ -172,7 +172,7 @@ platformAnalytics:
enabled: ${DATAHUB_ANALYTICS_ENABLED:true} enabled: ${DATAHUB_ANALYTICS_ENABLED:true}
usageExport: usageExport:
enabled: ${DATAHUB_ANALYTICS_TRACING_ENABLED:true} # Enables/Disables backend usage tracing enabled: ${DATAHUB_ANALYTICS_TRACING_ENABLED:true} # Enables/Disables backend usage tracing
usageEventTypes: ${ANALYTICS_DATAHUB_USAGE_EVENT_TYPES:CreateAccessTokenEvent,CreatePolicyEvent,UpdatePolicyEvent,CreateIngestionSourceEvent,UpdateIngestionSourceEvent,RevokeAccessTokenEvent,CreateUserEvent,UpdateUserEvent} # Comma separated list that determines which usage event types to listen to, See DataHubUsageEventType for list usageEventTypes: ${ANALYTICS_DATAHUB_USAGE_EVENT_TYPES:CreateAccessTokenEvent,CreatePolicyEvent,UpdatePolicyEvent,CreateIngestionSourceEvent,UpdateIngestionSourceEvent,RevokeAccessTokenEvent,CreateUserEvent,UpdateUserEvent,DeletePolicyEvent} # Comma separated list that determines which usage event types to listen to, See DataHubUsageEventType for list
aspectTypes: ${ANALYTICS_GENERIC_ASPECT_TYPES:} # A filter list for the generic aspect events, i.e. events that don't fall into a typed bucket in the above specific event types aspectTypes: ${ANALYTICS_GENERIC_ASPECT_TYPES:} # A filter list for the generic aspect events, i.e. events that don't fall into a typed bucket in the above specific event types
userFilters: "${ANALYTICS_USER_FILTERS:}" # Filter out specific users' events from being published at all userFilters: "${ANALYTICS_USER_FILTERS:}" # Filter out specific users' events from being published at all

View File

@ -110,7 +110,8 @@ public enum DataHubUsageEventType {
UPDATE_USER_EVENT("UpdateUserEvent"), UPDATE_USER_EVENT("UpdateUserEvent"),
UPDATE_ASPECT_EVENT("UpdateAspectEvent"), UPDATE_ASPECT_EVENT("UpdateAspectEvent"),
ENTITY_EVENT("EntityEvent"), ENTITY_EVENT("EntityEvent"),
FAILED_LOGIN_EVENT("FailedLogInEvent"); FAILED_LOGIN_EVENT("FailedLogInEvent"),
DELETE_POLICY_EVENT("DeletePolicyEvent");
private final String type; private final String type;

View File

@ -0,0 +1,11 @@
package com.linkedin.metadata.datahubusage.event;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
@NoArgsConstructor(force = true, access = AccessLevel.PROTECTED)
@JsonIgnoreProperties(ignoreUnknown = true)
@SuperBuilder
public class DeletePolicyEvent extends DeleteEntityEvent {}

View File

@ -29,6 +29,7 @@ import lombok.experimental.SuperBuilder;
@JsonSubTypes.Type(value = CreatePolicyEvent.class, name = "CreatePolicyEvent"), @JsonSubTypes.Type(value = CreatePolicyEvent.class, name = "CreatePolicyEvent"),
@JsonSubTypes.Type(value = CreateUserEvent.class, name = "CreateUserEvent"), @JsonSubTypes.Type(value = CreateUserEvent.class, name = "CreateUserEvent"),
@JsonSubTypes.Type(value = DeleteEntityEvent.class, name = "DeleteEntityEvent"), @JsonSubTypes.Type(value = DeleteEntityEvent.class, name = "DeleteEntityEvent"),
@JsonSubTypes.Type(value = DeletePolicyEvent.class, name = "DeletePolicyEvent"),
@JsonSubTypes.Type(value = LogInEvent.class, name = "LogInEvent"), @JsonSubTypes.Type(value = LogInEvent.class, name = "LogInEvent"),
@JsonSubTypes.Type(value = RevokeAccessTokenEvent.class, name = "RevokeAccessTokenEvent"), @JsonSubTypes.Type(value = RevokeAccessTokenEvent.class, name = "RevokeAccessTokenEvent"),
@JsonSubTypes.Type(value = EntityEvent.class, name = "EntityEvent"), @JsonSubTypes.Type(value = EntityEvent.class, name = "EntityEvent"),

View File

@ -460,6 +460,92 @@ def test_user_events(auth_exclude_filter):
user_session.cookies.clear() user_session.cookies.clear()
def test_policy_create_delete(auth_exclude_filter):
user_session = login_as(admin_user, admin_pass)
json = {
"query": """mutation createPolicy($input: PolicyUpdateInput!) {\n
createPolicy(input: $input) }""",
"variables": {
"input": {
"type": "METADATA",
"name": "Test Metadata Policy",
"description": "My New Metadata Policy",
"state": "ACTIVE",
"resources": {"type": "dataset", "allResources": True},
"privileges": ["EDIT_ENTITY_TAGS"],
"actors": {
"users": ["urn:li:corpuser:datahub", "urn:li:corpuser:admin"],
"resourceOwners": False,
"allUsers": False,
"allGroups": False,
},
}
},
}
response = user_session.post(f"{get_frontend_url()}/api/v2/graphql", json=json)
response.raise_for_status()
res_data = response.json()
assert res_data
assert res_data["data"]
assert res_data["data"]["createPolicy"]
wait_for_writes_to_sync(consumer_group="datahub-usage-event-consumer-job-client")
new_urn = res_data["data"]["createPolicy"]
update_json = {
"query": """mutation deletePolicy($urn: String!) {\n
deletePolicy(urn: $urn) }""",
"variables": {
"urn": new_urn,
},
}
response = user_session.post(
f"{get_frontend_url()}/api/v2/graphql", json=update_json
)
response.raise_for_status()
res_data = response.json()
# Check updated was submitted successfully
assert res_data
assert res_data["data"]
assert res_data["data"]["deletePolicy"]
assert res_data["data"]["deletePolicy"] == new_urn
wait_for_writes_to_sync(consumer_group="datahub-usage-event-consumer-job-client")
res_data = searchForAuditEvents(
user_session,
3,
["CreatePolicyEvent", "DeletePolicyEvent"],
["urn:li:corpuser:datahub", "urn:li:corpuser:admin"],
[],
)
print(res_data)
assert res_data
assert res_data["usageEvents"]
assert len(res_data["usageEvents"]) == 3 or len(res_data["usageEvents"]) == 2
assert (
res_data["usageEvents"][0]["eventType"] == "CreatePolicyEvent"
or res_data["usageEvents"][0]["eventType"] == "DeletePolicyEvent"
)
assert res_data["usageEvents"][0]["entityUrn"] == new_urn
assert (
res_data["usageEvents"][1]["eventType"] == "CreatePolicyEvent"
or res_data["usageEvents"][1]["eventType"] == "DeletePolicyEvent"
)
assert res_data["usageEvents"][1]["entityUrn"] == new_urn
if len(res_data["usageEvents"]) == 3:
assert (
res_data["usageEvents"][2]["eventType"] == "CreatePolicyEvent"
or res_data["usageEvents"][2]["eventType"] == "DeletePolicyEvent"
)
assert res_data["usageEvents"][2]["entityUrn"] == new_urn
user_session.cookies.clear()
def generateAccessToken_v2(session, actorUrn): def generateAccessToken_v2(session, actorUrn):
# Create new token # Create new token
json = { json = {