diff --git a/packages/cli/src/__tests__/workflow-execute-additional-data.test.ts b/packages/cli/src/__tests__/workflow-execute-additional-data.test.ts index fda30b8f2d0..6412cfc8c0c 100644 --- a/packages/cli/src/__tests__/workflow-execute-additional-data.test.ts +++ b/packages/cli/src/__tests__/workflow-execute-additional-data.test.ts @@ -1,7 +1,12 @@ import { mockInstance } from '@n8n/backend-test-utils'; import { GlobalConfig } from '@n8n/config'; import type { WorkflowEntity, User, Project } from '@n8n/db'; -import { ExecutionRepository, WorkflowPublishHistoryRepository, WorkflowRepository } from '@n8n/db'; +import { + ExecutionRepository, + ExecutionDataRepository, + WorkflowPublishHistoryRepository, + WorkflowRepository, +} from '@n8n/db'; import { Container } from '@n8n/di'; import { mock } from 'jest-mock-extended'; import { ExternalSecretsProxy } from 'n8n-core'; @@ -108,6 +113,7 @@ describe('WorkflowExecuteAdditionalData', () => { Container.set(CredentialsHelper, credentialsHelper); Container.set(ExternalSecretsProxy, externalSecretsProxy); const executionRepository = mockInstance(ExecutionRepository); + mockInstance(ExecutionDataRepository); mockInstance(Telemetry); const workflowRepository = mockInstance(WorkflowRepository); const activeExecutions = mockInstance(ActiveExecutions); diff --git a/packages/cli/src/executions/__tests__/execution-persistence.test.ts b/packages/cli/src/executions/__tests__/execution-persistence.test.ts index 9a8ece9ef12..98890941921 100644 --- a/packages/cli/src/executions/__tests__/execution-persistence.test.ts +++ b/packages/cli/src/executions/__tests__/execution-persistence.test.ts @@ -3,7 +3,6 @@ import type { DatabaseConfig, ExecutionsConfig } from '@n8n/config'; import { - ExecutionData, ExecutionEntity, type CreateExecutionPayload, type EntityManager, @@ -16,13 +15,16 @@ import type { IWorkflowBase } from 'n8n-workflow'; import { createEmptyRunExecutionData } from 'n8n-workflow'; import { DuplicateExecutionError } from '@/errors/duplicate-execution.error'; +import type { DbStore } from '@/executions/execution-data/db-store'; import type { FsStore } from '@/executions/execution-data/fs-store'; +import { MissingExecutionDataError } from '@/executions/execution-data/missing-execution-data.error'; import { ExecutionPersistence } from '@/executions/execution-persistence'; describe('ExecutionPersistence', () => { const executionRepository = mock(); const binaryDataService = mock(); const fsStore = mock(); + const dbStore = mock(); const executionsConfig = mock({ pruneData: true, pruneDataHardDeleteBuffer: 1, @@ -64,6 +66,7 @@ describe('ExecutionPersistence', () => { executionRepository, binaryDataService, fsStore, + dbStore, mock({ modeTag }), executionsConfig, mock({ type: dbType }), @@ -82,7 +85,7 @@ describe('ExecutionPersistence', () => { describe('database mode', () => { const executionPersistence = createPersistenceService('db'); - it('should create execution with `storedAt: db` and insert data via transaction', async () => { + it('should create execution with `storedAt: db` and write data via dbStore in the transaction', async () => { const mockTx = createMockTransaction(); executionRepository.manager.transaction = createMockTx(mockTx); @@ -100,14 +103,15 @@ describe('ExecutionPersistence', () => { createdAt: expect.any(Date) as Date, }), ); - expect(mockTx.insert).toHaveBeenCalledWith( - ExecutionData, + expect(mockTx.insert).toHaveBeenCalledTimes(1); + expect(dbStore.write).toHaveBeenCalledWith( + { workflowId: 'workflow-123', executionId: 'exec-1' }, expect.objectContaining({ - executionId: 'exec-1', + data: expect.any(String) as string, workflowVersionId: 'version-abc', }), + mockTx, ); - expect(mockTx.insert).toHaveBeenCalledTimes(2); expect(fsStore.write).not.toHaveBeenCalled(); }); }); @@ -147,6 +151,7 @@ describe('ExecutionPersistence', () => { workflowData: expectedWorkflowSnapshot, workflowVersionId: 'version-abc', }), + mockTx, ); }); @@ -289,6 +294,500 @@ describe('ExecutionPersistence', () => { }); }); + describe('updateExistingExecution', () => { + const executionId = 'exec-1'; + const workflowId = 'wf-1'; + + beforeEach(() => { + fsStore.write.mockReset(); + fsStore.read.mockReset(); + dbStore.write.mockReset(); + dbStore.read.mockReset(); + executionRepository.findOne.mockReset(); + executionRepository.update.mockReset(); + }); + + const existingBundle = { + data: '[{"resultData":"1"},{}]', + workflowData: { + id: workflowId, + name: 'snapshot', + nodes: [], + connections: {}, + settings: undefined, + }, + workflowVersionId: 'v-original', + version: 1 as const, + }; + + const mockEntity = (storedAt: 'db' | 'fs') => { + executionRepository.findOne.mockResolvedValue({ + id: executionId, + workflowId, + storedAt, + } as unknown as Awaited>); + }; + + describe('metadata-only updates', () => { + it('should update the entity directly without touching either data store', async () => { + const executionPersistence = createPersistenceService('fs'); + executionRepository.update.mockResolvedValue({ + affected: 1, + generatedMaps: [], + raw: {}, + }); + + const result = await executionPersistence.updateExistingExecution(executionId, { + retrySuccessId: 'retry-1', + }); + + expect(result).toBe(true); + expect(executionRepository.update).toHaveBeenCalledWith( + { id: executionId }, + { retrySuccessId: 'retry-1' }, + ); + expect(executionRepository.findOne).not.toHaveBeenCalled(); + expect(fsStore.write).not.toHaveBeenCalled(); + expect(fsStore.read).not.toHaveBeenCalled(); + expect(dbStore.write).not.toHaveBeenCalled(); + expect(dbStore.read).not.toHaveBeenCalled(); + }); + + it('should apply conditions to the where clause and return false when no rows match', async () => { + const executionPersistence = createPersistenceService('db'); + executionRepository.update.mockResolvedValue({ + affected: 0, + generatedMaps: [], + raw: {}, + }); + + const result = await executionPersistence.updateExistingExecution( + executionId, + { status: 'success' }, + { requireNotCanceled: true }, + ); + + expect(result).toBe(false); + expect(executionRepository.update).toHaveBeenCalledWith( + expect.objectContaining({ id: executionId, status: expect.anything() as unknown }), + { status: 'success' }, + ); + }); + + it('should return true when no entity fields are present after stripping immutables', async () => { + const executionPersistence = createPersistenceService('db'); + + const result = await executionPersistence.updateExistingExecution(executionId, { + id: executionId, + workflowId: 'wf-other', + createdAt: new Date(), + }); + + expect(result).toBe(true); + expect(executionRepository.update).not.toHaveBeenCalled(); + }); + }); + + describe('data updates on db-mode executions', () => { + it('should update entity in a transaction and write a fresh bundle via dbStore', async () => { + const executionPersistence = createPersistenceService('fs'); // current mode is irrelevant for routing + mockEntity('db'); + dbStore.read.mockResolvedValue(existingBundle); + + const mockTx = createMockTransaction(); + executionRepository.manager.transaction = createMockTx(mockTx); + + const payload = { + data: runData, + workflowData, + status: 'success' as const, + }; + + const result = await executionPersistence.updateExistingExecution(executionId, payload); + + expect(result).toBe(true); + expect(executionRepository.findOne).toHaveBeenCalledWith({ + where: { id: executionId }, + select: ['id', 'workflowId', 'storedAt'], + }); + expect(mockTx.update).toHaveBeenCalledWith( + ExecutionEntity, + { id: executionId }, + { status: 'success' }, + ); + expect(dbStore.write).toHaveBeenCalledWith( + { workflowId, executionId }, + expect.objectContaining({ + data: expect.any(String) as string, + workflowData: { + id: workflowData.id, + name: workflowData.name, + nodes: workflowData.nodes, + connections: workflowData.connections, + settings: workflowData.settings, + }, + workflowVersionId: 'v-original', + }), + mockTx, + ); + expect(fsStore.write).not.toHaveBeenCalled(); + }); + + it('should preserve fields not supplied in a partial payload', async () => { + const executionPersistence = createPersistenceService('db'); + mockEntity('db'); + dbStore.read.mockResolvedValue(existingBundle); + + const mockTx = createMockTransaction(); + executionRepository.manager.transaction = createMockTx(mockTx); + + await executionPersistence.updateExistingExecution(executionId, { data: runData }); + + expect(dbStore.write).toHaveBeenCalledWith( + { workflowId, executionId }, + expect.objectContaining({ + workflowData: existingBundle.workflowData, + workflowVersionId: existingBundle.workflowVersionId, + }), + mockTx, + ); + }); + + it('should apply requireStatus condition and skip the db write when no rows match', async () => { + const executionPersistence = createPersistenceService('db'); + mockEntity('db'); + + const mockTx = mock(); + mockTx.update.mockResolvedValue({ affected: 0, generatedMaps: [], raw: {} }); + executionRepository.manager.transaction = createMockTx(mockTx); + + const result = await executionPersistence.updateExistingExecution( + executionId, + { data: runData, status: 'success' }, + { requireStatus: 'waiting' }, + ); + + expect(result).toBe(false); + expect(dbStore.read).not.toHaveBeenCalled(); + expect(dbStore.write).not.toHaveBeenCalled(); + }); + + it('should throw MissingExecutionDataError when the db row is missing', async () => { + const executionPersistence = createPersistenceService('db'); + mockEntity('db'); + dbStore.read.mockResolvedValue(null); + + const mockTx = createMockTransaction(); + executionRepository.manager.transaction = createMockTx(mockTx); + + await expect( + executionPersistence.updateExistingExecution(executionId, { data: runData }), + ).rejects.toBeInstanceOf(MissingExecutionDataError); + + expect(dbStore.write).not.toHaveBeenCalled(); + }); + + it('should return false when the execution does not exist', async () => { + const executionPersistence = createPersistenceService('db'); + executionRepository.findOne.mockResolvedValue(null); + + const result = await executionPersistence.updateExistingExecution(executionId, { + data: runData, + }); + + expect(result).toBe(false); + expect(dbStore.read).not.toHaveBeenCalled(); + expect(dbStore.write).not.toHaveBeenCalled(); + expect(fsStore.read).not.toHaveBeenCalled(); + expect(fsStore.write).not.toHaveBeenCalled(); + }); + + it('should apply conditions to the outer lookup to fail fast', async () => { + const executionPersistence = createPersistenceService('db'); + executionRepository.findOne.mockResolvedValue(null); + + const result = await executionPersistence.updateExistingExecution( + executionId, + { data: runData }, + { requireStatus: 'waiting' }, + ); + + expect(result).toBe(false); + expect(executionRepository.findOne).toHaveBeenCalledWith({ + where: { id: executionId, status: 'waiting' }, + select: ['id', 'workflowId', 'storedAt'], + }); + expect(executionRepository.manager.transaction).not.toHaveBeenCalled(); + }); + }); + + describe('data updates on fs-mode executions', () => { + it('should update entity in a transaction and write a fresh bundle to fs', async () => { + const executionPersistence = createPersistenceService('fs'); + mockEntity('fs'); + fsStore.read.mockResolvedValue(existingBundle); + + const mockTx = createMockTransaction(); + executionRepository.manager.transaction = createMockTx(mockTx); + + const payload = { + data: runData, + workflowData, + status: 'success' as const, + }; + + const result = await executionPersistence.updateExistingExecution(executionId, payload); + + expect(result).toBe(true); + expect(mockTx.update).toHaveBeenCalledWith( + ExecutionEntity, + { id: executionId }, + { status: 'success' }, + ); + expect(fsStore.write).toHaveBeenCalledWith( + { workflowId, executionId }, + expect.objectContaining({ + data: expect.any(String) as string, + workflowData: { + id: workflowData.id, + name: workflowData.name, + nodes: workflowData.nodes, + connections: workflowData.connections, + settings: workflowData.settings, + }, + workflowVersionId: 'v-original', + }), + mockTx, + ); + }); + + it('should preserve fields not supplied in a partial payload', async () => { + const executionPersistence = createPersistenceService('fs'); + mockEntity('fs'); + fsStore.read.mockResolvedValue(existingBundle); + + const mockTx = createMockTransaction(); + executionRepository.manager.transaction = createMockTx(mockTx); + + await executionPersistence.updateExistingExecution(executionId, { data: runData }); + + expect(fsStore.write).toHaveBeenCalledWith( + { workflowId, executionId }, + expect.objectContaining({ + workflowData: existingBundle.workflowData, + workflowVersionId: existingBundle.workflowVersionId, + }), + mockTx, + ); + }); + + it('should apply requireStatus condition and skip the fs write when no rows match', async () => { + const executionPersistence = createPersistenceService('fs'); + mockEntity('fs'); + + const mockTx = mock(); + mockTx.update.mockResolvedValue({ affected: 0, generatedMaps: [], raw: {} }); + executionRepository.manager.transaction = createMockTx(mockTx); + + const result = await executionPersistence.updateExistingExecution( + executionId, + { data: runData, status: 'success' }, + { requireStatus: 'waiting' }, + ); + + expect(result).toBe(false); + expect(mockTx.update).toHaveBeenCalledWith( + ExecutionEntity, + { id: executionId, status: 'waiting' }, + { status: 'success' }, + ); + expect(fsStore.read).not.toHaveBeenCalled(); + expect(fsStore.write).not.toHaveBeenCalled(); + }); + + it('should still write the bundle when the payload contains no entity fields', async () => { + const executionPersistence = createPersistenceService('fs'); + mockEntity('fs'); + fsStore.read.mockResolvedValue(existingBundle); + + const mockTx = createMockTransaction(); + executionRepository.manager.transaction = createMockTx(mockTx); + + const result = await executionPersistence.updateExistingExecution(executionId, { + data: runData, + }); + + expect(result).toBe(true); + expect(mockTx.update).not.toHaveBeenCalled(); + expect(fsStore.write).toHaveBeenCalled(); + }); + + it('should skip the fs write on a data-only update when conditions do not match', async () => { + const executionPersistence = createPersistenceService('fs'); + mockEntity('fs'); + + const mockTx = createMockTransaction(); + mockTx.count.mockResolvedValue(0); + executionRepository.manager.transaction = createMockTx(mockTx); + + const result = await executionPersistence.updateExistingExecution( + executionId, + { data: runData }, + { requireNotFinished: true }, + ); + + expect(result).toBe(false); + expect(mockTx.count).toHaveBeenCalledWith(ExecutionEntity, { + where: { id: executionId, finished: false }, + }); + expect(mockTx.update).not.toHaveBeenCalled(); + expect(fsStore.read).not.toHaveBeenCalled(); + expect(fsStore.write).not.toHaveBeenCalled(); + }); + + it('should perform the fs write on a data-only update when conditions match', async () => { + const executionPersistence = createPersistenceService('fs'); + mockEntity('fs'); + fsStore.read.mockResolvedValue(existingBundle); + + const mockTx = createMockTransaction(); + mockTx.count.mockResolvedValue(1); + executionRepository.manager.transaction = createMockTx(mockTx); + + const result = await executionPersistence.updateExistingExecution( + executionId, + { data: runData }, + { requireNotFinished: true }, + ); + + expect(result).toBe(true); + expect(mockTx.count).toHaveBeenCalled(); + expect(fsStore.write).toHaveBeenCalled(); + }); + + it('should roll the transaction back if the fs write fails', async () => { + const executionPersistence = createPersistenceService('fs'); + mockEntity('fs'); + fsStore.read.mockResolvedValue(existingBundle); + + const writeError = new Error('disk full'); + fsStore.write.mockRejectedValue(writeError); + + const mockTx = createMockTransaction(); + executionRepository.manager.transaction = createMockTx(mockTx); + + await expect( + executionPersistence.updateExistingExecution(executionId, { + data: runData, + status: 'success', + }), + ).rejects.toBe(writeError); + }); + + it('should throw MissingExecutionDataError when the fs bundle is missing', async () => { + const executionPersistence = createPersistenceService('fs'); + mockEntity('fs'); + fsStore.read.mockResolvedValue(null); + + const mockTx = createMockTransaction(); + executionRepository.manager.transaction = createMockTx(mockTx); + + await expect( + executionPersistence.updateExistingExecution(executionId, { data: runData }), + ).rejects.toBeInstanceOf(MissingExecutionDataError); + + expect(fsStore.write).not.toHaveBeenCalled(); + }); + + it('should apply requireNotFinished condition', async () => { + const executionPersistence = createPersistenceService('fs'); + mockEntity('fs'); + fsStore.read.mockResolvedValue(existingBundle); + + const mockTx = createMockTransaction(); + executionRepository.manager.transaction = createMockTx(mockTx); + + await executionPersistence.updateExistingExecution( + executionId, + { data: runData, status: 'success' }, + { requireNotFinished: true }, + ); + + expect(mockTx.update).toHaveBeenCalledWith( + ExecutionEntity, + { id: executionId, finished: false }, + { status: 'success' }, + ); + }); + + it('should treat undefined `affected` from the driver as zero rows updated', async () => { + const executionPersistence = createPersistenceService('fs'); + mockEntity('fs'); + + const mockTx = mock(); + mockTx.update.mockResolvedValue({ affected: undefined, generatedMaps: [], raw: {} }); + executionRepository.manager.transaction = createMockTx(mockTx); + + const result = await executionPersistence.updateExistingExecution( + executionId, + { data: runData, status: 'success' }, + { requireStatus: 'waiting' }, + ); + + expect(result).toBe(false); + expect(fsStore.read).not.toHaveBeenCalled(); + expect(fsStore.write).not.toHaveBeenCalled(); + }); + + it('should apply requireNotCanceled condition', async () => { + const executionPersistence = createPersistenceService('fs'); + mockEntity('fs'); + fsStore.read.mockResolvedValue(existingBundle); + + const mockTx = createMockTransaction(); + executionRepository.manager.transaction = createMockTx(mockTx); + + await executionPersistence.updateExistingExecution( + executionId, + { data: runData, status: 'running' }, + { requireNotCanceled: true }, + ); + + expect(mockTx.update).toHaveBeenCalledWith( + ExecutionEntity, + expect.objectContaining({ id: executionId, status: expect.anything() as unknown }), + { status: 'running' }, + ); + }); + + it('should strip immutable fields before updating the entity', async () => { + const executionPersistence = createPersistenceService('fs'); + mockEntity('fs'); + fsStore.read.mockResolvedValue(existingBundle); + + const mockTx = createMockTransaction(); + executionRepository.manager.transaction = createMockTx(mockTx); + + await executionPersistence.updateExistingExecution(executionId, { + id: executionId, + data: runData, + workflowId: 'other-wf', + workflowVersionId: 'v-new', + createdAt: new Date(), + startedAt: new Date(), + customData: { foo: 'bar' }, + status: 'success', + }); + + expect(mockTx.update).toHaveBeenCalledWith( + ExecutionEntity, + { id: executionId }, + { status: 'success' }, + ); + }); + }); + }); + describe('hardDelete', () => { const executionPersistence = createPersistenceService('db'); const baseTarget = { workflowId: 'wf-1', executionId: 'exec-1' }; diff --git a/packages/cli/src/executions/execution-data/db-store.ts b/packages/cli/src/executions/execution-data/db-store.ts index 3777db45ffb..e6b644c7e6d 100644 --- a/packages/cli/src/executions/execution-data/db-store.ts +++ b/packages/cli/src/executions/execution-data/db-store.ts @@ -1,4 +1,5 @@ -import { ExecutionDataRepository } from '@n8n/db'; +import { ExecutionData, ExecutionDataRepository } from '@n8n/db'; +import type { EntityManager } from '@n8n/db'; import { Service } from '@n8n/di'; import { EXECUTION_DATA_BUNDLE_VERSION } from './constants'; @@ -13,12 +14,17 @@ import type { export class DbStore implements ExecutionDataStore { constructor(private readonly repository: ExecutionDataRepository) {} - async write({ executionId }: ExecutionRef, payload: ExecutionDataPayload) { - await this.repository.upsert({ ...payload, executionId }, ['executionId']); + async write({ executionId }: ExecutionRef, payload: ExecutionDataPayload, tx?: EntityManager) { + const repo = this.getRepository(tx); + await repo.upsert({ ...payload, executionId }, ['executionId']); } - async read({ executionId }: ExecutionRef): Promise { - const result = await this.repository.findOne({ + async read( + { executionId }: ExecutionRef, + tx?: EntityManager, + ): Promise { + const repo = this.getRepository(tx); + const result = await repo.findOne({ where: { executionId }, select: ['data', 'workflowData', 'workflowVersionId'], }); @@ -33,4 +39,8 @@ export class DbStore implements ExecutionDataStore { await this.repository.deleteMany(ids); } + + private getRepository(tx?: EntityManager) { + return tx ? tx.getRepository(ExecutionData) : this.repository; + } } diff --git a/packages/cli/src/executions/execution-data/missing-execution-data.error.ts b/packages/cli/src/executions/execution-data/missing-execution-data.error.ts new file mode 100644 index 00000000000..ecdedd80380 --- /dev/null +++ b/packages/cli/src/executions/execution-data/missing-execution-data.error.ts @@ -0,0 +1,14 @@ +import { UnexpectedError } from 'n8n-workflow'; + +import type { ExecutionRef } from './types'; + +/** + * Thrown when an update targets an execution whose data bundle is + * missing from its backing store — the entity exists, but the data + * needed to merge a partial update is gone. + */ +export class MissingExecutionDataError extends UnexpectedError { + constructor(ref: ExecutionRef) { + super('Missing execution data', { extra: { ...ref } }); + } +} diff --git a/packages/cli/src/executions/execution-data/types.ts b/packages/cli/src/executions/execution-data/types.ts index 67b60798b0b..ab3a6f9f53d 100644 --- a/packages/cli/src/executions/execution-data/types.ts +++ b/packages/cli/src/executions/execution-data/types.ts @@ -1,3 +1,4 @@ +import type { EntityManager } from '@n8n/db'; import type { IWorkflowBase } from 'n8n-workflow'; export type ExecutionRef = { @@ -24,9 +25,14 @@ export type ExecutionDataBundle = ExecutionDataPayload & { version: 1; }; +/** + * Persistence operations for execution data bundles. Methods which accept an + * optional `tx` (`EntityManager`) do so for transactional participation: + * `DbStore` uses it; `FsStore` ignores it (the filesystem is not transactional). + */ export interface ExecutionDataStore { init?(): Promise; - write(ref: ExecutionRef, payload: ExecutionDataPayload): Promise; - read(ref: ExecutionRef): Promise; + write(ref: ExecutionRef, payload: ExecutionDataPayload, tx?: EntityManager): Promise; + read(ref: ExecutionRef, tx?: EntityManager): Promise; delete(ref: ExecutionRef | ExecutionRef[]): Promise; } diff --git a/packages/cli/src/executions/execution-persistence.ts b/packages/cli/src/executions/execution-persistence.ts index 014c4daa3da..c2390c51433 100644 --- a/packages/cli/src/executions/execution-persistence.ts +++ b/packages/cli/src/executions/execution-persistence.ts @@ -4,18 +4,35 @@ import type { CreateExecutionPayload, ExecutionDataStorageLocation, ExecutionDeletionCriteria, + FindOptionsWhere, + IExecutionResponse, + UpdateExecutionConditions, } from '@n8n/db'; -import { ExecutionData, ExecutionEntity, ExecutionRepository } from '@n8n/db'; +import { ExecutionEntity, ExecutionRepository, Not } from '@n8n/db'; import { Service } from '@n8n/di'; import { stringify } from 'flatted'; import { BinaryDataService, StorageConfig } from 'n8n-core'; +import { DbStore } from './execution-data/db-store'; import { FsStore } from './execution-data/fs-store'; -import type { ExecutionRef, WorkflowSnapshot } from './execution-data/types'; +import { MissingExecutionDataError } from './execution-data/missing-execution-data.error'; +import type { ExecutionDataStore, ExecutionRef, WorkflowSnapshot } from './execution-data/types'; import { DuplicateExecutionError } from '../errors/duplicate-execution.error'; type DeletionTarget = ExecutionRef & { storedAt: ExecutionDataStorageLocation }; +type UpdatableEntityColumns = Omit< + Partial, + | 'id' + | 'data' + | 'workflowId' + | 'workflowData' + | 'workflowVersionId' + | 'createdAt' + | 'startedAt' + | 'customData' +>; + /** * Performs a persistence operation on an execution and its blob of data. * Writes per the configured storage mode. Reads per the recorded `storedAt` value. @@ -26,6 +43,7 @@ export class ExecutionPersistence { private readonly executionRepository: ExecutionRepository, private readonly binaryDataService: BinaryDataService, private readonly fsStore: FsStore, + private readonly dbStore: DbStore, private readonly storageConfig: StorageConfig, private readonly executionsConfig: ExecutionsConfig, private readonly databaseConfig: DatabaseConfig, @@ -49,21 +67,11 @@ export class ExecutionPersistence { return await this.executionRepository.manager.transaction(async (tx) => { const { identifiers } = await tx.insert(ExecutionEntity, executionEntity); const executionId = String(identifiers[0].id); + const ref = { workflowId: id, executionId }; + const bundle = { data, workflowData: workflowSnapshot, workflowVersionId }; - if (storedAt === 'db') { - await tx.insert(ExecutionData, { - executionId, - workflowData: workflowSnapshot, - data, - workflowVersionId, - }); - return executionId; - } + await this.getStoreFor(storedAt).write(ref, bundle, tx); - await this.fsStore.write( - { workflowId: id, executionId }, - { data, workflowData: workflowSnapshot, workflowVersionId }, - ); return executionId; }); } catch (error) { @@ -75,30 +83,32 @@ export class ExecutionPersistence { } /** - * Detect whether the DB rejected the insert because of the unique index on - * `execution_entity.deduplicationKey`. We expect TypeORM to surface the - * driver's error code at `error.driverError.code` as a string, with the - * code's exact value depending on the configured DB. + * Update an existing execution and, if the payload includes data fields, its data in the configured storage. + * - In `db` mode, we update both entity and data in the DB in a transaction. + * - In `fs` mode, we update the entity in the DB and write its data to the filesystem in a transaction. */ - private isDuplicateExecutionError(error: unknown): error is Error { - if (!(error instanceof Error) || !('driverError' in error)) return false; - const { driverError } = error; - if (typeof driverError !== 'object' || driverError === null || !('code' in driverError)) { - return false; - } - const { code } = driverError; - if (typeof code !== 'string') return false; - if (!error.message.includes('deduplicationKey')) return false; + async updateExistingExecution( + executionId: string, + execution: Partial, + conditions?: UpdateExecutionConditions, + ): Promise { + const hasDataField = execution.data !== undefined || execution.workflowData !== undefined; - if (this.databaseConfig.type === 'postgresdb') { - return code === '23505'; + if (!hasDataField) { + return await this.updateEntityOnly(executionId, execution, conditions); } - // SQLite reports `SQLITE_CONSTRAINT_UNIQUE` when extended result codes are - // enabled, and falls back to the base `SQLITE_CONSTRAINT` otherwise. - return ( - code === 'SQLITE_CONSTRAINT_UNIQUE' || - (code === 'SQLITE_CONSTRAINT' && error.message.includes('UNIQUE constraint failed')) - ); + + const entity = await this.executionRepository.findOne({ + where: this.buildEntityWhereCondition(executionId, conditions), + select: ['id', 'workflowId', 'storedAt'], + }); + + if (!entity) return false; + + const ref = { workflowId: entity.workflowId, executionId }; + const store = this.getStoreFor(entity.storedAt); + + return await this.applyDataUpdate(ref, store, execution, conditions); } /** @@ -138,4 +148,160 @@ export class ExecutionPersistence { const fsRefs = refs.filter((r) => r.storedAt === 'fs'); if (fsRefs.length > 0) await this.fsStore.delete(fsRefs); } + + private async updateEntityOnly( + executionId: string, + execution: Partial, + conditions?: UpdateExecutionConditions, + ): Promise { + const updatableColumns = this.pickUpdatableEntityColumns(execution); + if (Object.keys(updatableColumns).length === 0) return true; + + const whereCondition = this.buildEntityWhereCondition(executionId, conditions); + const result = await this.executionRepository.update(whereCondition, updatableColumns); + return (result.affected ?? 0) > 0; + } + + private async applyDataUpdate( + ref: ExecutionRef, + store: ExecutionDataStore, + execution: Partial, + conditions?: UpdateExecutionConditions, + ): Promise { + const { data, workflowData } = execution; + const updatableColumns = this.pickUpdatableEntityColumns(execution); + + return await this.executionRepository.manager.transaction(async (tx) => { + const whereCondition = this.buildEntityWhereCondition(ref.executionId, conditions); + + if (Object.keys(updatableColumns).length > 0) { + const result = await tx.update(ExecutionEntity, whereCondition, updatableColumns); + if ((result.affected ?? 0) === 0) return false; + } else if (conditions) { + // No entity columns to update, but the caller still requested a guarded write. + // Re-verify the conditions inside the transaction so a data-only update can't slip + // past a `requireStatus` / `requireNotFinished` / `requireNotCanceled` check. + // TODO(CAT-3212): In Postgres this COUNT alone does not prevent a concurrent + // transaction from changing the row's status between the check and the data write — + // a row-level lock (e.g. `SELECT ... FOR UPDATE`) is required for true race-safety. + // SQLite is unaffected because `BEGIN` already takes an exclusive write lock. + const matchingRows = await tx.count(ExecutionEntity, { where: whereCondition }); + if (matchingRows === 0) return false; + } + + // TODO(CAT-3213): callers may supply only `data` or only `workflowData`, so we read + // the existing bundle to merge the unchanged half back in. Most callers in practice + // overwrite both fields, in which case the read is wasted work. Split the API into an + // overwrite path (no read) and an explicit partial-update path. + const existing = await store.read(ref, tx); + if (!existing) throw new MissingExecutionDataError(ref); + + await store.write( + ref, + { + data: data !== undefined ? stringify(data) : existing.data, + workflowData: workflowData + ? this.toWorkflowSnapshot(workflowData) + : existing.workflowData, + workflowVersionId: existing.workflowVersionId, + }, + tx, + ); + + return true; + }); + } + + /** + * Narrow an {@link IExecutionResponse} payload to the subset of {@link UpdatableEntityColumns} that + * can be written directly to the `ExecutionEntity` row on update. + * + * Stripped fields fall into three categories: + * - **Identity / routing**: `id`, `workflowId` — never updated here. + * - **Stored elsewhere**: `data`, `workflowData` — persisted via the + * configured {@link ExecutionDataStore} (DB or filesystem), not as columns + * on the entity row. + * - **Immutable after creation**: `workflowVersionId`, `createdAt`, + * `startedAt` — set once at insert time and never overwritten. + * - **Not persisted on the entity**: `customData` — handled separately. + */ + private pickUpdatableEntityColumns( + execution: Partial, + ): UpdatableEntityColumns { + const { + id: _id, + data: _data, + workflowId: _workflowId, + workflowData: _workflowData, + workflowVersionId: _workflowVersionId, + createdAt: _createdAt, + startedAt: _startedAt, + customData: _customData, + ...updatableColumns + } = execution; + return updatableColumns; + } + + private buildEntityWhereCondition( + executionId: string, + conditions?: UpdateExecutionConditions, + ): FindOptionsWhere { + const where: FindOptionsWhere = { id: executionId }; + if (conditions?.requireStatus) where.status = conditions.requireStatus; + // TODO(CAT-3214): `ExecutionEntity.finished` is deprecated and we should rely on statuses + // only, but for now we still use it to filter out finished executions for parity with + // ExecutionRepository. + if (conditions?.requireNotFinished) where.finished = false; + // TODO(CAT-3215): `requireStatus` and `requireNotCanceled` both write to `where.status`, + // so if both are supplied the `Not('canceled')` clause silently overwrites the specific + // status check. In practice callers never combine them, so once we drop strict parity with + // ExecutionRepository we should assert their mutual exclusivity (or combine them somehow). + if (conditions?.requireNotCanceled) where.status = Not('canceled'); + return where; + } + + private getStoreFor(location: ExecutionDataStorageLocation): ExecutionDataStore { + switch (location) { + case 'db': + return this.dbStore; + case 'fs': + return this.fsStore; + } + const _exhaustive: never = location; + throw new Error(`Unknown storage location: ${String(_exhaustive)}`); + } + + private toWorkflowSnapshot( + workflowData: NonNullable, + ): WorkflowSnapshot { + const { id, name, nodes, connections, settings } = workflowData; + return { id, name, nodes, connections, settings }; + } + + /** + * Detect whether the DB rejected the insert because of the unique index on + * `execution_entity.deduplicationKey`. We expect TypeORM to surface the + * driver's error code at `error.driverError.code` as a string, with the + * code's exact value depending on the configured DB. + */ + private isDuplicateExecutionError(error: unknown): error is Error { + if (!(error instanceof Error) || !('driverError' in error)) return false; + const { driverError } = error; + if (typeof driverError !== 'object' || driverError === null || !('code' in driverError)) { + return false; + } + const { code } = driverError; + if (typeof code !== 'string') return false; + if (!error.message.includes('deduplicationKey')) return false; + + if (this.databaseConfig.type === 'postgresdb') { + return code === '23505'; + } + // SQLite reports `SQLITE_CONSTRAINT_UNIQUE` when extended result codes are + // enabled, and falls back to the base `SQLITE_CONSTRAINT` otherwise. + return ( + code === 'SQLITE_CONSTRAINT_UNIQUE' || + (code === 'SQLITE_CONSTRAINT' && error.message.includes('UNIQUE constraint failed')) + ); + } }