autogen/test/spark/test_overtime.py
Li Jiang 50334f2c52
Support spark dataframe as input dataset and spark models as estimators (#934)
* add basic support to Spark dataframe

add support to SynapseML LightGBM model

update to pyspark>=3.2.0 to leverage pandas_on_Spark API

* clean code, add TODOs

* add sample_train_data for pyspark.pandas dataframe, fix bugs

* improve some functions, fix bugs

* fix dict change size during iteration

* update model predict

* update LightGBM model, update test

* update SynapseML LightGBM params

* update synapseML and tests

* update TODOs

* Added support to roc_auc for spark models

* Added support to score of spark estimator

* Added test for automl score of spark estimator

* Added cv support to pyspark.pandas dataframe

* Update test, fix bugs

* Added tests

* Updated docs, tests, added a notebook

* Fix bugs in non-spark env

* Fix bugs and improve tests

* Fix uninstall pyspark

* Fix tests error

* Fix java.lang.OutOfMemoryError: Java heap space

* Fix test_performance

* Update test_sparkml to test_0sparkml to use the expected spark conf

* Remove unnecessary widgets in notebook

* Fix iloc java.lang.StackOverflowError

* fix pre-commit

* Added params check for spark dataframes

* Refactor code for train_test_split to a function

* Update train_test_split_pyspark

* Refactor if-else, remove unnecessary code

* Remove y from predict, remove mem control from n_iter compute

* Update workflow

* Improve _split_pyspark

* Fix test failure of too short training time

* Fix typos, improve docstrings

* Fix index errors of pandas_on_spark, add spark loss metric

* Fix typo of ndcgAtK

* Update NDCG metrics and tests

* Remove unuseful logger

* Use cache and count to ensure consistent indexes

* refactor for merge main

* fix errors of refactor

* Updated SparkLightGBMEstimator and cache

* Updated config2params

* Remove unused import

* Fix unknown parameters

* Update default_estimator_list

* Add unit tests for spark metrics
2023-03-25 19:59:46 +00:00

73 lines
1.9 KiB
Python

import os
import time
import numpy as np
import pytest
from sklearn.datasets import load_iris
from flaml import AutoML
# The helper module is imported via its package path first; if that raises
# ImportError, fall back to importing it as a top-level module.
try:
    from test.spark.custom_mylearner import *
except ImportError:
    from custom_mylearner import *

# Spark is an optional dependency: if any of the imports below raises
# ImportError, every test in this module is skipped instead of failing.
try:
    import pyspark
    from flaml.tune.spark.utils import check_spark
    from flaml.tune.spark.mylearner import lazy_metric

    # NOTE(review): presumably caps the number of concurrent Spark trials
    # FLAML may launch -- confirm against the FLAML Spark documentation.
    os.environ["FLAML_MAX_CONCURRENT"] = "10"
    # Create (or reuse) a session before asking FLAML whether Spark is usable.
    spark = pyspark.sql.SparkSession.builder.appName("App4OvertimeTest").getOrCreate()
    # Only the first element of check_spark()'s result is used here.
    spark_available, _ = check_spark()
    skip_spark = not spark_available
except ImportError:
    skip_spark = True

# Module-wide marker: applies the skip condition to every test in this file.
pytestmark = pytest.mark.skipif(
    skip_spark, reason="Spark is not installed. Skip all spark tests."
)
def test_overtime():
    """Check that an AutoML run with Spark trials ends close to its time budget.

    Fits ``AutoML`` on the iris dataset with ``use_spark=True``, two
    concurrent trials and ``force_cancel=True``, then asserts that the
    wall-clock duration of ``fit`` is within 2 seconds of ``time_budget``.
    The fitted model, its predictions and the best iteration/estimator are
    printed afterwards.
    """
    time_budget = 15
    df, y = load_iris(return_X_y=True, as_frame=True)
    # The label column is passed by name ("label") through the dataframe.
    df["label"] = y
    automl_experiment = AutoML()
    automl_settings = {
        "dataframe": df,
        "label": "label",
        "time_budget": time_budget,
        "eval_method": "cv",
        # NOTE(review): presumably a deliberately slow custom metric, so that
        # trials are still running when the budget expires -- confirm.
        "metric": lazy_metric,
        "task": "classification",
        "log_file_name": "test/iris_custom.log",
        "log_training_metric": True,
        "log_type": "all",
        "n_jobs": 1,
        "model_history": True,
        "sample_weight": np.ones(len(y)),
        "pred_time_limit": 1e-5,
        "estimator_list": ["lgbm"],
        "n_concurrent_trials": 2,
        "use_spark": True,
        "force_cancel": True,
    }
    # Time only the fit call itself.
    start_time = time.time()
    automl_experiment.fit(**automl_settings)
    elapsed_time = time.time() - start_time
    print(
        "time budget: {:.2f}s, actual elapsed time: {:.2f}s".format(
            time_budget, elapsed_time
        )
    )
    # The run must neither finish early nor overrun by 2 seconds or more.
    assert abs(elapsed_time - time_budget) < 2
    print(automl_experiment.predict(df))
    print(automl_experiment.model)
    print(automl_experiment.best_iteration)
    print(automl_experiment.best_estimator)
# Allow running this test directly as a script, without pytest.
if __name__ == "__main__":
    test_overtime()