mirror of
https://github.com/datahub-project/datahub.git
synced 2025-11-12 09:23:52 +00:00
78 lines
2.7 KiB
Python
78 lines
2.7 KiB
Python
# metadata-ingestion/examples/library/incident_update_status.py
|
|
import logging
|
|
|
|
import datahub.emitter.mce_builder as builder
|
|
import datahub.metadata.schema_classes as models
|
|
from datahub.emitter.mcp import MetadataChangeProposalWrapper
|
|
from datahub.emitter.rest_emitter import DatahubRestEmitter
|
|
from datahub.ingestion.graph.client import DatahubClientConfig, DataHubGraph
|
|
from datahub.metadata.urns import IncidentUrn
|
|
|
|
log = logging.getLogger(__name__)
|
|
logging.basicConfig(level=logging.INFO)
|
|
|
|
# Configuration
|
|
gms_endpoint = "http://localhost:8080"
|
|
emitter = DatahubRestEmitter(gms_server=gms_endpoint, extra_headers={})
|
|
graph = DataHubGraph(DatahubClientConfig(server=gms_endpoint))
|
|
|
|
# Specify the incident to update (use the incident ID from incident_create.py)
|
|
incident_id = "a1b2c3d4-e5f6-4a5b-8c9d-0e1f2a3b4c5d"
|
|
incident_urn = IncidentUrn(incident_id)
|
|
|
|
# Retrieve the current incident info to preserve other fields
|
|
current_incident_info = graph.get_aspect(
|
|
entity_urn=str(incident_urn),
|
|
aspect_type=models.IncidentInfoClass,
|
|
)
|
|
|
|
if not current_incident_info:
|
|
raise ValueError(f"Incident {incident_urn} not found")
|
|
|
|
# Get the current actor URN for audit stamps
|
|
actor_urn = builder.make_user_urn("jdoe")
|
|
audit_stamp = models.AuditStampClass(
|
|
time=int(builder.get_sys_time() * 1000),
|
|
actor=actor_urn,
|
|
)
|
|
|
|
# Update the status to reflect progress in resolving the incident
|
|
current_incident_info.status = models.IncidentStatusClass(
|
|
state=models.IncidentStateClass.ACTIVE,
|
|
stage=models.IncidentStageClass.WORK_IN_PROGRESS,
|
|
message="Pipeline has been restarted. Monitoring for successful completion.",
|
|
lastUpdated=audit_stamp,
|
|
)
|
|
|
|
# Optionally update priority if severity assessment changed
|
|
current_incident_info.priority = (
|
|
1 # HIGH priority (0=CRITICAL, 1=HIGH, 2=MEDIUM, 3=LOW)
|
|
)
|
|
|
|
# Optionally assign team members to work on the incident
|
|
assignee1 = models.IncidentAssigneeClass(
|
|
actor=builder.make_user_urn("jdoe"),
|
|
assignedAt=audit_stamp,
|
|
)
|
|
assignee2 = models.IncidentAssigneeClass(
|
|
actor=builder.make_user_urn("asmith"),
|
|
assignedAt=audit_stamp,
|
|
)
|
|
current_incident_info.assignees = [assignee1, assignee2]
|
|
|
|
# Create and emit the metadata change proposal
|
|
metadata_change_proposal = MetadataChangeProposalWrapper(
|
|
entityUrn=str(incident_urn),
|
|
aspect=current_incident_info,
|
|
)
|
|
|
|
emitter.emit(metadata_change_proposal)
|
|
log.info(
|
|
f"Updated incident {incident_urn} status to {current_incident_info.status.state}"
|
|
)
|
|
log.info(
|
|
f"Status details: stage={current_incident_info.status.stage}, message={current_incident_info.status.message}"
|
|
)
|
|
log.info(f"Priority updated to {current_incident_info.priority}")
|
|
log.info(f"Assigned to {len(current_incident_info.assignees)} team members")
|