1. 程式人生 > >Spring Boot的訊息事件機制

Spring Boot的訊息事件機制

Spring Boot自帶了訊息機制,可以讓我們在一個地方釋出訊息,多個地方同時接收訊息並處理訊息,當然這是在同一個JVM記憶體下進行的,不同的程序還需要使用MQ來實現。我覺得該訊息模式跟觀察者模式有一定的區別,觀察者模式一般觀察的是一個物件內部屬性發生變化的時候使用。而該訊息機制可以在任意地方使用。

訊息事件本身是一個物件,繼承於ApplicationEvent

@Data
public class DemoEvent extends ApplicationEvent {
    private String type;
    private List<Map> msg
; public DemoEvent(Object source,String type,List<Map> msg) { super(source); this.type = type; this.msg = msg; } }

還需要有一個訊息事件釋出者,將這個訊息事件給釋出出去

@Component
public class DemoPublisher {
    @Autowired
    ApplicationContext applicationContext;

    public void 
publish(DemoEvent demoEvent) { applicationContext.publishEvent(demoEvent); } }

然後就是我們的偵聽者,偵聽者可以有任意個根據業務不同做不同的處理,他的寫法分兩種,一個是實現了ApplicationListener介面,一個是在方法上打上@EventListener標籤

@Component
@Slf4j
public class DemoListener implements ApplicationListener<DemoEvent> {
    @Override
@Async public void onApplicationEvent(DemoEvent demoEvent) { log.info("接收到publisher傳送到訊息,時間" + Time.getTime()); List<Map> msg = demoEvent.getMsg(); String type = demoEvent.getType(); try { Thread.sleep(3000); }catch (Exception e) { e.printStackTrace(); } log.info("型別" + type + ",訊息內容:" + msg); } }
@Component
@Slf4j
public class DemoListener1 {
    @EventListener
    public void onDemoEvent(DemoEvent demoEvent) {
        log.info("listener1通過註解接收到了publisher傳送的訊息,時間" + Time.getTime());
        String type = demoEvent.getType();
        List<Map> msg = demoEvent.getMsg();
        try {
            Thread.sleep(2000);
        }catch (Exception e) {
            e.printStackTrace();
        }
        log.info("listener1:型別" + type + ",訊息內容:" + msg);
    }
}

但是我們需要知道的是,多個訊息監聽是同步執行的,他們會發生阻塞,所以我們需要進行非同步監聽,實現非同步監聽只需要在方法上打上@Async標籤,同時在Springboot主程式中開啟允許非同步

@EnableAsync
@SpringBootApplication
public class LanmdaApplication {

   public static void main(String[] args) {
      SpringApplication.run(LanmdaApplication.class, args);
   }

}

最後寫一個測試的Controller

@Slf4j
@RestController
public class TestController {
    @Autowired
    private DemoPublisher publisher;

    @GetMapping("/test")
    public String testListener() {
        List<Map> list = new ArrayList<>();
        Map<String,String> m1 = new HashMap<>();
        m1.put("1","2");
        Map<String,String> m2 = new HashMap<>();
        m2.put("3","4");
        Map<String,String> m3 = new HashMap<>();
        m3.put("5","6");
        list.add(m1);
        list.add(m2);
        list.add(m3);
        log.info("開始釋出訊息:" + Time.getTime());
        publisher.publish(new DemoEvent(this,"測試訊息",list));
        log.info("訊息釋出結束:" + Time.getTime());
        return "訊息釋出成功";
    }
}

執行後,日誌如下

2019-07-21 10:42:39.686  INFO 1756 --- [nio-8080-exec-1] c.g.lanmda.controller.TestController     : 開始釋出訊息:10:42:39
2019-07-21 10:42:39.687  INFO 1756 --- [nio-8080-exec-1] com.guanjian.lanmda.event.DemoListener1  : listener1通過註解接收到了publisher傳送的訊息,時間10:42:39
2019-07-21 10:42:41.690  INFO 1756 --- [nio-8080-exec-1] com.guanjian.lanmda.event.DemoListener1  : listener1:型別測試訊息,訊息內容:[{1=2}, {3=4}, {5=6}]
2019-07-21 10:42:41.695  INFO 1756 --- [nio-8080-exec-1] .s.a.AnnotationAsyncExecutionInterceptor : No task executor bean found for async processing: no bean of type TaskExecutor and no bean named 'taskExecutor' either
2019-07-21 10:42:41.697  INFO 1756 --- [nio-8080-exec-1] c.g.lanmda.controller.TestController     : 訊息釋出結束:10:42:41
2019-07-21 10:42:41.697  INFO 1756 --- [cTaskExecutor-1] com.guanjian.lanmda.event.DemoListener   : 接收到publisher傳送到訊息,時間10:42:41
2019-07-21 10:42:44.701  INFO 1756 --- [cTaskExecutor-1] com.guanjian.lanmda.event.DemoListener   : 型別測試訊息,訊息內容:[{1=2}, {3=4},