1. 程式人生 > >Java 高併發快取與Guava Cache

Java 高併發快取與Guava Cache

一.背景

快取是我們在開發中為了提高系統的效能,把經常的訪問業務的資料第一次把處理結果先放到快取中,第二次就不用在對相同的業務資料在重新處理一遍,這樣就提高了系統的效能。快取分好幾種:

(1)本地快取。

(2)資料庫快取。

(3)分散式快取。

分散式快取比較常用的有memcached等,memcached是高效能的分散式記憶體快取伺服器,快取業務處理結果,減少資料庫訪問次數和相同複雜邏輯處理的時間,以提高動態Web應用的速度、 提高可擴充套件性

二.本地快取在高併發下的問題以及解決

今天我們介紹的是本地快取快取,我們這邊採用java.util.concurrent.ConcurrentHashMap

來儲存,ConcurrentHashMap是一個執行緒安全的HashTable,並提供了一組和HashTable功能相同但是執行緒安全的方法,ConcurrentHashMap可以做到讀取資料不加鎖,提高了併發能力。我們先不考慮記憶體元素回收或者在儲存資料會出現記憶體溢位的情況,我們用ConcurrentHashMap模擬本地快取,當在高併發環境一下,會出現一些什麼問題?

我們這邊採用實現多個執行緒來模擬高併發場景。

第一種:我們先來看一下程式碼:


[java] view plain copy print?
  1. public class TestConcurrentHashMapCache<K,V> {
  2. private final ConcurrentHashMap<K, V> cacheMap=new ConcurrentHashMap<K,V> ();
  3. public Object getCache(K keyValue,String ThreadName){
  4. System.out.println("ThreadName getCache=============="+ThreadName);
  5. Object value=null;
  6. //從快取獲取資料
  7. value=cacheMap.get(keyValue);
  8. //如果沒有的話,把資料放到快取
  9. if(value==null){
  10. return putCache(keyValue,ThreadName);
  11. }
  12. return value;
  13. }
  14. public Object putCache(K keyValue,String ThreadName){
  15. System.out.println("ThreadName 執行業務資料並返回處理結果的資料(訪問資料庫等)=============="+ThreadName);
  16. //可以根據業務從資料庫獲取等取得資料,這邊就模擬已經獲取資料了
  17. @SuppressWarnings("unchecked")
  18. V value=(V) "dataValue";
  19. //把資料放到快取
  20. cacheMap.put(keyValue, value);
  21. return value;
  22. }
  23. public static void main(String[] args) {
  24. final TestConcurrentHashMapCache<String,String> TestGuaVA=new TestConcurrentHashMapCache<String,String>();
  25. Thread t1=new Thread(new Runnable() {
  26. @Override
  27. public void run() {
  28. System.out.println("T1======start========");
  29. Object value=TestGuaVA.getCache("key","T1");
  30. System.out.println("T1 value=============="+value);
  31. System.out.println("T1======end========");
  32. }
  33. });
  34. Thread t2=new Thread(new Runnable() {
  35. @Override
  36. public void run() {
  37. System.out.println("T2======start========");
  38. Object value=TestGuaVA.getCache("key","T2");
  39. System.out.println("T2 value=============="+value);
  40. System.out.println("T2======end========");
  41. }
  42. });
  43. Thread t3=new Thread(new Runnable() {
  44. @Override
  45. public void run() {
  46. System.out.println("T3======start========");
  47. Object value=TestGuaVA.getCache("key","T3");
  48. System.out.println("T3 value=============="+value);
  49. System.out.println("T3======end========");
  50. }
  51. });
  52. t1.start();
  53. t2.start();
  54. t3.start();
  55. }
  56. }
public class TestConcurrentHashMapCache<K,V> {
	private final ConcurrentHashMap<K, V>  cacheMap=new ConcurrentHashMap<K,V> (); 
	
