1. 程式人生 > >並發編程-concurrent指南-阻塞隊列-延遲隊列DelayQueue

並發編程-concurrent指南-阻塞隊列-延遲隊列DelayQueue

resultmap host star 放置 ces version sel println iterator

DelayQueue是一個無界的BlockingQueue,用於放置實現了Delayed接口的對象,其中的對象只能在其到期時才能從隊列中取走。這種隊列是有序的,即隊頭對象的延遲到期時間最長。註意:不能將null元素放置到這種隊列中。

Delayed

一種混合風格的接口,用來標記那些應該在給定延遲時間之後執行的對象。

此接口的實現必須定義一個 compareTo 方法,該方法提供與此接口的 getDelay 方法一致的排序。

下面例子是訂單超時處理的具體代碼:

重點是DelayOrderComponent 和OrderMessage

import com.concurrent.delayqueue.component.DelayOrderComponent;
import com.concurrent.delayqueue.model.OrderInfo; import com.concurrent.delayqueue.service.OrderService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import
java.util.Date; @RestController @RequestMapping("/order") public class OrderController { @Autowired private OrderService orderService; //創建訂單 @RequestMapping("insert") public void insert() { OrderInfo orderInfo = new OrderInfo(); orderInfo.setCreateTime(new Date()); orderInfo.setStatus(
0); orderService.insert(orderInfo); } //取消訂單 @RequestMapping("cancel") public void cancel(Long orderId) { orderService.cancel(orderId); } //支付訂單 @RequestMapping("paysuccess") public void paysuccess(Long orderId) { orderService.paysuccess(orderId); } //查看隊列中剩余處理數 @RequestMapping("queuecount") public int queuecount() { return DelayOrderComponent.getDelayQueueCount(); } }
@Service
public class OrderService {
    @Autowired
    private OrderInfoMapper orderInfoMapper;
    @Autowired
    private DelayOrderComponent delayOrderComponent;

    /**
     * 插入
     * @param orderInfo
     */
    @Transactional
    public void insert(OrderInfo orderInfo){
        orderInfoMapper.insert(orderInfo);
        //加入到延時隊列中,用於超時未支付
        boolean flag = delayOrderComponent.addDelayQueue(new OrderMessage(orderInfo.getOrderId(),orderInfo.getCreateTime().getTime()));
        if(!flag){
            throw new RuntimeException();
        }
    }

    /**
     * 取消
     */
    @Transactional
    public void cancel(Long orderId){
        orderInfoMapper.updateByStatus(orderId,0,-1);
        delayOrderComponent.removeDelayQueue(orderId);
    }

    /**
     * 用戶支付成功
     */
    public void paysuccess(Long orderId){
        orderInfoMapper.updateByStatus(orderId,0,1);
        delayOrderComponent.removeDelayQueue(orderId);
    }

}
import com.concurrent.delayqueue.mapper.OrderInfoMapper;
import com.concurrent.delayqueue.message.OrderMessage;
import com.concurrent.delayqueue.model.OrderInfo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Executors;

/**
 * 處理訂單超時
 */
@Component
@Lazy(false)
public class DelayOrderComponent {
    @Autowired
    private OrderInfoMapper orderInfoMapper;

    private static DelayQueue<OrderMessage> delayQueue = new DelayQueue<OrderMessage>();
    public static int getDelayQueueCount(){
        return delayQueue.size();
    }

