From 7e2ebc820c0822b7efe3f0ce64190bddea577038 Mon Sep 17 00:00:00 2001 From: Danny Martini Date: Mon, 26 May 2025 17:39:05 +0200 Subject: [PATCH] chore(core): Parameterize sql queries and escape table names (#15186) --- .../workflow-statistics.repository.ts | 46 ++- .../workflow-statistics.service.test.ts | 331 +++++++++++------- 2 files changed, 221 insertions(+), 156 deletions(-) diff --git a/packages/@n8n/db/src/repositories/workflow-statistics.repository.ts b/packages/@n8n/db/src/repositories/workflow-statistics.repository.ts index 96f68e2d0ef..bf6a367ad9f 100644 --- a/packages/@n8n/db/src/repositories/workflow-statistics.repository.ts +++ b/packages/@n8n/db/src/repositories/workflow-statistics.repository.ts @@ -55,59 +55,57 @@ export class WorkflowStatisticsRepository extends Repository isRootExecution: boolean, ): Promise { const dbType = this.globalConfig.database.type; - const { tableName } = this.metadata; + const escapedTableName = this.manager.connection.driver.escape(this.metadata.tableName); + try { + const rootCountIncrement = isRootExecution ? 1 : 0; if (dbType === 'sqlite') { await this.query( - `INSERT INTO "${tableName}" ("count", "rootCount", "name", "workflowId", "latestEvent") - VALUES (1, ${isRootExecution ? '1' : '0'}, "${eventName}", "${workflowId}", CURRENT_TIMESTAMP) + `INSERT INTO ${escapedTableName} ("count", "rootCount", "name", "workflowId", "latestEvent") + VALUES (1, ?, ?, ?, CURRENT_TIMESTAMP) ON CONFLICT (workflowId, name) DO UPDATE SET count = count + 1, - rootCount = ${isRootExecution ? 'rootCount + 1' : 'rootCount'}, + rootCount = rootCount + ?, latestEvent = CURRENT_TIMESTAMP`, + [rootCountIncrement, eventName, workflowId, rootCountIncrement], ); + // SQLite does not offer a reliable way to know whether or not an insert or update happened. // We'll use a naive approach in this case. Query again after and it might cause us to miss the // first production execution sometimes due to concurrency, but it's the only way. const counter = await this.findOne({ select: ['count'], - where: { - name: eventName, - workflowId, - }, + where: { name: eventName, workflowId }, }); return (counter?.count ?? 0) > 1 ? 'update' : counter?.count === 1 ? 'insert' : 'failed'; } else if (dbType === 'postgresdb') { - const upsertRootCount = isRootExecution - ? `"${tableName}"."rootCount" + 1` - : `"${tableName}"."rootCount"`; const queryResult = (await this.query( - `INSERT INTO "${tableName}" ("count", "rootCount", "name", "workflowId", "latestEvent") - VALUES (1, ${isRootExecution ? '1' : '0'}, '${eventName}', '${workflowId}', CURRENT_TIMESTAMP) + `INSERT INTO ${escapedTableName} ("count", "rootCount", "name", "workflowId", "latestEvent") + VALUES (1, $1, $2, $3, CURRENT_TIMESTAMP) ON CONFLICT ("name", "workflowId") DO UPDATE SET - "count" = "${tableName}"."count" + 1, - "rootCount" = ${upsertRootCount}, + "count" = ${escapedTableName}."count" + 1, + "rootCount" = ${escapedTableName}."rootCount" + $4, "latestEvent" = CURRENT_TIMESTAMP RETURNING *;`, - )) as Array<{ - count: number; - }>; + [rootCountIncrement, eventName, workflowId, rootCountIncrement], + )) as Array<{ count: number }>; + return queryResult[0].count === 1 ? 'insert' : 'update'; } else { const queryResult = (await this.query( - `INSERT INTO \`${tableName}\` (count, rootCount, name, workflowId, latestEvent) - VALUES (1, ${isRootExecution ? '1' : '0'}, "${eventName}", "${workflowId}", NOW()) + `INSERT INTO ${escapedTableName} (count, rootCount, name, workflowId, latestEvent) + VALUES (1, ?, ?, ?, NOW()) ON DUPLICATE KEY UPDATE count = count + 1, - rootCount = ${isRootExecution ? 'rootCount + 1' : 'rootCount'}, + rootCount = rootCount + ?, latestEvent = NOW();`, - )) as { - affectedRows: number; - }; + [rootCountIncrement, eventName, workflowId, rootCountIncrement], + )) as { affectedRows: number }; + // MySQL returns 2 affected rows on update return queryResult.affectedRows === 1 ? 'insert' : 'update'; } diff --git a/packages/cli/src/services/__tests__/workflow-statistics.service.test.ts b/packages/cli/src/services/__tests__/workflow-statistics.service.test.ts index 601065c228a..abd75d9348f 100644 --- a/packages/cli/src/services/__tests__/workflow-statistics.service.test.ts +++ b/packages/cli/src/services/__tests__/workflow-statistics.service.test.ts @@ -1,7 +1,6 @@ import { GlobalConfig } from '@n8n/config'; -import type { Project } from '@n8n/db'; +import type { IWorkflowDb, Project, WorkflowEntity } from '@n8n/db'; import type { User } from '@n8n/db'; -import type { WorkflowStatistics } from '@n8n/db'; import { WorkflowStatisticsRepository } from '@n8n/db'; import { Container } from '@n8n/di'; import { @@ -12,7 +11,6 @@ import { } from '@n8n/typeorm'; import { mocked } from 'jest-mock'; import { mock } from 'jest-mock-extended'; -import type { IWorkflowBase } from 'n8n-workflow'; import { type ExecutionStatus, type INode, @@ -21,68 +19,49 @@ import { } from 'n8n-workflow'; import config from '@/config'; -import type { EventService } from '@/events/event.service'; +import { EventService } from '@/events/event.service'; import { OwnershipService } from '@/services/ownership.service'; import { UserService } from '@/services/user.service'; import { WorkflowStatisticsService } from '@/services/workflow-statistics.service'; import { mockInstance } from '@test/mocking'; +import { getPersonalProject } from '@test-integration/db/projects'; +import { createUser } from '@test-integration/db/users'; +import { createWorkflow } from '@test-integration/db/workflows'; + +import * as testDb from '../../../test/integration/shared/test-db'; describe('WorkflowStatisticsService', () => { - const fakeUser = mock({ id: 'abcde-fghij' }); - const fakeProject = mock({ id: '12345-67890', type: 'personal' }); - const fakeWorkflow = mock({ id: '1' }); - const ownershipService = mockInstance(OwnershipService); - const userService = mockInstance(UserService); - const globalConfig = Container.get(GlobalConfig); - const dbType = globalConfig.database.type; - - const entityManager = mock(); - const dataSource = mock({ - manager: entityManager, - getMetadata: () => - mock({ - tableName: 'workflow_statistics', - }), - }); - Object.assign(entityManager, { connection: dataSource }); - - globalConfig.diagnostics.enabled = true; - config.set('deployment.type', 'n8n-testing'); - mocked(ownershipService.getWorkflowProjectCached).mockResolvedValue(fakeProject); - mocked(ownershipService.getPersonalProjectOwnerCached).mockResolvedValue(fakeUser); - const updateSettingsMock = jest.spyOn(userService, 'updateSettings').mockImplementation(); - - const eventService = mock(); - const workflowStatisticsService = new WorkflowStatisticsService( - mock(), - new WorkflowStatisticsRepository(dataSource, globalConfig), - ownershipService, - userService, - eventService, - ); - - beforeEach(() => { - jest.clearAllMocks(); - }); - - const mockDBCall = (count = 1) => { - if (dbType === 'sqlite') { - entityManager.findOne.mockResolvedValueOnce(mock({ count })); - } else { - const result = dbType === 'postgresdb' ? [{ count }] : { affectedRows: count }; - entityManager.query.mockImplementationOnce(async (query) => - query.startsWith('INSERT INTO') ? result : null, - ); - } - }; - describe('workflowExecutionCompleted', () => { - const rootCountRegex = /"?rootCount"?\s*=\s*(?:"?\w+"?\.)?"?rootCount"?\s*\+\s*1/; + let workflowStatisticsService: WorkflowStatisticsService; + let workflowStatisticsRepository: WorkflowStatisticsRepository; + let userService: UserService; + let user: User; + let personalProject: Project; + let workflow: IWorkflowDb & WorkflowEntity; + + beforeAll(async () => { + await testDb.init(); + workflowStatisticsService = Container.get(WorkflowStatisticsService); + workflowStatisticsRepository = Container.get(WorkflowStatisticsRepository); + userService = Container.get(UserService); + user = await createUser(); + personalProject = await getPersonalProject(user); + workflow = await createWorkflow({}, user); + }); + + afterAll(async () => { + await testDb.terminate(); + }); + + beforeEach(async () => { + jest.restoreAllMocks(); + await testDb.truncate(['WorkflowStatistics']); + }); test.each(['cli', 'error', 'retry', 'trigger', 'webhook', 'evaluation'])( - 'should upsert with root executions for execution mode %s', + 'should upsert `count` and `rootCount` for execution mode %s', async (mode) => { - // Call the function with a production success result, ensure metrics hook gets called + // ARRANGE const runData: IRun = { finished: true, status: 'success', @@ -91,36 +70,58 @@ describe('WorkflowStatisticsService', () => { startedAt: new Date(), }; - await workflowStatisticsService.workflowExecutionCompleted(fakeWorkflow, runData); - expect(entityManager.query).toHaveBeenCalledWith( - expect.stringMatching(rootCountRegex), - undefined, - ); + // ACT + await workflowStatisticsService.workflowExecutionCompleted(workflow, runData); + await workflowStatisticsService.workflowExecutionCompleted(workflow, runData); + + // ASSERT + const statistics = await workflowStatisticsRepository.find(); + expect(statistics).toHaveLength(1); + expect(statistics[0]).toMatchObject({ + count: 2, + rootCount: 2, + latestEvent: expect.any(Date), + name: 'production_success', + workflowId: workflow.id, + }); }, ); test.each(['manual', 'integrated', 'internal'])( - 'should upsert without root executions for execution mode %s', + 'should upsert `count`, but not `rootCount` for execution mode %s', async (mode) => { + // ARRANGE const runData: IRun = { finished: true, + // use `success` to make sure it would upsert if it were not for the + // mode used status: 'success', data: { resultData: { runData: {} } }, mode, startedAt: new Date(), }; - await workflowStatisticsService.workflowExecutionCompleted(fakeWorkflow, runData); - expect(entityManager.query).toHaveBeenCalledWith( - expect.not.stringMatching(rootCountRegex), - undefined, - ); + // ACT + await workflowStatisticsService.workflowExecutionCompleted(workflow, runData); + await workflowStatisticsService.workflowExecutionCompleted(workflow, runData); + + // ASSERT + const statistics = await workflowStatisticsRepository.find(); + expect(statistics).toHaveLength(1); + expect(statistics[0]).toMatchObject({ + count: 2, + rootCount: 0, + latestEvent: expect.any(Date), + name: mode === 'manual' ? 'manual_success' : 'production_success', + workflowId: workflow.id, + }); }, ); test.each(['success', 'crashed', 'error'])( - 'should upsert with root executions for execution status %s', + 'should upsert `count` and `rootCount` for execution status %s', async (status) => { + // ARRANGE const runData: IRun = { finished: true, status, @@ -129,112 +130,176 @@ describe('WorkflowStatisticsService', () => { startedAt: new Date(), }; - await workflowStatisticsService.workflowExecutionCompleted(fakeWorkflow, runData); - expect(entityManager.query).toHaveBeenCalledWith( - expect.stringMatching(rootCountRegex), - undefined, - ); + // ACT + await workflowStatisticsService.workflowExecutionCompleted(workflow, runData); + await workflowStatisticsService.workflowExecutionCompleted(workflow, runData); + + // ASSERT + const statistics = await workflowStatisticsRepository.find(); + expect(statistics).toHaveLength(1); + expect(statistics[0]).toMatchObject({ + count: 2, + rootCount: 2, + latestEvent: expect.any(Date), + name: status === 'success' ? 'production_success' : 'production_error', + workflowId: workflow.id, + }); }, ); test.each(['canceled', 'new', 'running', 'unknown', 'waiting'])( - 'should upsert without root executions for execution status %s', + 'should upsert `count`, but not `rootCount` for execution status %s', async (status) => { + // ARRANGE const runData: IRun = { finished: true, status, data: { resultData: { runData: {} } }, + // use `trigger` to make sure it would upsert if it were not for the + // status used mode: 'trigger', startedAt: new Date(), }; - await workflowStatisticsService.workflowExecutionCompleted(fakeWorkflow, runData); - expect(entityManager.query).toHaveBeenCalledWith( - expect.not.stringMatching(rootCountRegex), - undefined, - ); + // ACT + await workflowStatisticsService.workflowExecutionCompleted(workflow, runData); + await workflowStatisticsService.workflowExecutionCompleted(workflow, runData); + + // ASSERT + const statistics = await workflowStatisticsRepository.find(); + expect(statistics).toHaveLength(1); + expect(statistics[0]).toMatchObject({ + count: 2, + rootCount: 0, + latestEvent: expect.any(Date), + name: 'production_error', + workflowId: workflow.id, + }); }, ); - test('should create metrics for production successes', async () => { - // Call the function with a production success result, ensure metrics hook gets called - const workflow = { - id: '1', - name: '', - active: false, - isArchived: false, - createdAt: new Date(), - updatedAt: new Date(), - nodes: [], - connections: {}, - }; + test('updates user settings and emit first-production-workflow-succeeded', async () => { + // ARRANGE const runData: IRun = { finished: true, status: 'success', data: { resultData: { runData: {} } }, - mode: 'internal' as WorkflowExecuteMode, + mode: 'internal', startedAt: new Date(), }; - mockDBCall(); + const emitSpy = jest.spyOn(Container.get(EventService), 'emit'); + const updateSettingsSpy = jest.spyOn(userService, 'updateSettings'); + // ACT await workflowStatisticsService.workflowExecutionCompleted(workflow, runData); - expect(updateSettingsMock).toHaveBeenCalledTimes(1); - expect(eventService.emit).toHaveBeenCalledWith('first-production-workflow-succeeded', { - projectId: fakeProject.id, + + // ASSERT + expect(updateSettingsSpy).toHaveBeenCalledTimes(1); + expect(updateSettingsSpy).toHaveBeenCalledWith(user.id, { + firstSuccessfulWorkflowId: workflow.id, + userActivated: true, + userActivatedAt: runData.startedAt.getTime(), + }); + expect(emitSpy).toHaveBeenCalledTimes(1); + expect(emitSpy).toHaveBeenCalledWith('first-production-workflow-succeeded', { + projectId: personalProject.id, workflowId: workflow.id, - userId: fakeUser.id, + userId: user.id, }); }); - test('should only create metrics for production successes', async () => { - // Call the function with a non production success result, ensure metrics hook is never called - const workflow = { - id: '1', - name: '', - active: false, - isArchived: false, - createdAt: new Date(), - updatedAt: new Date(), - nodes: [], - connections: {}, - }; + test('does not update user settings and does not emit first-production-workflow-succeeded for failing executions', async () => { + // ARRANGE const runData: IRun = { finished: false, status: 'error', data: { resultData: { runData: {} } }, - mode: 'internal' as WorkflowExecuteMode, + mode: 'internal', startedAt: new Date(), }; + const emitSpy = jest.spyOn(Container.get(EventService), 'emit'); + const updateSettingsSpy = jest.spyOn(userService, 'updateSettings'); + + // ACT await workflowStatisticsService.workflowExecutionCompleted(workflow, runData); - expect(eventService.emit).not.toHaveBeenCalled(); + + // ASSERT + expect(updateSettingsSpy).not.toHaveBeenCalled(); + expect(emitSpy).not.toHaveBeenCalled(); }); - test('should not send metrics for updated entries', async () => { - // Call the function with a fail insert, ensure update is called *and* metrics aren't sent - const workflow = { - id: '1', - name: '', - active: false, - isArchived: false, - createdAt: new Date(), - updatedAt: new Date(), - nodes: [], - connections: {}, - }; + test('does not update user settings and does not emit first-production-workflow-succeeded for successive executions', async () => { + // ARRANGE const runData: IRun = { finished: true, status: 'success', data: { resultData: { runData: {} } }, - mode: 'internal' as WorkflowExecuteMode, + mode: 'internal', startedAt: new Date(), }; - mockDBCall(2); await workflowStatisticsService.workflowExecutionCompleted(workflow, runData); - expect(eventService.emit).not.toHaveBeenCalled(); + const emitSpy = jest.spyOn(Container.get(EventService), 'emit'); + const updateSettingsSpy = jest.spyOn(Container.get(UserService), 'updateSettings'); + + // ACT + await workflowStatisticsService.workflowExecutionCompleted(workflow, runData); + + // ASSERT + expect(updateSettingsSpy).not.toHaveBeenCalled(); + expect(emitSpy).not.toHaveBeenCalled(); }); }); describe('nodeFetchedData', () => { + let workflowStatisticsService: WorkflowStatisticsService; + let eventService: EventService; + let user: User; + let project: Project; + let ownershipService: OwnershipService; + let entityManager: EntityManager; + let userService: UserService; + let workflowStatisticsRepository: WorkflowStatisticsRepository; + + beforeAll(() => { + user = mock({ id: 'abcde-fghij' }); + project = mock({ id: '12345-67890', type: 'personal' }); + ownershipService = mockInstance(OwnershipService); + userService = mockInstance(UserService); + const globalConfig = Container.get(GlobalConfig); + + entityManager = mock(); + const dataSource = mock({ + manager: entityManager, + getMetadata: () => + mock({ + tableName: 'workflow_statistics', + }), + driver: { escape: jest.fn((id) => id) }, + }); + Object.assign(entityManager, { connection: dataSource }); + eventService = mock(); + workflowStatisticsRepository = new WorkflowStatisticsRepository(dataSource, globalConfig); + workflowStatisticsService = new WorkflowStatisticsService( + mock(), + workflowStatisticsRepository, + ownershipService, + userService, + eventService, + ); + globalConfig.diagnostics.enabled = true; + config.set('deployment.type', 'n8n-testing'); + mocked(ownershipService.getWorkflowProjectCached).mockResolvedValue(project); + mocked(ownershipService.getPersonalProjectOwnerCached).mockResolvedValue(user); + }); + + afterAll(() => { + jest.resetAllMocks(); + }); + + beforeEach(() => { + jest.clearAllMocks(); + }); + test('should create metrics when the db is updated', async () => { // Call the function with a production success result, ensure metrics hook gets called const workflowId = '1'; @@ -248,8 +313,8 @@ describe('WorkflowStatisticsService', () => { }; await workflowStatisticsService.nodeFetchedData(workflowId, node); expect(eventService.emit).toHaveBeenCalledWith('first-workflow-data-loaded', { - userId: fakeUser.id, - project: fakeProject.id, + userId: user.id, + project: project.id, workflowId, nodeType: node.type, nodeId: node.id, @@ -258,14 +323,14 @@ describe('WorkflowStatisticsService', () => { test('should emit event with no `userId` if workflow is owned by team project', async () => { const workflowId = '123'; - ownershipService.getPersonalProjectOwnerCached.mockResolvedValueOnce(null); + mocked(ownershipService.getPersonalProjectOwnerCached).mockResolvedValueOnce(null); const node = mock({ id: '123', type: 'n8n-nodes-base.noOp', credentials: {} }); await workflowStatisticsService.nodeFetchedData(workflowId, node); expect(eventService.emit).toHaveBeenCalledWith('first-workflow-data-loaded', { userId: '', - project: fakeProject.id, + project: project.id, workflowId, nodeType: node.type, nodeId: node.id, @@ -291,8 +356,8 @@ describe('WorkflowStatisticsService', () => { }; await workflowStatisticsService.nodeFetchedData(workflowId, node); expect(eventService.emit).toHaveBeenCalledWith('first-workflow-data-loaded', { - userId: fakeUser.id, - project: fakeProject.id, + userId: user.id, + project: project.id, workflowId, nodeType: node.type, nodeId: node.id, @@ -303,7 +368,9 @@ describe('WorkflowStatisticsService', () => { test('should not send metrics for entries that already have the flag set', async () => { // Fetch data for workflow 2 which is set up to not be altered in the mocks - entityManager.insert.mockRejectedValueOnce(new QueryFailedError('', undefined, new Error())); + mocked(entityManager.insert).mockRejectedValueOnce( + new QueryFailedError('', undefined, new Error()), + ); const workflowId = '1'; const node = { id: 'abcde',