SIP Server Version 2 in Practice
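Background
The project has an in-house, lightweight SIP server built on tio-core. It accepts inbound voice calls and lays the groundwork for voice bots, ASR/TTS, and real-time LLM conversations.
Version 1 validated the minimal end-to-end path. Its core capabilities were:
- Listen on SIP TCP 5060 and SIP UDP 5060
- Allocate a dynamic RTP UDP port on receiving an INVITE
- Reply with 200 OK + SDP
- Receive the peer's RTP audio
- Close the call loop by echoing RTP back to the caller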
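In other words, version 1 proved two things:
- tio-core can carry SIP signaling in both directions
- The Java side can allocate dynamic RTP ports and set up the media path
However, version 1 was built only to "just run", and it had several obvious gaps:
- SIP message parsing was rough, and TCP framing (messages arriving coalesced or split across reads) was not robust
- The SDP answer to an INVITE was hard-coded rather than produced by real offer/answer negotiation
- Session management was weak, with no lifecycle control for ACK timeouts, abnormal termination, or resource release
- RTP handling still echoed raw packets, which is not a media architecture an AI audio pipeline can plug into
Version 2 is therefore needed to turn the system from "a demo that validates the path" into "a SIP/RTP service skeleton that can keep evolving".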
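Version 2 Goals
The goal of version 2 is no longer just "hear the echo once the call connects". It is to make the protocol, session, and media layers solid in preparation for real voice-bot scenarios.
There are four concrete goals.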
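1. Make SIP parsing and TCP decoding solid
Version 2 moves SIP handling from string concatenation and simple split calls to proper protocol parsing, including:
- TCP stream framing based on \r\n\r\n + Content-Length
- Correct handling of the SIP request line, status line, headers, and body
- Parsing of Content-Length and compact header forms
- A foundation for more methods, more SIP headers, and transaction support later
The goal is stable signaling parsing over both TCP and UDP, not just enough parsing to handle the current test messages.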
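The framing rule can be stated as: find the blank line that ends the headers, read Content-Length (or its compact form `l`), and wait until that many body bytes have arrived. The following is a minimal sketch that assumes the bytes received so far sit in a plain byte array. `SipTcpFramingSketch` is a hypothetical name, not the project's SipTcpFrameDecoder. The sketch also ignores header line folding and the CRLF keep-alives some clients send between messages.

```java
import java.nio.charset.StandardCharsets;
import java.util.Locale;

// Hypothetical helper illustrating SIP-over-TCP framing; not the project's SipTcpFrameDecoder.
final class SipTcpFramingSketch {

  private static final byte[] HEADER_END = {'\r', '\n', '\r', '\n'};

  /**
   * Returns the byte length of the first complete SIP message in buf[0, len),
   * or -1 if more bytes are needed. Bytes beyond that length belong to the next message.
   */
  static int completeFrameLength(byte[] buf, int len) {
    int headerEnd = indexOf(buf, len, HEADER_END);
    if (headerEnd < 0) {
      return -1; // header section not finished yet
    }
    String head = new String(buf, 0, headerEnd, StandardCharsets.US_ASCII);
    int contentLength = 0; // assumption: a missing Content-Length is treated as an empty body
    for (String line : head.split("\r\n")) {
      int colon = line.indexOf(':');
      if (colon <= 0) {
        continue;
      }
      String name = line.substring(0, colon).trim().toLowerCase(Locale.ROOT);
      // "l" is the compact form of Content-Length (RFC 3261, section 7.3.3)
      if (name.equals("content-length") || name.equals("l")) {
        contentLength = Integer.parseInt(line.substring(colon + 1).trim());
      }
    }
    if (contentLength < 0) {
      throw new IllegalArgumentException("negative Content-Length");
    }
    int total = headerEnd + HEADER_END.length + contentLength;
    return len >= total ? total : -1;
  }

  // Naive search for pattern inside buf[0, len).
  private static int indexOf(byte[] buf, int len, byte[] pattern) {
    outer:
    for (int i = 0; i + pattern.length <= len; i++) {
      for (int j = 0; j < pattern.length; j++) {
        if (buf[i + j] != pattern[j]) {
          continue outer;
        }
      }
      return i;
    }
    return -1;
  }
}
```

A decoder would call this on its accumulated buffer, emit the first `total` bytes as one message, and keep any remaining bytes for the next round.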
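2. Build real session lifecycle management
Version 2 moves from "reply to each packet as it arrives" to stateful call session management. At a minimum, it covers:
- Maintaining call sessions keyed by Call-ID
- INVITE creates the session
- ACK marks it established
- BYE ends it
- Automatic cleanup when a 200 OK was sent but no ACK arrives for a long time
- Release of RTP resources and ports on abnormal termination
The goal is to avoid leaked sessions, leaked RTP ports, and calls that hang indefinitely, so the system can handle back-to-back test calls and long-running operation.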
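The lifecycle above can be modeled as a three-state machine. The following is a hedged sketch, not the project's CallSession, and the class and method names are hypothetical. Each transition that ends the call returns true only for the event that actually ended it, so the caller knows it is the one that must release the RTP port.

```java
// Hypothetical model of the INVITE -> ACK -> BYE lifecycle with an ACK deadline.
final class CallLifecycleSketch {

  enum State { INVITED, CONFIRMED, TERMINATED }

  private State state = State.INVITED;
  private final long ackDeadlineMs;

  // Created when the 200 OK for the INVITE has been sent.
  CallLifecycleSketch(long nowMs, long ackTimeoutMs) {
    this.ackDeadlineMs = nowMs + ackTimeoutMs;
  }

  // ACK confirms the call; an ACK after confirmation or termination changes nothing.
  synchronized boolean onAck() {
    if (state != State.INVITED) {
      return false;
    }
    state = State.CONFIRMED;
    return true;
  }

  // BYE ends the call; returns true only for the event that actually ended it.
  synchronized boolean onBye() {
    if (state == State.TERMINATED) {
      return false;
    }
    state = State.TERMINATED;
    return true;
  }

  // Periodic check; returns true if the missing ACK just ended the call.
  synchronized boolean onTick(long nowMs) {
    if (state == State.INVITED && nowMs >= ackDeadlineMs) {
      state = State.TERMINATED;
      return true;
    }
    return false;
  }

  synchronized State state() {
    return state;
  }
}
```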
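3. Upgrade SDP from a fixed reply to real negotiation
Version 2 actually parses the SDP offer in the peer's INVITE and generates a valid answer from the locally supported capabilities, instead of always returning a single template.
This mainly involves:
- Parsing the remote RTP IP and RTP port
- Parsing the payload type list in m=audio
- Recognizing PCMU/8000 and PCMA/8000
- Recognizing telephone-event
- Selecting the final audio codec from those both sides support
- Returning 488 Not Acceptable Here when there is no common codec
The goal is to turn the SIP server from "a hard-coded SDP that happens to work" into a media endpoint with real SIP/SDP negotiation.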
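4. Prepare for the RTP media pipeline upgrade
In version 2 the media layer still has not fully moved from "echo the raw packet" to "decode, process, repacketize". However, the session and negotiation information above it is filled in first:
- Record the remote RTP address and port
- Record the codec selected for the call
- Record capabilities such as ptime and telephone-event
- Attach all of this to the CallSession
That lets version 3 go on to implement:
- RTP header parsing
- PCMU/PCMA decoding
- PCM audio frame processing
- Re-encoding and RTP repacketization
- Hooking into audio pipelines such as ASR, TTS, and Gemini
The goal is for the SIP server to be more than an echo demo. It should become the signaling entry point and media ingress for future AI voice interaction services.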
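The first two version-3 steps can be sketched as follows: parse the RTP fixed header (RFC 3550, section 5.1), then decode a PCMU payload with the standard G.711 mu-law expansion. This is a hedged sketch with hypothetical class and method names. PCMA would need the A-law expansion instead, and header extensions are skipped rather than interpreted.

```java
import java.util.Arrays;

// Hypothetical sketch: RTP fixed-header parsing plus G.711 mu-law (PCMU) decoding.
final class RtpPcmuSketch {

  static final class RtpPacket {
    final int payloadType;
    final boolean marker;
    final int sequence;   // 16-bit unsigned
    final long timestamp; // 32-bit unsigned
    final long ssrc;      // 32-bit unsigned
    final byte[] payload;

    RtpPacket(int payloadType, boolean marker, int sequence, long timestamp, long ssrc, byte[] payload) {
      this.payloadType = payloadType;
      this.marker = marker;
      this.sequence = sequence;
      this.timestamp = timestamp;
      this.ssrc = ssrc;
      this.payload = payload;
    }
  }

  static RtpPacket parse(byte[] d, int len) {
    if (len < 12 || (d[0] & 0xFF) >>> 6 != 2) {
      throw new IllegalArgumentException("not an RTP version 2 packet");
    }
    boolean padding = (d[0] & 0x20) != 0;
    boolean extension = (d[0] & 0x10) != 0;
    int offset = 12 + 4 * (d[0] & 0x0F); // skip the CSRC identifiers
    if (extension) {
      if (len < offset + 4) {
        throw new IllegalArgumentException("truncated header extension");
      }
      int words = ((d[offset + 2] & 0xFF) << 8) | (d[offset + 3] & 0xFF);
      offset += 4 + 4 * words; // skip the extension header and its data
    }
    int end = padding ? len - (d[len - 1] & 0xFF) : len; // last byte holds the padding count
    if (end < offset) {
      throw new IllegalArgumentException("truncated RTP packet");
    }
    return new RtpPacket(
        d[1] & 0x7F,
        (d[1] & 0x80) != 0,
        ((d[2] & 0xFF) << 8) | (d[3] & 0xFF),
        readU32(d, 4),
        readU32(d, 8),
        Arrays.copyOfRange(d, offset, end));
  }

  private static long readU32(byte[] d, int i) {
    return ((long) (d[i] & 0xFF) << 24) | ((d[i + 1] & 0xFF) << 16)
        | ((d[i + 2] & 0xFF) << 8) | (d[i + 3] & 0xFF);
  }

  // G.711 mu-law expansion: one 8-bit code to one 16-bit linear PCM sample.
  static short ulawToLinear(byte code) {
    int u = ~code & 0xFF; // mu-law codes are transmitted bit-inverted
    int sign = u & 0x80;
    int exponent = (u >> 4) & 0x07;
    int mantissa = u & 0x0F;
    int magnitude = (((mantissa << 3) + 0x84) << exponent) - 0x84; // 0x84 (132) is the encoder bias
    return (short) (sign != 0 ? -magnitude : magnitude);
  }

  static short[] decodePcmu(byte[] payload) {
    short[] pcm = new short[payload.length];
    for (int i = 0; i < payload.length; i++) {
      pcm[i] = ulawToLinear(payload[i]);
    }
    return pcm;
  }
}
```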
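Version 2 Results
After the version 2 rework, the milestone results can be summarized as follows:
- Signaling layer: SIP TCP/UDP ingress is more clearly structured, with parsing and encoding separated
- Protocol layer: more standards-compliant SIP message parsing and SDP offer/answer negotiation
- Session layer: basic call state tracking, ACK-timeout cleanup, and resource release on BYE
- Media layer: clear extension points for RTP decoding and AI audio processing later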
I. Add the new CallSession fields
Phase two performs SDP negotiation, so first add these fields to CallSession.
package com.litongjava.sip.model;
import com.litongjava.sip.rtp.RtpUdpServer;
import com.litongjava.sip.sdp.CodecSpec;
public class CallSession {
private String callId;
private String fromTag;
private String toTag;
private String transport; // TCP / UDP
private String remoteSipIp;
private int remoteSipPort;
private String remoteRtpIp;
private int remoteRtpPort;
private int localRtpPort;
private long createdTime;
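// updatedTime, ackReceived and terminated are written by SIP handler threads and read by the reaper thread
private volatile long updatedTime;
private long ackDeadline;
private volatile boolean ackReceived;
private volatile boolean terminated;
private String last200Ok;
private RtpUdpServer rtpServer;
// Added in phase two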
private CodecSpec selectedCodec;
private boolean telephoneEventSupported;
private int remoteTelephoneEventPayloadType = -1;
private int ptime = 20;
public String getCallId() {
return callId;
}
public void setCallId(String callId) {
this.callId = callId;
}
public String getFromTag() {
return fromTag;
}
public void setFromTag(String fromTag) {
this.fromTag = fromTag;
}
public String getToTag() {
return toTag;
}
public void setToTag(String toTag) {
this.toTag = toTag;
}
public String getTransport() {
return transport;
}
public void setTransport(String transport) {
this.transport = transport;
}
public String getRemoteSipIp() {
return remoteSipIp;
}
public void setRemoteSipIp(String remoteSipIp) {
this.remoteSipIp = remoteSipIp;
}
public int getRemoteSipPort() {
return remoteSipPort;
}
public void setRemoteSipPort(int remoteSipPort) {
this.remoteSipPort = remoteSipPort;
}
public String getRemoteRtpIp() {
return remoteRtpIp;
}
public void setRemoteRtpIp(String remoteRtpIp) {
this.remoteRtpIp = remoteRtpIp;
}
public int getRemoteRtpPort() {
return remoteRtpPort;
}
public void setRemoteRtpPort(int remoteRtpPort) {
this.remoteRtpPort = remoteRtpPort;
}
public int getLocalRtpPort() {
return localRtpPort;
}
public void setLocalRtpPort(int localRtpPort) {
this.localRtpPort = localRtpPort;
}
public long getCreatedTime() {
return createdTime;
}
public void setCreatedTime(long createdTime) {
this.createdTime = createdTime;
}
public long getUpdatedTime() {
return updatedTime;
}
public void setUpdatedTime(long updatedTime) {
this.updatedTime = updatedTime;
}
public long getAckDeadline() {
return ackDeadline;
}
public void setAckDeadline(long ackDeadline) {
this.ackDeadline = ackDeadline;
}
public boolean isAckReceived() {
return ackReceived;
}
public void setAckReceived(boolean ackReceived) {
this.ackReceived = ackReceived;
}
public boolean isTerminated() {
return terminated;
}
public void setTerminated(boolean terminated) {
this.terminated = terminated;
}
public String getLast200Ok() {
return last200Ok;
}
public void setLast200Ok(String last200Ok) {
this.last200Ok = last200Ok;
}
public RtpUdpServer getRtpServer() {
return rtpServer;
}
public void setRtpServer(RtpUdpServer rtpServer) {
this.rtpServer = rtpServer;
}
public CodecSpec getSelectedCodec() {
return selectedCodec;
}
public void setSelectedCodec(CodecSpec selectedCodec) {
this.selectedCodec = selectedCodec;
}
public boolean isTelephoneEventSupported() {
return telephoneEventSupported;
}
public void setTelephoneEventSupported(boolean telephoneEventSupported) {
this.telephoneEventSupported = telephoneEventSupported;
}
public int getRemoteTelephoneEventPayloadType() {
return remoteTelephoneEventPayloadType;
}
public void setRemoteTelephoneEventPayloadType(int remoteTelephoneEventPayloadType) {
this.remoteTelephoneEventPayloadType = remoteTelephoneEventPayloadType;
}
public int getPtime() {
return ptime;
}
public void setPtime(int ptime) {
this.ptime = ptime;
}
}
II. CodecSpec
This class describes a single codec.
package com.litongjava.sip.sdp;
public class CodecSpec {
private final int payloadType;
private final String codecName;
private final int clockRate;
public CodecSpec(int payloadType, String codecName, int clockRate) {
this.payloadType = payloadType;
this.codecName = codecName;
this.clockRate = clockRate;
}
public int getPayloadType() {
return payloadType;
}
public String getCodecName() {
return codecName;
}
public int getClockRate() {
return clockRate;
}
public boolean isSameCodec(String codecName, int clockRate) {
if (codecName == null) {
return false;
}
return this.codecName.equalsIgnoreCase(codecName) && this.clockRate == clockRate;
}
public boolean isStaticPcmu() {
return payloadType == 0 && "PCMU".equalsIgnoreCase(codecName) && clockRate == 8000;
}
public boolean isStaticPcma() {
return payloadType == 8 && "PCMA".equalsIgnoreCase(codecName) && clockRate == 8000;
}
@Override
public String toString() {
return "CodecSpec{" +
"payloadType=" + payloadType +
", codecName='" + codecName + '\'' +
", clockRate=" + clockRate +
'}';
}
}
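The clock rate in CodecSpec and the ptime recorded on the session together determine the packet size. At 8000 Hz with ptime=20, each packet carries 8000 * 20 / 1000 = 160 samples, so the RTP timestamp advances by 160 per packet. G.711 (PCMU/PCMA) uses one byte per sample, so the payload is 160 bytes. The following is a small worked sketch; the helper names are hypothetical.

```java
// Hypothetical helpers relating clock rate, ptime and RTP packet size.
final class PacketSizingSketch {

  // Samples per packet; also the RTP timestamp increment between consecutive packets.
  static int samplesPerPacket(int clockRate, int ptimeMs) {
    return clockRate * ptimeMs / 1000;
  }

  // G.711 (PCMU/PCMA) encodes each 8 kHz sample as one byte.
  static int g711PayloadBytes(int ptimeMs) {
    return samplesPerPacket(8000, ptimeMs);
  }

  // Decoded 16-bit linear PCM needs two bytes per sample.
  static int pcm16Bytes(int clockRate, int ptimeMs) {
    return 2 * samplesPerPacket(clockRate, ptimeMs);
  }
}
```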
III. SdpNegotiationResult
Holds the negotiation result.
package com.litongjava.sip.sdp;
public class SdpNegotiationResult {
private boolean success;
private String failureReason;
private String remoteRtpIp;
private int remoteRtpPort;
private CodecSpec selectedCodec;
private boolean telephoneEventSupported;
private int remoteTelephoneEventPayloadType = -1;
private int ptime = 20;
public boolean isSuccess() {
return success;
}
public void setSuccess(boolean success) {
this.success = success;
}
public String getFailureReason() {
return failureReason;
}
public void setFailureReason(String failureReason) {
this.failureReason = failureReason;
}
public String getRemoteRtpIp() {
return remoteRtpIp;
}
public void setRemoteRtpIp(String remoteRtpIp) {
this.remoteRtpIp = remoteRtpIp;
}
public int getRemoteRtpPort() {
return remoteRtpPort;
}
public void setRemoteRtpPort(int remoteRtpPort) {
this.remoteRtpPort = remoteRtpPort;
}
public CodecSpec getSelectedCodec() {
return selectedCodec;
}
public void setSelectedCodec(CodecSpec selectedCodec) {
this.selectedCodec = selectedCodec;
}
public boolean isTelephoneEventSupported() {
return telephoneEventSupported;
}
public void setTelephoneEventSupported(boolean telephoneEventSupported) {
this.telephoneEventSupported = telephoneEventSupported;
}
public int getRemoteTelephoneEventPayloadType() {
return remoteTelephoneEventPayloadType;
}
public void setRemoteTelephoneEventPayloadType(int remoteTelephoneEventPayloadType) {
this.remoteTelephoneEventPayloadType = remoteTelephoneEventPayloadType;
}
public int getPtime() {
return ptime;
}
public void setPtime(int ptime) {
this.ptime = ptime;
}
public static SdpNegotiationResult fail(String reason) {
SdpNegotiationResult r = new SdpNegotiationResult();
r.setSuccess(false);
r.setFailureReason(reason);
return r;
}
public static SdpNegotiationResult ok() {
SdpNegotiationResult r = new SdpNegotiationResult();
r.setSuccess(true);
return r;
}
}
IV. SdpParser
This class does two things:
- Parses the SDP offer in the INVITE
- Picks a codec that both sides support
For now it supports:
- PCMU/8000
- PCMA/8000
- telephone-event/8000
- ptime
- audio media only
package com.litongjava.sip.sdp;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class SdpParser {
private final List<CodecSpec> localSupportedCodecs;
public SdpParser() {
this.localSupportedCodecs = defaultSupportedCodecs();
}
public SdpParser(List<CodecSpec> localSupportedCodecs) {
this.localSupportedCodecs = localSupportedCodecs;
}
public SdpNegotiationResult negotiate(byte[] sdpBytes) {
if (sdpBytes == null || sdpBytes.length == 0) {
return SdpNegotiationResult.fail("missing sdp offer");
}
String sdp = new String(sdpBytes, StandardCharsets.US_ASCII);
// Accept both CRLF and bare LF line endings; RFC 4566 asks parsers to tolerate a single newline
String[] lines = sdp.split("\r?\n");
String sessionConnectionIp = null;
String mediaConnectionIp = null;
int remoteAudioPort = 0;
int ptime = 20;
List<Integer> offeredPayloadTypes = new ArrayList<>();
Map<Integer, CodecSpec> offeredCodecMap = new HashMap<>();
boolean inAudioMedia = false;
boolean telephoneEventSupported = false;
int telephoneEventPt = -1;
for (String line : lines) {
if (line == null || line.isEmpty()) {
continue;
}
if (line.startsWith("c=")) {
String[] parts = line.split(" ");
if (parts.length >= 3) {
String ip = parts[2].trim();
if (inAudioMedia) {
mediaConnectionIp = ip;
} else {
sessionConnectionIp = ip;
}
}
continue;
}
if (line.startsWith("m=")) {
inAudioMedia = false;
String[] parts = line.split(" ");
if (parts.length >= 4 && parts[0].startsWith("m=audio")) {
inAudioMedia = true;
try {
remoteAudioPort = Integer.parseInt(parts[1].trim());
} catch (Exception e) {
return SdpNegotiationResult.fail("invalid remote audio port");
}
for (int i = 3; i < parts.length; i++) {
try {
offeredPayloadTypes.add(Integer.parseInt(parts[i].trim()));
} catch (Exception ignore) {
}
}
}
continue;
}
if (!inAudioMedia) {
continue;
}
if (line.startsWith("a=rtpmap:")) {
// a=rtpmap:0 PCMU/8000
// a=rtpmap:101 telephone-event/8000
try {
int colon = line.indexOf(':');
int space = line.indexOf(' ');
if (colon < 0 || space < 0 || space <= colon) {
continue;
}
int pt = Integer.parseInt(line.substring(colon + 1, space).trim());
String[] enc = line.substring(space + 1).trim().split("/");
if (enc.length < 2) {
continue;
}
String codecName = enc[0].trim();
int clockRate = Integer.parseInt(enc[1].trim());
CodecSpec spec = new CodecSpec(pt, codecName, clockRate);
offeredCodecMap.put(pt, spec);
if ("telephone-event".equalsIgnoreCase(codecName) && clockRate == 8000) {
telephoneEventSupported = true;
telephoneEventPt = pt;
}
} catch (Exception ignore) {
}
continue;
}
if (line.startsWith("a=ptime:")) {
try {
ptime = Integer.parseInt(line.substring("a=ptime:".length()).trim());
} catch (Exception ignore) {
}
}
}
if (remoteAudioPort <= 0) {
return SdpNegotiationResult.fail("missing audio media");
}
String remoteIp = mediaConnectionIp != null ? mediaConnectionIp : sessionConnectionIp;
if (remoteIp == null || remoteIp.isEmpty()) {
return SdpNegotiationResult.fail("missing connection address");
}
CodecSpec selected = chooseCodec(offeredPayloadTypes, offeredCodecMap);
if (selected == null) {
return SdpNegotiationResult.fail("no supported audio codec");
}
SdpNegotiationResult result = SdpNegotiationResult.ok();
result.setRemoteRtpIp(remoteIp);
result.setRemoteRtpPort(remoteAudioPort);
result.setSelectedCodec(selected);
result.setTelephoneEventSupported(telephoneEventSupported);
result.setRemoteTelephoneEventPayloadType(telephoneEventPt);
result.setPtime(ptime);
return result;
}
private CodecSpec chooseCodec(List<Integer> offeredPayloadTypes, Map<Integer, CodecSpec> offeredCodecMap) {
// Preference follows the order of the local codec list, not the order in the offer
for (CodecSpec local : localSupportedCodecs) {
for (Integer pt : offeredPayloadTypes) {
CodecSpec offered = offeredCodecMap.get(pt);
if (offered != null) {
if (local.isSameCodec(offered.getCodecName(), offered.getClockRate())) {
return new CodecSpec(pt, offered.getCodecName(), offered.getClockRate());
}
} else {
// Static payload types (0 = PCMU, 8 = PCMA) may be offered without an a=rtpmap line, so recognize them too
if (pt == 0 && local.isStaticPcmu()) {
return new CodecSpec(0, "PCMU", 8000);
}
if (pt == 8 && local.isStaticPcma()) {
return new CodecSpec(8, "PCMA", 8000);
}
}
}
}
return null;
}
public static List<CodecSpec> defaultSupportedCodecs() {
List<CodecSpec> codecs = new ArrayList<>();
codecs.add(new CodecSpec(0, "PCMU", 8000));
codecs.add(new CodecSpec(8, "PCMA", 8000));
return codecs;
}
}
V. SdpAnswerBuilder
This class builds the SDP answer for the 200 OK from the negotiation result.
package com.litongjava.sip.sdp;
public class SdpAnswerBuilder {
public String buildAnswer(String localIp, int localRtpPort, SdpNegotiationResult result) {
CodecSpec codec = result.getSelectedCodec();
int ptime = result.getPtime() > 0 ? result.getPtime() : 20;
StringBuilder sb = new StringBuilder();
sb.append("v=0\r\n");
sb.append("o=- 1 1 IN IP4 ").append(localIp).append("\r\n");
sb.append("s=JavaSip\r\n");
sb.append("c=IN IP4 ").append(localIp).append("\r\n");
sb.append("t=0 0\r\n");
sb.append("m=audio ")
.append(localRtpPort)
.append(" RTP/AVP ")
.append(codec.getPayloadType());
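// Reuse the telephone-event payload type from the offer (RFC 3264 recommends keeping the offer's numbering); 101 is only a fallback
int dtmfPt = result.getRemoteTelephoneEventPayloadType() > 0 ? result.getRemoteTelephoneEventPayloadType() : 101;
if (result.isTelephoneEventSupported()) {
sb.append(" ").append(dtmfPt);
}
sb.append("\r\n");
sb.append("a=rtpmap:")
.append(codec.getPayloadType())
.append(" ")
.append(codec.getCodecName())
.append("/")
.append(codec.getClockRate())
.append("\r\n");
if (result.isTelephoneEventSupported()) {
sb.append("a=rtpmap:").append(dtmfPt).append(" telephone-event/8000\r\n");
sb.append("a=fmtp:").append(dtmfPt).append(" 0-15\r\n");
}
sb.append("a=ptime:").append(ptime).append("\r\n");
sb.append("a=sendrecv\r\n");
return sb.toString();
}
}
Note: for telephone-event, the answer reuses the payload type number the caller offered. It falls back to 101, a common choice, only when no number was recorded. For a stricter setup, the fallback DTMF payload type can also be made configurable.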
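SdpAnswerBuilder always writes a=sendrecv. RFC 3264 (section 6.1) constrains the answer's direction by the offer's: a sendonly offer must be answered recvonly or inactive, a recvonly offer sendonly or inactive, and an inactive offer inactive. The following is a hedged sketch of mirroring the offered direction. It assumes the parser also captures the offer's direction attribute, which the SdpParser above does not do yet. The helper name is hypothetical.

```java
// Hypothetical helper: choose the answer's direction attribute from the offer's (RFC 3264, section 6.1).
final class SdpDirectionSketch {

  static String answerDirection(String offeredDirection) {
    if (offeredDirection == null) {
      return "sendrecv"; // no direction attribute in the offer means sendrecv
    }
    switch (offeredDirection.toLowerCase(java.util.Locale.ROOT)) {
      case "sendonly":
        return "recvonly"; // the caller only sends, so we only receive
      case "recvonly":
        return "sendonly"; // the caller only receives, so we only send
      case "inactive":
        return "inactive";
      default:
        return "sendrecv";
    }
  }
}
```

In practice this matters mostly for hold, where the caller sends a=sendonly in a re-INVITE.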
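VI. CallSessionReaper
This class periodically cleans up:
- Sessions where a 200 OK was sent but the ACK timed out
- Sessions that are already terminated
- Optionally, sessions that have been idle for a long time
Start with a minimal version.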
package com.litongjava.sip.server.session;
import java.util.Map;
import com.litongjava.sip.model.CallSession;
import com.litongjava.sip.rtp.RtpServerManager;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class CallSessionReaper implements Runnable {
private final CallSessionManager sessionManager;
private final RtpServerManager rtpServerManager;
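// Reclaim sessions that sent a 200 OK but got no ACK within this time (default 32 s, i.e. 64*T1 in RFC 3261)
private final long ackTimeoutMs;
// Keep terminated sessions for a short while before removing them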
private final long terminatedRetentionMs;
public CallSessionReaper(CallSessionManager sessionManager, RtpServerManager rtpServerManager) {
this(sessionManager, rtpServerManager, 32000L, 5000L);
}
public CallSessionReaper(CallSessionManager sessionManager, RtpServerManager rtpServerManager,
long ackTimeoutMs, long terminatedRetentionMs) {
this.sessionManager = sessionManager;
this.rtpServerManager = rtpServerManager;
this.ackTimeoutMs = ackTimeoutMs;
this.terminatedRetentionMs = terminatedRetentionMs;
}
@Override
public void run() {
long now = System.currentTimeMillis();
Map<String, CallSession> snapshot = sessionManager.snapshot();
for (Map.Entry<String, CallSession> entry : snapshot.entrySet()) {
String callId = entry.getKey();
CallSession session = entry.getValue();
if (session == null) {
continue;
}
try {
reapAckTimeout(callId, session, now);
reapTerminated(callId, session, now);
} catch (Exception e) {
log.error("reap session error, callId={}", callId, e);
}
}
}
private void reapAckTimeout(String callId, CallSession session, long now) {
if (session.isTerminated()) {
return;
}
if (session.isAckReceived()) {
return;
}
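long deadline = session.getAckDeadline();
if (deadline <= 0 && session.getCreatedTime() <= 0) {
return; // no timing information to judge against
}
// Use the deadline stored on the session; if it was never set, fall back to createdTime + ackTimeoutMs
long actualDeadline = deadline > 0 ? deadline : session.getCreatedTime() + ackTimeoutMs;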
if (now < actualDeadline) {
return;
}
log.info("ACK timeout, release callId={}, localRtpPort={}", callId, session.getLocalRtpPort());
rtpServerManager.stopAndRelease(session);
session.setTerminated(true);
session.setUpdatedTime(now);
}
private void reapTerminated(String callId, CallSession session, long now) {
if (!session.isTerminated()) {
return;
}
long base = session.getUpdatedTime() > 0 ? session.getUpdatedTime() : session.getCreatedTime();
if (now - base < terminatedRetentionMs) {
return;
}
sessionManager.remove(callId);
log.info("session removed, callId={}", callId);
}
}
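Pulling the ACK-timeout rule out as a pure function makes it testable without waiting 32 seconds. The following is a hedged sketch of the rule reapAckTimeout applies. It includes the createdTime + ackTimeoutMs fallback for sessions whose ackDeadline was never set. The class name and parameter list are hypothetical.

```java
// Hypothetical pure version of the ACK-timeout rule used by CallSessionReaper.
final class AckTimeoutRuleSketch {

  static boolean shouldReap(boolean terminated, boolean ackReceived,
                            long ackDeadline, long createdTime, long ackTimeoutMs, long now) {
    if (terminated || ackReceived) {
      return false; // already finished, or the call was confirmed
    }
    if (ackDeadline <= 0 && createdTime <= 0) {
      return false; // no timing information to judge against
    }
    // Prefer the explicit deadline; otherwise derive one from the creation time.
    long deadline = ackDeadline > 0 ? ackDeadline : createdTime + ackTimeoutMs;
    return now >= deadline;
  }
}
```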
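VII. A small CallSessionManager patch
To work with CallSessionReaper, adjust the phase-one terminate() logic so that stopping the same session twice does not cause confusion. The suggested version follows.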
package com.litongjava.sip.server.session;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import com.litongjava.sip.model.CallSession;
public class CallSessionManager {
private final Map<String, CallSession> sessions = new ConcurrentHashMap<>();
public CallSession getByCallId(String callId) {
if (callId == null) {
return null;
}
return sessions.get(callId);
}
public CallSession createOrUpdate(CallSession session) {
if (session == null || session.getCallId() == null) {
throw new IllegalArgumentException("call session or callId is null");
}
session.setUpdatedTime(System.currentTimeMillis());
sessions.put(session.getCallId(), session);
return session;
}
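public void markAckReceived(String callId) {
// ConcurrentHashMap.get(null) throws, so guard against a missing Call-ID
CallSession session = callId == null ? null : sessions.get(callId);
if (session != null) {
session.setAckReceived(true);
session.setUpdatedTime(System.currentTimeMillis());
}
}
/**
 * Marks a session terminated. Returns true only for the caller that performed the
 * transition, so BYE handling and the reaper can tell who should release RTP resources.
 */
public boolean markTerminated(String callId) {
CallSession session = callId == null ? null : sessions.get(callId);
if (session == null) {
return false;
}
synchronized (session) {
if (session.isTerminated()) {
return false;
}
session.setTerminated(true);
session.setUpdatedTime(System.currentTimeMillis());
return true;
}
}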
public void remove(String callId) {
sessions.remove(callId);
}
public Map<String, CallSession> snapshot() {
return Map.copyOf(sessions);
}
}
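BYE arrives on a SIP handler thread while CallSessionReaper runs on its scheduler thread, so both may try to end the same call. The following is a hedged sketch of making the release happen exactly once with an AtomicBoolean compare-and-set. `releaseRtp` stands in for RtpServerManager.stopAndRelease and, like the class name, is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: whichever thread flips the flag first performs the release.
final class ReleaseOnceSketch {

  private final AtomicBoolean terminated = new AtomicBoolean(false);
  final AtomicInteger releaseCount = new AtomicInteger();

  // Returns true only for the caller that actually terminated the call.
  boolean terminate() {
    if (!terminated.compareAndSet(false, true)) {
      return false;
    }
    releaseRtp();
    return true;
  }

  private void releaseRtp() {
    releaseCount.incrementAndGet(); // placeholder for stopping the RTP server and freeing its port
  }

  // Races several threads against terminate() and returns how many times the release ran.
  static int race(int threads) {
    ReleaseOnceSketch call = new ReleaseOnceSketch();
    List<Thread> workers = new ArrayList<>();
    for (int i = 0; i < threads; i++) {
      Thread t = new Thread(call::terminate);
      workers.add(t);
      t.start();
    }
    for (Thread t : workers) {
      try {
        t.join();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException(e);
      }
    }
    return call.releaseCount.get();
  }
}
```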
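VIII. Upgrade SipInviteOnlyTcpHandler
Key changes:
- Add SdpParser
- Add SdpAnswerBuilder
- Parse the offer before answering an INVITE
- Return 488 Not Acceptable Here when no codec is supported
- After successful negotiation, write the codec, RTP address, and DTMF capability into CallSession
The complete TCP version follows.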
package com.litongjava.sip.server.handler;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import com.litongjava.aio.ByteBufferPacket;
import com.litongjava.aio.Packet;
import com.litongjava.sip.model.CallSession;
import com.litongjava.sip.model.SipMessage;
import com.litongjava.sip.model.SipRequest;
import com.litongjava.sip.model.SipResponse;
import com.litongjava.sip.parser.SipMessageEncoder;
import com.litongjava.sip.parser.SipMessageParser;
import com.litongjava.sip.parser.SipTcpFrameDecoder;
import com.litongjava.sip.rtp.RtpServerManager;
import com.litongjava.sip.sdp.SdpAnswerBuilder;
import com.litongjava.sip.sdp.SdpNegotiationResult;
import com.litongjava.sip.sdp.SdpParser;
import com.litongjava.sip.server.session.CallSessionManager;
import com.litongjava.tio.core.ChannelContext;
import com.litongjava.tio.core.Tio;
import com.litongjava.tio.core.TioConfig;
import com.litongjava.tio.server.intf.ServerAioHandler;
public class SipInviteOnlyTcpHandler implements ServerAioHandler {
private final String localIp;
private final SipTcpFrameDecoder frameDecoder = new SipTcpFrameDecoder();
private final SipMessageParser messageParser = new SipMessageParser();
private final SipMessageEncoder messageEncoder = new SipMessageEncoder();
private final CallSessionManager sessionManager;
private final RtpServerManager rtpServerManager;
private final SdpParser sdpParser = new SdpParser();
private final SdpAnswerBuilder sdpAnswerBuilder = new SdpAnswerBuilder();
public SipInviteOnlyTcpHandler(String localIp) {
this(localIp, new CallSessionManager(), new RtpServerManager(localIp));
}
public SipInviteOnlyTcpHandler(String localIp, CallSessionManager sessionManager, RtpServerManager rtpServerManager) {
this.localIp = localIp;
this.sessionManager = sessionManager;
this.rtpServerManager = rtpServerManager;
}
@Override
public Packet decode(ByteBuffer buffer, int limit, int position, int readableLength, ChannelContext ctx)
throws Exception {
byte[] frame = frameDecoder.decode(buffer, readableLength, ctx);
if (frame == null) {
return null;
}
return new ByteBufferPacket(ByteBuffer.wrap(frame));
}
@Override
public ByteBuffer encode(Packet packet, TioConfig tioConfig, ChannelContext ctx) {
ByteBufferPacket p = (ByteBufferPacket) packet;
ByteBuffer bb = p.getByteBuffer();
if (bb.position() != 0) {
bb.rewind();
}
return bb;
}
@Override
public void handler(Packet packet, ChannelContext ctx) throws Exception {
ByteBufferPacket p = (ByteBufferPacket) packet;
ByteBuffer bb = p.getByteBuffer();
byte[] bytes = new byte[bb.remaining()];
bb.get(bytes);
SipMessage msg = messageParser.parse(bytes);
if (!(msg instanceof SipRequest)) {
return;
}
SipRequest req = (SipRequest) msg;
String method = req.getMethod();
if ("INVITE".equalsIgnoreCase(method)) {
handleInvite(req, ctx);
return;
}
if ("ACK".equalsIgnoreCase(method)) {
handleAck(req);
return;
}
if ("BYE".equalsIgnoreCase(method)) {
handleBye(req, ctx);
return;
}
SipResponse resp = buildSimpleResponse(req, 200, "OK", null);
send(ctx, resp);
}
private void handleInvite(SipRequest req, ChannelContext ctx) throws Exception {
String callId = req.getHeader("Call-ID");
CallSession exist = sessionManager.getByCallId(callId);
if (exist != null && exist.getLast200Ok() != null) {
sendRaw(ctx, exist.getLast200Ok());
return;
}
SdpNegotiationResult negotiation = sdpParser.negotiate(req.getBody());
if (!negotiation.isSuccess()) {
SipResponse fail = buildSimpleResponse(req, 488, "Not Acceptable Here", null);
send(ctx, fail);
return;
}
String remoteIp = ctx.getClientNode() != null ? ctx.getClientNode().getIp() : null;
int remotePort = ctx.getClientNode() != null ? ctx.getClientNode().getPort() : 0;
String toTag = "java" + System.nanoTime();
CallSession session = new CallSession();
session.setCallId(callId);
session.setFromTag(parseTag(req.getHeader("From")));
session.setToTag(toTag);
session.setTransport("TCP");
session.setRemoteSipIp(remoteIp);
session.setRemoteSipPort(remotePort);
session.setCreatedTime(System.currentTimeMillis());
session.setUpdatedTime(System.currentTimeMillis());
session.setAckDeadline(System.currentTimeMillis() + 32000);
session.setRemoteRtpIp(negotiation.getRemoteRtpIp());
session.setRemoteRtpPort(negotiation.getRemoteRtpPort());
session.setSelectedCodec(negotiation.getSelectedCodec());
session.setTelephoneEventSupported(negotiation.isTelephoneEventSupported());
session.setRemoteTelephoneEventPayloadType(negotiation.getRemoteTelephoneEventPayloadType());
session.setPtime(negotiation.getPtime());
rtpServerManager.allocateAndStart(session);
SipResponse resp = buildInvite200Ok(req, session, negotiation);
byte[] encoded = messageEncoder.encodeResponse(resp);
String raw200 = new String(encoded, StandardCharsets.US_ASCII);
session.setLast200Ok(raw200);
sessionManager.createOrUpdate(session);
Tio.send(ctx, new ByteBufferPacket(ByteBuffer.wrap(encoded)));
}
private void handleAck(SipRequest req) {
String callId = req.getHeader("Call-ID");
sessionManager.markAckReceived(callId);
}
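private void handleBye(SipRequest req, ChannelContext ctx) throws Exception {
String callId = req.getHeader("Call-ID");
CallSession session = sessionManager.getByCallId(callId);
if (session == null) {
// RFC 3261, section 15.1.2: a BYE that matches no existing dialog is answered with 481
send(ctx, buildSimpleResponse(req, 481, "Call/Transaction Does Not Exist", null));
return;
}
send(ctx, buildSimpleResponse(req, 200, "OK", session.getToTag()));
// Skip sessions already ended (for example by the ACK-timeout reaper) so the RTP port is not released twice;
// marking first also makes the reaper skip this session
if (!session.isTerminated()) {
sessionManager.markTerminated(callId);
rtpServerManager.stopAndRelease(session);
}
}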
private void send(ChannelContext ctx, SipResponse response) {
byte[] bytes = messageEncoder.encodeResponse(response);
Tio.send(ctx, new ByteBufferPacket(ByteBuffer.wrap(bytes)));
}
private void sendRaw(ChannelContext ctx, String text) {
byte[] bytes = text.getBytes(StandardCharsets.US_ASCII);
Tio.send(ctx, new ByteBufferPacket(ByteBuffer.wrap(bytes)));
}
private SipResponse buildInvite200Ok(SipRequest req, CallSession session, SdpNegotiationResult negotiation) {
SipResponse resp = new SipResponse();
resp.setStatusCode(200);
resp.setReasonPhrase("OK");
copyIfPresent(req, resp, "Via");
copyIfPresent(req, resp, "From");
String to = req.getHeader("To");
if (to != null && !to.toLowerCase().contains("tag=")) {
to = to + ";tag=" + session.getToTag();
}
if (to != null) {
resp.addHeader("To", to);
}
copyIfPresent(req, resp, "Call-ID");
copyIfPresent(req, resp, "CSeq");
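// transport=tcp so that in-dialog requests such as BYE are sent back over TCP rather than the UDP default
resp.addHeader("Contact", "<sip:java@" + localIp + ":5060;transport=tcp>");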
resp.addHeader("Content-Type", "application/sdp");
String sdp = sdpAnswerBuilder.buildAnswer(localIp, session.getLocalRtpPort(), negotiation);
resp.setBody(sdp.getBytes(StandardCharsets.US_ASCII));
return resp;
}
private SipResponse buildSimpleResponse(SipRequest req, int code, String reason, String toTag) {
SipResponse resp = new SipResponse();
resp.setStatusCode(code);
resp.setReasonPhrase(reason);
copyIfPresent(req, resp, "Via");
copyIfPresent(req, resp, "From");
String to = req.getHeader("To");
if (toTag != null && to != null && !to.toLowerCase().contains("tag=")) {
to = to + ";tag=" + toTag;
}
if (to != null) {
resp.addHeader("To", to);
}
copyIfPresent(req, resp, "Call-ID");
copyIfPresent(req, resp, "CSeq");
resp.setBody(new byte[0]);
return resp;
}
private void copyIfPresent(SipRequest req, SipResponse resp, String headerName) {
for (String v : req.getHeaders(headerName)) {
resp.addHeader(headerName, v);
}
}
private String parseTag(String headerValue) {
if (headerValue == null) {
return null;
}
String lower = headerValue.toLowerCase();
int idx = lower.indexOf("tag=");
if (idx < 0) {
return null;
}
String sub = headerValue.substring(idx + 4);
int semi = sub.indexOf(';');
if (semi >= 0) {
sub = sub.substring(0, semi);
}
return sub.trim();
}
}
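parseTag takes the first "tag=" anywhere in the header value. As a result, it can match a parameter such as "xtag=" or one inside the URI, for example `<sip:a@b;tag=x>`. The following is a hedged sketch of a stricter lookup. It reads header parameters only after the closing '>' of a name-addr, or after the first ';' when there are no angle brackets, as RFC 3261 section 20.10 describes. The helper name is hypothetical, and quoted display names that contain '>' are not handled.

```java
// Hypothetical helper: read a header parameter (such as tag) from a From/To header value.
final class HeaderParamSketch {

  static String headerParam(String headerValue, String name) {
    if (headerValue == null) {
      return null;
    }
    int close = headerValue.lastIndexOf('>');
    // With angle brackets, header parameters follow '>'; without them, they follow the first ';'.
    int start = close >= 0 ? headerValue.indexOf(';', close) : headerValue.indexOf(';');
    if (start < 0) {
      return null;
    }
    for (String param : headerValue.substring(start + 1).split(";")) {
      int eq = param.indexOf('=');
      String key = (eq >= 0 ? param.substring(0, eq) : param).trim();
      if (key.equalsIgnoreCase(name)) {
        return eq >= 0 ? param.substring(eq + 1).trim() : "";
      }
    }
    return null;
  }
}
```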
IX. Upgrade SipInviteOnlyUdpHandler
The UDP version follows the same structure. In addition, it sends 100 Trying before the 200 OK.
package com.litongjava.sip.server.handler;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import com.litongjava.sip.model.CallSession;
import com.litongjava.sip.model.SipMessage;
import com.litongjava.sip.model.SipRequest;
import com.litongjava.sip.model.SipResponse;
import com.litongjava.sip.parser.SipMessageEncoder;
import com.litongjava.sip.parser.SipMessageParser;
import com.litongjava.sip.rtp.RtpServerManager;
import com.litongjava.sip.sdp.SdpAnswerBuilder;
import com.litongjava.sip.sdp.SdpNegotiationResult;
import com.litongjava.sip.sdp.SdpParser;
import com.litongjava.sip.server.session.CallSessionManager;
import com.litongjava.tio.core.Node;
import com.litongjava.tio.core.udp.UdpPacket;
import com.litongjava.tio.core.udp.intf.UdpHandler;
public class SipInviteOnlyUdpHandler implements UdpHandler {
private final String localIp;
private final SipMessageParser messageParser = new SipMessageParser();
private final SipMessageEncoder messageEncoder = new SipMessageEncoder();
private final CallSessionManager sessionManager;
private final RtpServerManager rtpServerManager;
private final SdpParser sdpParser = new SdpParser();
private final SdpAnswerBuilder sdpAnswerBuilder = new SdpAnswerBuilder();
public SipInviteOnlyUdpHandler(String localIp) {
this(localIp, new CallSessionManager(), new RtpServerManager(localIp));
}
public SipInviteOnlyUdpHandler(String localIp, CallSessionManager sessionManager, RtpServerManager rtpServerManager) {
this.localIp = localIp;
this.sessionManager = sessionManager;
this.rtpServerManager = rtpServerManager;
}
@Override
public void handler(UdpPacket udpPacket, DatagramSocket socket) {
try {
Node remote = udpPacket.getRemote();
byte[] data = udpPacket.getData();
SipMessage msg = messageParser.parse(data);
if (!(msg instanceof SipRequest)) {
return;
}
SipRequest req = (SipRequest) msg;
String method = req.getMethod();
if ("INVITE".equalsIgnoreCase(method)) {
handleInvite(req, remote, socket);
return;
}
if ("ACK".equalsIgnoreCase(method)) {
handleAck(req);
return;
}
if ("BYE".equalsIgnoreCase(method)) {
handleBye(req, remote, socket);
return;
}
SipResponse resp = buildSimpleResponse(req, 200, "OK", null);
send(socket, remote, resp);
} catch (Exception e) {
e.printStackTrace();
}
}
private void handleInvite(SipRequest req, Node remote, DatagramSocket socket) throws Exception {
String callId = req.getHeader("Call-ID");
CallSession exist = sessionManager.getByCallId(callId);
if (exist != null && exist.getLast200Ok() != null) {
sendRaw(socket, remote, exist.getLast200Ok());
return;
}
SdpNegotiationResult negotiation = sdpParser.negotiate(req.getBody());
if (!negotiation.isSuccess()) {
SipResponse fail = buildSimpleResponse(req, 488, "Not Acceptable Here", null);
send(socket, remote, fail);
return;
}
String toTag = "java" + System.nanoTime();
CallSession session = new CallSession();
session.setCallId(callId);
session.setFromTag(parseTag(req.getHeader("From")));
session.setToTag(toTag);
session.setTransport("UDP");
session.setRemoteSipIp(remote.getIp());
session.setRemoteSipPort(remote.getPort());
session.setCreatedTime(System.currentTimeMillis());
session.setUpdatedTime(System.currentTimeMillis());
session.setAckDeadline(System.currentTimeMillis() + 32000);
session.setRemoteRtpIp(negotiation.getRemoteRtpIp());
session.setRemoteRtpPort(negotiation.getRemoteRtpPort());
session.setSelectedCodec(negotiation.getSelectedCodec());
session.setTelephoneEventSupported(negotiation.isTelephoneEventSupported());
session.setRemoteTelephoneEventPayloadType(negotiation.getRemoteTelephoneEventPayloadType());
session.setPtime(negotiation.getPtime());
rtpServerManager.allocateAndStart(session);
SipResponse trying = buildSimpleResponse(req, 100, "Trying", null);
send(socket, remote, trying);
SipResponse ok = buildInvite200Ok(req, session, negotiation);
byte[] encoded = messageEncoder.encodeResponse(ok);
String raw200 = new String(encoded, StandardCharsets.US_ASCII);
session.setLast200Ok(raw200);
sessionManager.createOrUpdate(session);
sendBytes(socket, remote, encoded);
}
private void handleAck(SipRequest req) {
String callId = req.getHeader("Call-ID");
sessionManager.markAckReceived(callId);
}
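private void handleBye(SipRequest req, Node remote, DatagramSocket socket) throws Exception {
String callId = req.getHeader("Call-ID");
CallSession session = sessionManager.getByCallId(callId);
if (session == null) {
// RFC 3261, section 15.1.2: a BYE that matches no existing dialog is answered with 481
send(socket, remote, buildSimpleResponse(req, 481, "Call/Transaction Does Not Exist", null));
return;
}
send(socket, remote, buildSimpleResponse(req, 200, "OK", session.getToTag()));
// Skip sessions already ended (for example by the ACK-timeout reaper) so the RTP port is not released twice;
// marking first also makes the reaper skip this session
if (!session.isTerminated()) {
sessionManager.markTerminated(callId);
rtpServerManager.stopAndRelease(session);
}
}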
private void send(DatagramSocket socket, Node remote, SipResponse response) throws Exception {
byte[] bytes = messageEncoder.encodeResponse(response);
sendBytes(socket, remote, bytes);
}
private void sendRaw(DatagramSocket socket, Node remote, String text) throws Exception {
byte[] bytes = text.getBytes(StandardCharsets.US_ASCII);
sendBytes(socket, remote, bytes);
}
private void sendBytes(DatagramSocket socket, Node remote, byte[] bytes) throws Exception {
DatagramPacket packet = new DatagramPacket(
bytes, bytes.length, new InetSocketAddress(remote.getIp(), remote.getPort()));
socket.send(packet);
}
private SipResponse buildInvite200Ok(SipRequest req, CallSession session, SdpNegotiationResult negotiation) {
SipResponse resp = new SipResponse();
resp.setStatusCode(200);
resp.setReasonPhrase("OK");
copyIfPresent(req, resp, "Via");
copyIfPresent(req, resp, "From");
String to = req.getHeader("To");
if (to != null && !to.toLowerCase().contains("tag=")) {
to = to + ";tag=" + session.getToTag();
}
if (to != null) {
resp.addHeader("To", to);
}
copyIfPresent(req, resp, "Call-ID");
copyIfPresent(req, resp, "CSeq");
resp.addHeader("Contact", "<sip:java@" + localIp + ":5060>");
resp.addHeader("Content-Type", "application/sdp");
String sdp = sdpAnswerBuilder.buildAnswer(localIp, session.getLocalRtpPort(), negotiation);
resp.setBody(sdp.getBytes(StandardCharsets.US_ASCII));
return resp;
}
private SipResponse buildSimpleResponse(SipRequest req, int code, String reason, String toTag) {
SipResponse resp = new SipResponse();
resp.setStatusCode(code);
resp.setReasonPhrase(reason);
copyIfPresent(req, resp, "Via");
copyIfPresent(req, resp, "From");
String to = req.getHeader("To");
if (toTag != null && to != null && !to.toLowerCase().contains("tag=")) {
to = to + ";tag=" + toTag;
}
if (to != null) {
resp.addHeader("To", to);
}
copyIfPresent(req, resp, "Call-ID");
copyIfPresent(req, resp, "CSeq");
resp.setBody(new byte[0]);
return resp;
}
private void copyIfPresent(SipRequest req, SipResponse resp, String headerName) {
for (String v : req.getHeaders(headerName)) {
resp.addHeader(headerName, v);
}
}
private String parseTag(String headerValue) {
if (headerValue == null) {
return null;
}
String lower = headerValue.toLowerCase();
int idx = lower.indexOf("tag=");
if (idx < 0) {
return null;
}
String sub = headerValue.substring(idx + 4);
int semi = sub.indexOf(';');
if (semi >= 0) {
sub = sub.substring(0, semi);
}
return sub.trim();
}
}
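The handler above sends 100 Trying and then sends the 200 OK once. Once the caller sees the 100 Trying, its INVITE client transaction stops retransmitting (RFC 3261, section 17.1.1.2). A lost 200 OK over UDP is therefore only recovered if the server retransmits it. Section 13.3.1.4 specifies the schedule: first after T1, doubling up to T2, and stopping after 64*T1. The following is a hedged sketch that computes those offsets with the RFC defaults T1 = 500 ms and T2 = 4 s. The class name is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the UAS 2xx retransmission timing from RFC 3261, section 13.3.1.4.
final class Invite2xxRetransmitSketch {

  static final long T1_MS = 500;   // RFC 3261 default RTT estimate
  static final long T2_MS = 4_000; // RFC 3261 default maximum retransmit interval

  /** Offsets (ms after the first send) at which the 200 OK is resent if no ACK has arrived. */
  static List<Long> retransmitOffsets() {
    List<Long> offsets = new ArrayList<>();
    long interval = T1_MS;
    long at = interval;
    while (at < 64 * T1_MS) {
      offsets.add(at);
      interval = Math.min(interval * 2, T2_MS); // double, capped at T2
      at += interval;
    }
    return offsets;
  }
}
```

In the handler, these offsets could drive a ScheduledExecutorService that resends last200Ok until markAckReceived is called. After 64*T1 without an ACK, RFC 3261 says the session should be ended with a BYE, which the reaper currently does not send.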
X. Start CallSessionReaper from SipServerConfig
You need to add a scheduled task.
package com.jobright.study.voice.agent.config;
import java.io.IOException;
import java.net.SocketException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import com.litongjava.sip.rtp.RtpServerManager;
import com.litongjava.sip.server.handler.SipInviteOnlyTcpHandler;
import com.litongjava.sip.server.handler.SipInviteOnlyUdpHandler;
import com.litongjava.sip.server.session.CallSessionManager;
import com.litongjava.sip.server.session.CallSessionReaper;
import com.litongjava.tio.core.udp.UdpServer;
import com.litongjava.tio.core.udp.UdpServerConf;
import com.litongjava.tio.server.ServerTioConfig;
import com.litongjava.tio.server.TioServer;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class SipServerConfig {
private final ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
public void config() {
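// LAN IP of this machine; it is written into Contact and the SDP c= line, so the peer must be able to reach it
String localIp = "192.168.3.219";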
CallSessionManager sessionManager = new CallSessionManager();
RtpServerManager rtpServerManager = new RtpServerManager(localIp);
SipInviteOnlyTcpHandler tcpHandler = new SipInviteOnlyTcpHandler(localIp, sessionManager, rtpServerManager);
ServerTioConfig serverTioConfig = new ServerTioConfig("sip-server");
serverTioConfig.setServerAioHandler(tcpHandler);
serverTioConfig.setHeartbeatTimeout(-1L);
TioServer tioServer = new TioServer(serverTioConfig);
int port = 5060;
try {
tioServer.start(null, port);
log.info("Standalone TCP server started, listening on port: {}", port);
} catch (IOException e) {
log.error("Failed to start TCP server", e);
}
SipInviteOnlyUdpHandler udpHandler = new SipInviteOnlyUdpHandler(localIp, sessionManager, rtpServerManager);
UdpServerConf udpServerConf = new UdpServerConf(5060, udpHandler, 5000);
try {
UdpServer udpServer = new UdpServer(udpServerConf);
udpServer.start();
log.info("UDP server started, listening on port: {}", port);
} catch (SocketException e) {
log.error("Failed to start UDP server", e);
}
scheduledExecutor.scheduleAtFixedRate(new CallSessionReaper(sessionManager, rtpServerManager), 5, 5,
TimeUnit.SECONDS);
}
}
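ScheduledExecutorService.scheduleAtFixedRate suppresses all later runs once a task throws. A single unexpected exception would therefore silently stop session cleanup. CallSessionReaper already catches per-session errors. The following is a hedged sketch of an extra guard around the whole task, plus a named daemon thread so the scheduler does not keep the JVM alive on its own. The helper names are hypothetical.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;

// Hypothetical helpers for running a periodic maintenance task such as CallSessionReaper.
final class SafeSchedulingSketch {

  // Wraps a task so a runtime exception is reported instead of cancelling future executions.
  static Runnable guarded(Runnable task) {
    return () -> {
      try {
        task.run();
      } catch (RuntimeException e) {
        System.err.println("periodic task failed: " + e);
      }
    };
  }

  // Single-threaded scheduler running on a named daemon thread.
  static ScheduledExecutorService newReaperScheduler() {
    ThreadFactory factory = runnable -> {
      Thread t = new Thread(runnable, "sip-session-reaper");
      t.setDaemon(true);
      return t;
    };
    return Executors.newSingleThreadScheduledExecutor(factory);
  }
}
```

Usage would look like `scheduler.scheduleAtFixedRate(SafeSchedulingSketch.guarded(reaper), 5, 5, TimeUnit.SECONDS);`.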
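XI. What this version delivers
With phase two in place, you get the following:
Done
- INVITE no longer hard-codes PCMU
- The remote SDP offer is parsed
- A common codec is chosen from PCMU/PCMA
- 488 Not Acceptable Here is returned when no codec is supported
- The remote RTP address and port, the selected codec, ptime, and telephone-event capability are recorded
- Sessions whose ACK timed out are cleaned up
Not done yet
- A real Call-ID + FromTag + ToTag dialog key
- CANCEL
- OPTIONS
- 180 Ringing
- Record-Route / Route
- re-INVITE
- RTP that is actually decoded, processed, and repacketized according to the codec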
