mirror of
https://github.com/n8n-io/n8n.git
synced 2026-05-31 16:57:08 +02:00
feat(core): Add project.id to workflow.execute OTEL span (#30803)
This commit is contained in:
parent
6ce4b30bc4
commit
b80738bb18
|
|
@ -39,6 +39,7 @@ describe('ExecutionLevelTracer', () => {
|
|||
executionId: 'exec-1',
|
||||
tracingContext: inboundTracingContext,
|
||||
workflow: defaultWorkflow,
|
||||
project: { id: 'project-1' },
|
||||
});
|
||||
tracer.endWorkflow({
|
||||
executionId: 'exec-1',
|
||||
|
|
@ -55,11 +56,28 @@ describe('ExecutionLevelTracer', () => {
|
|||
expect(span.attributes['n8n.workflow.id']).toBe('wf-1');
|
||||
expect(span.attributes['n8n.workflow.name']).toBe('Test');
|
||||
expect(span.attributes['n8n.execution.id']).toBe('exec-1');
|
||||
expect(span.attributes['n8n.project.id']).toBe('project-1');
|
||||
expect(span.attributes['n8n.execution.mode']).toBe('manual');
|
||||
expect(span.attributes['n8n.execution.status']).toBe('success');
|
||||
expect(span.status.code).toBe(SpanStatusCode.OK);
|
||||
});
|
||||
|
||||
it('should omit project id attribute when project is not provided', () => {
|
||||
tracer.startWorkflow({
|
||||
executionId: 'exec-no-project',
|
||||
tracingContext: inboundTracingContext,
|
||||
workflow: defaultWorkflow,
|
||||
});
|
||||
tracer.endWorkflow({
|
||||
executionId: 'exec-no-project',
|
||||
status: 'success',
|
||||
mode: 'manual',
|
||||
isRetry: false,
|
||||
});
|
||||
|
||||
expect(otel.getFinishedSpans()[0].attributes['n8n.project.id']).toBeUndefined();
|
||||
});
|
||||
|
||||
it('should set error status on failed executions', () => {
|
||||
tracer.startWorkflow({
|
||||
executionId: 'exec-2',
|
||||
|
|
|
|||
|
|
@ -1,3 +1,4 @@
|
|||
import type { Logger } from '@n8n/backend-common';
|
||||
import type {
|
||||
NodeExecuteAfterContext,
|
||||
NodeExecuteBeforeContext,
|
||||
|
|
@ -7,9 +8,11 @@ import type {
|
|||
import { mock } from 'jest-mock-extended';
|
||||
import type { IRun, IRunExecutionData } from 'n8n-workflow';
|
||||
|
||||
import type { OwnershipService } from '@/services/ownership.service';
|
||||
|
||||
import type { ExecutionLevelTracer } from '../execution-level-tracer';
|
||||
import type { OtelConfig } from '../otel.config';
|
||||
import { OtelLifecycleHandler, countInputItems, countOutputItems } from '../otel-lifecycle-handler';
|
||||
import type { OtelConfig } from '../otel.config';
|
||||
import type { TracingContext, TraceContextService } from '../tracing-context';
|
||||
|
||||
const emptyExecutionData = {
|
||||
|
|
@ -22,6 +25,8 @@ describe('OtelLifecycleHandler', () => {
|
|||
const tracer = mock<ExecutionLevelTracer>();
|
||||
const traceContextService = mock<TraceContextService>();
|
||||
const config = mock<OtelConfig>();
|
||||
const ownershipService = mock<OwnershipService>();
|
||||
const logger = mock<Logger>();
|
||||
let handler: OtelLifecycleHandler;
|
||||
|
||||
const parentTracingContext: TracingContext = {
|
||||
|
|
@ -52,8 +57,42 @@ describe('OtelLifecycleHandler', () => {
|
|||
|
||||
beforeEach(() => {
|
||||
jest.clearAllMocks();
|
||||
handler = new OtelLifecycleHandler(tracer, traceContextService, config);
|
||||
handler = new OtelLifecycleHandler(
|
||||
tracer,
|
||||
traceContextService,
|
||||
config,
|
||||
ownershipService,
|
||||
logger,
|
||||
);
|
||||
tracer.startWorkflow.mockReturnValue(generatedSpanContext);
|
||||
ownershipService.getWorkflowProjectCached.mockResolvedValue({ id: 'proj-default' } as never);
|
||||
});
|
||||
|
||||
it('should look up project via OwnershipService and pass id to the tracer', async () => {
|
||||
traceContextService.get.mockResolvedValueOnce(undefined);
|
||||
ownershipService.getWorkflowProjectCached.mockResolvedValueOnce({ id: 'proj-1' } as never);
|
||||
|
||||
await handler.onWorkflowStart(baseCtx);
|
||||
|
||||
expect(ownershipService.getWorkflowProjectCached).toHaveBeenCalledWith('wf-1');
|
||||
expect(tracer.startWorkflow).toHaveBeenCalledWith(
|
||||
expect.objectContaining({ project: { id: 'proj-1' } }),
|
||||
);
|
||||
});
|
||||
|
||||
it('should start workflow span without project if project lookup fails', async () => {
|
||||
traceContextService.get.mockResolvedValueOnce(undefined);
|
||||
ownershipService.getWorkflowProjectCached.mockRejectedValueOnce(new Error('DB error'));
|
||||
|
||||
await expect(handler.onWorkflowStart(baseCtx)).resolves.not.toThrow();
|
||||
|
||||
expect(tracer.startWorkflow).toHaveBeenCalledWith(
|
||||
expect.objectContaining({ project: undefined }),
|
||||
);
|
||||
expect(logger.warn).toHaveBeenCalledWith(
|
||||
'Failed to fetch project for OTEL span',
|
||||
expect.objectContaining({ workflowId: 'wf-1', executionId: 'exec-sub' }),
|
||||
);
|
||||
});
|
||||
|
||||
it('should use own tracingContext when present (webhook case)', async () => {
|
||||
|
|
@ -122,6 +161,8 @@ describe('OtelLifecycleHandler', () => {
|
|||
const tracer = mock<ExecutionLevelTracer>();
|
||||
const traceContextService = mock<TraceContextService>();
|
||||
const config = mock<OtelConfig>();
|
||||
const ownershipService = mock<OwnershipService>();
|
||||
const logger = mock<Logger>();
|
||||
let handler: OtelLifecycleHandler;
|
||||
|
||||
const prePauseContext: TracingContext = {
|
||||
|
|
@ -133,8 +174,57 @@ describe('OtelLifecycleHandler', () => {
|
|||
|
||||
beforeEach(() => {
|
||||
jest.clearAllMocks();
|
||||
handler = new OtelLifecycleHandler(tracer, traceContextService, config);
|
||||
handler = new OtelLifecycleHandler(
|
||||
tracer,
|
||||
traceContextService,
|
||||
config,
|
||||
ownershipService,
|
||||
logger,
|
||||
);
|
||||
tracer.startWorkflow.mockReturnValue(resumedSpanContext);
|
||||
ownershipService.getWorkflowProjectCached.mockResolvedValue({ id: 'proj-default' } as never);
|
||||
});
|
||||
|
||||
it('should look up project via OwnershipService on resume', async () => {
|
||||
traceContextService.get.mockResolvedValueOnce(undefined);
|
||||
ownershipService.getWorkflowProjectCached.mockResolvedValueOnce({
|
||||
id: 'resume-proj',
|
||||
} as never);
|
||||
|
||||
await handler.onWorkflowResume({
|
||||
type: 'workflowExecuteResume',
|
||||
workflow: { id: 'wf-1', name: 'Test', versionId: 'v1', nodes: [], connections: {} },
|
||||
workflowInstance: undefined as never,
|
||||
executionData: undefined as never,
|
||||
executionId: 'exec-resume',
|
||||
} as never);
|
||||
|
||||
expect(ownershipService.getWorkflowProjectCached).toHaveBeenCalledWith('wf-1');
|
||||
expect(tracer.startWorkflow).toHaveBeenCalledWith(
|
||||
expect.objectContaining({ project: { id: 'resume-proj' } }),
|
||||
);
|
||||
});
|
||||
|
||||
it('should start workflow span without project if project lookup fails on resume', async () => {
|
||||
ownershipService.getWorkflowProjectCached.mockRejectedValueOnce(new Error('DB error'));
|
||||
|
||||
await expect(
|
||||
handler.onWorkflowResume({
|
||||
type: 'workflowExecuteResume',
|
||||
workflow: { id: 'wf-1', name: 'Test', versionId: 'v1', nodes: [], connections: {} },
|
||||
workflowInstance: undefined as never,
|
||||
executionData: undefined as never,
|
||||
executionId: 'exec-resume',
|
||||
} as never),
|
||||
).resolves.not.toThrow();
|
||||
|
||||
expect(tracer.startWorkflow).toHaveBeenCalledWith(
|
||||
expect.objectContaining({ project: undefined }),
|
||||
);
|
||||
expect(logger.warn).toHaveBeenCalledWith(
|
||||
'Failed to fetch project for OTEL span',
|
||||
expect.objectContaining({ workflowId: 'wf-1', executionId: 'exec-resume' }),
|
||||
);
|
||||
});
|
||||
|
||||
it('should start a new root span linked to the pre-wait origin and not overwrite the persisted origin', async () => {
|
||||
|
|
@ -168,11 +258,19 @@ describe('OtelLifecycleHandler', () => {
|
|||
const tracer = mock<ExecutionLevelTracer>();
|
||||
const traceContextService = mock<TraceContextService>();
|
||||
const config = mock<OtelConfig>();
|
||||
const ownershipService = mock<OwnershipService>();
|
||||
const logger = mock<Logger>();
|
||||
let handler: OtelLifecycleHandler;
|
||||
|
||||
beforeEach(() => {
|
||||
jest.clearAllMocks();
|
||||
handler = new OtelLifecycleHandler(tracer, traceContextService, config);
|
||||
handler = new OtelLifecycleHandler(
|
||||
tracer,
|
||||
traceContextService,
|
||||
config,
|
||||
ownershipService,
|
||||
logger,
|
||||
);
|
||||
});
|
||||
|
||||
const makeCtx = (
|
||||
|
|
@ -234,6 +332,8 @@ describe('OtelLifecycleHandler', () => {
|
|||
const tracer = mock<ExecutionLevelTracer>();
|
||||
const traceContextService = mock<TraceContextService>();
|
||||
const config = mock<OtelConfig>();
|
||||
const ownershipService = mock<OwnershipService>();
|
||||
const logger = mock<Logger>();
|
||||
let handler: OtelLifecycleHandler;
|
||||
|
||||
const node = { id: 'n1', name: 'Node1', type: 'test', typeVersion: 1 };
|
||||
|
|
@ -269,12 +369,24 @@ describe('OtelLifecycleHandler', () => {
|
|||
beforeEach(() => {
|
||||
jest.clearAllMocks();
|
||||
config.includeNodeSpans = true;
|
||||
handler = new OtelLifecycleHandler(tracer, traceContextService, config);
|
||||
handler = new OtelLifecycleHandler(
|
||||
tracer,
|
||||
traceContextService,
|
||||
config,
|
||||
ownershipService,
|
||||
logger,
|
||||
);
|
||||
});
|
||||
|
||||
it('should skip node spans when includeNodeSpans is false', () => {
|
||||
config.includeNodeSpans = false;
|
||||
handler = new OtelLifecycleHandler(tracer, traceContextService, config);
|
||||
handler = new OtelLifecycleHandler(
|
||||
tracer,
|
||||
traceContextService,
|
||||
config,
|
||||
ownershipService,
|
||||
logger,
|
||||
);
|
||||
|
||||
handler.onNodeStart(makeStartCtx());
|
||||
handler.onNodeEnd(makeEndCtx());
|
||||
|
|
|
|||
|
|
@ -1,4 +1,11 @@
|
|||
import { createTeamProject, createWorkflow, getPersonalProject } from '@n8n/backend-test-utils';
|
||||
import type { ExecutionRepository } from '@n8n/db';
|
||||
import { SpanStatusCode } from '@opentelemetry/api';
|
||||
import { NodeConnectionTypes } from 'n8n-workflow';
|
||||
import { v4 as uuid } from 'uuid';
|
||||
|
||||
import type { WorkflowRunner } from '@/workflow-runner';
|
||||
import { createUser } from '@test-integration/db/users';
|
||||
|
||||
import {
|
||||
initOtelTestEnvironment,
|
||||
|
|
@ -8,16 +15,11 @@ import {
|
|||
saveAndSetEnv,
|
||||
restoreEnv,
|
||||
} from './support/otel-integration-utils';
|
||||
import type { OtelTestProvider } from './support/otel-test-provider';
|
||||
import {
|
||||
createMultiNodeWorkflowFixture,
|
||||
createFailingWorkflowFixture,
|
||||
} from './support/otel-workflow-fixtures';
|
||||
import type { OtelTestProvider } from './support/otel-test-provider';
|
||||
import type { WorkflowRunner } from '@/workflow-runner';
|
||||
import type { ExecutionRepository } from '@n8n/db';
|
||||
import { createTeamProject, createWorkflow } from '@n8n/backend-test-utils';
|
||||
import { NodeConnectionTypes } from 'n8n-workflow';
|
||||
import { v4 as uuid } from 'uuid';
|
||||
|
||||
let otel: OtelTestProvider;
|
||||
let workflowRunner: WorkflowRunner;
|
||||
|
|
@ -60,9 +62,33 @@ describe('OTEL Workflow Tracing Integration', () => {
|
|||
expect(nodeSpans).toHaveLength(workflow.nodes.length);
|
||||
});
|
||||
|
||||
it('should emit n8n.project.id on workflow.execute for a team project', async () => {
|
||||
const project = await createTeamProject();
|
||||
const workflow = await createWorkflow(createMultiNodeWorkflowFixture(), project);
|
||||
const executionId = await executeWorkflow(workflowRunner, workflow, project.id);
|
||||
await waitForExecution(executionRepository, executionId);
|
||||
|
||||
const workflowSpan = otel.getFinishedSpans().find((s) => s.name === 'workflow.execute')!;
|
||||
expect(workflowSpan).toBeDefined();
|
||||
expect(workflowSpan.attributes['n8n.project.id']).toBe(project.id);
|
||||
});
|
||||
|
||||
it('should emit n8n.project.id on workflow.execute for a personal project', async () => {
|
||||
const owner = await createUser();
|
||||
const personalProject = await getPersonalProject(owner);
|
||||
const workflow = await createWorkflow(createMultiNodeWorkflowFixture(), personalProject);
|
||||
const executionId = await executeWorkflow(workflowRunner, workflow, personalProject.id);
|
||||
await waitForExecution(executionRepository, executionId);
|
||||
|
||||
const workflowSpan = otel.getFinishedSpans().find((s) => s.name === 'workflow.execute')!;
|
||||
expect(workflowSpan).toBeDefined();
|
||||
expect(workflowSpan.attributes['n8n.project.id']).toBe(personalProject.id);
|
||||
});
|
||||
|
||||
it('should persist tracingContext to the execution entity after root span creation', async () => {
|
||||
const workflow = await createWorkflow(createMultiNodeWorkflowFixture());
|
||||
const executionId = await executeWorkflow(workflowRunner, workflow, 'test-project');
|
||||
const project = await createTeamProject();
|
||||
const workflow = await createWorkflow(createMultiNodeWorkflowFixture(), project);
|
||||
const executionId = await executeWorkflow(workflowRunner, workflow, project.id);
|
||||
await waitForExecution(executionRepository, executionId);
|
||||
|
||||
const execution = await executionRepository.findOneBy({ id: executionId });
|
||||
|
|
@ -71,8 +97,9 @@ describe('OTEL Workflow Tracing Integration', () => {
|
|||
});
|
||||
|
||||
it('should set error status on failed executions', async () => {
|
||||
const workflow = await createWorkflow(createFailingWorkflowFixture());
|
||||
const executionId = await executeWorkflow(workflowRunner, workflow, 'test-project');
|
||||
const project = await createTeamProject();
|
||||
const workflow = await createWorkflow(createFailingWorkflowFixture(), project);
|
||||
const executionId = await executeWorkflow(workflowRunner, workflow, project.id);
|
||||
await waitForExecution(executionRepository, executionId);
|
||||
|
||||
const workflowSpan = otel.getFinishedSpans().find((s) => s.name === 'workflow.execute')!;
|
||||
|
|
@ -82,8 +109,9 @@ describe('OTEL Workflow Tracing Integration', () => {
|
|||
|
||||
it('should inherit traceId from inbound HTTP traceparent', async () => {
|
||||
const inboundTraceId = '9bf2bd87b5053953e3fa08d8d889494b';
|
||||
const workflow = await createWorkflow(createMultiNodeWorkflowFixture());
|
||||
const executionId = await executeWorkflow(workflowRunner, workflow, 'test-project', {
|
||||
const project = await createTeamProject();
|
||||
const workflow = await createWorkflow(createMultiNodeWorkflowFixture(), project);
|
||||
const executionId = await executeWorkflow(workflowRunner, workflow, project.id, {
|
||||
mode: 'webhook',
|
||||
tracingContext: {
|
||||
traceparent: `00-${inboundTraceId}-b7ad6b7169203331-01`,
|
||||
|
|
|
|||
|
|
@ -126,7 +126,7 @@ export async function executeWorkflow(
|
|||
return await workflowRunner.run(
|
||||
{
|
||||
workflowData: workflow,
|
||||
userId: projectId,
|
||||
projectId,
|
||||
executionMode: mode,
|
||||
executionData,
|
||||
retryOf,
|
||||
|
|
|
|||
|
|
@ -46,6 +46,7 @@ export class ExecutionLevelTracer {
|
|||
[ATTR.WORKFLOW_VERSION_ID]: params.workflow.versionId ?? '',
|
||||
[ATTR.WORKFLOW_NODE_COUNT]: params.workflow.nodeCount,
|
||||
[ATTR.EXECUTION_ID]: params.executionId,
|
||||
...(params.project?.id && { [ATTR.PROJECT_ID]: params.project.id }),
|
||||
},
|
||||
links,
|
||||
},
|
||||
|
|
|
|||
|
|
@ -2,6 +2,9 @@ import type { ExecutionStatus, WorkflowExecuteMode, INode } from 'n8n-workflow';
|
|||
|
||||
import type { TracingContext } from './tracing-context';
|
||||
|
||||
type ProjectContext = { id: string };
|
||||
type WorkflowContext = { id: string; name: string; versionId?: string; nodeCount: number };
|
||||
|
||||
export type StartWorkflowParams = {
|
||||
executionId: string;
|
||||
/** Parent context — incoming webhook traceparent or parent sub-workflow span. */
|
||||
|
|
@ -11,7 +14,8 @@ export type StartWorkflowParams = {
|
|||
* workflow is resumed after a pause.
|
||||
*/
|
||||
linkTo?: TracingContext;
|
||||
workflow: { id: string; name: string; versionId?: string; nodeCount: number };
|
||||
workflow: WorkflowContext;
|
||||
project?: ProjectContext;
|
||||
};
|
||||
|
||||
export type EndWorkflowParams = {
|
||||
|
|
|
|||
|
|
@ -1,3 +1,4 @@
|
|||
import { Logger } from '@n8n/backend-common';
|
||||
import { OnLifecycleEvent } from '@n8n/decorators';
|
||||
import type {
|
||||
WorkflowExecuteBeforeContext,
|
||||
|
|
@ -8,6 +9,8 @@ import type {
|
|||
} from '@n8n/decorators';
|
||||
import { Service } from '@n8n/di';
|
||||
|
||||
import { OwnershipService } from '@/services/ownership.service';
|
||||
|
||||
import { ExecutionLevelTracer } from './execution-level-tracer';
|
||||
import { OtelConfig } from './otel.config';
|
||||
import { TraceContextService } from './tracing-context';
|
||||
|
|
@ -18,6 +21,8 @@ export class OtelLifecycleHandler {
|
|||
private readonly tracer: ExecutionLevelTracer,
|
||||
private readonly traceContextService: TraceContextService,
|
||||
private readonly config: OtelConfig,
|
||||
private readonly ownershipService: OwnershipService,
|
||||
private readonly logger: Logger,
|
||||
) {}
|
||||
|
||||
@OnLifecycleEvent('workflowExecuteBefore')
|
||||
|
|
@ -29,9 +34,21 @@ export class OtelLifecycleHandler {
|
|||
: // This will return "null" if there is no traceparent header in the trigger node. (e.g. webhook)
|
||||
await this.traceContextService.get(ctx.executionId);
|
||||
|
||||
const project = await this.ownershipService
|
||||
.getWorkflowProjectCached(ctx.workflow.id)
|
||||
.catch((error: unknown) => {
|
||||
this.logger.warn('Failed to fetch project for OTEL span', {
|
||||
workflowId: ctx.workflow.id,
|
||||
executionId: ctx.executionId,
|
||||
error: error instanceof Error ? error.message : String(error),
|
||||
});
|
||||
return undefined;
|
||||
});
|
||||
|
||||
const spanContext = this.tracer.startWorkflow({
|
||||
executionId: ctx.executionId,
|
||||
tracingContext,
|
||||
project: project ? { id: project.id } : undefined,
|
||||
workflow: {
|
||||
id: ctx.workflow.id,
|
||||
name: ctx.workflow.name,
|
||||
|
|
@ -49,9 +66,21 @@ export class OtelLifecycleHandler {
|
|||
async onWorkflowResume(ctx: WorkflowExecuteResumeContext): Promise<void> {
|
||||
const previousWorkflowExecution = await this.traceContextService.get(ctx.executionId);
|
||||
|
||||
const project = await this.ownershipService
|
||||
.getWorkflowProjectCached(ctx.workflow.id)
|
||||
.catch((error: unknown) => {
|
||||
this.logger.warn('Failed to fetch project for OTEL span', {
|
||||
workflowId: ctx.workflow.id,
|
||||
executionId: ctx.executionId,
|
||||
error: error instanceof Error ? error.message : String(error),
|
||||
});
|
||||
return undefined;
|
||||
});
|
||||
|
||||
this.tracer.startWorkflow({
|
||||
executionId: ctx.executionId,
|
||||
linkTo: previousWorkflowExecution,
|
||||
project: project ? { id: project.id } : undefined,
|
||||
workflow: {
|
||||
id: ctx.workflow.id,
|
||||
name: ctx.workflow.name,
|
||||
|
|
|
|||
|
|
@ -7,6 +7,8 @@ export const ATTR = {
|
|||
INSTANCE_ID: 'n8n.instance.id',
|
||||
INSTANCE_ROLE: 'n8n.instance.role',
|
||||
|
||||
PROJECT_ID: 'n8n.project.id',
|
||||
|
||||
WORKFLOW_ID: 'n8n.workflow.id',
|
||||
WORKFLOW_VERSION_ID: 'n8n.workflow.version_id',
|
||||
WORKFLOW_NAME: 'n8n.workflow.name',
|
||||
|
|
|
|||
Loading…
Reference in New Issue
Block a user