1. 程式人生 > >Java高併發——多執行緒協作,同步控制

Java高併發——多執行緒協作,同步控制

      繼上一篇:Java高併發——多執行緒基礎 中講到,共享資源的合理使用,才能夠使多執行緒程式有條不紊的執行。其中我們通過synchronized來實現臨界區資源的是否可以訪問。而,這篇我們來重點總結synchronized的增強替代版鎖,以及其它JDK併發包提供的一些同步控制的功能。

      好,還是先看下知識的總結思維導圖,然後分開進行總結:

       一,ReentrantLock(重入鎖):1,顧名思義就是像一把鎖,我們可以鎖住,又可以開啟,從而控制資源的同步訪問。而其中重入特性指的是同一個執行緒,可以反覆的進入;2,中斷響應,對於synchronized只有保持等待,和繼續執行兩種情況;而ReentrantLock在等待的過程,我們可以通知其放棄等待(類似生活中約會,你等了一會朋友沒到,但是朋友遇到突發情況不能來了,給你打了電話通知你,你就不等了);3,申請等待時間:就是指定等待時間,在指定時間沒得到,則放棄;4,公平鎖:指定fair為true則進行先到先得,而不是隨機選取。這裡看下ReentrantLock的相關例子:

//1,ReentrantLock例子
public class ReentrantLockTest implements Runnable{
    public static ReentrantLock lock  =new ReentrantLock();

    public static int i =0;

