1. 程式人生 > >集體智慧程式設計——優化搜尋演算法:爬山法,模擬退火演算法,遺傳演算法-Python實現

集體智慧程式設計——優化搜尋演算法:爬山法,模擬退火演算法,遺傳演算法-Python實現

在優化問題中,有兩個關鍵點

  • 代價函式:確定問題的形式和規模之後,根據不同的問題,選擇要優化的目標。如本文涉及的兩個問題中,一個優化目標是使得航班選擇最優,共計12個航班,要使得總的票價最少且每個人的等待時間之和最小。第二個問題是學生選擇宿舍的問題,每個學生可以實現填報志願,如果安排的宿舍與志願完全一致,則代價為0,與第二志願一致,代價為1,如果沒有和志願一致,代價為3。 故,抽象問題的能力很重要,如何將自己要優化的目標量化的表達出來,是解決優化函式的關鍵。如在普通的數值優化問題中,可選擇當前值與目標值距離的絕對之差,或者使用平方損失函式,均可。
  • 值域:值域是確定搜尋的範圍。一個可行解的範圍是多少,比如學生選宿舍的問題,一共學校有100間宿舍,編號為1-100,那麼這個數值優化問題的每個可行解的範圍均在100之內。不同的問題對應不同的可行解,也要具體問題具體分析。

    1. 隨機搜尋演算法
      隨機搜尋演算法是最簡單的優化搜尋演算法,它只適用於代價函式在值域範圍內沒有任何變化規律的情況。即找不到任何使得代價下降的梯度和極小值點。我們只需要在值域範圍內生成足夠多的可行解,然後分別計算每個可行解的代價,根據代價選擇一個最小的可行解作為隨機搜尋的最優解即可。

    # 搜尋方法1: 隨機搜尋演算法
    # 函式會作1000次隨機猜測,記錄總代價最低的方法. domain為航班號的範圍(0-9),共有5個人,因此共有10項
    def randomoptimize(self, domain):
        best_sol = []
        bestcost = 99999
