1. 程式人生 > >Spring Cloud2.1-Ribbon核心源碼(四)

Spring Cloud2.1-Ribbon核心源碼(四)

strac isalive timeunit lms gets mod creates ide cati

Ribbon負載均衡相關類

AbstractloadBalancer

ILoadBalancer的抽象實現類

public abstract class AbstractLoadBalancer implements ILoadBalancer {

    //服務實例分組枚舉
    //• ALL: 所有服務實例。
    //• STATUS_UP: 正常服務的實例。
    //• STATUS_NOT_UP: 停止服務的實例。
    public enum ServerGroup{
        ALL,
        STATUS_UP,
        STATUS_NOT_UP
    }
    
//再根據負載均衡器選擇服務實例時忽略key public Server chooseServer() { return chooseServer(null); } //定義了根據分組類型來獲取 不同的服務實例的列表。 public abstract List<Server> getServerList(ServerGroup serverGroup); //對象被用來存儲負載均衡器中各個 服務實例當前的屬性和 統計信息。 這些信息非常有用, 我們可以利用這些信息來觀察負載均衡器的運行情 //況, 同時這些信息也是用來制定負載均衡策略的重要依據。
public abstract LoadBalancerStats getLoadBalancerStats(); }

BaseloadBalancer

BaseLoadBalancer 類是和ribbon 負載均衡器的基礎實現類,在該類中定義了很多關 於負載均衡器相關的基礎內容。

public class BaseLoadBalancer extends AbstractLoadBalancer implements PrimeConnectionListener, IClientConfigAware {
    private
static Logger logger = LoggerFactory.getLogger(BaseLoadBalancer.class); private static final IRule DEFAULT_RULE = new RoundRobinRule(); private static final BaseLoadBalancer.SerialPingStrategy DEFAULT_PING_STRATEGY = new BaseLoadBalancer.SerialPingStrategy(); private static final String DEFAULT_NAME = "default"; private static final String PREFIX = "LoadBalancer_"; //BaseLoadBalancer服務選擇是委托給IRule 這個接口表示負載均衡選擇策略 protected IRule rule; //使用IPing檢查服務是否有效的執行對象 內部使用線性輪訓 默認實現類內部類 SerialPingStrategy protected IPingStrategy pingStrategy; //用於檢查服務是否有效 默認為空需要註入 protected IPing ping; //維護所有服務 @Monitor(name = "LoadBalancer_AllServerList", type = DataSourceType.INFORMATIONAL) protected volatile List<Server> allServerList; //維護有效服務 @Monitor(name = "LoadBalancer_UpServerList", type = DataSourceType.INFORMATIONAL) protected volatile List<Server> upServerList; ...... public BaseLoadBalancer() { this.rule = DEFAULT_RULE;//默認使用new RoundRobinRule() this.pingStrategy = DEFAULT_PING_STRATEGY; this.ping = null; this.allServerList = Collections.synchronizedList(new ArrayList()); this.upServerList = Collections.synchronizedList(new ArrayList()); ..... this.ping = null; this.setRule(DEFAULT_RULE); this.setupPingTask(); } //將服務添加到服務清單 public void addServer(Server newServer) { if (newServer != null) { try { ArrayList<Server> newList = new ArrayList(); newList.addAll(this.allServerList); newList.add(newServer); this.setServersList(newList); } catch (Exception var3) { logger.error("LoadBalancer [{}]: Error adding newServer {}", new Object[]{this.name, newServer.getHost(), var3}); } } } //將服務添加到服務清單 public void addServers(List<Server> newServers) { if (newServers != null && newServers.size() > 0) { try { ArrayList<Server> newList = new ArrayList(); newList.addAll(this.allServerList); newList.addAll(newServers); this.setServersList(newList); } catch (Exception var3) { logger.error("LoadBalancer [{}]: Exception while adding Servers", this.name, var3); } } } //將服務添加到服務清單 void addServers(Object[] newServers) { if (newServers != null && newServers.length > 0) { try { ArrayList<Server> newList = new ArrayList(); newList.addAll(this.allServerList); Object[] var3 = newServers; int var4 = newServers.length; for (int var5 = 0; var5 < var4; ++var5) { Object server = var3[var5]; if (server != null) { if (server instanceof String) { server = new Server((String) server); } if (server instanceof Server) { newList.add((Server) server); } } } this.setServersList(newList); } catch (Exception var7) { logger.error("LoadBalancer [{}]: Exception while adding Servers", this.name, var7); } } } //獲得所有服務 public List<Server> getAllServers() { return Collections.unmodifiableList(this.allServerList); } //根據組獲得服務 public List<Server> getServerList(ServerGroup serverGroup) { switch (serverGroup) { case ALL: return this.allServerList; case STATUS_UP: return this.upServerList; case STATUS_NOT_UP: ArrayList<Server> notAvailableServers = new ArrayList(this.allServerList); ArrayList<Server> upServers = new ArrayList(this.upServerList); notAvailableServers.removeAll(upServers); return notAvailableServers; default: return new ArrayList(); } } //根據實例id獲得服務 public Server chooseServer(Object key) { if (this.counter == null) { this.counter = this.createCounter(); } this.counter.increment(); if (this.rule == null) { return null; } else { try { return this.rule.choose(key); } catch (Exception var3) { logger.warn("LoadBalancer [{}]: Error choosing server for key {}", new Object[]{this.name, key, var3}); return null; } } } //標記服務無效 public void markServerDown(Server server) { if (server != null && server.isAlive()) { logger.error("LoadBalancer [{}]: markServerDown called on [{}]", this.name, server.getId()); server.setAlive(false); this.notifyServerStatusChangeListener(Collections.singleton(server)); } } } }

DynamicServerlistloadBalancer

繼承BaseloadBalancer 是對BaseloadBalancer擴展 實現了再服務運行期間動態更新的能力 還增加通過過濾器 選擇性的過濾一些服務的功能

public class DynamicServerListLoadBalancer<T extends Server> extends BaseLoadBalancer {
    private static final Logger LOGGER = LoggerFactory.getLogger(DynamicServerListLoadBalancer.class);

