datahub/metadata-ingestion/examples/library/lineage_job_dataflow_new_api_verbose.py

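# Verbose lineage example: model an Airflow DAG as a DataFlow with four
# DataJobs, wire up job-level lineage, record a DataProcessInstance run for
# the flow and for each job, and finish by emitting the owning user and group.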

import time
import uuid
from datahub.api.entities.corpgroup.corpgroup import CorpGroup
from datahub.api.entities.corpuser.corpuser import CorpUser
from datahub.api.entities.datajob.dataflow import DataFlow
from datahub.api.entities.datajob.datajob import DataJob
from datahub.api.entities.dataprocess.dataprocess_instance import (
    DataProcessInstance,
    InstanceRunResult,
)
from datahub.emitter.rest_emitter import DatahubRestEmitter
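
# A DatahubRestEmitter pushes metadata to the DataHub GMS REST endpoint;
# this assumes a local instance (e.g. the quickstart) listening on port 8080.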
emitter = DatahubRestEmitter("http://localhost:8080")
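
# Create the DataFlow (here, an Airflow DAG) in the prod environment and emit
# it first so the jobs below have a parent flow to attach to.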
jobFlow = DataFlow(env="prod", orchestrator="airflow", id="flow2")
jobFlow.emit(emitter)

# The flow urn is passed into each DataJob constructor to link the job to its parent flow.
dataJob = DataJob(flow_urn=jobFlow.urn, id="job1", name="My Job 1")
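# Arbitrary key/value pairs can be attached as custom properties before emitting.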
dataJob.properties["custom_properties"] = "test"
dataJob.emit(emitter)
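
# Job 2 depends on job 1 (upstream_urns) and carries a tag and an individual owner.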
dataJob2 = DataJob(flow_urn=jobFlow.urn, id="job2", name="My Job 2")
dataJob2.upstream_urns.append(dataJob.urn)
dataJob2.tags.add("TestTag")
dataJob2.owners.add("testUser")
dataJob2.emit(emitter)
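
# Job 3 also depends on job 1.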
dataJob3 = DataJob(flow_urn=jobFlow.urn, id="job3", name="My Job 3")
dataJob3.upstream_urns.append(dataJob.urn)
dataJob3.emit(emitter)
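
# Job 4 fans in from jobs 2 and 3 and is owned by a group rather than a user.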
dataJob4 = DataJob(flow_urn=jobFlow.urn, id="job4", name="My Job 4")
dataJob4.upstream_urns.append(dataJob2.urn)
dataJob4.upstream_urns.append(dataJob3.urn)
dataJob4.group_owners.add("testGroup")
dataJob4.emit(emitter)

# Record this execution: a DataProcessInstance represents one run of the flow.
jobFlowRun: DataProcessInstance = DataProcessInstance(
orchestrator="airflow", cluster="prod", id=f"{jobFlow.id}-{uuid.uuid4()}"
)
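
# One run instance per job, linked both to the flow run (parent_instance) and
# to the job it instantiates (template_urn). emit_template=False skips
# re-emitting the DataJob itself, which was already emitted above.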
jobRun1: DataProcessInstance = DataProcessInstance(
orchestrator="airflow",
cluster="prod",
id=f"{jobFlow.id}-{dataJob.id}-{uuid.uuid4()}",
)
jobRun1.parent_instance = jobFlowRun.urn
jobRun1.template_urn = dataJob.urn
jobRun1.emit_process_start(
    emitter=emitter, start_timestamp_millis=int(time.time() * 1000), emit_template=False
)
jobRun1.emit_process_end(
    emitter=emitter,
    end_timestamp_millis=int(time.time() * 1000),
    result=InstanceRunResult.SUCCESS,
)
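
# Job 2's run records run-level lineage from job 1's run via upstream_urns.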
jobRun2: DataProcessInstance = DataProcessInstance(
orchestrator="airflow",
cluster="prod",
id=f"{jobFlow.id}-{dataJob2.id}-{uuid.uuid4()}",
)
jobRun2.template_urn = dataJob2.urn
jobRun2.parent_instance = jobFlowRun.urn
jobRun2.upstream_urns = [jobRun1.urn]
jobRun2.emit_process_start(
    emitter=emitter, start_timestamp_millis=int(time.time() * 1000), emit_template=False
)
jobRun2.emit_process_end(
    emitter=emitter,
    end_timestamp_millis=int(time.time() * 1000),
    result=InstanceRunResult.SUCCESS,
)
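
# Job 3's run is likewise downstream of job 1's run.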
jobRun3: DataProcessInstance = DataProcessInstance(
orchestrator="airflow",
cluster="prod",
id=f"{jobFlow.id}-{dataJob3.id}-{uuid.uuid4()}",
)
jobRun3.parent_instance = jobFlowRun.urn
jobRun3.template_urn = dataJob3.urn
jobRun3.upstream_urns = [jobRun1.urn]
jobRun3.emit_process_start(
    emitter=emitter, start_timestamp_millis=int(time.time() * 1000), emit_template=False
)
jobRun3.emit_process_end(
    emitter=emitter,
    end_timestamp_millis=int(time.time() * 1000),
    result=InstanceRunResult.SUCCESS,
)
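
# Job 4's run is downstream of the runs of jobs 2 and 3.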
jobRun4: DataProcessInstance = DataProcessInstance(
orchestrator="airflow",
cluster="prod",
id=f"{jobFlow.id}-{dataJob4.id}-{uuid.uuid4()}",
)
jobRun4.parent_instance = jobFlowRun.urn
jobRun4.template_urn = dataJob4.urn
jobRun4.upstream_urns = [jobRun2.urn, jobRun3.urn]
jobRun4.emit_process_start(
    emitter=emitter, start_timestamp_millis=int(time.time() * 1000), emit_template=False
)
jobRun4.emit_process_end(
    emitter=emitter,
    end_timestamp_millis=int(time.time() * 1000),
    result=InstanceRunResult.SUCCESS,
)
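
# Emit the user referenced as "testUser" above so the owner urn resolves in DataHub.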
user1 = CorpUser(
id="testUser",
display_name="Test User",
email="test-user@test.com",
groups=["testGroup"],
)
user1.emit(emitter)
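
# Emit the group referenced as the group owner of job 4.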
group1 = CorpGroup(
id="testGroup",
display_name="Test Group",
email="test-group@test.com",
slack="#test-group",
overrideEditable=True,
)
group1.emit(emitter)