1. 程式人生 > >基於Neety的高效能中介軟體Mom

基於Neety的高效能中介軟體Mom

前言

今年7月份左右報名參加了阿里巴巴組織的高效能中介軟體挑戰賽,這次比賽不像以往的比賽,是從一個工程的視角來比賽的。
這個比賽有兩個賽題,第一題是實現一個RPC框架,第二道題是實現一個Mom訊息中介軟體。

MOM題目如下

實現一個基於釋出-訂閱模型的訊息中介軟體(broker+client)
必選特性:
提供可靠訊息服務,broker要保證資料同步落盤才能向生產者返回傳送成功的ack,並保證投遞給所有的消費者,直至所有的消費者都消費成功(消費者消費成功或者失敗都會返回對應的ack)。一旦消費者對一條訊息發生訂閱後,那麼該消費者消費失敗,消費超時(如果訊息推送給消費者後,10秒沒有返回響應,那麼認為消費超時),或者不線上,訊息不能丟失,需要儘快重投,讓消費者在恢復後可以儘快消費完堆積的訊息。
採用實時推送模型(只能push,push完後等待消費者ack,不能使用長輪詢),訊息一旦到達broker,要立馬推送給消費者,訊息延遲不能高於50ms。
訊息支援自定義屬性,能夠支援簡單的訊息屬性過濾訂閱,broker只能投遞符合屬性條件的訊息給訂閱者。例如訂閱者發起topic為trade,filter為area=hz的訂閱,那麼只有topic為trade,並帶有area屬性值為hz的訊息才會投遞給訂閱者。client要實現提供的api,傳送介面必須訊息持久化成功才能返回成功。
訊息儲存必須基於檔案系統自己實現,不能使用現成的儲存系統,資料儲存的根目錄為$userhome/store/。系統執行過程中如果突然宕機或者斷電(一定要保障訊息資料落盤後才能向傳送者響應傳送成功),重啟後訊息資料不能丟失。(資料丟失的定義:生產者訊息傳送返回成功ack後,如果broker出現宕機或者斷電後重啟,訊息丟失了,無法投遞給所有的訂閱者,直至所有訂閱組都消費成功)
消費者和生產者啟動的時候可以指定需要連線的broker ip,也就是實現broker單機的模式,前期跑分主要看broker的單機能力。
支援消費者叢集,消費負載均衡。比如消費者A是一個叢集,訂閱了topicA。broker收到topicA的某條訊息後,只投遞給消費者A叢集的某臺機器,消費者叢集的每臺機器每秒訊息消費量是均衡的。
加分特性(如果最後實現了必選特性,效能脫穎而出的幾個團隊,則還會綜合考慮系統設計是否能支援以下的特性):
服務高可用,broker可以叢集化部署,統一對外提供服務。broker叢集中部分機器當機,不會導致訊息傳送失敗,或者無法消費,對訊息服務的影響越小越好。
資料高可用,訊息儲存多份,單一資料儲存損壞,不會導致訊息丟失。
具備良好的線上橫向擴容能力。
支援大量的訊息堆積,在大量消費失敗或者超時的場景下,broker的效能和穩定不受影響,有良好的削峰填谷能力。
高效能、低成本。
考核方式
從系統設計角度和執行功能測試用例來評判必選特性,不滿足必選特性,直接淘汰。
服務高可用、資料高可用、線上橫向擴容能力從系統設計角度來評判
效能指標包括:每秒訊息接收量,每秒訊息投遞量,訊息投遞延遲,訊息傳送的rt,訊息堆積能力,削峰填谷能力。
效能壓測場景
4k訊息,一個釋出者釋出topicA,一個訂閱者訂閱這個topicA的所有訊息,訂閱者健康消費每條訊息,無堆積
4k訊息,一個釋出者釋出topicA,20個訂閱者分別訂閱topicA不同屬性的訊息,消費者健康消費,無堆積
4k訊息,一個釋出者釋出topicA,一個訂閱者訂閱這個topicA的所有訊息,訂閱者消費超時,大量堆積
4k訊息,一個釋出者釋出topicA,20個訂閱者分別訂閱topicA不同屬性的訊息,20個訂閱者只有一個訂閱者消費成功,其他訂閱者消費超時、失敗以及不線上,訊息出現大量堆積。
4k訊息,20個釋出者釋出20個不同的topic,每個topic都有20個訂閱者,他們分別訂閱不同屬性值的訊息,消費健康,無堆積
4k訊息,20個釋出者釋出20個不同的topic,每個topic都有20個訂閱者,他們分別訂閱不同屬性值的訊息,所有消費均超時,大量堆積。堆積持續一段時間後,減少90%的傳送量,並讓消費者恢復正常,broker需要儘可能快的投遞堆積的訊息。

其實剛讀了題目的時候內心是崩潰的,什麼是訊息釋出者什麼是訊息訂閱者。
但是仔細看看調理還是很清楚的,最主要的是實現裡面的Broker,因為需要保證每條資料都不能丟失所以需要對資料進行持久化,而且題目要求不能使用資料庫,所以只能選擇檔案系統了,這裡面有個要求就是

broker要保證資料同步落盤才能向生產者返回傳送成功的ack

實現方案以及注意點

Broker注意點

首先這意味著每個生產者是同步傳送的,這也就是一意味著Broker每次寫檔案一定要保證刷到磁碟上去後再告知生產者傳送下一個,使用普通的機械硬碟沒刷一次磁碟大概是30ms左右,這也就意味著 每秒每次最多隻能寫30多次磁碟,所以我們為了提高生產者傳送的效率,需要組提交,就是收到多個生產者的訊息後再統一刷磁碟,這樣可以儘可能的提高生產者的傳送速率。
Broker在記憶體中和檔案中都儲存了訊息,Broker開多個執行緒從訊息佇列中取出訊息併發送(注意:每個執行緒傳送了訊息後等待收到consumer的ack訊息,傳送後等待超時就放到佇列尾部)

Consumer注意點

Consumer在實現的時候主要需要注意的地方是,第一隻能收到自己感興趣的Topic的訊息,並且消費成功後返回一個訊息告訴Broker這個訊息已經消費成功

Producer注意點

Producer主要是負責生產訊息的,傳送到Broker後就等待Broker的確認訊息,收到確認訊息後才可以進行下一次訊息的傳送。

Broker的程式碼實現

package com.alibaba.middleware.race.mom.broker.netty;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import
io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import com.alibaba.middleware.race.mom.broker.ConsumerManager; import com.alibaba.middleware.race.mom.broker.SemaphoreManager; import com.alibaba.middleware.race.mom.broker.TaskManager; import com.alibaba.middleware.race.mom.model.MomRequest; import com.alibaba.middleware.race.mom.model.MomResponse; import com.alibaba.middleware.race.mom.serializer.RpcDecoder; import com.alibaba.middleware.race.mom.serializer.RpcEncoder; /** * 在我們的系統中服務端收到的資料只能是MomRequest 客戶端收到的資料只能是MomResponse * producer 是客戶端 consumer 也是客戶端 broker是伺服器 * * 所以 producer->broker 是request * 所以 consumer->broker 是request * broker->consumer 是response broker->producer 是respose * 通過以上模型,我們的對資料編解碼就可以不變 * @author zz * */ //broker 伺服器實現類 public class BrokerServerImpl implements BrokerServer { //訂閱關係管理器 //private ConsumerManager cmanager; private ServerBootstrap bootstrap; public BrokerServerImpl() { init(); } @Override public void init() { // TODO Auto-generated method stub //需要做成儲存成為檔案的功能,broker重啟的時刻可以從檔案中恢復 //cmanager=new ConsumerManager(); final BrokerHandler handler=new BrokerHandler(); //設定兩個監聽器 handler.setConsumerRequestListener(new ConsumerMessageListener()); handler.setProducerListener(new ProducerMessageListener()); EventLoopGroup bossGroup = new NioEventLoopGroup(); //處理事件的執行緒池 EventLoopGroup workerGroup = new NioEventLoopGroup(30); try { bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new RpcEncoder(MomResponse.class)); ch.pipeline().addLast(new RpcDecoder(MomRequest.class)); ch.pipeline().addLast(handler); } }).option(ChannelOption.SO_KEEPALIVE , true ); } catch (Exception e) { //TODO 異常處理 } } @Override public void start() { // TODO Auto-generated method stub try { ChannelFuture cfuture = bootstrap.bind(8888).sync(); //建立一個寫檔案的鎖 SemaphoreManager.createSemaphore("SendTask"); //建立一個傳送ack訊息的訊號量 SemaphoreManager.createSemaphore("Ack"); //恢復之前的傳送任務到佇列 TaskManager.RecoverySendTask(); //啟動傳送執行緒 ExecutorService executorService=Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()*2+2); for (int i = 0; i < Runtime.getRuntime().availableProcessors(); i++) { executorService.execute(new SendThread()); System.out.println("start sendThread:"+(i+1)); } //啟動ack傳送執行緒 for (int i = 0; i < Runtime.getRuntime().availableProcessors(); i++) { executorService.execute(new AckSendThread()); System.out.println("start ack sendThread:"+(i+1)); } //啟動一個記錄傳送tps的執行緒 executorService.execute(new RecordThread()); //啟動刷磁碟執行緒 executorService.execute(new FlushThread()); cfuture.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); // TODO: handle exception System.out.println("Broker start error!"); } } public static void main(String[] args) { BrokerServer broker=new BrokerServerImpl(); broker.start(); } } package com.alibaba.middleware.race.mom.broker.netty; import java.util.Random; import com.alibaba.middleware.race.mom.ConsumeResult; import com.alibaba.middleware.race.mom.Message; import com.alibaba.middleware.race.mom.SendResult; import com.alibaba.middleware.race.mom.broker.ClientChannelInfo; import com.alibaba.middleware.race.mom.broker.ConsumerGroupInfo; import com.alibaba.middleware.race.mom.broker.ConsumerManager; import com.alibaba.middleware.race.mom.broker.SendHelper; import com.alibaba.middleware.race.mom.model.InvokeFuture; import com.alibaba.middleware.race.mom.model.MomRequest; import com.alibaba.middleware.race.mom.model.MomResponse; import com.alibaba.middleware.race.mom.model.RequestResponseFromType; import com.alibaba.middleware.race.mom.model.RequestType; import com.alibaba.middleware.race.mom.model.ResponseType; import com.alibaba.middleware.race.mom.model.SubscriptRequestInfo; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelHandler.Sharable; /** * 在我們的系統中服務端收到的資料只能是MomRequest 客戶端收到的資料只能是MomResponse * broker 對於網路通訊收到的訊息的處理函式 * * * @author zz * */ @Sharable public class BrokerHandler extends ChannelInboundHandlerAdapter{ //對producer傳送的訊息進行處理,只有一種訊息就是Message private MessageListener producerListener; //對consumer傳送的訊息進行處理,有兩種訊息,第一種是訂閱資訊,第二種是消費資訊 private MessageListener consumerRequestListener; public void setProducerListener(MessageListener producerListener) { this.producerListener = producerListener; } public void setConsumerRequestListener(MessageListener consumerRequestListener) { this.consumerRequestListener = consumerRequestListener; } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { // TODO Auto-generated method stub super.channelActive(ctx); System.out.println("connect from :"+ctx.channel().remoteAddress()); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // TODO 這裡對broker收到的所有訊息進行dispatch MomRequest request=(MomRequest)msg; //構建響應訊息 MomResponse response=new MomResponse(); response.setRequestId(request.getRequestId()); response.setFromType(RequestResponseFromType.Broker); switch (request.getRequestType()) { case ConsumeResult: //收到客戶端消費結果資訊 ConsumeResult result=(ConsumeResult) request.getParameters(); String key = response.getRequestId(); if(SendHelper.containsFuture(key)) { InvokeFuture<Object> future = SendHelper.removeFuture(key); if (future==null) { return; } else { future.setResult(request); } } //TODO應該在收到這個訊息的時候就得到下一個需要傳送的訊息 //BUT 為了做負載均衡不能再這裡面傳送訊息 consumerRequestListener.onConsumeResultReceived(result); consumerRequestListener.onRequest(request); break; case Message: //收到來自producer的資訊 Message message=(Message)request.getParameters(); //當有多個生產者事同時刷入磁碟的資料量根據生產者上深 if(request.getFromType()==RequestResponseFromType.Producer) { Conf.Increase(message.getTopic()); } producerListener.onProducerMessageReceived(message,request.getRequestId(),ctx.channel()); //返回給producer訊息傳送狀態,由單獨的執行緒返回資料給傳送者 //System.out.println("send message ack"); break; case Subscript: //收到的是來自consumer的訂閱資訊 SubscriptRequestInfo subcript=(SubscriptRequestInfo)request.getParameters(); //構建一個channelinfo clientid => groupid : topic + 隨機數 //String clientId=subcript.getGroupId()+":"+subcript.getTopic()+(new Random()).nextInt(100); String clientKey=subcript.getClientKey(); ClientChannelInfo channel=new ClientChannelInfo(ctx.channel(), clientKey); consumerRequestListener.onConsumeSubcriptReceived(subcript,channel); consumerRequestListener.onRequest(request); response.setResponseType(ResponseType.AckSubscript); ctx.writeAndFlush(response); break; case Stop: String clientId=(String)request.getParameters(); ConsumerManager.stopConsumer(clientId); break; default: System.out.println("type invalid"); break; } } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { // TODO Auto-generated method stub super.channelReadComplete(ctx); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { // TODO Auto-generated method stub //super.exceptionCaught(ctx, cause); // consumerRequestListener.onError(cause); // producerListener.onError(cause); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { // TODO Auto-generated method stub super.channelInactive(ctx); System.out.println("disconnected"); }

