什么是缓存?

在我们的日常生活中,总会有一些“常用但重复”的操作,比如每天早晨都去同一家早餐店吃饭。如果老板聪明,看到你过来,就提前准备好你常点的豆浆和油条。这种行为,在技术上有一个类似的概念,叫做缓存

缓存就是提前把“经常被需要的东西”放在离我们最近、最快的地方,比如内存,而不是每次都去慢腾腾的后厨(数据库或远程服务)重新制作。它的本质,是用时间换空间:你花费一点内存(空间)来存储数据,换取更快的访问速度。

为什么要使用缓存?

想象一下,如果没有缓存,你打开一个购物网站,每点击一个商品都要等上好几秒才能加载出来。这背后的原因是:每次请求都需要服务器去数据库里挖东西,然后整理好再返回给你。而数据库的速度,虽然比你去翻资料库快,但对于大流量场景来说,仍然不够快。

缓存的作用就在于,让那些常用的数据提前“躺”在内存里,等着被访问。举个例子,你查询某个热销商品的价格,系统第一次去数据库取数据,之后就把价格存进缓存。下次有人查询同样的商品,直接从缓存返回结果,只需毫秒级的时间。

会有哪些问题?

大文学家雨果曾说:“所有光明的背后,都有一片阴影。”缓存看似完美,实则也潜藏着几个问题,稍不注意,可能会给系统带来麻烦。

  1. 占用资源
    内存不是无穷的,缓存的数据越多,系统越容易出现资源不足的问题。就像书桌太乱,你找书的速度反而慢了。
  2. 数据一致性问题
    缓存的数据可能和数据库里的数据不一致。比如,商品的库存在数据库中更新了,但缓存没有同步刷新,结果用户买了已经售罄的商品。
  3. 缓存穿透、雪崩、击穿
    1. 缓存穿透:用户请求的数据根本不存在,每次都得去数据库查,缓存根本用不上。
    2. 缓存击穿:某些特别热门的数据突然过期,所有请求瞬间涌向数据库,直接把它“打穿”。
    3. 缓存雪崩:多个缓存同时失效,引发一连串的系统崩溃效应。

本篇文章将讲述如何使用redis做为缓存和如何解决上述问题。

1. redis作为缓存的基本使用

缓存的作用其实就是在客户端和数据库之间添加一个中间层,正如下面缓存作用模型所描画的那样。

当客户端想获取一些数据时,它会先“敲门”缓存系统,看看 Redis 中有没有这部分数据。如果 Redis 中已经有这部分数据,缓存会立刻响应,将数据返回给客户端。这个过程既快又省力,数据直接从内存中读取,几乎是瞬时完成的。如果 Redis 中没有找到需要的数据,缓存会无奈地转向数据库,发出请求获取数据。然后,数据库返回结果,Redis 将这个数据存起来(写缓存),方便下次直接使用。

1653322097736

接下来我们将按照右侧的流程图,完成一次redis做缓存的操作。

@Override  
public Result queryById(Long id) {  
    // 1. 从redis中查询id  
    String key = CACHE_SHOP_KEY + id;  
    Map<Object, Object> shopMap = stringRedisTemplate.opsForHash().entries(key);  
  
    // 2. 判断是否命中  
    if (!shopMap.isEmpty()){  
        // 命中,返回数据  
        Shop shop = new Shop();  
        BeanUtil.copyProperties(shopMap, shop);  
        return Result.ok(shop);  
    }  
  
    // 3. 未命中,查询数据库  
    Shop shop = getById(id);  
  
    // 4. 判断shop是否存在  
    if (shop == null) {  
        return Result.fail("店铺不存在");  
    }  
  
    // 5. 存在,将数据写入redis  
    Map<String, Object> shopMapSave = BeanUtil.beanToMap(shop, new HashMap<>(), CopyOptions.create()  
        .setIgnoreNullValue(true)  
        .setFieldValueEditor((field, value) -> value != null ? value.toString() : null));  
    stringRedisTemplate.opsForHash().putAll(key, shopMapSave);  
  
    // 6. 返回数据  
    return Result.ok(shop);  
}

2. 缓存更新策略

