diff --git a/src/main/java/com/wecom/robot/service/MessageProcessService.java b/src/main/java/com/wecom/robot/service/MessageProcessService.java index c081bcf..c23f121 100644 --- a/src/main/java/com/wecom/robot/service/MessageProcessService.java +++ b/src/main/java/com/wecom/robot/service/MessageProcessService.java @@ -1,6 +1,7 @@ package com.wecom.robot.service; import com.alibaba.fastjson.JSON; +import com.wecom.robot.dto.InboundMessage; import com.wecom.robot.dto.ServiceStateResponse; import com.wecom.robot.dto.SyncMsgResponse; import com.wecom.robot.dto.WxCallbackMessage; @@ -11,25 +12,35 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; -import java.util.List; - +/** + * 消息处理服务 - 微信渠道消息处理入口 + * + *

职责: + *

+ * + *

关联 AC: [AC-MCA-08] 统一消息路由 + * + * @see MessageRouterService + * @see InboundMessage + */ @Slf4j @Service @RequiredArgsConstructor public class MessageProcessService { private final SessionManagerService sessionManagerService; - private final AiService aiService; - private final TransferService transferService; private final WecomApiService wecomApiService; - private final WebSocketService webSocketService; + private final MessageRouterService messageRouterService; @Async public void processKfMessageEvent(WxCallbackMessage event) { String openKfId = event.getOpenKfId(); String token = event.getToken(); - log.info("处理客户消息事件: openKfId={}, token={}", openKfId, token); + log.info("[AC-MCA-08] 处理客户消息事件: openKfId={}, token={}", openKfId, token); if (openKfId == null) { log.warn("事件缺少openKfId"); @@ -81,7 +92,7 @@ public class MessageProcessService { String customerId = msgItem.getExternalUserId(); String kfId = msgItem.getOpenKfId(); - log.info("处理消息项: msgId={}, origin={}, msgType={}, customerId={}", + log.info("[AC-MCA-08] 处理消息项: msgId={}, origin={}, msgType={}, customerId={}", msgItem.getMsgId(), msgItem.getOrigin(), msgItem.getMsgType(), customerId); if (msgItem.isEvent()) { @@ -99,21 +110,6 @@ public class MessageProcessService { return; } - String content = extractContent(msgItem); - String msgType = msgItem.getMsgType(); - - Session session = sessionManagerService.getOrCreateSession(customerId, kfId); - - sessionManagerService.saveMessage( - msgItem.getMsgId(), - session.getSessionId(), - Message.SENDER_TYPE_CUSTOMER, - customerId, - content, - msgType, - msgItem.getOriginData() - ); - ServiceStateResponse wxState = wecomApiService.getServiceState(kfId, customerId); if (!wxState.isSuccess()) { log.warn("获取微信会话状态失败: errcode={}, errmsg={}", @@ -121,9 +117,13 @@ public class MessageProcessService { } log.info("微信会话状态: {} ({})", wxState.getStateDesc(), wxState.getServiceState()); + + Session session = sessionManagerService.getOrCreateSession(customerId, kfId); sessionManagerService.updateWxServiceState(session.getSessionId(), wxState.getServiceState()); - processByWxState(session, customerId, kfId, content, msgType, wxState); + InboundMessage inboundMessage = buildInboundMessage(msgItem, customerId, kfId); + + messageRouterService.processInboundMessage(inboundMessage); } private void processEventMessage(SyncMsgResponse.MsgItem msgItem) { @@ -186,7 +186,6 @@ public class MessageProcessService { Integer changeType = event.getChangeType(); String newServicerUserid = event.getNewServicerUserid(); String oldServicerUserid = event.getOldServicerUserid(); - String msgCode = event.getMsgCode(); log.info("会话状态变更: changeType={}, oldServicer={}, newServicer={}", changeType, oldServicerUserid, newServicerUserid); @@ -215,97 +214,23 @@ public class MessageProcessService { } } - private void processByWxState(Session session, String customerId, String kfId, - String content, String msgType, ServiceStateResponse wxState) { - Integer state = wxState.getServiceState(); - - if (state == null) { - state = ServiceStateResponse.STATE_UNTREATED; - } + private InboundMessage buildInboundMessage(SyncMsgResponse.MsgItem msgItem, + String customerId, String kfId) { + String content = extractContent(msgItem); + String sessionKey = kfId + "_" + customerId; - switch (state) { - case ServiceStateResponse.STATE_UNTREATED: - case ServiceStateResponse.STATE_AI: - processAiMessage(session, customerId, kfId, content); - break; - case ServiceStateResponse.STATE_POOL: - notifyPendingSession(session, customerId, kfId, content, msgType); - break; - case ServiceStateResponse.STATE_MANUAL: - pushToManualCs(session, customerId, kfId, content, msgType, wxState.getServicerUserid()); - break; - case ServiceStateResponse.STATE_CLOSED: - Session newSession = sessionManagerService.getOrCreateSession(customerId, kfId); - processAiMessage(newSession, customerId, kfId, content); - break; - default: - log.warn("未知的微信会话状态: {}", state); - processAiMessage(session, customerId, kfId, content); - } - } - - private void processAiMessage(Session session, String customerId, String kfId, String content) { - List history = sessionManagerService.getSessionMessages(session.getSessionId()); - String reply = aiService.generateReply(content, history); - - double confidence = aiService.getLastConfidence(); - int messageCount = sessionManagerService.getMessageCount(session.getSessionId()); - - boolean shouldTransfer = transferService.shouldTransferToManual( - content, - confidence, - messageCount, - session.getCreatedAt() - ); - - if (shouldTransfer) { - String reason = transferService.getTransferReason(content, confidence, messageCount); - sessionManagerService.transferToManual(session.getSessionId(), reason); - - reply = reply + "\n\n正在为您转接人工客服,请稍候..."; - wecomApiService.sendTextMessage(customerId, kfId, reply); - - boolean transferred = wecomApiService.transferToPool(kfId, customerId); - if (transferred) { - log.info("已将会话转入待接入池: customerId={}, kfId={}", customerId, kfId); - sessionManagerService.updateWxServiceState(session.getSessionId(), ServiceStateResponse.STATE_POOL); - } - - webSocketService.notifyNewPendingSession(session.getSessionId()); - } else { - wecomApiService.sendTextMessage(customerId, kfId, reply); - - sessionManagerService.saveMessage( - "ai_" + System.currentTimeMillis(), - session.getSessionId(), - Message.SENDER_TYPE_AI, - "AI", - reply, - "text", - null - ); - } - } - - private void notifyPendingSession(Session session, String customerId, String kfId, - String content, String msgType) { - WxCallbackMessage notifyMsg = new WxCallbackMessage(); - notifyMsg.setExternalUserId(customerId); - notifyMsg.setOpenKfId(kfId); - notifyMsg.setContent(content); - notifyMsg.setMsgType(msgType); - webSocketService.notifyNewMessage(session.getSessionId(), notifyMsg); - } - - private void pushToManualCs(Session session, String customerId, String kfId, - String content, String msgType, String servicerUserid) { - WxCallbackMessage pushMsg = new WxCallbackMessage(); - pushMsg.setExternalUserId(customerId); - pushMsg.setOpenKfId(kfId); - pushMsg.setContent(content); - pushMsg.setMsgType(msgType); - pushMsg.setServicerUserid(servicerUserid); - webSocketService.pushMessageToCs(session.getSessionId(), pushMsg); + return InboundMessage.builder() + .channelType(InboundMessage.CHANNEL_WECHAT) + .channelMessageId(msgItem.getMsgId()) + .sessionKey(sessionKey) + .customerId(customerId) + .kfId(kfId) + .sender(customerId) + .content(content) + .msgType(msgItem.getMsgType()) + .rawPayload(msgItem.getOriginData()) + .timestamp(System.currentTimeMillis()) + .build(); } private String extractContent(SyncMsgResponse.MsgItem msgItem) { @@ -347,7 +272,7 @@ public class MessageProcessService { @Async public void processMessage(WxCallbackMessage message) { - log.info("直接处理消息(测试用): msgType={}", message.getMsgType()); + log.info("[AC-MCA-08] 直接处理消息(测试用): msgType={}", message.getMsgType()); String customerId = message.getExternalUserId(); String kfId = message.getOpenKfId(); @@ -357,37 +282,21 @@ public class MessageProcessService { return; } - Session session = sessionManagerService.getOrCreateSession(customerId, kfId); - String status = sessionManagerService.getSessionStatus(session.getSessionId()); + String sessionKey = kfId + "_" + customerId; - sessionManagerService.saveMessage( - message.getMsgId() != null ? message.getMsgId() : "test_" + System.currentTimeMillis(), - session.getSessionId(), - Message.SENDER_TYPE_CUSTOMER, - customerId, - message.getContent(), - message.getMsgType() != null ? message.getMsgType() : "text", - JSON.toJSONString(message.getRawData()) - ); + InboundMessage inboundMessage = InboundMessage.builder() + .channelType(InboundMessage.CHANNEL_WECHAT) + .channelMessageId(message.getMsgId() != null ? message.getMsgId() : "test_" + System.currentTimeMillis()) + .sessionKey(sessionKey) + .customerId(customerId) + .kfId(kfId) + .sender(customerId) + .content(message.getContent()) + .msgType(message.getMsgType() != null ? message.getMsgType() : "text") + .rawPayload(JSON.toJSONString(message.getRawData())) + .timestamp(System.currentTimeMillis()) + .build(); - List history = sessionManagerService.getSessionMessages(session.getSessionId()); - - switch (status) { - case Session.STATUS_AI: - processAiMessage(session, customerId, kfId, message.getContent()); - break; - case Session.STATUS_PENDING: - notifyPendingSession(session, customerId, kfId, message.getContent(), message.getMsgType()); - break; - case Session.STATUS_MANUAL: - pushToManualCs(session, customerId, kfId, message.getContent(), message.getMsgType(), null); - break; - case Session.STATUS_CLOSED: - Session newSession = sessionManagerService.getOrCreateSession(customerId, kfId); - processAiMessage(newSession, customerId, kfId, message.getContent()); - break; - default: - log.warn("未知的会话状态: {}", status); - } + messageRouterService.processInboundMessage(inboundMessage); } }