#!/usr/bin/env python
# -*- coding: utf-8 -*-
#Copyright (C) 2013 Petr Šebek
#Permission is hereby granted, free of charge, to any person obtaining a copy
#of this software and associated documentation files (the "Software"), to deal
#in the Software without restriction, including without limitation the rights
#to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
#copies of the Software, and to permit persons to whom the Software is
#furnished to do so, subject to the following conditions:
#The above copyright notice and this permission notice shall be included in
#all copies or substantial portions of the Software.
#THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
#IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
#FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
#AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
#LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
#OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
#SOFTWARE.
from __future__ import division
import collections
import logging
import numpy as np
import scipy.optimize
class CMAES(object):
"""
Class CMAES represent algorithm Covariance Matrix Adaptation - Evolution
Strategy. It provides function minimization.
"""
def __init__(self, func, N, sigma=0.3, xmean=None):
"""
:param func: function to be minimized
:type func: function
:param N: number of parameter of function
:type N: int
:param sigma: step size of method
:type sigma: float
:param xmean: initial point, if None some is generated
:type xmean: np.array
"""
self.func = func
self.N = N
self.store_parameters = {'func': func,
'N': N,
'sigma': sigma,
'xmean': xmean,
}
self.stopeval = 1e4 * self.N / 2
self.stopfitness = 1e-10
self.eigenval = 0
# generation loop
self.counteval = 0
self.generation = 0
#stop criteria
self.stop_criteria = ("Fitness",
"MaxEval",
"NoEffectAxis",
"NoEffectCoord",
#"Stagnation",
"ConditionCov"
"TolXUp",
"TolFun",
"TolX")
self.tolfun = 1e-12
self.tolxup = 1e4
self.condition_cov_max = 1e14
self.lamda = int(4 + 3 * np.log(self.N))
self.initVariables(sigma, xmean)
    def initVariables(self, sigma, xmean, lamda_factor=1):
"""
Init variables that can change after restart of method, basically that
are dependent on lamda.
:param sigma: step size
:type sigma: float
:param xmean: initial point
:type xmean: np.array
:param lamda_factor: factor for multyplying old lambda, serves for restarting method
:type lamda_factor: float
"""
self.sigma = sigma
if xmean is None:
self.xmean = np.random.rand(self.N)
else:
self.xmean = xmean
self.status = -1
# strategy parameter setting: selection
self.lamda *= lamda_factor
self.mu = self.lamda / 2
self.weights = np.array([np.log(self.mu + 0.5) - np.log(i)
for i in range(1, int(self.mu) + 1)])
self.mu = int(self.mu)
self.weights = self.weights / np.sum(self.weights)
self.mueff = 1 / np.sum(self.weights ** 2)
# strategy parameter setting: adaptation
self.cc = (4 + self.mueff / self.N) / (self.N + 4 + 2 * self.mueff / self.N)
self.cs = (self.mueff + 2) / (self.N + self.mueff + 5)
self.c1 = 2 / ((self.N + 1.3) ** 2 + self.mueff)
self.cmu = min(1 - self.c1, 2 * (self.mueff - 2 + 1 / self.mueff) /
((self.N + 2) ** 2 + self.mueff))
self.damps = 1 + 2 * max(0, np.sqrt((self.mueff - 1) /
(self.N + 1)) - 1) + self.cs
# initialize dynamic (internal) strategy parameters and constants
self.pc = np.zeros((1, self.N))
self.ps = np.zeros_like(self.pc)
self.B = np.eye(self.N)
self.D = np.eye(self.N)
self.C = np.identity(self.N)
self.chiN = self.N ** 0.5 * (1 - 1 / (4 * self.N) + 1 /
(21 * self.N ** 2))
# termination
self.tolx = 1e-12 * self.sigma
self.short_history_len = 10 + np.ceil(30 * self.N / self.lamda)
self.long_history_len_down = 120 + 30 * self.N / self.lamda
self.long_history_len_up = 20000
self.history = {}
self.history['short_best'] = collections.deque()
self.history['long_best'] = collections.deque()
self.history['long_median'] = collections.deque()
    def newGeneration(self):
"""
Generate new generation of individuals.
:rtype: np.array
:return: new generation
"""
self.generation += 1
self.arz = np.random.randn(self.lamda, self.N)
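        # Each row of arz is a standard-normal sample; B*D maps it onto the
        # principal axes of C, so arx is distributed as N(xmean, sigma^2 * C).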
self.arx = self.xmean + self.sigma * np.dot(np.dot(self.B, self.D), self.arz.T).T
return self.arx
    def update(self, arfitness):
"""
Update values of method from new evaluated generation
:param arfitness: list of function values to individuals
:type arfitness: list
"""
self.counteval += self.lamda
self.arfitness = arfitness
# sort by fitness and compute weighted mean into xmean
self.arindex = np.argsort(self.arfitness)
self.arfitness = self.arfitness[self.arindex]
self.xmean = np.dot(self.arx[self.arindex[:self.mu]].T, self.weights)
self.zmean = np.dot(self.arz[self.arindex[:self.mu]].T, self.weights)
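        # cumulation: update the evolution paths (ps for step-size control,
        # pc for the rank-one covariance update)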
        self.ps = (1 - self.cs) * self.ps + np.sqrt(self.cs * (2 - self.cs) * self.mueff) * np.dot(self.B, self.zmean)
self.hsig = np.linalg.norm(self.ps) / np.sqrt(
1 - (1 - self.cs) ** (2 * self.counteval / self.lamda)) / self.chiN < 1.4 + 2 / (self.N + 1)
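        # hsig stalls the pc update (and adds a compensating term to C) when
        # ||ps|| is so large that the rank-one update would overshoot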
        self.pc = (1 - self.cc) * self.pc + self.hsig * np.sqrt(self.cc * (2 - self.cc) * self.mueff) \
            * np.dot(np.dot(self.B, self.D), self.zmean)
# adapt covariance matrix C
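        # rank-one update from the evolution path pc plus rank-mu update from
        # the mu best samples of the current generation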
        bdz = np.dot(np.dot(self.B, self.D), self.arz[self.arindex[:self.mu]].T)
        self.C = (1 - self.c1 - self.cmu) * self.C \
            + self.c1 * (self.pc * self.pc.T
                         + (1 - self.hsig) * self.cc * (2 - self.cc) * self.C) \
            + self.cmu * np.dot(np.dot(bdz, np.diag(self.weights)), bdz.T)
# adapt step size sigma
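        # cumulative step-size adaptation: grow sigma when ||ps|| exceeds its
        # expected length chiN under random selection, shrink it otherwise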
self.sigma = self.sigma * np.exp((self.cs / self.damps) * (np.linalg.norm(self.ps) / self.chiN - 1))
# diagonalization
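        # refresh B and D from C only occasionally (lazy eigendecomposition)
        # to limit the cost of the O(N^3) decomposition per evaluation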
if self.counteval - self.eigenval > self.lamda / (self.c1 + self.cmu) / self.N / 10:
self.eigenval = self.counteval
self.C = np.triu(self.C) + np.triu(self.C, 1).T
self.D, self.B = np.linalg.eig(self.C)
self.D = np.diag(np.sqrt(self.D))
#history
        self.history['short_best'].append(self.arfitness[0])
if len(self.history['short_best']) >= self.short_history_len:
self.history['short_best'].popleft()
if self.generation % 5 == 0: # last 20 %
            self.history['long_best'].append(self.arfitness[0])
            self.history['long_median'].append(np.median(self.arfitness))
if len(self.history['long_best']) >= self.long_history_len_up:
self.history['long_best'].popleft()
self.history['long_median'].popleft()
self.checkStop()
if self.generation % 20 == 0:
self.logState()
    def fmin(self):
"""
Method for actual function minimization. Iterates while not end.
If unsuccess termination criteria is met then the method is restarted
with doubled population. If the number of maximum evaluations is
reached or the function is acceptable minimized, iterations ends and
result is returned.
:rtype: scipy.optimize.Result
"""
while self.status != 0 and self.status != 1:
            if self.status > 1:
logging.warning("Restart due to %s", self.stop_criteria[self.status])
self.restart(2)
pop = self.newGeneration()
values = np.empty(pop.shape[0])
            for i in range(pop.shape[0]):
values[i] = self.func(pop[i])
self.update(values)
return self.result
    def restart(self, lamda_factor):
"""
Restart whole method to initial state, but with population multiplied
by lamda_factor.
:param lamda_factor: multiply factor
:type lamda_factor: int
"""
self.initVariables(self.store_parameters['sigma'], np.random.rand(self.N), lamda_factor=lamda_factor)
    def checkStop(self):
"""
Termination criteria of method. They are checked every iteration. If any of them is true, computation should end.
:return: True if some termination criteria was met, False otherwise
:rtype: bool
"""
i = self.generation % self.N
self.stop_conditions = (self.arfitness[0] <= self.stopfitness,
self.counteval > self.stopeval,
                                sum(self.xmean == self.xmean + 0.1 * self.sigma * self.D[i, i] * self.B[:, i]) == self.N,
np.any(self.xmean == self.xmean + 0.2 * self.sigma * np.sqrt(np.diag(self.C))),
#len(self.history['long_median']) > self.long_history_len_down and \
#np.median(list(itertools.islice(self.history['long_median'], int(0.7*len(self.history['long_median'])), None))) <= \
#np.median(list(itertools.islice(self.history['long_median'],int(0.3*len(self.history['long_median']))))),
np.linalg.cond(self.C) > self.condition_cov_max,
self.sigma * np.max(self.D) >= self.tolxup,
max(self.history['short_best']) - min(self.history['short_best']) <= self.tolfun and
self.arfitness[-1] - self.arfitness[0] <= self.tolfun,
np.all(self.sigma * self.pc < self.tolx) and np.all(
self.sigma * np.sqrt(np.diag(self.C)) < self.tolx)
)
        if np.any(self.stop_conditions):
            self.status = self.stop_conditions.index(True)
            return True
        return False
    def logState(self):
"""
Function for logging the progress of method.
"""
        logging.debug(
            "generation: {generation:<5}, v: {v_function:<6.2e}, sigma: {sigma:.2e}, best: {best}, xmean: {xmean}".format(
                generation=self.generation, best=[round(x, 8) for x in self.arx[self.arindex[0]]],
                v_function=self.arfitness[0], sigma=self.sigma, xmean=self.xmean))
@property
    def result(self):
"""
Result of computation. Not returned while minimization is in progress.
:return: result of computation
:rtype: scipy.optimize.Result
"""
if self.status < 0:
            raise AttributeError("Result is not ready yet, CMA-ES has not finished")
else:
self._result = scipy.optimize.Result()
self._result['x'] = self.arx[self.arindex[0]]
self._result['fun'] = self.arfitness[0]
self._result['nfev'] = self.counteval
if self.status == 0:
self._result['success'] = True
self._result['status'] = self.status
self._result['message'] = "Optimization terminated successfully."
else:
self._result['success'] = False
self._result['status'] = self.status
self._result['message'] = self.stop_criteria[self.status]
return self._result
def fmin(func, N):
"""
Function for easy call function minimization from other modules.
:param func: function to be minimized
:type func: function
:param N: number of parameters of given function
:type N: int
:return: resulting statistics of computation with result
:rtype: scipy.optimize.Result
"""
c = CMAES(func, N)
return c.fmin()
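

if __name__ == "__main__":
    # Minimal self-test sketch: minimize a simple quadratic (sphere) function.
    # The objective and the dimension are illustrative assumptions, not part
    # of the module's API.
    def sphere(x):
        return np.sum(x ** 2)

    logging.basicConfig(level=logging.DEBUG)
    res = fmin(sphere, 10)
    print(res.x)
    print(res.fun)
    print(res.message)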