package com.wecom.robot.service; import com.alibaba.fastjson.JSON; import com.wecom.robot.adapter.ChannelAdapter; import com.wecom.robot.adapter.MessageSyncCapable; import com.wecom.robot.dto.InboundMessage; import com.wecom.robot.dto.ServiceStateResponse; import com.wecom.robot.dto.SyncMsgResponse; import com.wecom.robot.dto.WxCallbackMessage; import com.wecom.robot.entity.Message; import com.wecom.robot.entity.Session; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; import java.util.List; import java.util.Map; /** * 消息处理服务 *

* 负责从微信拉取消息并转换为 InboundMessage 传递给 MessageRouterService。 * [AC-MCA-08] 消息处理服务 */ @Slf4j @Service @RequiredArgsConstructor public class MessageProcessService { private static final String CHANNEL_TYPE = "wechat"; private final SessionManagerService sessionManagerService; private final TransferService transferService; private final WecomApiService wecomApiService; private final WebSocketService webSocketService; private final MessageRouterService messageRouterService; private final Map channelAdapters; @Async public void processKfMessageEvent(WxCallbackMessage event) { String openKfId = event.getOpenKfId(); String token = event.getToken(); log.info("[AC-MCA-08] 处理客户消息事件: openKfId={}, token={}", openKfId, token); if (openKfId == null) { log.warn("事件缺少openKfId"); return; } SyncMsgResponse syncResponse = wecomApiService.syncMessagesByToken(openKfId, token); if (!syncResponse.isSuccess()) { log.error("拉取消息失败: errcode={}, errmsg={}", syncResponse.getErrcode(), syncResponse.getErrmsg()); return; } if (!syncResponse.hasMessages()) { log.info("没有新消息"); return; } log.info("拉取到{}条消息", syncResponse.getMsgList().size()); for (SyncMsgResponse.MsgItem msgItem : syncResponse.getMsgList()) { try { processSyncedItem(msgItem); } catch (Exception e) { log.error("处理消息失败: msgId={}", msgItem.getMsgId(), e); } } while (Boolean.TRUE.equals(syncResponse.getHasMore())) { log.info("还有更多消息,继续拉取..."); syncResponse = wecomApiService.syncMessages(openKfId, null); if (!syncResponse.isSuccess() || !syncResponse.hasMessages()) { break; } for (SyncMsgResponse.MsgItem msgItem : syncResponse.getMsgList()) { try { processSyncedItem(msgItem); } catch (Exception e) { log.error("处理消息失败: msgId={}", msgItem.getMsgId(), e); } } } } private void processSyncedItem(SyncMsgResponse.MsgItem msgItem) { String customerId = msgItem.getExternalUserId(); String kfId = msgItem.getOpenKfId(); log.info("[AC-MCA-08] 处理消息项: msgId={}, origin={}, msgType={}, customerId={}", msgItem.getMsgId(), msgItem.getOrigin(), msgItem.getMsgType(), customerId); if (msgItem.isEvent()) { processEventMessage(msgItem); return; } if (!msgItem.isFromCustomer()) { log.debug("非客户消息,跳过处理: origin={}", msgItem.getOrigin()); return; } if (customerId == null || kfId == null) { log.warn("消息缺少必要字段: customerId={}, kfId={}", customerId, kfId); return; } ServiceStateResponse wxState = wecomApiService.getServiceState(kfId, customerId); if (!wxState.isSuccess()) { log.warn("获取微信会话状态失败: errcode={}, errmsg={}", wxState.getErrcode(), wxState.getErrmsg()); } log.info("微信会话状态: {} ({})", wxState.getStateDesc(), wxState.getServiceState()); Session session = sessionManagerService.getOrCreateSession(customerId, kfId); sessionManagerService.updateWxServiceState(session.getSessionId(), wxState.getServiceState()); InboundMessage inboundMessage = buildInboundMessage(msgItem, customerId, kfId); messageRouterService.processInboundMessage(inboundMessage); } private void processEventMessage(SyncMsgResponse.MsgItem msgItem) { SyncMsgResponse.EventContent event = msgItem.getEvent(); if (event == null) { return; } String eventType = event.getEventType(); String customerId = event.getExternalUserid(); String kfId = event.getOpenKfId(); log.info("处理事件消息: eventType={}, customerId={}, kfId={}", eventType, customerId, kfId); switch (eventType) { case SyncMsgResponse.EventContent.EVENT_ENTER_SESSION: handleEnterSessionEvent(event, customerId, kfId); break; case SyncMsgResponse.EventContent.EVENT_SESSION_STATUS_CHANGE: handleSessionStatusChangeEvent(event, customerId, kfId); break; case SyncMsgResponse.EventContent.EVENT_MSG_SEND_FAIL: log.warn("消息发送失败: failMsgid={}, failType={}", event.getFailMsgid(), event.getFailType()); break; case SyncMsgResponse.EventContent.EVENT_USER_RECALL_MSG: log.info("用户撤回消息: recallMsgid={}", event.getRecallMsgid()); break; default: log.info("其他事件类型: {}", eventType); } } private void handleEnterSessionEvent(SyncMsgResponse.EventContent event, String customerId, String kfId) { log.info("用户进入会话: customerId={}, scene={}, sceneParam={}", customerId, event.getScene(), event.getSceneParam()); Session session = sessionManagerService.getOrCreateSession(customerId, kfId); String welcomeCode = event.getWelcomeCode(); if (welcomeCode != null && !welcomeCode.isEmpty()) { String welcomeMsg = "您好,欢迎咨询!请问有什么可以帮您?"; wecomApiService.sendWelcomeMsg(welcomeCode, welcomeMsg); sessionManagerService.saveMessage( "welcome_" + System.currentTimeMillis(), session.getSessionId(), Message.SENDER_TYPE_AI, "AI", welcomeMsg, "text", null ); } } private void handleSessionStatusChangeEvent(SyncMsgResponse.EventContent event, String customerId, String kfId) { Integer changeType = event.getChangeType(); String newServicerUserid = event.getNewServicerUserid(); String oldServicerUserid = event.getOldServicerUserid(); log.info("会话状态变更: changeType={}, oldServicer={}, newServicer={}", changeType, oldServicerUserid, newServicerUserid); Session session = sessionManagerService.getOrCreateSession(customerId, kfId); switch (changeType) { case SyncMsgResponse.EventContent.CHANGE_TYPE_FROM_POOL: log.info("从接待池接入会话: servicer={}", newServicerUserid); sessionManagerService.updateSessionStatus(session.getSessionId(), Session.STATUS_MANUAL); sessionManagerService.updateServicer(session.getSessionId(), newServicerUserid); break; case SyncMsgResponse.EventContent.CHANGE_TYPE_TRANSFER: log.info("转接会话: oldServicer={}, newServicer={}", oldServicerUserid, newServicerUserid); sessionManagerService.updateServicer(session.getSessionId(), newServicerUserid); break; case SyncMsgResponse.EventContent.CHANGE_TYPE_END: log.info("结束会话"); sessionManagerService.updateSessionStatus(session.getSessionId(), Session.STATUS_CLOSED); break; case SyncMsgResponse.EventContent.CHANGE_TYPE_REENTER: log.info("重新接入已结束会话: servicer={}", newServicerUserid); sessionManagerService.updateSessionStatus(session.getSessionId(), Session.STATUS_MANUAL); sessionManagerService.updateServicer(session.getSessionId(), newServicerUserid); break; } } private InboundMessage buildInboundMessage(SyncMsgResponse.MsgItem msgItem, String customerId, String kfId) { String content = extractContent(msgItem); String sessionKey = kfId + "_" + customerId; return InboundMessage.builder() .channelType(InboundMessage.CHANNEL_WECHAT) .channelMessageId(msgItem.getMsgId()) .sessionKey(sessionKey) .customerId(customerId) .kfId(kfId) .sender(customerId) .content(content) .msgType(msgItem.getMsgType()) .rawPayload(msgItem.getOriginData()) .timestamp(System.currentTimeMillis()) .build(); } private String extractContent(SyncMsgResponse.MsgItem msgItem) { String msgType = msgItem.getMsgType(); switch (msgType) { case "text": return msgItem.getTextContent(); case "image": return "[图片]"; case "voice": return "[语音]"; case "video": return "[视频]"; case "file": return "[文件]"; case "location": SyncMsgResponse.LocationContent loc = msgItem.getLocation(); if (loc != null) { return "[位置] " + loc.getName() + " " + loc.getAddress(); } return "[位置]"; case "link": SyncMsgResponse.LinkContent link = msgItem.getLink(); if (link != null) { return "[链接] " + link.getTitle(); } return "[链接]"; case "business_card": return "[名片]"; case "miniprogram": return "[小程序]"; case "msgmenu": return "[菜单消息]"; default: return "[" + msgType + "]"; } } @Async public void processMessage(WxCallbackMessage message) { log.info("[AC-MCA-08] 直接处理消息(测试用): msgType={}", message.getMsgType()); String customerId = message.getExternalUserId(); String kfId = message.getOpenKfId(); if (customerId == null || kfId == null) { log.warn("消息缺少必要字段: customerId={}, kfId={}", customerId, kfId); return; } String sessionKey = kfId + "_" + customerId; InboundMessage inboundMessage = InboundMessage.builder() .channelType(InboundMessage.CHANNEL_WECHAT) .channelMessageId(message.getMsgId() != null ? message.getMsgId() : "test_" + System.currentTimeMillis()) .sessionKey(sessionKey) .customerId(customerId) .kfId(kfId) .sender(customerId) .content(message.getContent()) .msgType(message.getMsgType() != null ? message.getMsgType() : "text") .rawPayload(JSON.toJSONString(message.getRawData())) .timestamp(System.currentTimeMillis()) .build(); messageRouterService.processInboundMessage(inboundMessage); } }