Skywalking Series: Source Code Analysis of Alarm Notifications
2022-07-27 00:44:00 【snail-jie】
Preface
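In the previous articles we saw that SkyWalking collects trace data. But if something goes wrong in a trace, how do we get notified? Using a response-time timeout as the example, this article walks through SkyWalking's alarm flow in the source code.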
How ServiceDispatcher is generated
- In OALRuntime#start, the oal/core.oal config file is parsed and ServiceDispatcher is generated dynamically
public void start(ClassLoader currentClassLoader) throws ModuleStartException, OALCompileException {
    // Read the oal/core.oal config file
    // (exception handling for the read and parse steps omitted for brevity)
    Reader read = ResourceUtils.read(oalDefine.getConfigFile());
    // Parse oal/core.oal
    ScriptParser scriptParser = ScriptParser.createFromFile(read, oalDefine.getSourcePackage());
    OALScripts oalScripts = scriptParser.parse();
    // Generate the corresponding Dispatcher classes
    this.generateClassAtRuntime(oalScripts);
}
- OALRuntime#generateClassAtRuntime generates ServiceDispatcher
private void generateClassAtRuntime(OALScripts oalScripts) throws OALCompileException {
    List<AnalysisResult> metricsStmts = oalScripts.getMetricsStmts();
    /**
     * 1. Take the sourceName from each metricsStmt's from() clause ---> Service
     * 2. Map sourceName to a DispatcherContext: OALRuntime$allDispatcherContext$allContext
     * 3. Put every metricsStmt that belongs to that sourceName into the DispatcherContext's metrics list
     **/
    metricsStmts.forEach(this::buildDispatcherContext);

    for (AnalysisResult metricsStmt : metricsStmts) {
        /**
         * generateMetricsClass generates the Metrics class (e.g. ServiceRespTimeMetrics),
         * which extends LongAvgMetrics and implements the WithMetadata interface;
         * methods are then added to ServiceRespTimeMetrics (hashCode comes from metrics/hashCode.ftl)
         */
        metricsClasses.add(generateMetricsClass(metricsStmt));
        generateMetricsBuilderClass(metricsStmt);
    }

    for (Map.Entry<String, DispatcherContext> entry : allDispatcherContext.getAllContext().entrySet()) {
        // generateDispatcherClass generates one Dispatcher class per source
        dispatcherClasses.add(generateDispatcherClass(entry.getKey(), entry.getValue()));
    }

    oalScripts.getDisableCollection().getAllDisableSources().forEach(disable -> {
        DisableRegister.INSTANCE.add(disable);
    });
}
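To make the two steps above concrete (parse core.oal, then group the metrics statements by their source so that each source gets exactly one Dispatcher), here is a toy sketch. It is not SkyWalking code: the real ScriptParser is ANTLR-based and understands the full OAL grammar, whereas `ToyOalParser` and `MetricsStmt` are hypothetical names that only handle the simplest `name = from(Source.field).func();` form.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical, simplified stand-in for one parsed OAL metrics statement (cf. AnalysisResult).
final class MetricsStmt {
    final String metricsName;  // e.g. service_resp_time
    final String sourceName;   // e.g. Service
    final String sourceField;  // e.g. latency
    final String function;     // e.g. longAvg

    MetricsStmt(String metricsName, String sourceName, String sourceField, String function) {
        this.metricsName = metricsName;
        this.sourceName = sourceName;
        this.sourceField = sourceField;
        this.function = function;
    }
}

final class ToyOalParser {
    // Matches only "name = from(Source.field).func();". Filters, function arguments and
    // wildcards such as Service.* are NOT supported by this toy regex.
    private static final Pattern STMT =
        Pattern.compile("(\\w+)\\s*=\\s*from\\((\\w+)\\.(\\w+)\\)\\.(\\w+)\\(\\);");

    static List<MetricsStmt> parse(List<String> lines) {
        List<MetricsStmt> result = new ArrayList<>();
        for (String line : lines) {
            Matcher m = STMT.matcher(line.trim());
            if (m.matches()) {
                result.add(new MetricsStmt(m.group(1), m.group(2), m.group(3), m.group(4)));
            }
        }
        return result;
    }

    // Same idea as buildDispatcherContext: one entry (i.e. one future Dispatcher) per source
    // name, holding every metrics statement whose from() clause uses that source.
    static Map<String, List<MetricsStmt>> groupBySource(List<MetricsStmt> stmts) {
        Map<String, List<MetricsStmt>> contexts = new LinkedHashMap<>();
        for (MetricsStmt stmt : stmts) {
            contexts.computeIfAbsent(stmt.sourceName, k -> new ArrayList<>()).add(stmt);
        }
        return contexts;
    }
}
```

Fed lines in the style of core.oal, such as `service_resp_time = from(Service.latency).longAvg();` and `endpoint_resp_time = from(Endpoint.latency).longAvg();`, groupBySource returns two keys, `Service` and `Endpoint`, i.e. one Dispatcher per source. A line like `service_cpm = from(Service.*).cpm();` is simply skipped by this toy parser.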
- OALRuntime#generateDispatcherClass generates ServiceDispatcher
private Class generateDispatcherClass(String scopeName,
                                      DispatcherContext dispatcherContext) throws OALCompileException {
    // Generate the ServiceDispatcher class dynamically with javassist
    String className = dispatcherClassName(scopeName, false);
    CtClass dispatcherClass = classPool.makeClass(dispatcherClassName(scopeName, true));
    // Add one method per Service-scope metric to ServiceDispatcher (template: dispatcher/doMetrics.ftl),
    // e.g. the doServiceRespTime method shown below
    for (AnalysisResult dispatcherContextMetric : dispatcherContext.getMetrics()) {
        StringWriter methodEntity = new StringWriter();
        configuration.getTemplate("dispatcher/doMetrics.ftl").process(dispatcherContextMetric, methodEntity);
        dispatcherClass.addMethod(CtNewMethod.make(methodEntity.toString(), dispatcherClass));
    }
    // Add the dispatch method to ServiceDispatcher; it calls all of the methods above
    // (template: dispatcher/dispatch.ftl)
    StringWriter methodEntity = new StringWriter();
    configuration.getTemplate("dispatcher/dispatch.ftl").process(dispatcherContext, methodEntity);
    dispatcherClass.addMethod(CtNewMethod.make(methodEntity.toString(), dispatcherClass));
    // ...(exception handling and some setup omitted; finally the CtClass is compiled into a Class and returned)
}
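generateDispatcherClass never writes the Dispatcher by hand: FreeMarker templates render method source text, and javassist compiles that text into methods of a new class. The sketch below imitates only the rendering half with plain string building (no FreeMarker, no javassist) to show the kind of text the two templates produce. `ToyDispatcherSourceGen` is hypothetical, the doXxx body is only a placeholder comment, and the snake_case to CamelCase rule is an assumption inferred from pairs such as `service_resp_time` and `doServiceRespTime`.

```java
import java.util.List;

// Hypothetical stand-in for dispatcher/doMetrics.ftl and dispatcher/dispatch.ftl. It only builds
// Java *source text*; in OALRuntime such text is passed to javassist's CtNewMethod.make(...).
final class ToyDispatcherSourceGen {
    private static final String SOURCE_PKG = "org.apache.skywalking.oap.server.core.source.";

    // "service_resp_time" -> "ServiceRespTime"
    static String toCamel(String metricsName) {
        StringBuilder sb = new StringBuilder();
        for (String part : metricsName.split("_")) {
            if (!part.isEmpty()) {
                sb.append(Character.toUpperCase(part.charAt(0))).append(part.substring(1));
            }
        }
        return sb.toString();
    }

    // One doXxx method per metric; the body is a placeholder comment, not real metrics code.
    static String renderDoMetrics(String sourceName, String metricsName) {
        String camel = toCamel(metricsName);
        return "private void do" + camel + "(" + SOURCE_PKG + sourceName + " source) {"
            + " /* build " + camel + "Metrics, then MetricsStreamProcessor.getInstance().in(metrics) */ }";
    }

    // dispatch(ISource): cast once, then call every doXxx method of this source in order.
    static String renderDispatch(String sourceName, List<String> metricsNames) {
        StringBuilder sb = new StringBuilder();
        sb.append("public void dispatch(").append(SOURCE_PKG).append("ISource source) { ")
          .append(SOURCE_PKG).append(sourceName).append(" _source = (")
          .append(SOURCE_PKG).append(sourceName).append(")source; ");
        for (String name : metricsNames) {
            sb.append("do").append(toCamel(name)).append("(_source); ");
        }
        return sb.append("}").toString();
    }
}
```

For example, `renderDispatch("Service", Arrays.asList("service_resp_time", "service_sla"))` yields a one-line method with the same shape as the generated dispatch method: a single cast to Service followed by `doServiceRespTime(_source); doServiceSla(_source);`.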
- The methods added to ServiceDispatcher for Service-scope metrics (taking ServiceRespTime as an example)
private void doServiceRespTime(org.apache.skywalking.oap.server.core.source.Service source) {
    org.apache.skywalking.oap.server.core.source.oal.rt.metrics.ServiceRespTimeMetrics metrics =
        new org.apache.skywalking.oap.server.core.source.oal.rt.metrics.ServiceRespTimeMetrics();
    metrics.setTimeBucket(source.getTimeBucket());
    metrics.setEntityId(source.getEntityId());
    metrics.combine((long) (source.getLatency()), (long) (1));
    org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor.getInstance().in(metrics);
}
- The dispatch method added to ServiceDispatcher
public void dispatch(org.apache.skywalking.oap.server.core.source.ISource source) {
    org.apache.skywalking.oap.server.core.source.Service _source =
        (org.apache.skywalking.oap.server.core.source.Service) source;
    doServiceRespTime(_source);
    doServiceSla(_source);
    doServiceCpm(_source);
    doServicePercentile(_source);
    doServiceApdex(_source);
    doServiceMqConsumeCount(_source);
    doServiceMqConsumeLatency(_source);
}
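doServiceRespTime records each request with `combine(latency, 1)`, and ServiceRespTimeMetrics extends LongAvgMetrics. The idea behind such a long-average metric can be sketched as a running sum and count per entity and minute bucket: partial results for the same key are merged, and the value is sum / count. `ToyLongAvg` is a hypothetical simplification for illustration, not the real LongAvgMetrics.

```java
// Hypothetical, simplified analogue of a long-average metric (the real class is LongAvgMetrics).
final class ToyLongAvg {
    final String entityId;
    final long timeBucket;  // minute bucket, e.g. 202207232324
    private long summation;
    private long count;

    ToyLongAvg(String entityId, long timeBucket) {
        this.entityId = entityId;
        this.timeBucket = timeBucket;
    }

    // Same call shape as metrics.combine((long) latency, (long) 1) in doServiceRespTime.
    void combine(long value, long times) {
        summation += value;
        count += times;
    }

    // Merge another partial result for the same entity and minute, as an aggregation step would.
    void merge(ToyLongAvg other) {
        if (!entityId.equals(other.entityId) || timeBucket != other.timeBucket) {
            throw new IllegalArgumentException("different entity or time bucket");
        }
        combine(other.summation, other.count);
    }

    // Integer average of everything recorded so far; 0 when nothing has been recorded.
    long value() {
        return count == 0 ? 0 : summation / count;
    }
}
```

Two requests of 100 ms and 300 ms in one partial result, merged with a 500 ms request from another partial result for the same service and minute, give `value() == 300`.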
How ServiceDispatcher produces ServiceRespTimeMetrics data
TraceSegmentReportServiceHandler receives the reported trace data (SegmentObject). The core call chain is:
TraceSegmentReportServiceHandler#collect#onNext --> SegmentParserServiceImpl#send --> TraceAnalyzer#doAnalysis --> RPCAnalysisListener#parseEntry --> RPCAnalysisListener#build --> SourceReceiverImpl#receive --> DispatcherManager#forward --> XXXDispatcher#dispatch
notifyEntryListener calls RPCAnalysisListener#parseEntry, which fills the upstream and downstream information of the call (including its latency) into sourceBuilder and adds the builder to callingInTraffic.

public void parseEntry(SpanObject span, SegmentObject segmentObject) {
    RPCTrafficSourceBuilder sourceBuilder = new RPCTrafficSourceBuilder(namingControl);
    sourceBuilder.setSourceServiceName(Const.USER_SERVICE_NAME);
    sourceBuilder.setSourceServiceInstanceName(Const.USER_INSTANCE_NAME);
    sourceBuilder.setSourceEndpointName(Const.USER_ENDPOINT_NAME);
    sourceBuilder.setSourceLayer(Layer.UNDEFINED);
    sourceBuilder.setDestServiceInstanceName(segmentObject.getServiceInstance());
    sourceBuilder.setDestServiceName(segmentObject.getService());
    sourceBuilder.setDestLayer(identifyServiceLayer(span.getSpanLayer()));
    sourceBuilder.setDestEndpointName(span.getOperationName());
    sourceBuilder.setDetectPoint(DetectPoint.SERVER);
    sourceBuilder.setComponentId(span.getComponentId());
    // Sets the latency (endTime - startTime) and the timeBucket
    // (startTime truncated to the minute, e.g. 202207232324)
    setPublicAttrs(sourceBuilder, span);
    callingInTraffic.add(sourceBuilder);
}
The RPCAnalysisListener#build snippet below shows that it produces Sources of type Service, ServiceInstance, ServiceRelation and ServiceInstanceRelation and submits them to sourceReceiver. The DispatcherManager wrapped underneath picks the matching SourceDispatcher for each Source type and hands the Source to its dispatch method for further processing.
public void build() {
    callingInTraffic.forEach(callingIn -> {
        callingIn.prepare();
        sourceReceiver.receive(callingIn.toService());
        sourceReceiver.receive(callingIn.toServiceInstance());
        sourceReceiver.receive(callingIn.toServiceRelation());
        sourceReceiver.receive(callingIn.toServiceInstanceRelation());
        // ....
    });
    // .......
}
How the SourceDispatcher classes are generated and how they map to Sources was covered in the section "How ServiceDispatcher is generated". Here is ServiceDispatcher#dispatch:
public void dispatch(org.apache.skywalking.oap.server.core.source.ISource source) {
    org.apache.skywalking.oap.server.core.source.Service _source =
        (org.apache.skywalking.oap.server.core.source.Service) source;
    doServiceRespTime(_source);
    doServiceSla(_source);
    doServiceCpm(_source);
    doServicePercentile(_source);
    doServiceApdex(_source);
    doServiceMqConsumeCount(_source);
    doServiceMqConsumeLatency(_source);
}
ServiceDispatcher#doServiceRespTime builds a ServiceRespTimeMetrics and then calls MetricsStreamProcessor to aggregate the metrics:
private void doServiceRespTime(org.apache.skywalking.oap.server.core.source.Service source) {
    org.apache.skywalking.oap.server.core.source.oal.rt.metrics.ServiceRespTimeMetrics metrics =
        new org.apache.skywalking.oap.server.core.source.oal.rt.metrics.ServiceRespTimeMetrics();
    metrics.setTimeBucket(source.getTimeBucket());
    metrics.setEntityId(source.getEntityId());
    metrics.combine((long) (source.getLatency()), (long) (1));
    org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor.getInstance().in(metrics);
}
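The routing step described in this section, where DispatcherManager picks the SourceDispatcher(s) for an incoming Source, boils down to a lookup keyed by the source type. A minimal sketch, assuming dispatchers are registered per scope id (our reading of DispatcherManager); `ToySource`, `ToyDispatcher` and `ToyDispatcherManager` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical minimal versions of ISource and SourceDispatcher.
interface ToySource {
    int scope();  // scope id identifying the source type (Service, ServiceInstance, ...)
}

interface ToyDispatcher {
    void dispatch(ToySource source);
}

final class ToyDispatcherManager {
    private final Map<Integer, List<ToyDispatcher>> dispatchers = new HashMap<>();

    void register(int scope, ToyDispatcher dispatcher) {
        dispatchers.computeIfAbsent(scope, k -> new ArrayList<>()).add(dispatcher);
    }

    // Send the source to every dispatcher registered for its scope; unknown scopes are ignored.
    void forward(ToySource source) {
        List<ToyDispatcher> targets = dispatchers.get(source.scope());
        if (targets == null) {
            return;
        }
        for (ToyDispatcher dispatcher : targets) {
            dispatcher.dispatch(source);
        }
    }
}
```

Registering a generated ServiceDispatcher under the Service scope is what makes `sourceReceiver.receive(callingIn.toService())` end up in `ServiceDispatcher#dispatch`.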
Alarm processing of ServiceRespTimeMetrics data
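- NotifyHandler receives the Metrics, wraps a MetaInAlarm according to the scope, fetches all RunningRules registered for the metrics name, and calls in() on each of them to add the metrics to the rule's window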
public void notify(Metrics metrics) {
    WithMetadata withMetadata = (WithMetadata) metrics;
    MetricsMetaInfo meta = withMetadata.getMeta();
    int scope = meta.getScope();
    // .....
    // Build the MetaInAlarm according to the scope
    MetaInAlarm metaInAlarm;
    if (DefaultScopeDefine.inServiceCatalog(scope)) {
        final String serviceId = meta.getId();
        final IDManager.ServiceID.ServiceIDDefinition serviceIDDefinition =
            IDManager.ServiceID.analysisId(serviceId);
        ServiceMetaInAlarm serviceMetaInAlarm = new ServiceMetaInAlarm();
        serviceMetaInAlarm.setMetricsName(meta.getMetricsName());
        serviceMetaInAlarm.setId(serviceId);
        serviceMetaInAlarm.setName(serviceIDDefinition.getName());
        metaInAlarm = serviceMetaInAlarm;
    }
    // ......
    // Fetch all RunningRules registered for this metrics name
    List<RunningRule> runningRules = core.findRunningRule(meta.getMetricsName());
    if (runningRules == null) {
        return;
    }
    runningRules.forEach(rule -> rule.in(metaInAlarm, metrics));
}
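Once the metadata is wrapped, the rest of NotifyHandler#notify is a lookup from metrics name to rules followed by a fan-out. A stripped-down sketch: `ToyNotifyHandler` and `ToyRule` are hypothetical, with a plain long value standing in for Metrics and an entity name standing in for MetaInAlarm.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for RunningRule: it only records what in() receives.
final class ToyRule {
    final String ruleName;
    final List<String> received = new ArrayList<>();

    ToyRule(String ruleName) {
        this.ruleName = ruleName;
    }

    // In SkyWalking this step puts the metrics into the rule's time window.
    void in(String entityName, long value) {
        received.add(entityName + "=" + value);
    }
}

// Hypothetical, simplified NotifyHandler: metrics name -> rules, then fan-out.
final class ToyNotifyHandler {
    private final Map<String, List<ToyRule>> rulesByMetrics = new HashMap<>();

    void addRule(String metricsName, ToyRule rule) {
        rulesByMetrics.computeIfAbsent(metricsName, k -> new ArrayList<>()).add(rule);
    }

    // Mirrors the tail of NotifyHandler#notify: no rule for this metric means nothing to do.
    void handle(String metricsName, String entityName, long value) {
        List<ToyRule> rules = rulesByMetrics.get(metricsName);
        if (rules == null) {
            return;
        }
        rules.forEach(rule -> rule.in(entityName, value));
    }
}
```

Metrics for which no alarm rule exists cost only a single map lookup, which matters because every aggregated metric passes through this handler.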
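- RunningRule#in(MetaInAlarm meta, Metrics metrics) adds the metrics to the window (only the most recent N time buckets are kept, where N is the rule's period; after an alarm fires, the rule stays silent for 10 checks to avoid repeated alarms)
- AlarmCore starts a scheduled task that runs a check every 10 seconds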
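Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> {
    final List<AlarmMessage> alarmMessageList = new ArrayList<>();
    LocalDateTime checkTime = LocalDateTime.now();
    // Minutes elapsed since the last check
    int minutes = Minutes.minutesBetween(lastExecuteTime, checkTime).getMinutes();
    alarmRulesWatcher.getRunningContext().values().forEach(ruleList -> ruleList.forEach(runningRule -> {
        // Only act when at least one minute has passed
        if (minutes > 0) {
            // Slide the window forward
            runningRule.moveTo(checkTime);
            // Only check once the second-of-minute is past 15
            if (checkTime.getSecondOfMinute() > 15) {
                // Decide whether to raise an alarm. The actual threshold logic: the service
                // response time exceeds the threshold AND the number of such occurrences
                // reaches the configured count
                alarmMessageList.addAll(runningRule.check());
            }
        }
    }));
    // ...(updating lastExecuteTime after a check is omitted)
    if (!alarmMessageList.isEmpty()) {
        // ...(composite-rule evaluation that builds filteredMessages from alarmMessageList is omitted)
        // Send the alarm notifications (9 notification channels are supported out of the box)
        allCallbacks.forEach(callback -> callback.doAlarm(filteredMessages));
    }
}, 10, 10, TimeUnit.SECONDS);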
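The window kept by each RunningRule (the last N = period minutes) and the decision made in check() (enough minutes over the threshold, and not currently silenced) can be illustrated with a small self-contained class. This is a simplified model of the behavior described above, not SkyWalking's RunningRule: `ToyWindowRule` is hypothetical, minutes are plain long indices, the only operator is `>`, and the silence period is counted in checks.

```java
import java.util.Arrays;

// Hypothetical, simplified alarm rule:
// "value > threshold in at least `count` of the last `period` minutes".
final class ToyWindowRule {
    private final long threshold;
    private final int count;
    private final int silencePeriod;  // number of checks to stay quiet after firing
    private final Long[] window;      // window[period - 1] is the newest minute; null = no data
    private long endMinute;           // minute index of the newest slot
    private int silenceCountdown = 0;

    ToyWindowRule(long threshold, int period, int count, int silencePeriod, long startMinute) {
        this.threshold = threshold;
        this.count = count;
        this.silencePeriod = silencePeriod;
        this.window = new Long[period];
        this.endMinute = startMinute;
    }

    // Slide the window forward so its newest slot is `minute`; data older than the window drops out.
    void moveTo(long minute) {
        long shift = minute - endMinute;
        if (shift <= 0) {
            return;
        }
        int n = window.length;
        if (shift >= n) {
            Arrays.fill(window, null);
        } else {
            int s = (int) shift;
            System.arraycopy(window, s, window, 0, n - s);
            Arrays.fill(window, n - s, n, null);
        }
        endMinute = minute;
    }

    // Record one minute's value (moving the window forward if needed); too-old values are dropped.
    void in(long minute, long value) {
        moveTo(minute);
        long offset = endMinute - minute;
        if (offset >= 0 && offset < window.length) {
            window[window.length - 1 - (int) offset] = value;
        }
    }

    // Returns true when the rule fires; afterwards it stays silent for `silencePeriod` checks.
    boolean check() {
        int matches = 0;
        for (Long v : window) {
            if (v != null && v > threshold) {
                matches++;
            }
        }
        if (matches >= count && silenceCountdown <= 0) {
            silenceCountdown = silencePeriod;
            return true;
        }
        if (silenceCountdown > 0) {
            silenceCountdown--;
        }
        return false;
    }
}
```

With threshold 1000 ms, period 3, count 2 and silencePeriod 2: one slow minute does not fire, a second slow minute fires, the next two checks are silenced, and the third check after firing fires again while both slow minutes are still in the window.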
- When the response time exceeds the threshold, the alarm notification is sent via DingTalk
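What a DingTalk callback ultimately sends is an HTTP POST of a JSON text message to the robot's webhook URL. The sketch below builds that body and, for robots protected by a secret, the signature described in DingTalk's custom-robot documentation as we understand it (HmacSHA256 over `timestamp + "\n" + secret`, Base64-encoded, then URL-encoded, and appended to the webhook URL as `timestamp` and `sign`). The `%s` text template reflects our assumption of how SkyWalking's DingTalk hook formats the alarm message; treat it, and the hypothetical class name `ToyDingtalkPayload`, as illustrative, and check the DingTalk docs before relying on the details.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;

// Hypothetical helper: builds what a DingTalk alarm callback would POST (sending is omitted).
final class ToyDingtalkPayload {
    // Minimal JSON string escaping; other control characters are replaced by a space (simplification).
    static String jsonEscape(String s) {
        StringBuilder sb = new StringBuilder();
        for (char c : s.toCharArray()) {
            switch (c) {
                case '"': sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n"); break;
                case '\r': sb.append("\\r"); break;
                case '\t': sb.append("\\t"); break;
                default: sb.append(c < 0x20 ? ' ' : c);
            }
        }
        return sb.toString();
    }

    // Text-message body; `template` is assumed to contain exactly one %s for the alarm message.
    static String textBody(String template, String alarmMessage) {
        String content = String.format(template, alarmMessage);
        return "{\"msgtype\":\"text\",\"text\":{\"content\":\"" + jsonEscape(content) + "\"}}";
    }

    // URL-encoded Base64(HmacSHA256(key = secret, data = timestamp + "\n" + secret)).
    static String sign(long timestampMillis, String secret) throws Exception {
        String stringToSign = timestampMillis + "\n" + secret;
        Mac mac = Mac.getInstance("HmacSHA256");
        mac.init(new SecretKeySpec(secret.getBytes(StandardCharsets.UTF_8), "HmacSHA256"));
        byte[] raw = mac.doFinal(stringToSign.getBytes(StandardCharsets.UTF_8));
        return URLEncoder.encode(Base64.getEncoder().encodeToString(raw), "UTF-8");
    }
}
```

In use, the caller would take `System.currentTimeMillis()` as the timestamp, append `&timestamp=...&sign=...` to the webhook URL, and POST `textBody(...)` with `Content-Type: application/json`.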

Conclusion
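This article first showed how ServiceDispatcher is generated, then how ServiceDispatcher produces ServiceRespTimeMetrics data, and finally how checking that data triggers alarm notifications. Having followed the whole path end to end, you should now have a reasonable picture of how it fits together. To keep the main thread clear, some details were skipped, for example how Metrics are aggregated; that is left for a later article.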