1. 程式人生 > >架構設計 | 線上防雪崩利器—熔斷器設計原理與實現

架構設計 | 線上防雪崩利器—熔斷器設計原理與實現

上週六,我負責的業務在凌晨00-04點的支付全部失敗了。

結果一查,MD,晚上銀行維護,下游支付系統沒有掛維護公告,在此期間一直請求維護中的銀行,當然所有返回就是失敗了,有種欲哭無淚的感覺,鍋讓業務來背。

為了杜絕在此出現這種大面積批量的支付失敗情況發生,保障系統的健壯性。我需要個在集中性異常的時候可以終止請求,當服務恢復,恢復請求。

我想了一些方式,最後,覺得熔斷器比較適合幹這種事情。

狀態模式

在狀態模式中,我們建立表示各種狀態的物件和一個行為隨著狀態物件改變而改變的 context 物件。

我們以一個開關為例:

package com.lpf.熔斷器;

public class Main
{ public static void main(String[] args) { Context context = new Context(); context.state = new CloseState(); context.switchState(); context.switchState(); context.switchState(); context.switchState(); context.switchState(); } } /** * 狀態上下文 */ class Context { public State state; void
switchState() { state.switchState(this); } } /** * 狀態的抽象 */ interface State { void switchState(Context context); } /** * 開狀態 **/ class OpenState implements State { public void switchState(Context context) { System.out.println("當前狀態:開"); context.state = new CloseState(); } } /** * 關狀態 **/
class CloseState implements State { public void switchState(Context context) { System.out.println("當前狀態:關"); context.state = new OpenState(); } }

在每一種狀態下,context不必關心每一種狀態下的行為。交給每一種狀態自己處理。

熔斷器基本原理

熔斷器是當依賴的服務已經出現故障時,為了保證自身服務的正常執行不再訪問依賴的服務,防止雪崩效應。 在這裡插入圖片描述

熔斷器本身就是一個狀態機:

  • 關閉狀態:熔斷器的初始化狀態,該狀態下允許請求通過。當失敗超過閥值,轉入開啟狀態
  • 開啟狀態:熔斷狀態,該狀態下不允許請求通過,當進入該狀態經過一段時間,進入半開狀態
  • 半開狀態:在半開狀態期間,允許部分請求通過,在半開期間,觀察失敗狀態是否超過閥值。如果沒有超過進入關閉狀態,如果超過了進入開啟狀態。如此往復。

之前,查了一些資料,網上所有的資料幾乎都是針對Hystrix的。這個只是針對分散式系統的介面請求,並不能運用於我們的系統中,因此這種情況下,根據原理自己實現了一個基本的分散式熔斷器,數值與計數器存放在redis中,因為redis的操作客戶端不一樣,我就以本地熔斷器為例,講解熔斷器實現。

希望我的文章能對於理解熔斷器,以及需要熔斷器的人有所幫助。

簡單的本地熔斷器實現

一個基本的本地熔斷器。 在這裡插入圖片描述

對外暴露介面
熔斷器對外暴露介面
/**
 * 熔斷器介面
 */
public interface CircuitBreaker {
    /**
     * 重置熔斷器
     */
    void reset();

    /**
     * 是否允許通過熔斷器
     */
    boolean canPassCheck();

    /**
     * 統計失敗次數
     */
    void countFailNum();
}
熔斷器狀態對外暴露介面
/**
 * 熔斷器狀態
 */
public interface CBState {
    /**
     * 獲取當前狀態名稱
     */
    String getStateName();

    /**
     * 檢查以及校驗當前狀態是否需要扭轉
     */
    void checkAndSwitchState(AbstractCircuitBreaker cb);

    /**
     * 是否允許通過熔斷器
     */
    boolean canPassCheck(AbstractCircuitBreaker cb);

    /**
     * 統計失敗次數
     */
    void countFailNum(AbstractCircuitBreaker cb);
}
三種狀態
關閉狀態
import com.lpf.熔斷器.cb.AbstractCircuitBreaker;

import java.util.concurrent.atomic.AtomicInteger;

/**
 * 熔斷器-關閉狀態
 */
public class CloseCBState implements CBState {

    /**
     * 進入當前狀態的初始化時間
     */
    private long stateTime = System.currentTimeMillis();

    /**
     * 關閉狀態,失敗計數器,以及失敗計數器初始化時間
     */
    private AtomicInteger failNum = new AtomicInteger(0);
    private long failNumClearTime = System.currentTimeMillis();

    public String getStateName() {
        // 獲取當前狀態名稱
        return this.getClass().getSimpleName();
    }

    public void checkAndSwitchState(AbstractCircuitBreaker cb) {
        // 閥值判斷,如果失敗到達閥值,切換狀態到開啟狀態
        long maxFailNum = Long.valueOf(cb.thresholdFailRateForClose.split("/")[0]);
        if (failNum.get() >= maxFailNum) {
            cb.setState(new OpenCBState());
        }
    }

