mirror of
https://github.com/n8n-io/n8n.git
synced 2026-06-02 01:37:07 +02:00
feat(core): Add execution source telemetry (no-changelog) (#31133)
This commit is contained in:
parent
94f463ad28
commit
4261116180
|
|
@ -1372,6 +1372,7 @@ describe('TelemetryEventRelay', () => {
|
|||
await flushPromises();
|
||||
|
||||
expect(telemetry.trackWorkflowExecution).toHaveBeenCalledWith({
|
||||
execution_source: 'user',
|
||||
is_manual: false,
|
||||
success: false,
|
||||
user_id: 'user123',
|
||||
|
|
@ -2048,6 +2049,41 @@ describe('TelemetryEventRelay', () => {
|
|||
);
|
||||
});
|
||||
|
||||
it('should add Instance AI execution metadata to workflow execution telemetry', async () => {
|
||||
const runData = {
|
||||
finished: true,
|
||||
status: 'success',
|
||||
mode: 'manual',
|
||||
data: { resultData: { runData: {} } },
|
||||
} as unknown as IRun;
|
||||
|
||||
const event: RelayEventMap['workflow-post-execute'] = {
|
||||
workflow: mockWorkflowBase,
|
||||
executionId: 'execution123',
|
||||
userId: 'user123',
|
||||
runData,
|
||||
telemetryMetadata: {
|
||||
source: 'instance_ai',
|
||||
mockDataSources: ['trigger_input', 'verification_pin_data'],
|
||||
},
|
||||
};
|
||||
|
||||
eventService.emit('workflow-post-execute', event);
|
||||
|
||||
await flushPromises();
|
||||
|
||||
const expectedProperties = expect.objectContaining({
|
||||
execution_source: 'instance_ai',
|
||||
mock_data_sources: 'trigger_input,verification_pin_data',
|
||||
});
|
||||
|
||||
expect(telemetry.track).toHaveBeenCalledWith(
|
||||
'Manual workflow exec finished',
|
||||
expectedProperties,
|
||||
);
|
||||
expect(telemetry.trackWorkflowExecution).toHaveBeenCalledWith(expectedProperties);
|
||||
});
|
||||
|
||||
it('should call telemetry.track when manual node execution finished', async () => {
|
||||
sharedWorkflowRepository.findSharingRole.mockResolvedValue('workflow:editor');
|
||||
credentialsRepository.findOneBy.mockResolvedValue(
|
||||
|
|
|
|||
|
|
@ -156,6 +156,7 @@ export type RelayEventMap = {
|
|||
runData?: IRun;
|
||||
projectId?: string;
|
||||
projectName?: string;
|
||||
telemetryMetadata?: IWorkflowExecutionDataProcess['telemetryMetadata'];
|
||||
};
|
||||
|
||||
'workflow-sharing-updated': {
|
||||
|
|
|
|||
|
|
@ -16,6 +16,16 @@ function hasUser(event: WorkflowExecutedEvent): event is WorkflowExecutedEventWi
|
|||
return event.user !== undefined;
|
||||
}
|
||||
|
||||
function withoutTelemetryMetadata(
|
||||
event: RelayEventMap['workflow-post-execute'],
|
||||
): Omit<RelayEventMap['workflow-post-execute'], 'telemetryMetadata'> {
|
||||
const eventWithoutTelemetryMetadata = { ...event };
|
||||
|
||||
delete eventWithoutTelemetryMetadata.telemetryMetadata;
|
||||
|
||||
return eventWithoutTelemetryMetadata;
|
||||
}
|
||||
|
||||
@Service()
|
||||
export class LogStreamingEventRelay extends EventRelay {
|
||||
constructor(
|
||||
|
|
@ -264,7 +274,8 @@ export class LogStreamingEventRelay extends EventRelay {
|
|||
}
|
||||
|
||||
private workflowPostExecute(event: RelayEventMap['workflow-post-execute']) {
|
||||
const { runData, workflow, executionId, projectId, projectName, ...rest } = event;
|
||||
const { runData, workflow, executionId, projectId, projectName, ...rest } =
|
||||
withoutTelemetryMetadata(event);
|
||||
|
||||
const payload = {
|
||||
...rest,
|
||||
|
|
|
|||
|
|
@ -49,6 +49,21 @@ function limitNodeGraphStringSize(nodeGraphString: string): string {
|
|||
return nodeGraphString;
|
||||
}
|
||||
|
||||
function getExecutionTelemetryProperties(
|
||||
telemetryMetadata: RelayEventMap['workflow-post-execute']['telemetryMetadata'],
|
||||
): ITelemetryTrackProperties {
|
||||
const executionSource = telemetryMetadata?.source ?? 'user';
|
||||
|
||||
if (executionSource !== 'instance_ai') return { execution_source: executionSource };
|
||||
|
||||
const { mockDataSources } = telemetryMetadata ?? {};
|
||||
|
||||
return {
|
||||
execution_source: executionSource,
|
||||
...(mockDataSources?.length ? { mock_data_sources: mockDataSources.join(',') } : {}),
|
||||
};
|
||||
}
|
||||
|
||||
@Service()
|
||||
export class TelemetryEventRelay extends EventRelay {
|
||||
constructor(
|
||||
|
|
@ -902,16 +917,20 @@ export class TelemetryEventRelay extends EventRelay {
|
|||
workflow,
|
||||
runData,
|
||||
userId,
|
||||
telemetryMetadata,
|
||||
}: RelayEventMap['workflow-post-execute']) {
|
||||
if (!workflow.id) {
|
||||
return;
|
||||
}
|
||||
|
||||
const executionTelemetryProperties = getExecutionTelemetryProperties(telemetryMetadata);
|
||||
|
||||
const telemetryProperties: IExecutionTrackProperties = {
|
||||
workflow_id: workflow.id,
|
||||
is_manual: false,
|
||||
version_cli: N8N_VERSION,
|
||||
success: false,
|
||||
...executionTelemetryProperties,
|
||||
used_dynamic_credentials: Object.values(runData?.data?.resultData?.runData ?? {}).some(
|
||||
(taskDataList) => taskDataList.some((taskData) => taskData.usedDynamicCredentials),
|
||||
),
|
||||
|
|
@ -1012,6 +1031,7 @@ export class TelemetryEventRelay extends EventRelay {
|
|||
eval_rows_left: null,
|
||||
meta: JSON.stringify(workflow.meta),
|
||||
used_dynamic_credentials: telemetryProperties.used_dynamic_credentials,
|
||||
...executionTelemetryProperties,
|
||||
...TelemetryHelpers.resolveAIMetrics(workflow.nodes, this.nodeTypes),
|
||||
...TelemetryHelpers.resolveVectorStoreMetrics(workflow.nodes, this.nodeTypes, runData),
|
||||
...TelemetryHelpers.extractLastExecutedNodeStructuredOutputErrorInfo(
|
||||
|
|
|
|||
|
|
@ -494,6 +494,27 @@ describe('Execution Lifecycle Hooks', () => {
|
|||
externalHooksTests();
|
||||
statisticsTests();
|
||||
|
||||
it('should include execution telemetry metadata in workflow-post-execute events', async () => {
|
||||
const telemetryMetadata = {
|
||||
source: 'instance_ai' as const,
|
||||
mockDataSources: ['trigger_input' as const],
|
||||
};
|
||||
const lifecycleHooks = getLifecycleHooksForRegularMain(
|
||||
{ executionMode: 'manual', workflowData, pushRef, retryOf, userId, telemetryMetadata },
|
||||
executionId,
|
||||
);
|
||||
|
||||
await lifecycleHooks.runHook('workflowExecuteAfter', [successfulRun, {}]);
|
||||
|
||||
expect(eventService.emit).toHaveBeenCalledWith('workflow-post-execute', {
|
||||
executionId,
|
||||
runData: successfulRun,
|
||||
workflow: workflowData,
|
||||
userId,
|
||||
telemetryMetadata,
|
||||
});
|
||||
});
|
||||
|
||||
it('should setup the correct set of hooks', () => {
|
||||
expect(lifecycleHooks).toBeInstanceOf(ExecutionLifecycleHooks);
|
||||
expect(lifecycleHooks.mode).toBe('manual');
|
||||
|
|
|
|||
|
|
@ -144,6 +144,7 @@ function hookFunctionsWorkflowEvents(
|
|||
userId?: string,
|
||||
projectId?: string,
|
||||
projectName?: string,
|
||||
telemetryMetadata?: IWorkflowExecutionDataProcess['telemetryMetadata'],
|
||||
) {
|
||||
const eventService = Container.get(EventService);
|
||||
hooks.addHandler('workflowExecuteBefore', function () {
|
||||
|
|
@ -192,6 +193,7 @@ function hookFunctionsWorkflowEvents(
|
|||
userId,
|
||||
projectId,
|
||||
projectName,
|
||||
...(telemetryMetadata ? { telemetryMetadata } : {}),
|
||||
});
|
||||
});
|
||||
}
|
||||
|
|
@ -755,7 +757,16 @@ export function getLifecycleHooksForScalingMain(
|
|||
data: IWorkflowExecutionDataProcess,
|
||||
executionId: string,
|
||||
): ExecutionLifecycleHooks {
|
||||
const { pushRef, retryOf, executionMode, workflowData, userId, projectId, projectName } = data;
|
||||
const {
|
||||
pushRef,
|
||||
retryOf,
|
||||
executionMode,
|
||||
workflowData,
|
||||
userId,
|
||||
projectId,
|
||||
projectName,
|
||||
telemetryMetadata,
|
||||
} = data;
|
||||
const hooks = new ExecutionLifecycleHooks(
|
||||
executionMode,
|
||||
executionId,
|
||||
|
|
@ -767,7 +778,7 @@ export function getLifecycleHooksForScalingMain(
|
|||
const executionRepository = Container.get(ExecutionRepository);
|
||||
const executionPersistence = Container.get(ExecutionPersistence);
|
||||
|
||||
hookFunctionsWorkflowEvents(hooks, userId, projectId, projectName);
|
||||
hookFunctionsWorkflowEvents(hooks, userId, projectId, projectName, telemetryMetadata);
|
||||
hookFunctionsSaveProgress(hooks, optionalParameters);
|
||||
hookFunctionsExternalHooks(hooks);
|
||||
hookFunctionsFinalizeExecutionStatus(hooks);
|
||||
|
|
@ -833,7 +844,16 @@ export function getLifecycleHooksForRegularMain(
|
|||
data: IWorkflowExecutionDataProcess,
|
||||
executionId: string,
|
||||
): ExecutionLifecycleHooks {
|
||||
const { pushRef, retryOf, executionMode, workflowData, userId, projectId, projectName } = data;
|
||||
const {
|
||||
pushRef,
|
||||
retryOf,
|
||||
executionMode,
|
||||
workflowData,
|
||||
userId,
|
||||
projectId,
|
||||
projectName,
|
||||
telemetryMetadata,
|
||||
} = data;
|
||||
const hooks = new ExecutionLifecycleHooks(
|
||||
executionMode,
|
||||
executionId,
|
||||
|
|
@ -842,7 +862,7 @@ export function getLifecycleHooksForRegularMain(
|
|||
);
|
||||
const saveSettings = toSaveSettings(workflowData.settings);
|
||||
const optionalParameters = { pushRef, retryOf: retryOf ?? undefined, saveSettings };
|
||||
hookFunctionsWorkflowEvents(hooks, userId, projectId, projectName);
|
||||
hookFunctionsWorkflowEvents(hooks, userId, projectId, projectName, telemetryMetadata);
|
||||
hookFunctionsNodeEvents(hooks);
|
||||
hookFunctionsFinalizeExecutionStatus(hooks);
|
||||
hookFunctionsSave(hooks, optionalParameters);
|
||||
|
|
|
|||
|
|
@ -22,6 +22,7 @@ import type {
|
|||
ExecutionSummary,
|
||||
IWorkflowExecutionDataProcess,
|
||||
IExecutionContext,
|
||||
WorkflowExecutionSource,
|
||||
} from 'n8n-workflow';
|
||||
import type PCancelable from 'p-cancelable';
|
||||
|
||||
|
|
@ -185,6 +186,8 @@ export interface IExecutionTrackProperties extends ITelemetryTrackProperties {
|
|||
is_manual: boolean;
|
||||
crashed?: boolean;
|
||||
used_dynamic_credentials?: boolean;
|
||||
execution_source?: WorkflowExecutionSource;
|
||||
mock_data_sources?: string;
|
||||
}
|
||||
|
||||
export interface IAgentExecutionTrackProperties extends ITelemetryTrackProperties {
|
||||
|
|
|
|||
|
|
@ -2568,6 +2568,42 @@ describe('createExecutionAdapter run()', () => {
|
|||
});
|
||||
});
|
||||
|
||||
it('attaches Instance AI execution telemetry metadata to workflow runs', async () => {
|
||||
const { adapter, mockWorkflowRunner } = createRunAdapterForTests({
|
||||
id: 'wf-1',
|
||||
nodes: [
|
||||
{
|
||||
id: 'node-1',
|
||||
name: 'Webhook',
|
||||
type: 'n8n-nodes-base.webhook',
|
||||
typeVersion: 2,
|
||||
parameters: {},
|
||||
position: [0, 0],
|
||||
},
|
||||
],
|
||||
pinData: {
|
||||
Existing: [{ json: { id: 'existing' } }],
|
||||
},
|
||||
});
|
||||
|
||||
await adapter.run(
|
||||
'wf-1',
|
||||
{ id: 'input' },
|
||||
{
|
||||
pinData: {
|
||||
Mocked: [{ id: 'mocked' }],
|
||||
},
|
||||
},
|
||||
);
|
||||
|
||||
const runData = mockWorkflowRunner.run.mock.calls[0][0];
|
||||
|
||||
expect(runData.telemetryMetadata).toEqual({
|
||||
source: 'instance_ai',
|
||||
mockDataSources: ['trigger_input', 'verification_pin_data', 'workflow_pin_data'],
|
||||
});
|
||||
});
|
||||
|
||||
it('tracks workflow id and success status when a builder execution finishes', async () => {
|
||||
const { adapter, mockTelemetry } = createRunAdapterForTests(
|
||||
{
|
||||
|
|
|
|||
|
|
@ -85,6 +85,7 @@ import {
|
|||
type DataTableRow,
|
||||
type DataTableRows,
|
||||
type WorkflowExecuteMode,
|
||||
type WorkflowExecutionMockDataSource,
|
||||
type ExecutionError,
|
||||
NodeHelpers,
|
||||
Workflow,
|
||||
|
|
@ -912,6 +913,19 @@ export class InstanceAiAdapterService {
|
|||
? (sdkPinDataToRuntime(options.pinData) ?? {})
|
||||
: {};
|
||||
const basePinData = { ...workflowPinData, ...overridePinData };
|
||||
const mockDataSources: WorkflowExecutionMockDataSource[] = [];
|
||||
|
||||
if (inputData && triggerNode) {
|
||||
mockDataSources.push('trigger_input');
|
||||
}
|
||||
|
||||
if (Object.keys(overridePinData).length > 0) {
|
||||
mockDataSources.push('verification_pin_data');
|
||||
}
|
||||
|
||||
if (Object.keys(workflowPinData).length > 0) {
|
||||
mockDataSources.push('workflow_pin_data');
|
||||
}
|
||||
|
||||
if (inputData && triggerNode) {
|
||||
const triggerPinData = getPinDataForTrigger(triggerNode, inputData);
|
||||
|
|
@ -948,6 +962,11 @@ export class InstanceAiAdapterService {
|
|||
runData.pinData = basePinData;
|
||||
}
|
||||
|
||||
runData.telemetryMetadata = {
|
||||
source: 'instance_ai',
|
||||
mockDataSources,
|
||||
};
|
||||
|
||||
const trackBuilderExecutedWorkflow = (status: ExecutionResult['status']) => {
|
||||
if (!threadId) return;
|
||||
|
||||
|
|
|
|||
|
|
@ -116,6 +116,48 @@ describe('Telemetry', () => {
|
|||
expect(execBuffer['1'].prod_error?.first).toEqual(execTime4);
|
||||
});
|
||||
|
||||
test('should count Instance AI source buckets alongside existing mode buckets', async () => {
|
||||
const payload: Parameters<Telemetry['trackWorkflowExecution']>[0] = {
|
||||
workflow_id: '1',
|
||||
is_manual: true,
|
||||
success: true,
|
||||
error_node_type: 'custom-nodes-base.node-type',
|
||||
execution_source: 'user',
|
||||
};
|
||||
|
||||
const userManualExecTime = fakeJestSystemTime('2022-01-01 12:00:00');
|
||||
telemetry.trackWorkflowExecution(payload);
|
||||
|
||||
payload.execution_source = 'instance_ai';
|
||||
payload.mock_data_sources = 'trigger_input';
|
||||
|
||||
const instanceAiMockManualExecTime = fakeJestSystemTime('2022-01-01 13:00:00');
|
||||
telemetry.trackWorkflowExecution(payload);
|
||||
|
||||
payload.is_manual = false;
|
||||
delete payload.mock_data_sources;
|
||||
|
||||
const instanceAiRealProdExecTime = fakeJestSystemTime('2022-01-01 14:00:00');
|
||||
telemetry.trackWorkflowExecution(payload);
|
||||
|
||||
const execBuffer = telemetry.getCountsBuffer();
|
||||
|
||||
expect(execBuffer['1'].manual_success?.count).toBe(2);
|
||||
expect(execBuffer['1'].manual_success?.first).toEqual(userManualExecTime);
|
||||
expect(execBuffer['1'].prod_success?.count).toBe(1);
|
||||
expect(execBuffer['1'].prod_success?.first).toEqual(instanceAiRealProdExecTime);
|
||||
|
||||
expect(execBuffer['1']).not.toHaveProperty('user_manual_success');
|
||||
expect(execBuffer['1'].instance_ai_mock_manual_success?.count).toBe(1);
|
||||
expect(execBuffer['1'].instance_ai_mock_manual_success?.first).toEqual(
|
||||
instanceAiMockManualExecTime,
|
||||
);
|
||||
expect(execBuffer['1'].instance_ai_real_prod_success?.count).toBe(1);
|
||||
expect(execBuffer['1'].instance_ai_real_prod_success?.first).toEqual(
|
||||
instanceAiRealProdExecTime,
|
||||
);
|
||||
});
|
||||
|
||||
test('should fire "Workflow execution errored" event for failed executions', async () => {
|
||||
const payload = {
|
||||
workflow_id: '1',
|
||||
|
|
|
|||
|
|
@ -26,23 +26,20 @@ type ExecutionTrackDataKey =
|
|||
| 'prod_error'
|
||||
| 'prod_success'
|
||||
| 'manual_crashed'
|
||||
| 'prod_crashed';
|
||||
| 'prod_crashed'
|
||||
| `${'instance_ai'}_${'mock' | 'real'}_${'manual' | 'prod'}_${'error' | 'success' | 'crashed'}`;
|
||||
|
||||
interface IExecutionTrackData {
|
||||
count: number;
|
||||
first: Date;
|
||||
}
|
||||
|
||||
type IExecutionsBufferEntry = Partial<Record<ExecutionTrackDataKey, IExecutionTrackData>> & {
|
||||
user_id: string | undefined;
|
||||
};
|
||||
|
||||
interface IExecutionsBuffer {
|
||||
[workflowId: string]: {
|
||||
manual_error?: IExecutionTrackData;
|
||||
manual_success?: IExecutionTrackData;
|
||||
prod_error?: IExecutionTrackData;
|
||||
prod_success?: IExecutionTrackData;
|
||||
manual_crashed?: IExecutionTrackData;
|
||||
prod_crashed?: IExecutionTrackData;
|
||||
user_id: string | undefined;
|
||||
};
|
||||
[workflowId: string]: IExecutionsBufferEntry;
|
||||
}
|
||||
|
||||
interface IApiInvocationProperties {
|
||||
|
|
@ -294,15 +291,19 @@ export class Telemetry {
|
|||
}`;
|
||||
}
|
||||
|
||||
const executionTrackDataKey = this.executionCountsBuffer[workflowId][key];
|
||||
this.addExecutionTrackData(workflowId, key, execTime);
|
||||
|
||||
if (!executionTrackDataKey) {
|
||||
this.executionCountsBuffer[workflowId][key] = {
|
||||
count: 1,
|
||||
first: execTime,
|
||||
};
|
||||
} else {
|
||||
executionTrackDataKey.count++;
|
||||
const executionStatus = properties.crashed
|
||||
? 'crashed'
|
||||
: properties.success
|
||||
? 'success'
|
||||
: 'error';
|
||||
const executionMode = properties.is_manual ? 'manual' : 'prod';
|
||||
|
||||
if (properties.execution_source === 'instance_ai') {
|
||||
const instanceAiDataType = properties.mock_data_sources ? 'mock' : 'real';
|
||||
const sourceKey: ExecutionTrackDataKey = `instance_ai_${instanceAiDataType}_${executionMode}_${executionStatus}`;
|
||||
this.addExecutionTrackData(workflowId, sourceKey, execTime);
|
||||
}
|
||||
|
||||
if (properties.used_dynamic_credentials) {
|
||||
|
|
@ -319,6 +320,19 @@ export class Telemetry {
|
|||
}
|
||||
}
|
||||
|
||||
private addExecutionTrackData(workflowId: string, key: ExecutionTrackDataKey, execTime: Date) {
|
||||
const executionTrackData = this.executionCountsBuffer[workflowId][key];
|
||||
|
||||
if (!executionTrackData) {
|
||||
this.executionCountsBuffer[workflowId][key] = {
|
||||
count: 1,
|
||||
first: execTime,
|
||||
};
|
||||
} else {
|
||||
executionTrackData.count++;
|
||||
}
|
||||
}
|
||||
|
||||
trackAgentExecution(properties: IAgentExecutionTrackProperties) {
|
||||
if (!this.rudderStack) return;
|
||||
|
||||
|
|
|
|||
|
|
@ -3091,6 +3091,18 @@ export interface IDestinationNode {
|
|||
mode: 'inclusive' | 'exclusive';
|
||||
}
|
||||
|
||||
export type WorkflowExecutionSource = 'user' | 'instance_ai';
|
||||
|
||||
export type WorkflowExecutionMockDataSource =
|
||||
| 'trigger_input'
|
||||
| 'verification_pin_data'
|
||||
| 'workflow_pin_data';
|
||||
|
||||
export interface IWorkflowExecutionTelemetryMetadata {
|
||||
source: WorkflowExecutionSource;
|
||||
mockDataSources?: WorkflowExecutionMockDataSource[];
|
||||
}
|
||||
|
||||
export interface IWorkflowExecutionDataProcess {
|
||||
destinationNode?: IDestinationNode;
|
||||
restartExecutionId?: string;
|
||||
|
|
@ -3114,6 +3126,7 @@ export interface IWorkflowExecutionDataProcess {
|
|||
userId?: string;
|
||||
projectId?: string;
|
||||
projectName?: string;
|
||||
telemetryMetadata?: IWorkflowExecutionTelemetryMetadata;
|
||||
dirtyNodeNames?: string[];
|
||||
triggerToStartFrom?: {
|
||||
name: string;
|
||||
|
|
|
|||
Loading…
Reference in New Issue
Block a user