feat(sdk): data process instance - container, subtype, dataplatformin… (#12476)

Shirshanka Das 2025-01-29 17:53:34 -08:00 committed by GitHub
parent c2a43589c7
commit ffce018cea
2 changed files with 318 additions and 11 deletions
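In short, the change adds three optional fields to the SDK's DataProcessInstance (data_platform_instance, subtype, container_urn) and a from_container constructor. A minimal usage sketch with illustrative values (the field names come from the diff below; the id and URN values are hypothetical):

from datahub.api.entities.dataprocess.dataprocess_instance import DataProcessInstance

dpi = DataProcessInstance(
    id="training_run_2025_01_29",  # hypothetical run id
    orchestrator="mlflow",
    subtype="ML Training Run",  # new: emitted as a SubTypes aspect
    container_urn="urn:li:container:abc123",  # new: emitted as a Container aspect
    data_platform_instance="prod",  # new: accepts a bare id or a full URN
)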


@@ -5,7 +5,7 @@ from typing import Callable, Dict, Iterable, List, Optional, Union, cast
from datahub.api.entities.datajob import DataFlow, DataJob
from datahub.emitter.generic_emitter import Emitter
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.mcp_builder import DatahubKey
from datahub.emitter.mcp_builder import ContainerKey, DatahubKey
from datahub.metadata.com.linkedin.pegasus2avro.dataprocess import (
DataProcessInstanceInput,
DataProcessInstanceOutput,
@@ -15,11 +15,15 @@ from datahub.metadata.com.linkedin.pegasus2avro.dataprocess import (
)
from datahub.metadata.schema_classes import (
AuditStampClass,
ContainerClass,
DataPlatformInstanceClass,
DataProcessInstanceRunEventClass,
DataProcessInstanceRunResultClass,
DataProcessRunStatusClass,
DataProcessTypeClass,
SubTypesClass,
)
from datahub.metadata.urns import DataPlatformInstanceUrn, DataPlatformUrn
from datahub.utilities.str_enum import StrEnum
from datahub.utilities.urns.data_flow_urn import DataFlowUrn
from datahub.utilities.urns.data_job_urn import DataJobUrn
@@ -42,7 +46,7 @@ class InstanceRunResult(StrEnum):
@dataclass
class DataProcessInstance:
"""This is a DataProcessInstance class which represent an instance of a DataFlow or DataJob.
"""This is a DataProcessInstance class which represents an instance of a DataFlow, DataJob, or a standalone process within a Container.
Args:
id: The id of the dataprocess instance execution.
@@ -71,6 +75,10 @@ class DataProcessInstance:
_template_object: Optional[Union[DataJob, DataFlow]] = field(
init=False, default=None, repr=False
)
data_platform_instance: Optional[str] = None
subtype: Optional[str] = None
container_urn: Optional[str] = None
_platform: Optional[str] = field(init=False, repr=False, default=None)
def __post_init__(self):
self.urn = DataProcessInstanceUrn(
@@ -80,6 +88,28 @@
id=self.id,
).guid()
)
self._platform = self.orchestrator
try:
# We first try to create from string, assuming it's a URN
self._platform = str(DataPlatformUrn.from_string(self._platform))
except Exception:
# If that fails, we assume it's an id
self._platform = str(DataPlatformUrn(self._platform))
if self.data_platform_instance is not None:
try:
# We first try to create from string, assuming it's a URN
self.data_platform_instance = str(
DataPlatformInstanceUrn.from_string(self.data_platform_instance)
)
except Exception:
# If that fails, we assume it's an id
self.data_platform_instance = str(
DataPlatformInstanceUrn(
platform=self._platform, instance=self.data_platform_instance
)
)
def start_event_mcp(
self, start_timestamp_millis: int, attempt: Optional[int] = None
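As the hunk above shows, __post_init__ normalizes both orchestrator and data_platform_instance, accepting either a bare id or a full URN. A small sketch of the equivalence this relies on (values illustrative):

from datahub.metadata.urns import DataPlatformInstanceUrn, DataPlatformUrn

# A bare platform id and its URN form normalize to the same platform URN:
assert str(DataPlatformUrn("mlflow")) == str(
    DataPlatformUrn.from_string("urn:li:dataPlatform:mlflow")
)

# A bare instance id is qualified with the resolved platform:
instance = DataPlatformInstanceUrn(
    platform="urn:li:dataPlatform:mlflow", instance="prod"
)
print(instance)  # urn:li:dataPlatformInstance:(urn:li:dataPlatform:mlflow,prod)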
@@ -269,6 +299,29 @@ class DataProcessInstance:
)
yield mcp
assert self._platform
if self.data_platform_instance:
mcp = MetadataChangeProposalWrapper(
entityUrn=str(self.urn),
aspect=DataPlatformInstanceClass(
platform=self._platform, instance=self.data_platform_instance
),
)
yield mcp
if self.subtype:
mcp = MetadataChangeProposalWrapper(
entityUrn=str(self.urn), aspect=SubTypesClass(typeNames=[self.subtype])
)
yield mcp
if self.container_urn:
mcp = MetadataChangeProposalWrapper(
entityUrn=str(self.urn),
aspect=ContainerClass(container=self.container_urn),
)
yield mcp
yield from self.generate_inlet_outlet_mcp(materialize_iolets=materialize_iolets)
@staticmethod
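With the new fields set, generate_mcp now also yields DataPlatformInstance, SubTypes, and Container aspects, each guarded by its corresponding optional field. A hedged sketch of inspecting the output (construction values are illustrative):

import time

from datahub.api.entities.dataprocess.dataprocess_instance import DataProcessInstance

dpi = DataProcessInstance(
    id="run_1",
    orchestrator="mlflow",
    subtype="ML Training Run",
    container_urn="urn:li:container:abc123",  # hypothetical container URN
)
mcps = list(dpi.generate_mcp(int(time.time() * 1000), materialize_iolets=False))
# With subtype and container_urn set, the aspect list should include
# SubTypesClass and ContainerClass alongside the existing aspects.
print([type(mcp.aspect).__name__ for mcp in mcps])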
@@ -309,13 +362,20 @@ class DataProcessInstance:
clone_outlets: bool = False,
) -> "DataProcessInstance":
"""
Generates DataProcessInstance from a DataJob
Generates a DataProcessInstance from a given DataJob.
:param datajob: (DataJob) the datajob from generate the DataProcessInstance
:param id: (str) the id for the DataProcessInstance
:param clone_inlets: (bool) whether to clone datajob's inlets
:param clone_outlets: (bool) whether to clone datajob's outlets
:return: DataProcessInstance
This method creates a DataProcessInstance object using the provided DataJob
and assigns it a unique identifier. Optionally, it can clone the inlets and
outlets from the DataJob to the DataProcessInstance.
Args:
datajob (DataJob): The DataJob instance from which to generate the DataProcessInstance.
id (str): The unique identifier for the DataProcessInstance.
clone_inlets (bool, optional): If True, clones the inlets from the DataJob to the DataProcessInstance. Defaults to False.
clone_outlets (bool, optional): If True, clones the outlets from the DataJob to the DataProcessInstance. Defaults to False.
Returns:
DataProcessInstance: The generated DataProcessInstance object.
"""
dpi: DataProcessInstance = DataProcessInstance(
orchestrator=datajob.flow_urn.orchestrator,
@@ -332,14 +392,47 @@ class DataProcessInstance:
return dpi
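For comparison, a short sketch of from_datajob with inlet/outlet cloning, following the docstring above (all identifiers are hypothetical):

from datahub.api.entities.datajob import DataJob
from datahub.api.entities.dataprocess.dataprocess_instance import DataProcessInstance
from datahub.metadata.urns import DataFlowUrn

flow_urn = DataFlowUrn.create_from_ids(orchestrator="airflow", flow_id="etl", env="prod")
job = DataJob(id="load_task", flow_urn=flow_urn)
run = DataProcessInstance.from_datajob(
    datajob=job, id="load_task_run_1", clone_inlets=True, clone_outlets=True
)
# run.template_urn == job.urn; inlets/outlets are copied because clone_* is True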
@staticmethod
def from_dataflow(dataflow: DataFlow, id: str) -> "DataProcessInstance":
def from_container(
container_key: ContainerKey,
id: str,
) -> "DataProcessInstance":
"""
Generates DataProcessInstance from a DataFlow
Create a DataProcessInstance that is located within a Container.
Use this method when you need to represent a DataProcessInstance that
is not an instance of a DataJob or a DataFlow,
e.g. when recording an ad-hoc training run that is associated only with an Experiment.
:param dataflow: (DataFlow) the DataFlow from generate the DataProcessInstance
:param container_key: (ContainerKey) the key of the container that the DataProcessInstance belongs to
:param id: (str) the id for the DataProcessInstance
:return: DataProcessInstance
"""
dpi: DataProcessInstance = DataProcessInstance(
id=id,
orchestrator=DataPlatformUrn.from_string(
container_key.platform
).platform_name,
template_urn=None,
container_urn=container_key.as_urn(),
)
return dpi
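A hedged usage sketch of the new from_container constructor, mirroring the unit test below (the experiment name and run id are hypothetical):

from datahub.api.entities.dataprocess.dataprocess_instance import DataProcessInstance
from datahub.emitter.mcp_builder import ContainerKey

key = ContainerKey(
    platform="urn:li:dataPlatform:mlflow", name="my_experiment", env="PROD"
)
run = DataProcessInstance.from_container(container_key=key, id="adhoc_run_1")
# run.orchestrator == "mlflow" (platform name parsed from the container's platform URN)
# run.template_urn is None, and run.container_urn == key.as_urn()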
@staticmethod
def from_dataflow(dataflow: DataFlow, id: str) -> "DataProcessInstance":
"""
Creates a DataProcessInstance from a given DataFlow.
This method generates a DataProcessInstance object using the provided DataFlow
and a specified id. The DataProcessInstance will inherit properties from the
DataFlow such as orchestrator, environment, and template URN.
Args:
dataflow (DataFlow): The DataFlow object from which to generate the DataProcessInstance.
id (str): The unique identifier for the DataProcessInstance.
Returns:
DataProcessInstance: The newly created DataProcessInstance object.
"""
dpi = DataProcessInstance(
id=id,
orchestrator=dataflow.orchestrator,
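Likewise, a minimal sketch of from_dataflow, which per its docstring inherits orchestrator, env, and template URN from the flow (identifiers are hypothetical):

from datahub.api.entities.datajob import DataFlow
from datahub.api.entities.dataprocess.dataprocess_instance import DataProcessInstance

flow = DataFlow(orchestrator="airflow", id="daily_etl", env="prod")
run = DataProcessInstance.from_dataflow(dataflow=flow, id="daily_etl_2025_01_29")
# run.orchestrator == "airflow" and run.template_urn == flow.urn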


@@ -0,0 +1,214 @@
import time
import unittest
from unittest.mock import Mock
import datahub.metadata.schema_classes as models
from datahub.api.entities.datajob import DataFlow, DataJob
from datahub.api.entities.dataprocess.dataprocess_instance import (
DataProcessInstance,
InstanceRunResult,
)
from datahub.emitter.mcp_builder import ContainerKey
from datahub.metadata.com.linkedin.pegasus2avro.dataprocess import RunResultType
from datahub.metadata.schema_classes import (
DataProcessRunStatusClass,
DataProcessTypeClass,
)
from datahub.metadata.urns import DataFlowUrn, DataJobUrn, DataPlatformUrn, DatasetUrn
class TestDataProcessInstance(unittest.TestCase):
def setUp(self):
# Common test data
self.test_id = "test_process_123"
self.test_orchestrator = "airflow"
self.test_cluster = "prod"
# Create mock ContainerKey
self.mock_container_key = ContainerKey(
platform="urn:li:dataPlatform:mlflow", name="test_experiment", env="PROD"
)
# Create mock DataJob
self.mock_flow_urn = DataFlowUrn.create_from_ids(
orchestrator="airflow", flow_id="test_flow", env="prod"
)
self.mock_job_urn = DataJobUrn.create_from_ids(
job_id="test_job", data_flow_urn=str(self.mock_flow_urn)
)
self.mock_datajob = DataJob(
id="test_job",
flow_urn=self.mock_flow_urn,
inlets=[
DatasetUrn.from_string(
"urn:li:dataset:(urn:li:dataPlatform:hive,test_input,PROD)"
)
],
outlets=[
DatasetUrn.from_string(
"urn:li:dataset:(urn:li:dataPlatform:hive,test_output,PROD)"
)
],
)
# Create mock DataFlow
self.mock_dataflow = DataFlow(
orchestrator="airflow", id="test_flow", env="prod"
)
def test_basic_initialization(self):
"""Test basic initialization of DataProcessInstance"""
instance = DataProcessInstance(
id=self.test_id,
orchestrator=self.test_orchestrator,
cluster=self.test_cluster,
)
self.assertEqual(instance.id, self.test_id)
self.assertEqual(instance.orchestrator, self.test_orchestrator)
self.assertEqual(instance.cluster, self.test_cluster)
self.assertEqual(instance.type, DataProcessTypeClass.BATCH_SCHEDULED)
def test_from_datajob_creation(self):
"""Test creation of DataProcessInstance from DataJob"""
instance = DataProcessInstance.from_datajob(
datajob=self.mock_datajob,
id=self.test_id,
clone_inlets=True,
clone_outlets=True,
)
self.assertEqual(instance.id, self.test_id)
self.assertEqual(instance.orchestrator, "airflow")
self.assertEqual(instance.template_urn, self.mock_datajob.urn)
self.assertEqual(len(instance.inlets), 1)
self.assertEqual(len(instance.outlets), 1)
def test_from_dataflow_creation(self):
"""Test creation of DataProcessInstance from DataFlow"""
instance = DataProcessInstance.from_dataflow(
dataflow=self.mock_dataflow, id=self.test_id
)
self.assertEqual(instance.id, self.test_id)
self.assertEqual(instance.orchestrator, "airflow")
self.assertEqual(instance.template_urn, self.mock_dataflow.urn)
def test_from_container_creation(self):
"""Test creation of DataProcessInstance from ContainerKey"""
instance = DataProcessInstance.from_container(
container_key=self.mock_container_key, id=self.test_id
)
self.assertEqual(instance.id, self.test_id)
self.assertEqual(instance.orchestrator, "mlflow") # Platform name from URN
self.assertIsNone(
instance.template_urn
) # Should be None for container-based instances
self.assertEqual(instance.container_urn, self.mock_container_key.as_urn())
# Verify the platform is set correctly
expected_platform = str(
DataPlatformUrn.from_string(self.mock_container_key.platform)
)
self.assertEqual(instance._platform, expected_platform)
def test_start_event_generation(self):
"""Test generation of start event MCPs"""
instance = DataProcessInstance(
id=self.test_id, orchestrator=self.test_orchestrator
)
start_time = int(time.time() * 1000)
mcps = list(instance.start_event_mcp(start_time, attempt=1))
self.assertEqual(len(mcps), 1)
start_event = mcps[0]
assert isinstance(start_event.aspect, models.DataProcessInstanceRunEventClass)
self.assertEqual(start_event.aspect.status, DataProcessRunStatusClass.STARTED)
self.assertEqual(start_event.aspect.timestampMillis, start_time)
self.assertEqual(start_event.aspect.attempt, 1)
def test_end_event_generation(self):
"""Test generation of end event MCPs"""
instance = DataProcessInstance(
id=self.test_id, orchestrator=self.test_orchestrator
)
end_time = int(time.time() * 1000)
mcps = list(
instance.end_event_mcp(
end_time, result=InstanceRunResult.SUCCESS, attempt=1
)
)
self.assertEqual(len(mcps), 1)
end_event = mcps[0]
assert isinstance(end_event.aspect, models.DataProcessInstanceRunEventClass)
self.assertEqual(end_event.aspect.status, DataProcessRunStatusClass.COMPLETE)
self.assertEqual(end_event.aspect.timestampMillis, end_time)
assert end_event.aspect.result is not None
self.assertEqual(end_event.aspect.result.type, RunResultType.SUCCESS)
def test_emit_process_with_emitter(self):
"""Test emitting process events with mock emitter"""
mock_emitter = Mock()
instance = DataProcessInstance(
id=self.test_id, orchestrator=self.test_orchestrator
)
# Test emit method
instance.emit(mock_emitter)
self.assertTrue(mock_emitter.emit.called)
# Test emit_process_start
start_time = int(time.time() * 1000)
instance.emit_process_start(mock_emitter, start_time)
self.assertTrue(mock_emitter.emit.called)
# Test emit_process_end
end_time = int(time.time() * 1000)
instance.emit_process_end(
mock_emitter, end_time, result=InstanceRunResult.SUCCESS
)
self.assertTrue(mock_emitter.emit.called)
def test_generate_mcp(self):
"""Test generation of MCPs"""
instance = DataProcessInstance(
id=self.test_id,
orchestrator=self.test_orchestrator,
properties={"env": "prod"},
url="http://test.url",
)
created_time = int(time.time() * 1000)
mcps = list(instance.generate_mcp(created_time, materialize_iolets=True))
# Check if we have the basic MCPs generated
self.assertGreaterEqual(
len(mcps), 2
) # Should at least have properties and relationships
# Verify the properties MCP
properties_mcp = next(
mcp for mcp in mcps if hasattr(mcp.aspect, "customProperties")
)
assert isinstance(
properties_mcp.aspect, models.DataProcessInstancePropertiesClass
)
self.assertEqual(properties_mcp.aspect.name, self.test_id)
self.assertEqual(properties_mcp.aspect.customProperties["env"], "prod")
self.assertEqual(properties_mcp.aspect.externalUrl, "http://test.url")
if __name__ == "__main__":
unittest.main()