Java遊戲伺服器開發之十一-- 將訊息分發給執行緒佇列執行及路由功能
新增的類有:
com.lizhaoblog.base.concurrent.commond.ICommand
com.lizhaoblog.base.concurrent.commond.IHandler
com.lizhaoblog.base.concurrent.dictionary.IMessageDictionary
com.lizhaoblog.base.concurrent.handler.AbstractHandler
com.lizhaoblog.base.network.processor.IProcessor
com.lizhaoblog.server.biz.constant.CommonValue
com.lizhaoblog.server.biz.dictionary.MessageHandlerDictionary
com.lizhaoblog.server.biz.handler.TestFirstHandler
com.lizhaoblog.server.core.processor.LogicProcessor;
修改的類有
NetworkConsumer
這次改的東西比較多了。
我們一個個來看,我們的主線目標,將訊息通過不同的id來找到對應的執行類(handler/control),然後扔到執行緒池中執行。
這樣我們寫的話就是2個, 一個將訊息放到執行緒池中執行,一個是訊息通過不同的id來找到對應的執行類(handler/control)
所以我們先有一個執行緒池佇列,使用IProcessor,裡面只有一個方法process,然後看到實現類LogicProcessor,先是聲明瞭一個具體的執行緒池變數(ThreadPoolExecutor),裡面的方法就是執行執行緒具體執行緒(executor.execute(handler);)
執行緒池要執行的需要是一個執行緒,這個執行緒需要我們自己實現,所以我們寫一個介面ICommand繼承Runnable,然後後面的子類繼承於ICommand。
在ICommand中就2個方法,一個是執行操作doAction,一個是runable的run方法。這2個都寫著,讓子類實現就好。
再看下IHandler,IHandler是ICommand的子類,是我們現在所有handler的介面,因為我們是訊息傳輸,肯定會有message和session,所以就在裡面寫了get/set方法。
但我們寫上多個handler後,其實會發現裡面的執行邏輯都是一樣的,就是在run裡面呼叫doAction方法。就寫了一個AbstractHandler,宣告
protected T message; protected V param;,然後再run方法裡面呼叫doAction
後面我們要寫handler的話(TestFirstHandler),只要繼承AbstractHandler然後實現裡面的doAction方法
但我們的訊息傳輸上來,我們不確定要通過哪個handler來執行,這時候就需要一個訊息字典來進行區分IMessageDictionary(就像web中使用網址區分url一樣),具體實現MessageHandlerDictionary就是管理一個map,然後根據鍵來獲取handler。
可以看下具體程式碼的實現了
com.lizhaoblog.base.concurrent.commond.ICommand
/* * Copyright (C), 2015-2018 * FileName: ICommand * Author: zhao * Date: 2018/6/25 15:37 * Description: 繼承runable介面,可以放線上程池中執行 * History: * <author> <time> <version> <desc> * 作者姓名 修改時間 版本號 描述 */ package com.lizhaoblog.base.concurrent.commond; /** * 〈一句話功能簡述〉<br> * 〈繼承Runnable介面,可以放線上程池中執行〉 * * @author zhao * @date 2018/6/25 15:37 * @since 1.0.0 */ public interface ICommand extends Runnable { /** * 執行具體的方法 */ void doAction(); /** * 執行 */ @Override default void run() { doAction(); } }
com.lizhaoblog.base.concurrent.commond.IHandler
/* * Copyright (C), 2015-2018 * FileName: IHandler * Author: zhao * Date: 2018/6/25 15:35 * Description: 佇列執行器 * History: * <author> <time> <version> <desc> * 作者姓名 修改時間 版本號 描述 */ package com.lizhaoblog.base.concurrent.commond; /** * 〈一句話功能簡述〉<br> * 〈佇列執行器〉 * * @author zhao * @date 2018/6/25 15:35 * @since 1.0.0 */ public interface IHandler<T,V> extends ICommand { /** * 訊息 * @return 訊息request */ T getMessage(); /** * 設定訊息request * @param message request */ void setMessage(T message); /** * 引數 * @return 引數 */ V getParam(); /** * 設定引數 * @param parm 引數 */ void setParam(V parm); }
com.lizhaoblog.base.concurrent.dictionary.IMessageDictionary
/*
* Copyright (C), 2015-2018
* FileName: IMessageDictionary
* Author: zhao
* Date: 2018/6/25 14:51
* Description: 訊息字典介面
* History:
* <author> <time> <version> <desc>
* 作者姓名 修改時間 版本號 描述
*/
package com.lizhaoblog.base.concurrent.dictionary;
import com.lizhaoblog.base.concurrent.commond.IHandler;
/**
* 〈一句話功能簡述〉<br>
* 〈訊息字典介面〉
*
* @author zhao
* @date 2018/6/25 14:51
* @since 1.0.0
*/
public interface IMessageDictionary {
/**
* 註冊 id--handle
* @param messageId
* @param handler
*/
void register(int messageId, Class<? extends IHandler> handler);
/**
* 根據messageId獲取handler
* @param messageId
* @return
*/
IHandler getHandlerFromMessageId(Integer messageId);
}
com.lizhaoblog.base.concurrent.handler.AbstractHandler
/*
* Copyright (C), 2015-2018
* FileName: AbstractHandler
* Author: zhao
* Date: 2018/6/25 15:54
* Description: 繼承ICommand,將一些通用的處理過程寫在裡面
* History:
* <author> <time> <version> <desc>
* 作者姓名 修改時間 版本號 描述
*/
package com.lizhaoblog.base.concurrent.handler;
import com.lizhaoblog.base.concurrent.commond.IHandler;
/**
* 〈一句話功能簡述〉<br>
* 〈繼承ICommand,將一些通用的處理過程寫在裡面〉
*
* @author zhao
* @date 2018/6/25 15:54
* @since 1.0.0
*/
public abstract class AbstractHandler<T, V> implements IHandler<T, V> {
protected T message;
protected V param;
/**
* 執行具體的操作,交由子類實現
*/
@Override
public abstract void doAction();
@Override
public void run() {
doAction();
}
@Override
public T getMessage() {
return message;
}
@Override
public void setMessage(T message) {
this.message = message;
}
@Override
public V getParam() {
return param;
}
@Override
public void setParam(V param) {
this.param = param;
}
}
com.lizhaoblog.base.network.processor.IProcessor
/*
* Copyright (C), 2015-2018
* FileName: IProcessor
* Author: zhao
* Date: 2018/6/25 15:34
* Description: 程序介面
* History:
* <author> <time> <version> <desc>
* 作者姓名 修改時間 版本號 描述
*/
package com.lizhaoblog.base.network.processor;
import com.lizhaoblog.base.concurrent.commond.IHandler;
/**
* 〈一句話功能簡述〉<br>
* 〈程序介面〉
*
* @author zhao
* @date 2018/6/25 15:34
* @since 1.0.0
*/
public interface IProcessor {
/**
* 執行具體的指令
* @param handler 具體的執行
*/
void process(IHandler handler);
}
com.lizhaoblog.server.biz.constant.CommonValue
/*
* Copyright (C), 2015-2018
* FileName: CommonValue
* Author: zhao
* Date: 2018/6/25 16:45
* Description: 放置常量
* History:
* <author> <time> <version> <desc>
* 作者姓名 修改時間 版本號 描述
*/
package com.lizhaoblog.server.biz.constant;
/**
* 〈一句話功能簡述〉<br>
* 〈放置常量〉
*
* @author zhao
* @date 2018/6/25 16:45
* @since 1.0.0
*/
public class CommonValue {
public static final int CM_MSG_TEST = 10001;
private CommonValue() {
}
}
com.lizhaoblog.server.biz.dictionary.MessageHandlerDictionary
/*
* Copyright (C), 2015-2018
* FileName: MessageHandlerDictionary
* Author: zhao
* Date: 2018/6/25 16:39
* Description: 訊息字典繫結
* History:
* <author> <time> <version> <desc>
* 作者姓名 修改時間 版本號 描述
*/
package com.lizhaoblog.server.biz.dictionary;
import com.lizhaoblog.base.concurrent.commond.IHandler;
import com.lizhaoblog.base.concurrent.dictionary.IMessageDictionary;
import com.lizhaoblog.server.biz.constant.CommonValue;
import com.lizhaoblog.server.biz.handler.TestFirstHandler;
import com.lizhaoblog.server.pojo.ServerConfig;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
import javax.annotation.PostConstruct;
/**
* 〈一句話功能簡述〉<br>
* 〈訊息字典繫結〉
*
* @author zhao
* @date 2018/6/25 16:39
* @since 1.0.0
*/
@Component
@Scope("singleton")
public class MessageHandlerDictionary implements IMessageDictionary {
private final Map<Integer, Class<? extends IHandler>> idHandleMap = new HashMap<>(10);
@PostConstruct
public void init() {
register(CommonValue.CM_MSG_TEST, TestFirstHandler.class);
}
@Override
public void register(int messageId, Class<? extends IHandler> handler) {
idHandleMap.put(messageId, handler);
}
@Override
public IHandler getHandlerFromMessageId(Integer messageId) {
Class<? extends IHandler> clazz = idHandleMap.get(messageId);
if (clazz != null) {
try {
return (IHandler) ServerConfig.getInstance().getApplicationContext().getBean(clazz.getSimpleName());
} catch (Exception e) {
return null;
}
}
return null;
}
}
com.lizhaoblog.server.biz.handler.TestFirstHandler
/*
* Copyright (C), 2015-2018
* FileName: TestFirstHandler
* Author: zhao
* Date: 2018/6/25 16:24
* Description: 用於測試的第一個handler
* History:
* <author> <time> <version> <desc>
* 作者姓名 修改時間 版本號 描述
*/
package com.lizhaoblog.server.biz.handler;
import com.lizhaoblog.base.concurrent.handler.AbstractHandler;
import com.lizhaoblog.base.session.Session;
import com.lizhaoblog.base.session.SessionManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
/**
* 〈一句話功能簡述〉<br>
* 〈用於測試的第一個handler〉
*
* @author zhao
* @date 2018/6/25 16:24
* @since 1.0.0
*/
@Component("TestFirstHandler")
public class TestFirstHandler extends AbstractHandler<String, Session> {
private static final Logger logger = LoggerFactory.getLogger(TestFirstHandler.class);
@Override
public void doAction() {
// logger.info("TestFirstHandler doAction " + this);
logger.info("伺服器收到的資料內容:data=" + message);
String result = "小李,我是伺服器,我收到你的資訊了。";
SessionManager.getInstance().sendMessage(param, result);
}
}
com.lizhaoblog.server.core.processor.LogicProcessor;
/*
* Copyright (C), 2015-2018
* FileName: LogicProcessor
* Author: zhao
* Date: 2018/6/25 16:57
* Description: 具體的訊息處理器,程序
* History:
* <author> <time> <version> <desc>
* 作者姓名 修改時間 版本號 描述
*/
package com.lizhaoblog.server.core.processor;
import com.lizhaoblog.base.concurrent.commond.IHandler;
import com.lizhaoblog.base.network.processor.IProcessor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* 〈一句話功能簡述〉<br>
* 〈具體的訊息處理器,程序〉
*
* @author zhao
* @date 2018/6/25 16:57
* @since 1.0.0
*/
public class LogicProcessor implements IProcessor {
private ExecutorService executor = new ThreadPoolExecutor(8, 8, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(100000), new ThreadPoolExecutor.CallerRunsPolicy());
@Override
public void process(IHandler handler) {
this.executor.execute(handler);
}
}
NetworkConsumer
/*
* Copyright (C), 2015-2018
* FileName: INetworkConsumer
* Author: zhao
* Date: 2018/6/23 21:06
* Description: 網路訊息處理器
* History:
* <author> <time> <version> <desc>
* 作者姓名 修改時間 版本號 描述
*/
package com.lizhaoblog.server.core.customer;
import com.lizhaoblog.base.concurrent.commond.IHandler;
import com.lizhaoblog.base.network.processor.IProcessor;
import com.lizhaoblog.base.concurrent.dictionary.IMessageDictionary;
import com.lizhaoblog.base.constant.ConstantValue;
import com.lizhaoblog.base.network.customer.INetworkConsumer;
import com.lizhaoblog.base.session.Session;
import com.lizhaoblog.base.session.SessionManager;
import com.lizhaoblog.server.biz.constant.CommonValue;
import com.lizhaoblog.server.core.processor.LogicProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
import javax.annotation.PostConstruct;
import io.netty.channel.Channel;
/**
* 〈一句話功能簡述〉<br>
* 〈網路訊息處理器,實現類〉
*
* @author zhao
* @date 2018/6/23 21:09
* @since 1.0.0
*/
@Component
@Scope("singleton")
public class NetworkConsumer implements INetworkConsumer {
private static final Logger logger = LoggerFactory.getLogger(NetworkConsumer.class);
private Map<Integer, IProcessor> processors = new HashMap<>(10);
@Autowired
private IMessageDictionary messageDictionary;
public void registerProcessor(int queueId, IProcessor processor) {
processors.put(queueId, processor);
}
@PostConstruct
public void init() {
registerProcessor(ConstantValue.QUEUE_LOGIC, new LogicProcessor());
}
@Override
public void consume(String message, Channel channel) {
//獲取session,後面需要根據session中的channel進行訊息傳送
Session session = SessionManager.getInstance().getSessionByChannel(channel);
if (session == null) {
logger.debug("consume session is not found");
return;
}
IHandler handler = messageDictionary.getHandlerFromMessageId(CommonValue.CM_MSG_TEST);
handler.setMessage(message);
handler.setParam(session);
IProcessor processor = processors.get(ConstantValue.QUEUE_LOGIC);
processor.process(handler);
}
}
上面的程式碼在碼雲上 https://gitee.com/lizhaoandroid/JgServer
可以加qq群一起探討Java遊戲伺服器開發的相關知識 676231564