"""Self-adaptive cooperative enhanced scatter search (SACESS)."""
from __future__ import annotations
import importlib.metadata
import itertools
import logging
import logging.handlers
import multiprocessing
import os
import time
from collections.abc import Callable
from contextlib import suppress
from dataclasses import dataclass
from math import ceil, sqrt
from multiprocessing import get_context
from multiprocessing.managers import SyncManager
from pathlib import Path
from typing import Any
from warnings import warn
import numpy as np
import pypesto
import pypesto.optimize
from pypesto.history import (
Hdf5History,
HistoryBase,
HistoryOptions,
MemoryHistory,
)
from pypesto.problem import Problem
from pypesto.result import OptimizerResult
from pypesto.store.save_to_hdf5 import (
OptimizationResultHDF5Writer,
ProblemHDF5Writer,
)
from .ess import ESSExitFlag, ESSOptimizer, _check_valid_bounds
from .function_evaluator import create_function_evaluator
from .refset import RefSet
from .utils import merge_monotonic_traces
__all__ = [
"SacessOptimizer",
"get_default_ess_options",
"SacessFidesFactory",
"SacessCmaFactory",
"SacessIpoptFactory",
"SacessOptions",
]
logger = logging.getLogger(__name__)
[docs]
class SacessOptimizer:
"""SACESS optimizer.
A shared-memory-based implementation of the
`Self-Adaptive Cooperative Enhanced Scatter Search` (SaCeSS) algorithm
presented in :footcite:t:`PenasGon2017`. This is a meta-heuristic for
global optimization. Multiple processes (`workers`) run
:class:`enhanced scatter searches (eSSs) <ESSOptimizer>` in parallel.
After each ESS iteration, depending on the outcome, there is a chance
of exchanging good parameters, and changing eSS hyperparameters to those of
the most promising worker. See :footcite:t:`PenasGon2017` for details.
:class:`SacessOptimizer` can be used with or without a local optimizer, but
it is highly recommended to use one.
:ivar histories:
List of the histories of the best values/parameters
found by each worker. (Monotonically decreasing objective values.)
See :func:`pyscat.plot.plot_sacess_history` for visualization.
A basic example using :class:`SacessOptimizer` to minimize the Rosenbrock
function:
>>> from pyscat import SacessOptimizer
>>> from pypesto import Problem, Objective
>>> import scipy as sp
>>> import numpy as np
>>> import logging
>>> # Define some test Problem
>>> objective = Objective(
... fun=sp.optimize.rosen,
... grad=sp.optimize.rosen_der,
... hess=sp.optimize.rosen_hess,
... )
>>> dim = 6
>>> problem = Problem(
... objective=objective,
... lb=-5 * np.ones((dim, 1)),
... ub=5 * np.ones((dim, 1)),
... )
>>> # Create and run the optimizer
>>> sacess = SacessOptimizer(
... problem=problem,
... num_workers=2,
... max_walltime_s=5,
... sacess_loglevel=logging.WARNING
... )
>>> result = sacess.minimize()
.. seealso::
:class:`ESSOptimizer`
References
----------
.. footbibliography::
"""
[docs]
def __init__(
self,
problem: Problem,
num_workers: int | None = None,
ess_init_args: list[dict[str, Any]] | None = None,
max_walltime_s: float = np.inf,
sacess_loglevel: int = logging.INFO,
ess_loglevel: int = logging.WARNING,
mp_start_method: str = "spawn",
options: SacessOptions = None,
autosave_dir: Path | str = None,
):
"""Construct.
:param problem: The minimization problem.
:meth:`Problem.startpoint_method` will be used to sample random
points. `SacessOptimizer` will deal with non-evaluable points.
Therefore, using :class:`pypesto.startpoint.CheckedStartpoints`
with ``check_fval=True`` or ``check_grad=True`` is not recommended
since it would create significant overhead.
:param num_workers:
Number of workers to be used. A minimum of 2 workers is required.
If this argument is given,
(different) default eSS settings will be used for each worker.
Mutually exclusive with ``ess_init_args``.
See :func:`get_default_ess_options` for details on the default
settings.
Currently, if ``problem.objective`` supports gradients,
the default local optimizer is :class:`FidesOptimizer`.
Otherwise, no local optimizer is used by default.
This may be changed in future releases.
:param ess_init_args:
List of argument dictionaries passed to
:func:`ESSOptimizer.__init__`. Each entry corresponds to one worker
process. I.e., the length of this list is the number of eSSs.
Mutually exclusive with ``num_workers``.
If not provided, default settings will be used,
see :func:`get_default_ess_options`.
Ideally, this list contains some more conservative and some more
aggressive configurations.
Resource limits such as ``max_eval`` apply to a single eSS
iteration, not to the full search.
Mutually exclusive with ``num_workers``.
:param max_walltime_s:
Maximum walltime in seconds. It will only be checked between local
optimizations and other simulations, and thus, may be exceeded by
the duration of a local search. Defaults to no limit.
Note that in order to impose the wall time limit also on the local
optimizer, the user has to provide a wrapper function similar to
:meth:`SacessFidesFactory.__call__`.
:param ess_loglevel:
Loglevel for ESS runs.
:param sacess_loglevel:
Loglevel for SACESS runs.
:param autosave_dir:
Directory for storing intermediate results.
If not ``None``, HDF5 files with intermediate results will be
saved in this directory during the optimization.
This can be used to monitor the optimization while it is running,
or to recover results if the optimization was interrupted.
When setting this option, make sure any optimizers running in
parallel have a unique `tmpdir`.
The directory is expected to be empty.
:param mp_start_method:
The start method for the multiprocessing context.
See :mod:`multiprocessing` for details. Running `SacessOptimizer`
under Jupyter may require ``mp_start_method="fork"``.
:param options:
Further optimizer hyperparameters, see :class:`SacessOptions`.
"""
if problem.x_guesses.shape[0]:
# We'll use problem.startpoint_method to sample random points
# later on. Depending on the startpoint method, this will return
# the provided guesses, meaning that we'll always get the same
# points. This means, we won't explore the parameter space, or
# potentially even get stuck if the provided guesses are not
# evaluable.
raise ValueError(
"Providing startpoints in `problem.x_guesses` "
f"is not supported by {self.__class__.__name__}. "
"Unset `problem.x_guesses`."
)
_check_valid_bounds(problem)
self.problem = problem
if (num_workers is None and ess_init_args is None) or (
num_workers is not None and ess_init_args is not None
):
raise ValueError(
"Exactly one of `num_workers` or `ess_init_args` "
"has to be provided."
)
self.num_workers = num_workers or len(ess_init_args)
if self.num_workers < 2:
raise ValueError(
f"{self.__class__.__name__} needs at least 2 workers."
)
self.ess_init_args = ess_init_args or get_default_ess_options(
num_workers=self.num_workers,
dim=problem.dim,
# Dunno what a good default for a derivative-free local optimizer
# would be. So for now, we only use a local optimizer if we can
# use a gradient-based one.
local_optimizer=problem.objective.has_grad,
)
self.max_walltime_s = max_walltime_s
self.exit_flag = ESSExitFlag.DID_NOT_RUN
self.ess_loglevel = ess_loglevel
self.sacess_loglevel = sacess_loglevel
self.worker_results: list[SacessWorkerResult] = []
logger.setLevel(self.sacess_loglevel)
self._autosave_dir: Path | None = None
if autosave_dir is not None:
self._autosave_dir = Path(autosave_dir).absolute()
self._autosave_dir.mkdir(parents=True, exist_ok=True)
self.histories: list[MemoryHistory] | None = None
self._mp_ctx = get_context(mp_start_method)
self.options = options or SacessOptions()
[docs]
def set_local_optimizer(
self,
local_optimizer: None
| pypesto.optimize.Optimizer
| Callable[..., pypesto.optimize.Optimizer],
):
"""Set a local solver for all workers.
:param local_optimizer: The local optimizer to use
(see same argument in :class:`ESSOptimizer`):
a :class:`Optimizer` instance,
a :obj:`Callable` returning an optimizer instance,
or ``None`` to disable local searches.
The callable can be used to propagate walltime limits to the local
optimizers. See :meth:`SacessFidesFactory.__call__` for an example.
"""
for ess_kwargs in self.ess_init_args:
ess_kwargs["local_optimizer"] = local_optimizer
[docs]
def minimize(
self,
) -> pypesto.Result:
"""Minimize the given problem.
Note that if this function is called from a multithreaded program
(multiple threads running at the time of calling this function) and
the :mod:`multiprocessing` `start method` is set to ``fork``, there is
a good chance for deadlocks. Postpone spawning threads until after
`minimize` or change the *start method* to ``spawn``.
:return:
Result object with optimized parameters in
:attr:`pypesto.Result.optimize_result`.
Only the best solution found is returned.
To get the per-worker histories of the best values/parameters
found, use the :attr:`histories` attribute.
Note that the number of gradient and Hessian evaluations is not
tracked and thus set to 0 in the result, even if a local optimizer
is used that performs gradient/Hessian evaluations.
"""
start_time = time.time()
logger.debug(
f"Running {self.__class__.__name__} with {self.num_workers} "
f"workers: {self.ess_init_args} and {self.options}."
)
logging_handler = logging.StreamHandler()
logging_handler.setFormatter(
logging.Formatter(
"%(asctime)s %(name)s %(levelname)-8s %(message)s"
)
)
log_queue_listener = logging.handlers.QueueListener(
self._mp_ctx.Queue(-1), logging_handler
)
# shared memory manager to handle shared state
# (simulates the sacess manager process)
with self._mp_ctx.Manager() as shmem_manager:
sacess_manager = SacessManager(
shmem_manager=shmem_manager,
ess_options=self.ess_init_args,
dim=self.problem.dim,
options=self.options,
)
# create workers
workers = [
SacessWorker(
manager=sacess_manager,
ess_kwargs=ess_kwargs,
worker_idx=worker_idx,
max_walltime_s=self.max_walltime_s,
loglevel=self.sacess_loglevel,
ess_loglevel=self.ess_loglevel,
autosave_file=self.get_autosave_path(
self._autosave_dir, worker_idx
),
options=self.options,
start_time=start_time,
)
for worker_idx, ess_kwargs in enumerate(self.ess_init_args)
]
# launch worker processes
worker_processes = [
self._mp_ctx.Process(
name=f"{self.__class__.__name__}-worker-{i:02d}",
target=_run_worker,
args=(
worker,
self.problem,
log_queue_listener.queue,
),
)
for i, worker in enumerate(workers)
]
for p in worker_processes:
p.start()
# start logging thread only AFTER starting the worker processes
# to prevent deadlocks
log_queue_listener.start()
# wait for finish
# collect results
self.worker_results = sacess_manager.collect_worker_results()
for p in worker_processes:
p.join()
log_queue_listener.stop()
log_queue_listener.queue.close()
log_queue_listener.queue.join_thread()
self.histories = [
worker_result.history for worker_result in self.worker_results
]
self.exit_flag = min(
worker_result.exit_flag for worker_result in self.worker_results
)
walltime = time.time() - start_time
result = self._create_result(walltime)
n_eval_total = self.n_eval_total
if len(result.optimize_result):
logger.info(
f"{self.__class__.__name__} stopped after {walltime:3g}s "
f"and {n_eval_total} objective evaluations "
f"with global best {result.optimize_result[0].fval}."
)
else:
logger.error(
f"{self.__class__.__name__} stopped after {walltime:3g}s "
f"and {n_eval_total} objective evaluations without producing "
"a result."
)
return result
@property
def n_eval_total(self) -> int:
"""Get the total number of objective evaluations.
Only available after :meth:`minimize` has been called and finished.
"""
return sum(
worker_result.n_eval for worker_result in self.worker_results
)
def _create_result(self, walltime: float) -> pypesto.Result:
"""Create result object."""
result = pypesto.Result()
result.problem = self.problem
C = pypesto.C
monotonic_traces = merge_monotonic_traces(
time_traces=[
np.asarray(wr.history.get_time_trace())
for wr in self.worker_results
],
fx_traces=[
np.asarray(wr.history.get_fval_trace())
for wr in self.worker_results
],
x=[
np.asarray(wr.history.get_x_trace())
for wr in self.worker_results
],
# TODO: pypesto Hdf5History.from_history currently requires
# the presence of all keys. This should be changed.
**{
k: [
np.asarray(getattr(wr.history, f"get_{k}_trace")())
for wr in self.worker_results
]
for k in HistoryBase.RESULT_KEYS
if k != C.FVAL
},
)
history = MemoryHistory()
history._trace[C.FVAL] = monotonic_traces["fx"]
history._trace[C.TIME] = monotonic_traces["time"]
history._trace[C.X] = monotonic_traces["x"]
for k in HistoryBase.RESULT_KEYS:
if k not in (C.FVAL, C.TIME, C.X):
history._trace[k] = monotonic_traces[k]
history._n_fval = self.n_eval_total
# TODO: n_grad, n_hess
best_worker_result = min(
self.worker_results,
key=lambda wr: wr.fx,
)
package_name = __name__.split(".")[0]
try:
version = importlib.metadata.version(package_name)
except importlib.metadata.PackageNotFoundError:
version = "unknown"
cls = self.__class__
optimizer = f"{cls.__module__}.{cls.__qualname__} {version}"
global_best = OptimizerResult(
id="0",
x=best_worker_result.x,
fval=best_worker_result.fx,
# TODO: n_grad, n_hess
n_fval=self.n_eval_total,
exitflag=self.exit_flag,
history=history,
time=walltime,
message=f"Global best from {cls.__qualname__} ({self.exit_flag}).",
optimizer=optimizer,
)
result.optimize_result.append(global_best)
return result
[docs]
@staticmethod
def get_autosave_path(base_dir: Path, worker_idx: int) -> Path | None:
"""Get the path of the autosave file for the given worker."""
if base_dir is None:
return None
return (
base_dir / f"SacessOptimizer_worker_{worker_idx:02d}_autosave.h5"
)
class SacessManager:
"""The Sacess manager.
Manages shared memory of a saCeSS run. Loosely corresponds to the manager
process in [PenasGon2017]_.
:ivar _dim: Dimension of the optimization problem
:ivar _num_workers: Number of workers
:ivar _ess_options: eSS options for each worker
:ivar _best_known_fx: Best objective value encountered so far
:ivar _best_known_x: Parameters corresponding to ``_best_known_fx``
:ivar _worker_scores: Performance score of the different workers
(the higher, the more promising the respective worker is considered)
:ivar _worker_comms: Number of communications received from the individual
workers
:ivar _rejections: Number of rejected solutions received from workers since
the last adaptation of ``_rejection_threshold``.
:ivar _rejection_threshold: Threshold for relative objective improvements
that incoming solutions have to pass to be accepted
:ivar _lock: Lock for accessing shared state.
:ivar _terminate: Flag to signal termination of the saCeSS run to workers
:ivar _logger: A logger instance
:ivar _options: Further optimizer hyperparameters.
"""
def __init__(
self,
shmem_manager: SyncManager,
ess_options: list[dict[str, Any]],
dim: int,
options: SacessOptions = None,
):
self._dim = dim
self._options = options or SacessOptions()
self._num_workers = len(ess_options)
self._ess_options = [shmem_manager.dict(o) for o in ess_options]
self._best_known_fx = shmem_manager.Value("d", np.inf)
self._best_known_x = shmem_manager.Array("d", [np.nan] * dim)
self._rejections = shmem_manager.Value("i", 0)
# The initial value for the acceptance/rejection threshold in
# [PenasGon2017]_ p.9 is 0.1.
# However, their implementation uses 0.1 *percent*. I assume this is a
# mistake in the paper.
self._rejection_threshold = shmem_manager.Value(
"d", self._options.manager_initial_rejection_threshold
)
# scores of the workers, ordered by worker-index
# initial score is the worker index
self._worker_scores = shmem_manager.Array(
"d", range(self._num_workers)
)
self._terminate = shmem_manager.Value("b", False)
self._worker_comms = shmem_manager.Array("i", [0] * self._num_workers)
self._lock = shmem_manager.RLock()
self._logger = logging.getLogger()
self._result_queue = shmem_manager.Queue()
def get_best_solution(self) -> tuple[np.ndarray, float]:
"""Get the best objective value and corresponding parameters."""
with self._lock:
return np.array(self._best_known_x), self._best_known_fx.value
def reconfigure_worker(self, worker_idx: int) -> dict:
"""Reconfigure the given worker.
Updates the ESS options for the given worker to those of the worker at
the top of the scoreboard and returns those settings.
"""
with self._lock:
leader_options = self._ess_options[
np.argmax(self._worker_scores)
].copy()
for setting in ["local_n2", "balance", "dim_refset"]:
if setting in leader_options:
self._ess_options[worker_idx][setting] = leader_options[
setting
]
return self._ess_options[worker_idx].copy()
def submit_solution(
self,
x: np.ndarray,
fx: float,
sender_idx: int,
elapsed_time_s: float,
):
"""Submit a solution.
To be called by a worker.
:param x: Model parameters
:param fx: Objective value corresponding to ``x``
:param sender_idx: Index of the worker submitting the results.
:param elapsed_time_s:
Elapsed time since the beginning of the sacess run.
"""
abs_change = fx - self._best_known_fx.value
with self._lock:
# cooperation step
# solution improves best value by at least a factor of ...
if (
# initially _best_known_fx is NaN
(
np.isfinite(fx)
and not np.isfinite(self._best_known_fx.value)
)
# avoid division by 0. just accept any improvement if the best
# known value is 0.
or (self._best_known_fx.value == 0 and fx < 0)
or (
fx < self._best_known_fx.value
and abs(abs_change / self._best_known_fx.value)
> self._rejection_threshold.value
)
):
# accept solution
self._logger.debug(
f"Accepted solution from worker {sender_idx}: {fx}."
)
# accept
if len(x) != len(self._best_known_x):
raise AssertionError(
f"Received solution with {len(x)} parameters, "
f"but expected {len(self._best_known_x)}."
)
for i, xi in enumerate(x):
self._best_known_x[i] = xi
self._best_known_fx.value = fx
self._worker_comms[sender_idx] += 1
self._worker_scores[sender_idx] = (
self._worker_comms[sender_idx] * elapsed_time_s
)
else:
# reject solution
self._rejections.value += 1
rel_change = (
abs(abs_change / self._best_known_fx.value)
if self._best_known_fx.value != 0
else np.nan
)
self._logger.debug(
f"Rejected solution from worker {sender_idx} "
f"abs change: {abs_change} "
f"rel change: {rel_change:.4g} "
f"(threshold: {self._rejection_threshold.value}) "
f"(total rejections: {self._rejections.value})."
)
# adapt the acceptance threshold if too many solutions have
# been rejected
if self._rejections.value >= self._num_workers:
self._rejection_threshold.value = min(
self._rejection_threshold.value / 2,
self._options.manager_minimum_rejection_threshold,
)
self._logger.debug(
"Lowered acceptance threshold to "
f"{self._rejection_threshold.value}."
)
self._rejections.value = 0
def send_result(self, result: SacessWorkerResult):
"""Send the final result of a worker.
Must be called exactly once by each worker after finishing.
:param result: The result to be sent.
"""
self._result_queue.put(result)
def abort(self):
"""Abort the SACESS run."""
with self._lock:
self._terminate.value = True
def aborted(self) -> bool:
"""Whether this run was aborted."""
with self._lock:
return self._terminate.value
def collect_worker_results(
self,
) -> list[SacessWorkerResult]:
"""Collect results from all workers.
To be called only once.
:return: List of worker results.
"""
res = [self._result_queue.get() for _ in range(self._num_workers)]
return sorted(res, key=lambda r: r.worker_idx)
class SacessWorker:
"""A saCeSS worker.
Runs ESSs and exchanges information with a SacessManager.
Corresponds to a worker process in [PenasGon2017]_.
:ivar _manager: The :class: `SacessManager` this worker is working for.
:ivar _worker_idx: Index of this worker.
:ivar _best_known_fx: Best objective value known to this worker
(obtained on its own or received from the manager).
:ivar _n_received_solutions: Number of solutions received by this worker
since the last one was sent to the manager.
:ivar _n_eval_last_sent:
Number of objective evaluations at the time the most recent solution
was sent to the manager, or 0 if no solution has been sent yet.
:ivar _ess_kwargs: ESSOptimizer options for this worker
(may get updated during the self-adaptive step).
:ivar _n_sent_solutions: Number of solutions sent to the Manager.
:ivar _max_walltime_s: Walltime limit.
:ivar logger: A Logger instance.
:ivar _loglevel: Logging level for sacess
:ivar _ess_loglevel: Logging level for ESS runs
:ivar _autosave_file: Path of the autosave file to be created.
"""
def __init__(
self,
manager: SacessManager,
ess_kwargs: dict[str, Any],
worker_idx: int,
max_walltime_s: float = np.inf,
loglevel: int = logging.INFO,
ess_loglevel: int = logging.WARNING,
autosave_file: Path = None,
options: SacessOptions = None,
start_time: float | None = None,
):
self._manager = manager
self._worker_idx = worker_idx
self._best_known_fx = np.inf
self._n_received_solutions = 0
self._n_eval_last_sent = 0
self._ess_kwargs = ess_kwargs
self._n_sent_solutions = 0
self._max_walltime_s = max_walltime_s
self._start_time = (
start_time if start_time is not None else time.time()
)
self._loglevel = loglevel
self._ess_loglevel = ess_loglevel
self.logger = None
self._autosave_file = autosave_file
self._refset: RefSet | None = None
self._options = options or SacessOptions()
def run(
self,
problem: Problem,
):
# index of the local solution in ESSOptimizer.local_solutions
# that was most recently saved by _autosave
last_saved_local_solution = -1
self.logger.setLevel(self._loglevel)
# Set the manager logger to one created within the current process
self._manager._logger = self.logger
self.logger.debug(
f"#{self._worker_idx} starting "
f"({self._ess_kwargs}, {self._options})."
)
evaluator = create_function_evaluator(
problem,
n_procs=self._ess_kwargs.get("n_procs"),
n_threads=self._ess_kwargs.get("n_threads"),
)
# create initial refset
self._refset = RefSet.from_random(
dim=self._ess_kwargs["dim_refset"],
n_diverse=max(
self._ess_kwargs.get("n_diverse", 10 * problem.dim),
self._ess_kwargs["dim_refset"],
),
evaluator=evaluator,
)
ess = self._setup_ess()
# run ESS until exit criteria are met, but start at least one iteration
while self._keep_going(ess) or ess.n_iter == 0:
# perform one ESS iteration
ess._do_iteration()
# check if the best solution of the last local ESS is sufficiently
# better than the sacess-wide best solution
self.maybe_update_best(
ess.x_best, ess.fx_best, ess.evaluator.n_eval
)
self._best_known_fx = min(ess.fx_best, self._best_known_fx)
self._autosave(ess, last_saved_local_solution)
last_saved_local_solution = len(ess.local_solutions) - 1
self._cooperate(ess)
self._maybe_adapt(problem, ess.evaluator.n_eval)
t_left = self._max_walltime_s - (time.time() - self._start_time)
self.logger.info(
f"sacess worker {self._worker_idx} iteration {ess.n_iter} "
f"(best: {self._best_known_fx}, "
f"n_eval: {ess.evaluator.n_eval}, "
f"remaining wall time: {t_left}s)."
)
self._finalize(ess)
def _finalize(self, ess: ESSOptimizer = None):
"""Finalize the worker."""
# Whatever happens here, we need to put something to the queue before
# returning to avoid deadlocks.
worker_result = None
if ess is not None:
try:
ess.history.finalize(exitflag=ess.exit_flag.name)
ess._report_final()
if not isinstance(ess.history, MemoryHistory):
try:
history = _memory_history_from_history(ess.history)
except Exception as e:
self.logger.exception(
f"Worker {self._worker_idx} failed to convert "
f"history: {e}"
)
history = MemoryHistory()
else:
history = ess.history
worker_result = SacessWorkerResult(
worker_idx=self._worker_idx,
x=ess.x_best,
fx=ess.fx_best,
history=history,
n_eval=ess.evaluator.n_eval,
n_iter=ess.n_iter,
n_local=len(ess.local_solutions),
exit_flag=ess.exit_flag,
)
except Exception as e:
self.logger.exception(
f"Worker {self._worker_idx} failed to finalize: {e}"
)
if worker_result is None:
# Create some dummy result
worker_result = SacessWorkerResult(
worker_idx=self._worker_idx,
x=np.full(self._manager._dim, np.nan),
fx=np.nan,
history=MemoryHistory(),
n_eval=0,
n_iter=0,
n_local=0,
exit_flag=ESSExitFlag.ERROR,
)
self._manager.send_result(worker_result)
self.logger.debug(f"Final configuration: {self._ess_kwargs}")
def _setup_ess(self) -> ESSOptimizer:
"""Run ESS."""
ess_kwargs = self._ess_kwargs.copy()
# account for sacess walltime limit
ess_kwargs["max_walltime_s"] = min(
ess_kwargs.get("max_walltime_s", np.inf),
self._max_walltime_s - (time.time() - self._start_time),
)
ess = ESSOptimizer(**ess_kwargs)
ess.logger = self.logger.getChild(f"sacess-{self._worker_idx:02d}-ess")
ess.logger.setLevel(self._ess_loglevel)
if self._autosave_file:
history = Hdf5History(
id="0",
file=self._autosave_file,
options=HistoryOptions(trace_record=True, trace_save_iter=1),
)
else:
history = None
ess._initialize_minimize(
refset=self._refset, start_time=self._start_time, history=history
)
return ess
def _cooperate(self, ess: ESSOptimizer) -> None:
"""Cooperation step."""
# try to obtain a new solution from manager
recv_x, recv_fx = self._manager.get_best_solution()
self.logger.log(
logging.DEBUG - 1,
f"Worker {self._worker_idx} received solution {recv_fx} "
f"(known best: {self._best_known_fx}).",
)
if recv_fx < self._best_known_fx or (
not np.isfinite(self._best_known_fx) and np.isfinite(recv_fx)
):
if not np.isfinite(recv_x).all():
raise AssertionError(
f"Received non-finite parameters {recv_x}."
)
self.logger.debug(
f"Worker {self._worker_idx} received better solution "
f" {recv_fx}."
)
self._best_known_fx = recv_fx
self._n_received_solutions += 1
self.replace_solution(self._refset, x=recv_x, fx=recv_fx)
ess._maybe_update_global_best(recv_x, recv_fx)
def _maybe_adapt(self, problem: Problem, n_eval: int):
"""Perform the adaptation step if needed.
Update ESS settings if conditions are met.
"""
# Update ESS settings if we received way more solutions than we sent
# Note: [PenasGon2017]_ Algorithm 5 uses AND in the following
# condition, but the accompanying implementation uses OR.
n_rcvd_thresh = (
self._options.adaptation_sent_coeff * self._n_sent_solutions
+ self._options.adaptation_sent_offset
)
n_eval_thresh = problem.dim * self._options.adaptation_min_evals
if (
self._n_received_solutions > n_rcvd_thresh
or n_eval - self._n_eval_last_sent > n_eval_thresh
):
self._ess_kwargs = self._manager.reconfigure_worker(
self._worker_idx
)
self._refset.sort()
self._refset.resize(self._ess_kwargs["dim_refset"])
self.logger.debug(
f"Updated settings on worker {self._worker_idx} "
f"to {self._ess_kwargs}"
)
else:
self.logger.debug(
f"Worker {self._worker_idx} not adapting. "
f"Received: {self._n_received_solutions} <= {n_rcvd_thresh}, "
f"Sent: {self._n_sent_solutions}, "
f"neval: {n_eval - self._n_eval_last_sent} <= {n_eval_thresh}."
)
def maybe_update_best(self, x: np.ndarray, fx: float, n_eval: int):
"""Maybe update the best known solution and send it to the manager."""
rel_change = (
abs((fx - self._best_known_fx) / fx) if fx != 0 else np.nan
)
self.logger.debug(
f"Worker {self._worker_idx} maybe sending solution {fx}. "
f"best known: {self._best_known_fx}, "
f"rel change: {rel_change:.4g}, "
f"threshold: {self._options.worker_acceptance_threshold}"
)
# solution improves the best value by at least a factor of ...
if (
(np.isfinite(fx) and not np.isfinite(self._best_known_fx))
or (self._best_known_fx == 0 and fx < 0)
or (
fx < self._best_known_fx
and abs((self._best_known_fx - fx) / self._best_known_fx)
> self._options.worker_acceptance_threshold
)
):
self.logger.debug(
f"Worker {self._worker_idx} sending solution {fx}."
)
self._n_sent_solutions += 1
self._best_known_fx = fx
# send best_known_fx to manager
self._n_eval_last_sent = n_eval
self._n_received_solutions = 0
elapsed_time = time.time() - self._start_time
self._manager.submit_solution(
x=x,
fx=fx,
sender_idx=self._worker_idx,
elapsed_time_s=elapsed_time,
)
@staticmethod
def replace_solution(refset: RefSet, x: np.ndarray, fx: float):
"""Replace the global refset member by the given solution."""
# [PenasGon2017]_ page 8, top
if "cooperative_solution" not in refset.attributes:
label = np.zeros(shape=refset.dim, dtype=int)
# on first call, mark the worst solution as "cooperative solution"
cooperative_solution_idx = np.argmax(refset.fx)
label[cooperative_solution_idx] = 1
refset.add_attribute("cooperative_solution", label)
elif (
cooperative_solution_idx := np.argwhere(
refset.attributes["cooperative_solution"]
)
).size == 0:
# the attribute exists, but no member is marked as the cooperative
# solution. this may happen if we shrink the refset.
cooperative_solution_idx = np.argmax(refset.fx)
# replace the cooperative solution
refset.update(
i=cooperative_solution_idx,
x=x,
fx=fx,
)
def _keep_going(self, ess: ESSOptimizer) -> bool:
"""Check exit criteria.
:return: ``True`` if none of the exit criteria is met,
``False`` otherwise.
"""
# elapsed time
if time.time() - self._start_time >= self._max_walltime_s:
ess.exit_flag = ESSExitFlag.MAX_TIME
self.logger.debug(
f"Max walltime ({self._max_walltime_s}s) exceeded."
)
return False
# other reasons for termination (some worker failed, ...)
if self._manager.aborted():
ess.exit_flag = ESSExitFlag.ERROR
self.logger.debug("Manager requested termination.")
return False
return True
def abort(self):
"""Send signal to abort."""
self.logger.error(f"Worker {self._worker_idx} aborting.")
# signal to manager
self._manager.abort()
self._finalize(None)
def _autosave(self, ess: ESSOptimizer, last_saved_local_solution: int):
"""Save intermediate results.
If a temporary result file is set, save (parts of) the current state
of the ESS to that file.
We save the current best solution and the local optimizer results.
"""
if not self._autosave_file:
return
t_start = time.time()
# save problem in first iteration
if ess.n_iter == 1:
pypesto_problem_writer = ProblemHDF5Writer(self._autosave_file)
pypesto_problem_writer.write(
ess.evaluator.problem, overwrite=False
)
# save new local solutions
opt_res_writer = OptimizationResultHDF5Writer(self._autosave_file)
for i in range(
last_saved_local_solution + 1, len(ess.local_solutions)
):
optimizer_result = ess.local_solutions[i]
optimizer_result.id = str(i + ess.n_iter)
opt_res_writer.write_optimizer_result(
optimizer_result, overwrite=False
)
# save the current best solution
optimizer_result = pypesto.OptimizerResult(
id="0",
x=ess.x_best,
fval=ess.fx_best,
message=f"Global best (iteration {ess.n_iter})",
time=time.time() - ess._start_time,
n_fval=ess.evaluator.n_eval,
optimizer=str(ess),
)
optimizer_result.update_to_full(ess.evaluator.problem)
opt_res_writer.write_optimizer_result(optimizer_result, overwrite=True)
t_save = time.time() - t_start
self.logger.debug(
f"Worker {self._worker_idx} autosave to {self._autosave_file} "
f"took {t_save:.2f}s."
)
def _run_worker(
worker: SacessWorker,
problem: Problem,
log_process_queue: multiprocessing.Queue,
):
"""Run the given SACESS worker.
Helper function as entrypoint for processes running `SacessWorker`s.
"""
try:
# different random seeds per process
np.random.seed((os.getpid() * int(time.time() * 1000)) % 2**32)
# Forward log messages to the logging process
h = logging.handlers.QueueHandler(log_process_queue)
worker.logger = logging.getLogger(
multiprocessing.current_process().name
)
worker.logger.addHandler(h)
return worker.run(problem=problem)
except Exception as e:
with suppress(Exception):
worker.logger.exception(f"Worker {worker._worker_idx} failed: {e}")
worker.abort()
[docs]
def get_default_ess_options(
num_workers: int,
dim: int,
local_optimizer: bool
| pypesto.optimize.Optimizer
| Callable[..., pypesto.optimize.Optimizer] = True,
) -> list[dict]:
"""Get default ESS settings for (SA)CESS.
Returns settings for ``num_workers`` parallel scatter searches, combining
more aggressive and more conservative configurations. Mainly intended for
use with :class:`SacessOptimizer`. For details on the different options,
see keyword arguments of :meth:`ESSOptimizer.__init__`.
Setting appropriate values for ``n_threads`` and ``local_optimizer`` is
left to the user. Defaults to single-threaded and no local optimizer.
Based on https://bitbucket.org/DavidPenas/sacess-library/src/508e7ac15579104731cf1f8c3969960c6e72b872/src/method_module_fortran/eSS/parallelscattersearchfunctions.f90#lines-929
:param num_workers: Number of configurations to return.
:param dim: Problem dimension (number of optimized parameters).
:param local_optimizer: The local optimizer to use
(see same argument in :class:`ESSOptimizer`), a boolean indicating
whether to set the default local optimizer
(currently :class:`FidesOptimizer`), a :class:`Optimizer` instance,
or a :obj:`Callable` returning an optimizer instance.
The latter can be used to propagate walltime limits to the local
optimizers. See :meth:`SacessFidesFactory.__call__` for an example.
The current default optimizer assumes that the optimized objective
function can provide its gradient. If this is not the case, the user
should provide a different local optimizer or consider using
:class:`pypesto.objective.finite_difference.FD` to approximate the
gradient using finite differences.
Examples
--------
To get default ESS settings for running :class:`SacessOptimizer` without
a local optimizer, use:
>>> from pypesto.optimize.ess import get_default_ess_options
>>> ess_init_args = get_default_ess_options(
... num_workers=12,
... dim=10, # usually problem.dim
... local_optimizer=False,
... )
>>> ess_init_args # doctest: +ELLIPSIS, +NORMALIZE_WHITESPACE
[{'dim_refset': 5, 'balance': 0.0, 'local_n1': 1, 'local_n2': 1},
...
{'dim_refset': 7, 'balance': 1.0, 'local_n1': 4, 'local_n2': 4}]
"""
min_dimrefset = 5
def dim_refset(x):
return max(min_dimrefset, ceil((1 + sqrt(4 * dim * x)) / 2))
settings: list[dict[str, Any]] = [
# 1
{
"dim_refset": dim_refset(1),
"balance": 0.0,
"local_n1": 1,
"local_n2": 1,
},
# 2
{
"dim_refset": dim_refset(3),
"balance": 0.0,
"local_n1": 1000,
"local_n2": 1000,
},
# 3
{
"dim_refset": dim_refset(5),
"balance": 0.25,
"local_n1": 10,
"local_n2": 10,
},
# 4
{
"dim_refset": dim_refset(10),
"balance": 0.5,
"local_n1": 20,
"local_n2": 20,
},
# 5
{
"dim_refset": dim_refset(15),
"balance": 0.25,
"local_n1": 100,
"local_n2": 100,
},
# 6
{
"dim_refset": dim_refset(12),
"balance": 0.25,
"local_n1": 1000,
"local_n2": 1000,
},
# 7
{
"dim_refset": dim_refset(7.5),
"balance": 0.25,
"local_n1": 15,
"local_n2": 15,
},
# 8
{
"dim_refset": dim_refset(5),
"balance": 0.25,
"local_n1": 7,
"local_n2": 7,
},
# 9
{
"dim_refset": dim_refset(2),
"balance": 0.0,
"local_n1": 1000,
"local_n2": 1000,
},
# 10
{
"dim_refset": dim_refset(0.5),
"balance": 0.0,
"local_n1": 1,
"local_n2": 1,
},
# 11
{
"dim_refset": dim_refset(1.5),
"balance": 1.0,
"local_n1": 1,
"local_n2": 1,
},
# 12
{
"dim_refset": dim_refset(3.5),
"balance": 1.0,
"local_n1": 4,
"local_n2": 4,
},
# 13
{
"dim_refset": dim_refset(5.5),
"balance": 0.1,
"local_n1": 10,
"local_n2": 10,
},
# 14
{
"dim_refset": dim_refset(10.5),
"balance": 0.3,
"local_n1": 20,
"local_n2": 20,
},
# 15
{
"dim_refset": dim_refset(15.5),
"balance": 0.2,
"local_n1": 1000,
"local_n2": 1000,
},
# 16
{
"dim_refset": dim_refset(12.5),
"balance": 0.2,
"local_n1": 10,
"local_n2": 10,
},
# 17
{
"dim_refset": dim_refset(8),
"balance": 0.75,
"local_n1": 15,
"local_n2": 15,
},
# 18
{
"dim_refset": dim_refset(5.5),
"balance": 0.75,
"local_n1": 1000,
"local_n2": 1000,
},
# 19
{
"dim_refset": dim_refset(2.2),
"balance": 1.0,
"local_n1": 2,
"local_n2": 2,
},
# 20
{
"dim_refset": dim_refset(1),
"balance": 1.0,
"local_n1": 1,
"local_n2": 1,
},
]
# Set local optimizer
for cur_settings in settings:
if local_optimizer is True:
cur_settings["local_optimizer"] = SacessFidesFactory(
fides_kwargs={"verbose": logging.WARNING}
)
elif local_optimizer is not False:
cur_settings["local_optimizer"] = local_optimizer
return list(itertools.islice(itertools.cycle(settings), num_workers))
[docs]
class SacessFidesFactory:
"""
Factory for :class:`FidesOptimizer` instances for use with
:class:`SacessOptimizer`.
:meth:`__call__` will forward the walltime limit and function evaluation
limit imposed on :class:`SacessOptimizer` to :class:`FidesOptimizer`.
Besides that, default options are used.
:param fides_options:
Options for the :class:`FidesOptimizer`.
See :class:`fides.constants.Options`.
:param fides_kwargs:
Keyword arguments for the :class:`FidesOptimizer`. See
:meth:`FidesOptimizer.__init__`. Must not include ``options``.
"""
[docs]
def __init__(
self,
fides_options: dict[str, Any] | None = None,
fides_kwargs: dict[str, Any] | None = None,
):
if fides_options is None:
fides_options = {}
if fides_kwargs is None:
fides_kwargs = {}
self._fides_options = fides_options
self._fides_kwargs = fides_kwargs
# Check if fides is installed
try:
import fides # noqa F401
except ImportError:
from pypesto.optimize.optimizer import OptimizerImportError
raise OptimizerImportError("fides") from None
def __call__(
self, max_walltime_s: int, max_eval: int
) -> pypesto.optimize.FidesOptimizer:
"""Create a :class:`FidesOptimizer` instance."""
from fides.constants import Options as FidesOptions
options = self._fides_options.copy()
options[FidesOptions.MAXTIME] = max_walltime_s
# only accepts int
if np.isfinite(max_eval):
options[FidesOptions.MAXITER] = int(max_eval)
return pypesto.optimize.FidesOptimizer(
**self._fides_kwargs, options=options
)
def __repr__(self):
cls = self.__class__.__name__
return (
f"{cls}(fides_options={self._fides_options}, "
f"fides_kwargs={self._fides_kwargs})"
)
[docs]
class SacessCmaFactory:
"""
Factory for :class:`CmaOptimizer` instances for use with
:class:`SacessOptimizer`.
:meth:`__call__` will forward the walltime limit and function evaluation
limit imposed on :class:`SacessOptimizer` to :class:`CmaOptimizer`.
Besides that, default options are used.
:param options:
Options as passed to :meth:`CmaOptimizer.__init__`.
See ``cma.CMAOptions()`` for available options.
"""
[docs]
def __init__(
self,
options: dict[str, Any] | None = None,
):
if options is None:
options = {"verbose": -10}
self._options = options
# Check if cma is installed
try:
import cma # noqa F401
except ImportError:
from pypesto.optimize.optimizer import OptimizerImportError
raise OptimizerImportError("cma") from None
def __call__(
self, max_walltime_s: int, max_eval: int
) -> pypesto.optimize.CmaOptimizer:
"""Create a :class:`CmaOptimizer` instance."""
options = self._options.copy()
options["timeout"] = max_walltime_s
options["maxfevals"] = max_eval
return pypesto.optimize.CmaOptimizer(options=options)
def __repr__(self):
return f"{self.__class__.__name__}(options={self._options})"
[docs]
class SacessIpoptFactory:
"""
Factory for :class:`IpoptOptimizer` instances for use with
:class:`SacessOptimizer`.
:meth:`__call__` will forward the walltime limit and function evaluation
limit imposed on :class:`SacessOptimizer` to :class:`IpoptOptimizer`.
Besides that, default options are used.
:param ipopt_options:
Options for the :class:`IpoptOptimizer`.
See https://coin-or.github.io/Ipopt/OPTIONS.html.
"""
[docs]
def __init__(
self,
ipopt_options: dict[str, Any] | None = None,
):
if ipopt_options is None:
ipopt_options = {}
self._ipopt_options = ipopt_options
import cyipopt
if cyipopt.IPOPT_VERSION < (3, 14, 0):
ver = ".".join(map(str, cyipopt.IPOPT_VERSION))
warn(
f"The currently installed Ipopt version {ver} "
"does not support the `max_wall_time` option. "
"At least Ipopt 3.14 is required. "
"The walltime limit will be ignored.",
stacklevel=2,
)
def __call__(
self, max_walltime_s: float, max_eval: float
) -> pypesto.optimize.IpoptOptimizer:
"""Create a :class:`IpoptOptimizer` instance."""
import cyipopt
options = self._ipopt_options.copy()
if np.isfinite(max_walltime_s) and cyipopt.IPOPT_VERSION >= (3, 14, 0):
options["max_wall_time"] = max_walltime_s
if np.isfinite(max_eval):
raise NotImplementedError(
"Ipopt does not support function evaluation limits."
)
return pypesto.optimize.IpoptOptimizer(options=options)
def __repr__(self):
return (
f"{self.__class__.__name__}(ipopt_options={self._ipopt_options})"
)
@dataclass
class SacessWorkerResult:
"""Container for :class:`SacessWorker` results.
Contains various information about the optimization process of a single
:class:`SacessWorker` instance that is to be sent to
:class:`SacessOptimizer`.
:ivar worker_idx:
Index of the worker this result corresponds to.
:ivar x:
Best parameters found.
:ivar fx:
Objective value corresponding to ``x``.
:ivar n_eval:
Number of objective evaluations performed.
:ivar n_iter:
Number of scatter search iterations performed.
:ivar n_local:
Number of local searches performed.
:ivar history:
History object containing information about the optimization process.
:ivar exit_flag:
Exit flag of the optimization process.
"""
worker_idx: int
x: np.ndarray
fx: float
n_eval: int
n_iter: int
n_local: int
history: pypesto.history.memory.MemoryHistory
exit_flag: ESSExitFlag
[docs]
@dataclass
class SacessOptions:
"""Container for :class:`SacessOptimizer` hyperparameters.
:param manager_initial_rejection_threshold:
:param manager_minimum_rejection_threshold:
Initial and minimum threshold for relative objective improvements that
incoming solutions have to pass to be accepted. If the number of
rejected solutions exceeds the number of workers, the threshold is
halved until it reaches ``manager_minimum_rejection_threshold``.
:param worker_acceptance_threshold:
Minimum relative improvement of the objective compared to the best
known value to be eligible for submission to the Manager.
:param adaptation_min_evals:
:param adaptation_sent_offset:
:param adaptation_sent_coeff:
Hyperparameters that control when the workers will adapt their settings
based on the performance of the other workers.
The adaptation step is performed if all the following conditions are
met:
* The number of function evaluations since the last solution was sent
to the manager times the number of optimization parameters is greater
than ``adaptation_min_evals``.
* The number of solutions received by the worker since the last
solution it sent to the manager is greater than
``adaptation_sent_coeff * n_sent_solutions + adaptation_sent_offset``,
where ``n_sent_solutions`` is the number of solutions sent to the
manager by the given worker.
""" # noqa: E501
manager_initial_rejection_threshold: float = 0.001
manager_minimum_rejection_threshold: float = 0.001
# Default value from original SaCeSS implementation
worker_acceptance_threshold: float = 0.0001
# Magic numbers for adaptation, taken from [PenasGon2017]_ algorithm 5
adaptation_min_evals: int = 5000
adaptation_sent_offset: int = 20
adaptation_sent_coeff: int = 10
def __post_init__(self):
if self.adaptation_min_evals < 0:
raise ValueError("adaptation_min_evals must be non-negative.")
if self.adaptation_sent_offset < 0:
raise ValueError("adaptation_sent_offset must be non-negative.")
if self.adaptation_sent_coeff < 0:
raise ValueError("adaptation_sent_coeff must be non-negative.")
if self.manager_initial_rejection_threshold < 0:
raise ValueError(
"manager_initial_rejection_threshold must be non-negative."
)
if self.manager_minimum_rejection_threshold < 0:
raise ValueError(
"manager_minimum_rejection_threshold must be non-negative."
)
if self.worker_acceptance_threshold < 0:
raise ValueError(
"worker_acceptance_threshold must be non-negative."
)
def _memory_history_from_history(history: HistoryBase) -> MemoryHistory:
"""Convert a general HistoryBase to MemoryHistory.
:param history: The history to convert.
:return: The converted history.
"""
memory_history = MemoryHistory()
memory_history._n_fval = history.n_fval
memory_history._n_grad = history.n_grad
memory_history._start_time = history.start_time
for key in memory_history.ALL_KEYS:
meth = f"get_{key}_trace"
if hasattr(history, meth):
trace = getattr(history, meth)()
memory_history._trace[key] = trace
return memory_history