1. 程式人生 > >rocketmq簡單消息發送

rocketmq簡單消息發送

() trac dex ktr 發送 done encoding tag odin

有以下3種方式發送RocketMQ消息

  • 可靠同步發送 reliable synchronous
  • 可靠異步發送 reliable asynchronous
  • 單向發送 one-way transmission

可靠同步發送

主要運用在比較重要一點消息傳遞/通知等業務

public class SyncProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new
            DefaultMQProducer("test");
        producer.start();
        
for (int i = 0; i < 100; i++) { Message msg = new Message("TopicTest" /* Topic */, "TagA" /* Tag */, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */ ); //Call send message to deliver message to one of brokers.
SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); } //Shut down once the producer instance is not longer in use. producer.shutdown(); } }

可靠異步發送

通常用於對發送消息響應時間要求更高/更快的場景

public class AsyncProducer {
    public static
void main( String[] args) throws MQClientException, InterruptedException, UnsupportedEncodingException { DefaultMQProducer producer = new DefaultMQProducer("Jodie_Daily_test"); producer.start(); producer.setRetryTimesWhenSendAsyncFailed(0); for (int i = 0; i < 10000000; i++) { try { final int index = i; Message msg = new Message("Jodie_topic_1023", "TagA", "OrderID188", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); //重點在這裏 異步發送回調 producer.send(msg, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId()); } @Override public void onException(Throwable e) { System.out.printf("%-10d Exception %s %n", index, e); e.printStackTrace(); } }); } catch (Exception e) { e.printStackTrace(); } } producer.shutdown(); } }

單向發送

適用於某些耗時非常短,但對可靠性要求並不高的場景,例如日誌收集。

只發送消息,不等待服務器響應,只發送請求不等待應答。此方式發送消息的過程耗時非常短,一般在微秒級別。

public class OnewayProducer {
    public static void main(String[] args) throws Exception{
        DefaultMQProducer producer = new DefaultMQProducer("Test");
        producer.start();
        for (int i = 0; i < 100; i++) {
            //Create a message instance, specifying topic, tag and message body.
            Message msg = new Message("TopicTest" /* Topic */,
                "TagA" /* Tag */,
                ("Hello RocketMQ " +
                    i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
            );
            //Call send message to deliver message to one of brokers.
            producer.sendOneway(msg);

        }
        //Shut down once the producer instance is not longer in use.
        producer.shutdown();
    }
}

技術分享圖片

rocketmq簡單消息發送