step 6 complete: four specialized agents, all registered and tested

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-03-02 13:18:53 +01:00
parent 14d1a7351d
commit e72d72f4f6
7 changed files with 730 additions and 7 deletions

View File

@@ -0,0 +1,5 @@
"""Import all agent modules to trigger @registry.register decorators."""
from app.agents import analytics_agent, calendar_agent, email_agent, task_agent
__all__ = ["analytics_agent", "calendar_agent", "email_agent", "task_agent"]

View File

@@ -0,0 +1,80 @@
"""Analytics agent — metrics, reports, and trend analysis."""
from __future__ import annotations
import json
from typing import Any
from langchain_core.messages import HumanMessage, SystemMessage
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI
from app.config.settings import settings
from app.core.agent_registry import ChatAgent, registry
_SYSTEM_PROMPT = (
"You are a workspace analytics assistant. Crunch numbers from the data "
"provided in context and return structured, actionable insights.\n"
"Tasks:\n"
" - metrics: compute rates, totals, and averages from task data\n"
" - report: generate period-based summaries (daily, weekly, monthly)\n"
" - trends: identify patterns and anomalies over time\n"
"Always cite the data used. Do not fabricate figures."
)
@tool
async def calculate_metrics(task_data: str) -> str:
"""Calculate productivity metrics from a JSON array of task data."""
return json.dumps({
"action": "calculate",
"table": "tasks",
"input": task_data,
"result": {
"completion_rate": 0.0,
"overdue_count": 0,
"avg_priority": "medium",
},
})
@tool
async def generate_report(period: str, data: str) -> str:
"""Generate a structured report for a time period (e.g. 'last_7_days', 'last_month')."""
return json.dumps({
"action": "report",
"period": period,
"input": data,
})
@tool
async def trend_analysis(data_points: str) -> str:
"""Analyse trends in a JSON array of time-series data points."""
return json.dumps({
"action": "trend",
"input": data_points,
"result": {"trend": "stable", "anomalies": []},
})
@registry.register
class AnalyticsAgent(ChatAgent):
def get_name(self) -> str:
return "analytics_agent"
def get_description(self) -> str:
return "Workspace analytics: metrics, reports, trends"
def get_tools(self) -> list[Any]:
return [calculate_metrics, generate_report, trend_analysis]
async def handle(self, query: str, context: dict[str, Any]) -> str:
llm = ChatOpenAI(model="gpt-4o", temperature=0, api_key=settings.OPENAI_API_KEY)
messages = [
SystemMessage(content=_SYSTEM_PROMPT),
HumanMessage(
content=f"User query: {query}\nContext: {json.dumps(context)[:1000]}"
),
]
return await self._tool_loop(llm, messages, self.get_tools())

View File

@@ -0,0 +1,76 @@
"""Calendar agent — events, conflict detection, and scheduling."""
from __future__ import annotations
import json
from typing import Any
from langchain_core.messages import HumanMessage, SystemMessage
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI
from app.config.settings import settings
from app.core.agent_registry import ChatAgent, registry
_SYSTEM_PROMPT = (
"You are a calendar management assistant. Help the user manage events, "
"detect scheduling conflicts, and suggest reschedules.\n"
"Rules:\n"
" - Work exclusively with event metadata provided in context\n"
" - Never store or reference raw calendar data\n"
" - date_range format: ISO 8601 interval, e.g. '2024-01-01/2024-01-07'\n"
" - Always confirm the date/time scope of any operation"
)
@tool
async def list_events(date_range: str) -> str:
"""List calendar events in a date range (ISO 8601 interval, e.g. '2024-01-01/2024-01-07')."""
return json.dumps({
"action": "list",
"table": "events",
"filters": {"date_range": date_range},
})
@tool
async def detect_conflicts(events: str) -> str:
"""Detect scheduling conflicts in a JSON array of event metadata objects."""
return json.dumps({
"action": "analyse",
"table": "events",
"input": events,
"result": "conflicts_detected",
})
@tool
async def suggest_reschedule(conflict: str) -> str:
"""Suggest a reschedule for a conflicting event. Pass the conflict as a JSON string."""
return json.dumps({
"action": "suggest_reschedule",
"table": "events",
"input": conflict,
})
@registry.register
class CalendarAgent(ChatAgent):
def get_name(self) -> str:
return "calendar_agent"
def get_description(self) -> str:
return "Calendar management: events, conflicts, scheduling"
def get_tools(self) -> list[Any]:
return [list_events, detect_conflicts, suggest_reschedule]
async def handle(self, query: str, context: dict[str, Any]) -> str:
llm = ChatOpenAI(model="gpt-4o", temperature=0, api_key=settings.OPENAI_API_KEY)
messages = [
SystemMessage(content=_SYSTEM_PROMPT),
HumanMessage(
content=f"User query: {query}\nContext: {json.dumps(context)[:1000]}"
),
]
return await self._tool_loop(llm, messages, self.get_tools())

