長大後,鄉愁是一棟小小的房子

借宿2018-01-22 03:21:36

前言:本文來自PaddlePaddle開發者説成員「子空」的投稿,寫的很棒!!有原理有代碼,推薦大家閲讀並實戰!

            


關鍵詞:強化學習 Paddle PARL



百度AIStudio源碼:  https://aistudio.baidu.com/aistudio/projectdetail/63441

強化學習PARL倉庫:https://github.com/PaddlePaddle/PARL

本文提及的源碼: https://github.com/kosoraYintai/PARL-Sample/tree/master/eight_puzzle


引言

八數碼(EightPuzzle)是一道經典的ACM題,同時也是首屆百度之星(A*Star)程序設計大賽的決賽題。這道題除了使用傳統的數據結構與算法解決之外,還可以藉助於深度強化學習框架PARL進行求解。本文將使用Rainbow模型訓練一套深度神經網絡,自動生成代碼並刷到OJ排行榜的Top1!
最終效果如下圖所示:

我們從以下步驟依次着手,帶領大家AC並且打敗所有的Submissions:

  • 可行性分析——為什麼可以使用強化學習

  • 問題建模——將原問題轉化為對應的MDP環境

  • DQN的三大改進:DoubleDQN、DuelingDQN、PrioritizedReplayBuffer

  • Rainbow模型以及代碼實現

  • 對比普通DQN以及經典解法


可行性分析

翻開POJ-1077,可以看到一道題,題目大意是這樣的。給定初始狀態:

以及目標狀態:

每次只能移動方格0,有上(U)下(D)左(L)右(R)四個方向可以選擇,求由初始狀態到達目標狀態的最小步數,並打印最優路徑。比如對於測試數據,最小步數為3,最優路徑為RDR;具體移動步驟如下:

經過初步分析,題目給定了初始狀態以及目標狀態,相當於有了reset以及terminal;其次,題幹中出現了最小、最優等字眼,也就是説,我們需要給出每一步的具體移動方案,這個方案能使得整體的移動步數最少、路徑最優,這顯然是一個序列決策問題。所以斷定:八數碼問題可以使用強化學習算法解決。

不妨畫出八數碼問題的解空間樹,節點表示狀態State,藍色表示初始狀態,紅色表示目標狀態,分支上的字母表示動作Action:

它顯然是一個遞歸的結構,如果在狀態State上附加值函數V,在動作Action上附加行為值函數Q,那麼就可以轉化為V-Q樹,採取任意一種強化學習算法,都可以使得每個節點的V、每個分支的Q收斂至最優值,再根據貝爾曼最優方程,就可以推導出整個解空間樹的最優解。

另外,由於只能走上下左右四個方向,也就是説每個狀態最多可以轉化為其他四個狀態,而狀態s與狀態s'之間的相互轉化其實是完全對稱的(比如s執行L動作可以轉化為s',那麼s'執行R動作必然可以轉化為s),所以八數碼問題其實建立了一張隱式的無向圖;這張無向圖是一個稀疏圖,每個頂點的度都小於等於4,並且各個頂點都是連通的,我們的目標是求初始狀態init_state到最終狀態terminal_state的一條最短路徑。


問題建模

今天我們所採用的環境比原始的八數碼要稍微簡潔一些,這道題源自LeetCode的一場周賽weekly-contest-69,輸入輸出和POJ差不多,只是由八數碼變為了六數碼,維度變小了而已,題目大意如下。

給定初始狀態:

以及目標狀態:

求由初始狀態變換到目標狀態的最小步數,若無解,返回-1;對於測試數據來説,最小步數為5,具體移動步驟如下: 

這樣做的原因是:

  • POJ暫時不支持python,而LeetCode支持Python

  • POJ不提供WA和TLE的數據,而LeetCode可以查看哪些用例沒有通過,便於我們調試程序

  • POJ不提供單個題目的運行時間排名,而LeetCode提供所有AC代碼的運行時間分佈圖,便於我們評測算法的速度

  • 在2X3這個benchmark上更容易解釋,同理可以推廣到3X3的8數碼、4X3的12數碼等更復雜的問題。

在構建環境之前,我們注意到,無論是POJ還是LeetCode,都提到了可能無解的情況,即:無論怎樣移動,都無法到達最終狀態。那麼什麼樣的情況有解,什麼樣的情況無解呢?

我們可以使用逆序對數進行判定,即:首先去除0,再將原始矩陣展開為一維數組,然後判斷初始狀態和最終狀態逆序對數的奇偶性是否一致,如果一致就有解,否則無解。注意到,終止狀態的逆序對數為0,恆為偶數,所以只需判斷初始狀態的是否為偶排列即可。

比如[[1,2,3],[5,4,0]],展開成一維數組[1,2,3,5,4,0],去掉0變成[1,2,3,5,4],其逆序對數為1,是奇排列,所以無解,返回-1

比如[[4,1,2],[5,0,3]],展開成一維數組[4,1,2,5,0,3],去掉0變成[4,1,2,5,3],其逆序對數為4,是偶排列,所以有解,可以進行計算。

關於逆序對數為什麼可以判斷解的存在性,有興趣的同學可以解決下LintCode 532,本文提供參考答案,詳見ReversPairs.py。

 

另外,有個小Trick,以上定理僅僅適用於列數為奇數的情況,對於列數為偶數的情況,比如3X2、4X4,每移動一步,排列的奇偶性都會發生改變,這種情況並不是本文的重點。

下面分析一下Environment的核心代碼。

EightPuzzleEnv.py:八數碼問題的環境,繼承自gym.Env,實現了reset、step、reward、render等標準接口。

