
'''!
* Copyright (c) 2020-2021 Microsoft Corporation. All rights reserved.
* Licensed under the MIT License. See LICENSE file in the
* project root for license information.
'''
from typing import Dict, Optional, Tuple
import numpy as np
try:
from ray import __version__ as ray_version
assert ray_version >= '1.0.0'
from ray.tune.suggest import Searcher
from ray.tune.suggest.variant_generator import generate_variants
from ray.tune import sample
from ray.tune.utils.util import flatten_dict, unflatten_dict
except (ImportError, AssertionError):
from .suggestion import Searcher
from .variant_generator import generate_variants, flatten_dict, unflatten_dict
from ..tune import sample
from ..tune.space import complete_config, denormalize, normalize
import logging
logger = logging.getLogger(__name__)
class FLOW2(Searcher):
'''Local search algorithm FLOW2, with adaptive step size
'''
STEPSIZE = 0.1
STEP_LOWER_BOUND = 0.0001
def __init__(self,
init_config: dict,
metric: Optional[str] = None,
mode: Optional[str] = None,
space: Optional[dict] = None,
prune_attr: Optional[str] = None,
min_resource: Optional[float] = None,
max_resource: Optional[float] = None,
resource_multiple_factor: Optional[float] = 4,
cost_attr: Optional[str] = 'time_total_s',
seed: Optional[int] = 20):
'''Constructor
Args:
        init_config: a dictionary of a partial or full initial config,
            e.g., a mapping from a subset of controlled dimensions
            to their initial low-cost values, such as {'epochs': 1}.
metric: A string of the metric name to optimize for.
mode: A string in ['min', 'max'] to specify the objective as
minimization or maximization.
space: A dictionary to specify the search space.
prune_attr: A string of the attribute used for pruning.
Not necessarily in space.
When prune_attr is in space, it is a hyperparameter, e.g.,
'n_iters', and the best value is unknown.
When prune_attr is not in space, it is a resource dimension,
e.g., 'sample_size', and the peak performance is assumed
to be at the max_resource.
min_resource: A float of the minimal resource to use for the
prune_attr; only valid if prune_attr is not in space.
max_resource: A float of the maximal resource to use for the
prune_attr; only valid if prune_attr is not in space.
resource_multiple_factor: A float of the multiplicative factor
used for increasing resource.
cost_attr: A string of the attribute used for cost.
seed: An integer of the random seed.
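        Below is a minimal usage sketch; the search space shown is
        illustrative (not a default) and assumes the sampling functions
        exported by flaml.tune:
        .. code-block:: python
            from flaml import tune
            searcher = FLOW2(
                init_config={'epochs': 1},
                metric='loss', mode='min',
                space={
                    'epochs': tune.randint(1, 10),
                    'lr': tune.loguniform(1e-5, 1e-1),
                })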
'''
if mode:
assert mode in ["min", "max"], "`mode` must be 'min' or 'max'."
else:
mode = "min"
super(FLOW2, self).__init__(
metric=metric,
mode=mode)
# internally minimizes, so "max" => -1
if mode == "max":
self.metric_op = -1.
elif mode == "min":
self.metric_op = 1.
self.space = space or {}
self._space = flatten_dict(self.space, prevent_delimiter=True)
self._random = np.random.RandomState(seed)
self._seed = seed
self.init_config = init_config
self.best_config = flatten_dict(init_config)
self.prune_attr = prune_attr
self.min_resource = min_resource
self.resource_multiple_factor = resource_multiple_factor or 4
self.cost_attr = cost_attr
self.max_resource = max_resource
self._resource = None
        self._step_lb = np.inf
if space:
self._init_search()
def _init_search(self):
self._tunable_keys = []
self._bounded_keys = []
self._unordered_cat_hp = {}
hier = False
for key, domain in self._space.items():
assert not (isinstance(domain, dict) and 'grid_search' in domain), \
f"{key}'s domain is grid search, not supported in FLOW^2."
if callable(getattr(domain, 'get_sampler', None)):
self._tunable_keys.append(key)
sampler = domain.get_sampler()
# the step size lower bound for uniform variables doesn't depend
# on the current config
if isinstance(sampler, sample.Quantized):
q = sampler.q
sampler = sampler.get_sampler()
if str(sampler) == 'Uniform':
self._step_lb = min(
self._step_lb, q / (domain.upper - domain.lower))
elif isinstance(domain, sample.Integer) and str(sampler) == 'Uniform':
self._step_lb = min(
self._step_lb, 1.0 / (domain.upper - 1 - domain.lower))
if isinstance(domain, sample.Categorical):
if not domain.ordered:
self._unordered_cat_hp[key] = len(domain.categories)
if not hier:
for cat in domain.categories:
if isinstance(cat, dict):
hier = True
break
if str(sampler) != 'Normal':
self._bounded_keys.append(key)
if not hier:
self._space_keys = sorted(self._tunable_keys)
self._hierarchical = hier
if (self.prune_attr and self.prune_attr not in self._space
and self.max_resource):
self.min_resource = self.min_resource or self._min_resource()
self._resource = self._round(self.min_resource)
if not hier:
self._space_keys.append(self.prune_attr)
else:
self._resource = None
self.incumbent = {}
self.incumbent = self.normalize(self.best_config) # flattened
self.best_obj = self.cost_incumbent = None
self.dim = len(self._tunable_keys) # total # tunable dimensions
self._direction_tried = None
self._num_complete4incumbent = self._cost_complete4incumbent = 0
self._num_allowed4incumbent = 2 * self.dim
self._proposed_by = {} # trial_id: int -> incumbent: Dict
self.step_ub = np.sqrt(self.dim)
self.step = self.STEPSIZE * self.step_ub
lb = self.step_lower_bound
if lb > self.step:
self.step = lb * 2
# upper bound
if self.step > self.step_ub:
self.step = self.step_ub
# maximal # consecutive no improvements
self.dir = 2**(min(9, self.dim))
self._configs = {} # dict from trial_id to (config, stepsize)
self._K = 0
self._iter_best_config = self.trial_count_proposed = self.trial_count_complete = 1
self._num_proposedby_incumbent = 0
self._reset_times = 0
# record intermediate trial cost
self._trial_cost = {}
self._same = False # whether the proposed config is the same as best_config
self._init_phase = True # initial phase to increase initial stepsize
self._trunc = 0
# no truncation by default. when > 0, it means how many
# non-zero dimensions to keep in the random unit vector
@property
def step_lower_bound(self) -> float:
step_lb = self._step_lb
for key in self._tunable_keys:
if key not in self.best_config:
continue
domain = self._space[key]
sampler = domain.get_sampler()
# the stepsize lower bound for log uniform variables depends on the
# current config
if isinstance(sampler, sample.Quantized):
q = sampler.q
sampler_inner = sampler.get_sampler()
if str(sampler_inner) == 'LogUniform':
step_lb = min(
step_lb, np.log(1.0 + q / self.best_config[key])
/ np.log(domain.upper / domain.lower))
elif isinstance(domain, sample.Integer) and str(sampler) == 'LogUniform':
step_lb = min(
step_lb, np.log(1.0 + 1.0 / self.best_config[key])
/ np.log((domain.upper - 1) / domain.lower))
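            # Worked example (hypothetical values): for an integer domain
            # with lower=1, upper=128 and best_config[key] == 8, the bound
            # above is log(1 + 1/8) / log(127 / 1) ~= 0.024 before the
            # final scaling by step_ub, i.e. roughly the smallest
            # normalized move that can change the rounded value.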
if np.isinf(step_lb):
step_lb = self.STEP_LOWER_BOUND
else:
step_lb *= self.step_ub
return step_lb
@property
def resource(self) -> float:
return self._resource
def _min_resource(self) -> float:
''' automatically decide minimal resource
'''
        return self.max_resource / np.power(self.resource_multiple_factor, 5)
def _round(self, resource) -> float:
''' round the resource to self.max_resource if close to it
'''
if resource * self.resource_multiple_factor > self.max_resource:
return self.max_resource
return resource
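    # Illustrative schedule (hypothetical numbers): with max_resource=10000
    # and resource_multiple_factor=4, _min_resource() returns
    # 10000 / 4**5 ~= 9.8, and repeatedly multiplying by the factor visits
    # roughly 9.8 -> 39 -> 156 -> 625 -> 2500 -> 10000, where _round snaps
    # the final value to max_resource.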
def rand_vector_gaussian(self, dim, std=1.0):
vec = self._random.normal(0, std, dim)
return vec
def complete_config(
self, partial_config: Dict,
lower: Optional[Dict] = None, upper: Optional[Dict] = None
) -> Tuple[Dict, Dict]:
        ''' generate a complete config from the partial config input;
        add minimal resource to config if available
'''
disturb = self._reset_times and partial_config == self.init_config
# if not the first time to complete init_config, use random gaussian
config, space = complete_config(
partial_config, self.space, self, disturb, lower, upper)
if partial_config == self.init_config:
self._reset_times += 1
if self._resource:
config[self.prune_attr] = self.min_resource
return config, space
def create(self, init_config: Dict, obj: float, cost: float, space: Dict
) -> Searcher:
# space is the subspace where the init_config is located
flow2 = self.__class__(
init_config, self.metric, self.mode,
space, self.prune_attr,
self.min_resource, self.max_resource,
self.resource_multiple_factor, self.cost_attr, self._seed + 1)
flow2.best_obj = obj * self.metric_op # minimize internally
flow2.cost_incumbent = cost
self._seed += 1
return flow2
def normalize(self, config, recursive=False) -> Dict:
''' normalize each dimension in config to [0,1]
'''
return normalize(
config, self._space, self.best_config, self.incumbent, recursive)
def denormalize(self, config):
''' denormalize each dimension in config from [0,1]
'''
return denormalize(
config, self._space, self.best_config, self.incumbent, self._random)
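    # Example of the mapping (a sketch over a hypothetical space): with
    # space = {'lr': tune.loguniform(1e-5, 1e-1)}, normalize maps
    # {'lr': 1e-3} to {'lr': 0.5}, since log(1e-3 / 1e-5) / log(1e-1 / 1e-5)
    # = 2 / 4; denormalize inverts it. FLOW2 takes its steps in this
    # normalized unit cube.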
def set_search_properties(self,
metric: Optional[str] = None,
mode: Optional[str] = None,
config: Optional[Dict] = None) -> bool:
if metric:
self._metric = metric
if mode:
assert mode in ["min", "max"], "`mode` must be 'min' or 'max'."
self._mode = mode
if mode == "max":
self.metric_op = -1.
elif mode == "min":
self.metric_op = 1.
if config:
self.space = config
self._space = flatten_dict(self.space)
self._init_search()
return True
def on_trial_complete(self, trial_id: str, result: Optional[Dict] = None,
error: bool = False):
''' compare with incumbent
'''
# if better, move, reset num_complete and num_proposed
# if not better and num_complete >= 2*dim, num_allowed += 2
self.trial_count_complete += 1
if not error and result:
obj = result.get(self._metric)
if obj:
obj *= self.metric_op
if self.best_obj is None or obj < self.best_obj:
self.best_obj = obj
self.best_config, self.step = self._configs[trial_id]
self.incumbent = self.normalize(self.best_config)
self.cost_incumbent = result.get(self.cost_attr)
if self._resource:
self._resource = self.best_config[self.prune_attr]
self._num_complete4incumbent = 0
self._cost_complete4incumbent = 0
self._num_proposedby_incumbent = 0
self._num_allowed4incumbent = 2 * self.dim
self._proposed_by.clear()
if self._K > 0:
# self._oldK must have been set when self._K>0
self.step *= np.sqrt(self._K / self._oldK)
if self.step > self.step_ub:
self.step = self.step_ub
self._iter_best_config = self.trial_count_complete
if self._trunc:
self._trunc = min(self._trunc + 1, self.dim)
return
elif self._trunc:
self._trunc = max(self._trunc >> 1, 1)
proposed_by = self._proposed_by.get(trial_id)
if proposed_by == self.incumbent:
# proposed by current incumbent and no better
self._num_complete4incumbent += 1
cost = result.get(
self.cost_attr) if result else self._trial_cost.get(trial_id)
if cost:
self._cost_complete4incumbent += cost
if self._num_complete4incumbent >= 2 * self.dim and \
self._num_allowed4incumbent == 0:
self._num_allowed4incumbent = 2
if self._num_complete4incumbent == self.dir and (
not self._resource or self._resource == self.max_resource):
# check stuck condition if using max resource
self._num_complete4incumbent -= 2
if self._num_allowed4incumbent < 2:
self._num_allowed4incumbent = 2
# elif proposed_by: del self._proposed_by[trial_id]
def on_trial_result(self, trial_id: str, result: Dict):
''' early update of incumbent
'''
if result:
obj = result.get(self._metric)
if obj:
obj *= self.metric_op
if self.best_obj is None or obj < self.best_obj:
self.best_obj = obj
config = self._configs[trial_id][0]
if self.best_config != config:
self.best_config = config
if self._resource:
self._resource = config[self.prune_attr]
self.incumbent = self.normalize(self.best_config)
self.cost_incumbent = result.get(self.cost_attr)
self._cost_complete4incumbent = 0
self._num_complete4incumbent = 0
self._num_proposedby_incumbent = 0
self._num_allowed4incumbent = 2 * self.dim
self._proposed_by.clear()
self._iter_best_config = self.trial_count_complete
cost = result.get(self.cost_attr)
# record the cost in case it is pruned and cost info is lost
self._trial_cost[trial_id] = cost
def rand_vector_unit_sphere(self, dim, trunc=0) -> np.ndarray:
vec = self._random.normal(0, 1, dim)
if 0 < trunc < dim:
vec[np.abs(vec).argsort()[:dim - trunc]] = 0
mag = np.linalg.norm(vec)
return vec / mag
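    # E.g. (illustrative): rand_vector_unit_sphere(4, trunc=2) zeroes the
    # two smallest-magnitude components of a 4-d Gaussian sample before
    # normalizing, so the returned unit vector perturbs only the two
    # dimensions sampled with the largest magnitude.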
def suggest(self, trial_id: str) -> Optional[Dict]:
''' suggest a new config, one of the following cases:
1. same incumbent, increase resource
2. same resource, move from the incumbent to a random direction
3. same resource, move from the incumbent to the opposite direction
#TODO: better decouple FLOW2 config suggestion and stepsize update
'''
self.trial_count_proposed += 1
if self._num_complete4incumbent > 0 and self.cost_incumbent and \
self._resource and self._resource < self.max_resource and (
self._cost_complete4incumbent
>= self.cost_incumbent * self.resource_multiple_factor):
# consider increasing resource using sum eval cost of complete
# configs
old_resource = self._resource
self._resource = self._round(
self._resource * self.resource_multiple_factor)
self.cost_incumbent *= self._resource / old_resource
config = self.best_config.copy()
config[self.prune_attr] = self._resource
self._direction_tried = None
self._configs[trial_id] = (config, self.step)
return unflatten_dict(config)
self._num_allowed4incumbent -= 1
move = self.incumbent.copy()
if self._direction_tried is not None:
# return negative direction
for i, key in enumerate(self._tunable_keys):
move[key] -= self._direction_tried[i]
self._direction_tried = None
else:
# propose a new direction
self._direction_tried = self.rand_vector_unit_sphere(
self.dim, self._trunc) * self.step
for i, key in enumerate(self._tunable_keys):
move[key] += self._direction_tried[i]
self._project(move)
config = self.denormalize(move)
self._proposed_by[trial_id] = self.incumbent
self._configs[trial_id] = (config, self.step)
self._num_proposedby_incumbent += 1
best_config = self.best_config
if self._init_phase:
if self._direction_tried is None:
if self._same:
# check if the new config is different from best_config
same = True
for key, value in config.items():
if key not in best_config or value != best_config[key]:
same = False
break
if same:
# increase step size
self.step += self.STEPSIZE
if self.step > self.step_ub:
self.step = self.step_ub
else:
# check if the new config is different from best_config
same = True
for key, value in config.items():
if key not in best_config or value != best_config[key]:
same = False
break
self._same = same
if self._num_proposedby_incumbent == self.dir and (
not self._resource or self._resource == self.max_resource):
# check stuck condition if using max resource
self._num_proposedby_incumbent -= 2
self._init_phase = False
if self.step >= self.step_lower_bound:
# decrease step size
self._oldK = self._K if self._K else self._iter_best_config
self._K = self.trial_count_proposed + 1
self.step *= np.sqrt(self._oldK / self._K)
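                # e.g. (illustrative): if the incumbent was last improved
                # around proposal _oldK=100 and we are now at _K=400, the
                # step size halves, since sqrt(100 / 400) = 0.5.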
else:
return None
if self._init_phase:
return unflatten_dict(config)
if self._trunc == 1 and self._direction_tried is not None:
# random
for i, key in enumerate(self._tunable_keys):
if self._direction_tried[i] != 0:
for _, generated in generate_variants({'config': {
key: self._space[key]
}}):
if generated['config'][key] != best_config[key]:
config[key] = generated['config'][key]
return unflatten_dict(config)
break
else:
# check if config == best_config
if len(config) == len(best_config):
for key, value in best_config.items():
if value != config[key]:
return unflatten_dict(config)
# print('move to', move)
self.incumbent = move
return unflatten_dict(config)
def _project(self, config):
        ''' project normalized config into the feasible region and set prune_attr
'''
for key in self._bounded_keys:
value = config[key]
config[key] = max(0, min(1, value))
if self._resource:
config[self.prune_attr] = self._resource
@property
def can_suggest(self) -> bool:
''' can't suggest if 2*dim configs have been proposed for the incumbent
while fewer are completed
'''
return self._num_allowed4incumbent > 0
def config_signature(self, config, space: Dict = None) -> tuple:
''' return the signature tuple of a config
'''
config = flatten_dict(config)
if space:
space = flatten_dict(space)
else:
space = self._space
value_list = []
# self._space_keys doesn't contain keys with const values,
# e.g., "eval_metric": ["logloss", "error"].
keys = sorted(config.keys()) if self._hierarchical else self._space_keys
for key in keys:
value = config[key]
if key == self.prune_attr:
value_list.append(value)
else:
# key must be in space
domain = space[key]
if self._hierarchical:
# can't remove constant for hierarchical search space,
# e.g., learner
if not (domain is None or type(domain) in (str, int, float)
or isinstance(domain, sample.Domain)):
# not domain or hashable
# get rid of list type for hierarchical search space.
continue
if isinstance(domain, sample.Integer):
value_list.append(int(round(value)))
else:
value_list.append(value)
return tuple(value_list)
@property
def converged(self) -> bool:
''' return whether the local search has converged
'''
if self._num_complete4incumbent < self.dir - 2:
return False
# check stepsize after enough configs are completed
return self.step < self.step_lower_bound
def reach(self, other: Searcher) -> bool:
''' whether the incumbent can reach the incumbent of other
'''
config1, config2 = self.best_config, other.best_config
incumbent1, incumbent2 = self.incumbent, other.incumbent
if self._resource and config1[self.prune_attr] > config2[self.prune_attr]:
# resource will not decrease
return False
for key in self._unordered_cat_hp:
# unordered cat choice is hard to reach by chance
if config1[key] != config2.get(key):
return False
delta = np.array(
[incumbent1[key] - incumbent2.get(key, np.inf)
for key in self._tunable_keys])
return np.linalg.norm(delta) <= self.step
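# A minimal driver sketch (illustrative only; `train` is a hypothetical
# user function returning a result dict with the metric and cost attribute,
# e.g. {'loss': ..., 'time_total_s': ...}):
#
#     from flaml import tune
#     searcher = FLOW2(
#         init_config={'lr': 1e-3}, metric='loss', mode='min',
#         space={'lr': tune.loguniform(1e-5, 1e-1)})
#     for i in range(100):
#         trial_id = str(i)
#         config = searcher.suggest(trial_id)
#         if config is None:  # step size fell below its lower bound
#             break
#         searcher.on_trial_complete(trial_id, result=train(config))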