风之飏 發表於 2025-6-15 15:52:00

Spring Cloud Gateway实现分布式限流和熔断降级

<p>小伙伴们,你们好呀!我是老寇!一起学习学习gateway限流和熔断降级</p>
<h1 id="一限流">一、限流</h1>
<p><strong>思考:为啥需要限流?</strong></p>
<p>在一个流量特别大的业务场景中,如果不进行限流,会造成系统宕机,当大批量的请求到达后端服务时,会造成资源耗尽【CPU、内存、线程、网络带宽、数据库连接等是有限的】,进而拖垮系统。</p>
<h2 id="1常见限流算法">1.常见限流算法</h2>
<ul>
<li>漏桶算法</li>
<li>令牌桶算法</li>
</ul>
<h3 id="11漏桶算法不推荐">1.1漏桶算法(不推荐)</h3>
<p><img src="https://p0-xtjj-private.juejin.cn/tos-cn-i-73owjymdk6/b6693e0051554887bdf9d5434fe9aecf~tplv-73owjymdk6-jj-mark-v1:0:0:0:0:5o6Y6YeR5oqA5pyv56S-5Yy6IEAgS-elng==:q75.awebp?policy=eyJ2bSI6MywidWlkIjoiMTYwOTQ1ODA4ODczNTM0In0%3D&amp;rk3s=f64ab15b&amp;x-orig-authkey=f32326d3454f2ac7e96d3d06cdbb035152127018&amp;x-orig-expires=1750577544&amp;x-orig-sign=HeuvI4uL%2BZk4j0IcThvKkFRFyNE%3D"></p>
<h4 id="111原理">1.1.1.原理</h4>
<p>将请求缓存到一个队列中,然后以固定的速度处理,从而达到限流的目的</p>
<h4 id="112实现">1.1.2.实现</h4>
<p>将请求装到一个桶中,桶的容量为固定的一个值,当桶装满之后,就会将请求丢弃掉,桶底部有一个洞,以固定的速率流出。</p>
<h4 id="113举例">1.1.3.举例</h4>
<p>桶的容量为1W,有10W并发请求,最多只能将1W请求放入桶中,其余请求全部丢弃,以固定的速度处理请求</p>
<h4 id="114缺点">1.1.4.缺点</h4>
<p>处理突发流量效率低(处理请求的速度不变,效率很低)</p>
<h3 id="12令牌桶算法推荐">1.2.令牌桶算法(推荐)</h3>
<p><img src="https://p0-xtjj-private.juejin.cn/tos-cn-i-73owjymdk6/a9bed4e305734e29aa69459513c0c45b~tplv-73owjymdk6-jj-mark-v1:0:0:0:0:5o6Y6YeR5oqA5pyv56S-5Yy6IEAgS-elng==:q75.awebp?policy=eyJ2bSI6MywidWlkIjoiMTYwOTQ1ODA4ODczNTM0In0%3D&amp;rk3s=f64ab15b&amp;x-orig-authkey=f32326d3454f2ac7e96d3d06cdbb035152127018&amp;x-orig-expires=1750577544&amp;x-orig-sign=Hx4AEd%2FQbtxJwGQTTfBnStBUFak%3D"></p>
<h5 id="121原理">1.2.1.原理</h5>
<p>将请求放在一个缓冲队列中,拿到令牌后才能进行处理</p>
<h5 id="122实现">1.2.2.实现</h5>
<p>装令牌的桶大小固定,当令牌装满后,则不能将令牌放入其中;每次请求都会到桶中拿取一个令牌才能放行,没有令牌时即丢弃请求/继续放入缓存队列中等待</p>
<h4 id="123举例">1.2.3.举例</h4>
<p>桶的容量为10w个,生产1w个/s,有10W的并发请求,以每秒10W个/s速度处理,随着桶中的令牌很快用完,速度又慢慢降下来啦,而生产令牌的速度趋于一致1w个/s</p>
<h4 id="124缺点">1.2.4.缺点</h4>
<p>处理突发流量提供了系统性能,但是对系统造成了一定的压力,桶的大小不合理,甚至会压垮系统(处理1亿的并发请求,将桶的大小设置为1,这个系统一下就凉凉啦)</p>
<h2 id="2网关限流spring-cloud-gateway--redis实战">2.网关限流(Spring Cloud Gateway + Redis实战)</h2>
<h3 id="21pomxml配置">2.1.pom.xml配置</h3>
<pre><code class="language-xml">      &lt;dependency&gt;
            &lt;groupId&gt;org.springframework.boot&lt;/groupId&gt;
            &lt;artifactId&gt;spring-boot-starter-data-redis-reactive&lt;/artifactId&gt;
      &lt;/dependency&gt;
      &lt;dependency&gt;
            &lt;groupId&gt;org.springframework.cloud&lt;/groupId&gt;
            &lt;artifactId&gt;spring-cloud-starter-gateway&lt;/artifactId&gt;
            &lt;exclusions&gt;
                &lt;exclusion&gt;
                  &lt;groupId&gt;org.springframework.boot&lt;/groupId&gt;
                  &lt;artifactId&gt;spring-boot-starter-web&lt;/artifactId&gt;
                &lt;/exclusion&gt;
            &lt;/exclusions&gt;
      &lt;/dependency&gt;
    &lt;dependency&gt;
      &lt;groupId&gt;org.apache.httpcomponents&lt;/groupId&gt;
      &lt;artifactId&gt;httpclient&lt;/artifactId&gt;
    &lt;/dependency&gt;
</code></pre>
<h3 id="22yaml配置">2.2.yaml配置</h3>
<pre><code class="language-yaml">spring:
application:
    name: laokou-gateway
cloud:
    gateway:
      routes:
      - id: LAOKOU-SSO-DEMO
          uri: lb://laokou-sso-demo
          predicates:
          - Path=/sso/**
          filters:
          - StripPrefix=1
          - name: RequestRateLimiter #请求数限流,名字不能乱打
            args:
            key-resolver: "#{@ipKeyResolver}"
            redis-rate-limiter.replenishRate: 1 #生成令牌速率-设为1方便测试
            redis-rate-limiter.burstCapacity: 1 #令牌桶容量-设置1方便测试
redis:
    database: 0
    cluster:
      nodes: x.x.x.x:7003,x.x.x.x:7004,x.x.x.x:7005,x.x.x.x:7003,x.x.x.x:7004,x.x.x.x:7005
    password: laokou #密码
    timeout: 6000ms #连接超时时长(毫秒)
    jedis:
      pool:
      max-active: -1 #连接池最大连接数(使用负值表示无极限)
      max-wait: -1ms #连接池最大阻塞等待时间(使用负值表示没有限制)
      max-idle: 10 #连接池最大空闲连接
      min-idle: 5 #连接池最小空间连接
</code></pre>
<h3 id="23创建bean">2.3.创建bean</h3>
<pre><code class="language-java">@Configuration
public class RequestRateLimiterConfig {

    @Bean(value = "ipKeyResolver")
    public KeyResolver ipKeyResolver(RemoteAddressResolver remoteAddressResolver) {
            return exchange -&gt; Mono.just(remoteAddressResolver.resolve(exchange).getAddress().getHostAddress());
    }

    @Bean
    public RemoteAddressResolver remoteAddressResolver() {
            // 远程地址解析器
            return XForwardedRemoteAddressResolver.trustAll();
    }

}
</code></pre>
<h2 id="3测试限流编写java并发测试">3.测试限流(编写java并发测试)</h2>
<pre><code class="language-java">@Slf4j
public class HttpUtil {
public static void apiConcurrent(String url,Map&lt;String,String&gt; params) {
      Integer count = 200;
      //创建线程池
      ThreadPoolExecutor pool = new ThreadPoolExecutor(5, 200, 0L, TimeUnit.SECONDS, new SynchronousQueue&lt;&gt;());
      //同步工具
      CountDownLatch latch = new CountDownLatch(count);
      Map&lt;String,String&gt; dataMap = new HashMap&lt;&gt;(1);
      dataMap.put("authorize","XXXXXXX");
      for (int i = 0; i &lt; count; i++) {
            pool.execute(() -&gt; {
                try {
                  //访问网关的API接口
                  HttpUtil.doGet("http://localhost:1234/sso/laokou-demo/user",dataMap);
                } catch (IOException e) {
                  e.printStackTrace();
                }finally {
                  latch.countDown();
                }
            });
      }
      try {
            latch.await();
      } catch (InterruptedException e) {
            e.printStackTrace();
      }
    }

public static String doGet(String url, Map&lt;String, String&gt; params) throws IOException {
      //创建HttpClient对象
      CloseableHttpClient httpClient = HttpClients.createDefault();
      String resultString = "";
      CloseableHttpResponse response = null;
      try {
            //创建uri
            URIBuilder builder = new URIBuilder(url);
            if (!params.isEmpty()) {
                for (Map.Entry&lt;String, String&gt; entry : params.entrySet()) {
                  builder.addParameter(entry.getKey(), entry.getValue());
                }
            }
            URI uri = builder.build();
            //创建http GET请求
            HttpGet httpGet = new HttpGet(uri);
            List&lt;NameValuePair&gt; paramList = new ArrayList&lt;&gt;();
            RequestBuilder requestBuilder = RequestBuilder.get().setUri(new URI(url));
            requestBuilder.setEntity(new UrlEncodedFormEntity(paramList, Consts.UTF_8));
            httpGet.setHeader(new BasicHeader("Content-Type", "application/json;charset=UTF-8"));
            httpGet.setHeader(new BasicHeader("Accept", "*/*;charset=utf-8"));
            //执行请求
            response = httpClient.execute(httpGet);
            //判断返回状态是否是200
            if (response.getStatusLine().getStatusCode() == 200) {
                resultString = EntityUtils.toString(response.getEntity(), "UTF-8");
            }
      } catch (Exception e) {
            log.info("调用失败:{}",e);
      } finally {
            if (response != null) {
                response.close();
            }
            httpClient.close();
      }
      log.info("打印:{}",resultString);
      return resultString;
    }
}
</code></pre>
<p><img src="https://p0-xtjj-private.juejin.cn/tos-cn-i-73owjymdk6/d2c4919fbc3c43e1930374a83ebd539d~tplv-73owjymdk6-jj-mark-v1:0:0:0:0:5o6Y6YeR5oqA5pyv56S-5Yy6IEAgS-elng==:q75.awebp?policy=eyJ2bSI6MywidWlkIjoiMTYwOTQ1ODA4ODczNTM0In0%3D&amp;rk3s=f64ab15b&amp;x-orig-authkey=f32326d3454f2ac7e96d3d06cdbb035152127018&amp;x-orig-expires=1750577544&amp;x-orig-sign=RZu3Ty2d08Eq1bTjaYhR4nBHHhE%3D"><br>
<img src="https://p0-xtjj-private.juejin.cn/tos-cn-i-73owjymdk6/d759020bce8c4792b7c9a24acf540a30~tplv-73owjymdk6-jj-mark-v1:0:0:0:0:5o6Y6YeR5oqA5pyv56S-5Yy6IEAgS-elng==:q75.awebp?policy=eyJ2bSI6MywidWlkIjoiMTYwOTQ1ODA4ODczNTM0In0%3D&amp;rk3s=f64ab15b&amp;x-orig-authkey=f32326d3454f2ac7e96d3d06cdbb035152127018&amp;x-orig-expires=1750577544&amp;x-orig-sign=cgHTrLrIum0Q5OSaF1xmEj1Xtao%3D"></p>
<p><strong>说明这个网关限流配置是没有问题的</strong></p>
<h2 id="4源码查看">4.源码查看</h2>
<p>Spring Cloud Gateway RequestRateLimiter GatewayFilter Factory文档地址</p>
<p>工厂 <code>RequestRateLimiter GatewayFilter</code>使用一个<code>RateLimiter</code>实现来判断当前请求是否被允许继续。如果不允许,<code>HTTP 429 - Too Many Requests</code>则返回默认状态。</p>
<h3 id="41查看-requestratelimitergatewayfilterfactory">4.1.查看 RequestRateLimiterGatewayFilterFactory</h3>
<pre><code class="language-java">        @Override
        public GatewayFilter apply(Config config) {
                KeyResolver resolver = getOrDefault(config.keyResolver, defaultKeyResolver);
                RateLimiter&lt;Object&gt; limiter = getOrDefault(config.rateLimiter, defaultRateLimiter);
                boolean denyEmpty = getOrDefault(config.denyEmptyKey, this.denyEmptyKey);
                HttpStatusHolder emptyKeyStatus = HttpStatusHolder
                                .parse(getOrDefault(config.emptyKeyStatus, this.emptyKeyStatusCode));

                return (exchange, chain) -&gt; resolver.resolve(exchange).defaultIfEmpty(EMPTY_KEY).flatMap(key -&gt; {
                        if (EMPTY_KEY.equals(key)) {
                                if (denyEmpty) {
                                        setResponseStatus(exchange, emptyKeyStatus);
                                        return exchange.getResponse().setComplete();
                                }
                                return chain.filter(exchange);
                        }
                        String routeId = config.getRouteId();
                        if (routeId == null) {
                                Route route = exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_ROUTE_ATTR);
                                routeId = route.getId();
                        }
               // 执行限流
                        return limiter.isAllowed(routeId, key).flatMap(response -&gt; {

                                for (Map.Entry&lt;String, String&gt; header : response.getHeaders().entrySet()) {
                                        exchange.getResponse().getHeaders().add(header.getKey(), header.getValue());
                                }

                                if (response.isAllowed()) {
                                        return chain.filter(exchange);
                                }

                                setResponseStatus(exchange, config.getStatusCode());
                                return exchange.getResponse().setComplete();
                        });
                });
        }
