2.2 编解码配置
Codec是Logstash从1.3.0版开始新引入的概念(Codec来自Coder/decoder两个单词的首字母缩写)。
在此之前,Logstash只支持纯文本形式输入,然后以过滤器处理它。但现在,我们可以在输入期处理不同类型的数据,这全是因为有了Codec设置。
所以,这里需要纠正之前的一个概念。Logstash不只是一个input|filter|output的数据流,而是一个input|decode|filter|encode|output的数据流!Codec就是用来decode、encode事件的。
Codec的引入,使得Logstash可以更好、更方便地与其他有自定义数据格式的运维产品共存,比如graphite、fluent、netflow、collectd,以及使用msgpack、json、edn等通用数据格式的其他产品等。
事实上,我们在第一个“Hello World”用例中就已经用过Codec了——rubydebug就是一种Codec!虽然它一般只会用在stdout插件中,作为配置测试或者调试的工具。
提示
这个五段式的流程说明源自Perl版的Logstash(后来改名叫Message::Passing模块)的设计。本书稍后5.8节会对该模块稍作介绍。
2.2.1 JSON编解码
在早期的版本中,有一种降低Logstash过滤器的CPU负载消耗的做法盛行于社区(在当时的Cookbook上有专门的一节介绍):直接输入预定义好的JSON数据,这样就可以省略掉filter/grok配置!
这个建议依然有效,不过在当前版本中需要稍微做一点配置变动,因为现在有专门的Codec设置。
1.配置示例
社区常见的示例都是用的Apache的customlog,不过我觉得Nginx是一个比Apache更常用的新型Web服务器,所以我这里会用nginx.conf做示例:
logformat json '{“@timestamp”:“$time_iso8601”,' ‘“@version”:“1”,' ’“host”:“$server_addr”,' ‘“client”:“$remote_addr”,' ’“size”:$body_bytes_sent,' ‘“responsetime”:$request_time,' ’“domain”:“$host”,' ‘“url”:“$uri”,' ’“status”:“$status”}'; access_log /var/log/nginx/access.log_json json;
注意,在$request_time和$body_bytes_sent变量两头没有双引号",这两个数据在JSON里应该是数值类型!
重启Nginx应用,然后修改你的input/file区段配置成下面这样:
input { f?ile { path =>“/var/log/nginx/access.log_json”“ codec =>”json“ } }
2.运行结果
下面访问一下你Nginx发布的Web页面,然后你会看到Logstash进程输出类似下面这样的内容:
{“@timestamp” =>“2014-03-21T18:52:25.000+08:00”,“@version” =>“1”,“host” =>“raochenlindeMacBook-Air.local”,“client” =>“123.125.74.53”,“size” =>8096,“responsetime” =>0.04,“domain” =>“www.domain.com”,“url” =>“/path/to/f?ile.suff?ix”,“status” =>“200” }
3.Nginx代理服务的日志格式问题
对于一个Web服务器的访问日志,看起来已经可以很好的工作了。不过如果Nginx是作为一个代理服务器运行的话,访问日志里有些变量,比如说$upstream_response_time,可能不会一直是数字,它也可能是一个“-”字符串!这会直接导致Logstash对输入数据验证报异常。
有两个办法解决这个问题:
1)用sed在输入之前先替换-成0。运行Logstash进程时不再读取文件而是标准输入,这样命令就成了下面这个样子:
tail -F /var/log/nginx/proxy_access.log_json \ | sed 's/upstreamtime“:-/upstreamtime”:0/' \ | /usr/local/logstash/bin/logstash -f /usr/local/logstash/etc/proxylog.conf
2)日志格式中统一记录为字符串格式(即都带上双引号"),然后再在Logstash中用filter/mutate插件来变更应该是数值类型的字符字段的值类型。
有关LogStash::Filters::Mutate的内容,本书稍后会有介绍。
2.2.2 多行事件编码
有些时候,应用程序调试日志会包含非常丰富的内容,为一个事件打印出很多行内容。这种日志通常都很难通过命令行解析的方式做分析。
而Logstash正为此准备好了codec/multiline插件!当然,multiline插件也可以用于其他类似的堆栈式信息,比如Linux的内核日志。
配置示例如下:
input { stdin { codec => multiline { pattern =>“^\[” negate => true what =>“previous” } } }
运行Logstash进程,然后在等待输入的终端中输入如下几行数据:
[Aug/08/08 14:54:03] hello world [Aug/08/09 14:54:04] hello logstash hello best practice hello raochenlin [Aug/08/10 14:54:05] the end
你会发现Logstash输出下面这样的返回:
{ “@timestamp” =>“2014-08-09T13:32:03.368Z”, “message” =>“[Aug/08/08 14:54:03] hello world\n”, “@version” =>“1”, “host” =>“raochenlindeMacBook-Air.local” } { “@timestamp” =>“2014-08-09T13:32:24.359Z”, “message” =>“[Aug/08/09 14:54:04] hello logstash\n\n hello best practice\n\n hello raochenlin\n”, “@version” =>“1”, “tags” => [ [0] “multiline” ],“host” =>“raochenlindeMacBook-Air.local” }
你看,后面这个事件,在“message”字段里存储了三行数据!
注意
输出的事件中没有最后一行的"the end"字符串,这是因为你最后输入的回车符\n并不匹配设定的^\[正则表达式,Logstash还得等下一行数据直到匹配成功后才会输出这个事件。
其实这个插件的原理很简单,就是把当前行的数据添加到前面一行后面,直到新进的当前行匹配^\[正则为止。这个正则还可以用grok表达式,稍后你就会学习这方面的内容。具体的Java日志正则见:https://github.com/logstash-plugins/logstash-patterns-core/blob/master/patterns/java
说到应用程序日志,Log4j肯定是第一个被大家想到的,使用codec/multiline也确实是一个办法。
不过,如果你本身就是开发人员,或者可以推动程序修改变更的话,Logstash还提供了另一种处理Log4j的方式:input/log4j。与codec/multiline不同,这个插件是直接调用了org.apache.log4j.spi.LoggingEvent处理TCP端口接收的数据。稍后章节会详细讲述Log4j的用法。
2.2.3 网络流编码
NetFlow是Cisco发明的一种数据交换方式。NetFlow提供网络流量的会话级视图,记录下每个TCP/IP事务的信息。它的目的不是像tcpdump那样提供网络流量的完整记录,而是汇集起来形成更易于管理和易读的流向和容量的分析监控。
Cisco上配置NetFlow的方法,请参照具体的设备说明,主要是设定采集服务器的地址和端口,为运行Logstash服务的主机地址和端口(示例中为9995)。
采集NetFlow数据的Logstash服务配置示例如下:
input { udp { port => 9995 codec => netf?low { definitions =>“/opt/logstash-1.4.2/lib/logstash/codecs/netflow/netflow.yaml” versions => [5] } } } output { elasticsearch { index =>“logstash_netf?low5-%{+YYYY.MM.dd}” host =>“localhost” } }
由于该插件生成的字段较多,所以建议对应的Elasticsesarch索引模板也需要单独提交:
# curl -XPUT localhost:9200/_template/logstash_netf?low5 -d '{“template” : “logstash_netflow5-*”,“settings”: { “index.refresh_interval”: “5s” },“mappings” : { “_default_” : { “_all” : {“enabled” : false}, “properties” : { “@version”: { “index”: “analyzed”, “type”: “integer” }, “@timestamp”: { “index”: “analyzed”, “type”: “date” }, “netf?low”: { “dynamic”: true, “type”: “object”, “properties”: { “version”: { “index”: “analyzed”, “type”: “integer” }, “f?low_seq_num”: { “index”: “not_analyzed”, “type”: “long” }, “engine_type”: { “index”: “not_analyzed”, “type”: “integer” }, “engine_id”: { “index”: “not_analyzed”, “type”: “integer” }, “sampling_algorithm”: { “index”: “not_analyzed”, “type”: “integer” }, “sampling_interval”: { “index”: “not_analyzed”, “type”: “integer” }, “f?low_records”: { “index”: “not_analyzed”, “type”: “integer” }, “ipv4_src_addr”: { “index”: “analyzed”, “type”: “ip” }, “ipv4_dst_addr”: { “index”: “analyzed”, “type”: “ip” }, “ipv4_next_hop”: { “index”: “analyzed”, “type”: “ip” }, “input_snmp”: { “index”: “not_analyzed”, “type”: “long” }, “output_snmp”: { “index”: “not_analyzed”, “type”: “long” }, “in_pkts”: { “index”: “analyzed”, “type”: “long” }, “in_bytes”: { “index”: “analyzed”, “type”: “long” }, “f?irst_switched”: { “index”: “not_analyzed”, “type”: “date” }, “last_switched”: { “index”: “not_analyzed”, “type”: “date” }, “l4_src_port”: { “index”: “analyzed”, “type”: “long” }, “l4_dst_port”: { “index”: “analyzed”, “type”: “long” }, “tcp_flags”: { “index”: “analyzed”, “type”: “integer” }, “protocol”: { “index”: “analyzed”, “type”: “integer” }, “src_tos”: { “index”: “analyzed”, “type”: “integer” }, “src_as”: { “index”: “analyzed”, “type”: “integer” }, “dst_as”: { “index”: “analyzed”, “type”: “integer” }, “src_mask”: { “index”: “analyzed”, “type”: “integer” }, “dst_mask”: { “index”: “analyzed”, “type”: “integer” } } } } } } }'
Elasticsearch索引模板的功能,本书稍后12.6节会有详细介绍。