1. 程式人生 > >關於MQTT協議實現訊息推送系統

關於MQTT協議實現訊息推送系統

測試環境:



 硬碟:1T,5400  (效果不佳)

得出了一個異樣的測試結果:

持久: 

插入200000條JSON,共消耗:25.175 s

平均:7944.389275074478 條/秒

插入200000條JSON,共消耗:34.47 s

平均:5802.146794313896 條/秒

插入200000條JSON,共消耗:29.937 s數量:1400000

平均:6680.696128536593 條/秒

插入200000條JSON,共消耗:29.094 s

平均:6874.269608854059 條/秒

非持久:

插入200000條JSON,共消耗:11.35 s數量:1800000

平均:17621.14537444934 條/秒

插入200000條JSON,共消耗:10.714 s

平均:18667.16445771887 條/秒

插入200000條JSON,共消耗:11.153 s

平均:17932.394871335066 條/秒

插入200000條JSON,共消耗:10.717 s數量:2400000

平均:18661.93897545955 條/秒

主要在自己本地測試,最終禍首是硬碟不給力啊;

在進行持久化操作時,ActiveMQ預設是kahadb管理

log的預設大小是32MB,當超過之後會新建一個新的log檔案,完成操作後,activeMQ又將舊的log刪除了。



 

程式碼貼上:

Java程式碼  收藏程式碼
  1. public
     class Sender {  
  2.     static int size = 200000;  
  3.     static Session session;  
  4.     static MessageProducer producer;  
  5.     static Topic topic;  
  6.     static Connection connection;  
  7.     static String str = "[{'flag':'1','value':'8854c92e92404b188e63c4031db0eac9','label':'交換機(虛機)'},{'flag':'1','value':'3f367296c2174b7981342dc6fcb39d64','label':'防火牆'},{'flag':'1','value':'8a3e05eeedf54f8cbed37c6fb38c6385','label':'負載均衡'},{'flag':'1','value':'4f0ebc601dfc40ed854e08953f0cdce8','label':'其他裝置'},{'flag':'1','value':'6','label':'路由器'},{'flag':'1','value':'4','label':'交換機'},{'flag':'1','value':'b216ca1af7ec49e6965bac19aadf66da','label':'伺服器'},{'flag':'1','value':'7','label':'安全裝置'},{'flag':'1','value':'cd8b768a300a4ce4811f5deff91ef700','label':'DWDM\\SDH'},{'flag':'1','value':'5','label':'防火牆(模組)'},{'flag':'1','value':'01748963956649e589a11c644d6c09b5','label':'機箱'}]"
    ;  
  8.     public static void init_connection() throws Exception {  
  9.         ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");  
  10.         connection = factory.createConnection();  
  11.         connection.start();  
  12.         session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);  
  13.         topic = session.createTopic("java.activemq.tps");  
  14.         producer = session.createProducer(topic);  
  15.         producer.setDeliveryMode(DeliveryMode.PERSISTENT);  
  16.     }  
  17.     public static void sendMessage(String msg) {  
  18.         TextMessage message;  
  19.         try {  
  20.             message = session.createTextMessage();  
  21.             message.setText(str);  
  22.             producer.send(message);  
  23.         } catch (JMSException e) {  
  24.             e.printStackTrace();  
  25.         }  
  26.     }  
  27.     public static void close() throws Exception {  
  28.         connection.close();  
  29.     }  
  30.     public static void main(String[] arg) throws Exception {  
  31.         long start = System.currentTimeMillis();  
  32.         ExecutorService es = Executors.newFixedThreadPool(10);  
  33.         final CountDownLatch cdl = new CountDownLatch(size);  
  34.         init_connection();  
  35.         for (int a = 0; a < size; a++) {  
  36.             es.execute(new Runnable() {  
  37.                 @Override  
  38.                 public void run() {  
  39.                     sendMessage(str);  
  40.                     cdl.countDown();  
  41.                 }  
  42.             });  
  43.         }  
  44.         cdl.await();  
  45.         es.shutdown();  
  46.         long time = System.currentTimeMillis() - start;  
  47.         System.out.println("插入" + size + "條JSON,共消耗:" + (double)time / 1000 + " s");  
  48.         System.out.println("平均:" + size / ((double)time/1000) + " 條/秒");  
  49.         close();  
  50.     }  
  51. }