Producer訊息的處理類

package com.alibaba.middleware.race.mom.broker.netty;

import io.netty.channel.Channel;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

import com.alibaba.middleware.race.mom.Message;
import com.alibaba.middleware.race.mom.SendResult;
import com.alibaba.middleware.race.mom.SendStatus;
import com.alibaba.middleware.race.mom.broker.AckManager;
import com.alibaba.middleware.race.mom.broker.ConsumerGroupInfo;
import com.alibaba.middleware.race.mom.broker.ConsumerManager;
import com.alibaba.middleware.race.mom.broker.SemaphoreManager;
import com.alibaba.middleware.race.mom.broker.SendHelper;
import com.alibaba.middleware.race.mom.broker.TaskManager;
import com.alibaba.middleware.race.mom.file.LogTask;
import com.alibaba.middleware.race.mom.model.SendTask;
import com.alibaba.middleware.race.mom.tool.MemoryTool;
import com.alibaba.middleware.race.mom.tool.Tool;


//broker收到producer的訊息的時候監聽類
public class ProducerMessageListener  extends MessageListener{

    @Override
    void onProducerMessageReceived(Message msg,String requestId,Channel channel) {

        //放入一個requestId對應的channel 在後面傳送ack後刪除
        AckManager.pushRequest(requestId, channel);

        // TODO Auto-generated method stub
        //標識是否序列化成功
        boolean isError=false;
        String mapstr="";
        for(Map.Entry<String, String> entry:msg.getProperties().entrySet()){    
            mapstr+=entry.getKey()+"="+entry.getValue();
        }   
        //System.out.println("receive producer message msgid:"+msg.getMsgId()+" topic:"+msg.getTopic()+" filter:"+mapstr);
        String topic=msg.getTopic();
        //找到訂閱這個訊息的組資訊   最好有訂閱過濾條件
        List<ConsumerGroupInfo> allgroups= ConsumerManager.findGroupByTopic(topic);
        //符合這個訊息過濾訊息的組
        List<ConsumerGroupInfo> groups=new ArrayList<ConsumerGroupInfo>();
        for (ConsumerGroupInfo groupinfo : allgroups) 
        {
            //groupinfo.findSubscriptionData(topic);
            String filterName=groupinfo.findSubscriptionData(topic).getFitlerName();
            String filterValue=groupinfo.findSubscriptionData(topic).getFitlerValue();
            if(filterName==null)
            {
                groups.add(groupinfo);
            }
            else
            {
                //判斷訊息是否有組需要的欄位,且欄位的值和訊息一致
                if(msg.getProperty(filterName)!=null&&msg.getProperty(filterName).equals(filterValue))
                {
                    groups.add(groupinfo);
                }
            }
        }
        if(groups.size()==0)//沒有訂閱這個訊息的組,需要把訊息儲存在預設佇列裡面?
        {
            //TODO 丟失資訊?
            //查詢有沒有專屬於儲存這個topic的佇列
            //QueueFile queue=QueueManager.findQueue(topic);
            System.out.println("don't have match group");
            //返回這個ack
            SendResult ack=new SendResult();
            ack.setMsgId(msg.getMsgId());//message id
            ack.setInfo(requestId);//request id
            ack.setStatus(SendStatus.SUCCESS);
            AckManager.pushAck(ack);
            SemaphoreManager.increase("Ack");
        }
        //遍歷所有的訂閱該訊息的組
        List<byte[]> logList=new ArrayList<byte[]>();
        List<SendTask> taskList=new ArrayList<SendTask>();
        for (ConsumerGroupInfo consumerGroupInfo : groups) {
            SendTask task=new SendTask();
            task.setGroupId(consumerGroupInfo.getGroupId());
            task.setTopic(topic);
            task.setMessage(msg);

            taskList.add(task);
            LogTask log=new LogTask(task, 0);
            byte[] data=Tool.serialize(log);
            logList.add(data);
        }
        try 
        {
            //生成一個requestID和messageID組成的鍵值
            String key=requestId+"@"+msg.getMsgId();
            //先把這些任務寫到緩衝區,這時候ack訊息還沒有生成,producer還在等待ack訊息
            //等到一定時機生成ACK訊息加入ack訊息佇列等待ack傳送執行緒傳送ack訊息
            FlushTool.writeToCache(logList,key);
        }
        catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

        //新增一個傳送任務
        if(MemoryTool.moreThan(1024*1024*100*8))//100MB可用記憶體
        {
            TaskManager.pushTask(taskList);//把這些任務放到記憶體中的任務佇列

            for (int i=0;i<taskList.size();i++) {
                SemaphoreManager.increase("SendTask");
            }
        }
        else
        {
            //不新增到傳送隊列了
        }

    }

}

