1. 程式人生 > >8.RabbitMQ 消息傳遞Java對象

8.RabbitMQ 消息傳遞Java對象

per 共享 idt sin 序列 技術分享 默認端口 發送 trac

通過消息服務器傳遞Java對象時,該Java類必須實現序列化接口(java.io.Serializable),這樣才能把Java對象轉化為字節數組,再由生產者或消費者傳遞到另外一個JVM中。發送方與接收方的兩個JVM必須共享同一個類(例如本文的UserInfo類),否則接收方無法把字節數組還原成對象。

1、定義序列化的類UserInfo
技術分享圖片
2、消費者(Server)中,實例化UserInfo的對象,並取出它序列化後的字節數組 技術分享圖片
技術分享圖片
3、編寫生產者(Client) 技術分享圖片
技術分享圖片
代碼: UserInfo.java package com.test.rfc; public class UserInfo implements java.io.Serializable{ private String name = null; public String getName() { return name; } public void setName(String name) { this.name = name; } } Server.java
package com.test.rfc;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.nio.charset.StandardCharsets;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

/**
 * RPC-style server: consumes requests from the {@code queue_rpc} queue and
 * answers each one with a serialized {@link UserInfo} object, published to the
 * reply queue named in the request.
 */
public class Server {

    /**
     * Builds a {@link UserInfo} and converts it to bytes with Java
     * serialization.
     *
     * @return the serialized form of the {@code UserInfo} instance
     * @throws Exception if the object cannot be written
     */
    public byte[] getUserByte() throws Exception {
        UserInfo u = new UserInfo();
        u.setName("Hello I come from MQ server");

        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        // try-with-resources flushes and closes the object stream even when
        // writeObject fails, so the buffer is complete before it is read.
        try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
            oos.writeObject(u);
        }
        return baos.toByteArray();
    }

    /**
     * Connects to the broker, declares the request queue and registers a
     * consumer that replies to every request. On success the connection is
     * left open on purpose so that the consumer keeps receiving requests.
     *
     * @param argv unused
     */
    public static void main(String[] argv) {
        Server s = new Server();

        ConnectionFactory factory = new ConnectionFactory();
        factory.setUsername("admin");
        factory.setPassword("admin");
        factory.setHost("192.168.169.142");
        // The default port 5672 is used.

        Connection connection = null;
        try {
            connection = factory.newConnection();
            final Channel channel = connection.createChannel();

            // Serialize the object once; the same bytes answer every request.
            final byte[] data = s.getUserByte();
            System.out.println(data.length);

            String queueName = "queue_rpc";
            channel.queueDeclare(queueName, false, false, false, null);

            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                        AMQP.BasicProperties properties, byte[] body) throws IOException {
                    // Decode with a fixed charset instead of the platform default.
                    System.out.println("rfc=" + new String(body, StandardCharsets.UTF_8));

                    String replyTo = properties.getReplyTo();
                    if (replyTo != null) {
                        AMQP.BasicProperties replyProps = new AMQP.BasicProperties.Builder()
                                .correlationId(properties.getCorrelationId())
                                .build();
                        channel.basicPublish("", replyTo, replyProps, data);
                    }
                    // A request without a reply queue has nowhere to send the
                    // answer; it is still acknowledged so it is not redelivered.
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            };
            // autoAck is false: each request is acknowledged in handleDelivery.
            channel.basicConsume(queueName, false, consumer);
        } catch (Exception e) {
            e.printStackTrace();
            // Setup failed: release the connection so it does not leak.
            closeQuietly(connection);
        }
    }

    /** Closes the connection if one was opened, reporting but not rethrowing failures. */
    private static void closeQuietly(Connection connection) {
        if (connection == null) {
            return;
        }
        try {
            connection.close();
        } catch (Exception closeError) {
            closeError.printStackTrace();
        }
    }
}
// Next listing: Client.java
package com.test.rfc;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

import com.rabbitmq.client.*;

/**
 * RPC-style client: publishes one request to the {@code queue_rpc} queue,
 * waits for the matching reply on a temporary queue, and deserializes the
 * reply body into a {@link UserInfo}.
 */
public class Client {

    /**
     * Sends a single request, blocks until the reply with the same
     * correlation id arrives, prints the received name and disconnects.
     *
     * @param argv unused
     */
    public static void main(String[] argv) {
        Connection connection = null;
        try {
            // Queue the request is sent to; the server consumes from it.
            String queueName = "queue_rpc";

            ConnectionFactory factory = new ConnectionFactory();
            factory.setUsername("admin");
            factory.setPassword("admin");
            factory.setHost("192.168.169.142");
            // The default port 5672 is used.

            connection = factory.newConnection();
            Channel channel = connection.createChannel();

            // Temporary, server-named queue on which this client waits for
            // the server's reply.
            String replyQueueName = channel.queueDeclare().getQueue();

            // Unique id used to match the reply to this request.
            final String corrId = UUID.randomUUID().toString();
            AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
                    .correlationId(corrId)
                    .replyTo(replyQueueName)
                    .build();

            // Send the request; encode with a fixed charset instead of the
            // platform default.
            channel.basicPublish("", queueName, props,
                    "GetUserInfo".getBytes(StandardCharsets.UTF_8));

            // Hand-off point between the consumer callback and this thread.
            final BlockingQueue<byte[]> response = new ArrayBlockingQueue<>(1);
            channel.basicConsume(replyQueueName, true, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                        AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println(properties.getCorrelationId() + ",body=" + body.length);
                    // Compare from corrId so that a reply without a
                    // correlation id is ignored rather than causing an NPE.
                    if (corrId.equals(properties.getCorrelationId())) {
                        response.offer(body);
                    }
                }
            });

            byte[] b = response.take();
            System.out.println(b.length);

            // Deserialize the object.
            // NOTE(review): Java-native deserialization of bytes received from
            // the broker is only safe when every publisher is trusted;
            // consider an ObjectInputFilter or a data format such as JSON.
            UserInfo u;
            try (ObjectInputStream oii = new ObjectInputStream(new ByteArrayInputStream(b))) {
                u = (UserInfo) oii.readObject();
            }
            System.out.println(u.getName());

            channel.close();
        } catch (InterruptedException e) {
            // Restore the interrupt flag for any caller that checks it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Always release the connection, including on failure paths.
            if (connection != null) {
                try {
                    connection.close();
                } catch (Exception closeError) {
                    closeError.printStackTrace();
                }
            }
        }
    }
}

8.RabbitMQ 消息傳遞Java對象