1. 程式人生 > >Redis高階客戶端Lettuce詳解

Redis高階客戶端Lettuce詳解

前提

Lettuce是一個RedisJava驅動包,初識她的時候是使用RedisTemplate的時候遇到點問題Debug到底層的一些原始碼,發現spring-data-redis的驅動包在某個版本之後替換為LettuceLettuce翻譯為生菜,沒錯,就是吃的那種生菜,所以它的Logo長這樣:

既然能被Spring生態所認可,Lettuce想必有過人之處,於是筆者花時間閱讀她的官方文件,整理測試示例,寫下這篇文章。編寫本文時所使用的版本為Lettuce 5.1.8.RELEASESpringBoot 2.1.8.RELEASEJDK [8,11]。超長警告:這篇文章斷斷續續花了兩週完成,超過4萬字.....

Lettuce簡介

Lettuce是一個高效能基於Java編寫的Redis驅動框架,底層集成了Project Reactor提供天然的反應式程式設計,通訊框架集成了Netty使用了非阻塞IO5.x版本之後融合了JDK1.8的非同步程式設計特性,在保證高效能的同時提供了十分豐富易用的API5.1版本的新特性如下:

  • 支援Redis的新增命令ZPOPMIN, ZPOPMAX, BZPOPMIN, BZPOPMAX
  • 支援通過Brave模組跟蹤Redis命令執行。
  • 支援Redis Streams
  • 支援非同步的主從連線。
  • 支援非同步連線池。
  • 新增命令最多執行一次模式(禁止自動重連)。
  • 全域性命令超時設定(對非同步和反應式命令也有效)。
  • ......等等

注意一點:Redis的版本至少需要2.6,當然越高越好,API的相容性比較強大。

只需要引入單個依賴就可以開始愉快地使用Lettuce

  • Maven
<dependency>
    <groupId>io.lettuce</groupId>
    <artifactId>lettuce-core</artifactId>
    <version>5.1.8.RELEASE</version>
</dependency>
  • Gradle
dependencies {
  compile 'io.lettuce:lettuce-core:5.1.8.RELEASE'
}

連線Redis

單機、哨兵、叢集模式下連線Redis需要一個統一的標準去表示連線的細節資訊,在Lettuce中這個統一的標準是RedisURI。可以通過三種方式構造一個RedisURI例項:

  • 定製的字串URI語法:
RedisURI uri = RedisURI.create("redis://localhost/");
  • 使用建造器(RedisURI.Builder):
RedisURI uri = RedisURI.builder().withHost("localhost").withPort(6379).build();
  • 直接通過建構函式例項化:
RedisURI uri = new RedisURI("localhost", 6379, 60, TimeUnit.SECONDS);

定製的連線URI語法

  • 單機(字首為redis://
格式:redis://[password@]host[:port][/databaseNumber][?[timeout=timeout[d|h|m|s|ms|us|ns]]
完整:redis://[email protected]:6379/0?timeout=10s
簡單:redis://localhost
  • 單機並且使用SSL(字首為rediss://) <== 注意後面多了個s
格式:rediss://[password@]host[:port][/databaseNumber][?[timeout=timeout[d|h|m|s|ms|us|ns]]
完整:rediss://[email protected]:6379/0?timeout=10s
簡單:rediss://localhost
  • 單機Unix Domain Sockets模式(字首為redis-socket://
格式:redis-socket://path[?[timeout=timeout[d|h|m|s|ms|us|ns]][&_database=database_]]
完整:redis-socket:///tmp/redis?timeout=10s&_database=0
  • 哨兵(字首為redis-sentinel://
格式:redis-sentinel://[password@]host[:port][,host2[:port2]][/databaseNumber][?[timeout=timeout[d|h|m|s|ms|us|ns]]#sentinelMasterId
完整:redis-sentinel://[email protected]:6379,127.0.0.1:6380/0?timeout=10s#mymaster

超時時間單位:

  • d 天
  • h 小時
  • m 分鐘
  • s 秒鐘
  • ms 毫秒
  • us 微秒
  • ns 納秒

個人建議使用RedisURI提供的建造器,畢竟定製的URI雖然簡潔,但是比較容易出現人為錯誤。鑑於筆者沒有SSLUnix Domain Socket的使用場景,下面不對這兩種連線方式進行列舉。

基本使用

Lettuce使用的時候依賴於四個主要元件:

  • RedisURI:連線資訊。
  • RedisClientRedis客戶端,特殊地,叢集連線有一個定製的RedisClusterClient
  • ConnectionRedis連線,主要是StatefulConnection或者StatefulRedisConnection的子類,連線的型別主要由連線的具體方式(單機、哨兵、叢集、訂閱釋出等等)選定,比較重要。
  • RedisCommandsRedis命令API介面,基本上覆蓋了Redis發行版本的所有命令,提供了同步(sync)、非同步(async)、反應式(reative)的呼叫方式,對於使用者而言,會經常跟RedisCommands系列介面打交道。

一個基本使用例子如下:

@Test
public void testSetGet() throws Exception {
    RedisURI redisUri = RedisURI.builder()                    // <1> 建立單機連線的連線資訊
            .withHost("localhost")
            .withPort(6379)
            .withTimeout(Duration.of(10, ChronoUnit.SECONDS))
            .build();
    RedisClient redisClient = RedisClient.create(redisUri);   // <2> 建立客戶端
    StatefulRedisConnection<String, String> connection = redisClient.connect();     // <3> 建立執行緒安全的連線
    RedisCommands<String, String> redisCommands = connection.sync();                // <4> 建立同步命令
    SetArgs setArgs = SetArgs.Builder.nx().ex(5);
    String result = redisCommands.set("name", "throwable", setArgs);
    Assertions.assertThat(result).isEqualToIgnoringCase("OK");
    result = redisCommands.get("name");
    Assertions.assertThat(result).isEqualTo("throwable");
    // ... 其他操作
    connection.close();   // <5> 關閉連線
    redisClient.shutdown();  // <6> 關閉客戶端
}

注意:

  • <5>:關閉連線一般在應用程式停止之前操作,一個應用程式中的一個Redis驅動例項不需要太多的連線(一般情況下只需要一個連線例項就可以,如果有多個連線的需要可以考慮使用連線池,其實Redis目前處理命令的模組是單執行緒,在客戶端多個連線多執行緒呼叫理論上沒有效果)。
  • <6>:關閉客戶端一般應用程式停止之前操作,如果條件允許的話,基於後開先閉原則,客戶端關閉應該在連線關閉之後操作。

API

Lettuce主要提供三種API

  • 同步(sync):RedisCommands
  • 非同步(async):RedisAsyncCommands
  • 反應式(reactive):RedisReactiveCommands

先準備好一個單機Redis連線備用:

private static StatefulRedisConnection<String, String> CONNECTION;
private static RedisClient CLIENT;

@BeforeClass
public static void beforeClass() {
    RedisURI redisUri = RedisURI.builder()
            .withHost("localhost")
            .withPort(6379)
            .withTimeout(Duration.of(10, ChronoUnit.SECONDS))
            .build();
    CLIENT = RedisClient.create(redisUri);
    CONNECTION = CLIENT.connect();
}

@AfterClass
public static void afterClass() throws Exception {
    CONNECTION.close();
    CLIENT.shutdown();
}

Redis命令API的具體實現可以直接從StatefulRedisConnection例項獲取,見其介面定義:

public interface StatefulRedisConnection<K, V> extends StatefulConnection<K, V> {

    boolean isMulti();

    RedisCommands<K, V> sync();

    RedisAsyncCommands<K, V> async();

    RedisReactiveCommands<K, V> reactive();
}    

值得注意的是,在不指定編碼解碼器RedisCodec的前提下,RedisClient建立的StatefulRedisConnection例項一般是泛型例項StatefulRedisConnection<String,String>,也就是所有命令APIKEYVALUE都是String型別,這種使用方式能滿足大部分的使用場景。當然,必要的時候可以定製編碼解碼器RedisCodec<K,V>

同步API

先構建RedisCommands例項:

private static RedisCommands<String, String> COMMAND;

@BeforeClass
public static void beforeClass() {
    COMMAND = CONNECTION.sync();
}

基本使用:

@Test
public void testSyncPing() throws Exception {
   String pong = COMMAND.ping();
   Assertions.assertThat(pong).isEqualToIgnoringCase("PONG");
}


@Test
public void testSyncSetAndGet() throws Exception {
    SetArgs setArgs = SetArgs.Builder.nx().ex(5);
    COMMAND.set("name", "throwable", setArgs);
    String value = COMMAND.get("name");
    log.info("Get value: {}", value);
}

// Get value: throwable

同步API在所有命令呼叫之後會立即返回結果。如果熟悉Jedis的話,RedisCommands的用法其實和它相差不大。

非同步API

先構建RedisAsyncCommands例項:

private static RedisAsyncCommands<String, String> ASYNC_COMMAND;

@BeforeClass
public static void beforeClass() {
    ASYNC_COMMAND = CONNECTION.async();
}

基本使用:

@Test
public void testAsyncPing() throws Exception {
    RedisFuture<String> redisFuture = ASYNC_COMMAND.ping();
    log.info("Ping result:{}", redisFuture.get());
}
// Ping result:PONG

RedisAsyncCommands所有方法執行返回結果都是RedisFuture例項,而RedisFuture介面的定義如下:

public interface RedisFuture<V> extends CompletionStage<V>, Future<V> {

    String getError();

    boolean await(long timeout, TimeUnit unit) throws InterruptedException;
}    

也就是,RedisFuture可以無縫使用Future或者JDK1.8中引入的CompletableFuture提供的方法。舉個例子:

@Test
public void testAsyncSetAndGet1() throws Exception {
    SetArgs setArgs = SetArgs.Builder.nx().ex(5);
    RedisFuture<String> future = ASYNC_COMMAND.set("name", "throwable", setArgs);
    // CompletableFuture#thenAccept()
    future.thenAccept(value -> log.info("Set命令返回:{}", value));
    // Future#get()
    future.get();
}
// Set命令返回:OK

@Test
public void testAsyncSetAndGet2() throws Exception {
    SetArgs setArgs = SetArgs.Builder.nx().ex(5);
    CompletableFuture<Void> result =
            (CompletableFuture<Void>) ASYNC_COMMAND.set("name", "throwable", setArgs)
                    .thenAcceptBoth(ASYNC_COMMAND.get("name"),
                            (s, g) -> {
                                log.info("Set命令返回:{}", s);
                                log.info("Get命令返回:{}", g);
                            });
    result.get();
}
// Set命令返回:OK
// Get命令返回:throwable

如果能熟練使用CompletableFuture和函數語言程式設計技巧,可以組合多個RedisFuture完成一些列複雜的操作。

反應式API

Lettuce引入的反應式程式設計框架是Project Reactor,如果沒有反應式程式設計經驗可以先自行了解一下Project Reactor

構建RedisReactiveCommands例項:

private static RedisReactiveCommands<String, String> REACTIVE_COMMAND;

@BeforeClass
public static void beforeClass() {
    REACTIVE_COMMAND = CONNECTION.reactive();
}

根據Project ReactorRedisReactiveCommands的方法如果返回的結果只包含0或1個元素,那麼返回值型別是Mono,如果返回的結果包含0到N(N大於0)個元素,那麼返回值是Flux。舉個例子:

@Test
public void testReactivePing() throws Exception {
    Mono<String> ping = REACTIVE_COMMAND.ping();
    ping.subscribe(v -> log.info("Ping result:{}", v));
    Thread.sleep(1000);
}
// Ping result:PONG

@Test
public void testReactiveSetAndGet() throws Exception {
    SetArgs setArgs = SetArgs.Builder.nx().ex(5);
    REACTIVE_COMMAND.set("name", "throwable", setArgs).block();
    REACTIVE_COMMAND.get("name").subscribe(value -> log.info("Get命令返回:{}", value));
    Thread.sleep(1000);
}
// Get命令返回:throwable

@Test
public void testReactiveSet() throws Exception {
    REACTIVE_COMMAND.sadd("food", "bread", "meat", "fish").block();
    Flux<String> flux = REACTIVE_COMMAND.smembers("food");
    flux.subscribe(log::info);
    REACTIVE_COMMAND.srem("food", "bread", "meat", "fish").block();
    Thread.sleep(1000);
}
// meat
// bread
// fish

舉個更加複雜的例子,包含了事務、函式轉換等:

@Test
public void testReactiveFunctional() throws Exception {
    REACTIVE_COMMAND.multi().doOnSuccess(r -> {
        REACTIVE_COMMAND.set("counter", "1").doOnNext(log::info).subscribe();
        REACTIVE_COMMAND.incr("counter").doOnNext(c -> log.info(String.valueOf(c))).subscribe();
    }).flatMap(s -> REACTIVE_COMMAND.exec())
            .doOnNext(transactionResult -> log.info("Discarded:{}", transactionResult.wasDiscarded()))
            .subscribe();
    Thread.sleep(1000);
}
// OK
// 2
// Discarded:false

這個方法開啟一個事務,先把counter設定為1,再將counter自增1。

釋出和訂閱

非叢集模式下的釋出訂閱依賴於定製的連線StatefulRedisPubSubConnection,叢集模式下的釋出訂閱依賴於定製的連線StatefulRedisClusterPubSubConnection,兩者分別來源於RedisClient#connectPubSub()系列方法和RedisClusterClient#connectPubSub()

  • 非叢集模式:
// 可能是單機、普通主從、哨兵等非叢集模式的客戶端
RedisClient client = ...
StatefulRedisPubSubConnection<String, String> connection = client.connectPubSub();
connection.addListener(new RedisPubSubListener<String, String>() { ... });

// 同步命令
RedisPubSubCommands<String, String> sync = connection.sync();
sync.subscribe("channel");

// 非同步命令
RedisPubSubAsyncCommands<String, String> async = connection.async();
RedisFuture<Void> future = async.subscribe("channel");

// 反應式命令
RedisPubSubReactiveCommands<String, String> reactive = connection.reactive();
reactive.subscribe("channel").subscribe();

reactive.observeChannels().doOnNext(patternMessage -> {...}).subscribe()
  • 叢集模式:
// 使用方式其實和非叢集模式基本一致
RedisClusterClient clusterClient = ...
StatefulRedisClusterPubSubConnection<String, String> connection = clusterClient.connectPubSub();
connection.addListener(new RedisPubSubListener<String, String>() { ... });
RedisPubSubCommands<String, String> sync = connection.sync();
sync.subscribe("channel");
// ...

這裡用單機同步命令的模式舉一個Redis鍵空間通知(Redis Keyspace Notifications)的例子:

@Test
public void testSyncKeyspaceNotification() throws Exception {
    RedisURI redisUri = RedisURI.builder()
            .withHost("localhost")
            .withPort(6379)
            // 注意這裡只能是0號庫
            .withDatabase(0)
            .withTimeout(Duration.of(10, ChronoUnit.SECONDS))
            .build();
    RedisClient redisClient = RedisClient.create(redisUri);
    StatefulRedisConnection<String, String> redisConnection = redisClient.connect();
    RedisCommands<String, String> redisCommands = redisConnection.sync();
    // 只接收鍵過期的事件
    redisCommands.configSet("notify-keyspace-events", "Ex");
    StatefulRedisPubSubConnection<String, String> connection = redisClient.connectPubSub();
    connection.addListener(new RedisPubSubAdapter<>() {

        @Override
        public void psubscribed(String pattern, long count) {
            log.info("pattern:{},count:{}", pattern, count);
        }

        @Override
        public void message(String pattern, String channel, String message) {
            log.info("pattern:{},channel:{},message:{}", pattern, channel, message);
        }
    });
    RedisPubSubCommands<String, String> commands = connection.sync();
    commands.psubscribe("__keyevent@0__:expired");
    redisCommands.setex("name", 2, "throwable");
    Thread.sleep(10000);
    redisConnection.close();
    connection.close();
    redisClient.shutdown();
}
// pattern:__keyevent@0__:expired,count:1
// pattern:__keyevent@0__:expired,channel:__keyevent@0__:expired,message:name

實際上,在實現RedisPubSubListener的時候可以單獨抽離,儘量不要設計成匿名內部類的形式。

事務和批量命令執行

事務相關的命令就是WATCHUNWATCHEXECMULTIDISCARD,在RedisCommands系列介面中有對應的方法。舉個例子:

// 同步模式
@Test
public void testSyncMulti() throws Exception {
    COMMAND.multi();
    COMMAND.setex("name-1", 2, "throwable");
    COMMAND.setex("name-2", 2, "doge");
    TransactionResult result = COMMAND.exec();
    int index = 0;
    for (Object r : result) {
        log.info("Result-{}:{}", index, r);
        index++;
    }
}
// Result-0:OK
// Result-1:OK

RedisPipeline也就是管道機制可以理解為把多個命令打包在一次請求傳送到Redis服務端,然後Redis服務端把所有的響應結果打包好一次性返回,從而節省不必要的網路資源(最主要是減少網路請求次數)。Redis對於Pipeline機制如何實現並沒有明確的規定,也沒有提供特殊的命令支援Pipeline機制。Jedis中底層採用BIO(阻塞IO)通訊,所以它的做法是客戶端快取將要傳送的命令,最後需要觸發然後同步傳送一個巨大的命令列表包,再接收和解析一個巨大的響應列表包。PipelineLettuce中對使用者是透明的,由於底層的通訊框架是Netty,所以網路通訊層面的優化Lettuce不需要過多幹預,換言之可以這樣理解:NettyLettuce從底層實現了RedisPipeline機制。但是,Lettuce的非同步API也提供了手動Flush的方法:

@Test
public void testAsyncManualFlush() {
    // 取消自動flush
    ASYNC_COMMAND.setAutoFlushCommands(false);
    List<RedisFuture<?>> redisFutures = Lists.newArrayList();
    int count = 5000;
    for (int i = 0; i < count; i++) {
        String key = "key-" + (i + 1);
        String value = "value-" + (i + 1);
        redisFutures.add(ASYNC_COMMAND.set(key, value));
        redisFutures.add(ASYNC_COMMAND.expire(key, 2));
    }
    long start = System.currentTimeMillis();
    ASYNC_COMMAND.flushCommands();
    boolean result = LettuceFutures.awaitAll(10, TimeUnit.SECONDS, redisFutures.toArray(new RedisFuture[0]));
    Assertions.assertThat(result).isTrue();
    log.info("Lettuce cost:{} ms", System.currentTimeMillis() - start);
}
// Lettuce cost:1302 ms

上面只是從文件看到的一些理論術語,但是現實是骨感的,對比了下JedisPipeline提供的方法,發現了JedisPipeline執行耗時比較低:

@Test
public void testJedisPipeline() throws Exception {
    Jedis jedis = new Jedis();
    Pipeline pipeline = jedis.pipelined();
    int count = 5000;
    for (int i = 0; i < count; i++) {
        String key = "key-" + (i + 1);
        String value = "value-" + (i + 1);
        pipeline.set(key, value);
        pipeline.expire(key, 2);
    }
    long start = System.currentTimeMillis();
    pipeline.syncAndReturnAll();
    log.info("Jedis cost:{} ms", System.currentTimeMillis()  - start);
}
// Jedis cost:9 ms

個人猜測Lettuce可能底層並非合併所有命令一次傳送(甚至可能是單條傳送),具體可能需要抓包才能定位。依此來看,如果真的有大量執行Redis命令的場景,不妨可以使用JedisPipeline

注意:由上面的測試推斷RedisTemplateexecutePipelined()方法是假的Pipeline執行方法,使用RedisTemplate的時候請務必注意這一點。

Lua指令碼執行

Lettuce中執行RedisLua命令的同步介面如下:

public interface RedisScriptingCommands<K, V> {

    <T> T eval(String var1, ScriptOutputType var2, K... var3);

    <T> T eval(String var1, ScriptOutputType var2, K[] var3, V... var4);

    <T> T evalsha(String var1, ScriptOutputType var2, K... var3);

    <T> T evalsha(String var1, ScriptOutputType var2, K[] var3, V... var4);

    List<Boolean> scriptExists(String... var1);

    String scriptFlush();

    String scriptKill();

    String scriptLoad(V var1);

    String digest(V var1);
}

非同步和反應式的介面方法定義差不多,不同的地方就是返回值型別,一般我們常用的是eval()evalsha()scriptLoad()方法。舉個簡單的例子:

private static RedisCommands<String, String> COMMANDS;
private static String RAW_LUA = "local key = KEYS[1]\n" +
        "local value = ARGV[1]\n" +
        "local timeout = ARGV[2]\n" +
        "redis.call('SETEX', key, tonumber(timeout), value)\n" +
        "local result = redis.call('GET', key)\n" +
        "return result;";
private static AtomicReference<String> LUA_SHA = new AtomicReference<>();

@Test
public void testLua() throws Exception {
    LUA_SHA.compareAndSet(null, COMMANDS.scriptLoad(RAW_LUA));
    String[] keys = new String[]{"name"};
    String[] args = new String[]{"throwable", "5000"};
    String result = COMMANDS.evalsha(LUA_SHA.get(), ScriptOutputType.VALUE, keys, args);
    log.info("Get value:{}", result);
}
// Get value:throwable

高可用和分片

為了Redis的高可用,一般會採用普通主從(Master/Replica,這裡筆者稱為普通主從模式,也就是僅僅做了主從複製,故障需要手動切換)、哨兵和叢集。普通主從模式可以獨立執行,也可以配合哨兵執行,只是哨兵提供自動故障轉移和主節點提升功能。普通主從和哨兵都可以使用MasterSlave,通過入參包括RedisClient、編碼解碼器以及一個或者多個RedisURI獲取對應的Connection例項。

這裡注意一點,MasterSlave中提供的方法如果只要求傳入一個RedisURI例項,那麼Lettuce會進行拓撲發現機制,自動獲取Redis主從節點資訊;如果要求傳入一個RedisURI集合,那麼對於普通主從模式來說所有節點資訊是靜態的,不會進行發現和更新。

拓撲發現的規則如下:

  • 對於普通主從(Master/Replica)模式,不需要感知RedisURI指向從節點還是主節點,只會進行一次性的拓撲查詢所有節點資訊,此後節點資訊會儲存在靜態快取中,不會更新。
  • 對於哨兵模式,會訂閱所有哨兵例項並偵聽訂閱/釋出訊息以觸發拓撲重新整理機制,更新快取的節點資訊,也就是哨兵天然就是動態發現節點資訊,不支援靜態配置。

拓撲發現機制的提供APITopologyProvider,需要了解其原理的可以參考具體的實現。

對於叢集(Cluster)模式,Lettuce提供了一套獨立的API

另外,如果Lettuce連接面向的是非單個Redis節點,連線例項提供了資料讀取節點偏好(ReadFrom)設定,可選值有:

  • MASTER:只從Master節點中讀取。
  • MASTER_PREFERRED:優先從Master節點中讀取。
  • SLAVE_PREFERRED:優先從Slavor節點中讀取。
  • SLAVE:只從Slavor節點中讀取。
  • NEAREST:使用最近一次連線的Redis例項讀取。

普通主從模式

假設現在有三個Redis服務形成樹狀主從關係如下:

  • 節點一:localhost:6379,角色為Master。
  • 節點二:localhost:6380,角色為Slavor,節點一的從節點。
  • 節點三:localhost:6381,角色為Slavor,節點二的從節點。

首次動態節點發現主從模式的節點資訊需要如下構建連線:

@Test
public void testDynamicReplica() throws Exception {
    // 這裡只需要配置一個節點的連線資訊,不一定需要是主節點的資訊,從節點也可以
    RedisURI uri = RedisURI.builder().withHost("localhost").withPort(6379).build();
    RedisClient redisClient = RedisClient.create(uri);
    StatefulRedisMasterSlaveConnection<String, String> connection = MasterSlave.connect(redisClient, new Utf8StringCodec(), uri);
    // 只從從節點讀取資料
    connection.setReadFrom(ReadFrom.SLAVE);
    // 執行其他Redis命令
    connection.close();
    redisClient.shutdown();
}

如果需要指定靜態的Redis主從節點連線屬性,那麼可以這樣構建連線:

@Test
public void testStaticReplica() throws Exception {
    List<RedisURI> uris = new ArrayList<>();
    RedisURI uri1 = RedisURI.builder().withHost("localhost").withPort(6379).build();
    RedisURI uri2 = RedisURI.builder().withHost("localhost").withPort(6380).build();
    RedisURI uri3 = RedisURI.builder().withHost("localhost").withPort(6381).build();
    uris.add(uri1);
    uris.add(uri2);
    uris.add(uri3);
    RedisClient redisClient = RedisClient.create();
    StatefulRedisMasterSlaveConnection<String, String> connection = MasterSlave.connect(redisClient,
            new Utf8StringCodec(), uris);
    // 只從主節點讀取資料
    connection.setReadFrom(ReadFrom.MASTER);
    // 執行其他Redis命令
    connection.close();
    redisClient.shutdown();
}

哨兵模式

由於Lettuce自身提供了哨兵的拓撲發現機制,所以只需要隨便配置一個哨兵節點的RedisURI例項即可:

@Test
public void testDynamicSentinel() throws Exception {
    RedisURI redisUri = RedisURI.builder()
            .withPassword("你的密碼")
            .withSentinel("localhost", 26379)
            .withSentinelMasterId("哨兵Master的ID")
            .build();
    RedisClient redisClient = RedisClient.create();
    StatefulRedisMasterSlaveConnection<String, String> connection = MasterSlave.connect(redisClient, new Utf8StringCodec(), redisUri);
    // 只允許從從節點讀取資料
    connection.setReadFrom(ReadFrom.SLAVE);
    RedisCommands<String, String> command = connection.sync();
    SetArgs setArgs = SetArgs.Builder.nx().ex(5);
    command.set("name", "throwable", setArgs);
    String value = command.get("name");
    log.info("Get value:{}", value);
}
// Get value:throwable

叢集模式

鑑於筆者對Redis叢集模式並不熟悉,Cluster模式下的API使用本身就有比較多的限制,所以這裡只簡單介紹一下怎麼用。先說幾個特性:

下面的API提供跨槽位(Slot)呼叫的功能:

  • RedisAdvancedClusterCommands
  • RedisAdvancedClusterAsyncCommands
  • RedisAdvancedClusterReactiveCommands

靜態節點選擇功能:

  • masters:選擇所有主節點執行命令。
  • slaves:選擇所有從節點執行命令,其實就是隻讀模式。
  • all nodes:命令可以在所有節點執行。

叢集拓撲檢視動態更新功能:

  • 手動更新,主動呼叫RedisClusterClient#reloadPartitions()
  • 後臺定時更新。
  • 自適應更新,基於連線斷開和MOVED/ASK命令重定向自動更新。

Redis叢集搭建詳細過程可以參考官方文件,假設已經搭建好叢集如下(192.168.56.200是筆者的虛擬機器Host):

  • 192.168.56.200:7001 => 主節點,槽位0-5460。
  • 192.168.56.200:7002 => 主節點,槽位5461-10922。
  • 192.168.56.200:7003 => 主節點,槽位10923-16383。
  • 192.168.56.200:7004 => 7001的從節點。
  • 192.168.56.200:7005 => 7002的從節點。
  • 192.168.56.200:7006 => 7003的從節點。

簡單的叢集連線和使用方式如下:

@Test
public void testSyncCluster(){
    RedisURI uri = RedisURI.builder().withHost("192.168.56.200").build();
    RedisClusterClient redisClusterClient = RedisClusterClient.create(uri);
    StatefulRedisClusterConnection<String, String> connection = redisClusterClient.connect();
    RedisAdvancedClusterCommands<String, String> commands = connection.sync();
    commands.setex("name",10, "throwable");
    String value = commands.get("name");
    log.info("Get value:{}", value);
}
// Get value:throwable

節點選擇:

@Test
public void testSyncNodeSelection() {
    RedisURI uri = RedisURI.builder().withHost("192.168.56.200").withPort(7001).build();
    RedisClusterClient redisClusterClient = RedisClusterClient.create(uri);
    StatefulRedisClusterConnection<String, String> connection = redisClusterClient.connect();
    RedisAdvancedClusterCommands<String, String> commands = connection.sync();
//  commands.all();  // 所有節點
//  commands.masters();  // 主節點
    // 從節點只讀
    NodeSelection<String, String> replicas = commands.slaves();
    NodeSelectionCommands<String, String> nodeSelectionCommands = replicas.commands();
    // 這裡只是演示,一般應該禁用keys *命令
    Executions<List<String>> keys = nodeSelectionCommands.keys("*");
    keys.forEach(key -> log.info("key: {}", key));
    connection.close();
    redisClusterClient.shutdown();
}

定時更新叢集拓撲檢視(每隔十分鐘更新一次,這個時間自行考量,不能太頻繁):

@Test
public void testPeriodicClusterTopology() throws Exception {
    RedisURI uri = RedisURI.builder().withHost("192.168.56.200").withPort(7001).build();
    RedisClusterClient redisClusterClient = RedisClusterClient.create(uri);
    ClusterTopologyRefreshOptions options = ClusterTopologyRefreshOptions
            .builder()
            .enablePeriodicRefresh(Duration.of(10, ChronoUnit.MINUTES))
            .build();
    redisClusterClient.setOptions(ClusterClientOptions.builder().topologyRefreshOptions(options).build());
    StatefulRedisClusterConnection<String, String> connection = redisClusterClient.connect();
    RedisAdvancedClusterCommands<String, String> commands = connection.sync();
    commands.setex("name", 10, "throwable");
    String value = commands.get("name");
    log.info("Get value:{}", value);
    Thread.sleep(Integer.MAX_VALUE);
    connection.close();
    redisClusterClient.shutdown();
}

自適應更新叢集拓撲檢視:

@Test
public void testAdaptiveClusterTopology() throws Exception {
    RedisURI uri = RedisURI.builder().withHost("192.168.56.200").withPort(7001).build();
    RedisClusterClient redisClusterClient = RedisClusterClient.create(uri);
    ClusterTopologyRefreshOptions options = ClusterTopologyRefreshOptions.builder()
            .enableAdaptiveRefreshTrigger(
                    ClusterTopologyRefreshOptions.RefreshTrigger.MOVED_REDIRECT,
                    ClusterTopologyRefreshOptions.RefreshTrigger.PERSISTENT_RECONNECTS
            )
            .adaptiveRefreshTriggersTimeout(Duration.of(30, ChronoUnit.SECONDS))
            .build();
    redisClusterClient.setOptions(ClusterClientOptions.builder().topologyRefreshOptions(options).build());
    StatefulRedisClusterConnection<String, String> connection = redisClusterClient.connect();
    RedisAdvancedClusterCommands<String, String> commands = connection.sync();
    commands.setex("name", 10, "throwable");
    String value = commands.get("name");
    log.info("Get value:{}", value);
    Thread.sleep(Integer.MAX_VALUE);
    connection.close();
    redisClusterClient.shutdown();
}

動態命令和自定義命令

自定義命令是Redis命令有限集,不過可以更細粒度指定KEYARGV、命令型別、編碼解碼器和返回值型別,依賴於dispatch()方法:

// 自定義實現PING方法
@Test
public void testCustomPing() throws Exception {
    RedisURI redisUri = RedisURI.builder()
            .withHost("localhost")
            .withPort(6379)
            .withTimeout(Duration.of(10, ChronoUnit.SECONDS))
            .build();
    RedisClient redisClient = RedisClient.create(redisUri);
    StatefulRedisConnection<String, String> connect = redisClient.connect();
    RedisCommands<String, String> sync = connect.sync();
    RedisCodec<String, String> codec = StringCodec.UTF8;
    String result = sync.dispatch(CommandType.PING, new StatusOutput<>(codec));
    log.info("PING:{}", result);
    connect.close();
    redisClient.shutdown();
}
// PING:PONG

// 自定義實現Set方法
@Test
public void testCustomSet() throws Exception {
    RedisURI redisUri = RedisURI.builder()
            .withHost("localhost")
            .withPort(6379)
            .withTimeout(Duration.of(10, ChronoUnit.SECONDS))
            .build();
    RedisClient redisClient = RedisClient.create(redisUri);
    StatefulRedisConnection<String, String> connect = redisClient.connect();
    RedisCommands<String, String> sync = connect.sync();
    RedisCodec<String, String> codec = StringCodec.UTF8;
    sync.dispatch(CommandType.SETEX, new StatusOutput<>(codec),
            new CommandArgs<>(codec).addKey("name").add(5).addValue("throwable"));
    String result = sync.get("name");
    log.info("Get value:{}", result);
    connect.close();
    redisClient.shutdown();
}
// Get value:throwable

動態命令是基於Redis命令有限集,並且通過註解和動態代理完成一些複雜命令組合的實現。主要註解在io.lettuce.core.dynamic.annotation包路徑下。簡單舉個例子:

public interface CustomCommand extends Commands {

    // SET [key] [value]
    @Command("SET ?0 ?1")
    String setKey(String key, String value);

    // SET [key] [value]
    @Command("SET :key :value")
    String setKeyNamed(@Param("key") String key, @Param("value") String value);

    // MGET [key1] [key2]
    @Command("MGET ?0 ?1")
    List<String> mGet(String key1, String key2);
    /**
     * 方法名作為命令
     */
    @CommandNaming(strategy = CommandNaming.Strategy.METHOD_NAME)
    String mSet(String key1, String value1, String key2, String value2);
}


@Test
public void testCustomDynamicSet() throws Exception {
    RedisURI redisUri = RedisURI.builder()
            .withHost("localhost")
            .withPort(6379)
            .withTimeout(Duration.of(10, ChronoUnit.SECONDS))
            .build();
    RedisClient redisClient = RedisClient.create(redisUri);
    StatefulRedisConnection<String, String> connect = redisClient.connect();
    RedisCommandFactory commandFactory = new RedisCommandFactory(connect);
    CustomCommand commands = commandFactory.getCommands(CustomCommand.class);
    commands.setKey("name", "throwable");
    commands.setKeyNamed("throwable", "doge");
    log.info("MGET ===> " + commands.mGet("name", "throwable"));
    commands.mSet("key1", "value1","key2", "value2");
    log.info("MGET ===> " + commands.mGet("key1", "key2"));
    connect.close();
    redisClient.shutdown();
}
// MGET ===> [throwable, doge]
// MGET ===> [value1, value2]

高階特性

Lettuce有很多高階使用特性,這裡只列舉個人認為常用的兩點:

  • 配置客戶端資源。
  • 使用連線池。

更多其他特性可以自行參看官方文件。

配置客戶端資源

客戶端資源的設定與Lettuce的效能、併發和事件處理相關。執行緒池或者執行緒組相關配置佔據客戶端資源配置的大部分(EventLoopGroupsEventExecutorGroup),這些執行緒池或者執行緒組是連線程式的基礎元件。一般情況下,客戶端資源應該在多個Redis客戶端之間共享,並且在不再使用的時候需要自行關閉。筆者認為,客戶端資源是面向Netty的。注意:除非特別熟悉或者花長時間去測試調整下面提到的引數,否則在沒有經驗的前提下憑直覺修改預設值,有可能會踩坑。

客戶端資源介面是ClientResources,實現類是DefaultClientResources

構建DefaultClientResources例項:

// 預設
ClientResources resources = DefaultClientResources.create();

// 建造器
ClientResources resources = DefaultClientResources.builder()
                        .ioThreadPoolSize(4)
                        .computationThreadPoolSize(4)
                        .build()

使用:

ClientResources resources = DefaultClientResources.create();
// 非叢集
RedisClient client = RedisClient.create(resources, uri);
// 叢集
RedisClusterClient clusterClient = RedisClusterClient.create(resources, uris);
// ......
client.shutdown();
clusterClient.shutdown();
// 關閉資源
resources.shutdown();

客戶端資源基本配置:

屬性 描述 預設值
ioThreadPoolSize I/O執行緒數 Runtime.getRuntime().availableProcessors()
computationThreadPoolSize 任務執行緒數 Runtime.getRuntime().availableProcessors()

客戶端資源高階配置:

屬性 描述 預設值
eventLoopGroupProvider EventLoopGroup提供商 -
eventExecutorGroupProvider EventExecutorGroup提供商 -
eventBus 事件匯流排 DefaultEventBus
commandLatencyCollectorOptions 命令延時收集器配置 DefaultCommandLatencyCollectorOptions
commandLatencyCollector 命令延時收集器 DefaultCommandLatencyCollector
commandLatencyPublisherOptions 命令延時釋出器配置 DefaultEventPublisherOptions
dnsResolver DNS處理器 JDK或者Netty提供
reconnectDelay 重連延時配置 Delay.exponential()
nettyCustomizer Netty自定義配置器 -
tracing 軌跡記錄器 -

非叢集客戶端RedisClient的屬性配置:

Redis非叢集客戶端RedisClient本身提供了配置屬性方法:

RedisClient client = RedisClient.create(uri);
client.setOptions(ClientOptions.builder()
                       .autoReconnect(false)
                       .pingBeforeActivateConnection(true)
                       .build());

非叢集客戶端的配置屬性列表:

屬性 描述 預設值
pingBeforeActivateConnection 連線啟用之前是否執行PING命令 false
autoReconnect 是否自動重連 true
cancelCommandsOnReconnectFailure 重連失敗是否拒絕命令執行 false
suspendReconnectOnProtocolFailure 底層協議失敗是否掛起重連操作 false
requestQueueSize 請求佇列容量 2147483647(Integer#MAX_VALUE)
disconnectedBehavior 失去連線時候的行為 DEFAULT
sslOptions SSL配置 -
socketOptions Socket配置 10 seconds Connection-Timeout, no keep-alive, no TCP noDelay
timeoutOptions 超時配置 -
publishOnScheduler 釋出反應式訊號資料的排程器 使用I/O執行緒

叢集客戶端屬性配置:

Redis叢集客戶端RedisClusterClient本身提供了配置屬性方法:

RedisClusterClient client = RedisClusterClient.create(uri);
ClusterTopologyRefreshOptions topologyRefreshOptions = ClusterTopologyRefreshOptions.builder()
                .enablePeriodicRefresh(refreshPeriod(10, TimeUnit.MINUTES))
                .enableAllAdaptiveRefreshTriggers()
                .build();

client.setOptions(ClusterClientOptions.builder()
                       .topologyRefreshOptions(topologyRefreshOptions)
                       .build());

叢集客戶端的配置屬性列表:

屬性 描述 預設值
enablePeriodicRefresh 是否允許週期性更新叢集拓撲檢視 false
refreshPeriod 更新叢集拓撲檢視週期 60秒
enableAdaptiveRefreshTrigger 設定自適應更新叢集拓撲檢視觸發器RefreshTrigger -
adaptiveRefreshTriggersTimeout 自適應更新叢集拓撲檢視觸發器超時設定 30秒
refreshTriggersReconnectAttempts 自適應更新叢集拓撲檢視觸發重連次數 5
dynamicRefreshSources 是否允許動態重新整理拓撲資源 true
closeStaleConnections 是否允許關閉陳舊的連線 true
maxRedirects 叢集重定向次數上限 5
validateClusterNodeMembership 是否校驗叢集節點的成員關係 true

使用連線池

引入連線池依賴commons-pool2

<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-pool2</artifactId>
    <version>2.7.0</version>
</dependency

基本使用如下:

@Test
public void testUseConnectionPool() throws Exception {
    RedisURI redisUri = RedisURI.builder()
            .withHost("localhost")
            .withPort(6379)
            .withTimeout(Duration.of(10, ChronoUnit.SECONDS))
            .build();
    RedisClient redisClient = RedisClient.create(redisUri);
    GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
    GenericObjectPool<StatefulRedisConnection<String, String>> pool
            = ConnectionPoolSupport.createGenericObjectPool(redisClient::connect, poolConfig);
    try (StatefulRedisConnection<String, String> connection = pool.borrowObject()) {
        RedisCommands<String, String> command = connection.sync();
        SetArgs setArgs = SetArgs.Builder.nx().ex(5);
        command.set("name", "throwable", setArgs);
        String n = command.get("name");
        log.info("Get value:{}", n);
    }
    pool.close();
    redisClient.shutdown();
}

其中,同步連線的池化支援需要用ConnectionPoolSupport,非同步連線的池化支援需要用AsyncConnectionPoolSupportLettuce5.1之後才支援)。

幾個常見的漸進式刪除例子

漸進式刪除Hash中的域-屬性:

@Test
public void testDelBigHashKey() throws Exception {
    // SCAN引數
    ScanArgs scanArgs = ScanArgs.Builder.limit(2);
    // TEMP遊標
    ScanCursor cursor = ScanCursor.INITIAL;
    // 目標KEY
    String key = "BIG_HASH_KEY";
    prepareHashTestData(key);
    log.info("開始漸進式刪除Hash的元素...");
    int counter = 0;
    do {
        MapScanCursor<String, String> result = COMMAND.hscan(key, cursor, scanArgs);
        // 重置TEMP遊標
        cursor = ScanCursor.of(result.getCursor());
        cursor.setFinished(result.isFinished());
        Collection<String> fields = result.getMap().values();
        if (!fields.isEmpty()) {
            COMMAND.hdel(key, fields.toArray(new String[0]));
        }
        counter++;
    } while (!(ScanCursor.FINISHED.getCursor().equals(cursor.getCursor()) && ScanCursor.FINISHED.isFinished() == cursor.isFinished()));
    log.info("漸進式刪除Hash的元素完畢,迭代次數:{} ...", counter);
}

private void prepareHashTestData(String key) throws Exception {
    COMMAND.hset(key, "1", "1");
    COMMAND.hset(key, "2", "2");
    COMMAND.hset(key, "3", "3");
    COMMAND.hset(key, "4", "4");
    COMMAND.hset(key, "5", "5");
}

漸進式刪除集合中的元素:

@Test
public void testDelBigSetKey() throws Exception {
    String key = "BIG_SET_KEY";
    prepareSetTestData(key);
    // SCAN引數
    ScanArgs scanArgs = ScanArgs.Builder.limit(2);
    // TEMP遊標
    ScanCursor cursor = ScanCursor.INITIAL;
    log.info("開始漸進式刪除Set的元素...");
    int counter = 0;
    do {
        ValueScanCursor<String> result = COMMAND.sscan(key, cursor, scanArgs);
        // 重置TEMP遊標
        cursor = ScanCursor.of(result.getCursor());
        cursor.setFinished(result.isFinished());
        List<String> values = result.getValues();
        if (!values.isEmpty()) {
            COMMAND.srem(key, values.toArray(new String[0]));
        }
        counter++;
    } while (!(ScanCursor.FINISHED.getCursor().equals(cursor.getCursor()) && ScanCursor.FINISHED.isFinished() == cursor.isFinished()));
    log.info("漸進式刪除Set的元素完畢,迭代次數:{} ...", counter);
}

private void prepareSetTestData(String key) throws Exception {
    COMMAND.sadd(key, "1", "2", "3", "4", "5");
}

漸進式刪除有序集合中的元素:

@Test
public void testDelBigZSetKey() throws Exception {
    // SCAN引數
    ScanArgs scanArgs = ScanArgs.Builder.limit(2);
    // TEMP遊標
    ScanCursor cursor = ScanCursor.INITIAL;
    // 目標KEY
    String key = "BIG_ZSET_KEY";
    prepareZSetTestData(key);
    log.info("開始漸進式刪除ZSet的元素...");
    int counter = 0;
    do {
        ScoredValueScanCursor<String> result = COMMAND.zscan(key, cursor, scanArgs);
        // 重置TEMP遊標
        cursor = ScanCursor.of(result.getCursor());
        cursor.setFinished(result.isFinished());
        List<ScoredValue<String>> scoredValues = result.getValues();
        if (!scoredValues.isEmpty()) {
            COMMAND.zrem(key, scoredValues.stream().map(ScoredValue<String>::getValue).toArray(String[]::new));
        }
        counter++;
    } while (!(ScanCursor.FINISHED.getCursor().equals(cursor.getCursor()) && ScanCursor.FINISHED.isFinished() == cursor.isFinished()));
    log.info("漸進式刪除ZSet的元素完畢,迭代次數:{} ...", counter);
}

private void prepareZSetTestData(String key) throws Exception {
    COMMAND.zadd(key, 0, "1");
    COMMAND.zadd(key, 0, "2");
    COMMAND.zadd(key, 0, "3");
    COMMAND.zadd(key, 0, "4");
    COMMAND.zadd(key, 0, "5");
}

在SpringBoot中使用Lettuce

個人認為,spring-data-redis中的API封裝並不是很優秀,用起來比較重,不夠靈活,這裡結合前面的例子和程式碼,在SpringBoot腳手架專案中配置和整合Lettuce。先引入依賴:

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-dependencies</artifactId>
            <version>2.1.8.RELEASE</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
            <dependency>
        <groupId>io.lettuce</groupId>
        <artifactId>lettuce-core</artifactId>
        <version>5.1.8.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.10</version>
        <scope>provided</scope>
    </dependency>
</dependencies>        

一般情況下,每個應用應該使用單個Redis客戶端例項和單個連線例項,這裡設計一個腳手架,適配單機、普通主從、哨兵和叢集四種使用場景。對於客戶端資源,採用預設的實現即可。對於Redis的連線屬性,比較主要的有HostPortPassword,其他可以暫時忽略。基於約定大於配置的原則,先定製一系列屬性配置類(其實有些配置是可以完全共用,但是考慮到要清晰描述類之間的關係,這裡拆分多個配置屬性類和多個配置方法):

@Data
@ConfigurationProperties(prefix = "lettuce")
public class LettuceProperties {

    private LettuceSingleProperties single;
    private LettuceReplicaProperties replica;
    private LettuceSentinelProperties sentinel;
    private LettuceClusterProperties cluster;

}

@Data
public class LettuceSingleProperties {

    private String host;
    private Integer port;
    private String password;
}

@EqualsAndHashCode(callSuper = true)
@Data
public class LettuceReplicaProperties extends LettuceSingleProperties {

}

@EqualsAndHashCode(callSuper = true)
@Data
public class LettuceSentinelProperties extends LettuceSingleProperties {

    private String masterId;
}

@EqualsAndHashCode(callSuper = true)
@Data
public class LettuceClusterProperties extends LettuceSingleProperties {

}

配置類如下,主要使用@ConditionalOnProperty做隔離,一般情況下,很少有人會在一個應用使用一種以上的Redis連線場景:

@RequiredArgsConstructor
@Configuration
@ConditionalOnClass(name = "io.lettuce.core.RedisURI")
@EnableConfigurationProperties(value = LettuceProperties.class)
public class LettuceAutoConfiguration {

    private final LettuceProperties lettuceProperties;

    @Bean(destroyMethod = "shutdown")
    public ClientResources clientResources() {
        return DefaultClientResources.create();
    }

    @Bean(destroyMethod = "shutdown")
    @ConditionalOnProperty(name = "lettuce.single.host")
    public RedisClient singleRedisClient(ClientResources clientResources) {
        LettuceSingleProperties singleProperties = lettuceProperties.getSingle();
        RedisURI uri = RedisURI.builder()
                .withHost(singleProperties.getHost())
                .withPort(singleProperties.getPort())
                .withPassword(singleProperties.getPassword())
                .build();
        return RedisClient.create(clientResources, uri);
    }

    @Bean(destroyMethod = "close")
    @ConditionalOnProperty(name = "lettuce.single.host")
    public StatefulRedisConnection<String, String> singleRedisConnection(@Qualifier("singleRedisClient") RedisClient singleRedisClient) {
        return singleRedisClient.connect();
    }

    @Bean(destroyMethod = "shutdown")
    @ConditionalOnProperty(name = "lettuce.replica.host")
    public RedisClient replicaRedisClient(ClientResources clientResources) {
        LettuceReplicaProperties replicaProperties = lettuceProperties.getReplica();
        RedisURI uri = RedisURI.builder()
                .withHost(replicaProperties.getHost())
                .withPort(replicaProperties.getPort())
                .withPassword(replicaProperties.getPassword())
                .build();
        return RedisClient.create(clientResources, uri);
    }

    @Bean(destroyMethod = "close")
    @ConditionalOnProperty(name = "lettuce.replica.host")
    public StatefulRedisConnection<String, String> replicaRedisConnection(@Qualifier("replicaRedisClient") RedisClient replicaRedisClient) {
        return replicaRedisClient.connect();
    }

    @Bean(destroyMethod = "shutdown")
    @ConditionalOnProperty(name = "lettuce.sentinel.host")
    public RedisClient sentinelRedisClient(ClientResources clientResources) {
        LettuceSentinelProperties sentinelProperties = lettuceProperties.getSentinel();
        RedisURI uri = RedisURI.builder()
                .withHost(sentinelProperties.getHost())
                .withPort(sentinelProperties.getPort())
                .withPassword(sentinelProperties.getPassword())
                .withSentinel(sentinelProperties.getHost(), sentinelProperties.getPort())
                .withSentinelMasterId(sentinelProperties.getMasterId())
                .build();
        return RedisClient.create(clientResources, uri);
    }

    @Bean(destroyMethod = "close")
    @ConditionalOnProperty(name = "lettuce.sentinel.host")
    public StatefulRedisConnection<String, String> sentinelRedisConnection(@Qualifier("sentinelRedisClient") RedisClient sentinelRedisClient) {
        return sentinelRedisClient.connect();
    }

    @Bean(destroyMethod = "shutdown")
    @ConditionalOnProperty(name = "lettuce.cluster.host")
    public RedisClusterClient redisClusterClient(ClientResources clientResources) {
        LettuceClusterProperties clusterProperties = lettuceProperties.getCluster();
        RedisURI uri = RedisURI.builder()
                .withHost(clusterProperties.getHost())
                .withPort(clusterProperties.getPort())
                .withPassword(clusterProperties.getPassword())
                .build();
        return RedisClusterClient.create(clientResources, uri);
    }

    @Bean(destroyMethod = "close")
    @ConditionalOnProperty(name = "lettuce.cluster")
    public StatefulRedisClusterConnection<String, String> clusterConnection(RedisClusterClient clusterClient) {
        return clusterClient.connect();
    }
}

最後為了讓IDE識別我們的配置,可以新增IDE親緣性,/META-INF資料夾下新增一個檔案spring-configuration-metadata.json,內容如下:

{
  "properties": [
    {
      "name": "lettuce.single",
      "type": "club.throwable.spring.lettuce.LettuceSingleProperties",
      "description": "單機配置",
      "sourceType": "club.