diff --git a/packages/cli/src/executions/__tests__/execution-persistence.test.ts b/packages/cli/src/executions/__tests__/execution-persistence.test.ts index 9536621459d..3b04a488dc5 100644 --- a/packages/cli/src/executions/__tests__/execution-persistence.test.ts +++ b/packages/cli/src/executions/__tests__/execution-persistence.test.ts @@ -10,7 +10,7 @@ import { } from '@n8n/db'; import { QueryFailedError } from '@n8n/typeorm'; import { mock } from 'jest-mock-extended'; -import type { BinaryDataService, StorageConfig } from 'n8n-core'; +import type { BinaryDataService, ErrorReporter, StorageConfig } from 'n8n-core'; import type { IWorkflowBase } from 'n8n-workflow'; import { createEmptyRunExecutionData, UnexpectedError } from 'n8n-workflow'; @@ -25,6 +25,7 @@ describe('ExecutionPersistence', () => { const binaryDataService = mock(); const fsStore = mock(); const dbStore = mock(); + const errorReporter = mock(); const executionsConfig = mock({ pruneData: true, pruneDataHardDeleteBuffer: 1, @@ -70,6 +71,7 @@ describe('ExecutionPersistence', () => { mock({ modeTag }), executionsConfig, mock({ type: dbType }), + errorReporter, ); describe('create', () => { @@ -804,6 +806,413 @@ describe('ExecutionPersistence', () => { }); }); + describe('findSingleExecution', () => { + const executionId = 'exec-1'; + const workflowId = 'wf-1'; + + const bundle = { + data: '[{"resultData":"1"},{}]', + workflowData: { + id: workflowId, + name: 'snapshot', + nodes: [], + connections: {}, + settings: undefined, + }, + workflowVersionId: 'v-1', + version: 1 as const, + }; + + const mockEntity = (storedAt: 'db' | 'fs') => + ({ + id: executionId, + workflowId, + storedAt, + metadata: [{ key: 'k', value: 'v' }], + annotation: undefined, + status: 'success', + }) as unknown as ExecutionEntity; + + beforeEach(() => { + executionRepository.findOne.mockReset(); + executionRepository.findSingleExecution.mockReset(); + executionRepository.reportInvalidExecutions.mockReset(); + dbStore.read.mockReset(); + fsStore.read.mockReset(); + }); + + it('should delegate to the repository when includeData is not set', async () => { + const executionPersistence = createPersistenceService('fs'); + executionRepository.findSingleExecution.mockResolvedValue({ id: executionId } as never); + + const result = await executionPersistence.findSingleExecution(executionId); + + expect(result).toEqual({ id: executionId }); + expect(executionRepository.findSingleExecution).toHaveBeenCalledWith(executionId, undefined); + expect(executionRepository.findOne).not.toHaveBeenCalled(); + expect(dbStore.read).not.toHaveBeenCalled(); + expect(fsStore.read).not.toHaveBeenCalled(); + }); + + it('should load entity without the executionData JOIN and read data from DbStore for db-mode', async () => { + const executionPersistence = createPersistenceService('fs'); + executionRepository.findOne.mockResolvedValue(mockEntity('db')); + dbStore.read.mockResolvedValue(bundle); + + const result = await executionPersistence.findSingleExecution(executionId, { + includeData: true, + }); + + expect(executionRepository.findOne).toHaveBeenCalledWith({ + where: { id: executionId }, + relations: { metadata: true }, + }); + expect(dbStore.read).toHaveBeenCalledWith({ workflowId, executionId }); + expect(fsStore.read).not.toHaveBeenCalled(); + expect(result).toMatchObject({ + id: executionId, + workflowId, + data: bundle.data, + workflowData: bundle.workflowData, + workflowVersionId: 'v-1', + customData: { k: 'v' }, + }); + }); + + it('should read data from FsStore for fs-mode', async () => { + const executionPersistence = createPersistenceService('fs'); + executionRepository.findOne.mockResolvedValue(mockEntity('fs')); + fsStore.read.mockResolvedValue(bundle); + + const result = await executionPersistence.findSingleExecution(executionId, { + includeData: true, + }); + + expect(fsStore.read).toHaveBeenCalledWith({ workflowId, executionId }); + expect(dbStore.read).not.toHaveBeenCalled(); + expect(result).toMatchObject({ + data: bundle.data, + workflowData: bundle.workflowData, + }); + }); + + it('should unflatten data when requested', async () => { + const executionPersistence = createPersistenceService('db'); + executionRepository.findOne.mockResolvedValue(mockEntity('db')); + dbStore.read.mockResolvedValue(bundle); + + const result = await executionPersistence.findSingleExecution(executionId, { + includeData: true, + unflattenData: true, + }); + + expect(result?.data).not.toEqual(bundle.data); + expect(result?.data).toBeTruthy(); + }); + + it('should pass the annotation relation when requested', async () => { + const executionPersistence = createPersistenceService('db'); + executionRepository.findOne.mockResolvedValue(mockEntity('db')); + dbStore.read.mockResolvedValue(bundle); + + await executionPersistence.findSingleExecution(executionId, { + includeData: true, + includeAnnotation: true, + }); + + expect(executionRepository.findOne).toHaveBeenCalledWith({ + where: { id: executionId }, + relations: { metadata: true, annotation: { tags: true } }, + }); + }); + + it('should return undefined when entity not found', async () => { + const executionPersistence = createPersistenceService('db'); + executionRepository.findOne.mockResolvedValue(null); + + const result = await executionPersistence.findSingleExecution(executionId, { + includeData: true, + }); + + expect(result).toBeUndefined(); + expect(dbStore.read).not.toHaveBeenCalled(); + expect(fsStore.read).not.toHaveBeenCalled(); + }); + + it('should report invalid and return undefined when db bundle is missing', async () => { + const executionPersistence = createPersistenceService('db'); + const entity = mockEntity('db'); + executionRepository.findOne.mockResolvedValue(entity); + dbStore.read.mockResolvedValue(null); + + const result = await executionPersistence.findSingleExecution(executionId, { + includeData: true, + }); + + expect(result).toBeUndefined(); + expect(executionRepository.reportInvalidExecutions).toHaveBeenCalledWith([entity]); + }); + + it('should throw when fs bundle is missing', async () => { + const executionPersistence = createPersistenceService('fs'); + executionRepository.findOne.mockResolvedValue(mockEntity('fs')); + fsStore.read.mockResolvedValue(null); + + await expect( + executionPersistence.findSingleExecution(executionId, { includeData: true }), + ).rejects.toBeInstanceOf(MissingExecutionDataError); + expect(executionRepository.reportInvalidExecutions).not.toHaveBeenCalled(); + }); + + it('should merge caller `where` into the entity lookup', async () => { + const executionPersistence = createPersistenceService('db'); + executionRepository.findOne.mockResolvedValue(mockEntity('db')); + dbStore.read.mockResolvedValue(bundle); + + await executionPersistence.findSingleExecution(executionId, { + includeData: true, + where: { status: 'success' }, + }); + + expect(executionRepository.findOne).toHaveBeenCalledWith({ + where: { id: executionId, status: 'success' }, + relations: { metadata: true }, + }); + }); + }); + + describe('findMultipleExecutions', () => { + const wf = 'wf-1'; + + const makeBundle = (id: string) => ({ + data: `[{"id":"${id}"},{}]`, + workflowData: { id: wf, name: 's', nodes: [], connections: {}, settings: undefined }, + workflowVersionId: 'v', + version: 1 as const, + }); + + const makeEntity = (id: string, storedAt: 'db' | 'fs') => + ({ + id, + workflowId: wf, + storedAt, + metadata: [], + annotation: undefined, + status: 'success', + }) as unknown as ExecutionEntity; + + beforeEach(() => { + executionRepository.find.mockReset(); + executionRepository.findMultipleExecutions.mockReset(); + executionRepository.reportInvalidExecutions.mockReset(); + dbStore.readMany.mockReset(); + fsStore.readMany.mockReset(); + }); + + it('should delegate to the repository when includeData is not set', async () => { + const executionPersistence = createPersistenceService('fs'); + executionRepository.findMultipleExecutions.mockResolvedValue([]); + + await executionPersistence.findMultipleExecutions({ where: { workflowId: wf } }); + + expect(executionRepository.findMultipleExecutions).toHaveBeenCalledWith( + { where: { workflowId: wf } }, + undefined, + ); + expect(executionRepository.find).not.toHaveBeenCalled(); + }); + + it('should batch-fetch db bundles in a single readMany call', async () => { + const executionPersistence = createPersistenceService('db'); + const entities = [makeEntity('a', 'db'), makeEntity('b', 'db')]; + executionRepository.find.mockResolvedValue(entities); + dbStore.readMany.mockResolvedValue( + new Map([ + ['a', makeBundle('a')], + ['b', makeBundle('b')], + ]), + ); + + const result = await executionPersistence.findMultipleExecutions( + { where: { workflowId: wf } }, + { includeData: true }, + ); + + expect(dbStore.readMany).toHaveBeenCalledTimes(1); + expect(dbStore.readMany).toHaveBeenCalledWith([ + { workflowId: wf, executionId: 'a' }, + { workflowId: wf, executionId: 'b' }, + ]); + expect(fsStore.readMany).not.toHaveBeenCalled(); + expect(result).toHaveLength(2); + }); + + it('should partition mixed batches between db and fs stores', async () => { + const executionPersistence = createPersistenceService('db'); + executionRepository.find.mockResolvedValue([ + makeEntity('a', 'db'), + makeEntity('b', 'fs'), + makeEntity('c', 'db'), + ]); + dbStore.readMany.mockResolvedValue( + new Map([ + ['a', makeBundle('a')], + ['c', makeBundle('c')], + ]), + ); + fsStore.readMany.mockResolvedValue(new Map([['b', makeBundle('b')]])); + + const result = await executionPersistence.findMultipleExecutions({}, { includeData: true }); + + expect(dbStore.readMany).toHaveBeenCalledWith([ + { workflowId: wf, executionId: 'a' }, + { workflowId: wf, executionId: 'c' }, + ]); + expect(fsStore.readMany).toHaveBeenCalledWith([{ workflowId: wf, executionId: 'b' }]); + expect(result.map((e) => e.id)).toEqual(['a', 'b', 'c']); + }); + + it('should report missing bundles from both stores and drop them from the result', async () => { + const executionPersistence = createPersistenceService('db'); + const dbA = makeEntity('a', 'db'); + const dbB = makeEntity('b', 'db'); // missing + const fsC = makeEntity('c', 'fs'); // missing + executionRepository.find.mockResolvedValue([dbA, dbB, fsC]); + dbStore.readMany.mockResolvedValue(new Map([['a', makeBundle('a')]])); + fsStore.readMany.mockResolvedValue(new Map()); + + const result = await executionPersistence.findMultipleExecutions({}, { includeData: true }); + + expect(executionRepository.reportInvalidExecutions).toHaveBeenCalledWith([dbB, fsC]); + expect(result.map((e) => e.id)).toEqual(['a']); + }); + + it('should add metadata relation (not executionData) when none was supplied', async () => { + const executionPersistence = createPersistenceService('db'); + executionRepository.find.mockResolvedValue([]); + + await executionPersistence.findMultipleExecutions( + { where: { workflowId: wf }, take: 5 }, + { includeData: true }, + ); + + const findArg = executionRepository.find.mock.calls[0][0]; + expect(findArg?.relations).toEqual(['metadata']); + expect(findArg?.where).toEqual({ workflowId: wf }); + expect(findArg?.take).toBe(5); + }); + + it('should append metadata to a caller-provided array of relations', async () => { + const executionPersistence = createPersistenceService('db'); + executionRepository.find.mockResolvedValue([]); + + await executionPersistence.findMultipleExecutions( + { relations: ['annotation'] }, + { includeData: true }, + ); + + const findArg = executionRepository.find.mock.calls[0][0]; + expect(findArg?.relations).toEqual(['annotation', 'metadata']); + }); + + it('should add metadata to a caller-provided object of relations', async () => { + const executionPersistence = createPersistenceService('db'); + executionRepository.find.mockResolvedValue([]); + + await executionPersistence.findMultipleExecutions( + { relations: { annotation: true } }, + { includeData: true }, + ); + + const findArg = executionRepository.find.mock.calls[0][0]; + expect(findArg?.relations).toEqual({ annotation: true, metadata: true }); + }); + + it('should not duplicate metadata when the caller already requested it', async () => { + const executionPersistence = createPersistenceService('db'); + executionRepository.find.mockResolvedValue([]); + + await executionPersistence.findMultipleExecutions( + { relations: ['metadata'] }, + { includeData: true }, + ); + + const findArg = executionRepository.find.mock.calls[0][0]; + expect(findArg?.relations).toEqual(['metadata']); + }); + + it('should force-select routing fields when a caller-provided array `select` omits them', async () => { + const executionPersistence = createPersistenceService('db'); + executionRepository.find.mockResolvedValue([]); + + await executionPersistence.findMultipleExecutions( + { select: ['id', 'mode'] }, + { includeData: true }, + ); + + const findArg = executionRepository.find.mock.calls[0][0]; + expect(findArg?.select).toEqual(['id', 'mode', 'workflowId', 'storedAt']); + }); + + it('should force-select routing fields when a caller-provided object `select` omits them', async () => { + const executionPersistence = createPersistenceService('db'); + executionRepository.find.mockResolvedValue([]); + + await executionPersistence.findMultipleExecutions( + { select: { mode: true } }, + { includeData: true }, + ); + + const findArg = executionRepository.find.mock.calls[0][0]; + expect(findArg?.select).toEqual({ mode: true, id: true, workflowId: true, storedAt: true }); + }); + + it('should not touch `select` when the caller did not narrow columns', async () => { + const executionPersistence = createPersistenceService('db'); + executionRepository.find.mockResolvedValue([]); + + await executionPersistence.findMultipleExecutions( + { where: { workflowId: wf } }, + { + includeData: true, + }, + ); + + const findArg = executionRepository.find.mock.calls[0][0]; + expect(findArg?.select).toBeUndefined(); + }); + + it('should report a successful execution whose data is an empty stringified array', async () => { + const executionPersistence = createPersistenceService('db'); + executionRepository.find.mockResolvedValue([makeEntity('a', 'db')]); + // Distinct snapshot id proves the report uses the bundle's workflow id, not the entity's. + const bundle = { + ...makeBundle('a'), + data: '[]', + workflowData: { ...makeBundle('a').workflowData, id: 'wf-from-snapshot' }, + }; + dbStore.readMany.mockResolvedValue(new Map([['a', bundle]])); + + await executionPersistence.findMultipleExecutions({}, { includeData: true }); + + expect(errorReporter.error).toHaveBeenCalledWith( + 'Found successful execution where data is empty stringified array', + { extra: { executionId: 'a', workflowId: 'wf-from-snapshot' } }, + ); + }); + + it('should return an empty array when no entities match', async () => { + const executionPersistence = createPersistenceService('db'); + executionRepository.find.mockResolvedValue([]); + + const result = await executionPersistence.findMultipleExecutions({}, { includeData: true }); + + expect(result).toEqual([]); + expect(dbStore.readMany).not.toHaveBeenCalled(); + expect(fsStore.readMany).not.toHaveBeenCalled(); + }); + }); + describe('hardDelete', () => { const executionPersistence = createPersistenceService('db'); const baseTarget = { workflowId: 'wf-1', executionId: 'exec-1' }; diff --git a/packages/cli/src/executions/execution-data/__tests__/db-store.integration.test.ts b/packages/cli/src/executions/execution-data/__tests__/db-store.integration.test.ts index eaf90306da7..413fbbc287b 100644 --- a/packages/cli/src/executions/execution-data/__tests__/db-store.integration.test.ts +++ b/packages/cli/src/executions/execution-data/__tests__/db-store.integration.test.ts @@ -93,6 +93,35 @@ describe('read', () => { }); }); +describe('readMany', () => { + it('should return a map of bundles keyed by executionId, omitting missing ones', async () => { + const [a, b] = await Promise.all([createExecution(), createExecution()]); + const refA = createExecutionRef(workflowId, a.id); + const refB = createExecutionRef(workflowId, b.id); + await dbStore.write(refA, payload); + + const bundles = await dbStore.readMany([refA, refB]); + + expect(bundles.size).toBe(1); + expect(bundles.get(a.id)).toMatchObject({ ...payload, version: 1 }); + expect(bundles.has(b.id)).toBe(false); + }); + + it('should return an empty map for an empty array', async () => { + const bundles = await dbStore.readMany([]); + + expect(bundles.size).toBe(0); + }); + + it('should batch the IN-clause so a large id set stays within the DB parameter limit', async () => { + const refs = Array.from({ length: 2500 }, (_, i) => + createExecutionRef(workflowId, String(100000 + i)), + ); + + await expect(dbStore.readMany(refs)).resolves.toEqual(new Map()); + }); +}); + describe('delete', () => { it('should delete data for single execution', async () => { const execution = await createExecution(); diff --git a/packages/cli/src/executions/execution-data/__tests__/fs-store.integration.test.ts b/packages/cli/src/executions/execution-data/__tests__/fs-store.integration.test.ts index afb5401f9be..d152d2e6423 100644 --- a/packages/cli/src/executions/execution-data/__tests__/fs-store.integration.test.ts +++ b/packages/cli/src/executions/execution-data/__tests__/fs-store.integration.test.ts @@ -1,3 +1,5 @@ +/* eslint-disable @typescript-eslint/unbound-method */ + import { mockInstance } from '@n8n/backend-test-utils'; import { Container } from '@n8n/di'; import { mock } from 'jest-mock-extended'; @@ -19,15 +21,17 @@ jest.unmock('node:fs/promises'); let fsStore: FsStore; let storagePath: string; +let errorReporter: ErrorReporter; beforeAll(async () => { storagePath = await mkdtemp(join(tmpdir(), 'n8n-fs-store-test-')); mockInstance(StorageConfig, { storagePath }); - mockInstance(ErrorReporter); + errorReporter = mockInstance(ErrorReporter); fsStore = Container.get(FsStore); }); beforeEach(async () => { + jest.mocked(errorReporter.error).mockClear(); const workflowsDir = join(storagePath, 'workflows'); await rm(workflowsDir, { recursive: true, force: true }).catch(() => {}); }); @@ -147,6 +151,66 @@ describe('read', () => { }); }); +describe('readMany', () => { + it('should return a map of bundles keyed by executionId, omitting missing ones', async () => { + const present = createExecutionRef(workflowId, 'exec-1'); + const missing = createExecutionRef(workflowId, 'exec-2'); + await fsStore.write(present, payload); + + const bundles = await fsStore.readMany([present, missing]); + + expect(bundles.size).toBe(1); + expect(bundles.get('exec-1')).toMatchObject({ ...payload, version: 1 }); + expect(bundles.has('exec-2')).toBe(false); + }); + + it('should report and drop a corrupted bundle instead of rejecting the whole read', async () => { + const good = createExecutionRef(workflowId, 'good'); + const bad = createExecutionRef(workflowId, 'bad'); + await fsStore.write(good, payload); + + const badPath = join( + storagePath, + 'workflows', + workflowId, + 'executions', + 'bad', + 'execution_data', + EXECUTION_DATA_BUNDLE_FILENAME, + ); + await fs.mkdir( + join(storagePath, 'workflows', workflowId, 'executions', 'bad', 'execution_data'), + { + recursive: true, + }, + ); + await fs.writeFile(badPath, 'invalid json{{{', 'utf-8'); + + const bundles = await fsStore.readMany([good, bad]); + + expect(bundles.has('good')).toBe(true); + expect(bundles.has('bad')).toBe(false); + expect(errorReporter.error).toHaveBeenCalledWith(expect.any(CorruptedExecutionDataError)); + }); + + it('should rethrow a systemic read error instead of swallowing it', async () => { + const target = createExecutionRef(workflowId, 'exec-1'); + await fsStore.write(target, payload); + + const eacces = Object.assign(new Error('EACCES: permission denied'), { code: 'EACCES' }); + jest.spyOn(fs, 'readFile').mockRejectedValueOnce(eacces); + + await expect(fsStore.readMany([target])).rejects.toBe(eacces); + expect(errorReporter.error).not.toHaveBeenCalled(); + }); + + it('should return an empty map for an empty array', async () => { + const bundles = await fsStore.readMany([]); + + expect(bundles.size).toBe(0); + }); +}); + describe('delete', () => { it('should delete execution directory', async () => { await fsStore.write(ref, payload); diff --git a/packages/cli/src/executions/execution-data/db-store.ts b/packages/cli/src/executions/execution-data/db-store.ts index e6b644c7e6d..3d88363a4b5 100644 --- a/packages/cli/src/executions/execution-data/db-store.ts +++ b/packages/cli/src/executions/execution-data/db-store.ts @@ -1,6 +1,7 @@ -import { ExecutionData, ExecutionDataRepository } from '@n8n/db'; +import { ExecutionData, ExecutionDataRepository, In } from '@n8n/db'; import type { EntityManager } from '@n8n/db'; import { Service } from '@n8n/di'; +import chunk from 'lodash/chunk'; import { EXECUTION_DATA_BUNDLE_VERSION } from './constants'; import type { @@ -10,6 +11,9 @@ import type { ExecutionDataBundle, } from './types'; +// Max number of ids per IN-clause. Conservative, as some databases cap near 1000. +const MAX_READ_BATCH_SIZE = 900; + @Service() export class DbStore implements ExecutionDataStore { constructor(private readonly repository: ExecutionDataRepository) {} @@ -34,6 +38,33 @@ export class DbStore implements ExecutionDataStore { return { ...result, version: EXECUTION_DATA_BUNDLE_VERSION }; } + async readMany(refs: ExecutionRef[]) { + const bundles = new Map(); + if (refs.length === 0) return bundles; + + const ids = refs.map((r) => r.executionId); + + // Batch the IN-clause so an unbounded set of ids cannot exceed the DB's + // limit on bound parameters (SQLite caps near 1000). + for (const batch of chunk(ids, MAX_READ_BATCH_SIZE)) { + const rows = await this.repository.find({ + where: { executionId: In(batch) }, + select: ['executionId', 'data', 'workflowData', 'workflowVersionId'], + }); + + for (const row of rows) { + bundles.set(row.executionId, { + data: row.data, + workflowData: row.workflowData, + workflowVersionId: row.workflowVersionId, + version: EXECUTION_DATA_BUNDLE_VERSION, + }); + } + } + + return bundles; + } + async delete(ref: ExecutionRef | ExecutionRef[]) { const ids = (Array.isArray(ref) ? ref : [ref]).map((r) => r.executionId); diff --git a/packages/cli/src/executions/execution-data/fs-store.ts b/packages/cli/src/executions/execution-data/fs-store.ts index 42c823febb5..7e77fa8fc31 100644 --- a/packages/cli/src/executions/execution-data/fs-store.ts +++ b/packages/cli/src/executions/execution-data/fs-store.ts @@ -1,5 +1,6 @@ import { assertDir } from '@n8n/backend-common'; import { Service } from '@n8n/di'; +import chunk from 'lodash/chunk'; import { ErrorReporter, StorageConfig } from 'n8n-core'; import { jsonParse, jsonStringify } from 'n8n-workflow'; import fs from 'node:fs/promises'; @@ -15,6 +16,9 @@ import type { ExecutionDataBundle, } from './types'; +// Max number of bundles read concurrently, to bound open file descriptors. +const MAX_READ_CONCURRENCY = 50; + @Service() export class FsStore implements ExecutionDataStore { constructor( @@ -69,6 +73,22 @@ export class FsStore implements ExecutionDataStore { } } + async readMany(refs: ExecutionRef[]) { + const bundles = new Map(); + if (refs.length === 0) return bundles; + + // Read in chunks to cap concurrent file descriptors. + for (const batch of chunk(refs, MAX_READ_CONCURRENCY)) { + const bundlesInBatch = await Promise.all(batch.map(async (ref) => await this.tryRead(ref))); + + for (const [idx, bundle] of bundlesInBatch.entries()) { + if (bundle) bundles.set(batch[idx].executionId, bundle); + } + } + + return bundles; + } + async delete(ref: ExecutionRef | ExecutionRef[]) { const refs = Array.isArray(ref) ? ref : [ref]; @@ -102,4 +122,22 @@ export class FsStore implements ExecutionDataStore { error !== null && typeof error === 'object' && 'code' in error && error.code === 'ENOENT' ); } + + /** + * Read a single bundle, tolerating per-record faults so they cannot sink a whole + * {@link readMany} batch. A missing bundle returns `null` ({@link read} already maps ENOENT to + * `null`); a corrupted (non-parseable) bundle is reported and dropped. Systemic failures + * (permission denied, disk read error, broken mount) are rethrown so we don't mask them. + */ + private async tryRead(ref: ExecutionRef): Promise { + try { + return await this.read(ref); + } catch (error) { + if (error instanceof CorruptedExecutionDataError) { + this.errorReporter.error(error); + return null; + } + throw error; + } + } } diff --git a/packages/cli/src/executions/execution-data/types.ts b/packages/cli/src/executions/execution-data/types.ts index ab3a6f9f53d..342422cf38e 100644 --- a/packages/cli/src/executions/execution-data/types.ts +++ b/packages/cli/src/executions/execution-data/types.ts @@ -34,5 +34,7 @@ export interface ExecutionDataStore { init?(): Promise; write(ref: ExecutionRef, payload: ExecutionDataPayload, tx?: EntityManager): Promise; read(ref: ExecutionRef, tx?: EntityManager): Promise; + /** Read multiple bundles by ref. Returns a map keyed by `executionId`; missing entries are omitted. */ + readMany(refs: ExecutionRef[]): Promise>; delete(ref: ExecutionRef | ExecutionRef[]): Promise; } diff --git a/packages/cli/src/executions/execution-persistence.ts b/packages/cli/src/executions/execution-persistence.ts index b6667fe5032..206143aea5c 100644 --- a/packages/cli/src/executions/execution-persistence.ts +++ b/packages/cli/src/executions/execution-persistence.ts @@ -1,23 +1,33 @@ +import { parseFlatted } from '@n8n/backend-common'; import { DatabaseConfig, ExecutionsConfig } from '@n8n/config'; import { Time } from '@n8n/constants'; import type { CreateExecutionPayload, ExecutionDataStorageLocation, ExecutionDeletionCriteria, + FindManyOptions, FindOptionsWhere, + IExecutionBase, + IExecutionFlattedDb, IExecutionResponse, UpdateExecutionConditions, } from '@n8n/db'; import { ExecutionEntity, ExecutionRepository, Not } from '@n8n/db'; import { Service } from '@n8n/di'; import { stringify } from 'flatted'; -import { BinaryDataService, StorageConfig } from 'n8n-core'; -import { UnexpectedError } from 'n8n-workflow'; +import { BinaryDataService, ErrorReporter, StorageConfig } from 'n8n-core'; +import type { IRunExecutionData, IRunExecutionDataAll } from 'n8n-workflow'; +import { migrateRunExecutionData, UnexpectedError } from 'n8n-workflow'; import { DbStore } from './execution-data/db-store'; import { FsStore } from './execution-data/fs-store'; import { MissingExecutionDataError } from './execution-data/missing-execution-data.error'; -import type { ExecutionDataStore, ExecutionRef, WorkflowSnapshot } from './execution-data/types'; +import type { + ExecutionDataBundle, + ExecutionDataStore, + ExecutionRef, + WorkflowSnapshot, +} from './execution-data/types'; import { DuplicateExecutionError } from '../errors/duplicate-execution.error'; type DeletionTarget = ExecutionRef & { storedAt: ExecutionDataStorageLocation }; @@ -48,6 +58,7 @@ export class ExecutionPersistence { private readonly storageConfig: StorageConfig, private readonly executionsConfig: ExecutionsConfig, private readonly databaseConfig: DatabaseConfig, + private readonly errorReporter: ErrorReporter, ) {} /** @@ -112,6 +123,187 @@ export class ExecutionPersistence { return await this.applyDataUpdate(ref, store, execution, conditions); } + /** + * Find a single execution by id, dispatching data reads to the store matching its `storedAt`. + * - In `db` mode, we load entity, metadata, optional annotation, and data via `DbStore`. + * - In `fs` mode, we load entity, metadata, optional annotation from the DB, and data via `FsStore`. + * + * A missing data bundle is handled differently per store. In `db` mode the entity and its data + * share one database, so an absent data row means a known-corrupt record we report and skip + * (soft). In `fs` mode the entity lives in the DB while its data lives on disk, so a missing + * file points at an out-of-band loss (deletion, unmounted volume) that a single-execution read + * should surface loudly rather than silently swallow (hard). + */ + async findSingleExecution( + id: string, + options?: { + includeData: true; + includeAnnotation?: boolean; + unflattenData: true; + where?: FindOptionsWhere; + }, + ): Promise; + async findSingleExecution( + id: string, + options?: { + includeData: true; + includeAnnotation?: boolean; + unflattenData?: false | undefined; + where?: FindOptionsWhere; + }, + ): Promise; + async findSingleExecution( + id: string, + options?: { + includeData?: boolean; + includeAnnotation?: boolean; + unflattenData?: boolean; + where?: FindOptionsWhere; + }, + ): Promise; + async findSingleExecution( + id: string, + options?: { + includeData?: boolean; + includeAnnotation?: boolean; + unflattenData?: boolean; + where?: FindOptionsWhere; + }, + ): Promise { + if (!options?.includeData) { + return await this.executionRepository.findSingleExecution(id, options); + } + + const entity = await this.executionRepository.findOne({ + where: { id, ...options.where }, + relations: { + metadata: true, + ...(options.includeAnnotation ? { annotation: { tags: true } } : {}), + }, + }); + + if (!entity) return undefined; + + const store = this.getStoreFor(entity.storedAt); + const bundle = await store.read({ workflowId: entity.workflowId, executionId: entity.id }); + + if (!bundle) { + if (entity.storedAt === 'db') { + this.executionRepository.reportInvalidExecutions([entity]); + return undefined; + } + throw new MissingExecutionDataError({ + workflowId: entity.workflowId, + executionId: entity.id, + }); + } + + return (await this.assembleExecution(entity, bundle, options)) as + | IExecutionFlattedDb + | IExecutionResponse + | IExecutionBase; + } + + /** + * Find multiple executions matching `queryParams`. With `includeData: true`, partitions + * entities by `storedAt` and batch-fetches bundles from each store to avoid n+1 reads. + * - In `db` mode, we issue one `In(ids)` query against `execution_data` per batch. + * - In `fs` mode, we fan out reads across the filesystem. + */ + async findMultipleExecutions( + queryParams: FindManyOptions, + options?: { + unflattenData: true; + includeData?: true; + }, + ): Promise; + async findMultipleExecutions( + queryParams: FindManyOptions, + options?: { + unflattenData?: false | undefined; + includeData?: true; + }, + ): Promise; + async findMultipleExecutions( + queryParams: FindManyOptions, + options?: { + unflattenData?: boolean; + includeData?: boolean; + }, + ): Promise; + async findMultipleExecutions( + queryParams: FindManyOptions, + options?: { + unflattenData?: boolean; + includeData?: boolean; + }, + ): Promise { + if (!options?.includeData) { + return await this.executionRepository.findMultipleExecutions(queryParams, options); + } + + queryParams.relations ??= []; + if (Array.isArray(queryParams.relations)) { + if (!queryParams.relations.includes('metadata')) queryParams.relations.push('metadata'); + } else { + queryParams.relations.metadata = true; + } + + // A narrowing `select` must still include the fields we route and read by: `storedAt` (else + // every execution defaults to the fs store) and `id`/`workflowId` (else no bundle resolves). + // An undefined `select` loads all columns, so no action needed. + if (queryParams.select) { + if (Array.isArray(queryParams.select)) { + for (const field of ['id', 'workflowId', 'storedAt'] as const) { + if (!queryParams.select.includes(field)) queryParams.select.push(field); + } + } else { + queryParams.select.id = true; + queryParams.select.workflowId = true; + queryParams.select.storedAt = true; + } + } + + const entities = await this.executionRepository.find(queryParams); + if (entities.length === 0) return []; + + // Group by storage location and batch-fetch each group from its store. + const entitiesByLocation = new Map(); + for (const entity of entities) { + const group = entitiesByLocation.get(entity.storedAt) ?? []; + group.push(entity); + entitiesByLocation.set(entity.storedAt, group); + } + + const bundlesById = new Map(); + await Promise.all( + [...entitiesByLocation].map(async ([location, group]) => { + const refs = group.map((e) => ({ workflowId: e.workflowId, executionId: e.id })); + const bundles = await this.getStoreFor(location).readMany(refs); + for (const [id, bundle] of bundles) bundlesById.set(id, bundle); + }), + ); + + // Report invalid entities when they are found to be missing from the stores. + const invalidEntities = entities.filter((e) => !bundlesById.has(e.id)); + if (invalidEntities.length > 0) { + this.executionRepository.reportInvalidExecutions(invalidEntities); + } + + const assembled = await Promise.all( + entities.map(async (entity) => { + const bundle = bundlesById.get(entity.id); + if (!bundle) return null; + return await this.assembleExecution(entity, bundle, options); + }), + ); + + return assembled.filter((e): e is NonNullable => e !== null) as + | IExecutionFlattedDb[] + | IExecutionResponse[] + | IExecutionBase[]; + } + /** * Delete an in-flight execution that is not meant to be saved. * @@ -279,6 +471,54 @@ export class ExecutionPersistence { return { id, name, nodes, connections, settings }; } + private async assembleExecution( + entity: ExecutionEntity, + bundle: ExecutionDataBundle, + options: { unflattenData?: boolean; includeAnnotation?: boolean }, + ) { + const { metadata, annotation, ...rest } = entity; + const data = await this.parseExecutionData(bundle.data, options); + const serializedAnnotation = this.serializeAnnotation(annotation); + + if (entity.status === 'success' && bundle.data === '[]') { + this.errorReporter.error('Found successful execution where data is empty stringified array', { + extra: { executionId: entity.id, workflowId: bundle.workflowData.id }, + }); + } + + return { + ...rest, + data, + workflowData: bundle.workflowData, + workflowVersionId: bundle.workflowVersionId ?? null, + customData: Object.fromEntries(metadata.map((m) => [m.key, m.value])), + ...(options.includeAnnotation && serializedAnnotation + ? { annotation: serializedAnnotation } + : {}), + }; + } + + private async parseExecutionData( + data: string, + options: { unflattenData?: boolean }, + ): Promise { + if (!options.unflattenData) return data; + + const deserialized: unknown = await parseFlatted(data); + if (!deserialized) return undefined; + return migrateRunExecutionData(deserialized as IRunExecutionDataAll); + } + + private serializeAnnotation(annotation: ExecutionEntity['annotation']) { + if (!annotation) return null; + const { id, vote, tags } = annotation; + return { + id, + vote, + tags: tags?.map(({ id, name }) => ({ id, name })) ?? [], + }; + } + /** * Detect whether the DB rejected the insert because of the unique index on * `execution_entity.deduplicationKey`. We expect TypeORM to surface the