2025-03-13 11:02:15 -07:00
|
|
|
import contextlib
|
2025-04-10 22:54:17 -07:00
|
|
|
from datetime import timedelta
|
2025-03-13 11:02:15 -07:00
|
|
|
from typing import List
|
|
|
|
from unittest.mock import patch
|
|
|
|
|
|
|
|
import pytest
|
2025-03-21 14:06:17 -07:00
|
|
|
from google.cloud.aiplatform import Experiment, ExperimentRun
|
2025-03-13 11:02:15 -07:00
|
|
|
|
|
|
|
import datahub.emitter.mce_builder as builder
|
|
|
|
from datahub.emitter.mcp import MetadataChangeProposalWrapper
|
2025-04-01 00:00:09 -07:00
|
|
|
from datahub.emitter.mcp_builder import ExperimentKey
|
2025-03-13 11:02:15 -07:00
|
|
|
from datahub.ingestion.api.common import PipelineContext
|
2025-03-21 14:06:17 -07:00
|
|
|
from datahub.ingestion.api.workunit import MetadataWorkUnit
|
|
|
|
from datahub.ingestion.source.common.subtypes import MLAssetSubTypes
|
|
|
|
from datahub.ingestion.source.vertexai.vertexai import (
|
2025-03-13 11:02:15 -07:00
|
|
|
ModelMetadata,
|
|
|
|
TrainingJobMetadata,
|
|
|
|
VertexAIConfig,
|
|
|
|
VertexAISource,
|
|
|
|
)
|
2025-03-21 14:06:17 -07:00
|
|
|
from datahub.metadata.com.linkedin.pegasus2avro.common import AuditStamp
|
2025-04-10 22:54:17 -07:00
|
|
|
from datahub.metadata.com.linkedin.pegasus2avro.dataprocess import (
|
|
|
|
DataProcessInstanceRelationships,
|
|
|
|
)
|
2025-03-13 11:02:15 -07:00
|
|
|
from datahub.metadata.com.linkedin.pegasus2avro.ml.metadata import (
|
|
|
|
MLModelGroupProperties,
|
|
|
|
MLModelProperties,
|
2025-03-21 14:06:17 -07:00
|
|
|
MLTrainingRunProperties,
|
2025-03-13 11:02:15 -07:00
|
|
|
)
|
|
|
|
from datahub.metadata.schema_classes import (
|
|
|
|
ContainerClass,
|
2025-03-21 14:06:17 -07:00
|
|
|
ContainerPropertiesClass,
|
2025-04-10 22:54:17 -07:00
|
|
|
DataFlowInfoClass,
|
|
|
|
DataJobInfoClass,
|
|
|
|
DataJobInputOutputClass,
|
2025-03-21 14:06:17 -07:00
|
|
|
DataPlatformInstanceClass,
|
2025-03-13 11:02:15 -07:00
|
|
|
DataProcessInstanceInputClass,
|
|
|
|
DataProcessInstancePropertiesClass,
|
|
|
|
DatasetPropertiesClass,
|
2025-03-21 14:06:17 -07:00
|
|
|
EdgeClass,
|
2025-04-10 22:54:17 -07:00
|
|
|
GlobalTagsClass,
|
2025-03-13 11:02:15 -07:00
|
|
|
MLModelDeploymentPropertiesClass,
|
2025-03-21 14:06:17 -07:00
|
|
|
StatusClass,
|
2025-03-13 11:02:15 -07:00
|
|
|
SubTypesClass,
|
2025-03-21 14:06:17 -07:00
|
|
|
TimeStampClass,
|
|
|
|
VersionPropertiesClass,
|
|
|
|
VersionTagClass,
|
2025-03-13 11:02:15 -07:00
|
|
|
)
|
2025-03-21 14:06:17 -07:00
|
|
|
from datahub.metadata.urns import DataPlatformUrn
|
|
|
|
from datahub.utilities.time import datetime_to_ts_millis
|
2025-03-13 11:02:15 -07:00
|
|
|
from tests.integration.vertexai.mock_vertexai import (
|
|
|
|
gen_mock_dataset,
|
|
|
|
gen_mock_endpoint,
|
2025-03-21 14:06:17 -07:00
|
|
|
gen_mock_experiment,
|
|
|
|
gen_mock_experiment_run,
|
2025-03-13 11:02:15 -07:00
|
|
|
gen_mock_model,
|
|
|
|
gen_mock_model_version,
|
|
|
|
gen_mock_training_automl_job,
|
|
|
|
gen_mock_training_custom_job,
|
2025-04-10 22:54:17 -07:00
|
|
|
get_mock_pipeline_job,
|
2025-03-13 11:02:15 -07:00
|
|
|
)
|
|
|
|
|
|
|
|
# GCP project id and region passed to VertexAIConfig by the `source` fixture.
PROJECT_ID = "acryl-poc"
REGION = "us-west2"
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.fixture
def source() -> VertexAISource:
    """Return a VertexAISource built from the module-level project id and region."""
    pipeline_ctx = PipelineContext(run_id="vertexai-source-test")
    vertex_config = VertexAIConfig(project_id=PROJECT_ID, region=REGION)
    return VertexAISource(ctx=pipeline_ctx, config=vertex_config)
|
|
|
|
|
|
|
|
|
|
|
|
def test_get_ml_model_mcps(source: VertexAISource) -> None:
    """Verify the model-group MCPs emitted when Model.list yields one mock model."""
    mock_model = gen_mock_model()

    # Only the listing call needs the patch; the generator is drained inside it.
    with patch("google.cloud.aiplatform.Model.list") as model_list:
        model_list.return_value = [mock_model]
        actual_mcps = list(source._get_ml_models_mcps())

    group_urn = builder.make_ml_model_group_urn(
        platform=source.platform,
        group_name=source._make_vertexai_model_group_name(mock_model.name),
        env=source.config.env,
    )

    group_properties = MLModelGroupProperties(
        name=mock_model.display_name,
        description=mock_model.description,
        created=TimeStampClass(
            time=datetime_to_ts_millis(mock_model.create_time),
            actor="urn:li:corpuser:datahub",
        ),
        lastModified=TimeStampClass(
            time=datetime_to_ts_millis(mock_model.update_time),
            actor="urn:li:corpuser:datahub",
        ),
        externalUrl=source._make_model_external_url(mock_model),
    )
    expected_aspects = [
        ContainerClass(container=source._get_project_container().as_urn()),
        SubTypesClass(typeNames=[MLAssetSubTypes.VERTEX_MODEL_GROUP]),
        group_properties,
        DataPlatformInstanceClass(platform=str(DataPlatformUrn(source.platform))),
    ]

    assert len(actual_mcps) == 4
    for aspect in expected_aspects:
        expected_mcp = MetadataChangeProposalWrapper(
            entityUrn=group_urn, aspect=aspect
        )
        assert expected_mcp in actual_mcps
|
|
|
|
|
|
|
|
|
|
|
|
def test_get_ml_model_properties_mcps(source: VertexAISource) -> None:
    """Verify the MCPs generated for a single model version."""
    mock_model = gen_mock_model()
    model_version = gen_mock_model_version(mock_model)
    model_meta = ModelMetadata(mock_model, model_version)

    # Run _gen_ml_model_mcps
    actual_mcps = list(source._gen_ml_model_mcps(model_meta))

    model_urn = source._make_ml_model_urn(
        model_version, source._make_vertexai_model_name(mock_model.name)
    )

    model_properties = MLModelProperties(
        name=f"{mock_model.display_name}_{model_version.version_id}",
        description=model_version.version_description,
        created=TimeStampClass(
            time=datetime_to_ts_millis(mock_model.create_time),
            actor="urn:li:corpuser:datahub",
        ),
        lastModified=TimeStampClass(
            time=datetime_to_ts_millis(mock_model.update_time),
            actor="urn:li:corpuser:datahub",
        ),
        customProperties={
            "versionId": model_version.version_id,
            "resourceName": mock_model.resource_name,
        },
        externalUrl=source._make_model_version_external_url(mock_model),
        version=VersionTagClass(
            versionTag=model_version.version_id, metadataAttribution=None
        ),
        groups=[source._make_ml_model_group_urn(mock_model)],
        type=MLAssetSubTypes.VERTEX_MODEL,
        deployments=[],
    )
    expected_aspects = [
        ContainerClass(container=source._get_project_container().as_urn()),
        SubTypesClass(typeNames=[MLAssetSubTypes.VERTEX_MODEL]),
        model_properties,
        DataPlatformInstanceClass(platform=str(DataPlatformUrn(source.platform))),
    ]

    assert len(actual_mcps) == 5
    for aspect in expected_aspects:
        expected_mcp = MetadataChangeProposalWrapper(
            entityUrn=model_urn, aspect=aspect
        )
        assert expected_mcp in actual_mcps

    # The version properties aspect carries a system-generated GUID, so it is
    # only checked for presence and type rather than compared for equality.
    version_props_mcps = [
        candidate
        for candidate in actual_mcps
        if isinstance(candidate.aspect, VersionPropertiesClass)
    ]
    assert len(version_props_mcps) == 1
    assert isinstance(version_props_mcps[0].aspect, VersionPropertiesClass)
