package com.wecom.robot.service;

import com.alibaba.fastjson.JSON;
import com.wecom.robot.dto.ServiceStateResponse;
import com.wecom.robot.dto.SyncMsgResponse;
import com.wecom.robot.dto.WxCallbackMessage;
import com.wecom.robot.entity.Message;
import com.wecom.robot.entity.Session;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

import java.util.List;

/**
 * Processes customer-service callback events: pulls the pending messages through
 * {@link WecomApiService}, persists them through {@link SessionManagerService}, and then
 * routes each customer message to the AI reply flow or to the WebSocket notifications,
 * depending on the service state reported for the conversation.
 */
@Slf4j
@Service
@RequiredArgsConstructor
public class MessageProcessService {

    private final SessionManagerService sessionManagerService;
    private final AiService aiService;
    private final TransferService transferService;
    private final WecomApiService wecomApiService;
    private final WebSocketService webSocketService;

    /**
     * Handles a "new customer message" callback: syncs the messages for the event's
     * {@code openKfId}/{@code token} and processes every returned item, continuing to pull
     * while the response reports {@code hasMore}.
     *
     * @param event callback event carrying the {@code openKfId} and sync {@code token}
     */
    @Async
    public void processKfMessageEvent(WxCallbackMessage event) {
        String openKfId = event.getOpenKfId();
        String token = event.getToken();
        log.info("处理客户消息事件: openKfId={}, token={}", openKfId, token);

        if (openKfId == null) {
            log.warn("事件缺少openKfId");
            return;
        }

        SyncMsgResponse syncResponse = wecomApiService.syncMessagesByToken(openKfId, token);
        if (!syncResponse.isSuccess()) {
            log.error("拉取消息失败: errcode={}, errmsg={}",
                    syncResponse.getErrcode(), syncResponse.getErrmsg());
            return;
        }
        if (!syncResponse.hasMessages()) {
            log.info("没有新消息");
            return;
        }

        log.info("拉取到{}条消息", syncResponse.getMsgList().size());
        processSyncedItems(syncResponse.getMsgList());

        // NOTE(review): the follow-up pull passes null as its second argument; presumably
        // WecomApiService tracks the sync cursor itself. Confirm, because if the cursor does
        // not advance this loop would keep re-fetching the same page.
        while (Boolean.TRUE.equals(syncResponse.getHasMore())) {
            log.info("还有更多消息,继续拉取...");
            syncResponse = wecomApiService.syncMessages(openKfId, null);
            if (!syncResponse.isSuccess() || !syncResponse.hasMessages()) {
                break;
            }
            processSyncedItems(syncResponse.getMsgList());
        }
    }

    /**
     * Processes each synced item in order; a failure on one item is logged and does not
     * stop the remaining items from being processed.
     */
    private void processSyncedItems(Iterable<? extends SyncMsgResponse.MsgItem> items) {
        for (SyncMsgResponse.MsgItem msgItem : items) {
            try {
                processSyncedItem(msgItem);
            } catch (Exception e) {
                log.error("处理消息失败: msgId={}", msgItem.getMsgId(), e);
            }
        }
    }

    /**
     * Processes one synced item: event items are delegated to
     * {@link #processEventMessage}, items that are not from the customer are skipped, and
     * customer messages are saved and then routed by the current service state.
     */
    private void processSyncedItem(SyncMsgResponse.MsgItem msgItem) {
        String customerId = msgItem.getExternalUserId();
        String kfId = msgItem.getOpenKfId();
        String msgType = msgItem.getMsgType();
        log.info("处理消息项: msgId={}, origin={}, msgType={}, customerId={}",
                msgItem.getMsgId(), msgItem.getOrigin(), msgType, customerId);

        if (msgItem.isEvent()) {
            processEventMessage(msgItem);
            return;
        }
        if (!msgItem.isFromCustomer()) {
            log.debug("非客户消息,跳过处理: origin={}", msgItem.getOrigin());
            return;
        }
        // msgType is checked here as well: switching on a null String throws
        // NullPointerException, so a message without a type is skipped like any other
        // message that lacks a required field.
        if (customerId == null || kfId == null || msgType == null) {
            log.warn("消息缺少必要字段: customerId={}, kfId={}, msgType={}", customerId, kfId, msgType);
            return;
        }

        String content = extractContent(msgItem);
        Session session = sessionManagerService.getOrCreateSession(customerId, kfId);
        sessionManagerService.saveMessage(
                msgItem.getMsgId(),
                session.getSessionId(),
                Message.SENDER_TYPE_CUSTOMER,
                customerId,
                content,
                msgType,
                msgItem.getOriginData()
        );

        ServiceStateResponse wxState = wecomApiService.getServiceState(kfId, customerId);
        if (!wxState.isSuccess()) {
            // Best effort: processing continues; a null state is treated as "untreated"
            // in processByWxState.
            log.warn("获取微信会话状态失败: errcode={}, errmsg={}",
                    wxState.getErrcode(), wxState.getErrmsg());
        }
        log.info("微信会话状态: {} ({})", wxState.getStateDesc(), wxState.getServiceState());
        sessionManagerService.updateWxServiceState(session.getSessionId(), wxState.getServiceState());

        processByWxState(session, customerId, kfId, content, msgType, wxState);
    }

    /**
     * Dispatches an event item by its event type. Items without an event payload or
     * without an event type are ignored.
     */
    private void processEventMessage(SyncMsgResponse.MsgItem msgItem) {
        SyncMsgResponse.EventContent event = msgItem.getEvent();
        if (event == null) {
            return;
        }

        String eventType = event.getEventType();
        String customerId = event.getExternalUserid();
        String kfId = event.getOpenKfId();
        log.info("处理事件消息: eventType={}, customerId={}, kfId={}", eventType, customerId, kfId);

        // A switch on a null String throws NullPointerException; bail out explicitly instead.
        if (eventType == null) {
            log.warn("事件缺少eventType");
            return;
        }

        switch (eventType) {
            case SyncMsgResponse.EventContent.EVENT_ENTER_SESSION:
                handleEnterSessionEvent(event, customerId, kfId);
                break;
            case SyncMsgResponse.EventContent.EVENT_SESSION_STATUS_CHANGE:
                handleSessionStatusChangeEvent(event, customerId, kfId);
                break;
            case SyncMsgResponse.EventContent.EVENT_MSG_SEND_FAIL:
                log.warn("消息发送失败: failMsgid={}, failType={}",
                        event.getFailMsgid(), event.getFailType());
                break;
            case SyncMsgResponse.EventContent.EVENT_USER_RECALL_MSG:
                log.info("用户撤回消息: recallMsgid={}", event.getRecallMsgid());
                break;
            default:
                log.info("其他事件类型: {}", eventType);
        }
    }

    /**
     * Handles the "user entered session" event: ensures a session exists and, when the
     * event carries a welcome code, sends the welcome message and records it.
     */
    private void handleEnterSessionEvent(SyncMsgResponse.EventContent event, String customerId, String kfId) {
        log.info("用户进入会话: customerId={}, scene={}, sceneParam={}",
                customerId, event.getScene(), event.getSceneParam());

        // Same required-field check that processSyncedItem applies to customer messages.
        if (customerId == null || kfId == null) {
            log.warn("事件缺少必要字段: customerId={}, kfId={}", customerId, kfId);
            return;
        }

        Session session = sessionManagerService.getOrCreateSession(customerId, kfId);

        String welcomeCode = event.getWelcomeCode();
        if (welcomeCode == null || welcomeCode.isEmpty()) {
            return;
        }

        String welcomeMsg = "您好,欢迎咨询!请问有什么可以帮您?";
        wecomApiService.sendWelcomeMsg(welcomeCode, welcomeMsg);
        sessionManagerService.saveMessage(
                "welcome_" + System.currentTimeMillis(),
                session.getSessionId(),
                Message.SENDER_TYPE_AI,
                "AI",
                welcomeMsg,
                "text",
                null
        );
    }

    /**
     * Handles the "session status changed" event by updating the local session status
     * and/or servicer according to the change type.
     */
    private void handleSessionStatusChangeEvent(SyncMsgResponse.EventContent event, String customerId, String kfId) {
        Integer changeType = event.getChangeType();
        String newServicerUserid = event.getNewServicerUserid();
        String oldServicerUserid = event.getOldServicerUserid();
        log.info("会话状态变更: changeType={}, oldServicer={}, newServicer={}",
                changeType, oldServicerUserid, newServicerUserid);

        // Unboxing a null Integer in the switch below would throw NullPointerException.
        if (changeType == null) {
            log.warn("会话状态变更事件缺少changeType");
            return;
        }
        if (customerId == null || kfId == null) {
            log.warn("事件缺少必要字段: customerId={}, kfId={}", customerId, kfId);
            return;
        }

        Session session = sessionManagerService.getOrCreateSession(customerId, kfId);
        String sessionId = session.getSessionId();

        switch (changeType) {
            case SyncMsgResponse.EventContent.CHANGE_TYPE_FROM_POOL:
                log.info("从接待池接入会话: servicer={}", newServicerUserid);
                sessionManagerService.updateSessionStatus(sessionId, Session.STATUS_MANUAL);
                sessionManagerService.updateServicer(sessionId, newServicerUserid);
                break;
            case SyncMsgResponse.EventContent.CHANGE_TYPE_TRANSFER:
                log.info("转接会话: oldServicer={}, newServicer={}", oldServicerUserid, newServicerUserid);
                sessionManagerService.updateServicer(sessionId, newServicerUserid);
                break;
            case SyncMsgResponse.EventContent.CHANGE_TYPE_END:
                log.info("结束会话");
                sessionManagerService.updateSessionStatus(sessionId, Session.STATUS_CLOSED);
                break;
            case SyncMsgResponse.EventContent.CHANGE_TYPE_REENTER:
                log.info("重新接入已结束会话: servicer={}", newServicerUserid);
                sessionManagerService.updateSessionStatus(sessionId, Session.STATUS_MANUAL);
                sessionManagerService.updateServicer(sessionId, newServicerUserid);
                break;
            default:
                // Previously unknown change types were dropped silently.
                log.warn("未知的会话状态变更类型: {}", changeType);
        }
    }

    /**
     * Routes a saved customer message according to the reported service state. A missing
     * state is treated as {@code STATE_UNTREATED}; an unrecognised state falls back to the
     * AI flow.
     */
    private void processByWxState(Session session, String customerId, String kfId,
                                  String content, String msgType, ServiceStateResponse wxState) {
        Integer state = wxState.getServiceState();
        if (state == null) {
            state = ServiceStateResponse.STATE_UNTREATED;
        }

        switch (state) {
            case ServiceStateResponse.STATE_UNTREATED:
            case ServiceStateResponse.STATE_AI:
                processAiMessage(session, customerId, kfId, content);
                break;
            case ServiceStateResponse.STATE_POOL:
                notifyPendingSession(session, customerId, kfId, content, msgType);
                break;
            case ServiceStateResponse.STATE_MANUAL:
                pushToManualCs(session, customerId, kfId, content, msgType, wxState.getServicerUserid());
                break;
            case ServiceStateResponse.STATE_CLOSED:
                Session newSession = sessionManagerService.getOrCreateSession(customerId, kfId);
                processAiMessage(newSession, customerId, kfId, content);
                break;
            default:
                log.warn("未知的微信会话状态: {}", state);
                processAiMessage(session, customerId, kfId, content);
        }
    }

    /**
     * Generates an AI reply for the message and either sends it directly or, when
     * {@link TransferService} decides so, sends it with a hand-over notice and moves the
     * conversation to the waiting pool.
     */
    private void processAiMessage(Session session, String customerId, String kfId, String content) {
        String sessionId = session.getSessionId();

        // NOTE(review): raw List kept on purpose; the element type returned by
        // getSessionMessages and accepted by generateReply is declared outside this file.
        List history = sessionManagerService.getSessionMessages(sessionId);
        String reply = aiService.generateReply(content, history);
        double confidence = aiService.getLastConfidence();
        int messageCount = sessionManagerService.getMessageCount(sessionId);

        boolean shouldTransfer = transferService.shouldTransferToManual(
                content, confidence, messageCount, session.getCreatedAt());

        if (!shouldTransfer) {
            wecomApiService.sendTextMessage(customerId, kfId, reply);
            sessionManagerService.saveMessage(
                    "ai_" + System.currentTimeMillis(),
                    sessionId,
                    Message.SENDER_TYPE_AI,
                    "AI",
                    reply,
                    "text",
                    null
            );
            return;
        }

        String reason = transferService.getTransferReason(content, confidence, messageCount);
        sessionManagerService.transferToManual(sessionId, reason);

        // NOTE(review): the reply sent on this path is not persisted with saveMessage in
        // this method, unlike the non-transfer path. Confirm that this is intended.
        reply = reply + "\n\n正在为您转接人工客服,请稍候...";
        wecomApiService.sendTextMessage(customerId, kfId, reply);

        boolean transferred = wecomApiService.transferToPool(kfId, customerId);
        if (transferred) {
            log.info("已将会话转入待接入池: customerId={}, kfId={}", customerId, kfId);
            sessionManagerService.updateWxServiceState(sessionId, ServiceStateResponse.STATE_POOL);
        }
        webSocketService.notifyNewPendingSession(sessionId);
    }

    /** Publishes the customer message over WebSocket as a new message for the session. */
    private void notifyPendingSession(Session session, String customerId, String kfId,
                                      String content, String msgType) {
        WxCallbackMessage notifyMsg = new WxCallbackMessage();
        notifyMsg.setExternalUserId(customerId);
        notifyMsg.setOpenKfId(kfId);
        notifyMsg.setContent(content);
        notifyMsg.setMsgType(msgType);
        webSocketService.notifyNewMessage(session.getSessionId(), notifyMsg);
    }

    /** Pushes the customer message over WebSocket to the customer-service side of the session. */
    private void pushToManualCs(Session session, String customerId, String kfId,
                                String content, String msgType, String servicerUserid) {
        WxCallbackMessage pushMsg = new WxCallbackMessage();
        pushMsg.setExternalUserId(customerId);
        pushMsg.setOpenKfId(kfId);
        pushMsg.setContent(content);
        pushMsg.setMsgType(msgType);
        pushMsg.setServicerUserid(servicerUserid);
        webSocketService.pushMessageToCs(session.getSessionId(), pushMsg);
    }

    /**
     * Builds the textual content stored for a message: the text itself for text messages,
     * a bracketed placeholder for every other type.
     *
     * @return the message text or a placeholder; {@code "[unknown]"} when the item has no type
     */
    private String extractContent(SyncMsgResponse.MsgItem msgItem) {
        String msgType = msgItem.getMsgType();
        if (msgType == null) {
            // Defensive: switching on a null String would throw NullPointerException.
            return "[unknown]";
        }

        switch (msgType) {
            case "text":
                return msgItem.getTextContent();
            case "image":
                return "[图片]";
            case "voice":
                return "[语音]";
            case "video":
                return "[视频]";
            case "file":
                return "[文件]";
            case "location":
                SyncMsgResponse.LocationContent loc = msgItem.getLocation();
                if (loc != null) {
                    return "[位置] " + loc.getName() + " " + loc.getAddress();
                }
                return "[位置]";
            case "link":
                SyncMsgResponse.LinkContent link = msgItem.getLink();
                if (link != null) {
                    return "[链接] " + link.getTitle();
                }
                return "[链接]";
            case "business_card":
                return "[名片]";
            case "miniprogram":
                return "[小程序]";
            case "msgmenu":
                return "[菜单消息]";
            default:
                return "[" + msgType + "]";
        }
    }

    /**
     * Processes a message passed in directly (the log line marks this path as test-only):
     * saves it and routes it by the locally stored session status.
     *
     * @param message message to process; must carry the customer id and the {@code openKfId}
     */
    @Async
    public void processMessage(WxCallbackMessage message) {
        log.info("直接处理消息(测试用): msgType={}", message.getMsgType());

        String customerId = message.getExternalUserId();
        String kfId = message.getOpenKfId();
        if (customerId == null || kfId == null) {
            log.warn("消息缺少必要字段: customerId={}, kfId={}", customerId, kfId);
            return;
        }

        String content = message.getContent();
        // Default the type once so the saved record and the forwarded notification agree.
        String msgType = message.getMsgType() != null ? message.getMsgType() : "text";
        String msgId = message.getMsgId() != null
                ? message.getMsgId()
                : "test_" + System.currentTimeMillis();

        Session session = sessionManagerService.getOrCreateSession(customerId, kfId);
        String status = sessionManagerService.getSessionStatus(session.getSessionId());

        sessionManagerService.saveMessage(
                msgId,
                session.getSessionId(),
                Message.SENDER_TYPE_CUSTOMER,
                customerId,
                content,
                msgType,
                JSON.toJSONString(message.getRawData())
        );

        // A switch on a null String throws NullPointerException; report it as an unknown status.
        if (status == null) {
            log.warn("未知的会话状态: {}", status);
            return;
        }

        switch (status) {
            case Session.STATUS_AI:
                processAiMessage(session, customerId, kfId, content);
                break;
            case Session.STATUS_PENDING:
                notifyPendingSession(session, customerId, kfId, content, msgType);
                break;
            case Session.STATUS_MANUAL:
                pushToManualCs(session, customerId, kfId, content, msgType, null);
                break;
            case Session.STATUS_CLOSED:
                Session newSession = sessionManagerService.getOrCreateSession(customerId, kfId);
                processAiMessage(newSession, customerId, kfId, content);
                break;
            default:
                log.warn("未知的会话状态: {}", status);
        }
    }
}