for i in range(1000): sol = [0] * len(domain) for j in range(len(domain) / 2): # 人數 sol[2 * j] = random.randint(domain[2 * j][0], domain[2 * j][1]) sol[2 * j + 1] = random.randint(domain[2 * j + 1][0], domain[2 * j + 1][1]) print
sol[:] newcost = self.schedulecost(sol) if newcost < bestcost: bestcost = newcost best_sol = sol else: continue self.printschedule(best_sol) print "隨機搜尋演算法的結果的最小代價是:", bestcost return best_sol
  1. 爬山法
    爬山法假設當前解和周圍的解是有變化規律的,如,當前解得下方有一個代價較小的解,則我們就認為,沿著這個方向走,解會越來越小。步驟為:首先選擇一個解作為種子解,每次尋找與這個解相近的解,如果相近的解中有代價更小的解,則把這個解作為種子解。而如果周圍的解都比該解的代價大,則表示已經到達了局部極小值點,搜尋停止。

    # 搜尋演算法2:爬山法
    # 首先隨機選擇一個解作為種子解,每次尋找這個種子相近的解,如果相近的解有代價更小的解,則把這個新的解作為種子
    # 依次迴圈進行,當迴圈到某種子附近的解都比該種子的代價大時,說明到達了局部極小值點,搜尋結束
    def hillclimb(self, domain):
        # 隨機產生一個航班序列作為初始種子
        seed = [random.randint(domain[i][0], domain[i][1]) for i in range(len(domain))]
        while 1:
            neighbor = []
            # 迴圈改變解的每一個值產生一個臨近解的列表
            for i in range(len(domain)):
                # 下列判斷是為了將某一位加減1後不超出domain的範圍
                # print seed
                if seed[i] > domain[i][0]:
                    newneighbor = seed[0:i] + [seed[i] - 1] + seed[i + 1:]
                    # print newneighbor[:]
                    neighbor.append(newneighbor)
                if seed[i] < domain[i][1]:
                    newneighbor = seed[0:i] + [seed[i] + 1] + seed[i + 1:]
                    # print newneighbor[:]
                    neighbor.append(newneighbor)

            # 對所有的臨近解計算代價,排序,得到代價最小的解
            neighbor_cost = sorted(
                [(s, self.schedulecost(s)) for s in neighbor], key=lambda x: x[1])

            # 如果新的最小代價 > 原種子代價,則跳出迴圈
            if neighbor_cost[0][1] > self.schedulecost(seed):
                break

            # 新的代價更小的臨近解作為新的種子
            seed = neighbor_cost[0][0]
            print "newseed = ", seed[:], " 代價:", self.schedulecost(seed)
        # 輸出
        self.printschedule(seed)
        print "爬山法得到的解的最小代價是", self.schedulecost(seed)
        return seed
  1. 模擬退火演算法
    從一個問題的原始解開始,用一個變數代表溫度,這一溫度開始時非常高,而後逐步減低。在每一次迭代期間,演算法會隨機選中題解中的某個數字,使其發生細微變化,而後計算該解的代價。關鍵的地方在於計算出該解的代價後,如果決定是否接受該解。
    如果新的成本更低,則新的題解就會變成當前題解,這與爬山法類似;如果新的成本更高,則新的題解與概率 P 被接受。這一概率會隨著溫度T的降低而降低。即演算法開始時,可以接受表現較差的解,隨著退火過程中溫度的不斷下降,演算法越來越不可以接受較差的解,知道最後,它只會接受更優的解。
    其中P = exp[-(newcost - oldcost)/ T ]
    其中newcost是新解的成本,oldcost是當前成本,T為當前溫度。演算法以概率P接受新的解。

    # 搜尋演算法4:模擬退火演算法
    # 引數:T代表原始溫度,cool代表冷卻率,step代表每次選擇臨近解的變化範圍
    # 原理:退火演算法以一個問題的隨機解開始,用一個變量表示溫度,這一溫度開始時非常高,而後逐步降低
    #      在每一次迭代期間,算啊會隨機選中題解中的某個數字,然後朝某個方向變化。如果新的成本值更
    #      低,則新的題解將會變成當前題解,這與爬山法類似。不過,如果成本值更高的話,則新的題解仍
    #      有可能成為當前題解,這是避免區域性極小值問題的一種嘗試。
    # 注意:演算法總會接受一個更優的解,而且在退火的開始階段會接受較差的解,隨著退火的不斷進行,演算法
    #      原來越不能接受較差的解,直到最後,它只能接受更優的解。
    # 演算法接受較差解的概率 P = exp[-(highcost-lowcost)/temperature]
    def annealingoptimize(self, domain, T=10000.0, cool=0.98, step=1):
        # 隨機初始化值
        vec = [random.randint(domain[i][0], domain[i][1]) for i in range(len(domain))]

        # 迴圈
        while T > 0.1:
            # 選擇一個索引值
            i = random.randint(0, len(domain) - 1)
            # 選擇一個改變索引值的方向
            c = random.randint(-step, step)  # -1 or 0 or 1
            # 構造新的解
            vecb = vec[:]
            vecb[i] += c
            if vecb[i] < domain[i][0]:  # 判斷越界情況
                vecb[i] = domain[i][0]
            if vecb[i] > domain[i][1]:
                vecb[i] = domain[i][1]

            # 計算當前成本和新的成本
            cost1 = self.schedulecost(vec)
            cost2 = self.schedulecost(vecb)

            # 判斷新的解是否優於原始解 或者 演算法將以一定概率接受較差的解
            if cost2 < cost1 or random.random() < math.exp(-(cost2 - cost1) / T):
                vec = vecb

            T = T * cool  # 溫度冷卻
            print vecb[:], "代價:", self.schedulecost(vecb)

        self.printschedule(vec)
        print "模擬退火演算法得到的最小代價是:", self.schedulecost(vec)
        return vec
  1. 遺傳演算法
    隨機生成一組解,成為一個種群
    (1)直接遺傳
    將當前種群中代價最小的一部分解,如 40% 進行直接遺傳,傳入下一代種群。
    (2)變異
    從題解中隨機選取一個數字,對其進行微小的,簡單的改變。
    (3)交叉
    選取最優解中的兩個解,將他們按照某種方式進行結合