</code></pre>
<h3 id="42查看redisratelimiter">4.2.查看&nbsp;RedisRateLimiter</h3>
<pre><code class="language-java">        @Override
        @SuppressWarnings("unchecked")
        public Mono&lt;Response&gt; isAllowed(String routeId, String id) {
                if (!this.initialized.get()) {
                        throw new IllegalStateException("RedisRateLimiter is not initialized");
                }
      // 这里如何加载配置?请思考
                Config routeConfig = loadConfiguration(routeId);
      // 令牌桶每秒产生令牌数量
                int replenishRate = routeConfig.getReplenishRate();
      // 令牌桶容量
                int burstCapacity = routeConfig.getBurstCapacity();
      // 请求消耗的令牌数
                int requestedTokens = routeConfig.getRequestedTokens();
                try {
                  // 键
                        List&lt;String&gt; keys = getKeys(id);
                  // 参数
                        List&lt;String&gt; scriptArgs = Arrays.asList(replenishRate + "", burstCapacity + "", "", requestedTokens + "");
                        // 调用lua脚本
                        Flux&lt;List&lt;Long&gt;&gt; flux = this.redisTemplate.execute(this.script, keys, scriptArgs);
                        return flux.onErrorResume(throwable -&gt; {
                                if (log.isDebugEnabled()) {
                                        log.debug("Error calling rate limiter lua", throwable);
                                }
                                return Flux.just(Arrays.asList(1L, -1L));
                        }).reduce(new ArrayList&lt;Long&gt;(), (longs, l) -&gt; {
                                longs.addAll(l);
                                return longs;
                        }).map(results -&gt; {
                        // 判断是否等于1,1表示允许通过,0表示不允许通过
                                boolean allowed = results.get(0) == 1L;
                                Long tokensLeft = results.get(1);
                                Response response = new Response(allowed, getHeaders(routeConfig, tokensLeft));
                                if (log.isDebugEnabled()) {
                                        log.debug("response: " + response);
                                }
                                return response;
                        });
                }
                catch (Exception e) {
                        log.error("Error determining if user allowed from redis", e);
                }
                return Mono.just(new Response(true, getHeaders(routeConfig, -1L)));
        }

        static List&lt;String&gt; getKeys(String id) {
                String prefix = "request_rate_limiter.{" + id;
                String tokenKey = prefix + "}.tokens";
                String timestampKey = prefix + "}.timestamp";
                return Arrays.asList(tokenKey, timestampKey);
        }
</code></pre>
<p><strong>思考:redis限流配置是如何加载?</strong></p>
<p>其实就是监听动态路由的事件并把配置存起来</p>
<p><img src="https://p0-xtjj-private.juejin.cn/tos-cn-i-73owjymdk6/eb5d954ce9594303b456cd33f7635f49~tplv-73owjymdk6-jj-mark-v1:0:0:0:0:5o6Y6YeR5oqA5pyv56S-5Yy6IEAgS-elng==:q75.awebp?policy=eyJ2bSI6MywidWlkIjoiMTYwOTQ1ODA4ODczNTM0In0%3D&amp;rk3s=f64ab15b&amp;x-orig-authkey=f32326d3454f2ac7e96d3d06cdbb035152127018&amp;x-orig-expires=1750577544&amp;x-orig-sign=FPi2NO%2Fr7XgKwVk7i%2BeDR8pDzPc%3D"></p>
<h3 id="43重点来了令牌桶-meta-infscriptsrequest_rate_limiterlua-脚本剖析">4.3.重点来了,令牌桶 /META-INF/scripts/request_rate_limiter.lua 脚本剖析</h3>
<pre><code class="language-lua">-- User Request Rate Limiter filter
-- See https://stripe.com/blog/rate-limiters
-- See https://gist.github.com/ptarjan/e38f45f2dfe601419ca3af937fff574d#file-1-check_request_rate_limiter-rb-L11-L34

-- 令牌桶算法工作原理
-- 1.系统以恒定速率往桶里面放入令牌
-- 2.请求需要被处理,则需要从桶里面获取一个令牌
-- 3.如果桶里面没有令牌可获取,则可以选择等待或直接拒绝并返回

-- 令牌桶算法工作流程
-- 1.计算填满令牌桶所需要的时间(填充时间 = 桶容量 / 速率)
-- 2.设置存储数据的TTL(过期时间),为填充时间的两倍(存储时间 = 填充时间 * 2)
-- 3.从Redis获取当前令牌的剩余数量和上一次调用的时间戳
-- 4.计算距离上一次调用的时间间隔(时间间隔 = 当前时间 - 上一次调用时间)
-- 5.计算填充的令牌数量(填充令牌数量 = 时间间隔 * 速率)【前提:桶容量是固定的,不存在无限制的填充】
-- 6.判断是否有足够多的令牌满足请求【 (填充令牌数量 + 剩余令牌数量) &gt;= 请求数量 &amp;&amp; (填充令牌数量 + 剩余令牌数量) &lt;= 桶容量 】
-- 7.如果请求被允许,则从桶里面取出相应数据的令牌
-- 8.如果TTL为正,则更新Redis键中的令牌和时间戳
-- 9.返回两个两个参数(allowed_num:请求被允许标志。1允许,0不允许)、(new_tokens:填充令牌后剩余的令牌数据)

-- 随机写入
redis.replicate_commands()

-- 令牌桶Key -&gt; 存储当前可用令牌的数量(剩余令牌数量)
local tokens_key = KEYS

-- 时间戳Key -&gt; 存储上次令牌刷新的时间戳
local timestamp_key = KEYS

-- 令牌填充速率
local rate = tonumber(ARGV)

