# autogen/flaml/onlineml/trial_runner.py
import time
import math
import logging

import numpy as np

from flaml.tune import Trial
from flaml.scheduler import TrialScheduler

logger = logging.getLogger(__name__)


class OnlineTrialRunner:
"""The OnlineTrialRunner class
Methods:
step(max_live_model_num, data_sample, prediction_trial_tuple)
Outputs a _max_live_model_num number of trials to run each time it is called
get_top_running_trials()
Get a list of trial ids, whose performance is among the top running trials
add_trial(trial)
Add trial to this TrialRunner.
stop_trial(trial)
Set the status of a trial to be Trial.TERMINATED and perform other subsequent operations
pause_trial(trial)
Set the status of a trial to be Trial.PAUSED and perform other subsequent operations
run_trial(trial)
Set the status of a trial to be Trial.RUNNING and perform other subsequent operations
get_trials()
Get all the trials added (whatever that status) in the the OnlineTrialRunner
NOTE about the status of a trial:
Trial.PENDING: All trials are set to be pending when frist added into the OnlineTrialRunner until
it is selected to run. By this definition, a trial with status Trial.PENDING is a challenger
trial added to the OnlineTrialRunner but never been selected to run.
It denotes the starting of trial's lifespan in the OnlineTrialRunner.
Trial.RUNNING: It indicates that this trial is one of the concurrently running trials.
The max number of Trial.RUNNING trials is running_budget.
The status of a trial will be set to Trial.RUNNING the next time it selected to run.
A trial's status may have the following change:
Trial.PENDING -> Trial.RUNNING
Trial.PAUSED - > Trial.RUNNING
Trial.PAUSED: The status of a trial is set to Trial.PAUSED once it is removed from the running trials.
Trial.RUNNING - > Trial.PAUSED
Trial.TERMINATED: set the status of a trial to Trial.TERMINATED when you never want to select it.
It denotes the real end of a trial's lifespan.
Status change routine of a trial
Trial.PENDING -> (Trial.RUNNING -> Trial.PAUSED -> Trial.RUNNING -> ...) -> Trial.TERMINATED(optional)
"""
RANDOM_SEED = 123456
WARMSTART_NUM = 100

    def __init__(self,
                 max_live_model_num: int,
                 searcher=None,
                 scheduler=None,
                 champion_test_policy='loss_ucb',
                 **kwargs
                 ):
        """Constructor.

        Args:
            max_live_model_num: The maximum number of 'live'/running models allowed.
            searcher: A class for generating Trial objects progressively.
                The ConfigOracle is implemented in the searcher.
                Required methods of the searcher:
                - next_trial()
                    Generate the next trial to add.
                - set_search_properties(metric: Optional[str], mode: Optional[str], config: dict)
                    Generate new challengers based on the current champion and update the
                    challenger list.
                - on_trial_result(trial_id: str, result: Dict)
                    Report results to the searcher.
            scheduler: A class for managing the 'live' trials and allocating resources to them.
                Required methods of the scheduler:
                - on_trial_add(trial_runner, trial: Trial)
                    Add candidate trials to the scheduler. It is called inside the add_trial
                    function in the TrialRunner.
                - on_trial_remove(trial_runner, trial: Trial)
                    Remove terminated trials from the scheduler.
                - on_trial_result(trial_runner, trial: Trial, result: Dict)
                    Report results to the scheduler.
                - choose_trial_to_run(trial_runner) -> Optional[Trial]
                    Among them, on_trial_result and choose_trial_to_run are the most important.
            champion_test_policy: A string specifying which test policy to use for the champion.
                Currently one of ['loss_ucb', 'loss_avg', 'loss_lcb', None].
        """
# OnlineTrialRunner setting
self._searcher = searcher
self._scheduler = scheduler
self._champion_test_policy = champion_test_policy
self._max_live_model_num = max_live_model_num
self._remove_worse = kwargs.get('remove_worse', True)
self._bound_trial_num = kwargs.get('bound_trial_num', False)
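        # when True, a paused trial's model is cleaned up (see pause_trial) so
        # that only the currently running trials keep a model in memory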
self._no_model_persistence = True
# stores all the trials added to the OnlineTrialRunner
# i.e., include the champion and all the challengers
self._trials = []
self._champion_trial = None
self._best_challenger_trial = None
self._first_challenger_pool_size = None
self._random_state = np.random.RandomState(self.RANDOM_SEED)
self._running_trials = set()
        # initially schedule up to max_live_model_num live models and
        # set the first trial as the champion (which is done inside self.step())
        self._total_steps = 0
        logger.info('init step %s', self._max_live_model_num)
        # this first, no-data call to step() pulls the initial challenger pool
        # from the searcher and promotes the first trial to champion
        self.step()
        assert self._champion_trial is not None

    @property
    def champion_trial(self) -> Trial:
        """The champion trial."""
        return self._champion_trial

    @property
    def running_trials(self):
        """The running/'live' trials."""
        return self._running_trials

    def step(self, data_sample=None, prediction_trial_tuple=None):
        """Schedule up to max_live_model_num trials to run.

        Args:
            data_sample: The new data sample with which to update all running trials.
            prediction_trial_tuple: A tuple (prediction_made, prediction_trial):
                the prediction already made on data_sample and the trial that made it,
                so that this trial is not asked to predict the same sample twice.

        NOTE:
            This method consists of the following parts:
            Update model:
            0. Update running trials using the observations received.
            Tests for champion:
            1. Test for champion (BetterThan test, and WorseThan test).
                1.1 BetterThan test.
                1.2 WorseThan test: a trial may be removed if the WorseThan test is triggered.
            Online scheduling:
            2. Report results to the searcher and scheduler (the scheduler will return a
               decision about the status of the running trials).
            3. Pause or stop a trial according to the scheduler's decision.
               Add trials to the OnlineTrialRunner if there are open slots.
        """
# ***********Update running trials with observation***************************
if data_sample is not None:
self._total_steps += 1
prediction_made, prediction_trial = prediction_trial_tuple[0], prediction_trial_tuple[1]
# assert prediction_trial.status == Trial.RUNNING
trials_to_pause = []
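            # reuse the prediction that prediction_trial already made on this data
            # sample; only the other running trials need a fresh predict() call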
for trial in list(self._running_trials):
if trial != prediction_trial:
y_predicted = trial.predict(data_sample)
else:
y_predicted = prediction_made
trial.train_eval_model_online(data_sample, y_predicted)
logger.debug('running trial at iter %s %s %s %s %s %s', self._total_steps,
trial.trial_id, trial.result.loss_avg, trial.result.loss_cb,
trial.result.resource_used, trial.resource_lease)
# report result to the searcher
self._searcher.on_trial_result(trial.trial_id, trial.result)
# report result to the scheduler and the scheduler makes a decision about
# the running status of the trial
decision = self._scheduler.on_trial_result(self, trial, trial.result)
# set the status of the trial according to the decision made by the scheduler
logger.debug('trial decision %s %s at step %s', decision, trial.trial_id, self._total_steps)
if decision == TrialScheduler.STOP:
self.stop_trial(trial)
elif decision == TrialScheduler.PAUSE:
trials_to_pause.append(trial)
else:
self.run_trial(trial)
# ***********Statistical test of champion*************************************
self._champion_test()
        # pause the trials after the tests, because the tests involve resetting the trials' results
for trial in trials_to_pause:
self.pause_trial(trial)
# ***********Add and schedule new trials to run if there are opening slots****
# Add trial if needed: add challengers into consideration through _add_trial_from_searcher()
# if there are available slots
for _ in range(self._max_live_model_num - len(self._running_trials)):
self._add_trial_from_searcher()
# Scheduling: schedule up to max_live_model_num number of trials to run
# (set the status as Trial.RUNNING)
while self._max_live_model_num > len(self._running_trials):
trial_to_run = self._scheduler.choose_trial_to_run(self)
if trial_to_run is not None:
self.run_trial(trial_to_run)
else:
break

    def get_top_running_trials(self, top_ratio=None, top_metric='ucb') -> list:
        """Get a list of trial ids whose performance is among the top running trials."""
        running_valid_trials = [trial for trial in self._running_trials
                                if trial.result is not None]
        if not running_valid_trials:
            return []
if top_ratio is None:
top_number = 0
elif isinstance(top_ratio, float):
top_number = math.ceil(len(running_valid_trials) * top_ratio)
elif isinstance(top_ratio, str) and 'best' in top_ratio:
top_number = 1
else:
raise NotImplementedError
if 'ucb' in top_metric:
test_attribute = 'loss_ucb'
elif 'avg' in top_metric:
test_attribute = 'loss_avg'
elif 'lcb' in top_metric:
test_attribute = 'loss_lcb'
else:
raise NotImplementedError
top_running_valid_trials = []
logger.info('Running trial ids %s', [trial.trial_id for trial in running_valid_trials])
self._random_state.shuffle(running_valid_trials)
results = [trial.result.get_score(test_attribute) for trial in running_valid_trials]
sorted_index = np.argsort(np.array(results)) # sorted result (small to large) index
for i in range(min(top_number, len(running_valid_trials))):
top_running_valid_trials.append(running_valid_trials[sorted_index[i]])
logger.info('Top running ids %s', [trial.trial_id for trial in top_running_valid_trials])
return top_running_valid_trials

    def _add_trial_from_searcher(self):
        """Add a new trial to this TrialRunner.

        NOTE:
            The new trial is acquired from the input search algorithm, i.e. self._searcher.
            A 'new' trial means the trial is not in self._trials.
        """
# (optionally) upper bound the number of trials in the OnlineTrialRunner
if self._bound_trial_num and self._first_challenger_pool_size is not None:
active_trial_size = len([t for t in self._trials if t.status != Trial.TERMINATED])
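            # the cap on active trials grows logarithmically with the number of
            # data samples seen, scaled by the size of the first challenger pool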
            trial_num_upper_bound = int(round(
                (np.log10(self._total_steps) + 1) * self._first_challenger_pool_size
            )) if self._first_challenger_pool_size else np.inf
if active_trial_size > trial_num_upper_bound:
logger.info('Not adding new trials: %s exceeds trial limit %s.',
active_trial_size, trial_num_upper_bound)
return None
# output one trial from the trial pool (new challenger pool) maintained in the searcher
# Assumption on the searcher: when all frontiers (i.e., all the challengers generated
# based on the current champion) of the current champion are added, calling next_trial()
# will return None
trial = self._searcher.next_trial()
if trial is not None:
self.add_trial(trial) # dup checked in add_trial
# the champion_trial is initially None, so we need to set it up the first time
# a valid trial is added.
# Assumption on self._searcher: the first trial generated is the champion trial
if self._champion_trial is None:
logger.info('Initial set up of the champion trial %s', trial.config)
self._set_champion(trial)
else:
self._all_new_challengers_added = True
if self._first_challenger_pool_size is None:
self._first_challenger_pool_size = len(self._trials)

    def _champion_test(self):
        """Perform tests against the latest champion, including the better_than
        test and the worse_than test.
        """
# for BetterThan test, we only need to compare the best challenger with the champion
self._get_best_challenger()
if self._best_challenger_trial is not None:
assert self._best_challenger_trial.trial_id != self._champion_trial.trial_id
# test whether a new champion is found and set the trial properties accordingly
is_new_champion_found = self._better_than_champion_test(self._best_challenger_trial)
if is_new_champion_found:
self._set_champion(new_champion_trial=self._best_challenger_trial)
# performs _worse_than_champion_test, which is an optional component in ChaCha
if self._remove_worse:
to_stop = []
for trial_to_test in self._trials:
if trial_to_test.status != Trial.TERMINATED:
worse_than_champion = self._worse_than_champion_test(
self._champion_trial, trial_to_test, self.WARMSTART_NUM)
if worse_than_champion:
to_stop.append(trial_to_test)
# we want to ensure there are at least #max_live_model_num of challengers remaining
            max_to_stop_num = len([t for t in self._trials
                                   if t.status != Trial.TERMINATED]) - self._max_live_model_num
for i in range(min(max_to_stop_num, len(to_stop))):
self.stop_trial(to_stop[i])

    def _get_best_challenger(self):
        """Get the 'best' (in terms of the champion_test_policy) challenger under consideration."""
if self._champion_test_policy is None:
return
if 'ucb' in self._champion_test_policy:
test_attribute = 'loss_ucb'
elif 'avg' in self._champion_test_policy:
test_attribute = 'loss_avg'
else:
raise NotImplementedError
active_trials = [trial for trial in self._trials if
(trial.status != Trial.TERMINATED
and trial.trial_id != self._champion_trial.trial_id
and trial.result is not None)]
if active_trials:
self._random_state.shuffle(active_trials)
results = [trial.result.get_score(test_attribute) for trial in active_trials]
best_index = np.argmin(results)
self._best_challenger_trial = active_trials[best_index]

    def _set_champion(self, new_champion_trial):
        """Set the status of the existing trials once a new champion is found."""
assert new_champion_trial is not None
is_init_update = False
if self._champion_trial is None:
is_init_update = True
self.run_trial(new_champion_trial)
# set the checked_under_current_champion status of the trials
for trial in self._trials:
if trial.trial_id == new_champion_trial.trial_id:
trial.set_checked_under_current_champion(True)
else:
trial.set_checked_under_current_champion(False)
self._champion_trial = new_champion_trial
self._all_new_challengers_added = False
logger.info('Set the champion as %s', self._champion_trial.trial_id)
if not is_init_update:
self._champion_update_times += 1
# calling set_search_properties of searcher will trigger
# new challenger generation. we do not do this for init champion
# as this step is already done when first constructing the searcher
self._searcher.set_search_properties(None, None,
{self._searcher.CHAMPION_TRIAL_NAME: self._champion_trial}
)
else:
self._champion_update_times = 0

    def get_trials(self) -> list:
        """Return the list of trials managed by this TrialRunner."""
        return self._trials

    def add_trial(self, new_trial):
        """Add a new trial to this TrialRunner.
        Trials may be added at any time.

        Args:
            new_trial (Trial): Trial to queue.

        NOTE:
            The new trial is added only when it does not already exist (according to
            the trial_id, which is the signature of the trial) in self._trials.
        """
for trial in self._trials:
if trial.trial_id == new_trial.trial_id:
trial.set_checked_under_current_champion(True)
return
logger.info('adding trial at iter %s, %s %s', self._total_steps, new_trial.trial_id,
len(self._trials))
self._trials.append(new_trial)
self._scheduler.on_trial_add(self, new_trial)

    def stop_trial(self, trial):
        """Stop a trial: set its status to Trial.TERMINATED and perform
        other subsequent operations.
        """
if trial.status in [Trial.ERROR, Trial.TERMINATED]:
return
else:
logger.info('Terminating trial %s, with trial result %s',
trial.trial_id, trial.result)
trial.set_status(Trial.TERMINATED)
# clean up model and result
trial.clean_up_model()
self._scheduler.on_trial_remove(self, trial)
self._searcher.on_trial_complete(trial.trial_id)
self._running_trials.remove(trial)

    def pause_trial(self, trial):
        """Pause a trial: set its status to Trial.PAUSED and perform other
        subsequent operations.
        """
        if trial.status in [Trial.ERROR, Trial.TERMINATED]:
            return
        else:
            logger.info(
                'Pausing trial %s, with trial loss_avg: %s, loss_cb: %s, '
                'loss_ucb: %s, resource_lease: %s',
                trial.trial_id, trial.result.loss_avg, trial.result.loss_cb,
                trial.result.loss_avg + trial.result.loss_cb,
                trial.resource_lease)
trial.set_status(Trial.PAUSED)
# clean up model and result if no model persistence
if self._no_model_persistence:
trial.clean_up_model()
self._running_trials.remove(trial)

    def run_trial(self, trial):
        """Run a trial: set its status to Trial.RUNNING and perform other
        subsequent operations.
        """
if trial.status in [Trial.ERROR, Trial.TERMINATED]:
return
else:
trial.set_status(Trial.RUNNING)
self._running_trials.add(trial)

    def _better_than_champion_test(self, trial_to_test):
        """Test whether there is a config among the existing trials that is better
        than the current champion config.

        Returns:
            A bool indicating whether a new champion is found.
        """
if trial_to_test.result is not None and self._champion_trial.result is not None:
if 'ucb' in self._champion_test_policy:
return self._test_lcb_ucb(self._champion_trial, trial_to_test, self.WARMSTART_NUM)
elif 'avg' in self._champion_test_policy:
return self._test_avg_loss(self._champion_trial, trial_to_test, self.WARMSTART_NUM)
elif 'martingale' in self._champion_test_policy:
return self._test_martingale(self._champion_trial, trial_to_test)
else:
raise NotImplementedError
else:
return False

    @staticmethod
    def _worse_than_champion_test(champion_trial, trial, warmstart_num=1) -> bool:
        """Test whether the input trial is worse than the champion_trial."""
if trial.result is not None and trial.result.resource_used >= warmstart_num:
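            # the challenger is deemed worse when even its optimistic estimate (loss
            # lcb) exceeds the champion's pessimistic estimate (loss ucb)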
if trial.result.loss_lcb > champion_trial.result.loss_ucb:
logger.info('=========trial %s is worse than champion %s=====',
trial.trial_id, champion_trial.trial_id)
logger.info('trial %s %s %s', trial.config, trial.result, trial.resource_lease)
logger.info('trial loss_avg:%s, trial loss_cb %s', trial.result.loss_avg,
trial.result.loss_cb)
logger.info('champion loss_avg:%s, champion loss_cb %s', champion_trial.result.loss_avg,
champion_trial.result.loss_cb)
logger.info('champion %s', champion_trial.config)
logger.info('trial loss_avg_recent:%s, trial loss_cb %s', trial.result.loss_avg_recent,
trial.result.loss_cb)
logger.info('champion loss_avg_recent:%s, champion loss_cb %s',
champion_trial.result.loss_avg_recent, champion_trial.result.loss_cb)
return True
return False

    @staticmethod
    def _test_lcb_ucb(champion_trial, trial, warmstart_num=1) -> bool:
        """Compare the challenger's (i.e., trial's) loss upper bound with the
        champion_trial's loss lower bound minus the champion's confidence bound.
        """
assert trial.trial_id != champion_trial.trial_id
if trial.result.resource_used >= warmstart_num:
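            # promote the challenger only when its pessimistic estimate (loss ucb)
            # beats the champion's optimistic estimate (loss lcb) by an extra
            # confidence-bound margin, making the test deliberately conservative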
if trial.result.loss_ucb < champion_trial.result.loss_lcb - champion_trial.result.loss_cb:
logger.info('======new champion condition satisfied: using lcb vs ucb=====')
logger.info('new champion trial %s %s %s',
trial.trial_id, trial.result.resource_used, trial.resource_lease)
logger.info('new champion trial loss_avg:%s, trial loss_cb %s',
trial.result.loss_avg, trial.result.loss_cb)
logger.info('old champion trial %s %s %s',
champion_trial.trial_id, champion_trial.result.resource_used,
champion_trial.resource_lease,)
logger.info('old champion loss avg %s, loss cb %s',
champion_trial.result.loss_avg,
champion_trial.result.loss_cb)
return True
return False

    @staticmethod
    def _test_avg_loss(champion_trial, trial, warmstart_num=1) -> bool:
        """Compare the challenger's (i.e., trial's) average loss with the
        champion_trial's average loss.
        """
assert trial.trial_id != champion_trial.trial_id
if trial.result.resource_used >= warmstart_num:
if trial.result.loss_avg < champion_trial.result.loss_avg:
logger.info('=====new champion condition satisfied using avg loss=====')
logger.info('trial %s', trial.config)
logger.info('trial loss_avg:%s, trial loss_cb %s',
trial.result.loss_avg, trial.result.loss_cb)
logger.info('champion loss_avg:%s, champion loss_cb %s',
champion_trial.result.loss_avg, champion_trial.result.loss_cb)
logger.info('champion %s', champion_trial.config)
return True
return False

    @staticmethod
    def _test_martingale(champion_trial, trial):
        """Compare the challenger and champion using a confidence-sequence-based
        test martingale.

        Not implemented yet.
        """
        raise NotImplementedError
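

# A minimal usage sketch (illustrative only): how a caller might drive the
# runner in an online loop. `MySearcher` and `MyScheduler` are hypothetical
# stand-ins for concrete searcher/scheduler implementations satisfying the
# interfaces documented in OnlineTrialRunner.__init__, and the shape of
# `data_sample` depends on the trial implementation (an assumption here, not
# part of this module):
#
#     runner = OnlineTrialRunner(max_live_model_num=5,
#                                searcher=MySearcher(...),
#                                scheduler=MyScheduler(...))
#     for data_sample in data_stream:
#         champion = runner.champion_trial
#         prediction = champion.predict(data_sample)
#         runner.step(data_sample=data_sample,
#                     prediction_trial_tuple=(prediction, champion))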