1. 程式人生 > >實戰Java高併發程式設計(3.1同步控制)

實戰Java高併發程式設計(3.1同步控制)

3.1重入鎖

重入鎖使用java.util.concurrent.locks.ReentrantLock來實現

public class Test implements Runnable {

    public static ReentrantLock lock = new ReentrantLock();
    public static int i = 0;

    @Override
    public void run() {
        for (int j = 0; j < 1000000; j++) {
                lock.lock();
                try {
                    i++;
                }finally {
                    lock.unlock();
                }
        }
    }

    public static void main(String[] args) throws Exception {
        Test t1 = new Test();
        Thread thread1 = new Thread(t1);
        Thread thread2 = new Thread(t1);
        thread1.start();
        thread2.start();
        thread1.join();
        thread2.join();
        System.out.println(i);
    }
}

重入鎖與synchronized相比有明顯的操作過程,開發人員必須指定何時加鎖,何時解鎖。同時,重入鎖是可以反覆新增的。一個縣城可以連續兩次獲得同一把鎖。

    lock.lock();
    lock.lock();
    try {
        i++;
        } finally {
            lock.unlock();
            lock.unlock();
        }

3.1.1重入鎖可以被中斷

public class Test implements Runnable {

    public static ReentrantLock lock1 = new ReentrantLock();
    public static ReentrantLock lock2 = new ReentrantLock();
    int lock;

    /**
     * 控制加鎖順序,方便構成死鎖
     *
     * @param lock
     */
    public Test(int lock) {
        this.lock = lock;
    }

    @Override
    public void run() {
        try {
            if (lock == 1) {
                lock1.lockInterruptibly();  // 以可以響應中斷的方式加鎖
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                }
                lock2.lockInterruptibly();
            } else {
                lock2.lockInterruptibly();  // 以可以響應中斷的方式加鎖
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                }
                lock1.lockInterruptibly();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            if (lock1.isHeldByCurrentThread()) {
                lock1.unlock();
            }
            if (lock2.isHeldByCurrentThread()) {
                lock2.unlock();
            }
            System.err.println(Thread.currentThread().getId() + "退出!");
        }
    }

    public static void main(String[] args) throws Exception {
        Test t1 = new Test(1);
        Test t2 = new Test(2);
        Thread thread1 = new Thread(t1);
        Thread thread2 = new Thread(t2);
        System.out.println("thread1: "+thread1.getId());
        System.out.println("thread2: "+thread2.getId());
        thread1.start();
        thread2.start();
        Thread.sleep(1000);
        thread2.interrupt();//③給t2執行緒狀態標記為中斷
    }
}

t1、t2執行緒開始執行時,會分別持有lock1和lock2而請求lock2和lock1,這樣就發生了死鎖。但是,在③處給t2執行緒狀態標記為中斷後,持有重入鎖lock2的執行緒t2會響應中斷,並不再繼續等待lock1,同時釋放了其原本持有的lock2,這樣t1獲取到了lock2,正常執行完成。t2也會退出,但只是釋放了資源並沒有完成工作。

3.1.2鎖申請等待限時

可以使用 tryLock()或者tryLock(long timeout, TimeUtil unit) 方法進行一次限時的鎖等待。

前者不帶引數,這時執行緒嘗試獲取鎖,如果獲取到鎖則繼續執行,如果鎖被其他執行緒持有,則立即返回 false ,也就是不會使當前執行緒等待,所以不會產生死鎖。
後者帶有引數,表示在指定時長內獲取到鎖則繼續執行,如果等待指定時長後還沒有獲取到鎖則返回false。

public class Test implements Runnable {
    public static ReentrantLock lock = new ReentrantLock();

