feat: add WS Gateway and Chat Service (Step 2)
WS Gateway:
- WebSocket lifecycle handler with RS256 JWT auth
- Redis bridge: device registry, frame publishing, tool_result routing
- Inbound routing: tool_result→LPUSH, home/floating→chat pub/sub
- Outbound: subscribes to ws:out:{user_id}, forwards to Electron
- Single-worker Dockerfile (long-lived WS connections)
Chat Service:
- Redis consumer: subscribes to chat:request:* pattern
- Redis-based ws_context: tool_call→publish, BRPOP tool_result (30s timeout)
- deep_agent: single-agent runner with home/floating/stream variants
- memory_middleware: core/associative/episodic/proactive memory with Fernet
- Domain agents: task (8 tools), note (5), project (6), timeline (4)
- LLM factory via LiteLLM (100+ providers)
- Output formatter (StreamFormatter)
- POST /chat REST fallback with Traefik header auth
- Multi-worker Dockerfile with 120s timeout for LLM calls
This commit is contained in:
170
services/chat/app/redis_consumer.py
Normal file
170
services/chat/app/redis_consumer.py
Normal file
@@ -0,0 +1,170 @@
|
||||
"""Redis consumer — listens for chat requests and dispatches to deep_agent.
|
||||
|
||||
Subscribes to a Redis pattern channel chat:request:* so it receives
|
||||
requests for ALL users. Each request is processed in a separate asyncio task.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
from uuid import uuid4
|
||||
|
||||
from shared.db import async_session
|
||||
from shared.redis import redis_client, ws_out_channel
|
||||
|
||||
from app.deep_agent import run_floating_stream, run_home_stream
|
||||
from app.memory_middleware import MemoryMiddleware
|
||||
from app.output_formatter import StreamFormatter
|
||||
from app.ws_context import clear_current_user, set_current_user
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def start_consumer() -> asyncio.Task:
|
||||
"""Start the Redis consumer as a background asyncio task."""
|
||||
return asyncio.create_task(_consumer_loop())
|
||||
|
||||
|
||||
async def _consumer_loop() -> None:
|
||||
"""Subscribe to chat:request:* and dispatch incoming frames."""
|
||||
pubsub = redis_client.pubsub()
|
||||
await pubsub.psubscribe("chat:request:*")
|
||||
logger.info("redis_consumer: subscribed to chat:request:*")
|
||||
|
||||
try:
|
||||
while True:
|
||||
message = await pubsub.get_message(
|
||||
ignore_subscribe_messages=True, timeout=1.0
|
||||
)
|
||||
if message is not None and message["type"] == "pmessage":
|
||||
frame = json.loads(message["data"])
|
||||
asyncio.create_task(_dispatch(frame))
|
||||
else:
|
||||
await asyncio.sleep(0.01)
|
||||
except asyncio.CancelledError:
|
||||
logger.info("redis_consumer: shutting down")
|
||||
finally:
|
||||
await pubsub.punsubscribe()
|
||||
await pubsub.aclose()
|
||||
|
||||
|
||||
async def _dispatch(frame: dict) -> None:
|
||||
"""Route a chat request frame to the appropriate handler."""
|
||||
frame_type = frame.get("type")
|
||||
user_id = frame.get("user_id")
|
||||
|
||||
if not user_id:
|
||||
logger.warning("redis_consumer: frame missing user_id: %s", frame.get("type"))
|
||||
return
|
||||
|
||||
if frame_type == "home_request":
|
||||
await _handle_home_request(user_id, frame)
|
||||
elif frame_type == "floating_request":
|
||||
await _handle_floating_request(user_id, frame)
|
||||
else:
|
||||
logger.debug("redis_consumer: unknown frame type %r", frame_type)
|
||||
|
||||
|
||||
async def _publish_frame(user_id: str, frame_data: str) -> None:
|
||||
"""Publish a frame to ws:out:{user_id} for the WS Gateway to forward."""
|
||||
channel = ws_out_channel(user_id)
|
||||
await redis_client.publish(channel, frame_data)
|
||||
|
||||
|
||||
async def _handle_home_request(user_id: str, frame: dict) -> None:
|
||||
"""Process a home_request — enrich with memory, run deep_agent, stream results."""
|
||||
request_id = frame.get("request_id") or str(uuid4())
|
||||
message: str = frame.get("message", "")
|
||||
session_id: str = frame.get("session_id") or str(uuid4())
|
||||
|
||||
logger.info(
|
||||
"redis_consumer: home_request user=%s req=%s msg=%s",
|
||||
user_id, request_id, message[:200],
|
||||
)
|
||||
|
||||
# Enrich with memory context
|
||||
async with async_session() as db:
|
||||
memory = MemoryMiddleware(db)
|
||||
memory_context = await memory.enrich_context(
|
||||
user_id, message,
|
||||
trace_id=request_id, session_id=session_id,
|
||||
)
|
||||
|
||||
context: dict = {
|
||||
"conversation_history": frame.get("conversation_history", []),
|
||||
"_debug": {"request_id": request_id, "session_id": session_id, "user_id": user_id},
|
||||
**memory_context,
|
||||
}
|
||||
|
||||
set_current_user(user_id)
|
||||
response_chunks: list[str] = []
|
||||
try:
|
||||
event_stream = run_home_stream(user_id, message, context)
|
||||
formatter = StreamFormatter(request_id=request_id)
|
||||
async for ws_frame in formatter.format(event_stream):
|
||||
await _publish_frame(user_id, ws_frame.model_dump_json())
|
||||
if hasattr(ws_frame, "chunk"):
|
||||
response_chunks.append(ws_frame.chunk)
|
||||
except Exception as exc:
|
||||
logger.error("redis_consumer: home_request failed user=%s req=%s: %s", user_id, request_id, exc)
|
||||
finally:
|
||||
clear_current_user()
|
||||
|
||||
# Store episode
|
||||
async with async_session() as db:
|
||||
memory = MemoryMiddleware(db)
|
||||
await memory.store_episode(
|
||||
user_id, session_id, message, "".join(response_chunks),
|
||||
trace_id=request_id,
|
||||
)
|
||||
|
||||
|
||||
async def _handle_floating_request(user_id: str, frame: dict) -> None:
|
||||
"""Process a floating_request — enrich with memory, run deep_agent, stream results."""
|
||||
request_id = frame.get("request_id") or str(uuid4())
|
||||
message: str = frame.get("message", "")
|
||||
session_id: str = frame.get("session_id") or str(uuid4())
|
||||
scope: dict = frame.get("scope", {})
|
||||
|
||||
logger.info(
|
||||
"redis_consumer: floating_request user=%s req=%s scope=%s msg=%s",
|
||||
user_id, request_id, json.dumps(scope)[:200], message[:200],
|
||||
)
|
||||
|
||||
# Enrich with memory context
|
||||
async with async_session() as db:
|
||||
memory = MemoryMiddleware(db)
|
||||
memory_context = await memory.enrich_context(
|
||||
user_id, message,
|
||||
trace_id=request_id, session_id=session_id,
|
||||
)
|
||||
|
||||
context: dict = {
|
||||
"scope": scope,
|
||||
"_debug": {"request_id": request_id, "session_id": session_id, "user_id": user_id},
|
||||
**memory_context,
|
||||
}
|
||||
|
||||
set_current_user(user_id)
|
||||
response_chunks: list[str] = []
|
||||
try:
|
||||
event_stream = run_floating_stream(user_id, message, context)
|
||||
formatter = StreamFormatter(request_id=request_id)
|
||||
async for ws_frame in formatter.format(event_stream):
|
||||
await _publish_frame(user_id, ws_frame.model_dump_json())
|
||||
if hasattr(ws_frame, "chunk"):
|
||||
response_chunks.append(ws_frame.chunk)
|
||||
except Exception as exc:
|
||||
logger.error("redis_consumer: floating_request failed user=%s req=%s: %s", user_id, request_id, exc)
|
||||
finally:
|
||||
clear_current_user()
|
||||
|
||||
# Store episode
|
||||
async with async_session() as db:
|
||||
memory = MemoryMiddleware(db)
|
||||
await memory.store_episode(
|
||||
user_id, session_id, message, "".join(response_chunks),
|
||||
trace_id=request_id,
|
||||
)
|
||||
Reference in New Issue
Block a user