Support structured floating_domain routing and stabilize floating chat stream

This commit is contained in:
2026-03-13 16:10:13 +01:00
parent 8fe2b1c43e
commit 3bc08c6de7
9 changed files with 209 additions and 76 deletions

View File

@@ -31,12 +31,14 @@ const NOOP = () => undefined;
interface OrchestrateInput {
message: string;
requestId?: string;
conversationHistory?: Array<{ role: 'user' | 'assistant' | 'system'; content: string }>;
sender?: Electron.WebContents;
}
interface OrchestrateFloatingInput {
message: string;
requestId?: string;
scope: WsFloatingRequest['scope'];
sender?: Electron.WebContents;
}
@@ -81,18 +83,18 @@ async function checkConnectivity(): Promise<{ ok: true } | { ok: false; error: s
// ---------------------------------------------------------------------------
export async function orchestrate(input: OrchestrateInput): Promise<OrchestrateResult> {
const { message, conversationHistory, sender } = input;
const { message, requestId, conversationHistory, sender } = input;
const check = await checkConnectivity();
if (!check.ok) return { response: '', error: check.error };
try {
const client = getBackendClient();
const { requestId, promise } = client.sendHomeRequest(message, conversationHistory, {
onStart: () => sendFrame(sender, { type: 'stream_start', requestId }),
onText: (chunk) => sendFrame(sender, { type: 'stream_text', requestId, chunk }),
onEnd: (mutations) => sendFrame(sender, { type: 'stream_end', requestId, mutations: mutations as unknown[] | undefined }),
onError: () => sendFrame(sender, { type: 'stream_end', requestId }),
const { requestId: activeRequestId, promise } = client.sendHomeRequest(message, conversationHistory, requestId, {
onStart: () => sendFrame(sender, { type: 'stream_start', requestId: activeRequestId }),
onText: (chunk) => sendFrame(sender, { type: 'stream_text', requestId: activeRequestId, chunk }),
onEnd: (mutations) => sendFrame(sender, { type: 'stream_end', requestId: activeRequestId, mutations: mutations as unknown[] | undefined }),
onError: () => sendFrame(sender, { type: 'stream_end', requestId: activeRequestId }),
});
await promise;
@@ -117,19 +119,19 @@ export async function orchestrate(input: OrchestrateInput): Promise<OrchestrateR
// ---------------------------------------------------------------------------
export async function orchestrateFloating(input: OrchestrateFloatingInput): Promise<OrchestrateResult> {
const { message, scope, sender } = input;
const { message, requestId, scope, sender } = input;
const check = await checkConnectivity();
if (!check.ok) return { response: '', error: check.error };
try {
const client = getBackendClient();
const { requestId, promise } = client.sendFloatingRequest(message, scope, {
onStart: () => sendFrame(sender, { type: 'stream_start', requestId }),
onText: (chunk) => sendFrame(sender, { type: 'stream_text', requestId, chunk }),
onEnd: (mutations) => sendFrame(sender, { type: 'stream_end', requestId, mutations: mutations as unknown[] | undefined }),
onDomain: (domain) => sendFrame(sender, { type: 'floating_domain', requestId, domain }),
onError: () => sendFrame(sender, { type: 'stream_end', requestId }),
const { requestId: activeRequestId, promise } = client.sendFloatingRequest(message, scope, requestId, {
onStart: () => sendFrame(sender, { type: 'stream_start', requestId: activeRequestId }),
onText: (chunk) => sendFrame(sender, { type: 'stream_text', requestId: activeRequestId, chunk }),
onEnd: (mutations) => sendFrame(sender, { type: 'stream_end', requestId: activeRequestId, mutations: mutations as unknown[] | undefined }),
onDomain: (domain) => sendFrame(sender, { type: 'floating_domain', requestId: activeRequestId, domain }),
onError: () => sendFrame(sender, { type: 'stream_end', requestId: activeRequestId }),
});
await promise;
@@ -221,7 +223,7 @@ export async function generateAndCacheBrief(): Promise<boolean> {
let content = '';
try {
const client = getBackendClient();
const { promise } = client.sendHomeRequest(DAILY_BRIEF_PROMPT, undefined, {
const { promise } = client.sendHomeRequest(DAILY_BRIEF_PROMPT, undefined, undefined, {
onStart: NOOP,
onText: (chunk) => { content += chunk; },
onEnd: NOOP,
@@ -277,7 +279,7 @@ export function stopBriefScheduler(): void {
}
/** Stream the daily brief to the renderer and cache the result. */
export async function dailyBrief(sender?: Electron.WebContents): Promise<OrchestrateResult> {
export async function dailyBrief(sender?: Electron.WebContents, requestId?: string): Promise<OrchestrateResult> {
let content = '';
const check = await checkConnectivity();
@@ -285,15 +287,15 @@ export async function dailyBrief(sender?: Electron.WebContents): Promise<Orchest
try {
const client = getBackendClient();
const requestId = crypto.randomUUID();
const { promise } = client.sendHomeRequest(DAILY_BRIEF_PROMPT, undefined, {
onStart: () => sendFrame(sender, { type: 'stream_start', requestId }),
const activeRequestId = requestId ?? crypto.randomUUID();
const { promise } = client.sendHomeRequest(DAILY_BRIEF_PROMPT, undefined, activeRequestId, {
onStart: () => sendFrame(sender, { type: 'stream_start', requestId: activeRequestId }),
onText: (chunk) => {
content += chunk;
sendFrame(sender, { type: 'stream_text', requestId, chunk });
sendFrame(sender, { type: 'stream_text', requestId: activeRequestId, chunk });
},
onEnd: () => sendFrame(sender, { type: 'stream_end', requestId }),
onError: () => sendFrame(sender, { type: 'stream_end', requestId }),
onEnd: () => sendFrame(sender, { type: 'stream_end', requestId: activeRequestId }),
onError: () => sendFrame(sender, { type: 'stream_end', requestId: activeRequestId }),
});
await promise;
} catch (err) {

View File

@@ -30,6 +30,7 @@ import type {
WsAgentData,
LocalAgentConfig,
WsFloatingRequest,
WsFloatingDomain,
} from '../../shared/api-types';
import { DrizzleExecutor } from './drizzle-executor';
import { readAgentFiles } from '../agents/file-reader';
@@ -123,7 +124,7 @@ interface StreamListener {
onStart: () => void;
onText: (chunk: string) => void;
onEnd: (mutations?: unknown) => void;
onDomain: (domain: string) => void;
onDomain: (domain: WsFloatingDomain['domain']) => void;
onError: (err: Error) => void;
resolve: () => void;
reject: (err: Error) => void;
@@ -233,22 +234,23 @@ export class BackendClient {
sendHomeRequest(
message: string,
conversationHistory?: Array<{ role: 'user' | 'assistant' | 'system'; content: string }>,
requestId?: string,
callbacks?: Partial<Omit<StreamListener, 'resolve' | 'reject'>>,
): { requestId: string; promise: Promise<void> } {
const requestId = crypto.randomUUID();
const activeRequestId = requestId ?? crypto.randomUUID();
const promise = new Promise<void>((resolve, reject) => {
this.streamListeners.set(requestId, {
this.streamListeners.set(activeRequestId, {
onStart: callbacks?.onStart ?? ((): void => { /* no-op */ }),
onText: callbacks?.onText ?? ((): void => { /* no-op */ }),
onEnd: (mutations) => {
callbacks?.onEnd?.(mutations);
this.streamListeners.delete(requestId);
this.streamListeners.delete(activeRequestId);
resolve();
},
onDomain: callbacks?.onDomain ?? ((): void => { /* no-op */ }),
onError: (err) => {
callbacks?.onError?.(err);
this.streamListeners.delete(requestId);
this.streamListeners.delete(activeRequestId);
reject(err);
},
resolve,
@@ -257,17 +259,17 @@ export class BackendClient {
const ws = this.persistentWs;
if (!ws || ws.readyState !== WebSocket.OPEN) {
this.streamListeners.delete(requestId);
this.streamListeners.delete(activeRequestId);
reject(new OfflineError('Persistent WS not connected'));
return;
}
const homePayload = toSnakeCase({ type: 'home_request', requestId, message, conversationHistory });
const homePayload = toSnakeCase({ type: 'home_request', requestId: activeRequestId, message, conversationHistory });
logWsSend(homePayload);
ws.send(JSON.stringify(homePayload));
});
return { requestId, promise };
return { requestId: activeRequestId, promise };
}
/**
@@ -277,22 +279,23 @@ export class BackendClient {
sendFloatingRequest(
message: string,
scope: WsFloatingRequest['scope'],
requestId?: string,
callbacks?: Partial<Omit<StreamListener, 'resolve' | 'reject'>>,
): { requestId: string; promise: Promise<void> } {
const requestId = crypto.randomUUID();
const activeRequestId = requestId ?? crypto.randomUUID();
const promise = new Promise<void>((resolve, reject) => {
this.streamListeners.set(requestId, {
this.streamListeners.set(activeRequestId, {
onStart: callbacks?.onStart ?? ((): void => { /* no-op */ }),
onText: callbacks?.onText ?? ((): void => { /* no-op */ }),
onEnd: (mutations) => {
callbacks?.onEnd?.(mutations);
this.streamListeners.delete(requestId);
this.streamListeners.delete(activeRequestId);
resolve();
},
onDomain: callbacks?.onDomain ?? ((): void => { /* no-op */ }),
onError: (err) => {
callbacks?.onError?.(err);
this.streamListeners.delete(requestId);
this.streamListeners.delete(activeRequestId);
reject(err);
},
resolve,
@@ -301,17 +304,17 @@ export class BackendClient {
const ws = this.persistentWs;
if (!ws || ws.readyState !== WebSocket.OPEN) {
this.streamListeners.delete(requestId);
this.streamListeners.delete(activeRequestId);
reject(new OfflineError('Persistent WS not connected'));
return;
}
const floatingPayload = toSnakeCase({ type: 'floating_request', requestId, message, scope });
const floatingPayload = toSnakeCase({ type: 'floating_request', requestId: activeRequestId, message, scope });
logWsSend(floatingPayload);
ws.send(JSON.stringify(floatingPayload));
});
return { requestId, promise };
return { requestId: activeRequestId, promise };
}
// -------------------------------------------------------------------------

View File

@@ -649,6 +649,7 @@ const settingsRouter = router({
const aiRouter = router({
chat: publicProcedure
.input(z.object({
requestId: z.string().optional(),
message: z.string(),
conversationHistory: z.array(z.object({
role: z.enum(['user', 'assistant', 'system']),
@@ -665,12 +666,14 @@ const aiRouter = router({
if (input.mode === 'floating' && input.scope) {
return await orchestrateFloating({
message: input.message,
requestId: input.requestId,
scope: input.scope,
sender: ctx.sender,
});
}
return await orchestrate({
message: input.message,
requestId: input.requestId,
conversationHistory: input.conversationHistory,
sender: ctx.sender,
});
@@ -683,9 +686,10 @@ const aiRouter = router({
.query(() => getCachedBrief()),
dailyBrief: publicProcedure
.mutation(async ({ ctx }) => {
.input(z.object({ requestId: z.string().optional() }).optional())
.mutation(async ({ ctx, input }) => {
try {
return await dailyBrief(ctx.sender);
return await dailyBrief(ctx.sender, input?.requestId);
} catch (err) {
const msg = err instanceof Error ? err.message : 'Unknown error';
return { response: '', error: msg };

View File

@@ -26,7 +26,20 @@ type V3StreamEvent =
| { type: 'stream_start'; requestId: string }
| { type: 'stream_text'; requestId: string; chunk: string }
| { type: 'stream_end'; requestId: string; mutations?: unknown[] }
| { type: 'floating_domain'; requestId: string; domain: 'tasks' | 'notes' | 'timelines' | 'projects' };
| {
type: 'floating_domain';
requestId: string;
domain:
| 'tasks'
| 'notes'
| 'timelines'
| 'projects'
| {
type: 'task' | 'timeline' | 'project' | 'note' | 'node';
id?: string | null;
section?: 'task' | 'timeline' | 'note' | null;
};
};
contextBridge.exposeInMainWorld('electronAI', {
/** Subscribe to v3 AI stream events. Returns an unsubscribe function. */

View File

@@ -269,14 +269,10 @@ export function AIChatPanel({
if (briefModule.streamFired) return;
briefModule.streamFired = true;
briefContentRef.current = '';
const requestId = crypto.randomUUID();
let briefRequestId: string | null = null;
let unsubscribe: (() => void) | null = window.electronAI.onStreamEvent((event) => {
if (event.type === 'stream_start') {
briefRequestId = event.requestId;
return;
}
if (briefRequestId !== null && event.requestId !== briefRequestId) return;
if (event.requestId !== requestId) return;
if (event.type === 'stream_text') {
briefContentRef.current += event.chunk;
@@ -293,7 +289,7 @@ export function AIChatPanel({
}
});
briefMutation.mutate(undefined, {
briefMutation.mutate({ requestId }, {
onError: () => {
unsubscribe?.();
unsubscribe = null;

View File

@@ -10,7 +10,7 @@ import {
CHAT_HEIGHT,
PADDING,
} from '@/context/FloatingChatContext';
import { useAIChat, type UIChatContext } from '@/hooks/useAIChat';
import { useAIChat, type UIChatContext, type FloatingDomainSignal } from '@/hooks/useAIChat';
import { ChatMarkdown } from '@/components/ai/AIChatPanel';
import { Skeleton } from '@/components/ui/skeleton';
@@ -22,11 +22,63 @@ const DOMAIN_ROUTES: Record<string, string> = {
projects: '/projects',
};
const DOMAIN_SECTION_IDS: Partial<Record<'tasks' | 'notes' | 'timelines' | 'projects', string>> = {
tasks: 'tasks-list',
timelines: 'timeline-chart',
};
interface DomainNavigationTarget {
route: '/tasks' | '/timeline' | '/projects' | '/notes/$noteId';
sectionId?: string;
projectId?: string;
noteId?: string;
nodeId?: string;
}
function normalizeDomainSignal(domain: FloatingDomainSignal): DomainNavigationTarget | null {
if (typeof domain === 'string') {
const route = DOMAIN_ROUTES[domain];
if (!route) return null;
return {
route: route as DomainNavigationTarget['route'],
sectionId: DOMAIN_SECTION_IDS[domain as keyof typeof DOMAIN_SECTION_IDS],
};
}
switch (domain.type) {
case 'task':
return { route: '/tasks', sectionId: 'tasks-list' };
case 'timeline':
return { route: '/timeline', sectionId: 'timeline-chart' };
case 'note':
if (!domain.id) return { route: '/projects' };
return { route: '/notes/$noteId', noteId: domain.id };
case 'project': {
if (domain.section === 'task') {
return { route: '/projects', sectionId: 'project-tasks', projectId: domain.id ?? undefined };
}
if (domain.section === 'timeline') {
return { route: '/projects', sectionId: 'project-timeline', projectId: domain.id ?? undefined };
}
if (domain.section === 'note') {
return { route: '/projects', sectionId: 'project-notes', projectId: domain.id ?? undefined };
}
return { route: '/projects', projectId: domain.id ?? undefined };
}
case 'node':
if (!domain.id) return null;
return { route: '/projects', sectionId: domain.id, nodeId: domain.id };
default:
return null;
}
}
function FloatingChatInner() {
const { state, sections, close, updatePosition, setPendingSection } = useFloatingChat();
const { state, sections, close, updatePosition, setPendingSection, moveToSection } = useFloatingChat();
const navigate = useNavigate();
const routerState = useRouterState();
const prevPathRef = useRef(routerState.location.pathname);
const domainNavigationInFlightRef = useRef(false);
// Active section lookup
const activeSection = sections.get(state.activeSectionId ?? '');
@@ -55,17 +107,50 @@ function FloatingChatInner() {
// Handle floating_domain signals — navigate in background
const handleDomainSignal = useCallback(
(domain: 'tasks' | 'notes' | 'timelines' | 'projects') => {
const route = DOMAIN_ROUTES[domain];
if (!route) return;
(domainSignal: FloatingDomainSignal) => {
const target = normalizeDomainSignal(domainSignal);
if (!target) return;
// If backend points to a currently registered node/section, move there immediately.
if (target.sectionId && sections.has(target.sectionId)) {
moveToSection(target.sectionId);
return;
}
const currentPath = routerState.location.pathname;
if (currentPath === route) return;
const isCurrentRoute =
(target.route === '/projects' && currentPath === '/projects') ||
(target.route === '/tasks' && currentPath === '/tasks') ||
(target.route === '/timeline' && currentPath === '/timeline') ||
(target.route === '/notes/$noteId' && currentPath.startsWith('/notes/'));
setPendingSection({ sectionId: domain });
void navigate({ to: route });
if (isCurrentRoute && target.sectionId) {
setPendingSection({ sectionId: target.sectionId });
return;
}
if (isCurrentRoute) return;
domainNavigationInFlightRef.current = true;
const pendingSectionId = target.sectionId;
if (pendingSectionId) {
setPendingSection({ sectionId: pendingSectionId });
} else {
setPendingSection(undefined);
}
if (target.route === '/projects') {
void navigate({ to: '/projects', search: target.projectId ? { projectId: target.projectId } : {} });
} else if (target.route === '/notes/$noteId' && target.noteId) {
void navigate({ to: '/notes/$noteId', params: { noteId: target.noteId } });
} else if (target.route === '/tasks') {
void navigate({ to: '/tasks' });
} else if (target.route === '/timeline') {
void navigate({ to: '/timeline' });
}
},
[routerState.location.pathname, navigate, setPendingSection],
[routerState.location.pathname, navigate, setPendingSection, sections, moveToSection],
);
const {
@@ -98,8 +183,13 @@ function FloatingChatInner() {
useEffect(() => {
const currentPath = routerState.location.pathname;
if (prevPathRef.current !== currentPath && state.isOpen && !state.pendingSection) {
close();
if (prevPathRef.current !== currentPath && state.isOpen) {
// Keep floating chat alive when navigation is AI-domain driven.
if (domainNavigationInFlightRef.current) {
domainNavigationInFlightRef.current = false;
} else if (!state.pendingSection) {
close();
}
}
prevPathRef.current = currentPath;
}, [routerState.location.pathname, state.isOpen, state.pendingSection, close]);

View File

@@ -1,6 +1,17 @@
import { useState, useCallback, useRef, useEffect, useMemo } from 'react';
import { trpc } from '@/lib/trpc';
export type FloatingDomainSignal =
| 'tasks'
| 'notes'
| 'timelines'
| 'projects'
| {
type: 'task' | 'timeline' | 'project' | 'note' | 'node';
id?: string | null;
section?: 'task' | 'timeline' | 'note' | null;
};
// ---------------------------------------------------------------------------
// Types
// ---------------------------------------------------------------------------
@@ -37,7 +48,7 @@ interface UseAIChatReturn {
}
interface UseAIChatOptions {
onDomainSignal?: (domain: 'tasks' | 'notes' | 'timelines' | 'projects') => void;
onDomainSignal?: (domain: FloatingDomainSignal) => void;
}
interface CachedChatState {
@@ -51,9 +62,9 @@ function getContextCacheKey(ctx: UIChatContext): string {
if (ctx.type === 'global') return 'global';
if (ctx.type === 'project') return `project:${ctx.projectId ?? ''}`;
const scopeType = ctx.scope?.type ?? '';
const scopeId = ctx.scope?.id ?? '';
return `floating:${ctx.projectId ?? ''}:${scopeType}:${scopeId}`;
// Floating chat should keep a single continuous session while the panel is open,
// even when route/section context changes due floating-domain navigation.
return 'floating';
}
// ---------------------------------------------------------------------------
@@ -144,6 +155,7 @@ export function useAIChat(defaultContext: UIChatContext, options?: UseAIChatOpti
role: 'user',
content: trimmed,
};
const requestId = crypto.randomUUID();
setMessages((prev) => [...prev, userMsg]);
if (!overrideMessage) setInput('');
@@ -151,24 +163,16 @@ export function useAIChat(defaultContext: UIChatContext, options?: UseAIChatOpti
setStreamingContent('');
streamingContentRef.current = '';
// Capture the requestId from stream_start so we only handle events
// for this specific chat request (avoids cross-contamination with
// other concurrent streams like the daily brief).
let activeRequestId: string | null = null;
const unsubscribe = window.electronAI.onStreamEvent((event) => {
// Latch the requestId on first stream_start
if (event.type === 'stream_start') {
activeRequestId = event.requestId;
return;
}
// Once we have a requestId, ignore events from other streams
if (activeRequestId !== null && event.requestId !== activeRequestId) {
// Strictly process only frames for this chat request.
if (event.requestId !== requestId) {
return;
}
switch (event.type) {
case 'stream_start':
break;
case 'stream_text':
streamingContentRef.current += event.chunk;
setStreamingContent(streamingContentRef.current);
@@ -209,6 +213,7 @@ export function useAIChat(defaultContext: UIChatContext, options?: UseAIChatOpti
chatMutation.mutate(
{
requestId,
message: trimmed,
conversationHistory,
...(isFloating && ctx.scope

View File

@@ -17,7 +17,20 @@ type V3StreamEvent =
| { type: 'stream_start'; requestId: string }
| { type: 'stream_text'; requestId: string; chunk: string }
| { type: 'stream_end'; requestId: string; mutations?: unknown[] }
| { type: 'floating_domain'; requestId: string; domain: 'tasks' | 'notes' | 'timelines' | 'projects' };
| {
type: 'floating_domain';
requestId: string;
domain:
| 'tasks'
| 'notes'
| 'timelines'
| 'projects'
| {
type: 'task' | 'timeline' | 'project' | 'note' | 'node';
id?: string | null;
section?: 'task' | 'timeline' | 'note' | null;
};
};
interface ElectronAI {
onStreamEvent: (cb: (data: V3StreamEvent) => void) => () => void;

View File

@@ -199,7 +199,14 @@ export type WsStreamEnd = z.infer<typeof WsStreamEndSchema>;
export const WsFloatingDomainSchema = z.object({
type: z.literal('floating_domain'),
requestId: z.string(),
domain: z.enum(['tasks', 'notes', 'timelines', 'projects']),
domain: z.union([
z.enum(['tasks', 'notes', 'timelines', 'projects']),
z.object({
type: z.enum(['task', 'timeline', 'project', 'note', 'node']),
id: z.string().nullable().optional(),
section: z.enum(['task', 'timeline', 'note']).nullable().optional(),
}),
]),
});
export type WsFloatingDomain = z.infer<typeof WsFloatingDomainSchema>;