SIP Server 第一版实战
1. 文档目标
本文介绍如何使用 tio-core 框架实现一个最小可运行的 SIP Echo 服务。 这个服务的目标不是做一个完整的 SIP 平台,而是先打通一条最关键的语音链路:
- 服务端同时监听
SIP TCP 5060和SIP UDP 5060 - 终端发起
INVITE - 服务端解析 SIP 和 SDP
- 服务端动态分配一个 RTP 端口
- 服务端返回
200 OK,并在 SDP 中告诉对端媒体端口 - 对端向该 RTP 端口发送语音
- 服务端收到 RTP 后原样回发
- 对端听到自己的回声
- 对端挂断,服务端处理
BYE,释放 RTP 资源
如果这条链路跑通,就说明三件事都成立:
tio-core可以承载 SIP 信令和rtp数据- Java 可以动态管理 RTP 端口
- SIP/SDP/RTP 三层配合是正确的
2. 先理解整体架构
在这个系统里,可以把功能拆成三层:
2.1 信令层:SIP
负责:
- 建立通话
- 确认通话
- 结束通话
- 携带 SDP 进行媒体协商
这里的核心协议消息有:
INVITE100 Trying200 OKACKBYE
2.2 协商层:SDP
负责告诉对端:
- 语音该发到哪个 IP
- 语音该发到哪个端口
- 当前用什么编码
- 每包多长时间
在本项目里,SDP 主要出现在:
INVITE的消息体中,作为对端的 offer200 OK的消息体中,作为服务端的 answer
2.3 媒体层:RTP
负责真正承载语音数据。
第一版里,RTP 服务只做一件事:
- 收到一个 RTP 包
- 原样发回去
这样就形成 echo。
3. 为什么选择 tio-core
tio-core 适合这个场景,原因有三个:
3.1 同时支持 TCP 与 UDP
SIP 常见传输方式本身就包括:
- TCP
- UDP
而 RTP 通常又是 UDP。 tio-core 同时提供了:
- TCP server 能力
- UDP server 能力
所以一个框架就能覆盖 SIP 和 RTP 这两类网络需求。
3.2 接口清晰
你会用到两个核心接口:
TCP
ServerAioHandler
负责:
decodeencodehandler
UDP
UdpHandler
负责:
- 收到 UDP 数据后的处理
这对于协议型服务很方便,因为你可以很自然地把:
- 解帧
- 解析
- 业务处理
- 响应编码
分开组织。
3.3 易于逐步演进
这个项目不是“一开始就做复杂系统”,而是从最小功能逐步扩展:
- 第一阶段:SIP echo 跑通
- 第二阶段:补 session 生命周期
- 第三阶段:补真正 SDP 协商
- 第四阶段:RTP 解码重组
- 第五阶段:接 ASR / TTS / LLM
tio-core 足够轻,适合这种逐步搭建。
4. 项目结构说明
工程结构如下:
com.litongjava.sip
├── client
│ ├── RtpUdpEchoClient.java
│ ├── SipTcpClient.java
├── model
│ ├── CallSession.java
│ ├── SipMessage.java
│ ├── SipRequest.java
│ ├── SipResponse.java
├── parser
│ ├── SipMessageEncoder.java
│ ├── SipMessageParser.java
│ ├── SipTcpFrameDecoder.java
├── rtp
│ ├── RtpEchoUdpHandler.java
│ ├── RtpPortAllocator.java
│ ├── RtpServerManager.java
│ ├── RtpUdpServer.java
├── server
│ ├── handler
│ │ ├── SipInviteOnlyTcpHandler.java
│ │ ├── SipInviteOnlyUdpHandler.java
│ ├── packet
│ │ ├── SipPacket.java
│ ├── session
│ │ ├── CallSessionManager.java
这个结构非常适合作为第一版落地结构,职责边界已经比较清晰了。
下面逐层解释。
5. model 层:协议对象和会话对象
5.1 SipMessage
SipMessage 是 SIP 消息的抽象父类。 它负责承载:
- headers
- body
它不关心这是请求还是响应。
这样做的好处是: parser 先把通用部分装进去,再根据起始行决定它是 SipRequest 还是 SipResponse。
package com.litongjava.sip.model;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
public abstract class SipMessage {
private final Map<String, List<String>> headers = new LinkedHashMap<>();
private byte[] body;
public Map<String, List<String>> getHeaders() {
return headers;
}
public void addHeader(String name, String value) {
headers.computeIfAbsent(name, k -> new ArrayList<>()).add(value);
}
public String getHeader(String name) {
for (Map.Entry<String, List<String>> e : headers.entrySet()) {
if (e.getKey().equalsIgnoreCase(name)) {
List<String> vals = e.getValue();
return vals == null || vals.isEmpty() ? null : vals.get(0);
}
}
return null;
}
public List<String> getHeaders(String name) {
for (Map.Entry<String, List<String>> e : headers.entrySet()) {
if (e.getKey().equalsIgnoreCase(name)) {
return e.getValue();
}
}
return List.of();
}
public byte[] getBody() {
return body;
}
public void setBody(byte[] body) {
this.body = body;
}
public int contentLength() {
return body == null ? 0 : body.length;
}
}
5.2 SipRequest
表示 SIP 请求。 它比 SipMessage 多出这些字段:
- method
- requestUri
- version
例如一条:
INVITE sip:1001@192.168.3.219:5060 SIP/2.0
就会被解析成一个 SipRequest。
package com.litongjava.sip.model;
public class SipRequest extends SipMessage {
private String method;
private String requestUri;
private String version;
public String getMethod() {
return method;
}
public void setMethod(String method) {
this.method = method;
}
public String getRequestUri() {
return requestUri;
}
public void setRequestUri(String requestUri) {
this.requestUri = requestUri;
}
public String getVersion() {
return version;
}
public void setVersion(String version) {
this.version = version;
}
}
5.3 SipResponse
表示 SIP 响应。 它比 SipMessage 多出这些字段:
- version
- statusCode
- reasonPhrase
例如:
SIP/2.0 200 OK
会被解析成一个 SipResponse。
package com.litongjava.sip.model;
public class SipRequest extends SipMessage {
private String method;
private String requestUri;
private String version;
public String getMethod() {
return method;
}
public void setMethod(String method) {
this.method = method;
}
public String getRequestUri() {
return requestUri;
}
public void setRequestUri(String requestUri) {
this.requestUri = requestUri;
}
public String getVersion() {
return version;
}
public void setVersion(String version) {
this.version = version;
}
}
5.4 CallSession
CallSession 是一次呼叫会话的数据载体。 第一版里,它主要保存:
Call-IDFrom tagTo tag- 传输方式 TCP/UDP
- SIP 对端 IP/端口
- RTP 对端 IP/端口
- 本地 RTP 端口
- 是否收到了 ACK
- 最近一次
200 OK - 对应的 RTP server 实例
为什么需要它?
因为在 SIP Echo 场景里,信令和媒体不是一回事:
- SIP 用来建立会话
- RTP 用来传语音
没有 CallSession,就没法把:
- 这次 INVITE
- 对应的 RTP 端口
- 未来的 ACK / BYE
串起来。
package com.litongjava.sip.model;
import com.litongjava.sip.rtp.RtpUdpServer;
public class CallSession {
private String callId;
private String fromTag;
private String toTag;
private String transport; // TCP / UDP
private String remoteSipIp;
private int remoteSipPort;
private String remoteRtpIp;
private int remoteRtpPort;
private int localRtpPort;
private long createdTime;
private long updatedTime;
private long ackDeadline;
private boolean ackReceived;
private boolean terminated;
private String last200Ok;
private RtpUdpServer rtpServer;
public String getCallId() {
return callId;
}
public void setCallId(String callId) {
this.callId = callId;
}
public String getFromTag() {
return fromTag;
}
public void setFromTag(String fromTag) {
this.fromTag = fromTag;
}
public String getToTag() {
return toTag;
}
public void setToTag(String toTag) {
this.toTag = toTag;
}
public String getTransport() {
return transport;
}
public void setTransport(String transport) {
this.transport = transport;
}
public String getRemoteSipIp() {
return remoteSipIp;
}
public void setRemoteSipIp(String remoteSipIp) {
this.remoteSipIp = remoteSipIp;
}
public int getRemoteSipPort() {
return remoteSipPort;
}
public void setRemoteSipPort(int remoteSipPort) {
this.remoteSipPort = remoteSipPort;
}
public String getRemoteRtpIp() {
return remoteRtpIp;
}
public void setRemoteRtpIp(String remoteRtpIp) {
this.remoteRtpIp = remoteRtpIp;
}
public int getRemoteRtpPort() {
return remoteRtpPort;
}
public void setRemoteRtpPort(int remoteRtpPort) {
this.remoteRtpPort = remoteRtpPort;
}
public int getLocalRtpPort() {
return localRtpPort;
}
public void setLocalRtpPort(int localRtpPort) {
this.localRtpPort = localRtpPort;
}
public long getCreatedTime() {
return createdTime;
}
public void setCreatedTime(long createdTime) {
this.createdTime = createdTime;
}
public long getUpdatedTime() {
return updatedTime;
}
public void setUpdatedTime(long updatedTime) {
this.updatedTime = updatedTime;
}
public long getAckDeadline() {
return ackDeadline;
}
public void setAckDeadline(long ackDeadline) {
this.ackDeadline = ackDeadline;
}
public boolean isAckReceived() {
return ackReceived;
}
public void setAckReceived(boolean ackReceived) {
this.ackReceived = ackReceived;
}
public boolean isTerminated() {
return terminated;
}
public void setTerminated(boolean terminated) {
this.terminated = terminated;
}
public String getLast200Ok() {
return last200Ok;
}
public void setLast200Ok(String last200Ok) {
this.last200Ok = last200Ok;
}
public RtpUdpServer getRtpServer() {
return rtpServer;
}
public void setRtpServer(RtpUdpServer rtpServer) {
this.rtpServer = rtpServer;
}
}
6. parser 层:把 TCP 字节流和 SIP 文本处理扎实
这一层是第一版最重要的基础设施之一。
6.1 为什么 TCP 不能直接“读多少算一个包”
因为 SIP over TCP 本质上是字节流,而不是 datagram。
也就是说:
- 一次
read可能只读到半个 SIP 报文 - 一次
read也可能读到两个 SIP 报文 - 如果简单按“本次拿到多少字节就是一个消息”,INVITE 带 SDP 时几乎迟早出问题
所以 TCP 模式下必须先做 frame decode。
6.2 SipTcpFrameDecoder 的职责
SipTcpFrameDecoder 只负责一件事:
从 TCP 流中切出完整的 SIP 消息。
它的规则是:
- 先找
\r\n\r\n - 这表示 SIP header 结束
- 再从 header 中读取
Content-Length - 判断 body 是否已经收满
- 如果完整,就切出一条消息
- 如果还不完整,就继续累积等待
这里解决的是“边界问题”,而不是“协议语义问题”。
这是非常关键的分层。
package com.litongjava.sip.parser;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import com.litongjava.tio.core.ChannelContext;
public class SipTcpFrameDecoder {
private static final String ATTR_ACC = "sip_tcp_acc_buf";
private static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
private static final int MAX_SIP_MESSAGE_SIZE = 1024 * 1024; // 1MB
public byte[] decode(ByteBuffer buffer, int readableLength, ChannelContext ctx) {
if (readableLength <= 0) {
return null;
}
ByteBuffer acc = (ByteBuffer) ctx.getAttribute(ATTR_ACC);
if (acc == null) {
acc = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE);
ctx.setAttribute(ATTR_ACC, acc);
}
byte[] chunk = new byte[readableLength];
buffer.get(chunk);
acc = ensureCapacity(acc, chunk.length, ctx);
acc.put(chunk);
acc.flip();
int frameLen = tryParseOneFrameLength(acc);
if (frameLen <= 0) {
acc.compact();
return null;
}
byte[] frame = new byte[frameLen];
acc.get(frame);
acc.compact();
return frame;
}
private ByteBuffer ensureCapacity(ByteBuffer acc, int incoming, ChannelContext ctx) {
if (acc.remaining() >= incoming) {
return acc;
}
int needed = acc.position() + incoming;
int newCap = acc.capacity();
while (newCap < needed) {
newCap = newCap * 2;
if (newCap > MAX_SIP_MESSAGE_SIZE) {
throw new IllegalStateException("SIP accumulate buffer too large: " + newCap);
}
}
ByteBuffer bigger = ByteBuffer.allocate(newCap);
acc.flip();
bigger.put(acc);
ctx.setAttribute(ATTR_ACC, bigger);
return bigger;
}
private int tryParseOneFrameLength(ByteBuffer acc) {
int start = acc.position();
int limit = acc.limit();
if ((limit - start) > MAX_SIP_MESSAGE_SIZE) {
throw new IllegalStateException("SIP message too large");
}
int headerEnd = indexOf(acc, "\r\n\r\n".getBytes(StandardCharsets.US_ASCII), start, limit);
if (headerEnd < 0) {
return -1;
}
int headerBlockEnd = headerEnd + 4;
String head = sliceToString(acc, start, headerBlockEnd);
int contentLength = parseContentLength(head);
if (contentLength < 0) {
throw new IllegalStateException("Invalid Content-Length");
}
int totalLength = (headerBlockEnd - start) + contentLength;
if ((limit - start) < totalLength) {
return -1;
}
return totalLength;
}
private int parseContentLength(String headers) {
String[] lines = headers.split("\r\n");
for (String line : lines) {
int idx = line.indexOf(':');
if (idx <= 0) {
continue;
}
String name = line.substring(0, idx).trim();
String value = line.substring(idx + 1).trim();
if ("Content-Length".equalsIgnoreCase(name) || "l".equalsIgnoreCase(name)) {
try {
return Integer.parseInt(value);
} catch (NumberFormatException e) {
return -1;
}
}
}
return 0;
}
private int indexOf(ByteBuffer buf, byte[] pat, int from, int to) {
for (int i = from; i <= to - pat.length; i++) {
boolean ok = true;
for (int j = 0; j < pat.length; j++) {
if (buf.get(i + j) != pat[j]) {
ok = false;
break;
}
}
if (ok) {
return i;
}
}
return -1;
}
private String sliceToString(ByteBuffer buf, int from, int to) {
byte[] bytes = new byte[to - from];
for (int i = 0; i < bytes.length; i++) {
bytes[i] = buf.get(from + i);
}
return new String(bytes, StandardCharsets.US_ASCII);
}
}
6.3 SipMessageParser 的职责
SipMessageParser 处理的是:
一整条完整 SIP 文本,如何解析成对象。
它要做的事情包括:
- 识别 request line 还是 status line
- 解析 headers
- 支持 compact header
- 提取 body
- 返回
SipRequest或SipResponse
例如:
INVITE ... SIP/2.0->SipRequestSIP/2.0 200 OK->SipResponse
package com.litongjava.sip.parser;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import com.litongjava.sip.model.SipMessage;
import com.litongjava.sip.model.SipRequest;
import com.litongjava.sip.model.SipResponse;
public class SipMessageParser {
private static final Map<String, String> COMPACT_HEADERS = new HashMap<>();
static {
COMPACT_HEADERS.put("v", "Via");
COMPACT_HEADERS.put("f", "From");
COMPACT_HEADERS.put("t", "To");
COMPACT_HEADERS.put("i", "Call-ID");
COMPACT_HEADERS.put("l", "Content-Length");
COMPACT_HEADERS.put("c", "Content-Type");
COMPACT_HEADERS.put("m", "Contact");
}
public SipMessage parse(byte[] bytes) {
if (bytes == null || bytes.length == 0) {
throw new IllegalArgumentException("empty sip message");
}
int headerEnd = indexOf(bytes, "\r\n\r\n".getBytes(StandardCharsets.US_ASCII));
if (headerEnd < 0) {
throw new IllegalArgumentException("invalid sip message, no header terminator");
}
int headerBlockEnd = headerEnd + 4;
String headerText = new String(bytes, 0, headerBlockEnd, StandardCharsets.US_ASCII);
String[] lines = headerText.split("\r\n");
if (lines.length == 0) {
throw new IllegalArgumentException("invalid sip start line");
}
String startLine = lines[0];
SipMessage message = parseStartLine(startLine);
for (int i = 1; i < lines.length; i++) {
String line = lines[i];
if (line == null || line.isEmpty()) {
continue;
}
int idx = line.indexOf(':');
if (idx <= 0) {
continue;
}
String name = line.substring(0, idx).trim();
String value = line.substring(idx + 1).trim();
name = normalizeHeaderName(name);
message.addHeader(name, value);
}
int contentLength = parseContentLength(message);
if (contentLength > 0) {
if (bytes.length < headerBlockEnd + contentLength) {
throw new IllegalArgumentException("sip body not complete");
}
byte[] body = new byte[contentLength];
System.arraycopy(bytes, headerBlockEnd, body, 0, contentLength);
message.setBody(body);
} else {
message.setBody(new byte[0]);
}
return message;
}
private SipMessage parseStartLine(String startLine) {
if (startLine.startsWith("SIP/2.0")) {
String[] parts = startLine.split(" ", 3);
if (parts.length < 3) {
throw new IllegalArgumentException("invalid sip response line: " + startLine);
}
SipResponse resp = new SipResponse();
resp.setVersion(parts[0]);
resp.setStatusCode(Integer.parseInt(parts[1]));
resp.setReasonPhrase(parts[2]);
return resp;
} else {
String[] parts = startLine.split(" ", 3);
if (parts.length < 3) {
throw new IllegalArgumentException("invalid sip request line: " + startLine);
}
SipRequest req = new SipRequest();
req.setMethod(parts[0]);
req.setRequestUri(parts[1]);
req.setVersion(parts[2]);
return req;
}
}
private String normalizeHeaderName(String name) {
String compact = COMPACT_HEADERS.get(name);
return compact != null ? compact : name;
}
private int parseContentLength(SipMessage message) {
String v = message.getHeader("Content-Length");
if (v == null || v.isEmpty()) {
return 0;
}
return Integer.parseInt(v.trim());
}
private int indexOf(byte[] src, byte[] pat) {
for (int i = 0; i <= src.length - pat.length; i++) {
boolean ok = true;
for (int j = 0; j < pat.length; j++) {
if (src[i + j] != pat[j]) {
ok = false;
break;
}
}
if (ok) {
return i;
}
}
return -1;
}
}
6.4 SipMessageEncoder 的职责
SipMessageEncoder 与 parser 相反,负责把对象重新编码成标准 SIP 文本。
主要用于服务端发响应时:
- 组装状态行
- 输出 headers
- 自动补
Content-Length - 拼接 body
它的存在让 handler 不必到处拼字符串。
package com.litongjava.sip.parser;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import com.litongjava.sip.model.SipResponse;
public class SipMessageEncoder {
public byte[] encodeResponse(SipResponse response) {
StringBuilder sb = new StringBuilder();
sb.append(response.getVersion()).append(' ').append(response.getStatusCode()).append(' ')
.append(response.getReasonPhrase()).append("\r\n");
for (Map.Entry<String, List<String>> e : response.getHeaders().entrySet()) {
String name = e.getKey();
List<String> values = e.getValue();
if (values == null || values.isEmpty()) {
continue;
}
for (String value : values) {
sb.append(name).append(": ").append(value).append("\r\n");
}
}
byte[] body = response.getBody();
if (body == null) {
body = new byte[0];
}
if (response.getHeader("Content-Length") == null) {
sb.append("Content-Length: ").append(body.length).append("\r\n");
}
sb.append("\r\n");
byte[] head = sb.toString().getBytes(StandardCharsets.US_ASCII);
byte[] all = new byte[head.length + body.length];
System.arraycopy(head, 0, all, 0, head.length);
if (body.length > 0) {
System.arraycopy(body, 0, all, head.length, body.length);
}
return all;
}
}
7. rtp 层:动态端口与 echo 服务
这一层是“媒体能不能打通”的关键。
7.1 RtpPortAllocator
这个类负责在指定端口范围内分配 RTP 端口。
典型范围是:
30000-40000
它的作用是:
- 给每次通话找一个可用端口
- 避免多个呼叫冲突
- 挂断后回收端口
在 SIP 场景里,通常不是监听整个范围,而是:
- 每通电话分配一个端口
- 只监听这个端口
package com.litongjava.sip.rtp;
import java.io.IOException;
import java.net.DatagramSocket;
import java.util.BitSet;
public class RtpPortAllocator {
private final int start;
private final int end;
private final BitSet used;
public RtpPortAllocator() {
this(30000, 40000);
}
public RtpPortAllocator(int start, int end) {
this.start = start;
this.end = end;
this.used = new BitSet(end - start + 1);
}
public synchronized int allocate() {
for (int p = start; p <= end; p++) {
int idx = p - start;
if (used.get(idx)) {
continue;
}
if (canBind(p)) {
used.set(idx, true);
return p;
}
}
throw new IllegalStateException("No available RTP port in range " + start + "-" + end);
}
public synchronized void release(int port) {
if (port < start || port > end) {
return;
}
used.clear(port - start);
}
private boolean canBind(int port) {
try (DatagramSocket ignored = new DatagramSocket(port)) {
ignored.setReuseAddress(false);
return true;
} catch (IOException e) {
return false;
}
}
}
7.2 RtpUdpServer
这个类负责:
- 持有一个具体的 UDP 监听实例
- 在某个实际 RTP 端口上启动 UDP 服务
- 在结束时关闭监听
可以理解为“一次会话对应的 RTP 端口服务实例”。
package com.litongjava.sip.rtp;
import java.net.SocketException;
import com.litongjava.tio.core.udp.UdpServer;
import com.litongjava.tio.core.udp.UdpServerConf;
public class RtpUdpServer {
private final int port;
private UdpServer udpServer;
public RtpUdpServer(int port) {
this.port = port;
}
public void start() throws SocketException {
UdpServerConf conf = new UdpServerConf(port, new RtpEchoUdpHandler(), 5000);
this.udpServer = new UdpServer(conf);
this.udpServer.start();
}
public void stop() {
if (udpServer != null) {
udpServer.stop();
}
}
public int port() {
return port;
}
}
7.3 RtpEchoUdpHandler
这是第一版媒体逻辑的核心。
职责非常简单:
- 收到 UDP 包
- 取出原始数据
- 按原 remote 地址和端口回发
这就是 echo 的来源。
注意: 第一版只是原包回发,不是音频解码后再编码。 所以它验证的是“链路”而不是“媒体处理”。
package com.litongjava.sip.rtp;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetSocketAddress;
import com.litongjava.tio.core.Node;
import com.litongjava.tio.core.udp.UdpPacket;
import com.litongjava.tio.core.udp.intf.UdpHandler;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class RtpEchoUdpHandler implements UdpHandler {
@Override
public void handler(UdpPacket udpPacket, DatagramSocket datagramSocket) {
byte[] data = udpPacket.getData();
Node remote = udpPacket.getRemote();
// 先做最简单:原包回显(用于验证链路)
InetSocketAddress address = new InetSocketAddress(remote.getIp(), remote.getPort());
DatagramPacket resp = new DatagramPacket(data, data.length, address);
try {
datagramSocket.send(resp);
} catch (Exception e) {
log.error(e.getMessage(), e);
}
}
}
7.4 RtpServerManager
这个类负责统一管理 RTP 资源。
它向上提供的是更高层的能力:
- 为某个 session 分配端口并启动 RTP server
- 停止 RTP server 并回收端口
这样 SIP handler 不用直接跟端口分配器和 UDP server 打交道。
package com.litongjava.sip.rtp;
import com.litongjava.sip.model.CallSession;
public class RtpServerManager {
private final String localIp;
private final RtpPortAllocator allocator;
public RtpServerManager(String localIp) {
this(localIp, new RtpPortAllocator());
}
public RtpServerManager(String localIp, RtpPortAllocator allocator) {
this.localIp = localIp;
this.allocator = allocator;
}
public CallSession allocateAndStart(CallSession session) throws Exception {
int rtpPort = allocator.allocate();
RtpUdpServer rtpServer = new RtpUdpServer(rtpPort);
rtpServer.start();
session.setLocalRtpPort(rtpPort);
session.setRtpServer(rtpServer);
session.setUpdatedTime(System.currentTimeMillis());
return session;
}
public void stopAndRelease(CallSession session) {
if (session == null) {
return;
}
try {
if (session.getRtpServer() != null) {
session.getRtpServer().stop();
}
} finally {
if (session.getLocalRtpPort() > 0) {
allocator.release(session.getLocalRtpPort());
}
}
}
public String getLocalIp() {
return localIp;
}
}
8. server 层:真正使用 tio-core 的地方
这一层是与 tio-core 直接对接的地方。
8.1 SipInviteOnlyTcpHandler
它实现的是 ServerAioHandler。
在 tio-core 的 TCP 模型里,最核心的三个方法是:
decode
把原始字节流切成一个业务包。 在本项目里,它委托 SipTcpFrameDecoder 去做。
encode
把业务包编码成可发送的字节缓冲。 当前主要是把 ByteBufferPacket 交回 tio 发送。
handler
收到一个完整业务包后做业务处理。 在本项目里它主要处理:
- INVITE
- ACK
- BYE
package com.litongjava.sip.server.handler;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import com.litongjava.aio.ByteBufferPacket;
import com.litongjava.aio.Packet;
import com.litongjava.sip.model.CallSession;
import com.litongjava.sip.model.SipMessage;
import com.litongjava.sip.model.SipRequest;
import com.litongjava.sip.model.SipResponse;
import com.litongjava.sip.parser.SipMessageEncoder;
import com.litongjava.sip.parser.SipMessageParser;
import com.litongjava.sip.parser.SipTcpFrameDecoder;
import com.litongjava.sip.rtp.RtpServerManager;
import com.litongjava.sip.server.session.CallSessionManager;
import com.litongjava.tio.core.ChannelContext;
import com.litongjava.tio.core.Tio;
import com.litongjava.tio.core.TioConfig;
import com.litongjava.tio.server.intf.ServerAioHandler;
public class SipInviteOnlyTcpHandler implements ServerAioHandler {
private final String localIp;
private final SipTcpFrameDecoder frameDecoder = new SipTcpFrameDecoder();
private final SipMessageParser messageParser = new SipMessageParser();
private final SipMessageEncoder messageEncoder = new SipMessageEncoder();
private final CallSessionManager sessionManager;
private final RtpServerManager rtpServerManager;
public SipInviteOnlyTcpHandler(String localIp) {
this(localIp, new CallSessionManager(), new RtpServerManager(localIp));
}
public SipInviteOnlyTcpHandler(String localIp, CallSessionManager sessionManager, RtpServerManager rtpServerManager) {
this.localIp = localIp;
this.sessionManager = sessionManager;
this.rtpServerManager = rtpServerManager;
}
@Override
public Packet decode(ByteBuffer buffer, int limit, int position, int readableLength, ChannelContext ctx)
throws Exception {
byte[] frame = frameDecoder.decode(buffer, readableLength, ctx);
if (frame == null) {
return null;
}
return new ByteBufferPacket(ByteBuffer.wrap(frame));
}
@Override
public ByteBuffer encode(Packet packet, TioConfig tioConfig, ChannelContext ctx) {
ByteBufferPacket p = (ByteBufferPacket) packet;
ByteBuffer bb = p.getByteBuffer();
if (bb.position() != 0) {
bb.rewind();
}
return bb;
}
@Override
public void handler(Packet packet, ChannelContext ctx) throws Exception {
ByteBufferPacket p = (ByteBufferPacket) packet;
ByteBuffer bb = p.getByteBuffer();
byte[] bytes = new byte[bb.remaining()];
bb.get(bytes);
SipMessage msg = messageParser.parse(bytes);
if (!(msg instanceof SipRequest)) {
return;
}
SipRequest req = (SipRequest) msg;
String method = req.getMethod();
if ("INVITE".equalsIgnoreCase(method)) {
handleInvite(req, ctx);
return;
}
if ("ACK".equalsIgnoreCase(method)) {
handleAck(req);
return;
}
if ("BYE".equalsIgnoreCase(method)) {
handleBye(req, ctx);
return;
}
SipResponse resp = buildSimpleResponse(req, 200, "OK", null);
send(ctx, resp);
}
private void handleInvite(SipRequest req, ChannelContext ctx) throws Exception {
String callId = req.getHeader("Call-ID");
CallSession exist = sessionManager.getByCallId(callId);
if (exist != null && exist.getLast200Ok() != null) {
sendRaw(ctx, exist.getLast200Ok());
return;
}
String remoteIp = ctx.getClientNode() != null ? ctx.getClientNode().getIp() : null;
int remotePort = ctx.getClientNode() != null ? ctx.getClientNode().getPort() : 0;
String toTag = "java" + System.nanoTime();
CallSession session = new CallSession();
session.setCallId(callId);
session.setFromTag(parseTag(req.getHeader("From")));
session.setToTag(toTag);
session.setTransport("TCP");
session.setRemoteSipIp(remoteIp);
session.setRemoteSipPort(remotePort);
session.setCreatedTime(System.currentTimeMillis());
session.setUpdatedTime(System.currentTimeMillis());
session.setAckDeadline(System.currentTimeMillis() + 32000);
parseRemoteSdp(req, session);
rtpServerManager.allocateAndStart(session);
SipResponse resp = buildInvite200Ok(req, session);
byte[] encoded = messageEncoder.encodeResponse(resp);
String raw200 = new String(encoded, StandardCharsets.US_ASCII);
session.setLast200Ok(raw200);
sessionManager.createOrUpdate(session);
Tio.send(ctx, new ByteBufferPacket(ByteBuffer.wrap(encoded)));
}
private void handleAck(SipRequest req) {
String callId = req.getHeader("Call-ID");
sessionManager.markAckReceived(callId);
}
private void handleBye(SipRequest req, ChannelContext ctx) throws Exception {
String callId = req.getHeader("Call-ID");
CallSession session = sessionManager.getByCallId(callId);
SipResponse resp = buildSimpleResponse(req, 200, "OK", session != null ? session.getToTag() : null);
send(ctx, resp);
if (session != null) {
rtpServerManager.stopAndRelease(session);
sessionManager.terminate(callId);
}
}
private void send(ChannelContext ctx, SipResponse response) {
byte[] bytes = messageEncoder.encodeResponse(response);
Tio.send(ctx, new ByteBufferPacket(ByteBuffer.wrap(bytes)));
}
private void sendRaw(ChannelContext ctx, String text) {
byte[] bytes = text.getBytes(StandardCharsets.US_ASCII);
Tio.send(ctx, new ByteBufferPacket(ByteBuffer.wrap(bytes)));
}
private SipResponse buildInvite200Ok(SipRequest req, CallSession session) {
SipResponse resp = new SipResponse();
resp.setStatusCode(200);
resp.setReasonPhrase("OK");
copyIfPresent(req, resp, "Via");
copyIfPresent(req, resp, "From");
String to = req.getHeader("To");
if (to != null && !to.toLowerCase().contains("tag=")) {
to = to + ";tag=" + session.getToTag();
}
if (to != null) {
resp.addHeader("To", to);
}
copyIfPresent(req, resp, "Call-ID");
copyIfPresent(req, resp, "CSeq");
resp.addHeader("Contact", "<sip:java@" + localIp + ":5060>");
resp.addHeader("Content-Type", "application/sdp");
String sdp = "v=0\r\n" + "o=- 1 1 IN IP4 " + localIp + "\r\n" + "s=JavaSip\r\n" + "c=IN IP4 " + localIp + "\r\n"
+ "t=0 0\r\n" + "m=audio " + session.getLocalRtpPort() + " RTP/AVP 0\r\n" + "a=rtpmap:0 PCMU/8000\r\n"
+ "a=ptime:20\r\n" + "a=sendrecv\r\n";
resp.setBody(sdp.getBytes(StandardCharsets.US_ASCII));
return resp;
}
private SipResponse buildSimpleResponse(SipRequest req, int code, String reason, String toTag) {
SipResponse resp = new SipResponse();
resp.setStatusCode(code);
resp.setReasonPhrase(reason);
copyIfPresent(req, resp, "Via");
copyIfPresent(req, resp, "From");
String to = req.getHeader("To");
if (toTag != null && to != null && !to.toLowerCase().contains("tag=")) {
to = to + ";tag=" + toTag;
}
if (to != null) {
resp.addHeader("To", to);
}
copyIfPresent(req, resp, "Call-ID");
copyIfPresent(req, resp, "CSeq");
resp.setBody(new byte[0]);
return resp;
}
private void copyIfPresent(SipRequest req, SipResponse resp, String headerName) {
for (String v : req.getHeaders(headerName)) {
resp.addHeader(headerName, v);
}
}
private String parseTag(String headerValue) {
if (headerValue == null) {
return null;
}
String lower = headerValue.toLowerCase();
int idx = lower.indexOf("tag=");
if (idx < 0) {
return null;
}
String sub = headerValue.substring(idx + 4);
int semi = sub.indexOf(';');
if (semi >= 0) {
sub = sub.substring(0, semi);
}
return sub.trim();
}
private void parseRemoteSdp(SipRequest req, CallSession session) {
byte[] body = req.getBody();
if (body == null || body.length == 0) {
return;
}
String sdp = new String(body, StandardCharsets.US_ASCII);
String[] lines = sdp.split("\r\n");
String currentMedia = null;
for (String line : lines) {
if (line.startsWith("c=")) {
String[] parts = line.split(" ");
if (parts.length >= 3) {
session.setRemoteRtpIp(parts[2].trim());
}
} else if (line.startsWith("m=")) {
currentMedia = line;
String[] parts = line.split(" ");
if (parts.length >= 2 && parts[0].startsWith("m=audio")) {
try {
session.setRemoteRtpPort(Integer.parseInt(parts[1]));
} catch (Exception ignore) {
}
}
}
}
}
}
8.2 SipInviteOnlyUdpHandler
它实现的是 UdpHandler。
UDP 模式比 TCP 简单的地方在于:
- 收到的一个 datagram 基本就是一条完整 SIP 消息
- 不需要额外 frame decoder
所以它的流程就是:
- 收到 UDP 包
- 调
SipMessageParser - 判断 method
- 生成响应
- 用
DatagramSocket发回去
package com.litongjava.sip.server.handler;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import com.litongjava.sip.model.CallSession;
import com.litongjava.sip.model.SipMessage;
import com.litongjava.sip.model.SipRequest;
import com.litongjava.sip.model.SipResponse;
import com.litongjava.sip.parser.SipMessageEncoder;
import com.litongjava.sip.parser.SipMessageParser;
import com.litongjava.sip.rtp.RtpServerManager;
import com.litongjava.sip.server.session.CallSessionManager;
import com.litongjava.tio.core.Node;
import com.litongjava.tio.core.udp.UdpPacket;
import com.litongjava.tio.core.udp.intf.UdpHandler;
public class SipInviteOnlyUdpHandler implements UdpHandler {
private final String localIp;
private final SipMessageParser messageParser = new SipMessageParser();
private final SipMessageEncoder messageEncoder = new SipMessageEncoder();
private final CallSessionManager sessionManager;
private final RtpServerManager rtpServerManager;
public SipInviteOnlyUdpHandler(String localIp) {
this(localIp, new CallSessionManager(), new RtpServerManager(localIp));
}
public SipInviteOnlyUdpHandler(String localIp, CallSessionManager sessionManager, RtpServerManager rtpServerManager) {
this.localIp = localIp;
this.sessionManager = sessionManager;
this.rtpServerManager = rtpServerManager;
}
@Override
public void handler(UdpPacket udpPacket, DatagramSocket socket) {
try {
Node remote = udpPacket.getRemote();
byte[] data = udpPacket.getData();
SipMessage msg = messageParser.parse(data);
if (!(msg instanceof SipRequest)) {
return;
}
SipRequest req = (SipRequest) msg;
String method = req.getMethod();
if ("INVITE".equalsIgnoreCase(method)) {
handleInvite(req, remote, socket);
return;
}
if ("ACK".equalsIgnoreCase(method)) {
handleAck(req);
return;
}
if ("BYE".equalsIgnoreCase(method)) {
handleBye(req, remote, socket);
return;
}
SipResponse resp = buildSimpleResponse(req, 200, "OK", null);
send(socket, remote, resp);
} catch (Exception e) {
e.printStackTrace();
}
}
private void handleInvite(SipRequest req, Node remote, DatagramSocket socket) throws Exception {
String callId = req.getHeader("Call-ID");
CallSession exist = sessionManager.getByCallId(callId);
if (exist != null && exist.getLast200Ok() != null) {
sendRaw(socket, remote, exist.getLast200Ok());
return;
}
String toTag = "java" + System.nanoTime();
CallSession session = new CallSession();
session.setCallId(callId);
session.setFromTag(parseTag(req.getHeader("From")));
session.setToTag(toTag);
session.setTransport("UDP");
session.setRemoteSipIp(remote.getIp());
session.setRemoteSipPort(remote.getPort());
session.setCreatedTime(System.currentTimeMillis());
session.setUpdatedTime(System.currentTimeMillis());
session.setAckDeadline(System.currentTimeMillis() + 32000);
parseRemoteSdp(req, session);
rtpServerManager.allocateAndStart(session);
SipResponse trying = buildSimpleResponse(req, 100, "Trying", null);
send(socket, remote, trying);
SipResponse ok = buildInvite200Ok(req, session);
byte[] encoded = messageEncoder.encodeResponse(ok);
String raw200 = new String(encoded, StandardCharsets.US_ASCII);
session.setLast200Ok(raw200);
sessionManager.createOrUpdate(session);
sendBytes(socket, remote, encoded);
}
private void handleAck(SipRequest req) {
String callId = req.getHeader("Call-ID");
sessionManager.markAckReceived(callId);
}
private void handleBye(SipRequest req, Node remote, DatagramSocket socket) throws Exception {
String callId = req.getHeader("Call-ID");
CallSession session = sessionManager.getByCallId(callId);
SipResponse resp = buildSimpleResponse(req, 200, "OK", session != null ? session.getToTag() : null);
send(socket, remote, resp);
if (session != null) {
rtpServerManager.stopAndRelease(session);
sessionManager.terminate(callId);
}
}
private void send(DatagramSocket socket, Node remote, SipResponse response) throws Exception {
byte[] bytes = messageEncoder.encodeResponse(response);
sendBytes(socket, remote, bytes);
}
private void sendRaw(DatagramSocket socket, Node remote, String text) throws Exception {
byte[] bytes = text.getBytes(StandardCharsets.US_ASCII);
sendBytes(socket, remote, bytes);
}
private void sendBytes(DatagramSocket socket, Node remote, byte[] bytes) throws Exception {
DatagramPacket packet = new DatagramPacket(bytes, bytes.length,
new InetSocketAddress(remote.getIp(), remote.getPort()));
socket.send(packet);
}
private SipResponse buildInvite200Ok(SipRequest req, CallSession session) {
SipResponse resp = new SipResponse();
resp.setStatusCode(200);
resp.setReasonPhrase("OK");
copyIfPresent(req, resp, "Via");
copyIfPresent(req, resp, "From");
String to = req.getHeader("To");
if (to != null && !to.toLowerCase().contains("tag=")) {
to = to + ";tag=" + session.getToTag();
}
if (to != null) {
resp.addHeader("To", to);
}
copyIfPresent(req, resp, "Call-ID");
copyIfPresent(req, resp, "CSeq");
resp.addHeader("Contact", "<sip:java@" + localIp + ":5060>");
resp.addHeader("Content-Type", "application/sdp");
String sdp = "v=0\r\n" + "o=- 1 1 IN IP4 " + localIp + "\r\n" + "s=JavaSip\r\n" + "c=IN IP4 " + localIp + "\r\n"
+ "t=0 0\r\n" + "m=audio " + session.getLocalRtpPort() + " RTP/AVP 0\r\n" + "a=rtpmap:0 PCMU/8000\r\n"
+ "a=ptime:20\r\n" + "a=sendrecv\r\n";
resp.setBody(sdp.getBytes(StandardCharsets.US_ASCII));
return resp;
}
private SipResponse buildSimpleResponse(SipRequest req, int code, String reason, String toTag) {
SipResponse resp = new SipResponse();
resp.setStatusCode(code);
resp.setReasonPhrase(reason);
copyIfPresent(req, resp, "Via");
copyIfPresent(req, resp, "From");
String to = req.getHeader("To");
if (toTag != null && to != null && !to.toLowerCase().contains("tag=")) {
to = to + ";tag=" + toTag;
}
if (to != null) {
resp.addHeader("To", to);
}
copyIfPresent(req, resp, "Call-ID");
copyIfPresent(req, resp, "CSeq");
resp.setBody(new byte[0]);
return resp;
}
private void copyIfPresent(SipRequest req, SipResponse resp, String headerName) {
for (String v : req.getHeaders(headerName)) {
resp.addHeader(headerName, v);
}
}
private String parseTag(String headerValue) {
if (headerValue == null) {
return null;
}
String lower = headerValue.toLowerCase();
int idx = lower.indexOf("tag=");
if (idx < 0) {
return null;
}
String sub = headerValue.substring(idx + 4);
int semi = sub.indexOf(';');
if (semi >= 0) {
sub = sub.substring(0, semi);
}
return sub.trim();
}
private void parseRemoteSdp(SipRequest req, CallSession session) {
byte[] body = req.getBody();
if (body == null || body.length == 0) {
return;
}
String sdp = new String(body, StandardCharsets.US_ASCII);
String[] lines = sdp.split("\r\n");
for (String line : lines) {
if (line.startsWith("c=")) {
String[] parts = line.split(" ");
if (parts.length >= 3) {
session.setRemoteRtpIp(parts[2].trim());
}
} else if (line.startsWith("m=audio")) {
String[] parts = line.split(" ");
if (parts.length >= 2) {
try {
session.setRemoteRtpPort(Integer.parseInt(parts[1]));
} catch (Exception ignore) {
}
}
}
}
}
}
8.3 CallSessionManager
这个类负责统一管理 session。
第一版最小职责包括:
- 根据
Call-ID查找 session - 创建或更新 session
- 标记 ACK 已收到
- 删除 session
- 终止 session 时关闭 RTP
在当前版本里,它是连接:
- SIP 信令层
- SDP 协商结果
- RTP 端口实例
的关键中枢。
package com.litongjava.sip.server.session;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import com.litongjava.sip.model.CallSession;
public class CallSessionManager {
private final Map<String, CallSession> sessions = new ConcurrentHashMap<>();
public CallSession getByCallId(String callId) {
if (callId == null) {
return null;
}
return sessions.get(callId);
}
public CallSession createOrUpdate(CallSession session) {
if (session == null || session.getCallId() == null) {
throw new IllegalArgumentException("call session or callId is null");
}
session.setUpdatedTime(System.currentTimeMillis());
sessions.put(session.getCallId(), session);
return session;
}
public void markAckReceived(String callId) {
CallSession session = sessions.get(callId);
if (session != null) {
session.setAckReceived(true);
session.setUpdatedTime(System.currentTimeMillis());
}
}
public void terminate(String callId) {
CallSession session = sessions.remove(callId);
if (session != null) {
session.setTerminated(true);
session.setUpdatedTime(System.currentTimeMillis());
if (session.getRtpServer() != null) {
session.getRtpServer().stop();
}
}
}
public void remove(String callId) {
sessions.remove(callId);
}
public Map<String, CallSession> snapshot() {
return Map.copyOf(sessions);
}
}
8.4 启动服务
import java.io.IOException;
import java.net.SocketException;
import com.litongjava.sip.rtp.RtpServerManager;
import com.litongjava.sip.server.handler.SipInviteOnlyTcpHandler;
import com.litongjava.sip.server.handler.SipInviteOnlyUdpHandler;
import com.litongjava.sip.server.session.CallSessionManager;
import com.litongjava.tio.core.udp.UdpServer;
import com.litongjava.tio.core.udp.UdpServerConf;
import com.litongjava.tio.server.ServerTioConfig;
import com.litongjava.tio.server.TioServer;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class SipServerConfig {
public void config() {
String localIp = "192.168.3.219";
CallSessionManager sessionManager = new CallSessionManager();
RtpServerManager rtpServerManager = new RtpServerManager(localIp);
SipInviteOnlyTcpHandler tcpHandler = new SipInviteOnlyTcpHandler(localIp, sessionManager, rtpServerManager);
ServerTioConfig serverTioConfig = new ServerTioConfig("sip-server");
serverTioConfig.setServerAioHandler(tcpHandler);
serverTioConfig.setHeartbeatTimeout(-1L);
TioServer tioServer = new TioServer(serverTioConfig);
int port = 5060;
try {
tioServer.start(null, port);
log.info("独立 TCP 服务器已成功启动,监听端口: {}", port);
} catch (IOException e) {
log.error("启动 TCP 服务器失败", e);
}
SipInviteOnlyUdpHandler udpHandler = new SipInviteOnlyUdpHandler(localIp, sessionManager, rtpServerManager);
UdpServerConf udpServerConf = new UdpServerConf(5060, udpHandler, 5000);
try {
UdpServer udpServer = new UdpServer(udpServerConf);
udpServer.start();
log.info("UDP 服务器已成功启动,监听端口: {}", port);
} catch (SocketException e) {
log.error("启动 UDP 服务器失败", e);
}
}
}
9. SIP Echo 的完整处理流程
下面按真实时序说明整个系统如何工作。
9.1 服务端启动
服务端启动时需要做两件事:
启动 TCP 5060
用于接收 SIP over TCP 请求。
启动 UDP 5060
用于接收 SIP over UDP 请求。
这两个入口可以共享:
- 同一个
CallSessionManager - 同一个
RtpServerManager
这样无论信令从 TCP 来还是 UDP 来,底层会话和媒体资源都由同一套组件管理。
9.2 客户端发送 INVITE
客户端发出一个 INVITE。
如果带 SDP,通常会包含:
- 对端的 RTP 地址
- 对端的 RTP 端口
- 对端支持的 codec,例如 PCMU
这条 INVITE 到达服务端后:
- TCP 入口先解帧再解析
- UDP 入口直接解析
9.3 服务端处理 INVITE
处理 INVITE 时,服务端会做这些事情:
读取关键信息
Call-IDFromToViaCSeq
检查是否重复 INVITE
如果这个 Call-ID 已经存在,并且已经保存过 200 OK,可以直接重发。
解析对端 SDP
提取:
- 对端 RTP IP
- 对端 RTP 端口
创建 CallSession
把这次呼叫的会话信息存下来。
分配 RTP 端口并启动 RTP 服务
例如本地分到 31234。
9.4 服务端返回响应
UDP 场景
可以先回一个:
100 Trying
然后再回:
200 OK
TCP 场景
通常直接回:
200 OK
200 OK 的消息体里带着 SDP answer,最关键的是:
- 本地 RTP IP
- 本地 RTP 端口
- 选用的音频编码
例如:
c=IN IP4 192.168.3.219m=audio 31234 RTP/AVP 0a=rtpmap:0 PCMU/8000
这意味着以后对端要把语音发到 192.168.3.219:31234。
9.5 客户端发送 ACK
客户端收到 200 OK 后,会发送 ACK。
服务端收到 ACK 后,通常只做状态更新:
- 记录该 session 已收到 ACK
第一版里,这一步不需要额外响应。
9.6 客户端发送 RTP
客户端开始向 SDP answer 中声明的端口发送 RTP 包。
例如:
- 目标 IP:
192.168.3.219 - 目标端口:
31234
此时 RtpUdpServer 已经在这个端口监听。
9.7 服务端做 RTP Echo
RtpEchoUdpHandler 收到 RTP 包后:
- 读取
udpPacket.getData() - 读取 remote IP 和 remote port
- 构造一个响应 datagram
- 把原始字节原样发回
因为终端收到的是自己刚刚发出的语音 RTP,所以会听到回声。
9.8 客户端发送 BYE
当对端挂断时,会发送 BYE。
服务端收到后要做这些事:
- 根据
Call-ID查到 session - 回复
200 OK - 停止该 session 的 RTP server
- 回收 RTP 端口
- 从
CallSessionManager移除会话
到这里,一次完整会话结束。
10. SDP 在这个项目里扮演的角色
虽然第一版还不是“完整协商”,但 SDP 已经发挥了非常关键的作用。
10.1 从 INVITE 中读取 offer
服务端从对端提供的 SDP 里读出:
- 对端媒体地址
- 对端媒体端口
- 对端支持的音频格式
例如:
c=IN IP4 192.168.3.10
m=audio 40002 RTP/AVP 0 8 101
a=rtpmap:0 PCMU/8000
这里至少能知道:
- 对端用
40002接收 RTP - 对端支持
PT 0 = PCMU
10.2 在 200 OK 中返回 answer
服务端在 200 OK 里写回:
- 自己的媒体地址
- 自己的媒体端口
- 自己当前选择的 codec
第一版最小策略是:
- 固定返回
PCMU/8000
这使得双方至少能在一个最基本的音频格式上达成一致。
11. 为什么第一版先做“原包回显”
很多人在这里容易直接想跳到:
- RTP 解码
- PCM
- 重采样
- ASR
- TTS
- 大模型
但工程上更稳的顺序是先做“回显”。
原因很简单:
11.1 回显能先验证链路正确
只要能听到回声,就说明:
- SIP 建会话没问题
- SDP 协商端口没问题
- 对端 RTP 能打到服务端
- 服务端回 RTP 能打回对端
- 终端能正常播放回来的包
这一步不成立,后面的语音识别和合成都没有意义。
11.2 回显把“网络问题”和“媒体处理问题”拆开了
如果一开始就做解码重编码,出问题时很难判断是:
- SIP 没协商好
- RTP 没打通
- 编解码错了
- 时间戳错了
- sequence 错了
而原包 echo 只关注“网络路径通不通”。
12. 使用 tio-core 时的设计经验
这部分对读者很重要。
12.1 不要把所有逻辑都塞进 handler
在 tio-core 里,很多初学者会把:
- decode
- parse
- SIP 业务
- 字符串拼响应
- session 管理
全写在一个 handler 里。
短期能跑,但很快会失控。
当前这个项目更推荐的方式是:
SipTcpFrameDecoder负责 TCP 解帧SipMessageParser负责协议解析SipMessageEncoder负责协议编码CallSessionManager负责会话RtpServerManager负责媒体端口资源SipInviteOnlyTcpHandler/SipInviteOnlyUdpHandler只做适配和路由
这样结构会稳很多。
12.2 TCP 和 UDP 的处理思维必须分开
虽然都是 SIP 5060,但两者问题不一样。
TCP 关注点
- 流式切包
- 粘包拆包
- Content-Length
UDP 关注点
- 请求重传
- 幂等处理
- 重发 200 OK
如果混着想,很容易把协议处理写乱。
12.3 先做最小可运行链路,再做标准化增强
第一版不需要一口气实现:
- 完整事务状态机
- 完整 SDP 协商
- 完整 RTP 重组
- RTCP
- 认证
- 注册
- 路由
先把最小主线跑通,是更合理的工程顺序。
13. 如何验证这个 SIP Echo 服务
你项目里已经有两个 client:
SipTcpClientRtpUdpEchoClient
这正好对应两类验证方式。
13.1 先验证 SIP TCP 是否正常
用 SipTcpClient 发一个最小 INVITE。
预期结果:
- 服务端返回
200 OK - 响应里带 SDP
- 能看到
m=audio 某个端口 RTP/AVP 0
这说明:
- TCP 解帧正常
- SIP 解析正常
- 会话创建正常
- RTP 端口分配正常
- 200 OK 编码正常
13.2 再验证 RTP Echo 是否正常
把 RtpUdpEchoClient 指向服务端分配出的 RTP 端口。
预期结果:
- 发送一个伪 RTP 包
- 收到服务端原样回包
- 长度一致
- 内容一致
这说明:
- RTP UDP server 已经成功监听
- 媒体端口是通的
- echo handler 生效了
13.3 最终用软电话验证真实回声
当你把整个 SIP + RTP 都接起来后,可以直接用软电话拨打服务端。
预期结果:
- 呼叫建立
- 听到自己回声
- 挂断正常
这一步是最有说服力的系统级验证。
14. 第一版已经解决了什么问题
到当前结构为止,已经解决了几个非常关键的工程问题:
14.1 TCP SIP 解包问题
不再用“读多少算一个 SIP 包”,而是按:
- header 结束符
- Content-Length
切完整报文。
14.2 SIP 解析与 handler 解耦
handler 不再直接手工 split 大段字符串去做所有事情。 parser 和 encoder 已经独立出来。
14.3 会话与媒体绑定
引入了 CallSession 和 CallSessionManager,让:
- SIP INVITE
- SDP 信息
- RTP 端口实例
关联起来。
14.4 媒体端口动态管理
通过 RtpPortAllocator + RtpServerManager,每个呼叫都能独立拿到一个 RTP 端口并在结束后释放。
15. 第一版还没有做的事
这是给后续演进用的。
15.1 Session 生命周期还可增强
当前最小版已经能建和删,但后面最好补:
- ACK 超时回收
- 异常断开回收
- 长时间无 RTP 回收
15.2 SDP 还不是真正完整协商
当前基本策略是:
- 读取 offer
- 回固定 PCMU
下一步应该变成:
- 解析对端 offer 的 codec 列表
- 从双方都支持的 codec 里选一个
- 构造真正的 answer
15.3 RTP 还是原包回显
下一步应该做成:
- 解析 RTP 头
- 提取 payload
- PCMU 解码成 PCM16
- 送入媒体处理链
- 再编码并重组 RTP
16. 如何继续升级成语音机器人服务
当 SIP Echo 跑通以后,后续演进路径通常是这样的:
第一步:把 RTP 从原包回显升级成音频帧处理
也就是:
- 收 RTP
- 解码 payload
- 得到 PCM 音频帧
第二步:把 PCM 送到 ASR 或流式大模型
例如:
- 语音识别
- 实时对话模型
- 流式理解
第三步:把模型输出结果做 TTS
生成一段回复音频。
第四步:重新编码并封装 RTP 发回终端
这时服务端就不再是 echo,而是“真正能说话的 SIP 语音机器人”。
17. 本文总结
基于 tio-core 实现 SIP Echo 服务,最关键的不是代码量,而是把职责分清楚:
- TCP/UDP handler:对接 tio-core
- frame decoder:解决 TCP 边界问题
- SIP parser/encoder:解决协议对象化问题
- session manager:把信令和媒体资源串起来
- RTP server manager:动态管理媒体端口
- RTP echo handler:验证整条语音链路
在这套结构下,第一版系统已经实现了完整闭环:
- SIP 建会话
- SDP 告知媒体端口
- RTP 承载语音
- 服务端回显 RTP
- 对端听到回声
这说明你已经用 tio-core 成功搭出了一个最小可运行的 SIP 媒体服务骨架。
后面无论是继续增强 SIP/SDP 的标准化能力,还是把 RTP 换成真正的语音处理流水线,都是在这个基础上向前演进。
