前言
在Apache Kafka中,Broker、Producer和Consumer都原生自动支持分布式,以及自动实现负载均衡。
原理
Broker
Kafka Broker是消息代理服务器,负责存储和传递消息。多个Broker可以组成一个Kafka集群。Broker之间通过分区(Partitions)来分摊负载,每个分区都在不同的Broker上。
Producer
Kafka Producer负责将消息发送到Broker。Producer并不需要明确知道消息将发送到哪个分区,因为这个决策是由Kafka自动完成的。
Consumer
Kafka Consumer订阅一个或多个Topic,并消费Broker上的消息。Consumer Group是一组Consumer的集合,共同消费一个Topic的消息。Kafka通过分区和Consumer Group来实现负载均衡。
Kafka中的分区和副本(Replicas):
每个Topic可以分为多个分区。
每个分区可以有多个副本,一个Leader副本和零个或多个Follower副本。
Producer将消息发送到特定Topic的分区,Kafka保证相同分区的消息顺序。
Consumer Group中的Consumer会协调消费,每个分区只被一个Consumer Group中的一个Consumer消费。
Kafka Producer示例
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
public class KafkaProducerExample {
public static void main(String[] args) {
String bootstrapServers = "localhost:9092";
String topic = "test-topic";
Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(properties);
for (int i = 0; i < 10; i++) {
ProducerRecord<String, String> record = new ProducerRecord<>(topic, "key" + i, "value" + i);
producer.send(record);
}
producer.close();
}
}
Kafka Consumer示例
import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class KafkaConsumerExample {
public static void main(String[] args) {
String bootstrapServers = "localhost:9092";
String groupId = "test-group";
String topic = "test-topic";
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
Consumer<String, String> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Collections.singletonList(topic));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.println("Received message: key = " + record.key() + ", value = " + record.value());
}
}
}
}
Kafka Producer 测试代码
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
public class KafkaProducerTest {
public static void main(String[] args) {
String bootstrapServers = "localhost:9092";
String topic = "test-topic";
Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(properties);
for (int i = 0; i < 10; i++) {
ProducerRecord<String, String> record = new ProducerRecord<>(topic, "key" + i, "value" + i);
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception == null) {
System.out.println("Sent message to topic " + metadata.topic() +
", partition " + metadata.partition() +
", offset " + metadata.offset());
} else {
System.err.println("Error sending message: " + exception.getMessage());
}
}
});
}
producer.close();
}
}
Kafka Consumer 测试代码
import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class KafkaConsumerTest {
public static void main(String[] args) {
String bootstrapServers = "localhost:9092";
String groupId = "test-group";
String topic = "test-topic";
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
Consumer<String, String> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Collections.singletonList(topic));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.println("Received message: key = " + record.key() + ", value = " + record.value());
}
}
}
}
Kafka:Broker、Producer和Consumer都原生自动支持分布式 自动实现负载均衡