Tio Boot DocsTio Boot Docs
Home
  • java-db
  • api-table
  • jooq
  • mysql
  • postgresql
  • oceanbase
  • Enjoy
  • Tio Boot Admin
  • LLM
  • voice-agent
  • knowlege_base
  • ai_agent
  • ai-search
  • 案例
Abount
  • Github
  • Gitee
Home
  • java-db
  • api-table
  • jooq
  • mysql
  • postgresql
  • oceanbase
  • Enjoy
  • Tio Boot Admin
  • LLM
  • voice-agent
  • knowlege_base
  • ai_agent
  • ai-search
  • 案例
Abount
  • Github
  • Gitee
  • 01_tio-boot 简介

    • tio-boot:新一代高性能 Java Web 开发框架
    • tio-boot 入门示例
    • Tio-Boot 配置 : 现代化的配置方案
    • tio-boot 整合 Logback
    • tio-boot 整合 hotswap-classloader 实现热加载
    • 自行编译 tio-boot
    • 最新版本
    • 开发规范
  • 02_部署

    • 使用 Maven Profile 实现分环境打包 tio-boot 项目
    • Maven 项目配置详解:依赖与 Profiles 配置
    • tio-boot 打包成 FatJar
    • 使用 GraalVM 构建 tio-boot Native 程序
    • 使用 Docker 部署 tio-boot
    • 部署到 Fly.io
    • 部署到 AWS Lambda
    • 到阿里云云函数
    • 使用 Deploy 工具部署
    • 使用Systemctl启动项目
    • 使用 Jenkins 部署 Tio-Boot 项目
    • 使用 Nginx 反向代理 Tio-Boot
    • 使用 Supervisor 管理 Java 应用
    • 已过时
    • 胖包与瘦包的打包与部署
  • 03_配置

    • 配置参数
    • 服务器监听器
    • 内置缓存系统 AbsCache
    • 使用 Redis 作为内部 Cache
    • 静态文件处理器
    • 基于域名的静态资源隔离
    • DecodeExceptionHandler
    • 开启虚拟线程(Virtual Thread)
    • 框架级错误通知
  • 04_原理

    • 生命周期
    • 请求处理流程
    • 重要的类
  • 05_json

    • Json
    • 接受 JSON 和响应 JSON
    • 响应实体类
  • 06_web

    • 概述
    • 接收请求参数
    • 接收日期参数
    • 接收数组参数
    • 返回字符串
    • 返回文本数据
    • 返回网页
    • 请求和响应字节
    • 文件上传
    • 文件下载
    • 返回视频文件并支持断点续传
    • http Session
    • Cookie
    • HttpRequest
    • HttpResponse
    • Resps
    • RespBodyVo
    • Controller拦截器
    • 请求拦截器
    • LoggingInterceptor
    • 全局异常处理器
    • 异步处理
    • 动态 返回 CSS 实现
    • 返回图片
    • 跨域
    • 添加 Controller
    • Transfer-Encoding: chunked 实时音频播放
    • Server-Sent Events (SSE)
    • handler入门
    • 返回 multipart
    • 待定
    • 自定义 Handler 转发请求
    • 使用 HttpForwardHandler 转发所有请求
    • 常用工具类
    • HTTP Basic 认证
    • Http响应加密
    • 使用零拷贝发送大文件
    • 分片上传
    • 接口访问统计
    • 接口请求和响应数据记录
    • WebJars
    • JProtobuf
    • 测速
    • Gzip Bomb:使用压缩炸弹防御恶意爬虫
  • 07_validate

    • 数据紧校验规范
    • 参数校验
  • 08_websocket

    • 使用 tio-boot 搭建 WebSocket 服务
    • WebSocket 聊天室项目示例
  • 09_java-db

    • java‑db
    • 操作数据库入门示例
    • SQL 模板 (SqlTemplates)
    • 数据源配置与使用
    • ActiveRecord
    • Db 工具类
    • 批量操作
    • Model
    • Model生成器
    • 注解
    • 异常处理
    • 数据库事务处理
    • Cache 缓存
    • Dialect 多数据库支持
    • 表关联操作
    • 复合主键
    • Oracle 支持
    • Enjoy SQL 模板
    • 整合 Enjoy 模板最佳实践
    • 多数据源支持
    • 独立使用 ActiveRecord
    • 调用存储过程
    • java-db 整合 Guava 的 Striped 锁优化
    • 生成 SQL
    • 通过实体类操作数据库
    • java-db 读写分离
    • Spring Boot 整合 Java-DB
    • like 查询
    • 常用操作示例
    • Druid 监控集成指南
    • SQL 统计
  • 10_api-table

    • ApiTable 概述
    • 使用 ApiTable 连接 SQLite
    • 使用 ApiTable 连接 Mysql
    • 使用 ApiTable 连接 Postgres
    • 使用 ApiTable 连接 TDEngine
    • 使用 api-table 连接 oracle
    • 使用 api-table 连接 mysql and tdengine 多数据源
    • EasyExcel 导出
    • EasyExcel 导入
    • 预留
    • 预留
    • ApiTable 实现增删改查
    • 数组类型
    • 单独使用 ApiTable
    • TQL(Table SQL)前端输入规范
  • 11_aop

    • JFinal-aop
    • Aop 工具类
    • 配置
    • 配置
    • 独立使用 JFinal Aop
    • @AImport
    • 自定义注解拦截器
    • 原理解析
  • 12_cache

    • Caffine
    • Jedis-redis
    • hutool RedisDS
    • Redisson
    • Caffeine and redis
    • CacheUtils 工具类
    • 使用 CacheUtils 整合 caffeine 和 redis 实现的两级缓存
    • 使用 java-db 整合 ehcache
    • 使用 java-db 整合 redis
    • Java DB Redis 相关 Api
    • redis 使用示例
  • 13_认证和权限

    • FixedTokenInterceptor
    • TokenManager
    • 数据表
    • 匿名登录
    • 注册和登录
    • 个人中心
    • 重置密码
    • Google 登录
    • 短信登录
    • 移动端微信登录
    • 移动端重置密码
    • 微信登录
    • 移动端微信登录
    • 权限校验注解
    • Sa-Token
    • sa-token 登录注册
    • StpUtil.isLogin() 源码解析
  • 14_i18n

    • i18n
  • 15_enjoy

    • tio-boot 整合 Enjoy 模版引擎文档
    • Tio-Boot 整合 Java-DB 与 Enjoy 模板引擎示例
    • 引擎配置
    • 表达式
    • 指令
    • 注释
    • 原样输出
    • Shared Method 扩展
    • Shared Object 扩展
    • Extension Method 扩展
    • Spring boot 整合
    • 独立使用 Enjoy
    • tio-boot enjoy 自定义指令 localeDate
    • PromptEngine
    • Enjoy 入门示例-擎渲染大模型请求体
    • Tio Boot + Enjoy:分页与 SEO 实战指南
    • Tio Boot + Enjoy:分页与 SEO 实战指南
    • Tio Boot + Enjoy:分页与 SEO 实战指南
  • 16_定时任务

    • Quartz 定时任务集成指南
    • 分布式定时任务 xxl-jb
    • cron4j 使用指南
  • 17_tests

    • TioBootTest 类
  • 18_tio

    • TioBootServer
    • 独立端口启动 TCP 服务器
    • 内置 TCP 处理器
    • 独立启动 UDPServer
    • 使用内置 UDPServer
    • t-io 消息处理流程
    • tio-运行原理详解
    • TioConfig
    • ChannelContext
    • Tio 工具类
    • 业务数据绑定
    • 业务数据解绑
    • 发送数据
    • 关闭连接
    • Packet
    • 监控: 心跳
    • 监控: 客户端的流量数据
    • 监控: 单条 TCP 连接的流量数据
    • 监控: 端口的流量数据
    • 单条通道统计: ChannelStat
    • 所有通道统计: GroupStat
    • 资源共享
    • 成员排序
    • SSL
    • DecodeRunnable
    • 使用 AsynchronousSocketChannel 响应数据
    • 拉黑 IP
    • 深入解析 Tio 源码:构建高性能 Java 网络应用
  • 19_aio

    • ByteBuffer
    • AIO HTTP 服务器
    • 自定义和线程池和池化 ByteBuffer
    • AioHttpServer 应用示例 IP 属地查询
    • 手写 AIO Http 服务器
  • 20_netty

    • Netty TCP Server
    • Netty Web Socket Server
    • 使用 protoc 生成 Java 包文件
    • Netty WebSocket Server 二进制数据传输
    • Netty 组件详解
  • 21_netty-boot

    • Netty-Boot
    • 原理解析
    • 整合 Hot Reload
    • 整合 数据库
    • 整合 Redis
    • 整合 Elasticsearch
    • 整合 Dubbo
    • Listener
    • 文件上传
    • 拦截器
    • Spring Boot 整合 Netty-Boot
    • SSL 配置指南
    • ChannelInitializer
    • Reserve
  • 22_MQ

    • Mica-mqtt
    • EMQX
    • Disruptor
    • Disruptor
    • Disruptor
  • 23_tio-utils

    • tio-utils
    • EnvUtils 配置工具
    • Notification
    • Email
    • JSON
    • File
    • Base64
    • 上传和下载
    • Http
    • Telegram
    • RsaUtils
    • HttpUtils
    • 系统监控
    • 线程
    • 虚拟线程
    • 毫秒并发 ID (MCID) 生成方案
  • 24_tio-http-server

    • 使用 Tio-Http-Server 搭建简单的 HTTP 服务
    • tio-boot 添加 HttpRequestHandler
    • 在 Android 上使用 tio-boot 运行 HTTP 服务
    • tio-http-server-native
    • handler 常用操作
  • 25_tio-websocket

    • WebSocket 服务器
    • WebSocket Client
    • TCP数据转发
  • 26_tio-im

    • 通讯协议文档
    • ChatPacket.proto 文档
    • java protobuf
    • 数据表设计
    • 创建工程
    • 登录
    • 历史消息
    • 发消息
  • 27_mybatis

    • Tio-Boot 整合 MyBatis
    • 使用配置类方式整合 MyBatis
    • 整合数据源
    • 使用 mybatis-plus 整合 tdengine
    • 整合 mybatis-plus
  • 28_mongodb

    • tio-boot 使用 mongo-java-driver 操作 mongodb
  • 29_elastic-search

    • Elasticsearch
    • JavaDB 整合 ElasticSearch
    • Elastic 工具类使用指南
    • Elastic-search 注意事项
    • ES 课程示例文档
  • 30_magic-script

    • tio-boot 与 magic-script 集成指南
  • 31_groovy

    • tio-boot 整合 Groovy
  • 32_firebase

    • 整合 google firebase
    • Firebase Storage
    • Firebase Authentication
    • 使用 Firebase Admin SDK 进行匿名用户管理与自定义状态标记
    • 导出用户
    • 注册回调
    • 登录注册
  • 72_文件存储

    • 文件上传数据表
    • 本地存储
    • 存储到 亚马逊 S3
    • 存储到 Cloudflare R2
    • 存储到 腾讯 COS
    • 存储到 阿里云 OSS
  • 34_spider

    • jsoup
    • 爬取 z-lib.io 数据
    • 整合 WebMagic
    • WebMagic 示例:爬取学校课程数据
    • Playwright
    • Flexmark (Markdown 处理器)
    • tio-boot 整合 Playwright
    • 缓存网页数据
  • 36_integration_thirty_party

    • 整合 okhttp
    • 整合 GrpahQL
    • 集成 Mailjet
    • 整合 ip2region
    • 整合 GeoLite 离线库
    • 整合 Lark 机器人指南
    • 集成 Lark Mail 实现邮件发送
    • Thymeleaf
    • Swagger
    • Clerk 验证
  • 37_dubbo

    • 概述
    • dubbo 2.6.0
    • dubbo 2.6.0 调用过程
    • dubbo 3.2.0
  • 38_spring

    • Spring Boot Web 整合 Tio Boot
    • spring-boot-starter-webflux 整合 tio-boot
    • tio-boot 整合 spring-boot-starter
    • Tio Boot 整合 Spring Boot Starter db
    • Tio Boot 整合 Spring Boot Starter Data Redis 指南
  • 39_spring-cloud

    • tio-boot spring-cloud
  • 40_quarkus

    • Quarkus(无 HTTP)整合 tio-boot(有 HTTP)
    • tio-boot + Quarkus + Hibernate ORM Panache
  • 41_postgresql

    • PostgreSQL 安装
    • PostgreSQL 主键自增
    • PostgreSQL 日期类型
    • Postgresql 金融类型
    • PostgreSQL 数组类型
    • 索引
    • PostgreSQL 查询优化
    • 获取字段类型
    • PostgreSQL 全文检索
    • PostgreSQL 向量
    • PostgreSQL 优化向量查询
    • PostgreSQL 其他
  • 42_mysql

    • 使用 Docker 运行 MySQL
    • 常见问题
  • 43_oceanbase

    • 快速体验 OceanBase 社区版
    • 快速上手 OceanBase 数据库单机部署与管理
    • 诊断集群性能
    • 优化 SQL 性能指南
    • 待定
  • 49_jooq

    • 使用配置类方式整合 jOOQ
    • tio-boot + jOOQ 事务管理
    • 批量操作与性能优化
    • 整合agroal
    • 代码生成与类型安全
    • 基于 Record / POJO 增删改查
    • UPSERT、批量更新、返回主键与高级 SQL
    • 的多表关联查询、DTO 投影、聚合统计与视图封装
    • 的窗口函数、CTE、JSON 查询与 PostgreSQL 高级 SQL 实战
    • tio-boot + jOOQ 的审计字段、乐观锁、数据权限与企业级 Repository 设计
    • 测试策略、SQL 日志、性能诊断与生产排障
    • 多租户、读写分离与多数据源设计
    • 代码生成治理、数据库迁移与团队协作规范实战
  • 50_media

    • JAVE 提取视频中的声音
    • Jave 提取视频中的图片
    • 待定
  • 51_asr

    • Whisper-JNI
  • 54_native-media

    • java-native-media
    • JNI 入门示例
    • mp3 拆分
    • mp4 转 mp3
    • 使用 libmp3lame 实现高质量 MP3 编码
    • Linux 编译
    • macOS 编译
    • 从 JAR 包中加载本地库文件
    • 支持的音频和视频格式
    • 任意格式转为 mp3
    • 通用格式转换
    • 通用格式拆分
    • 视频合并
    • VideoToHLS
    • split_video_to_hls 支持其他语言
    • 持久化 HLS 会话
    • 获取视频长度
    • 保存视频的最后一帧
    • 添加水印
    • linux版本
  • 55_cv

    • 使用 Java 运行 YOLOv8 ONNX 模型进行目标检测
    • tio-boot整合yolo
    • ONNX Runtime 推理说明
  • 58_telegram4j

    • 数据库设计
    • 基于 HTTP 协议开发 Telegram 翻译机器人
    • 基于 MTProto 协议开发 Telegram 翻译机器人
    • 过滤旧消息
    • 保存机器人消息
    • 定时推送
    • 增加命令菜单
    • 使用 telegram-Client
    • 使用自定义 StoreLayout
    • 延迟测试
    • Reactor 错误处理
    • Telegram4J 常见错误处理指南
  • 59_telegram-bots

    • TelegramBots 入门指南
    • 使用工具库 telegram-bot-base 开发翻译机器人
  • 60_LLM

    • 简介
    • 流式生成
    • 图片多模态输入
    • Google Gemini接入
    • google Vertex AI 接入
    • 请求记录
    • 限流和错误处理
    • /zh/60_LLM/08.html
    • /zh/60_LLM/09.html
    • /zh/60_LLM/10.html
    • 增强检索(RAG)
    • 结构化数据检索
    • AI 问答
  • 61_voice-agent

    • 整合Gemini realtime模型
    • Voice Agent 前端接入接口文档
    • 整合千问realtime模型
    • 打断支持
    • 主动介入
  • 63_knowlege_base

    • 数据库设计
    • 用户登录实现
    • 模型管理
    • 知识库管理
    • 文档拆分
    • 片段向量
    • 命中测试
    • 文档管理
    • 片段管理
    • 问题管理
    • 应用管理
    • 向量检索
    • 推理问答
    • 问答模块
    • 统计分析
    • 用户管理
    • api 管理
    • 存储文件到 S3
    • 文档解析优化
    • 片段汇总
    • 段落分块与检索
    • 多文档解析
    • 对话日志
    • 检索性能优化
    • Milvus
    • 文档解析方案和费用对比
    • 离线运行向量模型
  • 64_ai_agent

    • 数据库设计
    • 示例问题管理
    • 会话管理
    • 历史记录
    • Perplexity API
    • 意图识别
    • 智能问答
    • 文件上传与解析文档
    • 翻译
    • 名人搜索功能实现
    • Ai studio gemini youbue 问答使用说明
    • 自建 YouTube 字幕问答系统
    • 自建 获取 youtube 字幕服务
    • 使用 OpenAI ASR 实现语音识别接口(Java 后端示例)
    • 定向搜索
    • 16
    • 17
    • 18
    • 在 tio-boot 应用中整合 ai-agent
    • 16
  • 65_ai-search

    • ai-search 项目简介
    • ai-search 数据库文档
    • ai-search SearxNG 搜索引擎
    • ai-search Jina Reader API
    • ai-search Jina Search API
    • ai-search 搜索、重排与读取内容
    • ai-search PDF 文件处理
    • ai-search 推理问答
    • Google Custom Search JSON API
    • ai-search 意图识别
    • ai-search 问题重写
    • ai-search 系统 API 接口 WebSocket 版本
    • ai-search 搜索代码实现 WebSocket 版本
    • ai-search 生成建议问
    • ai-search 生成问题标题
    • ai-search 历史记录
    • Discover API
    • 翻译
    • Tavily Search API 文档
    • 对接 Tavily Search
    • 火山引擎 DeepSeek
    • 对接 火山引擎 DeepSeek
    • ai-search 搜索代码实现 SSE 版本
    • jar 包部署
    • Docker 部署
    • 爬取一个静态网站的所有数据
    • 网页数据预处理
    • 网页数据检索与问答流程整合
  • 66_ai-coding

    • Cline 提示词
    • Cline 提示词-中文版本
  • 67_java-uni-ai-server

    • 语音合成系统
    • Fish.audio TTS 接口说明文档与 Java 客户端封装
    • 整合 fishaudio 到 java-uni-ai-server 项目
    • 待定
  • 68_java-llm-proxy

    • 使用tio-boot搭建多模型LLM代理服务
  • 69_java-kit-server

    • Java 执行 python 代码
    • 通过大模型执行 Python 代码
    • 执行 Python (Manim) 代码
    • 待定
    • 待定
    • 待定
    • 视频下载增加水印说明文档
  • 70_ai-brower

    • AI Browser:基于用户指令的浏览器自动化系统
    • 提示词
    • dom构建- buildDomTree.js
    • dom构建- 将网页可点击元素提取与可视化
    • 提取网内容
    • 启动浏览器
    • 操作浏览器指令
  • 71_tio-boot-admin

    • 入门指南
    • 初始化数据
    • token 存储
    • 与前端集成
    • 文件上传
    • 网络请求
    • 单图片管理(只读模式)
    • 多图片管理
    • 布尔值管理
    • 字段联动
    • Word 管理
    • PDF 管理
    • 文章管理
    • 富文本编辑器
    • 整合 Enjoy 模版引擎
  • 73_tio-mail-wing

    • tio-mail-wing简介
    • 任务1:实现POP3系统
    • 使用 getmail 验证 tio-mail-wing POP3 服务
    • 任务2:实现 SMTP 服务
    • 数据库初始化文档
    • 用户管理
    • 邮件管理
    • 任务3:实现 SMTP 服务 数据库版本
    • 任务4:实现 POP3 服务(数据库版本)
    • IMAP 协议
    • 拉取多封邮件
    • 任务5:实现 IMAP 服务(数据库版本)
    • IMAP实现讲解
    • IMAP 手动测试脚本
    • IMAP 认证机制
    • 主动推送
  • 74_tio-mcp-server

    • 实现 MCP Server 开发指南
  • 75_tio-sip

    • SIP Server 第一版原理说明
    • SIP Server 第一版实战
    • 一、Windows 平台测试
    • SIP Server 第二版实战
    • SIP Server 第三版实战
    • 性能优化
    • 基于 MediaProcessor 对接 Realtime 模型说明
    • 对接大语言模型
    • 支持 G722 宽带语音
    • G722编码和解码
    • 会话级采样率转换
    • /zh/75_tio-sip/12.html
    • 增加 9196 回声测试分机
    • 语音系统链路说明
    • 一、Gemini Realtime 的打断机制
  • 77_manim

    • Teach me anything - 基于大语言的知识点讲解视频生成系统
    • Manim 开发环境搭建
    • 生成场景提示词
    • 生成代码
    • 完整脚本示例
    • TTS服务端
    • 废弃
    • 废弃
    • 废弃
    • 使用 SSE 流式传输生成进度的实现文档
    • 整合全流程完整文档
    • HLS 动态推流技术文档
    • manim 分场景生成代码
    • 分场景运行代码及流式播放支持
    • 分场景业务端完整实现流程
    • Maiim布局管理器
    • 仅仅生成场景代码
    • 使用 modal 运行 manim 代码
    • Python 使用 Modal GPU 加速渲染
    • Modal 平台 GPU 环境下运行 Manim
    • Modal Manim OpenGL 安装与使用
    • 优化 GPU 加速
    • 生成视频封面流程
    • Java 调用 manim 命令 执行代码 生成封面
    • Manim 图像生成服务客户端文档
    • manim render help
    • 显示 中文公式
    • ManimGL(manimgl)
    • Manim 实战入门:用代码创造数学动画
    • 欢迎
  • 80_性能测试

    • 压力测试 - tio-http-serer
    • 压力测试 - tio-boot
    • 压力测试 - tio-boot-native
    • 压力测试 - netty-boot
    • 性能测试对比
    • TechEmpower FrameworkBenchmarks
    • 压力测试 - tio-boot 12 C 32G
    • HTTP/1.1 Pipelining 性能测试报告
    • tio-boot vs Quarkus 性能对比测试报告
  • 81_tio-boot

    • 简介
    • Swagger 整合到 Tio-Boot 中的指南
    • 待定
    • 待定
    • 高性能网络编程中的 ByteBuffer 分配与回收策略
    • TioBootServerHandler 源码解析
  • 99_案例

    • 封装 IP 查询服务
    • tio-boot 案例 - 全局异常捕获与企业微信群通知
    • tio-boot 案例 - 文件上传和下载
    • tio-boot 案例 - 整合 ant design pro 增删改查
    • tio-boot 案例 - 流失响应
    • tio-boot 案例 - 增强检索
    • tio-boot 案例 - 整合 function call
    • tio-boot 案例 - 定时任务 监控 PostgreSQL、Redis 和 Elasticsearch
    • Tio-Boot 案例:使用 SQLite 整合到登录注册系统
    • tio-boot 案例 - 执行 shell 命令

