From d3ae92dec51d94b46a55cab1e3c8c5c923d71023 Mon Sep 17 00:00:00 2001 From: MerCry Date: Mon, 2 Mar 2026 22:15:19 +0800 Subject: [PATCH] feat: add metadata validation in KB upload and unify metadata storage [AC-IDSMETA-15, AC-IDSMETA-16] --- ai-service/app/api/admin/kb.py | 63 +++++++++++++++++-- ai-service/app/services/flow/flow_service.py | 11 +++- .../app/services/intent/rule_service.py | 10 ++- .../app/services/prompt/template_service.py | 26 ++++++-- 4 files changed, 94 insertions(+), 16 deletions(-) diff --git a/ai-service/app/api/admin/kb.py b/ai-service/app/api/admin/kb.py index b36edd0..0b1de55 100644 --- a/ai-service/app/api/admin/kb.py +++ b/ai-service/app/api/admin/kb.py @@ -7,7 +7,7 @@ Knowledge Base management endpoints. import logging import uuid from dataclasses import dataclass -from typing import Annotated, Optional +from typing import Annotated, Any, Optional import tiktoken from fastapi import APIRouter, BackgroundTasks, Depends, File, Form, Query, UploadFile @@ -479,7 +479,7 @@ async def list_documents( "/documents", operation_id="uploadDocument", summary="Upload/import document", - description="[AC-ASA-01, AC-AISVC-63] Upload document to specified knowledge base and trigger indexing job.", + description="[AC-ASA-01, AC-AISVC-63, AC-IDSMETA-15] Upload document to specified knowledge base and trigger indexing job.", responses={ 202: {"description": "Accepted - async indexing job started"}, 400: {"description": "Bad Request - unsupported format or invalid kb_id"}, @@ -493,17 +493,28 @@ async def upload_document( background_tasks: BackgroundTasks, file: UploadFile = File(...), kb_id: str = Form(...), + metadata: str = Form(default="{}", description="元数据 JSON 字符串,根据元数据模式配置动态字段"), ) -> JSONResponse: """ - [AC-ASA-01, AC-AISVC-63] Upload document to specified knowledge base. + [AC-ASA-01, AC-AISVC-63, AC-IDSMETA-15] Upload document to specified knowledge base. Creates KB if not exists, indexes to corresponding Qdrant Collection. + + [AC-IDSMETA-15] 支持动态元数据校验: + - metadata: JSON 格式的元数据,字段根据元数据模式配置 + - 根据 scope=kb_document 的字段定义进行 required/type/enum 校验 + + 示例 metadata: + - 教育行业: {"grade": "初一", "subject": "语文", "type": "痛点"} + - 医疗行业: {"department": "内科", "disease_type": "慢性病", "content_type": "科普"} """ + import json from pathlib import Path from app.services.document import get_supported_document_formats + from app.services.metadata_field_definition_service import MetadataFieldDefinitionService logger.info( - f"[AC-AISVC-63] Uploading document: tenant={tenant_id}, " + f"[AC-IDSMETA-15] Uploading document: tenant={tenant_id}, " f"kb_id={kb_id}, filename={file.filename}" ) @@ -522,6 +533,36 @@ async def upload_document( }, ) + try: + metadata_dict = json.loads(metadata) if metadata else {} + except json.JSONDecodeError: + return JSONResponse( + status_code=400, + content={ + "code": "INVALID_METADATA", + "message": "Invalid JSON format for metadata", + }, + ) + + field_def_service = MetadataFieldDefinitionService(session) + + is_valid, validation_errors = await field_def_service.validate_metadata_for_create( + tenant_id, metadata_dict, "kb_document" + ) + + if not is_valid: + logger.warning(f"[AC-IDSMETA-15] Metadata validation failed: {validation_errors}") + return JSONResponse( + status_code=400, + content={ + "code": "METADATA_VALIDATION_ERROR", + "message": "Metadata validation failed", + "details": { + "errors": validation_errors, + }, + }, + ) + kb_service = KnowledgeBaseService(session) try: @@ -529,7 +570,7 @@ async def upload_document( if not kb: kb = await kb_service.get_or_create_default_kb(tenant_id) kb_id = str(kb.id) - logger.info(f"[AC-AISVC-63] KB not found, using default: {kb_id}") + logger.info(f"[AC-IDSMETA-15] KB not found, using default: {kb_id}") else: kb_id = str(kb.id) except Exception: @@ -550,7 +591,7 @@ async def upload_document( await session.commit() background_tasks.add_task( - _index_document, tenant_id, kb_id, str(job.id), str(document.id), file_content, file.filename + _index_document, tenant_id, kb_id, str(job.id), str(document.id), file_content, file.filename, metadata_dict ) return JSONResponse( @@ -560,6 +601,7 @@ async def upload_document( "docId": str(document.id), "kbId": kb_id, "status": job.status, + "metadata": metadata_dict, }, ) @@ -571,11 +613,15 @@ async def _index_document( doc_id: str, content: bytes, filename: str | None = None, + metadata: dict[str, Any] | None = None, ): """ Background indexing task. [AC-AISVC-33, AC-AISVC-34, AC-AISVC-35, AC-AISVC-63] Uses document parsing and pluggable embedding. Indexes to the specified knowledge base's Qdrant Collection. + + Args: + metadata: 动态元数据,字段根据元数据模式配置 """ import asyncio import tempfile @@ -704,6 +750,10 @@ async def _index_document( points = [] total_chunks = len(all_chunks) + + doc_metadata = metadata or {} + logger.info(f"[INDEX] Document metadata: {doc_metadata}") + for i, chunk in enumerate(all_chunks): payload = { "text": chunk.text, @@ -712,6 +762,7 @@ async def _index_document( "chunk_index": i, "start_token": chunk.start_token, "end_token": chunk.end_token, + "metadata": doc_metadata, } if chunk.page is not None: payload["page"] = chunk.page diff --git a/ai-service/app/services/flow/flow_service.py b/ai-service/app/services/flow/flow_service.py index 74fffe0..09582cc 100644 --- a/ai-service/app/services/flow/flow_service.py +++ b/ai-service/app/services/flow/flow_service.py @@ -42,6 +42,7 @@ class ScriptFlowService: ) -> ScriptFlow: """ [AC-AISVC-71] Create a new script flow with steps. + [AC-IDSMETA-16] Support metadata field. """ self._validate_steps(create_data.steps) @@ -51,12 +52,13 @@ class ScriptFlowService: description=create_data.description, steps=create_data.steps, is_enabled=create_data.is_enabled, + metadata_=create_data.metadata_, ) self._session.add(flow) await self._session.flush() logger.info( - f"[AC-AISVC-71] Created script flow: tenant={tenant_id}, " + f"[AC-AISVC-71][AC-IDSMETA-16] Created script flow: tenant={tenant_id}, " f"id={flow.id}, name={flow.name}, steps={len(flow.steps)}" ) return flow @@ -102,6 +104,7 @@ class ScriptFlowService: ) -> dict[str, Any] | None: """ [AC-AISVC-73] Get flow detail with complete step definitions. + [AC-IDSMETA-16] Include metadata field. """ flow = await self.get_flow(tenant_id, flow_id) if not flow: @@ -117,6 +120,7 @@ class ScriptFlowService: "is_enabled": flow.is_enabled, "step_count": len(flow.steps), "linked_rule_count": linked_rule_count, + "metadata": flow.metadata_, "created_at": flow.created_at.isoformat(), "updated_at": flow.updated_at.isoformat(), } @@ -129,6 +133,7 @@ class ScriptFlowService: ) -> ScriptFlow | None: """ [AC-AISVC-73] Update flow definition. + [AC-IDSMETA-16] Support metadata field. """ flow = await self.get_flow(tenant_id, flow_id) if not flow: @@ -143,12 +148,14 @@ class ScriptFlowService: flow.steps = update_data.steps if update_data.is_enabled is not None: flow.is_enabled = update_data.is_enabled + if update_data.metadata_ is not None: + flow.metadata_ = update_data.metadata_ flow.updated_at = datetime.utcnow() await self._session.flush() logger.info( - f"[AC-AISVC-73] Updated script flow: tenant={tenant_id}, id={flow_id}" + f"[AC-AISVC-73][AC-IDSMETA-16] Updated script flow: tenant={tenant_id}, id={flow_id}" ) return flow diff --git a/ai-service/app/services/intent/rule_service.py b/ai-service/app/services/intent/rule_service.py index 2b3bbf4..0df7c41 100644 --- a/ai-service/app/services/intent/rule_service.py +++ b/ai-service/app/services/intent/rule_service.py @@ -83,6 +83,7 @@ class IntentRuleService: ) -> IntentRule: """ [AC-AISVC-65] Create a new intent rule. + [AC-IDSMETA-16] Support metadata field. """ flow_id_uuid = None if create_data.flow_id: @@ -104,6 +105,7 @@ class IntentRuleService: transfer_message=create_data.transfer_message, is_enabled=True, hit_count=0, + metadata_=create_data.metadata_, ) self._session.add(rule) await self._session.flush() @@ -111,7 +113,7 @@ class IntentRuleService: self._cache.invalidate(tenant_id) logger.info( - f"[AC-AISVC-65] Created intent rule: tenant={tenant_id}, " + f"[AC-AISVC-65][AC-IDSMETA-16] Created intent rule: tenant={tenant_id}, " f"id={rule.id}, name={rule.name}, response_type={rule.response_type}" ) return rule @@ -162,6 +164,7 @@ class IntentRuleService: ) -> IntentRule | None: """ [AC-AISVC-67] Update an intent rule. + [AC-IDSMETA-16] Support metadata field. """ rule = await self.get_rule(tenant_id, rule_id) if not rule: @@ -190,6 +193,8 @@ class IntentRuleService: rule.transfer_message = update_data.transfer_message if update_data.is_enabled is not None: rule.is_enabled = update_data.is_enabled + if update_data.metadata_ is not None: + rule.metadata_ = update_data.metadata_ rule.updated_at = datetime.utcnow() await self._session.flush() @@ -197,7 +202,7 @@ class IntentRuleService: self._cache.invalidate(tenant_id) logger.info( - f"[AC-AISVC-67] Updated intent rule: tenant={tenant_id}, id={rule_id}" + f"[AC-AISVC-67][AC-IDSMETA-16] Updated intent rule: tenant={tenant_id}, id={rule_id}" ) return rule @@ -294,6 +299,7 @@ class IntentRuleService: "transfer_message": rule.transfer_message, "is_enabled": rule.is_enabled, "hit_count": rule.hit_count, + "metadata": rule.metadata_, "created_at": rule.created_at.isoformat(), "updated_at": rule.updated_at.isoformat(), } diff --git a/ai-service/app/services/prompt/template_service.py b/ai-service/app/services/prompt/template_service.py index e9b3eda..f927e50 100644 --- a/ai-service/app/services/prompt/template_service.py +++ b/ai-service/app/services/prompt/template_service.py @@ -95,6 +95,7 @@ class PromptTemplateService: ) -> PromptTemplate: """ [AC-AISVC-52] Create a new prompt template with initial version. + [AC-IDSMETA-16] Support metadata field. """ template = PromptTemplate( tenant_id=tenant_id, @@ -102,6 +103,7 @@ class PromptTemplateService: scene=create_data.scene, description=create_data.description, is_default=create_data.is_default, + metadata_=create_data.metadata_, ) self._session.add(template) await self._session.flush() @@ -117,7 +119,7 @@ class PromptTemplateService: await self._session.flush() logger.info( - f"[AC-AISVC-52] Created prompt template: tenant={tenant_id}, " + f"[AC-AISVC-52][AC-IDSMETA-16] Created prompt template: tenant={tenant_id}, " f"id={template.id}, name={template.name}" ) return template @@ -182,6 +184,7 @@ class PromptTemplateService: "scene": template.scene, "description": template.description, "is_default": template.is_default, + "metadata": template.metadata_, "current_version": { "version": current_version.version, "status": current_version.status, @@ -208,6 +211,7 @@ class PromptTemplateService: ) -> PromptTemplate | None: """ [AC-AISVC-53] Update template and create a new version. + [AC-IDSMETA-16] Support metadata field. """ template = await self.get_template(tenant_id, template_id) if not template: @@ -221,6 +225,8 @@ class PromptTemplateService: template.description = update_data.description if update_data.is_default is not None: template.is_default = update_data.is_default + if update_data.metadata_ is not None: + template.metadata_ = update_data.metadata_ template.updated_at = datetime.utcnow() if update_data.system_instruction is not None: @@ -241,7 +247,7 @@ class PromptTemplateService: self._cache.invalidate(tenant_id, template.scene) logger.info( - f"[AC-AISVC-53] Updated prompt template: tenant={tenant_id}, id={template_id}" + f"[AC-AISVC-53][AC-IDSMETA-16] Updated prompt template: tenant={tenant_id}, id={template_id}" ) return template @@ -400,11 +406,19 @@ class PromptTemplateService: if not template: return False - versions = await self._get_versions(template_id) - for v in versions: - await self._session.delete(v) + from sqlalchemy import delete - await self._session.delete(template) + await self._session.execute( + delete(PromptTemplateVersion).where( + PromptTemplateVersion.template_id == template_id + ) + ) + + await self._session.execute( + delete(PromptTemplate).where( + PromptTemplate.id == template_id + ) + ) await self._session.flush() self._cache.invalidate(tenant_id, template.scene)