[Python] 粒子群優化 Particle Swarm Optimization

程式語言:Python
Package:pyswarm
pyswarm 官網

功能:最佳化參數演算法

演算法

  1. 隨機初始化粒子的位置與速率
  2. 計算初始粒子的 fitness
  3. While (直到滿足 threshold)
    1. 更新粒子速度與位置
      $$ \begin{align*} v_i^t &= w\cdot v_i^{t-1}+w_p\cdot rand()\cdot (p_i-x_i^{t-1})+w_g\cdot rand()\cdot (g-x_i^{t-1})\\ x_i^t &= x_i^{t-1} + v_i^t \end{align*} $$
      • \(v_i\):粒子的速度
      • \(x_i\):粒子的位置
      • \(w\):慣性速度權重
      • \(w_p\):本身最佳解速度權重
      • \(p_i\):本身最佳解位置
      • \(w_g\):全體最佳解速度權重
      • \(g\) :全體最佳解位置
    2. 更新粒子的 fitness
注意:fitness function 會影響整體的表現,需考慮清楚
概念來自於鳥群覓食
一群鳥在一個廣大的區域中,想找到最多食物的地方
於是大家都帶著手機出門去,一個留守在基地
一開始,大家先隨機分散出去找,然後用手機回報基地,更新自己所找到最多食物的地方
基地也會記錄目前找到最多食物的地方
所以每隻鳥都知道,自己曾經找到最多食物的地方,跟全體找到最多食物的地方
利用這兩個資訊,更新目前本身的飛行速度
慢慢地,大家會集合到同樣的位置上去,而此位置便可能是最多食物的地方

演算法程式碼

基本 PSO 與 pyswarm
import numpy as np
from pyswarm import pso

class PSO:
    pBest = []
    pBestFitness = []
    gBest = None
    gBestFitness = np.inf
    
    # 初始化
    def __init__(self, fitness, bounds, swarmSize=100, w=0.5, wp=0.5, wg=0.5):
        '''
        v = w*v + wp*(pBest-x) + wg*(gBest-x)
        '''
        self.swarmSize = swarmSize
        self.fitness = fitness
        self.pNum = len(bounds)
        self.bounds = bounds
        self.w = w
        self.wp = wp
        self.wg = wg
        
        # 初始化粒子和速度
        self.particles = np.zeros((self.swarmSize, self.pNum))
        
        self.v = np.zeros((self.swarmSize, self.pNum))
        for i, b in enumerate(self.bounds):
            self.particles[:,i] = np.random.uniform(b[0], b[1], self.swarmSize)
            self.v[:,i] = np.random.uniform(-b[1]+b[0], b[1]-b[0], self.swarmSize)

        # 初始化 fitness
        self.pBest = np.zeros((self.swarmSize, self.pNum))
        self.pBestFitness = np.ones(self.swarmSize) * np.inf
        self.updateFitness()
        
    
    # 更新 fitness
    def updateFitness(self):
        for i, p in enumerate(self.particles):
            fit = self.fitness(p)
            if fit < self.pBestFitness[i]: 
                self.pBest[i] = p
                self.pBestFitness[i] = fit

                if fit < self.gBestFitness:
                    self.gBest = p
                    self.gBestFitness = fit
        
        
    def run(self, threshold=0.01, updateThreshold=1e-4, maxiter=20):
        n = 0
        while self.gBestFitness > threshold and n < maxiter:
            # 更新粒子速度
            rp = np.random.rand()
            rg = np.random.rand()
            self.v = self.w*self.v + self.wp*rp*(self.pBest-self.particles) + self.wg*rg*(self.gBest-self.particles)
            
            # 更新粒子
            self.particles = self.particles + self.v
            for i, b in enumerate(self.bounds):
                self.particles[:,i] = np.clip(self.particles[:,i], b[0], b[1])
            
            # 計算 fitness
            old = self.gBestFitness
            self.updateFitness()
            
            # 若更新小於 0.001,持續 maxiter 次,則中止
            if old - self.gBestFitness < 0.001:
                n += 1
            else:
                n = 0
            if n > maxiter:
                break
            
        return self.gBest, self.gBestFitness
        
if __name__ == "__main__":
    def fitness(x):
        return (x[0]-5)**2 + (x[1]-10)**2 + (x[2]+10)**2

    xopt, fopt = PSO(fitness, [(-1e10,1e10), (-1e10,1e10), (-1e10,1e10)]).run(threshold=1e-6)
    print("自製:{} {}".format(fopt, xopt))
    
    xopt, fopt = pso(func=fitness, lb=(-1e10,-1e10,-1e10), ub=(1e10,1e10,1e10))
    print("套件:{} {}".format(fopt, xopt))
PSO + 定期隨機
import numpy as np