|
2025-03-13 11:02:15 -07:00
|
|
|
|
|
|
|
|
|
|
|
def test_get_endpoint_mcps(
    source: VertexAISource,
) -> None:
    """Verify the deployment MCPs generated for a model with one endpoint."""
    mock_model = gen_mock_model()
    model_version = gen_mock_model_version(mock_model)
    mock_endpoint = gen_mock_endpoint()
    model_meta = ModelMetadata(
        model=mock_model, model_version=model_version, endpoints=[mock_endpoint]
    )

    # Run _gen_endpoints_mcps
    actual_mcps = list(source._gen_endpoints_mcps(model_meta))

    endpoint_name = source._make_vertexai_endpoint_name(entity_id=mock_endpoint.name)
    deployment_urn = builder.make_ml_model_deployment_urn(
        platform=source.platform,
        deployment_name=endpoint_name,
        env=source.config.env,
    )

    deployment_properties = MLModelDeploymentPropertiesClass(
        customProperties={"displayName": mock_endpoint.display_name},
        description=mock_model.description,
        createdAt=int(datetime_to_ts_millis(mock_endpoint.create_time)),
        version=VersionTagClass(
            versionTag=model_version.version_id, metadataAttribution=None
        ),
    )
    expected_aspects = [
        deployment_properties,
        ContainerClass(container=source._get_project_container().as_urn()),
    ]

    assert len(actual_mcps) == 2
    for aspect in expected_aspects:
        expected_mcp = MetadataChangeProposalWrapper(
            entityUrn=deployment_urn, aspect=aspect
        )
        assert expected_mcp in actual_mcps
|
2025-03-13 11:02:15 -07:00
|
|
|
|
|
|
|
|
|
|
|
def test_get_training_jobs_mcps(
    source: VertexAISource,
) -> None:
    """Check the MCPs emitted for training jobs listed from Vertex AI.

    ``aiplatform.init`` and every job ``list`` call named below are patched.
    Only ``CustomJob.list`` returns a job (one mock custom job); all other
    patched calls return an empty list. The properties of the resulting MCPs
    are then verified.
    """
    mock_training_job = gen_mock_training_custom_job()
    custom_job_list = "google.cloud.aiplatform.CustomJob.list"
    patched_targets = [
        "google.cloud.aiplatform.init",
        custom_job_list,
        "google.cloud.aiplatform.CustomTrainingJob.list",
        "google.cloud.aiplatform.CustomContainerTrainingJob.list",
        "google.cloud.aiplatform.CustomPythonPackageTrainingJob.list",
        "google.cloud.aiplatform.AutoMLTabularTrainingJob.list",
        "google.cloud.aiplatform.AutoMLImageTrainingJob.list",
        "google.cloud.aiplatform.AutoMLTextTrainingJob.list",
        "google.cloud.aiplatform.AutoMLVideoTrainingJob.list",
        "google.cloud.aiplatform.AutoMLForecastingTrainingJob.list",
    ]

    with contextlib.ExitStack() as exit_stack:
        for target in patched_targets:
            mocked = exit_stack.enter_context(patch(target))
            mocked.return_value = (
                [mock_training_job] if target == custom_job_list else []
            )

        # Run _get_training_jobs_mcps while every patch is still active.
        actual_mcps = list(source._get_training_jobs_mcps())

    # Assert Entity Urns
    expected_urn = builder.make_data_process_instance_urn(
        source._make_vertexai_job_name(mock_training_job.name)
    )
    job_url = source._make_job_external_url(mock_training_job)

    expected_aspects = [
        DataProcessInstancePropertiesClass(
            name=mock_training_job.display_name,
            externalUrl=job_url,
            customProperties={"jobType": "CustomJob"},
            created=AuditStamp(
                time=datetime_to_ts_millis(mock_training_job.create_time),
                actor="urn:li:corpuser:datahub",
            ),
        ),
        MLTrainingRunProperties(
            id=mock_training_job.name,
            externalUrl=job_url,
        ),
        SubTypesClass(typeNames=[MLAssetSubTypes.VERTEX_TRAINING_JOB]),
        ContainerClass(container=source._get_project_container().as_urn()),
        DataPlatformInstanceClass(platform=str(DataPlatformUrn(source.platform))),
    ]

    assert len(actual_mcps) == 5
    for aspect in expected_aspects:
        expected_mcp = MetadataChangeProposalWrapper(
            entityUrn=expected_urn, aspect=aspect
        )
        assert expected_mcp in actual_mcps, (
            f"missing {type(aspect).__name__} MCP for {expected_urn}"
        )
