Preface
Apache Kafka is a distributed streaming platform built for reliability, durability, efficiency, and scalability when processing real-time data streams. Its design rests on a handful of key mechanisms: durability, efficiency, message delivery guarantees, replication, leader election, and compression.
Durability
Kafka stores messages in a persistent log, so a message is not lost after it has been sent. Each message is appended to a log segment file and located through index files. On write, Kafka first puts the data into memory (the operating system's page cache) and then flushes it to the log files on disk asynchronously.
import org.apache.kafka.clients.producer.*;

import java.util.Properties;

public class KafkaPersistenceExample {
    public static void main(String[] args) {
        String topic = "my-topic";
        String bootstrapServers = "localhost:9092";

        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        for (int i = 0; i < 10; i++) {
            String key = "key" + i;
            String value = "value" + i;
            ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception == null) {
                        System.out.println("Message sent successfully: " + metadata.toString());
                    } else {
                        System.err.println("Error sending message: " + exception.getMessage());
                    }
                }
            });
        }

        // Flush any buffered records and close the producer
        producer.flush();
        producer.close();
    }
}
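If the default asynchronous flush is not durable enough for a particular use case, the flush behavior can also be tightened per topic, for example via the "flush.messages" topic config, which forces an fsync after a given number of appended messages. The following is only a minimal sketch using the AdminClient; the topic name and the value 10000 are placeholders, and in practice relying on replication rather than forced fsyncs is the usual recommendation.

import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.config.ConfigResource;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class KafkaFlushConfigExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties adminProps = new Properties();
        adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient adminClient = AdminClient.create(adminProps)) {
            // Target the topic whose flush settings we want to change
            ConfigResource topicResource = new ConfigResource(ConfigResource.Type.TOPIC, "my-topic");

            // flush.messages=10000 forces a disk flush after every 10000 appended messages
            AlterConfigOp setFlushMessages = new AlterConfigOp(
                    new ConfigEntry("flush.messages", "10000"), AlterConfigOp.OpType.SET);

            Map<ConfigResource, Collection<AlterConfigOp>> updates = new HashMap<>();
            updates.put(topicResource, Collections.singletonList(setFlushMessages));

            adminClient.incrementalAlterConfigs(updates).all().get();
            System.out.println("flush.messages updated for my-topic");
        }
    }
}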
Efficiency
Kafka relies on batching and compression to raise throughput. Messages are written to disk in batches rather than one disk operation per message, which reduces the disk I/O load. Kafka also uses zero-copy transfer (sendfile), so data read from the page cache can be sent to the network without extra copies through user space.
import org.apache.kafka.clients.producer.*;

import java.util.Properties;

public class KafkaEfficiencyExample {
    public static void main(String[] args) {
        String topic = "my-topic";
        String bootstrapServers = "localhost:9092";

        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        // Enable batching and compression
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);       // maximum batch size in bytes
        properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);            // send when the batch is full or after this wait time
        properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip"); // compress batches with gzip

        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        for (int i = 0; i < 10; i++) {
            String key = "key" + i;
            String value = "value" + i;
            ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception == null) {
                        System.out.println("Message sent successfully: " + metadata.toString());
                    } else {
                        System.err.println("Error sending message: " + exception.getMessage());
                    }
                }
            });
        }

        // Flush any buffered records and close the producer
        producer.flush();
        producer.close();
    }
}
Message Delivery Guarantees
Kafka offers several delivery guarantees: at-least-once (messages are not lost but may be duplicated), at-most-once (messages are not duplicated but may be lost), and exactly-once (messages are neither lost nor duplicated).
import org.apache.kafka.clients.producer.*;

import java.util.Properties;

public class KafkaGuaranteeExample {
    public static void main(String[] args) {
        String topic = "my-topic";
        String bootstrapServers = "localhost:9092";

        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        // Delivery guarantee settings
        properties.put(ProducerConfig.ACKS_CONFIG, "all"); // wait for all in-sync replicas to acknowledge
        properties.put(ProducerConfig.RETRIES_CONFIG, 3);  // retry up to 3 times on failure

        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        for (int i = 0; i < 10; i++) {
            String key = "key" + i;
            String value = "value" + i;
            ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception == null) {
                        System.out.println("Message sent successfully: " + metadata.toString());
                    } else {
                        System.err.println("Error sending message: " + exception.getMessage());
                    }
                }
            });
        }

        // Flush any buffered records and close the producer
        producer.flush();
        producer.close();
    }
}
acks is set to "all", so the producer waits until all in-sync replicas have acknowledged the write before treating the send as successful.
retries is set to 3, the maximum number of times the producer will retry a failed send.
This configuration gives at-least-once delivery: messages are not lost, but a retry after a lost acknowledgment can write the same message twice. Exactly-once semantics additionally requires the idempotent producer (enable.idempotence=true) and, for atomic writes across partitions, transactions.
Note that stronger guarantees generally cost performance, so the choice of mechanism should be weighed against business and latency requirements.
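If exactly-once behavior is actually required, the producer can additionally enable idempotence (which lets the broker discard duplicates caused by retries) and use transactions for atomic writes across partitions. The sketch below is only a minimal illustration; the transactional.id value is a placeholder and must be unique and stable per producer instance.

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.KafkaException;

import java.util.Properties;

public class KafkaExactlyOnceExample {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        // Idempotence removes duplicates caused by retries; it implies acks=all
        properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        // A transactional.id enables atomic, all-or-nothing writes across partitions
        properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transactional-id");

        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        producer.initTransactions();
        try {
            producer.beginTransaction();
            for (int i = 0; i < 10; i++) {
                producer.send(new ProducerRecord<>("my-topic", "key" + i, "value" + i));
            }
            producer.commitTransaction();
        } catch (KafkaException e) {
            // For most transient errors the transaction can simply be aborted and retried
            producer.abortTransaction();
        } finally {
            producer.close();
        }
    }
}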
Replication
Kafka uses replication for availability and fault tolerance. Each partition can have multiple replicas spread across different brokers; if one replica becomes unavailable, the data can still be read from the others.
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.config.TopicConfig;

import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class KafkaReplicaSetExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        String topicName = "my-topic";
        int numPartitions = 3;       // number of partitions
        short replicationFactor = 2; // number of replicas per partition

        Properties adminProps = new Properties();
        adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient adminClient = AdminClient.create(adminProps)) {
            NewTopic newTopic = new NewTopic(topicName, numPartitions, replicationFactor);
            // Topic-level setting: acks=all writes require at least 2 in-sync replicas
            newTopic.configs(Collections.singletonMap(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "2"));

            CreateTopicsResult createTopicsResult = adminClient.createTopics(Collections.singleton(newTopic));
            createTopicsResult.all().get(); // wait for the topic to be created
            System.out.println("Topic created successfully.");
        }
    }
}
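To check how the replicas were actually assigned, and which replica currently acts as the leader of each partition, the AdminClient can describe the topic. The following is a minimal sketch, assuming the topic from the example above already exists (result accessor names such as all() may vary slightly between client versions):

import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.TopicPartitionInfo;

import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class KafkaDescribeReplicasExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties adminProps = new Properties();
        adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient adminClient = AdminClient.create(adminProps)) {
            TopicDescription description = adminClient
                    .describeTopics(Collections.singleton("my-topic"))
                    .all().get()
                    .get("my-topic");

            // Print the leader, replica list, and in-sync replica list for every partition
            for (TopicPartitionInfo partition : description.partitions()) {
                System.out.println("Partition " + partition.partition()
                        + " leader=" + partition.leader()
                        + " replicas=" + partition.replicas()
                        + " isr=" + partition.isr());
            }
        }
    }
}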
Leader Election
Among the replicas of a partition, one is the leader and handles read and write requests. If the leader becomes unavailable, Kafka elects a new leader from the remaining in-sync replicas. In a real cluster this is performed by the controller; the code below is only a simplified simulation of the idea.
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

class KafkaBroker {
    private int id;
    private boolean isLeader;

    public KafkaBroker(int id) {
        this.id = id;
        this.isLeader = false;
    }

    public int getId() {
        return id;
    }

    public boolean isLeader() {
        return isLeader;
    }

    public void becomeLeader() {
        isLeader = true;
    }

    public void demoteLeader() {
        isLeader = false;
    }
}

class KafkaTopicPartition {
    private int partitionId;
    private List<KafkaBroker> replicas;

    public KafkaTopicPartition(int partitionId) {
        this.partitionId = partitionId;
        replicas = new ArrayList<>();
    }

    public int getPartitionId() {
        return partitionId;
    }

    public void addReplica(KafkaBroker broker) {
        replicas.add(broker);
    }

    public List<KafkaBroker> getReplicas() {
        return replicas;
    }
}

public class LeaderElectionSimulation {
    public static void main(String[] args) {
        int numBrokers = 5;
        int numPartitions = 3;

        List<KafkaBroker> brokers = new ArrayList<>();
        for (int i = 0; i < numBrokers; i++) {
            brokers.add(new KafkaBroker(i));
        }

        List<KafkaTopicPartition> partitions = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            KafkaTopicPartition partition = new KafkaTopicPartition(i);
            for (int j = 0; j < numBrokers; j++) {
                partition.addReplica(brokers.get(j));
            }
            partitions.add(partition);
        }

        // Simulate leader election
        Random random = new Random();
        for (KafkaTopicPartition partition : partitions) {
            List<KafkaBroker> replicas = partition.getReplicas();
            KafkaBroker newLeader = replicas.get(random.nextInt(replicas.size()));
            System.out.println("Partition " + partition.getPartitionId() + " leader changed to Broker " + newLeader.getId());
            replicas.forEach(broker -> broker.demoteLeader());
            newLeader.becomeLeader();
        }
    }
}
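In an actual cluster the election itself is carried out by the controller, not by client code. What a client can do is request a preferred-replica election through the AdminClient (available in newer client versions). The sketch below assumes a topic named my-topic with a partition 0 and is only meant to show the shape of that API:

import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.ElectionType;
import org.apache.kafka.common.TopicPartition;

import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class KafkaPreferredLeaderElectionExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties adminProps = new Properties();
        adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient adminClient = AdminClient.create(adminProps)) {
            // Ask the controller to move leadership of my-topic-0 back to its preferred replica
            adminClient.electLeaders(
                    ElectionType.PREFERRED,
                    Collections.singleton(new TopicPartition("my-topic", 0)))
                    .all().get();
            System.out.println("Preferred leader election triggered for my-topic-0");
        }
    }
}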
Compression
To reduce storage cost and network transfer, Kafka supports compression: the producer can compress record batches so they take up less space in the log segments on disk and less bandwidth on the wire.
import org.apache.kafka.clients.producer.*;

import java.util.Properties;

public class KafkaCompressionExample {
    public static void main(String[] args) {
        String topic = "my-topic";
        String bootstrapServers = "localhost:9092";

        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        // Compress record batches with gzip before they leave the producer
        properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");

        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        for (int i = 0; i < 10; i++) {
            String key = "key" + i;
            String value = "value" + i;
            ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception == null) {
                        System.out.println("Message sent successfully: " + metadata.toString());
                    } else {
                        System.err.println("Error sending message: " + exception.getMessage());
                    }
                }
            });
        }

        // Flush any buffered records and close the producer
        producer.flush();
        producer.close();
    }
}
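Producer-side compression, shown above, should not be confused with log compaction. Compaction is a per-topic cleanup policy under which Kafka retains only the most recent record for each key, which suits changelog-style topics. The following is a minimal sketch of creating a compacted topic; the topic name, partition count, and replication factor are placeholders:

import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.config.TopicConfig;

import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class KafkaCompactedTopicExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties adminProps = new Properties();
        adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient adminClient = AdminClient.create(adminProps)) {
            NewTopic compactedTopic = new NewTopic("my-compacted-topic", 1, (short) 1);
            // cleanup.policy=compact keeps only the latest value per key instead of deleting by age
            compactedTopic.configs(Collections.singletonMap(
                    TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT));

            adminClient.createTopics(Collections.singleton(compactedTopic)).all().get();
            System.out.println("Compacted topic created.");
        }
    }
}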
A complete Kafka producer example:
import org.apache.kafka.clients.producer.*;

import java.util.Properties;

public class KafkaProducerExample {
    public static void main(String[] args) {
        String topic = "my-topic";
        String bootstrapServers = "localhost:9092";

        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        for (int i = 0; i < 10; i++) {
            String key = "key" + i;
            String value = "value" + i;
            ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception == null) {
                        System.out.println("Message sent successfully: " + metadata.toString());
                    } else {
                        System.err.println("Error sending message: " + exception.getMessage());
                    }
                }
            });
        }

        // close() flushes any buffered records before shutting down
        producer.close();
    }
}
A complete Kafka consumer example:
import org.apache.kafka.clients.consumer.*;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class KafkaConsumerExample {
    public static void main(String[] args) {
        String topic = "my-topic";
        String bootstrapServers = "localhost:9092";
        String groupId = "my-group";

        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Collections.singleton(topic));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("Received message: key = " + record.key() + ", value = " + record.value());
            }
        }
    }
}