websocket业务接入示例

张开发
2026/4/22 8:19:43 15 分钟阅读

分享文章

websocket业务接入示例
1.项目架构没目录2.maven?xml version1.0 encodingUTF-8? project xmlnshttp://maven.apache.org/POM/4.0.0 xmlns:xsihttp://www.w3.org/2001/XMLSchema-instance xsi:schemaLocationhttp://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd modelVersion4.0.0/modelVersion parent groupIdcom.admin/groupId artifactIdadmin-project/artifactId version1.0-SNAPSHOT/version /parent artifactIdadmin-websocket/artifactId name【${project.artifactId}】socket模块/name properties project.build.sourceEncodingUTF-8/project.build.sourceEncoding project.reporting.outputEncodingUTF-8/project.reporting.outputEncoding /properties dependencies !-- 通用工具-- dependency groupIdcom.admin/groupId artifactIdadmin-common/artifactId /dependency dependency groupIdcom.admin/groupId artifactIdadmin-framework/artifactId /dependency dependency groupIdcom.admin/groupId artifactIdadmin-system/artifactId /dependency !-- SpringBoot WebSocket -- dependency groupIdorg.springframework.boot/groupId artifactIdspring-boot-starter-websocket/artifactId /dependency !-- Web工具包 -- dependency groupIdorg.springframework.boot/groupId artifactIdspring-boot-starter-web/artifactId /dependency !-- 工具类 -- dependency groupIdcn.hutool/groupId artifactIdhutool-all/artifactId version5.8.16/version /dependency /dependencies /project3.WebSocketConfigpackage com.admin.websocket.config; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.web.socket.server.standard.ServerEndpointExporter; /** * WebSocket配置开启WebSocket支持 * author wangwei * date 2026-04-16 **/ Configuration public class WebSocketConfig { Bean public ServerEndpointExporter serverEndpointExporter() { return new ServerEndpointExporter(); } }4. Taskpackage com.admin.websocket.entity; import com.admin.common.annotation.Excel; import com.alibaba.excel.annotation.ExcelIgnore; import com.fasterxml.jackson.annotation.JsonFormat; import io.swagger.annotations.ApiModelProperty; import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor; import java.util.Date; Data Builder NoArgsConstructor AllArgsConstructor public class Task { //内容。。。。。 }5. UserRoleEnumpackage com.admin.websocket.enums; /** * 用户角色枚举 * * author wangwei * date 2026-04-16 **/ public enum UserRoleEnum { /* SUPER_ADMIN, // 超级管理员看所有消息 DEPT_ADMIN, // 部门管理员看本部门所有消息 USER; // 普通用户只看自己消息*/ SUPER_ADMIN(超级管理员), // 超级管理员 DEPT_ADMIN(部门管理员), // 部门管理员 USER(普通用户); // 普通用户 private final String roleName; UserRoleEnum(String roleName) { this.roleName roleName; } // 【核心】根据中文 匹配 枚举 public static UserRoleEnum getByRoleName(String roleName) { for (UserRoleEnum e : values()) { if (e.roleName.equals(roleName)) { return e; } } // 找不到默认普通用户 return USER; } }6. MessagePushServicepackage com.admin.websocket.service; import org.springframework.stereotype.Service; import javax.annotation.Resource; import java.util.HashMap; import java.util.Map; /*** * 消息推送服务层用于向客户端推送消息如实时通知、实时聊天等 * * author wangwei * date 2026-04-16 **/ Service public class MessagePushService { Resource private WebSocketServer webSocketServer; /** * 构造统一消息格式 */ private MapString, Object buildMessage(String type, String content, String fromUser) { MapString, Object msg new HashMap(); msg.put(type, type); msg.put(content, content); msg.put(from, fromUser); msg.put(time, System.currentTimeMillis()); return msg; } // 对外推送接口 /** * 推送给所有超级管理员 */ public void pushToSuperAdmin(String content, String fromUser) { MapString, Object message buildMessage(SUPER_ADMIN_MSG, content, fromUser); webSocketServer.sendToSuperAdmin(message); } /** * 推送给指定部门部门管理员部门用户 */ public void pushToDept(String deptId, String content, String fromUser) { MapString, Object message buildMessage(DEPT_MSG, content, fromUser); webSocketServer.sendToDept(deptId, message); } /** * 推送给单个用户 */ public void pushToUser(String userId, String content, String fromUser) { MapString, Object message buildMessage(USER_MSG, content, fromUser); webSocketServer.sendToUser(userId, message); } /** * 推送给所有人超管专用 */ public void pushToAll(String content, String fromUser) { MapString, Object message buildMessage(BROADCAST, content, fromUser); webSocketServer.sendToAll(message); } }7. TaskCountProviderpackage com.admin.websocket.service; import com.admin.common.entity.DeptOnlineVO; import java.util.List; import java.util.Map; import java.util.Set; /** * 任务数提供者接口 * * author wangwei * date 2026-04-16 **/ public interface TaskCountProvider { // 定义你需要的方法根据用户ID查任务数 int getUnFinishTaskCount(String userId); ListDeptOnlineVO onDeptLogin(MapString, SetString deptUsers); }8. WebSocketServerpackage com.admin.websocket.service; import cn.hutool.json.JSONUtil; import com.admin.common.core.domain.entity.SysUser; import com.admin.common.entity.DeptOnlineVO; import com.admin.common.utils.DateUtils; import com.admin.common.utils.SecurityUtils; import com.admin.framework.socket.UserLoginEvent; import com.admin.websocket.entity.Task; import com.admin.websocket.enums.UserRoleEnum; import lombok.extern.slf4j.Slf4j; import org.springframework.context.event.EventListener; import org.springframework.stereotype.Component; import javax.websocket.OnClose; import javax.websocket.OnError; import javax.websocket.OnOpen; import javax.websocket.Session; import javax.websocket.server.PathParam; import javax.websocket.server.ServerEndpoint; import java.io.IOException; import java.text.SimpleDateFormat; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; /** * WebSocket 核心服务 * 连接地址ws://localhost:8080/ws/{用户ID}/{角色}/{部门ID} * * author wangwei * date 2026-04-16 **/ Slf4j Component ServerEndpoint(/ws/{userId}/{role}/{deptId}) public class WebSocketServer { // 在线用户会话 private static final MapString, Session SESSION_POOL new ConcurrentHashMap(); private static final MapString, UserRoleEnum USER_ROLE_MAP new ConcurrentHashMap(); private static final MapString, String USER_DEPT_MAP new ConcurrentHashMap(); // 新增静态回调接口 public static TaskCountProvider taskCountProvider; // 【新增】在线状态监控 private static final MapString, String ONLINE_USER_DEPT new ConcurrentHashMap(); // userId - deptId private static final MapString, SetString DEPT_ONLINE_USERS new ConcurrentHashMap(); /** * 连接建立 */ OnOpen public void onOpen(Session session, PathParam(userId) String userId, PathParam(role) String roleStr, PathParam(deptId) String deptId) { SimpleDateFormat sdfTime new SimpleDateFormat(yyyy-MM-dd HH:mm:ss); // 将字符串转换为枚举类型 // UserRoleEnum role UserRoleEnum.valueOf(roleStr); UserRoleEnum role UserRoleEnum.getByRoleName(roleStr); SESSION_POOL.put(userId, session); USER_ROLE_MAP.put(userId, role); USER_DEPT_MAP.put(userId, deptId); log.info(用户[{}] 在[{}]已连接WebSocket角色[{}]在线数{}, userId, sdfTime.format(DateUtils.getNowDate()), role, SESSION_POOL.size()); // 加这一行用户一上线自动推送未完成任务 //todo 接收 buss 业务子模块传过来的参数 int count WebSocketServer.taskCountProvider.getUnFinishTaskCount(userId); // 【新增】维护在线状态 ONLINE_USER_DEPT.put(userId, deptId); DEPT_ONLINE_USERS.computeIfAbsent(deptId, k - ConcurrentHashMap.newKeySet()).add(userId); // 维护在线用户-部门关系 ONLINE_USER_DEPT.put(userId, deptId); DEPT_ONLINE_USERS.computeIfAbsent(deptId, k - ConcurrentHashMap.newKeySet()).add(userId); ListDeptOnlineVO onDeptUserNumList WebSocketServer.taskCountProvider.onDeptLogin(DEPT_ONLINE_USERS); // 关键用户上线后 → 立即推送最新部门在线人数给前端 //pushDeptOnlineStatus(); log.info(用户[{}] 部门[{}] 已连接在线数{}, userId, deptId, SESSION_POOL.size()); // 合并JSON MapString, Object result new HashMap(); result.put(num, count); result.put(deptList, onDeptUserNumList); String jsonMsg JSONUtil.toJsonStr(result); log.info(建立连接给所有用户发送消息); // 发送 this.sendToAll(jsonMsg); } /** * 关闭连接 */ OnClose public void onClose(PathParam(userId) String userId) { SESSION_POOL.remove(userId); USER_ROLE_MAP.remove(userId); USER_DEPT_MAP.remove(userId); log.info(用户[{}]已断开, userId); // 【新增】下线移除 String deptId ONLINE_USER_DEPT.remove(userId); if (deptId ! null) { SetString users DEPT_ONLINE_USERS.get(deptId); if (users ! null) { users.remove(userId); if (users.isEmpty()) { DEPT_ONLINE_USERS.remove(deptId); } } } log.info(用户[{}] 已断开, userId); } /** * 核心推送方法 */ /** * 1. 推送给 超级管理员所有超管都能收到 */ public void sendToSuperAdmin(Object message) { SysUser user SecurityUtils.getLoginUser().getUser(); String msg JSONUtil.toJsonStr(message); USER_ROLE_MAP.forEach((userId, role) - { if (user.getUserName().equals(admin) || user.getUserName().contains(admin)) { sendMessage(SESSION_POOL.get(user.getUserId()), msg);//TODO 所有超管发送有问题 } }); } /** * 2. 推送给 指定部门的所有用户部门管理员部门普通用户 */ public void sendToDept(String deptId, Object message) { String msg JSONUtil.toJsonStr(message); USER_DEPT_MAP.forEach((userId, userDeptId) - { if (deptId.equals(userDeptId)) { sendMessage(SESSION_POOL.get(userId), msg); } }); } /** * 3. 推送给 单个普通用户 */ public void sendToUser(String userId, Object message) { String msg JSONUtil.toJsonStr(message); Session session SESSION_POOL.get(userId); if (session ! null) { sendMessage(session, msg); } } /** * 4. 推送给 所有人超管专用 */ public void sendToAll(Object message) { String msg JSONUtil.toJsonStr(message); SESSION_POOL.values().forEach(session - sendMessage(session, msg)); } /** * 发送消息工具方法 */ private void sendMessage(Session session, String message) { try { if (session ! null session.isOpen()) { session.getBasicRemote().sendText(message); } } catch (IOException e) { log.error(发送消息失败, e); } } /** * 异常 */ OnError public void onError(Throwable error) { log.error(WebSocket异常, error); } // 【核心】按权限推送任务 /**** * 数量 * 被发送人的信息 * param taskJson */ public void pushTask(String taskJson, Task task) { // 遍历所有在线用户根据角色推送 SESSION_POOL.forEach((userId, session) - { MapString, String pathParameters session.getPathParameters(); String role pathParameters.get(role) null ? : pathParameters.get(role); String deptId pathParameters.get(deptId) null ? : pathParameters.get(deptId); if (session null || !session.isOpen()) return; try { // 1. 超级管理员用户名包含admin 全部推送 if (role.equals(超级管理员)) { //消息体推送 session.getBasicRemote().sendText(taskJson); sendToSuperAdmin(todo 超级管理员接收到信息✅ ✅ ✅ ✅ ✅ ✅ ✅ ✅ ✅ ✅ ✅ ); log.info(推送任务给超级管理员userId: {}, userId); } // 2. 部门管理员 只推本部门 else if (role.equals(部门管理员)) { String userDeptId USER_DEPT_MAP.get(userId); int count WebSocketServer.taskCountProvider.getUnFinishTaskCount(userId); session.getBasicRemote().sendText({\num\: count }\n); log.info(推送任务给部门管理员userId: {}, deptId: {}, userId, userDeptId); } // 3. 普通用户 只推自己的任务 else if (role.equals(普通用户)) { int count WebSocketServer.taskCountProvider.getUnFinishTaskCount(userId); session.getBasicRemote().sendText({\num\: count }\n); log.info(推送任务给普通用户userId: {}, userId); } } catch (IOException e) { log.error(推送任务失败用户ID{}, userId, e); } }); } // 监听登录事件自动推送 过期 EventListener public void onUserLogin(UserLoginEvent event) { String userId event.getUserId(); log.info(【用户登录】userId:{}, userId); // 推送任务你原有功能 pushTaskCountAfterLogin(userId); // 【新增】登录后实时推送给前端部门在线状态 pushDeptOnlineStatus(userId); } // 【新增】推送部门在线状态给前端 public void pushDeptOnlineStatus(String userId) { /* MapString, Integer deptOnline getDeptOnlineStatus();11111 String msg JSONUtil.toJsonStr(MapUtil.of(deptOnline, deptOnline)); sendToAll(msg); // 推给所有人*/ int count WebSocketServer.taskCountProvider.getUnFinishTaskCount(userId); ListDeptOnlineVO onDeptUserNumList WebSocketServer.taskCountProvider.onDeptLogin(DEPT_ONLINE_USERS); MapString, Object result new HashMap(); result.put(num, count); result.put(deptList, onDeptUserNumList); String jsonMsg JSONUtil.toJsonStr(result); log.info(【实时推送】部门在线状态以及待办任务数量信息{}, jsonMsg); // 发送 this.sendToAll(jsonMsg); } /** * 获取所有部门在线情况 * key: deptId * value: 在线人数 */ public static MapString, Integer getDeptOnlineStatus() { MapString, Integer result new HashMap(); DEPT_ONLINE_USERS.forEach((deptId, userSet) - { result.put(deptId, userSet.size()); }); return result; } /** * 用户登录成功后主动调用这个方法推送任务数量 * * param userId 用户ID */ public void pushTaskCountAfterLogin(String userId) { try { // 1. 获取用户会话 Session session SESSION_POOL.get(userId); if (session null || !session.isOpen()) { log.info(用户[{}] 未连接WebSocket不推送, userId); return; } // 2. 获取任务数量 if (taskCountProvider null) { log.error(taskCountProvider 未注册); return; } int count taskCountProvider.getUnFinishTaskCount(userId); // 3. 推送给当前登录用户 String msg {\num\: count }; session.getBasicRemote().sendText(msg); log.info(【登录推送】用户[{}] 未完成任务数{}, userId, count); } catch (Exception e) { log.error(【登录推送】失败 userId:{}, userId, e); } } }9. 测试html!DOCTYPE html html langzh-CN head meta charsetUTF-8 titleWebSocket 调试版控制台打印全信息/title style .task-box { border: 1px solid #e4e7ed; border-radius: 6px; padding: 12px 15px; margin: 10px 0; background: #ffffff; } .task-finish { background: #f7f7f7; color: #999; } #socketMsg { background: #f8f9fa; padding: 10px; border: 1px solid #ddd; border-radius: 6px; height: 300px; overflow-y: auto; white-space: pre-wrap; font-size: 12px; margin-bottom: 20px; } /style /head body h2 待办任务控制台可查看所有Socket信息/h2 h3 WebSocket 实时返回数据/h3 div idsocketMsg/div div idtaskContainer/div script // 【你的用户信息】 const userId 1; const userRole 超级管理员; const deptId 100; const taskMap {}; let ws null; // 页面加载完 → 立即主动连接 WebSocket window.onload function () { console.log(页面加载完成开始主动连接 WebSocket...); addMsgToPage(页面加载完成 → 主动连接 WebSocket...); connectWebSocket(); // 主动请求 }; function connectWebSocket() { const wsUrl ws://192.168.2.30:7070/pdxjzhapi/ws/ userId / userRole / deptId; console.log(); console.log(✅ WebSocket 连接地址, wsUrl); console.log(✅ userId, userId); console.log(✅ role, userRole); console.log(✅ deptId, deptId); addMsgToPage(主动连接 wsUrl); addMsgToPage(参数 → 用户 userId 角色 userRole 部门 deptId); // 主动创建连接 ws new WebSocket(wsUrl); ws.onopen function () { console.log( WebSocket 连接成功); addMsgToPage(✅ 连接成功); }; ws.onmessage function (evt) { console.log( 收到消息, evt.data); addMsgToPage( 收到返回数据\n evt.data); try { const task JSON.parse(evt.data); taskMap[task.ctId] task; renderList(); } catch (e) { console.error(解析失败, e); } }; ws.onclose function () { console.log( 断开2秒重连); addMsgToPage( 连接断开2秒后自动重连...); setTimeout(connectWebSocket, 2000); }; ws.onerror function (err) { console.error(❌ 连接错误); addMsgToPage(❌ 连接失败); }; } // 把消息显示到页面 function addMsgToPage(msg) { let time new Date().toLocaleString(); let dom document.getElementById(socketMsg); dom.innerHTML [${time}] ${msg}\n dom.innerHTML; } // 渲染列表 function renderList() { let html ; Object.values(taskMap).forEach(task { const isFinish task.ctFinish 1; const className isFinish ? task-box task-finish : task-box; const status isFinish ? 【已结束】 : 【待处理】; html div class${className} divstrong${status} ${task.ctName}/strong/div div备注${task.ctMark || 无}/div div发送人${task.ctSendName || 系统}/div /div; }); document.getElementById(taskContainer).innerHTML html; } /script /body /html10.DeptOnlineVOpackage com.admin.common.entity; import lombok.AllArgsConstructor; import lombok.Data; Data AllArgsConstructor public class DeptOnlineVO { private String deptId; // 部门ID private String deptName; // 部门名称 private Integer onlineCount;// 在线人数 private String state; // 状态 }11.业务代码模块 * 接口 静态回调 给webSocket项目模块 传参 业务数据的任务信息WebSocketTaskRegisterpackage com.admin.business.common; import com.admin.business.domain.TbTask; import com.admin.business.domain.TbTechnologyOrg; import com.admin.business.mapper.TbTaskMapper; import com.admin.business.mapper.TbTechnologyOrgMapper; import com.admin.common.core.domain.entity.SysUser; import com.admin.common.exception.CustomException; import com.admin.common.utils.StringUtils; import com.admin.system.mapper.SysUserMapper; import com.admin.websocket.service.TaskCountProvider; import com.admin.websocket.service.WebSocketServer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import java.util.*; /*** * 接口 静态回调 给webSocket项目模块 传参 业务数据的任务信息 * 不让 websocket 直接依赖 business 的任何类而是让 business 主动把方法 “注册” 给 websocket。 * 解决子模块之间依赖循环的问题 * * author wangwei * date 2026-04-16 **/ Component public class WebSocketTaskRegister { // 这里可以正常注入因为在 business 内部 Autowired private TbTaskMapper tbTaskMapper; Autowired private SysUserMapper userMapper; Autowired private TbTechnologyOrgMapper tbTechnologyOrgMapper; // 项目启动时自动把查询方法注册给 websocket PostConstruct public void register() { WebSocketServer.taskCountProvider new TaskCountProvider() { Override public int getUnFinishTaskCount(String userId) { // 直接调用你的 mapper SysUser sysUser userMapper.selectUserById(Long.valueOf(userId)); if(sysUsernull){ throw new CustomException(websocket获取到的用户编号有误请联系管理员处理); } //TODO 代办任务查询 task表的 ctFinish ! 88 String roleStr ; if(StringUtils.isEmpty(sysUser.getRemark()) || sysUser.getRemark().equals(普通用户)){ roleStr 普通用户; TbTask task new TbTask(); task.setCtInceptP(userId); ListTbTask taskList tbTaskMapper.selectTbTaskDBList(task); return taskList.size(); }else if(sysUser.getRemark().equals(部门管理员)){ roleStr 部门管理员; TbTask task new TbTask(); task.setBy5(sysUser.getDeptId().toString());//任务表中的 by5 表示任务接收人的所属单位 ListTbTask taskList tbTaskMapper.selectTbTaskDBList(task); return taskList.size(); }else if(sysUser.getUserName().equals(admin) || sysUser.getRemark().equals(超级管理员)){ roleStr 超级管理员; TbTask task new TbTask(); ListTbTask taskList tbTaskMapper.selectTbTaskDBList(task); return taskList.size(); } return 0; } /*** * 部门用户在线状态在线人数信息 * param deptUsers * return */ public Listcom.admin.common.entity.DeptOnlineVO onDeptLogin(MapString, SetString deptUsers) { Listcom.admin.common.entity.DeptOnlineVO result new ArrayList(); // 1. 先查询 所有单位/部门/小组以这个顺序为准 TbTechnologyOrg tbTechnologyOrg new TbTechnologyOrg(); ListTbTechnologyOrg orgList tbTechnologyOrgMapper.selectTbTechnologyOrgList(tbTechnologyOrg); // 2. 遍历【标准机构列表】→ 以它顺序为准 for (TbTechnologyOrg org : orgList) { String deptId org.getDeptId(); // 部门ID String deptName org.getCtOrgName(); // 部门名称 // 3. 匹配在线状态 if (deptUsers.containsKey(deptId)) { // 匹配到 → 在线 int count deptUsers.get(deptId).size(); result.add(new com.admin.common.entity.DeptOnlineVO( deptId, deptName, count, 在线 )); } else { // 匹配不到 → 离线、数量0 result.add(new com.admin.common.entity.DeptOnlineVO( deptId, deptName, 0, 离线 )); } } return result; } }; } }11.业务中主动触发发送websocket消息//TODO socket任务推送1111 // * 推送单条任务新增/更新时调用 TbTask tbTask new TbTask(); ListTbTask taskList tbTaskMapper.selectTbTaskDBList(tbTask); String taskJson {\num\: taskList.size() }\n; WebSocketServer webSocketServer ApplicationContextUtils.getBean(WebSocketServer.class); webSocketServer.pushTask(taskJson, null);12.用户登录后 主动触发socket消息// 发布登录事件不依赖 websocket 接处警监控 各个科室在线状态 eventPublisher.publishEvent(new UserLoginEvent(this, user.getUserId().toString()));14.效果示例

更多文章