一聚教程网:一个值得你收藏的教程网站

热门教程

kafka消息centos怎么处理

时间:2026-06-05 09:42:59 编辑:袖梨 来源:一聚教程网

在 CentOS 上处理 Kafka 消息的实用指南

kafka消息centos如何处理

一 环境准备与启动

  • 安装 Java 8+(Kafka 运行依赖 JVM),建议使用 OpenJDK 8。
  • 启动 Zookeeper(Kafka 元数据管理),默认端口 2181:
    • 下载解压后编辑 conf/zoo.cfg,设置 dataDir,执行:
      • 启动:bin/zkServer.sh start
      • 状态:bin/zkServer.sh status
  • 启动 Kafka Broker,默认端口 9092:
    • 编辑 config/server.properties:
      • 设置唯一 broker.id
      • 配置监听:listeners=PLAINTEXT://0.0.0.0:9092
      • 对外宣传地址:advertised.listeners=PLAINTEXT://<服务器IP>:9092
      • 日志目录:log.dirs=/var/lib/kafka/logs
      • Zookeeper 连接:zookeeper.connect=localhost:2181
    • 启动命令:
      • 前台:bin/kafka-server-start.sh config/server.properties
      • 后台:bin/kafka-server-start.sh -daemon config/server.properties
  • 防火墙放行端口(如 firewalld):
    • firewall-cmd --add-port=2181/tcp --permanent && firewall-cmd --add-port=9092/tcp --permanent && firewall-cmd --reload
  • 简单连通性自检:
    • nc -vz <服务器IP> 9092nc -vz <服务器IP> 2181 应返回 succeeded。

二 命令行收发消息

  • 创建主题(推荐用 bootstrap-server):
    • bin/kafka-topics.sh --create --topic test --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
  • 查看主题:
    • bin/kafka-topics.sh --list --bootstrap-server localhost:9092
  • 发送消息(控制台生产者):
    • bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test
  • 消费消息(从最早开始):
    • bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
  • 删除主题:
    • bin/kafka-topics.sh --delete --bootstrap-server localhost:9092 --topic test

三 消费模式与关键机制

  • 消费模式:Kafka 采用 Pull 模式,消费者主动拉取,便于根据处理能力控制节奏。
  • 消费者组与分区:同一 消费者组 内,一个分区只会被该组内的 一个消费者 消费;一个消费者可消费多个分区,实现负载均衡。
  • 偏移量提交:消费进度(offset)提交到系统主题 __consumer_offsets 持久化;可自动或手动提交。
  • 重平衡(Rebalance):组内消费者数量或分区数变化会触发重平衡,协调者负责分区分配;需关注心跳与会话超时设置,避免频繁重平衡。
  • 常用拉取参数:fetch.min.bytes、fetch.max.wait.ms、fetch.max.bytes、max.poll.records,用于平衡延迟与吞吐。

四 可靠性与顺序性配置要点

  • 生产者可靠性(acks):
    • acks=0:不等待确认,延迟最低、可靠性最差;
    • acks=1:等待 Leader 落盘后确认;
    • acks=all(或 -1):等待 Leader+所有 ISR 副本 落盘后确认,可靠性最高。
  • 重试与幂等:开启 retries 与幂等生产者(enable.idempotence=true)可降低失败重试导致的重复与乱序风险。
  • 顺序性:
    • 顺序保证在 分区内;将需要有序的消息使用相同 key 或自定义分区器路由到同一分区;
    • 避免并发消费同一分区;必要时使用事务或幂等消费逻辑。
  • 副本与容错:生产环境常用 replication.factor=3,提升可用性与数据耐久性。

五 编程消费示例 Java

  • 依赖(Maven):
    <dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.6.1</version></dependency>
  • 示例代码(自动提交,按需改为手动提交):
    import org.apache.kafka.clients.consumer.*;import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;import java.util.Collections;import java.util.Properties;public class SimpleConsumer {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("group.id", "test-group");props.put("key.deserializer", StringDeserializer.class.getName());props.put("value.deserializer", StringDeserializer.class.getName());props.put("auto.offset.reset", "earliest"); // 首次消费位置:earliest/latestprops.put("enable.auto.commit", "true"); // 自动提交props.put("auto.commit.interval.ms", "5000");try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {consumer.subscribe(Collections.singletonList("test"));while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> r : records) {System.out.printf("topic=%s, partition=%d, offset=%d, key=%s, value=%s%n",r.topic(), r.partition(), r.offset(), r.key(), r.value());}// 若 enable.auto.commit=false,在此处手动提交:// consumer.commitSync();}}}}
  • 运行前确认:
    • 主题已创建;
    • 服务器 9092 端口可达;
    • 如需远程访问,确保 advertised.listeners 配置为服务器 IP 而非 localhost。

热门栏目