1. 程式人生 > >nginx+lua+strom+kafka實現熱點資料實時展示

nginx+lua+strom+kafka實現熱點資料實時展示

假設最近一段時間內有10萬個請求過來,其中1萬次請求訪問商品1,2萬次請求訪問商品2,3萬次請求訪問商品3,其他商品被訪問次數均勻。請實時統計熱點資料展示出來。

說下主要的解決步驟:

1.分發層nginx將所有的請求hash分發到對應的應用層nginx,防止第一次請求落到了應用層nginx1,訪問資料庫,加了點快取,第二次請求落到了nginx2,快取沒有,照樣訪問資料庫了,因此,需要保證productid=1的商品請求只會落到對應的應用層nginx上處理。

2.需要三級快取架構,應用層nginx本地快取+ 快取生產服務(redis快取+enchache快取)

3.資料庫修改後,需要傳送一條資料修改訊息給佇列,快取生產服務監聽到這個訊息,會自主的更新快取

4.使用者請求到達應用層nginx後,即使快取被命中了,也需要統計出來使用者的這次訪問請求吧,因此,每請求一次就需要用nginx+lua向kafka傳送一條自定義的訊息,用於後續請求處理。

5.strom從kafka佇列消費訊息,並統計出來每個商品的訪問次數,並統計出來前5名的商品productid,將productidlist

的資訊分段儲存在zookeeper中。

6.在做熱儲備的時候,需要從zookeeper中獲取分散式鎖,讀取節點資訊,寫入到快取生產服務中去。

1.首先是請求分發,需要nginx+lua指令碼,分發nginx在收到請求的時候,計算出hash值,傳送一個http請求到應用層上去處理。

這句話(local uri_args = ngx.req.get_uri_args())的意思是:

拿到url請求地址的引數

local uri_args = ngx.req.get_uri_args()

local productId = uri_args["productId"]

定義我們需要分發的應用層伺服器ip地址,對商品id進行取模,得到選用的伺服器的ip地址,字串拼接使用..

注意:2不是寫死的,代表的是備用伺服器ip地址的個數,hash%2不是0就是1,那麼加1得到的就是1,2,lua指令碼中的local host不同於陣列下標是從0開始的

local host = {"192.168.31.19", "192.168.31.187"}

local hash = ngx.crc32_long(productId)

local index = (hash % 2) + 1

local backend = "http://"..host[index]

對於method的處理,requestPath是不帶引數也不帶host的請求地址,/hello,請求拼接就是requestPath+引數

local requestPath = uri_args["requestPath"]

requestPath = "/"..requestPath.."?productId="..productId

因為需要從nginx嚮應用nginxf傳送一個http請求,因此需要引用我們在lua安裝包後又手動加入的http包,建立一個http請求

local http = require("resty.http")

local httpc = http.new()

建立一個get的http請求,指明機器地址和請求

local resp, err = httpc:request_uri(backend, {

method = "GET",

path = requestPath

})

如果沒有響應,返回請求錯誤,否則返回請求結果,關閉http連線

if not resp then

ngx.say("request error :", err)

return

end

ngx.say(resp.body)

httpc:close()

2.需要三級快取架構

3.資料庫修改後,快取資料生產服務自動更新

應用層nginx收到請求後,先找本地nginx快取,再去生產服務查快取,資料生產服務就是一個redis+encache的jar包,加到專案中可以直接使用快取。應用層nginx找本地快取,再找生產服務那塊用lua指令碼處理。

#得到請求的引數

local uri_args = ngx.req.get_uri_args()

local productId = uri_args["productId"]

local shopId = uri_args["shopId"]

#得到nginx的本地共享快取,my_cache就是我們定義的nginx快取塊的名字

local cache_ngx = ngx.shared.my_cache

#定義在nginx儲存的key的名字

local productCacheKey = "product_info_"..productId

local shopCacheKey = "shop_info_"..shopId

#試圖首次查詢快取,為空

local productCache = cache_ngx:get(productCacheKey)

local shopCache = cache_ngx:get(shopCacheKey)

#如果沒有找到本地快取,那麼建立一個http請求,傳送一個http請求,179是對應的快取生產服務所在的機器的地址,

#path表示實際快取生產服務的請求路徑地址,然後得到返回結果,將結果儲存到本地本地快取(cache_ngx)中,指定時間10分鐘

if productCache == "" or productCache == nil then

local http = require("resty.http")

