前言
最近项目使用了ELK+kafka,编写此文章是为了帮助需要的人
Elasticsearch、Logstash和Kibana以及Kafka搭建日志抽取平台的详细操作步骤:
- 安装并启动Elasticsearch。可以下载并解压Elasticsearch软件包,修改配置文件elasticsearch.yml,然后运行bin/elasticsearch启动。
- 安装并启动Logstash。下载并解压Logstash软件包,编写logstash配置文件。input配置从Kafka消费数据,filter进行过滤和处理,output输出到Elasticsearch。然后运行bin/logstash -f logstash-simple.conf启动。
- 安装并启动Kibana。下载并解压Kibana软件包,修改配置文件kibana.yml,然后运行bin/kibana启动Web界面。
- 安装并启动Zookeeper和Kafka。下载并解压软件包,修改配置文件,分别启动zookeeper server和kafka server。创建topics等。
- 编写数据生产程序,将日志数据发送到Kafka指定topic。可以使用kafka自带的console producer或自己写程序推送日志到kafka。
- 在Kibana上创建index pattern,然后就可以进行日志分析和可视化了。创建仪表盘,添加图表等。
- (可选)可以设置Logstash过滤器,使用 grok 等方式解析日志,提高搜索效果。
- (可选)可以配置Elasticsearch集群,提高日志处理能力。
常见的集成ELK和Kafka的方式有:
ELK默认没有直接集成Kafka,但可以通过Logstash的input插件来从Kafka消费数据。
- 使用Logstash Kafka Input 插件
这是最直接的方式,Logstash直接通过Kafka Input插件消费Kafka中的日志数据。在Logstash配置文件input部分可以配置Kafka连接和Topic等信息。 - 通过Filebeat+Kafka+Logstash
Filebeat先收集日志,输出到Kafka,然后Logstash从Kafka消费日志数据输入到ELK。Filebeat支持输出到Kafka。 - 使用Kafka Connector
利用Kafka Connect框架,可以开发连接器,实现从Kafka直接写入数据到Elasticsearch。这种方式需要编程实现连接器。 - 使用第三方工具
也有一些第三方工具可以帮助集成,比如Logconnect等。这些工具可以管理 offsets 并实现 接收-处理-加载 的流程。
总之,ELK和Kafka可以很方便地集成,Kafka给ELK提供了缓冲和队列功能,帮助处理海量日志数据。需要根据架构需求选择最合适的集成方式。
详细安装和配置步骤
-
Elasticsearch 安装
(1) 下载Elasticsearch,解压到指定目录
(2) 修改配置文件elasticsearch.yml,设置集群名称、节点名、绑定Host等
(3) 启动Elasticsearch:运行bin/elasticsearch
(4) 检查Elasticsearch运行状态:curl http://localhost:9200 -
Logstash 安装
(1) 下载Logstash,解压到指定目录
(2) 编写Logstash配置文件logstash.conf,设置input、filter、output
input:从Kafka消费数据
filter: grok、日期格式化等过滤
output: 输出到Elasticsearch
(3) 启动Logstash: bin/logstash -f logstash.conf -
Kibana 安装
(1) 下载Kibana,解压到指定目录
(2) 修改配置文件kibana.yml,设置Elasticsearch URL
(3) 启动Kibana: bin/kibana
(4) 访问Web界面,设置Index Pattern -
Kafka 安装
(1) 下载Kafka,解压到指定目录
(2) 修改配置文件 server.properties,设置broker.id等
(3) 启动Zookeeper: bin/zookeeper-server-start.sh
(4) 启动Kafka Server: bin/kafka-server-start.sh
(5) 创建Topic: bin/kafka-topics.sh -
整合配置
(1) Logstash配置从Kafka消费数据并输出到Elasticsearch
(2) 配置Kibana索引和可视化Dashboard
(3) 编写数据产生程序,推送日志到Kafka
(4) Logstash从Kafka消费数据,处理后输出到Elasticsearch
(5) 在Kibana上分析和可视化日志数据
以上是使用ELK结合Kafka搭建日志分析平台的详细步骤。可以根据实际需求进行调整优化,实现日志数据从采集到可视化的端对端处理。 -
Elasticsearch
下载Elasticsearchwget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.6.2-linux-x86_64.tar.gz
安装:
# 解压缩 tar -xzf elasticsearch-7.6.2-linux-x86_64.tar.gz # 创建数据和日志目录 mkdir elasticsearch-data mkdir elasticsearch-logs # 修改配置文件 vim config/elasticsearch.yml path.data: /path/to/elasticsearch-data path.logs: /path/to/elasticsearch-logs # 启动Elasticsearch cd /path/to/elasticsearch-7.6.2/ bin/elasticsearch
-
Logstash
# 下载Logstash wget https://artifacts.elastic.co/downloads/logstash/logstash-7.6.2.tar.gz
安装:
# 解压缩 tar -xzf logstash-7.6.2.tar.gz # 编写Logstash配置文件 vim logstash.conf # 启动Logstash cd /path/to/logstash-7.6.2/ bin/logstash -f logstash.conf
-
Kibana
# 下载Kibana wget https://artifacts.elastic.co/downloads/kibana/kibana-7.6.2-linux-x86_64.tar.gz
安装:
# 解压缩 tar -xzf kibana-7.6.2-linux-x86_64.tar.gz # 修改配置文件 vim config/kibana.yml # 启动Kibana cd /path/to/kibana-7.6.2-linux-x86_64/ bin/kibana
-
Kafka
# 下载Kafka wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.4.0/kafka_2.12-2.4.0.tgz
安装:
# 解压缩 tar -xzf kafka_2.12-2.4.0.tgz # 修改配置文件 vim config/server.properties # 启动Zookeeper bin/zookeeper-server-start.sh config/zookeeper.properties # 启动Kafka Server bin/kafka-server-start.sh config/server.properties
ELK和Kafka之间具体的代码配置和说明
-
Logstash配置
Logstash通过input插件消费Kafka数据,配置如下:input { kafka { bootstrap_servers => "kafka1:9092,kafka2:9092" topics => ["log_topic"] } } output { elasticsearch { hosts => "elasticsearch:9200" } }
这里input通过kafka插件指向kafka集群,消费log_topic的日志数据。
-
生产日志到Kafka
可以通过kafka命令行或者程序向log_topic发送日志:# 命令行发送日志 bin/kafka-console-producer.sh --broker-list kafka1:9092 --topic log_topic # 程序产生日志 producer = KafkaProducer(bootstrap_servers='kafka1:9092') producer.send('log_topic', b'log content...')
-
Kibana配置
在Kibana中配置index pattern,指向Logstash输出的Elasticsearch index。然后可以进行搜索和数据可视化。 -
Kafka Topic创建
bin/kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 3 --partitions 1 --topic log_topic
创建日志主题log_topic。
以上是ELK和Kafka之间的主要配置和交互方式。两者通过Kafka和Logstash平滑集成,实现从日志采集到可视化的端到端流程。可以根据具体需求调整配置。微服务日志存放到Elasticsearch的详细代码实现步骤:
-
每个微服务内部统一日志输出到文件
在微服务内部使用log4j、logback等日志框架,统一输出日志到文件,例如:// logback.xml <appender name="FILE" class="ch.qos.logback.core.FileAppender"> <file>logs/app.log</file> <encoder> <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern> </encoder> </appender> <root level="INFO"> <appender-ref ref="FILE" /> </root>
-
Filebeat收集日志文件并发送到Kafka
在每台微服务主机上安装Filebeat,Used module方式读取日志文件并输出到Kafka:filebeat.inputs: - type: log enabled: true paths: - /var/log/myapp/*.log output.kafka: hosts: ["kafka1:9092", "kafka2:9092"] topic: 'log_topic'
-
Logstash从Kafka消费日志数据
使用Logstash Kafka Input插件,并配置输出到Elasticsearch:input { kafka { ... } } output { elasticsearch { ... } }
-
Kibana连接Elasticsearch
在Kibana上配置index pattern,即可查询分析日志。
这样通过Filebeat、Kafka、Logstash和Elasticsearch的配合,可实现微服务日志的可视化分析。也可以考虑使用ELK Stack直接读日志文件的方式。Logstash Kafka Input插件从Kafka消费日志数据并输出到Elasticsearch的详细配置代码:
-
安装Logstash,并导入Kafka插件
# 下载和安装Logstash wget https://artifacts.elastic.co/downloads/logstash/logstash-7.6.2.tar.gz tar xzf logstash-7.6.2.tar.gz cd logstash-7.6.2 # 安装kafka input插件 bin/logstash-plugin install logstash-input-kafka
-
编写Logstash配置文件
input { kafka { bootstrap_servers => "kafka1:9092,kafka2:9092" topics => ["log_topic"] consumer_threads => 3 codec => "json" } } filter { # 进行过滤或转换 } output { elasticsearch { hosts => ["http://elasticsearch1:9200","http://elasticsearch2:9200"] index => "logstash-%{type}-%{+YYYY.MM.dd}" } }
这里input通过kafka插件消费log_topic主题的数据,然后输出到配置的elasticsearch cluster中。
-
运行Logstash
# 后台运行logstash bin/logstash -f logstash.conf
-
检查Elasticsearch indice
发现数据成功写入indices like logstash-type-2023.02.15
以上是使用Logstash Kafka Input插件从Kafka消费数据并写入Elasticsearch的详细配置步骤。可以根据实际情况调整参数。