mirror of
https://github.com/n8n-io/n8n.git
synced 2026-06-04 18:49:20 +02:00
feat(core): Surface workflow execution source to the editor (no-changelog) (#31724)
This commit is contained in:
parent
bc7aeb6fe8
commit
22eb20f183
|
|
@ -4,6 +4,7 @@ import type {
|
|||
ITaskStartedData,
|
||||
NodeConnectionType,
|
||||
WorkflowExecuteMode,
|
||||
WorkflowExecutionSource,
|
||||
} from 'n8n-workflow';
|
||||
|
||||
export type ExecutionStarted = {
|
||||
|
|
@ -11,6 +12,11 @@ export type ExecutionStarted = {
|
|||
data: {
|
||||
executionId: string;
|
||||
mode: WorkflowExecuteMode;
|
||||
/**
|
||||
* Who initiated the run. Absent for ordinary user runs; `'instance_ai'`
|
||||
* when the AI assistant ran the workflow on the user's behalf.
|
||||
*/
|
||||
source?: WorkflowExecutionSource;
|
||||
startedAt: Date;
|
||||
workflowId: string;
|
||||
workflowName?: string;
|
||||
|
|
@ -23,6 +29,7 @@ export type ExecutionWaiting = {
|
|||
type: 'executionWaiting';
|
||||
data: {
|
||||
executionId: string;
|
||||
source?: WorkflowExecutionSource;
|
||||
};
|
||||
};
|
||||
|
||||
|
|
@ -32,6 +39,11 @@ export type ExecutionFinished = {
|
|||
executionId: string;
|
||||
workflowId: string;
|
||||
status: ExecutionStatus;
|
||||
/**
|
||||
* Who initiated the run. Absent for ordinary user runs; `'instance_ai'`
|
||||
* when the AI assistant ran the workflow on the user's behalf.
|
||||
*/
|
||||
source?: WorkflowExecutionSource;
|
||||
};
|
||||
};
|
||||
|
||||
|
|
|
|||
|
|
@ -2399,8 +2399,8 @@ describe('TelemetryEventRelay', () => {
|
|||
executionId: 'execution123',
|
||||
userId: 'user123',
|
||||
runData,
|
||||
source: 'instance_ai',
|
||||
telemetryMetadata: {
|
||||
source: 'instance_ai',
|
||||
mockDataSources: ['trigger_input', 'verification_pin_data'],
|
||||
},
|
||||
};
|
||||
|
|
|
|||
|
|
@ -153,6 +153,7 @@ export type RelayEventMap = {
|
|||
runData?: IRun;
|
||||
projectId?: string;
|
||||
projectName?: string;
|
||||
source?: IWorkflowExecutionDataProcess['source'];
|
||||
telemetryMetadata?: IWorkflowExecutionDataProcess['telemetryMetadata'];
|
||||
};
|
||||
|
||||
|
|
|
|||
|
|
@ -16,14 +16,17 @@ function hasUser(event: WorkflowExecutedEvent): event is WorkflowExecutedEventWi
|
|||
return event.user !== undefined;
|
||||
}
|
||||
|
||||
function withoutTelemetryMetadata(
|
||||
function withoutExecutionMetadata(
|
||||
event: RelayEventMap['workflow-post-execute'],
|
||||
): Omit<RelayEventMap['workflow-post-execute'], 'telemetryMetadata'> {
|
||||
const eventWithoutTelemetryMetadata = { ...event };
|
||||
): Omit<RelayEventMap['workflow-post-execute'], 'source' | 'telemetryMetadata'> {
|
||||
const trimmed = { ...event };
|
||||
|
||||
delete eventWithoutTelemetryMetadata.telemetryMetadata;
|
||||
// Execution metadata (provenance + telemetry) is internal and not part of
|
||||
// the log-streaming payload contract.
|
||||
delete trimmed.source;
|
||||
delete trimmed.telemetryMetadata;
|
||||
|
||||
return eventWithoutTelemetryMetadata;
|
||||
return trimmed;
|
||||
}
|
||||
|
||||
@Service()
|
||||
|
|
@ -276,7 +279,7 @@ export class LogStreamingEventRelay extends EventRelay {
|
|||
|
||||
private workflowPostExecute(event: RelayEventMap['workflow-post-execute']) {
|
||||
const { runData, workflow, executionId, projectId, projectName, ...rest } =
|
||||
withoutTelemetryMetadata(event);
|
||||
withoutExecutionMetadata(event);
|
||||
|
||||
const payload = {
|
||||
...rest,
|
||||
|
|
|
|||
|
|
@ -63,9 +63,10 @@ function limitNodeGraphStringSize(nodeGraphString: string): string {
|
|||
}
|
||||
|
||||
function getExecutionTelemetryProperties(
|
||||
source: RelayEventMap['workflow-post-execute']['source'],
|
||||
telemetryMetadata: RelayEventMap['workflow-post-execute']['telemetryMetadata'],
|
||||
): ITelemetryTrackProperties {
|
||||
const executionSource = telemetryMetadata?.source ?? 'user';
|
||||
const executionSource = source ?? 'user';
|
||||
|
||||
if (executionSource !== 'instance_ai') return { execution_source: executionSource };
|
||||
|
||||
|
|
@ -1042,13 +1043,14 @@ export class TelemetryEventRelay extends EventRelay {
|
|||
workflow,
|
||||
runData,
|
||||
userId,
|
||||
source,
|
||||
telemetryMetadata,
|
||||
}: RelayEventMap['workflow-post-execute']) {
|
||||
if (!workflow.id) {
|
||||
return;
|
||||
}
|
||||
|
||||
const executionTelemetryProperties = getExecutionTelemetryProperties(telemetryMetadata);
|
||||
const executionTelemetryProperties = getExecutionTelemetryProperties(source, telemetryMetadata);
|
||||
|
||||
const telemetryProperties: IExecutionTrackProperties = {
|
||||
workflow_id: workflow.id,
|
||||
|
|
|
|||
|
|
@ -494,13 +494,20 @@ describe('Execution Lifecycle Hooks', () => {
|
|||
externalHooksTests();
|
||||
statisticsTests();
|
||||
|
||||
it('should include execution telemetry metadata in workflow-post-execute events', async () => {
|
||||
it('should include execution source and telemetry metadata in workflow-post-execute events', async () => {
|
||||
const telemetryMetadata = {
|
||||
source: 'instance_ai' as const,
|
||||
mockDataSources: ['trigger_input' as const],
|
||||
};
|
||||
const lifecycleHooks = getLifecycleHooksForRegularMain(
|
||||
{ executionMode: 'manual', workflowData, pushRef, retryOf, userId, telemetryMetadata },
|
||||
{
|
||||
executionMode: 'manual',
|
||||
workflowData,
|
||||
pushRef,
|
||||
retryOf,
|
||||
userId,
|
||||
source: 'instance_ai',
|
||||
telemetryMetadata,
|
||||
},
|
||||
executionId,
|
||||
);
|
||||
|
||||
|
|
@ -511,6 +518,7 @@ describe('Execution Lifecycle Hooks', () => {
|
|||
runData: successfulRun,
|
||||
workflow: workflowData,
|
||||
userId,
|
||||
source: 'instance_ai',
|
||||
telemetryMetadata,
|
||||
});
|
||||
});
|
||||
|
|
|
|||
|
|
@ -144,6 +144,7 @@ function hookFunctionsWorkflowEvents(
|
|||
userId?: string,
|
||||
projectId?: string,
|
||||
projectName?: string,
|
||||
source?: IWorkflowExecutionDataProcess['source'],
|
||||
telemetryMetadata?: IWorkflowExecutionDataProcess['telemetryMetadata'],
|
||||
) {
|
||||
const eventService = Container.get(EventService);
|
||||
|
|
@ -193,6 +194,7 @@ function hookFunctionsWorkflowEvents(
|
|||
userId,
|
||||
projectId,
|
||||
projectName,
|
||||
...(source ? { source } : {}),
|
||||
...(telemetryMetadata ? { telemetryMetadata } : {}),
|
||||
});
|
||||
});
|
||||
|
|
@ -257,6 +259,7 @@ function hookFunctionsPush(
|
|||
hooks: ExecutionLifecycleHooks,
|
||||
{ pushRef, retryOf }: HooksSetupParameters,
|
||||
userId?: string,
|
||||
source?: IWorkflowExecutionDataProcess['source'],
|
||||
) {
|
||||
if (!pushRef) return;
|
||||
const logger = Container.get(Logger);
|
||||
|
|
@ -410,6 +413,7 @@ function hookFunctionsPush(
|
|||
data: {
|
||||
executionId,
|
||||
mode: this.mode,
|
||||
source,
|
||||
startedAt: new Date(),
|
||||
retryOf,
|
||||
workflowId,
|
||||
|
|
@ -431,10 +435,10 @@ function hookFunctionsPush(
|
|||
|
||||
const { status } = fullRunData;
|
||||
if (status === 'waiting') {
|
||||
pushInstance.send({ type: 'executionWaiting', data: { executionId } }, pushRef);
|
||||
pushInstance.send({ type: 'executionWaiting', data: { executionId, source } }, pushRef);
|
||||
} else {
|
||||
pushInstance.send(
|
||||
{ type: 'executionFinished', data: { executionId, workflowId, status } },
|
||||
{ type: 'executionFinished', data: { executionId, workflowId, status, source } },
|
||||
pushRef,
|
||||
);
|
||||
}
|
||||
|
|
@ -742,7 +746,7 @@ export function getLifecycleHooksForScalingWorker(
|
|||
hookFunctionsExternalHooks(hooks);
|
||||
|
||||
if (executionMode === 'manual' && Container.get(InstanceSettings).isWorker) {
|
||||
hookFunctionsPush(hooks, optionalParameters, data.userId);
|
||||
hookFunctionsPush(hooks, optionalParameters, data.userId, data.source);
|
||||
}
|
||||
|
||||
Container.get(ModulesHooksRegistry).addHooks(hooks);
|
||||
|
|
@ -765,6 +769,7 @@ export function getLifecycleHooksForScalingMain(
|
|||
userId,
|
||||
projectId,
|
||||
projectName,
|
||||
source,
|
||||
telemetryMetadata,
|
||||
} = data;
|
||||
const hooks = new ExecutionLifecycleHooks(
|
||||
|
|
@ -778,7 +783,7 @@ export function getLifecycleHooksForScalingMain(
|
|||
const executionRepository = Container.get(ExecutionRepository);
|
||||
const executionPersistence = Container.get(ExecutionPersistence);
|
||||
|
||||
hookFunctionsWorkflowEvents(hooks, userId, projectId, projectName, telemetryMetadata);
|
||||
hookFunctionsWorkflowEvents(hooks, userId, projectId, projectName, source, telemetryMetadata);
|
||||
hookFunctionsSaveProgress(hooks, optionalParameters);
|
||||
hookFunctionsExternalHooks(hooks);
|
||||
hookFunctionsFinalizeExecutionStatus(hooks);
|
||||
|
|
@ -852,6 +857,7 @@ export function getLifecycleHooksForRegularMain(
|
|||
userId,
|
||||
projectId,
|
||||
projectName,
|
||||
source,
|
||||
telemetryMetadata,
|
||||
} = data;
|
||||
const hooks = new ExecutionLifecycleHooks(
|
||||
|
|
@ -862,11 +868,11 @@ export function getLifecycleHooksForRegularMain(
|
|||
);
|
||||
const saveSettings = toSaveSettings(workflowData.settings);
|
||||
const optionalParameters = { pushRef, retryOf: retryOf ?? undefined, saveSettings };
|
||||
hookFunctionsWorkflowEvents(hooks, userId, projectId, projectName, telemetryMetadata);
|
||||
hookFunctionsWorkflowEvents(hooks, userId, projectId, projectName, source, telemetryMetadata);
|
||||
hookFunctionsNodeEvents(hooks);
|
||||
hookFunctionsFinalizeExecutionStatus(hooks);
|
||||
hookFunctionsSave(hooks, optionalParameters);
|
||||
hookFunctionsPush(hooks, optionalParameters, userId);
|
||||
hookFunctionsPush(hooks, optionalParameters, userId, source);
|
||||
hookFunctionsSaveProgress(hooks, optionalParameters);
|
||||
hookFunctionsStatistics(hooks);
|
||||
hookFunctionsExternalHooks(hooks);
|
||||
|
|
|
|||
|
|
@ -2656,8 +2656,8 @@ describe('createExecutionAdapter run()', () => {
|
|||
|
||||
const runData = mockWorkflowRunner.run.mock.calls[0][0];
|
||||
|
||||
expect(runData.source).toBe('instance_ai');
|
||||
expect(runData.telemetryMetadata).toEqual({
|
||||
source: 'instance_ai',
|
||||
mockDataSources: ['trigger_input', 'verification_pin_data', 'workflow_pin_data'],
|
||||
});
|
||||
});
|
||||
|
|
|
|||
|
|
@ -984,8 +984,8 @@ export class InstanceAiAdapterService {
|
|||
runData.pinData = basePinData;
|
||||
}
|
||||
|
||||
runData.source = 'instance_ai';
|
||||
runData.telemetryMetadata = {
|
||||
source: 'instance_ai',
|
||||
mockDataSources,
|
||||
};
|
||||
|
||||
|
|
|
|||
|
|
@ -70,6 +70,10 @@ const removeExecutionFinishedListener = pushStore.addEventListener((event) => {
|
|||
if (event.type !== 'executionFinished') return;
|
||||
if (event.data.workflowId !== props.workflowId) return;
|
||||
if (event.data.status === 'success') return;
|
||||
// Only offer "Fix with AI" for human-initiated runs. When the agent ran the
|
||||
// workflow itself (source 'instance_ai'), it already sees the errors in its
|
||||
// tool result and fixes them on its own.
|
||||
if (event.data.source === 'instance_ai') return;
|
||||
|
||||
const execStore = useExecutionDataStore(createExecutionDataId(event.data.executionId));
|
||||
const runData = execStore.executionRunData;
|
||||
|
|
|
|||
|
|
@ -3124,7 +3124,6 @@ export type WorkflowExecutionMockDataSource =
|
|||
| 'workflow_pin_data';
|
||||
|
||||
export interface IWorkflowExecutionTelemetryMetadata {
|
||||
source: WorkflowExecutionSource;
|
||||
mockDataSources?: WorkflowExecutionMockDataSource[];
|
||||
}
|
||||
|
||||
|
|
@ -3151,6 +3150,13 @@ export interface IWorkflowExecutionDataProcess {
|
|||
userId?: string;
|
||||
projectId?: string;
|
||||
projectName?: string;
|
||||
/**
|
||||
* Who initiated this run. Unset means a regular user-initiated run;
|
||||
* `'instance_ai'` when the AI assistant ran the workflow on the user's
|
||||
* behalf. Consumed by the execution push (so the editor can tell agent runs
|
||||
* apart) and by telemetry.
|
||||
*/
|
||||
source?: WorkflowExecutionSource;
|
||||
telemetryMetadata?: IWorkflowExecutionTelemetryMetadata;
|
||||
dirtyNodeNames?: string[];
|
||||
triggerToStartFrom?: {
|
||||
|
|
|
|||
Loading…
Reference in New Issue
Block a user