    public boolean canPassCheck(AbstractCircuitBreaker cb) {
        // 關閉狀態,請求都應該允許通過
        return true;
    }

    public void countFailNum(AbstractCircuitBreaker cb) {
        // 檢查計數器是否過期了,否則重新計數
        long period = Long.valueOf(cb.thresholdFailRateForClose.split("/")[1]) * 1000;
        long now = System.currentTimeMillis();
        if (failNumClearTime + period <= now) {
            failNum.set(0);
        }
        // 失敗計數
        failNum.incrementAndGet();

        // 檢查是否切換狀態
        checkAndSwitchState(cb);
    }
}
開啟狀態
import com.lpf.熔斷器.cb.AbstractCircuitBreaker;

/**
 * 熔斷器-開啟狀態
 */
public class OpenCBState implements CBState {
	/**
	 * 進入當前狀態的初始化時間
	 */
	private long stateTime = System.currentTimeMillis();

	public String getStateName() {
		// 獲取當前狀態名稱
		return this.getClass().getSimpleName();
	}

	public void checkAndSwitchState(AbstractCircuitBreaker cb) {
		// 開啟狀態,檢查等待時間是否已到,如果到了就切換到半開狀態
		long now = System.currentTimeMillis();
		long idleTime = cb.thresholdIdleTimeForOpen * 1000L;
		if (stateTime + idleTime <= now) {
			cb.setState(new HalfOpenCBState());
		}
	}

	public boolean canPassCheck(AbstractCircuitBreaker cb) {
		// 檢測狀態
		checkAndSwitchState(cb);
		return false;
	}

	public void countFailNum(AbstractCircuitBreaker cb) {
		// nothing
	}
}
半開狀態
import com.lpf.熔斷器.cb.AbstractCircuitBreaker;

import java.util.concurrent.atomic.AtomicInteger;

/**
 * 熔斷器-半開狀態
 */
public class HalfOpenCBState implements CBState {

	/**
	 * 進入當前狀態的初始化時間
	 */
	private long stateTime = System.currentTimeMillis();

	/**
	 * 半開狀態,失敗計數器
	 */
	private AtomicInteger failNum = new AtomicInteger(0);

	/**
	 * 半開狀態,允許通過的計數器
	 */
	private AtomicInteger passNum = new AtomicInteger(0);

	public String getStateName() {
		// 獲取當前狀態名稱
		return this.getClass().getSimpleName();
	}

	public void checkAndSwitchState(AbstractCircuitBreaker cb) {
		// 判斷半開時間是否結束
		long idleTime = Long.valueOf(cb.thresholdPassRateForHalfOpen.split("/")[1]) * 1000L;
		long now = System.currentTimeMillis();
		if (stateTime + idleTime <= now) {
			// 如果半開狀態已結束,失敗次數是否超過了閥值
			int maxFailNum = cb.thresholdFailNumForHalfOpen;
			if (failNum.get() >= maxFailNum) {
				// 失敗超過閥值,認為服務沒有恢復,重新進入熔斷開啟狀態
				cb.setState(new OpenCBState());
			} else {
				// 沒超過,認為服務恢復,進入熔斷關閉狀態
				cb.setState(new CloseCBState());
			}
		}
	}

	public boolean canPassCheck(AbstractCircuitBreaker cb) {
		// 檢查是否切換狀態
		checkAndSwitchState(cb);

		// 超過了閥值,不再放量
		int maxPassNum = Integer.valueOf(cb.thresholdPassRateForHalfOpen.split("/")[0]);
		if (passNum.get() > maxPassNum) {
			return false;
		}
		// 檢測是否超過了閥值
		if (passNum.incrementAndGet() <= maxPassNum) {
			return true;
		}
		return false;
	}

	public void countFailNum(AbstractCircuitBreaker cb) {
		// 失敗計數
		failNum.incrementAndGet();

		// 檢查是否切換狀態
		checkAndSwitchState(cb);
	}
}
熔斷器
抽象熔斷器
import com.lpf.熔斷器.state.CBState;
import com.lpf.熔斷器.state.CloseCBState;

/**
 * 基礎熔斷器
 */
public abstract class AbstractCircuitBreaker implements CircuitBreaker {
	/**
	 * 熔斷器當前狀態
	 */
	private volatile CBState state = new CloseCBState();

	/**
	 * 在熔斷器關閉的情況下,在多少秒內失敗多少次進入,熔斷開啟狀態(預設10分鐘內,失敗10次進入開啟狀態)
	 */
	public String thresholdFailRateForClose = "10/600";

	/**
	 * 在熔斷器開啟的情況下,熔斷多少秒進入半開狀態,(預設熔斷30分鐘)
	 */
	public int thresholdIdleTimeForOpen = 1800;

	/**
	 * 在熔斷器半開的情況下, 在多少秒內放多少次請求,去試探(預設10分鐘內,放10次請求)
	 */
	public String thresholdPassRateForHalfOpen = "10/600";

	/**
	 * 在熔斷器半開的情況下, 試探期間,如果有超過多少次失敗的,重新進入熔斷開啟狀態,否者進入熔斷關閉狀態。
	 */
	public int thresholdFailNumForHalfOpen = 1;

