# datahub/metadata-ingestion/tests/unit/test_vertexai_source.py

import contextlib
from datetime import timedelta
from typing import List
from unittest.mock import patch
import pytest
from google.cloud.aiplatform import Experiment, ExperimentRun
import datahub.emitter.mce_builder as builder
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.mcp_builder import ExperimentKey
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.common.subtypes import MLAssetSubTypes
from datahub.ingestion.source.vertexai.vertexai import (
ModelMetadata,
TrainingJobMetadata,
VertexAIConfig,
VertexAISource,
)
from datahub.metadata.com.linkedin.pegasus2avro.common import AuditStamp
from datahub.metadata.com.linkedin.pegasus2avro.dataprocess import (
DataProcessInstanceRelationships,
)
from datahub.metadata.com.linkedin.pegasus2avro.ml.metadata import (
MLModelGroupProperties,
MLModelProperties,
MLTrainingRunProperties,
)
from datahub.metadata.schema_classes import (
ContainerClass,
ContainerPropertiesClass,
DataFlowInfoClass,
DataJobInfoClass,
DataJobInputOutputClass,
DataPlatformInstanceClass,
DataProcessInstanceInputClass,
DataProcessInstancePropertiesClass,
DatasetPropertiesClass,
EdgeClass,
GlobalTagsClass,
MLModelDeploymentPropertiesClass,
StatusClass,
SubTypesClass,
TimeStampClass,
VersionPropertiesClass,
VersionTagClass,
)
from datahub.metadata.urns import DataPlatformUrn
from datahub.utilities.time import datetime_to_ts_millis
from tests.integration.vertexai.mock_vertexai import (
gen_mock_dataset,
gen_mock_endpoint,
gen_mock_experiment,
gen_mock_experiment_run,
gen_mock_model,
gen_mock_model_version,
gen_mock_training_automl_job,
gen_mock_training_custom_job,
get_mock_pipeline_job,
)
PROJECT_ID = "acryl-poc"
REGION = "us-west2"
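
# PROJECT_ID and REGION feed the `source` fixture below; they also appear in the
# expected URNs and external URLs asserted throughout this module.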
@pytest.fixture
def source() -> VertexAISource:
return VertexAISource(
ctx=PipelineContext(run_id="vertexai-source-test"),
config=VertexAIConfig(project_id=PROJECT_ID, region=REGION),
)
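
# Patches Model.list to return a single mock model and checks that
# _get_ml_models_mcps emits the model group properties, container, subtype,
# and data platform instance aspects.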
def test_get_ml_model_mcps(source: VertexAISource) -> None:
mock_model = gen_mock_model()
with contextlib.ExitStack() as exit_stack:
mock = exit_stack.enter_context(patch("google.cloud.aiplatform.Model.list"))
mock.return_value = [mock_model]
        # Run _get_ml_models_mcps and collect the emitted MCPs
        actual_mcps = list(source._get_ml_models_mcps())
expected_urn = builder.make_ml_model_group_urn(
platform=source.platform,
group_name=source._make_vertexai_model_group_name(mock_model.name),
env=source.config.env,
)
mcp_mlgroup = MetadataChangeProposalWrapper(
entityUrn=expected_urn,
aspect=MLModelGroupProperties(
name=mock_model.display_name,
description=mock_model.description,
created=TimeStampClass(
time=datetime_to_ts_millis(mock_model.create_time),
actor="urn:li:corpuser:datahub",
),
lastModified=TimeStampClass(
time=datetime_to_ts_millis(mock_model.update_time),
actor="urn:li:corpuser:datahub",
),
externalUrl=source._make_model_external_url(mock_model),
),
)
mcp_container = MetadataChangeProposalWrapper(
entityUrn=expected_urn,
aspect=ContainerClass(container=source._get_project_container().as_urn()),
)
mcp_subtype = MetadataChangeProposalWrapper(
entityUrn=expected_urn,
aspect=SubTypesClass(typeNames=[MLAssetSubTypes.VERTEX_MODEL_GROUP]),
)
mcp_dataplatform = MetadataChangeProposalWrapper(
entityUrn=expected_urn,
aspect=DataPlatformInstanceClass(
platform=str(DataPlatformUrn(source.platform))
),
)
assert len(actual_mcps) == 4
assert any(mcp_container == mcp for mcp in actual_mcps)
assert any(mcp_subtype == mcp for mcp in actual_mcps)
assert any(mcp_mlgroup == mcp for mcp in actual_mcps)
assert any(mcp_dataplatform == mcp for mcp in actual_mcps)
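
# Builds ModelMetadata for a mock model version and checks the aspects emitted by
# _gen_ml_model_mcps, including the version properties aspect that is verified
# separately because of its system-generated GUID.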
def test_get_ml_model_properties_mcps(source: VertexAISource) -> None:
mock_model = gen_mock_model()
model_version = gen_mock_model_version(mock_model)
model_meta = ModelMetadata(mock_model, model_version)
# Run _gen_ml_model_mcps
actual_mcps = list(source._gen_ml_model_mcps(model_meta))
expected_urn = source._make_ml_model_urn(
model_version, source._make_vertexai_model_name(mock_model.name)
)
mcp_mlmodel = MetadataChangeProposalWrapper(
entityUrn=expected_urn,
aspect=MLModelProperties(
name=f"{mock_model.display_name}_{model_version.version_id}",
description=model_version.version_description,
created=TimeStampClass(
time=datetime_to_ts_millis(mock_model.create_time),
actor="urn:li:corpuser:datahub",
),
lastModified=TimeStampClass(
time=datetime_to_ts_millis(mock_model.update_time),
actor="urn:li:corpuser:datahub",
),
customProperties={
"versionId": model_version.version_id,
"resourceName": mock_model.resource_name,
},
externalUrl=source._make_model_version_external_url(mock_model),
version=VersionTagClass(
versionTag=model_version.version_id, metadataAttribution=None
),
groups=[source._make_ml_model_group_urn(mock_model)],
type=MLAssetSubTypes.VERTEX_MODEL,
deployments=[],
),
)
mcp_container = MetadataChangeProposalWrapper(
entityUrn=expected_urn,
aspect=ContainerClass(container=source._get_project_container().as_urn()),
)
mcp_subtype = MetadataChangeProposalWrapper(
entityUrn=expected_urn,
aspect=SubTypesClass(typeNames=[MLAssetSubTypes.VERTEX_MODEL]),
)
mcp_dataplatform = MetadataChangeProposalWrapper(
entityUrn=expected_urn,
aspect=DataPlatformInstanceClass(
platform=str(DataPlatformUrn(source.platform))
),
)
assert len(actual_mcps) == 5
assert any(mcp_container == mcp for mcp in actual_mcps)
assert any(mcp_subtype == mcp for mcp in actual_mcps)
assert any(mcp_mlmodel == mcp for mcp in actual_mcps)
assert any(mcp_dataplatform == mcp for mcp in actual_mcps)
    # The version properties aspect carries a system-generated GUID, so check it separately
version_tag_mcps = [
mcp for mcp in actual_mcps if isinstance(mcp.aspect, VersionPropertiesClass)
]
assert len(version_tag_mcps) == 1
version_tag_mcp = version_tag_mcps[0]
assert isinstance(version_tag_mcp.aspect, VersionPropertiesClass)
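
# Builds ModelMetadata with a mock endpoint and checks the deployment properties
# and container aspects emitted by _gen_endpoints_mcps.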
def test_get_endpoint_mcps(
source: VertexAISource,
) -> None:
mock_model = gen_mock_model()
model_version = gen_mock_model_version(mock_model)
mock_endpoint = gen_mock_endpoint()
model_meta = ModelMetadata(
model=mock_model, model_version=model_version, endpoints=[mock_endpoint]
)
    # Run _gen_endpoints_mcps
actual_mcps = list(source._gen_endpoints_mcps(model_meta))
expected_urn = builder.make_ml_model_deployment_urn(
platform=source.platform,
deployment_name=source._make_vertexai_endpoint_name(
entity_id=mock_endpoint.name
),
env=source.config.env,
)
mcp_endpoint = MetadataChangeProposalWrapper(
entityUrn=expected_urn,
aspect=MLModelDeploymentPropertiesClass(
customProperties={"displayName": mock_endpoint.display_name},
description=mock_model.description,
createdAt=int(datetime_to_ts_millis(mock_endpoint.create_time)),
version=VersionTagClass(
versionTag=model_version.version_id, metadataAttribution=None
),
),
)
mcp_container = MetadataChangeProposalWrapper(
entityUrn=expected_urn,
aspect=ContainerClass(container=source._get_project_container().as_urn()),
)
assert len(actual_mcps) == 2
assert any(mcp_endpoint == mcp for mcp in actual_mcps)
assert any(mcp_container == mcp for mcp in actual_mcps)
def test_get_training_jobs_mcps(
    source: VertexAISource,
) -> None:
    """
    Test retrieval of training job MCPs from Vertex AI.
    Mocks CustomJob.list to return a single custom job (all other training job
    list calls return nothing) and verifies the aspects of the emitted MCPs.
    """
mock_training_job = gen_mock_training_custom_job()
with contextlib.ExitStack() as exit_stack:
for func_to_mock in [
"google.cloud.aiplatform.init",
"google.cloud.aiplatform.CustomJob.list",
"google.cloud.aiplatform.CustomTrainingJob.list",
"google.cloud.aiplatform.CustomContainerTrainingJob.list",
"google.cloud.aiplatform.CustomPythonPackageTrainingJob.list",
"google.cloud.aiplatform.AutoMLTabularTrainingJob.list",
"google.cloud.aiplatform.AutoMLImageTrainingJob.list",
"google.cloud.aiplatform.AutoMLTextTrainingJob.list",
"google.cloud.aiplatform.AutoMLVideoTrainingJob.list",
"google.cloud.aiplatform.AutoMLForecastingTrainingJob.list",
]:
mock = exit_stack.enter_context(patch(func_to_mock))
if func_to_mock == "google.cloud.aiplatform.CustomJob.list":
mock.return_value = [mock_training_job]
else:
mock.return_value = []
"""
Test the retrieval of training jobs work units from Vertex AI.
This function mocks customJob and AutoMLTabularTrainingJob,
and verifies the properties of the work units
"""
# Run _get_training_jobs_mcps
        actual_mcps = list(source._get_training_jobs_mcps())
# Assert Entity Urns
expected_urn = builder.make_data_process_instance_urn(
source._make_vertexai_job_name(mock_training_job.name)
)
mcp_dpi = MetadataChangeProposalWrapper(
entityUrn=expected_urn,
aspect=DataProcessInstancePropertiesClass(
name=mock_training_job.display_name,
externalUrl=source._make_job_external_url(mock_training_job),
customProperties={"jobType": "CustomJob"},
created=AuditStamp(
time=datetime_to_ts_millis(mock_training_job.create_time),
actor="urn:li:corpuser:datahub",
),
),
)
mcp_ml_props = MetadataChangeProposalWrapper(
entityUrn=expected_urn,
aspect=MLTrainingRunProperties(
id=mock_training_job.name,
externalUrl=source._make_job_external_url(mock_training_job),
),
)
mcp_container = MetadataChangeProposalWrapper(
entityUrn=expected_urn,
aspect=ContainerClass(container=source._get_project_container().as_urn()),
)
mcp_subtype = MetadataChangeProposalWrapper(
entityUrn=expected_urn,
aspect=SubTypesClass(typeNames=[MLAssetSubTypes.VERTEX_TRAINING_JOB]),
)
mcp_dataplatform = MetadataChangeProposalWrapper(
entityUrn=expected_urn,
aspect=DataPlatformInstanceClass(
platform=str(DataPlatformUrn(source.platform))
),
)
assert len(actual_mcps) == 5
assert any(mcp_dpi == mcp for mcp in actual_mcps)
assert any(mcp_ml_props == mcp for mcp in actual_mcps)
assert any(mcp_subtype == mcp for mcp in actual_mcps)
assert any(mcp_container == mcp for mcp in actual_mcps)
assert any(mcp_dataplatform == mcp for mcp in actual_mcps)
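
# Unlike the test above, this one builds TrainingJobMetadata directly (no list-call
# patching) with an input dataset, and additionally checks the
# DataProcessInstanceInput edge to the dataset URN.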
def test_gen_training_job_mcps(source: VertexAISource) -> None:
mock_training_job = gen_mock_training_custom_job()
mock_dataset = gen_mock_dataset()
job_meta = TrainingJobMetadata(mock_training_job, input_dataset=mock_dataset)
    actual_mcps = list(source._gen_training_job_mcps(job_meta))
dataset_name = source._make_vertexai_dataset_name(entity_id=mock_dataset.name)
dataset_urn = builder.make_dataset_urn(
platform=source.platform,
name=dataset_name,
env=source.config.env,
)
# Assert Entity Urns
expected_urn = builder.make_data_process_instance_urn(
source._make_vertexai_job_name(mock_training_job.name)
)
mcp_dpi = MetadataChangeProposalWrapper(
entityUrn=expected_urn,
aspect=DataProcessInstancePropertiesClass(
name=mock_training_job.display_name,
externalUrl=source._make_job_external_url(mock_training_job),
customProperties={"jobType": "CustomJob"},
created=AuditStamp(
time=datetime_to_ts_millis(mock_training_job.create_time),
actor="urn:li:corpuser:datahub",
),
),
)
mcp_ml_props = MetadataChangeProposalWrapper(
entityUrn=expected_urn,
aspect=MLTrainingRunProperties(
id=mock_training_job.name,
externalUrl=source._make_job_external_url(mock_training_job),
),
)
mcp_container = MetadataChangeProposalWrapper(
entityUrn=expected_urn,
aspect=ContainerClass(container=source._get_project_container().as_urn()),
)
mcp_subtype = MetadataChangeProposalWrapper(
entityUrn=expected_urn,
aspect=SubTypesClass(typeNames=[MLAssetSubTypes.VERTEX_TRAINING_JOB]),
)
mcp_dataplatform = MetadataChangeProposalWrapper(
entityUrn=expected_urn,
aspect=DataPlatformInstanceClass(
platform=str(DataPlatformUrn(source.platform))
),
)
mcp_dpi_input = MetadataChangeProposalWrapper(
entityUrn=expected_urn,
aspect=DataProcessInstanceInputClass(
inputs=[], inputEdges=[EdgeClass(destinationUrn=dataset_urn)]
),
)
assert len(actual_mcps) == 6
assert any(mcp_dpi == mcp for mcp in actual_mcps)
assert any(mcp_ml_props == mcp for mcp in actual_mcps)
assert any(mcp_subtype == mcp for mcp in actual_mcps)
assert any(mcp_container == mcp for mcp in actual_mcps)
assert any(mcp_dataplatform == mcp for mcp in actual_mcps)
assert any(mcp_dpi_input == mcp for mcp in actual_mcps)
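
# Verifies that VertexAIConfig parses service-account credentials and that
# get_credentials() round-trips the same values.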
def test_vertexai_config_init() -> None:
config_data = {
"project_id": "test-project",
"region": "us-central1",
"bucket_uri": "gs://test-bucket",
"vertexai_url": "https://console.cloud.google.com/vertex-ai",
"credential": {
"private_key_id": "test-key-id",
"private_key": "-----BEGIN PRIVATE KEY-----\ntest-private-key\n-----END PRIVATE KEY-----\n",
"client_email": "test-email@test-project.iam.gserviceaccount.com",
"client_id": "test-client-id",
"auth_uri": "https://accounts.google.com/o/oauth2/auth",
"token_uri": "https://oauth2.googleapis.com/token",
"auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
"type": "service_account",
},
}
config = VertexAIConfig(**config_data)
assert config.project_id == "test-project"
assert config.region == "us-central1"
assert config.bucket_uri == "gs://test-bucket"
assert config.vertexai_url == "https://console.cloud.google.com/vertex-ai"
assert config.credential is not None
assert config.credential.private_key_id == "test-key-id"
assert (
config.credential.private_key
== "-----BEGIN PRIVATE KEY-----\ntest-private-key\n-----END PRIVATE KEY-----\n"
)
assert (
config.credential.client_email
== "test-email@test-project.iam.gserviceaccount.com"
)
assert config.credential.client_id == "test-client-id"
assert config.credential.auth_uri == "https://accounts.google.com/o/oauth2/auth"
assert config.credential.token_uri == "https://oauth2.googleapis.com/token"
assert (
config.credential.auth_provider_x509_cert_url
== "https://www.googleapis.com/oauth2/v1/certs"
)
parsed_conf = config.get_credentials()
assert parsed_conf is not None
assert parsed_conf.get("project_id") == config_data["project_id"]
assert "credential" in config_data
assert parsed_conf.get("private_key_id", "") == "test-key-id"
assert (
parsed_conf.get("private_key", "")
== "-----BEGIN PRIVATE KEY-----\ntest-private-key\n-----END PRIVATE KEY-----\n"
)
assert (
parsed_conf.get("client_email")
== "test-email@test-project.iam.gserviceaccount.com"
)
assert parsed_conf.get("client_id") == "test-client-id"
assert parsed_conf.get("auth_uri") == "https://accounts.google.com/o/oauth2/auth"
assert parsed_conf.get("token_uri") == "https://oauth2.googleapis.com/token"
assert (
parsed_conf.get("auth_provider_x509_cert_url")
== "https://www.googleapis.com/oauth2/v1/certs"
)
assert parsed_conf.get("type") == "service_account"
def test_get_input_dataset_mcps(source: VertexAISource) -> None:
mock_dataset = gen_mock_dataset()
mock_job = gen_mock_training_custom_job()
job_meta = TrainingJobMetadata(mock_job, input_dataset=mock_dataset)
actual_mcps: List[MetadataChangeProposalWrapper] = list(
source._get_input_dataset_mcps(job_meta)
)
assert job_meta.input_dataset is not None
expected_urn = builder.make_dataset_urn(
platform=source.platform,
name=source._make_vertexai_dataset_name(entity_id=job_meta.input_dataset.name),
env=source.config.env,
)
mcp_ds = MetadataChangeProposalWrapper(
entityUrn=expected_urn,
aspect=DatasetPropertiesClass(
name=mock_dataset.display_name,
description=mock_dataset.display_name,
qualifiedName=mock_dataset.resource_name,
customProperties={"resourceName": mock_dataset.resource_name},
created=TimeStampClass(
time=datetime_to_ts_millis(mock_dataset.create_time)
),
),
)
mcp_container = MetadataChangeProposalWrapper(
entityUrn=expected_urn,
aspect=ContainerClass(container=source._get_project_container().as_urn()),
)
mcp_subtype = MetadataChangeProposalWrapper(
entityUrn=expected_urn,
aspect=SubTypesClass(typeNames=[MLAssetSubTypes.VERTEX_DATASET]),
)
mcp_dataplatform = MetadataChangeProposalWrapper(
entityUrn=expected_urn,
aspect=DataPlatformInstanceClass(
platform=str(DataPlatformUrn(source.platform))
),
)
assert len(actual_mcps) == 4
assert any(mcp_ds == mcp for mcp in actual_mcps)
assert any(mcp_subtype == mcp for mcp in actual_mcps)
assert any(mcp_container == mcp for mcp in actual_mcps)
assert any(mcp_dataplatform == mcp for mcp in actual_mcps)
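
# Patches Experiment.list and checks the container-style aspects emitted for an
# experiment: container properties, subtype, container, data platform instance,
# and status.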
@patch("google.cloud.aiplatform.Experiment.list")
def test_get_experiment_mcps(
mock_list: List[Experiment], source: VertexAISource
) -> None:
mock_experiment = gen_mock_experiment()
    # mock_list is a MagicMock despite the List[Experiment] annotation; this check keeps lint happy
    assert hasattr(mock_list, "return_value")
mock_list.return_value = [mock_experiment]
actual_wus: List[MetadataWorkUnit] = list(source._get_experiments_workunits())
actual_mcps = [
mcp.metadata
for mcp in actual_wus
if isinstance(mcp.metadata, MetadataChangeProposalWrapper)
]
expected_urn = ExperimentKey(
platform=source.platform,
id=source._make_vertexai_experiment_id(mock_experiment.name),
).as_urn()
mcp_container = MetadataChangeProposalWrapper(
entityUrn=expected_urn,
aspect=ContainerClass(container=source._get_project_container().as_urn()),
)
mcp_subtype = MetadataChangeProposalWrapper(
entityUrn=expected_urn,
aspect=SubTypesClass(typeNames=[MLAssetSubTypes.VERTEX_EXPERIMENT]),
)
mcp_dataplatform = MetadataChangeProposalWrapper(
entityUrn=expected_urn,
aspect=DataPlatformInstanceClass(
platform=str(DataPlatformUrn(source.platform))
),
)
mcp_container_props = MetadataChangeProposalWrapper(
entityUrn=expected_urn,
aspect=ContainerPropertiesClass(
name=mock_experiment.name,
customProperties={
"platform": source.platform,
"id": source._make_vertexai_experiment_id(mock_experiment.name),
"name": mock_experiment.name,
"resourceName": mock_experiment.resource_name,
"dashboardURL": mock_experiment.dashboard_url
if mock_experiment.dashboard_url
else "",
},
externalUrl=source._make_experiment_external_url(mock_experiment),
),
)
mcp_status = MetadataChangeProposalWrapper(
entityUrn=expected_urn, aspect=StatusClass(removed=False)
)
assert len(actual_wus) == 5
assert len(actual_mcps) == 5
assert any(mcp_container_props == mcp for mcp in actual_mcps)
assert any(mcp_subtype == mcp for mcp in actual_mcps)
assert any(mcp_container == mcp for mcp in actual_mcps)
assert any(mcp_dataplatform == mcp for mcp in actual_mcps)
assert any(mcp_status == mcp for mcp in actual_mcps)
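
# Patches ExperimentRun.list and checks the run-level aspects emitted by
# _get_experiment_runs_mcps, including the container link back to the experiment.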
@patch("google.cloud.aiplatform.ExperimentRun.list")
def test_gen_experiment_run_mcps(
mock_list: List[ExperimentRun], source: VertexAISource
) -> None:
mock_exp = gen_mock_experiment()
source.experiments = [mock_exp]
mock_exp_run = gen_mock_experiment_run()
    # mock_list is a MagicMock despite the List[ExperimentRun] annotation; this check keeps lint happy
    assert hasattr(mock_list, "return_value")
mock_list.return_value = [mock_exp_run]
expected_exp_urn = ExperimentKey(
platform=source.platform,
id=source._make_vertexai_experiment_id(mock_exp.name),
).as_urn()
expected_urn = source._make_experiment_run_urn(mock_exp, mock_exp_run)
actual_mcps: List[MetadataChangeProposalWrapper] = list(
source._get_experiment_runs_mcps()
)
mcp_dpi = MetadataChangeProposalWrapper(
entityUrn=expected_urn,
aspect=DataProcessInstancePropertiesClass(
name=mock_exp_run.name,
externalUrl=source._make_experiment_run_external_url(
mock_exp, mock_exp_run
),
customProperties={
"externalUrl": source._make_experiment_run_external_url(
mock_exp, mock_exp_run
)
},
created=AuditStamp(time=0, actor="urn:li:corpuser:datahub"),
),
)
mcp_ml_props = MetadataChangeProposalWrapper(
entityUrn=expected_urn,
aspect=MLTrainingRunProperties(
id=f"{mock_exp.name}-{mock_exp_run.name}",
externalUrl=source._make_experiment_run_external_url(
mock_exp, mock_exp_run
),
hyperParams=[],
trainingMetrics=[],
),
)
mcp_container = MetadataChangeProposalWrapper(
entityUrn=expected_urn,
aspect=ContainerClass(container=expected_exp_urn),
)
mcp_subtype = MetadataChangeProposalWrapper(
entityUrn=expected_urn,
aspect=SubTypesClass(typeNames=[MLAssetSubTypes.VERTEX_EXPERIMENT_RUN]),
)
mcp_dataplatform = MetadataChangeProposalWrapper(
entityUrn=expected_urn,
aspect=DataPlatformInstanceClass(
platform=str(DataPlatformUrn(source.platform))
),
)
assert len(actual_mcps) == 5
assert any(mcp_dpi == mcp for mcp in actual_mcps)
assert any(mcp_ml_props == mcp for mcp in actual_mcps)
assert any(mcp_subtype == mcp for mcp in actual_mcps)
assert any(mcp_container == mcp for mcp in actual_mcps)
assert any(mcp_dataplatform == mcp for mcp in actual_mcps)
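
# Patches PipelineJob.list and checks the dataflow aspects for the pipeline, the
# datajob aspects for its task, and the data process instance aspects for the
# task run.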
def test_get_pipeline_mcps(
source: VertexAISource,
) -> None:
mock_pipeline = get_mock_pipeline_job()
with contextlib.ExitStack() as exit_stack:
mock = exit_stack.enter_context(
patch("google.cloud.aiplatform.PipelineJob.list")
)
mock.return_value = [mock_pipeline]
        actual_mcps = list(source._get_pipelines_mcps())
# Assert Entity Urns
expected_pipeline_urn = "urn:li:dataFlow:(vertexai,vertexai.acryl-poc.pipeline.mock_pipeline_job,PROD)"
expected_task_urn = "urn:li:dataJob:(urn:li:dataFlow:(vertexai,vertexai.acryl-poc.pipeline.mock_pipeline_job,PROD),acryl-poc.pipeline_task.reverse)"
duration = timedelta(
milliseconds=datetime_to_ts_millis(mock_pipeline.update_time)
- datetime_to_ts_millis(mock_pipeline.create_time)
)
mcp_pipe_df_info = MetadataChangeProposalWrapper(
entityUrn=expected_pipeline_urn,
aspect=DataFlowInfoClass(
env=source.config.env,
name=mock_pipeline.name,
customProperties={
"resource_name": mock_pipeline.resource_name,
"create_time": mock_pipeline.create_time.isoformat(),
"update_time": mock_pipeline.update_time.isoformat(),
"duration": source._format_pipeline_duration(duration),
"location": mock_pipeline.location,
"labels": ",".join(
[f"{k}:{v}" for k, v in mock_pipeline.labels.items()]
),
},
externalUrl=source._make_pipeline_external_url(mock_pipeline.name),
),
)
mcp_pipe_df_status = MetadataChangeProposalWrapper(
entityUrn=expected_pipeline_urn,
aspect=StatusClass(removed=False),
)
mcp_pipe_container = MetadataChangeProposalWrapper(
entityUrn=expected_pipeline_urn,
aspect=ContainerClass(container=source._get_project_container().as_urn()),
)
mcp_pipe_subtype = MetadataChangeProposalWrapper(
entityUrn=expected_pipeline_urn,
aspect=SubTypesClass(typeNames=[MLAssetSubTypes.VERTEX_PIPELINE]),
)
mcp_pipeline_tag = MetadataChangeProposalWrapper(
entityUrn=expected_pipeline_urn,
aspect=GlobalTagsClass(tags=[]),
)
mcp_task_input = MetadataChangeProposalWrapper(
entityUrn=expected_task_urn,
aspect=DataJobInputOutputClass(
inputDatasets=[],
outputDatasets=[],
inputDatajobs=[
"urn:li:dataJob:(urn:li:dataFlow:(vertexai,vertexai.acryl-poc.pipeline.mock_pipeline_job,PROD),acryl-poc.pipeline_task.concat)"
],
fineGrainedLineages=[],
),
)
mcp_task_info = MetadataChangeProposalWrapper(
entityUrn=expected_task_urn,
aspect=DataJobInfoClass(
name="reverse",
customProperties={},
type="COMMAND",
externalUrl="https://console.cloud.google.com/vertex-ai/pipelines/locations/us-west2/runs/mock_pipeline_job?project=acryl-poc",
env=source.config.env,
),
)
mcp_task_container = MetadataChangeProposalWrapper(
entityUrn=expected_task_urn,
aspect=ContainerClass(container=source._get_project_container().as_urn()),
)
mcp_task_subtype = MetadataChangeProposalWrapper(
entityUrn=expected_task_urn,
aspect=SubTypesClass(typeNames=[MLAssetSubTypes.VERTEX_PIPELINE_TASK]),
)
mcp_task_status = MetadataChangeProposalWrapper(
entityUrn=expected_pipeline_urn,
aspect=StatusClass(removed=False),
)
mcp_task_tag = MetadataChangeProposalWrapper(
entityUrn=expected_task_urn,
aspect=GlobalTagsClass(tags=[]),
)
dpi_urn = "urn:li:dataProcessInstance:acryl-poc.pipeline_task_run.reverse"
mcp_task_run_dpi = MetadataChangeProposalWrapper(
entityUrn=dpi_urn,
aspect=DataProcessInstancePropertiesClass(
name="reverse",
externalUrl="https://console.cloud.google.com/vertex-ai/pipelines/locations/us-west2/runs/mock_pipeline_job?project=acryl-poc",
customProperties={},
created=AuditStamp(
time=0,
actor="urn:li:corpuser:datahub",
),
),
)
mcp_task_run_container = MetadataChangeProposalWrapper(
entityUrn=dpi_urn,
aspect=ContainerClass(container=source._get_project_container().as_urn()),
)
mcp_task_run_subtype = MetadataChangeProposalWrapper(
entityUrn=dpi_urn,
aspect=SubTypesClass(typeNames=[MLAssetSubTypes.VERTEX_PIPELINE_TASK_RUN]),
)
mcp_task_run_dataplatform = MetadataChangeProposalWrapper(
entityUrn=dpi_urn,
aspect=DataPlatformInstanceClass(
platform=str(DataPlatformUrn(source.platform))
),
)
mcp_task_run_relationship = MetadataChangeProposalWrapper(
entityUrn=dpi_urn,
aspect=DataProcessInstanceRelationships(
upstreamInstances=[], parentTemplate=expected_task_urn
),
)
assert len(actual_mcps) == 19
assert any(mcp_pipe_df_info == mcp for mcp in actual_mcps)
assert any(mcp_pipe_df_status == mcp for mcp in actual_mcps)
assert any(mcp_pipe_subtype == mcp for mcp in actual_mcps)
assert any(mcp_pipe_container == mcp for mcp in actual_mcps)
assert any(mcp_pipeline_tag == mcp for mcp in actual_mcps)
assert any(mcp_task_input == mcp for mcp in actual_mcps)
assert any(mcp_task_info == mcp for mcp in actual_mcps)
assert any(mcp_task_container == mcp for mcp in actual_mcps)
assert any(mcp_task_subtype == mcp for mcp in actual_mcps)
assert any(mcp_task_status == mcp for mcp in actual_mcps)
assert any(mcp_task_tag == mcp for mcp in actual_mcps)
assert any(mcp_task_run_dpi == mcp for mcp in actual_mcps)
assert any(mcp_task_run_container == mcp for mcp in actual_mcps)
assert any(mcp_task_run_subtype == mcp for mcp in actual_mcps)
assert any(mcp_task_run_dataplatform == mcp for mcp in actual_mcps)
assert any(mcp_task_run_relationship == mcp for mcp in actual_mcps)
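
# Checks the Vertex AI console URL built for a model.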
def test_make_model_external_url(source: VertexAISource) -> None:
mock_model = gen_mock_model()
assert (
source._make_model_external_url(mock_model)
== f"{source.config.vertexai_url}/models/locations/{source.config.region}/models/{mock_model.name}"
f"?project={source.config.project_id}"
)
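
# Checks that a training job URN matches the dataProcessInstance URN built from
# the job name.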
def test_make_job_urn(source: VertexAISource) -> None:
mock_training_job = gen_mock_training_automl_job()
    assert source._make_training_job_urn(
        mock_training_job
    ) == builder.make_data_process_instance_urn(
        source._make_vertexai_job_name(mock_training_job.name)
    )