使用 Redis 对用户 IP 进行接口限流

烟雨 2年前 (2023-07-28) 阅读数 19474 #Spring
文章标签 SpringRedis

一、思路

使用接口限流的主要目的在于提高系统的稳定性,防止接口被恶意打击(短时间内大量请求)。

比如要求某接口在1分钟内请求次数不超过1000次,那么应该如何设计代码呢?

下面讲两种思路,如果想看代码可直接翻到后面的代码部分。

1.1 固定时间段(旧思路)

1.1.1 思路描述

该方案的思路是:使用Redis记录固定时间段内某用户IP访问某接口的次数,其中:

  • Redis的key:用户IP + 接口方法名
  • Redis的value:当前接口访问次数。

当用户在近期内第一次访问该接口时,向Redis中设置一个包含了用户IP和接口方法名的key,value的值初始化为1(表示第一次访问当前接口)。同时,设置该key的过期时间(比如为60秒)。

之后,只要这个key还未过期,用户每次访问该接口都会导致value自增1次。

用户每次访问接口前,先从Redis中拿到当前接口访问次数,如果发现访问次数大于规定的次数(如超过1000次),则向用户返回接口访问失败的标识。

image.png

1.1.2 思路缺陷

该方案的缺点在于,限流时间段是固定的。

比如要求某接口在1分钟内请求次数不超过1000次,观察以下流程:

image.png

image.png

可以发现,00:59和01:01之间仅仅间隔了2秒,但接口却被访问了1000+999=1999次,是限流次数(1000次)的2倍!

所以在该方案中,限流次数的设置可能不起作用,仍然可能在短时间内造成大量访问。

1.2 滑动窗口(新思路)

1.2.1 思路描述

为了避免出现方案1中由于键过期导致的短期访问量增大的情况,我们可以改变一下思路,也就是把固定的时间段改成动态的:

假设某个接口在10秒内只允许访问5次。用户每次访问接口时,记录当前用户访问的时间点(时间戳),并计算前10秒内用户访问该接口的总次数。如果总次数大于限流次数,则不允许用户访问该接口。这样就能保证在任意时刻用户的访问次数不会超过1000次。

如下图,假设用户在0:19时间点访问接口,经检查其前10秒内访问次数为5次,则允许本次访问。

image.png

假设用户0:20时间点访问接口,经检查其前10秒内访问次数为6次(超出限流次数5次),则不允许本次访问。

image.png

1.2.2 Redis部分的实现

1)选用何种 Redis 数据结构

首先是需要确定使用哪个Redis数据结构。用户每次访问时,需要用一个key记录用户访问的时间点,而且还需要利用这些时间点进行范围检查。

2)为何选择 zSet 数据结构

为了能够实现范围检查,可以考虑使用Redis中的zSet有序集合。

添加一个zSet元素的命令如下:

ZADD [key] [score] [member]

它有一个关键的属性score,通过它可以记录当前member的优先级。

于是我们可以把score设置成用户访问接口的时间戳,以便于通过score进行范围检查。key则记录用户IP和接口方法名,至于member设置成什么没有影响,一个member记录了用户访问接口的时间点。因此member也可以设置成时间戳。

3)zSet 如何进行范围检查(检查前几秒的访问次数)

思路是,把特定时间间隔之前的member都删掉,留下的member就是时间间隔之内的总访问次数。然后统计当前key中的member有多少个即可。

① 把特定时间间隔之前的member都删掉。

zSet有如下命令,用于删除score范围在[min~max]之间的member:

Zremrangebyscore [key] [min] [max]

假设限流时间设置为5秒,当前用户访问接口时,获取当前系统时间戳为currentTimeMill,那么删除的score范围可以设置为:

min = 0
max = currentTimeMill - 5 * 1000

相当于把5秒之前的所有member都删除了,只留下前5秒内的key。

② 统计特定key中已存在的member有多少个。

