"""
Particle Swarm Optimizer (PSO)
======================================================================
Particles
----------------------------------------------------------------------
In PSO, a particle represents a set of parameters to be optimised.
Each parameter is therefore a degree of freedom of the particle.
Each particle is represented by its coordinate value.
Additionally it needs several attributes:
* fitness -- the quality of the set of parameters
* speed -- how much the position of the particle changes from one generation to the next
* smin/smax -- speed limits (observed only initially, in the current implementation)
* best -- particles own best position (i.e. with best fitness)
Particle normalisaton/re-normalisation
----------------------------------------------------------------------
Additionally to the above generic PSO-related attributes, we need to
introduce position normalisation as follows. The parameters giving the
particle coordinates may be with different physical meaning, magnitudes
and units. However, to keep the PSO generic, it is best to impose identical
scale in each dimension, so that particle range is the same in each direction.
This is achieved by normalising the parameters so that the particle position
in each dimension is between -1 and +1. However, when evaluating the fitness
of the particle, we need the renormalized values (i.e. the true values of
the parameters).
Hence we introduce three additional attributes:
* norm -- a list with the scaling factors for each dimension (:math:`\eta`),
* shift -- offset of the parameter range from 0 (:math:`\sigma`),
* renormalized -- the true value of the parameters (:math:`\lambda`) represented by the particle.
The user must supply only the true range of the particle in the form of a tuple,
per dimension, e.g. :math:`(\lambda_{min}, \lambda_{max})`.
Then :math:`(\lambda_{max}-\lambda_{min})\eta = 2.0`, or,
:math:`\eta = 2.0/(\lambda_2-\lambda_1)`, and
:math:`\sigma = 0.5*(\lambda_{max}+\lambda_{min})`.
So, the true particle position (for evaluations) is
:math:`\lambda = P/\eta + \sigma`,
where :math:`P` is the normalised position of the particle.
## Using particles
Below, we have the declaration of the particle class and a couple of methods for
creation and evolution of the particle based on the PSO algorithm.
Note that the evaluation of the fitness of the particle is problem specific and
the user must supply its own evaluation function and register it under the name
``evaluate`` with the toolbox.
Particle Swarm
----------------------------------------------------------------------
The swarm is a a list of particles, with a couple of additional attributes:
* ``gbest`` -- globally the best particle (position) ever (i.e. accross any generation so far)
* ``gbestfit`` -- globally the best fitness (i.e. the quality value of gbest)
The swarm is declared, created and let to evolve with the help of the ``PSO`` class.
"""
import random
import operator
import sys
import numpy as np
from deap import base
from deap import creator
from deap import tools
from skpar.core.utils import get_logger
module_logger = get_logger("skpar.pso")
[docs]def declareTypes(weights):
"""
Declare Particle class with fitting objectives (-1: minimization; +1 maximization).
Each particle consists of a set of normalised coordinates, returned by the
particle's instance itself. The true (physical) coordinates of the particle
are stored in the field 'renormalized'. The field 'best' contains the best
fitness-wise position that the particle has ever had (in normalized coords).
Declare also Swarm class; inherit from list + PSO specific attributes
gbest and gbestfit.
Arguments:
weights -- tupple, e.g. (-1,) for single objective minimization, or
(+1,+0.5) for two objective maximization, etc.
"""
creator.create("pFitness", base.Fitness, weights=weights)
creator.create(
"Particle",
list,
fitness=creator.pFitness,
speed=list,
past=list,
smin=None,
smax=None,
best=None,
norm=list,
shift=list,
renormalized=list,
strict_bounds=True,
prange=list,
)
creator.create(
"Swarm", list, gbest=None, gbestfit=creator.pFitness, gbest_iteration=None
)
[docs]def createParticle(prange, strict_bounds=True):
"""
Create particle of dimensionality len(prange), assigning initial particle
coordinate in the i-th dimension within the prange[i] tuple.
Note that the range is normalised and shifted, so that the result is a coordinate
within -1 to 1 for each dimension, and initial speed between -.5 and +.5.
To get the true (i.e. physical) coordinates of the particle,
one must use part.renormalized field.
Arguments:
prange -- list of tuples. each tuple is a range of _initial_ coord.
Return:
particle -- an instance of the Particle class, with initialized coordinates both
normalized (the instance itself) and true, physical coords (self.renormalized).
"""
# size is the dimensionality (degrees of freedom) of the particle
# prange is a list of tuples containing the _initial_ range of particle coordinates
# prange is effectively normalized so particle position is in [-1:+1],
# and the speedlimit is set to half the range
size = len(prange)
pmin, pmax, smin, smax = -1.0, 1.0, -1.0, 1.0
# pmin, pmax, smin, smax = -1.0, 1.0, -0.5, 0.5
part = creator.Particle(random.uniform(pmin, pmax) for _ in range(size))
part.past = [random.uniform(pmin, pmax) for _ in range(size)]
part.speed = [random.uniform(smin, smax) for _ in range(size)]
part.smin = smin
part.smax = smax
if prange is not None:
part.prange = prange
else:
part.prange = [(pmin, pmax)] * size
part.norm = [(pmax - pmin) / (r[1] - r[0]) for r in part.prange]
part.shift = [0.5 * (r[1] + r[0]) for r in part.prange]
part.renormalized = list(
map(operator.add, list(map(operator.truediv, part, part.norm)), part.shift)
)
part.strict_bounds = strict_bounds
return part
[docs]def evolveParticle_0(part, best, phi1=2, phi2=2):
"""
This is the implementation shown in the examples of the DEAP library.
The inertial factor is 1.0 (implicit) and seems to be too big.
Phi1/2 are also somewhat bigger than the Psi/Ki resulting from the optimal
Psi = 2.9922
A method to update the position and speed of a particle (part), according to the
original PSO algorithm of Ricardo Poli, James Kennedy and Tim Blackwell,
'Particle swarm optimization: an overview'. Swarm Intelligence. 2007; 1: 33-57.
Arguments:
part -- instance of the particle class, the particle to be updated
best -- the best known particle ever (within the life of the swarm)
phi1,phi2 -- connectivity coefficients
"""
u1 = (random.uniform(0, phi1) for _ in range(len(part)))
u2 = (random.uniform(0, phi2) for _ in range(len(part)))
v_u1 = list(map(operator.mul, u1, list(map(operator.sub, part.best, part))))
v_u2 = list(map(operator.mul, u2, list(map(operator.sub, best, part))))
part.speed = list(
map(operator.add, part.speed, list(map(operator.add, v_u1, v_u2)))
)
for i, speed in enumerate(part.speed):
if speed < part.smin:
part.speed[i] = part.smin
elif speed > part.smax:
part.speed[i] = part.smax
part[:] = list(map(operator.add, part, part.speed))
part.renormalized = list(
map(operator.add, list(map(operator.truediv, part, part.norm)), part.shift)
)
[docs]def evolveParticle(part, best, inertia=0.7298, acceleration=2.9922, degree=2):
"""
A method to update the position and speed of a particle (part), according to the
generalized formula of Eq(3) in J.Kennedy, "Particle Swarm Optimization" in
"Encyclopedia of Machine Learning" (2010), which is equivalent to Eqs(3-4) of
'Particle swarm optimization: an overview'. Swarm Intelligence. 2007; 1: 33-57.
Arguments:
* part -- instance of the particle class, the particle to be updated
* best -- the best known particle ever (within the life of the swarm)
* inertia -- factor scaling the persistence of the particle
* acceleration -- factor scaling the influence of particle connection
* degree -- unused right now; should serve for a fully informed particle swarm (FIPS),
but this requires best to become a list of neighbours best;
also u1,u2 and v_u1, v_u2 should be transformed into a Sum over neighbours
Returns the updated particle
"""
if degree != 2:
sys.exit(
"ERROR: degree!=2 is not supported (no support for FIPS yet). Cannot continue."
)
# calculate persistence and influence terms
u1 = (random.uniform(0, acceleration / degree) for _ in range(len(part)))
u2 = (random.uniform(0, acceleration / degree) for _ in range(len(part)))
v_u1 = list(map(operator.mul, u1, list(map(operator.sub, part.best, part))))
v_u2 = list(map(operator.mul, u2, list(map(operator.sub, best, part))))
persistence = list(
map(
operator.mul,
[inertia] * len(part),
list(map(operator.sub, part, part.past)),
)
)
influence = list(map(operator.add, v_u1, v_u2))
part.speed = list(map(operator.add, persistence, influence))
# assign current position to the old one
for i, p in enumerate(part):
part.past[i] = p
# apply speed limit per dimension!
for i, s in enumerate(part.speed):
if s < part.smin:
part.speed[i] = part.smin
elif s > part.smax:
part.speed[i] = part.smax
# update current position in both normalized and physical coordinates
# part[:] = list(map(operator.add, part, part.speed))
for i, p in enumerate(part):
new_pos = p + part.speed[i]
if part.strict_bounds:
# If strict bounds are imposed, then tackle the escape goat per dimension.
# Below, we realise a bounce, where the excess travel is reversed in
# direction. This reverses the persistence term, and reduces the
# chance for a second escape. Gradually though, if gbest happens to
# be in the vicinity of the boundary, the particle will find its way
# there. However both its persistence and influence terms will be
# smaller, thus reducing its tendency to escape.
if new_pos > 1:
module_logger.warning(
"Escape goat along {} to {:.3f}, speed {:.3f}".format(
i, new_pos, part.speed[i]
)
)
new_pos = 2 - new_pos
module_logger.warning("Bounced back to {:.3f}\n".format(new_pos))
if new_pos < -1:
module_logger.warning(
"Escape goat along {} to {:.3f}, speed {:.3f}".format(
i, new_pos, part.speed[i]
)
)
new_pos = -2 - new_pos
module_logger.warning("Bounced back to {:.3f}\n".format(new_pos))
part[i] = new_pos
part.renormalized = list(
map(operator.add, list(map(operator.truediv, part, part.norm)), part.shift)
)
# try recursion if we're out out
# if part.strict_bounds and not all([-1.0 < pp < 1.0 for pp in part]):
# # recreate particle, but keeps its history (best and past)
# module_logger.warning('Particle attempted to leave domain: \n'+pformat(part))
# newpart = createParticle(part.prange, part.strict_bounds)
# for ii, pp in enumerate(part):
# part[ii] = newpart[ii]
# part.speed = newpart.speed
# part.renormalized = newpart.renormalized
# module_logger.info ('Particle repositioned inside domain: \n'+pformat(part))
# init arguments:
pso_init_args = ["npart", "objectives", "parrange", "evaluate"]
pso_optinit_args = ["ngen", "ErrTol", "strict_bounds"]
# call arguments
pso_call_args = []
pso_optcall_args = [
"ngen",
"ErrTol",
]
pso_dflts = {
"npart": 10,
"ngen": 200,
"ErrTol": 0.001,
"objective_weights": (-1,),
"strict_bounds": True,
}
[docs]def pso_args(**kwargs):
""" """
pso_obligatory_args = pso_init_args + pso_call_args
args = {}
optargs = {}
for arg in pso_obligatory_args:
try:
args[arg] = kwargs[arg]
except KeyError:
errormsg = (
"PSO missing obligatory argument {0}\n".format(arg)
+ "Please define at least:\n{0}\n".format(pso_obligatory_args)
+ "PSO supports also:\n{0}\n".format(pso_optional_args)
)
module_logger.critical(errormsg)
sys.exit(1)
for arg in list(set(pso_optinit_args + pso_optcall_args)):
try:
optargs[arg] = kwargs[arg]
except KeyError:
pass
init_args = [args[key] for key in pso_init_args]
call_args = [args[key] for key in pso_call_args]
init_optional_args = dict(
[(key, optargs[key]) for key in optargs if key in pso_optinit_args]
)
call_optional_args = dict(
[(key, optargs[key]) for key in optargs if key in pso_optcall_args]
)
return init_args, call_args, init_optional_args, call_optional_args
[docs]def report_stats(stats):
""" """
logger = module_logger
statsHeader = "".join(
[
"{0:>5s}".format("Gen."),
"{0:>10s}".format("Min."),
"{0:>10s}".format("Max."),
"{0:>10s}".format("Avg."),
"{0:>10s}".format("Std."),
]
)
logger.info("")
logger.info("Fitness statistics follow:")
logger.info(statsHeader)
logger.info("============================================================")
ngen = len(stats)
for gen in range(ngen):
s = stats[gen]
logger.info(
"".join(
[
"{0:>5d}".format(gen),
"{0:>10.4f}".format(s["Fitness"]["Min"]),
"{0:>10.4f}".format(s["Fitness"]["Max"]),
"{0:>10.4f}".format(s["Fitness"]["Avg"]),
"{0:>10.4f}".format(s["Fitness"]["Std"]),
# '{0:>12.2f}'.format(s['WRE']['Min']*100),
]
)
)
logger.info("============================================================")
[docs]class PSO(object):
"""
Class defining Particle-Swarm Optimizer.
"""
toolbox = base.Toolbox()
nBestKept = 10
halloffame = tools.HallOfFame(nBestKept)
# see J. Kennedy "Particle Swarm Optimization" in "Encyclopedia of machine learning" (2010).
pInertia = 0.7298
pAcceleration = 2.9922
def __init__(
self,
parameters,
evaluate,
npart=10,
ngen=100,
objective_weights=(-1,),
ErrTol=0.001,
*args,
**kwargs
):
"""
Create a particle swarm
"""
self.logger = module_logger
self.logger.debug("Working with the following parameters")
# try to establish the names of the parameters for logging
try:
self.parnames = [p.name for p in parameters]
except AttributeError:
self.parnames = None
# try to establish the allowed parameter range
try:
parrange = [(p.minv, p.maxv) for p in parameters]
except AttributeError:
assert isinstance(parameters, list) and len(parameters[0]) == 2, (
"NOTABENE: `parameters` argument PSO.__init__() should be a list,\n"
"every element of which is either a tuple (min_value, max_value)\n"
"or a class with attributes minv and maxv!"
)
parrange = parameters
# see if the pso is allowed to cross over defined range for particles
strict_bounds = kwargs.get("strict_bounds", True)
# define the particle and the methods associated with its creation, evolution and fitness evaluation
declareTypes(objective_weights)
self.toolbox.register(
"create", createParticle, prange=parrange, strict_bounds=strict_bounds
)
self.toolbox.register(
"evolve",
evolveParticle,
inertia=self.pInertia,
acceleration=self.pAcceleration,
)
self.toolbox.register("evaluate", evaluate)
# create a swarm from particles with the above defined properties
self.toolbox.register(
"swarm", tools.initRepeat, creator.Swarm, self.toolbox.create
)
self.swarm = self.toolbox.swarm(npart)
self.ngen = ngen
self.ErrTol = ErrTol
# Provide with statistics collector
# - fitness statistics
fit_stats = tools.Statistics(key=lambda ind: ind.fitness.values)
fit_stats.register("Avg", np.mean)
fit_stats.register("Std", np.std)
fit_stats.register("Min", np.min)
fit_stats.register("Max", np.max)
# All statistics will be compiled for each generation and added to the record
self.mstats = tools.MultiStatistics(Fitness=fit_stats)
self.stats_record = []
[docs] def optimise(self, ngen=None, ErrTol=None):
"""
Let the swarm evolve for ngen (or self.ngen) generations.
"""
# ngen and ErrTol would typically be set during initialization
if ngen is None:
ngen = self.ngen
if ErrTol is None:
ErrTol = self.ErrTol
#
self.stats_record = []
for g in range(ngen):
for i, part in enumerate(self.swarm):
iteration = (g, i)
part.fitness.values = self.toolbox.evaluate(
part.renormalized, iteration
)
if not part.best or part.best.fitness < part.fitness:
part.best = creator.Particle(part)
part.best.fitness.values = part.fitness.values
if not self.swarm.gbest or self.swarm.gbest.fitness < part.fitness:
self.swarm.gbest_iteration = iteration
self.swarm.gbest = creator.Particle(part)
self.swarm.gbest.fitness.values = part.fitness.values
self.swarm.gbest.renormalized = part.renormalized
self.halloffame.update(self.swarm)
# Update particles only after full evaluation of the swarm,
# so that gbest possibly arise from the last generation.
for part in self.swarm:
self.toolbox.evolve(part, self.swarm.gbest)
# Gather all the fitnesses and update the stats
self.stats_record.append(self.mstats.compile(self.swarm))
return self.swarm, self.stats_record
[docs] def report(self):
report_stats(self.stats_record)
self.logger.info("GBest iteration : {}".format(self.swarm.gbest_iteration))
self.logger.info(
"GBest fitness : {}".format(self.swarm.gbest.fitness.values)
)
gbestpars = self.swarm.gbest.renormalized
if self.parnames:
self.logger.info(
"GBest parameters:\n"
+ "\n".join(
[
"{:>20s} {}".format(name, val)
for (name, val) in zip(self.parnames, gbestpars)
]
)
)
else:
self.logger.info("GBest parameters : {}".format(gbestpars))
def __call__(self, *args, **kwargs):
return self.optimise(*args, **kwargs)