优秀的编程知识分享平台

网站首页 > 技术文章 正文

牛逼哄哄的ELK日志分析系统,搭建起来也没有想象中的那么难啊

nanyue 2024-11-15 22:55:17 技术文章 15 ℃

作者 :于老三

来源:https://www.cnblogs.com/yuhuLin/p/7018858.html

一、ELK搭建篇



官网地址:https://www.elastic.co/cn/

官网权威指南:https://www.elastic.co/guide/cn/elasticsearch/guide/current/index.html

安装指南:https://www.elastic.co/guide/en/elasticsearch/reference/5.x/rpm.html

ELK是Elasticsearch、Logstash、Kibana的简称,这三者是核心套件,但并非全部。

Elasticsearch是实时全文搜索和分析引擎,提供搜集、分析、存储数据三大功能;是一套开放REST和JAVA API等结构提供高效搜索功能,可扩展的分布式系统。它构建于Apache Lucene搜索引擎库之上。

Logstash是一个用来搜集、分析、过滤日志的工具。它支持几乎任何类型的日志,包括系统日志、错误日志和自定义应用程序日志。它可以从许多来源接收日志,这些来源包括 syslog、消息传递(例如 RabbitMQ)和JMX,它能够以多种方式输出数据,包括电子邮件、websockets和Elasticsearch。

Kibana是一个基于Web的图形界面,用于搜索、分析和可视化存储在 Elasticsearch指标中的日志数据。它利用Elasticsearch的REST接口来检索数据,不仅允许用户创建他们自己的数据的定制仪表板视图,还允许他们以特殊的方式查询和过滤数据

# 环境



# 安装

# 安装elasticsearch的环境

创建elasticsearch data的存放目录,并修改该目录的属主属组

# mkdir -p /data/es-data (自定义用于存放data数据的目录)
# chown -R elasticsearch:elasticsearch /data/es-data

修改elasticsearch的日志属主属组

# chown -R elasticsearch:elasticsearch /var/log/elasticsearch/

修改elasticsearch的配置文件

启动服务

注意事项

通过浏览器请求下9200的端口,看下是否成功

如何和elasticsearch交互

安装插件

# LogStash的使用

logstash使用配置文件

官方指南:
https://www.elastic.co/guide/en/logstash/current/configuration.html

创建配置文件01-logstash.conf
# vim /etc/logstash/conf.d/elk.conf

文件中添加以下内容
input { stdin { } }
output {
 elasticsearch { hosts => ["192.168.1.202:9200"] }
 stdout { codec => rubydebug }
}

使用配置文件运行logstash
# logstash -f ./elk.conf

运行成功以后输入以及标准输出结果

logstash的数据库类型

1. Input插件
 权威指南:https://www.elastic.co/guide/en/logstash/current/input-plugins.html
 
 file插件的使用
 # vim /etc/logstash/conf.d/elk.conf

 添加如下配置
 input {
 file {
 path => "/var/log/messages"
 type => "system"
 start_position => "beginning"
 }
 }
 output { 
 elasticsearch {
 hosts => ["192.168.1.202:9200"]
 index => "system-%{+YYYY.MM.dd}"
 }
 }


运行logstash指定elk.conf配置文件,进行过滤匹配
#logstash -f /etc/logstash/conf.d/elk.conf

来一发配置安全日志的并且把日志的索引按类型做存放,继续编辑elk.conf文件

# vim /etc/logstash/conf.d/elk.conf

添加secure日志的路径
input {
 file {
 path => "/var/log/messages"
 type => "system"
 start_position => "beginning"
 }

 file {
 path => "/var/log/secure"
 type => "secure"
 start_position => "beginning"
 }
}

output {

 if [type] == "system" {

 elasticsearch {
 hosts => ["192.168.1.202:9200"]
 index => "nagios-system-%{+YYYY.MM.dd}"
 }
 }

 if [type] == "secure" {

 elasticsearch {
 hosts => ["192.168.1.202:9200"]
 index => "nagios-secure-%{+YYYY.MM.dd}"
 }
 }
}

运行logstash指定elk.conf配置文件,进行过滤匹配
# logstash -f ./elk.conf 

这些设置都没有问题之后,接下来安装下kibana,可以让在前台展示

Kibana的安装及使用

安装kibana环境

官方安装手册:https://www.elastic.co/guide/en/kibana/current/install.html

下载kibana的tar.gz的软件包
# wget https://artifacts.elastic.co/downloads/kibana/kibana-5.4.0-linux-x86_64.tar.gz

