Tio Boot DocsTio Boot Docs
Home
  • java-db
  • api-table
  • mysql
  • postgresql
  • oceanbase
  • Enjoy
  • Tio Boot Admin
  • ai_agent
  • translator
  • knowlege_base
  • ai-search
  • 案例
Abount
  • Github
  • Gitee
Home
  • java-db
  • api-table
  • mysql
  • postgresql
  • oceanbase
  • Enjoy
  • Tio Boot Admin
  • ai_agent
  • translator
  • knowlege_base
  • ai-search
  • 案例
Abount
  • Github
  • Gitee
  • 01_tio-boot 简介

    • tio-boot:新一代高性能 Java Web 开发框架
    • tio-boot 入门示例
    • Tio-Boot 配置 : 现代化的配置方案
    • tio-boot 整合 Logback
    • tio-boot 整合 hotswap-classloader 实现热加载
    • 自行编译 tio-boot
    • 最新版本
    • 开发规范
  • 02_部署

    • 使用 Maven Profile 实现分环境打包 tio-boot 项目
    • Maven 项目配置详解:依赖与 Profiles 配置
    • tio-boot 打包成 FatJar
    • 使用 GraalVM 构建 tio-boot Native 程序
    • 使用 Docker 部署 tio-boot
    • 部署到 Fly.io
    • 部署到 AWS Lambda
    • 到阿里云云函数
    • 使用 Deploy 工具部署
    • 使用Systemctl启动项目
    • 使用 Jenkins 部署 Tio-Boot 项目
    • 使用 Nginx 反向代理 Tio-Boot
    • 使用 Supervisor 管理 Java 应用
    • 已过时
    • 胖包与瘦包的打包与部署
  • 03_配置

    • 配置参数
    • 服务器监听器
    • 内置缓存系统 AbsCache
    • 使用 Redis 作为内部 Cache
    • 静态文件处理器
    • 基于域名的静态资源隔离
    • DecodeExceptionHandler
    • 开启虚拟线程(Virtual Thread)
    • 框架级错误通知
  • 04_原理

    • 生命周期
    • 请求处理流程
    • 重要的类
  • 05_json

    • Json
    • 接受 JSON 和响应 JSON
    • 响应实体类
  • 06_web

    • 概述
    • 接收请求参数
    • 接收日期参数
    • 接收数组参数
    • 返回字符串
    • 返回文本数据
    • 返回网页
    • 请求和响应字节
    • 文件上传
    • 文件下载
    • 返回视频文件并支持断点续传
    • http Session
    • Cookie
    • HttpRequest
    • HttpResponse
    • Resps
    • RespBodyVo
    • Controller拦截器
    • 请求拦截器
    • LoggingInterceptor
    • 全局异常处理器
    • 异步处理
    • 动态 返回 CSS 实现
    • 返回图片
    • 跨域
    • 添加 Controller
    • Transfer-Encoding: chunked 实时音频播放
    • Server-Sent Events (SSE)
    • handler入门
    • 返回 multipart
    • 待定
    • 自定义 Handler 转发请求
    • 使用 HttpForwardHandler 转发所有请求
    • 常用工具类
    • HTTP Basic 认证
    • Http响应加密
    • 使用零拷贝发送大文件
    • 分片上传
    • 接口访问统计
    • 接口请求和响应数据记录
    • WebJars
    • JProtobuf
    • 测速
    • Gzip Bomb:使用压缩炸弹防御恶意爬虫
  • 07_validate

    • 数据紧校验规范
    • 参数校验
  • 08_websocket

    • 使用 tio-boot 搭建 WebSocket 服务
    • WebSocket 聊天室项目示例
  • 09_java-db

    • java‑db
    • 操作数据库入门示例
    • SQL 模板 (SqlTemplates)
    • 数据源配置与使用
    • ActiveRecord
    • Db 工具类
    • 批量操作
    • Model
    • Model生成器
    • 注解
    • 异常处理
    • 数据库事务处理
    • Cache 缓存
    • Dialect 多数据库支持
    • 表关联操作
    • 复合主键
    • Oracle 支持
    • Enjoy SQL 模板
    • 整合 Enjoy 模板最佳实践
    • 多数据源支持
    • 独立使用 ActiveRecord
    • 调用存储过程
    • java-db 整合 Guava 的 Striped 锁优化
    • 生成 SQL
    • 通过实体类操作数据库
    • java-db 读写分离
    • Spring Boot 整合 Java-DB
    • like 查询
    • 常用操作示例
    • Druid 监控集成指南
    • SQL 统计
  • 10_api-table

    • ApiTable 概述
    • 使用 ApiTable 连接 SQLite
    • 使用 ApiTable 连接 Mysql
    • 使用 ApiTable 连接 Postgres
    • 使用 ApiTable 连接 TDEngine
    • 使用 api-table 连接 oracle
    • 使用 api-table 连接 mysql and tdengine 多数据源
    • EasyExcel 导出
    • EasyExcel 导入
    • 预留
    • 预留
    • ApiTable 实现增删改查
    • 数组类型
    • 单独使用 ApiTable
    • TQL(Table SQL)前端输入规范
  • 11_aop

    • JFinal-aop
    • Aop 工具类
    • 配置
    • 配置
    • 独立使用 JFinal Aop
    • @AImport
    • 自定义注解拦截器
    • 原理解析
  • 12_cache

    • Caffine
    • Jedis-redis
    • hutool RedisDS
    • Redisson
    • Caffeine and redis
    • CacheUtils 工具类
    • 使用 CacheUtils 整合 caffeine 和 redis 实现的两级缓存
    • 使用 java-db 整合 ehcache
    • 使用 java-db 整合 redis
    • Java DB Redis 相关 Api
    • redis 使用示例
  • 13_认证和权限

    • FixedTokenInterceptor
    • TokenManager
    • 数据表
    • 匿名登录
    • 注册和登录
    • 个人中心
    • 重置密码
    • Google 登录
    • 短信登录
    • 移动端微信登录
    • 移动端重置密码
    • 微信登录
    • 移动端微信登录
    • 权限校验注解
    • Sa-Token
    • sa-token 登录注册
    • StpUtil.isLogin() 源码解析
  • 14_i18n

    • i18n
  • 15_enjoy

    • tio-boot 整合 Enjoy 模版引擎文档
    • Tio-Boot 整合 Java-DB 与 Enjoy 模板引擎示例
    • 引擎配置
    • 表达式
    • 指令
    • 注释
    • 原样输出
    • Shared Method 扩展
    • Shared Object 扩展
    • Extension Method 扩展
    • Spring boot 整合
    • 独立使用 Enjoy
    • tio-boot enjoy 自定义指令 localeDate
    • PromptEngine
    • Enjoy 入门示例-擎渲染大模型请求体
    • Tio Boot + Enjoy:分页与 SEO 实战指南
    • Tio Boot + Enjoy:分页与 SEO 实战指南
    • Tio Boot + Enjoy:分页与 SEO 实战指南
  • 16_定时任务

    • Quartz 定时任务集成指南
    • 分布式定时任务 xxl-jb
    • cron4j 使用指南
  • 17_tests

    • TioBootTest 类
  • 18_tio

    • TioBootServer
    • 独立端口启动 TCP 服务器
    • 内置 TCP 处理器
    • 独立启动 UDPServer
    • 使用内置 UDPServer
    • t-io 消息处理流程
    • tio-运行原理详解
    • TioConfig
    • ChannelContext
    • Tio 工具类
    • 业务数据绑定
    • 业务数据解绑
    • 发送数据
    • 关闭连接
    • Packet
    • 监控: 心跳
    • 监控: 客户端的流量数据
    • 监控: 单条 TCP 连接的流量数据
    • 监控: 端口的流量数据
    • 单条通道统计: ChannelStat
    • 所有通道统计: GroupStat
    • 资源共享
    • 成员排序
    • SSL
    • DecodeRunnable
    • 使用 AsynchronousSocketChannel 响应数据
    • 拉黑 IP
    • 深入解析 Tio 源码:构建高性能 Java 网络应用
  • 19_aio

    • ByteBuffer
    • AIO HTTP 服务器
    • 自定义和线程池和池化 ByteBuffer
    • AioHttpServer 应用示例 IP 属地查询
    • 手写 AIO Http 服务器
  • 20_netty

    • Netty TCP Server
    • Netty Web Socket Server
    • 使用 protoc 生成 Java 包文件
    • Netty WebSocket Server 二进制数据传输
    • Netty 组件详解
  • 21_netty-boot

    • Netty-Boot
    • 原理解析
    • 整合 Hot Reload
    • 整合 数据库
    • 整合 Redis
    • 整合 Elasticsearch
    • 整合 Dubbo
    • Listener
    • 文件上传
    • 拦截器
    • Spring Boot 整合 Netty-Boot
    • SSL 配置指南
    • ChannelInitializer
    • Reserve
  • 22_MQ

    • Mica-mqtt
    • EMQX
    • Disruptor
  • 23_tio-utils

    • tio-utils
    • HttpUtils
    • Notification
    • Email
    • JSON
    • File
    • Base64
    • 上传和下载
    • Http
    • Telegram
    • RsaUtils
    • EnvUtils 配置工具
    • 系统监控
    • 线程
    • 虚拟线程
    • 毫秒并发 ID (MCID) 生成方案
  • 24_tio-http-server

    • 使用 Tio-Http-Server 搭建简单的 HTTP 服务
    • tio-boot 添加 HttpRequestHandler
    • 在 Android 上使用 tio-boot 运行 HTTP 服务
    • tio-http-server-native
    • handler 常用操作
  • 25_tio-websocket

    • WebSocket 服务器
    • WebSocket Client
    • TCP数据转发
  • 26_tio-im

    • 通讯协议文档
    • ChatPacket.proto 文档
    • java protobuf
    • 数据表设计
    • 创建工程
    • 登录
    • 历史消息
    • 发消息
  • 27_mybatis

    • Tio-Boot 整合 MyBatis
    • 使用配置类方式整合 MyBatis
    • 整合数据源
    • 使用 mybatis-plus 整合 tdengine
    • 整合 mybatis-plus
  • 28_mongodb

    • tio-boot 使用 mongo-java-driver 操作 mongodb
  • 29_elastic-search

    • Elasticsearch
    • JavaDB 整合 ElasticSearch
    • Elastic 工具类使用指南
    • Elastic-search 注意事项
    • ES 课程示例文档
  • 30_magic-script

    • tio-boot 与 magic-script 集成指南
  • 31_groovy

    • tio-boot 整合 Groovy
  • 32_firebase

    • 整合 google firebase
    • Firebase Storage
    • Firebase Authentication
    • 使用 Firebase Admin SDK 进行匿名用户管理与自定义状态标记
    • 导出用户
    • 注册回调
    • 登录注册
  • 33_文件存储

    • 文件上传数据表
    • 本地存储
    • 存储文件到 亚马逊 S3
    • 存储文件到 腾讯 COS
    • 存储文件到 阿里云 OSS
  • 34_spider

    • jsoup
    • 爬取 z-lib.io 数据
    • 整合 WebMagic
    • WebMagic 示例:爬取学校课程数据
    • Playwright
    • Flexmark (Markdown 处理器)
    • tio-boot 整合 Playwright
    • 缓存网页数据
  • 36_integration_thirty_party

    • 整合 okhttp
    • 整合 GrpahQL
    • 集成 Mailjet
    • 整合 ip2region
    • 整合 GeoLite 离线库
    • 整合 Lark 机器人指南
    • 集成 Lark Mail 实现邮件发送
    • Thymeleaf
    • Swagger
    • Clerk 验证
  • 37_dubbo

    • 概述
    • dubbo 2.6.0
    • dubbo 2.6.0 调用过程
    • dubbo 3.2.0
  • 38_spring

    • Spring Boot Web 整合 Tio Boot
    • spring-boot-starter-webflux 整合 tio-boot
    • tio-boot 整合 spring-boot-starter
    • Tio Boot 整合 Spring Boot Starter db
    • Tio Boot 整合 Spring Boot Starter Data Redis 指南
  • 39_spring-cloud

    • tio-boot spring-cloud
  • 40_quarkus

    • Quarkus(无 HTTP)整合 tio-boot(有 HTTP)
    • tio-boot + Quarkus + Hibernate ORM Panache
  • 41_postgresql

    • PostgreSQL 安装
    • PostgreSQL 主键自增
    • PostgreSQL 日期类型
    • Postgresql 金融类型
    • PostgreSQL 数组类型
    • 索引
    • PostgreSQL 查询优化
    • 获取字段类型
    • PostgreSQL 全文检索
    • PostgreSQL 向量
    • PostgreSQL 优化向量查询
    • PostgreSQL 其他
  • 42_mysql

    • 使用 Docker 运行 MySQL
    • 常见问题
  • 43_oceanbase

    • 快速体验 OceanBase 社区版
    • 快速上手 OceanBase 数据库单机部署与管理
    • 诊断集群性能
    • 优化 SQL 性能指南
    • 待定
  • 49_jooq

    • 使用配置类方式整合 jOOQ
    • tio-boot + jOOQ 事务管理
    • 批量操作与性能优化
    • 代码生成(可选)与类型安全升级
    • JSONB、Upsert、窗口函数实战
    • 整合agroal
  • 50_media

    • JAVE 提取视频中的声音
    • Jave 提取视频中的图片
    • 待定
  • 51_asr

    • Whisper-JNI
  • 54_native-media

    • java-native-media
    • JNI 入门示例
    • mp3 拆分
    • mp4 转 mp3
    • 使用 libmp3lame 实现高质量 MP3 编码
    • Linux 编译
    • macOS 编译
    • 从 JAR 包中加载本地库文件
    • 支持的音频和视频格式
    • 任意格式转为 mp3
    • 通用格式转换
    • 通用格式拆分
    • 视频合并
    • VideoToHLS
    • split_video_to_hls 支持其他语言
    • 持久化 HLS 会话
    • 获取视频长度
    • 保存视频的最后一帧
    • 添加水印
    • linux版本
  • 55_cv

    • 使用 Java 运行 YOLOv8 ONNX 模型进行目标检测
    • tio-boot整合yolo
    • ONNX Runtime 推理说明
  • 58_telegram4j

    • 数据库设计
    • 基于 HTTP 协议开发 Telegram 翻译机器人
    • 基于 MTProto 协议开发 Telegram 翻译机器人
    • 过滤旧消息
    • 保存机器人消息
    • 定时推送
    • 增加命令菜单
    • 使用 telegram-Client
    • 使用自定义 StoreLayout
    • 延迟测试
    • Reactor 错误处理
    • Telegram4J 常见错误处理指南
  • 59_telegram-bots

    • TelegramBots 入门指南
    • 使用工具库 telegram-bot-base 开发翻译机器人
  • 60_LLM

    • 简介
    • 流式生成
    • 图片多模态输入
    • 协议自动转换 Google Gemini示例
    • 请求记录
    • 限流和错误处理
    • 整合Gemini realtime模型
    • Voice Agent 前端接入接口文档
    • 整合千问realtime模型
    • 增强检索(RAG)
    • 搜索+AI
    • AI 问答
    • 连接代码执行器
  • 61_ai_agent

    • 数据库设计
    • 示例问题管理
    • 会话管理
    • 历史记录
    • Perplexity API
    • 意图识别
    • 智能问答
    • 文件上传与解析文档
    • 翻译
    • 名人搜索功能实现
    • Ai studio gemini youbue 问答使用说明
    • 自建 YouTube 字幕问答系统
    • 自建 获取 youtube 字幕服务
    • 使用 OpenAI ASR 实现语音识别接口(Java 后端示例)
    • 定向搜索
    • 16
    • 17
    • 18
    • 在 tio-boot 应用中整合 ai-agent
    • 16
  • 63_knowlege_base

    • 数据库设计
    • 用户登录实现
    • 模型管理
    • 知识库管理
    • 文档拆分
    • 片段向量
    • 命中测试
    • 文档管理
    • 片段管理
    • 问题管理
    • 应用管理
    • 向量检索
    • 推理问答
    • 问答模块
    • 统计分析
    • 用户管理
    • api 管理
    • 存储文件到 S3
    • 文档解析优化
    • 片段汇总
    • 段落分块与检索
    • 多文档解析
    • 对话日志
    • 检索性能优化
    • Milvus
    • 文档解析方案和费用对比
    • 离线运行向量模型
  • 64_ai-search

    • ai-search 项目简介
    • ai-search 数据库文档
    • ai-search SearxNG 搜索引擎
    • ai-search Jina Reader API
    • ai-search Jina Search API
    • ai-search 搜索、重排与读取内容
    • ai-search PDF 文件处理
    • ai-search 推理问答
    • Google Custom Search JSON API
    • ai-search 意图识别
    • ai-search 问题重写
    • ai-search 系统 API 接口 WebSocket 版本
    • ai-search 搜索代码实现 WebSocket 版本
    • ai-search 生成建议问
    • ai-search 生成问题标题
    • ai-search 历史记录
    • Discover API
    • 翻译
    • Tavily Search API 文档
    • 对接 Tavily Search
    • 火山引擎 DeepSeek
    • 对接 火山引擎 DeepSeek
    • ai-search 搜索代码实现 SSE 版本
    • jar 包部署
    • Docker 部署
    • 爬取一个静态网站的所有数据
    • 网页数据预处理
    • 网页数据检索与问答流程整合
  • 65_ai-coding

    • Cline 提示词
    • Cline 提示词-中文版本
  • 66_java-uni-ai-server

    • 语音合成系统
    • Fish.audio TTS 接口说明文档与 Java 客户端封装
    • 整合 fishaudio 到 java-uni-ai-server 项目
    • 待定
  • 67_java-llm-proxy

    • 使用tio-boot搭建多模型LLM代理服务
  • 68_java-kit-server

    • Java 执行 python 代码
    • 通过大模型执行 Python 代码
    • 执行 Python (Manim) 代码
    • 待定
    • 待定
    • 待定
    • 视频下载增加水印说明文档
  • 69_ai-brower

    • AI Browser:基于用户指令的浏览器自动化系统
    • 提示词
    • dom构建- buildDomTree.js
    • dom构建- 将网页可点击元素提取与可视化
    • 提取网内容
    • 启动浏览器
    • 操作浏览器指令
  • 70_tio-boot-admin

    • 入门指南
    • 初始化数据
    • token 存储
    • 与前端集成
    • 文件上传
    • 网络请求
    • 多图片管理
    • 单图片管理(只读模式)
    • 布尔值管理
    • 字段联动
    • Word 管理
    • PDF 管理
    • 文章管理
    • 富文本编辑器
  • 73_tio-mail-wing

    • tio-mail-wing简介
    • 任务1:实现POP3系统
    • 使用 getmail 验证 tio-mail-wing POP3 服务
    • 任务2:实现 SMTP 服务
    • 数据库初始化文档
    • 用户管理
    • 邮件管理
    • 任务3:实现 SMTP 服务 数据库版本
    • 任务4:实现 POP3 服务(数据库版本)
    • IMAP 协议
    • 拉取多封邮件
    • 任务5:实现 IMAP 服务(数据库版本)
    • IMAP实现讲解
    • IMAP 手动测试脚本
    • IMAP 认证机制
    • 主动推送
  • 74_tio-mcp-server

    • 实现 MCP Server 开发指南
  • 75_tio-sip

    • SIP Server 第一版原理说明
    • SIP Server 第一版实战
    • 使用livekit-sip进行测试
    • SIP Server 第二版实战
    • SIP Server 第三版实战
    • 性能优化
    • 基于 MediaProcessor 对接 Realtime 模型说明
    • 对接大语言模型
    • 支持 G722 宽带语音
    • 一、SIP 是否只支持 8k
  • 76_manim

    • Teach me anything - 基于大语言的知识点讲解视频生成系统
    • Manim 开发环境搭建
    • 生成场景提示词
    • 生成代码
    • 完整脚本示例
    • TTS服务端
    • 废弃
    • 废弃
    • 废弃
    • 使用 SSE 流式传输生成进度的实现文档
    • 整合全流程完整文档
    • HLS 动态推流技术文档
    • manim 分场景生成代码
    • 分场景运行代码及流式播放支持
    • 分场景业务端完整实现流程
    • Maiim布局管理器
    • 仅仅生成场景代码
    • 使用 modal 运行 manim 代码
    • Python 使用 Modal GPU 加速渲染
    • Modal 平台 GPU 环境下运行 Manim
    • Modal Manim OpenGL 安装与使用
    • 优化 GPU 加速
    • 生成视频封面流程
    • Java 调用 manim 命令 执行代码 生成封面
    • Manim 图像生成服务客户端文档
    • manim render help
    • 显示 中文公式
    • ManimGL(manimgl)
    • Manim 实战入门:用代码创造数学动画
    • 欢迎
  • 80_性能测试

    • 压力测试 - tio-http-serer
    • 压力测试 - tio-boot
    • 压力测试 - tio-boot-native
    • 压力测试 - netty-boot
    • 性能测试对比
    • TechEmpower FrameworkBenchmarks
    • 压力测试 - tio-boot 12 C 32G
    • HTTP/1.1 Pipelining 性能测试报告
    • tio-boot vs Quarkus 性能对比测试报告
  • 81_tio-boot

    • 简介
    • Swagger 整合到 Tio-Boot 中的指南
    • 待定
    • 待定
    • 高性能网络编程中的 ByteBuffer 分配与回收策略
    • TioBootServerHandler 源码解析
  • 99_案例

    • 封装 IP 查询服务
    • tio-boot 案例 - 全局异常捕获与企业微信群通知
    • tio-boot 案例 - 文件上传和下载
    • tio-boot 案例 - 整合 ant design pro 增删改查
    • tio-boot 案例 - 流失响应
    • tio-boot 案例 - 增强检索
    • tio-boot 案例 - 整合 function call
    • tio-boot 案例 - 定时任务 监控 PostgreSQL、Redis 和 Elasticsearch
    • Tio-Boot 案例:使用 SQLite 整合到登录注册系统
    • tio-boot 案例 - 执行 shell 命令

对接大语言模型

背景

原来的 SIP Server 已经把通话底座全部打通了,SIP 建链、SDP 协商、RTP 收发、G711 编解码这些底层能力都已经稳定存在。系统里真正留给业务扩展的点只有一个,就是 MediaProcessor。默认实现只是把收到的音频原样返回,所以表现出来就是回声。现在要做的事情,是把这个“回声处理器”替换成“实时模型处理器”,让通话中的语音不再直接回显,而是先送进 Realtime 模型,再把模型生成的语音回送给通话对端。

目标

目标也很清楚:在不改动 SIP、SDP、RTP、Codec 这些底层通信逻辑的前提下,把现有通话链路接到 Qwen Realtime 模型上。换句话说,电话侧继续只关心 8k PCM 音频帧,模型侧继续只关心它自己的实时音频接口,而中间这层新实现负责把两边接起来,包括采样率转换、异步发送、音频缓冲和回填。这样做的价值是结构非常干净,SIP 服务器继续做它擅长的实时音频传输,模型桥接层继续做它擅长的多轮实时生成,两边通过 RealtimeMediaProcessor 耦合,职责边界很清晰。

在原有链路中插入了一个“实时语音适配层”。电话进来的音频还是按原来的方式被 RTP 解码成 8k PCM 的 AudioFrame,然后交给 RealtimeMediaProcessor。它先把 8k 音频转换成 16k,因为接入的 Realtime 模型希望收到更高采样率的输入;接着通过 RealtimeModelBridge 把音频发给模型;模型返回的下行音频则先进入缓冲区,再被重采样回 8k,最后重新包装成 AudioFrame 返回给 SIP/RTP 链路。对现有系统来说,它看到的仍然只是“输入一个 AudioFrame,输出一个 AudioFrame”,所以原有框架几乎不用动。

改造思路

RealtimeMediaProcessor 是这次改造的核心类。它实现了 MediaProcessor 接口,所以可以直接替换掉原来的 EchoMediaProcessor。它的职责不是直接“理解语音”,而是做一层适配和调度。上游给它的是 SIP 通话里的 8k 音频帧,下游它连接的是 Realtime 模型桥接对象。它在一次 process() 调用里做三件事: 第一,确保当前通话已经建立对应的实时模型会话; 第二,把当前收到的 8k 音频转成 16k,并异步送给模型; 第三,从模型下行音频缓冲区取出一帧已经准备好的 8k 音频返回给对端。如果当前模型还没返回数据,它就返回 null,表示这一时刻不发音频。 这个类本质上是“RTP 帧处理接口”和“实时模型接口”之间的总控器。

SessionState 是每一路通话对应的内部状态对象。因为 SIP 通话不是单例,可能同时存在多路呼叫,所以不能把所有状态放在 RealtimeMediaProcessor 的全局变量里混着用。这个类的意义就是把“某个 callId 的模型连接、下行音频队列、连接状态”等资源组织在一起。可以把它理解成“一路电话和一路实时模型会话的一一绑定关系”。它维护了当前通话的 RealtimeModelBridge,并且负责保存模型返回的音频数据。下行音频先进入它的缓冲队列,等到下一次 process() 被调用时,再从队列里取出对应长度的一帧回给 RTP。这样就把“模型返回的异步性”和“RTP 每 20ms 拉一帧的同步节奏”平滑地衔接起来了。

SipRealtimeBridgeCallback 的作用是把 RealtimeModelBridge 的回调结果喂回 SessionState。因为桥接层本身不是直接返回一个 AudioFrame,它是通过回调的方式通知“模型有新音频了”或“有文本事件了”。在 WebSocket 场景下,这种回调会把内容发给浏览器;而在 SIP 场景下,不需要把这些内容发到网页,而是要把模型返回的二进制音频塞进当前通话的下行缓冲里。这个类正是干这个事情的。它接收桥接层返回的文本和音频,其中音频部分会被送进 SessionState 的输出队列,供 RealtimeMediaProcessor 后续取走。也就是说,这个回调类把“模型事件流”转换成了“电话通话可消费的音频流”。

RealtimeModelBridge 不是这次新发明的类,而是现有系统里已经定义好的统一桥接接口。它的意义非常大,因为它把上层业务和底层具体模型平台解耦了。RealtimeMediaProcessor 不需要知道自己对接的是 Gemini 还是 Qwen,也不需要知道底层 SDK 的具体方法叫什么,它只知道:建立连接、发送 16k PCM、结束音频输入、发送文本、关闭连接,这些动作可以通过统一接口完成。这样一来,SIP 场景和 WebSocket 场景都可以复用同一套桥接抽象,后续要切换模型平台时,也不用改媒体处理逻辑。