AWS MSK

tio-boot整合AWS MSK

一、准备工作

接入前先确认这几件事:

  1. MSK 集群已创建
  2. Topic 已存在,或者由程序自动创建
  3. 应用运行环境能访问 MSK
  4. 应用绑定的 AWS 身份有足够权限

AWS 官方说明里提到,MSK Provisioned 集群默认是私网访问,客户端一般需要在同一 VPC 内;而 IAM 模式下,Java 客户端要用 aws-msk-iam-auth,配置 security.protocol=SASL_SSL、sasl.mechanism=AWS_MSK_IAM、IAMLoginModule 和 IAMClientCallbackHandler。(AWS Documentation)


二、Maven 依赖

AWS 官方文档给了 aws-msk-iam-auth 的 Maven 坐标,官方 GitHub README 当前示例版本是 2.3.5。(AWS Documentation)

<dependencies>
  <!-- Kafka Java Client -->
  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.7.1</version>
  </dependency>

  <!-- AWS MSK IAM Authentication -->
  <dependency>
    <groupId>software.amazon.msk</groupId>
    <artifactId>aws-msk-iam-auth</artifactId>
    <version>2.3.5</version>
  </dependency>
</dependencies>

or

<dependencies>
  <!-- Kafka Java Client -->
  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.3.2</version>
  </dependency>

  <!-- AWS MSK IAM Authentication -->
  <dependency>
    <groupId>software.amazon.msk</groupId>
    <artifactId>aws-msk-iam-auth</artifactId>
    <version>1.1.8</version>
  </dependency>
</dependencies>

说明:

  • kafka-clients 版本通常选与你的项目兼容的稳定版本即可。
  • aws-msk-iam-auth 是 Java 连接 IAM 认证 MSK 的关键依赖。(GitHub)

三、app.properties 配置

结合你给的配置,可以这样整理:

# MSK
aws.msk.bootstrap-servers=server1:9098,sever2:9098
aws.msk.enable-producer=true
aws.msk.enable-consumer=false
aws.msk.producer.topic-name=jr_experience_verified_prod
aws.msk.consumer.topic-name=jr_experience_verified_prod
aws.msk.group-id=jr_experience_verified_group

# Producer
aws.msk.acks=all
aws.msk.retries=3

# Consumer
aws.msk.auto-offset-reset=earliest
aws.msk.enable-auto-commit=true

如果你后面想把生产者和消费者拆分成不同 topic,也可以再加:

aws.msk.producer-topic=jr_experience_verified_prod
aws.msk.consumer-topic=jr_experience_verified_prod

四、HelloApp 启动类

import com.litongjava.annotation.AComponentScan;
import com.litongjava.tio.boot.TioApplication;

@AComponentScan
public class HelloApp {
  public static void main(String[] args) {
    long start = System.currentTimeMillis();
    TioApplication.run(HelloApp.class, args);
    long end = System.currentTimeMillis();
    System.out.println((end - start) + "ms");
  }
}

