# File: ai-robot-core/ai-service/app/services/intent/tester.py
# (Repository-viewer header captured with the source: 246 lines, 7.5 KiB, Python.)

"""
Intent rule tester for AI Service.
[AC-AISVC-96] Intent rule testing service with conflict detection.
"""
import logging
import re
from dataclasses import dataclass, field
from typing import Any
from app.models.entities import IntentRule
from app.services.intent.router import IntentRouter
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
@dataclass
class IntentRuleTestCase:
"""Result of testing a single message against a rule."""
message: str
matched: bool
matched_keywords: list[str] = field(default_factory=list)
matched_patterns: list[str] = field(default_factory=list)
match_type: str | None = None
priority: int = 0
priority_rank: int = 0
conflict_rules: list[dict[str, Any]] = field(default_factory=list)
reason: str | None = None
def to_dict(self) -> dict[str, Any]:
return {
"message": self.message,
"matched": self.matched,
"matchedKeywords": self.matched_keywords,
"matchedPatterns": self.matched_patterns,
"matchType": self.match_type,
"priority": self.priority,
"priorityRank": self.priority_rank,
"conflictRules": self.conflict_rules,
"reason": self.reason,
}
@dataclass
class IntentRuleTestResult:
    """Aggregated outcome of running several test messages against one rule.

    Attributes:
        rule_id: Identifier of the tested rule, as a string.
        rule_name: Name of the tested rule.
        results: One test case per evaluated message.
        summary: Aggregate statistics for the run.
    """

    rule_id: str
    rule_name: str
    results: list[IntentRuleTestCase]
    summary: dict[str, Any]

    def to_dict(self) -> dict[str, Any]:
        """Return the result as a dict keyed by camelCase names."""
        serialized_cases = []
        for case in self.results:
            serialized_cases.append(case.to_dict())
        return dict(
            ruleId=self.rule_id,
            ruleName=self.rule_name,
            results=serialized_cases,
            summary=self.summary,
        )
