2.3 过滤器配置
有丰富的过滤器插件,是Logstash威力如此强大的重要因素。名为过滤器,其实提供的不单单是过滤的功能。下面我们就会重点介绍几个插件,它们扩展了进入过滤器的原始数据,进行复杂的逻辑处理,甚至可以无中生有地添加新的Logstash事件到后续的流程中去!
2.3.1 date时间处理
之前章节已经提过,logstash-filter-date插件可以用来转换你的日志记录中的时间字符串,变成LogStash::Timestamp对象,然后转存到@timestamp字段里。
警告
因为在稍后的logstash-output-elasticsearch中常用的%{+YYYY.MM.dd}这种写法必须读取@timestamp数据,所以一定不要直接删掉这个字段保留自己的字段,而是应该用logstash-filter-date转换后删除自己的字段!
这在导入旧数据的时候固然非常有用,而在实时数据处理的时候同样有效,因为一般情况下数据流程中我们都会有缓冲区,导致最终的实际处理时间跟事件产生时间略有偏差。
提示
强烈建议打开Nginx的access_log配置项的buffer参数,对极限响应性能有极大提升!
1.配置示例
logstash-filter-date插件支持五种时间格式:
□ ISO8601:类似“2011-04-19T03:44:01.103Z”这样的格式。具体Z后面可以有“08:00”也可以没有,“.103”这个也可以没有。常用场景里来说,Nginx的log_format配置里就可以使用$time_iso8601变量来记录请求时间成这种格式。
□ UNIX:UNIX时间戳格式,记录的是从1970年起始至今的总秒数。Squid默认日志格式中就使用了这种格式。
□ UNIX_MS:这个时间戳则是从1970年起始至今的总毫秒数。据我所知,JavaScript里经常使用这个时间格式。
□ TAI64N:TAI64N格式比较少见,是这个样子的:@4000000052f88ea32489532c。我目前只知道常见应用中,qmail会用这个格式。
□ Joda-Time库:Logstash内部使用了Java的Joda时间库来作时间处理。所以我们可以使用Joda库所支持的时间格式来作具体定义。Joda时间格式定义见表2-1。
表2-1 Joda时间库格式
下面我们写一个Joda时间格式的配置作为示例:
filter { grok { match => [“message”, “%{HTTPDATE:logdate}”] } date { match => [“logdate”, “dd/MMM/yyyy:HH:mm:ss Z”] } }
注意,时区偏移量只需要用一个字母Z即可。
2.时区问题的解释
很多中国用户经常提一个问题:为什么@timestamp比我们早了8个小时?怎么修改成北京时间?
其实,Elasticsearch内部,对时间类型字段,是统一采用UTC时间,存成long长整形数据的!对日志统一采用UTC时间存储,是国际安全/运维界的一个通识——欧美公司的服务器普遍广泛分布在多个时区里——不像中国,地域横跨五个时区却只用北京时间。
对于页面查看,ELK的解决方案是在Kibana上,读取浏览器的当前时区,然后在页面上转换时间内容的显示。
所以,建议大家接受这种设定。否则,即便你用.getLocalTime修改,也还要面临在Kibana过去修改,以及Elasticsearch原有的["now-1h"TO"now"]这种方便的搜索语句无法正常使用的尴尬。
以上,请读者自行斟酌。
2.3.2 grok正则捕获
grok是Logstash最重要的插件。你可以在grok里预定义好命名正则表达式,在稍后(grok参数或者其他正则表达式里)引用它。
1.正则表达式语法
运维工程师多多少少都会一点正则。你可以在grok里写标准的正则,像下面这样:
\s+(?<request_time>\d+(?:\.\d+)?)\s+
这个正则表达式写法对于Perl或者Ruby程序员应该很熟悉了,Python程序员可能更习惯写(?P<name>pattern),没办法,适应一下吧。
现在给我们的配置文件添加第一个过滤器区段配置。配置要添加在输入和输出区段之间(Logstash执行区段的时候并不依赖于次序,不过为了自己看得方便,还是按次序书写吧):
input {stdin{}} filter { grok { match => { “message” =>“\s+(?<request_time>\d+(?:\.\d+)?)\s+” } } } output {stdout{}}
运行Logstash进程然后输入“begin 123.456 end”,你会看到类似下面这样的输出:
{“message” =>“begin 123.456 end”,“@version” =>“1”,“@timestamp” =>“2014-08-09T11:55:38.186Z”,“host” =>“raochenlindeMacBook-Air.local”,“request_time” =>“123.456” }
漂亮!不过数据类型好像不太满意……request_time应该是数值而不是字符串。
我们已经提过稍后会学习用LogStash::Filters::Mutate来转换字段值类型,不过在grok里,其实有自己的魔法来实现这个功能!
2.grok表达式语法
grok支持把预定义的grok表达式写入到文件中,官方提供的预定义grok表达式见:https://github.com/logstash-plugins/logstash-patterns-core/tree/master/patterns。
下面是从官方文件中摘抄的最简单但是足够说明用法的示例:
USERNAME [a-zA-Z0-9._-]+ USER %{USERNAME}
第一行,用普通的正则表达式来定义一个grok表达式;第二行,通过打印赋值格式,用前面定义好的grok表达式来定义另一个grok表达式。
grok表达式的打印复制格式的完整语法见下行示例。其中data_type目前只支持两个值:int和float。
%{PATTERN_NAME:capture_name:data_type}
所以我们可以改进我们的配置成下面这样:
filter { grok { match => { “message” =>“%{WORD} %{NUMBER:request_time:float} %{WORD}” } } }
重新运行进程然后可以得到如下结果:
{“message” =>“begin 123.456 end”,“@version” =>“1”,“@timestamp” =>“2014-08-09T12:23:36.634Z”,“host” =>“raochenlindeMacBook-Air.local”,“request_time” => 123.456 }
这次request_time变成数值类型了。
3.最佳实践
实际运用中,我们需要处理各种各样的日志文件,如果你都是在配置文件里各自写一行自己的表达式,就完全不可管理了。所以,我们建议是把所有的grok表达式统一写入到一个地方。然后用filter/grok的patterns_dir选项来指明。
如果你把“message”里所有的信息都grok到不同的字段了,数据实质上就相当于是重复存储了两份。所以你可以用remove_field参数来删除掉message字段,或者用overwrite参数来重写默认的message字段,只保留最重要的部分。
重写参数的示例如下:
filter { grok { patterns_dir =>“/path/to/your/own/patterns” match => { “message” =>“%{SYSLOGBASE} %{DATA:message}” } overwrite => [“message”] } }
4.高级用法
□ 多行匹配 在和codec/multiline搭配使用的时候,需要注意一个问题,grok正则和普通正则一样,默认是不支持匹配回车换行的。就像你需要=~//m一样也需要单独指定,具体写法是在表达式开始位置加(?m)标记。如下所示:
match => {“message” =>“(?m)\s+(?<request_time>\d+(?:\.\d+)?)\s+” }
□ 多项选择 有时候我们会碰上一个日志有多种可能格式的情况。这时候要写成单一正则就比较困难,或者全用|隔开又比较丑陋。这时候,Logstash的语法提供给我们一个有趣的解决方式。
文档中,都说明logstash-filters-grok插件的match参数应该接受的是一个Hash值。但是因为早期的Logstash语法中Hash值也是用[]这种方式书写的,所以其实现在传递Array值给match参数也完全没问题。所以,我们这里其实可以传递多个正则来匹配同一个字段:
match => [“message”, “(?<request_time>\d+(?:\.\d+)?)”,“message”, “%{SYSLOGBASE} %{DATA:message}”,“message”, “(?m)%{WORD}” ]
Logstash会按照这个定义次序依次尝试匹配,到匹配成功为止。虽说效果跟用|分割写个大大的正则是一样的,但是可阅读性好了很多。
提示
我强烈建议每个人都要使用Grok Debugger(http://grokdebug.herokuapp.com/)来调试自己的grok表达式。
2.3.3 GeoIP地址查询
GeoIP是最常见的免费IP地址归类查询库,同时也有收费版可以采购。GeoIP库可以根据IP地址提供对应的地域信息,包括国别、省市、经纬度等,对于可视化地图和区域统计非常有用。
配置示例如下:
filter { geoip { source =>“message” } }
运行结果如下:
{“message” =>“183.60.92.253”,“@version” =>“1”,“@timestamp” =>“2014-08-07T10:32:55.610Z”,“host” =>“raochenlindeMacBook-Air.local”,“geoip” => {“ip” =>“183.60.92.253”,“country_code2” =>“CN”,“country_code3” =>“CHN”,“country_name” =>“China”,“continent_code” =>“AS”,“region_name” =>“30”,“city_name” =>“Guangzhou”,“latitude” =>23.11670000000001,“longitude” =>113.25,“timezone” =>“Asia/Chongqing”,“real_region_name” =>“Guangdong”,“location” => [ [0] 113.25, [1] 23.11670000000001 ] } }
GeoIP库数据较多,如果你不需要这么多内容,可以通过fields选项指定自己所需要的。下例为全部可选内容:
filter { geoip { f?ields => [“city_name”, “continent_code”, “country_code2”, “country_code3”, “country_name”, “dma_code”, “ip”, “latitude”, “longitude”, “postal_code”, “region_name”, “timezone”] } }
需要注意的是:geoip.location是Logstash通过latitude和longitude额外生成的数据。所以,如果你是想要经纬度又不想重复数据的话,应该像下面这样做:
filter { geoip { fields => [“city_name”, “country_code2”, “country_name”, “latitude”, “longitude”, “region_name”] remove_field => [“[geoip][latitude]”, “[geoip][longitude]”] } }
还要注意:geoip插件的“source”字段可以是任一处理后的字段,比如“client_ip”,但是字段内容却需要小心!GeoIp库内只存有公共网络上的IP信息,查询不到结果的,会直接返回null,而Logstash的GeoIp插件对null结果的处理是:“不生成对应的geoip.字段”。所以读者在测试时,如果使用了诸如127.0.0.1、172.16.0.1、182.168.0.1、10.0.0.1等内网地址,会发现没有对应输出!
2.3.4 JSON编解码
在上一章,已经讲过在Codec中使用JSON编码。但是,有些日志可能是一种复合的数据结构,其中只有一部分记录是JSON格式的。这时候,我们依然需要在filter阶段,单独启用JSON解码插件。
配置示例如下:
filter { json { source =>“message” target =>“jsoncontent” } }
运行结果如下:
{“@version”: “1”,“@timestamp”: “2014-11-18T08:11:33.000Z”,“host”: “web121.mweibo.tc.sinanode.com”,“message”: “{\”uid\“:3081609001,\”type\“:\”signal\“}”,“jsoncontent”: {“uid”: 3081609001,“type”: “signal” } }
如果不打算使用多层结构的话,删掉target配置即可。单层结构新的结果如下:
{“@version”: “1”,“@timestamp”: “2014-11-18T08:11:33.000Z”,“host”: “web121.mweibo.tc.sinanode.com”,“message”: “{\”uid\“:3081609001,\”type\“:\”signal\“}”,“uid”: 3081609001,“type”: “signal” }
2.3.5 key-value切分
在很多情况下,日志内容本身都是一个类似于key-value的格式,但是格式具体的样式却是多种多样的。Logstash提供logstash-filter-kv插件,帮助处理不同样式的key-value日志,变成实际的LogStash::Event数据。
配置示例如下:
filter { ruby { init =>“@kname = ['method','uri','verb']” code =>“event.append(Hash[@kname.zip(event['request'].split(‘ ’))])” } if [uri] { ruby { init =>“@kname = ['url_path','url_args']” code =>“event.append(Hash[@kname.zip(event['uri'].split(‘?’))])” } kv { pref?ix =>“url_” source =>“url_args” field_split =>“&” remove_field => [ “url_args”, “uri”, “request” ] } } }
Nginx访问日志中的$request,通过这段配置,可以详细切分成method、url_path、verb、url_a、url_b...
进一步,如果url_args中有过多字段,可能导致Elasticsearch集群因为频繁update mapping或者消耗太多内存在cluster state上而宕机。所以,更优的选择是只保留明确有用的url_args内容,其他部分舍去,如下所示:
kv { prefix =>“url_” source =>“url_args” field_split =>“&” include_keys => [ “uid”, “cip” ] remove_field => [ “url_args”, “uri”, “request” ] }
上例即表示,除了url_uid和url_cip两个字段以外,其他的url_*都不保留。
2.3.6 metrics数值统计
logstash-filter-metrics插件是使用Ruby的Metriks模块来实现在内存里实时地计数和采样分析。该模块支持两个类型的数值分析:meter和timer。下面分别举例说明。
1.Meter示例(速率阈值检测)
Web访问日志的异常状态码频率是运维人员会非常关心的一个数据。通常我们的做法是通过Logstash或者其他日志分析脚本,把计数发送到rrdtool或者graphite里面,然后再通过check_graphite脚本之类的东西来检查异常并报警。
事实上这个事情可以直接在Logstash内部就完成。比如如果最近一分钟504请求的个数超过100个就报警,如下所示:
filter { metrics { meter =>“error.%{status}” add_tag =>“metric” ignore_older_than => 10 } if “metric” in [tags] { ruby { code =>“event.cancel if event['error.504.rate_1m'] * 60 < 100” } } } output { if “metric” in [tags] { exec { command =>“echo \”Out of threshold: %{error.504.rate_1m}\“” } } }
这里需要注意*60的含义。metrics模块生成的rate_1m/5m/15m意思是:最近1、5、15分钟的每秒速率!
2.Timer示例(box and whisker异常检测)
官版的logstash-filter-metrics插件只适用于metric事件的检查。由插件生成的新事件内部不存有来自input区段的实际数据信息。所以,要完成我们的百分比分布箱体检测,需要首先对代码稍微做几行变动,即在metric的timer事件里加一个属性,存储最近一个实际事件的数值:https://github.com/chenryn/logstash/commit/bc7bf34caf551d8a149605cf28e7c5d33fae7458
然后我们就可以用如下配置来探测异常数据了:
filter { metrics { timer => {“rt” =>“%{request_time}”} percentiles => [25, 75] add_tag =>“percentile” } if “percentile” in [tags] { ruby { code =>“l=event['rt.p75']-event['rt.p25'];event['rt.low'] =event['rt.p25']-l;event['rt.high']=event['rt.p75']+l” } } } output { if “percentile” in [tags] and ([rt.last] > [rt.high] or [rt.last] < [rt.low]) { exec { command =>“echo \”Anomaly: %{rt.last}\“” } } }
提示
有关box and shisker plot内容和重要性,参见《数据之魅》一书。
2.3.7 mutate数据修改
logstash-filter-mutate插件是Logstash另一个重要插件,它提供了丰富的基础类型数据处理能力,包括类型转换、字符串处理和字段处理等。
1.类型转换
类型转换是logstash-filter-mutate插件最初诞生时的唯一功能。其应用场景在之前JSON编解码小节已经提到。
可以设置的转换类型包括:“integer”、“float”和“string”。示例如下:
filter { mutate { convert => [“request_time”, “float”] } }
注意
mutate除了转换简单的字符值,还支持对数组类型的字段进行转换,即将[“1”,“2”]转换成[1,2]。但不支持对哈希类型的字段做类似处理。有这方面需求的可以采用稍后讲述的logstash-filter-ruby插件完成。
2.字符串处理
有如下字符串处理的插件:
□ gsub:仅对字符串类型字段有效。
gsub => [“urlparams”, “[\\?#]”, “_”]
□ split:分割字符串。
filter { mutate { split => [“message”, “|”] } }
随意输入一串以|分割的字符,比如“123|321|adfd|dfjld*=123”,可以看到如下输出:
{“message” => [ [0] “123”, [1] “321”, [2] “adfd”, [3] “dfjld*=123” ],“@version” =>“1”,“@timestamp” =>“2014-08-20T15:58:23.120Z”,“host” =>“raochenlindeMacBook-Air.local” }
□ join:仅对数组类型字段有效。
我们在之前已经用split割切的基础上再join回去。配置改成:
filter { mutate { split => [“message”, “|”] } mutate { join => [“message”, “,”] } }
filter区段之内,是顺序执行的。所以我们最后看到的输出结果是:
{“message” =>“123,321,adfd,dfjld*=123”,“@version” =>“1”,“@timestamp” =>“2014-08-20T16:01:33.972Z”,“host” =>“raochenlindeMacBook-Air.local” }
□ merge:合并两个数组或者哈希字段。依然在之前split的基础上继续:
filter { mutate { split => [“message”, “|”] } mutate { merge => [“message”, “message”] } }
我们会看到输出:
{“message” => [ [0] “123”, [1] “321”, [2] “adfd”, [3] “dfjld*=123”, [4] “123”, [5] “321”, [6] “adfd”, [7] “dfjld*=123” ],“@version” =>“1”,“@timestamp” =>“2014-08-20T16:05:53.711Z”,“host” =>“raochenlindeMacBook-Air.local” }
如果src字段是字符串,会自动先转换成一个单元素的数组再合并。把上一示例中的来源字段改成“host”:
filter { mutate { split => [“message”, “|”] } mutate { merge => [“message”, “host”] } }
结果变成:
{“message” => [ [0] “123”, [1] “321”, [2] “adfd”, [3] “dfjld*=123”, [4] “raochenlindeMacBook-Air.local” ],“@version” =>“1”,“@timestamp” =>“2014-08-20T16:07:53.533Z”,“host” => [ [0] “raochenlindeMacBook-Air.local” ] }
看,目的字段“message”确实多了一个元素,但是来源字段“host”本身也由字符串类型变成数组类型了!
同样,如果目的字段不是数组,也会被强制转换。即使来源字段并不存在:
filter { mutate { merge => [“message”, “not_exist_field”] } }
结果会变成:
{“message” => [ [0] “123|321|adfd|dfjld*=123” ],“@version” =>“1”,“@timestamp” =>“2014-08-20T15:58:23.120Z”,“host” =>“raochenlindeMacBook-Air.local” }
□ strip:去除字段内容前后的空格。可以接受数组参数:
filter { mutate { strip => [“syslog_message”, “syslog_datetime”] } }
□ lowercase:将字段内容全部转换成小写字母。同样可以接受数组。在ELK stack场景中,将内容转换成小写会是一个比较常见的需求。因为Elasticsearch默认是统一按照小写字母来搜索的。为了确保检索准确率,在不影响使用的情况下,建议对常用检索字段启用lowercase配置。
□ uppercase:将字段内容全部转换成大写字母。同样可以接受数组。
3.字段处理
字段处理的插件有:
□ rename:重命名某个字段,如果目的字段已经存在,会被覆盖掉,如下所示:
filter { mutate { rename => [“syslog_host”, “host”] } }
□ update:更新某个字段的内容。如果字段不存在,不会新建。
□ replace:作用和update类似,但是当字段不存在的时候,它会起到add_field参数一样的效果,自动添加新的字段。
4.执行次序
需要注意的是,filter/mutate内部是有执行次序的。其次序如下:
rename(event) if @rename update(event) if @update replace(event) if @replace convert(event) if @convert gsub(event) if @gsub uppercase(event) if @uppercase lowercase(event) if @lowercase strip(event) if @strip remove(event) if @remove split(event) if @split join(event) if @join merge(event) if @merge filter_matched(event)
而filter_matched这个filters/base.rb里继承的方法也是有次序的:
@add_field.each do |field, value| end @remove_field.each do |field| end @add_tag.each do |tag| end @remove_tag.each do |tag| end
2.3.8 随心所欲的Ruby处理
如果你稍微懂那么一点点Ruby语法的话,logstash-filter-ruby插件将会是一个非常有用的工具。比如你需要稍微修改一下LogStash::Event对象,但是又不打算为此写一个完整的插件,用logstash-filter-ruby插件绝对感觉良好。
配置示例如下:
filter { ruby { init =>“@kname = ['client','servername','url','status','time','size','upstream', 'upstreamstatus','upstreamtime','referer','xff','useragent']” code =>“event.append(Hash[@kname.zip(event['message'].split(‘|’))])” } }
官网示例是一个比较有趣但是没啥大用的做法——随机取消90%的事件。
所以上面我们给出了一个有用而且强大的实例。
通常我们都是用logstash-filter-grok插件来捕获字段的,但是正则耗费大量的CPU资源,很容易成为Logstash进程的瓶颈。
而实际上,很多流经Logstash的数据都是有自己预定义的特殊分隔符的,我们可以很简单的直接切割成多个字段。
logstash-filter-mutate插件里的“split”选项只能切成数组,后续很不方便使用和识别。而在logstash-filter-ruby里,我们可以通过“init”参数预定义好由每个新字段的名字组成的数组,然后在“code”参数指定的Ruby语句里通过两个数组的zip操作生成一个哈希并添加进数组里。短短一行Ruby代码,可以减少50%以上的CPU使用率。
logstash-filter-ruby插件用途远不止这一点,下一节你还会继续见到它的身影。
更多实例如下:
filter{ date { match => [“datetime” , “UNIX”] } ruby { code =>“event.cancel if 5 * 24 * 3600 < (event['@timestamp']-::Time.now)。abs” } }
在实际运用中,我们几乎肯定会碰到出乎意料的输入数据。这都有可能导致Elasticsearch集群出现问题。
当数据格式发生变化,比如UNIX时间格式变成UNIX_MS时间格式,会导致Logstash疯狂创建新索引,集群崩溃。
或者误输入过老的数据时,因为一般我们会close几天之前的索引以节省内存,必要时再打开。而直接尝试把数据写入被关闭的索引会导致内存问题。
这时候我们就需要提前校验数据的合法性。上面配置,就是用于过滤掉时间范围与当前时间差距太大的非法数据的。
2.3.9 split拆分事件
上一章我们通过multiline插件将多行数据合并进一个事件里,那么反过来,也可以把一行数据,拆分成多个事件。这就是split插件。
配置示例如下:
filter { split { field =>“message” terminator =>“#” } }
这个测试中,我们在intputs/stdin的终端中输入一行数据:“test1#test2”,结果看到输出两个事件:
{“@version”: “1”,“@timestamp”: “2014-11-18T08:11:33.000Z”,“host”: “web121.mweibo.tc.sinanode.com”,“message”: “test1” } {“@version”: “1”,“@timestamp”: “2014-11-18T08:11:33.000Z”,“host”: “web121.mweibo.tc.sinanode.com”,“message”: “test2” }
注意
split插件中使用的是yield功能,其结果是split出来的新事件,会直接结束其在filter阶段的历程,也就是说写在split后面的其他filter插件都不起作用,进入到output阶段。所以,一定要保证split配置写在全部filter配置的最后。
使用了类似功能的还有clone插件。从logstash-1.5.0beta1版本以后修复该问题。
2.3.10 elapsed
Splunk有一项非常有用的功能,叫做transaction。可以在错乱的多行日志中,根据connected字段、maxspan窗口、startswith/endwith标签等信息计算出事件的duration和count结果。其文档见:http://docs.splunk.com/Documentation/Splunk/latest/SearchReference/Transaction
ELK中,承载计算功能的Elasticsearch并不支持这种跨行计算,所以,变通的处理方式是:在Logstash中,提前做好事件的归并,直接计算出来transaction的duration数据。
比如一个transaction task_id startswith=START endwith=END的查询,可以在Logstash中这样计算:
filter { grok { match => [“message”, “%{TIMESTAMP_ISO8601} START id: (?<task_id>.*)”] add_tag => [ “taskStarted” ] } grok { match => [“message”, “%{TIMESTAMP_ISO8601} END id: (?<task_id>.*)”] add_tag => [ “taskTerminated”] } elapsed { start_tag =>“taskStarted” end_tag =>“taskTerminated” unique_id_field =>“task_id” } }