diff --git a/packages/cli/src/modules/insights/__tests__/insights-collection.service.test.ts b/packages/cli/src/modules/insights/__tests__/insights-collection.service.test.ts index a6afa1403b2..22a88491d18 100644 --- a/packages/cli/src/modules/insights/__tests__/insights-collection.service.test.ts +++ b/packages/cli/src/modules/insights/__tests__/insights-collection.service.test.ts @@ -3,7 +3,7 @@ import type { WorkflowEntity } from '@n8n/db'; import type { IWorkflowDb } from '@n8n/db'; import type { WorkflowExecuteAfterContext } from '@n8n/decorators'; import { Container } from '@n8n/di'; -import { In, type EntityManager } from '@n8n/typeorm'; +import { In } from '@n8n/typeorm'; import { mock } from 'jest-mock-extended'; import { DateTime } from 'luxon'; import { @@ -252,10 +252,17 @@ describe('workflowExecuteAfterHandler', () => { describe('workflowExecuteAfterHandler - cacheMetadata', () => { let insightsCollectionService: InsightsCollectionService; - let entityManagerMock = mock(); - const sharedWorkflowRepositoryMock: jest.Mocked = { - manager: entityManagerMock, - } as unknown as jest.Mocked; + + // Mock the repositories functions + const repositoryMocks = { + find: jest.fn(), + findBy: jest.fn(), + upsert: jest.fn(), + insert: jest.fn(), + }; + const sharedWorkflowRepositoryMock = mock(repositoryMocks); + const metadataRepositoryMock = mock(repositoryMocks); + const insightsRawRepositoryMock = mock(repositoryMocks); const startedAt = DateTime.utc(); const stoppedAt = startedAt.plus({ seconds: 5 }); @@ -266,23 +273,11 @@ describe('workflowExecuteAfterHandler - cacheMetadata', () => { stoppedAt: stoppedAt.toJSDate(), }); - // Mock the transaction function - const trxMock = { - find: jest.fn(), - findBy: jest.fn(), - upsert: jest.fn(), - insert: jest.fn(), - }; - - entityManagerMock.transaction.mockImplementation( - jest.fn(async (runInTransaction: (entityManager: EntityManager) => Promise) => { - await runInTransaction(trxMock as unknown as EntityManager); - }) as unknown as EntityManager['transaction'], - ); - beforeAll(async () => { insightsCollectionService = new InsightsCollectionService( sharedWorkflowRepositoryMock, + insightsRawRepositoryMock, + metadataRepositoryMock, Container.get(InsightsConfig), mockLogger(), ); @@ -295,7 +290,7 @@ describe('workflowExecuteAfterHandler - cacheMetadata', () => { project = await createTeamProject(); workflow = await createWorkflow({}, project); - trxMock.find = jest.fn().mockResolvedValue([ + repositoryMocks.find = jest.fn().mockResolvedValue([ { workflow, workflowId: workflow.id, @@ -303,7 +298,7 @@ describe('workflowExecuteAfterHandler - cacheMetadata', () => { project: { name: 'project-name' }, }, ]); - trxMock.findBy = jest.fn().mockResolvedValue([ + repositoryMocks.findBy = jest.fn().mockResolvedValue([ { metaId: 'meta-id', workflowId: workflow.id, @@ -326,12 +321,11 @@ describe('workflowExecuteAfterHandler - cacheMetadata', () => { await insightsCollectionService.flushEvents(); // ASSERT - expect(trxMock.find).toHaveBeenCalledWith(expect.anything(), { + expect(repositoryMocks.find).toHaveBeenCalledWith({ where: { workflowId: In([workflow.id]), role: 'workflow:owner' }, relations: { project: true }, }); - expect(trxMock.upsert).toHaveBeenCalledWith( - expect.anything(), + expect(repositoryMocks.upsert).toHaveBeenCalledWith( expect.arrayContaining([ { workflowId: workflow.id, @@ -348,10 +342,10 @@ describe('workflowExecuteAfterHandler - cacheMetadata', () => { await insightsCollectionService.flushEvents(); // ASSERT AGAIN - trxMock.find.mockClear(); - trxMock.upsert.mockClear(); - expect(trxMock.find).not.toHaveBeenCalled(); - expect(trxMock.upsert).not.toHaveBeenCalled(); + repositoryMocks.find.mockClear(); + repositoryMocks.upsert.mockClear(); + expect(repositoryMocks.find).not.toHaveBeenCalled(); + expect(repositoryMocks.upsert).not.toHaveBeenCalled(); }); test('updates cached metadata if workflow details change', async () => { @@ -363,8 +357,8 @@ describe('workflowExecuteAfterHandler - cacheMetadata', () => { await insightsCollectionService.flushEvents(); // ASSERT - expect(trxMock.find).toHaveBeenCalled(); - expect(trxMock.upsert).toHaveBeenCalled(); + expect(repositoryMocks.find).toHaveBeenCalled(); + expect(repositoryMocks.upsert).toHaveBeenCalled(); // Change the workflow name workflow.name = 'new-workflow-name'; @@ -374,12 +368,11 @@ describe('workflowExecuteAfterHandler - cacheMetadata', () => { await insightsCollectionService.flushEvents(); // ASSERT AGAIN - expect(trxMock.find).toHaveBeenCalledWith(expect.anything(), { + expect(repositoryMocks.find).toHaveBeenCalledWith({ where: { workflowId: In([workflow.id]), role: 'workflow:owner' }, relations: { project: true }, }); - expect(trxMock.upsert).toHaveBeenCalledWith( - expect.anything(), + expect(repositoryMocks.upsert).toHaveBeenCalledWith( expect.arrayContaining([ { workflowId: workflow.id, @@ -397,11 +390,23 @@ describe('workflowExecuteAfterHandler - flushEvents', () => { let project: Project; let workflow: IWorkflowDb & WorkflowEntity; let insightsCollectionService: InsightsCollectionService; - let entityManagerMock = mock(); - const sharedWorkflowRepositoryMock: jest.Mocked = { - manager: entityManagerMock, - } as unknown as jest.Mocked; - const logger = mockLogger(); + + const repoMocks = { + findSharedWorkflowRepositoryMock: jest.fn(), + findByMetadata: jest.fn(), + upsertMetadata: jest.fn(), + insertInsightsRaw: jest.fn(), + }; + const sharedWorkflowRepositoryMock = mock({ + find: repoMocks.findSharedWorkflowRepositoryMock, + }); + const metadataRepositoryMock = mock({ + findBy: repoMocks.findByMetadata, + upsert: repoMocks.upsertMetadata, + }); + const insightsRawRepositoryMock = mock({ + insert: repoMocks.insertInsightsRaw, + }); const startedAt = DateTime.utc(); const stoppedAt = startedAt.plus({ seconds: 5 }); const runData = mock({ @@ -411,32 +416,20 @@ describe('workflowExecuteAfterHandler - flushEvents', () => { stoppedAt: stoppedAt.toJSDate(), }); - // Mock the transaction function - const trxMock = { - find: jest.fn(), - findBy: jest.fn(), - upsert: jest.fn(), - insert: jest.fn(), - }; - - entityManagerMock.transaction.mockImplementation( - jest.fn(async (runInTransaction: (entityManager: EntityManager) => Promise) => { - await runInTransaction(trxMock as unknown as EntityManager); - }) as unknown as EntityManager['transaction'], - ); - beforeAll(async () => { insightsCollectionService = new InsightsCollectionService( sharedWorkflowRepositoryMock, + insightsRawRepositoryMock, + metadataRepositoryMock, Container.get(InsightsConfig), - logger, + mockLogger(), ); }); beforeEach(async () => { project = await createTeamProject(); workflow = await createWorkflow({ settings: { timeSavedPerExecution: 1 } }, project); - trxMock.find = jest.fn().mockResolvedValue([ + repoMocks.findSharedWorkflowRepositoryMock.mockResolvedValue([ { workflow, workflowId: workflow.id, @@ -444,7 +437,7 @@ describe('workflowExecuteAfterHandler - flushEvents', () => { project: { name: 'project-name' }, }, ]); - trxMock.findBy = jest.fn().mockResolvedValue([ + repoMocks.findByMetadata.mockResolvedValue([ { metaId: 'meta-id', workflowId: workflow.id, @@ -469,7 +462,7 @@ describe('workflowExecuteAfterHandler - flushEvents', () => { await new Promise(process.nextTick); // ASSERT - expect(trxMock.insert).not.toHaveBeenCalled(); + expect(repoMocks.insertInsightsRaw).not.toHaveBeenCalled(); // ACT await insightsCollectionService.handleWorkflowExecuteAfter(ctx); @@ -477,13 +470,13 @@ describe('workflowExecuteAfterHandler - flushEvents', () => { // ASSERT // await for the next tick to ensure the flush is called await new Promise(process.nextTick); - expect(trxMock.insert).toHaveBeenCalled(); + expect(repoMocks.insertInsightsRaw).toHaveBeenCalled(); }); test('flushes events to the database after a timeout', async () => { // ARRANGE jest.useFakeTimers(); - trxMock.insert.mockClear(); + repoMocks.insertInsightsRaw.mockClear(); insightsCollectionService.startFlushingTimer(); const ctx = mock({ workflow, runData }); @@ -493,13 +486,13 @@ describe('workflowExecuteAfterHandler - flushEvents', () => { await insightsCollectionService.handleWorkflowExecuteAfter(ctx); } // ASSERT - expect(trxMock.insert).not.toHaveBeenCalled(); + expect(repoMocks.insertInsightsRaw).not.toHaveBeenCalled(); // ACT await jest.advanceTimersByTimeAsync(31 * 1000); // ASSERT - expect(trxMock.insert).toHaveBeenCalledTimes(1); + expect(repoMocks.insertInsightsRaw).toHaveBeenCalledTimes(1); } finally { jest.useRealTimers(); } @@ -508,7 +501,7 @@ describe('workflowExecuteAfterHandler - flushEvents', () => { test('reschedule flush on flushing end', async () => { // ARRANGE jest.useFakeTimers(); - trxMock.insert.mockClear(); + repoMocks.insertInsightsRaw.mockClear(); insightsCollectionService.startFlushingTimer(); const ctx = mock({ workflow }); @@ -518,13 +511,13 @@ describe('workflowExecuteAfterHandler - flushEvents', () => { await jest.advanceTimersByTimeAsync(31 * 1000); // ASSERT - expect(trxMock.insert).toHaveBeenCalledTimes(1); + expect(repoMocks.insertInsightsRaw).toHaveBeenCalledTimes(1); // // ACT await insightsCollectionService.handleWorkflowExecuteAfter(ctx); await jest.advanceTimersByTimeAsync(31 * 1000); - expect(trxMock.insert).toHaveBeenCalledTimes(2); + expect(repoMocks.insertInsightsRaw).toHaveBeenCalledTimes(2); } finally { jest.useRealTimers(); } @@ -533,7 +526,7 @@ describe('workflowExecuteAfterHandler - flushEvents', () => { test('reschedule flush on no buffered insights', async () => { // ARRANGE jest.useFakeTimers(); - trxMock.insert.mockClear(); + repoMocks.insertInsightsRaw.mockClear(); insightsCollectionService.startFlushingTimer(); const flushEventsSpy = jest.spyOn(insightsCollectionService, 'flushEvents'); @@ -543,7 +536,7 @@ describe('workflowExecuteAfterHandler - flushEvents', () => { // ASSERT expect(flushEventsSpy).toHaveBeenCalledTimes(1); - expect(trxMock.insert).not.toHaveBeenCalled(); + expect(repoMocks.insertInsightsRaw).not.toHaveBeenCalled(); // ACT await jest.advanceTimersByTimeAsync(31 * 1000); @@ -555,7 +548,7 @@ describe('workflowExecuteAfterHandler - flushEvents', () => { test('flushes events to the database on shutdown', async () => { // ARRANGE - trxMock.insert.mockClear(); + repoMocks.insertInsightsRaw.mockClear(); const ctx = mock({ workflow, runData }); // ACT @@ -566,17 +559,17 @@ describe('workflowExecuteAfterHandler - flushEvents', () => { await insightsCollectionService.shutdown(); // ASSERT - expect(trxMock.insert).toHaveBeenCalledTimes(1); + expect(repoMocks.insertInsightsRaw).toHaveBeenCalledTimes(1); // Check that last insert call contains 30 events (10 * 3 insights) - const lastCallArgs = trxMock.insert.mock.calls.at(-1); - expect(lastCallArgs?.[1]).toHaveLength(30); + const lastCallArgs = repoMocks.insertInsightsRaw.mock.calls.at(-1); + expect(lastCallArgs?.[0]).toHaveLength(30); }); test('flushes events synchronously while shutting down', async () => { // ARRANGE // reset insights async flushing insightsCollectionService.startFlushingTimer(); - trxMock.insert.mockClear(); + repoMocks.insertInsightsRaw.mockClear(); const ctx = mock({ workflow, runData }); // ACT @@ -589,25 +582,25 @@ describe('workflowExecuteAfterHandler - flushEvents', () => { await insightsCollectionService.handleWorkflowExecuteAfter(ctx); // ASSERT - expect(trxMock.insert).toHaveBeenCalledTimes(2); + expect(repoMocks.insertInsightsRaw).toHaveBeenCalledTimes(2); // Check that last insert call contains 3 events (the synchronous flush after shutdown) - let callArgs = trxMock.insert.mock.calls.at(-1); - expect(callArgs?.[1]).toHaveLength(3); + let callArgs = repoMocks.insertInsightsRaw.mock.calls.at(-1); + expect(callArgs?.[0]).toHaveLength(3); // ACT // await for the next tick to ensure the flush is called await new Promise(process.nextTick); // Check that the one before that contains 30 events (the shutdown flush) - callArgs = trxMock.insert.mock.calls.at(-2); - expect(callArgs?.[1]).toHaveLength(30); + callArgs = repoMocks.insertInsightsRaw.mock.calls.at(-2); + expect(callArgs?.[0]).toHaveLength(30); }); test('restore buffer events on flushing error', async () => { // ARRANGE jest.useFakeTimers(); - trxMock.insert.mockClear(); - trxMock.insert.mockRejectedValueOnce(new Error('Test error')); + repoMocks.insertInsightsRaw.mockClear(); + repoMocks.insertInsightsRaw.mockRejectedValueOnce(new Error('Test error')); insightsCollectionService.startFlushingTimer(); const ctx = mock({ workflow, runData }); @@ -617,17 +610,17 @@ describe('workflowExecuteAfterHandler - flushEvents', () => { await jest.advanceTimersByTimeAsync(31 * 1000); // ASSERT - expect(trxMock.insert).toHaveBeenCalledTimes(1); - const insertArgs = trxMock.insert.mock.calls.at(-1); + expect(repoMocks.insertInsightsRaw).toHaveBeenCalledTimes(1); + const insertArgs = repoMocks.insertInsightsRaw.mock.calls.at(-1); // ACT await insightsCollectionService.flushEvents(); - expect(trxMock.insert).toHaveBeenCalledTimes(2); - const newInsertArgs = trxMock.insert.mock.calls.at(-1); + expect(repoMocks.insertInsightsRaw).toHaveBeenCalledTimes(2); + const newInsertArgs = repoMocks.insertInsightsRaw.mock.calls.at(-1); // Check that last insert call contains the same 3 insights as previous failed flush - expect(newInsertArgs?.[1]).toHaveLength(3); - expect(newInsertArgs?.[1]).toEqual(insertArgs?.[1]); + expect(newInsertArgs?.[0]).toHaveLength(3); + expect(newInsertArgs?.[0]).toEqual(insertArgs?.[0]); } finally { jest.useRealTimers(); } @@ -638,7 +631,7 @@ describe('workflowExecuteAfterHandler - flushEvents', () => { const config = Container.get(InsightsConfig); config.flushBatchSize = 10; insightsCollectionService.startFlushingTimer(); - trxMock.insert.mockClear(); + repoMocks.insertInsightsRaw.mockClear(); const ctx = mock({ workflow, runData }); @@ -646,7 +639,7 @@ describe('workflowExecuteAfterHandler - flushEvents', () => { const { resolve: flushResolve, promise: flushPromise } = createDeferredPromise(); // First flush will "hang" (simulate long save) - trxMock.insert.mockImplementationOnce(async () => { + repoMocks.insertInsightsRaw.mockImplementationOnce(async () => { await flushPromise; }); @@ -678,6 +671,6 @@ describe('workflowExecuteAfterHandler - flushEvents', () => { // ASSERT expect(shutdownResolved).toBe(true); - expect(trxMock.insert).toHaveBeenCalledTimes(1); + expect(repoMocks.insertInsightsRaw).toHaveBeenCalledTimes(1); }); }); diff --git a/packages/cli/src/modules/insights/__tests__/insights-compaction.service.test.ts b/packages/cli/src/modules/insights/__tests__/insights-compaction.service.test.ts index fc83dc8d278..842f1e2515d 100644 --- a/packages/cli/src/modules/insights/__tests__/insights-compaction.service.test.ts +++ b/packages/cli/src/modules/insights/__tests__/insights-compaction.service.test.ts @@ -1,4 +1,3 @@ -import { GlobalConfig } from '@n8n/config'; import { Container } from '@n8n/di'; import { mock } from 'jest-mock-extended'; import { DateTime } from 'luxon'; @@ -19,591 +18,579 @@ import { InsightsByPeriodRepository } from '../database/repositories/insights-by import { InsightsCompactionService } from '../insights-compaction.service'; import { InsightsConfig } from '../insights.config'; -const globalConfig = Container.get(GlobalConfig); -const dbType = globalConfig.database.type; +// Initialize DB once for all tests +beforeAll(async () => { + await testDb.init(); +}); -// Disable tests for legacy sqlite -if (dbType === 'sqlite' && !globalConfig.database.sqlite.poolSize) { - test('dummy', () => { - expect(true).toBe(true); - }); -} else { - // Initialize DB once for all tests - beforeAll(async () => { - await testDb.init(); - }); +beforeEach(async () => { + await testDb.truncate([ + 'InsightsRaw', + 'InsightsByPeriod', + 'InsightsMetadata', + 'WorkflowEntity', + 'Project', + ]); +}); - beforeEach(async () => { - await testDb.truncate([ - 'InsightsRaw', - 'InsightsByPeriod', - 'InsightsMetadata', - 'WorkflowEntity', - 'Project', - ]); - }); +// Terminate DB once after all tests complete +afterAll(async () => { + await testDb.terminate(); +}); - // Terminate DB once after all tests complete - afterAll(async () => { - await testDb.terminate(); - }); +describe('compaction', () => { + describe('compactRawToHour', () => { + type TestData = { + name: string; + timestamps: DateTime[]; + batches: number[]; + }; - describe('compaction', () => { - describe('compactRawToHour', () => { - type TestData = { - name: string; - timestamps: DateTime[]; - batches: number[]; - }; + test.each([ + { + name: 'compact into 2 rows', + timestamps: [ + DateTime.utc(2000, 1, 1, 0, 0), + DateTime.utc(2000, 1, 1, 0, 59), + DateTime.utc(2000, 1, 1, 1, 0), + ], + batches: [2, 1], + }, + { + name: 'compact into 3 rows', + timestamps: [ + DateTime.utc(2000, 1, 1, 0, 0), + DateTime.utc(2000, 1, 1, 1, 0), + DateTime.utc(2000, 1, 1, 2, 0), + ], + batches: [1, 1, 1], + }, + ])('$name', async ({ timestamps, batches }) => { + // ARRANGE + const insightsCompactionService = Container.get(InsightsCompactionService); + const insightsRawRepository = Container.get(InsightsRawRepository); + const insightsByPeriodRepository = Container.get(InsightsByPeriodRepository); - test.each([ - { - name: 'compact into 2 rows', - timestamps: [ - DateTime.utc(2000, 1, 1, 0, 0), - DateTime.utc(2000, 1, 1, 0, 59), - DateTime.utc(2000, 1, 1, 1, 0), - ], - batches: [2, 1], - }, - { - name: 'compact into 3 rows', - timestamps: [ - DateTime.utc(2000, 1, 1, 0, 0), - DateTime.utc(2000, 1, 1, 1, 0), - DateTime.utc(2000, 1, 1, 2, 0), - ], - batches: [1, 1, 1], - }, - ])('$name', async ({ timestamps, batches }) => { - // ARRANGE - const insightsCompactionService = Container.get(InsightsCompactionService); - const insightsRawRepository = Container.get(InsightsRawRepository); - const insightsByPeriodRepository = Container.get(InsightsByPeriodRepository); - - const project = await createTeamProject(); - const workflow = await createWorkflow({}, project); - // create before so we can create the raw events in parallel - await createMetadata(workflow); - for (const timestamp of timestamps) { - await createRawInsightsEvent(workflow, { - type: 'success', - value: 1, - timestamp, - }); - } - - // ACT - const compactedRows = await insightsCompactionService.compactRawToHour(); - - // ASSERT - expect(compactedRows).toBe(timestamps.length); - await expect(insightsRawRepository.count()).resolves.toBe(0); - const allCompacted = await insightsByPeriodRepository.find({ order: { periodStart: 1 } }); - expect(allCompacted).toHaveLength(batches.length); - for (const [index, compacted] of allCompacted.entries()) { - expect(compacted.value).toBe(batches[index]); - } - }); - - test('batch compaction split events in hourly insight periods', async () => { - // ARRANGE - const insightsCompactionService = Container.get(InsightsCompactionService); - const insightsRawRepository = Container.get(InsightsRawRepository); - const insightsByPeriodRepository = Container.get(InsightsByPeriodRepository); - - const project = await createTeamProject(); - const workflow = await createWorkflow({}, project); - - const batchSize = 100; - - let timestamp = DateTime.utc().startOf('hour'); - for (let i = 0; i < batchSize; i++) { - await createRawInsightsEvent(workflow, { type: 'success', value: 1, timestamp }); - // create 60 events per hour - timestamp = timestamp.plus({ minute: 1 }); - } - - // ACT - await insightsCompactionService.compactInsights(); - - // ASSERT - await expect(insightsRawRepository.count()).resolves.toBe(0); - - const allCompacted = await insightsByPeriodRepository.find({ order: { periodStart: 1 } }); - const accumulatedValues = allCompacted.reduce((acc, event) => acc + event.value, 0); - expect(accumulatedValues).toBe(batchSize); - expect(allCompacted[0].value).toBe(60); - expect(allCompacted[1].value).toBe(40); - }); - - test('batch compaction split events in hourly insight periods by type and workflow', async () => { - // ARRANGE - const insightsCompactionService = Container.get(InsightsCompactionService); - const insightsRawRepository = Container.get(InsightsRawRepository); - const insightsByPeriodRepository = Container.get(InsightsByPeriodRepository); - - const project = await createTeamProject(); - const workflow1 = await createWorkflow({}, project); - const workflow2 = await createWorkflow({}, project); - - const batchSize = 100; - - let timestamp = DateTime.utc().startOf('hour'); - for (let i = 0; i < batchSize / 4; i++) { - await createRawInsightsEvent(workflow1, { type: 'success', value: 1, timestamp }); - timestamp = timestamp.plus({ minute: 1 }); - } - - for (let i = 0; i < batchSize / 4; i++) { - await createRawInsightsEvent(workflow1, { type: 'failure', value: 1, timestamp }); - timestamp = timestamp.plus({ minute: 1 }); - } - - for (let i = 0; i < batchSize / 4; i++) { - await createRawInsightsEvent(workflow2, { type: 'runtime_ms', value: 1200, timestamp }); - timestamp = timestamp.plus({ minute: 1 }); - } - - for (let i = 0; i < batchSize / 4; i++) { - await createRawInsightsEvent(workflow2, { type: 'time_saved_min', value: 3, timestamp }); - timestamp = timestamp.plus({ minute: 1 }); - } - - // ACT - await insightsCompactionService.compactInsights(); - - // ASSERT - await expect(insightsRawRepository.count()).resolves.toBe(0); - - const allCompacted = await insightsByPeriodRepository.find({ - order: { metaId: 'ASC', periodStart: 'ASC' }, - }); - - // Expect 2 insights for workflow 1 (for success and failure) - // and 3 for workflow 2 (2 period starts for runtime_ms and 1 for time_saved_min) - expect(allCompacted).toHaveLength(5); - const metaIds = allCompacted.map((event) => event.metaId); - - // meta id are ordered. first 2 are for workflow 1, last 3 are for workflow 2 - const uniqueMetaIds = [metaIds[0], metaIds[2]]; - const workflow1Insights = allCompacted.filter((event) => event.metaId === uniqueMetaIds[0]); - const workflow2Insights = allCompacted.filter((event) => event.metaId === uniqueMetaIds[1]); - - expect(workflow1Insights).toHaveLength(2); - expect(workflow2Insights).toHaveLength(3); - - const successInsights = workflow1Insights.find((event) => event.type === 'success'); - const failureInsights = workflow1Insights.find((event) => event.type === 'failure'); - - expect(successInsights).toBeTruthy(); - expect(failureInsights).toBeTruthy(); - // success and failure insights should have the value matching the number or raw events (because value = 1) - expect(successInsights!.value).toBe(25); - expect(failureInsights!.value).toBe(25); - - const runtimeMsEvents = workflow2Insights.filter((event) => event.type === 'runtime_ms'); - const timeSavedMinEvents = workflow2Insights.find( - (event) => event.type === 'time_saved_min', - ); - expect(runtimeMsEvents).toHaveLength(2); - - // The last 10 minutes of the first hour - expect(runtimeMsEvents[0].value).toBe(1200 * 10); - - // The first 15 minutes of the second hour - expect(runtimeMsEvents[1].value).toBe(1200 * 15); - expect(timeSavedMinEvents).toBeTruthy(); - expect(timeSavedMinEvents!.value).toBe(3 * 25); - }); - - test('should return the number of compacted events', async () => { - // ARRANGE - const insightsCompactionService = Container.get(InsightsCompactionService); - - const project = await createTeamProject(); - const workflow = await createWorkflow({}, project); - - const batchSize = 100; - - let timestamp = DateTime.utc(2000, 1, 1, 0, 0); - for (let i = 0; i < batchSize; i++) { - await createRawInsightsEvent(workflow, { type: 'success', value: 1, timestamp }); - // create 60 events per hour - timestamp = timestamp.plus({ minute: 1 }); - } - - // ACT - const numberOfCompactedData = await insightsCompactionService.compactRawToHour(); - - // ASSERT - expect(numberOfCompactedData).toBe(100); - }); - - test('works with data in the compacted table', async () => { - // ARRANGE - const insightsCompactionService = Container.get(InsightsCompactionService); - const insightsRawRepository = Container.get(InsightsRawRepository); - const insightsByPeriodRepository = Container.get(InsightsByPeriodRepository); - - const project = await createTeamProject(); - const workflow = await createWorkflow({}, project); - - const batchSize = 100; - - let timestamp = DateTime.utc().startOf('hour'); - - // Create an existing compacted event for the first hour - await createCompactedInsightsEvent(workflow, { + const project = await createTeamProject(); + const workflow = await createWorkflow({}, project); + // create before so we can create the raw events in parallel + await createMetadata(workflow); + for (const timestamp of timestamps) { + await createRawInsightsEvent(workflow, { type: 'success', - value: 10, - periodUnit: 'hour', - periodStart: timestamp, + value: 1, + timestamp, }); + } - const events = Array<{ type: 'success'; value: number; timestamp: DateTime }>(); - for (let i = 0; i < batchSize; i++) { - events.push({ type: 'success', value: 1, timestamp }); - timestamp = timestamp.plus({ minute: 1 }); - } - await createRawInsightsEvents(workflow, events); + // ACT + const compactedRows = await insightsCompactionService.compactRawToHour(); - // ACT - await insightsCompactionService.compactInsights(); - - // ASSERT - await expect(insightsRawRepository.count()).resolves.toBe(0); - - const allCompacted = await insightsByPeriodRepository.find({ order: { periodStart: 1 } }); - const accumulatedValues = allCompacted.reduce((acc, event) => acc + event.value, 0); - expect(accumulatedValues).toBe(batchSize + 10); - expect(allCompacted[0].value).toBe(70); - expect(allCompacted[1].value).toBe(40); - }); - - test('works with data bigger than the batch size', async () => { - // ARRANGE - const insightsCompactionService = Container.get(InsightsCompactionService); - const insightsRawRepository = Container.get(InsightsRawRepository); - const insightsByPeriodRepository = Container.get(InsightsByPeriodRepository); - - // spy on the compactRawToHour method to check if it's called multiple times - const rawToHourSpy = jest.spyOn(insightsCompactionService, 'compactRawToHour'); - - const project = await createTeamProject(); - const workflow = await createWorkflow({}, project); - - // create 100 more events than the batch size (500) - const batchSize = 600; - - let timestamp = DateTime.utc().startOf('hour'); - const events = Array<{ type: 'success'; value: number; timestamp: DateTime }>(); - for (let i = 0; i < batchSize; i++) { - events.push({ type: 'success', value: 1, timestamp }); - timestamp = timestamp.plus({ minute: 1 }); - } - await createRawInsightsEvents(workflow, events); - - // ACT - await insightsCompactionService.compactInsights(); - - // ASSERT - // compaction batch size is 500, so rawToHour should be called 3 times: - // 1st call: 500 events, 2nd call: 100 events, and third call that returns nothing - expect(rawToHourSpy).toHaveBeenCalledTimes(3); - await expect(insightsRawRepository.count()).resolves.toBe(0); - const allCompacted = await insightsByPeriodRepository.find({ order: { periodStart: 1 } }); - const accumulatedValues = allCompacted.reduce((acc, event) => acc + event.value, 0); - expect(accumulatedValues).toBe(batchSize); - }); + // ASSERT + expect(compactedRows).toBe(timestamps.length); + await expect(insightsRawRepository.count()).resolves.toBe(0); + const allCompacted = await insightsByPeriodRepository.find({ order: { periodStart: 1 } }); + expect(allCompacted).toHaveLength(batches.length); + for (const [index, compacted] of allCompacted.entries()) { + expect(compacted.value).toBe(batches[index]); + } }); - describe('compactionSchedule', () => { - test('compaction is running on schedule', async () => { - // ARRANGE - jest.useFakeTimers(); - const insightsCompactionService = new InsightsCompactionService( - mock(), - mock(), - mock({ - compactionIntervalMinutes: 60, - }), - mockLogger(), - ); - // spy on the compactInsights method to check if it's called - const compactInsightsSpy = jest.spyOn(insightsCompactionService, 'compactInsights'); + test('batch compaction split events in hourly insight periods', async () => { + // ARRANGE + const insightsCompactionService = Container.get(InsightsCompactionService); + const insightsRawRepository = Container.get(InsightsRawRepository); + const insightsByPeriodRepository = Container.get(InsightsByPeriodRepository); - try { - insightsCompactionService.startCompactionTimer(); + const project = await createTeamProject(); + const workflow = await createWorkflow({}, project); - // ACT - // advance by 1 hour and 1 minute - jest.advanceTimersByTime(1000 * 60 * 61); + const batchSize = 100; - // ASSERT - expect(compactInsightsSpy).toHaveBeenCalledTimes(1); - } finally { - insightsCompactionService.stopCompactionTimer(); - jest.useRealTimers(); - } - }); + let timestamp = DateTime.utc().startOf('hour'); + for (let i = 0; i < batchSize; i++) { + await createRawInsightsEvent(workflow, { type: 'success', value: 1, timestamp }); + // create 60 events per hour + timestamp = timestamp.plus({ minute: 1 }); + } + + // ACT + await insightsCompactionService.compactInsights(); + + // ASSERT + await expect(insightsRawRepository.count()).resolves.toBe(0); + + const allCompacted = await insightsByPeriodRepository.find({ order: { periodStart: 1 } }); + const accumulatedValues = allCompacted.reduce((acc, event) => acc + event.value, 0); + expect(accumulatedValues).toBe(batchSize); + expect(allCompacted[0].value).toBe(60); + expect(allCompacted[1].value).toBe(40); }); - describe('compactHourToDay', () => { - type TestData = { - name: string; - periodStarts: DateTime[]; - batches: number[]; - }; + test('batch compaction split events in hourly insight periods by type and workflow', async () => { + // ARRANGE + const insightsCompactionService = Container.get(InsightsCompactionService); + const insightsRawRepository = Container.get(InsightsRawRepository); + const insightsByPeriodRepository = Container.get(InsightsByPeriodRepository); - test.each([ - { - name: 'compact into 2 rows', - periodStarts: [ - DateTime.utc(2000, 1, 1, 0, 0), - DateTime.utc(2000, 1, 1, 23, 59), - DateTime.utc(2000, 1, 2, 1, 0), - ], - batches: [2, 1], - }, - { - name: 'compact into 3 rows', - periodStarts: [ - DateTime.utc(2000, 1, 1, 0, 0), - DateTime.utc(2000, 1, 1, 23, 59), - DateTime.utc(2000, 1, 2, 0, 0), - DateTime.utc(2000, 1, 2, 23, 59), - DateTime.utc(2000, 1, 3, 23, 59), - ], - batches: [2, 2, 1], - }, - ])('$name', async ({ periodStarts, batches }) => { - // ARRANGE - const insightsCompactionService = Container.get(InsightsCompactionService); - const insightsByPeriodRepository = Container.get(InsightsByPeriodRepository); + const project = await createTeamProject(); + const workflow1 = await createWorkflow({}, project); + const workflow2 = await createWorkflow({}, project); - const project = await createTeamProject(); - const workflow = await createWorkflow({}, project); - // create before so we can create the raw events in parallel - await createMetadata(workflow); - for (const periodStart of periodStarts) { - await createCompactedInsightsEvent(workflow, { - type: 'success', - value: 1, - periodUnit: 'hour', - periodStart, - }); - } + const batchSize = 100; - // ACT - const compactedRows = await insightsCompactionService.compactHourToDay(); + let timestamp = DateTime.utc().startOf('hour'); + for (let i = 0; i < batchSize / 4; i++) { + await createRawInsightsEvent(workflow1, { type: 'success', value: 1, timestamp }); + timestamp = timestamp.plus({ minute: 1 }); + } - // ASSERT - expect(compactedRows).toBe(periodStarts.length); - const hourInsights = (await insightsByPeriodRepository.find()).filter( - (insight) => insight.periodUnit !== 'day', - ); - expect(hourInsights).toBeEmptyArray(); - const allCompacted = await insightsByPeriodRepository.find({ order: { periodStart: 1 } }); - expect(allCompacted).toHaveLength(batches.length); - for (const [index, compacted] of allCompacted.entries()) { - expect(compacted.value).toBe(batches[index]); - } + for (let i = 0; i < batchSize / 4; i++) { + await createRawInsightsEvent(workflow1, { type: 'failure', value: 1, timestamp }); + timestamp = timestamp.plus({ minute: 1 }); + } + + for (let i = 0; i < batchSize / 4; i++) { + await createRawInsightsEvent(workflow2, { type: 'runtime_ms', value: 1200, timestamp }); + timestamp = timestamp.plus({ minute: 1 }); + } + + for (let i = 0; i < batchSize / 4; i++) { + await createRawInsightsEvent(workflow2, { type: 'time_saved_min', value: 3, timestamp }); + timestamp = timestamp.plus({ minute: 1 }); + } + + // ACT + await insightsCompactionService.compactInsights(); + + // ASSERT + await expect(insightsRawRepository.count()).resolves.toBe(0); + + const allCompacted = await insightsByPeriodRepository.find({ + order: { metaId: 'ASC', periodStart: 'ASC' }, }); - test('recent insight periods should not be compacted', async () => { - // ARRANGE - const insightsCompactionService = Container.get(InsightsCompactionService); + // Expect 2 insights for workflow 1 (for success and failure) + // and 3 for workflow 2 (2 period starts for runtime_ms and 1 for time_saved_min) + expect(allCompacted).toHaveLength(5); + const metaIds = allCompacted.map((event) => event.metaId); - const project = await createTeamProject(); - const workflow = await createWorkflow({}, project); - // create before so we can create the raw events in parallel - await createMetadata(workflow); + // meta id are ordered. first 2 are for workflow 1, last 3 are for workflow 2 + const uniqueMetaIds = [metaIds[0], metaIds[2]]; + const workflow1Insights = allCompacted.filter((event) => event.metaId === uniqueMetaIds[0]); + const workflow2Insights = allCompacted.filter((event) => event.metaId === uniqueMetaIds[1]); + + expect(workflow1Insights).toHaveLength(2); + expect(workflow2Insights).toHaveLength(3); + + const successInsights = workflow1Insights.find((event) => event.type === 'success'); + const failureInsights = workflow1Insights.find((event) => event.type === 'failure'); + + expect(successInsights).toBeTruthy(); + expect(failureInsights).toBeTruthy(); + // success and failure insights should have the value matching the number or raw events (because value = 1) + expect(successInsights!.value).toBe(25); + expect(failureInsights!.value).toBe(25); + + const runtimeMsEvents = workflow2Insights.filter((event) => event.type === 'runtime_ms'); + const timeSavedMinEvents = workflow2Insights.find((event) => event.type === 'time_saved_min'); + expect(runtimeMsEvents).toHaveLength(2); + + // The last 10 minutes of the first hour + expect(runtimeMsEvents[0].value).toBe(1200 * 10); + + // The first 15 minutes of the second hour + expect(runtimeMsEvents[1].value).toBe(1200 * 15); + expect(timeSavedMinEvents).toBeTruthy(); + expect(timeSavedMinEvents!.value).toBe(3 * 25); + }); + + test('should return the number of compacted events', async () => { + // ARRANGE + const insightsCompactionService = Container.get(InsightsCompactionService); + + const project = await createTeamProject(); + const workflow = await createWorkflow({}, project); + + const batchSize = 100; + + let timestamp = DateTime.utc(2000, 1, 1, 0, 0); + for (let i = 0; i < batchSize; i++) { + await createRawInsightsEvent(workflow, { type: 'success', value: 1, timestamp }); + // create 60 events per hour + timestamp = timestamp.plus({ minute: 1 }); + } + + // ACT + const numberOfCompactedData = await insightsCompactionService.compactRawToHour(); + + // ASSERT + expect(numberOfCompactedData).toBe(100); + }); + + test('works with data in the compacted table', async () => { + // ARRANGE + const insightsCompactionService = Container.get(InsightsCompactionService); + const insightsRawRepository = Container.get(InsightsRawRepository); + const insightsByPeriodRepository = Container.get(InsightsByPeriodRepository); + + const project = await createTeamProject(); + const workflow = await createWorkflow({}, project); + + const batchSize = 100; + + let timestamp = DateTime.utc().startOf('hour'); + + // Create an existing compacted event for the first hour + await createCompactedInsightsEvent(workflow, { + type: 'success', + value: 10, + periodUnit: 'hour', + periodStart: timestamp, + }); + + const events = Array<{ type: 'success'; value: number; timestamp: DateTime }>(); + for (let i = 0; i < batchSize; i++) { + events.push({ type: 'success', value: 1, timestamp }); + timestamp = timestamp.plus({ minute: 1 }); + } + await createRawInsightsEvents(workflow, events); + + // ACT + await insightsCompactionService.compactInsights(); + + // ASSERT + await expect(insightsRawRepository.count()).resolves.toBe(0); + + const allCompacted = await insightsByPeriodRepository.find({ order: { periodStart: 1 } }); + const accumulatedValues = allCompacted.reduce((acc, event) => acc + event.value, 0); + expect(accumulatedValues).toBe(batchSize + 10); + expect(allCompacted[0].value).toBe(70); + expect(allCompacted[1].value).toBe(40); + }); + + test('works with data bigger than the batch size', async () => { + // ARRANGE + const insightsCompactionService = Container.get(InsightsCompactionService); + const insightsRawRepository = Container.get(InsightsRawRepository); + const insightsByPeriodRepository = Container.get(InsightsByPeriodRepository); + + // spy on the compactRawToHour method to check if it's called multiple times + const rawToHourSpy = jest.spyOn(insightsCompactionService, 'compactRawToHour'); + + const project = await createTeamProject(); + const workflow = await createWorkflow({}, project); + + // create 100 more events than the batch size (500) + const batchSize = 600; + + let timestamp = DateTime.utc().startOf('hour'); + const events = Array<{ type: 'success'; value: number; timestamp: DateTime }>(); + for (let i = 0; i < batchSize; i++) { + events.push({ type: 'success', value: 1, timestamp }); + timestamp = timestamp.plus({ minute: 1 }); + } + await createRawInsightsEvents(workflow, events); + + // ACT + await insightsCompactionService.compactInsights(); + + // ASSERT + // compaction batch size is 500, so rawToHour should be called 3 times: + // 1st call: 500 events, 2nd call: 100 events, and third call that returns nothing + expect(rawToHourSpy).toHaveBeenCalledTimes(3); + await expect(insightsRawRepository.count()).resolves.toBe(0); + const allCompacted = await insightsByPeriodRepository.find({ order: { periodStart: 1 } }); + const accumulatedValues = allCompacted.reduce((acc, event) => acc + event.value, 0); + expect(accumulatedValues).toBe(batchSize); + }); + }); + + describe('compactionSchedule', () => { + test('compaction is running on schedule', async () => { + // ARRANGE + jest.useFakeTimers(); + const insightsCompactionService = new InsightsCompactionService( + mock(), + mock(), + mock({ + compactionIntervalMinutes: 60, + }), + mockLogger(), + ); + // spy on the compactInsights method to check if it's called + const compactInsightsSpy = jest.spyOn(insightsCompactionService, 'compactInsights'); + + try { + insightsCompactionService.startCompactionTimer(); + + // ACT + // advance by 1 hour and 1 minute + jest.advanceTimersByTime(1000 * 60 * 61); + + // ASSERT + expect(compactInsightsSpy).toHaveBeenCalledTimes(1); + } finally { + insightsCompactionService.stopCompactionTimer(); + jest.useRealTimers(); + } + }); + }); + + describe('compactHourToDay', () => { + type TestData = { + name: string; + periodStarts: DateTime[]; + batches: number[]; + }; + + test.each([ + { + name: 'compact into 2 rows', + periodStarts: [ + DateTime.utc(2000, 1, 1, 0, 0), + DateTime.utc(2000, 1, 1, 23, 59), + DateTime.utc(2000, 1, 2, 1, 0), + ], + batches: [2, 1], + }, + { + name: 'compact into 3 rows', + periodStarts: [ + DateTime.utc(2000, 1, 1, 0, 0), + DateTime.utc(2000, 1, 1, 23, 59), + DateTime.utc(2000, 1, 2, 0, 0), + DateTime.utc(2000, 1, 2, 23, 59), + DateTime.utc(2000, 1, 3, 23, 59), + ], + batches: [2, 2, 1], + }, + ])('$name', async ({ periodStarts, batches }) => { + // ARRANGE + const insightsCompactionService = Container.get(InsightsCompactionService); + const insightsByPeriodRepository = Container.get(InsightsByPeriodRepository); + + const project = await createTeamProject(); + const workflow = await createWorkflow({}, project); + // create before so we can create the raw events in parallel + await createMetadata(workflow); + for (const periodStart of periodStarts) { await createCompactedInsightsEvent(workflow, { type: 'success', value: 1, periodUnit: 'hour', - periodStart: DateTime.utc().minus({ day: 79 }).startOf('hour'), + periodStart, }); + } - // ACT - const compactedRows = await insightsCompactionService.compactHourToDay(); + // ACT + const compactedRows = await insightsCompactionService.compactHourToDay(); - // ASSERT - expect(compactedRows).toBe(0); - }); + // ASSERT + expect(compactedRows).toBe(periodStarts.length); + const hourInsights = (await insightsByPeriodRepository.find()).filter( + (insight) => insight.periodUnit !== 'day', + ); + expect(hourInsights).toBeEmptyArray(); + const allCompacted = await insightsByPeriodRepository.find({ order: { periodStart: 1 } }); + expect(allCompacted).toHaveLength(batches.length); + for (const [index, compacted] of allCompacted.entries()) { + expect(compacted.value).toBe(batches[index]); + } }); - describe('compactDayToWeek', () => { - type TestData = { - name: string; - periodStarts: DateTime[]; - batches: number[]; - }; + test('recent insight periods should not be compacted', async () => { + // ARRANGE + const insightsCompactionService = Container.get(InsightsCompactionService); - test.each([ - { - name: 'compact into 2 rows', - periodStarts: [ - // 2000-01-03 is a Monday - DateTime.utc(2000, 1, 3, 0, 0), - DateTime.utc(2000, 1, 5, 23, 59), - DateTime.utc(2000, 1, 11, 1, 0), - ], - batches: [2, 1], - }, - { - name: 'compact into 3 rows', - periodStarts: [ - // 2000-01-03 is a Monday - DateTime.utc(2000, 1, 3, 0, 0), - DateTime.utc(2000, 1, 4, 23, 59), - DateTime.utc(2000, 1, 11, 0, 0), - DateTime.utc(2000, 1, 12, 23, 59), - DateTime.utc(2000, 1, 18, 23, 59), - ], - batches: [2, 2, 1], - }, - ])('$name', async ({ periodStarts, batches }) => { - // ARRANGE - const insightsCompactionService = Container.get(InsightsCompactionService); - const insightsByPeriodRepository = Container.get(InsightsByPeriodRepository); - - const project = await createTeamProject(); - const workflow = await createWorkflow({}, project); - - await createMetadata(workflow); - for (const periodStart of periodStarts) { - await createCompactedInsightsEvent(workflow, { - type: 'success', - value: 1, - periodUnit: 'day', - periodStart, - }); - } - - // ACT - const compactedRows = await insightsCompactionService.compactDayToWeek(); - - // ASSERT - expect(compactedRows).toBe(periodStarts.length); - const hourAndDayInsights = (await insightsByPeriodRepository.find()).filter( - (insight) => insight.periodUnit !== 'week', - ); - expect(hourAndDayInsights).toBeEmptyArray(); - const allCompacted = await insightsByPeriodRepository.find({ order: { periodStart: 1 } }); - expect(allCompacted).toHaveLength(batches.length); - for (const [index, compacted] of allCompacted.entries()) { - expect(compacted.periodStart.getDay()).toBe(1); - expect(compacted.value).toBe(batches[index]); - } + const project = await createTeamProject(); + const workflow = await createWorkflow({}, project); + // create before so we can create the raw events in parallel + await createMetadata(workflow); + await createCompactedInsightsEvent(workflow, { + type: 'success', + value: 1, + periodUnit: 'hour', + periodStart: DateTime.utc().minus({ day: 79 }).startOf('hour'), }); - test('recent insight periods should not be compacted', async () => { - // ARRANGE - const insightsCompactionService = Container.get(InsightsCompactionService); + // ACT + const compactedRows = await insightsCompactionService.compactHourToDay(); - const project = await createTeamProject(); - const workflow = await createWorkflow({}, project); - await createMetadata(workflow); + // ASSERT + expect(compactedRows).toBe(0); + }); + }); + + describe('compactDayToWeek', () => { + type TestData = { + name: string; + periodStarts: DateTime[]; + batches: number[]; + }; + + test.each([ + { + name: 'compact into 2 rows', + periodStarts: [ + // 2000-01-03 is a Monday + DateTime.utc(2000, 1, 3, 0, 0), + DateTime.utc(2000, 1, 5, 23, 59), + DateTime.utc(2000, 1, 11, 1, 0), + ], + batches: [2, 1], + }, + { + name: 'compact into 3 rows', + periodStarts: [ + // 2000-01-03 is a Monday + DateTime.utc(2000, 1, 3, 0, 0), + DateTime.utc(2000, 1, 4, 23, 59), + DateTime.utc(2000, 1, 11, 0, 0), + DateTime.utc(2000, 1, 12, 23, 59), + DateTime.utc(2000, 1, 18, 23, 59), + ], + batches: [2, 2, 1], + }, + ])('$name', async ({ periodStarts, batches }) => { + // ARRANGE + const insightsCompactionService = Container.get(InsightsCompactionService); + const insightsByPeriodRepository = Container.get(InsightsByPeriodRepository); + + const project = await createTeamProject(); + const workflow = await createWorkflow({}, project); + + await createMetadata(workflow); + for (const periodStart of periodStarts) { await createCompactedInsightsEvent(workflow, { type: 'success', value: 1, periodUnit: 'day', - periodStart: DateTime.utc().minus({ day: 179 }).startOf('day'), + periodStart, }); + } - // ACT - const compactedRows = await insightsCompactionService.compactDayToWeek(); + // ACT + const compactedRows = await insightsCompactionService.compactDayToWeek(); - // ASSERT - expect(compactedRows).toBe(0); - }); + // ASSERT + expect(compactedRows).toBe(periodStarts.length); + const hourAndDayInsights = (await insightsByPeriodRepository.find()).filter( + (insight) => insight.periodUnit !== 'week', + ); + expect(hourAndDayInsights).toBeEmptyArray(); + const allCompacted = await insightsByPeriodRepository.find({ order: { periodStart: 1 } }); + expect(allCompacted).toHaveLength(batches.length); + for (const [index, compacted] of allCompacted.entries()) { + expect(compacted.periodStart.getDay()).toBe(1); + expect(compacted.value).toBe(batches[index]); + } }); - describe('compaction threshold configuration', () => { - test('insights by period older than the hourly to daily threshold are not compacted', async () => { - // ARRANGE - const insightsCompactionService = Container.get(InsightsCompactionService); - const insightsByPeriodRepository = Container.get(InsightsByPeriodRepository); - const config = Container.get(InsightsConfig); + test('recent insight periods should not be compacted', async () => { + // ARRANGE + const insightsCompactionService = Container.get(InsightsCompactionService); - const project = await createTeamProject(); - const workflow = await createWorkflow({}, project); - - const thresholdDays = config.compactionHourlyToDailyThresholdDays; - - // Create insights by period within and beyond the threshold - const withinThresholdTimestamp = DateTime.utc().minus({ days: thresholdDays - 1 }); - const beyondThresholdTimestamp = DateTime.utc().minus({ days: thresholdDays + 1 }); - - await createCompactedInsightsEvent(workflow, { - type: 'success', - value: 1, - periodUnit: 'hour', - periodStart: withinThresholdTimestamp, - }); - - await createCompactedInsightsEvent(workflow, { - type: 'success', - value: 1, - periodUnit: 'hour', - periodStart: beyondThresholdTimestamp, - }); - - // ACT - const compactedRows = await insightsCompactionService.compactHourToDay(); - - // ASSERT - expect(compactedRows).toBe(1); // Only the event within the threshold should be compacted - const insightsByPeriods = await insightsByPeriodRepository.find(); - const dailyInsights = insightsByPeriods.filter((insight) => insight.periodUnit === 'day'); - expect(dailyInsights).toHaveLength(1); // The event beyond the threshold should remain - expect(dailyInsights[0].periodStart.toISOString()).toEqual( - beyondThresholdTimestamp.startOf('day').toISO(), - ); + const project = await createTeamProject(); + const workflow = await createWorkflow({}, project); + await createMetadata(workflow); + await createCompactedInsightsEvent(workflow, { + type: 'success', + value: 1, + periodUnit: 'day', + periodStart: DateTime.utc().minus({ day: 179 }).startOf('day'), }); - test('insights by period older than the daily to weekly threshold are not compacted', async () => { - // ARRANGE - const insightsCompactionService = Container.get(InsightsCompactionService); - const insightsByPeriodRepository = Container.get(InsightsByPeriodRepository); - const config = Container.get(InsightsConfig); + // ACT + const compactedRows = await insightsCompactionService.compactDayToWeek(); - const project = await createTeamProject(); - const workflow = await createWorkflow({}, project); - - const thresholdDays = config.compactionDailyToWeeklyThresholdDays; - - // Create insights by period within and beyond the threshold - const withinThresholdTimestamp = DateTime.utc().minus({ days: thresholdDays - 1 }); - const beyondThresholdTimestamp = DateTime.utc().minus({ days: thresholdDays + 1 }); - - await createCompactedInsightsEvent(workflow, { - type: 'success', - value: 1, - periodUnit: 'day', - periodStart: withinThresholdTimestamp, - }); - await createCompactedInsightsEvent(workflow, { - type: 'success', - value: 1, - periodUnit: 'day', - periodStart: beyondThresholdTimestamp, - }); - - // ACT - const compactedRows = await insightsCompactionService.compactDayToWeek(); - - // ASSERT - expect(compactedRows).toBe(1); // Only the event within the threshold should be compacted - const insightsByPeriods = await insightsByPeriodRepository.find(); - const weeklyInsights = insightsByPeriods.filter((insight) => insight.periodUnit === 'week'); - expect(weeklyInsights).toHaveLength(1); // The event beyond the threshold should remain - expect(weeklyInsights[0].periodStart.toISOString()).toEqual( - beyondThresholdTimestamp.startOf('week').toISO(), - ); - }); + // ASSERT + expect(compactedRows).toBe(0); }); }); -} + + describe('compaction threshold configuration', () => { + test('insights by period older than the hourly to daily threshold are not compacted', async () => { + // ARRANGE + const insightsCompactionService = Container.get(InsightsCompactionService); + const insightsByPeriodRepository = Container.get(InsightsByPeriodRepository); + const config = Container.get(InsightsConfig); + + const project = await createTeamProject(); + const workflow = await createWorkflow({}, project); + + const thresholdDays = config.compactionHourlyToDailyThresholdDays; + + // Create insights by period within and beyond the threshold + const withinThresholdTimestamp = DateTime.utc().minus({ days: thresholdDays - 1 }); + const beyondThresholdTimestamp = DateTime.utc().minus({ days: thresholdDays + 1 }); + + await createCompactedInsightsEvent(workflow, { + type: 'success', + value: 1, + periodUnit: 'hour', + periodStart: withinThresholdTimestamp, + }); + + await createCompactedInsightsEvent(workflow, { + type: 'success', + value: 1, + periodUnit: 'hour', + periodStart: beyondThresholdTimestamp, + }); + + // ACT + const compactedRows = await insightsCompactionService.compactHourToDay(); + + // ASSERT + expect(compactedRows).toBe(1); // Only the event within the threshold should be compacted + const insightsByPeriods = await insightsByPeriodRepository.find(); + const dailyInsights = insightsByPeriods.filter((insight) => insight.periodUnit === 'day'); + expect(dailyInsights).toHaveLength(1); // The event beyond the threshold should remain + expect(dailyInsights[0].periodStart.toISOString()).toEqual( + beyondThresholdTimestamp.startOf('day').toISO(), + ); + }); + + test('insights by period older than the daily to weekly threshold are not compacted', async () => { + // ARRANGE + const insightsCompactionService = Container.get(InsightsCompactionService); + const insightsByPeriodRepository = Container.get(InsightsByPeriodRepository); + const config = Container.get(InsightsConfig); + + const project = await createTeamProject(); + const workflow = await createWorkflow({}, project); + + const thresholdDays = config.compactionDailyToWeeklyThresholdDays; + + // Create insights by period within and beyond the threshold + const withinThresholdTimestamp = DateTime.utc().minus({ days: thresholdDays - 1 }); + const beyondThresholdTimestamp = DateTime.utc().minus({ days: thresholdDays + 1 }); + + await createCompactedInsightsEvent(workflow, { + type: 'success', + value: 1, + periodUnit: 'day', + periodStart: withinThresholdTimestamp, + }); + await createCompactedInsightsEvent(workflow, { + type: 'success', + value: 1, + periodUnit: 'day', + periodStart: beyondThresholdTimestamp, + }); + + // ACT + const compactedRows = await insightsCompactionService.compactDayToWeek(); + + // ASSERT + expect(compactedRows).toBe(1); // Only the event within the threshold should be compacted + const insightsByPeriods = await insightsByPeriodRepository.find(); + const weeklyInsights = insightsByPeriods.filter((insight) => insight.periodUnit === 'week'); + expect(weeklyInsights).toHaveLength(1); // The event beyond the threshold should remain + expect(weeklyInsights[0].periodStart.toISOString()).toEqual( + beyondThresholdTimestamp.startOf('week').toISO(), + ); + }); + }); +}); diff --git a/packages/cli/src/modules/insights/__tests__/insights.pre-init.test.ts b/packages/cli/src/modules/insights/__tests__/insights.pre-init.test.ts index 00031b7b6f0..b3290042610 100644 --- a/packages/cli/src/modules/insights/__tests__/insights.pre-init.test.ts +++ b/packages/cli/src/modules/insights/__tests__/insights.pre-init.test.ts @@ -10,34 +10,15 @@ describe('InsightsModulePreInit', () => { it('should return false if instance type is not "main"', () => { const ctx: ModulePreInitContext = { instance: mock({ instanceType: 'worker' }), - database: mock({ type: 'sqlite', sqlite: { poolSize: 10 } }), + database: mock(), }; expect(shouldLoadModule(ctx)).toBe(false); }); - it('should return false if database type is "sqlite" and poolSize is < 1', () => { + it('should return true if instance type is "main"', () => { const ctx: ModulePreInitContext = { instance: mock({ instanceType: 'main' }), - database: mock({ type: 'sqlite', sqlite: { poolSize: 0 } }), - }; - expect(shouldLoadModule(ctx)).toBe(false); - }); - - it.each(['postgresdb', 'mariadb', 'mysqldb'])( - 'should return true if instance type is "main" and database is not sqlite', - (dbType: 'postgresdb' | 'mysqldb' | 'sqlite' | 'mariadb') => { - const ctx: ModulePreInitContext = { - instance: mock({ instanceType: 'main' }), - database: mock({ type: dbType }), - }; - expect(shouldLoadModule(ctx)).toBe(true); - }, - ); - - it('should return true if instance type is "main" and sqlite poolSize is >= 1', () => { - const ctx: ModulePreInitContext = { - instance: mock({ instanceType: 'main' }), - database: mock({ type: 'sqlite', sqlite: { poolSize: 1 } }), + database: mock(), }; expect(shouldLoadModule(ctx)).toBe(true); }); diff --git a/packages/cli/src/modules/insights/__tests__/insights.service.test.ts b/packages/cli/src/modules/insights/__tests__/insights.service.test.ts index 211d4290abd..989cc80689b 100644 --- a/packages/cli/src/modules/insights/__tests__/insights.service.test.ts +++ b/packages/cli/src/modules/insights/__tests__/insights.service.test.ts @@ -3,21 +3,28 @@ import type { LicenseState } from '@n8n/backend-common'; import type { Project } from '@n8n/db'; import type { WorkflowEntity } from '@n8n/db'; import type { IWorkflowDb } from '@n8n/db'; +import type { WorkflowExecuteAfterContext } from '@n8n/decorators'; import { Container } from '@n8n/di'; import { mock } from 'jest-mock-extended'; import { DateTime } from 'luxon'; +import type { IRun } from 'n8n-workflow'; import { mockLogger } from '@test/mocking'; import { createTeamProject } from '@test-integration/db/projects'; import { createWorkflow } from '@test-integration/db/workflows'; import * as testDb from '@test-integration/test-db'; -import { createCompactedInsightsEvent } from '../database/entities/__tests__/db-utils'; +import { + createCompactedInsightsEvent, + createMetadata, + createRawInsightsEvents, +} from '../database/entities/__tests__/db-utils'; +import type { InsightsRaw } from '../database/entities/insights-raw'; import type { InsightsByPeriodRepository } from '../database/repositories/insights-by-period.repository'; -import type { InsightsCollectionService } from '../insights-collection.service'; -import type { InsightsCompactionService } from '../insights-compaction.service'; +import { InsightsCollectionService } from '../insights-collection.service'; +import { InsightsCompactionService } from '../insights-compaction.service'; import type { InsightsPruningService } from '../insights-pruning.service'; -import type { InsightsConfig } from '../insights.config'; +import { InsightsConfig } from '../insights.config'; import { InsightsService } from '../insights.service'; // Initialize DB once for all tests @@ -779,3 +786,73 @@ describe('timers', () => { expect(mockPruningService.stopPruningTimer).toHaveBeenCalled(); }); }); + +describe('legacy sqlite (without pooling) handles concurrent insights db process without throwing', () => { + let initialFlushBatchSize: number; + let insightsConfig: InsightsConfig; + beforeAll(() => { + insightsConfig = Container.get(InsightsConfig); + initialFlushBatchSize = insightsConfig.flushBatchSize; + + insightsConfig.flushBatchSize = 50; + }); + + afterAll(() => { + insightsConfig.flushBatchSize = initialFlushBatchSize; + }); + + test('should handle concurrent flush and compaction without error', async () => { + const insightsCollectionService = Container.get(InsightsCollectionService); + const insightsCompactionService = Container.get(InsightsCompactionService); + + const project = await createTeamProject(); + const workflow = await createWorkflow({}, project); + await createMetadata(workflow); + + const ctx = mock({ workflow }); + const startedAt = DateTime.utc(); + const stoppedAt = startedAt.plus({ seconds: 5 }); + ctx.runData = mock({ + mode: 'webhook', + status: 'success', + startedAt: startedAt.toJSDate(), + stoppedAt: stoppedAt.toJSDate(), + }); + + // Create test data + const rawInsights = []; + for (let i = 0; i < 100; i++) { + rawInsights.push({ + type: 'success' as InsightsRaw['type'], + value: 1, + periodUnit: 'hour', + periodStart: DateTime.now().minus({ day: 91, hour: i + 1 }), + }); + } + // Create raw insights events to be compacted + await createRawInsightsEvents(workflow, rawInsights); + + // + for (let i = 0; i < 100; i++) { + await createCompactedInsightsEvent(workflow, { + type: 'success', + value: 1, + periodUnit: 'hour', + periodStart: DateTime.now().minus({ day: 91, hour: i + 1 }), + }); + } + + for (let i = 0; i < 100; i++) { + await insightsCollectionService.handleWorkflowExecuteAfter(ctx); + } + + // ACT + const promises = [ + insightsCollectionService.flushEvents(), + insightsCollectionService.flushEvents(), + insightsCompactionService.compactRawToHour(), + insightsCompactionService.compactHourToDay(), + ]; + await expect(Promise.all(promises)).resolves.toBeDefined(); + }); +}); diff --git a/packages/cli/src/modules/insights/insights-collection.service.ts b/packages/cli/src/modules/insights/insights-collection.service.ts index ecd76d16888..03a7af6b066 100644 --- a/packages/cli/src/modules/insights/insights-collection.service.ts +++ b/packages/cli/src/modules/insights/insights-collection.service.ts @@ -1,4 +1,3 @@ -import { SharedWorkflow } from '@n8n/db'; import { OnLifecycleEvent, type WorkflowExecuteAfterContext } from '@n8n/decorators'; import { Service } from '@n8n/di'; import { In } from '@n8n/typeorm'; @@ -10,6 +9,8 @@ import { SharedWorkflowRepository } from '@/databases/repositories/shared-workfl import { InsightsMetadata } from '@/modules/insights/database/entities/insights-metadata'; import { InsightsRaw } from '@/modules/insights/database/entities/insights-raw'; +import { InsightsMetadataRepository } from './database/repositories/insights-metadata.repository'; +import { InsightsRawRepository } from './database/repositories/insights-raw.repository'; import { InsightsConfig } from './insights.config'; const shouldSkipStatus: Record = { @@ -64,6 +65,8 @@ export class InsightsCollectionService { constructor( private readonly sharedWorkflowRepository: SharedWorkflowRepository, + private readonly insightsRawRepository: InsightsRawRepository, + private readonly insightsMetadataRepository: InsightsMetadataRepository, private readonly insightsConfig: InsightsConfig, private readonly logger: Logger, ) { @@ -159,62 +162,60 @@ export class InsightsCollectionService { workflowIdNames.set(event.workflowId, event.workflowName); } - await this.sharedWorkflowRepository.manager.transaction(async (trx) => { - const sharedWorkflows = await trx.find(SharedWorkflow, { - where: { workflowId: In([...workflowIdNames.keys()]), role: 'workflow:owner' }, - relations: { project: true }, - }); - - // Upsert metadata for the workflows that are not already in the cache or have - // different project or workflow names - const metadataToUpsert = sharedWorkflows.reduce((acc, workflow) => { - const cachedMetadata = this.cachedMetadata.get(workflow.workflowId); - if ( - !cachedMetadata || - cachedMetadata.projectId !== workflow.projectId || - cachedMetadata.projectName !== workflow.project.name || - cachedMetadata.workflowName !== workflowIdNames.get(workflow.workflowId) - ) { - const metadata = new InsightsMetadata(); - metadata.projectId = workflow.projectId; - metadata.projectName = workflow.project.name; - metadata.workflowId = workflow.workflowId; - metadata.workflowName = workflowIdNames.get(workflow.workflowId)!; - - acc.push(metadata); - } - return acc; - }, [] as InsightsMetadata[]); - - await trx.upsert(InsightsMetadata, metadataToUpsert, ['workflowId']); - - const upsertMetadata = await trx.findBy(InsightsMetadata, { - workflowId: In(metadataToUpsert.map((m) => m.workflowId)), - }); - for (const metadata of upsertMetadata) { - this.cachedMetadata.set(metadata.workflowId, metadata); - } - - const events: InsightsRaw[] = []; - for (const event of insightsRawToInsertBuffer) { - const insight = new InsightsRaw(); - const metadata = this.cachedMetadata.get(event.workflowId); - if (!metadata) { - // could not find shared workflow for this insight (not supposed to happen) - throw new UnexpectedError( - `Could not find shared workflow for insight with workflowId ${event.workflowId}`, - ); - } - insight.metaId = metadata.metaId; - insight.type = event.type; - insight.value = event.value; - insight.timestamp = event.timestamp; - - events.push(insight); - } - - await trx.insert(InsightsRaw, events); + const sharedWorkflows = await this.sharedWorkflowRepository.find({ + where: { workflowId: In([...workflowIdNames.keys()]), role: 'workflow:owner' }, + relations: { project: true }, }); + + // Upsert metadata for the workflows that are not already in the cache or have + // different project or workflow names + const metadataToUpsert = sharedWorkflows.reduce((acc, workflow) => { + const cachedMetadata = this.cachedMetadata.get(workflow.workflowId); + if ( + !cachedMetadata || + cachedMetadata.projectId !== workflow.projectId || + cachedMetadata.projectName !== workflow.project.name || + cachedMetadata.workflowName !== workflowIdNames.get(workflow.workflowId) + ) { + const metadata = new InsightsMetadata(); + metadata.projectId = workflow.projectId; + metadata.projectName = workflow.project.name; + metadata.workflowId = workflow.workflowId; + metadata.workflowName = workflowIdNames.get(workflow.workflowId)!; + + acc.push(metadata); + } + return acc; + }, [] as InsightsMetadata[]); + + await this.insightsMetadataRepository.upsert(metadataToUpsert, ['workflowId']); + + const upsertMetadata = await this.insightsMetadataRepository.findBy({ + workflowId: In(metadataToUpsert.map((m) => m.workflowId)), + }); + for (const metadata of upsertMetadata) { + this.cachedMetadata.set(metadata.workflowId, metadata); + } + + const events: InsightsRaw[] = []; + for (const event of insightsRawToInsertBuffer) { + const insight = new InsightsRaw(); + const metadata = this.cachedMetadata.get(event.workflowId); + if (!metadata) { + // could not find shared workflow for this insight (not supposed to happen) + throw new UnexpectedError( + `Could not find shared workflow for insight with workflowId ${event.workflowId}`, + ); + } + insight.metaId = metadata.metaId; + insight.type = event.type; + insight.value = event.value; + insight.timestamp = event.timestamp; + + events.push(insight); + } + + await this.insightsRawRepository.insert(events); } async flushEvents() { diff --git a/packages/cli/src/modules/insights/insights.pre-init.ts b/packages/cli/src/modules/insights/insights.pre-init.ts index 040c1f8f25e..b91be86e341 100644 --- a/packages/cli/src/modules/insights/insights.pre-init.ts +++ b/packages/cli/src/modules/insights/insights.pre-init.ts @@ -3,7 +3,4 @@ import type { ModulePreInitContext } from '../modules.config'; export const shouldLoadModule = (ctx: ModulePreInitContext) => // Only main instance(s) should collect insights // Because main instances are informed of all finished workflow executions, whatever the mode - ctx.instance.instanceType === 'main' && - // This is because legacy sqlite (without pool) does not support nested transactions needed for insights - // TODO: remove once benchmarks confirm this issue is solved with buffering / flushing mechanism - (ctx.database.type !== 'sqlite' || ctx.database.sqlite.poolSize > 0); + ctx.instance.instanceType === 'main';