1. 程式人生 > >springCloud學習5(Spring-Cloud-Stream事件驅動)

springCloud學習5(Spring-Cloud-Stream事件驅動)

springcloud 總集:https://www.tapme.top/blog/detail/2019-02-28-11-33

程式碼見文章結尾

  想想平常生活中做飯的場景,在用電飯鍋做飯的同時,我們可以洗菜、切菜,等待電飯鍋發出飯做好的提示我們回去拔下電飯鍋電源(或者什麼也不知讓它處於保溫狀態),反正這個時候我們知道飯做好了,接下來可以炒菜了。從這裡可以看出我們在日常生活中與世界的互動並不是同步的、線性的,不是簡單的請求--響應模型。它是事件驅動的,我們不斷的傳送訊息、接受訊息、處理訊息。

  同樣在軟體世界中也不全是請求--響應模型,也會需要進行非同步的訊息通訊。使用訊息實現事件通訊的概念被稱為訊息驅動架構(Event Driven Architecture,EDA),也被稱為訊息驅動架構(Message Driven Architecture,MDA)。使用這類架構可以構建高度解耦的系統,該系統能夠對變化做出響應,且不需要與特定的庫或者服務緊密耦合。

  在 Spring Cloud 專案中可以使用Spirng Cloud Stream輕而易舉的構建基於訊息傳遞的解決方案。

為什麼使用訊息傳遞

  要解答這個問題,讓我們從一個例子開始,之前一直使用的兩個服務:許可證服務和組織服務。每次對許可證服務進行請求,許可證服務都要通過 http 請求到組織服務上查詢組織資訊。顯而易見這次額外的 http 請求會花費較長的時間。如果能夠將快取組織資料的讀操作,將會大幅提高許可證服務的響應時間。但是快取資料有如下 2 個要求:

  • 快取的資料需要在許可證服務的所有例項之間儲存一致——這意味著不能將資料快取到服務例項的記憶體中。
  • 在更新或者刪除一個組織資料時,許可證服務快取的資料需要失效——避免讀取到過期資料,需要儘早讓過時資料失效並刪除。

  要實現上面的要求,現在有兩種辦法。

  1. 使用同步請求--響應模型來實現。組織服務在組織資料變化時呼叫許可證服務的介面通知組織服務已經變化,或者直接操作許可證服務的快取。

  2. 使用事件驅動。組織服務發出一個非同步訊息。許可證服務收到該訊息後清除對應的快取。

同步請求-響應方式

  許可證服務在 redis 中快取從組織服務中查詢到的服務資訊,當組織資料更新時,組織服務同步 http 請求通知許可證服務資料過期。這種方式有以下幾個問題:

  • 組織服務和許可證服務緊密耦合
  • 這種方式不夠靈活,如果要為組織服務新增新的消費者,必須修改組織服務程式碼,以讓其通知新的服務資料變動。

使用訊息傳遞方式

  同樣的許可證服務在 redis 中快取從組織服務中查詢到的服務資訊,當組織資料更新時,組織服務將更新資訊寫入到佇列中。許可證服務監聽訊息佇列。使用訊息傳遞有一下 4 個好處:

  • 鬆耦合性:將服務間的依賴,變成了服務對佇列的依賴,依賴關係變弱了。
  • 耐久性:即使服務消費者已經關閉了,也可以繼續往裡傳送訊息,等消費者開啟後處理
  • 可伸縮性: 訊息傳送者不用等待訊息消費者的響應,它們可以繼續做各自的工作
  • 靈活性:訊息傳送者不用知道誰會消費這個訊息,因此在有新的訊息消費者時無需修改訊息傳送程式碼

spring cloud 中使用訊息傳遞

  spring cloud 專案中可以通過 spring cloud stream 框架來輕鬆整合訊息傳遞。該框架最大的特點是抽象了訊息傳遞平臺的細節,因此可以在支援的訊息佇列中隨意切換(包括 Apache Kafka 和 RabbitMQ)。