正如上面提及的“内存不是无穷的”,Redis 作为缓存,虽然高效,但它终究也是有限资源,不可能无限存储所有数据。这就需要我们聪明地规划缓存的“存”和“删”,以保证系统高效运行。下面是缓存更新的几种策略:

1653322506393

2.1 一致性问题

由于我们的缓存数据源来自数据库,而数据库中的数据是会发生变化的,因此,当数据库中的数据发生变化时,如果缓存没有及时同步更新,就会出现一致性问题。这种问题的后果是:

用户可能会获取到缓存中过时的数据,这就像多线程中的数据安全问题一样,可能导致业务逻辑错误,影响用户体验,甚至损害产品口碑。那么,如何解决这个问题呢?以下是几种常见的方案:

  1. Cache Aside Pattern 人工编码方式:缓存调用者在更新完数据库后再去更新缓存,也称之为双写方案。
  2. Read/Write Through Pattern : 由系统本身完成,数据库与缓存的问题交由系统本身去处理。
  3. Write Behind Caching Pattern :调用者只操作缓存,其他线程去异步处理数据库,实现最终一致。

一般,我们采用第一种方案。

2.2 缓存更新的双写问题

在方案一种,操作缓存和数据库时需要注意几个问题,因为这涉及到多个系统之间的交互,特别是数据库和缓存:

2.2.1 删除缓存还是更新缓存?

首先,我们面临一个问题:每次数据库更新时,应该是删除缓存还是更新缓存呢?

  • 更新缓存
    每次更新数据库后,也更新缓存,将新的数据直接写入缓存。这种方式的优点是每次更新后缓存都能得到刷新,但问题是:如果缓存的更新不必要,比如数据并没有变化,那么就会发生无效写操作,浪费资源。

  • 删除缓存
    另一种方式是在更新数据库时删除缓存,让缓存失效。这样下次查询时,缓存会被重新加载。这种方式的优点是减少了不必要的更新操作,但缺点是,当缓存被删除后,下一次查询时,系统会重新访问数据库,这对性能有一定影响。

2.2.2. 如何保证缓存和数据库的操作同步成功或失败?

在分布式系统中,如何确保缓存与数据库的操作成功或失败的原子性,往往是个挑战。不同的系统架构会有不同的解决方法:

  • 单体系统
    在单体系统中,可以将缓存和数据库操作放在同一个事务里,确保操作的原子性。也就是说,要么两个操作都成功,要么都失败,避免数据不一致的情况。
  • 分布式系统
    在分布式系统中,操作缓存和数据库可能是分布在多个节点上的。这时可以使用一些分布式事务解决方案,比如TCC(Try-Confirm/Cancel),来保证缓存和数据库的操作具有一致性。TCC 通过分布式事务的机制,确保多个分布式操作的一致性和可靠性。

2.2.3. 先操作缓存还是先操作数据库?

假设有两个线程同时操作缓存和数据库,我们应该先操作数据库还是先操作缓存呢?

  • 先删除缓存,再操作数据库
    • 并发问题:当线程1删除缓存后,线程2开始执行,会导致缓存和数据库中的数据不一致,并且这种情况在实际情况中发生的概率比较高。
  • 先操作数据库,再删除缓存
    • 并发问题:恰好缓存失效,线程1来查询数据,必然是未命中的,因此需要查询数据库。但就在这是线程2来更新数据,更新数据完成后,线程1才开始写如何缓存。这种情况发生的概率会比较小:
        1. 满足线程1查询的时候,缓存刚好失效
        1. 相比与更新数据库,写入缓存的操作更快,因此图中在1、4之间执行2、3的概率较小

1653323595206

2.2.4 案例:实现商铺缓存与数据库双写一致

  1. queryById() --> 在写入redis时,设置ttl
stringRedisTemplate.opsForHash().putAll(key, shopMapSave);  
stringRedisTemplate.expire(key, CACHE_SHOP_TTL, TimeUnit.MINUTES); // 设置过期时间
  1. updateByIdWithRedis() --> 先修改数据库,再删缓存
