当前位置:网站首页>Alibaba Sentinel - 工作流程及原理解析
Alibaba Sentinel - 工作流程及原理解析
2022-07-29 02:53:00 【普通人zzz~】
Sentinel - 工作流程及原理解析
随着微服务的流行,服务和服务之间的稳定性变得越来越重要。Sentinel 是面向分布式、多语言异构化服务架构的流量治理组件,主要以流量为切入点,从流量路由、流量控制、流量整形、熔断降级、系统自适应过载保护、热点流量防护等多个维度来帮助开发者保障微服务的稳定性。
一、Sentinel 基本概念
资源(Resource)
资源是 Sentinel 的关键概念。它可以是 Java 应用程序中的任何内容,例如,由应用程序提供的服务,或由应用程序调用的其它应用提供的服务,甚至可以是一段代码。
规则(Rule)
围绕资源的实时状态设定的规则,可以包括流量控制规则、熔断降级规则以及系统保护规则。所有规则可以动态实时调整。Sentinel 目前存在五种规则种类:1. 流量控制规则(FlowRule);2. 熔断降级规则 (DegradeRule);3. 系统保护规则(SystemRule);4. 来源访问控制规则(AuthorityRule);5. 热点参数规则(ParamFlowRule);
插槽(Slot Chain)
Sentinel的工作流程是围绕着一个个插槽所组成的插槽链来展开的。每个Slot都有自己的功能,通过一定的编排顺序,来达到最终的限流降级的目的。
Sentinel 内置插槽有NodeSelectorSlot、ClusterBuilderSlot、StatisticsSlot、ParamFlowSlot、SystemSlot、AuthoritySlot、FlowSlot、DegradeSlot。当然也支持自定义插槽,Sentinel 将 com.alibaba.csp.sentinel.slotchain.ProcessorSlot 作为 SPI 接口进行扩展(1.7.2 版本以前 com.alibaba.csp.sentinel.slotchain.SlotChainBuilder 作为 SPI),使得 Slot Chain 具备了扩展的能力。
调用链路上下文(Context)
Context贯穿一次调用链路中的所有 Entry。Context 维护着当前调用链的元数据:入口节点、本次调用链路节点、调用来源等信息,通过 ThreadLocal传递。
EntryEntry 是指 Sentinel 中请求是否通过限流的一个凭证。每次执行 SphU.entry() 或 SphO.entry() 都会返回一个 Entry 给调用者。资源调用结束时需要 entry.exit()。Entry 包含了资源名、curNode(当前统计节点)、originNode(来源统计节点)等信息。
节点(Node)
节点。在Sentinel中保存统计数据的对象有4种:1. StatisticNode:统计节点,最为基础的统计节点,包含秒级和分钟级两个滑动窗口结构。2. DefaultNode:链路节点,用于统计调用链路上某个资源的数据,维持树状结构。3. ClusterNode:簇节点,用于统计每个资源全局的数据(不区分调用链路),以及存放该资源的按来源区分的调用数据。4. EntranceNode:入口节点,特殊的链路节点,对应某个 Context 入口的所有调用数据。
二、Sentinel 工作流程
在 Sentinel 里面,所有的资源都对应一个资产名称以及一个 Entry。 Entry 可以通过对主流框架的适配自动创建,也可以通过注解的方式或调用 API 显示创建;每一个 Entry 创建的时候,同时也会创建一系列功能插槽(slot chain),这些插槽有不同的职责,通过责任链方式进行构建,整体框架如下:

这些插槽(slot chain)具有不同职责。
- NodeSelectorSlot:负责收集资源的路径,并将这些资源的调用路径,以树状结构存储起来,用于根据调用路径来限流、降级。
- ClusterBuilderSlot:用于存储资产的统计信息,以及调用者信息,例如,该资源的RT(平均响应时间)、QPS、Thread Count等等,这些信息将用于作为多维度限流、降级的依据。
- StatisticsSlot:用于记录、统计不同维度的 runtime 指标监控信息。
- ParamFlowSlot:用于资源配置热点参数、限流规则以及前面 slot 统计的状态,来进行流量控制。
- SystemSlot:通过系统的状态,例如 load1 等,来控制总的入口流量。
- AuthoritySlot:根据配置的黑白名单和调用来源信息,来做黑白名单控制。
- FlowSlot:用于根据预设的限流规则以及前面 slot 统计的状态,来进行流量控制。
- DegradeSlot:通过统计信息以及预设的规则,来做熔断降级。
自定义插槽(slot chain)
Sentinel 将 com.alibaba.csp.sentinel.slotchain.ProcessorSlot 作为 SPI 接口进行扩展(1.7.2 版本以前 com.alibaba.csp.sentinel.slotchain.SlotChainBuilder 作为 SPI),使得 Slot Chain 具备了扩展的能力。
您可以自行加入自定义的 slot 并编排 slot 间的顺序,从而可以给 Sentinel 添加自定义的功能。

默认处理器插槽的顺序
// NodeSelectorSlot
public static final int ORDER_NODE_SELECTOR_SLOT = -10000;
// ClusterBuilderSlot
public static final int ORDER_CLUSTER_BUILDER_SLOT = -9000;
// LogSlot
public static final int ORDER_LOG_SLOT = -8000;
// StatisticsSlot
public static final int ORDER_STATISTIC_SLOT = -7000;
// AuthoritySlot
public static final int ORDER_AUTHORITY_SLOT = -6000;
// SystemSlot
public static final int ORDER_SYSTEM_SLOT = -5000;
// FlowSlot
public static final int ORDER_FLOW_SLOT = -2000;
// DegradeSlot
public static final int ORDER_DEGRADE_SLOT = -1000;
三、Sentinel 原理解析
使用 Sentinel 来进行资源保护,主要分为几个步骤:
- 定义规则
- 定义资源
- 检验规则是否生效
3.1 定义规则
Sentinel 的所有规则都可以在内存态中动态地查询及修改,修改之后立即生效。同时 Sentinel 也提供相关 API,供您来定制自己的规则策略。
Sentinel 支持以下几种规则:流量控制规则、熔断降级规则、系统保护规则、来源访问控制规则 和 热点参数规则。
下面以流量控制规则为例。
private static void initFlowQpsRule() {
List<FlowRule> rules = new ArrayList<>();
FlowRule rule1 = new FlowRule();
rule1.setResource(resource);
// Set max qps to 20
rule1.setCount(20);
rule1.setGrade(RuleConstant.FLOW_GRADE_QPS);
rule1.setLimitApp("default");
rules.add(rule1);
// 加载 流量控制规则
FlowRuleManager.loadRules(rules);
}
可以看到,在初始化规则的过程中,我们主要是创建了一个 FlowRule 对象,然后通过 FlowRuleManager 的 loadRules(List<FlowRule> rules) 方法加载规则。
FlowRuleManager.loadRules(rules)
public class FlowRuleManager {
// 存放资源规则:一个资源可以拥有多个限流规则
// key -> resource name value -> 去重后的 FlowRule
private static volatile Map<String, List<FlowRule>> flowRules = new HashMap<>();
// 流量规则监听器:动态规则实现,监听规则变化
private static final FlowPropertyListener LISTENER = new FlowPropertyListener();
// 动态规则对象:存储所有FlowRule,当规则存在变更时,通知 FlowRuleManager 中的LISTENER 进行处理
private static SentinelProperty<List<FlowRule>> currentProperty = new DynamicSentinelProperty<List<FlowRule>>();
private static final ScheduledExecutorService SCHEDULER = Executors.newScheduledThreadPool(1,
new NamedThreadFactory("sentinel-metrics-record-task", true));
static {
// 添加 LISTENER 到 DynamicSentinelProperty 中
currentProperty.addListener(LISTENER);
// 开启一个任务:每隔一段时间,统计并保存限流信息到本地磁盘
startMetricTimerListener();
}
/** * 加载规则 */
public static void loadRules(List<FlowRule> rules) {
// 更新规则
currentProperty.updateValue(rules);
}
}
1. DynamicSentinelProperty.updateValue(T newValue)
DynamicSentinelProperty 中的 boolean updateValue(T newValue) 用于刷新限流规则。
public class DynamicSentinelProperty<T> implements SentinelProperty<T> {
@Override
public boolean updateValue(T newValue) {
// 判断已有规则是否和当前规则相对
if (isEqual(value, newValue)) {
return false;
}
RecordLog.info("[DynamicSentinelProperty] Config will be updated to: {}", newValue);
// 更新设值
value = newValue;
// 通知listener 规则更新
for (PropertyListener<T> listener : listeners) {
listener.configUpdate(newValue);
}
return true;
}
}
其中主要逻辑就是更新规则,然后通知其中的 Listener 进行配置更新。DynamicSentinelProperty 中的 listener 对象在 FlowRuleManager 中进行设值,所以会调用到 FlowRuleManager 中的 FlowPropertyListener 内部类的方法。
2. FlowPropertyListener
FlowPropertyListener 用于监听 Sentinel 规则的动态变化,完成本地限流规则缓存更新。
public class FlowRuleManager {
// 存放资源规则:一个资源可以拥有多个限流规则
// key -> resource name value -> 去重后的 FlowRule
private static volatile Map<String, List<FlowRule>> flowRules = new HashMap<>();
private static final class FlowPropertyListener implements PropertyListener<List<FlowRule>> {
@Override
public synchronized void configUpdate(List<FlowRule> value) {
/********规则配置更新*********/
Map<String, List<FlowRule>> rules = FlowRuleUtil.buildFlowRuleMap(value);
if (rules != null) {
flowRules = rules;
}
RecordLog.info("[FlowRuleManager] Flow rules received: {}", rules);
}
@Override
public synchronized void configLoad(List<FlowRule> conf) {
/********规则配置加载*********/
Map<String, List<FlowRule>> rules = FlowRuleUtil.buildFlowRuleMap(conf);
if (rules != null) {
flowRules = rules;
}
RecordLog.info("[FlowRuleManager] Flow rules loaded: {}", rules);
}
}
}
其中 configUpdate(List<FlowRule> value) 以及 configLoad(List<FlowRule> conf) 方法都调用了 FlowRuleUtil.buildFlowRuleMap(List<FlowRule> list)。
注意:configLoad(List<FlowRule> conf) 以及 FlowRuleUtil.buildFlowRuleMap(List<FlowRule> list) 方法都在方法前面添加了 synchronized 关键字,所以内部所有的操作都是 线程安全 的。
3. FlowRuleUtil.buildFlowRuleMap(List list)
FlowRuleUtil.buildFlowRuleMap(List<FlowRule> list) 用于从 List<FlowRule> list 构建限流规则映射,按资源名称(resource name)分组。
public final class FlowRuleUtil {
public static Map<String, List<FlowRule>> buildFlowRuleMap(List<FlowRule> list) {
return buildFlowRuleMap(list, null);
}
public static Map<String, List<FlowRule>> buildFlowRuleMap(List<FlowRule> list, Predicate<FlowRule> filter) {
return buildFlowRuleMap(list, filter, true);
}
public static Map<String, List<FlowRule>> buildFlowRuleMap(List<FlowRule> list, Predicate<FlowRule> filter,
boolean shouldSort) {
return buildFlowRuleMap(list, extractResource, filter, shouldSort);
}
/** * 构建限流规则映射 * @param list:限流规则list * @param Function<FlowRule, K> groupFunction:一个java 8 中的Function,用于获取规则名称resourceName * private static final Function<FlowRule, String> extractResource = new Function<FlowRule, String>() { * @Override * public String apply(FlowRule rule) { * return rule.getResource(); * } * }; * @param filter:一个java 8 中的Predicate,用于过滤某些规则 * @param shouldSort:是否排序 * * @return Map<K, List<FlowRule>> 其中key 为资产名称,value为当前资产的所有FlowRule(通过Set去重后的值) */
public static <K> Map<K, List<FlowRule>> buildFlowRuleMap(List<FlowRule> list, Function<FlowRule, K> groupFunction,
Predicate<FlowRule> filter, boolean shouldSort) {
Map<K, List<FlowRule>> newRuleMap = new ConcurrentHashMap<>();
if (list == null || list.isEmpty()) {
return newRuleMap;
}
// 创建一个临时的规则映射
Map<K, Set<FlowRule>> tmpMap = new ConcurrentHashMap<>();
for (FlowRule rule : list) {
// 校验是否有效
if (!isValidRule(rule)) {
RecordLog.warn("[FlowRuleManager] Ignoring invalid flow rule when loading new flow rules: " + rule);
continue;
}
// 过滤
if (filter != null && !filter.test(rule)) {
continue;
}
if (StringUtil.isBlank(rule.getLimitApp())) {
// 设置默认 LimitApp:流控针对的调用来源,default,代表不区分调用来源
rule.setLimitApp(RuleConstant.LIMIT_APP_DEFAULT);
}
// 根据规则,创建对应的 流量整形控制器:用于将无序的流量(request),变成有序
TrafficShapingController rater = generateRater(rule);
rule.setRater(rater);
// 获取资产名称,resource
K key = groupFunction.apply(rule);
if (key == null) {
continue;
}
Set<FlowRule> flowRules = tmpMap.get(key);
if (flowRules == null) {
// Use hash set here to remove duplicate rules.
flowRules = new HashSet<>();
tmpMap.put(key, flowRules);
}
// 添加规则到set集合
flowRules.add(rule);
}
Comparator<FlowRule> comparator = new FlowRuleComparator();
for (Entry<K, Set<FlowRule>> entries : tmpMap.entrySet()) {
List<FlowRule> rules = new ArrayList<>(entries.getValue());
if (shouldSort) {
// Sort the rules.排序
Collections.sort(rules, comparator);
}
newRuleMap.put(entries.getKey(), rules);
}
return newRuleMap;
}
/** * 根据规则,创建对应的 流量整形控制器:用于将无序的流量(request),变成有序 */
private static TrafficShapingController generateRater(/*@Valid*/ FlowRule rule) {
// QPS
if (rule.getGrade() == RuleConstant.FLOW_GRADE_QPS) {
// 根据对应 controlBehavior 生成 TrafficShapingController
switch (rule.getControlBehavior()) {
case RuleConstant.CONTROL_BEHAVIOR_WARM_UP:
return new WarmUpController(rule.getCount(), rule.getWarmUpPeriodSec(),
ColdFactorProperty.coldFactor);
case RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER:
return new RateLimiterController(rule.getMaxQueueingTimeMs(), rule.getCount());
case RuleConstant.CONTROL_BEHAVIOR_WARM_UP_RATE_LIMITER:
return new WarmUpRateLimiterController(rule.getCount(), rule.getWarmUpPeriodSec(),
rule.getMaxQueueingTimeMs(), ColdFactorProperty.coldFactor);
case RuleConstant.CONTROL_BEHAVIOR_DEFAULT:
default:
// Default mode or unknown mode: default traffic shaping controller (fast-reject).
}
}
// 默认 TrafficShapingController
return new DefaultController(rule.getCount(), rule.getGrade());
}
}
4. 流量整形控制器(TrafficShapingController)
流量控制在网络传输中是一个常用的概念,它用于调整网络包的发送数据。然而,从系统稳定性角度考虑,在处理请求的速度上,也有非常多的讲究。任意时间到来的请求往往是随机不可控的,而系统的处理能力是有限的。我们需要根据系统的处理能力对流量进行控制。Sentinel 作为一个调配器,可以根据需要把随机的请求调整成合适的形状,如下图所示:
流量控制有以下几个角度:
- 资源的调用关系,例如资源的调用链路,资源和资源之间的关系;
- 运行指标,例如 QPS、线程池、系统负载等;
- 控制的效果,例如直接限流、冷启动、排队等。
Sentinel 的设计理念是让您自由选择控制的角度,并进行灵活组合,从而达到想要的效果。
Sentinel 的 TrafficShapingController 存在两个方法:
public interface TrafficShapingController {
/** * 检查当前资源节点是否可以通过count计数。 * * @param node 资源节点 * @param acquireCount 统计计数 * @param prioritized 是否优先 * @return true - 通过; false - 拒绝 */
boolean canPass(Node node, int acquireCount, boolean prioritized);
/** * 检查当前资源节点是否可以通过count计数。 * * @param node 资源节点 * @param acquireCount 统计计数 * @return true - 通过; false - 拒绝 */
boolean canPass(Node node, int acquireCount);
}
TrafficShapingController 有四个实现类,如下图所示:
DefaultController:一个默认的流量整形控制器,根据定义资源的限流维度(线程数 or QPS)来判断当前请求是否限流(计数器算法)。RateLimiterController:匀速流程整形控制器,使得错乱无章的请求,匀速排队通过,多出的请求,会进行拒绝(漏桶算法)。WarmUpController:预热流量整形控制器,通过流量缓慢增加,来保护系统资源(令牌桶算法实现,通过系统启动时间,来增加令牌桶的大小,直到最大)WarmUpRateLimiterController:RateLimiterController + WarmUpController
3.2 定义资源
我们说的资源,可以是任何东西,服务,服务里的方法,甚至是一段代码。
先把可能需要保护的资源定义好,之后再配置规则。也可以理解为,只要有了资源,我们就可以在任何时候灵活地定义各种流量控制规则。在编码的时候,只需要考虑这个代码是否需要保护,如果需要保护,就将之定义为一个资源。如:
// 资源名可使用任意有业务语义的字符串,比如方法名、接口名或其它可唯一标识的字符串。
try (Entry entry = SphU.entry("resourceName")) {
// 被保护的业务逻辑
// do something here...
} catch (BlockException ex) {
// 资源访问阻止,被限流或被降级
// 在此处进行相应的处理操作
}
对于主流的框架,我们提供适配,只需要按照适配中的说明配置,Sentinel 就会默认定义提供的服务,方法等为资源。
3.3 检验规则是否生效
定义规则 和 资源后,我们检查当前规则是否生效,通过 Entry entry = SphU.entry 判断当前流量是否限流,如果 SphU.entry 方法正常运行,代码系统正常,如果抛出异常,则代表需要进行限流。
3.3.1 SphU
用于记录统计信息和对资源执行规则检查的基本 Sentinel API。
从概念上讲,需要保护的物理或逻辑资源应该被一个代码块包围。如果满足当前资源的所有限流规则,请求会被放行。当超过任何当前资产定义的阈值时,将抛出 BlockException。
SphU 中定义了很多方法,如下:
其中主要的方法可以分为两类:
static Entry entry:对当前资源进行 统计数据、规则检查,如果放行,则同步调用当前“资源”。static AsyncEntry asyncEntry:对当前资源进行 统计数据、规则检查,如果放行,则异步调用当前“资源”。
1. static Entry entry
public class CtSph implements Sph {
private static final Object[] OBJECTS0 = new Object[0];
// 相同的资源共享相同的ProcessorSlotChain(处理链路):缓存
private static volatile Map<ResourceWrapper, ProcessorSlotChain> chainMap = new HashMap<ResourceWrapper, ProcessorSlotChain>();
// 对象锁
private static final Object LOCK = new Object();
/** * 最终执行的方法 * * @param resourceWrapper 资产包装器对象,包含了资源名称(resource name)、流量类型(IN-入、OUT-出)、资源类型(普通、web、rpc、api gateway、db sql) * @param count 统计请求个数 * @param args 用于参数流控制或自定义插槽的 args */
private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args)
throws BlockException {
// 获取当前线程执行上下文对象Context:底层是ThreadLocal<Context> contextHolder
// 第一次进来肯定为空
Context context = ContextUtil.getContext();
if (context instanceof NullContext) {
// The {@link NullContext} indicates that the amount of context has exceeded the threshold,
// so here init the entry only. No rule checking will be done.
// NullContext:表示上下文的数量已经超过阈值(树形结构,最大层级2000)
// 所以这里只初始化条目。不会进行任何规则检查。chain = null
return new CtEntry(resourceWrapper, null, context);
}
// 初始化一个默认上下文
if (context == null) {
// Using default context.
context = InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME);
}
// Global switch is close, no rule checking will do.
// 全局开关,不会进行任何规则检查,常量值Constants.ON = true
if (!Constants.ON) {
// 不会进行任何规则检查。chain = null
return new CtEntry(resourceWrapper, null, context);
}
// 加载插槽链
ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);
/* * Means amount of resources (slot chain) exceeds {@link Constants.MAX_SLOT_CHAIN_SIZE}, * so no rule checking will be done. */
// 表示插槽链超过 Constants.MAX_SLOT_CHAIN_SIZE= 6000
if (chain == null) {
// 不会进行任何规则检查。chain = null
return new CtEntry(resourceWrapper, null, context);
}
// 创建一个带 chain 的 CtEntry
Entry e = new CtEntry(resourceWrapper, chain, context);
try {
// 通过 chain.entry 执行处理链路,完成资产收集、统计、流量控制、熔断降级
chain.entry(context, resourceWrapper, null, count, prioritized, args);
} catch (BlockException e1) {
// 达到当前时间的流量执行阈值,执行exit,退出
e.exit(count, args);
// 抛出 BlockException
throw e1;
} catch (Throwable e1) {
// This should not happen, unless there are errors existing in Sentinel internal.
RecordLog.info("Sentinel unexpected exception", e1);
}
return e;
}
}
通过上面 Entry 对象的创建过程,以及执行最终的链路分析,其中主要涉及几个对象、方法:Context、ProcessorSlot<Object> chain、chain.entry(context, resourceWrapper, null, count, prioritized, args);、entry.exit(count, args),下面对这些对象以及方法,分别讲解。
Context
Context贯穿一次调用链路中的所有 Entry。Context 维护着当前调用链的元数据:入口节点、本次调用链路节点、调用来源等信息,通过 ThreadLocal传递。
上面 entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args) 存在一段创建上下文的代码:
// Constants.CONTEXT_DEFAULT_NAME = "sentinel_default_context"
context = InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME);
调用如下:
public class CtSph implements Sph {
private final static class InternalContextUtil extends ContextUtil {
static Context internalEnter(String name) {
return trueEnter(name, "");
}
static Context internalEnter(String name, String origin) {
return trueEnter(name, origin);
}
}
}
InternalContextUtil 为 CtSph 的一个静态内部类,继承自 ContextUtil,最终会调用 ContextUtil 中的 trueEnter(String name, String origin) 方法。
public class ContextUtil {
// ThreadLocal
private static ThreadLocal<Context> contextHolder = new ThreadLocal<>();
// 上下文名称节点映射
private static volatile Map<String, DefaultNode> contextNameNodeMap = new HashMap<>();
// 可重入锁
private static final ReentrantLock LOCK = new ReentrantLock();
// 空的上下文对象
private static final Context NULL_CONTEXT = new NullContext();
/** * 获取当前线程Context对象 * @param name context名称 * @param origin 来源 */
protected static Context trueEnter(String name, String origin) {
// 获取当前线程的上下文对象
Context context = contextHolder.get();
if (context == null) {
Map<String, DefaultNode> localCacheNameMap = contextNameNodeMap;
// 获取当前name的Node对象
DefaultNode node = localCacheNameMap.get(name);
if (node == null) {
// 判断是否超过最大Node缓存 2000
if (localCacheNameMap.size() > Constants.MAX_CONTEXT_NAME_SIZE) {
setNullContext();
return NULL_CONTEXT;
} else {
// 加锁:可重入锁
LOCK.lock();
try {
node = contextNameNodeMap.get(name);
// 双重校验
if (node == null) {
// 判断是否超过最大Node缓存 2000
if (contextNameNodeMap.size() > Constants.MAX_CONTEXT_NAME_SIZE) {
setNullContext();
return NULL_CONTEXT;
} else {
// 创建一个Node
node = new EntranceNode(new StringResourceWrapper(name, EntryType.IN), null);
// 添加到所有节点集合中
Constants.ROOT.addChild(node);
// 添加缓存
Map<String, DefaultNode> newMap = new HashMap<>(contextNameNodeMap.size() + 1);
newMap.putAll(contextNameNodeMap);
newMap.put(name, node);
contextNameNodeMap = newMap;
}
}
} finally {
LOCK.unlock();
}
}
}
// 创建Context
context = new Context(node, name);
context.setOrigin(origin);
// add
contextHolder.set(context);
}
return context;
}
ProcessorSlot chain = lookProcessChain(resourceWrapper)
public class CtSph implements Sph {
// ProcessorSlotChain 缓存:每个资源对应一个 ProcessorSlotChain
private static volatile Map<ResourceWrapper, ProcessorSlotChain> chainMap
= new HashMap<ResourceWrapper, ProcessorSlotChain>();
private static final Object LOCK = new Object();
// 获取当前资源插槽链
ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) {
// 从缓存中获取
ProcessorSlotChain chain = chainMap.get(resourceWrapper);
if (chain == null) {
// 加锁
synchronized (LOCK) {
// 双重校验
chain = chainMap.get(resourceWrapper);
if (chain == null) {
// 最大资源数:6000
if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) {
return null;
}
// 创建一个新的插槽链
chain = SlotChainProvider.newSlotChain();
// 缓存
Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>(
chainMap.size() + 1);
newMap.putAll(chainMap);
newMap.put(resourceWrapper, chain);
chainMap = newMap;
}
}
}
return chain;
}
chain = SlotChainProvider.newSlotChain(); 创建一个新的插槽链,代码如下:
public final class SlotChainProvider {
private static volatile SlotChainBuilder slotChainBuilder = null;
// 创建一个新的插槽:通过 SlotChainBuilder 对象构建
public static ProcessorSlotChain newSlotChain() {
if (slotChainBuilder != null) {
return slotChainBuilder.build();
}
// SPI:获取SlotChainBuilder,并加载第一个实例或默认值
slotChainBuilder = SpiLoader.of(SlotChainBuilder.class).loadFirstInstanceOrDefault();
if (slotChainBuilder == null) {
// Should not go through here.
RecordLog.warn("[SlotChainProvider] Wrong state when resolving slot chain builder, using default");
slotChainBuilder = new DefaultSlotChainBuilder();
} else {
RecordLog.info("[SlotChainProvider] Global slot chain builder resolved: {}",
slotChainBuilder.getClass().getCanonicalName());
}
// 构建插槽链
return slotChainBuilder.build();
}
private SlotChainProvider() {
}
}
上面通过 Sentinel 的SPI机制,获取SlotChainBuilder,并加载第一个实例或默认值,如下,SlotChainBuilder 只存在一个实现 com.alibaba.csp.sentinel.slots.DefaultSlotChainBuilder。
构建插槽链 slotChainBuilder.build(),代码如下:
@Spi(isDefault = true)
public class DefaultSlotChainBuilder implements SlotChainBuilder {
@Override
public ProcessorSlotChain build() {
// 先创建一个默认的 ProcessorSlotChain
ProcessorSlotChain chain = new DefaultProcessorSlotChain();
// SPI 机制:获取所有 ProcessorSlot(默认的、自定义的),并排序后返回
List<ProcessorSlot> sortedSlotList = SpiLoader.of(ProcessorSlot.class).loadInstanceListSorted();
for (ProcessorSlot slot : sortedSlotList) {
// 去除脏数据
if (!(slot instanceof AbstractLinkedProcessorSlot)) {
RecordLog.warn("The ProcessorSlot(" + slot.getClass().getCanonicalName() + ") is not an instance of AbstractLinkedProcessorSlot, can't be added into ProcessorSlotChain");
continue;
}
chain.addLast((AbstractLinkedProcessorSlot<?>) slot);
}
return chain;
}
}

各个 Slot 的作用如下:
- NodeSelectorSlot:负责收集资源的路径,并将这些资源的调用路径,以树状结构存储起来,用于根据调用路径来限流、降级。
- ClusterBuilderSlot:用于存储资产的统计信息,以及调用者信息,例如,该资源的RT(平均响应时间)、QPS、Thread Count等等,这些信息将用于作为多维度限流、降级的依据。
- StatisticsSlot:用于记录、统计不同维度的 runtime 指标监控信息。
- ParamFlowSlot:用于资源配置热点参数、限流规则以及前面 slot 统计的状态,来进行流量控制。
- SystemSlot:通过系统的状态,例如 load1 等,来控制总的入口流量。
- AuthoritySlot:根据配置的黑白名单和调用来源信息,来做黑白名单控制。
- FlowSlot:用于根据预设的限流规则以及前面 slot 统计的状态,来进行流量控制。
- DegradeSlot:通过统计信息以及预设的规则,来做熔断降级。
chain.entry
通过上面的插槽链路构建(责任链模式),返回一个 ProcessorSlot<Object> chain 对象,并封装到 CtEntry 中。通过 chain.entry 完成责任链的执行。
关于所有 Slot chain 的执行,请参考【Alibaba Sentinel - Slot chain解析】。
entry.exit(count, args)
退出:清除上下文。
class CtEntry extends Entry {
@Override
public void exit(int count, Object... args) throws ErrorEntryFreeException {
trueExit(count, args);
}
@Override
protected Entry trueExit(int count, Object... args) throws ErrorEntryFreeException {
exitForContext(context, count, args);
return parent;
}
protected void exitForContext(Context context, int count, Object... args) throws ErrorEntryFreeException {
if (context != null) {
// NullContext 直接退出
if (context instanceof NullContext) {
return;
}
// 判断当前处理Entry是否为this:如果不是,循环递归调用exit,处理如下场景
// -parent
// ---asyncInvocation
// -----handleResultForAsync
// -------handleResultForAsync
if (context.getCurEntry() != this) {
String curEntryNameInContext = context.getCurEntry() == null ? null
: context.getCurEntry().getResourceWrapper().getName();
// Clean previous call stack.
CtEntry e = (CtEntry) context.getCurEntry();
// 循环递归调用
while (e != null) {
e.exit(count, args);
e = (CtEntry) e.parent;
}
String errorMessage = String.format("The order of entry exit can't be paired with the order of entry"
+ ", current entry in context: <%s>, but expected: <%s>", curEntryNameInContext,
resourceWrapper.getName());
throw new ErrorEntryFreeException(errorMessage);
} else {
// Go through the onExit hook of all slots.
// 执行所有slot chain的exit方法
if (chain != null) {
chain.exit(context, resourceWrapper, count, args);
}
// Go through the existing terminate handlers (associated to this invocation).
// 调用退出处理程序并清理
callExitHandlersAndCleanUp(context);
// Restore the call stack.
context.setCurEntry(parent);
if (parent != null) {
((CtEntry) parent).child = null;
}
if (parent == null) {
// Default context (auto entered) will be exited automatically.
// 默认上下文(自动输入)将自动退出。
if (ContextUtil.isDefaultContext(context)) {
ContextUtil.exit();
}
}
// Clean the reference of context in current entry to avoid duplicate exit.
// 清理当前条目中上下文的引用,避免重复退出。
clearEntryContext();
}
}
}
}
2. static AsyncEntry asyncEntry
public class CtSph implements Sph {
private static final Object[] OBJECTS0 = new Object[0];
// 相同的资源共享相同的ProcessorSlotChain(处理链路):缓存
private static volatile Map<ResourceWrapper, ProcessorSlotChain> chainMap = new HashMap<ResourceWrapper, ProcessorSlotChain>();
// 对象锁
private static final Object LOCK = new Object();
@Override
public AsyncEntry asyncEntry(String name, EntryType type, int count, Object... args) throws BlockException {
StringResourceWrapper resource = new StringResourceWrapper(name, type);
return asyncEntryInternal(resource, count, args);
}
private AsyncEntry asyncEntryInternal(ResourceWrapper resourceWrapper, int count, Object... args)
throws BlockException {
return asyncEntryWithPriorityInternal(resourceWrapper, count, false, args);
}
private AsyncEntry asyncEntryWithPriorityInternal(ResourceWrapper resourceWrapper, int count, boolean prioritized,
Object... args) throws BlockException {
// 获取当前线程执行上下文对象Context:底层是ThreadLocal<Context> contextHolder
// 第一次进来肯定为空
Context context = ContextUtil.getContext();
if (context instanceof NullContext) {
// The {@link NullContext} indicates that the amount of context has exceeded the threshold,
// so here init the entry only. No rule checking will be done.
// NullContext:表示上下文的数量已经超过阈值(树形结构,最大层级2000)
// 所以这里只初始化条目。不会进行任何规则检查。chain = null
return asyncEntryWithNoChain(resourceWrapper, context);
}
// 初始化一个默认上下文
if (context == null) {
// Using default context.
context = InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME);
}
// Global switch is turned off, so no rule checking will be done.
// 全局开关,不会进行任何规则检查,常量值Constants.ON = true
if (!Constants.ON) {
// 不会进行任何规则检查。chain = null
return asyncEntryWithNoChain(resourceWrapper, context);
}
// 加载插槽链
ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);
// Means processor cache size exceeds {@link Constants.MAX_SLOT_CHAIN_SIZE}, so no rule checking will be done.
// 表示插槽链超过 Constants.MAX_SLOT_CHAIN_SIZE= 6000
if (chain == null) {
// 不会进行任何规则检查。chain = null
return asyncEntryWithNoChain(resourceWrapper, context);
}
// 创建一个异步的Entry对象
AsyncEntry asyncEntry = new AsyncEntry(resourceWrapper, chain, context);
try {
// 通过 chain.entry 执行处理链路,完成资产收集、统计、流量控制、熔断降级
chain.entry(context, resourceWrapper, null, count, prioritized, args);
// 仅当条目成功通过槽链时才启动异步上下文。
asyncEntry.initAsyncContext();
// The asynchronous call may take time in background, and current context should not be hanged on it.
// 从当前上下文中删除当前的异步条目。
asyncEntry.cleanCurrentEntryInLocal();
} catch (BlockException e1) {
// When blocked, the async entry will be exited on current context.
// The async context will not be initialized.
// 达到当前时间的流量执行阈值,执行exit,退出
asyncEntry.exitForContext(context, count, args);
// 抛出 BlockException
throw e1;
} catch (Throwable e1) {
// This should not happen, unless there are errors existing in Sentinel internal.
// When this happens, async context is not initialized.
RecordLog.warn("Sentinel unexpected exception in asyncEntryInternal", e1);
asyncEntry.cleanCurrentEntryInLocal();
}
return asyncEntry;
}
}
可以看出,static AsyncEntry asyncEntry 与 static Entry entry 的最大区别,就是在请求通过当前资源所有规则后,会初始化一个异步调用链。
SphU.asyncEntry(xxx) 不会影响当前(调用线程)的 Context,因此以下两个 entry 在调用链上是平级关系(处于同一层),而不是嵌套关系:
// 调用链类似于:
// -parent
// ---asyncResource
// ---syncResource
asyncEntry = SphU.asyncEntry(asyncResource);
entry = SphU.entry(normalResource);
若在异步回调中需要嵌套其它的资源调用(无论是 entry 还是 asyncEntry),只需要借助 Sentinel 提供的上下文切换功能,在对应的地方通过 ContextUtil.runOnContext(context, f) 进行 Context 变换,将对应资源调用处的 Context 切换为生成的异步 Context,即可维持正确的调用链路关系。示例如下:
public void handleResult(String result) {
Entry entry = null;
try {
entry = SphU.entry("handleResultForAsync");
// Handle your result here.
} catch (BlockException ex) {
// Blocked for the result handler.
} finally {
if (entry != null) {
entry.exit();
}
}
}
public void someAsync() {
try {
AsyncEntry entry = SphU.asyncEntry(resourceName);
// Asynchronous invocation.
doAsync(userId, result -> {
// 在异步回调中进行上下文变换,通过 AsyncEntry 的 getAsyncContext 方法获取异步 Context
ContextUtil.runOnContext(entry.getAsyncContext(), () -> {
try {
// 此处嵌套正常的资源调用.
handleResult(result);
} finally {
entry.exit();
}
});
});
} catch (BlockException ex) {
// Request blocked.
// Handle the exception (e.g. retry or fallback).
}
}
此时的调用链就类似于:
-parent
---asyncInvocation
-----handleResultForAsync
边栏推荐
- MYSQL入门与进阶(十三)
- Trample --- discretization + tree array + difference
- 数仓中概念术语解析
- 【npm错误】- npm ERR code ERESOLVE 和 npm ERR ERESOLVE could not resolve 问题
- centos安装mysql8
- What is SOA (Service Oriented Architecture)?
- C陷阱与缺陷 第3章 语义“陷阱” 3.8 运算符&&、||和!
- mysql大表联合查询优化,大事务优化,规避事务超时,锁等待超时与锁表
- My approval function of conference OA project
- 瀚高数据库最佳实践配置工具HG_BP日志采集内容
猜你喜欢

单例模式(饿汉式 懒汉式)

C language: judging letters

sqlilabs less-32~less-33

Day 8 notes

MySQL忘记密码怎么办

Verilog's time system tasks - $time, $stime, $realtime

C language: Little Lele and hexadecimal conversion

Interpretation of ue4.25 slate source code

Look at robot education and lead the mainstream of quality education

【FreeSwitch开发实践】media bug获取通话语音流
随机推荐
解析机器人与人类情感共鸣的主观意识
C陷阱与缺陷 第3章 语义“陷阱” 3.9 整数溢出
Codeworks 5 questions per day (average 1500) - day 25
Shell编程规范与变量
.net serialize enumeration as string
解析Steam教育中的项目式学习创造力
Hangao database best practice configuration tool Hg_ BP log collection content
PHP process communication series (I) named pipes
Restfulapi - C - add header username and password authentication
Verilog:阻塞赋值和非阻塞赋值
Add a row to a specific location in the dataframe
seed 随机种子
金山云回港上市:中国TO B云厂商的港股奔袭
Data truncation and estimation
MYSQL入门与进阶(十二)
Advanced architects, 16 common principles of microservice design and Governance
第09章_性能分析工具的使用
瀚高数据库最佳实践配置工具HG_BP日志采集内容
Unable to start after idea installation
vasp计算任务报错:M_divide:can not subdivide 8 nodes by 6