五、KafkaProducerUtils

这个工具类负责保存 Producer,并提供发送消息的方法。

package com.litongjava.kafka;

import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class KafkaProducerUtils {

  private static KafkaProducer<String, String> producer;
  private static String topic;

  public static void init(KafkaProducer<String, String> kafkaProducer, String topicName) {
    producer = kafkaProducer;
    topic = topicName;
  }

  public static KafkaProducer<String, String> getProducer() {
    return producer;
  }

  public static void send(String message) {
    send(topic, message);
  }

  public static void send(String topicName, String message) {
    try {
      ProducerRecord<String, String> record = new ProducerRecord<>(topicName, message);
      Future<RecordMetadata> future = producer.send(record);
      RecordMetadata metadata = future.get();
      log.info("Kafka message sent, topic:{}, partition:{}, offset:{}",
          metadata.topic(), metadata.partition(), metadata.offset());
    } catch (Exception e) {
      log.error("Kafka send message failed", e);
    }
  }
}

六、KafkaConsumerRunner

这个类负责消费消息。为了跟你 EMQX 的风格接近,这里用一个线程持续轮询。

package com.litongjava.kafka;

import java.time.Duration;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class KafkaConsumerRunner implements Runnable {

  private final KafkaConsumer<String, String> consumer;
  private final String topic;
  private final AtomicBoolean running = new AtomicBoolean(true);

  public KafkaConsumerRunner(KafkaConsumer<String, String> consumer, String topic) {
    this.consumer = consumer;
    this.topic = topic;
  }

  @Override
  public void run() {
    consumer.subscribe(Collections.singletonList(topic));
    log.info("Kafka consumer subscribed topic: {}", topic);

    try {
      while (running.get()) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
        for (ConsumerRecord<String, String> record : records) {
          log.info("Received message, topic:{}, partition:{}, offset:{}, key:{}, value:{}",
              record.topic(),
              record.partition(),
              record.offset(),
              record.key(),
              record.value());
        }
      }
    } catch (Exception e) {
      log.error("Kafka consumer error", e);
    } finally {
      try {
        consumer.close();
      } catch (Exception e) {
        log.error("Kafka consumer close error", e);
      }
    }
  }

  public void shutdown() {
    running.set(false);
  }
}

