专栏名称: 房东的小黑黑
学生
今天看啥  ›  专栏  ›  房东的小黑黑

Sentinel之集群限流

房东的小黑黑  · 掘金  ·  · 2020-05-07 06:12

文章预览

阅读 60

Sentinel之集群限流

微信公众号: 房东的小黑黑
路途随遥远,将来更美好
学海无涯,大家一起加油!

之前的限流功能都是单机版的,只能统计本地的服务调用次数信息,那么如果是在集群状态下,一个服务被放在了多个服务器上,假设一个集群有5台机器,每台机器单机限流阈值为10qps,理想状态下整个集群的限流阈值就是50qps,不过实际状态下路由到每台机器的流量可能会不均匀,会导致总量没有到的情况下某些机器就开始限流。

每个单机实例只关心自己的阈值,但是对于整个系统的全局阈值大家都漠不关心,当我们希望为某个api设置一个总的Qps时,那么单机模式下的限流就无法满足条件。
单机版是在每个实例中进行统计,而集群版是有一个专门的实例进行统计。

这个专门用来统计数据的称为Sentinel的token server,其他的实例作为Sentinel的token client会向token server去请求token,如果能获取到token,则说明当前的qps还未达到总的阈值,否则就说明已经达到集群的总阈值,当前实例就会被block。

集群限流适合的场景:
1) 在API Gateway处统计某个api的总访问量,并对某个api或者服务的总qps进行限制。
2)Service Mesh中对服务间的调用进行全局流控。
3)集群内对热点商品的总访问频次进行限制。

起点

Sentinel的集群限流是在FlowSlot中实现的。它会根据资源名找到所有的限流规则FlowRule,然后依次对每个规则调用canPassCheck进行判断,是否能够通过该限流规则。

public boolean canPassCheck(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node, int acquireCount,
                                                    boolean prioritized) {
        String limitApp = rule.getLimitApp();
        if (limitApp == null) {
            return true;
        }
        if (rule.isClusterMode()) {
            return passClusterCheck(rule, context, node, acquireCount, prioritized);
        }
        return passLocalCheck(rule, context, node, acquireCount, prioritized);
    }
复制代码

如果该规则是应用在集群模式下,则会调用passClusterCheck方法。

private static boolean passClusterCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,
                                            boolean prioritized) {
        try {
            TokenService clusterService = pickClusterService(); \\@1
            if (clusterService == null) {
                return fallbackToLocalOrPass(rule, context, node, acquireCount, prioritized);  //@2
            }
            long flowId = rule.getClusterConfig().getFlowId(); //@3
            TokenResult result = clusterService.requestToken(flowId, acquireCount, prioritized);  //@4
            return applyTokenResult(result, rule, context, node, acquireCount, prioritized);
            // If client is absent, then fallback to local mode.
        } catch (Throwable ex) {
            RecordLog.warn("[FlowRuleChecker] Request cluster token unexpected failed", ex);
        }
        // Fallback to local flow control when token client or server for this rule is not available.
        // If fallback is not enabled, then directly pass.
        return fallbackToLocalOrPass(rule, context, node, acquireCount, prioritized);
    }
复制代码

@1代码处是获取当前节点是Token Client还是Token Server。 1) 如果当前节点的角色是Client,返回的TokenService为DefaultClusterTokenClient; 2)如果当前节点的角色是Server,则默认返回的TokenService为DefaultTokenService

@2代码处: 如果无法获取集群的的TokenService,那么该流量控制规则可以退化为单机限流模式。

private static boolean fallbackToLocalOrPass(FlowRule rule, Context context, DefaultNode node, int acquireCount,
                                                 boolean prioritized) {
        if (rule.getClusterConfig().isFallbackToLocalWhenFail()) {
            return passLocalCheck(rule, context, node, acquireCount, prioritized);
        } else {
            return true;
        }
    }
复制代码

@3代码处获取该流量控制的flowId,在集群模式下,每一个rule都有一个对应的ClusterFlowConfig

ClusterFlowConfig类介绍:

public class ClusterFlowConfig {
    // 全局唯一id
    private Long flowId;
    // 有两种阈值类型,一种是单机均摊,一种是集群总体模式
    private int thresholdType = ClusterRuleConstant.FLOW_THRESHOLD_AVG_LOCAL;
    //集群不可用时是否回退到单机模式
    private boolean fallbackToLocalWhenFail = true;
<span class="hljs-keyword" style="color: #aa0d91; line-height: 26px;">private</span> <span class="hljs-keyword" style="color: #aa0d91; line-height: 26px;">int</span> strategy = ClusterRuleConstant.FLOW_CLUSTER_STRATEGY_NORMAL;
<span class="hljs-comment" style="color: #007400; line-height: 26px;">// 集群采样数 10 </span>
<span class="hljs-keyword" style="color: #aa0d91; line-height: 26px;">private</span> <span class="hljs-keyword" style="color: #aa0d91; line-height: 26px;">int</span> sampleCount = ClusterRuleConstant.DEFAULT_CLUSTER_SAMPLE_COUNT;
复制代码
复制代码

// 1000ms,即1秒 private int windowIntervalMs = RuleConstant.DEFAULT_WINDOW_INTERVAL_MS; 复制代码

@4代码处根据获取的flowId通过TokenService进行申请token。从上面可知,它可能是TokenClient调用的,也可能是ToeknServer调用的。分别对应的类是DefaultClusterTokenClientDefaultTokenService

下面分别从TokenClient和TokenService两个角色进行解读。

DefaultClusterTokenClient

public TokenResult requestToken(Long flowId, int acquireCount, boolean prioritized) {
        // 如果flowId是无效的,或则count小于等于0
        // id == null || id <= 0 || count <= 0;
        if (notValidRequest(flowId, acquireCount)) {
            return badRequest();
        }
        // 新建一个请求对象
        FlowRequestData data = new FlowRequestData().setCount(acquireCount)
            .setFlowId(flowId).setPriority(prioritized);
        // 进一步封装为ClusterRequest,消息类型是Flow,
        //  MSG_TYPE_PING = 0; MSG_TYPE_FLOW = 1; MSG_TYPE_PARAM_FLOW = 2;
        ClusterRequest<FlowRequestData> request = new ClusterRequest<>(ClusterConstants.MSG_TYPE_FLOW, data);
        try {
            // 然后向TokenServer发送请求
            TokenResult result = sendTokenRequest(request);
            logForResult(result);
            return result;
        } catch (Exception ex) {
            ClusterClientStatLogUtil.log(ex.getMessage());
            return new TokenResult(TokenResultStatus.FAIL);
        }
    }
复制代码
private TokenResult sendTokenRequest(ClusterRequest request) throws Exception {
        if (transportClient == null) {
            RecordLog.warn(
                "[DefaultClusterTokenClient] Client not created, please check your config for cluster client");
            return clientFail();
        }
        ClusterResponse response = transportClient.sendRequest(request);
        TokenResult result = new TokenResult(response.getStatus());
        if (response.getData() != null) {
            FlowTokenResponseData responseData = (FlowTokenResponseData)response.getData();
            result.setRemaining(responseData.getRemainingCount())
                .setWaitInMs(responseData.getWaitInMs());
        }
        return result;
    }
复制代码

在客户端启动的时候会创建与TokenServer之间的连接,当发送请求时如何客户端对象为空,就会记录请求失败。

DefaultTokenService

Token Server收到客户端的请求后,会调用FlowRequestProcessorprocessRequest,最终会调用DefaultTokenServicerequestToken方法。

@RequestType(ClusterConstants.MSG_TYPE_FLOW)
public class FlowRequestProcessor implements RequestProcessor<FlowRequestData, FlowTokenResponseData> {
    @Override
    public ClusterResponse<FlowTokenResponseData> processRequest(ClusterRequest<FlowRequestData> request) {
        TokenService tokenService = TokenServiceProvider.getService();
        long flowId = request.getData().getFlowId();
        int count = request.getData().getCount();
        boolean prioritized = request.getData().isPriority();
        TokenResult result = tokenService.requestToken(flowId, count, prioritized);
        return toResponse(result, request);
    }
复制代码
public TokenResult requestToken(Long ruleId, int acquireCount, boolean prioritized) {
        // 和上面一样,先进行验证
        if (notValidRequest(ruleId, acquireCount)) {
            return badRequest();
        }
        // 从一个Map中进行查找 
        // private static final Map<Long, FlowRule> FLOW_RULES = new ConcurrentHashMap<>();
        FlowRule rule = ClusterFlowRuleManager.getFlowRuleById(ruleId);
        // 没有该规则
        if (rule == null) {
            return new TokenResult(TokenResultStatus.NO_RULE_EXISTS);
        }
        // 服务端进行检查,是否发送令牌token
        return ClusterFlowChecker.acquireClusterToken(rule, acquireCount, prioritized);
    }
复制代码

因为acquireClusterToken相对较长,故进行了拆分讲解。

第一步:

Long id = rule.getClusterConfig().getFlowId();
if (!allowProceed(id)) {
   return new TokenResult(TokenResultStatus.TOO_MANY_REQUEST);
} // @1
复制代码

@1代码处首先判断是否允许本次许可申请,这是因为TokenServe支持嵌入式,即支持在应用节点中嵌入一个TokenServer,为了保证许可申请的请求不对正常业务造成比较大的影响,故对申请许可这个动作进行了限流。

一旦触发了限流,将向客户端返回Too_Many_Request状态码,Sentinel支持按namespace进行限流,具体由GlobalRequestLimiter实现,该类的内部同样是基于滑动窗口进行收集,原理与FlowSlot相似,默认的限流TPS是3W。

static boolean allowProceed(long flowId) {
        String namespace = ClusterFlowRuleManager.getNamespace(flowId);
        return GlobalRequestLimiter.tryPass(namespace);
    }
复制代码
public static boolean tryPass(String namespace) {
        if (namespace == null) {
            return false;
        }
        // private static final Map<String, RequestLimiter> GLOBAL_QPS_LIMITER_MAP = new ConcurrentHashMap<>();
        RequestLimiter limiter = GLOBAL_QPS_LIMITER_MAP.get(namespace);
        if (limiter == null) {
            return true;
        }
        return limiter.tryPass();
    }
复制代码

canPass方法是计算当前的通过数+1后是否超过qpsAllowed。

public boolean tryPass() {
        if (canPass()) {
            add(1);
            return true;
        }
        return false;
}

public void add(int x) {
        data.currentWindow().value().add(x);
}

public boolean canPass() {
        return getQps() + 1 <= qpsAllowed;
}

public double getQps() {
        return getSum() / data.getIntervalInSecond();
}
private final LeapArray<LongAdder> data;

public long getSum() {
        data.currentWindow();
        long success = 0;

        List<LongAdder> list = data.values();
        for (LongAdder window : list) {
            success += window.sum();
        }
        return success;
}
复制代码

从上面可以看出在集群模式下也是利用时间窗口进行统计的。

第二大步:

根据FlowId获取对用的指标采集器metric

private static final Map<Long, ClusterMetric> METRIC_MAP = new ConcurrentHashMap<>();  

ClusterMetric metric = ClusterMetricStatistics.getMetric(id);  
if (metric == null) {
    return new TokenResult(TokenResultStatus.FAIL);
}
复制代码

该Metric具体的是一个ClusterMetricLeapArray,与之前的OccupiableBucketLeapArray类似,多了一个记录有关抢占数据的数组。

public class ClusterMetricLeapArray extends LeapArray<ClusterMetricBucket> {

    private final LongAdder[] occupyCounter;
    private boolean hasOccupied = false;

    public ClusterMetricLeapArray(int sampleCount, int intervalInMs) {
        super(sampleCount, intervalInMs);
        ClusterFlowEvent[] events = ClusterFlowEvent.values();
        this.occupyCounter = new LongAdder[events.length];
        for (ClusterFlowEvent event : events) {
            occupyCounter[event.ordinal()] = new LongAdder();
        }
    }
复制代码

第三大步:

获取当前通过的Qps,设置的总许可数和剩余的许可数。

double latestQps = metric.getAvg(ClusterFlowEvent.PASS);
double globalThreshold = calcGlobalThreshold(rule) * ClusterServerConfigManager.getExceedCount();
double nextRemaining = globalThreshold - latestQps - acquireCount;
复制代码

如果是FLOW_THRESHOLD_GLOBAL,即集群的许可数等于限流规则中配置的count值。
如果是FLOW_THRESHOLD_AVG_LOCAL,此时限流规则中配置的值只是单机的count值,还要乘以集群中客户端的数量。 上面的getExceedCount默认是1.0。

private static double calcGlobalThreshold(FlowRule rule) {
        double count = rule.getCount();
        switch (rule.getClusterConfig().getThresholdType()) {
            case ClusterRuleConstant.FLOW_THRESHOLD_GLOBAL:
                return count;
            case ClusterRuleConstant.FLOW_THRESHOLD_AVG_LOCAL:
            default:
                int connectedCount = ClusterFlowRuleManager.getConnectedCount(rule.getClusterConfig().getFlowId());
                return count * connectedCount;
        }
}
复制代码

第四大步:

如果剩余的许可数大于等于0,更新当前的统计信息。

if (nextRemaining >= 0) {
     //增加通过数和通过的请求数
     metric.add(ClusterFlowEvent.PASS, acquireCount);
     metric.add(ClusterFlowEvent.PASS_REQUEST, 1);
     if (prioritized) {
           // Add prioritized pass.
           // Pass (pre-occupy incoming buckets)
            metric.add(ClusterFlowEvent.OCCUPIED_PASS, acquireCount);
      }
      // Remaining count is cut down to a smaller integer.
      return new TokenResult(TokenResultStatus.OK)
                .setRemaining((int) nextRemaining)
                .setWaitInMs(0);
} 
复制代码

第五大步:

如果剩余数小于0。

if (prioritized) {
       // Try to occupy incoming buckets.
       // Waiting due to flow shaping or for next bucket tick.
       // 获取当前等待的Qps(以1s为维度,当前等待的请求数量)
       double occupyAvg = metric.getAvg(ClusterFlowEvent.WAITING);
       // 如果当前等待的Qps低于可借用未来窗口的许可阈值时,可通过,但要设置等待时间
       if (occupyAvg <= 
        // 默认是1.0 , 后面是全局的通过阈值数
        ClusterServerConfigManager.getMaxOccupyRatio() * globalThreshold) {
        // 计算等待的时间
        int waitInMs = metric.tryOccupyNext(ClusterFlowEvent.PASS, acquireCount, globalThreshold);
                    // waitInMs > 0 indicates pre-occupy incoming buckets successfully.
                    if (waitInMs > 0) {
                        ClusterServerStatLogUtil.log("flow|waiting|" + id);
                        return new TokenResult(TokenResultStatus.SHOULD_WAIT)
                            .setRemaining(0)
                            .setWaitInMs(waitInMs);
           }
     // Or else occupy failed, should be blocked.
     }
}
// Blocked.
// 发生阻塞,当前请求不能通过,增加与阻塞相关指标的统计数。 
metric.add(ClusterFlowEvent.BLOCK, acquireCount);
metric.add(ClusterFlowEvent.BLOCK_REQUEST, 1);
if (prioritized) {
      // Add prioritized block.
      metric.add(ClusterFlowEvent.OCCUPIED_BLOCK, acquireCount);
      ClusterServerStatLogUtil.log("flow|occupied_block|" + id, 1);
}

     return blockedResult();
}
复制代码

下面再讲解下如何计算等待时间的。

// event 为 pass
public int tryOccupyNext(ClusterFlowEvent event, int acquireCount, double threshold) {
        // 当前的通过数
        double latestQps = getAvg(ClusterFlowEvent.PASS);
        // 判断是否支持抢占
        if (!canOccupy(event, acquireCount, latestQps, threshold)) {
            return 0;
        }
    <span class="hljs-comment" style="color: #007400; line-height: 26px;">// 在抢占数组中添加本次的占用数</span>
    <span class="hljs-comment" style="color: #007400; line-height: 26px;">/**
    *  public void addOccupyPass(int count) {
    occupyCounter[ClusterFlowEvent.PASS.ordinal()].add(count);
    occupyCounter[ClusterFlowEvent.PASS_REQUEST.ordinal()].add(1);
    this.hasOccupied = true;
     }
    **/</span>
    metric.addOccupyPass(acquireCount);
    
    <span class="hljs-comment" style="color: #007400; line-height: 26px;">// 在普通的时间窗口增加等待数</span>
    <span class="hljs-comment" style="color: #007400; line-height: 26px;">/**
    *  public void add(ClusterFlowEvent event, long count) {
    metric.currentWindow().value().add(event, count);
}
    **/</span>
    add(ClusterFlowEvent.WAITING, acquireCount);
    <span class="hljs-comment" style="color: #007400; line-height: 26px;">// 这里有些不懂,sampleCount默认值应该是10,</span>
    <span class="hljs-comment" style="color: #007400; line-height: 26px;">// 这样的话返回的是一个时间窗口的大小</span>
    <span class="hljs-keyword" style="color: #aa0d91; line-height: 26px;">return</span> <span class="hljs-number" style="color: #1c00cf; line-height: 26px;">1000</span> / metric.getSampleCount();
复制代码
复制代码

} 复制代码

从上面可以看出,canOccupy是一个关键方法。

private boolean canOccupy(ClusterFlowEvent event, int acquireCount, double latestQps, double threshold) {
        long headPass = metric.getFirstCountOfWindow(event);
        /**
        *  public long getOccupiedCount(ClusterFlowEvent event) {
        return occupyCounter[event.ordinal()].sum();
    }
        **/
        // 获得Pass事件下的已占用的数  
        long occupiedCount = metric.getOccupiedCount(event);
        // 已通过的请求数 + 本次需要的请求数 + 占用的请求数  - 第一个统计的时间窗口的数 
        // 如果小于等于阈值,即可以抢占
        return latestQps + (acquireCount + occupiedCount) - headPass <= threshold;
    }
复制代码
public long getFirstCountOfWindow(ClusterFlowEvent event) {
        if (event == null) {
            return 0;
        }
        WindowWrap<ClusterMetricBucket> windowWrap = getValidHead();
        if (windowWrap == null) {
            return 0;
        }
        return windowWrap.value().get(event);
}
复制代码
public WindowWrap<T> getValidHead() {
        return getValidHead(TimeUtil.currentTimeMillis());
}
复制代码
WindowWrap<T> getValidHead(long timeMillis) {
        // Calculate index for expected head time.
        int idx = calculateTimeIdx(timeMillis + windowLengthInMs);
        WindowWrap<T> wrap = array.get(idx);
        if (wrap == null || isWindowDeprecated(wrap)) {
            return null;
        }

        return wrap;
}
复制代码

执行到这里,服务端已经判断出是否要发送令牌。

然后再将代码跳回到客户端请求发送的位置。

private TokenResult sendTokenRequest(ClusterRequest request) throws Exception {
        if (transportClient == null) {
            RecordLog.warn(
                "[DefaultClusterTokenClient] Client not created, please check your config for cluster client");
            return clientFail();
        }
        ClusterResponse response = transportClient.sendRequest(request);
        TokenResult result = new TokenResult(response.getStatus());
        if (response.getData() != null) {
            FlowTokenResponseData responseData = (FlowTokenResponseData)response.getData();
            result.setRemaining(responseData.getRemainingCount())
                .setWaitInMs(responseData.getWaitInMs());
        }
        return result;
    }
复制代码

从上面可以看出,会将返回的结果中的剩余数量,等待时间。

那如何处理得到的结果Result。

private static boolean applyTokenResult(/*@NonNull*/ TokenResult result, FlowRule rule, Context context,
                                                         DefaultNode node,
                                                         int acquireCount, boolean prioritized) {
        switch (result.getStatus()) {
            case TokenResultStatus.OK:   
                return true;
            case TokenResultStatus.SHOULD_WAIT:
                // Wait for next tick.
                try {
                    Thread.sleep(result.getWaitInMs());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return true;
            case TokenResultStatus.NO_RULE_EXISTS:
            case TokenResultStatus.BAD_REQUEST:
            case TokenResultStatus.FAIL:
            case TokenResultStatus.TOO_MANY_REQUEST:
                return fallbackToLocalOrPass(rule, context, node, acquireCount, prioritized);
            case TokenResultStatus.BLOCKED:
            default:
                return false;
        }
    }
}
复制代码

从上面可以看出如果状态是OK的话,返回true,允许通过。

如果是在优先级情况下,支持抢占,则根据返回的等待时间进行等待。

如果是其他的状态:则回退到单机模式进行判断。

默认情况下,返回false。

至此,关于关于集群限流就讲解完了,但是在关于计算等待时间的逻辑还是有些不清楚,欢迎大家交流。

………………………………

原文地址:访问原文地址
快照地址: 访问文章快照
总结与预览地址:访问总结与预览