1. 程式人生 > >Java遊戲伺服器開發之十一-- 將訊息分發給執行緒佇列執行及路由功能

Java遊戲伺服器開發之十一-- 將訊息分發給執行緒佇列執行及路由功能

新增的類有:
com.lizhaoblog.base.concurrent.commond.ICommand
com.lizhaoblog.base.concurrent.commond.IHandler
com.lizhaoblog.base.concurrent.dictionary.IMessageDictionary
com.lizhaoblog.base.concurrent.handler.AbstractHandler
com.lizhaoblog.base.network.processor.IProcessor
com.lizhaoblog.server.biz.constant.CommonValue
com.lizhaoblog.server.biz.dictionary.MessageHandlerDictionary
com.lizhaoblog.server.biz.handler.TestFirstHandler
com.lizhaoblog.server.core.processor.LogicProcessor;
  修改的類有
NetworkConsumer



  這次改的東西比較多了。
我們一個個來看,我們的主線目標,將訊息通過不同的id來找到對應的執行類(handler/control),然後扔到執行緒池中執行。
這樣我們寫的話就是2個, 一個將訊息放到執行緒池中執行,一個是訊息通過不同的id來找到對應的執行類(handler/control)

所以我們先有一個執行緒池佇列,使用IProcessor,裡面只有一個方法process,然後看到實現類LogicProcessor,先是聲明瞭一個具體的執行緒池變數(ThreadPoolExecutor),裡面的方法就是執行執行緒具體執行緒(executor.execute(handler);)
執行緒池要執行的需要是一個執行緒,這個執行緒需要我們自己實現,所以我們寫一個介面ICommand繼承Runnable,然後後面的子類繼承於ICommand。
在ICommand中就2個方法,一個是執行操作doAction,一個是runable的run方法。這2個都寫著,讓子類實現就好。
再看下IHandler,IHandler是ICommand的子類,是我們現在所有handler的介面,因為我們是訊息傳輸,肯定會有message和session,所以就在裡面寫了get/set方法。
但我們寫上多個handler後,其實會發現裡面的執行邏輯都是一樣的,就是在run裡面呼叫doAction方法。就寫了一個AbstractHandler,宣告
protected T message; protected V param;,然後再run方法裡面呼叫doAction
後面我們要寫handler的話(TestFirstHandler),只要繼承AbstractHandler然後實現裡面的doAction方法
  
但我們的訊息傳輸上來,我們不確定要通過哪個handler來執行,這時候就需要一個訊息字典來進行區分IMessageDictionary(就像web中使用網址區分url一樣),具體實現MessageHandlerDictionary就是管理一個map,然後根據鍵來獲取handler。

  可以看下具體程式碼的實現了

com.lizhaoblog.base.concurrent.commond.ICommand

/*
 * Copyright (C), 2015-2018
 * FileName: ICommand
 * Author:   zhao
 * Date:     2018/6/25 15:37
 * Description: 繼承runable介面,可以放線上程池中執行
 * History:
 * <author>          <time>          <version>          <desc>
 * 作者姓名           修改時間           版本號              描述
 */
package com.lizhaoblog.base.concurrent.commond;

/**
 * 〈一句話功能簡述〉<br>
 * 〈繼承Runnable介面,可以放線上程池中執行〉
 *
 * @author zhao
 * @date 2018/6/25 15:37
 * @since 1.0.0
 */
public interface ICommand extends Runnable {
  /**
   * 執行具體的方法
   */
  void doAction();

  /**
   * 執行
   */
  @Override
  default void run() {
    doAction();
  }
}

com.lizhaoblog.base.concurrent.commond.IHandler

/*
 * Copyright (C), 2015-2018
 * FileName: IHandler
 * Author:   zhao
 * Date:     2018/6/25 15:35
 * Description: 佇列執行器
 * History:
 * <author>          <time>          <version>          <desc>
 * 作者姓名           修改時間           版本號              描述
 */
package com.lizhaoblog.base.concurrent.commond;

/**
 * 〈一句話功能簡述〉<br>
 * 〈佇列執行器〉
 *
 * @author zhao
 * @date 2018/6/25 15:35
 * @since 1.0.0
 */
public interface IHandler<T,V> extends ICommand {

  /**
   * 訊息
   * @return 訊息request
   */
  T getMessage();

  /**
   * 設定訊息request
   * @param message request
   */
  void setMessage(T message);

  /**
   * 引數
   * @return 引數
   */
  V getParam();

  /**
   *  設定引數
   * @param parm 引數
   */
  void setParam(V parm);

}

com.lizhaoblog.base.concurrent.dictionary.IMessageDictionary

/*
 * Copyright (C), 2015-2018
 * FileName: IMessageDictionary
 * Author:   zhao
 * Date:     2018/6/25 14:51
 * Description: 訊息字典介面
 * History:
 * <author>          <time>          <version>          <desc>
 * 作者姓名           修改時間           版本號              描述
 */
package com.lizhaoblog.base.concurrent.dictionary;

import com.lizhaoblog.base.concurrent.commond.IHandler;

/**
 * 〈一句話功能簡述〉<br>
 * 〈訊息字典介面〉
 *
 * @author zhao
 * @date 2018/6/25 14:51
 * @since 1.0.0
 */
public interface IMessageDictionary {

  /**
   * 註冊 id--handle
   * @param messageId
   * @param handler
   */
  void register(int messageId,  Class<? extends IHandler> handler);
  /**
   * 根據messageId獲取handler
   * @param messageId
   * @return
   */
  IHandler getHandlerFromMessageId(Integer messageId);
}

com.lizhaoblog.base.concurrent.handler.AbstractHandler

/*
 * Copyright (C), 2015-2018
 * FileName: AbstractHandler
 * Author:   zhao
 * Date:     2018/6/25 15:54
 * Description: 繼承ICommand,將一些通用的處理過程寫在裡面
 * History:
 * <author>          <time>          <version>          <desc>
 * 作者姓名           修改時間           版本號              描述
 */
package com.lizhaoblog.base.concurrent.handler;

import com.lizhaoblog.base.concurrent.commond.IHandler;

/**
 * 〈一句話功能簡述〉<br>
 * 〈繼承ICommand,將一些通用的處理過程寫在裡面〉
 *
 * @author zhao
 * @date 2018/6/25 15:54
 * @since 1.0.0
 */
public abstract class AbstractHandler<T, V> implements IHandler<T, V> {
  protected T message;

  protected V param;

  /**
   * 執行具體的操作,交由子類實現
   */
  @Override
  public abstract void doAction();

  @Override
  public void run() {
    doAction();
  }

  @Override
  public T getMessage() {
    return message;
  }

  @Override
  public void setMessage(T message) {
    this.message = message;
  }

  @Override
  public V getParam() {
    return param;
  }

  @Override
  public void setParam(V param) {
    this.param = param;
  }

}

com.lizhaoblog.base.network.processor.IProcessor

/*
 * Copyright (C), 2015-2018
 * FileName: IProcessor
 * Author:   zhao
 * Date:     2018/6/25 15:34
 * Description: 程序介面
 * History:
 * <author>          <time>          <version>          <desc>
 * 作者姓名           修改時間           版本號              描述
 */
package com.lizhaoblog.base.network.processor;

import com.lizhaoblog.base.concurrent.commond.IHandler;

/**
 * 〈一句話功能簡述〉<br>
 * 〈程序介面〉
 *
 * @author zhao
 * @date 2018/6/25 15:34
 * @since 1.0.0
 */
public interface IProcessor {
  /**
   * 執行具體的指令
   * @param handler 具體的執行
   */
  void process(IHandler handler);

}

com.lizhaoblog.server.biz.constant.CommonValue

/*
 * Copyright (C), 2015-2018
 * FileName: CommonValue
 * Author:   zhao
 * Date:     2018/6/25 16:45
 * Description: 放置常量
 * History:
 * <author>          <time>          <version>          <desc>
 * 作者姓名           修改時間           版本號              描述
 */
package com.lizhaoblog.server.biz.constant;

/**
 * 〈一句話功能簡述〉<br>
 * 〈放置常量〉
 *
 * @author zhao
 * @date 2018/6/25 16:45
 * @since 1.0.0
 */
public class CommonValue {

  public static final int CM_MSG_TEST = 10001;

  private CommonValue() {
  }
}

com.lizhaoblog.server.biz.dictionary.MessageHandlerDictionary

/*
 * Copyright (C), 2015-2018
 * FileName: MessageHandlerDictionary
 * Author:   zhao
 * Date:     2018/6/25 16:39
 * Description: 訊息字典繫結
 * History:
 * <author>          <time>          <version>          <desc>
 * 作者姓名           修改時間           版本號              描述
 */
package com.lizhaoblog.server.biz.dictionary;

import com.lizhaoblog.base.concurrent.commond.IHandler;
import com.lizhaoblog.base.concurrent.dictionary.IMessageDictionary;
import com.lizhaoblog.server.biz.constant.CommonValue;
import com.lizhaoblog.server.biz.handler.TestFirstHandler;
import com.lizhaoblog.server.pojo.ServerConfig;

import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.Map;

import javax.annotation.PostConstruct;

/**
 * 〈一句話功能簡述〉<br>
 * 〈訊息字典繫結〉
 *
 * @author zhao
 * @date 2018/6/25 16:39
 * @since 1.0.0
 */
@Component
@Scope("singleton")
public class MessageHandlerDictionary implements IMessageDictionary {
  private final Map<Integer, Class<? extends IHandler>> idHandleMap = new HashMap<>(10);

  @PostConstruct
  public void init() {
    register(CommonValue.CM_MSG_TEST, TestFirstHandler.class);
  }

  @Override
  public void register(int messageId, Class<? extends IHandler> handler) {
    idHandleMap.put(messageId, handler);
  }

  @Override
  public IHandler getHandlerFromMessageId(Integer messageId) {
    Class<? extends IHandler> clazz = idHandleMap.get(messageId);
    if (clazz != null) {
      try {
        return (IHandler) ServerConfig.getInstance().getApplicationContext().getBean(clazz.getSimpleName());
      } catch (Exception e) {
        return null;
      }
    }
    return null;
  }
}

com.lizhaoblog.server.biz.handler.TestFirstHandler

/*
 * Copyright (C), 2015-2018
 * FileName: TestFirstHandler
 * Author:   zhao
 * Date:     2018/6/25 16:24
 * Description: 用於測試的第一個handler
 * History:
 * <author>          <time>          <version>          <desc>
 * 作者姓名           修改時間           版本號              描述
 */
package com.lizhaoblog.server.biz.handler;

import com.lizhaoblog.base.concurrent.handler.AbstractHandler;
import com.lizhaoblog.base.session.Session;
import com.lizhaoblog.base.session.SessionManager;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

/**
 * 〈一句話功能簡述〉<br>
 * 〈用於測試的第一個handler〉
 *
 * @author zhao
 * @date 2018/6/25 16:24
 * @since 1.0.0
 */
@Component("TestFirstHandler")
public class TestFirstHandler extends AbstractHandler<String, Session> {

  private static final Logger logger = LoggerFactory.getLogger(TestFirstHandler.class);

  @Override
  public void doAction() {
    //    logger.info("TestFirstHandler doAction " + this);
    logger.info("伺服器收到的資料內容:data=" + message);
    String result = "小李,我是伺服器,我收到你的資訊了。";
    SessionManager.getInstance().sendMessage(param, result);
  }
}


com.lizhaoblog.server.core.processor.LogicProcessor;

/*
 * Copyright (C), 2015-2018
 * FileName: LogicProcessor
 * Author:   zhao
 * Date:     2018/6/25 16:57
 * Description: 具體的訊息處理器,程序
 * History:
 * <author>          <time>          <version>          <desc>
 * 作者姓名           修改時間           版本號              描述
 */
package com.lizhaoblog.server.core.processor;

import com.lizhaoblog.base.concurrent.commond.IHandler;
import com.lizhaoblog.base.network.processor.IProcessor;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * 〈一句話功能簡述〉<br>
 * 〈具體的訊息處理器,程序〉
 *
 * @author zhao
 * @date 2018/6/25 16:57
 * @since 1.0.0
 */
public class LogicProcessor implements IProcessor {

  private ExecutorService executor = new ThreadPoolExecutor(8, 8, 0L, TimeUnit.MILLISECONDS,
          new LinkedBlockingQueue<>(100000), new ThreadPoolExecutor.CallerRunsPolicy());

  @Override
  public void process(IHandler handler) {
    this.executor.execute(handler);
  }

}

NetworkConsumer

/*
 * Copyright (C), 2015-2018
 * FileName: INetworkConsumer
 * Author:   zhao
 * Date:     2018/6/23 21:06
 * Description: 網路訊息處理器
 * History:
 * <author>          <time>          <version>          <desc>
 * 作者姓名           修改時間           版本號              描述
 */
package com.lizhaoblog.server.core.customer;

import com.lizhaoblog.base.concurrent.commond.IHandler;
import com.lizhaoblog.base.network.processor.IProcessor;
import com.lizhaoblog.base.concurrent.dictionary.IMessageDictionary;
import com.lizhaoblog.base.constant.ConstantValue;
import com.lizhaoblog.base.network.customer.INetworkConsumer;
import com.lizhaoblog.base.session.Session;
import com.lizhaoblog.base.session.SessionManager;
import com.lizhaoblog.server.biz.constant.CommonValue;
import com.lizhaoblog.server.core.processor.LogicProcessor;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.Map;

import javax.annotation.PostConstruct;

import io.netty.channel.Channel;

/**
 * 〈一句話功能簡述〉<br>
 * 〈網路訊息處理器,實現類〉
 *
 * @author zhao
 * @date 2018/6/23 21:09
 * @since 1.0.0
 */
@Component
@Scope("singleton")
public class NetworkConsumer implements INetworkConsumer {
  private static final Logger logger = LoggerFactory.getLogger(NetworkConsumer.class);

  private Map<Integer, IProcessor> processors = new HashMap<>(10);

  @Autowired
  private IMessageDictionary messageDictionary;

  public void registerProcessor(int queueId, IProcessor processor) {
    processors.put(queueId, processor);
  }

  @PostConstruct
  public void init() {
    registerProcessor(ConstantValue.QUEUE_LOGIC, new LogicProcessor());
  }

  @Override
  public void consume(String message, Channel channel) {
    //獲取session,後面需要根據session中的channel進行訊息傳送
    Session session = SessionManager.getInstance().getSessionByChannel(channel);
    if (session == null) {
      logger.debug("consume session is not found");
      return;
    }
    IHandler handler = messageDictionary.getHandlerFromMessageId(CommonValue.CM_MSG_TEST);
    handler.setMessage(message);
    handler.setParam(session);
    IProcessor processor = processors.get(ConstantValue.QUEUE_LOGIC);
    processor.process(handler);
  }
}


上面的程式碼在碼雲上 https://gitee.com/lizhaoandroid/JgServer
可以加qq群一起探討Java遊戲伺服器開發的相關知識 676231564