|
2025-03-13 11:02:15 -07:00
|
|
|
|
|
|
|
|
|
|
|
def test_gen_training_job_mcps(source: VertexAISource) -> None:
    """Verify the MCPs generated for a custom training job with an input dataset."""
    mock_training_job = gen_mock_training_custom_job()
    mock_dataset = gen_mock_dataset()
    job_meta = TrainingJobMetadata(mock_training_job, input_dataset=mock_dataset)

    actual_mcps = list(source._gen_training_job_mcps(job_meta))

    dataset_urn = builder.make_dataset_urn(
        platform=source.platform,
        name=source._make_vertexai_dataset_name(entity_id=mock_dataset.name),
        env=source.config.env,
    )

    # Assert Entity Urns
    job_urn = builder.make_data_process_instance_urn(
        source._make_vertexai_job_name(mock_training_job.name)
    )
    job_url = source._make_job_external_url(mock_training_job)

    expected_aspects = [
        DataProcessInstancePropertiesClass(
            name=mock_training_job.display_name,
            externalUrl=job_url,
            customProperties={"jobType": "CustomJob"},
            created=AuditStamp(
                time=datetime_to_ts_millis(mock_training_job.create_time),
                actor="urn:li:corpuser:datahub",
            ),
        ),
        MLTrainingRunProperties(
            id=mock_training_job.name,
            externalUrl=job_url,
        ),
        SubTypesClass(typeNames=[MLAssetSubTypes.VERTEX_TRAINING_JOB]),
        ContainerClass(container=source._get_project_container().as_urn()),
        DataPlatformInstanceClass(platform=str(DataPlatformUrn(source.platform))),
        DataProcessInstanceInputClass(
            inputs=[], inputEdges=[EdgeClass(destinationUrn=dataset_urn)]
        ),
    ]

    assert len(actual_mcps) == 6
    for aspect in expected_aspects:
        expected_mcp = MetadataChangeProposalWrapper(entityUrn=job_urn, aspect=aspect)
        assert expected_mcp in actual_mcps
