目录

Logstash日志删除filter插件

背景

目前市面上流行的日志解决方案主要为ELK方案,但随着使用过程中,日志不断增多,日志消费和存储阶段存在性能问题,导致写入延迟,kaibana使用上体验不优。

另外,如果日志不太规范,打开很多debug日志,环境如果也挺多的情况下,成本也居高不下,针对es存储来说费用上也是一个不小的数目。

优化方案

针对上面的两个问题,通过植入自定义logstash filter插件,删除掉不需要的debug日志和其他异常日志,可以有效地缓解日志过多的问题,本文重点不讨论解决方案,主要描述自定义filter插件的使用方法,请看下文所述。

Logstash简介

Logstash是一个具有实时管线能力的开源数据收集引擎。在ELK Stack技术栈中,通常选择更轻量级的Filebeat搜集日志,然后将日志输出到Logstash中进行加工处理,解析切割日志,再将处理后的日志输出到指定的目标(Elasticsearch、Kafka等)当中。

Logstash事件的处理管线是inputs->filters->outputs, 这三个阶段都可以自定义插件,下面主要介绍如何开发自定义需求最多的filter插件。

生成Logstash filter插件

下载Logstsh二进制包

官方下载连接地址:https://www.elastic.co/cn/downloads/logstash

https://tc.ctq6.cn/tc/20240706180704.png

生成插件代码

cd倒logstash根目录,使用bin/logstash-plugin生成filter插件模板,如下所示:

1
bin/logstash-plugin generate --type filter --name debug-drop --path vendor/localgems

其中vendor/localgems 可修改为其他路径。

查看filter插件的目录结构,如下: https://tc.ctq6.cn/tc/20240706181301.png

插件文件说明

查看lib/logstash/filters/debug-drop.rb 文件

Logstash依赖UTF-8编码,需要在插件代码开始处添加:

1
# encoding: utf-8

模板代码里面默认require了"logstash/filters/base", 如果需要依赖其他代码或者gems就在这里添加。

插件名称配置

1
2
config_name "debug-drop"
#debug-drop 就是插件名称, 在Logstash配置的filter块中使用。

插件名称配置

1
2
config: message, :validate => :string, :default => "Hello World!"
# message 是插件test的可选参数,默认值是“Hello World!”

下面是参数的通用配置代码:

1
config: :variable_name, :validate => :variable_type, :default => "Default Value" :required => boolean, :deprecated => boolean, :obsolete => string

参数说明:

1
2
3
4
5
6
: varable_name: 参数名称
: validate: 验证参数类型,如:string,:password, :boolean, :number, :array,等
: required: 是否必须配置
: default: 默认值
: deprecated: 是否废弃
: obsolete: 声明该配置不再使用,通常提供升级方案

结合本文说明,定义几个hash变量

1
2
3
$debugStartTimeHash = Hash.new("debugStartTimeHash")
$debugEndTimeHash = Hash.new("debugEndTimeHash")
$debugNumHash = Hash.new("debugNumHash")

结合本文说明,定义几个默认参数

1
2
3
4
config :dropNum, :validate => :number, :default => 100
config :notDropNum, :validate => :number, :default => 200
config :dropTime, :validate => :number, :default => 3600
config :notDropTime, :validate => :number, :default => 7200

插件方法 Logstash插件必须实现两个方法:registerfilter

register方法代码如下:

1
2
3
4
public
def register
  @logger = self.logger
end

register方法相当于初始化方法,不需要手动调用, 可以在这个方法里面调用配置变量,如@message,也可以初始化自己的实例变量。

