{"id":790,"date":"2023-08-21T09:48:05","date_gmt":"2023-08-21T01:48:05","guid":{"rendered":"https:\/\/zhaocunwei.co.uk\/?p=790"},"modified":"2023-08-21T09:57:38","modified_gmt":"2023-08-21T01:57:38","slug":"k5","status":"publish","type":"post","link":"https:\/\/zhaocunwei.co.uk\/index.php\/2023\/08\/21\/k5\/","title":{"rendered":"kafka\u8bbe\u8ba1\u539f\u7406\uff1a\u6301\u4e45\u6027\u3001\u9ad8\u6548\u7387\u3001\u6d88\u606f\u4f20\u9012\u4fdd\u969c\u3001\u526f\u672c\u96c6\u3001leader\u9009\u4e3e\u3001\u65e5\u5fd7\u538b\u7f29"},"content":{"rendered":"<h1>\u524d\u8a00<\/h1>\n<p>Apache Kafka \u662f\u4e00\u4e2a\u5206\u5e03\u5f0f\u6d41\u6570\u636e\u5e73\u53f0\uff0c\u5177\u6709\u9ad8\u53ef\u9760\u6027\u3001\u6301\u4e45\u6027\u3001\u9ad8\u6548\u6027\u548c\u53ef\u6269\u5c55\u6027\uff0c\u7528\u4e8e\u5904\u7406\u5b9e\u65f6\u6570\u636e\u6d41\u3002\u5b83\u7684\u8bbe\u8ba1\u539f\u7406\u5305\u62ec\u4ee5\u4e0b\u51e0\u4e2a\u5173\u952e\u65b9\u9762\uff1a\u6301\u4e45\u6027\u3001\u9ad8\u6548\u7387\u3001\u6d88\u606f\u4f20\u9012\u4fdd\u969c\u3001\u526f\u672c\u96c6\u3001\u9886\u5bfc\u8005\u9009\u4e3e\u3001\u65e5\u5fd7\u538b\u7f29\u7b49\u3002<\/p>\n<h1>\u6301\u4e45\u6027<\/h1>\n<p>Kafka \u4f7f\u7528\u6301\u4e45\u5316\u65e5\u5fd7\u6765\u5b58\u50a8\u6d88\u606f\uff0c\u4fdd\u8bc1\u6d88\u606f\u5373\u4f7f\u5728\u53d1\u9001\u540e\u4e0d\u4f1a\u4e22\u5931\u3002\u6bcf\u4e2a\u6d88\u606f\u90fd\u4f1a\u88ab\u8ffd\u52a0\u5230\u4e00\u4e2a\u65e5\u5fd7\u6587\u4ef6\uff08Log Segment\uff09\u4e2d\uff0c\u5e76\u901a\u8fc7\u7d22\u5f15\u8fdb\u884c\u7d22\u5f15\u3002Kafka \u5728\u5199\u5165\u6d88\u606f\u65f6\u4f1a\u5148\u5199\u5165\u5185\u5b58\uff0c\u7136\u540e\u518d\u5f02\u6b65\u5c06\u6570\u636e\u5237\u65b0\u5230\u78c1\u76d8\u4e0a\u7684\u65e5\u5fd7\u6587\u4ef6\u4e2d\u3002<\/p>\n<pre><code class=\"language-java\">import org.apache.kafka.clients.producer.*;\nimport java.util.Properties;\npublic class KafkaPersistenceExample {\n    public static void main(String[] args) {\n        String topic = &quot;my-topic&quot;;\n        String bootstrapServers = &quot;localhost:9092&quot;;\n        Properties properties = new Properties();\n        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);\n        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, &quot;org.apache.kafka.common.serialization.StringSerializer&quot;);\n        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, &quot;org.apache.kafka.common.serialization.StringSerializer&quot;);\n        KafkaProducer&lt;String, String&gt; producer = new KafkaProducer&lt;&gt;(properties);\n        for (int i = 0; i &lt; 10; i++) {\n            String key = &quot;key&quot; + i;\n            String value = &quot;value&quot; + i;\n            ProducerRecord&lt;String, String&gt; record = new ProducerRecord&lt;&gt;(topic, key, value);\n            producer.send(record, new Callback() {\n                @Override\n                public void onCompletion(RecordMetadata metadata, Exception exception) {\n                    if (exception == null) {\n                        System.out.println(&quot;Message sent successfully: &quot; + metadata.toString());\n                    } else {\n                        System.err.println(&quot;Error sending message: &quot; + exception.getMessage());\n                    }\n                }\n            });\n        }\n        \/\/ \u5f02\u6b65\u5237\u65b0\u5e76\u5173\u95ed\u751f\u4ea7\u8005\n        producer.flush();\n        producer.close();\n    }\n}<\/code><\/pre>\n<h1>\u9ad8\u6548\u7387<\/h1>\n<p>Kafka \u4f7f\u7528\u6279\u91cf\u5904\u7406\u548c\u538b\u7f29\u6765\u63d0\u9ad8\u541e\u5410\u91cf\u548c\u6548\u7387\u3002\u6d88\u606f\u88ab\u5206\u6279\u5199\u5165\u78c1\u76d8\uff0c\u800c\u4e0d\u662f\u6bcf\u6761\u6d88\u606f\u90fd\u8fdb\u884c\u78c1\u76d8\u64cd\u4f5c\uff0c\u4ece\u800c\u964d\u4f4e\u4e86\u78c1\u76d8 I\/O \u7684\u8d1f\u62c5\u3002\u6b64\u5916\uff0cKafka \u91c7\u7528\u4e86\u96f6\u62f7\u8d1d\u6280\u672f\uff0c\u51cf\u5c11\u4e86\u6570\u636e\u5728\u5185\u5b58\u548c\u78c1\u76d8\u4e4b\u95f4\u7684\u590d\u5236\u3002<\/p>\n<pre><code class=\"language-java\">import org.apache.kafka.clients.producer.*;\nimport java.util.Properties;\npublic class KafkaEfficiencyExample {\n    public static void main(String[] args) {\n        String topic = &quot;my-topic&quot;;\n        String bootstrapServers = &quot;localhost:9092&quot;;\n        Properties properties = new Properties();\n        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);\n        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, &quot;org.apache.kafka.common.serialization.StringSerializer&quot;);\n        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, &quot;org.apache.kafka.common.serialization.StringSerializer&quot;);\n        \/\/ \u542f\u7528\u6279\u91cf\u5904\u7406\n        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); \/\/ \u6279\u91cf\u5927\u5c0f\n        properties.put(ProducerConfig.LINGER_MS_CONFIG, 1); \/\/ \u7b49\u5f85\u65f6\u95f4\uff0c\u8fbe\u5230\u6279\u91cf\u5927\u5c0f\u6216\u7b49\u5f85\u65f6\u95f4\u540e\u53d1\u9001\n        properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, &quot;gzip&quot;); \/\/ \u542f\u7528\u538b\u7f29\n\n        KafkaProducer&lt;String, String&gt; producer = new KafkaProducer&lt;&gt;(properties);\n\n        for (int i = 0; i &lt; 10; i++) {\n            String key = &quot;key&quot; + i;\n            String value = &quot;value&quot; + i;\n            ProducerRecord&lt;String, String&gt; record = new ProducerRecord&lt;&gt;(topic, key, value);\n\n            producer.send(record, new Callback() {\n                @Override\n                public void onCompletion(RecordMetadata metadata, Exception exception) {\n                    if (exception == null) {\n                        System.out.println(&quot;Message sent successfully: &quot; + metadata.toString());\n                    } else {\n                        System.err.println(&quot;Error sending message: &quot; + exception.getMessage());\n                    }\n                }\n            });\n        }\n        \/\/ \u5f02\u6b65\u5237\u65b0\u5e76\u5173\u95ed\u751f\u4ea7\u8005\n        producer.flush();\n        producer.close();\n    }\n}<\/code><\/pre>\n<h1>\u6d88\u606f\u4f20\u9012\u4fdd\u969c<\/h1>\n<p>Kafka \u63d0\u4f9b\u4e86\u591a\u79cd\u6d88\u606f\u4f20\u9012\u4fdd\u969c\u673a\u5236\uff0c\u5305\u62ec\u81f3\u5c11\u4e00\u6b21\u4f20\u9012\uff08\u6d88\u606f\u4e0d\u4f1a\u4e22\u5931\uff0c\u4f46\u53ef\u80fd\u91cd\u590d\uff09\u3001\u6700\u591a\u4e00\u6b21\u4f20\u9012\uff08\u6d88\u606f\u4e0d\u4f1a\u91cd\u590d\uff0c\u4f46\u53ef\u80fd\u4e22\u5931\uff09\u548c\u7cbe\u786e\u4e00\u6b21\u4f20\u9012\uff08\u786e\u4fdd\u4e0d\u4e22\u5931\u4e5f\u4e0d\u91cd\u590d\uff09\u3002<\/p>\n<pre><code class=\"language-java\">import org.apache.kafka.clients.producer.*;\nimport java.util.Properties;\npublic class KafkaGuaranteeExample {\n    public static void main(String[] args) {\n        String topic = &quot;my-topic&quot;;\n        String bootstrapServers = &quot;localhost:9092&quot;;\n        Properties properties = new Properties();\n        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);\n        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, &quot;org.apache.kafka.common.serialization.StringSerializer&quot;);\n        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, &quot;org.apache.kafka.common.serialization.StringSerializer&quot;);\n        \/\/ \u914d\u7f6e\u6d88\u606f\u4f20\u9012\u4fdd\u969c\n        properties.put(ProducerConfig.ACKS_CONFIG, &quot;all&quot;); \/\/ \u7b49\u5f85\u6240\u6709\u526f\u672c\u786e\u8ba4\n        properties.put(ProducerConfig.RETRIES_CONFIG, 3); \/\/ \u5931\u8d25\u65f6\u7684\u91cd\u8bd5\u6b21\u6570\n        KafkaProducer&lt;String, String&gt; producer = new KafkaProducer&lt;&gt;(properties);\n        for (int i = 0; i &lt; 10; i++) {\n            String key = &quot;key&quot; + i;\n            String value = &quot;value&quot; + i;\n            ProducerRecord&lt;String, String&gt; record = new ProducerRecord&lt;&gt;(topic, key, value);\n            producer.send(record, new Callback() {\n                @Override\n                public void onCompletion(RecordMetadata metadata, Exception exception) {\n                    if (exception == null) {\n                        System.out.println(&quot;Message sent successfully: &quot; + metadata.toString());\n                    } else {\n                        System.err.println(&quot;Error sending message: &quot; + exception.getMessage());\n                    }\n                }\n            });\n        }\n        \/\/ \u5f02\u6b65\u5237\u65b0\u5e76\u5173\u95ed\u751f\u4ea7\u8005\n        producer.flush();\n        producer.close();\n    }\n}<\/code><\/pre>\n<p>acks \u53c2\u6570\u88ab\u8bbe\u7f6e\u4e3a &quot;all&quot;\uff0c\u8868\u793a\u751f\u4ea7\u8005\u4f1a\u7b49\u5f85\u6240\u6709\u526f\u672c\u90fd\u6210\u529f\u5199\u5165\u6d88\u606f\u540e\u624d\u4f1a\u53d1\u9001\u786e\u8ba4\u3002<br \/>\nretries \u53c2\u6570\u88ab\u8bbe\u7f6e\u4e3a 3\uff0c\u8868\u793a\u5728\u53d1\u9001\u5931\u8d25\u65f6\uff0c\u751f\u4ea7\u8005\u4f1a\u5c1d\u8bd5\u91cd\u65b0\u53d1\u9001\u6d88\u606f\u7684\u6700\u5927\u6b21\u6570\u3002<br \/>\n\u8fd9\u79cd\u914d\u7f6e\u53ef\u4ee5\u5b9e\u73b0\u201c\u7cbe\u786e\u4e00\u6b21\u4f20\u9012\u201d\uff08Exactly Once Semantics\uff09\uff0c\u5373\u786e\u4fdd\u6d88\u606f\u4e0d\u4f1a\u4e22\u5931\uff0c\u4e5f\u4e0d\u4f1a\u91cd\u590d\u3002Kafka \u4f1a\u5728\u6d88\u606f\u53d1\u9001\u5931\u8d25\u65f6\u8fdb\u884c\u91cd\u8bd5\uff0c\u76f4\u5230\u6240\u6709\u7684\u526f\u672c\u90fd\u6210\u529f\u5199\u5165\u4e3a\u6b62\u3002<\/p>\n<p>\u9700\u8981\u6ce8\u610f\u7684\u662f\uff0c\u4e0d\u540c\u7684\u6d88\u606f\u4f20\u9012\u4fdd\u969c\u673a\u5236\u53ef\u80fd\u4f1a\u5f71\u54cd\u6027\u80fd\uff0c\u56e0\u6b64\u5728\u9009\u62e9\u673a\u5236\u65f6\u9700\u8981\u6839\u636e\u4e1a\u52a1\u9700\u6c42\u548c\u6027\u80fd\u8981\u6c42\u8fdb\u884c\u6743\u8861\u3002<\/p>\n<h1>\u526f\u672c\u96c6<\/h1>\n<p>Kafka \u4f7f\u7528\u526f\u672c\u96c6\u6765\u63d0\u9ad8\u53ef\u7528\u6027\u548c\u5bb9\u9519\u6027\u3002\u6bcf\u4e2a\u5206\u533a\u90fd\u53ef\u4ee5\u6709\u591a\u4e2a\u526f\u672c\uff0c\u5206\u5e03\u5728\u4e0d\u540c\u7684\u670d\u52a1\u5668\u4e0a\u3002\u5982\u679c\u4e00\u4e2a\u526f\u672c\u4e0d\u53ef\u7528\uff0c\u4ecd\u7136\u53ef\u4ee5\u4ece\u5176\u4ed6\u526f\u672c\u4e2d\u8bfb\u53d6\u6570\u636e\u3002<\/p>\n<pre><code class=\"language-java\">import org.apache.kafka.clients.admin.*;\nimport org.apache.kafka.common.config.TopicConfig;\nimport java.util.Collections;\nimport java.util.Properties;\nimport java.util.concurrent.ExecutionException;\n\npublic class KafkaReplicaSetExample {\n    public static void main(String[] args) throws ExecutionException, InterruptedException {\n        String topicName = &quot;my-topic&quot;;\n        int numPartitions = 3; \/\/ \u5206\u533a\u6570\n        short replicationFactor = 2; \/\/ \u526f\u672c\u6570\n\n        Properties adminProps = new Properties();\n        adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, &quot;localhost:9092&quot;);\n\n        try (AdminClient adminClient = AdminClient.create(adminProps)) {\n            NewTopic newTopic = new NewTopic(topicName, numPartitions, replicationFactor);\n            \/\/ \u914d\u7f6e\u526f\u672c\u96c6\u53c2\u6570\n            ConfigEntry minInSyncReplicasConfig = new ConfigEntry(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, &quot;2&quot;);\n            newTopic.configs(Collections.singletonMap(minInSyncReplicasConfig.name(), minInSyncReplicasConfig));\n\n            CreateTopicsResult createTopicsResult = adminClient.createTopics(Collections.singleton(newTopic));\n            createTopicsResult.all().get(); \/\/ \u7b49\u5f85\u521b\u5efa\u5b8c\u6210\n            System.out.println(&quot;Topic created successfully.&quot;);\n        }\n    }\n}<\/code><\/pre>\n<h1>\u9886\u5bfc\u8005\u9009\u4e3e<\/h1>\n<p>\u6bcf\u4e2a\u5206\u533a\u7684\u526f\u672c\u4e2d\u6709\u4e00\u4e2a\u88ab\u79f0\u4e3a\u9886\u5bfc\u8005\uff08leader\uff09\u7684\u526f\u672c\uff0c\u8d1f\u8d23\u5904\u7406\u8bfb\u5199\u8bf7\u6c42\u3002\u5982\u679c\u9886\u5bfc\u8005\u4e0d\u53ef\u7528\uff0cKafka \u4f1a\u8fdb\u884c\u9886\u5bfc\u8005\u9009\u4e3e\uff0c\u9009\u62e9\u4e00\u4e2a\u65b0\u7684\u9886\u5bfc\u8005\u3002<\/p>\n<pre><code class=\"language-java\">import java.util.ArrayList;\nimport java.util.List;\nimport java.util.Random;\n\nclass KafkaBroker {\n    private int id;\n    private boolean isLeader;\n\n    public KafkaBroker(int id) {\n        this.id = id;\n        this.isLeader = false;\n    }\n\n    public int getId() {\n        return id;\n    }\n\n    public boolean isLeader() {\n        return isLeader;\n    }\n\n    public void becomeLeader() {\n        isLeader = true;\n    }\n\n    public void demoteLeader() {\n        isLeader = false;\n    }\n}\n\nclass KafkaTopicPartition {\n    private int partitionId;\n    private List&lt;KafkaBroker&gt; replicas;\n\n    public KafkaTopicPartition(int partitionId) {\n        this.partitionId = partitionId;\n        replicas = new ArrayList&lt;&gt;();\n    }\n\n    public int getPartitionId() {\n        return partitionId;\n    }\n\n    public void addReplica(KafkaBroker broker) {\n        replicas.add(broker);\n    }\n\n    public List&lt;KafkaBroker&gt; getReplicas() {\n        return replicas;\n    }\n}\n\npublic class LeaderElectionSimulation {\n    public static void main(String[] args) {\n        int numBrokers = 5;\n        int numPartitions = 3;\n\n        List&lt;KafkaBroker&gt; brokers = new ArrayList&lt;&gt;();\n        for (int i = 0; i &lt; numBrokers; i++) {\n            brokers.add(new KafkaBroker(i));\n        }\n\n        List&lt;KafkaTopicPartition&gt; partitions = new ArrayList&lt;&gt;();\n        for (int i = 0; i &lt; numPartitions; i++) {\n            KafkaTopicPartition partition = new KafkaTopicPartition(i);\n            for (int j = 0; j &lt; numBrokers; j++) {\n                partition.addReplica(brokers.get(j));\n            }\n            partitions.add(partition);\n        }\n        \/\/ Simulate leader election\n        Random random = new Random();\n        for (KafkaTopicPartition partition : partitions) {\n            List&lt;KafkaBroker&gt; replicas = partition.getReplicas();\n            KafkaBroker newLeader = replicas.get(random.nextInt(replicas.size()));\n            System.out.println(&quot;Partition &quot; + partition.getPartitionId() + &quot; leader changed to Broker &quot; + newLeader.getId());\n            replicas.forEach(broker -&gt; broker.demoteLeader());\n            newLeader.becomeLeader();\n        }\n    }\n}<\/code><\/pre>\n<h1>\u65e5\u5fd7\u538b\u7f29<\/h1>\n<p>\u4e3a\u4e86\u51cf\u5c11\u5b58\u50a8\u6210\u672c\u548c\u7f51\u7edc\u4f20\u8f93\uff0cKafka \u652f\u6301\u65e5\u5fd7\u538b\u7f29\u3002\u53ef\u4ee5\u5bf9\u65e5\u5fd7\u6bb5\u4e2d\u7684\u6d88\u606f\u8fdb\u884c\u538b\u7f29\uff0c\u4ee5\u51cf\u5c0f\u78c1\u76d8\u5360\u7528\u548c\u7f51\u7edc\u5e26\u5bbd\u6d88\u8017\u3002<\/p>\n<pre><code class=\"language-java\">import org.apache.kafka.clients.producer.*;\nimport java.util.Properties;\n\npublic class KafkaCompressionExample {\n    public static void main(String[] args) {\n        String topic = \"my-topic\";\n        String bootstrapServers = \"localhost:9092\";\n\n        Properties properties = new Properties();\n        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);\n        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, \"org.apache.kafka.common.serialization.StringSerializer\");\n        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, \"org.apache.kafka.common.serialization.StringSerializer\");\n        \/\/ \u914d\u7f6e\u6d88\u606f\u538b\u7f29\n        properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, \"gzip\");\n\n        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);\n\n        for (int i = 0; i < 10; i++) {\n            String key = \"key\" + i;\n            String value = \"value\" + i;\n            ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);\n\n            producer.send(record, new Callback() {\n                @Override\n                public void onCompletion(RecordMetadata metadata, Exception exception) {\n                    if (exception == null) {\n                        System.out.println(\"Message sent successfully: \" + metadata.toString());\n                    } else {\n                        System.err.println(\"Error sending message: \" + exception.getMessage());\n                    }\n                }\n            });\n        }\n        \/\/ \u5f02\u6b65\u5237\u65b0\u5e76\u5173\u95ed\u751f\u4ea7\u8005\n        producer.flush();\n        producer.close();\n    }\n}<\/code><\/pre>\n<h1>Kafka \u751f\u4ea7\u8005\u793a\u4f8b\uff1a<\/h1>\n<pre><code class=\"language-java\">import org.apache.kafka.clients.producer.*;\n\nimport java.util.Properties;\n\npublic class KafkaProducerExample {\n    public static void main(String[] args) {\n        String topic = \"my-topic\";\n        String bootstrapServers = \"localhost:9092\";\n        Properties properties = new Properties();\n        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);\n        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, \"org.apache.kafka.common.serialization.StringSerializer\");\n        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, \"org.apache.kafka.common.serialization.StringSerializer\");\n\n        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);\n\n        for (int i = 0; i < 10; i++) {\n            String key = \"key\" + i;\n            String value = \"value\" + i;\n            ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);\n\n            producer.send(record, new Callback() {\n                @Override\n                public void onCompletion(RecordMetadata metadata, Exception exception) {\n                    if (exception == null) {\n                        System.out.println(\"Message sent successfully: \" + metadata.toString());\n                    } else {\n                        System.err.println(\"Error sending message: \" + exception.getMessage());\n                    }\n                }\n            });\n        }\n        producer.close();\n    }\n}<\/code><\/pre>\n<h1>Kafka \u6d88\u8d39\u8005\u793a\u4f8b\uff1a<\/h1>\n<pre><code class=\"language-java\">import org.apache.kafka.clients.consumer.*;\n\nimport java.util.Collections;\nimport java.util.Properties;\n\npublic class KafkaConsumerExample {\n    public static void main(String[] args) {\n        String topic = \"my-topic\";\n        String bootstrapServers = \"localhost:9092\";\n        String groupId = \"my-group\";\n\n        Properties properties = new Properties();\n        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);\n        properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);\n        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, \"org.apache.kafka.common.serialization.StringDeserializer\");\n        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, \"org.apache.kafka.common.serialization.StringDeserializer\");\n\n        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);\n        consumer.subscribe(Collections.singleton(topic));\n\n        while (true) {\n            ConsumerRecords<String, String> records = consumer.poll(100);\n            for (ConsumerRecord<String, String> record : records) {\n                System.out.println(\"Received message: key = \" + record.key() + \", value = \" + record.value());\n            }\n        }\n    }\n}\n<\/code><\/pre>\n","protected":false},"excerpt":{"rendered":"<p>\u524d\u8a00 Apache Kafka \u662f\u4e00\u4e2a\u5206\u5e03\u5f0f\u6d41\u6570\u636e\u5e73\u53f0\uff0c\u5177\u6709\u9ad8\u53ef\u9760\u6027\u3001\u6301\u4e45\u6027\u3001\u9ad8\u6548\u6027\u548c\u53ef\u6269\u5c55\u6027\uff0c\u7528\u4e8e\u5904\u7406\u5b9e\u65f6\u6570 [&hellip;]<\/p>\n","protected":false},"author":1,"featured_media":0,"comment_status":"open","ping_status":"open","sticky":false,"template":"","format":"standard","meta":{"om_disable_all_campaigns":false,"_mi_skip_tracking":false},"categories":[1],"tags":[],"aioseo_notices":[],"yoast_head":"<!-- This site is optimized with the Yoast SEO plugin v20.5 - https:\/\/yoast.com\/wordpress\/plugins\/seo\/ -->\n<title>kafka\u8bbe\u8ba1\u539f\u7406\uff1a\u6301\u4e45\u6027\u3001\u9ad8\u6548\u7387\u3001\u6d88\u606f\u4f20\u9012\u4fdd\u969c\u3001\u526f\u672c\u96c6\u3001leader\u9009\u4e3e\u3001\u65e5\u5fd7\u538b\u7f29 - \u672c\u7f51\u7ad9\u5206\u4eab\u7f16\u7a0b\u8fc7\u7a0b\u4e2d\u51fa\u73b0\u7684bug<\/title>\n<meta name=\"robots\" content=\"index, follow, max-snippet:-1, max-image-preview:large, max-video-preview:-1\" \/>\n<link rel=\"canonical\" href=\"https:\/\/zhaocunwei.co.uk\/index.php\/2023\/08\/21\/k5\/\" \/>\n<meta property=\"og:locale\" content=\"zh_CN\" \/>\n<meta property=\"og:type\" content=\"article\" \/>\n<meta property=\"og:title\" content=\"kafka\u8bbe\u8ba1\u539f\u7406\uff1a\u6301\u4e45\u6027\u3001\u9ad8\u6548\u7387\u3001\u6d88\u606f\u4f20\u9012\u4fdd\u969c\u3001\u526f\u672c\u96c6\u3001leader\u9009\u4e3e\u3001\u65e5\u5fd7\u538b\u7f29 - \u672c\u7f51\u7ad9\u5206\u4eab\u7f16\u7a0b\u8fc7\u7a0b\u4e2d\u51fa\u73b0\u7684bug\" \/>\n<meta property=\"og:description\" content=\"\u524d\u8a00 Apache Kafka \u662f\u4e00\u4e2a\u5206\u5e03\u5f0f\u6d41\u6570\u636e\u5e73\u53f0\uff0c\u5177\u6709\u9ad8\u53ef\u9760\u6027\u3001\u6301\u4e45\u6027\u3001\u9ad8\u6548\u6027\u548c\u53ef\u6269\u5c55\u6027\uff0c\u7528\u4e8e\u5904\u7406\u5b9e\u65f6\u6570 [&hellip;]\" \/>\n<meta property=\"og:url\" content=\"https:\/\/zhaocunwei.co.uk\/index.php\/2023\/08\/21\/k5\/\" \/>\n<meta property=\"og:site_name\" content=\"\u672c\u7f51\u7ad9\u5206\u4eab\u7f16\u7a0b\u8fc7\u7a0b\u4e2d\u51fa\u73b0\u7684bug\" \/>\n<meta property=\"article:published_time\" content=\"2023-08-21T01:48:05+00:00\" \/>\n<meta property=\"article:modified_time\" content=\"2023-08-21T01:57:38+00:00\" \/>\n<meta name=\"author\" content=\"\u603b\u662f\u5e78\u798f\u7684\u8001\u8c4c\u8c46\" \/>\n<meta name=\"twitter:card\" content=\"summary_large_image\" \/>\n<meta name=\"twitter:label1\" content=\"\u4f5c\u8005\" \/>\n\t<meta name=\"twitter:data1\" content=\"\u603b\u662f\u5e78\u798f\u7684\u8001\u8c4c\u8c46\" \/>\n\t<meta name=\"twitter:label2\" content=\"\u9884\u8ba1\u9605\u8bfb\u65f6\u95f4\" \/>\n\t<meta name=\"twitter:data2\" content=\"5\u5206\" \/>\n<script type=\"application\/ld+json\" class=\"yoast-schema-graph\">{\"@context\":\"https:\/\/schema.org\",\"@graph\":[{\"@type\":\"WebPage\",\"@id\":\"https:\/\/zhaocunwei.co.uk\/index.php\/2023\/08\/21\/k5\/\",\"url\":\"https:\/\/zhaocunwei.co.uk\/index.php\/2023\/08\/21\/k5\/\",\"name\":\"kafka\u8bbe\u8ba1\u539f\u7406\uff1a\u6301\u4e45\u6027\u3001\u9ad8\u6548\u7387\u3001\u6d88\u606f\u4f20\u9012\u4fdd\u969c\u3001\u526f\u672c\u96c6\u3001leader\u9009\u4e3e\u3001\u65e5\u5fd7\u538b\u7f29 - \u672c\u7f51\u7ad9\u5206\u4eab\u7f16\u7a0b\u8fc7\u7a0b\u4e2d\u51fa\u73b0\u7684bug\",\"isPartOf\":{\"@id\":\"https:\/\/zhaocunwei.co.uk\/#website\"},\"datePublished\":\"2023-08-21T01:48:05+00:00\",\"dateModified\":\"2023-08-21T01:57:38+00:00\",\"author\":{\"@id\":\"https:\/\/zhaocunwei.co.uk\/#\/schema\/person\/dfb1dc0fc4a330c41908d477cd99c0b4\"},\"breadcrumb\":{\"@id\":\"https:\/\/zhaocunwei.co.uk\/index.php\/2023\/08\/21\/k5\/#breadcrumb\"},\"inLanguage\":\"zh-CN\",\"potentialAction\":[{\"@type\":\"ReadAction\",\"target\":[\"https:\/\/zhaocunwei.co.uk\/index.php\/2023\/08\/21\/k5\/\"]}]},{\"@type\":\"BreadcrumbList\",\"@id\":\"https:\/\/zhaocunwei.co.uk\/index.php\/2023\/08\/21\/k5\/#breadcrumb\",\"itemListElement\":[{\"@type\":\"ListItem\",\"position\":1,\"name\":\"\u9996\u9875\",\"item\":\"https:\/\/zhaocunwei.co.uk\/\"},{\"@type\":\"ListItem\",\"position\":2,\"name\":\"kafka\u8bbe\u8ba1\u539f\u7406\uff1a\u6301\u4e45\u6027\u3001\u9ad8\u6548\u7387\u3001\u6d88\u606f\u4f20\u9012\u4fdd\u969c\u3001\u526f\u672c\u96c6\u3001leader\u9009\u4e3e\u3001\u65e5\u5fd7\u538b\u7f29\"}]},{\"@type\":\"WebSite\",\"@id\":\"https:\/\/zhaocunwei.co.uk\/#website\",\"url\":\"https:\/\/zhaocunwei.co.uk\/\",\"name\":\"\u672c\u7f51\u7ad9\u5206\u4eab\u7f16\u7a0b\u8fc7\u7a0b\u4e2d\u51fa\u73b0\u7684bug\",\"description\":\"This site shares programming bugs\",\"potentialAction\":[{\"@type\":\"SearchAction\",\"target\":{\"@type\":\"EntryPoint\",\"urlTemplate\":\"https:\/\/zhaocunwei.co.uk\/?s={search_term_string}\"},\"query-input\":\"required name=search_term_string\"}],\"inLanguage\":\"zh-CN\"},{\"@type\":\"Person\",\"@id\":\"https:\/\/zhaocunwei.co.uk\/#\/schema\/person\/dfb1dc0fc4a330c41908d477cd99c0b4\",\"name\":\"\u603b\u662f\u5e78\u798f\u7684\u8001\u8c4c\u8c46\",\"image\":{\"@type\":\"ImageObject\",\"inLanguage\":\"zh-CN\",\"@id\":\"https:\/\/zhaocunwei.co.uk\/#\/schema\/person\/image\/\",\"url\":\"https:\/\/secure.gravatar.com\/avatar\/4226cc1ca6640507df1d2d4ba3da7a62?s=96&d=mm&r=g\",\"contentUrl\":\"https:\/\/secure.gravatar.com\/avatar\/4226cc1ca6640507df1d2d4ba3da7a62?s=96&d=mm&r=g\",\"caption\":\"\u603b\u662f\u5e78\u798f\u7684\u8001\u8c4c\u8c46\"},\"sameAs\":[\"http:\/\/zhaocunwei.co.uk\"],\"url\":\"https:\/\/zhaocunwei.co.uk\/index.php\/author\/18500103508163-com\/\"}]}<\/script>\n<!-- \/ Yoast SEO plugin. -->","yoast_head_json":{"title":"kafka\u8bbe\u8ba1\u539f\u7406\uff1a\u6301\u4e45\u6027\u3001\u9ad8\u6548\u7387\u3001\u6d88\u606f\u4f20\u9012\u4fdd\u969c\u3001\u526f\u672c\u96c6\u3001leader\u9009\u4e3e\u3001\u65e5\u5fd7\u538b\u7f29 - \u672c\u7f51\u7ad9\u5206\u4eab\u7f16\u7a0b\u8fc7\u7a0b\u4e2d\u51fa\u73b0\u7684bug","robots":{"index":"index","follow":"follow","max-snippet":"max-snippet:-1","max-image-preview":"max-image-preview:large","max-video-preview":"max-video-preview:-1"},"canonical":"https:\/\/zhaocunwei.co.uk\/index.php\/2023\/08\/21\/k5\/","og_locale":"zh_CN","og_type":"article","og_title":"kafka\u8bbe\u8ba1\u539f\u7406\uff1a\u6301\u4e45\u6027\u3001\u9ad8\u6548\u7387\u3001\u6d88\u606f\u4f20\u9012\u4fdd\u969c\u3001\u526f\u672c\u96c6\u3001leader\u9009\u4e3e\u3001\u65e5\u5fd7\u538b\u7f29 - \u672c\u7f51\u7ad9\u5206\u4eab\u7f16\u7a0b\u8fc7\u7a0b\u4e2d\u51fa\u73b0\u7684bug","og_description":"\u524d\u8a00 Apache Kafka \u662f\u4e00\u4e2a\u5206\u5e03\u5f0f\u6d41\u6570\u636e\u5e73\u53f0\uff0c\u5177\u6709\u9ad8\u53ef\u9760\u6027\u3001\u6301\u4e45\u6027\u3001\u9ad8\u6548\u6027\u548c\u53ef\u6269\u5c55\u6027\uff0c\u7528\u4e8e\u5904\u7406\u5b9e\u65f6\u6570 [&hellip;]","og_url":"https:\/\/zhaocunwei.co.uk\/index.php\/2023\/08\/21\/k5\/","og_site_name":"\u672c\u7f51\u7ad9\u5206\u4eab\u7f16\u7a0b\u8fc7\u7a0b\u4e2d\u51fa\u73b0\u7684bug","article_published_time":"2023-08-21T01:48:05+00:00","article_modified_time":"2023-08-21T01:57:38+00:00","author":"\u603b\u662f\u5e78\u798f\u7684\u8001\u8c4c\u8c46","twitter_card":"summary_large_image","twitter_misc":{"\u4f5c\u8005":"\u603b\u662f\u5e78\u798f\u7684\u8001\u8c4c\u8c46","\u9884\u8ba1\u9605\u8bfb\u65f6\u95f4":"5\u5206"},"schema":{"@context":"https:\/\/schema.org","@graph":[{"@type":"WebPage","@id":"https:\/\/zhaocunwei.co.uk\/index.php\/2023\/08\/21\/k5\/","url":"https:\/\/zhaocunwei.co.uk\/index.php\/2023\/08\/21\/k5\/","name":"kafka\u8bbe\u8ba1\u539f\u7406\uff1a\u6301\u4e45\u6027\u3001\u9ad8\u6548\u7387\u3001\u6d88\u606f\u4f20\u9012\u4fdd\u969c\u3001\u526f\u672c\u96c6\u3001leader\u9009\u4e3e\u3001\u65e5\u5fd7\u538b\u7f29 - \u672c\u7f51\u7ad9\u5206\u4eab\u7f16\u7a0b\u8fc7\u7a0b\u4e2d\u51fa\u73b0\u7684bug","isPartOf":{"@id":"https:\/\/zhaocunwei.co.uk\/#website"},"datePublished":"2023-08-21T01:48:05+00:00","dateModified":"2023-08-21T01:57:38+00:00","author":{"@id":"https:\/\/zhaocunwei.co.uk\/#\/schema\/person\/dfb1dc0fc4a330c41908d477cd99c0b4"},"breadcrumb":{"@id":"https:\/\/zhaocunwei.co.uk\/index.php\/2023\/08\/21\/k5\/#breadcrumb"},"inLanguage":"zh-CN","potentialAction":[{"@type":"ReadAction","target":["https:\/\/zhaocunwei.co.uk\/index.php\/2023\/08\/21\/k5\/"]}]},{"@type":"BreadcrumbList","@id":"https:\/\/zhaocunwei.co.uk\/index.php\/2023\/08\/21\/k5\/#breadcrumb","itemListElement":[{"@type":"ListItem","position":1,"name":"\u9996\u9875","item":"https:\/\/zhaocunwei.co.uk\/"},{"@type":"ListItem","position":2,"name":"kafka\u8bbe\u8ba1\u539f\u7406\uff1a\u6301\u4e45\u6027\u3001\u9ad8\u6548\u7387\u3001\u6d88\u606f\u4f20\u9012\u4fdd\u969c\u3001\u526f\u672c\u96c6\u3001leader\u9009\u4e3e\u3001\u65e5\u5fd7\u538b\u7f29"}]},{"@type":"WebSite","@id":"https:\/\/zhaocunwei.co.uk\/#website","url":"https:\/\/zhaocunwei.co.uk\/","name":"\u672c\u7f51\u7ad9\u5206\u4eab\u7f16\u7a0b\u8fc7\u7a0b\u4e2d\u51fa\u73b0\u7684bug","description":"This site shares programming bugs","potentialAction":[{"@type":"SearchAction","target":{"@type":"EntryPoint","urlTemplate":"https:\/\/zhaocunwei.co.uk\/?s={search_term_string}"},"query-input":"required name=search_term_string"}],"inLanguage":"zh-CN"},{"@type":"Person","@id":"https:\/\/zhaocunwei.co.uk\/#\/schema\/person\/dfb1dc0fc4a330c41908d477cd99c0b4","name":"\u603b\u662f\u5e78\u798f\u7684\u8001\u8c4c\u8c46","image":{"@type":"ImageObject","inLanguage":"zh-CN","@id":"https:\/\/zhaocunwei.co.uk\/#\/schema\/person\/image\/","url":"https:\/\/secure.gravatar.com\/avatar\/4226cc1ca6640507df1d2d4ba3da7a62?s=96&d=mm&r=g","contentUrl":"https:\/\/secure.gravatar.com\/avatar\/4226cc1ca6640507df1d2d4ba3da7a62?s=96&d=mm&r=g","caption":"\u603b\u662f\u5e78\u798f\u7684\u8001\u8c4c\u8c46"},"sameAs":["http:\/\/zhaocunwei.co.uk"],"url":"https:\/\/zhaocunwei.co.uk\/index.php\/author\/18500103508163-com\/"}]}},"_links":{"self":[{"href":"https:\/\/zhaocunwei.co.uk\/index.php\/wp-json\/wp\/v2\/posts\/790"}],"collection":[{"href":"https:\/\/zhaocunwei.co.uk\/index.php\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/zhaocunwei.co.uk\/index.php\/wp-json\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/zhaocunwei.co.uk\/index.php\/wp-json\/wp\/v2\/users\/1"}],"replies":[{"embeddable":true,"href":"https:\/\/zhaocunwei.co.uk\/index.php\/wp-json\/wp\/v2\/comments?post=790"}],"version-history":[{"count":1,"href":"https:\/\/zhaocunwei.co.uk\/index.php\/wp-json\/wp\/v2\/posts\/790\/revisions"}],"predecessor-version":[{"id":791,"href":"https:\/\/zhaocunwei.co.uk\/index.php\/wp-json\/wp\/v2\/posts\/790\/revisions\/791"}],"wp:attachment":[{"href":"https:\/\/zhaocunwei.co.uk\/index.php\/wp-json\/wp\/v2\/media?parent=790"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/zhaocunwei.co.uk\/index.php\/wp-json\/wp\/v2\/categories?post=790"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/zhaocunwei.co.uk\/index.php\/wp-json\/wp\/v2\/tags?post=790"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}