七、MskClientConfig 配置类

这是整合的核心。这里会:

  • 读取 app.properties
  • 创建 Producer
  • 创建 Consumer
  • 配置 IAM 认证
  • 应用关闭时优雅释放资源

AWS 官方给 Java 的 IAM 配置要点是:

  • security.protocol=SASL_SSL
  • sasl.mechanism=AWS_MSK_IAM
  • sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
  • sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler

这些都是下面代码里的关键配置。(AWS Documentation)

package com.litongjava.tio.boot.admin.utils;

import com.litongjava.tio.utils.environment.EnvUtils;
import com.litongjava.tio.utils.hutool.StrUtil;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class AwsProfileUtils {

  public static String jaasConfig() {
    String awsProfile = firstNonBlank(EnvUtils.get("AWS_PROFILE"), EnvUtils.get("aws.profile"),
        EnvUtils.get("aws.msk.aws-profile"));

    describeAuthMode(awsProfile);
    String jaasConfig = buildJaasConfig(awsProfile);
    return jaasConfig;
  }

  private static String buildJaasConfig(String awsProfile) {
    if (StrUtil.isNotBlank(awsProfile)) {
      log.info("AWS_PROFILE detected for MSK IAM auth: {}", awsProfile);
      return "software.amazon.msk.auth.iam.IAMLoginModule required " + "awsProfileName=\"" + escapeJaasValue(awsProfile)
          + "\";";
    }

    log.info("AWS_PROFILE not found, using default AWS credentials provider chain for MSK IAM auth");
    return "software.amazon.msk.auth.iam.IAMLoginModule required;";
  }

  private static String firstNonBlank(String... values) {
    if (values == null) {
      return null;
    }
    for (String value : values) {
      if (StrUtil.isNotBlank(value)) {
        return value.trim();
      }
    }
    return null;
  }

  private static String escapeJaasValue(String value) {
    if (value == null) {
      return null;
    }
    return value.replace("\\", "\\\\").replace("\"", "\\\"");
  }

  private static String describeAuthMode(String awsProfile) {
    if (StrUtil.isNotBlank(awsProfile)) {
      return "AWS_PROFILE(" + awsProfile + ")";
    }
    return "DEFAULT_CREDENTIALS_CHAIN";
  }

}
package com.litongjava.tio.boot.admin.config;