-- 令牌桶容量
local capacity = tonumber(ARGV)

-- 当前时间
local now = tonumber(ARGV)

-- 请求数量
local requested = tonumber(ARGV)

-- 填满令牌桶所需要的时间
local fill_time = capacity / rate

-- 设置key的过期时间(填满令牌桶所需时间的2倍)
local ttl = math.floor(fill_time * 2)

-- 判断当前时间,为空则从redis获取
if now == nil then
    now = redis.call('TIME')
end

-- 获取当前令牌的剩余数量
local last_tokens = tonumber(redis.call("get", tokens_key))
if last_tokens == nil then
    last_tokens = capacity
end

-- 获取上一次调用的时间戳
local last_refreshed = tonumber(redis.call('get', timestamp_key))
if last_refreshed == nil then
    last_refreshed = 0
end

-- 计算距离上一次调用的时间间隔
local delta = math.max(0, now - last_refreshed)

-- 当前的令牌数量(剩余 + 填充 &lt;= 桶容量)
local now_tokens = math.min(capacity, last_refreshed + (rate * delta))

-- 判断是否有足够多的令牌满足请求
local allowed = now_tokens &gt;= requested

-- 定义当前令牌的剩余数量
local new_tokens = now_tokens

-- 定义被允许标志
local allowed_num = 0
if allowed then
    new_tokens = now_tokens - requested
    -- 允许访问
    allowed_num = 1
end

-- ttl &gt; 0,将当前令牌的剩余数量和当前时间戳存入redis
if ttl &gt; 0 then
    redis.call('setex', tokens_key, ttl, new_tokens)
    redis.call('setex', timestamp_key, ttl, now)
end

-- 返回参数
return { allowed_num, new_tokens }
</code></pre>
<h3 id="44查看-gatewayredisautoconfiguration-脚本初始化">4.4.查看 GatewayRedisAutoConfiguration 脚本初始化</h3>
<pre><code class="language-java">        @Bean
        @SuppressWarnings("unchecked")
        public RedisScript redisRequestRateLimiterScript() {
                DefaultRedisScript redisScript = new DefaultRedisScript&lt;&gt;();
                redisScript.setScriptSource(
                        // 根据指定路径获取lua脚本来初始化配置
                                new ResourceScriptSource(new ClassPathResource("META-INF/scripts/request_rate_limiter.lua")));
                redisScript.setResultType(List.class);
                return redisScript;
        }

        @Bean
        @ConditionalOnMissingBean
        public RedisRateLimiter redisRateLimiter(ReactiveStringRedisTemplate redisTemplate,
                        @Qualifier(RedisRateLimiter.REDIS_SCRIPT_NAME) RedisScript&lt;List&lt;Long&gt;&gt; redisScript,
                        ConfigurationService configurationService) {
                return new RedisRateLimiter(redisTemplate, redisScript, configurationService);
        }
</code></pre>
<p><strong>思考:请求限流过滤器是如何开启?</strong></p>
<p>1.通过yaml配置开启</p>
<pre><code class="language-yaml">spring:
cloud:
    gateway:
      server:
      webflux:
          filter:
            request-rate-limiter:
            enabled: true
</code></pre>
<p>2.GatewayAutoConfiguration自动注入bean</p>
<pre><code class="language-java">@Bean
@ConditionalOnBean({ RateLimiter.class, KeyResolver.class })
@ConditionalOnEnabledFilter
public RequestRateLimiterGatewayFilterFactory requestRateLimiterGatewayFilterFactory(RateLimiter rateLimiter,
       KeyResolver resolver) {
    return new RequestRateLimiterGatewayFilterFactory(rateLimiter, resolver);
}
</code></pre>
<p>重点来了,真正加载这个bean的是 <code>@ConditionalOnEnabledFilter</code> 注解进行判断</p>
<pre><code class="language-java">@Retention(RetentionPolicy.RUNTIME)
@Target({ ElementType.TYPE, ElementType.METHOD })
@Documented
@Conditional(OnEnabledFilter.class)
public @interface ConditionalOnEnabledFilter {

    // 这里value是用来指定满足条件的某些类,换一句话说,就是这些类都加载或注入到ioc容器,这个注解修饰的自动装配类才会生效
    Class&lt;? extends GatewayFilterFactory&lt;?&gt;&gt; value() default OnEnabledFilter.DefaultValue.class;

}
</code></pre>
<p>我们继续跟进代码,查看<code>@Conditional(OnEnabledFilter.class)</code></p>
<p>众所周知,<code>@Conditional</code>可以用来加载满足条件的bean,所以,我们分析一下OnEnabledFilter</p>
<pre><code class="language-java">public class OnEnabledFilter extends OnEnabledComponent&lt;GatewayFilterFactory&lt;?&gt;&gt; {}
</code></pre>
<p>我分析它的父类,这里有你想要的答案!</p>
<pre><code class="language-java">public abstract class OnEnabledComponent&lt;T&gt; extends SpringBootCondition implements ConfigurationCondition {

    private static final String PREFIX = "spring.cloud.gateway.server.webflux.";

    private static final String SUFFIX = ".enabled";

    private ConditionOutcome determineOutcome(Class&lt;? extends T&gt; componentClass, PropertyResolver resolver) {
       // 拼接完整名称
       // 例如 =&gt; spring.cloud.gateway.server.webflux.request-rate-limiter.enabled
       String key = PREFIX + normalizeComponentName(componentClass) + SUFFIX;
       ConditionMessage.Builder messageBuilder = forCondition(annotationClass().getName(), componentClass.getName());
       if ("false".equalsIgnoreCase(resolver.getProperty(key))) {
          // 不满足条件不加载bean
          return ConditionOutcome.noMatch(messageBuilder.because("bean is not available"));
       }
       // 满足条件加载bean
       return ConditionOutcome.match();
    }
}
</code></pre>
<h2 id="5优化限流响应使用全限定类名直接覆盖类">5.优化限流响应[使用全限定类名直接覆盖类]</h2>
<p>小伙伴们,有没有发现,这个这个响应体封装的不太好,因此,我们来自定义吧,我们直接覆盖类,代码修改如下</p>
<pre><code class="language-java">@Getter
@ConfigurationProperties("spring.cloud.gateway.server.webflux.filter.request-rate-limiter")
public class RequestRateLimiterGatewayFilterFactory
       extends AbstractGatewayFilterFactory&lt;RequestRateLimiterGatewayFilterFactory.Config&gt; {

    private static final String EMPTY_KEY = "____EMPTY_KEY__";

    private final RateLimiter&lt;?&gt; defaultRateLimiter;

    private final KeyResolver defaultKeyResolver;

    /**
   * Switch to deny requests if the Key Resolver returns an empty key, defaults to true.
   */
    @Setter
    private boolean denyEmptyKey = true;

    /** HttpStatus to return when denyEmptyKey is true, defaults to FORBIDDEN. */
    @Setter
    private String emptyKeyStatusCode = HttpStatus.FORBIDDEN.name();

    public RequestRateLimiterGatewayFilterFactory(RateLimiter&lt;?&gt; defaultRateLimiter, KeyResolver defaultKeyResolver) {
       super(Config.class);
       this.defaultRateLimiter = defaultRateLimiter;
       this.defaultKeyResolver = defaultKeyResolver;
    }

    @Override
    public GatewayFilter apply(Config config) {
       KeyResolver resolver = getOrDefault(config.keyResolver, defaultKeyResolver);
       RateLimiter&lt;?&gt; limiter = getOrDefault(config.rateLimiter, defaultRateLimiter);
       boolean denyEmpty = getOrDefault(config.denyEmptyKey, this.denyEmptyKey);
       HttpStatusHolder emptyKeyStatus = HttpStatusHolder
          .parse(getOrDefault(config.emptyKeyStatus, this.emptyKeyStatusCode));
       return (exchange, chain) -&gt; resolver.resolve(exchange).defaultIfEmpty(EMPTY_KEY).flatMap(key -&gt; {
          if (EMPTY_KEY.equals(key)) {
             if (denyEmpty) {
                setResponseStatus(exchange, emptyKeyStatus);
                return exchange.getResponse().setComplete();
             }
             return chain.filter(exchange);
          }
          String routeId = config.getRouteId();
          if (routeId == null) {
             Route route = exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_ROUTE_ATTR);
             Assert.notNull(route, "Route is null");
             routeId = route.getId();
          }
          return limiter.isAllowed(routeId, key).flatMap(response -&gt; {
             for (Map.Entry&lt;String, String&gt; header : response.getHeaders().entrySet()) {
                exchange.getResponse().getHeaders().add(header.getKey(), header.getValue());
             }
             if (response.isAllowed()) {
                return chain.filter(exchange);
             }
             // 主要修改这行
             return responseOk(exchange, Result.fail("Too_Many_Requests", "请求太频繁"));
          });
       });
    }
   
    private Mono&lt;Void&gt; responseOk(ServerWebExchange exchange, Object data) {
      return responseOk(exchange, JacksonUtils.toJsonStr(data), MediaType.APPLICATION_JSON);
    }

    private Mono&lt;Void&gt; responseOk(ServerWebExchange exchange, String str, MediaType contentType) {
      DataBuffer buffer = exchange.getResponse().bufferFactory().wrap(str.getBytes(StandardCharsets.UTF_8));
      ServerHttpResponse response = exchange.getResponse();
      response.setStatusCode(HttpStatus.OK);
      response.getHeaders().setContentType(contentType);
      response.getHeaders().setContentLength(str.getBytes(StandardCharsets.UTF_8).length);
      return response.writeWith(Flux.just(buffer));
    }

    private &lt;T&gt; T getOrDefault(T configValue, T defaultValue) {
       return (configValue != null) ? configValue : defaultValue;
    }

    public static class Config implements HasRouteId {

       @Getter
       private KeyResolver keyResolver;

       @Getter
       private RateLimiter&lt;?&gt; rateLimiter;

       @Getter
       private HttpStatus statusCode = HttpStatus.TOO_MANY_REQUESTS;

       @Getter
       private Boolean denyEmptyKey;

       @Getter
       private String emptyKeyStatus;

       private String routeId;

       public Config setKeyResolver(KeyResolver keyResolver) {
          this.keyResolver = keyResolver;
          return this;
       }

       public Config setRateLimiter(RateLimiter&lt;?&gt; rateLimiter) {
          this.rateLimiter = rateLimiter;
          return this;
       }

       public Config setStatusCode(HttpStatus statusCode) {
          this.statusCode = statusCode;
          return this;
       }

       public Config setDenyEmptyKey(Boolean denyEmptyKey) {
          this.denyEmptyKey = denyEmptyKey;
          return this;
       }

       public Config setEmptyKeyStatus(String emptyKeyStatus) {
          this.emptyKeyStatus = emptyKeyStatus;
          return this;
       }

       @Override
       public void setRouteId(String routeId) {
          this.routeId = routeId;
       }

       @Override
       public String getRouteId() {
          return this.routeId;
       }

    }

}
</code></pre>
<h1 id="二熔断降级">二、熔断降级</h1>
<p><strong>思考:为什么需要熔断降级?</strong></p>
<p>当某个服务发生故障时(超时,响应慢,宕机),上游服务无法及时获取响应,进而也导致故障,出现服务雪崩【服务雪崩是指故障像滚雪球一样沿着调用链向上游扩展,进而导致整个系统瘫痪】</p>
<p>熔断降级的目标就是在故障发生时,快速隔离问题服务【快速失败,防止资源耗尽】,保护系统资源不被耗尽,防止故障扩散,保护核心业务可用性。</p>
<h2 id="1技术选型">1.技术选型</h2>
<h3 id="11熔断降级框架选型对比表">1.1.熔断降级框架选型对比表</h3>
<table>
<thead>
<tr>
<th><strong>对比维度</strong></th>
<th><strong>Hystrix</strong>&nbsp;(Netflix)</th>
<th><strong>Sentinel</strong>&nbsp;(Alibaba)</th>
<th><strong>Resilience4j</strong></th>
</tr>
</thead>
<tbody>
<tr>
<td><strong>当前状态</strong></td>
<td>❌ 停止更新 (维护模式)</td>
<td>✅ 持续更新</td>
<td>✅ 持续更新</td>
</tr>
<tr>
<td><strong>熔断机制</strong></td>
<td>滑动窗口计数</td>
<td>响应时间/异常比例/QPS</td>
<td>错误率/响应时间阈值</td>
</tr>
<tr>
<td><strong>流量控制</strong></td>
<td>❌ 仅基础隔离</td>
<td>✅ QPS/并发数/热点参数/<strong>集群流控</strong></td>
<td>✅ RateLimiter</td>
</tr>
<tr>
<td><strong>隔离策略</strong></td>
<td>线程池(开销大)/信号量</td>
<td>并发线程数(无线程池开销)</td>
<td>信号量/Bulkhead</td>
</tr>
<tr>
<td><strong>降级能力</strong></td>
<td>Fallback 方法</td>
<td>Fallback +&nbsp;<strong>系统规则自适应</strong></td>
<td>Fallback + 自定义组合策略</td>
</tr>
<tr>
<td><strong>实时监控</strong></td>
<td>✅ Hystrix Dashboard</td>
<td>✅&nbsp;<strong>原生控制台</strong>(可视化动态规则)</td>
<td>❌ 需整合 Prometheus/Grafana</td>
</tr>
<tr>
<td><strong>动态配置</strong></td>
<td>❌ 依赖 Archaius</td>
<td>✅&nbsp;<strong>控制台实时推送</strong></td>
<td>✅ 需编码实现(如Spring Cloud Config)</td>
</tr>
<tr>
<td><strong>生态集成</strong></td>
<td>✅ Spring Cloud Netflix</td>
<td>✅ Spring Cloud Alibaba/<strong>多语言网关</strong></td>
<td>✅ Spring Boot/<strong>响应式编程</strong></td>
</tr>
<tr>
<td><strong>性能开销</strong></td>
<td>高(线程池隔离)</td>
<td>低(无额外线程)</td>
<td>极低(纯函数式)</td>
</tr>
<tr>
<td><strong>适用场景</strong></td>
<td>遗留系统维护</td>
<td><strong>高并发控制</strong>/秒杀/热点防护</td>
<td><strong>云原生</strong>/轻量级微服务</td>
</tr>
<tr>
<td><strong>推荐指数</strong></td>
<td>⭐⭐ (不推荐新项目)</td>
<td>⭐⭐⭐⭐⭐ (Java高并发首选)</td>
<td>⭐⭐⭐⭐⭐ (云原生/响应式首选)</td>
</tr>
</tbody>
</table>
<h3 id="12选型决策指南">1.2选型决策指南</h3>
<table>
<thead>
<tr>
<th><strong>需求场景</strong></th>
<th><strong>推荐方案</strong></th>
<th><strong>原因</strong></th>
</tr>
</thead>
<tbody>
<tr>
<td>电商秒杀/API高频调用管控</td>
<td>✅&nbsp;<strong>Sentinel</strong></td>
<td>精细流量控制+热点防护+实时看板</td>
</tr>
<tr>
<td>Kubernetes云原生微服务</td>
<td>✅&nbsp;<strong>Resilience4j</strong></td>
<td>轻量化+无缝集成Prometheus+响应式支持</td>
</tr>
<tr>
<td>Spring Cloud Netflix旧系统</td>
<td>⚠️ Hystrix</td>
<td>兼容现存代码(短期过渡)</td>
</tr>
<tr>
<td>多语言混合架构(如Go+Java)</td>
<td>✅&nbsp;<strong>Sentinel</strong></td>
<td>通过Sidecar代理支持非Java服务</td>
</tr>
<tr>
<td>响应式编程(WebFlux)</td>
<td>✅&nbsp;<strong>Resilience4j</strong></td>
<td>原生Reactive API支持</td>
</tr>
</tbody>
</table>
<h2 id="2resilience4j使用">2.Resilience4j使用</h2>
<p>Resilience4j官方文档</p>
<p><code>Resilience4j</code> 可以看作是 <code>Hystrix</code> 的替代品,Resilience4j支持 <code>熔断器</code>和<code>单机限流</code></p>
<p>Resilience4j 是一个专为函数式编程设计的轻量级容错库。Resilience4j 提供高阶函数(装饰器),可通过断路器、速率限制器、重试或隔离功能增强任何函数式接口、lambda 表达式或方法引用。您可以在任何函数式接口、lambda 表达式或方法引用上堆叠多个装饰器。这样做的好处是,您可以只选择所需的装饰器,而无需考虑其他因素。</p>
<h3 id="21网关熔断降级spring-cloud-gateway-resilience4j实战">2.1.网关熔断降级(Spring Cloud Gateway +&nbsp;Resilience4j实战)</h3>
<h5 id="211pom依赖">2.1.1.pom依赖</h5>
<pre><code class="language-xml">&lt;dependency&gt;
&lt;groupId&gt;org.springframework.cloud&lt;/groupId&gt;
&lt;artifactId&gt;spring-cloud-starter-circuitbreaker-reactor-resilience4j&lt;/artifactId&gt;
&lt;/dependency&gt;
</code></pre>
<h4 id="212yaml配置">2.1.2.yaml配置</h4>
<pre><code class="language-yml">spring:
application:
    name: laokou-gateway
