# Mirror of https://github.com/datahub-project/datahub.git
# Synced 2025-07-04 07:34:44 +00:00
#
# Co-authored-by: mohdsiddique <mohdsiddiquebagwan@gmail.com>
# Co-authored-by: Harshal Sheth <hsheth2@gmail.com>
#
# Upstream file: 97 lines, 2.7 KiB, Python
"""Emit a sample DataHub DataFlow, its DataJobs, and one simulated run.

The script registers a flow with four jobs (job1 feeds job2 and job3, which
both feed job4), then records a run of the flow in which each job run is
started and immediately ended with a SUCCESS result. Everything is sent over
REST to a DataHub server at http://localhost:8080.
"""

import uuid
from datetime import datetime, timezone

from datahub.api.entities.datajob import DataFlow, DataJob
from datahub.api.entities.dataprocess.dataprocess_instance import (
    DataProcessInstance,
    InstanceRunResult,
)
from datahub.emitter.rest_emitter import DatahubRestEmitter

# All entities and run events below are sent through this single REST emitter.
emitter = DatahubRestEmitter("http://localhost:8080")

# The parent flow; every DataJob below is attached to it via jobFlow.urn.
jobFlow = DataFlow(env="prod", orchestrator="airflow", id="flow_api_simple")
jobFlow.emit(emitter)

# Each DataJob is bound to the flow by passing the flow's URN to the
# constructor. Lineage is declared through upstream_urns, giving this DAG:
#   job1 -> job2 -> job4
#   job1 -> job3 -> job4
dataJob = DataJob(flow_urn=jobFlow.urn, id="job1", name="My Job 1")
dataJob.properties["custom_properties"] = "test"
dataJob.emit(emitter)

# job2 depends on job1 and additionally carries a tag and an owner.
dataJob2 = DataJob(flow_urn=jobFlow.urn, id="job2", name="My Job 2")
dataJob2.upstream_urns.append(dataJob.urn)
dataJob2.tags.add("TestTag")
dataJob2.owners.add("test@test.com")
dataJob2.emit(emitter)

# job3 also depends on job1, in parallel with job2.
dataJob3 = DataJob(flow_urn=jobFlow.urn, id="job3", name="My Job 3")
dataJob3.upstream_urns.append(dataJob.urn)
dataJob3.emit(emitter)

# job4 joins the two branches: it depends on both job2 and job3.
dataJob4 = DataJob(flow_urn=jobFlow.urn, id="job4", name="My Job 4")
dataJob4.upstream_urns.append(dataJob2.urn)
dataJob4.upstream_urns.append(dataJob3.urn)
dataJob4.emit(emitter)

# Start one run of the whole flow. The run id is "<flow id>-<uuid4>" so every
# execution of this script creates a distinct process instance. Timestamps
# passed to emit_process_start/emit_process_end are epoch milliseconds.
jobFlowRun = DataProcessInstance.from_dataflow(
    dataflow=jobFlow, id=f"{jobFlow.id}-{uuid.uuid4()}"
)
jobFlowRun.emit_process_start(
    emitter, int(datetime.now(timezone.utc).timestamp() * 1000)
)

# Simulate one run of each job in dependency order (job1, job2, job3, job4).
# Each run gets a fresh "<flow id>-<uuid4>" id, is started, and is then
# immediately ended as SUCCESS. Timestamps are epoch milliseconds.
_job_runs = []
for _job in (dataJob, dataJob2, dataJob3, dataJob4):
    _run = DataProcessInstance.from_datajob(
        datajob=_job, id=f"{jobFlow.id}-{uuid.uuid4()}"
    )
    _run.emit_process_start(
        emitter, int(datetime.now(timezone.utc).timestamp() * 1000)
    )
    _run.emit_process_end(
        emitter,
        int(datetime.now(timezone.utc).timestamp() * 1000),
        result=InstanceRunResult.SUCCESS,
    )
    _job_runs.append(_run)

# Keep the per-job run names bound at module level, as the unrolled version did.
jobRun, job2Run, job3Run, job4Run = _job_runs

# Every job run has been ended, so close the flow run as successful too.
jobFlowRun.emit_process_end(
    emitter,
    int(datetime.now(timezone.utc).timestamp() * 1000),
    result=InstanceRunResult.SUCCESS,
)