1. 程式人生 > >Storm 的流量控制和多執行緒併發處理

Storm 的流量控制和多執行緒併發處理

  • 面臨問題

storm多執行緒的時候,會遇到併發修改的問題,會報concurrentModificationException,如下圖所示
這裡寫圖片描述

  • 解決方法
    • 第一種治標不治本的方法:

一方面,對傳送到kafka的資料進行控制,將執行緒sleep的時間變長

if(count==18000)
            {
                try {
                    Thread.sleep(1000);
                    time++;
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
e.printStackTrace(); } count=0; }

另一方面,可以從處理的執行緒入手,即不允許併發修改(單執行緒不會報這種錯誤);
可以將執行緒數設低,控制比較快的處理,即前一部分的資料處理過快,導致後面資料處理成為瓶頸,也就是要找到一個平衡點。

  • 第二種網路上的其他人寫的方法(試過,都不管用):
    1.hasNext()是不會丟擲ConcurrentModificationException的,將next()換成hasNext(),但是還是會用到next()對ArrayList進行遍歷
    2.使用併發容器CopyOnWriteArrayList代替ArrayList
    下面是原先程式碼。
public static void modify(ArrayList<frequent> afr, double count,String time, PatternTree tree) 
{
    for (frequent fr : afr) 
        {
            ArrayList<String> item = fr.getItem();
            double value = fr.getCount();
        }
}

做了如下修改

public static void modify(List<frequent> afr, double
count,String time, PatternTree tree) { for(Iterator it = afr.iterator(); it.hasNext();) { List<String> item=new CopyOnWriteArrayList<String>(); item = ((frequent) it.next()).getItem(); double value = ((frequent) it.next()).getCount(); } }

最終還是報這個錯,不管是修改遍歷方法,還是換成CopyOnWriteArrayList,當一個執行緒執行modify方法的時候,另一個執行緒修改了modify中的afr,就會丟擲ConcurrentModificationException。

  • 最終方案
    將ConcurrentModificationException的異常catch住,這樣導致了傳送資料會fail掉,storm有出錯重發的機制,所以會不斷重發,拖慢了處理速度,在後面的部落格中,我會介紹storm的Qos的方法,減少fail的資料,加快處理的速度。