2.1 输入插件
在“Hello World”示例中,我们已经见到并介绍了Logstash的运行流程和配置的基础语法。从这章开始,我们就要逐一介绍Logstash流程中比较常用的一些插件,并在介绍中针对其主要适用的场景、推荐的配置,作一些说明。
限于篇幅,接下来内容中,配置示例不一定能贴完整。请记住一个原则:Logstash配置一定要有一个input和一个output。在演示过程中,如果没有写明input,默认就会使用“Hello World”里我们已经演示过的logstash-input-stdin,同理,没有写明的output就是logstash-output-stdout。
以上请读者自明。
2.1.1 标准输入
我们已经见过好几个示例使用stdin了。这也应该是Logstash里最简单和基础的插件了。所以,在这段中,我们先介绍一些未来每个插件都会有的一些方法。
配置示例如下:
input { stdin { add_field => {“key” =>“value”} codec =>“plain” tags => [“add”] type =>“std” } }
用上面的新stdin设置重新运行一次最开始的Hello World示例。我建议大家把整段配置都写入一个文本文件,然后运行命令:bin/logstash-f stdin.conf。输入“Hello World”并回车后,你会在终端看到如下输出:
{“message” =>“hello world”,“@version” =>“1”,“@timestamp” =>“2014-08-08T06:48:47.789Z”,“type” =>“std”,“tags” => [ [0] “add” ],“key” =>“value”,“host” =>“raochenlindeMacBook-Air.local” }
type和tags是Logstash事件中两个特殊的字段。通常来说,我们会在“输入区段”中通过type来标记事件类型——我们肯定是提前能知道这个事件属于什么类型的。而tags则是在数据处理过程中,由具体的插件来添加或者删除的。
最常见的用法是像下面这样:
input { stdin { type =>“web” } } filter { if [type] == “web” { grok { match => [“message”, %{COMBINEDAPACHELOG}] } } } output { if “_grokparsefailure” in [tags] { nagios_nsca { nagios_status =>“1” } } else { elasticsearch { } } }
看起来蛮复杂的,对吧?
继续学习,你也可以写出来的。
2.1.2 文件输入
分析网站访问日志应该是一个运维工程师最常见的工作了。所以我们先学习一下怎么用Logstash来处理日志文件。
Logstash使用一个名叫FileWatch的Ruby Gem库来监听文件变化。这个库支持glob展开文件路径,而且会记录一个叫.sincedb的数据库文件来跟踪被监听日志文件的当前读取位置。所以,不要担心Logstash会漏过你的数据。
提示
sincedb文件中记录了每个被监听的文件的inode,major number,minor number和pos。
配置示例如下:
input { file { path => [“/var/log/*.log”, “/var/log/message”] type =>“system” start_position =>“beginning” } }
有一些比较有用的配置项,可以用来指定FileWatch库的行为:
□ discover_interval:Logstash每隔多久去检查一次被监听的path下是否有新文件,默认值是15秒。
□ exclude:不想被监听的文件可以排除出去,这里跟path一样支持glob展开。
□ sincedb_path:如果你不想用默认的$HOME/.sincedb(Windows平台上为%USERPROFILE%\.sincedb,该变量默认值是:C:\Windows\System32\config\systemprofile),可以通过这个配置定义sincedb文件到其他位置。
□ sincedb_write_interval:Logstash每隔多久写一次sincedb文件,默认是15秒。
□ stat_interval:Logstash每隔多久检查一次被监听文件状态(是否有更新),默认是1秒。
□ start_position:Logstash从什么位置开始读取文件数据,默认是结束位置,也就是说,Logstash进程会以类似tail-F的形式运行。如果你是要导入原有数据,把这个设定改成"beginning",Logstash进程就从头开始读取,有点类似于cat,但是读到最后一行不会终止,而是继续变成tail-F。
注意事项如下:
1)通常你要导入原有数据进Elasticsearch的话,你还需要filter/date插件来修改默认的"@timestamp"字段值。稍后会学习这方面的知识。
2)FileWatch只支持文件的绝对路径,而且会不自动递归目录。所以有需要的话,请用数组方式都写明具体哪些文件。
3)LogStash::Inputs::File只是在进程运行的注册阶段初始化一个FileWatch对象。所以它不能支持类似fluentd那样的path=>"/path/to/%{+yyyy/MM/dd/hh}.log"写法。达到相同目的,你只能写成path=>"/path/to/*/*/*/*.log"。FileWatch模块提供了一个稍微简单一点的写法:/path/to/**/*.log,用**来缩写表示递归全部子目录。
4)start_position仅在该文件从未被监听过的时候起作用。如果sincedb文件中已经有这个文件的inode记录了,那么Logstash依然会从记录过的pos开始读取数据。所以重复测试的时候每回需要删除sincedb文件。
5)因为Windows平台上没有inode的概念,Logstash某些版本在Windows平台上监听文件不是很靠谱。Windows平台上,推荐考虑使用nxlog作为收集端,参阅本书稍后章节。
2.1.3 TCP输入
未来你可能会用Redis服务器或者其他的消息队列系统来作为Logstash Broker的角色。不过Logstash其实也有自己的TCP/UDP插件,在临时任务的时候,也算能用,尤其是测试环境。
注意
虽然LogStash::Inputs::TCP用Ruby的Socket和OpenSSL库实现了高级的SSL功能,但Logstash本身只能在SizedQueue中缓存20个事件。这就是我们建议在生产环境中换用其他消息队列的原因。
配置示例如下:
input { tcp { port => 8888 mode =>“server” ssl_enable => false } }
目前来看,LogStash::Inputs::TCP最常见的用法就是配合nc命令导入旧数据。在启动Logstash进程后,在另一个终端运行如下命令即可导入数据:
# nc 127.0.0.1 8888 < olddata
这种做法比用LogStash::Inputs::File好,因为当nc命令结束,我们就知道数据导入完毕了。而用input/file方式,Logstash进程还会一直等待新数据输入被监听的文件,不能直接看出是否任务完成了。
2.1.4 syslog输入
syslog可能是运维领域最流行的数据传输协议了。当你想从设备上收集系统日志的时候,syslog应该会是你的第一选择。尤其是网络设备,比如思科中syslog几乎是唯一可行的办法。
我们这里不解释如何配置你的syslog.conf、rsyslog.conf或者syslog-ng.conf来发送数据,而只讲如何把Logstash配置成一个syslog服务器来接收数据。
有关rsyslog的用法,稍后的类型项目一节中,会有更详细的介绍。
配置示例如下:
input { syslog { port =>“514” } }
作为最简单的测试,我们先暂停一下本机的syslogd(或rsyslogd)进程,然后启动Logstash进程(这样就不会有端口冲突问题)。现在,本机的syslog就会默认发送到Logstash里了。我们可以用自带的logger命令行工具发送一条“Hello World”信息到syslog里(即Logstash里)。看到的Logstash输出像下面这样:
{“message” =>“Hello World”,“@version” =>“1”,“@timestamp” =>“2014-08-08T09:01:15.911Z”,“host” =>“127.0.0.1”,“priority” =>31,“timestamp” =>“Aug 8 17:01:15”,“logsource” =>“raochenlindeMacBook-Air.local”,“program” =>“com.apple.metadata.mdflagwriter”,“pid” =>“381”,“severity” =>7,“facility” =>3,“facility_label” =>“system”,“severity_label” =>“Debug” }
Logstash是用UDPSocket、TCPServer和LogStash::Filters::Grok来实现LogStash::Inputs::Syslog的。所以你其实可以直接用Logstash配置实现一样的效果,如下所示:
input { tcp { port =>“8514” } } f?ilter { grok { match => [“message”, “%{SYSLOGLINE}” ] } syslog_pri { } }
建议在使用LogStash::Inputs::Syslog的时候走TCP协议来传输数据。
因为具体实现中,UDP监听器只用了一个线程,而TCP监听器会在接收每个连接的时候都启动新的线程来处理后续步骤。
如果你已经在使用UDP监听器收集日志,用下行命令检查你的UDP接收队列大小:
# netstat -plnu | awk 'NR==1 || $4~/:514$/{print $2}' Recv-Q 228096
228096是UDP接收队列的默认最大大小,这时候linux内核开始丢弃数据包了!
强烈建议使用LogStash::Inputs::TCP和LogStash::Filters::Grok配合实现同样的syslog功能!
虽然LogStash::Inputs::Syslog在使用TCPServer的时候可以采用多线程处理数据的接收,但是在同一个客户端数据的处理中,其grok和date是一直在该线程中完成的,这会导致总体上的处理性能几何级的下降——经过测试,TCPServer每秒可以接收50000条数据,而在同一线程中启用grok后每秒只能处理5000条,再加上date只能达到500条!
才将这两步拆分到filters阶段后,Logstash支持对该阶段插件单独设置多线程运行,大大提高了总体处理性能。在相同环境下,logstash-f tcp.conf-w 20的测试中,总体处理性能可以达到每秒30000条数据!
注意
测试采用Logstash作者提供的命令:
yes “<44>May 19 18:30:17 snack jls: foo bar 32” | nc localhost 3000
出处见:https://github.com/jordansissel/experiments/blob/master/ruby/jruby-netty/syslog-server/Makefile
如果你实在没法切换到TCP协议,可以自己写程序,或者使用其他基于异步I/O框架(比如libev)的项目。下面是一个简单的异步I/O实现UDP监听数据输入Elasticsearch的示例:https://gist.github.com/chenryn/7c922ac424324ee0d695。
2.1.5 collectd输入
collectd是一个守护(daemon)进程,用来收集系统性能和提供各种存储方式来存储不同值的机制。它会在系统运行和存储信息时周期性的统计系统的相关统计信息。利用这些信息有助于查找当前系统性能瓶颈(如作为性能分析performance analysis)和预测系统未来的load(如能力部署capacity planning)等
下面简单介绍一下:collectd的部署以及与Logstash对接的相关配置实例。
1.collectd的安装
解决依赖如下:
rpm -ivh “http://dl.fedoraproject.org/pub/epel/6/i386/epel-release-6-8.noarch.rpm” yum -y install libcurl libcurl-devel rrdtool rrdtool-devel perl-rrdtool rrdtool-prel libgcrypt-devel gcc make gcc-c++ liboping liboping-devel perl- CPAN net-snmp net-snmp-devel
源码安装collectd如下:
wget http://collectd.org/f?iles/collectd-5.4.1.tar.gz tar zxvf collectd-5.4.1.tar.gz cd collectd-5.4.1 ./conf?igure --pref?ix=/usr --sysconfdir=/etc --localstatedir=/var --libdir=/usr/lib --mandir=/usr/share/man --enable-all-plugins make && make install
安装启动脚本如下:
cp contrib/redhat/init.d-collectd /etc/init.d/collectd chmod +x /etc/init.d/collectd
启动collectd如下:
service collectd start
2.collectd的配置
以下配置可以实现对服务器基本的CPU、内存、网卡流量、磁盘I/O以及磁盘空间占用情况的监控:
Hostname “host.example.com” LoadPlugin interface LoadPlugin cpu LoadPlugin memory LoadPlugin network LoadPlugin df LoadPlugin disk <Plugin interface> Interface “eth0” IgnoreSelected false </Plugin> <Plugin network> <Server “10.0.0.1”“25826”> ## Logstash 的 IP 地址和 collectd 的数据接收端口号 </Server> </Plugin>
3.Logstash的配置
以下配置实现通过Logstash监听25826端口,接收从collectd发送过来的各项检测数据。
简单配置示例如下:
input { collectd { port => 25826 ## 端口号与发送端对应 type => collectd }
推荐配置示例如下:
udp { port => 25826 buffer_size => 1452 workers => 3 # Default is 2 queue_size => 30000 # Default is 2000 codec => collectd { } type =>“collectd” }
下面是简单的一个输出结果:
{“_index”: “logstash-2014.12.11”,“_type”: “collectd”,“_id”: “dS6vVz4aRtK5xS86kwjZnw”,“_score”: null,“_source”: {“host”: “host.example.com”,“@timestamp”: “2014-12-11T06:28:52.118Z”,“plugin”: “interface”,“plugin_instance”: “eth0”,“collectd_type”: “if_packets”,“rx”: 19147144,“tx”: 3608629,“@version”: “1”,“type”: “collectd”,“tags”: [“_grokparsefailure” ] },“sort”: [ 1418279332118 ] }
参考资料
□ collectd支持收集的数据类型:http://git.verplant.org/p=collectd.git;a=blob;hb=master;f=README
□ collectd收集各数据类型的配置参考资料:http://collectd.org/documentation/manpages/collectd.conf.5.shtml
□ collectd简单配置文件示例:https://gist.github.com/untergeek/ab85cb86a9bf39f1fc6d