1. 程式人生 > >服務限流-令牌桶java實現

服務限流-令牌桶java實現

此文非常不錯,抄自:

https://www.cnblogs.com/googlemeoften/p/6020718.html

a. 按特定的速率向令牌桶投放令牌

b. 根據預設的匹配規則先對報文進行分類,不符合匹配規則的報文不需要經過令牌桶的處理,直接傳送;

c. 符合匹配規則的報文,則需要令牌桶進行處理。當桶中有足夠的令牌則報文可以被繼續傳送下去,同時令牌桶中的令牌 量按報文的長度做相應的減少;

d. 當令牌桶中的令牌不足時,報文將不能被髮送,只有等到桶中生成了新的令牌,報文才可以傳送。這就可以限制報文的流量只能是小於等於令牌生成的速度,達到限制流量的目的。

保留學習

import java.io.BufferedWriter;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; /** * <pre> * Created by inter12 on 15-3-18. * </pre> */ public class TokenBucket { // 預設桶大小個數 即最大瞬間流量是64M private static final int DEFAULT_BUCKET_SIZE = 1024 * 1024 * 64; // 一個桶的單位是1位元組 private
int everyTokenSize = 1; // 瞬間最大流量 private int maxFlowRate; // 平均流量 private int avgFlowRate; // 佇列來快取桶數量:最大的流量峰值就是 = everyTokenSize*DEFAULT_BUCKET_SIZE 64M = 1 * 1024 * 1024 // * 64 private ArrayBlockingQueue<Byte> tokenQueue = new ArrayBlockingQueue<Byte>(DEFAULT_BUCKET_SIZE); private ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); private volatile boolean isStart = false; private ReentrantLock lock = new ReentrantLock(true); private static final byte A_CHAR = 'a'; public TokenBucket() { } public TokenBucket(int maxFlowRate, int avgFlowRate) { this.maxFlowRate = maxFlowRate; this.avgFlowRate = avgFlowRate; } public TokenBucket(int everyTokenSize, int maxFlowRate, int avgFlowRate) { this.everyTokenSize = everyTokenSize; this.maxFlowRate = maxFlowRate; this.avgFlowRate = avgFlowRate; } public void addTokens(Integer tokenNum) { // 若是桶已經滿了,就不再加入新的令牌 for (int i = 0; i < tokenNum; i++) { tokenQueue.offer(Byte.valueOf(A_CHAR)); } } public TokenBucket build() { start(); return this; } /** * 獲取足夠的令牌個數 * * @return */ public boolean getTokens(byte[] dataSize) { // Preconditions.checkNotNull(dataSize); // Preconditions.checkArgument(isStart, // "please invoke start method first !"); int needTokenNum = dataSize.length / everyTokenSize + 1;// 傳輸內容大小對應的桶個數 final ReentrantLock lock = this.lock; lock.lock(); try { boolean result = needTokenNum <= tokenQueue.size(); // 是否存在足夠的桶數量 if (!result) { return false; } int tokenCount = 0; for (int i = 0; i < needTokenNum; i++) { Byte poll = tokenQueue.poll(); if (poll != null) { tokenCount++; } } return tokenCount == needTokenNum; } finally { lock.unlock(); } } public void start() { // 初始化桶佇列大小 if (maxFlowRate != 0) { tokenQueue = new ArrayBlockingQueue<Byte>(maxFlowRate); } // 初始化令牌生產者 TokenProducer tokenProducer = new TokenProducer(avgFlowRate, this); scheduledExecutorService.scheduleAtFixedRate(tokenProducer, 0, 1, TimeUnit.SECONDS); isStart = true; } public void stop() { isStart = false; scheduledExecutorService.shutdown(); } public boolean isStarted() { return isStart; } class TokenProducer implements Runnable { private int avgFlowRate; private TokenBucket tokenBucket; public TokenProducer(int avgFlowRate, TokenBucket tokenBucket) { this.avgFlowRate = avgFlowRate; this.tokenBucket = tokenBucket; } @Override public void run() { tokenBucket.addTokens(avgFlowRate); } } public static TokenBucket newBuilder() { return new TokenBucket(); } public TokenBucket everyTokenSize(int everyTokenSize) { this.everyTokenSize = everyTokenSize; return this; } public TokenBucket maxFlowRate(int maxFlowRate) { this.maxFlowRate = maxFlowRate; return this; } public TokenBucket avgFlowRate(int avgFlowRate) { this.avgFlowRate = avgFlowRate; return this; } private String stringCopy(String data, int copyNum) { StringBuilder sbuilder = new StringBuilder(data.length() * copyNum); for (int i = 0; i < copyNum; i++) { sbuilder.append(data); } return sbuilder.toString(); } public static void main(String[] args) throws IOException, InterruptedException { TokenBucket tokenBucket = TokenBucket.newBuilder().avgFlowRate(512).maxFlowRate(1024).build(); BufferedWriter bufferedWriter = new BufferedWriter(new OutputStreamWriter(new FileOutputStream("D:/ds_test"))); String data = "xxxx";// 四個位元組 for (int i = 1; i <= 1000; i++) { Random random = new Random(); int i1 = random.nextInt(100); boolean tokens = tokenBucket.getTokens(tokenBucket.stringCopy(data, i1).getBytes()); TimeUnit.MILLISECONDS.sleep(100); if (tokens) { bufferedWriter.write("token pass --- index:" + i1); System.out.println("token pass --- index:" + i1); } else { bufferedWriter.write("token rejuect --- index" + i1); System.out.println("token rejuect --- index" + i1); } bufferedWriter.newLine(); bufferedWriter.flush(); } bufferedWriter.close(); } }