1. 程式人生 > >Netty實戰開發(7):Netty結合kafka實現分散式訊息佇列

Netty實戰開發(7):Netty結合kafka實現分散式訊息佇列

在分散式遊戲伺服器系統中,訊息處理佇列主要解決問題就是解耦系統中的業務,使得每個系統看起來功能比較單一,而且解決一些全服資料共享等問題。 通常我們知道kafka是作為訊息佇列比較火的一種方式,其實還有(Active MQ,Rabbit MQ,Zero MQ)個人覺得kafka比較好用點,哈哈,習慣吧。 同樣我們來複習kafka基礎。

kafka基礎。 要向使用kafka。首先的搭建kafka環境。博主用Windows進行開發,所以本片文章用小部分的內容來介紹kafka的基本安裝。

首先我們要去kafka官方網站上下載kafka依賴包。https://kafka.apache.org/ 下載下來之後的到安裝包解壓。 在這裡插入圖片描述

當然我編寫了一個啟動指令碼,內容很簡單

@echo off
.\bin\windows\kafka-server-start.bat .\config\server.properties

執行kafka需要啟動zookeeper,所以需要確保機器上有可用的zookeeper。關於zookeeper,可以閱讀我上篇文章所寫的內容。https://blog.csdn.net/baidu_23086307/article/details/82769234 進入conf目錄。使用文字編譯器編輯service.properties檔案,修改log.dirs熟悉,修改為你自己的路徑。 例如我的路徑為這個我就修改成這了。

在這裡插入圖片描述

然後儲存檔案。 然後我們進入bin目錄,裡面有個Windows目錄,我們執行如下命令,

kafka-topics.bat --create --zookeeper 0.0.0.0:2181 --replication-factor 1 --partitions 1 --topic defaultTopic
@echo off
.\bin\windows\kafka-server-start.bat .\config\server.properties

當然我更喜歡把它編寫成一個bat指令碼。方便與下次直接使用。然後我們啟動kafka服務。 在這裡插入圖片描述 大概看到這樣的說明kafka啟動沒有問題。否者啟動出現各種問題,需要讀者自己排查問題。

整合kafka實現分散式訊息佇列

萬事具備,我們就要將kafka使用到我們的業務場景中來了。在遊戲伺服器中,我們通常用kafka做一些全域性的東西。比如全服聊天,全服活動之類的任務。也可以用於任務系統,解耦每一部分的邏輯。 我們的系統採用netty+spring整合的遊戲伺服器系統。當然spring也對kafka進行封裝了。所以我們在工程中需要新增kafka的依賴。


        <!--kafka-->
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-kafka</artifactId>
            <version>2.0.0.RELEASE</version>
            <exclusions>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

新增完maven依賴後我們就可以使用spring提供的kafka進行封裝了。首先我們需要一個kafka中心伺服器的配置檔案。 簡單的配置了kafka的基礎屬性 game-kafka.properties

kafka.taskThreadSize=4
kafka.corePoolSize=4
kafka.maximumPoolSize=4
kafka.keepAliveTime=5000


然後我們新增kafka的 consumer配置,application-kafka-consumer.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd">

    <!-- 定義consumer的引數 -->
    <bean id="consumerProperties" class="java.util.HashMap">
        <constructor-arg>
            <map>
                <!-- 配置kafka的broke -->
                <entry key="bootstrap.servers" value="0.0.0.0:9092"/>
                <!-- 配置組-->
                <entry key="group.id" value="0"/>
                <entry key="enable.auto.commit" value="true"/>
                <entry key="auto.commit.interval.ms" value="1000"/>
                <entry key="session.timeout.ms" value="30000"/>
                <entry key="key.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
                <entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
            </map>
        </constructor-arg>
    </bean>

    <!-- 建立consumerFactory bean -->
    <bean id="consumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
        <constructor-arg ref="consumerProperties"/>
    </bean>

    <!-- 實際執行訊息消費的類 -->
    <bean id="messageListenerConsumerService"
          class="com.twjitm.core.common.kafka.NettyKafkaConsumerListener"/>

    <!-- 消費者容器配置資訊 -->
    <bean id="containerProperties" class="org.springframework.kafka.listener.config.ContainerProperties">
        <!-- 重要!配置topic -->
        <constructor-arg value="defaultTopic"/>
        <property name="messageListener" ref="messageListenerConsumerService"/>
    </bean>

    <!-- 建立kafka template bean,使用的時候,只需要注入這個bean,即可使用template的send訊息方法 -->
    <bean id="messageListenerContainer" class="org.springframework.kafka.listener.KafkaMessageListenerContainer"
          init-method="doStart">
        <constructor-arg ref="consumerFactory"/>
        <constructor-arg ref="containerProperties"/>
    </bean>
</beans>

然後在新增一個kafka的producer。application-kafka-producer.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd">

    <!--bootstrap.servers 消費者提供伺服器:example:0.0.0.1:9092,10.0.1.73:9092,10.0.1.74:9092,10.0.1.75:9092-->
    <!-- 定義producer的引數 -->
    <bean id="producerProperties" class="java.util.HashMap">
        <constructor-arg>
            <map>

                <entry key="bootstrap.servers"
                       value="0.0.0.0:9092"/>
                <entry key="group.id" value="0"/>
                <entry key="retries" value="1"/>
                <entry key="batch.size" value="16384"/>
                <entry key="linger.ms" value="1"/>
                <entry key="buffer.memory" value="33554432"/>
                <!--序列化方式-->
                <entry key="key.serializer"
                       value="org.apache.kafka.common.serialization.StringSerializer"/>
                <entry key="value.serializer"
                       value="org.apache.kafka.common.serialization.StringSerializer"/>
            </map>

        </constructor-arg>
    </bean>

    <!-- 建立kafka template需要使用的producer factory bean -->
    <bean id="producerFactory"
          class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
        <constructor-arg ref="producerProperties"/>
    </bean>

    <!-- 建立kafka template bean,使用的時候,只需要注入這個bean,即可使用template的send訊息方法 -->
    <bean id="KafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate">
        <constructor-arg ref="producerFactory"/>
        <constructor-arg name="autoFlush" value="true"/>
        <property name="defaultTopic" value="defaultTopic"/>
        <!--<property name="producerListener" ref="producerListener"/>-->
    </bean>


</beans>

具體詳細配置正如程式碼中的註釋一樣,配置好kafka的基礎屬性,整合到spring 的bean物件中,使用spring容器來管理這些bean物件。省去了物件管理的工作。

程式實現: 我們需要定義一個kafka抽象任務,每個任務其實就是想kafka裡面傳送訊息。屬於producer範疇。AbstractKafkaPushTask .java

package com.twjitm.core.common.kafka;

/**
 * @author twjitm - [Created on 2018-09-04 21:31]
 */
public abstract class AbstractKafkaPushTask {
    private KafkaTaskType taskType;
    public Object value;

    public AbstractKafkaPushTask(KafkaTaskType taskType) {
        this.taskType = taskType;
    }

    public KafkaTaskType getTaskType() {
        return taskType;
    }


    public Object getValue() {
        return value;
    }

    /**
     * 需要將訊息值儲存到這裡面,值如何獲得由子類自己實心
     *
     * @param value
     */
    public abstract void setValue(Object value);
}

kafka訊息監聽器:訊息消費者.

package com.twjitm.core.common.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.listener.MessageListener;

/**
 * kafka 訊息消費者
 *
 * @author twjitm- [Created on 2018-09-04 16:12]
 */
public class NettyKafkaConsumerListener implements MessageListener<String, String> {
    @Override
    public void onMessage(ConsumerRecord<String, String> consumerRecord) {

        System.out.println(consumerRecord);
        consumerRecord.value();
    }
}

在這我們對訊息做簡單的處理,但是在實際專案開發中我們還得更具訊息的具體型別做不同的業務處理,因此我們可以將訊息的具體應用做出自己業務邏輯處理。

訊息監聽器繼承MessageListener,而spring對這個介面做了處理,所以能夠監聽到訊息的到來。 在這裡插入圖片描述

有了訊息消費者,我們需要編寫一個訊息提供者,訊息提供者負責將訊息釋出到kafka訊息佇列中。實現程式碼如下:

package com.twjitm.core.common.kafka;

