Kafka Design Principles: Durability, Efficiency, Message Delivery Guarantees, Replication, Leader Election, Log Compression

Introduction

Apache Kafka is a distributed streaming platform with high reliability, durability, efficiency, and scalability, used to process real-time data streams. Its design rests on several key ideas: durability, efficiency, message delivery guarantees, replication, leader election, and log compression.

Durability

Kafka stores messages in a persisted log, so a message is not lost once it has been written. Each message is appended to a log segment file and located through an offset index. On write, Kafka first places the data in memory (the OS page cache) and then flushes it to the on-disk log asynchronously.

import org.apache.kafka.clients.producer.*;
import java.util.Properties;
public class KafkaPersistenceExample {
    public static void main(String[] args) {
        String topic = "my-topic";
        String bootstrapServers = "localhost:9092";
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        for (int i = 0; i < 10; i++) {
            String key = "key" + i;
            String value = "value" + i;
            ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception == null) {
                        System.out.println("Message sent successfully: " + metadata.toString());
                    } else {
                        System.err.println("Error sending message: " + exception.getMessage());
                    }
                }
            });
        }
        // Flush outstanding records (flush() blocks until they are acknowledged), then close the producer
        producer.flush();
        producer.close();
    }
}
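
Flush behavior can also be tuned per topic on the broker side. The sketch below is a minimal, hedged example (the broker address localhost:9092 and the topic name durable-topic are illustrative assumptions) that creates a topic with explicit flush.messages, flush.ms, and segment.bytes overrides. By default Kafka does not fsync per message; it leaves flushing to the OS and relies on replication for durability, so overrides like these trade throughput for stronger single-broker guarantees.

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.config.TopicConfig;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class KafkaDurabilityConfigExample {
    public static void main(String[] args) throws Exception {
        Properties adminProps = new Properties();
        adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient adminClient = AdminClient.create(adminProps)) {
            // Topic-level overrides controlling how eagerly the broker flushes to disk.
            Map<String, String> configs = new HashMap<>();
            configs.put(TopicConfig.FLUSH_MESSAGES_INTERVAL_CONFIG, "1000"); // fsync after every 1000 messages
            configs.put(TopicConfig.FLUSH_MS_CONFIG, "1000");                // or at least once per second
            configs.put(TopicConfig.SEGMENT_BYTES_CONFIG, "1073741824");     // roll a new log segment at 1 GiB

            // "durable-topic" is an illustrative name; 3 partitions, replication factor 2.
            NewTopic topic = new NewTopic("durable-topic", 3, (short) 2).configs(configs);
            adminClient.createTopics(Collections.singleton(topic)).all().get();
            System.out.println("Topic created with durability overrides.");
        }
    }
}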

Efficiency

Kafka uses batching and compression to raise throughput. Messages are written to disk in batches rather than one disk operation per message, which lowers the disk I/O load. In addition, Kafka uses zero-copy transfer (sendfile) when serving data to consumers, avoiding redundant copies between kernel space and user space.

import org.apache.kafka.clients.producer.*;
import java.util.Properties;
public class KafkaEfficiencyExample {
    public static void main(String[] args) {
        String topic = "my-topic";
        String bootstrapServers = "localhost:9092";
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        // Enable batching and compression
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); // max batch size in bytes
        properties.put(ProducerConfig.LINGER_MS_CONFIG, 1); // wait up to 1 ms for a batch to fill before sending
        properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip"); // compress batches with gzip

        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        for (int i = 0; i < 10; i++) {
            String key = "key" + i;
            String value = "value" + i;
            ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);

            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception == null) {
                        System.out.println("Message sent successfully: " + metadata.toString());
                    } else {
                        System.err.println("Error sending message: " + exception.getMessage());
                    }
                }
            });
        }
        // Flush outstanding records, then close the producer
        producer.flush();
        producer.close();
    }
}

Message Delivery Guarantees

Kafka offers several delivery guarantees: at-least-once (messages are not lost but may be duplicated), at-most-once (messages are not duplicated but may be lost), and exactly-once (messages are neither lost nor duplicated).

import org.apache.kafka.clients.producer.*;
import java.util.Properties;
public class KafkaGuaranteeExample {
    public static void main(String[] args) {
        String topic = "my-topic";
        String bootstrapServers = "localhost:9092";
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        // Configure delivery guarantees
        properties.put(ProducerConfig.ACKS_CONFIG, "all"); // wait for all in-sync replicas to acknowledge
        properties.put(ProducerConfig.RETRIES_CONFIG, 3); // retry transient send failures up to 3 times
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        for (int i = 0; i < 10; i++) {
            String key = "key" + i;
            String value = "value" + i;
            ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception == null) {
                        System.out.println("Message sent successfully: " + metadata.toString());
                    } else {
                        System.err.println("Error sending message: " + exception.getMessage());
                    }
                }
            });
        }
        // Flush outstanding records, then close the producer
        producer.flush();
        producer.close();
    }
}

The acks parameter is set to "all", so the producer treats a send as successful only after the leader and all in-sync replicas have written the message.
The retries parameter is set to 3, the maximum number of times the producer retransmits a message after a transient failure.
Together these settings give at-least-once delivery: nothing is lost, but a retry after a lost acknowledgment can write a duplicate. Exactly-once semantics (Exactly Once Semantics) additionally require the idempotent producer (enable.idempotence=true) and, for atomic writes spanning multiple records or partitions, transactions; see the sketch below.

Note that stronger delivery guarantees cost performance, so the mechanism should be chosen by weighing business requirements against throughput and latency.
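
As a concrete sketch of how exactly-once producer semantics are enabled (a minimal example, not production-grade error handling; the transactional.id value "my-transactional-id" is an illustrative assumption):

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.KafkaException;
import java.util.Properties;

public class KafkaExactlyOnceExample {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        // Idempotence lets the broker deduplicate retried sends; a transactional.id
        // additionally enables atomic multi-record writes.
        properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transactional-id"); // illustrative id

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
            producer.initTransactions();
            try {
                producer.beginTransaction();
                for (int i = 0; i < 10; i++) {
                    producer.send(new ProducerRecord<>("my-topic", "key" + i, "value" + i));
                }
                producer.commitTransaction(); // all 10 records become visible atomically
            } catch (KafkaException e) {
                producer.abortTransaction(); // none of the records become visible
                throw e;
            }
        }
    }
}

On the read side, consumers see only committed transactional data when configured with isolation.level=read_committed.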

Replication

Kafka uses replication to improve availability and fault tolerance. Each partition can have multiple replicas spread across different brokers; if one replica becomes unavailable, the data can still be served from the others.

import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.config.TopicConfig;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class KafkaReplicaSetExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        String topicName = "my-topic";
        int numPartitions = 3; // number of partitions
        short replicationFactor = 2; // replication factor

        Properties adminProps = new Properties();
        adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient adminClient = AdminClient.create(adminProps)) {
            NewTopic newTopic = new NewTopic(topicName, numPartitions, replicationFactor);
            // Require at least 2 in-sync replicas for acks=all writes; note that with a
            // replication factor of 2, such writes fail if either replica is offline.
            newTopic.configs(Collections.singletonMap(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "2"));

            CreateTopicsResult createTopicsResult = adminClient.createTopics(Collections.singleton(newTopic));
            createTopicsResult.all().get(); // wait for topic creation to complete
            System.out.println("Topic created successfully.");
        }
    }
}
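
To verify where the replicas actually landed, the topic can be described afterwards. A minimal sketch (reusing the same broker address; describeTopics reports each partition's leader, replica list, and in-sync replica set):

import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.TopicPartitionInfo;
import java.util.Collections;
import java.util.Properties;

public class KafkaReplicaInspectionExample {
    public static void main(String[] args) throws Exception {
        Properties adminProps = new Properties();
        adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient adminClient = AdminClient.create(adminProps)) {
            TopicDescription description = adminClient
                    .describeTopics(Collections.singleton("my-topic"))
                    .all().get()
                    .get("my-topic");
            for (TopicPartitionInfo partition : description.partitions()) {
                // The ISR (in-sync replicas) are the replicas eligible to become leader.
                System.out.println("Partition " + partition.partition()
                        + " leader=" + partition.leader().id()
                        + " replicas=" + partition.replicas()
                        + " isr=" + partition.isr());
            }
        }
    }
}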

Leader Election

Among each partition's replicas, one is designated the leader and handles that partition's read and write requests. If the leader becomes unavailable, Kafka elects a new leader from the remaining in-sync replicas. The toy simulation below illustrates the idea (it is not Kafka's actual controller logic):

import java.util.ArrayList;
import java.util.List;
import java.util.Random;

class KafkaBroker {
    private int id;
    private boolean isLeader;

    public KafkaBroker(int id) {
        this.id = id;
        this.isLeader = false;
    }

    public int getId() {
        return id;
    }

    public boolean isLeader() {
        return isLeader;
    }

    public void becomeLeader() {
        isLeader = true;
    }

    public void demoteLeader() {
        isLeader = false;
    }
}

class KafkaTopicPartition {
    private int partitionId;
    private List<KafkaBroker> replicas;

    public KafkaTopicPartition(int partitionId) {
        this.partitionId = partitionId;
        replicas = new ArrayList<>();
    }

    public int getPartitionId() {
        return partitionId;
    }

    public void addReplica(KafkaBroker broker) {
        replicas.add(broker);
    }

    public List<KafkaBroker> getReplicas() {
        return replicas;
    }
}

public class LeaderElectionSimulation {
    public static void main(String[] args) {
        int numBrokers = 5;
        int numPartitions = 3;

        List<KafkaBroker> brokers = new ArrayList<>();
        for (int i = 0; i < numBrokers; i++) {
            brokers.add(new KafkaBroker(i));
        }

        List<KafkaTopicPartition> partitions = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            KafkaTopicPartition partition = new KafkaTopicPartition(i);
            for (int j = 0; j < numBrokers; j++) {
                partition.addReplica(brokers.get(j));
            }
            partitions.add(partition);
        }
        // Simulate leader election
        Random random = new Random();
        for (KafkaTopicPartition partition : partitions) {
            List<KafkaBroker> replicas = partition.getReplicas();
            KafkaBroker newLeader = replicas.get(random.nextInt(replicas.size()));
            System.out.println("Partition " + partition.getPartitionId() + " leader changed to Broker " + newLeader.getId());
            replicas.forEach(broker -> broker.demoteLeader());
            newLeader.becomeLeader();
        }
    }
}
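
The simulation above only illustrates the concept; in a real cluster the controller performs elections, and clients never pick leaders themselves. For completeness, a hedged sketch of manually triggering a preferred-leader election through the AdminClient (the electLeaders API exists in Kafka 2.4+; the topic and partition here are assumptions):

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.ElectionType;
import org.apache.kafka.common.TopicPartition;
import java.util.Collections;
import java.util.Properties;
import java.util.Set;

public class PreferredLeaderElectionExample {
    public static void main(String[] args) throws Exception {
        Properties adminProps = new Properties();
        adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient adminClient = AdminClient.create(adminProps)) {
            // Ask the controller to move partition 0 of my-topic back to its preferred
            // leader (the first broker in the partition's replica list).
            Set<TopicPartition> partitions = Collections.singleton(new TopicPartition("my-topic", 0));
            adminClient.electLeaders(ElectionType.PREFERRED, partitions).all().get();
            System.out.println("Preferred leader election triggered.");
        }
    }
}

Whether an out-of-sync replica may ever be elected is governed by the unclean.leader.election.enable setting, which trades availability against potential data loss.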

Log Compression

To reduce storage cost and network transfer, Kafka supports compressing messages: batches written to the log can be compressed, shrinking both disk usage and network bandwidth consumption.

import org.apache.kafka.clients.producer.*;
import java.util.Properties;

public class KafkaCompressionExample {
    public static void main(String[] args) {
        String topic = "my-topic";
        String bootstrapServers = "localhost:9092";

        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        // Configure producer-side message compression
        properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");

        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        for (int i = 0; i < 10; i++) {
            String key = "key" + i;
            String value = "value" + i;
            ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);

            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception == null) {
                        System.out.println("Message sent successfully: " + metadata.toString());
                    } else {
                        System.err.println("Error sending message: " + exception.getMessage());
                    }
                }
            });
        }
        // Flush outstanding records, then close the producer
        producer.flush();
        producer.close();
    }
}
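
Compression can also be enforced at the topic level, independent of what each producer sends. A minimal sketch using incrementalAlterConfigs (available since Kafka 2.3) to set compression.type=gzip on the existing topic:

import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.TopicConfig;
import java.util.Collections;
import java.util.Properties;

public class KafkaTopicCompressionExample {
    public static void main(String[] args) throws Exception {
        Properties adminProps = new Properties();
        adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient adminClient = AdminClient.create(adminProps)) {
            ConfigResource topicResource = new ConfigResource(ConfigResource.Type.TOPIC, "my-topic");
            // compression.type=gzip makes the broker store this topic's log gzip-compressed.
            AlterConfigOp setCompression = new AlterConfigOp(
                    new ConfigEntry(TopicConfig.COMPRESSION_TYPE_CONFIG, "gzip"),
                    AlterConfigOp.OpType.SET);
            adminClient.incrementalAlterConfigs(
                    Collections.singletonMap(topicResource, Collections.singleton(setCompression)))
                    .all().get();
            System.out.println("Topic compression.type set to gzip.");
        }
    }
}

With the topic-level default of compression.type=producer, the broker instead keeps batches in whatever codec the producer used, avoiding recompression on the broker.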

A complete Kafka producer example:

import org.apache.kafka.clients.producer.*;

import java.util.Properties;

public class KafkaProducerExample {
    public static void main(String[] args) {
        String topic = "my-topic";
        String bootstrapServers = "localhost:9092";
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        for (int i = 0; i < 10; i++) {
            String key = "key" + i;
            String value = "value" + i;
            ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);

            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception == null) {
                        System.out.println("Message sent successfully: " + metadata.toString());
                    } else {
                        System.err.println("Error sending message: " + exception.getMessage());
                    }
                }
            });
        }
        producer.close();
    }
}

A complete Kafka consumer example:

import org.apache.kafka.clients.consumer.*;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class KafkaConsumerExample {
    public static void main(String[] args) {
        String topic = "my-topic";
        String bootstrapServers = "localhost:9092";
        String groupId = "my-group";

        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Collections.singleton(topic));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("Received message: key = " + record.key() + ", value = " + record.value());
            }
        }
    }
}
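
Tying back to the delivery-guarantee section: with the default auto-commit, the loop above can re-read or skip a few messages around a crash. A minimal sketch of explicit at-least-once consumption, where offsets are committed only after a batch has been fully processed:

import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class KafkaAtLeastOnceConsumerExample {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // commit manually

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties)) {
            consumer.subscribe(Collections.singleton("my-topic"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("Processing: key = " + record.key() + ", value = " + record.value());
                }
                // Commit only after the batch is fully processed: a crash before this line
                // means the batch is re-delivered (at-least-once), never silently skipped.
                consumer.commitSync();
            }
        }
    }
}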