    /**
     * 系統啟動時,預先加載的數據@PostConstruct
     */
    @PostConstruct
    public void init(){
        /**初始化時加載數據庫中需處理超時的訂單**/
        System.out.println("獲取數據庫中需要處理的超時的訂單");
        List<OrderInfo> list = orderInfoMapper.selectByStatus(0);
        for(int i=0;i<list.size();i++){
            OrderInfo orderInfo = list.get(i);
            OrderMessage orderMessage = new OrderMessage(orderInfo.getOrderId(),orderInfo.getCreateTime().getTime());
            this.addDelayQueue(orderMessage);//加入隊列
        }

        /**
         * 啟動線程,取延時消息
         */
        Executors.newSingleThreadExecutor().execute(new Runnable() {
            @Override
            public void run() {
                while(true){
                    try {
                        OrderMessage orderMessage = delayQueue.take();
                        //處理超時訂單
                        orderInfoMapper.updateByStatus(orderMessage.getOrderId(),0,2);//訂單狀態改成超時訂單
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        });
    }

    /**
     * 加入延時隊列
     * 用戶下單時,調用此方法
     */
    public boolean addDelayQueue(OrderMessage orderMessage){
        return delayQueue.add(orderMessage);
    }

    /**
     * 從延時隊列中刪除
     * 用戶主動取消,或者支付成功後,調用此方法
     */
    public boolean removeDelayQueue(Long orderId){
        for (Iterator<OrderMessage> iterator = delayQueue.iterator(); iterator.hasNext();) {
            OrderMessage queue = iterator.next();
            if(orderId.equals(queue.getOrderId())){
                return delayQueue.remove(queue);
            }
        }
        return false;
    }

}
public class OrderMessage implements Delayed {
    private final static long DELAY = 15*60*1000L;//默認延遲15分鐘

    private Long orderId;//訂單號
    private Long expireTime;//過期時間
    public OrderMessage(Long orderId,Long createTime){
        this.orderId = orderId;
        this.expireTime = createTime + DELAY;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(this.expireTime - System.currentTimeMillis() , TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        if (other == this){
            return 0;
        }
        if(other instanceof OrderMessage){
            OrderMessage otherRequest = (OrderMessage)other;
            long otherStartTime = otherRequest.expireTime;
            return (int)(this.expireTime - otherStartTime);
        }
        return 0;
    }

    public Long getOrderId() {
        return orderId;
    }

    public void setOrderId(Long orderId) {
        this.orderId = orderId;
    }

    public Long getExpireTime() {
        return expireTime;
    }

    public void setExpireTime(Long expireTime) {
        this.expireTime = expireTime;
    }
}
import java.util.Date;

public class OrderInfo {
    private Long orderId;//訂單狀態
    private Date createTime;//創建時間
    private Integer status;//訂單狀態:0待支付1已支付-1取消2已超時

    public Long getOrderId() {
        return orderId;
    }

    public void setOrderId(Long orderId) {
        this.orderId = orderId;
    }

    public Date getCreateTime() {
        return createTime;
    }

    public void setCreateTime(Date createTime) {
        this.createTime = createTime;
    }

    public Integer getStatus() {
        return status;
    }

    public void setStatus(Integer status) {
        this.status = status;
    }
}
import com.concurrent.delayqueue.model.OrderInfo;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;

import java.util.List;

@Mapper
public interface OrderInfoMapper {
    int deleteByPrimaryKey(Long orderId);

    int insert(OrderInfo record);

    int insertSelective(OrderInfo record);

    OrderInfo selectByPrimaryKey(Long orderId);

    int updateByPrimaryKeySelective(OrderInfo record);

    int updateByPrimaryKey(OrderInfo record);

    List<OrderInfo> selectByStatus(int status);
    int updateByStatus(@Param("orderId")Long orderId, @Param("oldstatus")Integer oldstatus,@Param("newstatus")Integer newstatus);
}
OrderInfoMapper.xml

<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="com.concurrent.delayqueue.mapper.OrderInfoMapper" >
  <resultMap id="BaseResultMap" type="com.concurrent.delayqueue.model.OrderInfo" >
    <id column="order_id" property="orderId" jdbcType="BIGINT" />
    <result column="create_time" property="createTime" jdbcType="TIMESTAMP" />
    <result column="status" property="status" jdbcType="INTEGER" />
  </resultMap>
  <sql id="Base_Column_List" >
    order_id, create_time, status
  </sql>
  <select id="selectByPrimaryKey" resultMap="BaseResultMap" parameterType="java.lang.Long" >
    select
    <include refid="Base_Column_List" />
    from t_order
    where order_id = #{orderId,jdbcType=BIGINT}
  </select>
  <delete id="deleteByPrimaryKey" parameterType="java.lang.Long" >
    delete from t_order
    where order_id = #{orderId,jdbcType=BIGINT}
  </delete>
  <insert id="insert" parameterType="com.concurrent.delayqueue.model.OrderInfo"
          useGeneratedKeys="true" keyProperty="orderId">
    insert into t_order (order_id, create_time, status
      )
    values (#{orderId,jdbcType=BIGINT}, #{createTime,jdbcType=TIMESTAMP}, #{status,jdbcType=INTEGER}
      )
  </insert>
  <insert id="insertSelective" parameterType="com.concurrent.delayqueue.model.OrderInfo" >
    insert into t_order
    <trim prefix="(" suffix=")" suffixOverrides="," >
      <if test="orderId != null" >
        order_id,
      </if>
      <if test="createTime != null" >
        create_time,
      </if>
      <if test="status != null" >
        status,
      </if>
    </trim>
    <trim prefix="values (" suffix=")" suffixOverrides="," >
      <if test="orderId != null" >
        #{orderId,jdbcType=BIGINT},
      </if>
      <if test="createTime != null" >
        #{createTime,jdbcType=TIMESTAMP},
      </if>
      <if test="status != null" >
        #{status,jdbcType=INTEGER},
      </if>
    </trim>
  </insert>
  <update id="updateByPrimaryKeySelective" parameterType="com.concurrent.delayqueue.model.OrderInfo" >
    update t_order
    <set >
      <if test="createTime != null" >
        create_time = #{createTime,jdbcType=TIMESTAMP},
      </if>
      <if test="status != null" >
        status = #{status,jdbcType=INTEGER},
      </if>
    </set>
    where order_id = #{orderId,jdbcType=BIGINT}
  </update>
  <update id="updateByPrimaryKey" parameterType="com.concurrent.delayqueue.model.OrderInfo" >
    update t_order
    set create_time = #{createTime,jdbcType=TIMESTAMP},
      status = #{status,jdbcType=INTEGER}
    where order_id = #{orderId,jdbcType=BIGINT}
  </update>


  <select id="selectByStatus" resultMap="BaseResultMap" parameterType="java.lang.Integer" >
    select
    <include refid="Base_Column_List" />
    from t_order
    where status = #{status,jdbcType=INTEGER}
  </select>
  <update id="updateByStatus">
    update t_order
    set status = #{newstatus,jdbcType=INTEGER}
    where order_id = #{orderId,jdbcType=BIGINT}
    and status = #{oldstatus,jdbcType=INTEGER}
  </update>
</mapper>

application.properties

spring.datasource.url = jdbc:mysql://localhost:3306/concurrent?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC
spring.datasource.username = root
spring.datasource.password =  123456

mybatis.mapper-locations=classpath:/mybatis/*Mapper.xml

源碼地址:https://github.com/qjm201000/concurrent_delayqueue.git

數據庫sql文件:到源碼裏面查看readme,按照步驟來就行。

並發編程-concurrent指南-阻塞隊列-延遲隊列DelayQueue