mirror of
https://github.com/n8n-io/n8n.git
synced 2026-05-12 16:10:30 +02:00
perf(editor): Omit execution data from executionFinished event (#20001)
This commit is contained in:
parent
3b20e82742
commit
723b9b8578
|
|
@ -1,151 +0,0 @@
|
|||
import { stringify } from 'flatted';
|
||||
import pick from 'lodash/pick';
|
||||
import type {
|
||||
IDataObject,
|
||||
IRunData,
|
||||
IRunExecutionData,
|
||||
ITaskData,
|
||||
ITaskDataConnections,
|
||||
} from 'n8n-workflow';
|
||||
import { nanoid } from 'nanoid';
|
||||
|
||||
import { clickExecuteWorkflowButton } from '../composables/workflow';
|
||||
|
||||
export function createMockNodeExecutionData(
|
||||
name: string,
|
||||
{
|
||||
data,
|
||||
inputOverride,
|
||||
executionStatus = 'success',
|
||||
jsonData,
|
||||
...rest
|
||||
}: Partial<ITaskData> & { jsonData?: Record<string, IDataObject> },
|
||||
): Record<string, ITaskData> {
|
||||
return {
|
||||
[name]: {
|
||||
startTime: Date.now(),
|
||||
executionIndex: 0,
|
||||
executionTime: 1,
|
||||
executionStatus,
|
||||
data: jsonData
|
||||
? Object.keys(jsonData).reduce((acc, key) => {
|
||||
acc[key] = [
|
||||
[
|
||||
{
|
||||
json: jsonData[key],
|
||||
pairedItem: { item: 0 },
|
||||
},
|
||||
],
|
||||
];
|
||||
|
||||
return acc;
|
||||
}, {} as ITaskDataConnections)
|
||||
: data,
|
||||
source: [null],
|
||||
inputOverride,
|
||||
...rest,
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
export function runMockWorkflowExecution({
|
||||
trigger,
|
||||
lastNodeExecuted,
|
||||
runData,
|
||||
}: {
|
||||
trigger?: () => void;
|
||||
lastNodeExecuted: string;
|
||||
runData: Array<ReturnType<typeof createMockNodeExecutionData>>;
|
||||
}) {
|
||||
const workflowId = nanoid();
|
||||
const executionId = Math.floor(Math.random() * 1_000_000).toString();
|
||||
|
||||
const resolvedRunData = runData.reduce<IRunData>((acc, nodeExecution) => {
|
||||
const nodeName = Object.keys(nodeExecution)[0];
|
||||
acc[nodeName] = [nodeExecution[nodeName]];
|
||||
return acc;
|
||||
}, {});
|
||||
|
||||
const executionData: IRunExecutionData = {
|
||||
startData: {},
|
||||
resultData: {
|
||||
runData: resolvedRunData,
|
||||
pinData: {},
|
||||
lastNodeExecuted,
|
||||
},
|
||||
executionData: {
|
||||
contextData: {},
|
||||
nodeExecutionStack: [],
|
||||
metadata: {},
|
||||
waitingExecution: {},
|
||||
waitingExecutionSource: {},
|
||||
},
|
||||
};
|
||||
|
||||
cy.intercept('POST', '/rest/workflows/**/run', {
|
||||
statusCode: 201,
|
||||
body: {
|
||||
data: {
|
||||
executionId,
|
||||
},
|
||||
},
|
||||
}).as('runWorkflow');
|
||||
|
||||
if (trigger) {
|
||||
trigger();
|
||||
} else {
|
||||
clickExecuteWorkflowButton();
|
||||
}
|
||||
|
||||
cy.wait('@runWorkflow');
|
||||
|
||||
cy.push('executionStarted', {
|
||||
workflowId,
|
||||
executionId,
|
||||
mode: 'manual',
|
||||
startedAt: new Date(),
|
||||
workflowName: '',
|
||||
flattedRunData: '',
|
||||
});
|
||||
|
||||
runData.forEach((nodeExecution) => {
|
||||
const nodeName = Object.keys(nodeExecution)[0];
|
||||
const nodeRunData = nodeExecution[nodeName];
|
||||
|
||||
cy.push('nodeExecuteBefore', {
|
||||
executionId,
|
||||
nodeName,
|
||||
data: pick(nodeRunData, ['startTime', 'executionIndex', 'source', 'hints']),
|
||||
});
|
||||
const { data: _, ...taskData } = nodeRunData;
|
||||
const itemCountByConnectionType: Record<string, number[]> = {};
|
||||
for (const connectionType of Object.keys(nodeRunData.data ?? {})) {
|
||||
const connectionData = nodeRunData.data?.[connectionType];
|
||||
if (Array.isArray(connectionData)) {
|
||||
itemCountByConnectionType[connectionType] = connectionData.map((d) => (d ? d.length : 0));
|
||||
} else {
|
||||
itemCountByConnectionType[connectionType] = [0];
|
||||
}
|
||||
}
|
||||
|
||||
cy.push('nodeExecuteAfter', {
|
||||
executionId,
|
||||
nodeName,
|
||||
data: taskData,
|
||||
itemCountByConnectionType,
|
||||
});
|
||||
cy.push('nodeExecuteAfterData', {
|
||||
executionId,
|
||||
nodeName,
|
||||
data: nodeRunData,
|
||||
itemCountByConnectionType,
|
||||
});
|
||||
});
|
||||
|
||||
cy.push('executionFinished', {
|
||||
executionId,
|
||||
workflowId,
|
||||
status: 'success',
|
||||
rawData: stringify(executionData),
|
||||
});
|
||||
}
|
||||
|
|
@ -1,3 +1,2 @@
|
|||
export * from './executions';
|
||||
export * from './modal';
|
||||
export * from './popper';
|
||||
|
|
|
|||
|
|
@ -32,8 +32,6 @@ export type ExecutionFinished = {
|
|||
executionId: string;
|
||||
workflowId: string;
|
||||
status: ExecutionStatus;
|
||||
/** @deprecated: Please construct execution data in the frontend from the data pushed in previous messages, instead of depending on this additional payload serialization */
|
||||
rawData?: string;
|
||||
};
|
||||
};
|
||||
|
||||
|
|
|
|||
|
|
@ -2,7 +2,6 @@ import { Logger } from '@n8n/backend-common';
|
|||
import { mockInstance } from '@n8n/backend-test-utils';
|
||||
import type { Project } from '@n8n/db';
|
||||
import { ExecutionRepository } from '@n8n/db';
|
||||
import { stringify } from 'flatted';
|
||||
import { mock } from 'jest-mock-extended';
|
||||
import {
|
||||
BinaryDataService,
|
||||
|
|
@ -437,7 +436,6 @@ describe('Execution Lifecycle Hooks', () => {
|
|||
type: 'executionFinished',
|
||||
data: {
|
||||
executionId,
|
||||
rawData: stringify(successfulRun.data),
|
||||
status: 'success',
|
||||
workflowId: 'test-workflow-id',
|
||||
},
|
||||
|
|
|
|||
|
|
@ -253,9 +253,8 @@ function hookFunctionsPush(
|
|||
if (status === 'waiting') {
|
||||
pushInstance.send({ type: 'executionWaiting', data: { executionId } }, pushRef);
|
||||
} else {
|
||||
const rawData = stringify(fullRunData.data);
|
||||
pushInstance.send(
|
||||
{ type: 'executionFinished', data: { executionId, workflowId, status, rawData } },
|
||||
{ type: 'executionFinished', data: { executionId, workflowId, status } },
|
||||
pushRef,
|
||||
);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -8,7 +8,6 @@ import { ServerResponse } from 'http';
|
|||
import type { Server } from 'http';
|
||||
import pick from 'lodash/pick';
|
||||
import { InstanceSettings } from 'n8n-core';
|
||||
import { deepCopy } from 'n8n-workflow';
|
||||
import { parse as parseUrl } from 'url';
|
||||
import { Server as WSServer } from 'ws';
|
||||
|
||||
|
|
@ -228,45 +227,30 @@ export class Push extends TypedEmitter<PushEvents> {
|
|||
* See {@link shouldRelayViaPubSub} for more details.
|
||||
*/
|
||||
private relayViaPubSub(pushMsg: PushMessage, pushRef: string, asBinary: boolean = false) {
|
||||
const eventSizeBytes = new TextEncoder().encode(JSON.stringify(pushMsg.data)).length;
|
||||
|
||||
if (eventSizeBytes <= MAX_PAYLOAD_SIZE_BYTES) {
|
||||
void this.publisher.publishCommand({
|
||||
command: 'relay-execution-lifecycle-event',
|
||||
payload: { ...pushMsg, pushRef, asBinary },
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
// too large for pubsub channel, trim it
|
||||
|
||||
const { type } = pushMsg;
|
||||
const toMb = (bytes: number) => (bytes / (1024 * 1024)).toFixed(0);
|
||||
const eventMb = toMb(eventSizeBytes);
|
||||
const maxMb = toMb(MAX_PAYLOAD_SIZE_BYTES);
|
||||
|
||||
if (type === 'nodeExecuteAfterData') {
|
||||
this.logger.warn(
|
||||
`Size of "${type}" (${eventMb} MB) exceeds max size ${maxMb} MB. Skipping...`,
|
||||
);
|
||||
// In case of nodeExecuteAfterData, we omit the message entirely. We
|
||||
// already include the amount of items in the nodeExecuteAfter message,
|
||||
// based on which the FE will construct placeholder data. The actual
|
||||
// data is then fetched at the end of the execution.
|
||||
return;
|
||||
}
|
||||
const eventSizeBytes = new TextEncoder().encode(JSON.stringify(pushMsg.data)).length;
|
||||
|
||||
this.logger.warn(`Size of "${type}" (${eventMb} MB) exceeds max size ${maxMb} MB. Trimming...`);
|
||||
if (eventSizeBytes > MAX_PAYLOAD_SIZE_BYTES) {
|
||||
const toMb = (bytes: number) => (bytes / (1024 * 1024)).toFixed(0);
|
||||
const eventMb = toMb(eventSizeBytes);
|
||||
const maxMb = toMb(MAX_PAYLOAD_SIZE_BYTES);
|
||||
|
||||
const pushMsgCopy = deepCopy(pushMsg);
|
||||
|
||||
if (pushMsgCopy.type === 'executionFinished') {
|
||||
pushMsgCopy.data.rawData = ''; // prompt client to fetch from DB
|
||||
this.logger.warn(
|
||||
`Size of "${type}" (${eventMb} MB) exceeds max size ${maxMb} MB. Skipping...`,
|
||||
);
|
||||
// In case of nodeExecuteAfterData, we omit the message entirely. We
|
||||
// already include the amount of items in the nodeExecuteAfter message,
|
||||
// based on which the FE will construct placeholder data. The actual
|
||||
// data is then fetched at the end of the execution.
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
void this.publisher.publishCommand({
|
||||
command: 'relay-execution-lifecycle-event',
|
||||
payload: { ...pushMsgCopy, pushRef, asBinary },
|
||||
payload: { ...pushMsg, pushRef, asBinary },
|
||||
});
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -103,30 +103,17 @@ export async function executionFinished(
|
|||
uiStore.setProcessingExecutionResults(true);
|
||||
|
||||
let successToastAlreadyShown = false;
|
||||
let execution: SimplifiedExecution | undefined;
|
||||
if (data.rawData) {
|
||||
const { executionId, workflowId, status, rawData } = data;
|
||||
|
||||
execution = {
|
||||
id: executionId,
|
||||
workflowId,
|
||||
workflowData: workflowsStore.workflow,
|
||||
data: parse(rawData),
|
||||
status,
|
||||
startedAt: workflowsStore.workflowExecutionData?.startedAt ?? new Date(),
|
||||
stoppedAt: new Date(),
|
||||
};
|
||||
} else {
|
||||
if (data.status === 'success') {
|
||||
handleExecutionFinishedSuccessfully(data.workflowId);
|
||||
successToastAlreadyShown = true;
|
||||
}
|
||||
if (data.status === 'success') {
|
||||
handleExecutionFinishedWithOther(successToastAlreadyShown);
|
||||
successToastAlreadyShown = true;
|
||||
}
|
||||
|
||||
execution = await fetchExecutionData(data.executionId);
|
||||
if (!execution) {
|
||||
uiStore.setProcessingExecutionResults(false);
|
||||
return;
|
||||
}
|
||||
const execution = await fetchExecutionData(data.executionId);
|
||||
|
||||
if (!execution) {
|
||||
uiStore.setProcessingExecutionResults(false);
|
||||
return;
|
||||
}
|
||||
|
||||
const runExecutionData = getRunExecutionData(execution);
|
||||
|
|
@ -358,16 +345,15 @@ export function handleExecutionFinishedWithErrorOrCanceled(
|
|||
* immediately, even though we still need to fetch and deserialize the
|
||||
* full execution data, to minimize perceived latency.
|
||||
*/
|
||||
export function handleExecutionFinishedSuccessfully(workflowId: string) {
|
||||
function handleExecutionFinishedSuccessfully(workflowName: string, message: string) {
|
||||
const workflowsStore = useWorkflowsStore();
|
||||
const workflowHelpers = useWorkflowHelpers();
|
||||
const toast = useToast();
|
||||
const i18n = useI18n();
|
||||
|
||||
workflowHelpers.setDocumentTitle(workflowsStore.getWorkflowById(workflowId)?.name, 'IDLE');
|
||||
workflowHelpers.setDocumentTitle(workflowName, 'IDLE');
|
||||
workflowsStore.setActiveExecutionId(undefined);
|
||||
toast.showMessage({
|
||||
title: i18n.baseText('pushConnection.workflowExecutedSuccessfully'),
|
||||
title: message,
|
||||
type: 'success',
|
||||
});
|
||||
}
|
||||
|
|
@ -382,8 +368,9 @@ export function handleExecutionFinishedWithOther(successToastAlreadyShown: boole
|
|||
const workflowHelpers = useWorkflowHelpers();
|
||||
const nodeTypesStore = useNodeTypesStore();
|
||||
const workflowObject = workflowsStore.workflowObject;
|
||||
const workflowName = workflowObject.name ?? '';
|
||||
|
||||
workflowHelpers.setDocumentTitle(workflowObject.name as string, 'IDLE');
|
||||
workflowHelpers.setDocumentTitle(workflowName, 'IDLE');
|
||||
|
||||
const workflowExecution = workflowsStore.getWorkflowExecution;
|
||||
if (workflowExecution?.executedNode) {
|
||||
|
|
@ -406,17 +393,17 @@ export function handleExecutionFinishedWithOther(successToastAlreadyShown: boole
|
|||
}),
|
||||
type: 'success',
|
||||
});
|
||||
} else {
|
||||
toast.showMessage({
|
||||
title: i18n.baseText('pushConnection.nodeExecutedSuccessfully'),
|
||||
type: 'success',
|
||||
});
|
||||
} else if (!successToastAlreadyShown) {
|
||||
handleExecutionFinishedSuccessfully(
|
||||
workflowName,
|
||||
i18n.baseText('pushConnection.nodeExecutedSuccessfully'),
|
||||
);
|
||||
}
|
||||
} else if (!successToastAlreadyShown) {
|
||||
toast.showMessage({
|
||||
title: i18n.baseText('pushConnection.workflowExecutedSuccessfully'),
|
||||
type: 'success',
|
||||
});
|
||||
handleExecutionFinishedSuccessfully(
|
||||
workflowName,
|
||||
i18n.baseText('pushConnection.workflowExecutedSuccessfully'),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -431,12 +418,16 @@ export function setRunExecutionData(
|
|||
|
||||
workflowsStore.executingNode.length = 0;
|
||||
|
||||
if (workflowExecution === null) {
|
||||
return;
|
||||
}
|
||||
|
||||
workflowsStore.setWorkflowExecutionData({
|
||||
...workflowExecution,
|
||||
status: execution.status,
|
||||
id: execution.id,
|
||||
stoppedAt: execution.stoppedAt,
|
||||
} as IExecutionResponse);
|
||||
});
|
||||
workflowsStore.setWorkflowExecutionRunData(runExecutionData);
|
||||
workflowsStore.setActiveExecutionId(undefined);
|
||||
|
||||
|
|
|
|||
|
|
@ -857,6 +857,7 @@ describe('useWorkflowsStore', () => {
|
|||
...executionResponse,
|
||||
data: {
|
||||
resultData: {
|
||||
lastNodeExecuted: 'When clicking ‘Execute workflow’',
|
||||
runData: {
|
||||
[successEvent.nodeName]: [successEvent.data],
|
||||
},
|
||||
|
|
@ -887,6 +888,7 @@ describe('useWorkflowsStore', () => {
|
|||
...executionResponse,
|
||||
data: {
|
||||
resultData: {
|
||||
lastNodeExecuted: 'Edit Fields',
|
||||
runData: {
|
||||
[errorEvent.nodeName]: [errorEvent.data],
|
||||
},
|
||||
|
|
@ -969,6 +971,7 @@ describe('useWorkflowsStore', () => {
|
|||
...runWithExistingRunData,
|
||||
data: {
|
||||
resultData: {
|
||||
lastNodeExecuted: 'When clicking ‘Execute workflow’',
|
||||
runData: {
|
||||
[successEvent.nodeName]: [successEvent.data],
|
||||
},
|
||||
|
|
@ -977,7 +980,7 @@ describe('useWorkflowsStore', () => {
|
|||
});
|
||||
});
|
||||
|
||||
it('should replace existing placeholder task data in new log view', () => {
|
||||
it('should replace existing placeholder task data and lastNodeExecuted', () => {
|
||||
const successEventWithExecutionIndex = deepCopy(successEvent);
|
||||
successEventWithExecutionIndex.data.executionIndex = 1;
|
||||
|
||||
|
|
@ -1023,6 +1026,7 @@ describe('useWorkflowsStore', () => {
|
|||
...executionResponse,
|
||||
data: {
|
||||
resultData: {
|
||||
lastNodeExecuted: 'When clicking ‘Execute workflow’',
|
||||
runData: {
|
||||
[successEvent.nodeName]: [successEventWithExecutionIndex.data],
|
||||
},
|
||||
|
|
|
|||
|
|
@ -1626,6 +1626,8 @@ export const useWorkflowsStore = defineStore(STORES.WORKFLOWS, () => {
|
|||
const node = getNodeByName(nodeName);
|
||||
if (!node) return;
|
||||
|
||||
workflowExecutionData.value.data.resultData.lastNodeExecuted = nodeName;
|
||||
|
||||
if (workflowExecutionData.value.data.resultData.runData[nodeName] === undefined) {
|
||||
workflowExecutionData.value.data.resultData.runData[nodeName] = [];
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue
Block a user