解压kibana的tar包
# tar -xzf kibana-5.4.0-linux-x86_64.tar.gz

进入解压好的kibana
# mv kibana-5.4.0-linux-x86_64 /usr/local

创建kibana的软连接
# ln -s /usr/local/kibana-5.4.0-linux-x86_64/ /usr/local/kibana 

编辑kibana的配置文件
# vim /usr/local/kibana/config/kibana.yml

修改配置文件如下,开启以下的配置
server.port: 5601

server.host: "0.0.0.0"

elasticsearch.url: "http://192.168.1.202:9200"

kibana.index: ".kibana" 

安装screen,以便于kibana在后台运行(当然也可以不用安装,用其他方式进行后台启动)
# yum -y install screen

# screen

# /usr/local/kibana/bin/kibana
netstat -antp |grep 5601
tcp 0 0 0.0.0.0:5601 0.0.0.0:* LISTEN 17007/node 

打开浏览器并设置对应的index
http://IP:5601

二、ELK实战篇

好,现在索引也可以创建了,现在可以来输出nginx、apache、message、secrue的日志到前台展示(Nginx有的话直接修改,没有自行安装)

编辑nginx配置文件,修改以下内容(在http模块下添加)

log_format json '{"@timestamp":"$time_iso8601",'
 '"@version":"1",'
 '"client":"$remote_addr",'
 '"url":"$uri",'
 '"status":"$status",'
 '"domian":"$host",'
 '"host":"$server_addr",'
 '"size":"$body_bytes_sent",'
 '"responsetime":"$request_time",'
 '"referer":"$http_referer",'
 '"ua":"$http_user_agent"'
 '}';

修改access_log的输出格式为刚才定义的json 
access_log logs/elk.access.log json;

继续修改apache的配置文件

LogFormat "{ \
 \"@timestamp\": \"%{%Y-%m-%dT%H:%M:%S%z}t\", \
 \"@version\": \"1\", \
 \"tags\":[\"apache\"], \
 \"message\": \"%h %l %u %t \\\"%r\\\" %>s %b\", \
 \"clientip\": \"%a\", \
 \"duration\": %D, \
 \"status\": %>s, \
 \"request\": \"%U%q\", \
 \"urlpath\": \"%U\", \
 \"urlquery\": \"%q\", \
 \"bytes\": %B, \
 \"method\": \"%m\", \
 \"site\": \"%{Host}i\", \
 \"referer\": \"%{Referer}i\", \
 \"useragent\": \"%{User-agent}i\" \
 }" ls_apache_json

一样修改输出格式为上面定义的json格式
CustomLog logs/access_log ls_apache_json

编辑logstash配置文件,进行日志收集
vim /etc/logstash/conf.d/full.conf

 input {
 file {
 path => "/var/log/messages"
 type => "system"
 start_position => "beginning"
 } 

 file {
 path => "/var/log/secure"
 type => "secure"
 start_position => "beginning"
 } 

 file {
 path => "/var/log/httpd/access_log"
 type => "http"
 start_position => "beginning"
 } 

 file {
 path => "/usr/local/nginx/logs/elk.access.log"
 type => "nginx"
 start_position => "beginning"
 } 

}
 
output {

 if [type] == "system" { 

 elasticsearch {
 hosts => ["192.168.1.202:9200"]
 index => "nagios-system-%{+YYYY.MM.dd}"
 } 
 } 

 if [type] == "secure" {

 elasticsearch {
 hosts => ["192.168.1.202:9200"]
 index => "nagios-secure-%{+YYYY.MM.dd}"
 }
 }

 if [type] == "http" {

 elasticsearch {
 hosts => ["192.168.1.202:9200"]
 index => "nagios-http-%{+YYYY.MM.dd}"
 }
 }

 if [type] == "nginx" {

 elasticsearch {
 hosts => ["192.168.1.202:9200"]
 index => "nagios-nginx-%{+YYYY.MM.dd}"
 }
 }

}

运行看看效果如何
logstash -f /etc/logstash/conf.d/full.conf

可以发现所有创建日志的索引都已存在,接下来就去Kibana创建日志索引,进行展示(按照上面的方法进行创建索引即可),看下展示的效果

接下来再来一发MySQL慢日志的展示