import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

import com.litongjava.hook.HookCan;
import com.litongjava.tio.boot.admin.kafaka.KafkaConsumerRunner;
import com.litongjava.tio.boot.admin.kafaka.KafkaProducerUtils;
import com.litongjava.tio.boot.admin.utils.AwsProfileUtils;
import com.litongjava.tio.utils.environment.EnvUtils;
import com.litongjava.tio.utils.hutool.StrUtil;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class TioAdminKafkaMskClientConfig {

  private KafkaConsumerRunner consumerRunner;
  private Thread consumerThread;
  private KafkaProducer<String, String> producer;

  public void config() {
    String bootstrapServers = EnvUtils.get("aws.msk.bootstrap-servers");
    if (StrUtil.isBlank(bootstrapServers)) {
      log.info("aws.msk.bootstrap-servers is blank, skip MSK client initialization");
      return;
    }

    boolean enableProducer = EnvUtils.getBoolean("aws.msk.enable-producer", true);
    boolean enableConsumer = EnvUtils.getBoolean("aws.msk.enable-consumer", true);

    if (!enableProducer && !enableConsumer) {
      log.info("Both producer and consumer are disabled, skip MSK client initialization");
      return;
    }

    String producerTopicName = EnvUtils.get("aws.msk.producer.topic-name");
    String consumerTopicName = EnvUtils.get("aws.msk.consumer.topic-name");
    String groupId = EnvUtils.get("aws.msk.group-id");

    String acks = EnvUtils.get("aws.msk.acks", "all");
    int retries = EnvUtils.getInt("aws.msk.retries", 3);
    String autoOffsetReset = EnvUtils.get("aws.msk.auto-offset-reset", "earliest");
    boolean enableAutoCommit = EnvUtils.getBoolean("aws.msk.enable-auto-commit", true);

    String jaasConfig = AwsProfileUtils.jaasConfig();

    if (enableProducer) {
      initProducer(bootstrapServers, producerTopicName, acks, retries, jaasConfig);
    } else {
      log.info("Kafka producer disabled, config: aws.msk.enable-producer=false");
    }

    if (enableConsumer) {
      initConsumer(bootstrapServers, consumerTopicName, groupId, autoOffsetReset, enableAutoCommit, jaasConfig);
    } else {
      log.info("Kafka consumer disabled, config: aws.msk.enable-consumer=false");
    }

    HookCan.me().addDestroyMethod(() -> {
      try {
        log.info("Shutting down Kafka resources");

        if (consumerRunner != null) {
          consumerRunner.shutdown();
        }

        if (consumerThread != null) {
          consumerThread.interrupt();
        }

        if (producer != null) {
          producer.close();
        }

        log.info("Kafka resources shutdown completed");
      } catch (Exception e) {
        log.error("Shutdown Kafka resources failed", e);
      }
    });
  }

  private void initProducer(String bootstrapServers, String producerTopicName, String acks, int retries, String jaasConfig) {
    Properties producerProps = new Properties();
    producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    producerProps.put(ProducerConfig.ACKS_CONFIG, acks);
    producerProps.put(ProducerConfig.RETRIES_CONFIG, retries);

    producerProps.put("security.protocol", "SASL_SSL");
    producerProps.put("sasl.mechanism", "AWS_MSK_IAM");
    producerProps.put("sasl.jaas.config", jaasConfig);
    producerProps.put("sasl.client.callback.handler.class", "software.amazon.msk.auth.iam.IAMClientCallbackHandler");

    producer = new KafkaProducer<>(producerProps);

    if (StrUtil.isNotBlank(producerTopicName)) {
      KafkaProducerUtils.init(producer, producerTopicName);
      log.info("Kafka producer initialized, default topicName:{}", producerTopicName);
    } else {
      log.info("Kafka producer initialized without default topic, dynamic topic mode");
    }
  }

  private void initConsumer(String bootstrapServers, String consumerTopicName, String groupId, String autoOffsetReset,
      boolean enableAutoCommit, String jaasConfig) {

    if (StrUtil.isBlank(consumerTopicName)) {
      log.info("aws.msk.consumer.topic-name is blank, skip Kafka consumer initialization");
      return;
    }

    if (StrUtil.isBlank(groupId)) {
      log.info("aws.msk.group-id is blank, skip Kafka consumer initialization");
      return;
    }

    Properties consumerProps = new Properties();
    consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
    consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);

    consumerProps.put("security.protocol", "SASL_SSL");
    consumerProps.put("sasl.mechanism", "AWS_MSK_IAM");
    consumerProps.put("sasl.jaas.config", jaasConfig);
    consumerProps.put("sasl.client.callback.handler.class", "software.amazon.msk.auth.iam.IAMClientCallbackHandler");

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);

    consumerRunner = new KafkaConsumerRunner(consumer, consumerTopicName);
    consumerThread = new Thread(consumerRunner, "msk-consumer-thread");
    consumerThread.start();

    log.info("Kafka consumer started, groupId:{}, topicName:{}", groupId, consumerTopicName);
  }
}