RealtimeModelBridgeFactory 的作用是根据配置创建实际的桥接实现。它是一个工厂角色。因为统一接口只是抽象,真正运行时还是要落到某个具体实现上,比如 QwenOmniRealtimeBridge。工厂的价值在于把“选择哪个平台”的逻辑集中起来。RealtimeMediaProcessor 调用工厂拿到桥接对象后,就不再关心底层是百炼还是其他平台。这样后续如果需要扩展更多模型平台,也只是工厂里增加分支,而不影响媒体处理主逻辑。

RealtimeSetup 的作用是承载会话启动时的初始配置。它不是处理音频流本身的,而是告诉模型“是谁、的角色是什么、开场白是什么、有哪些业务背景”。例如系统提示词、岗位描述、简历内容、问题列表、欢迎语,这类信息都应该在建会话时一起传给模型。对于 SIP 电话场景来说,这意味着每一路电话一接通,就可以给模型建立一个带业务上下文的实时会话,而不是一个什么都不知道的裸模型连接。

QwenOmniRealtimeBridge 是真正对接 DashScope Realtime SDK 的实现类。它的职责是把统一桥接接口的调用翻译成阿里云 Qwen Realtime 所要求的协议。比如上行音频要做 Base64,输入格式是 PCM16,输出音频事件里返回的是 Base64 编码的 PCM,服务端还会抛出各种生命周期事件和转写事件。这个类负责管理 WebSocket 连接、更新 session、append 音频、处理模型回调事件,并通过统一的 RealtimeBridgeCallback 向上层交付结果。对 SIP 场景来说,它就是“模型连接器”;对 WebSocket 场景来说,它也是“模型连接器”。这说明的桥接层设计已经很通用了。

AudioResampler 的作用是做采样率转换。这个类虽然小,但它其实是整个链路能跑通的关键之一。因为电话侧是 8k,模型输入通常要 16k,模型输出又往往是 24k,如果没有重采样,就没法在两边之间直接对接。它的职责就是把一段 PCM 样本从一个采样率映射到另一个采样率。现在的实现属于轻量级线性插值方案,优点是依赖少、实现简单、足够支撑语音通话场景。后面如果对音质要求更高,也可以替换成更专业的重采样器,但在架构上它仍然扮演同样的角色。

PcmCodec 的作用是做 PCM short 数组和 little-endian 字节数组之间的转换。这个类的存在是因为 SIP 侧更习惯处理 short[] 形式的采样,而桥接层和 SDK 往往更习惯传输 byte[]。它本质上是数据表示层的转换工具,让音频在不同模块之间可以顺畅流动。它不是业务核心,但没有它,音频数据就很难在 RTP 层和模型层之间交换。

从整个调用时序来看,也可以这样理解:当一通电话进来后,系统第一次调用 RealtimeMediaProcessor.process() 时,会基于当前 callId 创建一个 SessionState,再通过工厂创建实际的 RealtimeModelBridge,并用 RealtimeSetup 去连接模型。之后每次来一帧 RTP 音频,处理器都会把它上采样后送给模型。模型返回的音频不会立刻直接“同步返回”,而是先进回调,再进入当前会话的输出队列。等下一次 RTP 处理循环到来时,处理器从队列中取出足够的一帧数据,重采样回 8k 后交给 RTP 发回去。这样就形成了一个持续运转的实时双向语音回路。

所以这次改造的本质可以概括成一句话:没有改电话系统本身,而是在 MediaProcessor 这一层,把“本地回声”升级成了“外部实时模型驱动的语音处理”。背景是已有 SIP 链路已经成熟,目标是最小侵入地接入 Realtime 模型,而各个类则分别承担了通话会话管理、模型桥接、回调接收、音频转换和缓冲调度等不同职责,最终共同把电话网络和实时大模型连接到了一起。

实战代码

实现思路是:

  • SIP/RTP 侧每次进来的是 8k / PCM16 / 20ms
  • process() 中把 8k 上采样到 16k
  • 通过现有的 RealtimeModelBridge 异步送给 Qwen Realtime
  • Qwen 下行音频通过 RealtimeBridgeCallback.sendBinary(...) 回来
  • 下行默认按 24k PCM16 处理,再重采样回 8k
  • process() 每次从缓冲队列里取一帧 8k 音频返回给 RTP
  • 如果当前还没有模型音频,返回 null

代码里我把依赖都按现有工程结构组织好了,并且没有改 RealtimeModelBridge 的接口。

AudioResampler

package com.litongjava.sip.rtp.codec;

import java.util.Arrays;
import java.util.Objects;

/**
 * 简单线性插值重采样
 * 适合当前语音链路对接,依赖少、易落地
 */
public final class AudioResampler {

  private AudioResampler() {
  }

  public static short[] resample(short[] input, int srcRate, int dstRate) {
    Objects.requireNonNull(input, "input");
    if (input.length == 0) {
      return new short[0];
    }
    if (srcRate <= 0 || dstRate <= 0) {
      throw new IllegalArgumentException("sample rate must be > 0");
    }
    if (srcRate == dstRate) {
      return Arrays.copyOf(input, input.length);
    }

    double ratio = (double) dstRate / (double) srcRate;
    int outputLength = Math.max(1, (int) Math.round(input.length * ratio));
    short[] output = new short[outputLength];

    for (int i = 0; i < outputLength; i++) {
      double srcIndex = i / ratio;
      int left = (int) Math.floor(srcIndex);
      int right = Math.min(left + 1, input.length - 1);
      double frac = srcIndex - left;

      double sample = input[left] * (1.0 - frac) + input[right] * frac;
      output[i] = clampToShort(sample);
    }
    return output;
  }

  private static short clampToShort(double v) {
    if (v > Short.MAX_VALUE) {
      return Short.MAX_VALUE;
    }
    if (v < Short.MIN_VALUE) {
      return Short.MIN_VALUE;
    }
    return (short) Math.round(v);
  }
}

PcmCodec

package com.litongjava.sip.rtp.codec;

import java.nio.ByteBuffer;
import java.nio.ByteOrder;

/**
 * PCM 编解码工具
 */
public final class PcmCodec {

  private PcmCodec() {
  }

  public static byte[] shortsToLittleEndianBytes(short[] samples) {
    if (samples == null || samples.length == 0) {
      return new byte[0];
    }

    ByteBuffer buffer = ByteBuffer.allocate(samples.length * 2).order(ByteOrder.LITTLE_ENDIAN);
    for (short s : samples) {
      buffer.putShort(s);
    }
    return buffer.array();
  }

  public static short[] littleEndianBytesToShorts(byte[] bytes) {
    if (bytes == null || bytes.length < 2) {
      return new short[0];
    }

    int len = bytes.length / 2;
    short[] out = new short[len];
    ByteBuffer buffer = ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN);
    for (int i = 0; i < len; i++) {
      out[i] = buffer.getShort();
    }
    return out;
  }
}

RealtimeSetupFactory

package com.litongjava.voice.agent.sip;

import com.litongjava.template.PromptEngine;
import com.litongjava.voice.agent.bridge.RealtimeSetup;

public final class RealtimeSetupFactory {

  private RealtimeSetupFactory() {

  }

  public static RealtimeSetup buildFromEnv() {
    String systemPrompt = PromptEngine.renderToString("VOICE_AGENT_SYSTEM_PROMPT");
    String userPrompt = PromptEngine.renderToString("VOICE_AGENT_USER_PROMPT");
    String jobDescription = PromptEngine.renderToString("VOICE_AGENT_JOB_DESCRIPTION");
    String resume = PromptEngine.renderToString("VOICE_AGENT_RESUME");
    String questions = PromptEngine.renderToString("VOICE_AGENT_QUESTIONS");
    String greeting = PromptEngine.renderToString("VOICE_AGENT_GREETING");

    return new RealtimeSetup(systemPrompt, userPrompt, jobDescription, resume, questions, greeting);
  }
}

RealtimeSetupCallback

package com.litongjava.voice.agent.callback;

import com.litongjava.sip.model.CallSession;
import com.litongjava.voice.agent.bridge.RealtimeSetup;

public interface RealtimeSetupCallback {

  RealtimeSetup getRealtimeSetup(CallSession session);
}

SipRealtimeSetupCallback

package com.litongjava.voice.agent.sip;

import com.litongjava.sip.model.CallSession;
import com.litongjava.voice.agent.bridge.RealtimeSetup;
import com.litongjava.voice.agent.callback.RealtimeSetupCallback;

public class SipRealtimeSetupCallback implements RealtimeSetupCallback {

  @Override
  public RealtimeSetup getRealtimeSetup(CallSession session) {
    return RealtimeSetupFactory.buildFromEnv();
  }
}

SipRealtimeSession

package com.litongjava.voice.agent.sip;

import java.util.Arrays;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;

import com.litongjava.sip.model.CallSession;
import com.litongjava.sip.rtp.codec.AudioResampler;
import com.litongjava.sip.rtp.codec.PcmCodec;
import com.litongjava.voice.agent.bridge.RealtimeModelBridge;
import com.litongjava.voice.agent.bridge.RealtimeSetup;
import com.litongjava.voice.agent.callback.RealtimeSetupCallback;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class SipRealtimeSession {

  private static final int MODEL_OUTPUT_SAMPLE_RATE = 24000;
  private static final int SIP_SAMPLE_RATE = 8000;

  private final String callId;
  private final RealtimeModelBridge bridge;
  private final SipRealtimeBridgeCallback callback;
  private final RealtimeSetupCallback realtimeSetupCallback;
  private final AtomicBoolean connected = new AtomicBoolean(false);
  private final ConcurrentLinkedQueue<Short> outputQueue = new ConcurrentLinkedQueue<>();

  public SipRealtimeSession(String callId, RealtimeModelBridge bridge, SipRealtimeBridgeCallback callback,
      RealtimeSetupCallback realtimeSetupCallback) {
    this.callId = callId;
    this.bridge = bridge;
    this.callback = callback;
    this.realtimeSetupCallback = realtimeSetupCallback;
  }

  public void ensureConnected(CallSession session) {
    if (connected.compareAndSet(false, true)) {
      RealtimeSetup setup = null;
      if (realtimeSetupCallback != null) {
        setup = realtimeSetupCallback.getRealtimeSetup(null);
      }
      callback.start(setup);
      bridge.connect(setup).exceptionally(ex -> {
        log.error("bridge connect failed, callId={}", callId, ex);
        connected.set(false);
        return null;
      });
    }
  }

  public void sendToModel(byte[] pcm16kBytes) {
    if (pcm16kBytes == null || pcm16kBytes.length == 0) {
      return;
    }

    bridge.sendPcm16k(pcm16kBytes).exceptionally(ex -> {
      log.warn("sendPcm16k failed, callId={}", callId, ex);
      return null;
    });
  }

  public void appendModelAudio(byte[] pcmBytes) {
    if (pcmBytes == null || pcmBytes.length == 0) {
      return;
    }

    short[] pcm24k = PcmCodec.littleEndianBytesToShorts(pcmBytes);
    if (pcm24k.length == 0) {
      return;
    }

    short[] pcm8k = AudioResampler.resample(pcm24k, MODEL_OUTPUT_SAMPLE_RATE, SIP_SAMPLE_RATE);
    for (short sample : pcm8k) {
      outputQueue.offer(sample);
    }
  }

  public short[] takeOutputFrame(int frameSamples) {
    if (frameSamples <= 0) {
      return null;
    }

    if (outputQueue.peek() == null) {
      return null;
    }

    short[] out = new short[frameSamples];
    int i = 0;
    for (; i < frameSamples; i++) {
      Short value = outputQueue.poll();
      if (value == null) {
        break;
      }
      out[i] = value;
    }

    if (i < frameSamples) {
      Arrays.fill(out, i, frameSamples, (short) 0);
    }

    return out;
  }

  public void close() {
    try {
      bridge.close().getNow(null);
    } catch (Exception e) {
      log.warn("bridge.close failed, callId={}", callId, e);
    } finally {
      outputQueue.clear();
      connected.set(false);
    }
  }

  public String getCallId() {
    return callId;
  }
}

SipSessionRegistry

package com.litongjava.voice.agent.sip;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

public class SipSessionRegistry {

  private final Map<String, SipRealtimeSession> sessions = new ConcurrentHashMap<>();

  public SipRealtimeSession getOrCreate(String callId, Function<String, SipRealtimeSession> creator) {
    return sessions.computeIfAbsent(callId, creator);
  }

  public SipRealtimeSession get(String callId) {
    return sessions.get(callId);
  }

  public void remove(String callId) {
    SipRealtimeSession session = sessions.remove(callId);
    if (session != null) {
      session.close();
    }
  }

  public void clear() {
    for (SipRealtimeSession session : sessions.values()) {
      if (session != null) {
        session.close();
      }
    }
    sessions.clear();
  }
}

SipRealtimeBridgeCallback

package com.litongjava.voice.agent.sip;