    boolean isSecure = false;
    boolean useTunnel = false;

    // to keep track of modification of server lists
    protected AtomicBoolean serverListUpdateInProgress = new AtomicBoolean(false);

    //用於更新服務清單 由EurekaRibbonClientConfiguration.ribbonServerList 創建DiscoveryEnabledNIWSServerList
    volatile ServerList<T> serverListImpl;
    //從註冊中心獲取到服務後更新本地服務
    protected volatile ServerListUpdater serverListUpdtater;
    //通過serverListImpl.getUpdatedListOfServers執行更新操作
    protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() {
        @Override
        public void doUpdate() {
            updateListOfServers();
        }
    };
    public void updateListOfServers() {
        List<T> servers = new ArrayList<T>();
        if (serverListImpl != null) {
            servers = serverListImpl.getUpdatedListOfServers();
            LOGGER.debug("List of Servers for {} obtained from Discovery client: {}",
                    getIdentifier(), servers);

            if (filter != null) {
                servers = filter.getFilteredListOfServers(servers);
                LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}",
                        getIdentifier(), servers);
            }
        }
        updateAllServerList(servers);
    }
}

ServerList<T>

技術分享圖片

public interface ServerList<T extends Server> {

    //於獲取初始化的服務實例 清單,
    public List<T> getInitialListOfServers();
    //用於獲取更新的服務實例清單 
    public List<T> getUpdatedListOfServers();

}

DiscoveryEnabledNIWSServerList

public class DiscoveryEnabledNIWSServerList extends AbstractServerList<DiscoveryEnabledServer> {

    public List<DiscoveryEnabledServer> getInitialListOfServers() {
        return this.obtainServersViaDiscovery();
    }

    public List<DiscoveryEnabledServer> getUpdatedListOfServers() {
        return this.obtainServersViaDiscovery();
    }

    private List<DiscoveryEnabledServer> obtainServersViaDiscovery() {
        List<DiscoveryEnabledServer> serverList = new ArrayList();
        if (this.eurekaClientProvider != null && this.eurekaClientProvider.get() != null) {
            EurekaClient eurekaClient = (EurekaClient)this.eurekaClientProvider.get();
            if (this.vipAddresses != null) {
                //可以理解為邏輯服務名字 比如PROVIDER
                String[] var3 = this.vipAddresses.split(",");
                int var4 = var3.length;

                for(int var5 = 0; var5 < var4; ++var5) {
                    String vipAddress = var3[var5];
                    //通過EurekaClient從註冊中心拉取服務
                    List<InstanceInfo> listOfInstanceInfo = eurekaClient.getInstancesByVipAddress(vipAddress, this.isSecure, this.targetRegion);
                    Iterator var8 = listOfInstanceInfo.iterator();

                    while(var8.hasNext()) {
                        InstanceInfo ii = (InstanceInfo)var8.next();
                        if (ii.getStatus().equals(InstanceStatus.UP)) {
                            if (this.shouldUseOverridePort) {
                                if (logger.isDebugEnabled()) {
                                    logger.debug("Overriding port on client name: " + this.clientName + " to " + this.overridePort);
                                }

                                InstanceInfo copy = new InstanceInfo(ii);
                                if (this.isSecure) {
                                    ii = (new Builder(copy)).setSecurePort(this.overridePort).build();
                                } else {
                                    ii = (new Builder(copy)).setPort(this.overridePort).build();
                                }
                            }

                            DiscoveryEnabledServer des = this.createServer(ii, this.isSecure, this.shouldUseIpAddr);
                            serverList.add(des);
                        }
                    }

                    if (serverList.size() > 0 && this.prioritizeVipAddressBasedServers) {
                        break;
                    }
                }
            }
            //最終返回拉取的服務
            return serverList;
        } else {
            logger.warn("EurekaClient has not been initialized yet, returning an empty list");
            return new ArrayList();
        }
    }


}

ServerListUpdater

技術分享圖片

public interface ServerListUpdater {
    public interface UpdateAction {
        void doUpdate();
    }
    //啟動服務更新器, 傳入的UpdadataAction對象為更新操作的具體實現。
    void start(UpdateAction updateAction);

    //停止服務更新器
    void stop();

    ////荻取最近的更新時間戳
    String getLastUpdate();

    //獲取上一次更新到現在的時間間隔,單位為毫秒
    long getDurationSinceLastUpdateMs();

    ////荻取錯過的更新周期數
    int getNumberMissedCycles();

    ////荻取核心線程數
    int getCoreThreads();
}

PollingServerListUpdater動態更新服務的默認策略 采用定時任務

EurekaNotificationServerListUpdater 也是動態更新服務但是它利用 Eureka 的事件監聽器來驅動服務列表的更新操作。

PollingServerListUpdater

public class PollingServerListUpdater implements ServerListUpdater {
    //是DynamicServerListlodBalancer的成員變量傳入
    public synchronized void start(final UpdateAction updateAction) {
        if (this.isActive.compareAndSet(false, true)) {
            Runnable wrapperRunnable = new Runnable() {
                public void run() {
                    if (!PollingServerListUpdater.this.isActive.get()) {
                        if (PollingServerListUpdater.this.scheduledFuture != null) {
                            PollingServerListUpdater.this.scheduledFuture.cancel(true);
                        }

                    } else {
                        try {
                            //外部傳入拉取服務 調用的DynamicServerListLoadBalancer.updateAction.doUpdate()   這個doupdate裏面調用serverListImpl.getUpdatedListOfServers();拉取服務
                            updateAction.doUpdate();
                            PollingServerListUpdater.this.lastUpdated = System.currentTimeMillis();
                        } catch (Exception var2) {
                            PollingServerListUpdater.logger.warn("Failed one update cycle", var2);
                        }

                    }
                }
            };
            //開啟定時任務
            this.scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(wrapperRunnable, this.initialDelayMs, this.refreshIntervalMs, TimeUnit.MILLISECONDS);
        } else {
            logger.info("Already active, no-op");
        }

    }

}

