OkHttp3 筆記(1)OkHttp3 入口分析
/**
 * Returns the executor held in the {@code executorService} field, creating it on first use.
 *
 * <p>The pool is configured with a core size of 0, a maximum size of
 * {@code Integer.MAX_VALUE}, a 60-second keep-alive and a {@link SynchronousQueue} as its
 * work queue. The method is {@code synchronized}, so the lazy creation happens at most once
 * per instance.
 *
 * @return the cached executor, never {@code null}
 */
public synchronized ExecutorService executorService() {
    // Fast path: the pool has already been built by an earlier call.
    if (executorService != null) {
        return executorService;
    }
    executorService = new ThreadPoolExecutor(
            0,                          // corePoolSize
            Integer.MAX_VALUE,          // maximumPoolSize
            60, TimeUnit.SECONDS,       // keepAliveTime
            new SynchronousQueue<Runnable>(),
            Util.threadFactory("OkHttp Dispatcher", false));
    return executorService;
}
ThreadPoolExecutor的引數
-corePoolSize:0,核心併發數,就是在執行緒池不飽和時,執行緒池可擁有的執行緒數。如果是0的話,空閒一段時間後所有執行緒將全部被銷燬。
-maximumPoolSize:執行緒池最大執行緒容量。
-keepAliveTime:當總執行緒數大於核心執行緒數 corePoolSize 時,超出的那部分執行緒在空閒時存活的時間。
-BlockingQueue<Runnable>:
這個引數被稱為阻塞佇列(生產者消費者模型)
1.ArrayBlockingQueue
2.LinkedBlockingQueue
以上兩個要注意指定最大容量:如果生產者的效率很高,會把佇列快取佔滿;若沒有指定最大值,佇列會不斷增長並消耗掉記憶體。
3.PriorityBlockingQueue
4.DelayQueue
5.SynchronousQueue
它是一個不儲存元素的阻塞佇列。每個插入操作必須等待另一個執行緒的移除操作,同樣,每個移除操作也必須等待另一個執行緒的插入操作。因此佇列中不會儲存任何元素。(多執行緒打印出0101010101)
練習下佇列,看有毛用
/**
 * Producer/consumer exercise on an {@link ArrayBlockingQueue} whose capacity is
 * {@link #QUEUESIZE}.
 *
 * <p>The test method starts one {@link Consumer} and one {@link Producer} thread and then
 * returns; it does not wait for either of them. Both workers loop until their thread is
 * interrupted.
 */
public class BlockQueueTest {
    /** Capacity of the bounded queue shared by the two worker threads. */
    private static final int QUEUESIZE = 1;

    /** Queue shared by producer and consumer; assigned before either thread is started. */
    private ArrayBlockingQueue<Integer> integers;

    /**
     * Creates the queue and starts the consumer followed by the producer.
     *
     * <p>Note: this is a regular method that happens to share the class name, not a
     * constructor.
     */
    @Test
    public void BlockQueueTest() {
        integers = new ArrayBlockingQueue<>(QUEUESIZE);
        Consumer consumer = new Consumer();
        Producer producer = new Producer();
        consumer.start();
        producer.start();
    }

