1. 程式人生 > >lua 協程 | 協程實現訊息機制(事件佇列輪詢處理機制)

lua 協程 | 協程實現訊息機制(事件佇列輪詢處理機制)

1 協程基礎知識

Lua 協同程式(coroutine)與執行緒比較類似:擁有獨立的堆疊,獨立的區域性變數,獨立的指令指標,同時又與其它協同程式共享全域性變數和其它大部分東西。


協程有三種狀態:掛起,執行,停止。建立後是掛起狀態,即不自動執行。status函式可以檢視當前狀態。協程可通過yield函式將一段正在執行的程式碼掛起。

lua的resume-yield可以互相交換資料。如果沒有對應的yield,傳遞給resume的額外引數將作為引數傳遞給協程主函式:

co = coroutine.create(function (a, b, c)
     print("co", a, b, c)
end)
coroutine.resume(co, 1, 2, 3)
執行結果: co 1 2 3
如果沒有錯誤的話,resume將返回true和yield的引數,弱出現錯誤,返回false與相關錯誤資訊:
co = coroutine.create(function (a, b)
     coroutine.yield(a+b, a-b)
end)
print(coroutine.resume(co, 3, 8))
執行結果: true 11 5
同樣地,yield也將返回由對應的resume傳遞而來的引數:
co = coroutine.create (function ()
print("begin coroutine")
print("co", coroutine.yield())
end)
coroutine.resume(co) -- 遇到coroutine.yield()掛起
coroutine.resume(co, 4, 5) -- 完成第二個print過程
執行結果: begin coroutine
co 4 5
協程主函式返回值將作為與之對應的resume的返回值(第一個引數是true):
co = coroutine.create(function (a)
return 6, 7, coroutine.yield(a-1)
end)
print(coroutine.resume(co,5)) -- 輸出true與coroutine.yield()執行引數
print(coroutine.resume(co,5)) -- 輸出協程主函式返回值
執行結果: true	4
true 6 7 5

下面用協同程式實現一個典型的生產者-消費者模式。

function receive(prod)
	local status,value = coroutine.resume(prod)
 	return value
end
function send( x )
	coroutine.yield(x) -- 掛起,返回當前協同程式 供resume呼叫
end

function produce() -- 生產者
	return coroutine.create(function ()
		while true do -- 該迴圈保證了生產者被無限驅動
			local x = io.read()
			send(x)
		end
	end)
end
function consume(prod) -- 消費者
	while true do
		local x = receive(prod) -- 消費者驅動生產者的執行
		print(x)
	end
end
function filter( prod ) -- 用過濾器處理生產者資料
	return coroutine.create(function ()
		while true do
			local x = receive(prod) -- 驅動生產者生成資料
			x = doSomeProcess(x)
			send(x)
		end
	end)
end

consume(filter(produce()))

我們嘗試用協程來實現一些演算法和應用級的效果。

1)用生產者-消費者模式實現迭代器,該迭代器能輸出一個數組的全排列。

function permgen( a,n ) -- 生產者
	if n <= 0 then
		coroutine.yield(a)
	else
		for i=1,n do
			a[i],a[n] = a[n],a[i]
			permgen(a,n-1)
			a[i],a[n] = a[n],a[i]
		end
	end
end