八、IndexController

和你原来的 EMQX 示例一样,访问 HTTP 接口时发送一条 Kafka 消息。

package com.jobright.study.voice.agent.controller;

import com.litongjava.annotation.RequestPath;
import com.litongjava.tio.boot.admin.kafaka.KafkaProducerUtils;

@RequestPath("/kafaka/test")
public class KafakaTestController {

  @RequestPath
  public String index() {
    String content = "Hello AWS MSK";
    KafkaProducerUtils.send(content);
    return "Message sent to AWS MSK";
  }
}

如果你想传 JSON:

@RequestPath("/send")
public String send() {
  String json = "{\"event\":\"experience_verified\",\"status\":\"success\"}";
  KafkaProducerUtils.send(json);
  return "ok";
}

九、AWS 凭证怎么提供

上面的代码没有显式写 AK/SK,这是因为 aws-msk-iam-auth 会按 AWS Java 默认凭证链取凭证。实际项目里通常有这几种方式:

1. EC2 绑定 IAM Role

最推荐。

2. ECS / EKS 绑定任务角色

云上容器常用。

3. 本地开发用 AWS Profile

AWS 官方文档提到,可以在配置中附带 awsProfileName="your profile name"; 使用命名 profile。(AWS Documentation)

例如本地调试时可把 JAAS 改成:

producerProps.put("sasl.jaas.config",
    "software.amazon.msk.auth.iam.IAMLoginModule required awsProfileName=\"default\";");

consumer 同理。


十、IAM 权限策略

IAM 模式下,客户端权限必须显式授权。AWS 官方说明了:

  • 默认拒绝,必须显式允许
  • 生产至少要有 Connect、DescribeTopic、WriteData
  • 消费至少要有 Connect、DescribeTopic、DescribeGroup、AlterGroup、ReadData

同时,IAM 身份的授权由 IAM policy 决定,Kafka ACL 对 IAM 身份不起作用。(AWS Documentation)

