1. 程式人生 > >使用Java佇列來處理日誌資訊(執行緒池的使用)

使用Java佇列來處理日誌資訊(執行緒池的使用)

阿里的規範是使用new ThreadPoolExecutor()來建立執行緒池,二不是使用Excutor的靜態工具類來建立執行緒池,具體可以檢視部落格(兩篇):

 

https://blog.csdn.net/angus_Lucky/article/details/79862491


https://blog.csdn.net/qq_31615049/article/details/80756781

,以及部落格中有詳解介紹

關於執行緒池ThreadPoolExecutor,其總共有四個構造器,其中最完整的一個構造方法為如下:

public ThreadPoolExecutor(int corePoolSize,                             //核心執行緒池數量
                              int maximumPoolSize,                      //最大執行緒池大小
                              long keepAliveTime,                       //執行緒池中超過                                         corePoolSize數目的空閒執行緒最大存活時間;可以allowCoreThreadTimeOut(true)使得核心執行緒有效時間
                              TimeUnit unit,                            //keepAliveTime時間單位
                              BlockingQueue<Runnable> workQueue,        //阻塞任務佇列
                              ThreadFactory threadFactory,              //新建執行緒工廠
                              RejectedExecutionHandler handler) {...}   //當提交任務數超過maxmumPoolSize+workQueue之和時,任務會交給RejectedExecutionHandler來處理

關於其中的引數介紹:

一:ThreadPoolExecutor的重要引數
 
corePoolSize:核心執行緒數
核心執行緒會一直存活,及時沒有任務需要執行
當執行緒數小於核心執行緒數時,即使有執行緒空閒,執行緒池也會優先建立新執行緒處理
設定allowCoreThreadTimeout=true(預設false)時,核心執行緒會超時關閉
queueCapacity:任務佇列容量(阻塞佇列)
當核心執行緒數達到最大時,新任務會放在佇列中排隊等待執行
maxPoolSize:最大執行緒數
當執行緒數>=corePoolSize,且任務佇列已滿時。執行緒池會建立新執行緒來處理任務
當執行緒數=maxPoolSize,且任務佇列已滿時,執行緒池會拒絕處理任務而丟擲異常
keepAliveTime:執行緒空閒時間
當執行緒空閒時間達到keepAliveTime時,執行緒會退出,直到執行緒數量=corePoolSize
如果allowCoreThreadTimeout=true,則會直到執行緒數量=0
allowCoreThreadTimeout:允許核心執行緒超時
rejectedExecutionHandler:任務拒絕處理器
兩種情況會拒絕處理任務:
當執行緒數已經達到maxPoolSize,切佇列已滿,會拒絕新任務
當執行緒池被呼叫shutdown()後,會等待執行緒池裡的任務執行完畢,再shutdown。如果在呼叫shutdown()和執行緒池真正shutdown之間提交任務,會拒絕新任務
執行緒池會呼叫rejectedExecutionHandler來處理這個任務。如果沒有設定預設是AbortPolicy,會丟擲異常
ThreadPoolExecutor類有幾個內部實現類來處理這類情況:
AbortPolicy 丟棄任務,拋執行時異常
CallerRunsPolicy 執行任務
DiscardPolicy 忽視,什麼都不會發生
DiscardOldestPolicy 從佇列中踢出最先進入佇列(最後一個執行)的任務
實現RejectedExecutionHandler介面,可自定義處理器
 
二、ThreadPoolExecutor執行順序:
     執行緒池按以下行為執行任務
 

當執行緒數小於核心執行緒數時,建立執行緒。
當執行緒數大於等於核心執行緒數,且任務佇列未滿時,將任務放入任務佇列。
當執行緒數大於等於核心執行緒數,且任務佇列已滿
若執行緒數小於最大執行緒數,建立執行緒
若執行緒數等於最大執行緒數,丟擲異常,拒絕任務
 
三、如何設定引數
 
