221 lines
8.4 KiB
Java
221 lines
8.4 KiB
Java
|
|
package com.wecom.robot.service;
|
||
|
|
|
||
|
|
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
||
|
|
import com.wecom.robot.entity.Message;
|
||
|
|
import com.wecom.robot.entity.Session;
|
||
|
|
import com.wecom.robot.entity.TransferLog;
|
||
|
|
import com.wecom.robot.mapper.MessageMapper;
|
||
|
|
import com.wecom.robot.mapper.SessionMapper;
|
||
|
|
import com.wecom.robot.mapper.TransferLogMapper;
|
||
|
|
import lombok.RequiredArgsConstructor;
|
||
|
|
import lombok.extern.slf4j.Slf4j;
|
||
|
|
import org.springframework.data.redis.core.StringRedisTemplate;
|
||
|
|
import org.springframework.stereotype.Service;
|
||
|
|
import org.springframework.transaction.annotation.Transactional;
|
||
|
|
|
||
|
|
import java.time.LocalDateTime;
|
||
|
|
import java.util.List;
|
||
|
|
import java.util.concurrent.TimeUnit;
|
||
|
|
|
||
|
|
@Slf4j
|
||
|
|
@Service
|
||
|
|
@RequiredArgsConstructor
|
||
|
|
public class SessionManagerService {
|
||
|
|
|
||
|
|
private static final String SESSION_STATUS_KEY_PREFIX = "session:status:";
|
||
|
|
private static final String SESSION_MESSAGE_COUNT_KEY_PREFIX = "session:msg_count:";
|
||
|
|
|
||
|
|
private final SessionMapper sessionMapper;
|
||
|
|
private final MessageMapper messageMapper;
|
||
|
|
private final TransferLogMapper transferLogMapper;
|
||
|
|
private final StringRedisTemplate redisTemplate;
|
||
|
|
|
||
|
|
public Session getOrCreateSession(String customerId, String kfId) {
|
||
|
|
LambdaQueryWrapper<Session> query = new LambdaQueryWrapper<>();
|
||
|
|
query.eq(Session::getCustomerId, customerId)
|
||
|
|
.eq(Session::getKfId, kfId)
|
||
|
|
.ne(Session::getStatus, Session.STATUS_CLOSED)
|
||
|
|
.orderByDesc(Session::getCreatedAt)
|
||
|
|
.last("LIMIT 1");
|
||
|
|
|
||
|
|
Session session = sessionMapper.selectOne(query);
|
||
|
|
if (session == null) {
|
||
|
|
session = new Session();
|
||
|
|
session.setSessionId(generateSessionId(customerId, kfId));
|
||
|
|
session.setCustomerId(customerId);
|
||
|
|
session.setKfId(kfId);
|
||
|
|
session.setStatus(Session.STATUS_AI);
|
||
|
|
session.setWxServiceState(0);
|
||
|
|
session.setCreatedAt(LocalDateTime.now());
|
||
|
|
session.setUpdatedAt(LocalDateTime.now());
|
||
|
|
sessionMapper.insert(session);
|
||
|
|
|
||
|
|
cacheSessionStatus(session.getSessionId(), Session.STATUS_AI);
|
||
|
|
}
|
||
|
|
|
||
|
|
return session;
|
||
|
|
}
|
||
|
|
|
||
|
|
public Session getSession(String sessionId) {
|
||
|
|
return sessionMapper.selectById(sessionId);
|
||
|
|
}
|
||
|
|
|
||
|
|
public String getSessionStatus(String sessionId) {
|
||
|
|
String cachedStatus = redisTemplate.opsForValue().get(SESSION_STATUS_KEY_PREFIX + sessionId);
|
||
|
|
if (cachedStatus != null) {
|
||
|
|
return cachedStatus;
|
||
|
|
}
|
||
|
|
|
||
|
|
Session session = sessionMapper.selectById(sessionId);
|
||
|
|
if (session != null) {
|
||
|
|
cacheSessionStatus(sessionId, session.getStatus());
|
||
|
|
return session.getStatus();
|
||
|
|
}
|
||
|
|
return null;
|
||
|
|
}
|
||
|
|
|
||
|
|
public void updateSessionStatus(String sessionId, String status) {
|
||
|
|
Session session = new Session();
|
||
|
|
session.setSessionId(sessionId);
|
||
|
|
session.setStatus(status);
|
||
|
|
session.setUpdatedAt(LocalDateTime.now());
|
||
|
|
sessionMapper.updateById(session);
|
||
|
|
|
||
|
|
cacheSessionStatus(sessionId, status);
|
||
|
|
}
|
||
|
|
|
||
|
|
public void updateWxServiceState(String sessionId, Integer wxServiceState) {
|
||
|
|
Session session = new Session();
|
||
|
|
session.setSessionId(sessionId);
|
||
|
|
session.setWxServiceState(wxServiceState);
|
||
|
|
session.setUpdatedAt(LocalDateTime.now());
|
||
|
|
sessionMapper.updateById(session);
|
||
|
|
log.info("更新微信会话状态: sessionId={}, wxServiceState={}", sessionId, wxServiceState);
|
||
|
|
}
|
||
|
|
|
||
|
|
public void updateServicer(String sessionId, String servicerUserid) {
|
||
|
|
Session session = new Session();
|
||
|
|
session.setSessionId(sessionId);
|
||
|
|
session.setManualCsId(servicerUserid);
|
||
|
|
session.setUpdatedAt(LocalDateTime.now());
|
||
|
|
sessionMapper.updateById(session);
|
||
|
|
log.info("更新接待人员: sessionId={}, servicerUserid={}", sessionId, servicerUserid);
|
||
|
|
}
|
||
|
|
|
||
|
|
public void assignManualCs(String sessionId, String csId) {
|
||
|
|
Session session = new Session();
|
||
|
|
session.setSessionId(sessionId);
|
||
|
|
session.setStatus(Session.STATUS_MANUAL);
|
||
|
|
session.setManualCsId(csId);
|
||
|
|
session.setUpdatedAt(LocalDateTime.now());
|
||
|
|
sessionMapper.updateById(session);
|
||
|
|
|
||
|
|
cacheSessionStatus(sessionId, Session.STATUS_MANUAL);
|
||
|
|
}
|
||
|
|
|
||
|
|
@Transactional
|
||
|
|
public void transferToManual(String sessionId, String reason) {
|
||
|
|
updateSessionStatus(sessionId, Session.STATUS_PENDING);
|
||
|
|
|
||
|
|
TransferLog transferLog = new TransferLog();
|
||
|
|
transferLog.setSessionId(sessionId);
|
||
|
|
transferLog.setTriggerReason(reason);
|
||
|
|
transferLog.setTriggerTime(LocalDateTime.now());
|
||
|
|
transferLogMapper.insert(transferLog);
|
||
|
|
|
||
|
|
log.info("会话转人工: sessionId={}, reason={}", sessionId, reason);
|
||
|
|
}
|
||
|
|
|
||
|
|
public void acceptTransfer(String sessionId, String csId) {
|
||
|
|
assignManualCs(sessionId, csId);
|
||
|
|
|
||
|
|
LambdaQueryWrapper<TransferLog> query = new LambdaQueryWrapper<>();
|
||
|
|
query.eq(TransferLog::getSessionId, sessionId)
|
||
|
|
.isNull(TransferLog::getAcceptedTime)
|
||
|
|
.orderByDesc(TransferLog::getTriggerTime)
|
||
|
|
.last("LIMIT 1");
|
||
|
|
|
||
|
|
TransferLog transferLog = transferLogMapper.selectOne(query);
|
||
|
|
if (transferLog != null) {
|
||
|
|
transferLog.setAcceptedTime(LocalDateTime.now());
|
||
|
|
transferLog.setAcceptedCsId(csId);
|
||
|
|
transferLogMapper.updateById(transferLog);
|
||
|
|
}
|
||
|
|
|
||
|
|
log.info("客服接入会话: sessionId={}, csId={}", sessionId, csId);
|
||
|
|
}
|
||
|
|
|
||
|
|
public void closeSession(String sessionId) {
|
||
|
|
updateSessionStatus(sessionId, Session.STATUS_CLOSED);
|
||
|
|
redisTemplate.delete(SESSION_STATUS_KEY_PREFIX + sessionId);
|
||
|
|
redisTemplate.delete(SESSION_MESSAGE_COUNT_KEY_PREFIX + sessionId);
|
||
|
|
log.info("会话已关闭: sessionId={}", sessionId);
|
||
|
|
}
|
||
|
|
|
||
|
|
public void saveMessage(String msgId, String sessionId, String senderType, String senderId,
|
||
|
|
String content, String msgType, String rawData) {
|
||
|
|
Message message = new Message();
|
||
|
|
message.setMsgId(msgId);
|
||
|
|
message.setSessionId(sessionId);
|
||
|
|
message.setSenderType(senderType);
|
||
|
|
message.setSenderId(senderId);
|
||
|
|
message.setContent(content);
|
||
|
|
message.setMsgType(msgType);
|
||
|
|
message.setCreatedAt(LocalDateTime.now());
|
||
|
|
message.setRawData(rawData);
|
||
|
|
messageMapper.insert(message);
|
||
|
|
|
||
|
|
incrementMessageCount(sessionId);
|
||
|
|
}
|
||
|
|
|
||
|
|
public List<Message> getSessionMessages(String sessionId) {
|
||
|
|
LambdaQueryWrapper<Message> query = new LambdaQueryWrapper<>();
|
||
|
|
query.eq(Message::getSessionId, sessionId)
|
||
|
|
.orderByAsc(Message::getCreatedAt);
|
||
|
|
return messageMapper.selectList(query);
|
||
|
|
}
|
||
|
|
|
||
|
|
public int getMessageCount(String sessionId) {
|
||
|
|
String count = redisTemplate.opsForValue().get(SESSION_MESSAGE_COUNT_KEY_PREFIX + sessionId);
|
||
|
|
if (count != null) {
|
||
|
|
return Integer.parseInt(count);
|
||
|
|
}
|
||
|
|
|
||
|
|
LambdaQueryWrapper<Message> query = new LambdaQueryWrapper<>();
|
||
|
|
query.eq(Message::getSessionId, sessionId);
|
||
|
|
long dbCount = messageMapper.selectCount(query);
|
||
|
|
redisTemplate.opsForValue().set(SESSION_MESSAGE_COUNT_KEY_PREFIX + sessionId, String.valueOf(dbCount));
|
||
|
|
return (int) dbCount;
|
||
|
|
}
|
||
|
|
|
||
|
|
public List<Session> getSessionsByKfId(String kfId, String status, int limit) {
|
||
|
|
LambdaQueryWrapper<Session> query = new LambdaQueryWrapper<>();
|
||
|
|
query.eq(Session::getKfId, kfId);
|
||
|
|
if (status != null && !status.isEmpty() && !"all".equals(status)) {
|
||
|
|
query.eq(Session::getStatus, status);
|
||
|
|
}
|
||
|
|
query.orderByDesc(Session::getUpdatedAt);
|
||
|
|
query.last("LIMIT " + limit);
|
||
|
|
return sessionMapper.selectList(query);
|
||
|
|
}
|
||
|
|
|
||
|
|
public List<Session> getAllSessions(int limit) {
|
||
|
|
LambdaQueryWrapper<Session> query = new LambdaQueryWrapper<>();
|
||
|
|
query.orderByDesc(Session::getUpdatedAt);
|
||
|
|
query.last("LIMIT " + limit);
|
||
|
|
return sessionMapper.selectList(query);
|
||
|
|
}
|
||
|
|
|
||
|
|
private void cacheSessionStatus(String sessionId, String status) {
|
||
|
|
redisTemplate.opsForValue().set(SESSION_STATUS_KEY_PREFIX + sessionId, status, 24, TimeUnit.HOURS);
|
||
|
|
}
|
||
|
|
|
||
|
|
private void incrementMessageCount(String sessionId) {
|
||
|
|
redisTemplate.opsForValue().increment(SESSION_MESSAGE_COUNT_KEY_PREFIX + sessionId);
|
||
|
|
}
|
||
|
|
|
||
|
|
private String generateSessionId(String customerId, String kfId) {
|
||
|
|
return kfId + "_" + customerId + "_" + System.currentTimeMillis();
|
||
|
|
}
|
||
|
|
}
|