cloud:
    gateway:
       server:
      webflux:
          routes:
            - id: LAOKOU-SSO-DEMO
            uri: lb://laokou-sso-demo
            predicates:
            - Path=/sso/**
            filters:
            - name: CircuitBreaker
                args:
                  name: default
                  fallbackUri: "forward:/fallback"
          filter:
            circuit-breaker:
            enabled: true
</code></pre>
<h4 id="213circuitbreakerconfig配置">2.1.3.CircuitBreakerConfig配置</h4>
<pre><code class="language-java">/**
* @author laokou
*/
@Configuration
public class CircuitBreakerConfig {

    @Bean
    public RouterFunction&lt;ServerResponse&gt; routerFunction() {
       return RouterFunctions.route(
             RequestPredicates.path("/fallback").and(RequestPredicates.accept(MediaType.TEXT_PLAIN)),
             (request) -&gt; ServerResponse.status(HttpStatus.SC_OK)
                .contentType(MediaType.APPLICATION_JSON)
                .body(BodyInserters.fromValue(Result.fail("Service_Unavailable", "服务正在维护"))));
    }

    @Bean
    public Customizer&lt;ReactiveResilience4JCircuitBreakerFactory&gt; reactiveResilience4JCircuitBreakerFactoryCustomizer() {
       return factory -&gt; factory.configureDefault(id -&gt; new Resilience4JConfigBuilder(id)
          // 3秒后超时时间
          .timeLimiterConfig(TimeLimiterConfig.custom().timeoutDuration(Duration.ofSeconds(3)).build())
          .circuitBreakerConfig(io.github.resilience4j.circuitbreaker.CircuitBreakerConfig.ofDefaults())
          .build());
    }

}
</code></pre>
<p>我是老寇,我们下次再见啦!</p><br><br>
来源:https://www.cnblogs.com/koushenhai/p/18929660
頁: [1]
查看完整版本: Spring Cloud Gateway实现分布式限流和熔断降级