""" 测试 Redis 缓存是否正常工作 """ import asyncio import sys from pathlib import Path sys.path.insert(0, str(Path(__file__).parent.parent)) from app.services.metadata_cache_service import get_metadata_cache_service async def test_cache(): """测试缓存服务""" print("=" * 80) print("测试 Redis 缓存") print("=" * 80) cache_service = await get_metadata_cache_service() tenant_id = "szmp@ash@2026" # 1. 检查缓存是否存在 print("\n📊 1. 检查缓存是否存在") cached = await cache_service.get_fields(tenant_id) if cached: print(f" ✅ 缓存存在,包含 {len(cached)} 个字段") for field in cached[:3]: print(f" - {field['field_key']}: {field['label']}") else: print(" ❌ 缓存不存在") # 2. 手动设置缓存 print("\n📊 2. 手动设置测试缓存") test_fields = [ { "field_key": "grade", "label": "年级", "field_type": "enum", "required": False, "options": ["三年级", "四年级", "五年级"], "default_value": None, "is_filterable": True, }, { "field_key": "subject", "label": "学科", "field_type": "enum", "required": False, "options": ["语文", "数学", "英语"], "default_value": None, "is_filterable": True, }, ] result = await cache_service.set_fields(tenant_id, test_fields, ttl=3600) print(f" 设置缓存结果: {result}") # 3. 再次获取缓存 print("\n📊 3. 再次获取缓存") cached = await cache_service.get_fields(tenant_id) if cached: print(f" ✅ 缓存存在,包含 {len(cached)} 个字段") else: print(" ❌ 缓存不存在") print("\n" + "=" * 80) print("测试完成") print("=" * 80) if __name__ == "__main__": asyncio.run(test_cache())