1. 程式人生 > >java 點對點實例

java 點對點實例

star bsp str ext div ase world 發送消息 ext.get

1.創建一個抽象類,定義發送消息和接收消息的抽象方法

package cn.base.jms;  
  
import javax.jms.*;  
  
/** 
 * @author gu.fei 
 * @version 2017-03-24 9:20 
 */  
public abstract class Queuehandler {  
  
    //默認隊列名稱queue  
    private String queue = "queue";  
  
    //連接工廠  
    private Connection connection;  
  
    private int acknowledgeMode = Session.AUTO_ACKNOWLEDGE;  
  
    
private boolean transacted = false; public Queuehandler() { } /** * 發送消息 * @return */ public abstract Object sendMessage(MessageProducer producer,Session session); /** * 接收消息 * @return */ public abstract Object reciveMessage(Message message);
/** * 執行發送 */ public void doSend() { Session session = null; try { session = connection.createSession(transacted,acknowledgeMode); Destination destination = session.createQueue(queue); MessageProducer producer = session.createProducer(destination); sendMessage(producer,session); }
catch (JMSException e) { e.printStackTrace(); } finally { if(null != session) { try { session.close(); } catch (JMSException e) { e.printStackTrace(); } } } } /** * 執行發送 */ public void doRecive() { Session session = null; try { session = connection.createSession(transacted,acknowledgeMode); Destination destination = session.createQueue(queue); MessageConsumer consumer = session.createConsumer(destination); consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { reciveMessage(message); } }); //保持進程啟動狀態 while (true) {} } catch (JMSException e) { e.printStackTrace(); } finally { if(null != session) { try { session.close(); } catch (JMSException e) { e.printStackTrace(); } } } } private void init() { } public String getQueue() { return queue; } public void setQueue(String queue) { this.queue = queue; } public Connection getConnection() { return connection; } public void setConnection(Connection connection) { this.connection = connection; } public int getAcknowledgeMode() { return acknowledgeMode; } public void setAcknowledgeMode(int acknowledgeMode) { this.acknowledgeMode = acknowledgeMode; } public boolean isTransacted() { return transacted; } public void setTransacted(boolean transacted) { this.transacted = transacted; } }

2.定義一個發送類,繼承上面的抽象類

package cn.base.jms;  
  
import org.apache.activemq.ActiveMQConnectionFactory;  
  
import javax.jms.*;  
  
/**
 * Example producer: sends a fixed number of text messages to the queue configured in
 * {@link Queuehandler} through a local ActiveMQ broker.
 *
 * @author gu.fei
 * @version 2017-03-24 9:50
 */
public class OneProducer extends Queuehandler {

    /** Number of messages sent by one call to {@link #sendMessage(MessageProducer, Session)}. */
    private static final int MESSAGE_COUNT = 10;

    static ConnectionFactory connectionFactory = null;
    static Connection connection = null;

