mirror of
https://github.com/n8n-io/n8n.git
synced 2026-06-04 18:49:20 +02:00
704 lines
23 KiB
TypeScript
704 lines
23 KiB
TypeScript
import {
|
|
InstanceAiConfirmRequestDto,
|
|
instanceAiGatewayCapabilitiesSchema,
|
|
instanceAiFilesystemResponseSchema,
|
|
InstanceAiRenameThreadRequestDto,
|
|
InstanceAiSendMessageRequest,
|
|
InstanceAiEventsQuery,
|
|
instanceAiGatewayKeySchema,
|
|
InstanceAiCorrectTaskRequest,
|
|
InstanceAiUpdateMemoryRequest,
|
|
InstanceAiEnsureThreadRequest,
|
|
InstanceAiThreadMessagesQuery,
|
|
InstanceAiAdminSettingsUpdateRequest,
|
|
InstanceAiUserPreferencesUpdateRequest,
|
|
} from '@n8n/api-types';
|
|
import { ModuleRegistry } from '@n8n/backend-common';
|
|
import { GlobalConfig } from '@n8n/config';
|
|
import { AuthenticatedRequest } from '@n8n/db';
|
|
import {
|
|
RestController,
|
|
GlobalScope,
|
|
Middleware,
|
|
Get,
|
|
Post,
|
|
Put,
|
|
Patch,
|
|
Delete,
|
|
Param,
|
|
Body,
|
|
Query,
|
|
} from '@n8n/decorators';
|
|
import type { StoredEvent } from '@n8n/instance-ai';
|
|
import { buildAgentTreeFromEvents } from '@n8n/instance-ai';
|
|
import type { NextFunction, Request, Response } from 'express';
|
|
import { randomUUID, timingSafeEqual } from 'node:crypto';
|
|
import { InProcessEventBus } from './event-bus/in-process-event-bus';
|
|
import { InstanceAiMemoryService } from './instance-ai-memory.service';
|
|
import { InstanceAiSettingsService } from './instance-ai-settings.service';
|
|
import { InstanceAiService } from './instance-ai.service';
|
|
|
|
import { BadRequestError } from '@/errors/response-errors/bad-request.error';
|
|
import { ConflictError } from '@/errors/response-errors/conflict.error';
|
|
import { ForbiddenError } from '@/errors/response-errors/forbidden.error';
|
|
import { NotFoundError } from '@/errors/response-errors/not-found.error';
|
|
import { Push } from '@/push';
|
|
|
|
type FlushableResponse = Response & { flush?: () => void };
|
|
|
|
const KEEP_ALIVE_INTERVAL_MS = 15_000;
|
|
|
|
@RestController('/instance-ai')
|
|
export class InstanceAiController {
|
|
private readonly gatewayApiKey: string;
|
|
|
|
private readonly instanceBaseUrl: string;
|
|
|
|
constructor(
|
|
private readonly instanceAiService: InstanceAiService,
|
|
private readonly memoryService: InstanceAiMemoryService,
|
|
private readonly settingsService: InstanceAiSettingsService,
|
|
private readonly eventBus: InProcessEventBus,
|
|
private readonly moduleRegistry: ModuleRegistry,
|
|
private readonly push: Push,
|
|
globalConfig: GlobalConfig,
|
|
) {
|
|
this.gatewayApiKey = globalConfig.instanceAi.gatewayApiKey;
|
|
this.instanceBaseUrl = globalConfig.editorBaseUrl || `http://localhost:${globalConfig.port}`;
|
|
}
|
|
|
|
// Each BrotliCompress stream allocates ~8.6 MB of native memory for its
|
|
// dictionary, and the compression middleware retains streams via closures on
|
|
// the response object for the lifetime of the HTTP keep-alive connection.
|
|
// Downgrade to gzip (~few KB per stream) for all instance-ai endpoints.
|
|
@Middleware()
|
|
stripBrotli(req: Request, _res: Response, next: NextFunction) {
|
|
const ae = req.headers['accept-encoding'];
|
|
if (typeof ae === 'string' && ae.includes('br')) {
|
|
req.headers['accept-encoding'] = ae.replace(/\bbr\b,?\s*/g, '').replace(/,\s*$/, '');
|
|
}
|
|
next();
|
|
}
|
|
|
|
@Post('/chat/:threadId')
|
|
@GlobalScope('instanceAi:message')
|
|
async chat(
|
|
req: AuthenticatedRequest,
|
|
_res: Response,
|
|
@Param('threadId') threadId: string,
|
|
@Body payload: InstanceAiSendMessageRequest,
|
|
) {
|
|
// Verify the requesting user owns this thread (or it's new)
|
|
await this.assertThreadAccess(req.user.id, threadId, { allowNew: true });
|
|
|
|
// One active run per thread
|
|
if (this.instanceAiService.hasActiveRun(threadId)) {
|
|
throw new ConflictError('A run is already active for this thread');
|
|
}
|
|
|
|
const runId = this.instanceAiService.startRun(
|
|
req.user,
|
|
threadId,
|
|
payload.message,
|
|
payload.researchMode,
|
|
payload.attachments,
|
|
payload.timeZone,
|
|
payload.pushRef,
|
|
);
|
|
return { runId };
|
|
}
|
|
|
|
// usesTemplates bypasses the send() wrapper so we can write SSE frames directly
|
|
@Get('/events/:threadId', { usesTemplates: true })
|
|
@GlobalScope('instanceAi:message')
|
|
async events(
|
|
req: AuthenticatedRequest,
|
|
res: FlushableResponse,
|
|
@Param('threadId') threadId: string,
|
|
@Query query: InstanceAiEventsQuery,
|
|
) {
|
|
// Verify the requesting user owns this thread before streaming events.
|
|
// A thread that doesn't exist yet is allowed — the frontend opens the SSE
|
|
// connection for new conversations before the first message creates the thread.
|
|
const ownership = await this.memoryService.checkThreadOwnership(req.user.id, threadId);
|
|
if (ownership === 'other_user') {
|
|
throw new ForbiddenError('Not authorized for this thread');
|
|
}
|
|
|
|
// When the thread didn't exist at connect time, another user could create
|
|
// and own it before events start flowing. We re-check once on the first
|
|
// event and close the stream if ownership changed. Events are buffered
|
|
// until the check resolves to prevent leaking data during the async gap.
|
|
let ownershipVerified = ownership === 'owned';
|
|
let ownershipCheckInFlight = false;
|
|
const pendingEvents: StoredEvent[] = [];
|
|
const userId = req.user.id;
|
|
|
|
// 1. Set SSE headers.
|
|
// Disable response compression — SSE streams small chunks where compression
|
|
// overhead exceeds the benefit, and each Brotli compressor retains ~8.6 MB
|
|
// of native memory for the lifetime of the connection.
|
|
(res as unknown as { compress: boolean }).compress = false;
|
|
res.setHeader('Content-Type', 'text/event-stream; charset=UTF-8');
|
|
res.setHeader('Cache-Control', 'no-cache');
|
|
res.setHeader('Connection', 'keep-alive');
|
|
res.setHeader('X-Accel-Buffering', 'no');
|
|
res.flushHeaders();
|
|
|
|
// 2. Determine replay cursor
|
|
// Last-Event-ID header (browser auto-reconnect) takes precedence over query param.
|
|
// Both are validated as non-negative integers; invalid values fall back to 0.
|
|
const headerValue = req.headers['last-event-id'];
|
|
const parsedHeader = headerValue ? parseInt(String(headerValue), 10) : NaN;
|
|
const cursor =
|
|
Number.isFinite(parsedHeader) && parsedHeader >= 0 ? parsedHeader : (query.lastEventId ?? 0);
|
|
|
|
// 3. Replay missed events then subscribe in the same tick.
|
|
// Since InProcessEventBus is synchronous and single-threaded (Node.js
|
|
// event loop), there is no window for missed events between replay and
|
|
// subscribe when done in the same synchronous block.
|
|
const missed = this.eventBus.getEventsAfter(threadId, cursor);
|
|
for (const stored of missed) {
|
|
this.writeSseEvent(res, stored);
|
|
}
|
|
|
|
// 3b. Bootstrap sync: emit one run-sync control frame per live message group.
|
|
// Multiple groups can be active simultaneously when a background task
|
|
// from an older turn outlives its original turn. Each frame uses named
|
|
// SSE event type (event: run-sync) with NO id: field so the browser's
|
|
// lastEventId is unaffected and replay cursor stays consistent.
|
|
const threadStatus = this.instanceAiService.getThreadStatus(threadId);
|
|
|
|
// Collect all distinct message groups that have live activity.
|
|
const liveGroups = new Map<
|
|
string,
|
|
{ runIds: string[]; status: 'active' | 'suspended' | 'background' }
|
|
>();
|
|
|
|
// The active/suspended orchestrator run's group
|
|
if (threadStatus.hasActiveRun || threadStatus.isSuspended) {
|
|
const groupId = this.instanceAiService.getMessageGroupId(threadId);
|
|
if (groupId) {
|
|
liveGroups.set(groupId, {
|
|
runIds: this.instanceAiService.getRunIdsForMessageGroup(groupId),
|
|
status: threadStatus.hasActiveRun ? 'active' : 'suspended',
|
|
});
|
|
}
|
|
}
|
|
|
|
// Background tasks — each may belong to a different group
|
|
for (const task of threadStatus.backgroundTasks) {
|
|
if (task.status !== 'running' || !task.messageGroupId) continue;
|
|
if (!liveGroups.has(task.messageGroupId)) {
|
|
liveGroups.set(task.messageGroupId, {
|
|
runIds: this.instanceAiService.getRunIdsForMessageGroup(task.messageGroupId),
|
|
status: 'background',
|
|
});
|
|
}
|
|
}
|
|
|
|
for (const [groupId, group] of liveGroups) {
|
|
const runEvents = this.eventBus.getEventsForRuns(threadId, group.runIds);
|
|
if (runEvents.length === 0) continue;
|
|
|
|
const agentTree = buildAgentTreeFromEvents(runEvents);
|
|
// Use the group's own latest runId — NOT the thread-global activeRunId,
|
|
// which belongs to the current orchestrator turn and would be wrong for
|
|
// background groups from older turns.
|
|
const groupRunId = group.runIds.at(-1);
|
|
res.write(
|
|
`event: run-sync\ndata: ${JSON.stringify({
|
|
runId: groupRunId,
|
|
messageGroupId: groupId,
|
|
runIds: group.runIds,
|
|
agentTree,
|
|
status: group.status,
|
|
backgroundTasks: threadStatus.backgroundTasks,
|
|
})}\n\n`,
|
|
);
|
|
}
|
|
if (liveGroups.size > 0) res.flush?.();
|
|
|
|
// 4. Subscribe to live events
|
|
// When the thread was not_found at connect time, re-validate ownership on
|
|
// the first event. Buffer all events until the check resolves to avoid
|
|
// leaking data during the async gap.
|
|
const unsubscribe = this.eventBus.subscribe(threadId, (stored) => {
|
|
if (ownershipVerified) {
|
|
this.writeSseEvent(res, stored);
|
|
return;
|
|
}
|
|
|
|
pendingEvents.push(stored);
|
|
|
|
if (ownershipCheckInFlight) return;
|
|
ownershipCheckInFlight = true;
|
|
|
|
void this.memoryService
|
|
.checkThreadOwnership(userId, threadId)
|
|
.then((currentOwnership) => {
|
|
if (currentOwnership === 'other_user') {
|
|
res.end();
|
|
return;
|
|
}
|
|
ownershipVerified = true;
|
|
for (const buffered of pendingEvents) {
|
|
this.writeSseEvent(res, buffered);
|
|
}
|
|
pendingEvents.length = 0;
|
|
})
|
|
.catch(() => {
|
|
pendingEvents.length = 0;
|
|
res.end();
|
|
});
|
|
});
|
|
|
|
// 5. Keep-alive
|
|
const keepAlive = setInterval(() => {
|
|
res.write(': ping\n\n');
|
|
res.flush?.();
|
|
}, KEEP_ALIVE_INTERVAL_MS);
|
|
|
|
// 6. Cleanup on disconnect
|
|
const cleanup = () => {
|
|
unsubscribe();
|
|
clearInterval(keepAlive);
|
|
};
|
|
req.once('close', cleanup);
|
|
res.once('finish', cleanup);
|
|
}
|
|
|
|
@Post('/confirm/:requestId')
|
|
@GlobalScope('instanceAi:message')
|
|
async confirm(
|
|
req: AuthenticatedRequest,
|
|
_res: Response,
|
|
@Param('requestId') requestId: string,
|
|
@Body body: InstanceAiConfirmRequestDto,
|
|
) {
|
|
const resolved = await this.instanceAiService.resolveConfirmation(req.user.id, requestId, {
|
|
approved: body.approved,
|
|
credentialId: body.credentialId,
|
|
credentials: body.credentials,
|
|
nodeCredentials: body.nodeCredentials,
|
|
autoSetup: body.autoSetup,
|
|
userInput: body.userInput,
|
|
domainAccessAction: body.domainAccessAction,
|
|
action: body.action,
|
|
nodeParameters: body.nodeParameters,
|
|
testTriggerNode: body.testTriggerNode,
|
|
answers: body.answers,
|
|
resourceDecision: body.resourceDecision,
|
|
});
|
|
if (!resolved) {
|
|
throw new NotFoundError('Confirmation request not found or not authorized');
|
|
}
|
|
return { ok: true };
|
|
}
|
|
|
|
@Post('/chat/:threadId/cancel')
|
|
@GlobalScope('instanceAi:message')
|
|
async cancel(req: AuthenticatedRequest, _res: Response, @Param('threadId') threadId: string) {
|
|
await this.assertThreadAccess(req.user.id, threadId);
|
|
this.instanceAiService.cancelRun(threadId);
|
|
return { ok: true };
|
|
}
|
|
|
|
@Post('/chat/:threadId/tasks/:taskId/cancel')
|
|
@GlobalScope('instanceAi:message')
|
|
async cancelTask(
|
|
req: AuthenticatedRequest,
|
|
_res: Response,
|
|
@Param('threadId') threadId: string,
|
|
@Param('taskId') taskId: string,
|
|
) {
|
|
await this.assertThreadAccess(req.user.id, threadId);
|
|
this.instanceAiService.cancelBackgroundTask(threadId, taskId);
|
|
return { ok: true };
|
|
}
|
|
|
|
@Post('/chat/:threadId/tasks/:taskId/correct')
|
|
@GlobalScope('instanceAi:message')
|
|
async correctTask(
|
|
req: AuthenticatedRequest,
|
|
_res: Response,
|
|
@Param('threadId') threadId: string,
|
|
@Param('taskId') taskId: string,
|
|
@Body payload: InstanceAiCorrectTaskRequest,
|
|
) {
|
|
await this.assertThreadAccess(req.user.id, threadId);
|
|
this.instanceAiService.sendCorrectionToTask(threadId, taskId, payload.message);
|
|
return { ok: true };
|
|
}
|
|
|
|
// ── Credits ──────────────────────────────────────────────────────────────
|
|
|
|
@Get('/credits')
|
|
@GlobalScope('instanceAi:message')
|
|
async getCredits(req: AuthenticatedRequest) {
|
|
return await this.instanceAiService.getCredits(req.user);
|
|
}
|
|
|
|
// ── Admin settings (owner/admin only) ──────────────────────────────────
|
|
|
|
@Get('/settings')
|
|
@GlobalScope('instanceAi:manage')
|
|
async getAdminSettings(_req: AuthenticatedRequest) {
|
|
return this.settingsService.getAdminSettings();
|
|
}
|
|
|
|
@Put('/settings')
|
|
@GlobalScope('instanceAi:manage')
|
|
async updateAdminSettings(
|
|
_req: AuthenticatedRequest,
|
|
_res: Response,
|
|
@Body payload: InstanceAiAdminSettingsUpdateRequest,
|
|
) {
|
|
return await this.settingsService.updateAdminSettings(payload);
|
|
}
|
|
|
|
// ── User preferences (per-user, self-service) ──────────────────────────
|
|
|
|
@Get('/preferences')
|
|
@GlobalScope('instanceAi:message')
|
|
async getUserPreferences(req: AuthenticatedRequest) {
|
|
return await this.settingsService.getUserPreferences(req.user);
|
|
}
|
|
|
|
@Put('/preferences')
|
|
@GlobalScope('instanceAi:message')
|
|
async updateUserPreferences(
|
|
req: AuthenticatedRequest,
|
|
_res: Response,
|
|
@Body payload: InstanceAiUserPreferencesUpdateRequest,
|
|
) {
|
|
const result = await this.settingsService.updateUserPreferences(req.user, payload);
|
|
if (payload.localGatewayDisabled !== undefined) {
|
|
await this.moduleRegistry.refreshModuleSettings('instance-ai');
|
|
}
|
|
return result;
|
|
}
|
|
|
|
@Get('/settings/credentials')
|
|
@GlobalScope('instanceAi:message')
|
|
async listModelCredentials(req: AuthenticatedRequest) {
|
|
return await this.settingsService.listModelCredentials(req.user);
|
|
}
|
|
|
|
@Get('/settings/service-credentials')
|
|
@GlobalScope('instanceAi:manage')
|
|
async listServiceCredentials(req: AuthenticatedRequest) {
|
|
return await this.settingsService.listServiceCredentials(req.user);
|
|
}
|
|
|
|
@Get('/memory/:threadId')
|
|
@GlobalScope('instanceAi:message')
|
|
async getMemory(req: AuthenticatedRequest, _res: Response, @Param('threadId') threadId: string) {
|
|
await this.assertThreadAccess(req.user.id, threadId, { allowNew: true });
|
|
return await this.memoryService.getWorkingMemory(req.user.id, threadId);
|
|
}
|
|
|
|
@Put('/memory/:threadId')
|
|
@GlobalScope('instanceAi:message')
|
|
async updateMemory(
|
|
req: AuthenticatedRequest,
|
|
_res: Response,
|
|
@Param('threadId') threadId: string,
|
|
@Body payload: InstanceAiUpdateMemoryRequest,
|
|
) {
|
|
await this.assertThreadAccess(req.user.id, threadId, { allowNew: true });
|
|
await this.memoryService.updateWorkingMemory(req.user.id, threadId, payload.content);
|
|
return { ok: true };
|
|
}
|
|
|
|
@Get('/threads')
|
|
@GlobalScope('instanceAi:message')
|
|
async listThreads(req: AuthenticatedRequest) {
|
|
return await this.memoryService.listThreads(req.user.id);
|
|
}
|
|
|
|
@Post('/threads')
|
|
@GlobalScope('instanceAi:message')
|
|
async ensureThread(
|
|
req: AuthenticatedRequest,
|
|
_res: Response,
|
|
@Body payload: InstanceAiEnsureThreadRequest,
|
|
) {
|
|
const requestedThreadId = payload.threadId ?? randomUUID();
|
|
await this.assertThreadAccess(req.user.id, requestedThreadId, { allowNew: true });
|
|
return await this.memoryService.ensureThread(req.user.id, requestedThreadId);
|
|
}
|
|
|
|
@Delete('/threads/:threadId')
|
|
@GlobalScope('instanceAi:message')
|
|
async deleteThread(
|
|
req: AuthenticatedRequest,
|
|
_res: Response,
|
|
@Param('threadId') threadId: string,
|
|
) {
|
|
await this.assertThreadAccess(req.user.id, threadId);
|
|
await this.instanceAiService.clearThreadState(threadId);
|
|
await this.memoryService.deleteThread(threadId);
|
|
return { ok: true };
|
|
}
|
|
|
|
@Patch('/threads/:threadId')
|
|
@GlobalScope('instanceAi:message')
|
|
async renameThread(
|
|
req: AuthenticatedRequest,
|
|
_res: Response,
|
|
@Param('threadId') threadId: string,
|
|
@Body payload: InstanceAiRenameThreadRequestDto,
|
|
) {
|
|
await this.assertThreadAccess(req.user.id, threadId);
|
|
const thread = await this.memoryService.renameThread(threadId, payload.title);
|
|
return { thread };
|
|
}
|
|
|
|
@Get('/threads/:threadId/messages')
|
|
@GlobalScope('instanceAi:message')
|
|
async getThreadMessages(
|
|
req: AuthenticatedRequest,
|
|
_res: Response,
|
|
@Param('threadId') threadId: string,
|
|
@Query query: InstanceAiThreadMessagesQuery,
|
|
) {
|
|
await this.assertThreadAccess(req.user.id, threadId);
|
|
|
|
// ?raw=true returns the old format for the thread inspector
|
|
if (query.raw === 'true') {
|
|
return await this.memoryService.getThreadMessages(req.user.id, threadId, {
|
|
limit: query.limit,
|
|
page: query.page,
|
|
});
|
|
}
|
|
|
|
const result = await this.memoryService.getRichMessages(req.user.id, threadId, {
|
|
limit: query.limit,
|
|
page: query.page,
|
|
});
|
|
|
|
// Include the next SSE event ID so the frontend can skip past events
|
|
// already covered by these historical messages (prevents duplicates)
|
|
const nextEventId = this.eventBus.getNextEventId(threadId);
|
|
return { ...result, nextEventId };
|
|
}
|
|
|
|
@Get('/threads/:threadId/status')
|
|
@GlobalScope('instanceAi:message')
|
|
async getThreadStatus(
|
|
req: AuthenticatedRequest,
|
|
_res: Response,
|
|
@Param('threadId') threadId: string,
|
|
) {
|
|
// Allow new threads — the frontend polls status before the first message is sent
|
|
await this.assertThreadAccess(req.user.id, threadId, { allowNew: true });
|
|
return this.instanceAiService.getThreadStatus(threadId);
|
|
}
|
|
|
|
@Get('/threads/:threadId/context')
|
|
@GlobalScope('instanceAi:message')
|
|
async getThreadContext(
|
|
req: AuthenticatedRequest,
|
|
_res: Response,
|
|
@Param('threadId') threadId: string,
|
|
) {
|
|
await this.assertThreadAccess(req.user.id, threadId, { allowNew: true });
|
|
return await this.memoryService.getThreadContext(req.user.id, threadId);
|
|
}
|
|
|
|
// ── Gateway endpoints (daemon ↔ server) ──────────────────────────────────
|
|
|
|
@Post('/gateway/create-link')
|
|
@GlobalScope('instanceAi:gateway')
|
|
async createGatewayLink(req: AuthenticatedRequest) {
|
|
const token = this.instanceAiService.generatePairingToken(req.user.id);
|
|
const baseUrl = this.instanceBaseUrl.replace(/\/$/, '');
|
|
const command = `npx @n8n/fs-proxy ${baseUrl} ${token}`;
|
|
return { token, command };
|
|
}
|
|
|
|
@Get('/gateway/events', { usesTemplates: true, skipAuth: true })
|
|
async gatewayEvents(req: Request, res: FlushableResponse) {
|
|
const userId = this.validateGatewayApiKey(this.getGatewayKeyHeader(req));
|
|
|
|
(res as unknown as { compress: boolean }).compress = false;
|
|
res.setHeader('Content-Type', 'text/event-stream; charset=UTF-8');
|
|
res.setHeader('Cache-Control', 'no-cache');
|
|
res.setHeader('Connection', 'keep-alive');
|
|
res.setHeader('X-Accel-Buffering', 'no');
|
|
res.flushHeaders();
|
|
|
|
const gateway = this.instanceAiService.getLocalGateway(userId);
|
|
const unsubscribe = gateway.onRequest((event) => {
|
|
res.write(`data: ${JSON.stringify(event)}\n\n`);
|
|
res.flush?.();
|
|
});
|
|
|
|
const keepAlive = setInterval(() => {
|
|
res.write(': ping\n\n');
|
|
res.flush?.();
|
|
}, KEEP_ALIVE_INTERVAL_MS);
|
|
|
|
const cleanup = () => {
|
|
unsubscribe();
|
|
clearInterval(keepAlive);
|
|
this.instanceAiService.startDisconnectTimer(userId, () => {
|
|
this.push.sendToUsers(
|
|
{
|
|
type: 'instanceAiGatewayStateChanged',
|
|
data: {
|
|
connected: false,
|
|
directory: null,
|
|
hostIdentifier: null,
|
|
toolCategories: [],
|
|
},
|
|
},
|
|
[userId],
|
|
);
|
|
});
|
|
};
|
|
req.once('close', cleanup);
|
|
res.once('finish', cleanup);
|
|
}
|
|
|
|
@Post('/gateway/init', { skipAuth: true })
|
|
gatewayInit(req: Request) {
|
|
const key = this.getGatewayKeyHeader(req);
|
|
const userId = this.validateGatewayApiKey(key);
|
|
|
|
const parsed = instanceAiGatewayCapabilitiesSchema.safeParse(req.body);
|
|
if (!parsed.success) {
|
|
throw new BadRequestError(parsed.error.message);
|
|
}
|
|
this.instanceAiService.initGateway(userId, parsed.data);
|
|
|
|
this.push.sendToUsers(
|
|
{
|
|
type: 'instanceAiGatewayStateChanged',
|
|
data: {
|
|
connected: true,
|
|
directory: parsed.data.rootPath,
|
|
hostIdentifier: parsed.data.hostIdentifier ?? null,
|
|
toolCategories: parsed.data.toolCategories ?? [],
|
|
},
|
|
},
|
|
[userId],
|
|
);
|
|
|
|
// Try to consume a pairing token and upgrade to a session key
|
|
const sessionKey = key ? this.instanceAiService.consumePairingToken(userId, key) : null;
|
|
if (sessionKey) {
|
|
return { ok: true, sessionKey };
|
|
}
|
|
return { ok: true };
|
|
}
|
|
|
|
@Post('/gateway/disconnect', { skipAuth: true })
|
|
gatewayDisconnect(req: Request) {
|
|
const userId = this.validateGatewayApiKey(this.getGatewayKeyHeader(req));
|
|
|
|
this.instanceAiService.clearDisconnectTimer(userId);
|
|
this.instanceAiService.disconnectGateway(userId);
|
|
this.instanceAiService.clearActiveSessionKey(userId);
|
|
this.push.sendToUsers(
|
|
{
|
|
type: 'instanceAiGatewayStateChanged',
|
|
data: { connected: false, directory: null, hostIdentifier: null, toolCategories: [] },
|
|
},
|
|
[userId],
|
|
);
|
|
return { ok: true };
|
|
}
|
|
|
|
@Post('/gateway/response/:requestId', { skipAuth: true })
|
|
gatewayResponse(req: Request, _res: Response, @Param('requestId') requestId: string) {
|
|
const userId = this.validateGatewayApiKey(this.getGatewayKeyHeader(req));
|
|
|
|
const parsed = instanceAiFilesystemResponseSchema.safeParse(req.body);
|
|
if (!parsed.success) {
|
|
throw new BadRequestError(parsed.error.message);
|
|
}
|
|
const resolved = this.instanceAiService.resolveGatewayRequest(
|
|
userId,
|
|
requestId,
|
|
parsed.data.result,
|
|
parsed.data.error,
|
|
);
|
|
if (!resolved) {
|
|
throw new NotFoundError('Gateway request not found or already resolved');
|
|
}
|
|
return { ok: true };
|
|
}
|
|
|
|
@Get('/gateway/status')
|
|
@GlobalScope('instanceAi:gateway')
|
|
async gatewayStatus(req: AuthenticatedRequest) {
|
|
return this.instanceAiService.getGatewayStatus(req.user.id);
|
|
}
|
|
|
|
// ── Helpers ──────────────────────────────────────────────────────────────
|
|
|
|
/**
|
|
* Verify thread ownership. Throws ForbiddenError if another user owns it.
|
|
* @param allowNew When true, a non-existent thread is permitted (new conversation).
|
|
*/
|
|
private async assertThreadAccess(
|
|
userId: string,
|
|
threadId: string,
|
|
options?: { allowNew?: boolean },
|
|
): Promise<void> {
|
|
const ownership = await this.memoryService.checkThreadOwnership(userId, threadId);
|
|
if (ownership === 'other_user') {
|
|
throw new ForbiddenError('Not authorized for this thread');
|
|
}
|
|
if (!options?.allowNew && ownership === 'not_found') {
|
|
throw new NotFoundError('Thread not found');
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Safely extract and validate the x-gateway-key header value.
|
|
* Headers can be string | string[] | undefined — take only the first value
|
|
* and validate against the shared gateway key schema.
|
|
*/
|
|
private getGatewayKeyHeader(req: Request): string | undefined {
|
|
const raw = req.headers['x-gateway-key'];
|
|
const value = Array.isArray(raw) ? raw[0] : raw;
|
|
const parsed = instanceAiGatewayKeySchema.safeParse(value);
|
|
return parsed.success ? parsed.data : undefined;
|
|
}
|
|
|
|
/**
|
|
* Validate the gateway API key from query param or header.
|
|
* Accepts: static env var key, one-time pairing token (init only), or active session key.
|
|
* Returns the userId associated with the key.
|
|
*/
|
|
private validateGatewayApiKey(key: string | undefined): string {
|
|
if (!key) {
|
|
throw new ForbiddenError('Missing API key');
|
|
}
|
|
const actual = Buffer.from(key);
|
|
|
|
// Check static env var key — out of user-scoped flow, uses a sentinel userId
|
|
if (this.gatewayApiKey) {
|
|
const expected = Buffer.from(this.gatewayApiKey);
|
|
if (expected.length === actual.length && timingSafeEqual(expected, actual)) {
|
|
return 'env-gateway';
|
|
}
|
|
}
|
|
|
|
// Check per-user pairing token or session key via reverse lookup
|
|
const userId = this.instanceAiService.getUserIdForApiKey(key);
|
|
if (userId) return userId;
|
|
|
|
throw new ForbiddenError('Invalid API key');
|
|
}
|
|
|
|
private writeSseEvent(res: FlushableResponse, stored: StoredEvent): void {
|
|
// No `event:` field — events are discriminated by data.type per streaming-protocol.md
|
|
res.write(`id: ${stored.id}\ndata: ${JSON.stringify(stored.event)}\n\n`);
|
|
res.flush?.();
|
|
}
|
|
}
|