filter方法代码如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
public
def filter(event)
  if @message
    event.set("message", @message)
  end

  msg = event.get("[event][original]")
  service = event.get("[fields][document_type]")
  puts event.get("[event][original]")

  if msg.include?'DEBUG'
    if $debugNumHash[service] == "debugNumHash"
      $debugNumHash = {
        service => 1
      }
    else
      debugNums = $debugNumHash[service]
      triggerTimes = Time.now.straftime("%Y-%m-%d %H:%M:%S")
      $debugNumHash = {
        service => debugNums + 1
      }
    end
    @logger.info("trigger time: #{tiggerTimes}, nums: #{debugsNUms}")

    if $debugStartTimeHash[service] == "debugStartTimeHash"
      $debugStartTimeHash = {
        service => Time.now.to_i
      }
    end

    if $debugEndTimeHash[service] == "debugEndTimeHash"
      $debugStartTimeHash = {
        service => Time.now.to_i
      }
    end

    if $debugEndTimeHash[service] == "debugEndTimeHash"
      $debugEndTimeHash = {
        service => 0
      }
    else 
      startTimes = $debugStartTimeHash[service]
      endTime = Time.now.to_i
      tilTimes = endTime - startTimes
      endTimes = Time.now.strftime("%Y-%m-%d %H:%M:%S")
      $debugEndTimeHash = {
        service => tilTimes
      }
    end
    @logger.info("trigger time: #{endTimes}, times: #{tilTimes}s")
  end

  $debugNumHash.each do |key, value|
    if value > @dropNum
      @logger.info("#{key} => Debug Log is Droped By over define DropNum...")
      event.cancel()
    elseif value > @notDropNum
      @debugNumHash[service] = 0
    end
  end

  $debugEndTimeHash.each do |key, value|
    if value > @dropTime
      endTimes = Time.now.strftime("%Y-%m-%d %H:%M:%")
      @logger.info("#{key} => Debug Logs is Drop By over define DropTime...")
      event.cancel()
    elseif value > @notDropTime
      $debugEndTimeHash = 0
    end
  end
  filter_matched(event)
end

filter 方法是插件的数据处理逻辑,其中event变量封装了数据流,可以通过接口访问event中的内容,具体参考https://www.elastic.co/guide/en/logstash/8.2/event-api.html。最后一句调用了filter_matched, 这个方法用于保证Logstash的配置add——failed,remove_filed,add_tag和remove_tag会被正确执行。

filter逻辑说明

先定义全局Hash

获取msg内容,匹配debug,按照条数或者时间,创建service => value存入Hash中

根据定义的参数,数据和时间, event.cancel(), 当条数或者时间达到预定值,重置

在Logstash中配置定制的插件

cd到Logstash根目录下,在Gemfile添加如下配置:

1
gem "logstash-filter-debug-drop",:path => "vendor/localgems/logstash-filter-debug-drop"

cd到插件的根目录下,修改logstash-filter-debug-drop.gemspec中的如下三处地方,不然编译或者logstash启动直接报错。

https://tc.ctq6.cn/tc/20240706192657.png

启动Logstash

修改logstash配置文件

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
/opt/logstash-8.8.2/config/logstash-sample.conf
input {
  beats {
    host => "0.0.0.0"
    port => 5044
  }
}
filter {
  debug-drop {}
}
output {
  file {
    path => "/var/log/message.log"
  }
}

通过从filebeat上接收文件,输出到本地文件中

配置启动文件

创建system/logstash.service启动文件时,需要添加–enable-local-plugin-development参数,不然启动报错,启动文件如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
[Unit]
Description=logstash server daemon
Documentation=/opt/logstash-8.8.2/bin/logstash -help
Wants=network-online.target
After=network-online.target

[Service]
User=root
Group=root
Envionment="BEAT_CONFIG_OPTS=-f /opt/logstsh-8.8.2/config/logstsh-sample.conf"
ExecStart=/opt/logstsh-8.8.2/bin/logstash --enable-local-plugin-development $BEAT_CONFIG_OPTS
Restart=always

[Install]
WantedBy=multi-user.target

启动logstash

1
2
3
systemctl enable logstash
systemctl start logstash
journalctl -f -u logstash --no-pager