"""
Excel document parser implementation. [AC-AISVC-35]

Excel (.xlsx) parsing using openpyxl. Extracts text content from Excel
spreadsheets and converts to JSON format to preserve structural
relationships for better RAG retrieval.
"""

import csv
import json
import logging
from pathlib import Path
from typing import Any

from app.services.document.base import (
    DocumentParseException,
    DocumentParser,
    ParseResult,
)

logger = logging.getLogger(__name__)


class ExcelParser(DocumentParser):
    """
    Parser for Excel documents. [AC-AISVC-35]

    Uses openpyxl for text extraction. Converts spreadsheet data to JSON
    format to preserve structure.
    """

    def __init__(
        self,
        include_empty_cells: bool = False,
        max_rows_per_sheet: int = 10000,
        **kwargs: Any,
    ):
        """
        Args:
            include_empty_cells: When True, emit ``None`` for empty cells and
                keep all-empty rows in the output records.
            max_rows_per_sheet: Hard cap on rows read per worksheet
                (including the header row) to bound memory on huge files.
            **kwargs: Extra configuration; stored but not currently used.
        """
        self._include_empty_cells = include_empty_cells
        self._max_rows_per_sheet = max_rows_per_sheet
        self._extra_config = kwargs
        self._openpyxl = None  # lazily-imported module handle

    def _get_openpyxl(self):
        """Import openpyxl on first use so the dependency stays optional."""
        if self._openpyxl is None:
            try:
                import openpyxl
                self._openpyxl = openpyxl
            except ImportError as e:
                # Chain the ImportError so the root cause is preserved.
                raise DocumentParseException(
                    "openpyxl not installed. Install with: pip install openpyxl",
                    parser="excel"
                ) from e
        return self._openpyxl

    def _sheet_to_records(self, sheet, sheet_name: str) -> list[dict[str, Any]]:
        """
        Convert a worksheet to a list of record dictionaries.

        The first row is treated as the header (column names); blank header
        cells fall back to positional ``column_{i}`` names. Each subsequent
        row becomes one dict tagged with ``"_sheet"`` so provenance survives
        after sheets are merged.

        NOTE(review): duplicate header names clobber each other within a
        record (last column wins) — confirm this is acceptable upstream.
        """
        records: list[dict[str, Any]] = []
        rows = list(sheet.iter_rows(max_row=self._max_rows_per_sheet, values_only=True))
        if not rows:
            return records

        headers = rows[0]
        header_list = [
            str(h) if h is not None else f"column_{i}" for i, h in enumerate(headers)
        ]

        for row in rows[1:]:
            record: dict[str, Any] = {"_sheet": sheet_name}
            has_content = False
            for i, value in enumerate(row):
                # Rows can be wider than the header; synthesize extra keys.
                key = header_list[i] if i < len(header_list) else f"column_{i}"
                if value is not None:
                    has_content = True
                    # Keep JSON-native scalars as-is; stringify everything
                    # else (dates, etc.) so json.dumps never fails.
                    if isinstance(value, (int, float, bool)):
                        record[key] = value
                    else:
                        record[key] = str(value)
                elif self._include_empty_cells:
                    record[key] = None
            if has_content or self._include_empty_cells:
                records.append(record)
        return records

    def parse(self, file_path: str | Path) -> ParseResult:
        """
        Parse an Excel document and extract text content as JSON.

        [AC-AISVC-35] Converts spreadsheet data to JSON format.

        Args:
            file_path: Path to the .xlsx file.

        Returns:
            ParseResult whose ``text`` is a JSON array of row records.

        Raises:
            DocumentParseException: If the file is missing, has an
                unsupported extension, openpyxl is unavailable, or the
                workbook cannot be read.
        """
        path = Path(file_path)
        if not path.exists():
            raise DocumentParseException(
                f"File not found: {path}",
                file_path=str(path),
                parser="excel"
            )
        if not self.supports_extension(path.suffix):
            raise DocumentParseException(
                f"Unsupported file extension: {path.suffix}",
                file_path=str(path),
                parser="excel"
            )

        openpyxl = self._get_openpyxl()
        try:
            workbook = openpyxl.load_workbook(path, read_only=True, data_only=True)
            try:
                all_records: list[dict[str, Any]] = []
                sheet_count = len(workbook.sheetnames)
                total_rows = 0
                for sheet_name in workbook.sheetnames:
                    sheet = workbook[sheet_name]
                    records = self._sheet_to_records(sheet, sheet_name)
                    all_records.extend(records)
                    total_rows += len(records)
            finally:
                # read_only workbooks keep the file handle open; close it
                # even if record extraction raises.
                workbook.close()

            json_str = json.dumps(all_records, ensure_ascii=False, indent=2)
            file_size = path.stat().st_size
            logger.info(
                f"Parsed Excel (JSON): {path.name}, sheets={sheet_count}, "
                f"rows={total_rows}, chars={len(json_str)}, size={file_size}"
            )
            return ParseResult(
                text=json_str,
                source_path=str(path),
                file_size=file_size,
                metadata={
                    "format": "xlsx",
                    "output_format": "json",
                    "sheet_count": sheet_count,
                    "total_rows": total_rows,
                }
            )
        except DocumentParseException:
            raise
        except Exception as e:
            raise DocumentParseException(
                f"Failed to parse Excel document: {e}",
                file_path=str(path),
                parser="excel",
                details={"error": str(e)}
            ) from e

    def get_supported_extensions(self) -> list[str]:
        """Get supported file extensions.

        Only .xlsx is listed: openpyxl cannot read the legacy binary .xls
        format, so advertising it (as an earlier revision did) let parse()
        accept files that always failed at load time with an opaque error.
        """
        return [".xlsx"]


class CSVParser(DocumentParser):
    """
    Parser for CSV files. [AC-AISVC-35]

    Uses Python's built-in csv module. Converts CSV data to JSON format
    to preserve structure.
    """

    def __init__(self, delimiter: str = ",", encoding: str = "utf-8", **kwargs: Any):
        """
        Args:
            delimiter: Field separator passed to ``csv.reader``.
            encoding: Preferred text encoding; ``parse`` falls back to GBK
                on UnicodeDecodeError.
            **kwargs: Extra configuration; stored but not currently used.
        """
        self._delimiter = delimiter
        self._encoding = encoding
        self._extra_config = kwargs

    def _parse_csv_to_records(self, path: Path, encoding: str) -> list[dict[str, Any]]:
        """Parse a CSV file and return a list of record dictionaries.

        The first row is the header; blank header cells get positional
        ``column_{i}`` names. Empty-string cells are dropped, and rows with
        no non-empty cells are skipped entirely.
        """
        records: list[dict[str, Any]] = []
        with open(path, encoding=encoding, newline="") as f:
            reader = csv.reader(f, delimiter=self._delimiter)
            rows = list(reader)

        if not rows:
            return records

        header_list = [str(h) if h else f"column_{i}" for i, h in enumerate(rows[0])]
        for row in rows[1:]:
            record: dict[str, Any] = {}
            has_content = False
            for i, value in enumerate(row):
                key = header_list[i] if i < len(header_list) else f"column_{i}"
                if value:  # skip empty-string cells
                    has_content = True
                    record[key] = value
            if has_content:
                records.append(record)
        return records

    def parse(self, file_path: str | Path) -> ParseResult:
        """
        Parse a CSV file and extract text content as JSON.

        [AC-AISVC-35] Converts CSV data to JSON format. Tries the configured
        encoding first, then falls back to GBK on decode errors.

        Args:
            file_path: Path to the .csv file.

        Returns:
            ParseResult whose ``text`` is a JSON array of row records.

        Raises:
            DocumentParseException: If the file is missing or cannot be
                parsed with either encoding.
        """
        path = Path(file_path)
        if not path.exists():
            raise DocumentParseException(
                f"File not found: {path}",
                file_path=str(path),
                parser="csv"
            )

        try:
            records = self._parse_csv_to_records(path, self._encoding)
            used_encoding = self._encoding
        except UnicodeDecodeError:
            # Common for files exported from Chinese-locale Excel.
            try:
                records = self._parse_csv_to_records(path, "gbk")
                used_encoding = "gbk"
            except Exception as e:
                raise DocumentParseException(
                    f"Failed to parse CSV with encoding fallback: {e}",
                    file_path=str(path),
                    parser="csv",
                    details={"error": str(e)}
                ) from e
        except Exception as e:
            raise DocumentParseException(
                f"Failed to parse CSV: {e}",
                file_path=str(path),
                parser="csv",
                details={"error": str(e)}
            ) from e

        row_count = len(records)
        json_str = json.dumps(records, ensure_ascii=False, indent=2)
        file_size = path.stat().st_size
        logger.info(
            f"Parsed CSV (JSON): {path.name}, rows={row_count}, "
            f"chars={len(json_str)}, size={file_size}"
        )
        return ParseResult(
            text=json_str,
            source_path=str(path),
            file_size=file_size,
            metadata={
                "format": "csv",
                "output_format": "json",
                "row_count": row_count,
                "delimiter": self._delimiter,
                "encoding": used_encoding,
            }
        )

    def get_supported_extensions(self) -> list[str]:
        """Get supported file extensions."""
        return [".csv"]