2.4 输出插件
2.4.1 输出到Elasticsearch
Logstash早期有三个不同的Elasticsearch插件。到1.4.0版本的时候,开发者彻底重写了LogStash::Outputs::Elasticsearch插件。从此,我们只需要用这一个插件,就能任意切换使用Elasticsearch集群支持的各种不同协议了。
1.配置示例
output { elasticsearch { host =>“192.168.0.2” protocol =>“http” index =>“logstash-%{type}-%{+YYYY.MM.dd}” index_type =>“%{type}” workers => 5 template_overwrite => true } }
2.解释
□ 索引名 写入的Elasticsearch索引的名称,这里可以使用变量。为了更贴合日志场景,Logstash提供了%{+YYYY.MM.dd}这种写法。在语法解析的时候,看到以+号开头的,就会自动认为后面是时间格式,尝试用时间格式来解析后续字符串。所以,之前处理过程中不要给自定义字段取个加号开头的名字……
此外,注意索引名中不能有大写字母,否则Elasticsearch在日志中会报InvalidIndex-NameException,但是Logstash不会报错,这个错误比较隐晦,也容易掉进这个坑中。
□ 协议 现在,新插件支持三种协议:node、http和transport。
一个小集群里,使用node协议最方便了。Logstash以Elasticsearch的client节点身份(即不存数据不参加选举)运行。如果你运行下面这行命令,你就可以看到自己的Logstash进程名,对应的node.role值是c:
# curl 127.0.0.1:9200/_cat/nodes?v host ip heap.percent ram.percent load node.role master name local 192.168.0.102 7 c - logstash-local-1036-2012 local 192.168.0.2 7 d * Sunstreak
特别的,作为一个快速运行示例的需要,你还可以在Logstash进程内部运行一个内嵌的Elasticsearch服务器。内嵌服务器默认会在$PWD/data目录里存储索引。如果你想变更这些配置,在$PWD/elasticsearch.yml文件里写自定义配置即可,Logstash会尝试自动加载这个文件。
对于拥有很多索引的大集群,你可以用transport协议。Logstash进程会转发所有数据到你指定的某台主机上。这种协议跟上面的node协议是不同的。node协议下的进程是可以接收到整个Elasticsearch集群状态信息的,当进程收到一个事件时,它就知道这个事件应该存在集群内哪个机器的分片里,所以它就会直接连接该机器发送这条数据。而transport协议下的进程不会保存这个信息,在集群状态更新(节点变化,索引变化都会发送全量更新)时,就不会对所有的Logstash进程也发送这种信息。更多Elasticsearch集群状态的细节,参阅http://www.elasticsearch.org/guide。
如果你已经有现成的Elasticsearch集群,但是版本跟Logstash自带的又不太一样,建议你使用http协议。Logstash会使用POST方式发送数据。
3.protocol选择建议
Logstash 1.4.2在transport和http协议的情况下是固定连接指定host发送数据。从1.5.0开始,host可以设置数组,它会从节点列表中选取不同的节点发送数据,达到Round-Robin负载均衡的效果。
Kibana 4强制要求Elasticsearch全集群所有node版本在1.4以上,所以采用node方式发送数据的Logstash 1.4版(携带的Elasticsearch.jar库是1.1.1版本)会导致Kibana 4无法运行,采用Kibana 4的读者务必改用http方式。
开发者在IRC freenode#logstash频道里表示:“高于1.0版本的Elasticsearch应该都能跟最新版Logstash的node协议一起正常工作”。此信息仅供参考,请认真测试后再上线。
4.性能问题
Logstash 1.4.2在http协议下默认使用作者自己的ftw库,随同分发的是0.0.39版。该版本有内存泄露问题,长期运行下输出性能越来越差!
解决办法:对性能要求不高的,可以在启动Logstash进程时,配置环境变量ENV["BULK"],强制采用Elasticsearch官方Ruby库。命令如下:
export BULK=“esruby”
对性能要求高的,可以尝试升级到Logstash 1.5版。新版的logstash-output-elasticsearch放弃了ftw库,改用了一个JRuby平台专有的Manticore库。根据测试,性能跟ftw比相当接近。
对性能要求极高的,可以手动更新ftw库版本,目前最新版是0.0.42版,据称内存问题在0.0.40版即解决。
5.模板
Elasticsearch支持给索引预定义设置和mapping(前提是你用的Elasticsearch版本支持这个API,不过估计应该都支持)。Logstash自带有一个优化好的模板,内容如下:
{“template” : “logstash-*”,“settings” : {“index.refresh_interval” : “5s” },“mappings” : {“_default_” : {“_all” : {“enabled” : true},“dynamic_templates” : [ {“string_fields” : {“match” : “*”,“match_mapping_type” : “string”,“mapping” : {“type” : “string”, “index” : “analyzed”, “omit_norms” : true,“fields” : {“raw” : {“type”: “string”, “index” : “not_analyzed”, “ignore_above” : 256} } } } } ],“properties” : {“@version”: { “type”: “string”, “index”: “not_analyzed” },“geoip” : {“type” : “object”,“dynamic”: true,“path”: “full”,“properties” : {“location” : { “type” : “geo_point” } } } } } } }
这其中的关键设置包括:
□ template for index-pattern:只有匹配logstash-*的索引才会应用这个模板。有时候我们会变更Logstash的默认索引名称,记住你也得通过PUT方法上传可以匹配你自定义索引名的模板。当然,我更建议的做法是,把你自定义的名字放在“logstash-”后面,变成index=>"logstash-custom-%{+yyyy.MM.dd}"这样。
□ refresh_interval for indexing:Elasticsearch是一个近实时搜索引擎。它实际上是每1秒钟刷新一次数据。对于日志分析应用,我们用不着这么实时,所以Logstash自带的模板修改成了5秒钟。你还可以根据需要继续放大这个刷新间隔以提高数据写入性能。
□ multi-field with not_analyzed:Elasticsearch会自动使用自己的默认分词器(空格,点,斜线等分割)来分析字段。分词器对于搜索和评分是非常重要的,但是大大降低了索引写入和聚合请求的性能。所以Logstash模板定义了一种叫“多字段”(multi-field)类型的字段。这种类型会自动添加一个“.raw”结尾的字段,并给这个字段设置为不启用分词器。简单说,你想获取url字段的聚合结果的时候,不要直接用“url”,而是用"url.raw"作为字段名。
□ geo_point:Elasticsearch支持geo_point类型,geo distance聚合等等。比如说,你可以请求某个geo_point点方圆10千米内数据点的总数。在Kibana的bettermap类型面板里,就会用到这个类型的数据。
6.其他模板配置建议
□ doc_values:doc_values是Elasticsearch 1.0版本引入的新特性。启用该特性的字段,索引写入的时候会在磁盘上构建fielddata。而过去,fielddata是固定只能使用内存的。在请求范围加大的时候,很容易触发OOM或者circuit breaker报错:
ElasticsearchException[org.elasticsearch.common.breaker.CircuitBreakingException: Data too large, data for field [@timestamp] would be larger than limit of [639015321/609.4mb]]
doc_values只能给不分词(对于字符串字段就是设置了"index":"not_analyzed",数值和时间字段默认就没有分词)的字段配置生效。
doc_values虽然用的是磁盘,但是系统本身也有自带VFS的cache效果并不会太差。据官方测试,经过1.4的优化后,只比使用内存的fielddata慢15%。所以,在数据量较大的情况下,强烈建议开启该配置:
{“template” : “logstash-*”,“settings” : {“index.refresh_interval” : “5s” },“mappings” : {“_default_” : {“_all” : {“enabled” : true},“dynamic_templates” : [ {“string_fields” : {“match” : “*”,“match_mapping_type” : “string”,“mapping” : {“type” : “string”, “index” : “analyzed”, “omit_norms” : true,“fields” : {“raw” : { “type”: “string”, “index” : “not_analyzed”, “ignore_above” : 256, “doc_values”: true } } } } } ],“properties” : {“@version”: { “type”: “string”, “index”: “not_analyzed” },“@timestamp”: { “type”: “date”, “index”: “not_analyzed”, “doc_values”: true, “format”: “dateOptionalTime” },“geoip” : {“type” : “object”,“dynamic”: true,“path”: “full”,“properties” : {“location” : { “type” : “geo_point” } } } } } } }
□ order:如果你有自己单独定制template的想法,很好。这时候有几种选择:
■ 在logstash-output-elasticsearch配置中开启manage_template=>false选项,然后一切自己动手;
■ 在logstash-output-elasticsearch配置中开启template=>"/path/to/your/tmpl.json"选项,让logstash来发送你自己写的template文件;
■ 避免变更Logstash里的配置,而是另外发送一个template,利用Elasticsearch的templates order功能。
这个order功能,就是Elasticsearch在创建一个索引的时候,如果发现这个索引同时匹配上了多个template,那么就会先应用order数值小的template设置,然后再应用一遍order数值高的作为覆盖,最终达到一个merge的效果。
比如,对上面这个模板已经很满意,只想修改一下refresh_interval,那么只需要新写一个:
{“order” : 1,“template” : “logstash-*”,“settings” : {“index.refresh_interval” : “20s” } }
然后运行curl-XPUT http://localhost:9200/_template/template_newid-d'@/path/to/your/tmpl.json'即可。
Logstash默认的模板,order是0,id是logstash,通过logstash-output-elasticsearch的配置选项template_name修改。你的新模板就不要跟这个名字冲突了。
2.4.2 发送email
配置示例如下:
output { email { to =>“admin@website.com,root@website.com” cc =>“other@website.com” via =>“smtp” subject =>“Warning: %{title}” options => { smtpIporHost =>“localhost”, port => 25, domain => 'localhost.localdomain', userName => nil, password => nil, authenticationType => nil, # (plain, login and cram_md5) starttls => true } htmlbody =>“” body =>“” attachments => [“/path/to/filename”] } }
logstash-output-email插件支持SMTP协议和sendmail两种方式,通过via参数设置。SMTP方式有较多的options参数可配置。sendmail只能利用本机上的sendmail服务来完成——文档上描述了Mail库支持的sendmail配置参数,但实际代码中没有相关处理,不要被迷惑了。
2.4.3 调用系统命令执行
logstash-output-exec插件的运用也非常简单,如下所示,将Logstash切割成的内容作为参数传递给命令。这样,在每个事件到达该插件的时候,都会触发这个命令的执行。
output { exec { command =>“sendsms.pl \”%{message}\“ -t %{user}” } }
需要注意的是。这种方式是每次都重新开始执行一次命令并退出。本身是比较慢速的处理方式(程序加载,网络建联等都有一定的时间消耗)。最好只用于少量的信息处理场景,比如不适用nagios的其他报警方式。示例就是通过短信发送消息。
2.4.4 保存成文件
通过日志收集系统将分散在数百台服务器上的数据集中存储在某中心服务器上,这是运维最原始的需求。早年的scribed,甚至直接就把输出的语法命名为<store>。Logstash当然也能做到这点。
和LogStash::Inputs::File不同,LogStash::Outputs::File里可以使用sprintf format格式来自动定义输出到带日期命名的路径。
配置示例如下:
output { file { path =>“/path/to/%{+yyyy/MM/dd/HH}/%{host}.log.gz” message_format =>“%{message}” gzip => true } }
使用output/file插件首先需要注意的就是message_format参数。插件默认是输出整个event的JSON形式数据的。这可能跟大多数情况下使用者的期望不符。大家可能只是希望按照日志的原始格式保存就好了。所以需要定义为%{message},当然,前提是在之前的filter插件中,你没有使用remove_field或者update等参数删除或修改%{message}字段的内容。
另一个非常有用的参数是gzip。gzip格式是一个非常奇特而友好的格式。其格式包括有:
□ 10字节的头,包含幻数、版本号以及时间戳。
□ 可选的扩展头,如原文件名。
□ 文件体,包括DEFLATE压缩的数据。
□ 8字节的尾注,包括CRC-32校验和以及未压缩的原始数据长度。
这样gzip就可以一段一段的识别出来数据——反过来说,也就是可以一段一段压缩了添加在后面!
这对于我们流式添加数据简直太棒了!
提示
你或许见过网络流传的parallel命令行工具并发处理数据的神奇文档,但在自己用的时候总见不到效果。实际上就是因为:文档中处理的gzip文件,可以分开处理然后再合并的,而你的用法却不一定可以。
2.4.5 报警发送到Nagios
Logstash中有两个output插件是与Nagios有关的。logstash-output-nagios插件发送数据给本机的nagios.cmd管道命令文件,logstash-output-nagios_nsca插件则是调用send_nsca命令以NSCA协议格式把数据发送给Nagios服务器(远端或者本地皆可)。
1.nagios.cmd
nagios.cmd是Nagios服务器的核心组件。Nagios事件处理和内外交互都是通过这个管道文件来完成的。
使用CMD方式,需要自己保证发送的Logstash事件符合Nagios事件的格式。即必须在filter阶段预先准备好nagios_host和nagios_service字段;此外,如果在filter阶段也准备好nagios_annotation和nagios_level字段,这里也会自动转换成nagios事件信息。
filter { if [message] =~ /err/ { mutate { add_tag =>“nagios” rename => [“host”, “nagios_host”] replace => [“nagios_service”, “logstash_check_%{type}”] } } } output { if “nagios” in [tags] { nagios { } } }
如果不打算在filter阶段提供nagios_level,那么也可以在该插件中通过参数配置。
所谓nagios_level,即我们通过nagiosplugin检查数据时的返回值。其取值范围和含义如下:
□ “0”,代表“OK”,服务正常。
□ “1”,代表“WARNNING”,服务警告,一般nagios plugin命令中使用-w参数设置该阈值。
□ “2”,代表“CRITICAL”,服务危急,一般nagios plugin命令中使用-c参数设置该阈值。
□ “3”,代表“UNKNOWN”,未知状态,一般会在timeout等情况下出现。
默认情况下,该插件会以“CRITICAL”等级发送报警给Nagios服务器。
nagios.cmd文件的具体位置,可以使用command_file参数设置。默认位置是“/var/lib/nagios3/rw/nagios.cmd”。
关于和nagios.cmd交互的具体协议说明,有兴趣的读者请阅读Using external commands in Nagios一文(http://archive09.linux.com/feature/153285),这是《Learning Nagios 3.0》书中内容节选。
2.NSCA
NSCA是一种标准的Nagios分布式扩展协议。分布在各机器上的send_nsca进程主动将监控数据推送给远端Nagios服务器的NSCA进程。
当Logstash跟Nagios服务器没有在同一个主机上运行的时候,就只能通过NSCA方式来发送报警了——当然也必须在Logstash服务器上安装send_nsca命令。
Nagios事件所需要的几个属性在上一段中已经有过描述。不过在使用这个插件的时候,不要求提前准备好,而是可以在该插件内部定义参数:
output { nagios_nsca { nagios_host =>“%{host}” nagios_service =>“logstash_check_%{type}” nagios_status =>“2” message_format =>“%{@timestamp}: %{message}” host =>“nagiosserver.domain.com” } }
这里请注意,host和nagios_host两个参数,分别是用来设置Nagios服务器的地址,和报警信息中有问题的服务器地址。
关于NSCA原理,架构和配置说明,还不了解的读者请阅读官方网站Using NSClient++from nagios with NSCA一节(http://nsclient.org/nscp/wiki/doc/usage/nagios/nsca)。
3.其他类似插件
除了Nagios以外,Logstash同样可以发送信息给其他常见监控系统,方式和Nagios大同小异:
□ logstash-output-ganglia插件通过UDP协议,发送gmetric型数据给本机/远端的gmond或者gmetad。
□ logstash-output-zabbix插件调用本机的zabbix_sender命令发送。
2.4.6 statsd
statsd最早是2008年Flickr公司用Perl写的针对graphite、datadog等监控数据后端存储开发的前端网络应用,2011年Etsy公司用Nodejs重构。用于接收、写入、读取和聚合时间序列数据,包括即时值和累积值等。
配置示例如下:
output { statsd { host =>“statsdserver.domain.com” namespace =>“logstash” sender =>“%{host}” increment => [“httpd.response.%{status}”] } }
Graphite以树状结构存储监控数据,所以statsd也是如此。所以发送给statsd的数据的key也一定得是“first.second.tree.four”这样的形式。而在logstash-output-statsd插件中,就会以三个配置参数来拼接成这种形式:
namespace.sender.metric
其中namespace和sender都是直接设置的,而metric又分为好几个不同的参数可以分别设置。statsd支持的metric类型如下:
□ increment
示例语法:increment=>["nginx.status.%{status}"]。该配置即可在statsd中生成对应的nginx.status.200,nginx.status.206,nginx.status.304,nginx.status.404,nginx.status.502,nginx.status.503,nginx.status.504等一系列监控项。同时,在statsd内配置的一个时间周期内,各状态码的次数,会自动累加到各自监控项里。
□ decrement
语法同increment。不过是递减而不是递增。
□ count
示例语法:count=>{"nginx.bytes"=>"%{bytes}"}。该配置可以在statsd中生成一个nginx.bytes监控项。而每条Nginx访问记录的响应字节数,累加成带宽。
□ gauge
语法同count。gauge和count(在rrdtool中叫counter)两种计数器的区别从早年的rrdtool时代就被反复强调,即gauge直接存储原始数值,不累加。
□ set
示例语法:set=>{"online"=>"%{user_id}"}。是新版statsd才支持的功能,可以用来做去重计算,比如在线人数统计。
□ timing
示例语法:timing=>["nginx.requesttime"=>"%{request_time}"]。该配置可以在statsd中生成一个nginx.requesttime监控项,记录时间周期内,Nginx访问记录的响应时间的数值统计情况,包括平均值,最大值,最小值,标准差,百分比值等。
关于这些metric类型的详细说明,请阅读statsd文档:https://github.com/etsy/statsd/blob/master/docs/metric_types.md。
推荐阅读
Etsy发布nodejs版本statsd的博客:Measure Anything,Measure Everything(http://codeascraft.etsy.com/2011/02/15/measure-anything-measure-everything/)
Flickr发布statsd的博客:Counting&Timing(http://code.flickr.net/2008/10/27/counting-timing/)
Librato有关statsd协作的博客:Using StatsD with Librato(http://support.metrics.librato.com/knowledgebase/articles/77199-using-statsd-with-librato)
2.4.7 标准输出stdout
和之前logstash-input-stdin插件一样,logstash-output-stdout插件也是最基础和简单的输出插件。同样在这里简单介绍一下,作为输出插件的一个共性了解。
配置示例如下:
output { stdout { codec => rubydebug workers => 2 } }
输出插件统一具有一个参数是workers。Logstash为输出做了多线程的准备。
其次是codec设置。codec的作用在之前已经讲过。可能除了logstash-codec-multiline,其他codec插件本身并没有太多的设置项。所以一般省略掉后面的配置区段。换句话说。上面配置示例的完全写法应该是:
output { stdout { codec => rubydebug { } workers => 2 } }
单就logstash-output-stdout插件来说,其最重要和常见的用途就是调试。所以在不太有效的时候,加上命令行参数-vv运行,查看更多详细调试信息。
2.4.8 TCP发送数据
虽然之前我们已经提到过不建议直接使用LogStash::Inputs::TCP和LogStash::Outputs::TCP做转发工作,不过在实际交流中,发现确实有不少朋友觉得这种简单配置足够使用,因而不愿意多加一层消息队列的。所以,还是把Logstash如何直接发送TCP数据也稍微提点一下。
配置示例如下:
output { tcp { host =>“192.168.0.2” port => 8888 codec => json_lines } }
在收集端采用TCP方式发送给远端的TCP端口。这里需要注意的是,默认的Codec选项是json。而远端的LogStash::Inputs::TCP的默认Codec选项却是plain!所以不指定各自的Codec,对接肯定是失败的。
另外,由于IO BUFFER的原因,即使是两端共同约定为json依然无法正常运行,接收端会认为一行数据没结束,一直等待直至自己OutOfMemory!
所以,正确的做法是,发送端指定Codec为json_lines,这样每条数据后面会加上一个回车,接收端指定Codec为json_lines或者json均可,这样才能正常处理。包括在收集端已经切割好的字段,也可以直接带入收集端使用了。
2.4.9 输出到HDFS
数据写入HDFS是很多日志收集系统的最终目的。不过Logstash偏巧不是其中之一。到目前为止,Logstash还没有官方支持的直接写入HDFS的插件。而在社区,则有两种不同的解决方案可供选择。下面分别介绍。
1.通过HTTP接口
插件源码地址见:https://github.com/dstore-dbap/logstash-webhdfs
该插件使用Hadoop的WebHDFS接口,其本质就是发送POST数据,可以说实现起来比较简单。未来logstash-plugins官方可能也会收这个插件。
配置示例如下:
output { hadoop_webhdfs { workers => 2 server =>“your.nameno.de:14000” user =>“flume” path =>“/user/flume/logstash/dt=%{+Y}-%{+M}-%{+d}/logstash-%{+H}.log” f?lush_size => 500 compress =>“snappy” idle_f?lush_time => 10 retry_interval => 0.5 } }
插件使用方式,和其他自定义插件一样,通过—pluginpath或者打包gem均可。
2.通过Java接口
插件源码地址见:https://github.com/avishai-ish-shalom/logstash-hdfs
该插件使用Hadoop的HDFS接口,利用JRuby可以直接导入Java类的特性,直接使用了org.apache.hadoop.fs.FileSystem等类来实现。
配置示例如下:
output { hdfs { path =>“/path/to/output_file.log” enable_append => true } }
因为需要导入各种Hadoop的jar包,所以这个运行比较麻烦。Logstash-1.4.2上的运行命令示例如下:
# LD_LIBRARY_PATH=“/usr/lib/hadoop/lib/native” GEM_HOME=./logstash-1.4.2/vendor/bundle/jruby/1.9 CLASSPATH=$(find ./logstash-1.4.2/vendor/jar -type f -name '*.jar'|tr '\n' ':‘):$(find /usr/lib/hadoop-hdfs -type f -name '*.jar' | tr '\n' ':’):$(find /usr/lib/hadoop -type f -name '*.jar' | tr '\n' ':‘):/etc/hadoop/conf java org.jruby.Main -I./logstash-1.4.2/lib ./logstash-1.4.2/lib/logstash/runner.rb agent -f logstash.conf -p ./logstash
如果使用Logstash-1.5版本,可以通过rubygems.org直接安装打包好的插件:
# bin/plugin install logstash-output-hdfs
然后这样运行:
# LD_LIBRARY_PATH=“/usr/lib/hadoop/lib/native” CLASSPATH=$(find /usr/lib/hadoop-hdfs -type f -name '*.jar' | tr '\n' ':‘):$(find /usr/lib/hadoop -type f -name '*.jar' | grep -v sources | tr '\n' ':’):/etc/hadoop/conf $LOGSTASH_DIR/bin/logstash agent -f logstash.conf