|
2025-03-13 11:02:15 -07:00
|
|
|
|
|
|
|
|
|
|
|
def test_vertexai_config_init():
    """Verify VertexAIConfig field parsing and the dict from get_credentials()."""
    credential_data = {
        "private_key_id": "test-key-id",
        "private_key": "-----BEGIN PRIVATE KEY-----\ntest-private-key\n-----END PRIVATE KEY-----\n",
        "client_email": "test-email@test-project.iam.gserviceaccount.com",
        "client_id": "test-client-id",
        "auth_uri": "https://accounts.google.com/o/oauth2/auth",
        "token_uri": "https://oauth2.googleapis.com/token",
        "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
        "type": "service_account",
    }
    config_data = {
        "project_id": "test-project",
        "region": "us-central1",
        "bucket_uri": "gs://test-bucket",
        "vertexai_url": "https://console.cloud.google.com/vertex-ai",
        "credential": credential_data,
    }

    config = VertexAIConfig(**config_data)

    # Top-level settings are exposed unchanged.
    assert config.project_id == "test-project"
    assert config.region == "us-central1"
    assert config.bucket_uri == "gs://test-bucket"
    assert config.vertexai_url == "https://console.cloud.google.com/vertex-ai"

    # Credential attributes mirror the input values ("type" is not checked here).
    assert config.credential is not None
    checked_credential_attrs = (
        "private_key_id",
        "private_key",
        "client_email",
        "client_id",
        "auth_uri",
        "token_uri",
        "auth_provider_x509_cert_url",
    )
    expected_credential = {
        "private_key_id": "test-key-id",
        "private_key": "-----BEGIN PRIVATE KEY-----\ntest-private-key\n-----END PRIVATE KEY-----\n",
        "client_email": "test-email@test-project.iam.gserviceaccount.com",
        "client_id": "test-client-id",
        "auth_uri": "https://accounts.google.com/o/oauth2/auth",
        "token_uri": "https://oauth2.googleapis.com/token",
        "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
        "type": "service_account",
    }
    for attr_name in checked_credential_attrs:
        assert getattr(config.credential, attr_name) == expected_credential[attr_name]

    # The parsed credentials dict carries the project id plus every credential field.
    parsed_conf = config.get_credentials()
    assert parsed_conf is not None
    assert parsed_conf.get("project_id") == config_data["project_id"]
    assert "credential" in config_data
    for key, expected_value in expected_credential.items():
        assert parsed_conf.get(key) == expected_value
|
2025-03-13 11:02:15 -07:00
|
|
|
|
|
|
|
|
|
|
|
def test_get_input_dataset_mcps(source: VertexAISource) -> None:
    """Verify the dataset MCPs generated for a training job's input dataset."""
    mock_dataset = gen_mock_dataset()
    mock_job = gen_mock_training_custom_job()
    job_meta = TrainingJobMetadata(mock_job, input_dataset=mock_dataset)

    actual_mcps: List[MetadataChangeProposalWrapper] = list(
        source._get_input_dataset_mcps(job_meta)
    )

    assert job_meta.input_dataset is not None
    dataset_name = source._make_vertexai_dataset_name(
        entity_id=job_meta.input_dataset.name
    )
    dataset_urn = builder.make_dataset_urn(
        platform=source.platform,
        name=dataset_name,
        env=source.config.env,
    )

    dataset_properties = DatasetPropertiesClass(
        name=mock_dataset.display_name,
        description=mock_dataset.display_name,
        qualifiedName=mock_dataset.resource_name,
        customProperties={"resourceName": mock_dataset.resource_name},
        created=TimeStampClass(time=datetime_to_ts_millis(mock_dataset.create_time)),
    )
    expected_aspects = [
        dataset_properties,
        SubTypesClass(typeNames=[MLAssetSubTypes.VERTEX_DATASET]),
        ContainerClass(container=source._get_project_container().as_urn()),
        DataPlatformInstanceClass(platform=str(DataPlatformUrn(source.platform))),
    ]

    assert len(actual_mcps) == 4
    for aspect in expected_aspects:
        expected_mcp = MetadataChangeProposalWrapper(
            entityUrn=dataset_urn, aspect=aspect
        )
        assert expected_mcp in actual_mcps
|
|
|
|
|
|
|
|
|
|
|
|
@patch("google.cloud.aiplatform.Experiment.list")
def test_get_experiment_mcps(
    mock_list: List[Experiment], source: VertexAISource
) -> None:
    """Verify the container work units emitted for one mocked experiment."""
    mock_experiment = gen_mock_experiment()
    assert hasattr(mock_list, "return_value")  # this check is needed to get around lint
    mock_list.return_value = [mock_experiment]

    actual_wus: List[MetadataWorkUnit] = list(source._get_experiments_workunits())
    actual_mcps = [
        workunit.metadata
        for workunit in actual_wus
        if isinstance(workunit.metadata, MetadataChangeProposalWrapper)
    ]

    experiment_id = source._make_vertexai_experiment_id(mock_experiment.name)
    experiment_urn = ExperimentKey(
        platform=source.platform,
        id=experiment_id,
    ).as_urn()

    container_properties = ContainerPropertiesClass(
        name=mock_experiment.name,
        customProperties={
            "platform": source.platform,
            "id": experiment_id,
            "name": mock_experiment.name,
            "resourceName": mock_experiment.resource_name,
            "dashboardURL": mock_experiment.dashboard_url or "",
        },
        externalUrl=source._make_experiment_external_url(mock_experiment),
    )
    expected_aspects = [
        container_properties,
        SubTypesClass(typeNames=[MLAssetSubTypes.VERTEX_EXPERIMENT]),
        ContainerClass(container=source._get_project_container().as_urn()),
        DataPlatformInstanceClass(platform=str(DataPlatformUrn(source.platform))),
        StatusClass(removed=False),
    ]

    assert len(actual_wus) == 5
    assert len(actual_mcps) == 5
    for aspect in expected_aspects:
        expected_mcp = MetadataChangeProposalWrapper(
            entityUrn=experiment_urn, aspect=aspect
        )
        assert expected_mcp in actual_mcps
