學習一下 SpringCloud (三)-- 服務呼叫、負載均衡 Ribbon、OpenFeign
(1) 相關博文地址:
學習一下 SpringCloud (一)-- 從單體架構到微服務架構、程式碼拆分(maven 聚合): https://www.cnblogs.com/l-y-h/p/14105682.html 學習一下 SpringCloud (二)-- 服務註冊中心 Eureka、Zookeeper、Consul、Nacos :https://www.cnblogs.com/l-y-h/p/14193443.html
(2)程式碼地址:
https://github.com/lyh-man/SpringCloudDemo
一、引入 服務呼叫、負載均衡
1、問題 與 解決
【問題:】 在上一篇中,介紹了 Eureka、Zookeeper、Consul 作為註冊中心,並使用 RestTemplate 進行服務呼叫。 詳見:https://www.cnblogs.com/l-y-h/p/14193443.html 那麼是如何進行負載均衡的呢? 【解決:】 在 @Bean 宣告 RestTemplate 時,新增一個 @LoadBalanced,並使用 註冊中心中 的服務名 作為 RestTemplate 的 URL 地址(ip、埠號)。 就這麼簡單的兩步,即可實現了負載均衡。 那麼這裡面又涉及到什麼技術知識呢?能不能更換負載均衡策略?能不能自定義負載均衡策略? 常用技術: Ribbon(維護狀態,替代產品為 Loadbalancer) OpenFeign(推薦使用) 【說明:】 此處以 Eureka 偽叢集版建立的幾個模組作為演示。程式碼地址:https://github.com/lyh-man/SpringCloudDemo 服務註冊中心:eureka_server_7001、eureka_server_7002、eureka_server_7003 服務提供者:eureka_client_producer_8002、eureka_client_producer_8003、eureka_client_producer_8004 服務消費者:eureka_client_consumer_9002 注: 主要還是在 服務消費者 上配置負載均衡策略(可以 Debug 模式啟動看看執行流程),其他模組直接啟動即可。
二、服務呼叫、負載均衡 -- Ribbon
1、什麼是 Ribbon?
【Ribbon:】 Ribbon 是 Netflix 公司實現的一套基於 HTTP、TCP 的客戶端負載均衡的工具。 SpringCloud 已將其整合到 spring-cloud-netflix 中,實現 SpringCloud 的服務呼叫、負載均衡。 Ribbon 提供了多種方式進行負載均衡(預設輪詢),也可以自定義負載均衡方法。 注: Ribbon 雖然已進入維護模式,但是一時半會還不容易被完全淘汰,還是可以學習一下基本使用的。 Ribbon 替代產品是 Loadbalancer。 【相關網址:】 https://github.com/Netflix/ribbon http://jvm123.com/doc/springcloud/index.html#spring-cloud-ribbon
2、Ribbon 與 Nginx 負載均衡區別
【負載均衡(Load Balance):】 負載均衡指的是 將工作任務 按照某種規則 平均分攤到 多個操作單元上執行。 注: Web 專案的負載均衡,可以理解為:將使用者請求 平均分攤到 多個伺服器上處理,從而提高系統的併發度、可用性。 【負載均衡分類:】 按照軟硬體劃分: 硬體負載均衡: 一般造價昂貴,但資料傳輸更加穩定。比如: F5 負載均衡。 軟體負載均衡: 一般採用某個代理元件,並使用 某種 負載均衡 演算法實現(一種訊息佇列分發機制)。比如:Nginx、Ribbon。 按照負載均衡位置劃分: 集中式負載均衡:提供一個 獨立的 負載均衡系統(可以是軟體,比如:Nginx,可以是硬體,比如:F5)。 通過此係統,將服務消費者的 請求 通過某種負載均衡策略 轉發給 服務提供者。 客戶端負載均衡(程序式負載均衡):將負載均衡邏輯整合到 服務消費者中,服務消費者 定時同步獲取到 服務提供者資訊,並儲存在本地。 每次均從本地快取中取得 服務提供者資訊,並根據 某種負載均衡策略 將請求發給 服務提供者。 注: 使用集中式負載均衡時,服務消費者 不知道 任何一個服務提供者的資訊,只知道獨立負載均衡裝置的資訊。 使用客戶端負載均衡時,服務消費者 知道 所有服務提供者的資訊。 【Nginx 負載均衡:】 Nginx 實現的是 集中式負載均衡,Nginx 接收 客戶端所有請求,並將請求轉發到不同的伺服器進行處理。 【Ribbon 負載均衡:】 Ribbon 實現的是 客戶端負載均衡,從註冊中心獲得服務資訊並快取在本地,在本地進行 負載均衡。
3、更換 Ribbon 負載均衡規則(兩種方式)
(1)引入依賴
【依賴:】 <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-ribbon</artifactId> </dependency>
一般使用 Ribbon 時需要引入上述依賴,但是對於 eureka 來說,其 eureka-client 依賴中已經集成了 ribbon 依賴,所以無需再次引入。
(2)Ribbon 提供的幾種負載均衡演算法
Ribbon 提供了 IRule 介面,通過其可以設定並更換負載均衡規則。
IRule 實質就是 根據某種負載均衡規則,從服務列表中選取一個需要訪問的服務。
一般預設使用 ZoneAvoidanceRule + RoundRobinRule。
【IRule 子類如下:】 RoundRobinRule 輪詢,按照服務列表順序 迴圈選擇服務。 RandomRule 隨機,隨機的從服務列表中選取服務。 RetryRule 重試,先按照輪詢策略獲取服務,若獲取失敗,則在指定時間進行重試,重新獲取可用服務。 WeightedResponseTimeRule 加權響應時間,響應時間越低(即響應時間快),權重越高,越容易被選擇。剛開始啟動時,使用輪詢策略。 BestAvailableRule 高可用,先過濾掉不可用服務(多次訪問故障而處於斷路器跳閘的服務),選擇一個併發量最小的服務。 AvailabilityFilteringRule 可用篩選,先過濾掉不可用服務 以及 併發量超過閾值的服務,對剩餘服務按輪詢策略訪問。 ZoneAvoidanceRule 區域迴避,預設規則,綜合判斷服務所在區域的效能 以及 服務的可用性,過濾結果後採用輪詢的方式選擇結果。 【IRule:】 package com.netflix.loadbalancer; public interface IRule { Server choose(Object var1); void setLoadBalancer(ILoadBalancer var1); ILoadBalancer getLoadBalancer(); }
以 Dubug 模式 啟動 eureka_client_consumer_9002,並在 IRule 介面 實現類的 choose() 方法上打上斷點,傳送請求時,將會進入斷點,此時可以看到執行的 負載均衡規則。
不停的重新整理頁面,可以看到請求以輪詢的方式被 服務提供者 處理。
(3)替換負載均衡規則(方式一:新建配置類)
【步驟一:】 新建一個配置類(該類不能被 @ComponentScan 掃描到,即不能與 啟動類 在同一個包下),並定義規則。 比如: package com.lyh.springcloud.customize; import com.netflix.loadbalancer.IRule; import com.netflix.loadbalancer.RandomRule; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class CustomizeLoadBalanceRule { @Bean public IRule customizeRule() { return new RandomRule(); } } 【步驟二:】 在啟動類上新增 @RibbonClient 註解,並指定服務名 以及 規則。 比如: @RibbonClient(name = "EUREKA-CLIENT-PRODUCER", configuration = CustomizeLoadBalanceRule.class)
不停的重新整理頁面,可以看到請求以隨機的方式被 服務提供者 處理,而非輪詢。
(4)替換負載均衡規則(方式二:修改配置檔案)
在服務消費者 配置檔案中 根據 服務提供者 服務名,
通過 ribbon.NFLoadBalancerRuleClassName 指定負載均衡策略。
注:
在後面的 OpenFeign 的使用中進行演示。
【舉例:】 EUREKA-CLIENT-PRODUCER: # 服務提供者的服務名 ribbon: NFLoadBalancerRuleClassName: com.netflix.loadbalancer.RandomRule
4、輪詢原理(RoundRobinRule)
(1)相關原始碼
主要就是 choose()、incrementAndGetModulo() 這兩個方法。
choose() 用於選擇 server 服務。
incrementAndGetModulo() 用來決定 選擇哪個服務,返回服務下標。
public Server choose(ILoadBalancer lb, Object key) { if (lb == null) { log.warn("no load balancer"); return null; } Server server = null; int count = 0; while (server == null && count++ < 10) { List<Server> reachableServers = lb.getReachableServers(); List<Server> allServers = lb.getAllServers(); int upCount = reachableServers.size(); int serverCount = allServers.size(); if ((upCount == 0) || (serverCount == 0)) { log.warn("No up servers available from load balancer: " + lb); return null; } int nextServerIndex = incrementAndGetModulo(serverCount); server = allServers.get(nextServerIndex); if (server == null) { /* Transient. */ Thread.yield(); continue; } if (server.isAlive() && (server.isReadyToServe())) { return (server); } // Next. server = null; } if (count >= 10) { log.warn("No available alive servers after 10 tries from load balancer: " + lb); } return server; } private AtomicInteger nextServerCyclicCounter; private int incrementAndGetModulo(int modulo) { for (;;) { int current = nextServerCyclicCounter.get(); int next = (current + 1) % modulo; if (nextServerCyclicCounter.compareAndSet(current, next)) return next; } }
(2)程式碼分析
【choose():】 初始進入 choose() 方法,server 為 null 表示服務不存在,count 為 0 表示屬於嘗試第一次獲取服務。 進入 while 迴圈後,退出條件為 server 不為 null(即找到服務) 或者 count 大於等於 10 (即嘗試了 10 次仍未找到服務)。 而 獲取 server 的核心在於獲取 服務的下標,即 int nextServerIndex = incrementAndGetModulo(serverCount); 【incrementAndGetModulo()】 核心就是 自旋 CAS 並取模。 modulo 表示伺服器總數,current 表示當前服務下標,next 表示下一個服務下標。 compareAndSet() 即 CAS 實現,如果 記憶體中的值 與 current 相同,那麼將記憶體中值改為 next,並返回 true,否則返回 false。 即 compareAndSet 失敗後,會不停的執行迴圈 以獲取 最新的 current。 注: 自旋、CAS 後面會講到。CAS 保證原子性。 其實就是一個公式: 第幾次請求 % 伺服器總數量 = 實際呼叫伺服器下標位置
5、手寫一個輪詢演算法
(1)說明
【說明:】 建立一個與 eureka_client_consumer_9002 模組類似的模組 eureka_client_consumer_9005。 修改配置檔案,並去除 @LoadBalanced 註解(避免引起誤解)。 自己實現一個輪詢演算法(與 RoundRobinRule 類似)。 注: 此處在 controller 中定義一個介面,用於測試 輪詢的功能(僅供參考,可以繼承 AbstractLoadBalancerRule,自行構造一個負載均衡類)。 去除 @LoadBalanced 註解後,訪問呼叫 RestTemplate 請求的介面時會報錯(用於區分)。
(2)相關程式碼
模組建立此處省略(需要修改 pom.xml,配置檔案)。
詳情請見上篇部落格:https://www.cnblogs.com/l-y-h/p/14193443.html#_label2_3
面向介面程式設計,此處新建一個 LoadBalacner 介面,用於定義抽象方法(返回服務資訊)。
並定義一個 LoadBalacner 介面的實現類 LoadBalancerImpl。
在 controller 中編寫介面(服務發現),測試一下。
【LoadBalacner】 package com.lyh.springcloud.eureka_client_consumer_9005.consumizeLoadBalance; import org.springframework.cloud.client.ServiceInstance; import java.util.List; public interface LoadBalacner { /** * 從服務例項列表中獲取出 服務例項 */ ServiceInstance getInstances(List<ServiceInstance> serviceInstances); } 【LoadBalancerImpl】 package com.lyh.springcloud.eureka_client_consumer_9005.consumizeLoadBalance.impl; import com.lyh.springcloud.eureka_client_consumer_9005.consumizeLoadBalance.LoadBalacner; import org.springframework.cloud.client.ServiceInstance; import org.springframework.stereotype.Component; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; @Component public class LoadBalancerImpl implements LoadBalacner { private AtomicInteger atomicInteger = new AtomicInteger(); @Override public ServiceInstance getInstances(List<ServiceInstance> serviceInstances) { if (serviceInstances == null || serviceInstances.size() == 0) { return null; } return serviceInstances.get(incrementAndGetModulo(serviceInstances.size())); } public int incrementAndGetModulo(int count) { int current = 0; int next = 0; do { current = atomicInteger.get(); next = (current >= Integer.MAX_VALUE ? 0 : current + 1) % count; } while (!atomicInteger.compareAndSet(current, next)); return next; } } 【ConsumerController】 package com.lyh.springcloud.eureka_client_consumer_9005.controller; import com.lyh.springcloud.common.tools.Result; import com.lyh.springcloud.eureka_client_consumer_9005.consumizeLoadBalance.LoadBalacner; import com.lyh.springcloud.eureka_client_consumer_9005.entity.User; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.cloud.client.ServiceInstance; import org.springframework.cloud.client.discovery.DiscoveryClient; import org.springframework.web.bind.annotation.*; import org.springframework.web.client.RestTemplate; @RestController @RequestMapping("/consumer/user") public class ConsumerController { // 注意,此處 url 寫死的,僅用於演示,實際專案中不能這麼幹。 // public static final String PRODUCER_URL = "http://localhost:8001/producer/"; // 通過服務名 找到 Eureka 註冊中心真實訪問的 地址 public static final String PRODUCER_URL = "http://EUREKA-CLIENT-PRODUCER"; @Autowired private RestTemplate restTemplate; @Autowired private DiscoveryClient discoveryClient; @Autowired private LoadBalacner loadBalancer; @GetMapping("/loadBalance") public Result testLoadBalance() { ServiceInstance serviceInstance = loadBalancer.getInstances(discoveryClient.getInstances("EUREKA-CLIENT-PRODUCER")); if (serviceInstance == null) { return Result.error().message("服務不存在"); } return Result.ok().data("HostAndPort", serviceInstance.getHost() + ":" + serviceInstance.getPort()); } @GetMapping("/get/{id}") public Result getUser(@PathVariable Integer id) { return restTemplate.getForObject(PRODUCER_URL + "/producer/user/get/" + id, Result.class); } @PostMapping("/create") public Result createUser(@RequestBody User user) { return restTemplate.postForObject(PRODUCER_URL + "/producer/user/create", user, Result.class); } }
三、補充知識
1、CAS
(1)什麼是 CAS?
【CAS:】 CAS 是 Compare And Swap 的縮寫,即 比較交換。 是一種無鎖演算法,在不加鎖的情況下實現多執行緒之間變數同步,從而保證資料的原子性。 屬於硬體層面對併發操作的支援(CPU 原語)。 注: 原子性:一個操作或多個操作要麼全部執行且執行過程中不會被其他因素打斷,要麼全部不執行。 原語:指的是若干條指令組成的程式段,實現特定的功能,執行過程中不能被中斷(也即原子性)。 【基本流程:】 CAS 操作包含三個運算元 —— 記憶體值(V)、預期原值(A)和新值(B)。 如果記憶體裡面的值 V 和 A 的值是一樣的,那麼就將記憶體裡面的值更新成 B, 若 V 與 A 不一致,則不操作(某些情況下,可以通過自旋操作,不斷嘗試修改資料直至成功修改)。 即 V = V == A ? B : V; 或者 for(;;) { V = getV(); if (V == A) { V = B; break; } } 【缺點:(詳見下面的 Atomic 類底層原理)】 會出現 ABA 問題(兩次讀取資料時值相同,但不確定值是否被修改過)。 使用自旋(死迴圈)CAS 時會佔用系統資源、影響執行效率。 每次只能對一個共享變數進行原子操作。
(2)原子性
【說明:】 初始 i = 0,現有 10 個執行緒,分別執行 i++ 10000次,若不對 i++ 做任何限制,那麼最終執行結果一般都是小於 100000 的。 因為 A、B 執行 i++ 操作時,彼此會相互干擾,也即不能保證原子性。 【如何保證原子性:】 可以給 i++ 操作加上 synchronized 進行同步控制,從而保證操作按照順序執行、互不干擾。 也可以使用 Atomic 相關類進行操作(核心是 自旋 CAS 操作 volatile 變數)。 注: synchronized 在 JDK1.6 之前,屬於重量級鎖,屬於悲觀鎖的一種(在操作鎖變數前就給物件加鎖,而不管物件是否發生資源競爭),效能較差。 在 JDK1.6 之後,對 synchronized 進行了優化,引入了 偏向鎖、輕量級鎖、採用 CAS 思想,提升了效率。 【CAS 與 synchronized 比較:】 CAS: CAS 屬於無鎖演算法,可以支援多個執行緒併發修改,併發度高。 CAS 每次只支援一個共享變數進行原子操作。 CAS 會出現 ABA 問題。 synchronized: synchronized 一次只能允許一個執行緒修改,併發度低。 synchronized 可以對多個共享變數進行原子操作。 【舉例:】 package com.lyh.tree; import java.util.concurrent.atomic.AtomicInteger; public class Test { private int count = 0; private int count2 = 0; private AtomicInteger count3 = new AtomicInteger(0); /** * 普通方法 */ public void increment() { count++; } /** * 使用 synchronized 修飾的方法 */ public synchronized void increment2() { count2++; } /** * 使用 atomic 類的方法 */ public void increment3() { count3.getAndIncrement(); } public static void main(String[] args) { // 例項化一個物件 Test test = new Test(); // 建立 10 個執行緒 for (int i = 0; i < 10; i++) { // 每個執行緒內部均 執行 10000 次 三種 i++ 操作 new Thread(() -> { for (int j = 0; j < 10000; j++) { // 普通方法 i++ test.increment(); // 新增 synchronized 關鍵字後的 i++ test.increment2(); // 使用 atomic 類的 i++ test.increment3(); } }, "thread-" + "i").start(); } // 等待 1 秒,確保上面執行緒可以執行完畢 try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } // 輸出三種 i++ 的最終結果 System.out.println("普通方法 i++ 操作後最終值 i = " + test.count); System.out.println("新增 synchronized 關鍵字後的 i++ 操作後最終值 i = " + test.count2); System.out.println("使用 atomic 類的 i++ 操作後最終值 i = " + test.count3); } }
2、Atomic 類底層原理
(1)Atomic 常用類有哪些?
【Atomic:】 Atomic 類存放於 java.util.concurrent.atomic 包下,用於提供對變數的原子操作(保證變數操作的原子性)。 【常用類:】 操作基本型別的 Atomic 類(提供了對 boolean、int、long 型別的原子操作): AtomicBoolean AtomicInteger AtomicLong 操作引用型別的 Atomic 類(提供了引用型別的原子操作): AtomicReference AtomicStampedReference AtomicMarkableReference 注: AtomicStampedReference、AtomicMarkableReference 以版本號、標記的方式解決 ABA 問題。 運算元組的 Atomic 類(提供了陣列的原子操作): AtomicIntegerArray AtomicLongArray AtomicReferenceArray
(2)底層原理
【底層原理:】 一句話概括:Atomic 類基於 Unsafe 類 以及 (自旋) CAS 操作 volatile 變數實現的。 核心點: Unsafe 類 提供 native 方法,用於操作記憶體 valueOffset 變數 指的是變數在記憶體中的地址。 volatile 變數 指的是共享變數(保證操作可見性)。 CAS CPU 原語(保證操作原子性)。 【Unsafe:】 Unsafe 存放於 JDK 原始碼 rt.jar 的 sun.misc 包下。 內部提供了一系列 native 方法,用於與作業系統互動(可以操作特定記憶體中的資料)。 注: Unsafe 屬於 CAS 核心類,Java 無法直接訪問底層作業系統,需要通過 native 方法進行操作,而 Unsafe 就是為此存在的。 【volatile 變數:】 使用 volatile 修改變數,保證資料在多執行緒之間的可見性(一個執行緒修改資料後,其餘執行緒均能知道修改後的資料)。 【valueOffset 變數:】 valueOffset 變數 表示共享變數在記憶體中的偏移地址,Unsafe 根據此地址獲取 共享變數在記憶體中的值。
(3)以 AtomicInteger 為例。
compareAndSet() 直接呼叫 CAS 進行比較。
getAndIncrement() 使用 自旋 CAS 進行比較。
【AtomicInteger 類:】 /** * 直接呼叫 Unsafe 的 CAS 操作。 * 根據 this 以及 valueOffset 獲取到記憶體中的值 V,expect 為期望值 A,如果 V == A,則將 V 值改為 update,否則不操作。 * * this 表示當前物件 * valueOffset 表示共享變數在記憶體的偏移地址 * expect 表示期望值 * update 表示更新值 */ public final boolean compareAndSet(int expect, int update) { return unsafe.compareAndSwapInt(this, valueOffset, expect, update); } /** * 以原子方式遞增當前 value。 * 呼叫 Unsafe 的 getAndAddInt() 進行操作,即 自旋 CAS 操作。 * * this 表示當前物件 * valueOffset 表示共享變數在記憶體的偏移地址 * 1 表示每次增加值為 1 */ public final int getAndIncrement() { return unsafe.getAndAddInt(this, valueOffset, 1); } 【Unsafe 類:】 /** * CAS 原語操作。 * 各變數含義參考上面 AtomicInteger compareAndSet() 的註釋。 */ public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5); /** * 自旋 CAS 操作。 * var1 表示當前物件。 * var2 表示共享變數在記憶體的偏移地址 * var4 表示共享變數每次遞增的值 */ public final int getAndAddInt(Object var1, long var2, int var4) { int var5; do { // 先根據偏移地址,獲取到 記憶體中儲存的值 var5 = this.getIntVolatile(var1, var2); // 然後死迴圈(自旋)CAS 判斷。 // 根據偏移地址再去取一次記憶體中的值 與 已經取得的值進行比較,相同則加上需要增加的值。 // 不同,則 CAS 失敗,也即 while 條件為 true,再進行下一次比較。 } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4)); return var5; }
(4)缺點:
【CAS 缺點:】 會出現 ABA 問題(兩次讀取資料時值相同,但不確定值是否被修改過)。 自旋(死迴圈)CAS 會佔用系統資源、影響執行效率。 每次只能對一個共享變數進行原子操作。 【對於自旋 CAS:】 在上面 Unsafe 類的 getAndAddInt() 方法中,可以看到一個 do-while 迴圈。 存在這麼一種情況:while 條件中 CAS 一直失敗,也即迴圈一直持續(死迴圈), 此時將會佔用 CPU 資源不斷執行迴圈,影響執行效率。 【對於 ABA 問題:】 CAS 演算法是從記憶體中取出某時刻的值並與當前期望值進行比較,從記憶體中取出的值就可能存在問題。 存在這麼一種情況:執行緒 A、執行緒 B 同時操作記憶體值 V,執行緒 A 由於某種原因停頓了一下,而執行緒 B 先將值 V 改為 K,然後又將 K 改為 V, 此時 A 再次獲取的值仍是 V,然後 A 進行 CAS 比較成功。 雖然 A 兩次獲取的值是同一個值,但是這個值中間是發生過變化的,也即此時 A 執行 CAS 不應該成功,這就是 ABA 問題。 為了解決ABA問題,可以在物件中額外再增加一個標記來標識物件是否有過變更,當且僅當 標記 與 預期標記位 也相同時,CAS 才可以執行成功。 比如: AtomicStampedReference 或者 AtomicMarkableReference 類。
(5)ABA 再現
使用 AtomicReference<Integer> 再現 ABA 問題。
【問題再現:】 package com.lyh.tree; import java.util.concurrent.atomic.AtomicReference; public class Test { public static void main(String[] args) { // 使用引用型別原子類(Integer 在快取中 -128 ~ 127 表示的是同一個值,超出此範圍,則會自動建立一個新的 Integer,即地址不同) AtomicReference<Integer> atomicInteger = new AtomicReference<>(100); System.out.println("原值為: " + atomicInteger.get()); // 建立執行緒 A,執行兩次值修改操作 new Thread(() -> { // 第一次從 100 改為 127 atomicInteger.compareAndSet(100, 127); System.out.println("第一次修改後,值為: " + atomicInteger.get()); // 第二次從 127 改為 100 atomicInteger.compareAndSet(127, 100); System.out.println("第二次修改後,值為: " + atomicInteger.get()); }, "thread-A").start(); // 建立執行緒 B,等待 1 秒,使執行緒 A 執行完畢,然後再執行值修改操作 new Thread(() -> { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } // 第三次從 100 改為 128 atomicInteger.compareAndSet(100, 128); System.out.println("第三次修改後,值為: " + atomicInteger.get()); // 第四次,由於 128 超過 Integer 快取範圍,會自動建立一個新的 Integer,此時期望值 與 記憶體中 值地址不同,也即 CAS 失敗 atomicInteger.compareAndSet(128, -128); System.out.println("最終值為: " + atomicInteger.get()); }, "thread-B").start(); } } 【結果:】 原值為: 100 第一次修改後,值為: 127 第二次修改後,值為: 100 第三次修改後,值為: 128 最終值為: 128
(6)ABA 解決
使用 AtomicStampedReference<Integer> 解決 ABA 問題。
新增了標記位,用於判斷當前值是否發生過變化。
package com.lyh.tree; import java.util.concurrent.atomic.AtomicStampedReference; public class Test2 { public static void main(String[] args) { AtomicStampedReference<Integer> atomicInteger = new AtomicStampedReference<>(100, 1); System.out.println("原值為: " + atomicInteger.getReference() + " , 標記為: " + atomicInteger.getStamp()); int stamp = atomicInteger.getStamp(); new Thread(() -> { atomicInteger.compareAndSet(100, 127, stamp, stamp + 1); System.out.println("第一次修改後,值為: " + atomicInteger.getReference() + " , 標記為: " + atomicInteger.getStamp()); atomicInteger.compareAndSet(127, 100, stamp + 1, stamp + 2); System.out.println("第二次修改後,值為: " + atomicInteger.getReference() + " , 標記為: " + atomicInteger.getStamp()); }, "thread-A").start(); new Thread(() -> { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } atomicInteger.compareAndSet(100, 128, stamp, stamp + 1); System.out.println("最終值為: " + atomicInteger.getReference() + " , 標記為: " + atomicInteger.getStamp()); }, "thread-B").start(); } }
3、自旋鎖(SpinLock)
(1)什麼是自旋鎖?
【自旋鎖:】 指的是當一個執行緒嘗試去獲取鎖時, 若此時鎖已經被其他執行緒獲取,那麼當前執行緒將採用 迴圈 的方式不斷的去嘗試獲取鎖。 當迴圈執行的某次操作成功獲取鎖時,將退出迴圈。 【優點:】 減少了執行緒上下文切換帶來的消耗。 【缺點:】 迴圈會佔用 CPU 資源,若長時間獲取不到鎖,那麼相當於一個死迴圈在執行。 【舉例:】 前面介紹的 Unsafe 類中 getAndAddInt() 即為 自旋鎖實現,自旋 CAS 進行值的遞增。 public final int getAndAddInt(Object var1, long var2, int var4) { int var5; do { var5 = this.getIntVolatile(var1, var2); } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4)); return var5; }
(2)手寫一個自旋鎖
通過 CAS 操作作為 迴圈(自旋)條件。
【SpinLockDemo】 package com.lyh.tree; import java.util.concurrent.atomic.AtomicReference; /** * 演示自旋鎖(類似於 Unsafe 類中的 getAndAddInt() 方法)。 * * 通過 CAS 操作作為自旋鎖條件,A 執行緒先獲取鎖,並佔用鎖時間為 3 秒, * B 執行緒獲取鎖,發現鎖被 A 佔用,B 則執行 迴圈 等待。 * A 執行緒釋放鎖後,B 在某次迴圈中 CAS 成功,即 B 獲取到鎖,然後釋放。 */ public class SpinLockDemo { private AtomicReference<Thread> atomicReference = new AtomicReference<>(); /** * 獲取鎖 */ public void lock() { // 獲取當前執行緒 Thread thread = Thread.currentThread(); System.out.println("當前執行 lock 的執行緒為: " + thread.getName()); System.out.println("執行緒 " + thread.getName() + " 嘗試獲取鎖"); Long start = System.currentTimeMillis(); // 自旋 CAS,當值為 null 時,CAS 成功,此時鎖被獲取。 // 當值不為 null 時,CAS 失敗,不斷執行迴圈直至值為 null。 while(!atomicReference.compareAndSet(null, thread)) { } Long end = System.currentTimeMillis(); System.out.println("執行緒 " + thread.getName() + " 成功獲取到鎖, 嘗試獲取時間為: " + ((end - start) / 1000) + "秒"); } /** * 釋放鎖 */ public void unLock() { // 獲取當前執行緒 Thread thread = Thread.currentThread(); System.out.println("當前執行 unLock 的執行緒為: " + thread.getName()); // 釋放鎖 atomicReference.compareAndSet(thread, null); } public static void main(String[] args) { SpinLockDemo spinLockDemo = new SpinLockDemo(); // 建立執行緒 A new Thread(() -> { // A 獲取鎖 spinLockDemo.lock(); // A 佔用鎖 3 秒 try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } // A 釋放鎖 spinLockDemo.unLock(); }, "thread-A").start(); // 等待 1 秒,確保 A 執行緒先執行 try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } // 建立執行緒 B new Thread(() -> { // B 獲取鎖 spinLockDemo.lock(); // B 釋放鎖 spinLockDemo.unLock(); }, "thread-B").start(); } } 【輸出結果:】 當前執行 lock 的執行緒為: thread-A 執行緒 thread-A 嘗試獲取鎖 執行緒 thread-A 成功獲取到鎖, 嘗試獲取時間為: 0秒 當前執行 lock 的執行緒為: thread-B 執行緒 thread-B 嘗試獲取鎖 當前執行 unLock 的執行緒為: thread-A 執行緒 thread-B 成功獲取到鎖, 嘗試獲取時間為: 2秒 當前執行 unLock 的執行緒為: thread-B
四、服務呼叫、負載均衡 -- Feign、OpenFeign
1、什麼是 Feign、 OpenFeign ?
【Feign:】 Feign 是一個宣告式 web service 客戶端,使用 Feign 使得編寫 Java HTTP 客戶端更容易。 SpringCloud 元件中 Feign 作為一個輕量級 Restful 風格的 HTTP 服務客戶端,內建了 Ribbon,提供客戶端的負載均衡。 使用 Feign 註解定義介面,呼叫該介面即可訪問 服務。 【OpenFeign:】 SpringCloud 元件中 Open Feign 是在 Feign 基礎上支援了 SpringMVC 註解。 通過 @FeignClient 註解可以解析 SpringMVC 中 @RequestMapping 等註解下的介面,並通過動態代理的方式產生實現類,在實現類中做負載均衡、服務呼叫。 【相關地址:】 https://cloud.spring.io/spring-cloud-openfeign/reference/html/ https://github.com/OpenFeign/feign
2、Feign 集成了 Ribbon
前面使用 Ribbon + RestTemplate 實現 負載均衡 以及 服務呼叫時, 流程如下: Step1:在配置類中通過 @Bean 宣告一下 RestTemplate 類,再新增上 @LoadBalanced 註解。 Step2:在 controller 中注入 RestTemplate 。 Step3:使用 RestTemplate 根據服務名傳送 HTTP 請求。 而不同的模組若要呼叫服務(一個介面可能會被多個模組呼叫),則每次都要進行 Step1、Step2、Step3。 實際開發中,若不進行處理,對於編碼、維護都是一件麻煩的事情。 Feign 就是在 Ribbon 基礎上做了進一步封裝,只需要建立一個介面,並使用註解的方式進行配置, 即可完成 介面 與 服務提供方介面的繫結。通過自定義的介面即可完成 服務呼叫。 注: 類似於在 Dao 層介面上新增 @Mapper。 Feign 在介面上新增 @FeignClient,並配置服務名。
3、使用 OpenFeign
(1)說明
【說明:】 建立一個空模組 eureka_client_consumer_9006。 用於測試 OpenFeign 的使用。 注: 建立流程此處省略,詳細可參考上一篇部落格:https://www.cnblogs.com/l-y-h/p/14193443.html#_label2_3
(2)建立專案 eureka_client_consumer_9006
建立 eureka_client_consumer_9006 模組,修改 父工程以及當前工程 pom.xml 檔案。
修改配置類。
【依賴:】 <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-openfeign</artifactId> </dependency> 【application.yml】 server: port: 9006 spring: application: name: eureka-client-consumer eureka: instance: appname: eureka-client-consumer # 優先順序比 spring.application.name 高 instance-id: eureka-client-consumer-instance3 # 設定當前例項 ID hostname: eureka.client.consumer.9006 # 設定主機名 client: register-with-eureka: true # 預設為 true,註冊到 註冊中心 fetch-registry: true # 預設為 true,從註冊中心 獲取 註冊資訊 service-url: # 指向 註冊中心 地址,註冊到 叢集所有的 註冊中心。 defaultZone: http://eureka.server.7001.com:7001/eureka,http://eureka.server.7002.com:7002/eureka,http://eureka.server.7003.com:7003/eureka
(3)編寫介面,繫結服務。
通過 @Component 註解,將該介面交給 Spring 管理(用於 @Autowired 注入)。
通過 @FeignClient 註解配置 服務名,並編寫方法,用於繫結需要訪問的介面。
【ProducerFeignService】 package com.lyh.springcloud.eureka_client_consumer_9006.service; import com.lyh.springcloud.common.tools.Result; import org.springframework.cloud.openfeign.FeignClient; import org.springframework.stereotype.Component; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; @FeignClient(value = "EUREKA-CLIENT-PRODUCER") @Component public interface ProducerFeignService { @GetMapping("/producer/user/get/{id}") Result getUser(@PathVariable Integer id); }
(4)編寫 controller
【ConsumerController】 package com.lyh.springcloud.eureka_client_consumer_9006.controller; import com.lyh.springcloud.common.tools.Result; import com.lyh.springcloud.eureka_client_consumer_9006.service.ProducerFeignService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; @RestController @RequestMapping("/consumer/user") public class ConsumerController { @Autowired private ProducerFeignService producerFeignService; @GetMapping("/get/{id}") public Result getUser(@PathVariable Integer id) { return producerFeignService.getUser(id); } }
(5)啟動服務,並測試。
按順序依次啟動,Eureka Server 以及 Producer。
在 eureka_client_consumer_9006 啟動類上新增 @EnableFeignClients 註解,並啟動。
效果與使用 ribbon 時相同(預設 ZoneAvoidanceRule 負載均衡規則)。
(6)啟動失敗時的錯誤
【錯誤:】 Description: Field producerFeignService in com.lyh.springcloud.eureka_client_consumer_9006.controller.ConsumerController required a bean of type 'com.lyh.springcloud.eureka_client_consumer_9006.service.ProducerFeignService' that could not be found. The injection point has the following annotations: - @org.springframework.beans.factory.annotation.Autowired(required=true) Action: Consider defining a bean of type 'com.lyh.springcloud.eureka_client_consumer_9006.service.ProducerFeignService' in your configuration. 【解決:】 在啟動類上新增 @EnableFeignClients 註解。
(7)自定義負載均衡策略
Feign 集成了 Ribbon,所以 Ribbon 自定義負載均衡策略也適用於 Feign。
此處使用配置檔案的方式進行自定義負載均衡。
在服務呼叫方的配置檔案中,根據 服務提供者的 服務名,進行 ribbon 負載均衡策略更換。
【自定義負載均衡策略:】 EUREKA-CLIENT-PRODUCER: ribbon: NFLoadBalancerRuleClassName: com.netflix.loadbalancer.RandomRule 【application.yml】 server: port: 9006 spring: application: name: eureka-client-consumer eureka: instance: appname: eureka-client-consumer # 優先順序比 spring.application.name 高 instance-id: eureka-client-consumer-instance3 # 設定當前例項 ID hostname: eureka.client.consumer.9006 # 設定主機名 client: register-with-eureka: true # 預設為 true,註冊到 註冊中心 fetch-registry: true # 預設為 true,從註冊中心 獲取 註冊資訊 service-url: # 指向 註冊中心 地址,註冊到 叢集所有的 註冊中心。 defaultZone: http://eureka.server.7001.com:7001/eureka,http://eureka.server.7002.com:7002/eureka,http://eureka.server.7003.com:7003/eureka EUREKA-CLIENT-PRODUCER: ribbon: NFLoadBalancerRuleClassName: com.netflix.loadbalancer.RandomRule
4、Feign 超時控制 以及 日誌列印
(1)超時控制
【說明:】 OpenFeign 預設請求等待時間為 1 秒鐘,即 若伺服器超過 1 秒仍未返回處理結果,那麼 OpenFeign 將認為調用出錯。 有時伺服器業務處理時間需要超過 1 秒,此時直接報錯是不合適的。 為了避免這樣的問題,可以根據實際情況 適當設定 Feign 客戶端的超時控制。 【演示:】 在 服務提供者 eureka_client_producer_8004 提供一個介面,並在內部暫停 3 秒,用於模擬業務處理時間。 在 服務消費者 eureka_client_consumer_9006 中呼叫並訪問介面,用於測試 是否會出錯。 在配置檔案中 配置超時控制後,再次檢視是否出錯。 【application.yml】 # 設定 OpenFeign 超時時間(OpenFeign 預設支援 Ribbon) ribbon: # 指的是建立連線所用的超時時間 ConnectTimeout: 3000 # 指的是建立連線後從伺服器獲取資源的超時時間(即請求處理的超時時間) ReadTimeout: 4000
(2)日誌列印
【說明:】 Feign 提供了日誌列印功能,通過配置調整日誌級別,可以方便了解 Feign 中請求呼叫的細節。 日誌級別: NONE:預設,不顯示任何日誌。 BASIC: 僅記錄請求方法、URL、響應狀態以及執行時間。 HEADERS:包含 BASIC、請求頭資訊、響應頭資訊。 FULL:包含 HEADERS、請求資料、響應資料。
Step1:
配置 日誌級別(Logger.Level)。
注:
import feign.Logger。不要導錯包了。
package com.lyh.springcloud.eureka_client_consumer_9006.config; import feign.Logger; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class FeignConfig { /** * 配置 Feign 日誌級別 */ @Bean Logger.Level feignLoggerLever() { return Logger.Level.FULL; } }
Step2:
配置需要列印日誌的介面。
【application.yml】 logging: level: com.lyh.springcloud.eureka_client_consumer_9006.service.ProducerFeignService: debug
Step3:
啟動服務,並呼叫服務,可以在控制檯看到日誌資訊。
&n