    @Override
    public void run() {
        try {
            if (lock.tryLock(5, TimeUnit.SECONDS)){
                Thread.sleep(6000);
            }else {
                System.out.println("get lock failed");
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            if (lock.isHeldByCurrentThread()){
                lock.unlock();
            }
        }
    }

    public static void main(String[] args) {
        Test test = new Test();
        Thread t1 = new Thread(test);
        Thread t2 = new Thread(test);
        t1.start();
        t2.start();
    }
}

tryLock()方法接受的兩個引數,一個表示等待時長,另一個表示計時單位。在本例中,由於佔用鎖的程序持有鎖長達6秒,另一個執行緒無法再5秒的等待時間中得到鎖,會返回false。

3.2公平鎖

在大多數情況下,鎖的申請並不是公平的,先申請的執行緒並不一定在鎖可用時先使用。要實現公平鎖需要系統維護一個有序數列,效能較低,因此,預設情況下,鎖都是非公平的。

public class Test implements Runnable {
    /**
     * 預設為false,表示為非公平鎖
     */
    public static ReentrantLock lock = new ReentrantLock(true);

    @Override
    public void run() {
        while (true){
            try {
                lock.lock();
                System.out.println(Thread.currentThread().getName()+"獲得鎖");
            }finally {
                lock.unlock();
            }
        }
    }
    public static void main(String[] args) {
        Test test = new Test();
        Thread t1 = new Thread(test);
        Thread t2 = new Thread(test);
        t1.start();
        t2.start();
    }
}

兩個執行緒會交替輸出。如果是非公平鎖:

執行緒1顯示一大堆,然後執行緒2顯示一大堆。一個執行緒會傾向於再次獲取已經持有的鎖。

3.3Condition條件

await()方法會使當前執行緒等待,同時釋放當前鎖,當其他執行緒使用signal()或者signalAll()方法時,執行緒會重新獲得鎖並執行。

public class Test implements Runnable {
    public static ReentrantLock lock = new ReentrantLock();
    public static Condition condition = lock.newCondition();

    @Override
    public void run() {
        try {
            lock.lock();
            condition.await();
            System.out.println("Thread is going on");
        }catch (InterruptedException e){
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
    }
    public static void main(String[] args) throws InterruptedException{
        Test test = new Test();
        Thread t1 = new Thread(test);
        t1.start();
        Thread.sleep(2000);
        //通知執行緒繼續執行
        lock.lock();
        condition.signal();
        lock.unlock();
    }
}

3.4訊號量(Semaphore)

訊號量可以指導多個執行緒同時訪問某一資源。

acquire()方法嘗試獲取一個准入許可,若無法獲得,則執行緒等待。

tryAcquire()方法嘗試獲取一個許可,成功返回true,失敗返回false,不會等待。

release()用於執行緒訪問完資源後,釋放許可。

public class Test implements Runnable {
    final Semaphore semaphore = new Semaphore(5);

