ai-robot-core/ai-service/app/services/intent/rule_service.py

299 lines
9.0 KiB
Python

"""
Intent rule service for AI Service.
[AC-AISVC-65~AC-AISVC-68] Intent rule CRUD and hit statistics management.
"""
import logging
import time
import uuid
from datetime import datetime
from typing import Any, Sequence
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlmodel import col
from app.models.entities import (
IntentRule,
IntentRuleCreate,
IntentRuleUpdate,
)
logger = logging.getLogger(__name__)
RULE_CACHE_TTL_SECONDS = 60
class RuleCache:
"""
[AC-AISVC-69] In-memory cache for intent rules.
Key: tenant_id
Value: (rules_list, cached_at)
TTL: 60 seconds
"""
def __init__(self, ttl_seconds: int = RULE_CACHE_TTL_SECONDS):
self._cache: dict[str, tuple[list[IntentRule], float]] = {}
self._ttl = ttl_seconds
def get(self, tenant_id: str) -> list[IntentRule] | None:
"""Get cached rules if not expired."""
if tenant_id in self._cache:
rules, cached_at = self._cache[tenant_id]
if time.time() - cached_at < self._ttl:
return rules
else:
del self._cache[tenant_id]
return None
def set(self, tenant_id: str, rules: list[IntentRule]) -> None:
"""Cache rules for a tenant."""
self._cache[tenant_id] = (rules, time.time())
def invalidate(self, tenant_id: str) -> None:
"""Invalidate cache for a tenant."""
if tenant_id in self._cache:
del self._cache[tenant_id]
logger.debug(f"Invalidated rule cache for tenant={tenant_id}")
_rule_cache = RuleCache()
class IntentRuleService:
"""
[AC-AISVC-65~AC-AISVC-68] Service for managing intent rules.
Features:
- Rule CRUD with tenant isolation
- Hit count statistics
- In-memory caching with TTL
- Cache invalidation on CRUD operations
"""
def __init__(self, session: AsyncSession):
self._session = session
self._cache = _rule_cache
async def create_rule(
self,
tenant_id: str,
create_data: IntentRuleCreate,
) -> IntentRule:
"""
[AC-AISVC-65] Create a new intent rule.
"""
flow_id_uuid = None
if create_data.flow_id:
try:
flow_id_uuid = uuid.UUID(create_data.flow_id)
except ValueError:
pass
rule = IntentRule(
tenant_id=tenant_id,
name=create_data.name,
keywords=create_data.keywords or [],
patterns=create_data.patterns or [],
priority=create_data.priority,
response_type=create_data.response_type,
target_kb_ids=create_data.target_kb_ids or [],
flow_id=flow_id_uuid,
fixed_reply=create_data.fixed_reply,
transfer_message=create_data.transfer_message,
is_enabled=True,
hit_count=0,
)
self._session.add(rule)
await self._session.flush()
self._cache.invalidate(tenant_id)
logger.info(
f"[AC-AISVC-65] Created intent rule: tenant={tenant_id}, "
f"id={rule.id}, name={rule.name}, response_type={rule.response_type}"
)
return rule
async def list_rules(
self,
tenant_id: str,
response_type: str | None = None,
is_enabled: bool | None = None,
) -> Sequence[IntentRule]:
"""
[AC-AISVC-66] List rules for a tenant with optional filters.
"""
stmt = select(IntentRule).where(
IntentRule.tenant_id == tenant_id
)
if response_type is not None:
stmt = stmt.where(IntentRule.response_type == response_type)
if is_enabled is not None:
stmt = stmt.where(IntentRule.is_enabled == is_enabled)
stmt = stmt.order_by(col(IntentRule.priority).desc(), col(IntentRule.created_at).desc())
result = await self._session.execute(stmt)
return result.scalars().all()
async def get_rule(
self,
tenant_id: str,
rule_id: uuid.UUID,
) -> IntentRule | None:
"""
[AC-AISVC-66] Get rule by ID with tenant isolation.
"""
stmt = select(IntentRule).where(
IntentRule.tenant_id == tenant_id,
IntentRule.id == rule_id,
)
result = await self._session.execute(stmt)
return result.scalar_one_or_none()
async def update_rule(
self,
tenant_id: str,
rule_id: uuid.UUID,
update_data: IntentRuleUpdate,
) -> IntentRule | None:
"""
[AC-AISVC-67] Update an intent rule.
"""
rule = await self.get_rule(tenant_id, rule_id)
if not rule:
return None
if update_data.name is not None:
rule.name = update_data.name
if update_data.keywords is not None:
rule.keywords = update_data.keywords
if update_data.patterns is not None:
rule.patterns = update_data.patterns
if update_data.priority is not None:
rule.priority = update_data.priority
if update_data.response_type is not None:
rule.response_type = update_data.response_type
if update_data.target_kb_ids is not None:
rule.target_kb_ids = update_data.target_kb_ids
if update_data.flow_id is not None:
try:
rule.flow_id = uuid.UUID(update_data.flow_id)
except ValueError:
rule.flow_id = None
if update_data.fixed_reply is not None:
rule.fixed_reply = update_data.fixed_reply
if update_data.transfer_message is not None:
rule.transfer_message = update_data.transfer_message
if update_data.is_enabled is not None:
rule.is_enabled = update_data.is_enabled
rule.updated_at = datetime.utcnow()
await self._session.flush()
self._cache.invalidate(tenant_id)
logger.info(
f"[AC-AISVC-67] Updated intent rule: tenant={tenant_id}, id={rule_id}"
)
return rule
async def delete_rule(
self,
tenant_id: str,
rule_id: uuid.UUID,
) -> bool:
"""
[AC-AISVC-68] Delete an intent rule.
"""
rule = await self.get_rule(tenant_id, rule_id)
if not rule:
return False
await self._session.delete(rule)
await self._session.flush()
self._cache.invalidate(tenant_id)
logger.info(
f"[AC-AISVC-68] Deleted intent rule: tenant={tenant_id}, id={rule_id}"
)
return True
async def increment_hit_count(
self,
tenant_id: str,
rule_id: uuid.UUID,
) -> bool:
"""
[AC-AISVC-66] Increment hit count for a rule.
"""
rule = await self.get_rule(tenant_id, rule_id)
if not rule:
return False
rule.hit_count += 1
rule.updated_at = datetime.utcnow()
await self._session.flush()
logger.debug(
f"[AC-AISVC-66] Incremented hit count for rule: tenant={tenant_id}, "
f"id={rule_id}, hit_count={rule.hit_count}"
)
return True
async def get_enabled_rules_for_matching(
self,
tenant_id: str,
) -> list[IntentRule]:
"""
[AC-AISVC-69] Get enabled rules for matching, ordered by priority DESC.
Uses cache for performance.
"""
cached = self._cache.get(tenant_id)
if cached is not None:
logger.debug(f"[AC-AISVC-69] Cache hit for rules: tenant={tenant_id}")
return cached
stmt = (
select(IntentRule)
.where(
IntentRule.tenant_id == tenant_id,
IntentRule.is_enabled == True,
)
.order_by(col(IntentRule.priority).desc())
)
result = await self._session.execute(stmt)
rules = list(result.scalars().all())
self._cache.set(tenant_id, rules)
logger.info(
f"[AC-AISVC-69] Loaded {len(rules)} enabled rules from DB: tenant={tenant_id}"
)
return rules
def invalidate_cache(self, tenant_id: str) -> None:
"""Manually invalidate cache for a tenant."""
self._cache.invalidate(tenant_id)
async def rule_to_info_dict(self, rule: IntentRule) -> dict[str, Any]:
"""Convert rule entity to API response dict."""
return {
"id": str(rule.id),
"name": rule.name,
"keywords": rule.keywords or [],
"patterns": rule.patterns or [],
"priority": rule.priority,
"response_type": rule.response_type,
"target_kb_ids": rule.target_kb_ids or [],
"flow_id": str(rule.flow_id) if rule.flow_id else None,
"fixed_reply": rule.fixed_reply,
"transfer_message": rule.transfer_message,
"is_enabled": rule.is_enabled,
"hit_count": rule.hit_count,
"created_at": rule.created_at.isoformat(),
"updated_at": rule.updated_at.isoformat(),
}