	public  Object getCache(K keyValue,String ThreadName){
		System.out.println("ThreadName getCache=============="+ThreadName);
		Object value=null;
		//從快取獲取資料
		value=cacheMap.get(keyValue);
		//如果沒有的話,把資料放到快取
		if(value==null){
			return putCache(keyValue,ThreadName);
		}
		return value;
	}
	
	public Object putCache(K keyValue,String ThreadName){
		System.out.println("ThreadName 執行業務資料並返回處理結果的資料(訪問資料庫等)=============="+ThreadName);
		//可以根據業務從資料庫獲取等取得資料,這邊就模擬已經獲取資料了
		@SuppressWarnings("unchecked")
		V value=(V) "dataValue";
		//把資料放到快取
		 cacheMap.put(keyValue, value);
		return value;
	}

	
	
	public static void main(String[] args) {
		final TestConcurrentHashMapCache<String,String> TestGuaVA=new TestConcurrentHashMapCache<String,String>();
		
		Thread t1=new Thread(new Runnable() {
			@Override
			public void run() {
				
				System.out.println("T1======start========");
			    Object value=TestGuaVA.getCache("key","T1");
			    System.out.println("T1 value=============="+value);
				System.out.println("T1======end========");
				
			}
		});
		
		Thread t2=new Thread(new Runnable() {
			@Override
			public void run() {
				System.out.println("T2======start========");
				Object value=TestGuaVA.getCache("key","T2");
			    System.out.println("T2 value=============="+value);
				System.out.println("T2======end========");
				
			}
		});
		
		Thread t3=new Thread(new Runnable() {
			@Override
			public void run() {
				System.out.println("T3======start========");
				Object value=TestGuaVA.getCache("key","T3");
			    System.out.println("T3 value=============="+value);
				System.out.println("T3======end========");
				
			}
		});
		
		t1.start();
		t2.start();
		t3.start();

	}

}
我們看一下執行結果,如圖所示:

我們實現了本地快取程式碼,我們執行一下結果,發現在多執行緒時,出現了在快取裡沒有快取時,會執行一樣執行多次的業務資料並返回處理的資料,我們分析一下出現這種情況的:

(1)當執行緒T1訪問cacheMap裡面有沒有,這時根據業務到後臺處理業務資料並返回處理資料,並放入快取

(2)當執行緒T2訪問cacheMap裡面同樣也沒有,也把根據業務到後臺處理業務資料並返回處理資料,並放入快取

第二種:

這樣相同的業務並處理兩遍,如果在高併發的情況下相同的業務不止執行兩遍,這樣這樣跟我們當初做快取不相符合,這時我們想到了Java多執行緒時,在執行獲取快取上加上Synchronized,程式碼如下:

[java] view plain copy print?
  1. public class TestConcurrentHashMapCache<K,V> {
  2. private final ConcurrentHashMap<K, V> cacheMap=new ConcurrentHashMap<K,V> ();
  3. public <span style="color:#ff0000;">synchronized </span>Object getCache(K keyValue,String ThreadName){
  4. System.out.println("ThreadName getCache=============="+ThreadName);
  5. Object value=null;
  6. //從快取獲取資料
  7. value=cacheMap.get(keyValue);
  8. //如果沒有的話,把資料放到快取
  9. if(value==null){
  10. return putCache(keyValue,ThreadName);
  11. }
  12. return value;
  13. }
  14. public Object putCache(K keyValue,String ThreadName){
  15. System.out.println("ThreadName 執行業務資料並返回處理結果的資料(訪問資料庫等)=============="+ThreadName);
  16. //可以根據業務從資料庫獲取等取得資料,這邊就模擬已經獲取資料了
  17. @SuppressWarnings("unchecked")
  18. V value=(V) "dataValue";
  19. //把資料放到快取
  20. cacheMap.put(keyValue, value);
  21. return value;
  22. }
  23. public static void main(String[] args) {
  24. final TestConcurrentHashMapCache<String,String> TestGuaVA=new TestConcurrentHashMapCache<String,String>();
  25. Thread t1=new Thread(new Runnable() {
  26. @Override
  27. public void run() {
  28. System.out.println("T1======start========");
  29. Object value=TestGuaVA.getCache("key","T1");
  30. System.out.println("T1 value=============="+value);
  31. System.out.println("T1======end========");
  32. }
  33. });
  34. Thread t2=new Thread(new Runnable() {
  35. @Override
  36. public void run() {
  37. System.out.println("T2======start========");
  38. Object value=TestGuaVA.getCache("key","T2");
  39. System.out.println("T2 value=============="+value);
  40. System.out.println("T2======end========");
  41. }
  42. });
  43. Thread t3=new Thread(new Runnable() {
  44. @Override
  45. public void run() {
  46. System.out.println("T3======start========");
  47. Object value=TestGuaVA.getCache("key","T3");
  48. System.out.println("T3 value=============="+value);
  49. System.out.println("T3======end========");
  50. }
  51. });
  52. t1.start();
  53. t2.start();
  54. t3.start();
  55. }
  56. }
