1. 程式人生 > >redis使用pipeline通道大幅度提升redis的處理速度,節省成本

redis使用pipeline通道大幅度提升redis的處理速度,節省成本

  最近在做專案的時候,遇到大量的讀寫,最開始都是set,get一條條的迴圈去取資料,當資料量大的時候,資料處理相當慢慢,就想到批處理資料的方式,最開始set資料的時候,想到的是mset 也算是批量插入資料,這個在資料量幾百的話甚至幾千的插入量,也是OK的,取資料的時候用mget  這個100的資料量以下,效能還可以保證,再大的話就是嚴重有問題,資料量越大取出的成本本越高 ,另外一個,在用mset批量插入的時候,遇到一個難道,不過這是對redis不太瞭解的原因,後面無意中發現,set的時候,可以直接在後面參加引數,利用redisTemplate模板,

    redisTemplate.opsForValue().set("key_s2", maps2.toString(), 1000, TimeUnit.SECONDS); 輕鬆搞定 
  下面主要是針對 pipeline 管道方式批量插入,插入的資料量都 是在100000條資料,redis通過tcp來對外提供服務,client通過socket連線發起請求,每個請求在命令發出後會阻塞等待redis伺服器進行處理,處理完畢後將結果返回給client。每一個命令都對應了傳送、接收兩個網路傳輸,假如一個流程需要0.1秒,那麼一秒最多隻能處理10個請求,將嚴重製約redis的效能。

當我們要處理上萬以上資料的時候,就不得不另想辦法,此時pipeline管道就是解決執行大量命令時、會產生大量資料來回次數而導致延遲的技術。其實原理比較簡單,pipeline是把所有的命令一次發過去,避免頻繁的傳送、接收帶來的網路開銷,redis在打包接收到一堆命令後,依次執行,然後把結果再打包返回給客戶端。

下面直接用程式碼試下,當然還作了其它的一些改動,比如RedisConnection 和SessionCallback 引數效能上不太一樣,相差也不多,重點說一下, redisConnection.openPipeline();  這個方法在doInRedis內部其實已經有,但是在這裡首先開啟和關閉,對效能有一定的提高,10萬次提升100毫秒左右

//加 redisConnection.openPipeline(); 開啟資源  第一次耗時:470  第二次 耗時:468  毫秒

//不加redisConnection.openPipeline();開啟和關閉 第一次 耗時:533  第二次 耗時:490,主要是第一次執行操作 其實是可以忽略了,並且這個的引數是 doInRedis重寫方法是 RedisConnection 引數,後續對set的的操作必須是以redisConnection 引數開始,應該是新建一個接連去處理後續的所有操作,再到通道關閉,這樣麻煩一些,所有的序列化都要自己去處理

public void pipelinedTest(){
        Long time = System.currentTimeMillis();
        for (int i = 0; i < 10000; i++) {
            redisTemplate.opsForValue().increment("pipline", 1);
        }
        System.out.println("耗時:" + (System.currentTimeMillis() - time));
        time = System.currentTimeMillis();
        Map maps = new HashMap<>();
        maps.put("name", "lmc");
        maps.put("姓名", "李四");
        redisTemplate.executePipelined(new RedisCallback<Object>() {
            @Override  
            public Object doInRedis(RedisConnection redisConnection) throws DataAccessException {  
                redisConnection.openPipeline();  
                for (int i = 0;i < 100000;i++){
                    redisConnection.incrBy("a".getBytes(),1L);
                }
                redisConnection.set("keys".getBytes(), maps.toString().getBytes());
                redisConnection.closePipeline();  
                return null;  
            }
        });
        System.out.println("*****************"+redisTemplate.opsForValue().get("keys"));
        System.out.println("耗時:" + (System.currentTimeMillis() - time));
    }

第二種方式 execute 重寫方法,引數RedisOperations 這種方式比較省心,不需要引數來操作, 用redisTemplate模板操作直接省去了很多資料封裝的問題,但是效能上稍微差一點
// 這是第一次執行耗時:596    第二次執行耗時:589  毫秒


        Long time = System.currentTimeMillis();
        Map maps2 = new HashMap<>();
        maps2.put("name", "lmc");
        maps2.put("姓名", "李四");
        time = System.currentTimeMillis();
        redisTemplate.executePipelined(new SessionCallback<Object>() {
            @Override
            public <K, V> Object execute(RedisOperations<K, V> redisOperations) throws DataAccessException {
                for (int i = 0; i < 100000; i++) {
                    redisTemplate.opsForValue().increment("piplineddd", 1L);
                }
                redisTemplate.opsForValue().set("key_s2", maps2.toString(), 1000, TimeUnit.SECONDS);
                return null;
            }
        });
        System.out.println("$$$$$$$$$$$$$$$$: "+redisTemplate.opsForValue().get("key_s2"));
        System.out.println("耗時:" + (System.currentTimeMillis() - time));
    直接一次性提交,所以executePipelined原始碼提供了多種方式供我們選擇,底層提交都 一樣的,只是在對外提供的方式為了滿足更多的需求,

還在慢慢研究當中,如有描述不恰當的,還請各位指教