From d856dfd28cdfb59e989163b225e5f1fa05d4586f Mon Sep 17 00:00:00 2001 From: Roberto Musso Date: Mon, 23 Mar 2026 23:01:45 +0100 Subject: [PATCH] refactor: deduplicate shared code into shared/ module MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Move duplicated files from chat + batch-agent into shared/: - shared/ws_context.py — Redis-based tool call round-trip - shared/llm.py — LiteLLM factory (get_llm, embed) - shared/agents/ — 4 domain agents (task, note, project, timeline) Update all service imports to use shared.* instead of app.*. Delete 12 duplicated files across both services. --- services/batch-agent/app/agent_runner.py | 12 +- .../app/agents/filesystem_agent.py | 2 +- services/batch-agent/app/agents/note_agent.py | 110 ---------- .../batch-agent/app/agents/project_agent.py | 110 ---------- services/batch-agent/app/agents/task_agent.py | 197 ------------------ .../batch-agent/app/agents/timeline_agent.py | 88 -------- services/batch-agent/app/journey.py | 2 +- services/batch-agent/app/llm.py | 78 ------- services/batch-agent/app/main.py | 8 + services/batch-agent/app/redis_consumer.py | 2 +- services/chat/app/agents/__init__.py | 1 - services/chat/app/deep_agent.py | 12 +- services/chat/app/redis_consumer.py | 2 +- services/chat/app/routes.py | 2 +- services/chat/app/ws_context.py | 115 ---------- shared/agents/__init__.py | 1 + .../chat/app => shared}/agents/note_agent.py | 6 +- .../app => shared}/agents/project_agent.py | 4 +- .../chat/app => shared}/agents/task_agent.py | 7 +- .../app => shared}/agents/timeline_agent.py | 5 +- {services/chat/app => shared}/llm.py | 4 +- .../batch-agent/app => shared}/ws_context.py | 23 +- 22 files changed, 48 insertions(+), 743 deletions(-) delete mode 100644 services/batch-agent/app/agents/note_agent.py delete mode 100644 services/batch-agent/app/agents/project_agent.py delete mode 100644 services/batch-agent/app/agents/task_agent.py delete mode 100644 services/batch-agent/app/agents/timeline_agent.py delete mode 100644 services/batch-agent/app/llm.py delete mode 100644 services/chat/app/agents/__init__.py delete mode 100644 services/chat/app/ws_context.py create mode 100644 shared/agents/__init__.py rename {services/chat/app => shared}/agents/note_agent.py (96%) rename {services/chat/app => shared}/agents/project_agent.py (97%) rename {services/chat/app => shared}/agents/task_agent.py (95%) rename {services/chat/app => shared}/agents/timeline_agent.py (94%) rename {services/chat/app => shared}/llm.py (94%) rename {services/batch-agent/app => shared}/ws_context.py (84%) diff --git a/services/batch-agent/app/agent_runner.py b/services/batch-agent/app/agent_runner.py index d692cde..fe02e37 100644 --- a/services/batch-agent/app/agent_runner.py +++ b/services/batch-agent/app/agent_runner.py @@ -22,12 +22,12 @@ from langchain_core.messages import AIMessage, HumanMessage, SystemMessage, Tool from sqlalchemy import select from app.agents.filesystem_agent import FILESYSTEM_TOOLS -from app.agents.note_agent import NOTE_TOOLS -from app.agents.project_agent import PROJECT_TOOLS -from app.agents.task_agent import TASK_TOOLS -from app.agents.timeline_agent import TIMELINE_TOOLS -from app.llm import get_llm -from app.ws_context import execute_on_client, set_current_user, clear_current_user +from shared.agents.note_agent import NOTE_TOOLS +from shared.agents.project_agent import PROJECT_TOOLS +from shared.agents.task_agent import TASK_TOOLS +from shared.agents.timeline_agent import TIMELINE_TOOLS +from shared.llm import get_llm +from shared.ws_context import execute_on_client, set_current_user, clear_current_user import app.tracing as tracing from shared.db import async_session from shared.models import AgentRunLog, CloudAgentConfig, LocalAgentConfig diff --git a/services/batch-agent/app/agents/filesystem_agent.py b/services/batch-agent/app/agents/filesystem_agent.py index 921caaa..4ad418e 100644 --- a/services/batch-agent/app/agents/filesystem_agent.py +++ b/services/batch-agent/app/agents/filesystem_agent.py @@ -9,7 +9,7 @@ from typing import Any from langchain_core.tools import tool -from app.ws_context import execute_on_client +from shared.ws_context import execute_on_client @tool diff --git a/services/batch-agent/app/agents/note_agent.py b/services/batch-agent/app/agents/note_agent.py deleted file mode 100644 index 7b48046..0000000 --- a/services/batch-agent/app/agents/note_agent.py +++ /dev/null @@ -1,110 +0,0 @@ -"""Note agent — Markdown note management. - -Adapted for Batch Agent Service: import from app.ws_context and app.llm. -""" - -from __future__ import annotations - -import re -from typing import Any - -from langchain_core.tools import tool - -from app.llm import embed -from app.ws_context import execute_on_client - -_UUID_RE = re.compile( - r"^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[1-5][0-9a-fA-F]{3}-[89abAB][0-9a-fA-F]{3}-[0-9a-fA-F]{12}$" -) - - -def _is_uuid(value: str) -> bool: - return bool(_UUID_RE.match(value)) - - -@tool -async def list_notes(project_id: str = "") -> str: - """List notes, optionally scoped to a project by project_id.""" - normalized_project_id = project_id if (project_id and _is_uuid(project_id)) else "" - result = await execute_on_client( - action="select", - table="notes", - filters={"projectId": normalized_project_id or None}, - ) - rows = result.get("rows", []) - if not rows: - return "No notes found." - lines = [f"- {r['title']} (id: {r['id']})" for r in rows] - return f"Found {len(rows)} note(s):\n" + "\n".join(lines) - - -@tool -async def get_note(note_id: str) -> str: - """Fetch a single note by its UUID to read its full Markdown content.""" - result = await execute_on_client(action="get", table="notes", data={"id": note_id}) - row = result.get("row") - if not row: - return f"Note {note_id} not found." - return f"Note '{row['title']}' (id: {row['id']}):\n\n{row['content']}" - - -@tool -async def create_note(title: str, content: str, project_id: str = "") -> str: - """Create a new note.""" - result = await execute_on_client( - action="insert", - table="notes", - data={ - "title": title, - "content": content, - "projectId": project_id or None, - }, - ) - row = result["row"] - vector = await embed(content) - await execute_on_client( - action="vector_upsert", - data={"id": row["id"], "projectId": row.get("projectId"), "content": content}, - vector=vector, - ) - return f"Note created: '{row['title']}' (id: {row['id']})." - - -@tool -async def update_note(note_id: str, title: str = "", content: str = "") -> str: - """Update an existing note. Only pass fields that should change.""" - updates: dict[str, Any] = {} - if title: - updates["title"] = title - if content: - updates["content"] = content - result = await execute_on_client( - action="update", - table="notes", - data={"id": note_id, "updates": updates}, - ) - row = result["row"] - if content: - vector = await embed(content) - await execute_on_client( - action="vector_upsert", - data={"id": note_id, "projectId": row.get("projectId"), "content": content}, - vector=vector, - ) - return f"Note updated: '{row['title']}' (id: {row['id']})." - - -@tool -async def delete_note(note_id: str) -> str: - """Delete a note permanently by its UUID.""" - await execute_on_client(action="delete", table="notes", data={"id": note_id}) - return f"Note {note_id} deleted." - - -NOTE_TOOLS: list[Any] = [ - list_notes, - get_note, - create_note, - update_note, - delete_note, -] diff --git a/services/batch-agent/app/agents/project_agent.py b/services/batch-agent/app/agents/project_agent.py deleted file mode 100644 index 2d30eaf..0000000 --- a/services/batch-agent/app/agents/project_agent.py +++ /dev/null @@ -1,110 +0,0 @@ -"""Project agent — full lifecycle management. - -Adapted for Batch Agent Service: import from app.ws_context. -""" - -from __future__ import annotations - -from typing import Any - -from langchain_core.tools import tool - -from app.ws_context import execute_on_client - - -@tool -async def list_projects(client_id: str = "", include_archived: int = 0) -> str: - """List projects, optionally filtered by client_id.""" - result = await execute_on_client( - action="select", - table="projects", - filters={ - "clientId": client_id or None, - "includeArchived": bool(include_archived), - }, - ) - rows = result.get("rows", []) - if not rows: - return "No projects found." - lines = [f"- {r['name']} (status: {r['status']}, id: {r['id']})" for r in rows] - return f"Found {len(rows)} project(s):\n" + "\n".join(lines) - - -@tool -async def list_all_projects() -> str: - """List every project regardless of client or status.""" - result = await execute_on_client(action="select", table="projects") - rows = result.get("rows", []) - if not rows: - return "No projects found." - lines = [f"- {r['name']} (status: {r['status']}, id: {r['id']})" for r in rows] - return f"All projects ({len(rows)}):\n" + "\n".join(lines) - - -@tool -async def get_project(project_id: str) -> str: - """Fetch a single project by its UUID.""" - result = await execute_on_client(action="get", table="projects", data={"id": project_id}) - row = result.get("row") - if not row: - return f"Project {project_id} not found." - return ( - f"Project: '{row['name']}' (id: {row['id']}, status: {row['status']}, " - f"clientId: {row.get('clientId', 'none')})" - ) - - -@tool -async def create_project(name: str, client_id: str = "") -> str: - """Create a new project.""" - result = await execute_on_client( - action="insert", - table="projects", - data={"name": name, "clientId": client_id or None}, - ) - row = result["row"] - return f"Project created: '{row['name']}' (id: {row['id']})" - - -@tool -async def update_project( - project_id: str, - name: str = "", - client_id: str = "", - status: str = "", - ai_summary: str = "", -) -> str: - """Update a project. Only pass fields that should change.""" - updates: dict[str, Any] = {} - if name: - updates["name"] = name - if client_id: - updates["clientId"] = client_id - if status: - updates["status"] = status - if ai_summary: - updates["aiSummary"] = ai_summary - result = await execute_on_client( - action="update", - table="projects", - data={"id": project_id, "updates": updates}, - ) - row = result["row"] - return f"Project updated: '{row['name']}' (id: {row['id']}, status: {row['status']})" - - -@tool -async def delete_project(project_id: str) -> str: - """Permanently delete a project.""" - await execute_on_client(action="delete", table="projects", data={"id": project_id}) - return f"Project {project_id} permanently deleted." - - -PROJECT_TOOLS: list[Any] = [ - list_projects, - list_all_projects, - get_project, - create_project, - update_project, - delete_project, -] diff --git a/services/batch-agent/app/agents/task_agent.py b/services/batch-agent/app/agents/task_agent.py deleted file mode 100644 index 5e8753d..0000000 --- a/services/batch-agent/app/agents/task_agent.py +++ /dev/null @@ -1,197 +0,0 @@ -"""Task agent — full CRUD for tasks and task comments. - -Adapted for Batch Agent Service: import from app.ws_context. -""" - -from __future__ import annotations - -from datetime import datetime, timezone -import re -from typing import Any - -from langchain_core.tools import tool - -from app.ws_context import execute_on_client - -_UUID_RE = re.compile( - r"^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[1-5][0-9a-fA-F]{3}-[89abAB][0-9a-fA-F]{3}-[0-9a-fA-F]{12}$" -) - - -def _is_uuid(value: str) -> bool: - return bool(_UUID_RE.match(value)) - - -@tool -async def list_tasks( - project_id: str = "", - status: str = "", - search: str = "", - order_by: str = "", -) -> str: - """List tasks, optionally filtered by project_id, status, search, or order_by.""" - normalized_project_id = project_id if (project_id and _is_uuid(project_id)) else "" - result = await execute_on_client( - action="select", - table="tasks", - filters={ - "projectId": normalized_project_id or None, - "status": status or None, - "search": search or None, - "orderBy": order_by or None, - }, - ) - rows = result.get("rows", []) - if not rows: - return "No tasks found matching the given filters." - lines = [ - f"- {r['title']} (status: {r['status']}, priority: {r['priority']}, id: {r['id']})" - for r in rows - ] - return f"Found {len(rows)} task(s):\n" + "\n".join(lines) - - -@tool -async def create_task( - title: str, - description: str = "", - status: str = "todo", - priority: str = "medium", - assignees: str = "[]", - due_date: int = 0, - project_id: str = "", - is_ai_suggested: int = 0, -) -> str: - """Create a new task.""" - result = await execute_on_client( - action="insert", - table="tasks", - data={ - "title": title, - "description": description or None, - "status": status, - "priority": priority, - "assignee": assignees, - "dueDate": due_date or None, - "projectId": project_id or None, - "isAiSuggested": is_ai_suggested, - }, - ) - row = result["row"] - return ( - f"Task created: '{row['title']}' " - f"(id: {row['id']}, status: {row['status']}, priority: {row['priority']})" - ) - - -@tool -async def update_task( - task_id: str, - title: str = "", - description: str = "", - status: str = "", - priority: str = "", - assignees: str = "", - due_date: int = -1, - project_id: str = "", -) -> str: - """Update fields on an existing task. Only pass fields you want to change.""" - updates: dict[str, Any] = {} - if title: - updates["title"] = title - if description: - updates["description"] = description - if status: - updates["status"] = status - if priority: - updates["priority"] = priority - if assignees: - updates["assignee"] = assignees - if due_date != -1: - updates["dueDate"] = due_date or None - if project_id: - updates["projectId"] = project_id - result = await execute_on_client( - action="update", - table="tasks", - data={"id": task_id, "updates": updates}, - ) - row = result["row"] - return f"Task updated: '{row['title']}' (id: {row['id']}, status: {row['status']})" - - -@tool -async def delete_task(task_id: str) -> str: - """Delete a task permanently by its UUID.""" - await execute_on_client(action="delete", table="tasks", data={"id": task_id}) - return f"Task {task_id} deleted." - - -@tool -async def list_tasks_due_today() -> str: - """List all tasks whose due date falls on today's date.""" - now = datetime.now(tz=timezone.utc) - start_ms = int(datetime(now.year, now.month, now.day, tzinfo=timezone.utc).timestamp() * 1000) - end_ms = start_ms + 86_400_000 - 1 - result = await execute_on_client( - action="select", - table="tasks", - filters={"dueDateFrom": start_ms, "dueDateTo": end_ms}, - ) - rows = result.get("rows", []) - if not rows: - return "No tasks are due today." - lines = [ - f"- {r['title']} (priority: {r['priority']}, status: {r['status']}, id: {r['id']})" - for r in rows - ] - return f"Tasks due today ({len(rows)}):\n" + "\n".join(lines) - - -@tool -async def list_task_comments(task_id: str) -> str: - """List all comments on a task by its UUID.""" - result = await execute_on_client( - action="select", - table="taskComments", - filters={"taskId": task_id}, - ) - rows = result.get("rows", []) - if not rows: - return f"No comments found for task {task_id}." - lines = [f"- [{r['author']}]: {r['content']} (id: {r['id']})" for r in rows] - return f"Found {len(rows)} comment(s):\n" + "\n".join(lines) - - -@tool -async def add_task_comment(task_id: str, author: str, content: str) -> str: - """Add a comment to a task.""" - result = await execute_on_client( - action="insert", - table="taskComments", - data={"taskId": task_id, "author": author, "content": content}, - ) - row = result.get("row", {}) - row_author = row.get("author", author) - row_task_id = row.get("taskId") or row.get("task_id") or task_id - row_comment_id = row.get("id", "unknown") - return f"Comment added by {row_author} on task {row_task_id} (comment id: {row_comment_id})." - - -@tool -async def delete_task_comment(comment_id: str) -> str: - """Delete a task comment by its UUID.""" - await execute_on_client(action="delete", table="taskComments", data={"id": comment_id}) - return f"Comment {comment_id} deleted." - - -TASK_TOOLS: list[Any] = [ - list_tasks, - create_task, - update_task, - delete_task, - list_tasks_due_today, - list_task_comments, - add_task_comment, - delete_task_comment, -] diff --git a/services/batch-agent/app/agents/timeline_agent.py b/services/batch-agent/app/agents/timeline_agent.py deleted file mode 100644 index 1e54582..0000000 --- a/services/batch-agent/app/agents/timeline_agent.py +++ /dev/null @@ -1,88 +0,0 @@ -"""Timeline agent — project milestone management. - -Adapted for Batch Agent Service: import from app.ws_context. -""" - -from __future__ import annotations - -import re -from typing import Any - -from langchain_core.tools import tool - -from app.ws_context import execute_on_client - -_UUID_RE = re.compile( - r"^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[1-5][0-9a-fA-F]{3}-[89abAB][0-9a-fA-F]{3}-[0-9a-fA-F]{12}$" -) - - -def _is_uuid(value: str) -> bool: - return bool(_UUID_RE.match(value)) - - -@tool -async def list_timelines(project_id: str = "") -> str: - """List timelines. Provide project_id to scope to a specific project.""" - normalized_project_id = project_id if (project_id and _is_uuid(project_id)) else "" - result = await execute_on_client( - action="select", - table="timelines", - filters={"projectId": normalized_project_id or None}, - ) - rows = result.get("rows", []) - if not rows: - return "No timelines found." - lines = [f"- {r['title']} (date: {r['date']}, id: {r['id']})" for r in rows] - return f"Found {len(rows)} timeline(s):\n" + "\n".join(lines) - - -@tool -async def create_timeline( - project_id: str, title: str, date: int, is_ai_suggested: int = 0, -) -> str: - """Create a project timeline (milestone).""" - result = await execute_on_client( - action="insert", - table="timelines", - data={ - "projectId": project_id, - "title": title, - "date": date, - "isAiSuggested": is_ai_suggested, - }, - ) - row = result["row"] - return f"Timeline created: '{row['title']}' (id: {row['id']}, date: {row['date']})" - - -@tool -async def update_timeline(timeline_id: str, title: str = "", date: int = -1) -> str: - """Update a timeline. Only pass fields that should change.""" - updates: dict[str, Any] = {} - if title: - updates["title"] = title - if date != -1: - updates["date"] = date - result = await execute_on_client( - action="update", - table="timelines", - data={"id": timeline_id, "updates": updates}, - ) - row = result["row"] - return f"Timeline updated: '{row['title']}' (id: {row['id']})" - - -@tool -async def delete_timeline(timeline_id: str) -> str: - """Delete a timeline permanently by its UUID.""" - await execute_on_client(action="delete", table="timelines", data={"id": timeline_id}) - return f"Timeline {timeline_id} deleted." - - -TIMELINE_TOOLS: list[Any] = [ - list_timelines, - create_timeline, - update_timeline, - delete_timeline, -] diff --git a/services/batch-agent/app/journey.py b/services/batch-agent/app/journey.py index 26151af..9dcafeb 100644 --- a/services/batch-agent/app/journey.py +++ b/services/batch-agent/app/journey.py @@ -25,7 +25,7 @@ from typing import Any from langchain_core.messages import AIMessage, HumanMessage, SystemMessage, ToolMessage from app.agents.filesystem_agent import FILESYSTEM_TOOLS -from app.llm import get_llm +from shared.llm import get_llm import app.tracing as tracing logger = logging.getLogger(__name__) diff --git a/services/batch-agent/app/llm.py b/services/batch-agent/app/llm.py deleted file mode 100644 index 67f8b0b..0000000 --- a/services/batch-agent/app/llm.py +++ /dev/null @@ -1,78 +0,0 @@ -"""LLM factory — centralised model instantiation via LiteLLM. - -Identical to services/chat/app/llm.py. Uses shared.config.settings. -""" - -from __future__ import annotations - -import os -import warnings - -from openai import AsyncOpenAI -import litellm - -from langchain_openai import ChatOpenAI -from langchain_litellm import ChatLiteLLM - -from shared.config import settings - -litellm.drop_params = True - -warnings.filterwarnings( - "ignore", - message=r"PydanticSerializationUnexpectedValue\(Expected `ResponseAPIUsage`", - category=UserWarning, -) - - -def _api_key_for_model(model: str) -> str | None: - if model.startswith("anthropic/"): - return settings.ANTHROPIC_API_KEY or None - if model.startswith("gemini/") or model.startswith("google/"): - return settings.GOOGLE_API_KEY or None - if model.startswith("cerebras/"): - return settings.CEREBRAS_API_KEY or None - if model.startswith("github_copilot/"): - return None - return settings.OPENAI_API_KEY or None - - -def get_llm( - *, - model: str | None = None, - temperature: float = 0, - callbacks: list | None = None, -) -> ChatOpenAI | ChatLiteLLM: - model = model or settings.LLM_MODEL - - if settings.GITHUB_COPILOT_TOKEN_DIR: - os.environ.setdefault("GITHUB_COPILOT_TOKEN_DIR", settings.GITHUB_COPILOT_TOKEN_DIR) - - if "/" in model: - return ChatLiteLLM(model=model, temperature=temperature, callbacks=callbacks) - - return ChatOpenAI( - model=model, - temperature=temperature, - api_key=_api_key_for_model(model), - callbacks=callbacks, - ) - - -def get_router_llm( - *, - temperature: float = 0, -) -> ChatOpenAI | ChatLiteLLM: - return get_llm(model=settings.LLM_ROUTER_MODEL, temperature=temperature) - - -async def embed(text: str) -> list[float]: - model = settings.LLM_EMBED_MODEL - - if model.startswith("github_copilot/") or "/" in model: - response = await litellm.aembedding(model=model, input=[text]) - return response.data[0]["embedding"] - - client = AsyncOpenAI(api_key=settings.OPENAI_API_KEY) - response = await client.embeddings.create(model=model, input=text) - return response.data[0].embedding diff --git a/services/batch-agent/app/main.py b/services/batch-agent/app/main.py index ea9105e..d817370 100644 --- a/services/batch-agent/app/main.py +++ b/services/batch-agent/app/main.py @@ -14,6 +14,14 @@ from __future__ import annotations import asyncio import logging +import sys +from pathlib import Path + +# Ensure the repo root is on sys.path so ``shared`` is importable when +# running locally (in Docker the COPY already places it at /app/shared/). +_repo_root = str(Path(__file__).resolve().parents[3]) +if _repo_root not in sys.path: + sys.path.insert(0, _repo_root) from contextlib import asynccontextmanager from typing import AsyncGenerator diff --git a/services/batch-agent/app/redis_consumer.py b/services/batch-agent/app/redis_consumer.py index 12b1c5a..8adb02f 100644 --- a/services/batch-agent/app/redis_consumer.py +++ b/services/batch-agent/app/redis_consumer.py @@ -18,7 +18,7 @@ from typing import Any from shared.redis import redis_client, batch_request_channel, ws_out_channel import app.tracing as tracing -from app.ws_context import set_current_user, clear_current_user +from shared.ws_context import set_current_user, clear_current_user logger = logging.getLogger(__name__) diff --git a/services/chat/app/agents/__init__.py b/services/chat/app/agents/__init__.py deleted file mode 100644 index 29d6694..0000000 --- a/services/chat/app/agents/__init__.py +++ /dev/null @@ -1 +0,0 @@ -"""Chat Service domain agents.""" diff --git a/services/chat/app/deep_agent.py b/services/chat/app/deep_agent.py index 486bbb7..1db25c6 100644 --- a/services/chat/app/deep_agent.py +++ b/services/chat/app/deep_agent.py @@ -16,13 +16,13 @@ from typing import Any, Literal from langchain_core.messages import AIMessage, HumanMessage, SystemMessage, ToolMessage from langchain_core.tools import tool -from app.agents.note_agent import NOTE_TOOLS -from app.agents.project_agent import PROJECT_TOOLS -from app.agents.task_agent import TASK_TOOLS -from app.agents.timeline_agent import TIMELINE_TOOLS -from app.llm import get_llm +from shared.agents.note_agent import NOTE_TOOLS +from shared.agents.project_agent import PROJECT_TOOLS +from shared.agents.task_agent import TASK_TOOLS +from shared.agents.timeline_agent import TIMELINE_TOOLS +from shared.llm import get_llm from app.memory_middleware import MemoryMiddleware -from app.ws_context import clear_tool_result_collector, execute_on_client, set_tool_result_collector +from shared.ws_context import clear_tool_result_collector, execute_on_client, set_tool_result_collector from app import tracing from shared.db import async_session diff --git a/services/chat/app/redis_consumer.py b/services/chat/app/redis_consumer.py index b663b0a..6278e86 100644 --- a/services/chat/app/redis_consumer.py +++ b/services/chat/app/redis_consumer.py @@ -17,7 +17,7 @@ from shared.redis import redis_client, ws_out_channel from app.deep_agent import run_floating_stream, run_home_stream from app.memory_middleware import MemoryMiddleware from app.output_formatter import StreamFormatter -from app.ws_context import clear_current_user, set_current_user +from shared.ws_context import clear_current_user, set_current_user from app import tracing logger = logging.getLogger(__name__) diff --git a/services/chat/app/routes.py b/services/chat/app/routes.py index 80950e4..08d61a9 100644 --- a/services/chat/app/routes.py +++ b/services/chat/app/routes.py @@ -8,7 +8,7 @@ from fastapi.responses import JSONResponse from shared.schemas import ChatRequest from app.deep_agent import run_home -from app.ws_context import clear_current_user, set_current_user +from shared.ws_context import clear_current_user, set_current_user router = APIRouter(prefix="/chat", tags=["chat"]) diff --git a/services/chat/app/ws_context.py b/services/chat/app/ws_context.py deleted file mode 100644 index 2eabe67..0000000 --- a/services/chat/app/ws_context.py +++ /dev/null @@ -1,115 +0,0 @@ -"""WebSocket context for Chat Service — Redis-based tool call round-trip. - -Replaces the monolith's ws_context.py. Instead of calling Electron directly -via WebSocket, this publishes tool_call frames to Redis (ws:out:{user_id}) -and awaits the result via BRPOP on tool:result:{call_id}. -""" - -from __future__ import annotations - -import json -import logging -from contextvars import ContextVar -from typing import Any -from uuid import uuid4 - -from shared.redis import redis_client, tool_result_key, ws_out_channel - -logger = logging.getLogger(__name__) - -_TOOL_CALL_TIMEOUT = 30 # seconds — BRPOP timeout - -# Per-request user_id context var (set before agent runs) -_current_user_id: ContextVar[str | None] = ContextVar("_current_user_id", default=None) - -# Optional collector for debug -_tool_result_collector: ContextVar[list[dict] | None] = ContextVar( - "_tool_result_collector", default=None -) - - -def set_current_user(user_id: str) -> None: - _current_user_id.set(user_id) - - -def clear_current_user() -> None: - _current_user_id.set(None) - - -def set_tool_result_collector(lst: list[dict]) -> None: - _tool_result_collector.set(lst) - - -def clear_tool_result_collector() -> None: - _tool_result_collector.set(None) - - -async def execute_on_client( - action: str, - table: str | None = None, - data: dict[str, Any] | None = None, - filters: dict[str, Any] | None = None, - vector: list[float] | None = None, - limit: int | None = None, -) -> dict[str, Any]: - """Send a tool_call to Electron via Redis and await the result. - - 1. Build tool_call payload - 2. Publish to ws:out:{user_id} (WS Gateway forwards to Electron) - 3. BRPOP on tool:result:{call_id} (WS Gateway pushes when Electron replies) - 4. Return result dict - - Raises RuntimeError if no user_id is set or if the call times out. - """ - user_id = _current_user_id.get() - if not user_id: - raise RuntimeError( - "execute_on_client() called without a user_id — " - "set_current_user() must be called first." - ) - - call_id = str(uuid4()) - payload: dict[str, Any] = { - "type": "tool_call", - "id": call_id, - "action": action, - } - if table is not None: - payload["table"] = table - if data is not None: - payload["data"] = data - if filters is not None: - payload["filters"] = {k: v for k, v in filters.items() if v is not None} - if vector is not None: - payload["vector"] = vector - if limit is not None: - payload["limit"] = limit - - # Publish tool_call to WS Gateway → Electron - channel = ws_out_channel(user_id) - await redis_client.publish(channel, json.dumps(payload)) - - # Wait for Electron's tool_result - result_key = tool_result_key(call_id) - response = await redis_client.brpop(result_key, timeout=_TOOL_CALL_TIMEOUT) - - if response is None: - raise RuntimeError( - f"Tool call {call_id} timed out after {_TOOL_CALL_TIMEOUT}s — " - f"device may be offline or unresponsive." - ) - - # response is (key, value) tuple - _, raw = response - result = json.loads(raw) - - # Collect for debug if requested - collector = _tool_result_collector.get(None) - if collector is not None: - collector.append({ - "action": action, - "table": table, - "data": result, - }) - - return result diff --git a/shared/agents/__init__.py b/shared/agents/__init__.py new file mode 100644 index 0000000..6f4b77f --- /dev/null +++ b/shared/agents/__init__.py @@ -0,0 +1 @@ +"""Shared domain agents — tool definitions used by both Chat and Batch Agent services.""" diff --git a/services/chat/app/agents/note_agent.py b/shared/agents/note_agent.py similarity index 96% rename from services/chat/app/agents/note_agent.py rename to shared/agents/note_agent.py index e296303..d9adf53 100644 --- a/services/chat/app/agents/note_agent.py +++ b/shared/agents/note_agent.py @@ -1,6 +1,6 @@ """Note agent — Markdown note management (list, get, create, update, delete). -Adapted for Chat Service: import from app.ws_context and app.llm. +Shared tool definitions used by both Chat and Batch Agent services. """ from __future__ import annotations @@ -10,8 +10,8 @@ from typing import Any from langchain_core.tools import tool -from app.llm import embed -from app.ws_context import execute_on_client +from shared.llm import embed +from shared.ws_context import execute_on_client _UUID_RE = re.compile( r"^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[1-5][0-9a-fA-F]{3}-[89abAB][0-9a-fA-F]{3}-[0-9a-fA-F]{12}$" diff --git a/services/chat/app/agents/project_agent.py b/shared/agents/project_agent.py similarity index 97% rename from services/chat/app/agents/project_agent.py rename to shared/agents/project_agent.py index 856d650..a1ee1d3 100644 --- a/services/chat/app/agents/project_agent.py +++ b/shared/agents/project_agent.py @@ -1,6 +1,6 @@ """Project agent — full lifecycle management (list, get, create, update, archive, delete). -Adapted for Chat Service: import from app.ws_context instead of app.core.ws_context. +Shared tool definitions used by both Chat and Batch Agent services. """ from __future__ import annotations @@ -9,7 +9,7 @@ from typing import Any from langchain_core.tools import tool -from app.ws_context import execute_on_client +from shared.ws_context import execute_on_client PROJECT_SYSTEM_PROMPT = ( "You are a project management assistant. You help users create, find,\n" diff --git a/services/chat/app/agents/task_agent.py b/shared/agents/task_agent.py similarity index 95% rename from services/chat/app/agents/task_agent.py rename to shared/agents/task_agent.py index 03d4f32..1581b80 100644 --- a/services/chat/app/agents/task_agent.py +++ b/shared/agents/task_agent.py @@ -1,6 +1,6 @@ """Task agent — full CRUD for tasks and task comments. -Adapted for Chat Service: import from app.ws_context instead of app.core.ws_context. +Shared tool definitions used by both Chat and Batch Agent services. """ from __future__ import annotations @@ -11,7 +11,7 @@ from typing import Any from langchain_core.tools import tool -from app.ws_context import execute_on_client +from shared.ws_context import execute_on_client _UUID_RE = re.compile( r"^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[1-5][0-9a-fA-F]{3}-[89abAB][0-9a-fA-F]{3}-[0-9a-fA-F]{12}$" @@ -32,7 +32,6 @@ TASK_SYSTEM_PROMPT = ( " - project_id is optional; link to a project when the user mentions one\n" " - is_ai_suggested: 1 only when proactively proposing a task the user\n" " did not explicitly request; 0 otherwise\n" - " - is_ai_suggested: 1 only when proactively proposing a task the user did not explicitly request; 0 otherwise\n" " - Use list_tasks_due_today for 'what's due today' queries\n" " - For update_task, use -1 for integer fields you do not want to change\n" " - Always confirm the action in plain, user-friendly language." @@ -225,7 +224,7 @@ async def delete_task_comment(comment_id: str) -> str: return f"Comment {comment_id} deleted." -# ── Agent ───────────────────────────────────────────────────────────── +# ── Exports ─────────────────────────────────────────────────────────── TASK_TOOLS: list[Any] = [ diff --git a/services/chat/app/agents/timeline_agent.py b/shared/agents/timeline_agent.py similarity index 94% rename from services/chat/app/agents/timeline_agent.py rename to shared/agents/timeline_agent.py index 669c558..d7cf6dd 100644 --- a/services/chat/app/agents/timeline_agent.py +++ b/shared/agents/timeline_agent.py @@ -1,6 +1,6 @@ """Timeline agent — project milestone management (list, create, update, delete). -Adapted for Chat Service: import from app.ws_context instead of app.core.ws_context. +Shared tool definitions used by both Chat and Batch Agent services. """ from __future__ import annotations @@ -10,7 +10,7 @@ from typing import Any from langchain_core.tools import tool -from app.ws_context import execute_on_client +from shared.ws_context import execute_on_client _UUID_RE = re.compile( r"^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[1-5][0-9a-fA-F]{3}-[89abAB][0-9a-fA-F]{3}-[0-9a-fA-F]{12}$" @@ -28,7 +28,6 @@ TIMELINE_SYSTEM_PROMPT = ( " - For listing, project_id must be a UUID; never pass plain names as project_id\n" " - date is a Unix timestamp in milliseconds; convert human-readable dates\n" " - is_ai_suggested: 1 when proactively proposing a timeline, 0 otherwise\n" - " - is_ai_suggested: 1 when proactively proposing a timeline, 0 otherwise\n" " - For update_timeline, use -1 for integer fields you do not want to change\n" " - Listing without a project_id returns all timelines across projects\n" " - Always echo the title and formatted date in your confirmation." diff --git a/services/chat/app/llm.py b/shared/llm.py similarity index 94% rename from services/chat/app/llm.py rename to shared/llm.py index 0a6cdce..e2ed26f 100644 --- a/services/chat/app/llm.py +++ b/shared/llm.py @@ -1,7 +1,7 @@ """LLM factory — centralised model instantiation via LiteLLM. -Adapted from app/core/llm.py for the Chat Service. -Uses shared.config.settings instead of app.config.settings. +Shared by Chat and Batch Agent services. +Uses shared.config.settings for all configuration. """ from __future__ import annotations diff --git a/services/batch-agent/app/ws_context.py b/shared/ws_context.py similarity index 84% rename from services/batch-agent/app/ws_context.py rename to shared/ws_context.py index ea3694b..c838bed 100644 --- a/services/batch-agent/app/ws_context.py +++ b/shared/ws_context.py @@ -1,12 +1,12 @@ -"""WebSocket context for Batch Agent Service — Redis-based tool call round-trip. +"""WebSocket context — Redis-based tool call round-trip. -Same pattern as services/chat/app/ws_context.py: publishes tool_call frames -to Redis ws:out:{user_id} and awaits BRPOP on tool:result:{call_id}. +Shared by Chat and Batch Agent services. Publishes tool_call frames to +Redis ``ws:out:{user_id}`` and awaits the result via BRPOP on +``tool:result:{call_id}``. -Additionally provides set_client_executor / clear_client_executor stubs -for backward compatibility with the agent_runner code (which originally -used a DeviceConnectionManager callback). In the microservice world these -are no-ops — execute_on_client() always uses the Redis path. +Also provides ``set_client_executor`` / ``clear_client_executor`` no-op +shims for backward compatibility with agent_runner code that originally +used a DeviceConnectionManager callback. """ from __future__ import annotations @@ -23,10 +23,10 @@ logger = logging.getLogger(__name__) _TOOL_CALL_TIMEOUT = 30 # seconds — BRPOP timeout -# Per-request user_id context var (set before agent run) +# Per-request user_id context var (set before agent runs) _current_user_id: ContextVar[str | None] = ContextVar("_current_user_id", default=None) -# Optional collector for debug / logging +# Optional collector for debug _tool_result_collector: ContextVar[list[dict] | None] = ContextVar( "_tool_result_collector", default=None ) @@ -51,17 +51,14 @@ def clear_tool_result_collector() -> None: # ── Compatibility shims ────────────────────────────────────────────────── # agent_runner.py originally called set_client_executor / clear_client_executor # with a DeviceConnectionManager callback. In the microservice world the -# Redis-based execute_on_client replaces this, so these are no-ops that -# keep the agent_runner code unchanged. +# Redis-based execute_on_client replaces this, so these are no-ops. def set_client_executor(fn: Callable[[dict], Coroutine[Any, Any, dict]] | None) -> None: """No-op — kept for agent_runner compatibility.""" - pass def clear_client_executor() -> None: """No-op — kept for agent_runner compatibility.""" - pass async def execute_on_client(