【WebSocket】实时推送与离线消息
对标盒子IM《接入消息推送》《消息推送底层分析》《离线消息和已读未读》。芋道 IM 的核心差异:WebSocket 只做在线提醒,不作为可靠投递依据——可靠性靠业务表 +
pull增量补偿,不引入按用户复制的消息索引表。
这是一篇通道层专题。好友、群聊、消息、通话各篇都会「触发推送」「支持离线补偿」,但都引到这里——本篇统一讲「消息怎么实时到对端的多个端、断线后怎么补」,业务篇不再重复。
# 1. 要解决的问题
一条消息(或一个状态变更)产生后,要做到三件事:
- 实时:在线的接收方,包括他登录的多个端,立刻收到。
- 不丢:接收方不在线、或推送漏了,下次能补回来。
- 一致:多端之间、断线重连后,未读数 / 已读位置 / 好友群状态都对得上。
芋道 IM 的取舍是:在线靠 WebSocket 推,不在线 / 漏推靠 pull 拉。WebSocket 不保证送达,业务表才是权威。
# 2. 整体架构
TODO @codex:"画一张 IM『实时推送 + 离线补偿』四层架构图。从上到下:① 接入层——芋道 WebSocket 框架(WebSocketSenderApi),按 userId 把消息下发到该用户的多个在线端(多端一致);② 推送层——ImWebSocketService,业务事件转通知 DTO,事务提交后(afterCommit)异步推送;③ 存储层——im_private/group/channel_message 三张消息表,消息单份存储、群消息写 receiver_user_ids 可见成员快照;④ 补偿层——pull 增量拉取(消息用 minId 游标,好友/群/会话已读等状态用 update_time+id 复合游标)。右侧用两条主线标注:在线消息走『WebSocket 实时推送』,离线/漏推走『pull 拉取补偿』。"
分四层看:
| 层 | 职责 | 关键实现 |
|---|---|---|
| 接入层 | 维护 WebSocket 连接、按用户下发到多端 | 复用芋道的 WebSocket 框架 WebSocketSenderApi |
| 推送层 | 业务事件 → 通知 DTO → 在线推送 | ImWebSocketService(事务感知 + 异步) |
| 存储层 | 消息单份入库;群消息固化可见成员 | im_private/group/channel_message + receiver_user_ids 快照 |
| 补偿层 | 离线 / 漏推后增量拉取 | 消息 minId 游标 + 状态事件 update_time + id 游标 |
TODO @codex:"把下面的端到端推送时序画成更直观的时序图:发送方 sendPrivateMessage → 服务端事务内落库 im_private_message → 事务提交(afterCommit)后 WebSocket 推接收方在线端 + 发送方其它登录端 → 接收方离线端重新上线后按 minId pull 补齐离线消息。"
端到端时序(以私聊发消息为例):
sequenceDiagram
participant 发送方
participant 服务端 as IM 服务端
participant DB as MySQL
participant 接收方在线端
participant 接收方离线端
发送方->>服务端: sendPrivateMessage
服务端->>DB: 落库 im_private_message(事务内)
Note over 服务端: 事务提交后(afterCommit)才推送
服务端->>接收方在线端: WebSocket 推送(im-notification)
服务端->>发送方: 同步推送其它登录端(多端一致)
接收方离线端-->>服务端: 重新上线后 pull(minId 游标)
服务端-->>接收方离线端: 返回离线期间的消息
# 3. WebSocket 推送通道
# 3.1 统一通知协议
所有推送走同一个外层信封 im-notification,body 固定三段:conversationType(会话类型)+ contentType(内容类型)+ payload(业务数据),对应 ImNotificationWebSocketDTO:
conversationType:枚举 ImConversationTypeEnum(0=无会话,1=私聊,2=群聊,3=频道),决定这条通知归到哪个会话;好友 / 通话等非会话通知用 0。contentType:即《消息》§1 消息类型 的内容类型,决定payload怎么解析、怎么渲染。payload:随contentType变化的业务数据,由service/websocket/notification包下的各 Notification DTO 定义(如 FriendRequestNotification、GroupMemberKickNotification、ImRtcCallNotification)。
前端拿到通知后,先按 conversationType 分流到对应会话,再按 contentType 路由渲染,不再依赖「好友 / 群通知的数字区间」做判断。
# 3.2 多端下发
ImWebSocketService 暴露三个推送方法,都按 userId 投递到该用户的所有在线端(多端一致):
sendNotificationAsync(userId, ...):推单个用户(如好友通知)。sendNotificationAsync(userIds, ...):推一批用户(如群消息推全体可见成员)。broadcastNotificationAsync(...):全局广播。
三者底层都走芋道框架的 WebSocketSenderApi#sendObject,按 userId 找到该用户的全部在线会话逐一下发:
// ImWebSocketServiceImpl#doSendNotification
webSocketSenderApi.sendObject(UserTypeEnum.ADMIN.getValue(), userId,
ImNotificationWebSocketDTO.TYPE, notification);
# 3.3 事务感知推送
推送一定延迟到数据库事务提交后再执行,否则客户端可能收到 WebSocket 通知时、回查数据库却查不到这条消息:
// ImWebSocketServiceImpl#executeAfterTransaction
private void executeAfterTransaction(Runnable task) {
if (!TransactionSynchronizationManager.isSynchronizationActive()) {
task.run();
return;
}
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
@Override
public void afterCommit() {
task.run(); // 事务提交后再异步推送
}
});
}
推送本身用 @Async 异步执行,不阻塞发送主流程;推送失败只记日志、不影响落库(在线送不到,由 pull 兜底)。
# 4. 在线推 + 离线拉
WebSocket 只负责「在线的人立刻看到」。一个内容类型是否能离线补回来,由它的 persistent(是否入库)标志决定(见 《消息》):入库的能 pull 补,不入库的(如通话信令、输入中)离线即丢弃,这是预期行为。
各类消息 / 事件的补偿方式:
| 类别 | 是否离线可补 | 是否计未读 | 补偿方式 |
|---|---|---|---|
| 私聊 / 群聊 / 频道消息 | 是 | 是 | 按 minId 游标 pull |
| 撤回、群系统提示 | 是 | 否 | 作为可见事件随消息 pull |
| 好友申请 / 群申请 | 是 | 否 | 业务表按 update_time + id 增量拉 |
| 已读 / 回执 | 是 | 否 | 读位置表补偿 |
| 通话信令 / 输入中 | 否 | 否 | 仅在线推,离线丢弃 |
# 4.1 消息增量拉取
消息用 minId 单调游标增量拉取(pullPrivateMessageList / pullGroupMessageList / 频道 pull):客户端记住已拉到的最大消息 id,重连 / 切前台时带 minId 拉后续。
群消息的可见性靠发送时固化的 receiver_user_ids 快照(逗号分隔的成员 id),pull 时用 FIND_IN_SET 过滤:
SELECT m.* FROM im_group_message m
WHERE m.group_id IN (...) AND m.id > :minId
AND FIND_IN_SET(:userId, m.receiver_user_ids)
ORDER BY m.id ASC LIMIT :size;
这样「发送时谁可见」在落库那一刻就定死,避免按「加入时间」反复重算成员区间——入群前的消息看不到、退群后仍能补拉退群前可见的历史。
# 4.2 顺序性
消息按服务端自增主键 id 定序:落库那一刻就定了序,同一会话内按 id 升序即时间序。不依赖客户端 send_time(各端时钟可能不准、会导致乱序),客户端本地插入、pull 拉取都按 id 升序。芋道 IM 没有 per-user seq 自增序列,全局 id 已足够定序。
# 4.3 唯一性
靠客户端生成的 client_message_id 配合唯一键 (sender_id, client_message_id) 去重:并发 / 弱网重发同一条时,唯一键冲突即视为已发成功(捕获 DuplicateKeyException、回查已存在消息返回),保证幂等、不会落两条。
# 5. 状态事件的离线补偿
好友 / 群关系、申请、读位置这类会更新旧行的状态,不能只靠「id 单调新增」补,于是统一用 update_time + id 复合游标增量拉取:
SELECT * FROM xxx
WHERE update_time > :lastUpdateTime
OR (update_time = :lastUpdateTime AND id > :lastId)
ORDER BY update_time ASC, id ASC LIMIT :limit;
对应各业务表都建了 idx_sync (..., update_time, id) 索引支撑游标扫描。前端按最后一条的 updateTime + id 推进高水位;且回扫前把游标向前回退几秒做 overlap,覆盖「同秒不同 id 更新」「端 / 服务端时间精度差」等边界,合并逻辑保持幂等。
删除即软删
删除、退群、拉黑、解散都不物理删,否则客户端无法增量补偿到「已失效」这一变化。
TODO @codex:"把下面的断线重连补偿时序画成更直观的时序图:客户端重连后并行发起多条 pull——消息按 minId、好友/群申请按 update_time+id(游标回退数秒)、会话读位置按 update_time+id,并刷新『我的群列表』快照;服务端各自返回增量,客户端只前进高水位。"
sequenceDiagram
participant 客户端
participant 服务端 as IM 服务端
Note over 客户端: 断线重连 / 切前台
客户端->>服务端: pull 消息(minId)
客户端->>服务端: pull 好友申请 / 群申请(update_time + id,回退数秒)
客户端->>服务端: pull 会话读位置(update_time + id)
客户端->>服务端: 刷新「我的群列表」快照
服务端-->>客户端: 各自返回增量,客户端只前进高水位
按补偿对象,分三类来看:
# 5.1 会话读位点
已读位置统一持久化在 im_conversation_read(一个用户在一个会话的「最大已读消息 id」),是未读数与回执的唯一权威(不放 Redis,避免双写一致性与退群回退问题),也按 update_time + id 增量拉取补偿。两条规则:
- 单调递增:上报已读只能把位点改大,不能回退(防乱序 / 并发回退)。
- 双向补偿:重连 / 进会话时,既补「我的读位置」(恢复本端未读、红点),也按需补「对端 / 群成员读位置」(恢复私聊已读、群回执人数)。对端读位置补偿有界执行——只在打开会话或有未完成回执时惰性补,不在每次重连时全量拉所有群成员读位置。
# 5.2 群信息与群成员
群是多人共享状态,同步策略和「消息」「读位点」都不同:
- 群信息(群名 / 公告 / 头像 / 全员禁言等):变更时全员广播对应通知(GROUP_NAME_UPDATE / GROUP_NOTICE_UPDATE 等,见 《消息》§1),客户端收到后局部更新;重连时不单独拉群资料,靠「我的群列表」快照整体校准。
- 本人成员态(被拉进群 / 被踢 / 退群 / 解散):不单开增量链路,进首页 / 重连时刷新「我相关的群列表」快照(含已退群)一次性校准——离线期间被拉进新群、被踢出群都能对上。
- 群成员列表:不做全局增量 pull,按
groupId本地缓存 + 失效标记;进群资料 / 成员列表时按list?groupId=全量刷新;收到成员变更通知先局部更新,拿不准就标记该群缓存过期、下次打开再强刷。
这样既不引入 per-group 版本号同步,也避免重连时全局扫描所有群成员。
# 5.3 好友与群申请
好友关系(im_friend)、好友申请(im_friend_request)、加群申请(im_group_request)都按 update_time + id 增量拉取补偿。申请类事件不塞进聊天消息流,权威状态始终以业务表为准;「你们已经是好友了」这类只是额外写一条聊天提示气泡,方便前端展示与交互,真实关系仍看业务表。