function perm( a ) -- 迭代器 消費者
	return coroutine.wrap(function ()
		permgen(a,#a)
	end)
	--[[
	-- 與上面程式碼效果相同
	local prod = coroutine.create(function ()
		permgen(a,#a)
	end)
	return function ()
		local code,rt = coroutine.resume(prod)
		return rt
	end
	]]
end

local a = {1,2,3}
for v in perm(a) do
	local st = ""
	for i,v in ipairs(v) do
		st = st .. v .. "   "
	end
	print(st)
end

2)用協程實現一個多檔案下載分發器

有多個檔案需要下載,如果採用順序排隊方式,那麼大量時間將浪費在 請求下載-下載完成反饋之間的等待過程中。我們用協程解決這一效率問題:請求下載後,如果超時則yield掛起當前協程,並有分發器呼叫其他協程。

download = function (host,file)
	local c = socket.connect(host,80)
	c:send("GET " .. file .. "HTTP/1.0\r\n") -- 傳送請求
	while true do
		local s,status = receive(c) -- 接收
		if status == "closed" then
			-- 當前協程的下載過程已完成
			break
		end
	end
end
receive = function (connection)
	local s,status = connection:receive()
	if status == "timeout" then
		coroutine.yield(connection)
	end 
	return s,status
end

dispatch = function ()
	while true do 
		if #loadLst <= 0 then
			-- 所有下載器均完成 結束分發
			break
		end
		for i,v in ipairs(loadLst) do
			local status,res = coroutine.resume(v)
			if not res then
				table.remove(loadLst,i) -- 當前下載器完成
				break
			end
		end
	end
end


2 協同實現訊息機制(不同場景等待式訊息處理)

遊戲中,有時在處理訊息時,希望一條一條訊息獨立處理(獨立堆疊),且希望每條訊息在不同場景內等待式逐步進行(如一個場景訊息處理完,掛起,經過100ms再進行當前訊息下一場景的處理),協程能夠完成這一過程。下面提供一種實現方案。

local msgLst = {}   -- 儲存
local curMsgCor = nil -- 當前訊息對應的協程

function insertPerMsg( msg )
  table.insert(msgLst,msg)
  scheduleScript(processMsg)  -- -- 定時器中迴圈執行函式
end

function processMsg( )
    if #msgLst <= 0 then
        unScheduleScript(processPerMsgCor)
    else
        if not curMsgCor then
            local curMsg = table.remove(msgLst,1)
            curMsgCor = coroutine.create(function () processPerMsgCor(curMsg) end) -- 建立coroutine
        end
        if curMsgCor then
            local state,errMsg = coroutine.resume(curMsgCor) -- 重啟coroutine
            local status = coroutine.status(curMsgCor)  -- 檢視coroutine狀態: dead,suspend,running
            -- 啟動失敗
            if not state then
                curMsgCor = nil
                local debugInfo = debug.traceback(curMsgCor)
                print(debugInfo)
            end
            -- coroutine消亡
            if status == "dead" then
                curMsgCor = nil
            end
        end
    end
end

function processPerMsgCor( curMsg )
    processForTalk()
    coroutine.yield() --  掛起coroutine
    processForSpecialRoom()
    coroutine.yield()
    processForOtherWay()
end

3 事件佇列輪詢處理機制

遊戲場景經常需要一次執行一系列事件,每個事件的完成均需要一定時間。如需要在奔跑至指定區域後釋放技能或攻擊某一物件。可通過事件佇列方式完成這一過程。
local Event = class("Event")
Event.State = {
	None = 1,
	Doing = 2,
	Done = 3,
}
function Event:ctor()
	self.state = self.State.None
end
function Event:isNotStart()
	return self.state == self.State.None
end
function Event:setDoing()
	self.state = self.State.Doing
end
function Event:setFunc( func,... )
	self.func = func
	self.funcParams = {...}
end
function Event:doFunc()
	self.func(self,unpack(self.funcParams))
end
-- Func完成後呼叫
function Event:setDone()
	self.state = self.State.Done
end
function Event:isDone()
	return self.state == self.State.Done
end


function Test:ctor( )
	-- 建立事件佇列
	self.eventQueue = Queue:create()
	-- 定時器輪詢時間事件佇列
	local schedule = cc.Director:getInstance():getScheduler()
	self.scheduleId = schedule:scheduleScriptFunc(handler(self,self.runEventQueue),1,false)
	-- 新增幾個事件
	local deltaTime = cc.DelayTime:create(5)
	local event = Event:create()
	-- func中的引數event在執行doFunc時傳入
	local func = function (event)
		local f1 = function()
			-- 5秒後輸出
			print("print this after 5 sec")
			event:setDone()
		end
		self:runAction(cc.Sequence:create(deltaTime,cc.CallFunc:create(f1)))
	end
	event:setFunc(func)
	self:addEvent(event)


	event = Event:create()
	func = function (event)
		local f1 = function()
			-- 10秒後輸出
			print("print this after 10 sec")
			event:setDone()
		end
		self:runAction(cc.Sequence:create(deltaTime,cc.CallFunc:create(f1)))
	end
	event:setFunc(func)
	self:addEvent(event)


	event = Event:create()
	func = function (event)
		local f1 = function()
			-- 15秒後輸出
			print("print this after 15 sec")
			event:setDone()
		end
		self:runAction(cc.Sequence:create(deltaTime,cc.CallFunc:create(f1)))
	end
	event:setFunc(func)
	self:addEvent(event)
end
function Test:addEvent(event)
	self.eventQueue:push_back(event)
end
function Test:runEventQueue()
	if self.eventQueue:size() <= 0 then
		return
	end
	-- 兩種寫法
	--[[ 第一種寫法:完成當前事件後等待計時器執行下一個事件,事件之間存在時間間隙
	local curEvent = self.eventQueue:front()
	if curEvent and not curEvent:isDoing() then
		if curEvent:isNotStart() then
			-- 執行事件
			curEvent:setDoing()
			curEvent:doFunc()
		elseif curEvent:isDone() then
			-- 事件已完成,刪除並轉至下個事件
			self.eventQueue:pop_front()
			curEvent = nil
			if self.eventQueue:size() > 0 then
				curEvent = self.eventQueue:front()
			end
		end
	end
	]]
	-- 第二種寫法:當前時間完成即刻執行下一時間
	while curEvent and not curEvent:isDoing() do
		if curEvent:isNotStart() then
			-- 執行事件
			curEvent:setDoing()
			curEvent:doFunc()
		elseif curEvent:isDone() then
			-- 事件已完成,刪除並轉至下個事件
			self.eventQueue:pop_front()
			curEvent = nil
			if self.eventQueue:size() > 0 then
				curEvent = self.eventQueue:front()
			end
		end
	end
end