通過上述三種方法,從上一代種群中構建出了下一代種群。而後,這一過程重複進行,知道達到了指定的迭代次數,或者連續數代都沒有改善種群,則整個過程就結束了。


    # 搜尋演算法5: 遺傳演算法
    # 原理: 首先隨機生成一組解,我們稱之為種群,在優化過程的每一步,演算法會計算整個種群的成本函式,
    #       從而得到一個有關題解的有序列表。隨後根據種群構造進化的下一代種群,方法如下:
    #       遺傳:從當前種群中選出代價最優的一部分加入下一代種群,稱為“精英選拔”
    #       變異:對一個既有解進行微小的、簡單的、隨機的修改
    #       交叉:選取最優解中的兩個解,按照某種方式進行交叉。方法有單點交叉,多點交叉和均勻交叉
    # 一個種群是通過對最優解進行隨機的變異和配對處理構造出來的,它的大小通常與舊的種群相同,爾後,這一過程會
    #       一直重複進行————新的種群經過排序,又一個種群被構造出來,如果達到指定的迭代次數之後題解都沒有得
    #       改善,整個過程就結束了
    # 引數:
    # popsize-種群數量 step-變異改變的大小 mutprob-交叉和變異的比例 elite-直接遺傳的比例 maxiter-最大迭代次數
    def geneticoptimize(self, domain, popsize=50, step=1, mutprob=0.2, elite=0.2, maxiter=100):
        # 變異操作的函式
        def mutate(vec):
            i = random.randint(0, len(domain) - 1)
            if random.random() < 0.5 and vec[i] > domain[i][0]:
                return vec[0:i] + [vec[i] - step] + vec[i + 1:]
            elif vec[i] < domain[i][1]:
                return vec[0:i] + [vec[i] + step] + vec[i + 1:]

        # 交叉操作的函式(單點交叉)
        def crossover(r1, r2):
            i = random.randint(0, len(domain) - 1)
            return r1[0:i] + r2[i:]

        # 構造初始種群
        pop = []
        for i in range(popsize):
            vec = [random.randint(domain[i][0], domain[i][1]) for i in range(len(domain))]
            pop.append(vec)

        # 每一代中有多少勝出者
        topelite = int(elite * popsize)

        # 主迴圈
        for i in range(maxiter):
            scores = [(self.schedulecost(v), v) for v in pop]
            scores.sort()
            ranked = [v for (s, v) in scores]  # 解按照代價由小到大的排序

            # 優質解遺傳到下一代
            pop = ranked[0: topelite]

            # 如果當前種群數量小於既定數量,則新增變異和交叉遺傳
            while len(pop) < popsize:
                # 隨機數小於 mutprob 則變異,否則交叉
                if random.random() < mutprob:  # mutprob控制交叉和變異的比例
                    # 選擇一個個體
                    c = random.randint(0, topelite)
                    # 變異
                    pop.append(mutate(ranked[c]))
                else:
                    # 隨機選擇兩個個體進行交叉
                    c1 = random.randint(0, topelite)
                    c2 = random.randint(0, topelite)
                    pop.append(crossover(ranked[c1], ranked[c2]))
            # 輸出當前種群中代價最小的解
            print scores[0][1], "代價:", scores[0][0]
        self.printschedule(scores[0][1])
        print "遺傳演算法求得的最小代價:", scores[0][0]
        return scores[0][1]

書中給出了關於學生宿舍選擇的一個具體例子,以下是程式碼:

# -*-  coding: utf-8 -*-
__author__ = 'Bai Chenjia'

