Tio Boot DocsTio Boot Docs
Home
  • java-db
  • api-table
  • mysql
  • postgresql
  • oceanbase
  • Enjoy
  • Tio Boot Admin
  • ai_agent
  • translator
  • knowlege_base
  • ai-search
  • 案例
Abount
  • Github
  • Gitee
Home
  • java-db
  • api-table
  • mysql
  • postgresql
  • oceanbase
  • Enjoy
  • Tio Boot Admin
  • ai_agent
  • translator
  • knowlege_base
  • ai-search
  • 案例
Abount
  • Github
  • Gitee
  • 01_tio-boot 简介

    • tio-boot:新一代高性能 Java Web 开发框架
    • tio-boot 入门示例
    • Tio-Boot 配置 : 现代化的配置方案
    • tio-boot 整合 Logback
    • tio-boot 整合 hotswap-classloader 实现热加载
    • 自行编译 tio-boot
    • 最新版本
    • 开发规范
  • 02_部署

    • 使用 Maven Profile 实现分环境打包 tio-boot 项目
    • Maven 项目配置详解:依赖与 Profiles 配置
    • tio-boot 打包成 FatJar
    • 使用 GraalVM 构建 tio-boot Native 程序
    • 使用 Docker 部署 tio-boot
    • 部署到 Fly.io
    • 部署到 AWS Lambda
    • 到阿里云云函数
    • 使用 Deploy 工具部署
    • 使用Systemctl启动项目
    • 使用 Jenkins 部署 Tio-Boot 项目
    • 使用 Nginx 反向代理 Tio-Boot
    • 使用 Supervisor 管理 Java 应用
    • 已过时
    • 胖包与瘦包的打包与部署
  • 03_配置

    • 配置参数
    • 服务器监听器
    • 内置缓存系统 AbsCache
    • 使用 Redis 作为内部 Cache
    • 静态文件处理器
    • 基于域名的静态资源隔离
    • DecodeExceptionHandler
    • 开启虚拟线程(Virtual Thread)
    • 框架级错误通知
  • 04_原理

    • 生命周期
    • 请求处理流程
    • 重要的类
  • 05_json

    • Json
    • 接受 JSON 和响应 JSON
    • 响应实体类
  • 06_web

    • 概述
    • 接收请求参数
    • 接收日期参数
    • 接收数组参数
    • 返回字符串
    • 返回文本数据
    • 返回网页
    • 请求和响应字节
    • 文件上传
    • 文件下载
    • 返回视频文件并支持断点续传
    • http Session
    • Cookie
    • HttpRequest
    • HttpResponse
    • Resps
    • RespBodyVo
    • Controller拦截器
    • 请求拦截器
    • LoggingInterceptor
    • 全局异常处理器
    • 异步处理
    • 动态 返回 CSS 实现
    • 返回图片
    • 跨域
    • 添加 Controller
    • Transfer-Encoding: chunked 实时音频播放
    • Server-Sent Events (SSE)
    • handler入门
    • 返回 multipart
    • 待定
    • 自定义 Handler 转发请求
    • 使用 HttpForwardHandler 转发所有请求
    • 常用工具类
    • HTTP Basic 认证
    • Http响应加密
    • 使用零拷贝发送大文件
    • 分片上传
    • 接口访问统计
    • 接口请求和响应数据记录
    • WebJars
    • JProtobuf
    • 测速
    • Gzip Bomb:使用压缩炸弹防御恶意爬虫
  • 07_validate

    • 数据紧校验规范
    • 参数校验
  • 08_websocket

    • 使用 tio-boot 搭建 WebSocket 服务
    • WebSocket 聊天室项目示例
  • 09_java-db

    • java‑db
    • 操作数据库入门示例
    • SQL 模板 (SqlTemplates)
    • 数据源配置与使用
    • ActiveRecord
    • Db 工具类
    • 批量操作
    • Model
    • Model生成器
    • 注解
    • 异常处理
    • 数据库事务处理
    • Cache 缓存
    • Dialect 多数据库支持
    • 表关联操作
    • 复合主键
    • Oracle 支持
    • Enjoy SQL 模板
    • 整合 Enjoy 模板最佳实践
    • 多数据源支持
    • 独立使用 ActiveRecord
    • 调用存储过程
    • java-db 整合 Guava 的 Striped 锁优化
    • 生成 SQL
    • 通过实体类操作数据库
    • java-db 读写分离
    • Spring Boot 整合 Java-DB
    • like 查询
    • 常用操作示例
    • Druid 监控集成指南
    • SQL 统计
  • 10_api-table

    • ApiTable 概述
    • 使用 ApiTable 连接 SQLite
    • 使用 ApiTable 连接 Mysql
    • 使用 ApiTable 连接 Postgres
    • 使用 ApiTable 连接 TDEngine
    • 使用 api-table 连接 oracle
    • 使用 api-table 连接 mysql and tdengine 多数据源
    • EasyExcel 导出
    • EasyExcel 导入
    • 预留
    • 预留
    • ApiTable 实现增删改查
    • 数组类型
    • 单独使用 ApiTable
    • TQL(Table SQL)前端输入规范
  • 11_aop

    • JFinal-aop
    • Aop 工具类
    • 配置
    • 配置
    • 独立使用 JFinal Aop
    • @AImport
    • 自定义注解拦截器
    • 原理解析
  • 12_cache

    • Caffine
    • Jedis-redis
    • hutool RedisDS
    • Redisson
    • Caffeine and redis
    • CacheUtils 工具类
    • 使用 CacheUtils 整合 caffeine 和 redis 实现的两级缓存
    • 使用 java-db 整合 ehcache
    • 使用 java-db 整合 redis
    • Java DB Redis 相关 Api
    • redis 使用示例
  • 13_认证和权限

    • FixedTokenInterceptor
    • TokenManager
    • 数据表
    • 匿名登录
    • 注册和登录
    • 个人中心
    • 重置密码
    • Google 登录
    • 短信登录
    • 移动端微信登录
    • 移动端重置密码
    • 微信登录
    • 移动端微信登录
    • 权限校验注解
    • Sa-Token
    • sa-token 登录注册
    • StpUtil.isLogin() 源码解析
  • 14_i18n

    • i18n
  • 15_enjoy

    • tio-boot 整合 Enjoy 模版引擎文档
    • Tio-Boot 整合 Java-DB 与 Enjoy 模板引擎示例
    • 引擎配置
    • 表达式
    • 指令
    • 注释
    • 原样输出
    • Shared Method 扩展
    • Shared Object 扩展
    • Extension Method 扩展
    • Spring boot 整合
    • 独立使用 Enjoy
    • tio-boot enjoy 自定义指令 localeDate
    • PromptEngine
    • Enjoy 入门示例-擎渲染大模型请求体
    • Tio Boot + Enjoy:分页与 SEO 实战指南
    • Tio Boot + Enjoy:分页与 SEO 实战指南
    • Tio Boot + Enjoy:分页与 SEO 实战指南
  • 16_定时任务

    • Quartz 定时任务集成指南
    • 分布式定时任务 xxl-jb
    • cron4j 使用指南
  • 17_tests

    • TioBootTest 类
  • 18_tio

    • TioBootServer
    • 独立端口启动 TCP 服务器
    • 内置 TCP 处理器
    • 独立启动 UDPServer
    • 使用内置 UDPServer
    • t-io 消息处理流程
    • tio-运行原理详解
    • TioConfig
    • ChannelContext
    • Tio 工具类
    • 业务数据绑定
    • 业务数据解绑
    • 发送数据
    • 关闭连接
    • Packet
    • 监控: 心跳
    • 监控: 客户端的流量数据
    • 监控: 单条 TCP 连接的流量数据
    • 监控: 端口的流量数据
    • 单条通道统计: ChannelStat
    • 所有通道统计: GroupStat
    • 资源共享
    • 成员排序
    • SSL
    • DecodeRunnable
    • 使用 AsynchronousSocketChannel 响应数据
    • 拉黑 IP
    • 深入解析 Tio 源码:构建高性能 Java 网络应用
  • 19_aio

    • ByteBuffer
    • AIO HTTP 服务器
    • 自定义和线程池和池化 ByteBuffer
    • AioHttpServer 应用示例 IP 属地查询
    • 手写 AIO Http 服务器
  • 20_netty

    • Netty TCP Server
    • Netty Web Socket Server
    • 使用 protoc 生成 Java 包文件
    • Netty WebSocket Server 二进制数据传输
    • Netty 组件详解
  • 21_netty-boot

    • Netty-Boot
    • 原理解析
    • 整合 Hot Reload
    • 整合 数据库
    • 整合 Redis
    • 整合 Elasticsearch
    • 整合 Dubbo
    • Listener
    • 文件上传
    • 拦截器
    • Spring Boot 整合 Netty-Boot
    • SSL 配置指南
    • ChannelInitializer
    • Reserve
  • 22_MQ

    • Mica-mqtt
    • EMQX
    • Disruptor
  • 23_tio-utils

    • tio-utils
    • HttpUtils
    • Notification
    • Email
    • JSON
    • File
    • Base64
    • 上传和下载
    • Http
    • Telegram
    • RsaUtils
    • EnvUtils 配置工具
    • 系统监控
    • 线程
    • 虚拟线程
    • 毫秒并发 ID (MCID) 生成方案
  • 24_tio-http-server

    • 使用 Tio-Http-Server 搭建简单的 HTTP 服务
    • tio-boot 添加 HttpRequestHandler
    • 在 Android 上使用 tio-boot 运行 HTTP 服务
    • tio-http-server-native
    • handler 常用操作
  • 25_tio-websocket

    • WebSocket 服务器
    • WebSocket Client
    • TCP数据转发
  • 26_tio-im

    • 通讯协议文档
    • ChatPacket.proto 文档
    • java protobuf
    • 数据表设计
    • 创建工程
    • 登录
    • 历史消息
    • 发消息
  • 27_mybatis

    • Tio-Boot 整合 MyBatis
    • 使用配置类方式整合 MyBatis
    • 整合数据源
    • 使用 mybatis-plus 整合 tdengine
    • 整合 mybatis-plus
  • 28_mongodb

    • tio-boot 使用 mongo-java-driver 操作 mongodb
  • 29_elastic-search

    • Elasticsearch
    • JavaDB 整合 ElasticSearch
    • Elastic 工具类使用指南
    • Elastic-search 注意事项
    • ES 课程示例文档
  • 30_magic-script

    • tio-boot 与 magic-script 集成指南
  • 31_groovy

    • tio-boot 整合 Groovy
  • 32_firebase

    • 整合 google firebase
    • Firebase Storage
    • Firebase Authentication
    • 使用 Firebase Admin SDK 进行匿名用户管理与自定义状态标记
    • 导出用户
    • 注册回调
    • 登录注册
  • 33_文件存储

    • 文件上传数据表
    • 本地存储
    • 存储文件到 亚马逊 S3
    • 存储文件到 腾讯 COS
    • 存储文件到 阿里云 OSS
  • 34_spider

    • jsoup
    • 爬取 z-lib.io 数据
    • 整合 WebMagic
    • WebMagic 示例:爬取学校课程数据
    • Playwright
    • Flexmark (Markdown 处理器)
    • tio-boot 整合 Playwright
    • 缓存网页数据
  • 36_integration_thirty_party

    • 整合 okhttp
    • 整合 GrpahQL
    • 集成 Mailjet
    • 整合 ip2region
    • 整合 GeoLite 离线库
    • 整合 Lark 机器人指南
    • 集成 Lark Mail 实现邮件发送
    • Thymeleaf
    • Swagger
    • Clerk 验证
  • 37_dubbo

    • 概述
    • dubbo 2.6.0
    • dubbo 2.6.0 调用过程
    • dubbo 3.2.0
  • 38_spring

    • Spring Boot Web 整合 Tio Boot
    • spring-boot-starter-webflux 整合 tio-boot
    • tio-boot 整合 spring-boot-starter
    • Tio Boot 整合 Spring Boot Starter db
    • Tio Boot 整合 Spring Boot Starter Data Redis 指南
  • 39_spring-cloud

    • tio-boot spring-cloud
  • 40_quarkus

    • Quarkus(无 HTTP)整合 tio-boot(有 HTTP)
    • tio-boot + Quarkus + Hibernate ORM Panache
  • 41_postgresql

    • PostgreSQL 安装
    • PostgreSQL 主键自增
    • PostgreSQL 日期类型
    • Postgresql 金融类型
    • PostgreSQL 数组类型
    • 索引
    • PostgreSQL 查询优化
    • 获取字段类型
    • PostgreSQL 全文检索
    • PostgreSQL 向量
    • PostgreSQL 优化向量查询
    • PostgreSQL 其他
  • 42_mysql

    • 使用 Docker 运行 MySQL
    • 常见问题
  • 43_oceanbase

    • 快速体验 OceanBase 社区版
    • 快速上手 OceanBase 数据库单机部署与管理
    • 诊断集群性能
    • 优化 SQL 性能指南
    • 待定
  • 49_jooq

    • 使用配置类方式整合 jOOQ
    • tio-boot + jOOQ 事务管理
    • 批量操作与性能优化
    • 代码生成(可选)与类型安全升级
    • JSONB、Upsert、窗口函数实战
    • 整合agroal
  • 50_media

    • JAVE 提取视频中的声音
    • Jave 提取视频中的图片
    • 待定
  • 51_asr

    • Whisper-JNI
  • 54_native-media

    • java-native-media
    • JNI 入门示例
    • mp3 拆分
    • mp4 转 mp3
    • 使用 libmp3lame 实现高质量 MP3 编码
    • Linux 编译
    • macOS 编译
    • 从 JAR 包中加载本地库文件
    • 支持的音频和视频格式
    • 任意格式转为 mp3
    • 通用格式转换
    • 通用格式拆分
    • 视频合并
    • VideoToHLS
    • split_video_to_hls 支持其他语言
    • 持久化 HLS 会话
    • 获取视频长度
    • 保存视频的最后一帧
    • 添加水印
    • linux版本
  • 55_cv

    • 使用 Java 运行 YOLOv8 ONNX 模型进行目标检测
    • tio-boot整合yolo
    • ONNX Runtime 推理说明
  • 58_telegram4j

    • 数据库设计
    • 基于 HTTP 协议开发 Telegram 翻译机器人
    • 基于 MTProto 协议开发 Telegram 翻译机器人
    • 过滤旧消息
    • 保存机器人消息
    • 定时推送
    • 增加命令菜单
    • 使用 telegram-Client
    • 使用自定义 StoreLayout
    • 延迟测试
    • Reactor 错误处理
    • Telegram4J 常见错误处理指南
  • 59_telegram-bots

    • TelegramBots 入门指南
    • 使用工具库 telegram-bot-base 开发翻译机器人
  • 60_LLM

    • 简介
    • 流式生成
    • 图片多模态输入
    • 协议自动转换 Google Gemini示例
    • 请求记录
    • 限流和错误处理
    • 整合Gemini realtime模型
    • Voice Agent 前端接入接口文档
    • 整合千问realtime模型
    • 增强检索(RAG)
    • 搜索+AI
    • AI 问答
    • 连接代码执行器
  • 61_ai_agent

    • 数据库设计
    • 示例问题管理
    • 会话管理
    • 历史记录
    • Perplexity API
    • 意图识别
    • 智能问答
    • 文件上传与解析文档
    • 翻译
    • 名人搜索功能实现
    • Ai studio gemini youbue 问答使用说明
    • 自建 YouTube 字幕问答系统
    • 自建 获取 youtube 字幕服务
    • 使用 OpenAI ASR 实现语音识别接口(Java 后端示例)
    • 定向搜索
    • 16
    • 17
    • 18
    • 在 tio-boot 应用中整合 ai-agent
    • 16
  • 63_knowlege_base

    • 数据库设计
    • 用户登录实现
    • 模型管理
    • 知识库管理
    • 文档拆分
    • 片段向量
    • 命中测试
    • 文档管理
    • 片段管理
    • 问题管理
    • 应用管理
    • 向量检索
    • 推理问答
    • 问答模块
    • 统计分析
    • 用户管理
    • api 管理
    • 存储文件到 S3
    • 文档解析优化
    • 片段汇总
    • 段落分块与检索
    • 多文档解析
    • 对话日志
    • 检索性能优化
    • Milvus
    • 文档解析方案和费用对比
    • 离线运行向量模型
  • 64_ai-search

    • ai-search 项目简介
    • ai-search 数据库文档
    • ai-search SearxNG 搜索引擎
    • ai-search Jina Reader API
    • ai-search Jina Search API
    • ai-search 搜索、重排与读取内容
    • ai-search PDF 文件处理
    • ai-search 推理问答
    • Google Custom Search JSON API
    • ai-search 意图识别
    • ai-search 问题重写
    • ai-search 系统 API 接口 WebSocket 版本
    • ai-search 搜索代码实现 WebSocket 版本
    • ai-search 生成建议问
    • ai-search 生成问题标题
    • ai-search 历史记录
    • Discover API
    • 翻译
    • Tavily Search API 文档
    • 对接 Tavily Search
    • 火山引擎 DeepSeek
    • 对接 火山引擎 DeepSeek
    • ai-search 搜索代码实现 SSE 版本
    • jar 包部署
    • Docker 部署
    • 爬取一个静态网站的所有数据
    • 网页数据预处理
    • 网页数据检索与问答流程整合
  • 65_ai-coding

    • Cline 提示词
    • Cline 提示词-中文版本
  • 66_java-uni-ai-server

    • 语音合成系统
    • Fish.audio TTS 接口说明文档与 Java 客户端封装
    • 整合 fishaudio 到 java-uni-ai-server 项目
    • 待定
  • 67_java-llm-proxy

    • 使用tio-boot搭建多模型LLM代理服务
  • 68_java-kit-server

    • Java 执行 python 代码
    • 通过大模型执行 Python 代码
    • 执行 Python (Manim) 代码
    • 待定
    • 待定
    • 待定
    • 视频下载增加水印说明文档
  • 69_ai-brower

    • AI Browser:基于用户指令的浏览器自动化系统
    • 提示词
    • dom构建- buildDomTree.js
    • dom构建- 将网页可点击元素提取与可视化
    • 提取网内容
    • 启动浏览器
    • 操作浏览器指令
  • 70_tio-boot-admin

    • 入门指南
    • 初始化数据
    • token 存储
    • 与前端集成
    • 文件上传
    • 网络请求
    • 多图片管理
    • 单图片管理(只读模式)
    • 布尔值管理
    • 字段联动
    • Word 管理
    • PDF 管理
    • 文章管理
    • 富文本编辑器
  • 73_tio-mail-wing

    • tio-mail-wing简介
    • 任务1:实现POP3系统
    • 使用 getmail 验证 tio-mail-wing POP3 服务
    • 任务2:实现 SMTP 服务
    • 数据库初始化文档
    • 用户管理
    • 邮件管理
    • 任务3:实现 SMTP 服务 数据库版本
    • 任务4:实现 POP3 服务(数据库版本)
    • IMAP 协议
    • 拉取多封邮件
    • 任务5:实现 IMAP 服务(数据库版本)
    • IMAP实现讲解
    • IMAP 手动测试脚本
    • IMAP 认证机制
    • 主动推送
  • 74_tio-mcp-server

    • 实现 MCP Server 开发指南
  • 75_tio-sip

    • SIP Server 第一版原理说明
    • SIP Server 第一版实战
    • 使用livekit-sip进行测试
    • SIP Server 第二版实战
  • 76_manim

    • Teach me anything - 基于大语言的知识点讲解视频生成系统
    • Manim 开发环境搭建
    • 生成场景提示词
    • 生成代码
    • 完整脚本示例
    • TTS服务端
    • 废弃
    • 废弃
    • 废弃
    • 使用 SSE 流式传输生成进度的实现文档
    • 整合全流程完整文档
    • HLS 动态推流技术文档
    • manim 分场景生成代码
    • 分场景运行代码及流式播放支持
    • 分场景业务端完整实现流程
    • Maiim布局管理器
    • 仅仅生成场景代码
    • 使用 modal 运行 manim 代码
    • Python 使用 Modal GPU 加速渲染
    • Modal 平台 GPU 环境下运行 Manim
    • Modal Manim OpenGL 安装与使用
    • 优化 GPU 加速
    • 生成视频封面流程
    • Java 调用 manim 命令 执行代码 生成封面
    • Manim 图像生成服务客户端文档
    • manim render help
    • 显示 中文公式
    • ManimGL(manimgl)
    • Manim 实战入门:用代码创造数学动画
    • 欢迎
  • 80_性能测试

    • 压力测试 - tio-http-serer
    • 压力测试 - tio-boot
    • 压力测试 - tio-boot-native
    • 压力测试 - netty-boot
    • 性能测试对比
    • TechEmpower FrameworkBenchmarks
    • 压力测试 - tio-boot 12 C 32G
    • HTTP/1.1 Pipelining 性能测试报告
    • tio-boot vs Quarkus 性能对比测试报告
  • 81_tio-boot

    • 简介
    • Swagger 整合到 Tio-Boot 中的指南
    • 待定
    • 待定
    • 高性能网络编程中的 ByteBuffer 分配与回收策略
    • TioBootServerHandler 源码解析
  • 99_案例

    • 封装 IP 查询服务
    • tio-boot 案例 - 全局异常捕获与企业微信群通知
    • tio-boot 案例 - 文件上传和下载
    • tio-boot 案例 - 整合 ant design pro 增删改查
    • tio-boot 案例 - 流失响应
    • tio-boot 案例 - 增强检索
    • tio-boot 案例 - 整合 function call
    • tio-boot 案例 - 定时任务 监控 PostgreSQL、Redis 和 Elasticsearch
    • Tio-Boot 案例:使用 SQLite 整合到登录注册系统
    • tio-boot 案例 - 执行 shell 命令

