""" LLM judge for intent arbitration. [AC-AISVC-118, AC-AISVC-119] LLM-based intent arbitration. """ import asyncio import json import logging import time from typing import TYPE_CHECKING, Any from app.services.intent.models import ( FusionConfig, LlmJudgeInput, LlmJudgeResult, RuleMatchResult, SemanticMatchResult, ) if TYPE_CHECKING: from app.services.llm.base import LLMClient logger = logging.getLogger(__name__) class LlmJudge: """ [AC-AISVC-118] LLM-based intent arbitrator. Triggered when: - Rule vs Semantic conflict - Gray zone (low confidence) - Multiple intent candidates with similar scores """ JUDGE_PROMPT = """你是一个意图识别仲裁器。根据用户消息和候选意图,判断最匹配的意图。 用户消息:{message} 候选意图: {candidates} 请返回 JSON 格式(不要包含```json标记): {{ "intent_id": "最匹配的意图ID", "intent_name": "意图名称", "confidence": 0.0-1.0之间的置信度, "reasoning": "判断理由" }}""" def __init__( self, llm_client: "LLMClient", config: FusionConfig, ): """ Initialize LLM judge. Args: llm_client: LLM client for generating responses config: Fusion configuration """ self._llm_client = llm_client self._config = config def should_trigger( self, rule_result: RuleMatchResult, semantic_result: SemanticMatchResult, config: FusionConfig | None = None, ) -> tuple[bool, str]: """ [AC-AISVC-118] Check if LLM judge should be triggered. Trigger conditions: 1. Conflict: Rule and Semantic match different intents with close scores 2. Gray zone: Max confidence in gray zone range 3. Multi-intent: Multiple candidates with similar scores Args: rule_result: Rule matching result semantic_result: Semantic matching result config: Optional config override Returns: Tuple of (should_trigger, trigger_reason) """ effective_config = config or self._config if not effective_config.llm_judge_enabled: return False, "disabled" rule_score = rule_result.score semantic_score = semantic_result.top_score if rule_score > 0 and semantic_score > 0: if semantic_result.candidates: top_semantic_rule_id = semantic_result.candidates[0].rule.id if rule_result.rule_id != top_semantic_rule_id: if abs(rule_score - semantic_score) < effective_config.conflict_threshold: logger.info( f"[AC-AISVC-118] LLM judge triggered: rule_semantic_conflict, " f"rule_id={rule_result.rule_id}, semantic_id={top_semantic_rule_id}, " f"rule_score={rule_score}, semantic_score={semantic_score}" ) return True, "rule_semantic_conflict" max_score = max(rule_score, semantic_score) if effective_config.min_trigger_threshold < max_score < effective_config.gray_zone_threshold: logger.info( f"[AC-AISVC-118] LLM judge triggered: gray_zone, " f"max_score={max_score}" ) return True, "gray_zone" if len(semantic_result.candidates) >= 2: top1_score = semantic_result.candidates[0].score top2_score = semantic_result.candidates[1].score if abs(top1_score - top2_score) < effective_config.multi_intent_threshold: logger.info( f"[AC-AISVC-118] LLM judge triggered: multi_intent, " f"top1_score={top1_score}, top2_score={top2_score}" ) return True, "multi_intent" return False, "" async def judge( self, input_data: LlmJudgeInput, tenant_id: str, ) -> LlmJudgeResult: """ [AC-AISVC-119] Perform LLM arbitration. Args: input_data: Judge input with message and candidates tenant_id: Tenant ID for isolation Returns: LlmJudgeResult with arbitration decision """ start_time = time.time() candidates_text = "\n".join([ f"- ID: {c['id']}, 名称: {c['name']}, 描述: {c.get('description', 'N/A')}" for c in input_data.candidates ]) prompt = self.JUDGE_PROMPT.format( message=input_data.message, candidates=candidates_text, ) try: from app.services.llm.base import LLMConfig response = await asyncio.wait_for( self._llm_client.generate( messages=[{"role": "user", "content": prompt}], config=LLMConfig( max_tokens=200, temperature=0, ), ), timeout=self._config.llm_judge_timeout_ms / 1000, ) result = self._parse_response(response.content or "") duration_ms = int((time.time() - start_time) * 1000) tokens_used = 0 if response.usage: tokens_used = response.usage.get("total_tokens", 0) logger.info( f"[AC-AISVC-119] LLM judge completed for tenant={tenant_id}, " f"intent_id={result.get('intent_id')}, confidence={result.get('confidence', 0):.3f}, " f"duration={duration_ms}ms, tokens={tokens_used}" ) return LlmJudgeResult( intent_id=result.get("intent_id"), intent_name=result.get("intent_name"), score=float(result.get("confidence", 0.5)), reasoning=result.get("reasoning"), duration_ms=duration_ms, tokens_used=tokens_used, triggered=True, ) except asyncio.TimeoutError: duration_ms = int((time.time() - start_time) * 1000) logger.warning( f"[AC-AISVC-119] LLM judge timeout for tenant={tenant_id}, " f"timeout={self._config.llm_judge_timeout_ms}ms" ) return LlmJudgeResult( intent_id=None, intent_name=None, score=0.0, reasoning="LLM timeout", duration_ms=duration_ms, tokens_used=0, triggered=True, ) except Exception as e: duration_ms = int((time.time() - start_time) * 1000) logger.error( f"[AC-AISVC-119] LLM judge error for tenant={tenant_id}: {e}" ) return LlmJudgeResult( intent_id=None, intent_name=None, score=0.0, reasoning=f"LLM error: {str(e)}", duration_ms=duration_ms, tokens_used=0, triggered=True, ) def _parse_response(self, content: str) -> dict[str, Any]: """ Parse LLM response to extract JSON result. Args: content: LLM response content Returns: Parsed dictionary with intent_id, intent_name, confidence, reasoning """ try: cleaned = content.strip() if cleaned.startswith("```json"): cleaned = cleaned[7:] if cleaned.startswith("```"): cleaned = cleaned[3:] if cleaned.endswith("```"): cleaned = cleaned[:-3] cleaned = cleaned.strip() result: dict[str, Any] = json.loads(cleaned) return result except json.JSONDecodeError as e: logger.warning(f"[AC-AISVC-119] Failed to parse LLM response: {e}") return {}