reset函數:調用initBoard函數初始化一個隨機棋盤,並調用findZeroPos找到0所在的位置。

def reset(self):   self.state=initBoard(self.m,self.n,self)  self.pos=findZeroPos(self.state,self.m,self.n)   return self.state


initBoard函數:隨機產生一個偶排列,並且這個排列不能和終止狀態相同;為了增加初始狀態的隨機性,這裏同時藉助於sklearn和numpy兩個庫。

def initBoard(m,n,env):    '''    初始化一個隨機棋盤    '''    MAXITER=65535    for _ in range(MAXITER):        perm=np.arange(m*n)        p=np.random.random()        #隨機選擇sklearn或者numpy生成一個排列        if p0.5:            perm,_= train_test_split(perm,test_size=0)        else:            np.random.shuffle(perm)        #檢測逆序對的奇偶性        flag=checkReversePair(perm.copy())        if flag and not np.array_equal(perm, env.target):            return arrayToMatrix(perm, m, n)        else:            continue


step函數:接收參數a,執行上下左右四個動作中的一個,並返回四元組(next_state,reward,terminal,info);若執行了動作之後如果越界了,則給一個較大懲罰並停讓棋子留在原有位置,否則,移動方格,並獲得獎勵:

def step(self, a):   nowRow=self.pos[0]   nowCol=self.pos[1]   nextRow=nowRow+dRow[a]   nextCol=nowCol+dCol[a]   nextState=self.state.copy()   #檢查越界   if not checkBounds(nextRow, nextCol,self.m,self.n) :       return self.state, -2.0, False, {'info':-1,'MSG':'OutOfBounds!'}   #移動方格   swap(nextState, nowRow, nowCol, nextRow, nextCol)   self.pos=np.array([nextRow,nextCol])   #獲得獎勵   re=self.reward(self.state,nextState)   self.state=nextState   if re==EightPuzzleEnv.SuccessReward:       return self.state, re, True, {'info':2,'MSG':'Finish!'}   return self.state, re, False, {'info':1,'MSG':'NormalMove!'}


render函數:打印當前棋盤,比較簡單。

重點分析一下reward函數,也就是獎勵怎麼設置,因為合理的獎勵對於強化學習模型的準確度至關重要。首先,在step函數中,我們已經給予數組越界這種情況一個較大懲罰;其次,到達終點狀態顯然要給予最大獎勵:

#成功回報SuccessReward=10.0if self.isFinish(nextState):    #到達終點,給予最大獎勵    return EightPuzzleEnv.SuccessReward


還剩下普通的情況,就是移動方格,我們分析一下以下兩種狀態:

[[4,1,2],[5,0,3]],與終點狀態[[1,2,3],[4,5,0]] 相比,有6個棋子不一樣,其最小移動步數為5

[[1,2,3],[4,0,5]],與終點狀態[[1,2,3],[4,5,0]] 相比,有1個棋子不一樣,其最小移動步數為1

所以,用當前狀態與終點狀態有多少個棋子位置不同來評估當前狀態似乎有些道理,不妨把這個標準叫做A_star_dist(A*距離):

def A_star_dist(target,now):    length=len(now)    cnt=0;    for i in range(0,length):        if target[i]!=now[i]:            cnt+=1;    return cnt


有了這個距離,獎勵函數的設置就可以更加合理了。這裏我設定了3種不同類型的懲罰,代碼實現如下:

#對移動前的棋盤、移動後的棋盤分別進行估價lastDist=A_star_dist(self.target, nowState.ravel())nowDist=A_star_dist(self.target, nextState.ravel())#距離減小,給予較小懲罰if nowDist   return -0.1#距離不變,給予中等懲罰elif nowDist==lastDist:   return -0.2#距離增大,給予較大懲罰else:   return -0.5


那麼,是不是每次只要選擇使得AStar距離變小的方向進行移動,就一定能得到最優解呢?其實不是的,因為AStar距離的思想和即時獎勵一致,它只是對於當前局勢的一種估計,做出在當前看來最優的選擇,並不一定能使長期回報最大。下文講到Dueling-DQN的時候,就會舉例説明這個現象。

環境差不多構建完了,不妨再構建一個簡單的智能體,這個智能體非常的佛系,它完全採取隨機的動作。雖然顯得很young很naive,但它的作用並不小,用它作為基線,我們不僅可以測試環境的正確性,還可以對數據的分佈情況進行統計。

def buddaAgent(env):    return np.random.randint(0,env.ActionDim)


讓智能體跑多輪episode,統計到達終點的步數,查看數據分佈情況:

if __name__ == '__main__':    env=EightPuzzleEnv(2,3)    #統計到達終點的步數,猜測數據分佈    lst=[]    MAX_ITER=2048    for i in range(MAX_ITER):        env.reset()        cnt=0        while True:            a=buddaAgent(env)            nextS,r,terminal,info=env.step(a)            cnt+=1;            if terminal:                break        if i%50==0:            print(i,'stepCnt:',cnt)        lst.append(cnt)    print()    print('min:',np.min(lst))    print('mean:',np.mean(lst))    print('median:',np.median(lst))    print('max:',np.max(lst))    print('std:',np.std(lst))    print('mean divide std:',np.mean(lst)/np.std(lst))


觀察一下運行結果:

發現均值和標準差非常的接近,而且我們希望用一條連續的曲線來擬合這個分佈,正好指數分佈也服從均值和標準差相等的性質,不妨建模為指數分佈:

def expDist(x, mu):    '''    指數分佈    均值和標準差均是:1/λ    '''    landa=1/mu    return landa*np.exp(-landa*x)

使用pyplot可以畫出如下曲線,基本上擬合了數據的分佈圖,效果還是很不錯的:

這樣做的目的是什麼呢?其實是為了確定_max_episode_steps(一輪episode最多執行多少次step)這個閾值;通過觀察可以發現,絕大部分數據都分佈在均值以下,而且中位數median在2000以下,所以我們可以取個吉利的數字2048作為閾值,這樣的話,步數超過這個閾值的episode就可以捨棄,這輪episode中產生的所有數據都可以認為是噪聲。

self._max_episode_steps=2048

如果沒有這個超參數會發生什麼情況呢?試想一下,假設經驗池的大小設置為10萬,而剛好連續產生了5個步數為2萬的episode,那麼經驗池很快就填滿了,這些數據顯然不可能是最優解,甚至大部分數據都是離羣點或者噪聲,把它們餵給模型的話是沒有任何意義的,很有可能使模型不收斂;事實上我們更需要採集均值以下、中位數左右的數據作為正常樣本,所以有這個限制是很必要的。

看一下TestCase.py,這個程序定義了10個標準的測試用例,將棋盤的初始狀態作為特徵X,最小步數作為標籤Y_True(假設可以經過先驗知識得到):

def testCase():    '''    創建一些測試數據    '''    #棋盤的初始狀態作為特徵    x=[]    x.append(np.array([[1,2,3],[4,0,5]]))    x.append(np.array([[4,1,2],[5,0,3]]))    x.append(np.array([[1,3,2],[5,4,0]]))    ....    #最小步數作為標籤    label=[1,5,16,...]    return x,label

有了測試數據之後,就測試佛系agent的能算出最小步數的準確率了,經過測試可以得到以下輸出:

佛系智能體的準確率:0.0%


可以看到,佛系agent的準確率為0,它完全處於人工智障的狀態,即使是最簡單的[[1,2,3],[4,0,5]],它甚至也想不到只要往右移動一格就能成功這一步,這樣的代碼提交到OJ系統顯然是要WA的。看來,為了AC掉這道題,訓練強大智能體的任務就迫在眉睫了。


Double-DQN

經過這樣的變換,模型在過高估計的問題上得到了緩解,穩定性也就得到了提高。


Dueling DQN

Dueling DQN也是DQN的改進算法之一,它的主要突破點在於將神經網絡的單一輸出分解成兩個不同的輸出,再將這兩個輸出進行合併,使得模型能夠擁有更好的表現。從架構上來説,這是個一分為二,再合二為一的過程。

這個公式表明,狀態行為值函數q可以分解成狀態值函數v以及優勢函數(Advantage Function)A,這個優勢函數A可以反映當前動作比平均價值(或其它動作)好多少的程度:如果優於平均表現,優勢函數A為正;反之則為負,表明當前動作不如平均表現好。

有了這樣的分解,我們就可以在保持網絡主體結構不變的基礎上,將原本網絡中的單一輸出變成兩路輸出,一個用於輸出v,它的維度為1;另一個輸出A,它的維度為|Action|,和動作的維度相同。最後將兩部分輸出結果相加起來,就是原本的q。下圖展示了原始DQN結構與Dueling DQN結構的異同:

讓每一個A減去當前狀態下所有A的均值,就可以滿足的期望為0的約束條件,從而增加了v和A輸出的穩定性。

這樣分解之後有什麼好處呢?

首先,通過這樣的分解,我們不但可以得到狀態行為值函數q,同時還得到了狀態值函數v以及優勢函數A,那麼在某些需要使用v的場景下,就可以直接使用已經訓練好的v而不必再訓練一個新的網絡。

其次,從訓練網絡的角度來看,原本的DQN需要訓練|A|個取值為[0,∞] 的數值,現如今的Dueling DQN需要訓練一個取值為[0,∞] 的數值,和|A|個均值為0、取值範圍為[-C,C] 的數值,對於訓練網絡來説,後者顯然是更加友好的。

再次,將行為值函數分解後,每一部分的結構都具有實際的意義,我們也可以從中挖掘出有價值的信息。從論文給出的實驗結果可以看出,在訓練過程中,上下兩行圖展現不同時刻的V和A,圖中紅色區域代表V和A所關注的地方。V關注於地平線上是否有車輛出現以及分數,即長期目標;A則更關心會立即造成碰撞的車輛,也就是短期策略的選擇。兩種策略相互制約、相互促進,使得智能體同時學到了當前最優策略與長期最優回報,訓練出來的模型也更容易收斂到最優解。

回想一下上文提到的智能體不一定總是選擇使得當前AStarDist減小的動作,其實是有道理的,我們看一下[[1,2,4],[5,0,3]] 這個初始狀態移動到終止狀態的執行過程,假設神經網絡已經收斂,可以得到最優解了:

其中,標紅圈的表示選擇某個動作後使得A*Dist減小的情況,這種情況僅僅佔一半左右;其他的移動步驟可能使得A*Dist不變,甚至有可能變大,但總體的移動步數卻是最小的。這説明使得AStarDist減小的動作雖然優先考慮,但並不一定總是最優,看來訓練八數碼問題的時候需要在當前決策和長期決策之間做一定的取捨和平衡,這和Dueling DQN的思想是一致的。


線段樹SegmentTree

在瞭解Prioritized Replay Buffer之前,讀者可以試一試能否AC這兩道題:LeetCode 307POJ 3264,前者是RangeSumQuery(區間和查詢問題),後者是RMQ(Range Min/Max Query,區間最值問題),這兩道題都用到線段樹的知識。

線段樹SegmentTree是一種高效的數據結構,它可以在O(logN)時間內完成修改和查詢操作。