|
|
|
|
|
|
|
|
|
|
|
|
@patch("google.cloud.aiplatform.ExperimentRun.list")
def test_gen_experiment_run_mcps(
    mock_list: List[ExperimentRun], source: VertexAISource
) -> None:
    """Verify the MCPs emitted for one mocked run of one mocked experiment."""
    mock_exp = gen_mock_experiment()
    source.experiments = [mock_exp]
    mock_exp_run = gen_mock_experiment_run()
    assert hasattr(mock_list, "return_value")  # this check is needed to get around lint
    mock_list.return_value = [mock_exp_run]

    experiment_urn = ExperimentKey(
        platform=source.platform,
        id=source._make_vertexai_experiment_id(mock_exp.name),
    ).as_urn()
    run_urn = source._make_experiment_run_urn(mock_exp, mock_exp_run)

    actual_mcps: List[MetadataChangeProposalWrapper] = list(
        source._get_experiment_runs_mcps()
    )

    run_url = source._make_experiment_run_external_url(mock_exp, mock_exp_run)
    expected_aspects = [
        DataProcessInstancePropertiesClass(
            name=mock_exp_run.name,
            externalUrl=run_url,
            customProperties={"externalUrl": run_url},
            created=AuditStamp(time=0, actor="urn:li:corpuser:datahub"),
        ),
        MLTrainingRunProperties(
            id=f"{mock_exp.name}-{mock_exp_run.name}",
            externalUrl=run_url,
            hyperParams=[],
            trainingMetrics=[],
        ),
        SubTypesClass(typeNames=[MLAssetSubTypes.VERTEX_EXPERIMENT_RUN]),
        # Runs are contained by their experiment, not by the project container.
        ContainerClass(container=experiment_urn),
        DataPlatformInstanceClass(platform=str(DataPlatformUrn(source.platform))),
    ]

    assert len(actual_mcps) == 5
    for aspect in expected_aspects:
        expected_mcp = MetadataChangeProposalWrapper(entityUrn=run_urn, aspect=aspect)
        assert expected_mcp in actual_mcps
