Akka邊學邊寫(4)-- MiniRPG
前面幾篇文章用Akka寫了HelloWorld和EchoServer,為了更進一步學習Akka,本文將會實現一個很小的RPG遊戲伺服器:MiniRPG。
遊戲邏輯
因為是迷你RPG,所以邏輯很簡單。伺服器可以處理四種操作:建立玩家、給玩家加經驗、升級、查詢玩家資訊。下面是Player類的程式碼(Getters和Setters省略):
public class Player { private int id; private String name; private int exp; private int level; // Getters & Setters ... public void addExp(int val) { exp += val; } public void levelUp() { if (exp > 100) { exp -= 100; level++; } } }
訊息協議
MiniRPG底層使用TCP協議,訊息使用JSON格式。完整的訊息格式如下圖所示:
前八個位元組可以認為是訊息頭,其中前四個位元組是訊息ID,後四個位元組是JSON字串長度。其餘位元組是訊息體,也就是UTF8格式編碼的JSON字串。
訊息介面
MiniRPG設計了三個介面來表示遊戲訊息,這三個介面都是Marker介面,裡面沒有定義任何方法,如下圖所示:
MsgRegistry
MiniRPG使用GSON來編碼和解碼JSON字串,為了把JSON解析為相應的訊息物件,需要一個訊息ID和class之間的對映關係。MsgRegistry類便是要建立起這樣一個對映關係,下面是它的完整程式碼:
public class MsgRegistry { private static final Map<Integer, Class<?>> msgById = new HashMap<>(); private static final Map<Class<?>, Integer> idByMsg = new HashMap<>(); static { register(1, CreatePlayerRequest.class); register(2, CreatePlayerResponse.class); register(3, AddExpRequest.class); register(4, AddExpResponse.class); register(5, LevelUpRequest.class); register(6, LevelUpResponse.class); register(7, GetPlayerInfoRequest.class); register(8, GetPlayerInfoResponse.class); } private static void register(int msgId, Class<?> msgClass) { msgById.put(msgId, msgClass); idByMsg.put(msgClass, msgId); } public static Class<?> getMsgClass(int msgId) { return msgById.get(msgId); } public static int getMsgId(Class<?> msgClass) { return idByMsg.get(msgClass); } public static int getMsgId(Object msg) { return getMsgId(msg.getClass()); } }
伺服器Actor系統設計
MiniRPG伺服器的Actor系統如下圖所示:
TcpServer負責監聽TCP連線,連線建立之後,交給Codec處理。Codec將收到的位元組編碼成訊息物件,然後交給MsgHandler處理。對於每條請求訊息,MsgHandler都會產生一條響應訊息,響應訊息被Codec編碼之後傳送到客戶端。下面詳細介紹整個Actor系統是如何實現的。
TcpServer
TcpServer是一個UntypedActor,例項化TcpServer時,我們把MsgHandler引用傳給它:
public class TcpServer extends UntypedActor {
private final ActorRef msgHandler;
public TcpServer(ActorRef msgHandler) {
this.msgHandler = msgHandler;
}
}
TcpServer只關心四種訊息,下面是onReceive()方法實現:
@Override
public void onReceive(Object msg) throws Exception {
if (msg instanceof Integer) {
final int port = (Integer) msg;
startServer(port);
} else if (msg instanceof Bound) {
getSender().tell(msg, getSelf());
} else if (msg instanceof CommandFailed) {
getContext().stop(getSelf());
} else if (msg instanceof Connected) {
final Connected conn = (Connected) msg;
getSender().tell(conn, getSelf());
registerCodec(getSender());
}
}
Integer訊息通知TcpServer繫結到某個埠,準備接收客戶端連線。如果收到Bound訊息,則埠繫結成功,伺服器正常啟動。如果是CommandFailed訊息,則伺服器啟動失敗:
private void startServer(int port) {
final InetSocketAddress endpoint = new InetSocketAddress("localhost", port);
final Object bindCmd = TcpMessage.bind(getSelf(), endpoint, 100);
Tcp.get(getContext().system()).getManager()
.tell(bindCmd, getSelf());
}
如果是Connected訊息,說明有客戶端連線已經建立,TcpServer建立一個子Actor(也就是Codec)來處理客戶端連線:
private void registerCodec(ActorRef connection) {
final Props codecProps = Props.create(MsgCodec.class, connection, msgHandler);
final ActorRef codec = getContext().actorOf(codecProps);
connection.tell(TcpMessage.register(codec), getSelf());
}
MsgCodec
MsgCodec主要負責訊息的編碼和解碼,為此,MsgCodec內部使用了一個ByteString來快取接收到的位元組:
public class MsgCodec extends UntypedActor {
private static final Gson GSON = new Gson();
private final ActorRef connection;
private final ActorRef msgHandler;
private ByteString buf = ByteString.empty();
public MsgCodec(ActorRef connection, ActorRef msgHandler) {
this.connection = connection;
this.msgHandler = msgHandler;
}
}
如果MsgCodec收到的是Received訊息,說明有資料到達,MsgCodec嘗試解碼出一個訊息物件。如果收到的是GameMessage訊息,MsgCodec將其編碼為byte[]然後傳送給客戶端。如果收到的是ConnectionClosed,說明連線已經斷開了:
@Override
public void onReceive(Object msg) throws Exception {
if (msg instanceof Received) {
final ByteString data = ((Received) msg).data();
buf = buf.concat(data);
decodeMsg();
} else if (msg instanceof ConnectionClosed) {
getContext().stop(getSelf());
} else if (msg instanceof GameMessage) {
final ByteString data = encodeMsg(msg);
connection.tell(TcpMessage.write(data), getSelf());
}
}
每當有資料到達時,decodeMsg()方法都會被呼叫。decodeMsg()先確定是否可以把訊息頭解碼出來,如果不能,就繼續等待更多的位元組到達。如果訊息頭完整到達,decodeMsg()就可以知道訊息體的長度,然後等到訊息體完整到達。之後根據訊息ID和JSON字串解碼訊息物件,然後通知msgHandler:
private void decodeMsg() {
while (buf.length() > 8) {
final ByteIterator it = buf.iterator();
final int msgId = it.getInt(ByteOrder.BIG_ENDIAN);
final int jsonLength = it.getInt(ByteOrder.BIG_ENDIAN);
if (buf.length() >= 8 + jsonLength) {
final Object msg = decodeMsg(msgId, buf.slice(8, 8 + jsonLength));
buf = buf.drop(8 + jsonLength);
msgHandler.tell(msg, getSelf());
}
}
}
private Object decodeMsg(int msgId, ByteString jsonData) {
final Class<?> msgClass = MsgRegistry.getMsgClass(msgId);
final Reader reader = new InputStreamReader(
jsonData.iterator().asInputStream(),
StandardCharsets.UTF_8);
return GSON.fromJson(reader, msgClass);
}
訊息的編碼就簡單多了,程式碼如下所示:
private ByteString encodeMsg(Object msg) {
final int msgId = MsgRegistry.getMsgId(msg);
final byte[] jsonBytes = GSON.toJson(msg)
.getBytes(StandardCharsets.UTF_8);
final ByteStringBuilder bsb = new ByteStringBuilder();
bsb.putInt(msgId, ByteOrder.BIG_ENDIAN);
bsb.putInt(jsonBytes.length, ByteOrder.BIG_ENDIAN);
bsb.putBytes(jsonBytes);
return bsb.result();
}
MsgHandler
遊戲邏輯由MsgHandler來處理。因為只是個demo,所以MsgHandler內部使用HashMap來模擬資料庫。下面是MsgHandler的完整程式碼:
public class MsgHandler extends UntypedActor {
private final List<Player> players = new ArrayList<>();
@Override
public void onReceive(Object msg) throws Exception {
if (msg instanceof CreatePlayerRequest) {
int newPlayerId = createPlayer((CreatePlayerRequest) msg);
getSender().tell(new CreatePlayerResponse(newPlayerId), getSelf());
} else if (msg instanceof AddExpRequest) {
int newExp = addExpToPlayer((AddExpRequest) msg);
getSender().tell(new AddExpResponse(newExp), getSelf());
} else if (msg instanceof LevelUpRequest) {
int newLevel = levelUpPlayer((LevelUpRequest) msg);
getSender().tell(new LevelUpResponse(newLevel), getSelf());
} else if (msg instanceof GetPlayerInfoRequest) {
PlayerInfo playerInfo = getPlayerInfo((GetPlayerInfoRequest) msg);
getSender().tell(new GetPlayerInfoResponse(playerInfo), getSelf());
}
}
private int createPlayer(CreatePlayerRequest req) {
int playerId = players.size() + 1;
Player newPlayer = new Player();
newPlayer.setId(playerId);
newPlayer.setLevel(1);
newPlayer.setName(req.getPlayerName());
players.add(newPlayer);
return playerId;
}
private int addExpToPlayer(AddExpRequest req) {
Player player = players.get(req.getPlayerId());
player.addExp(req.getExp());
return player.getExp();
}
private int levelUpPlayer(LevelUpRequest req) {
Player player = players.get(req.getPlayerId());
player.levelUp();
return player.getLevel();
}
private PlayerInfo getPlayerInfo(GetPlayerInfoRequest req) {
Player player = players.get(req.getPlayerId());
return new PlayerInfo(player.getId(), player.getName(),
player.getExp(), player.getLevel());
}
}
ServerApp
ServerApp是MiniRPG遊戲伺服器主類,main()方法建立好整個Actor系統,然後通知tcpServer繫結到埠12345,讓伺服器運轉起來:
public class ServerApp {
public static void main(String[] args) {
ActorSystem mySystem = ActorSystem.create("rpgServer");
ActorRef msgHandler = mySystem.actorOf(Props.create(MsgHandler.class));
ActorRef tcpServer = mySystem.actorOf(Props.create(TcpServer.class, msgHandler));
tcpServer.tell(12345, ActorRef.noSender());
}
}
客戶端
為了測試MiniServer,我寫了個簡單的客戶端程式,具體實現就不在這裡介紹了。