From e9fee2f80e6fd99182ebc05073c6d0cbba7ed781 Mon Sep 17 00:00:00 2001 From: MerCry Date: Wed, 25 Feb 2026 01:12:07 +0800 Subject: [PATCH] feat(AISVC): Excel/CSV parser converts to JSON format for better RAG retrieval [AC-AISVC-35] - ExcelParser: convert spreadsheet data to JSON records with header as keys - CSVParser: convert CSV data to JSON records preserving structure - Add _sheet field to identify worksheet source in Excel output - Preserve numeric types (int/float/bool) in JSON output - Support UTF-8 and GBK encoding fallback for CSV files --- .../app/services/document/excel_parser.py | 204 ++++++++++-------- 1 file changed, 119 insertions(+), 85 deletions(-) diff --git a/ai-service/app/services/document/excel_parser.py b/ai-service/app/services/document/excel_parser.py index 7b445cc..449e7c2 100644 --- a/ai-service/app/services/document/excel_parser.py +++ b/ai-service/app/services/document/excel_parser.py @@ -2,9 +2,11 @@ Excel document parser implementation. [AC-AISVC-35] Excel (.xlsx) parsing using openpyxl. -Extracts text content from Excel spreadsheets. +Extracts text content from Excel spreadsheets and converts to JSON format +to preserve structural relationships for better RAG retrieval. """ +import json import logging from pathlib import Path from typing import Any @@ -22,6 +24,7 @@ class ExcelParser(DocumentParser): """ Parser for Excel documents. [AC-AISVC-35] Uses openpyxl for text extraction. + Converts spreadsheet data to JSON format to preserve structure. """ def __init__( @@ -48,10 +51,48 @@ class ExcelParser(DocumentParser): ) return self._openpyxl + def _sheet_to_records(self, sheet, sheet_name: str) -> list[dict[str, Any]]: + """ + Convert a worksheet to a list of record dictionaries. + First row is treated as header (column names). + """ + records = [] + rows = list(sheet.iter_rows(max_row=self._max_rows_per_sheet, values_only=True)) + + if not rows: + return records + + headers = rows[0] + header_list = [str(h) if h is not None else f"column_{i}" for i, h in enumerate(headers)] + + for row in rows[1:]: + record = {"_sheet": sheet_name} + has_content = False + + for i, value in enumerate(row): + if i < len(header_list): + key = header_list[i] + else: + key = f"column_{i}" + + if value is not None: + has_content = True + if isinstance(value, (int, float, bool)): + record[key] = value + else: + record[key] = str(value) + elif self._include_empty_cells: + record[key] = None + + if has_content or self._include_empty_cells: + records.append(record) + + return records + def parse(self, file_path: str | Path) -> ParseResult: """ - Parse an Excel document and extract text content. - [AC-AISVC-35] Converts spreadsheet data to structured text. + Parse an Excel document and extract text content as JSON. + [AC-AISVC-35] Converts spreadsheet data to JSON format. """ path = Path(file_path) @@ -74,53 +115,33 @@ class ExcelParser(DocumentParser): try: workbook = openpyxl.load_workbook(path, read_only=True, data_only=True) - text_parts = [] + all_records: list[dict[str, Any]] = [] sheet_count = len(workbook.sheetnames) total_rows = 0 for sheet_name in workbook.sheetnames: sheet = workbook[sheet_name] - sheet_text_parts = [] - row_count = 0 - - for row in sheet.iter_rows(max_row=self._max_rows_per_sheet): - row_values = [] - has_content = False - - for cell in row: - value = cell.value - if value is not None: - has_content = True - row_values.append(str(value)) - elif self._include_empty_cells: - row_values.append("") - else: - row_values.append("") - - if has_content or self._include_empty_cells: - sheet_text_parts.append(" | ".join(row_values)) - row_count += 1 - - if sheet_text_parts: - text_parts.append(f"[Sheet: {sheet_name}]\n" + "\n".join(sheet_text_parts)) - total_rows += row_count + records = self._sheet_to_records(sheet, sheet_name) + all_records.extend(records) + total_rows += len(records) workbook.close() - full_text = "\n\n".join(text_parts) + json_str = json.dumps(all_records, ensure_ascii=False, indent=2) file_size = path.stat().st_size logger.info( - f"Parsed Excel: {path.name}, sheets={sheet_count}, " - f"rows={total_rows}, chars={len(full_text)}, size={file_size}" + f"Parsed Excel (JSON): {path.name}, sheets={sheet_count}, " + f"rows={total_rows}, chars={len(json_str)}, size={file_size}" ) return ParseResult( - text=full_text, + text=json_str, source_path=str(path), file_size=file_size, metadata={ "format": "xlsx", + "output_format": "json", "sheet_count": sheet_count, "total_rows": total_rows, } @@ -145,6 +166,7 @@ class CSVParser(DocumentParser): """ Parser for CSV files. [AC-AISVC-35] Uses Python's built-in csv module. + Converts CSV data to JSON format to preserve structure. """ def __init__(self, delimiter: str = ",", encoding: str = "utf-8", **kwargs: Any): @@ -152,13 +174,46 @@ class CSVParser(DocumentParser): self._encoding = encoding self._extra_config = kwargs - def parse(self, file_path: str | Path) -> ParseResult: - """ - Parse a CSV file and extract text content. - [AC-AISVC-35] Converts CSV data to structured text. - """ + def _parse_csv_to_records(self, path: Path, encoding: str) -> list[dict[str, Any]]: + """Parse CSV file and return list of record dictionaries.""" import csv + records = [] + + with open(path, "r", encoding=encoding, newline="") as f: + reader = csv.reader(f, delimiter=self._delimiter) + rows = list(reader) + + if not rows: + return records + + headers = rows[0] + header_list = [str(h) if h else f"column_{i}" for i, h in enumerate(headers)] + + for row in rows[1:]: + record = {} + has_content = False + + for i, value in enumerate(row): + if i < len(header_list): + key = header_list[i] + else: + key = f"column_{i}" + + if value: + has_content = True + record[key] = value + + if has_content: + records.append(record) + + return records + + def parse(self, file_path: str | Path) -> ParseResult: + """ + Parse a CSV file and extract text content as JSON. + [AC-AISVC-35] Converts CSV data to JSON format. + """ path = Path(file_path) if not path.exists(): @@ -169,56 +224,14 @@ class CSVParser(DocumentParser): ) try: - text_parts = [] - row_count = 0 - - with open(path, "r", encoding=self._encoding, newline="") as f: - reader = csv.reader(f, delimiter=self._delimiter) - for row in reader: - text_parts.append(" | ".join(row)) - row_count += 1 - - full_text = "\n".join(text_parts) - file_size = path.stat().st_size - - logger.info( - f"Parsed CSV: {path.name}, rows={row_count}, " - f"chars={len(full_text)}, size={file_size}" - ) - - return ParseResult( - text=full_text, - source_path=str(path), - file_size=file_size, - metadata={ - "format": "csv", - "row_count": row_count, - "delimiter": self._delimiter, - } - ) - + records = self._parse_csv_to_records(path, self._encoding) + row_count = len(records) + used_encoding = self._encoding except UnicodeDecodeError: try: - with open(path, "r", encoding="gbk", newline="") as f: - reader = csv.reader(f, delimiter=self._delimiter) - for row in reader: - text_parts.append(" | ".join(row)) - row_count += 1 - - full_text = "\n".join(text_parts) - file_size = path.stat().st_size - - return ParseResult( - text=full_text, - source_path=str(path), - file_size=file_size, - metadata={ - "format": "csv", - "row_count": row_count, - "delimiter": self._delimiter, - "encoding": "gbk", - } - ) + records = self._parse_csv_to_records(path, "gbk") + row_count = len(records) + used_encoding = "gbk" except Exception as e: raise DocumentParseException( f"Failed to parse CSV with encoding fallback: {e}", @@ -233,6 +246,27 @@ class CSVParser(DocumentParser): parser="csv", details={"error": str(e)} ) + + json_str = json.dumps(records, ensure_ascii=False, indent=2) + file_size = path.stat().st_size + + logger.info( + f"Parsed CSV (JSON): {path.name}, rows={row_count}, " + f"chars={len(json_str)}, size={file_size}" + ) + + return ParseResult( + text=json_str, + source_path=str(path), + file_size=file_size, + metadata={ + "format": "csv", + "output_format": "json", + "row_count": row_count, + "delimiter": self._delimiter, + "encoding": used_encoding, + } + ) def get_supported_extensions(self) -> list[str]: """Get supported file extensions."""