舉個例子,假設我們需要求數組[2, 5, 1, 4, 9, 3] 的區間最小值,那麼就可以使數組中的每個元素擁有四個域:node_Index表示節點在樹中的下標,i表示區間的開始位置,j表示區間的結束位置j,value表示區間最小值;於是可以構造如下二叉樹:

具體代碼實現,讀者可以參考OpenAI的項目Baselines,baselines/common/segment_tree.py中的__setitem__以及_reduce_helper函數。


Prioritized Replay Buffer

我們知道,普通DQN使用了經驗回放機制,在訓練模型的時候對Replay Buffer進行隨機採樣,打破了樣本之間的相關性,並提高了樣本的利用率。那麼這種以均勻策略進行採樣的方式一定合理嗎?其實不一定,因為每一個樣本所佔權重不一樣,難度也不一樣。有的樣本相對簡單,學習得到的收穫不會太高;有的樣本則有些困難,經過學習後會對模型產生較大的幫助。如果平等地看待每一個樣本,那麼就會在那些簡單的樣本上花費較多的時間,而學習的潛力並沒有被充分挖掘出來。

Prioritized Replay Buffer就很好的解決了這個問題,它打破了均勻採樣,賦予學習效率高的樣本更大的採樣權重。如何選擇權重呢?一個理想的標準就是將TD-Error,也就是δ作為權重,如果TD-Error越大,説明該狀態處的值函數與TD目標的差距越大,神經網絡參數的更新量就越大,因此該樣本的學習效率越高。並且,如果一個樣本是新產生的,我們期待它可以被多次訓練,於是可以賦予新加入經驗池的樣本一個較大的權重。

那麼直接使用TD-Error作為權重合理嗎?其實不是很合理,因為不同的樣本直接的TD-Error可能差距很大,比如某個樣本的TD-Error為100,另外一個樣本的TD-Error為0.01,那麼0.01的這個樣本基本上不會被採樣到,這樣模型很容易過擬合,所以我們需要追求:確保TD-Error較高的樣本被採樣的次數較多,同時也要使得那些TD-Error較低的樣本以一定的概率被採集到。

於是解決的辦法是這樣的,定義樣本採樣概率的計算公式為:

其中priority(i)表示樣本的採樣概率,p(i)表示每個樣本所佔總δ[zhoubo1] 的比例,α可以調整樣本δ的重要性。當α=1時,相當於直接使用δ的數值;當α

有了優先級priority,為了有效地獲取priority較高的樣本,SumTree就派上用場了。

SumTree的節點存儲的是區間[i,j]的和,為了方便,我們舉一個滿二叉樹的例子,這個二叉樹存儲了數組[3,10,12,4,1,2,8,2] 的區間和,有8個葉子節點,節點總數為

除了普通的RangeSumQuery之外,還有第二種查詢方式——前綴和查詢。

前綴和sum[i]的定義為:數組0位置到i位置的所有元素的和;請查找尋找前綴和sum[i-1]小於等於v的最大下標i。

可以使用SumTree快速求解這個問題,它的基本思想是這樣的:

  • 比較當前要查詢的前綴和v與節點左子樹leftV的大小

  • 若leftV⩾v,則在左子樹中繼續查詢v

  • 否則,用v減去leftV,並在右子樹中繼續查詢v-leftV

  • 重複執行以上過程,直到遇到葉子節點

舉個例子,假設需要查詢的前綴和為23,過程如下:

  • 比較23與根節點的左子樹(1號節點)的大小,發現29>23,則進入左子樹(1號節點)繼續查詢23

  • 比較23與左子樹(3號節點)的大小,發現13

  • 比較10與左子樹(9號節點)的大小,發現12>10,而12又是葉子,故算法停止,返回9號節點作為結果,其對應的數組下標為2,數組的值為12

經過檢測可以發現:3+10=1323,故結果正確。

下圖展示了查詢的過程,標藍的部分為每次查詢的值,標紅的部分為本次查詢所經過的路徑:

顯然,整個查詢過程會形成從根節點到葉子節點的一條完整路徑,所以時間複雜度為O(logN),代碼實現可以參考官方baselines/common/segment_tree.py中的find_prefixsum_idx函數

實際進行抽樣時,將根節點的priority作為總和sum,除以batch_size,分成batch_size個大致相等的區間。如上圖所示,所有node的priority加起來是42,假設抽7個樣本,這時priority區間的分佈就是這樣:[0,6],[7-12],[13-18],[19-24],[25,30],[31,36],[37,42]。然後在每個區間裏使用均勻分佈產生一個隨機數,並進行前綴和的查詢。

注意到,雖然產生的隨機數是均勻的,但每個節點的採樣權重並不均勻,甚至有可能存在橫跨多個區間的節點(比如9號節點的優先級為12,所佔區域為[13-25],橫跨[13-18]、[19-24]兩個區間,對這兩個區間進行採樣的時候,9號節點都是100%會被選到的),所以優先級越大的節點越有可能被選到,這樣就能完成優先級採樣的過程。下圖的藍色框表示每個葉子節點所佔的區域:

這樣,優先級採樣的功能似乎已經完成了,但可能有同學注意到剛才的這樣一個Trick:

每個節點的採樣權重並不均勻,甚至有可能存在橫跨多個區間的節點

是的,優先級越大的節點越有可能被選到,也就是説,Prioritized Replay Buffer的樣本是非等概率取出的,甚至在極端情況下,有可能一個bacth_size中出現多個重複的樣本,這顯然是一個有偏估計,可能會使模型欠擬合。為了矯正這個偏差,使得模型的訓練變的無偏,可以給每個樣本的乘以一個重要性採樣係數:

其中,N表示Replay Buffer中的樣本數量,Priority(i)表示樣本的採樣優先級,β是介於(0,1]之間的一個數。這個權重係數Weight表示當前樣本對梯度更新的影響,樣本的優先級Priority越大,Weight就越小,這樣就削弱了採樣優先級較高的樣本對梯度更新以及loss函數的影響。β一開始是小於1的某個超參數,隨着迭代次數的增加,讓β不斷變大,逐漸接近1,最後效果就等同於普通的Replay Buffer,這樣既能提升樣本的利用率,又能確保結果不會帶來太大的偏差。

SegmentTree.py的sample函數,就實現了優先級採樣、計算重要性採樣權重兩個功能,其中,重要性採樣係數Weight的計算既可以使用原始公式,也可以使用另外一個公式:

它倆是等效的,並無實質性差別:

def sample(self, n):#根據採樣優先級Priority進行採樣,並計算重要性採樣係數Weight    b_idx, b_memory, ISWeights = np.empty((n,), dtype=np.int32), [], np.empty((n, 1))    #平均分成n個區間,每個區間的大小為pri_seg    pri_seg = self.tree.total_p / n    self.beta = np.min([1., self.beta + self.beta_increment])    min_prob = np.min(self.tree.tree[-self.tree.capacity:]) / self.tree.total_p      for i in range(n):        a, b = pri_seg * i, pri_seg * (i + 1)        #使用均勻採樣的方式,隨機產生[a,b]區間的某一個數        v = np.random.uniform(a, b)        #使用SumTree的查詢進行採樣        idx, p, data = self.tree.get_leaf(v)        #計算採樣優先級Priority        priority = p / self.tree.total_p      #計算重要性採樣權重Weight,用於訓練階段,削弱較高優先級樣本對梯度更新、loss函數的影響        #ISWeights[i, 0] = np.power(priority*self.tree.size, -self.beta)        ISWeights[i, 0] = np.power(priority/(min_prob+1e-4), -self.beta)        b_idx[i] = idx        b_memory.append(data)    return b_idx, b_memory, ISWeights



Rainbow模型以及代碼實現

主角Rainbow終於登場了,它將DQN的幾種改進算法融合在了一起,是基於集成學習(Ensemble Learning)思想的一種模型。今天我們就嘗試着擴展一下PARL的Algorithm層,並將前面的Double DQN、Dueling DQN、Prioritized Replay Buffer整合成新的強化學習模型——Rainbow。

關於DQN的改進算法,還有Multi-step Learning、Distributional Network、Param Noise等等,並不在本文的討論範圍之內,有興趣的讀者可以查閲其它資料。

依次分析replay_memory、env、model、algorithm、agent、train等模塊

1、SegmentTree.py:裏邊有SumTree和Memory兩個類;SumTree前面已經分析過了,Memory類裏邊最重要的函數sample(實現優先級採樣、重要性採樣係數計算兩個功能)也已經分析過了,還剩下一些超參數。

#微小擾動,防止除0錯誤epsilon = 0.01#將TD-Error轉化為優先級priority,使得較低優先級的樣本也可以被採集到alpha = 0.6#超參數,用於重要性採樣,初始值小於1,最後逐漸增長到1beta = 0.3#用於歸一化TD-Errorabs_err_upper = 1.0#超參數,beta的增長速率beta_increment=2e-6


store函數:用於存儲樣本,注意將新樣本的優先級設置為最大值。

def store(self, transition):    max_p = np.max(self.tree.tree[-self.tree.capacity:])     if max_p == 0:        max_p = self.abs_err_upper     #將新添加的數據的優先級設置為最大值     self.tree.add(max_p, transition)


batch_update函數:進一步封裝了SumTree的update,用於批量更新優先級,由於樣本訓練之後TD-Error會產生變化,所以每訓練一批樣本,這個函數都會被調用。

def batch_update(self, tree_idx, abs_errors):     abs_errors += self.epsilon     clipped_errors = np.minimum(abs_errors, self.abs_err_upper)     ps = np.power(clipped_errors, self.alpha)     for ti, p in zip(tree_idx, ps):         self.tree.update(ti, p)


2、FcPolicyReplayMemory.py:經驗池,用於存儲交互過程中產生的(state, action, reward, terminal,next_state) 這個五元組。


今天我們使用PriorityReplayMemory這個類,它封裝了SegmentTree.py中的Memory類,有append、updatePriority、size、sample_batch等功能,其中append用於添加新樣本,updatePriority封裝了memory的batch_update方法,size返回經驗池的當前大小,這三個函數都非常簡單。sample_batch函數用於優先級採樣,它調用了Memory的sample方法,比普通的Replay Memory返回的五元組多了兩個字段,一個是重要性採樣權重weights,另外一個是樣本在SumTree中的下標tree_index:

def sample_batch(self, batch_size):    tree_index, batch_memory, is_weights = self.memory.sample(batch_size)    state = np.zeros((batch_size, ) + self.state_shape, dtype='float32')    action = np.zeros((batch_size, ), dtype='int32')    reward = np.zeros((batch_size, ), dtype='float32')    isOver = np.zeros((batch_size, ), dtype='bool')    nextState = np.zeros((batch_size, ) + self.state_shape, dtype='float32')          weights = np.zeros((batch_size, ), dtype='float32')    for i in range(0,len(batch_memory)):        exp=batch_memory[i]        state[i]=exp.state        action[i]=exp.action        reward[i]=exp.reward        isOver[i]=exp.isOver        nextState[i]=exp.nextState        weights[i]=is_weights[i]    return [state, action, reward, isOver,nextState,weights,tree_index]


3、ReversePairs.py:使用歸併排序求逆序對數;上文已經分析過了。

4、TestCase.py:生成一些標準的測試用例;上文已經分析過了。

