1. 程式人生 > >基於redis的zSet集合做資料快取實現分頁查詢 java

基於redis的zSet集合做資料快取實現分頁查詢 java

      需要場景:最近公司要做手機頁面展示新聞文章資料查詢的優化工作,讓我提個優化方案。現狀是目前手機頁面的資料請求系統後臺,系統後臺然後呼叫其他系統的介面,返回分頁資料到前臺展示,這樣一來,使用者每次下拉到頁面底部載入更多資料都要呼叫其他介面,使用者體驗顯然不是很好,那有沒有更好的方案呢?

      優化方案:redis正好適合在這種場景下使用,使用者每次下拉到頁面底部,此時從前臺頁面到系統後臺分頁(假如每次取10條)取資料,可以直接到redis裡取資料,如果redis返回的資料為空或者小於你要取的10條資料,那麼呼叫介面取10條分頁資料放入快取,然後再從快取裡取資料返回到前臺。這樣的話,只有當其中一個使用者第一次查詢的時候會呼叫介面資料存入快取,以後這個使用者或者其他使用者再看這個文章資訊的時候,就是直接從快取裡取資料,就相當快捷,提高使用者體驗。經測試,之前每次呼叫介面在700~800ms左右,現在每次從快取裡取資料,只需要200ms左右,效能顯然提升很大。

     前面只提到下拉到頁面底部載入更多資料時的情況,其實我們當重新整理最新的資料時,這時候該怎麼處理呢?事實上系統後臺用到了kafka消費者接收從其他後臺實時傳送的文章資料,這裡接收的文章有三種類型:一種是add,就說明這個文章是新增的釋出到手機頁面的資料;一種是update,就說明這個文章是要更新已經發布的資料,最後一種是del,就說明這個文章是要從手機頁面刪除的。也就是說,我們一方面可以從介面獲取歷史的資料,另一方面可以實時獲取最新的被髮送來的新增文章資料(或者是要修改和刪除的)。另外補充一點,為了提升使用者一開啟手機就能快速的看到新聞資訊的體驗度,我們在系統啟動成功後,預設先呼叫介面存入快取10條記錄,這樣,使用者第一次進入手機頁面預設就能先從快取裡取10條新聞資訊。

    上面說了那麼多業務,無非是兩點,一:從快取裡獲取分頁資料;二:對快取資料進行增刪改查的操作。而redis定義了5種資料結構,這5種資料結構型別分別為String(字串)、List(列表)、Set(集合)、Hash(雜湊)和 Zset(有序集合)。

