使用Akka持久化——訊息傳送與接收
前言
在《使用Akka持久化——持久化與快照》一文中介紹瞭如何使用Akka持久化訊息及生成快照。對於叢集應用來說,傳送者發出訊息,只有當收到了接受者的成功回覆才應當認為是一次完整的請求和應答(一些RPC框架只提供了遠端呼叫、序列化/反序列化的機制,但是具體呼叫的成功與否實際是拋給了開發者本人),利用Akka的應答機制很容易實現這些功能。特殊情況下發送者傳送了訊息,但是最終接受者卻沒有接收到訊息,導致這一情況發生的因素很多(例如:傳送者呼叫完傳送介面後,傳送者所在程序奔潰了;網路故障;接收者不存在等)。如果這些訊息的成功接收與處理對於整個應用而言具有強一致性的要求,那麼這些都會導致很多困擾,好在我們可以使用Akka的持久化機制。
傳送者在傳送訊息之前先對訊息進行持久化,那麼無論任何原因導致沒有收到接收者的成功回覆時,我們總能有辦法從持久化資訊中找出那些未被成功回覆的訊息進行重發(這說明接收者接到的訊息有可能會重複,此時需要保證接收者的實現是冥等的)。當接收者收到訊息進行處理後需要向傳送者傳送成功回覆,傳送者收到回覆後的第一個動作應當是對回覆內容的持久化,否則有可能在還未真正對成功回覆處理時宕機或程序奔潰導致回覆訊息丟失(在持久化與收到訊息之間仍然會存在宕機和程序奔潰的情況,只不過這個時間非常短,因此丟失回覆的可能會很低),當持久化回覆訊息完成後,可以自己慢慢來處理這些確認資訊,而不用擔心它們丟失了。
本文將根據Akka官網的例子,對其做一些適應性改造後,向大家展示Akka持久化的另一個強大武器——At least once delivery!
訊息投遞規則
一般而言,訊息投遞有下面三種情況:
- at-most-once 意味著每條應用了這種機制的訊息會被投遞0次或1次。可以說這條訊息可能會丟失。
- at-least-once 意味著每條應用了這種機制的訊息潛在的存在多次投遞嘗試並保證至少會成功一次。就是說這條訊息可能會重複但是不會丟失。
- exactly-once 意味著每條應用了這種機制的訊息只會向接收者準確的傳送一次。換言之,這種訊息既不會丟失也不會重複。
配置
at-least-once-delivery {
redeliver-interval = 20000
redelivery-burst-limit = 100
}
redeliver-interval用於配置重新進行投遞嘗試的時間間隔,單位是毫秒。redelivery-burst-limit用於配置每次重新執行投遞嘗試時傳送的最大訊息條數。一致性訊息例子
我們首先來看看本例中用到的訊息體MsgSent、Msg、Confirm及MsgConfirmed。MsgSent代表將要傳送的訊息,但是隻用於持久化,持久化完成後會將MsgSent轉換為Msg進行傳送。也就是說Msg才會被真正用於訊息傳送。接收者收到Msg訊息後將向傳送者回復Confirm訊息,需要注意的是Msg和Confirm都有屬性deliveryId,此deliveryId由傳送者的持久化功能生成,一條Msg訊息和其對應的Confirm回覆的deliveryId必須一致,否則在利用UntypedPersistentActorWithAtLeastOnceDelivery對回覆訊息進行確認時會產生嚴重的bug。傳送者收到接收者的Confirm回覆後首先將其轉換為MsgConfirmed,然後對MsgConfirmed進行持久化,最後呼叫UntypedPersistentActorWithAtLeastOnceDelivery提供的confirmDelivery方法對回覆進行確認。MsgSent、Msg、Confirm及MsgConfirmed的程式碼實現如下:
public interface Persistence {
public static class Msg implements Serializable {
private static final long serialVersionUID = 1L;
public final long deliveryId;
public final String s;
public Msg(long deliveryId, String s) {
this.deliveryId = deliveryId;
this.s = s;
}
}
public static class Confirm implements Serializable {
private static final long serialVersionUID = 1L;
public final long deliveryId;
public Confirm(long deliveryId) {
this.deliveryId = deliveryId;
}
}
public static class MsgSent implements Serializable {
private static final long serialVersionUID = 1L;
public final String s;
public MsgSent(String s) {
this.s = s;
}
}
public static class MsgConfirmed implements Serializable {
private static final long serialVersionUID = 1L;
public final long deliveryId;
public MsgConfirmed(long deliveryId) {
this.deliveryId = deliveryId;
}
}
}
服務端
本例中的服務端非常簡單,是一個接收處理Msg訊息,並向傳送者回復Confirm訊息的Actor,程式碼如下:
@Named("MyDestination")
@Scope("prototype")
public class MyDestination extends UntypedActor {
LoggingAdapter log = Logging.getLogger(getContext().system(), this);
public void onReceive(Object message) throws Exception {
if (message instanceof Msg) {
Msg msg = (Msg) message;
log.info("receive msg : " + msg.s + ", deliveryId : " + msg.deliveryId);
getSender().tell(new Confirm(msg.deliveryId), getSelf());
} else {
unhandled(message);
}
}
}
服務端的啟動程式碼如下:
logger.info("Start myDestination");
final ActorRef myDestination = actorSystem.actorOf(springExt.props("MyDestination"), "myDestination");
logger.info("Started myDestination");
客戶端
具體介紹客戶端之前,先來列出其實現,程式碼如下:
@Named("MyPersistentActor")
@Scope("prototype")
public class MyPersistentActor extends UntypedPersistentActorWithAtLeastOnceDelivery {
LoggingAdapter log = Logging.getLogger(getContext().system(), this);
private final ActorSelection destination;
@Override
public String persistenceId() {
return "persistence-id";
}
public MyPersistentActor(ActorSelection destination) {
this.destination = destination;
}
@Override
public void onReceiveCommand(Object message) {
if (message instanceof String) {
String s = (String) message;
log.info("receive msg : " + s);
persist(new MsgSent(s), new Procedure<MsgSent>() {
public void apply(MsgSent evt) {
updateState(evt);
}
});
} else if (message instanceof Confirm) {
Confirm confirm = (Confirm) message;
log.info("receive confirm with deliveryId : " + confirm.deliveryId);
persist(new MsgConfirmed(confirm.deliveryId), new Procedure<MsgConfirmed>() {
public void apply(MsgConfirmed evt) {
updateState(evt);
}
});
} else if (message instanceof UnconfirmedWarning) {
log.info("receive unconfirmed warning : " + message);
// After a number of delivery attempts a AtLeastOnceDelivery.UnconfirmedWarning message will be sent to self. The re-sending will still continue, but you can choose to call confirmDelivery to cancel the re-sending.
List<UnconfirmedDelivery> list = ((UnconfirmedWarning) message).getUnconfirmedDeliveries();
for (UnconfirmedDelivery unconfirmedDelivery : list) {
Msg msg = (Msg) unconfirmedDelivery.getMessage();
confirmDelivery(msg.deliveryId);
}
} else {
unhandled(message);
}
}
@Override
public void onReceiveRecover(Object event) {
updateState(event);
}
void updateState(Object event) {
if (event instanceof MsgSent) {
final MsgSent evt = (MsgSent) event;
deliver(destination, new Function<Long, Object>() {
public Object apply(Long deliveryId) {
return new Msg(deliveryId, evt.s);
}
});
} else if (event instanceof MsgConfirmed) {
final MsgConfirmed evt = (MsgConfirmed) event;
confirmDelivery(evt.deliveryId);
}
}
}
正如我們之前所述——要使用at-least-once的能力,就必須繼承UntypedPersistentActorWithAtLeastOnceDelivery。有關MsgSent、Msg、Confirm及MsgConfirmed等訊息的處理過程已經介紹過,這裡不再贅述。我們注意到onReceiveCommand方法還處理了一種名為UnconfirmedWarning的訊息,這類訊息將在at-least-once機制下進行無限或者一定數量的投遞嘗試後傳送給當前Actor,這裡的數量可以通過在at-least-once-delivery配置中增加配置項warn-after-number-of-unconfirmed-attempts來調整,例如:
at-least-once-delivery {
redeliver-interval = 20000
redelivery-burst-limit = 100
warn-after-number-of-unconfirmed-attempts = 6
}
當你收到UnconfirmedWarning的訊息時,說明已經超出了你期望的最大重試次數,此時可以做一些控制了,例如:對於這些訊息傳送報警、丟棄等。本例中選擇了丟棄。
UntypedPersistentActorWithAtLeastOnceDelivery的狀態由那些尚未被確認的訊息和一個序列號組成。UntypedPersistentActorWithAtLeastOnceDelivery本身不會儲存這些狀態,依然需要你在呼叫deliver方法投遞訊息之前,呼叫persist方法持久化這些事件或訊息,以便於當持久化Actor能夠在恢復階段恢復。在恢復階段,deliver方法並不會將發出訊息,此時持久化Actor一面恢復,一面只能等待接收回復。當恢復完成,deliver將傳送那些被快取的訊息(除了收到回覆,並呼叫confirmDelivery方法的訊息)。執行例子
本文將率先啟動客戶端並向服務端傳送hello-1,hello-2,hello-3這三訊息,但是由於服務端此時並未啟動,所以客戶端會不斷重試,直到重試達到上限或者受到回覆並確認。服務端傳送訊息的程式碼如下:
logger.info("Start myPersistentActor");
final String path = "akka.tcp://[email protected]:2551/user/myDestination";
final ActorSelection destination = actorSystem.actorSelection(path);
final ActorRef myPersistentActor = actorSystem.actorOf(springExt.props("MyPersistentActor", destination), "myPersistentActor");
actorMap.put("myPersistentActor", myPersistentActor);
logger.info("Started myPersistentActor");
myPersistentActor.tell("hello-1", null);
myPersistentActor.tell("hello-2", null);
myPersistentActor.tell("hello-3", null);
客戶端傳送三條訊息後,日誌中立馬打印出了以下內容:但是一直未受到回覆資訊,然後我們啟動服務端,不一會就看到了以下日誌輸出:
我們再來看看客戶端,發現已經收到了回覆,內容如下:
總結
通過使用UntypedPersistentActorWithAtLeastOnceDelivery提供的persist、deliver及confirmDelivery等方法可以對整個應用的at-least-once需求,輕鬆實現在框架層面上一致的實現。
其它Akka應用的博文如下:
後記:個人總結整理的《深入理解Spark:核心思想與原始碼分析》一書現在已經正式出版上市,目前京東、噹噹、天貓等網站均有銷售,歡迎感興趣的同學購買。
相關推薦
使用Akka持久化——訊息傳送與接收
前言在《使用Akka持久化——持久化與快照》一文中介紹瞭如何使用Akka持久化訊息及生成快照。對於叢集應用來說,傳送者發出訊息,只有當收到了接受者的成功回覆才應當認為是一次完整的請求和應答(一些RPC框架只提供了遠端呼叫、序列化/反序列化的機制,但是具體呼叫的成功與否實際是拋
Scala:簡單使用Actor的訊息傳送與接收求和
從Scala的諸多介紹當中,就看到了不少特別指出Scala中的Actor能夠實現並行程式設計的強大功能,它是基於事件模型的併發機制。或者說,Scala是運用訊息(message)的傳送、接收來實現多執行緒的。使用Scala能夠更容易地實現多執行緒應用的開發。 說到並行與訊息傳送、接收,我記起了上學期“平行
spring-boot 整合kafka單節點訊息傳送與接收
springboot還處於學習階段,又同時在學習kafka,兩者結合,繼續學習。 1、官網下載kafka 2、解壓 3、對於單節點來說,按照官網上操作即可實現訊息的傳送和接收。 但是對於客戶端,是通過 @KafkaListener 註解監聽生產者傳送的訊
SpringBoot通過kafka實現訊息傳送與接收(包括不能傳送和消費kafka訊息的採坑記錄)
kafka採坑記錄: 1、kafka服務端server.properties中的broker.id叢集內需要唯一。 2、kafka config檔案中listeners和advertised.listeners需要配置本機ip:9092地址,不然消費不到資
使用 kafka 的java客戶端進行訊息的傳送與接收通訊操作
kafka的傳送端: package com.zwz.test; import kafka.Kafka; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.cli
Java Socket傳送與接收HTTP訊息簡單實現
在上次Java Socket現實簡單的HTTP服務我們實現了簡單的HTTP服務,它可以用來模擬HTTP服務,用它可以截獲HTTP請求的原始碼流,讓我們很清楚的瞭解到我們向服務發的HTTP訊息的結構,對HTTP請求訊息有個清晰的認識。這一節我想寫了一個客戶的程式,就是用來
Android中基於ServerSocket的實際使用與簡單介紹(內附一個PC端群控多臺手機的訊息傳送和接收Demo)
一、要想將ServerSocket整明白首先至少先要知道或是瞭解幾點基礎內容部分(大神或是有一定能力的小神跳過): 1.ServerSocket是需要在服務端定義書寫的而在客戶端不需要ServerSocket,客戶端只需要建立socket就可以了。 2.socket需要在子
rabbitMQ學習筆記(二) 簡單的傳送與接收訊息 HelloWorld
首先要下載rabbitmq的javaClient庫,然後加入到專案中,下載地址為:http://www.rabbitmq.com/releases/rabbitmq-java-client/v3.1.5/rabbitmq-java-client-bin-3.1.5.zip
Spring kafka 學習之二 採用java 配置類方式傳送與接收訊息
參考資料:https://docs.spring.io/spring-kafka/reference/html/_introduction.html#compatibilityspring-kafka 版本:2.1.5.release1、配置類package com.hdsx
VC++自定義訊息的傳送與接收的方法實現
訊息傳遞的方法一:使用ON_MESSAGE 使用ON_MESSAGE響應訊息,必須配合定義訊息#define WM_MY_MESSAGE (WM_USER+100) 對於傳送訊息者-MyMessageDlg, 在其MyMessageDlg.h中,定義#define WM_M
Unity Http 訊息傳送與接受
參考: https://blog.csdn.net/mseol/article/details/54138762 https://blog.csdn.net/h570768995/article/details/50386935 https://stackoverflow.com/que
STM32 HAL庫學習(四) SPI查詢傳送與接收
又是花了兩天時間調SPI......細心細心還是需要細心啊,還是用的上次的SPI Flash晶片mx25l04600E,主要是測試晶片的初始化和讀取晶片ID是否成功。 STM32F070晶片只有一個SPI,但可用作SPI訊號管腳的引腳卻不只一組,建議通訊前先連線MOSI和MISO測試自發自收是否
RabbitMQ的學習(二):簡單的java demo實現RabbitMQ的傳送與接收
通過第一章已經很輕鬆地實現了RabbitMQ的安裝和啟動,第二章開始最簡單的java demo學習,一層一層深入瞭解RabbitMQ的牛逼之處,期間肯定也會碰到很多問題,這些問題,將會收集起來,最後面去解決同時也會寫相關的文章。 一、專案相關jar包匯入: 新建一個maven工程,p
JSON傳送與接收(java)
var xmlHttp;function createXMLHttpRequest() { if(window.XMLHttpRequest) { xmlHttp = new XMLHttpRequest(); } else if(window.ActiveXObject) {
JSON傳送與接收
var xmlHttp;function createXMLHttpRequest() { if(window.XMLHttpRequest) { xmlHttp = new XMLHttpRequest(); } else if(window.ActiveXObject) {
Servlet和JSON傳送與接收
http://it.chinawin.net/softwaredev/article-d431.html 在 JSON 中我們已經介紹過其基本格式,與XML相同,JSON只是一個文字格式,只要客戶端與伺服端可以剖析它,就可以利用它作為傳送資料的格式,但它是JavaScri
【linux c】簡單UDP應用,資料傳送與接收
主要函式: 接收recvfrom() ssize_t recvfrom (int sockfd, void *buf, size_t len, int flags, stru
Socket通訊案例--實現傳送與接收資料
客戶端:import socket client = socket.socket()client.connect(("localhost",6969))while True: msg = input("我要發資料>>:").strip() print("開始傳送資料: "+msg)
ActiveMQ的傳送與接收 簡單例子和監聽配置
簡單傳送例項: package com.xuwei.activemq; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; impor
java實現kafka訊息傳送和接收
之前寫了一篇關於kafka叢集搭建的點選開啟連結。想了解的可以看下。今天這個實現是和前面叢集對應的。使用的是新版的API。屬性如果想定製自己的,需要到官方網址上面去檢視一下對應的值。推介大家多去看看官方的介紹和demo。網上有些翻譯過來的例子並不完善,最好是知己知彼,才能百戰