高性能消息中間件——NATS
在介紹NATS之前先了解下什麽是分布式系統和消息中間件
對於分布式系統的定義,一直以來我都沒有找到或者想到特別簡練而又合適的定義,這裏引用一下Distributed System Concepts and Design (Thrid Edition)中的一句話A distributed system is one in which components located at networked computers communicate and coordinate their actions only by passing messages,從這句話我們可以看到幾個重點,一是組件分布在網絡計算機上,二是組件之間僅僅通過消息傳遞來通信並協調行動。消息中間件維基百科給出的定義為Message-oriented middleware(MOM) is software infrastructure focused on sending and receiving messages between distrubuted systems,意思就是面向消息的系統(消息中間件)是在分布式系統中完成消息的發送和接收的基礎軟件
圖片描述(最多50字)
消息中間件常被提及的好處即異步和解耦,市面上常常被使用到的中間件有RabbitMQ, ActiveMQ, Kafka等,他們的關註度和使用率都非常的高,並且使用起來也非常的方便。公司的WiseCloud產品就集成了RabbitMQ。而在下一個版本的更新中將會使用NATS來替換RabbitMQ。使用NATS的好處比較多首先就是其性能非常好,下面引用官網的性能對比圖:
圖片描述(最多50字)
NATS介紹 NATS是一個開源、輕量級、高性能的分布式消息中間件,實現了高可伸縮性和優雅的Publish/Subscribe模型,使用Golang語言開發。NATS的開發哲學認為高質量的QoS應該在客戶端構建,故只建立了Request-Reply,不提供 1.持久化 2.事務處理 3.增強的交付模式 4.企業級隊列。 NATS消息傳遞模型 NATS支持各種消息傳遞模型,包括: 發布訂閱(Publish Subscribe) 請求回復(Request Reply) 隊列訂閱(Queue Subscribers ) 提供的功能: 純粹的發布訂閱模型(Pure pub-sub) 服務器集群(Cluster mode server) 自動精簡訂閱者(Auto-pruning of subscribers) 基於文本協議(Text-based protocol) 多服務質量保證(Multiple qualities of service - QoS) 發布訂閱(Publish Subscribe) NATS將publish/subscribe消息分發模型實現為一對多通信,發布者在Subject上發送消息,並且監聽該Subject在任何活動的訂閱者都會收到該消息
圖片描述(最多50字)
java:
//publish
Connection nc = Nats.connect("nats://127.0.0.1:4222");
nc.publish("subject", "hello world".getBytes(StandardCharsets.UTF_8));
//subscribe
Subscription sub = nc.subscribe("subject");
Message msg = sub.nextMessage(Duration.ofMillis(500));
String response = new String(msg.getData(), StandardCharsets.
或者是基於回調的subscribe
//subscribe
Dispatcher d = nc.createDispatcher(msg - >{
String response = new String(msg.getData(), StandardCharsets.UTF_8)
//do something
})
d.subscribe("subject");
請求響應(Request Reply)
NATS支持兩種請求響應消息:點對點或多對多。點對點涉及最快或首先響應。在一對多的消息交換中,需要限制請求響應的限制
在Request Reply過程中,發布請求發布帶有響應主題的消息,期望對該subject做出響應操作
圖片描述(最多50字)
java:
// publish
Connection connection = Nats.connect("nats://127.0.0.1:4222");
String reply = "replyMsg";
//請求回應方法回調
Dispatcher d = connection.createDispatcher(msg ->
System.out.println("reply: " + JSON.toJSONString(msg));
})
d.unsubscribe(repl , 1);
//訂閱請求
d.subscribe(reply);
//發布請求
connection.publish("requestSub", reply, "request".getBytes(StandardCharsets.
UTF_8));
//subscribe
Connection nc = Nats.connect("nats://127.0.0.1:4222");
//註冊訂閱
Dispatcher dispatcher = nc.createDispatcher(msg -> {
System.out.println(JSON.toJSONString(msg));
nc.publish(msg.getReplyTo(), "this is reply".getBytes(StandardCharsets.UTF_8));
});
dispatcher.subscribe("requestSub");
隊列訂閱&分享工作(Queue Subscribers & Sharing Work)
NATS提供稱為隊列訂閱的負載均衡功能,雖然名字為queue(隊列)但是並不是我們所認為的那樣。他的主要功能是將具有相同queue名字的subject進行負載均衡。使用隊列訂閱功能消息發布者不需要做任何改動,消息接受者需要具有相同的對列名
圖片描述(最多50字)
// Subscribe
Connection nc = Nats.connect();
Dispatcher d = nc.createDispatcher(msg -> {
//do something
System.out.println("msg: " + new String(msg.getData(),StandardCharsets.UTF_8));
});
d.subscribe("queSub", "queName");
Nats-Spring集成
NATS雖說是一個性能非常好的消息中間鍵,但是和Spring的集成不是很好。這裏提供兩個集成的思路
CloudFoundry-Community/java-nats
Wanlinus/nats-spring
java-nats
這是一個由CloudFoundry主導的一個NATS java客戶端。提供了區別於官方的nats客戶端,支持註解配置,對Spring有比較好的支持,但是此項目已經有1年多沒有更新且不支持NATS Streaming。相應用法參考Github,這裏不做詳細講解.
nats-spring
由於開源社區只提供一個簡單的NATS Client,缺少對註解和Spring的支持,所以我基於官方jnats客戶端寫了一個SpringBoot的兼容插件.主要是為了兼容spring boot amqp開發模式,盡量使用註解解決問題開發出來的,所以使用方法類似於在代碼中使用@RabbitListener.具體使用方法如下
{{git clone
cd nats-spring
mvn clean install}}}
<dependency>
<groupId>cn.wanlinus</groupId>
<artifactId>nats-spring</artifactId>
<version>1.0.0.RELEASE</version>
</dependency>
application.yml
spring:
nats:
urls:
-
nats://127.0.0.1:4222@EnableNats
br/>@EnableNats
public class NatsDemo2Application {
public static void main(String[] args) {
SpringApplication.run(NatsDemo2Application.class, args);}
}
@Component
br/>}
}
@Component
br/>@NatsSubscribe("haha")
System.out.println(message.getSubject() + " : " + new String(message.getData()));
}
}NATS Streaming介紹
NATS由於不能保證消息的投遞正確性和存在其他的缺點,NATS Streaming就孕育而生.他是一個由NATS提供支持的數據流系統,采用Go語言編寫,NATS Streaming與核心NATS平臺無縫嵌入,擴展和互操作.除了核心NATS平臺的功能外,他還提供了以下功能:
NATS Streaming特征
增強消息協議(Enhanced message protocol)
消息/事件持久化(Message/event persistence)
至少一次數據傳輸(At-least-once-delivery)
Publisher限速(Publisher rate limiting)
Subscriber速率匹配(Rate matching/limiting per subscriber)
按主題重發消息(Historical message replay by subject)
持續訂閱(Durable subscriptions)
基本用法
在使用NATS Streaming之前首先要啟動服務器,在這裏我選擇使用docker容器
4222 client默認連接端口
8222 Web端口
6222 集群通信端口
$ docker run -d -p 4222:4222 -p 8222:8222 -p 6222:6222 nats-streaming
STREAM: Starting nats-streaming-server[test-cluster] version 0.11.0
STREAM: ServerID: bzkKJL3jI4KW9Hqb0bC1Ae
STREAM: Go version: go1.11
Starting nats-server version 1.3.0
Git commit [not set]
Starting http monitor on 0.0.0.0:8222
Listening for client connections on 0.0.0.0:4222
Server is ready
STREAM: Recovering the state...
STREAM: No recovered state
STREAM: Message store is MEMORY
STREAM: ---------- Store Limits ----------
STREAM: Channels: 100 *
STREAM: --------- Channels Limits --------
STREAM: Subscriptions: 1000 *
STREAM: Messages : 1000000 *
STREAM: Bytes : 976.56 MB *
STREAM: Age : unlimited *
STREAM: Inactivity : unlimited *
STREAM: ----------------------------------
java:
// 第一個參數表示clusterId,在啟動NATS Streaming容器的時候確定
// 第二個參數表示clientID,連接客戶端的唯一標識符
StreamingConnectionFactory cf = new StreamingConnectionFactory
("test-cluster", "bar");
//設置Nats服務器地址和端口,默認是nats://127.0.0.1:4222
cf.setNatsConnection(Nats.connect("nats://127.0.0.1:4222"));
StreamingConnection sc = cf.createConnection();
Publish: sc.publish("foo", "Hello World".getBytes());
Subscribe:
sc.subscribe("foo", msg -> {
System.out.println(new String(msg.getData(), StandardCharsets.UTF_8));
}, new SubscriptionOptions.Builder()
.durableName("aa")
.deliverAllAvailable().build());
在使用NATS Streaming的時候需要註意訂閱主題不支持通配符,在訂閱消息時傳入MessageHandler函數是接口實現和SubscriptionOptions對象.MessageHandler提供消息回調處理, SubscriptionOptions用於設置訂閱選項,比如設置Queue, durableName, ack等。
Streaming-Spring集成
作為一款優秀的消息中間件,卻沒有對Spring做集成,這是非常的可惜的事情.所以為了在工作中方便的使用他,我開發了一個很小的插件.雖然還有很大的改進空間,不過在公司的項目中卻能夠很好的運行.他開發思路和nats-spring差不多,所以使用方式也是大同小異,具體如下:
{{git clone https://github.com/wanlinus/na ... g.git
cd nats-streaming-spring
mvn clean install}}}
<dependency>
<groupId>cn.wanlinus</groupId>
<artifactId>nats-streaming-spring</artifactId>
<version>1.0.0-SNAPSHOT</version>
</dependency>
application.yml
spring:
nats:
streaming:
nats-url: nats://127.0.0.1:4222
cluster-id: test-cluster@EnableNatsStreaming
br/>@EnableNatsStreaming
public class StreamingDemoApplication {
public static void main(String[] args) {
SpringApplication.run(StreamingDemoApplication.class, args);}
//發布消息只需要註入StreamingConnection
@Autowired
br/>}
//發布消息只需要註入StreamingConnection
@Autowired
public void sendMsg(){
sc.publish("foo", "publish message".getBytes())}
}
@Service
br/>}
}
@Service
@Subscribe(value = "foo", durableName = "dname", queue = "queue")
public void asd(Message message) throws IOException {
System.out.println(new String(message.getData(), StandardCharsets.UTF_8));
}
}
兩個插件由於是為了結合項目所寫的,所以裏面有些部分並不通用。後續的開發中我將會繼續進行抽象和改進。
歡迎工作一到五年的Java工程師朋友們加入Java架構開發: 855835163
群內提供免費的Java架構學習資料(裏面有高可用、高並發、高性能及分布式、Jvm性能調優、Spring源碼,MyBatis,Netty,Redis,Kafka,Mysql,Zookeeper,Tomcat,Docker,Dubbo,Nginx等多個知識點的架構資料)合理利用自己每一分每一秒的時間來學習提升自己,不要再用"沒有時間“來掩飾自己思想上的懶惰!趁年輕,使勁拼,給未來的自己一個交代!
高性能消息中間件——NATS