@Override  
@Transactional  
public Result updateByIdWithRedis(Shop shop) {  
    // 1. 判断id是否为空  
    Long id = shop.getId();  
    if (id == null){  
        return Result.fail("店铺id不能为空");  
    }  
    // 2. 更新数据库  
    updateById(shop);  
    // 3. 删除缓存  
    stringRedisTemplate.delete(CACHE_SHOP_KEY + id);  
    return Result.ok();  
}

3. 缓存穿透

缓存穿透 :缓存穿透是指客户端请求的数据在缓存中和数据库中都不存在,这样缓存永远不会生效,这些请求都会打到数据库。

3.1 解决方案

3.1.1 缓存空对象

  • 优点:实现简单,维护方便
  • 缺点:额外的内存消耗,可能导致短期不一致

在某些场景中,客户端可能会请求不存在的数据。通常情况下,客户端先查询 Redis 缓存,若缓存中没有数据,则查询数据库。如果数据库中也没有该数据,这就会发生“缓存穿透”,请求直接命中数据库。相比 Redis,数据库的并发处理能力较弱,大量类似请求可能导致数据库负载过高甚至宕机。

为解决这一问题,可以将数据库中不存在的数据作为空对象缓存到 Redis 中。这样,后续对该数据的请求会直接从 Redis 返回,而无需再访问数据库。这种方案实现起来简单,同时减少了数据库的压力。然而,它也有一定的局限性:首先,会额外占用 Redis 的内存;其次,在某些情况下可能造成数据的不一致。例如,后续数据库中添加了该数据,但缓存中的空对象并未及时更新。

3.1.2 布隆过滤器

  • 优点:内存占用少,无多余 key
  • 缺点:实现复杂,可能存在误判

布隆过滤器是一种基于哈希思想的数据结构,用于判断某个数据是否可能存在。它通过多个哈希函数将数据映射到一个二进制数组中。当请求到来时,布隆过滤器会判断数据是否存在于缓存中。若布隆过滤器判断数据可能存在:请求会继续访问 Redis,即使缓存中数据过期,查询仍会转向数据库,最终从数据库中获取数据并重新写入 Redis。若布隆过滤器判断数据不存在:请求会直接返回,避免无意义的缓存和数据库查询。

布隆过滤器的优点在于它占用内存少,可有效避免缓存穿透问题。然而,由于它的哈希算法可能存在冲突,因此会有一定的误判概率——即可能错误地认为某些不存在的数据是存在的。这种误判可能会导致一些请求不必要地访问数据库。

1653326156516

3.2 案例:解决商品查询的缓存穿透问题

思路:当数据库中确认数据不存在时,将该数据以空对象形式写入 Redis 缓存,并为其设置一个合理的过期时间。之后的请求再查询时,若命中空对象,直接返回空值响应,避免访问数据库。

1653327124561

@Override  
public Result queryById(Long id) {  
    // 1. 从redis中查询id  
    String key = CACHE_SHOP_KEY + id;  
    Map<Object, Object> shopMap = stringRedisTemplate.opsForHash().entries(key);  
  
    // 2. 判断是否命中  
    if (!shopMap.isEmpty()){  
        // 判断是否为空值  
        if(shopMap.containsKey("fail")){  
            return Result.fail("店铺不存在");  
        }  
  
        // 不为空值,返回店铺数据  
        Shop shop = new Shop();  
        BeanUtil.copyProperties(shopMap, shop);  
        return Result.ok(shop);  
    }  
  
    // 3. 未命中,查询数据库  
    Shop shop = getById(id);  
  
    // 4. 判断shop是否存在  
    if (shop == null) {  
        // 将空值写入redis  
        stringRedisTemplate.opsForHash().put(key, "fail", "");  
        stringRedisTemplate.expire(key, CACHE_NULL_TTL, TimeUnit.MINUTES);  
        return Result.fail("店铺不存在");  
    }  
  
    // 5. 存在,将数据写入redis  
    Map<String, Object> shopMapSave = BeanUtil.beanToMap(shop, new HashMap<>(),  
            CopyOptions.create()  
                    .setIgnoreNullValue(true)  
                    .setFieldValueEditor((field, value) -> value != null ? value.toString() : null));  
    stringRedisTemplate.opsForHash().putAll(key, shopMapSave);  
    stringRedisTemplate.expire(key, CACHE_SHOP_TTL, TimeUnit.MINUTES); // 设置过期时间  
  
    // 6. 返回数据  
    return Result.ok(shop);  
}

