autogen/flaml/automl/task/generic_task.py

import os
import logging
import time
from typing import List, Optional
import pandas as pd
import numpy as np
from scipy.sparse import issparse
from sklearn.utils import shuffle
from sklearn.model_selection import (
train_test_split,
RepeatedStratifiedKFold,
RepeatedKFold,
GroupKFold,
TimeSeriesSplit,
GroupShuffleSplit,
StratifiedGroupKFold,
)
from flaml.automl.data import TS_TIMESTAMP_COL, concat
from flaml.automl.ml import EstimatorSubclass, default_cv_score_agg_func, get_val_loss
from flaml.automl.model import (
XGBoostSklearnEstimator,
XGBoostLimitDepthEstimator,
RandomForestEstimator,
LGBMEstimator,
LRL1Classifier,
LRL2Classifier,
CatBoostEstimator,
ExtraTreesEstimator,
KNeighborsEstimator,
TransformersEstimator,
TransformersEstimatorModelSelection,
SparkLGBMEstimator,
)
from flaml.automl.task.task import (
Task,
get_classification_objective,
TS_FORECAST,
TS_FORECASTPANEL,
)
from flaml.config import RANDOM_SEED
try:
os.environ["PYARROW_IGNORE_TIMEZONE"] = "1"
from pyspark.sql.functions import col
import pyspark.pandas as ps
from pyspark.pandas import DataFrame as psDataFrame, Series as psSeries
from pyspark.pandas.config import set_option, reset_option
from flaml.automl.spark.utils import (
to_pandas_on_spark,
iloc_pandas_on_spark,
spark_kFold,
train_test_split_pyspark,
unique_pandas_on_spark,
unique_value_first_index,
len_labels,
)
from flaml.automl.spark.metrics import spark_metric_loss_score
except ImportError:
train_test_split_pyspark = None
unique_pandas_on_spark = None
iloc_pandas_on_spark = None
from flaml.automl.utils import (
len_labels,
unique_value_first_index,
)
ps = None
class psDataFrame:
pass
class psSeries:
pass
logger = logging.getLogger(__name__)
class GenericTask(Task):
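"""Concrete Task implementation for the standard AutoML workflows (classification,
regression, ranking, NLP, and time-series forecasting), with optional support for
pyspark.pandas dataframes and Spark estimators such as SparkLGBMEstimator."""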
estimators = {
"xgboost": XGBoostSklearnEstimator,
"xgb_limitdepth": XGBoostLimitDepthEstimator,
"rf": RandomForestEstimator,
"lgbm": LGBMEstimator,
"lrl1": LRL1Classifier,
"lrl2": LRL2Classifier,
"catboost": CatBoostEstimator,
"extra_tree": ExtraTreesEstimator,
"kneighbor": KNeighborsEstimator,
"transformer": TransformersEstimator,
"transformer_ms": TransformersEstimatorModelSelection,
"lgbm_spark": SparkLGBMEstimator,
}
def validate_data(
self,
automl,
state,
X_train_all,
y_train_all,
dataframe,
label,
X_val=None,
y_val=None,
groups_val=None,
groups=None,
):
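"""Validate the training (and optional validation) data and initialize the AutoML state.
Accepts either X_train_all/y_train_all or dataframe/label. Checks types and shapes
(numpy array, pandas or pyspark.pandas dataframe/series, or scipy sparse matrix),
caches pyspark.pandas inputs, applies the DataTransformer unless the input is a
pyspark.pandas dataframe or a sparse matrix, and stores the transformed data, groups,
and sample weights on `automl` and `state`.
"""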
if X_train_all is not None and y_train_all is not None:
assert isinstance(
X_train_all, (np.ndarray, pd.DataFrame, psDataFrame)
) or issparse(X_train_all), (
"X_train_all must be a numpy array, a pandas dataframe, "
"a Scipy sparse matrix or a pyspark.pandas dataframe."
)
assert isinstance(
y_train_all, (np.ndarray, pd.Series, psSeries)
), "y_train_all must be a numpy array, a pandas series or a pyspark.pandas series."
assert (
X_train_all.size != 0 and y_train_all.size != 0
), "Input data must not be empty."
if isinstance(X_train_all, np.ndarray) and len(X_train_all.shape) == 1:
X_train_all = np.reshape(X_train_all, (X_train_all.size, 1))
if isinstance(y_train_all, np.ndarray):
y_train_all = y_train_all.flatten()
assert (
X_train_all.shape[0] == y_train_all.shape[0]
), "# rows in X_train must match length of y_train."
if isinstance(X_train_all, psDataFrame):
X_train_all = (
X_train_all.spark.cache()
) # cache data to improve compute speed
y_train_all = y_train_all.to_frame().spark.cache()[y_train_all.name]
logger.debug(
f"X_train_all and y_train_all cached, shape of X_train_all: {X_train_all.shape}"
)
automl._df = isinstance(X_train_all, (pd.DataFrame, psDataFrame))
automl._nrow, automl._ndim = X_train_all.shape
if self.is_ts_forecast():
X_train_all = (
pd.DataFrame(X_train_all)
if isinstance(X_train_all, np.ndarray)
else X_train_all
)
X_train_all, y_train_all = self._validate_ts_data(
X_train_all, y_train_all
)
X, y = X_train_all, y_train_all
elif dataframe is not None and label is not None:
assert isinstance(
dataframe, (pd.DataFrame, psDataFrame)
), "dataframe must be a pandas DataFrame or a pyspark.pandas DataFrame."
assert (
label in dataframe.columns
), f"The provided label column name `{label}` doesn't exist in the provided dataframe."
if isinstance(dataframe, psDataFrame):
dataframe = (
dataframe.spark.cache()
) # cache data to improve compute speed
logger.debug(f"dataframe cached, shape of dataframe: {dataframe.shape}")
automl._df = True
if self.is_ts_forecast():
dataframe = self._validate_ts_data(dataframe)
# TODO: to support pyspark.sql.DataFrame and pure dataframe mode
X = dataframe.drop(columns=label)
automl._nrow, automl._ndim = X.shape
y = dataframe[label]
else:
raise ValueError("either X_train + y_train or dataframe + label is required")
# For NLP tasks, check the validity of the input columns; this depends on the task (is_nlp), not on the estimator
if self.is_nlp():
from flaml.automl.nlp.utils import is_a_list_of_str
is_all_str = True
is_all_list = True
for column in X.columns:
assert X[column].dtype.name in (
"object",
"string",
), "If the task is an NLP task, X can only contain text columns"
for _, each_cell in X[column].items():
if each_cell is not None:
is_str = isinstance(each_cell, str)
is_list_of_int = isinstance(each_cell, list) and all(
isinstance(x, int) for x in each_cell
)
is_list_of_str = is_a_list_of_str(each_cell)
if self.is_token_classification():
assert is_list_of_str, (
"For the token-classification task, the input column needs to be a list of strings "
"instead of a single string, e.g., ['EU', 'rejects', 'German', 'call', 'to', 'boycott', 'British', 'lamb', '.']. "
"For more examples, please refer to test/nlp/test_autohf_tokenclassification.py"
)
else:
assert is_str or is_list_of_int, (
"Each column of the input must either be str (untokenized) "
"or a list of integers (tokenized)"
)
is_all_str &= is_str
is_all_list &= is_list_of_int or is_list_of_str
assert is_all_str or is_all_list, (
"Currently FLAML only supports two modes for NLP: either all columns of X are string (non-tokenized), "
"or all columns of X are integer ids (tokenized)"
)
if isinstance(X, psDataFrame):
# TODO: support pyspark.pandas dataframe in DataTransformer
automl._skip_transform = True
if automl._skip_transform or issparse(X_train_all):
automl._transformer = automl._label_transformer = False
automl._X_train_all, automl._y_train_all = X, y
else:
from flaml.automl.data import DataTransformer
automl._transformer = DataTransformer()
(
automl._X_train_all,
automl._y_train_all,
) = automl._transformer.fit_transform(X, y, self)
automl._label_transformer = automl._transformer.label_transformer
if self.is_token_classification():
if hasattr(automl._label_transformer, "label_list"):
state.fit_kwargs.update(
{"label_list": automl._label_transformer.label_list}
)
elif "label_list" not in state.fit_kwargs:
for each_fit_kwargs in state.fit_kwargs_by_estimator.values():
assert "label_list" in each_fit_kwargs, (
"For the token-classification task, you must either (1) pass token labels; or (2) pass id labels and the label list. "
"Please refer to the documentation for more details: https://microsoft.github.io/FLAML/docs/Examples/AutoML-NLP#a-simple-token-classification-example"
)
automl._feature_names_in_ = (
automl._X_train_all.columns.to_list()
if hasattr(automl._X_train_all, "columns")
else None
)
automl._sample_weight_full = state.fit_kwargs.get(
"sample_weight"
) # NOTE: _validate_data is before kwargs is updated to fit_kwargs_by_estimator
if X_val is not None and y_val is not None:
assert isinstance(
X_val, (np.ndarray, pd.DataFrame, psDataFrame)
) or issparse(X_val), (
"X_val must be None, a numpy array, a pandas dataframe, "
"a Scipy sparse matrix or a pyspark.pandas dataframe."
)
assert isinstance(y_val, (np.ndarray, pd.Series, psSeries)), (
"y_val must be None, a numpy array, a pandas series "
"or a pyspark.pandas series."
)
assert X_val.size != 0 and y_val.size != 0, (
"Validation data are expected to be nonempty. "
"Use None for X_val and y_val if no validation data."
)
if isinstance(y_val, np.ndarray):
y_val = y_val.flatten()
assert (
X_val.shape[0] == y_val.shape[0]
), "# rows in X_val must match length of y_val."
if automl._transformer:
state.X_val = automl._transformer.transform(X_val)
else:
state.X_val = X_val
# If it's NLG_TASKS, y_val is a pandas series containing the output sequence tokens,
# so we cannot use label_transformer.transform to process it
if automl._label_transformer:
state.y_val = automl._label_transformer.transform(y_val)
else:
state.y_val = y_val
else:
state.X_val = state.y_val = None
if groups is not None and len(groups) != automl._nrow:
# groups is given as group counts
state.groups = np.concatenate([[i] * c for i, c in enumerate(groups)])
assert (
len(state.groups) == automl._nrow
), "the sum of group counts must match the number of examples"
state.groups_val = (
np.concatenate([[i] * c for i, c in enumerate(groups_val)])
if groups_val is not None
else None
)
else:
state.groups_val = groups_val
state.groups = groups
@staticmethod
def _validate_ts_data(
dataframe,
y_train_all=None,
):
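"""Check that the first column contains timestamp values, join y onto the dataframe when
given, drop duplicate rows, assert that no duplicate timestamps remain, and warn when the
inferred frequency (pandas only) suggests missing timestamps. Returns (X, y) when
y_train_all is provided, otherwise the validated dataframe."""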
assert (
dataframe[dataframe.columns[0]].dtype.name == "datetime64[ns]"
), f"For '{TS_FORECAST}' task, the first column must contain timestamp values."
if y_train_all is not None:
if isinstance(y_train_all, pd.Series):
y_df = pd.DataFrame(y_train_all)
elif isinstance(y_train_all, np.ndarray):
y_df = pd.DataFrame(y_train_all, columns=["labels"])
elif isinstance(y_train_all, (psDataFrame, psSeries)):
# TODO: optimize this
set_option("compute.ops_on_diff_frames", True)
y_df = y_train_all
dataframe = dataframe.join(y_df)
duplicates = dataframe.duplicated()
if isinstance(dataframe, psDataFrame):
if duplicates.any():
logger.warning("Duplicate timestamp values found in timestamp column.")
dataframe = dataframe.drop_duplicates()
logger.warning("Removed duplicate rows based on all columns")
assert (
not dataframe[[dataframe.columns[0]]].duplicated().any()
), "Duplicate timestamp values with different values for other columns."
ts_series = ps.to_datetime(dataframe[dataframe.columns[0]])
inferred_freq = None # TODO: `pd.infer_freq()` is not implemented yet.
else:
if any(duplicates):
logger.warning(
"Duplicate timestamp values found in timestamp column. "
f"\n{dataframe.loc[duplicates, dataframe.columns[0]]}"
)
dataframe = dataframe.drop_duplicates()
logger.warning("Removed duplicate rows based on all columns")
assert (
not dataframe[[dataframe.columns[0]]].duplicated().any()
), "Duplicate timestamp values with different values for other columns."
ts_series = pd.to_datetime(dataframe[dataframe.columns[0]])
inferred_freq = pd.infer_freq(ts_series)
if inferred_freq is None:
logger.warning(
"Missing timestamps detected. To avoid errors with estimators, set the estimator list to ['prophet']."
)
if y_train_all is not None:
return dataframe.iloc[:, :-1], dataframe.iloc[:, -1]
return dataframe
@staticmethod
def _split_pyspark(state, X_train_all, y_train_all, split_ratio, stratify=None):
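"""Split pyspark.pandas data into train/validation sets. Joins X, y (and the sample
weights, if present in state.fit_kwargs) into a single dataframe, splits it with
train_test_split_pyspark (optionally stratified on the label column), and returns
X_train, X_val, y_train, y_val plus the per-split sample weights when applicable."""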
# TODO: optimize this
set_option("compute.ops_on_diff_frames", True)
if not isinstance(y_train_all, (psDataFrame, psSeries)):
raise ValueError("y_train_all must be a pyspark.pandas dataframe or series")
df_all_in_one = X_train_all.join(y_train_all)
stratify_column = (
y_train_all.name
if isinstance(y_train_all, psSeries)
else y_train_all.columns[0]
)
ret_sample_weight = False
if (
"sample_weight" in state.fit_kwargs
): # NOTE: _prepare_data is before kwargs is updated to fit_kwargs_by_estimator
# fit_kwargs["sample_weight"] is a numpy array
ps_sample_weight = ps.DataFrame(
state.fit_kwargs["sample_weight"],
columns=["sample_weight"],
)
df_all_in_one = df_all_in_one.join(ps_sample_weight)
ret_sample_weight = True
df_all_train, df_all_val = train_test_split_pyspark(
df_all_in_one,
None if stratify is None else stratify_column,
test_fraction=split_ratio,
seed=RANDOM_SEED,
)
columns_to_drop = [
c for c in df_all_train.columns if c in [stratify_column, "sample_weight"]
]
X_train = df_all_train.drop(columns_to_drop)
X_val = df_all_val.drop(columns_to_drop)
y_train = df_all_train[stratify_column]
y_val = df_all_val[stratify_column]
if ret_sample_weight:
return (
X_train,
X_val,
y_train,
y_val,
df_all_train["sample_weight"],
df_all_val["sample_weight"],
)
return X_train, X_val, y_train, y_val
@staticmethod
def _train_test_split(
state, X, y, first=None, rest=None, split_ratio=0.2, stratify=None
):
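"""Dispatch a holdout split to sklearn's train_test_split or to _split_pyspark depending
on the input type, keeping state.fit_kwargs["sample_weight"] and state.weight_val
consistent with the split. `first` and `rest` are row indices used to keep sample
weights aligned when the caller has set aside the first occurrence of each label."""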
condition_type = isinstance(X, (psDataFrame, psSeries))
# NOTE: _prepare_data is before kwargs is updated to fit_kwargs_by_estimator
condition_param = "sample_weight" in state.fit_kwargs
if not condition_type and condition_param:
sample_weight = (
state.fit_kwargs["sample_weight"]
if rest is None
else state.fit_kwargs["sample_weight"][rest]
)
(
X_train,
X_val,
y_train,
y_val,
weight_train,
weight_val,
) = train_test_split(
X,
y,
sample_weight,
test_size=split_ratio,
stratify=stratify,
random_state=RANDOM_SEED,
)
if first is not None:
weight1 = state.fit_kwargs["sample_weight"][first]
state.weight_val = concat(weight1, weight_val)
state.fit_kwargs["sample_weight"] = concat(weight1, weight_train)
else:
state.weight_val = weight_val
state.fit_kwargs["sample_weight"] = weight_train
elif not condition_type and not condition_param:
X_train, X_val, y_train, y_val = train_test_split(
X,
y,
test_size=split_ratio,
stratify=stratify,
random_state=RANDOM_SEED,
)
elif condition_type and condition_param:
(
X_train,
X_val,
y_train,
y_val,
weight_train,
weight_val,
) = GenericTask._split_pyspark(state, X, y, split_ratio, stratify)
if first is not None:
weight1 = state.fit_kwargs["sample_weight"][first]
state.weight_val = concat(weight1, weight_val)
state.fit_kwargs["sample_weight"] = concat(weight1, weight_train)
else:
state.weight_val = weight_val
state.fit_kwargs["sample_weight"] = weight_train
else:
X_train, X_val, y_train, y_val = GenericTask._split_pyspark(
state, X, y, split_ratio, stratify
)
return X_train, X_val, y_train, y_val
def prepare_data(
self,
state,
X_train_all,
y_train_all,
auto_augment,
eval_method,
split_type,
split_ratio,
n_splits,
data_is_df,
sample_weight_full,
) -> int:
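"""Prepare training/validation data according to eval_method and split_type: augment
rare classes for classification, shuffle non-spark data when appropriate, build the
holdout split, or set up the cross-validation splitter on `state.kf` (GroupKFold,
RepeatedStratifiedKFold, TimeSeriesSplit, RepeatedKFold, or a user-provided splitter)."""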
X_val, y_val = state.X_val, state.y_val
if issparse(X_val):
X_val = X_val.tocsr()
if issparse(X_train_all):
X_train_all = X_train_all.tocsr()
is_spark_dataframe = isinstance(X_train_all, (psDataFrame, psSeries))
self.is_spark_dataframe = is_spark_dataframe
if (
self.is_classification()
and auto_augment
and state.fit_kwargs.get("sample_weight")
is None # NOTE: _prepare_data is before kwargs is updated to fit_kwargs_by_estimator
and split_type in ["stratified", "uniform"]
and not self.is_token_classification()
):
# logger.info(f"label {pd.unique(y_train_all)}")
if is_spark_dataframe:
label_set, counts = unique_pandas_on_spark(y_train_all)
# TODO: optimize this
set_option("compute.ops_on_diff_frames", True)
else:
label_set, counts = np.unique(y_train_all, return_counts=True)
# augment rare classes
rare_threshold = 20
rare = counts < rare_threshold
rare_label, rare_counts = label_set[rare], counts[rare]
for i, label in enumerate(rare_label.tolist()):
count = rare_count = rare_counts[i]
rare_index = y_train_all == label
n = len(y_train_all)
while count < rare_threshold:
if data_is_df:
X_train_all = concat(
X_train_all, X_train_all.iloc[:n].loc[rare_index]
)
else:
X_train_all = concat(
X_train_all, X_train_all[:n][rare_index, :]
)
if isinstance(y_train_all, (pd.Series, psSeries)):
y_train_all = concat(
y_train_all, y_train_all.iloc[:n].loc[rare_index]
)
else:
y_train_all = np.concatenate(
[y_train_all, y_train_all[:n][rare_index]]
)
count += rare_count
logger.info(f"class {label} augmented from {rare_count} to {count}")
SHUFFLE_SPLIT_TYPES = ["uniform", "stratified"]
if is_spark_dataframe:
# no need to shuffle pyspark dataframe
pass
elif split_type in SHUFFLE_SPLIT_TYPES:
if sample_weight_full is not None:
X_train_all, y_train_all, state.sample_weight_all = shuffle(
X_train_all,
y_train_all,
sample_weight_full,
random_state=RANDOM_SEED,
)
state.fit_kwargs[
"sample_weight"
] = (
state.sample_weight_all
) # NOTE: _prepare_data is before kwargs is updated to fit_kwargs_by_estimator
if isinstance(state.sample_weight_all, pd.Series):
state.sample_weight_all.reset_index(drop=True, inplace=True)
else:
X_train_all, y_train_all = shuffle(
X_train_all, y_train_all, random_state=RANDOM_SEED
)
if data_is_df:
X_train_all.reset_index(drop=True, inplace=True)
if isinstance(y_train_all, pd.Series):
y_train_all.reset_index(drop=True, inplace=True)
X_train, y_train = X_train_all, y_train_all
state.groups_all = state.groups
if X_val is None and eval_method == "holdout":
# if eval_method = holdout, make holdout data
if split_type == "time":
if self.is_ts_forecast():
period = state.fit_kwargs[
"period"
] # NOTE: _prepare_data is before kwargs is updated to fit_kwargs_by_estimator
if self.is_ts_forecastpanel():
X_train_all["time_idx"] -= X_train_all["time_idx"].min()
X_train_all["time_idx"] = X_train_all["time_idx"].astype("int")
ids = state.fit_kwargs["group_ids"].copy()
ids.append(TS_TIMESTAMP_COL)
ids.append("time_idx")
y_train_all = (
pd.DataFrame(y_train_all)
if not is_spark_dataframe
else ps.DataFrame(y_train_all)
if isinstance(y_train_all, psSeries)
else y_train_all
)
y_train_all[ids] = X_train_all[ids]
X_train_all = X_train_all.sort_values(ids)
y_train_all = y_train_all.sort_values(ids)
training_cutoff = X_train_all["time_idx"].max() - period
X_train = X_train_all[
X_train_all["time_idx"] <= training_cutoff
]
y_train = y_train_all[
y_train_all["time_idx"] <= training_cutoff
].drop(columns=ids)
X_val = X_train_all[X_train_all["time_idx"] > training_cutoff]
y_val = y_train_all[
y_train_all["time_idx"] > training_cutoff
].drop(columns=ids)
else:
num_samples = X_train_all.shape[0]
assert (
period < num_samples
), f"period={period} must be smaller than the number of examples {num_samples}."
split_idx = num_samples - period
X_train = X_train_all[:split_idx]
y_train = y_train_all[:split_idx]
X_val = X_train_all[split_idx:]
y_val = y_train_all[split_idx:]
else:
is_sample_weight = "sample_weight" in state.fit_kwargs
if not is_spark_dataframe and is_sample_weight:
(
X_train,
X_val,
y_train,
y_val,
state.fit_kwargs[
"sample_weight"
], # NOTE: _prepare_data is before kwargs is updated to fit_kwargs_by_estimator
state.weight_val,
) = train_test_split(
X_train_all,
y_train_all,
state.fit_kwargs[
"sample_weight"
], # NOTE: _prepare_data is before kwargs is updated to fit_kwargs_by_estimator
test_size=split_ratio,
shuffle=False,
)
elif not is_spark_dataframe and not is_sample_weight:
X_train, X_val, y_train, y_val = train_test_split(
X_train_all,
y_train_all,
test_size=split_ratio,
shuffle=False,
)
elif is_spark_dataframe and is_sample_weight:
(
X_train,
X_val,
y_train,
y_val,
state.fit_kwargs[
"sample_weight"
], # NOTE: _prepare_data is before kwargs is updated to fit_kwargs_by_estimator
state.weight_val,
) = self._split_pyspark(
state, X_train_all, y_train_all, split_ratio
)
else:
X_train, X_val, y_train, y_val = self._split_pyspark(
state, X_train_all, y_train_all, split_ratio
)
elif split_type == "group":
gss = GroupShuffleSplit(
n_splits=1, test_size=split_ratio, random_state=RANDOM_SEED
)
for train_idx, val_idx in gss.split(
X_train_all, y_train_all, state.groups_all
):
if data_is_df:
X_train = X_train_all.iloc[train_idx]
X_val = X_train_all.iloc[val_idx]
else:
X_train, X_val = X_train_all[train_idx], X_train_all[val_idx]
y_train, y_val = y_train_all[train_idx], y_train_all[val_idx]
state.groups = state.groups_all[train_idx]
state.groups_val = state.groups_all[val_idx]
elif self.is_classification():
# for classification, make sure the labels are complete in both
# training and validation data
label_set, first = unique_value_first_index(y_train_all)
rest = []
last = 0
first.sort()
for i in range(len(first)):
rest.extend(range(last, first[i]))
last = first[i] + 1
rest.extend(range(last, len(y_train_all)))
X_first = X_train_all.iloc[first] if data_is_df else X_train_all[first]
X_rest = X_train_all.iloc[rest] if data_is_df else X_train_all[rest]
y_rest = (
y_train_all[rest]
if isinstance(y_train_all, np.ndarray)
else iloc_pandas_on_spark(y_train_all, rest)
if is_spark_dataframe
else y_train_all.iloc[rest]
)
stratify = y_rest if split_type == "stratified" else None
X_train, X_val, y_train, y_val = self._train_test_split(
state, X_rest, y_rest, first, rest, split_ratio, stratify
)
X_train = concat(X_first, X_train)
y_train = (
concat(label_set, y_train)
if data_is_df
else np.concatenate([label_set, y_train])
)
X_val = concat(X_first, X_val)
y_val = (
concat(label_set, y_val)
if data_is_df
else np.concatenate([label_set, y_val])
)
elif self.is_regression():
X_train, X_val, y_train, y_val = self._train_test_split(
state, X_train_all, y_train_all, split_ratio=split_ratio
)
state.data_size = X_train.shape
state.X_train, state.y_train = X_train, y_train
state.X_val, state.y_val = X_val, y_val
state.X_train_all = X_train_all
state.y_train_all = y_train_all
y_train_all_size = y_train_all.size
if eval_method == "holdout":
state.kf = None
return
if split_type == "group":
# logger.info("Using GroupKFold")
assert (
len(state.groups_all) == y_train_all_size
), "the length of groups must match the number of examples"
assert (
len_labels(state.groups_all) >= n_splits
), "the number of groups must be equal to or larger than n_splits"
state.kf = GroupKFold(n_splits)
elif split_type == "stratified":
# logger.info("Using StratifiedKFold")
assert y_train_all_size >= n_splits, (
f"{n_splits}-fold cross validation"
f" requires input data with at least {n_splits} examples."
)
assert y_train_all_size >= 2 * n_splits, (
f"{n_splits}-fold cross validation with metric=r2 "
f"requires input data with at least {n_splits*2} examples."
)
state.kf = RepeatedStratifiedKFold(
n_splits=n_splits, n_repeats=1, random_state=RANDOM_SEED
)
elif split_type == "time":
# logger.info("Using TimeSeriesSplit")
if self.is_ts_forecast() and not self.is_ts_forecastpanel():
period = state.fit_kwargs[
"period"
] # NOTE: _prepare_data is before kwargs is updated to fit_kwargs_by_estimator
if period * (n_splits + 1) > y_train_all_size:
n_splits = int(y_train_all_size / period - 1)
assert n_splits >= 2, (
f"cross validation for forecasting period={period}"
f" requires input data with at least {3 * period} examples."
)
logger.info(f"Using n_splits={n_splits} due to data size limit.")
state.kf = TimeSeriesSplit(n_splits=n_splits, test_size=period)
elif self.is_ts_forecastpanel():
n_groups = len(
X_train.groupby(state.fit_kwargs.get("group_ids")).size()
)
period = state.fit_kwargs.get("period")
state.kf = TimeSeriesSplit(
n_splits=n_splits, test_size=period * n_groups
)
else:
state.kf = TimeSeriesSplit(n_splits=n_splits)
elif isinstance(split_type, str):
# logger.info("Using RepeatedKFold")
state.kf = RepeatedKFold(
n_splits=n_splits, n_repeats=1, random_state=RANDOM_SEED
)
else:
# logger.info("Using splitter object")
state.kf = split_type
if isinstance(state.kf, (GroupKFold, StratifiedGroupKFold)):
# self._split_type is either "group", a GroupKFold object, or a StratifiedGroupKFold object
state.kf.groups = state.groups_all
def decide_split_type(
self,
split_type,
y_train_all,
fit_kwargs,
groups=None,
) -> str:
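"""Resolve the split strategy for this task. Returns the validated splitter object when
one is passed, otherwise a split type string such as "time", "stratified", "uniform",
or "group" derived from the task type, the fit kwargs, and whether groups are given."""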
if self.name == "classification":
self.name = get_classification_objective(len_labels(y_train_all))
if not isinstance(split_type, str):
assert hasattr(split_type, "split") and hasattr(
split_type, "get_n_splits"
), "split_type must be a string or a splitter object with split and get_n_splits methods."
assert (
not isinstance(split_type, GroupKFold) or groups is not None
), "GroupKFold requires groups to be provided."
return split_type
elif self.is_ts_forecast():
assert split_type in ["auto", "time"]
assert isinstance(
fit_kwargs.get("period"),
int, # NOTE: _decide_split_type is before kwargs is updated to fit_kwargs_by_estimator
), f"missing a required integer 'period' for '{TS_FORECAST}' task."
if fit_kwargs.get("group_ids"):
# TODO (MARK) This will likely not play well with the task class
self.name = TS_FORECASTPANEL
assert isinstance(
fit_kwargs.get("group_ids"), list
), f"missing a required List[str] 'group_ids' for '{TS_FORECASTPANEL}' task."
return "time"
elif self.is_classification():
assert split_type in ["auto", "stratified", "uniform", "time", "group"]
return (
split_type
if split_type != "auto"
else ("stratified" if groups is None else "group")
)
elif self.is_regression():
assert split_type in ["auto", "uniform", "time", "group"]
return split_type if split_type != "auto" else "uniform"
elif self.is_rank():
assert groups is not None, "groups must be specified for ranking task."
assert split_type in ["auto", "group"]
return "group"
elif self.is_nlg():
assert split_type in ["auto", "uniform", "time", "group"]
return split_type if split_type != "auto" else "uniform"
def preprocess(self, X, transformer=None):
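"""Convert raw prediction input (list, sparse matrix, pandas or pyspark.pandas dataframe)
into the format expected by the estimators and apply the fitted transformer if provided."""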
if isinstance(X, List):
try:
if isinstance(X[0], List):
X = [x for x in zip(*X)]
X = pd.DataFrame(
dict(
[
(transformer._str_columns[idx], X[idx])
if isinstance(X[0], List)
else (transformer._str_columns[idx], [X[idx]])
for idx in range(len(X))
]
)
)
except IndexError:
raise IndexError(
"Test data contains more columns than training data, exiting"
)
elif isinstance(X, int):
return X
elif isinstance(X, psDataFrame):
return X
elif issparse(X):
X = X.tocsr()
if self.is_ts_forecast():
X = pd.DataFrame(X)
if transformer:
X = transformer.transform(X)
return X
def evaluate_model_CV(
self,
config: dict,
estimator: EstimatorSubclass,
X_train_all,
y_train_all,
budget,
kf,
eval_metric,
best_val_loss,
cv_score_agg_func=None,
log_training_metric=False,
fit_kwargs: Optional[dict] = None,
free_mem_ratio=0,
):
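"""Evaluate a configuration with cross validation. Iterates over the folds produced by
`kf` (or by spark_kFold for pyspark.pandas data), computes each fold's validation loss
with get_val_loss, and aggregates the fold results with cv_score_agg_func.
Returns (val_loss, metric, train_time, pred_time)."""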
if fit_kwargs is None:
fit_kwargs = {}
if cv_score_agg_func is None:
cv_score_agg_func = default_cv_score_agg_func
start_time = time.time()
val_loss_folds = []
log_metric_folds = []
metric = None
train_time = pred_time = 0
total_fold_num = 0
n = kf.get_n_splits()
rng = np.random.RandomState(2020)
budget_per_train = budget and budget / n
groups = None
if self.is_classification():
_, labels = len_labels(y_train_all, return_labels=True)
else:
labels = fit_kwargs.get(
"label_list"
) # pass the label list on to compute the evaluation metric
if "sample_weight" in fit_kwargs:
weight = fit_kwargs["sample_weight"]
weight_val = None
else:
weight = weight_val = None
is_spark_dataframe = isinstance(X_train_all, (psDataFrame, psSeries))
if is_spark_dataframe:
dataframe = X_train_all.join(y_train_all)
if weight is not None:
dataframe = dataframe.join(weight)
if isinstance(kf, (GroupKFold, StratifiedGroupKFold)):
groups = kf.groups
dataframe = dataframe.join(groups)
kf = spark_kFold(
dataframe, nFolds=n, foldCol=groups.name if groups is not None else ""
)
shuffle = False
else:
X_train_split, y_train_split = X_train_all, y_train_all
shuffle = getattr(kf, "shuffle", not self.is_ts_forecast())
if isinstance(kf, RepeatedStratifiedKFold):
kf = kf.split(X_train_split, y_train_split)
elif isinstance(kf, (GroupKFold, StratifiedGroupKFold)):
groups = kf.groups
kf = kf.split(X_train_split, y_train_split, groups)
shuffle = False
elif isinstance(kf, TimeSeriesSplit):
kf = kf.split(X_train_split, y_train_split)
else:
kf = kf.split(X_train_split)
for train_index, val_index in kf:
if shuffle:
train_index = rng.permutation(train_index)
if is_spark_dataframe:
# cache data to increase compute speed
X_train = train_index.spark.cache()
X_val = val_index.spark.cache()
y_train = X_train.pop(y_train_all.name)
y_val = X_val.pop(y_train_all.name)
if weight is not None:
weight_val = X_val.pop(weight.name)
fit_kwargs["sample_weight"] = X_train.pop(weight.name)
groups_val = None
elif isinstance(X_train_all, pd.DataFrame):
X_train = X_train_split.iloc[train_index]
X_val = X_train_split.iloc[val_index]
else:
X_train, X_val = X_train_split[train_index], X_train_split[val_index]
if not is_spark_dataframe:
y_train, y_val = y_train_split[train_index], y_train_split[val_index]
if weight is not None:
fit_kwargs["sample_weight"], weight_val = (
weight[train_index],
weight[val_index],
)
if groups is not None:
fit_kwargs["groups"] = (
groups[train_index]
if isinstance(groups, np.ndarray)
else groups.iloc[train_index]
)
groups_val = (
groups[val_index]
if isinstance(groups, np.ndarray)
else groups.iloc[val_index]
)
else:
groups_val = None
estimator.cleanup()
val_loss_i, metric_i, train_time_i, pred_time_i = get_val_loss(
config,
estimator,
X_train,
y_train,
X_val,
y_val,
weight_val,
groups_val,
eval_metric,
self,
labels,
budget_per_train,
log_training_metric=log_training_metric,
fit_kwargs=fit_kwargs,
free_mem_ratio=free_mem_ratio,
)
if isinstance(metric_i, dict) and "intermediate_results" in metric_i:
del metric_i["intermediate_results"]
if weight is not None:
fit_kwargs["sample_weight"] = weight
total_fold_num += 1
val_loss_folds.append(val_loss_i)
log_metric_folds.append(metric_i)
train_time += train_time_i
pred_time += pred_time_i
if is_spark_dataframe:
X_train.spark.unpersist() # uncache data to free memory
X_val.spark.unpersist() # uncache data to free memory
if budget and time.time() - start_time >= budget:
break
val_loss, metric = cv_score_agg_func(val_loss_folds, log_metric_folds)
n = total_fold_num
pred_time /= n
return val_loss, metric, train_time, pred_time
def default_estimator_list(
self, estimator_list: List[str], is_spark_dataframe: bool = False
) -> List[str]:
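"""Return the estimator list to search. A user-provided list is filtered by whether the
data is a Spark dataframe (only `_spark` estimators) or not (only non-`_spark`
estimators); "auto" expands to a task-dependent default list, filtered the same way."""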
if "auto" != estimator_list:
n_estimators = len(estimator_list)
if is_spark_dataframe:
# For spark dataframe, only estimators ending with '_spark' are supported
estimator_list = [
est for est in estimator_list if est.endswith("_spark")
]
if len(estimator_list) == 0:
raise ValueError(
"Spark dataframes only support estimator names ending with `_spark`. Unsupported "
"estimators were removed, and no supported estimator is left."
)
elif n_estimators != len(estimator_list):
logger.warning(
"Spark dataframes only support estimator names ending with `_spark`. Unsupported "
"estimators were removed."
)
else:
# For non-spark dataframe, only estimators not ending with '_spark' are supported
estimator_list = [
est for est in estimator_list if not est.endswith("_spark")
]
if len(estimator_list) == 0:
raise ValueError(
"Non-spark dataframes only support estimator names not ending with `_spark`. Unsupported "
"estimators were removed, and no supported estimator is left."
)
elif n_estimators != len(estimator_list):
logger.warning(
"Non-spark dataframes only support estimator names not ending with `_spark`. Unsupported "
"estimators were removed."
)
return estimator_list
if self.is_rank():
estimator_list = ["lgbm", "xgboost", "xgb_limitdepth", "lgbm_spark"]
elif self.is_nlp():
estimator_list = ["transformer"]
elif self.is_ts_forecastpanel():
estimator_list = ["tft"]
else:
try:
import catboost
estimator_list = [
"lgbm",
"rf",
"catboost",
"xgboost",
"extra_tree",
"xgb_limitdepth",
"lgbm_spark",
]
except ImportError:
estimator_list = [
"lgbm",
"rf",
"xgboost",
"extra_tree",
"xgb_limitdepth",
"lgbm_spark",
]
if self.is_ts_forecast():
# catboost is removed because it has a `name` parameter, making it incompatible with hcrystalball
if "catboost" in estimator_list:
estimator_list.remove("catboost")
if self.is_ts_forecastregression():
try:
import prophet
estimator_list += ["prophet", "arima", "sarimax"]
except ImportError:
estimator_list += ["arima", "sarimax"]
elif not self.is_regression():
estimator_list += ["lrl1"]
estimator_list = [
est
for est in estimator_list
if (
est.endswith("_spark")
if is_spark_dataframe
else not est.endswith("_spark")
)
]
return estimator_list
def default_metric(self, metric: str) -> str:
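"""Return the default optimization metric for this task when metric="auto"."""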
if "auto" != metric:
return metric
if self.is_nlp():
from flaml.automl.nlp.utils import (
load_default_huggingface_metric_for_task,
)
return load_default_huggingface_metric_for_task(self.name)
elif self.is_binary():
return "roc_auc"
elif self.is_multiclass():
return "log_loss"
elif self.is_ts_forecast():
return "mape"
elif self.is_rank():
return "ndcg"
else:
return "r2"