SIP Server 第一版实战

1. 文档目标

本文介绍如何使用 tio-core 框架实现一个最小可运行的 SIP Echo 服务。 这个服务的目标不是做一个完整的 SIP 平台,而是先打通一条最关键的语音链路:

  1. 服务端同时监听 SIP TCP 5060 和 SIP UDP 5060
  2. 终端发起 INVITE
  3. 服务端解析 SIP 和 SDP
  4. 服务端动态分配一个 RTP 端口
  5. 服务端返回 200 OK,并在 SDP 中告诉对端媒体端口
  6. 对端向该 RTP 端口发送语音
  7. 服务端收到 RTP 后原样回发
  8. 对端听到自己的回声
  9. 对端挂断,服务端处理 BYE,释放 RTP 资源

如果这条链路跑通,就说明三件事都成立:

  • tio-core 可以承载 SIP 信令和rtp数据
  • Java 可以动态管理 RTP 端口
  • SIP/SDP/RTP 三层配合是正确的

2. 先理解整体架构

在这个系统里,可以把功能拆成三层:

2.1 信令层:SIP

负责:

  • 建立通话
  • 确认通话
  • 结束通话
  • 携带 SDP 进行媒体协商

这里的核心协议消息有:

  • INVITE
  • 100 Trying
  • 200 OK
  • ACK
  • BYE