一个常用的最小示例:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "kafka-cluster:Connect",
        "kafka-cluster:DescribeCluster"
      ],
      "Resource": [
        "arn:aws:kafka:us-west-2:123456789012:cluster/jrmskmvp/cluster-uuid"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
        "kafka-cluster:DescribeTopic",
        "kafka-cluster:WriteData",
        "kafka-cluster:ReadData"
      ],
      "Resource": [
        "arn:aws:kafka:us-west-2:123456789012:topic/jrmskmvp/cluster-uuid/jr_experience_verified_prod"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
        "kafka-cluster:DescribeGroup",
        "kafka-cluster:AlterGroup"
      ],
      "Resource": [
        "arn:aws:kafka:us-west-2:123456789012:group/jrmskmvp/cluster-uuid/jr_experience_verified_group"
      ]
    }
  ]
}

如果你的程序需要自动建 topic,还要加:

"kafka-cluster:CreateTopic"

AWS 文档也给出了 topic、group 这类 ARN 的格式。(AWS Documentation)


十一、如何获取正确的 bootstrap servers

你现在已经有:

aws.msk.bootstrap-servers=b-1....:9098,b-2....:9098

这可以直接用。但从 AWS 的推荐做法看,最好通过 MSK 控制台或 CLI 获取对应认证方式的 bootstrap brokers,并且连接串里尽量包含多个 AZ 的 broker,便于故障转移。(AWS Documentation)

CLI 方式:

aws kafka get-bootstrap-brokers \
  --cluster-arn arn:aws:kafka:us-west-2:123456789012:cluster/jrmskmvp/cluster-uuid

十二、如果要自动创建 Topic

你可以在启动时用 AdminClient 创建 topic。

package com.litongjava.kafka.admin;

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

public class KafkaAdminUtils {

  public static void createTopicIfNotExists(String bootstrapServers, String topicName) throws Exception {
    Properties props = new Properties();
    props.put("bootstrap.servers", bootstrapServers);
    props.put("security.protocol", "SASL_SSL");
    props.put("sasl.mechanism", "AWS_MSK_IAM");
    props.put("sasl.jaas.config", "software.amazon.msk.auth.iam.IAMLoginModule required;");
    props.put("sasl.client.callback.handler.class",
        "software.amazon.msk.auth.iam.IAMClientCallbackHandler");

    try (AdminClient adminClient = AdminClient.create(props)) {
      NewTopic newTopic = new NewTopic(topicName, 3, (short) 2);
      adminClient.createTopics(Collections.singletonList(newTopic)).all().get();
    }
  }
}

注意:

  • 自动建 topic 需要 IAM policy 里有 CreateTopic
  • 分区数、副本数要与你集群规模匹配

AWS 官方对创建 topic、写数据、读数据分别列出了所需权限。(AWS Documentation)


十三、完整发送示例

String content = "{\"uid\":123,\"event\":\"experience_verified\"}";
KafkaProducerUtils.send(content);

指定 topic:

KafkaProducerUtils.send("jr_experience_verified_prod",
    "{\"uid\":123,\"event\":\"experience_verified\"}");

十四、常见问题排查

1. TimeoutException / 连不上 broker

优先检查网络。

  • 应用是否和 MSK 在同一 VPC
  • 安全组是否放通
  • 路由是否可达

MSK 默认是私网接入,这类问题最常见。(AWS Documentation)

2. SaslAuthenticationException

优先检查 IAM 认证配置。

  • security.protocol=SASL_SSL
  • sasl.mechanism=AWS_MSK_IAM
  • aws-msk-iam-auth 依赖是否引入
  • 运行环境是否真有 AWS 凭证

这些配置项是 AWS 官方要求的 Java IAM 接入方式。(AWS Documentation)

3. TopicAuthorizationException / GroupAuthorizationException

大概率是 IAM policy 不完整。

AWS 官方明确说默认拒绝,需要显式授权;生产、消费、建 topic 都是不同权限。(AWS Documentation)

4. 消费不到历史消息

检查:

auto.offset.reset=earliest

并确认消费者 group 是否首次消费该 topic。


十五、和你当前配置对应的最简接入要点

基于你现在的配置:

aws.msk.experience-verified.topic-name=jr_experience_verified_prod
aws.msk.bootstrap-servers=b-1.jrmskmvp.kou1x9.c3.kafka.us-west-2.amazonaws.com:9098,b-2.jrmskmvp.kou1x9.c3.kafka.us-west-2.amazonaws.com:9098

最核心就是这 4 件事:

  1. bootstrap.servers 用你现有的地址
  2. Java 里加 aws-msk-iam-auth
  3. Producer / Consumer 都加 IAM 的 4 个配置项
  4. 给运行环境绑定能访问这个 topic 和 group 的 IAM 权限

十六、建议的项目目录

src/main/java
├── com.litongjava
│   ├── HelloApp.java
│   ├── kafka
│   │   ├── KafkaProducerUtils.java
│   │   ├── KafkaConsumerRunner.java
│   │   └── admin
│   │       └── KafkaAdminUtils.java
│   ├── kafka/config
│   │   └── MskClientConfig.java
│   └── kafka/controller
│       └── IndexController.java

十七、总结

这套接入方案的核心是:

  • tio-boot 负责应用启动和生命周期
  • Kafka Java Client 负责生产消费
  • aws-msk-iam-auth 负责 IAM 认证
  • IAM policy 负责授权
  • MSK 私网网络负责连通性
Edit this page
Last Updated: 4/2/26, 12:26 PM
Contributors: litongjava