預設值
corePoolSize=1
queueCapacity=Integer.MAX_VALUE
maxPoolSize=Integer.MAX_VALUE
keepAliveTime=60s
allowCoreThreadTimeout=false
rejectedExecutionHandler=AbortPolicy()
如何來設定
需要根據幾個值來決定
tasks :每秒的任務數,假設為500~1000
taskcost:每個任務花費時間,假設為0.1s
responsetime:系統允許容忍的最大響應時間,假設為1s
做幾個計算
corePoolSize = 每秒需要多少個執行緒處理? 
threadcount = tasks/(1/taskcost) =tasks*taskcout =  (500~1000)*0.1 = 50~100 個執行緒。corePoolSize設定應該大於50
根據8020原則,如果80%的每秒任務數小於800,那麼corePoolSize設定為80即可
queueCapacity = (coreSizePool/taskcost)*responsetime
計算可得 queueCapacity = 80/0.1*1 = 80。意思是佇列裡的執行緒可以等待1s,超過了的需要新開執行緒來執行
切記不能設定為Integer.MAX_VALUE,這樣佇列會很大,執行緒數只會保持在corePoolSize大小,當任務陡增時,不能新開執行緒來執行,響應時間會隨之陡增。
maxPoolSize = (max(tasks)- queueCapacity)/(1/taskcost)
計算可得 maxPoolSize = (1000-80)/10 = 92
(最大任務數-佇列容量)/每個執行緒每秒處理能力 = 最大執行緒數
rejectedExecutionHandler:根據具體情況來決定,任務不重要可丟棄,任務重要則要利用一些緩衝機制來處理
keepAliveTime和allowCoreThreadTimeout採用預設通常能滿足
以上都是理想值,實際情況下要根據機器效能來決定。如果在未達到最大執行緒數的情況機器cpu load已經滿了,則需要通過升級硬體(呵呵)和優化程式碼,降低taskcost來處理。

 

 

 

新建操作日誌的佇列: LogQueue.java

package com.tencent.queuedemo.queue;


import com.tencent.queuedemo.moudel.LogEntity;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * 操作日誌的佇列
 */
public class LogQueue {
    //佇列大小
    public static final int QUEUE_MAX_SIZE    = 100;



    /**
     * 訊息入隊
     * @param logEntity
     * @return
     */
    public static void push(LogEntity logEntity) throws Exception {
        //佇列已滿時,會阻塞佇列,直到未滿
         blockingQueue.put(logEntity);
    }
    /**
     * 訊息出隊
     * @return
     */
    public static LogEntity poll() {
        LogEntity result = null;
        try {
            //佇列為空時會阻塞佇列,直到不是空
            result = blockingQueue.take();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return result;
    }
    /**
     * 獲取佇列大小
     * @return
     */
    public static int size() {
        return blockingQueue.size();
    }

    /**
     *
     設定核心池大小,也就是能允許同時執行的執行緒數,corePoolSize 表示允許執行緒池中允許同時執行的最大執行緒數。
     */
    static int corePoolSize = 100;
    /**
     表示執行緒沒有任務時最多保持多久然後停止。預設情況下,只有執行緒池中執行緒數大於corePoolSize 時,keepAliveTime 才會起作用。
     換句話說,當執行緒池中的執行緒數大於corePoolSize,並且一個執行緒空閒時間達到了keepAliveTime,那麼就是shutdown。
     *
     */
    static long keepActiveTime = 200;
    /**
     * 執行緒池允許建立的最大執行緒數。如果佇列滿了,並且已建立的執行緒數小於最大執行緒數,則執行緒池會再建立新的執行緒執行任務。
     * 值得注意的是,如果使用了無界的任務佇列這個引數就沒用了。
     */
    static int maximumPoolSize = 300;


    static TimeUnit timeUnit = TimeUnit.SECONDS;


    /**建立ThreadPoolExecutor執行緒池物件,並初始化該物件的各種引數
     *
     */

    public static ThreadPoolExecutor executor = null;

    /**
     初始化阻塞佇列
     *
     */
    public static BlockingQueue<LogEntity> blockingQueue = null;
    static{
        /**
         * 這是日誌佇列,用來實際操作的
         */
        blockingQueue = new LinkedBlockingQueue<>(QUEUE_MAX_SIZE);
        /**
         *queue:workQueue必須是BlockingQueue阻塞佇列。當執行緒池中的執行緒數超過它的corePoolSize的時候,執行緒會進入阻塞佇列進行阻塞等待。通過workQueue,執行緒池實現了阻塞功能
         */
        /**
         * 這個只是執行緒池的阻塞佇列
         */
        executor = new ThreadPoolExecutor(corePoolSize,maximumPoolSize,keepActiveTime,timeUnit,new LinkedBlockingQueue<Runnable>(100));
    }


    /**
     * 初始化執行緒池
     * @return
     */
    /*public static ThreadPoolExecutor createThreadPool(){

        if(executor == null){
        executor = new ThreadPoolExecutor(corePoolSize,maximumPoolSize,keepActiveTime,timeUnit,blockingQueue);
        }
        return executor;
    }*/

}

建立controller,就是生產者(生產日誌,每次訪問就記錄一次日誌)

package com.tencent.queuedemo.controller;

import com.tencent.queuedemo.moudel.LogEntity;
import com.tencent.queuedemo.queue.LogQueue;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;

import java.util.Date;
import java.util.concurrent.BlockingQueue;

@Controller
public class LogController {
   @GetMapping("/visit/{url}")
    public void visitLog(@PathVariable("url") String url){
       LogEntity logEntity = new LogEntity();
       logEntity.setVisitTime(new Date());
       logEntity.setVisitUrl(url);
       System.out.println("訪問的資訊:"+logEntity.toString());
       //將任務加入到佇列中去
       BlockingQueue<LogEntity> blockingQueue = LogQueue.blockingQueue;
       try {
           blockingQueue.put(logEntity);
       } catch (InterruptedException e) {
           e.printStackTrace();
       }
   }

}

 

日誌的實體類為

package com.tencent.queuedemo.moudel;


import java.util.Date;

/**
 * 日誌實體
 */
public class LogEntity {
    /**
     * 訪問的時間
     */
    private Date visitTime;