2.2 协商层:SDP

负责告诉对端:

  • 语音该发到哪个 IP
  • 语音该发到哪个端口
  • 当前用什么编码
  • 每包多长时间

在本项目里,SDP 主要出现在:

  • INVITE 的消息体中,作为对端的 offer
  • 200 OK 的消息体中,作为服务端的 answer

2.3 媒体层:RTP

负责真正承载语音数据。

第一版里,RTP 服务只做一件事:

  • 收到一个 RTP 包
  • 原样发回去

这样就形成 echo。


3. 为什么选择 tio-core

tio-core 适合这个场景,原因有三个:

3.1 同时支持 TCP 与 UDP

SIP 常见传输方式本身就包括:

  • TCP
  • UDP

而 RTP 通常又是 UDP。 tio-core 同时提供了:

  • TCP server 能力
  • UDP server 能力

所以一个框架就能覆盖 SIP 和 RTP 这两类网络需求。


3.2 接口清晰

你会用到两个核心接口:

TCP

  • ServerAioHandler

负责:

  • decode
  • encode
  • handler

UDP

  • UdpHandler

负责:

  • 收到 UDP 数据后的处理

这对于协议型服务很方便,因为你可以很自然地把:

  • 解帧
  • 解析
  • 业务处理
  • 响应编码

分开组织。


3.3 易于逐步演进

这个项目不是“一开始就做复杂系统”,而是从最小功能逐步扩展:

  • 第一阶段:SIP echo 跑通
  • 第二阶段:补 session 生命周期
  • 第三阶段:补真正 SDP 协商
  • 第四阶段:RTP 解码重组
  • 第五阶段:接 ASR / TTS / LLM

tio-core 足够轻,适合这种逐步搭建。


4. 项目结构说明

工程结构如下:

com.litongjava.sip
├── client
│   ├── RtpUdpEchoClient.java
│   ├── SipTcpClient.java
├── model
│   ├── CallSession.java
│   ├── SipMessage.java
│   ├── SipRequest.java
│   ├── SipResponse.java
├── parser
│   ├── SipMessageEncoder.java
│   ├── SipMessageParser.java
│   ├── SipTcpFrameDecoder.java
├── rtp
│   ├── RtpEchoUdpHandler.java
│   ├── RtpPortAllocator.java
│   ├── RtpServerManager.java
│   ├── RtpUdpServer.java
├── server
│   ├── handler
│   │   ├── SipInviteOnlyTcpHandler.java
│   │   ├── SipInviteOnlyUdpHandler.java
│   ├── packet
│   │   ├── SipPacket.java
│   ├── session
│   │   ├── CallSessionManager.java

这个结构非常适合作为第一版落地结构,职责边界已经比较清晰了。

下面逐层解释。


5. model 层:协议对象和会话对象

5.1 SipMessage

SipMessage 是 SIP 消息的抽象父类。 它负责承载:

  • headers
  • body

它不关心这是请求还是响应。

这样做的好处是: parser 先把通用部分装进去,再根据起始行决定它是 SipRequest 还是 SipResponse。


package com.litongjava.sip.model;

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public abstract class SipMessage {

  private final Map<String, List<String>> headers = new LinkedHashMap<>();
  private byte[] body;

  public Map<String, List<String>> getHeaders() {
    return headers;
  }

  public void addHeader(String name, String value) {
    headers.computeIfAbsent(name, k -> new ArrayList<>()).add(value);
  }

  public String getHeader(String name) {
    for (Map.Entry<String, List<String>> e : headers.entrySet()) {
      if (e.getKey().equalsIgnoreCase(name)) {
        List<String> vals = e.getValue();
        return vals == null || vals.isEmpty() ? null : vals.get(0);
      }
    }
    return null;
  }

  public List<String> getHeaders(String name) {
    for (Map.Entry<String, List<String>> e : headers.entrySet()) {
      if (e.getKey().equalsIgnoreCase(name)) {
        return e.getValue();
      }
    }
    return List.of();
  }

  public byte[] getBody() {
    return body;
  }

  public void setBody(byte[] body) {
    this.body = body;
  }

  public int contentLength() {
    return body == null ? 0 : body.length;
  }
}

5.2 SipRequest

表示 SIP 请求。 它比 SipMessage 多出这些字段:

  • method
  • requestUri
  • version

例如一条:

INVITE sip:1001@192.168.3.219:5060 SIP/2.0

就会被解析成一个 SipRequest。


package com.litongjava.sip.model;

public class SipRequest extends SipMessage {

  private String method;
  private String requestUri;
  private String version;

  public String getMethod() {
    return method;
  }

  public void setMethod(String method) {
    this.method = method;
  }

  public String getRequestUri() {
    return requestUri;
  }

  public void setRequestUri(String requestUri) {
    this.requestUri = requestUri;
  }

  public String getVersion() {
    return version;
  }

  public void setVersion(String version) {
    this.version = version;
  }
}

5.3 SipResponse

表示 SIP 响应。 它比 SipMessage 多出这些字段:

  • version
  • statusCode
  • reasonPhrase

例如:

SIP/2.0 200 OK

会被解析成一个 SipResponse。

package com.litongjava.sip.model;

public class SipRequest extends SipMessage {

  private String method;
  private String requestUri;
  private String version;

  public String getMethod() {
    return method;
  }

  public void setMethod(String method) {
    this.method = method;
  }

  public String getRequestUri() {
    return requestUri;
  }

  public void setRequestUri(String requestUri) {
    this.requestUri = requestUri;
  }

  public String getVersion() {
    return version;
  }

  public void setVersion(String version) {
    this.version = version;
  }
}

5.4 CallSession

CallSession 是一次呼叫会话的数据载体。 第一版里,它主要保存:

  • Call-ID
  • From tag
  • To tag
  • 传输方式 TCP/UDP
  • SIP 对端 IP/端口
  • RTP 对端 IP/端口
  • 本地 RTP 端口
  • 是否收到了 ACK
  • 最近一次 200 OK
  • 对应的 RTP server 实例

为什么需要它?

因为在 SIP Echo 场景里,信令和媒体不是一回事:

  • SIP 用来建立会话
  • RTP 用来传语音

没有 CallSession,就没法把:

  • 这次 INVITE
  • 对应的 RTP 端口
  • 未来的 ACK / BYE

串起来。

package com.litongjava.sip.model;

import com.litongjava.sip.rtp.RtpUdpServer;

public class CallSession {

  private String callId;
  private String fromTag;
  private String toTag;

  private String transport; // TCP / UDP

  private String remoteSipIp;
  private int remoteSipPort;

  private String remoteRtpIp;
  private int remoteRtpPort;
  private int localRtpPort;

  private long createdTime;
  private long updatedTime;
  private long ackDeadline;

  private boolean ackReceived;
  private boolean terminated;

  private String last200Ok;

  private RtpUdpServer rtpServer;

  public String getCallId() {
    return callId;
  }

  public void setCallId(String callId) {
    this.callId = callId;
  }

  public String getFromTag() {
    return fromTag;
  }

  public void setFromTag(String fromTag) {
    this.fromTag = fromTag;
  }

  public String getToTag() {
    return toTag;
  }

  public void setToTag(String toTag) {
    this.toTag = toTag;
  }

  public String getTransport() {
    return transport;
  }

  public void setTransport(String transport) {
    this.transport = transport;
  }

  public String getRemoteSipIp() {
    return remoteSipIp;
  }

  public void setRemoteSipIp(String remoteSipIp) {
    this.remoteSipIp = remoteSipIp;
  }

  public int getRemoteSipPort() {
    return remoteSipPort;
  }

  public void setRemoteSipPort(int remoteSipPort) {
    this.remoteSipPort = remoteSipPort;
  }

  public String getRemoteRtpIp() {
    return remoteRtpIp;
  }

  public void setRemoteRtpIp(String remoteRtpIp) {
    this.remoteRtpIp = remoteRtpIp;
  }

  public int getRemoteRtpPort() {
    return remoteRtpPort;
  }

  public void setRemoteRtpPort(int remoteRtpPort) {
    this.remoteRtpPort = remoteRtpPort;
  }

  public int getLocalRtpPort() {
    return localRtpPort;
  }

  public void setLocalRtpPort(int localRtpPort) {
    this.localRtpPort = localRtpPort;
  }

  public long getCreatedTime() {
    return createdTime;
  }

  public void setCreatedTime(long createdTime) {
    this.createdTime = createdTime;
  }

  public long getUpdatedTime() {
    return updatedTime;
  }

  public void setUpdatedTime(long updatedTime) {
    this.updatedTime = updatedTime;
  }

  public long getAckDeadline() {
    return ackDeadline;
  }

  public void setAckDeadline(long ackDeadline) {
    this.ackDeadline = ackDeadline;
  }

  public boolean isAckReceived() {
    return ackReceived;
  }

  public void setAckReceived(boolean ackReceived) {
    this.ackReceived = ackReceived;
  }

  public boolean isTerminated() {
    return terminated;
  }

  public void setTerminated(boolean terminated) {
    this.terminated = terminated;
  }

  public String getLast200Ok() {
    return last200Ok;
  }

  public void setLast200Ok(String last200Ok) {
    this.last200Ok = last200Ok;
  }

  public RtpUdpServer getRtpServer() {
    return rtpServer;
  }

  public void setRtpServer(RtpUdpServer rtpServer) {
    this.rtpServer = rtpServer;
  }
}

6. parser 层:把 TCP 字节流和 SIP 文本处理扎实

这一层是第一版最重要的基础设施之一。


6.1 为什么 TCP 不能直接“读多少算一个包”

因为 SIP over TCP 本质上是字节流,而不是 datagram。

也就是说:

  • 一次 read 可能只读到半个 SIP 报文
  • 一次 read 也可能读到两个 SIP 报文
  • 如果简单按“本次拿到多少字节就是一个消息”,INVITE 带 SDP 时几乎迟早出问题

所以 TCP 模式下必须先做 frame decode。


6.2 SipTcpFrameDecoder 的职责

SipTcpFrameDecoder 只负责一件事:

从 TCP 流中切出完整的 SIP 消息。

它的规则是:

  1. 先找 \r\n\r\n
  2. 这表示 SIP header 结束
  3. 再从 header 中读取 Content-Length
  4. 判断 body 是否已经收满
  5. 如果完整,就切出一条消息
  6. 如果还不完整,就继续累积等待

这里解决的是“边界问题”,而不是“协议语义问题”。

这是非常关键的分层。

package com.litongjava.sip.parser;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

import com.litongjava.tio.core.ChannelContext;

public class SipTcpFrameDecoder {

  private static final String ATTR_ACC = "sip_tcp_acc_buf";
  private static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
  private static final int MAX_SIP_MESSAGE_SIZE = 1024 * 1024; // 1MB

  public byte[] decode(ByteBuffer buffer, int readableLength, ChannelContext ctx) {
    if (readableLength <= 0) {
      return null;
    }

    ByteBuffer acc = (ByteBuffer) ctx.getAttribute(ATTR_ACC);
    if (acc == null) {
      acc = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE);
      ctx.setAttribute(ATTR_ACC, acc);
    }

    byte[] chunk = new byte[readableLength];
    buffer.get(chunk);

    acc = ensureCapacity(acc, chunk.length, ctx);
    acc.put(chunk);

    acc.flip();
    int frameLen = tryParseOneFrameLength(acc);
    if (frameLen <= 0) {
      acc.compact();
      return null;
    }

    byte[] frame = new byte[frameLen];
    acc.get(frame);
    acc.compact();
    return frame;
  }

  private ByteBuffer ensureCapacity(ByteBuffer acc, int incoming, ChannelContext ctx) {
    if (acc.remaining() >= incoming) {
      return acc;
    }

    int needed = acc.position() + incoming;
    int newCap = acc.capacity();

    while (newCap < needed) {
      newCap = newCap * 2;
      if (newCap > MAX_SIP_MESSAGE_SIZE) {
        throw new IllegalStateException("SIP accumulate buffer too large: " + newCap);
      }
    }

    ByteBuffer bigger = ByteBuffer.allocate(newCap);
    acc.flip();
    bigger.put(acc);
    ctx.setAttribute(ATTR_ACC, bigger);
    return bigger;
  }

  private int tryParseOneFrameLength(ByteBuffer acc) {
    int start = acc.position();
    int limit = acc.limit();

    if ((limit - start) > MAX_SIP_MESSAGE_SIZE) {
      throw new IllegalStateException("SIP message too large");
    }

    int headerEnd = indexOf(acc, "\r\n\r\n".getBytes(StandardCharsets.US_ASCII), start, limit);
    if (headerEnd < 0) {
      return -1;
    }

    int headerBlockEnd = headerEnd + 4;
    String head = sliceToString(acc, start, headerBlockEnd);

    int contentLength = parseContentLength(head);
    if (contentLength < 0) {
      throw new IllegalStateException("Invalid Content-Length");
    }

    int totalLength = (headerBlockEnd - start) + contentLength;
    if ((limit - start) < totalLength) {
      return -1;
    }

    return totalLength;
  }

  private int parseContentLength(String headers) {
    String[] lines = headers.split("\r\n");
    for (String line : lines) {
      int idx = line.indexOf(':');
      if (idx <= 0) {
        continue;
      }

      String name = line.substring(0, idx).trim();
      String value = line.substring(idx + 1).trim();

      if ("Content-Length".equalsIgnoreCase(name) || "l".equalsIgnoreCase(name)) {
        try {
          return Integer.parseInt(value);
        } catch (NumberFormatException e) {
          return -1;
        }
      }
    }
    return 0;
  }

  private int indexOf(ByteBuffer buf, byte[] pat, int from, int to) {
    for (int i = from; i <= to - pat.length; i++) {
      boolean ok = true;
      for (int j = 0; j < pat.length; j++) {
        if (buf.get(i + j) != pat[j]) {
          ok = false;
          break;
        }
      }
      if (ok) {
        return i;
      }
    }
    return -1;
  }

  private String sliceToString(ByteBuffer buf, int from, int to) {
    byte[] bytes = new byte[to - from];
    for (int i = 0; i < bytes.length; i++) {
      bytes[i] = buf.get(from + i);
    }
    return new String(bytes, StandardCharsets.US_ASCII);
  }
}

6.3 SipMessageParser 的职责

SipMessageParser 处理的是:

一整条完整 SIP 文本,如何解析成对象。

它要做的事情包括:

  • 识别 request line 还是 status line
  • 解析 headers
  • 支持 compact header
  • 提取 body
  • 返回 SipRequest 或 SipResponse

例如:

  • INVITE ... SIP/2.0 -> SipRequest
  • SIP/2.0 200 OK -> SipResponse
package com.litongjava.sip.parser;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

import com.litongjava.sip.model.SipMessage;
import com.litongjava.sip.model.SipRequest;
import com.litongjava.sip.model.SipResponse;

public class SipMessageParser {

  private static final Map<String, String> COMPACT_HEADERS = new HashMap<>();

  static {
    COMPACT_HEADERS.put("v", "Via");
    COMPACT_HEADERS.put("f", "From");
    COMPACT_HEADERS.put("t", "To");
    COMPACT_HEADERS.put("i", "Call-ID");
    COMPACT_HEADERS.put("l", "Content-Length");
    COMPACT_HEADERS.put("c", "Content-Type");
    COMPACT_HEADERS.put("m", "Contact");
  }

  public SipMessage parse(byte[] bytes) {
    if (bytes == null || bytes.length == 0) {
      throw new IllegalArgumentException("empty sip message");
    }

    int headerEnd = indexOf(bytes, "\r\n\r\n".getBytes(StandardCharsets.US_ASCII));
    if (headerEnd < 0) {
      throw new IllegalArgumentException("invalid sip message, no header terminator");
    }

    int headerBlockEnd = headerEnd + 4;

    String headerText = new String(bytes, 0, headerBlockEnd, StandardCharsets.US_ASCII);
    String[] lines = headerText.split("\r\n");

    if (lines.length == 0) {
      throw new IllegalArgumentException("invalid sip start line");
    }

    String startLine = lines[0];
    SipMessage message = parseStartLine(startLine);

    for (int i = 1; i < lines.length; i++) {
      String line = lines[i];
      if (line == null || line.isEmpty()) {
        continue;
      }

      int idx = line.indexOf(':');
      if (idx <= 0) {
        continue;
      }

      String name = line.substring(0, idx).trim();
      String value = line.substring(idx + 1).trim();
      name = normalizeHeaderName(name);

      message.addHeader(name, value);
    }

    int contentLength = parseContentLength(message);
    if (contentLength > 0) {
      if (bytes.length < headerBlockEnd + contentLength) {
        throw new IllegalArgumentException("sip body not complete");
      }

      byte[] body = new byte[contentLength];
      System.arraycopy(bytes, headerBlockEnd, body, 0, contentLength);
      message.setBody(body);
    } else {
      message.setBody(new byte[0]);
    }

    return message;
  }

  private SipMessage parseStartLine(String startLine) {
    if (startLine.startsWith("SIP/2.0")) {
      String[] parts = startLine.split(" ", 3);
      if (parts.length < 3) {
        throw new IllegalArgumentException("invalid sip response line: " + startLine);
      }

      SipResponse resp = new SipResponse();
      resp.setVersion(parts[0]);
      resp.setStatusCode(Integer.parseInt(parts[1]));
      resp.setReasonPhrase(parts[2]);
      return resp;
    } else {
      String[] parts = startLine.split(" ", 3);
      if (parts.length < 3) {
        throw new IllegalArgumentException("invalid sip request line: " + startLine);
      }

      SipRequest req = new SipRequest();
      req.setMethod(parts[0]);
      req.setRequestUri(parts[1]);
      req.setVersion(parts[2]);
      return req;
    }
  }

  private String normalizeHeaderName(String name) {
    String compact = COMPACT_HEADERS.get(name);
    return compact != null ? compact : name;
  }

  private int parseContentLength(SipMessage message) {
    String v = message.getHeader("Content-Length");
    if (v == null || v.isEmpty()) {
      return 0;
    }
    return Integer.parseInt(v.trim());
  }

  private int indexOf(byte[] src, byte[] pat) {
    for (int i = 0; i <= src.length - pat.length; i++) {
      boolean ok = true;
      for (int j = 0; j < pat.length; j++) {
        if (src[i + j] != pat[j]) {
          ok = false;
          break;
        }
      }
      if (ok) {
        return i;
      }
    }
    return -1;
  }
}

6.4 SipMessageEncoder 的职责

SipMessageEncoder 与 parser 相反,负责把对象重新编码成标准 SIP 文本。

主要用于服务端发响应时:

  • 组装状态行
  • 输出 headers
  • 自动补 Content-Length
  • 拼接 body

它的存在让 handler 不必到处拼字符串。

package com.litongjava.sip.parser;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

import com.litongjava.sip.model.SipResponse;

public class SipMessageEncoder {

  public byte[] encodeResponse(SipResponse response) {
    StringBuilder sb = new StringBuilder();

    sb.append(response.getVersion()).append(' ').append(response.getStatusCode()).append(' ')
        .append(response.getReasonPhrase()).append("\r\n");

    for (Map.Entry<String, List<String>> e : response.getHeaders().entrySet()) {
      String name = e.getKey();
      List<String> values = e.getValue();
      if (values == null || values.isEmpty()) {
        continue;
      }
      for (String value : values) {
        sb.append(name).append(": ").append(value).append("\r\n");
      }
    }

    byte[] body = response.getBody();
    if (body == null) {
      body = new byte[0];
    }

    if (response.getHeader("Content-Length") == null) {
      sb.append("Content-Length: ").append(body.length).append("\r\n");
    }

    sb.append("\r\n");

    byte[] head = sb.toString().getBytes(StandardCharsets.US_ASCII);

    byte[] all = new byte[head.length + body.length];
    System.arraycopy(head, 0, all, 0, head.length);
    if (body.length > 0) {
      System.arraycopy(body, 0, all, head.length, body.length);
    }
    return all;
  }
}

7. rtp 层:动态端口与 echo 服务

这一层是“媒体能不能打通”的关键。


7.1 RtpPortAllocator

这个类负责在指定端口范围内分配 RTP 端口。

典型范围是:

  • 30000-40000

它的作用是:

  • 给每次通话找一个可用端口
  • 避免多个呼叫冲突
  • 挂断后回收端口

在 SIP 场景里,通常不是监听整个范围,而是:

  • 每通电话分配一个端口
  • 只监听这个端口
package com.litongjava.sip.rtp;

import java.io.IOException;
import java.net.DatagramSocket;
import java.util.BitSet;

public class RtpPortAllocator {

  private final int start;
  private final int end;

  private final BitSet used;

  public RtpPortAllocator() {
    this(30000, 40000);
  }

  public RtpPortAllocator(int start, int end) {
    this.start = start;
    this.end = end;
    this.used = new BitSet(end - start + 1);
  }

  public synchronized int allocate() {
    for (int p = start; p <= end; p++) {
      int idx = p - start;

      if (used.get(idx)) {
        continue;
      }

      if (canBind(p)) {
        used.set(idx, true);
        return p;
      }
    }

    throw new IllegalStateException("No available RTP port in range " + start + "-" + end);
  }

  public synchronized void release(int port) {
    if (port < start || port > end) {
      return;
    }
    used.clear(port - start);
  }

  private boolean canBind(int port) {
    try (DatagramSocket ignored = new DatagramSocket(port)) {
      ignored.setReuseAddress(false);
      return true;
    } catch (IOException e) {
      return false;
    }
  }
}

7.2 RtpUdpServer

这个类负责:

  • 持有一个具体的 UDP 监听实例
  • 在某个实际 RTP 端口上启动 UDP 服务
  • 在结束时关闭监听

可以理解为“一次会话对应的 RTP 端口服务实例”。

package com.litongjava.sip.rtp;

import java.net.SocketException;

import com.litongjava.tio.core.udp.UdpServer;
import com.litongjava.tio.core.udp.UdpServerConf;

public class RtpUdpServer {
  private final int port;
  private UdpServer udpServer;

  public RtpUdpServer(int port) {
    this.port = port;
  }

  public void start() throws SocketException {
    UdpServerConf conf = new UdpServerConf(port, new RtpEchoUdpHandler(), 5000);
    this.udpServer = new UdpServer(conf);
    this.udpServer.start();
  }

  public void stop() {
    if (udpServer != null) {
      udpServer.stop();
    }
  }

  public int port() {
    return port;
  }
}

7.3 RtpEchoUdpHandler

这是第一版媒体逻辑的核心。

职责非常简单:

  • 收到 UDP 包
  • 取出原始数据
  • 按原 remote 地址和端口回发

这就是 echo 的来源。

注意: 第一版只是原包回发,不是音频解码后再编码。 所以它验证的是“链路”而不是“媒体处理”。

package com.litongjava.sip.rtp;

import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetSocketAddress;

import com.litongjava.tio.core.Node;
import com.litongjava.tio.core.udp.UdpPacket;
import com.litongjava.tio.core.udp.intf.UdpHandler;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class RtpEchoUdpHandler implements UdpHandler {
  @Override
  public void handler(UdpPacket udpPacket, DatagramSocket datagramSocket) {
    byte[] data = udpPacket.getData();
    Node remote = udpPacket.getRemote();

    // 先做最简单:原包回显(用于验证链路)
    InetSocketAddress address = new InetSocketAddress(remote.getIp(), remote.getPort());
    DatagramPacket resp = new DatagramPacket(data, data.length, address);
    try {
      datagramSocket.send(resp);
    } catch (Exception e) {
      log.error(e.getMessage(), e);
    }
  }
}

7.4 RtpServerManager

这个类负责统一管理 RTP 资源。

它向上提供的是更高层的能力:

  • 为某个 session 分配端口并启动 RTP server
  • 停止 RTP server 并回收端口

这样 SIP handler 不用直接跟端口分配器和 UDP server 打交道。

package com.litongjava.sip.rtp;

import com.litongjava.sip.model.CallSession;

public class RtpServerManager {

  private final String localIp;
  private final RtpPortAllocator allocator;

  public RtpServerManager(String localIp) {
    this(localIp, new RtpPortAllocator());
  }

  public RtpServerManager(String localIp, RtpPortAllocator allocator) {
    this.localIp = localIp;
    this.allocator = allocator;
  }

  public CallSession allocateAndStart(CallSession session) throws Exception {
    int rtpPort = allocator.allocate();
    RtpUdpServer rtpServer = new RtpUdpServer(rtpPort);
    rtpServer.start();

    session.setLocalRtpPort(rtpPort);
    session.setRtpServer(rtpServer);
    session.setUpdatedTime(System.currentTimeMillis());
    return session;
  }

  public void stopAndRelease(CallSession session) {
    if (session == null) {
      return;
    }

    try {
      if (session.getRtpServer() != null) {
        session.getRtpServer().stop();
      }
    } finally {
      if (session.getLocalRtpPort() > 0) {
        allocator.release(session.getLocalRtpPort());
      }
    }
  }

  public String getLocalIp() {
    return localIp;
  }
}

8. server 层:真正使用 tio-core 的地方

这一层是与 tio-core 直接对接的地方。


8.1 SipInviteOnlyTcpHandler

它实现的是 ServerAioHandler。

在 tio-core 的 TCP 模型里,最核心的三个方法是:

decode

把原始字节流切成一个业务包。 在本项目里,它委托 SipTcpFrameDecoder 去做。

encode

把业务包编码成可发送的字节缓冲。 当前主要是把 ByteBufferPacket 交回 tio 发送。

handler

收到一个完整业务包后做业务处理。 在本项目里它主要处理:

  • INVITE
  • ACK
  • BYE
package com.litongjava.sip.server.handler;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

import com.litongjava.aio.ByteBufferPacket;
import com.litongjava.aio.Packet;
import com.litongjava.sip.model.CallSession;
import com.litongjava.sip.model.SipMessage;
import com.litongjava.sip.model.SipRequest;
import com.litongjava.sip.model.SipResponse;
import com.litongjava.sip.parser.SipMessageEncoder;
import com.litongjava.sip.parser.SipMessageParser;
import com.litongjava.sip.parser.SipTcpFrameDecoder;
import com.litongjava.sip.rtp.RtpServerManager;
import com.litongjava.sip.server.session.CallSessionManager;
import com.litongjava.tio.core.ChannelContext;
import com.litongjava.tio.core.Tio;
import com.litongjava.tio.core.TioConfig;
import com.litongjava.tio.server.intf.ServerAioHandler;

public class SipInviteOnlyTcpHandler implements ServerAioHandler {

  private final String localIp;
  private final SipTcpFrameDecoder frameDecoder = new SipTcpFrameDecoder();
  private final SipMessageParser messageParser = new SipMessageParser();
  private final SipMessageEncoder messageEncoder = new SipMessageEncoder();
  private final CallSessionManager sessionManager;
  private final RtpServerManager rtpServerManager;

  public SipInviteOnlyTcpHandler(String localIp) {
    this(localIp, new CallSessionManager(), new RtpServerManager(localIp));
  }

  public SipInviteOnlyTcpHandler(String localIp, CallSessionManager sessionManager, RtpServerManager rtpServerManager) {
    this.localIp = localIp;
    this.sessionManager = sessionManager;
    this.rtpServerManager = rtpServerManager;
  }

  @Override
  public Packet decode(ByteBuffer buffer, int limit, int position, int readableLength, ChannelContext ctx)
      throws Exception {
    byte[] frame = frameDecoder.decode(buffer, readableLength, ctx);
    if (frame == null) {
      return null;
    }
    return new ByteBufferPacket(ByteBuffer.wrap(frame));
  }

  @Override
  public ByteBuffer encode(Packet packet, TioConfig tioConfig, ChannelContext ctx) {
    ByteBufferPacket p = (ByteBufferPacket) packet;
    ByteBuffer bb = p.getByteBuffer();
    if (bb.position() != 0) {
      bb.rewind();
    }
    return bb;
  }

  @Override
  public void handler(Packet packet, ChannelContext ctx) throws Exception {
    ByteBufferPacket p = (ByteBufferPacket) packet;
    ByteBuffer bb = p.getByteBuffer();

    byte[] bytes = new byte[bb.remaining()];
    bb.get(bytes);

    SipMessage msg = messageParser.parse(bytes);
    if (!(msg instanceof SipRequest)) {
      return;
    }

    SipRequest req = (SipRequest) msg;
    String method = req.getMethod();

    if ("INVITE".equalsIgnoreCase(method)) {
      handleInvite(req, ctx);
      return;
    }

    if ("ACK".equalsIgnoreCase(method)) {
      handleAck(req);
      return;
    }

    if ("BYE".equalsIgnoreCase(method)) {
      handleBye(req, ctx);
      return;
    }

    SipResponse resp = buildSimpleResponse(req, 200, "OK", null);
    send(ctx, resp);
  }

  private void handleInvite(SipRequest req, ChannelContext ctx) throws Exception {
    String callId = req.getHeader("Call-ID");
    CallSession exist = sessionManager.getByCallId(callId);

    if (exist != null && exist.getLast200Ok() != null) {
      sendRaw(ctx, exist.getLast200Ok());
      return;
    }

    String remoteIp = ctx.getClientNode() != null ? ctx.getClientNode().getIp() : null;
    int remotePort = ctx.getClientNode() != null ? ctx.getClientNode().getPort() : 0;

    String toTag = "java" + System.nanoTime();

    CallSession session = new CallSession();
    session.setCallId(callId);
    session.setFromTag(parseTag(req.getHeader("From")));
    session.setToTag(toTag);
    session.setTransport("TCP");
    session.setRemoteSipIp(remoteIp);
    session.setRemoteSipPort(remotePort);
    session.setCreatedTime(System.currentTimeMillis());
    session.setUpdatedTime(System.currentTimeMillis());
    session.setAckDeadline(System.currentTimeMillis() + 32000);

    parseRemoteSdp(req, session);
    rtpServerManager.allocateAndStart(session);

    SipResponse resp = buildInvite200Ok(req, session);
    byte[] encoded = messageEncoder.encodeResponse(resp);
    String raw200 = new String(encoded, StandardCharsets.US_ASCII);

    session.setLast200Ok(raw200);
    sessionManager.createOrUpdate(session);

    Tio.send(ctx, new ByteBufferPacket(ByteBuffer.wrap(encoded)));
  }

  private void handleAck(SipRequest req) {
    String callId = req.getHeader("Call-ID");
    sessionManager.markAckReceived(callId);
  }

  private void handleBye(SipRequest req, ChannelContext ctx) throws Exception {
    String callId = req.getHeader("Call-ID");
    CallSession session = sessionManager.getByCallId(callId);

    SipResponse resp = buildSimpleResponse(req, 200, "OK", session != null ? session.getToTag() : null);
    send(ctx, resp);

    if (session != null) {
      rtpServerManager.stopAndRelease(session);
      sessionManager.terminate(callId);
    }
  }

  private void send(ChannelContext ctx, SipResponse response) {
    byte[] bytes = messageEncoder.encodeResponse(response);
    Tio.send(ctx, new ByteBufferPacket(ByteBuffer.wrap(bytes)));
  }

  private void sendRaw(ChannelContext ctx, String text) {
    byte[] bytes = text.getBytes(StandardCharsets.US_ASCII);
    Tio.send(ctx, new ByteBufferPacket(ByteBuffer.wrap(bytes)));
  }

  private SipResponse buildInvite200Ok(SipRequest req, CallSession session) {
    SipResponse resp = new SipResponse();
    resp.setStatusCode(200);
    resp.setReasonPhrase("OK");

    copyIfPresent(req, resp, "Via");
    copyIfPresent(req, resp, "From");

    String to = req.getHeader("To");
    if (to != null && !to.toLowerCase().contains("tag=")) {
      to = to + ";tag=" + session.getToTag();
    }
    if (to != null) {
      resp.addHeader("To", to);
    }

    copyIfPresent(req, resp, "Call-ID");
    copyIfPresent(req, resp, "CSeq");
    resp.addHeader("Contact", "<sip:java@" + localIp + ":5060>");
    resp.addHeader("Content-Type", "application/sdp");

    String sdp = "v=0\r\n" + "o=- 1 1 IN IP4 " + localIp + "\r\n" + "s=JavaSip\r\n" + "c=IN IP4 " + localIp + "\r\n"
        + "t=0 0\r\n" + "m=audio " + session.getLocalRtpPort() + " RTP/AVP 0\r\n" + "a=rtpmap:0 PCMU/8000\r\n"
        + "a=ptime:20\r\n" + "a=sendrecv\r\n";

    resp.setBody(sdp.getBytes(StandardCharsets.US_ASCII));
    return resp;
  }

  private SipResponse buildSimpleResponse(SipRequest req, int code, String reason, String toTag) {
    SipResponse resp = new SipResponse();
    resp.setStatusCode(code);
    resp.setReasonPhrase(reason);

    copyIfPresent(req, resp, "Via");
    copyIfPresent(req, resp, "From");

    String to = req.getHeader("To");
    if (toTag != null && to != null && !to.toLowerCase().contains("tag=")) {
      to = to + ";tag=" + toTag;
    }
    if (to != null) {
      resp.addHeader("To", to);
    }

    copyIfPresent(req, resp, "Call-ID");
    copyIfPresent(req, resp, "CSeq");

    resp.setBody(new byte[0]);
    return resp;
  }

  private void copyIfPresent(SipRequest req, SipResponse resp, String headerName) {
    for (String v : req.getHeaders(headerName)) {
      resp.addHeader(headerName, v);
    }
  }

  private String parseTag(String headerValue) {
    if (headerValue == null) {
      return null;
    }

    String lower = headerValue.toLowerCase();
    int idx = lower.indexOf("tag=");
    if (idx < 0) {
      return null;
    }

    String sub = headerValue.substring(idx + 4);
    int semi = sub.indexOf(';');
    if (semi >= 0) {
      sub = sub.substring(0, semi);
    }
    return sub.trim();
  }

  private void parseRemoteSdp(SipRequest req, CallSession session) {
    byte[] body = req.getBody();
    if (body == null || body.length == 0) {
      return;
    }

    String sdp = new String(body, StandardCharsets.US_ASCII);
    String[] lines = sdp.split("\r\n");

    String currentMedia = null;
    for (String line : lines) {
      if (line.startsWith("c=")) {
        String[] parts = line.split(" ");
        if (parts.length >= 3) {
          session.setRemoteRtpIp(parts[2].trim());
        }
      } else if (line.startsWith("m=")) {
        currentMedia = line;
        String[] parts = line.split(" ");
        if (parts.length >= 2 && parts[0].startsWith("m=audio")) {
          try {
            session.setRemoteRtpPort(Integer.parseInt(parts[1]));
          } catch (Exception ignore) {
          }
        }
      }
    }
  }
}

8.2 SipInviteOnlyUdpHandler

它实现的是 UdpHandler。

UDP 模式比 TCP 简单的地方在于:

  • 收到的一个 datagram 基本就是一条完整 SIP 消息
  • 不需要额外 frame decoder

所以它的流程就是:

  1. 收到 UDP 包
  2. 调 SipMessageParser
  3. 判断 method
  4. 生成响应
  5. 用 DatagramSocket 发回去

package com.litongjava.sip.server.handler;

import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;

import com.litongjava.sip.model.CallSession;
import com.litongjava.sip.model.SipMessage;
import com.litongjava.sip.model.SipRequest;
import com.litongjava.sip.model.SipResponse;
import com.litongjava.sip.parser.SipMessageEncoder;
import com.litongjava.sip.parser.SipMessageParser;
import com.litongjava.sip.rtp.RtpServerManager;
import com.litongjava.sip.server.session.CallSessionManager;
import com.litongjava.tio.core.Node;
import com.litongjava.tio.core.udp.UdpPacket;
import com.litongjava.tio.core.udp.intf.UdpHandler;

public class SipInviteOnlyUdpHandler implements UdpHandler {

  private final String localIp;
  private final SipMessageParser messageParser = new SipMessageParser();
  private final SipMessageEncoder messageEncoder = new SipMessageEncoder();
  private final CallSessionManager sessionManager;
  private final RtpServerManager rtpServerManager;

  public SipInviteOnlyUdpHandler(String localIp) {
    this(localIp, new CallSessionManager(), new RtpServerManager(localIp));
  }

  public SipInviteOnlyUdpHandler(String localIp, CallSessionManager sessionManager, RtpServerManager rtpServerManager) {
    this.localIp = localIp;
    this.sessionManager = sessionManager;
    this.rtpServerManager = rtpServerManager;
  }

  @Override
  public void handler(UdpPacket udpPacket, DatagramSocket socket) {
    try {
      Node remote = udpPacket.getRemote();
      byte[] data = udpPacket.getData();

      SipMessage msg = messageParser.parse(data);
      if (!(msg instanceof SipRequest)) {
        return;
      }

      SipRequest req = (SipRequest) msg;
      String method = req.getMethod();

      if ("INVITE".equalsIgnoreCase(method)) {
        handleInvite(req, remote, socket);
        return;
      }

      if ("ACK".equalsIgnoreCase(method)) {
        handleAck(req);
        return;
      }

      if ("BYE".equalsIgnoreCase(method)) {
        handleBye(req, remote, socket);
        return;
      }

      SipResponse resp = buildSimpleResponse(req, 200, "OK", null);
      send(socket, remote, resp);
    } catch (Exception e) {
      e.printStackTrace();
    }
  }

  private void handleInvite(SipRequest req, Node remote, DatagramSocket socket) throws Exception {
    String callId = req.getHeader("Call-ID");
    CallSession exist = sessionManager.getByCallId(callId);

    if (exist != null && exist.getLast200Ok() != null) {
      sendRaw(socket, remote, exist.getLast200Ok());
      return;
    }

    String toTag = "java" + System.nanoTime();

    CallSession session = new CallSession();
    session.setCallId(callId);
    session.setFromTag(parseTag(req.getHeader("From")));
    session.setToTag(toTag);
    session.setTransport("UDP");
    session.setRemoteSipIp(remote.getIp());
    session.setRemoteSipPort(remote.getPort());
    session.setCreatedTime(System.currentTimeMillis());
    session.setUpdatedTime(System.currentTimeMillis());
    session.setAckDeadline(System.currentTimeMillis() + 32000);

    parseRemoteSdp(req, session);
    rtpServerManager.allocateAndStart(session);

    SipResponse trying = buildSimpleResponse(req, 100, "Trying", null);
    send(socket, remote, trying);

    SipResponse ok = buildInvite200Ok(req, session);
    byte[] encoded = messageEncoder.encodeResponse(ok);
    String raw200 = new String(encoded, StandardCharsets.US_ASCII);

    session.setLast200Ok(raw200);
    sessionManager.createOrUpdate(session);

    sendBytes(socket, remote, encoded);
  }

  private void handleAck(SipRequest req) {
    String callId = req.getHeader("Call-ID");
    sessionManager.markAckReceived(callId);
  }

  private void handleBye(SipRequest req, Node remote, DatagramSocket socket) throws Exception {
    String callId = req.getHeader("Call-ID");
    CallSession session = sessionManager.getByCallId(callId);

    SipResponse resp = buildSimpleResponse(req, 200, "OK", session != null ? session.getToTag() : null);
    send(socket, remote, resp);

    if (session != null) {
      rtpServerManager.stopAndRelease(session);
      sessionManager.terminate(callId);
    }
  }

  private void send(DatagramSocket socket, Node remote, SipResponse response) throws Exception {
    byte[] bytes = messageEncoder.encodeResponse(response);
    sendBytes(socket, remote, bytes);
  }

  private void sendRaw(DatagramSocket socket, Node remote, String text) throws Exception {
    byte[] bytes = text.getBytes(StandardCharsets.US_ASCII);
    sendBytes(socket, remote, bytes);
  }

  private void sendBytes(DatagramSocket socket, Node remote, byte[] bytes) throws Exception {
    DatagramPacket packet = new DatagramPacket(bytes, bytes.length,
        new InetSocketAddress(remote.getIp(), remote.getPort()));
    socket.send(packet);
  }

  private SipResponse buildInvite200Ok(SipRequest req, CallSession session) {
    SipResponse resp = new SipResponse();
    resp.setStatusCode(200);
    resp.setReasonPhrase("OK");

    copyIfPresent(req, resp, "Via");
    copyIfPresent(req, resp, "From");

    String to = req.getHeader("To");
    if (to != null && !to.toLowerCase().contains("tag=")) {
      to = to + ";tag=" + session.getToTag();
    }
    if (to != null) {
      resp.addHeader("To", to);
    }

    copyIfPresent(req, resp, "Call-ID");
    copyIfPresent(req, resp, "CSeq");
    resp.addHeader("Contact", "<sip:java@" + localIp + ":5060>");
    resp.addHeader("Content-Type", "application/sdp");

    String sdp = "v=0\r\n" + "o=- 1 1 IN IP4 " + localIp + "\r\n" + "s=JavaSip\r\n" + "c=IN IP4 " + localIp + "\r\n"
        + "t=0 0\r\n" + "m=audio " + session.getLocalRtpPort() + " RTP/AVP 0\r\n" + "a=rtpmap:0 PCMU/8000\r\n"
        + "a=ptime:20\r\n" + "a=sendrecv\r\n";

    resp.setBody(sdp.getBytes(StandardCharsets.US_ASCII));
    return resp;
  }

  private SipResponse buildSimpleResponse(SipRequest req, int code, String reason, String toTag) {
    SipResponse resp = new SipResponse();
    resp.setStatusCode(code);
    resp.setReasonPhrase(reason);

    copyIfPresent(req, resp, "Via");
    copyIfPresent(req, resp, "From");

    String to = req.getHeader("To");
    if (toTag != null && to != null && !to.toLowerCase().contains("tag=")) {
      to = to + ";tag=" + toTag;
    }
    if (to != null) {
      resp.addHeader("To", to);
    }

    copyIfPresent(req, resp, "Call-ID");
    copyIfPresent(req, resp, "CSeq");

    resp.setBody(new byte[0]);
    return resp;
  }

  private void copyIfPresent(SipRequest req, SipResponse resp, String headerName) {
    for (String v : req.getHeaders(headerName)) {
      resp.addHeader(headerName, v);
    }
  }

  private String parseTag(String headerValue) {
    if (headerValue == null) {
      return null;
    }

    String lower = headerValue.toLowerCase();
    int idx = lower.indexOf("tag=");
    if (idx < 0) {
      return null;
    }

    String sub = headerValue.substring(idx + 4);
    int semi = sub.indexOf(';');
    if (semi >= 0) {
      sub = sub.substring(0, semi);
    }
    return sub.trim();
  }

  private void parseRemoteSdp(SipRequest req, CallSession session) {
    byte[] body = req.getBody();
    if (body == null || body.length == 0) {
      return;
    }

    String sdp = new String(body, StandardCharsets.US_ASCII);
    String[] lines = sdp.split("\r\n");

    for (String line : lines) {
      if (line.startsWith("c=")) {
        String[] parts = line.split(" ");
        if (parts.length >= 3) {
          session.setRemoteRtpIp(parts[2].trim());
        }
      } else if (line.startsWith("m=audio")) {
        String[] parts = line.split(" ");
        if (parts.length >= 2) {
          try {
            session.setRemoteRtpPort(Integer.parseInt(parts[1]));
          } catch (Exception ignore) {
          }
        }
      }
    }
  }
}

8.3 CallSessionManager

这个类负责统一管理 session。

第一版最小职责包括:

  • 根据 Call-ID 查找 session
  • 创建或更新 session
  • 标记 ACK 已收到
  • 删除 session
  • 终止 session 时关闭 RTP

在当前版本里,它是连接:

  • SIP 信令层
  • SDP 协商结果
  • RTP 端口实例

的关键中枢。

package com.litongjava.sip.server.session;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import com.litongjava.sip.model.CallSession;

public class CallSessionManager {

  private final Map<String, CallSession> sessions = new ConcurrentHashMap<>();

  public CallSession getByCallId(String callId) {
    if (callId == null) {
      return null;
    }
    return sessions.get(callId);
  }

  public CallSession createOrUpdate(CallSession session) {
    if (session == null || session.getCallId() == null) {
      throw new IllegalArgumentException("call session or callId is null");
    }
    session.setUpdatedTime(System.currentTimeMillis());
    sessions.put(session.getCallId(), session);
    return session;
  }

  public void markAckReceived(String callId) {
    CallSession session = sessions.get(callId);
    if (session != null) {
      session.setAckReceived(true);
      session.setUpdatedTime(System.currentTimeMillis());
    }
  }

  public void terminate(String callId) {
    CallSession session = sessions.remove(callId);
    if (session != null) {
      session.setTerminated(true);
      session.setUpdatedTime(System.currentTimeMillis());
      if (session.getRtpServer() != null) {
        session.getRtpServer().stop();
      }
    }
  }

  public void remove(String callId) {
    sessions.remove(callId);
  }

  public Map<String, CallSession> snapshot() {
    return Map.copyOf(sessions);
  }
}

8.4 启动服务

import java.io.IOException;
import java.net.SocketException;

import com.litongjava.sip.rtp.RtpServerManager;
import com.litongjava.sip.server.handler.SipInviteOnlyTcpHandler;
import com.litongjava.sip.server.handler.SipInviteOnlyUdpHandler;
import com.litongjava.sip.server.session.CallSessionManager;
import com.litongjava.tio.core.udp.UdpServer;
import com.litongjava.tio.core.udp.UdpServerConf;
import com.litongjava.tio.server.ServerTioConfig;
import com.litongjava.tio.server.TioServer;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class SipServerConfig {

  public void config() {
    String localIp = "192.168.3.219";

    CallSessionManager sessionManager = new CallSessionManager();
    RtpServerManager rtpServerManager = new RtpServerManager(localIp);

    SipInviteOnlyTcpHandler tcpHandler = new SipInviteOnlyTcpHandler(localIp, sessionManager, rtpServerManager);

    ServerTioConfig serverTioConfig = new ServerTioConfig("sip-server");
    serverTioConfig.setServerAioHandler(tcpHandler);
    serverTioConfig.setHeartbeatTimeout(-1L);

    TioServer tioServer = new TioServer(serverTioConfig);

    int port = 5060;
    try {
      tioServer.start(null, port);
      log.info("独立 TCP 服务器已成功启动,监听端口: {}", port);
    } catch (IOException e) {
      log.error("启动 TCP 服务器失败", e);
    }

    SipInviteOnlyUdpHandler udpHandler = new SipInviteOnlyUdpHandler(localIp, sessionManager, rtpServerManager);

    UdpServerConf udpServerConf = new UdpServerConf(5060, udpHandler, 5000);

    try {
      UdpServer udpServer = new UdpServer(udpServerConf);
      udpServer.start();
      log.info("UDP 服务器已成功启动,监听端口: {}", port);
    } catch (SocketException e) {
      log.error("启动 UDP 服务器失败", e);
    }
  }
}

9. SIP Echo 的完整处理流程

下面按真实时序说明整个系统如何工作。


9.1 服务端启动

服务端启动时需要做两件事:

启动 TCP 5060

用于接收 SIP over TCP 请求。

启动 UDP 5060

用于接收 SIP over UDP 请求。

这两个入口可以共享:

  • 同一个 CallSessionManager
  • 同一个 RtpServerManager

这样无论信令从 TCP 来还是 UDP 来,底层会话和媒体资源都由同一套组件管理。


9.2 客户端发送 INVITE

客户端发出一个 INVITE。

如果带 SDP,通常会包含:

  • 对端的 RTP 地址
  • 对端的 RTP 端口
  • 对端支持的 codec,例如 PCMU

这条 INVITE 到达服务端后:

  • TCP 入口先解帧再解析
  • UDP 入口直接解析

9.3 服务端处理 INVITE

处理 INVITE 时,服务端会做这些事情:

读取关键信息

  • Call-ID
  • From
  • To
  • Via
  • CSeq

检查是否重复 INVITE

如果这个 Call-ID 已经存在,并且已经保存过 200 OK,可以直接重发。

解析对端 SDP

提取:

  • 对端 RTP IP
  • 对端 RTP 端口

创建 CallSession

把这次呼叫的会话信息存下来。

分配 RTP 端口并启动 RTP 服务

例如本地分到 31234。


9.4 服务端返回响应

UDP 场景

可以先回一个:

  • 100 Trying

然后再回:

  • 200 OK

TCP 场景

通常直接回:

  • 200 OK

200 OK 的消息体里带着 SDP answer,最关键的是:

  • 本地 RTP IP
  • 本地 RTP 端口
  • 选用的音频编码

例如:

  • c=IN IP4 192.168.3.219
  • m=audio 31234 RTP/AVP 0
  • a=rtpmap:0 PCMU/8000

这意味着以后对端要把语音发到 192.168.3.219:31234。


9.5 客户端发送 ACK

客户端收到 200 OK 后,会发送 ACK。

服务端收到 ACK 后,通常只做状态更新:

  • 记录该 session 已收到 ACK

第一版里,这一步不需要额外响应。


9.6 客户端发送 RTP

客户端开始向 SDP answer 中声明的端口发送 RTP 包。

例如:

  • 目标 IP:192.168.3.219
  • 目标端口:31234

此时 RtpUdpServer 已经在这个端口监听。


9.7 服务端做 RTP Echo

RtpEchoUdpHandler 收到 RTP 包后:

  1. 读取 udpPacket.getData()
  2. 读取 remote IP 和 remote port
  3. 构造一个响应 datagram
  4. 把原始字节原样发回

因为终端收到的是自己刚刚发出的语音 RTP,所以会听到回声。


9.8 客户端发送 BYE

当对端挂断时,会发送 BYE。

服务端收到后要做这些事:

  1. 根据 Call-ID 查到 session
  2. 回复 200 OK
  3. 停止该 session 的 RTP server
  4. 回收 RTP 端口
  5. 从 CallSessionManager 移除会话

到这里,一次完整会话结束。


10. SDP 在这个项目里扮演的角色

虽然第一版还不是“完整协商”,但 SDP 已经发挥了非常关键的作用。

10.1 从 INVITE 中读取 offer

服务端从对端提供的 SDP 里读出:

  • 对端媒体地址
  • 对端媒体端口
  • 对端支持的音频格式

例如:

c=IN IP4 192.168.3.10
m=audio 40002 RTP/AVP 0 8 101
a=rtpmap:0 PCMU/8000

这里至少能知道:

  • 对端用 40002 接收 RTP
  • 对端支持 PT 0 = PCMU

10.2 在 200 OK 中返回 answer

服务端在 200 OK 里写回:

  • 自己的媒体地址
  • 自己的媒体端口
  • 自己当前选择的 codec

第一版最小策略是:

  • 固定返回 PCMU/8000

这使得双方至少能在一个最基本的音频格式上达成一致。


11. 为什么第一版先做“原包回显”

很多人在这里容易直接想跳到:

  • RTP 解码
  • PCM
  • 重采样
  • ASR
  • TTS
  • 大模型

但工程上更稳的顺序是先做“回显”。

原因很简单:

11.1 回显能先验证链路正确

只要能听到回声,就说明:

  • SIP 建会话没问题
  • SDP 协商端口没问题
  • 对端 RTP 能打到服务端
  • 服务端回 RTP 能打回对端
  • 终端能正常播放回来的包

这一步不成立,后面的语音识别和合成都没有意义。


11.2 回显把“网络问题”和“媒体处理问题”拆开了

如果一开始就做解码重编码,出问题时很难判断是:

  • SIP 没协商好
  • RTP 没打通
  • 编解码错了
  • 时间戳错了
  • sequence 错了

而原包 echo 只关注“网络路径通不通”。


12. 使用 tio-core 时的设计经验

这部分对读者很重要。


12.1 不要把所有逻辑都塞进 handler

在 tio-core 里,很多初学者会把:

  • decode
  • parse
  • SIP 业务
  • 字符串拼响应
  • session 管理

全写在一个 handler 里。

短期能跑,但很快会失控。

当前这个项目更推荐的方式是:

  • SipTcpFrameDecoder 负责 TCP 解帧
  • SipMessageParser 负责协议解析
  • SipMessageEncoder 负责协议编码
  • CallSessionManager 负责会话
  • RtpServerManager 负责媒体端口资源
  • SipInviteOnlyTcpHandler / SipInviteOnlyUdpHandler 只做适配和路由

这样结构会稳很多。


12.2 TCP 和 UDP 的处理思维必须分开

虽然都是 SIP 5060,但两者问题不一样。

TCP 关注点

  • 流式切包
  • 粘包拆包
  • Content-Length

UDP 关注点

  • 请求重传
  • 幂等处理
  • 重发 200 OK

如果混着想,很容易把协议处理写乱。


12.3 先做最小可运行链路,再做标准化增强

第一版不需要一口气实现:

  • 完整事务状态机
  • 完整 SDP 协商
  • 完整 RTP 重组
  • RTCP
  • 认证
  • 注册
  • 路由

先把最小主线跑通,是更合理的工程顺序。


13. 如何验证这个 SIP Echo 服务

你项目里已经有两个 client:

  • SipTcpClient
  • RtpUdpEchoClient

这正好对应两类验证方式。


13.1 先验证 SIP TCP 是否正常

用 SipTcpClient 发一个最小 INVITE。

预期结果:

  • 服务端返回 200 OK
  • 响应里带 SDP
  • 能看到 m=audio 某个端口 RTP/AVP 0

这说明:

  • TCP 解帧正常
  • SIP 解析正常
  • 会话创建正常
  • RTP 端口分配正常
  • 200 OK 编码正常

13.2 再验证 RTP Echo 是否正常

把 RtpUdpEchoClient 指向服务端分配出的 RTP 端口。

预期结果:

  • 发送一个伪 RTP 包
  • 收到服务端原样回包
  • 长度一致
  • 内容一致

这说明:

  • RTP UDP server 已经成功监听
  • 媒体端口是通的
  • echo handler 生效了

13.3 最终用软电话验证真实回声

当你把整个 SIP + RTP 都接起来后,可以直接用软电话拨打服务端。

预期结果:

  • 呼叫建立
  • 听到自己回声
  • 挂断正常

这一步是最有说服力的系统级验证。


14. 第一版已经解决了什么问题

到当前结构为止,已经解决了几个非常关键的工程问题:

14.1 TCP SIP 解包问题

不再用“读多少算一个 SIP 包”,而是按:

  • header 结束符
  • Content-Length

切完整报文。


14.2 SIP 解析与 handler 解耦

handler 不再直接手工 split 大段字符串去做所有事情。 parser 和 encoder 已经独立出来。


14.3 会话与媒体绑定

引入了 CallSession 和 CallSessionManager,让:

  • SIP INVITE
  • SDP 信息
  • RTP 端口实例

关联起来。


14.4 媒体端口动态管理

通过 RtpPortAllocator + RtpServerManager,每个呼叫都能独立拿到一个 RTP 端口并在结束后释放。


15. 第一版还没有做的事

这是给后续演进用的。

15.1 Session 生命周期还可增强

当前最小版已经能建和删,但后面最好补:

  • ACK 超时回收
  • 异常断开回收
  • 长时间无 RTP 回收

15.2 SDP 还不是真正完整协商

当前基本策略是:

  • 读取 offer
  • 回固定 PCMU

下一步应该变成:

  • 解析对端 offer 的 codec 列表
  • 从双方都支持的 codec 里选一个
  • 构造真正的 answer

15.3 RTP 还是原包回显

下一步应该做成:

  • 解析 RTP 头
  • 提取 payload
  • PCMU 解码成 PCM16
  • 送入媒体处理链
  • 再编码并重组 RTP

16. 如何继续升级成语音机器人服务

当 SIP Echo 跑通以后,后续演进路径通常是这样的:

第一步:把 RTP 从原包回显升级成音频帧处理

也就是:

  • 收 RTP
  • 解码 payload
  • 得到 PCM 音频帧

第二步:把 PCM 送到 ASR 或流式大模型

例如:

  • 语音识别
  • 实时对话模型
  • 流式理解

第三步:把模型输出结果做 TTS

生成一段回复音频。


第四步:重新编码并封装 RTP 发回终端

这时服务端就不再是 echo,而是“真正能说话的 SIP 语音机器人”。


17. 本文总结

基于 tio-core 实现 SIP Echo 服务,最关键的不是代码量,而是把职责分清楚:

  • TCP/UDP handler:对接 tio-core
  • frame decoder:解决 TCP 边界问题
  • SIP parser/encoder:解决协议对象化问题
  • session manager:把信令和媒体资源串起来
  • RTP server manager:动态管理媒体端口
  • RTP echo handler:验证整条语音链路

在这套结构下,第一版系统已经实现了完整闭环:

  • SIP 建会话
  • SDP 告知媒体端口
  • RTP 承载语音
  • 服务端回显 RTP
  • 对端听到回声

这说明你已经用 tio-core 成功搭出了一个最小可运行的 SIP 媒体服务骨架。

后面无论是继续增强 SIP/SDP 的标准化能力,还是把 RTP 换成真正的语音处理流水线,都是在这个基础上向前演进。

Edit this page
Last Updated: 3/7/26, 10:11 AM
Contributors: litongjava
Prev
SIP Server 第一版原理说明
Next
使用livekit-sip进行测试