影刀RPA店群自动化教程:Python驱动实时运营仪表板与手动干预架构实战

张开发
2026/6/5 13:10:58 15 分钟阅读

分享文章

影刀RPA店群自动化教程:Python驱动实时运营仪表板与手动干预架构实战
影刀RPA店群自动化教程Python驱动实时运营仪表板与手动干预架构实战几十个店铺同时在跑自动化运营却只能靠“猜”来判断系统是否正常。拼多多店群自动化上架方案一个店铺卡住了可能要等到日报出来才知道——那时候已经晚了半天。店群自动化的早期我们的监控手段相当原始看日志、查数据库、偶尔远程桌面到Worker上瞅一眼浏览器窗口。运营同事想知道“现在有哪些店铺在跑什么任务”唯一的途径是在企业微信群里问开发。后来我们决定不能再让运营“盲飞”了。必须有一个所有人都能看懂的实时仪表板展示每个店铺的自动化状态并且允许授权人员直接进行干预。这篇文章就完整展开这套实时运营仪表板与手动干预系统的架构设计和工程实现。涉及Python后端、WebSocket实时推送、React前端和与现有调度系统的无缝集成。TEMU店群如何管理运营一、实时数据从哪里来仪表板要展示的核心信息其实分散在几个已有的组件中Redis任务状态、队列长度、Worker心跳、浏览器实例占用情况PostgreSQL历史任务记录、店铺配置调度引擎当前正在执行的任务详情我们选择不做新的数据采集而是构建一个只读的数据汇聚层把这些信息以统一格式暴露给前端。classDashboardDataAggregator:def__init__(self,redis,db_pool):self.redisredis self.dbdb_poolasyncdefget_all_shops_status(self)-list:# 从Redis获取所有店铺的实时状态shop_keysawaitself.redis.keys(shop:status:*)shops[]forkeyinshop_keys:shop_idkey.decode().split(:)[-1]dataawaitself.redis.hgetall(key)shops.append({shop_id:shop_id,platform:data.get(bplatform,b).decode(),current_task:data.get(bcurrent_task,b).decode(),task_status:data.get(btask_status,bidle).decode(),progress:int(data.get(bprogress,0)),worker_id:data.get(bworker_id,b).decode(),last_active:data.get(blast_active,b).decode(),})returnshopsasyncdefget_worker_loads(self)-list:worker_keysawaitself.redis.keys(workers:*)workers[]forkeyinworker_keys:infoawaitself.redis.hgetall(key)workers.append({worker_id:key.decode().split(:)[-1],load:json.loads(info.get(bload,b{})),status:info.get(bstatus,bonline).decode(),})returnworkersasyncdefget_queue_depth(self)-dict:depths{}forstreamin[task:stream:pdd,task:stream:temu,task:stream:tiktok]:depths[stream]awaitself.redis.xlen(stream)returndepths 这个聚合层不是服务而是被仪表板后端直接调用。 数据每2秒刷新一次对于WebSocket推送变化时立即推送。---## 二、WebSocket实时推送让数据主动找前端如果前端每2秒轮询一次REST API在网络波动时会产生明显的延迟和毛刺。 我们采用WebSocket建立长连接后端在检测到状态变化时主动推送前端接收更新后局部刷新界面。 后端WebSocket服务使用FastAPIwebsockets 实现。 pythonfromfastapiimportFastAPI,WebSocket,WebSocketDisconnectfromtypingimportDictimportasyncioimportjson appFastAPI()classConnectionManager:def__init__(self):self.active_connections:Dict[str,WebSocket]{}asyncdefconnect(self,websocket:WebSocket,user_id:str):awaitwebsocket.accept()self.active_connections[user_id]websocketdefdisconnect(self,user_id:str):self.active_connections.pop(user_id,None)asyncdefbroadcast(self,message:dict):disconnected[]foruser_id,wsinself.active_connections.items():try:awaitws.send_json(message)except:disconnected.append(user_id)foruidindisconnected:self.disconnect(uid)managerConnectionManager()app.websocket(/ws/dashboard)asyncdefdashboard_websocket(websocket:WebSocket):# 简单的token验证tokenwebsocket.query_params.get(token)user_idverify_token(token)ifnotuser_id:awaitwebsocket.close(code4001)returnawaitmanager.connect(websocket,user_id)try:whileTrue:# 保持连接等待客户端消息如心跳dataawaitwebsocket.receive_text()exceptWebSocketDisconnect:manager.disconnect(user_id) 当调度引擎或Worker代理更新任务状态时会调用一个通知函数将状态变更消息发送到所有连接的WebSocket客户端。 pythonasyncdefnotify_status_change(shop_id:str,status_update:dict):awaitmanager.broadcast({type:shop_status,shop_id:shop_id,data:status_update}) 前端收到消息后更新对应店铺卡片的状态和进度条无需刷新整个页面。---## 三、前端设计一眼看清所有店铺前端使用ReactAnt Design构建本文侧重后端架构仅描述前端思路。 每个店铺显示为一个卡片卡片上包含-店铺名称与平台图标--当前任务名称如“上货中”“采集竞品”--进度条已完成步骤/总步骤--状态标签运行中绿色、暂停黄色、失败红色、空闲灰色--Worker节点和最近活跃时间 卡片按平台分组支持拖拽排序和自定义视图。 运营可以将自己负责的店铺固定在顶部或者按状态筛选。 当某个店铺任务失败时对应卡片变红并闪烁点击可查看最近的错误信息。**设计原则不需要任何培训运营一看就懂。**---## 四、手动干预从“只能看”到“可以动”如果仪表板只是展示它依然是一个高级监控屏。 真正释放运营能力的是允许他们直接在仪表板上进行干预操作。 我们提供的干预能力包括-**暂停任务**安全暂停当前正在执行的任务上一篇文章详细描述了暂停机制--**恢复任务**从暂停点继续执行--**跳过当前步骤**遇到已知的临时问题如平台公告页弹窗跳过非关键步骤--**修改任务参数**如临时调整上架价格、更换回复话术模板--**终止任务**强制终止并将任务标记为失败 每个操作都对应后端的一个REST API需要权限校验和审计日志。 pythonfromfastapiimportHTTPException,Dependsapp.post(/api/tasks/{task_id}/pause)asyncdefapi_pause_task(task_id:str,reason:str,userDepends(get_current_user)):ifnotuser.has_permission(task:pause):raiseHTTPException(status_code403,detailNo permission)awaittask_manager.request_pause(task_id,reason)awaitaudit_logger.log(user.id,pause_task,task_id,{reason:reason})return{status:ok}app.post(/api/tasks/{task_id}/modify_step)asyncdefapi_modify_step(task_id:str,step_index:int,new_params:dict,userDepends(get_current_user)):ifnotuser.has_permission(task:modify):raiseHTTPException(status_code403)awaittask_manager.modify_step_params(task_id,step_index,new_params)awaitaudit_logger.log(user.id,modify_step,task_id,{step:step_index,params:new_params})return{status:ok} 权限模型基于RBAC与之前的安全系统一致。 运营人员默认只有“查看”和“暂停/恢复”权限“修改参数”和“终止任务”需要更高角色。 操作完成后系统立即通过WebSocket向所有连接的客户端广播状态更新前端实时反映变化。---## 五、任务进度的精确展示进度条是仪表板上最直观的指标。 为了让进度条准确我们需要知道每个任务的总步骤数和当前执行到第几步。 指令配置中的每个步骤都有明确的序列号。 Worker在执行每一步前、后都会更新Redis中的进度信息。 pythonasyncdefupdate_task_progress(task_id:str,step_index:int,total_steps:int):awaitredis.hset(ftask:progress:{task_id},mapping{current_step:step_index,total_steps:total_steps,updated_at:time.time()})# 触发WebSocket推送awaitnotify_status_change(task_context[shop_id],{task_id:task_id,progress:int(step_index/total_steps*100)}) 前端收到进度更新后对应店铺卡片上的进度条平滑增长。 这让运营能直观判断任务是否在正常进行——如果进度条长时间卡住不动说明可能遇到了问题。---## 六、大规模店铺下的性能优化当店铺数量达到60时每个前端都连接WebSocket并接收全量广播会产生大量的前端渲染压力。 我们做了以下优化-**频道订阅**WebSocket连接时可以指定关注的平台或店铺组后端只推送相关更新而非全量广播。--**增量更新**只推送变更的字段前端做局部Patch而不是整个店铺对象。--**前端虚拟滚动**店铺卡片列表使用react-window只渲染可视区域内的卡片大幅降低DOM节点数。--**后端数据缓存**聚合数据在内存中缓存1秒多个WebSocket客户端共享同一份数据避免对Redis的高频查询。 pythonclassFilteredBroadcaster:def__init__(self):self.subscriptions:Dict[str,set]{}# user_id - {shop_id, ...}asyncdefpush_to_subscribers(self,shop_id,update):foruser_id,shopsinself.subscriptions.items():ifshop_idinshopsor*inshops:wsmanager.active_connections.get(user_id)ifws:awaitws.send_json({shop_id:shop_id,update:update}) 前端在建立WebSocket连接时可以传参 subscribe_shops后端据此过滤推送内容。---## 七、与现有系统的集成这套仪表板系统作为独立微服务部署不侵入调度引擎和Worker代理的核心代码。-通过只读访问Redis获取实时状态--通过REST API调用调度引擎的干预接口pause/resume/modify--通过订阅Redis Keyspace Notifications或现有的事件流获取状态变更事件 Worker代理在更新任务状态时已经习惯写入Redis Hash。 我们只是在写入后增加了一步notify_status_change调用不影响原有执行流程。---## 八、踩坑与教训**WebSocket断线重连风暴。**前端在断线后设置了自动重连但没有加退避延迟。一次后端短暂重启导致数十个前端几乎同时重连产生了瞬间的认证请求洪峰。 我们加入了1~5秒的随机退避重连并在后端增加了连接限速。**进度条跳跃。**当任务包含并行步骤时步骤序号和总步骤数的线性比例无法准确反映进度。 我们改为在指令配置中定义每个步骤的“权重”总进度按已完成步骤的权重累加计算。**权限边界模糊。**运营有时会绕过仪表板直接通过其他渠道找开发修改任务导致审计日志缺失。 我们强制所有干预操作必须通过仪表板的API入口Worker不接受来自其他渠道的干预指令。---## 九、写在最后自动化的价值不仅体现在“机器替人干活”更体现在“人能够清晰地看见机器在干什么”。 一套好的运营仪表板加上安全的手动干预能力让运营团队从“担心自动化跑出问题”变成了“信任自动化并参与其中”。当运营可以一边喝咖啡一边在手机上看到所有店铺在有条不紊地工作时自动化的工程价值才算真正落地。---*作者林焱*

更多文章