From 08c114c34031d08a44112e8f5a3c773273e39d28 Mon Sep 17 00:00:00 2001 From: IceS2 Date: Mon, 22 Apr 2024 15:50:44 +0200 Subject: [PATCH] FIXES 15626: Fix issue with not url model store (#15974) * Changed the MLModelStore storage type to string * fix checkstyle * remove unused files * Update requirements * fix checkstyle * Skipping MLFlow integration on python 3.8 * Hack to allow pytest to parse the mlflow integrations test on python 3.8 * Fix checkstyle --- ingestion/setup.py | 3 +- .../tests/integration/mlflow/__init__.py | 0 .../tests/integration/mlflow/conftest.py | 176 ++++++++++++++++ .../tests/integration/mlflow/test_mlflow.py | 193 ++++++++++++++++++ .../mlmodels/MlModelResourceTest.java | 2 +- .../json/schema/entity/data/mlmodel.json | 2 +- 6 files changed, 373 insertions(+), 3 deletions(-) create mode 100644 ingestion/tests/integration/mlflow/__init__.py create mode 100644 ingestion/tests/integration/mlflow/conftest.py create mode 100644 ingestion/tests/integration/mlflow/test_mlflow.py diff --git a/ingestion/setup.py b/ingestion/setup.py index c633ea85a5c..22ae132f4c0 100644 --- a/ingestion/setup.py +++ b/ingestion/setup.py @@ -309,7 +309,8 @@ test = { VERSIONS["avro"], # Sample Data VERSIONS["grpc-tools"], "testcontainers==3.7.1;python_version<'3.9'", - "testcontainers==4.3.3;python_version>='3.9'", + "testcontainers==4.4.0;python_version>='3.9'", + "minio==7.2.5", } e2e_test = { diff --git a/ingestion/tests/integration/mlflow/__init__.py b/ingestion/tests/integration/mlflow/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/ingestion/tests/integration/mlflow/conftest.py b/ingestion/tests/integration/mlflow/conftest.py new file mode 100644 index 00000000000..5ab70af47a9 --- /dev/null +++ b/ingestion/tests/integration/mlflow/conftest.py @@ -0,0 +1,176 @@ +""" +Environment fixtures to be able to test the MLFlow Ingestion Pipeline. + +The following steps are taken: +1. Build the MLFlow image with needed dependencies +2. 
Get a testcontainer Container for + - MLFlow Image + - MySQL + - MinIO + * For each container, an open port is found to be able to reach it from the host. +3. A Docker Network is created so that they can reach each other easily without the need to use the host. +4. The containers are started +5. Any specific configuration is done +6. Needed configurations are yielded back to the test. +""" +import io +import sys +from dataclasses import asdict, dataclass +from typing import Optional + +import pytest +from testcontainers.core.container import DockerContainer +from testcontainers.core.docker_client import DockerClient +from testcontainers.mysql import MySqlContainer + +# HACK: This test is only possible for Python 3.9 or higher. +# This allows pytest to parse the file even on lower versions. +if sys.version_info >= (3, 9): + from testcontainers.core.network import Network + from testcontainers.minio import MinioContainer +else: + from unittest.mock import MagicMock + + Network = MagicMock() + MinioContainer = MagicMock() + +# ------------------------------------------------------------ +# Container Configurations +# ------------------------------------------------------------ +@dataclass +class MySqlContainerConfigs: + """MySQL Configurations""" + + image: str = "mysql:8" + username: str = "mlflow" + password: str = "password" + dbname: str = "experiments" + port: int = 3306 + container_name: str = "mlflow-db" + exposed_port: Optional[int] = None + + def with_exposed_port(self, container): + self.exposed_port = container.get_exposed_port(self.port) + + +@dataclass +class MinioContainerConfigs: + """MinIO Configurations""" + + access_key: str = "minio" + secret_key: str = "password" + port: int = 9000 + container_name: str = "mlflow-artifact" + exposed_port: Optional[int] = None + + def with_exposed_port(self, container): + self.exposed_port = container.get_exposed_port(self.port) + + +@dataclass +class MlflowContainerConfigs: + """MLFlow Configurations""" + + 
backend_uri: str = "mysql+pymysql://mlflow:password@mlflow-db:3306/experiments" + artifact_bucket: str = "mlops.local.com" + port: int = 6000 + exposed_port: Optional[int] = None + + def with_exposed_port(self, container): + self.exposed_port = container.get_exposed_port(self.port) + + +class MlflowTestConfiguration: + """Responsible to hold all the configurations used by the test""" + + def __init__(self): + self.mysql_configs = MySqlContainerConfigs() + self.minio_configs = MinioContainerConfigs() + self.mlflow_configs = MlflowContainerConfigs() + + +# ------------------------------------------------------------ +# Fixture to setup the environment +# ------------------------------------------------------------ +@pytest.fixture(scope="session") +def mlflow_environment(): + config = MlflowTestConfiguration() + + docker_network = get_docker_network() + + minio_container = get_minio_container(config.minio_configs) + mysql_container = get_mysql_container(config.mysql_configs) + mlflow_container = build_and_get_mlflow_container(config.mlflow_configs) + + with docker_network: + minio_container.with_network(docker_network) + mysql_container.with_network(docker_network) + mlflow_container.with_network(docker_network) + with mysql_container, minio_container, mlflow_container: + # minio setup + minio_client = minio_container.get_client() + minio_client.make_bucket(config.mlflow_configs.artifact_bucket) + + config.mysql_configs.with_exposed_port(mysql_container) + config.minio_configs.with_exposed_port(minio_container) + config.mlflow_configs.with_exposed_port(mlflow_container) + + yield config + + +# ------------------------------------------------------------ +# Utility functions +# ------------------------------------------------------------ +def get_docker_network(name: str = "docker_mlflow_test_nw"): + network = Network() + network.name = name + return network + + +def build_and_get_mlflow_container(mlflow_config: MlflowContainerConfigs): + docker_client = 
DockerClient() + + dockerfile = io.BytesIO( + b""" + FROM python:3.10-slim-buster + RUN python -m pip install --upgrade pip + RUN pip install cryptography mlflow boto3 pymysql + """ + ) + + docker_client.client.images.build(fileobj=dockerfile, tag="mlflow_image:latest") + + container = DockerContainer("mlflow_image:latest") + container.with_bind_ports(mlflow_config.port, 8778) + container.with_command( + f"mlflow server --backend-store-uri {mlflow_config.backend_uri} --default-artifact-root s3://{mlflow_config.artifact_bucket} --host 0.0.0.0 --port {mlflow_config.port}" + ) + + return container + + +def get_mysql_container(mysql_config: MySqlContainerConfigs): + + container = MySqlContainer( + **{ + k: v + for k, v in asdict(mysql_config).items() + if k not in ["exposed_port", "container_name"] + } + ) + container.with_name(mysql_config.container_name) + + return container + + +def get_minio_container(minio_config: MinioContainerConfigs): + container = MinioContainer( + **{ + k: v + for k, v in asdict(minio_config).items() + if k not in ["exposed_port", "container_name"] + } + ) + container.with_name(minio_config.container_name) + + return container diff --git a/ingestion/tests/integration/mlflow/test_mlflow.py b/ingestion/tests/integration/mlflow/test_mlflow.py new file mode 100644 index 00000000000..e28d3b222af --- /dev/null +++ b/ingestion/tests/integration/mlflow/test_mlflow.py @@ -0,0 +1,193 @@ +import os +import sys +from urllib.parse import urlparse + +import mlflow +import mlflow.sklearn +import numpy as np +import pandas as pd +import pytest +from mlflow.models import infer_signature +from sklearn.linear_model import ElasticNet +from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score +from sklearn.model_selection import train_test_split + +from metadata.generated.schema.api.services.createMlModelService import ( + CreateMlModelServiceRequest, +) +from metadata.generated.schema.entity.data.mlmodel import MlModel +from 
metadata.generated.schema.entity.services.connections.mlmodel.mlflowConnection import ( + MlflowConnection, +) +from metadata.generated.schema.entity.services.mlmodelService import ( + MlModelConnection, + MlModelService, + MlModelServiceType, +) +from metadata.generated.schema.metadataIngestion.mlmodelServiceMetadataPipeline import ( + MlModelServiceMetadataPipeline, +) +from metadata.generated.schema.metadataIngestion.workflow import ( + OpenMetadataWorkflowConfig, + Sink, + Source, + SourceConfig, + WorkflowConfig, +) +from metadata.workflow.metadata import MetadataWorkflow + +from ..integration_base import int_admin_ometa + +MODEL_HYPERPARAMS = { + "alpha": {"name": "alpha", "value": "0.5", "description": None}, + "l1_ratio": {"name": "l1_ratio", "value": "1.0", "description": None}, +} + +MODEL_NAME = "ElasticnetWineModel" + +SERVICE_NAME = "docker_test_mlflow" + + +def eval_metrics(actual, pred): + rmse = np.sqrt(mean_squared_error(actual, pred)) + mae = mean_absolute_error(actual, pred) + r2 = r2_score(actual, pred) + return rmse, mae, r2 + + +@pytest.fixture(scope="module") +def create_data(mlflow_environment): + mlflow_uri = f"http://localhost:{mlflow_environment.mlflow_configs.exposed_port}" + mlflow.set_tracking_uri(mlflow_uri) + + os.environ["AWS_ACCESS_KEY_ID"] = "minio" + os.environ["AWS_SECRET_ACCESS_KEY"] = "password" + os.environ[ + "MLFLOW_S3_ENDPOINT_URL" + ] = f"http://localhost:{mlflow_environment.minio_configs.exposed_port}" + + np.random.seed(40) + + # Read the wine-quality csv file from the URL + csv_url = "http://archive.ics.uci.edu/ml/machine-learning-databases/wine-quality/winequality-red.csv" + data = pd.read_csv(csv_url, sep=";") + + # Split the data into training and test sets. (0.75, 0.25) split. 
+ train, test = train_test_split(data) + + # The predicted column is "quality" which is a scalar from [3, 9] + train_x = train.drop(["quality"], axis=1) + test_x = test.drop(["quality"], axis=1) + train_y = train[["quality"]] + test_y = test[["quality"]] + + alpha = float(MODEL_HYPERPARAMS["alpha"]["value"]) + l1_ratio = float(MODEL_HYPERPARAMS["l1_ratio"]["value"]) + + with mlflow.start_run(): + lr = ElasticNet(alpha=alpha, l1_ratio=l1_ratio, random_state=42) + lr.fit(train_x, train_y) + + signature = infer_signature(train_x, lr.predict(train_x)) + + predicted_qualities = lr.predict(test_x) + + (rmse, mae, r2) = eval_metrics(test_y, predicted_qualities) + + mlflow.log_param("alpha", alpha) + mlflow.log_param("l1_ratio", l1_ratio) + mlflow.log_metric("rmse", rmse) + mlflow.log_metric("r2", r2) + mlflow.log_metric("mae", mae) + + tracking_url_type_store = urlparse(mlflow.get_tracking_uri()).scheme + + # Model registry does not work with file store + if tracking_url_type_store != "file": + + # Register the model + # There are other ways to use the Model Registry, which depends on the use case, + # please refer to the doc for more information: + # https://mlflow.org/docs/latest/model-registry.html#api-workflow + mlflow.sklearn.log_model( + lr, + "model", + registered_model_name=MODEL_NAME, + signature=signature, + ) + else: + mlflow.sklearn.log_model(lr, "model") + + +@pytest.fixture(scope="module") +def metadata(): + return int_admin_ometa() + + +@pytest.fixture(scope="module") +def service(metadata, mlflow_environment): + service = CreateMlModelServiceRequest( + name=SERVICE_NAME, + serviceType=MlModelServiceType.Mlflow, + connection=MlModelConnection( + config=MlflowConnection( + type="Mlflow", + trackingUri=f"http://localhost:{mlflow_environment.mlflow_configs.exposed_port}", + registryUri=f"mysql+pymysql://mlflow:password@localhost:{mlflow_environment.mysql_configs.exposed_port}/experiments", + ) + ), + ) + + service_entity = 
metadata.create_or_update(data=service) + yield service_entity + metadata.delete(MlModelService, service_entity.id, recursive=True, hard_delete=True) + + +@pytest.fixture(scope="module") +def ingest_mlflow(metadata, service, create_data): + workflow_config = OpenMetadataWorkflowConfig( + source=Source( + type=service.connection.config.type.value.lower(), + serviceName=service.fullyQualifiedName.__root__, + serviceConnection=service.connection, + sourceConfig=SourceConfig(config=MlModelServiceMetadataPipeline()), + ), + sink=Sink(type="metadata-rest", config={}), + workflowConfig=WorkflowConfig(openMetadataServerConfig=metadata.config), + ) + + metadata_ingestion = MetadataWorkflow.create(workflow_config) + metadata_ingestion.execute() + return + + +@pytest.mark.skipif( + sys.version_info < (3, 9), + reason="testcontainers Network feature requires python3.9 or higher", +) +def test_mlflow(ingest_mlflow, metadata): + ml_models = metadata.list_all_entities(entity=MlModel) + + # Check we only get the same amount of models we should have ingested + filtered_ml_models = [ + ml_model for ml_model in ml_models if ml_model.service.name == SERVICE_NAME + ] + + assert len(filtered_ml_models) == 1 + + # Assert inner information about the model + model = filtered_ml_models[0] + + # Assert name is as expected + assert model.name.__root__ == MODEL_NAME + + # Assert HyperParameters are as expected + assert len(model.mlHyperParameters) == 2 + + for i, hp in enumerate(MODEL_HYPERPARAMS.values()): + assert model.mlHyperParameters[i].name == hp["name"] + assert model.mlHyperParameters[i].value == hp["value"] + assert model.mlHyperParameters[i].description == hp["description"] + + # Assert MLStore is as expected + assert "mlops.local.com" in model.mlStore.storage diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/resources/mlmodels/MlModelResourceTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/resources/mlmodels/MlModelResourceTest.java 
index 2eec895e20c..31666bbf64a 100644 --- a/openmetadata-service/src/test/java/org/openmetadata/service/resources/mlmodels/MlModelResourceTest.java +++ b/openmetadata-service/src/test/java/org/openmetadata/service/resources/mlmodels/MlModelResourceTest.java @@ -84,7 +84,7 @@ public class MlModelResourceTest extends EntityResourceTest ML_FEATURES = diff --git a/openmetadata-spec/src/main/resources/json/schema/entity/data/mlmodel.json b/openmetadata-spec/src/main/resources/json/schema/entity/data/mlmodel.json index 789eb6d7897..7f3dcfd0d04 100644 --- a/openmetadata-spec/src/main/resources/json/schema/entity/data/mlmodel.json +++ b/openmetadata-spec/src/main/resources/json/schema/entity/data/mlmodel.json @@ -141,7 +141,7 @@ "properties": { "storage": { "description": "Storage Layer containing the ML Model data.", - "$ref": "../../type/basic.json#/definitions/href" + "type": "string" }, "imageRepository": { "description": "Container Repository with the ML Model image.",