# datahub/metadata-ingestion/tests/unit/test_vertexai_source.py

import contextlib
from datetime import timedelta
from typing import List
from unittest.mock import patch

import pytest
from google.cloud.aiplatform import Experiment, ExperimentRun

import datahub.emitter.mce_builder as builder
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.mcp_builder import ExperimentKey
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.common.subtypes import MLAssetSubTypes
from datahub.ingestion.source.vertexai.vertexai import (
    ModelMetadata,
    TrainingJobMetadata,
    VertexAIConfig,
    VertexAISource,
)
from datahub.metadata.com.linkedin.pegasus2avro.common import AuditStamp
from datahub.metadata.com.linkedin.pegasus2avro.dataprocess import (
    DataProcessInstanceRelationships,
)
from datahub.metadata.com.linkedin.pegasus2avro.ml.metadata import (
    MLModelGroupProperties,
    MLModelProperties,
    MLTrainingRunProperties,
)
from datahub.metadata.schema_classes import (
    ContainerClass,
    ContainerPropertiesClass,
    DataFlowInfoClass,
    DataJobInfoClass,
    DataJobInputOutputClass,
    DataPlatformInstanceClass,
    DataProcessInstanceInputClass,
    DataProcessInstancePropertiesClass,
    DatasetPropertiesClass,
    EdgeClass,
    GlobalTagsClass,
    MLModelDeploymentPropertiesClass,
    StatusClass,
    SubTypesClass,
    TimeStampClass,
    VersionPropertiesClass,
    VersionTagClass,
)
from datahub.metadata.urns import DataPlatformUrn
from datahub.utilities.time import datetime_to_ts_millis
from tests.integration.vertexai.mock_vertexai import (
    gen_mock_dataset,
    gen_mock_endpoint,
    gen_mock_experiment,
    gen_mock_experiment_run,
    gen_mock_model,
    gen_mock_model_version,
    gen_mock_training_automl_job,
    gen_mock_training_custom_job,
    get_mock_pipeline_job,
)

PROJECT_ID = "acryl-poc"
REGION = "us-west2"


@pytest.fixture
def source() -> VertexAISource:
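    """VertexAISource configured against a test PipelineContext and a minimal project/region config."""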
    return VertexAISource(
        ctx=PipelineContext(run_id="vertexai-source-test"),
        config=VertexAIConfig(project_id=PROJECT_ID, region=REGION),
    )


def test_get_ml_model_mcps(source: VertexAISource) -> None:
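    """_get_ml_models_mcps emits group properties, container, subtype, and data platform aspects."""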
    mock_model = gen_mock_model()
    with contextlib.ExitStack() as exit_stack:
        mock = exit_stack.enter_context(patch("google.cloud.aiplatform.Model.list"))
        mock.return_value = [mock_model]

        # Running _get_ml_models_mcps
        actual_mcps = [mcp for mcp in source._get_ml_models_mcps()]

        expected_urn = builder.make_ml_model_group_urn(
            platform=source.platform,
            group_name=source._make_vertexai_model_group_name(mock_model.name),
            env=source.config.env,
        )
        mcp_mlgroup = MetadataChangeProposalWrapper(
            entityUrn=expected_urn,
            aspect=MLModelGroupProperties(
                name=mock_model.display_name,
                description=mock_model.description,
                created=TimeStampClass(
                    time=datetime_to_ts_millis(mock_model.create_time),
                    actor="urn:li:corpuser:datahub",
                ),
                lastModified=TimeStampClass(
                    time=datetime_to_ts_millis(mock_model.update_time),
                    actor="urn:li:corpuser:datahub",
                ),
                externalUrl=source._make_model_external_url(mock_model),
            ),
        )
        mcp_container = MetadataChangeProposalWrapper(
            entityUrn=expected_urn,
            aspect=ContainerClass(container=source._get_project_container().as_urn()),
        )
        mcp_subtype = MetadataChangeProposalWrapper(
            entityUrn=expected_urn,
            aspect=SubTypesClass(typeNames=[MLAssetSubTypes.VERTEX_MODEL_GROUP]),
        )
        mcp_dataplatform = MetadataChangeProposalWrapper(
            entityUrn=expected_urn,
            aspect=DataPlatformInstanceClass(
                platform=str(DataPlatformUrn(source.platform))
            ),
        )

        assert len(actual_mcps) == 4
        assert any(mcp_container == mcp for mcp in actual_mcps)
        assert any(mcp_subtype == mcp for mcp in actual_mcps)
        assert any(mcp_mlgroup == mcp for mcp in actual_mcps)
        assert any(mcp_dataplatform == mcp for mcp in actual_mcps)


def test_get_ml_model_properties_mcps(source: VertexAISource) -> None:
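    """_gen_ml_model_mcps emits model properties, container, subtype, platform, and version aspects."""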
    mock_model = gen_mock_model()
    model_version = gen_mock_model_version(mock_model)
    model_meta = ModelMetadata(mock_model, model_version)

    # Run _gen_ml_model_mcps
    actual_mcps = list(source._gen_ml_model_mcps(model_meta))

    expected_urn = source._make_ml_model_urn(
        model_version, source._make_vertexai_model_name(mock_model.name)
    )
    mcp_mlmodel = MetadataChangeProposalWrapper(
        entityUrn=expected_urn,
        aspect=MLModelProperties(
            name=f"{mock_model.display_name}_{model_version.version_id}",
            description=model_version.version_description,
            created=TimeStampClass(
                time=datetime_to_ts_millis(mock_model.create_time),
                actor="urn:li:corpuser:datahub",
            ),
            lastModified=TimeStampClass(
                time=datetime_to_ts_millis(mock_model.update_time),
                actor="urn:li:corpuser:datahub",
            ),
            customProperties={
                "versionId": model_version.version_id,
                "resourceName": mock_model.resource_name,
            },
            externalUrl=source._make_model_version_external_url(mock_model),
            version=VersionTagClass(
                versionTag=model_version.version_id, metadataAttribution=None
            ),
            groups=[source._make_ml_model_group_urn(mock_model)],
            type=MLAssetSubTypes.VERTEX_MODEL,
            deployments=[],
        ),
    )
    mcp_container = MetadataChangeProposalWrapper(
        entityUrn=expected_urn,
        aspect=ContainerClass(container=source._get_project_container().as_urn()),
    )
    mcp_subtype = MetadataChangeProposalWrapper(
        entityUrn=expected_urn,
        aspect=SubTypesClass(typeNames=[MLAssetSubTypes.VERTEX_MODEL]),
    )
    mcp_dataplatform = MetadataChangeProposalWrapper(
        entityUrn=expected_urn,
        aspect=DataPlatformInstanceClass(
            platform=str(DataPlatformUrn(source.platform))
        ),
    )

    assert len(actual_mcps) == 5
    assert any(mcp_container == mcp for mcp in actual_mcps)
    assert any(mcp_subtype == mcp for mcp in actual_mcps)
    assert any(mcp_mlmodel == mcp for mcp in actual_mcps)
    assert any(mcp_dataplatform == mcp for mcp in actual_mcps)

    # Version Tag has GUID generated by the system, so we need to check it separately
    version_tag_mcps = [
        mcp for mcp in actual_mcps if isinstance(mcp.aspect, VersionPropertiesClass)
    ]
    assert len(version_tag_mcps) == 1
    version_tag_mcp = version_tag_mcps[0]
    assert isinstance(version_tag_mcp.aspect, VersionPropertiesClass)


def test_get_endpoint_mcps(
    source: VertexAISource,
) -> None:
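    """_gen_endpoints_mcps emits deployment properties and container aspects for a model endpoint."""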
    mock_model = gen_mock_model()
    model_version = gen_mock_model_version(mock_model)
    mock_endpoint = gen_mock_endpoint()
    model_meta = ModelMetadata(
        model=mock_model, model_version=model_version, endpoints=[mock_endpoint]
    )

    # Run _gen_endpoints_mcps
    actual_mcps = list(source._gen_endpoints_mcps(model_meta))

    expected_urn = builder.make_ml_model_deployment_urn(
        platform=source.platform,
        deployment_name=source._make_vertexai_endpoint_name(
            entity_id=mock_endpoint.name
        ),
        env=source.config.env,
    )
    mcp_endpoint = MetadataChangeProposalWrapper(
        entityUrn=expected_urn,
        aspect=MLModelDeploymentPropertiesClass(
            customProperties={"displayName": mock_endpoint.display_name},
            description=mock_model.description,
            createdAt=int(datetime_to_ts_millis(mock_endpoint.create_time)),
            version=VersionTagClass(
                versionTag=model_version.version_id, metadataAttribution=None
            ),
        ),
    )
    mcp_container = MetadataChangeProposalWrapper(
        entityUrn=expected_urn,
        aspect=ContainerClass(container=source._get_project_container().as_urn()),
    )

    assert len(actual_mcps) == 2
    assert any(mcp_endpoint == mcp for mcp in actual_mcps)
    assert any(mcp_container == mcp for mcp in actual_mcps)


def test_get_training_jobs_mcps(
    source: VertexAISource,
) -> None:
    """
    Test the retrieval of training job MCPs from Vertex AI.
    This test mocks CustomJob (and the other training job list calls)
    and verifies the aspects of the emitted MCPs.
    """
    mock_training_job = gen_mock_training_custom_job()
    with contextlib.ExitStack() as exit_stack:
        for func_to_mock in [
            "google.cloud.aiplatform.init",
            "google.cloud.aiplatform.CustomJob.list",
            "google.cloud.aiplatform.CustomTrainingJob.list",
            "google.cloud.aiplatform.CustomContainerTrainingJob.list",
            "google.cloud.aiplatform.CustomPythonPackageTrainingJob.list",
            "google.cloud.aiplatform.AutoMLTabularTrainingJob.list",
            "google.cloud.aiplatform.AutoMLImageTrainingJob.list",
            "google.cloud.aiplatform.AutoMLTextTrainingJob.list",
            "google.cloud.aiplatform.AutoMLVideoTrainingJob.list",
            "google.cloud.aiplatform.AutoMLForecastingTrainingJob.list",
        ]:
            mock = exit_stack.enter_context(patch(func_to_mock))
            if func_to_mock == "google.cloud.aiplatform.CustomJob.list":
                mock.return_value = [mock_training_job]
            else:
                mock.return_value = []

        # Run _get_training_jobs_mcps
        actual_mcps = [mcp for mcp in source._get_training_jobs_mcps()]

        # Assert Entity Urns
        expected_urn = builder.make_data_process_instance_urn(
            source._make_vertexai_job_name(mock_training_job.name)
        )
        mcp_dpi = MetadataChangeProposalWrapper(
            entityUrn=expected_urn,
            aspect=DataProcessInstancePropertiesClass(
                name=mock_training_job.display_name,
                externalUrl=source._make_job_external_url(mock_training_job),
                customProperties={"jobType": "CustomJob"},
                created=AuditStamp(
                    time=datetime_to_ts_millis(mock_training_job.create_time),
                    actor="urn:li:corpuser:datahub",
                ),
            ),
        )
        mcp_ml_props = MetadataChangeProposalWrapper(
            entityUrn=expected_urn,
            aspect=MLTrainingRunProperties(
                id=mock_training_job.name,
                externalUrl=source._make_job_external_url(mock_training_job),
            ),
        )
        mcp_container = MetadataChangeProposalWrapper(
            entityUrn=expected_urn,
            aspect=ContainerClass(container=source._get_project_container().as_urn()),
        )
        mcp_subtype = MetadataChangeProposalWrapper(
            entityUrn=expected_urn,
            aspect=SubTypesClass(typeNames=[MLAssetSubTypes.VERTEX_TRAINING_JOB]),
        )
        mcp_dataplatform = MetadataChangeProposalWrapper(
            entityUrn=expected_urn,
            aspect=DataPlatformInstanceClass(
                platform=str(DataPlatformUrn(source.platform))
            ),
        )

        assert len(actual_mcps) == 5
        assert any(mcp_dpi == mcp for mcp in actual_mcps)
        assert any(mcp_ml_props == mcp for mcp in actual_mcps)
        assert any(mcp_subtype == mcp for mcp in actual_mcps)
        assert any(mcp_container == mcp for mcp in actual_mcps)
        assert any(mcp_dataplatform == mcp for mcp in actual_mcps)


def test_gen_training_job_mcps(source: VertexAISource) -> None:
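    """_gen_training_job_mcps emits training job aspects plus an input edge to the source dataset."""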
    mock_training_job = gen_mock_training_custom_job()
    mock_dataset = gen_mock_dataset()
    job_meta = TrainingJobMetadata(mock_training_job, input_dataset=mock_dataset)

    actual_mcps = [mcp for mcp in source._gen_training_job_mcps(job_meta)]

    dataset_name = source._make_vertexai_dataset_name(entity_id=mock_dataset.name)
    dataset_urn = builder.make_dataset_urn(
        platform=source.platform,
        name=dataset_name,
        env=source.config.env,
    )

    # Assert Entity Urns
    expected_urn = builder.make_data_process_instance_urn(
        source._make_vertexai_job_name(mock_training_job.name)
    )
    mcp_dpi = MetadataChangeProposalWrapper(
        entityUrn=expected_urn,
        aspect=DataProcessInstancePropertiesClass(
            name=mock_training_job.display_name,
            externalUrl=source._make_job_external_url(mock_training_job),
            customProperties={"jobType": "CustomJob"},
            created=AuditStamp(
                time=datetime_to_ts_millis(mock_training_job.create_time),
                actor="urn:li:corpuser:datahub",
            ),
        ),
    )
    mcp_ml_props = MetadataChangeProposalWrapper(
        entityUrn=expected_urn,
        aspect=MLTrainingRunProperties(
            id=mock_training_job.name,
            externalUrl=source._make_job_external_url(mock_training_job),
        ),
    )
    mcp_container = MetadataChangeProposalWrapper(
        entityUrn=expected_urn,
        aspect=ContainerClass(container=source._get_project_container().as_urn()),
    )
    mcp_subtype = MetadataChangeProposalWrapper(
        entityUrn=expected_urn,
        aspect=SubTypesClass(typeNames=[MLAssetSubTypes.VERTEX_TRAINING_JOB]),
    )
    mcp_dataplatform = MetadataChangeProposalWrapper(
        entityUrn=expected_urn,
        aspect=DataPlatformInstanceClass(
            platform=str(DataPlatformUrn(source.platform))
        ),
    )
    mcp_dpi_input = MetadataChangeProposalWrapper(
        entityUrn=expected_urn,
        aspect=DataProcessInstanceInputClass(
            inputs=[], inputEdges=[EdgeClass(destinationUrn=dataset_urn)]
        ),
    )

    assert len(actual_mcps) == 6
    assert any(mcp_dpi == mcp for mcp in actual_mcps)
    assert any(mcp_ml_props == mcp for mcp in actual_mcps)
    assert any(mcp_subtype == mcp for mcp in actual_mcps)
    assert any(mcp_container == mcp for mcp in actual_mcps)
    assert any(mcp_dataplatform == mcp for mcp in actual_mcps)
    assert any(mcp_dpi_input == mcp for mcp in actual_mcps)


def test_vertexai_config_init() -> None:
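    """VertexAIConfig parses project, region, bucket, URL, and service account credential fields."""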
    config_data = {
        "project_id": "test-project",
        "region": "us-central1",
        "bucket_uri": "gs://test-bucket",
        "vertexai_url": "https://console.cloud.google.com/vertex-ai",
        "credential": {
            "private_key_id": "test-key-id",
            "private_key": "-----BEGIN PRIVATE KEY-----\ntest-private-key\n-----END PRIVATE KEY-----\n",
            "client_email": "test-email@test-project.iam.gserviceaccount.com",
            "client_id": "test-client-id",
            "auth_uri": "https://accounts.google.com/o/oauth2/auth",
            "token_uri": "https://oauth2.googleapis.com/token",
            "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
            "type": "service_account",
        },
    }

    config = VertexAIConfig(**config_data)

    assert config.project_id == "test-project"
    assert config.region == "us-central1"
    assert config.bucket_uri == "gs://test-bucket"
    assert config.vertexai_url == "https://console.cloud.google.com/vertex-ai"
    assert config.credential is not None
    assert config.credential.private_key_id == "test-key-id"
    assert (
        config.credential.private_key
        == "-----BEGIN PRIVATE KEY-----\ntest-private-key\n-----END PRIVATE KEY-----\n"
    )
    assert (
        config.credential.client_email
        == "test-email@test-project.iam.gserviceaccount.com"
    )
    assert config.credential.client_id == "test-client-id"
    assert config.credential.auth_uri == "https://accounts.google.com/o/oauth2/auth"
    assert config.credential.token_uri == "https://oauth2.googleapis.com/token"
    assert (
        config.credential.auth_provider_x509_cert_url
        == "https://www.googleapis.com/oauth2/v1/certs"
    )

    parsed_conf = config.get_credentials()
    assert parsed_conf is not None
    assert parsed_conf.get("project_id") == config_data["project_id"]
    assert "credential" in config_data
    assert parsed_conf.get("private_key_id", "") == "test-key-id"
    assert (
        parsed_conf.get("private_key", "")
        == "-----BEGIN PRIVATE KEY-----\ntest-private-key\n-----END PRIVATE KEY-----\n"
    )
    assert (
        parsed_conf.get("client_email")
        == "test-email@test-project.iam.gserviceaccount.com"
    )
    assert parsed_conf.get("client_id") == "test-client-id"
    assert parsed_conf.get("auth_uri") == "https://accounts.google.com/o/oauth2/auth"
    assert parsed_conf.get("token_uri") == "https://oauth2.googleapis.com/token"
    assert (
        parsed_conf.get("auth_provider_x509_cert_url")
        == "https://www.googleapis.com/oauth2/v1/certs"
    )
    assert parsed_conf.get("type") == "service_account"


def test_get_input_dataset_mcps(source: VertexAISource) -> None:
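    """_get_input_dataset_mcps emits dataset properties, container, subtype, and data platform aspects."""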
    mock_dataset = gen_mock_dataset()
    mock_job = gen_mock_training_custom_job()
    job_meta = TrainingJobMetadata(mock_job, input_dataset=mock_dataset)

    actual_mcps: List[MetadataChangeProposalWrapper] = list(
        source._get_input_dataset_mcps(job_meta)
    )

    assert job_meta.input_dataset is not None
    expected_urn = builder.make_dataset_urn(
        platform=source.platform,
        name=source._make_vertexai_dataset_name(entity_id=job_meta.input_dataset.name),
        env=source.config.env,
    )
    mcp_ds = MetadataChangeProposalWrapper(
        entityUrn=expected_urn,
        aspect=DatasetPropertiesClass(
            name=mock_dataset.display_name,
            description=mock_dataset.display_name,
            qualifiedName=mock_dataset.resource_name,
            customProperties={"resourceName": mock_dataset.resource_name},
            created=TimeStampClass(
                time=datetime_to_ts_millis(mock_dataset.create_time)
            ),
        ),
    )
    mcp_container = MetadataChangeProposalWrapper(
        entityUrn=expected_urn,
        aspect=ContainerClass(container=source._get_project_container().as_urn()),
    )
    mcp_subtype = MetadataChangeProposalWrapper(
        entityUrn=expected_urn,
        aspect=SubTypesClass(typeNames=[MLAssetSubTypes.VERTEX_DATASET]),
    )
    mcp_dataplatform = MetadataChangeProposalWrapper(
        entityUrn=expected_urn,
        aspect=DataPlatformInstanceClass(
            platform=str(DataPlatformUrn(source.platform))
        ),
    )

    assert len(actual_mcps) == 4
    assert any(mcp_ds == mcp for mcp in actual_mcps)
    assert any(mcp_subtype == mcp for mcp in actual_mcps)
    assert any(mcp_container == mcp for mcp in actual_mcps)
    assert any(mcp_dataplatform == mcp for mcp in actual_mcps)


@patch("google.cloud.aiplatform.Experiment.list")
def test_get_experiment_mcps(
    mock_list: List[Experiment], source: VertexAISource
) -> None:
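    """_get_experiments_workunits emits container properties, subtype, container, platform, and status aspects."""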
    mock_experiment = gen_mock_experiment()
    assert hasattr(mock_list, "return_value")  # needed to get around lint
    mock_list.return_value = [mock_experiment]

    actual_wus: List[MetadataWorkUnit] = list(source._get_experiments_workunits())
    actual_mcps = [
        mcp.metadata
        for mcp in actual_wus
        if isinstance(mcp.metadata, MetadataChangeProposalWrapper)
    ]

    expected_urn = ExperimentKey(
        platform=source.platform,
        id=source._make_vertexai_experiment_id(mock_experiment.name),
    ).as_urn()
    mcp_container = MetadataChangeProposalWrapper(
        entityUrn=expected_urn,
        aspect=ContainerClass(container=source._get_project_container().as_urn()),
    )
    mcp_subtype = MetadataChangeProposalWrapper(
        entityUrn=expected_urn,
        aspect=SubTypesClass(typeNames=[MLAssetSubTypes.VERTEX_EXPERIMENT]),
    )
    mcp_dataplatform = MetadataChangeProposalWrapper(
        entityUrn=expected_urn,
        aspect=DataPlatformInstanceClass(
            platform=str(DataPlatformUrn(source.platform))
        ),
    )
    mcp_container_props = MetadataChangeProposalWrapper(
        entityUrn=expected_urn,
        aspect=ContainerPropertiesClass(
            name=mock_experiment.name,
            customProperties={
                "platform": source.platform,
                "id": source._make_vertexai_experiment_id(mock_experiment.name),
                "name": mock_experiment.name,
                "resourceName": mock_experiment.resource_name,
                "dashboardURL": mock_experiment.dashboard_url
                if mock_experiment.dashboard_url
                else "",
            },
            externalUrl=source._make_experiment_external_url(mock_experiment),
        ),
    )
    mcp_status = MetadataChangeProposalWrapper(
        entityUrn=expected_urn, aspect=StatusClass(removed=False)
    )

    assert len(actual_wus) == 5
    assert len(actual_mcps) == 5
    assert any(mcp_container_props == mcp for mcp in actual_mcps)
    assert any(mcp_subtype == mcp for mcp in actual_mcps)
    assert any(mcp_container == mcp for mcp in actual_mcps)
    assert any(mcp_dataplatform == mcp for mcp in actual_mcps)
    assert any(mcp_status == mcp for mcp in actual_mcps)


@patch("google.cloud.aiplatform.ExperimentRun.list")
def test_gen_experiment_run_mcps(
    mock_list: List[ExperimentRun], source: VertexAISource
) -> None:
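    """_get_experiment_runs_mcps emits run properties, subtype, container, and data platform aspects."""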
    mock_exp = gen_mock_experiment()
    source.experiments = [mock_exp]
    mock_exp_run = gen_mock_experiment_run()
    assert hasattr(mock_list, "return_value")  # needed to get around lint
    mock_list.return_value = [mock_exp_run]

    expected_exp_urn = ExperimentKey(
        platform=source.platform,
        id=source._make_vertexai_experiment_id(mock_exp.name),
    ).as_urn()
    expected_urn = source._make_experiment_run_urn(mock_exp, mock_exp_run)

    actual_mcps: List[MetadataChangeProposalWrapper] = list(
        source._get_experiment_runs_mcps()
    )

    mcp_dpi = MetadataChangeProposalWrapper(
        entityUrn=expected_urn,
        aspect=DataProcessInstancePropertiesClass(
            name=mock_exp_run.name,
            externalUrl=source._make_experiment_run_external_url(
                mock_exp, mock_exp_run
            ),
            customProperties={
                "externalUrl": source._make_experiment_run_external_url(
                    mock_exp, mock_exp_run
                )
            },
            created=AuditStamp(time=0, actor="urn:li:corpuser:datahub"),
        ),
    )
    mcp_ml_props = MetadataChangeProposalWrapper(
        entityUrn=expected_urn,
        aspect=MLTrainingRunProperties(
            id=f"{mock_exp.name}-{mock_exp_run.name}",
            externalUrl=source._make_experiment_run_external_url(
                mock_exp, mock_exp_run
            ),
            hyperParams=[],
            trainingMetrics=[],
        ),
    )
    mcp_container = MetadataChangeProposalWrapper(
        entityUrn=expected_urn,
        aspect=ContainerClass(container=expected_exp_urn),
    )
    mcp_subtype = MetadataChangeProposalWrapper(
        entityUrn=expected_urn,
        aspect=SubTypesClass(typeNames=[MLAssetSubTypes.VERTEX_EXPERIMENT_RUN]),
    )
    mcp_dataplatform = MetadataChangeProposalWrapper(
        entityUrn=expected_urn,
        aspect=DataPlatformInstanceClass(
            platform=str(DataPlatformUrn(source.platform))
        ),
    )

    assert len(actual_mcps) == 5
    assert any(mcp_dpi == mcp for mcp in actual_mcps)
    assert any(mcp_ml_props == mcp for mcp in actual_mcps)
    assert any(mcp_subtype == mcp for mcp in actual_mcps)
    assert any(mcp_container == mcp for mcp in actual_mcps)
    assert any(mcp_dataplatform == mcp for mcp in actual_mcps)


def test_get_pipeline_mcps(
    source: VertexAISource,
) -> None:
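    """_get_pipelines_mcps emits pipeline, task, and task run aspects for a mocked PipelineJob."""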
    mock_pipeline = get_mock_pipeline_job()
    with contextlib.ExitStack() as exit_stack:
        mock = exit_stack.enter_context(
            patch("google.cloud.aiplatform.PipelineJob.list")
        )
        mock.return_value = [mock_pipeline]

        actual_mcps = [mcp for mcp in source._get_pipelines_mcps()]

        # Assert Entity Urns
        expected_pipeline_urn = "urn:li:dataFlow:(vertexai,vertexai.acryl-poc.pipeline.mock_pipeline_job,PROD)"
        expected_task_urn = "urn:li:dataJob:(urn:li:dataFlow:(vertexai,vertexai.acryl-poc.pipeline.mock_pipeline_job,PROD),acryl-poc.pipeline_task.reverse)"
        duration = timedelta(
            milliseconds=datetime_to_ts_millis(mock_pipeline.update_time)
            - datetime_to_ts_millis(mock_pipeline.create_time)
        )

        mcp_pipe_df_info = MetadataChangeProposalWrapper(
            entityUrn=expected_pipeline_urn,
            aspect=DataFlowInfoClass(
                env=source.config.env,
                name=mock_pipeline.name,
                customProperties={
                    "resource_name": mock_pipeline.resource_name,
                    "create_time": mock_pipeline.create_time.isoformat(),
                    "update_time": mock_pipeline.update_time.isoformat(),
                    "duration": source._format_pipeline_duration(duration),
                    "location": mock_pipeline.location,
                    "labels": ",".join(
                        [f"{k}:{v}" for k, v in mock_pipeline.labels.items()]
                    ),
                },
                externalUrl=source._make_pipeline_external_url(mock_pipeline.name),
            ),
        )
        mcp_pipe_df_status = MetadataChangeProposalWrapper(
            entityUrn=expected_pipeline_urn,
            aspect=StatusClass(removed=False),
        )
        mcp_pipe_container = MetadataChangeProposalWrapper(
            entityUrn=expected_pipeline_urn,
            aspect=ContainerClass(container=source._get_project_container().as_urn()),
        )
        mcp_pipe_subtype = MetadataChangeProposalWrapper(
            entityUrn=expected_pipeline_urn,
            aspect=SubTypesClass(typeNames=[MLAssetSubTypes.VERTEX_PIPELINE]),
        )
        mcp_pipeline_tag = MetadataChangeProposalWrapper(
            entityUrn=expected_pipeline_urn,
            aspect=GlobalTagsClass(tags=[]),
        )

        mcp_task_input = MetadataChangeProposalWrapper(
            entityUrn=expected_task_urn,
            aspect=DataJobInputOutputClass(
                inputDatasets=[],
                outputDatasets=[],
                inputDatajobs=[
                    "urn:li:dataJob:(urn:li:dataFlow:(vertexai,vertexai.acryl-poc.pipeline.mock_pipeline_job,PROD),acryl-poc.pipeline_task.concat)"
                ],
                fineGrainedLineages=[],
            ),
        )
        mcp_task_info = MetadataChangeProposalWrapper(
            entityUrn=expected_task_urn,
            aspect=DataJobInfoClass(
                name="reverse",
                customProperties={},
                type="COMMAND",
                externalUrl="https://console.cloud.google.com/vertex-ai/pipelines/locations/us-west2/runs/mock_pipeline_job?project=acryl-poc",
                env=source.config.env,
            ),
        )
        mcp_task_container = MetadataChangeProposalWrapper(
            entityUrn=expected_task_urn,
            aspect=ContainerClass(container=source._get_project_container().as_urn()),
        )
        mcp_task_subtype = MetadataChangeProposalWrapper(
            entityUrn=expected_task_urn,
            aspect=SubTypesClass(typeNames=[MLAssetSubTypes.VERTEX_PIPELINE_TASK]),
        )
        mcp_task_status = MetadataChangeProposalWrapper(
            entityUrn=expected_pipeline_urn,
            aspect=StatusClass(removed=False),
        )
        mcp_task_tag = MetadataChangeProposalWrapper(
            entityUrn=expected_task_urn,
            aspect=GlobalTagsClass(tags=[]),
        )

        dpi_urn = "urn:li:dataProcessInstance:acryl-poc.pipeline_task_run.reverse"
        mcp_task_run_dpi = MetadataChangeProposalWrapper(
            entityUrn=dpi_urn,
            aspect=DataProcessInstancePropertiesClass(
                name="reverse",
                externalUrl="https://console.cloud.google.com/vertex-ai/pipelines/locations/us-west2/runs/mock_pipeline_job?project=acryl-poc",
                customProperties={},
                created=AuditStamp(
                    time=0,
                    actor="urn:li:corpuser:datahub",
                ),
            ),
        )
        mcp_task_run_container = MetadataChangeProposalWrapper(
            entityUrn=dpi_urn,
            aspect=ContainerClass(container=source._get_project_container().as_urn()),
        )
        mcp_task_run_subtype = MetadataChangeProposalWrapper(
            entityUrn=dpi_urn,
            aspect=SubTypesClass(typeNames=[MLAssetSubTypes.VERTEX_PIPELINE_TASK_RUN]),
        )
        mcp_task_run_dataplatform = MetadataChangeProposalWrapper(
            entityUrn=dpi_urn,
            aspect=DataPlatformInstanceClass(
                platform=str(DataPlatformUrn(source.platform))
            ),
        )
        mcp_task_run_relationship = MetadataChangeProposalWrapper(
            entityUrn=dpi_urn,
            aspect=DataProcessInstanceRelationships(
                upstreamInstances=[], parentTemplate=expected_task_urn
            ),
        )

        assert len(actual_mcps) == 19
        assert any(mcp_pipe_df_info == mcp for mcp in actual_mcps)
        assert any(mcp_pipe_df_status == mcp for mcp in actual_mcps)
        assert any(mcp_pipe_subtype == mcp for mcp in actual_mcps)
        assert any(mcp_pipe_container == mcp for mcp in actual_mcps)
        assert any(mcp_pipeline_tag == mcp for mcp in actual_mcps)
        assert any(mcp_task_input == mcp for mcp in actual_mcps)
        assert any(mcp_task_info == mcp for mcp in actual_mcps)
        assert any(mcp_task_container == mcp for mcp in actual_mcps)
        assert any(mcp_task_subtype == mcp for mcp in actual_mcps)
        assert any(mcp_task_status == mcp for mcp in actual_mcps)
        assert any(mcp_task_tag == mcp for mcp in actual_mcps)
        assert any(mcp_task_run_dpi == mcp for mcp in actual_mcps)
        assert any(mcp_task_run_container == mcp for mcp in actual_mcps)
        assert any(mcp_task_run_subtype == mcp for mcp in actual_mcps)
        assert any(mcp_task_run_dataplatform == mcp for mcp in actual_mcps)
        assert any(mcp_task_run_relationship == mcp for mcp in actual_mcps)


def test_make_model_external_url(source: VertexAISource) -> None:
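    """Model external URL points at the Vertex AI console page for the model."""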
    mock_model = gen_mock_model()
    assert (
        source._make_model_external_url(mock_model)
        == f"{source.config.vertexai_url}/models/locations/{source.config.region}/models/{mock_model.name}"
        f"?project={source.config.project_id}"
    )


def test_make_job_urn(source: VertexAISource) -> None:
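    """Training job URN is a dataProcessInstance URN derived from the Vertex AI job name."""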
    mock_training_job = gen_mock_training_automl_job()
    assert (
        source._make_training_job_urn(mock_training_job)
        == f"{builder.make_data_process_instance_urn(source._make_vertexai_job_name(mock_training_job.name))}"
    )