(2023)ELK结合kafka环境搭建

前言

最近项目使用了ELK+kafka,编写此文章是为了帮助需要的人

Elasticsearch、Logstash和Kibana以及Kafka搭建日志抽取平台的详细操作步骤:

  1. 安装并启动Elasticsearch。可以下载并解压Elasticsearch软件包,修改配置文件elasticsearch.yml,然后运行bin/elasticsearch启动。
  2. 安装并启动Logstash。下载并解压Logstash软件包,编写logstash配置文件。input配置从Kafka消费数据,filter进行过滤和处理,output输出到Elasticsearch。然后运行bin/logstash -f logstash-simple.conf启动。
  3. 安装并启动Kibana。下载并解压Kibana软件包,修改配置文件kibana.yml,然后运行bin/kibana启动Web界面。
  4. 安装并启动Zookeeper和Kafka。下载并解压软件包,修改配置文件,分别启动zookeeper server和kafka server。创建topics等。
  5. 编写数据生产程序,将日志数据发送到Kafka指定topic。可以使用kafka自带的console producer或自己写程序推送日志到kafka。
  6. 在Kibana上创建index pattern,然后就可以进行日志分析和可视化了。创建仪表盘,添加图表等。
  7. (可选)可以设置Logstash过滤器,使用 grok 等方式解析日志,提高搜索效果。
  8. (可选)可以配置Elasticsearch集群,提高日志处理能力。

常见的集成ELK和Kafka的方式有:

ELK默认没有直接集成Kafka,但可以通过Logstash的input插件来从Kafka消费数据。

  1. 使用Logstash Kafka Input 插件
    这是最直接的方式,Logstash直接通过Kafka Input插件消费Kafka中的日志数据。在Logstash配置文件input部分可以配置Kafka连接和Topic等信息。
  2. 通过Filebeat+Kafka+Logstash
    Filebeat先收集日志,输出到Kafka,然后Logstash从Kafka消费日志数据输入到ELK。Filebeat支持输出到Kafka。
  3. 使用Kafka Connector
    利用Kafka Connect框架,可以开发连接器,实现从Kafka直接写入数据到Elasticsearch。这种方式需要编程实现连接器。
  4. 使用第三方工具
    也有一些第三方工具可以帮助集成,比如Logconnect等。这些工具可以管理 offsets 并实现 接收-处理-加载 的流程。
    总之,ELK和Kafka可以很方便地集成,Kafka给ELK提供了缓冲和队列功能,帮助处理海量日志数据。需要根据架构需求选择最合适的集成方式。

详细安装和配置步骤

  1. Elasticsearch 安装
    (1) 下载Elasticsearch,解压到指定目录
    (2) 修改配置文件elasticsearch.yml,设置集群名称、节点名、绑定Host等
    (3) 启动Elasticsearch:运行bin/elasticsearch
    (4) 检查Elasticsearch运行状态:curl http://localhost:9200

  2. Logstash 安装
    (1) 下载Logstash,解压到指定目录
    (2) 编写Logstash配置文件logstash.conf,设置input、filter、output
    input:从Kafka消费数据
    filter: grok、日期格式化等过滤
    output: 输出到Elasticsearch
    (3) 启动Logstash: bin/logstash -f logstash.conf

  3. Kibana 安装
    (1) 下载Kibana,解压到指定目录
    (2) 修改配置文件kibana.yml,设置Elasticsearch URL
    (3) 启动Kibana: bin/kibana
    (4) 访问Web界面,设置Index Pattern

  4. Kafka 安装
    (1) 下载Kafka,解压到指定目录
    (2) 修改配置文件 server.properties,设置broker.id等
    (3) 启动Zookeeper: bin/zookeeper-server-start.sh
    (4) 启动Kafka Server: bin/kafka-server-start.sh
    (5) 创建Topic: bin/kafka-topics.sh

  5. 整合配置
    (1) Logstash配置从Kafka消费数据并输出到Elasticsearch
    (2) 配置Kibana索引和可视化Dashboard
    (3) 编写数据产生程序,推送日志到Kafka
    (4) Logstash从Kafka消费数据,处理后输出到Elasticsearch
    (5) 在Kibana上分析和可视化日志数据
    以上是使用ELK结合Kafka搭建日志分析平台的详细步骤。可以根据实际需求进行调整优化,实现日志数据从采集到可视化的端对端处理。

  6. Elasticsearch
    下载Elasticsearch

    wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.6.2-linux-x86_64.tar.gz

    安装:

    # 解压缩
    tar -xzf elasticsearch-7.6.2-linux-x86_64.tar.gz
    # 创建数据和日志目录
    mkdir elasticsearch-data
    mkdir elasticsearch-logs
    # 修改配置文件
    vim config/elasticsearch.yml  
    path.data: /path/to/elasticsearch-data
    path.logs: /path/to/elasticsearch-logs
    # 启动Elasticsearch
    cd /path/to/elasticsearch-7.6.2/
    bin/elasticsearch
  7. Logstash

    # 下载Logstash
    wget https://artifacts.elastic.co/downloads/logstash/logstash-7.6.2.tar.gz

    安装:

    # 解压缩
    tar -xzf logstash-7.6.2.tar.gz
    # 编写Logstash配置文件 
    vim logstash.conf
    # 启动Logstash
    cd /path/to/logstash-7.6.2/  
    bin/logstash -f logstash.conf
  8. Kibana

    # 下载Kibana
    wget https://artifacts.elastic.co/downloads/kibana/kibana-7.6.2-linux-x86_64.tar.gz

    安装:

    # 解压缩
    tar -xzf kibana-7.6.2-linux-x86_64.tar.gz  
    # 修改配置文件
    vim config/kibana.yml
    # 启动Kibana
    cd /path/to/kibana-7.6.2-linux-x86_64/
    bin/kibana
  9. Kafka

    # 下载Kafka
    wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.4.0/kafka_2.12-2.4.0.tgz

    安装:

    # 解压缩
    tar -xzf kafka_2.12-2.4.0.tgz
    # 修改配置文件
    vim config/server.properties  
    # 启动Zookeeper
    bin/zookeeper-server-start.sh config/zookeeper.properties
    # 启动Kafka Server  
    bin/kafka-server-start.sh config/server.properties

    ELK和Kafka之间具体的代码配置和说明

  10. Logstash配置
    Logstash通过input插件消费Kafka数据,配置如下:

    input {
    kafka {
    bootstrap_servers => "kafka1:9092,kafka2:9092"
    topics => ["log_topic"]
    }  
    }
    output {
    elasticsearch {
    hosts => "elasticsearch:9200"
    }
    }

    这里input通过kafka插件指向kafka集群,消费log_topic的日志数据。

  11. 生产日志到Kafka
    可以通过kafka命令行或者程序向log_topic发送日志:

    # 命令行发送日志
    bin/kafka-console-producer.sh --broker-list kafka1:9092 --topic log_topic
    # 程序产生日志 
    producer = KafkaProducer(bootstrap_servers='kafka1:9092') 
    producer.send('log_topic', b'log content...')
  12. Kibana配置
    在Kibana中配置index pattern,指向Logstash输出的Elasticsearch index。然后可以进行搜索和数据可视化。

  13. Kafka Topic创建

    bin/kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 3 --partitions 1 --topic log_topic

    创建日志主题log_topic。
    以上是ELK和Kafka之间的主要配置和交互方式。两者通过Kafka和Logstash平滑集成,实现从日志采集到可视化的端到端流程。可以根据具体需求调整配置。

    微服务日志存放到Elasticsearch的详细代码实现步骤:

  14. 每个微服务内部统一日志输出到文件
    在微服务内部使用log4j、logback等日志框架,统一输出日志到文件,例如:

    // logback.xml
    <appender name="FILE" class="ch.qos.logback.core.FileAppender">
    <file>logs/app.log</file>
    <encoder>
    <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
    </encoder>
    </appender>
    <root level="INFO">
    <appender-ref ref="FILE" />
    </root>
  15. Filebeat收集日志文件并发送到Kafka
    在每台微服务主机上安装Filebeat,Used module方式读取日志文件并输出到Kafka:

    filebeat.inputs:
    - type: log
    enabled: true
    paths:
    - /var/log/myapp/*.log
    output.kafka:
    hosts: ["kafka1:9092", "kafka2:9092"]
    topic: 'log_topic'
  16. Logstash从Kafka消费日志数据
    使用Logstash Kafka Input插件,并配置输出到Elasticsearch:

    input {
    kafka {
    ...
    }
    }
    output {
    elasticsearch {
    ... 
    }
    }
  17. Kibana连接Elasticsearch
    在Kibana上配置index pattern,即可查询分析日志。
    这样通过Filebeat、Kafka、Logstash和Elasticsearch的配合,可实现微服务日志的可视化分析。也可以考虑使用ELK Stack直接读日志文件的方式。

    Logstash Kafka Input插件从Kafka消费日志数据并输出到Elasticsearch的详细配置代码:

  18. 安装Logstash,并导入Kafka插件

    # 下载和安装Logstash
    wget https://artifacts.elastic.co/downloads/logstash/logstash-7.6.2.tar.gz
    tar xzf logstash-7.6.2.tar.gz
    cd logstash-7.6.2
    # 安装kafka input插件
    bin/logstash-plugin install logstash-input-kafka
  19. 编写Logstash配置文件

    input {
    kafka {
    bootstrap_servers => "kafka1:9092,kafka2:9092"
    topics => ["log_topic"]
    consumer_threads => 3
    codec => "json" 
    }
    }
    filter {
    # 进行过滤或转换  
    }
    output {
    elasticsearch {
    hosts => ["http://elasticsearch1:9200","http://elasticsearch2:9200"]
    index => "logstash-%{type}-%{+YYYY.MM.dd}" 
    }
    }

    这里input通过kafka插件消费log_topic主题的数据,然后输出到配置的elasticsearch cluster中。

  20. 运行Logstash

    # 后台运行logstash
    bin/logstash -f logstash.conf
  21. 检查Elasticsearch indice
    发现数据成功写入indices like logstash-type-2023.02.15
    以上是使用Logstash Kafka Input插件从Kafka消费数据并写入Elasticsearch的详细配置步骤。可以根据实际情况调整参数。

(2023)ELK结合kafka环境搭建

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注

滚动到顶部