消費者ACK訊息的處理類

package com.alibaba.middleware.race.mom.broker.netty;


import com.alibaba.middleware.race.mom.ConsumeResult;
import com.alibaba.middleware.race.mom.ConsumeStatus;
import com.alibaba.middleware.race.mom.Message;
import com.alibaba.middleware.race.mom.broker.ClientChannelInfo;
import com.alibaba.middleware.race.mom.broker.ConsumerManager;
import com.alibaba.middleware.race.mom.broker.SemaphoreManager;
import com.alibaba.middleware.race.mom.broker.SendHelper;
import com.alibaba.middleware.race.mom.broker.SubscriptionInfo;
import com.alibaba.middleware.race.mom.broker.TaskManager;
import com.alibaba.middleware.race.mom.file.LogTask;
import com.alibaba.middleware.race.mom.model.SendTask;
import com.alibaba.middleware.race.mom.model.SubscriptRequestInfo;
import com.alibaba.middleware.race.mom.tool.Tool;

public class ConsumerMessageListener extends MessageListener {

    @Override
    void onConsumeResultReceived(ConsumeResult msg) {
        // TODO Auto-generated method stub
        if(msg.getStatus()==ConsumeStatus.SUCCESS)//消費成功的話,找到對應的佇列刪除那個訊息,然後繼續傳送下個訊息
        {
            //String key=msg.getGroupID()+msg.getTopic()+msg.getMsgId();
            //System.out.println("consume ok");

            if(TaskManager.findInResend(msg.getGroupID(), msg.getTopic(), msg.getMsgId()))
            {
                //表示這個訊息時已經超時的 這時候發過來的消費訊息無效
                return;
            }
            //TODO 記錄一個傳送成功的日誌
            SendTask task=new SendTask();
            task.setGroupId(msg.getGroupID());
            task.setTopic(msg.getTopic());
            Message message=new Message();
            message.setMsgId(msg.getMsgId());
            task.setMessage(message);
            LogTask logtask=new LogTask(task, 1);
            byte[] data=Tool.serialize(logtask);
            //消費情況直接寫入檔案
            FlushTool.writeConsumeResult(data);
        }
    }
    //收到訂閱訊息後的操作
    @Override
    void onConsumeSubcriptReceived(SubscriptRequestInfo msg,ClientChannelInfo channel) {
        // TODO Auto-generated method stub

        System.out.println("receive subcript info groupid:"+msg.getGroupId()+" topic:"+msg.getTopic()+" filterName:"+msg.getPropertieName()+" filterValue:"+msg.getPropertieValue()+" clientId:"+channel.getClientId());

        SubscriptionInfo subscript=new SubscriptionInfo();
        subscript.setTopic(msg.getTopic());
        subscript.setFitlerName(msg.getPropertieName());
        subscript.setFitlerValue(msg.getPropertieValue());
        channel.setSubcript(subscript);//設定訂閱資訊

        //加入某個組
        //相同的組和相同的topic,更新訂閱條件就好
        ConsumerManager.addGroupInfo(msg.getGroupId(), channel);

        //TODO nothing todo

    }

}