spring cloud stream 架構

  spring cloud stream 中有 4 個元件涉及到訊息釋出和訊息消費,分別為:

  1. 發射器

      當一個服務準備傳送訊息時,它將使用發射器釋出訊息。發射器是一個 Spring 註解介面,它接收一個普通 Java 物件,表示要釋出的訊息。發射器接收訊息,然後序列化(預設序列化為 JSON)後釋出到通道中。

  2. 通道

      通道是對佇列的一個抽象。通道名稱是與目標佇列名稱相關聯的。但是佇列名稱並不會直接公開在程式碼中,程式碼永遠只會使用通道名。

  3. 繫結器

      繫結器是 spring cloud stream 框架的一部分,它是與特定訊息平臺對話的 Spring 程式碼。通過繫結器,使得開發人員不必依賴於特定平臺的庫和 API 來發布和消費訊息。

  4. 接收器

      服務通過接收器來從佇列中接收訊息,並將訊息反序列化。

處理邏輯如下:

實戰

  繼續使用之前的專案,在許可證服務中快取組織資料到 redis 中。

建立 redis 服務

  為方便起見,使用 docker 建立 redis,建立指令碼如下:

docker run -itd --name redis --net host redis:

建立 kafka 服務

在組織服務中編寫訊息生產者

  首先在 organization 服務中引入 spring cloud stream 和 kafka 的依賴。

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>

  然後在 events 類中編寫SimpleSouce類,用於組織資料修改,產生一條訊息到佇列中。程式碼如下:

@EnableBinding(Source.class)
public class SimpleSource {
    private Logger logger = LoggerFactory.getLogger(SimpleSource.class);

    private Source source;

    @Autowired
    public SimpleSource(Source source) {
        this.source = source;
    }

    public void publishOrChange(String action, String orgId) {
        logger.info("在請求:{}中,傳送kafka訊息:{} for Organization Id:{}", UserContextHolder.getContext().id, action, orgId);
        OrganizationChange change = new OrganizationChange(action, orgId, UserContextHolder.getContext().id);
        source.output().send(MessageBuilder.withPayload(change).build());
    }
}

這裡使用的是預設通道,Source 類定義的 output 通道發訊息。後面通過 Sink 定義的 input 通道收訊息。

  然後在OrganizationController類中定義一個 delete 方法,並注入 SimpleSouce 類,程式碼如下:

@Autowired
private SimpleSource simpleSource;

@DeleteMapping(value = "/organization/{orgId}")
public void deleteOne(@PathVariable("orgId") String id) {
    logger.debug("刪除了組織:{}", id);
    simpleSource.publishOrChange("delete", id);
}

  最後在配置檔案中加入訊息佇列的配置:

# 省略了其他配置
spring:
  cloud:
    stream:
      bindings:
        output:
          destination: orgChangeTopic
          content-type: application/json
      kafka:
        binder:
          # 替換為部署kafka的ip和埠
          zk-nodes: 192.168.226.5:2181
          brokers: 192.168.226.5:9092

  現在我們可以測試下訪問localhost:5555/apis/org/organization/12,可以看到控制檯列印訊息生成的日誌。

在許可證服務中編寫訊息消費者

  整合 redis 的方法,參看。這裡不作說明。

  首先引入依賴,依賴項同上面組織服務。

  然後在 event 包下建立OrgChange的類,程式碼如下:

@EnableBinding(Sink.class) //使用Sink介面中定義的通道來監聽傳入訊息
public class OrgChange {

    private Logger logger = LoggerFactory.getLogger(OrgChange.class);

    @StreamListener(Sink.INPUT)
    public void loggerSink(OrganizationChange change){
        logger.info("收到一個訊息,組織id為:{},關聯id為:{}",change.getOrgId(),change.getId());
        //刪除失效快取
        RedisUtils.del(RedisKeyUtils.getOrgCacheKey(change.getOrgId()));
    }
}

//下面兩個都在util包下
//RedisKeyUtils.java程式碼如下
public class RedisKeyUtils {

    private static final String  ORG_CACHE_PREFIX = "orgCache_";

    public static String getOrgCacheKey(String orgId){
        return ORG_CACHE_PREFIX+orgId;
    }
}

//RedisUtils.java程式碼如下
@Component
@SuppressWarnings("all")
public class RedisUtils {

    public static RedisTemplate redisTemplate;