class IntentRuleTester:
    """
    [AC-AISVC-96] Intent rule testing service.

    Features:
    - Test a single rule against multiple messages
    - Detect priority conflicts (other enabled rules that also match)
    - Provide detailed match information (keywords, patterns, rank, reason)
    """

    # Maximum number of keywords spelled out in an "unmatched" reason string.
    _MAX_KEYWORDS_IN_REASON = 3

    def __init__(self) -> None:
        # NOTE(review): the router is created here but is not referenced by
        # any other method visible in this class — confirm whether it is
        # still needed.
        self._router = IntentRouter()

    async def test_rule(
        self,
        rule: IntentRule,
        test_messages: list[str],
        all_rules: list[IntentRule],
    ) -> IntentRuleTestResult:
        """
        Test a rule against multiple messages and detect conflicts.

        Args:
            rule: The rule to test.
            test_messages: Messages to evaluate against the rule.
            all_rules: All rules, used for conflict detection and for
                computing the priority rank of ``rule``.

        Returns:
            IntentRuleTestResult holding one test case per message and a
            summary with ``totalTests``, ``matchedCount`` and ``matchRate``
            (``matchRate`` is 0 when no messages are given).
        """
        # The rank depends only on the rule set, so compute it once.
        priority_rank = self._calculate_priority_rank(rule, all_rules)
        results = [
            self._test_single_message(rule, message, all_rules, priority_rank)
            for message in test_messages
        ]

        total = len(test_messages)
        matched_count = sum(1 for r in results if r.matched)
        summary = {
            "totalTests": total,
            "matchedCount": matched_count,
            # Guard against division by zero for an empty message list.
            "matchRate": matched_count / total if total else 0,
        }

        logger.info(
            "[AC-AISVC-96] Tested rule %s: %s/%s matched",
            rule.name,
            matched_count,
            total,
        )

        return IntentRuleTestResult(
            rule_id=str(rule.id),
            rule_name=rule.name,
            results=results,
            summary=summary,
        )

    def _test_single_message(
        self,
        rule: IntentRule,
        message: str,
        all_rules: list[IntentRule],
        priority_rank: int,
    ) -> IntentRuleTestCase:
        """
        Test a single message against a rule.

        Args:
            rule: The rule under test.
            message: The message to evaluate.
            all_rules: All rules, scanned for other rules matching ``message``.
            priority_rank: Precomputed rank of ``rule``, copied into the result.

        Returns:
            IntentRuleTestCase describing the match. ``match_type`` is
            "keyword" when any keyword matched, otherwise "regex" when only
            patterns matched; ``reason`` is filled only when nothing matched.
        """
        matched_keywords = self._match_keywords(message, rule)
        matched_patterns = self._match_patterns(message, rule)
        matched = bool(matched_keywords or matched_patterns)

        match_type = None
        reason = None
        if matched:
            # Keyword hits take precedence when both kinds matched.
            match_type = "keyword" if matched_keywords else "regex"
        else:
            reason = self._determine_unmatch_reason(message, rule)

        # Conflicts are reported whether or not the tested rule itself matched.
        conflict_rules = self._detect_conflicts(rule, message, all_rules)

        return IntentRuleTestCase(
            message=message,
            matched=matched,
            matched_keywords=matched_keywords,
            matched_patterns=matched_patterns,
            match_type=match_type,
            priority=rule.priority,
            priority_rank=priority_rank,
            conflict_rules=conflict_rules,
            reason=reason,
        )

    def _match_keywords(self, message: str, rule: IntentRule) -> list[str]:
        """
        Return the rule keywords contained in the message.

        Matching is a case-insensitive substring test; empty or missing
        keywords are skipped. Keywords are returned in their original casing.
        """
        message_lower = message.lower()
        return [
            keyword
            for keyword in (rule.keywords or [])
            if keyword and keyword.lower() in message_lower
        ]

    def _match_patterns(self, message: str, rule: IntentRule) -> list[str]:
        """
        Return the rule regex patterns that match the message.

        Patterns are searched case-insensitively. Empty patterns are skipped,
        and an invalid pattern is logged and skipped rather than raised, so
        one bad pattern does not hide the others.
        """
        matched = []
        for pattern in rule.patterns or []:
            if not pattern:
                continue
            try:
                if re.search(pattern, message, re.IGNORECASE):
                    matched.append(pattern)
            except re.error as e:
                logger.warning("Invalid regex pattern: %s, error: %s", pattern, e)
        return matched

    def _detect_conflicts(
        self,
        current_rule: IntentRule,
        message: str,
        all_rules: list[IntentRule],
    ) -> list[dict[str, Any]]:
        """
        Detect other rules that also match the message (priority conflicts).

        The rule under test (compared by stringified id) and disabled rules
        are ignored.

        Returns:
            One dict per conflicting rule with ``ruleId``, ``ruleName``,
            ``priority`` and a human-readable ``reason``.
        """
        current_id = str(current_rule.id)
        conflicts = []
        for other_rule in all_rules:
            if str(other_rule.id) == current_id or not other_rule.is_enabled:
                continue
            if self._match_keywords(message, other_rule) or self._match_patterns(
                message, other_rule
            ):
                conflicts.append({
                    "ruleId": str(other_rule.id),
                    "ruleName": other_rule.name,
                    "priority": other_rule.priority,
                    # Closing parenthesis was missing in the original text.
                    "reason": f"同时匹配(优先级:{other_rule.priority})",
                })
        return conflicts

    def _calculate_priority_rank(
        self,
        rule: IntentRule,
        all_rules: list[IntentRule],
    ) -> int:
        """
        Calculate the priority rank of a rule among all enabled rules.

        Returns:
            1-based position of ``rule`` after sorting the enabled rules by
            priority in descending order, or 0 when ``rule`` is not among
            the enabled rules.
        """
        enabled_rules = [r for r in all_rules if r.is_enabled]
        sorted_rules = sorted(enabled_rules, key=lambda r: r.priority, reverse=True)
        rule_id = str(rule.id)
        for rank, candidate in enumerate(sorted_rules, start=1):
            if str(candidate.id) == rule_id:
                return rank
        return 0

    def _determine_unmatch_reason(self, message: str, rule: IntentRule) -> str:
        """
        Explain why a message did not match a rule.

        Empty or missing keywords/patterns are ignored, mirroring the
        matching helpers. ``message`` is accepted for interface
        compatibility; the explanation is derived from the rule only.

        Returns:
            A human-readable reason: no criteria configured, keyword
            mismatch (listing up to three keywords), or regex mismatch
            (with the pattern count).
        """
        keywords = [kw for kw in (rule.keywords or []) if kw]
        patterns = [p for p in (rule.patterns or []) if p]

        if not keywords and not patterns:
            return "规则未配置关键词或正则表达式"

        if keywords:
            limit = self._MAX_KEYWORDS_IN_REASON
            # Separate the keywords so they stay individually readable.
            keyword_str = "、".join(keywords[:limit])
            if len(keywords) > limit:
                keyword_str += f" 等{len(keywords)}个"
            return f"关键词不匹配(规则关键词:{keyword_str})"

        # Only patterns are configured at this point.
        return f"正则表达式不匹配(规则模式:{len(patterns)}个)"