1. 程式人生 > >SpringBoot整合Spring-data-redis實現集中式快取

SpringBoot整合Spring-data-redis實現集中式快取

從框架的角度來看,儲存在Redis中的資料只是位元組。雖然說Redis支援多種資料型別,但是那只是意味著儲存資料的方式,而不是它所代表的內容。由我們將這些資料轉化成字串或者是其他物件。我們通過org.springframework.data.redis.serializer. RedisSerializer將自定義的物件資料和儲存在Redis上的原始資料之間相互轉換,顧名思義,它處理的就是序列化的過程。

先看一下RedisSerializer介面

public interface RedisSerializer<T> {

	/**
	 * 把一個物件序列化二進位制資料
	 */
	byte[] serialize(T t) throws SerializationException;

	/**
	 * 通過給定的二進位制資料反序列化成物件
	 */
	T deserialize(byte[] bytes) throws SerializationException;
}

注意這裡作者提示我們:Redis does not accept null keys or values but can return null replies (fornon existing keys). 大致意思Redis不接受key為null,但是對於那些不存在的key,會返回null。但是這裡可以採用官方提供的org.springframework.cache.support.NullValue作為null的佔位符.

NullValue原始碼如下:

public final class NullValue implements Serializable {

	static final Object INSTANCE = new NullValue();

	private static final long serialVersionUID = 1L;


	private NullValue() {
	}

	private Object readResolve() {
		return INSTANCE;
	}

}

下面是RedisSerializer介面的幾種實現方式:


首先在RedisTemplate中,我們可以看到afterPropertiesSet()方法

public void afterPropertiesSet() {
		super.afterPropertiesSet();
		boolean defaultUsed = false;
		if (defaultSerializer == null) {
			defaultSerializer = new JdkSerializationRedisSerializer(
					classLoader != null ? classLoader : this.getClass().getClassLoader());
		}
		if (enableDefaultSerializer) {
			if (keySerializer == null) {
				keySerializer = defaultSerializer;
				defaultUsed = true;
			}
			if (valueSerializer == null) {
				valueSerializer = defaultSerializer;
				defaultUsed = true;
			}
			if (hashKeySerializer == null) {
				hashKeySerializer = defaultSerializer;
				defaultUsed = true;
			}
			if (hashValueSerializer == null) {
				hashValueSerializer = defaultSerializer;
				defaultUsed = true;
			}
		}
		if (enableDefaultSerializer && defaultUsed) {
			Assert.notNull(defaultSerializer, "default serializer null and not all serializers initialized");
		}
		if (scriptExecutor == null) {
			this.scriptExecutor = new DefaultScriptExecutor<K>(this);
		}
		initialized = true;
	} 

這個方法時org.springframework.beans.factory .InitializingBean介面宣告的一個方法,這個介面主要就是做一些初始化動作,或者檢查已經設定好bean的屬性。或者在XML裡面加入一個init-method,這裡我們可以知道預設的RedisSerializer就是JdkSerializationRedisSerializer,StringRedisTemplate的預設序列化全是public class StringRedisTemplate extendsRedisTemplate<String, String>

public StringRedisTemplate() {
		RedisSerializer<String> stringSerializer = new StringRedisSerializer();
		setKeySerializer(stringSerializer);
		setValueSerializer(stringSerializer);
		setHashKeySerializer(stringSerializer);
		setHashValueSerializer(stringSerializer);
	}

public class StringRedisSerializer implements RedisSerializer<String> {

	private final Charset charset;

	public StringRedisSerializer() {
		this(Charset.forName("UTF8"));
	}

	public StringRedisSerializer(Charset charset) {
		Assert.notNull(charset);
		this.charset = charset;
	}

	public String deserialize(byte[] bytes) {
		return (bytes == null ? null : new String(bytes, charset));
	}

	public byte[] serialize(String string) {
		return (string == null ? null : string.getBytes(charset));
	}
}

RedisCacheManager

RedisCacheManager的父類是AbstractTransactionSupportingCacheManager,有名字可以知道是對事務支援的一個CacheManager,預設是不會感知事務

privateboolean transactionAware = false;

對此官網的解釋是:

Set whether this CacheManager should exposetransaction-aware Cache objects.

Default is "false". Set this to"true" to synchronize cache put/evict

operations with ongoing Spring-managedtransactions, performing the actual cache

put/evict operation only in theafter-commit phase of a successful transaction.

大致意思是可感知事務的意思,put,evict,意味著會改變cache,所以put,evict操作必須一個事務(同步操作),其他執行緒必須等正在進行put,evict操作的執行緒執行完,才能緊接著操作。

再往上抽取的類就是AbstractCacheManager

取幾個比較經典的方法:


// Lazy cache initialization on access
	@Override
	public Cache getCache(String name) {
		Cache cache = this.cacheMap.get(name);
		if (cache != null) {
			return cache;
		}
		else {
			// Fully synchronize now for missing cache creation...
			synchronized (this.cacheMap) {
				cache = this.cacheMap.get(name);
				if (cache == null) {
					cache = getMissingCache(name);
					if (cache != null) {
						cache = decorateCache(cache);
						this.cacheMap.put(name, cache);
						updateCacheNames(name);
					}
				}
				return cache;
			}
		}
	}

這個cacheMap就是非常經典的併發容器ConcurrentHashMap,它Spring自帶管理cache的工具,每個Java開發人員都應該去讀一下它的實現思想。。。

private finalConcurrentMap<String, Cache> cacheMap = newConcurrentHashMap<String, Cache>(16);

不用說,我們直接可以想到肯定set管理cache的name。

private volatileSet<String> cacheNames = Collections.emptySet();

關於volatile,不用多說,原子性,可見性每個Java開發人員都應該理解的。。

這個synchronized,不用多說。。也是必須理解的- -getMissingCache()這個方法預設返回null,決定權交給其實現者,可以根據name建立,也可以記錄日誌什麼的。

	protected Cache getMissingCache(String name) {
		return null;
	}

decorateCache()這個方法顧名思義就是裝飾這個cache,預設直接返回,就是我們經典的裝飾模式,IO類庫的設計裡面也有這個裝飾模式。所以說常用的設計模式也必須要掌握啊。

	protected Cache decorateCache(Cache cache) {
		return cache;
	}

這裡就在子類AbstractTransactionSupportingCacheManager,裡面去根據isTransactionAware欄位去判斷是否進行事務可感知來修飾這個cache。

	@Override
	protected Cache decorateCache(Cache cache) {
		return (isTransactionAware() ? new TransactionAwareCacheDecorator(cache) : cache);
	}

updateCacheNames()這個方法

	private void updateCacheNames(String name) {
		Set<String> cacheNames = new LinkedHashSet<String>(this.cacheNames.size() + 1);
		cacheNames.addAll(this.cacheNames);
		cacheNames.add(name);
		this.cacheNames = Collections.unmodifiableSet(cacheNames);
	}

一個有順序的set集合,最後用Collections包裝成一個不能修改的set檢視,LinkedHashSet也是非常有必要去了解一下底層原理的。。

配置RedisCacheManager非常簡單,首先RedisCacheManager依賴RedisTemplate,RedisTemplate又依賴於連線工廠,這裡就是我們的RedisConnectionFactory的實現類

JedisConnectionFactory,關於這個連線工廠:

Note: Though the database index isconfigurable, the JedisConnectionFactory only supports connecting to one Redisdatabase at a time.

Because Redis is single threaded, you areencouraged to set up multiple instances of Redis instead of using multipledatabases within a single process. This allows you to get better CPU/resourceutilization.

大意就是:雖然資料庫索引是可配置的,但JedisConnectionFactory只支援一次連線到一個Redis資料庫。由於Redis是單執行緒的,因此建議您設定多個Redis例項,而不是在一個程序中使用多個數據庫。這可以讓你獲得更好的CPU /資源利用率。

預設是下面配置:

·        hostName=”localhost”

·        port=6379

·        timeout=2000 ms

·        database=0

·        usePool=true

先看下屬性

//Redis具體操作的類
	@SuppressWarnings("rawtypes")//
	private final RedisOperations redisOperations;
	//是否使用字首修飾cache
	private boolean usePrefix = false;
	// usePrefix = true的時候使用預設的字首DefaultRedisCachePrefix,是:
	private RedisCachePrefix cachePrefix = new DefaultRedisCachePrefix();
	//遠端載入快取
	private boolean loadRemoteCachesOnStartup = false;
	//當super的快取不存在時,是否建立快取,false的話就不會去建立快取
	private boolean dynamic = true;

	// 0 - never expire  永不過期
	private long defaultExpiration = 0;
	// 針對專門的key設定快取過期時間
	private Map<String, Long> expires = null;
	//快取的名字集合
	private Set<String> configuredCacheNames;

這裡官方強烈建議我們開啟使用字首。redisCacheManager.setUsePrefix(true),因為這裡預設為false。

               /**
		 * @param cacheName must not be {@literal null} or empty.
		 * @param keyPrefix can be {@literal null}.
		 */
		public RedisCacheMetadata(String cacheName, byte[] keyPrefix) {

			hasText(cacheName, "CacheName must not be null or empty!");
			this.cacheName = cacheName;
			this.keyPrefix = keyPrefix;

			StringRedisSerializer stringSerializer = new StringRedisSerializer();

			// name of the set holding the keys
			this.setOfKnownKeys = usesKeyPrefix() ? new byte[] {} : stringSerializer.serialize(cacheName + "~keys");
			this.cacheLockName = stringSerializer.serialize(cacheName + "~lock");
		}

這裡我們通過追蹤原始碼可以看見構造RedisCaheMetdata的setOfKnownKeys時候會生成一個字尾為~keys的key,而這個key的在Redis中型別是zset,它是維護已知key的一個有序set,底層是LinkedHashSet。同時我們也會在官網中看到:

By default RedisCacheManager does notprefix keys for cache regions, which can lead to an unexpected growth of a ZSETused to maintain known keys. It’s highly recommended to enable the usage ofprefixes in order to avoid this unexpected growth and potential key clashesusing more than one cache region.

大致意思就是預設情況下,RedisCacheManager不會為快取區域建立字首,這樣會導致維護管理已知的那些key的那個zset會急劇增長(ps:這個zset的name就是上面說的setOfKnownKeys)。因此強烈建議開啟預設字首,以免這個zset意外增長以及使用多個緩衝區域帶來的潛在衝突。

關於這個cacheLockName,是cache名稱字尾為~lock的key,作為一把鎖存放在Redis伺服器上。而RedisCache其中clear方法用於清除當前cache塊中所有的元素,這裡會加鎖,而鎖的實現就是在伺服器上面放剛才key是cacheLockName的元素,最後清除鎖則是在clear方法執行完成後在finally中清除。 put與get方法執行時會檢視是否存在lock鎖,存在則會sleep 300毫秒。這個過程會一直繼續,直到redis伺服器上不存在鎖時才會進行相應的get與put操作,這裡存在一個問題,如果clear方法執行時間很長,這時當前執行clear操作的機子掛了,就導致lock元素一直存在於redis伺服器上。

 之後就算這個機子重新啟動後,也無法正常使用cache。原因是:get與put方法在執行時,鎖lock始終存在於redis伺服器上,所以在使用時應當小心避免這種問題。下面可以追蹤下原始碼看下:

在RedisCache類中的靜態抽象內部類LockingRedisCacheCallback<T>中,我們可以看見在Cache的元資料中設定鎖。

@Override
		public T doInRedis(RedisConnection connection) throws DataAccessException {

			if (connection.exists(metadata.getCacheLockKey())) {
				return null;
			}
			try {
				connection.set(metadata.getCacheLockKey(), metadata.getCacheLockKey());
				return doInLock(connection);
			} finally {
				connection.del(metadata.getCacheLockKey());
			}
		}

RedisCache類中的靜態抽象內部類中,static abstract classAbstractRedisCacheCallback<T> 

private long WAIT_FOR_LOCK_TIMEOUT = 300;
	protected boolean waitForLock(RedisConnection connection) {

			boolean retry;
			boolean foundLock = false;
			do {
				retry = false;
			if(connection.exists(cacheMetadata.getCacheLockKey())) {
					foundLock = true;
					try {
						Thread.sleep(WAIT_FOR_LOCK_TIMEOUT);
					} catch (InterruptedException ex) {
						Thread.currentThread().interrupt();
					}
					retry = true;
				}
			} while (retry);

			return foundLock;
		}

connection.exists(cacheMetadata.getCacheLockKey()就是判斷哪個鎖是否還在Redis中。下面我們可以簡單測試下:

    @RequestMapping(value = "save/{key}.do", method = RequestMethod.POST)
    @Cacheable(value = "cache",key = "#key",condition = "#key !=  ''")
    public String save( @PathVariable String key)  {
    	System.out.println("走資料庫");
    	System.out.println(cacheManager.getCacheNames());
    	return "succful";
    }

當我們這樣設定的時候

redisCacheManager.setUsePrefix(false);

redisCacheManager.setDefaultExpiration(60*30);

這時候就會產生一個cache名+~keys的一個zset維護key的名字的一個集合

redisCacheManager.setUsePrefix(true);

redisCacheManager.setCachePrefix(new MyRedisCachePrefix());

這個MyRedisCachePrefix實現了MyRedisCachePrefix介面,預設是DefaultRedisCachePrefix()是:作為分隔符,這裡只是換成了#。同時這時候我們的這些快取都有了名稱空間cache加上我們自定義的#分隔符,防止了快取的衝突。

上面對於值的序列化都統一採用了Jackson2JsonRedisSerializer

template.setValueSerializer(jackson2JsonRedisSerializer);,對於key的序列化採用了

StringRedisSerializer。

對於序列的化採用是跟String互動的多就用StringRedisSerializer,儲存POLO類的Json資料時就用Jackson2JsonRedisSerializer。

對於data-redis封裝Cache來說,好處是非常明顯的,既可以很方便的快取物件,相比較於SpringCache、Ecache這些程序級別的快取來說,現在快取的記憶體的是使用redis的記憶體,不會消耗JVM的記憶體,提升了效能。當然這裡Redis不是必須的,換成其他的快取伺服器一樣可以,只要實現Spring的Cache類,並配置到XML裡面就行。和原生態的jedis相比,只要方法上加上註解,就可以實現快取,對於應用開發人員來說,使用快取變的簡單。同時也有利於對不同的業務快取進行分組統計、監控。


附錄:SpringBoot整合data-redis

Redis單節點:

 @Bean  
    public RedisConnectionFactory redisConnectionFactory() {  
        JedisConnectionFactory cf = new JedisConnectionFactory();  
        cf.setHostName("10.188.182.140");  
        cf.setPort(6379);  
        cf.setPassword("root");  
        cf.afterPropertiesSet();  
        return cf;  
}

或者在application.properites、application.yml檔案裡配置

Spring.redis.host: 172.26.223.153 

Spring.redis.port: 6379 

哨兵模式:

類註解:@RedisSentinelConfiguration或者@ PropertySource

配置檔案

·        spring.redis.sentinel.master:mymaster

·        spring.redis.sentinel.nodes: 127.0.0.1:6379

@Bean
public RedisConnectionFactory jedisConnectionFactory() {
  RedisSentinelConfiguration sentinelConfig = new RedisSentinelConfiguration() .master("mymaster")
  .sentinel("127.0.0.1", 26379) .sentinel("127.0.0.1", 26380);
  return new JedisConnectionFactory(sentinelConfig);
}

叢集模式:

·        spring.redis.cluster.nodes:node1,node2…….

·        spring.redis.cluster.max-redirects: 叢集之間最大重定向次數

@Component
@ConfigurationProperties(prefix = "spring.redis.cluster")
public class ClusterConfigurationProperties {

    /*
     * spring.redis.cluster.nodes[0] = 127.0.0.1:7379
     * spring.redis.cluster.nodes[1] = 127.0.0.1:7380
     * ...
     */
    List<String> nodes;

    /**
     * Get initial collection of known cluster nodes in format {@code host:port}.
     *
     * @return
     */
    public List<String> getNodes() {
        return nodes;
    }

    public void setNodes(List<String> nodes) {
        this.nodes = nodes;
    }
}
@Configuration
public class AppConfig {

    /**
     * Type safe representation of application.properties
     */
    @Autowired ClusterConfigurationProperties clusterProperties;

    public @Bean RedisConnectionFactory connectionFactory() {

        return new JedisConnectionFactory(
            new RedisClusterConfiguration(clusterProperties.getNodes()));
    }
}

開啟快取,注意預設@SpringBootApplication會掃描當前同級目錄及其子目錄的帶有@Configuration的類(僅僅支援1.2+版本的springboot,之前是有三個註解@Configuration@ComponentScan@EnableAtuoConfiguration),當啟動類上使用@ComponentScan註解的時候就只會掃描你自定義的基礎包。當有xml.檔案的時候,建議在@Configuration類上面

@ImportResource({"classpath:xxx.xml","classpath:yyy.xml"})匯入

又或者當你的你@Configuration配置類沒有預設在@SpringBootApplication掃描的路徑下,可以使用@Import({xxx.class,yyy.class})

@Configuration
@EnableCaching 
public class RedisConfig extends CachingConfigurerSupport {
	    @Bean
    public CacheManager cacheManager(@SuppressWarnings("rawtypes") RedisTemplate redisTemplate) {
        RedisCacheManager redisCacheManager = new RedisCacheManager(redisTemplate);
        redisCacheManager.setDefaultExpiration(60*30);
        redisCacheManager.setTransactionAware(true);
        redisCacheManager.setUsePrefix(true);
        redisCacheManager.setCachePrefix(new MyRedisCachePrefix());
        return redisCacheManager;
    }
    @Bean
    @SuppressWarnings({ "rawtypes", "unchecked" })
    public RedisTemplate<String, String> redisTemplate(RedisConnectionFactory factory) {
        StringRedisTemplate template = new StringRedisTemplate(factory);
        //使用Jackson2JsonRedisSerializer來序列化和反序列化redis的value值
		Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
        ObjectMapper om = new ObjectMapper();
        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        jackson2JsonRedisSerializer.setObjectMapper(om);
        //使用StringRedisSerializer來序列化和反序列化redis的key值
        template.setValueSerializer(jackson2JsonRedisSerializer);
        template.afterPropertiesSet();
        return template;
    }


}