Broker訊息刷盤的類

package com.alibaba.middleware.race.mom.broker.netty;

import java.util.concurrent.TimeUnit;


/**
 * 刷磁碟執行緒  當收到多個生產者訊息的時候快取滿 的時候刷磁碟,然後喚醒等待刷磁碟的執行緒  也就是等待發送ACK的執行緒將他們喚醒
 * @author sei.zz
 *
 */
public class FlushThread implements Runnable {

    @Override
    public void run() {
        // TODO Auto-generated method stub
        while(true)
        {
            try 
            {
                long start =System.currentTimeMillis();
                //等待一段時間,是否收集齊一定數量的訊息
                if (!FlushTool.semp.tryAcquire(1000, TimeUnit.MILLISECONDS)) {
                    synchronized (FlushTool.syncObj) 
                    {
                        //TODO 把cacheList裡面的資料刷入磁碟
                        //FlushTool.logWriter.log("time out");
                        FlushTool.flush();
                    }
                    continue;
                }
                else
                {
                    long end=System.currentTimeMillis();
                    FlushTool.logWriter.log("collect all:use time"+(end-start)+" num:"+FlushTool.cacheList.size());
                    synchronized (FlushTool.syncObj) 
                    {
                        //TODO 把cacheList裡面的資料刷入磁碟
                        FlushTool.flush();
                    }
                }
            }
            catch (InterruptedException e)
            {
                throw new RuntimeException();
            }
        }
    }
}

package com.alibaba.middleware.race.mom.broker.netty;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Semaphore;

import com.alibaba.middleware.race.mom.SendResult;
import com.alibaba.middleware.race.mom.SendStatus;
import com.alibaba.middleware.race.mom.broker.AckManager;
import com.alibaba.middleware.race.mom.broker.SemaphoreManager;
import com.alibaba.middleware.race.mom.file.MessageLog;
import com.alibaba.middleware.race.mom.tool.LogWriter;


/**
 * 
 * @author sei.zz
 *
 */
public class FlushTool {


    public static MessageLog log=null;
    public static LogWriter logWriter=null;
    static
    {
        try 
        {
            log=new MessageLog("message");//初始化持久化檔案的例項
            logWriter=LogWriter.getLogWriter();
        }
        catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

    public static Object syncObj=new Object();//用來刷磁碟的時候同步的

    //阻塞每個執行緒是否可以返回了
    public static Semaphore canReturn =new Semaphore(0);
    //一段時間內收到的訊息快取在這裡,由一個單獨的執行緒來刷磁碟
    public static List<byte[]> cacheList=new ArrayList<byte[]>();
    //儲存正在等待ack訊息的requestID和message ID
    public static List<String> requestCacheList=new ArrayList<String>();

    public static Semaphore semp=null;//一個標識收到多少個數據後就開始刷盤的訊號量。
    public static int threadNum=Conf.connNum;
    static
    {
        semp=new Semaphore(-threadNum);//與傳送執行緒數量相同
    }

    public static void writeToCache(byte[] data,String requestId)
    {
        //System.out.println("write in cache");
        synchronized (cacheList) {
            cacheList.add(data);
            requestCacheList.add(requestId);
            semp.release();
        }
    }
    public static void writeToCache(List<byte[]> list,String requestId)
    {
        //System.out.println("write in cache");
        synchronized (cacheList) {
            cacheList.addAll(list);
            requestCacheList.add(requestId);
            semp.release();//收到一個數據,釋放一下,當釋放到足夠多的時候 執行緒會刷盤
        }
    }
    public static void reset()
    {
        semp=new Semaphore(-threadNum);
    }

    //將緩衝區的資料寫入硬碟,喚醒在等待刷盤操作的執行緒
    public static void flush()
    {
        List<byte[]> temp=null;
        List<String> requestTemp=null;
        synchronized (cacheList) {
            temp=new ArrayList<byte[]>();
            requestTemp=new ArrayList<String>();
            temp.addAll(cacheList);
            requestTemp.addAll(requestCacheList);
            cacheList.clear();
            requestCacheList.clear();
            reset();
        }

        if(temp!=null&&temp.size()>0)//刷已經寫好的資料,此時其他執行緒可以把資料寫入cacheList
        {
            long start=System.currentTimeMillis();
            boolean error=false;
            //同步的把資料儲存到磁碟
            if(!log.SynSave(temp))
                error=true;
            long end=System.currentTimeMillis();

            logWriter.log("save use time:"+(end-start)+" number:"+temp.size()+" cachelist:"+cacheList.size());
            //儲存結束後,把刷盤成功的訊息,生成對應的ack訊息,設定訊息id
            for (int i = 0; i < requestTemp.size(); i++) {
                SendResult ack=new SendResult();
                String[] arr=requestTemp.get(i).split("@");
                ack.setMsgId(arr[1]);//message id
                ack.setInfo(arr[0]);//request id
                if(error)
                    ack.setStatus(SendStatus.FAIL);
                else
                    ack.setStatus(SendStatus.SUCCESS);

                AckManager.pushAck(ack);//往ack佇列裡面放入一個ack訊息
                SemaphoreManager.increase("Ack");
            }
        }
        //System.out.println("flush");
    }

    public static boolean writeConsumeResult(byte[] data)
    {
        if(log!=null)
            return log.AsynSave(data);
        else
            return false;
    }
}

Consumer的實現類

package com.alibaba.middleware.race.mom;

import java.util.UUID;

import com.alibaba.middleware.race.mom.cunsumer.netty.MomConsumerConnection;
import com.alibaba.middleware.race.mom.cunsumer.netty.MomConsumerNettyConnection;
import com.alibaba.middleware.race.mom.cunsumer.netty.MomConsumertHandler;
import com.alibaba.middleware.race.mom.cunsumer.netty.ResponseCallbackListener;
import com.alibaba.middleware.race.mom.model.MomRequest;
import com.alibaba.middleware.race.mom.model.MomResponse;
import com.alibaba.middleware.race.mom.model.RequestType;
import com.alibaba.middleware.race.mom.model.SubscriptRequestInfo;

public class DefaultConsumer implements Consumer{
    //broker伺服器ip地址
    private String brokerIp;
    //連線broker伺服器的連線
    private MomConsumerConnection consumerConn;
    //消費者組id
    private String groupId;
    //訂閱的topic
    private String topic;
    //過濾的屬性名
    private String propertieName;
    //過濾的值
    private String propertieValue;
    //監聽器
    private MessageListener listener;
    //是否有屬性過濾
    private boolean isFilter=false;

