"""Scoped file-read and search tools for the project folder feature.""" from __future__ import annotations from langchain_core.tools import tool from app.core.folder_indexer import _extract_docx_text, _extract_pdf_text from app.core.ws_context import execute_on_client # Cap returned slice size to keep tool output under control. _MAX_RETURN_CHARS = 50_000 _MAX_SEARCH_MATCHES = 20 def _is_unsafe_path(rel: str) -> bool: if not rel: return True norm = rel.replace("\\", "/") if norm.startswith("/"): return True # Windows drive letter if len(rel) >= 2 and rel[1] == ":": return True parts = norm.split("/") return ".." in parts async def _fetch_file(project_id: str, relative_path: str, offset: int, length: int) -> dict: """Return the raw Electron tool_result dict for a file read.""" return await execute_on_client( action="read_project_folder_file", data={ "projectId": project_id, "relativePath": relative_path, "offset": offset, "length": length, }, ) def _decode(result: dict) -> tuple[str, str, int]: """Decode a tool_result into (text, kind, total_size). For pdf/docx, extracts text from base64. For images, returns a placeholder string. For text, content is already a sliced utf-8 string. """ kind = result.get("kind", "text") content = result.get("content", "") or "" total = int(result.get("totalSize", 0) or 0) if kind == "image": return ("[Image file — cannot be navigated as text. See manifest summary.]", kind, total) if kind == "pdf": return (_extract_pdf_text(content), kind, total) if kind == "docx": return (_extract_docx_text(content), kind, total) return (content, kind, total) @tool async def read_project_folder_file( project_id: str, relative_path: str, offset: int = 0, length: int = _MAX_RETURN_CHARS, ) -> str: """Read a slice of a file inside the project's linked folder. Args: project_id: project ID. relative_path: path relative to the linked folder root. offset: char offset to start reading from (0 = beginning). length: max chars to return. Default 50000. Use smaller values to save tokens. Returns text content slice with a header showing position. Header tells you when more content is available; call again with the suggested next offset. For PDF / DOCX files the backend extracts text first, then applies offset/length on the extracted text. For images returns a placeholder; navigate with the manifest summary instead. """ if _is_unsafe_path(relative_path): return "Access denied" result = await _fetch_file(project_id, relative_path, offset, length) text, kind, total_size = _decode(result) if not text and kind in ("missing", "error"): return f"File not found or unreadable: {relative_path}" if kind in ("pdf", "docx"): # Backend extracted full text — apply offset/length on chars. sliced = text[offset:offset + length] slice_end = min(offset + length, len(text)) header = ( f"[file={relative_path} kind={kind} offset={offset} end={slice_end} " f"totalChars={len(text)}]" ) if slice_end < len(text): header += f"\n[More content available — call again with offset={slice_end}.]" return header + "\n" + sliced if kind == "text": slice_end = offset + len(text) header = ( f"[file={relative_path} kind=text offset={offset} end={slice_end} " f"totalBytes={total_size}]" ) if slice_end < total_size: header += f"\n[More content available — call again with offset={slice_end}.]" return header + "\n" + text # image or unknown return text @tool async def search_project_folder_file( project_id: str, relative_path: str, query: str, context_lines: int = 3, ) -> str: """Search a project folder file for a query string (case-insensitive substring). Args: project_id: project ID. relative_path: path relative to the linked folder root. query: text to search for. context_lines: number of lines of context around each match (default 3). Returns matching line ranges with surrounding context and 1-based line numbers. Capped at 20 matches; if more exist the header shows the total. Works on text, code, markdown, PDF (extracted), and DOCX (extracted). Images and binary files are not searchable. """ if _is_unsafe_path(relative_path): return "Access denied" if not query: return "Empty query." # For text we still need full file; pass length=very large. result = await _fetch_file(project_id, relative_path, offset=0, length=10_000_000) text, kind, _ = _decode(result) if not text and kind in ("missing", "error"): return f"File not found or unreadable: {relative_path}" if kind == "image": return "Cannot search inside images." lines = text.splitlines() q = query.lower() matches = [i for i, line in enumerate(lines) if q in line.lower()] if not matches: return f"No matches for '{query}' in {relative_path}." shown = matches[:_MAX_SEARCH_MATCHES] snippets: list[str] = [] for i in shown: start = max(0, i - context_lines) end = min(len(lines), i + context_lines + 1) block = "\n".join(f"{n + 1:5d}: {lines[n]}" for n in range(start, end)) snippets.append(block) header = f"[file={relative_path} matches={len(matches)} showing={len(shown)} query='{query}']" body = "\n---\n".join(snippets) return header + "\n" + body FOLDER_TOOLS = [read_project_folder_file, search_project_folder_file]