RabbitMQ客戶端原始碼分析之BlockingCell.md
阿新 • • 發佈:2018-12-16
RabbitMQ-java-client版本
com.rabbitmq:amqp-client:4.3.0
RabbitMQ
版本宣告: 3.6.15
BlockingCell
-
BlockingCell,程式碼文件註釋描述為“簡單的一次性IPC機制”,其實就是一個
Future
物件,大多數長連線裡非同步處理獲取響應值都會採用Future
模式。 -
uml關聯
BlockingCell原始碼分析
-
完整程式碼,我們從程式碼結構來看其實就是一個Future
/** * Simple one-shot IPC mechanism. Essentially a one-place buffer that cannot be emptied once filled. * 簡單的一次性IPC機制。 基本上是一個緩衝區,一旦填滿就不能清空。 * 從程式碼上來看其實就是一個Future物件 */
ValueOrException
-
ValueOrException
值與異常的包裝。public class ValueOrException<V, E extends Throwable & SensibleClone<E>> { private final boolean _useValue; private final V _value; private final E _exception; ...... }
BlockingValueOrException
-
BlockingValueOrException
是BlockingCell
的擴充套件類,泛型引數是ValueOrException
/**
 * A {@link BlockingCell} whose payload is a {@link ValueOrException}: the cell is filled
 * with either a wrapped value or a wrapped exception, and readers unwrap it on retrieval.
 *
 * @param <V> type of the value carried on success
 * @param <E> type of the exception carried on failure
 */
public class BlockingValueOrException<V, E extends Throwable & SensibleClone<E>>
        extends BlockingCell<ValueOrException<V, E>> {

    /**
     * Wraps {@code v} via {@code ValueOrException.makeValue} and stores it in the cell.
     *
     * @param v the value to store
     */
    public void setValue(V v) {
        final ValueOrException<V, E> wrapped = ValueOrException.<V, E>makeValue(v);
        super.set(wrapped);
    }

    /**
     * Wraps {@code e} via {@code ValueOrException.makeException} and stores it in the cell.
     *
     * @param e the exception to store
     */
    public void setException(E e) {
        final ValueOrException<V, E> wrapped = ValueOrException.<V, E>makeException(e);
        super.set(wrapped);
    }

    /**
     * Fetches the cell content with {@code uninterruptibleGet()} and unwraps it.
     *
     * @return the result of {@code getValue()} on the stored wrapper
     * @throws E as declared by the wrapper's {@code getValue()}
     */
    public V uninterruptibleGetValue() throws E {
        final ValueOrException<V, E> outcome = uninterruptibleGet();
        return outcome.getValue();
    }

    /**
     * Fetches the cell content with {@code uninterruptibleGet(timeout)} and unwraps it.
     *
     * @param timeout timeout passed through to {@code uninterruptibleGet(int)}
     * @return the result of {@code getValue()} on the stored wrapper
     * @throws E as declared by the wrapper's {@code getValue()}
     * @throws TimeoutException as declared by {@code uninterruptibleGet(int)}
     */
    public V uninterruptibleGetValue(int timeout) throws E, TimeoutException {
        final ValueOrException<V, E> outcome = uninterruptibleGet(timeout);
        return outcome.getValue();
    }
}
RpcContinuation
-
rpc配置介面
public interface RpcContinuation { void handleCommand(AMQCommand command); void handleShutdownSignal(ShutdownSignalException signal); } public static abstract class BlockingRpcContinuation<T> implements RpcContinuation { public final BlockingValueOrException<T, ShutdownSignalException> _blocker = new BlockingValueOrException<T, ShutdownSignalException>(); //將響應的結果存入到Future物件中(BlockingCell) @Override public void handleCommand(AMQCommand command) { _blocker.setValue(transformReply(command)); } @Override public void handleShutdownSignal(ShutdownSignalException signal) { _blocker.setException(signal); } public T getReply() throws ShutdownSignalException { return _blocker.uninterruptibleGetValue(); } public T getReply(int timeout) throws ShutdownSignalException, TimeoutException { return _blocker.uninterruptibleGetValue(timeout); } //獲取響應的結果 public abstract T transformReply(AMQCommand command); } public static class SimpleBlockingRpcContinuation extends BlockingRpcContinuation<AMQCommand> { @Override public AMQCommand transformReply(AMQCommand command) { return command; } }
擴充套件
自己寫一個
-
這裡是我之前學習netty時候寫的一個Future物件以及實現,供參考
-
ResponseFuture介面
/**
 * Handle for a response that is supplied later through {@link #setResult(Object)} or
 * {@link #setFailure(Throwable)}.
 *
 * @param <T> type of the response value
 */
public interface ResponseFuture<T> {

