当前位置:网站首页>Logstash——使用throttle过滤器向钉钉发送预警消息
Logstash——使用throttle过滤器向钉钉发送预警消息
2022-06-26 05:59:00 【大·风】
之前介绍过logstash的过滤器中有一种throttle过滤器,他可以设置一种规则来进行流量控制,当流量超过预设值时就会为后续的消息添加指定的tag。可以通过使用此过滤器结合阿里钉钉机器人实现发送警报信息到钉钉聊天中。
钉钉机器人的配置
关于钉钉机器人的配置可以查看:钉钉开放平台——机器人开发——自定义机器人开发
logstash配置
数据输入的配置
input {
redis {
key => "logstash-dingtalk"
host => "localhost"
password => "root"
port => 6379
db => "0"
data_type => "list"
type => "dingtalk"
codec => plain{
charset=>"UTF-8"
}
}
}
数据的输入没有固定的要求,这里取决于每个人不同的业务场景和需求,只需要能把数据引入logstash中就可以。
过滤器的配置
filter {
grok {
match => {
"message" => "%{TIMESTAMP_ISO8601:log_date} %{LOGLEVEL:log_info} %{DATA:thread} %{NOTSPACE} %{SPACE} %{NOTSPACE} %{JAVACLASS:log_class} %{SPACE}: %{GREEDYDATA:log_message}" }
}
if "_grokparsefailure" in [tags] {
drop {
}
}
if [log_info] == "INFO" {
drop {
}
}
if [log_info] == "ERROR" {
throttle {
before_count => -1
after_count => 3
period => 120
max_age => 240
key => "%{[log_info]}"
add_tag => "throttled"
}
}
}
数据过滤内容核心部分是:
- 通过grok对日志信息的解析。
上面grok针对是我自己的日志格式,日志内容大概是这样的
2020-05-20 21:22:23.456 ERROR 9160 [ main] o.s.j.e.a.AnnotationMBeanExporter : Exception encountered during context initialization
- 对流量的限制。其他部分就要根据不同的业务场景去选择需不需要添加。
对流量限制,上面配置中限制了2分钟最多只有3条消息的上限限制(为了测试所以设置的比较小),下限并没有限制。
数据输出的配置
output {
if "throttled" in [tags] {
http {
url => "https://oapi.dingtalk.com/robot/send?access_token={你的token}"
http_method => "post"
content_type => "application/json; charset=utf-8"
format => "message"
message => '{"msgtype":"text","text":{"content":"错误告警:120秒内错误日志超过3条,请注意排查"}}'
}
}
stdout {
codec => rubydebug
}
}
数据输出配置的核心内容有两部分:
- 需要告警消息的确认。
- 发送告警消息
确定需要发送告警消息
根据官方文档,当事件流量超过限制的时候会被throttle过滤器打上tag,在上面配置中也就是throttled。所以此时应该使用的判断是"throttled" in [tags]。而不是网上很多文章的"throttled" not in [tags]
此时可以正常收到预警消息

关于告警消息的优化
上面内容完成了一个超过流量就向钉钉发送告警消息的全流程配置。看起来功能实现了,但是实际上目前的配置很可能没法投入生产环境。
假如面对偶尔的流量超限上面配置还是没问题的。但是在很多时候,当错误流量或者异常消息超过预设范围的时候很可能不只是超出一两条而是大量数据。而上面的配置在指定范围中,没超出一条就会向钉钉发送一次消息,直接的结果就是导致钉钉被刷屏。
而钉钉为了避免出现这种情况,在文档中专门指出限制条件。
每个机器人每分钟最多发送20条。如果超过20条,会限流10分钟。
结果就是在遇见异常流量的时候,先是受到一波信息轰炸,然后后续十分钟无法获得服务后续的情况。
针对上面的情况可以加一个限制条件,我们不需要每条超过流量的消息都去发送一条告警消息,可以缓一缓。
基于数量的控制
这里就需要使用aggregate过滤器了。
filter {
grok {
match => {
"message" => "%{TIMESTAMP_ISO8601:log_date} %{LOGLEVEL:log_info} %{DATA:thread} %{NOTSPACE} %{SPACE} %{NOTSPACE} %{JAVACLASS:log_class} %{SPACE}: %{GREEDYDATA:log_message}" }
}
if "_grokparsefailure" in [tags] {
drop {
}
}
if [log_info] == "INFO" {
drop {
}
}
if [log_info] == "ERROR" {
throttle {
before_count => -1
after_count => 3
period => 120
max_age => 240
key => "%{[log_info]}"
add_tag => "throttled"
}
}
if "throttled" in [tags] {
aggregate {
task_id => "%{log_info}"
code => "map['throttledNum'] ||= 0 ; map['throttledNum'] += 1
event.set('throttledNum', map['throttledNum'])"
}
if [throttledNum] >= 3 {
aggregate {
task_id => "%{log_info}"
code => "map['throttledNum'] = 0"
add_tag => "throttled-num-out"
}
}
}
}
上面配置中将已经被节流的数据数量做一个缓存存在map['throttledNum']中,同时设置当前数量到事件中,当事件的throttledNum大于我们预设的3时,清空map缓存并且添加throttled-num-out标签。这样就能实现每3条节流事件只发送一条告警信息。
此时消息输出的配置需要调整为
output {
if "throttled-num-out" in [tags] {
http {
url => "https://oapi.dingtalk.com/robot/send?access_token={你的token}"
http_method => "post"
content_type => "application/json; charset=utf-8"
format => "message"
message => '{"msgtype":"text","text":{"content":"错误告警:120秒内错误日志超过3条,请注意排查"}}'
}
}
stdout {
codec => rubydebug
}
}
基于时间的控制
但这个还不是完美的,既然是异常我们就没办法估计其最终可能产生的速度,这个时候我们可以基于时间来限制发送频率
这里就需要使用aggregate过滤器了。
filter {
grok {
match => {
"message" => "%{TIMESTAMP_ISO8601:log_date} %{LOGLEVEL:log_info} %{DATA:thread} %{NOTSPACE} %{SPACE} %{NOTSPACE} %{JAVACLASS:log_class} %{SPACE}: %{GREEDYDATA:log_message}"
}
}
if "_grokparsefailure" in [tags] {
drop {
}
}
if [log_info] == "INFO" {
drop {
}
}
if [log_info] == "ERROR" {
throttle {
before_count => -1
after_count => 3
period => 120
max_age => 240
key => "%{[log_info]}"
add_tag => "throttled"
}
}
if "throttled" in [tags] {
aggregate {
task_id => "%{log_info}"
code => "event.set('throttled_time',Time.parse(event.get('log_date')).to_f*1000)
map['throttled_time'] || = 0
event.set('throttled_time_out', (event.get('throttled_time') - map['throttled_time']) > 10000)
"
}
if [throttled_time_out] {
aggregate {
task_id => "%{log_info}"
code => "map['throttled_time'] = event.get('throttled_time')
"
}
}
}
}
和之前的配置不同,此时缓存的是时间。
- 首先得到日志时间的long类型表示,设置到事件的
throttled_time中。 - 然后判断事件当前时间,和用来计算上一次发送钉钉预警时间是否超过预设时间(10000毫秒)。
- 当
throttled_time_out的值为真的时候表示距离上次发送预警信息时间间隔超过了(10000毫秒)。 - 则将
map['throttled_time']的值更新为当前事件时间,同时在数据输出配置中发送预警消息。
此时消息输出的配置需要调整为
output {
if [throttled_time_out] {
http {
url => "https://oapi.dingtalk.com/robot/send?access_token={你的token}"
http_method => "post"
content_type => "application/json; charset=utf-8"
format => "message"
message => '{"msgtype":"text","text":{"content":"错误告警:120秒内错误日志超过3条,请注意排查"}}'
}
}
stdout {
codec => rubydebug
}
}
上面关于logstash向钉钉发送告警消息的内容仅仅是个人思路。logstash提供了执行Ruby code的相关处理器,在使用Ruby code的情况下应该能刚好的实现上面的内容。当然获取使用其他方式一样可以避免出现消息轰炸。
个人水平有限,上面的内容可能存在没有描述清楚或者错误的地方,假如开发同学发现了,请及时告知,我会第一时间修改相关内容。假如我的这篇内容对你有任何帮助的话,麻烦给我点一个赞。你的点赞就是我前进的动力。
边栏推荐
- The model defined (modified) in pytoch loads some required pre training model parameters and freezes them
- SSH keygen specifies the path
- MobileNets: Efficient Convolutional Neural Networks for Mobile Vision Applications
- 数据可视化实战:实验报告
- 423- binary tree (110. balanced binary tree, 257. all paths of binary tree, 100. same tree, 404. sum of left leaves)
- Source code of findcontrol
- Force buckle 875 Coco, who likes bananas
- Status mode, body can change at will
- Detailed explanation of serial port communication principle 232, 422, 485
- Unicloud cloud development obtains applet user openid
猜你喜欢
随机推荐
Prototype mode, Baa Baa
Func < T, tresult > Commission - learning record
Vs2022 offline installation package download and activation
About XXX management system (version C)
跨域的五种解决方案
Pre-Sale Analysis
【 langage c】 stockage des données d'analyse approfondie en mémoire
numpy.exp()
Record how to modify the control across threads
The model defined (modified) in pytoch loads some required pre training model parameters and freezes them
状态模式,身随心变
Factory method pattern, abstract factory pattern
Multi thread synchronous downloading of network pictures
volatile应用场景
numpy. tile()
tf. nn. top_ k()
Test depends on abstraction and does not depend on concrete
Kolla ansible deploy openstack Yoga version
RIA ideas
A new explanation of tcp/ip five layer protocol model