    /** Repeatedly takes an element from the queue and prints it. */
    class Consumer extends Thread {
        @Override
        public void run() {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    Integer take = integers.take();
                    System.out.println("消費元素:" + take);
                } catch (InterruptedException e) {
                    // Restore the flag so the loop condition ends the thread instead of
                    // swallowing the cancellation request and blocking again.
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    /** Repeatedly puts the value 1 into the queue and prints a message afterwards. */
    class Producer extends Thread {
        @Override
        public void run() {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    integers.put(1);
                    System.out.println("生產元素int 1");
                } catch (InterruptedException e) {
                    // Restore the flag so the loop condition ends the thread instead of
                    // swallowing the cancellation request and blocking again.
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
}
列印010101 順序一個沒亂
package com.system.bhouse.bhouse.Queue;

import org.junit.Test;

import java.util.concurrent.SynchronousQueue;

/**
 * Created by wz on 2018-11-11.
 *
 * <p>Two threads take strict turns printing a line each, coordinated through the
 * {@code isConsumer} flag and {@code wait()}/{@code notify()} on this test instance's
 * monitor. Despite the class name, the hand-off does not go through the
 * {@link SynchronousQueue} field.
 */
public class SynchronousQueueTest {
    /** Created by the test method but never read or written by either worker thread. */
    private SynchronousQueue<Integer> integers;

    /** Turn marker: {@code true} when it is the consumer's turn, {@code false} for the producer. */
    private volatile boolean isConsumer = false;

    /**
     * Starts the consumer followed by the producer and returns without waiting for them.
     */
    @Test
    public void BlockQueueTest() {
        integers = new SynchronousQueue<>();
        Consumer consumer = new Consumer();
        Producer producer = new Producer();
        consumer.start();
        producer.start();
    }

    /** Prints the "consumed" line whenever it is the consumer's turn. */
    class Consumer extends Thread {
        @Override
        public void run() {
            synchronized (SynchronousQueueTest.this) {
                while (!Thread.currentThread().isInterrupted()) {
                    if (isConsumer) {
                        System.out.println("消費元素:" + 1);
                        // Hand the turn back to the producer and wake it up.
                        isConsumer = false;
                        SynchronousQueueTest.this.notify();
                    } else {
                        try {
                            // wait() releases the monitor so the producer can run.
                            SynchronousQueueTest.this.wait();
                        } catch (InterruptedException e) {
                            // Restore the flag; the loop condition then ends the thread
                            // instead of ignoring the cancellation request.
                            Thread.currentThread().interrupt();
                        }
                    }
                }
            }
        }
    }

    /** Prints the "produced" line whenever it is the producer's turn. */
    class Producer extends Thread {
        @Override
        public void run() {
            synchronized (SynchronousQueueTest.this) {
                while (!Thread.currentThread().isInterrupted()) {
                    if (!isConsumer) {
                        System.out.println("生產元素int 0");
                        // Hand the turn to the consumer and wake it up.
                        isConsumer = true;
                        SynchronousQueueTest.this.notify();
                    } else {
                        try {
                            // wait() releases the monitor so the consumer can run.
                            SynchronousQueueTest.this.wait();
                        } catch (InterruptedException e) {
                            // Restore the flag; the loop condition then ends the thread
                            // instead of ignoring the cancellation request.
                            Thread.currentThread().interrupt();
                        }
                    }
                }
            }
        }
    }
}
原以為SynchronousQueue可以做一對一的通訊,應該是列印01010101的最佳選擇,結果發現你無法控制佇列內部的同步機制:程式碼執行到佇列操作(put/take)時就會阻塞停住。
package com.system.bhouse.bhouse.Queue;

import org.junit.Test;

import java.util.concurrent.SynchronousQueue;

/**
 * Created by wz on 2018-11-11.
 *
 * <p>Producer/consumer experiment that hands the value 1 through a {@link SynchronousQueue}
 * while a volatile flag decides whose turn it is. Both threads poll the flag in a busy loop.
 *
 * <p>NOTE(review): after a hand-off completes, the consumer's {@code println(take)} and the
 * producer's {@code println(0)} run on different threads with nothing ordering them, so the
 * two lines of one round may appear in either order.
 */
public class SynchronousQueueTest2 {
    /** Hand-off point between the producer and the consumer. */
    private SynchronousQueue<Integer> integers;

    /** Turn marker: {@code true} lets the consumer take, {@code false} lets the producer put. */
    private volatile boolean isConsumer = false;

    /**
     * Creates the queue, starts the consumer followed by the producer, and returns without
     * waiting for them.
     */
    @Test
    public void BlockQueueTest() {
        integers = new SynchronousQueue<>();
        Consumer consumer = new Consumer();
        Producer producer = new Producer();
        consumer.start();
        producer.start();
    }

    /** Takes one element per turn, prints it, then flips the flag back to the producer. */
    class Consumer extends Thread {
        @Override
        public void run() {
            while (!Thread.currentThread().isInterrupted()) {
                if (isConsumer) {
                    try {
                        Integer take = integers.take();
                        System.out.println(take);
                        isConsumer = !isConsumer;
                    } catch (InterruptedException e) {
                        // Restore the flag; the loop condition then ends the thread
                        // instead of ignoring the cancellation request.
                        Thread.currentThread().interrupt();
                    }
                }
            }
        }
    }

    /** Flips the flag to the consumer, blocks in put(1) until it is taken, then prints 0. */
    class Producer extends Thread {
        @Override
        public void run() {
            while (!Thread.currentThread().isInterrupted()) {
                if (!isConsumer) {
                    try {
                        // The flag is flipped before put() so the consumer can start take().
                        isConsumer = !isConsumer;
                        integers.put(1);
                        System.out.println(0);
                    } catch (InterruptedException e) {
                        // Restore the flag; the loop condition then ends the thread
                        // instead of ignoring the cancellation request.
                        Thread.currentThread().interrupt();
                    }
                }
            }
        }
    }
}
言歸正傳
/**
 * Returns the executor held in the {@code executorService} field, creating it on first use.
 *
 * <p>The pool is configured with a core size of 0, a maximum size of
 * {@code Integer.MAX_VALUE}, a 60-second keep-alive and a {@link SynchronousQueue} as its
 * work queue. The method is {@code synchronized}, so the lazy creation happens at most once
 * per instance.
 *
 * @return the cached executor, never {@code null}
 */
public synchronized ExecutorService executorService() {
    // Fast path: the pool has already been built by an earlier call.
    if (executorService != null) {
        return executorService;
    }
    executorService = new ThreadPoolExecutor(
            0,                          // corePoolSize
            Integer.MAX_VALUE,          // maximumPoolSize
            60, TimeUnit.SECONDS,       // keepAliveTime
            new SynchronousQueue<Runnable>(),
            Util.threadFactory("OkHttp Dispatcher", false));
    return executorService;
}
來測試一下這個執行緒池特點
public class CacheTheadPool { private static ThreadPoolExecutor executorService; @Test public void cacheTheadPool(){ //for (int i=0;i<100;i++) { //newCachedThreadPool().execute(new AsyncCall("thread"+i)); //} newCachedThreadPool().execute(new AsyncCall("thread"+1)); newCachedThreadPool().execute(new AsyncCall("thread"+2)); newCachedThreadPool().execute(new AsyncCall("thread"+3)); System.out.println("先開3個,按書上講會有3個是新建執行緒"); System.out.println("執行緒池核心:"+executorService.getCorePoolSize()); System.out.println("執行緒池數目:"+executorService.getPoolSize()); System.out.println("佇列任務數目:"+executorService.getQueue().size()); //讓上面的用完 //try { //Thread.sleep(500); //} catch (InterruptedException e) { //e.printStackTrace(); //} newCachedThreadPool().execute(new AsyncCall("thread"+4)); newCachedThreadPool().execute(new AsyncCall("thread"+5)); newCachedThreadPool().execute(new AsyncCall("thread"+6)); System.out.println("再開3個,按書上講會有3個是,看看是不是複用"); System.out.println("執行緒池核心:"+executorService.getCorePoolSize()); System.out.println("執行緒池數目:"+executorService.getPoolSize()); System.out.println("佇列任務數目:"+executorService.getQueue().size()); try { Thread.sleep(8000); } catch (InterruptedException e) { e.printStackTrace(); } newCachedThreadPool().execute(new AsyncCallSleep("thread"+7)); newCachedThreadPool().execute(new AsyncCallSleep("thread"+8)); newCachedThreadPool().execute(new AsyncCallSleep("thread"+9)); System.out.println("再開3個,按書上講會有3個是,看看是不是新建"); System.out.println("執行緒池核心:"+executorService.getCorePoolSize()); System.out.println("執行緒池數目:"+executorService.getPoolSize()); System.out.println("佇列任務數目:"+executorService.getQueue().size()); } /** * 建立的都是 使用者執行緒優先順序比較高. 
* @return */ public synchronizedExecutorService newCachedThreadPool(){ if (executorService == null) { executorService = new ThreadPoolExecutor(0, 6, 5, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false)); } return executorService; } final class AsyncCall extends NamedRunnable { private AsyncCall(Object... arg){ super("OkHttp %s",arg); } @Override protected void execute() { String name = Thread.currentThread().getName(); System.out.println("當前處理的執行緒名是:"+name); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } } } final class AsyncCallSleep extends NamedRunnable { private AsyncCallSleep(Object... arg){ super("OkHttpSleep %s",arg); } @Override protected void execute() { String name = Thread.currentThread().getName(); System.out.println("當前處理的隨眠執行緒名是:"+name); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } } } public abstract class NamedRunnable implements Runnable { protected final String name; public NamedRunnable(String format, Object... args) { this.name = String.format(format, args); } @Override public final void run() { String oldName = Thread.currentThread().getName(); Thread.currentThread().setName(name); try { execute(); } finally { Thread.currentThread().setName(oldName); } } protected abstract void execute(); } }