    // Opens and starts the shared connection once, when the class is loaded.
    // On failure the stack trace is printed and 'connection' stays null.
    static {
        connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        try {
            connection = connectionFactory.createConnection();
            connection.start();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    public OneProducer() {
    }

    /**
     * Sends {@code MESSAGE_COUNT} text messages of the form {@code "hello,world!" + i}.
     *
     * @param producer producer to send with
     * @param session  session used to create the text messages
     * @return always {@code null}
     */
    @Override
    public Object sendMessage(MessageProducer producer, Session session) {
        try {
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                Message message = session.createTextMessage("hello,world!" + i);
                producer.send(message);
            }
        } catch (JMSException e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * Not used by the producer.
     *
     * @param message ignored
     * @return always {@code null}
     */
    @Override
    public Object reciveMessage(Message message) {
        return null;
    }

    /**
     * Sends the messages and then closes the shared connection.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        OneProducer oneProducer = new OneProducer();
        oneProducer.setConnection(connection);
        try {
            oneProducer.doSend();
        } finally {
            // Close in finally so the connection is released even if doSend() throws an
            // unchecked exception; skip when the static initializer failed to create it.
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

3.定義兩個消費者

package cn.base.jms;  
  
import org.apache.activemq.ActiveMQConnectionFactory;  
  
import javax.jms.*;  
  
/**
 * Example consumer "One": listens on the queue configured in {@link Queuehandler} through a
 * local ActiveMQ broker and prints every text message it receives.
 *
 * @author gu.fei
 * @version 2017-03-24 9:50
 */
public class OneCustomer extends Queuehandler {

    static ConnectionFactory connectionFactory = null;
    static Connection connection = null;

    // Opens and starts the shared connection once, when the class is loaded.
    // On failure the stack trace is printed and 'connection' stays null.
    static {
        connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        try {
            connection = connectionFactory.createConnection();
            connection.start();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    public OneCustomer() {
    }

    /**
     * Not used by the consumer.
     *
     * @param producer ignored
     * @param session  ignored
     * @return always {@code null}
     */
    @Override
    public Object sendMessage(MessageProducer producer, Session session) {
        return null;
    }

    /**
     * Waits one second and then prints the text of the received message.
     *
     * <p>Messages that are not {@link TextMessage}s are reported on stderr and skipped instead
     * of failing with a {@link ClassCastException}.
     *
     * @param message the message delivered by the listener
     * @return always {@code null}
     */
    @Override
    public Object reciveMessage(Message message) {
        if (!(message instanceof TextMessage)) {
            System.err.println("One ignored non-text message: " + message);
            return null;
        }
        TextMessage text = (TextMessage) message;
        try {
            Thread.sleep(1000);
            System.out.println("One接受消息:" + text.getText());
        } catch (JMSException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the calling thread can still observe it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
        return null;
    }

    /**
     * Runs the consumer; the shared connection is closed once {@code doRecive()} returns
     * or throws.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        OneCustomer customer = new OneCustomer();
        customer.setConnection(connection);
        try {
            customer.doRecive();
        } finally {
            // Close in finally so the connection is released even on an unchecked exception;
            // skip when the static initializer failed to create it.
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
package cn.base.jms;  
  
import org.apache.activemq.ActiveMQConnectionFactory;  
  
import javax.jms.*;  
  
/**
 * Example consumer "Two": listens on the queue configured in {@link Queuehandler} through a
 * local ActiveMQ broker and prints every text message it receives.
 *
 * @author gu.fei
 * @version 2017-03-24 9:50
 */
public class TwoCustomer extends Queuehandler {

    static ConnectionFactory connectionFactory = null;
    static Connection connection = null;

    // Opens and starts the shared connection once, when the class is loaded.
    // On failure the stack trace is printed and 'connection' stays null.
    static {
        connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        try {
            connection = connectionFactory.createConnection();
            connection.start();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    public TwoCustomer() {
    }

    /**
     * Not used by the consumer.
     *
     * @param producer ignored
     * @param session  ignored
     * @return always {@code null}
     */
    @Override
    public Object sendMessage(MessageProducer producer, Session session) {
        return null;
    }

    /**
     * Waits one second and then prints the text of the received message.
     *
     * <p>Messages that are not {@link TextMessage}s are reported on stderr and skipped instead
     * of failing with a {@link ClassCastException}.
     *
     * @param message the message delivered by the listener
     * @return always {@code null}
     */
    @Override
    public Object reciveMessage(Message message) {
        if (!(message instanceof TextMessage)) {
            System.err.println("Two ignored non-text message: " + message);
            return null;
        }
        TextMessage text = (TextMessage) message;
        try {
            Thread.sleep(1000);
            System.out.println("Two接受消息:" + text.getText());
        } catch (JMSException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the calling thread can still observe it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
        return null;
    }

    /**
     * Runs the consumer; the shared connection is closed once {@code doRecive()} returns
     * or throws.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        TwoCustomer customer = new TwoCustomer();
        customer.setConnection(connection);
        try {
            customer.doRecive();
        } finally {
            // Close in finally so the connection is released even on an unchecked exception;
            // skip when the static initializer failed to create it.
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

先啟動兩個消費者,然後啟動生產者

結果如下:

one:

One接受消息:hello,world!1
One接受消息:hello,world!3
One接受消息:hello,world!5
One接受消息:hello,world!7
One接受消息:hello,world!9

two:

Two接受消息:hello,world!0
Two接受消息:hello,world!2
Two接受消息:hello,world!4
Two接受消息:hello,world!6
Two接受消息:hello,world!8

java 點對點實例