1. 程式人生 > >java同步代碼(synchronized)中使用BlockingQueue

java同步代碼(synchronized)中使用BlockingQueue

reat zed end hello empty pub fas pty stack

說起BlockingQueue,大家最熟悉的就是生產者-消費者模式下的應用。但是如果在調用queue的上層代碼加了同步塊就會導致線程死鎖。

例如:

    static BlockingQueue<String> queue = new LinkedBlockingQueue();

    /**
     * 同步鎖
     */
    static Object lock = new Object();

    static void producer(){
        synchronized (lock){
            queue.put("1");
        }
    }

    
static void cosumer(){ synchronized (lock){ //一旦阻塞,將掛起當前線程,lock鎖永遠等不到釋放,生產者也就無法添加元素,take也就永遠阻塞 String msg = queue.take(); } }

但是同步塊必須使用的情況下,怎樣改進queue的使用呢?見下面示例:

package com.hdwang;

import com.alibaba.fastjson.JSON;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue; /** * Created by hdwang on 2018/4/17. */ public class MultiQueueSynTest { static BlockingQueue<Packet> queue1 = new LinkedBlockingQueue(); static BlockingQueue<Packet> queue2 = new LinkedBlockingQueue(); static int seq = 1;
/** * 同步鎖 */ static Object lock = new Object(); static void commit(String msg){ synchronized (lock) { Packet packet = new Packet(); packet.setSeq(seq++); packet.setMsg(msg); try { //queue1.put(packet); //阻塞式添加元素 while(queue1.size()== Integer.MAX_VALUE){ //隊滿,等待 lock.wait(); } queue1.offer(packet); //非阻塞式添加元素即可 System.out.println("commit msg:" + JSON.toJSONString(packet)); lock.notifyAll(); //通知等待線程 } catch (InterruptedException e) { e.printStackTrace(); } } } static void send(){ while(true) { synchronized (lock) { try { //Packet packet = queue1.take(); //阻塞式取元素 //queue2.put(packet); while(queue1.isEmpty()) { //隊空,等待 lock.wait(); //等待,交出鎖 } Packet packet = queue1.poll(); //非阻塞式取元素即可 System.out.println("send msg:" + JSON.toJSONString(packet)); lock.notifyAll(); //通知等待線程 while (queue2.size() == Integer.MAX_VALUE){ //隊滿,等待 lock.wait(); //等待,交出鎖 } queue2.offer(packet); System.out.println("msg->queue2:"+JSON.toJSONString(packet)); lock.notifyAll(); //通知等待線程 } catch (InterruptedException e) { e.printStackTrace(); } } } } public static void main(String[] args) { //生產者1 new Thread(new Runnable() { @Override public void run() { while(true){ //不斷產生消息 commit("hello1"); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } } } }).start(); //生產者2 new Thread(new Runnable() { @Override public void run() { while(true){ //不斷產生消息 commit("hello2"); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } } } }).start(); //消費者 new Thread(new Runnable() { @Override public void run() { send(); } }).start(); } static class Packet{ int seq; String msg; public int getSeq() { return seq; } public void setSeq(int seq) { this.seq = seq; } public String getMsg() { return msg; } public void setMsg(String msg) { this.msg = msg; } } }

運行結果

commit msg:{"msg":"hello1","seq":1}
send msg:{"msg":"hello1","seq":1}
msg->queue2:{"msg":"hello1","seq":1}
commit msg:{"msg":"hello2","seq":2}
send msg:{"msg":"hello2","seq":2}
msg->queue2:{"msg":"hello2","seq":2}
commit msg:{"msg":"hello1","seq":3}
send msg:{"msg":"hello1","seq":3}
msg->queue2:{"msg":"hello1","seq":3}
commit msg:{"msg":"hello2","seq":4}
send msg:{"msg":"hello2","seq":4}
msg->queue2:{"msg":"hello2","seq":4}
commit msg:{"msg":"hello1","seq":5}
send msg:{"msg":"hello1","seq":5}
msg->queue2:{"msg":"hello1","seq":5}
commit msg:{"msg":"hello2","seq":6}
send msg:{"msg":"hello2","seq":6}
msg->queue2:{"msg":"hello2","seq":6}

java同步代碼(synchronized)中使用BlockingQueue