Redis 缓存穿透的解决技巧与实践

Redis 是一种快速且高效的缓存工具,在很多 Web 应用程序中都被广泛使用。但是,在实现 Redis 缓存时,我们可能会遇到一些问题,例如缓存穿透。

缓存穿透是指当一个请求查询一个缓存中不存在的数据时,由于缓存未能返回结果,该请求将直接传递给数据库层。如果该请求由于某种原因持续发生,将大大降低服务器的性能,因为它会对数据库进行大量的查询,而数据库很可能无法承受这种负载。

本篇文章将介绍 Redis 缓存穿透的解决技巧和实践,以及如何实现这些技巧和跟踪缺少的数据,从而使 Redis 缓存更加高效和优秀。

Redis 缓存穿透的解决技巧

布隆过滤器

布隆过滤器是一种旨在帮助我们快速透视缓存中是否存在指定数据的算法。在缓存中打开布隆过滤器时,缓存会先将所有的键存储在这个布隆过滤器中。当一个新的缓存查询到来时,Redis 首先看一下布隆过滤器,看看它是否已经常常该请求。如果是,Redis 将返回一个虚假的结果,告诉请求者它不存在于缓存中。如果不是,将继续执行查询并将结果存储在缓存中。

这种技巧的优点是快速、有效,且不会影响正常的缓存查询,缺点是有一定误报率,即有可能将存在于缓存中的数据错误地报为缓存外的数据。但是我们可以通过调整布隆过滤器的大小和 hash 函数的数量,来调整误报率。所以布隆过滤器是一种非常高效的解决 Redis 缓存穿透的技巧。

下面是使用 Python 实现布隆过滤器的示例代码:

import redis
import mmh3
from bitarray import bitarray


class BloomFilter:
    def __init__(self, r, n):
        self.r = r
        self.n = n
        self.p = 0.0001
        self.m = int(self.get_size())
        self.k = int(self.get_hash_count())
        self.bitarray = bitarray(self.m)
        self.redis = redis.Redis(host='localhost', port=6379)

    def add_to_redis(self, key, value):
        self.redis.set(key, value)

    def get_from_redis(self, key):
        return self.redis.get(key)

    def add(self, item):
        for salt in range(self.k):
            index = mmh3.hash(item, salt) % self.m
            self.bitarray[index] = True
            self.add_to_redis(index, True)

    def query(self, item):
        for salt in range(self.k):
            index = mmh3.hash(item, salt) % self.m
            if not self.get_from_redis(index):
                return False

        return True

    def get_size(self):
        return -1 * ((self.n * math.log(self.p)) / (math.log(2) ** 2))

    def get_hash_count(self):
        return (self.m / self.n) * math.log(2)

预处理

在遭受缓存穿透攻击时,可以考虑在 Redis 缓存中用一些随机的、程序生成的“填充值”占用缓存中的一些键。例如,我们可以用类似于以下示例代码来预填充需要缓存的数据:

import redis


redis_client = redis.Redis()

def get_data(key):
    data = redis_client.get(key)
    if data is None:
        if is_valid_data(key):
            data = lookup_data(key)
            redis_client.set(key, data)
        else:
            redis_client.set(key, '#NA')
            data = None
    else:
        if data == '#NA':
            return None
        else:
            return data

    return data

在这个示例代码中,我们为 Redis 缓存中不存在的所有键占用了“#Na”值,这样它们就会被视为缓存中的数据,不会再次进行数据库查询。

缓存雪崩

缓存雪崩是指在某一特定时间段缓存服务器中有大量的数据过期或者某个组件宕机,导致一些数据没有缓存到内存中。解决这个问题最简单的方式就是,增加缓存层的冗余度,将数据分散到不同的缓存服务器上。这种方式不仅可以防止缓存雪崩,还可以提高可扩展性、可循环性和可维护性。

Redis 缓存穿透的实践

下面是一个 Python 示例,用于演示如何使用 Redis 做缓存和布隆过滤器存储,以避免缓存穿透的问题:

import redis
import requests
import json
import mmh3
from bitarray import bitarray


def is_valid_data(key):
    url = "https://some_api.com/api/v1/is_valid?key={}".format(key)
    response = requests.get(url)

    if response.status_code == 200:
        data = json.loads(response.text)
        return data['valid']

    return False


def lookup_data(key):
    url = "https://some_api.com/api/v1/get_data?key={}".format(key)
    response = requests.get(url)

    if response.status_code == 200:
        data = json.loads(response.text)
        return data['data']

    return None


class RedisCache(object):
    def __init__(self):
        self.redis = redis.StrictRedis(host='localhost', port=6379, db=0)

    def set(self, key, value, expiration=None):
        self.redis.set(key, value)
        if expiration:
            self.redis.expire(key, expiration)

    def get(self, key):
        return self.redis.get(key)


class BloomFilter:
    def __init__(self, r, n):
        self.r = r
        self.n = n
        self.p = 0.0001
        self.m = int(self.get_size())
        self.k = int(self.get_hash_count())
        self.bitarray = bitarray(self.m)
        self.redis = redis.Redis(host='localhost', port=6379)

    def add_to_redis(self, key, value):
        self.redis.set(key, value)

    def get_from_redis(self, key):
        return self.redis.get(key)

    def add(self, item):
        for salt in range(self.k):
            index = mmh3.hash(item, salt) % self.m
            self.bitarray[index] = True
            self.add_to_redis(index, True)

    def query(self, item):
        for salt in range(self.k):
            index = mmh3.hash(item, salt) % self.m
            if not self.get_from_redis(index):
                return False

        return True

    def get_size(self):
        return -1 * ((self.n * math.log(self.p)) / (math.log(2) ** 2))

    def get_hash_count(self):
        return (self.m / self.n) * math.log(2)


class CacheWrapper(object):
    def __init__(self, cache=None):
        self.cache = cache or RedisCache()
        self.filter = BloomFilter(100000, 0.0001)

    def get(self, key):
        if self.filter.query(key):
            data = self.cache.get(key)
            if data is None:
                return None
            else:
                return json.loads(data)

        return None

    def set(self, key, value, expiration=None):
        self.filter.add(key)
        return self.cache.set(key, json.dumps(value), expiration)

总结

Redis 缓存穿透是一个常见的问题,但是我们可以使用一些技巧和实践来避免这个问题。这篇文章介绍了一些解决 Redis 缓存穿透问题的技巧,如布隆过滤器和预处理,以及实现缓存穿透解决方案的示例代码。我们可以根据实际情况来选择适合我们业务需求的解决方案。

来源:JavaScript中文网 ,转载请注明来源 本文地址:https://www.javascriptcn.com/post/65b14ce8add4f0e0ffa923ff