Kafka: Broker, Producer, and Consumer Natively Support Distribution with Automatic Load Balancing

Introduction

In Apache Kafka, the Broker, Producer, and Consumer are all natively distributed, and load balancing happens automatically.

How It Works

Broker

A Kafka Broker is a message broker server responsible for storing and delivering messages. Multiple Brokers form a Kafka cluster. Load is spread across Brokers through partitions (Partitions): the partitions of a topic are distributed over the Brokers in the cluster rather than all living on one node.
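To make the idea of spreading partitions over Brokers concrete, here is a minimal sketch that distributes partition leaders round-robin over a hypothetical 3-broker cluster. This is a simplified stand-in for Kafka's actual placement logic, not the real algorithm; the broker IDs and partition count are made up for illustration.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class PartitionSpread {
    // Round-robin placement: a simplified stand-in for Kafka's real broker assignment
    static Map<Integer, Integer> placeLeaders(List<Integer> brokerIds, int numPartitions) {
        Map<Integer, Integer> leaderByPartition = new LinkedHashMap<>();
        for (int p = 0; p < numPartitions; p++) {
            leaderByPartition.put(p, brokerIds.get(p % brokerIds.size()));
        }
        return leaderByPartition;
    }

    public static void main(String[] args) {
        // Hypothetical 3-broker cluster hosting a 6-partition topic
        placeLeaders(List.of(0, 1, 2), 6).forEach((p, b) ->
                System.out.println("partition " + p + " -> broker " + b));
    }
}
```

Because no single Broker owns all partitions, both storage and request traffic are spread across the cluster.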

Producer

The Kafka Producer sends messages to Brokers. A Producer does not need to know in advance which partition a message will go to: the client chooses the partition automatically, typically by hashing the record key, or by a round-robin/sticky strategy for records without a key.
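A hedged sketch of key-based partitioning: Kafka's default partitioner actually hashes the key bytes with murmur2, so the hashCode-modulo version below is only an illustration of the principle, not Kafka's real formula. What it demonstrates carries over, though: equal keys always map to the same partition, which is what preserves per-key ordering.

```java
public class KeyPartitionSketch {
    // Simplified stand-in for Kafka's default partitioner (the real one uses murmur2)
    static int partitionFor(String key, int numPartitions) {
        return Math.abs(key.hashCode() % numPartitions);
    }

    public static void main(String[] args) {
        int numPartitions = 3;
        for (String key : new String[] {"key0", "key1", "key0"}) {
            // The same key always lands on the same partition
            System.out.println(key + " -> partition " + partitionFor(key, numPartitions));
        }
    }
}
```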

Consumer

A Kafka Consumer subscribes to one or more Topics and consumes messages from the Brokers. A Consumer Group is a set of Consumers that jointly consume the messages of a Topic. Kafka achieves load balancing through the combination of partitions and Consumer Groups.

Partitions and replicas (Replicas) in Kafka:

Each Topic can be split into multiple partitions.
Each partition can have multiple replicas: one Leader replica and zero or more Follower replicas.
The Producer sends messages to a partition of a given Topic; Kafka guarantees message order within a single partition.
Consumers within a Consumer Group coordinate consumption: each partition is consumed by exactly one Consumer in a given Consumer Group.
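The coordination rule above (each partition consumed by exactly one consumer in the group) can be sketched with a range-style assignment, similar in spirit to Kafka's RangeAssignor. The group members and partition count below are invented for illustration; the real assignment is negotiated through the group coordinator on the broker.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RangeAssignSketch {
    // Assign partitions to consumers in contiguous ranges, RangeAssignor-style
    static Map<String, List<Integer>> assign(List<String> consumers, int numPartitions) {
        Map<String, List<Integer>> out = new LinkedHashMap<>();
        int per = numPartitions / consumers.size();
        int extra = numPartitions % consumers.size();
        int next = 0;
        for (int i = 0; i < consumers.size(); i++) {
            int count = per + (i < extra ? 1 : 0); // first 'extra' consumers get one more
            List<Integer> parts = new ArrayList<>();
            for (int j = 0; j < count; j++) parts.add(next++);
            out.put(consumers.get(i), parts);
        }
        return out;
    }

    public static void main(String[] args) {
        // Two consumers sharing a 5-partition topic: every partition has exactly one owner
        System.out.println(assign(List.of("c1", "c2"), 5));
    }
}
```

When a consumer joins or leaves, Kafka redistributes the partitions (a rebalance), which is how consumption load stays balanced across the group.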

Kafka Producer Example

import org.apache.kafka.clients.producer.*;
import java.util.Properties;
public class KafkaProducerExample {
    public static void main(String[] args) {
        String bootstrapServers = "localhost:9092";
        String topic = "test-topic";
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        Producer<String, String> producer = new KafkaProducer<>(properties);
        // Send 10 keyed records; the client chooses the partition from each key
        for (int i = 0; i < 10; i++) {
            ProducerRecord<String, String> record = new ProducerRecord<>(topic, "key" + i, "value" + i);
            producer.send(record); // asynchronous; close() below flushes pending sends
        }
        producer.close();
    }
}

Kafka Consumer Example

import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class KafkaConsumerExample {
    public static void main(String[] args) {
        String bootstrapServers = "localhost:9092";
        String groupId = "test-group";
        String topic = "test-topic";
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        Consumer<String, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Collections.singletonList(topic));
        try {
            // Poll in a loop; the group coordinator assigns this consumer its partitions
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("Received message: key = " + record.key() + ", value = " + record.value());
                }
            }
        } finally {
            consumer.close(); // leaves the group cleanly if the loop is interrupted
        }
    }
}

Kafka Producer Test Code

import org.apache.kafka.clients.producer.*;
import java.util.Properties;
public class KafkaProducerTest {
    public static void main(String[] args) {
        String bootstrapServers = "localhost:9092";
        String topic = "test-topic";
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        Producer<String, String> producer = new KafkaProducer<>(properties);
        for (int i = 0; i < 10; i++) {
            ProducerRecord<String, String> record = new ProducerRecord<>(topic, "key" + i, "value" + i);
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception == null) {
                        System.out.println("Sent message to topic " + metadata.topic() +
                                ", partition " + metadata.partition() +
                                ", offset " + metadata.offset());
                    } else {
                        System.err.println("Error sending message: " + exception.getMessage());
                    }
                }
            });
        }
        producer.close(); // blocks until in-flight sends complete, so the callbacks run before exit
    }
}

Kafka Consumer Test Code

import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class KafkaConsumerTest {
    public static void main(String[] args) {
        String bootstrapServers = "localhost:9092";
        String groupId = "test-group";
        String topic = "test-topic";
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        Consumer<String, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Collections.singletonList(topic));
        try {
            // Poll in a loop; the group coordinator assigns this consumer its partitions
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("Received message: key = " + record.key() + ", value = " + record.value());
                }
            }
        } finally {
            consumer.close(); // leaves the group cleanly if the loop is interrupted
        }
    }
}