fix(core): Fix Insights concurrency issues for legacy sqlite (#15028)

This commit is contained in:
Guillaume Jacquart 2025-05-09 16:52:24 +02:00 committed by GitHub
parent 3a36f941b5
commit e34bca779b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 746 additions and 710 deletions

View File

@ -3,7 +3,7 @@ import type { WorkflowEntity } from '@n8n/db';
import type { IWorkflowDb } from '@n8n/db';
import type { WorkflowExecuteAfterContext } from '@n8n/decorators';
import { Container } from '@n8n/di';
import { In, type EntityManager } from '@n8n/typeorm';
import { In } from '@n8n/typeorm';
import { mock } from 'jest-mock-extended';
import { DateTime } from 'luxon';
import {
@ -252,10 +252,17 @@ describe('workflowExecuteAfterHandler', () => {
describe('workflowExecuteAfterHandler - cacheMetadata', () => {
let insightsCollectionService: InsightsCollectionService;
let entityManagerMock = mock<EntityManager>();
const sharedWorkflowRepositoryMock: jest.Mocked<SharedWorkflowRepository> = {
manager: entityManagerMock,
} as unknown as jest.Mocked<SharedWorkflowRepository>;
// Mock the repositories functions
const repositoryMocks = {
find: jest.fn(),
findBy: jest.fn(),
upsert: jest.fn(),
insert: jest.fn(),
};
const sharedWorkflowRepositoryMock = mock<SharedWorkflowRepository>(repositoryMocks);
const metadataRepositoryMock = mock<InsightsMetadataRepository>(repositoryMocks);
const insightsRawRepositoryMock = mock<InsightsRawRepository>(repositoryMocks);
const startedAt = DateTime.utc();
const stoppedAt = startedAt.plus({ seconds: 5 });
@ -266,23 +273,11 @@ describe('workflowExecuteAfterHandler - cacheMetadata', () => {
stoppedAt: stoppedAt.toJSDate(),
});
// Mock the transaction function
const trxMock = {
find: jest.fn(),
findBy: jest.fn(),
upsert: jest.fn(),
insert: jest.fn(),
};
entityManagerMock.transaction.mockImplementation(
jest.fn(async (runInTransaction: (entityManager: EntityManager) => Promise<void>) => {
await runInTransaction(trxMock as unknown as EntityManager);
}) as unknown as EntityManager['transaction'],
);
beforeAll(async () => {
insightsCollectionService = new InsightsCollectionService(
sharedWorkflowRepositoryMock,
insightsRawRepositoryMock,
metadataRepositoryMock,
Container.get(InsightsConfig),
mockLogger(),
);
@ -295,7 +290,7 @@ describe('workflowExecuteAfterHandler - cacheMetadata', () => {
project = await createTeamProject();
workflow = await createWorkflow({}, project);
trxMock.find = jest.fn().mockResolvedValue([
repositoryMocks.find = jest.fn().mockResolvedValue([
{
workflow,
workflowId: workflow.id,
@ -303,7 +298,7 @@ describe('workflowExecuteAfterHandler - cacheMetadata', () => {
project: { name: 'project-name' },
},
]);
trxMock.findBy = jest.fn().mockResolvedValue([
repositoryMocks.findBy = jest.fn().mockResolvedValue([
{
metaId: 'meta-id',
workflowId: workflow.id,
@ -326,12 +321,11 @@ describe('workflowExecuteAfterHandler - cacheMetadata', () => {
await insightsCollectionService.flushEvents();
// ASSERT
expect(trxMock.find).toHaveBeenCalledWith(expect.anything(), {
expect(repositoryMocks.find).toHaveBeenCalledWith({
where: { workflowId: In([workflow.id]), role: 'workflow:owner' },
relations: { project: true },
});
expect(trxMock.upsert).toHaveBeenCalledWith(
expect.anything(),
expect(repositoryMocks.upsert).toHaveBeenCalledWith(
expect.arrayContaining([
{
workflowId: workflow.id,
@ -348,10 +342,10 @@ describe('workflowExecuteAfterHandler - cacheMetadata', () => {
await insightsCollectionService.flushEvents();
// ASSERT AGAIN
trxMock.find.mockClear();
trxMock.upsert.mockClear();
expect(trxMock.find).not.toHaveBeenCalled();
expect(trxMock.upsert).not.toHaveBeenCalled();
repositoryMocks.find.mockClear();
repositoryMocks.upsert.mockClear();
expect(repositoryMocks.find).not.toHaveBeenCalled();
expect(repositoryMocks.upsert).not.toHaveBeenCalled();
});
test('updates cached metadata if workflow details change', async () => {
@ -363,8 +357,8 @@ describe('workflowExecuteAfterHandler - cacheMetadata', () => {
await insightsCollectionService.flushEvents();
// ASSERT
expect(trxMock.find).toHaveBeenCalled();
expect(trxMock.upsert).toHaveBeenCalled();
expect(repositoryMocks.find).toHaveBeenCalled();
expect(repositoryMocks.upsert).toHaveBeenCalled();
// Change the workflow name
workflow.name = 'new-workflow-name';
@ -374,12 +368,11 @@ describe('workflowExecuteAfterHandler - cacheMetadata', () => {
await insightsCollectionService.flushEvents();
// ASSERT AGAIN
expect(trxMock.find).toHaveBeenCalledWith(expect.anything(), {
expect(repositoryMocks.find).toHaveBeenCalledWith({
where: { workflowId: In([workflow.id]), role: 'workflow:owner' },
relations: { project: true },
});
expect(trxMock.upsert).toHaveBeenCalledWith(
expect.anything(),
expect(repositoryMocks.upsert).toHaveBeenCalledWith(
expect.arrayContaining([
{
workflowId: workflow.id,
@ -397,11 +390,23 @@ describe('workflowExecuteAfterHandler - flushEvents', () => {
let project: Project;
let workflow: IWorkflowDb & WorkflowEntity;
let insightsCollectionService: InsightsCollectionService;
let entityManagerMock = mock<EntityManager>();
const sharedWorkflowRepositoryMock: jest.Mocked<SharedWorkflowRepository> = {
manager: entityManagerMock,
} as unknown as jest.Mocked<SharedWorkflowRepository>;
const logger = mockLogger();
const repoMocks = {
findSharedWorkflowRepositoryMock: jest.fn(),
findByMetadata: jest.fn(),
upsertMetadata: jest.fn(),
insertInsightsRaw: jest.fn(),
};
const sharedWorkflowRepositoryMock = mock<SharedWorkflowRepository>({
find: repoMocks.findSharedWorkflowRepositoryMock,
});
const metadataRepositoryMock = mock<InsightsMetadataRepository>({
findBy: repoMocks.findByMetadata,
upsert: repoMocks.upsertMetadata,
});
const insightsRawRepositoryMock = mock<InsightsRawRepository>({
insert: repoMocks.insertInsightsRaw,
});
const startedAt = DateTime.utc();
const stoppedAt = startedAt.plus({ seconds: 5 });
const runData = mock<IRun>({
@ -411,32 +416,20 @@ describe('workflowExecuteAfterHandler - flushEvents', () => {
stoppedAt: stoppedAt.toJSDate(),
});
// Mock the transaction function
const trxMock = {
find: jest.fn(),
findBy: jest.fn(),
upsert: jest.fn(),
insert: jest.fn(),
};
entityManagerMock.transaction.mockImplementation(
jest.fn(async (runInTransaction: (entityManager: EntityManager) => Promise<void>) => {
await runInTransaction(trxMock as unknown as EntityManager);
}) as unknown as EntityManager['transaction'],
);
beforeAll(async () => {
insightsCollectionService = new InsightsCollectionService(
sharedWorkflowRepositoryMock,
insightsRawRepositoryMock,
metadataRepositoryMock,
Container.get(InsightsConfig),
logger,
mockLogger(),
);
});
beforeEach(async () => {
project = await createTeamProject();
workflow = await createWorkflow({ settings: { timeSavedPerExecution: 1 } }, project);
trxMock.find = jest.fn().mockResolvedValue([
repoMocks.findSharedWorkflowRepositoryMock.mockResolvedValue([
{
workflow,
workflowId: workflow.id,
@ -444,7 +437,7 @@ describe('workflowExecuteAfterHandler - flushEvents', () => {
project: { name: 'project-name' },
},
]);
trxMock.findBy = jest.fn().mockResolvedValue([
repoMocks.findByMetadata.mockResolvedValue([
{
metaId: 'meta-id',
workflowId: workflow.id,
@ -469,7 +462,7 @@ describe('workflowExecuteAfterHandler - flushEvents', () => {
await new Promise(process.nextTick);
// ASSERT
expect(trxMock.insert).not.toHaveBeenCalled();
expect(repoMocks.insertInsightsRaw).not.toHaveBeenCalled();
// ACT
await insightsCollectionService.handleWorkflowExecuteAfter(ctx);
@ -477,13 +470,13 @@ describe('workflowExecuteAfterHandler - flushEvents', () => {
// ASSERT
// await for the next tick to ensure the flush is called
await new Promise(process.nextTick);
expect(trxMock.insert).toHaveBeenCalled();
expect(repoMocks.insertInsightsRaw).toHaveBeenCalled();
});
test('flushes events to the database after a timeout', async () => {
// ARRANGE
jest.useFakeTimers();
trxMock.insert.mockClear();
repoMocks.insertInsightsRaw.mockClear();
insightsCollectionService.startFlushingTimer();
const ctx = mock<WorkflowExecuteAfterContext>({ workflow, runData });
@ -493,13 +486,13 @@ describe('workflowExecuteAfterHandler - flushEvents', () => {
await insightsCollectionService.handleWorkflowExecuteAfter(ctx);
}
// ASSERT
expect(trxMock.insert).not.toHaveBeenCalled();
expect(repoMocks.insertInsightsRaw).not.toHaveBeenCalled();
// ACT
await jest.advanceTimersByTimeAsync(31 * 1000);
// ASSERT
expect(trxMock.insert).toHaveBeenCalledTimes(1);
expect(repoMocks.insertInsightsRaw).toHaveBeenCalledTimes(1);
} finally {
jest.useRealTimers();
}
@ -508,7 +501,7 @@ describe('workflowExecuteAfterHandler - flushEvents', () => {
test('reschedule flush on flushing end', async () => {
// ARRANGE
jest.useFakeTimers();
trxMock.insert.mockClear();
repoMocks.insertInsightsRaw.mockClear();
insightsCollectionService.startFlushingTimer();
const ctx = mock<WorkflowExecuteAfterContext>({ workflow });
@ -518,13 +511,13 @@ describe('workflowExecuteAfterHandler - flushEvents', () => {
await jest.advanceTimersByTimeAsync(31 * 1000);
// ASSERT
expect(trxMock.insert).toHaveBeenCalledTimes(1);
expect(repoMocks.insertInsightsRaw).toHaveBeenCalledTimes(1);
// // ACT
await insightsCollectionService.handleWorkflowExecuteAfter(ctx);
await jest.advanceTimersByTimeAsync(31 * 1000);
expect(trxMock.insert).toHaveBeenCalledTimes(2);
expect(repoMocks.insertInsightsRaw).toHaveBeenCalledTimes(2);
} finally {
jest.useRealTimers();
}
@ -533,7 +526,7 @@ describe('workflowExecuteAfterHandler - flushEvents', () => {
test('reschedule flush on no buffered insights', async () => {
// ARRANGE
jest.useFakeTimers();
trxMock.insert.mockClear();
repoMocks.insertInsightsRaw.mockClear();
insightsCollectionService.startFlushingTimer();
const flushEventsSpy = jest.spyOn(insightsCollectionService, 'flushEvents');
@ -543,7 +536,7 @@ describe('workflowExecuteAfterHandler - flushEvents', () => {
// ASSERT
expect(flushEventsSpy).toHaveBeenCalledTimes(1);
expect(trxMock.insert).not.toHaveBeenCalled();
expect(repoMocks.insertInsightsRaw).not.toHaveBeenCalled();
// ACT
await jest.advanceTimersByTimeAsync(31 * 1000);
@ -555,7 +548,7 @@ describe('workflowExecuteAfterHandler - flushEvents', () => {
test('flushes events to the database on shutdown', async () => {
// ARRANGE
trxMock.insert.mockClear();
repoMocks.insertInsightsRaw.mockClear();
const ctx = mock<WorkflowExecuteAfterContext>({ workflow, runData });
// ACT
@ -566,17 +559,17 @@ describe('workflowExecuteAfterHandler - flushEvents', () => {
await insightsCollectionService.shutdown();
// ASSERT
expect(trxMock.insert).toHaveBeenCalledTimes(1);
expect(repoMocks.insertInsightsRaw).toHaveBeenCalledTimes(1);
// Check that last insert call contains 30 events (10 * 3 insights)
const lastCallArgs = trxMock.insert.mock.calls.at(-1);
expect(lastCallArgs?.[1]).toHaveLength(30);
const lastCallArgs = repoMocks.insertInsightsRaw.mock.calls.at(-1);
expect(lastCallArgs?.[0]).toHaveLength(30);
});
test('flushes events synchronously while shutting down', async () => {
// ARRANGE
// reset insights async flushing
insightsCollectionService.startFlushingTimer();
trxMock.insert.mockClear();
repoMocks.insertInsightsRaw.mockClear();
const ctx = mock<WorkflowExecuteAfterContext>({ workflow, runData });
// ACT
@ -589,25 +582,25 @@ describe('workflowExecuteAfterHandler - flushEvents', () => {
await insightsCollectionService.handleWorkflowExecuteAfter(ctx);
// ASSERT
expect(trxMock.insert).toHaveBeenCalledTimes(2);
expect(repoMocks.insertInsightsRaw).toHaveBeenCalledTimes(2);
// Check that last insert call contains 3 events (the synchronous flush after shutdown)
let callArgs = trxMock.insert.mock.calls.at(-1);
expect(callArgs?.[1]).toHaveLength(3);
let callArgs = repoMocks.insertInsightsRaw.mock.calls.at(-1);
expect(callArgs?.[0]).toHaveLength(3);
// ACT
// await for the next tick to ensure the flush is called
await new Promise(process.nextTick);
// Check that the one before that contains 30 events (the shutdown flush)
callArgs = trxMock.insert.mock.calls.at(-2);
expect(callArgs?.[1]).toHaveLength(30);
callArgs = repoMocks.insertInsightsRaw.mock.calls.at(-2);
expect(callArgs?.[0]).toHaveLength(30);
});
test('restore buffer events on flushing error', async () => {
// ARRANGE
jest.useFakeTimers();
trxMock.insert.mockClear();
trxMock.insert.mockRejectedValueOnce(new Error('Test error'));
repoMocks.insertInsightsRaw.mockClear();
repoMocks.insertInsightsRaw.mockRejectedValueOnce(new Error('Test error'));
insightsCollectionService.startFlushingTimer();
const ctx = mock<WorkflowExecuteAfterContext>({ workflow, runData });
@ -617,17 +610,17 @@ describe('workflowExecuteAfterHandler - flushEvents', () => {
await jest.advanceTimersByTimeAsync(31 * 1000);
// ASSERT
expect(trxMock.insert).toHaveBeenCalledTimes(1);
const insertArgs = trxMock.insert.mock.calls.at(-1);
expect(repoMocks.insertInsightsRaw).toHaveBeenCalledTimes(1);
const insertArgs = repoMocks.insertInsightsRaw.mock.calls.at(-1);
// ACT
await insightsCollectionService.flushEvents();
expect(trxMock.insert).toHaveBeenCalledTimes(2);
const newInsertArgs = trxMock.insert.mock.calls.at(-1);
expect(repoMocks.insertInsightsRaw).toHaveBeenCalledTimes(2);
const newInsertArgs = repoMocks.insertInsightsRaw.mock.calls.at(-1);
// Check that last insert call contains the same 3 insights as previous failed flush
expect(newInsertArgs?.[1]).toHaveLength(3);
expect(newInsertArgs?.[1]).toEqual(insertArgs?.[1]);
expect(newInsertArgs?.[0]).toHaveLength(3);
expect(newInsertArgs?.[0]).toEqual(insertArgs?.[0]);
} finally {
jest.useRealTimers();
}
@ -638,7 +631,7 @@ describe('workflowExecuteAfterHandler - flushEvents', () => {
const config = Container.get(InsightsConfig);
config.flushBatchSize = 10;
insightsCollectionService.startFlushingTimer();
trxMock.insert.mockClear();
repoMocks.insertInsightsRaw.mockClear();
const ctx = mock<WorkflowExecuteAfterContext>({ workflow, runData });
@ -646,7 +639,7 @@ describe('workflowExecuteAfterHandler - flushEvents', () => {
const { resolve: flushResolve, promise: flushPromise } = createDeferredPromise();
// First flush will "hang" (simulate long save)
trxMock.insert.mockImplementationOnce(async () => {
repoMocks.insertInsightsRaw.mockImplementationOnce(async () => {
await flushPromise;
});
@ -678,6 +671,6 @@ describe('workflowExecuteAfterHandler - flushEvents', () => {
// ASSERT
expect(shutdownResolved).toBe(true);
expect(trxMock.insert).toHaveBeenCalledTimes(1);
expect(repoMocks.insertInsightsRaw).toHaveBeenCalledTimes(1);
});
});

View File

@ -10,34 +10,15 @@ describe('InsightsModulePreInit', () => {
it('should return false if instance type is not "main"', () => {
const ctx: ModulePreInitContext = {
instance: mock<InstanceSettings>({ instanceType: 'worker' }),
database: mock<DatabaseConfig>({ type: 'sqlite', sqlite: { poolSize: 10 } }),
database: mock<DatabaseConfig>(),
};
expect(shouldLoadModule(ctx)).toBe(false);
});
it('should return false if database type is "sqlite" and poolSize is < 1', () => {
it('should return true if instance type is "main"', () => {
const ctx: ModulePreInitContext = {
instance: mock<InstanceSettings>({ instanceType: 'main' }),
database: mock<DatabaseConfig>({ type: 'sqlite', sqlite: { poolSize: 0 } }),
};
expect(shouldLoadModule(ctx)).toBe(false);
});
it.each(['postgresdb', 'mariadb', 'mysqldb'])(
'should return true if instance type is "main" and database is not sqlite',
(dbType: 'postgresdb' | 'mysqldb' | 'sqlite' | 'mariadb') => {
const ctx: ModulePreInitContext = {
instance: mock<InstanceSettings>({ instanceType: 'main' }),
database: mock<DatabaseConfig>({ type: dbType }),
};
expect(shouldLoadModule(ctx)).toBe(true);
},
);
it('should return true if instance type is "main" and sqlite poolSize is >= 1', () => {
const ctx: ModulePreInitContext = {
instance: mock<InstanceSettings>({ instanceType: 'main' }),
database: mock<DatabaseConfig>({ type: 'sqlite', sqlite: { poolSize: 1 } }),
database: mock<DatabaseConfig>(),
};
expect(shouldLoadModule(ctx)).toBe(true);
});

View File

@ -3,21 +3,28 @@ import type { LicenseState } from '@n8n/backend-common';
import type { Project } from '@n8n/db';
import type { WorkflowEntity } from '@n8n/db';
import type { IWorkflowDb } from '@n8n/db';
import type { WorkflowExecuteAfterContext } from '@n8n/decorators';
import { Container } from '@n8n/di';
import { mock } from 'jest-mock-extended';
import { DateTime } from 'luxon';
import type { IRun } from 'n8n-workflow';
import { mockLogger } from '@test/mocking';
import { createTeamProject } from '@test-integration/db/projects';
import { createWorkflow } from '@test-integration/db/workflows';
import * as testDb from '@test-integration/test-db';
import { createCompactedInsightsEvent } from '../database/entities/__tests__/db-utils';
import {
createCompactedInsightsEvent,
createMetadata,
createRawInsightsEvents,
} from '../database/entities/__tests__/db-utils';
import type { InsightsRaw } from '../database/entities/insights-raw';
import type { InsightsByPeriodRepository } from '../database/repositories/insights-by-period.repository';
import type { InsightsCollectionService } from '../insights-collection.service';
import type { InsightsCompactionService } from '../insights-compaction.service';
import { InsightsCollectionService } from '../insights-collection.service';
import { InsightsCompactionService } from '../insights-compaction.service';
import type { InsightsPruningService } from '../insights-pruning.service';
import type { InsightsConfig } from '../insights.config';
import { InsightsConfig } from '../insights.config';
import { InsightsService } from '../insights.service';
// Initialize DB once for all tests
@ -779,3 +786,73 @@ describe('timers', () => {
expect(mockPruningService.stopPruningTimer).toHaveBeenCalled();
});
});
describe('legacy sqlite (without pooling) handles concurrent insights db process without throwing', () => {
let initialFlushBatchSize: number;
let insightsConfig: InsightsConfig;
beforeAll(() => {
insightsConfig = Container.get(InsightsConfig);
initialFlushBatchSize = insightsConfig.flushBatchSize;
insightsConfig.flushBatchSize = 50;
});
afterAll(() => {
insightsConfig.flushBatchSize = initialFlushBatchSize;
});
test('should handle concurrent flush and compaction without error', async () => {
const insightsCollectionService = Container.get(InsightsCollectionService);
const insightsCompactionService = Container.get(InsightsCompactionService);
const project = await createTeamProject();
const workflow = await createWorkflow({}, project);
await createMetadata(workflow);
const ctx = mock<WorkflowExecuteAfterContext>({ workflow });
const startedAt = DateTime.utc();
const stoppedAt = startedAt.plus({ seconds: 5 });
ctx.runData = mock<IRun>({
mode: 'webhook',
status: 'success',
startedAt: startedAt.toJSDate(),
stoppedAt: stoppedAt.toJSDate(),
});
// Create test data
const rawInsights = [];
for (let i = 0; i < 100; i++) {
rawInsights.push({
type: 'success' as InsightsRaw['type'],
value: 1,
periodUnit: 'hour',
periodStart: DateTime.now().minus({ day: 91, hour: i + 1 }),
});
}
// Create raw insights events to be compacted
await createRawInsightsEvents(workflow, rawInsights);
//
for (let i = 0; i < 100; i++) {
await createCompactedInsightsEvent(workflow, {
type: 'success',
value: 1,
periodUnit: 'hour',
periodStart: DateTime.now().minus({ day: 91, hour: i + 1 }),
});
}
for (let i = 0; i < 100; i++) {
await insightsCollectionService.handleWorkflowExecuteAfter(ctx);
}
// ACT
const promises = [
insightsCollectionService.flushEvents(),
insightsCollectionService.flushEvents(),
insightsCompactionService.compactRawToHour(),
insightsCompactionService.compactHourToDay(),
];
await expect(Promise.all(promises)).resolves.toBeDefined();
});
});

View File

@ -1,4 +1,3 @@
import { SharedWorkflow } from '@n8n/db';
import { OnLifecycleEvent, type WorkflowExecuteAfterContext } from '@n8n/decorators';
import { Service } from '@n8n/di';
import { In } from '@n8n/typeorm';
@ -10,6 +9,8 @@ import { SharedWorkflowRepository } from '@/databases/repositories/shared-workfl
import { InsightsMetadata } from '@/modules/insights/database/entities/insights-metadata';
import { InsightsRaw } from '@/modules/insights/database/entities/insights-raw';
import { InsightsMetadataRepository } from './database/repositories/insights-metadata.repository';
import { InsightsRawRepository } from './database/repositories/insights-raw.repository';
import { InsightsConfig } from './insights.config';
const shouldSkipStatus: Record<ExecutionStatus, boolean> = {
@ -64,6 +65,8 @@ export class InsightsCollectionService {
constructor(
private readonly sharedWorkflowRepository: SharedWorkflowRepository,
private readonly insightsRawRepository: InsightsRawRepository,
private readonly insightsMetadataRepository: InsightsMetadataRepository,
private readonly insightsConfig: InsightsConfig,
private readonly logger: Logger,
) {
@ -159,62 +162,60 @@ export class InsightsCollectionService {
workflowIdNames.set(event.workflowId, event.workflowName);
}
await this.sharedWorkflowRepository.manager.transaction(async (trx) => {
const sharedWorkflows = await trx.find(SharedWorkflow, {
where: { workflowId: In([...workflowIdNames.keys()]), role: 'workflow:owner' },
relations: { project: true },
});
// Upsert metadata for the workflows that are not already in the cache or have
// different project or workflow names
const metadataToUpsert = sharedWorkflows.reduce((acc, workflow) => {
const cachedMetadata = this.cachedMetadata.get(workflow.workflowId);
if (
!cachedMetadata ||
cachedMetadata.projectId !== workflow.projectId ||
cachedMetadata.projectName !== workflow.project.name ||
cachedMetadata.workflowName !== workflowIdNames.get(workflow.workflowId)
) {
const metadata = new InsightsMetadata();
metadata.projectId = workflow.projectId;
metadata.projectName = workflow.project.name;
metadata.workflowId = workflow.workflowId;
metadata.workflowName = workflowIdNames.get(workflow.workflowId)!;
acc.push(metadata);
}
return acc;
}, [] as InsightsMetadata[]);
await trx.upsert(InsightsMetadata, metadataToUpsert, ['workflowId']);
const upsertMetadata = await trx.findBy(InsightsMetadata, {
workflowId: In(metadataToUpsert.map((m) => m.workflowId)),
});
for (const metadata of upsertMetadata) {
this.cachedMetadata.set(metadata.workflowId, metadata);
}
const events: InsightsRaw[] = [];
for (const event of insightsRawToInsertBuffer) {
const insight = new InsightsRaw();
const metadata = this.cachedMetadata.get(event.workflowId);
if (!metadata) {
// could not find shared workflow for this insight (not supposed to happen)
throw new UnexpectedError(
`Could not find shared workflow for insight with workflowId ${event.workflowId}`,
);
}
insight.metaId = metadata.metaId;
insight.type = event.type;
insight.value = event.value;
insight.timestamp = event.timestamp;
events.push(insight);
}
await trx.insert(InsightsRaw, events);
const sharedWorkflows = await this.sharedWorkflowRepository.find({
where: { workflowId: In([...workflowIdNames.keys()]), role: 'workflow:owner' },
relations: { project: true },
});
// Upsert metadata for the workflows that are not already in the cache or have
// different project or workflow names
const metadataToUpsert = sharedWorkflows.reduce((acc, workflow) => {
const cachedMetadata = this.cachedMetadata.get(workflow.workflowId);
if (
!cachedMetadata ||
cachedMetadata.projectId !== workflow.projectId ||
cachedMetadata.projectName !== workflow.project.name ||
cachedMetadata.workflowName !== workflowIdNames.get(workflow.workflowId)
) {
const metadata = new InsightsMetadata();
metadata.projectId = workflow.projectId;
metadata.projectName = workflow.project.name;
metadata.workflowId = workflow.workflowId;
metadata.workflowName = workflowIdNames.get(workflow.workflowId)!;
acc.push(metadata);
}
return acc;
}, [] as InsightsMetadata[]);
await this.insightsMetadataRepository.upsert(metadataToUpsert, ['workflowId']);
const upsertMetadata = await this.insightsMetadataRepository.findBy({
workflowId: In(metadataToUpsert.map((m) => m.workflowId)),
});
for (const metadata of upsertMetadata) {
this.cachedMetadata.set(metadata.workflowId, metadata);
}
const events: InsightsRaw[] = [];
for (const event of insightsRawToInsertBuffer) {
const insight = new InsightsRaw();
const metadata = this.cachedMetadata.get(event.workflowId);
if (!metadata) {
// could not find shared workflow for this insight (not supposed to happen)
throw new UnexpectedError(
`Could not find shared workflow for insight with workflowId ${event.workflowId}`,
);
}
insight.metaId = metadata.metaId;
insight.type = event.type;
insight.value = event.value;
insight.timestamp = event.timestamp;
events.push(insight);
}
await this.insightsRawRepository.insert(events);
}
async flushEvents() {

View File

@ -3,7 +3,4 @@ import type { ModulePreInitContext } from '../modules.config';
export const shouldLoadModule = (ctx: ModulePreInitContext) =>
// Only main instance(s) should collect insights
// Because main instances are informed of all finished workflow executions, whatever the mode
ctx.instance.instanceType === 'main' &&
// This is because legacy sqlite (without pool) does not support nested transactions needed for insights
// TODO: remove once benchmarks confirm this issue is solved with buffering / flushing mechanism
(ctx.database.type !== 'sqlite' || ctx.database.sqlite.poolSize > 0);
ctx.instance.instanceType === 'main';