ServerListFilter

DynamicServerlistloadBalancer 最終拉取服務的方法 涉及到一個filter

@VisibleForTesting
    public void updateListOfServers() {
        List<T> servers = new ArrayList<T>();
        if (serverListImpl != null) {
            servers = serverListImpl.getUpdatedListOfServers();
            LOGGER.debug("List of Servers for {} obtained from Discovery client: {}",
                    getIdentifier(), servers);

            if (filter != null) {
                //對應成員變量volatile ServerListFilter<T> filter;
                servers = filter.getFilteredListOfServers(servers);
                LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}",
                        getIdentifier(), servers);
            }
        }
        updateAllServerList(servers);
    }

public interface ServerListFilter<T extends Server> {
    //主要用於實現對服務實例列表的過濾, 通過傳入的 服務實例清單, 根據 一 些規則返回過濾後的服務實例清單
    public List<T> getFilteredListOfServers(List<T> servers);
}

技術分享圖片

AbstractServerListFilter

這是一 個抽象過濾器,在這裏定義了過濾時需要 的一個重要依據對象 LoadBalancerStats, LoadBalancerStats保存了負載均衡器的屬性和統計信息

public abstract class AbstractServerListFilter<T extends Server> implements ServerListFilter<T> {
    //保存了負載均衡器的屬性和統計信息
    private volatile LoadBalancerStats stats;

    public AbstractServerListFilter() {
    }

    public void setLoadBalancerStats(LoadBalancerStats stats) {
        this.stats = stats;
    }

    public LoadBalancerStats getLoadBalancerStats() {
        return this.stats;
    }
}

ZoneAffinityServerListFilter

該過濾器基於 “ 區域感知 (Zone Affinity)" 的方式實現服務實例的過濾, 也就是說, 它會根據提供服務的實例所處的區域 (Zone) 與消費者自身的所處區域 (Zone) 進行比較, 過濾掉那些不是同處 一 個區

域的實例。

public class ZoneAffinityServerListFilter<T extends Server> extends AbstractServerListFilter<T> implements IClientConfigAware {
    private volatile boolean zoneAffinity;
    private volatile boolean zoneExclusive;

    public List<T> getFilteredListOfServers(List<T> servers) {
        if (this.zone != null && (this.zoneAffinity || this.zoneExclusive) && servers != null && servers.size() > 0) {
            //Iterables.filter實現過濾
            List<T> filteredServers = Lists.newArrayList(Iterables.filter(servers, this.zoneAffinityPredicate.getServerOnlyPredicate()));
            //判斷是否要啟用區域感知功能 調用下面shouldEnableZoneAffinity方法
            if (this.shouldEnableZoneAffinity(filteredServers)) {
                return filteredServers;
            }

            if (this.zoneAffinity) {
                this.overrideCounter.increment();
            }
        }

        return servers;
    }
    private boolean shouldEnableZoneAffinity(List<T> filtered) {
        if (!this.zoneAffinity && !this.zoneExclusive) {
            return false;
        } else if (this.zoneExclusive) {
            return true;
        } else {
            //保存了負載均衡器的相關統計信息
            LoadBalancerStats stats = this.getLoadBalancerStats();
            if (stats == null) {
                return this.zoneAffinity;
            } else {
                logger.debug("Determining if zone affinity should be enabled with given server list: {}", filtered);
                //取這些過濾後的同區域實例的基礎指標(包含實例數量、斷路器斷開數、 活動請求數、 實例平均負載等),
                ZoneSnapshot snapshot = stats.getZoneSnapshot(filtered);
                double loadPerServer = snapshot.getLoadPerServer();
                int instanceCount = snapshot.getInstanceCount();
                int circuitBreakerTrippedCount = snapshot.getCircuitTrippedCount();
                /**
                 * 當以下其中一個滿足就不啟用區域感知
                 *  blackOutServerPercentage: 故障實例百分比(斷路器斷開數/實例數量) >=0.8。
                 *  activeReqeustsPerServer: 實例平均負載 >=0.6 。
                 * availableServers: 可用實例數(實例數量 - 斷路器斷開數) <2。
                 */
                if ((double)circuitBreakerTrippedCount / (double)instanceCount < this.blackOutServerPercentageThreshold.get() && loadPerServer < this.activeReqeustsPerServerThreshold.get() && instanceCount - circuitBreakerTrippedCount >= this.availableServersThreshold.get()) {
                    return true;
                } else {
                    logger.debug("zoneAffinity is overriden. blackOutServerPercentage: {}, activeReqeustsPerServer: {}, availableServers: {}", new Object[]{(double)circuitBreakerTrippedCount / (double)instanceCount, loadPerServer, instanceCount - circuitBreakerTrippedCount});
                    return false;
                }
            }
        }
    }


}

DefaultNIWSServerListFilter

完全繼承ZoneAffinityServerListFilter 是默認的過濾器

public class DefaultNIWSServerListFilter<T extends Server> extends ZoneAffinityServerListFilter<T> {
    public DefaultNIWSServerListFilter() {
    }
}

ServerListSubsetFilter

技術分享圖片

ZonePreferenceServerListFilter

/**
 * eureka和ribbon整合額默認過濾器
 */
public class ZonePreferenceServerListFilter extends ZoneAffinityServerListFilter<Server> {
    private String zone;
    public List<Server> getFilteredListOfServers(List<Server> servers) {
        //通過父類的區域感知獲得所有服務列表
        List<Server> output = super.getFilteredListOfServers(servers);
        //如果消費者配置了zone
        if (this.zone != null && output.size() == servers.size()) {
            List<Server> local = new ArrayList();
            Iterator var4 = output.iterator();

            //遍歷服務列表剔除跟消費者配置zone不一致的服務
            while(var4.hasNext()) {
                
                Server server = (Server)var4.next();
                if (this.zone.equalsIgnoreCase(server.getZone())) {
                    local.add(server);
                }
            }

            if (!local.isEmpty()) {
                return local;
            }
        }

        return output;
    }
}

ZoneAwareLoadBalancer

public class ZoneAwareLoadBalancer<T extends Server> extends DynamicServerListLoadBalancer<T> {


    //重寫了父類的方法
    protected void setServerListForZones(Map<String, List<Server>> zoneServersMap) {
        //父類會獲獲得所有服務 並根據zone進行分組 每個zone對應一個Zonestats
        super.setServerListForZones(zoneServersMap);
        //創建一個自己的ConcurrentHashMap 存儲對應zone的負載均衡器
        if (this.balancers == null) {
            this.balancers = new ConcurrentHashMap();
        }
         //獲得服務的叠代器
        Iterator var2 = zoneServersMap.entrySet().iterator();

        Entry existingLBEntry;
        while(var2.hasNext()) {
            existingLBEntry = (Entry)var2.next();
            String zone = ((String)existingLBEntry.getKey()).toLowerCase();
            //內部管理根據balancers 獲得對應的負載均衡器(內部會創建對應的IRole規則 沒有指定的話 默認Availability­ FilieringRule) 將對應的zone的服務放到對應的負載均衡器
            this.getLoadBalancer(zone).setServersList((List)existingLBEntry.getValue());
        }
        //遍歷負載均衡器
        var2 = this.balancers.entrySet().iterator();
        while(var2.hasNext()) {
            existingLBEntry = (Entry)var2.next();
            //檢查對應的zone下面是否沒有服務了 如果是的話就清空
            if (!zoneServersMap.keySet().contains(existingLBEntry.getKey())) {
                ((BaseLoadBalancer)existingLBEntry.getValue()).setServersList(Collections.emptyList());
            }
        }

    }

    //重寫父類的chooseServer
    public Server chooseServer(Object key) {
        //當負載均衡器中的zone大於1的時候才執行自定義策略 否則還是用父類的
        if (ENABLED.get() && this.getLoadBalancerStats().getAvailableZones().size() > 1) {
            Server server = null;

            try {
                LoadBalancerStats lbStats = this.getLoadBalancerStats();
                //為當前負載均衡器的所有zone創建快照
                Map<String, ZoneSnapshot> zoneSnapshot = ZoneAvoidanceRule.createSnapshot(lbStats);
                logger.debug("Zone snapshots: {}", zoneSnapshot);
                if (this.triggeringLoad == null) {
                    this.triggeringLoad = DynamicPropertyFactory.getInstance().getDoubleProperty("ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".triggeringLoadPerServerThreshold", 0.2D);
                }

                if (this.triggeringBlackoutPercentage == null) {
                    this.triggeringBlackoutPercentage = DynamicPropertyFactory.getInstance().getDoubleProperty("ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".avoidZoneWithBlackoutPercetage", 0.99999D);
                }
                /**
                 * 對應zone的可用區的選擇
                 * . 首先它會剔除符合 這些規則的 Zone區域: 所屬實例數為零的 Zone 區域; Zone 區域內實例的平均負載小千零,或者實例故障率( 斷路器斷開次數/實例數)大 於 等於闕值(默認為0.99999)。
                 * . 然後根據Zone區域的實例平均負載計算出最差的Zone區域,這裏的最差指的是 實例平均負載最高的Zone區域。
                 * . 如果在上面的過程中沒有符合剔除要求的區域,同時實例最大平均負載小千閭值 (默認為20%), 就直接返回所有Zone區域為可用區域。 否則,從最壞Zone區
                 * 域集合中隨機選擇 一 個,將它從可用Zone區域集合中 剔除。
                 */
                Set<String> availableZones = ZoneAvoidanceRule.getAvailableZones(zoneSnapshot, this.triggeringLoad.get(), this.triggeringBlackoutPercentage.get());
                logger.debug("Available zones: {}", availableZones);
                //當獲取的zone區域不等於空 並且小於總數
                if (availableZones != null && availableZones.size() < zoneSnapshot.keySet().size()) {
                    //隨機選擇一個
                    String zone = ZoneAvoidanceRule.randomChooseZone(zoneSnapshot, availableZones);
                    logger.debug("Zone chosen: {}", zone);
                    if (zone != null) {
                        BaseLoadBalancer zoneLoadBalancer = this.getLoadBalancer(zone);
                        //選用具體的服務實例  內部使用ZoneAvoidanceRule選擇
                        server = zoneLoadBalancer.chooseServer(key);
                    }
                }
            } catch (Exception var8) {
                logger.error("Error choosing server using zone aware logic for load balancer={}", this.name, var8);
            }

            if (server != null) {
                return server;
            } else {
                logger.debug("Zone avoidance logic is not invoked.");
                return super.chooseServer(key);
            }
        } else {
            logger.debug("Zone aware logic disabled or there is only one zone");
            return super.chooseServer(key);
        }
    }

    @VisibleForTesting
    BaseLoadBalancer getLoadBalancer(String zone) {
        zone = zone.toLowerCase();
        BaseLoadBalancer loadBalancer = (BaseLoadBalancer)this.balancers.get(zone);
        if (loadBalancer == null) {
            IRule rule = this.cloneRule(this.getRule());
            loadBalancer = new BaseLoadBalancer(this.getName() + "_" + zone, rule, this.getLoadBalancerStats());
            BaseLoadBalancer prev = (BaseLoadBalancer)this.balancers.putIfAbsent(zone, loadBalancer);
            if (prev != null) {
                loadBalancer = prev;
            }
        }

        return loadBalancer;
    }

Spring Cloud2.1-Ribbon核心源碼(四)