""" Intent router for AI Service. [AC-AISVC-69, AC-AISVC-70] Intent matching engine with keyword and regex support. [v0.8.0] Upgraded to hybrid routing with RuleMatcher + SemanticMatcher + LlmJudge + FusionPolicy. """ import logging import re import time from dataclasses import dataclass from typing import TYPE_CHECKING, Any from app.models.entities import IntentRule from app.services.intent.models import ( FusionConfig, FusionResult, LlmJudgeInput, LlmJudgeResult, RouteTrace, RuleMatchResult, SemanticMatchResult, ) if TYPE_CHECKING: from app.services.intent.fusion_policy import FusionPolicy from app.services.intent.llm_judge import LlmJudge from app.services.intent.semantic_matcher import SemanticMatcher logger = logging.getLogger(__name__) @dataclass class IntentMatchResult: """ [AC-AISVC-69] Result of intent matching. Contains the matched rule and match details. """ rule: IntentRule match_type: str matched: str def to_dict(self) -> dict[str, Any]: return { "rule_id": str(self.rule.id), "rule_name": self.rule.name, "match_type": self.match_type, "matched": self.matched, "response_type": self.rule.response_type, "target_kb_ids": self.rule.target_kb_ids or [], "flow_id": str(self.rule.flow_id) if self.rule.flow_id else None, "fixed_reply": self.rule.fixed_reply, "transfer_message": self.rule.transfer_message, } class RuleMatcher: """ [v0.8.0] Rule matcher for keyword and regex matching. Extracted from IntentRouter for hybrid routing. """ def match(self, message: str, rules: list[IntentRule]) -> RuleMatchResult: """ [AC-AISVC-112] Match user message against intent rules. Returns RuleMatchResult with score (1.0 for match, 0.0 for no match). Args: message: User input message rules: List of enabled rules ordered by priority DESC Returns: RuleMatchResult with match details """ start_time = time.time() if not message or not rules: duration_ms = int((time.time() - start_time) * 1000) return RuleMatchResult( rule_id=None, rule=None, match_type=None, matched_text=None, score=0.0, duration_ms=duration_ms, ) message_lower = message.lower() for rule in rules: if not rule.is_enabled: continue keyword_result = self._match_keywords(message, message_lower, rule) if keyword_result: duration_ms = int((time.time() - start_time) * 1000) logger.info( f"[AC-AISVC-69] Intent matched by keyword: " f"rule={rule.name}, matched='{keyword_result.matched}'" ) return RuleMatchResult( rule_id=rule.id, rule=rule, match_type="keyword", matched_text=keyword_result.matched, score=1.0, duration_ms=duration_ms, ) regex_result = self._match_patterns(message, rule) if regex_result: duration_ms = int((time.time() - start_time) * 1000) logger.info( f"[AC-AISVC-69] Intent matched by regex: " f"rule={rule.name}, matched='{regex_result.matched}'" ) return RuleMatchResult( rule_id=rule.id, rule=rule, match_type="regex", matched_text=regex_result.matched, score=1.0, duration_ms=duration_ms, ) duration_ms = int((time.time() - start_time) * 1000) logger.debug("[AC-AISVC-70] No intent matched, will fallback to default RAG") return RuleMatchResult( rule_id=None, rule=None, match_type=None, matched_text=None, score=0.0, duration_ms=duration_ms, ) def _match_keywords( self, message: str, message_lower: str, rule: IntentRule, ) -> IntentMatchResult | None: """ Match message against rule keywords. Any keyword match returns a result. """ keywords = rule.keywords or [] if not keywords: return None for keyword in keywords: if not keyword: continue if keyword.lower() in message_lower: return IntentMatchResult( rule=rule, match_type="keyword", matched=keyword, ) return None def _match_patterns( self, message: str, rule: IntentRule, ) -> IntentMatchResult | None: """ Match message against rule regex patterns. Any pattern match returns a result. """ patterns = rule.patterns or [] if not patterns: return None for pattern in patterns: if not pattern: continue try: if re.search(pattern, message, re.IGNORECASE): return IntentMatchResult( rule=rule, match_type="regex", matched=pattern, ) except re.error as e: logger.warning( f"Invalid regex pattern in rule {rule.id}: {pattern}, error: {e}" ) continue return None class IntentRouter: """ [AC-AISVC-69] Intent matching engine. [v0.8.0] Upgraded to support hybrid routing. Matching algorithm: 1. Load rules ordered by priority DESC 2. For each rule, try keyword matching first 3. If no keyword match, try regex pattern matching 4. Return first match (highest priority) 5. If no match, return None (fallback to default RAG) Hybrid routing (match_hybrid): 1. Parallel execute RuleMatcher + SemanticMatcher 2. Conditionally trigger LlmJudge 3. Execute FusionPolicy for final decision """ def __init__( self, rule_matcher: RuleMatcher | None = None, semantic_matcher: "SemanticMatcher | None" = None, llm_judge: "LlmJudge | None" = None, fusion_policy: "FusionPolicy | None" = None, config: FusionConfig | None = None, ): """ [v0.8.0] Initialize with optional dependencies for DI. Args: rule_matcher: Rule matcher for keyword/regex matching semantic_matcher: Semantic matcher for vector similarity llm_judge: LLM judge for arbitration fusion_policy: Fusion policy for decision making config: Fusion configuration """ self._rule_matcher = rule_matcher or RuleMatcher() self._semantic_matcher = semantic_matcher self._llm_judge = llm_judge self._fusion_policy = fusion_policy self._config = config or FusionConfig() def match( self, message: str, rules: list[IntentRule], ) -> IntentMatchResult | None: """ [AC-AISVC-69] Match user message against intent rules. Preserved for backward compatibility. Args: message: User input message rules: List of enabled rules ordered by priority DESC Returns: IntentMatchResult if matched, None otherwise """ result = self._rule_matcher.match(message, rules) if result.rule: return IntentMatchResult( rule=result.rule, match_type=result.match_type or "keyword", matched=result.matched_text or "", ) return None def match_with_stats( self, message: str, rules: list[IntentRule], ) -> tuple[IntentMatchResult | None, str | None]: """ [AC-AISVC-69] Match and return rule_id for statistics update. Returns: Tuple of (match_result, rule_id_for_stats) """ result = self.match(message, rules) if result: return result, str(result.rule.id) return None, None async def match_hybrid( self, message: str, rules: list[IntentRule], tenant_id: str, config: FusionConfig | None = None, ) -> FusionResult: """ [AC-AISVC-111] Hybrid routing entry point. Flow: 1. Parallel execute RuleMatcher + SemanticMatcher 2. Check if LlmJudge should trigger 3. Execute FusionPolicy for final decision Args: message: User input message rules: List of enabled rules ordered by priority DESC tenant_id: Tenant ID for isolation config: Optional fusion config override Returns: FusionResult with final intent, confidence, and trace """ effective_config = config or self._config start_time = time.time() rule_result = self._rule_matcher.match(message, rules) semantic_result = await self._execute_semantic_matcher( message, rules, tenant_id, effective_config ) llm_result = await self._conditionally_execute_llm_judge( message, rule_result, semantic_result, tenant_id, effective_config ) if self._fusion_policy: fusion_result = self._fusion_policy.fuse( rule_result, semantic_result, llm_result ) else: fusion_result = self._default_fusion( rule_result, semantic_result, llm_result, effective_config ) total_duration_ms = int((time.time() - start_time) * 1000) fusion_result.trace.fusion["total_duration_ms"] = total_duration_ms logger.info( f"[AC-AISVC-111] Hybrid routing completed: " f"decision={fusion_result.decision_reason}, " f"confidence={fusion_result.final_confidence:.3f}, " f"duration={total_duration_ms}ms" ) return fusion_result async def _execute_semantic_matcher( self, message: str, rules: list[IntentRule], tenant_id: str, config: FusionConfig, ) -> SemanticMatchResult: """Execute semantic matcher if available and enabled.""" if not self._semantic_matcher: return SemanticMatchResult( candidates=[], top_score=0.0, duration_ms=0, skipped=True, skip_reason="not_configured", ) if not config.semantic_matcher_enabled: return SemanticMatchResult( candidates=[], top_score=0.0, duration_ms=0, skipped=True, skip_reason="disabled", ) try: return await self._semantic_matcher.match( message=message, rules=rules, tenant_id=tenant_id, top_k=config.semantic_top_k, ) except Exception as e: logger.warning(f"[AC-AISVC-113] Semantic matcher failed: {e}") return SemanticMatchResult( candidates=[], top_score=0.0, duration_ms=0, skipped=True, skip_reason=f"error: {str(e)}", ) async def _conditionally_execute_llm_judge( self, message: str, rule_result: RuleMatchResult, semantic_result: SemanticMatchResult, tenant_id: str, config: FusionConfig, ) -> LlmJudgeResult | None: """Conditionally execute LLM judge based on trigger conditions.""" if not self._llm_judge: return None if not config.llm_judge_enabled: return None should_trigger, trigger_reason = self._check_llm_trigger( rule_result, semantic_result, config ) if not should_trigger: return None logger.info(f"[AC-AISVC-118] LLM judge triggered: reason={trigger_reason}") candidates = self._build_llm_candidates(rule_result, semantic_result) if not candidates: return None try: return await self._llm_judge.judge( LlmJudgeInput( message=message, candidates=candidates, conflict_type=trigger_reason, ), tenant_id, ) except Exception as e: logger.warning(f"[AC-AISVC-119] LLM judge failed: {e}") return LlmJudgeResult( intent_id=None, intent_name=None, score=0.0, reasoning=f"LLM error: {str(e)}", duration_ms=0, tokens_used=0, triggered=True, ) def _check_llm_trigger( self, rule_result: RuleMatchResult, semantic_result: SemanticMatchResult, config: FusionConfig, ) -> tuple[bool, str]: """ [AC-AISVC-118] Check if LLM judge should trigger. Trigger conditions: 1. Conflict: RuleMatcher and SemanticMatcher match different intents 2. Gray zone: Max confidence in gray zone range 3. Multi-intent: Multiple candidates with close scores Returns: (should_trigger, trigger_reason) """ rule_score = rule_result.score semantic_score = semantic_result.top_score if rule_score > 0 and semantic_score > 0 and not semantic_result.skipped: if semantic_result.candidates: top_semantic_rule_id = semantic_result.candidates[0].rule.id if rule_result.rule_id != top_semantic_rule_id: if abs(rule_score - semantic_score) < config.conflict_threshold: return True, "rule_semantic_conflict" max_score = max(rule_score, semantic_score) if config.min_trigger_threshold < max_score < config.gray_zone_threshold: return True, "gray_zone" if len(semantic_result.candidates) >= 2: top1_score = semantic_result.candidates[0].score top2_score = semantic_result.candidates[1].score if abs(top1_score - top2_score) < config.multi_intent_threshold: return True, "multi_intent" return False, "" def _build_llm_candidates( self, rule_result: RuleMatchResult, semantic_result: SemanticMatchResult, ) -> list[dict[str, Any]]: """Build candidate list for LLM judge.""" candidates = [] if rule_result.rule: candidates.append({ "id": str(rule_result.rule_id), "name": rule_result.rule.name, "description": f"匹配方式: {rule_result.match_type}, 匹配内容: {rule_result.matched_text}", }) for candidate in semantic_result.candidates[:3]: if not any(c["id"] == str(candidate.rule.id) for c in candidates): candidates.append({ "id": str(candidate.rule.id), "name": candidate.rule.name, "description": f"语义相似度: {candidate.score:.2f}", }) return candidates def _default_fusion( self, rule_result: RuleMatchResult, semantic_result: SemanticMatchResult, llm_result: LlmJudgeResult | None, config: FusionConfig, ) -> FusionResult: """Default fusion logic when FusionPolicy is not available.""" trace = RouteTrace( rule_match=rule_result.to_dict(), semantic_match=semantic_result.to_dict(), llm_judge=llm_result.to_dict() if llm_result else {}, fusion={}, ) final_intent = None final_confidence = 0.0 decision_reason = "no_match" if rule_result.score == 1.0 and rule_result.rule: final_intent = rule_result.rule final_confidence = 1.0 decision_reason = "rule_high_confidence" elif llm_result and llm_result.triggered and llm_result.intent_id: final_intent = self._find_rule_by_id( llm_result.intent_id, rule_result, semantic_result ) final_confidence = llm_result.score decision_reason = "llm_judge" elif rule_result.score == 0 and semantic_result.top_score > config.semantic_threshold: if semantic_result.candidates: final_intent = semantic_result.candidates[0].rule final_confidence = semantic_result.top_score decision_reason = "semantic_override" elif semantic_result.top_score > 0.5: if semantic_result.candidates: final_intent = semantic_result.candidates[0].rule final_confidence = semantic_result.top_score decision_reason = "semantic_fallback" need_clarify = final_confidence < config.clarify_threshold clarify_candidates = None if need_clarify and len(semantic_result.candidates) > 1: clarify_candidates = [c.rule for c in semantic_result.candidates[:3]] trace.fusion = { "weights": { "w_rule": config.w_rule, "w_semantic": config.w_semantic, "w_llm": config.w_llm, }, "final_confidence": final_confidence, "decision_reason": decision_reason, } return FusionResult( final_intent=final_intent, final_confidence=final_confidence, decision_reason=decision_reason, need_clarify=need_clarify, clarify_candidates=clarify_candidates, trace=trace, ) def _find_rule_by_id( self, intent_id: str | None, rule_result: RuleMatchResult, semantic_result: SemanticMatchResult, ) -> IntentRule | None: """Find rule by ID from rule or semantic results.""" if not intent_id: return None if rule_result.rule_id and str(rule_result.rule_id) == intent_id: return rule_result.rule for candidate in semantic_result.candidates: if str(candidate.rule.id) == intent_id: return candidate.rule return None