class PSO:
    pBest = []
    pBestFitness = []
    gBest = None
    gBestFitness = np.inf
    
    
    # 初始化
    def __init__(self, fitness, bounds, swarmSize=100, w=0.5, wp=0.5, wg=0.5):
        '''
        v = w*v + wp*(pBest-x) + wg*(gBest-x)
        '''
        self.swarmSize = swarmSize
        self.fitness = fitness
        self.pNum = len(bounds)
        self.bounds = bounds
        self.w = w
        self.wp = wp
        self.wg = wg
        
        # 初始化粒子和速度
        self.init_particles()

        # 初始化 fitness
        self.pBest = np.zeros((self.swarmSize, self.pNum))
        self.pBestFitness = np.ones(self.swarmSize) * np.inf
        self.updateFitness()
    
    
    # 初始化粒子
    def init_particles(self):
        self.particles = np.zeros((self.swarmSize, self.pNum))        
        self.v = np.zeros((self.swarmSize, self.pNum))
        
        for i, b in enumerate(self.bounds):
            self.particles[:,i] = np.random.uniform(b[0], b[1], self.swarmSize)
            self.v[:,i] = np.random.uniform(-b[1]+b[0], b[1]-b[0], self.swarmSize)
            
            
    # 增加粒子
    def produce_particles(self, addSize):
        self.swarmSize += addSize
        ps = np.zeros((addSize, self.pNum))        
        vs = np.zeros((addSize, self.pNum))
        pbests = np.zeros((addSize, self.pNum))
        pBestFitness = np.ones(addSize) * np.inf
        
        for i, b in enumerate(self.bounds):
            ps[:,i] = np.random.uniform(b[0], b[1], addSize)
            vs[:,i] = np.random.uniform(-b[1]+b[0], b[1]-b[0], addSize)
            
        self.particles = np.concatenate((ps, self.particles))
        self.v = np.concatenate((vs, self.v))
        self.pBest = np.concatenate((pbests, self.pBest))
        self.pBestFitness = np.concatenate((pBestFitness, self.pBestFitness))
        
        
    # 取代粒子
    def replace_particles(self, replaceSize):
        # 重新排序
        index = np.argsort(self.pBestFitness)[::-1]
        self.particles = self.particles[index]
        self.v = self.v[index]
        self.pBest = self.pBest[index]
        self.pBestFitness = self.pBestFitness[index]
        
        ps = np.zeros((replaceSize, self.pNum))        
        vs = np.zeros((replaceSize, self.pNum))
        pbests = np.zeros((replaceSize, self.pNum))
        pBestFitness = np.ones(replaceSize) * np.inf
        
        for i, b in enumerate(self.bounds):
            ps[:,i] = np.random.uniform(b[0], b[1], replaceSize)
            vs[:,i] = np.random.uniform(-b[1]+b[0], b[1]-b[0], replaceSize)

        self.particles[:replaceSize] = ps
        self.v[:replaceSize] = vs
        self.pBest[:replaceSize] = pbests
        self.pBestFitness[:replaceSize] = pBestFitness
    
    
    # 更新 fitness
    def updateFitness(self):
        for i, p in enumerate(self.particles):
            fit = self.fitness(p)
            if fit < self.pBestFitness[i]: 
                self.pBest[i] = p
                self.pBestFitness[i] = fit

                if fit < self.gBestFitness:
                    self.gBest = p
                    self.gBestFitness = fit
        
        
    def run(self, threshold=0.01, maxiter=100, randIter=20):
        n = 0
        try:
            while self.gBestFitness > threshold and n < maxiter:
                # 更新粒子速度
                rp = np.random.rand()
                rg = np.random.rand()
                self.v = self.w*self.v + self.wp*rp*(self.pBest-self.particles) + self.wg*rg*(self.gBest-self.particles)
                
                # 更新粒子
                self.particles = self.particles + self.v
                for i, b in enumerate(self.bounds):
                    self.particles[:,i] = np.clip(self.particles[:,i], b[0], b[1])
                
                # 計算 fitness
                old = self.gBestFitness
                self.updateFitness()
                # print(self.gBestFitness)
                
                if old - self.gBestFitness < threshold/10:
                    n += 1
                    if n % randIter == 0:
                        self.replace_particles(self.swarmSize//3*2)
                        # print('replace, now Size: {}'.format(self.swarmSize))
                        self.produce_particles(self.swarmSize//10)
                        # print('produce, now Size: {}'.format(self.swarmSize))                        
                else:
                    n = 0
                if n > maxiter:
                    break                
                print(n)
                
        except KeyboardInterrupt:
            print("Interrupt by user")
            
        return self.gBest, self.gBestFitness
        
        
if __name__ == "__main__":
    def fitness(x):
        return (x[0]-5)**2 + (x[1]-10)**2 + (x[2]+10)**2

    pso = PSO(fitness, [(-1e10,1e10), (-1e10,1e10), (-1e10,1e10)])
    print(pso.run(threshold=1e-6))

參考

[pso] 初步 - 粒子移動演算法精髓
Particle swarm optimization
PSO 粒子群演算法

留言