多執行緒那些事(三)
阿新 • • 發佈:2018-12-15
這篇文章主要是用簡單的程式碼展示自己在梳理和複習多執行緒的過程中的一些問題,歡迎大家指正;
- 關鍵字volatile
/** * @program: demo * @description: ${description} * @author: Will Wu * @create: 2018-12-06 20:22 **/ @Slf4j public class Counter { //volatile只能修飾變數,原意是"不穩定的,易揮發的",在java中修飾變數,保證了該變數的立即可見性,一旦一個執行緒修改了該變數的值,其他執行緒能立馬獲取到該變數最新的值 //另外,需要注意,volatile修飾變數,其本質就是告訴jvm,表示該變數的值不穩定,不應該從快取區中獲取該變數的值,應該從主存區中獲取該變數的值 private volatile int count=0; public void inc(){ try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } count++; } @Override public String toString() { return "[count="+count+"]"; } public static void main(String[] args) { Counter counter = new Counter(); for(int i=0;i<1000;i++){ new Thread(new Runnable() { @Override public void run() { counter.inc(); } }).start(); } System.out.println(counter); } }
- 死鎖
/** * @program: 死鎖類 * @description: ${description} * @author: Will Wu * @create: 2018-12-09 10:58 **/ public class DeadLock implements Runnable{ public int flag=1; private static Object o1=new Object(),o2=new Object(); @Override public void run() { System.out.println("flag="+flag); if (flag==1){ synchronized (o1){ try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } synchronized (o2){ System.out.println("1"); } } } if (flag==0) { synchronized (o2){ try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } synchronized (o1){ System.out.println("0"); } } } } public static void main(String[] args) { DeadLock lock1 = new DeadLock(); DeadLock lock2 = new DeadLock(); lock1.flag=1; lock2.flag=0; new Thread(lock1).start(); new Thread(lock2).start(); } }
- 如何避免死鎖?
/** * @program: * 避免死鎖方法一:加鎖順序,即執行緒按照一定的順序加鎖 * @description: ${description} * @author: Will Wu * @create: 2018-12-09 11:19 **/ public class ResDeadLockOne { public int flag=1; //static 修飾的變數為成員變數,為類所有,即類的所有例項物件共有 private static Object o1=new Object(),o2=new Object(); public void money(int flag){ this.flag=flag; if (flag==1){ synchronized (o1){ try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } synchronized (o2){ System.out.println("當前執行緒:"+Thread.currentThread().getName()+",flag="+flag); } } } if (flag==0){ synchronized (o2){ try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } synchronized (o1){ System.out.println("當前執行緒:"+Thread.currentThread().getName()+",flag="+flag); } } } } public static void main(String[] args) { final ResDeadLockOne td1 = new ResDeadLockOne(); final ResDeadLockOne td2 = new ResDeadLockOne(); td1.flag=1; td2.flag=0; Thread t1= new Thread(new Runnable() { @Override public void run() { td1.flag = 1; td1.money(1); } }); t1.start(); Thread t2= new Thread(new Runnable() { @Override public void run() { try { t1.join(); } catch (InterruptedException e) { e.printStackTrace(); } td2.flag=0; td2.money(0); } }); t2.start(); } }
** * @program: 執行緒死鎖解決方法二: * 加鎖時限(執行緒嘗試獲取鎖的時候加上一定的時限,超過時限則放棄對該鎖的請求,並釋放自己佔有的鎖) * @description: ${description} * @author: Will Wu * @create: 2018-12-09 11:37 **/ @Slf4j public class ResDeadLockTwo { public int flag=1; private static Object o1=new Object(),o2=new Object(); public void money(int flag){ this.flag=flag; if (flag==1){ synchronized (o1){ try { Thread.sleep(1000); synchronized (o2){ System.out.println("當前執行緒:"+Thread.currentThread().getName()+",flag="+flag); } } catch (InterruptedException e) { e.printStackTrace(); } } } if (flag==0){ synchronized (o2){ try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } synchronized (o1){ System.out.println("當前執行緒:"+Thread.currentThread().getName()+",flag="+flag); } } } } public static void main(String[] args) { ResDeadLockTwo td1 = new ResDeadLockTwo(); ResDeadLockTwo td2 = new ResDeadLockTwo(); final ReentrantLock lock = new ReentrantLock(); td1.flag=1; td2.flag=0; Thread t1 = new Thread(new Runnable() { @Override public void run() { String tname=Thread.currentThread().getName(); td1.flag=1; try { //獲取不到鎖,等待五秒,如果超時,就返回false if(lock.tryLock(5000, TimeUnit.MILLISECONDS)){ System.out.println(tname+"獲得鎖..."); }else { System.out.println(tname+"沒有獲得鎖..."); return; } } catch (InterruptedException e) { e.printStackTrace(); } try { td1.money(1); } catch (Exception e) { e.printStackTrace(); System.out.println(tname+"出錯啦..."); } finally { System.out.println("當前執行緒:"+tname); lock.unlock(); } } }); t1.start(); Thread t2 = new Thread(new Runnable() { @Override public void run() { String tname = Thread.currentThread().getName(); td2.flag = 0; try { //獲取不到鎖,等待五秒,如果超時,就返回false if (lock.tryLock(5000, TimeUnit.MILLISECONDS)) { System.out.println(tname + "獲得鎖..."); } else { System.out.println(tname + "沒有獲得鎖..."); return; } } catch (InterruptedException e) { e.printStackTrace(); } try { td2.money(1); } catch (Exception e) { e.printStackTrace(); System.out.println(tname+"出錯啦..."); } finally { System.out.println("當前執行緒:"+tname); lock.unlock(); } } }); t2.start(); } }
- 多個執行緒同時訪問同一個類中的多個加鎖方法
/** * @program: 多個執行緒不可訪問同一個類中的 2 個加了 Lock 鎖的方法 * @description: ${description} * @author: Will Wu * @create: 2018-12-09 09:53 **/ @Slf4j public class MultiThread { private int count=0; //lock private Lock lock=new ReentrantLock(); private Runnable runnableone=new Runnable() { @Override public void run() { //lock lock.lock(); while (count<100){ try { //是否執行該方法 log.info(Thread.currentThread().getName()+"run 1:"+count++); } catch (Exception e) { e.printStackTrace(); } } lock.unlock(); } }; private Runnable runnabletwo=new Runnable() { @Override public void run() { lock.lock(); while(count<100){ log.info(Thread.currentThread().getName()+"run 2:"+count++); } lock.unlock(); } }; public static void main(String[] args) { MultiThread qq = new MultiThread(); new Thread(qq.runnableone).start(); new Thread(qq.runnabletwo).start(); } }
/** * @program: * 使用 synchronized 時,當我們訪問同一個類物件的時候,是同一把鎖,所以可以訪問 該物件的其他 synchronized 方法 * @description: ${description} * @author: Will Wu * @create: 2018-12-09 10:09 **/ @Slf4j public class MultiThreadO { private int count=0; private Lock lock=new ReentrantLock(); public Runnable runnableone=new Runnable() { @Override public void run() { //設定關鍵字synchronized,以當前類為鎖 synchronized (this){ while (count<100){ log.info(Thread.currentThread().getName()+"run 1:"+count++); } } } }; public Runnable runnabletwo=new Runnable() { @Override public void run() { //設定關鍵字synchronized,以當前類為鎖 synchronized (this){ while (count<100){ log.info(Thread.currentThread().getName()+"run 2:"+count++); } } } }; public static void main(String[] args) { MultiThreadO multiThread = new MultiThreadO(); new Thread(multiThread.runnableone).start(); new Thread(multiThread.runnabletwo).start(); } }
- Callabel介面
/** * @program: demo * @description: ${description} * @author: Will Wu * @create: 2018-12-06 20:00 **/ @AllArgsConstructor @NoArgsConstructor @Slf4j public class MyCallable implements Callable <Object>{ private String taskNum; @Override public Object call() throws Exception { log.info("=======>"+taskNum+" 任務啟動..."); Date date1 = new Date(); Thread.sleep(1000); Date date2= new Date(); long time=date2.getTime()-date1.getTime(); log.info("=======>"+taskNum+" 任務結束..."); return taskNum+"任務返回執行結果,當前任務時間耗時:" + time+"毫秒"; } public static void main(String[] args) throws ExecutionException, InterruptedException { log.info("程式開始執行..."); Date date = new Date(); int size=5; ExecutorService pool = Executors.newFixedThreadPool(size); ArrayList<Future> list = new ArrayList<>(); for(int i=0;i<size;i++){ MyCallable callable = new MyCallable(i + ""); Future<Object> future = pool.submit(callable); log.info(future.get().toString()); list.add(future); } pool.shutdown(); list.stream().collect(Collectors.toList()).forEach(System.out::println); } }
- 多執行緒之間通訊
/** * @program: 多執行緒之間如何實現通訊 * 方法一:共享變數 * 執行緒間通訊可以通過傳送訊號,傳送訊號的一個簡單方式是在共享物件的變數裡面設定訊號值 * @description: ${description} * @author: Will Wu * @create: 2018-12-09 12:04 **/ @Slf4j public class MySignal { //共享的變數 @[email protected] private boolean hasDataToProcess=false; public static void main(String[] args) { //同一個物件 final MySignal mySignal = new MySignal(); Thread t1 = new Thread(() -> mySignal.setHasDataToProcess(true)); t1.start(); Thread t2 = new Thread(() -> { try { t1.join(); } catch (InterruptedException e) { e.printStackTrace(); } mySignal.isHasDataToProcess(); System.out.println("t1改變後的值:"+mySignal.isHasDataToProcess()); }); t2.start(); } }
/** * @program: 多執行緒之間如何實現通訊 * 方法二:wait/notify--等待喚醒機制 * @description: ${description} * @author: Will Wu * @create: 2018-12-09 12:17 **/ @Slf4j public class Resource { private String name; private int count=1; private boolean flag=false; public synchronized void set(String name){ while(flag){ try { //執行緒等待,消費之消費資源 wait(); } catch (InterruptedException e) { e.printStackTrace(); } this.name=name+"---"+count++; System.out.println(Thread.currentThread().getName()+"...生產者..."+this.name); flag=true; //喚醒等待中的消費者 this.notifyAll(); } } public synchronized void out(String name){ while(!flag){ try { //執行緒等待,生產者生產資源 wait(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName()+"...消費者..."+this.name); flag=false; //喚醒等待中的消費者 this.notifyAll(); } } }
- 如何控制併發執行緒的個數
/** * @program: demo * 如何控制某個方法允許併發訪問執行緒的個數 * @description: ${description} * @author: Will Wu * @create: 2018-12-08 10:06 **/ public class SeamphoreTest { //採用該Semaphore類,如下程式碼去控制test()方法最多五個執行緒併發訪問 static Semaphore semaphore=new Semaphore(5,true); public static void main(String[] args) { for(int i=0;i<100;i++){ new Thread(() -> test()).start(); } } public static void test(){ try { //申請一個請求 semaphore.acquire(); System.out.println(Thread.currentThread().getName()+"is coming..."); Thread.sleep(1000); System.out.println(Thread.currentThread().getName()+"is going..."); } catch (InterruptedException e) { e.printStackTrace(); } semaphore.release(); } }
- 幾種常見的執行緒池
/** * @program: 執行緒池 * * @description: ${description} * @author: Will Wu * @create: 2018-12-04 21:24 **/ public class ThreadPool { static class MyThread extends Thread{ @Override public void run() { System.out.println(Thread.currentThread().getName()+"正在執行..."); } } public static void main(String[] args) throws ExecutionException, InterruptedException { //固定數量的執行緒池 // ExecutorService pool = Executors.newFixedThreadPool(2); //單任務執行緒池 // ExecutorService pool = Executors.newSingleThreadExecutor(); //可變執行緒池 // ExecutorService pool = Executors.newCachedThreadPool(); //定時執行緒池 // ScheduledExecutorService pool= Executors.newScheduledThreadPool(2); ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor(); HashSet<Callable<String>> set = new HashSet<>(); set.add(new Callable<String>() { @Override public String call() throws Exception { return "task1"; } }); set.add(new Callable<String>() { @Override public String call() throws Exception { return "task2"; } }); set.add(new Callable<String>() { @Override public String call() throws Exception { return "task3"; } }); // String s = singleThreadExecutor.invokeAny(set); // System.out.println("==========>"+s); List<Future<String>> futures = singleThreadExecutor.invokeAll(set); for (Future<String> future:futures){ System.out.println("==============>"+future.get()); } // Future<Object> objectFuture = pool.submit( // new Callable<Object>() { // @Override // public Object call() throws Exception { // return null; // } // } // ); // Future<?> future = pool.submit(new Runnable() { // @Override // public void run() { // System.out.println("Asynchronous task"); // } // }); // // Object o = future.get(); // System.out.println(o); // pool.execute(new Runnable() { // @Override // public void run() { // // } // }); // MyThread thread1 = new MyThread(); // MyThread thread2 = new MyThread(); // MyThread thread3 = new MyThread(); // MyThread thread4 = new MyThread(); // MyThread thread5 = new MyThread(); // // pool.execute(thread1); // pool.execute(thread2); // pool.execute(thread3); //// pool.execute(thread4); //// pool.execute(thread5); // pool.schedule(thread4,1000, TimeUnit.MILLISECONDS); // pool.schedule(thread5,2000, TimeUnit.MILLISECONDS); // // pool.shutdown(); } }
- 多執行緒的兩個簡單應用例項
/** * @program: * 三個執行緒 a、b、c 併發執行,b,c 需要 a 執行緒的資料怎麼實現 * @description: ${description} * @author: Will Wu * @create: 2018-12-09 09:31 **/ @Slf4j public class ThreadCommunication { private static int num;//資料 //定義一個訊號量,該類內部維持了多個執行緒鎖,可以阻塞多個執行緒,釋放多個執行緒,執行緒的阻塞和釋放是通過Permit概念來實現的, //執行緒通過semaphore.acquire()獲取permit,如果當前semaphore有permit則分配給該執行緒,沒有則阻塞執行緒直到semaphore呼叫 //release()方法釋放Permit,引數,代表permit允許的個數 private static Semaphore semaphore=new Semaphore(0); public static void main(String[] args) { //first thread a to generate num; Thread threadA = new Thread(() -> { try { Thread.sleep(1000); num=1; //初始化引數後是釋放兩個permit semaphore.release(2); } catch (InterruptedException e) { e.printStackTrace(); } }); //then thread b and c to use num Thread threadB = new Thread(() -> { try { //獲取permit,如果沒有可用的,則處於阻塞狀態 semaphore.acquire(); } catch (InterruptedException e) { e.printStackTrace(); } log.info(Thread.currentThread().getName() + "獲取到num的值為:" + num); }); Thread threadC = new Thread(() -> { try { semaphore.acquire(); } catch (InterruptedException e) { e.printStackTrace(); } log.info(Thread.currentThread().getName() + "獲取到num的值為:" + num); }); ; threadA.start(); threadB.start(); threadC.start(); } }
/** * @program: 子執行緒執行執行 10 次後,主執行緒再執行 5 次。這樣交替執行三遍 * @description: ${description} * @author: Will Wu * @create: 2018-12-03 20:20 **/ public class ThreadTest { static class Business{ private boolean subFlag=false; public synchronized void mainMethod(){ while(subFlag){ try { wait(); } catch (InterruptedException e) { e.printStackTrace(); } } for(int i=0;i<5;i++){ System.out.println(Thread.currentThread().getName()+": main thread runing count--"+i); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } subFlag=true; notify(); } public synchronized void sumMethod(){ while(!subFlag){ try { wait(); } catch (InterruptedException e) { e.printStackTrace(); } } for (int i=0;i<10;i++){ System.out.println(Thread.currentThread().getName()+": sub thread runing count--"+i); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } subFlag=false; notify(); } } public static void main(String[] args) { final Business business=new Business(); Thread thread = new Thread(() -> { for (int i = 0; i < 3; i++) { business.sumMethod(); } }); thread.start(); for (int i=0;i<3;i++){ business.mainMethod(); } } }