1. 程式人生 > >使用Redis的管道(Pipeline)進行批量操作

使用Redis的管道(Pipeline)進行批量操作

Redis管道技術簡介

Reids是一個cs模式的Tcp服務,類似於http的請求。 當客戶端傳送一個請求時,伺服器處理之後會將結果通過響應報文返回給客戶端 。

那麼當需要傳送多個請求時,難道每次都要等待請求響應,再發送下一個請求嗎?

當然不是,這裡就可以採用Redis的管道技術。

舉個例子,如果說jedis是:request response,request response,…;

     那麼pipeline則是:request request… response response的方式。

下面,就簡單測試一下使用管道的效果。

單條插入與批量插入

這裡採用逐條和批量的方式往Redis中寫入一些資料。
先從Mysql中查出需要的資料,這裡大概是300條左右,資料量並不大,但是簡單做個測試應該沒問題。
單條插入—— Jedis:

    Jedis jedis = jedisPool.getResource();
    long start = System.currentTimeMillis();
    List<VehicleInfo> vehicleInfos  = vehicleInfoMapper.selectByParam(param);
    for (VehicleInfo vehicleInfo : vehicleInfos) {      
        //遍歷每個vehicleInfo
        TVehicleRealReportMsg real = new TVehicleRealReportMsg();
        Map<String, String> keysmap = new HashMap<String, String>();
        keysmap.put("vehicleStatus", real.getVehicleStatus() + "");
        keysmap.put("chargeStatus", real.getChargeStatus() + "");
        keysmap.put("longitude", "9");
        keysmap.put("latitude", "9");
        List<Long> list1 = new ArrayList<Long>();
        Long l = 1000L;
        Long l2 = 22222L;
        list1.add(l);
        list1.add(l2);
        real.setEngineFaultsList(list1);
        keysmap.put("engineFaultsList", JSON.toJSONString(list1));
        //單條插入
        jedis.hmset(vehicleInfo.getVehicleSeq()+"", keysmap);
    }   
    jedis.close();          
    long end = System.currentTimeMillis(); System.out.println("耗時:"+(end-start) +"ms");  

結果:467ms

1

批量插入—— Pipeline:

    Jedis jedis = jedisPool.getResource();
    Pipeline pip = jedis.pipelined();
    long start = System.currentTimeMillis();
    List<VehicleInfo> vehicleInfos  = vehicleInfoMapper.selectByParam(param);
    for (VehicleInfo vehicleInfo : vehicleInfos) {      
        //遍歷每個vehicleInfo
        TVehicleRealReportMsg real = new TVehicleRealReportMsg();
        Map<String, String> keysmap = new HashMap<String, String>();
        keysmap.put("vehicleStatus", real.getVehicleStatus() + "");
        keysmap.put("chargeStatus", real.getChargeStatus() + "");
        keysmap.put("longitude", "9");
        keysmap.put("latitude", "9");
        List<Long> list1 = new ArrayList<Long>();
        Long l = 1000L;
        Long l2 = 22222L;
        list1.add(l);
        list1.add(l2);
        real.setEngineFaultsList(list1);
        keysmap.put("engineFaultsList", JSON.toJSONString(list1));
        //批量插入
        pip.hmset(vehicleInfo.getVehicleSeq()+"", keysmap);
    }
    pip.sync();//同步
    jedis.close();      
    long end = System.currentTimeMillis();
    System.out.println("耗時:"+(end-start) +"ms");  

結果:175ms

2

可以看到使用管道之後的時間為,相比於單條插入的總時間大大減少,效能更優。

單條讀取和批量讀取

單條讀取—— Jedis:

    Jedis jedis = jedisPool.getResource();
    long start = System.currentTimeMillis();
    //1.採用redis單條讀取
    List<VehicleInfo> vehicleInfos = vehicleInfoMapper.selectByParam(param);
    List<Coordinate> list = new ArrayList<Coordinate>();
    for(VehicleInfo key: vehicleInfos){
        String hashkey = key.getVehicleSeq()+"";
        if(jedis.exists(hashkey+"")){                           
        Coordinate coord = new Coordinate();
        coord.setVehicleSeq(key.getVehicleSeq());
        coord.setOrgId(key.getOrgId()); 
        coord.setVehiclemodelseq(key.getVehiclemodelseq()); 
        coord.setVin(jedis.hget(hashkey, "vin"));
        coord.setLongitude(Long.valueOf(jedis.hget(hashkey, "longitude")));
        coord.setLatitude(Long.valueOf(jedis.hget(hashkey, "latitude")));       
        list.add(coord);
        }
    }   
    jedis.close();
    long end = System.currentTimeMillis();
    System.out.println("耗時:"+(end-start)+" ms");
    return list;  

結果: 第一次為1032ms,之後穩定在800~900ms
3

批量讀取—— Pipeline

    Jedis jedis = jedisPool.getResource();
    Pipeline pip = jedis.pipelined();
    long start = System.currentTimeMillis();
    //2.採用redis管道讀取
    List<VehicleInfo> vehicleInfos = vehicleInfoMapper.selectByParam(param);
    List<Coordinate> list = new ArrayList<Coordinate>();
    Map<String,Object> map = new HashMap<String, Object>();//map用來暫存屬性
    Map<String,List<Response<String>>> responses  = new HashMap<String, List<Response<String>>>(vehicleInfos.size());
    for(VehicleInfo info: vehicleInfos){
        List<Response<String>> resls = new ArrayList<Response<String>>();
        resls.add(pip.hget(info.getVehicleSeq()+"","longitude"));
        resls.add(pip.hget(info.getVehicleSeq()+"","latitude"));
        responses.put(info.getVehicleSeq() + "", resls);//得到了一輛車所有的實時資料--300輛車
        map.put(info.getVehicleSeq()+"orgId", info.getOrgId());
        map.put(info.getVehicleSeq()+"vin", info.getVin());
        map.put(info.getVehicleSeq()+"vehiclemodelseq", info.getVehiclemodelseq());
    }
    pip.sync(); 
    for(String k:responses.keySet()){
        Coordinate coord = new Coordinate();
        coord.setLongitude(Long.valueOf(responses.get(k).get(0).get()));//是get,不是toString
        coord.setLatitude(Long.valueOf(responses.get(k).get(1).get()));
        coord.setVehicleSeq(Long.valueOf(k));
        coord.setOrgId((String) map.get(k+"orgId"));
        coord.setVin((String) map.get(k+"vin"));
        coord.setVehiclemodelseq((Long) map.get(k+"vehiclemodelseq"));
        list.add(coord);
    }
    jedis.close();
    long end = System.currentTimeMillis();
    System.out.println("耗時:"+(end-start)+" ms");
    return list;  

結果: 第一次為200ms,之後維持在30ms左右
4

總時間大概是單條讀取總時間的1/5甚至更低,可以看出管道大大提升了效率,具有更好的效能。

注:使用管道所獲取的值的型別是Response<\String>,因此需要轉為String,如下程式碼片段:

Map<String,List<Response<String>>> responses  = new HashMap<String, List<Response<String>>>  (vehicleInfos.size());  
    //轉String
 responses.get(k).get(0).get();  

總結

  • 這裡僅僅測試了300條資料的操作,已經取得了相對明顯的效果。
  • 對於大量資料的操作,使用Redis管道可以大大提升效能和效率。