128 lines
4.3 KiB
Python

import contextlib
from pathlib import Path
from typing import Any, Dict, TypeVar
from unittest.mock import patch
from pytest import Config
from datahub.ingestion.run.pipeline import Pipeline
from tests.integration.vertexai.mock_vertexai import (
gen_mock_dataset,
gen_mock_experiment,
gen_mock_experiment_run,
gen_mock_model,
gen_mock_models,
gen_mock_training_automl_job,
gen_mock_training_custom_job,
get_mock_pipeline_job,
)
from tests.test_helpers import mce_helpers
T = TypeVar("T")
PROJECT_ID = "test-project-id"
REGION = "us-west2"
def get_pipeline_config(sink_file_path: str) -> Dict[str, Any]:
source_type = "vertexai"
return {
"run_id": "vertexai-source-test",
"source": {
"type": source_type,
"config": {
"project_id": PROJECT_ID,
"region": REGION,
},
},
"sink": {
"type": "file",
"config": {
"filename": sink_file_path,
},
},
}
def test_vertexai_source_ingestion(pytestconfig: Config, tmp_path: Path) -> None:
with contextlib.ExitStack() as exit_stack:
# Mock the Vertex API with empty list
for func_to_mock in [
"google.cloud.aiplatform.init",
"google.cloud.aiplatform.datasets.TextDataset.list",
"google.cloud.aiplatform.datasets.ImageDataset.list",
"google.cloud.aiplatform.datasets.TimeSeriesDataset.list",
"google.cloud.aiplatform.datasets.VideoDataset.list",
"google.cloud.aiplatform.CustomTrainingJob.list",
"google.cloud.aiplatform.CustomContainerTrainingJob.list",
"google.cloud.aiplatform.CustomPythonPackageTrainingJob.list",
"google.cloud.aiplatform.AutoMLTextTrainingJob.list",
"google.cloud.aiplatform.AutoMLImageTrainingJob.list",
"google.cloud.aiplatform.AutoMLVideoTrainingJob.list",
"google.cloud.aiplatform.AutoMLForecastingTrainingJob.list",
"google.cloud.aiplatform.TabularDataset.list",
]:
mock = exit_stack.enter_context(patch(func_to_mock))
mock.return_value = []
# Mock the Vertex AI with Mock data list
mock_models = exit_stack.enter_context(
patch("google.cloud.aiplatform.Model.list")
)
mock_models.return_value = gen_mock_models()
mock_custom_job = exit_stack.enter_context(
patch("google.cloud.aiplatform.CustomJob.list")
)
mock_custom_job.return_value = [gen_mock_training_custom_job()]
mock_automl_job = exit_stack.enter_context(
patch("google.cloud.aiplatform.AutoMLTabularTrainingJob.list")
)
mock_automl_job.return_value = [gen_mock_training_automl_job()]
mock_ds = exit_stack.enter_context(
patch("google.cloud.aiplatform.datasets.TabularDataset.list")
)
mock_ds.return_value = [gen_mock_dataset()]
mock_model = exit_stack.enter_context(
patch("google.cloud.aiplatform.models.Model")
)
mock_model.return_value = gen_mock_model()
mock_exp = exit_stack.enter_context(
patch("google.cloud.aiplatform.Experiment.list")
)
mock_exp.return_value = [gen_mock_experiment()]
mock_exp_run = exit_stack.enter_context(
patch("google.cloud.aiplatform.ExperimentRun.list")
)
mock_exp_run.return_value = [gen_mock_experiment_run()]
mock = exit_stack.enter_context(
patch("google.cloud.aiplatform.PipelineJob.list")
)
mock.return_value = [get_mock_pipeline_job()]
golden_file_path = (
pytestconfig.rootpath
/ "tests/integration/vertexai/vertexai_mcps_golden.json"
)
sink_file_path = str(tmp_path / "vertexai_source_mcps.json")
print(f"Output mcps file path: {str(sink_file_path)}")
print(f"Golden file path: {str(golden_file_path)}")
pipeline = Pipeline.create(get_pipeline_config(sink_file_path))
pipeline.run()
pipeline.pretty_print_summary()
pipeline.raise_from_status()
mce_helpers.check_golden_file(
pytestconfig=pytestconfig,
output_path=sink_file_path,
golden_path=golden_file_path,
)