local httpc = http.new()

local resp, err = httpc:request_uri("http://192.168.31.179:8080",{

method = "GET",

path = "/getProductInfo?productId="..productId

})

productCache = resp.body

cache_ngx:set(productCacheKey, productCache, 10 * 60)

end

if shopCache == "" or shopCache == nil then

local http = require("resty.http")

local httpc = http.new()

local resp, err = httpc:request_uri("http://192.168.31.179:8080",{

method = "GET",

path = "/getShopInfo?shopId="..shopId

})

shopCache = resp.body

cache_ngx:set(shopCacheKey, shopCache, 10 * 60)

end

#得到的是json字串,我們把字串變成son物件

local cjson = require("cjson")

local productCacheJSON = cjson.decode(productCache)

local shopCacheJSON = cjson.decode(shopCache)

#定義個物件,賦值json物件應該顯示到頁面上的內容

local context = {

productId = productCacheJSON.id,

productName = productCacheJSON.name,

productPrice = productCacheJSON.price,

productPictureList = productCacheJSON.pictureList,

productSpecification = productCacheJSON.specification,

productService = productCacheJSON.service,

productColor = productCacheJSON.color,

productSize = productCacheJSON.size,

shopId = shopCacheJSON.id,

shopName = shopCacheJSON.name,

shopLevel = shopCacheJSON.level,

shopGoodCommentRate = shopCacheJSON.goodCommentRate

}

#建立模板物件,渲染到html頁面中,(resty.template不是模板檔案位置,而是建立所需的模板jar包的位置)

#product.html是存在的模板檔案

local template = require("resty.template")

template.render("product.html", context)

4.每收到一條請求就用nginx+lua向kafka傳送一條自定義的訊息,用於後續請求處理

#建立kafka傳送者也就是需要lua安裝包下的那個kafa-lua互動的jar包

local cjson = require("cjson")

local producer = require("resty.kafka.producer")

#寫死nginx地址列表

local broker_list = {

{ host = "192.168.31.187", port = 9092 },

{ host = "192.168.31.19", port = 9092 },

{ host = "192.168.31.227", port = 9092 }

}

#自定義需要傳送給kafka的訊息,json字串

local log_json = {}

log_json["headers"] = ngx.req.get_headers()

log_json["uri_args"] = ngx.req.get_uri_args()

log_json["body"] = ngx.req.read_body()

log_json["http_version"] = ngx.req.http_version()

log_json["method"] =ngx.req.get_method()

log_json["raw_reader"] = ngx.req.raw_header()

log_json["body_data"] = ngx.req.get_body_data()

local message = cjson.encode(log_json);

local productId = ngx.req.get_uri_args()["productId"]

#非同步傳送訊息,確保根據productid進行機器對應分割槽傳送

local async_producer = producer:new(broker_list, { producer_type = "async" })

local ok, err = async_producer:send("access-log", productId, message)

if not ok then

ngx.log(ngx.ERR, "kafka send err:", err)

return

end

5.strom從kafka佇列消費訊息,並統計出來每個商品的訪問次數

strom設計到幾個概念,spout-bolt-wroker-task-拓撲,基礎版本的拓撲是這麼來的,比如一個問題,實時統計單詞出現次數

5.1spout不斷模擬傳送資料(句子),並轉發給bolt

5.2 bolt收到資料,把資料拆成單詞,並把每個單詞都轉發給下一個bolt

5.3 計數的blot收到單詞後開始統計單詞數量

5.4 拓撲結構的建立和main測試方法

那麼,換到這個功能也是一樣的,一個spout不斷從kafka消費自定義的訊息,表示有個商戶請求過來,訪問的是商品幾的訊息,並轉發給下一個bolt

一個bolt解析這個訊息,獲取訊息中的productid並轉發給下一個bolt

一個bolt獲取到productid,並且計數,將資料放到lrumap中,這個map如果你初始化為100,那麼只會儲存最近請求的100種商品,注意不是100次請求,map計數value是累加的

6.統計出前N名的商品id,然後分散式儲存到zk中,意思也就是和taskid有關,你設定了拓撲結構中最多有2個task,

儲存在zk中的taskidlist就會是5,4 ,每個task都會計算一個前N的排名,然後放到zk對應的task節點上

7從zookeeper中獲取分散式鎖的內容放到快取生產服務中

這裡主要是獲取zk資料狀態鎖,taskid鎖的雙鎖後往redis中插入資料。