Mirror of https://github.com/open-metadata/OpenMetadata.git, synced 2025-12-03 02:55:59 +00:00
FIXES 15626: Fix issue with non-URL model store (#15974)

* Changed the MLModelStore storage type to string
* Fix checkstyle
* Remove unused files
* Update requirements
* Fix checkstyle
* Skipping MLFlow integration on Python 3.8
* Hack to allow pytest to parse the MLFlow integration test on Python 3.8
* Fix checkstyle
parent c04ec3e922
commit 08c114c340
@@ -309,7 +309,8 @@ test = {
     VERSIONS["avro"],  # Sample Data
     VERSIONS["grpc-tools"],
     "testcontainers==3.7.1;python_version<'3.9'",
-    "testcontainers==4.3.3;python_version>='3.9'",
+    "testcontainers==4.4.0;python_version>='3.9'",
+    "minio==7.2.5",
 }

 e2e_test = {
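The paired testcontainers pins above are selected at install time by PEP 508 environment markers, which is how the test extra keeps Python 3.8 on the older 3.7.1 release while newer interpreters get 4.4.0. A minimal sketch of how such markers evaluate, using the packaging library (an assumption for illustration; any PEP 508-aware parser behaves the same way), with the requirement strings copied from the diff:

# Minimal sketch: how PEP 508 environment markers decide which pin applies
# on the running interpreter.
from packaging.requirements import Requirement

requirements = [
    "testcontainers==3.7.1;python_version<'3.9'",
    "testcontainers==4.4.0;python_version>='3.9'",
]

for spec in requirements:
    req = Requirement(spec)
    # Marker.evaluate() checks the marker against the current environment.
    applies = req.marker.evaluate() if req.marker else True
    print(f"{spec} -> {'installed' if applies else 'skipped'}")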
ingestion/tests/integration/mlflow/__init__.py (new file, empty)
ingestion/tests/integration/mlflow/conftest.py (new file, 176 lines)
@@ -0,0 +1,176 @@
"""
Environment fixtures to be able to test the MLFlow Ingestion Pipeline.

The following steps are taken:
1. Build the MLFlow image with needed dependencies
2. Get a testcontainer Container for
    - MlFlow Image
    - MySQL
    - MinIO
   * For each container, an open port is found to be able to reach it from the host.
3. A Docker Network is created so that they can reach each other easily without the need to use the host.
4. The containers are started
5. Any specific configuration is done
6. Needed configurations are yielded back to the test.
"""
import io
import sys
from dataclasses import asdict, dataclass
from typing import Optional

import pytest
from testcontainers.core.container import DockerContainer
from testcontainers.core.docker_client import DockerClient
from testcontainers.mysql import MySqlContainer

# HACK: This test is only possible for Python3.9 or higher.
# This allows pytest to parse the file even on lower versions.
if sys.version_info >= (3, 9):
    from testcontainers.core.network import Network
    from testcontainers.minio import MinioContainer
else:
    from unittest.mock import MagicMock

    Network = MagicMock()
    MinioContainer = MagicMock()


# ------------------------------------------------------------
# Container Configurations
# ------------------------------------------------------------
@dataclass
class MySqlContainerConfigs:
    """MySQL Configurations"""

    image: str = "mysql:8"
    username: str = "mlflow"
    password: str = "password"
    dbname: str = "experiments"
    port: int = 3306
    container_name: str = "mlflow-db"
    exposed_port: Optional[int] = None

    def with_exposed_port(self, container):
        self.exposed_port = container.get_exposed_port(self.port)


@dataclass
class MinioContainerConfigs:
    """MinIO Configurations"""

    access_key: str = "minio"
    secret_key: str = "password"
    port: int = 9000
    container_name: str = "mlflow-artifact"
    exposed_port: Optional[int] = None

    def with_exposed_port(self, container):
        self.exposed_port = container.get_exposed_port(self.port)


@dataclass
class MlflowContainerConfigs:
    """MLFlow Configurations"""

    backend_uri: str = "mysql+pymysql://mlflow:password@mlflow-db:3306/experiments"
    artifact_bucket: str = "mlops.local.com"
    port: int = 6000
    exposed_port: Optional[int] = None

    def with_exposed_port(self, container):
        self.exposed_port = container.get_exposed_port(self.port)


class MlflowTestConfiguration:
    """Responsible for holding all the configurations used by the test"""

    def __init__(self):
        self.mysql_configs = MySqlContainerConfigs()
        self.minio_configs = MinioContainerConfigs()
        self.mlflow_configs = MlflowContainerConfigs()


# ------------------------------------------------------------
# Fixture to setup the environment
# ------------------------------------------------------------
@pytest.fixture(scope="session")
def mlflow_environment():
    config = MlflowTestConfiguration()

    docker_network = get_docker_network()

    minio_container = get_minio_container(config.minio_configs)
    mysql_container = get_mysql_container(config.mysql_configs)
    mlflow_container = build_and_get_mlflow_container(config.mlflow_configs)

    with docker_network:
        minio_container.with_network(docker_network)
        mysql_container.with_network(docker_network)
        mlflow_container.with_network(docker_network)
        with mysql_container, minio_container, mlflow_container:
            # minio setup
            minio_client = minio_container.get_client()
            minio_client.make_bucket(config.mlflow_configs.artifact_bucket)

            config.mysql_configs.with_exposed_port(mysql_container)
            config.minio_configs.with_exposed_port(minio_container)
            config.mlflow_configs.with_exposed_port(mlflow_container)

            yield config


# ------------------------------------------------------------
# Utility functions
# ------------------------------------------------------------
def get_docker_network(name: str = "docker_mlflow_test_nw"):
    network = Network()
    network.name = name
    return network


def build_and_get_mlflow_container(mlflow_config: MlflowContainerConfigs):
    docker_client = DockerClient()

    dockerfile = io.BytesIO(
        b"""
        FROM python:3.10-slim-buster
        RUN python -m pip install --upgrade pip
        RUN pip install cryptography mlflow boto3 pymysql
        """
    )

    docker_client.client.images.build(fileobj=dockerfile, tag="mlflow_image:latest")

    container = DockerContainer("mlflow_image:latest")
    container.with_bind_ports(mlflow_config.port, 8778)
    container.with_command(
        f"mlflow server --backend-store-uri {mlflow_config.backend_uri} --default-artifact-root s3://{mlflow_config.artifact_bucket} --host 0.0.0.0 --port {mlflow_config.port}"
    )

    return container


def get_mysql_container(mysql_config: MySqlContainerConfigs):

    container = MySqlContainer(
        **{
            k: v
            for k, v in asdict(mysql_config).items()
            if k not in ["exposed_port", "container_name"]
        }
    )
    container.with_name(mysql_config.container_name)

    return container


def get_minio_container(minio_config: MinioContainerConfigs):
    container = MinioContainer(
        **{
            k: v
            for k, v in asdict(minio_config).items()
            if k not in ["exposed_port", "container_name"]
        }
    )
    container.with_name(minio_config.container_name)

    return container
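Two addressing schemes are at play in this fixture: inside the Docker network the containers reach each other by container name (hence the mlflow-db:3306 backend URI), while the host reaches each container through whatever free port Docker mapped, which get_exposed_port() reports and with_exposed_port() records on the config objects. A minimal standalone sketch of that host-side mapping, assuming a local Docker daemon and using an arbitrary nginx:alpine image purely for illustration:

# Minimal sketch (assumes a local Docker daemon; nginx:alpine is an arbitrary
# image for illustration). testcontainers binds the fixed container port to a
# random free host port, which get_exposed_port() reveals.
from testcontainers.core.container import DockerContainer

with DockerContainer("nginx:alpine").with_exposed_ports(80) as container:
    host_port = container.get_exposed_port(80)  # different on every run
    print(f"reachable from the host at http://localhost:{host_port}")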
ingestion/tests/integration/mlflow/test_mlflow.py (new file, 193 lines)
@@ -0,0 +1,193 @@
import os
import sys
from urllib.parse import urlparse

import mlflow
import mlflow.sklearn
import numpy as np
import pandas as pd
import pytest
from mlflow.models import infer_signature
from sklearn.linear_model import ElasticNet
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
from sklearn.model_selection import train_test_split

from metadata.generated.schema.api.services.createMlModelService import (
    CreateMlModelServiceRequest,
)
from metadata.generated.schema.entity.data.mlmodel import MlModel
from metadata.generated.schema.entity.services.connections.mlmodel.mlflowConnection import (
    MlflowConnection,
)
from metadata.generated.schema.entity.services.mlmodelService import (
    MlModelConnection,
    MlModelService,
    MlModelServiceType,
)
from metadata.generated.schema.metadataIngestion.mlmodelServiceMetadataPipeline import (
    MlModelServiceMetadataPipeline,
)
from metadata.generated.schema.metadataIngestion.workflow import (
    OpenMetadataWorkflowConfig,
    Sink,
    Source,
    SourceConfig,
    WorkflowConfig,
)
from metadata.workflow.metadata import MetadataWorkflow

from ..integration_base import int_admin_ometa

MODEL_HYPERPARAMS = {
    "alpha": {"name": "alpha", "value": "0.5", "description": None},
    "l1_ratio": {"name": "l1_ratio", "value": "1.0", "description": None},
}

MODEL_NAME = "ElasticnetWineModel"

SERVICE_NAME = "docker_test_mlflow"


def eval_metrics(actual, pred):
    rmse = np.sqrt(mean_squared_error(actual, pred))
    mae = mean_absolute_error(actual, pred)
    r2 = r2_score(actual, pred)
    return rmse, mae, r2


@pytest.fixture(scope="module")
def create_data(mlflow_environment):
    mlflow_uri = f"http://localhost:{mlflow_environment.mlflow_configs.exposed_port}"
    mlflow.set_tracking_uri(mlflow_uri)

    os.environ["AWS_ACCESS_KEY_ID"] = "minio"
    os.environ["AWS_SECRET_ACCESS_KEY"] = "password"
    os.environ[
        "MLFLOW_S3_ENDPOINT_URL"
    ] = f"http://localhost:{mlflow_environment.minio_configs.exposed_port}"

    np.random.seed(40)

    # Read the wine-quality csv file from the URL
    csv_url = "http://archive.ics.uci.edu/ml/machine-learning-databases/wine-quality/winequality-red.csv"
    data = pd.read_csv(csv_url, sep=";")

    # Split the data into training and test sets. (0.75, 0.25) split.
    train, test = train_test_split(data)

    # The predicted column is "quality" which is a scalar from [3, 9]
    train_x = train.drop(["quality"], axis=1)
    test_x = test.drop(["quality"], axis=1)
    train_y = train[["quality"]]
    test_y = test[["quality"]]

    alpha = float(MODEL_HYPERPARAMS["alpha"]["value"])
    l1_ratio = float(MODEL_HYPERPARAMS["l1_ratio"]["value"])

    with mlflow.start_run():
        lr = ElasticNet(alpha=alpha, l1_ratio=l1_ratio, random_state=42)
        lr.fit(train_x, train_y)

        signature = infer_signature(train_x, lr.predict(train_x))

        predicted_qualities = lr.predict(test_x)

        (rmse, mae, r2) = eval_metrics(test_y, predicted_qualities)

        mlflow.log_param("alpha", alpha)
        mlflow.log_param("l1_ratio", l1_ratio)
        mlflow.log_metric("rmse", rmse)
        mlflow.log_metric("r2", r2)
        mlflow.log_metric("mae", mae)

        tracking_url_type_store = urlparse(mlflow.get_tracking_uri()).scheme

        # Model registry does not work with file store
        if tracking_url_type_store != "file":

            # Register the model
            # There are other ways to use the Model Registry, which depends on the use case,
            # please refer to the doc for more information:
            # https://mlflow.org/docs/latest/model-registry.html#api-workflow
            mlflow.sklearn.log_model(
                lr,
                "model",
                registered_model_name=MODEL_NAME,
                signature=signature,
            )
        else:
            mlflow.sklearn.log_model(lr, "model")


@pytest.fixture(scope="module")
def metadata():
    return int_admin_ometa()


@pytest.fixture(scope="module")
def service(metadata, mlflow_environment):
    service = CreateMlModelServiceRequest(
        name=SERVICE_NAME,
        serviceType=MlModelServiceType.Mlflow,
        connection=MlModelConnection(
            config=MlflowConnection(
                type="Mlflow",
                trackingUri=f"http://localhost:{mlflow_environment.mlflow_configs.exposed_port}",
                registryUri=f"mysql+pymysql://mlflow:password@localhost:{mlflow_environment.mysql_configs.exposed_port}/experiments",
            )
        ),
    )

    service_entity = metadata.create_or_update(data=service)
    yield service_entity
    metadata.delete(MlModelService, service_entity.id, recursive=True, hard_delete=True)


@pytest.fixture(scope="module")
def ingest_mlflow(metadata, service, create_data):
    workflow_config = OpenMetadataWorkflowConfig(
        source=Source(
            type=service.connection.config.type.value.lower(),
            serviceName=service.fullyQualifiedName.__root__,
            serviceConnection=service.connection,
            sourceConfig=SourceConfig(config=MlModelServiceMetadataPipeline()),
        ),
        sink=Sink(type="metadata-rest", config={}),
        workflowConfig=WorkflowConfig(openMetadataServerConfig=metadata.config),
    )

    metadata_ingestion = MetadataWorkflow.create(workflow_config)
    metadata_ingestion.execute()
    return


@pytest.mark.skipif(
    sys.version_info < (3, 9),
    reason="testcontainers Network feature requires python3.9 or higher",
)
def test_mlflow(ingest_mlflow, metadata):
    ml_models = metadata.list_all_entities(entity=MlModel)

    # Check we only get the same amount of models we should have ingested
    filtered_ml_models = [
        ml_model for ml_model in ml_models if ml_model.service.name == SERVICE_NAME
    ]

    assert len(filtered_ml_models) == 1

    # Assert inner information about the model
    model = filtered_ml_models[0]

    # Assert name is as expected
    assert model.name.__root__ == MODEL_NAME

    # Assert HyperParameters are as expected
    assert len(model.mlHyperParameters) == 2

    for i, hp in enumerate(MODEL_HYPERPARAMS.values()):
        assert model.mlHyperParameters[i].name == hp["name"]
        assert model.mlHyperParameters[i].value == hp["value"]
        assert model.mlHyperParameters[i].description == hp["description"]

    # Assert MLStore is as expected
    assert "mlops.local.com" in model.mlStore.storage
@@ -84,7 +84,7 @@ public class MlModelResourceTest extends EntityResourceTest<MlModel, CreateMlMod
   public static final URI SERVER = URI.create("http://localhost.com/mlModel");
   public static final MlStore ML_STORE =
       new MlStore()
-          .withStorage(URI.create("s3://my-bucket.com/mlModel"))
+          .withStorage(URI.create("s3://my-bucket.com/mlModel").toString())
           .withImageRepository(URI.create("https://12345.dkr.ecr.region.amazonaws.com").toString());

   public static final List<MlFeature> ML_FEATURES =
@@ -141,7 +141,7 @@
     "properties": {
       "storage": {
         "description": "Storage Layer containing the ML Model data.",
-        "$ref": "../../type/basic.json#/definitions/href"
+        "type": "string"
       },
       "imageRepository": {
         "description": "Container Repository with the ML Model image.",
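This schema change is the core of the fix: mlStore.storage previously reused the href definition (a URI), so model stores whose location is not a URL failed validation, and it is now a plain string. A minimal sketch of the difference with hypothetical pydantic models (MlStoreOld and MlStoreNew are illustrative names, not the generated OpenMetadata classes; the assumption is that the old href reference maps to a URL-typed field while "type": "string" maps to str):

# Illustrative sketch only: hypothetical stand-ins for the generated models.
from pydantic import AnyUrl, BaseModel, ValidationError


class MlStoreOld(BaseModel):
    storage: AnyUrl  # roughly what the href $ref used to imply


class MlStoreNew(BaseModel):
    storage: str  # what "type": "string" allows after this commit


artifact_path = "/mlruns/0/artifacts/model"  # hypothetical non-URL storage location

print(MlStoreNew(storage=artifact_path))  # accepted after the change

try:
    MlStoreOld(storage=artifact_path)
except ValidationError:
    print("a URL-typed storage field rejects plain filesystem paths")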