mirror of
https://github.com/n8n-io/n8n.git
synced 2026-05-12 16:10:30 +02:00
fix(core): Defer Instance AI temporary workflow cleanup (no-changelog) (#29700)
Some checks failed
CI: Master (Build, Test, Lint) / Build for Github Cache (push) Waiting to run
CI: Master (Build, Test, Lint) / Unit tests (22.x) (push) Waiting to run
CI: Master (Build, Test, Lint) / Unit tests (24.14.1) (push) Waiting to run
CI: Master (Build, Test, Lint) / Unit tests (25.x) (push) Waiting to run
CI: Master (Build, Test, Lint) / Lint (push) Waiting to run
CI: Master (Build, Test, Lint) / Performance (push) Waiting to run
CI: Master (Build, Test, Lint) / Notify Slack on failure (push) Blocked by required conditions
Release: Create Minor Release PR / Create release PR (push) Has been cancelled
Release: Create Minor Release PR / Notify Slack (push) Has been cancelled
Some checks failed
CI: Master (Build, Test, Lint) / Build for Github Cache (push) Waiting to run
CI: Master (Build, Test, Lint) / Unit tests (22.x) (push) Waiting to run
CI: Master (Build, Test, Lint) / Unit tests (24.14.1) (push) Waiting to run
CI: Master (Build, Test, Lint) / Unit tests (25.x) (push) Waiting to run
CI: Master (Build, Test, Lint) / Lint (push) Waiting to run
CI: Master (Build, Test, Lint) / Performance (push) Waiting to run
CI: Master (Build, Test, Lint) / Notify Slack on failure (push) Blocked by required conditions
Release: Create Minor Release PR / Create release PR (push) Has been cancelled
Release: Create Minor Release PR / Notify Slack (push) Has been cancelled
This commit is contained in:
parent
17b1206790
commit
b41f1a06ab
|
|
@ -51,6 +51,33 @@ type ServiceInternals = {
|
|||
logger: { debug: jest.Mock; warn: jest.Mock; error: jest.Mock };
|
||||
};
|
||||
|
||||
type RunningTask = { taskId: string };
|
||||
type MarkedWorkflow = { workflowId: string };
|
||||
type ArchiveIfAiTemporary = jest.MockedFunction<(workflowId: string) => Promise<boolean>>;
|
||||
|
||||
type TemporaryCleanupService = {
|
||||
reapAiTemporaryFromRun: (
|
||||
threadId: string,
|
||||
user: User,
|
||||
createdWorkflowIds: Set<string> | undefined,
|
||||
) => Promise<string[]>;
|
||||
backgroundTasks: {
|
||||
getRunningTasks: jest.MockedFunction<(threadId: string) => RunningTask[]>;
|
||||
};
|
||||
aiBuilderTemporaryWorkflowRepository: {
|
||||
findByThread: jest.MockedFunction<(threadId: string) => Promise<MarkedWorkflow[]>>;
|
||||
};
|
||||
adapterService: {
|
||||
createContext: jest.MockedFunction<
|
||||
(
|
||||
user: User,
|
||||
options: { threadId: string },
|
||||
) => { workflowService: { archiveIfAiTemporary: ArchiveIfAiTemporary } }
|
||||
>;
|
||||
};
|
||||
logger: { debug: jest.Mock; warn: jest.Mock; error: jest.Mock };
|
||||
};
|
||||
|
||||
function createCheckpointService(): ServiceInternals {
|
||||
// Bypass the constructor — we only exercise the three pending-reentry helpers
|
||||
// and their direct dependencies. Everything else (scheduler, event bus, etc.)
|
||||
|
|
@ -77,6 +104,46 @@ function createCheckpointService(): ServiceInternals {
|
|||
return service;
|
||||
}
|
||||
|
||||
function createTemporaryCleanupService({
|
||||
runningTaskCount = 0,
|
||||
markedWorkflows = [],
|
||||
archivedWorkflowIds = new Set<string>(),
|
||||
}: {
|
||||
runningTaskCount?: number;
|
||||
markedWorkflows?: MarkedWorkflow[];
|
||||
archivedWorkflowIds?: Set<string>;
|
||||
} = {}): {
|
||||
service: TemporaryCleanupService;
|
||||
archiveIfAiTemporary: ArchiveIfAiTemporary;
|
||||
} {
|
||||
const service = Object.create(InstanceAiService.prototype) as unknown as TemporaryCleanupService;
|
||||
const runningTasks: RunningTask[] = Array.from({ length: runningTaskCount }, (_value, index) => ({
|
||||
taskId: `task-${index}`,
|
||||
}));
|
||||
const archiveIfAiTemporary: ArchiveIfAiTemporary = jest.fn(async (workflowId: string) =>
|
||||
archivedWorkflowIds.has(workflowId),
|
||||
);
|
||||
|
||||
service.backgroundTasks = {
|
||||
getRunningTasks: jest.fn((_threadId: string) => runningTasks),
|
||||
};
|
||||
service.aiBuilderTemporaryWorkflowRepository = {
|
||||
findByThread: jest.fn(async (_threadId: string) => markedWorkflows),
|
||||
};
|
||||
service.adapterService = {
|
||||
createContext: jest.fn((_user: User, _options: { threadId: string }) => ({
|
||||
workflowService: { archiveIfAiTemporary },
|
||||
})),
|
||||
};
|
||||
service.logger = {
|
||||
debug: jest.fn(),
|
||||
warn: jest.fn(),
|
||||
error: jest.fn(),
|
||||
};
|
||||
|
||||
return { service, archiveIfAiTemporary };
|
||||
}
|
||||
|
||||
const fakeUser = { id: 'user-1' } as User;
|
||||
|
||||
describe('InstanceAiService — pending checkpoint re-entry', () => {
|
||||
|
|
@ -183,3 +250,51 @@ describe('InstanceAiService — pending checkpoint re-entry', () => {
|
|||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe('InstanceAiService — AI temporary workflow cleanup', () => {
|
||||
it('defers cleanup while background tasks are running', async () => {
|
||||
const { service, archiveIfAiTemporary } = createTemporaryCleanupService({
|
||||
runningTaskCount: 1,
|
||||
markedWorkflows: [{ workflowId: 'wf-marked' }],
|
||||
archivedWorkflowIds: new Set(['wf-marked', 'wf-created']),
|
||||
});
|
||||
|
||||
await expect(
|
||||
service.reapAiTemporaryFromRun('thread-a', fakeUser, new Set(['wf-created'])),
|
||||
).resolves.toEqual([]);
|
||||
|
||||
expect(service.backgroundTasks.getRunningTasks).toHaveBeenCalledWith('thread-a');
|
||||
expect(service.aiBuilderTemporaryWorkflowRepository.findByThread).not.toHaveBeenCalled();
|
||||
expect(service.adapterService.createContext).not.toHaveBeenCalled();
|
||||
expect(archiveIfAiTemporary).not.toHaveBeenCalled();
|
||||
expect(service.logger.debug).toHaveBeenCalledWith(
|
||||
'Deferring AI-builder temporary workflow cleanup until tasks settle',
|
||||
{
|
||||
threadId: 'thread-a',
|
||||
runningTaskCount: 1,
|
||||
},
|
||||
);
|
||||
});
|
||||
|
||||
it('archives marked temporary workflows after background tasks settle', async () => {
|
||||
const { service, archiveIfAiTemporary } = createTemporaryCleanupService({
|
||||
markedWorkflows: [{ workflowId: 'wf-marked' }],
|
||||
archivedWorkflowIds: new Set(['wf-marked', 'wf-created']),
|
||||
});
|
||||
|
||||
await expect(
|
||||
service.reapAiTemporaryFromRun('thread-a', fakeUser, new Set(['wf-created'])),
|
||||
).resolves.toEqual(['wf-marked', 'wf-created']);
|
||||
|
||||
expect(service.backgroundTasks.getRunningTasks).toHaveBeenCalledWith('thread-a');
|
||||
expect(service.aiBuilderTemporaryWorkflowRepository.findByThread).toHaveBeenCalledWith(
|
||||
'thread-a',
|
||||
);
|
||||
expect(service.adapterService.createContext).toHaveBeenCalledWith(fakeUser, {
|
||||
threadId: 'thread-a',
|
||||
});
|
||||
expect(archiveIfAiTemporary).toHaveBeenCalledTimes(2);
|
||||
expect(archiveIfAiTemporary).toHaveBeenNthCalledWith(1, 'wf-marked');
|
||||
expect(archiveIfAiTemporary).toHaveBeenNthCalledWith(2, 'wf-created');
|
||||
});
|
||||
});
|
||||
|
|
|
|||
|
|
@ -3240,6 +3240,15 @@ export class InstanceAiService {
|
|||
user: User,
|
||||
createdWorkflowIds: Set<string> | undefined,
|
||||
): Promise<string[]> {
|
||||
const runningTaskCount = this.backgroundTasks.getRunningTasks(threadId).length;
|
||||
if (runningTaskCount > 0) {
|
||||
this.logger.debug('Deferring AI-builder temporary workflow cleanup until tasks settle', {
|
||||
threadId,
|
||||
runningTaskCount,
|
||||
});
|
||||
return [];
|
||||
}
|
||||
|
||||
let markedWorkflows: Array<{ workflowId: string }> = [];
|
||||
try {
|
||||
markedWorkflows = await this.aiBuilderTemporaryWorkflowRepository.findByThread(threadId);
|
||||
|
|
|
|||
Loading…
Reference in New Issue
Block a user