public class TestConcurrentHashMapCache<K,V> {
	private final ConcurrentHashMap<K, V>  cacheMap=new ConcurrentHashMap<K,V> (); 
	
	public <span style="color:#ff0000;">synchronized </span>Object getCache(K keyValue,String ThreadName){
		System.out.println("ThreadName getCache=============="+ThreadName);
		Object value=null;
		//從快取獲取資料
		value=cacheMap.get(keyValue);
		//如果沒有的話,把資料放到快取
		if(value==null){
			return putCache(keyValue,ThreadName);
		}
		return value;
	}
	
	public Object putCache(K keyValue,String ThreadName){
		System.out.println("ThreadName 執行業務資料並返回處理結果的資料(訪問資料庫等)=============="+ThreadName);
		//可以根據業務從資料庫獲取等取得資料,這邊就模擬已經獲取資料了
		@SuppressWarnings("unchecked")
		V value=(V) "dataValue";
		//把資料放到快取
		 cacheMap.put(keyValue, value);
		return value;
	}

	
	
	public static void main(String[] args) {
		final TestConcurrentHashMapCache<String,String> TestGuaVA=new TestConcurrentHashMapCache<String,String>();
		
		Thread t1=new Thread(new Runnable() {
			@Override
			public void run() {
				
				System.out.println("T1======start========");
			    Object value=TestGuaVA.getCache("key","T1");
			    System.out.println("T1 value=============="+value);
				System.out.println("T1======end========");
				
			}
		});
		
		Thread t2=new Thread(new Runnable() {
			@Override
			public void run() {
				System.out.println("T2======start========");
				Object value=TestGuaVA.getCache("key","T2");
			    System.out.println("T2 value=============="+value);
				System.out.println("T2======end========");
				
			}
		});
		
		Thread t3=new Thread(new Runnable() {
			@Override
			public void run() {
				System.out.println("T3======start========");
				Object value=TestGuaVA.getCache("key","T3");
			    System.out.println("T3 value=============="+value);
				System.out.println("T3======end========");
				
			}
		});
		
		t1.start();
		t2.start();
		t3.start();

	}

}
執行結果,如圖所示:

這樣就實現了序列,在高併發行時,就不會出現了第二個訪問相同業務,肯定是從快取獲取,但是加上Synchronized變成序列,這樣在高併發行時效能也下降了。

第三種:

我們為了實現效能和快取的結果,我們採用Future,因為Future在計算完成時獲取,否則會一直阻塞直到任務轉入完成狀態和ConcurrentHashMap.putIfAbsent方法,程式碼如下:

[java] view plain copy print?
  1. public class TestFutureCahe<K,V> {
  2. private final ConcurrentHashMap<K, Future<V>> cacheMap=new ConcurrentHashMap<K, Future<V>> ();
  3. public Object getCache(K keyValue,String ThreadName){
  4. Future<V> value=null;
  5. try{
  6. System.out.println("ThreadName getCache=============="+ThreadName);
  7. //從快取獲取資料
  8. value=cacheMap.get(keyValue);
  9. //如果沒有的話,把資料放到快取
  10. if(value==null){
  11. value= putCache(keyValue,ThreadName);
  12. return value.get();
  13. }
  14. return value.get();
  15. }catch (Exception e) {
  16. }
  17. return null;
  18. }
  19. public Future<V> putCache(K keyValue,final String ThreadName){
  20. // //把資料放到快取
  21. Future<V> value=null;
  22. Callable<V> callable=new Callable<V>() {
  23. @SuppressWarnings("unchecked")
  24. @Override
  25. public V call() throws Exception {
  26. //可以根據業務從資料庫獲取等取得資料,這邊就模擬已經獲取資料了
  27. System.out.println("ThreadName 執行業務資料並返回處理結果的資料(訪問資料庫等)=============="+ThreadName);
  28. return (V) "dataValue";
  29. }
  30. };
  31. FutureTask<V> futureTask=new FutureTask<V>(callable);
  32. value=cacheMap.putIfAbsent(keyValue, futureTask);
  33. if(value==null){
  34. value=futureTask;
  35. futureTask.run();
  36. }
  37. return value;
  38. }
  39. public static void main(String[] args) {
  40. final TestFutureCahe<String,String> TestGuaVA=new TestFutureCahe<String,String>();
  41. Thread t1=new Thread(new Runnable() {
  42. @Override
  43. public void run() {
  44. System.out.println("T1======start========");
  45. Object value=TestGuaVA.getCache("key","T1");
  46. System.out.println("T1 value=============="+value);
  47. System.out.println("T1======end========");
  48. }
  49. });
  50. Thread t2=new Thread(new Runnable() {
  51. @Override
  52. public void run() {
  53. System.out.println("T2======start========");
  54. Object value=TestGuaVA.getCache("key","T2");
  55. System.out.println("T2 value=============="+value);
  56. System.out.println("T2======end========");
  57. }
  58. });
  59. Thread t3=new Thread(new Runnable() {
  60. @Override
  61. public void run() {
  62. System.out.println("T3======start========");
  63. Object value=TestGuaVA.getCache("key","T3");
  64. System.out.println("T3 value=============="+value);
  65. System.out.println("T3======end========");
  66. }
  67. });
  68. t1.start();
  69. t2.start();
  70. t3.start();
  71. }
  72. }
public class TestFutureCahe<K,V> {
	private final ConcurrentHashMap<K, Future<V>>  cacheMap=new ConcurrentHashMap<K, Future<V>> (); 
	
	public   Object getCache(K keyValue,String ThreadName){
		Future<V> value=null;
		try{
			System.out.println("ThreadName getCache=============="+ThreadName);
			//從快取獲取資料
			value=cacheMap.get(keyValue);
			//如果沒有的話,把資料放到快取
			if(value==null){
				value= putCache(keyValue,ThreadName);
				return value.get();
			}
			return value.get();
				
		}catch (Exception e) {
		}
		return null;
	}
	
	
	public Future<V> putCache(K keyValue,final String ThreadName){
//		//把資料放到快取
		Future<V> value=null;
		Callable<V> callable=new Callable<V>() {
				@SuppressWarnings("unchecked")
				@Override
				public V call() throws Exception {
					//可以根據業務從資料庫獲取等取得資料,這邊就模擬已經獲取資料了
					System.out.println("ThreadName 執行業務資料並返回處理結果的資料(訪問資料庫等)=============="+ThreadName);
					return (V) "dataValue";
				}
			};
			FutureTask<V> futureTask=new FutureTask<V>(callable);
			value=cacheMap.putIfAbsent(keyValue, futureTask);
			if(value==null){
				value=futureTask;
				futureTask.run();
			}
		return value;
	}
	
	

	
	
