A small problem encountered when using WebSocket: The remote endpoint was in state [TEXT_PARTIAL_WRITING] which is an invalid stat
阿新 • Published: 2019-02-10
While using WebSocket to push messages to the front end in real time, I ran into the following exception:
2018-06-29 13:42:55 ERROR 988 --- [container-699] org.springframework.data.redis.listener.adapter.MessageListenerAdapter : [TxId : , SpanId : ] Listener execution failed
org.springframework.data.redis.listener.adapter.RedisListenerExecutionFailedException: Listener method 'crmpushReceive' threw exception; nested exception is java.lang.IllegalStateException: The remote endpoint was in state [TEXT_PARTIAL_WRITING] which is an invalid state for called method
at org.springframework.data.redis.listener.adapter.MessageListenerAdapter.invokeListenerMethod(MessageListenerAdapter.java:380) [spring-data-redis-1.8.1.RELEASE.jar:?]
at org.springframework.data.redis.listener.adapter.MessageListenerAdapter.onMessage(MessageListenerAdapter.java:310) [spring-data-redis-1.8.1.RELEASE.jar:?]
at org.springframework.data.redis.listener.RedisMessageListenerContainer.executeListener(RedisMessageListenerContainer.java:249) [spring-data-redis-1.8.1.RELEASE.jar:?]
at org.springframework.data.redis.listener.RedisMessageListenerContainer.processMessage(RedisMessageListenerContainer.java:239) [spring-data-redis-1.8.1.RELEASE.jar:?]
at org.springframework.data.redis.listener.RedisMessageListenerContainer$1.run(RedisMessageListenerContainer.java:967) [spring-data-redis-1.8.1.RELEASE.jar:?]
at java.lang.Thread.run(Thread.java:745) [?:1.8.0_65]
Caused by: java.lang.IllegalStateException: The remote endpoint was in state [TEXT_PARTIAL_WRITING] which is an invalid state for called method
at org.apache.tomcat.websocket.WsRemoteEndpointImplBase$StateMachine.checkState(WsRemoteEndpointImplBase.java:1224) ~[tomcat-embed-websocket-8.5.11.jar:8.5.11]
at org.apache.tomcat.websocket.WsRemoteEndpointImplBase$StateMachine.textPartialStart(WsRemoteEndpointImplBase.java:1182) ~[tomcat-embed-websocket-8.5.11.jar:8.5.11]
at org.apache.tomcat.websocket.WsRemoteEndpointImplBase.sendPartialString(WsRemoteEndpointImplBase.java:222) ~[tomcat-embed-websocket-8.5.11.jar:8.5.11]
at org.apache.tomcat.websocket.WsRemoteEndpointBasic.sendText(WsRemoteEndpointBasic.java:49) ~[tomcat-embed-websocket-8.5.11.jar:8.5.11]
at org.springframework.web.socket.adapter.standard.StandardWebSocketSession.sendTextMessage(StandardWebSocketSession.java:203) ~[spring-websocket-4.3.7.RELEASE.jar:4.3.7.RELEASE]
at org.springframework.web.socket.adapter.AbstractWebSocketSession.sendMessage(AbstractWebSocketSession.java:101) ~[spring-websocket-4.3.7.RELEASE.jar:4.3.7.RELEASE]
at com.msxf.afdi.biz.redis.RedisService.crmpushReceive(RedisService.java:105) ~[classes/:?]
at sun.reflect.GeneratedMethodAccessor185.invoke(Unknown Source) ~[?:?]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_65]
at java.lang.reflect.Method.invoke(Method.java:497) ~[?:1.8.0_65]
at org.springframework.data.redis.listener.adapter.MessageListenerAdapter$MethodInvoker.invoke(MessageListenerAdapter.java:145) ~[spring-data-redis-1.8.1.RELEASE.jar:?]
at org.springframework.data.redis.listener.adapter.MessageListenerAdapter.invokeListenerMethod(MessageListenerAdapter.java:374) ~[spring-data-redis-1.8.1.RELEASE.jar:?]
... 5 more
The code, in simplified form, looks like this:
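import java.io.IOException;
import java.util.concurrent.CopyOnWriteArraySet;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;

class A {
    // All currently connected sessions, shared by both handlers.
    public static CopyOnWriteArraySet<WebSocketSession> CONNECTS = new CopyOnWriteArraySet<>();

    // May be called on a different thread than handlerB.
    public void handlerA(String message) throws IOException {
        for (WebSocketSession session : CONNECTS) {
            // No locking: this send can overlap with a send from handlerB.
            session.sendMessage(new TextMessage(message));
        }
    }

    public void handlerB(String message) throws IOException {
        for (WebSocketSession session : CONNECTS) {
            // No locking: this send can overlap with a send from handlerA.
            session.sendMessage(new TextMessage(message));
        }
    }
}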
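The cause is simply this: handlerA and handlerB may run at the same time. When one of them reaches a session in its loop and calls sendMessage, the other may be sending a different message through that very same session. The two sends on the same session conflict (in other words, several threads are writing to one socket at the same moment), and the exception above is thrown.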
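To make the conflict concrete, here is a minimal sketch that reproduces the same kind of failure without any WebSocket server. FakeEndpoint and RaceDemo are hypothetical names made up for this illustration; the state check is a simplified model and not Tomcat's actual implementation. Two latches force the second writer to start while the first is still in the middle of its write, so the outcome does not depend on timing.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for a WebSocket endpoint; NOT Tomcat's real class.
class FakeEndpoint {
    private final AtomicBoolean writing = new AtomicBoolean(false);

    // Begins a write; fails if another write has not finished yet.
    void startWrite() {
        if (!writing.compareAndSet(false, true)) {
            throw new IllegalStateException(
                "The remote endpoint was in state [TEXT_PARTIAL_WRITING]");
        }
    }

    // Ends the current write so the next one may begin.
    void finishWrite() {
        writing.set(false);
    }
}

class RaceDemo {
    // Returns the error message seen by the second writer, or null if none.
    static String run() {
        FakeEndpoint endpoint = new FakeEndpoint();
        CountDownLatch firstStarted = new CountDownLatch(1);
        CountDownLatch secondDone = new CountDownLatch(1);
        AtomicReference<String> error = new AtomicReference<>();

        // Plays the role of handlerA: starts a write and is still busy with it.
        Thread a = new Thread(() -> {
            endpoint.startWrite();
            firstStarted.countDown();
            try {
                secondDone.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            endpoint.finishWrite();
        });

        // Plays the role of handlerB: writes to the same endpoint meanwhile.
        Thread b = new Thread(() -> {
            try {
                firstStarted.await();
                endpoint.startWrite();
                endpoint.finishWrite();
            } catch (IllegalStateException e) {
                error.set(e.getMessage());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                secondDone.countDown();
            }
        });

        a.start();
        b.start();
        try {
            a.join();
            b.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return error.get();
    }
}
```

Calling RaceDemo.run() returns the message of the IllegalStateException raised for the second writer, which mirrors what happens when two handlers hit the same session at once.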
The fix is quite simple: take a lock when sending a message, so that a given session is never used by more than one send at the same time:
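import java.io.IOException;
import java.util.concurrent.CopyOnWriteArraySet;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;

class A {
    // All currently connected sessions, shared by both handlers.
    public static CopyOnWriteArraySet<WebSocketSession> CONNECTS = new CopyOnWriteArraySet<>();

    public void handlerA(String message) throws IOException {
        for (WebSocketSession session : CONNECTS) {
            // Lock on the session itself so only one thread sends on it at a time.
            synchronized (session) {
                session.sendMessage(new TextMessage(message));
            }
        }
    }

    public void handlerB(String message) throws IOException {
        for (WebSocketSession session : CONNECTS) {
            // Same lock object as in handlerA, so the two sends cannot overlap.
            synchronized (session) {
                session.sendMessage(new TextMessage(message));
            }
        }
    }
}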
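As a rough check that locking on the session really serializes the sends, the following sketch hammers one session from several threads. GuardedFakeSession and LockedSendDemo are hypothetical names used only here; the fake session simply rejects overlapping sends and stands in for a real WebSocketSession, so this is a model of the idea rather than a test against Tomcat or Spring.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical session stand-in that rejects overlapping sends.
class GuardedFakeSession {
    private final AtomicBoolean writing = new AtomicBoolean(false);
    final AtomicInteger sent = new AtomicInteger();

    void sendMessage(String text) {
        if (!writing.compareAndSet(false, true)) {
            throw new IllegalStateException("overlapping write");
        }
        try {
            sent.incrementAndGet(); // pretend the message went out
        } finally {
            writing.set(false);
        }
    }
}

class LockedSendDemo {
    // Sends from several threads while locking on the session.
    // Returns how many sends were rejected because they overlapped.
    static int run(GuardedFakeSession session, int threads, int perThread) {
        AtomicInteger failures = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.submit(() -> {
                for (int i = 0; i < perThread; i++) {
                    // Same pattern as in class A: one lock per session.
                    synchronized (session) {
                        try {
                            session.sendMessage("msg");
                        } catch (IllegalStateException e) {
                            failures.incrementAndGet();
                        }
                    }
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return failures.get();
    }
}
```

With the synchronized block in place no send is rejected. The lock only helps if every piece of code that sends on a session locks on that same session object; a sender that skips the lock can still collide with the others.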
Note: if the server is configured for HTTPS, the WebSocket connection must use the wss protocol.
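One way to avoid hard-coding the scheme is to derive it from the URL the site is served on. The helper below is only a sketch: WsUrl, fromHttpUrl and the /push path are names assumed for the example, not something from the project above.

```java
import java.net.URI;

class WsUrl {
    // Maps http -> ws and https -> wss, keeping the host and port of the site
    // and appending the WebSocket path (a hypothetical endpoint path).
    static String fromHttpUrl(String httpUrl, String wsPath) {
        URI uri = URI.create(httpUrl);
        String scheme = "https".equalsIgnoreCase(uri.getScheme()) ? "wss" : "ws";
        String port = uri.getPort() == -1 ? "" : ":" + uri.getPort();
        return scheme + "://" + uri.getHost() + port + wsPath;
    }
}
```

For example, WsUrl.fromHttpUrl("https://example.com", "/push") gives "wss://example.com/push", while an http URL gives a ws URL.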