    //是否輸入訂閱資訊
    private boolean isSubcript=false;

    private boolean isRunning=false;

    private String clientKey="";

    public DefaultConsumer() {
        //this.brokerIp=System.getProperty("SIP");
        brokerIp="127.0.0.1";
        consumerConn=new MomConsumerNettyConnection(brokerIp,8888);
    }

    @Override
    public void start() {
        // TODO Auto-generated method stub
        if(isSubcript)
        {
            //設定處理器 和結果回撥函式
            consumerConn.setHandle(new MomConsumertHandler(consumerConn,new ResponseCallbackListener() {
                @Override
                public void onTimeout() {
                    // TODO Auto-generated method stub

                }
                @Override
                public Object onResponse(Object response) {
                    // TODO Auto-generated method stub
                    MomResponse mr=(MomResponse)response;
                    //呼叫上層設定的回撥函式
                    ConsumeResult result=listener.onMessage((Message)mr.getResponse());
                    result.setGroupID(groupId);
                    result.setTopic(topic);
                    result.setMsgId(((Message)mr.getResponse()).getMsgId());
                    return result;
                }
                @Override
                public void onException(Throwable e) {
                    // TODO Auto-generated method stub
                    if(e instanceof java.net.ConnectException)
                    {
                        System.out.println("connect error");
                        if(isRunning)
                            restartConnect();
                    }
                }
                @Override
                public void onDisconnect(String msg) {
                    // TODO Auto-generated method stub
                    if(isRunning)
                        restartConnect();

                }
            }));
            //連線伺服器
            consumerConn.connect();
            clientKey=groupId+topic+UUID.randomUUID().toString();
            //clientKey=groupId+topic+"0000007";
            //TODO 傳送一個自己的訂閱資訊給伺服器
            isRunning=true;
            MomRequest request=new MomRequest();
            request.setRequestType(RequestType.Subscript);
            request.setRequestId(UUID.randomUUID().toString());

            //構造訂閱資訊傳送給
            SubscriptRequestInfo subscript=new SubscriptRequestInfo();
            subscript.setGroupId(groupId);
            subscript.setTopic(topic);
            subscript.setPropertieName(propertieName);
            subscript.setPropertieValue(propertieValue);
            subscript.setClientKey(clientKey);
            request.setParameters(subscript);
            consumerConn.Send(request);
        }
    }