下面來對這5種資料結構型別作簡單的介紹(表格引用https://www.jianshu.com/p/7bf5dc61ca06文章裡的):

結構 型別                結構儲存的值結構的讀寫能力
String可以是字串、整數或者浮點數對整個字串或者字串的其中一部分執行操作;物件和浮點數執行自增(increment)或者自減(decrement)
List一個連結串列,連結串列上的每個節點都包含了一個字串從連結串列的兩端推入或者彈出元素;根據偏移量對連結串列進行修剪(trim);讀取單個或者多個元素;根據值來查詢或者移除元素
Set包含字串的無序收集器(unorderedcollection),並且被包含的每個字串都是獨一無二的、各不相同新增、獲取、移除單個元素;檢查一個元素是否存在於某個集合中;計算交集、並集、差集;從集合裡賣弄隨機獲取元素
Hash包含鍵值對的無序散列表新增、獲取、移除單個鍵值對;獲取所有鍵值對
Zset字串成員(member)與浮點數分值(score)之間的有序對映,元素的排列順序由分值的大小決定新增、獲取、刪除單個元素;根據分值範圍(range)或者成員來獲取元素
       所以,Zset結構正是我們想要的快取型別,我們把分數score用文章的主鍵news_id,把每個文章的的內容用json字串放入member裡,redis的資料會根據socre也就是news_id自動排序,我們只需要對redis進行新增、刪除的操作就行了(修改可以先刪除再新增)。最後還要考慮redis裡的資料定期刪除的問題,一般來說,設定快取的過期時間即可,但是設定過期時間是針對key來設定,這裡最好的解決方法就是限制快取的資料的個數,當資料的個數超過設定的限制個數之後,就是從score最低的值開始刪除即可。也就是score最低的值,也就是news_id按照自增長的規則,最小的news_id的資料就是比較早的資料,。

    補充說明一些Zset裡說資料不重複是指:如果新增一個數據裡的member如果快取裡存在,這個資料的socre和member就會覆蓋快取裡的資料,也就是說score是在資料裡會重複,而member在資料裡是不重複的。      

 來一張全部的邏輯圖:


   貼出一些主要程式碼:

一、kafka新增資料(keyNewsList為redis的key值,jm為文章的json資料格式,redisImpNewsListNum是從配置檔案裡取的redis大小的限值)

//建立zset格式的資料,scorenews_iddouble型,member為每個稿件的資料
double score = Double.parseDouble(jm.getString("news_id"));
redisTemplate.opsForZSet().removeRangeByScore(keyNewsList, score,score);
redisTemplate.opsForZSet().add(keyNewsList, jm.toJSONString(), score);
LOG.debug("rediskafka快取首次資料成功score:"+score+",key:"+keyNewsList+",member:"+jm.toString());
//測試
System.out.println("新增之後的個數" + redisTemplate.opsForZSet().zCard(keyNewsList));
//追加邏輯:限制keyNewsList的個數
String redisImpNewsListNum = ApplicationSetting.getProperty("redis.impnews.listNum");
if (StringUtils.isNotBlank(redisImpNewsListNum)){
    //配置檔案裡設定個數限制
Long redisImpNewsListNumLong=Long.parseLong(redisImpNewsListNum);
//keyNewsList的個數
Long keyNewsListSize=redisTemplate.opsForZSet().zCard(keyNewsList);
//如果keyNewsList的個數 超過 設定的限制的話,從socre最小的值開始刪除
if (keyNewsListSize > redisImpNewsListNumLong){
        redisTemplate.opsForZSet().removeRange(keyNewsList,0,keyNewsListSize-redisImpNewsListNumLong-1);
LOG.debug("redis裡的keyNewsList的個數:"+keyNewsListSize+",超過設定的限值redis.impnews.listNum:"+redisImpNewsListNumLong+",刪除超出的資料。");
}
}
System.out.println("刪除之後的個數" + redisTemplate.opsForZSet().zCard(keyNewsList));

二、kafka更新資料

//先刪除後新增
redisTemplate.opsForZSet().removeRangeByScore(keyNewsList, score,score);
Boolean aBoolean= redisTemplate.opsForZSet().add(keyNewsList, jo.toJSONString(), score);
if (aBoolean){
    LOG.debug("kafka更新資料,update成功,key:"+keyNewsList+",score:"+score+",member:"+jo.toJSONString());
}

三、kafka刪除資料

//通過score來刪除快取裡的資料
Double score = Double.parseDouble(data.getString("news_id"));
redisTemplate.opsForZSet().removeRangeByScore(keyNewsList, score,score);
LOG.debug("kafka刪除資料,直接delete成功,key:" + keyNewsList + ",score:" + score);

四、系統首次載入存入快取資料

BSPResponse bspRes = bspClient.getList("",
"topmaceco,topcptmkt,topmoney,topfxmkt,topbond,topcom", "1", "100", "","0","");
//String keyNewsList = "newsList_redis_*";
String keyNewsList = "newsList_redis_impNews";
LOG.info("redis首頁要聞請求bsp介面狀態:"+bspRes.getMessage());
if (bspRes.isSuccess()) {
    JSONArray ja = bspRes.getBodyResult().getJSONArray("LIST");
List<JSONObject> obj = new ArrayList<JSONObject>();
    if(null != ja && ja.size() > 0){
        for (int i = 0; i < ja.size(); i++) {
            JSONObject jm = (JSONObject) ja.get(i);
obj.add(jm);
}
    }
    if (obj != null&& obj.size()>0) {
        //清除所有
redisTemplate.opsForZSet().removeRange(keyNewsList,0,-1);
        for (int i = 0; i < obj.size(); i++) {
            JSONObject jm = obj.get(i);
String news_id = jm.getString("news_id");
String info_id = jm.getString("info_id");
//判斷資訊閱讀數是否應該增加
//is_NewReadertrue 為閱讀數增加1
String is_NewReader = "true";
String keyName_1 = "";
jm.put("is_newreader", is_NewReader);
jm.put("flag", "");
//介面有摘要(news_abst),作者(author), 正文length(data_content_size)、 可分享欄位(is_share)
            // 返回給終端的欄位有:摘要(news_abst),作者(author), 是否有正文(hasContent)、 是否可分享(isShare)
jm.put("isShare","0".equals(jm.getString("is_share"))?false:true);
jm.put("hasContent","0".equals(jm.getString("data_content_size"))? false:true);
jm.put("news_type", jm.getString("info_type")==null?"":jm.getString("info_type"));
//放入快取裡(防止資料重複,先刪除在新增)
Double score = Double.parseDouble(news_id);
redisTemplate.opsForZSet().removeRangeByScore(keyNewsList, score,score);
redisTemplate.opsForZSet().add(keyNewsList,jm.toJSONString(),score);
}
        LOG.info("redis首頁要聞快取:" + redisTemplate.opsForZSet().reverseRange(keyNewsList,0,-1));
}
} else {
    LOG.error("redis首頁要聞請求bsp介面返回失敗");
}

五、前臺呼叫系統後臺

說明:page_news_id是前臺傳遞到後臺的最小news_id,根據這值,我們可以定位到快取的資料位置,然後開始取多條資料。

舉例:注意在score 在redis裡是double型別

 score           member

44390         {“news_id”:44390,"title":............}

44389         {“news_id”:44389,"title":............}

44385         {“news_id”:44385,"title":............}

44378         {“news_id”:44378,"title":............}

44376         {“news_id”:44376,"title":............}

44374         {“news_id”:44374,"title":............}

44373         {“news_id”:44373,"title":............}

44372         {“news_id”:44372,"title":............}

44370         {“news_id”:44370,"title":............}

44369         {“news_id”:44369,"title":............}

44367         {“news_id”:44367,"title":............}

44365         {“news_id”:44365,"title":............}

....               ........

假如說前臺app展示資料已經到44376了,當他下拉資料呼叫後臺介面傳遞引數page_news_id=44376,pageSize=5,

那麼利用reverseRangeByscore(keyNewsList,0,pageScore,1,pageSize)方法,取到的資料就會按照score從大到小排序(RangeByscore是按照從小到大排序):

第一個引數 表示 keyNewsList是key,你要從哪個快取取數;

第二第三個引數 表示 0 pageScore 表示從socre範圍最小是0,最大是pageScore;

第四第五個引數 表示 你要從資料下標開始從1取到pageSize,你要取多個。如果從0開始就會把44376這條資料也會取出來,所以要從1開始取。

取出的結果就是如下資料:

44374         {“news_id”:44374,"title":............}

44373         {“news_id”:44373,"title":............}

44372         {“news_id”:44372,"title":............}

44370         {“news_id”:44370,"title":............}

44369         {“news_id”:44369,"title":............}


if(StringUtils.isNotBlank(page_news_id)){
    System.out.println("下滑分頁載入資料");
//下滑分頁載入資料
pageScore = Double.parseDouble(page_news_id);
System.out.println("pageScore = " + pageScore);
set = redisTemplate.opsForZSet().reverseRangeByScore(keyNewsList,0,pageScore,1,pageSize);
System.out.println("快取資料大小前"+set.size());
//快取裡沒有資料,則呼叫渠道整合介面向快取裡插入資料
if (set == null || set.size()<10){
        bspDataAddToRedis(user_id, classify_code, page_num, page_size,delay, page_news_id,keyNewsList,pageScore,pageSize);
set = redisTemplate.opsForZSet().reverseRangeByScore(keyNewsList,0,pageScore,1,pageSize);
System.out.println("快取資料大小後"+set.size());
}

} else {
    System.out.println("前臺首次載入和下拉重新整理最新資料");
set = redisTemplate.opsForZSet().reverseRange(keyNewsList,0,pageSize-1);
}

 參考資料:

https://www.jianshu.com/p/7bf5dc61ca06

https://www.cnblogs.com/knowledgesea/p/4999288.html

https://my.oschina.net/1107156537/blog/1617252