黄胜华 發表於 2023-9-23 00:00:00

ELK logstash 处理MySQL慢查询日志(26th)

<p>在生产环境下,logstash 经常会遇到处理多种格式的日志,不同的日志格式,解析方法不同。下面来说说logstash处理多行日志的例子,对MySQL慢查询日志进行分析,这个经常遇到过,网络上疑问也很多。</p>
<p>MySQL慢查询日志格式如下:</p><pre class="brush:bash;toolbar:false"># User@Host: ttlsa @Id: 69641319
# Query_time: 0.000148Lock_time: 0.000023 Rows_sent: 0Rows_examined: 202
SET timestamp=1456717595;
select`Id`, `Url` from `File` where `Id` in ('201319', '201300');
# Time: 160229 11:46:37</pre><p></p>
<h3>filebeat配置</h3>
<p>我这里是使用filebeat 1.1.1版本的,之前版本没有multiline配置项,具体方法看后面那种。</p><pre class="brush:bash;toolbar:false">filebeat:
prospectors:
    -
      paths:
      - /www.ttlsa.com/logs/mysql/slow.log
      document_type: mysqlslowlog
      input_type: log
      multiline:
      negate: true
      match: after
registry_file: /var/lib/filebeat/registry
output:
logstash:
    hosts: ["10.6.66.14:5046"]
shipper:
logging:
files:</pre><p></p>
<h3>logstash配置</h3>
<h4>1. input段配置</h4>
<p></p><pre class="brush:bash;toolbar:false"># vi /etc/logstash/conf.d/01-beats-input.conf 
input {
beats {
    port =&gt; 5046
    host =&gt; "10.6.66.14"
}
}</pre><p></p>
<h4>2. filter 段配置</h4>
<p></p><pre class="brush:bash;toolbar:false"># vi /etc/logstash/conf.d/16-mysqlslowlog.log
filter {
if == "mysqlslowlog" {
grok {
    match =&gt; { "message" =&gt; "(?m)^#\s+User@Host:\s+%{USER:user}\[[^\]]+\]\s+@\s+(?:(?&lt;clienthost&gt;\S*) )?\[(?:%{IPV4:clientip})?\]\s+Id:\s+%{NUMBER:row_id:int}\n#\s+Query_time:\s+%{NUMBER:query_time:float}\s+Lock_time:\s+%{NUMBER:lock_time:float}\s+Rows_sent:\s+%{NUMBER:rows_sent:int}\s+Rows_examined:\s+%{NUMBER:rows_examined:int}\n\s*(?:use %{DATA:database};\s*\n)?SET\s+timestamp=%{NUMBER:timestamp};\n\s*(?&lt;sql&gt;(?&lt;action&gt;\w+)\b.*;)\s*(?:\n#\s+Time)?.*$" }
}
    date {
      match =&gt; [ "timestamp", "UNIX", "YYYY-MM-dd HH:mm:ss"]
      remove_field =&gt; [ "timestamp" ]
    }
}
}</pre><p>关键之重是grok正则的配置。</p>
<h4>3. output段配置</h4>
<p></p><pre class="brush:bash;toolbar:false"># vi /etc/logstash/conf.d/30-beats-output.conf
output {
    if "_grokparsefailure" in {
      file { path =&gt; "/var/log/logstash/grokparsefailure-%{}-%{+YYYY.MM.dd}.log" }
    }

if [@metadata] in [ "mysqlslowlog" ] {
    elasticsearch {
      hosts =&gt; ["10.6.66.14:9200"]
      sniffing =&gt; true
      manage_template =&gt; false
      template_overwrite =&gt; true
      index =&gt; "%{[@metadata]}-%{}-%{+YYYY.MM.dd}"
      document_type =&gt; "%{[@metadata]}"
    }
}
}</pre><p>标准输出结果截图</p>
<p><img title="ELK logstash 处理MySQL慢查询日志(26th)" class="alignnone size-large wp-image-11415" src="https://zhuji.jb51.net/uploads/img/20230519/aa8c10ce4543925978a6c9f2e10c0659.jpg" width="1024" height="339"></p>
<p>elasticsearch结果截图</p>
<p><img title="ELK logstash 处理MySQL慢查询日志(26th)" class="alignnone size-full wp-image-11416" src="https://zhuji.jb51.net/uploads/img/20230519/d717e57a87c1a3def021134d06c3176e.jpg" width="758" height="517"></p>
<p><img title="ELK logstash 处理MySQL慢查询日志(26th)" class="alignnone size-large wp-image-11417" src="https://zhuji.jb51.net/uploads/img/20230519/ef2e379500d8bf23730db7b0af6ecda0.jpg" width="1024" height="232"></p>
<p>如果是使用filebeat1.1.1之前的版本,配置如下:</p>
<h4>1. filebeat配置</h4>
<p></p><pre class="brush:bash;toolbar:false">filebeat:
prospectors:
    -
      paths:
      - /www.ttlsa.com/logs/mysql/slow.log
      document_type: mysqlslowlog
      input_type: log
registry_file: /var/lib/filebeat/registry
output:
logstash:
    hosts: ["10.6.66.14:5046"]
shipper:
logging:
files:</pre><p></p>
<h4>2. logstash input段配置</h4>
<p></p><pre class="brush:bash;toolbar:false">input {
beats {
    port =&gt; 5046
    host =&gt; "10.6.66.14"
    codec =&gt; multiline {
      pattern =&gt; "^# User@Host:"
      negate =&gt; true
      what =&gt; previous
    }
}
}</pre><p>其它配置不变。</p>
<p>如有疑问跟帖,或者加群沟通。</p>
頁: [1]
查看完整版本: ELK logstash 处理MySQL慢查询日志(26th)