    @Autowired
    public void setRedisTemplate(RedisTemplate redisTemplate) {
        RedisUtils.redisTemplate = redisTemplate;
    }

    public static boolean setObj(String key,Object value){
        return setObj(key,value,0);
    }

    /**
     * Description:
     *
     * @author fanxb
     * @date 2019/2/21 15:21
     * @param key 鍵
     * @param value 值
     * @param time 過期時間,單位ms
     * @return boolean 是否成功
     */
    public static boolean setObj(String key,Object value,long time){
        try{
            if(time<=0){
                redisTemplate.opsForValue().set(key,value);
            }else{
                redisTemplate.opsForValue().set(key,value,time,TimeUnit.MILLISECONDS);
            }
            return true;
        }catch (Exception e){
            e.printStackTrace();
            return false;
        }
    }

    public static Object get(String key){
        if(key==null){
            return null;
        }
        try{
            Object obj = redisTemplate.opsForValue().get(key);
            return obj;
        }catch (Exception e){
            e.printStackTrace();
            return null;
        }
    }

    public static void del(String... key){
        if(key!=null && key.length>0){
            redisTemplate.delete(CollectionUtils.arrayToList(key));
        }
    }
}

  上面用到的是 Sink.INPUT 通道,這個和之前的 Source.OUTPUT 通道剛好一隊,一個負責收,一個負責發。

  然後修改OrganizationByRibbonService.java檔案中的getOrganizationWithRibbon方法:

    public Organization getOrganizationWithRibbon(String id) {
        String key = RedisKeyUtils.getOrgCacheKey(id);
        //先從redis快取取資料
        Object res = RedisUtils.get(key);
        if (res == null) {
            logger.info("當前資料無快取:{}", id);
            try{

            ResponseEntity<Organization> responseEntity = restTemplate.exchange("http://organizationservice/organization/{id}",
                    HttpMethod.GET, null, Organization.class, id);
            res = responseEntity.getBody();
            RedisUtils.setObj(key, res);
            }catch (Exception e){
                e.printStackTrace();
            }
        } else {
            logger.info("當前資料為快取資料:{}", id);
        }
        return (Organization) res;
    }

  最後修改配置檔案,為 input 通道指定 topic,配置如下:

spring:
  cloud:
    stream:
      bindings:
        input:
          destination: orgChangeTopic
          content-type: application/json
          # 定義將要消費訊息的消費者組的名稱
          # 可能多個服務監聽同一個訊息佇列。如果定義了消費者組,那麼同組中只要有一個消費了訊息,剩餘的不會再次消費該訊息,保證只有訊息的
          # 一個副本會被該組的某個例項所消費
          group: licensingGroup
      kafka:
        binder:
          zk-nodes: 192.168.226.5:2181
          brokers: 192.168.226.5:9092

基本和傳送的配置相同,只是這裡是為input通道對映佇列,然後還定義了一個組名,避免一個訊息被重複消費。

  現在來多次訪問localhost:5555/apis/licensingservice/licensingByRibbon/12,可以看到 licensingservice 控制檯列印資料從快取中讀取,如下所示:

然後再以 delete 訪問localhost:5555/apis/org/organization/12清除快取,再次訪問 licensingservice 服務,結果如下:

自定義通道

  上面用的是Spring Cloud Stream自帶的 input/output 通道,那麼要如何自定義通道呢?下面以自定義customInput/customOutput通道為例。

自定義發資料通道

public interface CustomOutput {
    @Output("customOutput")
    MessageChannel out();
}

  對於每個自定義的發資料通道,需使用@OutPut 註解標記的返回 MessageChannel 類的方法。

自定義收資料通道

public interface CustomInput {

    @Input("customInput")
    SubscribableChannel in();
}

  同上,對應自定義的收資料通道,需要使用@Input 註解標記的返回 SubscribableChannel 類的方法。

結束

  看完本篇你應該已經能夠在 Spring Cloud 中整合 Spring Cloud Stream 訊息隊列了,貌似這個也能用到普通的 spring boot 專案中,比直接整合 mq 更加的優雅。

2019,Fighting!

本篇原創釋出於:FleyX 的個人部落格

本篇所用全部程式碼:FleyX 的 git