zSet有如下命令,用于统计某个key的member总数:

 ZCARD [key]

统计的key的member总数,就是当前接口已经访问的次数。如果该数目大于限流次数,则说明当前的访问应被限流。

二、代码实现

主要是使用注解 + AOP的形式实现。

2.1.1 限流注解

Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface FixedTimeRateLimiter {
    /**
     * 定义redis中限流key前缀 例如:rate_limit:com.xxx.controller.HelloController-hello //HelloController中的hello方法
     */
    String key() default "ft_rate_limit:";

    /**
     * 限流时间,单位秒
     * @return
     */
    int time() default 60;

    /**
     * 限流时间内限流次数
     * @return
     */
    int count() default 100;

    /**
     * 限流类型:1.限制接口访问次数 2.限制ip访问次数
     * @return
     */
    LimitType limitType() default LimitType.DEFAULT;
}

2.1.2 定义lua脚本

resources/lua下新建limit.lua

- 获取redis键
local key = KEYS[1]
-- 获取第一个参数(次数)
local count = tonumber(ARGV[1])
-- 获取第二个参数(时间)
local time = tonumber(ARGV[2])
-- 获取当前流量
local current = redis.call('get', key);
-- 如果current值存在,且值大于规定的次数,则拒绝放行(直接返回当前流量)
if current and tonumber(current) > count then
    return tonumber(current)
end
-- 如果值小于规定次数,或值不存在,则允许放行,当前流量数+1  (值不存在情况下,可以自增变为1)
current = redis.call('incr', key);
-- 如果是第一次进来,那么开始设置键的过期时间。
if tonumber(current) == 1 then 
    redis.call('expire', key, time);
end
-- 返回当前流量
return tonumber(current)

2.1.3 注入Lua执行脚本

关键代码是limitScript()方法

@Configuration
public class RedisScriptConfig {
    /**
     * 定义lua脚本
     */
    @Bean
    public DefaultRedisScript<Long> limitScript(){
        DefaultRedisScript<Long> script = new DefaultRedisScript<>();
        script.setResultType(Long.class);//设置lua脚本返回值类型 需要同lua脚本中返回值一致
        script.setScriptSource(new ResourceScriptSource(new ClassPathResource("lua\\rateLimit.lua")));//读取lua文件
        return script;
    }
}

Redis相关配置:

/**
* 开启缓存支持
*/
@Slf4j
@EnableCaching
@Configuration
public class RedisConfig{
	/**
	 * RedisTemplate配置
	 * @param lettuceConnectionFactory
	 * @return
	 */
	@Bean
	public RedisTemplate<String, Object> redisTemplate(LettuceConnectionFactory lettuceConnectionFactory) {
		log.info(" --- redis config init --- ");
        Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = jacksonSerializer();
		RedisTemplate<String, Object> redisTemplate = new RedisTemplate<String, Object>();
		redisTemplate.setConnectionFactory(lettuceConnectionFactory);
		RedisSerializer<String> stringSerializer = new StringRedisSerializer();

		// key序列化
		redisTemplate.setKeySerializer(stringSerializer);
		// value序列化
		redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);
		// Hash key序列化
		redisTemplate.setHashKeySerializer(stringSerializer);
		// Hash value序列化
		redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);
		redisTemplate.afterPropertiesSet();
		return redisTemplate;
	}

	/**
	 * StringRedisTemplate配置
	 * @param lettuceConnectionFactory
	 * @return
	 */
	@Bean
	public StringRedisTemplate setStringredisTemplate(LettuceConnectionFactory lettuceConnectionFactory) {
		log.info(" --- StringRedisTemplate config init --- ");
		Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = jacksonSerializer();
		StringRedisTemplate stringRedisTemplate = new StringRedisTemplate();
		stringRedisTemplate.setConnectionFactory(lettuceConnectionFactory);
		RedisSerializer<String> stringSerializer = new StringRedisSerializer();
		// key序列化
		stringRedisTemplate.setKeySerializer(stringSerializer);
		// value序列化
		stringRedisTemplate.setValueSerializer(stringSerializer);
		// Hash key序列化
		stringRedisTemplate.setHashKeySerializer(stringSerializer);
		// Hash value序列化
		stringRedisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);
		stringRedisTemplate.afterPropertiesSet();
		return stringRedisTemplate;
	}

	private Jackson2JsonRedisSerializer<Object> jacksonSerializer() {
		Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(Object.class);
		ObjectMapper objectMapper = new ObjectMapper();
		objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
		//objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
		objectMapper.activateDefaultTyping(LaissezFaireSubTypeValidator.instance,
				ObjectMapper.DefaultTyping.NON_FINAL,
				JsonTypeInfo.As.WRAPPER_ARRAY);

		jackson2JsonRedisSerializer.setObjectMapper(objectMapper);
		return jackson2JsonRedisSerializer;
	}
}

2.1.3 定义Aop切面类

/**
 * 固定时间段内限流
 */
@Slf4j
@Aspect
@Component
public class FixedTimeRateLimitAspect {

    @Autowired
    RedisTemplate<String, Object> redisTemplate;
    @Autowired
    RedisScript<Long> redisScript; //实现类为DefaultRedisScript<Long> limitScript()

    @Pointcut("@annotation(com.zender.limiting.annotation.FixedTimeRateLimiter)")
    public void pointCut() {
    }

    @Before("pointCut()")
    public void beforeRateLimit(JoinPoint jp) {
        // 获取RateLimiter注解上的值
        MethodSignature methodSignature = (MethodSignature) jp.getSignature();
        FixedTimeRateLimiter rateLimiter = AnnotationUtils.findAnnotation(methodSignature.getMethod(), FixedTimeRateLimiter.class);
        int time = rateLimiter.time();
        int count = rateLimiter.count();
        // 构建redis中的key值
        String rateKey = getRateLimitKey(rateLimiter, methodSignature);
        log.info("redis中key值:" + rateKey);
        try {
            Long current = redisTemplate.execute(redisScript, Collections.singletonList(rateKey), time, count);
            if (current == null || current.intValue() > count) {
                log.error("固定时间段限流:限制请求数'{}',当前请求数'{}',缓存key'{}'", count, current, rateKey);
                throw new BizException(BusinessCodes.InterFaceFlow.IFF0001);
            }
        } catch (Exception e) {
            throw e;
        }


    }

    /**
     * redis中key两种类型格式为:
     * 1.  ft_rate_limit:com.xxx.controller.HelloController-hello
     * 2.  ft_rate_limit:127.0.0.1-com.xxx.controller.HelloController-hello
     *
     * @return
     */
    private String getRateLimitKey(FixedTimeRateLimiter rateLimiter, MethodSignature methodSignature) {
        StringBuilder key = new StringBuilder(rateLimiter.key());
        if (rateLimiter.limitType() == LimitType.IP) {//如果参数类型为IP
            // 获取客户端ip
            String clientIP = ServletUtil.getClientIP(((ServletRequestAttributes) RequestContextHolder.getRequestAttributes()).getRequest());
            key.append(clientIP).append("-");
        }
        Method method = methodSignature.getMethod();
        // 获取全类名
        String className = method.getDeclaringClass().getName();
        // 获取方法名
        String methodName = method.getName();
        key.append(className).append("-").append(methodName);
        // log.info("全类名+方法名 " + className + "-" + methodName);
        return key.toString();
    }
}

2.2 滑动窗口思路

2.2.1 限流注解

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface RateLimiter {
    /**
     * 定义redis中限流key前缀 例如:rate_limit:com.xxx.controller.HelloController-hello //HelloController中的hello方法
     */
    String key() default "rate_limit:";

    /**
     * 限流时间,单位秒
     * @return
     */
    int time() default 60;

    /**
     * 限流时间内限流次数
     * @return
     */
    int count() default 100;

    /**
     * 限流类型:1.限制接口访问次数 2.限制ip访问次数
     * @return
     */
    LimitType limitType() default LimitType.DEFAULT;
}