    @Override
    public void run() {
        try {
            semaphore.acquire();
            Thread.sleep(2000);
            System.out.println(Thread.currentThread().getId()+" done");
            semaphore.release();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    public static void main(String[] args) throws InterruptedException{
        ExecutorService executorService = Executors.newFixedThreadPool(20);
        final Test test = new Test();
        for (int i = 0;i<20;i++){
            executorService.submit(test);
        }
    }
}

5個執行緒為一組同時訪問資源,但執行緒的順序隨機。

3.5ReadWriteLock讀寫鎖

讀寫鎖允許多個執行緒同時讀,但寫寫操作和讀寫操作間依然需要相互等待及持有鎖。

public class Test {
    private static Lock lock = new ReentrantLock();
    private static ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    private static Lock readLock = readWriteLock.readLock();
    private static Lock writeLock = readWriteLock.writeLock();
    private int value;

//    模擬讀操作
    public Object handleRead(Lock lock)throws InterruptedException{
        try {
            lock.lock();
//            讀操作耗時越多,讀寫鎖的優勢越明顯
            Thread.sleep(1000);
            return value;
        } finally {
            lock.unlock();
        }
    }

//    模擬寫操作
    public void handleWrite(Lock lock,int index) throws InterruptedException{
        try {
            lock.lock();
            Thread.sleep(1000);
            value = index;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        final Test test = new Test();
        Runnable readRunnable = new Runnable() {
            @Override
            public void run() {
                try {
                    test.handleRead(readLock);
//                    test.handleRead(lock);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        Runnable writeRunnable = new Runnable() {
            @Override
            public void run() {
                try {
                    test.handleWrite(writeLock,new Random().nextInt());
//                    test.handleWrite(lock,new Random().nextInt());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        for (int i = 0;i<18;i++){
            new Thread(readRunnable).start();
        }
        for (int i = 18;i<20;i++){
            new Thread(writeRunnable).start();
        }
    }
}

讀執行緒和寫執行緒可以並行,寫會阻塞讀,所以這段程式碼執行2秒多就結束,如果不用讀寫鎖,則需要20多秒。

3.6倒計時器:CountDownLatch

例如火箭發射倒計時,完成10項檢查才能發射火箭。

public class Test implements Runnable {
//    標明要完成10個任務
    static final CountDownLatch end = new CountDownLatch(10);
    static final Test test = new Test();

    @Override
    public void run() {
        try {
            Thread.sleep(new Random().nextInt(10) * 1000);
            System.out.println("check template");
            end.countDown();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws InterruptedException{
//        建立10個任務
        ExecutorService service = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 10; i++) {
            service.submit(test);
        }
//        等待檢查,要求主執行緒完成所有任務後才能執行
        end.await();
        System.out.println("Fire!");
        service.shutdown();
    }
}

3.7迴圈柵欄:CyclicBarrier

假設司令下達命令,要求10個士兵一起去完成一項任務。士兵需要先集合,再去完成任務。

public class Test {
    public static class Soldier implements Runnable {
        private String soldier;
        private final CyclicBarrier cyclic;

        Soldier(CyclicBarrier cyclic, String soldierName) {
            this.cyclic = cyclic;
            this.soldier = soldierName;
        }

        void doWork() {
            try {
                Thread.sleep(Math.abs(new Random().nextInt() % 10000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(soldier + " 任務完成");
        }

        @Override
        public void run() {
            try {
//                確定是否都集合完畢
                cyclic.await();
                doWork();
//                確定是否都完成工作
                cyclic.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
        }
    }

    public static class BarrierRun implements Runnable {
        boolean flag;
        int N;

        public BarrierRun(boolean flag, int N) {
            this.flag = flag;
            this.N = N;
        }

        @Override
        public void run() {
            if (flag) {
                System.out.println("司令:[士兵" + N + "個,任務完成]");
            } else {
                System.out.println("司令:[士兵" + N + "個,集合完畢]");
                flag = true;
            }
        }
    }

    public static void main(String[] args) {
        final int N = 10;
        Thread[] allSoldier = new Thread[N];
        boolean flag = false;
        //迴圈呼叫這個計時器
        CyclicBarrier cyclic = new CyclicBarrier(N, new BarrierRun(flag, N));
        System.out.println("集合隊伍!");
        for (int i = 0; i < N; i++) {
            System.out.println("士兵" + i + "報道");
            allSoldier[i] = new Thread(new Soldier(cyclic, "士兵" + i));
            allSoldier[i].start();
        }
    }
}

3.8執行緒阻塞工具類:LockSupport

LockSupport可以線上程中的任何位置讓執行緒阻塞。

public class Test {
    public static Object u = new Object();
    static ChangeObjectThread t1 = new ChangeObjectThread("t1");
    static ChangeObjectThread t2 = new ChangeObjectThread("t2");

    public static class ChangeObjectThread extends Thread{
        public ChangeObjectThread(String name) {
            super.setName(name);
        }

        @Override
        public void run(){
            synchronized (u){
                System.out.println("in "+getName());
                LockSupport.park();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException{
        t1.start();
        Thread.sleep(100);
        t2.start();
        LockSupport.unpark(t1);
        LockSupport.unpark(t2);
        t1.join();
        t2.join();
    }
}