服务器之家

服务器之家 > 正文

基于Redis实现分布式应用限流的方法

时间:2021-03-06 11:35     来源/作者:冷冷gg

限流的目的是通过对并发访问/请求进行限速或者一个时间窗口内的的请求进行限速来保护系统,一旦达到限制速率则可以拒绝服务。

前几天在dd的公众号,看了一篇关于使用 瓜娃 实现单应用限流的方案 --》原文,参考《redis in action》 实现了一个jedis版本的,都属于业务层次限制。 实际场景中常用的限流策略:

nginx接入层限流

按照一定的规则如帐号、ip、系统调用逻辑等在nginx层面做限流

业务应用系统限流

通过业务代码控制流量这个流量可以被称为信号量,可以理解成是一种锁,它可以限制一项资源最多能同时被多少进程访问。

代码实现

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
import redis.clients.jedis.jedis;
import redis.clients.jedis.transaction;
import redis.clients.jedis.zparams;
import java.util.list;
import java.util.uuid;
 
/**
 * @email wangiegie@gmail.com
 * @data 2017-08
 */
public class redisratelimiter {
  private static final string bucket = "bucket";
  private static final string bucket_count = "bucket_count";
  private static final string bucket_monitor = "bucket_monitor";
 
  static string acquiretokenfrombucket(
      jedis jedis, int limit, long timeout) {
    string identifier = uuid.randomuuid().tostring();
    long now = system.currenttimemillis();
    transaction transaction = jedis.multi();
 
    //删除信号量
    transaction.zremrangebyscore(bucket_monitor.getbytes(), "-inf".getbytes(), string.valueof(now - timeout).getbytes());
    zparams params = new zparams();
    params.weightsbydouble(1.0,0.0);
    transaction.zinterstore(bucket, params, bucket, bucket_monitor);
 
    //计数器自增
    transaction.incr(bucket_count);
    list<object> results = transaction.exec();
    long counter = (long) results.get(results.size() - 1);
 
    transaction = jedis.multi();
    transaction.zadd(bucket_monitor, now, identifier);
    transaction.zadd(bucket, counter, identifier);
    transaction.zrank(bucket, identifier);
    results = transaction.exec();
    //获取排名,判断请求是否取得了信号量
    long rank = (long) results.get(results.size() - 1);
    if (rank < limit) {
      return identifier;
    } else {//没有获取到信号量,清理之前放入redis 中垃圾数据
      transaction = jedis.multi();
      transaction.zrem(bucket_monitor, identifier);
      transaction.zrem(bucket, identifier);
      transaction.exec();
    }
    return null;
  }
}

调用

测试接口调用

?
1
2
3
4
5
6
7
8
9
10
11
@getmapping("/")
public void index(httpservletresponse response) throws ioexception {
  jedis jedis = jedispool.getresource();
  string token = redisratelimiter.acquiretokenfrombucket(jedis, limit, timeout);
  if (token == null) {
    response.senderror(500);
  }else{
    //todo 你的业务逻辑
  }
  jedispool.returnresource(jedis);
}

优化

使用拦截器 + 注解优化代码

拦截器

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@configuration
static class webmvcconfigurer extends webmvcconfigureradapter {
  private logger logger = loggerfactory.getlogger(webmvcconfigurer.class);
  @autowired
  private jedispool jedispool;
 
  public void addinterceptors(interceptorregistry registry) {
    registry.addinterceptor(new handlerinterceptoradapter() {
      public boolean prehandle(httpservletrequest request, httpservletresponse response,
                   object handler) throws exception {
        handlermethod handlermethod = (handlermethod) handler;
        method method = handlermethod.getmethod();
        ratelimiter ratelimiter = method.getannotation(ratelimiter.class);
        if (ratelimiter != null){
          int limit = ratelimiter.limit();
          int timeout = ratelimiter.timeout();
          jedis jedis = jedispool.getresource();
          string token = redisratelimiter.acquiretokenfrombucket(jedis, limit, timeout);
          if (token == null) {
            response.senderror(500);
            return false;
          }
          logger.debug("token -> {}",token);
          jedis.close();
        }
        return true;
      }
    }).addpathpatterns("/*");
  }
}

定义注解

?
1
2
3
4
5
6
7
8
9
10
11
12
13
/**
 * @email wangiegie@gmail.com
 * @data 2017-08
 * 限流注解
 */
 
@target(elementtype.method)
@retention(retentionpolicy.runtime)
@documented
public @interface ratelimiter {
  int limit() default 5;
  int timeout() default 1000;
}

使用

?
1
2
3
4
@ratelimiter(limit = 2, timeout = 5000)
@getmapping("/test")
public void test() {
}

并发测试

工具:apache-jmeter-3.2

说明: 没有获取到信号量的接口返回500,status是红色,获取到信号量的接口返回200,status是绿色。

当限制请求信号量为2,并发5个线程:

基于Redis实现分布式应用限流的方法

当限制请求信号量为5,并发10个线程:

基于Redis实现分布式应用限流的方法

资料

基于reids + lua的实现

总结

  1. 对于信号量的操作,使用事务操作。
  2. 不要使用时间戳作为信号量的排序分数,因为在分布式环境中,各个节点的时间差的原因,会出现不公平信号量的现象。
  3. 可以使用把这块代码抽成@ratelimiter注解,然后再方法上使用就会很方便啦
  4. 不同接口的流控,可以参考源码的里面redisratelimiterplus,无非是每个接口生成一个监控参数
  5. 源码:boding1-pig-cloud.rar

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持服务器之家。

原文链接:https://my.oschina.net/giegie/blog/1525931

标签:

相关文章

热门资讯

2020微信伤感网名听哭了 让对方看到心疼的伤感网名大全
2020微信伤感网名听哭了 让对方看到心疼的伤感网名大全 2019-12-26
yue是什么意思 网络流行语yue了是什么梗
yue是什么意思 网络流行语yue了是什么梗 2020-10-11
Intellij idea2020永久破解,亲测可用!!!
Intellij idea2020永久破解,亲测可用!!! 2020-07-29
背刺什么意思 网络词语背刺是什么梗
背刺什么意思 网络词语背刺是什么梗 2020-05-22
苹果12mini价格表官网报价 iPhone12mini全版本价格汇总
苹果12mini价格表官网报价 iPhone12mini全版本价格汇总 2020-11-13
返回顶部