2.2.2 定义Aop切面类

/**
 * 滑动时间段内限流
 */
@Slf4j
@Aspect
@Component
public class RateLimiterAspect {

    @Autowired
    private RedisTemplate<String,Object> redisTemplate;

    @Pointcut("@annotation(com.zender.limiting.annotation.RateLimiter)")
    public void pointCut() {
    }


    /**
     * 实现限流(新思路)
     *
     * @param jp
     */
    @SuppressWarnings("unchecked")
    @Before("pointCut()")
    public void doBefore(JoinPoint jp) {
        // 获取RateLimiter注解上的值
        MethodSignature methodSignature = (MethodSignature) jp.getSignature();
        RateLimiter rateLimiter = AnnotationUtils.findAnnotation(methodSignature.getMethod(), RateLimiter.class);
        // 在 {time} 秒内仅允许访问 {count} 次。
        int time = rateLimiter.time();
        int count = rateLimiter.count();
        // 根据用户IP(可选)和接口方法,构造key
        String rateKey = getCombineKey(rateLimiter, jp);

        // 限流逻辑实现
        ZSetOperations zSetOperations = redisTemplate.opsForZSet();
        // 记录本次访问的时间结点
        long currentMs = System.currentTimeMillis();
        zSetOperations.add(rateKey, currentMs, currentMs);
        // 这一步是为了防止member一直存在于内存中
        redisTemplate.expire(rateKey, time, TimeUnit.SECONDS);
        // 移除{time}秒之前的访问记录(滑动窗口思想)
        zSetOperations.removeRangeByScore(rateKey, 0, currentMs - time * 1000L);

        // 获得当前窗口内的访问记录数
        Long current = zSetOperations.zCard(rateKey);
        // 限流判断
        if (current != null && current > count) {
            log.error("滑动窗口限流:限制请求数'{}',当前请求数'{}',缓存key'{}'", count, current, rateKey);
            throw new BizException(BusinessCodes.InterFaceFlow.IFF0001);
        }
    }

    /**
     * 把用户IP和接口方法名拼接成 redis 的 key
     *
     * @param point       切入点
     * @return 组合key
     */
    private String getCombineKey(RateLimiter rateLimiter, JoinPoint point) {
        StringBuilder key = new StringBuilder(rateLimiter.key());
        if (rateLimiter.limitType() == LimitType.IP) {
            // 如果参数类型为IP
            // 获取客户端ip
            String clientIP = ServletUtil.getClientIP(((ServletRequestAttributes) RequestContextHolder.getRequestAttributes()).getRequest());
            key.append(clientIP).append("-");
        }
        MethodSignature signature = (MethodSignature) point.getSignature();
        Method method = signature.getMethod();

        // 获取全类名
        String className = method.getDeclaringClass().getName();
        // 获取方法名
        String methodName = method.getName();
        key.append("-").append(className).append("-").append(methodName);
        // log.info("全类名 + 方法名 " + className + "-" + methodName);
        return key.toString();
    }
}

分别测试即可:

@RestController
public class TestController {
	@GetMapping("/hello")
    @RateLimiter(time = 10,count = 3,limitType = LimitType.DEFAULT) //10秒内允许访问三次
    public String hello(){
        return "hello";
    }

    @GetMapping("/hello2")
    @FixedTimeRateLimiter(time = 10,count = 3,limitType = LimitType.DEFAULT) //10秒内允许访问三次
    public String hello2(){
        return "hello2";
    }
}

image.png

image.png

版权声明

非特殊说明,本文由Zender原创或收集发布,欢迎转载。

发表评论:

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。

作者文章
热门