	public static void main(String[] args) {
		final TestFutureCahe<String,String> TestGuaVA=new TestFutureCahe<String,String>();
		
		Thread t1=new Thread(new Runnable() {
			@Override
			public void run() {
				
				System.out.println("T1======start========");
				Object value=TestGuaVA.getCache("key","T1");
				System.out.println("T1 value=============="+value);
				System.out.println("T1======end========");
				
			}
		});
		
		Thread t2=new Thread(new Runnable() {
			@Override
			public void run() {
				System.out.println("T2======start========");
				Object value=TestGuaVA.getCache("key","T2");
				System.out.println("T2 value=============="+value);
				System.out.println("T2======end========");
				
			}
		});
		
		Thread t3=new Thread(new Runnable() {
			@Override
			public void run() {
				System.out.println("T3======start========");
				Object value=TestGuaVA.getCache("key","T3");
				System.out.println("T3 value=============="+value);
				System.out.println("T3======end========");
				
			}
		});
		
		t1.start();
		t2.start();
		t3.start();

	}

}

執行緒T1或者執行緒T2訪問cacheMap,如果都沒有時,這時執行了FutureTask來完成非同步任務,假如執行緒T1執行了FutureTask,並把儲存到ConcurrentHashMap中,通過PutIfAbsent方法,因為putIfAbsent方法如果不存在key對應的值,則將valuekey加入Map,否則返回key對應的舊值。這時執行緒T2進來時可以獲取Future物件,如果沒值沒關係,這時是物件的引用,等FutureTask執行完,在通過get返回。

我們問題解決了高併發訪問快取的問題,可以回收元素這些,都沒有,容易造成記憶體溢位,Google Guava Cache在這些問題方面都做得挺好的,接下來我們介紹一下。

三.Google Guava Cache的介紹和應用



http://www.java2s.com/Code/Jar/g/Downloadguava1401jar.htm 下載對應的jar包

Guava CacheConcurrentMap很相似,Guava Cache能設定回收,能解決在大資料記憶體溢位的問題,原始碼如下:



public class TestGuaVA<K,V> {
private Cache<K, V> cache= CacheBuilder.newBuilder() .maximumSize(2).expireAfterWrite(10, TimeUnit.MINUTES).build();
public Object getCache(K keyValue,final String ThreadName){
Object value=null;
try {
System.out.println("ThreadName getCache=============="+ThreadName);
//從快取獲取資料
value = cache.get(keyValue, new Callable<V>() {
@SuppressWarnings("unchecked")
public V call() {
System.out.println("ThreadName 執行業務資料並返回處理結果的資料(訪問資料庫等)=============="+ThreadName);
return (V) "dataValue";
}
});
} catch (ExecutionException e) {
e.printStackTrace();
}
return value;
}





public static void main(String[] args) {
final TestGuaVA<String,String> TestGuaVA=new TestGuaVA<String,String>();


Thread t1=new Thread(new Runnable() {
@Override
public void run() {

System.out.println("T1======start========");
Object value=TestGuaVA.getCache("key","T1");
System.out.println("T1 value=============="+value);
System.out.println("T1======end========");

}
});

Thread t2=new Thread(new Runnable() {
@Override
public void run() {
System.out.println("T2======start========");
Object value=TestGuaVA.getCache("key","T2");
System.out.println("T2 value=============="+value);
System.out.println("T2======end========");

}
});

Thread t3=new Thread(new Runnable() {
@Override
public void run() {
System.out.println("T3======start========");
Object value=TestGuaVA.getCache("key","T3");
System.out.println("T3 value=============="+value);
System.out.println("T3======end========");

}
});

t1.start();
t2.start();
t3.start();


}


}


說明:

CacheBuilder.newBuilder()後面能帶一些設定回收的方法:

1maximumSize(long):設定容量大小,超過就開始回收。

2expireAfterAccess(long, TimeUnit):在這個時間段內沒有被讀/寫訪問,就會被回收。

3expireAfterWrite(long, TimeUnit):在這個時間段內沒有被寫訪問,就會被回收

(4)removalListener(RemovalListener):監聽事件,在元素被刪除時,進行監聽。

執行結果,如圖所示: