add initial implementation

This commit is contained in:
Charlie Kolb 2025-11-05 04:05:39 +01:00
parent 6945e21423
commit 5cd64171f1
No known key found for this signature in database
9 changed files with 116 additions and 31 deletions

View File

@ -1,9 +1,19 @@
import { Column, Entity, JoinColumn, OneToOne, PrimaryColumn } from '@n8n/typeorm';
import {
Column,
Entity,
JoinColumn,
JoinTable,
ManyToOne,
OneToOne,
PrimaryColumn,
Relation,
} from '@n8n/typeorm';
import { IWorkflowBase } from 'n8n-workflow';
import { JsonColumn } from './abstract-entity';
import { ExecutionEntity } from './execution-entity';
import { ISimplifiedPinData } from './types-db';
import { WorkflowHistory } from './workflow-history';
import { idStringifier } from '../utils/transformers';
@Entity()
@ -24,14 +34,30 @@ export class ExecutionData {
@JsonColumn()
workflowData: Omit<IWorkflowBase, 'pinData'> & { pinData?: ISimplifiedPinData };
@Column({ type: 'varchar', length: 36, nullable: true })
workflowVersionId: string | null;
@ManyToOne(() => WorkflowHistory, { onDelete: 'SET NULL', nullable: true })
@JoinTable({
joinColumn: {
name: 'workflowVersionId',
referencedColumnName: 'versionId',
},
})
workflowHistory?: Relation<WorkflowHistory> | null;
@PrimaryColumn({ transformer: idStringifier })
executionId: string;
@OneToOne('ExecutionEntity', 'executionData', {
onDelete: 'CASCADE',
})
@OneToOne(
() => ExecutionEntity,
(ee) => ee.executionData,
{
onDelete: 'CASCADE',
},
)
@JoinColumn({
name: 'executionId',
})
execution: ExecutionEntity;
execution: Relation<ExecutionEntity>;
}

View File

@ -1,4 +1,4 @@
import { Column, Entity, ManyToOne, PrimaryColumn } from '@n8n/typeorm';
import { Column, Entity, ManyToOne, PrimaryColumn, Relation } from '@n8n/typeorm';
import { IConnections } from 'n8n-workflow';
import type { INode } from 'n8n-workflow';
@ -25,5 +25,5 @@ export class WorkflowHistory extends WithTimestamps {
@ManyToOne('WorkflowEntity', {
onDelete: 'CASCADE',
})
workflow: WorkflowEntity;
workflow: Relation<WorkflowEntity>;
}

View File

@ -0,0 +1,11 @@
import type { MigrationContext, ReversibleMigration } from '../migration-types';
export class AddWorkflowVersionIdToExecutionData1762310956274 implements ReversibleMigration {
async up({ schemaBuilder: { addColumns, column } }: MigrationContext) {
await addColumns('execution_data', [column('workflowVersionId').varchar()]);
}
async down({ schemaBuilder: { dropColumns } }: MigrationContext) {
await dropColumns('execution_data', ['workflowVersionId']);
}
}

View File

@ -1,3 +1,4 @@
import { AddWorkflowVersionIdToExecutionData1762310956274 } from './../common/1762310956274-AddWorkflowVersionIdToExecutionData';
import { DropUnusedChatHubColumns1760965142113 } from './1760965142113-DropUnusedChatHubColumns';
import { AddAudienceColumnToApiKeys1758731786132 } from '../common/1758731786132-AddAudienceColumnToApiKey';
import { CreateWorkflowDependencyTable1760314000000 } from '../common/1760314000000-CreateWorkflowDependencyTable';
@ -221,4 +222,5 @@ export const mysqlMigrations: Migration[] = [
DropUnusedChatHubColumns1760965142113,
AddWorkflowVersionColumn1761047826451,
ChangeDependencyInfoToJson1761655473000,
AddWorkflowVersionIdToExecutionData1762310956274,
];

View File

@ -107,6 +107,7 @@ import { CreateChatHubAgentTable1760020000000 } from '../common/1760020000000-Cr
import { UniqueRoleNames1760020838000 } from '../common/1760020838000-UniqueRoleNames';
import { CreateWorkflowDependencyTable1760314000000 } from '../common/1760314000000-CreateWorkflowDependencyTable';
import { DropUnusedChatHubColumns1760965142113 } from '../common/1760965142113-DropUnusedChatHubColumns';
import { AddWorkflowVersionIdToExecutionData1762310956274 } from '../common/1762310956274-AddWorkflowVersionIdToExecutionData';
import type { Migration } from '../migration-types';
export const postgresMigrations: Migration[] = [
@ -219,4 +220,5 @@ export const postgresMigrations: Migration[] = [
DropUnusedChatHubColumns1760965142113,
AddWorkflowVersionColumn1761047826451,
ChangeDependencyInfoToJson1761655473000,
AddWorkflowVersionIdToExecutionData1762310956274,
];

View File

@ -105,6 +105,7 @@ import { CreateChatHubTables1760019379982 } from '../common/1760019379982-Create
import { CreateChatHubAgentTable1760020000000 } from '../common/1760020000000-CreateChatHubAgentTable';
import { CreateWorkflowDependencyTable1760314000000 } from '../common/1760314000000-CreateWorkflowDependencyTable';
import { DropUnusedChatHubColumns1760965142113 } from '../common/1760965142113-DropUnusedChatHubColumns';
import { AddWorkflowVersionIdToExecutionData1762310956274 } from '../common/1762310956274-AddWorkflowVersionIdToExecutionData';
const sqliteMigrations: Migration[] = [
InitialMigration1588102412422,
@ -213,6 +214,7 @@ const sqliteMigrations: Migration[] = [
DropUnusedChatHubColumns1760965142113,
AddWorkflowVersionColumn1761047826451,
ChangeDependencyInfoToJson1761655473000,
AddWorkflowVersionIdToExecutionData1762310956274,
];
export { sqliteMigrations };

View File

@ -4,6 +4,8 @@ import type { EntityManager } from '@n8n/typeorm';
import type { QueryDeepPartialEntity } from '@n8n/typeorm/query-builder/QueryPartialEntity';
import { ExecutionData } from '../entities';
import { IWorkflowBase } from 'n8n-workflow';
import { ISimplifiedPinData } from 'entities/types-db';
@Service()
export class ExecutionDataRepository extends Repository<ExecutionData> {
@ -18,12 +20,22 @@ export class ExecutionDataRepository extends Repository<ExecutionData> {
return await transactionManager.insert(ExecutionData, data);
}
async findByExecutionIds(executionIds: string[]) {
async findByExecutionIds(executionIds: string[]): Promise<
Array<
Omit<IWorkflowBase, 'pinData'> & {
pinData?: ISimplifiedPinData;
}
>
> {
return await this.find({
select: ['workflowData'],
select: ['workflowData', 'workflowHistory'],
where: {
executionId: In(executionIds),
},
}).then((executionData) => executionData.map(({ workflowData }) => workflowData));
}).then((executionData) =>
executionData.map(({ workflowData, workflowHistory }) =>
workflowHistory ? { ...workflowHistory.workflow, ...workflowHistory } : workflowData,
),
);
}
}

View File

@ -364,7 +364,12 @@ export class ExecutionRepository extends Repository<ExecutionEntity> {
// In the non-pooling sqlite driver we can't use transactions, because that creates nested transactions under highly concurrent loads, leading to errors in the database
const { identifiers: inserted } = await this.insert({ ...rest, createdAt: new Date() });
const { id: executionId } = inserted[0] as { id: string };
await this.executionDataRepository.insert({ executionId, workflowData, data });
await this.executionDataRepository.insert({
executionId,
workflowData,
data,
workflowVersionId: currentWorkflow.versionId,
});
return String(executionId);
} else {
// All other database drivers should create executions and execution-data atomically
@ -375,7 +380,7 @@ export class ExecutionRepository extends Repository<ExecutionEntity> {
});
const { id: executionId } = inserted[0] as { id: string };
await this.executionDataRepository.createExecutionDataForExecution(
{ executionId, workflowData, data },
{ executionId, workflowData, data, workflowVersionId: currentWorkflow.versionId },
transactionManager,
);
return String(executionId);

View File

@ -10,7 +10,7 @@ import {
PrimaryGeneratedColumn,
} from '@n8n/typeorm';
import type { ChatHubSession } from './chat-hub-session.entity';
import { ChatHubSession } from './chat-hub-session.entity';
@Entity({ name: 'chat_hub_messages' })
export class ChatHubMessage extends WithTimestamps {
@ -26,7 +26,11 @@ export class ChatHubMessage extends WithTimestamps {
/**
* The chat session/conversation this message belongs to.
*/
@ManyToOne('ChatHubSession', 'messages', { onDelete: 'CASCADE' })
@ManyToOne(
() => ChatHubSession,
(chatHubSession) => chatHubSession.messages,
{ onDelete: 'CASCADE' },
)
@JoinColumn({ name: 'sessionId' })
session: Relation<ChatHubSession>;
@ -72,7 +76,7 @@ export class ChatHubMessage extends WithTimestamps {
/**
* Custom n8n agent workflow that produced this message (if applicable).
*/
@ManyToOne('WorkflowEntity', { onDelete: 'SET NULL', nullable: true })
@ManyToOne(() => WorkflowEntity, { onDelete: 'SET NULL', nullable: true })
@JoinColumn({ name: 'workflowId' })
workflow?: Relation<WorkflowEntity> | null;
@ -92,7 +96,7 @@ export class ChatHubMessage extends WithTimestamps {
/**
* Execution that produced this message (reset to null when the execution is deleted)
*/
@ManyToOne('ExecutionEntity', { onDelete: 'SET NULL', nullable: true })
@ManyToOne(() => ExecutionEntity, { onDelete: 'SET NULL', nullable: true })
@JoinColumn({ name: 'executionId' })
execution?: Relation<ExecutionEntity> | null;
@ -105,17 +109,24 @@ export class ChatHubMessage extends WithTimestamps {
/**
* The previous message this message is a response to, NULL on the initial message.
*/
@ManyToOne('ChatHubMessage', (m: ChatHubMessage) => m.responses, {
onDelete: 'CASCADE',
nullable: true,
})
@ManyToOne(
() => ChatHubMessage,
(m: ChatHubMessage) => m.responses,
{
onDelete: 'CASCADE',
nullable: true,
},
)
@JoinColumn({ name: 'previousMessageId' })
previousMessage?: Relation<ChatHubMessage> | null;
/**
* Messages that are responses to this message. This could branch out to multiple threads.
*/
@OneToMany('ChatHubMessage', (m: ChatHubMessage) => m.previousMessage)
@OneToMany(
() => ChatHubMessage,
(m: ChatHubMessage) => m.previousMessage,
)
responses?: Array<Relation<ChatHubMessage>>;
/**
@ -127,17 +138,24 @@ export class ChatHubMessage extends WithTimestamps {
/**
* The message that this message is a retry of (if applicable).
*/
@ManyToOne('ChatHubMessage', (m: ChatHubMessage) => m.retries, {
onDelete: 'CASCADE',
nullable: true,
})
@ManyToOne(
() => ChatHubMessage,
(m: ChatHubMessage) => m.retries,
{
onDelete: 'CASCADE',
nullable: true,
},
)
@JoinColumn({ name: 'retryOfMessageId' })
retryOfMessage?: Relation<ChatHubMessage> | null;
/**
* All messages that are retries of this message (if applicable).
*/
@OneToMany('ChatHubMessage', (m: ChatHubMessage) => m.retryOfMessage)
@OneToMany(
() => ChatHubMessage,
(m: ChatHubMessage) => m.retryOfMessage,
)
retries?: Array<Relation<ChatHubMessage>>;
/**
@ -149,17 +167,24 @@ export class ChatHubMessage extends WithTimestamps {
/**
* The message that this message is a revision/edit of (if applicable).
*/
@ManyToOne('ChatHubMessage', (m: ChatHubMessage) => m.revisions, {
onDelete: 'CASCADE',
nullable: true,
})
@ManyToOne(
() => ChatHubMessage,
(m: ChatHubMessage) => m.revisions,
{
onDelete: 'CASCADE',
nullable: true,
},
)
@JoinColumn({ name: 'revisionOfMessageId' })
revisionOfMessage?: Relation<ChatHubMessage> | null;
/**
* All messages that are revisions/edits of this message (if applicable).
*/
@OneToMany('ChatHubMessage', (m: ChatHubMessage) => m.revisionOfMessage)
@OneToMany(
() => ChatHubMessage,
(m: ChatHubMessage) => m.revisionOfMessage,
)
revisions?: Array<Relation<ChatHubMessage>>;
/**