ELK stack权威指南
上QQ阅读APP看书,第一时间看更新

2.1 输入插件

在“Hello World”示例中,我们已经见到并介绍了Logstash的运行流程和配置的基础语法。从这章开始,我们就要逐一介绍Logstash流程中比较常用的一些插件,并在介绍中针对其主要适用的场景、推荐的配置,作一些说明。

限于篇幅,接下来内容中,配置示例不一定能贴完整。请记住一个原则:Logstash配置一定要有一个input和一个output。在演示过程中,如果没有写明input,默认就会使用“Hello World”里我们已经演示过的logstash-input-stdin,同理,没有写明的output就是logstash-output-stdout。

以上请读者自明。

2.1.1 标准输入

我们已经见过好几个示例使用stdin了。这也应该是Logstash里最简单和基础的插件了。所以,在这段中,我们先介绍一些未来每个插件都会有的一些方法。

配置示例如下:

input {
    stdin {
        add_field => {“key” =>“value”}
        codec =>“plain”
        tags => [“add”]
        type =>“std”
    }
}

用上面的新stdin设置重新运行一次最开始的Hello World示例。我建议大家把整段配置都写入一个文本文件,然后运行命令:bin/logstash-f stdin.conf。输入“Hello World”并回车后,你会在终端看到如下输出:

{“message” =>“hello world”,“@version” =>“1”,“@timestamp” =>“2014-08-08T06:48:47.789Z”,“type” =>“std”,“tags” => [
        [0] “add”
    ],“key” =>“value”,“host” =>“raochenlindeMacBook-Air.local”
}

type和tags是Logstash事件中两个特殊的字段。通常来说,我们会在“输入区段”中通过type来标记事件类型——我们肯定是提前能知道这个事件属于什么类型的。而tags则是在数据处理过程中,由具体的插件来添加或者删除的。

最常见的用法是像下面这样:

input {
    stdin {
        type =>“web”
    }
}
filter {
    if [type] == “web” {
        grok {
            match => [“message”, %{COMBINEDAPACHELOG}]
        }
    }
}
output {
    if “_grokparsefailure” in [tags] {
        nagios_nsca {
            nagios_status =>“1”
        }
    } else {
        elasticsearch {
        }
    }
}

看起来蛮复杂的,对吧?

继续学习,你也可以写出来的。

2.1.2 文件输入

分析网站访问日志应该是一个运维工程师最常见的工作了。所以我们先学习一下怎么用Logstash来处理日志文件。

Logstash使用一个名叫FileWatch的Ruby Gem库来监听文件变化。这个库支持glob展开文件路径,而且会记录一个叫.sincedb的数据库文件来跟踪被监听日志文件的当前读取位置。所以,不要担心Logstash会漏过你的数据。

提示

sincedb文件中记录了每个被监听的文件的inode,major number,minor number和pos。

配置示例如下:

input {
    file {
        path => [“/var/log/*.log”, “/var/log/message”]
        type =>“system”
        start_position =>“beginning”
    }
}

有一些比较有用的配置项,可以用来指定FileWatch库的行为:

□ discover_interval:Logstash每隔多久去检查一次被监听的path下是否有新文件,默认值是15秒。

□ exclude:不想被监听的文件可以排除出去,这里跟path一样支持glob展开。

□ sincedb_path:如果你不想用默认的$HOME/.sincedb(Windows平台上为%USERPROFILE%\.sincedb,该变量默认值是:C:\Windows\System32\config\systemprofile),可以通过这个配置定义sincedb文件到其他位置。

□ sincedb_write_interval:Logstash每隔多久写一次sincedb文件,默认是15秒。

□ stat_interval:Logstash每隔多久检查一次被监听文件状态(是否有更新),默认是1秒。

□ start_position:Logstash从什么位置开始读取文件数据,默认是结束位置,也就是说,Logstash进程会以类似tail-F的形式运行。如果你是要导入原有数据,把这个设定改成"beginning",Logstash进程就从头开始读取,有点类似于cat,但是读到最后一行不会终止,而是继续变成tail-F。

注意事项如下:

1)通常你要导入原有数据进Elasticsearch的话,你还需要filter/date插件来修改默认的"@timestamp"字段值。稍后会学习这方面的知识。

2)FileWatch只支持文件的绝对路径,而且会不自动递归目录。所以有需要的话,请用数组方式都写明具体哪些文件。

3)LogStash::Inputs::File只是在进程运行的注册阶段初始化一个FileWatch对象。所以它不能支持类似fluentd那样的path=>"/path/to/%{+yyyy/MM/dd/hh}.log"写法。达到相同目的,你只能写成path=>"/path/to/*/*/*/*.log"。FileWatch模块提供了一个稍微简单一点的写法:/path/to/**/*.log,用**来缩写表示递归全部子目录。

4)start_position仅在该文件从未被监听过的时候起作用。如果sincedb文件中已经有这个文件的inode记录了,那么Logstash依然会从记录过的pos开始读取数据。所以重复测试的时候每回需要删除sincedb文件。

5)因为Windows平台上没有inode的概念,Logstash某些版本在Windows平台上监听文件不是很靠谱。Windows平台上,推荐考虑使用nxlog作为收集端,参阅本书稍后章节。

2.1.3 TCP输入

未来你可能会用Redis服务器或者其他的消息队列系统来作为Logstash Broker的角色。不过Logstash其实也有自己的TCP/UDP插件,在临时任务的时候,也算能用,尤其是测试环境。

注意

虽然LogStash::Inputs::TCP用Ruby的Socket和OpenSSL库实现了高级的SSL功能,但Logstash本身只能在SizedQueue中缓存20个事件。这就是我们建议在生产环境中换用其他消息队列的原因。

配置示例如下:

input {
    tcp {
        port => 8888
        mode =>“server”
        ssl_enable => false
    }
}

目前来看,LogStash::Inputs::TCP最常见的用法就是配合nc命令导入旧数据。在启动Logstash进程后,在另一个终端运行如下命令即可导入数据:

# nc 127.0.0.1 8888 < olddata

这种做法比用LogStash::Inputs::File好,因为当nc命令结束,我们就知道数据导入完毕了。而用input/file方式,Logstash进程还会一直等待新数据输入被监听的文件,不能直接看出是否任务完成了。

2.1.4 syslog输入

syslog可能是运维领域最流行的数据传输协议了。当你想从设备上收集系统日志的时候,syslog应该会是你的第一选择。尤其是网络设备,比如思科中syslog几乎是唯一可行的办法。

我们这里不解释如何配置你的syslog.conf、rsyslog.conf或者syslog-ng.conf来发送数据,而只讲如何把Logstash配置成一个syslog服务器来接收数据。

有关rsyslog的用法,稍后的类型项目一节中,会有更详细的介绍。

配置示例如下:

input {
  syslog {
    port =>“514”
  }
}

作为最简单的测试,我们先暂停一下本机的syslogd(或rsyslogd)进程,然后启动Logstash进程(这样就不会有端口冲突问题)。现在,本机的syslog就会默认发送到Logstash里了。我们可以用自带的logger命令行工具发送一条“Hello World”信息到syslog里(即Logstash里)。看到的Logstash输出像下面这样:

{“message” =>“Hello World”,“@version” =>“1”,“@timestamp” =>“2014-08-08T09:01:15.911Z”,“host” =>“127.0.0.1”,“priority” =>31,“timestamp” =>“Aug  8 17:01:15”,“logsource” =>“raochenlindeMacBook-Air.local”,“program” =>“com.apple.metadata.mdflagwriter”,“pid” =>“381”,“severity” =>7,“facility” =>3,“facility_label” =>“system”,“severity_label” =>“Debug”
}

Logstash是用UDPSocket、TCPServer和LogStash::Filters::Grok来实现LogStash::Inputs::Syslog的。所以你其实可以直接用Logstash配置实现一样的效果,如下所示:

input {
  tcp {
    port =>“8514”
  }
}
f?ilter {
  grok {
    match => [“message”, “%{SYSLOGLINE}” ]
  }
  syslog_pri { }
}

建议在使用LogStash::Inputs::Syslog的时候走TCP协议来传输数据。

因为具体实现中,UDP监听器只用了一个线程,而TCP监听器会在接收每个连接的时候都启动新的线程来处理后续步骤。

如果你已经在使用UDP监听器收集日志,用下行命令检查你的UDP接收队列大小:

# netstat -plnu | awk 'NR==1 || $4~/:514$/{print $2}'
Recv-Q
228096

228096是UDP接收队列的默认最大大小,这时候linux内核开始丢弃数据包了!

强烈建议使用LogStash::Inputs::TCP和LogStash::Filters::Grok配合实现同样的syslog功能!

虽然LogStash::Inputs::Syslog在使用TCPServer的时候可以采用多线程处理数据的接收,但是在同一个客户端数据的处理中,其grok和date是一直在该线程中完成的,这会导致总体上的处理性能几何级的下降——经过测试,TCPServer每秒可以接收50000条数据,而在同一线程中启用grok后每秒只能处理5000条,再加上date只能达到500条!

才将这两步拆分到filters阶段后,Logstash支持对该阶段插件单独设置多线程运行,大大提高了总体处理性能。在相同环境下,logstash-f tcp.conf-w 20的测试中,总体处理性能可以达到每秒30000条数据!

注意

测试采用Logstash作者提供的命令:

yes “<44>May 19 18:30:17 snack jls: foo bar 32” | nc localhost 3000

出处见:https://github.com/jordansissel/experiments/blob/master/ruby/jruby-netty/syslog-server/Makefile

如果你实在没法切换到TCP协议,可以自己写程序,或者使用其他基于异步I/O框架(比如libev)的项目。下面是一个简单的异步I/O实现UDP监听数据输入Elasticsearch的示例:https://gist.github.com/chenryn/7c922ac424324ee0d695。

2.1.5 collectd输入

collectd是一个守护(daemon)进程,用来收集系统性能和提供各种存储方式来存储不同值的机制。它会在系统运行和存储信息时周期性的统计系统的相关统计信息。利用这些信息有助于查找当前系统性能瓶颈(如作为性能分析performance analysis)和预测系统未来的load(如能力部署capacity planning)等

下面简单介绍一下:collectd的部署以及与Logstash对接的相关配置实例。

1.collectd的安装

解决依赖如下:

rpm -ivh “http://dl.fedoraproject.org/pub/epel/6/i386/epel-release-6-8.noarch.rpm”
yum -y install libcurl libcurl-devel rrdtool rrdtool-devel perl-rrdtool
    rrdtool-prel libgcrypt-devel gcc make gcc-c++ liboping liboping-devel perl-
    CPAN net-snmp net-snmp-devel

源码安装collectd如下:

wget http://collectd.org/f?iles/collectd-5.4.1.tar.gz
tar zxvf collectd-5.4.1.tar.gz
cd collectd-5.4.1
./conf?igure --pref?ix=/usr --sysconfdir=/etc --localstatedir=/var --libdir=/usr/lib 
    --mandir=/usr/share/man --enable-all-plugins
make && make install

安装启动脚本如下:

cp contrib/redhat/init.d-collectd /etc/init.d/collectd
chmod +x /etc/init.d/collectd

启动collectd如下:

service collectd start

2.collectd的配置

以下配置可以实现对服务器基本的CPU、内存、网卡流量、磁盘I/O以及磁盘空间占用情况的监控:

Hostname “host.example.com”
LoadPlugin interface
LoadPlugin cpu
LoadPlugin memory
LoadPlugin network
LoadPlugin df
LoadPlugin disk
<Plugin interface>
    Interface “eth0”
    IgnoreSelected false
</Plugin>
<Plugin network>
<Server “10.0.0.1”“25826”> ## Logstash 的 IP 地址和 collectd 的数据接收端口号
</Server>
</Plugin>

3.Logstash的配置

以下配置实现通过Logstash监听25826端口,接收从collectd发送过来的各项检测数据。

简单配置示例如下:

input {
    collectd {
        port => 25826 ## 端口号与发送端对应
        type => collectd
}

推荐配置示例如下:

udp {
    port => 25826
    buffer_size => 1452
    workers => 3          # Default is 2
    queue_size => 30000   # Default is 2000
    codec => collectd { }
    type =>“collectd”
}

下面是简单的一个输出结果:

{“_index”: “logstash-2014.12.11”,“_type”: “collectd”,“_id”: “dS6vVz4aRtK5xS86kwjZnw”,“_score”: null,“_source”: {“host”: “host.example.com”,“@timestamp”: “2014-12-11T06:28:52.118Z”,“plugin”: “interface”,“plugin_instance”: “eth0”,“collectd_type”: “if_packets”,“rx”: 19147144,“tx”: 3608629,“@version”: “1”,“type”: “collectd”,“tags”: [“_grokparsefailure”
    ]
  },“sort”: [
    1418279332118
  ]
}

参考资料

□ collectd支持收集的数据类型:http://git.verplant.org/p=collectd.git;a=blob;hb=master;f=README

□ collectd收集各数据类型的配置参考资料:http://collectd.org/documentation/manpages/collectd.conf.5.shtml

□ collectd简单配置文件示例:https://gist.github.com/untergeek/ab85cb86a9bf39f1fc6d