16、SpringCloud Alibaba - Sentinel功能插槽ProcessorSlot
前言
sentinel 可以对资源也就是接口配置一些规则,各种规则检查的功能都继承ProcessorSlot,sentinel 会将不同的 ProcessorSlot 组装为一个责任链,当请求来的时候,会对资源的规则进行检查。
一、ProcessorSlotChain 初始化
1、 在访问资源的时候,首次访问会创建功能链,调用SlotChainProvider.newSlotChain()方法;
SlotChainProvider.java
public static ProcessorSlotChain newSlotChain() {
if (slotChainBuilder != null) {
return slotChainBuilder.build();
}
// Resolve the slot chain builder SPI.
//用SPI方式获取SlotChainBuilder
slotChainBuilder = SpiLoader.loadFirstInstanceOrDefault(SlotChainBuilder.class, DefaultSlotChainBuilder.class);
if (slotChainBuilder == null) {
// Should not go through here.
RecordLog.warn("[SlotChainProvider] Wrong state when resolving slot chain builder, using default");
slotChainBuilder = new DefaultSlotChainBuilder();
} else {
RecordLog.info("[SlotChainProvider] Global slot chain builder resolved: "
+ slotChainBuilder.getClass().getCanonicalName());
}
//调用 build() 方法
return slotChainBuilder.build();
}
2、 build();
DefaultSlotChainBuilder.java
public ProcessorSlotChain build() {
//创建 DefaultProcessorSlotChain(),并创建首个功能插槽ProcessorSlot
ProcessorSlotChain chain = new DefaultProcessorSlotChain();
// Note: the instances of ProcessorSlot should be different, since they are not stateless.
//通过SPI方式加载ProcessorSlot
List<ProcessorSlot> sortedSlotList = SpiLoader.loadPrototypeInstanceListSorted(ProcessorSlot.class);
//构建链
for (ProcessorSlot slot : sortedSlotList) {
if (!(slot instanceof AbstractLinkedProcessorSlot)) {
RecordLog.warn("The ProcessorSlot(" + slot.getClass().getCanonicalName() + ") is not an instance of AbstractLinkedProcessorSlot, can't be added into ProcessorSlotChain");
continue;
}
chain.addLast((AbstractLinkedProcessorSlot<?>) slot);
}
return chain;
}
3、 目前版本的有9个ProcessorSlot;
NodeSelectorSlot: 构建调用树
ClusterBuilderSlot: 维护集群流控数据,每个资源1个ClusterNode
LogSlot: 日志相关
StatisticSlot:统计相关
AuthoritySlot:授权规则
SystemSlot:系统规则
ParamFlowSlot: 热点规则
FlowSlot:流控规则
DegradeSlot: 服务熔断降级规则
二、ProcessorSlotChain 使用
1、 SentinelWebAutoConfiguration;
在SentinelWebAutoConfiguration配置类中配置了bean SentinelWebInterceptor ,它实现了spring mvc 里的HandlerInterceptor,那么请求在执行前、后、完成时 会回调 SentinelWebInterceptor 里的方法
@Bean
@ConditionalOnProperty(name = "spring.cloud.sentinel.filter.enabled",
matchIfMissing = true)
public SentinelWebInterceptor sentinelWebInterceptor(
SentinelWebMvcConfig sentinelWebMvcConfig) {
return new SentinelWebInterceptor(sentinelWebMvcConfig);
}
2、 SentinelWebInterceptor;
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler)
throws Exception {
try {
//请求的资源名称,即访问的接口
String resourceName = getResourceName(request);
if (StringUtil.isEmpty(resourceName)) {
return true;
}
//控制同一个request只能进来一次
if (increaseReferece(request, this.baseWebMvcConfig.getRequestRefName(), 1) != 1) {
return true;
}
// Parse the request origin using registered origin parser.
String origin = parseOrigin(request);
String contextName = getContextName(request);
ContextUtil.enter(contextName, origin);
//sentinel 对资源进行规则检查、统计等操作
Entry entry = SphU.entry(resourceName, ResourceTypeConstants.COMMON_WEB, EntryType.IN);
request.setAttribute(baseWebMvcConfig.getRequestAttributeName(), entry);
return true;
} catch (BlockException e) {
try {
//全局处理流控异常
handleBlockException(request, response, e);
} finally {
ContextUtil.exit();
}
return false;
}
}
3、 entry();
最终进入到 entryWithPriority( ) 方法
public static Entry entry(String name, int resourceType, EntryType trafficType) throws BlockException {
return Env.sph.entryWithType(name, resourceType, trafficType, 1, OBJECTS0);
}
@Override
public Entry entryWithType(String name, int resourceType, EntryType entryType, int count, Object[] args)
throws BlockException {
return entryWithType(name, resourceType, entryType, count, false, args);
}
@Override
public Entry entryWithType(String name, int resourceType, EntryType entryType, int count, boolean prioritized,
Object[] args) throws BlockException {
StringResourceWrapper resource = new StringResourceWrapper(name, entryType, resourceType);
return entryWithPriority(resource, count, prioritized, args);
}
4、 entryWithPriority();
private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args)
throws BlockException {
Context context = ContextUtil.getContext();
//校验 context
if (context instanceof NullContext) {
// The {@link NullContext} indicates that the amount of context has exceeded the threshold,
// so here init the entry only. No rule checking will be done.
return new CtEntry(resourceWrapper, null, context);
}
if (context == null) {
// Using default context.
context = InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME);
}
// Global switch is close, no rule checking will do.
if (!Constants.ON) {
return new CtEntry(resourceWrapper, null, context);
}
//获取资源的功能检查链,通过 SlotChainProvider.newSlotChain() 获取,上面已经分析过
ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);
/*
* Means amount of resources (slot chain) exceeds {@link Constants.MAX_SLOT_CHAIN_SIZE},
* so no rule checking will be done.
*/
if (chain == null) {
return new CtEntry(resourceWrapper, null, context);
}
Entry e = new CtEntry(resourceWrapper, chain, context);
try {
//进入检查链
chain.entry(context, resourceWrapper, null, count, prioritized, args);
} catch (BlockException e1) {
e.exit(count, args);
throw e1;
} catch (Throwable e1) {
// This should not happen, unless there are errors existing in Sentinel internal.
RecordLog.info("Sentinel unexpected exception", e1);
}
return e;
}
5、 chain.entry();
链头的ProcessorSlot没有什么逻辑,直接调用了链上的下一个节点,然后再依次调用链上其他节点
public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args)
throws Throwable {
first.transformEntry(context, resourceWrapper, t, count, prioritized, args);
}
public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args)
throws Throwable {
super.fireEntry(context, resourceWrapper, t, count, prioritized, args);
}
public void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)
throws Throwable {
if (next != null) {
next.transformEntry(context, resourceWrapper, obj, count, prioritized, args);
}
}
三、ProcessorSlot 实现类
1、NodeSelectorSlot
public void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)
throws Throwable {
DefaultNode node = map.get(context.getName());
if (node == null) {
synchronized (this) {
node = map.get(context.getName());
if (node == null) {
node = new DefaultNode(resourceWrapper, null);
HashMap<String, DefaultNode> cacheMap = new HashMap<String, DefaultNode>(map.size());
cacheMap.putAll(map);
cacheMap.put(context.getName(), node);
map = cacheMap;
// Build invocation tree
//构建调用树
((DefaultNode) context.getLastNode()).addChild(node);
}
}
}
//设置调用链上下文当前节点
context.setCurNode(node);
//调用下一个
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
@Override
public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
fireExit(context, resourceWrapper, count, args);
}
2、ClusterBuilderSlot
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args)
throws Throwable {
//每个资源一个clusterNode
if (clusterNode == null) {
synchronized (lock) {
if (clusterNode == null) {
// Create the cluster node.
clusterNode = new ClusterNode(resourceWrapper.getName(), resourceWrapper.getResourceType());
HashMap<ResourceWrapper, ClusterNode> newMap = new HashMap<>(Math.max(clusterNodeMap.size(), 16));
newMap.putAll(clusterNodeMap);
newMap.put(node.getId(), clusterNode);
clusterNodeMap = newMap;
}
}
}
//将clusterNode设置到当前node中
node.setClusterNode(clusterNode);
/*
* if context origin is set, we should get or create a new {@link Node} of
* the specific origin.
*/
if (!"".equals(context.getOrigin())) {
Node originNode = node.getClusterNode().getOrCreateOriginNode(context.getOrigin());
context.getCurEntry().setOriginNode(originNode);
}
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
@Override
public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
fireExit(context, resourceWrapper, count, args);
}
3、LogSlot
出现异常时,记录日志
public class LogSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode obj, int count, boolean prioritized, Object... args)
throws Throwable {
try {
fireEntry(context, resourceWrapper, obj, count, prioritized, args);
} catch (BlockException e) {
EagleEyeLogUtil.log(resourceWrapper.getName(), e.getClass().getSimpleName(), e.getRuleLimitApp(),
context.getOrigin(), count);
throw e;
} catch (Throwable e) {
RecordLog.warn("Unexpected entry exception", e);
}
}
@Override
public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
try {
fireExit(context, resourceWrapper, count, args);
} catch (Throwable e) {
RecordLog.warn("Unexpected entry exit exception", e);
}
}
}
4、AuthoritySlot
黑白名单授权相关
@SpiOrder(-6000)
public class AuthoritySlot extends AbstractLinkedProcessorSlot<DefaultNode> {
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args)
throws Throwable {
//检查授权
checkBlackWhiteAuthority(resourceWrapper, context);
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
@Override
public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
fireExit(context, resourceWrapper, count, args);
}
void checkBlackWhiteAuthority(ResourceWrapper resource, Context context) throws AuthorityException {
//从AuthorityRuleManager 中获取规则
Map<String, Set<AuthorityRule>> authorityRules = AuthorityRuleManager.getAuthorityRules();
if (authorityRules == null) {
return;
}
//找到资源的规则
Set<AuthorityRule> rules = authorityRules.get(resource.getName());
if (rules == null) {
return;
}
for (AuthorityRule rule : rules) {
//规则校验
if (!AuthorityRuleChecker.passCheck(rule, context)) {
throw new AuthorityException(context.getOrigin(), rule);
}
}
}
}
5、SystemSlot
系统规则,校验 qps、thread、rollingCounter、SystemAvgLoad、CpuUsage 等
@SpiOrder(-5000)
public class SystemSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
//校验
SystemRuleManager.checkSystem(resourceWrapper);
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
@Override
public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
fireExit(context, resourceWrapper, count, args);
}
}
public static void checkSystem(ResourceWrapper resourceWrapper) throws BlockException {
if (resourceWrapper == null) {
return;
}
// Ensure the checking switch is on.
if (!checkSystemStatus.get()) {
return;
}
// for inbound traffic only
if (resourceWrapper.getEntryType() != EntryType.IN) {
return;
}
// total qps
//qps限制
double currentQps = Constants.ENTRY_NODE == null ? 0.0 : Constants.ENTRY_NODE.successQps();
if (currentQps > qps) {
throw new SystemBlockException(resourceWrapper.getName(), "qps");
}
// total thread
//线程数限制
int currentThread = Constants.ENTRY_NODE == null ? 0 : Constants.ENTRY_NODE.curThreadNum();
if (currentThread > maxThread) {
throw new SystemBlockException(resourceWrapper.getName(), "thread");
}
//rt限制
double rt = Constants.ENTRY_NODE == null ? 0 : Constants.ENTRY_NODE.avgRt();
if (rt > maxRt) {
throw new SystemBlockException(resourceWrapper.getName(), "rt");
}
// load. BBR algorithm.
//负载限制
if (highestSystemLoadIsSet && getCurrentSystemAvgLoad() > highestSystemLoad) {
if (!checkBbr(currentThread)) {
throw new SystemBlockException(resourceWrapper.getName(), "load");
}
}
// cpu usage
//cpu使用率限制
if (highestCpuUsageIsSet && getCurrentCpuUsage() > highestCpuUsage) {
throw new SystemBlockException(resourceWrapper.getName(), "cpu");
}
}
其他的实现类后续再分析…