Java多執行緒:記憶體可見性
Java中對於volatile變數,通俗點說可以把它看做多執行緒之間分享的共享記憶體,可見性是立即的。
實際上它分成了兩部分,volatile write和volatile read。由於Unsafe提供了getXXXVolatile和putXXXVolatile介面。所以這樣一來Java中對於能夠共享的變數,至少有四種訪問方式:
普通寫、普通讀、putXXXVolatile、getXXXVolatile。
另一方面,像是陣列元素Object[] objs,我們僅能將objs宣告為volatile,而這樣的話對於其中的元素 objs[0]、objs[1]是完全沒有效用的,也就是說,兩種宣告方式:Object[] objs和volatile Object[] objs,對於其中的元素是一樣的。此種情況下只能使用Unsafe提供的介面來保證記憶體可見性。
所以此文來探索下類似於volatile寫 + 普通讀, 普通寫 + volatile讀, 這樣的情況下是不是真的無法保證可見性。
volatile寫 + 普通讀
首先來看一個簡單粗暴的例子我們往一個初始化為空的長度30000的Object[]中寫入資料。另一個執行緒早在寫入資料開始前就從下標0嘗試讀取資料,假如讀到的==null則進入while的迴圈。除非讀到了!=null,則列印資料:
在我的電腦上執行它的輸出僅有為:package com.psly.locksupprot; import com.psly.testatomic.UtilUnsafe; import sun.misc.Unsafe; public class TestVolatileSemantics2 { private static final Unsafe _unsafe = UtilUnsafe.getUnsafe(); private static final int _Obase = _unsafe.arrayBaseOffset(Object[].class); private static final int _Oscale = _unsafe.arrayIndexScale(Object[].class); private final static int N = 30000; private final static Object[] B = new Object[N+1]; private static class Node { public Node(int value){ this.value = value; } private int value; } public static void main(String[] args) throws InterruptedException { Thread writer = new Thread(new Runnable(){ @Override public void run() { for(int i = 1; i <= N; ++i){ _unsafe.putObjectVolatile(B, _Obase + i * _Oscale, new Node(1)); } System.out.println("Done"); } }); Thread reader = new Thread(new Runnable(){ public void run(){ for(int i = 1; i <= N; ++i){ while(B[i] == null){} System.out.println(((Node)B[i]).value + " " + i + " first reader"); } } } ); reader.start(); Thread.sleep(1000); writer.start(); } }
Done
並且始終佔據電腦的cpu資源。
儘管我們稍微修改下程式碼,修改讀執行緒的方式,採取遍歷整個資料組,如果不為null則輸出。那麼它看似是可以讀到的:
輸出為:package com.psly.locksupprot; import com.psly.testatomic.UtilUnsafe; import sun.misc.Unsafe; public class TestVolatileSemantics2 { private static final Unsafe _unsafe = UtilUnsafe.getUnsafe(); private static final int _Obase = _unsafe.arrayBaseOffset(Object[].class); private static final int _Oscale = _unsafe.arrayIndexScale(Object[].class); private final static int N = 30000; private final static Object[] B = new Object[N+1]; private static class Node { public Node(int value){ this.value = value; } private int value; } public static void main(String[] args) throws InterruptedException { Thread writer = new Thread(new Runnable(){ @Override public void run() { for(int i = 1; i <= N; ++i){ _unsafe.putObjectVolatile(B, _Obase + i * _Oscale, new Node(1)); } System.out.println("Done"); } }); Thread reader = new Thread(new Runnable(){ public void run(){ for(;;){ for(int i = 1; i <= N; ++i){ if(B[i] != null){ System.out.println(((Node)B[i]).value + " " + i + " first reader"); } } } } } ); reader.start(); Thread.sleep(1000); writer.start(); } }
1 23132 first reader
1 23133 first reader
1 23134 first reader
1 23135 first reader
1 23136 first reader
1 23137 first reader
1 23138 first reader
1 23139 first reader
1 23140 first reader
1 23141 first reader
1 23142 first reader
1 23143 first reader
1 23144 first reader
1 23145 first reader
1 23146 first reader
1 23147 first reader
1 23148 first reader
1 23149 first reader
1 23150 first reader
1 23151 first reader
1 23152 first reader
1 23153 first reader
1 23154 first reader
1 23155 first reader
1 23156 first reader
1 23157 first reader
1 23158 first reader
1 23159 first reader
1 23160 first reader
1 23161 first reader
1 23162 first reader
但是假如要證明一個規則成立,則必須確保所有符合假設的情況下都成立。而證明一件事情不成立,只需要舉一個例子。
所以根據之前的例子,volatile寫(compareAndSwapXXX也是一樣的) + 普通讀,無法保證後者取到更新後的資料。
(更正之前的說法,事實上是,這裡的B對應的讀取,因為編譯器的優化導致B[i]沒有讀到更新後的值。)
(事實上,只要volatile寫之後,無論怎麼讀都可以讀到更新後的值,只要編譯器不參與優化)(我的推測)
所以對於最上面的那個例子,我們採用在B前面新增volatile或者讀取使用volatile讀就可以解決了,程式碼如下:
package com.psly;
import sun.misc.Unsafe;
public class TestVolatileSemantics2 {
private static final Unsafe _unsafe = UtilUnsafe.getUnsafe();
private static final int _Obase = _unsafe.arrayBaseOffset(Object[].class);
private static final int _Oscale = _unsafe.arrayIndexScale(Object[].class);
private final static int N = 30000;
private volatile static Object[] B = new Object[N+1];
private static class Node {
public Node(int value){
this.value = value;
}
private int value;
}
public static void main(String[] args) throws InterruptedException {
Thread writer = new Thread(new Runnable(){
@Override
public void run() {
for(int i = 1; i <= N; ++i){
_unsafe.putObjectVolatile(B, _Obase + i * _Oscale, new Node(1));
}
System.out.println("Done");
}
});
Thread reader = new Thread(new Runnable(){
public void run(){
for(int i = 1; i <= N; ++i){
while(B[i] == null){}
System.out.println(((Node)B[i]).value + " " + i + " first reader"); }
}
}
);
reader.start();
Thread.sleep(1000);
writer.start();
}
}
package com.psly;
import sun.misc.Unsafe;
public class TestVolatileSemantics2 {
private static final Unsafe _unsafe = UtilUnsafe.getUnsafe();
private static final int _Obase = _unsafe.arrayBaseOffset(Object[].class);
private static final int _Oscale = _unsafe.arrayIndexScale(Object[].class);
private final static int N = 30000;
private final static Object[] B = new Object[N+1];
private static class Node {
public Node(int value){
this.value = value;
}
private int value;
}
public static void main(String[] args) throws InterruptedException {
Thread writer = new Thread(new Runnable(){
@Override
public void run() {
for(int i = 1; i <= N; ++i){
_unsafe.putObjectVolatile(B, _Obase + i * _Oscale, new Node(1));
}
System.out.println("Done");
}
});
Thread reader = new Thread(new Runnable(){
public void run(){
for(int i = 1; i <= N; ++i){
while(_unsafe.getObjectVolatile(B, _Obase + i * _Oscale) == null){}
System.out.println(((Node)B[i]).value + " " + i + " first reader"); }
}
}
);
reader.start();
Thread.sleep(1000);
writer.start();
}
}
普通寫 + volatile讀
我們也跟前面一樣舉一個反例。
但是在我構造的例子中,普通寫 + volatile讀都看似讀到了更新後的資料。但我們依然所以無法判斷究竟是否及時讀取到。
其中的Li4,critical section之前的c[i] :=false是寫入操作,c[j]為讀取操作,critical section之後的c[i]也是寫入操作。
好,我們用java實現這個演算法,第一個c[i]採用普通寫入,c[j]採用getIntVolatile讀入,後一個c[i]採用putIntVolatile寫入。
假設普通寫能夠被後面的volatile read讀取。那麼這裡一定能夠保證任意時刻只有一個執行緒處於critical section(根據演算法保證,可參考併發控制)。
我們在臨界區對變數+1,假如最後的值不符合預期,那麼就說明臨界區同時進入了不止一個執行緒,從而說明假設錯誤。
我們給出的程式碼如下:
package com.psly.testatomic;
import sun.misc.Unsafe;
public class TestVolatileDijkstraMethodWithNoBlock {
//用於記憶體保證:putXXVolatile/getXXVolatile
private static final Unsafe _unsafe = UtilUnsafe.getUnsafe();
private static final int _Obase = _unsafe.arrayBaseOffset(int[].class);
private static final int _Oscale = _unsafe.arrayIndexScale(int[].class);
//N:執行緒數,TIMES每個執行緒需要進入臨界區的次數。
private final static int N = 3;
private final static int TIMES = 1000000;
private final static int[] B = new int[N+1];
private final static int[] C = new int[N+1];
//每個執行緒進入臨界區++count,最終count == N * TIMES
private volatile static long count;
//k與上面的count欄位類似
private static int k = 1;
private final static Object kObj;
private final static long kOffset;
static{
for(int i = 1; i <= N; ++i){
B[i] = 1;
C[i] = 1;
}
try {
kObj = _unsafe.staticFieldBase(TestVolatileDijkstraMethodWithNoBlock.class.getDeclaredField("k"));
} catch (Exception e) {
// TODO Auto-generated catch block
throw new Error(e);//e.printStackTrace();
}
try {
kOffset = _unsafe.staticFieldOffset(TestVolatileDijkstraMethodWithNoBlock.class.getDeclaredField("k"));
} catch (Exception e) {
// TODO Auto-generated catch block
throw new Error(e);//e.printStackTrace();
}
}
final static void dijkstrasConcurMethod(int pM){
int times = TIMES;
int i = pM;
L0: for(;;){
B[i] = 0;
L1: for(;;){
if( k != i ) {
//C[i] = 1;
if(B[_unsafe.getIntVolatile(kObj, kOffset)] == 1)
_unsafe.putIntVolatile(kObj, kOffset, i);//k = i;//k = i;
continue L1;
} else{
C[i] = 0;
for(int j = 1; j <= N; ++j )
if(j != i && _unsafe.getIntVolatile(C, _Obase + j * _Oscale) == 0){
//將C[i]的值更新回去,寫這裡效率更高
_unsafe.putIntVolatile(C, _Obase + i * _Oscale, 1);
continue L1;
}
}
break L1;
}
//臨界區開始
++count;
//臨界區結束
_unsafe.putIntVolatile(C, _Obase + i * _Oscale, 1);
B[i]=1;
if( --times != 0){
continue L0; //goto L0;
}
return;
}
}
public static void main(String[] args) throws InterruptedException
{
//開始時間
long start = System.currentTimeMillis();
//列印累加器初始值
System.out.println( count + " initial\n");
Thread handle[] = new Thread[N+1];
//建立執行緒
for (int i = 1; i <= N; ++i){
int j = i;
handle[i] = new Thread(new Runnable(){
@Override
public void run(){
dijkstrasConcurMethod(j);
}
});
}
//執行緒開始執行
for (int i = 1; i <= N; ++i)
handle[i].start();
//主執行緒等待子執行緒結束
for (int i = 1; i <= N; ++i)
handle[i].join();
//列印累加值,== N * TIMES
System.out.println(count);
//列印程式執行時間
System.out.println((System.currentTimeMillis() - start) / 1000.0 + " milliseconds");
}
}
3個執行緒,每個100000次,最後總共應該是3000000(count)。執行結果是:
0 initial
2999599
0.178 milliseconds
這說明,普通寫 + volatile讀也是無法保證可見性的。
這裡改為volatile寫 + volatile讀,
_unsafe.putIntVolatile(C, _Obase + i * _Oscale, 0);//C[i] = 0;
就能夠得到正確結果:
0 initial
300000
0.023 milliseconds
所以我們的結論是,volatile寫 + 普通讀 和 普通寫 + volatile讀都無法保證可見性,請大家在需要及時看見共享記憶體更新的場景中統一採用volatile寫 + volatile讀。
以上未給出的UtilUnsafe如下:
package com.psly;
import java.lang.reflect.Field;
import sun.misc.Unsafe;
public class UtilUnsafe {
private UtilUnsafe() { } // dummy private constructor
/** Fetch the Unsafe. Use With Caution. */
public static Unsafe getUnsafe() {
// Not on bootclasspath
if( UtilUnsafe.class.getClassLoader() == null )
return Unsafe.getUnsafe();
try {
final Field fld = Unsafe.class.getDeclaredField("theUnsafe");
fld.setAccessible(true);
return (Unsafe) fld.get(UtilUnsafe.class);
} catch (Exception e) {
throw new RuntimeException("Could not obtain access to sun.misc.Unsafe", e);
}
}
}