由于MySQL的慢日志查询格式比较特殊,所以需要用正则进行匹配,并使用multiline能够进行多行匹配(看具体配置)
input {
 file {
 path => "/var/log/messages"
 type => "system"
 start_position => "beginning"
 } 

 file {
 path => "/var/log/secure"
 type => "secure"
 start_position => "beginning"
 } 

 file {
 path => "/var/log/httpd/access_log"
 type => "http"
 start_position => "beginning"
 } 

 file {
 path => "/usr/local/nginx/logs/elk.access.log"
 type => "nginx"
 start_position => "beginning"
 } 
 
 file {
 path => "/var/log/mysql/mysql.slow.log"
 type => "mysql"
 start_position => "beginning" 
 codec => multiline {
 pattern => "^# User@Host:"
 negate => true
 what => "previous"
 }
 }
}

filter {
 
 grok {
 match => { "message" => "SELECT SLEEP" }
 add_tag => [ "sleep_drop" ]
 tag_on_failure => []
 }
 
 
 if "sleep_drop" in [tags] {
 drop {}
 }
 
 grok {
 match => { "message" => "(?m)^# User@Host: %{USER:User}\[[^\]]+\] @ (?:(?<clienthost>\S*) )?\[(?:%{IP:Client_IP})?\]\s.*# Query_time: %{NUMBER:Query_Time:float}\s+Lock_time: %{NUMBER:Lock_Time:float}\s+Rows_sent: %{NUMBER:Rows_Sent:int}\s+Rows_examined: %{NUMBER:Rows_Examined:int}\s*(?:use %{DATA:Database};\s*)?SET timestamp=%{NUMBER:timestamp};\s*(?<Query>(?<Action>\w+)\s+.*)\n# Time:.*#34; }
 }
 
 date {
 match => [ "timestamp", "UNIX" ]
 remove_field => [ "timestamp" ]
 }
 
 
}


 
output {

 if [type] == "system" { 

 elasticsearch {
 hosts => ["192.168.1.202:9200"]
 index => "nagios-system-%{+YYYY.MM.dd}"
 } 
 } 

 if [type] == "secure" {

 elasticsearch {
 hosts => ["192.168.1.202:9200"]
 index => "nagios-secure-%{+YYYY.MM.dd}"
 }
 }

 if [type] == "http" {

 elasticsearch {
 hosts => ["192.168.1.202:9200"]
 index => "nagios-http-%{+YYYY.MM.dd}"
 }
 }

 if [type] == "nginx" {

 elasticsearch {
 hosts => ["192.168.1.202:9200"]
 index => "nagios-nginx-%{+YYYY.MM.dd}"
 }
 }
 
 if [type] == "mysql" {

 elasticsearch {
 hosts => ["192.168.1.202:9200"]
 index => "nagios-mysql-slow-%{+YYYY.MM.dd}"
 }
 }
}

查看效果(一条慢日志查询会显示一条,如果不进行正则匹配,那么一行就会显示一条)

具体的日志输出需求,进行具体的分析

三:ELK终极篇

安装reids 
# yum install -y redis

修改redis的配置文件
# vim /etc/redis.conf

修改内容如下
daemonize yes

bind 192.168.1.202

启动redis服务
# /etc/init.d/redis restart

测试redis的是否启用成功
# redis-cli -h 192.168.1.202

输入info如果有不报错即可
redis 192.168.1.202:6379> info

redis_version:2.4.10
....

编辑配置redis-out.conf配置文件,把标准输入的数据存储到redis中
# vim /etc/logstash/conf.d/redis-out.conf

添加如下内容

input {
 stdin {}
}

output {

 redis {
 host => "192.168.1.202"
 port => "6379"
 password => 'test'
 db => '1'
 data_type => "list"
 key => 'elk-test'
 }
} 

运行logstash指定redis-out.conf的配置文件
# /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/redis-out.conf

运行成功以后,在logstash中输入内容(查看下效果)

编辑配置redis-in.conf配置文件,把reids的存储的数据输出到elasticsearch中
# vim /etc/logstash/conf.d/redis-out.conf

添加如下内容
input{
 redis {
 host => "192.168.1.202"
 port => "6379"
 password => 'test'
 db => '1'
 data_type => "list"
 key => 'elk-test'
 batch_count => 1 #这个值是指从队列中读取数据时,一次性取出多少条,默认125条(如果redis中没有125条,就会报错,所以在测试期间加上这个值)
 }

}

output {
 elasticsearch {
 hosts => ['192.168.1.202:9200']
 index => 'redis-test-%{+YYYY.MM.dd}'
 }
}

运行logstash指定redis-in.conf的配置文件
# /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/redis-out.conf
把之前的配置文件修改一下,变成所有的日志监控的来源文件都存放到redis中,然后通过redis在输出到elasticsearch中

更改为如下,编辑full.conf
input {
 file {
 path => "/var/log/httpd/access_log"
 type => "http"
 start_position => "beginning"
 }

 file {
 path => "/usr/local/nginx/logs/elk.access.log"
 type => "nginx"
 start_position => "beginning"
 }

 file {
 path => "/var/log/secure"
 type => "secure"
 start_position => "beginning"
 }

 file {
 path => "/var/log/messages"
 type => "system"
 start_position => "beginning"
 }
}


output {
 if [type] == "http" {
 redis {
 host => "192.168.1.202"
 password => 'test'
 port => "6379"
 db => "6"
 data_type => "list"
 key => 'nagios_http' 
 }
 }

 if [type] == "nginx" {
 redis {
 host => "192.168.1.202"
 password => 'test'
 port => "6379"
 db => "6"
 data_type => "list"
 key => 'nagios_nginx' 
 }
 }

 if [type] == "secure" {
 redis {
 host => "192.168.1.202"
 password => 'test'
 port => "6379"
 db => "6"
 data_type => "list"
 key => 'nagios_secure' 
 }
 }

 if [type] == "system" {
 redis {
 host => "192.168.1.202"
 password => 'test'
 port => "6379"
 db => "6"
 data_type => "list"
 key => 'nagios_system' 
 }
 }
} 


运行logstash指定shipper.conf的配置文件
# /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/full.conf

在redis中查看是否已经将数据写到里面(有时候输入的日志文件不产生日志,会导致redis里面也没有写入日志)
把redis中的数据读取出来,写入到elasticsearch中(需要另外一台主机做实验)

编辑配置文件
# vim /etc/logstash/conf.d/redis-out.conf

添加如下内容
input {
 redis {
 type => "system"
 host => "192.168.1.202"
 password => 'test'
 port => "6379"
 db => "6"
 data_type => "list"
 key => 'nagios_system' 
 batch_count => 1
 }
 
 redis {
 type => "http"
 host => "192.168.1.202"
 password => 'test'
 port => "6379"
 db => "6"
 data_type => "list"
 key => 'nagios_http' 
 batch_count => 1
 }

 redis {
 type => "nginx"
 host => "192.168.1.202"
 password => 'test'
 port => "6379"
 db => "6"
 data_type => "list"
 key => 'nagios_nginx'
 batch_count => 1
 }
 
 redis {
 type => "secure"
 host => "192.168.1.202"
 password => 'test'
 port => "6379"
 db => "6"
 data_type => "list"
 key => 'nagios_secure' 
 batch_count => 1
 }
}
 
output {
 
 if [type] == "system" {
 elasticsearch {
 hosts => ["192.168.1.202:9200"]
 index => "nagios-system-%{+YYYY.MM.dd}"
 }
 } 

 if [type] == "http" {
 elasticsearch {
 hosts => ["192.168.1.202:9200"]
 index => "nagios-http-%{+YYYY.MM.dd}"
 } 
 } 

 if [type] == "nginx" {
 elasticsearch {
 hosts => ["192.168.1.202:9200"]
 index => "nagios-nginx-%{+YYYY.MM.dd}"
 } 
 } 

 if [type] == "secure" {
 elasticsearch {
 hosts => ["192.168.1.202:9200"]
 index => "nagios-secure-%{+YYYY.MM.dd}"
 } 
 } 
}


注意:
input是从客户端收集的
output是同样也保存到192.168.1.202中的elasticsearch中,如果要保存到当前的主机上,可以把output中的hosts修改成localhost,如果还需要在kibana中显示,需要在本机上部署kabana,为何要这样做,起到一个松耦合的目的
说白了,就是在客户端收集日志,写到服务端的redis里或是本地的redis里面,输出的时候对接ES服务器即可

运行命令看看效果
# /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/redis-out.conf

效果是和直接往ES服务器输出一样的(这样是先将日志存到redis数据库,然后再从redis数据库里取出日志)

上线ELK

1. 日志分类
 系统日志 rsyslog logstash syslog插件
 访问日志 nginx logstash codec json
 错误日志 file logstash mulitline
 运行日志 file logstash codec json
 设备日志 syslog logstash syslog插件
 Debug日志 file logstash json 或者 mulitline 

2. 日志标准化
 路径 固定
 格式 尽量json

3. 系统个日志开始-->错误日志-->运行日志-->访问日志

因为ES保存日志是永久保存,所以需要定期删除一下日志,下面命令为删除指定时间前的日志

curl -X DELETE http://xx.xx.com:9200/logstash-*-`date +%Y-%m-%d -d "-$n days"`
最近发表
标签列表