5、EightPuzzleEnv.py:八數碼問題的環境;上文已經分析過了。

6、PuzzleDuelingModel.py:神經網絡模型,有兩個隱藏層,維度都是64,激活函數選用tanh;有兩個出口,分別是狀態值函數V、優勢函數A,然後計算A的均值,再用A減去均值加上V就得到狀態行為值函數Q,最後將Q作為神經網絡的輸出。

class PuzzleDuelingModel(Model):    def __init__(self, act_dim):        self.act_dim = act_dim        self.fc0=layers.fc(size=64,act='tanh')         self.fc1=layers.fc(size=64,act='tanh')        #狀態值函數V        self.valueFc = layers.fc(size=1)        #優勢函數A        self.advantageFc = layers.fc(size=act_dim)            def value(self, obs):        out = self.fc0(obs)        out = self.fc1(out)        V=self.valueFc(out)        advantage=self.advantageFc(out)        #計算優勢函數的均值,用於歸一化       advMean=fluid.layers.reduce_mean(advantage, dim=1, keep_dim=True)        #狀態行為值函數Q=V+A        Q = advantage + (V - advMean)        return Q


7、pddqn.py:算法層,是本文的核心內容之一;pddqn是Priority_Double_DQN的簡寫,繼承自PARL官方的Algorithm,主要重寫了define_learn函數,在原始dqn.py的基礎上加入了三個新功能:

  • Double DQN方式的參數更新

  • 接受樣本的訓練權重weight,將weight作為loss函數以及梯度更新的係數

  • 不僅要返回cost,而且要返回新的TD-Error

define_learn函數的代碼如下:

def define_learn(self, obs, action, reward, next_obs, terminal,weight):     #Q(s,a|θ)    pred_value = self.model.value(obs)     #Q(s',a'|θ')    targetQ_predict_value = self.target_model.value(next_obs)     #Q(s',a'|θ)    next_s_predcit_value = self.model.value(next_obs)     #argMax[Q(s',a'|θ)]    greedy_action = fluid_argmax(next_s_predcit_value)     predict_onehot = fluid.layers.one_hot(greedy_action, self.action_dim)     #Q(s',argMax[Q(s',a'|θ)]|θ')    best_v = fluid.layers.reduce_sum(fluid.layers.elementwise_mul(predict_onehot, targetQ_predict_value),dim=1)    best_v.stop_gradient = True    #TD目標: R+γ*Q(s',argMax[Q(s',a'|θ)]|θ')    target = reward + (1.0 - layers.cast(terminal, dtype='float32')) * self.gamma * best_v    action_onehot = layers.one_hot(action, self.action_dim)    action_onehot = layers.cast(action_onehot, dtype='float32')    pred_action_value = layers.reduce_sum(           layers.elementwise_mul(action_onehot, pred_value), dim=1)            #計算新的TD-Error    newTd = layers.abs(target - pred_action_value)    cost = layers.square_error_cost(pred_action_value, target)    #weight表示樣本的權重,影響cost的更新幅度    cost=weight*cost    cost = layers.reduce_mean(cost)    optimizer = fluid.optimizer.Adam(self.lr, epsilon=1e-3)    optimizer.minimize(cost)    return cost,newTd



可以看到,通過繼承parl.Algorithm,我們很方便地定義自己的算法。

正是由於PARL具有可讀性好、可擴展性高、可複用性強等優點,才使得用户能夠非常方便的實現新的強化學習算法!


8、PrioritizedAgent.py,智能體agent層;其中,build_program、sample、predict這幾個函數的寫法與普通DQN類似;唯一不同的是learn函數,它比普通DQN多了一個weight參數,接收六元組(obs, act, reward, next_obs, terminal, weight) 完成學習功能,返回值也多了newTd,表示新的TD-Error

def learn(self, obs, act, reward, next_obs, terminal,weight):    if self.global_step % self.update_target_steps == 0:        self.alg.sync_target(self.gpu_id)    self.global_step += 1    act = np.expand_dims(act, -1)    if self.clip_reward:        reward = np.clip(reward, -1, 1)    feed = {        'obs': obs.astype('float32'),        'act': act.astype('int32'),        'reward': reward,        'next_obs': next_obs.astype('float32'),        'terminal': terminal,        'weight':weight    }    cost,newTd = self.fluid_executor.run(        self.learn_program, feed=feed, fetch_list=[self.cost,self.newTd])    return cost[0],newTd


除此之外,還多了一個新的超參數clip_reward,用來決定是否歸一化即時獎勵。

#是否歸一化獎勵,超參數可微調self.clip_reward=False

通常情況下,常用的做法是把所有即時獎勵進行歸一化;但是對於八數碼問題來説,由於最終狀態獲得的即時獎勵對整個過程影響較大,為了使得這個獎勵能夠更快的傳遞到之前的所有狀態,所以不進行歸一化也是有道理的,可能會加速模型的收斂,後期我們測試一下這個超參數的影響。


9、TrainTest_Rainbow.py,訓練與測試,讓環境evn與智能體agent進行交互。

首先看一下init_rainbow,這個函數用於構建rainbow模型,初始化環境-environment、神經網絡-model、算法-algorithm、智能體-agent以及優先級經驗池-PrioritizedReplayBuffer:

def init_rainbow():    #構建環境    env = EightPuzzleEnv()    #動作維度    action_dim = 4    #超參數    hyperparas = {        'action_dim': action_dim,        #學習率        'lr': LEARNING_RATE,        #獎勵衰減係數        'gamma': GAMMA    }    #model表現為Dueling-DQN    model = PuzzleDuelingModel(act_dim=action_dim)    #Double-DQN 的功能在algorithm中實現    #Prioritized_Replay_Buffer的功能由algorithm+agent+ReplayMemory共同組成    algorithm = PDDQN(model, hyperparas)    agent = PrioritizedAgent(algorithm, action_dim)    rpm =PriorityReplayMemory(max_size=MEMORY_SIZE, state_shape=StateShape)    return env,agent,rpm