4. 缓存雪崩

缓存雪崩:在同一时段大量的缓存key同时失效或者Redis服务宕机,导致大量请求到达数据库,带来巨大压力。

解决方案:

  • 给不同的Key的TTL添加随机值
  • 利用Redis集群提高服务的可用性
  • 给缓存业务添加降级限流策略
  • 给业务添加多级缓存

1653327884526

5. 缓存击穿

缓存击穿:一个被高并发访问并且缓存重建业务较复杂的key突然失效了,无数的请求访问会在瞬间给数据库带来巨大的冲击。缓存击穿问题也叫热点Key问题。

问题分析:

如图所示,假设线程1发起请求时,发现缓存里没有这个数据(缓存未命中)。于是线程1按照正常流程,去查询数据库并重建缓存。假如线程1的工作能及时完成,后续的线程(线程2、线程3、线程4)就可以直接从缓存读取数据,一切看起来很美好。

但是,理想和现实总是有点差距。如果线程1重建缓存时耗时较长,还没来得及完成,就有更多线程(比如线程2、线程3、线程4)纷纷过来查询。它们也发现缓存未命中,接着都一窝蜂地去数据库查询,瞬间给数据库施加巨大的压力,甚至可能导致数据库崩溃。

1653328022622

5.1 解决方案

5.1.1 互斥锁

锁的“互斥性”可以有效防止多个线程同时访问数据库,避免数据库承受过大的压力。然而,虽然锁解决了问题,但也带来了一个副作用——查询性能从“并行”变成了“串行”,整体速度会变慢。不过,别担心,我们可以结合 tryLock 方法和“双重检查”(double check)机制来优化这一过程。

假设线程1来了,发现缓存没命中,但幸运地拿到了锁。那么,线程1可以愉快地执行数据库查询和缓存更新的逻辑。此时,如果线程2也来了,它会发现锁已被线程1占用,进而选择休眠等待。当线程1完成任务释放锁后,线程2醒来,重新尝试获取锁并检查缓存。由于线程1已经完成了数据的缓存重建,线程2就可以直接从缓存中读取数据了,无需再去访问数据库。

1653328288627

5.1.2 逻辑过期

我们之所以会出现这个缓存击穿问题,主要原因是在于我们对key设置了过期时间,假设我们不设置过期时间,其实就不会有缓存击穿的问题,但是不设置过期时间,这样数据不就一直占用我们内存了吗,我们可以采用逻辑过期方案。

我们把过期时间设置在 redis的value中,注意:这个过期时间并不会直接作用于redis,而是我们后续通过逻辑去处理。假设线程1去查询缓存,然后从value中判断出来当前的数据已经过期了,此时线程1去获得互斥锁。此时线程1会开启一个另外一个线程2去执行重构数据的逻辑,直到线程2完成这个逻辑后,才释放锁,而线程1直接进行返回旧数据。假设现在线程3过来访问,由于线程线程2持有锁,所以线程3无法获得锁,线程3也直接返回旧数据,只有等到新开的线程2把重建数据构建完后,其他线程才能返回正确的数据。

这种方案巧妙在于,异步的构建缓存,缺点在于在构建完缓存之前,返回的都是脏数据。

1653328663897

5.1.3 对比

互斥锁方案: 由于保证了互斥性,所以数据一致,且实现简单,因为仅仅只需要加一把锁而已,也没其他的事情需要操心,所以没有额外的内存消耗,缺点在于有锁就有死锁问题的发生,且只能串行执行性能肯定受到影响

逻辑过期方案: 线程读取过程中不需要等待,性能好,有一个额外的线程持有锁去进行重构数据,但是在重构数据完成前,其他的线程只能返回之前的数据,且实现起来麻烦

1653357522914

核心思路:相较于原来从缓存中查询不到数据后直接查询数据库而言,现在的方案是 进行查询之后,如果从缓存没有查询到数据,则进行互斥锁的获取,获取互斥锁后,判断是否获得到了锁,如果没有获得到,则休眠,过一会再进行尝试,直到获取到锁为止,才能进行查询

如果获取到了锁的线程,再去进行查询,查询后将数据写入redis,再释放锁,返回数据,利用互斥锁就能保证只有一个线程去执行操作数据库的逻辑,防止缓存击穿

1653357860001

我们注意到在redis中,使用setnx方法,如果redis中没有这个key,则插入成功,返回1,否则旧插入失败,返回0。也就是所我们可以通过这个东西来表示锁。在java中,与之对应的是setIfAbsent()方法。如果插入成功stringRedisTemplate将返回true,否则返回false。

5.2 案例:互斥锁解决商品查询的缓存击穿问题

核心思路:相较于原来从缓存中查询不到数据后直接查询数据库而言,现在的方案是 进行查询之后,如果从缓存没有查询到数据,则进行互斥锁的获取,获取互斥锁后,判断是否获得到了锁,如果没有获得到,则休眠,过一会再进行尝试,直到获取到锁为止,才能进行查询

如果获取到了锁的线程,再去进行查询,查询后将数据写入redTis,再释放锁,返回数据,利用互斥锁就能保证只有一个线程去执行操作数据库的逻辑,防止缓存击穿

1653357860001 1

核心思路就是利用redis的setnx方法来表示获取锁,该方法含义是redis中如果没有这个key,则插入成功,返回1,在stringRedisTemplate中返回true, 如果有这个key则插入失败,则返回0,在stringRedisTemplate返回false,我们可以通过true,或者是false,来表示是否有线程成功插入key,成功插入的key的线程我们认为他就是获得到锁的线程。

ShopServiceImpl.java

@Service  
public class ShopServiceImpl extends ServiceImpl<ShopMapper, Shop> implements IShopService {  
  
    @Autowired  
    private StringRedisTemplate stringRedisTemplate;  
    @Override  
    public Result queryByIdWithMutex(Long id) {  
        // 1. 调用方法,封装为方法的原因是如果没有拿到锁,方便休眠重新执行。  
        Shop shop = queryWithMutex(id);  
        // 2. 判断店铺是否存在  
        if (shop == null) {  
            return Result.fail("店铺不存在");  
        }  
        return Result.ok(shop);  
    }  
  
    private Shop queryWithMutex(Long id) {  
        // 1. 从redis中查询id  
        String key = CACHE_SHOP_KEY + id;  
        Map<Object, Object> shopMap = stringRedisTemplate.opsForHash().entries(key);  
  
        // 2. 判断是否命中  
        if (!shopMap.isEmpty()){  
            // 判断是否为空值  
            if(shopMap.containsKey("fail")){  
                return null;  
            }  
  
            // 不为空值,返回店铺数据  
            Shop shop = new Shop();  
            BeanUtil.copyProperties(shopMap, shop);  
            return shop;  
        }  
  
        // 3. 未命中,尝试获取互斥锁  
        Shop shop = null;  
        try {  
            boolean flag = tryLock(LOCK_SHOP_KEY + id);  
            if (!flag){  
                // 为获取到锁,休眠后重试  
                Thread.sleep(50);  
                return queryWithMutex(id);  
            }  
  
            // 获取到了锁,查询数据库  
            shop = getById(id);  
  
            // 4. 判断shop是否存在  
            if (shop == null) {  
                // 将空值写入redis  
                stringRedisTemplate.opsForHash().put(key, "fail", "");  
                stringRedisTemplate.expire(key, CACHE_NULL_TTL, TimeUnit.MINUTES);  
                return null;            }  
  
            // 5. 存在,将数据写入redis  
            Map<String, Object> shopMapSave = BeanUtil.beanToMap(shop, new HashMap<>(),  
                    CopyOptions.create()  
                            .setIgnoreNullValue(true)  
                            .setFieldValueEditor((field, value) -> value != null ? value.toString() : null));  
            stringRedisTemplate.opsForHash().putAll(key, shopMapSave);  
            stringRedisTemplate.expire(key, CACHE_SHOP_TTL, TimeUnit.MINUTES); // 设置过期时间  
        } catch (Exception e) {  
            throw new RuntimeException(e);  
        } finally {  
            unlock(LOCK_SHOP_KEY + id);  
        }  
  
        // 6. 返回数据  
        return shop;  
    }  
  
