Kafka
This article demonstrates how to integrate Kafka into a tio-boot project, implementing message production and consumption, and triggering message sends through an HTTP endpoint.
I. Installing and Starting Kafka
Kafka depends on ZooKeeper (newer releases can run in KRaft mode, but this guide uses the classic ZooKeeper setup, which is simple and stable).
ZooKeeper
The download page lists two archives:
- apache-zookeeper-3.9.5-bin.tar.gz (recommended: precompiled and ready to run)
- apache-zookeeper-3.9.5.tar.gz (source only, must be built)
Use the bin archive.
Installation
mkdir -p /opt/package/zookeeper && cd /opt/package/zookeeper
wget https://downloads.apache.org/zookeeper/zookeeper-3.9.5/apache-zookeeper-3.9.5-bin.tar.gz
tar -xzf apache-zookeeper-3.9.5-bin.tar.gz
mv apache-zookeeper-3.9.5-bin /opt/
cd /opt/apache-zookeeper-3.9.5-bin
Configure zoo.cfg
Copy the template:
cp conf/zoo_sample.cfg conf/zoo.cfg
Edit the configuration:
vi conf/zoo.cfg
Recommended settings:
dataDir=/tmp/zookeeper
clientPort=2181
maxClientCnxns=60
If /tmp/zookeeper does not exist, create it:
mkdir -p /tmp/zookeeper
Start ZooKeeper:
bin/zkServer.sh start
Check the status:
bin/zkServer.sh status
Normal output:
Mode: standalone
Verify it works:
bin/zkCli.sh
Then run:
ls /
If you see:
[zookeeper]
ZooKeeper is working correctly.
Kafka
1. Download Kafka
Download the release archive from the official site (a 3.x version is recommended):
mkdir -p /opt/package/kafka && cd /opt/package/kafka
wget https://archive.apache.org/dist/kafka/3.6.0/kafka_2.12-3.6.0.tgz
2. Extract and enter the directory
tar -zxf kafka_2.12-3.6.0.tgz
mv kafka_2.12-3.6.0 /opt/
cd /opt/kafka_2.12-3.6.0/
3. Configuration
For a single-node local install, the default config/server.properties works out of the box; it expects ZooKeeper at localhost:2181.
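The settings most often reviewed in config/server.properties are the following (shown with their stock values; adjust paths and addresses for your environment):

```properties
# Unique id of this broker within the cluster
broker.id=0
# Directory where Kafka stores its log segments
log.dirs=/tmp/kafka-logs
# ZooKeeper connection string
zookeeper.connect=localhost:2181
```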
4. Start Kafka (this command runs in the foreground; use a second terminal for the next steps)
bin/kafka-server-start.sh config/server.properties
5. Create a topic
bin/kafka-topics.sh --create --topic test-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
6. Test (optional)
Produce messages:
bin/kafka-console-producer.sh --topic test-topic --bootstrap-server localhost:9092
Consume messages:
bin/kafka-console-consumer.sh --topic test-topic --from-beginning --bootstrap-server localhost:9092
II. Integrating Kafka with tio-boot
1. Maven dependency
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>3.6.0</version>
</dependency>
2. app.properties configuration
kafka.bootstrap.servers=127.0.0.1:9092
kafka.topic=test-topic
kafka.group.id=test-group
3. HelloApp entry class
@AComponentScan
public class HelloApp {
  public static void main(String[] args) {
    long start = System.currentTimeMillis();
    TioApplication.run(HelloApp.class, args);
    long end = System.currentTimeMillis();
    System.out.println((end - start) + "ms");
  }
}
4. KafkaProducerUtils (producer utility class)
package com.litongjava.mq.kafaka;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaProducerUtils {

  private static KafkaProducer<String, String> producer;
  private static String topic;

  public static void init(String servers, String topicName) {
    topic = topicName;
    Properties props = new Properties();
    props.put("bootstrap.servers", servers);
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    producer = new KafkaProducer<>(props);
  }

  public static void send(String message) {
    // Send asynchronously; log delivery failures instead of silently dropping them
    producer.send(new ProducerRecord<>(topic, message), (metadata, ex) -> {
      if (ex != null) {
        ex.printStackTrace();
      }
    });
  }

  public static KafkaProducer<String, String> getProducer() {
    return producer;
  }
}
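Beyond the serializers, a few optional producer settings are commonly tuned for reliability and throughput. A hedged sketch of values one might add to the Properties above (these keys are standard kafka-clients producer configs; the values are illustrative, not project requirements):

```properties
# Wait for all in-sync replicas to acknowledge each write
acks=all
# Retry transient send failures
retries=3
# Batch outgoing records for up to 5 ms to improve throughput
linger.ms=5
```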
5. KafkaConsumerHandler (consumer)
package com.litongjava.mq.kafaka;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class KafkaConsumerHandler implements Runnable {

  private final KafkaConsumer<String, String> consumer;
  private volatile boolean running = true;

  public KafkaConsumerHandler(String servers, String groupId, String topic) {
    Properties props = new Properties();
    props.put("bootstrap.servers", servers);
    props.put("group.id", groupId);
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("auto.offset.reset", "earliest");
    consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Collections.singletonList(topic));
  }

  @Override
  public void run() {
    try {
      while (running) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
          System.out.println("Received message: " + record.value());
        }
      }
    } finally {
      // Release sockets and leave the consumer group cleanly
      consumer.close();
    }
  }

  /** Ask the polling loop to exit on its next iteration. */
  public void stop() {
    running = false;
  }
}
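The consumer above relies on kafka-clients defaults for offset management. If you want to make them explicit, the relevant keys (shown here with their library default values) can be added to the Properties:

```properties
# Commit offsets automatically in the background
enable.auto.commit=true
# How often auto-commits happen (ms)
auto.commit.interval.ms=5000
# Upper bound on records returned by one poll()
max.poll.records=500
```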
6. KafkaClientConfig configuration class
package com.litongjava.mq.kafaka.config;

import com.litongjava.annotation.AConfiguration;
import com.litongjava.annotation.Initialization;
import com.litongjava.hook.HookCan;
import com.litongjava.mq.kafaka.KafkaConsumerHandler;
import com.litongjava.mq.kafaka.KafkaProducerUtils;
import com.litongjava.tio.utils.environment.EnvUtils;

import lombok.extern.slf4j.Slf4j;

@Slf4j
@AConfiguration
public class KafkaClientConfig {

  @Initialization
  public void init() {
    String servers = EnvUtils.get("kafka.bootstrap.servers");
    String topic = EnvUtils.get("kafka.topic");
    String groupId = EnvUtils.get("kafka.group.id");

    // Initialize the producer
    KafkaProducerUtils.init(servers, topic);
    log.info("Kafka Producer initialized");

    // Start the consumer thread; a daemon thread will not block JVM exit
    KafkaConsumerHandler handler = new KafkaConsumerHandler(servers, groupId, topic);
    Thread consumerThread = new Thread(handler, "kafka-consumer");
    consumerThread.setDaemon(true);
    consumerThread.start();
    log.info("Kafka Consumer started");

    // Graceful shutdown: flush and close the producer on exit
    HookCan.me().addDestroyMethod(() -> {
      log.info("Kafka shutting down...");
      KafkaProducerUtils.getProducer().close();
    });
  }
}
7. KafakaTestController (HTTP-triggered send)
package com.litongjava.mq.kafaka.controller;

import com.litongjava.annotation.RequestPath;
import com.litongjava.mq.kafaka.KafkaProducerUtils;

@RequestPath("/kafaka/test")
public class KafakaTestController {

  @RequestPath()
  public String index() {
    String msg = "Hello Kafka";
    KafkaProducerUtils.send(msg);
    return "Message sent to Kafka";
  }
}
Visiting /kafaka/test on the running service sends one message and returns a confirmation.
8. Sending messages manually
To send outside the utility method, use the shared producer directly:
KafkaProducer<String, String> producer = KafkaProducerUtils.getProducer();
ProducerRecord<String, String> record = new ProducerRecord<>("test-topic", "Hello Kafka");
producer.send(record);
// send() is asynchronous; flush() blocks until the message is actually delivered
producer.flush();
System.out.println("Message sent");
9. Overall flow
The system runs as follows:
- Start tio-boot
- Initialize the Kafka producer
- Start the consumer thread listening on the topic
- An HTTP request triggers the producer to send a message
- The consumer receives and processes the message
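The decoupling in this flow can be illustrated without a broker. Below is a toy sketch in plain Java where a BlockingQueue stands in for the Kafka topic; it is illustrative only and uses no Kafka APIs:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class FlowSketch {
  public static void main(String[] args) throws InterruptedException {
    // The queue plays the role of the Kafka topic
    BlockingQueue<String> topic = new LinkedBlockingQueue<>();
    List<String> received = new ArrayList<>();

    // Consumer thread: keeps polling the "topic" until it sees a poison pill
    Thread consumer = new Thread(() -> {
      try {
        while (true) {
          String msg = topic.take();
          if ("__STOP__".equals(msg)) {
            break;
          }
          received.add(msg);
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    consumer.start();

    // The "HTTP handler" produces a message, just as the controller does
    topic.put("Hello Kafka");
    topic.put("__STOP__");
    consumer.join();

    System.out.println("received=" + received);
  }
}
```

The producer and consumer never call each other; they only share the queue, which is exactly the decoupling the broker provides in the real setup.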
10. Comparison with EMQX
| Feature | EMQX (MQTT) | Kafka |
|---|---|---|
| Protocol | MQTT | Custom binary protocol over TCP |
| Typical use | IoT, device communication | Big data, log streaming |
| Consumption model | Pub/Sub | Partitions + consumer groups |
| Persistence | Optional | Durable by design |
| Throughput | Moderate | Very high |
Summary
This tutorial covered:
- Installing and starting Kafka
- Integrating Kafka into tio-boot
- Writing a producer and a consumer
- Triggering message sends over HTTP
- Lifecycle management
