mirror of
https://github.com/datahub-project/datahub.git
synced 2025-07-04 07:34:44 +00:00

Co-authored-by: mohdsiddique <mohdsiddiquebagwan@gmail.com> Co-authored-by: Harshal Sheth <hsheth2@gmail.com>
128 lines
3.8 KiB
Python
128 lines
3.8 KiB
Python
import time
|
|
import uuid
|
|
|
|
from datahub.api.entities.corpgroup.corpgroup import CorpGroup
|
|
from datahub.api.entities.corpuser.corpuser import CorpUser
|
|
from datahub.api.entities.datajob.dataflow import DataFlow
|
|
from datahub.api.entities.datajob.datajob import DataJob
|
|
from datahub.api.entities.dataprocess.dataprocess_instance import (
|
|
DataProcessInstance,
|
|
InstanceRunResult,
|
|
)
|
|
from datahub.emitter.rest_emitter import DatahubRestEmitter
|
|
|
|
emitter = DatahubRestEmitter("http://localhost:8080")
|
|
|
|
jobFlow = DataFlow(env="prod", orchestrator="airflow", id="flow2")
|
|
jobFlow.emit(emitter)
|
|
|
|
# Flowurn as constructor
|
|
dataJob = DataJob(flow_urn=jobFlow.urn, id="job1", name="My Job 1")
|
|
dataJob.properties["custom_properties"] = "test"
|
|
dataJob.emit(emitter)
|
|
|
|
dataJob2 = DataJob(flow_urn=jobFlow.urn, id="job2", name="My Job 2")
|
|
dataJob2.upstream_urns.append(dataJob.urn)
|
|
dataJob2.tags.add("TestTag")
|
|
dataJob2.owners.add("testUser")
|
|
dataJob2.emit(emitter)
|
|
|
|
dataJob3 = DataJob(flow_urn=jobFlow.urn, id="job3", name="My Job 3")
|
|
dataJob3.upstream_urns.append(dataJob.urn)
|
|
dataJob3.emit(emitter)
|
|
|
|
dataJob4 = DataJob(flow_urn=jobFlow.urn, id="job4", name="My Job 4")
|
|
dataJob4.upstream_urns.append(dataJob2.urn)
|
|
dataJob4.upstream_urns.append(dataJob3.urn)
|
|
dataJob4.group_owners.add("testGroup")
|
|
dataJob4.emit(emitter)
|
|
|
|
# Hello World
|
|
jobFlowRun: DataProcessInstance = DataProcessInstance(
|
|
orchestrator="airflow", cluster="prod", id=f"{jobFlow.id}-{uuid.uuid4()}"
|
|
)
|
|
jobRun1: DataProcessInstance = DataProcessInstance(
|
|
orchestrator="airflow",
|
|
cluster="prod",
|
|
id=f"{jobFlow.id}-{dataJob.id}-{uuid.uuid4()}",
|
|
)
|
|
jobRun1.parent_instance = jobFlowRun.urn
|
|
jobRun1.template_urn = dataJob.urn
|
|
jobRun1.emit_process_start(
|
|
emitter=emitter, start_timestamp_millis=int(time.time() * 1000), emit_template=False
|
|
)
|
|
jobRun1.emit_process_end(
|
|
emitter=emitter,
|
|
end_timestamp_millis=int(time.time() * 1000),
|
|
result=InstanceRunResult.SUCCESS,
|
|
)
|
|
|
|
jobRun2: DataProcessInstance = DataProcessInstance(
|
|
orchestrator="airflow",
|
|
cluster="prod",
|
|
id=f"{jobFlow.id}-{dataJob2.id}-{uuid.uuid4()}",
|
|
)
|
|
jobRun2.template_urn = dataJob2.urn
|
|
jobRun2.parent_instance = jobFlowRun.urn
|
|
jobRun2.upstream_urns = [jobRun1.urn]
|
|
jobRun2.emit_process_start(
|
|
emitter=emitter, start_timestamp_millis=int(time.time() * 1000), emit_template=False
|
|
)
|
|
jobRun2.emit_process_end(
|
|
emitter=emitter,
|
|
end_timestamp_millis=int(time.time() * 1000),
|
|
result=InstanceRunResult.SUCCESS,
|
|
)
|
|
|
|
|
|
jobRun3: DataProcessInstance = DataProcessInstance(
|
|
orchestrator="airflow",
|
|
cluster="prod",
|
|
id=f"{jobFlow.id}-{dataJob3.id}-{uuid.uuid4()}",
|
|
)
|
|
jobRun3.parent_instance = jobFlowRun.urn
|
|
jobRun3.template_urn = dataJob3.urn
|
|
jobRun3.upstream_urns = [jobRun1.urn]
|
|
jobRun3.emit_process_start(
|
|
emitter=emitter, start_timestamp_millis=int(time.time() * 1000), emit_template=False
|
|
)
|
|
jobRun3.emit_process_end(
|
|
emitter=emitter,
|
|
end_timestamp_millis=int(time.time() * 1000),
|
|
result=InstanceRunResult.SUCCESS,
|
|
)
|
|
|
|
jobRun4: DataProcessInstance = DataProcessInstance(
|
|
orchestrator="airflow",
|
|
cluster="prod",
|
|
id=f"{jobFlow.id}-{dataJob4.id}-{uuid.uuid4()}",
|
|
)
|
|
jobRun4.parent_instance = jobFlowRun.urn
|
|
jobRun4.template_urn = dataJob4.urn
|
|
jobRun4.upstream_urns = [jobRun2.urn, jobRun3.urn]
|
|
jobRun4.emit_process_start(
|
|
emitter=emitter, start_timestamp_millis=int(time.time() * 1000), emit_template=False
|
|
)
|
|
jobRun4.emit_process_end(
|
|
emitter=emitter,
|
|
end_timestamp_millis=int(time.time() * 1000),
|
|
result=InstanceRunResult.SUCCESS,
|
|
)
|
|
|
|
user1 = CorpUser(
|
|
id="testUser",
|
|
display_name="Test User",
|
|
email="test-user@test.com",
|
|
groups=["testGroup"],
|
|
)
|
|
user1.emit(emitter)
|
|
|
|
group1 = CorpGroup(
|
|
id="testGroup",
|
|
display_name="Test Group",
|
|
email="test-group@test.com",
|
|
slack="#test-group",
|
|
overrideEditable=True,
|
|
)
|
|
group1.emit(emitter)
|