    private boolean tryLock(String key){  
        Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", 10, TimeUnit.SECONDS);  
        return BooleanUtil.isTrue(flag);  
    }  
  
    private void unlock(String key) {  
        stringRedisTemplate.delete(key);  
    }  
}

ShopController.java

@GetMapping("/{id}")  
public Result queryShopById(@PathVariable("id") Long id) {  
    return shopService.queryByIdWithMutex(id);  
}

使用JMeter进行压力测试,使用1000个线程发送请求,实际进行数据库查询的只有一次

{604720AC F846 43E8 AEDA 64358BEABD8A}

{997514D3 55EA 4459 80C4 2DC940545116}

5.3 案例:逻辑过期解决商品查询的缓存击穿问题

思路分析:当用户开始查询redis时,判断是否命中,如果没有命中则直接返回空数据,不查询数据库,而一旦命中后,将value取出,判断value中的过期时间是否满足,如果没有过期,则直接返回redis中的数据,如果过期,则在开启独立线程后直接返回之前的数据,独立线程去重构数据,重构完成后释放互斥锁。

1653360308731

因为现在redis中存储的数据的value需要带上过期时间,因此我们有两个选择:

  1. 修改原来的实体类
  2. 新建RedisData实体类

由于第一种方式对原来的代码有侵入性,因此我们选择第二种。

  1. 构建RedisData类
@Data  
public class RedisData {  
    private LocalDateTime expireTime;  
    private Object data;  
}
  1. 构建queryByIdWithLogicExpire()方法
private Shop queryWithLogicExpire(Long id) {  
    String key = CACHE_SHOP_KEY + id;  
    String redisJson = stringRedisTemplate.opsForValue().get(key);  
    if (StrUtil.isBlank(redisJson)) {  
        return null; // 未命中  
    }  
  
    // 反序列化  
    RedisData redisData = JSONUtil.toBean(redisJson, RedisData.class);  
    Shop shop = JSONUtil.toBean((JSONObject) redisData.getData(), Shop.class);  
    LocalDateTime expireTime = redisData.getExpireTime();  
  
    // 判断是否过期  
    if (expireTime.isAfter(LocalDateTime.now())) {  
        return shop; // 未过期  
    }  
  
    // 已过期,尝试重建缓存  
    if (tryLock(LOCK_SHOP_KEY + id)) {  
        CACHE_REBUILD_EXECUTOR.submit(() -> {  
            try {  
                saveShop2Redis(id, 20L);  
            } catch (Exception e) {  
                throw new RuntimeException(e);  
            } finally {  
                unlock(LOCK_SHOP_KEY + id);  
            }  
        });  
    }  
  
    // 返回旧数据  
    return shop;  
}  
  
public void saveShop2Redis(Long id, Long expireSeconds) throws InterruptedException {  
    Shop shop = getById(id);  
    // 模拟延时  
    Thread.sleep(200);  
    if (shop == null) {  
        RedisData redisData = new RedisData();  
        redisData.setData(null);  
        redisData.setExpireTime(LocalDateTime.now().plusMinutes(CACHE_NULL_TTL));  
        stringRedisTemplate.opsForValue().set(CACHE_SHOP_KEY + id, JSONUtil.toJsonStr(redisData), CACHE_NULL_TTL, TimeUnit.MINUTES);  
        return;    }  
    RedisData redisData = new RedisData();  
    redisData.setData(shop);  
    redisData.setExpireTime(LocalDateTime.now().plusSeconds(expireSeconds));  
    stringRedisTemplate.opsForValue().set(CACHE_SHOP_KEY + id, JSONUtil.toJsonStr(redisData));  
}  
  
private boolean tryLock(String key) {  
    Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", 10, TimeUnit.SECONDS);  
    return BooleanUtil.isTrue(flag);  
}  
  
private void unlock(String key) {  
    stringRedisTemplate.delete(key);  
}
  1. 方法调用