    // ---- retrieval ----

    /** Returns the result; may throw {@link InterruptedException} while obtaining it. */
    T get() throws InterruptedException;

    /**
     * Returns the result, given a timeout in milliseconds.
     *
     * @param timeoutMillis timeout in milliseconds
     */
    T get(long timeoutMillis) throws InterruptedException;

    // ---- state queries ----

    boolean isCancelled();

    boolean isDone();

    boolean isSuccess();

    boolean isTimeout();

    long getCreateTime();

    long getTimeoutInMillis();

    // ---- completion ----

    /** Supplies the result value. */
    void setResult(T result);

    /** Supplies the failure cause. */
    void setFailure(Throwable err);

    // ---- asynchronous callback ----

    void setAsyncInvokeCallback(AsyncInvokeCallback<T> invokeCallback);

    boolean hasCallback();

    void executeAsyncInvokeCallback();
}
-
AbstractResponseFuture抽象類
package cn.jananl.netty.transport; import java.util.concurrent.CountDownLatch; public abstract class AbstractResponseFuture<T> implements ResponseFuture<T>{ public enum FutureState { /**新建任務 **/ NEW(0), /** 任務完成 **/ DONE(1), /** 任務取消**/ CANCELLED(2), /** * 任務超時 */ TIMEOUT(3); private int state; FutureState(int state) { this.state = state; } } protected final CountDownLatch countDownLatch = new CountDownLatch(1); protected volatile FutureState state = FutureState.NEW; //狀態 protected final long createTime = System.currentTimeMillis();//處理開始時間 /** * 本地呼叫的超時時間 */ protected long timeoutInMillis; public AbstractResponseFuture(long timeoutInMillis) { this.timeoutInMillis = timeoutInMillis; } @Override public boolean isCancelled() { return this.state == FutureState.CANCELLED; } @Override public boolean isDone() { return this.state == FutureState.DONE; } @Override public boolean isTimeout() { return createTime + timeoutInMillis < System.currentTimeMillis(); } public long getCreateTime() { return createTime; } public long getTimeoutInMillis() { return timeoutInMillis; } }
-
DefaultResponseFuture
具體實現package cn.jananl.netty.transport; import java.util.concurrent.TimeUnit; /** * 預設響應future * @author jannal * @param <T> */ public class DefaultResponseFuture<T> extends AbstractResponseFuture<T>{ private volatile T result; private volatile Throwable throwable; private AsyncInvokeCallback<T> invokeCallback; public DefaultResponseFuture(long timeoutInMillis) { super(timeoutInMillis); } @Override public T get() throws InterruptedException { if(!this.isDone()) { this.countDownLatch.await(super.timeoutInMillis, TimeUnit.MILLISECONDS); } if(this.throwable!=null){ if(this.state==FutureState.CANCELLED){ throw new RuntimeException("任務被取消"); }else{ throw new RuntimeException(this.throwable); } }else{ return this.result; } } @Override public T get(long timeout) throws InterruptedException { if(!this.isDone()) { this.countDownLatch.await(timeout, TimeUnit.MILLISECONDS); } return result; } @Override public boolean isSuccess() { return isDone()&&throwable==null; } @Override public void setFailure(Throwable cause) { if(!this.isDone()) { this.setFailure(cause); this.state = FutureState.DONE; this.countDownLatch.countDown(); } } @Override public void setResult(T result) { if(!this.isDone()){ this.result = result; this.state = FutureState.DONE; this.countDownLatch.countDown(); } } @Override public void executeAsyncInvokeCallback() { if(invokeCallback!=null){ invokeCallback.operationComplete(this); } } @Override public void setAsyncInvokeCallback(AsyncInvokeCallback<T> invokeCallback) { this.invokeCallback = invokeCallback; } @Override public boolean hasCallback() { if(invokeCallback!=null){ return true; } return false; } }