1. 程式人生 > >由單執行緒到多執行緒生產消費模式的程式碼改造歷程

由單執行緒到多執行緒生產消費模式的程式碼改造歷程

最近很長時間沒更新部落格了,主要是最近專案上線,加班比較多,同時自己還得拿出一部分時間在網上找找資料學學新東西,也就沒有額外的時間寫部落格了。

好了 進入正題,這篇部落格主要是記錄我在專案上線期間,由於專案上線之後需要批量獲取現場終端裝置狀態(終端裝置數為1500萬左右,頁面上點的話不能滿足需求,一次只能召測數百),因此我用公司原有的一套程式碼去實現這個召測任務——由於程式碼我不可能貼出來,且我主要是分享我改造的併發思路,因此我將自己編寫一些事例或虛擬碼用於幫助讀者理解!

這套程式碼主要包含兩個方法,一個就是發起召測的send方法,用於告訴終端我要你的資料,你得給我返回來並放到我指定的快取中存著,send方法返回一個token作為下一個方法獲取結果的依據(該token是非同步返回的,這樣會提高程式碼執行效率)。第二個方法為getResult,即根據send方法返回的token獲取快取中的結果。

第一天,由於是試召,公司要求召測的數量只有30萬不到,所有召測必須在2小時之內執行完畢,2小時即為執行時間上限。第一天任務很簡單,直接用公司原有的程式碼就搞定。

第二天,數量增加到50萬,此時我便發現,兩小時好像執行不完了——由於設計硬體裝置,所以這裡並不能簡單的以為就是發50萬資料便了,還需要考慮召測的成功率,終端響應極限等其它因數。這個時候我便開始看原始碼,發現這段程式碼一共開啟了兩個執行緒,一個執行緒用於執行send,另一個執行緒用於執行get方法,從他的程式碼我看出來,他的本意是兩個執行緒共用同一把鎖,通過wait和notify的通訊機制,控制send方法每執行5000次,並將當前執行緒阻塞,並喚醒另一個執行get方法的執行緒,當get執行完成又喚醒執行send方法執行緒,不過由於程式碼中將字串常量作為了鎖物件,因此實際上加鎖失敗,然而原始碼中便是通過sleep()代替了鎖機制,即每執行5000條執行緒便會睡眠固定時間,然而事實上是,方法執行的時間其實並不固定,大部分根本等不了這麼長時間,因此浪費了大量時間,

原始碼:

更改前虛擬碼:
執行緒A {
  run(){
	while(true){
		String token =send();
		arryList.add(token);
		if(sendTimes == 5000 ){
		  Thread.sleep(1000*n);
		}
	}
  }
}


執行緒B {
  run(){
	while(true){
		if(arryList 為空 ){
		  Thread.sleep(1000*n)
		}
	}
  }
}

更改之後虛擬碼:

鎖使用同一把類鎖 建立一個類Clazz

執行緒A {
  run(){
	while(true){
		synchronized(Clazz.class){
			String token =send();
			arryList.add(token);
			if(sendTimes == 5000 ){

			 // Thread.sleep(1000*n);
			  Clazz.class.notify();
			  Clazz.class.wait();
			}
		}
		
	}
  }
}

執行緒B {
  run(){
	while(true){
		synchronized(Clazz.class){
			if(當 arryList 為空 ){
			  //Thread.sleep(1000*n)
			  Clazz.class.notify();
			  Clazz.class.wait();
			}
		}
	}
  }
}

第三天,需要對所有終端實現兩小時內所有資料的召測,這個時候不用想,兩個執行緒絕對不夠,因此我打算開多執行緒了,由於send的返回值需要作為get方法的查詢依據 ,因此我將send方法返回的token存到了一個併發阻塞佇列LinkedBlockingQueue中,使用生產消費模式實現。

ConcurrentLinkedQueue<String> queue= new ConcurrentLinkedQueue<String>();

執行緒A {
  run(){
	while(true){
		synchronized(Clazz.class){
	                if : 當無再召測的終端時countDownLatch.countDown()並break退出迴圈
                        else : String token =send();
			將token儲存到佇列中queue.put(token);
			

		}
		
	}
  }
}

執行緒B {
  run(){
	while(true){
		String  token = queue.take();
                if : 當token為空時countDownLatch.countDown()並break退出迴圈
		執行獲取結果方法get(token);
			
		
	}
  }
}
//主方法的虛擬碼
main(){
    伺服器有16個可用執行緒,主方法通過countDownLatch用於判斷執行緒是否執行,可以通過結束時間判斷並輸出任務執行時間等
	for(迴圈16次){
		執行緒A.start()
		執行緒B.start()
	}
      當任務還在執行的時候主執行緒阻塞在此處:this.countDownLatch.await(); 
      關閉資源,統計任務執行時間等後續操作
		
}