最近为了提高系统的运行稳定性,在日志收集的过程中要求添加错误日志的告警,这里主要使用Logstash自带的metrics功能。Logstash可以在filter中根据某些字段进行日志的分类,如果某一类的日志出现次数不在正常范围,就会触发metrics event然后进行告警操作,这里我们只是使用简单的发邮件的告警方式。
Java的日志格式
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| 2017-02-14 14:36:40 [ INFO] - com.yunyu.birdben.task.RiskTask -RiskTask.java(97) -我是日志信息 2017-02-14 14:36:41 [ INFO] - com.yunyu.birdben.task.RiskTask -RiskTask.java(97) -我是日志信息 2017-02-14 14:36:42 [ INFO] - com.yunyu.birdben.task.RiskTask -RiskTask.java(97) -我是日志信息 2017-02-14 14:36:43 [ INFO] - com.yunyu.birdben.task.RiskTask -RiskTask.java(97) -我是日志信息 2017-02-14 14:36:44 [ INFO] - com.yunyu.birdben.task.RiskTask -RiskTask.java(97) -我是日志信息 2017-02-14 14:36:45 [ INFO] - com.yunyu.birdben.task.RiskTask -RiskTask.java(97) -我是日志信息 2017-02-14 14:36:46 [ INFO] - com.yunyu.birdben.task.OtherTask -OtherTask.java(97) -我是日志信息 2017-02-14 14:36:47 [ INFO] - com.yunyu.birdben.task.OtherTask -OtherTask.java(97) -我是日志信息 2017-02-14 14:36:48 [ INFO] - com.yunyu.birdben.task.OtherTask -OtherTask.java(97) -我是日志信息 2017-02-14 14:36:49 [ INFO] - com.yunyu.birdben.task.OtherTask -OtherTask.java(97) -我是日志信息 2017-02-14 14:36:50 [ INFO] - com.yunyu.birdben.task.OtherTask -OtherTask.java(97) -我是日志信息 2017-02-14 14:36:51 [ INFO] - com.yunyu.birdben.task.OtherTask -OtherTask.java(97) -我是日志信息 2017-02-14 14:36:52 [ INFO] - com.yunyu.birdben.task.OtherTask -OtherTask.java(97) -我是日志信息 2017-02-14 14:36:53 [ INFO] - com.yunyu.birdben.task.OtherTask -OtherTask.java(97) -我是日志信息
|
Grok表达式
1 2
| JAVA_TIMESTAMP %{YEAR}-%{MONTHNUM}-%{MONTHDAY} %{TIME} JAVA_LOGS %{JAVA_TIMESTAMP:timestamp} \[ %{DATA:level}\] - %{DATA:class_name} -%{DATA:file_name}.java\(%{DATA:line}\) -%{GREEDYDATA:msg}
|
日志中有两个文件RiskTask.java和OtherTask.java文件,我们的需求是5分钟内,如果RiskTask的日志一条都没有出现就发送告警邮件。
这里使用了三个新的插件metrics,ruby,email
- metrics : 用来定时统计和生成metrics event的
- ruby : 使用ruby代码来定制metrics event失效的条件
- email : 不需要多说,就是用来发送告警邮件的
metrics插件
默认情况下或根据flush_interval,每5秒刷新一次指标。 指标在事件流中显示为新事件,并执行发生在事件流以及输出之后的任何过滤器。
一般来说,您需要为指标添加标记,并让输出显式查找该标记。
被刷新的事件将以以下方式包括每个计量器和计时器度量:
meter => [ “event_%{field_name}” ]
1 2 3 4
| "[event_%{field_name}] [count]" - 事件的总数 "[event_%{field_name}] [rate_1m]" - 1分钟滑动窗口中的每秒事件率 "[event_%{field_name}] [rate_5m]" - 5分钟滑动窗口中的每秒事件率 "[event_%{field_name}] [rate_15m]" - 15分钟滑动窗口中的每秒事件率
|
timer => [ “thing”, “%{duration}” ]
1 2 3 4 5 6 7 8 9
| "[thing] [count]" - 事件的总数 "[thing] [rate_1m]" - 1分钟滑动窗口中的每秒事件率 "[thing] [rate_5m]" - 5分钟滑动窗口中的每秒事件率 "[thing] [rate_15m]" - 15分钟滑动窗口中的每秒事件率 "[thing] [min]" - 此指标的最小值 "[thing] [max]" - 此指标的最大值 "[thing] [stddev]" - 此指标的标准差 "[thing] [mean]" - 这个指标的平均值 "[thing] [pXX]" - 此指标的第XX个百分位数(请参阅百分位数)
|
1 2 3 4 5 6 7 8 9 10 11 12
| metrics { # 定义metrics计数器数据保存的字段名 field_name的值就是上面Grok表达式解析出来的字段名 meter => [ "event_%{field_name}" ] # 给该metrics添加tag标签,用于区分metrics add_tag => [ "metric" ] # 每隔5分钟统计一次(测试环境可以适当改小) flush_interval => 300 # 每隔5分钟(flush_interval + 1秒)清空计数器(测试环境可以适当改小) clear_interval => 301 # 10秒内的message数据才统计,避免延迟 ignore_older_than => 10 }
|
ruby插件
1 2 3 4 5 6 7 8 9
| if "metric" in [tags] { ruby { # code是定义metrics的过滤规则,满足什么条件删除metric event日志 # 如果code为空,就是metric event不会被cancel,那么最终metric event会output到elasticsearch/stdout/email,如果不想每个metric event都触发告警事件,就只能通过ruby插件的code添加ruby代码来控制metric event的取消条件 # code => "" # 如果status_code是500的日志count小于100条,就忽略此事件(即不发送任何消息)。 code => "event.cancel if event['event_500']['count'] < 100" } }
|
email插件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| # 测试环境建议注释掉邮件发送,否则邮箱容易爆炸 email { # stmp服务器地址 address => "smtpdm.aliyun.com" # 发件人邮箱地址 username => "service@post.XXX.com" # 发件人邮箱密码 password => "123456" # 发件人邮箱 from => "service@post.XXX.com" # 收件人邮箱 to => "birdben@XXX.com" # 邮件主题 subject => "告警:风控任务未执行" # 邮件内容 htmlbody => "告警内容:com.yunyu.birdben.task.RiskTask没有执行" }
|
总结一下我所理解的metrics原理:
配置文件定义好metrics之后,Logstash每隔flushinterval设置的时间就会自动创建一个metrics event,可以把metrics event理解成是Logstash自己创建的一条新的日志,这条新的日志有个名称是event%{field_name}的字段(可能是event_A,event_B,fieldname根据Grok表达式解析出来的结果确定的),event%{field_name}的字段下有四个字段
- “[event_%{field_name}] [count]” - 事件的总数
- “[event_%{field_name}] [rate_1m]” - 1分钟滑动窗口中的每秒事件率
- “[event_%{field_name}] [rate_5m]” - 5分钟滑动窗口中的每秒事件率
- “[event_%{field_name}] [rate_15m]” - 15分钟滑动窗口中的每秒事件率
我们可以根据count(事件的总数)的值,来统计每隔flushinterval时间,我们要统计的event%{field_name}日志的数量。举个例子,如果field_name是status_code,那我们要统计的日志就是event_200,event_302,event_400,event_500等等。那么,event_200.count就是每隔flush_interval时间内,stats_code是200的事件个数,其他同理。如果metrics event被保存到ES索引中,那么查看到的ES结果就会类似下面的结构。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| "_source": { "@version": "1", "@timestamp": "2017-02-15T11:06:37.402Z", "message": "hadoop1", "evnet_500": { "count": 17, "rate_1m": 3.4, "rate_5m": 3.4, "rate_15m": 3.4 }, "evnet_302": { "count": 1074, "rate_1m": 197.62554026237865, "rate_5m": 211.24966828088344, "rate_15m": 213.60997535145182 }, "event_200": { "count": 10483, "rate_1m": 982.4, "rate_5m": 982.4, "rate_15m": 982.4 }, "tags": [ "metric" ] }
|
这里给metrics event添加了一个metric标签,这样方便与其他业务日志区分开,在后续的ruby处理,email发送邮件,存储ES时,都使用了tag中是否包含metric标签来判断,该日志是否为metrics event。如果是metrics event我们才进行ruby处理,进行event的条件过滤。如果是metrics event我们才发送邮件,并且不保存到ES索引中。
这里我建议使用event.count来作为判断依据,而不是使用rate。因为count更适合用于是判断日志的收集数量,而rate更适合用于判断日志的收集速率。
参考文章: