refactor(core): Add update method to ExecutionPersistence (no-changelog) (#30447)

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Mike Repeć 2026-05-26 10:22:49 +02:00 committed by GitHub
parent 82dd59f341
commit b75c72850d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 751 additions and 50 deletions

View File

@ -1,7 +1,12 @@
import { mockInstance } from '@n8n/backend-test-utils';
import { GlobalConfig } from '@n8n/config';
import type { WorkflowEntity, User, Project } from '@n8n/db';
import { ExecutionRepository, WorkflowPublishHistoryRepository, WorkflowRepository } from '@n8n/db';
import {
ExecutionRepository,
ExecutionDataRepository,
WorkflowPublishHistoryRepository,
WorkflowRepository,
} from '@n8n/db';
import { Container } from '@n8n/di';
import { mock } from 'jest-mock-extended';
import { ExternalSecretsProxy } from 'n8n-core';
@ -108,6 +113,7 @@ describe('WorkflowExecuteAdditionalData', () => {
Container.set(CredentialsHelper, credentialsHelper);
Container.set(ExternalSecretsProxy, externalSecretsProxy);
const executionRepository = mockInstance(ExecutionRepository);
mockInstance(ExecutionDataRepository);
mockInstance(Telemetry);
const workflowRepository = mockInstance(WorkflowRepository);
const activeExecutions = mockInstance(ActiveExecutions);

View File

@ -3,7 +3,6 @@
import type { DatabaseConfig, ExecutionsConfig } from '@n8n/config';
import {
ExecutionData,
ExecutionEntity,
type CreateExecutionPayload,
type EntityManager,
@ -16,13 +15,16 @@ import type { IWorkflowBase } from 'n8n-workflow';
import { createEmptyRunExecutionData } from 'n8n-workflow';
import { DuplicateExecutionError } from '@/errors/duplicate-execution.error';
import type { DbStore } from '@/executions/execution-data/db-store';
import type { FsStore } from '@/executions/execution-data/fs-store';
import { MissingExecutionDataError } from '@/executions/execution-data/missing-execution-data.error';
import { ExecutionPersistence } from '@/executions/execution-persistence';
describe('ExecutionPersistence', () => {
const executionRepository = mock<ExecutionRepository>();
const binaryDataService = mock<BinaryDataService>();
const fsStore = mock<FsStore>();
const dbStore = mock<DbStore>();
const executionsConfig = mock<ExecutionsConfig>({
pruneData: true,
pruneDataHardDeleteBuffer: 1,
@ -64,6 +66,7 @@ describe('ExecutionPersistence', () => {
executionRepository,
binaryDataService,
fsStore,
dbStore,
mock<StorageConfig>({ modeTag }),
executionsConfig,
mock<DatabaseConfig>({ type: dbType }),
@ -82,7 +85,7 @@ describe('ExecutionPersistence', () => {
describe('database mode', () => {
const executionPersistence = createPersistenceService('db');
it('should create execution with `storedAt: db` and insert data via transaction', async () => {
it('should create execution with `storedAt: db` and write data via dbStore in the transaction', async () => {
const mockTx = createMockTransaction();
executionRepository.manager.transaction = createMockTx(mockTx);
@ -100,14 +103,15 @@ describe('ExecutionPersistence', () => {
createdAt: expect.any(Date) as Date,
}),
);
expect(mockTx.insert).toHaveBeenCalledWith(
ExecutionData,
expect(mockTx.insert).toHaveBeenCalledTimes(1);
expect(dbStore.write).toHaveBeenCalledWith(
{ workflowId: 'workflow-123', executionId: 'exec-1' },
expect.objectContaining({
executionId: 'exec-1',
data: expect.any(String) as string,
workflowVersionId: 'version-abc',
}),
mockTx,
);
expect(mockTx.insert).toHaveBeenCalledTimes(2);
expect(fsStore.write).not.toHaveBeenCalled();
});
});
@ -147,6 +151,7 @@ describe('ExecutionPersistence', () => {
workflowData: expectedWorkflowSnapshot,
workflowVersionId: 'version-abc',
}),
mockTx,
);
});
@ -289,6 +294,500 @@ describe('ExecutionPersistence', () => {
});
});
describe('updateExistingExecution', () => {
const executionId = 'exec-1';
const workflowId = 'wf-1';
beforeEach(() => {
fsStore.write.mockReset();
fsStore.read.mockReset();
dbStore.write.mockReset();
dbStore.read.mockReset();
executionRepository.findOne.mockReset();
executionRepository.update.mockReset();
});
const existingBundle = {
data: '[{"resultData":"1"},{}]',
workflowData: {
id: workflowId,
name: 'snapshot',
nodes: [],
connections: {},
settings: undefined,
},
workflowVersionId: 'v-original',
version: 1 as const,
};
const mockEntity = (storedAt: 'db' | 'fs') => {
executionRepository.findOne.mockResolvedValue({
id: executionId,
workflowId,
storedAt,
} as unknown as Awaited<ReturnType<ExecutionRepository['findOne']>>);
};
describe('metadata-only updates', () => {
it('should update the entity directly without touching either data store', async () => {
const executionPersistence = createPersistenceService('fs');
executionRepository.update.mockResolvedValue({
affected: 1,
generatedMaps: [],
raw: {},
});
const result = await executionPersistence.updateExistingExecution(executionId, {
retrySuccessId: 'retry-1',
});
expect(result).toBe(true);
expect(executionRepository.update).toHaveBeenCalledWith(
{ id: executionId },
{ retrySuccessId: 'retry-1' },
);
expect(executionRepository.findOne).not.toHaveBeenCalled();
expect(fsStore.write).not.toHaveBeenCalled();
expect(fsStore.read).not.toHaveBeenCalled();
expect(dbStore.write).not.toHaveBeenCalled();
expect(dbStore.read).not.toHaveBeenCalled();
});
it('should apply conditions to the where clause and return false when no rows match', async () => {
const executionPersistence = createPersistenceService('db');
executionRepository.update.mockResolvedValue({
affected: 0,
generatedMaps: [],
raw: {},
});
const result = await executionPersistence.updateExistingExecution(
executionId,
{ status: 'success' },
{ requireNotCanceled: true },
);
expect(result).toBe(false);
expect(executionRepository.update).toHaveBeenCalledWith(
expect.objectContaining({ id: executionId, status: expect.anything() as unknown }),
{ status: 'success' },
);
});
it('should return true when no entity fields are present after stripping immutables', async () => {
const executionPersistence = createPersistenceService('db');
const result = await executionPersistence.updateExistingExecution(executionId, {
id: executionId,
workflowId: 'wf-other',
createdAt: new Date(),
});
expect(result).toBe(true);
expect(executionRepository.update).not.toHaveBeenCalled();
});
});
describe('data updates on db-mode executions', () => {
it('should update entity in a transaction and write a fresh bundle via dbStore', async () => {
const executionPersistence = createPersistenceService('fs'); // current mode is irrelevant for routing
mockEntity('db');
dbStore.read.mockResolvedValue(existingBundle);
const mockTx = createMockTransaction();
executionRepository.manager.transaction = createMockTx(mockTx);
const payload = {
data: runData,
workflowData,
status: 'success' as const,
};
const result = await executionPersistence.updateExistingExecution(executionId, payload);
expect(result).toBe(true);
expect(executionRepository.findOne).toHaveBeenCalledWith({
where: { id: executionId },
select: ['id', 'workflowId', 'storedAt'],
});
expect(mockTx.update).toHaveBeenCalledWith(
ExecutionEntity,
{ id: executionId },
{ status: 'success' },
);
expect(dbStore.write).toHaveBeenCalledWith(
{ workflowId, executionId },
expect.objectContaining({
data: expect.any(String) as string,
workflowData: {
id: workflowData.id,
name: workflowData.name,
nodes: workflowData.nodes,
connections: workflowData.connections,
settings: workflowData.settings,
},
workflowVersionId: 'v-original',
}),
mockTx,
);
expect(fsStore.write).not.toHaveBeenCalled();
});
it('should preserve fields not supplied in a partial payload', async () => {
const executionPersistence = createPersistenceService('db');
mockEntity('db');
dbStore.read.mockResolvedValue(existingBundle);
const mockTx = createMockTransaction();
executionRepository.manager.transaction = createMockTx(mockTx);
await executionPersistence.updateExistingExecution(executionId, { data: runData });
expect(dbStore.write).toHaveBeenCalledWith(
{ workflowId, executionId },
expect.objectContaining({
workflowData: existingBundle.workflowData,
workflowVersionId: existingBundle.workflowVersionId,
}),
mockTx,
);
});
it('should apply requireStatus condition and skip the db write when no rows match', async () => {
const executionPersistence = createPersistenceService('db');
mockEntity('db');
const mockTx = mock<EntityManager>();
mockTx.update.mockResolvedValue({ affected: 0, generatedMaps: [], raw: {} });
executionRepository.manager.transaction = createMockTx(mockTx);
const result = await executionPersistence.updateExistingExecution(
executionId,
{ data: runData, status: 'success' },
{ requireStatus: 'waiting' },
);
expect(result).toBe(false);
expect(dbStore.read).not.toHaveBeenCalled();
expect(dbStore.write).not.toHaveBeenCalled();
});
it('should throw MissingExecutionDataError when the db row is missing', async () => {
const executionPersistence = createPersistenceService('db');
mockEntity('db');
dbStore.read.mockResolvedValue(null);
const mockTx = createMockTransaction();
executionRepository.manager.transaction = createMockTx(mockTx);
await expect(
executionPersistence.updateExistingExecution(executionId, { data: runData }),
).rejects.toBeInstanceOf(MissingExecutionDataError);
expect(dbStore.write).not.toHaveBeenCalled();
});
it('should return false when the execution does not exist', async () => {
const executionPersistence = createPersistenceService('db');
executionRepository.findOne.mockResolvedValue(null);
const result = await executionPersistence.updateExistingExecution(executionId, {
data: runData,
});
expect(result).toBe(false);
expect(dbStore.read).not.toHaveBeenCalled();
expect(dbStore.write).not.toHaveBeenCalled();
expect(fsStore.read).not.toHaveBeenCalled();
expect(fsStore.write).not.toHaveBeenCalled();
});
it('should apply conditions to the outer lookup to fail fast', async () => {
const executionPersistence = createPersistenceService('db');
executionRepository.findOne.mockResolvedValue(null);
const result = await executionPersistence.updateExistingExecution(
executionId,
{ data: runData },
{ requireStatus: 'waiting' },
);
expect(result).toBe(false);
expect(executionRepository.findOne).toHaveBeenCalledWith({
where: { id: executionId, status: 'waiting' },
select: ['id', 'workflowId', 'storedAt'],
});
expect(executionRepository.manager.transaction).not.toHaveBeenCalled();
});
});
describe('data updates on fs-mode executions', () => {
it('should update entity in a transaction and write a fresh bundle to fs', async () => {
const executionPersistence = createPersistenceService('fs');
mockEntity('fs');
fsStore.read.mockResolvedValue(existingBundle);
const mockTx = createMockTransaction();
executionRepository.manager.transaction = createMockTx(mockTx);
const payload = {
data: runData,
workflowData,
status: 'success' as const,
};
const result = await executionPersistence.updateExistingExecution(executionId, payload);
expect(result).toBe(true);
expect(mockTx.update).toHaveBeenCalledWith(
ExecutionEntity,
{ id: executionId },
{ status: 'success' },
);
expect(fsStore.write).toHaveBeenCalledWith(
{ workflowId, executionId },
expect.objectContaining({
data: expect.any(String) as string,
workflowData: {
id: workflowData.id,
name: workflowData.name,
nodes: workflowData.nodes,
connections: workflowData.connections,
settings: workflowData.settings,
},
workflowVersionId: 'v-original',
}),
mockTx,
);
});
it('should preserve fields not supplied in a partial payload', async () => {
const executionPersistence = createPersistenceService('fs');
mockEntity('fs');
fsStore.read.mockResolvedValue(existingBundle);
const mockTx = createMockTransaction();
executionRepository.manager.transaction = createMockTx(mockTx);
await executionPersistence.updateExistingExecution(executionId, { data: runData });
expect(fsStore.write).toHaveBeenCalledWith(
{ workflowId, executionId },
expect.objectContaining({
workflowData: existingBundle.workflowData,
workflowVersionId: existingBundle.workflowVersionId,
}),
mockTx,
);
});
it('should apply requireStatus condition and skip the fs write when no rows match', async () => {
const executionPersistence = createPersistenceService('fs');
mockEntity('fs');
const mockTx = mock<EntityManager>();
mockTx.update.mockResolvedValue({ affected: 0, generatedMaps: [], raw: {} });
executionRepository.manager.transaction = createMockTx(mockTx);
const result = await executionPersistence.updateExistingExecution(
executionId,
{ data: runData, status: 'success' },
{ requireStatus: 'waiting' },
);
expect(result).toBe(false);
expect(mockTx.update).toHaveBeenCalledWith(
ExecutionEntity,
{ id: executionId, status: 'waiting' },
{ status: 'success' },
);
expect(fsStore.read).not.toHaveBeenCalled();
expect(fsStore.write).not.toHaveBeenCalled();
});
it('should still write the bundle when the payload contains no entity fields', async () => {
const executionPersistence = createPersistenceService('fs');
mockEntity('fs');
fsStore.read.mockResolvedValue(existingBundle);
const mockTx = createMockTransaction();
executionRepository.manager.transaction = createMockTx(mockTx);
const result = await executionPersistence.updateExistingExecution(executionId, {
data: runData,
});
expect(result).toBe(true);
expect(mockTx.update).not.toHaveBeenCalled();
expect(fsStore.write).toHaveBeenCalled();
});
it('should skip the fs write on a data-only update when conditions do not match', async () => {
const executionPersistence = createPersistenceService('fs');
mockEntity('fs');
const mockTx = createMockTransaction();
mockTx.count.mockResolvedValue(0);
executionRepository.manager.transaction = createMockTx(mockTx);
const result = await executionPersistence.updateExistingExecution(
executionId,
{ data: runData },
{ requireNotFinished: true },
);
expect(result).toBe(false);
expect(mockTx.count).toHaveBeenCalledWith(ExecutionEntity, {
where: { id: executionId, finished: false },
});
expect(mockTx.update).not.toHaveBeenCalled();
expect(fsStore.read).not.toHaveBeenCalled();
expect(fsStore.write).not.toHaveBeenCalled();
});
it('should perform the fs write on a data-only update when conditions match', async () => {
const executionPersistence = createPersistenceService('fs');
mockEntity('fs');
fsStore.read.mockResolvedValue(existingBundle);
const mockTx = createMockTransaction();
mockTx.count.mockResolvedValue(1);
executionRepository.manager.transaction = createMockTx(mockTx);
const result = await executionPersistence.updateExistingExecution(
executionId,
{ data: runData },
{ requireNotFinished: true },
);
expect(result).toBe(true);
expect(mockTx.count).toHaveBeenCalled();
expect(fsStore.write).toHaveBeenCalled();
});
it('should roll the transaction back if the fs write fails', async () => {
const executionPersistence = createPersistenceService('fs');
mockEntity('fs');
fsStore.read.mockResolvedValue(existingBundle);
const writeError = new Error('disk full');
fsStore.write.mockRejectedValue(writeError);
const mockTx = createMockTransaction();
executionRepository.manager.transaction = createMockTx(mockTx);
await expect(
executionPersistence.updateExistingExecution(executionId, {
data: runData,
status: 'success',
}),
).rejects.toBe(writeError);
});
it('should throw MissingExecutionDataError when the fs bundle is missing', async () => {
const executionPersistence = createPersistenceService('fs');
mockEntity('fs');
fsStore.read.mockResolvedValue(null);
const mockTx = createMockTransaction();
executionRepository.manager.transaction = createMockTx(mockTx);
await expect(
executionPersistence.updateExistingExecution(executionId, { data: runData }),
).rejects.toBeInstanceOf(MissingExecutionDataError);
expect(fsStore.write).not.toHaveBeenCalled();
});
it('should apply requireNotFinished condition', async () => {
const executionPersistence = createPersistenceService('fs');
mockEntity('fs');
fsStore.read.mockResolvedValue(existingBundle);
const mockTx = createMockTransaction();
executionRepository.manager.transaction = createMockTx(mockTx);
await executionPersistence.updateExistingExecution(
executionId,
{ data: runData, status: 'success' },
{ requireNotFinished: true },
);
expect(mockTx.update).toHaveBeenCalledWith(
ExecutionEntity,
{ id: executionId, finished: false },
{ status: 'success' },
);
});
it('should treat undefined `affected` from the driver as zero rows updated', async () => {
const executionPersistence = createPersistenceService('fs');
mockEntity('fs');
const mockTx = mock<EntityManager>();
mockTx.update.mockResolvedValue({ affected: undefined, generatedMaps: [], raw: {} });
executionRepository.manager.transaction = createMockTx(mockTx);
const result = await executionPersistence.updateExistingExecution(
executionId,
{ data: runData, status: 'success' },
{ requireStatus: 'waiting' },
);
expect(result).toBe(false);
expect(fsStore.read).not.toHaveBeenCalled();
expect(fsStore.write).not.toHaveBeenCalled();
});
it('should apply requireNotCanceled condition', async () => {
const executionPersistence = createPersistenceService('fs');
mockEntity('fs');
fsStore.read.mockResolvedValue(existingBundle);
const mockTx = createMockTransaction();
executionRepository.manager.transaction = createMockTx(mockTx);
await executionPersistence.updateExistingExecution(
executionId,
{ data: runData, status: 'running' },
{ requireNotCanceled: true },
);
expect(mockTx.update).toHaveBeenCalledWith(
ExecutionEntity,
expect.objectContaining({ id: executionId, status: expect.anything() as unknown }),
{ status: 'running' },
);
});
it('should strip immutable fields before updating the entity', async () => {
const executionPersistence = createPersistenceService('fs');
mockEntity('fs');
fsStore.read.mockResolvedValue(existingBundle);
const mockTx = createMockTransaction();
executionRepository.manager.transaction = createMockTx(mockTx);
await executionPersistence.updateExistingExecution(executionId, {
id: executionId,
data: runData,
workflowId: 'other-wf',
workflowVersionId: 'v-new',
createdAt: new Date(),
startedAt: new Date(),
customData: { foo: 'bar' },
status: 'success',
});
expect(mockTx.update).toHaveBeenCalledWith(
ExecutionEntity,
{ id: executionId },
{ status: 'success' },
);
});
});
});
describe('hardDelete', () => {
const executionPersistence = createPersistenceService('db');
const baseTarget = { workflowId: 'wf-1', executionId: 'exec-1' };

View File

@ -1,4 +1,5 @@
import { ExecutionDataRepository } from '@n8n/db';
import { ExecutionData, ExecutionDataRepository } from '@n8n/db';
import type { EntityManager } from '@n8n/db';
import { Service } from '@n8n/di';
import { EXECUTION_DATA_BUNDLE_VERSION } from './constants';
@ -13,12 +14,17 @@ import type {
export class DbStore implements ExecutionDataStore {
constructor(private readonly repository: ExecutionDataRepository) {}
async write({ executionId }: ExecutionRef, payload: ExecutionDataPayload) {
await this.repository.upsert({ ...payload, executionId }, ['executionId']);
async write({ executionId }: ExecutionRef, payload: ExecutionDataPayload, tx?: EntityManager) {
const repo = this.getRepository(tx);
await repo.upsert({ ...payload, executionId }, ['executionId']);
}
async read({ executionId }: ExecutionRef): Promise<ExecutionDataBundle | null> {
const result = await this.repository.findOne({
async read(
{ executionId }: ExecutionRef,
tx?: EntityManager,
): Promise<ExecutionDataBundle | null> {
const repo = this.getRepository(tx);
const result = await repo.findOne({
where: { executionId },
select: ['data', 'workflowData', 'workflowVersionId'],
});
@ -33,4 +39,8 @@ export class DbStore implements ExecutionDataStore {
await this.repository.deleteMany(ids);
}
private getRepository(tx?: EntityManager) {
return tx ? tx.getRepository(ExecutionData) : this.repository;
}
}

View File

@ -0,0 +1,14 @@
import { UnexpectedError } from 'n8n-workflow';
import type { ExecutionRef } from './types';
/**
* Thrown when an update targets an execution whose data bundle is
* missing from its backing store the entity exists, but the data
* needed to merge a partial update is gone.
*/
export class MissingExecutionDataError extends UnexpectedError {
constructor(ref: ExecutionRef) {
super('Missing execution data', { extra: { ...ref } });
}
}

View File

@ -1,3 +1,4 @@
import type { EntityManager } from '@n8n/db';
import type { IWorkflowBase } from 'n8n-workflow';
export type ExecutionRef = {
@ -24,9 +25,14 @@ export type ExecutionDataBundle = ExecutionDataPayload & {
version: 1;
};
/**
* Persistence operations for execution data bundles. Methods which accept an
* optional `tx` (`EntityManager`) do so for transactional participation:
* `DbStore` uses it; `FsStore` ignores it (the filesystem is not transactional).
*/
export interface ExecutionDataStore {
init?(): Promise<void>;
write(ref: ExecutionRef, payload: ExecutionDataPayload): Promise<void>;
read(ref: ExecutionRef): Promise<ExecutionDataBundle | null>;
write(ref: ExecutionRef, payload: ExecutionDataPayload, tx?: EntityManager): Promise<void>;
read(ref: ExecutionRef, tx?: EntityManager): Promise<ExecutionDataBundle | null>;
delete(ref: ExecutionRef | ExecutionRef[]): Promise<void>;
}

View File

@ -4,18 +4,35 @@ import type {
CreateExecutionPayload,
ExecutionDataStorageLocation,
ExecutionDeletionCriteria,
FindOptionsWhere,
IExecutionResponse,
UpdateExecutionConditions,
} from '@n8n/db';
import { ExecutionData, ExecutionEntity, ExecutionRepository } from '@n8n/db';
import { ExecutionEntity, ExecutionRepository, Not } from '@n8n/db';
import { Service } from '@n8n/di';
import { stringify } from 'flatted';
import { BinaryDataService, StorageConfig } from 'n8n-core';
import { DbStore } from './execution-data/db-store';
import { FsStore } from './execution-data/fs-store';
import type { ExecutionRef, WorkflowSnapshot } from './execution-data/types';
import { MissingExecutionDataError } from './execution-data/missing-execution-data.error';
import type { ExecutionDataStore, ExecutionRef, WorkflowSnapshot } from './execution-data/types';
import { DuplicateExecutionError } from '../errors/duplicate-execution.error';
type DeletionTarget = ExecutionRef & { storedAt: ExecutionDataStorageLocation };
type UpdatableEntityColumns = Omit<
Partial<IExecutionResponse>,
| 'id'
| 'data'
| 'workflowId'
| 'workflowData'
| 'workflowVersionId'
| 'createdAt'
| 'startedAt'
| 'customData'
>;
/**
* Performs a persistence operation on an execution and its blob of data.
* Writes per the configured storage mode. Reads per the recorded `storedAt` value.
@ -26,6 +43,7 @@ export class ExecutionPersistence {
private readonly executionRepository: ExecutionRepository,
private readonly binaryDataService: BinaryDataService,
private readonly fsStore: FsStore,
private readonly dbStore: DbStore,
private readonly storageConfig: StorageConfig,
private readonly executionsConfig: ExecutionsConfig,
private readonly databaseConfig: DatabaseConfig,
@ -49,21 +67,11 @@ export class ExecutionPersistence {
return await this.executionRepository.manager.transaction(async (tx) => {
const { identifiers } = await tx.insert(ExecutionEntity, executionEntity);
const executionId = String(identifiers[0].id);
const ref = { workflowId: id, executionId };
const bundle = { data, workflowData: workflowSnapshot, workflowVersionId };
if (storedAt === 'db') {
await tx.insert(ExecutionData, {
executionId,
workflowData: workflowSnapshot,
data,
workflowVersionId,
});
return executionId;
}
await this.getStoreFor(storedAt).write(ref, bundle, tx);
await this.fsStore.write(
{ workflowId: id, executionId },
{ data, workflowData: workflowSnapshot, workflowVersionId },
);
return executionId;
});
} catch (error) {
@ -75,30 +83,32 @@ export class ExecutionPersistence {
}
/**
* Detect whether the DB rejected the insert because of the unique index on
* `execution_entity.deduplicationKey`. We expect TypeORM to surface the
* driver's error code at `error.driverError.code` as a string, with the
* code's exact value depending on the configured DB.
* Update an existing execution and, if the payload includes data fields, its data in the configured storage.
* - In `db` mode, we update both entity and data in the DB in a transaction.
* - In `fs` mode, we update the entity in the DB and write its data to the filesystem in a transaction.
*/
private isDuplicateExecutionError(error: unknown): error is Error {
if (!(error instanceof Error) || !('driverError' in error)) return false;
const { driverError } = error;
if (typeof driverError !== 'object' || driverError === null || !('code' in driverError)) {
return false;
}
const { code } = driverError;
if (typeof code !== 'string') return false;
if (!error.message.includes('deduplicationKey')) return false;
async updateExistingExecution(
executionId: string,
execution: Partial<IExecutionResponse>,
conditions?: UpdateExecutionConditions,
): Promise<boolean> {
const hasDataField = execution.data !== undefined || execution.workflowData !== undefined;
if (this.databaseConfig.type === 'postgresdb') {
return code === '23505';
if (!hasDataField) {
return await this.updateEntityOnly(executionId, execution, conditions);
}
// SQLite reports `SQLITE_CONSTRAINT_UNIQUE` when extended result codes are
// enabled, and falls back to the base `SQLITE_CONSTRAINT` otherwise.
return (
code === 'SQLITE_CONSTRAINT_UNIQUE' ||
(code === 'SQLITE_CONSTRAINT' && error.message.includes('UNIQUE constraint failed'))
);
const entity = await this.executionRepository.findOne({
where: this.buildEntityWhereCondition(executionId, conditions),
select: ['id', 'workflowId', 'storedAt'],
});
if (!entity) return false;
const ref = { workflowId: entity.workflowId, executionId };
const store = this.getStoreFor(entity.storedAt);
return await this.applyDataUpdate(ref, store, execution, conditions);
}
/**
@ -138,4 +148,160 @@ export class ExecutionPersistence {
const fsRefs = refs.filter((r) => r.storedAt === 'fs');
if (fsRefs.length > 0) await this.fsStore.delete(fsRefs);
}
private async updateEntityOnly(
executionId: string,
execution: Partial<IExecutionResponse>,
conditions?: UpdateExecutionConditions,
): Promise<boolean> {
const updatableColumns = this.pickUpdatableEntityColumns(execution);
if (Object.keys(updatableColumns).length === 0) return true;
const whereCondition = this.buildEntityWhereCondition(executionId, conditions);
const result = await this.executionRepository.update(whereCondition, updatableColumns);
return (result.affected ?? 0) > 0;
}
private async applyDataUpdate(
ref: ExecutionRef,
store: ExecutionDataStore,
execution: Partial<IExecutionResponse>,
conditions?: UpdateExecutionConditions,
): Promise<boolean> {
const { data, workflowData } = execution;
const updatableColumns = this.pickUpdatableEntityColumns(execution);
return await this.executionRepository.manager.transaction(async (tx) => {
const whereCondition = this.buildEntityWhereCondition(ref.executionId, conditions);
if (Object.keys(updatableColumns).length > 0) {
const result = await tx.update(ExecutionEntity, whereCondition, updatableColumns);
if ((result.affected ?? 0) === 0) return false;
} else if (conditions) {
// No entity columns to update, but the caller still requested a guarded write.
// Re-verify the conditions inside the transaction so a data-only update can't slip
// past a `requireStatus` / `requireNotFinished` / `requireNotCanceled` check.
// TODO(CAT-3212): In Postgres this COUNT alone does not prevent a concurrent
// transaction from changing the row's status between the check and the data write —
// a row-level lock (e.g. `SELECT ... FOR UPDATE`) is required for true race-safety.
// SQLite is unaffected because `BEGIN` already takes an exclusive write lock.
const matchingRows = await tx.count(ExecutionEntity, { where: whereCondition });
if (matchingRows === 0) return false;
}
// TODO(CAT-3213): callers may supply only `data` or only `workflowData`, so we read
// the existing bundle to merge the unchanged half back in. Most callers in practice
// overwrite both fields, in which case the read is wasted work. Split the API into an
// overwrite path (no read) and an explicit partial-update path.
const existing = await store.read(ref, tx);
if (!existing) throw new MissingExecutionDataError(ref);
await store.write(
ref,
{
data: data !== undefined ? stringify(data) : existing.data,
workflowData: workflowData
? this.toWorkflowSnapshot(workflowData)
: existing.workflowData,
workflowVersionId: existing.workflowVersionId,
},
tx,
);
return true;
});
}
/**
* Narrow an {@link IExecutionResponse} payload to the subset of {@link UpdatableEntityColumns} that
* can be written directly to the `ExecutionEntity` row on update.
*
* Stripped fields fall into three categories:
* - **Identity / routing**: `id`, `workflowId` never updated here.
* - **Stored elsewhere**: `data`, `workflowData` persisted via the
* configured {@link ExecutionDataStore} (DB or filesystem), not as columns
* on the entity row.
* - **Immutable after creation**: `workflowVersionId`, `createdAt`,
* `startedAt` set once at insert time and never overwritten.
* - **Not persisted on the entity**: `customData` handled separately.
*/
private pickUpdatableEntityColumns(
execution: Partial<IExecutionResponse>,
): UpdatableEntityColumns {
const {
id: _id,
data: _data,
workflowId: _workflowId,
workflowData: _workflowData,
workflowVersionId: _workflowVersionId,
createdAt: _createdAt,
startedAt: _startedAt,
customData: _customData,
...updatableColumns
} = execution;
return updatableColumns;
}
private buildEntityWhereCondition(
executionId: string,
conditions?: UpdateExecutionConditions,
): FindOptionsWhere<ExecutionEntity> {
const where: FindOptionsWhere<ExecutionEntity> = { id: executionId };
if (conditions?.requireStatus) where.status = conditions.requireStatus;
// TODO(CAT-3214): `ExecutionEntity.finished` is deprecated and we should rely on statuses
// only, but for now we still use it to filter out finished executions for parity with
// ExecutionRepository.
if (conditions?.requireNotFinished) where.finished = false;
// TODO(CAT-3215): `requireStatus` and `requireNotCanceled` both write to `where.status`,
// so if both are supplied the `Not('canceled')` clause silently overwrites the specific
// status check. In practice callers never combine them, so once we drop strict parity with
// ExecutionRepository we should assert their mutual exclusivity (or combine them somehow).
if (conditions?.requireNotCanceled) where.status = Not('canceled');
return where;
}
private getStoreFor(location: ExecutionDataStorageLocation): ExecutionDataStore {
switch (location) {
case 'db':
return this.dbStore;
case 'fs':
return this.fsStore;
}
const _exhaustive: never = location;
throw new Error(`Unknown storage location: ${String(_exhaustive)}`);
}
private toWorkflowSnapshot(
workflowData: NonNullable<IExecutionResponse['workflowData']>,
): WorkflowSnapshot {
const { id, name, nodes, connections, settings } = workflowData;
return { id, name, nodes, connections, settings };
}
/**
* Detect whether the DB rejected the insert because of the unique index on
* `execution_entity.deduplicationKey`. We expect TypeORM to surface the
* driver's error code at `error.driverError.code` as a string, with the
* code's exact value depending on the configured DB.
*/
private isDuplicateExecutionError(error: unknown): error is Error {
if (!(error instanceof Error) || !('driverError' in error)) return false;
const { driverError } = error;
if (typeof driverError !== 'object' || driverError === null || !('code' in driverError)) {
return false;
}
const { code } = driverError;
if (typeof code !== 'string') return false;
if (!error.message.includes('deduplicationKey')) return false;
if (this.databaseConfig.type === 'postgresdb') {
return code === '23505';
}
// SQLite reports `SQLITE_CONSTRAINT_UNIQUE` when extended result codes are
// enabled, and falls back to the base `SQLITE_CONSTRAINT` otherwise.
return (
code === 'SQLITE_CONSTRAINT_UNIQUE' ||
(code === 'SQLITE_CONSTRAINT' && error.message.includes('UNIQUE constraint failed'))
);
}
}