    /**
     * 訪問的url
     */
    private String visitUrl;

    public Date getVisitTime() {
        return visitTime;
    }

    public void setVisitTime(Date visitTime) {
        this.visitTime = visitTime;
    }

    public String getVisitUrl() {
        return visitUrl;
    }

    public void setVisitUrl(String visitUrl) {
        this.visitUrl = visitUrl;
    }

    @Override
    public String toString() {
        return "LogEntity{" +
                "visitTime=" + visitTime +
                ", visitUrl='" + visitUrl + '\'' +
                '}';
    }
}

 

建立操作日誌的執行緒,到時候需要將這個執行緒放到執行緒池中去

package com.tencent.queuedemo.thread;

import com.tencent.queuedemo.moudel.LogEntity;
import org.springframework.stereotype.Component;

/**
 * 日誌執行緒類,將日誌儲存到佇列中
 */
@Component
public class LogThread implements Runnable {

   private LogEntity logEntity;

    public LogThread() {
    }

    public  LogThread(LogEntity logEntity) {
        this.logEntity = logEntity;
    }

    @Override
    public void run() {
        try {
            System.out.println("這是在將日誌儲存到佇列中去:"+logEntity.toString());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

 

建立一個定時器來講佇列中的日誌,儲存到資料庫(也就是消費者,消費日誌)

package com.tencent.queuedemo.timer;

import com.tencent.queuedemo.moudel.LogEntity;
import com.tencent.queuedemo.queue.LogQueue;
import com.tencent.queuedemo.thread.LogThread;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.util.concurrent.ThreadPoolExecutor;

@Component
@EnableScheduling
public class LogTimer {
    @Scheduled(cron = "0/59 * * * * ? ")  //每5秒執行一次
    public void saveLog(){

        /**
         * 定點迴圈執行佇列中的任務
         */
        while(true){
        //獲取到阻塞佇列
        //獲取到執行緒池
        ThreadPoolExecutor threadPool = LogQueue.executor;
        LogEntity entity = LogQueue.poll();
        //執行佇列中的任務
        if(null != entity){
        System.out.println("hah,佇列的大小:"+LogQueue.size());
        threadPool.submit(new LogThread(entity));
        }
        }
    }
}