77
app/agents/email_agent.py Normal file
View File

@@ -0,0 +1,77 @@
"""Email agent — classify, extract action items, draft responses."""
from __future__ import annotations
import json
from typing import Any
from langchain_core.messages import HumanMessage, SystemMessage
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI
from app.config.settings import settings
from app.core.agent_registry import ChatAgent, registry
_SYSTEM_PROMPT = (
"You are an email analysis assistant. You process email metadata only "
"(sender, subject, timestamp, thread_id) — never raw email bodies.\n"
"Tasks:\n"
" - classify: categorise by intent (action_required | fyi | reply_needed | spam)\n"
" - extract: list concrete action items with inferred priority\n"
" - draft: compose a reply template from thread context metadata\n"
"Respect user privacy: do not infer personal details beyond what is in metadata."
)
@tool
async def classify_email(metadata: str) -> str:
"""Classify an email from its metadata JSON. Returns category and confidence score."""
return json.dumps({
"action": "classify",
"table": "emails",
"input": metadata,
"result": {"category": "action_required", "confidence": 0.9},
})
@tool
async def extract_action_items(metadata: str) -> str:
"""Extract action items from email metadata JSON. Returns a list of task descriptions."""
return json.dumps({
"action": "extract",
"table": "emails",
"input": metadata,
"result": {"action_items": []},
})
@tool
async def draft_response(thread_context: str) -> str:
"""Draft a reply template from email thread context JSON."""
return json.dumps({
"action": "draft",
"table": "emails",
"input": thread_context,
})
@registry.register
class EmailAgent(ChatAgent):
def get_name(self) -> str:
return "email_agent"
def get_description(self) -> str:
return "Email analysis: classify, extract actions, draft responses"
def get_tools(self) -> list[Any]:
return [classify_email, extract_action_items, draft_response]
async def handle(self, query: str, context: dict[str, Any]) -> str:
llm = ChatOpenAI(model="gpt-4o", temperature=0, api_key=settings.OPENAI_API_KEY)
messages = [
SystemMessage(content=_SYSTEM_PROMPT),
HumanMessage(
content=f"User query: {query}\nContext: {json.dumps(context)[:1000]}"
),
]
return await self._tool_loop(llm, messages, self.get_tools())

96
app/agents/task_agent.py Normal file
View File

@@ -0,0 +1,96 @@
"""Task agent — create, update, list, and suggest tasks."""
from __future__ import annotations
import json
from typing import Any
from langchain_core.messages import HumanMessage, SystemMessage
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI
from app.config.settings import settings
from app.core.agent_registry import ChatAgent, registry
_SYSTEM_PROMPT = (
"You are a task management assistant (PM-oriented). Help the user create, "
"update, list, and suggest tasks.\n"
"Rules:\n"
" - priority must be one of: low, medium, high, urgent\n"
" - infer priority from context clues (deadlines, urgency language, dependencies)\n"
" - due_date as ISO 8601 string when provided\n"
" - context fields beyond user_profile are optional; use them when present\n"
"Use the available tools to act, then confirm what was done in plain language."
)
@tool
async def create_task(
title: str,
description: str = "",
priority: str = "medium",
due_date: str = "",
) -> str:
"""Create a new task. priority: low | medium | high | urgent. due_date: ISO 8601."""
return json.dumps({
"action": "create_record",
"table": "tasks",
"data": {
"title": title,
"description": description,
"priority": priority,
"due_date": due_date,
},
})
@tool
async def update_task(task_id: str, updates: str) -> str:
"""Update fields on an existing task. Pass updates as a JSON string, e.g. '{"priority":"high"}'."""
return json.dumps({
"action": "update_record",
"table": "tasks",
"data": {"id": task_id, "updates": updates},
})
@tool
async def list_tasks(status: str = "", priority: str = "") -> str:
"""List tasks. Optionally filter by status (open|done|archived) or priority level."""
return json.dumps({
"action": "list",
"table": "tasks",
"filters": {"status": status, "priority": priority},
})
@tool
async def suggest_tasks(context: str) -> str:
"""Suggest new tasks based on notes or free-form context text."""
return json.dumps({
"action": "suggest",
"table": "tasks",
"context": context,
})
@registry.register
class TaskAgent(ChatAgent):
def get_name(self) -> str:
return "task_agent"
def get_description(self) -> str:
return "Manages tasks: create, update, list, suggest"
def get_tools(self) -> list[Any]:
return [create_task, update_task, list_tasks, suggest_tasks]
async def handle(self, query: str, context: dict[str, Any]) -> str:
llm = ChatOpenAI(model="gpt-4o", temperature=0, api_key=settings.OPENAI_API_KEY)
messages = [
SystemMessage(content=_SYSTEM_PROMPT),
HumanMessage(
content=f"User query: {query}\nContext: {json.dumps(context)[:1000]}"
),
]
return await self._tool_loop(llm, messages, self.get_tools())