import numpy as np
import pandas as pd
from functools import partial
from timeit import timeit
import pytest
import os

try:
    os.environ["PYARROW_IGNORE_TIMEZONE"] = "1"
    from pyspark.sql import SparkSession
    import pyspark
    import pyspark.pandas as ps
    from flaml.tune.spark.utils import (
        with_parameters,
        check_spark,
        get_n_cpus,
        get_broadcast_data,
    )
    from flaml.automl.spark.utils import (
        to_pandas_on_spark,
        train_test_split_pyspark,
        unique_pandas_on_spark,
        len_labels,
        unique_value_first_index,
        iloc_pandas_on_spark,
    )
    from flaml.automl.spark.metrics import spark_metric_loss_score
    from flaml.automl.ml import sklearn_metric_loss_score
    from pyspark.ml.linalg import Vectors

    spark_available, _ = check_spark()
    skip_spark = not spark_available
except ImportError:
    print("Spark is not installed. Skip all spark tests.")
    skip_spark = True

pytestmark = pytest.mark.skipif(skip_spark, reason="Spark is not installed. Skip all spark tests.")


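# NOTE: with_parameters is expected to ship `data` to executors once as a Spark
# broadcast variable, whereas functools.partial pickles it into every task closure;
# the timing comparison below illustrates the difference. The final assert is left
# commented out, presumably because wall-clock timings are too flaky for CI.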
def test_with_parameters_spark():
    def train(config, data=None):
        if isinstance(data, pyspark.broadcast.Broadcast):
            data = data.value
        print(config, len(data))

    data = ["a"] * 10**6

    with_parameters_train = with_parameters(train, data=data)
    partial_train = partial(train, data=data)

    spark = SparkSession.builder.getOrCreate()
    rdd = spark.sparkContext.parallelize(list(range(2)))

    t_partial = timeit(lambda: rdd.map(lambda x: partial_train(config=x)).collect(), number=5)
    print("python_partial_train: " + str(t_partial))

    t_spark = timeit(
        lambda: rdd.map(lambda x: with_parameters_train(config=x)).collect(),
        number=5,
    )
    print("spark_with_parameters_train: " + str(t_spark))

    # assert t_spark < t_partial


def test_get_n_cpus_spark():
    n_cpus = get_n_cpus()
    assert isinstance(n_cpus, int)


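# NOTE: broadcast_code appears to materialize `custom_code` as an importable module
# (flaml.tune.spark.mylearner), which is why the `from flaml.tune.spark.mylearner
# import MyLargeLGBM` below only works after broadcast_code has been called.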
def test_broadcast_code():
    from flaml.tune.spark.utils import broadcast_code
    from flaml.automl.model import LGBMEstimator

    custom_code = """
    from flaml.automl.model import LGBMEstimator
    from flaml import tune

    class MyLargeLGBM(LGBMEstimator):
        @classmethod
        def search_space(cls, **params):
            return {
                "n_estimators": {
                    "domain": tune.lograndint(lower=4, upper=32768),
                    "init_value": 32768,
                    "low_cost_init_value": 4,
                },
                "num_leaves": {
                    "domain": tune.lograndint(lower=4, upper=32768),
                    "init_value": 32768,
                    "low_cost_init_value": 4,
                },
            }
    """

    _ = broadcast_code(custom_code=custom_code)
    from flaml.tune.spark.mylearner import MyLargeLGBM

    assert isinstance(MyLargeLGBM(), LGBMEstimator)


def test_get_broadcast_data():
    data = ["a"] * 10
    spark = SparkSession.builder.getOrCreate()
    bc_data = spark.sparkContext.broadcast(data)
    assert get_broadcast_data(bc_data) == data


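# NOTE: to_pandas_on_spark converts plain pandas DataFrames/Series as well as Spark
# DataFrames into pandas-on-Spark (pyspark.pandas) objects; the captured-stdout
# checks pin the expected string representation for each input type.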
def test_to_pandas_on_spark(capsys):
    pdf = pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]})
    psdf = to_pandas_on_spark(pdf)
    print(psdf)
    captured = capsys.readouterr()
    assert captured.out == "   a  b\n0  1  4\n1  2  5\n2  3  6\n"
    assert isinstance(psdf, ps.DataFrame)

    spark = SparkSession.builder.getOrCreate()
    sdf = spark.createDataFrame(pdf)
    psdf = to_pandas_on_spark(sdf)
    print(psdf)
    captured = capsys.readouterr()
    assert captured.out == "   a  b\n0  1  4\n1  2  5\n2  3  6\n"
    assert isinstance(psdf, ps.DataFrame)

    pds = pd.Series([1, 2, 3])
    pss = to_pandas_on_spark(pds)
    print(pss)
    captured = capsys.readouterr()
    assert captured.out == "0    1\n1    2\n2    3\ndtype: int64\n"
    assert isinstance(pss, ps.Series)


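# NOTE: the single partition (repartition(1)) and fixed seed are presumably what keep
# the 50/50 split deterministic enough for the exact 2/2 row-count asserts; the second
# call also exercises stratified splitting via stratify_column="y".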
def test_train_test_split_pyspark():
    pdf = pd.DataFrame({"x": [1, 2, 3, 4], "y": [0, 1, 1, 0]})
    spark = SparkSession.builder.getOrCreate()
    sdf = spark.createDataFrame(pdf).repartition(1)
    psdf = to_pandas_on_spark(sdf).spark.repartition(1)
    train_sdf, test_sdf = train_test_split_pyspark(sdf, test_fraction=0.5, to_pandas_spark=False, seed=1)
    train_psdf, test_psdf = train_test_split_pyspark(psdf, test_fraction=0.5, stratify_column="y", seed=1)
    assert isinstance(train_sdf, pyspark.sql.dataframe.DataFrame)
    assert isinstance(test_sdf, pyspark.sql.dataframe.DataFrame)
    assert isinstance(train_psdf, ps.DataFrame)
    assert isinstance(test_psdf, ps.DataFrame)
    assert train_sdf.count() == 2
    assert train_psdf.shape[0] == 2
    print(train_sdf.toPandas())
    print(test_sdf.toPandas())
    print(train_psdf.to_pandas())
    print(test_psdf.to_pandas())


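# NOTE: unlike np.unique, the ordering asserted here is most-frequent-first
# ([2, 1, 3] with counts [2, 1, 1]) rather than sorted by value.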
def test_unique_pandas_on_spark():
    pdf = pd.DataFrame({"x": [1, 2, 2, 3], "y": [0, 1, 1, 0]})
    spark = SparkSession.builder.getOrCreate()
    sdf = spark.createDataFrame(pdf)
    psdf = to_pandas_on_spark(sdf)
    label_set, counts = unique_pandas_on_spark(psdf)
    assert np.array_equal(label_set, np.array([2, 1, 3]))
    assert np.array_equal(counts, np.array([2, 1, 1]))


def test_len_labels():
    y1 = np.array([1, 2, 5, 4, 5])
    y2 = ps.Series([1, 2, 5, 4, 5])
    assert len_labels(y1) == 4
    ll, la = len_labels(y2, return_labels=True)
    assert ll == 4
    assert set(la.to_numpy()) == set([1, 2, 5, 4])


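# NOTE: the expected outputs differ by input type: for the numpy array the unique
# values come back sorted ([1, 2, 4, 5]) with their first-occurrence indices, while
# for the pandas-on-Spark Series they come back in order of first occurrence
# ([1, 2, 5, 4]).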
def test_unique_value_first_index():
    y1 = np.array([1, 2, 5, 4, 5])
    y2 = ps.Series([1, 2, 5, 4, 5])
    l1, f1 = unique_value_first_index(y1)
    l2, f2 = unique_value_first_index(y2)
    assert np.array_equal(l1, np.array([1, 2, 4, 5]))
    assert np.array_equal(f1, np.array([0, 1, 3, 2]))
    assert np.array_equal(l2, np.array([1, 2, 5, 4]))
    assert np.array_equal(f2, np.array([0, 1, 2, 3]))


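# NOTE: get_n_current_trials below is a local re-implementation, for test purposes,
# of how the number of concurrent Spark trials is capped: the FLAML_MAX_CONCURRENT
# environment variable, the executor count, and spark.sparkContext.defaultParallelism
# all feed into the cap.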
def test_n_current_trials():
    spark = SparkSession.builder.getOrCreate()
    sc = spark._jsc.sc()
    num_executors = len([executor.host() for executor in sc.statusTracker().getExecutorInfos()]) - 1

    def get_n_current_trials(n_concurrent_trials=0, num_executors=num_executors):
        try:
            FLAML_MAX_CONCURRENT = int(os.getenv("FLAML_MAX_CONCURRENT", 0))
        except ValueError:
            FLAML_MAX_CONCURRENT = 0
        num_executors = max(num_executors, FLAML_MAX_CONCURRENT, 1)
        max_spark_parallelism = max(spark.sparkContext.defaultParallelism, FLAML_MAX_CONCURRENT)
        max_concurrent = max(1, max_spark_parallelism)
        n_concurrent_trials = min(
            n_concurrent_trials if n_concurrent_trials > 0 else num_executors,
            max_concurrent,
        )
        print("n_concurrent_trials:", n_concurrent_trials)
        return n_concurrent_trials

    # a non-numeric value must fall back to the default (0), i.e. be ignored
    os.environ["FLAML_MAX_CONCURRENT"] = "invalid"
    assert get_n_current_trials() == max(num_executors, 1)
    tmp_max = spark.sparkContext.defaultParallelism
    assert get_n_current_trials(1) == 1
    assert get_n_current_trials(2) == min(2, tmp_max)
    assert get_n_current_trials(50) == min(50, tmp_max)
    assert get_n_current_trials(200) == min(200, tmp_max)
    os.environ["FLAML_MAX_CONCURRENT"] = "0"
    assert get_n_current_trials() == max(num_executors, 1)
    os.environ["FLAML_MAX_CONCURRENT"] = "4"
    tmp_max = max(4, spark.sparkContext.defaultParallelism)
    assert get_n_current_trials() == min(4, tmp_max)
    os.environ["FLAML_MAX_CONCURRENT"] = "9999999"
    assert get_n_current_trials() == 9999999
    os.environ["FLAML_MAX_CONCURRENT"] = "100"
    tmp_max = max(100, spark.sparkContext.defaultParallelism)
    assert get_n_current_trials(1) == 1
    assert get_n_current_trials(2) == min(2, tmp_max)
    assert get_n_current_trials(50) == min(50, tmp_max)
    assert get_n_current_trials(200) == min(200, tmp_max)
    del os.environ["FLAML_MAX_CONCURRENT"]


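# NOTE: iloc_pandas_on_spark emulates positional .iloc-style indexing (single
# position, slice, or list of positions) on pandas-on-Spark objects; note that
# selecting with a list of positions resets the index (expected index is [0, 1]).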
def test_iloc_pandas_on_spark():
    psdf = ps.DataFrame({"x": [1, 2, 2, 3], "y": [0, 1, 1, 0]}, index=[0, 1, 2, 3])
    psds = ps.Series([1, 2, 2, 3], index=[0, 1, 2, 3])
    assert iloc_pandas_on_spark(psdf, 0).tolist() == [1, 0]
    d1 = iloc_pandas_on_spark(psdf, slice(1, 3)).to_pandas()
    d2 = pd.DataFrame({"x": [2, 2], "y": [1, 1]}, index=[1, 2])
    assert d1.equals(d2)
    d1 = iloc_pandas_on_spark(psdf, [1, 3]).to_pandas()
    d2 = pd.DataFrame({"x": [2, 3], "y": [1, 0]}, index=[0, 1])
    assert d1.equals(d2)
    assert iloc_pandas_on_spark(psds, 0) == 1
    assert iloc_pandas_on_spark(psds, slice(1, 3)).tolist() == [2, 2]
    assert iloc_pandas_on_spark(psds, [0, 3]).tolist() == [1, 3]


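# NOTE: spark_metric_loss_score returns a loss (smaller is better): score-style
# metrics appear to come back as 1 - score (e.g. 3/4 correct predictions show up as
# an accuracy loss of 0.25 below), while error metrics such as rmse/mae/mse are
# returned as-is. The asserts pin string prefixes of the expected values.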
def test_spark_metric_loss_score():
    spark = SparkSession.builder.getOrCreate()
    scoreAndLabels = map(
        lambda x: (Vectors.dense([1.0 - x[0], x[0]]), x[1]),
        [
            (0.1, 0.0),
            (0.1, 1.0),
            (0.4, 0.0),
            (0.6, 0.0),
            (0.6, 1.0),
            (0.6, 1.0),
            (0.8, 1.0),
        ],
    )
    dataset = spark.createDataFrame(scoreAndLabels, ["raw", "label"])
    dataset = to_pandas_on_spark(dataset)
    # test pr_auc
    metric = spark_metric_loss_score(
        "pr_auc",
        dataset["raw"],
        dataset["label"],
    )
    print("pr_auc: ", metric)
    assert str(metric)[:5] == "0.166"
    # test roc_auc
    metric = spark_metric_loss_score(
        "roc_auc",
        dataset["raw"],
        dataset["label"],
    )
    print("roc_auc: ", metric)
    assert str(metric)[:5] == "0.291"

    scoreAndLabels = [
        (-28.98343821, -27.0),
        (20.21491975, 21.5),
        (-25.98418959, -22.0),
        (30.69731842, 33.0),
        (74.69283752, 71.0),
    ]
    dataset = spark.createDataFrame(scoreAndLabels, ["raw", "label"])
    dataset = to_pandas_on_spark(dataset)
    # test rmse
    metric = spark_metric_loss_score(
        "rmse",
        dataset["raw"],
        dataset["label"],
    )
    print("rmse: ", metric)
    assert str(metric)[:5] == "2.842"
    # test mae
    metric = spark_metric_loss_score(
        "mae",
        dataset["raw"],
        dataset["label"],
    )
    print("mae: ", metric)
    assert str(metric)[:5] == "2.649"
    # test r2
    metric = spark_metric_loss_score(
        "r2",
        dataset["raw"],
        dataset["label"],
    )
    print("r2: ", metric)
    assert str(metric)[:5] == "0.006"
    # test mse
    metric = spark_metric_loss_score(
        "mse",
        dataset["raw"],
        dataset["label"],
    )
    print("mse: ", metric)
    assert str(metric)[:5] == "8.079"
    # test var
    metric = spark_metric_loss_score(
        "var",
        dataset["raw"],
        dataset["label"],
    )
    print("var: ", metric)
    assert str(metric)[:5] == "-1489"

    predictionAndLabelsWithProbabilities = [
        (1.0, 1.0, 1.0, [0.1, 0.8, 0.1]),
        (0.0, 2.0, 1.0, [0.9, 0.05, 0.05]),
        (0.0, 0.0, 1.0, [0.8, 0.2, 0.0]),
        (1.0, 1.0, 1.0, [0.3, 0.65, 0.05]),
    ]
    dataset = spark.createDataFrame(
        predictionAndLabelsWithProbabilities,
        ["prediction", "label", "weight", "probability"],
    )
    dataset = to_pandas_on_spark(dataset)
    # test logloss
    metric = spark_metric_loss_score(
        "log_loss",
        dataset["probability"],
        dataset["label"],
    )
    print("log_loss: ", metric)
    assert str(metric)[:5] == "0.968"
    # test accuracy
    metric = spark_metric_loss_score(
        "accuracy",
        dataset["prediction"],
        dataset["label"],
    )
    print("accuracy: ", metric)
    assert str(metric)[:5] == "0.25"
    # test f1
    metric = spark_metric_loss_score(
        "f1",
        dataset["prediction"],
        dataset["label"],
    )
    print("f1: ", metric)
    assert str(metric)[:5] == "0.333"

    scoreAndLabels = [
        ([0.0, 1.0], [0.0, 2.0]),
        ([0.0, 2.0], [0.0, 1.0]),
        ([], [0.0]),
        ([2.0], [2.0]),
        ([2.0, 0.0], [2.0, 0.0]),
        ([0.0, 1.0, 2.0], [0.0, 1.0]),
        ([1.0], [1.0, 2.0]),
    ]
    dataset = spark.createDataFrame(scoreAndLabels, ["prediction", "label"])
    dataset = to_pandas_on_spark(dataset)
    # test micro_f1
    metric = spark_metric_loss_score(
        "micro_f1",
        dataset["prediction"],
        dataset["label"],
    )
    print("micro_f1: ", metric)
    assert str(metric)[:5] == "0.304"
    # test macro_f1
    metric = spark_metric_loss_score(
        "macro_f1",
        dataset["prediction"],
        dataset["label"],
    )
    print("macro_f1: ", metric)
    assert str(metric)[:5] == "0.111"

    scoreAndLabels = [
        (
            [1.0, 6.0, 2.0, 7.0, 8.0, 3.0, 9.0, 10.0, 4.0, 5.0],
            [1.0, 2.0, 3.0, 4.0, 5.0],
        ),
        ([4.0, 1.0, 5.0, 6.0, 2.0, 7.0, 3.0, 8.0, 9.0, 10.0], [1.0, 2.0, 3.0]),
        ([1.0, 2.0, 3.0, 4.0, 5.0], []),
    ]
    dataset = spark.createDataFrame(scoreAndLabels, ["prediction", "label"])
    dataset = to_pandas_on_spark(dataset)
    # test ap
    metric = spark_metric_loss_score(
        "ap",
        dataset["prediction"],
        dataset["label"],
    )
    print("ap: ", metric)
    assert str(metric)[:5] == "0.644"
    # test ndcg
    # ndcg is tested in synapseML rank tests, so we don't need to test it here


if __name__ == "__main__":
    # test_with_parameters_spark()
    # test_get_n_cpus_spark()
    # test_broadcast_code()
    # test_get_broadcast_data()
    # test_train_test_split_pyspark()
    test_n_current_trials()
    # test_len_labels()
    # test_iloc_pandas_on_spark()
    test_spark_metric_loss_score()