    public void run() {
        for (int j = 0; j < 1000; j++) {
            lock.lock();
            //lock.lock();
            try {
                i++;
            }finally {
                lock.unlock();
                //lock.unlock();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ReentrantLockTest reentrantLockTest = new ReentrantLockTest();
        Thread t1 = new Thread(reentrantLockTest);
        Thread t2 = new Thread(reentrantLockTest);

        t1.start();
        t2.start();
        t1.join();
        t2.join();
        System.out.println(i);
    }
}


//2,lock1.lockInterruptibly()中斷後可放棄
public class InterruptLock implements Runnable {
    public static ReentrantLock lock1 = new ReentrantLock();
    public static ReentrantLock lock2 = new ReentrantLock();

    int lock;

    public InterruptLock(int lock) {
        this.lock = lock;
    }

    public void run() {
        try {
            if (lock == 1) {
                lock1.lockInterruptibly();
                Thread.sleep(500);
                lock2.lockInterruptibly();
            } else {
                lock2.lockInterruptibly();
                Thread.sleep(500);
                lock1.lockInterruptibly();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            if (lock1.isHeldByCurrentThread()) {
                lock1.unlock();
            }
            if (lock2.isHeldByCurrentThread()) {
                lock2.unlock();
            }
            System.out.println(Thread.currentThread().getName() + "執行緒退出");

        }
    }

    public static void main(String[] args) throws InterruptedException {
        InterruptLock interruptLock1 = new InterruptLock(1);
        InterruptLock interruptLock2 = new InterruptLock(2);
        Thread t1 = new Thread(interruptLock1);
        Thread t2 = new Thread(interruptLock2);
        t1.start();
        t2.start();
        Thread.sleep(1000);
        t2.interrupt();

    }
}



//3,申請等待時間例子:如果直接使用tryLock()如果拿不到則直接返回,不會等待
public class TimeLock implements Runnable {

    public static ReentrantLock lock = new ReentrantLock();

    public void run() {
        try {
            if (lock.tryLock(5, TimeUnit.SECONDS)) {
                System.out.println( Thread.currentThread().getName() + "get lock success");
                Thread.sleep(6000);
            } else {
                System.out.println( Thread.currentThread().getName() + "get lock failed");
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            if (lock.isHeldByCurrentThread()) {
                lock.unlock();
            }
        }
    }

    public static void main(String[] args) {
        TimeLock timeLock = new TimeLock();
        Thread t1 = new Thread(timeLock);
        Thread t2 = new Thread(timeLock);
        t1.start();
        t2.start();
    }
}


//4,公平鎖,先到先得
public class FairLock implements Runnable {
    public static ReentrantLock fairLock = new ReentrantLock(true);

    public void run() {
        while (true){
            try{
                fairLock.lock();
                System.out.println(Thread.currentThread().getName() + "get lock");
            }finally {
                fairLock.unlock();
            }
        }
    }

    public static void main(String[] args) {
        FairLock fairLock = new FairLock();
        Thread t1 = new Thread(fairLock,"thread1");
        Thread t2 = new Thread(fairLock,"thread2");
        t1.start();
        t2.start();
    }
}

       二,Condition條件:記得上篇部落格中我們wait()、notify()等待和通知,Condition也可以實現類似的功能,配合鎖進行使用。提供的方法await()、awaitUninterruptibly()、awaitNanos(long nanosTimeout)、await(long time,TimeUnit unit)、signal()、signalAll()等,也很容易理解。這裡提一下:生產者消費者模型中,如果庫房慢了,則生產者await;如果庫房沒了,則消費者await;生產者生成一個則signal;消費者消費一個則signal。是不是非常實用。好看個簡單例子:

public class ConditionTest implements Runnable {

    public static ReentrantLock reentrantLock = new ReentrantLock();
    public static Condition condition = reentrantLock.newCondition();

    public void run() {

        try {
            reentrantLock.lock();
            condition.await();
            System.out.println("Thread is going on");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            reentrantLock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ConditionTest conditionTest = new ConditionTest();
        Thread t1 = new Thread(conditionTest);
        t1.start();
        Thread.sleep(2000);
        reentrantLock.lock();
        condition.signal();
        reentrantLock.unlock();
    }
}

       三,Semaphore訊號量:這個也挺容易理解的。無論是synchronized還是lock都是一次只能一個執行緒獲取資源,而訊號量可以多個同時訪問。其中方法有:acquire()、acquireUninterruptibly()、tryAcquire()、tryAcquire(long timeout,TimeUnit unit)、release()等。看個簡單的例子:

public class SemaphoreTest implements  Runnable {
    final Semaphore semaphore =new Semaphore(5);

    public void run() {
        try {
            semaphore.acquire();
            Thread.sleep(2000);
            System.out.println(Thread.currentThread().getName() + "done");
            semaphore.release();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(20);
        final SemaphoreTest semaphoreTest = new SemaphoreTest();
        for (int i = 0; i <20 ; i++) {
            executorService.execute(semaphoreTest);
        }
    }
}

       四,ReadWriteLock讀寫鎖:現實業務場景中往往都是讀多寫少,而讀不會帶來資料的不一致性,所以就有了讀寫鎖,讀讀不阻塞、讀寫阻塞、寫寫阻塞,對於讀遠遠大於寫的非常使用。看個簡單例子:

public class ReadWriteLockTest {
    private static Lock lock = new ReentrantLock();
    private static ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock();
    private static Lock readLock = reentrantReadWriteLock.readLock();
    private static Lock writeLock = reentrantReadWriteLock.writeLock();
    private int value;

    public Object handleRead(Lock lock) throws InterruptedException {
        try {
            lock.lock();
            Thread.sleep(1000);
            return value;
        } finally {
            lock.unlock();
        }

    }

    public void handleWrite(Lock lock, int index) throws InterruptedException {
        try {
            lock.lock();
            Thread.sleep(1000);
            value = index;
        } finally {
            lock.unlock();
        }
    }


    public static void main(String[] args) {
        final ReadWriteLockTest readWriteLockTest = new ReadWriteLockTest();
        Runnable readRunnale = new Runnable() {
            public void run() {
                try {
                    readWriteLockTest.handleRead(readLock);
                    //readWriteLockTest.handleRead(lock);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };


        Runnable writeRunnale = new Runnable() {
            public void run() {
                try {
                    readWriteLockTest.handleWrite(readLock, new Random().nextInt());
                    //readWriteLockTest.handleWrite(lock,new Random().nextInt());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };

        for (int i = 0; i < 18; i++) {
            new Thread(readRunnale).start();
        }
        for (int i = 0; i < 2; i++) {
            new Thread(writeRunnale).start();
        }
    }
}

       五,CountDownLatch倒計時器:也是非常實用的,可以讓主線等待一組子執行緒執行完畢再進行業務的處理。看個簡單例子:

public class CountDownLatchTest implements Runnable {
    static final CountDownLatch countDownLatch = new CountDownLatch(10);
    static final CountDownLatchTest countDownLatchTest = new CountDownLatchTest();

    public void run() {
        try {
            Thread.sleep(new Random().nextInt(10)*100);
            System.out.println("check complete");

        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            countDownLatch.countDown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        for (int i = 0; i <10 ; i++) {
            executorService.execute(countDownLatchTest);
        }

        countDownLatch.await();
        System.out.println("all complete");
        executorService.shutdown();
    }
}

       六,CyclicBarrier迴圈柵欄:增強版CountDownLatch,但是是可以迴圈使用。這裡看個經典例子:司令下達命令,10個士兵一起去完成一項任務:10個士兵首先集合報道,然後去執行任務,執行完了再彙報司令,司令宣佈完成任務。相當於計數了兩次,迴圈使用了。好看個例子:

public class CyclicBarrierTest {

    public static class Soldier implements Runnable {
        private String soldier;
        private final CyclicBarrier cyclicBarrier;

        Soldier(CyclicBarrier cyclicBarrier, String soldierName) {
            this.soldier = soldierName;
            this.cyclicBarrier = cyclicBarrier;
        }

        public void run() {

            try {
                //等待所有士兵到齊
                cyclicBarrier.await();
                doWork();
                cyclicBarrier.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
        }

        void doWork() {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(soldier + "do work");
        }
    }


    public static class BarrierRun implements Runnable {

        boolean flag;
        int n;

        public BarrierRun(boolean flag, int n) {
            this.flag = flag;
            this.n = n;
        }

        public void run() {
            if (flag) {
                System.out.println("soldier" + n + "個,do work");
            } else {
                System.out.println("soldier" + n + "個,集合完畢");
                flag = true;
            }
        }
    }


    public static void main(String[] args) {
        final int n = 10;
        Thread[] allSoldier = new Thread[n];
        boolean flag = false;
        CyclicBarrier cyclicBarrier = new CyclicBarrier(n, new BarrierRun(flag, n));
        //
        System.out.println("集合");
        for (int i = 0; i < n; i++) {
            System.out.println(i + "+soldier come");
            allSoldier[i] = new Thread(new Soldier(cyclicBarrier, "soldier" + n));
            allSoldier[i].start();

        }
    }
}

       七,LockSupport:上篇講了掛起(suspend)和繼續執行(resume),其中都是JDK不建議使用的,也說到它的不好處。而LockSupport的靜態方法park()可以阻塞當前執行緒,unpark()繼續執行,利用的就是訊號量的原理,它為每個執行緒準備了一個許可證,如果許可證可用,則park()立即返回,並消費許可,如果不可用則進行阻塞;而unpark()則使這個許可變為可用。許可為唯一的。

public class LockSupportTest {

    public static Object u = new Object();
    static ChangeObjectThread t1 =new ChangeObjectThread("t1");
    static ChangeObjectThread t2 =new ChangeObjectThread("t2");

    public static class ChangeObjectThread extends Thread{
        public ChangeObjectThread(String name){
            super.setName(name);
        }

        @Override
        public void run() {
            synchronized (u){
                System.out.println("in"+ getName());
                LockSupport.park();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        t1.start();
        Thread.sleep(100);
        t2.start();
        LockSupport.unpark(t1);
        LockSupport.unpark(t2);
        t1.join();
        t2.join();
    }
}

       好,通過上邊幾個類的功能都可以很好的滿足多執行緒之間的協作,同步控制資源。只有讓資料不出錯,才能更高的發揮多執行緒的高效價值!繼續……