import com.alibaba.fastjson.JSON;
import com.twjitm.core.common.config.global.GlobalConstants;
import com.twjitm.core.common.config.global.KafkaConfig;
import com.twjitm.core.common.service.IService;
import com.twjitm.core.spring.SpringServiceManager;
import com.twjitm.threads.thread.NettyThreadNameFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

import javax.annotation.Resource;
import java.util.concurrent.*;

/**
 * kafka 訊息提供者
 *
 * @author twjitm- [Created on 2018-09-04 16:17]
 */
@Service
public class NettyKafkaProducerListener implements IService {
    private Logger logger = LoggerFactory.getLogger(NettyKafkaProducerListener.class);
    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * 訊息處理執行緒池
     */
    private volatile ExecutorService executorService;

    private boolean run = true;
    /**
     * 訊息佇列
     */
    private BlockingQueue<AbstractKafkaPushTask> queue;

    /**
     * 將訊息傳送給kafka中心
     *
     * @param abstractKafkaPushTask
     */
    private void sendMessage(AbstractKafkaPushTask abstractKafkaPushTask) {
        String type = abstractKafkaPushTask.getTaskType().getTypeName();
        String value = JSON.toJSONString(abstractKafkaPushTask.getValue());
        //本身send方法就是一個非同步執行方法
        ListenableFuture<SendResult<String, String>> result =
                kafkaTemplate.sendDefault(type, value);
        /**
         * 添加回調監聽
         */
        result.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onFailure(Throwable throwable) {
                logger.error("KAFKA MESSAGE SEND FAIL", throwable);
            }

            @Override
            public void onSuccess(SendResult<String, String> kafkaTaskTypeObjectSendResult) {
                logger.info("KAFKA MESSAGE SEND SUCCESS");
            }
        });


    }

    /**
     * 將任務存放到佇列中
     *
     * @param task
     */
    public void put(AbstractKafkaPushTask task) {
        try {
            queue.put(task);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Override
    public String getId() {
        return NettyKafkaProducerListener.class.getSimpleName();
    }

    @Override
    public void startup() throws Exception {
        run = true;
        queue = new LinkedBlockingQueue<>();
        NettyThreadNameFactory factory = new NettyThreadNameFactory(GlobalConstants.Thread.GAME_KAFKA_TASK_EXECUTOR);
        //開啟執行緒數量
        KafkaConfig kafkaConfig = SpringServiceManager.getSpringLoadService().getNettyGameServiceConfigService().getKafkaConfig();
        ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(kafkaConfig.getCorePoolSize(), kafkaConfig.getMaximumPoolSize(), kafkaConfig.getKeepAliveTime(), TimeUnit.SECONDS, new LinkedBlockingQueue<>(), factory);
        executorService = poolExecutor;
        for (int i = 0; i < 4; i++) {
            executorService.execute(new Worker());
        }
    }

    @Override
    public void shutdown() throws Exception {
        logger.info("STOP KAFKA EXECUTOR");
        run = false;
        executorService.shutdown();
    }

    private class Worker implements Runnable {

        @Override
        public void run() {
            try {
                while (run) {
                    sendMessage(queue.take());
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }


}

封裝看一個訊息本地訊息佇列,將上層呼叫過來的訊息存放到佇列中,再有佇列tack到kafka中。這樣保證每個程序進來的訊息都是有順序的。

測試:

使用spring整合kafka使用起來說是特別方便的。說以我們編寫一個測試類

package com.twjitm.kafka;

import com.twjitm.TestSpring;
import com.twjitm.core.common.kafka.KafkaTaskType;
import com.twjitm.core.common.kafka.NettyKafkaProducerListener;

import javax.annotation.Resource;

/**
 * @author twjitm- [Created on 2018-09-05 12:25]
 */

public class TestKafka {
    @Resource
    static NettyKafkaProducerListener nettyKafkaProducerListener;

    public static void main(String[] args) {
        TestSpring.initSpring();
        test();
    }

    public static void test() {
        WordChatTask task = new WordChatTask(KafkaTaskType.WORLD_CHAT);
        task.setValue("hello,world");
        nettyKafkaProducerListener.put(task);
    }

}

建立執行緒成功: 在這裡插入圖片描述

傳送訊息成功:

在這裡插入圖片描述

接收訊息成功:

在這裡插入圖片描述

相關程式碼已經提交到github,歡迎各位大大star