1. 程式人生 > >skynet源碼分析之cluster集群模式

skynet源碼分析之cluster集群模式

toc 等待 協程 socket font cnblogs tcp連接 監聽配置 返回結果

比起slave/harbor集群模式,skynet提供了用的更為廣泛的cluster集群模式,參考官方wiki https://github.com/cloudwu/skynet/wiki/Cluster。cluster模式利用socketchannel庫(http://www.cnblogs.com/RainRill/p/8892648.html) 與其他skynet進程進行交互,每個請求包帶一個唯一的session值,對端回應包附帶session值。cluster集群模式tcp通道是單向的,即skynet進程1(集群中的節點)通過tcp通道向進程2發送請求包,進程2回應包也走這一通道。但是,進程2向進程1發送請求包及進程1的回應包則是另一條tcp通道。

每個集群節點都有一份完整的cluster配置,會啟動一個clusterd的服務,調用loadconfig加載配置。

第11-19行,加載配置文件(也可以手動傳入配置table tmp)

第20-24行,保存節點名-地址映射關系

 1 -- service/clusterd.lua
 2  skynet.start(function()
 3      loadconfig()
 4      skynet.dispatch("lua", function(session , source, cmd, ...)
 5          local f = assert(command[cmd])
6 f(source, ...) 7 end) 8 end) 9 10 local function loadconfig(tmp) 11 if tmp == nil then 12 tmp = {} 13 if config_name then 14 local f = assert(io.open(config_name)) 15 local source = f:read "*a" 16 f:close() 17 assert
(load(source, "@"..config_name, "t", tmp))() 18 end 19 end 20 for name,address in pairs(tmp) do 21 ... 22 node_address[name] = address 23 ... 24 end 25 end

以skynet進程1的A服務向skynet進程2的B服務發送請求包及回應為例,說明cluster的工作流程:

對於進程2,配置了 db = "127.0.0.1:2528"啟動後調用cluster.open "db"。

第4行,給clusterd服務發送消息。

第12-15行,啟動一個gate服務,然後通知gate服務監聽配置的地址。gate調用socket.listen監聽外部socket連接。

第20行,watchdog就是clusterd服務的地址。

 1 -- lualib/skynet/cluster.lua
 2 function cluster.open(port)
 3     if type(port) == "string" then
 4         skynet.call(clusterd, "lua", "listen", port)
 5     else
 6         skynet.call(clusterd, "lua", "listen", "0.0.0.0", port)
 7     end
 8 end
 9 
10 -- service/clusterd.lua
11 function command.listen(source, addr, port)
12     local gate = skynet.newservice("gate")
13     ...
14     skynet.call(gate, "lua", "open", { address = addr, port = port })
15     skynet.ret(skynet.pack(nil))
16 end
17 
18 -- servcice/gate.lua
19 function handler.open(source, conf)
20     watchdog = conf.watchdog or source
21 end

對於進程1,調用cluster.call(db, "A", ...),給節點名為db(進程2)的A服務發送請求,最終調用到send_request

第9行,請求包帶上唯一的sesssion值

第11行,按cluster定義的模式打包數據

第15行,獲取socketchannel對象,如果第一次請求,會先創建socketchannel對象,並建立tcp連接

第16行,調用socketchannel的request接口發送請求包

 1 -- lualib/skynet/cluster.lua
 2 function cluster.call(node, address, ...)
 3     -- skynet.pack(...) will free by cluster.core.packrequest
 4     return skynet.call(clusterd, "lua", "req", node, address, skynet.pack(...))
 5 end
 6 
 7 -- service/clusterd.lua
 8 local function send_request(source, node, addr, msg, sz)
 9     local session = node_session[node] or 1
10     -- msg is a local pointer, cluster.packrequest will free it
11     local request, new_session, padding = cluster.packrequest(addr, session, msg, sz)
12     node_session[node] = new_session
13 
14     -- node_channel[node] may yield or throw error
15     local c = node_channel[node]
16 
17     return c:request(request, session, padding)
18 end
19 
20 function command.req(...)
21     local ok, msg = pcall(send_request, ...)
22     if ok then
23         ...
24         skynet.ret(msg)
25     end
26 end

