"""! * Copyright (c) Microsoft Corporation. All rights reserved. * Licensed under the MIT License. See LICENSE file in the * project root for license information. """ from flaml.tune.sample import Domain from typing import Dict, Optional, Tuple import numpy as np try: from ray import __version__ as ray_version assert ray_version >= "1.0.0" from ray.tune.suggest import Searcher from ray.tune.suggest.variant_generator import generate_variants from ray.tune import sample from ray.tune.utils.util import flatten_dict, unflatten_dict except (ImportError, AssertionError): from .suggestion import Searcher from .variant_generator import generate_variants from ..tune import sample from ..tune.trial import flatten_dict, unflatten_dict from ..tune.space import complete_config, denormalize, normalize import logging logger = logging.getLogger(__name__) class FLOW2(Searcher): """Local search algorithm FLOW2, with adaptive step size""" STEPSIZE = 0.1 STEP_LOWER_BOUND = 0.0001 def __init__( self, init_config: dict, metric: Optional[str] = None, mode: Optional[str] = None, space: Optional[dict] = None, prune_attr: Optional[str] = None, min_resource: Optional[float] = None, max_resource: Optional[float] = None, resource_multiple_factor: Optional[float] = 4, cost_attr: Optional[str] = "time_total_s", seed: Optional[int] = 20, ): """Constructor Args: init_config: a dictionary of a partial or full initial config, e.g. from a subset of controlled dimensions to the initial low-cost values. e.g. {'epochs': 1} metric: A string of the metric name to optimize for. mode: A string in ['min', 'max'] to specify the objective as minimization or maximization. cat_hp_cost: A dictionary from a subset of categorical dimensions to the relative cost of each choice. e.g., .. code-block:: python {'tree_method': [1, 1, 2]} i.e., the relative cost of the three choices of 'tree_method' is 1, 1 and 2 respectively. space: A dictionary to specify the search space. prune_attr: A string of the attribute used for pruning. Not necessarily in space. When prune_attr is in space, it is a hyperparameter, e.g., 'n_iters', and the best value is unknown. When prune_attr is not in space, it is a resource dimension, e.g., 'sample_size', and the peak performance is assumed to be at the max_resource. min_resource: A float of the minimal resource to use for the prune_attr; only valid if prune_attr is not in space. max_resource: A float of the maximal resource to use for the prune_attr; only valid if prune_attr is not in space. resource_multiple_factor: A float of the multiplicative factor used for increasing resource. cost_attr: A string of the attribute used for cost. seed: An integer of the random seed. """ if mode: assert mode in ["min", "max"], "`mode` must be 'min' or 'max'." else: mode = "min" super(FLOW2, self).__init__(metric=metric, mode=mode) # internally minimizes, so "max" => -1 if mode == "max": self.metric_op = -1.0 elif mode == "min": self.metric_op = 1.0 self.space = space or {} self._space = flatten_dict(self.space, prevent_delimiter=True) self._random = np.random.RandomState(seed) self.seed = seed self.init_config = init_config self.best_config = flatten_dict(init_config) self.prune_attr = prune_attr self.min_resource = min_resource self.resource_multiple_factor = resource_multiple_factor or 4 self.cost_attr = cost_attr self.max_resource = max_resource self._resource = None self._step_lb = np.Inf if space is not None: self._init_search() def _init_search(self): self._tunable_keys = [] self._bounded_keys = [] self._unordered_cat_hp = {} hier = False for key, domain in self._space.items(): assert not ( isinstance(domain, dict) and "grid_search" in domain ), f"{key}'s domain is grid search, not supported in FLOW^2." if callable(getattr(domain, "get_sampler", None)): self._tunable_keys.append(key) sampler = domain.get_sampler() # the step size lower bound for uniform variables doesn't depend # on the current config if isinstance(sampler, sample.Quantized): q = sampler.q sampler = sampler.get_sampler() if str(sampler) == "Uniform": self._step_lb = min( self._step_lb, q / (domain.upper - domain.lower) ) elif isinstance(domain, sample.Integer) and str(sampler) == "Uniform": self._step_lb = min( self._step_lb, 1.0 / (domain.upper - 1 - domain.lower) ) if isinstance(domain, sample.Categorical): if not domain.ordered: self._unordered_cat_hp[key] = len(domain.categories) if not hier: for cat in domain.categories: if isinstance(cat, dict): hier = True break if str(sampler) != "Normal": self._bounded_keys.append(key) if not hier: self._space_keys = sorted(self._tunable_keys) self.hierarchical = hier if self.prune_attr and self.prune_attr not in self._space and self.max_resource: self.min_resource = self.min_resource or self._min_resource() self._resource = self._round(self.min_resource) if not hier: self._space_keys.append(self.prune_attr) else: self._resource = None self.incumbent = {} self.incumbent = self.normalize(self.best_config) # flattened self.best_obj = self.cost_incumbent = None self.dim = len(self._tunable_keys) # total # tunable dimensions self._direction_tried = None self._num_complete4incumbent = self._cost_complete4incumbent = 0 self._num_allowed4incumbent = 2 * self.dim self._proposed_by = {} # trial_id: int -> incumbent: Dict self.step_ub = np.sqrt(self.dim) self.step = self.STEPSIZE * self.step_ub lb = self.step_lower_bound if lb > self.step: self.step = lb * 2 # upper bound if self.step > self.step_ub: self.step = self.step_ub # maximal # consecutive no improvements self.dir = 2 ** (min(9, self.dim)) self._configs = {} # dict from trial_id to (config, stepsize) self._K = 0 self._iter_best_config = 1 self.trial_count_proposed = self.trial_count_complete = 1 self._num_proposedby_incumbent = 0 self._reset_times = 0 # record intermediate trial cost self._trial_cost = {} self._same = False # whether the proposed config is the same as best_config self._init_phase = True # initial phase to increase initial stepsize self._trunc = 0 # no truncation by default. when > 0, it means how many # non-zero dimensions to keep in the random unit vector @property def step_lower_bound(self) -> float: step_lb = self._step_lb for key in self._tunable_keys: if key not in self.best_config: continue domain = self._space[key] sampler = domain.get_sampler() # the stepsize lower bound for log uniform variables depends on the # current config if isinstance(sampler, sample.Quantized): q = sampler.q sampler_inner = sampler.get_sampler() if str(sampler_inner) == "LogUniform": step_lb = min( step_lb, np.log(1.0 + q / self.best_config[key]) / np.log(domain.upper / domain.lower), ) elif isinstance(domain, sample.Integer) and str(sampler) == "LogUniform": step_lb = min( step_lb, np.log(1.0 + 1.0 / self.best_config[key]) / np.log((domain.upper - 1) / domain.lower), ) if np.isinf(step_lb): step_lb = self.STEP_LOWER_BOUND else: step_lb *= self.step_ub return step_lb @property def resource(self) -> float: return self._resource def _min_resource(self) -> float: """automatically decide minimal resource""" return self.max_resource / np.pow(self.resource_multiple_factor, 5) def _round(self, resource) -> float: """round the resource to self.max_resource if close to it""" if resource * self.resource_multiple_factor > self.max_resource: return self.max_resource return resource def rand_vector_gaussian(self, dim, std=1.0): vec = self._random.normal(0, std, dim) return vec def complete_config( self, partial_config: Dict, lower: Optional[Dict] = None, upper: Optional[Dict] = None, ) -> Tuple[Dict, Dict]: """generate a complete config from the partial config input add minimal resource to config if available """ disturb = self._reset_times and partial_config == self.init_config # if not the first time to complete init_config, use random gaussian config, space = complete_config( partial_config, self.space, self, disturb, lower, upper ) if partial_config == self.init_config: self._reset_times += 1 if self._resource: config[self.prune_attr] = self.min_resource return config, space def create( self, init_config: Dict, obj: float, cost: float, space: Dict ) -> Searcher: # space is the subspace where the init_config is located flow2 = self.__class__( init_config, self.metric, self.mode, space, self.prune_attr, self.min_resource, self.max_resource, self.resource_multiple_factor, self.cost_attr, self.seed + 1, ) flow2.best_obj = obj * self.metric_op # minimize internally flow2.cost_incumbent = cost self.seed += 1 return flow2 def normalize(self, config, recursive=False) -> Dict: """normalize each dimension in config to [0,1]""" return normalize( config, self._space, self.best_config, self.incumbent, recursive ) def denormalize(self, config): """denormalize each dimension in config from [0,1]""" return denormalize( config, self._space, self.best_config, self.incumbent, self._random ) def set_search_properties( self, metric: Optional[str] = None, mode: Optional[str] = None, config: Optional[Dict] = None, ) -> bool: if metric: self._metric = metric if mode: assert mode in ["min", "max"], "`mode` must be 'min' or 'max'." self._mode = mode if mode == "max": self.metric_op = -1.0 elif mode == "min": self.metric_op = 1.0 if config: self.space = config self._space = flatten_dict(self.space) self._init_search() return True def on_trial_complete( self, trial_id: str, result: Optional[Dict] = None, error: bool = False ): # compare with incumbent # if better, move, reset num_complete and num_proposed # if not better and num_complete >= 2*dim, num_allowed += 2 self.trial_count_complete += 1 if not error and result: obj = result.get(self._metric) if obj: obj *= self.metric_op if self.best_obj is None or obj < self.best_obj: self.best_obj = obj self.best_config, self.step = self._configs[trial_id] self.incumbent = self.normalize(self.best_config) self.cost_incumbent = result.get(self.cost_attr) if self._resource: self._resource = self.best_config[self.prune_attr] self._num_complete4incumbent = 0 self._cost_complete4incumbent = 0 self._num_proposedby_incumbent = 0 self._num_allowed4incumbent = 2 * self.dim self._proposed_by.clear() if self._K > 0: # self._oldK must have been set when self._K>0 self.step *= np.sqrt(self._K / self._oldK) if self.step > self.step_ub: self.step = self.step_ub self._iter_best_config = self.trial_count_complete if self._trunc: self._trunc = min(self._trunc + 1, self.dim) return elif self._trunc: self._trunc = max(self._trunc >> 1, 1) proposed_by = self._proposed_by.get(trial_id) if proposed_by == self.incumbent: # proposed by current incumbent and no better self._num_complete4incumbent += 1 cost = ( result.get(self.cost_attr) if result else self._trial_cost.get(trial_id) ) if cost: self._cost_complete4incumbent += cost if ( self._num_complete4incumbent >= 2 * self.dim and self._num_allowed4incumbent == 0 ): self._num_allowed4incumbent = 2 if self._num_complete4incumbent == self.dir and ( not self._resource or self._resource == self.max_resource ): # check stuck condition if using max resource self._num_complete4incumbent -= 2 if self._num_allowed4incumbent < 2: self._num_allowed4incumbent = 2 # elif proposed_by: del self._proposed_by[trial_id] def on_trial_result(self, trial_id: str, result: Dict): """early update of incumbent""" if result: obj = result.get(self._metric) if obj: obj *= self.metric_op if self.best_obj is None or obj < self.best_obj: self.best_obj = obj config = self._configs[trial_id][0] if self.best_config != config: self.best_config = config if self._resource: self._resource = config[self.prune_attr] self.incumbent = self.normalize(self.best_config) self.cost_incumbent = result.get(self.cost_attr) self._cost_complete4incumbent = 0 self._num_complete4incumbent = 0 self._num_proposedby_incumbent = 0 self._num_allowed4incumbent = 2 * self.dim self._proposed_by.clear() self._iter_best_config = self.trial_count_complete cost = result.get(self.cost_attr) # record the cost in case it is pruned and cost info is lost self._trial_cost[trial_id] = cost def rand_vector_unit_sphere(self, dim, trunc=0) -> np.ndarray: vec = self._random.normal(0, 1, dim) if 0 < trunc < dim: vec[np.abs(vec).argsort()[: dim - trunc]] = 0 mag = np.linalg.norm(vec) return vec / mag def suggest(self, trial_id: str) -> Optional[Dict]: """suggest a new config, one of the following cases: 1. same incumbent, increase resource 2. same resource, move from the incumbent to a random direction 3. same resource, move from the incumbent to the opposite direction #TODO: better decouple FLOW2 config suggestion and stepsize update """ self.trial_count_proposed += 1 if ( self._num_complete4incumbent > 0 and self.cost_incumbent and self._resource and self._resource < self.max_resource and ( self._cost_complete4incumbent >= self.cost_incumbent * self.resource_multiple_factor ) ): # consider increasing resource using sum eval cost of complete # configs old_resource = self._resource self._resource = self._round(self._resource * self.resource_multiple_factor) self.cost_incumbent *= self._resource / old_resource config = self.best_config.copy() config[self.prune_attr] = self._resource self._direction_tried = None self._configs[trial_id] = (config, self.step) return unflatten_dict(config) self._num_allowed4incumbent -= 1 move = self.incumbent.copy() if self._direction_tried is not None: # return negative direction for i, key in enumerate(self._tunable_keys): move[key] -= self._direction_tried[i] self._direction_tried = None else: # propose a new direction self._direction_tried = ( self.rand_vector_unit_sphere(self.dim, self._trunc) * self.step ) for i, key in enumerate(self._tunable_keys): move[key] += self._direction_tried[i] self._project(move) config = self.denormalize(move) self._proposed_by[trial_id] = self.incumbent self._configs[trial_id] = (config, self.step) self._num_proposedby_incumbent += 1 best_config = self.best_config if self._init_phase: if self._direction_tried is None: if self._same: # check if the new config is different from best_config same = True for key, value in config.items(): if key not in best_config or value != best_config[key]: same = False break if same: # increase step size self.step += self.STEPSIZE if self.step > self.step_ub: self.step = self.step_ub else: # check if the new config is different from best_config same = True for key, value in config.items(): if key not in best_config or value != best_config[key]: same = False break self._same = same if self._num_proposedby_incumbent == self.dir and ( not self._resource or self._resource == self.max_resource ): # check stuck condition if using max resource self._num_proposedby_incumbent -= 2 self._init_phase = False if self.step >= self.step_lower_bound: # decrease step size self._oldK = self._K if self._K else self._iter_best_config self._K = self.trial_count_proposed + 1 self.step *= np.sqrt(self._oldK / self._K) else: return None if self._init_phase: return unflatten_dict(config) if self._trunc == 1 and self._direction_tried is not None: # random for i, key in enumerate(self._tunable_keys): if self._direction_tried[i] != 0: for _, generated in generate_variants( {"config": {key: self._space[key]}} ): if generated["config"][key] != best_config[key]: config[key] = generated["config"][key] return unflatten_dict(config) break else: # check if config == best_config if len(config) == len(best_config): for key, value in best_config.items(): if value != config[key]: return unflatten_dict(config) # print('move to', move) self.incumbent = move return unflatten_dict(config) def _project(self, config): """project normalized config in the feasible region and set prune_attr""" for key in self._bounded_keys: value = config[key] config[key] = max(0, min(1, value)) if self._resource: config[self.prune_attr] = self._resource @property def can_suggest(self) -> bool: """can't suggest if 2*dim configs have been proposed for the incumbent while fewer are completed """ return self._num_allowed4incumbent > 0 def config_signature(self, config, space: Dict = None) -> tuple: """return the signature tuple of a config""" config = flatten_dict(config) if space: space = flatten_dict(space) else: space = self._space value_list = [] # self._space_keys doesn't contain keys with const values, # e.g., "eval_metric": ["logloss", "error"]. keys = sorted(config.keys()) if self.hierarchical else self._space_keys for key in keys: value = config[key] if key == self.prune_attr: value_list.append(value) else: # key must be in space domain = space[key] if self.hierarchical: # can't remove constant for hierarchical search space, # e.g., learner if not ( domain is None or type(domain) in (str, int, float) or isinstance(domain, sample.Domain) ): # not domain or hashable # get rid of list type for hierarchical search space. continue if isinstance(domain, sample.Integer): value_list.append(int(round(value))) else: value_list.append(value) return tuple(value_list) @property def converged(self) -> bool: """return whether the local search has converged""" if self._num_complete4incumbent < self.dir - 2: return False # check stepsize after enough configs are completed return self.step < self.step_lower_bound def reach(self, other: Searcher) -> bool: """whether the incumbent can reach the incumbent of other""" config1, config2 = self.best_config, other.best_config incumbent1, incumbent2 = self.incumbent, other.incumbent if self._resource and config1[self.prune_attr] > config2[self.prune_attr]: # resource will not decrease return False for key in self._unordered_cat_hp: # unordered cat choice is hard to reach by chance if config1[key] != config2.get(key): return False delta = np.array( [ incumbent1[key] - incumbent2.get(key, np.inf) for key in self._tunable_keys ] ) return np.linalg.norm(delta) <= self.step