SkyWalking Series: Source Code Analysis of Alarm Notifications
2022-07-27 03:11:00 【snail-jie】
Preface
Earlier posts showed that SkyWalking can collect trace data. But how are we notified when something goes wrong in a trace? Using a response-time timeout as the example, this article walks through the source code of SkyWalking's alarm flow.
How ServiceDispatcher is generated
- In OALRuntime#start, the oal/core.oal configuration file is parsed and ServiceDispatcher is generated dynamically:
public void start(ClassLoader currentClassLoader) throws ModuleStartException, OALCompileException {
    // Read the oal/core.oal configuration file
    Reader read = ResourceUtils.read(oalDefine.getConfigFile());
    // Parse oal/core.oal
    ScriptParser scriptParser = ScriptParser.createFromFile(read, oalDefine.getSourcePackage());
    OALScripts oalScripts = scriptParser.parse();
    // Generate the corresponding Dispatcher classes
    this.generateClassAtRuntime(oalScripts);
}
- OALRuntime#generateClassAtRuntime generates ServiceDispatcher (together with the Metrics classes):
private void generateClassAtRuntime(OALScripts oalScripts) throws OALCompileException {
    List<AnalysisResult> metricsStmts = oalScripts.getMetricsStmts();
    /*
     * 1. Take the sourceName from each metricsStmt's "from" clause (e.g. Service).
     * 2. Map each sourceName to a DispatcherContext: OALRuntime$allDispatcherContext$allContext.
     * 3. Add every metricsStmt that belongs to that sourceName to the DispatcherContext's metrics.
     */
    metricsStmts.forEach(this::buildDispatcherContext);
    for (AnalysisResult metricsStmt : metricsStmts) {
        /*
         * generateMetricsClass generates the Metrics class (e.g. ServiceRespTimeMetrics),
         * which extends LongAvgMetrics and implements the WithMetadata interface;
         * methods are added from templates (e.g. hashCode from metrics/hashCode.ftl).
         */
        metricsClasses.add(generateMetricsClass(metricsStmt));
        generateMetricsBuilderClass(metricsStmt);
    }
    for (Map.Entry<String, DispatcherContext> entry : allDispatcherContext.getAllContext().entrySet()) {
        // generateDispatcherClass generates the Dispatcher class
        dispatcherClasses.add(generateDispatcherClass(entry.getKey(), entry.getValue()));
    }
    oalScripts.getDisableCollection().getAllDisableSources().forEach(disable -> {
        DisableRegister.INSTANCE.add(disable);
    });
}
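The three numbered steps in the comment above amount to grouping OAL statements by the source named in their from clause, with one dispatcher generated per group. Below is a minimal sketch of just that grouping, not SkyWalking's implementation: MetricsStmt and groupBySource are hypothetical stand-ins for AnalysisResult and buildDispatcherContext, and the metrics names are examples in the style of core.oal.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for AnalysisResult: only the metrics name and the
// source name from the OAL "from(...)" clause are kept.
record MetricsStmt(String metricsName, String sourceName) {}

final class DispatcherContextSketch {
    // Groups statements by source name, like the sourceName -> DispatcherContext
    // mapping. LinkedHashMap keeps the first-seen order of sources, and each
    // list keeps its statements in script order.
    static Map<String, List<String>> groupBySource(List<MetricsStmt> stmts) {
        Map<String, List<String>> contexts = new LinkedHashMap<>();
        for (MetricsStmt stmt : stmts) {
            contexts.computeIfAbsent(stmt.sourceName(), k -> new ArrayList<>())
                    .add(stmt.metricsName());
        }
        return contexts;
    }

    public static void main(String[] args) {
        List<MetricsStmt> stmts = List.of(
            new MetricsStmt("service_resp_time", "Service"),
            new MetricsStmt("endpoint_cpm", "Endpoint"),
            new MetricsStmt("service_sla", "Service"));
        // One dispatcher class would be generated per key
        System.out.println(groupBySource(stmts));
        // prints {Service=[service_resp_time, service_sla], Endpoint=[endpoint_cpm]}
    }
}
```

Keying by source name is what lets a single generated ServiceDispatcher call every Service-scope metrics method in one dispatch pass.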
- OALRuntime#generateDispatcherClass generates ServiceDispatcher:
private Class generateDispatcherClass(String scopeName,
                                      DispatcherContext dispatcherContext) throws OALCompileException {
    // Use javassist to generate the ServiceDispatcher class dynamically
    String className = dispatcherClassName(scopeName, false);
    CtClass dispatcherClass = classPool.makeClass(dispatcherClassName(scopeName, true));
    // Add one method per Service-scope metric to ServiceDispatcher (template: dispatcher/doMetrics.ftl),
    // e.g. the doServiceRespTime method shown below
    for (AnalysisResult dispatcherContextMetric : dispatcherContext.getMetrics()) {
        StringWriter methodEntity = new StringWriter();
        configuration.getTemplate("dispatcher/doMetrics.ftl").process(dispatcherContextMetric, methodEntity);
        dispatcherClass.addMethod(CtNewMethod.make(methodEntity.toString(), dispatcherClass));
    }
    // Add the dispatch method to ServiceDispatcher; it calls the methods above (template: dispatcher/dispatch.ftl)
    StringWriter methodEntity = new StringWriter();
    configuration.getTemplate("dispatcher/dispatch.ftl").process(dispatcherContext, methodEntity);
    dispatcherClass.addMethod(CtNewMethod.make(methodEntity.toString(), dispatcherClass));
    // ... exception handling, loading the generated class and the return statement are omitted in this excerpt
}
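The template step can be imitated without any libraries: each metric fills the same method skeleton with its own names, producing source text that javassist then compiles. The sketch below uses String.format in place of FreeMarker, and its template is a simplified assumption modeled on the generated doServiceRespTime method shown in this post, not the actual contents of dispatcher/doMetrics.ftl.

```java
final class DoMetricsTemplateSketch {
    // Hypothetical, simplified stand-in for dispatcher/doMetrics.ftl.
    // %1$s = metrics class, %2$s = source class, %3$s = method suffix,
    // %4$s = argument list passed to combine(...).
    private static final String TEMPLATE = String.join("\n",
        "private void do%3$s(%2$s source) {",
        "    %1$s metrics = new %1$s();",
        "    metrics.setTimeBucket(source.getTimeBucket());",
        "    metrics.setEntityId(source.getEntityId());",
        "    metrics.combine(%4$s);",
        "    MetricsStreamProcessor.getInstance().in(metrics);",
        "}");

    // Renders the source text of one doXxx method for a single metric.
    static String render(String metricsClass, String sourceClass, String suffix, String combineArgs) {
        return String.format(TEMPLATE, metricsClass, sourceClass, suffix, combineArgs);
    }

    public static void main(String[] args) {
        // In OALRuntime the rendered string is what gets passed to CtNewMethod.make(...)
        System.out.println(render("ServiceRespTimeMetrics", "Service", "ServiceRespTime",
            "(long)(source.getLatency()), (long)(1)"));
    }
}
```

Generating source text and compiling it at runtime is why adding a metric to core.oal needs no hand-written dispatcher code.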
- The Service-scope metrics method added to ServiceDispatcher, taking ServiceRespTime as an example:
private void doServiceRespTime(org.apache.skywalking.oap.server.core.source.Service source) {
    org.apache.skywalking.oap.server.core.source.oal.rt.metrics.ServiceRespTimeMetrics metrics =
        new org.apache.skywalking.oap.server.core.source.oal.rt.metrics.ServiceRespTimeMetrics();
    metrics.setTimeBucket(source.getTimeBucket());
    metrics.setEntityId(source.getEntityId());
    metrics.combine(
        (long)(source.getLatency()), (long)(1));
    org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor.getInstance().in(metrics);
}
- The dispatch method added to ServiceDispatcher:
public void dispatch(org.apache.skywalking.oap.server.core.source.ISource source) {
    org.apache.skywalking.oap.server.core.source.Service _source = (org.apache.skywalking.oap.server.core.source.Service) source;
    doServiceRespTime(_source);
    doServiceSla(_source);
    doServiceCpm(_source);
    doServicePercentile(_source);
    doServiceApdex(_source);
    doServiceMqConsumeCount(_source);
    doServiceMqConsumeLatency(_source);
}
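Each doServiceRespTime call only feeds combine(latency, 1); the averaging itself happens inside the metrics object. Since ServiceRespTimeMetrics extends LongAvgMetrics, here is a simplified sketch of that idea, assuming the metric keeps a running sum and count per entity and time bucket. LongAvgSketch and its methods are illustrative names; the real class also handles persistence and serialization, which are left out.

```java
// Simplified sketch of a long-average metric in the spirit of LongAvgMetrics:
// one instance per entity and minute time bucket (names are illustrative).
final class LongAvgSketch {
    private final String entityId;
    private final long timeBucket;
    private long summation; // sum of observed values, e.g. latencies in ms
    private long count;     // number of observations

    LongAvgSketch(String entityId, long timeBucket) {
        this.entityId = entityId;
        this.timeBucket = timeBucket;
    }

    // Mirrors combine((long) latency, (long) 1) in doServiceRespTime
    void combine(long summation, long count) {
        this.summation += summation;
        this.count += count;
    }

    // Merges a partial result for the same entity and time bucket
    void combine(LongAvgSketch other) {
        if (!entityId.equals(other.entityId) || timeBucket != other.timeBucket) {
            throw new IllegalArgumentException("different entity or time bucket");
        }
        combine(other.summation, other.count);
    }

    // Average using integer division; 0 when nothing has been recorded
    long average() {
        return count == 0 ? 0 : summation / count;
    }

    public static void main(String[] args) {
        LongAvgSketch a = new LongAvgSketch("service-a", 202207232324L);
        a.combine(100, 1);
        a.combine(200, 1);
        LongAvgSketch b = new LongAvgSketch("service-a", 202207232324L);
        b.combine(301, 1);
        a.combine(b);
        System.out.println(a.average()); // prints 200 (601 / 3, truncated)
    }
}
```

In this sketch, keeping the sum and count instead of a running average is what makes partial results mergeable: separately combined batches still yield the correct overall average.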
How ServiceDispatcher generates ServiceRespTimeMetrics data
TraceSegmentReportServiceHandler receives the reported trace data (SegmentObject). The core call chain is:
TraceSegmentReportServiceHandler#collect#onNext --> SegmentParserServiceImpl#send --> TraceAnalyzer#doAnalysis --> RPCAnalysisListener#parseEntry --> RPCAnalysisListener#build --> SourceReceiverImpl#receive --> dispatcherManager#forward --> XXXDispatcher#dispatch
notifyEntryListener calls RPCAnalysisListener#parseEntry, which fills the upstream and downstream information of the call (including its latency) into a sourceBuilder and adds the builder to callingInTraffic:

public void parseEntry(SpanObject span, SegmentObject segmentObject) {
    RPCTrafficSourceBuilder sourceBuilder = new RPCTrafficSourceBuilder(namingControl);
    sourceBuilder.setSourceServiceName(Const.USER_SERVICE_NAME);
    sourceBuilder.setSourceServiceInstanceName(Const.USER_INSTANCE_NAME);
    sourceBuilder.setSourceEndpointName(Const.USER_ENDPOINT_NAME);
    sourceBuilder.setSourceLayer(Layer.UNDEFINED);
    sourceBuilder.setDestServiceInstanceName(segmentObject.getServiceInstance());
    sourceBuilder.setDestServiceName(segmentObject.getService());
    sourceBuilder.setDestLayer(identifyServiceLayer(span.getSpanLayer()));
    sourceBuilder.setDestEndpointName(span.getOperationName());
    sourceBuilder.setDetectPoint(DetectPoint.SERVER);
    sourceBuilder.setComponentId(span.getComponentId());
    // Among other things, computes latency (endTime - startTime) and the TimeBucket
    // (startTime at minute precision, e.g. 202207232324)
    setPublicAttrs(sourceBuilder, span);
    callingInTraffic.add(sourceBuilder);
}
The RPCAnalysisListener#build snippet below shows that each call produces several kinds of Source: Service, ServiceInstance, ServiceRelation and ServiceInstanceRelation. These Sources are submitted to sourceReceiver, which wraps a DispatcherManager underneath; DispatcherManager selects the SourceDispatcher that matches each Source and passes the Source to its dispatch method for further processing.
public void build() {
    callingInTraffic.forEach(callingIn -> {
        callingIn.prepare();
        sourceReceiver.receive(callingIn.toService());
        sourceReceiver.receive(callingIn.toServiceInstance());
        sourceReceiver.receive(callingIn.toServiceRelation());
        sourceReceiver.receive(callingIn.toServiceInstanceRelation());
        ....
    });
    .......
}
How the SourceDispatcher classes are generated, and how they are mapped to their Source, was analyzed in the section on how ServiceDispatcher is generated. Here is ServiceDispatcher#dispatch again:
public void dispatch(org.apache.skywalking.oap.server.core.source.ISource source) {
    org.apache.skywalking.oap.server.core.source.Service _source = (org.apache.skywalking.oap.server.core.source.Service) source;
    doServiceRespTime(_source);
    doServiceSla(_source);
    doServiceCpm(_source);
    doServicePercentile(_source);
    doServiceApdex(_source);
    doServiceMqConsumeCount(_source);
    doServiceMqConsumeLatency(_source);
}
ServiceDispatcher#doServiceRespTime builds a ServiceRespTimeMetrics object and then calls MetricsStreamProcessor to aggregate the metrics:
private void doServiceRespTime(org.apache.skywalking.oap.server.core.source.Service source) {
    org.apache.skywalking.oap.server.core.source.oal.rt.metrics.ServiceRespTimeMetrics metrics =
        new org.apache.skywalking.oap.server.core.source.oal.rt.metrics.ServiceRespTimeMetrics();
    metrics.setTimeBucket(source.getTimeBucket());
    metrics.setEntityId(source.getEntityId());
    metrics.combine(
        (long)(source.getLatency()), (long)(1));
    org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor.getInstance().in(metrics);
}
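The routing that sourceReceiver and its DispatcherManager perform boils down to a lookup from a Source's type to the dispatchers registered for it. The sketch below assumes the type is identified by an integer scope id; SimpleSource, SimpleDispatcher, register and the scope value 1 are hypothetical simplifications, while forward borrows its name from the dispatcherManager#forward step in the call chain.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-ins for ISource and SourceDispatcher.
interface SimpleSource {
    int scope(); // scope id identifying the Source type, e.g. Service
}

interface SimpleDispatcher {
    void dispatch(SimpleSource source);
}

final class DispatcherRouterSketch {
    private final Map<Integer, List<SimpleDispatcher>> dispatchers = new HashMap<>();

    // Registers a dispatcher for one scope; several dispatchers may share a scope.
    void register(int scope, SimpleDispatcher dispatcher) {
        dispatchers.computeIfAbsent(scope, k -> new ArrayList<>()).add(dispatcher);
    }

    // Sends the source to every dispatcher registered for its scope;
    // sources without a registered dispatcher are dropped.
    void forward(SimpleSource source) {
        List<SimpleDispatcher> targets = dispatchers.get(source.scope());
        if (targets == null) {
            return;
        }
        for (SimpleDispatcher dispatcher : targets) {
            dispatcher.dispatch(source);
        }
    }

    public static void main(String[] args) {
        final int serviceScope = 1; // hypothetical scope id
        DispatcherRouterSketch router = new DispatcherRouterSketch();
        router.register(serviceScope, s -> System.out.println("ServiceDispatcher got scope " + s.scope()));
        router.forward(() -> serviceScope); // reaches the registered dispatcher
        router.forward(() -> 99);           // nothing registered, ignored
    }
}
```

Because the lookup depends only on the Source type, the listener code that builds Sources never needs to know which generated dispatcher will handle them.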
Alarm processing of ServiceRespTimeMetrics data
- NotifyHandler receives the Metrics, wraps its metadata into a MetaInAlarm according to the scope, looks up all RunningRules registered for the metrics name, and calls in() on each rule to add the metrics to its window:
public void notify(Metrics metrics) {
    WithMetadata withMetadata = (WithMetadata) metrics;
    MetricsMetaInfo meta = withMetadata.getMeta();
    int scope = meta.getScope();
    .....
    // Wrap the metadata into a MetaInAlarm according to the scope
    MetaInAlarm metaInAlarm;
    if (DefaultScopeDefine.inServiceCatalog(scope)) {
        final String serviceId = meta.getId();
        final IDManager.ServiceID.ServiceIDDefinition serviceIDDefinition = IDManager.ServiceID.analysisId(
            serviceId);
        ServiceMetaInAlarm serviceMetaInAlarm = new ServiceMetaInAlarm();
        serviceMetaInAlarm.setMetricsName(meta.getMetricsName());
        serviceMetaInAlarm.setId(serviceId);
        serviceMetaInAlarm.setName(serviceIDDefinition.getName());
        metaInAlarm = serviceMetaInAlarm;
    }
    ......
    // Get all RunningRules registered for this metrics name
    List<RunningRule> runningRules = core.findRunningRule(meta.getMetricsName());
    if (runningRules == null) {
        return;
    }
    runningRules.forEach(rule -> rule.in(metaInAlarm, metrics));
}
- RunningRule#in(MetaInAlarm meta, Metrics metrics) adds the metrics to the window, which keeps only the latest N (period) buckets; a silence period (10 here) prevents repeated alarms.
- AlarmCore starts a scheduled task that runs a check every 10 seconds:
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> {
    LocalDateTime checkTime = LocalDateTime.now();
    // Minutes elapsed since the last check
    int minutes = Minutes.minutesBetween(lastExecuteTime, checkTime).getMinutes();
    alarmRulesWatcher.getRunningContext().values().forEach(ruleList -> ruleList.forEach(runningRule -> {
        // At least one minute has passed since the last check
        if (minutes > 0) {
            // Slide the window forward
            runningRule.moveTo(checkTime);
            // Only check once the current second of the minute is past 15
            if (checkTime.getSecondOfMinute() > 15) {
                // Evaluate the rule and decide whether to trigger an alarm. The threshold logic:
                // the service response time exceeds the threshold, and the number of such
                // occurrences reaches the configured count
                alarmMessageList.addAll(runningRule.check());
            }
        }
    }));
    if (!alarmMessageList.isEmpty()) {
        // Send the alarm notifications (9 notification channels are supported natively);
        // the declarations of alarmMessageList and filteredMessages are omitted in this excerpt
        allCallbacks.forEach(callback -> callback.doAlarm(filteredMessages));
    }
}, 10, 10, TimeUnit.SECONDS);
- When the response time exceeds the threshold, a DingTalk alarm notification is sent.
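The window and threshold behavior described in this list can be modeled in a few dozen lines. This is a deliberately simplified sketch, not RunningRule itself: it assumes a rule that alarms when the number of buckets above the threshold within the window (period buckets long) reaches count, and that then skips the next silencePeriod checks. Minutes are plain numbers rather than SkyWalking TimeBucket values, -1 marks a bucket with no data, and the threshold is assumed to be non-negative.

```java
import java.util.Arrays;
import java.util.Optional;

// Simplified model of an alarm rule window; not SkyWalking's RunningRule.
final class RuleWindowSketch {
    private final int period;        // number of minute buckets kept in the window
    private final long threshold;    // e.g. average response time in ms (assumed >= 0)
    private final int count;         // buckets above threshold needed to alarm
    private final int silencePeriod; // checks to stay quiet after an alarm
    private long[] values;           // values[i] is minute (endMinute - period + 1 + i); -1 = no data
    private long endMinute;          // newest minute covered by the window
    private int silenceCountdown;

    RuleWindowSketch(int period, long threshold, int count, int silencePeriod, long startMinute) {
        this.period = period;
        this.threshold = threshold;
        this.count = count;
        this.silencePeriod = silencePeriod;
        this.values = new long[period];
        Arrays.fill(values, -1);
        this.endMinute = startMinute;
    }

    // Like RunningRule#in: store a value only if its minute is inside the window.
    void in(long minute, long value) {
        long offset = minute - (endMinute - period + 1);
        if (offset >= 0 && offset < period) {
            values[(int) offset] = value;
        }
    }

    // Like RunningRule#moveTo: slide the window so it ends at newEndMinute.
    void moveTo(long newEndMinute) {
        long shift = newEndMinute - endMinute;
        if (shift <= 0) {
            return;
        }
        long[] shifted = new long[period];
        Arrays.fill(shifted, -1);
        for (int i = 0; i < period; i++) {
            long target = i - shift; // buckets pushed past the front are dropped
            if (target >= 0) {
                shifted[(int) target] = values[i];
            }
        }
        values = shifted;
        endMinute = newEndMinute;
    }

    // Like RunningRule#check: alarm when enough buckets breach the threshold.
    Optional<String> check() {
        if (silenceCountdown > 0) {
            silenceCountdown--;
            return Optional.empty();
        }
        int breaches = 0;
        for (long v : values) {
            if (v > threshold) {
                breaches++;
            }
        }
        if (breaches >= count) {
            silenceCountdown = silencePeriod;
            return Optional.of("Response time > " + threshold + " ms in " + breaches
                + " of the last " + period + " minutes");
        }
        return Optional.empty();
    }
}
```

For example, with period 3, threshold 1000, count 2 and silencePeriod 2, two slow minutes inside the window produce one message, the next two checks stay silent, and the following check alarms again if the slow minutes are still in the window. In the real flow, AlarmCore's scheduled task drives moveTo and check.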

Conclusion
This article first showed how ServiceDispatcher is generated, then how ServiceDispatcher produces ServiceRespTimeMetrics data, and finally how ServiceRespTimeMetrics data is checked to trigger alarm notifications. With the whole chain walked through, you should now have a reasonable picture of how it fits together. To keep the main thread clear, some details were skipped, for example how Metrics are aggregated; that will be covered in a later post.