創建socket對象時提供了response參數(第6行),所以是采用帶session值的請求-回應模式。

第11行,協程阻塞在socket.read上,此時暫停co,等待回應包

 1 -- service/clusterd
 2     local host, port = string.match(address, "([^:]+):(.*)$")
 3     c = sc.channel {
 4         host = host,
 5         port = tonumber(port),
 6         response = read_response,
 7         nodelay = true,
 8     }
 9 
10 local function read_response(sock)
11     local sz = socket.header(sock:read(2))
12     local msg = sock:read(sz)
13     return cluster.unpackresponse(msg)      -- session, ok, data, padding
14 end

對於進程2,gate服務收到進程1的tcp連接請求後,

第8行,給clusterd服務發送消息

第17-18行,clusterd收到後,新建一個clusteragent服務。註:clusteragent是skynet最近新加的。參考https://blog.codingnow.com/2018/04/skynet_cluster.html#more

第24-28行,clusteragent服務專門處理進程1的cluster模式的請求。每個cluster節點連接都新建一個cluseteragent服務去處理請求包。

 1 -- service/gate.lua
 2 function handler.connect(fd, addr)
 3     local c = {
 4         fd = fd,
 5         ip = addr,
 6     }
 7     connection[fd] = c
 8     skynet.send(watchdog, "lua", "socket", "open", fd, addr)
 9 end
10 
11 -- service/clusterd.lua
12 function command.socket(source, subcmd, fd, msg)
13     if subcmd == "open" then
14         skynet.error(string.format("socket accept from %s", msg))
15         -- new cluster agent
16         cluster_agent[fd] = false
17         local agent = skynet.newservice("clusteragent", skynet.self(), source, fd)
18         cluster_agent[fd] = agent
19         ...     
20 end
21 
22 -- service/clusterdagent.lua
23 skynet.start(function()
24     skynet.register_protocol {
25         name = "client",
26         id = skynet.PTYPE_CLIENT,
27         unpack = cluster.unpackrequest,
28         dispatch = dispatch_request,
29     }
30     ...
31 end

當gate服務收到請求包後,轉發給對應的clusteragent服務(第7行),

 1 -- service/gate.lua
 2 function handler.message(fd, msg, sz)
 3     -- recv a package, forward it
 4     local c = connection[fd]
 5     local agent = c.agent
 6     if agent then
 7         skynet.redirect(agent, c.client, "client", fd, msg, sz)
 8     else
 9         skynet.send(watchdog, "lua", "socket", "data", fd, netpack.tostring(msg, sz))
10     end
11 end

clusteragent服務消息分發函數dispatch_request,

第7-9行,如果是push請求,不需要回應,send給目的服務(B服務)後直接返回即可

第11行,如果是call請求,需要回應,給目的服務(B服務)發送消息,然後等待B服務處理完返回。

第14-21行,將消息打包成回應包,通過tcp返回給請求端(skynet進程1)。

進程1收到回應後,重啟協程,返回結果給請求服務(A服務)。這就是cluster模式的調用流程。

 1 -- service/clusteragent.lua
 2 local function dispatch_request(_,_,addr, session, msg, sz, padding, is_push)
 3     if cluster.isname(addr) then
 4         addr = register_name[addr]
 5     end
 6     if addr then
 7         if is_push then
 8             skynet.rawsend(addr, "lua", msg, sz)
 9             return  -- no response
10         else
11             ok , msg, sz = pcall(skynet.rawcall, addr, "lua", msg, sz)
12         end
13     if ok then
14         response = cluster.packresponse(session, true, msg, sz)
15         if type(response) == "table" then
16             for _, v in ipairs(response) do
17                 socket.lwrite(fd, v)
18             end
19         else
20             socket.write(fd, response)
21         end
22      ...
23 end

skynet源碼分析之cluster集群模式