ShopServiceImpl.java

@Override  
public Result queryByIdWithLogicExpire(Long id) {  
    // 1. 调用方法  
    Shop shop = queryWithLogicExpire(id);  
    // 2. 判断店铺是否存在  
    if (shop == null) {  
        return Result.fail("店铺不存在");  
    }  
    return Result.ok(shop);  
}

ShopController.java

@GetMapping("/{id}")  
public Result queryShopById(@PathVariable("id") Long id) {  
    return shopService.queryByIdWithLogicExpire(id);  
}

5.4 对逻辑过期方案的疑问

问:为什么对于未命中的数据就直接返回了,它不也应该去数据库中查询数据并保存到redis中吗?

ai:

  • 这种处理方式的考虑:

    • 逻辑过期解决缓存击穿通常用于"热点数据"的处理
    • 对于热点数据,我们会提前将数据预热到Redis中
    • 如果某个数据在Redis中完全不存在,说明它可能不是热点数据
  • 实际开发中的建议做法:

    • 如果是热点数据系统,保持现有逻辑
    • 如果是普通的缓存系统,应该修改为"缓存空值"的处理方式:

问:也就是说,在实际的情况下我们会将热点数据进行预热,在这种情况下根本不存在缓存穿透的问题(查不到就代表没有)

ai:

是的,你理解得很正确!让我详细解释一下:

  1. 热点数据的特点:
    • 预先知道哪些数据会被频繁访问
    • 系统启动时就预热到Redis中
    • 数据有更新时实时同步到Redis
  2. 在这种机制下:
    • Redis中没有数据 = 数据库中也没有该数据
    • 确实不存在缓存穿透问题
    • 不需要额外做缓存空值的处理
  3. 这种方案的应用场景:
    • 商品秒杀系统中的热门商品信息
    • 直播间的主播信息
    • 电商首页的热门品类
    • 节日活动中的活动商品
  4. 优点:
    • 性能最优,无需查询数据库
    • 系统更加稳定,避免了缓存击穿
    • 数据一致性容易保证
  5. 缺点:
    • 需要预先知道热点数据
    • 占用更多的Redis内存
    • 不适合数据频繁变化的场景

所以这段代码中直接返回null的逻辑,在纯热点数据的场景下是合理的。但如果系统既要处理热点数据又要处理普通数据,就需要区分处理策略了。

6. 封装工具类

基于StringRedisTemplate封装一个缓存工具类,满足下列需求:

  • 方法1:将任意Java对象序列化为json并存储在string类型的key中,并且可以设置TTL过期时间
  • 方法2:将任意Java对象序列化为json并存储在string类型的key中,并且可以设置逻辑过期时间,用于处理缓存击穿问题
  • 方法3:根据指定的key查询缓存,并反序列化为指定类型,利用缓存空值的方式解决缓存穿透问题
  • 方法4:根据指定的key查询缓存,并反序列化为指定类型,需要利用逻辑过期解决缓存击穿问题
@Slf4j
@Component
public class CacheClient {

    private final StringRedisTemplate stringRedisTemplate;

    private static final ExecutorService CACHE_REBUILD_EXECUTOR = Executors.newFixedThreadPool(10);

    public CacheClient(StringRedisTemplate stringRedisTemplate) {
        this.stringRedisTemplate = stringRedisTemplate;
    }