其次,看一下run_train_episode,這個函數用於訓練一輪episode,和普通DQN的訓練方式相比,有兩個主要改進:

  • 普通DQN採集的樣本是五元組,而這裏是一個七元組(s, a, r, terminal, s', weight, tree_index),其中weight表示重要性採樣的權重,tree_index表示樣本對應SumTree中的節點下標;並且,在訓練之後,還需通過新的δ批量更新樣本的採樣優先級。

#採集的樣本是一個七元組batch_state, batch_action, batch_reward, batch_isOver,batch_next_state,batch_weight,tree_index= rpm.sample_batch(batchSize)#執行SGD,訓練參數θcost,newTd = agent.learn(batch_state, batch_action, batch_reward,batch_next_state, batch_isOver,batch_weight)#通過新的TD-Error更新採樣優先級rpm.updatePriority(tree_index,newTd)


  • 二級緩存L2Cache;在環境執行step之後,並不是馬上將五元組存入ReplayBuffer,而是先存入二級緩存。當本輪episode結束後,判斷本輪episode是否到達終止狀態,如果到達,則將二級緩存中的數據複製進ReplayBuffer;否則,説明總步數大於閾值_max_episode_steps,那麼就將本輪的樣本當做噪聲全部捨棄。

#二級緩存L2Cache,存儲本輪的數據L2Cache=[]...    #執行動作    next_state, reward, isOver,_ = env.step(action)    step += 1    #將五元組(s,a,r,terminal,s')加入二級緩存    L2Cache.append(Experience(state.ravel(), action, reward, isOver,next_state.ravel()))...##過濾掉步數大於_max_episode_steps的episode,並將二級緩存中的數據複製到經驗池if L2Cache[-1].isOver==True:    for exp in L2Cache:        rpm.append(exp)


除了這兩個改進之外,其他的邏輯和普通DQN相差無幾。

然後,我們再看一下超參數以及日誌記錄的部分。其中,MEMORY_SIZE、UPDATE_FREQ、TOTAL、logFreq等超參數和普通DQN的寫法完全一致。除此之外,有兩個指標評價模型的好壞,分別是平均獎勵、平均步數;平均獎勵很好理解,模型越優,平均獎勵越大;由於八數碼問題需要求解最小步數,所以平均步數越小,模型越優:

#記錄平均獎勵meanReward=0#記錄平均步數meanEpStep=0


另外,我們可以將平均步數以及平均獎勵保存至文件,後期繪製成學習曲線,就可以對比Rainbow模型和普通DQN的訓練效率:

#平均獎勵的學習曲線reward_curve=[]#平均步數的學習曲線step_curve=[]#保存學習曲線nowDir = os.path.dirname(__file__)parentDir = os.path.dirname(nowDir)np.savetxt(parentDir+'/learning_curve_log/rainbow_reward.txt',reward_curve,fmt='%.3f')np.savetxt(parentDir+'/learning_curve_log/rainbow_step.txt',step_curve,fmt='%.3f')


最後,為了測試模型的準確率,可以將TestCase.py裏的標準數據作為測試集,並用訓練好的模型對測試集進行檢驗,若步數一致,輸出"Accepted!",表示預測正確,若步數不一致,輸出"Wrong Answer!",表示預測失敗。這樣的話,就相當於通過監督學習的方式統計出準確率Accuracy。moveDirection表示移動的箭頭,也就是上下左右,有了它,我們就可以查看智能體每一步所採取的具體策略了:

#移動的箭頭moveDirection={0:'↑',1:'↓',2:'←',3:'→'}...x,correctList=testCase()okCnt=0for i in range(len(x)):    board=x[i]   correctCnt=correctList[i]   isOk=run_test_episode(env, agent,board,correctCnt)      if isOk:         okCnt+=1print('準確率:{:.1f}%'.format(okCnt/len(x)*100))


運行main函數,輸入train進行訓練,輸入test進行測試。在這裏我們輸入train,經過一段時間的訓練之後,可以看到如下輸出日誌(這裏僅僅取一個樣本進行觀察):

對於[[1,2,4],[5,0,3]] 這個初始狀態,智能體使用了11步走到了終點,預測值和真實值一樣,而且動作也顯示正確。再看一下最後一行的輸出:

準確率:100.0%

説明模型在測試集上表現良好,對於每一條測試數據,都能準確無誤的找到最小步數和最優路徑。

看一下AutoWriteCode_Rainbow.py,這個程序用於自動寫代碼和統計數據分佈,分為以下幾個步驟:

1、利用隨機數發生器生成所有的初始狀態,這樣做的好處是代碼裏邊的靜態表是隨機打亂的,從而排除了被檢測到作弊的可能

2、使用已經訓練好的模型生成所有的解,並存入一個map,map的key為初始狀態,value為步數

3、遍歷map,就可以打一個靜態表,並生成可執行的python程序,write_ac_code函數就實現了這個功能:

def write_ac_code(dic):    '''    自動寫AC代碼    '''    print()    print('''def boardToKey(board):    key=""    for array in board:        for i in array:            key+=str(i)    return key    ''')        print('''class Solution:    def slidingPuzzle(self, board: List[List[int]]) -> int:''')    print('        d={}')    for entry in dic.items():        print('       d["'+entry[0]+'"]='+str(entry[1]))    print('        key=boardToKey(board)')    print('''        if key in d:            return d[key]        else:            return -1''')


4、由於我們並不知道數據的真實值,所以在提交代碼之前,可以統計一下預測值的數據分佈情況,看看是否符合常理,使用pyplot實現這個功能。

看一下控制枱的輸出,經過2200多次的迭代,終於自動寫完了代碼:



再看一下預測值的數據分佈情況:


均值、最大值、方差比原來的佛系agent少了好幾個數量級,均值和中位數幾乎相等,經過這樣的分析,我們心裏至少有了一個底——模型是堪用的。

接下來便是見證奇蹟的時刻,將控制枱輸出的代碼copy到OJ系統進行提交(有興趣的同學可以寫個自動腳本),看到如下結果:

代碼不僅AC了,還打敗了100%的程序,並且成為了這一題的默認Solution!

好了,強化學習除了遊戲、機器人、自動駕駛、NLP、CV等經典應用之外,一個新領域的潘多拉魔盒已經打開——刷題!


對比普通DQN以及經典解法

由於我們保存了平均獎勵、平均步數,所以可以對比Rainbow模型和普通的DQN的訓練效率。打開DrawLearningCurve.py這個文件並運行程序,觀察到如下結果:

可以發現,無論是平均獎勵還是平均步數,Rainbow一開始的表現並不比普通DQN好,但隨着迭代次數的不斷增加,Rainbow的平均獎勵明顯增加、平均步數明顯減小,收斂速度也快於普通DQN。看來把DQN的各種改進算法整合到一起是有道理的,符合"集成學習器的效果往往優於單個學習器"的定律。如果進一步調參,學習曲線的差異會更加明顯,讀者可以自行嘗試。

另外,和監督學習的測試方法類似,可以採用對拍的方式進一步測試強化學習算法,詳見Duipai_Rainbow.py,它的主要步驟如下:

  • 調用Permutation(全排列)生成輸入數據(特徵X)

  • 調用StandardAStarSolve(標準A*算法)產生正確輸出(標籤Y_True)

  • 調用TrainTest_Rainbow中的init_rainbow初始化rainbow模型,並進行預測(Y_Predict)

  • 對比Y_True與Y_Predict,統計正確率Accuracy

還有一個小技巧,因為模型已經訓練完畢,我們期待它的最優步數不會太大,所以設置一個閾值,如果某輪episode的步數超過這個閾值,就認為智能體進入了死循環,提前終止掉本輪episode:

#若步數超過閾值,則認為智能體進入了死循環step_threshold=256

運行main函數,可以看到如下輸出:

未達到嚴格意義的100%,説明個別樣本預測失敗,模型並不是絕對的完美。這個現象很正常,因為參數化逼近的方式計算值函數,本身就是一種近似算法,一般來説都會存在誤差。另外,強化學習畢竟加載了第三方庫,所以比A*稍慢,這也是正常現象。

不到30毫秒的平均耗時,表明強化學習模型的預測速度是非常快的。所以:

只要OJ系統支持PaddlePaddle和PARL,就能使用強化學習算法切相當一部分Game題和Puzzle題,並且無需再打表!

以下表格展示了超參數對於結果的影響,讀者可以試着調節超參數,看看哪些參數素會影響訓練時間?哪些參數會影響平均獎勵或平均步數的變化率?能否到達絕對的100%?


總結與展望

總結一下用強化學習算法切OJ題的主要步驟,可以畫出如下流程圖:

如果未來OJ系統支持PARL,那麼就可以省去打表這一步,直接使用訓練好的模型進行預測。

個人感想與注意點:

  • RL與CLRS(算法導論)強關聯,無論是動態規劃,還是樹搜索,或者OpenAI的線段樹。

  • 訓練模型的時候,儘量在基線的基礎上逐漸優化,比如buddaAgent、L2Cache、clipReward等參數和技巧都是實驗出來的,不要一蹴而就的追求100%;這和做題思路是一樣的,一般得到WA或者TLE之後,離AC也就不遠了。

  • 百度之星原題的起始狀態和終止狀態不固定,使用鏈表交點或者LCA即可解決,無需訓練新的網絡。

本文希望給大家兩個啟示:

  • 是時候拋棄“刷題只是為了面試,其他地方用不到”的錯誤觀念了!

  • 強化學習能解決部分Game或者Puzzle類題目,並不適用於所有算法題;當然我期待大家能做出更強的AI,甚至是全自動讀題、全自動寫代碼的AI。


百度AIStudio源碼:  https://aistudio.baidu.com/aistudio/projectdetail/63441

強化學習PARL倉庫:https://github.com/PaddlePaddle/PARL

本文提及的源碼: https://github.com/kosoraYintai/PARL-Sample/tree/master/eight_puzzle

 

 

參考文獻:

  • Hado van Hasselt, Arthur Guez,Silver D.(2015). Deep Reinforcement Learning with Double Q-learning

  • Tom Schaul, John Quan, Ioannis Antonoglou, Silver D.(2016). PRIORITIZED EXPERIENCE REPLAY

  • Ziyu Wang, Tom Schaul, Matteo Hessel, et all.(2016). Dueling Network Architectures for DRL

  • Matteo Hessel, Dan Horgan, Silver D, et all.(2017). Rainbow, Combining Improvements in DRL

  • POJ 1077. Eight

  • POJ 3264. Balanced Lineup

  • LintCode 532. Reverse Pairs

  • LeetCode 773. Sliding Puzzle

  • LeetCode 307. Range Sum Query - Mutable

  • LeetCode 47. Permutations II



●編號864,輸入編號直達本文

●輸入m獲取文章目錄

推薦↓↓↓

開源最前線

https://weiwenku.net/d/201202712