import com.litongjava.tio.utils.hutool.StrUtil;
import com.litongjava.voice.agent.bridge.RealtimeBridgeCallback;
import com.litongjava.voice.agent.bridge.RealtimeSetup;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class SipRealtimeBridgeCallback implements RealtimeBridgeCallback {

  private final String callId;
  private volatile SipRealtimeSession sipSession;

  public SipRealtimeBridgeCallback(String callId) {
    this.callId = callId;
  }

  public void bind(SipRealtimeSession sipSession) {
    this.sipSession = sipSession;
  }

  @Override
  public void start(RealtimeSetup setup) {
    log.info("realtime callback start, callId={}", callId);
  }

  @Override
  public void sendText(String text) {
    if (StrUtil.isNotBlank(text)) {
      log.debug("realtime text event, callId={}, text={}", callId, text);
    }
  }

  @Override
  public void sendBinary(byte[] bytes) {
    SipRealtimeSession session = this.sipSession;
    if (session == null || bytes == null || bytes.length == 0) {
      return;
    }
    session.appendModelAudio(bytes);
  }

  @Override
  public void close(String reason) {
    log.info("realtime callback close, callId={}, reason={}", callId, reason);
  }

  @Override
  public void session(String sessionId) {
    // TODO Auto-generated method stub
    
  }

  @Override
  public void turnComplete(String role, String text) {
    // TODO Auto-generated method stub
    
  }
}

RealtimeMediaProcessor

package com.litongjava.voice.agent.sip;

import com.litongjava.sip.model.CallSession;
import com.litongjava.sip.rtp.codec.AudioResampler;
import com.litongjava.sip.rtp.codec.PcmCodec;
import com.litongjava.sip.rtp.media.AudioFrame;
import com.litongjava.sip.rtp.media.MediaProcessor;
import com.litongjava.tio.utils.environment.EnvUtils;
import com.litongjava.tio.utils.hutool.StrUtil;
import com.litongjava.voice.agent.bridge.RealtimeModelBridge;
import com.litongjava.voice.agent.bridge.RealtimeModelBridgeFactory;
import com.litongjava.voice.agent.callback.RealtimeSetupCallback;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class RealtimeMediaProcessor implements MediaProcessor {

  private final String platform;
  private final RealtimeSetupCallback realtimeSetupCallback;
  private final SipSessionRegistry sessionRegistry;

  public RealtimeMediaProcessor(RealtimeSetupCallback realtimeSetupCallback) {
    this(EnvUtils.getStr("vioce.agent.platform"), realtimeSetupCallback, new SipSessionRegistry());
  }

  public RealtimeMediaProcessor(String platform, RealtimeSetupCallback realtimeSetupCallback,
      SipSessionRegistry sessionRegistry) {
    this.platform = platform;
    this.realtimeSetupCallback = realtimeSetupCallback;
    this.sessionRegistry = sessionRegistry;
  }

  @Override
  public AudioFrame process(AudioFrame input, CallSession session) {
    if (input == null || session == null) {
      return null;
    }

    String callId = getCallId(session);
    if (StrUtil.isBlank(callId)) {
      log.warn("callId is blank, skip processing");
      return null;
    }

    SipRealtimeSession sipSession = sessionRegistry.getOrCreate(callId, this::createSipSession);

    try {
      sipSession.ensureConnected(session);
      short[] in8k = input.getSamples();
      if (in8k == null || in8k.length == 0) {
        return null;
      }

      short[] pcm16k = AudioResampler.resample(in8k, 8000, 16000);
      byte[] pcm16kBytes = PcmCodec.shortsToLittleEndianBytes(pcm16k);

      sipSession.sendToModel(pcm16kBytes);

      short[] out8k = sipSession.takeOutputFrame(in8k.length);
      if (out8k == null) {
        return null;
      }

      return new AudioFrame(out8k, 8000, 1, input.getRtpTimestamp());
    } catch (Exception e) {
      log.error("process failed, callId={}", callId, e);
      return null;
    }
  }

  public void close(CallSession session) {
    if (session == null) {
      return;
    }
    closeByCallId(getCallId(session));
  }

  public void closeByCallId(String callId) {
    if (StrUtil.isBlank(callId)) {
      return;
    }
    sessionRegistry.remove(callId);
  }

  public void closeAll() {
    sessionRegistry.clear();
  }

  private SipRealtimeSession createSipSession(String callId) {
    SipRealtimeBridgeCallback callback = new SipRealtimeBridgeCallback(callId);
    RealtimeModelBridge bridge = RealtimeModelBridgeFactory.createBridge(platform, callback);
    SipRealtimeSession sipSession = new SipRealtimeSession(callId, bridge, callback, realtimeSetupCallback);
    callback.bind(sipSession);
    log.info("created realtime sip session, callId={}", callId);
    return sipSession;
  }

  private String getCallId(CallSession session) {
    try {
      String callId = session.getCallId();
      return callId == null ? null : callId.trim();
    } catch (Exception e) {
      log.warn("failed to get callId from CallSession", e);
      return null;
    }
  }
}

SipServerConfig

现在在 SipServerConfig 里,把这段:

MediaProcessor echoMediaProcessor = new EchoMediaProcessor();

替换成:

MediaProcessor mediaProcessor = new RealtimeMediaProcessor(sipRealtimeSetupCallback);

再把后面的 echoMediaProcessor 改成 mediaProcessor 即可:

SipTcpServerHandler tcpHandler = new SipTcpServerHandler(
    localIp, sessionManager, rtpServerManager, mediaProcessor);

SipUdpServerHandler udpHandler = new SipUdpServerHandler(
    localIp, sessionManager, rtpServerManager, mediaProcessor);
Edit this page
Last Updated: 3/9/26, 3:43 PM
Contributors: litongjava
Prev
基于 MediaProcessor 对接 Realtime 模型说明
Next
支持 G722 宽带语音