龙岗网站建设方案网店运营工作内容
Sentinel简介
Sentinel是阿里开源的一款面向分布式、多语言异构化服务架构的流量治理组件。
主要以流量为切入点,从流量路由、流量控制、流量整形、熔断降级、系统自适应过载保护、热点流量防护等多个维度来帮助开发者保障微服务的稳定性。
核心概念
资源
资源是Sentinel中一个非常重要的概念,资源就是Sentinel所保护的对象。
资源可以是一段代码,又或者是一个接口,Sentinel中并没有什么强制规定,但是实际项目中一般以一个接口为一个资源,比如说一个http接口,又或者是rpc接口,它们就是资源,可以被保护。
资源是通过Sentinel的API定义的,每个资源都有一个对应的名称,比如对于一个http接口资源来说,Sentinel默认的资源名称就是请求路径。
规则
规则也是一个重要的概念,规则其实比较好理解,比如说要对一个资源进行限流,那么限流的条件就是规则,后面在限流的时候会基于这个规则来判定是否需要限流。
Sentinel的规则分为流量控制规则、熔断降级规则以及系统保护规则,不同的规则实现的效果不一样。
快速入门
引入依赖
<dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-sentinel</artifactId><version>2021.1</version></dependency>
设置流控规则
@Configuration
public class FLowRulesConfig {/*** 定义限流规则*/@PostConstructpublic void initFLowRules() {//1.创建限流规则List<FlowRule> rules = new ArrayList<>();FlowRule rule = new FlowRule();// 定义资源,表示Sentinel会对哪个资源生效rule.setResource("hello");// 定义限流规则类型,QPS限流类型rule.setGrade(RuleConstant.FLOW_GRADE_QPS);// 定义QPS每秒能通过的请求个数rule.setCount(2);rules.add(rule);// 2、加载限流规则FlowRuleManager.loadRules(rules);}
}
编写Controller,进行测试
@Slf4j
@RestController
public class HelloController {@GetMapping("/hello")public String startHello() {// 进行限流try (Entry entry = SphU.entry("hello")) { // 加载限流规则,若果存在才会往下执行// 被保护的资源return "hello Sentinel";} catch (BlockException e) {e.printStackTrace();// 被限流或者被降级的操作return "系统繁忙";}}
}
源码拆解
CtSph
核心是Entry entry = SphU.entry("hello")
。
CtSph#entryWithPriority()
,获取Context
,如果不存在,则使用默认的。构造责任链,执行。
private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args)throws BlockException {Context context = ContextUtil.getContext();if (context instanceof NullContext) {// The {@link NullContext} indicates that the amount of context has exceeded the threshold,// so here init the entry only. No rule checking will be done.return new CtEntry(resourceWrapper, null, context);}if (context == null) {// Using default context.context = InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME);}// Global switch is close, no rule checking will do.if (!Constants.ON) {return new CtEntry(resourceWrapper, null, context);}ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);/** Means amount of resources (slot chain) exceeds {@link Constants.MAX_SLOT_CHAIN_SIZE},* so no rule checking will be done.*/if (chain == null) {return new CtEntry(resourceWrapper, null, context);}Entry e = new CtEntry(resourceWrapper, chain, context);try {chain.entry(context, resourceWrapper, null, count, prioritized, args);} catch (BlockException e1) {e.exit(count, args);throw e1;} catch (Throwable e1) {// This should not happen, unless there are errors existing in Sentinel internal.RecordLog.info("Sentinel unexpected exception", e1);}return e;}ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) {ProcessorSlotChain chain = chainMap.get(resourceWrapper);if (chain == null) {synchronized (LOCK) {chain = chainMap.get(resourceWrapper);if (chain == null) {// Entry size limit.if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) {return null;}chain = SlotChainProvider.newSlotChain();Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>(chainMap.size() + 1);newMap.putAll(chainMap);newMap.put(resourceWrapper, chain);chainMap = newMap;}}}return chain;}
ContextUtil
静态方法,会初始化一个默认的EntranceNode
。
static {// Cache the entrance node for default context.initDefaultContext();}private static void initDefaultContext() {String defaultContextName = Constants.CONTEXT_DEFAULT_NAME;EntranceNode node = new EntranceNode(new StringResourceWrapper(defaultContextName, EntryType.IN), null);Constants.ROOT.addChild(node);contextNameNodeMap.put(defaultContextName, node);}
ContextUtil
ContextUtil#enter()
,当执行该方法的时候,会往线程变量ThreadLocal
存入当前的Context
public static Context enter(String name, String origin) {if (Constants.CONTEXT_DEFAULT_NAME.equals(name)) {throw new ContextNameDefineException("The " + Constants.CONTEXT_DEFAULT_NAME + " can't be permit to defined!");}return trueEnter(name, origin);}protected static Context trueEnter(String name, String origin) {Context context = contextHolder.get();if (context == null) {Map<String, DefaultNode> localCacheNameMap = contextNameNodeMap;DefaultNode node = localCacheNameMap.get(name);if (node == null) {if (localCacheNameMap.size() > Constants.MAX_CONTEXT_NAME_SIZE) {setNullContext();return NULL_CONTEXT;} else {LOCK.lock();try {node = contextNameNodeMap.get(name);if (node == null) {if (contextNameNodeMap.size() > Constants.MAX_CONTEXT_NAME_SIZE) {setNullContext();return NULL_CONTEXT;} else {node = new EntranceNode(new StringResourceWrapper(name, EntryType.IN), null);// Add entrance node.Constants.ROOT.addChild(node);Map<String, DefaultNode> newMap = new HashMap<>(contextNameNodeMap.size() + 1);newMap.putAll(contextNameNodeMap);newMap.put(name, node);contextNameNodeMap = newMap;}}} finally {LOCK.unlock();}}}context = new Context(node, name);context.setOrigin(origin);contextHolder.set(context);}return context;}
ProcessorSlotChain
SlotChainProvider#newSlotChain
,按照默认的DefaultSlotChainBuilder
生成责任链。
public static ProcessorSlotChain newSlotChain() {if (slotChainBuilder != null) {return slotChainBuilder.build();}// Resolve the slot chain builder SPI.slotChainBuilder = SpiLoader.loadFirstInstanceOrDefault(SlotChainBuilder.class, DefaultSlotChainBuilder.class);if (slotChainBuilder == null) {// Should not go through here.RecordLog.warn("[SlotChainProvider] Wrong state when resolving slot chain builder, using default");slotChainBuilder = new DefaultSlotChainBuilder();} else {RecordLog.info("[SlotChainProvider] Global slot chain builder resolved: "+ slotChainBuilder.getClass().getCanonicalName());}return slotChainBuilder.build();}
DefaultSlotChainBuilder#build
,根据spi加载ProcessorSlot
。
public class DefaultSlotChainBuilder implements SlotChainBuilder {@Overridepublic ProcessorSlotChain build() {ProcessorSlotChain chain = new DefaultProcessorSlotChain();// Note: the instances of ProcessorSlot should be different, since they are not stateless.List<ProcessorSlot> sortedSlotList = SpiLoader.loadPrototypeInstanceListSorted(ProcessorSlot.class);for (ProcessorSlot slot : sortedSlotList) {if (!(slot instanceof AbstractLinkedProcessorSlot)) {RecordLog.warn("The ProcessorSlot(" + slot.getClass().getCanonicalName() + ") is not an instance of AbstractLinkedProcessorSlot, can't be added into ProcessorSlotChain");continue;}chain.addLast((AbstractLinkedProcessorSlot<?>) slot);}return chain;}
}
NodeSelectorSlot
NodeSelectorSlot
,处理同一个resource,使用同一个node
@SpiOrder(-10000)
public class NodeSelectorSlot extends AbstractLinkedProcessorSlot<Object> {/*** {@link DefaultNode}s of the same resource in different context.*/private volatile Map<String, DefaultNode> map = new HashMap<String, DefaultNode>(10);@Overridepublic void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)throws Throwable {DefaultNode node = map.get(context.getName());if (node == null) {synchronized (this) {node = map.get(context.getName());if (node == null) {node = new DefaultNode(resourceWrapper, null);HashMap<String, DefaultNode> cacheMap = new HashMap<String, DefaultNode>(map.size());cacheMap.putAll(map);cacheMap.put(context.getName(), node);map = cacheMap;// Build invocation tree((DefaultNode) context.getLastNode()).addChild(node);}}}context.setCurNode(node);fireEntry(context, resourceWrapper, node, count, prioritized, args);}
}
ClusterBuilderSlot
对于每一个 resource,会对应一个 ClusterNode
实例,如果不存在,就创建一个实例。当设置了 origin 的时候,会额外生成一个 StatisticsNode
实例,挂在 ClusterNode
上。
- 不同入口的访问数据是
DefaultNode
- 统计所有入口访问数据之和是
ClusterNode
- 来自某个服务的访问数据是
OriginNode
@SpiOrder(-9000)
public class ClusterBuilderSlot extends AbstractLinkedProcessorSlot<DefaultNode> {private static volatile Map<ResourceWrapper, ClusterNode> clusterNodeMap = new HashMap<>();private static final Object lock = new Object();private volatile ClusterNode clusterNode = null;@Overridepublic void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,boolean prioritized, Object... args)throws Throwable {if (clusterNode == null) {synchronized (lock) {if (clusterNode == null) {// Create the cluster node.clusterNode = new ClusterNode(resourceWrapper.getName(), resourceWrapper.getResourceType());HashMap<ResourceWrapper, ClusterNode> newMap = new HashMap<>(Math.max(clusterNodeMap.size(), 16));newMap.putAll(clusterNodeMap);newMap.put(node.getId(), clusterNode);clusterNodeMap = newMap;}}}node.setClusterNode(clusterNode);/** if context origin is set, we should get or create a new {@link Node} of* the specific origin.*/if (!"".equals(context.getOrigin())) {Node originNode = node.getClusterNode().getOrCreateOriginNode(context.getOrigin());context.getCurEntry().setOriginNode(originNode);}fireEntry(context, resourceWrapper, node, count, prioritized, args);}
}
LogSlot
对于下面的节点如果有抛出阻塞异常,进行捕捉,并且打印日志。记录哪些接口被规则挡住了。
@SpiOrder(-8000)
public class LogSlot extends AbstractLinkedProcessorSlot<DefaultNode> {@Overridepublic void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode obj, int count, boolean prioritized, Object... args)throws Throwable {try {fireEntry(context, resourceWrapper, obj, count, prioritized, args);} catch (BlockException e) {EagleEyeLogUtil.log(resourceWrapper.getName(), e.getClass().getSimpleName(), e.getRuleLimitApp(),context.getOrigin(), count);throw e;} catch (Throwable e) {RecordLog.warn("Unexpected entry exception", e);}}
}
StatisticSlot
负责进行数据统计,为限流降级提供数据支持的。
@SpiOrder(-7000)
public class StatisticSlot extends AbstractLinkedProcessorSlot<DefaultNode> {@Overridepublic void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,boolean prioritized, Object... args) throws Throwable {try {// Do some checking.fireEntry(context, resourceWrapper, node, count, prioritized, args);// Request passed, add thread count and pass count.node.increaseThreadNum();node.addPassRequest(count);if (context.getCurEntry().getOriginNode() != null) {// Add count for origin node.context.getCurEntry().getOriginNode().increaseThreadNum();context.getCurEntry().getOriginNode().addPassRequest(count);}if (resourceWrapper.getEntryType() == EntryType.IN) {// Add count for global inbound entry node for global statistics.Constants.ENTRY_NODE.increaseThreadNum();Constants.ENTRY_NODE.addPassRequest(count);}// Handle pass event with registered entry callback handlers.for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {handler.onPass(context, resourceWrapper, node, count, args);}} catch (PriorityWaitException ex) {node.increaseThreadNum();if (context.getCurEntry().getOriginNode() != null) {// Add count for origin node.context.getCurEntry().getOriginNode().increaseThreadNum();}if (resourceWrapper.getEntryType() == EntryType.IN) {// Add count for global inbound entry node for global statistics.Constants.ENTRY_NODE.increaseThreadNum();}// Handle pass event with registered entry callback handlers.for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {handler.onPass(context, resourceWrapper, node, count, args);}} catch (BlockException e) {// Blocked, set block exception to current entry.context.getCurEntry().setBlockError(e);// Add block count.node.increaseBlockQps(count);if (context.getCurEntry().getOriginNode() != null) {context.getCurEntry().getOriginNode().increaseBlockQps(count);}if (resourceWrapper.getEntryType() == EntryType.IN) {// Add count for global inbound entry node for global statistics.Constants.ENTRY_NODE.increaseBlockQps(count);}// Handle block event with registered entry callback handlers.for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {handler.onBlocked(e, context, resourceWrapper, node, count, args);}throw e;} catch (Throwable e) {// Unexpected internal error, set error to current entry.context.getCurEntry().setError(e);throw e;}}
}
AuthoritySlot
AuthoritySlot
做权限控制,根据 origin 做黑白名单的控制:
@SpiOrder(-6000)
public class AuthoritySlot extends AbstractLinkedProcessorSlot<DefaultNode> {@Overridepublic void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args)throws Throwable {checkBlackWhiteAuthority(resourceWrapper, context);fireEntry(context, resourceWrapper, node, count, prioritized, args);}@Overridepublic void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {fireExit(context, resourceWrapper, count, args);}void checkBlackWhiteAuthority(ResourceWrapper resource, Context context) throws AuthorityException {Map<String, Set<AuthorityRule>> authorityRules = AuthorityRuleManager.getAuthorityRules();if (authorityRules == null) {return;}Set<AuthorityRule> rules = authorityRules.get(resource.getName());if (rules == null) {return;}for (AuthorityRule rule : rules) {if (!AuthorityRuleChecker.passCheck(rule, context)) {throw new AuthorityException(context.getOrigin(), rule);}}}
}
AuthorityRuleChecker#passCheck
对origin进行规则校验
final class AuthorityRuleChecker {static boolean passCheck(AuthorityRule rule, Context context) {String requester = context.getOrigin();// Empty origin or empty limitApp will pass.if (StringUtil.isEmpty(requester) || StringUtil.isEmpty(rule.getLimitApp())) {return true;}// Do exact match with origin name.int pos = rule.getLimitApp().indexOf(requester);boolean contain = pos > -1;if (contain) {boolean exactlyMatch = false;String[] appArray = rule.getLimitApp().split(",");for (String app : appArray) {if (requester.equals(app)) {exactlyMatch = true;break;}}contain = exactlyMatch;}int strategy = rule.getStrategy();if (strategy == RuleConstant.AUTHORITY_BLACK && contain) {return false;}if (strategy == RuleConstant.AUTHORITY_WHITE && !contain) {return false;}return true;}private AuthorityRuleChecker() {}
}
SystemSlot
根据整个系统运行的统计数据来限流的,防止当前系统负载过高。它支持入口qps、线程数、响应时间、cpu使用率、负载5个限流的维度。
@SpiOrder(-5000)
public class SystemSlot extends AbstractLinkedProcessorSlot<DefaultNode> {@Overridepublic void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,boolean prioritized, Object... args) throws Throwable {SystemRuleManager.checkSystem(resourceWrapper);fireEntry(context, resourceWrapper, node, count, prioritized, args);}@Overridepublic void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {fireExit(context, resourceWrapper, count, args);}}
Sentinel 针对所有的入口流量,使用了一个全局的 ENTRY_NODE 进行统计,所以我们也要知道,系统保护规则是全局的,和具体的某个资源没有关系。
public static void checkSystem(ResourceWrapper resourceWrapper) throws BlockException {if (resourceWrapper == null) {return;}// Ensure the checking switch is on.if (!checkSystemStatus.get()) {return;}// for inbound traffic onlyif (resourceWrapper.getEntryType() != EntryType.IN) {return;}// total qpsdouble currentQps = Constants.ENTRY_NODE == null ? 0.0 : Constants.ENTRY_NODE.successQps();if (currentQps > qps) {throw new SystemBlockException(resourceWrapper.getName(), "qps");}// total threadint currentThread = Constants.ENTRY_NODE == null ? 0 : Constants.ENTRY_NODE.curThreadNum();if (currentThread > maxThread) {throw new SystemBlockException(resourceWrapper.getName(), "thread");}double rt = Constants.ENTRY_NODE == null ? 0 : Constants.ENTRY_NODE.avgRt();if (rt > maxRt) {throw new SystemBlockException(resourceWrapper.getName(), "rt");}// load. BBR algorithm.if (highestSystemLoadIsSet && getCurrentSystemAvgLoad() > highestSystemLoad) {if (!checkBbr(currentThread)) {throw new SystemBlockException(resourceWrapper.getName(), "load");}}// cpu usageif (highestCpuUsageIsSet && getCurrentCpuUsage() > highestCpuUsage) {throw new SystemBlockException(resourceWrapper.getName(), "cpu");}}
FlowSlot
流控的核心处理类
@SpiOrder(-2000)
public class FlowSlot extends AbstractLinkedProcessorSlot<DefaultNode> {private final FlowRuleChecker checker;public FlowSlot() {this(new FlowRuleChecker());}/*** Package-private for test.** @param checker flow rule checker* @since 1.6.1*/FlowSlot(FlowRuleChecker checker) {AssertUtil.notNull(checker, "flow checker should not be null");this.checker = checker;}@Overridepublic void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,boolean prioritized, Object... args) throws Throwable {checkFlow(resourceWrapper, context, node, count, prioritized);fireEntry(context, resourceWrapper, node, count, prioritized, args);}void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized)throws BlockException {checker.checkFlow(ruleProvider, resource, context, node, count, prioritized);}
}
FlowRuleChecker#checkFlow
public class FlowRuleChecker {public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource,Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {if (ruleProvider == null || resource == null) {return;}Collection<FlowRule> rules = ruleProvider.apply(resource.getName());if (rules != null) {for (FlowRule rule : rules) {if (!canPassCheck(rule, context, node, count, prioritized)) {throw new FlowException(rule.getLimitApp(), rule);}}}}private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,boolean prioritized) {Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node);if (selectedNode == null) {return true;}return rule.getRater().canPass(selectedNode, acquireCount, prioritized);}
}
DegradeSlot
降级策略。Sentinel支持三种熔断策略:慢调用比例、异常比例 、异常数。
Sentinel会为每个设置的规则都创建一个熔断器,熔断器有三种状态,OPEN(打开)、HALF_OPEN(半开)、CLOSED(关闭)
- 当处于CLOSED状态时,可以访问资源,访问之后会进行慢调用比例、异常比例、异常数的统计,一旦达到了设置的阈值,就会将熔断器的状态设置为OPEN
- 当处于OPEN状态时,会去判断是否达到了熔断时间,如果没到,拒绝访问,如果到了,那么就将状态改成HALF_OPEN,然后访问资源,访问之后会对访问结果进行判断,符合规则设置的要求,直接将熔断器设置为CLOSED,关闭熔断器,不符合则还是改为OPEN状态
- 当处于HALF_OPEN状态时,直接拒绝访问资源
@SpiOrder(-1000)
public class DegradeSlot extends AbstractLinkedProcessorSlot<DefaultNode> {@Overridepublic void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,boolean prioritized, Object... args) throws Throwable {performChecking(context, resourceWrapper);fireEntry(context, resourceWrapper, node, count, prioritized, args);}void performChecking(Context context, ResourceWrapper r) throws BlockException {List<CircuitBreaker> circuitBreakers = DegradeRuleManager.getCircuitBreakers(r.getName());if (circuitBreakers == null || circuitBreakers.isEmpty()) {return;}for (CircuitBreaker cb : circuitBreakers) {if (!cb.tryPass(context)) {throw new DegradeException(cb.getRule().getLimitApp(), cb.getRule());}}}
}
SentinelWebInterceptor
SentinelWebInterceptor
继承AbstractSentinelInterceptor
,当有请求访问,会执行AbstractSentinelInterceptor#preHandle
。
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {try {String resourceName = this.getResourceName(request);if (StringUtil.isEmpty(resourceName)) {return true;} else if (this.increaseReferece(request, this.baseWebMvcConfig.getRequestRefName(), 1) != 1) {return true;} else {String origin = this.parseOrigin(request);String contextName = this.getContextName(request);ContextUtil.enter(contextName, origin);Entry entry = SphU.entry(resourceName, 1, EntryType.IN);request.setAttribute(this.baseWebMvcConfig.getRequestAttributeName(), entry);return true;}} catch (BlockException var12) {BlockException e = var12;try {this.handleBlockException(request, response, e);} finally {ContextUtil.exit();}return false;}}
InitFunc
SphU#entry()
,执行该方法会调用Env
的静态变量,Env进行初始化。
public static Entry entry(String name) throws BlockException {return Env.sph.entry(name, EntryType.OUT, 1, OBJECTS0);}
Env
执行静态方法。
public class Env {public static final Sph sph = new CtSph();static {// If init fails, the process will exit.InitExecutor.doInit();}}
InitExecutor#doInit
使用 SPI 加载 InitFunc
的实现
public static void doInit() {if (!initialized.compareAndSet(false, true)) {return;}try {ServiceLoader<InitFunc> loader = ServiceLoaderUtil.getServiceLoader(InitFunc.class);List<OrderWrapper> initList = new ArrayList<OrderWrapper>();for (InitFunc initFunc : loader) {RecordLog.info("[InitExecutor] Found init func: " + initFunc.getClass().getCanonicalName());insertSorted(initList, initFunc);}for (OrderWrapper w : initList) {w.func.init();RecordLog.info(String.format("[InitExecutor] Executing %s with order %d",w.func.getClass().getCanonicalName(), w.order));}} catch (Exception ex) {RecordLog.warn("[InitExecutor] WARN: Initialization failed", ex);ex.printStackTrace();} catch (Error error) {RecordLog.warn("[InitExecutor] ERROR: Initialization failed with fatal error", error);error.printStackTrace();}}
HeartbeatSenderInitFunc
,使用HeartbeatSenderProvider
获取加载HeartbeatSender
,定时执行。
@InitOrder(-1)
public class HeartbeatSenderInitFunc implements InitFunc {private ScheduledExecutorService pool = null;public HeartbeatSenderInitFunc() {}private void initSchedulerIfNeeded() {if (this.pool == null) {this.pool = new ScheduledThreadPoolExecutor(2, new NamedThreadFactory("sentinel-heartbeat-send-task", true), new DiscardOldestPolicy());}}public void init() {HeartbeatSender sender = HeartbeatSenderProvider.getHeartbeatSender();if (sender == null) {RecordLog.warn("[HeartbeatSenderInitFunc] WARN: No HeartbeatSender loaded", new Object[0]);} else {this.initSchedulerIfNeeded();long interval = this.retrieveInterval(sender);this.setIntervalIfNotExists(interval);this.scheduleHeartbeatTask(sender, interval);}}private boolean isValidHeartbeatInterval(Long interval) {return interval != null && interval > 0L;}private void setIntervalIfNotExists(long interval) {SentinelConfig.setConfig("csp.sentinel.heartbeat.interval.ms", String.valueOf(interval));}long retrieveInterval(HeartbeatSender sender) {Long intervalInConfig = TransportConfig.getHeartbeatIntervalMs();if (this.isValidHeartbeatInterval(intervalInConfig)) {RecordLog.info("[HeartbeatSenderInitFunc] Using heartbeat interval in Sentinel config property: " + intervalInConfig, new Object[0]);return intervalInConfig;} else {long senderInterval = sender.intervalMs();RecordLog.info("[HeartbeatSenderInit] Heartbeat interval not configured in config property or invalid, using sender default: " + senderInterval, new Object[0]);return senderInterval;}}private void scheduleHeartbeatTask(final HeartbeatSender sender, long interval) {this.pool.scheduleAtFixedRate(new Runnable() {public void run() {try {sender.sendHeartbeat();} catch (Throwable var2) {RecordLog.warn("[HeartbeatSender] Send heartbeat error", var2);}}}, 5000L, interval, TimeUnit.MILLISECONDS);RecordLog.info("[HeartbeatSenderInit] HeartbeatSender started: " + sender.getClass().getCanonicalName(), new Object[0]);}
}
HeartbeatSenderProvider
静态方法块进行初始化,根据SPI获取HeartbeatSender
。
public final class HeartbeatSenderProvider {private static HeartbeatSender heartbeatSender = null;private static void resolveInstance() {HeartbeatSender resolved = (HeartbeatSender)SpiLoader.loadHighestPriorityInstance(HeartbeatSender.class);if (resolved == null) {RecordLog.warn("[HeartbeatSenderProvider] WARN: No existing HeartbeatSender found", new Object[0]);} else {heartbeatSender = resolved;RecordLog.info("[HeartbeatSenderProvider] HeartbeatSender activated: " + resolved.getClass().getCanonicalName(), new Object[0]);}}public static HeartbeatSender getHeartbeatSender() {return heartbeatSender;}private HeartbeatSenderProvider() {}static {resolveInstance();}
}
StatisticNode
ClusterNode
,DefaultNode
,EntranceNode
都继承StatisticNode
。
StatisticNode#addPassRequest
,增加通过的请求数。
private transient volatile Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT,IntervalProperty.INTERVAL);private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false);@Overridepublic void addPassRequest(int count) {rollingCounterInSecond.addPass(count);rollingCounterInMinute.addPass(count);}
ArrayMetric
的data属性默认实现OccupiableBucketLeapArray
private final LeapArray<MetricBucket> data; public ArrayMetric(int sampleCount, int intervalInMs) {this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);}
LeapArray#currentWindow(long)
。添加数据的时候,我们要先获取操作的目标窗口,也就是 currentWindow
这个方法。
public WindowWrap<T> currentWindow(long timeMillis) {if (timeMillis < 0) {return null;}int idx = calculateTimeIdx(timeMillis);// Calculate current bucket start time.long windowStart = calculateWindowStart(timeMillis);/** Get bucket item at given time from the array.** (1) Bucket is absent, then just create a new bucket and CAS update to circular array.* (2) Bucket is up-to-date, then just return the bucket.* (3) Bucket is deprecated, then reset current bucket and clean all deprecated buckets.*/while (true) {WindowWrap<T> old = array.get(idx);if (old == null) {WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));if (array.compareAndSet(idx, null, window)) {// Successfully updated, return the created bucket.return window;} else {// Contention failed, the thread will yield its time slice to wait for bucket available.Thread.yield();}} else if (windowStart == old.windowStart()) {return old;} else if (windowStart > old.windowStart()) {if (updateLock.tryLock()) {try {// Successfully get the update lock, now we reset the bucket.return resetWindowTo(old, windowStart);} finally {updateLock.unlock();}} else {// Contention failed, the thread will yield its time slice to wait for bucket available.Thread.yield();}} else if (windowStart < old.windowStart()) {// Should not go through here, as the provided time is already behind.return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));}}}
LeapArray#values(long)
,获取有效的窗口中的数据。
public List<T> values(long timeMillis) {if (timeMillis < 0) {return new ArrayList<T>();}int size = array.length();List<T> result = new ArrayList<T>(size);for (int i = 0; i < size; i++) {WindowWrap<T> windowWrap = array.get(i);if (windowWrap == null || isWindowDeprecated(timeMillis, windowWrap)) {continue;}result.add(windowWrap.value());}return result;}
TrafficShapingController
FlowRuleUtil#generateRater
,根据规则生成对应的TrafficShapingController
private static TrafficShapingController generateRater(/*@Valid*/ FlowRule rule) {if (rule.getGrade() == RuleConstant.FLOW_GRADE_QPS) {switch (rule.getControlBehavior()) {case RuleConstant.CONTROL_BEHAVIOR_WARM_UP:return new WarmUpController(rule.getCount(), rule.getWarmUpPeriodSec(),ColdFactorProperty.coldFactor);case RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER:return new RateLimiterController(rule.getMaxQueueingTimeMs(), rule.getCount());case RuleConstant.CONTROL_BEHAVIOR_WARM_UP_RATE_LIMITER:return new WarmUpRateLimiterController(rule.getCount(), rule.getWarmUpPeriodSec(),rule.getMaxQueueingTimeMs(), ColdFactorProperty.coldFactor);case RuleConstant.CONTROL_BEHAVIOR_DEFAULT:default:// Default mode or unknown mode: default traffic shaping controller (fast-reject).}}return new DefaultController(rule.getCount(), rule.getGrade());}
DefaultController#canPass()
,超过流量阈值的会直接拒绝。
@Overridepublic boolean canPass(Node node, int acquireCount, boolean prioritized) {int curCount = avgUsedTokens(node);if (curCount + acquireCount > count) {if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) {long currentTime;long waitInMs;currentTime = TimeUtil.currentTimeMillis();waitInMs = node.tryOccupyNext(currentTime, acquireCount, count);if (waitInMs < OccupyTimeoutProperty.getOccupyTimeout()) {node.addWaitingRequest(currentTime + waitInMs, acquireCount);node.addOccupiedPass(acquireCount);sleep(waitInMs);// PriorityWaitException indicates that the request will pass after waiting for {@link @waitInMs}.throw new PriorityWaitException(waitInMs);}}return false;}return true;}
SentinelResourceAspect
@Aspect
public class SentinelResourceAspect extends AbstractSentinelAspectSupport {@Pointcut("@annotation(com.alibaba.csp.sentinel.annotation.SentinelResource)")public void sentinelResourceAnnotationPointcut() {}@Around("sentinelResourceAnnotationPointcut()")public Object invokeResourceWithSentinel(ProceedingJoinPoint pjp) throws Throwable {Method originMethod = resolveMethod(pjp);SentinelResource annotation = originMethod.getAnnotation(SentinelResource.class);if (annotation == null) {// Should not go through here.throw new IllegalStateException("Wrong state for SentinelResource annotation");}String resourceName = getResourceName(annotation.value(), originMethod);EntryType entryType = annotation.entryType();int resourceType = annotation.resourceType();Entry entry = null;try {entry = SphU.entry(resourceName, resourceType, entryType, pjp.getArgs());Object result = pjp.proceed();return result;} catch (BlockException ex) {return handleBlockException(pjp, annotation, ex);} catch (Throwable ex) {Class<? extends Throwable>[] exceptionsToIgnore = annotation.exceptionsToIgnore();// The ignore list will be checked first.if (exceptionsToIgnore.length > 0 && exceptionBelongsTo(ex, exceptionsToIgnore)) {throw ex;}if (exceptionBelongsTo(ex, annotation.exceptionsToTrace())) {traceException(ex);return handleFallback(pjp, annotation, ex);}// No fallback function can handle the exception, so throw it out.throw ex;} finally {if (entry != null) {entry.exit(1, pjp.getArgs());}}}
}