    //重新連線broker伺服器
    public void restartConnect()
    {
        boolean isConnected=false;
        System.out.println("restart");
        //TODO 獲取到新的brokerIp地址 從zookeeper獲取
        //brokerIp=System.getProperty("SIP");
        brokerIp="127.0.0.1";
        consumerConn=new MomConsumerNettyConnection(brokerIp,8888);
        //重新設定處理器
        //設定處理器 和結果回撥函式
        consumerConn.setHandle(new MomConsumertHandler(consumerConn,new ResponseCallbackListener() {
            @Override
            public void onTimeout() {
                // TODO Auto-generated method stub

            }
            @Override
            public Object onResponse(Object response) {
                // TODO Auto-generated method stub
                MomResponse mr=(MomResponse)response;
                //呼叫上層設定的回撥函式
                ConsumeResult result=listener.onMessage((Message)mr.getResponse());
                result.setGroupID(groupId);
                result.setTopic(topic);
                result.setMsgId(((Message)mr.getResponse()).getMsgId());
                return result;
            }
            @Override
            public void onException(Throwable e) {
                // TODO Auto-generated method stub
                //System.out.println("DefaultConsumer error");
                if(e instanceof java.net.ConnectException)
                {
                    System.out.println("connect error");
                    if(isRunning)
                        restartConnect();
                }
            }
            @Override
            public void onDisconnect(String msg) {
                // TODO Auto-generated method stub
                if(isRunning)
                    restartConnect();

            }
        }));
        //連線伺服器
        try 
        {
            consumerConn.connect();
            isConnected=true;
        }
        catch (Exception e)
        {
            // 重新連線伺服器失敗,過3秒重新連線
            try 
            {
                Thread.sleep(3000);
            }
            catch (InterruptedException ie) 
            {
                //TODO nothing todo
            }
            isConnected=false;
            restartConnect();
        }

        //TODO 傳送一個自己的訂閱資訊給伺服器
        if(isConnected)
        {
            MomRequest request=new MomRequest();
            request.setRequestType(RequestType.Subscript);
            request.setRequestId(UUID.randomUUID().toString());

            //構造訂閱資訊傳送給
            SubscriptRequestInfo subscript=new SubscriptRequestInfo();
            subscript.setGroupId(groupId);
            subscript.setTopic(topic);
            subscript.setPropertieName(propertieName);
            subscript.setPropertieValue(propertieValue);
            subscript.setClientKey(clientKey);
            request.setParameters(subscript);
            consumerConn.Send(request);
        }
    }

    @Override
    public void subscribe(String topic, String filter, MessageListener listener) {
        // TODO Auto-generated method stub
        this.topic=topic;
        if(filter.trim().length()>0&&filter.contains("="))
        {
            this.propertieName=filter.split("=")[0];
            this.propertieValue=filter.split("=")[1];
            this.isFilter=true;
        }
        this.listener=listener;//設定監聽
        isSubcript=true;
    }

    @Override
    public void setGroupId(String groupId) {
        // TODO Auto-generated method stub
        this.groupId=groupId;
    }

