refactor(core): Add findById and findMany to ExecutionPersistence (no-changelog) (#30467)

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Mike Repeć 2026-06-02 11:45:32 +02:00 committed by GitHub
parent 166eb85509
commit 01dec32c0a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 819 additions and 6 deletions

View File

@ -10,7 +10,7 @@ import {
} from '@n8n/db';
import { QueryFailedError } from '@n8n/typeorm';
import { mock } from 'jest-mock-extended';
import type { BinaryDataService, StorageConfig } from 'n8n-core';
import type { BinaryDataService, ErrorReporter, StorageConfig } from 'n8n-core';
import type { IWorkflowBase } from 'n8n-workflow';
import { createEmptyRunExecutionData, UnexpectedError } from 'n8n-workflow';
@ -25,6 +25,7 @@ describe('ExecutionPersistence', () => {
const binaryDataService = mock<BinaryDataService>();
const fsStore = mock<FsStore>();
const dbStore = mock<DbStore>();
const errorReporter = mock<ErrorReporter>();
const executionsConfig = mock<ExecutionsConfig>({
pruneData: true,
pruneDataHardDeleteBuffer: 1,
@ -70,6 +71,7 @@ describe('ExecutionPersistence', () => {
mock<StorageConfig>({ modeTag }),
executionsConfig,
mock<DatabaseConfig>({ type: dbType }),
errorReporter,
);
describe('create', () => {
@ -804,6 +806,413 @@ describe('ExecutionPersistence', () => {
});
});
// Unit tests for `ExecutionPersistence.findSingleExecution`.
// Two paths are exercised: plain delegation to the repository when `includeData` is not set,
// and the `includeData` path where the entity is loaded via `findOne` and its data bundle is
// read from the mocked `DbStore` / `FsStore`.
describe('findSingleExecution', () => {
const executionId = 'exec-1';
const workflowId = 'wf-1';
// Bundle handed back by the mocked stores. `data` is a serialized string; the unflatten test
// below only checks that it gets transformed into something else.
const bundle = {
data: '[{"resultData":"1"},{}]',
workflowData: {
id: workflowId,
name: 'snapshot',
nodes: [],
connections: {},
settings: undefined,
},
workflowVersionId: 'v-1',
version: 1 as const,
};
// Minimal entity stub: only a handful of fields are populated, hence the double cast.
// `storedAt` is the knob the tests turn to pick which store should be read.
const mockEntity = (storedAt: 'db' | 'fs') =>
({
id: executionId,
workflowId,
storedAt,
metadata: [{ key: 'k', value: 'v' }],
annotation: undefined,
status: 'success',
}) as unknown as ExecutionEntity;
// The mocks are shared across the whole file, so wipe implementations and recorded calls
// before every test in this suite.
beforeEach(() => {
executionRepository.findOne.mockReset();
executionRepository.findSingleExecution.mockReset();
executionRepository.reportInvalidExecutions.mockReset();
dbStore.read.mockReset();
fsStore.read.mockReset();
});
// Without `includeData` neither `findOne` nor any store may be touched.
it('should delegate to the repository when includeData is not set', async () => {
const executionPersistence = createPersistenceService('fs');
executionRepository.findSingleExecution.mockResolvedValue({ id: executionId } as never);
const result = await executionPersistence.findSingleExecution(executionId);
expect(result).toEqual({ id: executionId });
expect(executionRepository.findSingleExecution).toHaveBeenCalledWith(executionId, undefined);
expect(executionRepository.findOne).not.toHaveBeenCalled();
expect(dbStore.read).not.toHaveBeenCalled();
expect(fsStore.read).not.toHaveBeenCalled();
});
// Note the service is created with 'fs' while the entity says `storedAt: 'db'`: the read is
// expected to follow the entity, so DbStore is hit and FsStore is not.
it('should load entity without the executionData JOIN and read data from DbStore for db-mode', async () => {
const executionPersistence = createPersistenceService('fs');
executionRepository.findOne.mockResolvedValue(mockEntity('db'));
dbStore.read.mockResolvedValue(bundle);
const result = await executionPersistence.findSingleExecution(executionId, {
includeData: true,
});
// Only `metadata` is requested as a relation; no `executionData` relation is present.
expect(executionRepository.findOne).toHaveBeenCalledWith({
where: { id: executionId },
relations: { metadata: true },
});
expect(dbStore.read).toHaveBeenCalledWith({ workflowId, executionId });
expect(fsStore.read).not.toHaveBeenCalled();
// Metadata rows are expected to be folded into a `customData` key/value object.
expect(result).toMatchObject({
id: executionId,
workflowId,
data: bundle.data,
workflowData: bundle.workflowData,
workflowVersionId: 'v-1',
customData: { k: 'v' },
});
});
// Mirror of the previous test for an entity stored on the filesystem.
it('should read data from FsStore for fs-mode', async () => {
const executionPersistence = createPersistenceService('fs');
executionRepository.findOne.mockResolvedValue(mockEntity('fs'));
fsStore.read.mockResolvedValue(bundle);
const result = await executionPersistence.findSingleExecution(executionId, {
includeData: true,
});
expect(fsStore.read).toHaveBeenCalledWith({ workflowId, executionId });
expect(dbStore.read).not.toHaveBeenCalled();
expect(result).toMatchObject({
data: bundle.data,
workflowData: bundle.workflowData,
});
});
// With `unflattenData` the returned `data` must no longer be the raw stored string.
it('should unflatten data when requested', async () => {
const executionPersistence = createPersistenceService('db');
executionRepository.findOne.mockResolvedValue(mockEntity('db'));
dbStore.read.mockResolvedValue(bundle);
const result = await executionPersistence.findSingleExecution(executionId, {
includeData: true,
unflattenData: true,
});
expect(result?.data).not.toEqual(bundle.data);
expect(result?.data).toBeTruthy();
});
// `includeAnnotation` should widen the relations to the annotation and its tags.
it('should pass the annotation relation when requested', async () => {
const executionPersistence = createPersistenceService('db');
executionRepository.findOne.mockResolvedValue(mockEntity('db'));
dbStore.read.mockResolvedValue(bundle);
await executionPersistence.findSingleExecution(executionId, {
includeData: true,
includeAnnotation: true,
});
expect(executionRepository.findOne).toHaveBeenCalledWith({
where: { id: executionId },
relations: { metadata: true, annotation: { tags: true } },
});
});
// No entity means no store read at all.
it('should return undefined when entity not found', async () => {
const executionPersistence = createPersistenceService('db');
executionRepository.findOne.mockResolvedValue(null);
const result = await executionPersistence.findSingleExecution(executionId, {
includeData: true,
});
expect(result).toBeUndefined();
expect(dbStore.read).not.toHaveBeenCalled();
expect(fsStore.read).not.toHaveBeenCalled();
});
// Soft failure: a db-stored entity without a data row is reported and skipped.
it('should report invalid and return undefined when db bundle is missing', async () => {
const executionPersistence = createPersistenceService('db');
const entity = mockEntity('db');
executionRepository.findOne.mockResolvedValue(entity);
dbStore.read.mockResolvedValue(null);
const result = await executionPersistence.findSingleExecution(executionId, {
includeData: true,
});
expect(result).toBeUndefined();
expect(executionRepository.reportInvalidExecutions).toHaveBeenCalledWith([entity]);
});
// Hard failure: a fs-stored entity without a bundle rejects instead of being reported.
it('should throw when fs bundle is missing', async () => {
const executionPersistence = createPersistenceService('fs');
executionRepository.findOne.mockResolvedValue(mockEntity('fs'));
fsStore.read.mockResolvedValue(null);
await expect(
executionPersistence.findSingleExecution(executionId, { includeData: true }),
).rejects.toBeInstanceOf(MissingExecutionDataError);
expect(executionRepository.reportInvalidExecutions).not.toHaveBeenCalled();
});
// Extra `where` conditions from the caller are combined with the id filter.
it('should merge caller `where` into the entity lookup', async () => {
const executionPersistence = createPersistenceService('db');
executionRepository.findOne.mockResolvedValue(mockEntity('db'));
dbStore.read.mockResolvedValue(bundle);
await executionPersistence.findSingleExecution(executionId, {
includeData: true,
where: { status: 'success' },
});
expect(executionRepository.findOne).toHaveBeenCalledWith({
where: { id: executionId, status: 'success' },
relations: { metadata: true },
});
});
});
// Unit tests for `ExecutionPersistence.findMultipleExecutions`.
// Covers delegation without `includeData`, batched reads through `readMany` on the mocked
// stores, reporting of executions whose bundle is missing, and how caller-supplied
// `relations` / `select` are adjusted before `executionRepository.find` is called.
describe('findMultipleExecutions', () => {
const wf = 'wf-1';
// Builds a distinct bundle per execution id so results can be told apart.
const makeBundle = (id: string) => ({
data: `[{"id":"${id}"},{}]`,
workflowData: { id: wf, name: 's', nodes: [], connections: {}, settings: undefined },
workflowVersionId: 'v',
version: 1 as const,
});
// Minimal entity stub (hence the double cast); `storedAt` selects the store under test.
const makeEntity = (id: string, storedAt: 'db' | 'fs') =>
({
id,
workflowId: wf,
storedAt,
metadata: [],
annotation: undefined,
status: 'success',
}) as unknown as ExecutionEntity;
// Mocks are shared across the file; reset implementations and call history per test.
beforeEach(() => {
executionRepository.find.mockReset();
executionRepository.findMultipleExecutions.mockReset();
executionRepository.reportInvalidExecutions.mockReset();
dbStore.readMany.mockReset();
fsStore.readMany.mockReset();
});
// Without `includeData` the query params and options are forwarded as-is and `find` is unused.
it('should delegate to the repository when includeData is not set', async () => {
const executionPersistence = createPersistenceService('fs');
executionRepository.findMultipleExecutions.mockResolvedValue([]);
await executionPersistence.findMultipleExecutions({ where: { workflowId: wf } });
expect(executionRepository.findMultipleExecutions).toHaveBeenCalledWith(
{ where: { workflowId: wf } },
undefined,
);
expect(executionRepository.find).not.toHaveBeenCalled();
});
// All db-stored entities must be fetched with one `readMany` call rather than one read each.
it('should batch-fetch db bundles in a single readMany call', async () => {
const executionPersistence = createPersistenceService('db');
const entities = [makeEntity('a', 'db'), makeEntity('b', 'db')];
executionRepository.find.mockResolvedValue(entities);
dbStore.readMany.mockResolvedValue(
new Map([
['a', makeBundle('a')],
['b', makeBundle('b')],
]),
);
const result = await executionPersistence.findMultipleExecutions(
{ where: { workflowId: wf } },
{ includeData: true },
);
expect(dbStore.readMany).toHaveBeenCalledTimes(1);
expect(dbStore.readMany).toHaveBeenCalledWith([
{ workflowId: wf, executionId: 'a' },
{ workflowId: wf, executionId: 'b' },
]);
expect(fsStore.readMany).not.toHaveBeenCalled();
expect(result).toHaveLength(2);
});
// Entities are split per `storedAt`, yet the result keeps the order returned by `find`.
it('should partition mixed batches between db and fs stores', async () => {
const executionPersistence = createPersistenceService('db');
executionRepository.find.mockResolvedValue([
makeEntity('a', 'db'),
makeEntity('b', 'fs'),
makeEntity('c', 'db'),
]);
dbStore.readMany.mockResolvedValue(
new Map([
['a', makeBundle('a')],
['c', makeBundle('c')],
]),
);
fsStore.readMany.mockResolvedValue(new Map([['b', makeBundle('b')]]));
const result = await executionPersistence.findMultipleExecutions({}, { includeData: true });
expect(dbStore.readMany).toHaveBeenCalledWith([
{ workflowId: wf, executionId: 'a' },
{ workflowId: wf, executionId: 'c' },
]);
expect(fsStore.readMany).toHaveBeenCalledWith([{ workflowId: wf, executionId: 'b' }]);
expect(result.map((e) => e.id)).toEqual(['a', 'b', 'c']);
});
// Unlike the single-execution lookup, a missing fs bundle is reported here, not thrown.
it('should report missing bundles from both stores and drop them from the result', async () => {
const executionPersistence = createPersistenceService('db');
const dbA = makeEntity('a', 'db');
const dbB = makeEntity('b', 'db'); // missing
const fsC = makeEntity('c', 'fs'); // missing
executionRepository.find.mockResolvedValue([dbA, dbB, fsC]);
dbStore.readMany.mockResolvedValue(new Map([['a', makeBundle('a')]]));
fsStore.readMany.mockResolvedValue(new Map());
const result = await executionPersistence.findMultipleExecutions({}, { includeData: true });
expect(executionRepository.reportInvalidExecutions).toHaveBeenCalledWith([dbB, fsC]);
expect(result.map((e) => e.id)).toEqual(['a']);
});
// The next four tests pin how `relations` is adjusted for each shape a caller may pass.
it('should add metadata relation (not executionData) when none was supplied', async () => {
const executionPersistence = createPersistenceService('db');
executionRepository.find.mockResolvedValue([]);
await executionPersistence.findMultipleExecutions(
{ where: { workflowId: wf }, take: 5 },
{ includeData: true },
);
// Inspect the first argument of the first `find` call; other query params must pass through.
const findArg = executionRepository.find.mock.calls[0][0];
expect(findArg?.relations).toEqual(['metadata']);
expect(findArg?.where).toEqual({ workflowId: wf });
expect(findArg?.take).toBe(5);
});
it('should append metadata to a caller-provided array of relations', async () => {
const executionPersistence = createPersistenceService('db');
executionRepository.find.mockResolvedValue([]);
await executionPersistence.findMultipleExecutions(
{ relations: ['annotation'] },
{ includeData: true },
);
const findArg = executionRepository.find.mock.calls[0][0];
expect(findArg?.relations).toEqual(['annotation', 'metadata']);
});
it('should add metadata to a caller-provided object of relations', async () => {
const executionPersistence = createPersistenceService('db');
executionRepository.find.mockResolvedValue([]);
await executionPersistence.findMultipleExecutions(
{ relations: { annotation: true } },
{ includeData: true },
);
const findArg = executionRepository.find.mock.calls[0][0];
expect(findArg?.relations).toEqual({ annotation: true, metadata: true });
});
it('should not duplicate metadata when the caller already requested it', async () => {
const executionPersistence = createPersistenceService('db');
executionRepository.find.mockResolvedValue([]);
await executionPersistence.findMultipleExecutions(
{ relations: ['metadata'] },
{ includeData: true },
);
const findArg = executionRepository.find.mock.calls[0][0];
expect(findArg?.relations).toEqual(['metadata']);
});
// The next three tests pin `select` handling: `id`, `workflowId` and `storedAt` are added to
// a narrowing select (array or object form), while an absent select stays absent.
it('should force-select routing fields when a caller-provided array `select` omits them', async () => {
const executionPersistence = createPersistenceService('db');
executionRepository.find.mockResolvedValue([]);
await executionPersistence.findMultipleExecutions(
{ select: ['id', 'mode'] },
{ includeData: true },
);
const findArg = executionRepository.find.mock.calls[0][0];
expect(findArg?.select).toEqual(['id', 'mode', 'workflowId', 'storedAt']);
});
it('should force-select routing fields when a caller-provided object `select` omits them', async () => {
const executionPersistence = createPersistenceService('db');
executionRepository.find.mockResolvedValue([]);
await executionPersistence.findMultipleExecutions(
{ select: { mode: true } },
{ includeData: true },
);
const findArg = executionRepository.find.mock.calls[0][0];
expect(findArg?.select).toEqual({ mode: true, id: true, workflowId: true, storedAt: true });
});
it('should not touch `select` when the caller did not narrow columns', async () => {
const executionPersistence = createPersistenceService('db');
executionRepository.find.mockResolvedValue([]);
await executionPersistence.findMultipleExecutions(
{ where: { workflowId: wf } },
{
includeData: true,
},
);
const findArg = executionRepository.find.mock.calls[0][0];
expect(findArg?.select).toBeUndefined();
});
// A `success` execution whose stored data is the literal string '[]' must be sent to the
// error reporter with the execution id and the workflow id taken from the bundle snapshot.
it('should report a successful execution whose data is an empty stringified array', async () => {
const executionPersistence = createPersistenceService('db');
executionRepository.find.mockResolvedValue([makeEntity('a', 'db')]);
// Distinct snapshot id proves the report uses the bundle's workflow id, not the entity's.
const bundle = {
...makeBundle('a'),
data: '[]',
workflowData: { ...makeBundle('a').workflowData, id: 'wf-from-snapshot' },
};
dbStore.readMany.mockResolvedValue(new Map([['a', bundle]]));
await executionPersistence.findMultipleExecutions({}, { includeData: true });
expect(errorReporter.error).toHaveBeenCalledWith(
'Found successful execution where data is empty stringified array',
{ extra: { executionId: 'a', workflowId: 'wf-from-snapshot' } },
);
});
// With no matching entities the stores must not be queried at all.
it('should return an empty array when no entities match', async () => {
const executionPersistence = createPersistenceService('db');
executionRepository.find.mockResolvedValue([]);
const result = await executionPersistence.findMultipleExecutions({}, { includeData: true });
expect(result).toEqual([]);
expect(dbStore.readMany).not.toHaveBeenCalled();
expect(fsStore.readMany).not.toHaveBeenCalled();
});
});
describe('hardDelete', () => {
const executionPersistence = createPersistenceService('db');
const baseTarget = { workflowId: 'wf-1', executionId: 'exec-1' };

View File

@ -93,6 +93,35 @@ describe('read', () => {
});
});
// Tests for `DbStore.readMany`, using the suite's `createExecution` / `createExecutionRef`
// helpers and shared `workflowId` / `payload` fixtures (defined outside this excerpt).
describe('readMany', () => {
// Two executions are created but data is written for only one of them, so the other must be
// absent from the returned map rather than mapped to a null-ish value.
it('should return a map of bundles keyed by executionId, omitting missing ones', async () => {
const [a, b] = await Promise.all([createExecution(), createExecution()]);
const refA = createExecutionRef(workflowId, a.id);
const refB = createExecutionRef(workflowId, b.id);
await dbStore.write(refA, payload);
const bundles = await dbStore.readMany([refA, refB]);
expect(bundles.size).toBe(1);
expect(bundles.get(a.id)).toMatchObject({ ...payload, version: 1 });
expect(bundles.has(b.id)).toBe(false);
});
it('should return an empty map for an empty array', async () => {
const bundles = await dbStore.readMany([]);
expect(bundles.size).toBe(0);
});
// 2500 refs is more than fits in one IN-clause batch (the store batches at 900 ids), so this
// exercises the multi-batch path. No data is written for these ids, so an empty map is expected;
// the point of the test is that the call resolves instead of failing on too many parameters.
it('should batch the IN-clause so a large id set stays within the DB parameter limit', async () => {
const refs = Array.from({ length: 2500 }, (_, i) =>
createExecutionRef(workflowId, String(100000 + i)),
);
await expect(dbStore.readMany(refs)).resolves.toEqual(new Map());
});
});
describe('delete', () => {
it('should delete data for single execution', async () => {
const execution = await createExecution();

View File

@ -1,3 +1,5 @@
/* eslint-disable @typescript-eslint/unbound-method */
import { mockInstance } from '@n8n/backend-test-utils';
import { Container } from '@n8n/di';
import { mock } from 'jest-mock-extended';
@ -19,15 +21,17 @@ jest.unmock('node:fs/promises');
let fsStore: FsStore;
let storagePath: string;
let errorReporter: ErrorReporter;
// One-time setup: create a temporary storage directory, register the mocked config and error
// reporter, then resolve the `FsStore` under test from the container.
beforeAll(async () => {
storagePath = await mkdtemp(join(tmpdir(), 'n8n-fs-store-test-'));
mockInstance(StorageConfig, { storagePath });
mockInstance(ErrorReporter);
// Keep a handle on the mocked reporter so tests can assert on `errorReporter.error`.
errorReporter = mockInstance(ErrorReporter);
fsStore = Container.get(FsStore);
});
// Per-test reset: forget recorded reporter calls and remove the `workflows` directory under
// the temporary storage path so every test starts from an empty store.
beforeEach(async () => {
jest.mocked(errorReporter.error).mockClear();
const workflowsDir = join(storagePath, 'workflows');
// `force: true` plus the no-op catch keeps a failed cleanup from failing the test itself.
await rm(workflowsDir, { recursive: true, force: true }).catch(() => {});
});
@ -147,6 +151,66 @@ describe('read', () => {
});
});
// Tests for `FsStore.readMany`: missing bundles are omitted, corrupted bundles are reported and
// dropped, and any other read error is propagated to the caller.
describe('readMany', () => {
// Only `exec-1` is written; `exec-2` has no bundle and must simply be absent from the map.
it('should return a map of bundles keyed by executionId, omitting missing ones', async () => {
const present = createExecutionRef(workflowId, 'exec-1');
const missing = createExecutionRef(workflowId, 'exec-2');
await fsStore.write(present, payload);
const bundles = await fsStore.readMany([present, missing]);
expect(bundles.size).toBe(1);
expect(bundles.get('exec-1')).toMatchObject({ ...payload, version: 1 });
expect(bundles.has('exec-2')).toBe(false);
});
it('should report and drop a corrupted bundle instead of rejecting the whole read', async () => {
const good = createExecutionRef(workflowId, 'good');
const bad = createExecutionRef(workflowId, 'bad');
await fsStore.write(good, payload);
// Hand-craft a bundle file for `bad` at the path the test expects the store to read from,
// and fill it with content that is not valid JSON.
const badPath = join(
storagePath,
'workflows',
workflowId,
'executions',
'bad',
'execution_data',
EXECUTION_DATA_BUNDLE_FILENAME,
);
await fs.mkdir(
join(storagePath, 'workflows', workflowId, 'executions', 'bad', 'execution_data'),
{
recursive: true,
},
);
await fs.writeFile(badPath, 'invalid json{{{', 'utf-8');
const bundles = await fsStore.readMany([good, bad]);
// The healthy bundle still comes back; the corrupted one is dropped and reported.
expect(bundles.has('good')).toBe(true);
expect(bundles.has('bad')).toBe(false);
expect(errorReporter.error).toHaveBeenCalledWith(expect.any(CorruptedExecutionDataError));
});
it('should rethrow a systemic read error instead of swallowing it', async () => {
const target = createExecutionRef(workflowId, 'exec-1');
await fsStore.write(target, payload);
// Simulate a permission failure on the next `readFile` call only.
const eacces = Object.assign(new Error('EACCES: permission denied'), { code: 'EACCES' });
jest.spyOn(fs, 'readFile').mockRejectedValueOnce(eacces);
// The very same error object must surface, and nothing may be sent to the reporter.
await expect(fsStore.readMany([target])).rejects.toBe(eacces);
expect(errorReporter.error).not.toHaveBeenCalled();
});
it('should return an empty map for an empty array', async () => {
const bundles = await fsStore.readMany([]);
expect(bundles.size).toBe(0);
});
});
describe('delete', () => {
it('should delete execution directory', async () => {
await fsStore.write(ref, payload);

View File

@ -1,6 +1,7 @@
import { ExecutionData, ExecutionDataRepository } from '@n8n/db';
import { ExecutionData, ExecutionDataRepository, In } from '@n8n/db';
import type { EntityManager } from '@n8n/db';
import { Service } from '@n8n/di';
import chunk from 'lodash/chunk';
import { EXECUTION_DATA_BUNDLE_VERSION } from './constants';
import type {
@ -10,6 +11,9 @@ import type {
ExecutionDataBundle,
} from './types';
// Upper bound on how many execution ids `readMany` places into a single IN-clause.
// Deliberately kept below 1000 because some databases cap bound parameters near that value.
const MAX_READ_BATCH_SIZE = 900;
@Service()
export class DbStore implements ExecutionDataStore {
constructor(private readonly repository: ExecutionDataRepository) {}
@ -34,6 +38,33 @@ export class DbStore implements ExecutionDataStore {
return { ...result, version: EXECUTION_DATA_BUNDLE_VERSION };
}
/**
 * Load the stored bundles for several executions at once.
 *
 * Only the `executionId` of each ref takes part in the lookup. Ids are queried in slices of
 * at most `MAX_READ_BATCH_SIZE`, one slice after another, so that no single IN-clause can
 * exceed the database's limit on bound parameters.
 *
 * @param refs Executions whose bundles should be loaded; may be empty.
 * @returns A map keyed by `executionId`. Ids without a stored row are simply absent.
 */
async readMany(refs: ExecutionRef[]) {
	const found = new Map<string, ExecutionDataBundle>();
	if (refs.length === 0) return found;

	const idBatches = chunk(
		refs.map(({ executionId }) => executionId),
		MAX_READ_BATCH_SIZE,
	);

	for (const idBatch of idBatches) {
		const rows = await this.repository.find({
			where: { executionId: In(idBatch) },
			select: ['executionId', 'data', 'workflowData', 'workflowVersionId'],
		});

		rows.forEach(({ executionId, data, workflowData, workflowVersionId }) => {
			found.set(executionId, {
				data,
				workflowData,
				workflowVersionId,
				version: EXECUTION_DATA_BUNDLE_VERSION,
			});
		});
	}

	return found;
}
async delete(ref: ExecutionRef | ExecutionRef[]) {
const ids = (Array.isArray(ref) ? ref : [ref]).map((r) => r.executionId);

View File

@ -1,5 +1,6 @@
import { assertDir } from '@n8n/backend-common';
import { Service } from '@n8n/di';
import chunk from 'lodash/chunk';
import { ErrorReporter, StorageConfig } from 'n8n-core';
import { jsonParse, jsonStringify } from 'n8n-workflow';
import fs from 'node:fs/promises';
@ -15,6 +16,9 @@ import type {
ExecutionDataBundle,
} from './types';
// Upper bound on how many bundle files `readMany` reads at the same time.
// Limits the number of file descriptors that can be open concurrently.
const MAX_READ_CONCURRENCY = 50;
@Service()
export class FsStore implements ExecutionDataStore {
constructor(
@ -69,6 +73,22 @@ export class FsStore implements ExecutionDataStore {
}
}
/**
 * Load the bundles for several executions from disk.
 *
 * Refs are processed in slices of at most `MAX_READ_CONCURRENCY`; reads within a slice run
 * concurrently and the next slice starts only once the current one has finished, which caps
 * the number of simultaneously open file descriptors.
 *
 * @param refs Executions whose bundles should be loaded; may be empty.
 * @returns A map keyed by `executionId`. Refs for which `tryRead` yields `null` are absent.
 */
async readMany(refs: ExecutionRef[]) {
	const found = new Map<string, ExecutionDataBundle>();
	if (refs.length === 0) return found;

	for (const slice of chunk(refs, MAX_READ_CONCURRENCY)) {
		const reads = await Promise.all(
			slice.map(async (ref) => ({ ref, bundle: await this.tryRead(ref) })),
		);

		for (const { ref, bundle } of reads) {
			if (bundle) found.set(ref.executionId, bundle);
		}
	}

	return found;
}
async delete(ref: ExecutionRef | ExecutionRef[]) {
const refs = Array.isArray(ref) ? ref : [ref];
@ -102,4 +122,22 @@ export class FsStore implements ExecutionDataStore {
error !== null && typeof error === 'object' && 'code' in error && error.code === 'ENOENT'
);
}
/**
 * Fault-tolerant wrapper around {@link read} for use by {@link readMany}, so that one bad
 * record cannot fail an entire batch.
 *
 * - A missing bundle comes back as `null` ({@link read} already maps ENOENT to `null`).
 * - A corrupted (non-parseable) bundle is sent to the error reporter and turned into `null`.
 * - Anything else (permission denied, disk read error, broken mount) is rethrown untouched so
 *   systemic failures are not masked.
 */
private async tryRead(ref: ExecutionRef): Promise<ExecutionDataBundle | null> {
	try {
		return await this.read(ref);
	} catch (error) {
		// Only per-record corruption is tolerated; everything else must surface.
		if (!(error instanceof CorruptedExecutionDataError)) throw error;

		this.errorReporter.error(error);
		return null;
	}
}
}

View File

@ -34,5 +34,7 @@ export interface ExecutionDataStore {
init?(): Promise<void>;
write(ref: ExecutionRef, payload: ExecutionDataPayload, tx?: EntityManager): Promise<void>;
read(ref: ExecutionRef, tx?: EntityManager): Promise<ExecutionDataBundle | null>;
/**
 * Read multiple bundles by ref.
 *
 * @param refs Executions whose bundles should be read.
 * @returns A map keyed by `executionId`; missing entries are omitted rather than mapped to `null`.
 */
readMany(refs: ExecutionRef[]): Promise<Map<string, ExecutionDataBundle>>;
delete(ref: ExecutionRef | ExecutionRef[]): Promise<void>;
}

View File

@ -1,23 +1,33 @@
import { parseFlatted } from '@n8n/backend-common';
import { DatabaseConfig, ExecutionsConfig } from '@n8n/config';
import { Time } from '@n8n/constants';
import type {
CreateExecutionPayload,
ExecutionDataStorageLocation,
ExecutionDeletionCriteria,
FindManyOptions,
FindOptionsWhere,
IExecutionBase,
IExecutionFlattedDb,
IExecutionResponse,
UpdateExecutionConditions,
} from '@n8n/db';
import { ExecutionEntity, ExecutionRepository, Not } from '@n8n/db';
import { Service } from '@n8n/di';
import { stringify } from 'flatted';
import { BinaryDataService, StorageConfig } from 'n8n-core';
import { UnexpectedError } from 'n8n-workflow';
import { BinaryDataService, ErrorReporter, StorageConfig } from 'n8n-core';
import type { IRunExecutionData, IRunExecutionDataAll } from 'n8n-workflow';
import { migrateRunExecutionData, UnexpectedError } from 'n8n-workflow';
import { DbStore } from './execution-data/db-store';
import { FsStore } from './execution-data/fs-store';
import { MissingExecutionDataError } from './execution-data/missing-execution-data.error';
import type { ExecutionDataStore, ExecutionRef, WorkflowSnapshot } from './execution-data/types';
import type {
ExecutionDataBundle,
ExecutionDataStore,
ExecutionRef,
WorkflowSnapshot,
} from './execution-data/types';
import { DuplicateExecutionError } from '../errors/duplicate-execution.error';
type DeletionTarget = ExecutionRef & { storedAt: ExecutionDataStorageLocation };
@ -48,6 +58,7 @@ export class ExecutionPersistence {
private readonly storageConfig: StorageConfig,
private readonly executionsConfig: ExecutionsConfig,
private readonly databaseConfig: DatabaseConfig,
private readonly errorReporter: ErrorReporter,
) {}
/**
@ -112,6 +123,187 @@ export class ExecutionPersistence {
return await this.applyDataUpdate(ref, store, execution, conditions);
}
/**
 * Look up one execution by id.
 *
 * Without `includeData` the call is forwarded unchanged to
 * `ExecutionRepository.findSingleExecution`. With `includeData` the entity is loaded together
 * with its metadata (plus annotation and tags when `includeAnnotation` is set), and its data
 * bundle is read from the store matching the entity's `storedAt`:
 * - `db`: data comes from `DbStore`.
 * - `fs`: data comes from `FsStore`.
 *
 * A missing bundle is handled per store. For `db` the entity and its data share one database,
 * so an absent data row is a known-corrupt record: it is reported and `undefined` is returned
 * (soft). For `fs` the entity lives in the DB while the data lives on disk, so a missing file
 * points at an out-of-band loss (deletion, unmounted volume) that a single-execution read
 * surfaces loudly by throwing (hard).
 *
 * @returns The assembled execution, or `undefined` when no entity matches or a `db` bundle is missing.
 * @throws MissingExecutionDataError when the bundle of a non-`db` execution is missing.
 */
async findSingleExecution(
	id: string,
	options?: {
		includeData: true;
		includeAnnotation?: boolean;
		unflattenData: true;
		where?: FindOptionsWhere<ExecutionEntity>;
	},
): Promise<IExecutionResponse | undefined>;
async findSingleExecution(
	id: string,
	options?: {
		includeData: true;
		includeAnnotation?: boolean;
		unflattenData?: false | undefined;
		where?: FindOptionsWhere<ExecutionEntity>;
	},
): Promise<IExecutionFlattedDb | undefined>;
async findSingleExecution(
	id: string,
	options?: {
		includeData?: boolean;
		includeAnnotation?: boolean;
		unflattenData?: boolean;
		where?: FindOptionsWhere<ExecutionEntity>;
	},
): Promise<IExecutionBase | undefined>;
async findSingleExecution(
	id: string,
	options?: {
		includeData?: boolean;
		includeAnnotation?: boolean;
		unflattenData?: boolean;
		where?: FindOptionsWhere<ExecutionEntity>;
	},
): Promise<IExecutionFlattedDb | IExecutionResponse | IExecutionBase | undefined> {
	// No data requested: the repository handles the lookup on its own.
	if (!options?.includeData) {
		return await this.executionRepository.findSingleExecution(id, options);
	}

	// Load the entity without joining execution data; the bundle is fetched from the store below.
	const relations = options.includeAnnotation
		? { metadata: true, annotation: { tags: true } }
		: { metadata: true };
	const entity = await this.executionRepository.findOne({
		where: { id, ...options.where },
		relations,
	});
	if (!entity) return undefined;

	const ref = { workflowId: entity.workflowId, executionId: entity.id };
	const bundle = await this.getStoreFor(entity.storedAt).read(ref);

	if (bundle) {
		return (await this.assembleExecution(entity, bundle, options)) as
			| IExecutionFlattedDb
			| IExecutionResponse
			| IExecutionBase;
	}

	// Hard failure for anything not stored in the database.
	if (entity.storedAt !== 'db') throw new MissingExecutionDataError(ref);

	// Soft failure for database-stored executions.
	this.executionRepository.reportInvalidExecutions([entity]);
	return undefined;
}
/**
 * Find multiple executions matching `queryParams`.
 *
 * Without `includeData` the call is forwarded unchanged to
 * `ExecutionRepository.findMultipleExecutions`. With `includeData: true`, entities are
 * partitioned by `storedAt` and bundles are batch-fetched from each store to avoid n+1 reads.
 * - In `db` mode, we issue one `In(ids)` query against `execution_data` per batch.
 * - In `fs` mode, we fan out reads across the filesystem.
 *
 * The effective find options are built on copies: `queryParams` and any `relations` / `select`
 * containers inside it are never modified, so callers may safely reuse them.
 *
 * Entities whose bundle is missing from its store (either store) are passed to
 * `reportInvalidExecutions` and left out of the result; unlike the single-execution lookup,
 * a missing bundle never makes this method throw. The result keeps the order returned by the
 * repository.
 *
 * @param queryParams Find options for the entity lookup. With `includeData`, the `metadata`
 *   relation is always added and a narrowing `select` is extended with `id`, `workflowId` and
 *   `storedAt`.
 * @param options `includeData` switches on bundle loading; `unflattenData` additionally
 *   deserializes the stored data.
 * @returns The matching executions; an empty array when nothing matches.
 */
async findMultipleExecutions(
	queryParams: FindManyOptions<ExecutionEntity>,
	options?: {
		unflattenData: true;
		includeData?: true;
	},
): Promise<IExecutionResponse[]>;
async findMultipleExecutions(
	queryParams: FindManyOptions<ExecutionEntity>,
	options?: {
		unflattenData?: false | undefined;
		includeData?: true;
	},
): Promise<IExecutionFlattedDb[]>;
async findMultipleExecutions(
	queryParams: FindManyOptions<ExecutionEntity>,
	options?: {
		unflattenData?: boolean;
		includeData?: boolean;
	},
): Promise<IExecutionBase[]>;
async findMultipleExecutions(
	queryParams: FindManyOptions<ExecutionEntity>,
	options?: {
		unflattenData?: boolean;
		includeData?: boolean;
	},
): Promise<IExecutionFlattedDb[] | IExecutionResponse[] | IExecutionBase[]> {
	if (!options?.includeData) {
		return await this.executionRepository.findMultipleExecutions(queryParams, options);
	}

	// Metadata is always needed to build `customData`. Derive a fresh relations value instead of
	// pushing into / assigning onto whatever the caller handed in.
	let relations = queryParams.relations;
	if (!relations) {
		relations = ['metadata'];
	} else if (Array.isArray(relations)) {
		relations = relations.includes('metadata') ? [...relations] : [...relations, 'metadata'];
	} else {
		relations = { ...relations, metadata: true };
	}

	// A narrowing `select` must still include the fields we route and read by: `storedAt` (else
	// every execution defaults to the fs store) and `id`/`workflowId` (else no bundle resolves).
	// An undefined `select` loads all columns, so no action needed.
	let select = queryParams.select;
	if (Array.isArray(select)) {
		const requested = select;
		const missing = (['id', 'workflowId', 'storedAt'] as const).filter(
			(field) => !requested.includes(field),
		);
		select = [...requested, ...missing];
	} else if (select) {
		select = { ...select, id: true, workflowId: true, storedAt: true };
	}

	const findOptions: FindManyOptions<ExecutionEntity> = {
		...queryParams,
		relations,
		...(select ? { select } : {}),
	};

	const entities = await this.executionRepository.find(findOptions);
	if (entities.length === 0) return [];

	// Group by storage location and batch-fetch each group from its store.
	const entitiesByLocation = new Map<ExecutionDataStorageLocation, ExecutionEntity[]>();
	for (const entity of entities) {
		const group = entitiesByLocation.get(entity.storedAt) ?? [];
		group.push(entity);
		entitiesByLocation.set(entity.storedAt, group);
	}

	const bundlesById = new Map<string, ExecutionDataBundle>();
	await Promise.all(
		[...entitiesByLocation].map(async ([location, group]) => {
			const refs = group.map((e) => ({ workflowId: e.workflowId, executionId: e.id }));
			const bundles = await this.getStoreFor(location).readMany(refs);
			for (const [id, bundle] of bundles) bundlesById.set(id, bundle);
		}),
	);

	// Report invalid entities when they are found to be missing from the stores.
	const invalidEntities = entities.filter((e) => !bundlesById.has(e.id));
	if (invalidEntities.length > 0) {
		this.executionRepository.reportInvalidExecutions(invalidEntities);
	}

	// Assemble in repository order; entities without a bundle become `null` and are filtered out.
	const assembled = await Promise.all(
		entities.map(async (entity) => {
			const bundle = bundlesById.get(entity.id);
			if (!bundle) return null;
			return await this.assembleExecution(entity, bundle, options);
		}),
	);

	return assembled.filter((e): e is NonNullable<typeof e> => e !== null) as
		| IExecutionFlattedDb[]
		| IExecutionResponse[]
		| IExecutionBase[];
}
/**
* Delete an in-flight execution that is not meant to be saved.
*
@ -279,6 +471,54 @@ export class ExecutionPersistence {
return { id, name, nodes, connections, settings };
}
/**
 * Combine an execution entity with its data bundle into the response shape.
 *
 * The entity's `metadata` rows are folded into a `customData` key/value object, workflow
 * fields come from the bundle, and `data` is either the stored string or its deserialized form
 * depending on `options.unflattenData`. The annotation is only emitted when
 * `options.includeAnnotation` is set and the entity actually carries one.
 *
 * A `success` execution whose stored data is the literal string `'[]'` is sent to the error
 * reporter; the execution is still returned.
 */
private async assembleExecution(
	entity: ExecutionEntity,
	bundle: ExecutionDataBundle,
	options: { unflattenData?: boolean; includeAnnotation?: boolean },
) {
	const { metadata, annotation, ...entityFields } = entity;
	const { workflowData } = bundle;

	const data = await this.parseExecutionData(bundle.data, options);
	const serializedAnnotation = this.serializeAnnotation(annotation);

	const isEmptySuccess = entity.status === 'success' && bundle.data === '[]';
	if (isEmptySuccess) {
		// Report with the workflow id from the bundle snapshot, not the entity's.
		this.errorReporter.error('Found successful execution where data is empty stringified array', {
			extra: { executionId: entity.id, workflowId: workflowData.id },
		});
	}

	const customData = Object.fromEntries(metadata.map(({ key, value }) => [key, value]));
	const annotationFields =
		options.includeAnnotation && serializedAnnotation ? { annotation: serializedAnnotation } : {};

	return {
		...entityFields,
		data,
		workflowData,
		workflowVersionId: bundle.workflowVersionId ?? null,
		customData,
		...annotationFields,
	};
}
/**
 * Turn the stored data string into the form requested by the caller.
 *
 * @param data Serialized execution data as read from a store.
 * @param options With `unflattenData` unset or false the string is returned untouched.
 * @returns The original string, the deserialized and migrated run data, or `undefined` when
 *   deserialization yields a falsy value.
 */
private async parseExecutionData(
	data: string,
	options: { unflattenData?: boolean },
): Promise<IRunExecutionData | string | undefined> {
	if (options.unflattenData) {
		const deserialized: unknown = await parseFlatted(data);
		return deserialized
			? migrateRunExecutionData(deserialized as IRunExecutionDataAll)
			: undefined;
	}

	return data;
}
/**
 * Reduce an annotation entity to the fields exposed on an execution response.
 *
 * @returns `null` when there is no annotation; otherwise its `id`, `vote`, and each tag
 *   trimmed down to `id` and `name` (an empty list when tags are not present).
 */
private serializeAnnotation(annotation: ExecutionEntity['annotation']) {
	if (!annotation) return null;

	const tags = annotation.tags?.map((tag) => ({ id: tag.id, name: tag.name })) ?? [];
	return { id: annotation.id, vote: annotation.vote, tags };
}
/**
* Detect whether the DB rejected the insert because of the unique index on
* `execution_entity.deduplicationKey`. We expect TypeORM to surface the