import random
import math
"""
宿舍分配問題,屬於搜尋優化問題。優化方法使用optimization.py中使用的
隨機搜尋、爬山法、模擬退火法、遺傳演算法等. 但題解的描述比之前的問題複雜
"""


class dorm:
    def __init__(self):
        # 代表宿舍,每個宿舍有兩個隔間可用
        self.dorms = ['Zeus', 'Athena', 'Hercules', 'Bacchus', 'Pluto']

        # 代表選擇,第一列代表人名,第二列和第三列代表該學生的首選和次選
        self.prefs = [('Toby', ('Bacchus', 'Hercules')),
                      ('Steve', ('Zeus', 'Pluto')),
                      ('Karen', ('Athena', 'Zeus')),
                      ('Sarah', ('Zeus', 'Pluto')),
                      ('Dave', ('Athena', 'Bacchus')),
                      ('Jeff', ('Hercules', 'Pluto')),
                      ('Fred', ('Pluto', 'Athena')),
                      ('Suzie', ('Bacchus', 'Hercules')),
                      ('Laura', ('Bacchus', 'Hercules')),
                      ('James', ('Hercules', 'Athena'))]

        # 題解的表示法:
        # 設想每個宿舍有兩個槽,本例中共有5個宿舍,則共有10個槽,我們將每名學生依序安置於各空槽
        # 內————則第一名學生有10種選擇,解的取值範圍為0-9;第二名學生有9種選擇,解的取值範圍為
        # 0-8,第三名學生解的取值範圍是0-7,以此類推,最後一名學生只有一個可選。
        # 按照上述題解的表示法初始化題解範圍:
        self.domain = [(0, 2*len(self.dorms)-1-i) for i in range(len(self.prefs))]

    # 根據題解序列vec打印出最終宿舍分配方案
    # 注意,輸出一個槽後表明該槽已經用過,需將該槽刪除
    def printsolution(self, vec):
        slots = []
        # slots = [0,0,1,1,2,2,3,3,4,4]
        for i in range(len(self.dorms)):
            slots.append(i)
            slots.append(i)
        # 迴圈題解
        print "選擇方案是:"
        for i in range(len(vec)):
            index = slots[vec[i]]
            print self.prefs[i][0], self.dorms[index]
            del slots[vec[i]]

    # 代價函式: 如果學生當前安置的宿舍使其首選則代價為0,是其次選則代價為1,否則代價為3
    # 注意,輸出一個槽後表明該槽已經用過,需將該槽刪除
    def dormcost(self, vec):
        cost = 0
        # 建立槽
        slots = []
        for i in range(len(self.dorms)):
            slots.append(i)
            slots.append(i)
        # 迴圈題解
        for i in range(len(vec)):
            index = slots[vec[i]]
            if self.dorms[index] == self.prefs[i][1][0]:
                cost += 0
            elif self.dorms[index] == self.prefs[i][1][1]:
                cost += 1
            else:
                cost += 3
            del slots[vec[i]]
        return cost

    """
    下列函式與 optimization 中函式相同,只不過代價函式和輸出函式用本問題的輸出函式
    """
    # 搜尋方法1: 隨機搜尋演算法
    # 函式會作1000次隨機猜測,記錄總代價最低的方法. domain為航班號的範圍(0-9),共有5個人,因此共有10項
    def randomoptimize(self, domain):
        best_sol = []
        bestcost = 99999
        for i in range(1000):
            sol = [0] * len(domain)
            for j in range(len(domain) / 2):  # 人數
                sol[2 * j] = random.randint(domain[2 * j][0], domain[2 * j][1])
                sol[2 * j + 1] = random.randint(domain[2 * j + 1][0], domain[2 * j + 1][1])

            print sol[:]
            newcost = self.dormcost(sol)
            if newcost < bestcost:
                bestcost = newcost
                best_sol = sol
            else:
                continue
        self.printsolution(best_sol)
        print "隨機搜尋演算法的結果的最小代價是:", bestcost
        return best_sol

    # 搜尋演算法2:爬山法
    # 首先隨機選擇一個解作為種子解,每次尋找這個種子相近的解,如果相近的解有代價更小的解,則把這個新的解作為種子
    # 依次迴圈進行,當迴圈到某種子附近的解都比該種子的代價大時,說明到達了局部極小值點,搜尋結束
    def hillclimb(self, domain):
        # 隨機產生一個航班序列作為初始種子
        seed = [random.randint(domain[i][0], domain[i][1]) for i in range(len(domain))]
        while 1:
            neighbor = []
            # 迴圈改變解的每一個值產生一個臨近解的列表
            for i in range(len(domain)):
                # 下列判斷是為了將某一位加減1後不超出domain的範圍
                # print seed
                if seed[i] > domain[i][0]:
                    newneighbor = seed[0:i] + [seed[i] - 1] + seed[i + 1:]
                    # print newneighbor[:]
                    neighbor.append(newneighbor)
                if seed[i] < domain[i][1]:
                    newneighbor = seed[0:i] + [seed[i] + 1] + seed[i + 1:]
                    # print newneighbor[:]
                    neighbor.append(newneighbor)

            # 對所有的臨近解計算代價,排序,得到代價最小的解
            neighbor_cost = sorted(
                [(s, self.dormcost(s)) for s in neighbor], key=lambda x: x[1])

            # 如果新的最小代價 > 原種子代價,則跳出迴圈
            if neighbor_cost[0][1] > self.dormcost(seed):
                break

            # 新的代價更小的臨近解作為新的種子
            seed = neighbor_cost[0][0]
            print "newseed = ", seed[:], " 代價:", self.dormcost(seed)
        # 輸出
        self.printsolution(seed)
        print "爬山法得到的解的最小代價是", self.dormcost(seed)
        return seed

    # 搜尋演算法4:模擬退火演算法
    # 引數:T代表原始溫度,cool代表冷卻率,step代表每次選擇臨近解的變化範圍
    # 原理:退火演算法以一個問題的隨機解開始,用一個變量表示溫度,這一溫度開始時非常高,而後逐步降低
    #      在每一次迭代期間,算啊會隨機選中題解中的某個數字,然後朝某個方向變化。如果新的成本值更
    #      低,則新的題解將會變成當前題解,這與爬山法類似。不過,如果成本值更高的話,則新的題解仍
    #      有可能成為當前題解,這是避免區域性極小值問題的一種嘗試。
    # 注意:演算法總會接受一個更優的解,而且在退火的開始階段會接受較差的解,隨著退火的不斷進行,演算法
    #      原來越不能接受較差的解,直到最後,它只能接受更優的解。
    # 演算法接受較差解的概率 P = exp[-(highcost-lowcost)/temperature]
    def annealingoptimize(self, domain, T=10000.0, cool=0.98, step=1):
        # 隨機初始化值
        vec = [random.randint(domain[i][0], domain[i][1]) for i in range(len(domain))]

        # 迴圈
        while T > 0.1:
            # 選擇一個索引值
            i = random.randint(0, len(domain) - 1)
            # 選擇一個改變索引值的方向
            c = random.randint(-step, step)  # -1 or 0 or 1
            # 構造新的解
            vecb = vec[:]
            vecb[i] += c
            if vecb[i] < domain[i][0]:  # 判斷越界情況
                vecb[i] = domain[i][0]
            if vecb[i] > domain[i][1]:
                vecb[i] = domain[i][1]

            # 計算當前成本和新的成本
            cost1 = self.dormcost(vec)
            cost2 = self.dormcost(vecb)

            # 判斷新的解是否優於原始解 或者 演算法將以一定概率接受較差的解
            if cost2 < cost1 or random.random() < math.exp(-(cost2 - cost1) / T):
                vec = vecb

            T = T * cool  # 溫度冷卻
            print vecb[:], "代價:", self.dormcost(vecb)

        self.printsolution(vec)
        print "模擬退火演算法得到的最小代價是:", self.dormcost(vec)
        return vec

    # 搜尋演算法5: 遺傳演算法
    # 原理: 首先隨機生成一組解,我們稱之為種群,在優化過程的每一步,演算法會計算整個種群的成本函式,
    #       從而得到一個有關題解的有序列表。隨後根據種群構造進化的下一代種群,方法如下:
    #       遺傳:從當前種群中選出代價最優的一部分加入下一代種群,稱為“精英選拔”
    #       變異:對一個既有解進行微小的、簡單的、隨機的修改
    #       交叉:選取最優解中的兩個解,按照某種方式進行交叉。方法有單點交叉,多點交叉和均勻交叉
    # 一個種群是通過對最優解進行隨機的變異和配對處理構造出來的,它的大小通常與舊的種群相同,爾後,這一過程會
    #       一直重複進行————新的種群經過排序,又一個種群被構造出來,如果達到指定的迭代次數之後題解都沒有得
    #       改善,整個過程就結束了
    # 引數:
    # popsize-種群數量 step-變異改變的大小 mutprob-交叉和變異的比例 elite-直接遺傳的比例 maxiter-最大迭代次數
    def geneticoptimize(self, domain, popsize=50, step=1, mutprob=0.2, elite=0.2, maxiter=100):
        # 變異操作的函式
        def mutate(vec):
            i = random.randint(0, len(domain) - 1)
            res = []
            if random.random() < 0.5 and vec[i] > domain[i][0]:
                res = vec[0:i] + [vec[i] - step] + vec[i + 1:]
            elif vec[i] < domain[i][1]:
                res = vec[0:i] + [vec[i] + step] + vec[i + 1:]
            else:
                res = vec
            return res

        # 交叉操作的函式(單點交叉)
        def crossover(r1, r2):
            i = random.randint(0, len(domain) - 1)
            return r1[0:i] + r2[i:]

        # 構造初始種群
        pop = []
        for i in range(popsize):
            vec = [random.randint(domain[i][0], domain[i][1]) for i in range(len(domain))]
            pop.append(vec)

        # 每一代中有多少勝出者
        topelite = int(elite * popsize)

        # 主迴圈
        for i in range(maxiter):
            if [] in pop:
                print "***"
            try:
                scores = [(self.dormcost(v), v) for v in pop]
            except:
                print "pop!!", pop[:]
            scores.sort()
            ranked = [v for (s, v) in scores]  # 解按照代價由小到大的排序

            # 優質解遺傳到下一代
            pop = ranked[0: topelite]
            # 如果當前種群數量小於既定數量,則新增變異和交叉遺傳
            while len(pop) < popsize:
                # 隨機數小於 mutprob 則變異,否則交叉
                if random.random() < mutprob:  # mutprob控制交叉和變異的比例
                    # 選擇一個個體
                    c = random.randint(0, topelite)
                    # 變異
                    if len(ranked[c]) == len(self.prefs):
                        temp = mutate(ranked[c])
                        if temp == []:
                            print "******", ranked[c]
                        else:
                            pop.append(temp)

                else:
                    # 隨機選擇兩個個體進行交叉
                    c1 = random.randint(0, topelite)
                    c2 = random.randint(0, topelite)
                    pop.append(crossover(ranked[c1], ranked[c2]))
            # 輸出當前種群中代價最小的解
            print scores[0][1], "代價:", scores[0][0]
        self.printsolution(scores[0][1])
        print "遺傳演算法求得的最小代價:", scores[0][0]
        return scores[0][1]

if __name__ == '__main__':
    dormsol = dorm()
    #sol = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
    #dormsol.printsolution(sol)
    #dormsol.dormcost(sol)

    domain = dormsol.domain

    # 方法1:隨機猜測
    #dormsol.randomoptimize(domain)

    # 方法2:爬山法
    #dormsol.hillclimb(domain)

    # 方法3:模擬退火法
    #dormsol.annealingoptimize(domain)

    # 方法4:遺傳演算法
    dormsol.geneticoptimize(domain)