1. 程式人生 > >使用websocket遇到的一個小問題 The remote endpoint was in state [TEXT_PARTIAL_WRITING] which is an invalid stat

使用websocket遇到的一個小問題 The remote endpoint was in state [TEXT_PARTIAL_WRITING] which is an invalid stat

當使用websocket,向前端實時推送訊息的時候,遇到如下異常:

2018-06-29 13:42:55 ERROR 988 --- [container-699] org.springframework.data.redis.listener.adapter.MessageListenerAdapter : [TxId :  , SpanId : ] Listener execution failed
org.springframework.data.redis.listener.adapter.RedisListenerExecutionFailedException: Listener
method 'crmpushReceive' threw exception; nested exception is java.lang.IllegalStateException: The remote endpoint was in state [TEXT_PARTIAL_WRITING] which is an invalid state for called method
at org.springframework.data.redis.listener.adapter.MessageListenerAdapter.invokeListenerMethod(MessageListenerAdapter
.java:380)
[spring-data-redis-1.8.1.RELEASE.jar:?]
at org.springframework.data.redis.listener.adapter.MessageListenerAdapter.onMessage(MessageListenerAdapter.java:310) [spring-data-redis-1.8.1.RELEASE.jar:?] at org.springframework.data.redis.listener.RedisMessageListenerContainer.executeListener(RedisMessageListenerContainer
.java:249)
[spring-data-redis-1.8.1.RELEASE.jar:?]
at org.springframework.data.redis.listener.RedisMessageListenerContainer.processMessage(RedisMessageListenerContainer.java:239) [spring-data-redis-1.8.1.RELEASE.jar:?] at org.springframework.data.redis.listener.RedisMessageListenerContainer$1.run(RedisMessageListenerContainer.java:967) [spring-data-redis-1.8.1.RELEASE.jar:?] at java.lang.Thread.run(Thread.java:745) [?:1.8.0_65] Caused by: java.lang.IllegalStateException: The remote endpoint was in state [TEXT_PARTIAL_WRITING] which is an invalid state for called method at org.apache.tomcat.websocket.WsRemoteEndpointImplBase$StateMachine.checkState(WsRemoteEndpointImplBase.java:1224) ~[tomcat-embed-websocket-8.5.11.jar:8.5.11] at org.apache.tomcat.websocket.WsRemoteEndpointImplBase$StateMachine.textPartialStart(WsRemoteEndpointImplBase.java:1182) ~[tomcat-embed-websocket-8.5.11.jar:8.5.11] at org.apache.tomcat.websocket.WsRemoteEndpointImplBase.sendPartialString(WsRemoteEndpointImplBase.java:222) ~[tomcat-embed-websocket-8.5.11.jar:8.5.11] at org.apache.tomcat.websocket.WsRemoteEndpointBasic.sendText(WsRemoteEndpointBasic.java:49) ~[tomcat-embed-websocket-8.5.11.jar:8.5.11] at org.springframework.web.socket.adapter.standard.StandardWebSocketSession.sendTextMessage(StandardWebSocketSession.java:203) ~[spring-websocket-4.3.7.RELEASE.jar:4.3.7.RELEASE] at org.springframework.web.socket.adapter.AbstractWebSocketSession.sendMessage(AbstractWebSocketSession.java:101) ~[spring-websocket-4.3.7.RELEASE.jar:4.3.7.RELEASE] at com.msxf.afdi.biz.redis.RedisService.crmpushReceive(RedisService.java:105) ~[classes/:?] at sun.reflect.GeneratedMethodAccessor185.invoke(Unknown Source) ~[?:?] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_65] at java.lang.reflect.Method.invoke(Method.java:497) ~[?:1.8.0_65] at org.springframework.data.redis.listener.adapter.MessageListenerAdapter$MethodInvoker.invoke(MessageListenerAdapter.java:145) ~[spring-data-redis-1.8.1.RELEASE.jar:?] at org.springframework.data.redis.listener.adapter.MessageListenerAdapter.invokeListenerMethod(MessageListenerAdapter.java:374) ~[spring-data-redis-1.8.1.RELEASE.jar:?] ... 5 more

虛擬碼如下:

class A{
    public static CopyOnWriteArraySet<WebSocketSession> CONNECTS = new CopyOnWriteArraySet<>();

    public void handlerA(String message){
        for (WebSocketSession session : CONNECTS ) {
            session.sendMessage(new TextMessage(message));
        }
    }

    public void handlerB(String message){
        for (WebSocketSession session : CONNECTS ) {
            session.sendMessage(new TextMessage(message));
        }
    }
}

其實原因就是: handlerA和handlerB兩個方法有可能同時執行,當A或者B方法遍歷到某一個session並且呼叫sendMessage傳送訊息的時候,另外一個方法也正好也在使用相同的session傳送另外一個訊息(同一個session訊息傳送衝突了,也就是說同一個時刻,多個執行緒向一個socket寫資料衝突了),就會報上面的異常。

解決方法其實很簡單,就是在傳送訊息的時候加上一把鎖,(保證一個session在某個時刻不會被呼叫多次):

class A{
    public static CopyOnWriteArraySet<WebSocketSession> CONNECTS = new CopyOnWriteArraySet<>();

    public void handlerA(String message){
        for (WebSocketSession session : CONNECTS ) {
            synchronized (session) {
                session.sendMessage(new TextMessage(message));
            }
        }
    }

    public void handlerB(String message){
        for (WebSocketSession session : CONNECTS ) {
            synchronized (session) {
                session.sendMessage(new TextMessage(message));
            }
        }
    }
}

注意:如果伺服器是配置了https的話, websocket需要用wss協議