ELK stack权威指南
上QQ阅读APP看书,第一时间看更新

2.2 编解码配置

Codec是Logstash从1.3.0版开始新引入的概念(Codec来自Coder/decoder两个单词的首字母缩写)。

在此之前,Logstash只支持纯文本形式输入,然后以过滤器处理它。但现在,我们可以在输入期处理不同类型的数据,这全是因为有了Codec设置。

所以,这里需要纠正之前的一个概念。Logstash不只是一个input|filter|output的数据流,而是一个input|decode|filter|encode|output的数据流!Codec就是用来decode、encode事件的。

Codec的引入,使得Logstash可以更好、更方便地与其他有自定义数据格式的运维产品共存,比如graphite、fluent、netflow、collectd,以及使用msgpack、json、edn等通用数据格式的其他产品等。

事实上,我们在第一个“Hello World”用例中就已经用过Codec了——rubydebug就是一种Codec!虽然它一般只会用在stdout插件中,作为配置测试或者调试的工具。

提示

这个五段式的流程说明源自Perl版的Logstash(后来改名叫Message::Passing模块)的设计。本书稍后5.8节会对该模块稍作介绍。

2.2.1 JSON编解码

在早期的版本中,有一种降低Logstash过滤器的CPU负载消耗的做法盛行于社区(在当时的Cookbook上有专门的一节介绍):直接输入预定义好的JSON数据,这样就可以省略掉filter/grok配置!

这个建议依然有效,不过在当前版本中需要稍微做一点配置变动,因为现在有专门的Codec设置。

1.配置示例

社区常见的示例都是用的Apache的customlog,不过我觉得Nginx是一个比Apache更常用的新型Web服务器,所以我这里会用nginx.conf做示例:

logformat json '{“@timestamp”:“$time_iso8601”,'
           ‘“@version”:“1”,'
           ’“host”:“$server_addr”,'
           ‘“client”:“$remote_addr”,'
           ’“size”:$body_bytes_sent,'
           ‘“responsetime”:$request_time,'
           ’“domain”:“$host”,'
           ‘“url”:“$uri”,'
           ’“status”:“$status”}';
access_log /var/log/nginx/access.log_json json;

注意,在$request_time和$body_bytes_sent变量两头没有双引号",这两个数据在JSON里应该是数值类型!

重启Nginx应用,然后修改你的input/file区段配置成下面这样:

input {
    f?ile {
        path =>“/var/log/nginx/access.log_json”“
        codec =>”json“
    }
}

2.运行结果

下面访问一下你Nginx发布的Web页面,然后你会看到Logstash进程输出类似下面这样的内容:

{“@timestamp” =>“2014-03-21T18:52:25.000+08:00”,“@version” =>“1”,“host” =>“raochenlindeMacBook-Air.local”,“client” =>“123.125.74.53”,“size” =>8096,“responsetime” =>0.04,“domain” =>“www.domain.com”,“url” =>“/path/to/f?ile.suff?ix”,“status” =>“200”
}

3.Nginx代理服务的日志格式问题

对于一个Web服务器的访问日志,看起来已经可以很好的工作了。不过如果Nginx是作为一个代理服务器运行的话,访问日志里有些变量,比如说$upstream_response_time,可能不会一直是数字,它也可能是一个“-”字符串!这会直接导致Logstash对输入数据验证报异常。

有两个办法解决这个问题:

1)用sed在输入之前先替换-成0。运行Logstash进程时不再读取文件而是标准输入,这样命令就成了下面这个样子:

tail -F /var/log/nginx/proxy_access.log_json \
    | sed 's/upstreamtime“:-/upstreamtime”:0/' \
    | /usr/local/logstash/bin/logstash -f /usr/local/logstash/etc/proxylog.conf

2)日志格式中统一记录为字符串格式(即都带上双引号"),然后再在Logstash中用filter/mutate插件来变更应该是数值类型的字符字段的值类型。

有关LogStash::Filters::Mutate的内容,本书稍后会有介绍。

2.2.2 多行事件编码

有些时候,应用程序调试日志会包含非常丰富的内容,为一个事件打印出很多行内容。这种日志通常都很难通过命令行解析的方式做分析。

而Logstash正为此准备好了codec/multiline插件!当然,multiline插件也可以用于其他类似的堆栈式信息,比如Linux的内核日志。

配置示例如下:

input {
    stdin {
        codec => multiline {
            pattern =>“^\[”
            negate => true
            what =>“previous”
        }
    }
}

运行Logstash进程,然后在等待输入的终端中输入如下几行数据:

[Aug/08/08 14:54:03] hello world
[Aug/08/09 14:54:04] hello logstash
    hello best practice
    hello raochenlin
[Aug/08/10 14:54:05] the end

你会发现Logstash输出下面这样的返回:

{
“@timestamp” =>“2014-08-09T13:32:03.368Z”,
“message” =>“[Aug/08/08 14:54:03] hello world\n”,
“@version” =>“1”,
“host” =>“raochenlindeMacBook-Air.local”
}
{
“@timestamp” =>“2014-08-09T13:32:24.359Z”,
“message” =>“[Aug/08/09 14:54:04] hello logstash\n\n    hello best practice\n\n
             hello raochenlin\n”,
“@version” =>“1”,
“tags” => [
        [0] “multiline”
    ],“host” =>“raochenlindeMacBook-Air.local”
}

你看,后面这个事件,在“message”字段里存储了三行数据!

注意

输出的事件中没有最后一行的"the end"字符串,这是因为你最后输入的回车符\n并不匹配设定的^\[正则表达式,Logstash还得等下一行数据直到匹配成功后才会输出这个事件。

其实这个插件的原理很简单,就是把当前行的数据添加到前面一行后面,直到新进的当前行匹配^\[正则为止。这个正则还可以用grok表达式,稍后你就会学习这方面的内容。具体的Java日志正则见:https://github.com/logstash-plugins/logstash-patterns-core/blob/master/patterns/java

说到应用程序日志,Log4j肯定是第一个被大家想到的,使用codec/multiline也确实是一个办法。

不过,如果你本身就是开发人员,或者可以推动程序修改变更的话,Logstash还提供了另一种处理Log4j的方式:input/log4j。与codec/multiline不同,这个插件是直接调用了org.apache.log4j.spi.LoggingEvent处理TCP端口接收的数据。稍后章节会详细讲述Log4j的用法。

2.2.3 网络流编码

NetFlow是Cisco发明的一种数据交换方式。NetFlow提供网络流量的会话级视图,记录下每个TCP/IP事务的信息。它的目的不是像tcpdump那样提供网络流量的完整记录,而是汇集起来形成更易于管理和易读的流向和容量的分析监控。

Cisco上配置NetFlow的方法,请参照具体的设备说明,主要是设定采集服务器的地址和端口,为运行Logstash服务的主机地址和端口(示例中为9995)。

采集NetFlow数据的Logstash服务配置示例如下:

input {
    udp {
      port => 9995
      codec => netf?low {
        definitions =>“/opt/logstash-1.4.2/lib/logstash/codecs/netflow/netflow.yaml”
        versions => [5]
      }
    }
  }
  output {
    elasticsearch {
      index =>“logstash_netf?low5-%{+YYYY.MM.dd}”
      host =>“localhost”
    }
  }

由于该插件生成的字段较多,所以建议对应的Elasticsesarch索引模板也需要单独提交:

# curl -XPUT localhost:9200/_template/logstash_netf?low5 -d '{“template” : “logstash_netflow5-*”,“settings”: {
“index.refresh_interval”: “5s”
},“mappings” : {
“_default_” : {
“_all” : {“enabled” : false},
“properties” : {
“@version”: { “index”: “analyzed”, “type”: “integer” },
“@timestamp”: { “index”: “analyzed”, “type”: “date” },
“netf?low”: {
“dynamic”: true,
“type”: “object”,
“properties”: {
  “version”: { “index”: “analyzed”, “type”: “integer” },
  “f?low_seq_num”: { “index”: “not_analyzed”, “type”: “long” },
  “engine_type”: { “index”: “not_analyzed”, “type”: “integer” },
  “engine_id”: { “index”: “not_analyzed”, “type”: “integer” },
  “sampling_algorithm”: { “index”: “not_analyzed”, “type”: “integer” },
  “sampling_interval”: { “index”: “not_analyzed”, “type”: “integer” },
  “f?low_records”: { “index”: “not_analyzed”, “type”: “integer” },
  “ipv4_src_addr”: { “index”: “analyzed”, “type”: “ip” },
  “ipv4_dst_addr”: { “index”: “analyzed”, “type”: “ip” },
  “ipv4_next_hop”: { “index”: “analyzed”, “type”: “ip” },
  “input_snmp”: { “index”: “not_analyzed”, “type”: “long” },
  “output_snmp”: { “index”: “not_analyzed”, “type”: “long” },
  “in_pkts”: { “index”: “analyzed”, “type”: “long” },
  “in_bytes”: { “index”: “analyzed”, “type”: “long” },
  “f?irst_switched”: { “index”: “not_analyzed”, “type”: “date” },
  “last_switched”: { “index”: “not_analyzed”, “type”: “date” },
  “l4_src_port”: { “index”: “analyzed”, “type”: “long” },
  “l4_dst_port”: { “index”: “analyzed”, “type”: “long” },
  “tcp_flags”: { “index”: “analyzed”, “type”: “integer” },
  “protocol”: { “index”: “analyzed”, “type”: “integer” },
  “src_tos”: { “index”: “analyzed”, “type”: “integer” },
  “src_as”: { “index”: “analyzed”, “type”: “integer” },
  “dst_as”: { “index”: “analyzed”, “type”: “integer” },
  “src_mask”: { “index”: “analyzed”, “type”: “integer” },
  “dst_mask”: { “index”: “analyzed”, “type”: “integer” }
     }
    }
   }
  }
 }
}'

Elasticsearch索引模板的功能,本书稍后12.6节会有详细介绍。