import { FREE_AI_CREDITS_CREDENTIAL_NAME, STREAM_SEPARATOR } from '@/constants'; import type { CreateCredentialDto } from '@n8n/api-types'; import { AiChatRequestDto, AiApplySuggestionRequestDto, AiAskRequestDto, AiFreeCreditsRequestDto, AiBuilderChatRequestDto, AiSessionRetrievalRequestDto, AiUsageSettingsRequestDto, AiTruncateMessagesRequestDto, AiClearSessionRequestDto, } from '@n8n/api-types'; import { AuthenticatedRequest } from '@n8n/db'; import { Body, Get, Licensed, Post, RestController, GlobalScope } from '@n8n/decorators'; import { type AiAssistantSDK, APIResponseError } from '@n8n_io/ai-assistant-sdk'; import { Response } from 'express'; import { OPEN_AI_API_CREDENTIAL_TYPE } from 'n8n-workflow'; import { strict as assert } from 'node:assert'; import { WritableStream } from 'node:stream/web'; import { CredentialsService } from '@/credentials/credentials.service'; import { BadRequestError } from '@/errors/response-errors/bad-request.error'; import { ContentTooLargeError } from '@/errors/response-errors/content-too-large.error'; import { InternalServerError } from '@/errors/response-errors/internal-server.error'; import { TooManyRequestsError } from '@/errors/response-errors/too-many-requests.error'; import { AiUsageService } from '@/services/ai-usage.service'; import { WorkflowBuilderService } from '@/services/ai-workflow-builder.service'; import { AiService } from '@/services/ai.service'; import { UserService } from '@/services/user.service'; export type FlushableResponse = Response & { flush: () => void }; @RestController('/ai') export class AiController { constructor( private readonly aiService: AiService, private readonly workflowBuilderService: WorkflowBuilderService, private readonly credentialsService: CredentialsService, private readonly userService: UserService, private readonly aiUsageService: AiUsageService, ) {} // Use usesTemplates flag to bypass the send() wrapper which would cause // "Cannot set headers after they are sent" error for streaming responses. // This ensures errors during streaming are handled within the stream itself. @Licensed('feat:aiBuilder') @Post('/build', { ipRateLimit: { limit: 100 }, usesTemplates: true }) async build( req: AuthenticatedRequest, res: FlushableResponse, @Body payload: AiBuilderChatRequestDto, ) { try { const abortController = new AbortController(); const { signal } = abortController; const handleClose = () => abortController.abort(); res.on('close', handleClose); const { id, text, workflowContext, featureFlags, versionId } = payload.payload; const aiResponse = this.workflowBuilderService.chat( { id, message: text, workflowContext: { currentWorkflow: workflowContext.currentWorkflow, executionData: workflowContext.executionData, executionSchema: workflowContext.executionSchema, expressionValues: workflowContext.expressionValues, valuesExcluded: workflowContext.valuesExcluded, pinnedNodes: workflowContext.pinnedNodes, selectedNodes: workflowContext.selectedNodes, }, featureFlags, versionId, mode: payload.payload.mode, resumeData: payload.payload.resumeData, }, req.user, signal, ); res.header('Content-type', 'application/json-lines').flush(); try { // Handle the stream for await (const chunk of aiResponse) { res.flush(); res.write(JSON.stringify(chunk) + STREAM_SEPARATOR); } } catch (streamError) { // If an error occurs during streaming, send it as part of the stream // This prevents "Cannot set headers after they are sent" error assert(streamError instanceof Error); // Send error as proper error type now that frontend supports it const errorChunk = { messages: [ { role: 'assistant', type: 'error', content: streamError.message, }, ], }; res.write(JSON.stringify(errorChunk) + STREAM_SEPARATOR); } finally { // Clean up event listener res.off('close', handleClose); } res.end(); } catch (e) { // This catch block handles errors that occur before streaming starts // Since headers haven't been sent yet, we can still send a proper error response assert(e instanceof Error); if (!res.headersSent) { res.status(500).json({ code: 500, message: e.message, }); } else { // If headers were already sent dont't send a second error response res.end(); } } } @Post('/chat', { ipRateLimit: { limit: 100 } }) async chat(req: AuthenticatedRequest, res: FlushableResponse, @Body payload: AiChatRequestDto) { try { const abortController = new AbortController(); const { signal } = abortController; const handleClose = () => abortController.abort(); res.on('close', handleClose); const aiResponse = await this.aiService.chat(payload, req.user); if (aiResponse.body) { res.header('Content-type', 'application/json-lines').flush(); try { await aiResponse.body.pipeTo( new WritableStream({ write(chunk) { res.write(chunk); res.flush(); }, }), { signal }, ); } finally { res.off('close', handleClose); } res.end(); } } catch (e) { if (e instanceof DOMException && e.name === 'AbortError') { return; } assert(e instanceof Error); throw new InternalServerError(e.message, e); } } @Post('/chat/apply-suggestion') async applySuggestion( req: AuthenticatedRequest, _: Response, @Body payload: AiApplySuggestionRequestDto, ): Promise { try { return await this.aiService.applySuggestion(payload, req.user); } catch (e) { assert(e instanceof Error); throw new InternalServerError(e.message, e); } } @Post('/ask-ai') async askAi( req: AuthenticatedRequest, _: Response, @Body payload: AiAskRequestDto, ): Promise { try { return await this.aiService.askAi(payload, req.user); } catch (e) { if (e instanceof APIResponseError) { switch (e.statusCode) { case 413: throw new ContentTooLargeError(e.message); case 429: throw new TooManyRequestsError(e.message); case 400: throw new BadRequestError(e.message); default: throw new InternalServerError(e.message, e); } } assert(e instanceof Error); throw new InternalServerError(e.message, e); } } @Post('/free-credits') async aiCredits(req: AuthenticatedRequest, _: Response, @Body payload: AiFreeCreditsRequestDto) { try { const aiCredits = await this.aiService.createFreeAiCredits(req.user); const credentialProperties: CreateCredentialDto = { name: FREE_AI_CREDITS_CREDENTIAL_NAME, type: OPEN_AI_API_CREDENTIAL_TYPE, data: { apiKey: aiCredits.apiKey, url: aiCredits.url, }, projectId: payload?.projectId, }; const newCredential = await this.credentialsService.createManagedCredential( credentialProperties, req.user, ); await this.userService.updateSettings(req.user.id, { userClaimedAiCredits: true, }); return newCredential; } catch (e) { assert(e instanceof Error); throw new InternalServerError(e.message, e); } } @Licensed('feat:aiBuilder') @Post('/sessions', { ipRateLimit: { limit: 100 } }) async getSessions( req: AuthenticatedRequest, _: Response, @Body payload: AiSessionRetrievalRequestDto, ) { try { const sessions = await this.workflowBuilderService.getSessions( payload.workflowId, req.user, payload.codeBuilder, ); return sessions; } catch (e) { assert(e instanceof Error); throw new InternalServerError(e.message, e); } } @Licensed('feat:aiBuilder') @Get('/build/credits') async getBuilderCredits( req: AuthenticatedRequest, _: Response, ): Promise { try { return await this.workflowBuilderService.getBuilderInstanceCredits(req.user); } catch (e) { assert(e instanceof Error); throw new InternalServerError(e.message, e); } } @Licensed('feat:aiBuilder') @Post('/build/truncate-messages', { ipRateLimit: { limit: 100 } }) async truncateMessages( req: AuthenticatedRequest, _: Response, @Body payload: AiTruncateMessagesRequestDto, ): Promise<{ success: boolean }> { try { const success = await this.workflowBuilderService.truncateMessagesAfter( payload.workflowId, req.user, payload.messageId, payload.codeBuilder, ); return { success }; } catch (e) { assert(e instanceof Error); throw new InternalServerError(e.message, e); } } @Licensed('feat:aiBuilder') @Post('/build/clear-session', { ipRateLimit: { limit: 100 } }) async clearSession( req: AuthenticatedRequest, _: Response, @Body payload: AiClearSessionRequestDto, ): Promise<{ success: boolean }> { try { await this.workflowBuilderService.clearSession(payload.workflowId, req.user); return { success: true }; } catch (e) { assert(e instanceof Error); throw new InternalServerError(e.message, e); } } @Post('/usage-settings') @GlobalScope('aiAssistant:manage') async updateUsageSettings( _req: AuthenticatedRequest, _res: Response, @Body payload: AiUsageSettingsRequestDto, ): Promise { try { await this.aiUsageService.updateAiUsageSettings(payload.allowSendingParameterValues); } catch (e) { assert(e instanceof Error); throw new InternalServerError(e.message, e); } } }