多級緩存在微服務的架構設計中可謂隨處可見,多級緩存作為提升系統高并發的常規手段,在各類大中小型的系統設計中都有體現;
下圖是一張簡單的服務端多級緩存設計示意圖,多級緩存的常用解決方案,像ehcache + redis,或caffeine + springcache等,即利用JVM內存緩存 + redis緩存配合;
一、緩存一致性問題
多級緩存帶來的好處是顯著的,一定程度上可以應對較高的并發,但隨之帶來了一個比較大的問題就是緩存一致性問題;
我們知道,JVM緩存屬于進程級的緩存,和當前服務實例是綁定的,而redis緩存可以作為分布式緩存,通常JVM緩存的是那些生命周期較短的熱點查詢數據,即過期時間不會太久,而redis緩存相對來說,過期時間相對長一點,JVM緩存通常作為服務端扛壓的第一道屏障,如果設置的過期時間太長,將會對JVM內存的開銷非常大,所以一般作為短頻使用;
設想這么一個場景,服務A采用多實例部署,這里假設部署了兩個節點,首次根據ID查詢一個用戶信息的對象數據將會同時被JVM緩存,同時也會被redis緩存,下一次過來同樣參數的請求時,首先走JVM緩存,查到了直接返回,否則走redis緩存;
上面是一個正常的關于緩存存取的過程,問題是,JVM緩存是同進程綁定的,如果第一個節點的數據發生了變更,比如刪除了,對于redis緩存來說,可以做到動態刷緩存的效果,但是redis緩存和本地緩存之間并沒有一種強同步的機制確保兩者的緩存保持一致;
甚至來說,第一個節點與第二個節點之間,兩者是無狀態的,當第一個節點上面的數據被刪除時,假如此刻并發的查詢請求到達第二個節點,JVM緩存查詢到必然是上一次緩存的數據;
于是,我們的問題就是,在多級緩存模式下,如何解決緩存一致性的問題呢?
二、一個簡單的案例
基于之前的一篇springcache 詳細使用和spring boot 二級緩存案例基礎上我們進行案例演示和改造;
在案例中,我們提供了幾個核心的接口:
- 根據用戶ID查詢用戶,并緩存到redis;
- 根據用戶ID查詢用戶,并緩存到JVM,這里采用caffeine;
- 根據用戶ID刪除用戶;
import com.fasterxml.jackson.annotation.JsonAutoDetect;import com.fasterxml.jackson.annotation.JsonInclude;import com.fasterxml.jackson.annotation.JsonTypeInfo;import com.fasterxml.jackson.annotation.PropertyAccessor;import com.fasterxml.jackson.databind.MApperFeature;import com.fasterxml.jackson.databind.ObjectMapper;import com.fasterxml.jackson.databind.SerializationFeature;import com.fasterxml.jackson.databind.jsontype.impl.LaissezFaireSubTypeValidator;import com.fasterxml.jackson.datatype.jsr310.JAVATimeModule;import com.github.benmanes.caffeine.cache.Caffeine;import org.springframework.cache.CacheManager;import org.springframework.cache.annotation.CachingConfigurerSupport;import org.springframework.cache.caffeine.CaffeineCacheManager;import org.springframework.cache.interceptor.KeyGenerator;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.context.annotation.Primary;import org.springframework.data.redis.cache.RedisCacheConfiguration;import org.springframework.data.redis.cache.RedisCacheManager;import org.springframework.data.redis.connection.RedisConnectionFactory;import org.springframework.data.redis.core.RedisTemplate;import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;import org.springframework.data.redis.serializer.RedisSerializationContext;import org.springframework.data.redis.serializer.StringRedisSerializer;import org.springframework.util.StringUtils;import java.lang.reflect.Method;import java.time.Duration;import java.util.concurrent.TimeUnit;@Configurationpublic class RedisConfig extends CachingConfigurerSupport {@Beanpublic RedisTemplate redisTemplate(RedisConnectionFactory connectionFactory) {RedisTemplate template = new RedisTemplate<>();template.setConnectionFactory(connectionFactory);//使用Jackson2JsonRedisSerializer來序列化和反序列化redis的value值Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);ObjectMapper mapper = new ObjectMapper();mapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);mapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);jackson2JsonRedisSerializer.setObjectMapper(mapper);template.setValueSerializer(jackson2JsonRedisSerializer);//使用StringRedisSerializer來序列化和反序列化redis的key值template.setKeySerializer(new StringRedisSerializer());template.afterPropertiesSet();return template;* 分鐘級別* @param connectionFactory* @return@Bean("cacheManagerMinutes")public RedisCacheManager cacheManagerMinutes(RedisConnectionFactory connectionFactory){RedisCacheConfiguration configuration = instanceConfig(3 * 60L);return RedisCacheManager.builder(connectionFactory).cacheDefaults(configuration).transactionAware().build();* 小時級別* @param connectionFactory* @return@Bean("cacheManagerHour")@Primarypublic RedisCacheManager cacheManagerHour(RedisConnectionFactory connectionFactory){RedisCacheConfiguration configuration = instanceConfig(3600L);return RedisCacheManager.builder(connectionFactory).cacheDefaults(configuration).transactionAware().build();* 天級別* @param connectionFactory* @return@Bean("cacheManagerDay")public RedisCacheManager cacheManagerDay(RedisConnectionFactory connectionFactory){RedisCacheConfiguration configuration = instanceConfig(3600 * 24L);;return RedisCacheManager.builder(connectionFactory).cacheDefaults(configuration).transactionAware().build();* 正常時間的本地緩存@Bean("caffeineCacheManager")public CacheManager caffeineCacheManager() {CaffeineCacheManager cacheManager = new CaffeineCacheManager();cacheManager.setCaffeine(Caffeine.newBuilder().expireAfterWrite(50, TimeUnit.SECONDS).initialCapacity(256).maximumSize(10000));return cacheManager;private RedisCacheConfiguration instanceConfig(long ttl){Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);ObjectMapper objectMapper = new ObjectMapper();objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);objectMapper.registerModule(new JavaTimeModule());objectMapper.configure(MapperFeature.USE_ANNOTATIONS,false);//只針對非空的值進行序列化objectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);//將類型序列化到屬性的json字符串objectMapper.activateDefaultTyping(LaissezFaireSubTypeValidator.instance,ObjectMapper.DefaultTyping.NON_FINAL,JsonTypeInfo.As.PROPERTY);jackson2JsonRedisSerializer.setObjectMapper(objectMapper);return RedisCacheConfiguration.defaultCacheConfig().entryTtl(Duration.ofSeconds(ttl)).disableCachingNullValues().serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(jackson2JsonRedisSerializer));* 自定義key生成策略* @return@Bean("defaultSpringKeyGenerator")public KeyGenerator defaultSpringKeyGenerator(){return new KeyGenerator() {@Overridepublic Object generate(Object o, Method method, Object... objects) {String key = o.getClass().getSimpleName() + "_"+ method.getName() +"_"+ StringUtils.arrayToDelimitedString(objects,"_");System.out.println("key :" + key);return key;}2、配置文件開啟使用 springcache
spring:redis:host: localhostport: 6379cache:type: redis3、幾個核心接口1)根據用戶ID獲取用戶@GetMapping("/getById")public DbUser getById(String id){return dbUserService.getById(id);@Override@Cacheable(value = {"dbUser"},key = "#root.args[0]",cacheManager = "cacheManagerMinutes")public DbUser getById(String id) {System.out.println("首次查詢走數據庫");DbUser dbUser = dbUserMapper.getByUserId(id);return dbUser;
2)根據用戶ID查詢用戶,并緩存到JVM;@GetMapping("/getByIdFromCaffeine")public DbUser getByIdFromCaffeine(String id){return dbUserService.getByIdFromCaffeine(id);@Override@Cacheable(value = {"dbUser"},key = "#root.args[0]",cacheManager = "caffeineCacheManager")public DbUser getByIdFromCaffeine(String id) {System.out.println("查詢數據庫");DbUser dbUser = dbUserMapper.getByUserId(id);System.out.println("第一次走緩存");return dbUser;
3)根據用戶ID刪除用戶;@GetMapping("/deleteById")public String deleteById(String id){return dbUserService.deleteById(id);@Override@CacheEvict(value = {"dbUser"},key = "#root.args[0]",cacheManager = "cacheManagerMinutes")public String deleteById(String id) {dbUserMapper.deleteByUserId(id);return "delete success";
4、功能測試
首先在數據庫的 db_user 表準備一條測試數據
分別調用查詢用戶接口
1、 http://localhost:8083/getById?id=1 ;
多次刷新接口,sql只輸出了一次
2、 http://localhost:8083/getByIdFromCaffeine?id=1
多次刷新接口,sql只輸出了一次
從上面的結果可以看到,我們模擬了查詢數據分別緩存到了JVM內存和redis的效果,接下來,刪除當前這條數據,執行下面的接口
3、 http://localhost:8083/deleteById?id=1
再次調用第一個查詢用戶的接口,無返回數據,表明redis中緩存的結果被清理了,這是我們使用了springcache后,通過 CacheEvict 這個注解,會自動幫我們管理redis中的緩存;
但這時,再次調用查詢JVM緩存的接口,發現仍然可以從本地緩存中得到數據
4、 http://localhost:8083/getByIdFromCaffeine?id=1
基于上面的測試結果,可以看到,緩存一致性的問題就產生了,這里我故意將本地緩存的時間調整的長了一點,實際開發過程中,建議本地緩存的時間一般不要超過1分鐘;
三、解決方案一:清理redis緩存,同步清理本地緩存1、增加一個本地緩存操作的工具類import com.congge.config.SpringContextHolder;import org.springframework.cache.Cache;import org.springframework.cache.CacheManager;import java.util.Objects;public class CaffeineCacheUtils {private static CacheManager cm;static {cm = SpringContextHolder.getBean("caffeineCacheManager");* 添加緩存* @param cacheName 緩存名稱* @param key 緩存key* @param value 緩存值public static void put(String cacheName, String key, Object value) {Cache cache = cm.getCache(cacheName);cache.put(key, value);* 獲取緩存* @param cacheName 緩存名稱* @param key 緩存key* @returnpublic static Object get(String cacheName, String key) {Cache cache = cm.getCache(cacheName);if (cache == null) {return null;return Objects.requireNonNull(cache.get(key)).get();* 獲取緩存(字符串)* @param cacheName 緩存名稱* @param key 緩存key* @returnpublic static String getString(String cacheName, String key) {Cache cache = cm.getCache(cacheName);if (cache == null) {return null;Cache.ValueWrapper wrapper = cache.get(key);if (wrapper == null) {return null;return Objects.requireNonNull(wrapper.get()).toString();* 獲取緩存(泛型)* @param cacheName 緩存名稱* @param key 緩存key* @param clazz 緩存類* @param 返回值泛型* @returnpublic static T get(String cacheName, String key, Class clazz) {Cache cache = cm.getCache(cacheName);if (cache == null) {return null;Cache.ValueWrapper wrapper = cache.get(key);if (wrapper == null) {return null;return (T) wrapper.get();* 清理緩存* @param cacheName 緩存名稱* @param key 緩存keypublic static void evict(String cacheName, String key) {Cache cache = cm.getCache(cacheName);System.out.println(cache.getName());if (cache != null) {cache.evict(key);
2、刪除用戶接口中同步清理本地緩存
只需改造下刪除接口的服務實現方法即可
private CaffeineCacheUtils caffeineCacheUtils = new CaffeineCacheUtils();* 刪除,同時需要刪除相關的key* @param id* @return@Override@CacheEvict(value = {"dbUser"},key = "#root.args[0]",cacheManager = "cacheManagerMinutes")public String deleteById(String id) {dbUserMapper.deleteByUserId(id);caffeineCacheUtils.evict("dbUser",id);return "delete success";
3、方案優缺點優點
- 操作簡便;
- 只要參數傳入正確,就可以確保緩存一致性;
- 適合單機模式下使用
- 代碼產生了一定的耦合性;
- 不適合分布式環境使用;
- 需要手動管理key的相關參數;
對zookeeper有所了解和使用的同學,應該對zk的節點管理不陌生,zk作為一款分布式協調中間件,在很多分布式場景都有著廣泛的使用,比如實現集群選舉,分布式鎖,節點管理等等,利用zk的節點屬性,可以很好的解決這個問題;
使用zk的解決思路
- 查詢用戶接口中,注冊一個節點,節點命名最好和緩存的key保持一致;
- 刪除接口中,手動觸zk的節點刪除;
- zk監聽到刪除節點的事件變化時,同步清理本地緩存;
com.101teczkclient0.10org.slf4jslf4j-log4j12
2、提供一個zk節點操作工具類import org.I0Itec.zkclient.IZkDataListener;import org.I0Itec.zkclient.ZkClient;import org.I0Itec.zkclient.serialize.SerializableSerializer;import org.Apache.zookeeper.CreateMode;public class ZkUtils {private ZkClient zkClient = null;private String node;/*public static void main(String[] args) {ZkUtils zkUtils = new ZkUtils();zkUtils.createNode(node);zkUtils.nodeExist(node);zkUtils.deleteNode(node);public ZkUtils(String node) {zkClient = new ZkClient("localhost:2181", 60000 * 30, 60000, new SerializableSerializer());//監聽節點變化//需要通過java修改zookeeper數據,才能監聽到zkClient.subscribeDataChanges("/" + node, new IZkDataListener() {//節點數據變化時觸發@Overridepublic void handleDataChange(String s, Object o) throws Exception {System.out.println("change Node: " + s);System.out.println("change data: " + o);//節點數據刪除時觸發@Overridepublic void handleDataDeleted(String s) throws Exception {System.out.println("delete Node: " + s);* 創建zk節點* @param nodepublic void createNode(String node) {//創建持久節點String node1 = zkClient.create("/" + node, node, CreateMode.PERSISTENT);System.out.println(node1);* 修改zk節點數據* @param node* @param datapublic void writeNodeData(String node, String data) {zkClient.writeData("/" + node, 233);* 查詢zk節點* @param nodepublic boolean nodeExist(String node) {boolean exists = zkClient.exists("/" + node);return exists;* 查詢節點數據* @param node* @returnpublic String findNodeData(String node) {Object data = zkClient.readData("/" + node);System.out.println(data);return data.toString();* 刪除節點* @param nodepublic void deleteNode(String node) {boolean b2 = zkClient.deleteRecursive("/" + node);System.out.println(b2);
3、查詢用戶接口,注冊緩存的key對應的z-node節點@Override@Cacheable(value = {"dbUser"},key = "#root.args[0]",cacheManager = "cacheManagerMinutes")public DbUser getById(String id) {System.out.println("首次查詢走數據庫");DbUser dbUser = dbUserMapper.getByUserId(id);//FIXME 將緩存注冊到節點registerCacheNode(id);return dbUser;public void registerCacheNode(String id){String node = "user:" + id;ZkUtils zkUtils = new ZkUtils(node);zkUtils.createNode(node);
4、刪除用戶接口添加刪除zk節點邏輯@Override@CacheEvict(value = {"dbUser"},key = "#root.args[0]",cacheManager = "cacheManagerMinutes")public String deleteById(String id) {dbUserMapper.deleteByUserId(id);//刪除 z-node 節點String node = "user:" + id;ZkUtils zkUtils = new ZkUtils(node);zkUtils.deleteNode(node);return "delete success";
5、改造zk監聽邏輯,同步移除本地緩存private CaffeineCacheUtils caffeineCacheUtils = new CaffeineCacheUtils();public ZkUtils(String node) {zkClient = new ZkClient("localhost:2181", 60000 * 30, 60000, new SerializableSerializer());//監聽節點變化//需要通過java修改zookeeper數據,才能監聽到zkClient.subscribeDataChanges("/" + node, new IZkDataListener() {//節點數據變化時觸發@Overridepublic void handleDataChange(String s, Object o) throws Exception {System.out.println("change Node: " + s);System.out.println("change data: " + o);//節點數據刪除時觸發@Overridepublic void handleDataDeleted(String s) throws Exception {System.out.println("delete Node: " + s);caffeineCacheUtils.evict("dbUser","1");
6、測試1、啟動服務后,按照上面的測試步驟,分別調用2個查詢接口
通過控制臺輸出結果,可以看到節點數據注冊到zk中
2、調用刪除接口
此時zk的監聽邏輯中監聽到了節點數據變更的事件,在變更的邏輯中,我們將同步刪除本地緩存的數據;
再次調用時發現緩存已經被清理
通過上面的操作演示,實現了基于zk的節點注冊與事件監聽機制實現緩存一致性的問題處理;
五、解決方案三:使用redis事件訂閱與發布機制實現緩存同步
Redis 發布訂閱 (pub/sub) 是一種消息通信模式:發送者 (pub) 發送消息,訂閱者 (sub) 接收消息。
這種模式很像消息隊列的實現機制,服務端發布消息到topic,客戶端監聽topic的消息,并做自身的業務處理;
只不過在redis這里,不叫topic,而是channel,下面來看一個簡單的redis實現的發布訂閱使用
1、導入依賴org.springframework.bootspring-boot-starter-data-redis
2、自定義 RedisMessageListener
該類的功能和消息中間件中的監聽邏輯很相似
import org.springframework.beans.factory.annotation.Autowired;import org.springframework.data.redis.connection.Message;import org.springframework.data.redis.connection.MessageListener;import org.springframework.data.redis.core.RedisTemplate;import org.springframework.stereotype.Component;@Componentpublic class RedisMessageListener implements MessageListener {@Autowiredprivate RedisTemplate redisTemplate;@Overridepublic void onMessage(Message message, byte[] pattern) {// 獲取消息byte[] messageBody = message.getBody();// 使用值序列化器轉換Object msg = redisTemplate.getValueSerializer().deserialize(messageBody);// 獲取監聽的頻道byte[] channelByte = message.getChannel();// 使用字符串序列化器轉換Object channel = redisTemplate.getStringSerializer().deserialize(channelByte);// 渠道名稱轉換String patternStr = new String(pattern);System.out.println(patternStr);System.out.println("---頻道---: " + channel);System.out.println("---消息內容---: " + msg);
3、自定義 RedisSubConfig
該類用于配置特定的channel,即監聽來自哪些channel的消息
import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.data.redis.connection.RedisConnectionFactory;import org.springframework.data.redis.listener.ChannelTopic;import org.springframework.data.redis.listener.RedisMessageListenerContainer;@Configurationpublic class RedisSubConfig {@Beanpublic RedisMessageListenerContainer container(RedisConnectionFactory factory, RedisMessageListener listener) {RedisMessageListenerContainer container = new RedisMessageListenerContainer();container.setConnectionFactory(factory);//訂閱頻道redis.news 和 redis.life 這個container 可以添加多個 messageListenercontainer.addMessageListener(listener, new ChannelTopic("redis.life"));container.addMessageListener(listener, new ChannelTopic("redis.news"));return container;
4、最后編寫一個接口做測試@GetMapping("/testPublish")public void testPublish(){dbUserService.testPublish();@Autowiredprivate RedisTemplate redisTemplate;@Overridepublic void testPublish() {redisTemplate.convertAndSend("redis.life", "aaa");redisTemplate.convertAndSend("redis.news", "bbb");
調用下接口,可以看到控制臺輸出如下信息
通過上面的演示,快速了解了一下redis的這種發布訂閱模式的功能使用,下面就來使用這種方式來解決緩存一致性問題;
5、刪除用戶接口中向redis channel 推送消息@Override@CacheEvict(value = {"dbUser"},key = "#root.args[0]",cacheManager = "cacheManagerMinutes")public String deleteById(String id) {dbUserMapper.deleteByUserId(id);redisTemplate.convertAndSend("redis.user", id);return "delete success";
6、RedisMessageListener 改造
添加刪除本地緩存邏輯
@Overridepublic void onMessage(Message message, byte[] pattern) {CaffeineCacheUtils caffeineCacheUtils = new CaffeineCacheUtils();// 獲取消息byte[] messageBody = message.getBody();// 使用值序列化器轉換Object msg = redisTemplate.getValueSerializer().deserialize(messageBody);// 獲取監聽的頻道byte[] channelByte = message.getChannel();// 使用字符串序列化器轉換Object channel = redisTemplate.getStringSerializer().deserialize(channelByte);// 渠道名稱轉換String patternStr = new String(pattern);System.out.println(patternStr);System.out.println("---頻道---: " + channel);System.out.println("---消息內容---: " + msg);caffeineCacheUtils.evict("dbUser",patternStr);
7、模擬測試
啟動服務后,直接調用刪除用戶接口,可以看到,監聽邏輯中收到了一條消息,然后調用本地緩存工具類刪除本地緩存即可
六、解決方案四:使用消息隊列實現緩存同步
了解了redis發布訂閱這種方式實現原理后,如果再更換為消息中間件來實現就不難理解了,其實現的大致思路如下:
- 調用刪除接口刪除用戶;
- 向特定的隊列推送一條刪除消息;
- 在消息監聽邏輯中接收消息,并清理本地緩存
以rabbitmq為例,其核心實現如下:
@RabbitHandlerpublic void process(String msg) {System.out.println("topicMessageReceiver 接收到了消息 : " +msg);//執行本地緩存的刪除操作
關于rabbitmq的相關實現感興趣的同學可以參考:rabbbitmq 技術全解
七、總結
關于后3三種的實現,不僅可以解決緩存一致性問題,同時適用于分布式應用的場景,算是比較通用的解決方案,但這樣一來,引入了第三方組件,也增加了系統整體的復雜性,這一點需要在架構設計中進行綜合考量,結合小編本人的一些實踐經驗,比較推薦使用redis的發布訂閱模式,這種方式簡單高效,同時兼顧了避免引入更多的外部組件,可酌情參考。