    @Override
    public void stop() {
        // TODO Auto-generated method stub

        if(isRunning)
        {
            isRunning=false;
            //傳送一個退訂訊息,把自己從訂閱關係裡面移除
            MomRequest request=new MomRequest();
            request.setRequestType(RequestType.Stop);
            request.setRequestId(UUID.randomUUID().toString());
            request.setParameters(new String(clientKey));
            consumerConn.SendSync(request);
            consumerConn.close();
        }

    }

}

由於這個專案的程式碼太多,沒能全部貼出裡面的程式碼,其中的通訊方式主要就是RPC的實現方案。
其中有很多地方是需要注意的。

原創宣告

相關推薦

基於Neety高效能中介軟體Mom

前言 今年7月份左右報名參加了阿里巴巴組織的高效能中介軟體挑戰賽,這次比賽不像以往的比賽,是從一個工程的視角來比賽的。 這個比賽有兩個賽題,第一題是實現一個RPC框架,第二道題是實現一個Mom訊息中介軟體。 MOM題目如下 實現一個基於釋出

基於Nginx的中介軟體架構》學習筆記---1.環境配置

一、環境除錯確認 (四項確認) 1、確認系統網路 ping www.baidu.com 2、確認yum可用 yum list|grep gcc 3、確認關閉iptables規則 iptables -L // 檢視目前的防火牆規則 iptab

基於Nginx的中介軟體架構》學習筆記---3.nginx的目錄分析

p.p1 { margin: 0.0px 0.0px 0.0px 0.0px; font: 12.0px ".PingFang SC"; color: #454545 } span.s1 { font: 12.0px "Helvetica Neue" } 一、目錄分析 用yum的方式進行安裝實質上裝的都是

基於Nginx的中介軟體架構》學習筆記---4.nginx編譯引數詳細介紹

通過nginx -V檢視編譯時引數: 在nginx安裝目錄下,通過./configure --help,檢視對應版本ngnix編譯時支援的所有引數: Nginx編譯引數詳細介紹: --help 顯示本提示資訊 --prefix=PATH 設定安裝目錄 --sbin-path=PATH 設定

ASP.NET Core 中基於工廠的中介軟體啟用

IMiddlewareFactory/IMiddleware 是中介軟體啟用的擴充套件點。 UseMiddleware 擴充套件方法檢查中介軟體的已註冊型別是否實現 IMiddleware。 如果是,則使用在容器中註冊的 IMiddlewareFactory&

Delayer 基於 Redis 的延遲訊息佇列中介軟體

Delayer 基於 Redis 的延遲訊息佇列中介軟體,採用 Golang 開發,支援 PHP、Golang 等多種語言客戶端。 參考 有贊延遲佇列設計 中的部分設計,優化後實現。 專案連結:https://github.com/mixstart/d... ,有需要的朋友加 Star 哦。 應用場景

django-基於中介軟體實現限制ip頻繁訪問

########django-基於中介軟體寫一個限制頻繁登陸######## 額額,標題已經很醒目了,通過中介軟體去實現,其他方法也可以實現 瀏覽器前端傳來的請求,必須通過中介軟體,才能到後面路由,檢視函式,所以我們在中介軟體那裡做一層處理, 我們還需要知道是哪個ip,在什麼時候,請求了幾次,這些資

分散式事務之——基於訊息中介軟體實現

 環境需求:假如某人有5個女朋友(有點複雜),每天晚上都會給他的女朋友打電話說晚安,那麼每給一個女朋友打電話,其他女朋友都要進入等待狀態。一個一個打下去。。。等打到最後一個已經是凌晨了,對方都睡了。那麼有什麼辦法可以解決呢?此時這個人可以利用微信公眾號將自己甜言蜜語放進公眾號

WEBAPI基於Owin中介軟體實現身份驗證例項(OAUTH 2.0方式)附原始碼

1,在Webapi專案下新增如下引用: Microsoft.AspNet.WebApi.Owin Owin Microsoft.Owin.Host.SystemWeb Microsoft.Owin.Security.OAuth Microsoft.Owin.Secu

高效能訊息中介軟體——NATS

前 言 這段時間我的主要工作內容是將公司系統中使用的RabbitMQ替換成NATS,而此之前我對Nats一無所知。經過一段時間緊張的學習和開發之後我順利的完成了任務,並對訊息中介軟體有了更深的瞭解。在此感謝同事鍾亮在此過程中對我的幫助。NATS屬於比較小眾的一款

基於JTT808協議的車載終端接入閘道器中介軟體

技術支援QQ:78772895         易聯裝置接入閘道器是一個以netty/mina作為底層架構的高併發高可用的協議中介軟體,接入閘道器支援近200多種網路接入協議,全面覆蓋市場

java架構之高併發,分散式,叢集,高效能中介軟體合集高階學習

視訊課程內容包含: 高階Java架構師包含:Spring boot、Spring  cloud、Dubbo、Redis、ActiveMQ、Nginx、Mycat、Spring、MongoDB、ZeroMQ、Git、Nosql、Jvm、Mecached、Netty、Nio

【陌上軒客】技術領域:涉獵Java、Go、Python、Groovy 等語言,高效能、高併發、高可用、非同步與訊息中介軟體、快取與資料庫、分散式與微服務、容器和自動化等領域; 興趣愛好:籃球,騎行,讀書,發呆; 職業規劃:勵志成為一名出色的伺服器端系統架構師。

陌上軒客 技術領域:涉獵Java、Go、Python、Groovy 等語言,高效能、高併發、高可用、非同步與訊息中介軟體、快取與資料庫、分散式與微服務、容器和自動化等領域; 興趣愛好:籃球,騎行,讀書,發呆; 職業...

基於AgileEAS.NET SOA 中介軟體領域模型資料器快速打造自己的程式碼生成器

一、前言      AgileEAS.NET SOA 中介軟體平臺是一款基於基於敏捷並行開發思想和Microsoft .Net構件(元件)開發技術而構建的一個快速開發應用平臺。用於幫助中小型軟體企業建立一條適合市場快速變化的開發團隊,以達到節省開發成本、縮短開發時間,快速適應市場變化的目的。      A

AgileEAS.NET SOA 中介軟體平臺5.2版本下載、配置學習(一):下載平臺並基於直連環境執行

一、前言      AgileEAS.NET SOA 中介軟體平臺是一款基於基於敏捷並行開發思想和Microsoft .Net構件(元件)開發技術而構建的一個快速開發應用平臺。用於幫助中小型軟體企業建立一條適合市場快速變化的開發團隊,以達到節省開發成本、縮短開發時間,快速適應市場變化的目的。      A

基於 Cookie 的 SSO 中介軟體 kisso

kisso =  cookie sso 基於 Cookie 的 SSO 中介軟體,它是一把快速開發 java Web 登入系統(SSO)的瑞士軍刀。歡迎大家使用 kisso!!  1、支援單點登入 2、支援登入Cookie快取 3、支援防止 xss攻擊, S

MOM:訊息中介軟體

點到點模型 點對點傳遞模型:生產者傳送訊息到一個特定的佇列(Queue)中,而消費者從一個訊息佇列中得到訊息,如下圖所示: 點對點模型的特點: Ø  每條訊息有一個消費者       每條只有一個消費者,如果一條訊息被訊息者接收,那麼其他的消費者就不能得到這條訊息了。 Ø 傳送和接受訊息與時間沒

一種基於SOA的應用整合中介軟體體系架構

1.  關於SOA 1.1  技術背景         為了幫助企業和組織實現隨需應變的業務需要關注兩個要素:業務設計(業務模型和業務流程)以及底層技術基礎設施。企業可能必須修改業務模式和業務流程,以專注於企業的核心競爭力,並克服業務模型本身效率低下的缺點。如果沒有能夠以簡單

基於中介軟體的開發---J2EE

   J2EE 是針對 Web Service、業務物件、資料訪問和訊息報傳送的一組規範。這組應用程式設計介面確定了 Web 應用與駐留它們的伺服器之間的通訊方式。J2EE 注重兩件事,一是建立標準,使 Web 應用的部署與伺服器無關;二是使伺服器能控制構件的生命週期和其他資

基於Dubbo的分散式系統架構(二)-訊息中介軟體在分散式系統中的作用及介紹

一、訊息中介軟體的定義        Message-orientedmiddleware (MOM) is software infrastructure focused on sending a