|
2025-03-13 11:02:15 -07:00
|
|
|
|
|
|
|
|
2025-04-10 22:54:17 -07:00
|
|
|
def test_get_pipeline_mcps(
    source: VertexAISource,
) -> None:
    """Verify the MCPs emitted for a mocked pipeline job, its task and the task run.

    ``PipelineJob.list`` is patched to return a single mock pipeline. Expected
    aspects are grouped by the entity they belong to (pipeline data flow, task
    data job, task-run data process instance) and each must appear among the
    emitted MCPs.
    """
    mock_pipeline = get_mock_pipeline_job()
    with patch("google.cloud.aiplatform.PipelineJob.list") as pipeline_list:
        pipeline_list.return_value = [mock_pipeline]
        # Drain the generator while the patch is active.
        actual_mcps = list(source._get_pipelines_mcps())

    # Assert Entity Urns
    expected_pipeline_urn = "urn:li:dataFlow:(vertexai,vertexai.acryl-poc.pipeline.mock_pipeline_job,PROD)"
    expected_task_urn = "urn:li:dataJob:(urn:li:dataFlow:(vertexai,vertexai.acryl-poc.pipeline.mock_pipeline_job,PROD),acryl-poc.pipeline_task.reverse)"
    dpi_urn = "urn:li:dataProcessInstance:acryl-poc.pipeline_task_run.reverse"

    task_external_url = "https://console.cloud.google.com/vertex-ai/pipelines/locations/us-west2/runs/mock_pipeline_job?project=acryl-poc"
    project_container_urn = source._get_project_container().as_urn()

    duration = timedelta(
        milliseconds=datetime_to_ts_millis(mock_pipeline.update_time)
        - datetime_to_ts_millis(mock_pipeline.create_time)
    )

    pipeline_aspects = [
        DataFlowInfoClass(
            env=source.config.env,
            name=mock_pipeline.name,
            customProperties={
                "resource_name": mock_pipeline.resource_name,
                "create_time": mock_pipeline.create_time.isoformat(),
                "update_time": mock_pipeline.update_time.isoformat(),
                "duration": source._format_pipeline_duration(duration),
                "location": mock_pipeline.location,
                "labels": ",".join(
                    [f"{k}:{v}" for k, v in mock_pipeline.labels.items()]
                ),
            },
            externalUrl=source._make_pipeline_external_url(mock_pipeline.name),
        ),
        StatusClass(removed=False),
        ContainerClass(container=project_container_urn),
        SubTypesClass(typeNames=[MLAssetSubTypes.VERTEX_PIPELINE]),
        GlobalTagsClass(tags=[]),
    ]

    task_aspects = [
        DataJobInputOutputClass(
            inputDatasets=[],
            outputDatasets=[],
            inputDatajobs=[
                "urn:li:dataJob:(urn:li:dataFlow:(vertexai,vertexai.acryl-poc.pipeline.mock_pipeline_job,PROD),acryl-poc.pipeline_task.concat)"
            ],
            fineGrainedLineages=[],
        ),
        DataJobInfoClass(
            name="reverse",
            customProperties={},
            type="COMMAND",
            externalUrl=task_external_url,
            env=source.config.env,
        ),
        ContainerClass(container=project_container_urn),
        SubTypesClass(typeNames=[MLAssetSubTypes.VERTEX_PIPELINE_TASK]),
        # Checked against the task URN: the earlier version reused the pipeline
        # URN here, which only re-checked the pipeline's Status aspect.
        StatusClass(removed=False),
        GlobalTagsClass(tags=[]),
    ]

    task_run_aspects = [
        DataProcessInstancePropertiesClass(
            name="reverse",
            externalUrl=task_external_url,
            customProperties={},
            created=AuditStamp(
                time=0,
                actor="urn:li:corpuser:datahub",
            ),
        ),
        ContainerClass(container=project_container_urn),
        SubTypesClass(typeNames=[MLAssetSubTypes.VERTEX_PIPELINE_TASK_RUN]),
        DataPlatformInstanceClass(platform=str(DataPlatformUrn(source.platform))),
        DataProcessInstanceRelationships(
            upstreamInstances=[], parentTemplate=expected_task_urn
        ),
    ]

    expected_by_urn = {
        expected_pipeline_urn: pipeline_aspects,
        expected_task_urn: task_aspects,
        dpi_urn: task_run_aspects,
    }

    assert len(actual_mcps) == 19
    for entity_urn, aspects in expected_by_urn.items():
        for aspect in aspects:
            expected_mcp = MetadataChangeProposalWrapper(
                entityUrn=entity_urn, aspect=aspect
            )
            assert expected_mcp in actual_mcps, (
                f"missing {type(aspect).__name__} MCP for {entity_urn}"
            )
|
|
|
|
|
|
|
|
|
2025-03-13 11:02:15 -07:00
|
|
|
def test_make_model_external_url(source: VertexAISource) -> None:
    """Verify the console URL built for a model from the source configuration."""
    mock_model = gen_mock_model()
    expected_url = (
        f"{source.config.vertexai_url}/models/locations/{source.config.region}/models/{mock_model.name}"
        f"?project={source.config.project_id}"
    )
    assert source._make_model_external_url(mock_model) == expected_url
|
|
|
|
|
|
|
|
|
|
|
|
def test_make_job_urn(source: VertexAISource) -> None:
    """Verify a training job URN is the data-process-instance URN of its job name."""
    mock_training_job = gen_mock_training_automl_job()
    job_name = source._make_vertexai_job_name(mock_training_job.name)
    expected_urn = f"{builder.make_data_process_instance_urn(job_name)}"
    assert source._make_training_job_urn(mock_training_job) == expected_urn
|