ai-robot-core/ai-service/app/services/intent/tester.py

246 lines
7.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Intent rule tester for AI Service.
[AC-AISVC-96] Intent rule testing service with conflict detection.
"""
import logging
import re
from dataclasses import dataclass, field
from typing import Any
from app.models.entities import IntentRule
from app.services.intent.router import IntentRouter
logger = logging.getLogger(__name__)
@dataclass
class IntentRuleTestCase:
    """Outcome of evaluating one test message against one intent rule."""

    # The message that was tested and whether the rule matched it.
    message: str
    matched: bool
    # Keywords / regex patterns of the rule that hit the message.
    matched_keywords: list[str] = field(default_factory=list)
    matched_patterns: list[str] = field(default_factory=list)
    # Kind of match that was recorded; None when nothing matched.
    match_type: str | None = None
    # Priority of the tested rule and its rank among the compared rules.
    priority: int = 0
    priority_rank: int = 0
    # Other rules that matched the same message.
    conflict_rules: list[dict[str, Any]] = field(default_factory=list)
    # Explanation recorded when the rule did not match.
    reason: str | None = None

    def to_dict(self) -> dict[str, Any]:
        """Return the test case as a dict with camelCase keys.

        Attribute values are passed through as-is (lists are not copied).
        """
        # (output key, attribute name) pairs, in output order.
        key_to_attr = (
            ("message", "message"),
            ("matched", "matched"),
            ("matchedKeywords", "matched_keywords"),
            ("matchedPatterns", "matched_patterns"),
            ("matchType", "match_type"),
            ("priority", "priority"),
            ("priorityRank", "priority_rank"),
            ("conflictRules", "conflict_rules"),
            ("reason", "reason"),
        )
        return {key: getattr(self, attr) for key, attr in key_to_attr}
@dataclass
class IntentRuleTestResult:
    """Aggregated outcome of running a batch of test messages against one rule."""

    # Identifier and display name of the tested rule.
    rule_id: str
    rule_name: str
    # One entry per tested message.
    results: list[IntentRuleTestCase]
    # Aggregate figures supplied by the caller.
    summary: dict[str, Any]

    def to_dict(self) -> dict[str, Any]:
        """Return the result as a dict with camelCase keys.

        Each entry of ``results`` is serialized through its own ``to_dict``;
        ``summary`` is passed through unchanged.
        """
        serialized_cases = [case.to_dict() for case in self.results]
        payload: dict[str, Any] = {
            "ruleId": self.rule_id,
            "ruleName": self.rule_name,
        }
        payload["results"] = serialized_cases
        payload["summary"] = self.summary
        return payload
class IntentRuleTester:
    """
    [AC-AISVC-96] Intent rule testing service.

    Features:
    - Test single rule against multiple messages
    - Detect priority conflicts (other rules that also match)
    - Provide detailed match information

    Rules are read through the attributes ``id``, ``name``, ``keywords``,
    ``patterns``, ``priority`` and ``is_enabled``.
    """

    def __init__(self):
        # NOTE(review): the router is created here but is not referenced by
        # any method visible in this class - confirm whether it is still needed.
        self._router = IntentRouter()

    async def test_rule(
        self,
        rule: IntentRule,
        test_messages: list[str],
        all_rules: list[IntentRule],
    ) -> IntentRuleTestResult:
        """
        Test a rule against multiple messages and detect conflicts.

        Args:
            rule: The rule to test
            test_messages: List of test messages
            all_rules: All rules for conflict detection (ordered by priority DESC)

        Returns:
            IntentRuleTestResult with one test case per message and a summary
            holding ``totalTests``, ``matchedCount`` and ``matchRate``
            (``matchRate`` is 0 when no messages are given).
        """
        # The rank depends only on the rule set, so compute it once.
        priority_rank = self._calculate_priority_rank(rule, all_rules)
        results = [
            self._test_single_message(rule, message, all_rules, priority_rank)
            for message in test_messages
        ]

        matched_count = sum(1 for r in results if r.matched)
        summary = {
            "totalTests": len(test_messages),
            "matchedCount": matched_count,
            # Guard against division by zero for an empty message list.
            "matchRate": matched_count / len(test_messages) if test_messages else 0,
        }

        logger.info(
            "[AC-AISVC-96] Tested rule %s: %d/%d matched",
            rule.name,
            matched_count,
            len(test_messages),
        )

        return IntentRuleTestResult(
            rule_id=str(rule.id),
            rule_name=rule.name,
            results=results,
            summary=summary,
        )

    def _test_single_message(
        self,
        rule: IntentRule,
        message: str,
        all_rules: list[IntentRule],
        priority_rank: int,
    ) -> IntentRuleTestCase:
        """Test a single message against a rule.

        ``match_type`` is "keyword" whenever at least one keyword hit (even if
        a pattern hit too), "regex" when only patterns hit, and None when
        nothing matched; in that last case ``reason`` explains why.
        Conflicts are collected regardless of whether ``rule`` itself matched.
        """
        matched_keywords = self._match_keywords(message, rule)
        matched_patterns = self._match_patterns(message, rule)
        matched = bool(matched_keywords or matched_patterns)

        match_type = None
        reason = None
        if matched:
            # Keyword hits take precedence over regex hits in the label.
            match_type = "keyword" if matched_keywords else "regex"
        else:
            reason = self._determine_unmatch_reason(message, rule)

        conflict_rules = self._detect_conflicts(rule, message, all_rules)

        return IntentRuleTestCase(
            message=message,
            matched=matched,
            matched_keywords=matched_keywords,
            matched_patterns=matched_patterns,
            match_type=match_type,
            priority=rule.priority,
            priority_rank=priority_rank,
            conflict_rules=conflict_rules,
            reason=reason,
        )

    def _match_keywords(self, message: str, rule: IntentRule) -> list[str]:
        """Return the rule keywords contained in the message.

        Matching is a case-insensitive substring test; empty/None keywords
        are skipped. Keywords are returned in their original spelling.
        """
        message_lower = message.lower()
        return [
            keyword
            for keyword in (rule.keywords or [])
            if keyword and keyword.lower() in message_lower
        ]

    def _match_patterns(self, message: str, rule: IntentRule) -> list[str]:
        """Return the rule regex patterns that match the message.

        Patterns are searched case-insensitively with ``re.search``. Empty
        patterns are skipped; invalid patterns are logged and treated as
        non-matching instead of aborting the test run.
        """
        matched = []
        for pattern in rule.patterns or []:
            if not pattern:
                continue
            try:
                if re.search(pattern, message, re.IGNORECASE):
                    matched.append(pattern)
            except re.error as e:
                logger.warning("Invalid regex pattern: %s, error: %s", pattern, e)
        return matched

    def _detect_conflicts(
        self,
        current_rule: IntentRule,
        message: str,
        all_rules: list[IntentRule],
    ) -> list[dict[str, Any]]:
        """Detect other rules that also match the message (priority conflicts).

        The rule under test (compared by stringified id) and disabled rules
        are ignored. Each conflict is a dict with ``ruleId``, ``ruleName``,
        ``priority`` and a human-readable ``reason``.
        """
        current_id = str(current_rule.id)
        conflicts = []
        for other_rule in all_rules:
            if str(other_rule.id) == current_id or not other_rule.is_enabled:
                continue
            if self._match_keywords(message, other_rule) or self._match_patterns(
                message, other_rule
            ):
                conflicts.append({
                    "ruleId": str(other_rule.id),
                    "ruleName": other_rule.name,
                    "priority": other_rule.priority,
                    # Fixed: the closing parenthesis was missing.
                    "reason": f"同时匹配(优先级:{other_rule.priority})",
                })
        return conflicts

    def _calculate_priority_rank(
        self,
        rule: IntentRule,
        all_rules: list[IntentRule],
    ) -> int:
        """Calculate the priority rank of a rule among all enabled rules.

        Rank 1 is the highest priority. Returns 0 when the rule is not among
        the enabled rules (e.g. it is disabled or absent from ``all_rules``).
        """
        target_id = str(rule.id)
        enabled_rules = [r for r in all_rules if r.is_enabled]
        # sorted() is stable, so equal priorities keep their input order.
        sorted_rules = sorted(enabled_rules, key=lambda r: r.priority, reverse=True)
        for rank, candidate in enumerate(sorted_rules, start=1):
            if str(candidate.id) == target_id:
                return rank
        return 0

    def _determine_unmatch_reason(self, message: str, rule: IntentRule) -> str:
        """Determine why a message did not match a rule.

        Reports missing configuration first, then the keyword list (at most
        three keywords, followed by the total count when there are more),
        and otherwise the number of configured regex patterns.
        """
        keywords = rule.keywords or []
        patterns = rule.patterns or []

        if not keywords and not patterns:
            return "规则未配置关键词或正则表达式"

        if keywords:
            # Fixed: keywords were concatenated without a separator, the
            # total count was appended without wording, and the closing
            # parenthesis was missing.
            keyword_str = "、".join(keywords[:3])
            if len(keywords) > 3:
                keyword_str += f"等{len(keywords)}个"
            return f"关键词不匹配(规则关键词:{keyword_str})"

        # Only patterns are configured at this point, so no further fallback
        # is reachable.
        return f"正则表达式不匹配(规则模式:{len(patterns)}个)"