Spring WebSocket + SockJS + STOMP for Real-Time Messaging (Part 3): ChannelInterceptor and ExecutorChannelInterceptor

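ChannelInterceptor:

  • Intercepts before and after a Message is handed to the channel's thread pool. These callbacks run on the current (sending) thread.

ExecutorChannelInterceptor:

  • Intercepts after the Message has been handed to the thread pool. These callbacks run on a thread owned by the pool, before and after the MessageHandler processes the Message.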
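
To make the threading difference concrete, here is a minimal plain-Java sketch. It is not Spring code: the class name `InterceptionThreadsDemo`, the pool thread name `demo-worker`, and the hook names used as map keys are all assumptions chosen for illustration. It only models which thread each interception point would run on.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Plain-Java model (not Spring code) of where each interception point runs. */
final class InterceptionThreadsDemo {

    /** Simulates one send and returns, for each hook name, the thread it ran on. */
    static Map<String, String> run() {
        Map<String, String> threadByHook = new ConcurrentHashMap<>();
        // Hypothetical single-thread pool standing in for the channel's executor.
        ExecutorService pool = Executors.newSingleThreadExecutor(r -> new Thread(r, "demo-worker"));
        String caller = Thread.currentThread().getName();

        // ChannelInterceptor side: runs on the sending thread, before the hand-off.
        threadByHook.put("preSend", caller);
        pool.execute(() -> {
            // ExecutorChannelInterceptor side: runs on the pool thread, around the handler.
            String worker = Thread.currentThread().getName();
            threadByHook.put("beforeHandle", worker);
            threadByHook.put("handleMessage", worker);
            threadByHook.put("afterMessageHandled", worker);
        });
        // ChannelInterceptor side again: still the sending thread, after the hand-off.
        threadByHook.put("postSend", caller);

        pool.shutdown();
        try {
            if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("worker did not finish");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return threadByHook;
    }

    public static void main(String[] args) {
        run().forEach((hook, thread) -> System.out.println(hook + " ran on " + thread));
    }
}
```

In this model `postSend` may be recorded before the worker has even started, which is the practical consequence of the split: send-side hooks cannot observe the outcome of message handling.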

The walkthrough below uses a ChannelInterceptor configured on the InboundChannel as its example.

  1. Register the interceptor in WebSocketMessageBrokerConfigurer:
WebSocketMessageBrokerConfigurer:
	@Override
	public void configureClientInboundChannel(ChannelRegistration registration) {
		registration.interceptors(new ChannelInterceptor() {
			@Override
			public Message<?> preSend(Message<?> message, MessageChannel channel) {
				StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
				if (accessor == null) {
					// No StompHeaderAccessor is attached; pass the message through unchanged.
					return message;
				}
				StompCommand command = accessor.getCommand();
				// command may be null (for example, on heartbeat frames), so do not call toString() on it directly.
				log.info(String.valueOf(command));
				log.info(JSON.toJSONString(message));
				if (StompCommand.CONNECT.equals(command)) {
					// Group chat: id is the roomId; private chat: id is the userId.
					String id = accessor.getFirstNativeHeader("id");
					accessor.setUser(new IdPrincipal(id));
				}
				return message;
			}
		});
	}
  1. The framework reads the registered interceptors back from the WebSocketMessageBrokerConfigurer configuration:
AbstractMessageBrokerConfiguration:
	@Bean
	public AbstractSubscribableChannel clientInboundChannel() {
		ExecutorSubscribableChannel channel = new ExecutorSubscribableChannel(clientInboundChannelExecutor());
		ChannelRegistration reg = getClientInboundChannelRegistration();
		if (reg.hasInterceptors()) {
			channel.setInterceptors(reg.getInterceptors());
		}
		return channel;
	}
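  1. The channel interceptors are bound to the ExecutorSubscribableChannel. At the same time they are classified as ChannelInterceptor or ExecutorChannelInterceptor and added to the corresponding collection:
ExecutorSubscribableChannel: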
	@Override
	public void setInterceptors(List<ChannelInterceptor> interceptors) {
		super.setInterceptors(interceptors);
		this.executorInterceptors.clear();
		for (ChannelInterceptor interceptor : interceptors) {
			if (interceptor instanceof ExecutorChannelInterceptor) {
				this.executorInterceptors.add((ExecutorChannelInterceptor) interceptor);
			}
		}
	}
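
The classification step can be modelled in a few lines of plain Java. This is a sketch, not Spring code: `Interceptor`, `ExecutorInterceptor`, and `Channel` are hypothetical stand-ins for ChannelInterceptor, ExecutorChannelInterceptor, and the channel class. The point it illustrates is that an executor-aware interceptor lands in both collections, because the full list is stored first and the `instanceof` filter only copies a subset.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Plain-Java model (not Spring code) of sorting interceptors into two collections. */
final class InterceptorClassificationDemo {

    /** Hypothetical stand-in for ChannelInterceptor. */
    interface Interceptor { }

    /** Hypothetical stand-in for ExecutorChannelInterceptor, which extends the base type. */
    interface ExecutorInterceptor extends Interceptor { }

    static final class Channel {
        final List<Interceptor> interceptors = new ArrayList<>();
        final List<ExecutorInterceptor> executorInterceptors = new ArrayList<>();

        void setInterceptors(List<Interceptor> all) {
            // Every interceptor goes into the general list (the role of super.setInterceptors).
            interceptors.clear();
            interceptors.addAll(all);
            // Executor-aware interceptors are additionally copied into their own list.
            executorInterceptors.clear();
            for (Interceptor interceptor : all) {
                if (interceptor instanceof ExecutorInterceptor) {
                    executorInterceptors.add((ExecutorInterceptor) interceptor);
                }
            }
        }
    }

    /** Registers one plain and one executor-aware interceptor; returns {all, executorOnly}. */
    static int[] demoCounts() {
        Interceptor plain = new Interceptor() { };
        Interceptor executorAware = new ExecutorInterceptor() { };
        Channel channel = new Channel();
        channel.setInterceptors(Arrays.asList(plain, executorAware));
        return new int[] { channel.interceptors.size(), channel.executorInterceptors.size() };
    }
}
```

With one plain and one executor-aware interceptor registered, the general list holds both and the executor list holds one, so the executor-aware interceptor takes part in send-side interception as well.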
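  1. Ordinary ChannelInterceptors are assembled into an interceptor chain by the inner class AbstractMessageChannel$ChannelInterceptorChain. Every method of this chain works on the interceptors collection held by the enclosing AbstractMessageChannel instance and invokes the interceptors from it.
  2. The chain is invoked in the send method of the AbstractMessageChannel instance, synchronously before and after sendInternal, on the current thread.
AbstractMessageChannel: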

public abstract class AbstractMessageChannel implements MessageChannel, InterceptableChannel, BeanNameAware {

	private final List<ChannelInterceptor> interceptors = new ArrayList<ChannelInterceptor>(5);

	@Override
	public final boolean send(Message<?> message, long timeout) {
		Assert.notNull(message, "Message must not be null");
		ChannelInterceptorChain chain = new ChannelInterceptorChain();
		boolean sent = false;
		try {
			message = chain.applyPreSend(message, this);
			if (message == null) {
				return false;
			}
			sent = sendInternal(message, timeout);
			chain.applyPostSend(message, this, sent);
			chain.triggerAfterSendCompletion(message, this, sent, null);
			return sent;
		}
		catch (Exception ex) {
			chain.triggerAfterSendCompletion(message, this, sent, ex);
			if (ex instanceof MessagingException) {
				throw (MessagingException) ex;
			}
			throw new MessageDeliveryException(message,"Failed to send message to " + this, ex);
		}
		catch (Throwable err) {
			MessageDeliveryException ex2 =
					new MessageDeliveryException(message, "Failed to send message to " + this, err);
			chain.triggerAfterSendCompletion(message, this, sent, ex2);
			throw ex2;
		}
	}

	/**
	 * Assists with the invocation of the configured channel interceptors.
	 */
	protected class ChannelInterceptorChain {
		private int sendInterceptorIndex = -1;
		private int receiveInterceptorIndex = -1;
		
		public Message<?> applyPreSend(Message<?> message, MessageChannel channel) {
			Message<?> messageToUse = message;
			for (ChannelInterceptor interceptor : interceptors) {
				Message<?> resolvedMessage = interceptor.preSend(messageToUse, channel);
				if (resolvedMessage == null) {
					String name = interceptor.getClass().getSimpleName();
					if (logger.isDebugEnabled()) {
						logger.debug(name + " returned null from preSend, i.e. precluding the send.");
					}
					triggerAfterSendCompletion(messageToUse, channel, false, null);
					return null;
				}
				messageToUse = resolvedMessage;
				this.sendInterceptorIndex++;
			}
			return messageToUse;
		}
		public void applyPostSend(Message<?> message, MessageChannel channel, boolean sent) {
			for (ChannelInterceptor interceptor : interceptors) {
				interceptor.postSend(message, channel, sent);
			}
		}
      
        /**
         * Other methods omitted
         */
	}

}
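
The veto behaviour of applyPreSend can be tried out with a small plain-Java model. This is a sketch under stated assumptions, not Spring code: `NamedInterceptor`, `send`, and the log strings are hypothetical, and messages are reduced to strings. It also assumes that the omitted triggerAfterSendCompletion walks backwards from sendInterceptorIndex, in the same way triggerAfterMessageHandled does in the ExecutorSubscribableChannel listing; the excerpt above does not show that method.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.UnaryOperator;

/** Plain-Java model (not Spring code) of a preSend chain with veto and completion callbacks. */
final class PreSendChainDemo {

    /** Hypothetical interceptor: a name plus a preSend function; returning null vetoes the send. */
    static final class NamedInterceptor {
        final String name;
        final UnaryOperator<String> preSend;

        NamedInterceptor(String name, UnaryOperator<String> preSend) {
            this.name = name;
            this.preSend = preSend;
        }
    }

    /** Runs the chain over the message and returns a log of what happened. */
    static List<String> send(String message, List<NamedInterceptor> chain) {
        List<String> log = new ArrayList<>();
        int lastApplied = -1; // plays the role of sendInterceptorIndex
        String current = message;
        for (NamedInterceptor interceptor : chain) {
            String resolved = interceptor.preSend.apply(current);
            if (resolved == null) {
                log.add(interceptor.name + " returned null");
                current = null;
                break;
            }
            current = resolved;
            lastApplied++;
        }
        if (current != null) {
            log.add("sent:" + current);
        }
        // Assumption: completion callbacks run in reverse, only for interceptors whose preSend succeeded.
        for (int i = lastApplied; i >= 0; i--) {
            log.add("afterSendCompletion:" + chain.get(i).name);
        }
        return log;
    }

    static List<String> demoPass() {
        return send("hi", Arrays.asList(
                new NamedInterceptor("a", s -> s + "!"),
                new NamedInterceptor("b", s -> s.toUpperCase())));
    }

    static List<String> demoVeto() {
        return send("hi", Arrays.asList(
                new NamedInterceptor("a", s -> s + "!"),
                new NamedInterceptor("veto", s -> null),
                new NamedInterceptor("c", s -> s + "?")));
    }
}
```

In `demoVeto` the third interceptor is never consulted and nothing is sent; only interceptor `a`, which had already run, receives a completion callback.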
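  1. In sendInternal, the InboundChannel instance wraps the Message and each MessageHandler in a SendTask and submits it to the ThreadPoolTaskExecutor that backs the channel, so the Message is processed asynchronously on a new thread.
  2. On that new thread, SendTask.run processes the Message through MessageHandler.handleMessage(Message). The interceptors in the executorInterceptors collection are invoked before and after handleMessage.
ExecutorSubscribableChannel: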

public class ExecutorSubscribableChannel extends AbstractSubscribableChannel {
	private final Executor executor;
	private final List<ExecutorChannelInterceptor> executorInterceptors = new ArrayList<ExecutorChannelInterceptor>(4);
	
	@Override
	public boolean sendInternal(Message<?> message, long timeout) {
		for (MessageHandler handler : getSubscribers()) {
			SendTask sendTask = new SendTask(message, handler);
			if (this.executor == null) {
				sendTask.run();
			}
			else {
				this.executor.execute(sendTask);
			}
		}
		return true;
	}
	/**
	 * Invoke a MessageHandler with ExecutorChannelInterceptors.
	 */
	private class SendTask implements MessageHandlingRunnable {
		@Override
		public void run() {
			Message<?> message = this.inputMessage;
			try {
				message = applyBeforeHandle(message);
				if (message == null) {
					return;
				}
				this.messageHandler.handleMessage(message);
				triggerAfterMessageHandled(message, null);
			}
			catch (Exception ex) {
				triggerAfterMessageHandled(message, ex);
				if (ex instanceof MessagingException) {
					throw (MessagingException) ex;
				}
				String description = "Failed to handle " + message + " to " + this + " in " + this.messageHandler;
				throw new MessageDeliveryException(message, description, ex);
			}
			catch (Throwable err) {
				String description = "Failed to handle " + message + " to " + this + " in " + this.messageHandler;
				MessageDeliveryException ex2 = new MessageDeliveryException(message, description, err);
				triggerAfterMessageHandled(message, ex2);
				throw ex2;
			}
		}

		private Message<?> applyBeforeHandle(Message<?> message) {
			Message<?> messageToUse = message;
			for (ExecutorChannelInterceptor interceptor : executorInterceptors) {
				messageToUse = interceptor.beforeHandle(messageToUse, ExecutorSubscribableChannel.this, this.messageHandler);
				if (messageToUse == null) {
					String name = interceptor.getClass().getSimpleName();
					if (logger.isDebugEnabled()) {
						logger.debug(name + " returned null from beforeHandle, i.e. precluding the send.");
					}
					triggerAfterMessageHandled(message, null);
					return null;
				}
				this.interceptorIndex++;
			}
			return messageToUse;
		}

		private void triggerAfterMessageHandled(Message<?> message, Exception ex) {
			for (int i = this.interceptorIndex; i >= 0; i--) {
				ExecutorChannelInterceptor interceptor = executorInterceptors.get(i);
				try {
					interceptor.afterMessageHandled(message, ExecutorSubscribableChannel.this, this.messageHandler, ex);
				}
				catch (Throwable ex2) {
					logger.error("Exception from afterMessageHandled in " + interceptor, ex2);
				}
			}
		}
	}

}
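
The control flow of SendTask.run can be reduced to the following plain-Java sketch. It is a simplified model rather than Spring code: `HandleTaskDemo`, `dispatch`, and the log strings are hypothetical, there is only a single beforeHandle function instead of a list of interceptors, and the model swallows the handler's exception after logging it, whereas the listing above rethrows it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.UnaryOperator;

/** Plain-Java model (not Spring code) of handler-side interception on a pool thread. */
final class HandleTaskDemo {

    /** Runs beforeHandle, the handler, and the after-callback on a pool thread; returns the log. */
    static List<String> dispatch(String message, UnaryOperator<String> beforeHandle, Consumer<String> handler) {
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        ExecutorService pool = Executors.newSingleThreadExecutor();
        pool.execute(() -> {
            String toUse = beforeHandle.apply(message);
            if (toUse == null) {
                // A null result from beforeHandle means the handler is never invoked.
                log.add("handler skipped");
                return;
            }
            try {
                handler.accept(toUse);
                log.add("afterMessageHandled:ok");
            } catch (RuntimeException ex) {
                // The after-callback still runs and is told about the failure.
                log.add("afterMessageHandled:" + ex.getMessage());
            }
        });
        pool.shutdown();
        try {
            if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("task did not finish");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return log;
    }

    static List<String> demoSuccess() {
        return dispatch("m", s -> s, s -> { });
    }

    static List<String> demoVeto() {
        return dispatch("m", s -> null, s -> { });
    }

    static List<String> demoFailure() {
        return dispatch("m", s -> s, s -> { throw new IllegalStateException("boom"); });
    }
}
```

The three demo methods cover the three paths through the task: normal handling, a veto from beforeHandle, and a handler that throws.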