mirror of
https://github.com/n8n-io/n8n.git
synced 2026-05-12 16:10:30 +02:00
fix(core): Stop workflow builder after terminal remediation (#30289)
Some checks are pending
Build: Benchmark Image / build (push) Waiting to run
CI: Master (Build, Test, Lint) / Build for Github Cache (push) Waiting to run
CI: Master (Build, Test, Lint) / Unit tests (22.x) (push) Waiting to run
CI: Master (Build, Test, Lint) / Unit tests (24.14.1) (push) Waiting to run
CI: Master (Build, Test, Lint) / Unit tests (25.x) (push) Waiting to run
CI: Master (Build, Test, Lint) / Lint (push) Waiting to run
CI: Master (Build, Test, Lint) / Performance (push) Waiting to run
CI: Master (Build, Test, Lint) / Notify Slack on failure (push) Blocked by required conditions
Util: Sync API Docs / sync-public-api (push) Waiting to run
Some checks are pending
Build: Benchmark Image / build (push) Waiting to run
CI: Master (Build, Test, Lint) / Build for Github Cache (push) Waiting to run
CI: Master (Build, Test, Lint) / Unit tests (22.x) (push) Waiting to run
CI: Master (Build, Test, Lint) / Unit tests (24.14.1) (push) Waiting to run
CI: Master (Build, Test, Lint) / Unit tests (25.x) (push) Waiting to run
CI: Master (Build, Test, Lint) / Lint (push) Waiting to run
CI: Master (Build, Test, Lint) / Performance (push) Waiting to run
CI: Master (Build, Test, Lint) / Notify Slack on failure (push) Blocked by required conditions
Util: Sync API Docs / sync-public-api (push) Waiting to run
This commit is contained in:
parent
d06110ba9d
commit
22f2e34fe6
|
|
@ -2,6 +2,7 @@ import type { Workspace } from '@mastra/core/workspace';
|
|||
import { nanoid } from 'nanoid';
|
||||
|
||||
import type { BuilderWorkspace } from '../workspace/builder-sandbox-factory';
|
||||
import type { FilesystemMutationGuardSetter } from '../workspace/guarded-filesystem';
|
||||
|
||||
interface BuilderSandboxSessionInternal {
|
||||
sessionId: string;
|
||||
|
|
@ -13,6 +14,7 @@ interface BuilderSandboxSessionInternal {
|
|||
workspace: Workspace;
|
||||
root: string;
|
||||
cleanup: () => Promise<void>;
|
||||
setFilesystemMutationGuard?: FilesystemMutationGuardSetter;
|
||||
busy: boolean;
|
||||
createdAt: number;
|
||||
updatedAt: number;
|
||||
|
|
@ -29,6 +31,7 @@ export interface BuilderSandboxSession {
|
|||
builderResourceId: string;
|
||||
workspace: Workspace;
|
||||
root: string;
|
||||
setFilesystemMutationGuard?: FilesystemMutationGuardSetter;
|
||||
busy: boolean;
|
||||
createdAt: number;
|
||||
updatedAt: number;
|
||||
|
|
@ -50,7 +53,7 @@ function sessionKey(threadId: string, value: string): string {
|
|||
}
|
||||
|
||||
function toPublicSession(session: BuilderSandboxSessionInternal): BuilderSandboxSession {
|
||||
return {
|
||||
const publicSession: BuilderSandboxSession = {
|
||||
sessionId: session.sessionId,
|
||||
threadId: session.threadId,
|
||||
workflowId: session.workflowId,
|
||||
|
|
@ -64,6 +67,10 @@ function toPublicSession(session: BuilderSandboxSessionInternal): BuilderSandbox
|
|||
updatedAt: session.updatedAt,
|
||||
expiresAt: session.expiresAt,
|
||||
};
|
||||
if (session.setFilesystemMutationGuard) {
|
||||
publicSession.setFilesystemMutationGuard = session.setFilesystemMutationGuard;
|
||||
}
|
||||
return publicSession;
|
||||
}
|
||||
|
||||
export class BuilderSandboxSessionRegistry {
|
||||
|
|
@ -109,6 +116,9 @@ export class BuilderSandboxSessionRegistry {
|
|||
updatedAt: now,
|
||||
expiresAt: now + this.ttlMs,
|
||||
};
|
||||
if (input.builderWorkspace.setFilesystemMutationGuard) {
|
||||
session.setFilesystemMutationGuard = input.builderWorkspace.setFilesystemMutationGuard;
|
||||
}
|
||||
|
||||
this.sessions.set(session.sessionId, session);
|
||||
if (session.workflowId) {
|
||||
|
|
|
|||
|
|
@ -26,6 +26,7 @@ import type {
|
|||
const {
|
||||
recordSuccessfulWorkflowBuilds,
|
||||
resultFromPostStreamError,
|
||||
resultFromTerminalRemediation,
|
||||
resultFromLaterFailedMainSubmit,
|
||||
attemptFromAutoResubmit,
|
||||
withTerminalLoopState,
|
||||
|
|
@ -563,6 +564,51 @@ describe('resultFromPostStreamError', () => {
|
|||
});
|
||||
});
|
||||
|
||||
describe('resultFromTerminalRemediation', () => {
|
||||
it('returns terminal remediation without requiring a final auto-resubmit', () => {
|
||||
const remediation = createRemediation({
|
||||
category: 'blocked',
|
||||
shouldEdit: false,
|
||||
reason: 'workflow_save_failed',
|
||||
guidance: 'Stop editing.',
|
||||
});
|
||||
const submitAttempts: SubmitWorkflowAttempt[] = [
|
||||
{
|
||||
filePath: MAIN_PATH,
|
||||
sourceHash: 'a',
|
||||
success: true,
|
||||
workflowId: 'WF_123',
|
||||
},
|
||||
{
|
||||
filePath: MAIN_PATH,
|
||||
sourceHash: 'b',
|
||||
success: false,
|
||||
errors: ['Workflow save failed.'],
|
||||
remediation,
|
||||
},
|
||||
];
|
||||
|
||||
const result = resultFromTerminalRemediation({
|
||||
remediation,
|
||||
submitAttempts,
|
||||
mainWorkflowPath: MAIN_PATH,
|
||||
workItemId: 'wi_test',
|
||||
runId: 'run_test',
|
||||
taskId: 'task_test',
|
||||
});
|
||||
|
||||
expect(result).toMatchObject({
|
||||
text: 'Stop editing.',
|
||||
outcome: {
|
||||
submitted: true,
|
||||
workflowId: 'WF_123',
|
||||
blockingReason: 'Stop editing.',
|
||||
remediation,
|
||||
},
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe('supportingWorkflowIdsFromSubmitAttempts', () => {
|
||||
it('collects referenced successful non-main workflow IDs once in submit order', () => {
|
||||
const submitAttempts: SubmitWorkflowAttempt[] = [
|
||||
|
|
|
|||
|
|
@ -418,6 +418,7 @@ n8n normalizes column names to snake_case (e.g., \`dayName\` → \`day_name\`).
|
|||
8. **Submit**: When tsc passes cleanly, call \`submit-workflow\` to validate the workflow graph and save it to n8n.
|
||||
|
||||
9. **Fix submission errors**: If \`submit-workflow\` returns errors, edit the file and submit again immediately. Skip tsc for validation-only errors. **Never end your turn on a file edit — always re-submit first.** The system compares file hashes: if the file changed since the last submit, all your work is discarded. End only on a successful re-submit or after you explicitly report the blocking error.
|
||||
If remediation includes \`shouldEdit: false\`, stop immediately and report its guidance. Do not edit files, run commands, or call \`submit-workflow\` again.
|
||||
|
||||
10. **Done**: Output ONE sentence summarizing what was built, including the workflow ID and any known issues.
|
||||
|
||||
|
|
|
|||
|
|
@ -46,6 +46,8 @@ import type { BackgroundTaskResult, InstanceAiContext, OrchestrationContext } fr
|
|||
import { SDK_IMPORT_STATEMENT } from '../../workflow-builder/extract-code';
|
||||
import {
|
||||
createRemediation,
|
||||
createTerminalRemediationGuard,
|
||||
type RemediationMetadata,
|
||||
type TriggerType,
|
||||
type WorkflowBuildOutcome,
|
||||
type WorkflowSetupRequirement,
|
||||
|
|
@ -556,6 +558,19 @@ function hashContent(content: string | null): string {
|
|||
.digest('hex');
|
||||
}
|
||||
|
||||
function createLinkedAbortController(parentSignal: AbortSignal): AbortController {
|
||||
const controller = new AbortController();
|
||||
if (parentSignal.aborted) {
|
||||
controller.abort(parentSignal.reason);
|
||||
return controller;
|
||||
}
|
||||
|
||||
parentSignal.addEventListener('abort', () => controller.abort(parentSignal.reason), {
|
||||
once: true,
|
||||
});
|
||||
return controller;
|
||||
}
|
||||
|
||||
function deterministicSuffix(seed: string, label: string, length: number): string {
|
||||
return createHash('sha256')
|
||||
.update(label)
|
||||
|
|
@ -683,6 +698,47 @@ export function resultFromPostStreamError(input: {
|
|||
};
|
||||
}
|
||||
|
||||
export function resultFromTerminalRemediation(input: {
|
||||
remediation: RemediationMetadata;
|
||||
submitAttempts: SubmitWorkflowAttempt[];
|
||||
mainWorkflowPath: string;
|
||||
workItemId: string;
|
||||
runId: string;
|
||||
taskId: string;
|
||||
}): { text: string; outcome: WorkflowBuildOutcome } {
|
||||
const latestAttempt = latestMainSubmit(input.submitAttempts, input.mainWorkflowPath);
|
||||
const attempt =
|
||||
latestAttempt &&
|
||||
!latestAttempt.success &&
|
||||
shouldRecoverSavedWorkflowAfterFailedSubmit(latestAttempt)
|
||||
? (latestSuccessfulMainSubmit(input.submitAttempts, input.mainWorkflowPath) ?? latestAttempt)
|
||||
: latestAttempt;
|
||||
const text = input.remediation.guidance;
|
||||
const outcome = buildOutcome(
|
||||
input.workItemId,
|
||||
input.runId,
|
||||
input.taskId,
|
||||
attempt,
|
||||
text,
|
||||
supportingWorkflowIdsFromSubmitAttempts(
|
||||
input.submitAttempts,
|
||||
input.mainWorkflowPath,
|
||||
attempt?.workflowId,
|
||||
attempt?.referencedWorkflowIds,
|
||||
),
|
||||
);
|
||||
|
||||
return {
|
||||
text,
|
||||
outcome: withDeterministicRouting({
|
||||
...outcome,
|
||||
needsUserInput: outcome.needsUserInput || input.remediation.category === 'needs_setup',
|
||||
blockingReason: input.remediation.guidance,
|
||||
remediation: input.remediation,
|
||||
}),
|
||||
};
|
||||
}
|
||||
|
||||
async function getWorkflowNodeSummaries(
|
||||
context: InstanceAiContext | undefined,
|
||||
workflowId: string | undefined,
|
||||
|
|
@ -1016,6 +1072,19 @@ export async function startBuildWorkflowAgentTask(
|
|||
// Append-only history so a later failed submit for the main path
|
||||
// cannot mask an earlier successful submit during post-error recovery.
|
||||
const submitAttemptHistory: SubmitWorkflowAttempt[] = [];
|
||||
const builderAbortController = createLinkedAbortController(signal);
|
||||
const terminalRemediationGuard = createTerminalRemediationGuard((remediation) => {
|
||||
context.trackTelemetry?.('Builder terminal remediation reached', {
|
||||
thread_id: context.threadId,
|
||||
run_id: context.runId,
|
||||
work_item_id: workItemId,
|
||||
category: remediation.category,
|
||||
attempt_count: remediation.attemptCount,
|
||||
reason: remediation.reason,
|
||||
});
|
||||
builderAbortController.abort(new Error(remediation.guidance));
|
||||
});
|
||||
let clearFilesystemMutationGuard: (() => void) | undefined;
|
||||
try {
|
||||
if (useSandbox) {
|
||||
let workspace: BuilderWorkspace['workspace'];
|
||||
|
|
@ -1058,6 +1127,31 @@ export async function startBuildWorkflowAgentTask(
|
|||
}
|
||||
|
||||
const mainWorkflowPath = `${root}/src/workflow.ts`;
|
||||
const setFilesystemMutationGuard =
|
||||
activeBuilderSession?.setFilesystemMutationGuard ??
|
||||
builderWs?.setFilesystemMutationGuard;
|
||||
if (setFilesystemMutationGuard) {
|
||||
setFilesystemMutationGuard(() => terminalRemediationGuard.get());
|
||||
clearFilesystemMutationGuard = () => setFilesystemMutationGuard(undefined);
|
||||
}
|
||||
const finishTerminalRemediation = async (remediation: RemediationMetadata) => {
|
||||
const terminalResult = resultFromTerminalRemediation({
|
||||
remediation,
|
||||
submitAttempts: submitAttemptHistory,
|
||||
mainWorkflowPath,
|
||||
workItemId,
|
||||
runId: context.runId,
|
||||
taskId,
|
||||
});
|
||||
if (terminalResult.outcome.submitted && terminalResult.outcome.workflowId) {
|
||||
await promoteMainWorkflow(
|
||||
domainContext,
|
||||
context.logger,
|
||||
terminalResult.outcome.workflowId,
|
||||
);
|
||||
}
|
||||
return await finalizeBuildResult(context, workItemId, terminalResult);
|
||||
};
|
||||
builderTools['submit-workflow'] = createIdentityEnforcedSubmitWorkflowTool({
|
||||
context: domainContext,
|
||||
workspace,
|
||||
|
|
@ -1067,6 +1161,7 @@ export async function startBuildWorkflowAgentTask(
|
|||
tracingRoot: traceContext?.rootRun,
|
||||
getWorkflowLoopState: async () =>
|
||||
await context.workflowTaskService?.getWorkflowLoopState(workItemId),
|
||||
getTerminalRemediation: () => terminalRemediationGuard.get(),
|
||||
onGuardFired: (event) => {
|
||||
context.trackTelemetry?.('Builder remediation guard fired', {
|
||||
thread_id: context.threadId,
|
||||
|
|
@ -1078,6 +1173,9 @@ export async function startBuildWorkflowAgentTask(
|
|||
reason: event.reason,
|
||||
});
|
||||
},
|
||||
onTerminalRemediation: (remediation) => {
|
||||
terminalRemediationGuard.record(remediation);
|
||||
},
|
||||
onAttempt: async (attempt) => {
|
||||
submitAttempts.set(attempt.filePath, attempt);
|
||||
submitAttemptHistory.push(attempt);
|
||||
|
|
@ -1165,7 +1263,7 @@ export async function startBuildWorkflowAgentTask(
|
|||
};
|
||||
const stream = await subAgent.stream(briefing, {
|
||||
maxSteps: MAX_STEPS.BUILDER,
|
||||
abortSignal: signal,
|
||||
abortSignal: builderAbortController.signal,
|
||||
modelSettings: { temperature: TEMPERATURE.BUILDER },
|
||||
providerOptions: {
|
||||
anthropic: { cacheControl: { type: 'ephemeral' } },
|
||||
|
|
@ -1188,7 +1286,7 @@ export async function startBuildWorkflowAgentTask(
|
|||
eventBus: context.eventBus,
|
||||
logger: context.logger,
|
||||
threadId: context.threadId,
|
||||
abortSignal: signal,
|
||||
abortSignal: builderAbortController.signal,
|
||||
waitForConfirmation: context.waitForConfirmation,
|
||||
drainCorrections,
|
||||
waitForCorrection,
|
||||
|
|
@ -1199,8 +1297,18 @@ export async function startBuildWorkflowAgentTask(
|
|||
});
|
||||
});
|
||||
|
||||
const terminalRemediation = terminalRemediationGuard.get();
|
||||
if (terminalRemediation) {
|
||||
return await finishTerminalRemediation(terminalRemediation);
|
||||
}
|
||||
|
||||
finalText = await hitlResult.text;
|
||||
} catch (error) {
|
||||
const terminalRemediation = terminalRemediationGuard.get();
|
||||
if (terminalRemediation) {
|
||||
return await finishTerminalRemediation(terminalRemediation);
|
||||
}
|
||||
|
||||
const recovered = resultFromPostStreamError({
|
||||
error,
|
||||
submitAttempts: submitAttemptHistory,
|
||||
|
|
@ -1220,6 +1328,11 @@ export async function startBuildWorkflowAgentTask(
|
|||
throw error;
|
||||
}
|
||||
|
||||
const terminalRemediation = terminalRemediationGuard.get();
|
||||
if (terminalRemediation) {
|
||||
return await finishTerminalRemediation(terminalRemediation);
|
||||
}
|
||||
|
||||
const mainWorkflowAttempt = submitAttempts.get(mainWorkflowPath);
|
||||
const currentMainWorkflow = await readFileViaSandbox(workspace, mainWorkflowPath);
|
||||
const currentMainWorkflowHash = hashContent(currentMainWorkflow);
|
||||
|
|
@ -1481,6 +1594,7 @@ export async function startBuildWorkflowAgentTask(
|
|||
await promoteMainWorkflow(domainContext, context.logger, fallbackMainWorkflowId);
|
||||
return { text: toolFinalText };
|
||||
} finally {
|
||||
clearFilesystemMutationGuard?.();
|
||||
if (activeBuilderSession && context.builderSandboxSessionRegistry) {
|
||||
await context.builderSandboxSessionRegistry.release(activeBuilderSession.sessionId, {
|
||||
keep: !signal.aborted,
|
||||
|
|
|
|||
|
|
@ -232,6 +232,82 @@ describe('wrapSubmitExecuteWithIdentity', () => {
|
|||
);
|
||||
});
|
||||
|
||||
it('records terminal submit output and blocks later submits in-process', async () => {
|
||||
let terminalRemediation: SubmitWorkflowOutput['remediation'];
|
||||
const remediation = createRemediation({
|
||||
category: 'blocked',
|
||||
shouldEdit: false,
|
||||
reason: 'workflow_save_failed',
|
||||
guidance: 'Stop editing.',
|
||||
});
|
||||
const execute = jest
|
||||
.fn<Promise<SubmitWorkflowOutput>, [SubmitWorkflowInput]>()
|
||||
.mockResolvedValueOnce({
|
||||
success: false,
|
||||
errors: ['Workflow save failed.'],
|
||||
remediation,
|
||||
})
|
||||
.mockResolvedValueOnce({ success: true, workflowId: 'wf_should_not_save' });
|
||||
const wrapped = wrapSubmitExecuteWithIdentity(execute, resolvePath, {
|
||||
getTerminalRemediation: () => terminalRemediation,
|
||||
onTerminalRemediation: (recorded) => {
|
||||
terminalRemediation = recorded;
|
||||
},
|
||||
});
|
||||
|
||||
const first = await wrapped({});
|
||||
const second = await wrapped({});
|
||||
|
||||
expect(first.remediation).toBe(remediation);
|
||||
expect(second).toMatchObject({
|
||||
success: false,
|
||||
errors: ['Stop editing.'],
|
||||
remediation,
|
||||
});
|
||||
expect(execute).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it('returns terminal remediation to concurrent submit waiters when the first submit stops editing', async () => {
|
||||
let release: () => void = () => {};
|
||||
const gate = new Promise<void>((res) => {
|
||||
release = res;
|
||||
});
|
||||
let terminalRemediation: SubmitWorkflowOutput['remediation'];
|
||||
const remediation = createRemediation({
|
||||
category: 'blocked',
|
||||
shouldEdit: false,
|
||||
reason: 'workflow_save_failed',
|
||||
guidance: 'Stop editing.',
|
||||
});
|
||||
const execute = jest.fn(async (): Promise<SubmitWorkflowOutput> => {
|
||||
await gate;
|
||||
return {
|
||||
success: false,
|
||||
errors: ['Workflow save failed.'],
|
||||
remediation,
|
||||
};
|
||||
});
|
||||
const wrapped = wrapSubmitExecuteWithIdentity(execute, resolvePath, {
|
||||
getTerminalRemediation: () => terminalRemediation,
|
||||
onTerminalRemediation: (recorded) => {
|
||||
terminalRemediation = recorded;
|
||||
},
|
||||
});
|
||||
|
||||
const first = wrapped({});
|
||||
const second = wrapped({});
|
||||
await Promise.resolve();
|
||||
release();
|
||||
|
||||
await expect(first).resolves.toMatchObject({ success: false, remediation });
|
||||
await expect(second).resolves.toMatchObject({
|
||||
success: false,
|
||||
errors: ['Stop editing.'],
|
||||
remediation,
|
||||
});
|
||||
expect(execute).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it('ignores terminal remediation from a previous run', async () => {
|
||||
const execute = jest.fn(async (): Promise<SubmitWorkflowOutput> => {
|
||||
await Promise.resolve();
|
||||
|
|
|
|||
|
|
@ -42,6 +42,7 @@ export type SubmitExecute = (input: SubmitWorkflowInput) => Promise<SubmitWorkfl
|
|||
|
||||
interface SubmitGuardOptions {
|
||||
getWorkflowLoopState?: () => Promise<WorkflowLoopState | undefined>;
|
||||
getTerminalRemediation?: () => RemediationMetadata | undefined;
|
||||
currentRunId?: string;
|
||||
onGuardFired?: (event: {
|
||||
workflowId?: string;
|
||||
|
|
@ -49,6 +50,7 @@ interface SubmitGuardOptions {
|
|||
attemptCount?: number;
|
||||
reason?: string;
|
||||
}) => void;
|
||||
onTerminalRemediation?: (remediation: RemediationMetadata) => void;
|
||||
}
|
||||
|
||||
interface SubmitBudgetTracker {
|
||||
|
|
@ -128,10 +130,9 @@ export function wrapSubmitExecuteWithIdentity(
|
|||
async function blockedByTerminalRemediation(
|
||||
workflowId: string | undefined,
|
||||
): Promise<SubmitWorkflowOutput | undefined> {
|
||||
const terminalRemediation = terminalRemediationFromState(
|
||||
await options.getWorkflowLoopState?.(),
|
||||
options.currentRunId,
|
||||
);
|
||||
const terminalRemediation =
|
||||
options.getTerminalRemediation?.() ??
|
||||
terminalRemediationFromState(await options.getWorkflowLoopState?.(), options.currentRunId);
|
||||
if (!terminalRemediation) return undefined;
|
||||
|
||||
options.onGuardFired?.({
|
||||
|
|
@ -147,6 +148,14 @@ export function wrapSubmitExecuteWithIdentity(
|
|||
};
|
||||
}
|
||||
|
||||
function applyOutputGuards(path: string, output: SubmitWorkflowOutput): SubmitWorkflowOutput {
|
||||
const guardedOutput = options.budgetTracker?.applyToOutput(path, output) ?? output;
|
||||
if (guardedOutput.remediation?.shouldEdit === false) {
|
||||
options.onTerminalRemediation?.(guardedOutput.remediation);
|
||||
}
|
||||
return guardedOutput;
|
||||
}
|
||||
|
||||
return async (input) => {
|
||||
const resolvedPath = resolvePath(input.filePath);
|
||||
const terminalResult = await blockedByTerminalRemediation(input.workflowId);
|
||||
|
|
@ -159,6 +168,9 @@ export function wrapSubmitExecuteWithIdentity(
|
|||
try {
|
||||
boundId = await existing;
|
||||
} catch (error) {
|
||||
const terminalAfterFailure = await blockedByTerminalRemediation(input.workflowId);
|
||||
if (terminalAfterFailure) return terminalAfterFailure;
|
||||
|
||||
const message = error instanceof Error ? error.message : String(error);
|
||||
return {
|
||||
success: false,
|
||||
|
|
@ -176,7 +188,7 @@ export function wrapSubmitExecuteWithIdentity(
|
|||
if (terminalAfterWait) return terminalAfterWait;
|
||||
|
||||
const result = await underlying({ ...input, workflowId: boundId });
|
||||
return options.budgetTracker?.applyToOutput(resolvedPath, result) ?? result;
|
||||
return applyOutputGuards(resolvedPath, result);
|
||||
}
|
||||
|
||||
let resolveFn: ((id: string) => void) | undefined;
|
||||
|
|
@ -198,7 +210,7 @@ export function wrapSubmitExecuteWithIdentity(
|
|||
rejectFn?.(new Error(result.errors?.join(' ') ?? 'submit-workflow failed'));
|
||||
pending.delete(resolvedPath);
|
||||
}
|
||||
return options.budgetTracker?.applyToOutput(resolvedPath, result) ?? result;
|
||||
return applyOutputGuards(resolvedPath, result);
|
||||
} catch (error) {
|
||||
rejectFn?.(error);
|
||||
pending.delete(resolvedPath);
|
||||
|
|
@ -219,7 +231,9 @@ export function createIdentityEnforcedSubmitWorkflowTool(args: {
|
|||
root: string;
|
||||
currentRunId?: string;
|
||||
getWorkflowLoopState?: () => Promise<WorkflowLoopState | undefined>;
|
||||
getTerminalRemediation?: SubmitGuardOptions['getTerminalRemediation'];
|
||||
onGuardFired?: SubmitGuardOptions['onGuardFired'];
|
||||
onTerminalRemediation?: SubmitGuardOptions['onTerminalRemediation'];
|
||||
tracingRoot?: InstanceAiTraceRun;
|
||||
}) {
|
||||
const budgetTracker = createPreSaveBudgetTracker();
|
||||
|
|
@ -245,7 +259,9 @@ export function createIdentityEnforcedSubmitWorkflowTool(args: {
|
|||
budgetTracker,
|
||||
currentRunId: args.currentRunId,
|
||||
getWorkflowLoopState: args.getWorkflowLoopState,
|
||||
getTerminalRemediation: args.getTerminalRemediation,
|
||||
onGuardFired: args.onGuardFired,
|
||||
onTerminalRemediation: args.onTerminalRemediation,
|
||||
},
|
||||
);
|
||||
|
||||
|
|
|
|||
|
|
@ -52,3 +52,7 @@ export {
|
|||
remainingPostSubmitRemediations,
|
||||
terminalRemediationFromState,
|
||||
} from './remediation';
|
||||
export {
|
||||
createTerminalRemediationGuard,
|
||||
type TerminalRemediationGuard,
|
||||
} from './terminal-remediation-guard';
|
||||
|
|
|
|||
|
|
@ -0,0 +1,25 @@
|
|||
import type { RemediationMetadata } from './workflow-loop-state';
|
||||
|
||||
export interface TerminalRemediationGuard {
|
||||
get(): RemediationMetadata | undefined;
|
||||
record(remediation: RemediationMetadata | undefined): RemediationMetadata | undefined;
|
||||
}
|
||||
|
||||
export function createTerminalRemediationGuard(
|
||||
onTerminal?: (remediation: RemediationMetadata) => void,
|
||||
): TerminalRemediationGuard {
|
||||
let terminalRemediation: RemediationMetadata | undefined;
|
||||
|
||||
return {
|
||||
get: () => terminalRemediation,
|
||||
record: (remediation) => {
|
||||
if (!remediation || remediation.shouldEdit || terminalRemediation) {
|
||||
return terminalRemediation;
|
||||
}
|
||||
|
||||
terminalRemediation = remediation;
|
||||
onTerminal?.(remediation);
|
||||
return terminalRemediation;
|
||||
},
|
||||
};
|
||||
}
|
||||
|
|
@ -0,0 +1,90 @@
|
|||
import type { WorkspaceFilesystem } from '@mastra/core/workspace';
|
||||
|
||||
import { createGuardedFilesystem } from '../guarded-filesystem';
|
||||
|
||||
function createFilesystemMock(): jest.Mocked<WorkspaceFilesystem> {
|
||||
return {
|
||||
id: 'fs-1',
|
||||
name: 'MockFilesystem',
|
||||
provider: 'mock',
|
||||
status: 'ready',
|
||||
readFile: jest.fn().mockResolvedValue('content'),
|
||||
writeFile: jest.fn().mockResolvedValue(undefined),
|
||||
appendFile: jest.fn().mockResolvedValue(undefined),
|
||||
deleteFile: jest.fn().mockResolvedValue(undefined),
|
||||
copyFile: jest.fn().mockResolvedValue(undefined),
|
||||
moveFile: jest.fn().mockResolvedValue(undefined),
|
||||
mkdir: jest.fn().mockResolvedValue(undefined),
|
||||
rmdir: jest.fn().mockResolvedValue(undefined),
|
||||
readdir: jest.fn().mockResolvedValue([]),
|
||||
exists: jest.fn().mockResolvedValue(true),
|
||||
stat: jest.fn().mockResolvedValue({
|
||||
name: 'workflow.ts',
|
||||
path: '/workspace/src/workflow.ts',
|
||||
type: 'file',
|
||||
size: 7,
|
||||
createdAt: new Date(0),
|
||||
modifiedAt: new Date(0),
|
||||
}),
|
||||
};
|
||||
}
|
||||
|
||||
describe('createGuardedFilesystem', () => {
|
||||
it('allows reads and writes while no terminal remediation is set', async () => {
|
||||
const rawFilesystem = createFilesystemMock();
|
||||
const { filesystem } = createGuardedFilesystem(rawFilesystem);
|
||||
|
||||
await expect(filesystem.readFile('/workspace/src/workflow.ts')).resolves.toBe('content');
|
||||
await expect(
|
||||
filesystem.writeFile('/workspace/src/workflow.ts', 'updated'),
|
||||
).resolves.toBeUndefined();
|
||||
|
||||
expect(rawFilesystem.readFile).toHaveBeenCalledTimes(1);
|
||||
expect(rawFilesystem.writeFile).toHaveBeenCalledWith(
|
||||
'/workspace/src/workflow.ts',
|
||||
'updated',
|
||||
undefined,
|
||||
);
|
||||
});
|
||||
|
||||
it('blocks mutating operations after terminal remediation', async () => {
|
||||
const rawFilesystem = createFilesystemMock();
|
||||
const { filesystem, setMutationGuard } = createGuardedFilesystem(rawFilesystem);
|
||||
setMutationGuard(() => ({ guidance: 'Stop editing.' }));
|
||||
|
||||
await expect(filesystem.readFile('/workspace/src/workflow.ts')).resolves.toBe('content');
|
||||
await expect(filesystem.writeFile('/workspace/src/workflow.ts', 'updated')).rejects.toThrow(
|
||||
'Stop editing.',
|
||||
);
|
||||
await expect(filesystem.mkdir('/workspace/chunks')).rejects.toThrow('Stop editing.');
|
||||
await expect(filesystem.deleteFile('/workspace/src/workflow.ts')).rejects.toThrow(
|
||||
'Stop editing.',
|
||||
);
|
||||
|
||||
expect(rawFilesystem.readFile).toHaveBeenCalledTimes(1);
|
||||
expect(rawFilesystem.writeFile).not.toHaveBeenCalled();
|
||||
expect(rawFilesystem.mkdir).not.toHaveBeenCalled();
|
||||
expect(rawFilesystem.deleteFile).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('can clear the mutation guard for a reused workspace', async () => {
|
||||
const rawFilesystem = createFilesystemMock();
|
||||
const { filesystem, setMutationGuard } = createGuardedFilesystem(rawFilesystem);
|
||||
|
||||
setMutationGuard(() => ({ guidance: 'Stop editing.' }));
|
||||
await expect(filesystem.writeFile('/workspace/src/workflow.ts', 'blocked')).rejects.toThrow(
|
||||
'Stop editing.',
|
||||
);
|
||||
|
||||
setMutationGuard(undefined);
|
||||
await expect(
|
||||
filesystem.writeFile('/workspace/src/workflow.ts', 'allowed'),
|
||||
).resolves.toBeUndefined();
|
||||
|
||||
expect(rawFilesystem.writeFile).toHaveBeenCalledWith(
|
||||
'/workspace/src/workflow.ts',
|
||||
'allowed',
|
||||
undefined,
|
||||
);
|
||||
});
|
||||
});
|
||||
|
|
@ -16,6 +16,7 @@ import { join as posixJoin } from 'node:path/posix';
|
|||
import type { ErrorReporter, Logger } from '../logger';
|
||||
import type { SandboxConfig } from './create-workspace';
|
||||
import { DaytonaFilesystem } from './daytona-filesystem';
|
||||
import { createGuardedFilesystem, type FilesystemMutationGuardSetter } from './guarded-filesystem';
|
||||
import { N8nSandboxFilesystem } from './n8n-sandbox-filesystem';
|
||||
import { N8nSandboxServiceSandbox } from './n8n-sandbox-sandbox';
|
||||
import {
|
||||
|
|
@ -38,6 +39,7 @@ const NOOP_LOGGER: Logger = {
|
|||
export interface BuilderWorkspace {
|
||||
workspace: Workspace;
|
||||
cleanup: () => Promise<void>;
|
||||
setFilesystemMutationGuard?: FilesystemMutationGuardSetter;
|
||||
}
|
||||
|
||||
async function cleanupTrackedSandboxProcesses(workspace: Workspace): Promise<void> {
|
||||
|
|
@ -248,9 +250,10 @@ export class BuilderSandboxFactory {
|
|||
timeout: config.timeout ?? 300_000,
|
||||
});
|
||||
|
||||
const guardedFilesystem = createGuardedFilesystem(new DaytonaFilesystem(daytonaSandbox));
|
||||
const workspace = new Workspace({
|
||||
sandbox: daytonaSandbox,
|
||||
filesystem: new DaytonaFilesystem(daytonaSandbox),
|
||||
filesystem: guardedFilesystem.filesystem,
|
||||
});
|
||||
|
||||
await workspace.init();
|
||||
|
|
@ -267,6 +270,7 @@ export class BuilderSandboxFactory {
|
|||
|
||||
return {
|
||||
workspace,
|
||||
setFilesystemMutationGuard: guardedFilesystem.setMutationGuard,
|
||||
cleanup: async () => {
|
||||
await cleanupTrackedSandboxProcesses(workspace);
|
||||
await deleteSandbox();
|
||||
|
|
@ -301,9 +305,10 @@ export class BuilderSandboxFactory {
|
|||
};
|
||||
|
||||
try {
|
||||
const guardedFilesystem = createGuardedFilesystem(new N8nSandboxFilesystem(sandbox));
|
||||
const workspace = new Workspace({
|
||||
sandbox,
|
||||
filesystem: new N8nSandboxFilesystem(sandbox),
|
||||
filesystem: guardedFilesystem.filesystem,
|
||||
});
|
||||
|
||||
await workspace.init();
|
||||
|
|
@ -319,6 +324,7 @@ export class BuilderSandboxFactory {
|
|||
|
||||
return {
|
||||
workspace,
|
||||
setFilesystemMutationGuard: guardedFilesystem.setMutationGuard,
|
||||
cleanup: async () => {
|
||||
await cleanupTrackedSandboxProcesses(workspace);
|
||||
await destroySandbox();
|
||||
|
|
@ -354,15 +360,17 @@ export class BuilderSandboxFactory {
|
|||
): Promise<BuilderWorkspace> {
|
||||
const dir = `./workspace-builders/${builderId}`;
|
||||
const sandbox = new LocalSandbox({ workingDirectory: dir });
|
||||
const guardedFilesystem = createGuardedFilesystem(new LocalFilesystem({ basePath: dir }));
|
||||
const workspace = new Workspace({
|
||||
sandbox,
|
||||
filesystem: new LocalFilesystem({ basePath: dir }),
|
||||
filesystem: guardedFilesystem.filesystem,
|
||||
});
|
||||
await workspace.init();
|
||||
await setupSandboxWorkspace(workspace, context);
|
||||
|
||||
return {
|
||||
workspace,
|
||||
setFilesystemMutationGuard: guardedFilesystem.setMutationGuard,
|
||||
cleanup: async () => {
|
||||
await cleanupTrackedSandboxProcesses(workspace);
|
||||
// Local cleanup keeps the directory for debugging.
|
||||
|
|
|
|||
186
packages/@n8n/instance-ai/src/workspace/guarded-filesystem.ts
Normal file
186
packages/@n8n/instance-ai/src/workspace/guarded-filesystem.ts
Normal file
|
|
@ -0,0 +1,186 @@
|
|||
import type {
|
||||
CopyOptions,
|
||||
FileContent,
|
||||
FileEntry,
|
||||
FileStat,
|
||||
FilesystemInfo,
|
||||
ListOptions,
|
||||
ProviderStatus,
|
||||
ReadOptions,
|
||||
RemoveOptions,
|
||||
WorkspaceFilesystem,
|
||||
WriteOptions,
|
||||
} from '@mastra/core/workspace';
|
||||
|
||||
export interface FilesystemMutationBlocker {
|
||||
guidance: string;
|
||||
}
|
||||
|
||||
export type FilesystemMutationGuard = () => FilesystemMutationBlocker | undefined;
|
||||
|
||||
export type FilesystemMutationGuardSetter = (guard: FilesystemMutationGuard | undefined) => void;
|
||||
|
||||
export function createGuardedFilesystem(filesystem: WorkspaceFilesystem): {
|
||||
filesystem: WorkspaceFilesystem;
|
||||
setMutationGuard: FilesystemMutationGuardSetter;
|
||||
} {
|
||||
const guarded = new GuardedFilesystem(filesystem);
|
||||
return {
|
||||
filesystem: guarded,
|
||||
setMutationGuard: (guard) => guarded.setMutationGuard(guard),
|
||||
};
|
||||
}
|
||||
|
||||
class GuardedFilesystem implements WorkspaceFilesystem {
|
||||
private mutationGuard: FilesystemMutationGuard | undefined;
|
||||
|
||||
constructor(private readonly filesystem: WorkspaceFilesystem) {}
|
||||
|
||||
get id() {
|
||||
return this.filesystem.id;
|
||||
}
|
||||
|
||||
get name() {
|
||||
return this.filesystem.name;
|
||||
}
|
||||
|
||||
get provider() {
|
||||
return this.filesystem.provider;
|
||||
}
|
||||
|
||||
get status() {
|
||||
return this.filesystem.status;
|
||||
}
|
||||
|
||||
set status(status: ProviderStatus) {
|
||||
this.filesystem.status = status;
|
||||
}
|
||||
|
||||
get error() {
|
||||
return this.filesystem.error;
|
||||
}
|
||||
|
||||
set error(error: string | undefined) {
|
||||
this.filesystem.error = error;
|
||||
}
|
||||
|
||||
get readOnly() {
|
||||
return this.filesystem.readOnly;
|
||||
}
|
||||
|
||||
get basePath() {
|
||||
return this.filesystem.basePath;
|
||||
}
|
||||
|
||||
get icon() {
|
||||
return this.filesystem.icon;
|
||||
}
|
||||
|
||||
get displayName() {
|
||||
return this.filesystem.displayName;
|
||||
}
|
||||
|
||||
get description() {
|
||||
return this.filesystem.description;
|
||||
}
|
||||
|
||||
setMutationGuard(guard: FilesystemMutationGuard | undefined): void {
|
||||
this.mutationGuard = guard;
|
||||
}
|
||||
|
||||
async init(): Promise<void> {
|
||||
await this.filesystem.init?.();
|
||||
}
|
||||
|
||||
async destroy(): Promise<void> {
|
||||
await this.filesystem.destroy?.();
|
||||
}
|
||||
|
||||
async isReady(): Promise<boolean> {
|
||||
return (await this.filesystem.isReady?.()) ?? true;
|
||||
}
|
||||
|
||||
async getInfo(): Promise<FilesystemInfo> {
|
||||
const info = await this.filesystem.getInfo?.();
|
||||
if (info) return info;
|
||||
|
||||
const fallback: FilesystemInfo = {
|
||||
id: this.id,
|
||||
name: this.name,
|
||||
provider: this.provider,
|
||||
status: this.status,
|
||||
};
|
||||
if (this.error !== undefined) fallback.error = this.error;
|
||||
if (this.readOnly !== undefined) fallback.readOnly = this.readOnly;
|
||||
if (this.icon !== undefined) fallback.icon = this.icon;
|
||||
return fallback;
|
||||
}
|
||||
|
||||
getInstructions(
|
||||
options?: Parameters<NonNullable<WorkspaceFilesystem['getInstructions']>>[0],
|
||||
): string {
|
||||
return this.filesystem.getInstructions?.(options) ?? '';
|
||||
}
|
||||
|
||||
async readFile(path: string, options?: ReadOptions): Promise<string | Buffer> {
|
||||
return await this.filesystem.readFile(path, options);
|
||||
}
|
||||
|
||||
async writeFile(path: string, content: FileContent, options?: WriteOptions): Promise<void> {
|
||||
this.assertCanMutate();
|
||||
await this.filesystem.writeFile(path, content, options);
|
||||
}
|
||||
|
||||
async appendFile(path: string, content: FileContent): Promise<void> {
|
||||
this.assertCanMutate();
|
||||
await this.filesystem.appendFile(path, content);
|
||||
}
|
||||
|
||||
async deleteFile(path: string, options?: RemoveOptions): Promise<void> {
|
||||
this.assertCanMutate();
|
||||
await this.filesystem.deleteFile(path, options);
|
||||
}
|
||||
|
||||
async copyFile(src: string, dest: string, options?: CopyOptions): Promise<void> {
|
||||
this.assertCanMutate();
|
||||
await this.filesystem.copyFile(src, dest, options);
|
||||
}
|
||||
|
||||
async moveFile(src: string, dest: string, options?: CopyOptions): Promise<void> {
|
||||
this.assertCanMutate();
|
||||
await this.filesystem.moveFile(src, dest, options);
|
||||
}
|
||||
|
||||
async mkdir(path: string, options?: { recursive?: boolean }): Promise<void> {
|
||||
this.assertCanMutate();
|
||||
await this.filesystem.mkdir(path, options);
|
||||
}
|
||||
|
||||
async rmdir(path: string, options?: RemoveOptions): Promise<void> {
|
||||
this.assertCanMutate();
|
||||
await this.filesystem.rmdir(path, options);
|
||||
}
|
||||
|
||||
async readdir(path: string, options?: ListOptions): Promise<FileEntry[]> {
|
||||
return await this.filesystem.readdir(path, options);
|
||||
}
|
||||
|
||||
resolveAbsolutePath(path: string): string | undefined {
|
||||
return this.filesystem.resolveAbsolutePath?.(path);
|
||||
}
|
||||
|
||||
async exists(path: string): Promise<boolean> {
|
||||
return await this.filesystem.exists(path);
|
||||
}
|
||||
|
||||
async stat(path: string): Promise<FileStat> {
|
||||
return await this.filesystem.stat(path);
|
||||
}
|
||||
|
||||
private assertCanMutate(): void {
|
||||
const blocker = this.mutationGuard?.();
|
||||
if (blocker) {
|
||||
throw new Error(blocker.guidance);
|
||||
}
|
||||
}
|
||||
}
|
||||
Loading…
Reference in New Issue
Block a user