    public void set(String key, Object value, Long time, TimeUnit unit) {
        stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(value), time, unit);
    }

    public void setWithLogicalExpire(String key, Object value, Long time, TimeUnit unit) {
        // 设置逻辑过期
        RedisData redisData = new RedisData();
        redisData.setData(value);
        redisData.setExpireTime(LocalDateTime.now().plusSeconds(unit.toSeconds(time)));
        // 写入Redis
        stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(redisData));
    }

    public <R,ID> R queryWithPassThrough(
            String keyPrefix, ID id, Class<R> type, Function<ID, R> dbFallback, Long time, TimeUnit unit){
        String key = keyPrefix + id;
        // 1.从redis查询商铺缓存
        String json = stringRedisTemplate.opsForValue().get(key);
        // 2.判断是否存在
        if (StrUtil.isNotBlank(json)) {
            // 3.存在,直接返回
            return JSONUtil.toBean(json, type);
        }
        // 判断命中的是否是空值
        if (json != null) {
            // 返回一个错误信息
            return null;
        }

        // 4.不存在,根据id查询数据库
        R r = dbFallback.apply(id);
        // 5.不存在,返回错误
        if (r == null) {
            // 将空值写入redis
            stringRedisTemplate.opsForValue().set(key, "", CACHE_NULL_TTL, TimeUnit.MINUTES);
            // 返回错误信息
            return null;
        }
        // 6.存在,写入redis
        this.set(key, r, time, unit);
        return r;
    }

    public <R, ID> R queryWithLogicalExpire(
            String keyPrefix, ID id, Class<R> type, Function<ID, R> dbFallback, Long time, TimeUnit unit) {
        String key = keyPrefix + id;
        // 1.从redis查询商铺缓存
        String json = stringRedisTemplate.opsForValue().get(key);
        // 2.判断是否存在
        if (StrUtil.isBlank(json)) {
            // 3.存在,直接返回
            return null;
        }
        // 4.命中,需要先把json反序列化为对象
        RedisData redisData = JSONUtil.toBean(json, RedisData.class);
        R r = JSONUtil.toBean((JSONObject) redisData.getData(), type);
        LocalDateTime expireTime = redisData.getExpireTime();
        // 5.判断是否过期
        if(expireTime.isAfter(LocalDateTime.now())) {
            // 5.1.未过期,直接返回店铺信息
            return r;
        }
        // 5.2.已过期,需要缓存重建
        // 6.缓存重建
        // 6.1.获取互斥锁
        String lockKey = LOCK_SHOP_KEY + id;
        boolean isLock = tryLock(lockKey);
        // 6.2.判断是否获取锁成功
        if (isLock){
            // 6.3.成功,开启独立线程,实现缓存重建
            CACHE_REBUILD_EXECUTOR.submit(() -> {
                try {
                    // 查询数据库
                    R newR = dbFallback.apply(id);
                    // 重建缓存
                    this.setWithLogicalExpire(key, newR, time, unit);
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }finally {
                    // 释放锁
                    unlock(lockKey);
                }
            });
        }
        // 6.4.返回过期的商铺信息
        return r;
    }

    public <R, ID> R queryWithMutex(
            String keyPrefix, ID id, Class<R> type, Function<ID, R> dbFallback, Long time, TimeUnit unit) {
        String key = keyPrefix + id;
        // 1.从redis查询商铺缓存
        String shopJson = stringRedisTemplate.opsForValue().get(key);
        // 2.判断是否存在
        if (StrUtil.isNotBlank(shopJson)) {
            // 3.存在,直接返回
            return JSONUtil.toBean(shopJson, type);
        }
        // 判断命中的是否是空值
        if (shopJson != null) {
            // 返回一个错误信息
            return null;
        }

        // 4.实现缓存重建
        // 4.1.获取互斥锁
        String lockKey = LOCK_SHOP_KEY + id;
        R r = null;
        try {
            boolean isLock = tryLock(lockKey);
            // 4.2.判断是否获取成功
            if (!isLock) {
                // 4.3.获取锁失败,休眠并重试
                Thread.sleep(50);
                return queryWithMutex(keyPrefix, id, type, dbFallback, time, unit);
            }
            // 4.4.获取锁成功,根据id查询数据库
            r = dbFallback.apply(id);
            // 5.不存在,返回错误
            if (r == null) {
                // 将空值写入redis
                stringRedisTemplate.opsForValue().set(key, "", CACHE_NULL_TTL, TimeUnit.MINUTES);
                // 返回错误信息
                return null;
            }
            // 6.存在,写入redis
            this.set(key, r, time, unit);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }finally {
            // 7.释放锁
            unlock(lockKey);
        }
        // 8.返回
        return r;
    }

    private boolean tryLock(String key) {
        Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", 10, TimeUnit.SECONDS);
        return BooleanUtil.isTrue(flag);
    }

    private void unlock(String key) {
        stringRedisTemplate.delete(key);
    }
}