	public CBState getState() {
		return state;
	}

	public void setState(CBState state) {
		// 當前狀態不能切換為當前狀態
		CBState currentState = getState();
		if (currentState.getStateName().equals(state.getStateName())) {
			return;
		}

		// 多執行緒環境加鎖
		synchronized (this) {
			// 二次判斷
			currentState = getState();
			if (currentState.getStateName().equals(state.getStateName())) {
				return;
			}

			// 更新狀態
			this.state = state;
			System.out.println("熔斷器狀態轉移:" + currentState.getStateName() + "->" + state.getStateName());
		}
	}
}
本地熔斷器
import com.lpf.熔斷器.state.CloseCBState;

/**
 * 本地熔斷器(把它當成了工廠了)
 */
public class LocalCircuitBreaker extends AbstractCircuitBreaker {
	
	public LocalCircuitBreaker(String failRateForClose, int idleTimeForOpen, String passRateForHalfOpen, int failNumForHalfOpen) {
		this.thresholdFailRateForClose = failRateForClose;
		this.thresholdIdleTimeForOpen = idleTimeForOpen;
		this.thresholdPassRateForHalfOpen = passRateForHalfOpen;
		this.thresholdFailNumForHalfOpen = failNumForHalfOpen;
	}

	public void reset() {
		this.setState(new CloseCBState());
	}

	public boolean canPassCheck() {
		return getState().canPassCheck(this);
	}

	public void countFailNum() {
		getState().countFailNum(this);
	}
}
測試例子
import com.lpf.熔斷器.cb.CircuitBreaker;
import com.lpf.熔斷器.cb.LocalCircuitBreaker;

import java.util.Random;
import java.util.concurrent.CountDownLatch;


public class App {

	public static void main(String[] args) throws InterruptedException {
		final int maxNum = 200;
		final CountDownLatch countDownLatch = new CountDownLatch(maxNum);

		final CircuitBreaker circuitBreaker = new LocalCircuitBreaker("5/20", 10, "5/10", 2);

		for (int i = 0; i < maxNum; i++) {
			new Thread(new Runnable() {
				public void run() {
					try {
						// 模擬隨機請求
						Thread.sleep(new Random().nextInt(20) * 1000);
						// 過熔斷器
						if (circuitBreaker.canPassCheck()) {
							// do something
							System.out.println("正常業務邏輯操作");

							// 模擬後期的服務恢復狀態
							if (countDownLatch.getCount() >= maxNum / 2) {
								// 模擬隨機失敗
								if (new Random().nextInt(2) == 1) {
									throw new Exception("mock error");
								}
							}
						} else {
							System.out.println("攔截業務邏輯操作");
						}
					} catch (Exception e) {
						System.out.println("業務執行失敗了");
						// 熔斷器計數器
						circuitBreaker.countFailNum();
					}
					countDownLatch.countDown();
				}
			}).start();

			// 模擬隨機請求
			Thread.sleep(new Random().nextInt(5) * 100);
		}

		countDownLatch.await();
		System.out.println("end");
	}
}
結果
正常業務邏輯操作
正常業務邏輯操作
正常業務邏輯操作
業務執行失敗了
正常業務邏輯操作
正常業務邏輯操作
業務執行失敗了
正常業務邏輯操作
正常業務邏輯操作
業務執行失敗了
正常業務邏輯操作
正常業務邏輯操作
正常業務邏輯操作
正常業務邏輯操作
正常業務邏輯操作
業務執行失敗了
正常業務邏輯操作
正常業務邏輯操作
正常業務邏輯操作
業務執行失敗了
熔斷器狀態轉移:CloseCBState->OpenCBState
攔截業務邏輯操作
攔截業務邏輯操作
攔截業務邏輯操作
......
攔截業務邏輯操作
攔截業務邏輯操作
熔斷器狀態轉移:OpenCBState->HalfOpenCBState
攔截業務邏輯操作
正常業務邏輯操作
業務執行失敗了
正常業務邏輯操作
正常業務邏輯操作
正常業務邏輯操作
業務執行失敗了
正常業務邏輯操作
業務執行失敗了
攔截業務邏輯操作
攔截業務邏輯操作
......
攔截業務邏輯操作
熔斷器狀態轉移:HalfOpenCBState->OpenCBState
攔截業務邏輯操作
攔截業務邏輯操作
......
攔截業務邏輯操作
熔斷器狀態轉移:OpenCBState->HalfOpenCBState
攔截業務邏輯操作
正常業務邏輯操作
正常業務邏輯操作
正常業務邏輯操作
正常業務邏輯操作
正常業務邏輯操作
攔截業務邏輯操作
攔截業務邏輯操作
......
攔截業務邏輯操作
熔斷器狀態轉移:HalfOpenCBState->CloseCBState
攔截業務邏輯操作
正常業務邏輯操作
正常業務邏輯操作
......
正常業務邏輯操作
end

參考文章: