A Look at Ewma in the rsocket load balancer
Preface
This article takes a look at the Ewma class used by the rsocket load balancer.
Moving Average
SMA
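SMA (Simple Moving Average) is the plain arithmetic mean of the most recent n data points:
$$SMA_t = \frac{P_t + P_{t-1} + P_{t-2} + \dots + P_{t-n+1}}{n}$$
Here P_t through P_{t-n+1} are the n most recent data points.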
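As a minimal sketch of the formula above, the class below keeps a sliding window of the last n points and returns their mean. The class name SimpleMovingAverage and its add method are hypothetical and are not part of rsocket; returning the mean of fewer than n points while the window is still filling is also an assumption of this sketch.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical helper for illustration only; not part of rsocket.
public class SimpleMovingAverage {
    private final int n;                                   // window size
    private final Deque<Double> window = new ArrayDeque<>();
    private double sum;                                    // running sum of the window

    public SimpleMovingAverage(int n) {
        if (n <= 0) {
            throw new IllegalArgumentException("n must be positive");
        }
        this.n = n;
    }

    // Adds a data point and returns the mean of the most recent n points
    // (or of all points seen so far while fewer than n have arrived).
    public double add(double p) {
        window.addLast(p);
        sum += p;
        if (window.size() > n) {
            sum -= window.removeFirst();                   // drop the oldest point
        }
        return sum / window.size();
    }

    public static void main(String[] args) {
        SimpleMovingAverage sma = new SimpleMovingAverage(3);
        for (double p : new double[] {1, 2, 3, 4, 5}) {
            System.out.println(sma.add(p));                // last line printed is 4.0
        }
    }
}
```

With a window of 3, the final value is the mean of 3, 4, and 5.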
WMA
WMA (Weighted Moving Average) assigns a weight to each of the most recent n data points:
$$WMA_t = P_t W_t + P_{t-1} W_{t-1} + \dots + P_{t-n+1} W_{t-n+1}$$
The weights sum to 1, and more recent data points usually receive larger weights.
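The following sketch shows one common way to pick the weights: linearly increasing weights 1, 2, ..., n, normalized so that they sum to 1. The linear scheme is an assumption chosen for illustration (the text above only requires that the weights sum to 1), and the class and method names are hypothetical.

```java
// Hypothetical helper for illustration only; not part of rsocket.
public class WeightedMovingAverage {

    // points are ordered oldest first; the i-th point (0-based) gets weight
    // (i + 1) / (n(n + 1) / 2), so the weights sum to 1 and the newest point
    // has the largest weight.
    public static double wma(double[] points) {
        int n = points.length;
        if (n == 0) {
            throw new IllegalArgumentException("points must not be empty");
        }
        double denominator = n * (n + 1) / 2.0;
        double result = 0.0;
        for (int i = 0; i < n; i++) {
            double weight = (i + 1) / denominator;
            result += points[i] * weight;
        }
        return result;
    }

    public static void main(String[] args) {
        // (1*1 + 2*2 + 3*3) / 6 = 14 / 6
        System.out.println(wma(new double[] {1, 2, 3}));
    }
}
```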
EMA or EWMA
EMA (Exponential Moving Average), also called EWMA (Exponentially Weighted Moving Average), is defined recursively:
$$EMA_t = P_t \cdot S + (1 - S) \cdot EMA_{t-1}$$
S is the smoothing factor, usually chosen as 2/(N+1).
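The recursion can be sketched in a few lines. The class below is hypothetical and not part of rsocket; seeding the average with the first data point is an assumption of this sketch, since the formula itself does not say how EMA_0 is chosen.

```java
// Hypothetical helper for illustration only; not part of rsocket.
public class ExponentialMovingAverage {
    private final double s;      // smoothing factor S = 2 / (N + 1)
    private double ema;
    private boolean seeded;      // false until the first point arrives

    public ExponentialMovingAverage(int n) {
        if (n <= 0) {
            throw new IllegalArgumentException("n must be positive");
        }
        this.s = 2.0 / (n + 1);
    }

    // Applies EMA_t = P_t * S + (1 - S) * EMA_{t-1}.
    // Assumption: the first point is used directly as the initial EMA.
    public double add(double p) {
        if (!seeded) {
            ema = p;
            seeded = true;
        } else {
            ema = p * s + (1 - s) * ema;
        }
        return ema;
    }

    public static void main(String[] args) {
        ExponentialMovingAverage ema = new ExponentialMovingAverage(3); // S = 0.5
        System.out.println(ema.add(10));   // 10.0
        System.out.println(ema.add(20));   // 15.0
        System.out.println(ema.add(30));   // 22.5
    }
}
```

Unlike SMA and WMA, no window of past points needs to be stored; the previous average carries all the history.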
Ewma
rsocket-load-balancer-0.12.1-sources.jar!/io/rsocket/stat/Ewma.java
    public class Ewma {
      private final long tau;
      private volatile long stamp;
      private volatile double ewma;

      public Ewma(long halfLife, TimeUnit unit, double initialValue) {
        this.tau = Clock.unit().convert((long) (halfLife / Math.log(2)), unit);
        stamp = 0L;
        ewma = initialValue;
      }

      public synchronized void insert(double x) {
        long now = Clock.now();
        double elapsed = Math.max(0, now - stamp);
        stamp = now;

        double w = Math.exp(-elapsed / tau);
        ewma = w * ewma + (1.0 - w) * x;
      }

      public synchronized void reset(double value) {
        stamp = 0L;
        ewma = value;
      }

      public double value() {
        return ewma;
      }

      @Override
      public String toString() {
        return "Ewma(value=" + ewma + ", age=" + (Clock.now() - stamp) + ")";
      }
    }
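- The Ewma constructor takes halfLife, unit, and initialValue (the initial EWMA value). Each insert updates the average as ewma = w * ewma + (1.0 - w) * x, where x is the new sample and w is the weight kept by the previous average.
- The weight is w = Math.exp(-elapsed / tau), that is, e raised to the power -elapsed / tau. elapsed is the time since the previous insert, and tau (the Greek letter τ) is the time constant of the EWMA.
- tau is halfLife / Math.log(2), converted from the given TimeUnit into the clock's unit. halfLife controls the speed of convergence: when exactly one halfLife has elapsed, w = e^(-ln 2) = 0.5, so the previous average and the new sample are weighted equally.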
RSocketSupplier
rsocket-load-balancer-0.12.1-sources.jar!/io/rsocket/client/filter/RSocketSupplier.java
    public class RSocketSupplier implements Availability, Supplier<Mono<RSocket>>, Closeable {

      private static final double EPSILON = 1e-4;

      private Supplier<Mono<RSocket>> rSocketSupplier;

      private final MonoProcessor<Void> onClose;

      private final long tau;
      private long stamp;
      private final Ewma errorPercentage;

      public RSocketSupplier(Supplier<Mono<RSocket>> rSocketSupplier, long halfLife, TimeUnit unit) {
        this.rSocketSupplier = rSocketSupplier;
        this.tau = Clock.unit().convert((long) (halfLife / Math.log(2)), unit);
        this.stamp = Clock.now();
        this.errorPercentage = new Ewma(halfLife, unit, 1.0);
        this.onClose = MonoProcessor.create();
      }

      public RSocketSupplier(Supplier<Mono<RSocket>> rSocketSupplier) {
        this(rSocketSupplier, 5, TimeUnit.SECONDS);
      }

      @Override
      public double availability() {
        double e = errorPercentage.value();
        if (Clock.now() - stamp > tau) {
          // If the window is expired artificially increase the availability
          double a = Math.min(1.0, e + 0.5);
          errorPercentage.reset(a);
        }
        if (e < EPSILON) {
          e = 0.0;
        } else if (1.0 - EPSILON < e) {
          e = 1.0;
        }

        return e;
      }

      private synchronized void updateErrorPercentage(double value) {
        errorPercentage.insert(value);
        stamp = Clock.now();
      }

      @Override
      public Mono<RSocket> get() {
        return rSocketSupplier
            .get()
            .doOnNext(o -> updateErrorPercentage(1.0))
            .doOnError(t -> updateErrorPercentage(0.0))
            .map(AvailabilityAwareRSocketProxy::new);
      }

      @Override
      public void dispose() {
        onClose.onComplete();
      }

      @Override
      public boolean isDisposed() {
        return onClose.isDisposed();
      }

      @Override
      public Mono<Void> onClose() {
        return onClose;
      }

      private class AvailabilityAwareRSocketProxy extends RSocketProxy {
        public AvailabilityAwareRSocketProxy(RSocket source) {
          super(source);

          onClose.doFinally(signalType -> source.dispose()).subscribe();
        }

        @Override
        public Mono<Void> fireAndForget(Payload payload) {
          return source
              .fireAndForget(payload)
              .doOnError(t -> errorPercentage.insert(0.0))
              .doOnSuccess(v -> updateErrorPercentage(1.0));
        }

        @Override
        public Mono<Payload> requestResponse(Payload payload) {
          return source
              .requestResponse(payload)
              .doOnError(t -> errorPercentage.insert(0.0))
              .doOnSuccess(p -> updateErrorPercentage(1.0));
        }

        @Override
        public Flux<Payload> requestStream(Payload payload) {
          return source
              .requestStream(payload)
              .doOnError(th -> errorPercentage.insert(0.0))
              .doOnComplete(() -> updateErrorPercentage(1.0));
        }

        @Override
        public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
          return source
              .requestChannel(payloads)
              .doOnError(th -> errorPercentage.insert(0.0))
              .doOnComplete(() -> updateErrorPercentage(1.0));
        }

        @Override
        public Mono<Void> metadataPush(Payload payload) {
          return source
              .metadataPush(payload)
              .doOnError(t -> errorPercentage.insert(0.0))
              .doOnSuccess(v -> updateErrorPercentage(1.0));
        }

        @Override
        public double availability() {
          // If the window is expired set success and failure to zero and return
          // the child availability
          if (Clock.now() - stamp > tau) {
            updateErrorPercentage(1.0);
          }
          return source.availability() * errorPercentage.value();
        }
      }
    }
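- RSocketSupplier implements the Availability, Supplier, and Closeable interfaces and defines an errorPercentage field of type Ewma. If no halfLife is specified, it defaults to 5 seconds; the initial EWMA value is 1.0.
- RSocketSupplier defines the constant EPSILON = 1e-4. Its availability method first reads the current value e of errorPercentage; if more than tau has passed since stamp, it resets errorPercentage to min(1.0, e + 0.5). It then returns 0.0 when e is below EPSILON and 1.0 when e is above 1.0 - EPSILON; otherwise e is returned unchanged.
- updateErrorPercentage inserts a new value into the Ewma and updates stamp. In get, doOnNext records 1.0 and doOnError records 0.0, and map wraps the RSocket in an AvailabilityAwareRSocketProxy. The proxy delegates to the target RSocket and hooks doOnError and doOnSuccess (doOnComplete for the Flux-returning methods) to feed errorPercentage. Despite its name, errorPercentage records 1.0 for a success and 0.0 for a failure, so a higher value means higher availability.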
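The two small pieces of arithmetic in availability(), the clamping around EPSILON and the recovery bump applied when the window has expired, can be isolated as pure functions. This is a sketch for illustration: the class AvailabilitySketch and the method names clamp and recover are hypothetical and do not exist in rsocket.

```java
// Hypothetical helper for illustration only; it isolates two calculations
// found in RSocketSupplier.availability() as shown above.
public class AvailabilitySketch {
    private static final double EPSILON = 1e-4;

    // Snaps values very close to 0 or 1 to exactly 0.0 or 1.0.
    static double clamp(double e) {
        if (e < EPSILON) {
            return 0.0;
        } else if (1.0 - EPSILON < e) {
            return 1.0;
        }
        return e;
    }

    // Value used to reset the average once the window has expired:
    // the current value plus 0.5, capped at 1.0.
    static double recover(double e) {
        return Math.min(1.0, e + 0.5);
    }

    public static void main(String[] args) {
        System.out.println(clamp(0.00005));   // 0.0
        System.out.println(clamp(0.99995));   // 1.0
        System.out.println(clamp(0.42));      // 0.42
        System.out.println(recover(0.25));    // 0.75
        System.out.println(recover(0.75));    // 1.0
    }
}
```

The recovery bump gives a supplier that has seen no traffic for longer than tau a chance to be selected again instead of staying at a low score indefinitely.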
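Summary
- There are several moving average algorithms, including SMA (Simple Moving Average), WMA (Weighted Moving Average), and EMA (Exponential Moving Average), also known as EWMA (Exponentially Weighted Moving Average).
- The Ewma constructor takes halfLife, unit, and initialValue (the initial EWMA value). The update rule is ewma = w * ewma + (1.0 - w) * x, where x is the new sample and w is the weight. The weight is w = Math.exp(-elapsed / tau), that is, e raised to the power -elapsed / tau; elapsed is the time since the previous insert, and tau (the Greek letter τ) is the time constant of the EWMA. tau is halfLife / Math.log(2), converted from the given TimeUnit into the clock's unit; halfLife controls the speed of convergence.
- The rsocket load balancer uses Ewma to track the availability of a service. RSocketSupplier implements the Availability, Supplier, and Closeable interfaces and defines an errorPercentage field of type Ewma. If no halfLife is specified, it defaults to 5 seconds; the initial EWMA value is 1.0. The get method of RSocketSupplier wraps the RSocket in an AvailabilityAwareRSocketProxy, which delegates to the target RSocket and hooks doOnError and doOnSuccess to feed errorPercentage.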